[hg] galaxy 1696: Make the PBS script/stdout/stderr directory configurable.
details: http://www.bx.psu.edu/hg/galaxy/rev/aac0403bbe68 changeset: 1696:aac0403bbe68 user: Nate Coraor <nate@bx.psu.edu> date: Fri Jan 09 17:08:49 2009 -0500 description: Make the PBS script/stdout/stderr directory configurable. 3 file(s) affected in this change: lib/galaxy/config.py lib/galaxy/jobs/__init__.py lib/galaxy/jobs/runners/pbs.py diffs (93 lines): diff -r 94df4d059b19 -r aac0403bbe68 lib/galaxy/config.py --- a/lib/galaxy/config.py Fri Jan 09 15:59:29 2009 -0500 +++ b/lib/galaxy/config.py Fri Jan 09 17:08:49 2009 -0500 @@ -44,6 +44,7 @@ self.cluster_job_queue_workers = int( kwargs.get( "cluster_job_queue_workers", "5" ) ) self.job_scheduler_policy = kwargs.get("job_scheduler_policy", "FIFO") self.job_queue_cleanup_interval = int( kwargs.get("job_queue_cleanup_interval", "5") ) + self.cluster_files_directory = resolve_path( kwargs.get( "cluster_files_directory", "database/pbs" ), self.root ) self.job_working_directory = resolve_path( kwargs.get( "job_working_directory", "database/job_working_directory" ), self.root ) self.outputs_to_working_directory = string_as_bool( kwargs.get( 'outputs_to_working_directory', False ) ) self.admin_pass = kwargs.get('admin_pass',"galaxy") @@ -94,7 +95,7 @@ return default def check( self ): # Check that required directories exist - for path in self.root, self.file_path, self.tool_path, self.tool_data_path, self.template_path, self.job_working_directory: + for path in self.root, self.file_path, self.tool_path, self.tool_data_path, self.template_path, self.job_working_directory, self.cluster_files_directory: if not os.path.isdir( path ): raise ConfigurationError("Directory does not exist: %s" % path ) # Check that required files exist diff -r 94df4d059b19 -r aac0403bbe68 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py Fri Jan 09 15:59:29 2009 -0500 +++ b/lib/galaxy/jobs/__init__.py Fri Jan 09 17:08:49 2009 -0500 @@ -1,4 +1,4 @@ -import logging, threading, sys, os, time, subprocess, string, tempfile, re, 
traceback +import logging, threading, sys, os, time, subprocess, string, tempfile, re, traceback, shutil from galaxy import util, model from galaxy.model import mapping @@ -396,6 +396,13 @@ # if the job was deleted, don't fail it if not job.state == model.Job.states.DELETED: for dataset_assoc in job.output_datasets: + if self.app.config.outputs_to_working_directory: + false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % dataset_assoc.dataset.id ) ) + if os.path.exists( false_path ): + shutil.move( false_path, dataset_assoc.dataset.file_name ) + log.debug( "fail(): Moved %s to %s" % ( false_path, dataset_assoc.dataset.file_name ) ) + else: + log.warning( "fail(): Missing output file in working directory: %s" % false_path ) dataset = dataset_assoc.dataset dataset.refresh() dataset.state = dataset.states.ERROR @@ -474,10 +481,10 @@ if self.app.config.outputs_to_working_directory: false_path = os.path.abspath( os.path.join( self.working_directory, "galaxy_dataset_%d.dat" % dataset_assoc.dataset.id ) ) if os.path.exists( false_path ): - os.rename( false_path, dataset_assoc.dataset.file_name ) - log.debug( "Moved %s to %s" % ( false_path, dataset_assoc.dataset.file_name ) ) + shutil.move( false_path, dataset_assoc.dataset.file_name ) + log.debug( "finish(): Moved %s to %s" % ( false_path, dataset_assoc.dataset.file_name ) ) else: - log.warning( "Missing output file in working directory: %s" % false_path ) + log.warning( "finish(): Missing output file in working directory: %s" % false_path ) for dataset in dataset_assoc.dataset.dataset.history_associations: #need to update all associated output hdas, i.e. 
history was shared with job running dataset.blurb = 'done' dataset.peek = 'no peek' diff -r 94df4d059b19 -r aac0403bbe68 lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py Fri Jan 09 15:59:29 2009 -0500 +++ b/lib/galaxy/jobs/runners/pbs.py Fri Jan 09 17:08:49 2009 -0500 @@ -166,8 +166,8 @@ raise Exception( "Connection to PBS server for submit failed" ) # define job attributes - ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job_wrapper.job_id) - efile = "%s/database/pbs/%s.e" % (os.getcwd(), job_wrapper.job_id) + ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job_wrapper.job_id) + efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job_wrapper.job_id) # If an application server is set, we're staging if self.app.config.pbs_application_server: @@ -201,7 +201,7 @@ script = pbs_symlink_template % (job_wrapper.galaxy_lib_dir, " ".join(job_wrapper.get_input_fnames() + job_wrapper.get_output_fnames()), self.app.config.pbs_stage_path, exec_dir, command_line) else: script = pbs_template % (job_wrapper.galaxy_lib_dir, exec_dir, command_line) - job_file = "%s/database/pbs/%s.sh" % (os.getcwd(), job_wrapper.job_id) + job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.job_id) fh = file(job_file, "w") fh.write(script) fh.close() @@ -436,9 +436,9 @@ def recover( self, job, job_wrapper ): """Recovers jobs stuck in the queued/running state when Galaxy started""" pbs_job_state = PBSJobState() - pbs_job_state.ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job.id) - pbs_job_state.efile = "%s/database/pbs/%s.e" % (os.getcwd(), job.id) - pbs_job_state.job_file = "%s/database/pbs/%s.sh" % (os.getcwd(), job.id) + pbs_job_state.ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job.id) + pbs_job_state.efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job.id) + pbs_job_state.job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job.id) pbs_job_state.job_id = str( 
job.job_runner_external_id ) pbs_job_state.runner_url = job_wrapper.tool.job_runner job_wrapper.command_line = job.command_line
participants (1)
-
Greg Von Kuster