commit/galaxy-central: jgoecks: Parameterize per-tool job runners so that parameter name/value pairs can be used to define multiple runners per tool. Documentation is in sample universe file. Add 'params' column to jobs table to store job parameters, and add source parameter for all jobs initiated in Trackster.
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/c754d8c07440/ changeset: c754d8c07440 user: jgoecks date: 2012-02-28 23:50:26 summary: Parameterize per-tool job runners so that parameter name/value pairs can be used to define multiple runners per tool. Documentation is in sample universe file. Add 'params' column to jobs table to store job parameters, and add source parameter for all jobs initiated in Trackster. affected #: 8 files diff -r c7ca68b622823d146fac4db1cb0d5963e1daa1a9 -r c754d8c07440e4376853eb3b804af366b1442ff0 lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -180,9 +180,38 @@ # Heartbeat log file name override if global_conf is not None: self.heartbeat_log = global_conf.get( 'heartbeat_log', 'heartbeat.log' ) - #Store per-tool runner config + #Store per-tool runner configs. try: - self.tool_runners = global_conf_parser.items("galaxy:tool_runners") + tool_runners_config = global_conf_parser.items("galaxy:tool_runners") + + # Process config to group multiple configs for the same tool. + tool_runners = {} + for entry in tool_runners_config: + tool_config, url = entry + tool = None + runner_dict = {} + if tool_config.find("[") != -1: + # Found tool with additional params; put params in dict. + tool, params = tool_config[:-1].split( "[" ) + param_dict = {} + for param in params.split( "," ): + name, value = param.split( "@" ) + param_dict[ name ] = value + runner_dict[ 'params' ] = param_dict + else: + tool = tool_config + + # Add runner URL. + runner_dict[ 'url' ] = url + + # Create tool entry if necessary. + if tool not in tool_runners: + tool_runners[ tool ] = [] + + # Add entry to runners. + tool_runners[ tool ].append( runner_dict ) + + self.tool_runners = tool_runners except ConfigParser.NoSectionError: self.tool_runners = [] self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' ) diff -r c7ca68b622823d146fac4db1cb0d5963e1daa1a9 -r c754d8c07440e4376853eb3b804af366b1442ff0 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -336,12 +336,15 @@ self.tool_provided_job_metadata = None # Wrapper holding the info required to restore and clean up from files used for setting metadata externally self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) + self.params = None + if job.params: + self.params = from_json_string( job.params ) self.__user_system_pwent = None self.__galaxy_system_pwent = None def get_job_runner( self ): - return self.tool.job_runner + return self.tool.get_job_runner( self.params ) def get_job( self ): return self.sa_session.query( model.Job ).get( self.job_id ) diff -r c7ca68b622823d146fac4db1cb0d5963e1daa1a9 -r c754d8c07440e4376853eb3b804af366b1442ff0 lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -420,7 +420,8 @@ Column( "job_runner_name", String( 255 ) ), Column( "job_runner_external_id", String( 255 ) ), Column( "object_store_id", TrimmedString( 255 ), index=True ), - Column( "imported", Boolean, default=False, index=True ) ) + Column( "imported", Boolean, default=False, index=True ), + Column( "params", TrimmedString(255), index=True ) ) JobParameter.table = Table( "job_parameter", metadata, Column( "id", Integer, primary_key=True ), diff -r c7ca68b622823d146fac4db1cb0d5963e1daa1a9 -r c754d8c07440e4376853eb3b804af366b1442ff0 lib/galaxy/model/migrate/versions/0093_add_job_params_col.py --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0093_add_job_params_col.py @@ -0,0 +1,49 @@ +""" +Migration script to create "params" column in job table. +""" + +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * + +import logging +log = logging.getLogger( __name__ ) + +# Need our custom types, but don't import anything else from model +from galaxy.model.custom_types import * + +metadata = MetaData( migrate_engine ) +db_session = scoped_session( sessionmaker( bind=migrate_engine, autoflush=False, autocommit=True ) ) + +# Column to add. +params_col = Column( "params", TrimmedString(255), index=True ) + +def display_migration_details(): + print "" + print "This migration script adds a 'params' column to the Job table." + +def upgrade(): + print __doc__ + metadata.reflect() + + # Add column to Job table. + try: + Job_table = Table( "job", metadata, autoload=True ) + params_col.create( Job_table ) + assert params_col is Job_table.c.params + + except Exception, e: + print str(e) + log.debug( "Adding column 'params' to job table failed: %s" % str( e ) ) + +def downgrade(): + metadata.reflect() + + # Drop column from Job table. + try: + Job_table = Table( "job", metadata, autoload=True ) + params_col = Job_table.c.params + params_col.drop() + except Exception, e: + log.debug( "Dropping column 'params' from job table failed: %s" % ( str( e ) ) ) \ No newline at end of file diff -r c7ca68b622823d146fac4db1cb0d5963e1daa1a9 -r c754d8c07440e4376853eb3b804af366b1442ff0 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -562,6 +562,31 @@ if tool_version: return tool_version.get_version_ids( self.app ) return [] + def get_job_runner( self, job_params=None ): + # Look through runners to find one with matching parameters. + selected_runner = None + if len( self.job_runners ) == 1: + # Most tools have a single runner. + selected_runner = self.job_runners[0] + elif job_params is None: + # Use job runner with no params + for runner in self.job_runners: + if "params" not in runner: + selected_runner = runner + else: + # Find runner with matching parameters. + for runner in self.job_runners: + if "params" in runner: + match = True + runner_params = runner[ "params" ] + for param, value in job_params.items(): + if param not in runner_params or \ + runner_params[ param ] != job_params[ param ]: + match = False + break + if match: + selected_runner = runner + return selected_runner[ "url" ] def parse( self, root, guid=None ): """ Read tool configuration from the element `root` and fill in `self`. @@ -626,17 +651,18 @@ self.parallelism = ToolParallelismInfo(parallelism) else: self.parallelism = None + # Set job runner(s). Each runner is a dict with 'url' and, optionally, 'params'. if self.app.config.start_job_runners is None: # Jobs are always local regardless of tool config if no additional # runners are started - self.job_runner = "local:///" + self.job_runners = [ { "url" : "local:///" } ] else: # Set job runner to the cluster default - self.job_runner = self.app.config.default_cluster_job_runner - for tup in self.app.config.tool_runners: - if tup[0] == self.id.lower(): - self.job_runner = tup[1] - break + self.job_runners = [ { "url" : self.app.config.default_cluster_job_runner } ] + # Set custom runner(s) if they're defined. + self_id = self.id.lower() + if self_id in self.app.config.tool_runners: + self.job_runners = self.app.config.tool_runners[ self_id ] # Is this a 'hidden' tool (hidden in tool menu) self.hidden = util.xml_text(root, "hidden") if self.hidden: self.hidden = util.string_as_bool(self.hidden) diff -r c7ca68b622823d146fac4db1cb0d5963e1daa1a9 -r c754d8c07440e4376853eb3b804af366b1442ff0 lib/galaxy/tools/actions/__init__.py --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -1,6 +1,7 @@ from galaxy.model import LibraryDatasetDatasetAssociation from galaxy.util.bunch import Bunch from galaxy.util.odict import odict +from galaxy.util.json import to_json_string from galaxy.tools.parameters import * from galaxy.tools.parameters.grouping import * from galaxy.util.template import fill_template @@ -100,7 +101,7 @@ tool.visit_inputs( param_values, visitor ) return input_datasets - def execute(self, tool, trans, incoming={}, return_job=False, set_output_hid=True, set_output_history=True, history=None ): + def execute(self, tool, trans, incoming={}, return_job=False, set_output_hid=True, set_output_history=True, history=None, job_params=None ): """ Executes a tool, creating job and tool outputs, associating them, and submitting the job to the job queue. If history is not specified, use @@ -389,6 +390,8 @@ for name, dataset in out_data.iteritems(): job.add_output_dataset( name, dataset ) job.object_store_id = object_store_id + if job_params: + job.params = to_json_string( job_params ) trans.sa_session.add( job ) trans.sa_session.flush() # Some tools are not really executable, but jobs are still created for them ( for record keeping ). diff -r c7ca68b622823d146fac4db1cb0d5963e1daa1a9 -r c754d8c07440e4376853eb3b804af366b1442ff0 lib/galaxy/web/controllers/tracks.py --- a/lib/galaxy/web/controllers/tracks.py +++ b/lib/galaxy/web/controllers/tracks.py @@ -991,7 +991,9 @@ # Execute tool and handle outputs. # try: - subset_job, subset_job_outputs = tool.execute( trans, incoming=tool_params, history=target_history ) + subset_job, subset_job_outputs = tool.execute( trans, incoming=tool_params, + history=target_history, + job_params={ "source" : "trackster" } ) except Exception, e: # Lots of things can go wrong when trying to execute tool. return to_json_string( { "error" : True, "message" : e.__class__.__name__ + ": " + str(e) } ) diff -r c7ca68b622823d146fac4db1cb0d5963e1daa1a9 -r c754d8c07440e4376853eb3b804af366b1442ff0 universe_wsgi.ini.sample --- a/universe_wsgi.ini.sample +++ b/universe_wsgi.ini.sample @@ -630,8 +630,13 @@ # ---- Tool Job Runners ----------------------------------------------------- -# Individual per-tool job runner overrides. If not listed here, a tool will -# run with the runner defined with default_cluster_job_runner. +# Individual per-tool job runner overrides. Parameters can be included to define +# multiple runners per tool. E.g. to run Cufflinks jobs initiated from Trackster +# differently than standard Cufflinks jobs: +# cufflinks = local:/// +# cufflinks[source@trackster] = local:/// +# If not listed here, a tool will run with the runner defined with +# default_cluster_job_runner. [galaxy:tool_runners] Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
Bitbucket