commit/galaxy-central: 3 new changesets
3 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/66395a9d870f/ changeset: 66395a9d870f user: jmchilton date: 2012-06-06 03:35:33 summary: First attempt at updated dynamic job runners. affected #: 3 files diff -r 1890cb0d1cfbb3ef5a09affcdd18d2b8acf7d811 -r 66395a9d870fbe48660dbf75cc0a59264bb862f6 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -21,6 +21,7 @@ from galaxy.util.expressions import ExpressionContext from galaxy.jobs.actions.post import ActionBox from galaxy.exceptions import ObjectInvalid +from galaxy.jobs.mapper import JobRunnerMapper log = logging.getLogger( __name__ ) @@ -80,6 +81,7 @@ self.tool_provided_job_metadata = None # Wrapper holding the info required to restore and clean up from files used for setting metadata externally self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) + self.job_runner_mapper = JobRunnerMapper( self ) self.params = None if job.params: self.params = from_json_string( job.params ) @@ -88,7 +90,8 @@ self.__galaxy_system_pwent = None def get_job_runner( self ): - return self.tool.get_job_runner( self.params ) + job_runner = self.job_runner_mapper.get_job_runner( self.params ) + return job_runner def get_job( self ): return self.sa_session.query( model.Job ).get( self.job_id ) diff -r 1890cb0d1cfbb3ef5a09affcdd18d2b8acf7d811 -r 66395a9d870fbe48660dbf75cc0a59264bb862f6 lib/galaxy/jobs/mapper.py --- /dev/null +++ b/lib/galaxy/jobs/mapper.py @@ -0,0 +1,82 @@ +import inspect, sys + +import galaxy.jobs.rules + +DYNAMIC_RUNNER_PREFIX = "dynamic:///" + +class JobRunnerMapper( object ): + + def __init__( self, job_wrapper ): + self.job_wrapper = job_wrapper + + def __invoke_expand_function( self, expand_function ): + function_arg_names = inspect.getargspec( expand_function ).args + + possible_args = { "job_id" : self.job_wrapper.job_id, + "tool" : self.job_wrapper.tool, + "tool_id" : self.job_wrapper.tool.id, + "job_wrapper" : self.job_wrapper, + "app" : self.job_wrapper.app } + + actual_args = {} + + # Populate needed args + for possible_arg_name in possible_args: + if possible_arg_name in function_arg_names: + actual_args[ possible_arg_name ] = possible_args[ possible_arg_name ] + + # Don't hit the DB to load the job object is not needed + if "job" in function_arg_names or "user" in function_arg_names or "user_email" in function_arg_names: + job = self.job_wrapper.get_job() + history = job.history + user = history and history.user + user_email = user and str(user.email) + + if "job" in function_arg_names: + actual_args[ "job" ] = job + + if "user" in function_arg_names: + actual_args[ "user" ] = user + + if "user_email" in function_arg_names: + actual_args[ "user_email" ] = user_email + + return expand_function( **actual_args ) + + def __determine_expand_function_name( self, option_parts ): + # default look for function with same name as tool, unless one specified + expand_function_name = self.job_wrapper.tool.id + if len( option_parts ) > 1: + expand_function_name = option_parts[ 1 ] + return expand_function_name + + def __get_expand_function( self, expand_function_name ): + rules_module = sys.modules[ "galaxy.jobs.rules" ] + if hasattr( rules_module, expand_function_name ): + expand_function = getattr( rules_module, expand_function_name ) + return expand_function + else: + raise Exception( "Dynamic job runner cannot find function to expand job runner type - %s" % expand_function_name ) + + def __expand_dynamic_job_runner( self, options_str ): + option_parts = options_str.split( '/' ) + expand_type = option_parts[ 0 ] + if expand_type == "python": + expand_function_name = self.__determine_expand_function_name( option_parts ) + expand_function = self.__get_expand_function( expand_function_name ) + return self.__invoke_expand_function( expand_function ) + else: + raise Exception( "Unhandled dynamic job runner type specified - %s" % calculation_type ) + + def __cache_job_runner( self, params ): + raw_job_runner = self.job_wrapper.tool.get_job_runner( params ) + if raw_job_runner.startswith( DYNAMIC_RUNNER_PREFIX ): + job_runner = self.__expand_dynamic_job_runner( raw_job_runner[ len( DYNAMIC_RUNNER_PREFIX ) : ] ) + else: + job_runner = raw_job_runner + self.cached_job_runner = job_runner + + def get_job_runner( self, params ): + if not hasattr( self, 'cached_job_runner' ): + self.__cache_job_runner( params ) + return self.cached_job_runner diff -r 1890cb0d1cfbb3ef5a09affcdd18d2b8acf7d811 -r 66395a9d870fbe48660dbf75cc0a59264bb862f6 lib/galaxy/jobs/rules.py --- /dev/null +++ b/lib/galaxy/jobs/rules.py @@ -0,0 +1,9 @@ +import logging + +log = logging.getLogger( __name__ ) + +# Add functions to dynamically map job descriptions to job runners in +# this file. These functions can optionally take in any of the +# following arguments - job_wrapper, app, user_email, job, tool, +# email, tool_id, and job_id. + https://bitbucket.org/galaxy/galaxy-central/changeset/f54b848298d4/ changeset: f54b848298d4 user: jmchilton date: 2012-06-10 06:00:16 summary: Rework dynamic job runner config so that instead using a rules.py file for storing rules, they should be placed in lib/galaxy/jobs/rules/. The rules submodules are "searched" in lexiographically allowing for hierarchical configuration overrides (e.g. naming schemes like: 000_galaxy_rules.py, 100_site_rules.py, 200_instance_rules.py) affected #: 3 files diff -r 66395a9d870fbe48660dbf75cc0a59264bb862f6 -r f54b848298d4d78d6fbcc85e9887960f21b5c239 lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -1,13 +1,50 @@ -import inspect, sys +import logging +import inspect +import os + +log = logging.getLogger( __name__ ) import galaxy.jobs.rules DYNAMIC_RUNNER_PREFIX = "dynamic:///" class JobRunnerMapper( object ): - + """ + This class is responsible to managing the mapping of jobs + (in the form of job_wrappers) to job runner strings. + """ + def __init__( self, job_wrapper ): self.job_wrapper = job_wrapper + self.rule_modules = self.__get_rule_modules( ) + + def __get_rule_modules( self ): + unsorted_module_names = self.__get_rule_module_names( ) + ## Load modules in reverse order to allow hierarchical overrides + ## i.e. 000_galaxy_rules.py, 100_site_rules.py, 200_instance_rules.py + module_names = sorted( unsorted_module_names, reverse=True ) + modules = [] + for rule_module_name in module_names: + try: + module = __import__( rule_module_name ) + for comp in rule_module_name.split( "." )[1:]: + module = getattr( module, comp ) + modules.append( module ) + except BaseException, exception: + exception_str = str( exception ) + message = "%s rule module could not be loaded: %s" % ( rule_module_name, exception_str ) + log.debug( message ) + continue + return modules + + def __get_rule_module_names( self ): + rules_dir = galaxy.jobs.rules.__path__[0] + names = [] + for fname in os.listdir( rules_dir ): + if not( fname.startswith( "_" ) ) and fname.endswith( ".py" ): + rule_module_name = "galaxy.jobs.rules.%s" % fname[:-len(".py")] + names.append( rule_module_name ) + return names def __invoke_expand_function( self, expand_function ): function_arg_names = inspect.getargspec( expand_function ).args @@ -15,7 +52,7 @@ possible_args = { "job_id" : self.job_wrapper.job_id, "tool" : self.job_wrapper.tool, "tool_id" : self.job_wrapper.tool.id, - "job_wrapper" : self.job_wrapper, + "job_wrapper" : self.job_wrapper, "app" : self.job_wrapper.app } actual_args = {} @@ -25,7 +62,7 @@ if possible_arg_name in function_arg_names: actual_args[ possible_arg_name ] = possible_args[ possible_arg_name ] - # Don't hit the DB to load the job object is not needed + # Don't hit the DB to load the job object if not needed if "job" in function_arg_names or "user" in function_arg_names or "user_email" in function_arg_names: job = self.job_wrapper.get_job() history = job.history @@ -51,13 +88,21 @@ return expand_function_name def __get_expand_function( self, expand_function_name ): - rules_module = sys.modules[ "galaxy.jobs.rules" ] - if hasattr( rules_module, expand_function_name ): - expand_function = getattr( rules_module, expand_function_name ) + matching_rule_module = self.__last_rule_module_with_function( expand_function_name ) + if matching_rule_module: + expand_function = getattr( matching_rule_module, expand_function_name ) return expand_function else: raise Exception( "Dynamic job runner cannot find function to expand job runner type - %s" % expand_function_name ) - + + def __last_rule_module_with_function( self, function_name ): + # self.rule_modules is sorted in reverse order, so find first + # wiht function + for rule_module in self.rule_modules: + if hasattr( rule_module, function_name ): + return rule_module + return None + def __expand_dynamic_job_runner( self, options_str ): option_parts = options_str.split( '/' ) expand_type = option_parts[ 0 ] @@ -77,6 +122,9 @@ self.cached_job_runner = job_runner def get_job_runner( self, params ): + """ + Cache the job_runner string to avoid recalculation. + """ if not hasattr( self, 'cached_job_runner' ): self.__cache_job_runner( params ) return self.cached_job_runner diff -r 66395a9d870fbe48660dbf75cc0a59264bb862f6 -r f54b848298d4d78d6fbcc85e9887960f21b5c239 lib/galaxy/jobs/rules.py --- a/lib/galaxy/jobs/rules.py +++ /dev/null @@ -1,9 +0,0 @@ -import logging - -log = logging.getLogger( __name__ ) - -# Add functions to dynamically map job descriptions to job runners in -# this file. These functions can optionally take in any of the -# following arguments - job_wrapper, app, user_email, job, tool, -# email, tool_id, and job_id. - https://bitbucket.org/galaxy/galaxy-central/changeset/0f32c2fc37cd/ changeset: 0f32c2fc37cd user: natefoo date: 2012-06-11 16:26:12 summary: Merged in jmchilton/umn-galaxy-central (pull request #47) affected #: 3 files diff -r 1df26d9240bb4e115cdd8cd019673c56c8e4ad67 -r 0f32c2fc37cdc2959c86d2fdb0debe9a4c66282b lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -21,6 +21,7 @@ from galaxy.util.expressions import ExpressionContext from galaxy.jobs.actions.post import ActionBox from galaxy.exceptions import ObjectInvalid +from galaxy.jobs.mapper import JobRunnerMapper log = logging.getLogger( __name__ ) @@ -80,6 +81,7 @@ self.tool_provided_job_metadata = None # Wrapper holding the info required to restore and clean up from files used for setting metadata externally self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) + self.job_runner_mapper = JobRunnerMapper( self ) self.params = None if job.params: self.params = from_json_string( job.params ) @@ -88,7 +90,8 @@ self.__galaxy_system_pwent = None def get_job_runner( self ): - return self.tool.get_job_runner( self.params ) + job_runner = self.job_runner_mapper.get_job_runner( self.params ) + return job_runner def get_job( self ): return self.sa_session.query( model.Job ).get( self.job_id ) diff -r 1df26d9240bb4e115cdd8cd019673c56c8e4ad67 -r 0f32c2fc37cdc2959c86d2fdb0debe9a4c66282b lib/galaxy/jobs/mapper.py --- /dev/null +++ b/lib/galaxy/jobs/mapper.py @@ -0,0 +1,130 @@ +import logging +import inspect +import os + +log = logging.getLogger( __name__ ) + +import galaxy.jobs.rules + +DYNAMIC_RUNNER_PREFIX = "dynamic:///" + +class JobRunnerMapper( object ): + """ + This class is responsible to managing the mapping of jobs + (in the form of job_wrappers) to job runner strings. + """ + + def __init__( self, job_wrapper ): + self.job_wrapper = job_wrapper + self.rule_modules = self.__get_rule_modules( ) + + def __get_rule_modules( self ): + unsorted_module_names = self.__get_rule_module_names( ) + ## Load modules in reverse order to allow hierarchical overrides + ## i.e. 000_galaxy_rules.py, 100_site_rules.py, 200_instance_rules.py + module_names = sorted( unsorted_module_names, reverse=True ) + modules = [] + for rule_module_name in module_names: + try: + module = __import__( rule_module_name ) + for comp in rule_module_name.split( "." )[1:]: + module = getattr( module, comp ) + modules.append( module ) + except BaseException, exception: + exception_str = str( exception ) + message = "%s rule module could not be loaded: %s" % ( rule_module_name, exception_str ) + log.debug( message ) + continue + return modules + + def __get_rule_module_names( self ): + rules_dir = galaxy.jobs.rules.__path__[0] + names = [] + for fname in os.listdir( rules_dir ): + if not( fname.startswith( "_" ) ) and fname.endswith( ".py" ): + rule_module_name = "galaxy.jobs.rules.%s" % fname[:-len(".py")] + names.append( rule_module_name ) + return names + + def __invoke_expand_function( self, expand_function ): + function_arg_names = inspect.getargspec( expand_function ).args + + possible_args = { "job_id" : self.job_wrapper.job_id, + "tool" : self.job_wrapper.tool, + "tool_id" : self.job_wrapper.tool.id, + "job_wrapper" : self.job_wrapper, + "app" : self.job_wrapper.app } + + actual_args = {} + + # Populate needed args + for possible_arg_name in possible_args: + if possible_arg_name in function_arg_names: + actual_args[ possible_arg_name ] = possible_args[ possible_arg_name ] + + # Don't hit the DB to load the job object if not needed + if "job" in function_arg_names or "user" in function_arg_names or "user_email" in function_arg_names: + job = self.job_wrapper.get_job() + history = job.history + user = history and history.user + user_email = user and str(user.email) + + if "job" in function_arg_names: + actual_args[ "job" ] = job + + if "user" in function_arg_names: + actual_args[ "user" ] = user + + if "user_email" in function_arg_names: + actual_args[ "user_email" ] = user_email + + return expand_function( **actual_args ) + + def __determine_expand_function_name( self, option_parts ): + # default look for function with same name as tool, unless one specified + expand_function_name = self.job_wrapper.tool.id + if len( option_parts ) > 1: + expand_function_name = option_parts[ 1 ] + return expand_function_name + + def __get_expand_function( self, expand_function_name ): + matching_rule_module = self.__last_rule_module_with_function( expand_function_name ) + if matching_rule_module: + expand_function = getattr( matching_rule_module, expand_function_name ) + return expand_function + else: + raise Exception( "Dynamic job runner cannot find function to expand job runner type - %s" % expand_function_name ) + + def __last_rule_module_with_function( self, function_name ): + # self.rule_modules is sorted in reverse order, so find first + # wiht function + for rule_module in self.rule_modules: + if hasattr( rule_module, function_name ): + return rule_module + return None + + def __expand_dynamic_job_runner( self, options_str ): + option_parts = options_str.split( '/' ) + expand_type = option_parts[ 0 ] + if expand_type == "python": + expand_function_name = self.__determine_expand_function_name( option_parts ) + expand_function = self.__get_expand_function( expand_function_name ) + return self.__invoke_expand_function( expand_function ) + else: + raise Exception( "Unhandled dynamic job runner type specified - %s" % calculation_type ) + + def __cache_job_runner( self, params ): + raw_job_runner = self.job_wrapper.tool.get_job_runner( params ) + if raw_job_runner.startswith( DYNAMIC_RUNNER_PREFIX ): + job_runner = self.__expand_dynamic_job_runner( raw_job_runner[ len( DYNAMIC_RUNNER_PREFIX ) : ] ) + else: + job_runner = raw_job_runner + self.cached_job_runner = job_runner + + def get_job_runner( self, params ): + """ + Cache the job_runner string to avoid recalculation. + """ + if not hasattr( self, 'cached_job_runner' ): + self.__cache_job_runner( params ) + return self.cached_job_runner Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
Bitbucket