1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/4ef50975607e/ Changeset: 4ef50975607e User: jmchilton Date: 2014-06-24 04:15:44 Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #411) Allow dynamic job destinations rules to throw `galaxy.jobs.mapper.JobNotReadyException` indicating job not ready. Affected #: 2 files diff -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -13,6 +13,7 @@ from galaxy import model from galaxy.util.sleeper import Sleeper from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination +from galaxy.jobs.mapper import JobNotReadyException log = logging.getLogger( __name__ ) @@ -263,7 +264,7 @@ try: # Check the job's dependencies, requeue if they're not done. # Some of these states will only happen when using the in-memory job queue - job_state = self.__check_if_ready_to_run( job ) + job_state = self.__check_job_state( job ) if job_state == JOB_WAIT: new_waiting_jobs.append( job.id ) elif job_state == JOB_INPUT_ERROR: @@ -304,7 +305,7 @@ # Done with the session self.sa_session.remove() - def __check_if_ready_to_run( self, job ): + def __check_job_state( self, job ): """ Check if a job is ready to run by verifying that each of its input datasets is ready (specifically in the OK state). If any input dataset @@ -314,62 +315,97 @@ job can be dispatched. Otherwise, return JOB_WAIT indicating that input datasets are still being prepared. """ - # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK if not self.track_jobs_in_database: - if job.state == model.Job.states.DELETED: - return JOB_DELETED - elif job.state == model.Job.states.ERROR: - return JOB_ADMIN_DELETED - for dataset_assoc in job.input_datasets + job.input_library_datasets: - idata = dataset_assoc.dataset - if not idata: - continue - # don't run jobs for which the input dataset was deleted - if idata.deleted: - self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) - return JOB_INPUT_DELETED - # an error in the input data causes us to bail immediately - elif idata.state == idata.states.ERROR: - self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) ) - return JOB_INPUT_ERROR - elif idata.state == idata.states.FAILED_METADATA: - self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) ) - return JOB_INPUT_ERROR - elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): - # need to requeue - return JOB_WAIT + in_memory_not_ready_state = self.__verify_in_memory_job_inputs( job ) + if in_memory_not_ready_state: + return in_memory_not_ready_state + + # Else, if tracking in the database, job.state is guaranteed to be NEW and + # the inputs are guaranteed to be OK. + # Create the job wrapper so that the destination can be set - if job.id not in self.job_wrappers: - self.job_wrappers[job.id] = self.job_wrapper( job ) - # Cause the job_destination to be set and cached by the mapper + job_id = job.id + job_wrapper = self.job_wrappers.get( job_id, None ) + if not job_wrapper: + job_wrapper = self.job_wrapper( job ) + self.job_wrappers[ job_id ] = job_wrapper + + # If state == JOB_READY, assume job_destination also set - otherwise + # in case of various error or cancelled states do not assume + # destination has been set. + state, job_destination = self.__verify_job_ready( job, job_wrapper ) + + if state == JOB_READY: + # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration + self.increase_running_job_count(job.user_id, job_destination.id ) + return state + + def __verify_job_ready( self, job, job_wrapper ): + """ Compute job destination and verify job is ready at that + destination by checking job limits and quota. If this method + return a job state of JOB_READY - it MUST also return a job + destination. + """ + job_destination = None try: - self.job_wrappers[job.id].job_destination + # Cause the job_destination to be set and cached by the mapper + job_destination = job_wrapper.job_destination + except JobNotReadyException as e: + job_state = e.job_state or JOB_WAIT + return job_state, None except Exception, e: failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE ) if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE: log.exception( 'Failed to generate job destination' ) else: log.debug( "Intentionally failing job with message (%s)" % failure_message ) - self.job_wrappers[job.id].fail( failure_message ) - return JOB_ERROR + job_wrapper.fail( failure_message ) + return JOB_ERROR, job_destination # job is ready to run, check limits # TODO: these checks should be refactored to minimize duplication and made more modular/pluggable - state = self.__check_destination_jobs( job, self.job_wrappers[job.id] ) + state = self.__check_destination_jobs( job, job_wrapper ) if state == JOB_READY: - state = self.__check_user_jobs( job, self.job_wrappers[job.id] ) + state = self.__check_user_jobs( job, job_wrapper ) if state == JOB_READY and self.app.config.enable_quotas: quota = self.app.quota_agent.get_quota( job.user ) if quota is not None: try: usage = self.app.quota_agent.get_usage( user=job.user, history=job.history ) if usage > quota: - return JOB_USER_OVER_QUOTA + return JOB_USER_OVER_QUOTA, job_destination except AssertionError, e: pass # No history, should not happen with an anon user - if state == JOB_READY: - # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration - self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id) - return state + return state, job_destination + + def __verify_in_memory_job_inputs( self, job ): + """ Perform the same checks that happen via SQL for in-memory managed + jobs. + """ + if job.state == model.Job.states.DELETED: + return JOB_DELETED + elif job.state == model.Job.states.ERROR: + return JOB_ADMIN_DELETED + for dataset_assoc in job.input_datasets + job.input_library_datasets: + idata = dataset_assoc.dataset + if not idata: + continue + # don't run jobs for which the input dataset was deleted + if idata.deleted: + self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) + return JOB_INPUT_DELETED + # an error in the input data causes us to bail immediately + elif idata.state == idata.states.ERROR: + self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) ) + return JOB_INPUT_ERROR + elif idata.state == idata.states.FAILED_METADATA: + self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) ) + return JOB_INPUT_ERROR + elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): + # need to requeue + return JOB_WAIT + + # All inputs ready to go. + return None def __clear_job_count( self ): self.user_job_count = None diff -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -16,6 +16,13 @@ self.failure_message = failure_message +class JobNotReadyException( Exception ): + + def __init__( self, job_state=None, message=None ): + self.job_state = job_state + self.message = message + + class JobRunnerMapper( object ): """ This class is responsible to managing the mapping of jobs Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.