1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/e241ff584048/ changeset: e241ff584048 user: natefoo date: 2012-07-02 19:05:08 summary: Create CLI job runner for submitting jobs via a (remote) shell. affected #: 13 files diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -89,9 +89,11 @@ self.__user_system_pwent = None self.__galaxy_system_pwent = None - def get_job_runner( self ): - job_runner = self.job_runner_mapper.get_job_runner( self.params ) - return job_runner + def get_job_runner_url( self ): + return self.job_runner_mapper.get_job_runner_url( self.params ) + + # legacy naming + get_job_runner = get_job_runner_url def get_job( self ): return self.sa_session.query( model.Job ).get( self.job_id ) diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -55,7 +55,6 @@ # Contains new jobs. Note this is not used if track_jobs_in_database is True self.queue = Queue() # Contains jobs that are waiting (only use from monitor thread) - ## This and jobs_to_check[] are closest to a "Job Queue" self.waiting_jobs = [] # Helper for interruptable sleep self.sleeper = Sleeper() @@ -392,13 +391,12 @@ if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper): runner_name = "tasks" else: - runner_name = ( job_wrapper.get_job_runner().split(":", 1) )[0] + runner_name = ( job_wrapper.get_job_runner_url().split(":", 1) )[0] return runner_name def put( self, job_wrapper ): try: runner_name = self.__get_runner_name( job_wrapper ) - log.debug( "Runner_name: " + runner_name ) if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and isinstance(job_wrapper, TaskWrapper): #DBTODO Refactor log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) ) diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -11,7 +11,7 @@ class JobRunnerMapper( object ): """ This class is responsible to managing the mapping of jobs - (in the form of job_wrappers) to job runner strings. + (in the form of job_wrappers) to job runner url strings. 
""" def __init__( self, job_wrapper ): @@ -103,7 +103,7 @@ return rule_module return None - def __expand_dynamic_job_runner( self, options_str ): + def __expand_dynamic_job_runner_url( self, options_str ): option_parts = options_str.split( '/' ) expand_type = option_parts[ 0 ] if expand_type == "python": @@ -113,18 +113,18 @@ else: raise Exception( "Unhandled dynamic job runner type specified - %s" % calculation_type ) - def __cache_job_runner( self, params ): - raw_job_runner = self.job_wrapper.tool.get_job_runner( params ) - if raw_job_runner.startswith( DYNAMIC_RUNNER_PREFIX ): - job_runner = self.__expand_dynamic_job_runner( raw_job_runner[ len( DYNAMIC_RUNNER_PREFIX ) : ] ) + def __cache_job_runner_url( self, params ): + raw_job_runner_url = self.job_wrapper.tool.get_job_runner_url( params ) + if raw_job_runner_url.startswith( DYNAMIC_RUNNER_PREFIX ): + job_runner_url = self.__expand_dynamic_job_runner_url( raw_job_runner_url[ len( DYNAMIC_RUNNER_PREFIX ) : ] ) else: - job_runner = raw_job_runner - self.cached_job_runner = job_runner + job_runner_url = raw_job_runner_url + self.cached_job_runner_url = job_runner_url - def get_job_runner( self, params ): + def get_job_runner_url( self, params ): """ - Cache the job_runner string to avoid recalculation. + Cache the job_runner_url string to avoid recalculation. """ - if not hasattr( self, 'cached_job_runner' ): - self.__cache_job_runner( params ) - return self.cached_job_runner + if not hasattr( self, 'cached_job_runner_url' ): + self.__cache_job_runner_url( params ) + return self.cached_job_runner_url diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/runners/cli.py --- /dev/null +++ b/lib/galaxy/jobs/runners/cli.py @@ -0,0 +1,380 @@ +""" +Job control via a command line interface (e.g. qsub/qstat), possibly over a remote connection (e.g. ssh). +""" + +import os +import time +import glob +import logging +import threading +import subprocess + +from Queue import Queue, Empty + +from galaxy import model +from galaxy.jobs.runners import BaseJobRunner + +log = logging.getLogger( __name__ ) + +__all__ = [ 'ShellJobRunner' ] + +class RunnerJobState( object ): + def __init__( self ): + """ + Encapsulates state related to a job that is being run and that we need to monitor. + """ + self.job_wrapper = None + self.external_job_id = None + self.old_state = None + self.running = False + self.job_file = None + self.ofile = None + self.efile = None + self.runner_url = None + +class ShellJobRunner( BaseJobRunner ): + """ + Job runner backed by a finite pool of worker threads. FIFO scheduling + """ + STOP_SIGNAL = object() + def __init__( self, app ): + """Initialize this job runner and start the monitor thread""" + # Check if drmaa was importable, fail if not + self.app = app + self.sa_session = app.model.context + self.remote_home_directory = None + # 'watched' and 'queue' are both used to keep track of jobs to watch. + # 'queue' is used to add new watched jobs, and can be called from + # any thread (usually by the 'queue_job' method). 'watched' must only + # be modified by the monitor thread, which will move items from 'queue' + # to 'watched' and then manage the watched jobs. 
+ self.watched = [] + self.monitor_queue = Queue() + self.monitor_thread = threading.Thread( target=self.monitor ) + self.monitor_thread.start() + self.work_queue = Queue() + self.work_threads = [] + nworkers = app.config.cluster_job_queue_workers + + self.cli_shells = None + self.cli_job_interfaces = None + self.__load_cli_plugins() + + for i in range( nworkers ): + worker = threading.Thread( target=self.run_next ) + worker.start() + self.work_threads.append( worker ) + log.debug( "%d workers ready" % nworkers ) + + def __load_cli_plugins(self): + def __load(module_path, d): + for file in glob.glob(os.path.join(os.path.join(os.getcwd(), 'lib', *module_path.split('.')), '*.py')): + if os.path.basename(file).startswith('_'): + continue + module_name = '%s.%s' % (module_path, os.path.basename(file).rsplit('.py', 1)[0]) + module = __import__(module_name) + for comp in module_name.split( "." )[1:]: + module = getattr(module, comp) + for name in module.__all__: + log.debug('Loaded cli plugin %s' % name) + d[name] = getattr(module, name) + + self.cli_shells = {} + self.cli_job_interfaces = {} + __load('galaxy.jobs.runners.cli_shell', self.cli_shells) + __load('galaxy.jobs.runners.cli_job', self.cli_job_interfaces) + + def get_cli_plugins(self, runner_url): + shell_params, job_params = runner_url.split('/')[2:4] + # split 'foo=bar&baz=quux' into { 'foo' : 'bar', 'baz' : 'quux' } + shell_params = dict ( [ ( k, v ) for k, v in [ kv.split('=', 1) for kv in shell_params.split('&') ] ] ) + job_params = dict ( [ ( k, v ) for k, v in [ kv.split('=', 1) for kv in job_params.split('&') ] ] ) + # load shell plugin + shell = self.cli_shells[shell_params['plugin']](**shell_params) + job_interface = self.cli_job_interfaces[job_params['plugin']](**job_params) + return shell, job_interface + + def run_next( self ): + """ + Run the next item in the queue (a job waiting to run or finish ) + """ + while 1: + ( op, obj ) = self.work_queue.get() + if op is self.STOP_SIGNAL: + return + try: + if op == 'queue': + self.queue_job( obj ) + elif op == 'finish': + self.finish_job( obj ) + elif op == 'fail': + self.fail_job( obj ) + except: + log.exception( "Uncaught exception %sing job" % op ) + + def queue_job( self, job_wrapper ): + """Create job script and submit it to the DRM""" + try: + job_wrapper.prepare() + command_line = self.build_command_line( job_wrapper, include_metadata=True ) + except: + job_wrapper.fail( "failure preparing job", exception=True ) + log.exception("failure running job %s" % job_wrapper.get_id_tag()) + return + + # This is silly, why would we queue a job with no command line? 
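For reference, get_cli_plugins() above splits the runner URL into a shell-parameter segment and a job-parameter segment, each an &-separated list of key=value pairs that must include a plugin key naming a class to load. A minimal sketch of that parsing follows; the hostname, username and resource list are purely hypothetical, not values from the commit:

    # Hypothetical CLI runner URL: SecureShell transport + Torque job interface.
    runner_url = 'cli://plugin=SecureShell&hostname=pbs.example.org&username=galaxy/plugin=Torque&Resource_List=walltime=24:00:00'

    shell_params, job_params = runner_url.split('/')[2:4]
    shell_params = dict(kv.split('=', 1) for kv in shell_params.split('&'))
    job_params = dict(kv.split('=', 1) for kv in job_params.split('&'))

    # shell_params -> {'plugin': 'SecureShell', 'hostname': 'pbs.example.org', 'username': 'galaxy'}
    # job_params   -> {'plugin': 'Torque', 'Resource_List': 'walltime=24:00:00'}

The plugin values are looked up in the dictionaries populated by __load_cli_plugins(), so each must match a class name exported via __all__ by a module under cli_shell/ or cli_job/ (e.g. SecureShell from cli_shell/rsh.py and Torque from cli_job/torque.py, both added in this changeset).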
+ if not command_line: + job_wrapper.finish( '', '' ) + return + + # Check for deletion before we change state + if job_wrapper.get_state() == model.Job.states.DELETED: + log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.get_id_tag() ) + if self.app.config.cleanup_job in ( "always", "onsuccess" ): + job_wrapper.cleanup() + return + + # Get shell and job execution interface + runner_url = job_wrapper.get_job_runner_url() + shell, job_interface = self.get_cli_plugins(runner_url) + + # Change to queued state immediately + job_wrapper.change_state( model.Job.states.QUEUED ) + + # define job attributes + ofile = "%s.gjout" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag()) + efile = "%s.gjerr" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag()) + ecfile = "%s.gjec" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag()) + job_name = "g%s_%s_%s" % ( job_wrapper.job_id, job_wrapper.tool.id, job_wrapper.user ) + + # fill in the DRM's job run template + script = job_interface.get_job_template(ofile, efile, job_name, job_wrapper, command_line, ecfile) + script_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.get_id_tag()) + + try: + fh = file(script_file, "w") + fh.write(script) + fh.close() + except: + job_wrapper.fail("failure preparing job script", exception=True) + log.exception("failure running job %s" % job_wrapper.get_id_tag()) + return + + # job was deleted while we were preparing it + if job_wrapper.get_state() == model.Job.states.DELETED: + log.info("Job %s deleted by user before it entered the queue" % job_wrapper.get_id_tag()) + if self.app.config.cleanup_job in ("always", "onsuccess"): + job_wrapper.cleanup() + return + + # wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers. 
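In the submission step that follows, the two plugin layers stay separate: the job interface only formats a command string, and the shell plugin runs it (locally or over ssh) and returns a Bunch carrying stdout, stderr and returncode. A rough sketch of that hand-off, with hypothetical paths and ids:

    # job_interface.submit() just builds the submit command...
    cmd = job_interface.submit('/galaxy/cluster_files/galaxy_42.sh')  # e.g. 'qsub /galaxy/cluster_files/galaxy_42.sh'
    # ...and shell.execute() runs it, e.g. as 'ssh -l galaxy pbs.example.org qsub ...'
    cmd_out = shell.execute(cmd)
    if cmd_out.returncode == 0:
        external_job_id = cmd_out.stdout.strip()  # e.g. '12345.pbs.example.org'

Because the external id is simply whatever the scheduler prints on stdout, a zero return code with empty stdout is still treated as a failed submission below.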
+ galaxy_id_tag = job_wrapper.get_id_tag() + + log.debug("(%s) submitting file: %s" % (galaxy_id_tag, script_file )) + log.debug("(%s) command is: %s" % (galaxy_id_tag, command_line ) ) + + cmd_out = shell.execute(job_interface.submit(script_file)) + if cmd_out.returncode != 0: + log.error('(%s) submission failed (stdout): %s' % (galaxy_id_tag, cmd_out.stdout)) + log.error('(%s) submission failed (stderr): %s' % (galaxy_id_tag, cmd_out.stderr)) + job_wrapper.fail("failure submitting job") + return + external_job_id = cmd_out.stdout.strip() + if not external_job_id: + log.error('(%s) submission did not return a job identifier, failing job' % galaxy_id_tag) + job_wrapper.fail("failure submitting job") + return + + log.info("(%s) queued with identifier: %s" % ( galaxy_id_tag, external_job_id ) ) + + # store runner information for tracking if Galaxy restarts + job_wrapper.set_runner( runner_url, external_job_id ) + + # Store state information for job + runner_job_state = RunnerJobState() + runner_job_state.job_wrapper = job_wrapper + runner_job_state.external_job_id = external_job_id + runner_job_state.ofile = ofile + runner_job_state.efile = efile + runner_job_state.ecfile = ecfile + runner_job_state.job_file = script_file + runner_job_state.old_state = 'new' + runner_job_state.running = False + runner_job_state.runner_url = runner_url + + # Add to our 'queue' of jobs to monitor + self.monitor_queue.put( runner_job_state ) + + def monitor( self ): + """ + Watches jobs currently in the PBS queue and deals with state changes + (queued to running) and job completion + """ + while 1: + # Take any new watched jobs and put them on the monitor list + try: + while 1: + runner_job_state = self.monitor_queue.get_nowait() + if runner_job_state is self.STOP_SIGNAL: + # TODO: This is where any cleanup would occur + return + self.watched.append( runner_job_state ) + except Empty: + pass + # Iterate over the list of watched jobs and check state + try: + self.check_watched_items() + except: + log.exception('Uncaught exception checking job state:') + # Sleep a bit before the next state check + time.sleep( 15 ) + + def check_watched_items( self ): + """ + Called by the monitor thread to look at each watched job and deal + with state changes. 
+ """ + new_watched = [] + + job_states = self.__get_job_states() + + for runner_job_state in self.watched: + external_job_id = runner_job_state.external_job_id + galaxy_job_id = runner_job_state.job_wrapper.job_id + old_state = runner_job_state.old_state + state = job_states.get(external_job_id, None) + if state is None: + log.debug("(%s/%s) job not found in batch state check" % ( galaxy_job_id, external_job_id ) ) + shell, job_interface = self.get_cli_plugins(runner_job_state.runner_url) + cmd_out = shell.execute(job_interface.get_single_status(external_job_id)) + state = job_interface.parse_single_status(cmd_out.stdout, external_job_id) + if state == model.Job.states.OK: + log.debug('(%s/%s) job execution finished, running job wrapper finish method' % ( galaxy_job_id, external_job_id ) ) + self.work_queue.put( ( 'finish', runner_job_state ) ) + continue + else: + log.warning('(%s/%s) job not found in batch state check, but found in individual state check' % ( galaxy_job_id, external_job_id ) ) + if state != old_state: + runner_job_state.job_wrapper.change_state( state ) + else: + if state != old_state: + log.debug("(%s/%s) state change: %s" % ( galaxy_job_id, external_job_id, state ) ) + runner_job_state.job_wrapper.change_state( state ) + if state == model.Job.states.RUNNING and not runner_job_state.running: + runner_job_state.running = True + runner_job_state.job_wrapper.change_state( model.Job.states.RUNNING ) + runner_job_state.old_state = state + new_watched.append( runner_job_state ) + # Replace the watch list with the updated version + self.watched = new_watched + + def __get_job_states(self): + runner_urls = {} + job_states = {} + for runner_job_state in self.watched: + # remove any job plugin options from the runner URL since they should not affect doing a batch state check + runner_url = runner_job_state.runner_url.split('/') + job_params = runner_url[3] + job_params = dict ( [ ( k, v ) for k, v in [ kv.split('=', 1) for kv in job_params.split('&') ] ] ) + runner_url[3] = 'plugin=%s' % job_params['plugin'] + runner_url = '/'.join(runner_url) + # create the list of job ids to check for each runner url + if runner_job_state.runner_url not in runner_urls: + runner_urls[runner_job_state.runner_url] = [runner_job_state.external_job_id] + else: + runner_urls[runner_job_state.runner_url].append(runner_job_state.external_job_id) + # check each runner url for the listed job ids + for runner_url, job_ids in runner_urls.items(): + shell, job_interface = self.get_cli_plugins(runner_url) + cmd_out = shell.execute(job_interface.get_status(job_ids)) + assert cmd_out.returncode == 0, cmd_out.stderr + job_states.update(job_interface.parse_status(cmd_out.stdout, job_ids)) + return job_states + + def finish_job( self, runner_job_state ): + """ + Get the output/error for a finished job, pass to `job_wrapper.finish` + and cleanup all the DRM temporary files. 
+ """ + ofile = runner_job_state.ofile + efile = runner_job_state.efile + ecfile = runner_job_state.ecfile + job_file = runner_job_state.job_file + # collect the output + # wait for the files to appear + which_try = 0 + while which_try < (self.app.config.retry_job_output_collection + 1): + try: + ofh = file(ofile, "r") + efh = file(efile, "r") + ecfh = file(ecfile, "r") + stdout = ofh.read( 32768 ) + stderr = efh.read( 32768 ) + exit_code = ecfh.read(32) + which_try = (self.app.config.retry_job_output_collection + 1) + except: + if which_try == self.app.config.retry_job_output_collection: + stdout = '' + stderr = 'Job output not returned from cluster' + log.debug( stderr ) + else: + time.sleep(1) + which_try += 1 + + try: + runner_job_state.job_wrapper.finish( stdout, stderr, exit_code ) + except: + log.exception("Job wrapper finish method failed") + + def fail_job( self, job_state ): + """ + Seperated out so we can use the worker threads for it. + """ + self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) ) + job_state.job_wrapper.fail( job_state.fail_message ) + + def put( self, job_wrapper ): + """Add a job to the queue (by job identifier)""" + # Change to queued state before handing to worker thread so the runner won't pick it up again + job_wrapper.change_state( model.Job.states.QUEUED ) + self.work_queue.put( ( 'queue', job_wrapper ) ) + + def shutdown( self ): + """Attempts to gracefully shut down the monitor thread""" + log.info( "sending stop signal to worker threads" ) + self.monitor_queue.put( self.STOP_SIGNAL ) + for i in range( len( self.work_threads ) ): + self.work_queue.put( ( self.STOP_SIGNAL, None ) ) + log.info( "drmaa job runner stopped" ) + + def stop_job( self, job ): + """Attempts to delete a dispatched job""" + try: + shell, job_interface = self.get_cli_plugins( job.job_runner ) + cmd_out = shell.execute(job_interface.delete( job.job_runner_external_id )) + assert cmd_out.returncode == 0, cmd_out.stderr + log.debug( "(%s/%s) Terminated at user's request" % ( job.id, job.job_runner_external_id ) ) + except Exception, e: + log.debug( "(%s/%s) User killed running job, but error encountered during termination: %s" % ( job.id, job.job_runner_external_id, e ) ) + + def recover( self, job, job_wrapper ): + """Recovers jobs stuck in the queued/running state when Galaxy started""" + runner_job_state = RunnerJobState() + runner_job_state.ofile = "%s.gjout" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag()) + runner_job_state.efile = "%s.gjerr" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag()) + runner_job_state.ecfile = "%s.gjec" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag()) + runner_job_state.job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.get_id_tag()) + runner_job_state.external_job_id = str( job.job_runner_external_id ) + job_wrapper.command_line = job.command_line + runner_job_state.job_wrapper = job_wrapper + runner_job_state.runner_url = job.job_runner_name + if job.state == model.Job.states.RUNNING: + log.debug( "(%s/%s) is still in running state, adding to the runner monitor queue" % ( job.id, job.job_runner_external_id ) ) + runner_job_state.old_state = model.Job.states.RUNNING + runner_job_state.running = True + self.monitor_queue.put( runner_job_state ) + elif job.state == model.Job.states.QUEUED: + log.debug( "(%s/%s) is still in queued state, adding to the runner monitor queue" % ( job.id, job.job_runner_external_id 
) ) + runner_job_state.old_state = model.Job.states.QUEUED + runner_job_state.running = False + self.monitor_queue.put( runner_job_state ) diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/runners/cli_job/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/cli_job/__init__.py @@ -0,0 +1,21 @@ +""" +Base class for cli job plugins +""" + +class BaseJobExec(object): + def __init__(self, **params): + raise NotImplementedError() + def get_job_template(self, ofile, efile, job_name, job_wrapper, command_line, ecfile): + raise NotImplementedError() + def submit(self, script_file): + raise NotImplementedError() + def delete(self, job_id): + raise NotImplementedError() + def get_status(self, job_ids=None): + raise NotImplementedError() + def get_single_status(self, job_id): + raise NotImplementedError() + def parse_status(self, status, job_ids): + raise NotImplementedError() + def parse_single_status(self, status, job_id): + raise NotImplementedError() diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/runners/cli_job/torque.py --- /dev/null +++ b/lib/galaxy/jobs/runners/cli_job/torque.py @@ -0,0 +1,132 @@ +""" +Command-line interface to TORQUE PBS +""" + +import os +import logging + +from galaxy.model import Job +job_states = Job.states + +from galaxy.jobs.runners.cli_job import BaseJobExec + +log = logging.getLogger( __name__ ) + +__all__ = ('Torque',) + +try: + import xml.etree.cElementTree as et +except: + import xml.etree.ElementTree as et + +job_template = """#!/bin/sh +%s +GALAXY_LIB="%s" +if [ "$GALAXY_LIB" != "None" ]; then + if [ -n "$PYTHONPATH" ]; then + export PYTHONPATH="$GALAXY_LIB:$PYTHONPATH" + else + export PYTHONPATH="$GALAXY_LIB" + fi +fi +%s +cd %s +%s +echo $? 
> %s +""" + +argmap = { 'Execution_Time' : '-a', + 'Account_Name' : '-A', + 'Checkpoint' : '-c', + 'Error_Path' : '-e', + 'Group_List' : '-g', + 'Hold_Types' : '-h', + 'Join_Paths' : '-j', + 'Keep_Files' : '-k', + 'Resource_List' : '-l', + 'Mail_Points' : '-m', + 'Mail_Users' : '-M', + 'Job_Name' : '-N', + 'Output_Path' : '-o', + 'Priority' : '-p', + 'Rerunable' : '-r', + 'Shell_Path_List' : '-S', + 'job_array_request' : '-t', + 'User_List' : '-u', + 'Variable_List' : '-v' } + +class Torque(BaseJobExec): + def __init__(self, **params): + self.params = {} + for k, v in params.items(): + self.params[k] = v + + def get_job_template(self, ofile, efile, job_name, job_wrapper, command_line, ecfile): + pbsargs = { '-o' : ofile, + '-e' : efile, + '-N' : job_name } + for k, v in self.params.items(): + if k == 'plugin': + continue + try: + if not k.startswith('-'): + k = argmap[k] + pbsargs[k] = v + except: + log.warning('Unrecognized long argument passed to Torque CLI plugin: %s' % k) + template_pbsargs = '' + for k, v in pbsargs.items(): + template_pbsargs += '#PBS %s %s\n' % (k, v) + return job_template % (template_pbsargs, + job_wrapper.galaxy_lib_dir, + job_wrapper.get_env_setup_clause(), + os.path.abspath(job_wrapper.working_directory), + command_line, + ecfile) + + def submit(self, script_file): + return 'qsub %s' % script_file + + def delete(self, job_id): + return 'qdel %s' % job_id + + def get_status(self, job_ids=None): + return 'qstat -x' + + def get_single_status(self, job_id): + return 'qstat -f %s' % job_id + + def parse_status(self, status, job_ids): + # in case there's noise in the output, find the big blob 'o xml + tree = None + rval = {} + for line in status.strip().splitlines(): + try: + tree = et.fromstring(line.strip()) + assert tree.tag == 'Data' + break + except Exception, e: + tree = None + if tree is None: + log.warning('No valid qstat XML return from `qstat -x`, got the following: %s' % status) + return None + else: + for job in tree.findall('Job'): + id = job.find('Job_Id').text + if id in job_ids: + state = job.find('job_state').text + # map PBS job states to Galaxy job states. 
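For reference, parse_status() assumes the qstat -x XML arrives on a single line of output (it tries each line until one parses as a <Data> element); only the Job_Id and job_state children of each <Job> are read, and __get_job_state() maps the one-letter TORQUE codes to Galaxy states, with codes other than R and Q passing through unchanged. A minimal, hypothetical illustration:

    # Trimmed, hypothetical qstat -x output (real output carries many more attributes):
    status = '<Data><Job><Job_Id>12345.pbs.example.org</Job_Id><job_state>R</job_state></Job></Data>'
    Torque(plugin='Torque').parse_status(status, ['12345.pbs.example.org'])
    # -> {'12345.pbs.example.org': job_states.RUNNING}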
+ rval[id] = self.__get_job_state(state) + return rval + + def parse_single_status(self, status, job_id): + for line in status.splitlines(): + line = line.split(' = ') + if line[0] == 'job_state': + return line[1] + # no state found, job has exited + return job_states.OK + + def __get_job_state(self, state): + return { 'R' : job_states.RUNNING, + 'Q' : job_states.QUEUED }.get(state, state) diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/runners/cli_shell/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/cli_shell/__init__.py @@ -0,0 +1,11 @@ +""" +Base class for runners which execute commands via a shell +""" + +class BaseShellExec(object): + def __init__(self, *args, **kwargs): + raise NotImplementedError() + def copy(self, rcp_cmd, files, dest): + raise NotImplementedError() + def execute(self, cmd, persist=False, timeout=60): + raise NotImplementedError() diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/runners/cli_shell/rsh.py --- /dev/null +++ b/lib/galaxy/jobs/runners/cli_shell/rsh.py @@ -0,0 +1,59 @@ +""" +Interface for remote shell commands (rsh, rcp) and derivatives that use the same syntax (ssh, scp) +""" + +import time +import logging +import subprocess + +from galaxy.util.bunch import Bunch +from galaxy.jobs.runners.cli_shell import BaseShellExec + +log = logging.getLogger( __name__ ) + +__all__ = ('RemoteShell', 'SecureShell', 'GlobusSecureShell') + +class RemoteShell(BaseShellExec): + def __init__(self, rsh='rsh', rcp='rcp', hostname=None, username=None, **kwargs): + self.rsh = rsh + self.rcp = rcp + self.hostname = hostname + self.username = username + self.sessions = {} + def copy(self, rcp_cmd, files, dest): + pass + def execute(self, cmd, persist=False, timeout=60): + # TODO: implement persistence + if self.username is None: + fullcmd = '%s %s %s' % (self.rsh, self.hostname, cmd) + else: + fullcmd = '%s -l %s %s %s' % (self.rsh, self.username, self.hostname, cmd) + p = subprocess.Popen(fullcmd, shell=True, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + # poll until timeout + for i in range(timeout/3): + r = p.poll() + if r is not None: + break + time.sleep(3) + else: + pid = int(p.pid) + for sig in (15, 9): + try: + os.kill(pid, sig) + time.sleep(3) + except: + log.warning('Killing pid %s (cmd: "%s") with signal %s failed' % (p.pid, fullcmd, sig)) + return Bunch(stdout='', stderr='Execution timed out', returncode=-1) + return Bunch(stdout=p.stdout.read(), stderr=p.stderr.read(), returncode=p.returncode) + + +class SecureShell(RemoteShell): + SSH_NEW_KEY_STRING = 'Are you sure you want to continue connecting' + def __init__(self, rsh='ssh', rcp='scp', **kwargs): + rsh += ' -oStrictHostKeyChecking=yes -oConnectTimeout=60' + rcp += ' -oStrictHostKeyChecking=yes -oConnectTimeout=60' + super(SecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs) + +class GlobusSecureShell(SecureShell): + def __init__(self, rsh='gsissh', rcp='gsiscp', **kwargs): + super(SecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs) diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/runners/drmaa.py --- a/lib/galaxy/jobs/runners/drmaa.py +++ b/lib/galaxy/jobs/runners/drmaa.py @@ -155,7 +155,7 @@ log.exception("failure running job %s" % job_wrapper.get_id_tag()) return - runner_url = job_wrapper.get_job_runner() + runner_url = job_wrapper.get_job_runner_url() # This is silly, why would we queue a job with 
no command line? if not command_line: @@ -404,7 +404,7 @@ drm_job_state.ecfile = "%s.drmec" % os.path.join(os.getcwd(), job_wrapper.working_directory, job_wrapper.get_id_tag()) drm_job_state.job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job.id) drm_job_state.job_id = str( job.job_runner_external_id ) - drm_job_state.runner_url = job_wrapper.get_job_runner() + drm_job_state.runner_url = job_wrapper.get_job_runner_url() job_wrapper.command_line = job.command_line drm_job_state.job_wrapper = job_wrapper if job.state == model.Job.states.RUNNING: diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -236,7 +236,7 @@ return lwr_url def get_client_from_wrapper(self, job_wrapper): - return self.get_client( job_wrapper.get_job_runner(), job_wrapper.job_id ) + return self.get_client( job_wrapper.get_job_runner_url(), job_wrapper.job_id ) def get_client(self, job_runner, job_id): lwr_url = self.determine_lwr_url( job_runner ) @@ -245,7 +245,7 @@ def run_job( self, job_wrapper ): stderr = stdout = command_line = '' - runner_url = job_wrapper.get_job_runner() + runner_url = job_wrapper.get_job_runner_url() try: job_wrapper.prepare() diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py +++ b/lib/galaxy/jobs/runners/pbs.py @@ -213,7 +213,7 @@ log.exception("failure running job %d" % job_wrapper.job_id) return - runner_url = job_wrapper.get_job_runner() + runner_url = job_wrapper.get_job_runner_url() # This is silly, why would we queue a job with no command line? if not command_line: @@ -623,7 +623,7 @@ pbs_job_state.ecfile = "%s/%s.ec" % (self.app.config.cluster_files_directory, job.id) pbs_job_state.job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job.id) pbs_job_state.job_id = str( job.job_runner_external_id ) - pbs_job_state.runner_url = job_wrapper.get_job_runner() + pbs_job_state.runner_url = job_wrapper.get_job_runner_url() job_wrapper.command_line = job.command_line pbs_job_state.job_wrapper = job_wrapper if job.state == model.Job.states.RUNNING: diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/jobs/runners/sge.py --- a/lib/galaxy/jobs/runners/sge.py +++ b/lib/galaxy/jobs/runners/sge.py @@ -164,7 +164,7 @@ log.exception("failure running job %d" % job_wrapper.job_id) return - runner_url = job_wrapper.get_job_runner() + runner_url = job_wrapper.get_job_runner_url() # This is silly, why would we queue a job with no command line? 
if not command_line: @@ -377,7 +377,7 @@ sge_job_state.efile = "%s/database/pbs/%s.e" % (os.getcwd(), job.id) sge_job_state.job_file = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job.id) sge_job_state.job_id = str( job.job_runner_external_id ) - sge_job_state.runner_url = job_wrapper.get_job_runner() + sge_job_state.runner_url = job_wrapper.get_job_runner_url() job_wrapper.command_line = job.command_line sge_job_state.job_wrapper = job_wrapper if job.state == model.Job.states.RUNNING: diff -r 05f25cefa1751a165d76882919ba3f7ecafe0bb1 -r e241ff58404830172524fb099fee7b5d7324f026 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -846,7 +846,7 @@ if match: available_configs.append( config ) return random.choice( available_configs )[ key ] - def get_job_runner( self, job_params=None ): + def get_job_runner_url( self, job_params=None ): return self.__get_job_run_config( self.job_runners, key='url', job_params=job_params ) def get_job_handler( self, job_params=None ): return self.__get_job_run_config( self.job_handlers, key='name', job_params=job_params ) Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
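Taken together, the interfaces added in this changeset compose roughly as sketched below. This is an illustrative outline only, not code from the commit: runner stands in for a ShellJobRunner instance, and the URL, script path and job id are hypothetical. In the runner itself these steps are driven by queue_job(), check_watched_items() and stop_job() respectively.

    runner_url = 'cli://plugin=SecureShell&hostname=pbs.example.org&username=galaxy/plugin=Torque'
    shell, job_interface = runner.get_cli_plugins(runner_url)

    # Submit: build 'qsub ...' with the job interface, run it over ssh with the shell plugin.
    cmd_out = shell.execute(job_interface.submit('/galaxy/cluster_files/galaxy_42.sh'))
    external_job_id = cmd_out.stdout.strip()

    # Poll: batch status check, with a per-job query as the fallback used by the monitor loop.
    cmd_out = shell.execute(job_interface.get_status([external_job_id]))
    states = job_interface.parse_status(cmd_out.stdout, [external_job_id])

    # Cancel, if the user deletes the job in Galaxy.
    shell.execute(job_interface.delete(external_job_id))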