2 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/a300bc54ca1f/ Changeset: a300bc54ca1f User: jmchilton Date: 2014-05-05 19:34:38 Summary: Backport CLI-manager infrastructure from LWR. I had previously reworked this CLI stuff to work with LWR - this changeset brings those changes back to Galaxy. This version has the advantage of using the job script module - the upshot is that GALAXY_SLOTS, job_conf env directives, and job metrics should all be usable now with the CLI runner. This version also has a local shell variant (to eliminate the use of SSH and run qsub directly on the same machine), a variant of the Torque executor to target slurm's torque compatibility executables (doesn't support -x), and some other small bug fixes. Affected #: 14 files diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/cli.py --- a/lib/galaxy/jobs/runners/cli.py +++ b/lib/galaxy/jobs/runners/cli.py @@ -3,18 +3,18 @@ """ import os -import time -import glob import logging from galaxy import model from galaxy.jobs import JobDestination from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner +from .util.cli import CliInterface, split_params log = logging.getLogger( __name__ ) __all__ = [ 'ShellJobRunner' ] + class ShellJobRunner( AsynchronousJobRunner ): """ Job runner backed by a finite pool of worker threads. FIFO scheduling @@ -25,53 +25,27 @@ """Start the job runner """ super( ShellJobRunner, self ).__init__( app, nworkers ) - self.cli_shells = None - self.cli_job_interfaces = None - self.__load_cli_plugins() - + self.cli_interface = CliInterface() self._init_monitor_thread() self._init_worker_threads() - def __load_cli_plugins(self): - def __load(module_path, d): - for file in glob.glob(os.path.join(os.path.join(os.getcwd(), 'lib', *module_path.split('.')), '*.py')): - if os.path.basename(file).startswith('_'): - continue - module_name = '%s.%s' % (module_path, os.path.basename(file).rsplit('.py', 1)[0]) - module = __import__(module_name) - for comp in module_name.split( "." 
)[1:]: - module = getattr(module, comp) - for name in module.__all__: - log.debug('Loaded cli plugin %s' % name) - d[name] = getattr(module, name) - - self.cli_shells = {} - self.cli_job_interfaces = {} - __load('galaxy.jobs.runners.cli_shell', self.cli_shells) - __load('galaxy.jobs.runners.cli_job', self.cli_job_interfaces) - def get_cli_plugins( self, shell_params, job_params ): - # load shell plugin - shell = self.cli_shells[shell_params['plugin']](**shell_params) - job_interface = self.cli_job_interfaces[job_params['plugin']](**job_params) - return shell, job_interface + return self.cli_interface.get_plugins( shell_params, job_params ) def url_to_destination( self, url ): params = {} - shell_params, job_params = url.split('/')[2:4] + shell_params, job_params = url.split( '/' )[ 2:4 ] # split 'foo=bar&baz=quux' into { 'foo' : 'bar', 'baz' : 'quux' } - shell_params = dict ( [ ( 'shell_' + k, v ) for k, v in [ kv.split('=', 1) for kv in shell_params.split('&') ] ] ) - job_params = dict ( [ ( 'job_' + k, v ) for k, v in [ kv.split('=', 1) for kv in job_params.split('&') ] ] ) + shell_params = dict( [ ( 'shell_' + k, v ) for k, v in [ kv.split( '=', 1 ) for kv in shell_params.split( '&' ) ] ] ) + job_params = dict( [ ( 'job_' + k, v ) for k, v in [ kv.split( '=', 1 ) for kv in job_params.split( '&' ) ] ] ) params.update( shell_params ) params.update( job_params ) - log.debug("Converted URL '%s' to destination runner=cli, params=%s" % (url, params)) + log.debug( "Converted URL '%s' to destination runner=cli, params=%s" % ( url, params ) ) # Create a dynamic JobDestination - return JobDestination(runner='cli', params=params) + return JobDestination( runner='cli', params=params ) def parse_destination_params( self, params ): - shell_params = dict((k.replace('shell_', '', 1), v) for k, v in params.items() if k.startswith('shell_')) - job_params = dict((k.replace('job_', '', 1), v) for k, v in params.items() if k.startswith('job_')) - return shell_params, job_params + return split_params( params ) def queue_job( self, job_wrapper ): """Create job script and submit it to the DRM""" @@ -93,8 +67,12 @@ # define job attributes ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper ) - # fill in the DRM's job run template - script = job_interface.get_job_template(ajs.output_file, ajs.error_file, ajs.job_name, job_wrapper, command_line, ajs.exit_code_file) + job_file_kwargs = job_interface.job_script_kwargs(ajs.output_file, ajs.error_file, ajs.job_name) + script = self.get_job_file( + job_wrapper, + exit_code_path=ajs.exit_code_file, + **job_file_kwargs + ) try: fh = file(ajs.job_file, "w") diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/cli_job/__init__.py --- a/lib/galaxy/jobs/runners/cli_job/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -""" -Base class for cli job plugins -""" - -class BaseJobExec(object): - def __init__(self, **params): - raise NotImplementedError() - def get_job_template(self, ofile, efile, job_name, job_wrapper, command_line, ecfile): - raise NotImplementedError() - def submit(self, script_file): - raise NotImplementedError() - def delete(self, job_id): - raise NotImplementedError() - def get_status(self, job_ids=None): - raise NotImplementedError() - def get_single_status(self, job_id): - raise NotImplementedError() - def parse_status(self, status, job_ids): - raise NotImplementedError() - def parse_single_status(self, status, job_id): - raise NotImplementedError() diff -r 
b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/cli_job/torque.py --- a/lib/galaxy/jobs/runners/cli_job/torque.py +++ /dev/null @@ -1,135 +0,0 @@ -""" -Command-line interface to TORQUE PBS -""" - -import os -import logging - -from galaxy.model import Job -job_states = Job.states - -from galaxy.jobs.runners.cli_job import BaseJobExec - -log = logging.getLogger( __name__ ) - -__all__ = ('Torque',) - -try: - import xml.etree.cElementTree as et -except: - import xml.etree.ElementTree as et - -job_template = """#!/bin/sh -%s -GALAXY_LIB="%s" -if [ "$GALAXY_LIB" != "None" ]; then - if [ -n "$PYTHONPATH" ]; then - export PYTHONPATH="$GALAXY_LIB:$PYTHONPATH" - else - export PYTHONPATH="$GALAXY_LIB" - fi -fi -%s -cd %s -%s -echo $? > %s -""" - -argmap = { 'destination' : '-q', - 'Execution_Time' : '-a', - 'Account_Name' : '-A', - 'Checkpoint' : '-c', - 'Error_Path' : '-e', - 'Group_List' : '-g', - 'Hold_Types' : '-h', - 'Join_Paths' : '-j', - 'Keep_Files' : '-k', - 'Resource_List' : '-l', - 'Mail_Points' : '-m', - 'Mail_Users' : '-M', - 'Job_Name' : '-N', - 'Output_Path' : '-o', - 'Priority' : '-p', - 'Rerunable' : '-r', - 'Shell_Path_List' : '-S', - 'job_array_request' : '-t', - 'User_List' : '-u', - 'Variable_List' : '-v' } - -class Torque(BaseJobExec): - def __init__(self, **params): - self.params = {} - for k, v in params.items(): - self.params[k] = v - - def get_job_template(self, ofile, efile, job_name, job_wrapper, command_line, ecfile): - pbsargs = { '-o' : ofile, - '-e' : efile, - '-N' : job_name } - for k, v in self.params.items(): - if k == 'plugin': - continue - try: - if not k.startswith('-'): - k = argmap[k] - pbsargs[k] = v - except: - log.warning('Unrecognized long argument passed to Torque CLI plugin: %s' % k) - template_pbsargs = '' - for k, v in pbsargs.items(): - template_pbsargs += '#PBS %s %s\n' % (k, v) - return job_template % (template_pbsargs, - job_wrapper.galaxy_lib_dir, - job_wrapper.get_env_setup_clause(), - os.path.abspath(job_wrapper.working_directory), - command_line, - ecfile) - - def submit(self, script_file): - return 'qsub %s' % script_file - - def delete(self, job_id): - return 'qdel %s' % job_id - - def get_status(self, job_ids=None): - return 'qstat -x' - - def get_single_status(self, job_id): - return 'qstat -f %s' % job_id - - def parse_status(self, status, job_ids): - # in case there's noise in the output, find the big blob 'o xml - tree = None - rval = {} - for line in status.strip().splitlines(): - try: - tree = et.fromstring(line.strip()) - assert tree.tag == 'Data' - break - except Exception, e: - tree = None - if tree is None: - log.warning('No valid qstat XML return from `qstat -x`, got the following: %s' % status) - return None - else: - for job in tree.findall('Job'): - id = job.find('Job_Id').text - if id in job_ids: - state = job.find('job_state').text - # map PBS job states to Galaxy job states. 
- rval[id] = self.__get_job_state(state) - return rval - - def parse_single_status(self, status, job_id): - for line in status.splitlines(): - line = line.split(' = ') - if line[0] == 'job_state': - return line[1] - # no state found, job has exited - return job_states.OK - - def __get_job_state(self, state): - return { 'E' : job_states.RUNNING, - 'R' : job_states.RUNNING, - 'Q' : job_states.QUEUED, - 'C' : job_states.OK }.get(state, state) diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/cli_shell/__init__.py --- a/lib/galaxy/jobs/runners/cli_shell/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -""" -Base class for runners which execute commands via a shell -""" - -class BaseShellExec(object): - def __init__(self, *args, **kwargs): - raise NotImplementedError() - def copy(self, rcp_cmd, files, dest): - raise NotImplementedError() - def execute(self, cmd, persist=False, timeout=60): - raise NotImplementedError() diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/cli_shell/rsh.py --- a/lib/galaxy/jobs/runners/cli_shell/rsh.py +++ /dev/null @@ -1,64 +0,0 @@ -""" -Interface for remote shell commands (rsh, rcp) and derivatives that use the same syntax (ssh, scp) -""" - -import logging -import os -import subprocess -import tempfile -import time - -from galaxy.util.bunch import Bunch -from galaxy.jobs.runners.cli_shell import BaseShellExec - -log = logging.getLogger( __name__ ) - -__all__ = ('RemoteShell', 'SecureShell', 'GlobusSecureShell') - -class RemoteShell(BaseShellExec): - def __init__(self, rsh='rsh', rcp='rcp', hostname=None, username=None, **kwargs): - self.rsh = rsh - self.rcp = rcp - self.hostname = hostname - self.username = username - self.sessions = {} - def copy(self, rcp_cmd, files, dest): - pass - def execute(self, cmd, persist=False, timeout=60): - # TODO: implement persistence - if self.username is None: - fullcmd = '%s %s %s' % (self.rsh, self.hostname, cmd) - else: - fullcmd = '%s -l %s %s %s' % (self.rsh, self.username, self.hostname, cmd) - # Read stdout to a tempfile in case it's large (>65K) - outf = tempfile.TemporaryFile() - p = subprocess.Popen(fullcmd, shell=True, stdin=None, stdout=outf, stderr=subprocess.PIPE) - # poll until timeout - for i in range(timeout/3): - r = p.poll() - if r is not None: - break - time.sleep(3) - else: - pid = int(p.pid) - for sig in (15, 9): - try: - os.kill(pid, sig) - time.sleep(3) - except: - log.warning('Killing pid %s (cmd: "%s") with signal %s failed' % (p.pid, fullcmd, sig)) - return Bunch(stdout='', stderr='Execution timed out', returncode=-1) - outf.seek(0) - return Bunch(stdout=outf.read(), stderr=p.stderr.read(), returncode=p.returncode) - - -class SecureShell(RemoteShell): - SSH_NEW_KEY_STRING = 'Are you sure you want to continue connecting' - def __init__(self, rsh='ssh', rcp='scp', **kwargs): - rsh += ' -oStrictHostKeyChecking=yes -oConnectTimeout=60' - rcp += ' -oStrictHostKeyChecking=yes -oConnectTimeout=60' - super(SecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs) - -class GlobusSecureShell(SecureShell): - def __init__(self, rsh='gsissh', rcp='gsiscp', **kwargs): - super(SecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs) diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/util/__init__.py --- a/lib/galaxy/jobs/runners/util/__init__.py +++ b/lib/galaxy/jobs/runners/util/__init__.py @@ -3,5 +3,8 @@ processes and 
interfacing with job managers. This module should contain functionality shared between Galaxy and the LWR. """ +from galaxy.util.bunch import Bunch -from galaxy.util.bunch import Bunch +from .kill import kill_pid + +__all__ = [kill_pid, Bunch] diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/util/cli/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/__init__.py @@ -0,0 +1,56 @@ +""" +""" +from glob import glob +from os.path import basename, join +from os import getcwd + +DEFAULT_SHELL_PLUGIN = 'LocalShell' + + +class CliInterface(object): + """ + High-level interface for loading shell and job plugins and matching + them to specified parameters. + """ + + def __init__(self, code_dir='lib'): + """ + """ + def __load(module_path, d): + module_pattern = join(join(getcwd(), code_dir, *module_path.split('.')), '*.py') + for file in glob(module_pattern): + if basename(file).startswith('_'): + continue + module_name = '%s.%s' % (module_path, basename(file).rsplit('.py', 1)[0]) + module = __import__(module_name) + for comp in module_name.split(".")[1:]: + module = getattr(module, comp) + for name in module.__all__: + try: + d[name] = getattr(module, name) + except TypeError: + raise TypeError("Invalid type for name %s" % name) + + self.cli_shells = {} + self.cli_job_interfaces = {} + + module_prefix = self.__module__ + __load('%s.shell' % module_prefix, self.cli_shells) + __load('%s.job' % module_prefix, self.cli_job_interfaces) + + def get_plugins(self, shell_params, job_params): + """ + Return shell and job interface defined by and configured via + specified params. + """ + shell_plugin = shell_params.get('plugin', DEFAULT_SHELL_PLUGIN) + job_plugin = job_params['plugin'] + shell = self.cli_shells[shell_plugin](**shell_params) + job_interface = self.cli_job_interfaces[job_plugin](**job_params) + return shell, job_interface + + +def split_params(params): + shell_params = dict((k.replace('shell_', '', 1), v) for k, v in params.items() if k.startswith('shell_')) + job_params = dict((k.replace('job_', '', 1), v) for k, v in params.items() if k.startswith('job_')) + return shell_params, job_params diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/util/cli/job/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/job/__init__.py @@ -0,0 +1,58 @@ +""" +Abstract base class for cli job plugins. +""" +from abc import ABCMeta, abstractmethod + + +class BaseJobExec(object): + __metaclass__ = ABCMeta + + @abstractmethod + def __init__(self, **params): + """ + Constructor for CLI job executor. + """ + + def job_script_kwargs(self, ofile, efile, job_name): + """ Return extra keyword argument for consumption by job script + module. + """ + return {} + + @abstractmethod + def submit(self, script_file): + """ + Given specified script_file path, yield command to submit it + to external job manager. + """ + + @abstractmethod + def delete(self, job_id): + """ + Given job id, return command to stop execution or dequeue specified + job. + """ + + @abstractmethod + def get_status(self, job_ids=None): + """ + Return command to get statuses of specified job ids. + """ + + @abstractmethod + def get_single_status(self, job_id): + """ + Return command to get the status of a single, specified job. + """ + + @abstractmethod + def parse_status(self, status, job_ids): + """ + Parse the statuses of output from get_status command. 
+ """ + + @abstractmethod + def parse_single_status(self, status, job_id): + """ + Parse the status of output from get_single_status command. + """ diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/util/cli/job/slurm_torque.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/job/slurm_torque.py @@ -0,0 +1,29 @@ +import re +from .torque import Torque + +__all__ = ('SlurmTorque',) + + +class SlurmTorque(Torque): + """ A CLI job executor for Slurm's Torque compatibility mode. This differs + from real torque CLI in that -x command line is not available so job status + needs to be parsed from qstat table instead of XML. + """ + + def get_status(self, job_ids=None): + return 'qstat' + + def parse_status(self, status, job_ids): + rval = {} + for line in status.strip().splitlines(): + if line.startswith("Job ID"): + continue + line_parts = re.compile("\s+").split(line) + if len(line_parts) < 5: + continue + id = line_parts[0] + state = line_parts[4] + if id in job_ids: + # map PBS job states to Galaxy job states. + rval[id] = self._get_job_state(state) + return rval diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/util/cli/job/torque.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/job/torque.py @@ -0,0 +1,120 @@ +try: + import xml.etree.cElementTree as et +except: + import xml.etree.ElementTree as et + +try: + from galaxy.model import Job + job_states = Job.states +except ImportError: + # Not in Galaxy, map Galaxy job states to LWR ones. + from galaxy.util import enum + job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') + +from ..job import BaseJobExec + +__all__ = ('Torque',) + +from logging import getLogger +log = getLogger(__name__) + +argmap = {'destination': '-q', + 'Execution_Time': '-a', + 'Account_Name': '-A', + 'Checkpoint': '-c', + 'Error_Path': '-e', + 'Group_List': '-g', + 'Hold_Types': '-h', + 'Join_Paths': '-j', + 'Keep_Files': '-k', + 'Resource_List': '-l', + 'Mail_Points': '-m', + 'Mail_Users': '-M', + 'Job_Name': '-N', + 'Output_Path': '-o', + 'Priority': '-p', + 'Rerunable': '-r', + 'Shell_Path_List': '-S', + 'job_array_request': '-t', + 'User_List': '-u', + 'Variable_List': '-v'} + + +class Torque(BaseJobExec): + + def __init__(self, **params): + self.params = {} + for k, v in params.items(): + self.params[k] = v + + def job_script_kwargs(self, ofile, efile, job_name): + pbsargs = {'-o': ofile, + '-e': efile, + '-N': job_name} + for k, v in self.params.items(): + if k == 'plugin': + continue + try: + if not k.startswith('-'): + k = argmap[k] + pbsargs[k] = v + except: + log.warning('Unrecognized long argument passed to Torque CLI plugin: %s' % k) + template_pbsargs = '' + for k, v in pbsargs.items(): + template_pbsargs += '#PBS %s %s\n' % (k, v) + return dict(headers=template_pbsargs) + + def submit(self, script_file): + return 'qsub %s' % script_file + + def delete(self, job_id): + return 'qdel %s' % job_id + + def get_status(self, job_ids=None): + return 'qstat -x' + + def get_single_status(self, job_id): + return 'qstat -f %s' % job_id + + def parse_status(self, status, job_ids): + # in case there's noise in the output, find the big blob 'o xml + tree = None + rval = {} + for line in status.strip().splitlines(): + try: + tree = et.fromstring(line.strip()) + assert tree.tag == 'Data' + break + except Exception: + tree = None + if tree is None: + log.warning('No valid qstat XML return from `qstat -x`, got the 
following: %s' % status) + return None + else: + for job in tree.findall('Job'): + id = job.find('Job_Id').text + if id in job_ids: + state = job.find('job_state').text + # map PBS job states to Galaxy job states. + rval[id] = self._get_job_state(state) + return rval + + def parse_single_status(self, status, job_id): + for line in status.splitlines(): + line = line.split(' = ') + if line[0] == 'job_state': + return self._get_job_state(line[1].strip()) + # no state found, job has exited + return job_states.OK + + def _get_job_state(self, state): + try: + return { + 'E': job_states.RUNNING, + 'R': job_states.RUNNING, + 'Q': job_states.QUEUED, + 'C': job_states.OK + }.get(state) + except KeyError: + raise KeyError("Failed to map torque status code [%s] to job state." % state) diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/util/cli/shell/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/shell/__init__.py @@ -0,0 +1,19 @@ +""" +Abstract base class for runners which execute commands via a shell. +""" +from abc import ABCMeta, abstractmethod + + +class BaseShellExec(object): + __metaclass__ = ABCMeta + + @abstractmethod + def __init__(self, *args, **kwargs): + """ + Constructor for shell executor instance. + """ + + def execute(self, cmd, persist=False, timeout=60): + """ + Execute the specified command via defined shell. + """ diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/util/cli/shell/local.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/shell/local.py @@ -0,0 +1,61 @@ +from tempfile import TemporaryFile +from time import sleep +from subprocess import Popen, PIPE + +from ..shell import BaseShellExec +from ....util import Bunch, kill_pid + +from logging import getLogger +log = getLogger(__name__) + +TIMEOUT_ERROR_MESSAGE = u'Execution timed out' +TIMEOUT_RETURN_CODE = -1 +DEFAULT_TIMEOUT = 60 +DEFAULT_TIMEOUT_CHECK_INTERVAL = 3 + + +class LocalShell(BaseShellExec): + """ + + >>> shell = LocalShell() + >>> def exec_python(script, **kwds): return shell.execute('python -c "%s"' % script, **kwds) + >>> exec_result = exec_python("from __future__ import print_function; print('Hello World')") + >>> exec_result.stderr == u'' + True + >>> exec_result.stdout.strip() == u'Hello World' + True + >>> exec_result = exec_python("import time; time.sleep(90)", timeout=3, timeout_check_interval=1) + >>> exec_result.stdout == u'' + True + >>> exec_result.stderr == 'Execution timed out' + True + >>> exec_result.returncode == TIMEOUT_RETURN_CODE + True + """ + + def __init__(self, **kwds): + pass + + def execute(self, cmd, persist=False, timeout=DEFAULT_TIMEOUT, timeout_check_interval=DEFAULT_TIMEOUT_CHECK_INTERVAL, **kwds): + outf = TemporaryFile() + p = Popen(cmd, shell=True, stdin=None, stdout=outf, stderr=PIPE) + # poll until timeout + + for i in range(int(timeout / timeout_check_interval)): + r = p.poll() + if r is not None: + break + sleep(timeout_check_interval) + else: + kill_pid(p.pid) + return Bunch(stdout=u'', stderr=TIMEOUT_ERROR_MESSAGE, returncode=TIMEOUT_RETURN_CODE) + outf.seek(0) + return Bunch(stdout=_read_str(outf), stderr=_read_str(p.stderr), returncode=p.returncode) + + +def _read_str(stream): + contents = stream.read() + return contents.decode('UTF-8') if isinstance(contents, bytes) else contents + + +__all__ = ('LocalShell',) diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 
lib/galaxy/jobs/runners/util/cli/shell/rsh.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/shell/rsh.py @@ -0,0 +1,40 @@ +from .local import LocalShell + +from logging import getLogger +log = getLogger(__name__) + +__all__ = ('RemoteShell', 'SecureShell', 'GlobusSecureShell') + + +class RemoteShell(LocalShell): + + def __init__(self, rsh='rsh', rcp='rcp', hostname='localhost', username=None, **kwargs): + super(RemoteShell, self).__init__(**kwargs) + self.rsh = rsh + self.rcp = rcp + self.hostname = hostname + self.username = username + self.sessions = {} + + def execute(self, cmd, persist=False, timeout=60): + # TODO: implement persistence + if self.username is None: + fullcmd = '%s %s %s' % (self.rsh, self.hostname, cmd) + else: + fullcmd = '%s -l %s %s %s' % (self.rsh, self.username, self.hostname, cmd) + return super(RemoteShell, self).execute(fullcmd, persist, timeout) + + +class SecureShell(RemoteShell): + SSH_NEW_KEY_STRING = 'Are you sure you want to continue connecting' + + def __init__(self, rsh='ssh', rcp='scp', **kwargs): + rsh += ' -oStrictHostKeyChecking=yes -oConnectTimeout=60' + rcp += ' -oStrictHostKeyChecking=yes -oConnectTimeout=60' + super(SecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs) + + +class GlobusSecureShell(SecureShell): + + def __init__(self, rsh='gsissh', rcp='gsiscp', **kwargs): + super(GlobusSecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs) diff -r b94b4da49bfdf4f204d0ad2dce878a37e6e4f3d3 -r a300bc54ca1fd0479ae94541f56ec7899f2e2c90 lib/galaxy/jobs/runners/util/kill.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/kill.py @@ -0,0 +1,65 @@ +import os +from platform import system +from time import sleep +from subprocess import Popen + +try: + from psutil import Process, NoSuchProcess +except ImportError: + """ Don't make psutil a strict requirement, but use if available. """ + Process = None + + +def kill_pid(pid, use_psutil=True): + if use_psutil and Process: + _psutil_kill_pid(pid) + else: + _stock_kill_pid(pid) + + +def _psutil_kill_pid(pid): + """ + http://stackoverflow.com/questions/1230669/subprocess-deleting-child-process... 
+ """ + try: + parent = Process(pid) + for child in parent.get_children(recursive=True): + child.kill() + parent.kill() + except NoSuchProcess: + return + + +def _stock_kill_pid(pid): + is_windows = system() == 'Windows' + + if is_windows: + __kill_windows(pid) + else: + __kill_posix(pid) + + +def __kill_windows(pid): + try: + Popen("taskkill /F /T /PID %i" % pid, shell=True) + except Exception: + pass + + +def __kill_posix(pid): + def __check_pid(): + try: + os.kill(pid, 0) + return True + except OSError: + return False + + if __check_pid(): + for sig in [15, 9]: + try: + os.killpg(pid, sig) + except OSError: + return + sleep(1) + if not __check_pid(): + return https://bitbucket.org/galaxy/galaxy-central/commits/9eb2b42e21fe/ Changeset: 9eb2b42e21fe User: jmchilton Date: 2014-05-07 18:57:21 Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #385) CLI Job Runner Enhancements Affected #: 14 files diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/cli.py --- a/lib/galaxy/jobs/runners/cli.py +++ b/lib/galaxy/jobs/runners/cli.py @@ -3,18 +3,18 @@ """ import os -import time -import glob import logging from galaxy import model from galaxy.jobs import JobDestination from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner +from .util.cli import CliInterface, split_params log = logging.getLogger( __name__ ) __all__ = [ 'ShellJobRunner' ] + class ShellJobRunner( AsynchronousJobRunner ): """ Job runner backed by a finite pool of worker threads. FIFO scheduling @@ -25,53 +25,27 @@ """Start the job runner """ super( ShellJobRunner, self ).__init__( app, nworkers ) - self.cli_shells = None - self.cli_job_interfaces = None - self.__load_cli_plugins() - + self.cli_interface = CliInterface() self._init_monitor_thread() self._init_worker_threads() - def __load_cli_plugins(self): - def __load(module_path, d): - for file in glob.glob(os.path.join(os.path.join(os.getcwd(), 'lib', *module_path.split('.')), '*.py')): - if os.path.basename(file).startswith('_'): - continue - module_name = '%s.%s' % (module_path, os.path.basename(file).rsplit('.py', 1)[0]) - module = __import__(module_name) - for comp in module_name.split( "." 
)[1:]: - module = getattr(module, comp) - for name in module.__all__: - log.debug('Loaded cli plugin %s' % name) - d[name] = getattr(module, name) - - self.cli_shells = {} - self.cli_job_interfaces = {} - __load('galaxy.jobs.runners.cli_shell', self.cli_shells) - __load('galaxy.jobs.runners.cli_job', self.cli_job_interfaces) - def get_cli_plugins( self, shell_params, job_params ): - # load shell plugin - shell = self.cli_shells[shell_params['plugin']](**shell_params) - job_interface = self.cli_job_interfaces[job_params['plugin']](**job_params) - return shell, job_interface + return self.cli_interface.get_plugins( shell_params, job_params ) def url_to_destination( self, url ): params = {} - shell_params, job_params = url.split('/')[2:4] + shell_params, job_params = url.split( '/' )[ 2:4 ] # split 'foo=bar&baz=quux' into { 'foo' : 'bar', 'baz' : 'quux' } - shell_params = dict ( [ ( 'shell_' + k, v ) for k, v in [ kv.split('=', 1) for kv in shell_params.split('&') ] ] ) - job_params = dict ( [ ( 'job_' + k, v ) for k, v in [ kv.split('=', 1) for kv in job_params.split('&') ] ] ) + shell_params = dict( [ ( 'shell_' + k, v ) for k, v in [ kv.split( '=', 1 ) for kv in shell_params.split( '&' ) ] ] ) + job_params = dict( [ ( 'job_' + k, v ) for k, v in [ kv.split( '=', 1 ) for kv in job_params.split( '&' ) ] ] ) params.update( shell_params ) params.update( job_params ) - log.debug("Converted URL '%s' to destination runner=cli, params=%s" % (url, params)) + log.debug( "Converted URL '%s' to destination runner=cli, params=%s" % ( url, params ) ) # Create a dynamic JobDestination - return JobDestination(runner='cli', params=params) + return JobDestination( runner='cli', params=params ) def parse_destination_params( self, params ): - shell_params = dict((k.replace('shell_', '', 1), v) for k, v in params.items() if k.startswith('shell_')) - job_params = dict((k.replace('job_', '', 1), v) for k, v in params.items() if k.startswith('job_')) - return shell_params, job_params + return split_params( params ) def queue_job( self, job_wrapper ): """Create job script and submit it to the DRM""" @@ -93,8 +67,12 @@ # define job attributes ajs = AsynchronousJobState( files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper ) - # fill in the DRM's job run template - script = job_interface.get_job_template(ajs.output_file, ajs.error_file, ajs.job_name, job_wrapper, command_line, ajs.exit_code_file) + job_file_kwargs = job_interface.job_script_kwargs(ajs.output_file, ajs.error_file, ajs.job_name) + script = self.get_job_file( + job_wrapper, + exit_code_path=ajs.exit_code_file, + **job_file_kwargs + ) try: fh = file(ajs.job_file, "w") diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/cli_job/__init__.py --- a/lib/galaxy/jobs/runners/cli_job/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -""" -Base class for cli job plugins -""" - -class BaseJobExec(object): - def __init__(self, **params): - raise NotImplementedError() - def get_job_template(self, ofile, efile, job_name, job_wrapper, command_line, ecfile): - raise NotImplementedError() - def submit(self, script_file): - raise NotImplementedError() - def delete(self, job_id): - raise NotImplementedError() - def get_status(self, job_ids=None): - raise NotImplementedError() - def get_single_status(self, job_id): - raise NotImplementedError() - def parse_status(self, status, job_ids): - raise NotImplementedError() - def parse_single_status(self, status, job_id): - raise NotImplementedError() diff -r 
a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/cli_job/torque.py --- a/lib/galaxy/jobs/runners/cli_job/torque.py +++ /dev/null @@ -1,135 +0,0 @@ -""" -Command-line interface to TORQUE PBS -""" - -import os -import logging - -from galaxy.model import Job -job_states = Job.states - -from galaxy.jobs.runners.cli_job import BaseJobExec - -log = logging.getLogger( __name__ ) - -__all__ = ('Torque',) - -try: - import xml.etree.cElementTree as et -except: - import xml.etree.ElementTree as et - -job_template = """#!/bin/sh -%s -GALAXY_LIB="%s" -if [ "$GALAXY_LIB" != "None" ]; then - if [ -n "$PYTHONPATH" ]; then - export PYTHONPATH="$GALAXY_LIB:$PYTHONPATH" - else - export PYTHONPATH="$GALAXY_LIB" - fi -fi -%s -cd %s -%s -echo $? > %s -""" - -argmap = { 'destination' : '-q', - 'Execution_Time' : '-a', - 'Account_Name' : '-A', - 'Checkpoint' : '-c', - 'Error_Path' : '-e', - 'Group_List' : '-g', - 'Hold_Types' : '-h', - 'Join_Paths' : '-j', - 'Keep_Files' : '-k', - 'Resource_List' : '-l', - 'Mail_Points' : '-m', - 'Mail_Users' : '-M', - 'Job_Name' : '-N', - 'Output_Path' : '-o', - 'Priority' : '-p', - 'Rerunable' : '-r', - 'Shell_Path_List' : '-S', - 'job_array_request' : '-t', - 'User_List' : '-u', - 'Variable_List' : '-v' } - -class Torque(BaseJobExec): - def __init__(self, **params): - self.params = {} - for k, v in params.items(): - self.params[k] = v - - def get_job_template(self, ofile, efile, job_name, job_wrapper, command_line, ecfile): - pbsargs = { '-o' : ofile, - '-e' : efile, - '-N' : job_name } - for k, v in self.params.items(): - if k == 'plugin': - continue - try: - if not k.startswith('-'): - k = argmap[k] - pbsargs[k] = v - except: - log.warning('Unrecognized long argument passed to Torque CLI plugin: %s' % k) - template_pbsargs = '' - for k, v in pbsargs.items(): - template_pbsargs += '#PBS %s %s\n' % (k, v) - return job_template % (template_pbsargs, - job_wrapper.galaxy_lib_dir, - job_wrapper.get_env_setup_clause(), - os.path.abspath(job_wrapper.working_directory), - command_line, - ecfile) - - def submit(self, script_file): - return 'qsub %s' % script_file - - def delete(self, job_id): - return 'qdel %s' % job_id - - def get_status(self, job_ids=None): - return 'qstat -x' - - def get_single_status(self, job_id): - return 'qstat -f %s' % job_id - - def parse_status(self, status, job_ids): - # in case there's noise in the output, find the big blob 'o xml - tree = None - rval = {} - for line in status.strip().splitlines(): - try: - tree = et.fromstring(line.strip()) - assert tree.tag == 'Data' - break - except Exception, e: - tree = None - if tree is None: - log.warning('No valid qstat XML return from `qstat -x`, got the following: %s' % status) - return None - else: - for job in tree.findall('Job'): - id = job.find('Job_Id').text - if id in job_ids: - state = job.find('job_state').text - # map PBS job states to Galaxy job states. 
- rval[id] = self.__get_job_state(state) - return rval - - def parse_single_status(self, status, job_id): - for line in status.splitlines(): - line = line.split(' = ') - if line[0] == 'job_state': - return line[1] - # no state found, job has exited - return job_states.OK - - def __get_job_state(self, state): - return { 'E' : job_states.RUNNING, - 'R' : job_states.RUNNING, - 'Q' : job_states.QUEUED, - 'C' : job_states.OK }.get(state, state) diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/cli_shell/__init__.py --- a/lib/galaxy/jobs/runners/cli_shell/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -""" -Base class for runners which execute commands via a shell -""" - -class BaseShellExec(object): - def __init__(self, *args, **kwargs): - raise NotImplementedError() - def copy(self, rcp_cmd, files, dest): - raise NotImplementedError() - def execute(self, cmd, persist=False, timeout=60): - raise NotImplementedError() diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/cli_shell/rsh.py --- a/lib/galaxy/jobs/runners/cli_shell/rsh.py +++ /dev/null @@ -1,64 +0,0 @@ -""" -Interface for remote shell commands (rsh, rcp) and derivatives that use the same syntax (ssh, scp) -""" - -import logging -import os -import subprocess -import tempfile -import time - -from galaxy.util.bunch import Bunch -from galaxy.jobs.runners.cli_shell import BaseShellExec - -log = logging.getLogger( __name__ ) - -__all__ = ('RemoteShell', 'SecureShell', 'GlobusSecureShell') - -class RemoteShell(BaseShellExec): - def __init__(self, rsh='rsh', rcp='rcp', hostname=None, username=None, **kwargs): - self.rsh = rsh - self.rcp = rcp - self.hostname = hostname - self.username = username - self.sessions = {} - def copy(self, rcp_cmd, files, dest): - pass - def execute(self, cmd, persist=False, timeout=60): - # TODO: implement persistence - if self.username is None: - fullcmd = '%s %s %s' % (self.rsh, self.hostname, cmd) - else: - fullcmd = '%s -l %s %s %s' % (self.rsh, self.username, self.hostname, cmd) - # Read stdout to a tempfile in case it's large (>65K) - outf = tempfile.TemporaryFile() - p = subprocess.Popen(fullcmd, shell=True, stdin=None, stdout=outf, stderr=subprocess.PIPE) - # poll until timeout - for i in range(timeout/3): - r = p.poll() - if r is not None: - break - time.sleep(3) - else: - pid = int(p.pid) - for sig in (15, 9): - try: - os.kill(pid, sig) - time.sleep(3) - except: - log.warning('Killing pid %s (cmd: "%s") with signal %s failed' % (p.pid, fullcmd, sig)) - return Bunch(stdout='', stderr='Execution timed out', returncode=-1) - outf.seek(0) - return Bunch(stdout=outf.read(), stderr=p.stderr.read(), returncode=p.returncode) - - -class SecureShell(RemoteShell): - SSH_NEW_KEY_STRING = 'Are you sure you want to continue connecting' - def __init__(self, rsh='ssh', rcp='scp', **kwargs): - rsh += ' -oStrictHostKeyChecking=yes -oConnectTimeout=60' - rcp += ' -oStrictHostKeyChecking=yes -oConnectTimeout=60' - super(SecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs) - -class GlobusSecureShell(SecureShell): - def __init__(self, rsh='gsissh', rcp='gsiscp', **kwargs): - super(SecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs) diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/util/__init__.py --- a/lib/galaxy/jobs/runners/util/__init__.py +++ b/lib/galaxy/jobs/runners/util/__init__.py @@ -3,5 +3,8 @@ processes and 
interfacing with job managers. This module should contain functionality shared between Galaxy and the LWR. """ +from galaxy.util.bunch import Bunch -from galaxy.util.bunch import Bunch +from .kill import kill_pid + +__all__ = [kill_pid, Bunch] diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/util/cli/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/__init__.py @@ -0,0 +1,56 @@ +""" +""" +from glob import glob +from os.path import basename, join +from os import getcwd + +DEFAULT_SHELL_PLUGIN = 'LocalShell' + + +class CliInterface(object): + """ + High-level interface for loading shell and job plugins and matching + them to specified parameters. + """ + + def __init__(self, code_dir='lib'): + """ + """ + def __load(module_path, d): + module_pattern = join(join(getcwd(), code_dir, *module_path.split('.')), '*.py') + for file in glob(module_pattern): + if basename(file).startswith('_'): + continue + module_name = '%s.%s' % (module_path, basename(file).rsplit('.py', 1)[0]) + module = __import__(module_name) + for comp in module_name.split(".")[1:]: + module = getattr(module, comp) + for name in module.__all__: + try: + d[name] = getattr(module, name) + except TypeError: + raise TypeError("Invalid type for name %s" % name) + + self.cli_shells = {} + self.cli_job_interfaces = {} + + module_prefix = self.__module__ + __load('%s.shell' % module_prefix, self.cli_shells) + __load('%s.job' % module_prefix, self.cli_job_interfaces) + + def get_plugins(self, shell_params, job_params): + """ + Return shell and job interface defined by and configured via + specified params. + """ + shell_plugin = shell_params.get('plugin', DEFAULT_SHELL_PLUGIN) + job_plugin = job_params['plugin'] + shell = self.cli_shells[shell_plugin](**shell_params) + job_interface = self.cli_job_interfaces[job_plugin](**job_params) + return shell, job_interface + + +def split_params(params): + shell_params = dict((k.replace('shell_', '', 1), v) for k, v in params.items() if k.startswith('shell_')) + job_params = dict((k.replace('job_', '', 1), v) for k, v in params.items() if k.startswith('job_')) + return shell_params, job_params diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/util/cli/job/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/job/__init__.py @@ -0,0 +1,58 @@ +""" +Abstract base class for cli job plugins. +""" +from abc import ABCMeta, abstractmethod + + +class BaseJobExec(object): + __metaclass__ = ABCMeta + + @abstractmethod + def __init__(self, **params): + """ + Constructor for CLI job executor. + """ + + def job_script_kwargs(self, ofile, efile, job_name): + """ Return extra keyword argument for consumption by job script + module. + """ + return {} + + @abstractmethod + def submit(self, script_file): + """ + Given specified script_file path, yield command to submit it + to external job manager. + """ + + @abstractmethod + def delete(self, job_id): + """ + Given job id, return command to stop execution or dequeue specified + job. + """ + + @abstractmethod + def get_status(self, job_ids=None): + """ + Return command to get statuses of specified job ids. + """ + + @abstractmethod + def get_single_status(self, job_id): + """ + Return command to get the status of a single, specified job. + """ + + @abstractmethod + def parse_status(self, status, job_ids): + """ + Parse the statuses of output from get_status command. 
+ """ + + @abstractmethod + def parse_single_status(self, status, job_id): + """ + Parse the status of output from get_single_status command. + """ diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/util/cli/job/slurm_torque.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/job/slurm_torque.py @@ -0,0 +1,29 @@ +import re +from .torque import Torque + +__all__ = ('SlurmTorque',) + + +class SlurmTorque(Torque): + """ A CLI job executor for Slurm's Torque compatibility mode. This differs + from real torque CLI in that -x command line is not available so job status + needs to be parsed from qstat table instead of XML. + """ + + def get_status(self, job_ids=None): + return 'qstat' + + def parse_status(self, status, job_ids): + rval = {} + for line in status.strip().splitlines(): + if line.startswith("Job ID"): + continue + line_parts = re.compile("\s+").split(line) + if len(line_parts) < 5: + continue + id = line_parts[0] + state = line_parts[4] + if id in job_ids: + # map PBS job states to Galaxy job states. + rval[id] = self._get_job_state(state) + return rval diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/util/cli/job/torque.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/job/torque.py @@ -0,0 +1,120 @@ +try: + import xml.etree.cElementTree as et +except: + import xml.etree.ElementTree as et + +try: + from galaxy.model import Job + job_states = Job.states +except ImportError: + # Not in Galaxy, map Galaxy job states to LWR ones. + from galaxy.util import enum + job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') + +from ..job import BaseJobExec + +__all__ = ('Torque',) + +from logging import getLogger +log = getLogger(__name__) + +argmap = {'destination': '-q', + 'Execution_Time': '-a', + 'Account_Name': '-A', + 'Checkpoint': '-c', + 'Error_Path': '-e', + 'Group_List': '-g', + 'Hold_Types': '-h', + 'Join_Paths': '-j', + 'Keep_Files': '-k', + 'Resource_List': '-l', + 'Mail_Points': '-m', + 'Mail_Users': '-M', + 'Job_Name': '-N', + 'Output_Path': '-o', + 'Priority': '-p', + 'Rerunable': '-r', + 'Shell_Path_List': '-S', + 'job_array_request': '-t', + 'User_List': '-u', + 'Variable_List': '-v'} + + +class Torque(BaseJobExec): + + def __init__(self, **params): + self.params = {} + for k, v in params.items(): + self.params[k] = v + + def job_script_kwargs(self, ofile, efile, job_name): + pbsargs = {'-o': ofile, + '-e': efile, + '-N': job_name} + for k, v in self.params.items(): + if k == 'plugin': + continue + try: + if not k.startswith('-'): + k = argmap[k] + pbsargs[k] = v + except: + log.warning('Unrecognized long argument passed to Torque CLI plugin: %s' % k) + template_pbsargs = '' + for k, v in pbsargs.items(): + template_pbsargs += '#PBS %s %s\n' % (k, v) + return dict(headers=template_pbsargs) + + def submit(self, script_file): + return 'qsub %s' % script_file + + def delete(self, job_id): + return 'qdel %s' % job_id + + def get_status(self, job_ids=None): + return 'qstat -x' + + def get_single_status(self, job_id): + return 'qstat -f %s' % job_id + + def parse_status(self, status, job_ids): + # in case there's noise in the output, find the big blob 'o xml + tree = None + rval = {} + for line in status.strip().splitlines(): + try: + tree = et.fromstring(line.strip()) + assert tree.tag == 'Data' + break + except Exception: + tree = None + if tree is None: + log.warning('No valid qstat XML return from `qstat -x`, got the 
following: %s' % status) + return None + else: + for job in tree.findall('Job'): + id = job.find('Job_Id').text + if id in job_ids: + state = job.find('job_state').text + # map PBS job states to Galaxy job states. + rval[id] = self._get_job_state(state) + return rval + + def parse_single_status(self, status, job_id): + for line in status.splitlines(): + line = line.split(' = ') + if line[0] == 'job_state': + return self._get_job_state(line[1].strip()) + # no state found, job has exited + return job_states.OK + + def _get_job_state(self, state): + try: + return { + 'E': job_states.RUNNING, + 'R': job_states.RUNNING, + 'Q': job_states.QUEUED, + 'C': job_states.OK + }.get(state) + except KeyError: + raise KeyError("Failed to map torque status code [%s] to job state." % state) diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/util/cli/shell/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/shell/__init__.py @@ -0,0 +1,19 @@ +""" +Abstract base class for runners which execute commands via a shell. +""" +from abc import ABCMeta, abstractmethod + + +class BaseShellExec(object): + __metaclass__ = ABCMeta + + @abstractmethod + def __init__(self, *args, **kwargs): + """ + Constructor for shell executor instance. + """ + + def execute(self, cmd, persist=False, timeout=60): + """ + Execute the specified command via defined shell. + """ diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/util/cli/shell/local.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/shell/local.py @@ -0,0 +1,61 @@ +from tempfile import TemporaryFile +from time import sleep +from subprocess import Popen, PIPE + +from ..shell import BaseShellExec +from ....util import Bunch, kill_pid + +from logging import getLogger +log = getLogger(__name__) + +TIMEOUT_ERROR_MESSAGE = u'Execution timed out' +TIMEOUT_RETURN_CODE = -1 +DEFAULT_TIMEOUT = 60 +DEFAULT_TIMEOUT_CHECK_INTERVAL = 3 + + +class LocalShell(BaseShellExec): + """ + + >>> shell = LocalShell() + >>> def exec_python(script, **kwds): return shell.execute('python -c "%s"' % script, **kwds) + >>> exec_result = exec_python("from __future__ import print_function; print('Hello World')") + >>> exec_result.stderr == u'' + True + >>> exec_result.stdout.strip() == u'Hello World' + True + >>> exec_result = exec_python("import time; time.sleep(90)", timeout=3, timeout_check_interval=1) + >>> exec_result.stdout == u'' + True + >>> exec_result.stderr == 'Execution timed out' + True + >>> exec_result.returncode == TIMEOUT_RETURN_CODE + True + """ + + def __init__(self, **kwds): + pass + + def execute(self, cmd, persist=False, timeout=DEFAULT_TIMEOUT, timeout_check_interval=DEFAULT_TIMEOUT_CHECK_INTERVAL, **kwds): + outf = TemporaryFile() + p = Popen(cmd, shell=True, stdin=None, stdout=outf, stderr=PIPE) + # poll until timeout + + for i in range(int(timeout / timeout_check_interval)): + r = p.poll() + if r is not None: + break + sleep(timeout_check_interval) + else: + kill_pid(p.pid) + return Bunch(stdout=u'', stderr=TIMEOUT_ERROR_MESSAGE, returncode=TIMEOUT_RETURN_CODE) + outf.seek(0) + return Bunch(stdout=_read_str(outf), stderr=_read_str(p.stderr), returncode=p.returncode) + + +def _read_str(stream): + contents = stream.read() + return contents.decode('UTF-8') if isinstance(contents, bytes) else contents + + +__all__ = ('LocalShell',) diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c 
lib/galaxy/jobs/runners/util/cli/shell/rsh.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/cli/shell/rsh.py @@ -0,0 +1,40 @@ +from .local import LocalShell + +from logging import getLogger +log = getLogger(__name__) + +__all__ = ('RemoteShell', 'SecureShell', 'GlobusSecureShell') + + +class RemoteShell(LocalShell): + + def __init__(self, rsh='rsh', rcp='rcp', hostname='localhost', username=None, **kwargs): + super(RemoteShell, self).__init__(**kwargs) + self.rsh = rsh + self.rcp = rcp + self.hostname = hostname + self.username = username + self.sessions = {} + + def execute(self, cmd, persist=False, timeout=60): + # TODO: implement persistence + if self.username is None: + fullcmd = '%s %s %s' % (self.rsh, self.hostname, cmd) + else: + fullcmd = '%s -l %s %s %s' % (self.rsh, self.username, self.hostname, cmd) + return super(RemoteShell, self).execute(fullcmd, persist, timeout) + + +class SecureShell(RemoteShell): + SSH_NEW_KEY_STRING = 'Are you sure you want to continue connecting' + + def __init__(self, rsh='ssh', rcp='scp', **kwargs): + rsh += ' -oStrictHostKeyChecking=yes -oConnectTimeout=60' + rcp += ' -oStrictHostKeyChecking=yes -oConnectTimeout=60' + super(SecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs) + + +class GlobusSecureShell(SecureShell): + + def __init__(self, rsh='gsissh', rcp='gsiscp', **kwargs): + super(GlobusSecureShell, self).__init__(rsh=rsh, rcp=rcp, **kwargs) diff -r a014c2d841a8490f017086c687997a191f3a602f -r 9eb2b42e21fee519b1ea5e9c791bb6b8989f6b5c lib/galaxy/jobs/runners/util/kill.py --- /dev/null +++ b/lib/galaxy/jobs/runners/util/kill.py @@ -0,0 +1,65 @@ +import os +from platform import system +from time import sleep +from subprocess import Popen + +try: + from psutil import Process, NoSuchProcess +except ImportError: + """ Don't make psutil a strict requirement, but use if available. """ + Process = None + + +def kill_pid(pid, use_psutil=True): + if use_psutil and Process: + _psutil_kill_pid(pid) + else: + _stock_kill_pid(pid) + + +def _psutil_kill_pid(pid): + """ + http://stackoverflow.com/questions/1230669/subprocess-deleting-child-process... + """ + try: + parent = Process(pid) + for child in parent.get_children(recursive=True): + child.kill() + parent.kill() + except NoSuchProcess: + return + + +def _stock_kill_pid(pid): + is_windows = system() == 'Windows' + + if is_windows: + __kill_windows(pid) + else: + __kill_posix(pid) + + +def __kill_windows(pid): + try: + Popen("taskkill /F /T /PID %i" % pid, shell=True) + except Exception: + pass + + +def __kill_posix(pid): + def __check_pid(): + try: + os.kill(pid, 0) + return True + except OSError: + return False + + if __check_pid(): + for sig in [15, 9]: + try: + os.killpg(pid, sig) + except OSError: + return + sleep(1) + if not __check_pid(): + return Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
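For context on how the backported pieces fit together, here is a minimal usage sketch (not part of the changeset) that roughly mirrors what ShellJobRunner now does internally: destination parameters are split with split_params, the shell and job plugins are resolved through CliInterface, and commands are run through the shell plugin. The plugin names (SecureShell, SlurmTorque, LocalShell) come from this changeset; the hostname, paths, resource list, and job name below are hypothetical placeholders.

# Illustrative sketch only -- assumes Galaxy's lib/ is on sys.path and the
# current working directory is the Galaxy root (CliInterface globs under 'lib/').
from galaxy.jobs.runners.util.cli import CliInterface, split_params

# Destination params as the cli runner receives them; the 'shell_' and 'job_'
# prefixes route each value to the shell plugin or job plugin respectively.
# Omitting shell_plugin falls back to DEFAULT_SHELL_PLUGIN, 'LocalShell'.
params = {
    'shell_plugin': 'SecureShell',              # submit host reached over SSH (hypothetical)
    'shell_hostname': 'cluster.example.org',    # hypothetical
    'job_plugin': 'SlurmTorque',                # new Slurm torque-compatibility executor
    'job_Resource_List': 'walltime=24:00:00',   # hypothetical; mapped to a '#PBS -l' header
}

cli_interface = CliInterface()
shell_params, job_params = split_params(params)
shell, job_interface = cli_interface.get_plugins(shell_params, job_params)

# Extra keyword arguments (e.g. '#PBS ...' header lines) handed to the shared
# job script module; GALAXY_SLOTS, env directives and job metrics come from there.
script_kwargs = job_interface.job_script_kwargs('/tmp/job.o', '/tmp/job.e', 'g1_example')

# Submit the generated script and poll its state through the same shell plugin.
submit_result = shell.execute(job_interface.submit('/tmp/galaxy_1.sh'))
if submit_result.returncode == 0:
    external_id = submit_result.stdout.strip()
    status_output = shell.execute(job_interface.get_single_status(external_id)).stdout
    state = job_interface.parse_single_status(status_output, external_id)

In a job_conf.xml destination for the cli runner the same values would typically appear as <param id="shell_plugin">, <param id="job_plugin">, and so on, since parse_destination_params now simply delegates to split_params.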