# HG changeset patch -- Bitbucket.org
# Project galaxy-dist
# URL http://bitbucket.org/galaxy/galaxy-dist/overview
# User Nate Coraor <nate(a)bx.psu.edu>
# Date 1279556993 14400
# Node ID 28e629ad59490af90a28bc45073a7cfc10ed646c
# Parent 0f5eb93a7d61478565d63563bddcbffc6b6a59d9
Upgrade pbs_python to 4.1.0 and use PBS exit_status (if keep_completed is set) so we can
detect PBS failures. This is a reapplication of 3786:48432330228e, which was backed out
in a subsequent revision due to crashes experienced in pbs_python 2.9.8.
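
Illustrative note (not part of the patch): with keep_completed set on the PBS server, finished jobs remain visible in state "C" and expose an exit_status attribute, which the runner change below maps through a table copied from pbs' job.h. A minimal standalone sketch of that check, using only pbs_python calls that appear in this diff and assuming the usual pbs_python statjob result shape (job.name plus an attribs list of name/value pairs), might look like:

# Standalone sketch only -- assumes a TORQUE/PBS server with keep_completed set
# and pbs_python 4.1.0 installed; the attribute handling below is a typical
# pbs_python usage pattern, not code from this patch.
import pbs

# Subset of the exit codes from pbs' job.h (see JOB_EXIT_STATUS in the diff).
JOB_EXIT_STATUS = { 0: "job exec successful", -8: "exec() of user command failed" }

def report_completed_jobs( pbs_server_name ):
    c = pbs.pbs_connect( pbs_server_name )
    if c <= 0:
        errno, text = pbs.error()
        raise Exception( "connection to %s failed: %s: %s" % ( pbs_server_name, errno, text ) )
    # Request only the job state and exit status attributes.
    attrl = pbs.new_attrl( 2 )
    attrl[0].name = pbs.ATTR_state
    attrl[1].name = pbs.ATTR_exitstat
    jobs = pbs.pbs_statjob( c, None, attrl, None )
    pbs.pbs_disconnect( c )
    for job in jobs:
        # Each statjob entry carries a name (job id) and a list of attribs.
        attrs = dict( [ ( a.name, a.value ) for a in job.attribs ] )
        if attrs.get( 'job_state' ) == 'C' and 'exit_status' in attrs:
            code = int( attrs['exit_status'] )
            print job.name, JOB_EXIT_STATUS.get( code, 'Unknown error: %s' % code )
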
--- a/eggs.ini
+++ b/eggs.ini
@@ -17,7 +17,7 @@ Cheetah = 2.2.2
 DRMAA_python = 0.2
 MySQL_python = 1.2.3c1
 numpy = 1.3.0
-pbs_python = 2.9.4
+pbs_python = 4.1.0
 psycopg2 = 2.0.13
 pycrypto = 2.0.1
 pysam = 0.1.1
--- a/lib/galaxy/jobs/runners/pbs.py
+++ b/lib/galaxy/jobs/runners/pbs.py
@@ -50,6 +50,19 @@ cd %s
 %s
 """
+# From pbs' job.h
+JOB_EXIT_STATUS = {
+    0: "job exec successful",
+    -1: "job exec failed, before files, no retry",
+    -2: "job exec failed, after files, no retry",
+    -3: "job execution failed, do retry",
+    -4: "job aborted on MOM initialization",
+    -5: "job aborted on MOM init, chkpt, no migrate",
+    -6: "job aborted on MOM init, chkpt, ok migrate",
+    -7: "job restart failed",
+    -8: "exec() of user command failed",
+}
+
 class PBSJobState( object ):
     def __init__( self ):
         """
@@ -65,6 +78,7 @@ class PBSJobState( object ):
         self.efile = None
         self.runner_url = None
         self.check_count = 0
+        self.stop_job = False
 class PBSJobRunner( object ):
     """
@@ -193,8 +207,9 @@ class PBSJobRunner( object ):
         pbs_options = self.determine_pbs_options( runner_url )
         c = pbs.pbs_connect( pbs_server_name )
         if c <= 0:
+            errno, text = pbs.error()
             job_wrapper.fail( "Unable to queue job for execution. Resubmitting the job may succeed." )
-            log.error( "Connection to PBS server for submit failed" )
+            log.error( "Connection to PBS server for submit failed: %s: %s" % ( errno, text ) )
             return
         # define job attributes
@@ -336,58 +351,78 @@ class PBSJobRunner( object ):
log.debug( "(%s/%s) Skipping state check because PBS server
connection failed" % ( galaxy_job_id, job_id ) )
new_watched.append( pbs_job_state )
continue
- if statuses.has_key( job_id ):
+ try:
status = statuses[job_id]
- if status.job_state != old_state:
- log.debug("(%s/%s) job state changed from %s to %s" % (
galaxy_job_id, job_id, old_state, status.job_state ) )
- if status.job_state == "R" and not pbs_job_state.running:
- pbs_job_state.running = True
- pbs_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
- if status.job_state == "R" and ( pbs_job_state.check_count % 20
) == 0:
- # Every 20th time the job status is checked, do limit checks (if
configured)
- if self.app.config.output_size_limit > 0:
- # Check the size of the job outputs
- fail = False
- for outfile, size in
pbs_job_state.job_wrapper.check_output_sizes():
- if size > self.app.config.output_size_limit:
- pbs_job_state.fail_message = 'Job output grew too
large (greater than %s), please try different job parameters or' \
- % nice_size( self.app.config.output_size_limit )
- log.warning( '(%s/%s) Dequeueing job due to output %s
growing larger than %s limit' \
- % ( galaxy_job_id, job_id, os.path.basename( outfile
), nice_size( self.app.config.output_size_limit ) ) )
- self.work_queue.put( ( 'fail', pbs_job_state ) )
- fail = True
- break
- if fail:
- continue
- if self.job_walltime is not None:
- # Check the job's execution time
- if status.get( 'resources_used', False ):
- # resources_used may not be in the status for new jobs
- h, m, s = [ int( i ) for i in
status.resources_used.walltime.split( ':' ) ]
- time_executing = timedelta( 0, s, 0, 0, m, h )
- if time_executing > self.job_walltime:
- pbs_job_state.fail_message = 'Job ran longer than
maximum allowed execution time (%s), please try different job parameters or' \
- % self.app.config.job_walltime
- log.warning( '(%s/%s) Dequeueing job since walltime
has been reached' \
- % ( galaxy_job_id, job_id ) )
- self.work_queue.put( ( 'fail', pbs_job_state ) )
- continue
- pbs_job_state.old_state = status.job_state
- new_watched.append( pbs_job_state )
- else:
+ except KeyError:
try:
- # recheck to make sure it wasn't a communication problem
+ # Recheck to make sure it wasn't a communication problem
self.check_single_job( pbs_server_name, job_id )
- log.warning( "(%s/%s) job was not in state check list, but was
found with individual state check" % ( galaxy_job_id, job_id ) )
+ log.warning( "(%s/%s) PBS job was not in state check list, but
was found with individual state check" % ( galaxy_job_id, job_id ) )
new_watched.append( pbs_job_state )
except:
errno, text = pbs.error()
- if errno != 15001:
- log.info("(%s/%s) state check resulted in error (%d):
%s" % (galaxy_job_id, job_id, errno, text) )
+ if errno == 15001:
+ # 15001 == job not in queue
+ log.debug("(%s/%s) PBS job has left queue" %
(galaxy_job_id, job_id) )
+ self.work_queue.put( ( 'finish', pbs_job_state ) )
+ else:
+ # Unhandled error, continue to monitor
+ log.info("(%s/%s) PBS state check resulted in error (%d):
%s" % (galaxy_job_id, job_id, errno, text) )
new_watched.append( pbs_job_state )
- else:
- log.debug("(%s/%s) job has left queue" %
(galaxy_job_id, job_id) )
- self.work_queue.put( ( 'finish', pbs_job_state ) )
+ continue
+ if status.job_state != old_state:
+ log.debug("(%s/%s) PBS job state changed from %s to %s" % (
galaxy_job_id, job_id, old_state, status.job_state ) )
+ if status.job_state == "R" and not pbs_job_state.running:
+ pbs_job_state.running = True
+ pbs_job_state.job_wrapper.change_state( model.Job.states.RUNNING )
+ if status.job_state == "R" and ( pbs_job_state.check_count % 20 )
== 0:
+ # Every 20th time the job status is checked, do limit checks (if
configured)
+ if self.app.config.output_size_limit > 0:
+ # Check the size of the job outputs
+ fail = False
+ for outfile, size in pbs_job_state.job_wrapper.check_output_sizes():
+ if size > self.app.config.output_size_limit:
+ pbs_job_state.fail_message = 'Job output grew too large
(greater than %s), please try different job parameters or' \
+ % nice_size( self.app.config.output_size_limit )
+ log.warning( '(%s/%s) Dequeueing job due to output %s
growing larger than %s limit' \
+ % ( galaxy_job_id, job_id, os.path.basename( outfile ),
nice_size( self.app.config.output_size_limit ) ) )
+ pbs_job_state.stop_job = True
+ self.work_queue.put( ( 'fail', pbs_job_state ) )
+ fail = True
+ break
+ if fail:
+ continue
+ if self.job_walltime is not None:
+ # Check the job's execution time
+ if status.get( 'resources_used', False ):
+ # resources_used may not be in the status for new jobs
+ h, m, s = [ int( i ) for i in
status.resources_used.walltime.split( ':' ) ]
+ time_executing = timedelta( 0, s, 0, 0, m, h )
+ if time_executing > self.job_walltime:
+ pbs_job_state.fail_message = 'Job ran longer than maximum
allowed execution time (%s), please try different job parameters or' \
+ % self.app.config.job_walltime
+ log.warning( '(%s/%s) Dequeueing job since walltime has
been reached' \
+ % ( galaxy_job_id, job_id ) )
+ pbs_job_state.stop_job = True
+ self.work_queue.put( ( 'fail', pbs_job_state ) )
+ continue
+ elif status.job_state == "C":
+ # "keep_completed" is enabled in PBS, so try to check exit
status
+ try:
+ assert int( status.exit_status ) == 0
+ log.debug("(%s/%s) PBS job has completed successfully" % (
galaxy_job_id, job_id ) )
+ except AssertionError:
+ pbs_job_state.fail_message = 'Job cannot be completed due to a
cluster error. Please retry or'
+ log.error( '(%s/%s) PBS job failed: %s' % ( galaxy_job_id,
job_id, JOB_EXIT_STATUS.get( int( status.exit_status ), 'Unknown error: %s' %
status.exit_status ) ) )
+ self.work_queue.put( ( 'fail', pbs_job_state ) )
+ continue
+ except AttributeError:
+ # No exit_status, can't verify proper completion so we just have
to assume success.
+ log.debug("(%s/%s) PBS job has completed" % (
galaxy_job_id, job_id ) )
+ self.work_queue.put( ( 'finish', pbs_job_state ) )
+ continue
+ pbs_job_state.old_state = status.job_state
+ new_watched.append( pbs_job_state )
# Replace the watch list with the updated version
self.watched = new_watched
@@ -411,9 +446,10 @@ class PBSJobRunner( object ):
log.debug("connection to PBS server %s for state check failed"
% pbs_server_name )
failures.append( pbs_server_name )
continue
- stat_attrl = pbs.new_attrl(2)
+ stat_attrl = pbs.new_attrl(3)
stat_attrl[0].name = pbs.ATTR_state
stat_attrl[1].name = pbs.ATTR_used
+ stat_attrl[2].name = pbs.ATTR_exitstat
jobs = pbs.pbs_statjob( c, None, stat_attrl, None )
pbs.pbs_disconnect( c )
statuses.update( self.convert_statjob_to_bunches( jobs ) )
@@ -480,7 +516,8 @@ class PBSJobRunner( object ):
"""
Seperated out so we can use the worker threads for it.
"""
- self.stop_job( self.sa_session.query( self.app.model.Job ).get(
pbs_job_state.job_wrapper.job_id ) )
+ if pbs_job_state.stop_job:
+ self.stop_job( self.sa_session.query( self.app.model.Job ).get(
pbs_job_state.job_wrapper.job_id ) )
pbs_job_state.job_wrapper.fail( pbs_job_state.fail_message )
self.cleanup( ( pbs_job_state.ofile, pbs_job_state.efile, pbs_job_state.job_file
) )
--- a/scripts/scramble/scripts/pbs_python.py
+++ b/scripts/scramble/scripts/pbs_python.py
@@ -29,8 +29,8 @@ if not os.path.exists( 'setup.py.orig' )
 i = open( 'setup.py.orig', 'r' )
 o = open( 'setup.py', 'w' )
 for line in i.readlines():
-    if line == " version = '2.9.0',\n":
-        line = " version = '2.9.4',\n"
+    if line == " version = '4.0.0',\n":
+        line = " version = '4.1.0',\n"
     print >>o, line,
 i.close()
 o.close()