commit/galaxy-central: natefoo: Support multiple job runners by creating a job manager that designates a job handler to run jobs, thereby avoiding the "new job" race condition.
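The key mechanism in this changeset is that a single "job manager" process now stamps every new job with the name of a "job handler" before any process attempts to dispatch it; handlers then only pick up jobs whose `handler` column matches their own server_name, which is what removes the "new job" race between multiple Galaxy processes. Below is a minimal sketch of that assignment step, in the spirit of the new lib/galaxy/jobs/manager.py -- the session, Job class, and handler names passed in are placeholders for illustration, not the exact Galaxy objects:

    import random

    def assign_new_jobs_to_handlers(sa_session, Job, handler_names):
        """Stamp unassigned NEW jobs with a handler name -- roughly what
        JobManagerQueue.__monitor_step and __select_handler do in this commit."""
        new_jobs = sa_session.query(Job) \
            .filter((Job.state == Job.states.NEW) & (Job.handler == None)) \
            .all()
        for job in new_jobs:
            # The commit leaves a TODO for smarter selection (params, tool, etc.);
            # for now the manager simply picks a handler at random.
            job.handler = random.choice(handler_names)
            sa_session.add(job)
        # After the flush, only the handler whose server_name equals job.handler
        # will see the job, so no two processes dispatch the same new job.
        sa_session.flush()
        return new_jobs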
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/4f6c38ca3538/ changeset: 4f6c38ca3538 user: natefoo date: 2012-03-29 23:25:56 summary: Support multiple job runners by creating a job manager that designates a job handler to run jobs, thereby avoiding the "new job" race condition. affected #: 11 files diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/app.py --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -124,8 +124,9 @@ if self.config.get_bool( 'enable_beta_job_managers', False ): from jobs import transfer_manager self.transfer_manager = transfer_manager.TransferManager( self ) - # Start the job queue - self.job_manager = jobs.JobManager( self ) + # Start the job manager + from jobs import manager + self.job_manager = manager.JobManager( self ) # FIXME: These are exposed directly for backward compatibility self.job_queue = self.job_manager.job_queue self.job_stop_queue = self.job_manager.job_stop_queue diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -184,7 +184,20 @@ # Heartbeat log file name override if global_conf is not None: self.heartbeat_log = global_conf.get( 'heartbeat_log', 'heartbeat.log' ) - #Store per-tool runner configs. + # Determine which 'server:' this is + self.server_name = 'main' + for arg in sys.argv: + # Crummy, but PasteScript does not give you a way to determine this + if arg.lower().startswith('--server-name='): + self.server_name = arg.split('=', 1)[-1] + # Store advanced job management config + self.job_manager = kwargs.get('job_manager', self.server_name).strip() + self.job_handlers = [ x.strip() for x in kwargs.get('job_handlers', self.server_name).split(',') ] + # Use database for IPC unless this is a standalone server (or multiple servers doing self dispatching in memory) + self.track_jobs_in_database = True + if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ): + self.track_jobs_in_database = False + # Store per-tool runner configs try: tool_runners_config = global_conf_parser.items("galaxy:tool_runners") diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1,4 +1,17 @@ -import logging, threading, sys, os, time, traceback, shutil +""" +Support for running a tool in Galaxy via an internal job management system +""" + +import os +import sys +import pwd +import time +import logging +import threading +import traceback +import subprocess + +from sqlalchemy.sql.expression import and_, or_ import galaxy from galaxy import util, model @@ -9,51 +22,16 @@ from galaxy.util.json import from_json_string from galaxy.util.expressions import ExpressionContext from galaxy.jobs.actions.post import ActionBox -import subprocess, pwd from galaxy.exceptions import ObjectInvalid -from sqlalchemy.sql.expression import and_, or_ - -import pkg_resources -pkg_resources.require( "PasteDeploy" ) - -from Queue import Queue, Empty - log = logging.getLogger( __name__ ) -# States for running a job. 
These are NOT the same as data states -JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted' - # This file, if created in the job's working directory, will be used for # setting advanced metadata properties on the job and its associated outputs. # This interface is currently experimental, is only used by the upload tool, # and should eventually become API'd TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json' -class JobManager( object ): - """ - Highest level interface to job management. - - TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly. - This should be decoupled. - """ - def __init__( self, app ): - self.app = app - if self.app.config.get_bool( "enable_job_running", True ): - # The dispatcher launches the underlying job runners - self.dispatcher = DefaultJobDispatcher( app ) - # Queues for starting and stopping jobs - self.job_queue = JobQueue( app, self.dispatcher ) - self.job_stop_queue = JobStopQueue( app, self.dispatcher ) - if self.app.config.enable_beta_job_managers: - from galaxy.jobs.deferred import DeferredJobQueue - self.deferred_job_queue = DeferredJobQueue( app ) - else: - self.job_queue = self.job_stop_queue = NoopQueue() - def shutdown( self ): - self.job_queue.shutdown() - self.job_stop_queue.shutdown() - class Sleeper( object ): """ Provides a 'sleep' method that sleeps for a number of seconds *unless* @@ -70,238 +48,6 @@ self.condition.notify() self.condition.release() -class JobQueue( object ): - """ - Job manager, waits for jobs to be runnable and then dispatches to - a JobRunner. - """ - STOP_SIGNAL = object() - def __init__( self, app, dispatcher ): - """Start the job manager""" - self.app = app - self.sa_session = app.model.context - self.job_lock = False - # Should we read jobs form the database, or use an in memory queue - self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False ) - # Keep track of the pid that started the job manager, only it - # has valid threads - self.parent_pid = os.getpid() - # Contains new jobs. Note this is not used if track_jobs_in_database is True - self.queue = Queue() - # Contains jobs that are waiting (only use from monitor thread) - ## This and jobs_to_check[] are closest to a "Job Queue" - self.waiting_jobs = [] - # Helper for interruptable sleep - self.sleeper = Sleeper() - self.running = True - self.dispatcher = dispatcher - self.monitor_thread = threading.Thread( target=self.__monitor ) - # Recover jobs at startup - if app.config.get_bool( 'enable_job_recovery', True ): - self.__check_jobs_at_startup() - # Start the queue - self.monitor_thread.start() - log.info( "job manager started" ) - - def __check_jobs_at_startup( self ): - """ - Checks all jobs that are in the 'new', 'queued' or 'running' state in - the database and requeues or cleans up as necessary. Only run as the - job manager starts. - """ - model = self.app.model # DBTODO Why? - for job in self.sa_session.query( model.Job ).filter( model.Job.state == model.Job.states.NEW ): - if job.tool_id not in self.app.toolbox.tools_by_id: - log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) ) - JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. 
Please contact your Galaxy administrator, or' ) - else: - log.debug( "no runner: %s is still in new state, adding to the jobs queue" %job.id ) - self.queue.put( ( job.id, job.tool_id ) ) - for job in self.sa_session.query( model.Job ).enable_eagerloads( False ).filter( ( model.Job.state == model.Job.states.RUNNING ) | ( model.Job.state == model.Job.states.QUEUED ) ): - if job.tool_id not in self.app.toolbox.tools_by_id: - log.warning( "Tool '%s' removed from tool config, unable to recover job: %s" % ( job.tool_id, job.id ) ) - JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator, or' ) - elif job.job_runner_name is None: - log.debug( "no runner: %s is still in queued state, adding to the jobs queue" %job.id ) - if self.track_jobs_in_database: - job.state = model.Job.states.NEW - else: - self.queue.put( ( job.id, job.tool_id ) ) - else: - job_wrapper = JobWrapper( job, self ) - self.dispatcher.recover( job, job_wrapper ) - if self.sa_session.dirty: - self.sa_session.flush() - - def __monitor( self ): - """ - Continually iterate the waiting jobs, checking is each is ready to - run and dispatching if so. - """ - # HACK: Delay until after forking, we need a way to do post fork notification!!! - time.sleep( 10 ) - while self.running: - try: - self.__monitor_step() - except: - log.exception( "Exception in monitor_step" ) - # Sleep - self.sleeper.sleep( 1 ) - - def __monitor_step( self ): - """ - Called repeatedly by `monitor` to process waiting jobs. Gets any new - jobs (either from the database or from its own queue), then iterates - over all new and waiting jobs to check the state of the jobs each - depends on. If the job has dependencies that have not finished, it - it goes to the waiting queue. If the job has dependencies with errors, - it is marked as having errors and removed from the queue. Otherwise, - the job is dispatched. - """ - # Pull all new jobs from the queue at once - jobs_to_check = [] - if self.track_jobs_in_database: - # Clear the session so we get fresh states for job and all datasets - self.sa_session.expunge_all() - # Fetch all new jobs - jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ - .filter( model.Job.state == model.Job.states.NEW ).all() - else: - # Get job objects and append to watch queue for any which were - # previously waiting - for job_id in self.waiting_jobs: - jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) - try: - while 1: - message = self.queue.get_nowait() - if message is self.STOP_SIGNAL: - return - # Unpack the message - job_id, tool_id = message - # Get the job object and append to watch queue - jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) - except Empty: - pass - # Iterate over new and waiting jobs and look for any that are - # ready to run - new_waiting_jobs = [] - for job in jobs_to_check: - try: - # Check the job's dependencies, requeue if they're not done - job_state = self.__check_if_ready_to_run( job ) - if job_state == JOB_WAIT: - if not self.track_jobs_in_database: - new_waiting_jobs.append( job.id ) - elif job_state == JOB_INPUT_ERROR: - log.info( "job %d unable to run: one or more inputs in error state" % job.id ) - elif job_state == JOB_INPUT_DELETED: - log.info( "job %d unable to run: one or more inputs deleted" % job.id ) - elif job_state == JOB_READY: - if self.job_lock: - log.info( "Job dispatch attempted for %s, but prevented by administrative lock." 
% job.id ) - if not self.track_jobs_in_database: - new_waiting_jobs.append( job.id ) - else: - self.dispatcher.put( JobWrapper( job, self ) ) - log.info( "job %d dispatched" % job.id ) - elif job_state == JOB_DELETED: - log.info( "job %d deleted by user while still queued" % job.id ) - elif job_state == JOB_ADMIN_DELETED: - log.info( "job %d deleted by admin while still queued" % job.id ) - else: - log.error( "unknown job state '%s' for job %d" % ( job_state, job.id ) ) - if not self.track_jobs_in_database: - new_waiting_jobs.append( job.id ) - except Exception: - log.exception( "failure running job %d" % job.id ) - # Update the waiting list - self.waiting_jobs = new_waiting_jobs - # Done with the session - self.sa_session.remove() - - def __check_if_ready_to_run( self, job ): - """ - Check if a job is ready to run by verifying that each of its input - datasets is ready (specifically in the OK state). If any input dataset - has an error, fail the job and return JOB_INPUT_ERROR. If any input - dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all - input datasets are in OK state, return JOB_READY indicating that the - job can be dispatched. Otherwise, return JOB_WAIT indicating that input - datasets are still being prepared. - """ - if job.state == model.Job.states.DELETED: - return JOB_DELETED - elif job.state == model.Job.states.ERROR: - return JOB_ADMIN_DELETED - elif self.app.config.enable_quotas: - quota = self.app.quota_agent.get_quota( job.user ) - if quota is not None: - try: - usage = self.app.quota_agent.get_usage( user=job.user, history=job.history ) - if usage > quota: - return JOB_WAIT - except AssertionError, e: - pass # No history, should not happen with an anon user - for dataset_assoc in job.input_datasets + job.input_library_datasets: - idata = dataset_assoc.dataset - if not idata: - continue - # don't run jobs for which the input dataset was deleted - if idata.deleted: - JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) - return JOB_INPUT_DELETED - # an error in the input data causes us to bail immediately - elif idata.state == idata.states.ERROR: - JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) ) - return JOB_INPUT_ERROR - elif idata.state == idata.states.FAILED_METADATA: - JobWrapper( job, self ).fail( "input data %d failed to properly set metadata" % ( idata.hid ) ) - return JOB_INPUT_ERROR - elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): - # need to requeue - return JOB_WAIT - return self.__check_user_jobs( job ) - - def __check_user_jobs( self, job ): - if not self.app.config.user_job_limit: - return JOB_READY - if job.user: - count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ - .filter( and_( model.Job.user_id == job.user.id, - or_( model.Job.state == model.Job.states.RUNNING, - model.Job.state == model.Job.states.QUEUED ) ) ).count() - elif job.galaxy_session: - count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ - .filter( and_( model.Job.session_id == job.galaxy_session.id, - or_( model.Job.state == model.Job.states.RUNNING, - model.Job.state == model.Job.states.QUEUED ) ) ).count() - else: - log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' 
% job.id ) - return JOB_READY - if count >= self.app.config.user_job_limit: - return JOB_WAIT - return JOB_READY - - def put( self, job_id, tool ): - """Add a job to the queue (by job identifier)""" - if not self.track_jobs_in_database: - self.queue.put( ( job_id, tool.id ) ) - self.sleeper.wake() - - def shutdown( self ): - """Attempts to gracefully shut down the worker thread""" - if self.parent_pid != os.getpid(): - # We're not the real job queue, do nothing - return - else: - log.info( "sending stop signal to worker thread" ) - self.running = False - if not self.track_jobs_in_database: - self.queue.put( self.STOP_SIGNAL ) - self.sleeper.wake() - log.info( "job queue stopped" ) - self.dispatcher.shutdown() - class JobWrapper( object ): """ Wraps a 'model.Job' with convenience methods for running processes and @@ -1177,179 +923,13 @@ # There is no metadata setting for tasks. This is handled after the merge, at the job level. return "" -class DefaultJobDispatcher( object ): - def __init__( self, app ): - self.app = app - self.job_runners = {} - start_job_runners = ["local"] - if app.config.start_job_runners is not None: - start_job_runners.extend( [ x.strip() for x in util.listify( app.config.start_job_runners ) ] ) - if app.config.use_tasked_jobs: - start_job_runners.append("tasks") - for name in start_job_runners: - self._load_plugin( name ) - - def _load_plugin( self, name ): - module_name = 'galaxy.jobs.runners.' + name - try: - module = __import__( module_name ) - except: - log.exception( 'Job runner is not loadable: %s' % module_name ) - return - for comp in module_name.split( "." )[1:]: - module = getattr( module, comp ) - if '__all__' not in dir( module ): - log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name ) - return - for obj in module.__all__: - display_name = ':'.join( ( module_name, obj ) ) - runner = getattr( module, obj ) - self.job_runners[name] = runner( self.app ) - log.debug( 'Loaded job runner: %s' % display_name ) - - def __get_runner_name( self, job_wrapper ): - if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper): - runner_name = "tasks" - else: - runner_name = ( job_wrapper.get_job_runner().split(":", 1) )[0] - return runner_name - - def put( self, job_wrapper ): - try: - runner_name = self.__get_runner_name( job_wrapper ) - if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and isinstance(job_wrapper, TaskWrapper): - #DBTODO Refactor - log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) ) - else: - log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) - self.job_runners[runner_name].put( job_wrapper ) - except KeyError: - log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) - job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' ) - - def stop( self, job ): - runner_name = ( job.job_runner_name.split(":", 1) )[0] - log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) ) - try: - self.job_runners[runner_name].stop_job( job ) - except KeyError: - log.error( 'stop(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) - # Job and output dataset states have already been updated, so nothing is done here. 
- - def recover( self, job, job_wrapper ): - runner_name = ( job.job_runner_name.split(":", 1) )[0] - log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) ) - try: - self.job_runners[runner_name].recover( job, job_wrapper ) - except KeyError: - log.error( 'recover(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) - job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' ) - - def shutdown( self ): - for runner in self.job_runners.itervalues(): - runner.shutdown() - -class JobStopQueue( object ): - """ - A queue for jobs which need to be terminated prematurely. - """ - STOP_SIGNAL = object() - def __init__( self, app, dispatcher ): - self.app = app - self.sa_session = app.model.context - self.dispatcher = dispatcher - - self.track_jobs_in_database = app.config.get_bool( 'track_jobs_in_database', False ) - - # Keep track of the pid that started the job manager, only it - # has valid threads - self.parent_pid = os.getpid() - # Contains new jobs. Note this is not used if track_jobs_in_database is True - self.queue = Queue() - - # Contains jobs that are waiting (only use from monitor thread) - self.waiting = [] - - # Helper for interruptable sleep - self.sleeper = Sleeper() - self.running = True - self.monitor_thread = threading.Thread( target=self.monitor ) - self.monitor_thread.start() - log.info( "job stopper started" ) - - def monitor( self ): - """ - Continually iterate the waiting jobs, stop any that are found. - """ - # HACK: Delay until after forking, we need a way to do post fork notification!!! - time.sleep( 10 ) - while self.running: - try: - self.monitor_step() - except: - log.exception( "Exception in monitor_step" ) - # Sleep - self.sleeper.sleep( 1 ) - - def monitor_step( self ): - """ - Called repeatedly by `monitor` to stop jobs. 
- """ - # Pull all new jobs from the queue at once - jobs_to_check = [] - if self.track_jobs_in_database: - # Clear the session so we get fresh states for job and all datasets - self.sa_session.expunge_all() - # Fetch all new jobs - newly_deleted_jobs = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ - .filter( model.Job.state == model.Job.states.DELETED_NEW ).all() - for job in newly_deleted_jobs: - jobs_to_check.append( ( job, None ) ) - # Also pull from the queue (in the case of Administrative stopped jobs) - try: - while 1: - message = self.queue.get_nowait() - if message is self.STOP_SIGNAL: - return - # Unpack the message - job_id, error_msg = message - # Get the job object and append to watch queue - jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) ) - except Empty: - pass - for job, error_msg in jobs_to_check: - if error_msg is not None: - job.state = job.states.ERROR - job.info = error_msg - else: - job.state = job.states.DELETED - self.sa_session.add( job ) - self.sa_session.flush() - if job.job_runner_name is not None: - # tell the dispatcher to stop the job - self.dispatcher.stop( job ) - - def put( self, job_id, error_msg=None ): - self.queue.put( ( job_id, error_msg ) ) - - def shutdown( self ): - """Attempts to gracefully shut down the worker thread""" - if self.parent_pid != os.getpid(): - # We're not the real job queue, do nothing - return - else: - log.info( "sending stop signal to worker thread" ) - self.running = False - if not self.track_jobs_in_database: - self.queue.put( self.STOP_SIGNAL ) - self.sleeper.wake() - log.info( "job stopper stopped" ) - class NoopQueue( object ): """ Implements the JobQueue / JobStopQueue interface but does nothing """ def put( self, *args ): return + def put_stop( self, *args ): + return def shutdown( self ): return diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/jobs/handler.py --- /dev/null +++ b/lib/galaxy/jobs/handler.py @@ -0,0 +1,430 @@ +""" +Galaxy job handler, prepares, runs, tracks, and finishes Galaxy jobs +""" + +import os +import time +import logging +import threading +from Queue import Queue, Empty + +from sqlalchemy.sql.expression import and_, or_ + +from galaxy import util, model +from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper + +log = logging.getLogger( __name__ ) + +# States for running a job. These are NOT the same as data states +JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted' + +class JobHandler( object ): + """ + Handle the preparation, running, tracking, and finishing of jobs + """ + def __init__( self, app ): + self.app = app + # The dispatcher launches the underlying job runners + self.dispatcher = DefaultJobDispatcher( app ) + # Queues for starting and stopping jobs + self.job_queue = JobHandlerQueue( app, self.dispatcher ) + self.job_stop_queue = JobHandlerStopQueue( app, self.dispatcher ) + def start( self ): + self.job_queue.start() + def shutdown( self ): + self.job_queue.shutdown() + self.job_stop_queue.shutdown() + +class JobHandlerQueue( object ): + """ + Job manager, waits for jobs to be runnable and then dispatches to + a JobRunner. 
+ """ + STOP_SIGNAL = object() + def __init__( self, app, dispatcher ): + """Start the job manager""" + self.app = app + self.dispatcher = dispatcher + + self.sa_session = app.model.context + self.track_jobs_in_database = self.app.config.track_jobs_in_database + + # Keep track of the pid that started the job manager, only it + # has valid threads + self.parent_pid = os.getpid() + # Contains new jobs. Note this is not used if track_jobs_in_database is True + self.queue = Queue() + # Contains jobs that are waiting (only use from monitor thread) + ## This and jobs_to_check[] are closest to a "Job Queue" + self.waiting_jobs = [] + # Helper for interruptable sleep + self.sleeper = Sleeper() + self.running = True + self.monitor_thread = threading.Thread( target=self.__monitor ) + + def start( self ): + """ + The JobManager should start, and then start its Handler, if it has one. + """ + # Recover jobs at startup + self.__check_jobs_at_startup() + # Start the queue + self.monitor_thread.start() + log.info( "job handler queue started" ) + + def __check_jobs_at_startup( self ): + """ + Checks all jobs that are in the 'new', 'queued' or 'running' state in + the database and requeues or cleans up as necessary. Only run as the + job manager starts. + """ + for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \ + .filter( ( ( model.Job.state == model.Job.states.NEW ) \ + | ( model.Job.state == model.Job.states.RUNNING ) \ + | ( model.Job.state == model.Job.states.QUEUED ) ) \ + & ( model.Job.handler == self.app.config.server_name ) ): + if job.tool_id not in self.app.toolbox.tools_by_id: + log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) ) + JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' ) + elif job.job_runner_name is None: + log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) ) + if self.track_jobs_in_database: + job.state = model.Job.states.NEW + else: + self.queue.put( ( job.id, job.tool_id ) ) + else: + job_wrapper = JobWrapper( job, self ) + self.dispatcher.recover( job, job_wrapper ) + if self.sa_session.dirty: + self.sa_session.flush() + + def __monitor( self ): + """ + Continually iterate the waiting jobs, checking is each is ready to + run and dispatching if so. + """ + while self.running: + try: + self.__monitor_step() + except: + log.exception( "Exception in monitor_step" ) + # Sleep + self.sleeper.sleep( 1 ) + + def __monitor_step( self ): + """ + Called repeatedly by `monitor` to process waiting jobs. Gets any new + jobs (either from the database or from its own queue), then iterates + over all new and waiting jobs to check the state of the jobs each + depends on. If the job has dependencies that have not finished, it + it goes to the waiting queue. If the job has dependencies with errors, + it is marked as having errors and removed from the queue. Otherwise, + the job is dispatched. 
+ """ + # Pull all new jobs from the queue at once + jobs_to_check = [] + if self.track_jobs_in_database: + # Clear the session so we get fresh states for job and all datasets + self.sa_session.expunge_all() + # Fetch all new jobs + jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ + .filter( ( model.Job.state == model.Job.states.NEW ) \ + & ( model.Job.handler == self.app.config.server_name ) ).all() + else: + # Get job objects and append to watch queue for any which were + # previously waiting + for job_id in self.waiting_jobs: + jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) + try: + while 1: + message = self.queue.get_nowait() + if message is self.STOP_SIGNAL: + return + # Unpack the message + job_id, tool_id = message + # Get the job object and append to watch queue + jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) + except Empty: + pass + # Iterate over new and waiting jobs and look for any that are + # ready to run + new_waiting_jobs = [] + for job in jobs_to_check: + try: + # Check the job's dependencies, requeue if they're not done + job_state = self.__check_if_ready_to_run( job ) + if job_state == JOB_WAIT: + if not self.track_jobs_in_database: + new_waiting_jobs.append( job.id ) + elif job_state == JOB_INPUT_ERROR: + log.info( "(%d) Job unable to run: one or more inputs in error state" % job.id ) + elif job_state == JOB_INPUT_DELETED: + log.info( "(%d) Job unable to run: one or more inputs deleted" % job.id ) + elif job_state == JOB_READY: + self.dispatcher.put( JobWrapper( job, self ) ) + log.info( "(%d) Job dispatched" % job.id ) + elif job_state == JOB_DELETED: + log.info( "(%d) Job deleted by user while still queued" % job.id ) + elif job_state == JOB_ADMIN_DELETED: + log.info( "(%d) Job deleted by admin while still queued" % job.id ) + else: + log.error( "(%d) Job in unknown state '%s'" % ( job.id, job_state ) ) + if not self.track_jobs_in_database: + new_waiting_jobs.append( job.id ) + except Exception: + log.exception( "failure running job %d" % job.id ) + # Update the waiting list + self.waiting_jobs = new_waiting_jobs + # Done with the session + self.sa_session.remove() + + def __check_if_ready_to_run( self, job ): + """ + Check if a job is ready to run by verifying that each of its input + datasets is ready (specifically in the OK state). If any input dataset + has an error, fail the job and return JOB_INPUT_ERROR. If any input + dataset is deleted, fail the job and return JOB_INPUT_DELETED. If all + input datasets are in OK state, return JOB_READY indicating that the + job can be dispatched. Otherwise, return JOB_WAIT indicating that input + datasets are still being prepared. 
+ """ + if job.state == model.Job.states.DELETED: + return JOB_DELETED + elif job.state == model.Job.states.ERROR: + return JOB_ADMIN_DELETED + elif self.app.config.enable_quotas: + quota = self.app.quota_agent.get_quota( job.user ) + if quota is not None: + try: + usage = self.app.quota_agent.get_usage( user=job.user, history=job.history ) + if usage > quota: + return JOB_WAIT + except AssertionError, e: + pass # No history, should not happen with an anon user + for dataset_assoc in job.input_datasets + job.input_library_datasets: + idata = dataset_assoc.dataset + if not idata: + continue + # don't run jobs for which the input dataset was deleted + if idata.deleted: + JobWrapper( job, self ).fail( "input data %d (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) + return JOB_INPUT_DELETED + # an error in the input data causes us to bail immediately + elif idata.state == idata.states.ERROR: + JobWrapper( job, self ).fail( "input data %d is in error state" % ( idata.hid ) ) + return JOB_INPUT_ERROR + elif idata.state == idata.states.FAILED_METADATA: + JobWrapper( job, self ).fail( "input data %d failed to properly set metadata" % ( idata.hid ) ) + return JOB_INPUT_ERROR + elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): + # need to requeue + return JOB_WAIT + return self.__check_user_jobs( job ) + + def __check_user_jobs( self, job ): + if not self.app.config.user_job_limit: + return JOB_READY + if job.user: + count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ + .filter( and_( model.Job.user_id == job.user.id, + or_( model.Job.state == model.Job.states.RUNNING, + model.Job.state == model.Job.states.QUEUED ) ) ).count() + elif job.galaxy_session: + count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ + .filter( and_( model.Job.session_id == job.galaxy_session.id, + or_( model.Job.state == model.Job.states.RUNNING, + model.Job.state == model.Job.states.QUEUED ) ) ).count() + else: + log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' % job.id ) + return JOB_READY + if count >= self.app.config.user_job_limit: + return JOB_WAIT + return JOB_READY + + def put( self, job_id, tool_id ): + """Add a job to the queue (by job identifier)""" + if not self.track_jobs_in_database: + self.queue.put( ( job_id, tool_id ) ) + self.sleeper.wake() + + def shutdown( self ): + """Attempts to gracefully shut down the worker thread""" + if self.parent_pid != os.getpid(): + # We're not the real job queue, do nothing + return + else: + log.info( "sending stop signal to worker thread" ) + self.running = False + if not self.track_jobs_in_database: + self.queue.put( self.STOP_SIGNAL ) + self.sleeper.wake() + log.info( "job handler queue stopped" ) + self.dispatcher.shutdown() + +class JobHandlerStopQueue( object ): + """ + A queue for jobs which need to be terminated prematurely. + """ + STOP_SIGNAL = object() + def __init__( self, app, dispatcher ): + self.app = app + self.dispatcher = dispatcher + + self.sa_session = app.model.context + + # Keep track of the pid that started the job manager, only it + # has valid threads + self.parent_pid = os.getpid() + # Contains new jobs. 
Note this is not used if track_jobs_in_database is True + self.queue = Queue() + + # Contains jobs that are waiting (only use from monitor thread) + self.waiting = [] + + # Helper for interruptable sleep + self.sleeper = Sleeper() + self.running = True + self.monitor_thread = threading.Thread( target=self.monitor ) + self.monitor_thread.start() + log.info( "job handler stop queue started" ) + + def monitor( self ): + """ + Continually iterate the waiting jobs, stop any that are found. + """ + # HACK: Delay until after forking, we need a way to do post fork notification!!! + time.sleep( 10 ) + while self.running: + try: + self.monitor_step() + except: + log.exception( "Exception in monitor_step" ) + # Sleep + self.sleeper.sleep( 1 ) + + def monitor_step( self ): + """ + Called repeatedly by `monitor` to stop jobs. + """ + # Pull all new jobs from the queue at once + jobs_to_check = [] + if self.app.config.track_jobs_in_database: + # Clear the session so we get fresh states for job and all datasets + self.sa_session.expunge_all() + # Fetch all new jobs + newly_deleted_jobs = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ + .filter( ( model.Job.state == model.Job.states.DELETED_NEW ) \ + & ( model.Job.handler == self.app.config.server_name ) ).all() + for job in newly_deleted_jobs: + jobs_to_check.append( ( job, None ) ) + # Also pull from the queue (in the case of Administrative stopped jobs) + try: + while 1: + message = self.queue.get_nowait() + if message is self.STOP_SIGNAL: + return + # Unpack the message + job_id, error_msg = message + # Get the job object and append to watch queue + jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) ) + except Empty: + pass + for job, error_msg in jobs_to_check: + if error_msg is not None: + job.state = job.states.ERROR + job.info = error_msg + else: + job.state = job.states.DELETED + self.sa_session.add( job ) + self.sa_session.flush() + if job.job_runner_name is not None: + # tell the dispatcher to stop the job + self.dispatcher.stop( job ) + + def put( self, job_id, error_msg=None ): + self.queue.put( ( job_id, error_msg ) ) + + def shutdown( self ): + """Attempts to gracefully shut down the worker thread""" + if self.parent_pid != os.getpid(): + # We're not the real job queue, do nothing + return + else: + log.info( "sending stop signal to worker thread" ) + self.running = False + if not self.track_jobs_in_database: + self.queue.put( self.STOP_SIGNAL ) + self.sleeper.wake() + log.info( "job handler stop queue stopped" ) + +class DefaultJobDispatcher( object ): + def __init__( self, app ): + self.app = app + self.job_runners = {} + start_job_runners = ["local"] + if app.config.start_job_runners is not None: + start_job_runners.extend( [ x.strip() for x in util.listify( app.config.start_job_runners ) ] ) + if app.config.use_tasked_jobs: + start_job_runners.append("tasks") + for name in start_job_runners: + self._load_plugin( name ) + + def _load_plugin( self, name ): + module_name = 'galaxy.jobs.runners.' + name + try: + module = __import__( module_name ) + except: + log.exception( 'Job runner is not loadable: %s' % module_name ) + return + for comp in module_name.split( "." 
)[1:]: + module = getattr( module, comp ) + if '__all__' not in dir( module ): + log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name ) + return + for obj in module.__all__: + display_name = ':'.join( ( module_name, obj ) ) + runner = getattr( module, obj ) + self.job_runners[name] = runner( self.app ) + log.debug( 'Loaded job runner: %s' % display_name ) + + def __get_runner_name( self, job_wrapper ): + if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper): + runner_name = "tasks" + else: + runner_name = ( job_wrapper.get_job_runner().split(":", 1) )[0] + return runner_name + + def put( self, job_wrapper ): + try: + runner_name = self.__get_runner_name( job_wrapper ) + if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and isinstance(job_wrapper, TaskWrapper): + #DBTODO Refactor + log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) ) + else: + log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) + self.job_runners[runner_name].put( job_wrapper ) + except KeyError: + log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) + job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' ) + + def stop( self, job ): + runner_name = ( job.job_runner_name.split(":", 1) )[0] + log.debug( "stopping job %d in %s runner" %( job.id, runner_name ) ) + try: + self.job_runners[runner_name].stop_job( job ) + except KeyError: + log.error( 'stop(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) + # Job and output dataset states have already been updated, so nothing is done here. + + def recover( self, job, job_wrapper ): + runner_name = ( job.job_runner_name.split(":", 1) )[0] + log.debug( "recovering job %d in %s runner" %( job.id, runner_name ) ) + try: + self.job_runners[runner_name].recover( job, job_wrapper ) + except KeyError: + log.error( 'recover(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) + job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' ) + + def shutdown( self ): + for runner in self.job_runners.itervalues(): + runner.shutdown() diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/jobs/manager.py --- /dev/null +++ b/lib/galaxy/jobs/manager.py @@ -0,0 +1,270 @@ +""" +Top-level Galaxy job manager, moves jobs to handler(s) +""" + +import os +import time +import random +import logging +import threading +from Queue import Queue, Empty + +from sqlalchemy.sql.expression import and_, or_ + +from galaxy import model +from galaxy.jobs import handler, Sleeper, NoopQueue + +log = logging.getLogger( __name__ ) + +class JobManager( object ): + """ + Highest level interface to job management. + + TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly. + This should be decoupled. 
+ """ + def __init__( self, app ): + self.app = app + self.job_handler = NoopHandler() + if self.app.config.server_name in self.app.config.job_handlers: + self.job_handler = handler.JobHandler( app ) + if self.app.config.server_name == self.app.config.job_manager: + job_handler = NoopHandler() + # In the case that webapp == manager == handler, pass jobs in memory + if not self.app.config.track_jobs_in_database: + job_handler = self.job_handler + # Otherwise, even if the manager == one of the handlers, its handler will pick up jobs from the database + self.job_queue = JobManagerQueue( app, job_handler ) + self.job_stop_queue = JobManagerStopQueue( app, job_handler ) + if self.app.config.enable_beta_job_managers: + from galaxy.jobs.deferred import DeferredJobQueue + self.deferred_job_queue = DeferredJobQueue( app ) + else: + self.job_queue = self.job_stop_queue = NoopQueue() + self.job_handler.start() + def shutdown( self ): + self.job_queue.shutdown() + self.job_stop_queue.shutdown() + self.job_handler.shutdown() + +class JobManagerQueue( object ): + """ + Job manager, waits for jobs to be runnable and then dispatches to a + JobHandler. + """ + STOP_SIGNAL = object() + def __init__( self, app, job_handler ): + self.app = app + self.job_handler = job_handler # the (singular) handler if we are passing jobs in memory + + self.sa_session = app.model.context + self.job_lock = False + # Keep track of the pid that started the job manager, only it + # has valid threads + self.parent_pid = os.getpid() + # Contains new jobs. Note this is not used if track_jobs_in_database is True + self.queue = Queue() + # Helper for interruptable sleep + self.sleeper = Sleeper() + self.running = True + self.monitor_thread = threading.Thread( target=self.__monitor ) + # Recover jobs at startup + self.__check_jobs_at_startup() + # Start the queue + self.monitor_thread.start() + log.info( "job manager queue started" ) + + def __check_jobs_at_startup( self ): + """ + Checks all jobs that are in the 'new', 'queued' or 'running' state in + the database and requeues or cleans up as necessary. Only run as the + job manager starts. + """ + for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \ + .filter( ( ( model.Job.state == model.Job.states.NEW ) \ + | ( model.Job.state == model.Job.states.RUNNING ) \ + | ( model.Job.state == model.Job.states.QUEUED ) ) \ + & ( model.Job.handler == None ) ): + if job.tool_id not in self.app.toolbox.tools_by_id: + log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) ) + JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' ) + else: + job.handler = self.__select_handler( job ) # handler's recovery method will take it from here + log.info( "(%d) Job in '%s' state had no handler at job manager startup, assigned '%s' handler" % ( job.id, job.state, job.handler ) ) + if self.sa_session.dirty: + self.sa_session.flush() + + def __monitor( self ): + """ + Continually iterate the waiting jobs and dispatch to a handler + """ + # HACK: Delay until after forking, we need a way to do post fork notification!!! + time.sleep( 10 ) + while self.running: + try: + self.__monitor_step() + except: + log.exception( "Exception in monitor_step" ) + # Sleep + self.sleeper.sleep( 1 ) + + def __monitor_step( self ): + """ + Called repeatedly by `monitor` to process waiting jobs. Gets any new + jobs (either from the database or from its own queue), then assigns a + handler. 
+ """ + # Do nothing if the queue is locked + if self.job_lock: + log.info( 'Job queue is administratively locked, sleeping...' ) + time.sleep( 10 ) + return + # Pull all new jobs from the queue at once + jobs_to_check = [] + if self.app.config.track_jobs_in_database: + # Clear the session so we get fresh states for job and all datasets + self.sa_session.expunge_all() + # Fetch all new jobs + jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \ + .filter( ( model.Job.state == model.Job.states.NEW ) \ + & ( model.Job.handler == None ) ).all() + else: + # Get job objects and append to watch queue for any which were + # previously waiting + try: + while 1: + message = self.queue.get_nowait() + if message is self.STOP_SIGNAL: + return + # Unpack the message + job_id, tool_id = message + # Get the job object and append to watch queue + jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) + except Empty: + pass + + for job in jobs_to_check: + job.handler = self.__select_handler( job ) + log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) ) + self.sa_session.add( job ) + + # If tracking in the database, handlers will pick up the job now + self.sa_session.flush() + + time.sleep( 5 ) + + # This only does something in the case that there is only one handler and it is this Galaxy process + for job in jobs_to_check: + self.job_handler.job_queue.put( job.id, job.tool_id ) + + def __select_handler( self, job ): + # TODO: handler selection based on params, tool, etc. + return random.choice( self.app.config.job_handlers ) + + def put( self, job_id, tool ): + """Add a job to the queue (by job identifier)""" + if not self.app.config.track_jobs_in_database: + self.queue.put( ( job_id, tool.id ) ) + self.sleeper.wake() + + def shutdown( self ): + """Attempts to gracefully shut down the worker thread""" + if self.parent_pid != os.getpid(): + # We're not the real job queue, do nothing + return + else: + log.info( "sending stop signal to worker thread" ) + self.running = False + if not self.app.config.track_jobs_in_database: + self.queue.put( self.STOP_SIGNAL ) + self.sleeper.wake() + log.info( "job manager queue stopped" ) + self.dispatcher.shutdown() + +class JobManagerStopQueue( object ): + """ + A queue for jobs which need to be terminated prematurely. + """ + STOP_SIGNAL = object() + def __init__( self, app, job_handler ): + self.app = app + self.job_handler = job_handler + + self.sa_session = app.model.context + + # Keep track of the pid that started the job manager, only it + # has valid threads + self.parent_pid = os.getpid() + # Contains new jobs. Note this is not used if track_jobs_in_database is True + self.queue = Queue() + + # Contains jobs that are waiting (only use from monitor thread) + self.waiting = [] + + # Helper for interruptable sleep + self.sleeper = Sleeper() + self.running = True + self.monitor_thread = threading.Thread( target=self.monitor ) + self.monitor_thread.start() + log.info( "job manager stop queue started" ) + + def monitor( self ): + """ + Continually iterate the waiting jobs, stop any that are found. + """ + # HACK: Delay until after forking, we need a way to do post fork notification!!! + time.sleep( 10 ) + while self.running: + try: + self.monitor_step() + except: + log.exception( "Exception in monitor_step" ) + # Sleep + self.sleeper.sleep( 1 ) + + def monitor_step( self ): + """ + Called repeatedly by `monitor` to stop jobs. 
+ """ + jobs_to_check = [] + # Pull from the queue even if tracking in the database (in the case of Administrative stopped jobs) + try: + while 1: + message = self.queue.get_nowait() + if message is self.STOP_SIGNAL: + return + # Unpack the message + job_id, error_msg = message + # Get the job object and append to watch queue + jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) ) + except Empty: + pass + + # If tracking in the database, the handler will pick up the stop itself. Otherwise, notify the handler. + for job, error_msg in jobs_to_check: + self.job_handler.job_stop_queue.put( job.id, error_msg ) + + def put( self, job_id, error_msg=None ): + self.queue.put( ( job_id, error_msg ) ) + + def shutdown( self ): + """Attempts to gracefully shut down the worker thread""" + if self.parent_pid != os.getpid(): + # We're not the real job queue, do nothing + return + else: + log.info( "sending stop signal to worker thread" ) + self.running = False + if not self.app.config.track_jobs_in_database: + self.queue.put( self.STOP_SIGNAL ) + self.sleeper.wake() + log.info( "job manager stop queue stopped" ) + +class NoopHandler( object ): + def __init__( self, *args, **kwargs ): + self.job_queue = NoopQueue() + self.job_stop_queue = NoopQueue() + def start( self ): + pass + def shutdown( self, *args ): + pass diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -132,6 +132,7 @@ self.job_runner_external_id = None self.post_job_actions = [] self.imported = False + self.handler = None def add_parameter( self, name, value ): self.parameters.append( JobParameter( name, value ) ) @@ -171,14 +172,11 @@ if not dataset.deleted: return False return True - def mark_deleted( self, enable_job_running=True, track_jobs_in_database=False ): + def mark_deleted( self, track_jobs_in_database=False ): """ Mark this job as deleted, and mark any output datasets as discarded. """ - # This could be handled with *just* track_jobs_in_database, but I - # didn't want to make setting track_jobs_in_database required in - # non-runner configs. - if not enable_job_running or track_jobs_in_database: + if track_jobs_in_database: self.state = Job.states.DELETED_NEW else: self.state = Job.states.DELETED diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -421,7 +421,8 @@ Column( "job_runner_external_id", String( 255 ) ), Column( "object_store_id", TrimmedString( 255 ), index=True ), Column( "imported", Boolean, default=False, index=True ), - Column( "params", TrimmedString(255), index=True ) ) + Column( "params", TrimmedString(255), index=True ), + Column( "handler", TrimmedString( 255 ), index=True ) ) JobParameter.table = Table( "job_parameter", metadata, Column( "id", Integer, primary_key=True ), diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/model/migrate/versions/0094_add_job_handler_col.py --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0094_add_job_handler_col.py @@ -0,0 +1,49 @@ +""" +Migration script to create "handler" column in job table. 
+""" + +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * + +import logging +log = logging.getLogger( __name__ ) + +# Need our custom types, but don't import anything else from model +from galaxy.model.custom_types import * + +metadata = MetaData( migrate_engine ) +db_session = scoped_session( sessionmaker( bind=migrate_engine, autoflush=False, autocommit=True ) ) + +# Column to add. +handler_col = Column( "handler", TrimmedString(255), index=True ) + +def display_migration_details(): + print "" + print "This migration script adds a 'handler' column to the Job table." + +def upgrade(): + print __doc__ + metadata.reflect() + + # Add column to Job table. + try: + Job_table = Table( "job", metadata, autoload=True ) + handler_col.create( Job_table ) + assert handler_col is Job_table.c.handler + + except Exception, e: + print str(e) + log.debug( "Adding column 'handler' to job table failed: %s" % str( e ) ) + +def downgrade(): + metadata.reflect() + + # Drop column from Job table. + try: + Job_table = Table( "job", metadata, autoload=True ) + handler_col = Job_table.c.handler + handler_col.drop() + except Exception, e: + log.debug( "Dropping column 'handler' from job table failed: %s" % ( str( e ) ) ) diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/web/base/controller.py --- a/lib/galaxy/web/base/controller.py +++ b/lib/galaxy/web/base/controller.py @@ -2381,8 +2381,8 @@ deleted = [] msg = None status = None - if not trans.app.config.get_bool( "enable_job_running", True ): - return trans.show_error_message( 'This Galaxy instance is not configured to run jobs. If using multiple servers, please directly access the job running instance to manage jobs.' ) + if not self.app.config.job_manager != self.app.config.server_name: + return trans.show_error_message( 'This Galaxy instance is not the job manager. If using multiple servers, please directly access the job manager instance to manage jobs.' ) job_ids = util.listify( stop ) if job_ids and stop_msg in [ None, '' ]: msg = 'Please enter an error message to display to the user describing why the job was terminated' diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 lib/galaxy/web/controllers/dataset.py --- a/lib/galaxy/web/controllers/dataset.py +++ b/lib/galaxy/web/controllers/dataset.py @@ -904,8 +904,7 @@ if job.state in [ self.app.model.Job.states.QUEUED, self.app.model.Job.states.RUNNING, self.app.model.Job.states.NEW ]: # Are *all* of the job's other output datasets deleted? if job.check_if_output_datasets_deleted(): - job.mark_deleted( self.app.config.get_bool( 'enable_job_running', True ), - self.app.config.get_bool( 'track_jobs_in_database', False ) ) + job.mark_deleted( self.app.config.track_jobs_in_database ) self.app.job_manager.job_stop_queue.put( job.id ) trans.sa_session.flush() except Exception, e: diff -r 528ca90fbd3e629bbe1eba92dc0faffb600a556f -r 4f6c38ca353861351e286016bbbddc126ca566c4 universe_wsgi.ini.sample --- a/universe_wsgi.ini.sample +++ b/universe_wsgi.ini.sample @@ -542,14 +542,25 @@ # -- Job Execution -# If running multiple Galaxy processes, one can be designated as the job -# runner. For more information, see: -# http://wiki.g2.bx.psu.edu/Admin/Config/Performance/Web%20Application%20Scali... -#enable_job_running = True +# To increase performance of job execution and the web interface, you can +# separate Galaxy into multiple processes. 
There are more than one way to do +# this, and they are explained in detail in the documentation: +# +# http://wiki.g2.bx.psu.edu/Admin/Config/Performance/Web%20Application%20Scali... +# +# By default, Galaxy manages and executes jobs from within a single process and +# notifies itself of new jobs via in-memory queues. If you change job_manager +# and job_handlers from their default values, notification will instead be done +# using the `state` and `handler` columns of the job table in the database. -# Should jobs be tracked through the database, rather than in memory. -# Necessary if you're running the load balanced setup. -#track_jobs_in_database = False +# Identify the server_name (the string following server: at the top of this +# file) which should be designated as the job manager (only one): +#job_manager = main + +# Identify the server_name(s) which should be designated as job handlers +# (responsible for starting, tracking, finishing, and cleaning up jobs) as a +# comma-separated list. +#job_handlers = main # This enables splitting of jobs into tasks, if specified by the particular tool config. # This is a new feature and not recommended for production servers yet. Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.