commit/galaxy-central: natefoo: Add more featureful job limiting and optimize the query for checking whether jobs are ready to run
1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/changeset/73e05bc14cf1/
changeset: 73e05bc14cf1
user:      natefoo
date:      2012-11-13 21:00:55
summary:   Add more featureful job limiting and optimize the query for checking whether jobs are ready to run.

Input dependency checks are now performed via SQL rather than walking up the object chain. Limits on the number of jobs a user can run can now be set across the entire instance and per job runner URL.

Quota checks at job runtime are only performed once, after the limit checks. If a user is over quota, jobs are moved to a new "paused" state. Once the user is back under quota, jobs can be unpaused and continue to run (once this UI is added in another commit, shortly). This obviates the need for quota checks on every job, on every queue cycle.

When a job's input dataset errors, the job (and all jobs dependent upon that job) is no longer errored. It will then be possible to remap the job to new inputs so that execution can continue from the point of failure. The commit for that is also coming shortly.

affected #: 7 files

diff -r ed0738c6001654d5456dd36579b278cd10fcd00c -r 73e05bc14cf1478b5ff9d8e8fffdf28d701dd2cb lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -124,6 +124,9 @@
         self.enable_beta_job_managers = string_as_bool( kwargs.get( 'enable_beta_job_managers', 'False' ) )
         # Per-user Job concurrency limitations
         self.user_job_limit = int( kwargs.get( 'user_job_limit', 0 ) )
+        # user_job_limit for backwards-compatibility
+        self.registered_user_job_limit = int( kwargs.get( 'registered_user_job_limit', self.user_job_limit ) )
+        self.anonymous_user_job_limit = int( kwargs.get( 'anonymous_user_job_limit', self.user_job_limit ) )
         self.default_cluster_job_runner = kwargs.get( 'default_cluster_job_runner', 'local:///' )
         self.pbs_application_server = kwargs.get('pbs_application_server', "" )
         self.pbs_dataset_server = kwargs.get('pbs_dataset_server', "" )
@@ -216,6 +219,19 @@
         self.job_manager = kwargs.get('job_manager', self.server_name).strip()
         self.job_handlers = [ x.strip() for x in kwargs.get('job_handlers', self.server_name).split(',') ]
         self.default_job_handlers = [ x.strip() for x in kwargs.get('default_job_handlers', ','.join( self.job_handlers ) ).split(',') ]
+        # parse the [galaxy:job_limits] section
+        self.job_limits = {}
+        try:
+            job_limits = global_conf_parser.items( 'galaxy:job_limits' )
+            for k, v in job_limits:
+                # ConfigParser considers the first colon to be the delimiter, undo this behavior
+                more_k, v = v.split('=', 1)
+                k = '%s:%s' % (k, more_k.strip())
+                v = v.strip().rsplit(None, 1)
+                v[1] = int(v[1])
+                self.job_limits[k] = v
+        except ConfigParser.NoSectionError:
+            pass
         # Use database for IPC unless this is a standalone server (or multiple servers doing self dispatching in memory)
         if self.track_jobs_in_database is None or self.track_jobs_in_database == "None":
             self.track_jobs_in_database = True
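The key reassembly in the hunk above is needed because Python's ConfigParser
treats the first colon in an option name as a key/value delimiter, so a runner
URL used as a key gets split apart. A minimal standalone sketch (the sample
section contents are illustrative, not taken from the commit) of what the new
parsing produces:

# Sketch only: feed a [galaxy:job_limits] section through ConfigParser and
# rebuild the runner-URL keys the way the new config.py code does.
import ConfigParser
from StringIO import StringIO

sample = """\
[galaxy:job_limits]
pbs:/// = pbs://pbs.example.org/ 4
"""

parser = ConfigParser.ConfigParser()
parser.readfp(StringIO(sample))

job_limits = {}
for k, v in parser.items('galaxy:job_limits'):
    # ConfigParser returns k='pbs', v='/// = pbs://pbs.example.org/ 4';
    # undo the first-colon split to recover the full runner URL.
    more_k, v = v.split('=', 1)
    k = '%s:%s' % (k, more_k.strip())
    v = v.strip().rsplit(None, 1)
    v[1] = int(v[1])
    job_limits[k] = v

print job_limits  # {'pbs:///': ['pbs://pbs.example.org/', 4]}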
diff -r ed0738c6001654d5456dd36579b278cd10fcd00c -r 73e05bc14cf1478b5ff9d8e8fffdf28d701dd2cb lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -81,7 +81,7 @@
         self.tool_provided_job_metadata = None
         # Wrapper holding the info required to restore and clean up from files used for setting metadata externally
         self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job )
-        self.job_runner_mapper = JobRunnerMapper( self )
+        self.job_runner_mapper = JobRunnerMapper( self, job.job_runner_name )
         self.params = None
         if job.params:
             self.params = from_json_string( job.params )

diff -r ed0738c6001654d5456dd36579b278cd10fcd00c -r 73e05bc14cf1478b5ff9d8e8fffdf28d701dd2cb lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -8,7 +8,7 @@
 import threading
 from Queue import Queue, Empty

-from sqlalchemy.sql.expression import and_, or_
+from sqlalchemy.sql.expression import and_, or_, select, func

 from galaxy import util, model
 from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper
@@ -16,7 +16,7 @@
 log = logging.getLogger( __name__ )

 # States for running a job. These are NOT the same as data states
-JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted'
+JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED, JOB_USER_OVER_QUOTA = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted', 'user_over_quota'

 class JobHandler( object ):
     """
@@ -126,9 +126,32 @@
             # Clear the session so we get fresh states for job and all datasets
             self.sa_session.expunge_all()
             # Fetch all new jobs
-            jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
-                    .filter( ( model.Job.state == model.Job.states.NEW ) \
-                             & ( model.Job.handler == self.app.config.server_name ) ).all()
+            hda_not_ready = self.sa_session.query(model.Job.id).enable_eagerloads(False) \
+                    .join(model.JobToInputDatasetAssociation) \
+                    .join(model.HistoryDatasetAssociation) \
+                    .join(model.Dataset) \
+                    .filter(and_((model.Job.state == model.Job.states.NEW),
+                                 or_((model.HistoryDatasetAssociation._state != None),
+                                     (model.HistoryDatasetAssociation.deleted == True ),
+                                     (model.Dataset.state != model.Dataset.states.OK ),
+                                     (model.Dataset.deleted == True)))).subquery()
+            ldda_not_ready = self.sa_session.query(model.Job.id).enable_eagerloads(False) \
+                    .join(model.JobToInputLibraryDatasetAssociation) \
+                    .join(model.LibraryDatasetDatasetAssociation) \
+                    .join(model.Dataset) \
+                    .filter(and_((model.Job.state == model.Job.states.NEW),
+                                 or_((model.LibraryDatasetDatasetAssociation._state != None),
+                                     (model.LibraryDatasetDatasetAssociation.deleted == True),
+                                     (model.Dataset.state != model.Dataset.states.OK),
+                                     (model.Dataset.deleted == True)))).subquery()
+            jobs_to_check = self.sa_session.query(model.Job).enable_eagerloads(False) \
+                    .filter(and_((model.Job.state == model.Job.states.NEW),
+                                 (model.Job.handler == self.app.config.server_name),
+                                 ~model.Job.table.c.id.in_(hda_not_ready),
+                                 ~model.Job.table.c.id.in_(ldda_not_ready))) \
+                    .order_by(model.Job.id).all()
+            # Ensure that we get new job counts on each iteration
+            self.__clear_user_job_count()
         else:
             # Get job objects and append to watch queue for any which were
             # previously waiting
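To make the shape of the new readiness check concrete, here is a
self-contained sketch against toy tables (not Galaxy's real schema, which
joins through the dataset association tables): gather the ids of jobs with a
not-yet-OK input into a subquery, then select NEW jobs whose id is NOT IN that
set, all in a single round trip.

# Toy illustration of the SQL-based input dependency check.
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, and_
from sqlalchemy.sql import select

metadata = MetaData()
job = Table('job', metadata,
            Column('id', Integer, primary_key=True),
            Column('state', String))
job_input = Table('job_input', metadata,
                  Column('job_id', Integer),
                  Column('dataset_state', String))

engine = create_engine('sqlite://')
metadata.create_all(engine)
engine.execute(job.insert(), [dict(id=1, state='new'), dict(id=2, state='new')])
engine.execute(job_input.insert(), [dict(job_id=1, dataset_state='ok'),
                                    dict(job_id=2, dataset_state='running')])

# Subquery: jobs having any input dataset that is not yet 'ok'
not_ready = select([job_input.c.job_id]).where(job_input.c.dataset_state != 'ok')
# Main query: NEW jobs that are NOT IN the not-ready set
ready = select([job.c.id]).where(and_(job.c.state == 'new',
                                      ~job.c.id.in_(not_ready)))
print engine.execute(ready).fetchall()  # [(1,)]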
@@ -150,7 +173,8 @@
         new_waiting_jobs = []
         for job in jobs_to_check:
             try:
-                # Check the job's dependencies, requeue if they're not done
+                # Check the job's dependencies, requeue if they're not done.
+                # Some of these states will only happen when using the in-memory job queue
                 job_state = self.__check_if_ready_to_run( job )
                 if job_state == JOB_WAIT:
                     if not self.track_jobs_in_database:
@@ -166,6 +190,10 @@
                     log.info( "(%d) Job deleted by user while still queued" % job.id )
                 elif job_state == JOB_ADMIN_DELETED:
                     log.info( "(%d) Job deleted by admin while still queued" % job.id )
+                elif job_state == JOB_USER_OVER_QUOTA:
+                    log.info( "(%d) User (%s) is over quota: job paused" % ( job.id, job.user_id ) )
+                    job.state = model.Job.states.PAUSED
+                    self.sa_session.add( job )
                 else:
                     log.error( "(%d) Job in unknown state '%s'" % ( job.id, job_state ) )
                     if not self.track_jobs_in_database:
@@ -174,6 +202,8 @@
                 log.exception( "failure running job %d" % job.id )
         # Update the waiting list
         self.waiting_jobs = new_waiting_jobs
+        # Flush, if we updated the state
+        self.sa_session.flush()
         # Done with the session
         self.sa_session.remove()
@@ -187,57 +217,88 @@
         job can be dispatched. Otherwise, return JOB_WAIT indicating that
         input datasets are still being prepared.
         """
-        if job.state == model.Job.states.DELETED:
-            return JOB_DELETED
-        elif job.state == model.Job.states.ERROR:
-            return JOB_ADMIN_DELETED
-        elif self.app.config.enable_quotas:
+        # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK
+        if not self.track_jobs_in_database:
+            if job.state == model.Job.states.DELETED:
+                return JOB_DELETED
+            elif job.state == model.Job.states.ERROR:
+                return JOB_ADMIN_DELETED
+            for dataset_assoc in job.input_datasets + job.input_library_datasets:
+                idata = dataset_assoc.dataset
+                if not idata:
+                    continue
+                # don't run jobs for which the input dataset was deleted
+                if idata.deleted:
+                    JobWrapper( job, self ).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+                    return JOB_INPUT_DELETED
+                # an error in the input data causes us to bail immediately
+                elif idata.state == idata.states.ERROR:
+                    JobWrapper( job, self ).fail( "input data %s is in error state" % ( idata.hid ) )
+                    return JOB_INPUT_ERROR
+                elif idata.state == idata.states.FAILED_METADATA:
+                    JobWrapper( job, self ).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
+                    return JOB_INPUT_ERROR
+                elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
+                    # need to requeue
+                    return JOB_WAIT
+        state = self.__check_user_jobs( job )
+        if state == JOB_READY and self.app.config.enable_quotas:
             quota = self.app.quota_agent.get_quota( job.user )
             if quota is not None:
                 try:
                     usage = self.app.quota_agent.get_usage( user=job.user, history=job.history )
                     if usage > quota:
-                        return JOB_WAIT
+                        return JOB_USER_OVER_QUOTA
                 except AssertionError, e:
                     pass # No history, should not happen with an anon user
-        for dataset_assoc in job.input_datasets + job.input_library_datasets:
-            idata = dataset_assoc.dataset
-            if not idata:
-                continue
-            # don't run jobs for which the input dataset was deleted
-            if idata.deleted:
-                JobWrapper( job, self ).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
-                return JOB_INPUT_DELETED
-            # an error in the input data causes us to bail immediately
-            elif idata.state == idata.states.ERROR:
-                JobWrapper( job, self ).fail( "input data %s is in error state" % ( idata.hid ) )
-                return JOB_INPUT_ERROR
-            elif idata.state == idata.states.FAILED_METADATA:
-                JobWrapper( job, self ).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
-                return JOB_INPUT_ERROR
-            elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
-                # need to requeue
-                return JOB_WAIT
-        return self.__check_user_jobs( job )
+        return state
+
+    def __clear_user_job_count( self ):
+        self.user_job_count = {}
+        self.user_job_count_per_runner = {}

     def __check_user_jobs( self, job ):
-        if not self.app.config.user_job_limit:
-            return JOB_READY
         if job.user:
-            count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
-                        .filter( and_( model.Job.user_id == job.user.id,
-                                       or_( model.Job.state == model.Job.states.RUNNING,
-                                            model.Job.state == model.Job.states.QUEUED ) ) ).count()
+            # Check the hard limit first
+            if self.app.config.registered_user_job_limit:
+                # Cache the job count if necessary
+                if not self.user_job_count:
+                    query = self.sa_session.execute(select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)]) \
+                            .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id is not None))) \
+                            .group_by(model.Job.table.c.user_id))
+                    for row in query:
+                        self.user_job_count[row[0]] = row[1]
+                if self.user_job_count.get(job.user_id, 0) >= self.app.config.registered_user_job_limit:
+                    return JOB_WAIT
+            # If we pass the hard limit, also check the per-runner count
+            if job.job_runner_name in self.app.config.job_limits:
+                # Cache the job count if necessary
+                if job.job_runner_name not in self.user_job_count_per_runner:
+                    self.user_job_count_per_runner[job.job_runner_name] = {}
+                    query_url, limit = self.app.config.job_limits[job.job_runner_name]
+                    base_query = select([model.Job.table.c.user_id, model.Job.table.c.job_runner_name, func.count(model.Job.table.c.user_id).label('job_count')]) \
+                            .where(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING))) \
+                            .group_by(model.Job.table.c.user_id, model.Job.table.c.job_runner_name)
+                    if '%' in query_url or '_' in query_url:
+                        subq = base_query.having(model.Job.table.c.job_runner_name.like(query_url)).alias('subq')
+                        query = self.sa_session.execute(select([subq.c.user_id, func.sum(subq.c.job_count).label('job_count')]).group_by(subq.c.user_id))
+                    else:
+                        query = self.sa_session.execute(base_query.having(model.Job.table.c.job_runner_name == query_url))
+                    for row in query:
+                        self.user_job_count_per_runner[job.job_runner_name][row['user_id']] = row['job_count']
+                if self.user_job_count_per_runner[job.job_runner_name].get(job.user_id, 0) >= self.app.config.job_limits[job.job_runner_name][1]:
+                    return JOB_WAIT
         elif job.galaxy_session:
-            count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
-                        .filter( and_( model.Job.session_id == job.galaxy_session.id,
-                                       or_( model.Job.state == model.Job.states.RUNNING,
-                                            model.Job.state == model.Job.states.QUEUED ) ) ).count()
+            # Anonymous users only get the hard limit
+            if self.app.config.anonymous_user_job_limit:
+                count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
+                            .filter( and_( model.Job.session_id == job.galaxy_session.id,
+                                           or_( model.Job.state == model.Job.states.RUNNING,
+                                                model.Job.state == model.Job.states.QUEUED ) ) ).count()
+                if count >= self.app.config.anonymous_user_job_limit:
+                    return JOB_WAIT
         else:
             log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' % job.id )
-            return JOB_READY
-        if count >= self.app.config.user_job_limit:
-            return JOB_WAIT
         return JOB_READY

     def put( self, job_id, tool_id ):
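The counting strategy above can be summarized as: one grouped count query per
monitor cycle, memoized in a dict and shared by every job checked in that
cycle. An illustrative-only sketch of the pattern (the callable stands in for
the GROUP BY query against the job table):

class UserJobCountCache(object):
    def __init__(self, fetch_counts):
        # fetch_counts: callable returning {user_id: queued/running count}
        self.fetch_counts = fetch_counts
        self.user_job_count = {}

    def clear(self):
        # Called at the top of each queue cycle (__clear_user_job_count)
        # so counts are re-read once per cycle rather than once per job.
        self.user_job_count = {}

    def over_limit(self, user_id, limit):
        if not self.user_job_count:
            self.user_job_count = self.fetch_counts()
        return self.user_job_count.get(user_id, 0) >= limit

cache = UserJobCountCache(lambda: {7: 3})  # user 7 has 3 active jobs
cache.clear()
print cache.over_limit(7, 4)   # False: 3 < 4
print cache.over_limit(42, 4)  # False: no active jobs for user 42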
diff -r ed0738c6001654d5456dd36579b278cd10fcd00c -r 73e05bc14cf1478b5ff9d8e8fffdf28d701dd2cb lib/galaxy/jobs/manager.py
--- a/lib/galaxy/jobs/manager.py
+++ b/lib/galaxy/jobs/manager.py
@@ -146,6 +146,7 @@

         for job in jobs_to_check:
             job.handler = self.__get_handler( job )
+            job.job_runner_name = self.__get_runner_url( job )
             log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) )
             self.sa_session.add( job )

@@ -168,6 +169,14 @@
             log.exception( "(%s) Caught exception attempting to get tool-specific job handler for tool '%s', selecting at random from available handlers instead:" % ( job.id, job.tool_id ) )
             return random.choice( self.app.config.job_handlers )

+    def __get_runner_url( self, job ):
+        """This fetches the raw runner URL, and does not perform any computation e.g. for the dynamic runner"""
+        try:
+            return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_runner_url( job.params )
+        except Exception, e:
+            log.warning( 'Unable to determine job runner URL for job %s: %s' % (job.id, str(e)) )
+            return None
+
     def put( self, job_id, tool ):
         """Add a job to the queue (by job identifier)"""
         if not self.app.config.track_jobs_in_database:

diff -r ed0738c6001654d5456dd36579b278cd10fcd00c -r 73e05bc14cf1478b5ff9d8e8fffdf28d701dd2cb lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -14,8 +14,9 @@
     (in the form of job_wrappers) to job runner url strings.
     """

-    def __init__( self, job_wrapper ):
+    def __init__( self, job_wrapper, job_runner_name=None ):
         self.job_wrapper = job_wrapper
+        self.job_runner_name = job_runner_name
         self.rule_modules = self.__get_rule_modules( )

     def __get_rule_modules( self ):
@@ -114,7 +115,11 @@
             raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type )

     def __cache_job_runner_url( self, params ):
-        raw_job_runner_url = self.job_wrapper.tool.get_job_runner_url( params )
+        # If there's already a runner set in the Job object, don't overwrite from the tool
+        if self.job_runner_name is not None:
+            raw_job_runner_url = self.job_runner_name
+        else:
+            raw_job_runner_url = self.job_wrapper.tool.get_job_runner_url( params )
         if raw_job_runner_url.startswith( DYNAMIC_RUNNER_PREFIX ):
             job_runner_url = self.__expand_dynamic_job_runner_url( raw_job_runner_url[ len( DYNAMIC_RUNNER_PREFIX ) : ] )
         else:
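Taken together, the manager.py and mapper.py changes persist the tool's raw
runner URL on the Job row at assignment time and reuse it at mapping time, so
dynamic rules still run only in the handler. A hedged sketch of that flow
(simplified names; the prefix value is an assumption for illustration, not
taken from Galaxy's source):

DYNAMIC_RUNNER_PREFIX = 'dynamic:///'  # assumed constant, for illustration

def manager_assign(job, tool_runner_url):
    # Like __get_runner_url: stored verbatim, no dynamic-rule evaluation
    job['job_runner_name'] = tool_runner_url

def mapper_cache_url(job, tool_runner_url, expand_dynamic):
    # Like __cache_job_runner_url: prefer the runner already on the Job
    raw = job['job_runner_name']
    if raw is None:
        raw = tool_runner_url
    if raw.startswith(DYNAMIC_RUNNER_PREFIX):
        return expand_dynamic(raw[len(DYNAMIC_RUNNER_PREFIX):])
    return raw

job = {'job_runner_name': None}
manager_assign(job, 'dynamic:///python/my_rule')
print mapper_cache_url(job, None, lambda rule: 'pbs://pbs.example.org/')
# -> pbs://pbs.example.org/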
diff -r ed0738c6001654d5456dd36579b278cd10fcd00c -r 73e05bc14cf1478b5ff9d8e8fffdf28d701dd2cb lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -112,6 +112,7 @@
                     RUNNING = 'running',
                     OK = 'ok',
                     ERROR = 'error',
+                    PAUSED = 'paused',
                     DELETED = 'deleted',
                     DELETED_NEW = 'deleted_new' )
     # Please include an accessor (get/set pair) for any new columns/members.

diff -r ed0738c6001654d5456dd36579b278cd10fcd00c -r 73e05bc14cf1478b5ff9d8e8fffdf28d701dd2cb universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample
+++ b/universe_wsgi.ini.sample
@@ -659,12 +659,6 @@
 # bytes). 0 for no limit.
 #output_size_limit = 0

-# Jobs can be held back from submission to a runner if a user already has more
-# jobs queued or running than the number specified below. This prevents a
-# single user from stuffing the queue and preventing other users from being
-# able to run jobs.
-#user_job_limit = None
-
 # Clustering Galaxy is not a straightforward process and requires some
 # pre-configuration. See the the wiki before attempting to set any of these
 # options:
@@ -717,6 +711,36 @@
 # Details" option in the history. Administrators can always see this.
 #expose_dataset_path = False

+# -- Job Limiting
+
+# A hard limit on the total number of jobs a user can have running across all
+# configured job destinations can be configured here.
+#registered_user_job_limit = None
+#anonymous_user_job_limit = None
+
+# Additionally, jobs can be limited based on runner URLs (or matching of runner
+# URLs). Matching is via SQL's 'LIKE' operator, so the wildcard characters are
+# '_' and '%' (regex is not supported). Since the job runner code often
+# rewrites the URL once the job has been submitted to the cluster, you will
+# need to define how to match the runner URL stored in the database. When in
+# doubt, you can run a job and then examine the stored value of
+# 'job_runner_name' in the 'job' table of the database to see what you'll need
+# to match.
+#
+# For example, if default_cluster_job_runner is set to pbs:/// and the default
+# Torque cluster happens to be pbs.example.org, the job_runner_name is likely
+# to be stored as 'pbs://pbs.example.org/'. To limit the number of jobs a user
+# can run on this cluster to 4, use the following:
+#
+# pbs:/// = pbs://pbs.example.org/ 4
+#
+# An example that uses matching (if, for example, your runner URL contains
+# native options):
+#
+# drmaa:/// = drmaa://sge.example.org/% 4
+
+[galaxy:job_limits]
+
 # ---- Per-Tool Job Management ----------------------------------------------

 # Per-tool job handler and runner overrides. Parameters can be included to define multiple
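As a worked example of the LIKE matching documented above, the following toy
sketch (simplified: the committed code groups by user and runner URL and sums
matching groups in an outer query) counts a user's active jobs across every
job_runner_name matching a wildcard pattern:

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.sql import select, func

metadata = MetaData()
job = Table('job', metadata,
            Column('id', Integer, primary_key=True),
            Column('user_id', Integer),
            Column('state', String),
            Column('job_runner_name', String))
engine = create_engine('sqlite://')
metadata.create_all(engine)
engine.execute(job.insert(), [
    dict(id=1, user_id=1, state='running',
         job_runner_name='drmaa://sge.example.org/-q short.q'),
    dict(id=2, user_id=1, state='queued',
         job_runner_name='drmaa://sge.example.org/-q long.q'),
])

# As configured above: drmaa:/// = drmaa://sge.example.org/% 4
query_url, limit = 'drmaa://sge.example.org/%', 4
counts = select([job.c.user_id, func.count(job.c.id).label('job_count')]) \
    .where(job.c.state.in_(('queued', 'running'))) \
    .where(job.c.job_runner_name.like(query_url)) \
    .group_by(job.c.user_id)
for user_id, job_count in engine.execute(counts):
    print user_id, job_count >= limit  # 1 False (2 active jobs, limit 4)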
Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this
because you have the service enabled, addressing the recipient of this email.