commit/galaxy-central: natefoo: Make loading of job runners dynamic. Changes to lib/galaxy/config.py and lib/galaxy/jobs/__init__.py are no longer required. Existing runners which are not part of the Galaxy source will need to define the class name to instantiate in the __all__ list in the runner. See the included runners for an example.
1 new changeset in galaxy-central: http://bitbucket.org/galaxy/galaxy-central/changeset/6937b1a3b3a2/ changeset: r5047:6937b1a3b3a2 user: natefoo date: 2011-02-11 19:12:27 summary: Make loading of job runners dynamic. Changes to lib/galaxy/config.py and lib/galaxy/jobs/__init__.py are no longer required. Existing runners which are not part of the Galaxy source will need to define the class name to instantiate in the __all__ list in the runner. See the included runners for an example. affected #: 7 files (2.2 KB) --- a/lib/galaxy/config.py Fri Feb 11 09:27:10 2011 -0500 +++ b/lib/galaxy/config.py Fri Feb 11 13:12:27 2011 -0500 @@ -189,16 +189,6 @@ for path in self.tool_config, self.datatypes_config: if not os.path.isfile(path): raise ConfigurationError("File not found: %s" % path ) - # Check job runners so the admin can scramble dependent egg. - if self.start_job_runners is not None: - runner_to_egg = dict( pbs = 'pbs_python', sge = 'DRMAA_python', drmaa = 'drmaa' ) - for runner in self.start_job_runners.split( ',' ): - try: - pkg_resources.require( runner_to_egg[runner] ) - except eggs.EggNotFetchable, e: - raise eggs.EggNotFetchable( 'You must scramble the %s egg to use the %s job runner. Instructions are available at:\n http://bitbucket.org/galaxy/galaxy-central/wiki/Config/Cluster' % ( runner_to_egg[runner], runner ) ) - except KeyError: - raise Exception( 'No such job runner: %s. Please double-check the value of start_job_runners in universe_wsgi.ini' % runner ) # Check for deprecated options. for key in self.config_dict.keys(): if key in self.deprecated_options: --- a/lib/galaxy/jobs/__init__.py Fri Feb 11 09:27:10 2011 -0500 +++ b/lib/galaxy/jobs/__init__.py Fri Feb 11 13:12:27 2011 -0500 @@ -1015,44 +1015,58 @@ start_job_runners.extend( app.config.start_job_runners.split(",") ) if app.config.use_tasked_jobs: start_job_runners.append("tasks") - for runner_name in start_job_runners: - if runner_name == "local": - import runners.local - self.job_runners[runner_name] = runners.local.LocalJobRunner( app ) - elif runner_name == "tasks": - import runners.tasks - self.job_runners[runner_name] = runners.tasks.TaskedJobRunner( app ) - elif runner_name == "pbs": - import runners.pbs - self.job_runners[runner_name] = runners.pbs.PBSJobRunner( app ) - elif runner_name == "sge": - import runners.sge - self.job_runners[runner_name] = runners.sge.SGEJobRunner( app ) - elif runner_name == "drmaa": - import runners.drmaa - self.job_runners[runner_name] = runners.drmaa.DRMAAJobRunner( app ) - else: - log.error( "Unable to start unknown job runner: '%s'" %runner_name ) + for name in start_job_runners: + self._load_plugin( name ) + + def _load_plugin( self, name ): + module_name = 'galaxy.jobs.runners.' + name + try: + module = __import__( module_name ) + except: + log.exception( 'Job runner is not loadable: %s' % module_name ) + return + for comp in module_name.split( "." )[1:]: + module = getattr( module, comp ) + if '__all__' not in dir( module ): + log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name ) + return + for obj in module.__all__: + display_name = ':'.join( ( module_name, obj ) ) + runner = getattr( module, obj ) + self.job_runners[obj] = runner( self.app ) + log.debug( 'Loaded job runner: %s' % display_name ) def put( self, job_wrapper ): - if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper): - runner_name = "tasks" - log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) - self.job_runners[runner_name].put( job_wrapper ) - else: - runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0] - log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) - self.job_runners[runner_name].put( job_wrapper ) + try: + if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper): + runner_name = "tasks" + log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) + self.job_runners[runner_name].put( job_wrapper ) + else: + runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0] + log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) + self.job_runners[runner_name].put( job_wrapper ) + except KeyError: + log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) + job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' ) def stop( self, job ): runner_name = ( job.job_runner_name.split(":", 1) )[0] log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) ) - self.job_runners[runner_name].stop_job( job ) + try: + self.job_runners[runner_name].stop_job( job ) + except KeyError: + log.error( 'stop(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) + # Job and output dataset states have already been updated, so nothing is done here. def recover( self, job, job_wrapper ): runner_name = ( job.job_runner_name.split(":", 1) )[0] log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) ) - self.job_runners[runner_name].recover( job, job_wrapper ) + try: + self.job_runners[runner_name].recover( job, job_wrapper ) + except KeyError: + log.error( 'recover(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) + job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' ) def shutdown( self ): for runner in self.job_runners.itervalues(): --- a/lib/galaxy/jobs/runners/drmaa.py Fri Feb 11 09:27:10 2011 -0500 +++ b/lib/galaxy/jobs/runners/drmaa.py Fri Feb 11 13:12:27 2011 -0500 @@ -8,29 +8,29 @@ import pkg_resources -try: - if sys.version_info[:2] == ( 2, 4 ): - pkg_resources.require( "ctypes" ) - pkg_resources.require( "drmaa" ) - drmaa = __import__( "drmaa" ) -except Exception, e: - drmaa = str( e ) +if sys.version_info[:2] == ( 2, 4 ): + pkg_resources.require( "ctypes" ) +pkg_resources.require( "drmaa" ) +# We foolishly named this file the same as the name exported by the drmaa +# library... 'import drmaa' import itself. +drmaa = __import__( "drmaa" ) log = logging.getLogger( __name__ ) -if type( drmaa ) != str: - drmaa_state = { - drmaa.JobState.UNDETERMINED: 'process status cannot be determined', - drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active', - drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold', - drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold', - drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold', - drmaa.JobState.RUNNING: 'job is running', - drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended', - drmaa.JobState.USER_SUSPENDED: 'job is user suspended', - drmaa.JobState.DONE: 'job finished normally', - drmaa.JobState.FAILED: 'job finished, but failed', - } +__all__ = [ 'DRMAAJobRunner' ] + +drmaa_state = { + drmaa.JobState.UNDETERMINED: 'process status cannot be determined', + drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active', + drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold', + drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold', + drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold', + drmaa.JobState.RUNNING: 'job is running', + drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended', + drmaa.JobState.USER_SUSPENDED: 'job is user suspended', + drmaa.JobState.DONE: 'job finished normally', + drmaa.JobState.FAILED: 'job finished, but failed', +} drm_template = """#!/bin/sh #$ -S /bin/sh @@ -70,8 +70,6 @@ def __init__( self, app ): """Initialize this job runner and start the monitor thread""" # Check if drmaa was importable, fail if not - if type( drmaa ) == str: - raise Exception( "DRMAAJobRunner requires drmaa module which could not be loaded: %s" % drmaa ) self.app = app self.sa_session = app.model.context # 'watched' and 'queue' are both used to keep track of jobs to watch. --- a/lib/galaxy/jobs/runners/local.py Fri Feb 11 09:27:10 2011 -0500 +++ b/lib/galaxy/jobs/runners/local.py Fri Feb 11 13:12:27 2011 -0500 @@ -12,6 +12,8 @@ log = logging.getLogger( __name__ ) +__all__ = [ 'LocalJobRunner' ] + class LocalJobRunner( BaseJobRunner ): """ Job runner backed by a finite pool of worker threads. FIFO scheduling --- a/lib/galaxy/jobs/runners/pbs.py Fri Feb 11 09:27:10 2011 -0500 +++ b/lib/galaxy/jobs/runners/pbs.py Fri Feb 11 13:12:27 2011 -0500 @@ -11,14 +11,28 @@ import pkg_resources +egg_message = """ + +The 'pbs' runner depends on 'pbs_python' which is not installed or not +configured properly. Galaxy's "scramble" system should make this installation +simple, please follow the instructions found at: + + http://bitbucket.org/galaxy/galaxy-central/wiki/Config/Cluster + +Additional errors may follow: +%s +""" + try: pkg_resources.require( "pbs_python" ) - pbs = __import__( "pbs" ) -except: - pbs = None + import pbs +except Exception, e: + raise Exception( egg_message % str( e ) ) log = logging.getLogger( __name__ ) +__all__ = [ 'PBSJobRunner' ] + pbs_template = """#!/bin/sh GALAXY_LIB="%s" if [ "$GALAXY_LIB" != "None" ]; then --- a/lib/galaxy/jobs/runners/sge.py Fri Feb 11 09:27:10 2011 -0500 +++ b/lib/galaxy/jobs/runners/sge.py Fri Feb 11 13:12:27 2011 -0500 @@ -8,27 +8,42 @@ import pkg_resources +egg_message = """ + +The 'sge' runner depends on 'DRMAA_python' which is not installed. Galaxy's +"scramble" system should make this installation simple, please follow the +instructions found at: + + http://bitbucket.org/galaxy/galaxy-central/wiki/Config/Cluster + +Additional errors may follow: +%s +""" + + try: pkg_resources.require( "DRMAA_python" ) - DRMAA = __import__( "DRMAA" ) -except: - DRMAA = None + import DRMAA +except Exception, e: + raise Exception( egg_message % str( e ) ) + log = logging.getLogger( __name__ ) -if DRMAA is not None: - DRMAA_state = { - DRMAA.Session.UNDETERMINED: 'process status cannot be determined', - DRMAA.Session.QUEUED_ACTIVE: 'job is queued and waiting to be scheduled', - DRMAA.Session.SYSTEM_ON_HOLD: 'job is queued and in system hold', - DRMAA.Session.USER_ON_HOLD: 'job is queued and in user hold', - DRMAA.Session.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold', - DRMAA.Session.RUNNING: 'job is running', - DRMAA.Session.SYSTEM_SUSPENDED: 'job is system suspended', - DRMAA.Session.USER_SUSPENDED: 'job is user suspended', - DRMAA.Session.DONE: 'job finished normally', - DRMAA.Session.FAILED: 'job finished, but failed', - } +__all__ = [ 'SGEJobRunner' ] + +DRMAA_state = { + DRMAA.Session.UNDETERMINED: 'process status cannot be determined', + DRMAA.Session.QUEUED_ACTIVE: 'job is queued and waiting to be scheduled', + DRMAA.Session.SYSTEM_ON_HOLD: 'job is queued and in system hold', + DRMAA.Session.USER_ON_HOLD: 'job is queued and in user hold', + DRMAA.Session.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold', + DRMAA.Session.RUNNING: 'job is running', + DRMAA.Session.SYSTEM_SUSPENDED: 'job is system suspended', + DRMAA.Session.USER_SUSPENDED: 'job is user suspended', + DRMAA.Session.DONE: 'job finished normally', + DRMAA.Session.FAILED: 'job finished, but failed', +} sge_template = """#!/bin/sh #$ -S /bin/sh @@ -67,9 +82,6 @@ STOP_SIGNAL = object() def __init__( self, app ): """Initialize this job runner and start the monitor thread""" - # Check if SGE was importable, fail if not - if DRMAA is None: - raise Exception( "SGEJobRunner requires DRMAA_python which was not found" ) self.app = app self.sa_session = app.model.context # 'watched' and 'queue' are both used to keep track of jobs to watch. --- a/lib/galaxy/jobs/runners/tasks.py Fri Feb 11 09:27:10 2011 -0500 +++ b/lib/galaxy/jobs/runners/tasks.py Fri Feb 11 13:12:27 2011 -0500 @@ -12,6 +12,8 @@ log = logging.getLogger( __name__ ) +__all__ = [ 'TaskedJobRunner' ] + class TaskedJobRunner( object ): """ Job runner backed by a finite pool of worker threads. FIFO scheduling Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
Bitbucket