1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/4ef50975607e/
Changeset: 4ef50975607e
User: jmchilton
Date: 2014-06-24 04:15:44
Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #411)
Allow dynamic job destination rules to throw `galaxy.jobs.mapper.JobNotReadyException` indicating the job is not ready.
Affected #: 2 files
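For context on the feature itself: a dynamic destination rule can now signal "not ready yet" instead of failing a job outright, and the handler will requeue it. A minimal sketch of such a rule follows; the rule name, the resource_is_free helper, and the "cluster_default" destination id are hypothetical illustrations, not part of this changeset - only JobNotReadyException is.

from galaxy.jobs.mapper import JobNotReadyException

def resource_is_free():
    # Hypothetical probe -- a real rule might check cluster load, a
    # license server, or whether external inputs have been staged.
    return False

def defer_when_busy( job ):
    if not resource_is_free():
        # The handler catches this and requeues the job; with no
        # explicit job_state supplied it falls back to JOB_WAIT.
        raise JobNotReadyException( message="external resource busy" )
    # Otherwise map the job to a configured destination id as usual.
    return "cluster_default"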
diff -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -13,6 +13,7 @@
from galaxy import model
from galaxy.util.sleeper import Sleeper
from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination
+from galaxy.jobs.mapper import JobNotReadyException
log = logging.getLogger( __name__ )
@@ -263,7 +264,7 @@
try:
# Check the job's dependencies, requeue if they're not done.
# Some of these states will only happen when using the in-memory job queue
- job_state = self.__check_if_ready_to_run( job )
+ job_state = self.__check_job_state( job )
if job_state == JOB_WAIT:
new_waiting_jobs.append( job.id )
elif job_state == JOB_INPUT_ERROR:
@@ -304,7 +305,7 @@
# Done with the session
self.sa_session.remove()
- def __check_if_ready_to_run( self, job ):
+ def __check_job_state( self, job ):
"""
Check if a job is ready to run by verifying that each of its input
datasets is ready (specifically in the OK state). If any input dataset
@@ -314,62 +315,97 @@
job can be dispatched. Otherwise, return JOB_WAIT indicating that input
datasets are still being prepared.
"""
- # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
if not self.track_jobs_in_database:
- if job.state == model.Job.states.DELETED:
- return JOB_DELETED
- elif job.state == model.Job.states.ERROR:
- return JOB_ADMIN_DELETED
- for dataset_assoc in job.input_datasets + job.input_library_datasets:
- idata = dataset_assoc.dataset
- if not idata:
- continue
- # don't run jobs for which the input dataset was deleted
- if idata.deleted:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
- return JOB_INPUT_DELETED
- # an error in the input data causes us to bail immediately
- elif idata.state == idata.states.ERROR:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state == idata.states.FAILED_METADATA:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
- # need to requeue
- return JOB_WAIT
+ in_memory_not_ready_state = self.__verify_in_memory_job_inputs( job )
+ if in_memory_not_ready_state:
+ return in_memory_not_ready_state
+
+ # Else, if tracking in the database, job.state is guaranteed to be NEW and
+ # the inputs are guaranteed to be OK.
+
# Create the job wrapper so that the destination can be set
- if job.id not in self.job_wrappers:
- self.job_wrappers[job.id] = self.job_wrapper( job )
- # Cause the job_destination to be set and cached by the mapper
+ job_id = job.id
+ job_wrapper = self.job_wrappers.get( job_id, None )
+ if not job_wrapper:
+ job_wrapper = self.job_wrapper( job )
+ self.job_wrappers[ job_id ] = job_wrapper
+
+ # If state == JOB_READY, assume job_destination also set - otherwise
+ # in case of various error or cancelled states do not assume
+ # destination has been set.
+ state, job_destination = self.__verify_job_ready( job, job_wrapper )
+
+ if state == JOB_READY:
+ # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
+ self.increase_running_job_count(job.user_id, job_destination.id )
+ return state
+
+ def __verify_job_ready( self, job, job_wrapper ):
+ """ Compute job destination and verify job is ready at that
+ destination by checking job limits and quota. If this method
+ returns a job state of JOB_READY - it MUST also return a job
+ destination.
+ """
+ job_destination = None
try:
- self.job_wrappers[job.id].job_destination
+ # Cause the job_destination to be set and cached by the mapper
+ job_destination = job_wrapper.job_destination
+ except JobNotReadyException as e:
+ job_state = e.job_state or JOB_WAIT
+ return job_state, None
except Exception, e:
failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
log.exception( 'Failed to generate job destination' )
else:
log.debug( "Intentionally failing job with message (%s)" %
failure_message )
- self.job_wrappers[job.id].fail( failure_message )
- return JOB_ERROR
+ job_wrapper.fail( failure_message )
+ return JOB_ERROR, job_destination
# job is ready to run, check limits
# TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
- state = self.__check_destination_jobs( job, self.job_wrappers[job.id] )
+ state = self.__check_destination_jobs( job, job_wrapper )
if state == JOB_READY:
- state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
+ state = self.__check_user_jobs( job, job_wrapper )
if state == JOB_READY and self.app.config.enable_quotas:
quota = self.app.quota_agent.get_quota( job.user )
if quota is not None:
try:
usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
if usage > quota:
- return JOB_USER_OVER_QUOTA
+ return JOB_USER_OVER_QUOTA, job_destination
except AssertionError, e:
pass # No history, should not happen with an anon user
- if state == JOB_READY:
- # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
- self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
- return state
+ return state, job_destination
+
+ def __verify_in_memory_job_inputs( self, job ):
+ """ Perform the same checks that happen via SQL for in-memory
managed
+ jobs.
+ """
+ if job.state == model.Job.states.DELETED:
+ return JOB_DELETED
+ elif job.state == model.Job.states.ERROR:
+ return JOB_ADMIN_DELETED
+ for dataset_assoc in job.input_datasets + job.input_library_datasets:
+ idata = dataset_assoc.dataset
+ if not idata:
+ continue
+ # don't run jobs for which the input dataset was deleted
+ if idata.deleted:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ return JOB_INPUT_DELETED
+ # an error in the input data causes us to bail immediately
+ elif idata.state == idata.states.ERROR:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state == idata.states.FAILED_METADATA:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+ # need to requeue
+ return JOB_WAIT
+
+ # All inputs ready to go.
+ return None
def __clear_job_count( self ):
self.user_job_count = None
diff -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -16,6 +16,13 @@
self.failure_message = failure_message
+class JobNotReadyException( Exception ):
+
+ def __init__( self, job_state=None, message=None ):
+ self.job_state = job_state
+ self.message = message
+
+
class JobRunnerMapper( object ):
"""
This class is responsible for managing the mapping of jobs
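The exception is deliberately small: job_state lets the raiser suggest which handler state to report, and message carries a human-readable reason. A short illustrative snippet of the handler-side fallback, not part of the changeset (the 'wait' literal is assumed to match handler.py's JOB_WAIT constant):

from galaxy.jobs.mapper import JobNotReadyException

try:
    # No explicit job_state supplied by the raising rule...
    raise JobNotReadyException( message="inputs still staging" )
except JobNotReadyException as e:
    # ...so the handler's `e.job_state or JOB_WAIT` expression falls
    # back to requeueing the job rather than failing it.
    state = e.job_state or 'wait'
    assert state == 'wait'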
Repository URL: https://bitbucket.org/galaxy/galaxy-central/