5 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/72a0157988f1/
Changeset: 72a0157988f1
User: jmchilton
Date: 2014-06-11 07:19:20
Summary: Refactor to simplify __check_if_ready_to_run.
Move the checks for in-memory managed jobs, which run before the common code path, into their own helper method.
Affected #: 1 file
diff -r c52cb1f827d49f0f0ee2c1c97d3626dd1b2cc348 -r 72a0157988f1cf7e9440dc1cb3dc13ca6cdc42ba lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -240,7 +240,7 @@
try:
# Check the job's dependencies, requeue if they're not done.
# Some of these states will only happen when using the in-memory job queue
- job_state = self.__check_if_ready_to_run( job )
+ job_state = self.__check_job_state( job )
if job_state == JOB_WAIT:
new_waiting_jobs.append( job.id )
elif job_state == JOB_INPUT_ERROR:
@@ -281,7 +281,7 @@
# Done with the session
self.sa_session.remove()
- def __check_if_ready_to_run( self, job ):
+ def __check_job_state( self, job ):
"""
Check if a job is ready to run by verifying that each of its input
datasets is ready (specifically in the OK state). If any input dataset
@@ -293,28 +293,12 @@
"""
# If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
if not self.track_jobs_in_database:
- if job.state == model.Job.states.DELETED:
- return JOB_DELETED
- elif job.state == model.Job.states.ERROR:
- return JOB_ADMIN_DELETED
- for dataset_assoc in job.input_datasets + job.input_library_datasets:
- idata = dataset_assoc.dataset
- if not idata:
- continue
- # don't run jobs for which the input dataset was deleted
- if idata.deleted:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
- return JOB_INPUT_DELETED
- # an error in the input data causes us to bail immediately
- elif idata.state == idata.states.ERROR:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state == idata.states.FAILED_METADATA:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
- # need to requeue
- return JOB_WAIT
+ in_memory_job_state = self.__check_in_memory_job_state( job )
+ if in_memory_job_state:
+ return in_memory_job_state
+ # If no problems, all inputs ready to go, continue
+ # with same checks either way.
+
# Create the job wrapper so that the destination can be set
if job.id not in self.job_wrappers:
self.job_wrappers[job.id] = self.job_wrapper( job )
@@ -348,6 +332,33 @@
self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
return state
+ def __check_in_memory_job_state( self, job ):
+ if job.state == model.Job.states.DELETED:
+ return JOB_DELETED
+ elif job.state == model.Job.states.ERROR:
+ return JOB_ADMIN_DELETED
+ for dataset_assoc in job.input_datasets + job.input_library_datasets:
+ idata = dataset_assoc.dataset
+ if not idata:
+ continue
+ # don't run jobs for which the input dataset was deleted
+ if idata.deleted:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ return JOB_INPUT_DELETED
+ # an error in the input data causes us to bail immediately
+ elif idata.state == idata.states.ERROR:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state == idata.states.FAILED_METADATA:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+ # need to requeue
+ return JOB_WAIT
+
+ # All inputs ready to go.
+ return None
+
def __clear_job_count( self ):
self.user_job_count = None
self.user_job_count_per_destination = None
https://bitbucket.org/galaxy/galaxy-central/commits/519050ab95d7/
Changeset: 519050ab95d7
User: jmchilton
Date: 2014-06-11 07:19:20
Summary: Refactor __check_if_ready_to_run - less code duplication...
... in the sense that it no longer repeatedly fetches the same values from the wrapper map, the wrapper itself, etc.
Affected #: 1 file
diff -r 72a0157988f1cf7e9440dc1cb3dc13ca6cdc42ba -r 519050ab95d7177abdea8d55bae3b03a3a289476 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -291,33 +291,37 @@
job can be dispatched. Otherwise, return JOB_WAIT indicating that input
datasets are still being prepared.
"""
- # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
if not self.track_jobs_in_database:
- in_memory_job_state = self.__check_in_memory_job_state( job )
- if in_memory_job_state:
- return in_memory_job_state
- # If no problems, all inputs ready to go, continue
- # with same checks either way.
+ in_memory_not_ready_state = self.__verify_in_memory_job_inputs( job )
+ if in_memory_not_ready_state:
+ return in_memory_not_ready_state
+
+ # Else, if tracking in the database, job.state is guaranteed to be NEW and
+ # the inputs are guaranteed to be OK.
# Create the job wrapper so that the destination can be set
- if job.id not in self.job_wrappers:
- self.job_wrappers[job.id] = self.job_wrapper( job )
+ job_id = job.id
+ job_wrapper = self.job_wrappers.get( job_id, None )
+ if not job_wrapper:
+ job_wrapper = self.job_wrapper( job )
+ self.job_wrappers[ job_id ] = job_wrapper
+
# Cause the job_destination to be set and cached by the mapper
try:
- self.job_wrappers[job.id].job_destination
+ job_destination = job_wrapper.job_destination
except Exception, e:
failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
log.exception( 'Failed to generate job destination' )
else:
log.debug( "Intentionally failing job with message (%s)" % failure_message )
- self.job_wrappers[job.id].fail( failure_message )
+ job_wrapper.fail( failure_message )
return JOB_ERROR
# job is ready to run, check limits
# TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
- state = self.__check_destination_jobs( job, self.job_wrappers[job.id] )
+ state = self.__check_destination_jobs( job, job_wrapper )
if state == JOB_READY:
- state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
+ state = self.__check_user_jobs( job, job_wrapper )
if state == JOB_READY and self.app.config.enable_quotas:
quota = self.app.quota_agent.get_quota( job.user )
if quota is not None:
@@ -329,10 +333,13 @@
pass # No history, should not happen with an anon user
if state == JOB_READY:
# PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
- self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
+ self.increase_running_job_count(job.user_id, job_destination.id )
return state
- def __check_in_memory_job_state( self, job ):
+ def __verify_in_memory_job_inputs( self, job ):
+ """ Perform the same checks that happen via SQL for in-memory
managed
+ jobs.
+ """
if job.state == model.Job.states.DELETED:
return JOB_DELETED
elif job.state == model.Job.states.ERROR:
https://bitbucket.org/galaxy/galaxy-central/commits/8bbb244d339b/
Changeset: 8bbb244d339b
User: jmchilton
Date: 2014-06-11 07:19:20
Summary: Break up __check_job_state - extract new __verify_job_ready method.
Affected #: 1 file
diff -r 519050ab95d7177abdea8d55bae3b03a3a289476 -r 8bbb244d339b396c841341faaa9353f4a1c5e5f1 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -306,8 +306,25 @@
job_wrapper = self.job_wrapper( job )
self.job_wrappers[ job_id ] = job_wrapper
- # Cause the job_destination to be set and cached by the mapper
+ # If state == JOB_READY, assume job_destination also set - otherwise
+ # in case of various error or cancelled states do not assume
+ # destination has been set.
+ state, job_destination = self.__verify_job_ready( job, job_wrapper )
+
+ if state == JOB_READY:
+ # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
+ self.increase_running_job_count(job.user_id, job_destination.id )
+ return state
+
+ def __verify_job_ready( self, job, job_wrapper ):
+ """ Compute job destination and verify job is ready at that
+ destination by checking job limits and quota. If this method
+ return a job state of JOB_READY - it MUST also return a job
+ destination.
+ """
+ job_destination = None
try:
+ # Cause the job_destination to be set and cached by the mapper
job_destination = job_wrapper.job_destination
except Exception, e:
failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
@@ -316,7 +333,7 @@
else:
log.debug( "Intentionally failing job with message (%s)" % failure_message )
job_wrapper.fail( failure_message )
- return JOB_ERROR
+ return JOB_ERROR, job_destination
# job is ready to run, check limits
# TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
state = self.__check_destination_jobs( job, job_wrapper )
@@ -328,13 +345,10 @@
try:
usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
if usage > quota:
- return JOB_USER_OVER_QUOTA
+ return JOB_USER_OVER_QUOTA, job_destination
except AssertionError, e:
pass # No history, should not happen with an anon user
- if state == JOB_READY:
- # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
- self.increase_running_job_count(job.user_id, job_destination.id )
- return state
+ return state, job_destination
def __verify_in_memory_job_inputs( self, job ):
""" Perform the same checks that happen via SQL for in-memory
managed
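
A caller-side note on the contract introduced here (a sketch only, mirroring the diff above, not part of the changeset): __verify_job_ready returns a (state, destination) pair, and JOB_READY is the only state that guarantees a usable destination, so the destination is dereferenced only on that branch.

    state, job_destination = self.__verify_job_ready( job, job_wrapper )
    if state == JOB_READY:
        # Contract from the docstring above: JOB_READY implies a non-None
        # job_destination, so it is safe to dereference it here.
        self.increase_running_job_count( job.user_id, job_destination.id )
    return state
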
https://bitbucket.org/galaxy/galaxy-central/commits/e1c8d892be5d/
Changeset: e1c8d892be5d
User: jmchilton
Date: 2014-06-11 07:19:20
Summary: Allow dynamic job destinations to throw a new JobNotReadyException to indicate a delayed decision.
A rule might, for example, wait on some deployer-defined limit or resource to become available.
Affected #: 2 files
diff -r 8bbb244d339b396c841341faaa9353f4a1c5e5f1 -r e1c8d892be5de20ed4564a3e467943701dccacd8 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -13,6 +13,7 @@
from galaxy import model
from galaxy.util.sleeper import Sleeper
from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination
+from galaxy.jobs.mapper import JobNotReadyException
log = logging.getLogger( __name__ )
@@ -326,6 +327,9 @@
try:
# Cause the job_destination to be set and cached by the mapper
job_destination = job_wrapper.job_destination
+ except JobNotReadyException as e:
+ job_state = e.job_state or JOB_WAIT
+ return job_state, None
except Exception, e:
failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
diff -r 8bbb244d339b396c841341faaa9353f4a1c5e5f1 -r e1c8d892be5de20ed4564a3e467943701dccacd8 lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -16,6 +16,13 @@
self.failure_message = failure_message
+class JobNotReadyException( Exception ):
+
+ def __init__( self, job_state=None, message=None ):
+ self.job_state = job_state
+ self.message = message
+
+
class JobRunnerMapper( object ):
"""
This class is responsible to managing the mapping of jobs
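
A minimal sketch of how a deployer-defined dynamic destination rule could use the new exception; the rule name, the placeholder resource check, and the 'high_mem' destination id are illustrative assumptions, not part of this changeset:

    from galaxy.jobs.mapper import JobNotReadyException

    def _cluster_has_free_slots():
        # Placeholder for a deployer-defined resource check (e.g. querying the
        # cluster scheduler); hard-coded only to keep the sketch self-contained.
        return False

    def route_high_mem( job ):
        """ Dynamic destination rule: send jobs to a hypothetical 'high_mem'
        destination, but defer the decision while no slots are free.
        """
        if not _cluster_has_free_slots():
            # No job_state is supplied, so __verify_job_ready falls back to
            # JOB_WAIT and the handler re-queues the job for a later queue
            # iteration instead of failing it.
            raise JobNotReadyException( message="waiting for a free high-mem slot" )
        return "high_mem"

Returning a plain destination id (assumed to be defined in the deployment's job configuration) keeps the sketch short; a rule could equally construct and return a JobDestination.
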
https://bitbucket.org/galaxy/galaxy-central/commits/4ef50975607e/
Changeset: 4ef50975607e
User: jmchilton
Date: 2014-06-24 04:15:44
Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #411)
Allow dynamic job destination rules to throw `galaxy.jobs.mapper.JobNotReadyException` indicating the job is not ready.
Affected #: 2 files
diff -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -13,6 +13,7 @@
from galaxy import model
from galaxy.util.sleeper import Sleeper
from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination
+from galaxy.jobs.mapper import JobNotReadyException
log = logging.getLogger( __name__ )
@@ -263,7 +264,7 @@
try:
# Check the job's dependencies, requeue if they're not done.
# Some of these states will only happen when using the in-memory job queue
- job_state = self.__check_if_ready_to_run( job )
+ job_state = self.__check_job_state( job )
if job_state == JOB_WAIT:
new_waiting_jobs.append( job.id )
elif job_state == JOB_INPUT_ERROR:
@@ -304,7 +305,7 @@
# Done with the session
self.sa_session.remove()
- def __check_if_ready_to_run( self, job ):
+ def __check_job_state( self, job ):
"""
Check if a job is ready to run by verifying that each of its input
datasets is ready (specifically in the OK state). If any input dataset
@@ -314,62 +315,97 @@
job can be dispatched. Otherwise, return JOB_WAIT indicating that input
datasets are still being prepared.
"""
- # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
if not self.track_jobs_in_database:
- if job.state == model.Job.states.DELETED:
- return JOB_DELETED
- elif job.state == model.Job.states.ERROR:
- return JOB_ADMIN_DELETED
- for dataset_assoc in job.input_datasets + job.input_library_datasets:
- idata = dataset_assoc.dataset
- if not idata:
- continue
- # don't run jobs for which the input dataset was deleted
- if idata.deleted:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
- return JOB_INPUT_DELETED
- # an error in the input data causes us to bail immediately
- elif idata.state == idata.states.ERROR:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state == idata.states.FAILED_METADATA:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
- # need to requeue
- return JOB_WAIT
+ in_memory_not_ready_state = self.__verify_in_memory_job_inputs( job )
+ if in_memory_not_ready_state:
+ return in_memory_not_ready_state
+
+ # Else, if tracking in the database, job.state is guaranteed to be NEW and
+ # the inputs are guaranteed to be OK.
+
# Create the job wrapper so that the destination can be set
- if job.id not in self.job_wrappers:
- self.job_wrappers[job.id] = self.job_wrapper( job )
- # Cause the job_destination to be set and cached by the mapper
+ job_id = job.id
+ job_wrapper = self.job_wrappers.get( job_id, None )
+ if not job_wrapper:
+ job_wrapper = self.job_wrapper( job )
+ self.job_wrappers[ job_id ] = job_wrapper
+
+ # If state == JOB_READY, assume job_destination also set - otherwise
+ # in case of various error or cancelled states do not assume
+ # destination has been set.
+ state, job_destination = self.__verify_job_ready( job, job_wrapper )
+
+ if state == JOB_READY:
+ # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
+ self.increase_running_job_count(job.user_id, job_destination.id )
+ return state
+
+ def __verify_job_ready( self, job, job_wrapper ):
+ """ Compute job destination and verify job is ready at that
+ destination by checking job limits and quota. If this method
+ return a job state of JOB_READY - it MUST also return a job
+ destination.
+ """
+ job_destination = None
try:
- self.job_wrappers[job.id].job_destination
+ # Cause the job_destination to be set and cached by the mapper
+ job_destination = job_wrapper.job_destination
+ except JobNotReadyException as e:
+ job_state = e.job_state or JOB_WAIT
+ return job_state, None
except Exception, e:
failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
log.exception( 'Failed to generate job destination' )
else:
log.debug( "Intentionally failing job with message (%s)" % failure_message )
- self.job_wrappers[job.id].fail( failure_message )
- return JOB_ERROR
+ job_wrapper.fail( failure_message )
+ return JOB_ERROR, job_destination
# job is ready to run, check limits
# TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
- state = self.__check_destination_jobs( job, self.job_wrappers[job.id] )
+ state = self.__check_destination_jobs( job, job_wrapper )
if state == JOB_READY:
- state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
+ state = self.__check_user_jobs( job, job_wrapper )
if state == JOB_READY and self.app.config.enable_quotas:
quota = self.app.quota_agent.get_quota( job.user )
if quota is not None:
try:
usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
if usage > quota:
- return JOB_USER_OVER_QUOTA
+ return JOB_USER_OVER_QUOTA, job_destination
except AssertionError, e:
pass # No history, should not happen with an anon user
- if state == JOB_READY:
- # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
- self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
- return state
+ return state, job_destination
+
+ def __verify_in_memory_job_inputs( self, job ):
+ """ Perform the same checks that happen via SQL for in-memory
managed
+ jobs.
+ """
+ if job.state == model.Job.states.DELETED:
+ return JOB_DELETED
+ elif job.state == model.Job.states.ERROR:
+ return JOB_ADMIN_DELETED
+ for dataset_assoc in job.input_datasets + job.input_library_datasets:
+ idata = dataset_assoc.dataset
+ if not idata:
+ continue
+ # don't run jobs for which the input dataset was deleted
+ if idata.deleted:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ return JOB_INPUT_DELETED
+ # an error in the input data causes us to bail immediately
+ elif idata.state == idata.states.ERROR:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state == idata.states.FAILED_METADATA:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+ # need to requeue
+ return JOB_WAIT
+
+ # All inputs ready to go.
+ return None
def __clear_job_count( self ):
self.user_job_count = None
diff -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -16,6 +16,13 @@
self.failure_message = failure_message
+class JobNotReadyException( Exception ):
+
+ def __init__( self, job_state=None, message=None ):
+ self.job_state = job_state
+ self.message = message
+
+
class JobRunnerMapper( object ):
"""
This class is responsible to managing the mapping of jobs
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.