galaxy-commits
March 2012
- 1 participant
- 112 discussions
commit/galaxy-central: natefoo: Allow job handlers to be exempted from the default random choice pool.
by Bitbucket 31 Mar '12
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/4dd7564268fc/
changeset: 4dd7564268fc
user: natefoo
date: 2012-03-31 16:06:43
summary: Allow job handlers to be exempted from the default random choice pool.
affected #: 3 files
diff -r 40abca016f1ebd7be7adb362614eb16d8f2116bf -r 4dd7564268fcd4f5bd3275fa3b5da012b7adefb5 lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -193,6 +193,7 @@
# Store advanced job management config
self.job_manager = kwargs.get('job_manager', self.server_name).strip()
self.job_handlers = [ x.strip() for x in kwargs.get('job_handlers', self.server_name).split(',') ]
+ self.default_job_handlers = [ x.strip() for x in kwargs.get('default_job_handlers', ','.join( self.job_handlers ) ).split(',') ]
# Use database for IPC unless this is a standalone server (or multiple servers doing self dispatching in memory)
self.track_jobs_in_database = True
if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ):
diff -r 40abca016f1ebd7be7adb362614eb16d8f2116bf -r 4dd7564268fcd4f5bd3275fa3b5da012b7adefb5 lib/galaxy/tools/__init__.py
--- a/lib/galaxy/tools/__init__.py
+++ b/lib/galaxy/tools/__init__.py
@@ -791,7 +791,7 @@
self.parallelism = None
# Set job handler(s). Each handler is a dict with 'url' and, optionally, 'params'.
self_id = self.id.lower()
- self.job_handlers = [ { "name" : name } for name in self.app.config.job_handlers ]
+ self.job_handlers = [ { "name" : name } for name in self.app.config.default_job_handlers ]
# Set custom handler(s) if they're defined.
if self_id in self.app.config.tool_handlers:
self.job_handlers = self.app.config.tool_handlers[ self_id ]
diff -r 40abca016f1ebd7be7adb362614eb16d8f2116bf -r 4dd7564268fcd4f5bd3275fa3b5da012b7adefb5 universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample
+++ b/universe_wsgi.ini.sample
@@ -562,6 +562,13 @@
# comma-separated list.
#job_handlers = main
+# By default, a handler from job_handlers will be selected at random if the
+# tool to run does not specify a handler below in [galaxy:tool_handlers]. If you
+# want certain handlers to only handle jobs for tools/params explicitly
+# assigned below, use default_job_handlers to specify which handlers should be
+# used for jobs without explicit handlers.
+#default_job_handlers = main
+
# This enables splitting of jobs into tasks, if specified by the particular tool config.
# This is a new feature and not recommended for production servers yet.
#use_tasked_jobs = False
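The behavior this changeset enables can be sketched in a standalone, simplified form (the function and variable names below are illustrative, not Galaxy's actual API): handlers listed in `job_handlers` but left out of `default_job_handlers` receive only jobs explicitly assigned to them.

```python
import random

# Handlers available overall vs. handlers eligible for random assignment.
# "special_handler" only receives jobs explicitly routed to it.
job_handlers = ["handler0", "handler1", "special_handler"]
default_job_handlers = ["handler0", "handler1"]

def pick_handler(tool_specific_handlers=None):
    """Use an explicit per-tool handler when configured, else a random default."""
    if tool_specific_handlers:
        return random.choice(tool_specific_handlers)
    return random.choice(default_job_handlers)

# A job with no tool-specific handler never lands on special_handler:
assert pick_handler() in default_job_handlers
# A tool pinned to special_handler always gets it:
assert pick_handler(["special_handler"]) == "special_handler"
```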
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
commit/galaxy-central: dan: Small fix for setting upload path in GenomeSpace export tool.
by Bitbucket 30 Mar '12
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/40abca016f1e/
changeset: 40abca016f1e
user: dan
date: 2012-03-30 23:08:35
summary: Small fix for setting upload path in GenomeSpace export tool.
affected #: 1 file
diff -r de97f00ff16feb64cd26d3393a68c303f33e5e33 -r 40abca016f1ebd7be7adb362614eb16d8f2116bf tools/genomespace/genomespace_exporter.py
--- a/tools/genomespace/genomespace_exporter.py
+++ b/tools/genomespace/genomespace_exporter.py
@@ -49,7 +49,8 @@
def get_directory( url_opener, dm_url, path ):
url = dm_url
- for sub_path in path:
+ i = None
+ for i, sub_path in enumerate( path ):
url = "%s/%s" % ( url, sub_path )
dir_request = urllib2.Request( url, headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } )
dir_request.get_method = lambda: 'GET'
@@ -59,10 +60,14 @@
#print "e", e, url #punting, assuming lack of permisions at this low of a level...
continue
break
- return dir_dict
+ if i is not None:
+ path = path[i+1:]
+ else:
+ path = []
+ return ( dir_dict, path )
def get_default_directory( url_opener, dm_url ):
- return get_directory( url_opener, dm_url, ["defaultdirectory"] )
+ return get_directory( url_opener, dm_url, ["defaultdirectory"] )[0]
def create_directory( url_opener, directory_dict, new_dir, dm_url ):
payload = { "isDirectory": True }
@@ -142,8 +147,8 @@
dm_url = genomespace_site_dict['dmServer']
#get default directory
if target_directory and target_directory[0] == '/':
- directory_dict = get_directory( url_opener, dm_url, [ "%s/%s/%s" % ( GENOMESPACE_API_VERSION_STRING, 'file', target_directory[1] ) ] + target_directory[2:] )['directory']
- target_directory.pop(0)
+ directory_dict, target_directory = get_directory( url_opener, dm_url, [ "%s/%s/%s" % ( GENOMESPACE_API_VERSION_STRING, 'file', target_directory[1] ) ] + target_directory[2:] )
+ directory_dict = directory_dict['directory']
else:
directory_dict = get_default_directory( url_opener, dm_url )['directory']
#what directory to stuff this in
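The revised get_directory() contract — return both the deepest directory the server would serve and the path segments still left to create — can be sketched outside Galaxy as follows. Here `fetch` is a hypothetical stand-in for the urllib2 GET in the real code, and `walk_path` mirrors the loop's continue-on-error, break-on-success shape rather than reproducing Galaxy's function exactly.

```python
def walk_path(fetch, path):
    """Try progressively deeper URLs, skipping levels that error out
    (assumed permission problems, as the original comment punts), and
    stop at the first success.

    Returns (dir_dict, remaining_segments) so the caller knows which
    trailing path segments still have to be created on the server.
    """
    dir_dict, i, url = None, None, ""
    for i, segment in enumerate(path):
        url = "%s/%s" % (url, segment)
        try:
            dir_dict = fetch(url)
        except IOError:
            continue  # punt past this level and try one deeper
        break
    remaining = path[i + 1:] if i is not None else []
    return dir_dict, remaining
```

With a `fetch` that only serves `/a/b`, `walk_path(fetch, ["a", "b", "c"])` returns the `/a/b` directory dict plus `["c"]` as the portion still to be created.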
commit/galaxy-central: greg: Additional setting to fix tool shed config which was broken due to the additions in the Galaxy webapp in recent commits.
by Bitbucket 30 Mar '12
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/de97f00ff16f/
changeset: de97f00ff16f
user: greg
date: 2012-03-30 20:37:55
summary: Additional setting to fix tool shed config which was broken due to the additions in the Galaxy webapp in recent commits.
affected #: 1 file
diff -r 25db1453bb2e55d7d58d407364ce71001bd24096 -r de97f00ff16feb64cd26d3393a68c303f33e5e33 lib/galaxy/webapps/community/config.py
--- a/lib/galaxy/webapps/community/config.py
+++ b/lib/galaxy/webapps/community/config.py
@@ -84,6 +84,11 @@
self.screencasts_url = kwargs.get( 'screencasts_url', None )
self.log_events = False
self.cloud_controller_instance = False
+ self.server_name = ''
+ self.job_manager = ''
+ self.job_handlers = []
+ self.tool_handlers = []
+ self.tool_runners = []
# Proxy features
self.apache_xsendfile = kwargs.get( 'apache_xsendfile', False )
self.nginx_x_accel_redirect_base = kwargs.get( 'nginx_x_accel_redirect_base', False )
commit/galaxy-central: greg: Enhance odict to enable insertion into a specified location in its keys, and enhance integrated_tool_panel generation to take advantage of this.
by Bitbucket 30 Mar '12
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/25db1453bb2e/
changeset: 25db1453bb2e
user: greg
date: 2012-03-30 20:19:44
summary: Enhance odict to enable insertion into a specified location in its keys, and enhance integrated_tool_panel generation to take advantage of this. New tool panel items will now be inserted into an existing integrated_tool_panel.xml file at the location they occupy in whatever tool panel configuration file they were added to.
affected #: 2 files
diff -r 0626ddb49084b94248e9621d584409d398fd546d -r 25db1453bb2e55d7d58d407364ce71001bd24096 lib/galaxy/tools/__init__.py
--- a/lib/galaxy/tools/__init__.py
+++ b/lib/galaxy/tools/__init__.py
@@ -114,17 +114,17 @@
tool_path = self.tool_root_dir
# Only load the panel_dict under certain conditions.
load_panel_dict = not self.integrated_tool_panel_config_has_contents
- for elem in root:
+ for index, elem in enumerate( root ):
if parsing_shed_tool_conf:
config_elems.append( elem )
if elem.tag == 'tool':
- self.load_tool_tag_set( elem, self.tool_panel, self.integrated_tool_panel, tool_path, load_panel_dict, guid=elem.get( 'guid' ) )
+ self.load_tool_tag_set( elem, self.tool_panel, self.integrated_tool_panel, tool_path, load_panel_dict, guid=elem.get( 'guid' ), index=index )
elif elem.tag == 'workflow':
- self.load_workflow_tag_set( elem, self.tool_panel, self.integrated_tool_panel, load_panel_dict )
+ self.load_workflow_tag_set( elem, self.tool_panel, self.integrated_tool_panel, load_panel_dict, index=index )
elif elem.tag == 'section':
- self.load_section_tag_set( elem, tool_path, load_panel_dict )
+ self.load_section_tag_set( elem, tool_path, load_panel_dict, index=index )
elif elem.tag == 'label':
- self.load_label_tag_set( elem, self.tool_panel, self.integrated_tool_panel, load_panel_dict )
+ self.load_label_tag_set( elem, self.tool_panel, self.integrated_tool_panel, load_panel_dict, index=index )
if parsing_shed_tool_conf:
shed_tool_conf_dict = dict( config_filename=config_filename,
tool_path=tool_path,
@@ -286,7 +286,7 @@
self.app.model.ToolShedRepository.table.c.owner == owner,
self.app.model.ToolShedRepository.table.c.installed_changeset_revision == installed_changeset_revision ) ) \
.first()
- def load_tool_tag_set( self, elem, panel_dict, integrated_panel_dict, tool_path, load_panel_dict, guid=None ):
+ def load_tool_tag_set( self, elem, panel_dict, integrated_panel_dict, tool_path, load_panel_dict, guid=None, index=None ):
try:
path = elem.get( "file" )
if guid is None:
@@ -354,10 +354,13 @@
if load_panel_dict:
panel_dict[ key ] = tool
# Always load the tool into the integrated_panel_dict, or it will not be included in the integrated_tool_panel.xml file.
- integrated_panel_dict[ key ] = tool
+ if key in integrated_panel_dict or index is None:
+ integrated_panel_dict[ key ] = tool
+ else:
+ integrated_panel_dict.insert( index, key, tool )
except:
log.exception( "Error reading tool from path: %s" % path )
- def load_workflow_tag_set( self, elem, panel_dict, integrated_panel_dict, load_panel_dict ):
+ def load_workflow_tag_set( self, elem, panel_dict, integrated_panel_dict, load_panel_dict, index=None ):
try:
# TODO: should id be encoded?
workflow_id = elem.get( 'id' )
@@ -367,16 +370,22 @@
if load_panel_dict:
panel_dict[ key ] = workflow
# Always load workflows into the integrated_panel_dict.
- integrated_panel_dict[ key ] = workflow
+ if key in integrated_panel_dict or index is None:
+ integrated_panel_dict[ key ] = workflow
+ else:
+ integrated_panel_dict.insert( index, key, workflow )
except:
log.exception( "Error loading workflow: %s" % workflow_id )
- def load_label_tag_set( self, elem, panel_dict, integrated_panel_dict, load_panel_dict ):
+ def load_label_tag_set( self, elem, panel_dict, integrated_panel_dict, load_panel_dict, index=None ):
label = ToolSectionLabel( elem )
key = 'label_' + label.id
if load_panel_dict:
panel_dict[ key ] = label
- integrated_panel_dict[ key ] = label
- def load_section_tag_set( self, elem, tool_path, load_panel_dict ):
+ if key in integrated_panel_dict or index is None:
+ integrated_panel_dict[ key ] = label
+ else:
+ integrated_panel_dict.insert( index, key, label )
+ def load_section_tag_set( self, elem, tool_path, load_panel_dict, index=None ):
key = 'section_' + elem.get( "id" )
if key in self.tool_panel:
section = self.tool_panel[ key ]
@@ -390,17 +399,20 @@
else:
integrated_section = ToolSection( elem )
integrated_elems = integrated_section.elems
- for sub_elem in elem:
+ for sub_index, sub_elem in enumerate( elem ):
if sub_elem.tag == 'tool':
- self.load_tool_tag_set( sub_elem, elems, integrated_elems, tool_path, load_panel_dict, guid=sub_elem.get( 'guid' ) )
+ self.load_tool_tag_set( sub_elem, elems, integrated_elems, tool_path, load_panel_dict, guid=sub_elem.get( 'guid' ), index=sub_index )
elif sub_elem.tag == 'workflow':
- self.load_workflow_tag_set( sub_elem, elems, integrated_elems, load_panel_dict )
+ self.load_workflow_tag_set( sub_elem, elems, integrated_elems, load_panel_dict, index=sub_index )
elif sub_elem.tag == 'label':
- self.load_label_tag_set( sub_elem, elems, integrated_elems, load_panel_dict )
+ self.load_label_tag_set( sub_elem, elems, integrated_elems, load_panel_dict, index=sub_index )
if load_panel_dict:
self.tool_panel[ key ] = section
# Always load sections into the integrated_tool_panel.
- self.integrated_tool_panel[ key ] = integrated_section
+ if key in self.integrated_tool_panel or index is None:
+ self.integrated_tool_panel[ key ] = integrated_section
+ else:
+ self.integrated_tool_panel.insert( index, key, integrated_section )
def load_tool( self, config_file, guid=None ):
"""Load a single tool from the file named by `config_file` and return an instance of `Tool`."""
# Parse XML configuration file and get the root element
diff -r 0626ddb49084b94248e9621d584409d398fd546d -r 25db1453bb2e55d7d58d407364ce71001bd24096 lib/galaxy/util/odict.py
--- a/lib/galaxy/util/odict.py
+++ b/lib/galaxy/util/odict.py
@@ -11,23 +11,22 @@
This dictionary class extends UserDict to record the order in which items are
added. Calling keys(), values(), items(), etc. will return results in this
order.
+ """
+ def __init__( self, dict = None ):
+ self._keys = []
+ UserDict.__init__( self, dict )
- I've added iterkeys, itervalues, iteritems
- """
- def __init__(self, dict = None):
- self._keys = []
- UserDict.__init__(self, dict)
+ def __delitem__( self, key ):
+ UserDict.__delitem__( self, key )
+ self._keys.remove( key )
- def __delitem__(self, key):
- UserDict.__delitem__(self, key)
- self._keys.remove(key)
+ def __setitem__( self, key, item ):
+ UserDict.__setitem__( self, key, item )
+ if key not in self._keys:
+ self._keys.append( key )
- def __setitem__(self, key, item):
- UserDict.__setitem__(self, key, item)
- if key not in self._keys: self._keys.append(key)
-
- def clear(self):
- UserDict.clear(self)
+ def clear( self ):
+ UserDict.clear( self )
self._keys = []
def copy(self):
@@ -35,49 +34,43 @@
new.update( self )
return new
- def items(self):
- return zip(self._keys, self.values())
+ def items( self ):
+ return zip( self._keys, self.values() )
- def keys(self):
+ def keys( self ):
return self._keys[:]
- def popitem(self):
+ def popitem( self ):
try:
key = self._keys[-1]
except IndexError:
- raise KeyError('dictionary is empty')
+ raise KeyError( 'dictionary is empty' )
+ val = self[ key ]
+ del self[ key ]
+ return ( key, val )
- val = self[key]
- del self[key]
+ def setdefault( self, key, failobj=None ):
+ if key not in self._keys:
+ self._keys.append( key )
+ return UserDict.setdefault( self, key, failobj )
- return (key, val)
+ def update( self, dict ):
+ for ( key, val ) in dict.items():
+ self.__setitem__( key, val )
- def setdefault(self, key, failobj = None):
- if key not in self._keys: self._keys.append(key)
- return UserDict.setdefault(self, key, failobj)
+ def values( self ):
+ return map( self.get, self._keys )
- def update(self, dict):
- UserDict.update(self, dict)
- for key in dict.keys():
- if key not in self._keys: self._keys.append(key)
-
- def update(self, dict):
- for (key,val) in dict.items():
- self.__setitem__(key,val)
-
- def values(self):
- return map(self.get, self._keys)
-
- def iterkeys(self):
+ def iterkeys( self ):
return iter( self._keys )
- def itervalues(self):
+ def itervalues( self ):
for key in self._keys:
- yield self.get(key)
+ yield self.get( key )
- def iteritems(self):
+ def iteritems( self ):
for key in self._keys:
- yield key, self.get(key)
+ yield key, self.get( key )
def __iter__( self ):
for key in self._keys:
@@ -86,3 +79,7 @@
def reverse( self ):
self._keys.reverse()
+ def insert( self, index, key, item ):
+ if key not in self._keys:
+ self._keys.insert( index, key )
+ UserDict.__setitem__( self, key, item )
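The new insert() semantics can be demonstrated with a minimal standalone sketch. This is a toy class for illustration only — Galaxy's odict is built on the old UserDict, not on dict — but the index-controlled key ordering is the same idea.

```python
class InsertableOrderedDict(dict):
    """Toy ordered mapping whose key order can be controlled by index."""

    def __init__(self):
        super().__init__()
        self._keys = []

    def __setitem__(self, key, item):
        super().__setitem__(key, item)
        if key not in self._keys:
            self._keys.append(key)

    def insert(self, index, key, item):
        # A brand-new key lands at `index`; an existing key keeps its slot
        # and only has its value updated, matching odict.insert() above.
        if key not in self._keys:
            self._keys.insert(index, key)
        super().__setitem__(key, item)

    def keys(self):
        return self._keys[:]

d = InsertableOrderedDict()
d["upload1"] = "tool A"
d["cufflinks"] = "tool C"
d.insert(1, "bwa", "tool B")       # new key slots in at position 1
assert d.keys() == ["upload1", "bwa", "cufflinks"]
d.insert(0, "cufflinks", "tool C2")  # existing key stays where it was
assert d.keys() == ["upload1", "bwa", "cufflinks"]
```

This is what lets a newly installed tool land at the position it occupies in its tool panel config file, rather than always being appended at the end of integrated_tool_panel.xml.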
commit/galaxy-central: natefoo: Allow for job handler selection based on job params (like source).
by Bitbucket 30 Mar '12
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/0626ddb49084/
changeset: 0626ddb49084
user: natefoo
date: 2012-03-30 20:10:45
summary: Allow for job handler selection based on job params (like source).
affected #: 4 files
diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -198,13 +198,27 @@
if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ):
self.track_jobs_in_database = False
# Store per-tool runner configs
+ self.tool_handlers = self.__read_tool_job_config( global_conf_parser, 'galaxy:tool_handlers', 'name' )
+ self.tool_runners = self.__read_tool_job_config( global_conf_parser, 'galaxy:tool_runners', 'url' )
+ self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' )
+ # Cloud configuration options
+ self.enable_cloud_launch = string_as_bool( kwargs.get( 'enable_cloud_launch', False ) )
+ # Galaxy messaging (AMQP) configuration options
+ self.amqp = {}
try:
- tool_runners_config = global_conf_parser.items("galaxy:tool_runners")
+ amqp_config = global_conf_parser.items("galaxy_amqp")
+ except ConfigParser.NoSectionError:
+ amqp_config = {}
+ for k, v in amqp_config:
+ self.amqp[k] = v
+ def __read_tool_job_config( self, global_conf_parser, section, key ):
+ try:
+ tool_runners_config = global_conf_parser.items( section )
# Process config to group multiple configs for the same tool.
- tool_runners = {}
+ rval = {}
for entry in tool_runners_config:
- tool_config, url = entry
+ tool_config, val = entry
tool = None
runner_dict = {}
if tool_config.find("[") != -1:
@@ -219,29 +233,18 @@
tool = tool_config
# Add runner URL.
- runner_dict[ 'url' ] = url
+ runner_dict[ key ] = val
# Create tool entry if necessary.
- if tool not in tool_runners:
- tool_runners[ tool ] = []
+ if tool not in rval:
+ rval[ tool ] = []
# Add entry to runners.
- tool_runners[ tool ].append( runner_dict )
+ rval[ tool ].append( runner_dict )
- self.tool_runners = tool_runners
+ return rval
except ConfigParser.NoSectionError:
- self.tool_runners = []
- self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' )
- # Cloud configuration options
- self.enable_cloud_launch = string_as_bool( kwargs.get( 'enable_cloud_launch', False ) )
- # Galaxy messaging (AMQP) configuration options
- self.amqp = {}
- try:
- amqp_config = global_conf_parser.items("galaxy_amqp")
- except ConfigParser.NoSectionError:
- amqp_config = {}
- for k, v in amqp_config:
- self.amqp[k] = v
+ return []
def get( self, key, default ):
return self.config_dict.get( key, default )
def get_bool( self, key, default ):
diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d lib/galaxy/jobs/manager.py
--- a/lib/galaxy/jobs/manager.py
+++ b/lib/galaxy/jobs/manager.py
@@ -144,7 +144,7 @@
pass
for job in jobs_to_check:
- job.handler = self.__select_handler( job )
+ job.handler = self.__get_handler( job )
log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) )
self.sa_session.add( job )
@@ -157,9 +157,15 @@
for job in jobs_to_check:
self.job_handler.job_queue.put( job.id, job.tool_id )
- def __select_handler( self, job ):
- # TODO: handler selection based on params, tool, etc.
- return random.choice( self.app.config.job_handlers )
+ def __get_handler( self, job ):
+ try:
+ params = None
+ if job.params:
+ params = from_json_string( job.params )
+ return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_handler( params )
+ except:
+ log.exception( "(%s) Caught exception attempting to get tool-specific job handler for tool '%s', selecting at random from available handlers instead:" % ( job.id, job.tool_id ) )
+ return random.choice( self.app.config.job_handlers )
def put( self, job_id, tool ):
"""Add a job to the queue (by job identifier)"""
diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d lib/galaxy/tools/__init__.py
--- a/lib/galaxy/tools/__init__.py
+++ b/lib/galaxy/tools/__init__.py
@@ -5,7 +5,7 @@
pkg_resources.require( "simplejson" )
-import logging, os, string, sys, tempfile, glob, shutil, types, urllib, subprocess
+import logging, os, string, sys, tempfile, glob, shutil, types, urllib, subprocess, random
import simplejson
import binascii
from UserDict import DictMixin
@@ -682,31 +682,35 @@
if tool_version:
return tool_version.get_version_ids( self.app )
return []
- def get_job_runner( self, job_params=None ):
- # Look through runners to find one with matching parameters.
- selected_runner = None
- if len( self.job_runners ) == 1:
- # Most tools have a single runner.
- selected_runner = self.job_runners[0]
+ def __get_job_run_config( self, run_configs, key, job_params=None ):
+ # Look through runners/handlers to find one with matching parameters.
+ available_configs = []
+ if len( run_configs ) == 1:
+ # Most tools have a single config.
+ return run_configs[0][ key ] # return to avoid random when this will be the case most of the time
elif job_params is None:
- # Use job runner with no params
- for runner in self.job_runners:
- if "params" not in runner:
- selected_runner = runner
+ # Use job config with no params
+ for config in run_configs:
+ if "params" not in config:
+ available_configs.append( config )
else:
- # Find runner with matching parameters.
- for runner in self.job_runners:
- if "params" in runner:
+ # Find config with matching parameters.
+ for config in run_configs:
+ if "params" in config:
match = True
- runner_params = runner[ "params" ]
+ config_params = config[ "params" ]
for param, value in job_params.items():
- if param not in runner_params or \
- runner_params[ param ] != job_params[ param ]:
+ if param not in config_params or \
+ config_params[ param ] != job_params[ param ]:
match = False
break
if match:
- selected_runner = runner
- return selected_runner[ "url" ]
+ available_configs.append( config )
+ return random.choice( available_configs )[ key ]
+ def get_job_runner( self, job_params=None ):
+ return self.__get_job_run_config( self.job_runners, key='url', job_params=job_params )
+ def get_job_handler( self, job_params=None ):
+ return self.__get_job_run_config( self.job_handlers, key='name', job_params=job_params )
def parse( self, root, guid=None ):
"""
Read tool configuration from the element `root` and fill in `self`.
@@ -773,6 +777,12 @@
self.parallelism = ToolParallelismInfo(parallelism)
else:
self.parallelism = None
+ # Set job handler(s). Each handler is a dict with 'url' and, optionally, 'params'.
+ self_id = self.id.lower()
+ self.job_handlers = [ { "name" : name } for name in self.app.config.job_handlers ]
+ # Set custom handler(s) if they're defined.
+ if self_id in self.app.config.tool_handlers:
+ self.job_handlers = self.app.config.tool_handlers[ self_id ]
# Set job runner(s). Each runner is a dict with 'url' and, optionally, 'params'.
if self.app.config.start_job_runners is None:
# Jobs are always local regardless of tool config if no additional
@@ -782,7 +792,6 @@
# Set job runner to the cluster default
self.job_runners = [ { "url" : self.app.config.default_cluster_job_runner } ]
# Set custom runner(s) if they're defined.
- self_id = self.id.lower()
if self_id in self.app.config.tool_runners:
self.job_runners = self.app.config.tool_runners[ self_id ]
# Is this a 'hidden' tool (hidden in tool menu)
diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample
+++ b/universe_wsgi.ini.sample
@@ -664,18 +664,30 @@
#pbs_stage_path =
#pbs_dataset_server =
-# ---- Tool Job Runners -----------------------------------------------------
+# ---- Per-Tool Job Management ----------------------------------------------
-# Individual per-tool job runner overrides. Parameters can be included to define
-# multiple runners per tool. E.g. to run Cufflinks jobs initiated from Trackster
+# Per-tool job handler and runner overrides. Parameters can be included to define multiple
+# runners per tool. E.g. to run Cufflinks jobs initiated from Trackster
# differently than standard Cufflinks jobs:
-# cufflinks = local:///
-# cufflinks[source@trackster] = local:///
-# If not listed here, a tool will run with the runner defined with
-# default_cluster_job_runner.
+#
+# cufflinks = local:///
+# cufflinks[source@trackster] = local:///
+
+[galaxy:tool_handlers]
+
+# By default, Galaxy will select a handler at random from the list of
+# job_handlers set above. You can override as in the following examples:
+#
+#upload1 = upload_handler
+#cufflinks[source@trackster] = realtime_handler
[galaxy:tool_runners]
+# If not listed here, a tool will run with the runner defined with
+# default_cluster_job_runner. These overrides for local:/// are done because
+# these tools can fetch data from remote sites, which may not be suitable to
+# run on a cluster (if it does not have access to the Internet, for example).
+
biomart = local:///
encode_db1 = local:///
hbvar = local:///
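The param-matching selection that `__get_job_run_config` performs can be sketched as a simplified standalone function (names are illustrative; this is not Galaxy's actual method). A config carrying "params" is chosen only when every job param matches, and when several configs qualify one is picked at random.

```python
import random

def select_config(run_configs, key, job_params=None):
    """Return the `key` value from a matching config, at random when
    several match; single-config tools short-circuit immediately."""
    if len(run_configs) == 1:
        return run_configs[0][key]
    matches = []
    for config in run_configs:
        if job_params is None:
            # No job params: only configs without params are eligible.
            if "params" not in config:
                matches.append(config)
        elif "params" in config and all(
            config["params"].get(p) == v for p, v in job_params.items()
        ):
            matches.append(config)
    return random.choice(matches)[key]

handlers = [
    {"name": "main"},
    {"name": "trackster_handler", "params": {"source": "trackster"}},
]
assert select_config(handlers, "name") == "main"
assert select_config(handlers, "name", {"source": "trackster"}) == "trackster_handler"
```

The same function shape serves both handlers (key="name") and runners (key="url"), which is why the commit factors the two code paths into one helper.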
commit/galaxy-central: natefoo: Fix the data transfer script to work with new API library content ids.
by Bitbucket 30 Mar '12
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/12b14f3e78e9/
changeset: 12b14f3e78e9
user: natefoo
date: 2012-03-30 15:51:04
summary: Fix the data transfer script to work with new API library content ids.
affected #: 1 file
diff -r db6b788a1fe6f8ce12f91844bd8658f6168f2ff4 -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 scripts/galaxy_messaging/server/data_transfer.py
--- a/scripts/galaxy_messaging/server/data_transfer.py
+++ b/scripts/galaxy_messaging/server/data_transfer.py
@@ -177,7 +177,7 @@
self.update_status( SampleDataset.transfer_status.ADD_TO_LIBRARY )
try:
data = {}
- data[ 'folder_id' ] = api.encode_id( self.config_id_secret, '%s.%s' % ( 'folder', self.folder_id ) )
+ data[ 'folder_id' ] = 'F%s' % api.encode_id( self.config_id_secret, self.folder_id )
data[ 'file_type' ] = 'auto'
data[ 'server_dir' ] = self.server_dir
data[ 'dbkey' ] = ''
commit/galaxy-central: greg: Change new functional test flags to conform more closely to previous command line flags
by Bitbucket 30 Mar '12
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/db6b788a1fe6/
changeset: db6b788a1fe6
user: greg
date: 2012-03-30 15:36:21
summary: Change new functional test flags to conform more closely to previous command line flags
affected #: 2 files
diff -r de2946aca8877087d761684a72ff955f46e4f5a6 -r db6b788a1fe6f8ce12f91844bd8658f6168f2ff4 run_functional_tests.sh
--- a/run_functional_tests.sh
+++ b/run_functional_tests.sh
@@ -20,23 +20,23 @@
echo "==========================================================================================================================================="
echo "'run_functional_tests.sh -id bbb' for testing one tool with id 'bbb' ('bbb' is the tool id)"
echo "'run_functional_tests.sh -sid ccc' for testing one section with sid 'ccc' ('ccc' is the string after 'section::')"
-elif [ $1 = '--migrated' ]; then
+elif [ $1 = '-migrated' ]; then
if [ ! $2 ]; then
- python ./scripts/functional_tests.py -v functional.test_toolbox --with-nosehtml --html-report-file run_functional_tests.html --migrated
+ python ./scripts/functional_tests.py -v functional.test_toolbox --with-nosehtml --html-report-file run_functional_tests.html -migrated
elif [ $2 = '-id' ]; then
# TODO: This option is not tested...
- python ./scripts/functional_tests.py -v functional.test_toolbox:TestForTool_$3 --with-nosehtml --html-report-file run_functional_tests.html --migrated
+ python ./scripts/functional_tests.py -v functional.test_toolbox:TestForTool_$3 --with-nosehtml --html-report-file run_functional_tests.html -migrated
else
- python ./scripts/functional_tests.py -v functional.test_toolbox --with-nosehtml --html-report-file run_functional_tests.html --migrated
+ python ./scripts/functional_tests.py -v functional.test_toolbox --with-nosehtml --html-report-file run_functional_tests.html -migrated
fi
-elif [ $1 = '--installed' ]; then
+elif [ $1 = '-installed' ]; then
if [ ! $2 ]; then
- python ./scripts/functional_tests.py -v functional.test_toolbox --with-nosehtml --html-report-file run_functional_tests.html --installed
+ python ./scripts/functional_tests.py -v functional.test_toolbox --with-nosehtml --html-report-file run_functional_tests.html -installed
elif [ $2 = '-id' ]; then
# TODO: This option is not tested...
- python ./scripts/functional_tests.py -v functional.test_toolbox:TestForTool_$3 --with-nosehtml --html-report-file run_functional_tests.html --installed
+ python ./scripts/functional_tests.py -v functional.test_toolbox:TestForTool_$3 --with-nosehtml --html-report-file run_functional_tests.html -installed
else
- python ./scripts/functional_tests.py -v functional.test_toolbox --with-nosehtml --html-report-file run_functional_tests.html --installed
+ python ./scripts/functional_tests.py -v functional.test_toolbox --with-nosehtml --html-report-file run_functional_tests.html -installed
fi
else
python ./scripts/functional_tests.py -v --with-nosehtml --html-report-file run_functional_tests.html $1
diff -r de2946aca8877087d761684a72ff955f46e4f5a6 -r db6b788a1fe6f8ce12f91844bd8658f6168f2ff4 scripts/functional_tests.py
--- a/scripts/functional_tests.py
+++ b/scripts/functional_tests.py
@@ -135,8 +135,8 @@
tool_path = os.environ.get( 'GALAXY_TEST_TOOL_PATH', 'tools' )
if 'HTTP_ACCEPT_LANGUAGE' not in os.environ:
os.environ[ 'HTTP_ACCEPT_LANGUAGE' ] = default_galaxy_locales
- testing_migrated_tools = '--migrated' in sys.argv
- testing_installed_tools = '--installed' in sys.argv
+ testing_migrated_tools = '-migrated' in sys.argv
+ testing_installed_tools = '-installed' in sys.argv
if testing_migrated_tools or testing_installed_tools:
sys.argv.pop()
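The hunk above switches the scripts from `--migrated`/`--installed` to single-dash flags; detection in `functional_tests.py` is a plain membership test on `sys.argv` followed by a `pop()`. A minimal standalone sketch of that pattern (the function name and flag values here are illustrative, not from the commit):

```python
import sys

def consume_flag(argv, flag):
    """Detect and remove a flag, mirroring the '-migrated in sys.argv'
    check above. Uses remove() rather than pop(), so it works even when
    the flag is not the last argument."""
    if flag in argv:
        argv.remove(flag)
        return True
    return False

argv = ['functional_tests.py', '-v', '-migrated']
assert consume_flag(argv, '-migrated') is True
assert argv == ['functional_tests.py', '-v']
```

Note that the committed code's `sys.argv.pop()` silently assumes the flag is the final argument, which is why the wrapper scripts place it last.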
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
commit/galaxy-central: jgoecks: Fix chart_curve (Trackster) icon omission in new styles.
by Bitbucket 30 Mar '12
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/de2946aca887/
changeset: de2946aca887
user: jgoecks
date: 2012-03-30 15:12:37
summary: Fix chart_curve (Trackster) icon omission in new styles.
affected #: 3 files
diff -r 1c3b74544a2f0d2d135c8d3f70803d1284278d5b -r de2946aca8877087d761684a72ff955f46e4f5a6 static/june_2007_style/base_sprites.less.tmpl
--- a/static/june_2007_style/base_sprites.less.tmpl
+++ b/static/june_2007_style/base_sprites.less.tmpl
@@ -94,6 +94,10 @@
-sprite-group: fugue;
-sprite-image: fugue/gear.png;
}
+.icon-button.chart_curve {
+ -sprite-group: fugue;
+ -sprite-image: silk/chart_curve.png;
+}
.text-and-autocomplete-select {
-sprite-group: fugue;
diff -r 1c3b74544a2f0d2d135c8d3f70803d1284278d5b -r de2946aca8877087d761684a72ff955f46e4f5a6 static/june_2007_style/blue/base.css
--- a/static/june_2007_style/blue/base.css
+++ b/static/june_2007_style/blue/base.css
@@ -225,7 +225,7 @@
.navbar-form .radio,.navbar-form .checkbox{margin-top:5px;}
.navbar-form input[type="image"],.navbar-form input[type="checkbox"],.navbar-form input[type="radio"]{margin-top:3px;}
.navbar-search{position:relative;float:left;margin-top:6px;margin-bottom:0;}.navbar-search .search-query{padding:4px 9px;font-family:"Lucida Grande",verdana,arial,helvetica,sans-serif;font-size:13px;font-weight:normal;line-height:1;color:#ffffff;color:rgba(255, 255, 255, 0.75);background:#666;background:rgba(255, 255, 255, 0.3);border:1px solid #111;-webkit-box-shadow:inset 0 1px 2px rgba(0, 0, 0, 0.1),0 1px 0px rgba(255, 255, 255, 0.15);-moz-box-shadow:inset 0 1px 2px rgba(0, 0, 0, 0.1),0 1px 0px rgba(255, 255, 255, 0.15);box-shadow:inset 0 1px 2px rgba(0, 0, 0, 0.1),0 1px 0px rgba(255, 255, 255, 0.15);-webkit-transition:none;-moz-transition:none;-ms-transition:none;-o-transition:none;transition:none;}.navbar-search .search-query :-moz-placeholder{color:#eeeeee;}
-.navbar-search .search-query::-webkit-input-placeholder{color:#eeeeee;}
+.navbar-search .search-query ::-webkit-input-placeholder{color:#eeeeee;}
.navbar-search .search-query:hover{color:#ffffff;background-color:#999999;background-color:rgba(255, 255, 255, 0.5);}
.navbar-search .search-query:focus,.navbar-search .search-query.focused{padding:5px 10px;color:#333333;text-shadow:0 1px 0 #ffffff;background-color:#ffffff;border:0;-webkit-box-shadow:0 0 3px rgba(0, 0, 0, 0.15);-moz-box-shadow:0 0 3px rgba(0, 0, 0, 0.15);box-shadow:0 0 3px rgba(0, 0, 0, 0.15);outline:0;}
.navbar-fixed-top{position:fixed;top:0;right:0;left:0;z-index:1030;}
@@ -706,7 +706,8 @@
.icon-button.import{background:url(fugue.png) no-repeat 0px -312px;}
.icon-button.plus-button{background:url(fugue.png) no-repeat 0px -338px;}
.icon-button.gear{background:url(fugue.png) no-repeat 0px -364px;}
-.text-and-autocomplete-select{background:url(fugue.png) no-repeat right -390px;}
+.icon-button.chart_curve{background:url(fugue.png) no-repeat 0px -390px;}
+.text-and-autocomplete-select{background:url(fugue.png) no-repeat right -416px;}
.tipsy{padding:5px;font-size:10px;filter:alpha(opacity=80);background-repeat:no-repeat;background-image:url(../images/tipsy.gif);}
.tipsy-inner{padding:5px 8px 4px 8px;background-color:black;color:white;max-width:200px;text-align:center;}
.tipsy-north{background-position:top center;}
diff -r 1c3b74544a2f0d2d135c8d3f70803d1284278d5b -r de2946aca8877087d761684a72ff955f46e4f5a6 static/june_2007_style/blue/fugue.png
Binary file static/june_2007_style/blue/fugue.png has changed
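The y-offsets in the base.css hunk step by 26px per sprite row (-312, -338, -364, -390), which is why inserting `chart_curve` at -390 pushes `text-and-autocomplete-select` down to -416. A small sketch, assuming the fixed 26px row height visible in the hunk, of how such offsets could be generated:

```python
ROW_HEIGHT = 26  # vertical spacing between icons in fugue.png, per the hunk

def sprite_offsets(names, start=-312):
    """Map each sprite name to its background-position y-offset in the
    combined sprite image, one row per icon."""
    return {name: start - i * ROW_HEIGHT for i, name in enumerate(names)}

offsets = sprite_offsets(['import', 'plus-button', 'gear', 'chart_curve',
                          'text-and-autocomplete-select'])
assert offsets['chart_curve'] == -390
assert offsets['text-and-autocomplete-select'] == -416
```

In the actual build, these positions are produced by the LESS sprite template (`base_sprites.less.tmpl`) rather than by hand, which is why the binary `fugue.png` changed along with the CSS.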
commit/galaxy-central: natefoo: Remove unused and faulty import of sqlalchemy expressions in galaxy.jobs
by Bitbucket 29 Mar '12
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/1c3b74544a2f/
changeset: 1c3b74544a2f
user: natefoo
date: 2012-03-30 04:47:17
summary: Remove unused and faulty import of sqlalchemy expressions in galaxy.jobs
affected #: 1 file
diff -r 4f6c38ca353861351e286016bbbddc126ca566c4 -r 1c3b74544a2f0d2d135c8d3f70803d1284278d5b lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -11,8 +11,6 @@
import traceback
import subprocess
-from sqlalchemy.sql.expression import and_, or_
-
import galaxy
from galaxy import util, model
from galaxy.datatypes.tabular import *
commit/galaxy-central: natefoo: Support multiple job runners by creating a job manager that designates a job handler to run jobs, thereby avoiding the "new job" race condition.
by Bitbucket 29 Mar '12
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/4f6c38ca3538/
changeset: 4f6c38ca3538
user: natefoo
date: 2012-03-29 23:25:56
summary: Support multiple job runners by creating a job manager that designates a job handler to run jobs, thereby avoiding the "new job" race condition.
affected #: 11 files
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/app.py
--- a/lib/galaxy/app.py
+++ b/lib/galaxy/app.py
@@ -124,8 +124,9 @@
if self.config.get_bool( 'enable_beta_job_managers', False ):
from jobs import transfer_manager
self.transfer_manager = transfer_manager.TransferManager( self )
- # Start the job queue
- self.job_manager = jobs.JobManager( self )
+ # Start the job manager
+ from jobs import manager
+ self.job_manager = manager.JobManager( self )
# FIXME: These are exposed directly for backward compatibility
self.job_queue = self.job_manager.job_queue
self.job_stop_queue = self.job_manager.job_stop_queue
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -184,7 +184,20 @@
# Heartbeat log file name override
if global_conf is not None:
self.heartbeat_log = global_conf.get( 'heartbeat_log', 'heartbeat.log' )
- #Store per-tool runner configs.
+ # Determine which 'server:' this is
+ self.server_name = 'main'
+ for arg in sys.argv:
+ # Crummy, but PasteScript does not give you a way to determine this
+ if arg.lower().startswith('--server-name='):
+ self.server_name = arg.split('=', 1)[-1]
+ # Store advanced job management config
+ self.job_manager = kwargs.get('job_manager', self.server_name).strip()
+ self.job_handlers = [ x.strip() for x in kwargs.get('job_handlers', self.server_name).split(',') ]
+ # Use database for IPC unless this is a standalone server (or multiple servers doing self dispatching in memory)
+ self.track_jobs_in_database = True
+ if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ):
+ self.track_jobs_in_database = False
+ # Store per-tool runner configs
try:
tool_runners_config = global_conf_parser.items("galaxy:tool_runners")
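The config.py hunk above carries the key decision of this changeset: jobs are tracked in the database (so separate processes can communicate) unless this server is a single, self-dispatching instance. A self-contained sketch of that logic, mirroring the hunk but with a hypothetical function name:

```python
def parse_job_config(argv, kwargs):
    """Mirror of the config.py hunk: find this server's name from
    PasteScript's --server-name argument, then decide whether the
    database is needed as the IPC channel between manager and handlers."""
    server_name = 'main'
    for arg in argv:
        # Crummy, as the commit notes: PasteScript offers no direct API for this
        if arg.lower().startswith('--server-name='):
            server_name = arg.split('=', 1)[-1]
    job_manager = kwargs.get('job_manager', server_name).strip()
    job_handlers = [x.strip() for x in kwargs.get('job_handlers', server_name).split(',')]
    # A lone server that both manages and handles its own jobs can keep
    # everything in memory; any other topology needs the database.
    track_jobs_in_database = not (len(job_handlers) == 1
                                  and job_handlers[0] == server_name
                                  and job_manager == server_name)
    return server_name, job_manager, job_handlers, track_jobs_in_database

# Standalone server: in-memory queues suffice
assert parse_job_config(['run.py'], {})[3] is False
# Dedicated handler pool: jobs must go through the database
assert parse_job_config(['run.py', '--server-name=web0'],
                        {'job_handlers': 'handler0,handler1'})[3] is True
```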
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -1,4 +1,17 @@
-import logging, threading, sys, os, time, traceback, shutil
+"""
+Support for running a tool in Galaxy via an internal job management system
+"""
+
+import os
+import sys
+import pwd
+import time
+import logging
+import threading
+import traceback
+import subprocess
+
+from sqlalchemy.sql.expression import and_, or_
import galaxy
from galaxy import util, model
@@ -9,51 +22,16 @@
from galaxy.util.json import from_json_string
from galaxy.util.expressions import ExpressionContext
from galaxy.jobs.actions.post import ActionBox
-import subprocess, pwd
from galaxy.exceptions import ObjectInvalid
-from sqlalchemy.sql.expression import and_, or_
-
-import pkg_resources
-pkg_resources.require( "PasteDeploy" )
-
-from Queue import Queue, Empty
-
log = logging.getLogger( __name__ )
-# States for running a job. These are NOT the same as data states
-JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted'
-
# This file, if created in the job's working directory, will be used for
# setting advanced metadata properties on the job and its associated outputs.
# This interface is currently experimental, is only used by the upload tool,
# and should eventually become API'd
TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json'
-class JobManager( object ):
- """
- Highest level interface to job management.
-
- TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly.
- This should be decoupled.
- """
- def __init__( self, app ):
- self.app = app
- if self.app.config.get_bool( "enable_job_running", True ):
- # The dispatcher launches the underlying job runners
- self.dispatcher = DefaultJobDispatcher( app )
- # Queues for starting and stopping jobs
- self.job_queue = JobQueue( app, self.dispatcher )
- self.job_stop_queue = JobStopQueue( app, self.dispatcher )
- if self.app.config.enable_beta_job_managers:
- from galaxy.jobs.deferred import DeferredJobQueue
- self.deferred_job_queue = DeferredJobQueue( app )
- else:
- self.job_queue = self.job_stop_queue = NoopQueue()
- def shutdown( self ):
- self.job_queue.shutdown()
- self.job_stop_queue.shutdown()
-
class Sleeper( object ):
"""
Provides a 'sleep' method that sleeps for a number of seconds *unless*
@@ -70,238 +48,6 @@
self.condition.notify()
self.condition.release()
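Only the tail of `Sleeper` (the `wake` body) appears in this hunk. A self-contained sketch of the interruptable-sleep idea it implements — a condition variable whose `wait` timeout is the sleep and whose `notify` is the wake-up — might look like this (the hunk's code uses explicit `acquire()`/`release()`; the `with` blocks here are equivalent):

```python
import threading
import time

class Sleeper(object):
    """Sleep for a number of seconds *unless* woken early via wake()."""
    def __init__(self):
        self.condition = threading.Condition()
    def sleep(self, seconds):
        with self.condition:
            self.condition.wait(seconds)  # returns early on notify()
    def wake(self):
        with self.condition:
            self.condition.notify()

sleeper = Sleeper()
waker = threading.Thread(target=lambda: (time.sleep(0.05), sleeper.wake()))
start = time.time()
waker.start()
sleeper.sleep(5)  # would block 5s, but wake() interrupts it almost immediately
waker.join()
assert time.time() - start < 1.0
```

This is what lets the monitor loops below poll once per second yet shut down promptly: `shutdown()` calls `self.sleeper.wake()` instead of waiting out the current sleep.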
-class JobQueue( object ):
- """
- Job manager, waits for jobs to be runnable and then dispatches to
- a JobRunner.
- """
- STOP_SIGNAL = object()
- def __init__( self, app, dispatcher ):
- """Start the job manager"""
- self.app = app
- self.sa_session = app.model.context
- self.job_lock = False
- # Should we read jobs form the database, or use an in memory queue
- self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False )
- # Keep track of the pid that started the job manager, only it
- # has valid threads
- self.parent_pid = os.getpid()
- # Contains new jobs. Note this is not used if track_jobs_in_database is True
- self.queue = Queue()
- # Contains jobs that are waiting (only use from monitor thread)
- ## This and jobs_to_check[] are closest to a "Job Queue"
- self.waiting_jobs = []
- # Helper for interruptable sleep
- self.sleeper = Sleeper()
- self.running = True
- self.dispatcher = dispatcher
- self.monitor_thread = threading.Thread( target=self.__monitor )
- # Recover jobs at startup
- if app.config.get_bool( 'enable_job_recovery', True ):
- self.__check_jobs_at_startup()
- # Start the queue
- self.monitor_thread.start()
- log.info( "job manager started" )
-
- def __check_jobs_at_startup( self ):
- """
- Checks all jobs that are in the 'new', 'queued' or 'running' state in
- the database and requeues or cleans up as necessary. Only run as the
- job manager starts.
- """
- model = self.app.model # DBTODO Why?
- for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ):
- if job.tool_id not in self.app.toolbox.tools_by_id:
- log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
- JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
- else:
- log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id )
- self.queue.put( ( job.id, job.tool_id ) )
- for job in self.sa_session.query( model.Job ).enable_eagerloads( False ).filter( ( model.Job.state == model.Job.states.RUNNING ) | ( model.Job.state == model.Job.states.QUEUED ) ):
- if job.tool_id not in self.app.toolbox.tools_by_id:
- log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
- JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
- elif job.job_runner_name is None:
- log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id )
- if self.track_jobs_in_database:
- job.state = model.Job.states.NEW
- else:
- self.queue.put( ( job.id, job.tool_id ) )
- else:
- job_wrapper = JobWrapper( job, self )
- self.dispatcher.recover( job, job_wrapper )
- if self.sa_session.dirty:
- self.sa_session.flush()
-
- def __monitor( self ):
- """
- Continually iterate the waiting jobs, checking is each is ready to
- run and dispatching if so.
- """
- # HACK: Delay until after forking, we need a way to do post fork notification!!!
- time.sleep( 10 )
- while self.running:
- try:
- self.__monitor_step()
- except:
- log.exception( "Exception in monitor_step" )
- # Sleep
- self.sleeper.sleep( 1 )
-
- def __monitor_step( self ):
- """
- Called repeatedly by `monitor` to process waiting jobs. Gets any new
- jobs (either from the database or from its own queue), then iterates
- over all new and waiting jobs to check the state of the jobs each
- depends on. If the job has dependencies that have not finished, it
- it goes to the waiting queue. If the job has dependencies with errors,
- it is marked as having errors and removed from the queue. Otherwise,
- the job is dispatched.
- """
- # Pull all new jobs from the queue at once
- jobs_to_check = []
- if self.track_jobs_in_database:
- # Clear the session so we get fresh states for job and all datasets
- self.sa_session.expunge_all()
- # Fetch all new jobs
- jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( model.Job.state == model.Job.states.NEW ).all()
- else:
- # Get job objects and append to watch queue for any which were
- # previously waiting
- for job_id in self.waiting_jobs:
- jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
- try:
- while 1:
- message = self.queue.get_nowait()
- if message is self.STOP_SIGNAL:
- return
- # Unpack the message
- job_id, tool_id = message
- # Get the job object and append to watch queue
- jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
- except Empty:
- pass
- # Iterate over new and waiting jobs and look for any that are
- # ready to run
- new_waiting_jobs = []
- for job in jobs_to_check:
- try:
- # Check the job's dependencies, requeue if they're not done
- job_state = self.__check_if_ready_to_run( job )
- if job_state == JOB_WAIT:
- if not self.track_jobs_in_database:
- new_waiting_jobs.append( job.id )
- elif job_state == JOB_INPUT_ERROR:
- log.info( "job %d unable to run: one or more inputs in error state" % job.id )
- elif job_state == JOB_INPUT_DELETED:
- log.info( "job %d unable to run: one or more inputs deleted" % job.id )
- elif job_state == JOB_READY:
- if self.job_lock:
- log.info( "Job dispatch attempted for %s, but prevented by administrative lock." % job.id )
- if not self.track_jobs_in_database:
- new_waiting_jobs.append( job.id )
- else:
- self.dispatcher.put( JobWrapper( job, self ) )
- log.info( "job %d dispatched" % job.id )
- elif job_state == JOB_DELETED:
- log.info( "job %d deleted by user while still queued" % job.id )
- elif job_state == JOB_ADMIN_DELETED:
- log.info( "job %d deleted by admin while still queued" % job.id )
- else:
- log.error( "unknown job state '%s' for job %d" % ( job_state, job.id ) )
- if not self.track_jobs_in_database:
- new_waiting_jobs.append( job.id )
- except Exception:
- log.exception( "failure running job %d" % job.id )
- # Update the waiting list
- self.waiting_jobs = new_waiting_jobs
- # Done with the session
- self.sa_session.remove()
-
- def __check_if_ready_to_run( self, job ):
- """
- Check if a job is ready to run by verifying that each of its input
- datasets is ready (specifically in the OK state). If any input dataset
- has an error, fail the job and return JOB_INPUT_ERROR. If any input
- dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all
- input datasets are in OK state, return JOB_READY indicating that the
- job can be dispatched. Otherwise, return JOB_WAIT indicating that input
- datasets are still being prepared.
- """
- if job.state == model.Job.states.DELETED:
- return JOB_DELETED
- elif job.state == model.Job.states.ERROR:
- return JOB_ADMIN_DELETED
- elif self.app.config.enable_quotas:
- quota = self.app.quota_agent.get_quota( job.user )
- if quota is not None:
- try:
- usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
- if usage > quota:
- return JOB_WAIT
- except AssertionError, e:
- pass # No history, should not happen with an anon user
- for dataset_assoc in job.input_datasets + job.input_library_datasets:
- idata = dataset_assoc.dataset
- if not idata:
- continue
- # don't run jobs for which the input dataset was deleted
- if idata.deleted:
- JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
- return JOB_INPUT_DELETED
- # an error in the input data causes us to bail immediately
- elif idata.state == idata.states.ERROR:
- JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state == idata.states.FAILED_METADATA:
- JobWrapper( job, self ).fail( "input data %d failed to properly set metadata" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
- # need to requeue
- return JOB_WAIT
- return self.__check_user_jobs( job )
-
- def __check_user_jobs( self, job ):
- if not self.app.config.user_job_limit:
- return JOB_READY
- if job.user:
- count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( and_( model.Job.user_id == job.user.id,
- or_( model.Job.state == model.Job.states.RUNNING,
- model.Job.state == model.Job.states.QUEUED ) ) ).count()
- elif job.galaxy_session:
- count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( and_( model.Job.session_id == job.galaxy_session.id,
- or_( model.Job.state == model.Job.states.RUNNING,
- model.Job.state == model.Job.states.QUEUED ) ) ).count()
- else:
- log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' % job.id )
- return JOB_READY
- if count >= self.app.config.user_job_limit:
- return JOB_WAIT
- return JOB_READY
-
- def put( self, job_id, tool ):
- """Add a job to the queue (by job identifier)"""
- if not self.track_jobs_in_database:
- self.queue.put( ( job_id, tool.id ) )
- self.sleeper.wake()
-
- def shutdown( self ):
- """Attempts to gracefully shut down the worker thread"""
- if self.parent_pid != os.getpid():
- # We're not the real job queue, do nothing
- return
- else:
- log.info( "sending stop signal to worker thread" )
- self.running = False
- if not self.track_jobs_in_database:
- self.queue.put( self.STOP_SIGNAL )
- self.sleeper.wake()
- log.info( "job queue stopped" )
- self.dispatcher.shutdown()
-
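The queue classes being removed here (and re-added in `handler.py` below) share one shutdown idiom: a unique sentinel object is put on the queue, and the monitor thread exits when it dequeues it by identity. A minimal sketch of that idiom with hypothetical class and attribute names (note the code in this changeset is Python 2, where the module is `Queue` rather than `queue`):

```python
import threading
from queue import Queue

class Worker(object):
    STOP_SIGNAL = object()  # unique sentinel; cannot collide with real messages

    def __init__(self):
        self.queue = Queue()
        self.processed = []
        self.thread = threading.Thread(target=self._monitor)
        self.thread.start()

    def _monitor(self):
        while True:
            message = self.queue.get()
            if message is self.STOP_SIGNAL:  # identity check, not equality
                return
            self.processed.append(message)

    def shutdown(self):
        """FIFO ordering guarantees pending messages drain before the stop."""
        self.queue.put(self.STOP_SIGNAL)
        self.thread.join()

worker = Worker()
worker.queue.put('job-1')
worker.shutdown()
assert worker.processed == ['job-1']
```

The Galaxy queues also guard `shutdown()` with a `parent_pid` check so that forked web workers, which inherit the object but not the monitor thread, do nothing.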
class JobWrapper( object ):
"""
Wraps a 'model.Job' with convenience methods for running processes and
@@ -1177,179 +923,13 @@
# There is no metadata setting for tasks. This is handled after the merge, at the job level.
return ""
-class DefaultJobDispatcher( object ):
- def __init__( self, app ):
- self.app = app
- self.job_runners = {}
- start_job_runners = ["local"]
- if app.config.start_job_runners is not None:
- start_job_runners.extend( [ x.strip() for x in util.listify( app.config.start_job_runners ) ] )
- if app.config.use_tasked_jobs:
- start_job_runners.append("tasks")
- for name in start_job_runners:
- self._load_plugin( name )
-
- def _load_plugin( self, name ):
- module_name = 'galaxy.jobs.runners.' + name
- try:
- module = __import__( module_name )
- except:
- log.exception( 'Job runner is not loadable: %s' % module_name )
- return
- for comp in module_name.split( "." )[1:]:
- module = getattr( module, comp )
- if '__all__' not in dir( module ):
- log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name )
- return
- for obj in module.__all__:
- display_name = ':'.join( ( module_name, obj ) )
- runner = getattr( module, obj )
- self.job_runners[name] = runner( self.app )
- log.debug( 'Loaded job runner: %s' % display_name )
-
- def __get_runner_name( self, job_wrapper ):
- if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper):
- runner_name = "tasks"
- else:
- runner_name = ( job_wrapper.get_job_runner().split(":", 1) )[0]
- return runner_name
-
- def put( self, job_wrapper ):
- try:
- runner_name = self.__get_runner_name( job_wrapper )
- if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and isinstance(job_wrapper, TaskWrapper):
- #DBTODO Refactor
- log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) )
- else:
- log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
- self.job_runners[runner_name].put( job_wrapper )
- except KeyError:
- log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
- job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' )
-
- def stop( self, job ):
- runner_name = ( job.job_runner_name.split(":", 1) )[0]
- log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) )
- try:
- self.job_runners[runner_name].stop_job( job )
- except KeyError:
- log.error( 'stop(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
- # Job and output dataset states have already been updated, so nothing is done here.
-
- def recover( self, job, job_wrapper ):
- runner_name = ( job.job_runner_name.split(":", 1) )[0]
- log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) )
- try:
- self.job_runners[runner_name].recover( job, job_wrapper )
- except KeyError:
- log.error( 'recover(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
- job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' )
-
- def shutdown( self ):
- for runner in self.job_runners.itervalues():
- runner.shutdown()
-
-class JobStopQueue( object ):
- """
- A queue for jobs which need to be terminated prematurely.
- """
- STOP_SIGNAL = object()
- def __init__( self, app, dispatcher ):
- self.app = app
- self.sa_session = app.model.context
- self.dispatcher = dispatcher
-
- self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False )
-
- # Keep track of the pid that started the job manager, only it
- # has valid threads
- self.parent_pid = os.getpid()
- # Contains new jobs. Note this is not used if track_jobs_in_database is True
- self.queue = Queue()
-
- # Contains jobs that are waiting (only use from monitor thread)
- self.waiting = []
-
- # Helper for interruptable sleep
- self.sleeper = Sleeper()
- self.running = True
- self.monitor_thread = threading.Thread( target=self.monitor )
- self.monitor_thread.start()
- log.info( "job stopper started" )
-
- def monitor( self ):
- """
- Continually iterate the waiting jobs, stop any that are found.
- """
- # HACK: Delay until after forking, we need a way to do post fork notification!!!
- time.sleep( 10 )
- while self.running:
- try:
- self.monitor_step()
- except:
- log.exception( "Exception in monitor_step" )
- # Sleep
- self.sleeper.sleep( 1 )
-
- def monitor_step( self ):
- """
- Called repeatedly by `monitor` to stop jobs.
- """
- # Pull all new jobs from the queue at once
- jobs_to_check = []
- if self.track_jobs_in_database:
- # Clear the session so we get fresh states for job and all datasets
- self.sa_session.expunge_all()
- # Fetch all new jobs
- newly_deleted_jobs = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( model.Job.state == model.Job.states.DELETED_NEW ).all()
- for job in newly_deleted_jobs:
- jobs_to_check.append( ( job, None ) )
- # Also pull from the queue (in the case of Administrative stopped jobs)
- try:
- while 1:
- message = self.queue.get_nowait()
- if message is self.STOP_SIGNAL:
- return
- # Unpack the message
- job_id, error_msg = message
- # Get the job object and append to watch queue
- jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) )
- except Empty:
- pass
- for job, error_msg in jobs_to_check:
- if error_msg is not None:
- job.state = job.states.ERROR
- job.info = error_msg
- else:
- job.state = job.states.DELETED
- self.sa_session.add( job )
- self.sa_session.flush()
- if job.job_runner_name is not None:
- # tell the dispatcher to stop the job
- self.dispatcher.stop( job )
-
- def put( self, job_id, error_msg=None ):
- self.queue.put( ( job_id, error_msg ) )
-
- def shutdown( self ):
- """Attempts to gracefully shut down the worker thread"""
- if self.parent_pid != os.getpid():
- # We're not the real job queue, do nothing
- return
- else:
- log.info( "sending stop signal to worker thread" )
- self.running = False
- if not self.track_jobs_in_database:
- self.queue.put( self.STOP_SIGNAL )
- self.sleeper.wake()
- log.info( "job stopper stopped" )
-
class NoopQueue( object ):
"""
Implements the JobQueue / JobStopQueue interface but does nothing
"""
def put( self, *args ):
return
+ def put_stop( self, *args ):
+ return
def shutdown( self ):
return
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/jobs/handler.py
--- /dev/null
+++ b/lib/galaxy/jobs/handler.py
@@ -0,0 +1,430 @@
+"""
+Galaxy job handler, prepares, runs, tracks, and finishes Galaxy jobs
+"""
+
+import os
+import time
+import logging
+import threading
+from Queue import Queue, Empty
+
+from sqlalchemy.sql.expression import and_, or_
+
+from galaxy import util, model
+from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper
+
+log = logging.getLogger( __name__ )
+
+# States for running a job. These are NOT the same as data states
+JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted'
+
+class JobHandler( object ):
+ """
+ Handle the preparation, running, tracking, and finishing of jobs
+ """
+ def __init__( self, app ):
+ self.app = app
+ # The dispatcher launches the underlying job runners
+ self.dispatcher = DefaultJobDispatcher( app )
+ # Queues for starting and stopping jobs
+ self.job_queue = JobHandlerQueue( app, self.dispatcher )
+ self.job_stop_queue = JobHandlerStopQueue( app, self.dispatcher )
+ def start( self ):
+ self.job_queue.start()
+ def shutdown( self ):
+ self.job_queue.shutdown()
+ self.job_stop_queue.shutdown()
+
+class JobHandlerQueue( object ):
+ """
+ Job manager, waits for jobs to be runnable and then dispatches to
+ a JobRunner.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app, dispatcher ):
+ """Start the job manager"""
+ self.app = app
+ self.dispatcher = dispatcher
+
+ self.sa_session = app.model.context
+ self.track_jobs_in_database = self.app.config.track_jobs_in_database
+
+ # Keep track of the pid that started the job manager, only it
+ # has valid threads
+ self.parent_pid = os.getpid()
+ # Contains new jobs. Note this is not used if track_jobs_in_database is True
+ self.queue = Queue()
+ # Contains jobs that are waiting (only use from monitor thread)
+ ## This and jobs_to_check[] are closest to a "Job Queue"
+ self.waiting_jobs = []
+ # Helper for interruptable sleep
+ self.sleeper = Sleeper()
+ self.running = True
+ self.monitor_thread = threading.Thread( target=self.__monitor )
+
+ def start( self ):
+ """
+ The JobManager should start, and then start its Handler, if it has one.
+ """
+ # Recover jobs at startup
+ self.__check_jobs_at_startup()
+ # Start the queue
+ self.monitor_thread.start()
+ log.info( "job handler queue started" )
+
+ def __check_jobs_at_startup( self ):
+ """
+ Checks all jobs that are in the 'new', 'queued' or 'running' state in
+ the database and requeues or cleans up as necessary. Only run as the
+ job manager starts.
+ """
+ for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( ( ( model.Job.state == model.Job.states.NEW ) \
+ | ( model.Job.state == model.Job.states.RUNNING ) \
+ | ( model.Job.state == model.Job.states.QUEUED ) ) \
+ & ( model.Job.handler == self.app.config.server_name ) ):
+ if job.tool_id not in self.app.toolbox.tools_by_id:
+ log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) )
+ JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )
+ elif job.job_runner_name is None:
+ log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) )
+ if self.track_jobs_in_database:
+ job.state = model.Job.states.NEW
+ else:
+ self.queue.put( ( job.id, job.tool_id ) )
+ else:
+ job_wrapper = JobWrapper( job, self )
+ self.dispatcher.recover( job, job_wrapper )
+ if self.sa_session.dirty:
+ self.sa_session.flush()
+
+ def __monitor( self ):
+ """
+ Continually iterate the waiting jobs, checking is each is ready to
+ run and dispatching if so.
+ """
+ while self.running:
+ try:
+ self.__monitor_step()
+ except:
+ log.exception( "Exception in monitor_step" )
+ # Sleep
+ self.sleeper.sleep( 1 )
+
+ def __monitor_step( self ):
+ """
+ Called repeatedly by `monitor` to process waiting jobs. Gets any new
+ jobs (either from the database or from its own queue), then iterates
+ over all new and waiting jobs to check the state of the jobs each
+ depends on. If the job has dependencies that have not finished, it
+ it goes to the waiting queue. If the job has dependencies with errors,
+ it is marked as having errors and removed from the queue. Otherwise,
+ the job is dispatched.
+ """
+ # Pull all new jobs from the queue at once
+ jobs_to_check = []
+ if self.track_jobs_in_database:
+ # Clear the session so we get fresh states for job and all datasets
+ self.sa_session.expunge_all()
+ # Fetch all new jobs
+ jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( ( model.Job.state == model.Job.states.NEW ) \
+ & ( model.Job.handler == self.app.config.server_name ) ).all()
+ else:
+ # Get job objects and append to watch queue for any which were
+ # previously waiting
+ for job_id in self.waiting_jobs:
+ jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
+ try:
+ while 1:
+ message = self.queue.get_nowait()
+ if message is self.STOP_SIGNAL:
+ return
+ # Unpack the message
+ job_id, tool_id = message
+ # Get the job object and append to watch queue
+ jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
+ except Empty:
+ pass
+ # Iterate over new and waiting jobs and look for any that are
+ # ready to run
+ new_waiting_jobs = []
+ for job in jobs_to_check:
+ try:
+ # Check the job's dependencies, requeue if they're not done
+ job_state = self.__check_if_ready_to_run( job )
+ if job_state == JOB_WAIT:
+ if not self.track_jobs_in_database:
+ new_waiting_jobs.append( job.id )
+ elif job_state == JOB_INPUT_ERROR:
+ log.info( "(%d) Job unable to run: one or more inputs in error state" % job.id )
+ elif job_state == JOB_INPUT_DELETED:
+ log.info( "(%d) Job unable to run: one or more inputs deleted" % job.id )
+ elif job_state == JOB_READY:
+ self.dispatcher.put( JobWrapper( job, self ) )
+ log.info( "(%d) Job dispatched" % job.id )
+ elif job_state == JOB_DELETED:
+ log.info( "(%d) Job deleted by user while still queued" % job.id )
+ elif job_state == JOB_ADMIN_DELETED:
+ log.info( "(%d) Job deleted by admin while still queued" % job.id )
+ else:
+ log.error( "(%d) Job in unknown state '%s'" % ( job.id, job_state ) )
+ if not self.track_jobs_in_database:
+ new_waiting_jobs.append( job.id )
+ except Exception:
+ log.exception( "failure running job %d" % job.id )
+ # Update the waiting list
+ self.waiting_jobs = new_waiting_jobs
+ # Done with the session
+ self.sa_session.remove()
+
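The same drain-the-queue idiom (loop on `get_nowait()` until `Empty`) appears in every monitor step in this commit. As a standalone sketch (Python 3 spells the module `queue`; the Python 2 of this codebase uses `Queue`):

```python
from queue import Queue, Empty  # Python 2: from Queue import Queue, Empty

def drain(q):
    """Pull every message currently in the queue without blocking."""
    messages = []
    try:
        while True:
            messages.append(q.get_nowait())
    except Empty:
        # Queue is momentarily empty; return what we collected
        pass
    return messages
```

Draining in one pass lets the monitor process a whole batch of `(job_id, tool_id)` messages per wakeup instead of one message per one-second sleep cycle.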
+ def __check_if_ready_to_run( self, job ):
+ """
+ Check if a job is ready to run by verifying that each of its input
+ datasets is ready (specifically in the OK state). If any input dataset
+ has an error, fail the job and return JOB_INPUT_ERROR. If any input
+ dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all
+ input datasets are in OK state, return JOB_READY indicating that the
+ job can be dispatched. Otherwise, return JOB_WAIT indicating that input
+ datasets are still being prepared.
+ """
+ if job.state == model.Job.states.DELETED:
+ return JOB_DELETED
+ elif job.state == model.Job.states.ERROR:
+ return JOB_ADMIN_DELETED
+ elif self.app.config.enable_quotas:
+ quota = self.app.quota_agent.get_quota( job.user )
+ if quota is not None:
+ try:
+ usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
+ if usage > quota:
+ return JOB_WAIT
+ except AssertionError, e:
+ pass # No history, should not happen with an anon user
+ for dataset_assoc in job.input_datasets + job.input_library_datasets:
+ idata = dataset_assoc.dataset
+ if not idata:
+ continue
+ # don't run jobs for which the input dataset was deleted
+ if idata.deleted:
+ JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ return JOB_INPUT_DELETED
+ # an error in the input data causes us to bail immediately
+ elif idata.state == idata.states.ERROR:
+ JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state == idata.states.FAILED_METADATA:
+ JobWrapper( job, self ).fail( "input data %d failed to properly set metadata" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+ # need to requeue
+ return JOB_WAIT
+ return self.__check_user_jobs( job )
+
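Stripped of the ORM, the per-input check above is a small state machine. A sketch, where the state names are simplified stand-ins for the model constants and the `SETTING_METADATA` special case for the external-metadata tool is omitted:

```python
# Simplified dataset states and job dispositions (stand-ins, not the real constants)
OK, ERROR, FAILED_METADATA, DELETED = 'ok', 'error', 'failed_metadata', 'deleted'
JOB_READY, JOB_WAIT, JOB_INPUT_ERROR, JOB_INPUT_DELETED = (
    'ready', 'wait', 'input_error', 'input_deleted')

def check_inputs(input_states):
    """Fail fast on deleted or errored inputs; wait on unfinished ones."""
    for state in input_states:
        if state == DELETED:
            return JOB_INPUT_DELETED
        if state in (ERROR, FAILED_METADATA):
            return JOB_INPUT_ERROR
        if state != OK:
            # input still being produced, requeue the job
            return JOB_WAIT
    return JOB_READY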
+ def __check_user_jobs( self, job ):
+ if not self.app.config.user_job_limit:
+ return JOB_READY
+ if job.user:
+ count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( and_( model.Job.user_id == job.user.id,
+ or_( model.Job.state == model.Job.states.RUNNING,
+ model.Job.state == model.Job.states.QUEUED ) ) ).count()
+ elif job.galaxy_session:
+ count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( and_( model.Job.session_id == job.galaxy_session.id,
+ or_( model.Job.state == model.Job.states.RUNNING,
+ model.Job.state == model.Job.states.QUEUED ) ) ).count()
+ else:
+ log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' % job.id )
+ return JOB_READY
+ if count >= self.app.config.user_job_limit:
+ return JOB_WAIT
+ return JOB_READY
+
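Stripped of the SQLAlchemy counting query, the per-user throttling decision reduces to a comparison. A sketch of just that logic (the `active_count` argument stands in for the queued-plus-running count computed above):

```python
def check_user_jobs(active_count, user_job_limit):
    """Return 'JOB_WAIT' once a user's queued+running jobs reach the limit.

    A falsy limit (0 or None) disables the check entirely, matching the
    `if not self.app.config.user_job_limit` guard above.
    """
    if not user_job_limit:
        return 'JOB_READY'
    return 'JOB_WAIT' if active_count >= user_job_limit else 'JOB_READY'
```

Because the comparison is `>=`, a user at exactly the limit cannot start another job until one of theirs leaves the RUNNING/QUEUED states.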
+ def put( self, job_id, tool_id ):
+ """Add a job to the queue (by job identifier)"""
+ if not self.track_jobs_in_database:
+ self.queue.put( ( job_id, tool_id ) )
+ self.sleeper.wake()
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker thread"""
+ if self.parent_pid != os.getpid():
+ # We're not the real job queue, do nothing
+ return
+ else:
+ log.info( "sending stop signal to worker thread" )
+ self.running = False
+ if not self.track_jobs_in_database:
+ self.queue.put( self.STOP_SIGNAL )
+ self.sleeper.wake()
+ log.info( "job handler queue stopped" )
+ self.dispatcher.shutdown()
+
+class JobHandlerStopQueue( object ):
+ """
+ A queue for jobs which need to be terminated prematurely.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app, dispatcher ):
+ self.app = app
+ self.dispatcher = dispatcher
+
+ self.sa_session = app.model.context
+
+ # Keep track of the pid that started the job manager, only it
+ # has valid threads
+ self.parent_pid = os.getpid()
+ # Contains new jobs. Note this is not used if track_jobs_in_database is True
+ self.queue = Queue()
+
+ # Contains jobs that are waiting (only use from monitor thread)
+ self.waiting = []
+
+ # Helper for interruptable sleep
+ self.sleeper = Sleeper()
+ self.running = True
+ self.monitor_thread = threading.Thread( target=self.monitor )
+ self.monitor_thread.start()
+ log.info( "job handler stop queue started" )
+
+ def monitor( self ):
+ """
+ Continually iterate the waiting jobs, stopping any that are found.
+ """
+ # HACK: Delay until after forking, we need a way to do post fork notification!!!
+ time.sleep( 10 )
+ while self.running:
+ try:
+ self.monitor_step()
+ except:
+ log.exception( "Exception in monitor_step" )
+ # Sleep
+ self.sleeper.sleep( 1 )
+
+ def monitor_step( self ):
+ """
+ Called repeatedly by `monitor` to stop jobs.
+ """
+ # Pull all new jobs from the queue at once
+ jobs_to_check = []
+ if self.app.config.track_jobs_in_database:
+ # Clear the session so we get fresh states for job and all datasets
+ self.sa_session.expunge_all()
+ # Fetch all new jobs
+ newly_deleted_jobs = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( ( model.Job.state == model.Job.states.DELETED_NEW ) \
+ & ( model.Job.handler == self.app.config.server_name ) ).all()
+ for job in newly_deleted_jobs:
+ jobs_to_check.append( ( job, None ) )
+ # Also pull from the queue (in the case of administratively stopped jobs)
+ try:
+ while 1:
+ message = self.queue.get_nowait()
+ if message is self.STOP_SIGNAL:
+ return
+ # Unpack the message
+ job_id, error_msg = message
+ # Get the job object and append to watch queue
+ jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) )
+ except Empty:
+ pass
+ for job, error_msg in jobs_to_check:
+ if error_msg is not None:
+ job.state = job.states.ERROR
+ job.info = error_msg
+ else:
+ job.state = job.states.DELETED
+ self.sa_session.add( job )
+ self.sa_session.flush()
+ if job.job_runner_name is not None:
+ # tell the dispatcher to stop the job
+ self.dispatcher.stop( job )
+
+ def put( self, job_id, error_msg=None ):
+ self.queue.put( ( job_id, error_msg ) )
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker thread"""
+ if self.parent_pid != os.getpid():
+ # We're not the real job queue, do nothing
+ return
+ else:
+ log.info( "sending stop signal to worker thread" )
+ self.running = False
+ if not self.app.config.track_jobs_in_database:
+ self.queue.put( self.STOP_SIGNAL )
+ self.sleeper.wake()
+ log.info( "job handler stop queue stopped" )
+
+class DefaultJobDispatcher( object ):
+ def __init__( self, app ):
+ self.app = app
+ self.job_runners = {}
+ start_job_runners = ["local"]
+ if app.config.start_job_runners is not None:
+ start_job_runners.extend( [ x.strip() for x in util.listify( app.config.start_job_runners ) ] )
+ if app.config.use_tasked_jobs:
+ start_job_runners.append("tasks")
+ for name in start_job_runners:
+ self._load_plugin( name )
+
+ def _load_plugin( self, name ):
+ module_name = 'galaxy.jobs.runners.' + name
+ try:
+ module = __import__( module_name )
+ except:
+ log.exception( 'Job runner is not loadable: %s' % module_name )
+ return
+ for comp in module_name.split( "." )[1:]:
+ module = getattr( module, comp )
+ if '__all__' not in dir( module ):
+ log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name )
+ return
+ for obj in module.__all__:
+ display_name = ':'.join( ( module_name, obj ) )
+ runner = getattr( module, obj )
+ self.job_runners[name] = runner( self.app )
+ log.debug( 'Loaded job runner: %s' % display_name )
+
+ def __get_runner_name( self, job_wrapper ):
+ if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper):
+ runner_name = "tasks"
+ else:
+ runner_name = ( job_wrapper.get_job_runner().split(":", 1) )[0]
+ return runner_name
+
+ def put( self, job_wrapper ):
+ try:
+ runner_name = self.__get_runner_name( job_wrapper )
+ if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and isinstance(job_wrapper, TaskWrapper):
+ #DBTODO Refactor
+ log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) )
+ else:
+ log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
+ self.job_runners[runner_name].put( job_wrapper )
+ except KeyError:
+ log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
+ job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' )
+
+ def stop( self, job ):
+ runner_name = ( job.job_runner_name.split(":", 1) )[0]
+ log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) )
+ try:
+ self.job_runners[runner_name].stop_job( job )
+ except KeyError:
+ log.error( 'stop(): (%s) Invalid job runner: %s' % ( job.id, runner_name ) )
+ # Job and output dataset states have already been updated, so nothing is done here.
+
+ def recover( self, job, job_wrapper ):
+ runner_name = ( job.job_runner_name.split(":", 1) )[0]
+ log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) )
+ try:
+ self.job_runners[runner_name].recover( job, job_wrapper )
+ except KeyError:
+ log.error( 'recover(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
+ job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' )
+
+ def shutdown( self ):
+ for runner in self.job_runners.itervalues():
+ runner.shutdown()
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/jobs/manager.py
--- /dev/null
+++ b/lib/galaxy/jobs/manager.py
@@ -0,0 +1,270 @@
+"""
+Top-level Galaxy job manager, moves jobs to handler(s)
+"""
+
+import os
+import time
+import random
+import logging
+import threading
+from Queue import Queue, Empty
+
+from sqlalchemy.sql.expression import and_, or_
+
+from galaxy import model
+from galaxy.jobs import handler, JobWrapper, Sleeper, NoopQueue
+
+log = logging.getLogger( __name__ )
+
+class JobManager( object ):
+ """
+ Highest level interface to job management.
+
+ TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly.
+ This should be decoupled.
+ """
+ def __init__( self, app ):
+ self.app = app
+ self.job_handler = NoopHandler()
+ if self.app.config.server_name in self.app.config.job_handlers:
+ self.job_handler = handler.JobHandler( app )
+ if self.app.config.server_name == self.app.config.job_manager:
+ job_handler = NoopHandler()
+ # In the case that webapp == manager == handler, pass jobs in memory
+ if not self.app.config.track_jobs_in_database:
+ job_handler = self.job_handler
+ # Otherwise, even if the manager == one of the handlers, its handler will pick up jobs from the database
+ self.job_queue = JobManagerQueue( app, job_handler )
+ self.job_stop_queue = JobManagerStopQueue( app, job_handler )
+ if self.app.config.enable_beta_job_managers:
+ from galaxy.jobs.deferred import DeferredJobQueue
+ self.deferred_job_queue = DeferredJobQueue( app )
+ else:
+ self.job_queue = self.job_stop_queue = NoopQueue()
+ self.job_handler.start()
+ def shutdown( self ):
+ self.job_queue.shutdown()
+ self.job_stop_queue.shutdown()
+ self.job_handler.shutdown()
+
+class JobManagerQueue( object ):
+ """
+ Job manager, waits for jobs to be runnable and then dispatches to a
+ JobHandler.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app, job_handler ):
+ self.app = app
+ self.job_handler = job_handler # the (singular) handler if we are passing jobs in memory
+
+ self.sa_session = app.model.context
+ self.job_lock = False
+ # Keep track of the pid that started the job manager, only it
+ # has valid threads
+ self.parent_pid = os.getpid()
+ # Contains new jobs. Note this is not used if track_jobs_in_database is True
+ self.queue = Queue()
+ # Helper for interruptable sleep
+ self.sleeper = Sleeper()
+ self.running = True
+ self.monitor_thread = threading.Thread( target=self.__monitor )
+ # Recover jobs at startup
+ self.__check_jobs_at_startup()
+ # Start the queue
+ self.monitor_thread.start()
+ log.info( "job manager queue started" )
+
+ def __check_jobs_at_startup( self ):
+ """
+ Checks all jobs that are in the 'new', 'queued' or 'running' state in
+ the database and requeues or cleans up as necessary. Only run when the
+ job manager starts.
+ """
+ for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( ( ( model.Job.state == model.Job.states.NEW ) \
+ | ( model.Job.state == model.Job.states.RUNNING ) \
+ | ( model.Job.state == model.Job.states.QUEUED ) ) \
+ & ( model.Job.handler == None ) ):
+ if job.tool_id not in self.app.toolbox.tools_by_id:
+ log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) )
+ JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )
+ else:
+ job.handler = self.__select_handler( job ) # handler's recovery method will take it from here
+ log.info( "(%d) Job in '%s' state had no handler at job manager startup, assigned '%s' handler" % ( job.id, job.state, job.handler ) )
+ if self.sa_session.dirty:
+ self.sa_session.flush()
+
+ def __monitor( self ):
+ """
+ Continually iterate the waiting jobs and dispatch to a handler
+ """
+ # HACK: Delay until after forking, we need a way to do post fork notification!!!
+ time.sleep( 10 )
+ while self.running:
+ try:
+ self.__monitor_step()
+ except:
+ log.exception( "Exception in monitor_step" )
+ # Sleep
+ self.sleeper.sleep( 1 )
+
+ def __monitor_step( self ):
+ """
+ Called repeatedly by `monitor` to process waiting jobs. Gets any new
+ jobs (either from the database or from its own queue), then assigns a
+ handler.
+ """
+ # Do nothing if the queue is locked
+ if self.job_lock:
+ log.info( 'Job queue is administratively locked, sleeping...' )
+ time.sleep( 10 )
+ return
+ # Pull all new jobs from the queue at once
+ jobs_to_check = []
+ if self.app.config.track_jobs_in_database:
+ # Clear the session so we get fresh states for job and all datasets
+ self.sa_session.expunge_all()
+ # Fetch all new jobs
+ jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( ( model.Job.state == model.Job.states.NEW ) \
+ & ( model.Job.handler == None ) ).all()
+ else:
+ # Get job objects and append to watch queue for any which were
+ # previously waiting
+ try:
+ while 1:
+ message = self.queue.get_nowait()
+ if message is self.STOP_SIGNAL:
+ return
+ # Unpack the message
+ job_id, tool_id = message
+ # Get the job object and append to watch queue
+ jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
+ except Empty:
+ pass
+
+ for job in jobs_to_check:
+ job.handler = self.__select_handler( job )
+ log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) )
+ self.sa_session.add( job )
+
+ # If tracking in the database, handlers will pick up the job now
+ self.sa_session.flush()
+
+ time.sleep( 5 )
+
+ # This only does something in the case that there is only one handler and it is this Galaxy process
+ for job in jobs_to_check:
+ self.job_handler.job_queue.put( job.id, job.tool_id )
+
+ def __select_handler( self, job ):
+ # TODO: handler selection based on params, tool, etc.
+ return random.choice( self.app.config.job_handlers )
+
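Per the commit message, handlers can be exempted from the default random choice pool; the exemption itself is not visible in this hunk, but the idea can be sketched as follows (the `exempt` parameter and function name are assumptions, not names from this commit):

```python
import random

def select_handler(handlers, exempt=()):
    """Pick a handler at random from those not exempted from the default pool."""
    pool = [h for h in handlers if h not in exempt]
    if not pool:
        raise ValueError('no job handlers eligible for random selection')
    return random.choice(pool)
```

An exempted handler would then only receive jobs routed to it explicitly (e.g. by tool or parameter mapping), never by the random default.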
+ def put( self, job_id, tool ):
+ """Add a job to the queue (by job identifier)"""
+ if not self.app.config.track_jobs_in_database:
+ self.queue.put( ( job_id, tool.id ) )
+ self.sleeper.wake()
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker thread"""
+ if self.parent_pid != os.getpid():
+ # We're not the real job queue, do nothing
+ return
+ else:
+ log.info( "sending stop signal to worker thread" )
+ self.running = False
+ if not self.app.config.track_jobs_in_database:
+ self.queue.put( self.STOP_SIGNAL )
+ self.sleeper.wake()
+ log.info( "job manager queue stopped" )
+
+class JobManagerStopQueue( object ):
+ """
+ A queue for jobs which need to be terminated prematurely.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app, job_handler ):
+ self.app = app
+ self.job_handler = job_handler
+
+ self.sa_session = app.model.context
+
+ # Keep track of the pid that started the job manager, only it
+ # has valid threads
+ self.parent_pid = os.getpid()
+ # Contains new jobs. Note this is not used if track_jobs_in_database is True
+ self.queue = Queue()
+
+ # Contains jobs that are waiting (only use from monitor thread)
+ self.waiting = []
+
+ # Helper for interruptable sleep
+ self.sleeper = Sleeper()
+ self.running = True
+ self.monitor_thread = threading.Thread( target=self.monitor )
+ self.monitor_thread.start()
+ log.info( "job manager stop queue started" )
+
+ def monitor( self ):
+ """
+ Continually iterate the waiting jobs, stopping any that are found.
+ """
+ # HACK: Delay until after forking, we need a way to do post fork notification!!!
+ time.sleep( 10 )
+ while self.running:
+ try:
+ self.monitor_step()
+ except:
+ log.exception( "Exception in monitor_step" )
+ # Sleep
+ self.sleeper.sleep( 1 )
+
+ def monitor_step( self ):
+ """
+ Called repeatedly by `monitor` to stop jobs.
+ """
+ jobs_to_check = []
+ # Pull from the queue even if tracking in the database (in the case of administratively stopped jobs)
+ try:
+ while 1:
+ message = self.queue.get_nowait()
+ if message is self.STOP_SIGNAL:
+ return
+ # Unpack the message
+ job_id, error_msg = message
+ # Get the job object and append to watch queue
+ jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) )
+ except Empty:
+ pass
+
+ # If tracking in the database, the handler will pick up the stop itself. Otherwise, notify the handler.
+ for job, error_msg in jobs_to_check:
+ self.job_handler.job_stop_queue.put( job.id, error_msg )
+
+ def put( self, job_id, error_msg=None ):
+ self.queue.put( ( job_id, error_msg ) )
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker thread"""
+ if self.parent_pid != os.getpid():
+ # We're not the real job queue, do nothing
+ return
+ else:
+ log.info( "sending stop signal to worker thread" )
+ self.running = False
+ if not self.app.config.track_jobs_in_database:
+ self.queue.put( self.STOP_SIGNAL )
+ self.sleeper.wake()
+ log.info( "job manager stop queue stopped" )
+
+class NoopHandler( object ):
+ def __init__( self, *args, **kwargs ):
+ self.job_queue = NoopQueue()
+ self.job_stop_queue = NoopQueue()
+ def start( self ):
+ pass
+ def shutdown( self, *args ):
+ pass
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -132,6 +132,7 @@
self.job_runner_external_id = None
self.post_job_actions = []
self.imported = False
+ self.handler = None
def add_parameter( self, name, value ):
self.parameters.append( JobParameter( name, value ) )
@@ -171,14 +172,11 @@
if not dataset.deleted:
return False
return True
- def mark_deleted( self, enable_job_running=True, track_jobs_in_database=False ):
+ def mark_deleted( self, track_jobs_in_database=False ):
"""
Mark this job as deleted, and mark any output datasets as discarded.
"""
- # This could be handled with *just* track_jobs_in_database, but I
- # didn't want to make setting track_jobs_in_database required in
- # non-runner configs.
- if not enable_job_running or track_jobs_in_database:
+ if track_jobs_in_database:
self.state = Job.states.DELETED_NEW
else:
self.state = Job.states.DELETED
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/model/mapping.py
--- a/lib/galaxy/model/mapping.py
+++ b/lib/galaxy/model/mapping.py
@@ -421,7 +421,8 @@
Column( "job_runner_external_id", String( 255 ) ),
Column( "object_store_id", TrimmedString( 255 ), index=True ),
Column( "imported", Boolean, default=False, index=True ),
- Column( "params", TrimmedString(255), index=True ) )
+ Column( "params", TrimmedString(255), index=True ),
+ Column( "handler", TrimmedString( 255 ), index=True ) )
JobParameter.table = Table( "job_parameter", metadata,
Column( "id", Integer, primary_key=True ),
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/model/migrate/versions/0094_add_job_handler_col.py
--- /dev/null
+++ b/lib/galaxy/model/migrate/versions/0094_add_job_handler_col.py
@@ -0,0 +1,49 @@
+"""
+Migration script to create "handler" column in job table.
+"""
+
+from sqlalchemy import *
+from sqlalchemy.orm import *
+from migrate import *
+from migrate.changeset import *
+
+import logging
+log = logging.getLogger( __name__ )
+
+# Need our custom types, but don't import anything else from model
+from galaxy.model.custom_types import *
+
+metadata = MetaData( migrate_engine )
+db_session = scoped_session( sessionmaker( bind=migrate_engine, autoflush=False, autocommit=True ) )
+
+# Column to add.
+handler_col = Column( "handler", TrimmedString(255), index=True )
+
+def display_migration_details():
+ print ""
+ print "This migration script adds a 'handler' column to the Job table."
+
+def upgrade():
+ print __doc__
+ metadata.reflect()
+
+ # Add column to Job table.
+ try:
+ Job_table = Table( "job", metadata, autoload=True )
+ handler_col.create( Job_table )
+ assert handler_col is Job_table.c.handler
+
+ except Exception, e:
+ print str(e)
+ log.debug( "Adding column 'handler' to job table failed: %s" % str( e ) )
+
+def downgrade():
+ metadata.reflect()
+
+ # Drop column from Job table.
+ try:
+ Job_table = Table( "job", metadata, autoload=True )
+ handler_col = Job_table.c.handler
+ handler_col.drop()
+ except Exception, e:
+ log.debug( "Dropping column 'handler' from job table failed: %s" % ( str( e ) ) )
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/web/base/controller.py
--- a/lib/galaxy/web/base/controller.py
+++ b/lib/galaxy/web/base/controller.py
@@ -2381,8 +2381,8 @@
deleted = []
msg = None
status = None
- if not trans.app.config.get_bool( "enable_job_running", True ):
- return trans.show_error_message( 'This Galaxy instance is not configured to run jobs. If using multiple servers, please directly access the job running instance to manage jobs.' )
+ if self.app.config.job_manager != self.app.config.server_name:
+ return trans.show_error_message( 'This Galaxy instance is not the job manager. If using multiple servers, please directly access the job manager instance to manage jobs.' )
job_ids = util.listify( stop )
if job_ids and stop_msg in [ None, '' ]:
msg = 'Please enter an error message to display to the user describing why the job was terminated'
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/web/controllers/dataset.py
--- a/lib/galaxy/web/controllers/dataset.py
+++ b/lib/galaxy/web/controllers/dataset.py
@@ -904,8 +904,7 @@
if job.state in [ self.app.model.Job.states.QUEUED, self.app.model.Job.states.RUNNING, self.app.model.Job.states.NEW ]:
# Are *all* of the job's other output datasets deleted?
if job.check_if_output_datasets_deleted():
- job.mark_deleted( self.app.config.get_bool( 'enable_job_running', True ),
- self.app.config.get_bool( 'track_jobs_in_database', False ) )
+ job.mark_deleted( self.app.config.track_jobs_in_database )
self.app.job_manager.job_stop_queue.put( job.id )
trans.sa_session.flush()
except Exception, e:
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample
+++ b/universe_wsgi.ini.sample
@@ -542,14 +542,25 @@
# -- Job Execution
-# If running multiple Galaxy processes, one can be designated as the job
-# runner. For more information, see:
-# http://wiki.g2.bx.psu.edu/Admin/Config/Performance/Web%20Application%20Scal…
-#enable_job_running = True
+# To increase performance of job execution and the web interface, you can
+# separate Galaxy into multiple processes. There is more than one way to do
+# this; the options are explained in detail in the documentation:
+#
+# http://wiki.g2.bx.psu.edu/Admin/Config/Performance/Web%20Application%20Scal…
+#
+# By default, Galaxy manages and executes jobs from within a single process and
+# notifies itself of new jobs via in-memory queues. If you change job_manager
+# and job_handlers from their default values, notification will instead be done
+# using the `state` and `handler` columns of the job table in the database.
-# Should jobs be tracked through the database, rather than in memory.
-# Necessary if you're running the load balanced setup.
-#track_jobs_in_database = False
+# Identify the server_name (the string following server: at the top of this
+# file) which should be designated as the job manager (only one):
+#job_manager = main
+
+# Identify the server_name(s) which should be designated as job handlers
+# (responsible for starting, tracking, finishing, and cleaning up jobs) as a
+# comma-separated list.
+#job_handlers = main
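Putting the two new options together, a hypothetical multi-process configuration might look like the fragment below (server names `manager0`, `handler0`, and `handler1` are illustrative only; they must match `[server:...]` section names defined earlier in the file):

```ini
# One process is the manager, which assigns new jobs to handlers:
job_manager = manager0
# Two processes run jobs; the manager picks one at random per job:
job_handlers = handler0,handler1
```

With more than one process configured this way, job state flows through the `state` and `handler` columns of the job table rather than in-memory queues.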
# This enables splitting of jobs into tasks, if specified by the particular tool config.
# This is a new feature and not recommended for production servers yet.
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.