5 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/72a0157988f1/
Changeset:   72a0157988f1
User:        jmchilton
Date:        2014-06-11 07:19:20
Summary:     Refactor to simplify __check_if_ready_to_run.

Move the checks for in-memory managed jobs, which run prior to the common code path, into their own helper method.

Affected #:  1 file
diff -r c52cb1f827d49f0f0ee2c1c97d3626dd1b2cc348 -r 72a0157988f1cf7e9440dc1cb3dc13ca6cdc42ba lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -240,7 +240,7 @@
             try:
                 # Check the job's dependencies, requeue if they're not done.
                 # Some of these states will only happen when using the in-memory job queue
-                job_state = self.__check_if_ready_to_run( job )
+                job_state = self.__check_job_state( job )
                 if job_state == JOB_WAIT:
                     new_waiting_jobs.append( job.id )
                 elif job_state == JOB_INPUT_ERROR:
@@ -281,7 +281,7 @@
         # Done with the session
         self.sa_session.remove()

-    def __check_if_ready_to_run( self, job ):
+    def __check_job_state( self, job ):
         """
         Check if a job is ready to run by verifying that each of its input
         datasets is ready (specifically in the OK state). If any input dataset
@@ -293,28 +293,12 @@
         """
         # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
         if not self.track_jobs_in_database:
-            if job.state == model.Job.states.DELETED:
-                return JOB_DELETED
-            elif job.state == model.Job.states.ERROR:
-                return JOB_ADMIN_DELETED
-            for dataset_assoc in job.input_datasets + job.input_library_datasets:
-                idata = dataset_assoc.dataset
-                if not idata:
-                    continue
-                # don't run jobs for which the input dataset was deleted
-                if idata.deleted:
-                    self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
-                    return JOB_INPUT_DELETED
-                # an error in the input data causes us to bail immediately
-                elif idata.state == idata.states.ERROR:
-                    self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
-                    return JOB_INPUT_ERROR
-                elif idata.state == idata.states.FAILED_METADATA:
-                    self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
-                    return JOB_INPUT_ERROR
-                elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
-                    # need to requeue
-                    return JOB_WAIT
+            in_memory_job_state = self.__check_in_memory_job_state( job )
+            if in_memory_job_state:
+                return in_memory_job_state
+        # If no problems, all inputs ready to go, continue
+        # with same checks either way.
+
         # Create the job wrapper so that the destination can be set
         if job.id not in self.job_wrappers:
             self.job_wrappers[job.id] = self.job_wrapper( job )
@@ -348,6 +332,33 @@
             self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
         return state

+    def __check_in_memory_job_state( self, job ):
+        if job.state == model.Job.states.DELETED:
+            return JOB_DELETED
+        elif job.state == model.Job.states.ERROR:
+            return JOB_ADMIN_DELETED
+        for dataset_assoc in job.input_datasets + job.input_library_datasets:
+            idata = dataset_assoc.dataset
+            if not idata:
+                continue
+            # don't run jobs for which the input dataset was deleted
+            if idata.deleted:
+                self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+                return JOB_INPUT_DELETED
+            # an error in the input data causes us to bail immediately
+            elif idata.state == idata.states.ERROR:
+                self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
+                return JOB_INPUT_ERROR
+            elif idata.state == idata.states.FAILED_METADATA:
+                self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
+                return JOB_INPUT_ERROR
+            elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+                # need to requeue
+                return JOB_WAIT
+
+        # All inputs ready to go.
+        return None
+
     def __clear_job_count( self ):
         self.user_job_count = None
         self.user_job_count_per_destination = None
https://bitbucket.org/galaxy/galaxy-central/commits/519050ab95d7/
Changeset:   519050ab95d7
User:        jmchilton
Date:        2014-06-11 07:19:20
Summary:     Refactor __check_if_ready_to_run - less code duplication...

... in the sense that it no longer repeatedly grabs the same values from the wrapper map, from the wrapper, etc.

Affected #:  1 file
diff -r 72a0157988f1cf7e9440dc1cb3dc13ca6cdc42ba -r 519050ab95d7177abdea8d55bae3b03a3a289476 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -291,33 +291,37 @@
         job can be dispatched. Otherwise, return JOB_WAIT indicating that input
         datasets are still being prepared.
         """
-        # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
         if not self.track_jobs_in_database:
-            in_memory_job_state = self.__check_in_memory_job_state( job )
-            if in_memory_job_state:
-                return in_memory_job_state
-        # If no problems, all inputs ready to go, continue
-        # with same checks either way.
+            in_memory_not_ready_state = self.__verify_in_memory_job_inputs( job )
+            if in_memory_not_ready_state:
+                return in_memory_not_ready_state
+
+        # Else, if tracking in the database, job.state is guaranteed to be NEW and
+        # the inputs are guaranteed to be OK.

         # Create the job wrapper so that the destination can be set
-        if job.id not in self.job_wrappers:
-            self.job_wrappers[job.id] = self.job_wrapper( job )
+        job_id = job.id
+        job_wrapper = self.job_wrappers.get( job_id, None )
+        if not job_wrapper:
+            job_wrapper = self.job_wrapper( job )
+            self.job_wrappers[ job_id ] = job_wrapper
+
         # Cause the job_destination to be set and cached by the mapper
         try:
-            self.job_wrappers[job.id].job_destination
+            job_destination = job_wrapper.job_destination
         except Exception, e:
             failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
             if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
                 log.exception( 'Failed to generate job destination' )
             else:
                 log.debug( "Intentionally failing job with message (%s)" % failure_message )
-            self.job_wrappers[job.id].fail( failure_message )
+            job_wrapper.fail( failure_message )
             return JOB_ERROR
         # job is ready to run, check limits
         # TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
-        state = self.__check_destination_jobs( job, self.job_wrappers[job.id] )
+        state = self.__check_destination_jobs( job, job_wrapper )
         if state == JOB_READY:
-            state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
+            state = self.__check_user_jobs( job, job_wrapper )
         if state == JOB_READY and self.app.config.enable_quotas:
             quota = self.app.quota_agent.get_quota( job.user )
             if quota is not None:
@@ -329,10 +333,13 @@
                 try:
                     usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
                     if usage > quota:
                         return JOB_USER_OVER_QUOTA
                 except AssertionError, e:
                     pass # No history, should not happen with an anon user
         if state == JOB_READY:
             # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
-            self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
+            self.increase_running_job_count(job.user_id, job_destination.id )
         return state

-    def __check_in_memory_job_state( self, job ):
+    def __verify_in_memory_job_inputs( self, job ):
+        """ Perform the same checks that happen via SQL for in-memory managed
+        jobs.
+        """
         if job.state == model.Job.states.DELETED:
             return JOB_DELETED
         elif job.state == model.Job.states.ERROR:
https://bitbucket.org/galaxy/galaxy-central/commits/8bbb244d339b/
Changeset:   8bbb244d339b
User:        jmchilton
Date:        2014-06-11 07:19:20
Summary:     Break up __check_job_state - extract new __verify_job_ready method.

Affected #:  1 file
diff -r 519050ab95d7177abdea8d55bae3b03a3a289476 -r 8bbb244d339b396c841341faaa9353f4a1c5e5f1 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -306,8 +306,25 @@
             job_wrapper = self.job_wrapper( job )
             self.job_wrappers[ job_id ] = job_wrapper

-        # Cause the job_destination to be set and cached by the mapper
+        # If state == JOB_READY, assume job_destination also set - otherwise
+        # in case of various error or cancelled states do not assume
+        # destination has been set.
+        state, job_destination = self.__verify_job_ready( job, job_wrapper )
+
+        if state == JOB_READY:
+            # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
+            self.increase_running_job_count(job.user_id, job_destination.id )
+        return state
+
+    def __verify_job_ready( self, job, job_wrapper ):
+        """ Compute job destination and verify job is ready at that
+        destination by checking job limits and quota. If this method
+        return a job state of JOB_READY - it MUST also return a job
+        destination.
+        """
+        job_destination = None
         try:
+            # Cause the job_destination to be set and cached by the mapper
             job_destination = job_wrapper.job_destination
         except Exception, e:
             failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
@@ -316,7 +333,7 @@
             else:
                 log.debug( "Intentionally failing job with message (%s)" % failure_message )
             job_wrapper.fail( failure_message )
-            return JOB_ERROR
+            return JOB_ERROR, job_destination
         # job is ready to run, check limits
         # TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
         state = self.__check_destination_jobs( job, job_wrapper )
@@ -328,13 +345,10 @@
                 try:
                     usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
                     if usage > quota:
-                        return JOB_USER_OVER_QUOTA
+                        return JOB_USER_OVER_QUOTA, job_destination
                 except AssertionError, e:
                     pass # No history, should not happen with an anon user
-        if state == JOB_READY:
-            # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
-            self.increase_running_job_count(job.user_id, job_destination.id )
-        return state
+        return state, job_destination

     def __verify_in_memory_job_inputs( self, job ):
         """ Perform the same checks that happen via SQL for in-memory managed
https://bitbucket.org/galaxy/galaxy-central/commits/e1c8d892be5d/
Changeset:   e1c8d892be5d
User:        jmchilton
Date:        2014-06-11 07:19:20
Summary:     Allow dynamic job destinations to throw the new JobNotReadyException to indicate a delayed decision.

For instance, a rule might wait for some deployer-defined limit or resource to become available, etc.

Affected #:  2 files
diff -r 8bbb244d339b396c841341faaa9353f4a1c5e5f1 -r e1c8d892be5de20ed4564a3e467943701dccacd8 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -13,6 +13,7 @@
 from galaxy import model
 from galaxy.util.sleeper import Sleeper
 from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination
+from galaxy.jobs.mapper import JobNotReadyException

 log = logging.getLogger( __name__ )

@@ -326,6 +327,9 @@
         try:
             # Cause the job_destination to be set and cached by the mapper
             job_destination = job_wrapper.job_destination
+        except JobNotReadyException as e:
+            job_state = e.job_state or JOB_WAIT
+            return job_state, None
         except Exception, e:
             failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
             if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:

diff -r 8bbb244d339b396c841341faaa9353f4a1c5e5f1 -r e1c8d892be5de20ed4564a3e467943701dccacd8 lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -16,6 +16,13 @@
         self.failure_message = failure_message


+class JobNotReadyException( Exception ):
+
+    def __init__( self, job_state=None, message=None ):
+        self.job_state = job_state
+        self.message = message
+
+
 class JobRunnerMapper( object ):
     """
     This class is responsible to managing the mapping of jobs
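To make the intent of this change concrete, here is a sketch of how a deployer-written dynamic destination rule might use the new exception. Only JobNotReadyException itself comes from the changeset above; the rule name, the _cluster_is_saturated probe, and the "slurm_normal" destination id are hypothetical stand-ins (dynamic rules are ordinary Python functions, typically placed under lib/galaxy/jobs/rules/, whose arguments such as app and job are injected by name).

    from galaxy.jobs.mapper import JobNotReadyException


    def defer_until_cluster_free( app, job ):
        # Hypothetical dynamic rule: delay dispatch while the cluster is busy.
        if _cluster_is_saturated():
            # The handler treats a missing job_state on the exception as JOB_WAIT,
            # so the job stays queued and is re-checked on a later iteration of
            # the monitor loop.
            raise JobNotReadyException( message="Cluster busy, delaying job %s" % job.id )
        # Otherwise route normally; "slurm_normal" stands in for a destination id
        # defined in the deployer's job_conf.xml.
        return "slurm_normal"


    def _cluster_is_saturated():
        # Placeholder for a deployer-defined probe (queue depth, license count, etc.).
        return False

The rule can also pass an explicit job_state to the exception; when it does not, the __verify_job_ready change above falls back to JOB_WAIT.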
https://bitbucket.org/galaxy/galaxy-central/commits/4ef50975607e/
Changeset:   4ef50975607e
User:        jmchilton
Date:        2014-06-24 04:15:44
Summary:     Merged in jmchilton/galaxy-central-fork-1 (pull request #411)

Allow dynamic job destination rules to throw `galaxy.jobs.mapper.JobNotReadyException` to indicate that a job is not ready.

Affected #:  2 files
diff -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -13,6 +13,7 @@
 from galaxy import model
 from galaxy.util.sleeper import Sleeper
 from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination
+from galaxy.jobs.mapper import JobNotReadyException

 log = logging.getLogger( __name__ )

@@ -263,7 +264,7 @@
             try:
                 # Check the job's dependencies, requeue if they're not done.
                 # Some of these states will only happen when using the in-memory job queue
-                job_state = self.__check_if_ready_to_run( job )
+                job_state = self.__check_job_state( job )
                 if job_state == JOB_WAIT:
                     new_waiting_jobs.append( job.id )
                 elif job_state == JOB_INPUT_ERROR:
@@ -304,7 +305,7 @@
         # Done with the session
         self.sa_session.remove()

-    def __check_if_ready_to_run( self, job ):
+    def __check_job_state( self, job ):
         """
         Check if a job is ready to run by verifying that each of its input
         datasets is ready (specifically in the OK state). If any input dataset
@@ -314,62 +315,97 @@
         job can be dispatched. Otherwise, return JOB_WAIT indicating that input
         datasets are still being prepared.
         """
-        # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
         if not self.track_jobs_in_database:
-            if job.state == model.Job.states.DELETED:
-                return JOB_DELETED
-            elif job.state == model.Job.states.ERROR:
-                return JOB_ADMIN_DELETED
-            for dataset_assoc in job.input_datasets + job.input_library_datasets:
-                idata = dataset_assoc.dataset
-                if not idata:
-                    continue
-                # don't run jobs for which the input dataset was deleted
-                if idata.deleted:
-                    self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
-                    return JOB_INPUT_DELETED
-                # an error in the input data causes us to bail immediately
-                elif idata.state == idata.states.ERROR:
-                    self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
-                    return JOB_INPUT_ERROR
-                elif idata.state == idata.states.FAILED_METADATA:
-                    self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
-                    return JOB_INPUT_ERROR
-                elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
-                    # need to requeue
-                    return JOB_WAIT
+            in_memory_not_ready_state = self.__verify_in_memory_job_inputs( job )
+            if in_memory_not_ready_state:
+                return in_memory_not_ready_state
+
+        # Else, if tracking in the database, job.state is guaranteed to be NEW and
+        # the inputs are guaranteed to be OK.
+
         # Create the job wrapper so that the destination can be set
-        if job.id not in self.job_wrappers:
-            self.job_wrappers[job.id] = self.job_wrapper( job )
-        # Cause the job_destination to be set and cached by the mapper
+        job_id = job.id
+        job_wrapper = self.job_wrappers.get( job_id, None )
+        if not job_wrapper:
+            job_wrapper = self.job_wrapper( job )
+            self.job_wrappers[ job_id ] = job_wrapper
+
+        # If state == JOB_READY, assume job_destination also set - otherwise
+        # in case of various error or cancelled states do not assume
+        # destination has been set.
+        state, job_destination = self.__verify_job_ready( job, job_wrapper )
+
+        if state == JOB_READY:
+            # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
+            self.increase_running_job_count(job.user_id, job_destination.id )
+        return state
+
+    def __verify_job_ready( self, job, job_wrapper ):
+        """ Compute job destination and verify job is ready at that
+        destination by checking job limits and quota. If this method
+        return a job state of JOB_READY - it MUST also return a job
+        destination.
+        """
+        job_destination = None
         try:
-            self.job_wrappers[job.id].job_destination
+            # Cause the job_destination to be set and cached by the mapper
+            job_destination = job_wrapper.job_destination
+        except JobNotReadyException as e:
+            job_state = e.job_state or JOB_WAIT
+            return job_state, None
        except Exception, e:
             failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
             if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
                 log.exception( 'Failed to generate job destination' )
             else:
                 log.debug( "Intentionally failing job with message (%s)" % failure_message )
-            self.job_wrappers[job.id].fail( failure_message )
-            return JOB_ERROR
+            job_wrapper.fail( failure_message )
+            return JOB_ERROR, job_destination
         # job is ready to run, check limits
         # TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
-        state = self.__check_destination_jobs( job, self.job_wrappers[job.id] )
+        state = self.__check_destination_jobs( job, job_wrapper )
         if state == JOB_READY:
-            state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
+            state = self.__check_user_jobs( job, job_wrapper )
         if state == JOB_READY and self.app.config.enable_quotas:
             quota = self.app.quota_agent.get_quota( job.user )
             if quota is not None:
                 try:
                     usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
                     if usage > quota:
-                        return JOB_USER_OVER_QUOTA
+                        return JOB_USER_OVER_QUOTA, job_destination
                 except AssertionError, e:
                     pass # No history, should not happen with an anon user
-        if state == JOB_READY:
-            # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
-            self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
-        return state
+        return state, job_destination
+
+    def __verify_in_memory_job_inputs( self, job ):
+        """ Perform the same checks that happen via SQL for in-memory managed
+        jobs.
+        """
+        if job.state == model.Job.states.DELETED:
+            return JOB_DELETED
+        elif job.state == model.Job.states.ERROR:
+            return JOB_ADMIN_DELETED
+        for dataset_assoc in job.input_datasets + job.input_library_datasets:
+            idata = dataset_assoc.dataset
+            if not idata:
+                continue
+            # don't run jobs for which the input dataset was deleted
+            if idata.deleted:
+                self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+                return JOB_INPUT_DELETED
+            # an error in the input data causes us to bail immediately
+            elif idata.state == idata.states.ERROR:
+                self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
+                return JOB_INPUT_ERROR
+            elif idata.state == idata.states.FAILED_METADATA:
+                self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
+                return JOB_INPUT_ERROR
+            elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+                # need to requeue
+                return JOB_WAIT
+
+        # All inputs ready to go.
+        return None

     def __clear_job_count( self ):
         self.user_job_count = None

diff -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -16,6 +16,13 @@
         self.failure_message = failure_message


+class JobNotReadyException( Exception ):
+
+    def __init__( self, job_state=None, message=None ):
+        self.job_state = job_state
+        self.message = message
+
+
 class JobRunnerMapper( object ):
     """
     This class is responsible to managing the mapping of jobs
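With this merged to the default branch, a dynamic rule can distinguish transient routing problems from permanent ones. The sketch below is illustrative only: the _count_pending_jobs helper, the threshold, and the "default_cluster" destination id are invented; JobNotReadyException comes from this pull request, and JobMappingException is the pre-existing exception in lib/galaxy/jobs/mapper.py whose failure_message attribute is set in the unchanged context shown above.

    from galaxy.jobs.mapper import JobMappingException, JobNotReadyException

    MAX_PENDING = 50  # illustrative, deployer-chosen limit


    def route_with_backpressure( job ):
        pending = _count_pending_jobs()
        if pending is None:
            # Permanent problem: fail the job; the message reaches the user via
            # the failure_message handling in __verify_job_ready.
            raise JobMappingException( "Scheduler unreachable, cannot route this job" )
        if pending > MAX_PENDING:
            # Transient problem: leave the job queued and let the handler retry later.
            raise JobNotReadyException( message="Too many pending jobs (%d)" % pending )
        return "default_cluster"  # hypothetical destination id from job_conf.xml


    def _count_pending_jobs():
        # Placeholder; a real rule might query the DRM or Galaxy's own job table.
        return 0

The design difference is visible in the handler change above: JobMappingException-style failures call job_wrapper.fail() and return JOB_ERROR, while JobNotReadyException short-circuits before any failure handling and simply reports a waiting state.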
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
galaxy-commits@lists.galaxyproject.org