galaxy-dist commit 0518ce52d8d3: Allow jobs to be stopped when using track_jobs_in_database/enable_job_running (multiprocess config).
# HG changeset patch -- Bitbucket.org
# Project galaxy-dist
# URL http://bitbucket.org/galaxy/galaxy-dist/overview
# User Nate Coraor <nate@bx.psu.edu>
# Date 1288379862 14400
# Node ID 0518ce52d8d3ca15ec6070f2addb4bfe7608c23b
# Parent a22917c97fbe89aa2c0393d30f51d2a2183fd2f8
Allow jobs to be stopped when using track_jobs_in_database/enable_job_running (multiprocess config).

--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -813,6 +813,8 @@ class JobStopQueue( object ):
         self.sa_session = app.model.context
         self.dispatcher = dispatcher
 
+        self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False )
+
         # Keep track of the pid that started the job manager, only it
         # has valid threads
         self.parent_pid = os.getpid()
@@ -848,21 +850,29 @@ class JobStopQueue( object ):
         Called repeatedly by `monitor` to stop jobs.
         """
         # Pull all new jobs from the queue at once
-        jobs = []
-        try:
-            while 1:
-                ( job_id, error_msg ) = self.queue.get_nowait()
-                if job_id is self.STOP_SIGNAL:
-                    return
-                # Append to watch queue
-                jobs.append( ( job_id, error_msg ) )
-        except Empty:
-            pass
-
-        for job_id, error_msg in jobs:
-            job = self.sa_session.query( model.Job ).get( job_id )
-            self.sa_session.refresh( job )
-            # if desired, error the job so we can inform the user.
+        jobs_to_check = []
+        if self.track_jobs_in_database:
+            # Clear the session so we get fresh states for job and all datasets
+            self.sa_session.expunge_all()
+            # Fetch all new jobs
+            newly_deleted_jobs = self.sa_session.query( model.Job ) \
+                                     .options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \
+                                     .filter( model.Job.state == model.Job.states.DELETED_NEW ).all()
+            for job in newly_deleted_jobs:
+                jobs_to_check.append( ( job, None ) )
+        else:
+            try:
+                while 1:
+                    message = self.queue.get_nowait()
+                    if message is self.STOP_SIGNAL:
+                        return
+                    # Unpack the message
+                    job_id, error_msg = message
+                    # Get the job object and append to watch queue
+                    jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) )
+            except Empty:
+                pass
+        for job, error_msg in jobs_to_check:
             if error_msg is not None:
                 job.state = job.states.ERROR
                 job.info = error_msg
@@ -870,9 +880,6 @@ class JobStopQueue( object ):
                 job.state = job.states.DELETED
             self.sa_session.add( job )
             self.sa_session.flush()
-            # if job is in JobQueue or FooJobRunner's put method,
-            # job_runner_name will be unset and the job will be dequeued due to
-            # state change above
             if job.job_runner_name is not None:
                 # tell the dispatcher to stop the job
                 self.dispatcher.stop( job )
@@ -888,7 +895,8 @@ class JobStopQueue( object ):
         else:
             log.info( "sending stop signal to worker thread" )
             self.running = False
-            self.queue.put( ( self.STOP_SIGNAL, None ) )
+            if not self.track_jobs_in_database:
+                self.queue.put( self.STOP_SIGNAL )
             self.sleeper.wake()
             log.info( "job stopper stopped" )
 
--- a/lib/galaxy/web/controllers/root.py
+++ b/lib/galaxy/web/controllers/root.py
@@ -432,7 +432,8 @@ class RootController( BaseController, Us
                     if job.state in [ self.app.model.Job.states.QUEUED, self.app.model.Job.states.RUNNING, self.app.model.Job.states.NEW ]:
                         # Are *all* of the job's other output datasets deleted?
                         if job.check_if_output_datasets_deleted():
-                            job.mark_deleted()
+                            job.mark_deleted( self.app.config.get_bool( 'enable_job_running', True ),
+                                              self.app.config.get_bool( 'track_jobs_in_database', False ) )
                             self.app.job_manager.job_stop_queue.put( job.id )
             trans.sa_session.flush()
 
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -97,7 +97,8 @@ class Job( object ):
                     RUNNING = 'running',
                     OK = 'ok',
                     ERROR = 'error',
-                    DELETED = 'deleted' )
+                    DELETED = 'deleted',
+                    DELETED_NEW = 'deleted_new' )
     def __init__( self ):
         self.session_id = None
         self.user_id = None
@@ -152,11 +153,17 @@ class Job( object ):
             if not dataset.deleted:
                 return False
         return True
-    def mark_deleted( self ):
+    def mark_deleted( self, enable_job_running=True, track_jobs_in_database=False ):
         """
         Mark this job as deleted, and mark any output datasets as discarded.
         """
-        self.state = Job.states.DELETED
+        # This could be handled with *just* track_jobs_in_database, but I
+        # didn't want to make setting track_jobs_in_database required in
+        # non-runner configs.
+        if not enable_job_running or track_jobs_in_database:
+            self.state = Job.states.DELETED_NEW
+        else:
+            self.state = Job.states.DELETED
         self.info = "Job output deleted by user before job completed."
         for dataset_assoc in self.output_datasets:
             dataset = dataset_assoc.dataset
participants (1)
-
commits-noreply@bitbucket.org