commit/galaxy-central: natefoo: Add control for deleting job files after job completion.
1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/changeset/5f044d41a8a7/
changeset:   5f044d41a8a7
user:        natefoo
date:        2011-12-06 18:41:10
summary:     Add control for deleting job files after job completion.
affected #:  5 files

diff -r c831a584e46e1995ab2b673139ffda42147bef8b -r 5f044d41a8a74af74f36d020206f9807c55f0d34 lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -80,6 +80,7 @@
         self.job_queue_cleanup_interval = int( kwargs.get("job_queue_cleanup_interval", "5") )
         self.cluster_files_directory = os.path.abspath( kwargs.get( "cluster_files_directory", "database/pbs" ) )
         self.job_working_directory = resolve_path( kwargs.get( "job_working_directory", "database/job_working_directory" ), self.root )
+        self.cleanup_job = kwargs.get( "cleanup_job", "always" )
         self.outputs_to_working_directory = string_as_bool( kwargs.get( 'outputs_to_working_directory', False ) )
         self.output_size_limit = int( kwargs.get( 'output_size_limit', 0 ) )
         self.job_walltime = kwargs.get( 'job_walltime', None )

diff -r c831a584e46e1995ab2b673139ffda42147bef8b -r 5f044d41a8a74af74f36d020206f9807c55f0d34 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -482,7 +482,8 @@
         # If the job was deleted, call tool specific fail actions (used for e.g. external metadata) and clean up
         if self.tool:
             self.tool.job_failed( self, message, exception )
-        self.cleanup()
+        if self.app.config.cleanup_job == 'always':
+            self.cleanup()
 
     def change_state( self, state, info = False ):
         job = self.get_job()
@@ -525,7 +526,8 @@
         job = self.get_job()
         # if the job was deleted, don't finish it
         if job.state == job.states.DELETED:
-            self.cleanup()
+            if self.app.config.cleanup_job in ( 'always', 'onsuccess' ):
+                self.cleanup()
             return
         elif job.state == job.states.ERROR:
             # Job was deleted by an administrator
@@ -698,7 +700,8 @@
             util.umask_fix_perms( path, self.app.config.umask, 0666, self.app.config.gid )
         self.sa_session.flush()
         log.debug( 'job %d ended' % self.job_id )
-        self.cleanup()
+        if self.app.config.cleanup_job in ( 'always', 'onsuccess' ):
+            self.cleanup()
 
     def cleanup( self ):
         # remove temporary files
@@ -1045,7 +1048,8 @@
         task = self.get_task()
         # if the job was deleted, don't finish it
         if task.state == task.states.DELETED:
-            self.cleanup()
+            if self.app.config.cleanup_job in ( 'always', 'onsuccess' ):
+                self.cleanup()
             return
         elif task.state == task.states.ERROR:
             # Job was deleted by an administrator

diff -r c831a584e46e1995ab2b673139ffda42147bef8b -r 5f044d41a8a74af74f36d020206f9807c55f0d34 lib/galaxy/jobs/runners/drmaa.py
--- a/lib/galaxy/jobs/runners/drmaa.py
+++ b/lib/galaxy/jobs/runners/drmaa.py
@@ -137,7 +137,8 @@
         # Check for deletion before we change state
         if job_wrapper.get_state() == model.Job.states.DELETED:
             log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.get_id_tag() )
-            job_wrapper.cleanup()
+            if self.app.config.cleanup_job in ( "always", "onsuccess" ):
+                job_wrapper.cleanup()
             return
 
         # Change to queued state immediately
@@ -168,8 +169,9 @@
         # job was deleted while we were preparing it
         if job_wrapper.get_state() == model.Job.states.DELETED:
             log.debug( "Job %s deleted by user before it entered the queue" % job_wrapper.get_id_tag() )
-            self.cleanup( ( ofile, efile, jt.remoteCommand ) )
-            job_wrapper.cleanup()
+            if self.app.config.cleanup_job in ( "always", "onsuccess" ):
+                self.cleanup( ( ofile, efile, jt.remoteCommand ) )
+                job_wrapper.cleanup()
             return
 
         # wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers.
@@ -288,7 +290,8 @@
             log.exception("Job wrapper finish method failed")
 
         # clean up the drm files
-        self.cleanup( ( ofile, efile, job_file ) )
+        if self.app.config.cleanup_job == "always" or ( not stderr and self.app.config.cleanup_job == "onsuccess" ):
+            self.cleanup( ( ofile, efile, job_file ) )
 
     def fail_job( self, drm_job_state ):
         """
@@ -296,13 +299,15 @@
         """
         self.stop_job( self.sa_session.query( self.app.model.Job ).get( drm_job_state.job_wrapper.job_id ) )
         drm_job_state.job_wrapper.fail( drm_job_state.fail_message )
-        self.cleanup( ( drm_job_state.ofile, drm_job_state.efile, drm_job_state.job_file ) )
+        if self.app.config.cleanup_job == "always":
+            self.cleanup( ( drm_job_state.ofile, drm_job_state.efile, drm_job_state.job_file ) )
 
     def cleanup( self, files ):
-        if not asbool( self.app.config.get( 'debug', False ) ):
-            for file in files:
-                if os.access( file, os.R_OK ):
-                    os.unlink( file )
+        for file in files:
+            try:
+                os.unlink( file )
+            except Exception, e:
+                log.warning( "Unable to cleanup: %s" % str( e ) )
 
     def put( self, job_wrapper ):
         """Add a job to the queue (by job identifier)"""

diff -r c831a584e46e1995ab2b673139ffda42147bef8b -r 5f044d41a8a74af74f36d020206f9807c55f0d34 lib/galaxy/jobs/runners/pbs.py
--- a/lib/galaxy/jobs/runners/pbs.py
+++ b/lib/galaxy/jobs/runners/pbs.py
@@ -214,7 +214,8 @@
         # Check for deletion before we change state
         if job_wrapper.get_state() == model.Job.states.DELETED:
             log.debug( "Job %s deleted by user before it entered the PBS queue" % job_wrapper.job_id )
-            job_wrapper.cleanup()
+            if self.app.config.cleanup_job in ( "always", "onsuccess" ):
+                job_wrapper.cleanup()
             return
 
         ( pbs_server_name, runner_url ) = self.determine_pbs_server( runner_url, rewrite = True )
@@ -277,8 +278,9 @@
         if job_wrapper.get_state() == model.Job.states.DELETED:
             log.debug( "Job %s deleted by user before it entered the PBS queue" % job_wrapper.job_id )
             pbs.pbs_disconnect(c)
-            self.cleanup( ( ofile, efile, job_file ) )
-            job_wrapper.cleanup()
+            if self.app.config.cleanup_job in ( "always", "onsuccess" ):
+                self.cleanup( ( ofile, efile, job_file ) )
+                job_wrapper.cleanup()
             return
 
         # submit
@@ -517,7 +519,8 @@
             pbs_job_state.job_wrapper.fail("Unable to finish job", exception=True)
 
         # clean up the pbs files
-        self.cleanup( ( ofile, efile, job_file ) )
+        if self.app.config.cleanup_job == "always" or ( not stderr and self.app.config.cleanup_job == "onsuccess" ):
+            self.cleanup( ( ofile, efile, job_file ) )
 
     def fail_job( self, pbs_job_state ):
         """
@@ -526,13 +529,15 @@
         """
         if pbs_job_state.stop_job:
             self.stop_job( self.sa_session.query( self.app.model.Job ).get( pbs_job_state.job_wrapper.job_id ) )
         pbs_job_state.job_wrapper.fail( pbs_job_state.fail_message )
-        self.cleanup( ( pbs_job_state.ofile, pbs_job_state.efile, pbs_job_state.job_file ) )
+        if self.app.config.cleanup_job == "always":
+            self.cleanup( ( pbs_job_state.ofile, pbs_job_state.efile, pbs_job_state.job_file ) )
 
     def cleanup( self, files ):
-        if not asbool( self.app.config.get( 'debug', False ) ):
-            for file in files:
-                if os.access( file, os.R_OK ):
-                    os.unlink( file )
+        for file in files:
+            try:
+                os.unlink( file )
+            except Exception, e:
+                log.warning( "Unable to cleanup: %s" % str( e ) )
 
     def put( self, job_wrapper ):
         """Add a job to the queue (by job identifier)"""

diff -r c831a584e46e1995ab2b673139ffda42147bef8b -r 5f044d41a8a74af74f36d020206f9807c55f0d34 universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample
+++ b/universe_wsgi.ini.sample
@@ -514,6 +514,12 @@
 # Galaxy server after the job completes.
 #outputs_to_working_directory = False
 
+# Clean up various bits of jobs left on the filesystem after completion. These
+# bits include the job working directory, external metadata temporary files,
+# and DRM stdout and stderr files (if using a DRM). Possible values are:
+# always, onsuccess, never
+#cleanup_job = always
+
 # Number of concurrent jobs to run (local job runner)
 #local_job_queue_workers = 5
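The same three-valued check recurs at every cleanup site above, so it may help review to see it collapsed into a single predicate. This sketch is not part of the changeset, which inlines the checks at each call site; should_cleanup and job_failed are hypothetical names used only for illustration:

# Hypothetical summary of the gating added in this changeset; the commit
# itself repeats these checks inline rather than calling a helper.
def should_cleanup( cleanup_job, job_failed ):
    # 'always': remove job files no matter how the job ended.
    if cleanup_job == 'always':
        return True
    # 'onsuccess': remove job files only when the job did not fail
    # (in the runners, failure is detected via collected stderr).
    if cleanup_job == 'onsuccess' and not job_failed:
        return True
    # 'never' (or any unrecognized value): leave everything on disk.
    return False

assert should_cleanup( 'always', job_failed=True )
assert not should_cleanup( 'onsuccess', job_failed=True )
assert should_cleanup( 'onsuccess', job_failed=False )
assert not should_cleanup( 'never', job_failed=False )

Note that the deleted-job paths treat a user-deleted job like a success for cleanup purposes: files are removed under both 'always' and 'onsuccess'.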
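On the deployment side, an administrator who wants failed jobs left on disk for debugging would uncomment the new option in universe_wsgi.ini and change it from the default. The value shown here is an illustration, not the shipped default:

# Keep the job working directory, external metadata temporary files, and
# DRM stdout/stderr when a job fails; clean up after successful jobs only.
cleanup_job = onsuccess

With 'never', nothing is removed regardless of outcome, so job files accumulate until deleted by hand.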
Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this
because you have the service enabled, addressing the recipient of this
email.