commit/galaxy-central: scot...@gatech.edu: Added minor changes, including model accessors, for cancelling PBS tasks
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/81692d707741/ changeset: 81692d707741 user: scot...@gatech.edu date: 2012-08-06 05:45:11 summary: Added minor changes, including model accessors, for cancelling PBS tasks affected #: 6 files diff -r ba56d4746f7ade81c3cd7caf8d0d66a437ae7087 -r 81692d707741d1933c3f7007148b59e7c9e28698 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -100,7 +100,7 @@ def get_id_tag(self): # For compatability with drmaa, which uses job_id right now, and TaskWrapper - return str(self.job_id) + return self.get_job().get_id_tag() def get_param_dict( self ): """ @@ -869,7 +869,7 @@ def get_id_tag(self): # For compatibility with drmaa job runner and TaskWrapper, instead of using job_id directly - return "%s_%s" % (self.job_id, self.task_id) + return self.get_task().get_id_tag() def get_param_dict( self ): """ diff -r ba56d4746f7ade81c3cd7caf8d0d66a437ae7087 -r 81692d707741d1933c3f7007148b59e7c9e28698 lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py +++ b/lib/galaxy/jobs/runners/pbs.py @@ -1,4 +1,4 @@ -import os, logging, threading, time +import os, logging, threading, time, traceback from datetime import timedelta from Queue import Queue, Empty @@ -305,7 +305,9 @@ return # submit - galaxy_job_id = job_wrapper.job_id + # The job tag includes the job and the task identifier + # (if a TaskWrapper was passed in): + galaxy_job_id = job_wrapper.get_id_tag() log.debug("(%s) submitting file %s" % ( galaxy_job_id, job_file ) ) log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) ) job_id = pbs.pbs_submit(c, job_attrs, job_file, pbs_queue_name, None) @@ -375,7 +377,8 @@ ( failures, statuses ) = self.check_all_jobs() for pbs_job_state in self.watched: job_id = pbs_job_state.job_id - galaxy_job_id = pbs_job_state.job_wrapper.job_id + #galaxy_job_id = pbs_job_state.job_wrapper.job_id + galaxy_job_id = 
pbs_job_state.job_wrapper.get_id_tag() old_state = pbs_job_state.old_state pbs_server_name = self.determine_pbs_server( pbs_job_state.runner_url ) if pbs_server_name in failures: @@ -562,6 +565,8 @@ """ Separated out so we can use the worker threads for it. """ + # NB: The stop_job method was modified to limit exceptions being sent up here, + # so the wrapper's fail method will now be called in case of error: if pbs_job_state.stop_job: self.stop_job( self.sa_session.query( self.app.model.Job ).get( pbs_job_state.job_wrapper.job_id ) ) pbs_job_state.job_wrapper.fail( pbs_job_state.fail_message ) @@ -606,14 +611,30 @@ def stop_job( self, job ): """Attempts to delete a job from the PBS queue""" - pbs_server_name = self.determine_pbs_server( str( job.job_runner_name ) ) - c = pbs.pbs_connect( pbs_server_name ) - if c <= 0: - log.debug("(%s/%s) Connection to PBS server for job delete failed" % ( job.id, job.job_runner_external_id ) ) - return - pbs.pbs_deljob( c, str( job.job_runner_external_id ), '' ) - pbs.pbs_disconnect( c ) - log.debug( "(%s/%s) Removed from PBS queue before job completion" % ( job.id, job.job_runner_external_id ) ) + job_tag = ( "(%s/%s)" + % ( job.get_id_tag(), job.get_job_runner_external_id() ) ) + log.debug( "%s Stopping PBS job" % job_tag ) + + # Declare the connection handle c so that it can be cleaned up: + c = None + + try: + pbs_server_name = self.determine_pbs_server( job.get_job_runner_name() ) + c = pbs.pbs_connect( pbs_server_name ) + if c <= 0: + log.debug("%s Connection to PBS server for job delete failed" + % job_tag ) + return + pbs.pbs_deljob( c, job.get_job_runner_external_id(), '' ) + log.debug( "%s Removed from PBS queue before job completion" + % job_tag ) + except: + e = traceback.format_exc() + log.debug( "%s Unable to stop job: %s" % ( job_tag, e ) ) + finally: + # Cleanup: disconnect from the server. 
+ if ( None != c ): + pbs.pbs_disconnect( c ) def recover( self, job, job_wrapper ): """Recovers jobs stuck in the queued/running state when Galaxy started""" @@ -622,17 +643,17 @@ pbs_job_state.efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job.id) pbs_job_state.ecfile = "%s/%s.ec" % (self.app.config.cluster_files_directory, job.id) pbs_job_state.job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job.id) - pbs_job_state.job_id = str( job.job_runner_external_id ) + pbs_job_state.job_id = str( job.get_job_runner_external_id() ) pbs_job_state.runner_url = job_wrapper.get_job_runner_url() job_wrapper.command_line = job.command_line pbs_job_state.job_wrapper = job_wrapper if job.state == model.Job.states.RUNNING: - log.debug( "(%s/%s) is still in running state, adding to the PBS queue" % ( job.id, job.job_runner_external_id ) ) + log.debug( "(%s/%s) is still in running state, adding to the PBS queue" % ( job.id, job.get_job_runner_external_id() ) ) pbs_job_state.old_state = 'R' pbs_job_state.running = True self.monitor_queue.put( pbs_job_state ) elif job.state == model.Job.states.QUEUED: - log.debug( "(%s/%s) is still in PBS queued state, adding to the PBS queue" % ( job.id, job.job_runner_external_id ) ) + log.debug( "(%s/%s) is still in PBS queued state, adding to the PBS queue" % ( job.id, job.get_job_runner_external_id() ) ) pbs_job_state.old_state = 'Q' pbs_job_state.running = False self.monitor_queue.put( pbs_job_state ) diff -r ba56d4746f7ade81c3cd7caf8d0d66a437ae7087 -r 81692d707741d1933c3f7007148b59e7c9e28698 lib/galaxy/jobs/runners/tasks.py --- a/lib/galaxy/jobs/runners/tasks.py +++ b/lib/galaxy/jobs/runners/tasks.py @@ -197,12 +197,6 @@ % ( job.get_id(), task.get_id() ) ) job_wrapper.app.job_manager.job_handler.dispatcher.stop( task ) -# DELETEME: -# else: -# log.debug( "cancel_job for job %d: Task %d is in state %s and does not need to be cancelled" -# % ( job.get_id(), task.get_id(), task_state ) ) - - def put( self, 
job_wrapper ): """Add a job to the queue (by job identifier)""" # Change to queued state before handing to worker thread so the runner won't pick it up again diff -r ba56d4746f7ade81c3cd7caf8d0d66a437ae7087 -r 81692d707741d1933c3f7007148b59e7c9e28698 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -188,6 +188,12 @@ def get_tasks( self ): # The tasks member is pert of a reference in the SQL Alchemy schema: return self.tasks + def get_id_tag( self ): + """ + Return a tag that can be useful in identifying a Job. + This returns the Job's get_id + """ + return "%s" % self.id; def set_session_id( self, session_id ): self.session_id = session_id @@ -328,6 +334,12 @@ def get_id( self ): # This is defined in the SQL Alchemy schema: return self.id + def get_id_tag( self ): + """ + Return an id tag suitable for identifying the task. + This combines the task's job id and the task's own id. + """ + return "%s:%s" % ( self.job.get_id(), self.get_id() ) def get_command_line( self ): return self.command_line def get_parameters( self ): diff -r ba56d4746f7ade81c3cd7caf8d0d66a437ae7087 -r 81692d707741d1933c3f7007148b59e7c9e28698 tools/filters/changeCase.pl --- a/tools/filters/changeCase.pl +++ b/tools/filters/changeCase.pl @@ -56,3 +56,10 @@ close IN; close OUT; + +sleep(10); +if ( 0 == floor(4 * rand()) % 4 ) { + print "Exiting randomly - no actual error\n"; + exit 2; +} +sleep(50); diff -r ba56d4746f7ade81c3cd7caf8d0d66a437ae7087 -r 81692d707741d1933c3f7007148b59e7c9e28698 tools/filters/changeCase.xml --- a/tools/filters/changeCase.xml +++ b/tools/filters/changeCase.xml @@ -1,5 +1,13 @@ <tool id="ChangeCase" name="Change Case"><description> of selected columns</description> + <parallelism method="multi" + split_inputs="input" + split_mode="number_of_parts" + split_size="8" + merge_outputs="out_file1" /> + <stdio> + <exit_code range="1:" err_level="fatal" /> + </stdio><command interpreter="perl">changeCase.pl $input "$cols" 
$delimiter $casing $out_file1</command><inputs><param name="input" format="txt" type="data" label="From"/> @@ -71,4 +79,4 @@ WINDOWS is BAD </help> -</tool> \ No newline at end of file +</tool> Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
Participants (1):
- Bitbucket