7 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/40d8d9e3ebd5/ Changeset: 40d8d9e3ebd5 User: jmchilton Date: 2014-08-18 00:43:54 Summary: Dynamic destinations - extend RuleHelper for reasoning about groups of destinations... If multiple destinations map to the same underlying resource (a very typical case), this could allow rule developer to reason about cluster as a whole - though perhaps clunkily - by supplying all destination ids mapping to that cluster. Affected #: 2 files diff -r 572a8d5ee218aeb37f77660948cb75a439936868 -r 40d8d9e3ebd57820aac97115950818d97019ba17 lib/galaxy/jobs/rule_helper.py --- a/lib/galaxy/jobs/rule_helper.py +++ b/lib/galaxy/jobs/rule_helper.py @@ -58,16 +58,23 @@ query, for_user_email=None, for_destination=None, + for_destinations=None, for_job_states=None, created_in_last=None, updated_in_last=None, ): + if for_destination is not None: + for_destinations = [ for_destination ] + query = query.join( model.User ) if for_user_email is not None: query = query.filter( model.User.table.c.email == for_user_email ) - if for_destination is not None: - query = query.filter( model.Job.table.c.destination_id == for_destination ) + if for_destinations is not None: + if len( for_destinations ) == 1: + query = query.filter( model.Job.table.c.destination_id == for_destinations[ 0 ] ) + else: + query = query.filter( model.Job.table.c.destination_id.in_( for_destinations ) ) if created_in_last is not None: end_date = datetime.now() diff -r 572a8d5ee218aeb37f77660948cb75a439936868 -r 40d8d9e3ebd57820aac97115950818d97019ba17 test/unit/jobs/test_rule_helper.py --- a/test/unit/jobs/test_rule_helper.py +++ b/test/unit/jobs/test_rule_helper.py @@ -24,9 +24,12 @@ __assert_job_count_is( 2, rule_helper, for_destination="local" ) __assert_job_count_is( 7, rule_helper, for_destination="cluster1" ) + __assert_job_count_is( 9, rule_helper, for_destinations=["cluster1", "local"] ) + # Test per user destination counts __assert_job_count_is( 5, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1 ) __assert_job_count_is( 2, rule_helper, for_destination="local", for_user_email=USER_EMAIL_1 ) + __assert_job_count_is( 7, rule_helper, for_destinations=["cluster1", "local"], for_user_email=USER_EMAIL_1 ) __assert_job_count_is( 2, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_2 ) __assert_job_count_is( 0, rule_helper, for_destination="local", for_user_email=USER_EMAIL_2 ) https://bitbucket.org/galaxy/galaxy-central/commits/ae77e648918a/ Changeset: ae77e648918a User: jmchilton Date: 2014-08-18 00:43:54 Summary: Dynamic destinations - pass job_conf.xml params to rule functions. In other words, send extra job destination parameters to dynamic rule functions as arguments (in addition to those dynamically populated by Galaxy itself). This enables greater parameterization of rule functions and should lead to cleaner separation of logic and data (i.e. sites can program rules that restrict access to users, but which users can be populated at a higher level in `job_conf.xml`). 
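A quick aside on the RuleHelper change in 40d8d9e3ebd5 above, before the example for this changeset: a custom Python rule could pass every destination id backed by one physical cluster through the new for_destinations argument and reason about the cluster as a whole. This is only a hedged sketch - the destination ids ("cluster1_1core", "cluster1_8core"), the "overflow" fallback and the 200-job threshold are invented for illustration; only RuleHelper, job_count, for_destinations and for_job_states come from the changeset, and app is injected into the rule just as in the cluster1 example that follows.

    # Sketch only: destination ids and the threshold below are hypothetical.
    from galaxy.jobs.rule_helper import RuleHelper

    CLUSTER1_DESTINATION_IDS = [ "cluster1_1core", "cluster1_8core" ]

    def cluster1_unless_busy( app ):
        rule_helper = RuleHelper( app )
        # Count queued jobs across every destination id backed by the cluster.
        queued_jobs = rule_helper.job_count(
            for_destinations=CLUSTER1_DESTINATION_IDS,
            for_job_states=[ "queued" ],
        )
        # Route to a hypothetical overflow destination when the cluster looks busy.
        return "cluster1_1core" if queued_jobs < 200 else "overflow"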
For example the following dynamic job rule: def cluster1(app, memory="4096", cores="1", hours="48"): native_spec = "--time=%s:00:00 --nodes=1 --ntasks=%s --mem=%s" % ( hours, cores, memory ) return JobDestination( "cluster1", params=dict( native_specification=native_spec ) ) Could then be called with various parameters in job_conf.xml as follows: <destination id="short_job" type="dyanmic"><param id="function">cluster1</param><param id="hours">1</param></destination><destination id="big_job" type="dynamic"><param id="function">cluster1</param><param id="cores">8</param><param id="memory">32768</param></destination> Affected #: 3 files diff -r 40d8d9e3ebd57820aac97115950818d97019ba17 -r ae77e648918aa881618e196bd6b036ae4095b6db lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -69,7 +69,7 @@ names.append( rule_module_name ) return names - def __invoke_expand_function( self, expand_function ): + def __invoke_expand_function( self, expand_function, destination_params ): function_arg_names = inspect.getargspec( expand_function ).args app = self.job_wrapper.app possible_args = { @@ -83,6 +83,11 @@ actual_args = {} + # Send through any job_conf.xml defined args to function + for destination_param in destination_params.keys(): + if destination_param in function_arg_names: + actual_args[ destination_param ] = destination_params[ destination_param ] + # Populate needed args for possible_arg_name in possible_args: if possible_arg_name in function_arg_names: @@ -179,12 +184,12 @@ raise Exception( message ) expand_function = self.__get_expand_function( expand_function_name ) - return self.__handle_rule( expand_function ) + return self.__handle_rule( expand_function, destination ) else: raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type ) - def __handle_rule( self, rule_function ): - job_destination = self.__invoke_expand_function( rule_function ) + def __handle_rule( self, rule_function, destination ): + job_destination = self.__invoke_expand_function( rule_function, destination.params ) if not isinstance(job_destination, galaxy.jobs.JobDestination): job_destination_rep = str(job_destination) # Should be either id or url if '://' in job_destination_rep: diff -r 40d8d9e3ebd57820aac97115950818d97019ba17 -r ae77e648918aa881618e196bd6b036ae4095b6db test/unit/jobs/test_mapper.py --- a/test/unit/jobs/test_mapper.py +++ b/test/unit/jobs/test_mapper.py @@ -46,6 +46,12 @@ assert mapper.job_config.rule_response == "tool1_dest_id" +def test_dynamic_mapping_job_conf_params(): + mapper = __mapper( __dynamic_destination( dict( function="check_job_conf_params", param1="7" ) ) ) + assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION + assert mapper.job_config.rule_response == "sent_7_dest_id" + + def test_dynamic_mapping_function_parameters(): mapper = __mapper( __dynamic_destination( dict( function="check_rule_params" ) ) ) assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION diff -r 40d8d9e3ebd57820aac97115950818d97019ba17 -r ae77e648918aa881618e196bd6b036ae4095b6db test/unit/jobs/test_rules/10_site.py --- a/test/unit/jobs/test_rules/10_site.py +++ b/test/unit/jobs/test_rules/10_site.py @@ -40,6 +40,11 @@ return "all_passed" +def check_job_conf_params( param1 ): + assert param1 == "7" + return "sent_7_dest_id" + + def check_resource_params( resource_params ): assert resource_params["memory"] == "8gb" return "have_resource_params" https://bitbucket.org/galaxy/galaxy-central/commits/ec6a5e37d428/ 
Changeset: ec6a5e37d428 User: jmchilton Date: 2014-08-18 00:43:54 Summary: Dynamic destinations - allow dispatching on workflow invocation. Generate a workflow invocation UUID that is stored with each job in the workflow and allow dynamic job destinations to consume these. Should allow for grouping jobs from the same workflow together during resource allocation (a sort of first attempt at dealing with data locality in workflows). Affected #: 6 files diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -95,7 +95,7 @@ # Don't hit the DB to load the job object if not needed require_db = False - for param in ["job", "user", "user_email", "resource_params"]: + for param in ["job", "user", "user_email", "resource_params", "workflow_invocation_uuid"]: if param in function_arg_names: require_db = True break @@ -127,6 +127,11 @@ pass actual_args[ "resource_params" ] = resource_params + if "workflow_invocation_uuid" in function_arg_names: + param_values = job.raw_param_dict( ) + workflow_invocation_uuid = param_values.get( "__workflow_invocation_uuid__", None ) + actual_args[ "workflow_invocation_uuid" ] = workflow_invocation_uuid + return expand_function( **actual_args ) def __job_params( self, job ): diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -482,10 +482,13 @@ Read encoded parameter values from the database and turn back into a dict of tool parameter values. """ - param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] ) + param_dict = self.raw_param_dict() tool = app.toolbox.get_tool( self.tool_id ) param_dict = tool.params_from_strings( param_dict, app, ignore_errors=ignore_errors ) return param_dict + def raw_param_dict( self ): + param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] ) + return param_dict def check_if_output_datasets_deleted( self ): """ Return true if all of the output datasets associated with this job are diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd lib/galaxy/tools/execute.py --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -10,13 +10,19 @@ log = logging.getLogger( __name__ ) -def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None ): +def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None, workflow_invocation_uuid=None ): """ Execute a tool and return object containing summary (output data, number of failures, etc...). """ execution_tracker = ToolExecutionTracker( tool, param_combinations, collection_info ) for params in execution_tracker.param_combinations: + if workflow_invocation_uuid: + params[ '__workflow_invocation_uuid__' ] = workflow_invocation_uuid + elif '__workflow_invocation_uuid__' in params: + # Only workflow invocation code gets to set this, ignore user supplied + # values or rerun parameters. 
+ del params[ '__workflow_invocation_uuid__' ] job, result = tool.handle_single_execution( trans, rerun_remap_job_id, params, history ) if job: execution_tracker.record_success( job, result ) diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -1,3 +1,5 @@ +import uuid + from galaxy import model from galaxy import exceptions from galaxy import util @@ -41,6 +43,8 @@ self.param_map = workflow_run_config.param_map self.outputs = odict() + # TODO: Attach to actual model object and persist someday... + self.invocation_uuid = uuid.uuid1().hex def invoke( self ): workflow_invocation = model.WorkflowInvocation() @@ -132,6 +136,7 @@ param_combinations=param_combinations, history=self.target_history, collection_info=collection_info, + workflow_invocation_uuid=self.invocation_uuid ) if collection_info: outputs[ step.id ] = dict( execution_tracker.created_collections ) diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd test/unit/jobs/test_mapper.py --- a/test/unit/jobs/test_mapper.py +++ b/test/unit/jobs/test_mapper.py @@ -1,3 +1,5 @@ +import uuid + import jobs.test_rules from galaxy.jobs.mapper import ( @@ -9,7 +11,7 @@ from galaxy.util import bunch - +WORKFLOW_UUID = uuid.uuid1().hex TOOL_JOB_DESTINATION = JobDestination() DYNAMICALLY_GENERATED_DESTINATION = JobDestination() @@ -64,6 +66,12 @@ assert mapper.job_config.rule_response == "have_resource_params" +def test_dynamic_mapping_workflow_invocation_parameter(): + mapper = __mapper( __dynamic_destination( dict( function="check_workflow_invocation_uuid" ) ) ) + assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION + assert mapper.job_config.rule_response == WORKFLOW_UUID + + def test_dynamic_mapping_no_function(): dest = __dynamic_destination( dict( ) ) mapper = __mapper( dest ) @@ -130,21 +138,26 @@ return True def get_job(self): + raw_params = { + "threshold": 8, + "__workflow_invocation_uuid__": WORKFLOW_UUID, + } + def get_param_values( app, ignore_errors ): assert app == self.app - return { - "threshold": 8, - "__job_resource": { - "__job_resource__select": "True", - "memory": "8gb" - } + params = raw_params.copy() + params[ "__job_resource" ] = { + "__job_resource__select": "True", + "memory": "8gb" } + return params return bunch.Bunch( user=bunch.Bunch( id=6789, email="test@example.com" ), + raw_param_dict=lambda: raw_params, get_param_values=get_param_values ) diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd test/unit/jobs/test_rules/10_site.py --- a/test/unit/jobs/test_rules/10_site.py +++ b/test/unit/jobs/test_rules/10_site.py @@ -48,3 +48,7 @@ def check_resource_params( resource_params ): assert resource_params["memory"] == "8gb" return "have_resource_params" + + +def check_workflow_invocation_uuid( workflow_invocation_uuid ): + return workflow_invocation_uuid https://bitbucket.org/galaxy/galaxy-central/commits/05b550588d9d/ Changeset: 05b550588d9d User: jmchilton Date: 2014-08-18 00:43:54 Summary: Dynamic destinations - easier, high-level reasoning about data locality. ... if only in a limitted sort of way. Provide ability to hash jobs by history, workflow, user, etc... and choose among various destinations semi-randomly based on that to distribute work across destinations based on these factors. 
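Before the stock rule described next, a minimal sketch of what these helpers allow in a custom Python rule - the cluster destination ids are invented, and the rule_helper and job arguments are assumed to be injected into the rule the same way the stock rules below receive them:

    # Sketch only: the destination ids are hypothetical.
    CLUSTER_IDS = [ "cluster1", "cluster2", "cluster3" ]

    def locality_aware_balance( rule_helper, job ):
        # Jobs from the same workflow invocation hash identically; jobs run
        # outside a workflow fall back to hashing by history, then by job id.
        job_hash = rule_helper.job_hash( job, hash_by="workflow_invocation,history" )
        # A given hash always selects the same destination, so related jobs co-locate.
        return rule_helper.choose_one( CLUSTER_IDS, hash_value=job_hash )

With hash_by="workflow_invocation,history", all jobs of one workflow invocation land on the same destination, jobs outside workflows group by history, and anything else spreads out by job id.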
Add a stock rule as in the form of a new dynamic destination type ("choose_one") that demonstrate using this to quickly proxy to some fixed static destinations. Add examples to job_conf.xml.sample_advanced. Affected #: 5 files diff -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd -r 05b550588d9dcf1502f141e7f1c5c896930e64bc job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -235,6 +235,21 @@ <!-- A destination that represents a method in the dynamic runner. --><param id="function">foo</param></destination> + <destination id="load_balance" runner="dynamic"> + <param id="type">choose_one</param> + <!-- Randomly assign jobs to various static destinatin ids --> + <param id="destination_ids">cluster1,cluster2,cluster3</param> + </destination> + <destination id="load_balance_with_data_locality" runner="dynamic"> + <!-- Randomly assign jobs to various static destinatin ids, + but keep jobs in the same workflow invocation together and + for those jobs ran outside of workflows keep jobs in same + history together. + --> + <param id="type">choose_one</param> + <param id="destination_ids">cluster1,cluster2,cluster3</param> + <param id="hash_by">workflow_invocation,history</param> + </destination><destination id="secure_pulsar_rest_dest" runner="pulsar_rest"><param id="url">https://examle.com:8913/</param><!-- If set, private_token must match token in remote Pulsar's diff -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd -r 05b550588d9dcf1502f141e7f1c5c896930e64bc lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -5,6 +5,8 @@ log = logging.getLogger( __name__ ) import galaxy.jobs.rules +from galaxy.jobs import stock_rules + from .rule_helper import RuleHelper DYNAMIC_RUNNER_NAME = "dynamic" @@ -27,6 +29,11 @@ self.message = message +STOCK_RULES = dict( + choose_one=stock_rules.choose_one +) + + class JobRunnerMapper( object ): """ This class is responsible to managing the mapping of jobs @@ -182,6 +189,7 @@ def __handle_dynamic_job_destination( self, destination ): expand_type = destination.params.get('type', "python") + expand_function = None if expand_type == "python": expand_function_name = self.__determine_expand_function_name( destination ) if not expand_function_name: @@ -189,10 +197,13 @@ raise Exception( message ) expand_function = self.__get_expand_function( expand_function_name ) - return self.__handle_rule( expand_function, destination ) + elif expand_type in STOCK_RULES: + expand_function = STOCK_RULES[ expand_type ] else: raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type ) + return self.__handle_rule( expand_function, destination ) + def __handle_rule( self, rule_function, destination ): job_destination = self.__invoke_expand_function( rule_function, destination.params ) if not isinstance(job_destination, galaxy.jobs.JobDestination): diff -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd -r 05b550588d9dcf1502f141e7f1c5c896930e64bc lib/galaxy/jobs/rule_helper.py --- a/lib/galaxy/jobs/rule_helper.py +++ b/lib/galaxy/jobs/rule_helper.py @@ -1,4 +1,6 @@ from datetime import datetime +import hashlib +import random from sqlalchemy import ( and_, @@ -6,10 +8,13 @@ ) from galaxy import model +from galaxy import util import logging log = logging.getLogger( __name__ ) +VALID_JOB_HASH_STRATEGIES = ["job", "user", "history", "workflow_invocation"] + class RuleHelper( object ): """ Utillity to allow job rules to interface cleanly with the rest of @@ -96,3 +101,60 @@ query = query.filter( 
model.Job.table.c.state.in_( for_job_states ) ) return query + + def choose_one( self, lst, hash_value=None ): + """ Choose a random value from supplied list. If hash_value is passed + in then every request with that same hash_value would produce the same + choice from the supplied list. + """ + if hash_value is None: + return random.choice( lst ) + + if not isinstance( hash_value, int ): + # Convert hash_value string into index + as_hex = hashlib.md5( hash_value ).hexdigest() + hash_value = int(as_hex, 16) + # else assumed to be 'random' int from 0-~Inf + random_index = hash_value % len( lst ) + return lst[ random_index ] + + def job_hash( self, job, hash_by=None ): + """ Produce a reproducible hash for the given job on various + criteria - for instance if hash_by is "workflow_invocation,history" - + all jobs within the same workflow invocation will recieve the same + hash - for jobs outside of workflows all jobs within the same history + will recieve the same hash, other jobs will be hashed on job's id + randomly. + + Primarily intended for use with ``choose_one`` above - to consistent + route or schedule related jobs. + """ + if hash_by is None: + hash_by = [ "job" ] + hash_bys = util.listify( hash_by ) + for hash_by in hash_bys: + job_hash = self._try_hash_for_job( job, hash_by ) + if job_hash: + return job_hash + + # Fallback to just hashing by job id, should always return a value. + return self._try_hash_for_job( job, "job" ) + + def _try_hash_for_job( self, job, hash_by ): + """ May return False or None if hash type is invalid for that job - + e.g. attempting to hash by user for anonymous job or by workflow + invocation for jobs outside of workflows. + """ + if hash_by not in VALID_JOB_HASH_STRATEGIES: + message = "Do not know how to hash jobs by %s, must be one of %s" % ( hash_by, VALID_JOB_HASH_STRATEGIES ) + raise Exception( message ) + + if hash_by == "workflow_invocation": + return job.raw_param_dict().get( "__workflow_invocation_uuid__", None ) + elif hash_by == "history": + return job.history_id + elif hash_by == "user": + user = job.user + return user and user.id + elif hash_by == "job": + return job.id diff -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd -r 05b550588d9dcf1502f141e7f1c5c896930e64bc lib/galaxy/jobs/stock_rules.py --- /dev/null +++ b/lib/galaxy/jobs/stock_rules.py @@ -0,0 +1,13 @@ +""" Stock job 'dynamic' rules for use in job_conf.xml - these may cover some +simple use cases but will just proxy into functions in rule_helper so similar +functionality - but more tailored and composable can be utilized in custom +rules. +""" + +from galaxy import util + + +def choose_one( rule_helper, job, destination_ids, hash_by="job" ): + destination_id_list = util.listify( destination_ids ) + job_hash = rule_helper.job_hash( job, hash_by ) + return rule_helper.choose_one( destination_id_list, hash_value=job_hash ) diff -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd -r 05b550588d9dcf1502f141e7f1c5c896930e64bc test/unit/jobs/test_rule_helper.py --- a/test/unit/jobs/test_rule_helper.py +++ b/test/unit/jobs/test_rule_helper.py @@ -1,3 +1,5 @@ +import uuid + from galaxy.util import bunch from galaxy import model from galaxy.model import mapping @@ -72,10 +74,87 @@ app.add( __new_job( user=user2, destination_id="cluster1", state="running" ) ) -def __rule_helper(): - app = MockApp() - rule_helper = RuleHelper( app ) - return rule_helper +def test_choose_one_unhashed(): + rule_helper = __rule_helper() + + # Random choices if hash not set. 
+ chosen_ones = set([]) + __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'])) ) + + assert chosen_ones == set(['a', 'b']) + + +def test_choose_one_hashed(): + rule_helper = __rule_helper() + + # Hashed, so all choosen ones should be the same... + chosen_ones = set([]) + __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value=1234)) ) + assert len( chosen_ones ) == 1 + + # ... also can verify hashing on strings + chosen_ones = set([]) + __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value="i am a string")) ) + + assert len( chosen_ones ) == 1 + + +def test_job_hash_unique_by_default( ): + rule_helper = __rule_helper() + job1, job2 = __two_jobs_in_a_history() + + rule_helper.job_hash( job1 ) != rule_helper.job_hash( job2 ) + + +def test_job_hash_history( ): + rule_helper = __rule_helper() + job1, job2 = __two_jobs_in_a_history() + + __assert_same_hash( rule_helper, job1, job2, hash_by="history" ) + + +def test_job_hash_workflow_invocation(): + rule_helper = __rule_helper() + job1, job2 = __two_jobs() + wi_uuid = uuid.uuid1().hex + + job1.add_parameter( "__workflow_invocation_uuid__", wi_uuid ) + job2.add_parameter( "__workflow_invocation_uuid__", wi_uuid ) + + __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation" ) + + +def test_job_hash_fallback(): + rule_helper = __rule_helper() + job1, job2 = __two_jobs_in_a_history() + + __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation,history" ) + + +def __assert_same_hash( rule_helper, job1, job2, hash_by ): + job1_hash = rule_helper.job_hash( job1, hash_by=hash_by ) + job2_hash = rule_helper.job_hash( job2, hash_by=hash_by ) + assert job1_hash == job2_hash + + +def __two_jobs_in_a_history(): + job1, job2 = __two_jobs() + job1.history_id = 4 + job2.history_id = 4 + return job1, job2 + + +def __two_jobs( ): + job1 = model.Job() + job1.id = 1 + job2 = model.Job() + job2.id = 2 + return job1, job2 + + +def __do_a_bunch( work ): + for i in range( 20 ): + work() def __new_job( **kwds ): @@ -85,6 +164,12 @@ return job +def __rule_helper(): + app = MockApp() + rule_helper = RuleHelper( app ) + return rule_helper + + class MockApp( object ): def __init__( self ): https://bitbucket.org/galaxy/galaxy-central/commits/8a56397528dc/ Changeset: 8a56397528dc User: jmchilton Date: 2014-08-18 00:43:54 Summary: Dynamic destinations - easier, high-level reasoning about bursting. Add utility to RuleHelper to simplify bursting reasoning slightly, setup a stock rule for bursting between two static job destinations - mostly for demonstration purprose but it might be useful in some settings. Affected #: 5 files diff -r 05b550588d9dcf1502f141e7f1c5c896930e64bc -r 8a56397528dcf8c99862394399db973f926690d2 job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -250,6 +250,22 @@ <param id="destination_ids">cluster1,cluster2,cluster3</param><param id="hash_by">workflow_invocation,history</param></destination> + <destination id="burst_out" runner="dynamic"> + <!-- Burst out from static destination local_cluster_8_core to + static destination shared_cluster_8_core when there are about + 50 Galaxy jobs assigned to any of the local_cluster_XXX + destinations (either running or queued). If there are fewer + than 50 jobs, just use local_cluster_8_core destination. + + Uncomment job_state parameter to make this bursting happen when + roughly 50 jobs are queued instead. 
+ --> + <param id="type">burst</param> + <param id="from_destination_ids">local_cluster_8_core,local_cluster_1_core,local_cluster_16_core</param> + <param id="to_destination_id">shared_cluster_8_core</param> + <param id="num_jobs">50</param> + <!-- <param id="job_states">queued</param> --> + </destination><destination id="secure_pulsar_rest_dest" runner="pulsar_rest"><param id="url">https://examle.com:8913/</param><!-- If set, private_token must match token in remote Pulsar's diff -r 05b550588d9dcf1502f141e7f1c5c896930e64bc -r 8a56397528dcf8c99862394399db973f926690d2 lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -30,7 +30,8 @@ STOCK_RULES = dict( - choose_one=stock_rules.choose_one + choose_one=stock_rules.choose_one, + burst=stock_rules.burst ) diff -r 05b550588d9dcf1502f141e7f1c5c896930e64bc -r 8a56397528dcf8c99862394399db973f926690d2 lib/galaxy/jobs/rule_helper.py --- a/lib/galaxy/jobs/rule_helper.py +++ b/lib/galaxy/jobs/rule_helper.py @@ -102,6 +102,27 @@ return query + def should_burst( self, destination_ids, num_jobs, job_states=None ): + """ Check if the specified destinations ``destination_ids`` have at + least ``num_jobs`` assigned to it - send in ``job_state`` as ``queued`` + to limit this check to number of jobs queued. + + See stock_rules for an simple example of using this function - but to + get the most out of it - it should probably be used with custom job + rules that can respond to the bursting by allocatin resources, + launching cloud nodes, etc.... + """ + if job_states is None: + job_states = "queued,running" + from_destination_job_count = self.job_count( + for_destinations=destination_ids, + for_job_states=util.listify( job_states ) + ) + # Would this job push us over maximum job count before requiring + # bursting (roughly... very roughly given many handler threads may be + # scheduling jobs). + return ( from_destination_job_count + 1 ) > int( num_jobs ) + def choose_one( self, lst, hash_value=None ): """ Choose a random value from supplied list. 
If hash_value is passed in then every request with that same hash_value would produce the same diff -r 05b550588d9dcf1502f141e7f1c5c896930e64bc -r 8a56397528dcf8c99862394399db973f926690d2 lib/galaxy/jobs/stock_rules.py --- a/lib/galaxy/jobs/stock_rules.py +++ b/lib/galaxy/jobs/stock_rules.py @@ -11,3 +11,11 @@ destination_id_list = util.listify( destination_ids ) job_hash = rule_helper.job_hash( job, hash_by ) return rule_helper.choose_one( destination_id_list, hash_value=job_hash ) + + +def burst( rule_helper, job, from_destination_ids, to_destination_id, num_jobs, job_states=None): + from_destination_ids = util.listify( from_destination_ids ) + if rule_helper.should_burst( from_destination_ids, num_jobs=num_jobs, job_states=job_states ): + return to_destination_id + else: + return from_destination_ids[ 0 ] diff -r 05b550588d9dcf1502f141e7f1c5c896930e64bc -r 8a56397528dcf8c99862394399db973f926690d2 test/unit/jobs/test_rule_helper.py --- a/test/unit/jobs/test_rule_helper.py +++ b/test/unit/jobs/test_rule_helper.py @@ -131,6 +131,17 @@ __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation,history" ) +def test_should_burst( ): + rule_helper = __rule_helper() + __setup_fixtures( rule_helper.app ) + # cluster1 fixture has 4 queued jobs, 3 running + assert rule_helper.should_burst( [ "cluster1" ], "7" ) + assert not rule_helper.should_burst( [ "cluster1" ], "10" ) + + assert rule_helper.should_burst( [ "cluster1" ], "2", job_states="queued" ) + assert not rule_helper.should_burst( [ "cluster1" ], "6", job_states="queued" ) + + def __assert_same_hash( rule_helper, job1, job2, hash_by ): job1_hash = rule_helper.job_hash( job1, hash_by=hash_by ) job2_hash = rule_helper.job_hash( job2, hash_by=hash_by ) https://bitbucket.org/galaxy/galaxy-central/commits/d0d386587231/ Changeset: d0d386587231 User: jmchilton Date: 2014-08-18 00:43:54 Summary: Dynamic destinations - easier, high-level reasoning about docker tool support. Added supports_docker function to RuleHelper for use in dynamic destination rules. Add a very simple stock dynamic destination type (docker_dispatcher) that will send all jobs into either a docker enabled destination or a vanilla destination depending on whether the tool being mapped supports docker (at this time that means is annotated explicitly with a container id). Affected #: 4 files diff -r 8a56397528dcf8c99862394399db973f926690d2 -r d0d386587231f6b0e6c952122999b095b0e51024 job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -266,6 +266,17 @@ <param id="num_jobs">50</param><!-- <param id="job_states">queued</param> --></destination> + <destination id="docker_dispatch" runner="dynamic"> + <!-- Follow dynamic destination type will send all tool's that + support docker to static destination defined by + docker_destination_id (docker_cluster in this example) and all + other tools to default_destination_id (normal_cluster in this + example). 
+ --> + <param id="type">docker_dispatch</param> + <param id="docker_destination_id">docker_cluster</param> + <param id="default_destination_id">normal_cluster</param> + </destination><destination id="secure_pulsar_rest_dest" runner="pulsar_rest"><param id="url">https://examle.com:8913/</param><!-- If set, private_token must match token in remote Pulsar's diff -r 8a56397528dcf8c99862394399db973f926690d2 -r d0d386587231f6b0e6c952122999b095b0e51024 lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -31,7 +31,8 @@ STOCK_RULES = dict( choose_one=stock_rules.choose_one, - burst=stock_rules.burst + burst=stock_rules.burst, + docker_dispatch=stock_rules.docker_dispatch, ) diff -r 8a56397528dcf8c99862394399db973f926690d2 -r d0d386587231f6b0e6c952122999b095b0e51024 lib/galaxy/jobs/rule_helper.py --- a/lib/galaxy/jobs/rule_helper.py +++ b/lib/galaxy/jobs/rule_helper.py @@ -27,6 +27,24 @@ def __init__( self, app ): self.app = app + def supports_docker( self, job_or_tool ): + """ Job rules can pass this function a job, job_wrapper, or tool and + determine if the underlying tool believes it can be containered. + """ + # Not a ton of logic in this method - but the idea is to shield rule + # developers from the details and they shouldn't have to know how to + # interrogate tool or job to figure out if it can be run in a + # container. + if hasattr( job_or_tool, 'containers' ): + tool = job_or_tool + elif hasattr( job_or_tool, 'tool' ): + # Have a JobWrapper-like + tool = job_or_tool.tool + else: + # Have a Job object. + tool = self.app.toolbox.get_tool( job_or_tool.tool_id ) + return any( [ c.type == "docker" for c in tool.containers ] ) + def job_count( self, **kwds diff -r 8a56397528dcf8c99862394399db973f926690d2 -r d0d386587231f6b0e6c952122999b095b0e51024 lib/galaxy/jobs/stock_rules.py --- a/lib/galaxy/jobs/stock_rules.py +++ b/lib/galaxy/jobs/stock_rules.py @@ -19,3 +19,7 @@ return to_destination_id else: return from_destination_ids[ 0 ] + + +def docker_dispatch( rule_helper, tool, docker_destination_id, default_destination_id ): + return docker_destination_id if rule_helper.supports_docker( tool ) else default_destination_id https://bitbucket.org/galaxy/galaxy-central/commits/3c9a16993672/ Changeset: 3c9a16993672 User: dannon Date: 2014-08-18 21:05:18 Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #465) Dynamic destination enhancements aimed at enabling co-locating data and (cloud) bursting. Affected #: 10 files diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -235,6 +235,48 @@ <!-- A destination that represents a method in the dynamic runner. --><param id="function">foo</param></destination> + <destination id="load_balance" runner="dynamic"> + <param id="type">choose_one</param> + <!-- Randomly assign jobs to various static destinatin ids --> + <param id="destination_ids">cluster1,cluster2,cluster3</param> + </destination> + <destination id="load_balance_with_data_locality" runner="dynamic"> + <!-- Randomly assign jobs to various static destinatin ids, + but keep jobs in the same workflow invocation together and + for those jobs ran outside of workflows keep jobs in same + history together. 
+ --> + <param id="type">choose_one</param> + <param id="destination_ids">cluster1,cluster2,cluster3</param> + <param id="hash_by">workflow_invocation,history</param> + </destination> + <destination id="burst_out" runner="dynamic"> + <!-- Burst out from static destination local_cluster_8_core to + static destination shared_cluster_8_core when there are about + 50 Galaxy jobs assigned to any of the local_cluster_XXX + destinations (either running or queued). If there are fewer + than 50 jobs, just use local_cluster_8_core destination. + + Uncomment job_state parameter to make this bursting happen when + roughly 50 jobs are queued instead. + --> + <param id="type">burst</param> + <param id="from_destination_ids">local_cluster_8_core,local_cluster_1_core,local_cluster_16_core</param> + <param id="to_destination_id">shared_cluster_8_core</param> + <param id="num_jobs">50</param> + <!-- <param id="job_states">queued</param> --> + </destination> + <destination id="docker_dispatch" runner="dynamic"> + <!-- Follow dynamic destination type will send all tool's that + support docker to static destination defined by + docker_destination_id (docker_cluster in this example) and all + other tools to default_destination_id (normal_cluster in this + example). + --> + <param id="type">docker_dispatch</param> + <param id="docker_destination_id">docker_cluster</param> + <param id="default_destination_id">normal_cluster</param> + </destination><destination id="secure_pulsar_rest_dest" runner="pulsar_rest"><param id="url">https://examle.com:8913/</param><!-- If set, private_token must match token in remote Pulsar's diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -5,6 +5,8 @@ log = logging.getLogger( __name__ ) import galaxy.jobs.rules +from galaxy.jobs import stock_rules + from .rule_helper import RuleHelper DYNAMIC_RUNNER_NAME = "dynamic" @@ -27,6 +29,13 @@ self.message = message +STOCK_RULES = dict( + choose_one=stock_rules.choose_one, + burst=stock_rules.burst, + docker_dispatch=stock_rules.docker_dispatch, +) + + class JobRunnerMapper( object ): """ This class is responsible to managing the mapping of jobs @@ -69,7 +78,7 @@ names.append( rule_module_name ) return names - def __invoke_expand_function( self, expand_function ): + def __invoke_expand_function( self, expand_function, destination_params ): function_arg_names = inspect.getargspec( expand_function ).args app = self.job_wrapper.app possible_args = { @@ -83,6 +92,11 @@ actual_args = {} + # Send through any job_conf.xml defined args to function + for destination_param in destination_params.keys(): + if destination_param in function_arg_names: + actual_args[ destination_param ] = destination_params[ destination_param ] + # Populate needed args for possible_arg_name in possible_args: if possible_arg_name in function_arg_names: @@ -90,7 +104,7 @@ # Don't hit the DB to load the job object if not needed require_db = False - for param in ["job", "user", "user_email", "resource_params"]: + for param in ["job", "user", "user_email", "resource_params", "workflow_invocation_uuid"]: if param in function_arg_names: require_db = True break @@ -122,6 +136,11 @@ pass actual_args[ "resource_params" ] = resource_params + if "workflow_invocation_uuid" in function_arg_names: + param_values = job.raw_param_dict( ) + workflow_invocation_uuid = param_values.get( "__workflow_invocation_uuid__", None ) + actual_args[ 
"workflow_invocation_uuid" ] = workflow_invocation_uuid + return expand_function( **actual_args ) def __job_params( self, job ): @@ -172,6 +191,7 @@ def __handle_dynamic_job_destination( self, destination ): expand_type = destination.params.get('type', "python") + expand_function = None if expand_type == "python": expand_function_name = self.__determine_expand_function_name( destination ) if not expand_function_name: @@ -179,12 +199,15 @@ raise Exception( message ) expand_function = self.__get_expand_function( expand_function_name ) - return self.__handle_rule( expand_function ) + elif expand_type in STOCK_RULES: + expand_function = STOCK_RULES[ expand_type ] else: raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type ) - def __handle_rule( self, rule_function ): - job_destination = self.__invoke_expand_function( rule_function ) + return self.__handle_rule( expand_function, destination ) + + def __handle_rule( self, rule_function, destination ): + job_destination = self.__invoke_expand_function( rule_function, destination.params ) if not isinstance(job_destination, galaxy.jobs.JobDestination): job_destination_rep = str(job_destination) # Should be either id or url if '://' in job_destination_rep: diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/rule_helper.py --- a/lib/galaxy/jobs/rule_helper.py +++ b/lib/galaxy/jobs/rule_helper.py @@ -1,4 +1,6 @@ from datetime import datetime +import hashlib +import random from sqlalchemy import ( and_, @@ -6,10 +8,13 @@ ) from galaxy import model +from galaxy import util import logging log = logging.getLogger( __name__ ) +VALID_JOB_HASH_STRATEGIES = ["job", "user", "history", "workflow_invocation"] + class RuleHelper( object ): """ Utillity to allow job rules to interface cleanly with the rest of @@ -22,6 +27,24 @@ def __init__( self, app ): self.app = app + def supports_docker( self, job_or_tool ): + """ Job rules can pass this function a job, job_wrapper, or tool and + determine if the underlying tool believes it can be containered. + """ + # Not a ton of logic in this method - but the idea is to shield rule + # developers from the details and they shouldn't have to know how to + # interrogate tool or job to figure out if it can be run in a + # container. + if hasattr( job_or_tool, 'containers' ): + tool = job_or_tool + elif hasattr( job_or_tool, 'tool' ): + # Have a JobWrapper-like + tool = job_or_tool.tool + else: + # Have a Job object. 
+ tool = self.app.toolbox.get_tool( job_or_tool.tool_id ) + return any( [ c.type == "docker" for c in tool.containers ] ) + def job_count( self, **kwds @@ -58,16 +81,23 @@ query, for_user_email=None, for_destination=None, + for_destinations=None, for_job_states=None, created_in_last=None, updated_in_last=None, ): + if for_destination is not None: + for_destinations = [ for_destination ] + query = query.join( model.User ) if for_user_email is not None: query = query.filter( model.User.table.c.email == for_user_email ) - if for_destination is not None: - query = query.filter( model.Job.table.c.destination_id == for_destination ) + if for_destinations is not None: + if len( for_destinations ) == 1: + query = query.filter( model.Job.table.c.destination_id == for_destinations[ 0 ] ) + else: + query = query.filter( model.Job.table.c.destination_id.in_( for_destinations ) ) if created_in_last is not None: end_date = datetime.now() @@ -89,3 +119,81 @@ query = query.filter( model.Job.table.c.state.in_( for_job_states ) ) return query + + def should_burst( self, destination_ids, num_jobs, job_states=None ): + """ Check if the specified destinations ``destination_ids`` have at + least ``num_jobs`` assigned to it - send in ``job_state`` as ``queued`` + to limit this check to number of jobs queued. + + See stock_rules for an simple example of using this function - but to + get the most out of it - it should probably be used with custom job + rules that can respond to the bursting by allocatin resources, + launching cloud nodes, etc.... + """ + if job_states is None: + job_states = "queued,running" + from_destination_job_count = self.job_count( + for_destinations=destination_ids, + for_job_states=util.listify( job_states ) + ) + # Would this job push us over maximum job count before requiring + # bursting (roughly... very roughly given many handler threads may be + # scheduling jobs). + return ( from_destination_job_count + 1 ) > int( num_jobs ) + + def choose_one( self, lst, hash_value=None ): + """ Choose a random value from supplied list. If hash_value is passed + in then every request with that same hash_value would produce the same + choice from the supplied list. + """ + if hash_value is None: + return random.choice( lst ) + + if not isinstance( hash_value, int ): + # Convert hash_value string into index + as_hex = hashlib.md5( hash_value ).hexdigest() + hash_value = int(as_hex, 16) + # else assumed to be 'random' int from 0-~Inf + random_index = hash_value % len( lst ) + return lst[ random_index ] + + def job_hash( self, job, hash_by=None ): + """ Produce a reproducible hash for the given job on various + criteria - for instance if hash_by is "workflow_invocation,history" - + all jobs within the same workflow invocation will recieve the same + hash - for jobs outside of workflows all jobs within the same history + will recieve the same hash, other jobs will be hashed on job's id + randomly. + + Primarily intended for use with ``choose_one`` above - to consistent + route or schedule related jobs. + """ + if hash_by is None: + hash_by = [ "job" ] + hash_bys = util.listify( hash_by ) + for hash_by in hash_bys: + job_hash = self._try_hash_for_job( job, hash_by ) + if job_hash: + return job_hash + + # Fallback to just hashing by job id, should always return a value. + return self._try_hash_for_job( job, "job" ) + + def _try_hash_for_job( self, job, hash_by ): + """ May return False or None if hash type is invalid for that job - + e.g. 
attempting to hash by user for anonymous job or by workflow + invocation for jobs outside of workflows. + """ + if hash_by not in VALID_JOB_HASH_STRATEGIES: + message = "Do not know how to hash jobs by %s, must be one of %s" % ( hash_by, VALID_JOB_HASH_STRATEGIES ) + raise Exception( message ) + + if hash_by == "workflow_invocation": + return job.raw_param_dict().get( "__workflow_invocation_uuid__", None ) + elif hash_by == "history": + return job.history_id + elif hash_by == "user": + user = job.user + return user and user.id + elif hash_by == "job": + return job.id diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/stock_rules.py --- /dev/null +++ b/lib/galaxy/jobs/stock_rules.py @@ -0,0 +1,25 @@ +""" Stock job 'dynamic' rules for use in job_conf.xml - these may cover some +simple use cases but will just proxy into functions in rule_helper so similar +functionality - but more tailored and composable can be utilized in custom +rules. +""" + +from galaxy import util + + +def choose_one( rule_helper, job, destination_ids, hash_by="job" ): + destination_id_list = util.listify( destination_ids ) + job_hash = rule_helper.job_hash( job, hash_by ) + return rule_helper.choose_one( destination_id_list, hash_value=job_hash ) + + +def burst( rule_helper, job, from_destination_ids, to_destination_id, num_jobs, job_states=None): + from_destination_ids = util.listify( from_destination_ids ) + if rule_helper.should_burst( from_destination_ids, num_jobs=num_jobs, job_states=job_states ): + return to_destination_id + else: + return from_destination_ids[ 0 ] + + +def docker_dispatch( rule_helper, tool, docker_destination_id, default_destination_id ): + return docker_destination_id if rule_helper.supports_docker( tool ) else default_destination_id diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -482,10 +482,13 @@ Read encoded parameter values from the database and turn back into a dict of tool parameter values. """ - param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] ) + param_dict = self.raw_param_dict() tool = app.toolbox.get_tool( self.tool_id ) param_dict = tool.params_from_strings( param_dict, app, ignore_errors=ignore_errors ) return param_dict + def raw_param_dict( self ): + param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] ) + return param_dict def check_if_output_datasets_deleted( self ): """ Return true if all of the output datasets associated with this job are diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/tools/execute.py --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -10,13 +10,19 @@ log = logging.getLogger( __name__ ) -def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None ): +def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None, workflow_invocation_uuid=None ): """ Execute a tool and return object containing summary (output data, number of failures, etc...). 
""" execution_tracker = ToolExecutionTracker( tool, param_combinations, collection_info ) for params in execution_tracker.param_combinations: + if workflow_invocation_uuid: + params[ '__workflow_invocation_uuid__' ] = workflow_invocation_uuid + elif '__workflow_invocation_uuid__' in params: + # Only workflow invocation code gets to set this, ignore user supplied + # values or rerun parameters. + del params[ '__workflow_invocation_uuid__' ] job, result = tool.handle_single_execution( trans, rerun_remap_job_id, params, history ) if job: execution_tracker.record_success( job, result ) diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -1,3 +1,5 @@ +import uuid + from galaxy import model from galaxy import exceptions from galaxy import util @@ -41,6 +43,8 @@ self.param_map = workflow_run_config.param_map self.outputs = odict() + # TODO: Attach to actual model object and persist someday... + self.invocation_uuid = uuid.uuid1().hex def invoke( self ): workflow_invocation = model.WorkflowInvocation() @@ -132,6 +136,7 @@ param_combinations=param_combinations, history=self.target_history, collection_info=collection_info, + workflow_invocation_uuid=self.invocation_uuid ) if collection_info: outputs[ step.id ] = dict( execution_tracker.created_collections ) diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_mapper.py --- a/test/unit/jobs/test_mapper.py +++ b/test/unit/jobs/test_mapper.py @@ -1,3 +1,5 @@ +import uuid + import jobs.test_rules from galaxy.jobs.mapper import ( @@ -9,7 +11,7 @@ from galaxy.util import bunch - +WORKFLOW_UUID = uuid.uuid1().hex TOOL_JOB_DESTINATION = JobDestination() DYNAMICALLY_GENERATED_DESTINATION = JobDestination() @@ -46,6 +48,12 @@ assert mapper.job_config.rule_response == "tool1_dest_id" +def test_dynamic_mapping_job_conf_params(): + mapper = __mapper( __dynamic_destination( dict( function="check_job_conf_params", param1="7" ) ) ) + assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION + assert mapper.job_config.rule_response == "sent_7_dest_id" + + def test_dynamic_mapping_function_parameters(): mapper = __mapper( __dynamic_destination( dict( function="check_rule_params" ) ) ) assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION @@ -58,6 +66,12 @@ assert mapper.job_config.rule_response == "have_resource_params" +def test_dynamic_mapping_workflow_invocation_parameter(): + mapper = __mapper( __dynamic_destination( dict( function="check_workflow_invocation_uuid" ) ) ) + assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION + assert mapper.job_config.rule_response == WORKFLOW_UUID + + def test_dynamic_mapping_no_function(): dest = __dynamic_destination( dict( ) ) mapper = __mapper( dest ) @@ -124,21 +138,26 @@ return True def get_job(self): + raw_params = { + "threshold": 8, + "__workflow_invocation_uuid__": WORKFLOW_UUID, + } + def get_param_values( app, ignore_errors ): assert app == self.app - return { - "threshold": 8, - "__job_resource": { - "__job_resource__select": "True", - "memory": "8gb" - } + params = raw_params.copy() + params[ "__job_resource" ] = { + "__job_resource__select": "True", + "memory": "8gb" } + return params return bunch.Bunch( user=bunch.Bunch( id=6789, email="test@example.com" ), + raw_param_dict=lambda: raw_params, get_param_values=get_param_values ) diff -r 
0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_rule_helper.py --- a/test/unit/jobs/test_rule_helper.py +++ b/test/unit/jobs/test_rule_helper.py @@ -1,3 +1,5 @@ +import uuid + from galaxy.util import bunch from galaxy import model from galaxy.model import mapping @@ -24,9 +26,12 @@ __assert_job_count_is( 2, rule_helper, for_destination="local" ) __assert_job_count_is( 7, rule_helper, for_destination="cluster1" ) + __assert_job_count_is( 9, rule_helper, for_destinations=["cluster1", "local"] ) + # Test per user destination counts __assert_job_count_is( 5, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1 ) __assert_job_count_is( 2, rule_helper, for_destination="local", for_user_email=USER_EMAIL_1 ) + __assert_job_count_is( 7, rule_helper, for_destinations=["cluster1", "local"], for_user_email=USER_EMAIL_1 ) __assert_job_count_is( 2, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_2 ) __assert_job_count_is( 0, rule_helper, for_destination="local", for_user_email=USER_EMAIL_2 ) @@ -69,10 +74,98 @@ app.add( __new_job( user=user2, destination_id="cluster1", state="running" ) ) -def __rule_helper(): - app = MockApp() - rule_helper = RuleHelper( app ) - return rule_helper +def test_choose_one_unhashed(): + rule_helper = __rule_helper() + + # Random choices if hash not set. + chosen_ones = set([]) + __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'])) ) + + assert chosen_ones == set(['a', 'b']) + + +def test_choose_one_hashed(): + rule_helper = __rule_helper() + + # Hashed, so all choosen ones should be the same... + chosen_ones = set([]) + __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value=1234)) ) + assert len( chosen_ones ) == 1 + + # ... 
also can verify hashing on strings + chosen_ones = set([]) + __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value="i am a string")) ) + + assert len( chosen_ones ) == 1 + + +def test_job_hash_unique_by_default( ): + rule_helper = __rule_helper() + job1, job2 = __two_jobs_in_a_history() + + rule_helper.job_hash( job1 ) != rule_helper.job_hash( job2 ) + + +def test_job_hash_history( ): + rule_helper = __rule_helper() + job1, job2 = __two_jobs_in_a_history() + + __assert_same_hash( rule_helper, job1, job2, hash_by="history" ) + + +def test_job_hash_workflow_invocation(): + rule_helper = __rule_helper() + job1, job2 = __two_jobs() + wi_uuid = uuid.uuid1().hex + + job1.add_parameter( "__workflow_invocation_uuid__", wi_uuid ) + job2.add_parameter( "__workflow_invocation_uuid__", wi_uuid ) + + __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation" ) + + +def test_job_hash_fallback(): + rule_helper = __rule_helper() + job1, job2 = __two_jobs_in_a_history() + + __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation,history" ) + + +def test_should_burst( ): + rule_helper = __rule_helper() + __setup_fixtures( rule_helper.app ) + # cluster1 fixture has 4 queued jobs, 3 running + assert rule_helper.should_burst( [ "cluster1" ], "7" ) + assert not rule_helper.should_burst( [ "cluster1" ], "10" ) + + assert rule_helper.should_burst( [ "cluster1" ], "2", job_states="queued" ) + assert not rule_helper.should_burst( [ "cluster1" ], "6", job_states="queued" ) + + +def __assert_same_hash( rule_helper, job1, job2, hash_by ): + job1_hash = rule_helper.job_hash( job1, hash_by=hash_by ) + job2_hash = rule_helper.job_hash( job2, hash_by=hash_by ) + assert job1_hash == job2_hash + + +def __two_jobs_in_a_history(): + job1, job2 = __two_jobs() + job1.history_id = 4 + job2.history_id = 4 + return job1, job2 + + +def __two_jobs( ): + job1 = model.Job() + job1.id = 1 + job2 = model.Job() + job2.id = 2 + return job1, job2 + + +def __do_a_bunch( work ): + for i in range( 20 ): + work() def __new_job( **kwds ): @@ -82,6 +175,12 @@ return job +def __rule_helper(): + app = MockApp() + rule_helper = RuleHelper( app ) + return rule_helper + + class MockApp( object ): def __init__( self ): diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_rules/10_site.py --- a/test/unit/jobs/test_rules/10_site.py +++ b/test/unit/jobs/test_rules/10_site.py @@ -40,6 +40,15 @@ return "all_passed" +def check_job_conf_params( param1 ): + assert param1 == "7" + return "sent_7_dest_id" + + def check_resource_params( resource_params ): assert resource_params["memory"] == "8gb" return "have_resource_params" + + +def check_workflow_invocation_uuid( workflow_invocation_uuid ): + return workflow_invocation_uuid Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
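As a closing illustration of the merged changes above: the same helpers that back the choose_one, burst and docker_dispatch stock types can be combined in a single custom rule. A hedged sketch - the destination ids ("docker_cluster", "shared_cluster", "local_cluster") and the 50-job threshold are invented, and rule_helper and tool are assumed to be injected as they are for the stock rules:

    # Sketch only: destination ids and threshold are hypothetical.
    def route_job( rule_helper, tool ):
        # Tools annotated with a docker container go to an assumed docker-enabled destination.
        if rule_helper.supports_docker( tool ):
            return "docker_cluster"
        # Otherwise burst to an assumed shared cluster once roughly 50 jobs
        # are queued or running on the local one.
        if rule_helper.should_burst( [ "local_cluster" ], num_jobs=50 ):
            return "shared_cluster"
        return "local_cluster"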