details:
http://www.bx.psu.edu/hg/galaxy/rev/319e2ba6a7be
changeset: 2467:319e2ba6a7be
user: Greg Von Kuster <greg(a)bx.psu.edu>
date: Wed Jul 08 14:10:45 2009 -0400
description:
Enhance history cloning to allow for cloning active or activatable datasets from the
original history - resolves ticket # 29.
6 file(s) affected in this change:
lib/galaxy/model/__init__.py
lib/galaxy/web/controllers/history.py
templates/history/clone.mako
templates/history/list_shared.mako
test/base/twilltestcase.py
test/functional/test_history_functions.py
diffs (395 lines):
diff -r 539d481c8f59 -r 319e2ba6a7be lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py Tue Jul 07 20:32:00 2009 -0400
+++ b/lib/galaxy/model/__init__.py Wed Jul 08 14:10:45 2009 -0400
@@ -214,17 +214,21 @@
if genome_build not in [None, '?']:
self.genome_build = genome_build
self.datasets.append( dataset )
- def copy( self, name=None, target_user=None ):
+ def copy( self, name=None, target_user=None, activatable=False ):
if not name:
name = self.name
if not target_user:
target_user = self.user
new_history = History( name=name, user=target_user )
new_history.flush()
- for data in self.datasets:
- new_data = data.copy( copy_children=True, target_history=new_history )
- new_history.add_dataset( new_data, set_hid = False )
- new_data.flush()
+ if activatable:
+ hdas = self.activatable_datasets
+ else:
+ hdas = self.active_datasets
+ for hda in hdas:
+ new_hda = hda.copy( copy_children=True, target_history=new_history )
+ new_history.add_dataset( new_hda, set_hid = False )
+ new_hda.flush()
new_history.hid_counter = self.hid_counter
new_history.flush()
return new_history
diff -r 539d481c8f59 -r 319e2ba6a7be lib/galaxy/web/controllers/history.py
--- a/lib/galaxy/web/controllers/history.py Tue Jul 07 20:32:00 2009 -0400
+++ b/lib/galaxy/web/controllers/history.py Wed Jul 08 14:10:45 2009 -0400
@@ -310,8 +310,8 @@
email=email,
send_to_err=send_to_err )
if params.get( 'share_button', False ):
- can_change, cannot_change, no_change_needed = \
- self._populate_restricted( trans, user, histories, send_to_users, None )
+ can_change, cannot_change, no_change_needed, send_to_err = \
+ self._populate_restricted( trans, user, histories, send_to_users, None,
send_to_err )
if can_change or cannot_change:
return trans.fill_template( "/history/share.mako",
histories=histories,
@@ -335,8 +335,8 @@
user = trans.get_user()
histories, send_to_users, send_to_err = self._get_histories_and_users( trans,
user, id, email )
send_to_err = ''
- can_change, cannot_change, no_change_needed = \
- self._populate_restricted( trans, user, histories, send_to_users, action )
+ can_change, cannot_change, no_change_needed, send_to_err = \
+ self._populate_restricted( trans, user, histories, send_to_users, action,
send_to_err )
# Now that we've populated the can_change, cannot_change, and
no_change_needed dictionaries,
# we'll populate the histories_for_sharing dictionary from each of them.
histories_for_sharing = {}
@@ -430,7 +430,7 @@
elif history not in histories_for_sharing[ send_to_user ]:
histories_for_sharing[ send_to_user ].append( history )
return histories_for_sharing, send_to_err
- def _populate_restricted( self, trans, user, histories, send_to_users, action ):
+ def _populate_restricted( self, trans, user, histories, send_to_users, action,
send_to_err ):
# The user may be attempting to share histories whose datasets cannot all be
accessed by other users.
# If this is the case, the user sharing the histories can:
# 1) action=='public': choose to make the datasets public if he is
permitted to do so
@@ -494,7 +494,7 @@
cannot_change[ send_to_user ][ history ] = [ hda ]
else:
cannot_change[ send_to_user ][ history ].append( hda
)
- return can_change, cannot_change, no_change_needed
+ return can_change, cannot_change, no_change_needed, send_to_err
def _share_histories( self, trans, user, send_to_err, histories={} ):
# histories looks like: { userA: [ historyX, historyY ], userB: [ historyY ] }
msg = ""
@@ -585,8 +585,12 @@
return trans.show_message( "<p>%s" % change_msg,
refresh_frames=['history'] )
@web.expose
@web.require_login( "clone shared Galaxy history" )
- def clone( self, trans, id ):
+ def clone( self, trans, id, **kwd ):
history = get_history( trans, id, check_ownership=False )
+ params = util.Params( kwd )
+ clone_choice = params.get( 'clone_choice', None )
+ if not clone_choice:
+ return trans.fill_template( "/history/clone.mako", history=history
)
user = trans.get_user()
if history.user == user:
owner = True
@@ -598,7 +602,10 @@
name = "Clone of '%s'" % history.name
if not owner:
name += " shared by '%s'" % history.user.email
- new_history = history.copy( name=name, target_user=user )
+ if clone_choice == 'activatable':
+ new_history = history.copy( name=name, target_user=user, activatable=True )
+ elif clone_choice == 'active':
+ new_history = history.copy( name=name, target_user=user )
# Render the list view
return trans.show_ok_message( 'Clone with name "%s" is now included
in your list of stored histories.' % new_history.name )
diff -r 539d481c8f59 -r 319e2ba6a7be templates/history/clone.mako
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/templates/history/clone.mako Wed Jul 08 14:10:45 2009 -0400
@@ -0,0 +1,25 @@
+<% _=n_ %>
+<%inherit file="/base.mako"/>
+<%def name="title()">Clone History</%def>
+
+<div class="toolForm">
+ <div class="toolFormTitle">Clone History</div>
+ <div class="toolFormBody">
+ <form action="${h.url_for( controller='history',
action='clone' )}" method="post" >
+ <div class="form-row">
+ <input type="hidden" name="id"
value="${trans.security.encode_id( history.id )}">
+ You can clone the history such that the clone will include all items in
the original
+ history, or you can eliminate the original history's deleted items
from the clone.
+ </div>
+ <div class="form-row">
+ <input type="radio" name="clone_choice"
value="activatable"> Clone all history items, including deleted items
+ </div>
+ <div class="form-row">
+ <input type="radio" name="clone_choice"
value="active"> Clone only items that are not deleted
+ </div>
+ <div class="form-row">
+ <input type="submit" name="clone_choice_button"
value="Clone">
+ </div>
+ </form>
+ </div>
+</div>
diff -r 539d481c8f59 -r 319e2ba6a7be templates/history/list_shared.mako
--- a/templates/history/list_shared.mako Tue Jul 07 20:32:00 2009 -0400
+++ b/templates/history/list_shared.mako Wed Jul 08 14:10:45 2009 -0400
@@ -18,7 +18,7 @@
${history.name}
<a id="shared-${i}-popup" class="popup-arrow"
style="display: none;">▼</a>
<div popupmenu="shared-${i}-popup">
- <a class="action-button" href="${h.url_for(
controller='history', action='clone', id=trans.security.encode_id(
history.id ) )}">Clone</a>
+ <a class="action-button" href="${h.url_for(
controller='history', action='clone', id=trans.security.encode_id(
history.id ), clone_choice='activatable' )}">Clone</a>
</div>
</td>
<td>${history.user.email}</td>
diff -r 539d481c8f59 -r 319e2ba6a7be test/base/twilltestcase.py
--- a/test/base/twilltestcase.py Tue Jul 07 20:32:00 2009 -0400
+++ b/test/base/twilltestcase.py Wed Jul 08 14:10:45 2009 -0400
@@ -121,10 +121,13 @@
page = self.last_page()
if page.find( 'error' ) > -1:
raise AssertionError('Errors in the history for user %s' % self.user
)
- def check_history_for_string( self, patt ):
+ def check_history_for_string( self, patt, show_deleted=False ):
"""Looks for 'string' in history page"""
self.home()
- self.visit_page( "history" )
+ if show_deleted:
+ self.visit_page( "history?show_deleted=True" )
+ else:
+ self.visit_page( "history" )
for subpatt in patt.split():
tc.find(subpatt)
self.home()
@@ -268,11 +271,6 @@
self.home()
def switch_history( self, id='', name='' ):
"""Switches to a history in the current list of
histories"""
- data_list = self.get_histories_as_data_list()
- self.assertTrue( data_list )
- if not id:
- history = history_list[0]
- id = history.get( 'id' )
self.visit_url( "%s/history/list?operation=switch&id=%s" % (
self.url, id ) )
if name:
self.check_history_for_string( name )
@@ -305,11 +303,15 @@
if check_str2:
self.check_page_for_string( check_str2 )
self.home()
- def clone_history( self, history_id, check_str1='' ):
+ def clone_history( self, history_id, clone_choice, check_str1='',
check_str_after_submit='' ):
self.home()
self.visit_page( "history/clone?id=%s" % history_id )
if check_str1:
self.check_page_for_string( check_str1 )
+ tc.fv( '1', 'clone_choice', clone_choice )
+ tc.submit( 'clone_choice_button' )
+ if check_str_after_submit:
+ self.check_page_for_string( check_str_after_submit )
self.home()
def enable_import_via_link( self, history_id, check_str='',
check_str_after_submit='' ):
self.home()
@@ -366,34 +368,22 @@
self.visit_page( "edit?hid=%d" % hid )
for subpatt in patt.split():
tc.find(subpatt)
- def delete_history_item( self, hid, check_str='' ):
+ def delete_history_item( self, hda_id, check_str='' ):
"""Deletes an item from a history"""
try:
- hid = int( hid )
+ hda_id = int( hda_id )
except:
- raise AssertionError, "Invalid hid '%s' - must be int" %
hid
- hid = str(hid)
- data_list = self.get_history_as_data_list()
- self.assertTrue( data_list )
- elems = [ elem for elem in data_list if elem.get( 'hid' ) == hid ]
- self.assertEqual( len( elems ), 1 )
- self.home()
- self.visit_page( "delete?id=%s" % elems[0].get( 'id' ) )
+ raise AssertionError, "Invalid hda_id '%s' - must be int" %
hda_id
+ self.visit_url(
"%s/root/delete?show_deleted_on_refresh=False&id=%s" % ( self.url, hda_id )
)
if check_str:
self.check_page_for_string( check_str )
- def undelete_history_item( self, hid, show_deleted=False, check_str='' ):
+ def undelete_history_item( self, hda_id, check_str='' ):
"""Un-deletes a deleted item in a history"""
try:
- hid = int( hid )
+ hda_id = int( hda_id )
except:
- raise AssertionError, "Invalid hid '%s' - must be int" %
hid
- hid = str( hid )
- data_list = self.get_history_as_data_list( show_deleted=show_deleted )
- self.assertTrue( data_list )
- elems = [ elem for elem in data_list if elem.get( 'hid' ) == hid ]
- self.assertEqual( len( elems ), 1 )
- self.home()
- self.visit_url( "%s/dataset/undelete?id=%s" % ( self.url, elems[0].get(
'id' ) ) )
+ raise AssertionError, "Invalid hda_id '%s' - must be int" %
hda_id
+ self.visit_url( "%s/dataset/undelete?id=%s" % ( self.url, hda_id ) )
if check_str:
self.check_page_for_string( check_str )
def display_history_item( self, id, check_str='' ):
diff -r 539d481c8f59 -r 319e2ba6a7be test/functional/test_history_functions.py
--- a/test/functional/test_history_functions.py Tue Jul 07 20:32:00 2009 -0400
+++ b/test/functional/test_history_functions.py Wed Jul 08 14:10:45 2009 -0400
@@ -168,11 +168,6 @@
# Test sharing history3 with an invalid user
self.share_current_history( 'jack(a)jill.com',
check_str_after_submit='jack(a)jill.com is not a
valid Galaxy user.' )
-
-
-
-
-
def test_025_delete_shared_current_history( self ):
"""Testing deleting the current history after it was
shared"""
# Logged in as admin_user
@@ -187,7 +182,8 @@
# Shared history3 should be in regular_user1's list of shared histories
self.view_shared_histories( check_str=history3.name, check_str2=admin_user.email
)
self.clone_history( self.security.encode_id( history3.id ),
- check_str1='is now included in your list of stored
histories.' )
+ 'activatable',
+ check_str_after_submit='is now included in your list of
stored histories.' )
global history3_clone1
history3_clone1 = galaxy.model.History.filter( and_(
galaxy.model.History.table.c.deleted==False,
galaxy.model.History.table.c.user_id==regular_user1.id ) ) \
@@ -201,8 +197,26 @@
# logged in as regular_user1
self.logout()
self.login( email=admin_user.email )
+ # Current history should be history3, add more datasets to history3, then delete
them so we can
+ # test cloning activatable datasets as well as only the active datasets
+ self.upload_file( '2.bed', dbkey='hg18' )
+ hda_2_bed = galaxy.model.HistoryDatasetAssociation \
+ .filter( and_(
galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3.id,
+
galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) ) \
+ .first()
+ assert hda_2_bed is not None, "Problem retrieving hda_2_bed from
database"
+ self.delete_history_item( str( hda_2_bed.id ) )
+ self.upload_file( '3.bed', dbkey='hg18' )
+ hda_3_bed = galaxy.model.HistoryDatasetAssociation \
+ .filter( and_(
galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3.id,
+
galaxy.model.HistoryDatasetAssociation.table.c.name=='3.bed' ) ) \
+ .first()
+ assert hda_3_bed is not None, "Problem retrieving hda_3_bed from
database"
+ self.delete_history_item( str( hda_3_bed.id ) )
+ # Test cloning activatable datasets
self.clone_history( self.security.encode_id( history3.id ),
- check_str1='is now included in your list of stored
histories.' )
+ 'activatable',
+ check_str_after_submit='is now included in your list of
stored histories.' )
global history3_clone2
history3_clone2 = galaxy.model.History.filter( and_(
galaxy.model.History.table.c.deleted==False,
galaxy.model.History.table.c.user_id==admin_user.id ) ) \
@@ -210,6 +224,42 @@
assert history3_clone2 is not None, "Problem retrieving history3_clone2 from
database"
# Check list of histories to make sure shared history3 was cloned
self.view_stored_active_histories( check_str="Clone of '%s'" %
history3.name )
+ # Switch to the cloned history to make sure activatable datasets were cloned
+ self.switch_history( id=self.security.encode_id( history3_clone2.id ),
name=history3_clone2.name )
+ hda_2_bed = galaxy.model.HistoryDatasetAssociation \
+ .filter( and_(
galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3_clone2.id,
+
galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) ) \
+ .first()
+ assert hda_2_bed is not None, "Problem retrieving hda_2_bed from
database"
+ hda_3_bed = galaxy.model.HistoryDatasetAssociation \
+ .filter( and_(
galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3_clone2.id,
+
galaxy.model.HistoryDatasetAssociation.table.c.name=='3.bed' ) ) \
+ .first()
+ assert hda_3_bed is not None, "Problem retrieving hda_3_bed from
database"
+ # Make sure the deleted datasets are included in the cloned history
+ check_str = 'This dataset has been deleted. Click undelete id=%d"' %
hda_2_bed.id
+ self.check_history_for_string( check_str, show_deleted=True )
+ check_str = 'This dataset has been deleted. Click undelete id=%d"' %
hda_3_bed.id
+ self.check_history_for_string( check_str, show_deleted=True )
+ # Test cloning only active datasets
+ self.clone_history( self.security.encode_id( history3.id ),
+ 'active',
+ check_str_after_submit='is now included in your list of
stored histories.' )
+ global history3_clone3
+ history3_clone3 = galaxy.model.History.filter( and_(
galaxy.model.History.table.c.deleted==False,
+
galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+ .order_by( desc( galaxy.model.History.table.c.create_time ) ).first()
+ assert history3_clone3 is not None, "Problem retrieving history3_clone3 from
database"
+ # Check list of histories to make sure shared history3 was cloned
+ self.view_stored_active_histories( check_str="Clone of '%s'" %
history3.name )
+ # Switch to the cloned history to make sure activatable datasets were cloned
+ self.switch_history( id=self.security.encode_id( history3_clone3.id ),
name=history3_clone3.name )
+ # Make sure the deleted datasets are NOT included in the cloned history
+ try:
+ self.check_history_for_string( 'This dataset has been deleted.',
show_deleted=True )
+ raise AssertionError, "Deleted datasets incorrectly included in cloned
history history3_clone3"
+ except:
+ pass
def test_040_sharing_mulitple_histories_with_multiple_users( self ):
"""Testing sharing multiple histories containing only public
datasets with multiple users"""
# Logged in as admin_user
@@ -304,7 +354,8 @@
self.view_shared_histories( check_str=history5.name, check_str2=admin_user.email
)
# Clone restricted history5
self.clone_history( self.security.encode_id( history5.id ),
- check_str1='is now included in your list of stored
histories.' )
+ 'activatable',
+ check_str_after_submit='is now included in your list of
stored histories.' )
global history5_clone1
history5_clone1 = galaxy.model.History.filter( and_(
galaxy.model.History.table.c.deleted==False,
galaxy.model.History.table.c.user_id==regular_user1.id ) ) \
@@ -359,7 +410,8 @@
self.view_shared_histories( check_str=history5.name, check_str2=admin_user.email
)
# Clone restricted history5
self.clone_history( self.security.encode_id( history5.id ),
- check_str1='is now included in your list of stored
histories.' )
+ 'activatable',
+ check_str_after_submit='is now included in your list of
stored histories.' )
global history5_clone2
history5_clone2 = galaxy.model.History.filter( and_(
galaxy.model.History.table.c.deleted==False,
galaxy.model.History.table.c.user_id==regular_user2.id ) ) \
@@ -431,7 +483,8 @@
self.view_shared_histories( check_str=history5.name, check_str2=admin_user.email
)
# Clone restricted history5
self.clone_history( self.security.encode_id( history5.id ),
- check_str1='is now included in your list of stored
histories.' )
+ 'activatable',
+ check_str_after_submit='is now included in your list of
stored histories.' )
global history5_clone3
history5_clone3 = galaxy.model.History.filter( and_(
galaxy.model.History.table.c.deleted==False,
galaxy.model.History.table.c.user_id==regular_user2.id ) ) \
@@ -468,7 +521,8 @@
self.view_shared_histories( check_str=history5.name, check_str2=admin_user.email
)
# Clone restricted history5
self.clone_history( self.security.encode_id( history5.id ),
- check_str1='is now included in your list of stored
histories.' )
+ 'activatable',
+ check_str_after_submit='is now included in your list of
stored histories.' )
global history5_clone4
history5_clone4 = galaxy.model.History.filter( and_(
galaxy.model.History.table.c.deleted==False,
galaxy.model.History.table.c.user_id==regular_user3.id ) ) \
@@ -587,16 +641,16 @@
self.check_page_for_string( 'hg15' )
self.assertEqual ( len( self.get_history_as_data_list() ), 1 )
# Delete the history item
- self.delete_history_item( latest_hda.hid, check_str="Your history is
empty" )
+ self.delete_history_item( str( latest_hda.id ), check_str="Your history is
empty" )
self.assertEqual ( len( self.get_history_as_data_list() ), 0 )
# Try deleting an invalid hid
try:
self.delete_history_item( 'XXX' )
- raise AssertionError, "Inproperly able to delete hid 'XXX' which
is not an integer"
+ raise AssertionError, "Inproperly able to delete hda_id 'XXX'
which is not an integer"
except:
pass
# Undelete the history item
- self.undelete_history_item( latest_hda.hid, show_deleted=True )
+ self.undelete_history_item( str( latest_hda.id ) )
self.home()
self.visit_url( "%s/history/?show_deleted=False" % self.url )
self.check_page_for_string( '1.bed' )
@@ -673,6 +727,7 @@
# Delete histories
self.delete_history( id=self.security.encode_id( history3.id ) )
self.delete_history( id=self.security.encode_id( history3_clone2.id ) )
+ self.delete_history( id=self.security.encode_id( history3_clone3.id ) )
self.delete_history( id=self.security.encode_id( history4.id ) )
self.delete_history( id=self.security.encode_id( history5.id ) )
# Eliminate Sharing role for: test(a)bx.psu.edu, test2(a)bx.psu.edu