2 new commits in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/9937a1344fe7/
Changeset:   9937a1344fe7
User:        natefoo
Date:        2014-06-03 00:22:33
Summary:     Rename the ambiguous concurrent_jobs limit to destination_user_concurrent_jobs and add a destination_total_concurrent_jobs limit that prevents Galaxy from queueing *any* jobs at a destination or tagged group of destinations once the limit is reached.
Affected #:  2 files

diff -r e79b7ea727c90c2c7ccebdafcd19a5a6d880c51c -r 9937a1344fe789be6111d1af4b746b203a0dd785 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -211,16 +211,21 @@
                                   walltime=None,
                                   walltime_delta=None,
                                   output_size=None,
-                                  concurrent_jobs={})
+                                  destination_user_concurrent_jobs={},
+                                  destination_total_concurrent_jobs={})
         # Parse job limits
         limits = root.find('limits')
         if limits is not None:
             for limit in self.__findall_with_required(limits, 'limit', ('type',)):
                 type = limit.get('type')
-                if type == 'concurrent_jobs':
+                # concurrent_jobs renamed to destination_user_concurrent_jobs in job_conf.xml
+                if type in ( 'destination_user_concurrent_jobs', 'concurrent_jobs', 'destination_total_concurrent_jobs' ):
                     id = limit.get('tag', None) or limit.get('id')
-                    self.limits.concurrent_jobs[id] = int(limit.text)
+                    if type == 'destination_total_concurrent_jobs':
+                        self.limits.destination_total_concurrent_jobs[id] = int(limit.text)
+                    else:
+                        self.limits.destination_user_concurrent_jobs[id] = int(limit.text)
                 elif limit.text:
                     self.limits.__dict__[type] = types.get(type, str)(limit.text)
@@ -287,7 +292,8 @@
                                   walltime=self.app.config.job_walltime,
                                   walltime_delta=self.app.config.job_walltime_delta,
                                   output_size=self.app.config.output_size_limit,
-                                  concurrent_jobs={})
+                                  destination_user_concurrent_jobs={},
+                                  destination_total_concurrent_jobs={})

         log.debug('Done loading job configuration')
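The parsing change above keeps the legacy limit name working while routing values into one of two new dicts. A minimal standalone sketch of that behavior, assuming a plain dict in place of Galaxy's limits object and a hypothetical job_conf.xml fragment:

    import xml.etree.ElementTree as ElementTree

    # Simplified stand-in for Galaxy's limits object.
    limits = dict(destination_user_concurrent_jobs={},
                  destination_total_concurrent_jobs={})

    conf = ElementTree.fromstring("""
    <limits>
        <limit type="destination_user_concurrent_jobs" id="local">1</limit>
        <limit type="concurrent_jobs" tag="mycluster">2</limit>
        <limit type="destination_total_concurrent_jobs" tag="longjobs">100</limit>
    </limits>
    """)

    for limit in conf.findall('limit'):
        type = limit.get('type')  # shadows the builtin, mirroring the changeset's naming
        if type in ('destination_user_concurrent_jobs', 'concurrent_jobs',
                    'destination_total_concurrent_jobs'):
            # A tag takes precedence over an id, as in the changeset above;
            # the legacy 'concurrent_jobs' name maps to the per-user limit.
            id = limit.get('tag', None) or limit.get('id')
            if type == 'destination_total_concurrent_jobs':
                limits['destination_total_concurrent_jobs'][id] = int(limit.text)
            else:
                limits['destination_user_concurrent_jobs'][id] = int(limit.text)

    print(limits)
    # {'destination_user_concurrent_jobs': {'local': 1, 'mycluster': 2},
    #  'destination_total_concurrent_jobs': {'longjobs': 100}}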
diff -r e79b7ea727c90c2c7ccebdafcd19a5a6d880c51c -r 9937a1344fe789be6111d1af4b746b203a0dd785 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -57,7 +57,7 @@
         self.track_jobs_in_database = self.app.config.track_jobs_in_database

         # Initialize structures for handling job limits
-        self.__clear_user_job_count()
+        self.__clear_job_count()

         # Keep track of the pid that started the job manager, only it
         # has valid threads
@@ -232,7 +232,7 @@
             except Empty:
                 pass
         # Ensure that we get new job counts on each iteration
-        self.__clear_user_job_count()
+        self.__clear_job_count()
         # Iterate over new and waiting jobs and look for any that are
         # ready to run
         new_waiting_jobs = []
@@ -330,7 +330,10 @@
                 self.job_wrappers[job.id].fail( failure_message )
                 return JOB_ERROR
         # job is ready to run, check limits
-        state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
+        # TODO: these checks should be refactored to minimize duplication and made more modular/pluggable
+        state = self.__check_destination_jobs( job, self.job_wrappers[job.id] )
+        if state == JOB_READY:
+            state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
         if state == JOB_READY and self.app.config.enable_quotas:
             quota = self.app.quota_agent.get_quota( job.user )
             if quota is not None:
@@ -340,11 +343,15 @@
                         return JOB_USER_OVER_QUOTA
                 except AssertionError, e:
                     pass # No history, should not happen with an anon user
+        if state == JOB_READY:
+            # PASS.  increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
+            self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id)
         return state

-    def __clear_user_job_count( self ):
+    def __clear_job_count( self ):
         self.user_job_count = None
         self.user_job_count_per_destination = None
+        self.total_job_count_per_destination = None

     def get_user_job_count(self, user_id):
         self.__cache_user_job_count()
@@ -404,14 +411,21 @@
             self.user_job_count_per_destination = {}

     def increase_running_job_count(self, user_id, destination_id):
-        if self.user_job_count is None:
-            self.user_job_count = {}
-        if self.user_job_count_per_destination is None:
-            self.user_job_count_per_destination = {}
-        self.user_job_count[user_id] = self.user_job_count.get(user_id, 0) + 1
-        if user_id not in self.user_job_count_per_destination:
-            self.user_job_count_per_destination[user_id] = {}
-        self.user_job_count_per_destination[user_id][destination_id] = self.user_job_count_per_destination[user_id].get(destination_id, 0) + 1
+        if self.app.job_config.limits.registered_user_concurrent_jobs or \
+           self.app.job_config.limits.anonymous_user_concurrent_jobs or \
+           self.app.job_config.limits.destination_user_concurrent_jobs:
+            if self.user_job_count is None:
+                self.user_job_count = {}
+            if self.user_job_count_per_destination is None:
+                self.user_job_count_per_destination = {}
+            self.user_job_count[user_id] = self.user_job_count.get(user_id, 0) + 1
+            if user_id not in self.user_job_count_per_destination:
+                self.user_job_count_per_destination[user_id] = {}
+            self.user_job_count_per_destination[user_id][destination_id] = self.user_job_count_per_destination[user_id].get(destination_id, 0) + 1
+        if self.app.job_config.limits.destination_total_concurrent_jobs:
+            if self.total_job_count_per_destination is None:
+                self.total_job_count_per_destination = {}
+            self.total_job_count_per_destination[destination_id] = self.total_job_count_per_destination.get(destination_id, 0) + 1

     def __check_user_jobs( self, job, job_wrapper ):
         if job.user:
@@ -424,25 +438,23 @@
             # If we pass the hard limit, also check the per-destination count
             id = job_wrapper.job_destination.id
             count_per_id = self.get_user_job_count_per_destination(job.user_id)
-            if id in self.app.job_config.limits.concurrent_jobs:
+            if id in self.app.job_config.limits.destination_user_concurrent_jobs:
                 count = count_per_id.get(id, 0)
                 # Check the user's number of dispatched jobs in the assigned destination id against the limit for that id
-                if count >= self.app.job_config.limits.concurrent_jobs[id]:
+                if count >= self.app.job_config.limits.destination_user_concurrent_jobs[id]:
                     return JOB_WAIT
             # If we pass the destination limit (if there is one), also check limits on any tags (if any)
             if job_wrapper.job_destination.tags:
                 for tag in job_wrapper.job_destination.tags:
                     # Check each tag for this job's destination
-                    if tag in self.app.job_config.limits.concurrent_jobs:
+                    if tag in self.app.job_config.limits.destination_user_concurrent_jobs:
                         # Only if there's a limit defined for this tag
                         count = 0
                         for id in [ d.id for d in self.app.job_config.get_destinations(tag) ]:
                             # Add up the aggregate job total for this tag
                             count += count_per_id.get(id, 0)
-                        if count >= self.app.job_config.limits.concurrent_jobs[tag]:
+                        if count >= self.app.job_config.limits.destination_user_concurrent_jobs[tag]:
                             return JOB_WAIT
-            # PASS.  increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
-            self.increase_running_job_count(job.user_id, id)
         elif job.galaxy_session:
             # Anonymous users only get the hard limit
             if self.app.job_config.limits.anonymous_user_concurrent_jobs:
@@ -456,6 +468,46 @@
             log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' % job.id )
         return JOB_READY

+    def __cache_total_job_count_per_destination( self ):
+        # Cache the job count if necessary
+        if self.total_job_count_per_destination is None:
+            self.total_job_count_per_destination = {}
+            result = self.sa_session.execute(select([model.Job.table.c.destination_id, func.count(model.Job.table.c.destination_id).label('job_count')]) \
+                                             .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)))) \
+                                             .group_by(model.Job.table.c.destination_id))
+            for row in result:
+                self.total_job_count_per_destination[row['destination_id']] = row['job_count']
+
+    def get_total_job_count_per_destination(self):
+        self.__cache_total_job_count_per_destination()
+        # Always use caching (at worst a job will have to wait one iteration,
+        # and this would be more fair anyway as it ensures FIFO scheduling,
+        # insofar as FIFO would be fair...)
+        return self.total_job_count_per_destination
+
+    def __check_destination_jobs( self, job, job_wrapper ):
+        if self.app.job_config.limits.destination_total_concurrent_jobs:
+            id = job_wrapper.job_destination.id
+            count_per_id = self.get_total_job_count_per_destination()
+            if id in self.app.job_config.limits.destination_total_concurrent_jobs:
+                count = count_per_id.get(id, 0)
+                # Check the number of dispatched jobs in the assigned destination id against the limit for that id
+                if count >= self.app.job_config.limits.destination_total_concurrent_jobs[id]:
+                    return JOB_WAIT
+            # If we pass the destination limit (if there is one), also check limits on any tags (if any)
+            if job_wrapper.job_destination.tags:
+                for tag in job_wrapper.job_destination.tags:
+                    # Check each tag for this job's destination
+                    if tag in self.app.job_config.limits.destination_total_concurrent_jobs:
+                        # Only if there's a limit defined for this tag
+                        count = 0
+                        for id in [ d.id for d in self.app.job_config.get_destinations(tag) ]:
+                            # Add up the aggregate job total for this tag
+                            count += count_per_id.get(id, 0)
+                        if count >= self.app.job_config.limits.destination_total_concurrent_jobs[tag]:
+                            return JOB_WAIT
+        return JOB_READY
+
     def put( self, job_id, tool_id ):
         """Add a job to the queue (by job identifier)"""
         if not self.track_jobs_in_database:
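To see the new total-concurrency logic apart from the handler plumbing, here is a minimal sketch of the check performed by __check_destination_jobs above, assuming plain dicts in place of Galaxy's job configuration and destination objects (all ids and counts below are hypothetical):

    JOB_READY, JOB_WAIT = 'ready', 'wait'

    # Hypothetical stand-ins: limits keyed by destination id or tag, tag
    # membership, and the cached count of queued/running jobs per destination.
    destination_total_concurrent_jobs = {'local': 16, 'longjobs': 100}
    destinations_by_tag = {'longjobs': ['cluster1', 'cluster2']}
    active_counts = {'local': 16, 'cluster1': 60, 'cluster2': 39}

    def check_destination_jobs(destination_id, tags):
        """Return JOB_WAIT if the destination or any of its tags is at its limit."""
        # Check the destination's own limit, if one is defined.
        if destination_id in destination_total_concurrent_jobs:
            if active_counts.get(destination_id, 0) >= destination_total_concurrent_jobs[destination_id]:
                return JOB_WAIT
        # A tag limit applies to the aggregate count across every
        # destination carrying that tag.
        for tag in tags:
            if tag in destination_total_concurrent_jobs:
                count = sum(active_counts.get(d, 0) for d in destinations_by_tag[tag])
                if count >= destination_total_concurrent_jobs[tag]:
                    return JOB_WAIT
        return JOB_READY

    print(check_destination_jobs('local', []))               # wait (16 >= 16)
    print(check_destination_jobs('cluster1', ['longjobs']))  # ready (60 + 39 < 100)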
https://bitbucket.org/galaxy/galaxy-central/commits/46a487e7d547/
Changeset:   46a487e7d547
User:        natefoo
Date:        2014-06-03 00:23:07
Summary:     Document limits available in job_conf.xml.
Affected #:  1 file

diff -r 9937a1344fe789be6111d1af4b746b203a0dd785 -r 46a487e7d5476429cb71885adcbcfb9746ce725a job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -253,12 +253,44 @@
         <tool id="baz" handler="special_handlers" destination="bigmem"/>
     </tools>
     <limits>
-        <!-- Certain limits can be defined. -->
+        <!-- Certain limits can be defined. The 'concurrent_jobs' limits all
+             control the number of jobs that can be "active" at a time, that
+             is, dispatched to a runner and in the 'queued' or 'running'
+             states.
+
+             A race condition exists that will allow destination_* concurrency
+             limits to be surpassed when multiple handlers are allowed to
+             handle jobs for the same destination.  To prevent this, assign all
+             jobs for a specific destination to a single handler.
+        -->
+        <!-- registered_user_concurrent_jobs:
+             Limit on the number of jobs a user with a registered Galaxy
+             account can have active across all destinations.
+        -->
         <limit type="registered_user_concurrent_jobs">2</limit>
+        <!-- anonymous_user_concurrent_jobs:
+             Likewise, but for unregistered/anonymous users.
+        -->
         <limit type="anonymous_user_concurrent_jobs">1</limit>
+        <!-- destination_user_concurrent_jobs:
+             The number of jobs a user can have active in the specified
+             destination, or across all destinations identified by the
+             specified tag. (formerly: concurrent_jobs)
+        -->
+        <limit type="destination_user_concurrent_jobs" id="local">1</limit>
+        <limit type="destination_user_concurrent_jobs" tag="mycluster">2</limit>
+        <limit type="destination_user_concurrent_jobs" tag="longjobs">1</limit>
+        <!-- destination_total_concurrent_jobs:
+             The number of jobs that can be active in the specified
+             destination (or across all destinations identified by the
+             specified tag) by any/all users.
+        -->
+        <limit type="destination_total_concurrent_jobs" id="local">16</limit>
+        <limit type="destination_total_concurrent_jobs" tag="longjobs">100</limit>
+        <!-- walltime:
+             Amount of time a job can run (in any destination) before it
+             will be terminated by Galaxy.
+        -->
         <limit type="walltime">24:00:00</limit>
-        <limit type="concurrent_jobs" id="local">1</limit>
-        <limit type="concurrent_jobs" tag="mycluster">2</limit>
-        <limit type="concurrent_jobs" tag="longjobs">1</limit>
     </limits>
 </job_conf>

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this
because you have the service enabled, addressing the recipient of this email.
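The race-condition note added to the sample above suggests routing all jobs for a limited destination through one handler. A hedged sketch of that workaround, using the <handlers>/<tools> syntax appearing in job_conf.xml.sample_advanced (the handler, tool, and destination ids here are hypothetical, and the <destinations> section is elided):

    <job_conf>
        <handlers>
            <handler id="handler0"/>
            <handler id="handler_longjobs"/>
        </handlers>
        <tools>
            <!-- Every tool that targets the limited destination is pinned to
                 the same handler, so a single process counts and dispatches
                 its jobs and the total limit cannot be raced past. -->
            <tool id="megablast" handler="handler_longjobs" destination="longjobs_cluster"/>
            <tool id="bwa_mem" handler="handler_longjobs" destination="longjobs_cluster"/>
        </tools>
        <limits>
            <limit type="destination_total_concurrent_jobs" id="longjobs_cluster">100</limit>
        </limits>
    </job_conf>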