1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/4dd7564268fc/
changeset: 4dd7564268fc
user: natefoo
date: 2012-03-31 16:06:43
summary: Allow job handlers to be exempted from the default random choice pool.
affected #: 3 files
diff -r 40abca016f1ebd7be7adb362614eb16d8f2116bf -r 4dd7564268fcd4f5bd3275fa3b5da012b7adefb5 lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -193,6 +193,7 @@
# Store advanced job management config
self.job_manager = kwargs.get('job_manager', self.server_name).strip()
self.job_handlers = [ x.strip() for x in kwargs.get('job_handlers', self.server_name).split(',') ]
+ self.default_job_handlers = [ x.strip() for x in kwargs.get('default_job_handlers', ','.join( self.job_handlers ) ).split(',') ]
# Use database for IPC unless this is a standalone server (or multiple servers doing self dispatching in memory)
self.track_jobs_in_database = True
if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ):
diff -r 40abca016f1ebd7be7adb362614eb16d8f2116bf -r 4dd7564268fcd4f5bd3275fa3b5da012b7adefb5 lib/galaxy/tools/__init__.py
--- a/lib/galaxy/tools/__init__.py
+++ b/lib/galaxy/tools/__init__.py
@@ -791,7 +791,7 @@
self.parallelism = None
# Set job handler(s). Each handler is a dict with 'name' and, optionally, 'params'.
self_id = self.id.lower()
- self.job_handlers = [ { "name" : name } for name in self.app.config.job_handlers ]
+ self.job_handlers = [ { "name" : name } for name in self.app.config.default_job_handlers ]
# Set custom handler(s) if they're defined.
if self_id in self.app.config.tool_handlers:
self.job_handlers = self.app.config.tool_handlers[ self_id ]
diff -r 40abca016f1ebd7be7adb362614eb16d8f2116bf -r 4dd7564268fcd4f5bd3275fa3b5da012b7adefb5 universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample
+++ b/universe_wsgi.ini.sample
@@ -562,6 +562,13 @@
# comma-separated list.
#job_handlers = main
+# By default, a handler from job_handlers will be selected at random if the
+# tool to run does not specify a handler below in [galaxy:tool_handlers]. If you
+# want certain handlers to only handle jobs for tools/params explicitly
+# assigned below, use default_job_handlers to specify which handlers should be
+# used for jobs without explicit handlers.
+#default_job_handlers = main
+
# This enables splitting of jobs into tasks, if specified by the particular tool config.
# This is a new feature and not recommended for production servers yet.
#use_tasked_jobs = False
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/de97f00ff16f/
changeset: de97f00ff16f
user: greg
date: 2012-03-30 20:37:55
summary: Additional setting to fix tool shed config which was broken due to the additions in the Galaxy webapp in recent commits.
affected #: 1 file
diff -r 25db1453bb2e55d7d58d407364ce71001bd24096 -r de97f00ff16feb64cd26d3393a68c303f33e5e33 lib/galaxy/webapps/community/config.py
--- a/lib/galaxy/webapps/community/config.py
+++ b/lib/galaxy/webapps/community/config.py
@@ -84,6 +84,11 @@
self.screencasts_url = kwargs.get( 'screencasts_url', None )
self.log_events = False
self.cloud_controller_instance = False
+ self.server_name = ''
+ self.job_manager = ''
+ self.job_handlers = []
+ self.tool_handlers = []
+ self.tool_runners = []
# Proxy features
self.apache_xsendfile = kwargs.get( 'apache_xsendfile', False )
self.nginx_x_accel_redirect_base = kwargs.get( 'nginx_x_accel_redirect_base', False )
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/0626ddb49084/
changeset: 0626ddb49084
user: natefoo
date: 2012-03-30 20:10:45
summary: Allow for job handler selection based on job params (like source).
affected #: 4 files
diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -198,13 +198,27 @@
if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ):
self.track_jobs_in_database = False
# Store per-tool runner configs
+ self.tool_handlers = self.__read_tool_job_config( global_conf_parser, 'galaxy:tool_handlers', 'name' )
+ self.tool_runners = self.__read_tool_job_config( global_conf_parser, 'galaxy:tool_runners', 'url' )
+ self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' )
+ # Cloud configuration options
+ self.enable_cloud_launch = string_as_bool( kwargs.get( 'enable_cloud_launch', False ) )
+ # Galaxy messaging (AMQP) configuration options
+ self.amqp = {}
try:
- tool_runners_config = global_conf_parser.items("galaxy:tool_runners")
+ amqp_config = global_conf_parser.items("galaxy_amqp")
+ except ConfigParser.NoSectionError:
+ amqp_config = {}
+ for k, v in amqp_config:
+ self.amqp[k] = v
+ def __read_tool_job_config( self, global_conf_parser, section, key ):
+ try:
+ tool_runners_config = global_conf_parser.items( section )
# Process config to group multiple configs for the same tool.
- tool_runners = {}
+ rval = {}
for entry in tool_runners_config:
- tool_config, url = entry
+ tool_config, val = entry
tool = None
runner_dict = {}
if tool_config.find("[") != -1:
@@ -219,29 +233,18 @@
tool = tool_config
# Add runner URL.
- runner_dict[ 'url' ] = url
+ runner_dict[ key ] = val
# Create tool entry if necessary.
- if tool not in tool_runners:
- tool_runners[ tool ] = []
+ if tool not in rval:
+ rval[ tool ] = []
# Add entry to runners.
- tool_runners[ tool ].append( runner_dict )
+ rval[ tool ].append( runner_dict )
- self.tool_runners = tool_runners
+ return rval
except ConfigParser.NoSectionError:
- self.tool_runners = []
- self.datatypes_config = kwargs.get( 'datatypes_config_file', 'datatypes_conf.xml' )
- # Cloud configuration options
- self.enable_cloud_launch = string_as_bool( kwargs.get( 'enable_cloud_launch', False ) )
- # Galaxy messaging (AMQP) configuration options
- self.amqp = {}
- try:
- amqp_config = global_conf_parser.items("galaxy_amqp")
- except ConfigParser.NoSectionError:
- amqp_config = {}
- for k, v in amqp_config:
- self.amqp[k] = v
+ return []
def get( self, key, default ):
return self.config_dict.get( key, default )
def get_bool( self, key, default ):
diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d lib/galaxy/jobs/manager.py
--- a/lib/galaxy/jobs/manager.py
+++ b/lib/galaxy/jobs/manager.py
@@ -144,7 +144,7 @@
pass
for job in jobs_to_check:
- job.handler = self.__select_handler( job )
+ job.handler = self.__get_handler( job )
log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) )
self.sa_session.add( job )
@@ -157,9 +157,15 @@
for job in jobs_to_check:
self.job_handler.job_queue.put( job.id, job.tool_id )
- def __select_handler( self, job ):
- # TODO: handler selection based on params, tool, etc.
- return random.choice( self.app.config.job_handlers )
+ def __get_handler( self, job ):
+ try:
+ params = None
+ if job.params:
+ params = from_json_string( job.params )
+ return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_handler( params )
+ except:
+ log.exception( "(%s) Caught exception attempting to get tool-specific job handler for tool '%s', selecting at random from available handlers instead:" % ( job.id, job.tool_id ) )
+ return random.choice( self.app.config.job_handlers )
def put( self, job_id, tool ):
"""Add a job to the queue (by job identifier)"""
diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d lib/galaxy/tools/__init__.py
--- a/lib/galaxy/tools/__init__.py
+++ b/lib/galaxy/tools/__init__.py
@@ -5,7 +5,7 @@
pkg_resources.require( "simplejson" )
-import logging, os, string, sys, tempfile, glob, shutil, types, urllib, subprocess
+import logging, os, string, sys, tempfile, glob, shutil, types, urllib, subprocess, random
import simplejson
import binascii
from UserDict import DictMixin
@@ -682,31 +682,35 @@
if tool_version:
return tool_version.get_version_ids( self.app )
return []
- def get_job_runner( self, job_params=None ):
- # Look through runners to find one with matching parameters.
- selected_runner = None
- if len( self.job_runners ) == 1:
- # Most tools have a single runner.
- selected_runner = self.job_runners[0]
+ def __get_job_run_config( self, run_configs, key, job_params=None ):
+ # Look through runners/handlers to find one with matching parameters.
+ available_configs = []
+ if len( run_configs ) == 1:
+ # Most tools have a single config.
+ return run_configs[0][ key ] # return to avoid random when this will be the case most of the time
elif job_params is None:
- # Use job runner with no params
- for runner in self.job_runners:
- if "params" not in runner:
- selected_runner = runner
+ # Use job config with no params
+ for config in run_configs:
+ if "params" not in config:
+ available_configs.append( config )
else:
- # Find runner with matching parameters.
- for runner in self.job_runners:
- if "params" in runner:
+ # Find config with matching parameters.
+ for config in run_configs:
+ if "params" in config:
match = True
- runner_params = runner[ "params" ]
+ config_params = config[ "params" ]
for param, value in job_params.items():
- if param not in runner_params or \
- runner_params[ param ] != job_params[ param ]:
+ if param not in config_params or \
+ config_params[ param ] != job_params[ param ]:
match = False
break
if match:
- selected_runner = runner
- return selected_runner[ "url" ]
+ available_configs.append( config )
+ return random.choice( available_configs )[ key ]
+ def get_job_runner( self, job_params=None ):
+ return self.__get_job_run_config( self.job_runners, key='url', job_params=job_params )
+ def get_job_handler( self, job_params=None ):
+ return self.__get_job_run_config( self.job_handlers, key='name', job_params=job_params )
def parse( self, root, guid=None ):
"""
Read tool configuration from the element `root` and fill in `self`.
@@ -773,6 +777,12 @@
self.parallelism = ToolParallelismInfo(parallelism)
else:
self.parallelism = None
+ # Set job handler(s). Each handler is a dict with 'name' and, optionally, 'params'.
+ self_id = self.id.lower()
+ self.job_handlers = [ { "name" : name } for name in self.app.config.job_handlers ]
+ # Set custom handler(s) if they're defined.
+ if self_id in self.app.config.tool_handlers:
+ self.job_handlers = self.app.config.tool_handlers[ self_id ]
# Set job runner(s). Each runner is a dict with 'url' and, optionally, 'params'.
if self.app.config.start_job_runners is None:
# Jobs are always local regardless of tool config if no additional
@@ -782,7 +792,6 @@
# Set job runner to the cluster default
self.job_runners = [ { "url" : self.app.config.default_cluster_job_runner } ]
# Set custom runner(s) if they're defined.
- self_id = self.id.lower()
if self_id in self.app.config.tool_runners:
self.job_runners = self.app.config.tool_runners[ self_id ]
# Is this a 'hidden' tool (hidden in tool menu)
diff -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 -r 0626ddb49084b94248e9621d584409d398fd546d universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample
+++ b/universe_wsgi.ini.sample
@@ -664,18 +664,30 @@
#pbs_stage_path =
#pbs_dataset_server =
-# ---- Tool Job Runners -----------------------------------------------------
+# ---- Per-Tool Job Management ----------------------------------------------
-# Individual per-tool job runner overrides. Parameters can be included to define
-# multiple runners per tool. E.g. to run Cufflinks jobs initiated from Trackster
+# Per-tool job handler and runner overrides. Parameters can be included to define multiple
+# handlers or runners per tool. E.g. to run Cufflinks jobs initiated from Trackster
# differently than standard Cufflinks jobs:
-# cufflinks = local:///
-# cufflinks[source@trackster] = local:///
-# If not listed here, a tool will run with the runner defined with
-# default_cluster_job_runner.
+#
+# cufflinks = local:///
+# cufflinks[source@trackster] = local:///
+
+[galaxy:tool_handlers]
+
+# By default, Galaxy will select a handler at random from the list of
+# job_handlers set above. You can override as in the following examples:
+#
+#upload1 = upload_handler
+#cufflinks[source@trackster] = realtime_handler
[galaxy:tool_runners]
+# If not listed here, a tool will run with the runner defined with
+# default_cluster_job_runner. These overrides for local:/// are done because
+# these tools can fetch data from remote sites, which may not be suitable to
+# run on a cluster (if it does not have access to the Internet, for example).
+
biomart = local:///
encode_db1 = local:///
hbvar = local:///
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/12b14f3e78e9/
changeset: 12b14f3e78e9
user: natefoo
date: 2012-03-30 15:51:04
summary: Fix the data transfer script to work with new API library content ids.
affected #: 1 file
diff -r db6b788a1fe6f8ce12f91844bd8658f6168f2ff4 -r 12b14f3e78e915c2a87649fe406386fbcd4b8c02 scripts/galaxy_messaging/server/data_transfer.py
--- a/scripts/galaxy_messaging/server/data_transfer.py
+++ b/scripts/galaxy_messaging/server/data_transfer.py
@@ -177,7 +177,7 @@
self.update_status( SampleDataset.transfer_status.ADD_TO_LIBRARY )
try:
data = {}
- data[ 'folder_id' ] = api.encode_id( self.config_id_secret, '%s.%s' % ( 'folder', self.folder_id ) )
+ data[ 'folder_id' ] = 'F%s' % api.encode_id( self.config_id_secret, self.folder_id )
data[ 'file_type' ] = 'auto'
data[ 'server_dir' ] = self.server_dir
data[ 'dbkey' ] = ''
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/1c3b74544a2f/
changeset: 1c3b74544a2f
user: natefoo
date: 2012-03-30 04:47:17
summary: Remove unused and faulty import of sqlalchemy expressions in galaxy.jobs
affected #: 1 file
diff -r 4f6c38ca353861351e286016bbbddc126ca566c4 -r 1c3b74544a2f0d2d135c8d3f70803d1284278d5b lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -11,8 +11,6 @@
import traceback
import subprocess
-from sqlalchemy.sql.expression import and_, or_
-
import galaxy
from galaxy import util, model
from galaxy.datatypes.tabular import *
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/4f6c38ca3538/
changeset: 4f6c38ca3538
user: natefoo
date: 2012-03-29 23:25:56
summary: Support multiple job runners by creating a job manager that designates a job handler to run jobs, thereby avoiding the "new job" race condition.
affected #: 11 files
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/app.py
--- a/lib/galaxy/app.py
+++ b/lib/galaxy/app.py
@@ -124,8 +124,9 @@
if self.config.get_bool( 'enable_beta_job_managers', False ):
from jobs import transfer_manager
self.transfer_manager = transfer_manager.TransferManager( self )
- # Start the job queue
- self.job_manager = jobs.JobManager( self )
+ # Start the job manager
+ from jobs import manager
+ self.job_manager = manager.JobManager( self )
# FIXME: These are exposed directly for backward compatibility
self.job_queue = self.job_manager.job_queue
self.job_stop_queue = self.job_manager.job_stop_queue
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -184,7 +184,20 @@
# Heartbeat log file name override
if global_conf is not None:
self.heartbeat_log = global_conf.get( 'heartbeat_log', 'heartbeat.log' )
- #Store per-tool runner configs.
+ # Determine which 'server:' this is
+ self.server_name = 'main'
+ for arg in sys.argv:
+ # Crummy, but PasteScript does not give you a way to determine this
+ if arg.lower().startswith('--server-name='):
+ self.server_name = arg.split('=', 1)[-1]
+ # Store advanced job management config
+ self.job_manager = kwargs.get('job_manager', self.server_name).strip()
+ self.job_handlers = [ x.strip() for x in kwargs.get('job_handlers', self.server_name).split(',') ]
+ # Use database for IPC unless this is a standalone server (or multiple servers doing self dispatching in memory)
+ self.track_jobs_in_database = True
+ if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ):
+ self.track_jobs_in_database = False
+ # Store per-tool runner configs
try:
tool_runners_config = global_conf_parser.items("galaxy:tool_runners")
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -1,4 +1,17 @@
-import logging, threading, sys, os, time, traceback, shutil
+"""
+Support for running a tool in Galaxy via an internal job management system
+"""
+
+import os
+import sys
+import pwd
+import time
+import logging
+import threading
+import traceback
+import subprocess
+
+from sqlalchemy.sql.expression import and_, or_
import galaxy
from galaxy import util, model
@@ -9,51 +22,16 @@
from galaxy.util.json import from_json_string
from galaxy.util.expressions import ExpressionContext
from galaxy.jobs.actions.post import ActionBox
-import subprocess, pwd
from galaxy.exceptions import ObjectInvalid
-from sqlalchemy.sql.expression import and_, or_
-
-import pkg_resources
-pkg_resources.require( "PasteDeploy" )
-
-from Queue import Queue, Empty
-
log = logging.getLogger( __name__ )
-# States for running a job. These are NOT the same as data states
-JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted'
-
# This file, if created in the job's working directory, will be used for
# setting advanced metadata properties on the job and its associated outputs.
# This interface is currently experimental, is only used by the upload tool,
# and should eventually become API'd
TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json'
-class JobManager( object ):
- """
- Highest level interface to job management.
-
- TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly.
- This should be decoupled.
- """
- def __init__( self, app ):
- self.app = app
- if self.app.config.get_bool( "enable_job_running", True ):
- # The dispatcher launches the underlying job runners
- self.dispatcher = DefaultJobDispatcher( app )
- # Queues for starting and stopping jobs
- self.job_queue = JobQueue( app, self.dispatcher )
- self.job_stop_queue = JobStopQueue( app, self.dispatcher )
- if self.app.config.enable_beta_job_managers:
- from galaxy.jobs.deferred import DeferredJobQueue
- self.deferred_job_queue = DeferredJobQueue( app )
- else:
- self.job_queue = self.job_stop_queue = NoopQueue()
- def shutdown( self ):
- self.job_queue.shutdown()
- self.job_stop_queue.shutdown()
-
class Sleeper( object ):
"""
Provides a 'sleep' method that sleeps for a number of seconds *unless*
@@ -70,238 +48,6 @@
self.condition.notify()
self.condition.release()
-class JobQueue( object ):
- """
- Job manager, waits for jobs to be runnable and then dispatches to
- a JobRunner.
- """
- STOP_SIGNAL = object()
- def __init__( self, app, dispatcher ):
- """Start the job manager"""
- self.app = app
- self.sa_session = app.model.context
- self.job_lock = False
- # Should we read jobs form the database, or use an in memory queue
- self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False )
- # Keep track of the pid that started the job manager, only it
- # has valid threads
- self.parent_pid = os.getpid()
- # Contains new jobs. Note this is not used if track_jobs_in_database is True
- self.queue = Queue()
- # Contains jobs that are waiting (only use from monitor thread)
- ## This and jobs_to_check[] are closest to a "Job Queue"
- self.waiting_jobs = []
- # Helper for interruptable sleep
- self.sleeper = Sleeper()
- self.running = True
- self.dispatcher = dispatcher
- self.monitor_thread = threading.Thread( target=self.__monitor )
- # Recover jobs at startup
- if app.config.get_bool( 'enable_job_recovery', True ):
- self.__check_jobs_at_startup()
- # Start the queue
- self.monitor_thread.start()
- log.info( "job manager started" )
-
- def __check_jobs_at_startup( self ):
- """
- Checks all jobs that are in the 'new', 'queued' or 'running' state in
- the database and requeues or cleans up as necessary. Only run as the
- job manager starts.
- """
- model = self.app.model # DBTODO Why?
- for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ):
- if job.tool_id not in self.app.toolbox.tools_by_id:
- log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
- JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
- else:
- log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id )
- self.queue.put( ( job.id, job.tool_id ) )
- for job in self.sa_session.query( model.Job ).enable_eagerloads( False ).filter( ( model.Job.state == model.Job.states.RUNNING ) | ( model.Job.state == model.Job.states.QUEUED ) ):
- if job.tool_id not in self.app.toolbox.tools_by_id:
- log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) )
- JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' )
- elif job.job_runner_name is None:
- log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id )
- if self.track_jobs_in_database:
- job.state = model.Job.states.NEW
- else:
- self.queue.put( ( job.id, job.tool_id ) )
- else:
- job_wrapper = JobWrapper( job, self )
- self.dispatcher.recover( job, job_wrapper )
- if self.sa_session.dirty:
- self.sa_session.flush()
-
- def __monitor( self ):
- """
- Continually iterate the waiting jobs, checking is each is ready to
- run and dispatching if so.
- """
- # HACK: Delay until after forking, we need a way to do post fork notification!!!
- time.sleep( 10 )
- while self.running:
- try:
- self.__monitor_step()
- except:
- log.exception( "Exception in monitor_step" )
- # Sleep
- self.sleeper.sleep( 1 )
-
- def __monitor_step( self ):
- """
- Called repeatedly by `monitor` to process waiting jobs. Gets any new
- jobs (either from the database or from its own queue), then iterates
- over all new and waiting jobs to check the state of the jobs each
- depends on. If the job has dependencies that have not finished, it
- it goes to the waiting queue. If the job has dependencies with errors,
- it is marked as having errors and removed from the queue. Otherwise,
- the job is dispatched.
- """
- # Pull all new jobs from the queue at once
- jobs_to_check = []
- if self.track_jobs_in_database:
- # Clear the session so we get fresh states for job and all datasets
- self.sa_session.expunge_all()
- # Fetch all new jobs
- jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( model.Job.state == model.Job.states.NEW ).all()
- else:
- # Get job objects and append to watch queue for any which were
- # previously waiting
- for job_id in self.waiting_jobs:
- jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
- try:
- while 1:
- message = self.queue.get_nowait()
- if message is self.STOP_SIGNAL:
- return
- # Unpack the message
- job_id, tool_id = message
- # Get the job object and append to watch queue
- jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
- except Empty:
- pass
- # Iterate over new and waiting jobs and look for any that are
- # ready to run
- new_waiting_jobs = []
- for job in jobs_to_check:
- try:
- # Check the job's dependencies, requeue if they're not done
- job_state = self.__check_if_ready_to_run( job )
- if job_state == JOB_WAIT:
- if not self.track_jobs_in_database:
- new_waiting_jobs.append( job.id )
- elif job_state == JOB_INPUT_ERROR:
- log.info( "job %d unable to run: one or more inputs in error state" % job.id )
- elif job_state == JOB_INPUT_DELETED:
- log.info( "job %d unable to run: one or more inputs deleted" % job.id )
- elif job_state == JOB_READY:
- if self.job_lock:
- log.info( "Job dispatch attempted for %s, but prevented by administrative lock." % job.id )
- if not self.track_jobs_in_database:
- new_waiting_jobs.append( job.id )
- else:
- self.dispatcher.put( JobWrapper( job, self ) )
- log.info( "job %d dispatched" % job.id )
- elif job_state == JOB_DELETED:
- log.info( "job %d deleted by user while still queued" % job.id )
- elif job_state == JOB_ADMIN_DELETED:
- log.info( "job %d deleted by admin while still queued" % job.id )
- else:
- log.error( "unknown job state '%s' for job %d" % ( job_state, job.id ) )
- if not self.track_jobs_in_database:
- new_waiting_jobs.append( job.id )
- except Exception:
- log.exception( "failure running job %d" % job.id )
- # Update the waiting list
- self.waiting_jobs = new_waiting_jobs
- # Done with the session
- self.sa_session.remove()
-
- def __check_if_ready_to_run( self, job ):
- """
- Check if a job is ready to run by verifying that each of its input
- datasets is ready (specifically in the OK state). If any input dataset
- has an error, fail the job and return JOB_INPUT_ERROR. If any input
- dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all
- input datasets are in OK state, return JOB_READY indicating that the
- job can be dispatched. Otherwise, return JOB_WAIT indicating that input
- datasets are still being prepared.
- """
- if job.state == model.Job.states.DELETED:
- return JOB_DELETED
- elif job.state == model.Job.states.ERROR:
- return JOB_ADMIN_DELETED
- elif self.app.config.enable_quotas:
- quota = self.app.quota_agent.get_quota( job.user )
- if quota is not None:
- try:
- usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
- if usage > quota:
- return JOB_WAIT
- except AssertionError, e:
- pass # No history, should not happen with an anon user
- for dataset_assoc in job.input_datasets + job.input_library_datasets:
- idata = dataset_assoc.dataset
- if not idata:
- continue
- # don't run jobs for which the input dataset was deleted
- if idata.deleted:
- JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
- return JOB_INPUT_DELETED
- # an error in the input data causes us to bail immediately
- elif idata.state == idata.states.ERROR:
- JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state == idata.states.FAILED_METADATA:
- JobWrapper( job, self ).fail( "input data %d failed to properly set metadata" % ( idata.hid ) )
- return JOB_INPUT_ERROR
- elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
- # need to requeue
- return JOB_WAIT
- return self.__check_user_jobs( job )
-
- def __check_user_jobs( self, job ):
- if not self.app.config.user_job_limit:
- return JOB_READY
- if job.user:
- count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( and_( model.Job.user_id == job.user.id,
- or_( model.Job.state == model.Job.states.RUNNING,
- model.Job.state == model.Job.states.QUEUED ) ) ).count()
- elif job.galaxy_session:
- count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( and_( model.Job.session_id == job.galaxy_session.id,
- or_( model.Job.state == model.Job.states.RUNNING,
- model.Job.state == model.Job.states.QUEUED ) ) ).count()
- else:
- log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' % job.id )
- return JOB_READY
- if count >= self.app.config.user_job_limit:
- return JOB_WAIT
- return JOB_READY
-
- def put( self, job_id, tool ):
- """Add a job to the queue (by job identifier)"""
- if not self.track_jobs_in_database:
- self.queue.put( ( job_id, tool.id ) )
- self.sleeper.wake()
-
- def shutdown( self ):
- """Attempts to gracefully shut down the worker thread"""
- if self.parent_pid != os.getpid():
- # We're not the real job queue, do nothing
- return
- else:
- log.info( "sending stop signal to worker thread" )
- self.running = False
- if not self.track_jobs_in_database:
- self.queue.put( self.STOP_SIGNAL )
- self.sleeper.wake()
- log.info( "job queue stopped" )
- self.dispatcher.shutdown()
-
class JobWrapper( object ):
"""
Wraps a 'model.Job' with convenience methods for running processes and
@@ -1177,179 +923,13 @@
# There is no metadata setting for tasks. This is handled after the merge, at the job level.
return ""
-class DefaultJobDispatcher( object ):
- def __init__( self, app ):
- self.app = app
- self.job_runners = {}
- start_job_runners = ["local"]
- if app.config.start_job_runners is not None:
- start_job_runners.extend( [ x.strip() for x in util.listify( app.config.start_job_runners ) ] )
- if app.config.use_tasked_jobs:
- start_job_runners.append("tasks")
- for name in start_job_runners:
- self._load_plugin( name )
-
- def _load_plugin( self, name ):
- module_name = 'galaxy.jobs.runners.' + name
- try:
- module = __import__( module_name )
- except:
- log.exception( 'Job runner is not loadable: %s' % module_name )
- return
- for comp in module_name.split( "." )[1:]:
- module = getattr( module, comp )
- if '__all__' not in dir( module ):
- log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name )
- return
- for obj in module.__all__:
- display_name = ':'.join( ( module_name, obj ) )
- runner = getattr( module, obj )
- self.job_runners[name] = runner( self.app )
- log.debug( 'Loaded job runner: %s' % display_name )
-
- def __get_runner_name( self, job_wrapper ):
- if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper):
- runner_name = "tasks"
- else:
- runner_name = ( job_wrapper.get_job_runner().split(":", 1) )[0]
- return runner_name
-
- def put( self, job_wrapper ):
- try:
- runner_name = self.__get_runner_name( job_wrapper )
- if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and isinstance(job_wrapper, TaskWrapper):
- #DBTODO Refactor
- log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) )
- else:
- log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
- self.job_runners[runner_name].put( job_wrapper )
- except KeyError:
- log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
- job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' )
-
- def stop( self, job ):
- runner_name = ( job.job_runner_name.split(":", 1) )[0]
- log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) )
- try:
- self.job_runners[runner_name].stop_job( job )
- except KeyError:
- log.error( 'stop(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
- # Job and output dataset states have already been updated, so nothing is done here.
-
- def recover( self, job, job_wrapper ):
- runner_name = ( job.job_runner_name.split(":", 1) )[0]
- log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) )
- try:
- self.job_runners[runner_name].recover( job, job_wrapper )
- except KeyError:
- log.error( 'recover(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
- job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' )
-
- def shutdown( self ):
- for runner in self.job_runners.itervalues():
- runner.shutdown()
-
-class JobStopQueue( object ):
- """
- A queue for jobs which need to be terminated prematurely.
- """
- STOP_SIGNAL = object()
- def __init__( self, app, dispatcher ):
- self.app = app
- self.sa_session = app.model.context
- self.dispatcher = dispatcher
-
- self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False )
-
- # Keep track of the pid that started the job manager, only it
- # has valid threads
- self.parent_pid = os.getpid()
- # Contains new jobs. Note this is not used if track_jobs_in_database is True
- self.queue = Queue()
-
- # Contains jobs that are waiting (only use from monitor thread)
- self.waiting = []
-
- # Helper for interruptable sleep
- self.sleeper = Sleeper()
- self.running = True
- self.monitor_thread = threading.Thread( target=self.monitor )
- self.monitor_thread.start()
- log.info( "job stopper started" )
-
- def monitor( self ):
- """
- Continually iterate the waiting jobs, stop any that are found.
- """
- # HACK: Delay until after forking, we need a way to do post fork notification!!!
- time.sleep( 10 )
- while self.running:
- try:
- self.monitor_step()
- except:
- log.exception( "Exception in monitor_step" )
- # Sleep
- self.sleeper.sleep( 1 )
-
- def monitor_step( self ):
- """
- Called repeatedly by `monitor` to stop jobs.
- """
- # Pull all new jobs from the queue at once
- jobs_to_check = []
- if self.track_jobs_in_database:
- # Clear the session so we get fresh states for job and all datasets
- self.sa_session.expunge_all()
- # Fetch all new jobs
- newly_deleted_jobs = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( model.Job.state == model.Job.states.DELETED_NEW ).all()
- for job in newly_deleted_jobs:
- jobs_to_check.append( ( job, None ) )
- # Also pull from the queue (in the case of Administrative stopped jobs)
- try:
- while 1:
- message = self.queue.get_nowait()
- if message is self.STOP_SIGNAL:
- return
- # Unpack the message
- job_id, error_msg = message
- # Get the job object and append to watch queue
- jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) )
- except Empty:
- pass
- for job, error_msg in jobs_to_check:
- if error_msg is not None:
- job.state = job.states.ERROR
- job.info = error_msg
- else:
- job.state = job.states.DELETED
- self.sa_session.add( job )
- self.sa_session.flush()
- if job.job_runner_name is not None:
- # tell the dispatcher to stop the job
- self.dispatcher.stop( job )
-
- def put( self, job_id, error_msg=None ):
- self.queue.put( ( job_id, error_msg ) )
-
- def shutdown( self ):
- """Attempts to gracefully shut down the worker thread"""
- if self.parent_pid != os.getpid():
- # We're not the real job queue, do nothing
- return
- else:
- log.info( "sending stop signal to worker thread" )
- self.running = False
- if not self.track_jobs_in_database:
- self.queue.put( self.STOP_SIGNAL )
- self.sleeper.wake()
- log.info( "job stopper stopped" )
-
class NoopQueue( object ):
"""
Implements the JobQueue / JobStopQueue interface but does nothing
"""
def put( self, *args ):
return
+ def put_stop( self, *args ):
+ return
def shutdown( self ):
return
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/jobs/handler.py
--- /dev/null
+++ b/lib/galaxy/jobs/handler.py
@@ -0,0 +1,430 @@
+"""
+Galaxy job handler, prepares, runs, tracks, and finishes Galaxy jobs
+"""
+
+import os
+import time
+import logging
+import threading
+from Queue import Queue, Empty
+
+from sqlalchemy.sql.expression import and_, or_
+
+from galaxy import util, model
+from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper
+
+log = logging.getLogger( __name__ )
+
+# States for running a job. These are NOT the same as data states
+JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted'
+
+class JobHandler( object ):
+ """
+ Handle the preparation, running, tracking, and finishing of jobs
+ """
+ def __init__( self, app ):
+ self.app = app
+ # The dispatcher launches the underlying job runners
+ self.dispatcher = DefaultJobDispatcher( app )
+ # Queues for starting and stopping jobs
+ self.job_queue = JobHandlerQueue( app, self.dispatcher )
+ self.job_stop_queue = JobHandlerStopQueue( app, self.dispatcher )
+ def start( self ):
+ self.job_queue.start()
+ def shutdown( self ):
+ self.job_queue.shutdown()
+ self.job_stop_queue.shutdown()
+
+class JobHandlerQueue( object ):
+ """
+ Job manager, waits for jobs to be runnable and then dispatches to
+ a JobRunner.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app, dispatcher ):
+ """Start the job manager"""
+ self.app = app
+ self.dispatcher = dispatcher
+
+ self.sa_session = app.model.context
+ self.track_jobs_in_database = self.app.config.track_jobs_in_database
+
+ # Keep track of the pid that started the job manager, only it
+ # has valid threads
+ self.parent_pid = os.getpid()
+ # Contains new jobs. Note this is not used if track_jobs_in_database is True
+ self.queue = Queue()
+ # Contains jobs that are waiting (only use from monitor thread)
+ ## This and jobs_to_check[] are closest to a "Job Queue"
+ self.waiting_jobs = []
+ # Helper for interruptable sleep
+ self.sleeper = Sleeper()
+ self.running = True
+ self.monitor_thread = threading.Thread( target=self.__monitor )
+
+ def start( self ):
+ """
+ Recover this handler's jobs from the database, then start the monitor thread.
+ """
+ # Recover jobs at startup
+ self.__check_jobs_at_startup()
+ # Start the queue
+ self.monitor_thread.start()
+ log.info( "job handler queue started" )
+
+ def __check_jobs_at_startup( self ):
+ """
+ Checks all jobs that are in the 'new', 'queued' or 'running' state in
+ the database and requeues or cleans up as necessary. Only run as the
+ job manager starts.
+ """
+ for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( ( ( model.Job.state == model.Job.states.NEW ) \
+ | ( model.Job.state == model.Job.states.RUNNING ) \
+ | ( model.Job.state == model.Job.states.QUEUED ) ) \
+ & ( model.Job.handler == self.app.config.server_name ) ):
+ if job.tool_id not in self.app.toolbox.tools_by_id:
+ log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) )
+ JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )
+ elif job.job_runner_name is None:
+ log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) )
+ if self.track_jobs_in_database:
+ job.state = model.Job.states.NEW
+ else:
+ self.queue.put( ( job.id, job.tool_id ) )
+ else:
+ job_wrapper = JobWrapper( job, self )
+ self.dispatcher.recover( job, job_wrapper )
+ if self.sa_session.dirty:
+ self.sa_session.flush()
+
+ def __monitor( self ):
+ """
+ Continually iterate the waiting jobs, checking if each is ready to
+ run and dispatching if so.
+ """
+ while self.running:
+ try:
+ self.__monitor_step()
+ except:
+ log.exception( "Exception in monitor_step" )
+ # Sleep
+ self.sleeper.sleep( 1 )
+
+ def __monitor_step( self ):
+ """
+ Called repeatedly by `monitor` to process waiting jobs. Gets any new
+ jobs (either from the database or from its own queue), then iterates
+ over all new and waiting jobs to check the state of the jobs each
+ depends on. If the job has dependencies that have not finished, it
+ goes to the waiting queue. If the job has dependencies with errors,
+ it is marked as having errors and removed from the queue. Otherwise,
+ the job is dispatched.
+ """
+ # Pull all new jobs from the queue at once
+ jobs_to_check = []
+ if self.track_jobs_in_database:
+ # Clear the session so we get fresh states for job and all datasets
+ self.sa_session.expunge_all()
+ # Fetch all new jobs
+ jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( ( model.Job.state == model.Job.states.NEW ) \
+ & ( model.Job.handler == self.app.config.server_name ) ).all()
+ else:
+ # Get job objects and append to watch queue for any which were
+ # previously waiting
+ for job_id in self.waiting_jobs:
+ jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
+ try:
+ while 1:
+ message = self.queue.get_nowait()
+ if message is self.STOP_SIGNAL:
+ return
+ # Unpack the message
+ job_id, tool_id = message
+ # Get the job object and append to watch queue
+ jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
+ except Empty:
+ pass
+ # Iterate over new and waiting jobs and look for any that are
+ # ready to run
+ new_waiting_jobs = []
+ for job in jobs_to_check:
+ try:
+ # Check the job's dependencies, requeue if they're not done
+ job_state = self.__check_if_ready_to_run( job )
+ if job_state == JOB_WAIT:
+ if not self.track_jobs_in_database:
+ new_waiting_jobs.append( job.id )
+ elif job_state == JOB_INPUT_ERROR:
+ log.info( "(%d) Job unable to run: one or more inputs in error state" % job.id )
+ elif job_state == JOB_INPUT_DELETED:
+ log.info( "(%d) Job unable to run: one or more inputs deleted" % job.id )
+ elif job_state == JOB_READY:
+ self.dispatcher.put( JobWrapper( job, self ) )
+ log.info( "(%d) Job dispatched" % job.id )
+ elif job_state == JOB_DELETED:
+ log.info( "(%d) Job deleted by user while still queued" % job.id )
+ elif job_state == JOB_ADMIN_DELETED:
+ log.info( "(%d) Job deleted by admin while still queued" % job.id )
+ else:
+ log.error( "(%d) Job in unknown state '%s'" % ( job.id, job_state ) )
+ if not self.track_jobs_in_database:
+ new_waiting_jobs.append( job.id )
+ except Exception:
+ log.exception( "failure running job %d" % job.id )
+ # Update the waiting list
+ self.waiting_jobs = new_waiting_jobs
+ # Done with the session
+ self.sa_session.remove()
+
+ def __check_if_ready_to_run( self, job ):
+ """
+ Check if a job is ready to run by verifying that each of its input
+ datasets is ready (specifically in the OK state). If any input dataset
+ has an error, fail the job and return JOB_INPUT_ERROR. If any input
+ dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all
+ input datasets are in OK state, return JOB_READY indicating that the
+ job can be dispatched. Otherwise, return JOB_WAIT indicating that input
+ datasets are still being prepared.
+ """
+ if job.state == model.Job.states.DELETED:
+ return JOB_DELETED
+ elif job.state == model.Job.states.ERROR:
+ return JOB_ADMIN_DELETED
+ elif self.app.config.enable_quotas:
+ quota = self.app.quota_agent.get_quota( job.user )
+ if quota is not None:
+ try:
+ usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
+ if usage > quota:
+ return JOB_WAIT
+ except AssertionError, e:
+ pass # No history, should not happen with an anon user
+ for dataset_assoc in job.input_datasets + job.input_library_datasets:
+ idata = dataset_assoc.dataset
+ if not idata:
+ continue
+ # don't run jobs for which the input dataset was deleted
+ if idata.deleted:
+ JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ return JOB_INPUT_DELETED
+ # an error in the input data causes us to bail immediately
+ elif idata.state == idata.states.ERROR:
+ JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state == idata.states.FAILED_METADATA:
+ JobWrapper( job, self ).fail( "input data %d failed to properly set metadata" % ( idata.hid ) )
+ return JOB_INPUT_ERROR
+ elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+ # need to requeue
+ return JOB_WAIT
+ return self.__check_user_jobs( job )
+
+ def __check_user_jobs( self, job ):
+ if not self.app.config.user_job_limit:
+ return JOB_READY
+ if job.user:
+ count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( and_( model.Job.user_id == job.user.id,
+ or_( model.Job.state == model.Job.states.RUNNING,
+ model.Job.state == model.Job.states.QUEUED ) ) ).count()
+ elif job.galaxy_session:
+ count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( and_( model.Job.session_id == job.galaxy_session.id,
+ or_( model.Job.state == model.Job.states.RUNNING,
+ model.Job.state == model.Job.states.QUEUED ) ) ).count()
+ else:
+ log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' % job.id )
+ return JOB_READY
+ if count >= self.app.config.user_job_limit:
+ return JOB_WAIT
+ return JOB_READY
+
+ def put( self, job_id, tool_id ):
+ """Add a job to the queue (by job identifier)"""
+ if not self.track_jobs_in_database:
+ self.queue.put( ( job_id, tool_id ) )
+ self.sleeper.wake()
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker thread"""
+ if self.parent_pid != os.getpid():
+ # We're not the real job queue, do nothing
+ return
+ else:
+ log.info( "sending stop signal to worker thread" )
+ self.running = False
+ if not self.track_jobs_in_database:
+ self.queue.put( self.STOP_SIGNAL )
+ self.sleeper.wake()
+ log.info( "job handler queue stopped" )
+ self.dispatcher.shutdown()
+
+class JobHandlerStopQueue( object ):
+ """
+ A queue for jobs which need to be terminated prematurely.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app, dispatcher ):
+ self.app = app
+ self.dispatcher = dispatcher
+
+ self.sa_session = app.model.context
+
+ # Keep track of the pid that started the job manager, only it
+ # has valid threads
+ self.parent_pid = os.getpid()
+ # Contains stop requests as ( job_id, error_msg ) tuples. Unlike the job queue, this is polled even if track_jobs_in_database is True
+ self.queue = Queue()
+
+ # Contains jobs that are waiting (only use from monitor thread)
+ self.waiting = []
+
+ # Helper for interruptable sleep
+ self.sleeper = Sleeper()
+ self.running = True
+ self.monitor_thread = threading.Thread( target=self.monitor )
+ self.monitor_thread.start()
+ log.info( "job handler stop queue started" )
+
+ def monitor( self ):
+ """
+ Continually iterate the waiting jobs, stop any that are found.
+ """
+ # HACK: Delay until after forking, we need a way to do post fork notification!!!
+ time.sleep( 10 )
+ while self.running:
+ try:
+ self.monitor_step()
+ except:
+ log.exception( "Exception in monitor_step" )
+ # Sleep
+ self.sleeper.sleep( 1 )
+
+ def monitor_step( self ):
+ """
+ Called repeatedly by `monitor` to stop jobs.
+ """
+ # Pull all new jobs from the queue at once
+ jobs_to_check = []
+ if self.app.config.track_jobs_in_database:
+ # Clear the session so we get fresh states for job and all datasets
+ self.sa_session.expunge_all()
+ # Fetch all new jobs
+ newly_deleted_jobs = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( ( model.Job.state == model.Job.states.DELETED_NEW ) \
+ & ( model.Job.handler == self.app.config.server_name ) ).all()
+ for job in newly_deleted_jobs:
+ jobs_to_check.append( ( job, None ) )
+ # Also pull from the queue (in the case of Administrative stopped jobs)
+ try:
+ while 1:
+ message = self.queue.get_nowait()
+ if message is self.STOP_SIGNAL:
+ return
+ # Unpack the message
+ job_id, error_msg = message
+ # Get the job object and append to watch queue
+ jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) )
+ except Empty:
+ pass
+ for job, error_msg in jobs_to_check:
+ if error_msg is not None:
+ job.state = job.states.ERROR
+ job.info = error_msg
+ else:
+ job.state = job.states.DELETED
+ self.sa_session.add( job )
+ self.sa_session.flush()
+ if job.job_runner_name is not None:
+ # tell the dispatcher to stop the job
+ self.dispatcher.stop( job )
+
+ def put( self, job_id, error_msg=None ):
+ self.queue.put( ( job_id, error_msg ) )
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker thread"""
+ if self.parent_pid != os.getpid():
+ # We're not the real job queue, do nothing
+ return
+ else:
+ log.info( "sending stop signal to worker thread" )
+ self.running = False
+ if not self.track_jobs_in_database:
+ self.queue.put( self.STOP_SIGNAL )
+ self.sleeper.wake()
+ log.info( "job handler stop queue stopped" )
+
+class DefaultJobDispatcher( object ):
+ def __init__( self, app ):
+ self.app = app
+ self.job_runners = {}
+ start_job_runners = ["local"]
+ if app.config.start_job_runners is not None:
+ start_job_runners.extend( [ x.strip() for x in util.listify( app.config.start_job_runners ) ] )
+ if app.config.use_tasked_jobs:
+ start_job_runners.append("tasks")
+ for name in start_job_runners:
+ self._load_plugin( name )
+
+ def _load_plugin( self, name ):
+ module_name = 'galaxy.jobs.runners.' + name
+ try:
+ module = __import__( module_name )
+ except:
+ log.exception( 'Job runner is not loadable: %s' % module_name )
+ return
+ for comp in module_name.split( "." )[1:]:
+ module = getattr( module, comp )
+ if '__all__' not in dir( module ):
+ log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name )
+ return
+ for obj in module.__all__:
+ display_name = ':'.join( ( module_name, obj ) )
+ runner = getattr( module, obj )
+ self.job_runners[name] = runner( self.app )
+ log.debug( 'Loaded job runner: %s' % display_name )
+
+ def __get_runner_name( self, job_wrapper ):
+ if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper):
+ runner_name = "tasks"
+ else:
+ runner_name = ( job_wrapper.get_job_runner().split(":", 1) )[0]
+ return runner_name
+
+ def put( self, job_wrapper ):
+ try:
+ runner_name = self.__get_runner_name( job_wrapper )
+ if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and isinstance(job_wrapper, TaskWrapper):
+ #DBTODO Refactor
+ log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) )
+ else:
+ log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
+ self.job_runners[runner_name].put( job_wrapper )
+ except KeyError:
+ log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
+ job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' )
+
+ def stop( self, job ):
+ runner_name = ( job.job_runner_name.split(":", 1) )[0]
+ log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) )
+ try:
+ self.job_runners[runner_name].stop_job( job )
+ except KeyError:
+ log.error( 'stop(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
+ # Job and output dataset states have already been updated, so nothing is done here.
+
+ def recover( self, job, job_wrapper ):
+ runner_name = ( job.job_runner_name.split(":", 1) )[0]
+ log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) )
+ try:
+ self.job_runners[runner_name].recover( job, job_wrapper )
+ except KeyError:
+ log.error( 'recover(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
+ job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' )
+
+ def shutdown( self ):
+ for runner in self.job_runners.itervalues():
+ runner.shutdown()
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/jobs/manager.py
--- /dev/null
+++ b/lib/galaxy/jobs/manager.py
@@ -0,0 +1,270 @@
+"""
+Top-level Galaxy job manager, moves jobs to handler(s)
+"""
+
+import os
+import time
+import random
+import logging
+import threading
+from Queue import Queue, Empty
+
+from sqlalchemy.sql.expression import and_, or_
+
+from galaxy import model
+from galaxy.jobs import handler, Sleeper, NoopQueue
+
+log = logging.getLogger( __name__ )
+
+class JobManager( object ):
+ """
+ Highest level interface to job management.
+
+ TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly.
+ This should be decoupled.
+ """
+ def __init__( self, app ):
+ self.app = app
+ self.job_handler = NoopHandler()
+ if self.app.config.server_name in self.app.config.job_handlers:
+ self.job_handler = handler.JobHandler( app )
+ if self.app.config.server_name == self.app.config.job_manager:
+ job_handler = NoopHandler()
+ # In the case that webapp == manager == handler, pass jobs in memory
+ if not self.app.config.track_jobs_in_database:
+ job_handler = self.job_handler
+ # Otherwise, even if the manager == one of the handlers, its handler will pick up jobs from the database
+ self.job_queue = JobManagerQueue( app, job_handler )
+ self.job_stop_queue = JobManagerStopQueue( app, job_handler )
+ if self.app.config.enable_beta_job_managers:
+ from galaxy.jobs.deferred import DeferredJobQueue
+ self.deferred_job_queue = DeferredJobQueue( app )
+ else:
+ self.job_queue = self.job_stop_queue = NoopQueue()
+ self.job_handler.start()
+ def shutdown( self ):
+ self.job_queue.shutdown()
+ self.job_stop_queue.shutdown()
+ self.job_handler.shutdown()
+
+class JobManagerQueue( object ):
+ """
+ Job manager, waits for jobs to be runnable and then dispatches to a
+ JobHandler.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app, job_handler ):
+ self.app = app
+ self.job_handler = job_handler # the (singular) handler if we are passing jobs in memory
+
+ self.sa_session = app.model.context
+ self.job_lock = False
+ # Keep track of the pid that started the job manager, only it
+ # has valid threads
+ self.parent_pid = os.getpid()
+ # Contains new jobs. Note this is not used if track_jobs_in_database is True
+ self.queue = Queue()
+ # Helper for interruptable sleep
+ self.sleeper = Sleeper()
+ self.running = True
+ self.monitor_thread = threading.Thread( target=self.__monitor )
+ # Recover jobs at startup
+ self.__check_jobs_at_startup()
+ # Start the queue
+ self.monitor_thread.start()
+ log.info( "job manager queue started" )
+
+ def __check_jobs_at_startup( self ):
+ """
+ Checks all jobs that are in the 'new', 'queued' or 'running' state in
+ the database and requeues or cleans up as necessary. Only run as the
+ job manager starts.
+ """
+ for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( ( ( model.Job.state == model.Job.states.NEW ) \
+ | ( model.Job.state == model.Job.states.RUNNING ) \
+ | ( model.Job.state == model.Job.states.QUEUED ) ) \
+ & ( model.Job.handler == None ) ):
+ if job.tool_id not in self.app.toolbox.tools_by_id:
+ log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) )
+ JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )
+ else:
+ job.handler = self.__select_handler( job ) # handler's recovery method will take it from here
+ log.info( "(%d) Job in '%s' state had no handler at job manager startup, assigned '%s' handler" % ( job.id, job.state, job.handler ) )
+ if self.sa_session.dirty:
+ self.sa_session.flush()
+
+ def __monitor( self ):
+ """
+ Continually iterate the waiting jobs and dispatch to a handler
+ """
+ # HACK: Delay until after forking, we need a way to do post fork notification!!!
+ time.sleep( 10 )
+ while self.running:
+ try:
+ self.__monitor_step()
+ except:
+ log.exception( "Exception in monitor_step" )
+ # Sleep
+ self.sleeper.sleep( 1 )
+
+ def __monitor_step( self ):
+ """
+ Called repeatedly by `monitor` to process waiting jobs. Gets any new
+ jobs (either from the database or from its own queue), then assigns a
+ handler.
+ """
+ # Do nothing if the queue is locked
+ if self.job_lock:
+ log.info( 'Job queue is administratively locked, sleeping...' )
+ time.sleep( 10 )
+ return
+ # Pull all new jobs from the queue at once
+ jobs_to_check = []
+ if self.app.config.track_jobs_in_database:
+ # Clear the session so we get fresh states for job and all datasets
+ self.sa_session.expunge_all()
+ # Fetch all new jobs
+ jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+ .filter( ( model.Job.state == model.Job.states.NEW ) \
+ & ( model.Job.handler == None ) ).all()
+ else:
+ # Not tracking in the database: drain the in-memory queue and look up
+ # the job object for each queued job id
+ try:
+ while 1:
+ message = self.queue.get_nowait()
+ if message is self.STOP_SIGNAL:
+ return
+ # Unpack the message
+ job_id, tool_id = message
+ # Get the job object and append to watch queue
+ jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
+ except Empty:
+ pass
+
+ for job in jobs_to_check:
+ job.handler = self.__select_handler( job )
+ log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) )
+ self.sa_session.add( job )
+
+ # If tracking in the database, handlers will pick up the job now
+ self.sa_session.flush()
+
+ time.sleep( 5 )
+
+ # This only does something in the case that there is only one handler and it is this Galaxy process
+ for job in jobs_to_check:
+ self.job_handler.job_queue.put( job.id, job.tool_id )
+
+ def __select_handler( self, job ):
+ # TODO: handler selection based on params, tool, etc.
+ return random.choice( self.app.config.job_handlers )
+
+ def put( self, job_id, tool ):
+ """Add a job to the queue (by job identifier)"""
+ if not self.app.config.track_jobs_in_database:
+ self.queue.put( ( job_id, tool.id ) )
+ self.sleeper.wake()
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker thread"""
+ if self.parent_pid != os.getpid():
+ # We're not the real job queue, do nothing
+ return
+ else:
+ log.info( "sending stop signal to worker thread" )
+ self.running = False
+ if not self.app.config.track_jobs_in_database:
+ self.queue.put( self.STOP_SIGNAL )
+ self.sleeper.wake()
+ log.info( "job manager queue stopped" )
+ self.dispatcher.shutdown()
+
+class JobManagerStopQueue( object ):
+ """
+ A queue for jobs which need to be terminated prematurely.
+ """
+ STOP_SIGNAL = object()
+ def __init__( self, app, job_handler ):
+ self.app = app
+ self.job_handler = job_handler
+
+ self.sa_session = app.model.context
+
+ # Keep track of the pid that started the job manager, only it
+ # has valid threads
+ self.parent_pid = os.getpid()
+ # Contains stop requests as ( job_id, error_msg ) tuples. Unlike the job queue, this is polled even if track_jobs_in_database is True
+ self.queue = Queue()
+
+ # Contains jobs that are waiting (only use from monitor thread)
+ self.waiting = []
+
+ # Helper for interruptable sleep
+ self.sleeper = Sleeper()
+ self.running = True
+ self.monitor_thread = threading.Thread( target=self.monitor )
+ self.monitor_thread.start()
+ log.info( "job manager stop queue started" )
+
+ def monitor( self ):
+ """
+ Continually iterate the waiting jobs, stop any that are found.
+ """
+ # HACK: Delay until after forking, we need a way to do post fork notification!!!
+ time.sleep( 10 )
+ while self.running:
+ try:
+ self.monitor_step()
+ except:
+ log.exception( "Exception in monitor_step" )
+ # Sleep
+ self.sleeper.sleep( 1 )
+
+ def monitor_step( self ):
+ """
+ Called repeatedly by `monitor` to stop jobs.
+ """
+ jobs_to_check = []
+ # Pull from the queue even if tracking in the database (in the case of Administrative stopped jobs)
+ try:
+ while 1:
+ message = self.queue.get_nowait()
+ if message is self.STOP_SIGNAL:
+ return
+ # Unpack the message
+ job_id, error_msg = message
+ # Get the job object and append to watch queue
+ jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) )
+ except Empty:
+ pass
+
+ # If tracking in the database, the handler will pick up the stop itself. Otherwise, notify the handler.
+ for job, error_msg in jobs_to_check:
+ self.job_handler.job_stop_queue.put( job.id, error_msg )
+
+ def put( self, job_id, error_msg=None ):
+ self.queue.put( ( job_id, error_msg ) )
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker thread"""
+ if self.parent_pid != os.getpid():
+ # We're not the real job queue, do nothing
+ return
+ else:
+ log.info( "sending stop signal to worker thread" )
+ self.running = False
+ if not self.app.config.track_jobs_in_database:
+ self.queue.put( self.STOP_SIGNAL )
+ self.sleeper.wake()
+ log.info( "job manager stop queue stopped" )
+
+class NoopHandler( object ):
+ def __init__( self, *args, **kwargs ):
+ self.job_queue = NoopQueue()
+ self.job_stop_queue = NoopQueue()
+ def start( self ):
+ pass
+ def shutdown( self, *args ):
+ pass
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -132,6 +132,7 @@
self.job_runner_external_id = None
self.post_job_actions = []
self.imported = False
+ self.handler = None
def add_parameter( self, name, value ):
self.parameters.append( JobParameter( name, value ) )
@@ -171,14 +172,11 @@
if not dataset.deleted:
return False
return True
- def mark_deleted( self, enable_job_running=True, track_jobs_in_database=False ):
+ def mark_deleted( self, track_jobs_in_database=False ):
"""
Mark this job as deleted, and mark any output datasets as discarded.
"""
- # This could be handled with *just* track_jobs_in_database, but I
- # didn't want to make setting track_jobs_in_database required in
- # non-runner configs.
- if not enable_job_running or track_jobs_in_database:
+ if track_jobs_in_database:
self.state = Job.states.DELETED_NEW
else:
self.state = Job.states.DELETED
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/model/mapping.py
--- a/lib/galaxy/model/mapping.py
+++ b/lib/galaxy/model/mapping.py
@@ -421,7 +421,8 @@
Column( "job_runner_external_id", String( 255 ) ),
Column( "object_store_id", TrimmedString( 255 ), index=True ),
Column( "imported", Boolean, default=False, index=True ),
- Column( "params", TrimmedString(255), index=True ) )
+ Column( "params", TrimmedString(255), index=True ),
+ Column( "handler", TrimmedString( 255 ), index=True ) )
JobParameter.table = Table( "job_parameter", metadata,
Column( "id", Integer, primary_key=True ),
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/model/migrate/versions/0094_add_job_handler_col.py
--- /dev/null
+++ b/lib/galaxy/model/migrate/versions/0094_add_job_handler_col.py
@@ -0,0 +1,49 @@
+"""
+Migration script to create "handler" column in job table.
+"""
+
+from sqlalchemy import *
+from sqlalchemy.orm import *
+from migrate import *
+from migrate.changeset import *
+
+import logging
+log = logging.getLogger( __name__ )
+
+# Need our custom types, but don't import anything else from model
+from galaxy.model.custom_types import *
+
+metadata = MetaData( migrate_engine )
+db_session = scoped_session( sessionmaker( bind=migrate_engine, autoflush=False, autocommit=True ) )
+
+# Column to add.
+handler_col = Column( "handler", TrimmedString(255), index=True )
+
+def display_migration_details():
+ print ""
+ print "This migration script adds a 'handler' column to the Job table."
+
+def upgrade():
+ print __doc__
+ metadata.reflect()
+
+ # Add column to Job table.
+ try:
+ Job_table = Table( "job", metadata, autoload=True )
+ handler_col.create( Job_table )
+ assert handler_col is Job_table.c.handler
+
+ except Exception, e:
+ print str(e)
+ log.debug( "Adding column 'handler' to job table failed: %s" % str( e ) )
+
+def downgrade():
+ metadata.reflect()
+
+ # Drop column from Job table.
+ try:
+ Job_table = Table( "job", metadata, autoload=True )
+ handler_col = Job_table.c.handler
+ handler_col.drop()
+ except Exception, e:
+ log.debug( "Dropping column 'handler' from job table failed: %s" % ( str( e ) ) )
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/web/base/controller.py
--- a/lib/galaxy/web/base/controller.py
+++ b/lib/galaxy/web/base/controller.py
@@ -2381,8 +2381,8 @@
deleted = []
msg = None
status = None
- if not trans.app.config.get_bool( "enable_job_running", True ):
- return trans.show_error_message( 'This Galaxy instance is not configured to run jobs. If using multiple servers, please directly access the job running instance to manage jobs.' )
+ if not self.app.config.job_manager != self.app.config.server_name:
+ return trans.show_error_message( 'This Galaxy instance is not the job manager. If using multiple servers, please directly access the job manager instance to manage jobs.' )
job_ids = util.listify( stop )
if job_ids and stop_msg in [ None, '' ]:
msg = 'Please enter an error message to display to the user describing why the job was terminated'
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/web/controllers/dataset.py
--- a/lib/galaxy/web/controllers/dataset.py
+++ b/lib/galaxy/web/controllers/dataset.py
@@ -904,8 +904,7 @@
if job.state in [ self.app.model.Job.states.QUEUED, self.app.model.Job.states.RUNNING, self.app.model.Job.states.NEW ]:
# Are *all* of the job's other output datasets deleted?
if job.check_if_output_datasets_deleted():
- job.mark_deleted( self.app.config.get_bool( 'enable_job_running', True ),
- self.app.config.get_bool( 'track_jobs_in_database', False ) )
+ job.mark_deleted( self.app.config.track_jobs_in_database )
self.app.job_manager.job_stop_queue.put( job.id )
trans.sa_session.flush()
except Exception, e:
diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample
+++ b/universe_wsgi.ini.sample
@@ -542,14 +542,25 @@
# -- Job Execution
-# If running multiple Galaxy processes, one can be designated as the job
-# runner. For more information, see:
-# http://wiki.g2.bx.psu.edu/Admin/Config/Performance/Web%20Application%20Scal…
-#enable_job_running = True
+# To increase performance of job execution and the web interface, you can
+# separate Galaxy into multiple processes. There is more than one way to do
+# this, and each is explained in detail in the documentation:
+#
+# http://wiki.g2.bx.psu.edu/Admin/Config/Performance/Web%20Application%20Scal…
+#
+# By default, Galaxy manages and executes jobs from within a single process and
+# notifies itself of new jobs via in-memory queues. If you change job_manager
+# and job_handlers from their default values, notification will instead be done
+# using the `state` and `handler` columns of the job table in the database.
-# Should jobs be tracked through the database, rather than in memory.
-# Necessary if you're running the load balanced setup.
-#track_jobs_in_database = False
+# Identify the server_name (the string following server: at the top of this
+# file) which should be designated as the job manager (only one):
+#job_manager = main
+
+# Identify the server_name(s) which should be designated as job handlers
+# (responsible for starting, tracking, finishing, and cleaning up jobs) as a
+# comma-separated list.
+#job_handlers = main
# This enables splitting of jobs into tasks, if specified by the particular tool config.
# This is a new feature and not recommended for production servers yet.
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.