6 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/96dd7b391ae4/ changeset: 96dd7b391ae4 user: jmchilton date: 2012-10-19 06:32:32 summary: Improved encapsulation of job splitting logic, setting the stage for implicit splitting. affected #: 2 files
diff -r 340438c62171578078323d39da398d5053b69d0a -r 96dd7b391ae478e82af14153495d61225bf55dcd lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -89,6 +89,10 @@ self.__user_system_pwent = None self.__galaxy_system_pwent = None
+ def can_split( self ): + # Should the job handler split this job up? + return self.app.config.use_tasked_jobs and self.tool.parallelism + def get_job_runner_url( self ): return self.job_runner_mapper.get_job_runner_url( self.params )
@@ -922,6 +926,11 @@ self.prepare_input_files_cmds = None self.status = task.states.NEW
+ def can_split( self ): + # Should the job handler split this job up? TaskWrapper should + # always return False as the job has already been split. + return False + def get_job( self ): if self.job_id: return self.sa_session.query( model.Job ).get( self.job_id )
diff -r 340438c62171578078323d39da398d5053b69d0a -r 96dd7b391ae478e82af14153495d61225bf55dcd lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -449,7 +449,7 @@ log.debug( 'Loaded job runner: %s' % display_name )
def __get_runner_name( self, job_wrapper ): - if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and not isinstance(job_wrapper, TaskWrapper): + if job_wrapper.can_split(): runner_name = "tasks" else: runner_name = ( job_wrapper.get_job_runner_url().split(":", 1) )[0] @@ -458,7 +458,7 @@ def put( self, job_wrapper ): try: runner_name = self.__get_runner_name( job_wrapper ) - if self.app.config.use_tasked_jobs and job_wrapper.tool.parallelism is not None and isinstance(job_wrapper, TaskWrapper): + if isinstance(job_wrapper, TaskWrapper): #DBTODO Refactor log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) ) else:
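The changeset moves the "should this job be split into tasks?" decision out of the handler and into the wrappers themselves. A minimal sketch of that pattern, using hypothetical stand-in classes rather than Galaxy's real JobWrapper/TaskWrapper:

    class JobWrapper(object):
        def __init__(self, use_tasked_jobs, parallelism):
            self.use_tasked_jobs = use_tasked_jobs  # stands in for app.config.use_tasked_jobs
            self.parallelism = parallelism          # stands in for tool.parallelism

        def can_split(self):
            # Split only when tasked jobs are enabled and a parallelism method is declared.
            return bool(self.use_tasked_jobs and self.parallelism)

    class TaskWrapper(JobWrapper):
        def can_split(self):
            # A task is already the product of a split, so never split again.
            return False

    def get_runner_name(job_wrapper, default_runner='local'):
        # Mirrors the simplified __get_runner_name above.
        return 'tasks' if job_wrapper.can_split() else default_runner

    assert get_runner_name(JobWrapper(True, 'multi')) == 'tasks'
    assert get_runner_name(TaskWrapper(True, 'multi')) == 'local'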
https://bitbucket.org/galaxy/galaxy-central/commits/4bafc5e59111/ changeset: 4bafc5e59111 user: jmchilton date: 2012-11-11 22:36:14 summary: Replace the access pattern 'job_wrapper.tool.parallelism' with 'job_wrapper.get_parallelism()' (a newly implemented method on JobWrapper) as a step toward enabling per-job parallelism (as opposed to per-tool parallelism). affected #: 4 files

diff -r 96dd7b391ae478e82af14153495d61225bf55dcd -r 4bafc5e59111cc2b8e374d2ab816a5c12d4ed459 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -96,6 +96,9 @@ def get_job_runner_url( self ): return self.job_runner_mapper.get_job_runner_url( self.params )
+ def get_parallelism(self): + return self.tool.parallelism + # legacy naming get_job_runner = get_job_runner_url
diff -r 96dd7b391ae478e82af14153495d61225bf55dcd -r 4bafc5e59111cc2b8e374d2ab816a5c12d4ed459 lib/galaxy/jobs/runners/tasks.py --- a/lib/galaxy/jobs/runners/tasks.py +++ b/lib/galaxy/jobs/runners/tasks.py @@ -71,12 +71,13 @@ try: job_wrapper.change_state( model.Job.states.RUNNING ) self.sa_session.flush() - # Split with the tool-defined method. + # Split with the defined method. + parallelism = job_wrapper.get_parallelism() try: - splitter = getattr(__import__('galaxy.jobs.splitters', globals(), locals(), [job_wrapper.tool.parallelism.method]), job_wrapper.tool.parallelism.method) + splitter = getattr(__import__('galaxy.jobs.splitters', globals(), locals(), [parallelism.method]), parallelism.method) except: job_wrapper.change_state( model.Job.states.ERROR ) - job_wrapper.fail("Job Splitting Failed, no match for '%s'" % job_wrapper.tool.parallelism) + job_wrapper.fail("Job Splitting Failed, no match for '%s'" % parallelism) return tasks = splitter.do_split(job_wrapper) # Not an option for now. Task objects don't *do* anything
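The splitter lookup in tasks.py now keys off the parallelism object returned by the wrapper instead of reaching through job_wrapper.tool. A rough equivalent of that dynamic lookup, written with importlib (Python 2.7+) purely as an illustration; the committed code uses __import__/getattr as shown above:

    import importlib

    def resolve_splitter(parallelism):
        # Galaxy ships splitter modules under galaxy.jobs.splitters
        # (e.g. 'basic' and 'multi'); parallelism.method names one of them.
        try:
            return importlib.import_module('galaxy.jobs.splitters.%s' % parallelism.method)
        except ImportError:
            # The runner above turns this case into job_wrapper.fail(...).
            raise ValueError("Job splitting failed, no match for '%s'" % parallelism.method)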
diff -r 96dd7b391ae478e82af14153495d61225bf55dcd -r 4bafc5e59111cc2b8e374d2ab816a5c12d4ed459 lib/galaxy/jobs/splitters/basic.py --- a/lib/galaxy/jobs/splitters/basic.py +++ b/lib/galaxy/jobs/splitters/basic.py @@ -5,8 +5,9 @@
def set_basic_defaults(job_wrapper): parent_job = job_wrapper.get_job() - job_wrapper.tool.parallelism.attributes['split_inputs'] = parent_job.input_datasets[0].name - job_wrapper.tool.parallelism.attributes['merge_outputs'] = job_wrapper.get_output_hdas_and_fnames().keys()[0] + parallelism = job_wrapper.get_parallelism() + parallelism.attributes['split_inputs'] = parent_job.input_datasets[0].name + parallelism.attributes['merge_outputs'] = job_wrapper.get_output_hdas_and_fnames().keys()[0]
def do_split (job_wrapper): if len(job_wrapper.get_input_fnames()) > 1 or len(job_wrapper.get_output_fnames()) > 1:
diff -r 96dd7b391ae478e82af14153495d61225bf55dcd -r 4bafc5e59111cc2b8e374d2ab816a5c12d4ed459 lib/galaxy/jobs/splitters/multi.py --- a/lib/galaxy/jobs/splitters/multi.py +++ b/lib/galaxy/jobs/splitters/multi.py @@ -8,7 +8,7 @@ parent_job = job_wrapper.get_job() working_directory = os.path.abspath(job_wrapper.working_directory)
- parallel_settings = job_wrapper.tool.parallelism.attributes + parallel_settings = job_wrapper.get_parallelism().attributes # Syntax: split_inputs="input1,input2" shared_inputs="genome" # Designates inputs to be split or shared split_inputs=parallel_settings.get("split_inputs") @@ -91,7 +91,7 @@
def do_merge( job_wrapper, task_wrappers): - parallel_settings = job_wrapper.tool.parallelism.attributes + parallel_settings = job_wrapper.get_parallelism().attributes # Syntax: merge_outputs="export" pickone_outputs="genomesize" # Designates outputs to be merged, or selected from as a representative merge_outputs = parallel_settings.get("merge_outputs")
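The multi splitter keeps the attribute syntax noted in the comments above (split_inputs="input1,input2" shared_inputs="genome", merge_outputs="export" pickone_outputs="genomesize"); only the object the settings are read from has changed. An illustrative parse of those comma-separated values (the sample values are made up):

    parallel_settings = {
        'split_inputs': 'input1,input2',
        'shared_inputs': 'genome',
        'merge_outputs': 'export',
    }

    def as_list(value):
        # Comma-separated attribute values name the inputs/outputs to act on.
        return [name.strip() for name in value.split(',')] if value else []

    assert as_list(parallel_settings.get('split_inputs')) == ['input1', 'input2']
    assert as_list(parallel_settings.get('shared_inputs')) == ['genome']
    assert as_list(parallel_settings.get('pickone_outputs')) == []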
https://bitbucket.org/galaxy/galaxy-central/commits/53d3d620a878/ changeset: 53d3d620a878 user: jmchilton date: 2012-11-15 16:19:08 summary: Rename ToolParallelismInfo to ParallelismInfo and move it to the jobs module to reflect the fact that per-job (instead of per-tool) parallelism is also a possibility (even if only in downstream Galaxy forks). Also allow building these objects from dictionaries (in addition to the traditional XML-based creation). affected #: 2 files
diff -r 4bafc5e59111cc2b8e374d2ab816a5c12d4ed459 -r 53d3d620a8783a7ac4de9360e3691853681ffac3 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -1153,3 +1153,20 @@ return def shutdown( self ): return + +class ParallelismInfo(object): + """ + Stores the information (if any) for running multiple instances of the tool in parallel + on the same set of inputs. + """ + def __init__(self, tag): + self.method = tag.get('method') + if isinstance(tag, dict): + items = tag.iteritems() + else: + items = tag.attrib.items() + self.attributes = dict([item for item in items if item[0] != 'method' ]) + if len(self.attributes) == 0: + # legacy basic mode - provide compatible defaults + self.attributes['split_size'] = 20 + self.attributes['split_mode'] = 'number_of_parts'
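Since ParallelismInfo now accepts either an XML element or a plain dictionary, per-job parallelism settings can be built without a <parallelism> tag. A usage sketch, assuming the class is importable as galaxy.jobs.ParallelismInfo and a Python 2 runtime (the class itself calls iteritems()):

    from xml.etree.ElementTree import fromstring
    from galaxy.jobs import ParallelismInfo

    # New dict-based construction (the per-job path):
    info = ParallelismInfo({'method': 'multi',
                            'split_inputs': 'input1',
                            'shared_inputs': 'genome'})
    assert info.method == 'multi'
    assert info.attributes == {'split_inputs': 'input1', 'shared_inputs': 'genome'}

    # Traditional XML-based construction; with no extra attributes the
    # legacy basic-mode defaults are filled in:
    info = ParallelismInfo(fromstring('<parallelism method="basic"/>'))
    assert info.attributes == {'split_size': 20, 'split_mode': 'number_of_parts'}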
diff -r 4bafc5e59111cc2b8e374d2ab816a5c12d4ed459 -r 53d3d620a8783a7ac4de9360e3691853681ffac3 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -15,6 +15,7 @@ from galaxy.util.bunch import Bunch from galaxy.util.template import fill_template from galaxy import util, jobs, model +from galaxy.jobs import ParallelismInfo from elementtree import ElementTree from parameters import * from parameters.grouping import * @@ -797,19 +798,6 @@ self.type = type self.version = version
-class ToolParallelismInfo(object): - """ - Stores the information (if any) for running multiple instances of the tool in parallel - on the same set of inputs. - """ - def __init__(self, tag): - self.method = tag.get('method') - self.attributes = dict([item for item in tag.attrib.items() if item[0] != 'method' ]) - if len(self.attributes) == 0: - # legacy basic mode - provide compatible defaults - self.attributes['split_size'] = 20 - self.attributes['split_mode'] = 'number_of_parts' - class Tool: """ Represents a computational tool that can be executed through Galaxy. @@ -989,7 +977,7 @@ # Parallelism for tasks, read from tool config. parallelism = root.find("parallelism") if parallelism is not None and parallelism.get("method"): - self.parallelism = ToolParallelismInfo(parallelism) + self.parallelism = ParallelismInfo(parallelism) else: self.parallelism = None # Set job handler(s). Each handler is a dict with 'url' and, optionally, 'params'.
https://bitbucket.org/galaxy/galaxy-central/commits/e67fb0786e41/ changeset: e67fb0786e41 user: jmchilton date: 2012-11-26 19:55:15 summary: Merge with latest galaxy-central and resolve a conflict introduced when pull request 82 was accepted. affected #: 154 files
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 README.txt --- a/README.txt +++ b/README.txt @@ -10,7 +10,7 @@ Galaxy requires Python 2.5, 2.6 or 2.7. To check your python version, run:
% python -V -Python 2.4.4 +Python 2.7.3
Start Galaxy:
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/datatypes/assembly.py --- a/lib/galaxy/datatypes/assembly.py +++ b/lib/galaxy/datatypes/assembly.py @@ -225,4 +225,3 @@ if __name__ == '__main__': import doctest, sys doctest.testmod(sys.modules[__name__]) -
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/datatypes/data.py --- a/lib/galaxy/datatypes/data.py +++ b/lib/galaxy/datatypes/data.py @@ -69,6 +69,9 @@ <class 'galaxy.datatypes.metadata.MetadataParameter'>
""" + #: dictionary of metadata fields for this datatype:: + metadata_spec = None + __metaclass__ = DataMeta # Add metadata elements MetadataElement( name="dbkey", desc="Database/Build", default="?", param=metadata.DBKeyParameter, multiple=False, no_value="?" ) @@ -849,4 +852,3 @@ except UnicodeDecodeError: text = "binary/unknown file" return text -
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/datatypes/metadata.py --- a/lib/galaxy/datatypes/metadata.py +++ b/lib/galaxy/datatypes/metadata.py @@ -123,6 +123,7 @@ def __getstate__( self ): return None #cannot pickle a weakref item (self._parent), when data._metadata_collection is None, it will be recreated on demand
+ class MetadataSpecCollection( odict ): """ A simple extension of dict which allows cleaner access to items @@ -132,13 +133,21 @@ """ def __init__( self, dict = None ): odict.__init__( self, dict = None ) + def append( self, item ): self[item.name] = item + def iter( self ): return self.itervalues() + def __getattr__( self, name ): return self.get( name )
+ def __repr__( self ): + # force elements to draw with __str__ for sphinx-apidoc + return ', '.join([ item.__str__() for item in self.iter() ]) + + class MetadataParameter( object ): def __init__( self, spec ): self.spec = spec @@ -185,7 +194,6 @@ """ pass
- def unwrap( self, form_value ): """ Turns a value into its storable form. @@ -205,19 +213,22 @@ Turns a value read from an external dict into its value to be pushed directly into the metadata dict. """ return value + def to_external_value( self, value ): """ Turns a value read from a metadata into its value to be pushed directly into the external dict. """ return value
+ class MetadataElementSpec( object ): """ Defines a metadata element and adds it to the metadata_spec (which is a MetadataSpecCollection) of datatype. """ - - def __init__( self, datatype, name=None, desc=None, param=MetadataParameter, default=None, no_value = None, visible=True, set_in_upload = False, **kwargs ): + def __init__( self, datatype, + name=None, desc=None, param=MetadataParameter, default=None, no_value = None, + visible=True, set_in_upload = False, **kwargs ): self.name = name self.desc = desc or name self.default = default @@ -226,24 +237,37 @@ self.set_in_upload = set_in_upload # Catch-all, allows for extra attributes to be set self.__dict__.update(kwargs) - #set up param last, as it uses values set above + # set up param last, as it uses values set above self.param = param( self ) - datatype.metadata_spec.append( self ) #add spec element to the spec + # add spec element to the spec + datatype.metadata_spec.append( self ) + def get( self, name, default=None ): return self.__dict__.get(name, default) + def wrap( self, value ): """ Turns a stored value into its usable form. """ return self.param.wrap( value ) + def unwrap( self, value ): """ Turns an incoming value into its storable form. """ return self.param.unwrap( value )
+ def __str__( self ): + #TODO??: assuming param is the class of this MetadataElementSpec - add the plain class name for that + spec_dict = dict( param_class=self.param.__class__.__name__ ) + spec_dict.update( self.__dict__ ) + return ( "{name} ({param_class}): {desc}, defaults to '{default}'".format( **spec_dict ) ) + +# create a statement class that, when called, +# will add a new MetadataElementSpec to a class's metadata_spec MetadataElement = Statement( MetadataElementSpec )
+ """ MetadataParameter sub-classes. """
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -252,6 +252,9 @@ # Update (non-library) job output datasets through the object store if dataset not in job.output_library_datasets: self.app.object_store.update_from_file(dataset.dataset, create=True) + # Pause any dependent jobs (and those jobs' outputs) + for dep_job_assoc in dataset.dependent_jobs: + self.pause( dep_job_assoc.job, "Execution of this dataset's job is paused because its input datasets are in an error state." ) self.sa_session.add( dataset ) self.sa_session.flush() job.state = job.states.ERROR @@ -282,6 +285,19 @@ if self.app.config.cleanup_job == 'always' or (self.app.config.cleanup_job == 'onsuccess' and job.state == job.states.DELETED): self.cleanup()
+ def pause( self, job=None, message=None ): + if job is None: + job = self.get_job() + if message is None: + message = "Execution of this dataset's job is paused" + if job.state == job.states.NEW: + for dataset_assoc in job.output_datasets + job.output_library_datasets: + dataset_assoc.dataset.dataset.state = dataset_assoc.dataset.dataset.states.PAUSED + dataset_assoc.dataset.info = message + self.sa_session.add( dataset_assoc.dataset ) + job.state = job.states.PAUSED + self.sa_session.add( job ) + def change_state( self, state, info = False ): job = self.get_job() self.sa_session.refresh( job ) @@ -444,6 +460,9 @@ log.debug( "setting dataset state to ERROR" ) # TODO: This is where the state is being set to error. Change it! dataset_assoc.dataset.dataset.state = model.Dataset.states.ERROR + # Pause any dependent jobs (and those jobs' outputs) + for dep_job_assoc in dataset_assoc.dataset.dependent_jobs: + self.pause( dep_job_assoc.job, "Execution of this dataset's job is paused because its input datasets are in an error state." ) else: dataset_assoc.dataset.dataset.state = model.Dataset.states.OK # If any of the rest of the finish method below raises an
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -17,6 +17,7 @@
# States for running a job. These are NOT the same as data states JOB_WAIT, JOB_ERROR, JOB_INPUT_ERROR, JOB_INPUT_DELETED, JOB_READY, JOB_DELETED, JOB_ADMIN_DELETED, JOB_USER_OVER_QUOTA = 'wait', 'error', 'input_error', 'input_deleted', 'ready', 'deleted', 'admin_deleted', 'user_over_quota' +DEFAULT_JOB_PUT_FAILURE_MESSAGE = 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.'
class JobHandler( object ): """ @@ -41,6 +42,7 @@ a JobRunner. """ STOP_SIGNAL = object() + def __init__( self, app, dispatcher ): """Start the job manager""" self.app = app @@ -193,6 +195,10 @@ elif job_state == JOB_USER_OVER_QUOTA: log.info( "(%d) User (%s) is over quota: job paused" % ( job.id, job.user_id ) ) job.state = model.Job.states.PAUSED + for dataset_assoc in job.output_datasets + job.output_library_datasets: + dataset_assoc.dataset.dataset.state = model.Dataset.states.PAUSED + dataset_assoc.dataset.info = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run" + self.sa_session.add( dataset_assoc.dataset.dataset ) self.sa_session.add( job ) else: log.error( "(%d) Job in unknown state '%s'" % ( job.id, job_state ) ) @@ -458,6 +464,15 @@ def put( self, job_wrapper ): try: runner_name = self.__get_runner_name( job_wrapper ) + except Exception, e: + failure_message = getattr(e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE ) + if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE: + log.exception( 'Failed to generate job runner name' ) + else: + log.debug( "Intentionally failing job with message (%s)" % failure_message ) + job_wrapper.fail( failure_message ) + return + try: if isinstance(job_wrapper, TaskWrapper): #DBTODO Refactor log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) ) @@ -466,7 +481,7 @@ self.job_runners[runner_name].put( job_wrapper ) except KeyError: log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) - job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' ) + job_wrapper.fail( DEFAULT_JOB_PUT_FAILURE_MESSAGE )
def stop( self, job ): """ @@ -508,7 +523,7 @@ self.job_runners[runner_name].recover( job, job_wrapper ) except KeyError: log.error( 'recover(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) ) - job_wrapper.fail( 'Unable to run job due to a misconfiguration of the Galaxy job running system. Please contact a site administrator.' ) + job_wrapper.fail( DEFAULT_JOB_PUT_FAILURE_MESSAGE )
def shutdown( self ): for runner in self.job_runners.itervalues():
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -8,6 +8,12 @@
DYNAMIC_RUNNER_PREFIX = "dynamic:///"
+class JobMappingException( Exception ): + + def __init__( self, failure_message ): + self.failure_message = failure_message + + class JobRunnerMapper( object ): """ This class is responsible to managing the mapping of jobs @@ -116,7 +122,7 @@
def __cache_job_runner_url( self, params ): # If there's already a runner set in the Job object, don't overwrite from the tool - if self.job_runner_name is not None: + if self.job_runner_name is not None and not self.job_runner_name.startswith('tasks'): raw_job_runner_url = self.job_runner_name else: raw_job_runner_url = self.job_wrapper.tool.get_job_runner_url( params )
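JobMappingException exists so that the code deciding where a job runs (for example a dynamic runner rule) can fail the job with a human-readable reason; the handler.py hunk above picks failure_message off the exception instead of logging the generic misconfiguration message. A hypothetical rule using it (only JobMappingException itself comes from this changeset):

    from galaxy.jobs.mapper import JobMappingException

    def choose_runner(user_email):
        # Hypothetical dynamic rule: refuse anonymous submissions with a
        # message the user will actually see on the failed dataset.
        if user_email is None:
            raise JobMappingException('Anonymous users may not run this tool.')
        return 'drmaa://-q long/'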
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/jobs/runners/cli.py --- a/lib/galaxy/jobs/runners/cli.py +++ b/lib/galaxy/jobs/runners/cli.py @@ -359,12 +359,16 @@
def recover( self, job, job_wrapper ): """Recovers jobs stuck in the queued/running state when Galaxy started""" + job_id = job.get_job_runner_external_id() + if job_id is None: + self.put( job_wrapper ) + return runner_job_state = RunnerJobState() runner_job_state.ofile = "%s.gjout" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag()) runner_job_state.efile = "%s.gjerr" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag()) runner_job_state.ecfile = "%s.gjec" % os.path.join(job_wrapper.working_directory, job_wrapper.get_id_tag()) runner_job_state.job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job_wrapper.get_id_tag()) - runner_job_state.external_job_id = str( job.job_runner_external_id ) + runner_job_state.external_job_id = str( job_id ) job_wrapper.command_line = job.command_line runner_job_state.job_wrapper = job_wrapper runner_job_state.runner_url = job.job_runner_name
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/jobs/runners/condor.py --- a/lib/galaxy/jobs/runners/condor.py +++ b/lib/galaxy/jobs/runners/condor.py @@ -368,11 +368,15 @@ def recover( self, job, job_wrapper ): """Recovers jobs stuck in the queued/running state when Galaxy started""" # TODO Check if we need any changes here + job_id = job.get_job_runner_external_id() + if job_id is None: + self.put( job_wrapper ) + return drm_job_state = CondorJobState() drm_job_state.ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job.id) drm_job_state.efile = "%s/database/pbs/%s.e" % (os.getcwd(), job.id) drm_job_state.job_file = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job.id) - drm_job_state.job_id = str( job.job_runner_external_id ) + drm_job_state.job_id = str( job_id ) drm_job_state.runner_url = job_wrapper.get_job_runner() job_wrapper.command_line = job.command_line drm_job_state.job_wrapper = job_wrapper
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/jobs/runners/drmaa.py --- a/lib/galaxy/jobs/runners/drmaa.py +++ b/lib/galaxy/jobs/runners/drmaa.py @@ -411,12 +411,16 @@
def recover( self, job, job_wrapper ): """Recovers jobs stuck in the queued/running state when Galaxy started""" + job_id = job.get_job_runner_external_id() + if job_id is None: + self.put( job_wrapper ) + return drm_job_state = DRMAAJobState() drm_job_state.ofile = "%s.drmout" % os.path.join(os.getcwd(), job_wrapper.working_directory, job_wrapper.get_id_tag()) drm_job_state.efile = "%s.drmerr" % os.path.join(os.getcwd(), job_wrapper.working_directory, job_wrapper.get_id_tag()) drm_job_state.ecfile = "%s.drmec" % os.path.join(os.getcwd(), job_wrapper.working_directory, job_wrapper.get_id_tag()) drm_job_state.job_file = "%s/galaxy_%s.sh" % (self.app.config.cluster_files_directory, job.get_id()) - drm_job_state.job_id = str( job.get_job_runner_external_id() ) + drm_job_state.job_id = str( job_id ) drm_job_state.runner_url = job_wrapper.get_job_runner_url() job_wrapper.command_line = job.get_command_line() drm_job_state.job_wrapper = job_wrapper
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py +++ b/lib/galaxy/jobs/runners/pbs.py @@ -640,12 +640,16 @@
def recover( self, job, job_wrapper ): """Recovers jobs stuck in the queued/running state when Galaxy started""" + job_id = job.get_job_runner_external_id() + if job_id is None: + self.put( job_wrapper ) + return pbs_job_state = PBSJobState() pbs_job_state.ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job.id) pbs_job_state.efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job.id) pbs_job_state.ecfile = "%s/%s.ec" % (self.app.config.cluster_files_directory, job.id) pbs_job_state.job_file = "%s/%s.sh" % (self.app.config.cluster_files_directory, job.id) - pbs_job_state.job_id = str( job.get_job_runner_external_id() ) + pbs_job_state.job_id = str( job_id ) pbs_job_state.runner_url = job_wrapper.get_job_runner_url() job_wrapper.command_line = job.command_line pbs_job_state.job_wrapper = job_wrapper
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/jobs/runners/sge.py --- a/lib/galaxy/jobs/runners/sge.py +++ /dev/null @@ -1,392 +0,0 @@ -import os, logging, threading, time -from Queue import Queue, Empty - -from galaxy import model -from galaxy.jobs.runners import BaseJobRunner - -from paste.deploy.converters import asbool - -import pkg_resources - -egg_message = """ - -The 'sge' runner depends on 'DRMAA_python' which is not installed. Galaxy's -"scramble" system should make this installation simple, please follow the -instructions found at: - - http://wiki.g2.bx.psu.edu/Admin/Config/Performance/Cluster - -Additional errors may follow: -%s -""" - - -try: - pkg_resources.require( "DRMAA_python" ) - import DRMAA -except Exception, e: - raise Exception( egg_message % str( e ) ) - - -log = logging.getLogger( __name__ ) - -__all__ = [ 'SGEJobRunner' ] - -DRMAA_state = { - DRMAA.Session.UNDETERMINED: 'process status cannot be determined', - DRMAA.Session.QUEUED_ACTIVE: 'job is queued and waiting to be scheduled', - DRMAA.Session.SYSTEM_ON_HOLD: 'job is queued and in system hold', - DRMAA.Session.USER_ON_HOLD: 'job is queued and in user hold', - DRMAA.Session.USER_SYSTEM_ON_HOLD: 'job is queued and in user and system hold', - DRMAA.Session.RUNNING: 'job is running', - DRMAA.Session.SYSTEM_SUSPENDED: 'job is system suspended', - DRMAA.Session.USER_SUSPENDED: 'job is user suspended', - DRMAA.Session.DONE: 'job finished normally', - DRMAA.Session.FAILED: 'job finished, but failed', -} - -sge_template = """#!/bin/sh -#$ -S /bin/sh -GALAXY_LIB="%s" -if [ "$GALAXY_LIB" != "None" ]; then - if [ -n "$PYTHONPATH" ]; then - PYTHONPATH="$GALAXY_LIB:$PYTHONPATH" - else - PYTHONPATH="$GALAXY_LIB" - fi - export PYTHONPATH -fi -cd %s -%s -""" - -class SGEJobState( object ): - def __init__( self ): - """ - Encapsulates state related to a job that is being run via SGE and - that we need to monitor. - """ - self.job_wrapper = None - self.job_id = None - self.old_state = None - self.running = False - self.job_file = None - self.ofile = None - self.efile = None - self.runner_url = None - -class SGEJobRunner( BaseJobRunner ): - """ - Job runner backed by a finite pool of worker threads. FIFO scheduling - """ - STOP_SIGNAL = object() - def __init__( self, app ): - """Initialize this job runner and start the monitor thread""" - self.app = app - self.sa_session = app.model.context - # 'watched' and 'queue' are both used to keep track of jobs to watch. - # 'queue' is used to add new watched jobs, and can be called from - # any thread (usually by the 'queue_job' method). 'watched' must only - # be modified by the monitor thread, which will move items from 'queue' - # to 'watched' and then manage the watched jobs. 
- self.watched = [] - self.monitor_queue = Queue() - self.default_cell = self.determine_sge_cell( self.app.config.default_cluster_job_runner ) - self.ds = DRMAA.Session() - self.ds.init( self.default_cell ) - self.monitor_thread = threading.Thread( target=self.monitor ) - self.monitor_thread.start() - self.work_queue = Queue() - self.work_threads = [] - nworkers = app.config.cluster_job_queue_workers - for i in range( nworkers ): - worker = threading.Thread( target=self.run_next ) - worker.start() - self.work_threads.append( worker ) - log.debug( "%d workers ready" % nworkers ) - - def determine_sge_cell( self, url ): - """Determine what SGE cell we are using""" - url_split = url.split("/") - if url_split[0] == 'sge:': - return url_split[2] - # this could happen if sge is started, but is not the default runner - else: - return '' - - def determine_sge_queue( self, url ): - """Determine what SGE queue we are submitting to""" - try: - return url.split('/')[3] or None - except: - return None - - def determine_sge_project( self, url ): - """Determine what SGE project we are submitting to""" - try: - return url.split('/')[4] or None - except: - return None - - def determine_sge_tool_parameters( self, url ): - """Determine what are the tool's specific paramters""" - try: - return url.split('/')[5] or None - except: - return None - - def run_next( self ): - """ - Run the next item in the queue (a job waiting to run or finish ) - """ - while 1: - ( op, obj ) = self.work_queue.get() - if op is self.STOP_SIGNAL: - return - try: - if op == 'queue': - self.queue_job( obj ) - elif op == 'finish': - self.finish_job( obj ) - elif op == 'fail': - self.fail_job( obj ) - except: - log.exception( "Uncaught exception %sing job" % op ) - - def queue_job( self, job_wrapper ): - """Create SGE script for a job and submit it to the SGE queue""" - - try: - job_wrapper.prepare() - command_line = self.build_command_line( job_wrapper, include_metadata = True ) - except: - job_wrapper.fail( "failure preparing job", exception=True ) - log.exception("failure running job %d" % job_wrapper.job_id) - return - - runner_url = job_wrapper.get_job_runner_url() - - # This is silly, why would we queue a job with no command line? - if not command_line: - job_wrapper.finish( '', '' ) - return - - # Check for deletion before we change state - if job_wrapper.get_state() == model.Job.states.DELETED: - log.debug( "Job %s deleted by user before it entered the SGE queue" % job_wrapper.job_id ) - job_wrapper.cleanup() - return - - # Change to queued state immediately - job_wrapper.change_state( model.Job.states.QUEUED ) - - if self.determine_sge_cell( runner_url ) != self.default_cell: - # TODO: support multiple cells - log.warning( "(%s) Using multiple SGE cells is not supported. This job will be submitted to the default cell." 
% job_wrapper.job_id ) - sge_queue_name = self.determine_sge_queue( runner_url ) - sge_project_name = self.determine_sge_project( runner_url ) - sge_extra_params = self.determine_sge_tool_parameters ( runner_url ) - - # define job attributes - ofile = "%s/%s.o" % (self.app.config.cluster_files_directory, job_wrapper.job_id) - efile = "%s/%s.e" % (self.app.config.cluster_files_directory, job_wrapper.job_id) - jt = self.ds.createJobTemplate() - jt.remoteCommand = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job_wrapper.job_id) - jt.outputPath = ":%s" % ofile - jt.errorPath = ":%s" % efile - nativeSpec = [] - if sge_queue_name is not None: - nativeSpec.append( "-q '%s'" % sge_queue_name ) - if sge_project_name is not None: - nativeSpec.append( "-P '%s'" % sge_project_name) - if sge_extra_params is not None: - nativeSpec.append( sge_extra_params ) - if len(nativeSpec)>0: - jt.nativeSpecification = ' '.join(nativeSpec) - - script = sge_template % (job_wrapper.galaxy_lib_dir, os.path.abspath( job_wrapper.working_directory ), command_line) - - fh = file( jt.remoteCommand, "w" ) - fh.write( script ) - fh.close() - os.chmod( jt.remoteCommand, 0750 ) - - # job was deleted while we were preparing it - if job_wrapper.get_state() == model.Job.states.DELETED: - log.debug( "Job %s deleted by user before it entered the SGE queue" % job_wrapper.job_id ) - self.cleanup( ( ofile, efile, jt.remoteCommand ) ) - job_wrapper.cleanup() - return - - galaxy_job_id = job_wrapper.job_id - log.debug("(%s) submitting file %s" % ( galaxy_job_id, jt.remoteCommand ) ) - log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) ) - # runJob will raise if there's a submit problem - job_id = self.ds.runJob(jt) - if sge_queue_name is None: - log.debug("(%s) queued in default queue as %s" % (galaxy_job_id, job_id) ) - else: - log.debug("(%s) queued in %s queue as %s" % (galaxy_job_id, sge_queue_name, job_id) ) - - # store runner information for tracking if Galaxy restarts - job_wrapper.set_runner( runner_url, job_id ) - - # Store SGE related state information for job - sge_job_state = SGEJobState() - sge_job_state.job_wrapper = job_wrapper - sge_job_state.job_id = job_id - sge_job_state.ofile = ofile - sge_job_state.efile = efile - sge_job_state.job_file = jt.remoteCommand - sge_job_state.old_state = 'new' - sge_job_state.running = False - sge_job_state.runner_url = runner_url - - # delete the job template - self.ds.deleteJobTemplate( jt ) - - # Add to our 'queue' of jobs to monitor - self.monitor_queue.put( sge_job_state ) - - def monitor( self ): - """ - Watches jobs currently in the PBS queue and deals with state changes - (queued to running) and job completion - """ - while 1: - # Take any new watched jobs and put them on the monitor list - try: - while 1: - sge_job_state = self.monitor_queue.get_nowait() - if sge_job_state is self.STOP_SIGNAL: - # TODO: This is where any cleanup would occur - self.ds.exit() - return - self.watched.append( sge_job_state ) - except Empty: - pass - # Iterate over the list of watched jobs and check state - self.check_watched_items() - # Sleep a bit before the next state check - time.sleep( 1 ) - - def check_watched_items( self ): - """ - Called by the monitor thread to look at each watched job and deal - with state changes. 
- """ - new_watched = [] - for sge_job_state in self.watched: - job_id = sge_job_state.job_id - galaxy_job_id = sge_job_state.job_wrapper.job_id - old_state = sge_job_state.old_state - try: - state = self.ds.getJobProgramStatus( job_id ) - except DRMAA.InvalidJobError: - # we should only get here if an orphaned job was put into the queue at app startup - log.debug("(%s/%s) job left SGE queue" % ( galaxy_job_id, job_id ) ) - self.work_queue.put( ( 'finish', sge_job_state ) ) - continue - except Exception, e: - # so we don't kill the monitor thread - log.exception("(%s/%s) Unable to check job status" % ( galaxy_job_id, job_id ) ) - log.warning("(%s/%s) job will now be errored" % ( galaxy_job_id, job_id ) ) - sge_job_state.fail_message = "Cluster could not complete job" - self.work_queue.put( ( 'fail', sge_job_state ) ) - continue - if state != old_state: - log.debug("(%s/%s) state change: %s" % ( galaxy_job_id, job_id, DRMAA_state[state] ) ) - if state == DRMAA.Session.RUNNING and not sge_job_state.running: - sge_job_state.running = True - sge_job_state.job_wrapper.change_state( model.Job.states.RUNNING ) - if state in ( DRMAA.Session.DONE, DRMAA.Session.FAILED ): - self.work_queue.put( ( 'finish', sge_job_state ) ) - continue - sge_job_state.old_state = state - new_watched.append( sge_job_state ) - # Replace the watch list with the updated version - self.watched = new_watched - - def finish_job( self, sge_job_state ): - """ - Get the output/error for a finished job, pass to `job_wrapper.finish` - and cleanup all the SGE temporary files. - """ - ofile = sge_job_state.ofile - efile = sge_job_state.efile - job_file = sge_job_state.job_file - # collect the output - try: - ofh = file(ofile, "r") - efh = file(efile, "r") - stdout = ofh.read( 32768 ) - stderr = efh.read( 32768 ) - except: - stdout = '' - stderr = 'Job output not returned from cluster' - log.debug(stderr) - - try: - sge_job_state.job_wrapper.finish( stdout, stderr ) - except: - log.exception("Job wrapper finish method failed") - - # clean up the sge files - self.cleanup( ( ofile, efile, job_file ) ) - - def fail_job( self, sge_job_state ): - """ - Seperated out so we can use the worker threads for it. 
- """ - self.stop_job( self.sa_session.query( self.app.model.Job ).get( sge_job_state.job_wrapper.job_id ) ) - sge_job_state.job_wrapper.fail( sge_job_state.fail_message ) - self.cleanup( ( sge_job_state.ofile, sge_job_state.efile, sge_job_state.job_file ) ) - - def cleanup( self, files ): - if not asbool( self.app.config.get( 'debug', False ) ): - for file in files: - if os.access( file, os.R_OK ): - os.unlink( file ) - - def put( self, job_wrapper ): - """Add a job to the queue (by job identifier)""" - # Change to queued state before handing to worker thread so the runner won't pick it up again - job_wrapper.change_state( model.Job.states.QUEUED ) - self.work_queue.put( ( 'queue', job_wrapper ) ) - - def shutdown( self ): - """Attempts to gracefully shut down the monitor thread""" - log.info( "sending stop signal to worker threads" ) - self.monitor_queue.put( self.STOP_SIGNAL ) - for i in range( len( self.work_threads ) ): - self.work_queue.put( ( self.STOP_SIGNAL, None ) ) - log.info( "sge job runner stopped" ) - - def stop_job( self, job ): - """Attempts to delete a job from the SGE queue""" - try: - self.ds.control( job.get_job_runner_external_id(), DRMAA.Session.TERMINATE ) - log.debug( "(%s/%s) Removed from SGE queue at user's request" % ( job.get_id(), job.get_job_runner_external_id() ) ) - except DRMAA.InvalidJobError: - log.debug( "(%s/%s) User killed running job, but it was already dead" % ( job.get_id(), job.get_job_runner_external_id() ) ) - - def recover( self, job, job_wrapper ): - """Recovers jobs stuck in the queued/running state when Galaxy started""" - sge_job_state = SGEJobState() - sge_job_state.ofile = "%s/database/pbs/%s.o" % (os.getcwd(), job.get_id()) - sge_job_state.efile = "%s/database/pbs/%s.e" % (os.getcwd(), job.get_id()) - sge_job_state.job_file = "%s/database/pbs/galaxy_%s.sh" % (os.getcwd(), job.get_id()) - sge_job_state.job_id = str( job.get_job_runner_external_id() ) - sge_job_state.runner_url = job_wrapper.get_job_runner_url() - job_wrapper.command_line = job.get_command_line() - sge_job_state.job_wrapper = job_wrapper - if job.get_state() == model.Job.states.RUNNING: - log.debug( "(%s/%s) is still in running state, adding to the SGE queue" % ( job.get_id(), job.get_job_runner_external_id() ) ) - sge_job_state.old_state = DRMAA.Session.RUNNING - sge_job_state.running = True - self.monitor_queue.put( sge_job_state ) - elif job.get_state() == model.Job.states.QUEUED: - log.debug( "(%s/%s) is still in SGE queued state, adding to the SGE queue" % ( job.get_id(), job.get_job_runner_external_id() ) ) - sge_job_state.old_state = DRMAA.Session.QUEUED_ACTIVE - sge_job_state.running = False - self.monitor_queue.put( sge_job_state )
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -4,24 +4,23 @@ Naming: try to use class names that have a distinct plural form so that the relationship cardinalities are obvious (e.g. prefer Dataset to Data) """ + import pkg_resources -pkg_resources.require( "simplejson" ) -import simplejson +pkg_resources.require("simplejson") +pkg_resources.require("pexpect") +import simplejson, os, errno, codecs, operator, socket, pexpect, logging, time import galaxy.datatypes -from galaxy.util.bunch import Bunch -from galaxy import util import galaxy.datatypes.registry from galaxy.datatypes.metadata import MetadataCollection -from galaxy.security import RBACAgent, get_permitted_actions -from galaxy.util.hash_util import * -from galaxy.web.form_builder import * +from galaxy.security import get_permitted_actions +from galaxy import util +from galaxy.util.bunch import Bunch +from galaxy.util.hash_util import new_secure_hash +from galaxy.web.form_builder import (AddressField, CheckboxField, PasswordField, SelectField, TextArea, TextField, + WorkflowField, WorkflowMappingField, HistoryField) from galaxy.model.item_attrs import UsesAnnotations, APIItem from sqlalchemy.orm import object_session from sqlalchemy.sql.expression import func -import os.path, os, errno, codecs, operator, socket, pexpect, logging, time, shutil - -if sys.version_info[:2] < ( 2, 5 ): - from sets import Set as set
log = logging.getLogger( __name__ )
@@ -138,13 +137,13 @@ self.exit_code = None
# TODO: Add accessors for members defined in SQL Alchemy for the Job table and - # for the mapper defined to the Job table. + # for the mapper defined to the Job table. def get_external_output_metadata( self ): """ - The external_output_metadata is currently a reference from Job to + The external_output_metadata is currently a reference from Job to JobExternalOutputMetadata. It exists for a job but not a task. """ - return self.external_output_metadata + return self.external_output_metadata def get_session_id( self ): return self.session_id def get_user_id( self ): @@ -177,7 +176,7 @@ # runner_name is not the same thing. return self.job_runner_name def get_job_runner_external_id( self ): - # This is different from the Task just in the member accessed: + # This is different from the Task just in the member accessed: return self.job_runner_external_id def get_post_job_actions( self ): return self.post_job_actions @@ -197,10 +196,10 @@ # The tasks member is pert of a reference in the SQL Alchemy schema: return self.tasks def get_id_tag( self ): - """ - Return a tag that can be useful in identifying a Job. + """ + Return a tag that can be useful in identifying a Job. This returns the Job's get_id - """ + """ return "%s" % self.id;
def set_session_id( self, session_id ): @@ -324,8 +323,8 @@ self.task_runner_name = None self.task_runner_external_id = None self.job = job - self.stdout = "" - self.stderr = "" + self.stdout = "" + self.stderr = "" self.exit_code = None self.prepare_input_files_cmd = prepare_files_cmd
@@ -340,8 +339,8 @@ return param_dict
def get_id( self ): - # This is defined in the SQL Alchemy schema: - return self.id + # This is defined in the SQL Alchemy schema: + return self.id def get_id_tag( self ): """ Return an id tag suitable for identifying the task. @@ -378,7 +377,7 @@ # metdata). These can be filled in as needed. def get_external_output_metadata( self ): """ - The external_output_metadata is currently a backref to + The external_output_metadata is currently a backref to JobExternalOutputMetadata. It exists for a job but not a task, and when a task is cancelled its corresponding parent Job will be cancelled. So None is returned now, but that could be changed @@ -395,13 +394,13 @@ """ Runners will use the same methods to get information about the Task class as they will about the Job class, so this method just returns - the task's external id. + the task's external id. """ # TODO: Merge into get_runner_external_id. return self.task_runner_external_id def get_session_id( self ): # The Job's galaxy session is equal to the Job's session, so the - # Job's session is the same as the Task's session. + # Job's session is the same as the Task's session. return self.get_job().get_session_id()
def set_id( self, id ): @@ -424,7 +423,7 @@ # This method is available for runners that do not want/need to # differentiate between the kinds of Runnable things (Jobs and Tasks) # that they're using. - log.debug( "Task %d: Set external id to %s" + log.debug( "Task %d: Set external id to %s" % ( self.id, task_runner_external_id ) ) self.task_runner_external_id = task_runner_external_id def set_task_runner_external_id( self, task_runner_external_id ): @@ -701,8 +700,8 @@ def resume_paused_jobs( self ): for dataset in self.datasets: job = dataset.creating_job - if job.state == Job.states.PAUSED: - job.set_state(Job.states.QUEUED) + if job is not None and job.state == Job.states.PAUSED: + job.set_state(Job.states.NEW) def get_disk_size( self, nice_size=False ): # unique datasets only db_session = object_session( self ) @@ -870,6 +869,7 @@ EMPTY = 'empty', ERROR = 'error', DISCARDED = 'discarded', + PAUSED = 'paused', SETTING_METADATA = 'setting_metadata', FAILED_METADATA = 'failed_metadata' ) permitted_actions = get_permitted_actions( filter='DATASET' ) @@ -953,7 +953,7 @@ return False try: return util.is_multi_byte( codecs.open( self.file_name, 'r', 'utf-8' ).read( 100 ) ) - except UnicodeDecodeError, e: + except UnicodeDecodeError: return False # FIXME: sqlalchemy will replace this def _delete(self): @@ -1135,7 +1135,6 @@ """ Returns dict of { "dependency" => HDA } """ - converted_dataset = self.get_converted_files_by_type( target_ext ) # List of string of dependencies try: depends_list = trans.app.datatypes_registry.converter_deps[self.extension][target_ext] @@ -1306,7 +1305,7 @@ """ Returns datasources for dataset; if datasources are not available due to indexing, indexing is started. Return value is a dictionary - with entries of type + with entries of type (<datasource_type> : {<datasource_name>, <indexing_message>}). """ track_type, data_sources = self.datatype.get_track_type() @@ -1319,17 +1318,17 @@ else: # Convert. msg = self.convert_dataset( trans, data_source ) - + # Store msg. data_sources_dict[ source_type ] = { "name" : data_source, "message": msg } - + return data_sources_dict
def convert_dataset( self, trans, target_type ): """ - Converts a dataset to the target_type and returns a message indicating + Converts a dataset to the target_type and returns a message indicating status of the conversion. None is returned to indicate that dataset - was converted successfully. + was converted successfully. """
# FIXME: copied from controller.py @@ -1401,7 +1400,7 @@ hda.metadata = self.metadata if copy_children: for child in self.children: - child_copy = child.copy( copy_children = copy_children, parent_id = hda.id ) + child.copy( copy_children = copy_children, parent_id = hda.id ) if not self.datatype.copy_safe_peek: # In some instances peek relies on dataset_id, i.e. gmaj.zip for viewing MAFs hda.set_peek() @@ -1453,11 +1452,11 @@ object_session( self ).add( library_dataset ) object_session( self ).flush() for child in self.children: - child_copy = child.to_library_dataset_dataset_association( trans, - target_folder=target_folder, - replace_dataset=replace_dataset, - parent_id=ldda.id, - user=ldda.user ) + child.to_library_dataset_dataset_association( trans, + target_folder=target_folder, + replace_dataset=replace_dataset, + parent_id=ldda.id, + user=ldda.user ) if not self.datatype.copy_safe_peek: # In some instances peek relies on dataset_id, i.e. gmaj.zip for viewing MAFs ldda.set_peek() @@ -1807,7 +1806,7 @@ if add_to_history and target_history: target_history.add_dataset( hda ) for child in self.children: - child_copy = child.to_history_dataset_association( target_history = target_history, parent_id = hda.id, add_to_history = False ) + child.to_history_dataset_association( target_history = target_history, parent_id = hda.id, add_to_history = False ) if not self.datatype.copy_safe_peek: hda.set_peek() #in some instances peek relies on dataset_id, i.e. gmaj.zip for viewing MAFs object_session( self ).flush() @@ -1832,7 +1831,7 @@ ldda.metadata = self.metadata if copy_children: for child in self.children: - child_copy = child.copy( copy_children = copy_children, parent_id = ldda.id ) + child.copy( copy_children = copy_children, parent_id = ldda.id ) if not self.datatype.copy_safe_peek: # In some instances peek relies on dataset_id, i.e. gmaj.zip for viewing MAFs ldda.set_peek() @@ -2639,7 +2638,7 @@ events={ '.ssword:*': scp_configs['password']+'\r\n', pexpect.TIMEOUT:print_ticks}, timeout=10 ) - except Exception, e: + except Exception: return error_msg # cleanup the output to get just the file size return output.replace( filepath, '' )\ @@ -3279,7 +3278,6 @@ .first() return None def get_versions( self, app ): - sa_session = app.model.context.current tool_versions = [] # Prepend ancestors. def __ancestors( app, tool_version ):
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -1571,13 +1571,13 @@ ) )
assign_mapper( context, JobToInputDatasetAssociation, JobToInputDatasetAssociation.table, - properties=dict( job=relation( Job ), dataset=relation( HistoryDatasetAssociation, lazy=False ) ) ) + properties=dict( job=relation( Job ), dataset=relation( HistoryDatasetAssociation, lazy=False, backref="dependent_jobs" ) ) )
assign_mapper( context, JobToOutputDatasetAssociation, JobToOutputDatasetAssociation.table, properties=dict( job=relation( Job ), dataset=relation( HistoryDatasetAssociation, lazy=False ) ) )
assign_mapper( context, JobToInputLibraryDatasetAssociation, JobToInputLibraryDatasetAssociation.table, - properties=dict( job=relation( Job ), dataset=relation( LibraryDatasetDatasetAssociation, lazy=False ) ) ) + properties=dict( job=relation( Job ), dataset=relation( LibraryDatasetDatasetAssociation, lazy=False, backref="dependent_jobs" ) ) )
assign_mapper( context, JobToOutputLibraryDatasetAssociation, JobToOutputLibraryDatasetAssociation.table, properties=dict( job=relation( Job ), dataset=relation( LibraryDatasetDatasetAssociation, lazy=False ) ) )
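The new dependent_jobs backrefs on the two input-association mappers are what allow JobWrapper.fail() and pause() (earlier in this changeset) to walk from an errored dataset to the jobs that consume it. A minimal traversal sketch, assuming 'hda' is a HistoryDatasetAssociation loaded in a SQLAlchemy session:

    def jobs_depending_on(hda):
        # Each JobToInputDatasetAssociation referencing this dataset now
        # exposes its job through the backref, so downstream jobs can be paused.
        return [assoc.job for assoc in hda.dependent_jobs]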
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/tool_shed/__init__.py --- a/lib/galaxy/tool_shed/__init__.py +++ b/lib/galaxy/tool_shed/__init__.py @@ -53,4 +53,4 @@ galaxy.util.shed_util.load_installed_datatype_converters( self.app, installed_repository_dict, deactivate=deactivate ) if installed_repository_dict[ 'display_path' ]: galaxy.util.shed_util.load_installed_display_applications( self.app, installed_repository_dict, deactivate=deactivate ) - \ No newline at end of file +
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/tool_shed/install_manager.py --- a/lib/galaxy/tool_shed/install_manager.py +++ b/lib/galaxy/tool_shed/install_manager.py @@ -6,6 +6,7 @@ from galaxy.tools import ToolSection from galaxy.util.json import from_json_string, to_json_string from galaxy.util.shed_util import * +from galaxy.util.shed_util_common import * from galaxy.util.odict import odict from galaxy.tool_shed.common_util import *
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/tool_shed/migrate/common.py --- a/lib/galaxy/tool_shed/migrate/common.py +++ b/lib/galaxy/tool_shed/migrate/common.py @@ -2,6 +2,7 @@ import galaxy.config import galaxy.datatypes.registry from galaxy import tools +from galaxy.tools.data import * import galaxy.model.mapping import galaxy.tools.search from galaxy.objectstore import build_object_store_from_config @@ -42,8 +43,8 @@ # Load the data types in the Galaxy distribution, which are defined in self.config.datatypes_config. self.datatypes_registry.load_datatypes( self.config.root, self.config.datatypes_config ) # Initialize tool data tables using the config defined by self.config.tool_data_table_config_path. - self.tool_data_tables = galaxy.tools.data.ToolDataTableManager( tool_data_path=self.config.tool_data_path, - config_filename=self.config.tool_data_table_config_path ) + self.tool_data_tables = ToolDataTableManager( tool_data_path=self.config.tool_data_path, + config_filename=self.config.tool_data_table_config_path ) # Load additional entries defined by self.config.shed_tool_data_table_config into tool data tables. self.tool_data_tables.load_from_config_file( config_filename=self.config.shed_tool_data_table_config, tool_data_path=self.tool_data_tables.tool_data_path,
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/tool_shed/migrate/versions/0007_tools.py --- /dev/null +++ b/lib/galaxy/tool_shed/migrate/versions/0007_tools.py @@ -0,0 +1,17 @@ +""" +The following tools have been eliminated from the distribution: +Map with Bowtie for Illumina, Map with Bowtie for SOLiD, Lastz, +and Lastz paired reads. The tools are now available in the +repositories named bowtie_wrappers, bowtie_color_wrappers, lastz, +and lastz_paired_reads from the main Galaxy tool shed at +http://toolshed.g2.bx.psu.edu, and will be installed into your +local Galaxy instance at the location discussed above by running +the following command. +""" + +import sys + +def upgrade(): + print __doc__ +def downgrade(): + pass
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/tool_shed/tool_dependencies/fabric_util.py --- a/lib/galaxy/tool_shed/tool_dependencies/fabric_util.py +++ b/lib/galaxy/tool_shed/tool_dependencies/fabric_util.py @@ -58,7 +58,10 @@ action_type, action_dict = actions[ 0 ] if action_type == 'download_by_url': url = action_dict[ 'url' ] - downloaded_filename = os.path.split( url )[ -1 ] + if 'target_filename' in action_dict: + downloaded_filename = action_dict[ 'target_filename' ] + else: + downloaded_filename = os.path.split( url )[ -1 ] downloaded_file_path = common_util.url_download( work_dir, downloaded_filename, url ) if common_util.istar( downloaded_file_path ): # <action type="download_by_url">http://sourceforge.net/projects/samtools/files/samtools/0.1.18/samtools-0.1....</action>
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/tool_shed/tool_dependencies/install_util.py --- a/lib/galaxy/tool_shed/tool_dependencies/install_util.py +++ b/lib/galaxy/tool_shed/tool_dependencies/install_util.py @@ -135,6 +135,8 @@ # <action type="download_by_url">http://sourceforge.net/projects/samtools/files/samtools/0.1.18/samtools-0.1....</action> if action_elem.text: action_dict[ 'url' ] = action_elem.text + if 'target_filename' in action_elem.attrib: + action_dict[ 'target_filename' ] = action_elem.attrib[ 'target_filename' ] else: continue elif action_type == 'make_directory':
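With the two hunks above, a tool dependency's download_by_url action can name the file it should be saved as; when target_filename is absent, the name still falls back to the last component of the URL. A small stand-alone illustration of that fallback (sample values made up):

    import os

    def downloaded_filename(action_dict):
        # Prefer an explicit target_filename, otherwise derive one from the URL.
        if 'target_filename' in action_dict:
            return action_dict['target_filename']
        return os.path.split(action_dict['url'])[-1]

    assert downloaded_filename({'url': 'http://example.org/pkg-1.0.tar.gz'}) == 'pkg-1.0.tar.gz'
    assert downloaded_filename({'url': 'http://example.org/download?id=7',
                                'target_filename': 'pkg-1.0.tar.gz'}) == 'pkg-1.0.tar.gz'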
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/tool_shed/update_manager.py --- a/lib/galaxy/tool_shed/update_manager.py +++ b/lib/galaxy/tool_shed/update_manager.py @@ -4,6 +4,7 @@ import threading, urllib2, logging from galaxy.util import string_as_bool from galaxy.util.shed_util import * +from galaxy.util.shed_util_common import *
log = logging.getLogger( __name__ )
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -34,6 +34,7 @@ from galaxy.util.hash_util import * from galaxy.util import listify from galaxy.util.shed_util import * +from galaxy.util.shed_util_common import * from galaxy.web import url_for
from galaxy.visualization.genome.visual_analytics import TracksterConfig
diff -r 53d3d620a8783a7ac4de9360e3691853681ffac3 -r e67fb0786e41de3bb1229a3be267a9bad07320a6 lib/galaxy/tools/data/__init__.py --- a/lib/galaxy/tools/data/__init__.py +++ b/lib/galaxy/tools/data/__init__.py @@ -118,6 +118,14 @@ class ToolDataTable( object ): def __init__( self, config_element, tool_data_path ): self.name = config_element.get( 'name' ) + self.comment_char = config_element.get( 'comment_char' ) + for file_elem in config_element.findall( 'file' ): + # There should only be one file_elem. + if 'path' in file_elem.attrib: + tool_data_file_path = file_elem.get( 'path' ) + self.tool_data_file = os.path.split( tool_data_file_path )[1] + else: + self.tool_data_file = None self.tool_data_path = tool_data_path self.missing_index_file = None
This diff is so big that we needed to truncate the remainder.
https://bitbucket.org/galaxy/galaxy-central/commits/aa07ef33632c/ changeset: aa07ef33632c user: jmchilton date: 2013-01-16 23:22:55 summary: Merge with latest galaxy-central to resolve a conflict introduced by 4bd4197. affected #: 6 files Diff not available.
https://bitbucket.org/galaxy/galaxy-central/commits/301d7447dd22/ changeset: 301d7447dd22 user: dannon date: 2013-01-30 00:08:43 summary: Merged in galaxyp/galaxy-central-parallelism-refactorings (pull request #87)
Refactoring Task Splitting Toward Per-Job Definitions (in Addition to Current Per-Tool Definitions) affected #: 6 files Diff not available.
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
galaxy-commits@lists.galaxyproject.org