1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/fbf4b5ab17a9/
Changeset: fbf4b5ab17a9
User: jmchilton
Date: 2014-06-24 04:55:11
Summary: Make galaxy.util.in_directory slightly more generic to match Pulsar's variant.
Fixes a failing unit test in just-added code.
Affected #: 1 file
diff -r 566388d623749dad4259ea04cf44a5d858a56671 -r fbf4b5ab17a90ec7b0deac658ead6a99b9a6ca76 lib/galaxy/util/__init__.py
--- a/lib/galaxy/util/__init__.py
+++ b/lib/galaxy/util/__init__.py
@@ -417,17 +417,16 @@
return slug_base
-def in_directory( file, directory ):
+def in_directory( file, directory, local_path_module=os.path ):
"""
Return true, if the common prefix of both is equal to directory
e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
"""
# Make both absolute.
- directory = os.path.abspath( directory )
- file = os.path.abspath( file )
-
- return os.path.commonprefix( [ file, directory ] ) == directory
+ directory = local_path_module.abspath(directory)
+ file = local_path_module.abspath(file)
+ return local_path_module.commonprefix([file, directory]) == directory
def merge_sorted_iterables( operator, *iterables ):
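A short sketch of how the generalized signature can be used; the ntpath example and all paths here are illustrative, not taken from the commit:

import ntpath

from galaxy.util import in_directory

# Default behavior: interpret paths with the local os.path module.
in_directory( '/a/b/c/d.rst', '/a/b' )  # True

# Pulsar-style use: pass the path module matching the *remote* system,
# so Windows staging paths can be checked from a POSIX host.
in_directory( 'C:\\jobs\\1\\out.dat', 'C:\\jobs', local_path_module=ntpath )  # True

# Caveat carried over from the original implementation: commonprefix()
# compares characters, so '/a/bc/d.rst' would also pass for '/a/b'.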
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/4ef50975607e/
Changeset: 4ef50975607e
User: jmchilton
Date: 2014-06-24 04:15:44
Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #411)
Allow dynamic job destination rules to throw `galaxy.jobs.mapper.JobNotReadyException` to indicate a job is not ready.
Affected #: 2 files
diff -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -13,6 +13,7 @@
from galaxy import model
from galaxy.util.sleeper import Sleeper
from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination
+from galaxy.jobs.mapper import JobNotReadyException
log = logging.getLogger( __name__ )
@@ -263,7 +264,7 @@
try:
# Check the job's dependencies, requeue if they're not done.
# Some of these states will only happen when using the in-memory job queue
- job_state = self.__check_if_ready_to_run( job )
+ job_state = self.__check_job_state( job )
if job_state == JOB_WAIT:
new_waiting_jobs.append( job.id )
elif job_state == JOB_INPUT_ERROR:
@@ -304,7 +305,7 @@
# Done with the session
self.sa_session.remove()
- def __check_if_ready_to_run( self, job ):
+ def __check_job_state( self, job ):
"""
Check if a job is ready to run by verifying that each of its input
datasets is ready (specifically in the OK state). If any input dataset
@@ -314,62 +315,97 @@
job can be dispatched. Otherwise, return JOB_WAIT indicating that input
datasets are still being prepared.
"""
- # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
if not self.track_jobs_in_database:
- if job.state == model.Job.states.DELETED:
- return JOB_DELETED
- elif job.state == model.Job.states.ERROR:
- return JOB_ADMIN_DELETED
- for dataset_assoc in job.input_datasets + job.input_library_datasets:
- idata = dataset_assoc.dataset
- if not idata:
- continue
- # don't run jobs for which the input dataset was deleted
- if idata.deleted:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
- return JOB_INPUT_DELETED
- # an error in the input data causes us to bail immediately
- elif idata.state == idata.states.ERROR:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state == idata.states.FAILED_METADATA:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
- # need to requeue
- return JOB_WAIT
+ in_memory_not_ready_state = self.__verify_in_memory_job_inputs( job )
+ if in_memory_not_ready_state:
+ return in_memory_not_ready_state
+
+ # Else, if tracking in the database, job.state is guaranteed to be NEW and
+ # the inputs are guaranteed to be OK.
+
# Create the job wrapper so that the destination can be set
- if job.id not in self.job_wrappers:
- self.job_wrappers[job.id] = self.job_wrapper( job )
- # Cause the job_destination to be set and cached by the mapper
+ job_id = job.id
+ job_wrapper = self.job_wrappers.get( job_id, None )
+ if not job_wrapper:
+ job_wrapper = self.job_wrapper( job )
+ self.job_wrappers[ job_id ] = job_wrapper
+
+ # If state == JOB_READY, assume job_destination also set - otherwise
+ # in case of various error or cancelled states do not assume
+ # destination has been set.
+ state, job_destination = self.__verify_job_ready( job, job_wrapper )
+
+ if state == JOB_READY:
+ # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
+ self.increase_running_job_count(job.user_id, job_destination.id )
+ return state
+
+ def __verify_job_ready( self, job, job_wrapper ):
+ """ Compute job destination and verify job is ready at that
+ destination by checking job limits and quota. If this method
+ returns a job state of JOB_READY - it MUST also return a job
+ destination.
+ """
+ job_destination = None
try:
- self.job_wrappers[job.id].job_destination
+ # Cause the job_destination to be set and cached by the mapper
+ job_destination = job_wrapper.job_destination
+ except JobNotReadyException as e:
+ job_state = e.job_state or JOB_WAIT
+ return job_state, None
except Exception, e:
failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
log.exception( 'Failed to generate job destination' )
else:
log.debug( "Intentionally failing job with message (%s)" % failure_message )
- self.job_wrappers[job.id].fail( failure_message )
- return JOB_ERROR
+ job_wrapper.fail( failure_message )
+ return JOB_ERROR, job_destination
# job is ready to run, check limits
# TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
- state = self.__check_destination_jobs( job, self.job_wrappers[job.id] )
+ state = self.__check_destination_jobs( job, job_wrapper )
if state == JOB_READY:
- state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
+ state = self.__check_user_jobs( job, job_wrapper )
if state == JOB_READY and self.app.config.enable_quotas:
quota = self.app.quota_agent.get_quota( job.user )
if quota is not None:
try:
usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
if usage > quota:
- return JOB_USER_OVER_QUOTA
+ return JOB_USER_OVER_QUOTA, job_destination
except AssertionError, e:
pass # No history, should not happen with an anon user
- if state == JOB_READY:
- # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
- self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
- return state
+ return state, job_destination
+
+ def __verify_in_memory_job_inputs( self, job ):
+ """ Perform the same checks that happen via SQL for in-memory managed
+ jobs.
+ """
+ if job.state == model.Job.states.DELETED:
+ return JOB_DELETED
+ elif job.state == model.Job.states.ERROR:
+ return JOB_ADMIN_DELETED
+ for dataset_assoc in job.input_datasets + job.input_library_datasets:
+ idata = dataset_assoc.dataset
+ if not idata:
+ continue
+ # don't run jobs for which the input dataset was deleted
+ if idata.deleted:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ return JOB_INPUT_DELETED
+ # an error in the input data causes us to bail immediately
+ elif idata.state == idata.states.ERROR:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state == idata.states.FAILED_METADATA:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+ # need to requeue
+ return JOB_WAIT
+
+ # All inputs ready to go.
+ return None
def __clear_job_count( self ):
self.user_job_count = None
diff -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -16,6 +16,13 @@
self.failure_message = failure_message
+class JobNotReadyException( Exception ):
+
+ def __init__( self, job_state=None, message=None ):
+ self.job_state = job_state
+ self.message = message
+
+
class JobRunnerMapper( object ):
"""
This class is responsible to managing the mapping of jobs
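As a hedged illustration of what the new hook enables, a dynamic job destination rule can now delay dispatch instead of failing the job outright. Everything below except JobNotReadyException and its constructor is hypothetical (the rule name, the slot query, the destination id):

from galaxy.jobs.mapper import JobNotReadyException

MAX_ACTIVE_BLAST_JOBS = 4  # hypothetical deployer-defined limit

def count_active_blast_jobs():
    return 5  # stub standing in for a real query against the cluster

def blast_destination( job ):
    """Dynamic rule: hold blast jobs while the slot limit is reached."""
    if count_active_blast_jobs() >= MAX_ACTIVE_BLAST_JOBS:
        # job_state is left unset, so the handler falls back to JOB_WAIT
        # ( job_state = e.job_state or JOB_WAIT ) and requeues the job
        # on the next queue iteration.
        raise JobNotReadyException( message="blast slots exhausted" )
    return "cluster_default"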
5 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/72a0157988f1/
Changeset: 72a0157988f1
User: jmchilton
Date: 2014-06-11 07:19:20
Summary: Refactor to simplify __check_if_ready_to_run.
Move the checks for in-memory managed jobs, which run prior to the common code path, into their own helper method.
Affected #: 1 file
diff -r c52cb1f827d49f0f0ee2c1c97d3626dd1b2cc348 -r 72a0157988f1cf7e9440dc1cb3dc13ca6cdc42ba lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -240,7 +240,7 @@
try:
# Check the job's dependencies, requeue if they're not done.
# Some of these states will only happen when using the in-memory job queue
- job_state = self.__check_if_ready_to_run( job )
+ job_state = self.__check_job_state( job )
if job_state == JOB_WAIT:
new_waiting_jobs.append( job.id )
elif job_state == JOB_INPUT_ERROR:
@@ -281,7 +281,7 @@
# Done with the session
self.sa_session.remove()
- def __check_if_ready_to_run( self, job ):
+ def __check_job_state( self, job ):
"""
Check if a job is ready to run by verifying that each of its input
datasets is ready (specifically in the OK state). If any input dataset
@@ -293,28 +293,12 @@
"""
# If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
if not self.track_jobs_in_database:
- if job.state == model.Job.states.DELETED:
- return JOB_DELETED
- elif job.state == model.Job.states.ERROR:
- return JOB_ADMIN_DELETED
- for dataset_assoc in job.input_datasets + job.input_library_datasets:
- idata = dataset_assoc.dataset
- if not idata:
- continue
- # don't run jobs for which the input dataset was deleted
- if idata.deleted:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
- return JOB_INPUT_DELETED
- # an error in the input data causes us to bail immediately
- elif idata.state == idata.states.ERROR:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state == idata.states.FAILED_METADATA:
- self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
- # need to requeue
- return JOB_WAIT
+ in_memory_job_state = self.__check_in_memory_job_state( job )
+ if in_memory_job_state:
+ return in_memory_job_state
+ # If no problems, all inputs ready to go, continue
+ # with same checks either way.
+
# Create the job wrapper so that the destination can be set
if job.id not in self.job_wrappers:
self.job_wrappers[job.id] = self.job_wrapper( job )
@@ -348,6 +332,33 @@
self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
return state
+ def __check_in_memory_job_state( self, job ):
+ if job.state == model.Job.states.DELETED:
+ return JOB_DELETED
+ elif job.state == model.Job.states.ERROR:
+ return JOB_ADMIN_DELETED
+ for dataset_assoc in job.input_datasets + job.input_library_datasets:
+ idata = dataset_assoc.dataset
+ if not idata:
+ continue
+ # don't run jobs for which the input dataset was deleted
+ if idata.deleted:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ return JOB_INPUT_DELETED
+ # an error in the input data causes us to bail immediately
+ elif idata.state == idata.states.ERROR:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state == idata.states.FAILED_METADATA:
+ self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+ # need to requeue
+ return JOB_WAIT
+
+ # All inputs ready to go.
+ return None
+
def __clear_job_count( self ):
self.user_job_count = None
self.user_job_count_per_destination = None
https://bitbucket.org/galaxy/galaxy-central/commits/519050ab95d7/
Changeset: 519050ab95d7
User: jmchilton
Date: 2014-06-11 07:19:20
Summary: Refactor __check_if_ready_to_run - less code duplication...
... in the sense that it no longer repeatedly grabs the same values from the map, from the wrapper, etc.
Affected #: 1 file
diff -r 72a0157988f1cf7e9440dc1cb3dc13ca6cdc42ba -r 519050ab95d7177abdea8d55bae3b03a3a289476 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -291,33 +291,37 @@
job can be dispatched. Otherwise, return JOB_WAIT indicating that input
datasets are still being prepared.
"""
- # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
if not self.track_jobs_in_database:
- in_memory_job_state = self.__check_in_memory_job_state( job )
- if in_memory_job_state:
- return in_memory_job_state
- # If no problems, all inputs ready to go, continue
- # with same checks either way.
+ in_memory_not_ready_state = self.__verify_in_memory_job_inputs( job )
+ if in_memory_not_ready_state:
+ return in_memory_not_ready_state
+
+ # Else, if tracking in the database, job.state is guaranteed to be NEW and
+ # the inputs are guaranteed to be OK.
# Create the job wrapper so that the destination can be set
- if job.id not in self.job_wrappers:
- self.job_wrappers[job.id] = self.job_wrapper( job )
+ job_id = job.id
+ job_wrapper = self.job_wrappers.get( job_id, None )
+ if not job_wrapper:
+ job_wrapper = self.job_wrapper( job )
+ self.job_wrappers[ job_id ] = job_wrapper
+
# Cause the job_destination to be set and cached by the mapper
try:
- self.job_wrappers[job.id].job_destination
+ job_destination = job_wrapper.job_destination
except Exception, e:
failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
log.exception( 'Failed to generate job destination' )
else:
log.debug( "Intentionally failing job with message (%s)" % failure_message )
- self.job_wrappers[job.id].fail( failure_message )
+ job_wrapper.fail( failure_message )
return JOB_ERROR
# job is ready to run, check limits
# TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
- state = self.__check_destination_jobs( job, self.job_wrappers[job.id] )
+ state = self.__check_destination_jobs( job, job_wrapper )
if state == JOB_READY:
- state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
+ state = self.__check_user_jobs( job, job_wrapper )
if state == JOB_READY and self.app.config.enable_quotas:
quota = self.app.quota_agent.get_quota( job.user )
if quota is not None:
@@ -329,10 +333,13 @@
pass # No history, should not happen with an anon user
if state == JOB_READY:
# PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
- self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
+ self.increase_running_job_count(job.user_id, job_destination.id )
return state
- def __check_in_memory_job_state( self, job ):
+ def __verify_in_memory_job_inputs( self, job ):
+ """ Perform the same checks that happen via SQL for in-memory managed
+ jobs.
+ """
if job.state == model.Job.states.DELETED:
return JOB_DELETED
elif job.state == model.Job.states.ERROR:
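The wrapper-caching pattern this commit settles on, rendered standalone (the cache and factory names are illustrative). dict.setdefault() would be shorter, but it evaluates its second argument unconditionally, so it would construct a JobWrapper even on a cache hit; the two-step form avoids that:

cache = {}

def get_wrapper( job_id, factory ):
    wrapper = cache.get( job_id )  # one lookup instead of two
    if wrapper is None:
        wrapper = factory( job_id )  # constructed only on a cache miss
        cache[ job_id ] = wrapper
    return wrapper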
https://bitbucket.org/galaxy/galaxy-central/commits/8bbb244d339b/
Changeset: 8bbb244d339b
User: jmchilton
Date: 2014-06-11 07:19:20
Summary: Break up __check_job_state - extract new __verify_job_ready method.
Affected #: 1 file
diff -r 519050ab95d7177abdea8d55bae3b03a3a289476 -r 8bbb244d339b396c841341faaa9353f4a1c5e5f1 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -306,8 +306,25 @@
job_wrapper = self.job_wrapper( job )
self.job_wrappers[ job_id ] = job_wrapper
- # Cause the job_destination to be set and cached by the mapper
+ # If state == JOB_READY, assume job_destination also set - otherwise
+ # in case of various error or cancelled states do not assume
+ # destination has been set.
+ state, job_destination = self.__verify_job_ready( job, job_wrapper )
+
+ if state == JOB_READY:
+ # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
+ self.increase_running_job_count(job.user_id, job_destination.id )
+ return state
+
+ def __verify_job_ready( self, job, job_wrapper ):
+ """ Compute job destination and verify job is ready at that
+ destination by checking job limits and quota. If this method
+ returns a job state of JOB_READY - it MUST also return a job
+ destination.
+ """
+ job_destination = None
try:
+ # Cause the job_destination to be set and cached by the mapper
job_destination = job_wrapper.job_destination
except Exception, e:
failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
@@ -316,7 +333,7 @@
else:
log.debug( "Intentionally failing job with message (%s)" % failure_message )
job_wrapper.fail( failure_message )
- return JOB_ERROR
+ return JOB_ERROR, job_destination
# job is ready to run, check limits
# TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
state = self.__check_destination_jobs( job, job_wrapper )
@@ -328,13 +345,10 @@
try:
usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
if usage > quota:
- return JOB_USER_OVER_QUOTA
+ return JOB_USER_OVER_QUOTA, job_destination
except AssertionError, e:
pass # No history, should not happen with an anon user
- if state == JOB_READY:
- # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
- self.increase_running_job_count(job.user_id, job_destination.id )
- return state
+ return state, job_destination
def __verify_in_memory_job_inputs( self, job ):
""" Perform the same checks that happen via SQL for in-memory managed
https://bitbucket.org/galaxy/galaxy-central/commits/e1c8d892be5d/
Changeset: e1c8d892be5d
User: jmchilton
Date: 2014-06-11 07:19:20
Summary: Allow dynamic job destinations to throw new JobNotReadyException to indicate delayed decision.
Perhaps to wait on some deployer-defined limit, or for a resource to become available, etc.
Affected #: 2 files
diff -r 8bbb244d339b396c841341faaa9353f4a1c5e5f1 -r e1c8d892be5de20ed4564a3e467943701dccacd8 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -13,6 +13,7 @@
from galaxy import model
from galaxy.util.sleeper import Sleeper
from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination
+from galaxy.jobs.mapper import JobNotReadyException
log = logging.getLogger( __name__ )
@@ -326,6 +327,9 @@
try:
# Cause the job_destination to be set and cached by the mapper
job_destination = job_wrapper.job_destination
+ except JobNotReadyException as e:
+ job_state = e.job_state or JOB_WAIT
+ return job_state, None
except Exception, e:
failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
diff -r 8bbb244d339b396c841341faaa9353f4a1c5e5f1 -r e1c8d892be5de20ed4564a3e467943701dccacd8 lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -16,6 +16,13 @@
self.failure_message = failure_message
+class JobNotReadyException( Exception ):
+
+ def __init__( self, job_state=None, message=None ):
+ self.job_state = job_state
+ self.message = message
+
+
class JobRunnerMapper( object ):
"""
This class is responsible to managing the mapping of jobs
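A minimal sketch of the defaulting semantics as the handler consumes the exception; the JOB_WAIT value here is illustrative, not the handler's actual constant:

from galaxy.jobs.mapper import JobNotReadyException

JOB_WAIT = 'wait'  # stand-in for the handler-level state constant

def state_for( e ):
    # Mirrors the handler's fallback: an unset job_state means "wait".
    return e.job_state or JOB_WAIT

assert state_for( JobNotReadyException() ) == JOB_WAIT
assert state_for( JobNotReadyException( job_state='input_error' ) ) == 'input_error'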
https://bitbucket.org/galaxy/galaxy-central/commits/4ef50975607e/
Changeset: 4ef50975607e
User: jmchilton
Date: 2014-06-24 04:15:44
Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #411)
Allow dynamic job destination rules to throw `galaxy.jobs.mapper.JobNotReadyException` to indicate a job is not ready.
Affected #: 2 files
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/5eaeb25bff8f/
Changeset: 5eaeb25bff8f
User: jmchilton
Date: 2014-06-23 19:49:05
Summary: Docfix: Delete diminutive docker doc deficiency.
Affected #: 1 file
diff -r 631127bb6c2657140e75f203301abfdf567d82e5 -r 5eaeb25bff8f383e54c36bd32e17432e6cafadde job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -178,7 +178,7 @@
does trust tool's specified container - but also wants tool's not
configured to run in a container the following option can provide
a fallback. -->
- <!-- <param id="dockers_default_container_id">busybox:ubuntu-14.04</param> -->
+ <!-- <param id="docker_default_container_id">busybox:ubuntu-14.04</param> -->
</destination>
<destination id="pbs" runner="pbs" tags="mycluster"/>
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/ae53980fad82/
Changeset: ae53980fad82
User: greg
Date: 2014-06-22 14:00:57
Summary: Move two Tool Shed utility functions into the hg middleware class.
Affected #: 3 files
diff -r 63919647ca8424a28042ccb79ea2db6e0c68d507 -r ae53980fad820db494c67ddb5ad0ad5f9deb9dd1 lib/galaxy/webapps/tool_shed/framework/middleware/hg.py
--- a/lib/galaxy/webapps/tool_shed/framework/middleware/hg.py
+++ b/lib/galaxy/webapps/tool_shed/framework/middleware/hg.py
@@ -11,7 +11,6 @@
from galaxy.util import asbool
from galaxy.util.hash_util import new_secure_hash
-from tool_shed.util import commit_util
from tool_shed.util import hg_util
import tool_shed.repository_types.util as rt_util
@@ -54,13 +53,15 @@
connection = engine.connect()
path_info = environ[ 'PATH_INFO' ].lstrip( '/' )
user_id, repository_name = self.__get_user_id_repository_name_from_path_info( connection, path_info )
- sql_cmd = "SELECT times_downloaded FROM repository WHERE user_id = %d AND name = '%s'" % ( user_id, repository_name.lower() )
+ sql_cmd = "SELECT times_downloaded FROM repository WHERE user_id = %d AND name = '%s'" % \
+ ( user_id, repository_name.lower() )
result_set = connection.execute( sql_cmd )
for row in result_set:
# Should only be 1 row...
times_downloaded = row[ 'times_downloaded' ]
times_downloaded += 1
- sql_cmd = "UPDATE repository SET times_downloaded = %d WHERE user_id = %d AND name = '%s'" % ( times_downloaded, user_id, repository_name.lower() )
+ sql_cmd = "UPDATE repository SET times_downloaded = %d WHERE user_id = %d AND name = '%s'" % \
+ ( times_downloaded, user_id, repository_name.lower() )
connection.execute( sql_cmd )
connection.close()
elif cmd in [ 'unbundle', 'pushkey' ]:
@@ -132,7 +133,7 @@
if filename and isinstance( filename, str ):
if filename == rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME:
# Make sure the any complex repository dependency definitions contain valid <repository> tags.
- is_valid, error_msg = commit_util.repository_tags_are_valid( filename, change_list )
+ is_valid, error_msg = self.repository_tags_are_valid( filename, change_list )
if not is_valid:
log.debug( error_msg )
return self.__display_exception_remotely( start_response, error_msg )
@@ -151,7 +152,7 @@
if filename and isinstance( filename, str ):
if filename == rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME:
# Make sure the any complex repository dependency definitions contain valid <repository> tags.
- is_valid, error_msg = commit_util.repository_tags_are_valid( filename, change_list )
+ is_valid, error_msg = self.repository_tags_are_valid( filename, change_list )
if not is_valid:
log.debug( error_msg )
return self.__display_exception_remotely( start_response, error_msg )
@@ -174,7 +175,7 @@
rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME ]:
# We check both files since tool dependency definitions files can contain complex
# repository dependency definitions.
- is_valid, error_msg = commit_util.repository_tags_are_valid( filename, change_list )
+ is_valid, error_msg = self.repository_tags_are_valid( filename, change_list )
if not is_valid:
log.debug( error_msg )
return self.__display_exception_remotely( start_response, error_msg )
@@ -186,6 +187,54 @@
return result.wsgi_application( environ, start_response )
return self.app( environ, start_response )
+ def __authenticate( self, username, password ):
+ db_password = None
+ # Instantiate a database connection
+ engine = sqlalchemy.create_engine( self.db_url )
+ connection = engine.connect()
+ result_set = connection.execute( "select email, password from galaxy_user where username = '%s'" % username.lower() )
+ for row in result_set:
+ # Should only be 1 row...
+ db_email = row[ 'email' ]
+ db_password = row[ 'password' ]
+ connection.close()
+ if db_password:
+ # Check if password matches db_password when hashed.
+ return new_secure_hash( text_type=password ) == db_password
+ return False
+
+ def __authenticate_remote_user( self, environ, username, password ):
+ """
+ Look after a remote user and "authenticate" - upstream server should already have achieved
+ this for us, but we check that the user exists at least. Hg allow_push = must include username
+ - some versions of mercurial blow up with 500 errors.
+ """
+ db_username = None
+ ru_email = environ[ 'HTTP_REMOTE_USER' ].lower()
+ ## Instantiate a database connection...
+ engine = sqlalchemy.create_engine( self.db_url )
+ connection = engine.connect()
+ result_set = connection.execute( "select email, username, password from galaxy_user where email = '%s'" % ru_email )
+ for row in result_set:
+ # Should only be 1 row...
+ db_email = row[ 'email' ]
+ db_password = row[ 'password' ]
+ db_username = row[ 'username' ]
+ connection.close()
+ if db_username:
+ # We could check the password here except that the function galaxy.web.framework.get_or_create_remote_user()
+ # does some random generation of a password - so that no-one knows the password and only the hash is stored...
+ return db_username == username
+ return False
+
+ def __basic_authentication( self, environ, username, password ):
+ """The environ parameter is needed in basic authentication. We also check it if use_remote_user is true."""
+ if asbool( self.config.get( 'use_remote_user', False ) ):
+ assert "HTTP_REMOTE_USER" in environ, "use_remote_user is set but no HTTP_REMOTE_USER variable"
+ return self.__authenticate_remote_user( environ, username, password )
+ else:
+ return self.__authenticate( username, password )
+
def __display_exception_remotely( self, start_response, msg ):
# Display the exception to the remote user's command line.
status = "500 %s" % msg
@@ -213,49 +262,37 @@
user_id = row[ 'id' ]
return user_id, repository_name
- def __basic_authentication( self, environ, username, password ):
- """The environ parameter is needed in basic authentication. We also check it if use_remote_user is true."""
- if asbool( self.config.get( 'use_remote_user', False ) ):
- assert "HTTP_REMOTE_USER" in environ, "use_remote_user is set but no HTTP_REMOTE_USER variable"
- return self.__authenticate_remote_user( environ, username, password )
- else:
- return self.__authenticate( username, password )
-
- def __authenticate( self, username, password ):
- db_password = None
- # Instantiate a database connection
- engine = sqlalchemy.create_engine( self.db_url )
- connection = engine.connect()
- result_set = connection.execute( "select email, password from galaxy_user where username = '%s'" % username.lower() )
- for row in result_set:
- # Should only be 1 row...
- db_email = row[ 'email' ]
- db_password = row[ 'password' ]
- connection.close()
- if db_password:
- # Check if password matches db_password when hashed.
- return new_secure_hash( text_type=password ) == db_password
- return False
-
- def __authenticate_remote_user( self, environ, username, password ):
+ def repository_tag_is_valid( self, filename, line ):
"""
- Look after a remote user and "authenticate" - upstream server should already have achieved this for us, but we check that the
- user exists at least. Hg allow_push = must include username - some versions of mercurial blow up with 500 errors.
+ Checks changes made to <repository> tags in a dependency definition file being pushed to the
+ Tool Shed from the command line to ensure that all required attributes exist.
"""
- db_username = None
- ru_email = environ[ 'HTTP_REMOTE_USER' ].lower()
- ## Instantiate a database connection...
- engine = sqlalchemy.create_engine( self.db_url )
- connection = engine.connect()
- result_set = connection.execute( "select email, username, password from galaxy_user where email = '%s'" % ru_email )
- for row in result_set:
- # Should only be 1 row...
- db_email = row[ 'email' ]
- db_password = row[ 'password' ]
- db_username = row[ 'username' ]
- connection.close()
- if db_username:
- # We could check the password here except that the function galaxy.web.framework.get_or_create_remote_user() does some random generation of
- # a password - so that no-one knows the password and only the hash is stored...
- return db_username == username
- return False
+ required_attributes = [ 'toolshed', 'name', 'owner', 'changeset_revision' ]
+ defined_attributes = line.split()
+ for required_attribute in required_attributes:
+ defined = False
+ for defined_attribute in defined_attributes:
+ if defined_attribute.startswith( required_attribute ):
+ defined = True
+ break
+ if not defined:
+ error_msg = 'The %s file contains a <repository> tag that is missing the required attribute %s. ' % \
+ ( filename, required_attribute )
+ error_msg += 'Automatically populating dependency definition attributes occurs only when using '
+ error_msg += 'the Tool Shed upload utility. '
+ return False, error_msg
+ return True, ''
+
+ def repository_tags_are_valid( self, filename, change_list ):
+ """
+ Make sure that any complex repository dependency definitions contain valid <repository> tags when pushing
+ changes to the tool shed on the command line.
+ """
+ tag = '<repository'
+ for change_dict in change_list:
+ lines = get_change_lines_in_file_for_tag( tag, change_dict )
+ for line in lines:
+ is_valid, error_msg = self.repository_tag_is_valid( filename, line )
+ if not is_valid:
+ return False, error_msg
+ return True, ''
diff -r 63919647ca8424a28042ccb79ea2db6e0c68d507 -r ae53980fad820db494c67ddb5ad0ad5f9deb9dd1 lib/tool_shed/dependencies/dependency_manager.py
--- a/lib/tool_shed/dependencies/dependency_manager.py
+++ b/lib/tool_shed/dependencies/dependency_manager.py
@@ -177,6 +177,7 @@
new_root[ index ] = new_elem
return root_altered, new_root, error_message
+
class ToolDependencyAttributeHandler( object ):
def __init__( self, app, unpopulate ):
diff -r 63919647ca8424a28042ccb79ea2db6e0c68d507 -r ae53980fad820db494c67ddb5ad0ad5f9deb9dd1 lib/tool_shed/util/commit_util.py
--- a/lib/tool_shed/util/commit_util.py
+++ b/lib/tool_shed/util/commit_util.py
@@ -221,41 +221,6 @@
gzipped_file.close()
shutil.move( uncompressed, uploaded_file_name )
-def repository_tag_is_valid( filename, line ):
- """
- Checks changes made to <repository> tags in a dependency definition file being pushed to the
- Tool Shed from the command line to ensure that all required attributes exist.
- """
- required_attributes = [ 'toolshed', 'name', 'owner', 'changeset_revision' ]
- defined_attributes = line.split()
- for required_attribute in required_attributes:
- defined = False
- for defined_attribute in defined_attributes:
- if defined_attribute.startswith( required_attribute ):
- defined = True
- break
- if not defined:
- error_msg = 'The %s file contains a <repository> tag that is missing the required attribute %s. ' % \
- ( filename, required_attribute )
- error_msg += 'Automatically populating dependency definition attributes occurs only when using '
- error_msg += 'the Tool Shed upload utility. '
- return False, error_msg
- return True, ''
-
-def repository_tags_are_valid( filename, change_list ):
- """
- Make sure the any complex repository dependency definitions contain valid <repository> tags when pushing
- changes to the tool shed on the command line.
- """
- tag = '<repository'
- for change_dict in change_list:
- lines = get_change_lines_in_file_for_tag( tag, change_dict )
- for line in lines:
- is_valid, error_msg = repository_tag_is_valid( filename, line )
- if not is_valid:
- return False, error_msg
- return True, ''
-
def uncompress( repository, uploaded_file_name, uploaded_file_filename, isgzip=False, isbz2=False ):
if isgzip:
handle_gzip( repository, uploaded_file_name )
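A standalone rendering of the attribute check these functions perform, mirroring repository_tag_is_valid above; the sample tag line is illustrative:

REQUIRED_ATTRIBUTES = [ 'toolshed', 'name', 'owner', 'changeset_revision' ]

def missing_attributes( line ):
    defined_attributes = line.split()
    return [ attr for attr in REQUIRED_ATTRIBUTES
             if not any( defined.startswith( attr ) for defined in defined_attributes ) ]

line = '<repository toolshed="https://testtoolshed.g2.bx.psu.edu" name="samtools" owner="iuc"/>'
print( missing_attributes( line ) )  # ['changeset_revision'] -> tag is invalid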