1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/7b209e06ddb9/ Changeset: 7b209e06ddb9 User: natefoo Date: 2014-06-18 21:55:32 Summary: Add some new features to job execution: - Runner states - Allow runner plugins to provide job finish/failure conditions back to Galaxy so that actions can be taken on specific conditions. Currently only the WALLTIME_REACHED state is defined. This can be set on the (Asynchronous)JobState's `runner_state` attribute. Only the slurm runner currently does this. - Runner state handlers - Pluggable interface for defining actions to take when runner state actions occur. Any (non _) python file in galaxy.jobs.runners.state_handlers will be loaded, but handler function names should match the step in the job lifecycle where they should be used. Only the 'failure' method is currently implemented, but adding more would be trivial. Clever parameterization a la the dynamic runner would be a nice improvement here. - Destination resubmission - Destinations in the job config can specify a new destination that jobs should be resubmitted to under certain conditions (currently the only condition implemented is walltime_reached on the original destination...) - Resubmit on walltime reached state handler plugin - The actual resubmission implementation. - RESUBMITTED Job state - Allows resubmitted jobs to bypass the normal ready to run checks and begin execution immediately. - RESUBMITTED DatasetInstance state - This was the best method Carl and I could come up with for persisting the resubmitted state so that it would be visible in the UI. It's not perfect but I didn't want to alter the schema and the only place it could go (job table) is not eagerloaded on history status updates. The resubmission code will not actually set this state yet (it is commented out) until the UI can cope with it. 
Bonus: once this is done we can pretty easily add a "job concurrency limit reached" to give users a visual cue on jobs waiting for that reason. This has been tested pretty extensively with job recovery, concurrency limits and multiprocess setups, which is to say that it will surely fail miserably in production. Affected #: 10 files diff -r 49b11be8c5c4f1e51eb3e1f10f4b20f9cd77ab8e -r 7b209e06ddb944e953d340754439f4e3e5dc339d job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -323,6 +323,30 @@ --><param id="request_cpus">8</param></destination> + + <!-- Jobs that hit the walltime on one destination can be automatically + resubmitted to another destination. Walltime detection is + currently only implemented in the slurm runner. + + Multiple resubmit tags can be defined, the first resubmit matching + the terminal condition of a job will be used. + + The 'condition' attribute is optional, if not present, the + resubmit destination will be used for all conditions. Currently, + only the "walltime_reached" condition is implemented. + + The 'handler' tag is optional, if not present, the job's original + handler will be reused for the resubmitted job. 
+ --> + <destination id="short_fast" runner="slurm"> + <param id="nativeSpecification">--time=00:05:00 --nodes=1</param> + <resubmit condition="walltime_reached" destination="long_slow" handler="sge_handler" /> + </destination> + <destination id="long_slow" runner="sge"> + <!-- The destination that you resubmit jobs to can be any runner type --> + <param id="nativeSpecification">-l h_rt=96:00:00</param> + </destination> + </destinations><resources default="default"><!-- Group different parameters defined in job_resource_params_conf.xml diff -r 49b11be8c5c4f1e51eb3e1f10f4b20f9cd77ab8e -r 7b209e06ddb944e953d340754439f4e3e5dc339d lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -57,6 +57,8 @@ self['runner'] = None self['legacy'] = False self['converted'] = False + self['env'] = [] + self['resubmit'] = [] # dict is appropriate (rather than a bunch) since keys may not be valid as attributes self['params'] = dict() super(JobDestination, self).__init__(**kwds) @@ -187,6 +189,7 @@ job_destination = JobDestination(**dict(destination.items())) job_destination['params'] = self.__get_params(destination) job_destination['env'] = self.__get_envs(destination) + job_destination['resubmit'] = self.__get_resubmits(destination) self.destinations[id] = (job_destination,) if job_destination.tags is not None: for tag in job_destination.tags: @@ -420,7 +423,7 @@ def __get_envs(self, parent): """Parses any child <env> tags in to a dictionary suitable for persistence. - :param parent: Parent element in which to find child <param> tags. + :param parent: Parent element in which to find child <env> tags. :type parent: ``xml.etree.ElementTree.Element`` :returns: dict @@ -436,6 +439,23 @@ ) ) return rval + def __get_resubmits(self, parent): + """Parses any child <resubmit> tags in to a dictionary suitable for persistence. + + :param parent: Parent element in which to find child <resubmit> tags. 
+ :type parent: ``xml.etree.ElementTree.Element`` + + :returns: dict + """ + rval = [] + for resubmit in parent.findall('resubmit'): + rval.append( dict( + condition=resubmit.get('condition'), + destination=resubmit.get('destination'), + handler=resubmit.get('handler') + ) ) + return rval + @property def default_job_tool_configuration(self): """The default JobToolConfiguration, used if a tool does not have an explicit defintion in the configuration. It consists of a reference to the default handler and default destination. @@ -909,6 +929,17 @@ job.state = job.states.PAUSED self.sa_session.add( job ) + def mark_as_resubmitted( self ): + job = self.get_job() + self.sa_session.refresh( job ) + # TODO: Enable this code once a UI for resubmitted datasets exists + #for dataset in [ dataset_assoc.dataset for dataset_assoc in job.output_datasets + job.output_library_datasets ]: + # dataset._state = model.Dataset.states.RESUBMITTED + # self.sa_session.add( dataset ) + job.state = model.Job.states.RESUBMITTED + self.sa_session.add( job ) + self.sa_session.flush() + def change_state( self, state, info=False ): job = self.get_job() self.sa_session.refresh( job ) diff -r 49b11be8c5c4f1e51eb3e1f10f4b20f9cd77ab8e -r 7b209e06ddb944e953d340754439f4e3e5dc339d lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -128,7 +128,6 @@ self.queue.put( ( job.id, job.tool_id ) ) elif job.job_runner_name is not None and job.job_runner_external_id is not None and job.destination_id is None: # This is the first start after upgrading from URLs to destinations, convert the URL to a destination and persist - # TODO: test me extensively job_wrapper = self.job_wrapper( job ) job_destination = self.dispatcher.url_to_destination(job.job_runner_name) if job_destination.id is None: @@ -146,7 +145,17 @@ else: # Already dispatched and running job_wrapper = self.job_wrapper( job ) - job_wrapper.job_runner_mapper.cached_job_destination = 
JobDestination(id=job.destination_id, runner=job.job_runner_name, params=job.destination_params) + # Use the persisted destination as its params may differ from + # what's in the job_conf xml + job_destination = JobDestination(id=job.destination_id, runner=job.job_runner_name, params=job.destination_params) + # resubmits are not persisted (it's a good thing) so they + # should be added back to the in-memory destination on startup + try: + config_job_destination = self.app.job_config.get_destination( job.destination_id ) + job_destination.resubmit = config_job_destination.resubmit + except KeyError: + log.warning( '(%s) Recovered destination id (%s) does not exist in job config (but this may be normal in the case of a dynamically generated destination)', job.id, job.destination_id ) + job_wrapper.job_runner_mapper.cached_job_destination = job_destination self.dispatcher.recover( job, job_wrapper ) if self.sa_session.dirty: self.sa_session.flush() @@ -177,6 +186,7 @@ """ # Pull all new jobs from the queue at once jobs_to_check = [] + resubmit_jobs = [] if self.track_jobs_in_database: # Clear the session so we get fresh states for job and all datasets self.sa_session.expunge_all() @@ -215,6 +225,11 @@ ~model.Job.table.c.id.in_(hda_not_ready), ~model.Job.table.c.id.in_(ldda_not_ready))) \ .order_by(model.Job.id).all() + # Fetch all "resubmit" jobs + resubmit_jobs = self.sa_session.query(model.Job).enable_eagerloads(False) \ + .filter(and_((model.Job.state == model.Job.states.RESUBMITTED), + (model.Job.handler == self.app.config.server_name))) \ + .order_by(model.Job.id).all() else: # Get job objects and append to watch queue for any which were # previously waiting @@ -233,6 +248,14 @@ pass # Ensure that we get new job counts on each iteration self.__clear_job_count() + # Check resubmit jobs first so that limits of new jobs will still be enforced + for job in resubmit_jobs: + log.debug( '(%s) Job was resubmitted and is being dispatched immediately', job.id ) + # 
Reassemble resubmit job destination from persisted value + jw = self.job_wrapper( job ) + jw.job_runner_mapper.cached_job_destination = JobDestination( id=job.destination_id, runner=job.job_runner_name, params=job.destination_params ) + self.increase_running_job_count(job.user_id, jw.job_destination.id) + self.dispatcher.put( jw ) # Iterate over new and waiting jobs and look for any that are # ready to run new_waiting_jobs = [] @@ -358,7 +381,11 @@ # This could have been incremented by a previous job dispatched on this iteration, even if we're not caching rval = self.user_job_count.get(user_id, 0) if not self.app.config.cache_user_job_count: - result = self.sa_session.execute(select([func.count(model.Job.table.c.id)]).where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id == user_id)))) + result = self.sa_session.execute(select([func.count(model.Job.table.c.id)]) \ + .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, + model.Job.states.RUNNING, + model.Job.states.RESUBMITTED)), + (model.Job.table.c.user_id == user_id)))) for row in result: # there should only be one row rval += row[0] @@ -369,8 +396,11 @@ if self.user_job_count is None and self.app.config.cache_user_job_count: self.user_job_count = {} query = self.sa_session.execute(select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)]) \ - .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id is not None))) \ - .group_by(model.Job.table.c.user_id)) + .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, + model.Job.states.RUNNING, + model.Job.states.RESUBMITTED)), + (model.Job.table.c.user_id is not None))) \ + .group_by(model.Job.table.c.user_id)) for row in query: self.user_job_count[row[0]] = row[1] elif self.user_job_count is None: @@ -428,6 +458,9 @@ self.total_job_count_per_destination[destination_id] = 
self.total_job_count_per_destination.get(destination_id, 0) + 1 def __check_user_jobs( self, job, job_wrapper ): + # TODO: Update output datasets' _state = LIMITED or some such new + # state, so the UI can reflect what jobs are waiting due to concurrency + # limits if job.user: # Check the hard limit first if self.app.job_config.limits.registered_user_concurrent_jobs: diff -r 49b11be8c5c4f1e51eb3e1f10f4b20f9cd77ab8e -r 7b209e06ddb944e953d340754439f4e3e5dc339d lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -164,8 +164,9 @@ else: raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type ) - def __cache_job_destination( self, params ): - raw_job_destination = self.job_wrapper.tool.get_job_destination( params ) + def __cache_job_destination( self, params, raw_job_destination=None ): + if raw_job_destination is None: + raw_job_destination = self.job_wrapper.tool.get_job_destination( params ) #raw_job_destination_id_or_tag = self.job_wrapper.tool.get_job_destination_id_or_tag( params ) if raw_job_destination.runner == DYNAMIC_RUNNER_NAME: job_destination = self.__handle_dynamic_job_destination( raw_job_destination ) @@ -183,7 +184,6 @@ self.__cache_job_destination( params ) return self.cached_job_destination - #def get_job_destination_id_or_tag( self, params ): - # if not hasattr( self, 'cached_job_destination_id_or_tag' ): - # self.__cache_job_destination( params ) - # return self.cached_job_destination_id_or_tag + def cache_job_destination( self, raw_job_destination ): + self.__cache_job_destination( None, raw_job_destination=raw_job_destination ) + return self.cached_job_destination diff -r 49b11be8c5c4f1e51eb3e1f10f4b20f9cd77ab8e -r 7b209e06ddb944e953d340754439f4e3e5dc339d lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -17,9 +17,12 @@ from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size from galaxy.util 
import in_directory from galaxy.util import ParamsWithSpecs +from galaxy.util.bunch import Bunch from galaxy.jobs.runners.util.job_script import job_script from galaxy.jobs.runners.util.env import env_to_statement +from .state_handler_factory import build_state_handlers + log = logging.getLogger( __name__ ) STOP_SIGNAL = object() @@ -57,6 +60,7 @@ if kwargs: log.debug( 'Loading %s with params: %s', self.runner_name, kwargs ) self.runner_params = RunnerParams( specs=runner_param_specs, params=kwargs ) + self.runner_state_handlers = build_state_handlers() def _init_worker_threads(self): """Start ``nworkers`` worker threads. @@ -310,11 +314,31 @@ job_info ) + def _handle_runner_state( self, runner_state, job_state ): + try: + for handler in self.runner_state_handlers.get(runner_state, []): + handler(self.app, self, job_state) + if job_state.runner_state_handled: + break + except: + log.exception('Caught exception in runner state handler:') + + def mark_as_resubmitted( self, job_state ): + job_state.job_wrapper.mark_as_resubmitted() + if not self.app.config.track_jobs_in_database: + job_state.job_wrapper.change_state( model.Job.states.QUEUED ) + self.app.job_manager.job_handler.dispatcher.put( job_state.job_wrapper ) + class JobState( object ): """ Encapsulate state of jobs. 
""" + runner_states = Bunch( + WALLTIME_REACHED = 'walltime_reached' + ) + def __init__( self ): + self.runner_state_handled = False def set_defaults( self, files_dir ): if self.job_wrapper is not None: @@ -348,6 +372,7 @@ """ def __init__( self, files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None ): + super( AsynchronousJobState, self ).__init__() self.old_state = None self.running = False self.check_count = 0 @@ -513,9 +538,13 @@ def fail_job( self, job_state ): if getattr( job_state, 'stop_job', True ): self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) ) - job_state.job_wrapper.fail( getattr( job_state, 'fail_message', 'Job failed' ) ) - if self.app.config.cleanup_job == "always": - job_state.cleanup() + self._handle_runner_state( 'failure', job_state ) + # Not convinced this is the best way to indicate this state, but + # something necessary + if not job_state.runner_state_handled: + job_state.job_wrapper.fail( getattr( job_state, 'fail_message', 'Job failed' ) ) + if self.app.config.cleanup_job == "always": + job_state.cleanup() def mark_as_finished(self, job_state): self.work_queue.put( ( self.finish_job, job_state ) ) diff -r 49b11be8c5c4f1e51eb3e1f10f4b20f9cd77ab8e -r 7b209e06ddb944e953d340754439f4e3e5dc339d lib/galaxy/jobs/runners/slurm.py --- a/lib/galaxy/jobs/runners/slurm.py +++ b/lib/galaxy/jobs/runners/slurm.py @@ -43,7 +43,9 @@ log.warning( '(%s/%s) Job not found, assuming job check exceeded MinJobAge and completing as successful', ajs.job_wrapper.get_id_tag(), ajs.job_id ) drmaa_state = self.drmaa_job_states.DONE elif job_info['JobState'] == 'TIMEOUT': + log.info( '(%s/%s) Job hit walltime', ajs.job_wrapper.get_id_tag(), ajs.job_id ) ajs.fail_message = "This job was terminated because it ran longer than the maximum allowed job run time." 
+ ajs.runner_state = ajs.runner_states.WALLTIME_REACHED elif job_info['JobState'] == 'NODE_FAIL': log.warning( '(%s/%s) Job failed due to node failure, attempting resubmission', ajs.job_wrapper.get_id_tag(), ajs.job_id ) ajs.job_wrapper.change_state( model.Job.states.QUEUED, info = 'Job was resubmitted due to node failure' ) @@ -53,13 +55,16 @@ except: ajs.fail_message = "This job failed due to a cluster node failure, and an attempt to resubmit the job failed." elif job_info['JobState'] == 'CANCELLED': + log.info( '(%s/%s) Job was cancelled via slurm (e.g. with scancel(1))', ajs.job_wrapper.get_id_tag(), ajs.job_id ) ajs.fail_message = "This job failed because it was cancelled by an administrator." else: + log.warning( '(%s/%s) Job failed due to unknown reasons, JobState was: %s', ajs.job_wrapper.get_id_tag(), ajs.job_id, job_info['JobState'] ) ajs.fail_message = "This job failed for reasons that could not be determined." if drmaa_state == self.drmaa_job_states.FAILED: ajs.fail_message += '\nPlease click the bug icon to report this problem if you need help.' ajs.stop_job = False self.work_queue.put( ( self.fail_job, ajs ) ) + return except Exception, e: log.exception( '(%s/%s) Unable to inspect failed slurm job using scontrol, job will be unconditionally failed: %s', ajs.job_wrapper.get_id_tag(), ajs.job_id, e ) super( SlurmJobRunner, self )._complete_terminal_job( ajs, drmaa_state = drmaa_state ) diff -r 49b11be8c5c4f1e51eb3e1f10f4b20f9cd77ab8e -r 7b209e06ddb944e953d340754439f4e3e5dc339d lib/galaxy/jobs/runners/state_handler_factory.py --- /dev/null +++ b/lib/galaxy/jobs/runners/state_handler_factory.py @@ -0,0 +1,54 @@ +# Shamelessly stolen from the LWR. 
+ +import os +import logging + +import galaxy.jobs.runners.state_handlers + + +log = logging.getLogger(__name__) + +def build_state_handlers(): + return _get_state_handlers_dict() + +def _get_modules(): + """ + >>> 'galaxy.jobs.runners.state_handlers.resubmit' in _get_modules() + True + """ + state_handlers_dir = galaxy.jobs.runners.state_handlers.__path__[0] + module_names = [] + for fname in os.listdir(state_handlers_dir): + if not(fname.startswith("_")) and fname.endswith(".py"): + module_name = "galaxy.jobs.runners.state_handlers.%s" % fname[:-len(".py")] + module_names.append(module_name) + log.debug('module_names: %s', module_names) + return module_names + +def _load_modules(): + modules = [] + for module_name in _get_modules(): + try: + log.debug('Importing %s', module_name) + module = __import__(module_name) + for comp in module_name.split(".")[1:]: + module = getattr(module, comp) + modules.append(module) + except BaseException as exception: + exception_str = str(exception) + message = "%s module could not be loaded: %s" % (module_name, exception_str) + log.warn(message) + continue + + return modules + +def _get_state_handlers_dict(): + state_handlers = {} + for module in _load_modules(): + for func in module.__all__: + if func not in state_handlers: + state_handlers[func] = [] + state_handlers[func].append(getattr(module, func)) + log.debug("Loaded '%s' state handler from module %s", func, module.__name__) + return state_handlers + diff -r 49b11be8c5c4f1e51eb3e1f10f4b20f9cd77ab8e -r 7b209e06ddb944e953d340754439f4e3e5dc339d lib/galaxy/jobs/runners/state_handlers/resubmit.py --- /dev/null +++ b/lib/galaxy/jobs/runners/state_handlers/resubmit.py @@ -0,0 +1,39 @@ +import logging +from galaxy import model + + +__all__ = ['failure'] + +log = logging.getLogger(__name__) + + +def failure(app, job_runner, job_state): + if getattr( job_state, 'runner_state', None ) and job_state.runner_state == job_state.runner_states.WALLTIME_REACHED: + # Intercept jobs that hit 
the walltime and have a walltime or nonspecific resubmit destination configured + for resubmit in job_state.job_destination.get('resubmit'): + if resubmit.get('condition', None) and resubmit['condition'] != 'walltime_reached': + continue # There is a resubmit defined for the destination but its condition is not for walltime_reached + log.info("(%s/%s) Job will be resubmitted to '%s' because it reached the walltime at the '%s' destination", job_state.job_wrapper.job_id, job_state.job_id, resubmit['destination'], job_state.job_wrapper.job_destination.id ) + # fetch JobDestination for the id or tag + new_destination = app.job_config.get_destination(resubmit['destination']) + # Resolve dynamic if necessary + new_destination = job_state.job_wrapper.job_runner_mapper.cache_job_destination( new_destination ) + # Reset job state + job = job_state.job_wrapper.get_job() + if resubmit.get('handler', None): + log.debug('(%s/%s) Job reassigned to handler %s', job_state.job_wrapper.job_id, job_state.job_id, resubmit['handler']) + job.set_handler(resubmit['handler']) + job_runner.sa_session.add( job ) + # Is this safe to do here? 
+ job_runner.sa_session.flush() + # Cache the destination to prevent rerunning dynamic after resubmit + job_state.job_wrapper.job_runner_mapper.cached_job_destination = new_destination + job_state.job_wrapper.set_job_destination(new_destination) + # Clear external ID (state change below flushes the change) + job.job_runner_external_id = None + # Allow the UI to query for resubmitted state + if job.params is None: + job.params = {} + job_state.runner_state_handled = True + job_runner.mark_as_resubmitted( job_state ) + return diff -r 49b11be8c5c4f1e51eb3e1f10f4b20f9cd77ab8e -r 7b209e06ddb944e953d340754439f4e3e5dc339d lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -295,6 +295,7 @@ _text_metric = JobMetricText states = Bunch( NEW = 'new', + RESUBMITTED = 'resubmitted', UPLOAD = 'upload', WAITING = 'waiting', QUEUED = 'queued', @@ -1299,7 +1300,9 @@ DISCARDED = 'discarded', PAUSED = 'paused', SETTING_METADATA = 'setting_metadata', - FAILED_METADATA = 'failed_metadata' ) + FAILED_METADATA = 'failed_metadata', + RESUBMITTED = 'resubmitted' ) + # failed_metadata and resubmitted are only valid as DatasetInstance states currently conversion_messages = Bunch( PENDING = "pending", NO_DATA = "no data", Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.