1 new changeset in galaxy-central: http://bitbucket.org/galaxy/galaxy-central/changeset/ed6608267375/ changeset: ed6608267375 user: natefoo date: 2011-08-11 18:49:36 summary: Very basic "high water mark" per-user job limits. affected #: 2 files (1.5 KB) --- a/lib/galaxy/config.py Thu Aug 11 10:14:53 2011 -0400 +++ b/lib/galaxy/config.py Thu Aug 11 12:49:36 2011 -0400 @@ -86,9 +86,11 @@ self.external_service_type_path = resolve_path( kwargs.get( 'external_service_type_path', 'external_service_types' ), self.root ) # Tasked job runner. self.use_tasked_jobs = string_as_bool( kwargs.get( 'use_tasked_jobs', False ) ) + self.local_task_queue_workers = int(kwargs.get("local_task_queue_workers", 2)) # The transfer manager and deferred job queue self.enable_beta_job_managers = string_as_bool( kwargs.get( 'enable_beta_job_managers', 'False' ) ) - self.local_task_queue_workers = int(kwargs.get("local_task_queue_workers", 2)) + # Per-user Job concurrency limitations + self.user_job_limit = int( kwargs.get( 'user_job_limit', 0 ) ) self.default_cluster_job_runner = kwargs.get( 'default_cluster_job_runner', 'local:///' ) self.pbs_application_server = kwargs.get('pbs_application_server', "" ) self.pbs_dataset_server = kwargs.get('pbs_dataset_server', "" ) --- a/lib/galaxy/jobs/__init__.py Thu Aug 11 10:14:53 2011 -0400 +++ b/lib/galaxy/jobs/__init__.py Thu Aug 11 12:49:36 2011 -0400 @@ -11,6 +11,8 @@ from galaxy.util.expressions import ExpressionContext from galaxy.jobs.actions.post import ActionBox +from sqlalchemy.sql.expression import and_, or_ + import pkg_resources pkg_resources.require( "PasteDeploy" ) @@ -163,6 +165,10 @@ .options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \ .filter( model.Job.state == model.Job.states.NEW ).all() else: + # Get job objects and append to watch queue for any which were + # previously waiting + for job_id in self.waiting_jobs: + jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) try: while 1: message = self.queue.get_nowait() @@ -174,10 +180,6 @@ jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) except Empty: pass - # Get job objects and append to watch queue for any which were - # previously waiting - for job_id in self.waiting_jobs: - jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) ) # Iterate over new and waiting jobs and look for any that are # ready to run new_waiting_jobs = [] @@ -256,6 +258,28 @@ elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): # need to requeue return JOB_WAIT + return self.__check_user_jobs( job ) + + def __check_user_jobs( self, job ): + if not self.app.config.user_job_limit: + return JOB_READY + if job.user: + user_jobs = self.sa_session.query( model.Job ) \ + .options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \ + .filter( and_( model.Job.user_id == job.user.id, + or_( model.Job.state == model.Job.states.RUNNING, + model.Job.state == model.Job.states.QUEUED ) ) ).all() + elif job.galaxy_session: + user_jobs = self.sa_session.query( model.Job ) \ + .options( lazyload( "external_output_metadata" ), lazyload( "parameters" ) ) \ + .filter( and_( model.Job.session_id == job.galaxy_session.id, + or_( model.Job.state == model.Job.states.RUNNING, + model.Job.state == model.Job.states.QUEUED ) ) ).all() + else: + log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' % job.id ) + return JOB_READY + if len( user_jobs ) >= self.app.config.user_job_limit: + return JOB_WAIT return JOB_READY def put( self, job_id, tool ): Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.