1 new changeset in galaxy-central:

http://bitbucket.org/galaxy/galaxy-central/changeset/3f926d934d98/
changeset:   3f926d934d98
user:        dannon
date:        2011-09-22 17:36:11
summary:     Fix use_tasked_jobs to inherit the parent job's user.
affected #:  1 file (-1 bytes)

--- a/lib/galaxy/jobs/__init__.py	Wed Sep 21 16:50:18 2011 -0400
+++ b/lib/galaxy/jobs/__init__.py	Thu Sep 22 11:36:11 2011 -0400
@@ -32,9 +32,9 @@
 class JobManager( object ):
     """
     Highest level interface to job management.
-    
+
     TODO: Currently the app accesses "job_queue" and "job_stop_queue" directly.
-    This should be decoupled. 
+    This should be decoupled.
     """
     def __init__( self, app ):
         self.app = app
@@ -71,7 +71,7 @@
 
 class JobQueue( object ):
     """
-    Job manager, waits for jobs to be runnable and then dispatches to 
+    Job manager, waits for jobs to be runnable and then dispatches to
     a JobRunner.
     """
     STOP_SIGNAL = object()
@@ -95,7 +95,7 @@
         self.running = True
         self.dispatcher = dispatcher
         self.monitor_thread = threading.Thread( target=self.__monitor )
-        self.monitor_thread.start() 
+        self.monitor_thread.start()
         log.info( "job manager started" )
         if app.config.get_bool( 'enable_job_recovery', True ):
             self.__check_jobs_at_startup()
@@ -132,7 +132,7 @@
 
     def __monitor( self ):
         """
-        Continually iterate the waiting jobs, checking is each is ready to 
+        Continually iterate the waiting jobs, checking is each is ready to
        run and dispatching if so.
         """
         # HACK: Delay until after forking, we need a way to do post fork notification!!!
@@ -180,12 +180,12 @@
                     jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
             except Empty:
                 pass
-        # Iterate over new and waiting jobs and look for any that are 
+        # Iterate over new and waiting jobs and look for any that are
         # ready to run
         new_waiting_jobs = []
         for job in jobs_to_check:
             try:
-                # Check the job's dependencies, requeue if they're not done 
+                # Check the job's dependencies, requeue if they're not done
                 job_state = self.__check_if_ready_to_run( job )
                 if job_state == JOB_WAIT:
                     if not self.track_jobs_in_database:
@@ -216,7 +216,7 @@
         self.waiting_jobs = new_waiting_jobs
         # Done with the session
         self.sa_session.remove()
-        
+
     def __check_if_ready_to_run( self, job ):
         """
         Check if a job is ready to run by verifying that each of its input
@@ -281,13 +281,13 @@
             if len( user_jobs ) >= self.app.config.user_job_limit:
                 return JOB_WAIT
         return JOB_READY
-        
+
     def put( self, job_id, tool ):
         """Add a job to the queue (by job identifier)"""
         if not self.track_jobs_in_database:
             self.queue.put( ( job_id, tool.id ) )
             self.sleeper.wake()
-    
+
     def shutdown( self ):
         """Attempts to gracefully shut down the worker thread"""
         if self.parent_pid != os.getpid():
@@ -304,7 +304,7 @@
 
 class JobWrapper( object ):
     """
-    Wraps a 'model.Job' with convenience methods for running processes and 
+    Wraps a 'model.Job' with convenience methods for running processes and
     state management.
     """
     def __init__( self, job, queue ):
@@ -329,15 +329,15 @@
         self.output_paths = None
         self.tool_provided_job_metadata = None
         # Wrapper holding the info required to restore and clean up from files used for setting metadata externally
-        self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job )
-        
+        self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job )
+
     def get_job( self ):
         return self.sa_session.query( model.Job ).get( self.job_id )
-    
+
     def get_id_tag(self):
         # For compatability with drmaa, which uses job_id right now, and TaskWrapper
         return str(self.job_id)
-    
+
     def get_param_dict( self ):
         """
         Restore the dictionary of parameters from the database.
@@ -346,10 +346,10 @@
         param_dict = dict( [ ( p.name, p.value ) for p in job.parameters ] )
         param_dict = self.tool.params_from_strings( param_dict, self.app )
         return param_dict
-    
+
     def get_version_string_path( self ):
         return os.path.abspath(os.path.join(self.app.config.new_file_path, "GALAXY_VERSION_STRING_%s" % self.job_id))
-    
+
     def prepare( self ):
         """
         Prepare the job to run by creating the working directory and the
@@ -371,9 +371,9 @@
         out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
         inp_data.update( [ ( da.name, da.dataset ) for da in job.input_library_datasets ] )
         out_data.update( [ ( da.name, da.dataset ) for da in job.output_library_datasets ] )
-        
-        # Set up output dataset association for export history jobs. Because job 
-        # uses a Dataset rather than an HDA or LDA, it's necessary to set up a 
+
+        # Set up output dataset association for export history jobs. Because job
+        # uses a Dataset rather than an HDA or LDA, it's necessary to set up a
         # fake dataset association that provides the needed attributes for
         # preparing a job.
         class FakeDatasetAssociation ( object ):
@@ -400,7 +400,7 @@
         # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
         self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
         # Run the before queue ("exec_before_job") hook
-        self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data, 
+        self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
                              out_data=out_data, tool=self.tool, param_dict=incoming)
         self.sa_session.flush()
         # Build any required config files
@@ -433,7 +433,7 @@
 
     def fail( self, message, exception=False ):
         """
-        Indicate job failure by setting state and message on all output 
+        Indicate job failure by setting state and message on all output
         datasets.
         """
         job = self.get_job()
@@ -479,7 +479,7 @@
         if self.tool:
             self.tool.job_failed( self, message, exception )
         self.cleanup()
-        
+
     def change_state( self, state, info = False ):
         job = self.get_job()
         self.sa_session.refresh( job )
@@ -509,12 +509,12 @@
         job.job_runner_external_id = external_id
         self.sa_session.add( job )
         self.sa_session.flush()
-        
+
     def finish( self, stdout, stderr ):
         """
-        Called to indicate that the associated command has been run. Updates 
+        Called to indicate that the associated command has been run. Updates
         the output datasets based on stderr and stdout from the command, and
-        the contents of the output files. 
+        the contents of the output files.
         """
         # default post job setup
         self.sa_session.expunge_all()
@@ -536,7 +536,7 @@
         if os.path.exists(version_filename):
             self.version_string = open(version_filename).read()
             os.unlink(version_filename)
-        
+
         if self.app.config.outputs_to_working_directory:
             for dataset_path in self.get_output_fnames():
                 try:
@@ -584,7 +584,7 @@
                     else:
                         # Security violation.
                         log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, self.working_directory ) )
-                
+
             dataset.blurb = 'done'
             dataset.peek = 'no peek'
             dataset.info = context['stdout'] + context['stderr']
@@ -599,7 +599,7 @@
                 dataset.init_meta( copy_from=dataset )
                 #if a dataset was copied, it won't appear in our dictionary:
                 #either use the metadata from originating output dataset, or call set_meta on the copies
-                #it would be quicker to just copy the metadata from the originating output dataset, 
+                #it would be quicker to just copy the metadata from the originating output dataset,
                 #but somewhat trickier (need to recurse up the copied_from tree), for now we'll call set_meta()
                 if not self.app.config.set_metadata_externally or \
                  ( not self.external_output_metadata.external_metadata_set_successfully( dataset, self.sa_session ) \
@@ -611,7 +611,7 @@
                     #load metadata from file
                    #we need to no longer allow metadata to be edited while the job is still running,
                    #since if it is edited, the metadata changed on the running output will no longer match
-                    #the metadata that was stored to disk for use via the external process, 
+                    #the metadata that was stored to disk for use via the external process,
                    #and the changes made by the user will be lost, without warning or notice
                    dataset.metadata.from_JSON_dict( self.external_output_metadata.get_output_filenames_by_dataset( dataset, self.sa_session ).filename_out )
                 try:
@@ -652,13 +652,13 @@
         # Flush all the dataset and job changes above. Dataset state changes
         # will now be seen by the user.
         self.sa_session.flush()
-        # Save stdout and stderr 
+        # Save stdout and stderr
         if len( stdout ) > 32768:
             log.error( "stdout for job %d is greater than 32K, only first part will be logged to database" % job.id )
             job.stdout = stdout[:32768]
         if len( stderr ) > 32768:
             log.error( "stderr for job %d is greater than 32K, only first part will be logged to database" % job.id )
-            job.stderr = stderr[:32768] 
+            job.stderr = stderr[:32768]
         # custom post process setup
         inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
         out_data = dict( [ ( da.name, da.dataset ) for da in job.output_datasets ] )
@@ -675,8 +675,8 @@
         # ( this used to be performed in the "exec_after_process" hook, but hooks are deprecated ).
         self.tool.exec_after_process( self.queue.app, inp_data, out_data, param_dict, job = job )
         # Call 'exec_after_process' hook
-        self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data, 
-                             out_data=out_data, param_dict=param_dict, 
+        self.tool.call_hook( 'exec_after_process', self.queue.app, inp_data=inp_data,
+                             out_data=out_data, param_dict=param_dict,
                              tool=self.tool, stdout=stdout, stderr=stderr )
         job.command_line = self.command_line
 
@@ -695,7 +695,7 @@
         self.sa_session.flush()
         log.debug( 'job %d ended' % self.job_id )
         self.cleanup()
-        
+
     def cleanup( self ):
         # remove temporary files
         try:
@@ -709,10 +709,10 @@
             galaxy.tools.imp_exp.JobImportHistoryArchiveWrapper( self.job_id ).cleanup_after_job( self.sa_session )
         except:
             log.exception( "Unable to cleanup job %d" % self.job_id )
-        
+
     def get_command_line( self ):
         return self.command_line
-    
+
     def get_session_id( self ):
         return self.session_id
 
@@ -865,7 +865,7 @@
     Should be refactored into a generalized executable unit wrapper parent, then jobs and tasks.
     """
     # Abstract this to be more useful for running tasks that *don't* necessarily compose a job.
-    
+
     def __init__(self, task, queue):
         super(TaskWrapper, self).__init__(task.job, queue)
         self.task_id = task.id
@@ -939,7 +939,7 @@
         # ( this used to be performed in the "exec_before_job" hook, but hooks are deprecated ).
         self.tool.exec_before_job( self.queue.app, inp_data, out_data, param_dict )
         # Run the before queue ("exec_before_job") hook
-        self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data, 
+        self.tool.call_hook( 'exec_before_job', self.queue.app, inp_data=inp_data,
                              out_data=out_data, tool=self.tool, param_dict=incoming)
         self.sa_session.flush()
         # Build any required config files
@@ -986,12 +986,12 @@
         task.state = state
         self.sa_session.add( task )
         self.sa_session.flush()
-        
+
     def get_state( self ):
         task = self.get_task()
         self.sa_session.refresh( task )
         return task.state
-    
+
     def set_runner( self, runner_url, external_id ):
         task = self.get_task()
         self.sa_session.refresh( task )
@@ -1000,15 +1000,15 @@
         # DBTODO Check task job_runner_stuff
         self.sa_session.add( task )
         self.sa_session.flush()
-        
+
     def finish( self, stdout, stderr ):
         # DBTODO integrate previous finish logic.
         # Simple finish for tasks. Just set the flag OK.
         log.debug( 'task %s for job %d ended' % (self.task_id, self.job_id) )
         """
-        Called to indicate that the associated command has been run. Updates 
+        Called to indicate that the associated command has been run. Updates
         the output datasets based on stderr and stdout from the command, and
-        the contents of the output files. 
+        the contents of the output files.
         """
         # default post job setup_external_metadata
         self.sa_session.expunge_all()
@@ -1025,7 +1025,7 @@
             task.state = task.states.ERROR
         else:
             task.state = task.states.OK
-        # Save stdout and stderr 
+        # Save stdout and stderr
         if len( stdout ) > 32768:
             log.error( "stdout for task %d is greater than 32K, only first part will be logged to database" % task.id )
             task.stdout = stdout[:32768]
@@ -1039,7 +1039,7 @@
 
     def cleanup( self ):
         # There is no task cleanup. The job cleans up for all tasks.
         pass
-    
+
     def get_command_line( self ):
         return self.command_line
@@ -1049,7 +1049,7 @@
     def get_output_file_id( self, file ):
         # There is no permanent output file for tasks.
         return None
-    
+
     def get_tool_provided_job_metadata( self ):
         # DBTODO Handle this as applicable for tasks.
         return None
@@ -1071,10 +1071,6 @@
     def setup_external_metadata( self, exec_dir = None, tmp_dir = None, dataset_files_path = None, config_root = None, datatypes_config = None, set_extension = True, **kwds ):
         # There is no metadata setting for tasks. This is handled after the merge, at the job level.
         return ""
-    
-    @property
-    def user( self ):
-        pass
 
 class DefaultJobDispatcher( object ):
     def __init__( self, app ):
@@ -1105,13 +1101,19 @@
                 runner = getattr( module, obj )
                 self.job_runners[name] = runner( self.app )
                 log.debug( 'Loaded job runner: %s' % display_name )
-        
+
     def put( self, job_wrapper ):
         try:
-            if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper):
-                runner_name = "tasks"
-                log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
-                self.job_runners[runner_name].put( job_wrapper )
+            if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None:
+                if isinstance(job_wrapper, TaskWrapper):
+                    #DBTODO Refactor
+                    runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0]
+                    log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) )
+                    self.job_runners[runner_name].put( job_wrapper )
+                else:
+                    runner_name = "tasks"
+                    log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
+                    self.job_runners[runner_name].put( job_wrapper )
             else:
                 runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0]
                 log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
@@ -1167,7 +1169,7 @@
         self.sleeper = Sleeper()
         self.running = True
         self.monitor_thread = threading.Thread( target=self.monitor )
-        self.monitor_thread.start() 
+        self.monitor_thread.start()
         log.info( "job stopper started" )
 
     def monitor( self ):

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
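
For readers skimming the diff, the functional core of this changeset is small: the stub `user` property is deleted from TaskWrapper, so `user` now resolves through normal attribute lookup to the property defined on the parent JobWrapper, and a task therefore reports the same user as the job it belongs to. The short Python sketch below illustrates only that lookup behaviour; the names FakeJob, JobWrapperSketch, and TaskWrapperSketch are invented for the example and are not the real Galaxy classes.

# Minimal sketch, with invented names, of why removing the stub property works:
# when a subclass defines no `user` attribute of its own, Python's attribute
# lookup falls back to the property on the base class, so the task sees its
# parent job's user.

class FakeJob(object):
    """Stand-in for a job record; just holds the submitting user."""
    def __init__(self, user):
        self.user = user

class JobWrapperSketch(object):
    def __init__(self, job):
        self.job = job

    @property
    def user(self):
        # Resolve the user from the wrapped job.
        return self.job.user

class TaskWrapperSketch(JobWrapperSketch):
    def __init__(self, task_id, job):
        # A task wraps the same job object as its parent wrapper.
        super(TaskWrapperSketch, self).__init__(job)
        self.task_id = task_id
    # No `user` property here: the old stub returned None and shadowed the
    # parent's property; with it removed, JobWrapperSketch.user is used.

job = FakeJob(user="dannon")
task = TaskWrapperSketch(task_id=1, job=job)
assert task.user == "dannon"  # inherited from the parent wrapper

The accompanying edit in DefaultJobDispatcher.put() appears to keep the same routing as before (a TaskWrapper still goes to the runner named in the tool's job_runner URL, and a parallelizable job still goes to the "tasks" runner), but it makes the branch explicit and logs tasks with their task_id.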