6 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/ae0dd02c419c/ Changeset: ae0dd02c419c User: jmchilton Date: 2014-08-17 22:38:34 Summary: PEP-8 fixes for galaxy.jobs.mapper. Affected #: 1 file diff -r 109f40daa21ad0348662805f93f1cf1c9fbd6003 -r ae0dd02c419c0cc781c8601ffc5fd9bf4f8af2ca lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -10,6 +10,7 @@ DYNAMIC_RUNNER_NAME = "dynamic" DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url" + class JobMappingException( Exception ): def __init__( self, failure_message ): @@ -65,12 +66,14 @@ def __invoke_expand_function( self, expand_function ): function_arg_names = inspect.getargspec( expand_function ).args app = self.job_wrapper.app - possible_args = { "job_id" : self.job_wrapper.job_id, - "tool" : self.job_wrapper.tool, - "tool_id" : self.job_wrapper.tool.id, - "job_wrapper" : self.job_wrapper, - "rule_helper": RuleHelper( app ), - "app" : app } + possible_args = { + "job_id": self.job_wrapper.job_id, + "tool": self.job_wrapper.tool, + "tool_id": self.job_wrapper.tool.id, + "job_wrapper": self.job_wrapper, + "rule_helper": RuleHelper( app ), + "app": app + } actual_args = {} @@ -82,7 +85,6 @@ # Don't hit the DB to load the job object if not needed if "job" in function_arg_names or "user" in function_arg_names or "user_email" in function_arg_names or "resource_params" in function_arg_names: job = self.job_wrapper.get_job() - history = job.history user = job.user user_email = user and str(user.email) https://bitbucket.org/galaxy/galaxy-central/commits/cfb0c443884f/ Changeset: cfb0c443884f User: jmchilton Date: 2014-08-17 22:38:34 Summary: Small refactor of galaxy.jobs.mapper to make it more testable. 
Affected #: 1 file diff -r ae0dd02c419c0cc781c8601ffc5fd9bf4f8af2ca -r cfb0c443884f65a6297699b35b323962e2b74539 lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -35,6 +35,8 @@ self.url_to_destination = url_to_destination self.job_config = job_config + self.rules_module = galaxy.jobs.rules + def __get_rule_modules( self ): unsorted_module_names = self.__get_rule_module_names( ) ## Load modules in reverse order to allow hierarchical overrides @@ -55,11 +57,12 @@ return modules def __get_rule_module_names( self ): - rules_dir = galaxy.jobs.rules.__path__[0] + rules_dir = self.rules_module.__path__[0] names = [] for fname in os.listdir( rules_dir ): if not( fname.startswith( "_" ) ) and fname.endswith( ".py" ): - rule_module_name = "galaxy.jobs.rules.%s" % fname[:-len(".py")] + base_name = self.rules_module.__name__ + rule_module_name = "%s.%s" % (base_name, fname[:-len(".py")]) names.append( rule_module_name ) return names https://bitbucket.org/galaxy/galaxy-central/commits/949be0b9ef72/ Changeset: 949be0b9ef72 User: jmchilton Date: 2014-08-17 22:38:34 Summary: Job mapping unit tests. A bunch of unit tests for galaxy.jobs.mapper and galaxy.jobs.rule_helper testing various aspects of static and dynamic job mapping and the utility class (RuleHelper) that can be supplied to dynamic job rules. 
Affected #: 5 files diff -r cfb0c443884f65a6297699b35b323962e2b74539 -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa test/unit/jobs/test_mapper.py --- /dev/null +++ b/test/unit/jobs/test_mapper.py @@ -0,0 +1,129 @@ +import jobs.test_rules + +from galaxy.jobs.mapper import JobRunnerMapper +from galaxy.jobs import JobDestination + +from galaxy.util import bunch + + +TOOL_JOB_DESTINATION = JobDestination() +DYNAMICALLY_GENERATED_DESTINATION = JobDestination() + + +def test_static_mapping(): + mapper = __mapper() + assert mapper.get_job_destination( {} ) is TOOL_JOB_DESTINATION + + +def test_caching(): + mapper = __mapper() + mapper.get_job_destination( {} ) + mapper.get_job_destination( {} ) + assert mapper.job_wrapper.tool.call_count == 1 + + +def test_dynamic_mapping(): + mapper = __mapper( __dynamic_destination( dict( function="upload" ) ) ) + assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION + assert mapper.job_config.rule_response == "local_runner" + + +def test_dynamic_mapping_priorities(): + mapper = __mapper( __dynamic_destination( dict( function="tophat" ) ) ) + assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION + # Next line verifies we using definition in 20_instance.py instead of + # 10_site.py. 
+ assert mapper.job_config.rule_response == "instance_dest_id" + + +def test_dynamic_mapping_defaults_to_tool_id_as_rule(): + mapper = __mapper( __dynamic_destination( ) ) + assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION + assert mapper.job_config.rule_response == "tool1_dest_id" + + +def test_dynamic_mapping_function_parameters(): + mapper = __mapper( __dynamic_destination( dict( function="check_rule_params" ) ) ) + assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION + assert mapper.job_config.rule_response == "all_passed" + + +def test_dynamic_mapping_resource_parameters(): + mapper = __mapper( __dynamic_destination( dict( function="check_resource_params" ) ) ) + assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION + assert mapper.job_config.rule_response == "have_resource_params" + + +def __mapper( tool_job_destination=TOOL_JOB_DESTINATION ): + job_wrapper = MockJobWrapper( tool_job_destination ) + job_config = MockJobConfig() + + mapper = JobRunnerMapper( + job_wrapper, + {}, + job_config + ) + mapper.rules_module = jobs.test_rules + return mapper + + +def __dynamic_destination( params={} ): + return JobDestination( runner="dynamic", params=params ) + + +class MockJobConfig( object ): + + def __init__( self ): + self.rule_response = None + + def get_destination( self, rep ): + # Called to transform dynamic job destination rule response + # from destination id/runner url into a dynamic job destination. 
+ self.rule_response = rep + return DYNAMICALLY_GENERATED_DESTINATION + + +class MockJobWrapper( object ): + + def __init__( self, tool_job_destination ): + self.tool = MockTool( tool_job_destination ) + self.job_id = 12345 + self.app = object() + + def is_mock_job_wrapper( self ): + return True + + def get_job(self): + def get_param_values( app, ignore_errors ): + assert app == self.app + return { + "threshold": 8, + "__job_resource": { + "__job_resource__select": "True", + "memory": "8gb" + } + } + + return bunch.Bunch( + user=bunch.Bunch( + id=6789, + email="test@example.com" + ), + get_param_values=get_param_values + ) + + +class MockTool( object ): + + def __init__( self, tool_job_destination ): + self.id = "testtoolshed/devteam/tool1/23abcd13123" + self.call_count = 0 + self.tool_job_destination = tool_job_destination + self.all_ids = [ "testtoolshed/devteam/tool1/23abcd13123", "tool1" ] + + def get_job_destination( self, params ): + self.call_count += 1 + return self.tool_job_destination + + def is_mock_tool( self ): + return True diff -r cfb0c443884f65a6297699b35b323962e2b74539 -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa test/unit/jobs/test_rule_helper.py --- /dev/null +++ b/test/unit/jobs/test_rule_helper.py @@ -0,0 +1,98 @@ +from galaxy.util import bunch +from galaxy import model +from galaxy.model import mapping + +from galaxy.jobs.rule_helper import RuleHelper + +USER_EMAIL_1 = "u1@example.com" +USER_EMAIL_2 = "u2@example.com" +USER_EMAIL_3 = "u3@example.com" + + +def test_job_count(): + rule_helper = __rule_helper() + __assert_job_count_is( 0, rule_helper ) + + __setup_fixtures( rule_helper.app ) + + # Test raw counts for users... 
+ __assert_job_count_is( 7, rule_helper, for_user_email=USER_EMAIL_1 ) + __assert_job_count_is( 2, rule_helper, for_user_email=USER_EMAIL_2 ) + __assert_job_count_is( 0, rule_helper, for_user_email=USER_EMAIL_3 ) + + # Test desitnation counts + __assert_job_count_is( 2, rule_helper, for_destination="local" ) + __assert_job_count_is( 7, rule_helper, for_destination="cluster1" ) + + # Test per user destination counts + __assert_job_count_is( 5, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1 ) + __assert_job_count_is( 2, rule_helper, for_destination="local", for_user_email=USER_EMAIL_1 ) + + __assert_job_count_is( 2, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_2 ) + __assert_job_count_is( 0, rule_helper, for_destination="local", for_user_email=USER_EMAIL_2 ) + + # Test per user, per state destination counts + __assert_job_count_is( 3, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1, for_job_states=[ "queued" ] ) + __assert_job_count_is( 2, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1, for_job_states=[ "running" ] ) + __assert_job_count_is( 0, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1, for_job_states=[ "error" ] ) + __assert_job_count_is( 5, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1, for_job_states=[ "queued", "running", "error" ] ) + + +def __assert_job_count_is( expected_count, rule_helper, **kwds ): + acutal_count = rule_helper.job_count( **kwds ) + + if expected_count != acutal_count: + template = "Expected job count %d, actual job count %s for params %s" + raise AssertionError( template % ( expected_count, acutal_count, kwds ) ) + + +def __setup_fixtures( app ): + # user1 has 3 jobs queued and 2 jobs running on cluster1 and one queued and + # on running job on local. user2 has a queued and running job on the cluster. + # user3 has no jobs. 
+ user1 = model.User( email=USER_EMAIL_1, password="pass1" ) + user2 = model.User( email=USER_EMAIL_2, password="pass2" ) + user3 = model.User( email=USER_EMAIL_2, password="pass2" ) + + app.add( user1, user2, user3 ) + + app.add( __new_job( user=user1, destination_id="cluster1", state="queued" ) ) + app.add( __new_job( user=user1, destination_id="cluster1", state="queued" ) ) + app.add( __new_job( user=user1, destination_id="cluster1", state="queued" ) ) + app.add( __new_job( user=user1, destination_id="cluster1", state="running" ) ) + app.add( __new_job( user=user1, destination_id="cluster1", state="running" ) ) + + app.add( __new_job( user=user1, destination_id="local", state="queued" ) ) + app.add( __new_job( user=user1, destination_id="local", state="running" ) ) + + app.add( __new_job( user=user2, destination_id="cluster1", state="queued" ) ) + app.add( __new_job( user=user2, destination_id="cluster1", state="running" ) ) + + +def __rule_helper(): + app = MockApp() + rule_helper = RuleHelper( app ) + return rule_helper + + +def __new_job( **kwds ): + job = model.Job() + for key, value in kwds.items(): + setattr( job, key, value ) + return job + + +class MockApp( object ): + + def __init__( self ): + self.config = bunch.Bunch( ) + self.model = mapping.init( + "/tmp", + "sqlite:///:memory:", + create_tables=True + ) + + def add( self, *args ): + for arg in args: + self.model.context.add( arg ) + self.model.context.flush() diff -r cfb0c443884f65a6297699b35b323962e2b74539 -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa test/unit/jobs/test_rules/10_site.py --- /dev/null +++ b/test/unit/jobs/test_rules/10_site.py @@ -0,0 +1,45 @@ + + +def upload(): + return 'local_runner' + + +def tophat(): + return 'site_dest_id' + + +def tool1(): + # tool1 is id to test tool mocked out in test_mapper.py, without specify + # function name in dynamic destination - this function should be used by + # default. 
+ return 'tool1_dest_id' + + +def check_rule_params( + job_id, + tool, + tool_id, + job_wrapper, + rule_helper, + app, + job, + user, + user_email, +): + assert job_id == 12345 + assert tool.is_mock_tool() + assert tool_id == "testtoolshed/devteam/tool1/23abcd13123" + assert job_wrapper.is_mock_job_wrapper() + assert app == job_wrapper.app + assert rule_helper is not None + + assert job.user == user + assert user.id == 6789 + assert user_email == "test@example.com" + + return "all_passed" + + +def check_resource_params( resource_params ): + assert resource_params["memory"] == "8gb" + return "have_resource_params" diff -r cfb0c443884f65a6297699b35b323962e2b74539 -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa test/unit/jobs/test_rules/20_instance.py --- /dev/null +++ b/test/unit/jobs/test_rules/20_instance.py @@ -0,0 +1,6 @@ + + +def tophat(): + # This should override definition in 10_site.py + return 'instance_dest_id' + https://bitbucket.org/galaxy/galaxy-central/commits/47efdb818f41/ Changeset: 47efdb818f41 User: jmchilton Date: 2014-08-17 22:38:34 Summary: Improved error handling for dynamic job destinations. More, and more verbose, error messages with updated terminology; the new error conditions are unit tested. Affected #: 2 files diff -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa -r 47efdb818f41227c4c4ff179a0c9a28e9e154acd lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -10,6 +10,9 @@ DYNAMIC_RUNNER_NAME = "dynamic" DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url" +ERROR_MESSAGE_NO_RULE_FUNCTION = "Galaxy misconfigured - cannot find dynamic rule function name for destination %s." +ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND = "Galaxy misconfigured - no rule function named %s found in dynamic rule modules."
+ class JobMappingException( Exception ): @@ -150,7 +153,8 @@ expand_function = getattr( matching_rule_module, expand_function_name ) return expand_function else: - raise Exception( "Dynamic job runner cannot find function to expand job runner type - %s" % expand_function_name ) + message = ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND % ( expand_function_name ) + raise Exception( message ) def __last_rule_module_with_function( self, function_name ): # self.rule_modules is sorted in reverse order, so find first @@ -164,6 +168,10 @@ expand_type = destination.params.get('type', "python") if expand_type == "python": expand_function_name = self.__determine_expand_function_name( destination ) + if not expand_function_name: + message = ERROR_MESSAGE_NO_RULE_FUNCTION % destination + raise Exception( message ) + expand_function = self.__get_expand_function( expand_function_name ) job_destination = self.__invoke_expand_function( expand_function ) if not isinstance(job_destination, galaxy.jobs.JobDestination): diff -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa -r 47efdb818f41227c4c4ff179a0c9a28e9e154acd test/unit/jobs/test_mapper.py --- a/test/unit/jobs/test_mapper.py +++ b/test/unit/jobs/test_mapper.py @@ -1,6 +1,10 @@ import jobs.test_rules -from galaxy.jobs.mapper import JobRunnerMapper +from galaxy.jobs.mapper import ( + JobRunnerMapper, + ERROR_MESSAGE_NO_RULE_FUNCTION, + ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND, +) from galaxy.jobs import JobDestination from galaxy.util import bunch @@ -54,6 +58,32 @@ assert mapper.job_config.rule_response == "have_resource_params" +def test_dynamic_mapping_no_function(): + dest = __dynamic_destination( dict( ) ) + mapper = __mapper( dest ) + mapper.job_wrapper.tool.all_ids = [ "no_such_function" ] + error_message = ERROR_MESSAGE_NO_RULE_FUNCTION % dest + __assert_mapper_errors_with_message( mapper, error_message ) + + +def test_dynamic_mapping_missing_function(): + dest = __dynamic_destination( dict( function="missing_func" ) ) + mapper = 
__mapper( dest ) + mapper.job_wrapper.tool.all_ids = [ "no_such_function" ] + error_message = ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND % ( "missing_func" ) + __assert_mapper_errors_with_message( mapper, error_message ) + + +def __assert_mapper_errors_with_message( mapper, message ): + exception = None + try: + mapper.get_job_destination( {} ) + except Exception as e: + exception = e + assert exception + assert str( exception ) == message, "%s != %s" % ( str( exception ), message ) + + def __mapper( tool_job_destination=TOOL_JOB_DESTINATION ): job_wrapper = MockJobWrapper( tool_job_destination ) job_config = MockJobConfig() https://bitbucket.org/galaxy/galaxy-central/commits/ef7107f56ffb/ Changeset: ef7107f56ffb User: jmchilton Date: 2014-08-17 22:38:34 Summary: galaxy.jobs.mapper refactoring and cleanup. Affected #: 1 file diff -r 47efdb818f41227c4c4ff179a0c9a28e9e154acd -r ef7107f56ffb6377d0cb0bd04a5cfae9e6b22a2f lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -89,7 +89,12 @@ actual_args[ possible_arg_name ] = possible_args[ possible_arg_name ] # Don't hit the DB to load the job object if not needed - if "job" in function_arg_names or "user" in function_arg_names or "user_email" in function_arg_names or "resource_params" in function_arg_names: + require_db = False + for param in ["job", "user", "user_email", "resource_params"]: + if param in function_arg_names: + require_db = True + break + if require_db: job = self.job_wrapper.get_job() user = job.user user_email = user and str(user.email) @@ -106,16 +111,12 @@ if "resource_params" in function_arg_names: # Find the dymically inserted resource parameters and give them # to rule.
- app = self.job_wrapper.app - param_values = job.get_param_values( app, ignore_errors=True ) + param_values = self.__job_params( job ) resource_params = {} try: resource_params_raw = param_values[ "__job_resource" ] if resource_params_raw[ "__job_resource__select" ].lower() in [ "1", "yes", "true" ]: for key, value in resource_params_raw.iteritems(): - #if key.startswith( "__job_resource_param__" ): - # resource_key = key[ len( "__job_resource_param__" ): ] - # resource_params[ resource_key ] = value resource_params[ key ] = value except KeyError: pass @@ -123,6 +124,11 @@ return expand_function( **actual_args ) + def __job_params( self, job ): + app = self.job_wrapper.app + param_values = job.get_param_values( app, ignore_errors=True ) + return param_values + def __convert_url_to_destination( self, url ): """ Job runner URLs are deprecated, but dynamic mapper functions may still @@ -173,28 +179,28 @@ raise Exception( message ) expand_function = self.__get_expand_function( expand_function_name ) - job_destination = self.__invoke_expand_function( expand_function ) - if not isinstance(job_destination, galaxy.jobs.JobDestination): - job_destination_rep = str(job_destination) # Should be either id or url - if '://' in job_destination_rep: - job_destination = self.__convert_url_to_destination(job_destination_rep) - else: - job_destination = self.job_config.get_destination(job_destination_rep) - return job_destination + return self.__handle_rule( expand_function ) else: raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type ) + def __handle_rule( self, rule_function ): + job_destination = self.__invoke_expand_function( rule_function ) + if not isinstance(job_destination, galaxy.jobs.JobDestination): + job_destination_rep = str(job_destination) # Should be either id or url + if '://' in job_destination_rep: + job_destination = self.__convert_url_to_destination(job_destination_rep) + else: + job_destination = 
self.job_config.get_destination(job_destination_rep) + return job_destination + def __cache_job_destination( self, params, raw_job_destination=None ): if raw_job_destination is None: raw_job_destination = self.job_wrapper.tool.get_job_destination( params ) - #raw_job_destination_id_or_tag = self.job_wrapper.tool.get_job_destination_id_or_tag( params ) if raw_job_destination.runner == DYNAMIC_RUNNER_NAME: job_destination = self.__handle_dynamic_job_destination( raw_job_destination ) else: job_destination = raw_job_destination - #job_destination_id_or_tag = raw_job_destination_id_or_tag self.cached_job_destination = job_destination - #self.cached_job_destination_id_or_tag = job_destination_id_or_tag def get_job_destination( self, params ): """ https://bitbucket.org/galaxy/galaxy-central/commits/572a8d5ee218/ Changeset: 572a8d5ee218 User: jmchilton Date: 2014-08-17 22:38:34 Summary: Small optimization in galaxy.jobs.rule_helper. Affected #: 1 file diff -r ef7107f56ffb6377d0cb0bd04a5cfae9e6b22a2f -r 572a8d5ee218aeb37f77660948cb75a439936868 lib/galaxy/jobs/rule_helper.py --- a/lib/galaxy/jobs/rule_helper.py +++ b/lib/galaxy/jobs/rule_helper.py @@ -82,6 +82,10 @@ query = query.filter( model.Job.table.c.update_time >= start_date ) if for_job_states is not None: - query = query.filter( model.Job.table.c.state.in_( for_job_states ) ) + # Optimize the singleton case - can be much more performant in my experience. + if len( for_job_states ) == 1: + query = query.filter( model.Job.table.c.state == for_job_states[ 0 ] ) + else: + query = query.filter( model.Job.table.c.state.in_( for_job_states ) ) return query Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.