4 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/a709ca0ea728/ Changeset: a709ca0ea728 User: jmchilton Date: 2013-05-13 20:59:16 Summary: Refactor the logic for setting metadata externally, used by the local job runner, out into a method on the BaseJobRunner class. This improves readability and allows other job runners to use this logic. Rework LWR to use this new method. Over time the lwr.py and local.py versions of that code had diverged and it had never been noticed, presumably because no one had been using the LWR with set_metadata_externally = True, which is no longer an option. Affected #: 3 files diff -r d92704335a2774e1bde2be72f9f78ce63c0447fd -r a709ca0ea7289833b8016613a646c88bbf04cef9 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -7,6 +7,7 @@ import string import logging import threading +import subprocess from Queue import Queue, Empty @@ -235,6 +236,30 @@ log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) ) return output_pairs + def _handle_metadata_externally(self, job_wrapper): + """ + Set metadata externally. Used by the local and lwr job runners where this + shouldn't be attached to command-line to execute. + """ + #run the metadata setting script here + #this is terminate-able when output dataset/job is deleted + #so that long running set_meta()s can be canceled without having to reboot the server + if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and job_wrapper.output_paths: + external_metadata_script = job_wrapper.setup_external_metadata( output_fnames=job_wrapper.get_output_fnames(), + set_extension=True, + tmp_dir=job_wrapper.working_directory, + #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior + kwds={ 'overwrite' : False } ) + log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) ) + external_metadata_proc = subprocess.Popen( args=external_metadata_script, + shell=True, + env=os.environ, + preexec_fn=os.setpgrp ) + job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) + external_metadata_proc.wait() + log.debug( 'execution of external set_meta for job %d finished' % job_wrapper.job_id ) + + class AsynchronousJobState( object ): """ Encapsulate the state of an asynchronous job, this should be subclassed as diff -r d92704335a2774e1bde2be72f9f78ce63c0447fd -r a709ca0ea7289833b8016613a646c88bbf04cef9 lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -87,23 +87,7 @@ job_wrapper.fail( "failure running job", exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) return - #run the metadata setting script here - #this is terminate-able when output dataset/job is deleted - #so that long running set_meta()s can be canceled without having to reboot the server - if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and job_wrapper.output_paths: - external_metadata_script = job_wrapper.setup_external_metadata( output_fnames = job_wrapper.get_output_fnames(), - set_extension = True, - tmp_dir = job_wrapper.working_directory, - kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior - log.debug( 
'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) ) - external_metadata_proc = subprocess.Popen( args = external_metadata_script, - shell = True, - env = os.environ, - preexec_fn = os.setpgrp ) - job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) - external_metadata_proc.wait() - log.debug( 'execution of external set_meta for job %d finished' % job_wrapper.job_id ) - + self._handle_metadata_externally( job_wrapper ) # Finish the job! try: job_wrapper.finish( stdout, stderr, exit_code ) diff -r d92704335a2774e1bde2be72f9f78ce63c0447fd -r a709ca0ea7289833b8016613a646c88bbf04cef9 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -1,5 +1,4 @@ import logging -import subprocess from galaxy import model from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner @@ -7,6 +6,7 @@ import errno from time import sleep +import os from lwr_client import FileStager, Client, url_to_destination_params @@ -140,23 +140,8 @@ job_wrapper.fail( "failure running job", exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) return - #run the metadata setting script here - #this is terminate-able when output dataset/job is deleted - #so that long running set_meta()s can be canceled without having to reboot the server - if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and job_wrapper.output_paths: - external_metadata_script = job_wrapper.setup_external_metadata( output_fnames = job_wrapper.get_output_fnames(), - set_extension = True, - kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior - log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) ) - external_metadata_proc = subprocess.Popen( args = external_metadata_script, - shell = True, - env = os.environ, - preexec_fn = os.setpgrp ) - job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) - external_metadata_proc.wait() - log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id ) - - # Finish the job + self._handle_metadata_externally( job_wrapper ) + # Finish the job try: job_wrapper.finish( stdout, stderr ) except: https://bitbucket.org/galaxy/galaxy-central/commits/e456392f0557/ Changeset: e456392f0557 User: jmchilton Date: 2013-05-16 06:37:16 Summary: Implement queued state for LWR jobs - previously all queued and running LWR jobs were marked as running in the Galaxy. Affected #: 2 files diff -r a709ca0ea7289833b8016613a646c88bbf04cef9 -r e456392f0557079488a901a2c0d90459919aa9ea lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -34,15 +34,18 @@ def check_watched_item(self, job_state): try: client = self.get_client_from_state(job_state) - complete = client.check_complete() + status = client.get_status() except Exception: # An orphaned job was put into the queue at app startup, so remote server went down # either way we are done I guess. 
self.mark_as_finished(job_state) return None - if complete: + if status == "complete": self.mark_as_finished(job_state) return None + if status == "running" and not job_state.running: + job_state.running = True + job_state.job_wrapper.change_state( model.Job.states.RUNNING ) return job_state def queue_job(self, job_wrapper): @@ -81,7 +84,7 @@ job_id = file_stager.job_id client.launch( rebuilt_command_line ) job_wrapper.set_job_destination( job_destination, job_id ) - job_wrapper.change_state( model.Job.states.RUNNING ) + job_wrapper.change_state( model.Job.states.QUEUED ) except Exception, exc: job_wrapper.fail( "failure running job", exception=True ) @@ -92,7 +95,7 @@ lwr_job_state.job_wrapper = job_wrapper lwr_job_state.job_id = job_id lwr_job_state.old_state = True - lwr_job_state.running = True + lwr_job_state.running = False lwr_job_state.job_destination = job_destination self.monitor_job(lwr_job_state) @@ -207,12 +210,8 @@ job_state.job_destination = job_wrapper.job_destination job_wrapper.command_line = job.get_command_line() job_state.job_wrapper = job_wrapper - if job.get_state() == model.Job.states.RUNNING: + if job.get_state() in [model.Job.states.RUNNING, model.Job.states.QUEUED]: log.debug( "(LWR/%s) is still in running state, adding to the LWR queue" % ( job.get_id()) ) job_state.old_state = True job_state.running = True self.monitor_queue.put( job_state ) - elif job.get_state() == model.Job.states.QUEUED: - # LWR doesn't queue currently, so this indicates galaxy was shutoff while - # job was being staged. Not sure how to recover from that. - job_state.job_wrapper.fail( "This job was killed when Galaxy was restarted. Please retry the job." ) diff -r a709ca0ea7289833b8016613a646c88bbf04cef9 -r e456392f0557079488a901a2c0d90459919aa9ea lib/galaxy/jobs/runners/lwr_client/client.py --- a/lib/galaxy/jobs/runners/lwr_client/client.py +++ b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -210,11 +210,25 @@ check_complete_response = self.__raw_execute("check_complete", {"job_id": self.job_id}) return check_complete_response - def check_complete(self): + def check_complete(self, response=None): """ Return boolean indicating whether the job is complete. """ - return self.raw_check_complete()["complete"] == "true" + if response == None: + response = self.raw_check_complete() + return response["complete"] == "true" + + def get_status(self): + check_complete_response = self.raw_check_complete() + # Older LWR instances won't set status so use 'complete', at some + # point drop backward compatibility. + complete = self.check_complete(check_complete_response) + old_status = "complete" if complete else "running" + status = check_complete_response.get("status", old_status) + # Bug in certains older LWR instances returned literal "status". + if status not in ["complete", "running", "queued"]: + status = old_status + return status def clean(self): """ https://bitbucket.org/galaxy/galaxy-central/commits/2ffd8c25e9ce/ Changeset: 2ffd8c25e9ce User: jmchilton Date: 2013-05-16 06:50:19 Summary: Fix bug that was preventing LWR jobs from entering the RUNNING state post job "recover" on Galaxy startup. 
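In short, recover() now seeds the in-memory running flag from the persisted job state instead of hard-coding it to True, and the monitor loop relies on that flag to announce the QUEUED -> RUNNING transition. A simplified sketch of that interplay follows; it is illustrative only, not the committed code, and the plain dict and string states stand in for the runner's job_state object and model.Job.states:

    # Sketch only: how the recovered flag and the monitor check fit together.
    def recovered_running_flag(persisted_state):
        # Before this fix the flag was unconditionally True after recover().
        return persisted_state == "running"

    def check_watched_item_sketch(job_state, remote_status):
        if remote_status == "complete":
            return None                       # job finished, stop watching it
        if remote_status == "running" and not job_state["running"]:
            job_state["running"] = True       # announce QUEUED -> RUNNING once
        return job_state

    # A job persisted as QUEUED can now still be promoted to RUNNING after a
    # Galaxy restart; with the old behavior this branch was never taken.
    job_state = {"running": recovered_running_flag("queued")}
    assert check_watched_item_sketch(job_state, "running")["running"] is True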
Affected #: 1 file diff -r e456392f0557079488a901a2c0d90459919aa9ea -r 2ffd8c25e9cea2c0f3b7e36373eeb5cac27357d2 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -210,8 +210,9 @@ job_state.job_destination = job_wrapper.job_destination job_wrapper.command_line = job.get_command_line() job_state.job_wrapper = job_wrapper - if job.get_state() in [model.Job.states.RUNNING, model.Job.states.QUEUED]: + state = job.get_state() + if state in [model.Job.states.RUNNING, model.Job.states.QUEUED]: log.debug( "(LWR/%s) is still in running state, adding to the LWR queue" % ( job.get_id()) ) job_state.old_state = True - job_state.running = True + job_state.running = state == model.Job.states.RUNNING self.monitor_queue.put( job_state ) https://bitbucket.org/galaxy/galaxy-central/commits/ae1fb20222cc/ Changeset: ae1fb20222cc User: natefoo Date: 2013-05-20 21:00:11 Summary: Merged in jmchilton/galaxy-central-lwr (pull request #166) LWR fix for setting metadata externally. Affected #: 4 files diff -r 64c6e3dfaa2bd54f344026752f4ab7ae008a5094 -r ae1fb20222cc7d075009b59295e247c00616bc15 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -7,6 +7,7 @@ import string import logging import threading +import subprocess from Queue import Queue, Empty @@ -236,6 +237,30 @@ log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) ) return output_pairs + def _handle_metadata_externally(self, job_wrapper): + """ + Set metadata externally. Used by the local and lwr job runners where this + shouldn't be attached to command-line to execute. + """ + #run the metadata setting script here + #this is terminate-able when output dataset/job is deleted + #so that long running set_meta()s can be canceled without having to reboot the server + if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and job_wrapper.output_paths: + external_metadata_script = job_wrapper.setup_external_metadata( output_fnames=job_wrapper.get_output_fnames(), + set_extension=True, + tmp_dir=job_wrapper.working_directory, + #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior + kwds={ 'overwrite' : False } ) + log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) ) + external_metadata_proc = subprocess.Popen( args=external_metadata_script, + shell=True, + env=os.environ, + preexec_fn=os.setpgrp ) + job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) + external_metadata_proc.wait() + log.debug( 'execution of external set_meta for job %d finished' % job_wrapper.job_id ) + + class AsynchronousJobState( object ): """ Encapsulate the state of an asynchronous job, this should be subclassed as diff -r 64c6e3dfaa2bd54f344026752f4ab7ae008a5094 -r ae1fb20222cc7d075009b59295e247c00616bc15 lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -88,23 +88,7 @@ job_wrapper.fail( "failure running job", exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) return - #run the metadata setting script here - #this is terminate-able when output dataset/job is deleted - #so that long running set_meta()s can be canceled without having to reboot the server - if job_wrapper.get_state() not in [ 
model.Job.states.ERROR, model.Job.states.DELETED ] and job_wrapper.output_paths: - external_metadata_script = job_wrapper.setup_external_metadata( output_fnames = job_wrapper.get_output_fnames(), - set_extension = True, - tmp_dir = job_wrapper.working_directory, - kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior - log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) ) - external_metadata_proc = subprocess.Popen( args = external_metadata_script, - shell = True, - env = os.environ, - preexec_fn = os.setpgrp ) - job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) - external_metadata_proc.wait() - log.debug( 'execution of external set_meta for job %d finished' % job_wrapper.job_id ) - + self._handle_metadata_externally( job_wrapper ) # Finish the job! try: job_wrapper.finish( stdout, stderr, exit_code ) diff -r 64c6e3dfaa2bd54f344026752f4ab7ae008a5094 -r ae1fb20222cc7d075009b59295e247c00616bc15 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -1,5 +1,4 @@ import logging -import subprocess from galaxy import model from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner @@ -7,6 +6,7 @@ import errno from time import sleep +import os from lwr_client import FileStager, Client, url_to_destination_params @@ -34,15 +34,18 @@ def check_watched_item(self, job_state): try: client = self.get_client_from_state(job_state) - complete = client.check_complete() + status = client.get_status() except Exception: # An orphaned job was put into the queue at app startup, so remote server went down # either way we are done I guess. 
self.mark_as_finished(job_state) return None - if complete: + if status == "complete": self.mark_as_finished(job_state) return None + if status == "running" and not job_state.running: + job_state.running = True + job_state.job_wrapper.change_state( model.Job.states.RUNNING ) return job_state def queue_job(self, job_wrapper): @@ -81,7 +84,7 @@ job_id = file_stager.job_id client.launch( rebuilt_command_line ) job_wrapper.set_job_destination( job_destination, job_id ) - job_wrapper.change_state( model.Job.states.RUNNING ) + job_wrapper.change_state( model.Job.states.QUEUED ) except Exception, exc: job_wrapper.fail( "failure running job", exception=True ) @@ -92,7 +95,7 @@ lwr_job_state.job_wrapper = job_wrapper lwr_job_state.job_id = job_id lwr_job_state.old_state = True - lwr_job_state.running = True + lwr_job_state.running = False lwr_job_state.job_destination = job_destination self.monitor_job(lwr_job_state) @@ -140,23 +143,8 @@ job_wrapper.fail( "failure running job", exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) return - #run the metadata setting script here - #this is terminate-able when output dataset/job is deleted - #so that long running set_meta()s can be canceled without having to reboot the server - if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] and job_wrapper.output_paths: - external_metadata_script = job_wrapper.setup_external_metadata( output_fnames = job_wrapper.get_output_fnames(), - set_extension = True, - kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior - log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) ) - external_metadata_proc = subprocess.Popen( args = external_metadata_script, - shell = True, - env = os.environ, - preexec_fn = os.setpgrp ) - job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) - external_metadata_proc.wait() - log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id ) - - # Finish the job + self._handle_metadata_externally( job_wrapper ) + # Finish the job try: job_wrapper.finish( stdout, stderr ) except: @@ -222,12 +210,9 @@ job_state.job_destination = job_wrapper.job_destination job_wrapper.command_line = job.get_command_line() job_state.job_wrapper = job_wrapper - if job.get_state() == model.Job.states.RUNNING: + state = job.get_state() + if state in [model.Job.states.RUNNING, model.Job.states.QUEUED]: log.debug( "(LWR/%s) is still in running state, adding to the LWR queue" % ( job.get_id()) ) job_state.old_state = True - job_state.running = True + job_state.running = state == model.Job.states.RUNNING self.monitor_queue.put( job_state ) - elif job.get_state() == model.Job.states.QUEUED: - # LWR doesn't queue currently, so this indicates galaxy was shutoff while - # job was being staged. Not sure how to recover from that. - job_state.job_wrapper.fail( "This job was killed when Galaxy was restarted. Please retry the job." 
) diff -r 64c6e3dfaa2bd54f344026752f4ab7ae008a5094 -r ae1fb20222cc7d075009b59295e247c00616bc15 lib/galaxy/jobs/runners/lwr_client/client.py --- a/lib/galaxy/jobs/runners/lwr_client/client.py +++ b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -210,11 +210,25 @@ check_complete_response = self.__raw_execute("check_complete", {"job_id": self.job_id}) return check_complete_response - def check_complete(self): + def check_complete(self, response=None): """ Return boolean indicating whether the job is complete. """ - return self.raw_check_complete()["complete"] == "true" + if response == None: + response = self.raw_check_complete() + return response["complete"] == "true" + + def get_status(self): + check_complete_response = self.raw_check_complete() + # Older LWR instances won't set status so use 'complete', at some + # point drop backward compatibility. + complete = self.check_complete(check_complete_response) + old_status = "complete" if complete else "running" + status = check_complete_response.get("status", old_status) + # Bug in certains older LWR instances returned literal "status". + if status not in ["complete", "running", "queued"]: + status = old_status + return status def clean(self): """ Repository URL: https://bitbucket.org/galaxy/galaxy-central/
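For illustration, the status resolution that the new get_status() performs can be read as the standalone function below. This is a hedged sketch rather than part of lwr_client, and the sample responses are assumptions about what an LWR server might return:

    def resolve_status(check_complete_response):
        # Older LWR servers only return a "complete" flag, so derive a fallback.
        complete = check_complete_response.get("complete") == "true"
        fallback = "complete" if complete else "running"
        status = check_complete_response.get("status", fallback)
        # Guard against older LWR instances that returned the literal "status".
        if status not in ("complete", "running", "queued"):
            status = fallback
        return status

    resolve_status({"complete": "false", "status": "queued"})  # -> "queued"
    resolve_status({"complete": "false"})                      # -> "running" (older server)
    resolve_status({"complete": "true"})                       # -> "complete"

Similarly, the point of the _handle_metadata_externally() refactor in a709ca0ea728 is that a runner's finish path calls the shared BaseJobRunner helper instead of carrying its own copy of the subprocess code. A minimal sketch of such a caller; the ExampleRunner class and _finish_job method are hypothetical, only the helper and job_wrapper.finish() come from the diffs above:

    from galaxy.jobs.runners import BaseJobRunner

    class ExampleRunner(BaseJobRunner):  # hypothetical runner, for illustration only
        def _finish_job(self, job_wrapper, stdout, stderr, exit_code):
            # Metadata is set by the shared helper in a separate, terminate-able
            # subprocess rather than being appended to the tool command line.
            self._handle_metadata_externally(job_wrapper)
            job_wrapper.finish(stdout, stderr, exit_code)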