commit/galaxy-central: natefoo: Allow for job handler selection based on job params (like source).
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/0626ddb49084/ changeset: 0626ddb49084 user: natefoo date: 2012-03-30 20:10:45 summary: Allow for job handler selection based on job params (like source). affected #: 4 files diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -198,13 +198,27 @@ if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ): self.track_jobs_in_database = False # Store per-tool runner configs + self.tool_handlers = self.__read_tool_job_config( global_conf_parser, 'galaxy:tool_handlers', 'name' ) + self.tool_runners = self.__read_tool_job_config( global_conf_parser, 'galaxy:tool_runners', 'url' ) + self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' ) + # Cloud configuration options + self.enable_cloud_launch = string_as_bool( kwargs.get( 'enable_cloud_launch', False ) ) + # Galaxy messaging (AMQP) configuration options + self.amqp = {} try: - tool_runners_config = global_conf_parser.items("galaxy:tool_runners") + amqp_config = global_conf_parser.items("galaxy_amqp") + except ConfigParser.NoSectionError: + amqp_config = {} + for k, v in amqp_config: + self.amqp[k] = v + def __read_tool_job_config( self, global_conf_parser, section, key ): + try: + tool_runners_config = global_conf_parser.items( section ) # Process config to group multiple configs for the same tool. - tool_runners = {} + rval = {} for entry in tool_runners_config: - tool_config, url = entry + tool_config, val = entry tool = None runner_dict = {} if tool_config.find("[") != -1: @@ -219,29 +233,18 @@ tool = tool_config # Add runner URL. - runner_dict[ 'url' ] = url + runner_dict[ key ] = val # Create tool entry if necessary. - if tool not in tool_runners: - tool_runners[ tool ] = [] + if tool not in rval: + rval[ tool ] = [] # Add entry to runners. - tool_runners[ tool ].append( runner_dict ) + rval[ tool ].append( runner_dict ) - self.tool_runners = tool_runners + return rval except ConfigParser.NoSectionError: - self.tool_runners = [] - self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' ) - # Cloud configuration options - self.enable_cloud_launch = string_as_bool( kwargs.get( 'enable_cloud_launch', False ) ) - # Galaxy messaging (AMQP) configuration options - self.amqp = {} - try: - amqp_config = global_conf_parser.items("galaxy_amqp") - except ConfigParser.NoSectionError: - amqp_config = {} - for k, v in amqp_config: - self.amqp[k] = v + return [] def get( self, key, default ): return self.config_dict.get( key, default ) def get_bool( self, key, default ): diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d lib/galaxy/jobs/manager.py --- a/lib/galaxy/jobs/manager.py +++ b/lib/galaxy/jobs/manager.py @@ -144,7 +144,7 @@ pass for job in jobs_to_check: - job.handler = self.__select_handler( job ) + job.handler = self.__get_handler( job ) log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) ) self.sa_session.add( job ) @@ -157,9 +157,15 @@ for job in jobs_to_check: self.job_handler.job_queue.put( job.id, job.tool_id ) - def __select_handler( self, job ): - # TODO: handler selection based on params, tool, etc. - return random.choice( self.app.config.job_handlers ) + def __get_handler( self, job ): + try: + params = None + if job.params: + params = from_json_string( job.params ) + return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_handler( params ) + except: + log.exception( "(%s) Caught exception attempting to get tool-specific job handler for tool '%s', selecting at random from available handlers instead:" % ( job.id, job.tool_id ) ) + return random.choice( self.app.config.job_handlers ) def put( self, job_id, tool ): """Add a job to the queue (by job identifier)""" diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -5,7 +5,7 @@ pkg_resources.require( "simplejson" ) -import logging, os, string, sys, tempfile, glob, shutil, types, urllib, subprocess +import logging, os, string, sys, tempfile, glob, shutil, types, urllib, subprocess, random import simplejson import binascii from UserDict import DictMixin @@ -682,31 +682,35 @@ if tool_version: return tool_version.get_version_ids( self.app ) return [] - def get_job_runner( self, job_params=None ): - # Look through runners to find one with matching parameters. - selected_runner = None - if len( self.job_runners ) == 1: - # Most tools have a single runner. - selected_runner = self.job_runners[0] + def __get_job_run_config( self, run_configs, key, job_params=None ): + # Look through runners/handlers to find one with matching parameters. + available_configs = [] + if len( run_configs ) == 1: + # Most tools have a single config. + return run_configs[0][ key ] # return to avoid random when this will be the case most of the time elif job_params is None: - # Use job runner with no params - for runner in self.job_runners: - if "params" not in runner: - selected_runner = runner + # Use job config with no params + for config in run_configs: + if "params" not in config: + available_configs.append( config ) else: - # Find runner with matching parameters. - for runner in self.job_runners: - if "params" in runner: + # Find config with matching parameters. + for config in run_configs: + if "params" in config: match = True - runner_params = runner[ "params" ] + config_params = config[ "params" ] for param, value in job_params.items(): - if param not in runner_params or \ - runner_params[ param ] != job_params[ param ]: + if param not in config_params or \ + config_params[ param ] != job_params[ param ]: match = False break if match: - selected_runner = runner - return selected_runner[ "url" ] + available_configs.append( config ) + return random.choice( available_configs )[ key ] + def get_job_runner( self, job_params=None ): + return self.__get_job_run_config( self.job_runners, key='url', job_params=job_params ) + def get_job_handler( self, job_params=None ): + return self.__get_job_run_config( self.job_handlers, key='name', job_params=job_params ) def parse( self, root, guid=None ): """ Read tool configuration from the element `root` and fill in `self`. @@ -773,6 +777,12 @@ self.parallelism = ToolParallelismInfo(parallelism) else: self.parallelism = None + # Set job handler(s). Each handler is a dict with 'url' and, optionally, 'params'. + self_id = self.id.lower() + self.job_handlers = [ { "name" : name } for name in self.app.config.job_handlers ] + # Set custom handler(s) if they're defined. + if self_id in self.app.config.tool_handlers: + self.job_handlers = self.app.config.tool_handlers[ self_id ] # Set job runner(s). Each runner is a dict with 'url' and, optionally, 'params'. if self.app.config.start_job_runners is None: # Jobs are always local regardless of tool config if no additional @@ -782,7 +792,6 @@ # Set job runner to the cluster default self.job_runners = [ { "url" : self.app.config.default_cluster_job_runner } ] # Set custom runner(s) if they're defined. - self_id = self.id.lower() if self_id in self.app.config.tool_runners: self.job_runners = self.app.config.tool_runners[ self_id ] # Is this a 'hidden' tool (hidden in tool menu) diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d universe_wsgi.ini.sample --- a/universe_wsgi.ini.sample +++ b/universe_wsgi.ini.sample @@ -664,18 +664,30 @@ #pbs_stage_path = #pbs_dataset_server = -# ---- Tool Job Runners ----------------------------------------------------- +# ---- Per-Tool Job Management ---------------------------------------------- -# Individual per-tool job runner overrides. Parameters can be included to define -# multiple runners per tool. E.g. to run Cufflinks jobs initiated from Trackster +# Per-tool job handler and runner overrides. Parameters can be included to define multiple +# runners per tool. E.g. to run Cufflinks jobs initiated from Trackster # differently than standard Cufflinks jobs: -# cufflinks = local:/// -# cufflinks[source@trackster] = local:/// -# If not listed here, a tool will run with the runner defined with -# default_cluster_job_runner. +# +# cufflinks = local:/// +# cufflinks[source@trackster] = local:/// + +[galaxy:tool_handlers] + +# By default, Galaxy will select a handler at random from the list of +# job_handlers set above. You can override as in the following examples: +# +#upload1 = upload_handler +#cufflinks[source@trackster] = realtime_handler [galaxy:tool_runners] +# If not listed here, a tool will run with the runner defined with +# default_cluster_job_runner. These overrides for local:/// are done because +# these tools can fetch data from remote sites, which may not be suitable to +# run on a cluster (if it does not have access to the Internet, for example). + biomart = local:/// encode_db1 = local:/// hbvar = local:/// Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
Bitbucket