6 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/229064ed6add/ Changeset: 229064ed6add User: natefoo Date: 2014-01-07 21:13:01 Summary: Allow handlers to control which runner plugins they will load. Affected #: 2 files diff -r 968389346b3491bf1a90cf359ff258075816b1e8 -r 229064ed6addac469460190415360e553ae91a0f lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -86,6 +86,7 @@ self.app = app self.runner_plugins = [] self.handlers = {} + self.handler_runner_plugins = {} self.default_handler_id = None self.destinations = {} self.destination_tags = {} @@ -138,6 +139,10 @@ else: log.debug("Read definition for handler '%s'" % id) self.handlers[id] = (id,) + for plugin in handler.findall('plugin'): + if id not in self.handler_runner_plugins: + self.handler_runner_plugins[id] = [] + self.handler_runner_plugins[id].append( plugin.get('id') ) if handler.get('tags', None) is not None: for tag in [ x.strip() for x in handler.get('tags').split(',') ]: if tag in self.handlers: @@ -420,13 +425,19 @@ """ return self.destinations.get(id_or_tag, None) - def get_job_runner_plugins(self): + def get_job_runner_plugins(self, handler_id): """Load all configured job runner plugins :returns: list of job runner plugins """ rval = {} - for runner in self.runner_plugins: + if handler_id in self.handler_runner_plugins: + plugins_to_load = [ rp for rp in self.runner_plugins if rp['id'] in self.handler_runner_plugins[handler_id] ] + log.info( "Handler '%s' will load specified runner plugins: %s", handler_id, ', '.join( [ rp['id'] for rp in plugins_to_load ] ) ) + else: + plugins_to_load = self.runner_plugins + log.info( "Handler '%s' will load all configured runner plugins", handler_id ) + for runner in plugins_to_load: class_names = [] module = None id = runner['id'] @@ -477,7 +488,7 @@ try: rval[id] = runner_class( self.app, runner[ 'workers' ], **runner.get( 'kwds', {} ) ) except TypeError: - log.warning( "Job runner '%s:%s' has not been converted to a new-style runner" % ( module_name, class_name ) ) + log.exception( "Job runner '%s:%s' has not been converted to a new-style runner or encountered TypeError on load" % ( module_name, class_name ) ) rval[id] = runner_class( self.app ) log.debug( "Loaded job runner '%s:%s' as '%s'" % ( module_name, class_name, id ) ) return rval diff -r 968389346b3491bf1a90cf359ff258075816b1e8 -r 229064ed6addac469460190415360e553ae91a0f lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -565,7 +565,7 @@ class DefaultJobDispatcher( object ): def __init__( self, app ): self.app = app - self.job_runners = self.app.job_config.get_job_runner_plugins() + self.job_runners = self.app.job_config.get_job_runner_plugins( self.app.config.server_name ) # Once plugins are loaded, all job destinations that were created from # URLs can have their URL params converted to the destination's param # dict by the plugin. https://bitbucket.org/galaxy/galaxy-central/commits/681643c22fed/ Changeset: 681643c22fed User: natefoo Date: 2014-01-07 21:18:11 Summary: Allow runners plugins to accept params, provide some simple load-time validation logic for params. Affected #: 1 file diff -r 229064ed6addac469460190415360e553ae91a0f -r 681643c22fedeac632567ae6bcf204616d871f07 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -22,13 +22,39 @@ STOP_SIGNAL = object() + +class RunnerParams( object ): + + def __init__( self, specs = None, params = None ): + self.specs = specs or dict() + self.params = params or dict() + for name, value in self.params.items(): + assert name in self.specs, 'Invalid job runner parameter for this plugin: %s' % name + if 'map' in self.specs[ name ]: + try: + self.params[ name ] = self.specs[ name ][ 'map' ]( value ) + except Exception, e: + raise Exception( 'Job runner parameter "%s" value "%s" could not be converted to the correct type: %s' % ( name, value, e ) ) + if 'valid' in self.specs[ name ]: + assert self.specs[ name ][ 'valid' ]( value ), 'Job runner parameter %s failed validation' % name + + def __getattr__( self, name ): + return self.params.get( name, self.specs[ name ][ 'default' ] ) + + class BaseJobRunner( object ): - def __init__( self, app, nworkers ): + def __init__( self, app, nworkers, **kwargs ): """Start the job runner """ self.app = app self.sa_session = app.model.context self.nworkers = nworkers + runner_param_specs = dict( recheck_missing_job_retries = dict( map = int, valid = lambda x: x >= 0, default = 0 ) ) + if 'runner_param_specs' in kwargs: + runner_param_specs.update( kwargs.pop( 'runner_param_specs' ) ) + if kwargs: + log.debug( 'Loading %s with params: %s', self.runner_name, kwargs ) + self.runner_params = RunnerParams( specs = runner_param_specs, params = kwargs ) def _init_worker_threads(self): """Start ``nworkers`` worker threads. @@ -115,7 +141,7 @@ job_wrapper.cleanup() return False elif job_state != model.Job.states.QUEUED: - log.info( "(%d) Job is in state %s, skipping execution" % ( job_id, job_state ) ) + log.info( "(%s) Job is in state %s, skipping execution" % ( job_id, job_state ) ) # cleanup may not be safe in all states return False @@ -287,8 +313,8 @@ to the correct methods (queue, finish, cleanup) at appropriate times.. """ - def __init__( self, app, nworkers ): - super( AsynchronousJobRunner, self ).__init__( app, nworkers ) + def __init__( self, app, nworkers, **kwargs ): + super( AsynchronousJobRunner, self ).__init__( app, nworkers, **kwargs ) # 'watched' and 'queue' are both used to keep track of jobs to watch. # 'queue' is used to add new watched jobs, and can be called from # any thread (usually by the 'queue_job' method). 'watched' must only https://bitbucket.org/galaxy/galaxy-central/commits/e67776961e48/ Changeset: e67776961e48 User: natefoo Date: 2014-01-07 21:21:50 Summary: Make job terminal state logic somewhat configurable via runner params, update DRMAA runner plugin changes for configurable terminal state handling. Also allow the drmaa plugin to read the drmaa library path from job_conf.xml rather than the $DRMAA_LIBRARY_PATH environment variable. Affected #: 2 files diff -r 681643c22fedeac632567ae6bcf204616d871f07 -r e67776961e48dca45850c95b0f053178fd0e61b6 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -252,6 +252,10 @@ options.update(**kwds) return job_script(**options) + def _complete_terminal_job( self, ajs, **kwargs ): + if ajs.job_wrapper.get_state() != model.Job.states.DELETED: + self.work_queue.put( ( self.finish_job, ajs ) ) + class AsynchronousJobState( object ): """ diff -r 681643c22fedeac632567ae6bcf204616d871f07 -r e67776961e48dca45850c95b0f053178fd0e61b6 lib/galaxy/jobs/runners/drmaa.py --- a/lib/galaxy/jobs/runners/drmaa.py +++ b/lib/galaxy/jobs/runners/drmaa.py @@ -16,27 +16,12 @@ from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner eggs.require( "drmaa" ) -# We foolishly named this file the same as the name exported by the drmaa -# library... 'import drmaa' imports itself. -drmaa = __import__( "drmaa" ) log = logging.getLogger( __name__ ) __all__ = [ 'DRMAAJobRunner' ] -drmaa_state = { - drmaa.JobState.UNDETERMINED: 'process status cannot be determined', - drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active', - drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold', - drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold', - drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold', - drmaa.JobState.RUNNING: 'job is running', - drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended', - drmaa.JobState.USER_SUSPENDED: 'job is user suspended', - drmaa.JobState.DONE: 'job finished normally', - drmaa.JobState.FAILED: 'job finished, but failed', -} - +drmaa = None DRMAA_jobTemplate_attributes = [ 'args', 'remoteCommand', 'outputPath', 'errorPath', 'nativeSpecification', 'jobName', 'email', 'project' ] @@ -48,8 +33,50 @@ """ runner_name = "DRMAARunner" - def __init__( self, app, nworkers ): + def __init__( self, app, nworkers, **kwargs ): """Start the job runner""" + + global drmaa + + runner_param_specs = dict( + drmaa_library_path = dict( map = str, default = os.environ.get( 'DRMAA_LIBRARY_PATH', None ) ), + invalidjobexception_state = dict( map = str, valid = lambda x: x in ( model.Job.states.OK, model.Job.states.ERROR ), default = model.Job.states.OK ), + invalidjobexception_retries = dict( map = int, valid = lambda x: int >= 0, default = 0 ), + internalexception_state = dict( map = str, valid = lambda x: x in ( model.Job.states.OK, model.Job.states.ERROR ), default = model.Job.states.OK ), + internalexception_retries = dict( map = int, valid = lambda x: int >= 0, default = 0 ) ) + + if 'runner_param_specs' not in kwargs: + kwargs[ 'runner_param_specs' ] = dict() + kwargs[ 'runner_param_specs' ].update( runner_param_specs ) + + super( DRMAAJobRunner, self ).__init__( app, nworkers, **kwargs ) + + # This allows multiple drmaa runners (although only one per handler) in the same job config file + if 'drmaa_library_path' in kwargs: + log.info( 'Overriding DRMAA_LIBRARY_PATH due to runner plugin parameter: %s', self.runner_params.drmaa_library_path ) + os.environ['DRMAA_LIBRARY_PATH'] = self.runner_params.drmaa_library_path + + # We foolishly named this file the same as the name exported by the drmaa + # library... 'import drmaa' imports itself. + drmaa = __import__( "drmaa" ) + + # Subclasses may need access to state constants + self.drmaa_job_states = drmaa.JobState + + # Descriptive state strings pulled from the drmaa lib itself + self.drmaa_job_state_strings = { + drmaa.JobState.UNDETERMINED: 'process status cannot be determined', + drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active', + drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold', + drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold', + drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold', + drmaa.JobState.RUNNING: 'job is running', + drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended', + drmaa.JobState.USER_SUSPENDED: 'job is user suspended', + drmaa.JobState.DONE: 'job finished normally', + drmaa.JobState.FAILED: 'job finished, but failed', + } + self.ds = drmaa.Session() self.ds.initialize() @@ -58,7 +85,6 @@ self.external_killJob_script = app.config.drmaa_external_killjob_script self.userid = None - super( DRMAAJobRunner, self ).__init__( app, nworkers ) self._init_monitor_thread() self._init_worker_threads() @@ -175,6 +201,20 @@ # Add to our 'queue' of jobs to monitor self.monitor_queue.put( ajs ) + def _complete_terminal_job( self, ajs, drmaa_state, **kwargs ): + """ + Handle a job upon its termination in the DRM. This method is meant to + be overridden by subclasses to improve post-mortem and reporting of + failures. + """ + if drmaa_state == drmaa.JobState.FAILED: + if ajs.job_wrapper.get_state() != model.Job.states.DELETED: + ajs.stop_job = False + ajs.fail_message = "The cluster DRM system terminated this job" + self.work_queue.put( ( self.fail_job, ajs ) ) + elif drmaa_state == drmaa.JobState.DONE: + super( DRMAAJobRunner, self )._complete_terminal_job( ajs ) + def check_watched_items( self ): """ Called by the monitor thread to look at each watched job and deal @@ -188,16 +228,27 @@ try: assert external_job_id not in ( None, 'None' ), '(%s/%s) Invalid job id' % ( galaxy_id_tag, external_job_id ) state = self.ds.jobStatus( external_job_id ) - # InternalException was reported to be necessary on some DRMs, but - # this could cause failures to be detected as completion! Please - # report if you experience problems with this. - except ( drmaa.InvalidJobException, drmaa.InternalException ), e: - # we should only get here if an orphaned job was put into the queue at app startup - log.info( "(%s/%s) job left DRM queue with following message: %s" % ( galaxy_id_tag, external_job_id, e ) ) - self.work_queue.put( ( self.finish_job, ajs ) ) + except ( drmaa.InternalException, drmaa.InvalidJobException ), e: + ecn = e.__class__.__name__ + retry_param = ecn.lower() + '_retries' + state_param = ecn.lower() + '_state' + retries = getattr( ajs, retry_param, 0 ) + if self.runner_params[ retry_param ] > 0: + if retries < self.runner_params[ retry_param ]: + # will retry check on next iteration + setattr( ajs, retry_param, retries + 1 ) + continue + if self.runner_params[ state_param ] == model.Job.states.OK: + log.info( "(%s/%s) job left DRM queue with following message: %s", galaxy_id_tag, external_job_id, e ) + self.work_queue.put( ( self.finish_job, ajs ) ) + elif self.runner_params[ state_param ] == model.Job.states.ERROR: + log.info( "(%s/%s) job check resulted in %s after %s tries: %s", galaxy_id_tag, external_job_id, ecn, retries, e ) + self.work_queue.put( ( self.fail_job, ajs ) ) + else: + raise Exception( "%s is set to an invalid value (%s), this should not be possible. See galaxy.jobs.drmaa.__init__()", state_param, self.runner_params[ state_param ] ) continue except drmaa.DrmCommunicationException, e: - log.warning( "(%s/%s) unable to communicate with DRM: %s" % ( galaxy_id_tag, external_job_id, e )) + log.warning( "(%s/%s) unable to communicate with DRM: %s", galaxy_id_tag, external_job_id, e ) new_watched.append( ajs ) continue except Exception, e: @@ -208,19 +259,12 @@ self.work_queue.put( ( self.fail_job, ajs ) ) continue if state != old_state: - log.debug( "(%s/%s) state change: %s" % ( galaxy_id_tag, external_job_id, drmaa_state[state] ) ) + log.debug( "(%s/%s) state change: %s" % ( galaxy_id_tag, external_job_id, self.drmaa_job_state_strings[state] ) ) if state == drmaa.JobState.RUNNING and not ajs.running: ajs.running = True ajs.job_wrapper.change_state( model.Job.states.RUNNING ) - if state == drmaa.JobState.FAILED: - if ajs.job_wrapper.get_state() != model.Job.states.DELETED: - ajs.stop_job = False - ajs.fail_message = "The cluster DRM system terminated this job" - self.work_queue.put( ( self.fail_job, ajs ) ) - continue - if state == drmaa.JobState.DONE: - if ajs.job_wrapper.get_state() != model.Job.states.DELETED: - self.work_queue.put( ( self.finish_job, ajs ) ) + if state in ( drmaa.JobState.FAILED, drmaa.JobState.DONE ): + self._complete_terminal_job( ajs, drmaa_state = state ) continue ajs.old_state = state new_watched.append( ajs ) https://bitbucket.org/galaxy/galaxy-central/commits/d3df5b5e91f3/ Changeset: d3df5b5e91f3 User: natefoo Date: 2014-01-07 21:23:11 Summary: Add a Slurm runner (subclassed from the drmaa runner), adds some logic for determining/reporting the reason of job failure and requeuing jobs that failed due to node failure. Affected #: 1 file diff -r e67776961e48dca45850c95b0f053178fd0e61b6 -r d3df5b5e91f3c42cffdba83f0ea9300b5caa172a lib/galaxy/jobs/runners/slurm.py --- /dev/null +++ b/lib/galaxy/jobs/runners/slurm.py @@ -0,0 +1,57 @@ +""" +SLURM job control via the DRMAA API. +""" + +import time +import logging +import subprocess + +from galaxy import model +from galaxy.jobs.runners.drmaa import DRMAAJobRunner + +log = logging.getLogger( __name__ ) + +__all__ = [ 'SlurmJobRunner' ] + + +class SlurmJobRunner( DRMAAJobRunner ): + runner_name = "SlurmRunner" + + def _complete_terminal_job( self, ajs, drmaa_state, **kwargs ): + def __get_jobinfo(): + scontrol_out = subprocess.check_output( ( 'scontrol', '-o', 'show', 'job', ajs.job_id ) ) + return dict( [ out_param.split( '=', 1 ) for out_param in scontrol_out.split() ] ) + if drmaa_state == self.drmaa_job_states.FAILED: + try: + job_info = __get_jobinfo() + sleep = 1 + while job_info['JobState'] == 'COMPLETING': + log.debug( '(%s/%s) Waiting %s seconds for failed job to exit COMPLETING state for post-mortem', ajs.job_wrapper.get_id_tag(), ajs.job_id, sleep ) + time.sleep( sleep ) + sleep *= 2 + if sleep > 64: + ajs.fail_message = "This job failed and the system timed out while trying to determine the cause of the failure." + break + job_info = __get_jobinfo() + if job_info['JobState'] == 'TIMEOUT': + ajs.fail_message = "This job was terminated because it ran longer than the maximum allowed job run time." + elif job_info['JobState'] == 'NODE_FAIL': + log.warning( '(%s/%s) Job failed due to node failure, attempting resubmission', ajs.job_wrapper.get_id_tag(), ajs.job_id ) + ajs.job_wrapper.change_state( model.Job.states.QUEUED, info = 'Job was resubmitted due to node failure' ) + try: + self.queue_job( ajs.job_wrapper ) + return + except: + ajs.fail_message = "This job failed due to a cluster node failure, and an attempt to resubmit the job failed." + elif job_info['JobState'] == 'CANCELLED': + ajs.fail_message = "This job failed because it was cancelled by an administrator." + else: + ajs.fail_message = "This job failed for reasons that could not be determined." + ajs.fail_message += '\nPlease click the bug icon to report this problem if you need help.' + ajs.stop_job = False + self.work_queue.put( ( self.fail_job, ajs ) ) + except Exception, e: + log.exception( '(%s/%s) Unable to inspect failed slurm job using scontrol, job will be unconditionally failed: %s', ajs.job_wrapper.get_id_tag(), ajs.job_id, e ) + super( SlurmJobRunner, self )._complete_terminal_job( ajs, drmaa_state = drmaa_state ) + elif drmaa_state == self.drmaa_job_states.DONE: + super( SlurmJobRunner, self )._complete_terminal_job( ajs, drmaa_state = drmaa_state ) https://bitbucket.org/galaxy/galaxy-central/commits/aac6bd9c589b/ Changeset: aac6bd9c589b User: natefoo Date: 2014-01-07 21:23:38 Summary: Sample job_conf.xml for new features introduced. Affected #: 1 file diff -r d3df5b5e91f3c42cffdba83f0ea9300b5caa172a -r aac6bd9c589bf43ec5835c901911704336932d1d job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -6,7 +6,19 @@ --><plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner"/><plugin id="pbs" type="runner" load="galaxy.jobs.runners.pbs:PBSJobRunner" workers="2"/> - <plugin id="drmaa" type="runner" load="galaxy.jobs.runners.drmaa:DRMAAJobRunner"/> + <plugin id="drmaa" type="runner" load="galaxy.jobs.runners.drmaa:DRMAAJobRunner"> + <!-- Different DRMs handle successfully completed jobs differently, + these options can be changed to handle such differences and + are explained in detail on the Galaxy wiki. Defaults are shown --> + <param id="invalidjobexception_state">ok</param> + <param id="invalidjobexception_retries">0</param> + <param id="internalexception_state">ok</param> + <param id="internalexception_retries">0</param> + </plugin> + <plugin id="sge" type="runner" load="galaxy.jobs.runners.drmaa:DRMAAJobRunner"> + <!-- Override the $DRMAA_LIBRARY_PATH environment variable --> + <param id="drmaa_library_path">/sge/lib/libdrmaa.so</param> + </plugin><plugin id="lwr" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner"><!-- More information on LWR can be found at https://lwr.readthedocs.org --><!-- Uncomment following line to use libcurl to perform HTTP calls (defaults to urllib) --> @@ -14,6 +26,7 @@ </plugin><plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /><plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> + <plugin id="slurm" type="runner" load="galaxy.jobs.runners.slurm:SlurmJobRunner" /></plugins><handlers default="handlers"><!-- Additional job handlers - the id should match the name of a @@ -21,6 +34,15 @@ --><handler id="handler0" tags="handlers"/><handler id="handler1" tags="handlers"/> + <!-- Handlers will load all plugins defined in the <plugins> collection + above by default, but can be limited to a subset using <plugin> + tags. This is useful for heterogenous environments where the DRMAA + plugin would need to be loaded more than once with different + configs. + --> + <handler id="sge_handler"> + <plugin id="sge"/> + </handler><handler id="special_handler0" tags="special_handlers"/><handler id="special_handler1" tags="special_handlers"/><handler id="trackster_handler"/> https://bitbucket.org/galaxy/galaxy-central/commits/2d525856226f/ Changeset: 2d525856226f User: natefoo Date: 2014-01-09 18:01:19 Summary: Merged in natefoo/galaxy-central (pull request #290) Configurable terminal job state, Slurm enhancements Affected #: 6 files diff -r 10ac38a448152b9a927c34f36334724dfd173bfc -r 2d525856226f0115a5f8dea5e7fedfae2fcbcf8a job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -6,7 +6,19 @@ --><plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner"/><plugin id="pbs" type="runner" load="galaxy.jobs.runners.pbs:PBSJobRunner" workers="2"/> - <plugin id="drmaa" type="runner" load="galaxy.jobs.runners.drmaa:DRMAAJobRunner"/> + <plugin id="drmaa" type="runner" load="galaxy.jobs.runners.drmaa:DRMAAJobRunner"> + <!-- Different DRMs handle successfully completed jobs differently, + these options can be changed to handle such differences and + are explained in detail on the Galaxy wiki. Defaults are shown --> + <param id="invalidjobexception_state">ok</param> + <param id="invalidjobexception_retries">0</param> + <param id="internalexception_state">ok</param> + <param id="internalexception_retries">0</param> + </plugin> + <plugin id="sge" type="runner" load="galaxy.jobs.runners.drmaa:DRMAAJobRunner"> + <!-- Override the $DRMAA_LIBRARY_PATH environment variable --> + <param id="drmaa_library_path">/sge/lib/libdrmaa.so</param> + </plugin><plugin id="lwr" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner"><!-- More information on LWR can be found at https://lwr.readthedocs.org --><!-- Uncomment following line to use libcurl to perform HTTP calls (defaults to urllib) --> @@ -14,6 +26,7 @@ </plugin><plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /><plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> + <plugin id="slurm" type="runner" load="galaxy.jobs.runners.slurm:SlurmJobRunner" /></plugins><handlers default="handlers"><!-- Additional job handlers - the id should match the name of a @@ -21,6 +34,15 @@ --><handler id="handler0" tags="handlers"/><handler id="handler1" tags="handlers"/> + <!-- Handlers will load all plugins defined in the <plugins> collection + above by default, but can be limited to a subset using <plugin> + tags. This is useful for heterogenous environments where the DRMAA + plugin would need to be loaded more than once with different + configs. + --> + <handler id="sge_handler"> + <plugin id="sge"/> + </handler><handler id="special_handler0" tags="special_handlers"/><handler id="special_handler1" tags="special_handlers"/><handler id="trackster_handler"/> diff -r 10ac38a448152b9a927c34f36334724dfd173bfc -r 2d525856226f0115a5f8dea5e7fedfae2fcbcf8a lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -86,6 +86,7 @@ self.app = app self.runner_plugins = [] self.handlers = {} + self.handler_runner_plugins = {} self.default_handler_id = None self.destinations = {} self.destination_tags = {} @@ -138,6 +139,10 @@ else: log.debug("Read definition for handler '%s'" % id) self.handlers[id] = (id,) + for plugin in handler.findall('plugin'): + if id not in self.handler_runner_plugins: + self.handler_runner_plugins[id] = [] + self.handler_runner_plugins[id].append( plugin.get('id') ) if handler.get('tags', None) is not None: for tag in [ x.strip() for x in handler.get('tags').split(',') ]: if tag in self.handlers: @@ -420,13 +425,19 @@ """ return self.destinations.get(id_or_tag, None) - def get_job_runner_plugins(self): + def get_job_runner_plugins(self, handler_id): """Load all configured job runner plugins :returns: list of job runner plugins """ rval = {} - for runner in self.runner_plugins: + if handler_id in self.handler_runner_plugins: + plugins_to_load = [ rp for rp in self.runner_plugins if rp['id'] in self.handler_runner_plugins[handler_id] ] + log.info( "Handler '%s' will load specified runner plugins: %s", handler_id, ', '.join( [ rp['id'] for rp in plugins_to_load ] ) ) + else: + plugins_to_load = self.runner_plugins + log.info( "Handler '%s' will load all configured runner plugins", handler_id ) + for runner in plugins_to_load: class_names = [] module = None id = runner['id'] @@ -477,7 +488,7 @@ try: rval[id] = runner_class( self.app, runner[ 'workers' ], **runner.get( 'kwds', {} ) ) except TypeError: - log.warning( "Job runner '%s:%s' has not been converted to a new-style runner" % ( module_name, class_name ) ) + log.exception( "Job runner '%s:%s' has not been converted to a new-style runner or encountered TypeError on load" % ( module_name, class_name ) ) rval[id] = runner_class( self.app ) log.debug( "Loaded job runner '%s:%s' as '%s'" % ( module_name, class_name, id ) ) return rval diff -r 10ac38a448152b9a927c34f36334724dfd173bfc -r 2d525856226f0115a5f8dea5e7fedfae2fcbcf8a lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -565,7 +565,7 @@ class DefaultJobDispatcher( object ): def __init__( self, app ): self.app = app - self.job_runners = self.app.job_config.get_job_runner_plugins() + self.job_runners = self.app.job_config.get_job_runner_plugins( self.app.config.server_name ) # Once plugins are loaded, all job destinations that were created from # URLs can have their URL params converted to the destination's param # dict by the plugin. diff -r 10ac38a448152b9a927c34f36334724dfd173bfc -r 2d525856226f0115a5f8dea5e7fedfae2fcbcf8a lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -22,13 +22,39 @@ STOP_SIGNAL = object() + +class RunnerParams( object ): + + def __init__( self, specs = None, params = None ): + self.specs = specs or dict() + self.params = params or dict() + for name, value in self.params.items(): + assert name in self.specs, 'Invalid job runner parameter for this plugin: %s' % name + if 'map' in self.specs[ name ]: + try: + self.params[ name ] = self.specs[ name ][ 'map' ]( value ) + except Exception, e: + raise Exception( 'Job runner parameter "%s" value "%s" could not be converted to the correct type: %s' % ( name, value, e ) ) + if 'valid' in self.specs[ name ]: + assert self.specs[ name ][ 'valid' ]( value ), 'Job runner parameter %s failed validation' % name + + def __getattr__( self, name ): + return self.params.get( name, self.specs[ name ][ 'default' ] ) + + class BaseJobRunner( object ): - def __init__( self, app, nworkers ): + def __init__( self, app, nworkers, **kwargs ): """Start the job runner """ self.app = app self.sa_session = app.model.context self.nworkers = nworkers + runner_param_specs = dict( recheck_missing_job_retries = dict( map = int, valid = lambda x: x >= 0, default = 0 ) ) + if 'runner_param_specs' in kwargs: + runner_param_specs.update( kwargs.pop( 'runner_param_specs' ) ) + if kwargs: + log.debug( 'Loading %s with params: %s', self.runner_name, kwargs ) + self.runner_params = RunnerParams( specs = runner_param_specs, params = kwargs ) def _init_worker_threads(self): """Start ``nworkers`` worker threads. @@ -115,7 +141,7 @@ job_wrapper.cleanup() return False elif job_state != model.Job.states.QUEUED: - log.info( "(%d) Job is in state %s, skipping execution" % ( job_id, job_state ) ) + log.info( "(%s) Job is in state %s, skipping execution" % ( job_id, job_state ) ) # cleanup may not be safe in all states return False @@ -226,6 +252,10 @@ options.update(**kwds) return job_script(**options) + def _complete_terminal_job( self, ajs, **kwargs ): + if ajs.job_wrapper.get_state() != model.Job.states.DELETED: + self.work_queue.put( ( self.finish_job, ajs ) ) + class AsynchronousJobState( object ): """ @@ -287,8 +317,8 @@ to the correct methods (queue, finish, cleanup) at appropriate times.. """ - def __init__( self, app, nworkers ): - super( AsynchronousJobRunner, self ).__init__( app, nworkers ) + def __init__( self, app, nworkers, **kwargs ): + super( AsynchronousJobRunner, self ).__init__( app, nworkers, **kwargs ) # 'watched' and 'queue' are both used to keep track of jobs to watch. # 'queue' is used to add new watched jobs, and can be called from # any thread (usually by the 'queue_job' method). 'watched' must only diff -r 10ac38a448152b9a927c34f36334724dfd173bfc -r 2d525856226f0115a5f8dea5e7fedfae2fcbcf8a lib/galaxy/jobs/runners/drmaa.py --- a/lib/galaxy/jobs/runners/drmaa.py +++ b/lib/galaxy/jobs/runners/drmaa.py @@ -16,27 +16,12 @@ from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner eggs.require( "drmaa" ) -# We foolishly named this file the same as the name exported by the drmaa -# library... 'import drmaa' imports itself. -drmaa = __import__( "drmaa" ) log = logging.getLogger( __name__ ) __all__ = [ 'DRMAAJobRunner' ] -drmaa_state = { - drmaa.JobState.UNDETERMINED: 'process status cannot be determined', - drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active', - drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold', - drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold', - drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold', - drmaa.JobState.RUNNING: 'job is running', - drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended', - drmaa.JobState.USER_SUSPENDED: 'job is user suspended', - drmaa.JobState.DONE: 'job finished normally', - drmaa.JobState.FAILED: 'job finished, but failed', -} - +drmaa = None DRMAA_jobTemplate_attributes = [ 'args', 'remoteCommand', 'outputPath', 'errorPath', 'nativeSpecification', 'jobName', 'email', 'project' ] @@ -48,8 +33,50 @@ """ runner_name = "DRMAARunner" - def __init__( self, app, nworkers ): + def __init__( self, app, nworkers, **kwargs ): """Start the job runner""" + + global drmaa + + runner_param_specs = dict( + drmaa_library_path = dict( map = str, default = os.environ.get( 'DRMAA_LIBRARY_PATH', None ) ), + invalidjobexception_state = dict( map = str, valid = lambda x: x in ( model.Job.states.OK, model.Job.states.ERROR ), default = model.Job.states.OK ), + invalidjobexception_retries = dict( map = int, valid = lambda x: int >= 0, default = 0 ), + internalexception_state = dict( map = str, valid = lambda x: x in ( model.Job.states.OK, model.Job.states.ERROR ), default = model.Job.states.OK ), + internalexception_retries = dict( map = int, valid = lambda x: int >= 0, default = 0 ) ) + + if 'runner_param_specs' not in kwargs: + kwargs[ 'runner_param_specs' ] = dict() + kwargs[ 'runner_param_specs' ].update( runner_param_specs ) + + super( DRMAAJobRunner, self ).__init__( app, nworkers, **kwargs ) + + # This allows multiple drmaa runners (although only one per handler) in the same job config file + if 'drmaa_library_path' in kwargs: + log.info( 'Overriding DRMAA_LIBRARY_PATH due to runner plugin parameter: %s', self.runner_params.drmaa_library_path ) + os.environ['DRMAA_LIBRARY_PATH'] = self.runner_params.drmaa_library_path + + # We foolishly named this file the same as the name exported by the drmaa + # library... 'import drmaa' imports itself. + drmaa = __import__( "drmaa" ) + + # Subclasses may need access to state constants + self.drmaa_job_states = drmaa.JobState + + # Descriptive state strings pulled from the drmaa lib itself + self.drmaa_job_state_strings = { + drmaa.JobState.UNDETERMINED: 'process status cannot be determined', + drmaa.JobState.QUEUED_ACTIVE: 'job is queued and active', + drmaa.JobState.SYSTEM_ON_HOLD: 'job is queued and in system hold', + drmaa.JobState.USER_ON_HOLD: 'job is queued and in user hold', + drmaa.JobState.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold', + drmaa.JobState.RUNNING: 'job is running', + drmaa.JobState.SYSTEM_SUSPENDED: 'job is system suspended', + drmaa.JobState.USER_SUSPENDED: 'job is user suspended', + drmaa.JobState.DONE: 'job finished normally', + drmaa.JobState.FAILED: 'job finished, but failed', + } + self.ds = drmaa.Session() self.ds.initialize() @@ -58,7 +85,6 @@ self.external_killJob_script = app.config.drmaa_external_killjob_script self.userid = None - super( DRMAAJobRunner, self ).__init__( app, nworkers ) self._init_monitor_thread() self._init_worker_threads() @@ -175,6 +201,20 @@ # Add to our 'queue' of jobs to monitor self.monitor_queue.put( ajs ) + def _complete_terminal_job( self, ajs, drmaa_state, **kwargs ): + """ + Handle a job upon its termination in the DRM. This method is meant to + be overridden by subclasses to improve post-mortem and reporting of + failures. + """ + if drmaa_state == drmaa.JobState.FAILED: + if ajs.job_wrapper.get_state() != model.Job.states.DELETED: + ajs.stop_job = False + ajs.fail_message = "The cluster DRM system terminated this job" + self.work_queue.put( ( self.fail_job, ajs ) ) + elif drmaa_state == drmaa.JobState.DONE: + super( DRMAAJobRunner, self )._complete_terminal_job( ajs ) + def check_watched_items( self ): """ Called by the monitor thread to look at each watched job and deal @@ -188,16 +228,27 @@ try: assert external_job_id not in ( None, 'None' ), '(%s/%s) Invalid job id' % ( galaxy_id_tag, external_job_id ) state = self.ds.jobStatus( external_job_id ) - # InternalException was reported to be necessary on some DRMs, but - # this could cause failures to be detected as completion! Please - # report if you experience problems with this. - except ( drmaa.InvalidJobException, drmaa.InternalException ), e: - # we should only get here if an orphaned job was put into the queue at app startup - log.info( "(%s/%s) job left DRM queue with following message: %s" % ( galaxy_id_tag, external_job_id, e ) ) - self.work_queue.put( ( self.finish_job, ajs ) ) + except ( drmaa.InternalException, drmaa.InvalidJobException ), e: + ecn = e.__class__.__name__ + retry_param = ecn.lower() + '_retries' + state_param = ecn.lower() + '_state' + retries = getattr( ajs, retry_param, 0 ) + if self.runner_params[ retry_param ] > 0: + if retries < self.runner_params[ retry_param ]: + # will retry check on next iteration + setattr( ajs, retry_param, retries + 1 ) + continue + if self.runner_params[ state_param ] == model.Job.states.OK: + log.info( "(%s/%s) job left DRM queue with following message: %s", galaxy_id_tag, external_job_id, e ) + self.work_queue.put( ( self.finish_job, ajs ) ) + elif self.runner_params[ state_param ] == model.Job.states.ERROR: + log.info( "(%s/%s) job check resulted in %s after %s tries: %s", galaxy_id_tag, external_job_id, ecn, retries, e ) + self.work_queue.put( ( self.fail_job, ajs ) ) + else: + raise Exception( "%s is set to an invalid value (%s), this should not be possible. See galaxy.jobs.drmaa.__init__()", state_param, self.runner_params[ state_param ] ) continue except drmaa.DrmCommunicationException, e: - log.warning( "(%s/%s) unable to communicate with DRM: %s" % ( galaxy_id_tag, external_job_id, e )) + log.warning( "(%s/%s) unable to communicate with DRM: %s", galaxy_id_tag, external_job_id, e ) new_watched.append( ajs ) continue except Exception, e: @@ -208,19 +259,12 @@ self.work_queue.put( ( self.fail_job, ajs ) ) continue if state != old_state: - log.debug( "(%s/%s) state change: %s" % ( galaxy_id_tag, external_job_id, drmaa_state[state] ) ) + log.debug( "(%s/%s) state change: %s" % ( galaxy_id_tag, external_job_id, self.drmaa_job_state_strings[state] ) ) if state == drmaa.JobState.RUNNING and not ajs.running: ajs.running = True ajs.job_wrapper.change_state( model.Job.states.RUNNING ) - if state == drmaa.JobState.FAILED: - if ajs.job_wrapper.get_state() != model.Job.states.DELETED: - ajs.stop_job = False - ajs.fail_message = "The cluster DRM system terminated this job" - self.work_queue.put( ( self.fail_job, ajs ) ) - continue - if state == drmaa.JobState.DONE: - if ajs.job_wrapper.get_state() != model.Job.states.DELETED: - self.work_queue.put( ( self.finish_job, ajs ) ) + if state in ( drmaa.JobState.FAILED, drmaa.JobState.DONE ): + self._complete_terminal_job( ajs, drmaa_state = state ) continue ajs.old_state = state new_watched.append( ajs ) diff -r 10ac38a448152b9a927c34f36334724dfd173bfc -r 2d525856226f0115a5f8dea5e7fedfae2fcbcf8a lib/galaxy/jobs/runners/slurm.py --- /dev/null +++ b/lib/galaxy/jobs/runners/slurm.py @@ -0,0 +1,57 @@ +""" +SLURM job control via the DRMAA API. +""" + +import time +import logging +import subprocess + +from galaxy import model +from galaxy.jobs.runners.drmaa import DRMAAJobRunner + +log = logging.getLogger( __name__ ) + +__all__ = [ 'SlurmJobRunner' ] + + +class SlurmJobRunner( DRMAAJobRunner ): + runner_name = "SlurmRunner" + + def _complete_terminal_job( self, ajs, drmaa_state, **kwargs ): + def __get_jobinfo(): + scontrol_out = subprocess.check_output( ( 'scontrol', '-o', 'show', 'job', ajs.job_id ) ) + return dict( [ out_param.split( '=', 1 ) for out_param in scontrol_out.split() ] ) + if drmaa_state == self.drmaa_job_states.FAILED: + try: + job_info = __get_jobinfo() + sleep = 1 + while job_info['JobState'] == 'COMPLETING': + log.debug( '(%s/%s) Waiting %s seconds for failed job to exit COMPLETING state for post-mortem', ajs.job_wrapper.get_id_tag(), ajs.job_id, sleep ) + time.sleep( sleep ) + sleep *= 2 + if sleep > 64: + ajs.fail_message = "This job failed and the system timed out while trying to determine the cause of the failure." + break + job_info = __get_jobinfo() + if job_info['JobState'] == 'TIMEOUT': + ajs.fail_message = "This job was terminated because it ran longer than the maximum allowed job run time." + elif job_info['JobState'] == 'NODE_FAIL': + log.warning( '(%s/%s) Job failed due to node failure, attempting resubmission', ajs.job_wrapper.get_id_tag(), ajs.job_id ) + ajs.job_wrapper.change_state( model.Job.states.QUEUED, info = 'Job was resubmitted due to node failure' ) + try: + self.queue_job( ajs.job_wrapper ) + return + except: + ajs.fail_message = "This job failed due to a cluster node failure, and an attempt to resubmit the job failed." + elif job_info['JobState'] == 'CANCELLED': + ajs.fail_message = "This job failed because it was cancelled by an administrator." + else: + ajs.fail_message = "This job failed for reasons that could not be determined." + ajs.fail_message += '\nPlease click the bug icon to report this problem if you need help.' + ajs.stop_job = False + self.work_queue.put( ( self.fail_job, ajs ) ) + except Exception, e: + log.exception( '(%s/%s) Unable to inspect failed slurm job using scontrol, job will be unconditionally failed: %s', ajs.job_wrapper.get_id_tag(), ajs.job_id, e ) + super( SlurmJobRunner, self )._complete_terminal_job( ajs, drmaa_state = drmaa_state ) + elif drmaa_state == self.drmaa_job_states.DONE: + super( SlurmJobRunner, self )._complete_terminal_job( ajs, drmaa_state = drmaa_state ) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.