2 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/c1a640fa6b3d/
changeset: c1a640fa6b3d
user: Brad Langhorst
date: 2012-07-20 21:15:14
summary: Copies the tags from a source history when a workflow creates new histories to store its results.
Moves the tag-copying logic into the model (ItemTagAssociation class).
Adds a copy_tags_from method to the History and StoredWorkflow classes.
affected #: 2 files
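The pattern this changeset introduces is small: each tag association learns to copy() itself, and the container objects (History, StoredWorkflow) gain a copy_tags_from helper that re-owns the copies for the receiving user. A minimal standalone sketch of that pattern, with plain classes standing in for Galaxy's SQLAlchemy-mapped model objects (the attribute and method names come from the diff below; everything else is simplified for illustration):

    # Plain-object sketch of the tag-copying pattern; not the Galaxy model itself.
    class ItemTagAssociation(object):
        def __init__(self, tag_id=None, user_tname=None, value=None, user_value=None, user=None):
            self.tag_id = tag_id
            self.user_tname = user_tname
            self.value = value
            self.user_value = user_value
            self.user = user

        def copy(self):
            # Duplicate the tag fields; the caller decides which user owns the copy.
            new_ta = type(self)()
            new_ta.tag_id = self.tag_id
            new_ta.user_tname = self.user_tname
            new_ta.value = self.value
            new_ta.user_value = self.user_value
            return new_ta

    class History(object):
        def __init__(self, user=None):
            self.user = user
            self.tags = []

        def copy_tags_from(self, target_user, source_history):
            # Clone each source tag, re-own it for target_user, and attach it here.
            for src_tag in source_history.tags:
                new_tag = src_tag.copy()
                new_tag.user = target_user
                self.tags.append(new_tag)

    # A workflow run that creates a fresh output history inherits the parent's tags.
    parent = History(user="alice")
    parent.tags.append(ItemTagAssociation(user_tname="project", user_value="rnaseq", user="alice"))
    output_history = History(user="alice")
    output_history.copy_tags_from(target_user="alice", source_history=parent)
    assert [t.user_value for t in output_history.tags] == ["rnaseq"]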
diff -r 4c22a2fbf25a8ee05d870dcebe9f7b729fa22f07 -r c1a640fa6b3d4f0486d3a20b7f0eebb605c923db lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -434,7 +434,10 @@
# Copy annotation.
self.copy_item_annotation( db_session, self.user, self, target_user, new_history )
-
+
+ #Copy Tags
+ new_history.copy_tags_from(target_user=target_user, source_history=self)
+
# Copy HDAs.
if activatable:
hdas = self.activatable_datasets
@@ -494,6 +497,12 @@
if nice_size:
rval = galaxy.datatypes.data.nice_size( rval )
return rval
+
+ def copy_tags_from(self,target_user,source_history):
+ for src_shta in source_history.tags:
+ new_shta = src_shta.copy()
+ new_shta.user = target_user
+ self.tags.append(new_shta)
class HistoryUserShareAssociation( object ):
def __init__( self ):
@@ -1717,6 +1726,12 @@
self.published = False
self.latest_workflow_id = None
self.workflows = []
+
+ def copy_tags_from(self,target_user,source_workflow):
+ for src_swta in source_workflow.owner_tags:
+ new_swta = src_swta.copy()
+ new_swta.user = target_user
+ self.tags.append(new_swta)
class Workflow( object ):
def __init__( self ):
@@ -2543,7 +2558,15 @@
self.user_tname = user_tname
self.value = None
self.user_value = None
-
+
+ def copy(self):
+ new_ta = type(self)()
+ new_ta.tag_id = self.tag_id
+ new_ta.user_tname = self.user_tname
+ new_ta.value = self.value
+ new_ta.user_value = self.user_value
+ return new_ta
+
class HistoryTagAssociation ( ItemTagAssociation ):
pass
diff -r 4c22a2fbf25a8ee05d870dcebe9f7b729fa22f07 -r c1a640fa6b3d4f0486d3a20b7f0eebb605c923db lib/galaxy/web/controllers/workflow.py
--- a/lib/galaxy/web/controllers/workflow.py
+++ b/lib/galaxy/web/controllers/workflow.py
@@ -508,7 +508,6 @@
trans.response.set_content_type("image/svg+xml")
return self._workflow_to_svg_canvas( trans, stored ).standalone_xml()
-
@web.expose
@web.require_login( "use Galaxy workflows" )
def clone( self, trans, id ):
@@ -531,15 +530,8 @@
annotation_obj = self.get_item_annotation_obj( trans.sa_session, stored.user, stored )
if annotation_obj:
self.add_item_annotation( trans.sa_session, trans.get_user(), new_stored, annotation_obj.annotation )
- # Clone tags.
- for swta in stored.owner_tags:
- new_swta = model.StoredWorkflowTagAssociation()
- new_swta.tag = swta.tag
- new_swta.user = trans.user
- new_swta.user_tname = swta.user_tname
- new_swta.user_value = swta.user_value
- new_swta.value = swta.value
- new_stored.tags.append( new_swta )
+ new_stored.copy_tags_from(trans.user,stored)
+
if not owner:
new_stored.name += " shared by '%s'" % stored.user.email
new_stored.user = user
@@ -1408,6 +1400,7 @@
mx_ds_name = trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get( single_input ).name
nh_name = '%s on %s' % (nh_name, mx_ds_name)
new_history = trans.app.model.History( user=trans.user, name=nh_name )
+ new_history.copy_tags_from(trans.user, trans.get_history())
trans.sa_session.add( new_history )
target_history = new_history
else:
https://bitbucket.org/galaxy/galaxy-central/changeset/c81172ba88a7/
changeset: c81172ba88a7
user: dannon
date: 2012-07-23 00:02:43
summary: Merge pull request #54 from bwlang - parent tag copying for multiple workflow run output histories.
affected #: 2 files
diff -r ed01365064b2644531c783f01a12b28384fd64ad -r c81172ba88a7d2ee619a32f6de8bdfdcfe5614e2 lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -102,7 +102,7 @@
class Job( object ):
"""
- A job represents a request to run a tool given input datasets, tool
+ A job represents a request to run a tool given input datasets, tool
parameters, and output datasets.
"""
states = Bunch( NEW = 'new',
@@ -216,7 +216,7 @@
self.stdout = None
self.stderr = None
self.prepare_input_files_cmd = prepare_files_cmd
-
+
def set_state( self, state ):
self.state = state
@@ -285,7 +285,7 @@
class JobExportHistoryArchive( object ):
def __init__( self, job=None, history=None, dataset=None, compressed=False, \
- history_attrs_filename=None, datasets_attrs_filename=None,
+ history_attrs_filename=None, datasets_attrs_filename=None,
jobs_attrs_filename=None ):
self.job = job
self.history = history
@@ -378,7 +378,7 @@
self.datasets = []
self.galaxy_sessions = []
def _next_hid( self ):
- # TODO: override this with something in the database that ensures
+ # TODO: override this with something in the database that ensures
# better integrity
if len( self.datasets ) == 0:
return 1
@@ -435,6 +435,9 @@
# Copy annotation.
self.copy_item_annotation( db_session, self.user, self, target_user, new_history )
+ #Copy Tags
+ new_history.copy_tags_from(target_user=target_user, source_history=self)
+
# Copy HDAs.
if activatable:
hdas = self.activatable_datasets
@@ -495,6 +498,12 @@
rval = galaxy.datatypes.data.nice_size( rval )
return rval
+ def copy_tags_from(self,target_user,source_history):
+ for src_shta in source_history.tags:
+ new_shta = src_shta.copy()
+ new_shta.user = target_user
+ self.tags.append(new_shta)
+
class HistoryUserShareAssociation( object ):
def __init__( self ):
self.history = None
@@ -514,7 +523,7 @@
api_collection_visible_keys = ( 'id', 'name' )
api_element_visible_keys = ( 'id', 'name', 'description', 'type' )
private_id = None
- types = Bunch(
+ types = Bunch(
PRIVATE = 'private',
SYSTEM = 'system',
USER = 'user',
@@ -657,7 +666,7 @@
self.external_filename = external_filename
self._extra_files_path = extra_files_path
self.file_size = file_size
-
+
def get_file_name( self ):
if not self.external_filename:
assert self.id is not None, "ID must be set before filename used (commit the object)"
@@ -961,17 +970,17 @@
session.flush()
return None
def get_metadata_dataset( self, trans, dataset_ext ):
- """
- Returns an HDA that points to a metadata file which contains a
+ """
+ Returns an HDA that points to a metadata file which contains a
converted data with the requested extension.
"""
for name, value in self.metadata.items():
# HACK: MetadataFile objects do not have a type/ext, so need to use metadata name
# to determine type.
if dataset_ext == 'bai' and name == 'bam_index' and isinstance( value, trans.app.model.MetadataFile ):
- # HACK: MetadataFile objects cannot be used by tools, so return
+ # HACK: MetadataFile objects cannot be used by tools, so return
# a fake HDA that points to metadata file.
- fake_dataset = trans.app.model.Dataset( state=trans.app.model.Dataset.states.OK,
+ fake_dataset = trans.app.model.Dataset( state=trans.app.model.Dataset.states.OK,
external_filename=value.file_name )
fake_hda = trans.app.model.HistoryDatasetAssociation( dataset=fake_dataset )
return fake_hda
@@ -1039,14 +1048,14 @@
return self.datatype.get_display_applications_by_dataset( self, trans )
class HistoryDatasetAssociation( DatasetInstance ):
- def __init__( self,
- hid = None,
- history = None,
- copied_from_history_dataset_association = None,
- copied_from_library_dataset_dataset_association = None,
+ def __init__( self,
+ hid = None,
+ history = None,
+ copied_from_history_dataset_association = None,
+ copied_from_library_dataset_dataset_association = None,
sa_session = None,
**kwd ):
- # FIXME: sa_session is must be passed to DataSetInstance if the create_dataset
+ # FIXME: sa_session is must be passed to DataSetInstance if the create_dataset
# parameter is True so that the new object can be flushed. Is there a better way?
DatasetInstance.__init__( self, sa_session=sa_session, **kwd )
self.hid = hid
@@ -1055,18 +1064,18 @@
self.copied_from_history_dataset_association = copied_from_history_dataset_association
self.copied_from_library_dataset_dataset_association = copied_from_library_dataset_dataset_association
def copy( self, copy_children = False, parent_id = None ):
- hda = HistoryDatasetAssociation( hid=self.hid,
- name=self.name,
- info=self.info,
- blurb=self.blurb,
- peek=self.peek,
- tool_version=self.tool_version,
- extension=self.extension,
- dbkey=self.dbkey,
- dataset = self.dataset,
- visible=self.visible,
- deleted=self.deleted,
- parent_id=parent_id,
+ hda = HistoryDatasetAssociation( hid=self.hid,
+ name=self.name,
+ info=self.info,
+ blurb=self.blurb,
+ peek=self.peek,
+ tool_version=self.tool_version,
+ extension=self.extension,
+ dbkey=self.dbkey,
+ dataset = self.dataset,
+ visible=self.visible,
+ deleted=self.deleted,
+ parent_id=parent_id,
copied_from_history_dataset_association=self )
object_session( self ).add( hda )
object_session( self ).flush()
@@ -1086,7 +1095,7 @@
# The replace_dataset param ( when not None ) refers to a LibraryDataset that is being replaced with a new version.
library_dataset = replace_dataset
else:
- # If replace_dataset is None, the Library level permissions will be taken from the folder and applied to the new
+ # If replace_dataset is None, the Library level permissions will be taken from the folder and applied to the new
# LibraryDataset, and the current user's DefaultUserPermissions will be applied to the associated Dataset.
library_dataset = LibraryDataset( folder=target_folder, name=self.name, info=self.info )
object_session( self ).add( library_dataset )
@@ -1094,17 +1103,17 @@
if not user:
# This should never happen since users must be authenticated to upload to a data library
user = self.history.user
- ldda = LibraryDatasetDatasetAssociation( name=self.name,
+ ldda = LibraryDatasetDatasetAssociation( name=self.name,
info=self.info,
- blurb=self.blurb,
- peek=self.peek,
- tool_version=self.tool_version,
- extension=self.extension,
- dbkey=self.dbkey,
- dataset=self.dataset,
+ blurb=self.blurb,
+ peek=self.peek,
+ tool_version=self.tool_version,
+ extension=self.extension,
+ dbkey=self.dbkey,
+ dataset=self.dataset,
library_dataset=library_dataset,
- visible=self.visible,
- deleted=self.deleted,
+ visible=self.visible,
+ deleted=self.deleted,
parent_id=parent_id,
copied_from_history_dataset_association=self,
user=user )
@@ -1202,7 +1211,7 @@
self.history_dataset_association = hda
self.user = user
self.site = site
-
+
class HistoryDatasetAssociationSubset( object ):
def __init__(self, hda, subset, location):
self.hda = hda
@@ -1252,7 +1261,7 @@
# inherited is not applicable at the library level. The get_contents
# param is passed by callers that are inheriting a template - these
# are usually new library datsets for which we want to include template
- # fields on the upload form, but not necessarily the contents of the
+ # fields on the upload form, but not necessarily the contents of the
# inherited template saved for the parent.
info_association, inherited = self.get_info_association()
if info_association:
@@ -1271,7 +1280,7 @@
roles.append( lp.role )
return roles
def get_display_name( self ):
- # Library name can be either a string or a unicode object. If string,
+ # Library name can be either a string or a unicode object. If string,
# convert to unicode object assuming 'utf-8' format.
name = self.name
if isinstance( name, str ):
@@ -1329,7 +1338,7 @@
# not inherited ( we do not want to display the inherited contents ).
# (gvk: 8/30/10) Based on conversations with Dan, we agreed to ALWAYS inherit
# contents. We'll use this behavior until we hear from the community that
- # contents should not be inherited. If we don't hear anything for a while,
+ # contents should not be inherited. If we don't hear anything for a while,
# eliminate the old commented out behavior.
#if not inherited and get_contents:
if get_contents:
@@ -1344,7 +1353,7 @@
# This needs to be a list
return [ ld for ld in self.datasets if ld.library_dataset_dataset_association and not ld.library_dataset_dataset_association.dataset.deleted ]
def get_display_name( self ):
- # Library folder name can be either a string or a unicode object. If string,
+ # Library folder name can be either a string or a unicode object. If string,
# convert to unicode object assuming 'utf-8' format.
name = self.name
if isinstance( name, str ):
@@ -1451,7 +1460,7 @@
user=None,
sa_session=None,
**kwd ):
- # FIXME: sa_session is must be passed to DataSetInstance if the create_dataset
+ # FIXME: sa_session is must be passed to DataSetInstance if the create_dataset
# parameter in kwd is True so that the new object can be flushed. Is there a better way?
DatasetInstance.__init__( self, sa_session=sa_session, **kwd )
if copied_from_history_dataset_association:
@@ -1461,17 +1470,17 @@
self.library_dataset = library_dataset
self.user = user
def to_history_dataset_association( self, target_history, parent_id = None, add_to_history = False ):
- hda = HistoryDatasetAssociation( name=self.name,
+ hda = HistoryDatasetAssociation( name=self.name,
info=self.info,
- blurb=self.blurb,
- peek=self.peek,
- tool_version=self.tool_version,
- extension=self.extension,
- dbkey=self.dbkey,
- dataset=self.dataset,
- visible=self.visible,
- deleted=self.deleted,
- parent_id=parent_id,
+ blurb=self.blurb,
+ peek=self.peek,
+ tool_version=self.tool_version,
+ extension=self.extension,
+ dbkey=self.dbkey,
+ dataset=self.dataset,
+ visible=self.visible,
+ deleted=self.deleted,
+ parent_id=parent_id,
copied_from_library_dataset_dataset_association=self,
history=target_history )
object_session( self ).add( hda )
@@ -1486,17 +1495,17 @@
object_session( self ).flush()
return hda
def copy( self, copy_children = False, parent_id = None, target_folder = None ):
- ldda = LibraryDatasetDatasetAssociation( name=self.name,
- info=self.info,
- blurb=self.blurb,
- peek=self.peek,
- tool_version=self.tool_version,
- extension=self.extension,
- dbkey=self.dbkey,
- dataset=self.dataset,
- visible=self.visible,
- deleted=self.deleted,
- parent_id=parent_id,
+ ldda = LibraryDatasetDatasetAssociation( name=self.name,
+ info=self.info,
+ blurb=self.blurb,
+ peek=self.peek,
+ tool_version=self.tool_version,
+ extension=self.extension,
+ dbkey=self.dbkey,
+ dataset=self.dataset,
+ visible=self.visible,
+ deleted=self.deleted,
+ parent_id=parent_id,
copied_from_library_dataset_dataset_association=self,
folder=target_folder )
object_session( self ).add( ldda )
@@ -1534,7 +1543,7 @@
# See if we have any associated templatesThe get_contents
# param is passed by callers that are inheriting a template - these
# are usually new library datsets for which we want to include template
- # fields on the upload form, but not necessarily the contents of the
+ # fields on the upload form, but not necessarily the contents of the
# inherited template saved for the parent.
info_association, inherited = self.get_info_association()
if info_association:
@@ -1546,7 +1555,7 @@
# not inherited ( we do not want to display the inherited contents ).
# (gvk: 8/30/10) Based on conversations with Dan, we agreed to ALWAYS inherit
# contents. We'll use this behavior until we hear from the community that
- # contents should not be inherited. If we don't hear anything for a while,
+ # contents should not be inherited. If we don't hear anything for a while,
# eliminate the old commented out behavior.
#if not inherited and get_contents:
if get_contents:
@@ -1663,15 +1672,15 @@
self.message = message
class GalaxySession( object ):
- def __init__( self,
- id=None,
- user=None,
- remote_host=None,
- remote_addr=None,
- referer=None,
- current_history=None,
- session_key=None,
- is_valid=False,
+ def __init__( self,
+ id=None,
+ user=None,
+ remote_host=None,
+ remote_addr=None,
+ referer=None,
+ current_history=None,
+ session_key=None,
+ is_valid=False,
prev_session_id=None ):
self.id = id
self.user = user
@@ -1718,6 +1727,12 @@
self.latest_workflow_id = None
self.workflows = []
+ def copy_tags_from(self,target_user,source_workflow):
+ for src_swta in source_workflow.owner_tags:
+ new_swta = src_swta.copy()
+ new_swta.user = target_user
+ self.tags.append(new_swta)
+
class Workflow( object ):
def __init__( self ):
self.user = None
@@ -1797,7 +1812,7 @@
raise
# Return filename inside hashed directory
return os.path.abspath( os.path.join( path, "metadata_%d.dat" % self.id ) )
-
+
class FormDefinition( object, APIItem ):
# The following form_builder classes are supported by the FormDefinition class.
@@ -1813,7 +1828,7 @@
def __init__( self, name=None, desc=None, fields=[], form_definition_current=None, form_type=None, layout=None ):
self.name = name
self.desc = desc
- self.fields = fields
+ self.fields = fields
self.form_definition_current = form_definition_current
self.type = form_type
self.layout = layout
@@ -2012,7 +2027,7 @@
samples.append( sample )
return samples
def send_email_notification( self, trans, common_state, final_state=False ):
- # Check if an email notification is configured to be sent when the samples
+ # Check if an email notification is configured to be sent when the samples
# are in this state
if self.notification and common_state.id not in self.notification[ 'sample_states' ]:
return
@@ -2036,13 +2051,13 @@
All samples in state: %(sample_state)s
"""
- values = dict( user=self.user.email,
- request_name=self.name,
- request_type=self.type.name,
- request_state=self.state,
- num_samples=str( len( self.samples ) ),
- sample_state=common_state.name,
- create_time=self.create_time,
+ values = dict( user=self.user.email,
+ request_name=self.name,
+ request_type=self.type.name,
+ request_state=self.state,
+ num_samples=str( len( self.samples ) ),
+ sample_state=common_state.name,
+ create_time=self.create_time,
submit_time=self.create_time )
body = body % values
# check if this is the final state of the samples
@@ -2207,7 +2222,7 @@
class Sample( object, APIItem ):
# The following form_builder classes are supported by the Sample class.
supported_field_types = [ CheckboxField, SelectField, TextField, WorkflowField, WorkflowMappingField, HistoryField ]
- bulk_operations = Bunch( CHANGE_STATE = 'Change state',
+ bulk_operations = Bunch( CHANGE_STATE = 'Change state',
SELECT_LIBRARY = 'Select data library and folder' )
api_collection_visible_keys = ( 'id', 'name' )
def __init__(self, name=None, desc=None, request=None, form_values=None, bar_code=None, library=None, folder=None, workflow=None, history=None):
@@ -2290,8 +2305,8 @@
cmd = 'ssh %s "du -sh \'%s\'"' % ( login_str, filepath )
try:
output = pexpect.run( cmd,
- events={ '.ssword:*': scp_configs['password']+'\r\n',
- pexpect.TIMEOUT:print_ticks},
+ events={ '.ssword:*': scp_configs['password']+'\r\n',
+ pexpect.TIMEOUT:print_ticks},
timeout=10 )
except Exception, e:
return error_msg
@@ -2383,8 +2398,8 @@
self.run = run
class UserAddress( object ):
- def __init__( self, user=None, desc=None, name=None, institution=None,
- address=None, city=None, state=None, postal_code=None,
+ def __init__( self, user=None, desc=None, name=None, institution=None,
+ address=None, city=None, state=None, postal_code=None,
country=None, phone=None ):
self.user = user
self.desc = desc
@@ -2493,8 +2508,8 @@
if not visualization:
visualization = self.visualization
- return VisualizationRevision(
- visualization=visualization,
+ return VisualizationRevision(
+ visualization=visualization,
title=self.title,
dbkey=self.dbkey,
config=self.config
@@ -2544,6 +2559,14 @@
self.value = None
self.user_value = None
+ def copy(self):
+ new_ta = type(self)()
+ new_ta.tag_id = self.tag_id
+ new_ta.user_tname = self.user_tname
+ new_ta.value = self.value
+ new_ta.user_value = self.user_value
+ return new_ta
+
class HistoryTagAssociation ( ItemTagAssociation ):
pass
@@ -2806,7 +2829,7 @@
if tva:
return sa_session.query( app.model.ToolVersion ) \
.filter( app.model.ToolVersion.table.c.id == tva.tool_id ) \
- .first()
+ .first()
return None
def get_versions( self, app ):
sa_session = app.model.context.current
diff -r ed01365064b2644531c783f01a12b28384fd64ad -r c81172ba88a7d2ee619a32f6de8bdfdcfe5614e2 lib/galaxy/web/controllers/workflow.py
--- a/lib/galaxy/web/controllers/workflow.py
+++ b/lib/galaxy/web/controllers/workflow.py
@@ -508,7 +508,6 @@
trans.response.set_content_type("image/svg+xml")
return self._workflow_to_svg_canvas( trans, stored ).standalone_xml()
-
@web.expose
@web.require_login( "use Galaxy workflows" )
def clone( self, trans, id ):
@@ -531,15 +530,7 @@
annotation_obj = self.get_item_annotation_obj( trans.sa_session, stored.user, stored )
if annotation_obj:
self.add_item_annotation( trans.sa_session, trans.get_user(), new_stored, annotation_obj.annotation )
- # Clone tags.
- for swta in stored.owner_tags:
- new_swta = model.StoredWorkflowTagAssociation()
- new_swta.tag = swta.tag
- new_swta.user = trans.user
- new_swta.user_tname = swta.user_tname
- new_swta.user_value = swta.user_value
- new_swta.value = swta.value
- new_stored.tags.append( new_swta )
+ new_stored.copy_tags_from(trans.user,stored)
if not owner:
new_stored.name += " shared by '%s'" % stored.user.email
new_stored.user = user
@@ -918,11 +909,11 @@
# TODO: handle the case where the imported workflow requires tools that are not available in
# the local Galaxy instance.
pass
-
+
# Provide user feedback.
workflow_list_str = " <br>Return to <a href='%s'>workflow list." % url_for( action='list' )
if response.status != 200:
- return trans.show_error_message( "There was a problem importing the workflow. Error: %s %s" % (response_data, workflow_list_str) )
+ return trans.show_error_message( "There was a problem importing the workflow. Error: %s %s" % (response_data, workflow_list_str) )
if workflow.has_errors:
return trans.show_warn_message( "Imported, but some steps in this workflow have validation errors. %s" % workflow_list_str )
if workflow.has_cycles:
@@ -1408,6 +1399,7 @@
mx_ds_name = trans.sa_session.query(trans.app.model.HistoryDatasetAssociation).get( single_input ).name
nh_name = '%s on %s' % (nh_name, mx_ds_name)
new_history = trans.app.model.History( user=trans.user, name=nh_name )
+ new_history.copy_tags_from(trans.user, trans.get_history())
trans.sa_session.add( new_history )
target_history = new_history
else:
@@ -1834,9 +1826,9 @@
trans.sa_session.flush()
return stored, missing_tool_tups
-
- def _workflow_to_svg_canvas( self, trans, stored ):
-
+
+ def _workflow_to_svg_canvas( self, trans, stored ):
+
workflow = stored.latest_workflow
data = []
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/ed01365064b2/
changeset: ed01365064b2
user: dannon
date: 2012-07-21 20:46:27
summary: Task runner cleanup.
affected #: 1 file
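Most of this changeset deletes dead code and stray whitespace, but it keeps the runtime splitter lookup visible in the hunk below: the runner resolves the tool's parallelism method to a module under galaxy.jobs.splitters via __import__/getattr. A tiny sketch of that lookup pattern, using a stdlib package as a stand-in for galaxy.jobs.splitters:

    # Stand-in for:
    #   getattr(__import__('galaxy.jobs.splitters', globals(), locals(), [method]), method)
    method = "ElementTree"   # in Galaxy this would be e.g. job_wrapper.tool.parallelism.method
    package = __import__("xml.etree", globals(), locals(), [method])
    splitter = getattr(package, method)
    print(splitter.__name__)   # xml.etree.ElementTree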
diff -r 4c22a2fbf25a8ee05d870dcebe9f7b729fa22f07 -r ed01365064b2644531c783f01a12b28384fd64ad lib/galaxy/jobs/runners/tasks.py
--- a/lib/galaxy/jobs/runners/tasks.py
+++ b/lib/galaxy/jobs/runners/tasks.py
@@ -59,26 +59,22 @@
# If we were able to get a command line, run the job. ( must be passed to tasks )
if command_line:
try:
- # DBTODO read tool info and use the right kind of parallelism.
- # For now, the only splitter is the 'basic' one
job_wrapper.change_state( model.Job.states.RUNNING )
self.sa_session.flush()
# Split with the tool-defined method.
try:
splitter = getattr(__import__('galaxy.jobs.splitters', globals(), locals(), [job_wrapper.tool.parallelism.method]), job_wrapper.tool.parallelism.method)
- except:
+ except:
job_wrapper.change_state( model.Job.states.ERROR )
job_wrapper.fail("Job Splitting Failed, no match for '%s'" % job_wrapper.tool.parallelism)
return
tasks = splitter.do_split(job_wrapper)
-
# Not an option for now. Task objects don't *do* anything useful yet, but we'll want them tracked outside this thread to do anything.
# if track_tasks_in_database:
task_wrappers = []
for task in tasks:
self.sa_session.add(task)
self.sa_session.flush()
-
# Must flush prior to the creation and queueing of task wrappers.
for task in tasks:
tw = TaskWrapper(task, job_wrapper.queue)
@@ -108,14 +104,9 @@
sleep( sleep_time )
if sleep_time < 8:
sleep_time *= 2
-
- import time
-
job_wrapper.reclaim_ownership() # if running as the actual user, change ownership before merging.
-
log.debug('execution finished - beginning merge: %s' % command_line)
stdout, stderr = splitter.do_merge(job_wrapper, task_wrappers)
-
except Exception:
job_wrapper.fail( "failure running job", exception=True )
log.exception("failure running job %d" % job_wrapper.job_id)
@@ -129,15 +120,15 @@
set_extension = True,
kwds = { 'overwrite' : False } ) #we don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
log.debug( 'executing external set_meta script for job %d: %s' % ( job_wrapper.job_id, external_metadata_script ) )
- external_metadata_proc = subprocess.Popen( args = external_metadata_script,
- shell = True,
+ external_metadata_proc = subprocess.Popen( args = external_metadata_script,
+ shell = True,
env = os.environ,
preexec_fn = os.setpgrp )
job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session )
external_metadata_proc.wait()
log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id )
-
- # Finish the job
+
+ # Finish the job
try:
job_wrapper.finish( stdout, stderr )
except:
@@ -149,7 +140,7 @@
# Change to queued state before handing to worker thread so the runner won't pick it up again
job_wrapper.change_state( model.Job.states.QUEUED )
self.queue.put( job_wrapper )
-
+
def shutdown( self ):
"""Attempts to gracefully shut down the worker threads"""
log.info( "sending stop signal to worker threads" )
@@ -175,14 +166,13 @@
# runner because the task runner also starts all the tasks.
# First, get the list of tasks from job.tasks, which uses SQL
# alchemy to retrieve a job's list of tasks.
- tasks = job.tasks
if ( len( job.tasks ) > 0 ):
for task in job.tasks:
self.stop_pid( task.task_runner_external_id, job.id )
# There were no subtasks, so just kill the job. We'll touch
# this if the tasks runner is used but the tool does not use
- # parallelism.
+ # parallelism.
else:
#if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
if job.external_output_metadata:
@@ -197,7 +187,7 @@
def stop_pid( self, pid, job_id ):
"""
This method stops the given process id whether it's a task or job.
- It is meant to be a private helper method, but it is mostly reusable.
+ It is meant to be a private helper method, but it is mostly reusable.
The first argument is the process id to stop, and the second id is the
job's id (which is used for logging messages only right now).
"""
@@ -210,12 +200,12 @@
try:
os.killpg( pid, sig )
except OSError, e:
- # This warning could be bogus; many tasks are stopped with
- # SIGTERM (signal 15), but ymmv depending on the platform.
+ # This warning could be bogus; many tasks are stopped with
+ # SIGTERM (signal 15), but ymmv depending on the platform.
log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job_id, errno.errorcode[e.errno], sig, pid, e.strerror ) )
return
- # TODO: If we're stopping lots of tasks, then we will want to put this
- # avoid a two-second overhead using some other asynchronous method.
+ # TODO: If we're stopping lots of tasks, then we will want to put this
+ # avoid a two-second overhead using some other asynchronous method.
sleep( 2 )
if not self.check_pid( pid ):
log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" %( job_id, pid, sig ) )
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/524cfb8ca67d/
changeset: 524cfb8ca67d
user: jgoecks
date: 2012-07-20 19:18:54
summary: Move the implementation of the from_work_dir attribute from job finish to the job command line so that outputs are available earlier. This makes the attribute compatible with setting metadata externally. Update the Tophat wrapper to serve as an exemplar of the attribute's use.
affected #: 4 files
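The core of the change is that from_work_dir outputs are no longer moved during finish(); instead the job runner appends a copy command to the job's command line, guarded by a path-containment check, so the files are in place before external metadata setting runs. A rough standalone sketch of that flow (in_directory and the appended cp mirror the diff below; append_from_work_dir_copies and the example paths are invented here for illustration):

    import os

    def in_directory(path, directory):
        # True if path lies under directory (common-prefix check, as in the diff).
        directory = os.path.abspath(directory)
        path = os.path.abspath(path)
        return os.path.commonprefix([path, directory]) == directory

    def append_from_work_dir_copies(commands, working_directory, outputs):
        # outputs: list of (from_work_dir relative path, destination dataset path).
        for rel_path, dest in outputs:
            source_file = os.path.join(os.path.abspath(working_directory), rel_path)
            if not in_directory(source_file, working_directory):
                # Security check: refuse paths that escape the working directory.
                raise ValueError("from_work_dir points outside the working directory: %s" % source_file)
            # The copy runs as part of the job script itself, so the output
            # exists before any external set_meta step is launched.
            commands += "; cp %s %s" % (source_file, dest)
        return commands

    # Hypothetical example, e.g. a tool output declared with
    # from_work_dir="tophat_out/accepted_hits.bam":
    cmd = append_from_work_dir_copies(
        "tophat ...",
        "/galaxy/database/job_working/42",
        [("tophat_out/accepted_hits.bam", "/galaxy/database/files/000/dataset_7.dat")],
    )
    print(cmd)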
diff -r f4e720d9f182cb8216fdd2174fe3ab83d0350a24 -r 524cfb8ca67d491bcba54cda5a037a0c89e9a7b4 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -336,38 +336,11 @@
job_context = ExpressionContext( dict( stdout = stdout, stderr = stderr ) )
job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None )
- def in_directory( file, directory ):
- # Make both absolute.
- directory = os.path.abspath( directory )
- file = os.path.abspath( file )
-
- #Return true, if the common prefix of both is equal to directory
- #e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
- return os.path.commonprefix( [ file, directory ] ) == directory
+
for dataset_assoc in job.output_datasets + job.output_library_datasets:
context = self.get_dataset_finish_context( job_context, dataset_assoc.dataset.dataset )
#should this also be checking library associations? - can a library item be added from a history before the job has ended? - lets not allow this to occur
for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations: #need to update all associated output hdas, i.e. history was shared with job running
- #
- # If HDA is to be copied from the working directory, do it now so that other attributes are correctly set.
- #
- if isinstance( dataset, model.HistoryDatasetAssociation ):
- joda = self.sa_session.query( model.JobToOutputDatasetAssociation ).filter_by( job=job, dataset=dataset ).first()
- if joda and job_tool:
- hda_tool_output = job_tool.outputs.get( joda.name, None )
- if hda_tool_output and hda_tool_output.from_work_dir:
- # Copy from working dir to HDA.
- source_file = os.path.join( os.path.abspath( self.working_directory ), hda_tool_output.from_work_dir )
- if in_directory( source_file, self.working_directory ):
- try:
- shutil.move( source_file, dataset.file_name )
- log.debug( "finish(): Moved %s to %s as directed by from_work_dir" % ( source_file, dataset.file_name ) )
- except ( IOError, OSError ):
- log.debug( "finish(): Could not move %s to %s as directed by from_work_dir" % ( source_file, dataset.file_name ) )
- else:
- # Security violation.
- log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, self.working_directory ) )
-
dataset.blurb = 'done'
dataset.peek = 'no peek'
dataset.info = ( dataset.info or '' ) + context['stdout'] + context['stderr']
diff -r f4e720d9f182cb8216fdd2174fe3ab83d0350a24 -r 524cfb8ca67d491bcba54cda5a037a0c89e9a7b4 lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -1,4 +1,6 @@
-import os, os.path
+import os, logging, os.path
+
+log = logging.getLogger( __name__ )
class BaseJobRunner( object ):
def build_command_line( self, job_wrapper, include_metadata=False ):
@@ -10,6 +12,19 @@
- command line taken from job wrapper
- commands to set metadata (if include_metadata is True)
"""
+
+ def in_directory( file, directory ):
+ """
+ Return true, if the common prefix of both is equal to directory
+ e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
+ """
+
+ # Make both absolute.
+ directory = os.path.abspath( directory )
+ file = os.path.abspath( file )
+
+ return os.path.commonprefix( [ file, directory ] ) == directory
+
commands = job_wrapper.get_command_line()
# All job runners currently handle this case which should never
# occur
@@ -25,6 +40,31 @@
if job_wrapper.dependency_shell_commands:
commands = "; ".join( job_wrapper.dependency_shell_commands + [ commands ] )
+ # Append commands to copy job outputs based on from_work_dir attribute.
+ job = job_wrapper.get_job()
+ job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None )
+ for dataset_assoc in job.output_datasets + job.output_library_datasets:
+ for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations:
+ if isinstance( dataset, self.app.model.HistoryDatasetAssociation ):
+ joda = self.sa_session.query( self.app.model.JobToOutputDatasetAssociation ).filter_by( job=job, dataset=dataset ).first()
+ if joda and job_tool:
+ hda_tool_output = job_tool.outputs.get( joda.name, None )
+ if hda_tool_output and hda_tool_output.from_work_dir:
+ # Copy from working dir to HDA.
+ # TODO: move instead of copy to save time?
+ source_file = os.path.join( os.path.abspath( job_wrapper.working_directory ), hda_tool_output.from_work_dir )
+ if in_directory( source_file, job_wrapper.working_directory ):
+ try:
+ commands += "; cp %s %s" % ( source_file, dataset.file_name )
+ log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, dataset.file_name ) )
+ except ( IOError, OSError ):
+ log.debug( "Could not copy %s to %s as directed by from_work_dir" % ( source_file, dataset.file_name ) )
+ else:
+ # Security violation.
+ log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) )
+
+
+
# Append metadata setting commands, we don't want to overwrite metadata
# that was copied over in init_meta(), as per established behavior
if include_metadata and self.app.config.set_metadata_externally:
diff -r f4e720d9f182cb8216fdd2174fe3ab83d0350a24 -r 524cfb8ca67d491bcba54cda5a037a0c89e9a7b4 tools/ngs_rna/tophat_wrapper.py
--- a/tools/ngs_rna/tophat_wrapper.py
+++ b/tools/ngs_rna/tophat_wrapper.py
@@ -227,10 +227,6 @@
if returncode != 0:
raise Exception, stderr
- # Copy output files from tmp directory to specified files.
- shutil.copyfile( os.path.join( "tophat_out", "junctions.bed" ), options.junctions_output_file )
- shutil.copyfile( os.path.join( "tophat_out", "accepted_hits.bam" ), options.accepted_hits_output_file )
-
# TODO: look for errors in program output.
except Exception, e:
stop_err( 'Error in tophat:\n' + str( e ) )
diff -r f4e720d9f182cb8216fdd2174fe3ab83d0350a24 -r 524cfb8ca67d491bcba54cda5a037a0c89e9a7b4 tools/ngs_rna/tophat_wrapper.xml
--- a/tools/ngs_rna/tophat_wrapper.xml
+++ b/tools/ngs_rna/tophat_wrapper.xml
@@ -424,7 +424,7 @@
</conditional>
</actions>
</data>
- <data format="bed" name="junctions" label="${tool.name} on ${on_string}: splice junctions">
+ <data format="bed" name="junctions" label="${tool.name} on ${on_string}: splice junctions" from_work_dir="tophat_out/junctions.bed">
<actions>
<conditional name="refGenomeSource.genomeSource">
<when value="indexed">
@@ -443,7 +443,7 @@
</conditional>
</actions>
</data>
- <data format="bam" name="accepted_hits" label="${tool.name} on ${on_string}: accepted_hits">
+ <data format="bam" name="accepted_hits" label="${tool.name} on ${on_string}: accepted_hits" from_work_dir="tophat_out/accepted_hits.bam">
<actions>
<conditional name="refGenomeSource.genomeSource">
<when value="indexed">
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/changeset/f4e720d9f182/
changeset: f4e720d9f182
user: james_taylor
date: 2012-07-20 17:26:17
summary: Remove the reference to modernizr in base_panels; it is not yet used.
affected #: 1 file
diff -r 724c07bf72efdb66070b7ca396800b2cfb1b2012 -r f4e720d9f182cb8216fdd2174fe3ab83d0350a24 templates/base_panels.mako
--- a/templates/base_panels.mako
+++ b/templates/base_panels.mako
@@ -48,7 +48,7 @@
<!--[if lt IE 7]>
${h.js( 'IE7', 'ie7-recalc' )}
<![endif]-->
- ${h.js( 'modernizr', 'jquery', 'libs/underscore', 'libs/backbone', 'libs/backbone-relational', 'libs/handlebars.runtime', 'mvc/ui' )}
+ ${h.js( 'jquery', 'libs/underscore', 'libs/backbone', 'libs/backbone-relational', 'libs/handlebars.runtime', 'mvc/ui' )}
<script type="text/javascript">
// Set up needed paths.
var galaxy_paths = new GalaxyPaths({
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.