commit/galaxy-central: 3 new changesets
3 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/37c0fd353dd7/ changeset: 37c0fd353dd7 user: natefoo date: 2013-02-07 21:31:09 summary: Define and implement job destinations, an improved method for defining resources on which Galaxy will run jobs. Functionality is similar to the current system configured via "URLs" in the Galaxy config file, but job concurrency limits on a per-destination basis works (whereas per-URL limits were broken). Sample configs for the new destination system can be found in job_conf.xml.sample_*. Also, renamed ClusterJobRunner to AsynchronousJobRunner, moved worker threads to BaseJobRunner and significantly expanded both of these classes. Updated the tasks, local, lwr, and pbs runners to use destinations, and to inherit from AsynchronousJobRunner, drmaa coming shortly. This should be entirely backwards-compatible, so if you update to this revision and change nothing, your jobs should all still run exactly as they have before. Regardless, it is not recommended that you upgrade your Galaxy instance with jobs running unless you are willing to risk losing those jobs. affected #: 25 files diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf job_conf.xml.sample_advanced --- /dev/null +++ b/job_conf.xml.sample_advanced @@ -0,0 +1,65 @@ +<?xml version="1.0"?> +<job_conf> + <plugins workers="4"> + <!-- "workers" is the number of threads for the runner's work queue. + The default from <plugins> is used if not defined for a <plugin>. + --> + <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner"/> + <plugin id="pbs" type="runner" load="galaxy.jobs.runners.pbs:PBSJobRunner" workers="2"/> + <plugin id="gridengine" type="runner" load="galaxy.jobs.runners.drmaa:DRMAARunner"/> + </plugins> + <handlers default="handlers"> + <!-- Additional job handlers - the id should match the name of a + [server:<id>] in universe_wsgi.ini. + --> + <handler id="handler0" tags="handlers"/> + <handler id="handler1" tags="handlers"/> + <handler id="special_handler0" tags="special_handlers"/> + <handler id="special_handler1" tags="special_handlers"/> + <handler id="trackster_handler"/> + </handlers> + <destinations default="local"> + <!-- Destinations define details about remote resources and how jobs + should be executed on those remote resources. + --> + <destination id="local" runner="local"/> + <destination id="pbs" runner="pbs" tags="mycluster"/> + <destination id="pbs_longjobs" runner="pbs" tags="mycluster,longjobs"> + <!-- Define parameters that are native to the job runner plugin. --> + <param id="Execution_Time">72:00:00</param> + </destination> + <destination id="remote_cluster" runner="drmaa" tags="longjobs"/> + <destination id="real_user_cluster" runner="drmaa"> + <!-- TODO: The real user options should maybe not be considered runner params. --> + <param id="galaxy_external_runjob_script">scripts/drmaa_external_runner.py</param> + <param id="galaxy_external_killjob_script">scripts/drmaa_external_killer.py</param> + <param id="galaxy_external_chown_script">scripts/external_chown_script.py</param> + </destination> + <destination id="dynamic" runner="dynamic"> + <!-- A destination that represents a method in the dynamic runner. --> + <param id="type">python</param> + <param id="function">foo</param> + </destination> + </destinations> + <tools> + <!-- Tools can be configured to use specific destinations or handlers, + identified by either the "id" or "tags" attribute. 
If assigned to + a tag, a handler or destination that matches that tag will be + chosen at random. + --> + <tool id="foo" handler="trackster_handler"> + <param id="source">trackster</param> + </tool> + <tool id="bar" destination="dynamic"/> + <tool id="baz" handler="special_handlers" destination="bigmem"/> + </tools> + <limits> + <!-- Certain limits can be defined. --> + <limit type="registered_user_concurrent_jobs">2</limit> + <limit type="unregistered_user_concurrent_jobs">1</limit> + <limit type="job_walltime">24:00:00</limit> + <limit type="concurrent_jobs" id="local">1</limit> + <limit type="concurrent_jobs" tag="mycluster">2</limit> + <limit type="concurrent_jobs" tag="longjobs">1</limit> + </limits> +</job_conf> diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf job_conf.xml.sample_basic --- /dev/null +++ b/job_conf.xml.sample_basic @@ -0,0 +1,13 @@ +<?xml version="1.0"?> +<!-- A sample job config that explicitly configures job running the way it is configured by default (if there is no explicit config). --> +<job_conf> + <plugins> + <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="4"/> + </plugins> + <handlers> + <handler id="main"/> + </handlers> + <destinations> + <destination id="local" runner="local"/> + </destinations> +</job_conf> diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/app.py --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -86,6 +86,8 @@ self.tool_data_tables.load_from_config_file( config_filename=self.config.shed_tool_data_table_config, tool_data_path=self.tool_data_tables.tool_data_path, from_shed_config=True ) + # Initialize the job management configuration + self.job_config = jobs.JobConfiguration(self) # Initialize the tools, making sure the list of tool configs includes the reserved migrated_tools_conf.xml file. 
tool_configs = self.config.tool_configs if self.config.migrated_tools_config not in tool_configs: diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -91,6 +91,7 @@ self.collect_outputs_from = [ x.strip() for x in kwargs.get( 'collect_outputs_from', 'new_file_path,job_working_directory' ).lower().split(',') ] self.template_path = resolve_path( kwargs.get( "template_path", "templates" ), self.root ) self.template_cache = resolve_path( kwargs.get( "template_cache_path", "database/compiled_templates" ), self.root ) + self.job_config_file = resolve_path( kwargs.get( 'job_config_file', 'job_conf.xml' ), self.root ) self.local_job_queue_workers = int( kwargs.get( "local_job_queue_workers", "5" ) ) self.cluster_job_queue_workers = int( kwargs.get( "cluster_job_queue_workers", "3" ) ) self.job_queue_cleanup_interval = int( kwargs.get("job_queue_cleanup_interval", "5") ) @@ -111,8 +112,8 @@ self.smtp_server = kwargs.get( 'smtp_server', None ) self.smtp_username = kwargs.get( 'smtp_username', None ) self.smtp_password = kwargs.get( 'smtp_password', None ) - self.track_jobs_in_database = kwargs.get( 'track_jobs_in_database', None ) - self.start_job_runners = kwargs.get( 'start_job_runners', None ) + self.track_jobs_in_database = kwargs.get( 'track_jobs_in_database', 'None' ) + self.start_job_runners = listify(kwargs.get( 'start_job_runners', '' )) self.expose_dataset_path = string_as_bool( kwargs.get( 'expose_dataset_path', 'False' ) ) # External Service types used in sample tracking self.external_service_type_config_file = resolve_path( kwargs.get( 'external_service_type_config_file', 'external_service_types_conf.xml' ), self.root ) @@ -123,8 +124,8 @@ # The transfer manager and deferred job queue self.enable_beta_job_managers = string_as_bool( kwargs.get( 'enable_beta_job_managers', 'False' ) ) # Per-user Job concurrency limitations + self.cache_user_job_count = string_as_bool( kwargs.get( 'cache_user_job_count', False ) ) self.user_job_limit = int( kwargs.get( 'user_job_limit', 0 ) ) - # user_job_limit for backwards-compatibility self.registered_user_job_limit = int( kwargs.get( 'registered_user_job_limit', self.user_job_limit ) ) self.anonymous_user_job_limit = int( kwargs.get( 'anonymous_user_job_limit', self.user_job_limit ) ) self.default_cluster_job_runner = kwargs.get( 'default_cluster_job_runner', 'local:///' ) @@ -214,28 +215,20 @@ # Crummy, but PasteScript does not give you a way to determine this if arg.lower().startswith('--server-name='): self.server_name = arg.split('=', 1)[-1] + # Store all configured server names + self.server_names = [] + for section in global_conf_parser.sections(): + if section.startswith('server:'): + self.server_names.append(section.replace('server:', '', 1)) # Store advanced job management config self.job_manager = kwargs.get('job_manager', self.server_name).strip() self.job_handlers = [ x.strip() for x in kwargs.get('job_handlers', self.server_name).split(',') ] self.default_job_handlers = [ x.strip() for x in kwargs.get('default_job_handlers', ','.join( self.job_handlers ) ).split(',') ] - # parse the [galaxy:job_limits] section - self.job_limits = {} - try: - job_limits = global_conf_parser.items( 'galaxy:job_limits' ) - for k, v in job_limits: - # Since the URL contains a colon and possibly an equals sign, consider ' = ' the delimiter - more_k, v = v.split(' = ', 1) - k = '%s:%s' % (k, more_k.strip()) - v = v.strip().rsplit(None, 1) - v[1] 
= int(v[1]) - self.job_limits[k] = v - except ConfigParser.NoSectionError: - pass - # Use database for IPC unless this is a standalone server (or multiple servers doing self dispatching in memory) - if self.track_jobs_in_database is None or self.track_jobs_in_database == "None": - self.track_jobs_in_database = True - if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ): - self.track_jobs_in_database = False + # Use database for job running IPC unless this is a standalone server or explicitly set in the config + if self.track_jobs_in_database == 'None': + self.track_jobs_in_database = False + if len(self.server_names) > 1: + self.track_jobs_in_database = True else: self.track_jobs_in_database = string_as_bool( self.track_jobs_in_database ) # Store per-tool runner configs diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -6,13 +6,17 @@ import sys import pwd import time +import copy +import random import logging +import datetime import threading import traceback import subprocess import galaxy from galaxy import util, model +from galaxy.util.bunch import Bunch from galaxy.datatypes.tabular import * from galaxy.datatypes.interval import * # tabular/interval imports appear to be unused. Clean up? @@ -22,6 +26,7 @@ from galaxy.jobs.actions.post import ActionBox from galaxy.exceptions import ObjectInvalid from galaxy.jobs.mapper import JobRunnerMapper +from galaxy.jobs.runners import BaseJobRunner log = logging.getLogger( __name__ ) @@ -47,6 +52,486 @@ self.condition.notify() self.condition.release() +class JobDestination( Bunch ): + """ + Provides details about where a job runs + """ + def __init__(self, **kwds): + self['id'] = None + self['url'] = None + self['tags'] = None + self['runner'] = None + self['legacy'] = False + self['converted'] = False + # dict is appropriate (rather than a bunch) since keys may not be valid as attributes + self['params'] = dict() + super(JobDestination, self).__init__(**kwds) + + # Store tags as a list + if self.tags is not None: + self['tags'] = [ x.strip() for x in self.tags.split(',') ] + +class JobToolConfiguration( Bunch ): + """ + Provides details on what handler and destination a tool should use + + A JobToolConfiguration will have the required attribute 'id' and optional + attributes 'handler', 'destination', and 'params' + """ + def __init__(self, **kwds): + self['handler'] = None + self['destination'] = None + self['params'] = dict() + super(JobToolConfiguration, self).__init__(**kwds) + +class JobConfiguration( object ): + """A parser and interface to advanced job management features. + + These features are configured in the job configuration, by default, ``job_conf.xml`` + """ + DEFAULT_NWORKERS = 4 + def __init__(self, app): + """Parse the job configuration XML. 
+ """ + self.app = app + self.runner_plugins = [] + self.handlers = {} + self.default_handler_id = None + self.destinations = {} + self.destination_tags = {} + self.default_destination_id = None + self.tools = {} + self.limits = Bunch() + + # Initialize the config + try: + tree = util.parse_xml(self.app.config.job_config_file) + self.__parse_job_conf_xml(tree) + except IOError: + log.warning( 'Job configuration "%s" does not exist, using legacy job configuration from Galaxy config file "%s" instead' % ( self.app.config.job_config_file, self.app.config.config_file ) ) + self.__parse_job_conf_legacy() + + def __parse_job_conf_xml(self, tree): + """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml). + + :param tree: Object representing the root ``<job_conf>`` object in the job config file. + :type tree: ``xml.etree.ElementTree.Element`` + """ + root = tree.getroot() + log.debug('Loading job configuration from %s' % self.app.config.job_config_file) + + # Parse job plugins + plugins = root.find('plugins') + if plugins is not None: + for plugin in self.__findall_with_required(plugins, 'plugin', ('id', 'type', 'load')): + if plugin.get('type') == 'runner': + workers = plugin.get('workers', plugins.get('workers', JobConfiguration.DEFAULT_NWORKERS)) + self.runner_plugins.append(dict(id=plugin.get('id'), load=plugin.get('load'), workers=int(workers))) + else: + log.error('Unknown plugin type: %s' % plugin.get('type')) + # Load tasks if configured + if self.app.config.use_tasked_jobs: + self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers)) + + # Parse handlers + handlers = root.find('handlers') + if handlers is not None: + for handler in self.__findall_with_required(handlers, 'handler'): + id = handler.get('id') + if id in self.handlers: + log.error("Handler '%s' overlaps handler with the same name, ignoring" % id) + else: + log.debug("Read definition for handler '%s'" % id) + self.handlers[id] = (id,) + if handler.get('tags', None) is not None: + for tag in [ x.strip() for x in handler.get('tags').split(',') ]: + if tag in self.handlers: + self.handlers[tag].append(id) + else: + self.handlers[tag] = [id] + + # Determine the default handler(s) + self.default_handler_id = self.__get_default(handlers, self.handlers.keys()) + + # Parse destinations + destinations = root.find('destinations') + for destination in self.__findall_with_required(destinations, 'destination', ('id', 'runner')): + id = destination.get('id') + job_destination = JobDestination(**dict(destination.items())) + job_destination['params'] = self.__get_params(destination) + self.destinations[id] = (job_destination,) + if job_destination.tags is not None: + for tag in job_destination.tags: + if tag not in self.destinations: + self.destinations[tag] = [] + self.destinations[tag].append(job_destination) + + # Determine the default destination + self.default_destination_id = self.__get_default(destinations, self.destinations.keys()) + + # Parse tool mappings + tools = root.find('tools') + if tools is not None: + for tool in self.__findall_with_required(tools, 'tool'): + # There can be multiple definitions with identical ids, but different params + id = tool.get('id') + if id not in self.tools: + self.tools[id] = list() + self.tools[id].append(JobToolConfiguration(**dict(tool.items()))) + self.tools[id][-1]['params'] = self.__get_params(tool) + + self.limits = Bunch(registered_user_concurrent_jobs = None, + anonymous_user_concurrent_jobs = 
None, + walltime = None, + walltime_delta = None, + output_size = None, + concurrent_jobs = {}) + + # Parse job limits + limits = root.find('limits') + if limits is not None: + for limit in self.__findall_with_required(limits, 'limit', ('type',)): + type = limit.get('type') + if type == 'concurrent_jobs': + id = limit.get('tag', None) or limit.get('id') + self.limits.concurrent_jobs[id] = int(limit.text) + elif limit.text: + self.limits.__dict__[type] = limit.text + + if self.limits.walltime is not None: + h, m, s = [ int( v ) for v in self.limits.walltime.split( ':' ) ] + self.limits.walltime_delta = datetime.timedelta( 0, s, 0, 0, m, h ) + + log.debug('Done loading job configuration') + + def __parse_job_conf_legacy(self): + """Loads the old-style job configuration from options in the galaxy config file (by default, universe_wsgi.ini). + """ + log.debug('Loading job configuration from %s' % self.app.config.config_file) + + # Always load local and lwr + self.runner_plugins = [dict(id='local', load='local', workers=self.app.config.local_job_queue_workers), dict(id='lwr', load='lwr', workers=self.app.config.cluster_job_queue_workers)] + # Load tasks if configured + if self.app.config.use_tasked_jobs: + self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers)) + for runner in self.app.config.start_job_runners: + self.runner_plugins.append(dict(id=runner, load=runner, workers=self.app.config.cluster_job_queue_workers)) + + # Set the handlers + for id in self.app.config.job_handlers: + self.handlers[id] = (id,) + + self.handlers['default_job_handlers'] = self.app.config.default_job_handlers + self.default_handler_id = 'default_job_handlers' + + # Set tool handler configs + for id, tool_handlers in self.app.config.tool_handlers.items(): + self.tools[id] = list() + for handler_config in tool_handlers: + # rename the 'name' key to 'handler' + handler_config['handler'] = handler_config.pop('name') + self.tools[id].append(JobToolConfiguration(**handler_config)) + + # Set tool runner configs + for id, tool_runners in self.app.config.tool_runners.items(): + # Might have been created in the handler parsing above + if id not in self.tools: + self.tools[id] = list() + for runner_config in tool_runners: + url = runner_config['url'] + if url not in self.destinations: + # Create a new "legacy" JobDestination - it will have its URL converted to a destination params once the appropriate plugin has loaded + self.destinations[url] = (JobDestination(id=url, runner=url.split(':', 1)[0], url=url, legacy=True, converted=False),) + for tool_conf in self.tools[id]: + if tool_conf.params == runner_config.get('params', {}): + tool_conf['destination'] = url + break + else: + # There was not an existing config (from the handlers section) with the same params + # rename the 'url' key to 'destination' + runner_config['destination'] = runner_config.pop('url') + self.tools[id].append(JobToolConfiguration(**runner_config)) + + self.destinations[self.app.config.default_cluster_job_runner] = (JobDestination(id=self.app.config.default_cluster_job_runner, runner=self.app.config.default_cluster_job_runner.split(':', 1)[0], url=self.app.config.default_cluster_job_runner, legacy=True, converted=False),) + self.default_destination_id = self.app.config.default_cluster_job_runner + + # Set the job limits + self.limits = Bunch(registered_user_concurrent_jobs = self.app.config.registered_user_job_limit, + anonymous_user_concurrent_jobs = self.app.config.anonymous_user_job_limit, + 
walltime = self.app.config.job_walltime, + walltime_delta = self.app.config.job_walltime_delta, + output_size = self.app.config.output_size_limit, + concurrent_jobs = {}) + + log.debug('Done loading job configuration') + + def __get_default(self, parent, names): + """Returns the default attribute set in a parent tag like <handlers> or <destinations>, or return the ID of the child, if there is no explicit default and only one child. + + :param parent: Object representing a tag that may or may not have a 'default' attribute. + :type parent: ``xml.etree.ElementTree.Element`` + :param names: The list of destination or handler IDs or tags that were loaded. + :type names: list of str + + :returns: str -- id or tag representing the default. + """ + rval = parent.get('default') + if rval is not None: + # If the parent element has a 'default' attribute, use the id or tag in that attribute + if rval not in names: + raise Exception("<%s> default attribute '%s' does not match a defined id or tag in a child element" % (parent.tag, rval)) + log.debug("<%s> default set to child with id or tag '%s'" % (parent.tag, rval)) + elif len(names) == 1: + log.info("Setting <%s> default to child with id '%s'" % (parent.tag, names[0])) + rval = names[0] + else: + raise Exception("No <%s> default specified, please specify a valid id or tag with the 'default' attribute" % parent.tag) + return rval + + def __findall_with_required(self, parent, match, attribs=None): + """Like ``xml.etree.ElementTree.Element.findall()``, except only returns children that have the specified attribs. + + :param parent: Parent element in which to find. + :type parent: ``xml.etree.ElementTree.Element`` + :param match: Name of child elements to find. + :type match: str + :param attribs: List of required attributes in children elements. + :type attribs: list of str + + :returns: list of ``xml.etree.ElementTree.Element`` + """ + rval = [] + if attribs is None: + attribs = ('id',) + for elem in parent.findall(match): + for attrib in attribs: + if attrib not in elem.attrib: + log.warning("required '%s' attribute is missing from <%s> element" % (attrib, match)) + break + else: + rval.append(elem) + return rval + + def __get_params(self, parent): + """Parses any child <param> tags in to a dictionary suitable for persistence. + + :param parent: Parent element in which to find child <param> tags. + :type parent: ``xml.etree.ElementTree.Element`` + + :returns: dict + """ + rval = {} + for param in parent.findall('param'): + rval[param.get('id')] = param.text + return rval + + @property + def default_job_tool_configuration(self): + """The default JobToolConfiguration, used if a tool does not have an explicit defintion in the configuration. It consists of a reference to the default handler and default destination. + + :returns: JobToolConfiguration -- a representation of a <tool> element that uses the default handler and destination + """ + return JobToolConfiguration(id='default', handler=self.default_handler_id, destination=self.default_destination_id) + + # Called upon instantiation of a Tool object + def get_job_tool_configurations(self, ids): + """Get all configured JobToolConfigurations for a tool ID, or, if given a list of IDs, the JobToolConfigurations for the first id in ``ids`` matching a tool definition. + + .. note:: + + You should not mix tool shed tool IDs, versionless tool shed IDs, and tool config tool IDs that refer to the same tool. + + :param ids: Tool ID or IDs to fetch the JobToolConfiguration of. + :type ids: list or str. 
+ :returns: list -- JobToolConfiguration Bunches representing <tool> elements matching the specified ID(s). + + Example tool ID strings include: + + * Full tool shed id: ``toolshed.example.org/repos/nate/filter_tool_repo/filter_tool/1.0.0`` + * Tool shed id less version: ``toolshed.example.org/repos/nate/filter_tool_repo/filter_tool`` + * Tool config tool id: ``filter_tool`` + """ + rval = [] + # listify if ids is a single (string) id + ids = util.listify(ids) + for id in ids: + if id in self.tools: + # If a tool has definitions that include job params but not a + # definition for jobs without params, include the default + # config + for job_tool_configuration in self.tools[id]: + if not job_tool_configuration.params: + break + else: + rval.append(self.default_job_tool_configuration) + rval.extend(self.tools[id]) + break + else: + rval.append(self.default_job_tool_configuration) + return rval + + def __get_single_item(self, collection): + """Given a collection of handlers or destinations, return one item from the collection at random. + """ + # Done like this to avoid random under the assumption it's faster to avoid it + if len(collection) == 1: + return collection[0] + else: + return random.choice(collection) + + # This is called by Tool.get_job_handler() + def get_handler(self, id_or_tag): + """Given a handler ID or tag, return the provided ID or an ID matching the provided tag + + :param id_or_tag: A handler ID or tag. + :type id_or_tag: str + + :returns: str -- A valid job handler ID. + """ + return self.__get_single_item(self.handlers[id_or_tag]) + + def get_destination(self, id_or_tag): + """Given a destination ID or tag, return the JobDestination matching the provided ID or tag + + :param id_or_tag: A destination ID or tag. + :type id_or_tag: str + + :returns: JobDestination -- A valid destination + + Destinations are deepcopied as they are expected to be passed in to job + runners, which will modify them for persisting params set at runtime. + """ + return copy.deepcopy(self.__get_single_item(self.destinations[id_or_tag])) + + def get_destinations(self, id_or_tag): + """Given a destination ID or tag, return all JobDestinations matching the provided ID or tag + + :param id_or_tag: A destination ID or tag. + :type id_or_tag: str + + :returns: list or tuple of JobDestinations + + Destinations are not deepcopied, so they should not be passed to + anything which might modify them. + """ + return self.destinations.get(id_or_tag, None) + + def get_job_runner_plugins(self): + """Load all configured job runner plugins + + :returns: list of job runner plugins + """ + rval = {} + for runner in self.runner_plugins: + class_names = [] + module = None + id = runner['id'] + load = runner['load'] + if ':' in load: + # Name to load was specified as '<module>:<class>' + module_name, class_name = load.rsplit(':', 1) + class_names = [ class_name ] + module = __import__( module_name ) + else: + # Name to load was specified as '<module>' + if '.' not in load: + # For legacy reasons, try from galaxy.jobs.runners first if there's no '.' in the name + module_name = 'galaxy.jobs.runners.' + load + try: + module = __import__( module_name ) + except ImportError: + # No such module, we'll retry without prepending galaxy.jobs.runners. + # All other exceptions (e.g. something wrong with the module code) will raise + pass + if module is None: + # If the name included a '.' 
or loading from the static runners path failed, try the original name + module = __import__( load ) + module_name = load + if module is None: + # Module couldn't be loaded, error should have already been displayed + continue + for comp in module_name.split( "." )[1:]: + module = getattr( module, comp ) + if not class_names: + # If there's not a ':', we check <module>.__all__ for class names + try: + assert module.__all__ + class_names = module.__all__ + except AssertionError: + log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % load ) + continue + for class_name in class_names: + runner_class = getattr( module, class_name ) + try: + assert issubclass(runner_class, BaseJobRunner) + except TypeError: + log.warning("A non-class name was found in __all__, ignoring: %s" % id) + continue + except AssertionError: + log.warning("Job runner classes must be subclassed from BaseJobRunner, %s has bases: %s" % (id, runner_class.__bases__)) + continue + try: + rval[id] = runner_class( self.app, runner['workers'] ) + except TypeError: + log.warning( "Job runner '%s:%s' has not been converted to a new-style runner" % ( module_name, class_name ) ) + rval[id] = runner_class( self.app ) + log.debug( "Loaded job runner '%s:%s' as '%s'" % ( module_name, class_name, id ) ) + return rval + + def is_id(self, collection): + """Given a collection of handlers or destinations, indicate whether the collection represents a tag or a real ID + + :param collection: A representation of a destination or handler + :type collection: tuple or list + + :returns: bool + """ + return type(collection) == tuple + + def is_tag(self, collection): + """Given a collection of handlers or destinations, indicate whether the collection represents a tag or a real ID + + :param collection: A representation of a destination or handler + :type collection: tuple or list + + :returns: bool + """ + return type(collection) == list + + def is_handler(self, server_name): + """Given a server name, indicate whether the server is a job handler + + :param server_name: The name to check + :type server_name: str + + :return: bool + """ + for collection in self.handlers.values(): + if server_name in collection: + return True + return False + + def convert_legacy_destinations(self, job_runners): + """Converts legacy (from a URL) destinations to contain the appropriate runner params defined in the URL. + + :param job_runners: All loaded job runner plugins. 
+ :type job_runners: list of job runner plugins + """ + for id, destination in [ ( id, destinations[0] ) for id, destinations in self.destinations.items() if self.is_id(destinations) ]: + # Only need to deal with real destinations, not members of tags + if destination.legacy and not destination.converted: + if destination.runner in job_runners: + destination.params = job_runners[destination.runner].url_to_destination(destination.url).params + destination.converted = True + if destination.params: + log.debug("Legacy destination with id '%s', url '%s' converted, got params:" % (id, destination.url)) + for k, v in destination.params.items(): + log.debug(" %s: %s" % (k, v)) + else: + log.debug("Legacy destination with id '%s', url '%s' converted, got params:" % (id, destination.url)) + else: + log.warning("Legacy destination with id '%s' could not be converted: Unknown runner plugin: %s" % (id, destination.runner)) + class JobWrapper( object ): """ Wraps a 'model.Job' with convenience methods for running processes and @@ -81,7 +566,7 @@ self.tool_provided_job_metadata = None # Wrapper holding the info required to restore and clean up from files used for setting metadata externally self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) - self.job_runner_mapper = JobRunnerMapper( self, job.job_runner_name ) + self.job_runner_mapper = JobRunnerMapper( self, queue.dispatcher.url_to_destination ) self.params = None if job.params: self.params = from_json_string( job.params ) @@ -94,7 +579,8 @@ return self.app.config.use_tasked_jobs and self.tool.parallelism def get_job_runner_url( self ): - return self.job_runner_mapper.get_job_runner_url( self.params ) + log.warning('(%s) Job runner URLs are deprecated, use destinations instead.' % self.job_id) + return self.job_destination.url def get_parallelism(self): return self.tool.parallelism @@ -102,6 +588,20 @@ # legacy naming get_job_runner = get_job_runner_url + @property + def job_destination(self): + """Return the JobDestination that this job will use to run. This will + either be a configured destination, a randomly selected destination if + the configured destination was a tag, or a dynamically generated + destination from the dynamic runner. + + Calling this method for the first time causes the dynamic runner to do + its calculation, if any. + + :returns: ``JobDestination`` + """ + return self.job_runner_mapper.get_job_destination(self.params) + def get_job( self ): return self.sa_session.query( model.Job ).get( self.job_id ) @@ -321,11 +821,24 @@ return job.state def set_runner( self, runner_url, external_id ): + log.warning('set_runner() is deprecated, use set_job_destination()') + self.set_job_destination(self.job_destination, external_id) + + def set_job_destination(self, job_destination, external_id): + """ + Persist job destination params in the database for recovery. + + self.job_destination is not used because a runner may choose to rewrite + parts of the destination (e.g. the params). 
+ """ job = self.get_job() - self.sa_session.refresh( job ) - job.job_runner_name = runner_url + self.sa_session.refresh(job) + log.debug('(%s) Persisting job destination (destination id: %s)' % (job.id, job_destination.id)) + job.destination_id = job_destination.id + job.destination_params = job_destination.params + job.job_runner_name = job_destination.runner job.job_runner_external_id = external_id - self.sa_session.add( job ) + self.sa_session.add(job) self.sa_session.flush() def finish( self, stdout, stderr, tool_exit_code=None ): @@ -699,6 +1212,28 @@ except: log.exception( "Unable to cleanup job %d" % self.job_id ) + def get_output_sizes( self ): + sizes = [] + output_paths = self.get_output_fnames() + for outfile in [ str( o ) for o in output_paths ]: + if os.path.exists( outfile ): + sizes.append( ( outfile, os.stat( outfile ).st_size ) ) + else: + sizes.append( ( outfile, 0 ) ) + return sizes + + def check_limits(self, runtime=None): + if self.app.job_config.limits.output_size > 0: + for outfile, size in self.get_output_sizes(): + if size > self.app.config.output_size_limit: + log.warning( '(%s) Job output %s is over the output size limit' % ( self.get_id_tag(), os.path.basename( outfile ) ) ) + return 'Job output file grew too large (greater than %s), please try different inputs or parameters' % util.nice_size( self.app.job_config.limits.output_size ) + if self.app.job_config.limits.walltime_delta is not None and runtime is not None: + if runtime > self.app.job_config.limits.walltime_delta: + log.warning( '(%s) Job has reached walltime, it will be terminated' % ( self.get_id_tag() ) ) + return 'Job ran longer than the maximum allowed execution time (%s), please try different inputs or parameters' % self.app.job_config.limits.walltime + return None + def get_command_line( self ): return self.command_line @@ -825,16 +1360,6 @@ return ExpressionContext( meta, job_context ) return job_context - def check_output_sizes( self ): - sizes = [] - output_paths = self.get_output_fnames() - for outfile in [ str( o ) for o in output_paths ]: - if os.path.exists( outfile ): - sizes.append( ( outfile, os.stat( outfile ).st_size ) ) - else: - sizes.append( ( outfile, 0 ) ) - return sizes - def setup_external_metadata( self, exec_dir=None, tmp_dir=None, dataset_files_path=None, config_root=None, config_file=None, datatypes_config=None, set_extension=True, **kwds ): # extension could still be 'auto' if this is the upload tool. job = self.get_job() @@ -1148,16 +1673,6 @@ # Handled at the parent job level. Do nothing here. pass - def check_output_sizes( self ): - sizes = [] - output_paths = self.get_output_fnames() - for outfile in [ str( o ) for o in output_paths ]: - if os.path.exists( outfile ): - sizes.append( ( outfile, os.stat( outfile ).st_size ) ) - else: - sizes.append( ( outfile, 0 ) ) - return sizes - def setup_external_metadata( self, exec_dir=None, tmp_dir=None, dataset_files_path=None, config_root=None, config_file=None, datatypes_config=None, set_extension=True, **kwds ): # There is no metadata setting for tasks. This is handled after the merge, at the job level. 
return "" diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -11,7 +11,7 @@ from sqlalchemy.sql.expression import and_, or_, select, func from galaxy import util, model -from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper +from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper, JobDestination log = logging.getLogger( __name__ ) @@ -51,6 +51,9 @@ self.sa_session = app.model.context self.track_jobs_in_database = self.app.config.track_jobs_in_database + # Initialize structures for handling job limits + self.__clear_user_job_count() + # Keep track of the pid that started the job manager, only it # has valid threads self.parent_pid = os.getpid() @@ -58,6 +61,8 @@ self.queue = Queue() # Contains jobs that are waiting (only use from monitor thread) self.waiting_jobs = [] + # Contains wrappers of jobs that are limited or ready (so they aren't created unnecessarily/multiple times) + self.job_wrappers = {} # Helper for interruptable sleep self.sleeper = Sleeper() self.running = True @@ -78,7 +83,7 @@ """ Checks all jobs that are in the 'new', 'queued' or 'running' state in the database and requeues or cleans up as necessary. Only run as the - job manager starts. + job handler starts. """ for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \ .filter( ( ( model.Job.state == model.Job.states.NEW ) \ @@ -88,17 +93,32 @@ if job.tool_id not in self.app.toolbox.tools_by_id: log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) ) JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' ) - elif job.job_runner_name is None or (job.job_runner_name is not None and job.job_runner_external_id is None): - if job.job_runner_name is None: - log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) ) + if job.job_runner_name is not None and job.job_runner_external_id is None: + # This could happen during certain revisions of Galaxy where a runner URL was persisted before the job was dispatched to a runner. 
+ log.debug( "(%s) Job runner assigned but no external ID recorded, adding to the job handler queue" % job.id ) + job.job_runner_name = None + if self.track_jobs_in_database: + job.state = model.Job.states.NEW else: - log.debug( "(%s) Job runner assigned but no external ID recorded, adding to the job handler queue" % job.id ) + self.queue.put( ( job.id, job.tool_id ) ) + elif job.job_runner_name is not None and job.job_runner_external_id is not None and job.destination_id is None: + # This is the first start after upgrading from URLs to destinations, convert the URL to a destination and persist + # TODO: test me extensively + job_wrapper = JobWrapper( job, self ) + job_wrapper.set_job_destination(self.dispatcher.url_to_destination(self.job_runner_name)) + self.dispatcher.recover( job, job_wrapper ) + log.info('(%s) Converted job from a URL to a destination and recovered' % (job.id)) + elif job.job_runner_name is None: + # Never (fully) dispatched + log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) ) if self.track_jobs_in_database: job.state = model.Job.states.NEW else: self.queue.put( ( job.id, job.tool_id ) ) else: + # Already dispatched and running job_wrapper = JobWrapper( job, self ) + job_wrapper.job_runner_mapper.cached_job_destination = JobDestination(id=job.destination_id, runner=job.job_runner_name, params=job.destination_params) self.dispatcher.recover( job, job_wrapper ) if self.sa_session.dirty: self.sa_session.flush() @@ -156,8 +176,6 @@ ~model.Job.table.c.id.in_(hda_not_ready), ~model.Job.table.c.id.in_(ldda_not_ready))) \ .order_by(model.Job.id).all() - # Ensure that we get new job counts on each iteration - self.__clear_user_job_count() else: # Get job objects and append to watch queue for any which were # previously waiting @@ -174,6 +192,8 @@ jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) except Empty: pass + # Ensure that we get new job counts on each iteration + self.__clear_user_job_count() # Iterate over new and waiting jobs and look for any that are # ready to run new_waiting_jobs = [] @@ -183,14 +203,13 @@ # Some of these states will only happen when using the in-memory job queue job_state = self.__check_if_ready_to_run( job ) if job_state == JOB_WAIT: - if not self.track_jobs_in_database: - new_waiting_jobs.append( job.id ) + new_waiting_jobs.append( job.id ) elif job_state == JOB_INPUT_ERROR: log.info( "(%d) Job unable to run: one or more inputs in error state" % job.id ) elif job_state == JOB_INPUT_DELETED: log.info( "(%d) Job unable to run: one or more inputs deleted" % job.id ) elif job_state == JOB_READY: - self.dispatcher.put( JobWrapper( job, self ) ) + self.dispatcher.put( self.job_wrappers.pop( job.id ) ) log.info( "(%d) Job dispatched" % job.id ) elif job_state == JOB_DELETED: log.info( "(%d) Job deleted by user while still queued" % job.id ) @@ -204,14 +223,20 @@ dataset_assoc.dataset.info = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run" self.sa_session.add( dataset_assoc.dataset.dataset ) self.sa_session.add( job ) + elif job_state == JOB_ERROR: + log.error( "(%d) Error checking job readiness" % job.id ) else: log.error( "(%d) Job in unknown state '%s'" % ( job.id, job_state ) ) - if not self.track_jobs_in_database: - new_waiting_jobs.append( job.id ) + new_waiting_jobs.append( job.id ) except Exception: log.exception( "failure running job %d" % job.id ) # Update the waiting list - 
self.waiting_jobs = new_waiting_jobs + if not self.track_jobs_in_database: + self.waiting_jobs = new_waiting_jobs + # Remove cached wrappers for any jobs that are no longer being tracked + for id in self.job_wrappers.keys(): + if id not in new_waiting_jobs: + del self.job_wrappers[id] # Flush, if we updated the state self.sa_session.flush() # Done with the session @@ -239,19 +264,34 @@ continue # don't run jobs for which the input dataset was deleted if idata.deleted: - JobWrapper( job, self ).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) + self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) return JOB_INPUT_DELETED # an error in the input data causes us to bail immediately elif idata.state == idata.states.ERROR: - JobWrapper( job, self ).fail( "input data %s is in error state" % ( idata.hid ) ) + self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s is in error state" % ( idata.hid ) ) return JOB_INPUT_ERROR elif idata.state == idata.states.FAILED_METADATA: - JobWrapper( job, self ).fail( "input data %s failed to properly set metadata" % ( idata.hid ) ) + self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) ) return JOB_INPUT_ERROR elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): # need to requeue return JOB_WAIT - state = self.__check_user_jobs( job ) + # Create the job wrapper so that the destination can be set + if job.id not in self.job_wrappers: + self.job_wrappers[job.id] = JobWrapper(job, self) + # Cause the job_destination to be set and cached by the mapper + try: + self.job_wrappers[job.id].job_destination + except Exception, e: + failure_message = getattr(e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE ) + if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE: + log.exception( 'Failed to generate job destination' ) + else: + log.debug( "Intentionally failing job with message (%s)" % failure_message ) + self.job_wrappers[job.id].fail( failure_message ) + return JOB_ERROR + # job is ready to run, check limits + state = self.__check_user_jobs( job, self.job_wrappers[job.id] ) if state == JOB_READY and self.app.config.enable_quotas: quota = self.app.quota_agent.get_quota( job.user ) if quota is not None: @@ -264,48 +304,114 @@ return state def __clear_user_job_count( self ): - self.user_job_count = {} - self.user_job_count_per_runner = {} + self.user_job_count = None + self.user_job_count_per_destination = None - def __check_user_jobs( self, job ): + def get_user_job_count(self, user_id): + self.__cache_user_job_count() + # This could have been incremented by a previous job dispatched on this iteration, even if we're not caching + rval = self.user_job_count.get(user_id, 0) + if not self.app.config.cache_user_job_count: + result = self.sa_session.execute(select([func.count(model.Job.table.c.id)]).where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id == user_id)))) + for row in result: + # there should only be one row + rval += row[0] + return rval + + def __cache_user_job_count( self ): + # Cache the job count if necessary + if self.user_job_count is None and self.app.config.cache_user_job_count: + 
self.user_job_count = {} + query = self.sa_session.execute(select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)]) \ + .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id is not None))) \ + .group_by(model.Job.table.c.user_id)) + for row in query: + self.user_job_count[row[0]] = row[1] + elif self.user_job_count is None: + self.user_job_count = {} + + def get_user_job_count_per_destination(self, user_id): + self.__cache_user_job_count_per_destination() + cached = self.user_job_count_per_destination.get(user_id, {}) + if self.app.config.cache_user_job_count: + rval = cached + else: + # The cached count is still used even when we're not caching, it is + # incremented when a job is run by this handler to ensure that + # multiple jobs can't get past the limits in one iteration of the + # queue. + rval = {} + rval.update(cached) + result = self.sa_session.execute(select([model.Job.table.c.destination_id, func.count(model.Job.table.c.destination_id).label('job_count')]) \ + .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id == user_id))) \ + .group_by(model.Job.table.c.destination_id)) + for row in result: + # Add the count from the database to the cached count + rval[row['destination_id']] = rval.get(row['destination_id'], 0) + row['job_count'] + return rval + + def __cache_user_job_count_per_destination(self): + # Cache the job count if necessary + if self.user_job_count_per_destination is None and self.app.config.cache_user_job_count: + self.user_job_count_per_destination = {} + result = self.sa_session.execute(select([model.Job.table.c.user_id, model.Job.table.c.destination_id, func.count(model.Job.table.c.user_id).label('job_count')]) \ + .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)))) \ + .group_by(model.Job.table.c.user_id, model.Job.table.c.destination_id)) + for row in result: + if row['user_id'] not in self.user_job_count_per_destination: + self.user_job_count_per_destination[row['user_id']] = {} + self.user_job_count_per_destination[row['user_id']][row['destination_id']] = row['job_count'] + elif self.user_job_count_per_destination is None: + self.user_job_count_per_destination = {} + + def increase_running_job_count(self, user_id, destination_id): + if self.user_job_count is None: + self.user_job_count = {} + if self.user_job_count_per_destination is None: + self.user_job_count_per_destination = {} + self.user_job_count[user_id] = self.user_job_count.get(user_id, 0) + 1 + if user_id not in self.user_job_count_per_destination: + self.user_job_count_per_destination[user_id] = {} + self.user_job_count_per_destination[user_id][destination_id] = self.user_job_count_per_destination[user_id].get(destination_id, 0) + 1 + + def __check_user_jobs( self, job, job_wrapper ): if job.user: # Check the hard limit first - if self.app.config.registered_user_job_limit: - # Cache the job count if necessary - if not self.user_job_count: - query = self.sa_session.execute(select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)]) \ - .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id is not None))) \ - .group_by(model.Job.table.c.user_id)) - for row in query: - self.user_job_count[row[0]] = row[1] - if self.user_job_count.get(job.user_id, 0) >= self.app.config.registered_user_job_limit: + if 
self.app.job_config.limits.registered_user_concurrent_jobs: + count = self.get_user_job_count(job.user_id) + # Check the user's number of dispatched jobs against the overall limit + if count >= self.app.job_config.limits.registered_user_concurrent_jobs: return JOB_WAIT - # If we pass the hard limit, also check the per-runner count - if job.job_runner_name in self.app.config.job_limits: - # Cache the job count if necessary - if job.job_runner_name not in self.user_job_count_per_runner: - self.user_job_count_per_runner[job.job_runner_name] = {} - query_url, limit = self.app.config.job_limits[job.job_runner_name] - base_query = select([model.Job.table.c.user_id, model.Job.table.c.job_runner_name, func.count(model.Job.table.c.user_id).label('job_count')]) \ - .where(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING))) \ - .group_by(model.Job.table.c.user_id, model.Job.table.c.job_runner_name) - if '%' in query_url or '_' in query_url: - subq = base_query.having(model.Job.table.c.job_runner_name.like(query_url)).alias('subq') - query = self.sa_session.execute(select([subq.c.user_id, func.sum(subq.c.job_count).label('job_count')]).group_by(subq.c.user_id)) - else: - query = self.sa_session.execute(base_query.having(model.Job.table.c.job_runner_name == query_url)) - for row in query: - self.user_job_count_per_runner[job.job_runner_name][row['user_id']] = row['job_count'] - if self.user_job_count_per_runner[job.job_runner_name].get(job.user_id, 0) >= self.app.config.job_limits[job.job_runner_name][1]: + # If we pass the hard limit, also check the per-destination count + id = job_wrapper.job_destination.id + count_per_id = self.get_user_job_count_per_destination(job.user_id) + if id in self.app.job_config.limits.concurrent_jobs: + count = count_per_id.get(id, 0) + # Check the user's number of dispatched jobs in the assigned destination id against the limit for that id + if count >= self.app.job_config.limits.concurrent_jobs[id]: return JOB_WAIT + # If we pass the destination limit (if there is one), also check limits on any tags (if any) + if job_wrapper.job_destination.tags: + for tag in job_wrapper.job_destination.tags: + # Check each tag for this job's destination + if tag in self.app.job_config.limits.concurrent_jobs: + # Only if there's a limit defined for this tag + count = 0 + for id in [ d.id for d in self.app.job_config.get_destinations(tag) ]: + # Add up the aggregate job total for this tag + count += count_per_id.get(id, 0) + if count >= self.app.job_config.limits.concurrent_jobs[tag]: + return JOB_WAIT + # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration + self.increase_running_job_count(job.user_id, id) elif job.galaxy_session: # Anonymous users only get the hard limit - if self.app.config.anonymous_user_job_limit: + if self.app.job_config.limits.anonymous_user_concurrent_jobs: count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ .filter( and_( model.Job.session_id == job.galaxy_session.id, or_( model.Job.state == model.Job.states.RUNNING, model.Job.state == model.Job.states.QUEUED ) ) ).count() - if count >= self.app.config.anonymous_user_job_limit: + if count >= self.app.job_config.limits.anonymous_user_concurrent_jobs: return JOB_WAIT else: log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' 
% job.id ) @@ -431,58 +537,41 @@ class DefaultJobDispatcher( object ): def __init__( self, app ): self.app = app - self.job_runners = {} - start_job_runners = ["local", "lwr"] - if app.config.start_job_runners is not None: - start_job_runners.extend( [ x.strip() for x in util.listify( app.config.start_job_runners ) ] ) - if app.config.use_tasked_jobs: - start_job_runners.append("tasks") - for name in start_job_runners: - self._load_plugin( name ) - log.debug( "Job runners: " + ':'.join( start_job_runners ) ) - - def _load_plugin( self, name ): - module_name = 'galaxy.jobs.runners.' + name - try: - module = __import__( module_name ) - except: - log.exception( 'Job runner is not loadable: %s' % module_name ) - return - for comp in module_name.split( "." )[1:]: - module = getattr( module, comp ) - if '__all__' not in dir( module ): - log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name ) - return - for obj in module.__all__: - display_name = ':'.join( ( module_name, obj ) ) - runner = getattr( module, obj ) - self.job_runners[name] = runner( self.app ) - log.debug( 'Loaded job runner: %s' % display_name ) + self.job_runners = self.app.job_config.get_job_runner_plugins() + # Once plugins are loaded, all job destinations that were created from + # URLs can have their URL params converted to the destination's param + # dict by the plugin. + self.app.job_config.convert_legacy_destinations(self.job_runners) + log.debug( "Loaded job runners plugins: " + ':'.join( self.job_runners.keys() ) ) def __get_runner_name( self, job_wrapper ): if job_wrapper.can_split(): runner_name = "tasks" else: - runner_name = ( job_wrapper.get_job_runner_url().split(":", 1) )[0] + runner_name = job_wrapper.job_destination.runner return runner_name + def url_to_destination( self, url ): + """This is used by the runner mapper (a.k.a. dynamic runner) and + recovery methods to have runners convert URLs to destinations. + + New-style runner plugin IDs must match the URL's scheme for this to work. 
+ """ + runner_name = url.split(':', 1)[0] + try: + return self.job_runners[runner_name].url_to_destination(url) + except Exception, e: + log.error("Unable to convert legacy job runner URL to job destination, destination will be the '%s' runner with no params: %s" % (runner_name, e)) + return JobDestination(runner=runner_name) + def put( self, job_wrapper ): - try: - runner_name = self.__get_runner_name( job_wrapper ) - except Exception, e: - failure_message = getattr(e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE ) - if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE: - log.exception( 'Failed to generate job runner name' ) - else: - log.debug( "Intentionally failing job with message (%s)" % failure_message ) - job_wrapper.fail( failure_message ) - return + runner_name = self.__get_runner_name( job_wrapper ) try: if isinstance(job_wrapper, TaskWrapper): #DBTODO Refactor - log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) ) + log.debug( "(%s) Dispatching task %s to %s runner" %( job_wrapper.job_id, job_wrapper.task_id, runner_name ) ) else: - log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) + log.debug( "(%s) Dispatching to %s runner" %( job_wrapper.job_id, runner_name ) ) self.job_runners[runner_name].put( job_wrapper ) except KeyError: log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/manager.py --- a/lib/galaxy/jobs/manager.py +++ b/lib/galaxy/jobs/manager.py @@ -26,257 +26,18 @@ """ def __init__( self, app ): self.app = app - self.job_handler = NoopHandler() - if self.app.config.server_name in self.app.config.job_handlers: + if (self.app.config.track_jobs_in_database and self.app.job_config.is_handler(self.app.config.server_name)) or not self.app.config.track_jobs_in_database: + log.debug("Starting job handler") self.job_handler = handler.JobHandler( app ) - if self.app.config.server_name == self.app.config.job_manager: - job_handler = NoopHandler() - # In the case that webapp == manager == handler, pass jobs in memory - if not self.app.config.track_jobs_in_database: - job_handler = self.job_handler - # Otherwise, even if the manager == one of the handlers, its handler will pick up jobs from the database - self.job_queue = JobManagerQueue( app, job_handler ) - self.job_stop_queue = JobManagerStopQueue( app, job_handler ) - if self.app.config.enable_beta_job_managers: - from galaxy.jobs.deferred import DeferredJobQueue - self.deferred_job_queue = DeferredJobQueue( app ) + self.job_queue = self.job_handler.job_queue + self.job_stop_queue = self.job_handler.job_stop_queue else: + self.job_handler = NoopHandler() self.job_queue = self.job_stop_queue = NoopQueue() self.job_handler.start() def shutdown( self ): - self.job_queue.shutdown() - self.job_stop_queue.shutdown() self.job_handler.shutdown() -class JobManagerQueue( object ): - """ - Job manager, waits for jobs to be runnable and then dispatches to a - JobHandler. - """ - STOP_SIGNAL = object() - def __init__( self, app, job_handler ): - self.app = app - self.job_handler = job_handler # the (singular) handler if we are passing jobs in memory - - self.sa_session = app.model.context - self.job_lock = False - # Keep track of the pid that started the job manager, only it - # has valid threads - self.parent_pid = os.getpid() - # Contains new jobs. 
Note this is not used if track_jobs_in_database is True - self.queue = Queue() - # Helper for interruptable sleep - self.sleeper = Sleeper() - self.running = True - self.monitor_thread = threading.Thread( name="JobManagerQueue.monitor_thread", target=self.__monitor ) - self.monitor_thread.setDaemon( True ) - # Recover jobs at startup - self.__check_jobs_at_startup() - # Start the queue - self.monitor_thread.start() - log.info( "job manager queue started" ) - - def __check_jobs_at_startup( self ): - """ - Checks all jobs that are in the 'new', 'queued' or 'running' state in - the database and requeues or cleans up as necessary. Only run as the - job manager starts. - """ - for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \ - .filter( ( ( model.Job.state == model.Job.states.NEW ) \ - | ( model.Job.state == model.Job.states.RUNNING ) \ - | ( model.Job.state == model.Job.states.QUEUED ) ) \ - & ( model.Job.handler == None ) ): - if job.tool_id not in self.app.toolbox.tools_by_id: - log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) ) - JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' ) - else: - job.handler = self.__get_handler( job ) # handler's recovery method will take it from here - log.info( "(%d) Job in '%s' state had no handler at job manager startup, assigned '%s' handler" % ( job.id, job.state, job.handler ) ) - if self.sa_session.dirty: - self.sa_session.flush() - - def __monitor( self ): - """ - Continually iterate the waiting jobs and dispatch to a handler - """ - # HACK: Delay until after forking, we need a way to do post fork notification!!! - time.sleep( 10 ) - while self.running: - try: - self.__monitor_step() - except: - log.exception( "Exception in monitor_step" ) - # Sleep - self.sleeper.sleep( 1 ) - - def __monitor_step( self ): - """ - Called repeatedly by `monitor` to process waiting jobs. Gets any new - jobs (either from the database or from its own queue), then assigns a - handler. - """ - # Do nothing if the queue is locked - if self.job_lock: - log.info( 'Job queue is administratively locked, sleeping...' 
) - time.sleep( 10 ) - return - # Pull all new jobs from the queue at once - jobs_to_check = [] - if self.app.config.track_jobs_in_database: - # Clear the session so we get fresh states for job and all datasets - self.sa_session.expunge_all() - # Fetch all new jobs - jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ - .filter( ( model.Job.state == model.Job.states.NEW ) \ - & ( model.Job.handler == None ) ).all() - else: - # Get job objects and append to watch queue for any which were - # previously waiting - try: - while 1: - message = self.queue.get_nowait() - if message is self.STOP_SIGNAL: - return - # Unpack the message - job_id, tool_id = message - # Get the job object and append to watch queue - jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) - except Empty: - pass - - for job in jobs_to_check: - job.handler = self.__get_handler( job ) - job.job_runner_name = self.__get_runner_url( job ) - log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) ) - self.sa_session.add( job ) - - # If tracking in the database, handlers will pick up the job now - self.sa_session.flush() - - time.sleep( 5 ) - - # This only does something in the case that there is only one handler and it is this Galaxy process - for job in jobs_to_check: - self.job_handler.job_queue.put( job.id, job.tool_id ) - - def __get_handler( self, job ): - try: - params = None - if job.params: - params = from_json_string( job.params ) - return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_handler( params ) - except: - log.exception( "(%s) Caught exception attempting to get tool-specific job handler for tool '%s', selecting at random from available handlers instead:" % ( job.id, job.tool_id ) ) - return random.choice( self.app.config.job_handlers ) - - def __get_runner_url( self, job ): - """This fetches the raw runner URL, and does not perform any computation e.g. for the dynamic runner""" - try: - return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_runner_url( job.params ) - except Exception, e: - log.warning( 'Unable to determine job runner URL for job %s: %s' % (job.id, str(e)) ) - return None - - def put( self, job_id, tool ): - """Add a job to the queue (by job identifier)""" - if not self.app.config.track_jobs_in_database: - self.queue.put( ( job_id, tool.id ) ) - self.sleeper.wake() - - def shutdown( self ): - """Attempts to gracefully shut down the worker thread""" - if self.parent_pid != os.getpid(): - # We're not the real job queue, do nothing - return - else: - log.info( "sending stop signal to worker thread" ) - self.running = False - if not self.app.config.track_jobs_in_database: - self.queue.put( self.STOP_SIGNAL ) - self.sleeper.wake() - log.info( "job manager queue stopped" ) - -class JobManagerStopQueue( object ): - """ - A queue for jobs which need to be terminated prematurely. - """ - STOP_SIGNAL = object() - def __init__( self, app, job_handler ): - self.app = app - self.job_handler = job_handler - - self.sa_session = app.model.context - - # Keep track of the pid that started the job manager, only it - # has valid threads - self.parent_pid = os.getpid() - # Contains new jobs. 
Note this is not used if track_jobs_in_database is True - self.queue = Queue() - - # Contains jobs that are waiting (only use from monitor thread) - self.waiting = [] - - # Helper for interruptable sleep - self.sleeper = Sleeper() - self.running = True - self.monitor_thread = threading.Thread( name="JobManagerStopQueue.monitor_thread", target=self.monitor ) - self.monitor_thread.setDaemon( True ) - self.monitor_thread.start() - log.info( "job manager stop queue started" ) - - def monitor( self ): - """ - Continually iterate the waiting jobs, stop any that are found. - """ - # HACK: Delay until after forking, we need a way to do post fork notification!!! - time.sleep( 10 ) - while self.running: - try: - self.monitor_step() - except: - log.exception( "Exception in monitor_step" ) - # Sleep - self.sleeper.sleep( 1 ) - - def monitor_step( self ): - """ - Called repeatedly by `monitor` to stop jobs. - """ - jobs_to_check = [] - # Pull from the queue even if tracking in the database (in the case of Administrative stopped jobs) - try: - while 1: - message = self.queue.get_nowait() - if message is self.STOP_SIGNAL: - return - # Unpack the message - job_id, error_msg = message - # Get the job object and append to watch queue - jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) ) - except Empty: - pass - - # If tracking in the database, the handler will pick up the stop itself. Otherwise, notify the handler. - for job, error_msg in jobs_to_check: - self.job_handler.job_stop_queue.put( job.id, error_msg ) - - def put( self, job_id, error_msg=None ): - self.queue.put( ( job_id, error_msg ) ) - - def shutdown( self ): - """Attempts to gracefully shut down the worker thread""" - if self.parent_pid != os.getpid(): - # We're not the real job queue, do nothing - return - else: - log.info( "sending stop signal to worker thread" ) - self.running = False - if not self.app.config.track_jobs_in_database: - self.queue.put( self.STOP_SIGNAL ) - self.sleeper.wake() - log.info( "job manager stop queue stopped" ) - class NoopHandler( object ): def __init__( self, *args, **kwargs ): self.job_queue = NoopQueue() diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -6,7 +6,8 @@ import galaxy.jobs.rules -DYNAMIC_RUNNER_PREFIX = "dynamic:///" +DYNAMIC_RUNNER_NAME = "dynamic" +DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url" class JobMappingException( Exception ): @@ -20,9 +21,9 @@ (in the form of job_wrappers) to job runner url strings. """ - def __init__( self, job_wrapper, job_runner_name=None ): + def __init__( self, job_wrapper, url_to_destination ): self.job_wrapper = job_wrapper - self.job_runner_name = job_runner_name + self.url_to_destination = url_to_destination self.rule_modules = self.__get_rule_modules( ) def __get_rule_modules( self ): @@ -87,11 +88,23 @@ return expand_function( **actual_args ) - def __determine_expand_function_name( self, option_parts ): + def __convert_url_to_destination( self, url ): + """ + Job runner URLs are deprecated, but dynamic mapper functions may still + be returning them. Runners are expected to be able to convert these to + destinations. + + This method calls + JobHandlerQueue.DefaultJobDispatcher.url_to_destination, which in turn + calls the url_to_destination method for the appropriate runner. 
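To make the new dynamic-mapping flow concrete, here is a minimal sketch of a rule function the mapper might invoke from galaxy.jobs.rules. The function name "foo" and its "job" argument are placeholders (the mapper matches arguments by name, as in the existing rules mechanism), and the legacy URL syntax shown is illustrative only. Per the handling added in __handle_dynamic_job_destination below, a rule may return an existing destination id, a legacy runner URL (which is converted via url_to_destination and assigned DYNAMIC_DESTINATION_ID), or a JobDestination built directly:

    from galaxy.jobs import JobDestination

    def foo(job):
        # Option 1: return an id or tag already defined in job_conf.xml
        #     return 'pbs_longjobs'
        # Option 2: return a legacy URL; the mapper converts it with
        #     url_to_destination() (URL layout here is hypothetical)
        #     return 'pbs:////-l walltime=72:00:00'
        # Option 3: build a destination on the fly
        return JobDestination(runner='pbs', params={'Execution_Time': '72:00:00'})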
+ """ + dest = self.url_to_destination( url ) + dest['id'] = DYNAMIC_DESTINATION_ID + return dest + + def __determine_expand_function_name( self, destination ): # default look for function with same name as tool, unless one specified - expand_function_name = self.job_wrapper.tool.id - if len( option_parts ) > 1: - expand_function_name = option_parts[ 1 ] + expand_function_name = destination.params.get('function', self.job_wrapper.tool.id) return expand_function_name def __get_expand_function( self, expand_function_name ): @@ -110,32 +123,57 @@ return rule_module return None - def __expand_dynamic_job_runner_url( self, options_str ): - option_parts = options_str.split( '/' ) - expand_type = option_parts[ 0 ] + def __handle_dynamic_job_destination( self, destination ): + expand_type = destination.params.get('type', None) if expand_type == "python": - expand_function_name = self.__determine_expand_function_name( option_parts ) + expand_function_name = self.__determine_expand_function_name( destination ) expand_function = self.__get_expand_function( expand_function_name ) - return self.__invoke_expand_function( expand_function ) + rval = self.__invoke_expand_function( expand_function ) + # TODO: test me extensively + if isinstance(rval, basestring): + # If the function returned a string, check if it's a URL, convert if necessary + if '://' in rval: + return self.__convert_url_to_destination(rval) + else: + return self.app.job_config.get_destination(rval) + elif isinstance(rval, galaxy.jobs.JobDestination): + # If the function generated a JobDestination, we'll use that + # destination directly. However, for advanced job limiting, a + # function may want to set the JobDestination's 'tags' + # attribute so that limiting can be done on a destination tag. + #id_or_tag = rval.get('id') + #if rval.get('tags', None): + # # functions that are generating destinations should only define one tag + # id_or_tag = rval.get('tags')[0] + #return id_or_tag, rval + return rval + else: + raise Exception( 'Dynamic function returned a value that could not be understood: %s' % rval ) + elif expand_type is None: + raise Exception( 'Dynamic function type not specified (hint: add <param id="type">python</param> to your <destination>)' ) else: raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type ) - def __cache_job_runner_url( self, params ): - # If there's already a runner set in the Job object, don't overwrite from the tool - if self.job_runner_name is not None and not self.job_runner_name.startswith('tasks'): - raw_job_runner_url = self.job_runner_name + def __cache_job_destination( self, params ): + raw_job_destination = self.job_wrapper.tool.get_job_destination( params ) + #raw_job_destination_id_or_tag = self.job_wrapper.tool.get_job_destination_id_or_tag( params ) + if raw_job_destination.runner == DYNAMIC_RUNNER_NAME: + job_destination = self.__handle_dynamic_job_destination( raw_job_destination ) else: - raw_job_runner_url = self.job_wrapper.tool.get_job_runner_url( params ) - if raw_job_runner_url.startswith( DYNAMIC_RUNNER_PREFIX ): - job_runner_url = self.__expand_dynamic_job_runner_url( raw_job_runner_url[ len( DYNAMIC_RUNNER_PREFIX ) : ] ) - else: - job_runner_url = raw_job_runner_url - self.cached_job_runner_url = job_runner_url + job_destination = raw_job_destination + #job_destination_id_or_tag = raw_job_destination_id_or_tag + self.cached_job_destination = job_destination + #self.cached_job_destination_id_or_tag = job_destination_id_or_tag - def get_job_runner_url( self, 
params ): + def get_job_destination( self, params ): """ - Cache the job_runner_url string to avoid recalculation. + Cache the job_destination to avoid recalculation. """ - if not hasattr( self, 'cached_job_runner_url' ): - self.__cache_job_runner_url( params ) - return self.cached_job_runner_url + if not hasattr( self, 'cached_job_destination' ): + self.__cache_job_destination( params ) + return self.cached_job_destination + + #def get_job_destination_id_or_tag( self, params ): + # if not hasattr( self, 'cached_job_destination_id_or_tag' ): + # self.__cache_job_destination( params ) + # return self.cached_job_destination_id_or_tag diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -1,13 +1,129 @@ -import os, logging, os.path +""" +Base classes for job runner plugins. +""" +import os +import time +import logging +import threading + +from Queue import Queue, Empty + +import galaxy.jobs from galaxy import model -from Queue import Queue, Empty -import time -import threading log = logging.getLogger( __name__ ) +STOP_SIGNAL = object() + class BaseJobRunner( object ): + def __init__( self, app, nworkers ): + """Start the job runner + """ + self.app = app + self.sa_session = app.model.context + self.nworkers = nworkers + + def _init_worker_threads(self): + """Start ``nworkers`` worker threads. + """ + self.work_queue = Queue() + self.work_threads = [] + log.debug('Starting %s %s workers' % (self.nworkers, self.runner_name)) + for i in range(self.nworkers): + worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next ) + worker.setDaemon( True ) + worker.start() + self.work_threads.append( worker ) + + def run_next(self): + """Run the next item in the work queue (a job waiting to run) + """ + while 1: + ( method, arg ) = self.work_queue.get() + if method is STOP_SIGNAL: + return + # id and name are collected first so that the call of method() is the last exception. + try: + # arg should be a JobWrapper/TaskWrapper + job_id = arg.get_id_tag() + except: + job_id = 'unknown' + try: + name = method.__name__ + except: + name = 'unknown' + try: + method(arg) + except: + log.exception( "(%s) Unhandled exception calling %s" % ( job_id, name ) ) + + # Causes a runner's `queue_job` method to be called from a worker thread + def put(self, job_wrapper): + """Add a job to the queue (by job identifier), indicate that the job is ready to run. + """ + # Change to queued state before handing to worker thread so the runner won't pick it up again + job_wrapper.change_state( model.Job.states.QUEUED ) + # Persist the destination so that the job will be included in counts if using concurrency limits + job_wrapper.set_job_destination( job_wrapper.job_destination, None ) + self.mark_as_queued(job_wrapper) + + def mark_as_queued(self, job_wrapper): + self.work_queue.put( ( self.queue_job, job_wrapper ) ) + + def shutdown( self ): + """Attempts to gracefully shut down the worker threads + """ + log.info( "%s: Sending stop signal to %s worker threads" % ( self.runner_name, len( self.work_threads ) ) ) + for i in range( len( self.work_threads ) ): + self.work_queue.put( ( STOP_SIGNAL, None ) ) + + # Most runners should override the legacy URL handler methods and destination param method + def url_to_destination(self, url): + """ + Convert a legacy URL to a JobDestination. 
+ + Job runner URLs are deprecated, JobDestinations should be used instead. + This base class method converts from a URL to a very basic + JobDestination without destination params. + """ + return galaxy.jobs.JobDestination(runner=url.split(':')[0]) + + def parse_destination_params(self, params): + """Parse the JobDestination ``params`` dict and return the runner's native representation of those params. + """ + raise NotImplementedError() + + # Runners must override the job handling methods + def queue_job(self, job_wrapper): + """Some sanity checks that all runners' queue_job() methods are likely to want to do + """ + job_id = job_wrapper.get_id_tag() + job_state = job_wrapper.get_state() + job_wrapper.is_ready = False + + # Make sure the job hasn't been deleted + if job_state != model.Job.states.QUEUED: + log.info( "(%d) Job is in state %s, skipping execution" % ( job_id, job_state ) ) + return + + # Prepare the job + try: + job_wrapper.prepare() + job_wrapper.runner_command_line = self.build_command_line( job_wrapper ) + except: + log.exception("(%d) Failure preparing job" % job_id) + job_wrapper.fail( "failure preparing job", exception=True ) + return + + job_wrapper.is_ready = True + + def stop_job(self, job): + raise NotImplementedError() + + def recover(self, job, job_wrapper): + raise NotImplementedError() + def build_command_line( self, job_wrapper, include_metadata=False, include_work_dir_outputs=True ): """ Compose the sequence of commands necessary to execute a job. This will @@ -104,12 +220,11 @@ log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) ) return output_pairs - -class ClusterJobState( object ): +class AsynchronousJobState( object ): """ - Encapsulate the state of a cluster job, this should be subclassed as + Encapsulate the state of an asynchronous job, this should be subclassed as needed for various job runners to capture additional information needed - to communicate with cluster job manager. + to communicate with distributed resource manager. """ def __init__( self ): @@ -117,23 +232,22 @@ self.job_id = None self.old_state = None self.running = False - self.runner_url = None + self.job_file = None + self.output_file = None + self.error_file = None + self.exit_code_file = None + self.check_count = 0 + self.job_destination = None -STOP_SIGNAL = object() - -JOB_STATUS_QUEUED = 'queue' -JOB_STATUS_FAILED = 'fail' -JOB_STATUS_FINISHED = 'finish' - -class ClusterJobRunner( BaseJobRunner ): - """ - Not sure this is the best name for this class, but there is common code - shared between sge, pbs, drmaa, etc... +class AsynchronousJobRunner( BaseJobRunner ): + """Parent class for any job runner that runs jobs asynchronously (e.g. via + a distributed resource manager). Provides general methods for having a + thread to monitor the state of asynchronous jobs and submitting those jobs + to the correct methods (queue, finish, cleanup) at appropriate times.. """ - def __init__( self, app ): - self.app = app - self.sa_session = app.model.context + def __init__( self, app, nworkers ): + super( AsynchronousJobRunner, self ).__init__( app, nworkers ) # 'watched' and 'queue' are both used to keep track of jobs to watch. # 'queue' is used to add new watched jobs, and can be called from # any thread (usually by the 'queue_job' method). 
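Taken together, the BaseJobRunner interface above amounts to a small contract for plugins: accept (app, nworkers), start the worker threads, and implement queue_job(), stop_job(), and recover(). A rough sketch of a hypothetical new-style plugin follows; the class name, the explicit _init_worker_threads() call, and reusing the base queue_job() for the shared prepare step are assumptions for illustration, not part of this changeset:

    from galaxy.jobs.runners import BaseJobRunner

    __all__ = ['EchoJobRunner']   # needed when the plugin is loaded by module name only

    class EchoJobRunner(BaseJobRunner):
        runner_name = 'EchoJobRunner'

        def __init__(self, app, nworkers):
            super(EchoJobRunner, self).__init__(app, nworkers)
            self._init_worker_threads()   # workers pull (method, arg) tuples off work_queue

        def queue_job(self, job_wrapper):
            # Reuse the base sanity checks and prepare step, then "run" the job.
            super(EchoJobRunner, self).queue_job(job_wrapper)
            if not job_wrapper.is_ready:
                return
            # A real plugin would execute job_wrapper.runner_command_line here.
            job_wrapper.finish('', '')

        def stop_job(self, job):
            pass

        def recover(self, job, job_wrapper):
            self.mark_as_queued(job_wrapper)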
'watched' must only @@ -147,82 +261,44 @@ self.monitor_thread.setDaemon( True ) self.monitor_thread.start() - def _init_worker_threads(self): - self.work_queue = Queue() - self.work_threads = [] - nworkers = self.app.config.cluster_job_queue_workers - for i in range( nworkers ): - worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next ) - worker.setDaemon( True ) - worker.start() - self.work_threads.append( worker ) - def handle_stop(self): # DRMAA and SGE runners should override this and disconnect. pass def monitor( self ): """ - Watches jobs currently in the cluster queue and deals with state changes - (queued to running) and job completion + Watches jobs currently in the monitor queue and deals with state + changes (queued to running) and job completion. """ while 1: # Take any new watched jobs and put them on the monitor list try: while 1: - cluster_job_state = self.monitor_queue.get_nowait() - if cluster_job_state is STOP_SIGNAL: + async_job_state = self.monitor_queue.get_nowait() + if async_job_state is STOP_SIGNAL: # TODO: This is where any cleanup would occur self.handle_stop() return - self.watched.append( cluster_job_state ) + self.watched.append( async_job_state ) except Empty: pass # Iterate over the list of watched jobs and check state - self.check_watched_items() + try: + self.check_watched_items() + except Exception, e: + log.exception('Unhandled exception checking active jobs') # Sleep a bit before the next state check time.sleep( 1 ) - def run_next( self ): - """ - Run the next item in the queue (a job waiting to run or finish ) - """ - while 1: - ( op, obj ) = self.work_queue.get() - if op is STOP_SIGNAL: - return - try: - if op == JOB_STATUS_QUEUED: - # If the next item is to be run, then only run it if the - # job state is "queued". Otherwise the next item was either - # cancelled or one of its siblings encountered an error. - job_state = obj.get_state() - if model.Job.states.QUEUED == job_state: - self.queue_job( obj ) - else: - log.debug( "Not executing job %d in state %s" % ( obj.get_id_tag(), job_state ) ) - elif op == JOB_STATUS_FINISHED: - self.finish_job( obj ) - elif op == JOB_STATUS_FAILED: - self.fail_job( obj ) - except: - log.exception( "Uncaught exception %sing job" % op ) - def monitor_job(self, job_state): self.monitor_queue.put( job_state ) - def put( self, job_wrapper ): - """Add a job to the queue (by job identifier)""" - # Change to queued state before handing to worker thread so the runner won't pick it up again - job_wrapper.change_state( model.Job.states.QUEUED ) - self.mark_as_queued(job_wrapper) - def shutdown( self ): """Attempts to gracefully shut down the monitor thread""" - log.info( "sending stop signal to worker threads" ) + log.info( "%s: Sending stop signal to monitor thread" % self.runner_name ) self.monitor_queue.put( STOP_SIGNAL ) - for i in range( len( self.work_threads ) ): - self.work_queue.put( ( STOP_SIGNAL, None ) ) + # Call the parent's shutdown method to stop workers + super( AsynchronousJobRunner, self ).shutdown() def check_watched_items(self): """ @@ -233,19 +309,16 @@ reuse the logic here. 
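For asynchronous plugins, the per-job hook that check_watched_items() calls looks roughly like the following; the _poll_status helper is hypothetical, and only the job_id, running, and old_state fields shown in AsynchronousJobState above are relied on. Returning the state keeps the job on the watch list; returning None drops it:

    def check_watched_item(self, job_state):
        status = self._poll_status(job_state.job_id)   # hypothetical DRM query
        if status == 'running':
            job_state.running = True
        elif status in ('finished', 'failed'):
            # Hand off to a worker thread; finish_job()/fail_job() run there.
            if status == 'finished':
                self.mark_as_finished(job_state)
            else:
                self.mark_as_failed(job_state)
            return None
        job_state.old_state = status
        return job_state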
""" new_watched = [] - for cluster_job_state in self.watched: - new_cluster_job_state = self.check_watched_item(cluster_job_state) - if new_cluster_job_state: - new_watched.append(new_cluster_job_state) + for async_job_state in self.watched: + new_async_job_state = self.check_watched_item(async_job_state) + if new_async_job_state: + new_watched.append(new_async_job_state) self.watched = new_watched # Subclasses should implement this unless they override check_watched_items all together. def check_watched_item(self): raise NotImplementedError() - def queue_job(self, job_wrapper): - raise NotImplementedError() - def finish_job(self, job_state): raise NotImplementedError() @@ -253,10 +326,7 @@ raise NotImplementedError() def mark_as_finished(self, job_state): - self.work_queue.put( ( JOB_STATUS_FINISHED, job_state ) ) + self.work_queue.put( ( self.finish_job, job_state ) ) def mark_as_failed(self, job_state): - self.work_queue.put( ( JOB_STATUS_FAILED, job_state ) ) - - def mark_as_queued(self, job_wrapper): - self.work_queue.put( ( JOB_STATUS_QUEUED, job_wrapper ) ) + self.work_queue.put( ( self.fail_job, job_state ) ) diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/runners/drmaa.py --- a/lib/galaxy/jobs/runners/drmaa.py +++ b/lib/galaxy/jobs/runners/drmaa.py @@ -89,7 +89,7 @@ Job runner backed by a finite pool of worker threads. FIFO scheduling """ STOP_SIGNAL = object() - def __init__( self, app ): + def __init__( self, app, nworkers ): """Initialize this job runner and start the monitor thread""" # Check if drmaa was importable, fail if not self.app = app This diff is so big that we needed to truncate the remainder. https://bitbucket.org/galaxy/galaxy-central/commits/f6570f99cc5c/ changeset: f6570f99cc5c user: natefoo date: 2013-02-11 18:19:44 summary: Fixes for job limiting and recovery with destinations. 
affected #: 3 files diff -r 37c0fd353dd778d2de8edf51c8d454dadee26caf -r f6570f99cc5c6cf7afd7736c8ec0ddd821af758f lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -180,6 +180,11 @@ self.tools[id].append(JobToolConfiguration(**dict(tool.items()))) self.tools[id][-1]['params'] = self.__get_params(tool) + types = dict(registered_user_concurrent_jobs = int, + anonymous_user_concurrent_jobs = int, + walltime = str, + output_size = int) + self.limits = Bunch(registered_user_concurrent_jobs = None, anonymous_user_concurrent_jobs = None, walltime = None, @@ -196,7 +201,7 @@ id = limit.get('tag', None) or limit.get('id') self.limits.concurrent_jobs[id] = int(limit.text) elif limit.text: - self.limits.__dict__[type] = limit.text + self.limits.__dict__[type] = types.get(type, str)(limit.text) if self.limits.walltime is not None: h, m, s = [ int( v ) for v in self.limits.walltime.split( ':' ) ] diff -r 37c0fd353dd778d2de8edf51c8d454dadee26caf -r f6570f99cc5c6cf7afd7736c8ec0ddd821af758f lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -105,7 +105,7 @@ # This is the first start after upgrading from URLs to destinations, convert the URL to a destination and persist # TODO: test me extensively job_wrapper = JobWrapper( job, self ) - job_wrapper.set_job_destination(self.dispatcher.url_to_destination(self.job_runner_name)) + job_wrapper.set_job_destination(self.dispatcher.url_to_destination(job.job_runner_name), job.job_runner_external_id) self.dispatcher.recover( job, job_wrapper ) log.info('(%s) Converted job from a URL to a destination and recovered' % (job.id)) elif job.job_runner_name is None: @@ -561,7 +561,7 @@ try: return self.job_runners[runner_name].url_to_destination(url) except Exception, e: - log.error("Unable to convert legacy job runner URL to job destination, destination will be the '%s' runner with no params: %s" % (runner_name, e)) + log.exception("Unable to convert legacy job runner URL to job destination, destination will be the '%s' runner with no params: %s" % (runner_name, e)) return JobDestination(runner=runner_name) def put( self, job_wrapper ): diff -r 37c0fd353dd778d2de8edf51c8d454dadee26caf -r f6570f99cc5c6cf7afd7736c8ec0ddd821af758f lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py +++ b/lib/galaxy/jobs/runners/pbs.py @@ -167,6 +167,8 @@ param, value = opt.split( None, 1 ) params[param] = value + log.debug("Converted URL '%s' to destination runner=pbs, params=%s" % (url, params)) + # Create a dynamic JobDestination return JobDestination(runner='pbs', params=params) https://bitbucket.org/galaxy/galaxy-central/commits/f3ff8526fde6/ changeset: f3ff8526fde6 user: natefoo date: 2013-02-11 18:27:37 summary: Merge in job URL->destination changes. If you have running jobs and are using DRMAA, Condor, or the CLI runner plugins and you are not willing to lose those jobs, please wait for the commit converting those plugins to the new style. affected #: 25 files diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 job_conf.xml.sample_advanced --- /dev/null +++ b/job_conf.xml.sample_advanced @@ -0,0 +1,65 @@ +<?xml version="1.0"?> +<job_conf> + <plugins workers="4"> + <!-- "workers" is the number of threads for the runner's work queue. + The default from <plugins> is used if not defined for a <plugin>. 
+ --> + <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner"/> + <plugin id="pbs" type="runner" load="galaxy.jobs.runners.pbs:PBSJobRunner" workers="2"/> + <plugin id="gridengine" type="runner" load="galaxy.jobs.runners.drmaa:DRMAARunner"/> + </plugins> + <handlers default="handlers"> + <!-- Additional job handlers - the id should match the name of a + [server:<id>] in universe_wsgi.ini. + --> + <handler id="handler0" tags="handlers"/> + <handler id="handler1" tags="handlers"/> + <handler id="special_handler0" tags="special_handlers"/> + <handler id="special_handler1" tags="special_handlers"/> + <handler id="trackster_handler"/> + </handlers> + <destinations default="local"> + <!-- Destinations define details about remote resources and how jobs + should be executed on those remote resources. + --> + <destination id="local" runner="local"/> + <destination id="pbs" runner="pbs" tags="mycluster"/> + <destination id="pbs_longjobs" runner="pbs" tags="mycluster,longjobs"> + <!-- Define parameters that are native to the job runner plugin. --> + <param id="Execution_Time">72:00:00</param> + </destination> + <destination id="remote_cluster" runner="drmaa" tags="longjobs"/> + <destination id="real_user_cluster" runner="drmaa"> + <!-- TODO: The real user options should maybe not be considered runner params. --> + <param id="galaxy_external_runjob_script">scripts/drmaa_external_runner.py</param> + <param id="galaxy_external_killjob_script">scripts/drmaa_external_killer.py</param> + <param id="galaxy_external_chown_script">scripts/external_chown_script.py</param> + </destination> + <destination id="dynamic" runner="dynamic"> + <!-- A destination that represents a method in the dynamic runner. --> + <param id="type">python</param> + <param id="function">foo</param> + </destination> + </destinations> + <tools> + <!-- Tools can be configured to use specific destinations or handlers, + identified by either the "id" or "tags" attribute. If assigned to + a tag, a handler or destination that matches that tag will be + chosen at random. + --> + <tool id="foo" handler="trackster_handler"> + <param id="source">trackster</param> + </tool> + <tool id="bar" destination="dynamic"/> + <tool id="baz" handler="special_handlers" destination="bigmem"/> + </tools> + <limits> + <!-- Certain limits can be defined. --> + <limit type="registered_user_concurrent_jobs">2</limit> + <limit type="unregistered_user_concurrent_jobs">1</limit> + <limit type="job_walltime">24:00:00</limit> + <limit type="concurrent_jobs" id="local">1</limit> + <limit type="concurrent_jobs" tag="mycluster">2</limit> + <limit type="concurrent_jobs" tag="longjobs">1</limit> + </limits> +</job_conf> diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 job_conf.xml.sample_basic --- /dev/null +++ b/job_conf.xml.sample_basic @@ -0,0 +1,13 @@ +<?xml version="1.0"?> +<!-- A sample job config that explicitly configures job running the way it is configured by default (if there is no explicit config). 
--> +<job_conf> + <plugins> + <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="4"/> + </plugins> + <handlers> + <handler id="main"/> + </handlers> + <destinations> + <destination id="local" runner="local"/> + </destinations> +</job_conf> diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/app.py --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -86,6 +86,8 @@ self.tool_data_tables.load_from_config_file( config_filename=self.config.shed_tool_data_table_config, tool_data_path=self.tool_data_tables.tool_data_path, from_shed_config=True ) + # Initialize the job management configuration + self.job_config = jobs.JobConfiguration(self) # Initialize the tools, making sure the list of tool configs includes the reserved migrated_tools_conf.xml file. tool_configs = self.config.tool_configs if self.config.migrated_tools_config not in tool_configs: diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -91,6 +91,7 @@ self.collect_outputs_from = [ x.strip() for x in kwargs.get( 'collect_outputs_from', 'new_file_path,job_working_directory' ).lower().split(',') ] self.template_path = resolve_path( kwargs.get( "template_path", "templates" ), self.root ) self.template_cache = resolve_path( kwargs.get( "template_cache_path", "database/compiled_templates" ), self.root ) + self.job_config_file = resolve_path( kwargs.get( 'job_config_file', 'job_conf.xml' ), self.root ) self.local_job_queue_workers = int( kwargs.get( "local_job_queue_workers", "5" ) ) self.cluster_job_queue_workers = int( kwargs.get( "cluster_job_queue_workers", "3" ) ) self.job_queue_cleanup_interval = int( kwargs.get("job_queue_cleanup_interval", "5") ) @@ -111,8 +112,8 @@ self.smtp_server = kwargs.get( 'smtp_server', None ) self.smtp_username = kwargs.get( 'smtp_username', None ) self.smtp_password = kwargs.get( 'smtp_password', None ) - self.track_jobs_in_database = kwargs.get( 'track_jobs_in_database', None ) - self.start_job_runners = kwargs.get( 'start_job_runners', None ) + self.track_jobs_in_database = kwargs.get( 'track_jobs_in_database', 'None' ) + self.start_job_runners = listify(kwargs.get( 'start_job_runners', '' )) self.expose_dataset_path = string_as_bool( kwargs.get( 'expose_dataset_path', 'False' ) ) # External Service types used in sample tracking self.external_service_type_config_file = resolve_path( kwargs.get( 'external_service_type_config_file', 'external_service_types_conf.xml' ), self.root ) @@ -123,8 +124,8 @@ # The transfer manager and deferred job queue self.enable_beta_job_managers = string_as_bool( kwargs.get( 'enable_beta_job_managers', 'False' ) ) # Per-user Job concurrency limitations + self.cache_user_job_count = string_as_bool( kwargs.get( 'cache_user_job_count', False ) ) self.user_job_limit = int( kwargs.get( 'user_job_limit', 0 ) ) - # user_job_limit for backwards-compatibility self.registered_user_job_limit = int( kwargs.get( 'registered_user_job_limit', self.user_job_limit ) ) self.anonymous_user_job_limit = int( kwargs.get( 'anonymous_user_job_limit', self.user_job_limit ) ) self.default_cluster_job_runner = kwargs.get( 'default_cluster_job_runner', 'local:///' ) @@ -214,28 +215,20 @@ # Crummy, but PasteScript does not give you a way to determine this if arg.lower().startswith('--server-name='): self.server_name = arg.split('=', 1)[-1] + # Store all configured server names + self.server_names = [] 
+ for section in global_conf_parser.sections(): + if section.startswith('server:'): + self.server_names.append(section.replace('server:', '', 1)) # Store advanced job management config self.job_manager = kwargs.get('job_manager', self.server_name).strip() self.job_handlers = [ x.strip() for x in kwargs.get('job_handlers', self.server_name).split(',') ] self.default_job_handlers = [ x.strip() for x in kwargs.get('default_job_handlers', ','.join( self.job_handlers ) ).split(',') ] - # parse the [galaxy:job_limits] section - self.job_limits = {} - try: - job_limits = global_conf_parser.items( 'galaxy:job_limits' ) - for k, v in job_limits: - # Since the URL contains a colon and possibly an equals sign, consider ' = ' the delimiter - more_k, v = v.split(' = ', 1) - k = '%s:%s' % (k, more_k.strip()) - v = v.strip().rsplit(None, 1) - v[1] = int(v[1]) - self.job_limits[k] = v - except ConfigParser.NoSectionError: - pass - # Use database for IPC unless this is a standalone server (or multiple servers doing self dispatching in memory) - if self.track_jobs_in_database is None or self.track_jobs_in_database == "None": - self.track_jobs_in_database = True - if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ): - self.track_jobs_in_database = False + # Use database for job running IPC unless this is a standalone server or explicitly set in the config + if self.track_jobs_in_database == 'None': + self.track_jobs_in_database = False + if len(self.server_names) > 1: + self.track_jobs_in_database = True else: self.track_jobs_in_database = string_as_bool( self.track_jobs_in_database ) # Store per-tool runner configs diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -6,13 +6,17 @@ import sys import pwd import time +import copy +import random import logging +import datetime import threading import traceback import subprocess import galaxy from galaxy import util, model +from galaxy.util.bunch import Bunch from galaxy.datatypes.tabular import * from galaxy.datatypes.interval import * # tabular/interval imports appear to be unused. Clean up? 
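The net effect of the config.py change above is easiest to see as a small decision function; this is a paraphrase of the new logic, with string_as_bool approximated:

    def use_database_ipc(track_jobs_in_database, server_names):
        # 'None' means the option was left unset in universe_wsgi.ini
        if track_jobs_in_database == 'None':
            return len(server_names) > 1
        return str(track_jobs_in_database).lower() in ('true', 'yes', 'on', '1')

    use_database_ipc('None', ['main'])                          # standalone -> False
    use_database_ipc('None', ['web0', 'handler0', 'handler1'])  # multi-process -> True
    use_database_ipc('True', ['main'])                          # explicit override -> True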
@@ -22,6 +26,7 @@ from galaxy.jobs.actions.post import ActionBox from galaxy.exceptions import ObjectInvalid from galaxy.jobs.mapper import JobRunnerMapper +from galaxy.jobs.runners import BaseJobRunner log = logging.getLogger( __name__ ) @@ -47,6 +52,491 @@ self.condition.notify() self.condition.release() +class JobDestination( Bunch ): + """ + Provides details about where a job runs + """ + def __init__(self, **kwds): + self['id'] = None + self['url'] = None + self['tags'] = None + self['runner'] = None + self['legacy'] = False + self['converted'] = False + # dict is appropriate (rather than a bunch) since keys may not be valid as attributes + self['params'] = dict() + super(JobDestination, self).__init__(**kwds) + + # Store tags as a list + if self.tags is not None: + self['tags'] = [ x.strip() for x in self.tags.split(',') ] + +class JobToolConfiguration( Bunch ): + """ + Provides details on what handler and destination a tool should use + + A JobToolConfiguration will have the required attribute 'id' and optional + attributes 'handler', 'destination', and 'params' + """ + def __init__(self, **kwds): + self['handler'] = None + self['destination'] = None + self['params'] = dict() + super(JobToolConfiguration, self).__init__(**kwds) + +class JobConfiguration( object ): + """A parser and interface to advanced job management features. + + These features are configured in the job configuration, by default, ``job_conf.xml`` + """ + DEFAULT_NWORKERS = 4 + def __init__(self, app): + """Parse the job configuration XML. + """ + self.app = app + self.runner_plugins = [] + self.handlers = {} + self.default_handler_id = None + self.destinations = {} + self.destination_tags = {} + self.default_destination_id = None + self.tools = {} + self.limits = Bunch() + + # Initialize the config + try: + tree = util.parse_xml(self.app.config.job_config_file) + self.__parse_job_conf_xml(tree) + except IOError: + log.warning( 'Job configuration "%s" does not exist, using legacy job configuration from Galaxy config file "%s" instead' % ( self.app.config.job_config_file, self.app.config.config_file ) ) + self.__parse_job_conf_legacy() + + def __parse_job_conf_xml(self, tree): + """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml). + + :param tree: Object representing the root ``<job_conf>`` object in the job config file. 
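As a quick illustration of the two Bunch subclasses defined above (values taken from the sample config; attribute access comes from Bunch):

    dest = JobDestination(id='pbs_longjobs', runner='pbs', tags='mycluster,longjobs',
                          params={'Execution_Time': '72:00:00'})
    dest.tags                       # -> ['mycluster', 'longjobs'] (comma string split on init)
    dest.params['Execution_Time']   # -> '72:00:00'

    tool_conf = JobToolConfiguration(id='foo', handler='trackster_handler')
    tool_conf.destination           # -> None (no explicit destination configured for this tool)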
+ :type tree: ``xml.etree.ElementTree.Element`` + """ + root = tree.getroot() + log.debug('Loading job configuration from %s' % self.app.config.job_config_file) + + # Parse job plugins + plugins = root.find('plugins') + if plugins is not None: + for plugin in self.__findall_with_required(plugins, 'plugin', ('id', 'type', 'load')): + if plugin.get('type') == 'runner': + workers = plugin.get('workers', plugins.get('workers', JobConfiguration.DEFAULT_NWORKERS)) + self.runner_plugins.append(dict(id=plugin.get('id'), load=plugin.get('load'), workers=int(workers))) + else: + log.error('Unknown plugin type: %s' % plugin.get('type')) + # Load tasks if configured + if self.app.config.use_tasked_jobs: + self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers)) + + # Parse handlers + handlers = root.find('handlers') + if handlers is not None: + for handler in self.__findall_with_required(handlers, 'handler'): + id = handler.get('id') + if id in self.handlers: + log.error("Handler '%s' overlaps handler with the same name, ignoring" % id) + else: + log.debug("Read definition for handler '%s'" % id) + self.handlers[id] = (id,) + if handler.get('tags', None) is not None: + for tag in [ x.strip() for x in handler.get('tags').split(',') ]: + if tag in self.handlers: + self.handlers[tag].append(id) + else: + self.handlers[tag] = [id] + + # Determine the default handler(s) + self.default_handler_id = self.__get_default(handlers, self.handlers.keys()) + + # Parse destinations + destinations = root.find('destinations') + for destination in self.__findall_with_required(destinations, 'destination', ('id', 'runner')): + id = destination.get('id') + job_destination = JobDestination(**dict(destination.items())) + job_destination['params'] = self.__get_params(destination) + self.destinations[id] = (job_destination,) + if job_destination.tags is not None: + for tag in job_destination.tags: + if tag not in self.destinations: + self.destinations[tag] = [] + self.destinations[tag].append(job_destination) + + # Determine the default destination + self.default_destination_id = self.__get_default(destinations, self.destinations.keys()) + + # Parse tool mappings + tools = root.find('tools') + if tools is not None: + for tool in self.__findall_with_required(tools, 'tool'): + # There can be multiple definitions with identical ids, but different params + id = tool.get('id') + if id not in self.tools: + self.tools[id] = list() + self.tools[id].append(JobToolConfiguration(**dict(tool.items()))) + self.tools[id][-1]['params'] = self.__get_params(tool) + + types = dict(registered_user_concurrent_jobs = int, + anonymous_user_concurrent_jobs = int, + walltime = str, + output_size = int) + + self.limits = Bunch(registered_user_concurrent_jobs = None, + anonymous_user_concurrent_jobs = None, + walltime = None, + walltime_delta = None, + output_size = None, + concurrent_jobs = {}) + + # Parse job limits + limits = root.find('limits') + if limits is not None: + for limit in self.__findall_with_required(limits, 'limit', ('type',)): + type = limit.get('type') + if type == 'concurrent_jobs': + id = limit.get('tag', None) or limit.get('id') + self.limits.concurrent_jobs[id] = int(limit.text) + elif limit.text: + self.limits.__dict__[type] = types.get(type, str)(limit.text) + + if self.limits.walltime is not None: + h, m, s = [ int( v ) for v in self.limits.walltime.split( ':' ) ] + self.limits.walltime_delta = datetime.timedelta( 0, s, 0, 0, m, h ) + + log.debug('Done loading job 
configuration') + + def __parse_job_conf_legacy(self): + """Loads the old-style job configuration from options in the galaxy config file (by default, universe_wsgi.ini). + """ + log.debug('Loading job configuration from %s' % self.app.config.config_file) + + # Always load local and lwr + self.runner_plugins = [dict(id='local', load='local', workers=self.app.config.local_job_queue_workers), dict(id='lwr', load='lwr', workers=self.app.config.cluster_job_queue_workers)] + # Load tasks if configured + if self.app.config.use_tasked_jobs: + self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers)) + for runner in self.app.config.start_job_runners: + self.runner_plugins.append(dict(id=runner, load=runner, workers=self.app.config.cluster_job_queue_workers)) + + # Set the handlers + for id in self.app.config.job_handlers: + self.handlers[id] = (id,) + + self.handlers['default_job_handlers'] = self.app.config.default_job_handlers + self.default_handler_id = 'default_job_handlers' + + # Set tool handler configs + for id, tool_handlers in self.app.config.tool_handlers.items(): + self.tools[id] = list() + for handler_config in tool_handlers: + # rename the 'name' key to 'handler' + handler_config['handler'] = handler_config.pop('name') + self.tools[id].append(JobToolConfiguration(**handler_config)) + + # Set tool runner configs + for id, tool_runners in self.app.config.tool_runners.items(): + # Might have been created in the handler parsing above + if id not in self.tools: + self.tools[id] = list() + for runner_config in tool_runners: + url = runner_config['url'] + if url not in self.destinations: + # Create a new "legacy" JobDestination - it will have its URL converted to a destination params once the appropriate plugin has loaded + self.destinations[url] = (JobDestination(id=url, runner=url.split(':', 1)[0], url=url, legacy=True, converted=False),) + for tool_conf in self.tools[id]: + if tool_conf.params == runner_config.get('params', {}): + tool_conf['destination'] = url + break + else: + # There was not an existing config (from the handlers section) with the same params + # rename the 'url' key to 'destination' + runner_config['destination'] = runner_config.pop('url') + self.tools[id].append(JobToolConfiguration(**runner_config)) + + self.destinations[self.app.config.default_cluster_job_runner] = (JobDestination(id=self.app.config.default_cluster_job_runner, runner=self.app.config.default_cluster_job_runner.split(':', 1)[0], url=self.app.config.default_cluster_job_runner, legacy=True, converted=False),) + self.default_destination_id = self.app.config.default_cluster_job_runner + + # Set the job limits + self.limits = Bunch(registered_user_concurrent_jobs = self.app.config.registered_user_job_limit, + anonymous_user_concurrent_jobs = self.app.config.anonymous_user_job_limit, + walltime = self.app.config.job_walltime, + walltime_delta = self.app.config.job_walltime_delta, + output_size = self.app.config.output_size_limit, + concurrent_jobs = {}) + + log.debug('Done loading job configuration') + + def __get_default(self, parent, names): + """Returns the default attribute set in a parent tag like <handlers> or <destinations>, or return the ID of the child, if there is no explicit default and only one child. + + :param parent: Object representing a tag that may or may not have a 'default' attribute. + :type parent: ``xml.etree.ElementTree.Element`` + :param names: The list of destination or handler IDs or tags that were loaded. 
+ :type names: list of str + + :returns: str -- id or tag representing the default. + """ + rval = parent.get('default') + if rval is not None: + # If the parent element has a 'default' attribute, use the id or tag in that attribute + if rval not in names: + raise Exception("<%s> default attribute '%s' does not match a defined id or tag in a child element" % (parent.tag, rval)) + log.debug("<%s> default set to child with id or tag '%s'" % (parent.tag, rval)) + elif len(names) == 1: + log.info("Setting <%s> default to child with id '%s'" % (parent.tag, names[0])) + rval = names[0] + else: + raise Exception("No <%s> default specified, please specify a valid id or tag with the 'default' attribute" % parent.tag) + return rval + + def __findall_with_required(self, parent, match, attribs=None): + """Like ``xml.etree.ElementTree.Element.findall()``, except only returns children that have the specified attribs. + + :param parent: Parent element in which to find. + :type parent: ``xml.etree.ElementTree.Element`` + :param match: Name of child elements to find. + :type match: str + :param attribs: List of required attributes in children elements. + :type attribs: list of str + + :returns: list of ``xml.etree.ElementTree.Element`` + """ + rval = [] + if attribs is None: + attribs = ('id',) + for elem in parent.findall(match): + for attrib in attribs: + if attrib not in elem.attrib: + log.warning("required '%s' attribute is missing from <%s> element" % (attrib, match)) + break + else: + rval.append(elem) + return rval + + def __get_params(self, parent): + """Parses any child <param> tags in to a dictionary suitable for persistence. + + :param parent: Parent element in which to find child <param> tags. + :type parent: ``xml.etree.ElementTree.Element`` + + :returns: dict + """ + rval = {} + for param in parent.findall('param'): + rval[param.get('id')] = param.text + return rval + + @property + def default_job_tool_configuration(self): + """The default JobToolConfiguration, used if a tool does not have an explicit defintion in the configuration. It consists of a reference to the default handler and default destination. + + :returns: JobToolConfiguration -- a representation of a <tool> element that uses the default handler and destination + """ + return JobToolConfiguration(id='default', handler=self.default_handler_id, destination=self.default_destination_id) + + # Called upon instantiation of a Tool object + def get_job_tool_configurations(self, ids): + """Get all configured JobToolConfigurations for a tool ID, or, if given a list of IDs, the JobToolConfigurations for the first id in ``ids`` matching a tool definition. + + .. note:: + + You should not mix tool shed tool IDs, versionless tool shed IDs, and tool config tool IDs that refer to the same tool. + + :param ids: Tool ID or IDs to fetch the JobToolConfiguration of. + :type ids: list or str. + :returns: list -- JobToolConfiguration Bunches representing <tool> elements matching the specified ID(s). 
+ + Example tool ID strings include: + + * Full tool shed id: ``toolshed.example.org/repos/nate/filter_tool_repo/filter_tool/1.0.0`` + * Tool shed id less version: ``toolshed.example.org/repos/nate/filter_tool_repo/filter_tool`` + * Tool config tool id: ``filter_tool`` + """ + rval = [] + # listify if ids is a single (string) id + ids = util.listify(ids) + for id in ids: + if id in self.tools: + # If a tool has definitions that include job params but not a + # definition for jobs without params, include the default + # config + for job_tool_configuration in self.tools[id]: + if not job_tool_configuration.params: + break + else: + rval.append(self.default_job_tool_configuration) + rval.extend(self.tools[id]) + break + else: + rval.append(self.default_job_tool_configuration) + return rval + + def __get_single_item(self, collection): + """Given a collection of handlers or destinations, return one item from the collection at random. + """ + # Done like this to avoid random under the assumption it's faster to avoid it + if len(collection) == 1: + return collection[0] + else: + return random.choice(collection) + + # This is called by Tool.get_job_handler() + def get_handler(self, id_or_tag): + """Given a handler ID or tag, return the provided ID or an ID matching the provided tag + + :param id_or_tag: A handler ID or tag. + :type id_or_tag: str + + :returns: str -- A valid job handler ID. + """ + return self.__get_single_item(self.handlers[id_or_tag]) + + def get_destination(self, id_or_tag): + """Given a destination ID or tag, return the JobDestination matching the provided ID or tag + + :param id_or_tag: A destination ID or tag. + :type id_or_tag: str + + :returns: JobDestination -- A valid destination + + Destinations are deepcopied as they are expected to be passed in to job + runners, which will modify them for persisting params set at runtime. + """ + return copy.deepcopy(self.__get_single_item(self.destinations[id_or_tag])) + + def get_destinations(self, id_or_tag): + """Given a destination ID or tag, return all JobDestinations matching the provided ID or tag + + :param id_or_tag: A destination ID or tag. + :type id_or_tag: str + + :returns: list or tuple of JobDestinations + + Destinations are not deepcopied, so they should not be passed to + anything which might modify them. + """ + return self.destinations.get(id_or_tag, None) + + def get_job_runner_plugins(self): + """Load all configured job runner plugins + + :returns: list of job runner plugins + """ + rval = {} + for runner in self.runner_plugins: + class_names = [] + module = None + id = runner['id'] + load = runner['load'] + if ':' in load: + # Name to load was specified as '<module>:<class>' + module_name, class_name = load.rsplit(':', 1) + class_names = [ class_name ] + module = __import__( module_name ) + else: + # Name to load was specified as '<module>' + if '.' not in load: + # For legacy reasons, try from galaxy.jobs.runners first if there's no '.' in the name + module_name = 'galaxy.jobs.runners.' + load + try: + module = __import__( module_name ) + except ImportError: + # No such module, we'll retry without prepending galaxy.jobs.runners. + # All other exceptions (e.g. something wrong with the module code) will raise + pass + if module is None: + # If the name included a '.' 
or loading from the static runners path failed, try the original name + module = __import__( load ) + module_name = load + if module is None: + # Module couldn't be loaded, error should have already been displayed + continue + for comp in module_name.split( "." )[1:]: + module = getattr( module, comp ) + if not class_names: + # If there's not a ':', we check <module>.__all__ for class names + try: + assert module.__all__ + class_names = module.__all__ + except AssertionError: + log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % load ) + continue + for class_name in class_names: + runner_class = getattr( module, class_name ) + try: + assert issubclass(runner_class, BaseJobRunner) + except TypeError: + log.warning("A non-class name was found in __all__, ignoring: %s" % id) + continue + except AssertionError: + log.warning("Job runner classes must be subclassed from BaseJobRunner, %s has bases: %s" % (id, runner_class.__bases__)) + continue + try: + rval[id] = runner_class( self.app, runner['workers'] ) + except TypeError: + log.warning( "Job runner '%s:%s' has not been converted to a new-style runner" % ( module_name, class_name ) ) + rval[id] = runner_class( self.app ) + log.debug( "Loaded job runner '%s:%s' as '%s'" % ( module_name, class_name, id ) ) + return rval + + def is_id(self, collection): + """Given a collection of handlers or destinations, indicate whether the collection represents a tag or a real ID + + :param collection: A representation of a destination or handler + :type collection: tuple or list + + :returns: bool + """ + return type(collection) == tuple + + def is_tag(self, collection): + """Given a collection of handlers or destinations, indicate whether the collection represents a tag or a real ID + + :param collection: A representation of a destination or handler + :type collection: tuple or list + + :returns: bool + """ + return type(collection) == list + + def is_handler(self, server_name): + """Given a server name, indicate whether the server is a job handler + + :param server_name: The name to check + :type server_name: str + + :return: bool + """ + for collection in self.handlers.values(): + if server_name in collection: + return True + return False + + def convert_legacy_destinations(self, job_runners): + """Converts legacy (from a URL) destinations to contain the appropriate runner params defined in the URL. + + :param job_runners: All loaded job runner plugins. 
+ :type job_runners: list of job runner plugins + """ + for id, destination in [ ( id, destinations[0] ) for id, destinations in self.destinations.items() if self.is_id(destinations) ]: + # Only need to deal with real destinations, not members of tags + if destination.legacy and not destination.converted: + if destination.runner in job_runners: + destination.params = job_runners[destination.runner].url_to_destination(destination.url).params + destination.converted = True + if destination.params: + log.debug("Legacy destination with id '%s', url '%s' converted, got params:" % (id, destination.url)) + for k, v in destination.params.items(): + log.debug(" %s: %s" % (k, v)) + else: + log.debug("Legacy destination with id '%s', url '%s' converted, got params:" % (id, destination.url)) + else: + log.warning("Legacy destination with id '%s' could not be converted: Unknown runner plugin: %s" % (id, destination.runner)) + class JobWrapper( object ): """ Wraps a 'model.Job' with convenience methods for running processes and @@ -81,7 +571,7 @@ self.tool_provided_job_metadata = None # Wrapper holding the info required to restore and clean up from files used for setting metadata externally self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job ) - self.job_runner_mapper = JobRunnerMapper( self, job.job_runner_name ) + self.job_runner_mapper = JobRunnerMapper( self, queue.dispatcher.url_to_destination ) self.params = None if job.params: self.params = from_json_string( job.params ) @@ -94,7 +584,8 @@ return self.app.config.use_tasked_jobs and self.tool.parallelism def get_job_runner_url( self ): - return self.job_runner_mapper.get_job_runner_url( self.params ) + log.warning('(%s) Job runner URLs are deprecated, use destinations instead.' % self.job_id) + return self.job_destination.url def get_parallelism(self): return self.tool.parallelism @@ -102,6 +593,20 @@ # legacy naming get_job_runner = get_job_runner_url + @property + def job_destination(self): + """Return the JobDestination that this job will use to run. This will + either be a configured destination, a randomly selected destination if + the configured destination was a tag, or a dynamically generated + destination from the dynamic runner. + + Calling this method for the first time causes the dynamic runner to do + its calculation, if any. + + :returns: ``JobDestination`` + """ + return self.job_runner_mapper.get_job_destination(self.params) + def get_job( self ): return self.sa_session.query( model.Job ).get( self.job_id ) @@ -321,11 +826,24 @@ return job.state def set_runner( self, runner_url, external_id ): + log.warning('set_runner() is deprecated, use set_job_destination()') + self.set_job_destination(self.job_destination, external_id) + + def set_job_destination(self, job_destination, external_id): + """ + Persist job destination params in the database for recovery. + + self.job_destination is not used because a runner may choose to rewrite + parts of the destination (e.g. the params). 
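The reason set_job_destination() takes the destination as an argument, rather than always persisting self.job_destination, is spelled out above: a runner may rewrite parts of the destination before submitting. A hypothetical usage sketch from inside a runner (the external_id value and the rewritten param are placeholders):

    dest = job_wrapper.job_destination           # a copy from the job config (see get_destination above)
    dest.params['Execution_Time'] = '96:00:00'   # runner-adjusted at submit time
    job_wrapper.set_job_destination(dest, external_id)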
+ """ job = self.get_job() - self.sa_session.refresh( job ) - job.job_runner_name = runner_url + self.sa_session.refresh(job) + log.debug('(%s) Persisting job destination (destination id: %s)' % (job.id, job_destination.id)) + job.destination_id = job_destination.id + job.destination_params = job_destination.params + job.job_runner_name = job_destination.runner job.job_runner_external_id = external_id - self.sa_session.add( job ) + self.sa_session.add(job) self.sa_session.flush() def finish( self, stdout, stderr, tool_exit_code=None ): @@ -699,6 +1217,28 @@ except: log.exception( "Unable to cleanup job %d" % self.job_id ) + def get_output_sizes( self ): + sizes = [] + output_paths = self.get_output_fnames() + for outfile in [ str( o ) for o in output_paths ]: + if os.path.exists( outfile ): + sizes.append( ( outfile, os.stat( outfile ).st_size ) ) + else: + sizes.append( ( outfile, 0 ) ) + return sizes + + def check_limits(self, runtime=None): + if self.app.job_config.limits.output_size > 0: + for outfile, size in self.get_output_sizes(): + if size > self.app.config.output_size_limit: + log.warning( '(%s) Job output %s is over the output size limit' % ( self.get_id_tag(), os.path.basename( outfile ) ) ) + return 'Job output file grew too large (greater than %s), please try different inputs or parameters' % util.nice_size( self.app.job_config.limits.output_size ) + if self.app.job_config.limits.walltime_delta is not None and runtime is not None: + if runtime > self.app.job_config.limits.walltime_delta: + log.warning( '(%s) Job has reached walltime, it will be terminated' % ( self.get_id_tag() ) ) + return 'Job ran longer than the maximum allowed execution time (%s), please try different inputs or parameters' % self.app.job_config.limits.walltime + return None + def get_command_line( self ): return self.command_line @@ -825,16 +1365,6 @@ return ExpressionContext( meta, job_context ) return job_context - def check_output_sizes( self ): - sizes = [] - output_paths = self.get_output_fnames() - for outfile in [ str( o ) for o in output_paths ]: - if os.path.exists( outfile ): - sizes.append( ( outfile, os.stat( outfile ).st_size ) ) - else: - sizes.append( ( outfile, 0 ) ) - return sizes - def setup_external_metadata( self, exec_dir=None, tmp_dir=None, dataset_files_path=None, config_root=None, config_file=None, datatypes_config=None, set_extension=True, **kwds ): # extension could still be 'auto' if this is the upload tool. job = self.get_job() @@ -1148,16 +1678,6 @@ # Handled at the parent job level. Do nothing here. pass - def check_output_sizes( self ): - sizes = [] - output_paths = self.get_output_fnames() - for outfile in [ str( o ) for o in output_paths ]: - if os.path.exists( outfile ): - sizes.append( ( outfile, os.stat( outfile ).st_size ) ) - else: - sizes.append( ( outfile, 0 ) ) - return sizes - def setup_external_metadata( self, exec_dir=None, tmp_dir=None, dataset_files_path=None, config_root=None, config_file=None, datatypes_config=None, set_extension=True, **kwds ): # There is no metadata setting for tasks. This is handled after the merge, at the job level. 
return "" diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -11,7 +11,7 @@ from sqlalchemy.sql.expression import and_, or_, select, func from galaxy import util, model -from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper +from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper, JobDestination log = logging.getLogger( __name__ ) @@ -51,6 +51,9 @@ self.sa_session = app.model.context self.track_jobs_in_database = self.app.config.track_jobs_in_database + # Initialize structures for handling job limits + self.__clear_user_job_count() + # Keep track of the pid that started the job manager, only it # has valid threads self.parent_pid = os.getpid() @@ -58,6 +61,8 @@ self.queue = Queue() # Contains jobs that are waiting (only use from monitor thread) self.waiting_jobs = [] + # Contains wrappers of jobs that are limited or ready (so they aren't created unnecessarily/multiple times) + self.job_wrappers = {} # Helper for interruptable sleep self.sleeper = Sleeper() self.running = True @@ -78,7 +83,7 @@ """ Checks all jobs that are in the 'new', 'queued' or 'running' state in the database and requeues or cleans up as necessary. Only run as the - job manager starts. + job handler starts. """ for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \ .filter( ( ( model.Job.state == model.Job.states.NEW ) \ @@ -88,17 +93,32 @@ if job.tool_id not in self.app.toolbox.tools_by_id: log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) ) JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' ) - elif job.job_runner_name is None or (job.job_runner_name is not None and job.job_runner_external_id is None): - if job.job_runner_name is None: - log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) ) + if job.job_runner_name is not None and job.job_runner_external_id is None: + # This could happen during certain revisions of Galaxy where a runner URL was persisted before the job was dispatched to a runner. 
+ log.debug( "(%s) Job runner assigned but no external ID recorded, adding to the job handler queue" % job.id ) + job.job_runner_name = None + if self.track_jobs_in_database: + job.state = model.Job.states.NEW else: - log.debug( "(%s) Job runner assigned but no external ID recorded, adding to the job handler queue" % job.id ) + self.queue.put( ( job.id, job.tool_id ) ) + elif job.job_runner_name is not None and job.job_runner_external_id is not None and job.destination_id is None: + # This is the first start after upgrading from URLs to destinations, convert the URL to a destination and persist + # TODO: test me extensively + job_wrapper = JobWrapper( job, self ) + job_wrapper.set_job_destination(self.dispatcher.url_to_destination(job.job_runner_name), job.job_runner_external_id) + self.dispatcher.recover( job, job_wrapper ) + log.info('(%s) Converted job from a URL to a destination and recovered' % (job.id)) + elif job.job_runner_name is None: + # Never (fully) dispatched + log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) ) if self.track_jobs_in_database: job.state = model.Job.states.NEW else: self.queue.put( ( job.id, job.tool_id ) ) else: + # Already dispatched and running job_wrapper = JobWrapper( job, self ) + job_wrapper.job_runner_mapper.cached_job_destination = JobDestination(id=job.destination_id, runner=job.job_runner_name, params=job.destination_params) self.dispatcher.recover( job, job_wrapper ) if self.sa_session.dirty: self.sa_session.flush() @@ -156,8 +176,6 @@ ~model.Job.table.c.id.in_(hda_not_ready), ~model.Job.table.c.id.in_(ldda_not_ready))) \ .order_by(model.Job.id).all() - # Ensure that we get new job counts on each iteration - self.__clear_user_job_count() else: # Get job objects and append to watch queue for any which were # previously waiting @@ -174,6 +192,8 @@ jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) except Empty: pass + # Ensure that we get new job counts on each iteration + self.__clear_user_job_count() # Iterate over new and waiting jobs and look for any that are # ready to run new_waiting_jobs = [] @@ -183,14 +203,13 @@ # Some of these states will only happen when using the in-memory job queue job_state = self.__check_if_ready_to_run( job ) if job_state == JOB_WAIT: - if not self.track_jobs_in_database: - new_waiting_jobs.append( job.id ) + new_waiting_jobs.append( job.id ) elif job_state == JOB_INPUT_ERROR: log.info( "(%d) Job unable to run: one or more inputs in error state" % job.id ) elif job_state == JOB_INPUT_DELETED: log.info( "(%d) Job unable to run: one or more inputs deleted" % job.id ) elif job_state == JOB_READY: - self.dispatcher.put( JobWrapper( job, self ) ) + self.dispatcher.put( self.job_wrappers.pop( job.id ) ) log.info( "(%d) Job dispatched" % job.id ) elif job_state == JOB_DELETED: log.info( "(%d) Job deleted by user while still queued" % job.id ) @@ -204,14 +223,20 @@ dataset_assoc.dataset.info = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run" self.sa_session.add( dataset_assoc.dataset.dataset ) self.sa_session.add( job ) + elif job_state == JOB_ERROR: + log.error( "(%d) Error checking job readiness" % job.id ) else: log.error( "(%d) Job in unknown state '%s'" % ( job.id, job_state ) ) - if not self.track_jobs_in_database: - new_waiting_jobs.append( job.id ) + new_waiting_jobs.append( job.id ) except Exception: log.exception( "failure running job %d" % job.id ) # 
Update the waiting list - self.waiting_jobs = new_waiting_jobs + if not self.track_jobs_in_database: + self.waiting_jobs = new_waiting_jobs + # Remove cached wrappers for any jobs that are no longer being tracked + for id in self.job_wrappers.keys(): + if id not in new_waiting_jobs: + del self.job_wrappers[id] # Flush, if we updated the state self.sa_session.flush() # Done with the session @@ -239,19 +264,34 @@ continue # don't run jobs for which the input dataset was deleted if idata.deleted: - JobWrapper( job, self ).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) + self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) return JOB_INPUT_DELETED # an error in the input data causes us to bail immediately elif idata.state == idata.states.ERROR: - JobWrapper( job, self ).fail( "input data %s is in error state" % ( idata.hid ) ) + self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s is in error state" % ( idata.hid ) ) return JOB_INPUT_ERROR elif idata.state == idata.states.FAILED_METADATA: - JobWrapper( job, self ).fail( "input data %s failed to properly set metadata" % ( idata.hid ) ) + self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) ) return JOB_INPUT_ERROR elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): # need to requeue return JOB_WAIT - state = self.__check_user_jobs( job ) + # Create the job wrapper so that the destination can be set + if job.id not in self.job_wrappers: + self.job_wrappers[job.id] = JobWrapper(job, self) + # Cause the job_destination to be set and cached by the mapper + try: + self.job_wrappers[job.id].job_destination + except Exception, e: + failure_message = getattr(e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE ) + if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE: + log.exception( 'Failed to generate job destination' ) + else: + log.debug( "Intentionally failing job with message (%s)" % failure_message ) + self.job_wrappers[job.id].fail( failure_message ) + return JOB_ERROR + # job is ready to run, check limits + state = self.__check_user_jobs( job, self.job_wrappers[job.id] ) if state == JOB_READY and self.app.config.enable_quotas: quota = self.app.quota_agent.get_quota( job.user ) if quota is not None: @@ -264,48 +304,114 @@ return state def __clear_user_job_count( self ): - self.user_job_count = {} - self.user_job_count_per_runner = {} + self.user_job_count = None + self.user_job_count_per_destination = None - def __check_user_jobs( self, job ): + def get_user_job_count(self, user_id): + self.__cache_user_job_count() + # This could have been incremented by a previous job dispatched on this iteration, even if we're not caching + rval = self.user_job_count.get(user_id, 0) + if not self.app.config.cache_user_job_count: + result = self.sa_session.execute(select([func.count(model.Job.table.c.id)]).where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id == user_id)))) + for row in result: + # there should only be one row + rval += row[0] + return rval + + def __cache_user_job_count( self ): + # Cache the job count if necessary + if self.user_job_count is None and 
self.app.config.cache_user_job_count: + self.user_job_count = {} + query = self.sa_session.execute(select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)]) \ + .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id is not None))) \ + .group_by(model.Job.table.c.user_id)) + for row in query: + self.user_job_count[row[0]] = row[1] + elif self.user_job_count is None: + self.user_job_count = {} + + def get_user_job_count_per_destination(self, user_id): + self.__cache_user_job_count_per_destination() + cached = self.user_job_count_per_destination.get(user_id, {}) + if self.app.config.cache_user_job_count: + rval = cached + else: + # The cached count is still used even when we're not caching, it is + # incremented when a job is run by this handler to ensure that + # multiple jobs can't get past the limits in one iteration of the + # queue. + rval = {} + rval.update(cached) + result = self.sa_session.execute(select([model.Job.table.c.destination_id, func.count(model.Job.table.c.destination_id).label('job_count')]) \ + .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id == user_id))) \ + .group_by(model.Job.table.c.destination_id)) + for row in result: + # Add the count from the database to the cached count + rval[row['destination_id']] = rval.get(row['destination_id'], 0) + row['job_count'] + return rval + + def __cache_user_job_count_per_destination(self): + # Cache the job count if necessary + if self.user_job_count_per_destination is None and self.app.config.cache_user_job_count: + self.user_job_count_per_destination = {} + result = self.sa_session.execute(select([model.Job.table.c.user_id, model.Job.table.c.destination_id, func.count(model.Job.table.c.user_id).label('job_count')]) \ + .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)))) \ + .group_by(model.Job.table.c.user_id, model.Job.table.c.destination_id)) + for row in result: + if row['user_id'] not in self.user_job_count_per_destination: + self.user_job_count_per_destination[row['user_id']] = {} + self.user_job_count_per_destination[row['user_id']][row['destination_id']] = row['job_count'] + elif self.user_job_count_per_destination is None: + self.user_job_count_per_destination = {} + + def increase_running_job_count(self, user_id, destination_id): + if self.user_job_count is None: + self.user_job_count = {} + if self.user_job_count_per_destination is None: + self.user_job_count_per_destination = {} + self.user_job_count[user_id] = self.user_job_count.get(user_id, 0) + 1 + if user_id not in self.user_job_count_per_destination: + self.user_job_count_per_destination[user_id] = {} + self.user_job_count_per_destination[user_id][destination_id] = self.user_job_count_per_destination[user_id].get(destination_id, 0) + 1 + + def __check_user_jobs( self, job, job_wrapper ): if job.user: # Check the hard limit first - if self.app.config.registered_user_job_limit: - # Cache the job count if necessary - if not self.user_job_count: - query = self.sa_session.execute(select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)]) \ - .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id is not None))) \ - .group_by(model.Job.table.c.user_id)) - for row in query: - self.user_job_count[row[0]] = row[1] - if self.user_job_count.get(job.user_id, 0) >= 
self.app.config.registered_user_job_limit: + if self.app.job_config.limits.registered_user_concurrent_jobs: + count = self.get_user_job_count(job.user_id) + # Check the user's number of dispatched jobs against the overall limit + if count >= self.app.job_config.limits.registered_user_concurrent_jobs: return JOB_WAIT - # If we pass the hard limit, also check the per-runner count - if job.job_runner_name in self.app.config.job_limits: - # Cache the job count if necessary - if job.job_runner_name not in self.user_job_count_per_runner: - self.user_job_count_per_runner[job.job_runner_name] = {} - query_url, limit = self.app.config.job_limits[job.job_runner_name] - base_query = select([model.Job.table.c.user_id, model.Job.table.c.job_runner_name, func.count(model.Job.table.c.user_id).label('job_count')]) \ - .where(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING))) \ - .group_by(model.Job.table.c.user_id, model.Job.table.c.job_runner_name) - if '%' in query_url or '_' in query_url: - subq = base_query.having(model.Job.table.c.job_runner_name.like(query_url)).alias('subq') - query = self.sa_session.execute(select([subq.c.user_id, func.sum(subq.c.job_count).label('job_count')]).group_by(subq.c.user_id)) - else: - query = self.sa_session.execute(base_query.having(model.Job.table.c.job_runner_name == query_url)) - for row in query: - self.user_job_count_per_runner[job.job_runner_name][row['user_id']] = row['job_count'] - if self.user_job_count_per_runner[job.job_runner_name].get(job.user_id, 0) >= self.app.config.job_limits[job.job_runner_name][1]: + # If we pass the hard limit, also check the per-destination count + id = job_wrapper.job_destination.id + count_per_id = self.get_user_job_count_per_destination(job.user_id) + if id in self.app.job_config.limits.concurrent_jobs: + count = count_per_id.get(id, 0) + # Check the user's number of dispatched jobs in the assigned destination id against the limit for that id + if count >= self.app.job_config.limits.concurrent_jobs[id]: return JOB_WAIT + # If we pass the destination limit (if there is one), also check limits on any tags (if any) + if job_wrapper.job_destination.tags: + for tag in job_wrapper.job_destination.tags: + # Check each tag for this job's destination + if tag in self.app.job_config.limits.concurrent_jobs: + # Only if there's a limit defined for this tag + count = 0 + for id in [ d.id for d in self.app.job_config.get_destinations(tag) ]: + # Add up the aggregate job total for this tag + count += count_per_id.get(id, 0) + if count >= self.app.job_config.limits.concurrent_jobs[tag]: + return JOB_WAIT + # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration + self.increase_running_job_count(job.user_id, id) elif job.galaxy_session: # Anonymous users only get the hard limit - if self.app.config.anonymous_user_job_limit: + if self.app.job_config.limits.anonymous_user_concurrent_jobs: count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ .filter( and_( model.Job.session_id == job.galaxy_session.id, or_( model.Job.state == model.Job.states.RUNNING, model.Job.state == model.Job.states.QUEUED ) ) ).count() - if count >= self.app.config.anonymous_user_job_limit: + if count >= self.app.job_config.limits.anonymous_user_concurrent_jobs: return JOB_WAIT else: log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' 
% job.id ) @@ -431,58 +537,41 @@ class DefaultJobDispatcher( object ): def __init__( self, app ): self.app = app - self.job_runners = {} - start_job_runners = ["local", "lwr"] - if app.config.start_job_runners is not None: - start_job_runners.extend( [ x.strip() for x in util.listify( app.config.start_job_runners ) ] ) - if app.config.use_tasked_jobs: - start_job_runners.append("tasks") - for name in start_job_runners: - self._load_plugin( name ) - log.debug( "Job runners: " + ':'.join( start_job_runners ) ) - - def _load_plugin( self, name ): - module_name = 'galaxy.jobs.runners.' + name - try: - module = __import__( module_name ) - except: - log.exception( 'Job runner is not loadable: %s' % module_name ) - return - for comp in module_name.split( "." )[1:]: - module = getattr( module, comp ) - if '__all__' not in dir( module ): - log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name ) - return - for obj in module.__all__: - display_name = ':'.join( ( module_name, obj ) ) - runner = getattr( module, obj ) - self.job_runners[name] = runner( self.app ) - log.debug( 'Loaded job runner: %s' % display_name ) + self.job_runners = self.app.job_config.get_job_runner_plugins() + # Once plugins are loaded, all job destinations that were created from + # URLs can have their URL params converted to the destination's param + # dict by the plugin. + self.app.job_config.convert_legacy_destinations(self.job_runners) + log.debug( "Loaded job runners plugins: " + ':'.join( self.job_runners.keys() ) ) def __get_runner_name( self, job_wrapper ): if job_wrapper.can_split(): runner_name = "tasks" else: - runner_name = ( job_wrapper.get_job_runner_url().split(":", 1) )[0] + runner_name = job_wrapper.job_destination.runner return runner_name + def url_to_destination( self, url ): + """This is used by the runner mapper (a.k.a. dynamic runner) and + recovery methods to have runners convert URLs to destinations. + + New-style runner plugin IDs must match the URL's scheme for this to work. 
+ """ + runner_name = url.split(':', 1)[0] + try: + return self.job_runners[runner_name].url_to_destination(url) + except Exception, e: + log.exception("Unable to convert legacy job runner URL to job destination, destination will be the '%s' runner with no params: %s" % (runner_name, e)) + return JobDestination(runner=runner_name) + def put( self, job_wrapper ): - try: - runner_name = self.__get_runner_name( job_wrapper ) - except Exception, e: - failure_message = getattr(e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE ) - if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE: - log.exception( 'Failed to generate job runner name' ) - else: - log.debug( "Intentionally failing job with message (%s)" % failure_message ) - job_wrapper.fail( failure_message ) - return + runner_name = self.__get_runner_name( job_wrapper ) try: if isinstance(job_wrapper, TaskWrapper): #DBTODO Refactor - log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) ) + log.debug( "(%s) Dispatching task %s to %s runner" %( job_wrapper.job_id, job_wrapper.task_id, runner_name ) ) else: - log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) + log.debug( "(%s) Dispatching to %s runner" %( job_wrapper.job_id, runner_name ) ) self.job_runners[runner_name].put( job_wrapper ) except KeyError: log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/manager.py --- a/lib/galaxy/jobs/manager.py +++ b/lib/galaxy/jobs/manager.py @@ -26,257 +26,18 @@ """ def __init__( self, app ): self.app = app - self.job_handler = NoopHandler() - if self.app.config.server_name in self.app.config.job_handlers: + if (self.app.config.track_jobs_in_database and self.app.job_config.is_handler(self.app.config.server_name)) or not self.app.config.track_jobs_in_database: + log.debug("Starting job handler") self.job_handler = handler.JobHandler( app ) - if self.app.config.server_name == self.app.config.job_manager: - job_handler = NoopHandler() - # In the case that webapp == manager == handler, pass jobs in memory - if not self.app.config.track_jobs_in_database: - job_handler = self.job_handler - # Otherwise, even if the manager == one of the handlers, its handler will pick up jobs from the database - self.job_queue = JobManagerQueue( app, job_handler ) - self.job_stop_queue = JobManagerStopQueue( app, job_handler ) - if self.app.config.enable_beta_job_managers: - from galaxy.jobs.deferred import DeferredJobQueue - self.deferred_job_queue = DeferredJobQueue( app ) + self.job_queue = self.job_handler.job_queue + self.job_stop_queue = self.job_handler.job_stop_queue else: + self.job_handler = NoopHandler() self.job_queue = self.job_stop_queue = NoopQueue() self.job_handler.start() def shutdown( self ): - self.job_queue.shutdown() - self.job_stop_queue.shutdown() self.job_handler.shutdown() -class JobManagerQueue( object ): - """ - Job manager, waits for jobs to be runnable and then dispatches to a - JobHandler. - """ - STOP_SIGNAL = object() - def __init__( self, app, job_handler ): - self.app = app - self.job_handler = job_handler # the (singular) handler if we are passing jobs in memory - - self.sa_session = app.model.context - self.job_lock = False - # Keep track of the pid that started the job manager, only it - # has valid threads - self.parent_pid = os.getpid() - # Contains new jobs. 
Note this is not used if track_jobs_in_database is True - self.queue = Queue() - # Helper for interruptable sleep - self.sleeper = Sleeper() - self.running = True - self.monitor_thread = threading.Thread( name="JobManagerQueue.monitor_thread", target=self.__monitor ) - self.monitor_thread.setDaemon( True ) - # Recover jobs at startup - self.__check_jobs_at_startup() - # Start the queue - self.monitor_thread.start() - log.info( "job manager queue started" ) - - def __check_jobs_at_startup( self ): - """ - Checks all jobs that are in the 'new', 'queued' or 'running' state in - the database and requeues or cleans up as necessary. Only run as the - job manager starts. - """ - for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \ - .filter( ( ( model.Job.state == model.Job.states.NEW ) \ - | ( model.Job.state == model.Job.states.RUNNING ) \ - | ( model.Job.state == model.Job.states.QUEUED ) ) \ - & ( model.Job.handler == None ) ): - if job.tool_id not in self.app.toolbox.tools_by_id: - log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) ) - JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' ) - else: - job.handler = self.__get_handler( job ) # handler's recovery method will take it from here - log.info( "(%d) Job in '%s' state had no handler at job manager startup, assigned '%s' handler" % ( job.id, job.state, job.handler ) ) - if self.sa_session.dirty: - self.sa_session.flush() - - def __monitor( self ): - """ - Continually iterate the waiting jobs and dispatch to a handler - """ - # HACK: Delay until after forking, we need a way to do post fork notification!!! - time.sleep( 10 ) - while self.running: - try: - self.__monitor_step() - except: - log.exception( "Exception in monitor_step" ) - # Sleep - self.sleeper.sleep( 1 ) - - def __monitor_step( self ): - """ - Called repeatedly by `monitor` to process waiting jobs. Gets any new - jobs (either from the database or from its own queue), then assigns a - handler. - """ - # Do nothing if the queue is locked - if self.job_lock: - log.info( 'Job queue is administratively locked, sleeping...' 
) - time.sleep( 10 ) - return - # Pull all new jobs from the queue at once - jobs_to_check = [] - if self.app.config.track_jobs_in_database: - # Clear the session so we get fresh states for job and all datasets - self.sa_session.expunge_all() - # Fetch all new jobs - jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ - .filter( ( model.Job.state == model.Job.states.NEW ) \ - & ( model.Job.handler == None ) ).all() - else: - # Get job objects and append to watch queue for any which were - # previously waiting - try: - while 1: - message = self.queue.get_nowait() - if message is self.STOP_SIGNAL: - return - # Unpack the message - job_id, tool_id = message - # Get the job object and append to watch queue - jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) - except Empty: - pass - - for job in jobs_to_check: - job.handler = self.__get_handler( job ) - job.job_runner_name = self.__get_runner_url( job ) - log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) ) - self.sa_session.add( job ) - - # If tracking in the database, handlers will pick up the job now - self.sa_session.flush() - - time.sleep( 5 ) - - # This only does something in the case that there is only one handler and it is this Galaxy process - for job in jobs_to_check: - self.job_handler.job_queue.put( job.id, job.tool_id ) - - def __get_handler( self, job ): - try: - params = None - if job.params: - params = from_json_string( job.params ) - return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_handler( params ) - except: - log.exception( "(%s) Caught exception attempting to get tool-specific job handler for tool '%s', selecting at random from available handlers instead:" % ( job.id, job.tool_id ) ) - return random.choice( self.app.config.job_handlers ) - - def __get_runner_url( self, job ): - """This fetches the raw runner URL, and does not perform any computation e.g. for the dynamic runner""" - try: - return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_runner_url( job.params ) - except Exception, e: - log.warning( 'Unable to determine job runner URL for job %s: %s' % (job.id, str(e)) ) - return None - - def put( self, job_id, tool ): - """Add a job to the queue (by job identifier)""" - if not self.app.config.track_jobs_in_database: - self.queue.put( ( job_id, tool.id ) ) - self.sleeper.wake() - - def shutdown( self ): - """Attempts to gracefully shut down the worker thread""" - if self.parent_pid != os.getpid(): - # We're not the real job queue, do nothing - return - else: - log.info( "sending stop signal to worker thread" ) - self.running = False - if not self.app.config.track_jobs_in_database: - self.queue.put( self.STOP_SIGNAL ) - self.sleeper.wake() - log.info( "job manager queue stopped" ) - -class JobManagerStopQueue( object ): - """ - A queue for jobs which need to be terminated prematurely. - """ - STOP_SIGNAL = object() - def __init__( self, app, job_handler ): - self.app = app - self.job_handler = job_handler - - self.sa_session = app.model.context - - # Keep track of the pid that started the job manager, only it - # has valid threads - self.parent_pid = os.getpid() - # Contains new jobs. 
Note this is not used if track_jobs_in_database is True - self.queue = Queue() - - # Contains jobs that are waiting (only use from monitor thread) - self.waiting = [] - - # Helper for interruptable sleep - self.sleeper = Sleeper() - self.running = True - self.monitor_thread = threading.Thread( name="JobManagerStopQueue.monitor_thread", target=self.monitor ) - self.monitor_thread.setDaemon( True ) - self.monitor_thread.start() - log.info( "job manager stop queue started" ) - - def monitor( self ): - """ - Continually iterate the waiting jobs, stop any that are found. - """ - # HACK: Delay until after forking, we need a way to do post fork notification!!! - time.sleep( 10 ) - while self.running: - try: - self.monitor_step() - except: - log.exception( "Exception in monitor_step" ) - # Sleep - self.sleeper.sleep( 1 ) - - def monitor_step( self ): - """ - Called repeatedly by `monitor` to stop jobs. - """ - jobs_to_check = [] - # Pull from the queue even if tracking in the database (in the case of Administrative stopped jobs) - try: - while 1: - message = self.queue.get_nowait() - if message is self.STOP_SIGNAL: - return - # Unpack the message - job_id, error_msg = message - # Get the job object and append to watch queue - jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) ) - except Empty: - pass - - # If tracking in the database, the handler will pick up the stop itself. Otherwise, notify the handler. - for job, error_msg in jobs_to_check: - self.job_handler.job_stop_queue.put( job.id, error_msg ) - - def put( self, job_id, error_msg=None ): - self.queue.put( ( job_id, error_msg ) ) - - def shutdown( self ): - """Attempts to gracefully shut down the worker thread""" - if self.parent_pid != os.getpid(): - # We're not the real job queue, do nothing - return - else: - log.info( "sending stop signal to worker thread" ) - self.running = False - if not self.app.config.track_jobs_in_database: - self.queue.put( self.STOP_SIGNAL ) - self.sleeper.wake() - log.info( "job manager stop queue stopped" ) - class NoopHandler( object ): def __init__( self, *args, **kwargs ): self.job_queue = NoopQueue() diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -6,7 +6,8 @@ import galaxy.jobs.rules -DYNAMIC_RUNNER_PREFIX = "dynamic:///" +DYNAMIC_RUNNER_NAME = "dynamic" +DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url" class JobMappingException( Exception ): @@ -20,9 +21,9 @@ (in the form of job_wrappers) to job runner url strings. """ - def __init__( self, job_wrapper, job_runner_name=None ): + def __init__( self, job_wrapper, url_to_destination ): self.job_wrapper = job_wrapper - self.job_runner_name = job_runner_name + self.url_to_destination = url_to_destination self.rule_modules = self.__get_rule_modules( ) def __get_rule_modules( self ): @@ -87,11 +88,23 @@ return expand_function( **actual_args ) - def __determine_expand_function_name( self, option_parts ): + def __convert_url_to_destination( self, url ): + """ + Job runner URLs are deprecated, but dynamic mapper functions may still + be returning them. Runners are expected to be able to convert these to + destinations. + + This method calls + JobHandlerQueue.DefaultJobDispatcher.url_to_destination, which in turn + calls the url_to_destination method for the appropriate runner. 
+ """ + dest = self.url_to_destination( url ) + dest['id'] = DYNAMIC_DESTINATION_ID + return dest + + def __determine_expand_function_name( self, destination ): # default look for function with same name as tool, unless one specified - expand_function_name = self.job_wrapper.tool.id - if len( option_parts ) > 1: - expand_function_name = option_parts[ 1 ] + expand_function_name = destination.params.get('function', self.job_wrapper.tool.id) return expand_function_name def __get_expand_function( self, expand_function_name ): @@ -110,32 +123,57 @@ return rule_module return None - def __expand_dynamic_job_runner_url( self, options_str ): - option_parts = options_str.split( '/' ) - expand_type = option_parts[ 0 ] + def __handle_dynamic_job_destination( self, destination ): + expand_type = destination.params.get('type', None) if expand_type == "python": - expand_function_name = self.__determine_expand_function_name( option_parts ) + expand_function_name = self.__determine_expand_function_name( destination ) expand_function = self.__get_expand_function( expand_function_name ) - return self.__invoke_expand_function( expand_function ) + rval = self.__invoke_expand_function( expand_function ) + # TODO: test me extensively + if isinstance(rval, basestring): + # If the function returned a string, check if it's a URL, convert if necessary + if '://' in rval: + return self.__convert_url_to_destination(rval) + else: + return self.app.job_config.get_destination(rval) + elif isinstance(rval, galaxy.jobs.JobDestination): + # If the function generated a JobDestination, we'll use that + # destination directly. However, for advanced job limiting, a + # function may want to set the JobDestination's 'tags' + # attribute so that limiting can be done on a destination tag. + #id_or_tag = rval.get('id') + #if rval.get('tags', None): + # # functions that are generating destinations should only define one tag + # id_or_tag = rval.get('tags')[0] + #return id_or_tag, rval + return rval + else: + raise Exception( 'Dynamic function returned a value that could not be understood: %s' % rval ) + elif expand_type is None: + raise Exception( 'Dynamic function type not specified (hint: add <param id="type">python</param> to your <destination>)' ) else: raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type ) - def __cache_job_runner_url( self, params ): - # If there's already a runner set in the Job object, don't overwrite from the tool - if self.job_runner_name is not None and not self.job_runner_name.startswith('tasks'): - raw_job_runner_url = self.job_runner_name + def __cache_job_destination( self, params ): + raw_job_destination = self.job_wrapper.tool.get_job_destination( params ) + #raw_job_destination_id_or_tag = self.job_wrapper.tool.get_job_destination_id_or_tag( params ) + if raw_job_destination.runner == DYNAMIC_RUNNER_NAME: + job_destination = self.__handle_dynamic_job_destination( raw_job_destination ) else: - raw_job_runner_url = self.job_wrapper.tool.get_job_runner_url( params ) - if raw_job_runner_url.startswith( DYNAMIC_RUNNER_PREFIX ): - job_runner_url = self.__expand_dynamic_job_runner_url( raw_job_runner_url[ len( DYNAMIC_RUNNER_PREFIX ) : ] ) - else: - job_runner_url = raw_job_runner_url - self.cached_job_runner_url = job_runner_url + job_destination = raw_job_destination + #job_destination_id_or_tag = raw_job_destination_id_or_tag + self.cached_job_destination = job_destination + #self.cached_job_destination_id_or_tag = job_destination_id_or_tag - def get_job_runner_url( self, 
params ): + def get_job_destination( self, params ): """ - Cache the job_runner_url string to avoid recalculation. + Cache the job_destination to avoid recalculation. """ - if not hasattr( self, 'cached_job_runner_url' ): - self.__cache_job_runner_url( params ) - return self.cached_job_runner_url + if not hasattr( self, 'cached_job_destination' ): + self.__cache_job_destination( params ) + return self.cached_job_destination + + #def get_job_destination_id_or_tag( self, params ): + # if not hasattr( self, 'cached_job_destination_id_or_tag' ): + # self.__cache_job_destination( params ) + # return self.cached_job_destination_id_or_tag diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -1,13 +1,129 @@ -import os, logging, os.path +""" +Base classes for job runner plugins. +""" +import os +import time +import logging +import threading + +from Queue import Queue, Empty + +import galaxy.jobs from galaxy import model -from Queue import Queue, Empty -import time -import threading log = logging.getLogger( __name__ ) +STOP_SIGNAL = object() + class BaseJobRunner( object ): + def __init__( self, app, nworkers ): + """Start the job runner + """ + self.app = app + self.sa_session = app.model.context + self.nworkers = nworkers + + def _init_worker_threads(self): + """Start ``nworkers`` worker threads. + """ + self.work_queue = Queue() + self.work_threads = [] + log.debug('Starting %s %s workers' % (self.nworkers, self.runner_name)) + for i in range(self.nworkers): + worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next ) + worker.setDaemon( True ) + worker.start() + self.work_threads.append( worker ) + + def run_next(self): + """Run the next item in the work queue (a job waiting to run) + """ + while 1: + ( method, arg ) = self.work_queue.get() + if method is STOP_SIGNAL: + return + # id and name are collected first so that the call of method() is the last exception. + try: + # arg should be a JobWrapper/TaskWrapper + job_id = arg.get_id_tag() + except: + job_id = 'unknown' + try: + name = method.__name__ + except: + name = 'unknown' + try: + method(arg) + except: + log.exception( "(%s) Unhandled exception calling %s" % ( job_id, name ) ) + + # Causes a runner's `queue_job` method to be called from a worker thread + def put(self, job_wrapper): + """Add a job to the queue (by job identifier), indicate that the job is ready to run. + """ + # Change to queued state before handing to worker thread so the runner won't pick it up again + job_wrapper.change_state( model.Job.states.QUEUED ) + # Persist the destination so that the job will be included in counts if using concurrency limits + job_wrapper.set_job_destination( job_wrapper.job_destination, None ) + self.mark_as_queued(job_wrapper) + + def mark_as_queued(self, job_wrapper): + self.work_queue.put( ( self.queue_job, job_wrapper ) ) + + def shutdown( self ): + """Attempts to gracefully shut down the worker threads + """ + log.info( "%s: Sending stop signal to %s worker threads" % ( self.runner_name, len( self.work_threads ) ) ) + for i in range( len( self.work_threads ) ): + self.work_queue.put( ( STOP_SIGNAL, None ) ) + + # Most runners should override the legacy URL handler methods and destination param method + def url_to_destination(self, url): + """ + Convert a legacy URL to a JobDestination. 
+ + Job runner URLs are deprecated, JobDestinations should be used instead. + This base class method converts from a URL to a very basic + JobDestination without destination params. + """ + return galaxy.jobs.JobDestination(runner=url.split(':')[0]) + + def parse_destination_params(self, params): + """Parse the JobDestination ``params`` dict and return the runner's native representation of those params. + """ + raise NotImplementedError() + + # Runners must override the job handling methods + def queue_job(self, job_wrapper): + """Some sanity checks that all runners' queue_job() methods are likely to want to do + """ + job_id = job_wrapper.get_id_tag() + job_state = job_wrapper.get_state() + job_wrapper.is_ready = False + + # Make sure the job hasn't been deleted + if job_state != model.Job.states.QUEUED: + log.info( "(%d) Job is in state %s, skipping execution" % ( job_id, job_state ) ) + return + + # Prepare the job + try: + job_wrapper.prepare() + job_wrapper.runner_command_line = self.build_command_line( job_wrapper ) + except: + log.exception("(%d) Failure preparing job" % job_id) + job_wrapper.fail( "failure preparing job", exception=True ) + return + + job_wrapper.is_ready = True + + def stop_job(self, job): + raise NotImplementedError() + + def recover(self, job, job_wrapper): + raise NotImplementedError() + def build_command_line( self, job_wrapper, include_metadata=False, include_work_dir_outputs=True ): """ Compose the sequence of commands necessary to execute a job. This will @@ -104,12 +220,11 @@ log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) ) return output_pairs - -class ClusterJobState( object ): +class AsynchronousJobState( object ): """ - Encapsulate the state of a cluster job, this should be subclassed as + Encapsulate the state of an asynchronous job, this should be subclassed as needed for various job runners to capture additional information needed - to communicate with cluster job manager. + to communicate with distributed resource manager. """ def __init__( self ): @@ -117,23 +232,22 @@ self.job_id = None self.old_state = None self.running = False - self.runner_url = None + self.job_file = None + self.output_file = None + self.error_file = None + self.exit_code_file = None + self.check_count = 0 + self.job_destination = None -STOP_SIGNAL = object() - -JOB_STATUS_QUEUED = 'queue' -JOB_STATUS_FAILED = 'fail' -JOB_STATUS_FINISHED = 'finish' - -class ClusterJobRunner( BaseJobRunner ): - """ - Not sure this is the best name for this class, but there is common code - shared between sge, pbs, drmaa, etc... +class AsynchronousJobRunner( BaseJobRunner ): + """Parent class for any job runner that runs jobs asynchronously (e.g. via + a distributed resource manager). Provides general methods for having a + thread to monitor the state of asynchronous jobs and submitting those jobs + to the correct methods (queue, finish, cleanup) at appropriate times.. """ - def __init__( self, app ): - self.app = app - self.sa_session = app.model.context + def __init__( self, app, nworkers ): + super( AsynchronousJobRunner, self ).__init__( app, nworkers ) # 'watched' and 'queue' are both used to keep track of jobs to watch. # 'queue' is used to add new watched jobs, and can be called from # any thread (usually by the 'queue_job' method). 
'watched' must only @@ -147,82 +261,44 @@ self.monitor_thread.setDaemon( True ) self.monitor_thread.start() - def _init_worker_threads(self): - self.work_queue = Queue() - self.work_threads = [] - nworkers = self.app.config.cluster_job_queue_workers - for i in range( nworkers ): - worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next ) - worker.setDaemon( True ) - worker.start() - self.work_threads.append( worker ) - def handle_stop(self): # DRMAA and SGE runners should override this and disconnect. pass def monitor( self ): """ - Watches jobs currently in the cluster queue and deals with state changes - (queued to running) and job completion + Watches jobs currently in the monitor queue and deals with state + changes (queued to running) and job completion. """ while 1: # Take any new watched jobs and put them on the monitor list try: while 1: - cluster_job_state = self.monitor_queue.get_nowait() - if cluster_job_state is STOP_SIGNAL: + async_job_state = self.monitor_queue.get_nowait() + if async_job_state is STOP_SIGNAL: # TODO: This is where any cleanup would occur self.handle_stop() return - self.watched.append( cluster_job_state ) + self.watched.append( async_job_state ) except Empty: pass # Iterate over the list of watched jobs and check state - self.check_watched_items() + try: + self.check_watched_items() + except Exception, e: + log.exception('Unhandled exception checking active jobs') # Sleep a bit before the next state check time.sleep( 1 ) - def run_next( self ): - """ - Run the next item in the queue (a job waiting to run or finish ) - """ - while 1: - ( op, obj ) = self.work_queue.get() - if op is STOP_SIGNAL: - return - try: - if op == JOB_STATUS_QUEUED: - # If the next item is to be run, then only run it if the - # job state is "queued". Otherwise the next item was either - # cancelled or one of its siblings encountered an error. - job_state = obj.get_state() - if model.Job.states.QUEUED == job_state: - self.queue_job( obj ) - else: - log.debug( "Not executing job %d in state %s" % ( obj.get_id_tag(), job_state ) ) - elif op == JOB_STATUS_FINISHED: - self.finish_job( obj ) - elif op == JOB_STATUS_FAILED: - self.fail_job( obj ) - except: - log.exception( "Uncaught exception %sing job" % op ) - def monitor_job(self, job_state): self.monitor_queue.put( job_state ) - def put( self, job_wrapper ): - """Add a job to the queue (by job identifier)""" - # Change to queued state before handing to worker thread so the runner won't pick it up again - job_wrapper.change_state( model.Job.states.QUEUED ) - self.mark_as_queued(job_wrapper) - def shutdown( self ): """Attempts to gracefully shut down the monitor thread""" - log.info( "sending stop signal to worker threads" ) + log.info( "%s: Sending stop signal to monitor thread" % self.runner_name ) self.monitor_queue.put( STOP_SIGNAL ) - for i in range( len( self.work_threads ) ): - self.work_queue.put( ( STOP_SIGNAL, None ) ) + # Call the parent's shutdown method to stop workers + super( AsynchronousJobRunner, self ).shutdown() def check_watched_items(self): """ @@ -233,19 +309,16 @@ reuse the logic here. 
""" new_watched = [] - for cluster_job_state in self.watched: - new_cluster_job_state = self.check_watched_item(cluster_job_state) - if new_cluster_job_state: - new_watched.append(new_cluster_job_state) + for async_job_state in self.watched: + new_async_job_state = self.check_watched_item(async_job_state) + if new_async_job_state: + new_watched.append(new_async_job_state) self.watched = new_watched # Subclasses should implement this unless they override check_watched_items all together. def check_watched_item(self): raise NotImplementedError() - def queue_job(self, job_wrapper): - raise NotImplementedError() - def finish_job(self, job_state): raise NotImplementedError() @@ -253,10 +326,7 @@ raise NotImplementedError() def mark_as_finished(self, job_state): - self.work_queue.put( ( JOB_STATUS_FINISHED, job_state ) ) + self.work_queue.put( ( self.finish_job, job_state ) ) def mark_as_failed(self, job_state): - self.work_queue.put( ( JOB_STATUS_FAILED, job_state ) ) - - def mark_as_queued(self, job_wrapper): - self.work_queue.put( ( JOB_STATUS_QUEUED, job_wrapper ) ) + self.work_queue.put( ( self.fail_job, job_state ) ) diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/runners/drmaa.py --- a/lib/galaxy/jobs/runners/drmaa.py +++ b/lib/galaxy/jobs/runners/drmaa.py @@ -89,7 +89,7 @@ Job runner backed by a finite pool of worker threads. FIFO scheduling """ STOP_SIGNAL = object() - def __init__( self, app ): + def __init__( self, app, nworkers ): """Initialize this job runner and start the monitor thread""" # Check if drmaa was importable, fail if not self.app = app This diff is so big that we needed to truncate the remainder. Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
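As an illustrative aside for readers of this changeset (the code below is not part of the diff): the reworked mapper resolves a destination whose runner is "dynamic" by calling a Python rule function named by the destination's "function" param (falling back to the tool id), and it accepts three kinds of return values: a plain string naming a configured destination id or tag, a string containing "://" that is converted from a legacy runner URL, or a JobDestination object used as-is. A minimal sketch of such a rule function, assuming it lives in a module under lib/galaxy/jobs/rules/ and that the mapper supplies keyword arguments matching the function signature; the argument names and tool ids here are hypothetical:

    # Hypothetical rule module, e.g. lib/galaxy/jobs/rules/example_rules.py
    from galaxy.jobs import JobDestination

    def foo(app, tool):
        # Returning a plain string selects a destination id or tag defined in job_conf.xml.
        if tool.id in ('megablast', 'ncbi_blastn_wrapper'):  # hypothetical tool ids
            return 'longjobs'
        # A legacy URL string (anything containing '://') would also be accepted and
        # converted via the dispatcher's url_to_destination().
        # Returning a JobDestination builds the destination on the fly.
        return JobDestination(id='dynamically_generated', runner='local')

Whatever the function returns is cached by the mapper, so the rule is evaluated only once per job.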
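Also illustrative: the base class url_to_destination() in the diff keeps only the runner name from a legacy URL, so a plugin that wants to carry URL options over into destination params would override it. A sketch, where the "example" scheme and URL layout are assumptions, not anything defined in this changeset:

    from galaxy.jobs import JobDestination
    from galaxy.jobs.runners import BaseJobRunner

    class ExampleJobRunner(BaseJobRunner):
        runner_name = 'ExampleJobRunner'

        def url_to_destination(self, url):
            # e.g. 'example://long_queue' becomes a destination with a native 'queue' param
            queue = url.split('://', 1)[-1].strip('/')
            params = {'queue': queue} if queue else {}
            return JobDestination(runner='example', params=params)

Destinations produced this way are what convert_legacy_destinations() uses to fill in params for destinations that were configured with legacy URLs.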