1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/changeset/ed01365064b2/
changeset:   ed01365064b2
user:        dannon
date:        2012-07-21 20:46:27
summary:     Task runner cleanup.
affected #:  1 file

diff -r 4c22a2fbf25a8ee05d870dcebe9f7b729fa22f07 -r ed01365064b2644531c783f01a12b28384fd64ad lib/galaxy/jobs/runners/tasks.py
--- a/lib/galaxy/jobs/runners/tasks.py
+++ b/lib/galaxy/jobs/runners/tasks.py
@@ -59,26 +59,22 @@
         # If we were able to get a command line, run the job.  ( must be passed to tasks )
         if command_line:
             try:
-                # DBTODO read tool info and use the right kind of parallelism.
-                # For now, the only splitter is the 'basic' one
                 job_wrapper.change_state( model.Job.states.RUNNING )
                 self.sa_session.flush()
                 # Split with the tool-defined method.
                 try:
                     splitter = getattr(__import__('galaxy.jobs.splitters', globals(), locals(), [job_wrapper.tool.parallelism.method]), job_wrapper.tool.parallelism.method)
-                except:
+                except:
                     job_wrapper.change_state( model.Job.states.ERROR )
                     job_wrapper.fail("Job Splitting Failed, no match for '%s'" % job_wrapper.tool.parallelism)
                     return
                 tasks = splitter.do_split(job_wrapper)
-                # Not an option for now.  Task objects don't *do* anything useful yet, but we'll want them tracked outside this thread to do anything.
                 # if track_tasks_in_database:
                 task_wrappers = []
                 for task in tasks:
                     self.sa_session.add(task)
                 self.sa_session.flush()
-                # Must flush prior to the creation and queueing of task wrappers.
                 for task in tasks:
                     tw = TaskWrapper(task, job_wrapper.queue)
@@ -108,14 +104,9 @@
                         sleep( sleep_time )
                         if sleep_time < 8:
                             sleep_time *= 2
-
-                import time
-
                 job_wrapper.reclaim_ownership()      # if running as the actual user, change ownership before merging.
-
                 log.debug('execution finished - beginning merge: %s' % command_line)
                 stdout, stderr = splitter.do_merge(job_wrapper, task_wrappers)
-
             except Exception:
                 job_wrapper.fail( "failure running job", exception=True )
                 log.exception("failure running job %d" % job_wrapper.job_id)
@@ -129,15 +120,15 @@
                                                      set_extension = True,
                                                      kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
                 log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) )
-                external_metadata_proc = subprocess.Popen( args = external_metadata_script, 
-                                                           shell = True, 
+                external_metadata_proc = subprocess.Popen( args = external_metadata_script,
+                                                           shell = True,
                                                            env = os.environ,
                                                            preexec_fn = os.setpgrp )
                 job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session )
                 external_metadata_proc.wait()
                 log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id )
-            
-            # Finish the job
+
+            # Finish the job
             try:
                 job_wrapper.finish( stdout, stderr )
             except:
@@ -149,7 +140,7 @@
         # Change to queued state before handing to worker thread so the runner won't pick it up again
         job_wrapper.change_state( model.Job.states.QUEUED )
         self.queue.put( job_wrapper )
-    
+
     def shutdown( self ):
         """Attempts to gracefully shut down the worker threads"""
         log.info( "sending stop signal to worker threads" )
@@ -175,14 +166,13 @@
         # runner because the task runner also starts all the tasks.
         # First, get the list of tasks from job.tasks, which uses SQL
         # alchemy to retrieve a job's list of tasks.
-        tasks = job.tasks
         if ( len( job.tasks ) > 0 ):
             for task in job.tasks:
                 self.stop_pid( task.task_runner_external_id, job.id )
         # There were no subtasks, so just kill the job.  We'll touch
         # this if the tasks runner is used but the tool does not use
-        # parallelism. 
+        # parallelism.
         else:
             #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
             if job.external_output_metadata:
@@ -197,7 +187,7 @@
     def stop_pid( self, pid, job_id ):
         """
         This method stops the given process id whether it's a task or job.
-        It is meant to be a private helper method, but it is mostly reusable. 
+        It is meant to be a private helper method, but it is mostly reusable.
         The first argument is the process id to stop, and the second id is the
         job's id (which is used for logging messages only right now).
         """
@@ -210,12 +200,12 @@
         try:
             os.killpg( pid, sig )
         except OSError, e:
-            # This warning could be bogus; many tasks are stopped with 
-            # SIGTERM (signal 15), but ymmv depending on the platform. 
+            # This warning could be bogus; many tasks are stopped with
+            # SIGTERM (signal 15), but ymmv depending on the platform.
             log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job_id, errno.errorcode[e.errno], sig, pid, e.strerror ) )
             return
-        # TODO: If we're stopping lots of tasks, then we will want to put this
-        # avoid a two-second overhead using some other asynchronous method.
+        # TODO: If we're stopping lots of tasks, then we will want to put this
+        # avoid a two-second overhead using some other asynchronous method.
         sleep( 2 )
         if not self.check_pid( pid ):
             log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" %( job_id, pid, sig ) )

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
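
The splitter lookup in the first hunk resolves a submodule of
galaxy.jobs.splitters by name at runtime via __import__/getattr. A
minimal standalone sketch of that pattern follows; 'myapp.splitters' is
a hypothetical stand-in, since galaxy.jobs.splitters only resolves
inside a Galaxy checkout, and the error handling shown is illustrative
rather than Galaxy's actual failure path:

    # Sketch only: assumes a package 'myapp.splitters' with one module
    # per parallelism method (e.g. myapp/splitters/basic.py exposing
    # do_split/do_merge, as the splitters in the diff do).
    def resolve_splitter(method_name):
        # With a non-empty fromlist, __import__ returns the
        # 'myapp.splitters' package itself and imports the named
        # submodule, binding it as an attribute of the package.
        package = __import__('myapp.splitters', globals(), locals(), [method_name])
        # An AttributeError here corresponds to the bare except in the
        # diff: no splitter module matches the tool's parallelism method.
        return getattr(package, method_name)

    # Usage (hypothetical):
    # splitter = resolve_splitter('basic')
    # tasks = splitter.do_split(job_wrapper)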