11 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/611bc9467b22/ changeset: 611bc9467b22 user: jmchilton date: 2013-01-11 17:53:05 summary: Implement new job runner super class ClusterJobRunner intended to reduce amount of duplicated code between drmaa, pbs, and lwr job runners (also I guess cli and condor classes seem like they could benefit from this as well). This super class will manage the monitor and worker threads and queues. I am submitting only changes to the LWR that use this class, but I would encourage the Galaxy team to refactor the drmaa and pbs runners to use this class as well (or I would be happy to make these changes if given access or a promise the changes will be accepted quickly). A variant of the drmaa runner that has been refactored to use this class can be found here: https://bitbucket.org/jmchilton/galaxy-central-lwr-enhancement-1/src/tip/lib... from the now defunct pull request 80. affected #: 1 file diff -r 3faa833c15b5162db69f061bdbd4e951ba2ffdc7 -r 611bc9467b222bb1cf0b1ef5d5b6120f210a471f lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -1,5 +1,10 @@ import os, logging, os.path +from galaxy import model +from Queue import Queue, Empty +import time +import threading + log = logging.getLogger( __name__ ) class BaseJobRunner( object ): @@ -90,3 +95,158 @@ set_extension = False, kwds = { 'overwrite' : False } ) return commands + +class ClusterJobState( object ): + """ + Encapsulate the state of a cluster job, this should be subclassed as + needed for various job runners to capture additional information needed + to communicate with cluster job manager. + """ + + def __init__( self ): + self.job_wrapper = None + self.job_id = None + self.old_state = None + self.running = False + self.runner_url = None + +STOP_SIGNAL = object() + +JOB_STATUS_QUEUED = 'queue' +JOB_STATUS_FAILED = 'fail' +JOB_STATUS_FINISHED = 'finish' + +class ClusterJobRunner( BaseJobRunner ): + """ + Not sure this is the best name for this class, but there is common code + shared between sge, pbs, drmaa, etc... + """ + + def __init__( self, app ): + self.app = app + self.sa_session = app.model.context + # 'watched' and 'queue' are both used to keep track of jobs to watch. + # 'queue' is used to add new watched jobs, and can be called from + # any thread (usually by the 'queue_job' method). 'watched' must only + # be modified by the monitor thread, which will move items from 'queue' + # to 'watched' and then manage the watched jobs. + self.watched = [] + self.monitor_queue = Queue() + + def _init_monitor_thread(self): + self.monitor_thread = threading.Thread( name="%s.monitor_thread" % self.runner_name, target=self.monitor ) + self.monitor_thread.setDaemon( True ) + self.monitor_thread.start() + + def _init_worker_threads(self): + self.work_queue = Queue() + self.work_threads = [] + nworkers = self.app.config.cluster_job_queue_workers + for i in range( nworkers ): + worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next ) + worker.start() + self.work_threads.append( worker ) + + def handle_stop(self): + # DRMAA and SGE runners should override this and disconnect. 
+ pass + + def monitor( self ): + """ + Watches jobs currently in the cluster queue and deals with state changes + (queued to running) and job completion + """ + while 1: + # Take any new watched jobs and put them on the monitor list + try: + while 1: + cluster_job_state = self.monitor_queue.get_nowait() + if cluster_job_state is STOP_SIGNAL: + # TODO: This is where any cleanup would occur + self.handle_stop() + return + self.watched.append( cluster_job_state ) + except Empty: + pass + # Iterate over the list of watched jobs and check state + self.check_watched_items() + # Sleep a bit before the next state check + time.sleep( 1 ) + + def run_next( self ): + """ + Run the next item in the queue (a job waiting to run or finish ) + """ + while 1: + ( op, obj ) = self.work_queue.get() + if op is STOP_SIGNAL: + return + try: + if op == JOB_STATUS_QUEUED: + # If the next item is to be run, then only run it if the + # job state is "queued". Otherwise the next item was either + # cancelled or one of its siblings encountered an error. + job_state = obj.get_state() + if model.Job.states.QUEUED == job_state: + self.queue_job( obj ) + else: + log.debug( "Not executing job %d in state %s" % ( obj.get_id_tag(), job_state ) ) + elif op == JOB_STATUS_FINISHED: + self.finish_job( obj ) + elif op == JOB_STATUS_FAILED: + self.fail_job( obj ) + except: + log.exception( "Uncaught exception %sing job" % op ) + + def monitor_job(self, job_state): + self.monitor_queue.put( job_state ) + + def put( self, job_wrapper ): + """Add a job to the queue (by job identifier)""" + # Change to queued state before handing to worker thread so the runner won't pick it up again + job_wrapper.change_state( model.Job.states.QUEUED ) + self.mark_as_queued(job_wrapper) + + def shutdown( self ): + """Attempts to gracefully shut down the monitor thread""" + log.info( "sending stop signal to worker threads" ) + self.monitor_queue.put( STOP_SIGNAL ) + for i in range( len( self.work_threads ) ): + self.work_queue.put( ( STOP_SIGNAL, None ) ) + + def check_watched_items(self): + """ + This method is responsible for iterating over self.watched and handling + state changes and updating self.watched with a new list of watched job + states. Subclasses can opt to override this directly (as older job runners will + initially) or just override check_watched_item and allow the list processing to + reuse the logic here. + """ + new_watched = [] + for cluster_job_state in self.watched: + new_cluster_job_state = self.check_watched_item(cluster_job_state) + if new_cluster_job_state: + new_watched.append(new_cluster_job_state) + self.watched = new_watched + + # Subclasses should implement this unless they override check_watched_items all together. 
+ def check_watched_item(self): + raise NotImplementedError() + + def queue_job(self, job_wrapper): + raise NotImplementedError() + + def finish_job(self, job_state): + raise NotImplementedError() + + def fail_job(self, job_state): + raise NotImplementedError() + + def mark_as_finished(self, job_state): + self.work_queue.put( ( JOB_STATUS_FINISHED, job_state ) ) + + def mark_as_failed(self, job_state): + self.work_queue.put( ( JOB_STATUS_FAILED, job_state ) ) + + def mark_as_queued(self, job_wrapper): + self.work_queue.put( ( JOB_STATUS_QUEUED, job_wrapper ) ) https://bitbucket.org/galaxy/galaxy-central/commits/57d33740aa70/ changeset: 57d33740aa70 user: jmchilton date: 2013-01-11 17:53:05 summary: Refactor the LWRJobRunner to be a ClusterJobRunner and implement a recover method for this runner, the upshot of this is that LWR jobs can now survive Galaxy restarts. Downside is that jobs are no longer queued on Galaxy server, so LWR server should be updated (to changeset 5213f6d or newer) to queue jobs on the remote server. This is not manidatory however, this will still work it is just more jobs may run simultaneously than is desired. affected #: 1 file diff -r 611bc9467b222bb1cf0b1ef5d5b6120f210a471f -r 57d33740aa70c18c0fa082c0e2ef40042edb62e4 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -7,7 +7,7 @@ from galaxy import model from galaxy.datatypes.data import nice_size -from galaxy.jobs.runners import BaseJobRunner +from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner import os, errno from time import sleep @@ -199,12 +199,18 @@ def wait(self): """ """ while True: - check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id }) - complete = check_complete_response["complete"] == "true" + complete = self.check_complete() if complete: return check_complete_response time.sleep(1) + def raw_check_complete(self): + check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id }) + return check_complete_response + + def check_complete(self): + return self.raw_check_complete()["complete"] == "true" + def clean(self): self.__raw_execute("clean", { "job_id" : self.job_id }) @@ -213,51 +219,34 @@ -class LwrJobRunner( BaseJobRunner ): +class LwrJobRunner( ClusterJobRunner ): """ - Lwr Job Runner + LWR Job Runner """ - STOP_SIGNAL = object() + runner_name = "LWRRunner" + def __init__( self, app ): - """Start the job runner with 'nworkers' worker threads""" - self.app = app - self.sa_session = app.model.context + """Start the job runner """ + super( LwrJobRunner, self ).__init__( app ) + self._init_monitor_thread() + log.info( "starting LWR workers" ) + self._init_worker_threads() - # start workers - self.queue = Queue() - self.threads = [] - nworkers = app.config.local_job_queue_workers - log.info( "starting workers" ) - for i in range( nworkers ): - worker = threading.Thread( ( name="LwrJobRunner.thread-%d" % i ), target=self.run_next ) - worker.setDaemon( True ) - worker.start() - self.threads.append( worker ) - log.debug( "%d workers ready", nworkers ) + def check_watched_item(self, job_state): + try: + client = self.get_client_from_state(job_state) + complete = client.check_complete() + except Exception: + # An orphaned job was put into the queue at app startup, so remote server went down + # either way we are done I guess. 
+ self.mark_as_finished(job_state) + return None + if complete: + self.mark_as_finished(job_state) + return None + return job_state - def run_next( self ): - """Run the next job, waiting until one is available if neccesary""" - while 1: - job_wrapper = self.queue.get() - if job_wrapper is self.STOP_SIGNAL: - return - try: - self.run_job( job_wrapper ) - except: - log.exception( "Uncaught exception running job" ) - - def determine_lwr_url(self, url): - lwr_url = url[ len( 'lwr://' ) : ] - return lwr_url - - def get_client_from_wrapper(self, job_wrapper): - return self.get_client( job_wrapper.get_job_runner_url(), job_wrapper.job_id ) - - def get_client(self, job_runner, job_id): - lwr_url = self.determine_lwr_url( job_runner ) - return Client(lwr_url, job_id) - - def run_job( self, job_wrapper ): + def queue_job(self, job_wrapper): stderr = stdout = command_line = '' runner_url = job_wrapper.get_job_runner_url() @@ -277,35 +266,76 @@ return # If we were able to get a command line, run the job - if command_line: - try: - #log.debug( 'executing: %s' % command_line ) - client = self.get_client_from_wrapper(job_wrapper) - output_fnames = job_wrapper.get_output_fnames() - output_files = [ str( o ) for o in output_fnames ] - input_files = job_wrapper.get_input_fnames() - file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir) - rebuilt_command_line = file_stager.get_rewritten_command_line() - client.launch( rebuilt_command_line ) + if not command_line: + job_wrapper.finish( '', '' ) + return - job_wrapper.set_runner( runner_url, job_wrapper.job_id ) - job_wrapper.change_state( model.Job.states.RUNNING ) + try: + #log.debug( 'executing: %s' % command_line ) + client = self.get_client_from_wrapper(job_wrapper) + output_files = self.get_output_files(job_wrapper) + input_files = job_wrapper.get_input_fnames() + file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir) + rebuilt_command_line = file_stager.get_rewritten_command_line() + client.launch( rebuilt_command_line ) + job_wrapper.set_runner( runner_url, job_wrapper.job_id ) + job_wrapper.change_state( model.Job.states.RUNNING ) - run_results = client.wait() - log.debug('run_results %s' % run_results ) - stdout = run_results['stdout'] - stderr = run_results['stderr'] + except Exception, exc: + job_wrapper.fail( "failure running job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + return - - if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]: - for output_file in output_files: - client.download_output(output_file) - client.clean() - log.debug('execution finished: %s' % command_line) - except Exception, exc: - job_wrapper.fail( "failure running job", exception=True ) - log.exception("failure running job %d" % job_wrapper.job_id) - return + lwr_job_state = ClusterJobState() + lwr_job_state.job_wrapper = job_wrapper + lwr_job_state.job_id = job_wrapper.job_id + lwr_job_state.old_state = True + lwr_job_state.running = True + lwr_job_state.runner_url = runner_url + self.monitor_job(lwr_job_state) + + def get_output_files(self, job_wrapper): + output_fnames = job_wrapper.get_output_fnames() + return [ str( o ) for o in output_fnames ] + + + def determine_lwr_url(self, url): + lwr_url = url[ len( 'lwr://' ) : ] + return lwr_url + + def get_client_from_wrapper(self, job_wrapper): + return self.get_client( job_wrapper.get_job_runner_url(), 
job_wrapper.job_id ) + + def get_client_from_state(self, job_state): + job_runner = job_state.runner_url + job_id = job_state.job_id + return self.get_client(job_runner, job_id) + + def get_client(self, job_runner, job_id): + lwr_url = self.determine_lwr_url( job_runner ) + return Client(lwr_url, job_id) + + def finish_job( self, job_state ): + stderr = stdout = command_line = '' + job_wrapper = job_state.job_wrapper + try: + client = self.get_client_from_state(job_state) + + run_results = client.raw_check_complete() + log.debug('run_results %s' % run_results ) + stdout = run_results['stdout'] + stderr = run_results['stderr'] + + if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]: + output_files = self.get_output_files(job_wrapper) + for output_file in output_files: + client.download_output(output_file) + client.clean() + log.debug('execution finished: %s' % command_line) + except Exception, exc: + job_wrapper.fail( "failure running job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + return #run the metadata setting script here #this is terminate-able when output dataset/job is deleted #so that long running set_meta()s can be canceled without having to reboot the server @@ -321,7 +351,7 @@ job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) external_metadata_proc.wait() log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id ) - + # Finish the job try: job_wrapper.finish( stdout, stderr ) @@ -329,12 +359,13 @@ log.exception("Job wrapper finish method failed") job_wrapper.fail("Unable to finish job", exception=True) - def put( self, job_wrapper ): - """Add a job to the queue (by job identifier)""" - # Change to queued state before handing to worker thread so the runner won't pick it up again - job_wrapper.change_state( model.Job.states.QUEUED ) - self.queue.put( job_wrapper ) - + def fail_job( self, job_state ): + """ + Seperated out so we can use the worker threads for it. + """ + self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) ) + job_state.job_wrapper.fail( job_state.fail_message ) + def shutdown( self ): """Attempts to gracefully shut down the worker threads""" log.info( "sending stop signal to worker threads" ) @@ -383,7 +414,21 @@ log.debug("Attempt remote lwr kill of job with url %s and id %s" % (lwr_url, job_id)) client = self.get_client(lwr_url, job_id) client.kill() + + def recover( self, job, job_wrapper ): - # local jobs can't be recovered - job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted. Please retry the job." ) - + """Recovers jobs stuck in the queued/running state when Galaxy started""" + job_state = ClusterJobState() + job_state.job_id = str( job.get_job_runner_external_id() ) + job_state.runner_url = job_wrapper.get_job_runner_url() + job_wrapper.command_line = job.get_command_line() + job_state.job_wrapper = job_wrapper + if job.get_state() == model.Job.states.RUNNING: + log.debug( "(LWR/%s) is still in running state, adding to the LWR queue" % ( job.get_id()) ) + job_state.old_state = True + job_state.running = True + self.monitor_queue.put( job_state ) + elif job.get_state() == model.Job.states.QUEUED: + # LWR doesn't queue currently, so this indicates galaxy was shutoff while + # job was being staged. Not sure how to recover from that. 
+ job_state.job_wrapper.fail( "This job was killed when Galaxy was restarted. Please retry the job." ) https://bitbucket.org/galaxy/galaxy-central/commits/0a2976ff3b73/ changeset: 0a2976ff3b73 user: jmchilton date: 2013-01-11 17:53:05 summary: Allow execution of jobs created by task splitting via the LWR job runner. affected #: 1 file diff -r 57d33740aa70c18c0fa082c0e2ef40042edb62e4 -r 0a2976ff3b73d5130674c1e1019e3425b16d1fc0 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -27,13 +27,14 @@ class FileStager(object): - def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir): + def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory): self.client = client self.command_line = command_line self.config_files = config_files self.input_files = input_files self.output_files = output_files self.tool_dir = os.path.abspath(tool_dir) + self.working_directory = working_directory self.file_renames = {} @@ -46,7 +47,9 @@ self.__initialize_referenced_tool_files() self.__upload_tool_files() self.__upload_input_files() + self.__upload_working_directory_files() self.__initialize_output_file_renames() + self.__initialize_task_output_file_renames() self.__initialize_config_file_renames() self.__rewrite_and_upload_config_files() self.__rewrite_command_line() @@ -69,13 +72,27 @@ for input_file in self.input_files: input_upload_response = self.client.upload_input(input_file) self.file_renames[input_file] = input_upload_response['path'] - + + def __upload_working_directory_files(self): + # Task manager stages files into working directory, these need to be uploaded + for working_directory_file in os.listdir(self.working_directory): + path = os.path.join(self.working_directory, working_directory_file) + working_file_response = self.client.upload_working_directory_file(path) + self.file_renames[path] = working_file_response['path'] + def __initialize_output_file_renames(self): for output_file in self.output_files: self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory, self.remote_path_separator, os.path.basename(output_file)) + def __initialize_task_output_file_renames(self): + for output_file in self.output_files: + name = os.path.basename(output_file) + self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory, + self.remote_path_separator, + name) + def __initialize_config_file_renames(self): for config_file in self.config_files: self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory, @@ -172,13 +189,27 @@ def upload_config_file(self, path, contents): return self.__upload_contents("upload_config_file", path, contents) - - def download_output(self, path): + + def upload_working_directory_file(self, path): + return self.__upload_file("upload_working_directory_file", path) + + def _get_output_type(self, name): + return self.__raw_execute_and_parse('get_output_type', {'name': name, + 'job_id': self.job_id}) + + def download_output(self, path, working_directory): """ """ name = os.path.basename(path) - response = self.__raw_execute('download_output', {'name' : name, - "job_id" : self.job_id}) - output = open(path, 'wb') + output_type = self._get_output_type(name) + response = self.__raw_execute('download_output', {'name' : name, + "job_id" : self.job_id, + 'output_type': output_type}) + if output_type == 'direct': + output = open(path, 'wb') + elif output_type == 'task': + output = 
open(os.path.join(working_directory, name), 'wb') + else: + raise Exception("No remote output found for dataset with path %s" % path) try: while True: buffer = response.read(1024) @@ -254,7 +285,7 @@ try: job_wrapper.prepare() if hasattr(job_wrapper, 'prepare_input_files_cmds') and job_wrapper.prepare_input_files_cmds is not None: - for cmd in job_wrapper.prepare_input_file_cmds: # run the commands to stage the input files + for cmd in job_wrapper.prepare_input_files_cmds: # run the commands to stage the input files #log.debug( 'executing: %s' % cmd ) if 0 != os.system(cmd): raise Exception('Error running file staging command: %s' % cmd) @@ -275,7 +306,8 @@ client = self.get_client_from_wrapper(job_wrapper) output_files = self.get_output_files(job_wrapper) input_files = job_wrapper.get_input_fnames() - file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir) + working_directory = job_wrapper.working_directory + file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir, working_directory) rebuilt_command_line = file_stager.get_rewritten_command_line() client.launch( rebuilt_command_line ) job_wrapper.set_runner( runner_url, job_wrapper.job_id ) @@ -304,7 +336,10 @@ return lwr_url def get_client_from_wrapper(self, job_wrapper): - return self.get_client( job_wrapper.get_job_runner_url(), job_wrapper.job_id ) + job_id = job_wrapper.job_id + if hasattr(job_wrapper, 'task_id'): + job_id = "%s_%s" % (job_id, job_wrapper.task_id) + return self.get_client( job_wrapper.get_job_runner_url(), job_id ) def get_client_from_state(self, job_state): job_runner = job_state.runner_url @@ -329,7 +364,7 @@ if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]: output_files = self.get_output_files(job_wrapper) for output_file in output_files: - client.download_output(output_file) + client.download_output(output_file, working_directory=job_wrapper.working_directory) client.clean() log.debug('execution finished: %s' % command_line) except Exception, exc: @@ -386,8 +421,9 @@ def stop_job( self, job ): #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished - if job.external_output_metadata: - pid = job.external_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them + job_ext_output_metadata = job.get_external_output_metadata() + if job_ext_output_metadata: + pid = job_ext_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them if pid in [ None, '' ]: log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id ) return https://bitbucket.org/galaxy/galaxy-central/commits/ed3d477d056f/ changeset: ed3d477d056f user: jmchilton date: 2013-01-11 17:53:05 summary: Extend LWR job runner to stage an input's extra_files_path (if present). affected #: 1 file diff -r 0a2976ff3b73d5130674c1e1019e3425b16d1fc0 -r ed3d477d056fd77a207e61ae7ee7ed9cc2c1e9b2 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -72,6 +72,16 @@ for input_file in self.input_files: input_upload_response = self.client.upload_input(input_file) self.file_renames[input_file] = input_upload_response['path'] + # TODO: Determine if this is object store safe and what needs to be + # done if it is not. 
+ files_path = "%s_files" % input_file[0:-len(".dat")] + if os.path.exists(files_path): + for extra_file in os.listdir(files_path): + extra_file_path = os.path.join(files_path, extra_file) + relative_path = os.path.basename(files_path) + extra_file_relative_path = os.path.join(relative_path, extra_file) + response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path) + self.file_renames[extra_file_path] = response['path'] def __upload_working_directory_files(self): # Task manager stages files into working directory, these need to be uploaded @@ -167,17 +177,18 @@ response = self.__raw_execute(command, args, data) return simplejson.loads(response.read()) - def __upload_file(self, action, path, contents = None): + def __upload_file(self, action, path, name=None, contents = None): """ """ input = open(path, 'rb') try: mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ) - return self.__upload_contents(action, path, mmapped_input) + return self.__upload_contents(action, path, mmapped_input, name) finally: input.close() - def __upload_contents(self, action, path, contents): - name = os.path.basename(path) + def __upload_contents(self, action, path, contents, name=None): + if not name: + name = os.path.basename(path) args = {"job_id" : self.job_id, "name" : name} return self.__raw_execute_and_parse(action, args, contents) @@ -187,6 +198,9 @@ def upload_input(self, path): return self.__upload_file("upload_input", path) + def upload_extra_input(self, path, relative_name): + return self.__upload_file("upload_extra_input", path, name=relative_name) + def upload_config_file(self, path, contents): return self.__upload_contents("upload_config_file", path, contents) https://bitbucket.org/galaxy/galaxy-central/commits/6fa106f2678d/ changeset: 6fa106f2678d user: jmchilton date: 2013-01-11 17:53:05 summary: Refactor much of the lwr client code out into its own module. This will make it easier to keep content insync with client code from lwr source. 
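Purely as an illustration (not part of this changeset), runner-side code is expected to consume the extracted module roughly as sketched below. The server URL and job id are invented for the example; the import line mirrors the one added to lwr.py in this changeset, and Client, setup(), and check_complete() are taken from the module code in the diff that follows.

    # Hypothetical usage sketch of the extracted lwr_client module.
    # The LWR host URL and job id here are made up for illustration.
    from lwr_client import FileStager, Client  # import as added to lwr.py below

    client = Client("http://lwr.example.org:8913/", job_id="123")
    job_config = client.setup()           # remote working/output directories and path separator
    finished = client.check_complete()    # poll remote job state; returns a boolean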
affected #: 2 files diff -r ed3d477d056fd77a207e61ae7ee7ed9cc2c1e9b2 -r 6fa106f2678da0e0af8d8c25e90928817254c6f5 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -1,268 +1,18 @@ import logging import subprocess -from Queue import Queue -import threading - -import re from galaxy import model -from galaxy.datatypes.data import nice_size from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner -import os, errno +import errno from time import sleep +from lwr_client import FileStager, Client + log = logging.getLogger( __name__ ) __all__ = [ 'LwrJobRunner' ] -import urllib -import urllib2 -import httplib -import mmap -import tempfile -import time - -import simplejson - -class FileStager(object): - - def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory): - self.client = client - self.command_line = command_line - self.config_files = config_files - self.input_files = input_files - self.output_files = output_files - self.tool_dir = os.path.abspath(tool_dir) - self.working_directory = working_directory - - self.file_renames = {} - - job_config = client.setup() - - self.new_working_directory = job_config['working_directory'] - self.new_outputs_directory = job_config['outputs_directory'] - self.remote_path_separator = job_config['path_separator'] - - self.__initialize_referenced_tool_files() - self.__upload_tool_files() - self.__upload_input_files() - self.__upload_working_directory_files() - self.__initialize_output_file_renames() - self.__initialize_task_output_file_renames() - self.__initialize_config_file_renames() - self.__rewrite_and_upload_config_files() - self.__rewrite_command_line() - - def __initialize_referenced_tool_files(self): - pattern = r"(%s%s\S+)" % (self.tool_dir, os.sep) - referenced_tool_files = [] - referenced_tool_files += re.findall(pattern, self.command_line) - if self.config_files != None: - for config_file in self.config_files: - referenced_tool_files += re.findall(pattern, self.__read(config_file)) - self.referenced_tool_files = referenced_tool_files - - def __upload_tool_files(self): - for referenced_tool_file in self.referenced_tool_files: - tool_upload_response = self.client.upload_tool_file(referenced_tool_file) - self.file_renames[referenced_tool_file] = tool_upload_response['path'] - - def __upload_input_files(self): - for input_file in self.input_files: - input_upload_response = self.client.upload_input(input_file) - self.file_renames[input_file] = input_upload_response['path'] - # TODO: Determine if this is object store safe and what needs to be - # done if it is not. 
- files_path = "%s_files" % input_file[0:-len(".dat")] - if os.path.exists(files_path): - for extra_file in os.listdir(files_path): - extra_file_path = os.path.join(files_path, extra_file) - relative_path = os.path.basename(files_path) - extra_file_relative_path = os.path.join(relative_path, extra_file) - response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path) - self.file_renames[extra_file_path] = response['path'] - - def __upload_working_directory_files(self): - # Task manager stages files into working directory, these need to be uploaded - for working_directory_file in os.listdir(self.working_directory): - path = os.path.join(self.working_directory, working_directory_file) - working_file_response = self.client.upload_working_directory_file(path) - self.file_renames[path] = working_file_response['path'] - - def __initialize_output_file_renames(self): - for output_file in self.output_files: - self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory, - self.remote_path_separator, - os.path.basename(output_file)) - - def __initialize_task_output_file_renames(self): - for output_file in self.output_files: - name = os.path.basename(output_file) - self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory, - self.remote_path_separator, - name) - - def __initialize_config_file_renames(self): - for config_file in self.config_files: - self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory, - self.remote_path_separator, - os.path.basename(config_file)) - - def __rewrite_paths(self, contents): - new_contents = contents - for local_path, remote_path in self.file_renames.iteritems(): - new_contents = new_contents.replace(local_path, remote_path) - return new_contents - - def __rewrite_and_upload_config_files(self): - for config_file in self.config_files: - config_contents = self.__read(config_file) - new_config_contents = self.__rewrite_paths(config_contents) - self.client.upload_config_file(config_file, new_config_contents) - - def __rewrite_command_line(self): - self.rewritten_command_line = self.__rewrite_paths(self.command_line) - - def get_rewritten_command_line(self): - return self.rewritten_command_line - - def __read(self, path): - input = open(path, "r") - try: - return input.read() - finally: - input.close() - - - -class Client(object): - """ - """ - """ - """ - def __init__(self, remote_host, job_id, private_key=None): - if not remote_host.endswith("/"): - remote_host = remote_host + "/" - ## If we don't have an explicit private_key defined, check for - ## one embedded in the URL. A URL of the form - ## https://moo@cow:8913 will try to contact https://cow:8913 - ## with a private key of moo - private_key_format = "https?://(.*)@.*/?" - private_key_match= re.match(private_key_format, remote_host) - if not private_key and private_key_match: - private_key = private_key_match.group(1) - remote_host = remote_host.replace("%s@" % private_key, '', 1) - self.remote_host = remote_host - self.job_id = job_id - self.private_key = private_key - - def url_open(self, request, data): - return urllib2.urlopen(request, data) - - def __build_url(self, command, args): - if self.private_key: - args["private_key"] = self.private_key - data = urllib.urlencode(args) - url = self.remote_host + command + "?" 
+ data - return url - - def __raw_execute(self, command, args = {}, data = None): - url = self.__build_url(command, args) - request = urllib2.Request(url=url, data=data) - response = self.url_open(request, data) - return response - - def __raw_execute_and_parse(self, command, args = {}, data = None): - response = self.__raw_execute(command, args, data) - return simplejson.loads(response.read()) - - def __upload_file(self, action, path, name=None, contents = None): - """ """ - input = open(path, 'rb') - try: - mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ) - return self.__upload_contents(action, path, mmapped_input, name) - finally: - input.close() - - def __upload_contents(self, action, path, contents, name=None): - if not name: - name = os.path.basename(path) - args = {"job_id" : self.job_id, "name" : name} - return self.__raw_execute_and_parse(action, args, contents) - - def upload_tool_file(self, path): - return self.__upload_file("upload_tool_file", path) - - def upload_input(self, path): - return self.__upload_file("upload_input", path) - - def upload_extra_input(self, path, relative_name): - return self.__upload_file("upload_extra_input", path, name=relative_name) - - def upload_config_file(self, path, contents): - return self.__upload_contents("upload_config_file", path, contents) - - def upload_working_directory_file(self, path): - return self.__upload_file("upload_working_directory_file", path) - - def _get_output_type(self, name): - return self.__raw_execute_and_parse('get_output_type', {'name': name, - 'job_id': self.job_id}) - - def download_output(self, path, working_directory): - """ """ - name = os.path.basename(path) - output_type = self._get_output_type(name) - response = self.__raw_execute('download_output', {'name' : name, - "job_id" : self.job_id, - 'output_type': output_type}) - if output_type == 'direct': - output = open(path, 'wb') - elif output_type == 'task': - output = open(os.path.join(working_directory, name), 'wb') - else: - raise Exception("No remote output found for dataset with path %s" % path) - try: - while True: - buffer = response.read(1024) - if buffer == "": - break - output.write(buffer) - finally: - output.close() - - def launch(self, command_line): - """ """ - return self.__raw_execute("launch", {"command_line" : command_line, - "job_id" : self.job_id}) - - def kill(self): - return self.__raw_execute("kill", {"job_id" : self.job_id}) - - def wait(self): - """ """ - while True: - complete = self.check_complete() - if complete: - return check_complete_response - time.sleep(1) - - def raw_check_complete(self): - check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id }) - return check_complete_response - - def check_complete(self): - return self.raw_check_complete()["complete"] == "true" - - def clean(self): - self.__raw_execute("clean", { "job_id" : self.job_id }) - - def setup(self): - return self.__raw_execute_and_parse("setup", { "job_id" : self.job_id }) - - class LwrJobRunner( ClusterJobRunner ): """ diff -r ed3d477d056fd77a207e61ae7ee7ed9cc2c1e9b2 -r 6fa106f2678da0e0af8d8c25e90928817254c6f5 lib/galaxy/jobs/runners/lwr_client/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -0,0 +1,246 @@ +import mmap +import os +import re +import time +import urllib +import urllib2 + +import simplejson + + +class FileStager(object): + + def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory): + self.client = 
client + self.command_line = command_line + self.config_files = config_files + self.input_files = input_files + self.output_files = output_files + self.tool_dir = os.path.abspath(tool_dir) + self.working_directory = working_directory + + self.file_renames = {} + + job_config = client.setup() + + self.new_working_directory = job_config['working_directory'] + self.new_outputs_directory = job_config['outputs_directory'] + self.remote_path_separator = job_config['path_separator'] + + self.__initialize_referenced_tool_files() + self.__upload_tool_files() + self.__upload_input_files() + self.__upload_working_directory_files() + self.__initialize_output_file_renames() + self.__initialize_task_output_file_renames() + self.__initialize_config_file_renames() + self.__rewrite_and_upload_config_files() + self.__rewrite_command_line() + + def __initialize_referenced_tool_files(self): + pattern = r"(%s%s\S+)" % (self.tool_dir, os.sep) + referenced_tool_files = [] + referenced_tool_files += re.findall(pattern, self.command_line) + if self.config_files != None: + for config_file in self.config_files: + referenced_tool_files += re.findall(pattern, self.__read(config_file)) + self.referenced_tool_files = referenced_tool_files + + def __upload_tool_files(self): + for referenced_tool_file in self.referenced_tool_files: + tool_upload_response = self.client.upload_tool_file(referenced_tool_file) + self.file_renames[referenced_tool_file] = tool_upload_response['path'] + + def __upload_input_files(self): + for input_file in self.input_files: + input_upload_response = self.client.upload_input(input_file) + self.file_renames[input_file] = input_upload_response['path'] + # TODO: Determine if this is object store safe and what needs to be + # done if it is not. + files_path = "%s_files" % input_file[0:-len(".dat")] + if os.path.exists(files_path): + for extra_file in os.listdir(files_path): + extra_file_path = os.path.join(files_path, extra_file) + relative_path = os.path.basename(files_path) + extra_file_relative_path = os.path.join(relative_path, extra_file) + response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path) + self.file_renames[extra_file_path] = response['path'] + + def __upload_working_directory_files(self): + # Task manager stages files into working directory, these need to be uploaded + for working_directory_file in os.listdir(self.working_directory): + path = os.path.join(self.working_directory, working_directory_file) + working_file_response = self.client.upload_working_directory_file(path) + self.file_renames[path] = working_file_response['path'] + + def __initialize_output_file_renames(self): + for output_file in self.output_files: + self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory, + self.remote_path_separator, + os.path.basename(output_file)) + + def __initialize_task_output_file_renames(self): + for output_file in self.output_files: + name = os.path.basename(output_file) + self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory, + self.remote_path_separator, + name) + + def __initialize_config_file_renames(self): + for config_file in self.config_files: + self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory, + self.remote_path_separator, + os.path.basename(config_file)) + + def __rewrite_paths(self, contents): + new_contents = contents + for local_path, remote_path in self.file_renames.iteritems(): + new_contents = new_contents.replace(local_path, remote_path) + return 
new_contents + + def __rewrite_and_upload_config_files(self): + for config_file in self.config_files: + config_contents = self.__read(config_file) + new_config_contents = self.__rewrite_paths(config_contents) + self.client.upload_config_file(config_file, new_config_contents) + + def __rewrite_command_line(self): + self.rewritten_command_line = self.__rewrite_paths(self.command_line) + + def get_rewritten_command_line(self): + return self.rewritten_command_line + + def __read(self, path): + input = open(path, "r") + try: + return input.read() + finally: + input.close() + + + +class Client(object): + """ + """ + """ + """ + def __init__(self, remote_host, job_id, private_key=None): + if not remote_host.endswith("/"): + remote_host = remote_host + "/" + ## If we don't have an explicit private_key defined, check for + ## one embedded in the URL. A URL of the form + ## https://moo@cow:8913 will try to contact https://cow:8913 + ## with a private key of moo + private_key_format = "https?://(.*)@.*/?" + private_key_match= re.match(private_key_format, remote_host) + if not private_key and private_key_match: + private_key = private_key_match.group(1) + remote_host = remote_host.replace("%s@" % private_key, '', 1) + self.remote_host = remote_host + self.job_id = job_id + self.private_key = private_key + + def url_open(self, request, data): + return urllib2.urlopen(request, data) + + def __build_url(self, command, args): + if self.private_key: + args["private_key"] = self.private_key + data = urllib.urlencode(args) + url = self.remote_host + command + "?" + data + return url + + def __raw_execute(self, command, args = {}, data = None): + url = self.__build_url(command, args) + request = urllib2.Request(url=url, data=data) + response = self.url_open(request, data) + return response + + def __raw_execute_and_parse(self, command, args = {}, data = None): + response = self.__raw_execute(command, args, data) + return simplejson.loads(response.read()) + + def __upload_file(self, action, path, name=None, contents = None): + """ """ + input = open(path, 'rb') + try: + mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ) + return self.__upload_contents(action, path, mmapped_input, name) + finally: + input.close() + + def __upload_contents(self, action, path, contents, name=None): + if not name: + name = os.path.basename(path) + args = {"job_id" : self.job_id, "name" : name} + return self.__raw_execute_and_parse(action, args, contents) + + def upload_tool_file(self, path): + return self.__upload_file("upload_tool_file", path) + + def upload_input(self, path): + return self.__upload_file("upload_input", path) + + def upload_extra_input(self, path, relative_name): + return self.__upload_file("upload_extra_input", path, name=relative_name) + + def upload_config_file(self, path, contents): + return self.__upload_contents("upload_config_file", path, contents) + + def upload_working_directory_file(self, path): + return self.__upload_file("upload_working_directory_file", path) + + def _get_output_type(self, name): + return self.__raw_execute_and_parse('get_output_type', {'name': name, + 'job_id': self.job_id}) + + def download_output(self, path, working_directory): + """ """ + name = os.path.basename(path) + output_type = self._get_output_type(name) + response = self.__raw_execute('download_output', {'name' : name, + "job_id" : self.job_id, + 'output_type': output_type}) + if output_type == 'direct': + output = open(path, 'wb') + elif output_type == 'task': + output = 
open(os.path.join(working_directory, name), 'wb') + else: + raise Exception("No remote output found for dataset with path %s" % path) + try: + while True: + buffer = response.read(1024) + if buffer == "": + break + output.write(buffer) + finally: + output.close() + + def launch(self, command_line): + """ """ + return self.__raw_execute("launch", {"command_line" : command_line, + "job_id" : self.job_id}) + + def kill(self): + return self.__raw_execute("kill", {"job_id" : self.job_id}) + + def wait(self): + """ """ + while True: + complete = self.check_complete() + if complete: + return check_complete_response + time.sleep(1) + + def raw_check_complete(self): + check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id }) + return check_complete_response + + def check_complete(self): + return self.raw_check_complete()["complete"] == "true" + + def clean(self): + self.__raw_execute("clean", { "job_id" : self.job_id }) + + def setup(self): + return self.__raw_execute_and_parse("setup", { "job_id" : self.job_id }) https://bitbucket.org/galaxy/galaxy-central/commits/b109c7db9347/ changeset: b109c7db9347 user: jmchilton date: 2013-01-11 17:53:05 summary: Documentation and PEP8 fixes for lwr client code. affected #: 1 file diff -r 6fa106f2678da0e0af8d8c25e90928817254c6f5 -r b109c7db9347271b35d426d3148120ce359977e6 lib/galaxy/jobs/runners/lwr_client/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -1,3 +1,10 @@ +""" +lwr_client +========== + +This module contains logic for interfacing with an external LWR server. + +""" import mmap import os import re @@ -9,8 +16,32 @@ class FileStager(object): - + """ + Objects of the FileStager class interact with an LWR client object to + stage the files required to run jobs on a remote LWR server. + + **Parameters** + + client : Client + LWR client object. + command_line : str + The local command line to execute, this will be rewritten for the remote server. + config_files : list + List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server. + input_files : list + List of input files used by job. These will be transferred and references rewritten. + output_files : list + List of output_files produced by job. + tool_dir : str + Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). + working_directory : str + Local path created by Galaxy for running this job. + + """ + def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory): + """ + """ self.client = client self.command_line = command_line self.config_files = config_files @@ -67,7 +98,8 @@ self.file_renames[extra_file_path] = response['path'] def __upload_working_directory_files(self): - # Task manager stages files into working directory, these need to be uploaded + # Task manager stages files into working directory, these need to be + # uploaded if present. 
for working_directory_file in os.listdir(self.working_directory): path = os.path.join(self.working_directory, working_directory_file) working_file_response = self.client.upload_working_directory_file(path) @@ -75,8 +107,8 @@ def __initialize_output_file_renames(self): for output_file in self.output_files: - self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory, - self.remote_path_separator, + self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory, + self.remote_path_separator, os.path.basename(output_file)) def __initialize_task_output_file_renames(self): @@ -108,6 +140,10 @@ self.rewritten_command_line = self.__rewrite_paths(self.command_line) def get_rewritten_command_line(self): + """ + Returns the rewritten version of the command line to execute suitable + for remote host. + """ return self.rewritten_command_line def __read(self, path): @@ -117,13 +153,21 @@ finally: input.close() - - + class Client(object): - """ """ - """ + Objects of this client class perform low-level communication with a remote LWR server. + + **Parameters** + + remote_host : str + Remote URL of the LWR server. + job_id : str + Galaxy job/task id. + private_key : str (optional) + Secret key the remote LWR server is configured with. """ + def __init__(self, remote_host, job_id, private_key=None): if not remote_host.endswith("/"): remote_host = remote_host + "/" @@ -132,7 +176,7 @@ ## https://moo@cow:8913 will try to contact https://cow:8913 ## with a private key of moo private_key_format = "https?://(.*)@.*/?" - private_key_match= re.match(private_key_format, remote_host) + private_key_match = re.match(private_key_format, remote_host) if not private_key and private_key_match: private_key = private_key_match.group(1) remote_host = remote_host.replace("%s@" % private_key, '', 1) @@ -140,9 +184,9 @@ self.job_id = job_id self.private_key = private_key - def url_open(self, request, data): + def _url_open(self, request, data): return urllib2.urlopen(request, data) - + def __build_url(self, command, args): if self.private_key: args["private_key"] = self.private_key @@ -150,21 +194,20 @@ url = self.remote_host + command + "?" + data return url - def __raw_execute(self, command, args = {}, data = None): + def __raw_execute(self, command, args={}, data=None): url = self.__build_url(command, args) request = urllib2.Request(url=url, data=data) - response = self.url_open(request, data) + response = self._url_open(request, data) return response - def __raw_execute_and_parse(self, command, args = {}, data = None): + def __raw_execute_and_parse(self, command, args={}, data=None): response = self.__raw_execute(command, args, data) return simplejson.loads(response.read()) - def __upload_file(self, action, path, name=None, contents = None): - """ """ + def __upload_file(self, action, path, name=None, contents=None): input = open(path, 'rb') try: - mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ) + mmapped_input = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ) return self.__upload_contents(action, path, mmapped_input, name) finally: input.close() @@ -172,39 +215,93 @@ def __upload_contents(self, action, path, contents, name=None): if not name: name = os.path.basename(path) - args = {"job_id" : self.job_id, "name" : name} + args = {"job_id": self.job_id, "name": name} return self.__raw_execute_and_parse(action, args, contents) - + def upload_tool_file(self, path): + """ + Upload a tool related file (e.g. wrapper) required to run job. 
+ + **Parameters** + + path : str + Local path tool. + """ return self.__upload_file("upload_tool_file", path) def upload_input(self, path): + """ + Upload input dataset to remote server. + + **Parameters** + + path : str + Local path of input dataset. + """ return self.__upload_file("upload_input", path) def upload_extra_input(self, path, relative_name): + """ + Upload extra input file to remote server. + + **Parameters** + + path : str + Extra files path of input dataset corresponding to this input. + relative_name : str + Relative path of extra file to upload relative to inputs extra files path. + """ return self.__upload_file("upload_extra_input", path, name=relative_name) def upload_config_file(self, path, contents): + """ + Upload a job's config file to the remote server. + + **Parameters** + + path : str + Local path to the original config file. + contents : str + Rewritten contents of the config file to upload. + """ return self.__upload_contents("upload_config_file", path, contents) def upload_working_directory_file(self, path): + """ + Upload the supplied file (path) from a job's working directory + to remote server. + + **Parameters** + + path : str + Path to file to upload. + """ return self.__upload_file("upload_working_directory_file", path) def _get_output_type(self, name): - return self.__raw_execute_and_parse('get_output_type', {'name': name, - 'job_id': self.job_id}) + return self.__raw_execute_and_parse("get_output_type", {"name": name, + "job_id": self.job_id}) def download_output(self, path, working_directory): - """ """ + """ + Download an output dataset from the remote server. + + **Parameters** + + path : str + Local path of the dataset. + working_directory : str + Local working_directory for the job. + """ name = os.path.basename(path) output_type = self._get_output_type(name) - response = self.__raw_execute('download_output', {'name' : name, - "job_id" : self.job_id, - 'output_type': output_type}) - if output_type == 'direct': - output = open(path, 'wb') - elif output_type == 'task': - output = open(os.path.join(working_directory, name), 'wb') + response = self.__raw_execute("download_output", {"name": name, + "job_id": self.job_id, + "output_type": output_type}) + if output_type == "direct": + output = open(path, "wb") + elif output_type == "task": + output = open(os.path.join(working_directory, name), "wb") else: raise Exception("No remote output found for dataset with path %s" % path) try: @@ -215,32 +312,57 @@ output.write(buffer) finally: output.close() - + def launch(self, command_line): - """ """ - return self.__raw_execute("launch", {"command_line" : command_line, - "job_id" : self.job_id}) + """ + Run or queue up the execution of the supplied + `command_line` on the remote server. + + **Parameters** + + command_line : str + Command to execute. + """ + return self.__raw_execute("launch", {"command_line": command_line, + "job_id": self.job_id}) def kill(self): - return self.__raw_execute("kill", {"job_id" : self.job_id}) - + """ + Cancel remote job, either removing from the queue or killing it. + """ + return self.__raw_execute("kill", {"job_id": self.job_id}) + def wait(self): - """ """ + """ + Wait for job to finish. 
+ """ while True: - complete = self.check_complete() - if complete: - return check_complete_response + complete_response = self.raw_check_complete() + if complete_response["complete"] == "true": + return complete_response time.sleep(1) def raw_check_complete(self): - check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id }) + """ + Get check_complete response from the remote server. + """ + check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id": self.job_id}) return check_complete_response def check_complete(self): + """ + Return boolean indicating whether the job is complete. + """ return self.raw_check_complete()["complete"] == "true" def clean(self): - self.__raw_execute("clean", { "job_id" : self.job_id }) + """ + Cleanup the remote job. + """ + self.__raw_execute("clean", {"job_id": self.job_id}) def setup(self): - return self.__raw_execute_and_parse("setup", { "job_id" : self.job_id }) + """ + Setup remote LWR server to run this job. + """ + return self.__raw_execute_and_parse("setup", {"job_id": self.job_id}) https://bitbucket.org/galaxy/galaxy-central/commits/856d9508b97f/ changeset: 856d9508b97f user: jmchilton date: 2013-01-11 17:53:05 summary: Extend lwr to allow execution of jobs with outputs specified using 'from_work_dir'. affected #: 3 files diff -r b109c7db9347271b35d426d3148120ce359977e6 -r 856d9508b97fe655e9652750b59fbb787c8f632a lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -8,7 +8,7 @@ log = logging.getLogger( __name__ ) class BaseJobRunner( object ): - def build_command_line( self, job_wrapper, include_metadata=False ): + def build_command_line( self, job_wrapper, include_metadata=False, include_work_dir_outputs=True ): """ Compose the sequence of commands necessary to execute a job. This will currently include: @@ -19,18 +19,6 @@ - commands to set metadata (if include_metadata is True) """ - def in_directory( file, directory ): - """ - Return true, if the common prefix of both is equal to directory - e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b - """ - - # Make both absolute. - directory = os.path.abspath( directory ) - file = os.path.abspath( file ) - - return os.path.commonprefix( [ file, directory ] ) == directory - commands = job_wrapper.get_command_line() # All job runners currently handle this case which should never # occur @@ -47,6 +35,41 @@ commands = "; ".join( job_wrapper.dependency_shell_commands + [ commands ] ) # -- Append commands to copy job outputs based on from_work_dir attribute. 
-- + if include_work_dir_outputs: + work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) + if work_dir_outputs: + commands += "; " + "; ".join( [ "cp %s %s" % ( source_file, destination ) for ( source_file, destination ) in work_dir_outputs ] ) + + # Append metadata setting commands, we don't want to overwrite metadata + # that was copied over in init_meta(), as per established behavior + if include_metadata and self.app.config.set_metadata_externally: + commands += "; cd %s; " % os.path.abspath( os.getcwd() ) + commands += job_wrapper.setup_external_metadata( + exec_dir = os.path.abspath( os.getcwd() ), + tmp_dir = job_wrapper.working_directory, + dataset_files_path = self.app.model.Dataset.file_path, + output_fnames = job_wrapper.get_output_fnames(), + set_extension = False, + kwds = { 'overwrite' : False } ) + return commands + + def get_work_dir_outputs( self, job_wrapper ): + """ + Returns list of pairs (source_file, destination) describing path + to work_dir output file and ultimate destination. + """ + + def in_directory( file, directory ): + """ + Return true, if the common prefix of both is equal to directory + e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b + """ + + # Make both absolute. + directory = os.path.abspath( directory ) + file = os.path.abspath( file ) + + return os.path.commonprefix( [ file, directory ] ) == directory # Set up dict of dataset id --> output path; output path can be real or # false depending on outputs_to_working_directory @@ -57,6 +80,7 @@ path = dataset_path.false_path output_paths[ dataset_path.dataset_id ] = path + output_pairs = [] # Walk job's output associations to find and use from_work_dir attributes. job = job_wrapper.get_job() job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None ) @@ -72,30 +96,14 @@ source_file = os.path.join( os.path.abspath( job_wrapper.working_directory ), hda_tool_output.from_work_dir ) destination = output_paths[ dataset.dataset_id ] if in_directory( source_file, job_wrapper.working_directory ): - try: - commands += "; cp %s %s" % ( source_file, destination ) - log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) ) - except ( IOError, OSError ): - log.debug( "Could not copy %s to %s as directed by from_work_dir" % ( source_file, destination ) ) + output_pairs.append( ( source_file, destination ) ) + log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) ) else: # Security violation. 
log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) ) + return output_pairs - - # Append metadata setting commands, we don't want to overwrite metadata - # that was copied over in init_meta(), as per established behavior - if include_metadata and self.app.config.set_metadata_externally: - commands += "; cd %s; " % os.path.abspath( os.getcwd() ) - commands += job_wrapper.setup_external_metadata( - exec_dir = os.path.abspath( os.getcwd() ), - tmp_dir = job_wrapper.working_directory, - dataset_files_path = self.app.model.Dataset.file_path, - output_fnames = job_wrapper.get_output_fnames(), - set_extension = False, - kwds = { 'overwrite' : False } ) - return commands - class ClusterJobState( object ): """ Encapsulate the state of a cluster job, this should be subclassed as diff -r b109c7db9347271b35d426d3148120ce359977e6 -r 856d9508b97fe655e9652750b59fbb787c8f632a lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -54,7 +54,7 @@ if 0 != os.system(cmd): raise Exception('Error running file staging command: %s' % cmd) job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line - command_line = self.build_command_line( job_wrapper, include_metadata=False ) + command_line = self.build_command_line( job_wrapper, include_metadata=False, include_work_dir_outputs=False ) except: job_wrapper.fail( "failure preparing job", exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) @@ -126,7 +126,12 @@ stderr = run_results['stderr'] if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]: + work_dir_outputs = self.get_work_dir_outputs(job_wrapper) output_files = self.get_output_files(job_wrapper) + for source_file, output_file in work_dir_outputs: + client.download_work_dir_output(source_file, job_wrapper.working_directory, output_file) + # Remove from full output_files list so don't try to download directly. + output_files.remove(output_file) for output_file in output_files: client.download_output(output_file, working_directory=job_wrapper.working_directory) client.clean() diff -r b109c7db9347271b35d426d3148120ce359977e6 -r 856d9508b97fe655e9652750b59fbb787c8f632a lib/galaxy/jobs/runners/lwr_client/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -282,6 +282,24 @@ return self.__raw_execute_and_parse("get_output_type", {"name": name, "job_id": self.job_id}) + def download_work_dir_output(self, source, working_directory, output_path): + """ + Download an output dataset specified with from_work_dir from the + remote server. + + **Parameters** + + source : str + Path in job's working_directory to find output in. + working_directory : str + Local working_directory for the job. + output_path : str + Full path to output dataset. + """ + output = open(output_path, "wb") + name = os.path.basename(source) + self.__raw_download_output(name, self.job_id, "work_dir", output) + def download_output(self, path, working_directory): """ Download an output dataset from the remote server. 
@@ -295,23 +313,26 @@ """ name = os.path.basename(path) output_type = self._get_output_type(name) - response = self.__raw_execute("download_output", {"name": name, - "job_id": self.job_id, - "output_type": output_type}) if output_type == "direct": output = open(path, "wb") elif output_type == "task": output = open(os.path.join(working_directory, name), "wb") else: raise Exception("No remote output found for dataset with path %s" % path) + self.__raw_download_output(name, self.job_id, output_type, output) + + def __raw_download_output(self, name, job_id, output_type, output_file): + response = self.__raw_execute("download_output", {"name": name, + "job_id": self.job_id, + "output_type": output_type}) try: while True: buffer = response.read(1024) if buffer == "": break - output.write(buffer) + output_file.write(buffer) finally: - output.close() + output_file.close() def launch(self, command_line): """ https://bitbucket.org/galaxy/galaxy-central/commits/b749bb68c1f7/ changeset: b749bb68c1f7 user: jmchilton date: 2013-01-11 17:53:05 summary: Implement optimization attempting to not transfer unneeded inputs to remote LWR server. More general refactoring and testing of lwr client code. affected #: 1 file diff -r 856d9508b97fe655e9652750b59fbb787c8f632a -r b749bb68c1f7a71c775e7486152dcfe93813c668 lib/galaxy/jobs/runners/lwr_client/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -15,6 +15,99 @@ import simplejson +class JobInputs(object): + """ + Abstractions over dynamic inputs created for a given job (namely the command to + execute and created configfiles). + + **Parameters** + + command_line : str + Local command to execute for this job. (To be rewritten.) + config_files : str + Config files created for this job. (To be rewritten.) + + + >>> import tempfile + >>> tf = tempfile.NamedTemporaryFile() + >>> def setup_inputs(tf): + ... open(tf.name, "w").write("world /path/to/input the rest") + ... inputs = JobInputs("hello /path/to/input", [tf.name]) + ... return inputs + >>> inputs = setup_inputs(tf) + >>> inputs.rewrite_paths("/path/to/input", 'C:\\input') + >>> inputs.rewritten_command_line + 'hello C:\\\\input' + >>> inputs.rewritten_config_files[tf.name] + 'world C:\\\\input the rest' + >>> tf.close() + >>> tf = tempfile.NamedTemporaryFile() + >>> inputs = setup_inputs(tf) + >>> inputs.find_referenced_subfiles('/path/to') + ['/path/to/input'] + >>> inputs.path_referenced('/path/to') + True + >>> inputs.path_referenced('/path/to/input') + True + >>> inputs.path_referenced('/path/to/notinput') + False + >>> tf.close() + """ + + def __init__(self, command_line, config_files): + self.rewritten_command_line = command_line + self.rewritten_config_files = {} + for config_file in config_files or []: + config_contents = _read(config_file) + self.rewritten_config_files[config_file] = config_contents + + def find_referenced_subfiles(self, directory): + """ + Return list of files below specified `directory` in job inputs. Could + use more sophisticated logic (match quotes to handle spaces, handle + subdirectories, etc...). + + **Parameters** + + directory : str + Full path to directory to search. 
+ + """ + pattern = r"(%s%s\S+)" % (directory, os.sep) + referenced_files = set() + for input_contents in self.__items(): + referenced_files.update(re.findall(pattern, input_contents)) + return list(referenced_files) + + def path_referenced(self, path): + pattern = r"%s" % path + found = False + for input_contents in self.__items(): + if re.findall(pattern, input_contents): + found = True + break + return found + + def rewrite_paths(self, local_path, remote_path): + """ + Rewrite references to `local_path` with `remote_path` in job inputs. + """ + self.__rewrite_command_line(local_path, remote_path) + self.__rewrite_config_files(local_path, remote_path) + + def __rewrite_command_line(self, local_path, remote_path): + self.rewritten_command_line = self.rewritten_command_line.replace(local_path, remote_path) + + def __rewrite_config_files(self, local_path, remote_path): + for config_file, rewritten_contents in self.rewritten_config_files.iteritems(): + self.rewritten_config_files[config_file] = rewritten_contents.replace(local_path, remote_path) + + def __items(self): + items = [self.rewritten_command_line] + items.extend(self.rewritten_config_files.values()) + return items + + class FileStager(object): """ Objects of the FileStager class interact with an LWR client object to @@ -50,6 +143,10 @@ self.tool_dir = os.path.abspath(tool_dir) self.working_directory = working_directory + # Setup job inputs, these will need to be rewritten before + # shipping off to remote LWR server. + self.job_inputs = JobInputs(self.command_line, self.config_files) + self.file_renames = {} job_config = client.setup() @@ -65,17 +162,11 @@ self.__initialize_output_file_renames() self.__initialize_task_output_file_renames() self.__initialize_config_file_renames() - self.__rewrite_and_upload_config_files() - self.__rewrite_command_line() + self.__handle_rewrites() + self.__upload_rewritten_config_files() def __initialize_referenced_tool_files(self): - pattern = r"(%s%s\S+)" % (self.tool_dir, os.sep) - referenced_tool_files = [] - referenced_tool_files += re.findall(pattern, self.command_line) - if self.config_files != None: - for config_file in self.config_files: - referenced_tool_files += re.findall(pattern, self.__read(config_file)) - self.referenced_tool_files = referenced_tool_files + self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir) def __upload_tool_files(self): for referenced_tool_file in self.referenced_tool_files: @@ -84,18 +175,25 @@ def __upload_input_files(self): for input_file in self.input_files: + self.__upload_input_file(input_file) + self.__upload_input_extra_files(input_file) + + def __upload_input_file(self, input_file): + if self.job_inputs.path_referenced(input_file): input_upload_response = self.client.upload_input(input_file) self.file_renames[input_file] = input_upload_response['path'] - # TODO: Determine if this is object store safe and what needs to be - # done if it is not. 
- files_path = "%s_files" % input_file[0:-len(".dat")] - if os.path.exists(files_path): - for extra_file in os.listdir(files_path): - extra_file_path = os.path.join(files_path, extra_file) - relative_path = os.path.basename(files_path) - extra_file_relative_path = os.path.join(relative_path, extra_file) - response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path) - self.file_renames[extra_file_path] = response['path'] + + def __upload_input_extra_files(self, input_file): + # TODO: Determine if this is object store safe and what needs to be + # done if it is not. + files_path = "%s_files" % input_file[0:-len(".dat")] + if os.path.exists(files_path) and self.job_inputs.path_referenced(files_path): + for extra_file in os.listdir(files_path): + extra_file_path = os.path.join(files_path, extra_file) + relative_path = os.path.basename(files_path) + extra_file_relative_path = os.path.join(relative_path, extra_file) + response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path) + self.file_renames[extra_file_path] = response['path'] def __upload_working_directory_files(self): # Task manager stages files into working directory, these need to be @@ -130,28 +228,20 @@ new_contents = new_contents.replace(local_path, remote_path) return new_contents - def __rewrite_and_upload_config_files(self): - for config_file in self.config_files: - config_contents = self.__read(config_file) - new_config_contents = self.__rewrite_paths(config_contents) + def __handle_rewrites(self): + for local_path, remote_path in self.file_renames.iteritems(): + self.job_inputs.rewrite_paths(local_path, remote_path) + + def __upload_rewritten_config_files(self): + for config_file, new_config_contents in self.job_inputs.rewritten_config_files.iteritems(): self.client.upload_config_file(config_file, new_config_contents) - def __rewrite_command_line(self): - self.rewritten_command_line = self.__rewrite_paths(self.command_line) - def get_rewritten_command_line(self): """ Returns the rewritten version of the command line to execute suitable for remote host. """ - return self.rewritten_command_line - - def __read(self, path): - input = open(path, "r") - try: - return input.read() - finally: - input.close() + return self.job_inputs.rewritten_command_line class Client(object): @@ -387,3 +477,15 @@ Setup remote LWR server to run this job. """ return self.__raw_execute_and_parse("setup", {"job_id": self.job_id}) + + +def _read(path): + """ + Utility method to quickly read small files (config files and tool + wrappers) into memory as strings. + """ + input = open(path, "r") + try: + return input.read() + finally: + input.close() https://bitbucket.org/galaxy/galaxy-central/commits/2631faf42a8f/ changeset: 2631faf42a8f user: jmchilton date: 2013-01-11 17:53:05 summary: Rework job_id handling in LWR runner allowing remote LWR server to assign a job_id during setup, save this will serve as the job's external id. This change allows multiple Galaxy instances to submit jobs to the same LWR backend server and will prove useful when implementing additional backends (pbs/drmaa/etc...) for the LWR server. 
affected #: 2 files diff -r b749bb68c1f7a71c775e7486152dcfe93813c668 -r 2631faf42a8fe4f3b3f9b556809da07b69ee845a lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -73,8 +73,9 @@ working_directory = job_wrapper.working_directory file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir, working_directory) rebuilt_command_line = file_stager.get_rewritten_command_line() + job_id = file_stager.job_id client.launch( rebuilt_command_line ) - job_wrapper.set_runner( runner_url, job_wrapper.job_id ) + job_wrapper.set_runner( runner_url, job_id ) job_wrapper.change_state( model.Job.states.RUNNING ) except Exception, exc: @@ -84,7 +85,7 @@ lwr_job_state = ClusterJobState() lwr_job_state.job_wrapper = job_wrapper - lwr_job_state.job_id = job_wrapper.job_id + lwr_job_state.job_id = job_id lwr_job_state.old_state = True lwr_job_state.running = True lwr_job_state.runner_url = runner_url diff -r b749bb68c1f7a71c775e7486152dcfe93813c668 -r 2631faf42a8fe4f3b3f9b556809da07b69ee845a lib/galaxy/jobs/runners/lwr_client/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -149,12 +149,7 @@ self.file_renames = {} - job_config = client.setup() - - self.new_working_directory = job_config['working_directory'] - self.new_outputs_directory = job_config['outputs_directory'] - self.remote_path_separator = job_config['path_separator'] - + self.__handle_setup() self.__initialize_referenced_tool_files() self.__upload_tool_files() self.__upload_input_files() @@ -165,6 +160,21 @@ self.__handle_rewrites() self.__upload_rewritten_config_files() + def __handle_setup(self): + job_config = self.client.setup() + + self.new_working_directory = job_config['working_directory'] + self.new_outputs_directory = job_config['outputs_directory'] + self.remote_path_separator = job_config['path_separator'] + # If remote LWR server assigned job id, use that otherwise + # just use local job_id assigned. + galaxy_job_id = self.client.job_id + self.job_id = job_config.get('job_id', galaxy_job_id) + if self.job_id != galaxy_job_id: + # Remote LWR server assigned an id different than the + # Galaxy job id, update client to reflect this. + self.client.job_id = self.job_id + def __initialize_referenced_tool_files(self): self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir) https://bitbucket.org/galaxy/galaxy-central/commits/21a91117bcc8/ changeset: 21a91117bcc8 user: jmchilton date: 2013-01-11 18:02:43 summary: Re-introduce LWR shutdown fixes from Dave B.. 
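The fix being restored is the sentinel-based shutdown of the ClusterJobRunner queues; roughly as follows (STOP_SIGNAL and the monitor_queue/work_queue/work_threads attributes are those defined in lib/galaxy/jobs/runners/__init__.py, while the standalone function form is only illustrative):

    STOP_SIGNAL = object()

    def shutdown(runner):
        # Wake the monitor thread so it can exit its polling loop.
        runner.monitor_queue.put(STOP_SIGNAL)
        # Put one (STOP_SIGNAL, None) tuple per worker so every work thread
        # drains a sentinel and returns.
        for i in range(len(runner.work_threads)):
            runner.work_queue.put((STOP_SIGNAL, None))

This replaces the older LWR shutdown code, which pushed self.STOP_SIGNAL onto the now-removed self.queue for each thread in self.threads.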
affected #: 1 file diff -r 2631faf42a8fe4f3b3f9b556809da07b69ee845a -r 21a91117bcc80e55aa5a196caa8b801180ed9480 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -2,7 +2,7 @@ import subprocess from galaxy import model -from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner +from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner, STOP_SIGNAL import errno from time import sleep @@ -174,8 +174,9 @@ def shutdown( self ): """Attempts to gracefully shut down the worker threads""" log.info( "sending stop signal to worker threads" ) - for i in range( len( self.threads ) ): - self.queue.put( self.STOP_SIGNAL ) + self.monitor_queue.put( STOP_SIGNAL ) + for i in range( len( self.work_threads ) ): + self.work_queue.put( ( STOP_SIGNAL, None ) ) log.info( "local job runner stopped" ) def check_pid( self, pid ): https://bitbucket.org/galaxy/galaxy-central/commits/e5ec6ed8f033/ changeset: e5ec6ed8f033 user: natefoo date: 2013-01-11 18:25:58 summary: Merged in jmchilton/galaxy-central-lwr (pull request #106: Ongoing LWR Enhancements) affected #: 3 files diff -r c2199c181f09fd5b94a9f5ea13fc39b28e803f2c -r e5ec6ed8f0337893cf4bc64e61089471b00b28b0 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -1,9 +1,14 @@ import os, logging, os.path +from galaxy import model +from Queue import Queue, Empty +import time +import threading + log = logging.getLogger( __name__ ) class BaseJobRunner( object ): - def build_command_line( self, job_wrapper, include_metadata=False ): + def build_command_line( self, job_wrapper, include_metadata=False, include_work_dir_outputs=True ): """ Compose the sequence of commands necessary to execute a job. This will currently include: @@ -14,18 +19,6 @@ - commands to set metadata (if include_metadata is True) """ - def in_directory( file, directory ): - """ - Return true, if the common prefix of both is equal to directory - e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b - """ - - # Make both absolute. - directory = os.path.abspath( directory ) - file = os.path.abspath( file ) - - return os.path.commonprefix( [ file, directory ] ) == directory - commands = job_wrapper.get_command_line() # All job runners currently handle this case which should never # occur @@ -42,6 +35,41 @@ commands = "; ".join( job_wrapper.dependency_shell_commands + [ commands ] ) # -- Append commands to copy job outputs based on from_work_dir attribute. 
-- + if include_work_dir_outputs: + work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) + if work_dir_outputs: + commands += "; " + "; ".join( [ "cp %s %s" % ( source_file, destination ) for ( source_file, destination ) in work_dir_outputs ] ) + + # Append metadata setting commands, we don't want to overwrite metadata + # that was copied over in init_meta(), as per established behavior + if include_metadata and self.app.config.set_metadata_externally: + commands += "; cd %s; " % os.path.abspath( os.getcwd() ) + commands += job_wrapper.setup_external_metadata( + exec_dir = os.path.abspath( os.getcwd() ), + tmp_dir = job_wrapper.working_directory, + dataset_files_path = self.app.model.Dataset.file_path, + output_fnames = job_wrapper.get_output_fnames(), + set_extension = False, + kwds = { 'overwrite' : False } ) + return commands + + def get_work_dir_outputs( self, job_wrapper ): + """ + Returns list of pairs (source_file, destination) describing path + to work_dir output file and ultimate destination. + """ + + def in_directory( file, directory ): + """ + Return true, if the common prefix of both is equal to directory + e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b + """ + + # Make both absolute. + directory = os.path.abspath( directory ) + file = os.path.abspath( file ) + + return os.path.commonprefix( [ file, directory ] ) == directory # Set up dict of dataset id --> output path; output path can be real or # false depending on outputs_to_working_directory @@ -52,6 +80,7 @@ path = dataset_path.false_path output_paths[ dataset_path.dataset_id ] = path + output_pairs = [] # Walk job's output associations to find and use from_work_dir attributes. job = job_wrapper.get_job() job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None ) @@ -67,26 +96,165 @@ source_file = os.path.join( os.path.abspath( job_wrapper.working_directory ), hda_tool_output.from_work_dir ) destination = output_paths[ dataset.dataset_id ] if in_directory( source_file, job_wrapper.working_directory ): - try: - commands += "; cp %s %s" % ( source_file, destination ) - log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) ) - except ( IOError, OSError ): - log.debug( "Could not copy %s to %s as directed by from_work_dir" % ( source_file, destination ) ) + output_pairs.append( ( source_file, destination ) ) + log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) ) else: # Security violation. log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) ) + return output_pairs +class ClusterJobState( object ): + """ + Encapsulate the state of a cluster job, this should be subclassed as + needed for various job runners to capture additional information needed + to communicate with cluster job manager. 
+ """ - # Append metadata setting commands, we don't want to overwrite metadata - # that was copied over in init_meta(), as per established behavior - if include_metadata and self.app.config.set_metadata_externally: - commands += "; cd %s; " % os.path.abspath( os.getcwd() ) - commands += job_wrapper.setup_external_metadata( - exec_dir = os.path.abspath( os.getcwd() ), - tmp_dir = job_wrapper.working_directory, - dataset_files_path = self.app.model.Dataset.file_path, - output_fnames = job_wrapper.get_output_fnames(), - set_extension = False, - kwds = { 'overwrite' : False } ) - return commands + def __init__( self ): + self.job_wrapper = None + self.job_id = None + self.old_state = None + self.running = False + self.runner_url = None + +STOP_SIGNAL = object() + +JOB_STATUS_QUEUED = 'queue' +JOB_STATUS_FAILED = 'fail' +JOB_STATUS_FINISHED = 'finish' + +class ClusterJobRunner( BaseJobRunner ): + """ + Not sure this is the best name for this class, but there is common code + shared between sge, pbs, drmaa, etc... + """ + + def __init__( self, app ): + self.app = app + self.sa_session = app.model.context + # 'watched' and 'queue' are both used to keep track of jobs to watch. + # 'queue' is used to add new watched jobs, and can be called from + # any thread (usually by the 'queue_job' method). 'watched' must only + # be modified by the monitor thread, which will move items from 'queue' + # to 'watched' and then manage the watched jobs. + self.watched = [] + self.monitor_queue = Queue() + + def _init_monitor_thread(self): + self.monitor_thread = threading.Thread( name="%s.monitor_thread" % self.runner_name, target=self.monitor ) + self.monitor_thread.setDaemon( True ) + self.monitor_thread.start() + + def _init_worker_threads(self): + self.work_queue = Queue() + self.work_threads = [] + nworkers = self.app.config.cluster_job_queue_workers + for i in range( nworkers ): + worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next ) + worker.start() + self.work_threads.append( worker ) + + def handle_stop(self): + # DRMAA and SGE runners should override this and disconnect. + pass + + def monitor( self ): + """ + Watches jobs currently in the cluster queue and deals with state changes + (queued to running) and job completion + """ + while 1: + # Take any new watched jobs and put them on the monitor list + try: + while 1: + cluster_job_state = self.monitor_queue.get_nowait() + if cluster_job_state is STOP_SIGNAL: + # TODO: This is where any cleanup would occur + self.handle_stop() + return + self.watched.append( cluster_job_state ) + except Empty: + pass + # Iterate over the list of watched jobs and check state + self.check_watched_items() + # Sleep a bit before the next state check + time.sleep( 1 ) + + def run_next( self ): + """ + Run the next item in the queue (a job waiting to run or finish ) + """ + while 1: + ( op, obj ) = self.work_queue.get() + if op is STOP_SIGNAL: + return + try: + if op == JOB_STATUS_QUEUED: + # If the next item is to be run, then only run it if the + # job state is "queued". Otherwise the next item was either + # cancelled or one of its siblings encountered an error. 
+ job_state = obj.get_state() + if model.Job.states.QUEUED == job_state: + self.queue_job( obj ) + else: + log.debug( "Not executing job %d in state %s" % ( obj.get_id_tag(), job_state ) ) + elif op == JOB_STATUS_FINISHED: + self.finish_job( obj ) + elif op == JOB_STATUS_FAILED: + self.fail_job( obj ) + except: + log.exception( "Uncaught exception %sing job" % op ) + + def monitor_job(self, job_state): + self.monitor_queue.put( job_state ) + + def put( self, job_wrapper ): + """Add a job to the queue (by job identifier)""" + # Change to queued state before handing to worker thread so the runner won't pick it up again + job_wrapper.change_state( model.Job.states.QUEUED ) + self.mark_as_queued(job_wrapper) + + def shutdown( self ): + """Attempts to gracefully shut down the monitor thread""" + log.info( "sending stop signal to worker threads" ) + self.monitor_queue.put( STOP_SIGNAL ) + for i in range( len( self.work_threads ) ): + self.work_queue.put( ( STOP_SIGNAL, None ) ) + + def check_watched_items(self): + """ + This method is responsible for iterating over self.watched and handling + state changes and updating self.watched with a new list of watched job + states. Subclasses can opt to override this directly (as older job runners will + initially) or just override check_watched_item and allow the list processing to + reuse the logic here. + """ + new_watched = [] + for cluster_job_state in self.watched: + new_cluster_job_state = self.check_watched_item(cluster_job_state) + if new_cluster_job_state: + new_watched.append(new_cluster_job_state) + self.watched = new_watched + + # Subclasses should implement this unless they override check_watched_items all together. + def check_watched_item(self): + raise NotImplementedError() + + def queue_job(self, job_wrapper): + raise NotImplementedError() + + def finish_job(self, job_state): + raise NotImplementedError() + + def fail_job(self, job_state): + raise NotImplementedError() + + def mark_as_finished(self, job_state): + self.work_queue.put( ( JOB_STATUS_FINISHED, job_state ) ) + + def mark_as_failed(self, job_state): + self.work_queue.put( ( JOB_STATUS_FAILED, job_state ) ) + + def mark_as_queued(self, job_wrapper): + self.work_queue.put( ( JOB_STATUS_QUEUED, job_wrapper ) ) diff -r c2199c181f09fd5b94a9f5ea13fc39b28e803f2c -r e5ec6ed8f0337893cf4bc64e61089471b00b28b0 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -1,263 +1,47 @@ import logging import subprocess -from Queue import Queue -import threading - -import re from galaxy import model -from galaxy.datatypes.data import nice_size -from galaxy.jobs.runners import BaseJobRunner +from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner, STOP_SIGNAL -import os, errno +import errno from time import sleep +from lwr_client import FileStager, Client + log = logging.getLogger( __name__ ) __all__ = [ 'LwrJobRunner' ] -import urllib -import urllib2 -import httplib -import mmap -import tempfile -import time -import simplejson +class LwrJobRunner( ClusterJobRunner ): + """ + LWR Job Runner + """ + runner_name = "LWRRunner" -class FileStager(object): - - def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir): - self.client = client - self.command_line = command_line - self.config_files = config_files - self.input_files = input_files - self.output_files = output_files - self.tool_dir = os.path.abspath(tool_dir) + def __init__( self, app ): + """Start the job runner """ + super( LwrJobRunner, self 
).__init__( app ) + self._init_monitor_thread() + log.info( "starting LWR workers" ) + self._init_worker_threads() - self.file_renames = {} + def check_watched_item(self, job_state): + try: + client = self.get_client_from_state(job_state) + complete = client.check_complete() + except Exception: + # An orphaned job was put into the queue at app startup, so remote server went down + # either way we are done I guess. + self.mark_as_finished(job_state) + return None + if complete: + self.mark_as_finished(job_state) + return None + return job_state - job_config = client.setup() - - self.new_working_directory = job_config['working_directory'] - self.new_outputs_directory = job_config['outputs_directory'] - self.remote_path_separator = job_config['path_separator'] - - self.__initialize_referenced_tool_files() - self.__upload_tool_files() - self.__upload_input_files() - self.__initialize_output_file_renames() - self.__initialize_config_file_renames() - self.__rewrite_and_upload_config_files() - self.__rewrite_command_line() - - def __initialize_referenced_tool_files(self): - pattern = r"(%s%s\S+)" % (self.tool_dir, os.sep) - referenced_tool_files = [] - referenced_tool_files += re.findall(pattern, self.command_line) - if self.config_files != None: - for config_file in self.config_files: - referenced_tool_files += re.findall(pattern, self.__read(config_file)) - self.referenced_tool_files = referenced_tool_files - - def __upload_tool_files(self): - for referenced_tool_file in self.referenced_tool_files: - tool_upload_response = self.client.upload_tool_file(referenced_tool_file) - self.file_renames[referenced_tool_file] = tool_upload_response['path'] - - def __upload_input_files(self): - for input_file in self.input_files: - input_upload_response = self.client.upload_input(input_file) - self.file_renames[input_file] = input_upload_response['path'] - - def __initialize_output_file_renames(self): - for output_file in self.output_files: - self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory, - self.remote_path_separator, - os.path.basename(output_file)) - - def __initialize_config_file_renames(self): - for config_file in self.config_files: - self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory, - self.remote_path_separator, - os.path.basename(config_file)) - - def __rewrite_paths(self, contents): - new_contents = contents - for local_path, remote_path in self.file_renames.iteritems(): - new_contents = new_contents.replace(local_path, remote_path) - return new_contents - - def __rewrite_and_upload_config_files(self): - for config_file in self.config_files: - config_contents = self.__read(config_file) - new_config_contents = self.__rewrite_paths(config_contents) - self.client.upload_config_file(config_file, new_config_contents) - - def __rewrite_command_line(self): - self.rewritten_command_line = self.__rewrite_paths(self.command_line) - - def get_rewritten_command_line(self): - return self.rewritten_command_line - - def __read(self, path): - input = open(path, "r") - try: - return input.read() - finally: - input.close() - - - -class Client(object): - """ - """ - """ - """ - def __init__(self, remote_host, job_id, private_key=None): - if not remote_host.endswith("/"): - remote_host = remote_host + "/" - ## If we don't have an explicit private_key defined, check for - ## one embedded in the URL. A URL of the form - ## https://moo@cow:8913 will try to contact https://cow:8913 - ## with a private key of moo - private_key_format = "https?://(.*)@.*/?" 
- private_key_match= re.match(private_key_format, remote_host) - if not private_key and private_key_match: - private_key = private_key_match.group(1) - remote_host = remote_host.replace("%s@" % private_key, '', 1) - self.remote_host = remote_host - self.job_id = job_id - self.private_key = private_key - - def url_open(self, request, data): - return urllib2.urlopen(request, data) - - def __build_url(self, command, args): - if self.private_key: - args["private_key"] = self.private_key - data = urllib.urlencode(args) - url = self.remote_host + command + "?" + data - return url - - def __raw_execute(self, command, args = {}, data = None): - url = self.__build_url(command, args) - request = urllib2.Request(url=url, data=data) - response = self.url_open(request, data) - return response - - def __raw_execute_and_parse(self, command, args = {}, data = None): - response = self.__raw_execute(command, args, data) - return simplejson.loads(response.read()) - - def __upload_file(self, action, path, contents = None): - """ """ - input = open(path, 'rb') - try: - mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ) - return self.__upload_contents(action, path, mmapped_input) - finally: - input.close() - - def __upload_contents(self, action, path, contents): - name = os.path.basename(path) - args = {"job_id" : self.job_id, "name" : name} - return self.__raw_execute_and_parse(action, args, contents) - - def upload_tool_file(self, path): - return self.__upload_file("upload_tool_file", path) - - def upload_input(self, path): - return self.__upload_file("upload_input", path) - - def upload_config_file(self, path, contents): - return self.__upload_contents("upload_config_file", path, contents) - - def download_output(self, path): - """ """ - name = os.path.basename(path) - response = self.__raw_execute('download_output', {'name' : name, - "job_id" : self.job_id}) - output = open(path, 'wb') - try: - while True: - buffer = response.read(1024) - if buffer == "": - break - output.write(buffer) - finally: - output.close() - - def launch(self, command_line): - """ """ - return self.__raw_execute("launch", {"command_line" : command_line, - "job_id" : self.job_id}) - - def kill(self): - return self.__raw_execute("kill", {"job_id" : self.job_id}) - - def wait(self): - """ """ - while True: - check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id }) - complete = check_complete_response["complete"] == "true" - if complete: - return check_complete_response - time.sleep(1) - - def clean(self): - self.__raw_execute("clean", { "job_id" : self.job_id }) - - def setup(self): - return self.__raw_execute_and_parse("setup", { "job_id" : self.job_id }) - - - -class LwrJobRunner( BaseJobRunner ): - """ - Lwr Job Runner - """ - STOP_SIGNAL = object() - def __init__( self, app ): - """Start the job runner with 'nworkers' worker threads""" - self.app = app - self.sa_session = app.model.context - - # start workers - self.queue = Queue() - self.threads = [] - nworkers = app.config.local_job_queue_workers - log.info( "starting workers" ) - for i in range( nworkers ): - worker = threading.Thread( ( name="LwrJobRunner.thread-%d" % i ), target=self.run_next ) - worker.setDaemon( True ) - worker.start() - self.threads.append( worker ) - log.debug( "%d workers ready", nworkers ) - - def run_next( self ): - """Run the next job, waiting until one is available if neccesary""" - while 1: - job_wrapper = self.queue.get() - if job_wrapper is self.STOP_SIGNAL: - return - try: - 
self.run_job( job_wrapper ) - except: - log.exception( "Uncaught exception running job" ) - - def determine_lwr_url(self, url): - lwr_url = url[ len( 'lwr://' ) : ] - return lwr_url - - def get_client_from_wrapper(self, job_wrapper): - return self.get_client( job_wrapper.get_job_runner_url(), job_wrapper.job_id ) - - def get_client(self, job_runner, job_id): - lwr_url = self.determine_lwr_url( job_runner ) - return Client(lwr_url, job_id) - - def run_job( self, job_wrapper ): + def queue_job(self, job_wrapper): stderr = stdout = command_line = '' runner_url = job_wrapper.get_job_runner_url() @@ -265,47 +49,98 @@ try: job_wrapper.prepare() if hasattr(job_wrapper, 'prepare_input_files_cmds') and job_wrapper.prepare_input_files_cmds is not None: - for cmd in job_wrapper.prepare_input_file_cmds: # run the commands to stage the input files + for cmd in job_wrapper.prepare_input_files_cmds: # run the commands to stage the input files #log.debug( 'executing: %s' % cmd ) if 0 != os.system(cmd): raise Exception('Error running file staging command: %s' % cmd) job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line - command_line = self.build_command_line( job_wrapper, include_metadata=False ) + command_line = self.build_command_line( job_wrapper, include_metadata=False, include_work_dir_outputs=False ) except: job_wrapper.fail( "failure preparing job", exception=True ) log.exception("failure running job %d" % job_wrapper.job_id) return # If we were able to get a command line, run the job - if command_line: - try: - #log.debug( 'executing: %s' % command_line ) - client = self.get_client_from_wrapper(job_wrapper) - output_fnames = job_wrapper.get_output_fnames() - output_files = [ str( o ) for o in output_fnames ] - input_files = job_wrapper.get_input_fnames() - file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir) - rebuilt_command_line = file_stager.get_rewritten_command_line() - client.launch( rebuilt_command_line ) + if not command_line: + job_wrapper.finish( '', '' ) + return - job_wrapper.set_runner( runner_url, job_wrapper.job_id ) - job_wrapper.change_state( model.Job.states.RUNNING ) + try: + #log.debug( 'executing: %s' % command_line ) + client = self.get_client_from_wrapper(job_wrapper) + output_files = self.get_output_files(job_wrapper) + input_files = job_wrapper.get_input_fnames() + working_directory = job_wrapper.working_directory + file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir, working_directory) + rebuilt_command_line = file_stager.get_rewritten_command_line() + job_id = file_stager.job_id + client.launch( rebuilt_command_line ) + job_wrapper.set_runner( runner_url, job_id ) + job_wrapper.change_state( model.Job.states.RUNNING ) - run_results = client.wait() - log.debug('run_results %s' % run_results ) - stdout = run_results['stdout'] - stderr = run_results['stderr'] + except Exception, exc: + job_wrapper.fail( "failure running job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + return - - if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]: - for output_file in output_files: - client.download_output(output_file) - client.clean() - log.debug('execution finished: %s' % command_line) - except Exception, exc: - job_wrapper.fail( "failure running job", exception=True ) - log.exception("failure running job %d" % job_wrapper.job_id) - 
return + lwr_job_state = ClusterJobState() + lwr_job_state.job_wrapper = job_wrapper + lwr_job_state.job_id = job_id + lwr_job_state.old_state = True + lwr_job_state.running = True + lwr_job_state.runner_url = runner_url + self.monitor_job(lwr_job_state) + + def get_output_files(self, job_wrapper): + output_fnames = job_wrapper.get_output_fnames() + return [ str( o ) for o in output_fnames ] + + + def determine_lwr_url(self, url): + lwr_url = url[ len( 'lwr://' ) : ] + return lwr_url + + def get_client_from_wrapper(self, job_wrapper): + job_id = job_wrapper.job_id + if hasattr(job_wrapper, 'task_id'): + job_id = "%s_%s" % (job_id, job_wrapper.task_id) + return self.get_client( job_wrapper.get_job_runner_url(), job_id ) + + def get_client_from_state(self, job_state): + job_runner = job_state.runner_url + job_id = job_state.job_id + return self.get_client(job_runner, job_id) + + def get_client(self, job_runner, job_id): + lwr_url = self.determine_lwr_url( job_runner ) + return Client(lwr_url, job_id) + + def finish_job( self, job_state ): + stderr = stdout = command_line = '' + job_wrapper = job_state.job_wrapper + try: + client = self.get_client_from_state(job_state) + + run_results = client.raw_check_complete() + log.debug('run_results %s' % run_results ) + stdout = run_results['stdout'] + stderr = run_results['stderr'] + + if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]: + work_dir_outputs = self.get_work_dir_outputs(job_wrapper) + output_files = self.get_output_files(job_wrapper) + for source_file, output_file in work_dir_outputs: + client.download_work_dir_output(source_file, job_wrapper.working_directory, output_file) + # Remove from full output_files list so don't try to download directly. + output_files.remove(output_file) + for output_file in output_files: + client.download_output(output_file, working_directory=job_wrapper.working_directory) + client.clean() + log.debug('execution finished: %s' % command_line) + except Exception, exc: + job_wrapper.fail( "failure running job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + return #run the metadata setting script here #this is terminate-able when output dataset/job is deleted #so that long running set_meta()s can be canceled without having to reboot the server @@ -321,7 +156,7 @@ job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session ) external_metadata_proc.wait() log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id ) - + # Finish the job try: job_wrapper.finish( stdout, stderr ) @@ -329,17 +164,19 @@ log.exception("Job wrapper finish method failed") job_wrapper.fail("Unable to finish job", exception=True) - def put( self, job_wrapper ): - """Add a job to the queue (by job identifier)""" - # Change to queued state before handing to worker thread so the runner won't pick it up again - job_wrapper.change_state( model.Job.states.QUEUED ) - self.queue.put( job_wrapper ) - + def fail_job( self, job_state ): + """ + Seperated out so we can use the worker threads for it. 
+ """ + self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) ) + job_state.job_wrapper.fail( job_state.fail_message ) + def shutdown( self ): """Attempts to gracefully shut down the worker threads""" log.info( "sending stop signal to worker threads" ) - for i in range( len( self.threads ) ): - self.queue.put( self.STOP_SIGNAL ) + self.monitor_queue.put( STOP_SIGNAL ) + for i in range( len( self.work_threads ) ): + self.work_queue.put( ( STOP_SIGNAL, None ) ) log.info( "local job runner stopped" ) def check_pid( self, pid ): @@ -355,8 +192,9 @@ def stop_job( self, job ): #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished - if job.external_output_metadata: - pid = job.external_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them + job_ext_output_metadata = job.get_external_output_metadata() + if job_ext_output_metadata: + pid = job_ext_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them if pid in [ None, '' ]: log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id ) return @@ -383,7 +221,21 @@ log.debug("Attempt remote lwr kill of job with url %s and id %s" % (lwr_url, job_id)) client = self.get_client(lwr_url, job_id) client.kill() + + def recover( self, job, job_wrapper ): - # local jobs can't be recovered - job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted. Please retry the job." ) - + """Recovers jobs stuck in the queued/running state when Galaxy started""" + job_state = ClusterJobState() + job_state.job_id = str( job.get_job_runner_external_id() ) + job_state.runner_url = job_wrapper.get_job_runner_url() + job_wrapper.command_line = job.get_command_line() + job_state.job_wrapper = job_wrapper + if job.get_state() == model.Job.states.RUNNING: + log.debug( "(LWR/%s) is still in running state, adding to the LWR queue" % ( job.get_id()) ) + job_state.old_state = True + job_state.running = True + self.monitor_queue.put( job_state ) + elif job.get_state() == model.Job.states.QUEUED: + # LWR doesn't queue currently, so this indicates galaxy was shutoff while + # job was being staged. Not sure how to recover from that. + job_state.job_wrapper.fail( "This job was killed when Galaxy was restarted. Please retry the job." ) diff -r c2199c181f09fd5b94a9f5ea13fc39b28e803f2c -r e5ec6ed8f0337893cf4bc64e61089471b00b28b0 lib/galaxy/jobs/runners/lwr_client/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -0,0 +1,501 @@ +""" +lwr_client +========== + +This module contains logic for interfacing with an external LWR server. + +""" +import mmap +import os +import re +import time +import urllib +import urllib2 + +import simplejson + + +class JobInputs(object): + """ + Abstractions over dynamic inputs created for a given job (namely the command to + execute and created configfiles). + + **Parameters** + + command_line : str + Local command to execute for this job. (To be rewritten.) + config_files : str + Config files created for this job. (To be rewritten.) + + + >>> import tempfile + >>> tf = tempfile.NamedTemporaryFile() + >>> def setup_inputs(tf): + ... open(tf.name, "w").write("world /path/to/input the rest") + ... inputs = JobInputs("hello /path/to/input", [tf.name]) + ... 
return inputs + >>> inputs = setup_inputs(tf) + >>> inputs.rewrite_paths("/path/to/input", 'C:\\input') + >>> inputs.rewritten_command_line + 'hello C:\\\\input' + >>> inputs.rewritten_config_files[tf.name] + 'world C:\\\\input the rest' + >>> tf.close() + >>> tf = tempfile.NamedTemporaryFile() + >>> inputs = setup_inputs(tf) + >>> inputs.find_referenced_subfiles('/path/to') + ['/path/to/input'] + >>> inputs.path_referenced('/path/to') + True + >>> inputs.path_referenced('/path/to/input') + True + >>> inputs.path_referenced('/path/to/notinput') + False + >>> tf.close() + """ + + def __init__(self, command_line, config_files): + self.rewritten_command_line = command_line + self.rewritten_config_files = {} + for config_file in config_files or []: + config_contents = _read(config_file) + self.rewritten_config_files[config_file] = config_contents + + def find_referenced_subfiles(self, directory): + """ + Return list of files below specified `directory` in job inputs. Could + use more sophisticated logic (match quotes to handle spaces, handle + subdirectories, etc...). + + **Parameters** + + directory : str + Full path to directory to search. + + """ + pattern = r"(%s%s\S+)" % (directory, os.sep) + referenced_files = set() + for input_contents in self.__items(): + referenced_files.update(re.findall(pattern, input_contents)) + return list(referenced_files) + + def path_referenced(self, path): + pattern = r"%s" % path + found = False + for input_contents in self.__items(): + if re.findall(pattern, input_contents): + found = True + break + return found + + def rewrite_paths(self, local_path, remote_path): + """ + Rewrite references to `local_path` with `remote_path` in job inputs. + """ + self.__rewrite_command_line(local_path, remote_path) + self.__rewrite_config_files(local_path, remote_path) + + def __rewrite_command_line(self, local_path, remote_path): + self.rewritten_command_line = self.rewritten_command_line.replace(local_path, remote_path) + + def __rewrite_config_files(self, local_path, remote_path): + for config_file, rewritten_contents in self.rewritten_config_files.iteritems(): + self.rewritten_config_files[config_file] = rewritten_contents.replace(local_path, remote_path) + + def __items(self): + items = [self.rewritten_command_line] + items.extend(self.rewritten_config_files.values()) + return items + + +class FileStager(object): + """ + Objects of the FileStager class interact with an LWR client object to + stage the files required to run jobs on a remote LWR server. + + **Parameters** + + client : Client + LWR client object. + command_line : str + The local command line to execute, this will be rewritten for the remote server. + config_files : list + List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server. + input_files : list + List of input files used by job. These will be transferred and references rewritten. + output_files : list + List of output_files produced by job. + tool_dir : str + Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). + working_directory : str + Local path created by Galaxy for running this job. 
+ + """ + + def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory): + """ + """ + self.client = client + self.command_line = command_line + self.config_files = config_files + self.input_files = input_files + self.output_files = output_files + self.tool_dir = os.path.abspath(tool_dir) + self.working_directory = working_directory + + # Setup job inputs, these will need to be rewritten before + # shipping off to remote LWR server. + self.job_inputs = JobInputs(self.command_line, self.config_files) + + self.file_renames = {} + + self.__handle_setup() + self.__initialize_referenced_tool_files() + self.__upload_tool_files() + self.__upload_input_files() + self.__upload_working_directory_files() + self.__initialize_output_file_renames() + self.__initialize_task_output_file_renames() + self.__initialize_config_file_renames() + self.__handle_rewrites() + self.__upload_rewritten_config_files() + + def __handle_setup(self): + job_config = self.client.setup() + + self.new_working_directory = job_config['working_directory'] + self.new_outputs_directory = job_config['outputs_directory'] + self.remote_path_separator = job_config['path_separator'] + # If remote LWR server assigned job id, use that otherwise + # just use local job_id assigned. + galaxy_job_id = self.client.job_id + self.job_id = job_config.get('job_id', galaxy_job_id) + if self.job_id != galaxy_job_id: + # Remote LWR server assigned an id different than the + # Galaxy job id, update client to reflect this. + self.client.job_id = self.job_id + + def __initialize_referenced_tool_files(self): + self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir) + + def __upload_tool_files(self): + for referenced_tool_file in self.referenced_tool_files: + tool_upload_response = self.client.upload_tool_file(referenced_tool_file) + self.file_renames[referenced_tool_file] = tool_upload_response['path'] + + def __upload_input_files(self): + for input_file in self.input_files: + self.__upload_input_file(input_file) + self.__upload_input_extra_files(input_file) + + def __upload_input_file(self, input_file): + if self.job_inputs.path_referenced(input_file): + input_upload_response = self.client.upload_input(input_file) + self.file_renames[input_file] = input_upload_response['path'] + + def __upload_input_extra_files(self, input_file): + # TODO: Determine if this is object store safe and what needs to be + # done if it is not. + files_path = "%s_files" % input_file[0:-len(".dat")] + if os.path.exists(files_path) and self.job_inputs.path_referenced(files_path): + for extra_file in os.listdir(files_path): + extra_file_path = os.path.join(files_path, extra_file) + relative_path = os.path.basename(files_path) + extra_file_relative_path = os.path.join(relative_path, extra_file) + response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path) + self.file_renames[extra_file_path] = response['path'] + + def __upload_working_directory_files(self): + # Task manager stages files into working directory, these need to be + # uploaded if present. 
+ for working_directory_file in os.listdir(self.working_directory): + path = os.path.join(self.working_directory, working_directory_file) + working_file_response = self.client.upload_working_directory_file(path) + self.file_renames[path] = working_file_response['path'] + + def __initialize_output_file_renames(self): + for output_file in self.output_files: + self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory, + self.remote_path_separator, + os.path.basename(output_file)) + + def __initialize_task_output_file_renames(self): + for output_file in self.output_files: + name = os.path.basename(output_file) + self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory, + self.remote_path_separator, + name) + + def __initialize_config_file_renames(self): + for config_file in self.config_files: + self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory, + self.remote_path_separator, + os.path.basename(config_file)) + + def __rewrite_paths(self, contents): + new_contents = contents + for local_path, remote_path in self.file_renames.iteritems(): + new_contents = new_contents.replace(local_path, remote_path) + return new_contents + + def __handle_rewrites(self): + for local_path, remote_path in self.file_renames.iteritems(): + self.job_inputs.rewrite_paths(local_path, remote_path) + + def __upload_rewritten_config_files(self): + for config_file, new_config_contents in self.job_inputs.rewritten_config_files.iteritems(): + self.client.upload_config_file(config_file, new_config_contents) + + def get_rewritten_command_line(self): + """ + Returns the rewritten version of the command line to execute suitable + for remote host. + """ + return self.job_inputs.rewritten_command_line + + +class Client(object): + """ + Objects of this client class perform low-level communication with a remote LWR server. + + **Parameters** + + remote_host : str + Remote URL of the LWR server. + job_id : str + Galaxy job/task id. + private_key : str (optional) + Secret key the remote LWR server is configured with. + """ + + def __init__(self, remote_host, job_id, private_key=None): + if not remote_host.endswith("/"): + remote_host = remote_host + "/" + ## If we don't have an explicit private_key defined, check for + ## one embedded in the URL. A URL of the form + ## https://moo@cow:8913 will try to contact https://cow:8913 + ## with a private key of moo + private_key_format = "https?://(.*)@.*/?" + private_key_match = re.match(private_key_format, remote_host) + if not private_key and private_key_match: + private_key = private_key_match.group(1) + remote_host = remote_host.replace("%s@" % private_key, '', 1) + self.remote_host = remote_host + self.job_id = job_id + self.private_key = private_key + + def _url_open(self, request, data): + return urllib2.urlopen(request, data) + + def __build_url(self, command, args): + if self.private_key: + args["private_key"] = self.private_key + data = urllib.urlencode(args) + url = self.remote_host + command + "?" 
+ data + return url + + def __raw_execute(self, command, args={}, data=None): + url = self.__build_url(command, args) + request = urllib2.Request(url=url, data=data) + response = self._url_open(request, data) + return response + + def __raw_execute_and_parse(self, command, args={}, data=None): + response = self.__raw_execute(command, args, data) + return simplejson.loads(response.read()) + + def __upload_file(self, action, path, name=None, contents=None): + input = open(path, 'rb') + try: + mmapped_input = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ) + return self.__upload_contents(action, path, mmapped_input, name) + finally: + input.close() + + def __upload_contents(self, action, path, contents, name=None): + if not name: + name = os.path.basename(path) + args = {"job_id": self.job_id, "name": name} + return self.__raw_execute_and_parse(action, args, contents) + + def upload_tool_file(self, path): + """ + Upload a tool related file (e.g. wrapper) required to run job. + + **Parameters** + + path : str + Local path tool. + """ + return self.__upload_file("upload_tool_file", path) + + def upload_input(self, path): + """ + Upload input dataset to remote server. + + **Parameters** + + path : str + Local path of input dataset. + """ + return self.__upload_file("upload_input", path) + + def upload_extra_input(self, path, relative_name): + """ + Upload extra input file to remote server. + + **Parameters** + + path : str + Extra files path of input dataset corresponding to this input. + relative_name : str + Relative path of extra file to upload relative to inputs extra files path. + """ + return self.__upload_file("upload_extra_input", path, name=relative_name) + + def upload_config_file(self, path, contents): + """ + Upload a job's config file to the remote server. + + **Parameters** + + path : str + Local path to the original config file. + contents : str + Rewritten contents of the config file to upload. + """ + return self.__upload_contents("upload_config_file", path, contents) + + def upload_working_directory_file(self, path): + """ + Upload the supplied file (path) from a job's working directory + to remote server. + + **Parameters** + + path : str + Path to file to upload. + """ + return self.__upload_file("upload_working_directory_file", path) + + def _get_output_type(self, name): + return self.__raw_execute_and_parse("get_output_type", {"name": name, + "job_id": self.job_id}) + + def download_work_dir_output(self, source, working_directory, output_path): + """ + Download an output dataset specified with from_work_dir from the + remote server. + + **Parameters** + + source : str + Path in job's working_directory to find output in. + working_directory : str + Local working_directory for the job. + output_path : str + Full path to output dataset. + """ + output = open(output_path, "wb") + name = os.path.basename(source) + self.__raw_download_output(name, self.job_id, "work_dir", output) + + def download_output(self, path, working_directory): + """ + Download an output dataset from the remote server. + + **Parameters** + + path : str + Local path of the dataset. + working_directory : str + Local working_directory for the job. 
+ """ + name = os.path.basename(path) + output_type = self._get_output_type(name) + if output_type == "direct": + output = open(path, "wb") + elif output_type == "task": + output = open(os.path.join(working_directory, name), "wb") + else: + raise Exception("No remote output found for dataset with path %s" % path) + self.__raw_download_output(name, self.job_id, output_type, output) + + def __raw_download_output(self, name, job_id, output_type, output_file): + response = self.__raw_execute("download_output", {"name": name, + "job_id": self.job_id, + "output_type": output_type}) + try: + while True: + buffer = response.read(1024) + if buffer == "": + break + output_file.write(buffer) + finally: + output_file.close() + + def launch(self, command_line): + """ + Run or queue up the execution of the supplied + `command_line` on the remote server. + + **Parameters** + + command_line : str + Command to execute. + """ + return self.__raw_execute("launch", {"command_line": command_line, + "job_id": self.job_id}) + + def kill(self): + """ + Cancel remote job, either removing from the queue or killing it. + """ + return self.__raw_execute("kill", {"job_id": self.job_id}) + + def wait(self): + """ + Wait for job to finish. + """ + while True: + complete_response = self.raw_check_complete() + if complete_response["complete"] == "true": + return complete_response + time.sleep(1) + + def raw_check_complete(self): + """ + Get check_complete response from the remote server. + """ + check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id": self.job_id}) + return check_complete_response + + def check_complete(self): + """ + Return boolean indicating whether the job is complete. + """ + return self.raw_check_complete()["complete"] == "true" + + def clean(self): + """ + Cleanup the remote job. + """ + self.__raw_execute("clean", {"job_id": self.job_id}) + + def setup(self): + """ + Setup remote LWR server to run this job. + """ + return self.__raw_execute_and_parse("setup", {"job_id": self.job_id}) + + +def _read(path): + """ + Utility method to quickly read small files (config files and tool + wrappers) into memory as strings. + """ + input = open(path, "r") + try: + return input.read() + finally: + input.close() Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.