1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/3c9a16993672/
Changeset: 3c9a16993672
User:      dannon
Date:      2014-08-18 21:05:18
Summary:   Merged in jmchilton/galaxy-central-fork-1 (pull request #465)

Dynamic destination enhancements aimed at enabling co-locating data and (cloud) bursting.

Affected #: 10 files

diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -235,6 +235,48 @@
             <!-- A destination that represents a method in the dynamic runner. -->
             <param id="function">foo</param>
         </destination>
+        <destination id="load_balance" runner="dynamic">
+            <param id="type">choose_one</param>
+            <!-- Randomly assign jobs to various static destination ids -->
+            <param id="destination_ids">cluster1,cluster2,cluster3</param>
+        </destination>
+        <destination id="load_balance_with_data_locality" runner="dynamic">
+            <!-- Randomly assign jobs to various static destination ids,
+                 but keep jobs in the same workflow invocation together and,
+                 for those jobs run outside of workflows, keep jobs in the
+                 same history together.
+            -->
+            <param id="type">choose_one</param>
+            <param id="destination_ids">cluster1,cluster2,cluster3</param>
+            <param id="hash_by">workflow_invocation,history</param>
+        </destination>
+        <destination id="burst_out" runner="dynamic">
+            <!-- Burst out from static destination local_cluster_8_core to
+                 static destination shared_cluster_8_core when there are about
+                 50 Galaxy jobs assigned to any of the local_cluster_XXX
+                 destinations (either running or queued). If there are fewer
+                 than 50 jobs, just use the local_cluster_8_core destination.
+
+                 Uncomment the job_states parameter to make this bursting
+                 happen when roughly 50 jobs are queued instead.
+            -->
+            <param id="type">burst</param>
+            <param id="from_destination_ids">local_cluster_8_core,local_cluster_1_core,local_cluster_16_core</param>
+            <param id="to_destination_id">shared_cluster_8_core</param>
+            <param id="num_jobs">50</param>
+            <!-- <param id="job_states">queued</param> -->
+        </destination>
+        <destination id="docker_dispatch" runner="dynamic">
+            <!-- The following dynamic destination type will send all tools
+                 that support Docker to the static destination defined by
+                 docker_destination_id (docker_cluster in this example) and
+                 all other tools to default_destination_id (normal_cluster in
+                 this example).
+            -->
+            <param id="type">docker_dispatch</param>
+            <param id="docker_destination_id">docker_cluster</param>
+            <param id="default_destination_id">normal_cluster</param>
+        </destination>
         <destination id="secure_pulsar_rest_dest" runner="pulsar_rest">
             <param id="url">https://example.com:8913/</param>
             <!-- If set, private_token must match token in remote Pulsar's

diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -5,6 +5,8 @@
 log = logging.getLogger( __name__ )

 import galaxy.jobs.rules
+from galaxy.jobs import stock_rules
+
 from .rule_helper import RuleHelper

 DYNAMIC_RUNNER_NAME = "dynamic"
@@ -27,6 +29,13 @@
         self.message = message


+STOCK_RULES = dict(
+    choose_one=stock_rules.choose_one,
+    burst=stock_rules.burst,
+    docker_dispatch=stock_rules.docker_dispatch,
+)
+
+
 class JobRunnerMapper( object ):
     """
     This class is responsible for managing the mapping of jobs
@@ -69,7 +78,7 @@
             names.append( rule_module_name )
         return names

-    def __invoke_expand_function( self, expand_function ):
+    def __invoke_expand_function( self, expand_function, destination_params ):
         function_arg_names = inspect.getargspec( expand_function ).args
         app = self.job_wrapper.app
         possible_args = {
@@ -83,6 +92,11 @@

         actual_args = {}

+        # Send through any job_conf.xml defined args to function
+        for destination_param in destination_params.keys():
+            if destination_param in function_arg_names:
+                actual_args[ destination_param ] = destination_params[ destination_param ]
+
         # Populate needed args
         for possible_arg_name in possible_args:
             if possible_arg_name in function_arg_names:
@@ -90,7 +104,7 @@
         # Don't hit the DB to load the job object if not needed
         require_db = False
-        for param in ["job", "user", "user_email", "resource_params"]:
+        for param in ["job", "user", "user_email", "resource_params", "workflow_invocation_uuid"]:
             if param in function_arg_names:
                 require_db = True
                 break
@@ -122,6 +136,11 @@
                 pass
             actual_args[ "resource_params" ] = resource_params

+        if "workflow_invocation_uuid" in function_arg_names:
+            param_values = job.raw_param_dict( )
+            workflow_invocation_uuid = param_values.get( "__workflow_invocation_uuid__", None )
+            actual_args[ "workflow_invocation_uuid" ] = workflow_invocation_uuid
+
         return expand_function( **actual_args )

     def __job_params( self, job ):
@@ -172,6 +191,7 @@

     def __handle_dynamic_job_destination( self, destination ):
         expand_type = destination.params.get('type', "python")
+        expand_function = None
         if expand_type == "python":
             expand_function_name = self.__determine_expand_function_name( destination )
             if not expand_function_name:
@@ -179,12 +199,15 @@
                 raise Exception( message )

             expand_function = self.__get_expand_function( expand_function_name )
-            return self.__handle_rule( expand_function )
+        elif expand_type in STOCK_RULES:
+            expand_function = STOCK_RULES[ expand_type ]
         else:
             raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type )

-    def __handle_rule( self, rule_function ):
-        job_destination = self.__invoke_expand_function( rule_function )
+        return self.__handle_rule( expand_function, destination )
+
+    def __handle_rule( self, rule_function, destination ):
+        job_destination = self.__invoke_expand_function( rule_function, destination.params )
         if not isinstance(job_destination, galaxy.jobs.JobDestination):
             job_destination_rep = str(job_destination)  # Should be either id or url
             if '://' in job_destination_rep:
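For illustration, the two new argument sources wired up above (job_conf.xml <param> values and workflow_invocation_uuid) let site-specific rules avoid hard-coded configuration. Below is a minimal sketch of such a custom "python"-type rule, not code from this changeset: the function name and the fallback_destination_id parameter are hypothetical, and rule_helper is assumed to be among the arguments the mapper can inject (the stock rules above rely on the same injection).

    # Hypothetical rule module content (e.g. dropped alongside other dynamic rules).
    # Arguments whose names match <param id="..."> elements on the dynamic
    # destination are filled from job_conf.xml; workflow_invocation_uuid is read
    # from the job's stored parameters and is None outside of workflows.
    def route_workflow_jobs( rule_helper, fallback_destination_id, workflow_invocation_uuid ):
        if workflow_invocation_uuid is None:
            # Job was not launched by a workflow invocation.
            return fallback_destination_id
        # Keep all jobs from one invocation on the same cluster.
        return rule_helper.choose_one(
            [ "cluster1", "cluster2", "cluster3" ],
            hash_value=workflow_invocation_uuid,
        )
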
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/rule_helper.py
--- a/lib/galaxy/jobs/rule_helper.py
+++ b/lib/galaxy/jobs/rule_helper.py
@@ -1,4 +1,6 @@
 from datetime import datetime
+import hashlib
+import random

 from sqlalchemy import (
     and_,
@@ -6,10 +8,13 @@
 )

 from galaxy import model
+from galaxy import util

 import logging
 log = logging.getLogger( __name__ )

+VALID_JOB_HASH_STRATEGIES = ["job", "user", "history", "workflow_invocation"]
+

 class RuleHelper( object ):
     """ Utility to allow job rules to interface cleanly with the rest of
@@ -22,6 +27,24 @@

     def __init__( self, app ):
         self.app = app

+    def supports_docker( self, job_or_tool ):
+        """ Job rules can pass this function a job, job_wrapper, or tool and
+        determine if the underlying tool believes it can be containerized.
+        """
+        # Not a ton of logic in this method - but the idea is to shield rule
+        # developers from the details; they shouldn't have to know how to
+        # interrogate a tool or job to figure out if it can be run in a
+        # container.
+        if hasattr( job_or_tool, 'containers' ):
+            tool = job_or_tool
+        elif hasattr( job_or_tool, 'tool' ):
+            # Have a JobWrapper-like
+            tool = job_or_tool.tool
+        else:
+            # Have a Job object.
+            tool = self.app.toolbox.get_tool( job_or_tool.tool_id )
+        return any( [ c.type == "docker" for c in tool.containers ] )
+
     def job_count(
         self,
         **kwds
@@ -58,16 +81,23 @@
         query,
         for_user_email=None,
         for_destination=None,
+        for_destinations=None,
         for_job_states=None,
         created_in_last=None,
         updated_in_last=None,
     ):
+        if for_destination is not None:
+            for_destinations = [ for_destination ]
+
         query = query.join( model.User )
         if for_user_email is not None:
             query = query.filter( model.User.table.c.email == for_user_email )
-        if for_destination is not None:
-            query = query.filter( model.Job.table.c.destination_id == for_destination )
+        if for_destinations is not None:
+            if len( for_destinations ) == 1:
+                query = query.filter( model.Job.table.c.destination_id == for_destinations[ 0 ] )
+            else:
+                query = query.filter( model.Job.table.c.destination_id.in_( for_destinations ) )

         if created_in_last is not None:
             end_date = datetime.now()
@@ -89,3 +119,81 @@
             query = query.filter( model.Job.table.c.state.in_( for_job_states ) )

         return query
+
+    def should_burst( self, destination_ids, num_jobs, job_states=None ):
+        """ Check if the specified destinations ``destination_ids`` have at
+        least ``num_jobs`` assigned to them - pass ``job_states`` as ``queued``
+        to limit this check to the number of jobs queued.
+
+        See stock_rules for a simple example of using this function - but to
+        get the most out of it, it should probably be used with custom job
+        rules that can respond to the bursting by allocating resources,
+        launching cloud nodes, etc....
+        """
+        if job_states is None:
+            job_states = "queued,running"
+        from_destination_job_count = self.job_count(
+            for_destinations=destination_ids,
+            for_job_states=util.listify( job_states )
+        )
+        # Would this job push us over maximum job count before requiring
+        # bursting (roughly... very roughly given many handler threads may be
+        # scheduling jobs).
+        return ( from_destination_job_count + 1 ) > int( num_jobs )
+
+    def choose_one( self, lst, hash_value=None ):
+        """ Choose a random value from the supplied list. If hash_value is
+        passed in then every request with that same hash_value would produce
+        the same choice from the supplied list.
+        """
+        if hash_value is None:
+            return random.choice( lst )
+
+        if not isinstance( hash_value, int ):
+            # Convert hash_value string into index
+            as_hex = hashlib.md5( hash_value ).hexdigest()
+            hash_value = int(as_hex, 16)
+        # else assumed to be 'random' int from 0-~Inf
+        random_index = hash_value % len( lst )
+        return lst[ random_index ]
+
+    def job_hash( self, job, hash_by=None ):
+        """ Produce a reproducible hash for the given job on various
+        criteria - for instance if hash_by is "workflow_invocation,history" -
+        all jobs within the same workflow invocation will receive the same
+        hash - for jobs outside of workflows all jobs within the same history
+        will receive the same hash; other jobs will be hashed on the job's id
+        randomly.
+
+        Primarily intended for use with ``choose_one`` above - to consistently
+        route or schedule related jobs.
+        """
+        if hash_by is None:
+            hash_by = [ "job" ]
+        hash_bys = util.listify( hash_by )
+        for hash_by in hash_bys:
+            job_hash = self._try_hash_for_job( job, hash_by )
+            if job_hash:
+                return job_hash
+
+        # Fallback to just hashing by job id, should always return a value.
+        return self._try_hash_for_job( job, "job" )
+
+    def _try_hash_for_job( self, job, hash_by ):
+        """ May return False or None if hash type is invalid for that job -
+        e.g. attempting to hash by user for an anonymous job or by workflow
+        invocation for jobs outside of workflows.
+        """
+        if hash_by not in VALID_JOB_HASH_STRATEGIES:
+            message = "Do not know how to hash jobs by %s, must be one of %s" % ( hash_by, VALID_JOB_HASH_STRATEGIES )
+            raise Exception( message )
+
+        if hash_by == "workflow_invocation":
+            return job.raw_param_dict().get( "__workflow_invocation_uuid__", None )
+        elif hash_by == "history":
+            return job.history_id
+        elif hash_by == "user":
+            user = job.user
+            return user and user.id
+        elif hash_by == "job":
+            return job.id
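As the should_burst docstring suggests, the helper is most useful inside custom rules that can react when the threshold is crossed - for example by asking a provisioner for additional cloud nodes before routing work to a shared destination. A rough sketch under those assumptions follows; the destination ids and the reaction are placeholders, not part of this changeset.

    import logging

    log = logging.getLogger( __name__ )


    # Hypothetical custom rule: route to a shared/cloud destination once the
    # local destinations hold roughly 50 queued jobs.
    def burst_when_queued( rule_helper, job ):
        local_destinations = [ "local_cluster_8_core", "local_cluster_16_core" ]
        if rule_helper.should_burst( local_destinations, num_jobs=50, job_states="queued" ):
            # Site-specific reaction would go here - e.g. request additional
            # cloud nodes from an external provisioner.
            log.info( "Bursting job %s to the shared cluster", job.id )
            return "shared_cluster_8_core"
        return local_destinations[ 0 ]
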
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/stock_rules.py
--- /dev/null
+++ b/lib/galaxy/jobs/stock_rules.py
@@ -0,0 +1,25 @@
+""" Stock job 'dynamic' rules for use in job_conf.xml - these may cover some
+simple use cases but will just proxy into functions in rule_helper so similar
+functionality - but more tailored and composable - can be utilized in custom
+rules.
+"""
+
+from galaxy import util
+
+
+def choose_one( rule_helper, job, destination_ids, hash_by="job" ):
+    destination_id_list = util.listify( destination_ids )
+    job_hash = rule_helper.job_hash( job, hash_by )
+    return rule_helper.choose_one( destination_id_list, hash_value=job_hash )
+
+
+def burst( rule_helper, job, from_destination_ids, to_destination_id, num_jobs, job_states=None):
+    from_destination_ids = util.listify( from_destination_ids )
+    if rule_helper.should_burst( from_destination_ids, num_jobs=num_jobs, job_states=job_states ):
+        return to_destination_id
+    else:
+        return from_destination_ids[ 0 ]
+
+
+def docker_dispatch( rule_helper, tool, docker_destination_id, default_destination_id ):
+    return docker_destination_id if rule_helper.supports_docker( tool ) else default_destination_id

diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -482,10 +482,13 @@
         Read encoded parameter values from the database and turn back into a
        dict of tool parameter values.
        """
-        param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] )
+        param_dict = self.raw_param_dict()
         tool = app.toolbox.get_tool( self.tool_id )
         param_dict = tool.params_from_strings( param_dict, app, ignore_errors=ignore_errors )
         return param_dict

+    def raw_param_dict( self ):
+        param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] )
+        return param_dict
+
     def check_if_output_datasets_deleted( self ):
         """
         Return true if all of the output datasets associated with this job are

diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/tools/execute.py
--- a/lib/galaxy/tools/execute.py
+++ b/lib/galaxy/tools/execute.py
@@ -10,13 +10,19 @@
 log = logging.getLogger( __name__ )


-def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None ):
+def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None, workflow_invocation_uuid=None ):
     """
     Execute a tool and return object containing summary (output data, number of
     failures, etc...).
     """
     execution_tracker = ToolExecutionTracker( tool, param_combinations, collection_info )
     for params in execution_tracker.param_combinations:
+        if workflow_invocation_uuid:
+            params[ '__workflow_invocation_uuid__' ] = workflow_invocation_uuid
+        elif '__workflow_invocation_uuid__' in params:
+            # Only workflow invocation code gets to set this, ignore user supplied
+            # values or rerun parameters.
+            del params[ '__workflow_invocation_uuid__' ]
         job, result = tool.handle_single_execution( trans, rerun_remap_job_id, params, history )
         if job:
             execution_tracker.record_success( job, result )

diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/workflow/run.py
--- a/lib/galaxy/workflow/run.py
+++ b/lib/galaxy/workflow/run.py
@@ -1,3 +1,5 @@
+import uuid
+
 from galaxy import model
 from galaxy import exceptions
 from galaxy import util
@@ -41,6 +43,8 @@
         self.param_map = workflow_run_config.param_map

         self.outputs = odict()
+        # TODO: Attach to actual model object and persist someday...
+        self.invocation_uuid = uuid.uuid1().hex

     def invoke( self ):
         workflow_invocation = model.WorkflowInvocation()
@@ -132,6 +136,7 @@
             param_combinations=param_combinations,
             history=self.target_history,
             collection_info=collection_info,
+            workflow_invocation_uuid=self.invocation_uuid
         )
         if collection_info:
             outputs[ step.id ] = dict( execution_tracker.created_collections )
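The UUID attached here ends up stored as an ordinary job parameter, which is why the mapper and RuleHelper.job_hash can recover it later through the new Job.raw_param_dict() accessor - essentially:

    # Essentially what JobRunnerMapper and RuleHelper._try_hash_for_job do to
    # recover the invocation UUID from a persisted job.
    def invocation_uuid_for( job ):
        return job.raw_param_dict().get( "__workflow_invocation_uuid__", None )
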
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_mapper.py
--- a/test/unit/jobs/test_mapper.py
+++ b/test/unit/jobs/test_mapper.py
@@ -1,3 +1,5 @@
+import uuid
+
 import jobs.test_rules

 from galaxy.jobs.mapper import (
@@ -9,7 +11,7 @@

 from galaxy.util import bunch

-
+WORKFLOW_UUID = uuid.uuid1().hex
 TOOL_JOB_DESTINATION = JobDestination()
 DYNAMICALLY_GENERATED_DESTINATION = JobDestination()
@@ -46,6 +48,12 @@
     assert mapper.job_config.rule_response == "tool1_dest_id"


+def test_dynamic_mapping_job_conf_params():
+    mapper = __mapper( __dynamic_destination( dict( function="check_job_conf_params", param1="7" ) ) )
+    assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+    assert mapper.job_config.rule_response == "sent_7_dest_id"
+
+
 def test_dynamic_mapping_function_parameters():
     mapper = __mapper( __dynamic_destination( dict( function="check_rule_params" ) ) )
     assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
@@ -58,6 +66,12 @@
     assert mapper.job_config.rule_response == "have_resource_params"


+def test_dynamic_mapping_workflow_invocation_parameter():
+    mapper = __mapper( __dynamic_destination( dict( function="check_workflow_invocation_uuid" ) ) )
+    assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+    assert mapper.job_config.rule_response == WORKFLOW_UUID
+
+
 def test_dynamic_mapping_no_function():
     dest = __dynamic_destination( dict( ) )
     mapper = __mapper( dest )
@@ -124,21 +138,26 @@
         return True

     def get_job(self):
+        raw_params = {
+            "threshold": 8,
+            "__workflow_invocation_uuid__": WORKFLOW_UUID,
+        }
+
         def get_param_values( app, ignore_errors ):
             assert app == self.app
-            return {
-                "threshold": 8,
-                "__job_resource": {
-                    "__job_resource__select": "True",
-                    "memory": "8gb"
-                }
+            params = raw_params.copy()
+            params[ "__job_resource" ] = {
+                "__job_resource__select": "True",
+                "memory": "8gb"
             }
+            return params

         return bunch.Bunch(
             user=bunch.Bunch(
                 id=6789,
                 email="test@example.com"
             ),
+            raw_param_dict=lambda: raw_params,
             get_param_values=get_param_values
         )

diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_rule_helper.py
--- a/test/unit/jobs/test_rule_helper.py
+++ b/test/unit/jobs/test_rule_helper.py
@@ -1,3 +1,5 @@
+import uuid
+
 from galaxy.util import bunch
 from galaxy import model
 from galaxy.model import mapping
@@ -24,9 +26,12 @@
     __assert_job_count_is( 2, rule_helper, for_destination="local" )
     __assert_job_count_is( 7, rule_helper, for_destination="cluster1" )

+    __assert_job_count_is( 9, rule_helper, for_destinations=["cluster1", "local"] )
+
     # Test per user destination counts
     __assert_job_count_is( 5, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1 )
     __assert_job_count_is( 2, rule_helper, for_destination="local", for_user_email=USER_EMAIL_1 )
+    __assert_job_count_is( 7, rule_helper, for_destinations=["cluster1", "local"], for_user_email=USER_EMAIL_1 )

     __assert_job_count_is( 2, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_2 )
     __assert_job_count_is( 0, rule_helper, for_destination="local", for_user_email=USER_EMAIL_2 )
@@ -69,10 +74,98 @@
         app.add( __new_job( user=user2, destination_id="cluster1", state="running" ) )


-def __rule_helper():
-    app = MockApp()
-    rule_helper = RuleHelper( app )
-    return rule_helper
+def test_choose_one_unhashed():
+    rule_helper = __rule_helper()
+
+    # Random choices if hash not set.
+    chosen_ones = set([])
+    __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'])) )
+
+    assert chosen_ones == set(['a', 'b'])
+
+
+def test_choose_one_hashed():
+    rule_helper = __rule_helper()
+
+    # Hashed, so all chosen ones should be the same...
+    chosen_ones = set([])
+    __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value=1234)) )
+    assert len( chosen_ones ) == 1
+
+    # ... also can verify hashing on strings
+    chosen_ones = set([])
+    __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value="i am a string")) )
+
+    assert len( chosen_ones ) == 1
+
+
+def test_job_hash_unique_by_default( ):
+    rule_helper = __rule_helper()
+    job1, job2 = __two_jobs_in_a_history()
+
+    assert rule_helper.job_hash( job1 ) != rule_helper.job_hash( job2 )
+
+
+def test_job_hash_history( ):
+    rule_helper = __rule_helper()
+    job1, job2 = __two_jobs_in_a_history()
+
+    __assert_same_hash( rule_helper, job1, job2, hash_by="history" )
+
+
+def test_job_hash_workflow_invocation():
+    rule_helper = __rule_helper()
+    job1, job2 = __two_jobs()
+    wi_uuid = uuid.uuid1().hex
+
+    job1.add_parameter( "__workflow_invocation_uuid__", wi_uuid )
+    job2.add_parameter( "__workflow_invocation_uuid__", wi_uuid )
+
+    __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation" )
+
+
+def test_job_hash_fallback():
+    rule_helper = __rule_helper()
+    job1, job2 = __two_jobs_in_a_history()
+
+    __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation,history" )
+
+
+def test_should_burst( ):
+    rule_helper = __rule_helper()
+    __setup_fixtures( rule_helper.app )
+    # cluster1 fixture has 4 queued jobs, 3 running
+    assert rule_helper.should_burst( [ "cluster1" ], "7" )
+    assert not rule_helper.should_burst( [ "cluster1" ], "10" )
+
+    assert rule_helper.should_burst( [ "cluster1" ], "2", job_states="queued" )
+    assert not rule_helper.should_burst( [ "cluster1" ], "6", job_states="queued" )
+
+
+def __assert_same_hash( rule_helper, job1, job2, hash_by ):
+    job1_hash = rule_helper.job_hash( job1, hash_by=hash_by )
+    job2_hash = rule_helper.job_hash( job2, hash_by=hash_by )
+    assert job1_hash == job2_hash
+
+
+def __two_jobs_in_a_history():
+    job1, job2 = __two_jobs()
+    job1.history_id = 4
+    job2.history_id = 4
+    return job1, job2
+
+
+def __two_jobs( ):
+    job1 = model.Job()
+    job1.id = 1
+    job2 = model.Job()
+    job2.id = 2
+    return job1, job2
+
+
+def __do_a_bunch( work ):
+    for i in range( 20 ):
+        work()


 def __new_job( **kwds ):
@@ -82,6 +175,12 @@
     return job


+def __rule_helper():
+    app = MockApp()
+    rule_helper = RuleHelper( app )
+    return rule_helper
+
+
 class MockApp( object ):

     def __init__( self ):

diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_rules/10_site.py
--- a/test/unit/jobs/test_rules/10_site.py
+++ b/test/unit/jobs/test_rules/10_site.py
@@ -40,6 +40,15 @@
     return "all_passed"


+def check_job_conf_params( param1 ):
+    assert param1 == "7"
+    return "sent_7_dest_id"
+
+
 def check_resource_params( resource_params ):
     assert resource_params["memory"] == "8gb"
     return "have_resource_params"
+
+
+def check_workflow_invocation_uuid( workflow_invocation_uuid ):
+    return workflow_invocation_uuid
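Putting the sample configuration and the new stock rules together: for the burst_out destination defined in job_conf.xml.sample_advanced above, the mapper resolves STOCK_RULES['burst'] and fills its arguments from the destination's <param> elements by name, so dispatch is roughly equivalent to the sketch below. This is an illustration, not code from the changeset; rule_helper and job are supplied by Galaxy at dispatch time.

    from galaxy.jobs import stock_rules


    def example_burst_out_dispatch( rule_helper, job ):
        # Roughly what JobRunnerMapper.__handle_rule ends up doing for the
        # sample "burst_out" destination - each <param id="..."> value is
        # passed, still as a string, to the stock rule argument of the same name.
        return stock_rules.burst(
            rule_helper,
            job,
            from_destination_ids="local_cluster_8_core,local_cluster_1_core,local_cluster_16_core",
            to_destination_id="shared_cluster_8_core",
            num_jobs="50",
        )
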
Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.