[hg] galaxy 2428: Add ability to share histories with multiple u...
details: http://www.bx.psu.edu/hg/galaxy/rev/9a71b89082fe changeset: 2428:9a71b89082fe user: Greg Von Kuster <greg@bx.psu.edu> date: Fri Jun 05 11:15:25 2009 -0400 description: Add ability to share histories with multiple users, along with bug fixes and more functional test coverage for history features. 12 file(s) affected in this change: lib/galaxy/security/__init__.py lib/galaxy/tools/actions/upload.py lib/galaxy/web/controllers/history.py lib/galaxy/web/controllers/root.py lib/galaxy/web/framework/__init__.py scripts/cleanup_datasets/cleanup_datasets.py templates/history/rename.mako templates/history/share.mako test/base/twilltestcase.py test/functional/test_history_functions.py test/functional/test_security_and_libraries.py tool_conf.xml.sample diffs (1698 lines): diff -r 73847f425801 -r 9a71b89082fe lib/galaxy/security/__init__.py --- a/lib/galaxy/security/__init__.py Mon Jun 01 10:27:04 2009 -0400 +++ b/lib/galaxy/security/__init__.py Fri Jun 05 11:15:25 2009 -0400 @@ -51,6 +51,8 @@ def set_dataset_permission( self, dataset, permission ): raise "Unimplemented Method" def set_all_library_permissions( self, dataset, permissions ): + raise "Unimplemented Method" + def dataset_is_public( self, dataset ): raise "Unimplemented Method" def make_dataset_public( self, dataset ): raise "Unimplemented Method" @@ -296,6 +298,10 @@ # Add the new specific permission on the dataset for dp in [ self.model.DatasetPermissions( action, dataset, role ) for role in roles ]: dp.flush() + def dataset_is_public( self, dataset ): + # A dataset is considered public if there are no "access" actions associated with it. Any + # other actions ( 'manage permissions', 'edit metadata' ) are irrelevant. + return self.permitted_actions.DATASET_ACCESS.action not in [ a.action for a in dataset.actions ] def make_dataset_public( self, dataset ): # A dataset is considered public if there are no "access" actions associated with it. Any # other actions ( 'manage permissions', 'edit metadata' ) are irrelevant. diff -r 73847f425801 -r 9a71b89082fe lib/galaxy/tools/actions/upload.py --- a/lib/galaxy/tools/actions/upload.py Mon Jun 01 10:27:04 2009 -0400 +++ b/lib/galaxy/tools/actions/upload.py Fri Jun 05 11:15:25 2009 -0400 @@ -245,12 +245,14 @@ parts = file_name.split( "." ) if len( parts ) > 1: ext = parts[1].strip().lower() - if not( ext == 'ab1' or ext == 'scf' ): + if not( ext == 'ab1' or ext == 'scf' or ext == 'novoindex' ): raise BadFileException( "you attempted to upload an inappropriate file." ) if ext == 'ab1' and file_type != 'ab1': raise BadFileException( "you must manually set the 'File Format' to 'Ab1' when uploading ab1 files." ) elif ext == 'scf' and file_type != 'scf': raise BadFileException( "you must manually set the 'File Format' to 'Scf' when uploading scf files." ) + elif ext == 'novoindex' and file_type != 'novoindex': + raise BadFileException( "you must manually set the 'File Format' to 'NovoIndex' when uploading novoindex files." ) data_type = 'binary' if not data_type: # We must have a text file @@ -336,7 +338,7 @@ return ( False, False, None ) zip_file = zipfile.ZipFile( temp_name, "r" ) # Make sure the archive consists of valid files. The current rules are: - # 1. Archives can only include .ab1, .scf or .txt files + # 1. Archives can only include .ab1, .scf, or .txt files # 2. All file extensions within an archive must be the same name = zip_file.namelist()[0] test_ext = name.split( "." )[1].strip().lower() diff -r 73847f425801 -r 9a71b89082fe lib/galaxy/web/controllers/history.py --- a/lib/galaxy/web/controllers/history.py Mon Jun 01 10:27:04 2009 -0400 +++ b/lib/galaxy/web/controllers/history.py Fri Jun 05 11:15:25 2009 -0400 @@ -1,6 +1,7 @@ from galaxy.web.base.controller import * from galaxy.web.framework.helpers import time_ago, iff, grids -import webhelpers +from galaxy import util +import webhelpers, logging from datetime import datetime from cgi import escape @@ -65,7 +66,6 @@ @web.expose def index( self, trans ): return "" - @web.expose def list_as_xml( self, trans ): """ @@ -78,9 +78,7 @@ @web.expose @web.require_login( "work with multiple histories" ) def list( self, trans, **kwargs ): - """ - List all available histories - """ + """List all available histories""" status = message = None if 'operation' in kwargs: operation = kwargs['operation'].lower() @@ -92,9 +90,7 @@ status, message = None, None refresh_history = False # Load the histories and ensure they all belong to the current user - history_ids = kwargs.get( 'id', [] ) - if type( history_ids ) is not list: - history_ids = [ history_ids ] + history_ids = util.listify( kwargs.get( 'id', [] ) ) histories = [] for hid in history_ids: history = model.History.get( hid ) @@ -117,17 +113,13 @@ trans.sa_session.flush() # Render the list view return self.list_grid( trans, status=status, message=message, **kwargs ) - def _list_delete( self, trans, histories ): """Delete histories""" n_deleted = 0 deleted_current = False for history in histories: if not history.deleted: - # Delete DefaultHistoryPermissions - for dhp in history.default_permissions: - dhp.delete() - dhp.flush() + # We'll not eliminate any DefaultHistoryPermissions in case we undelete the history later # Mark history as deleted in db history.deleted = True # If deleting the current history, make a new current. @@ -144,7 +136,6 @@ message_parts.append( "Your active history was deleted, a new empty history is now active.") status = INFO return ( status, " ".join( message_parts ) ) - def _list_undelete( self, trans, histories ): """Undelete histories""" n_undeleted = 0 @@ -154,6 +145,15 @@ n_already_purged += 1 if history.deleted: history.deleted = False + if not history.default_permissions: + # For backward compatibility - for a while we were deleting all DefaultHistoryPermissions on + # the history when we deleted the history. We are no longer doing this. + # Need to add default DefaultHistoryPermissions since they were deleted when the history was deleted + default_action = trans.app.security_agent.permitted_actions.DATASET_MANAGE_PERMISSIONS + private_user_role = trans.app.security_agent.get_private_user_role( history.user ) + default_permissions = {} + default_permissions[ default_action ] = [ private_user_role ] + trans.app.security_agent.history_set_default_permissions( history, default_permissions ) n_undeleted += 1 trans.log_event( "History id %d marked as undeleted" % history.id ) status = SUCCESS @@ -164,7 +164,6 @@ message_parts.append( "%d have already been purged and cannot be undeleted." % n_already_purged ) status = WARNING return status, "".join( message_parts ) - def _list_switch( self, trans, histories ): """Switch to a new different history""" new_history = histories[0] @@ -179,13 +178,9 @@ trans.log_event( "History switched to id: %s, name: '%s'" % (str(new_history.id), new_history.name ) ) # No message return None, None - @web.expose def delete_current( self, trans ): - """ - Delete just the active history -- this does not require a logged - in user. - """ + """Delete just the active history -- this does not require a logged in user.""" history = trans.get_history() if not history.deleted: history.deleted = True @@ -195,7 +190,6 @@ # history active trans.new_history() return trans.show_ok_message( "History deleted, a new history is active" ) - @web.expose def rename_async( self, trans, id=None, new_name=None ): history = model.History.get( id ) @@ -209,11 +203,9 @@ # Rename history.name = new_name trans.sa_session.flush() - - ## These have been moved from 'root' but not cleaned up - @web.expose def imp( self, trans, id=None, confirm=False, **kwd ): + # TODO clean this up and make sure functionally correct msg = "" user = trans.get_user() user_history = trans.get_history() @@ -262,97 +254,159 @@ Warning! If you import this history, you will lose your current history. Click <a href="%s">here</a> to confirm. """ % web.url_for( id=id, confirm=True ) ) - @web.expose @web.require_login( "share histories with other users" ) def share( self, trans, id=None, email="", **kwd ): - send_to_err = "" - if not id: - id = trans.get_history().id - if not isinstance( id, list ): - id = [ id ] - histories = [] - history_names = [] - for hid in id: - histories.append( trans.app.model.History.get( hid ) ) - history_names.append(histories[-1].name) - if not email: - return trans.fill_template("/history/share.mako", histories=histories, email=email, send_to_err=send_to_err) - user = trans.get_user() - send_to_user = trans.app.model.User.filter( trans.app.model.User.table.c.email==email ).first() + # If a history contains both datasets that can be shared and others that cannot be shared with the desired user, + # then the entire history is shared, and the protected datasets will be visible, but inaccessible ( greyed out ) + # in the shared history params = util.Params( kwd ) action = params.get( 'action', None ) if action == "no_share": - trans.response.send_redirect( url_for( action='history_options' ) ) - if not send_to_user: - send_to_err = "No such user" - elif user.email == email: - send_to_err = "You can't send histories to yourself" - else: - if 'history_share_btn' in kwd or action != 'share': - # The user is attempting to share a history whose datasets cannot all be accessed by the other user. In this case, - # the user sharing the history can chose to make the datasets public ( action == 'public' ) if he has the authority - # to do so, or automatically create a new "sharing role" that allows the user to share his private datasets only with the - # desired user ( action == 'private' ). - can_change = {} - cannot_change = {} + trans.response.send_redirect( url_for( controller='root', action='history_options' ) ) + if not id: + id = trans.get_history().id + id = util.listify( id ) + send_to_err = "" + histories = [] + for hid in id: + histories.append( trans.app.model.History.get( hid ) ) + if not email: + return trans.fill_template( "/history/share.mako", histories=histories, email=email, send_to_err=send_to_err ) + user = trans.get_user() + send_to_users = [] + for email_address in util.listify( email ): + email_address = email_address.strip() + if email_address: + if email_address == user.email: + send_to_err += "You can't send histories to yourself. " + else: + send_to_user = trans.app.model.User.filter( trans.app.model.User.table.c.email==email_address ).first() + if send_to_user: + send_to_users.append( send_to_user ) + else: + send_to_err += "%s is not a valid Galaxy user. " % email_address + if not send_to_users: + if not send_to_err: + send_to_err += "%s is not a valid Galaxy user. " % email + return trans.fill_template( "/history/share.mako", histories=histories, email=email, send_to_err=send_to_err ) + if params.get( 'share_proceed_button', False ) and action == 'share': + # We need to filter out all histories that cannot be shared + filtered_histories = {} + for history in histories: + for send_to_user in send_to_users: + # Only deal with datasets that have not been purged + for hda in history.activatable_datasets: + # The history can be shared if it contains at least 1 public dataset or 1 dataset that the + # other user can access. Inaccessible datasets contained in the history will be displayed + # in the shared history, but "greyed out", so they cannot be viewed or used. + if trans.app.security_agent.dataset_is_public( hda.dataset ) or \ + trans.app.security_agent.allow_action( send_to_user, + trans.app.security_agent.permitted_actions.DATASET_ACCESS, + dataset=hda ): + if send_to_user in filtered_histories: + filtered_histories[ send_to_user ].append( history ) + else: + filtered_histories[ send_to_user ] = [ history ] + break + return self._share_histories( trans, user, send_to_users, send_to_err, filtered_histories=filtered_histories ) + elif params.get( 'history_share_btn', False ) or action != 'share': + # The user is attempting to share histories whose datasets cannot all be accessed by other users. In this case, + # the user sharing the histories can: + # 1) action=='public': chose to make the datasets public if he is permitted to do so + # 2) action=='private': automatically create a new "sharing role" allowing protected + # datasets to be accessed only by the desired users + # 3) action=='share': share only what can be shared when no permissions are changed - this case is handled above + # 4) action=='no_share': Do not share anything - this case is handled above. + can_change = {} + cannot_change = {} + no_change_needed = {} + for history in histories: + # Only deal with datasets that have not been purged + for hda in history.activatable_datasets: + if trans.app.security_agent.dataset_is_public( hda.dataset ): + if history not in no_change_needed: + no_change_needed[ history ] = [ hda ] + else: + no_change_needed[ history ].append( hda ) + elif not trans.app.security_agent.allow_action( send_to_user, + trans.app.security_agent.permitted_actions.DATASET_ACCESS, + dataset=hda ): + # The user with which we are sharing the history does not have access permission on the current dataset + if trans.app.security_agent.allow_action( user, + trans.app.security_agent.permitted_actions.DATASET_MANAGE_PERMISSIONS, + dataset=hda ) and not hda.dataset.library_associations: + # The current user has authority to change permissions on the current dataset because + # they have permission to manage permissions on the dataset and the dataset is not associated + # with a library. + if action == "private": + trans.app.security_agent.privately_share_dataset( hda.dataset, users=[ user, send_to_user ] ) + elif action == "public": + trans.app.security_agent.make_dataset_public( hda.dataset ) + elif history not in can_change: + # Build the set of histories / datasets on which the current user has authority + # to "manage permissions". This is used in /history/share.mako + can_change[ history ] = [ hda ] + else: + can_change[ history ].append( hda ) + else: + if action in [ "private", "public" ]: + # Don't change stuff that the user doesn't have permission to change + continue + elif history not in cannot_change: + # Build the set of histories / datasets on which the current user does + # not have authority to "manage permissions". This is used in /history/share.mako + cannot_change[ history ] = [ hda ] + else: + cannot_change[ history ].append( hda ) + if can_change or cannot_change: + return trans.fill_template( "/history/share.mako", + histories=histories, + email=email, + send_to_err=send_to_err, + can_change=can_change, + cannot_change=cannot_change, + no_change_needed=no_change_needed ) + return self._share_histories( trans, user, send_to_users, send_to_err, histories=histories ) + return trans.fill_template( "/history/share.mako", histories=histories, email=email, send_to_err=send_to_err ) + def _share_histories( self, trans, user, send_to_users, send_to_err, histories=[], filtered_histories={} ): + msg = "" + if not send_to_users: + msg = "No users have been specified with which to share histories" + sent_to_emails = [] + for sent_to_user in send_to_users: + sent_to_emails.append( sent_to_user.email ) + emails = ",".join( e for e in sent_to_emails ) + if not histories and not filtered_histories: + msg = "No histories can be sent to (%s) without changing dataset permissions associating a sharing role with them" % emails + elif histories: + history_names = [] + for history in histories: + history_names.append( history.name ) + for send_to_user in send_to_users: + new_history = history.copy( target_user=send_to_user ) + new_history.name = history.name + " from " + user.email + new_history.user_id = send_to_user.id + self.app.model.flush() + msg = "Histories (%s) have been shared with: %s. " % ( ",".join( history_names ), emails ) + elif filtered_histories: + # filtered_histories is a dictionary like: { user: [ history, history ], user: [ history ] } + for send_to_user, histories in filtered_histories.items(): + history_names = [] for history in histories: - for hda in history.activatable_datasets: - # Only deal with datasets that have not been purged - if not trans.app.security_agent.allow_action( send_to_user, - trans.app.security_agent.permitted_actions.DATASET_ACCESS, - dataset=hda ): - # The user with which we are sharing the history does not have access permission on the current dataset - if trans.app.security_agent.allow_action( user, - trans.app.security_agent.permitted_actions.DATASET_MANAGE_PERMISSIONS, - dataset=hda ) and not hda.dataset.library_associations: - # The current user has authority to change permissions on the current dataset because - # they have permission to manage permissions on the dataset and the dataset is not associated - # with a library. - if action == "private": - trans.app.security_agent.privately_share_dataset( hda.dataset, users=[ user, send_to_user ] ) - elif action == "public": - trans.app.security_agent.make_dataset_public( hda.dataset ) - elif history not in can_change: - # Build the set of histories / datasets on which the current user has authority - # to "manage permissions". This is used in /history/share.mako - can_change[ history ] = [ hda ] - else: - can_change[ history ].append( hda ) - else: - if action in [ "private", "public" ]: - # Don't change stuff that the user doesn't have permission to change - continue - elif history not in cannot_change: - # Build the set of histories / datasets on which the current user does - # not have authority to "manage permissions". This is used in /history/share.mako - cannot_change[ history ] = [ hda ] - else: - cannot_change[ history ].append( hda ) - if can_change or cannot_change: - return trans.fill_template( "/history/share.mako", - histories=histories, - email=email, - send_to_err=send_to_err, - can_change=can_change, - cannot_change=cannot_change ) - for history in histories: - new_history = history.copy( target_user=send_to_user ) - new_history.name = history.name + " from " + user.email - new_history.user_id = send_to_user.id - trans.log_event( "History share, id: %s, name: '%s': to new id: %s" % ( str( history.id ), history.name, str( new_history.id ) ) ) - self.app.model.flush() - return trans.show_message( "History (%s) has been shared with: %s" % ( ",".join( history_names ),email ) ) - return trans.fill_template( "/history/share.mako", histories=histories, email=email, send_to_err=send_to_err ) - + history_names.append( history.name ) + new_history = history.copy( target_user=send_to_user ) + new_history.name = history.name + " from " + user.email + new_history.user_id = send_to_user.id + self.app.model.flush() + msg += "Histories (%s) have been shared with: %s. " % ( ",".join( history_names ), send_to_user.email ) + if send_to_err: + msg += send_to_err + return trans.show_message( msg ) @web.expose @web.require_login( "rename histories" ) def rename( self, trans, id=None, name=None, **kwd ): - if trans.app.memory_usage: - # Keep track of memory usage - m0 = self.app.memory_usage.memory() user = trans.get_user() - if not isinstance( id, list ): if id != None: id = [ id ] @@ -387,7 +441,4 @@ change_msg = change_msg + "<p>You must specify a valid name for History: "+cur_names[i]+"</p>" else: change_msg = change_msg + "<p>History: "+cur_names[i]+" does not appear to belong to you.</p>" - if self.app.memory_usage: - m1 = trans.app.memory_usage.memory( m0, pretty=True ) - log.info( "End of root/history_rename, memory used increased by %s" % m1 ) - return trans.show_message( "<p>%s" % change_msg, refresh_frames=['history'] ) \ No newline at end of file + return trans.show_message( "<p>%s" % change_msg, refresh_frames=['history'] ) diff -r 73847f425801 -r 9a71b89082fe lib/galaxy/web/controllers/root.py --- a/lib/galaxy/web/controllers/root.py Mon Jun 01 10:27:04 2009 -0400 +++ b/lib/galaxy/web/controllers/root.py Fri Jun 05 11:15:25 2009 -0400 @@ -349,14 +349,12 @@ """Displays a list of history related actions""" return trans.fill_template( "/history/options.mako", user = trans.get_user(), history = trans.get_history() ) - @web.expose def history_delete( self, trans, id ): """ Backward compatibility with check_galaxy script. """ return trans.webapp.controllers['history'].list( trans, id, operation='delete' ) - @web.expose def clear_history( self, trans ): """Clears the history for a user""" @@ -367,7 +365,6 @@ self.app.model.flush() trans.log_event( "History id %s cleared" % (str(history.id)) ) trans.response.send_redirect( url_for("/index" ) ) - @web.expose def history_import( self, trans, id=None, confirm=False, **kwd ): msg = "" @@ -417,13 +414,11 @@ Warning! If you import this history, you will lose your current history. Click <a href="%s">here</a> to confirm. """ % web.url_for( id=id, confirm=True ) ) - @web.expose - def history_new( self, trans ): - trans.new_history() + def history_new( self, trans, name=None ): + trans.new_history( name=name ) trans.log_event( "Created new History, id: %s." % str(trans.get_history().id) ) return trans.show_message( "New history created", refresh_frames = ['history'] ) - @web.expose def history_add_to( self, trans, history_id=None, file_data=None, name="Data Added to History",info=None,ext="txt",dbkey="?",copy_access_from=None,**kwd ): """Adds a POSTed file to a History""" @@ -455,13 +450,22 @@ except Exception, e: trans.log_event( "Failed to add dataset to history: %s" % ( e ) ) return trans.show_error_message("Adding File to History has Failed") - @web.expose - def history_set_default_permissions( self, trans, **kwd ): - """Sets the user's default permissions for the current history""" + def history_set_default_permissions( self, trans, id=None, **kwd ): + """Sets the permissions on a history""" if trans.user: if 'update_roles_button' in kwd: - history = trans.get_history() + history = None + if id: + try: + id = int( id ) + except: + id = None + if id: + history = trans.app.model.History.get( id ) + if not history: + # If we haven't retrieved a history, use the current one + history = trans.get_history() p = util.Params( kwd ) permissions = {} for k, v in trans.app.model.Dataset.permitted_actions.items(): @@ -478,7 +482,6 @@ else: #user not logged in, history group must be only public return trans.show_error_message( "You must be logged in to change a history's default permissions." ) - @web.expose def dataset_make_primary( self, trans, id=None): """Copies a dataset and makes primary""" diff -r 73847f425801 -r 9a71b89082fe lib/galaxy/web/framework/__init__.py --- a/lib/galaxy/web/framework/__init__.py Mon Jun 01 10:27:04 2009 -0400 +++ b/lib/galaxy/web/framework/__init__.py Fri Jun 05 11:15:25 2009 -0400 @@ -432,13 +432,15 @@ self.galaxy_session.current_history = history self.sa_session.flush( [ self.galaxy_session ] ) history = property( get_history, set_history ) - def new_history( self ): + def new_history( self, name=None ): """ Create a new history and associate it with the current session and its associated user (if set). """ # Create new history history = self.app.model.History() + if name: + history.name = name # Associate with session history.add_galaxy_session( self.galaxy_session ) # Make it the session's current history diff -r 73847f425801 -r 9a71b89082fe scripts/cleanup_datasets/cleanup_datasets.py --- a/scripts/cleanup_datasets/cleanup_datasets.py Mon Jun 01 10:27:04 2009 -0400 +++ b/scripts/cleanup_datasets/cleanup_datasets.py Fri Jun 05 11:15:25 2009 -0400 @@ -123,6 +123,11 @@ for dataset_assoc in history.datasets: _purge_dataset_instance( dataset_assoc, app, remove_from_disk, info_only = info_only ) #mark a DatasetInstance as deleted, clear associated files, and mark the Dataset as deleted if it is deletable if not info_only: + # TODO: should the Delete DefaultHistoryPermissions be deleted here? This was incorrectly + # done in the _list_delete() method of the history controller, so copied it here. Not sure + # if we should ever delete info like this from the db though, so commented out for now... + #for dhp in history.default_permissions: + # dhp.delete() history.purged = True print "%d" % history.id history_count += 1 @@ -226,7 +231,6 @@ print "# This Dataset (%i) is not deletable, associated Metadata Files will not be removed.\n" % ( dataset.id ) else: # Mark all associated MetadataFiles as deleted and purged and remove them from disk - print "The following metadata files attached to associations of Dataset '%s' have been purged:" % dataset.id metadata_files = [] #lets create a list of metadata files, then perform actions on them for hda in dataset.history_associations: @@ -236,6 +240,7 @@ for metadata_file in app.model.MetadataFile.filter( app.model.MetadataFile.table.c.lda_id==lda.id ).all(): metadata_files.append( metadata_file ) for metadata_file in metadata_files: + print "# The following metadata files attached to associations of Dataset '%s' have been purged:" % dataset.id if not info_only: if remove_from_disk: try: @@ -248,7 +253,6 @@ print "%s" % metadata_file.file_name print dataset.deleted = True - #dataset.flush() app.model.flush() def _purge_dataset( dataset, remove_from_disk, info_only = False ): @@ -259,6 +263,7 @@ if not info_only: # Remove files from disk and update the database if remove_from_disk: + # TODO: should permissions on the dataset be deleted here? os.unlink( dataset.file_name ) # Remove associated extra files from disk if they exist if dataset.extra_files_path and os.path.exists( dataset.extra_files_path ): @@ -286,6 +291,7 @@ for sub_folder in folder.folders: _purge_folder( sub_folder, app, remove_from_disk, info_only = info_only ) if not info_only: + # TODO: should the folder permissions be deleted here? folder.purged = True folder.flush() diff -r 73847f425801 -r 9a71b89082fe templates/history/rename.mako --- a/templates/history/rename.mako Mon Jun 01 10:27:04 2009 -0400 +++ b/templates/history/rename.mako Fri Jun 05 11:15:25 2009 -0400 @@ -4,17 +4,38 @@ <div class="toolForm"> <div class="toolFormTitle">${_('Rename History')}</div> - <div class="toolFormBody"> - <form action="${h.url_for( controller='history', action='rename' )}" method="post" > - <table> - <tr><th>${_('Current Name')}</th><th>${_('New Name')}</th></tr> - %for history in histories: - <tr><td>${history.name}<input type="hidden" name="id" value="${history.id}"></td><td><input type="text" name="name" value="${history.name}" size="40"></td></tr> - %endfor - <tr><td colspan="2"><input type="submit" name="history_rename_btn" value="${_('Rename Histories')}"></td></tr> - </table> - </form> - </div> + <div class="toolFormBody"> + <form action="${h.url_for( controller='history', action='rename' )}" method="post" > + <table class="grid"> + %for history in histories: + <tr> + <td> + <div class="form-row"> + <input type="hidden" name="id" value="${history.id}"> + <label>${_('Current Name')}</label> + <div style="float: left; width: 250px; margin-right: 10px;"> + ${history.name} + </div> + </div> + </td> + <td> + <div class="form-row"> + <label>${_('New Name')}</label> + <div style="float: left; width: 250px; margin-right: 10px;"> + <input type="text" name="name" value="${history.name}" size="40"> + </div> + </div> + </td> + </tr> + %endfor + <tr> + <td colspan="2"> + <div class="form-row"> + <input type="submit" name="history_rename_btn" value="${_('Rename Histories')}"> + </div> + </td> + </tr> + </table> + </form> + </div> </div> -</body> -</html> \ No newline at end of file diff -r 73847f425801 -r 9a71b89082fe templates/history/share.mako --- a/templates/history/share.mako Mon Jun 01 10:27:04 2009 -0400 +++ b/templates/history/share.mako Fri Jun 05 11:15:25 2009 -0400 @@ -2,117 +2,173 @@ <%inherit file="/base.mako"/> <%def name="title()">Share histories</%def> -%if not can_change and not cannot_change: - <div class="toolForm"> - <div class="toolFormTitle">${_('Share histories')}</div> - <table> +<div class="toolForm"> + <div class="toolFormTitle">Share ${len( histories)} histories</div> + <div class="toolFormBody"> + %if not can_change and not cannot_change: <form action="${h.url_for( controller="history", action='share' )}" method="post" > - <tr><th>${_('History Name:')}</td><th>${_('Number of Datasets:')}</th><th>${_('Share Link')}</th></tr> %for history in histories: - <tr> - <td align="center">${history.name}<input type="hidden" name="id" value="${history.id}"></td> - <td align="center"> - %if len( history.datasets ) < 1: - <div class="warningmark">${_('This history contains no data.')}</div> - %else: - ${len(history.datasets)} - %endif - </td> - <td align="center"><a href="${h.url_for( controller='history', action='imp', id=trans.security.encode_id(history.id) )}">${_('copy link to share')}</a></td> - </tr> + <div class="toolForm"> + <div class="form-row"> + <label>${_('History Name:')}</label> + <div style="float: left; width: 250px; margin-right: 10px;"> + ${history.name}<input type="hidden" name="id" value="${history.id}"> + </div> + </div> + <div style="clear: both"></div> + <div class="form-row"> + <label>${_('Number of Datasets:')}</label> + <div style="float: left; width: 250px; margin-right: 10px;"> + %if len( history.datasets ) < 1: + <div class="warningmark">${_('This history contains no data.')}</div> + %else: + ${len(history.datasets)} + %endif + </td> + </div> + </div> + ## TODO: this feature is not currently working + ##<div style="clear: both"></div> + ##<div class="form-row"> + ## <label>${_('Share Link')}</label> + ## <div style="float: left; width: 250px; margin-right: 10px;"> + ## <a href="${h.url_for( controller='history', action='imp', id=trans.security.encode_id(history.id) )}">${_('copy link to share')}</a> + ## </div> + ##</div> + ##<div style="clear: both"></div> + <p/> + </div> %endfor - <tr><td>${_('Email of User to share with:')}</td><td><input type="text" name="email" value="${email}" size="40"></td></tr> + <p/> + <div style="clear: both"></div> + <div class="form-row"> + <label>Galaxy user emails with which to share histories</label> + <div style="float: left; width: 250px; margin-right: 10px;"> + <input type="text" name="email" value="${email}" size="40"> + </div> + <div class="toolParamHelp" style="clear: both;"> + Enter a Galaxy user email address or a comma-separated list of addresses if sharing with multiple users + </div> + </div> %if send_to_err: - <tr><td colspan="100%"><div class="errormessage">${send_to_err}</div></td></tr> + <div style="clear: both"></div> + <div class="form-row"> + <div class="errormessage">${send_to_err}</div> + </div> %endif - <tr><td colspan="2" align="right"><input type="submit" name="history_share_btn" value="Submit"></td></tr> + <div style="clear: both"></div> + <div class="form-row"> + <input type="submit" name="history_share_btn" value="Submit"> + </div> </form> - </table> + %else: + <form action="${h.url_for( controller='history', action='share' )}" method="post"> + %for history in histories: + <input type="hidden" name="id" value="${history.id}"> + %endfor + <input type="hidden" name="email" value="${email}"> + %if no_change_needed: + <div style="clear: both"></div> + <div class="form-row"> + <div class="donemessage"> + The following datasets can be shared with ${email} with no changes + </div> + </div> + %for history, hdas in no_change_needed.items(): + <div class="form-row"> + <label>History</label> + ${history.name} + </div> + <div style="clear: both"></div> + <div class="form-row"> + <label>Datasets</label> + </div> + %for hda in hdas: + <div class="form-row"> + ${hda.name} + </div> + %endfor + %endfor + %endif + %if can_change: + <div style="clear: both"></div> + <div class="form-row"> + <div class="warningmessage"> + The following datasets can be shared with ${email} by updating their permissions + </div> + </div> + %for history, hdas in can_change.items(): + <div class="form-row"> + <label>History</label> + ${history.name} + </div> + <div style="clear: both"></div> + <div class="form-row"> + <label>Datasets</label> + </div> + %for hda in hdas: + <div class="form-row"> + ${hda.name} + </div> + %endfor + %endfor + %endif + %if cannot_change: + <div style="clear: both"></div> + <div class="form-row"> + <div class="errormessage"> + The following datasets cannot be shared with ${email} because you are not authorized to + change the permissions on them + </div> + </div> + %for history, hdas in cannot_change.items(): + <div class="form-row"> + <label>History</label> + ${history.name} + </div> + <div style="clear: both"></div> + <div class="form-row"> + <label>Datasets</label> + </div> + %for hda in hdas: + <div class="form-row"> + ${hda.name} + </div> + %endfor + %endfor + %endif + <div class="form-row"> + <label>How would you like to proceed?</label> + </div> + %if can_change: + <div class="form-row"> + <input type="radio" name="action" value="public"> Make datasets public so anyone can access them + %if cannot_change: + (where possible) + %endif + </div> + <div class="form-row"> + <input type="radio" name="action" value="private"> Make datasets private to me and the user(s) with whom I am sharing + %if cannot_change: + (where possible) + %endif + </div> + %endif + %if no_change_needed: + <div class="form-row"> + <input type="radio" name="action" value="share"> Share anyway + %if can_change: + (don't change any permissions) + %endif + </div> + %endif + <div class="form-row"> + <input type="radio" name="action" value="no_share"> Don't share + </div> + <div class="form-row"> + <input type="submit" name="share_proceed_button" value="Go"><br/> + </div> + </form> + %endif </div> -%else: - <style type="text/css"> - th - { - text-align: left; - } - td - { - vertical-align: top; - } - </style> - <form action="${h.url_for( controller='history', action='share' )}" method="post"> - %for history in histories: - <input type="hidden" name="id" value="${history.id}"> - %endfor - <input type="hidden" name="email" value="${email}"> - <div class="warningmessage"> - The history or histories you've chosen to share contain datasets that the user with which you're sharing does not have permission to access. - These datasets are shown below. Datasets that the user has permission to access are not shown. - </div> - <p/> - %if can_change: - <div class="donemessage"> - The following datasets can be shared with ${email} by updating their permissions: - <p/> - <table cellpadding="0" cellspacing="8" border="0"> - <tr><th>Histories</th><th>Datasets</th></tr> - %for history, datasets in can_change.items(): - <tr> - <td>${history.name}</td> - <td> - %for dataset in datasets: - ${dataset.name}<br/> - %endfor - </td> - </tr> - %endfor - </table> - </div> - <p/> - %endif - %if cannot_change: - <div class="errormessage"> - The following datasets cannot be shared with ${email} because you are not authorized to change the permissions on them. - <p/> - <table cellpadding="0" cellspacing="8" border="0"> - <tr><th>Histories</th><th>Datasets</th></tr> - %for history, datasets in cannot_change.items(): - <tr> - <td>${history.name}</td> - <td> - %for dataset in datasets: - ${dataset.name}<br/> - %endfor - </td> - </tr> - %endfor - </table> - </div> - <p/> - %endif - <div> - <b>How would you like to proceed?</b> - <p/> - %if can_change: - <input type="radio" name="action" value="public"> Set datasets above to public access - %if cannot_change: - (where possible) - %endif - <br/> - <input type="radio" name="action" value="private"> Set datasets above to private access for me and the user(s) with whom I am sharing - %if cannot_change: - (where possible) - %endif - <br/> - %endif - <input type="radio" name="action" value="share"> Share anyway - %if can_change: - (don't change any permissions) - %endif - <br/> - <input type="radio" name="action" value="no_share"> Don't share<br/> - <br/> - <input type="submit" name="submit" value="Ok"><br/> - </div> - </form> -%endif +</div> diff -r 73847f425801 -r 9a71b89082fe test/base/twilltestcase.py --- a/test/base/twilltestcase.py Mon Jun 01 10:27:04 2009 -0400 +++ b/test/base/twilltestcase.py Fri Jun 05 11:15:25 2009 -0400 @@ -103,41 +103,42 @@ page = self.last_page() if page.find( 'error' ) > -1: raise AssertionError('Errors in the history for user %s' % self.user ) - def check_history_for_string( self, patt ): """Looks for 'string' in history page""" self.home() self.visit_page( "history" ) for subpatt in patt.split(): tc.find(subpatt) - + self.home() def clear_history( self ): """Empties a history of all datasets""" self.visit_page( "clear_history" ) self.check_history_for_string( 'Your history is empty' ) - - def delete_history( self, id=None ): - """Deletes a history""" + self.home() + def delete_history( self, id='' ): + """Deletes one or more histories""" history_list = self.get_histories() self.assertTrue( history_list ) - if id is None: + num_deleted = 1 + if not id: history = history_list[0] id = history.get( 'id' ) - id = str( id ) - self.visit_page( "history/list?operation=delete&id=%s" %(id) ) - + else: + num_deleted = len( id.split( ',' ) ) + self.visit_page( "history/list?operation=delete&id=%s" % ( id ) ) + check_str = 'Deleted %d histories' % num_deleted + self.check_page_for_string( check_str ) + self.home() def get_histories( self ): """Returns all histories""" tree = self.histories_as_xml_tree() data_list = [ elem for elem in tree.findall("data") ] return data_list - def get_history( self, show_deleted=False ): """Returns a history""" tree = self.history_as_xml_tree( show_deleted=show_deleted ) data_list = [ elem for elem in tree.findall("data") ] return data_list - def history_as_xml_tree( self, show_deleted=False ): """Returns a parsed xml object of a history""" self.home() @@ -145,7 +146,6 @@ xml = self.last_page() tree = ElementTree.fromstring(xml) return tree - def histories_as_xml_tree( self ): """Returns a parsed xml object of all histories""" self.home() @@ -153,97 +153,96 @@ xml = self.last_page() tree = ElementTree.fromstring(xml) return tree - - def history_options( self ): + def history_options( self, check_str='', upload=False ): """Mimics user clicking on history options link""" self.visit_page( "history_options" ) - - def new_history( self ): + if check_str: + self.check_page_for_string( check_str ) + else: + self.check_page_for_string( 'Rename</a> current history' ) + self.check_page_for_string( 'List</a> previously stored histories' ) + self.check_page_for_string( 'Construct workflow</a> from the current history' ) + self.check_page_for_string( 'Share</a> current history' ) + # Tests for changing default history permissions are done in test_security_and_libraries.py + self.check_page_for_string( 'Change default permissions</a> for the current history' ) + self.check_page_for_string( 'Show deleted</a> datasets in history' ) + self.check_page_for_string( 'Delete</a> current history' ) + # Need to add a history item in order to create a new empty history + try: + self.check_page_for_string( 'Create</a> a new empty history' ) + raise AssertionError, "Incorrectly able to create a new empty history when the current history is empty." + except: + pass + if upload: + self.upload_file( '1.bed', dbkey='hg18' ) + self.home() + self.visit_page( "history_options" ) + self.check_page_for_string( 'Create</a> a new empty history' ) + self.home() + def new_history( self, name=None ): """Creates a new, empty history""" - self.visit_page( "history_new" ) + if name: + self.visit_url( "%s/history_new?name=%s" % ( self.url, str( name ) ) ) + else: + self.visit_url( "%s/history_new" % self.url ) self.check_history_for_string('Your history is empty') - - def rename_history( self, id=None, name='NewTestHistory' ): + self.home() + def rename_history( self, id, old_name, new_name ): """Rename an existing history""" - history_list = self.get_histories() - self.assertTrue( history_list ) - if id is None: # take last id - elem = history_list[-1] - else: - i = history_list.index( id ) - self.assertTrue( i ) - elem = history_list[i] - id = elem.get( 'id' ) - self.assertTrue( id ) - old_name = elem.get( 'name' ) - self.assertTrue( old_name ) - id = str( id ) - self.visit_page( "history/rename?id=%s&name=%s" %(id, name) ) - return id, old_name, name - + self.home() + self.visit_page( "history/rename?id=%s&name=%s" %( id, new_name ) ) + check_str = 'History: %s renamed to: %s' % ( old_name, new_name ) + self.check_page_for_string( check_str ) + self.home() def set_history( self ): """Sets the history (stores the cookies for this run)""" if self.history_id: self.visit_page( "history?id=%s" % self.history_id ) else: self.new_history() - def share_history( self, id=None, email='test2@bx.psu.edu' ): - """Share a history with a different user""" - history_list = self.get_histories() - self.assertTrue( history_list ) - if id is None: # take last id - elem = history_list[-1] - else: - i = history_list.index( id ) - self.assertTrue( i ) - elem = history_list[i] - id = elem.get( 'id' ) - self.assertTrue( id ) - id = str( id ) - name = elem.get( 'name' ) - self.assertTrue( name ) + self.home() + def share_history( self, id, email, check_str, check_str2='', action=None, action_check_str=None ): + """Share a history different users""" self.visit_url( "%s/history/share?id=%s&email=%s&history_share_btn=Submit" % ( self.url, id, email ) ) - self.check_page_for_string( 'History (%s) has been shared with: %s' % ( name, email ) ) - return id, name, email - def share_history_containing_private_datasets( self, history_id, email='test@bx.psu.edu' ): - """Attempt to share a history containing private datasets with a different user""" - self.visit_url( "%s/history/share?id=%s&email=%s&history_share_btn=Submit" % ( self.url, history_id, email ) ) - self.last_page() - self.check_page_for_string( "The history or histories you've chosen to share contain datasets" ) - self.check_page_for_string( "How would you like to proceed?" ) + self.check_page_for_string( check_str ) + if check_str2: + self.check_page_for_string( check_str2 ) + if action: + # If we have an action, then we are sharing datasets with users that do not have access permissions on them + tc.fv( '1', 'action', action ) + tc.submit( "share_proceed_button" ) + if action_check_str: + self.check_page_for_string( action_check_str ) self.home() - def make_datasets_public( self, history_id, email='test@bx.psu.edu' ): - """Make private datasets public in order to share a history with a different user""" - self.visit_url( "%s/history/share?id=%s&email=%s&action=public&submit=Ok" % ( self.url, history_id, email ) ) - self.last_page() - check_str = "History (Unnamed history) has been shared with: %s" % email - self.check_page_for_string( check_str ) - self.home() - def privately_share_dataset( self, history_id, email='test@bx.psu.edu' ): - """Make private datasets public in order to share a history with a different user""" - self.visit_url( "%s/history/share?id=%s&email=%s&action=private&submit=Ok" % ( self.url, history_id, email ) ) - self.last_page() - check_str = "History (Unnamed history) has been shared with: %s" % email - self.check_page_for_string( check_str ) - self.home() - def switch_history( self, hid=None ): + def switch_history( self, id='', name='' ): """Switches to a history in the current list of histories""" data_list = self.get_histories() self.assertTrue( data_list ) - if hid is None: # take last hid - elem = data_list[-1] - hid = elem.get('hid') - if hid < 0: - hid = len(data_list) + hid + 1 - hid = str(hid) - elems = [ elem for elem in data_list if elem.get('hid') == hid ] - self.assertEqual(len(elems), 1) - self.visit_page( "history/list?operation=switch&id=%s" % elems[0].get('id') ) - - def view_stored_histories( self, check_str='' ): + if not id: + history = history_list[0] + id = history.get( 'id' ) + self.visit_url( "%s/history/list?operation=switch&id=%s" % ( self.url, id ) ) + if name: + self.check_history_for_string( name ) + self.home() + def view_stored_active_histories( self, check_str='' ): self.visit_page( "history/list" ) + self.check_page_for_string( 'Stored histories' ) + self.check_page_for_string( '<input type="checkbox" name="id" value=' ) + self.check_page_for_string( 'operation=Rename&id' ) + self.check_page_for_string( 'operation=Switch&id' ) + self.check_page_for_string( 'operation=Delete&id' ) if check_str: self.check_page_for_string( check_str ) + self.home() + def view_stored_deleted_histories( self, check_str='' ): + self.visit_page( "history/list?f-deleted=True" ) + self.check_page_for_string( 'Stored histories' ) + self.check_page_for_string( '<input type="checkbox" name="id" value=' ) + self.check_page_for_string( 'operation=Undelete&id' ) + if check_str: + self.check_page_for_string( check_str ) + self.home() # Functions associated with datasets (history items) and meta data def get_job_stderr( self, id ): @@ -299,9 +298,14 @@ self.visit_url( "%s/dataset/undelete?id=%s" % ( self.url, elems[0].get( 'id' ) ) ) if check_str: self.check_page_for_string( check_str ) + def display_history_item( self, id, check_str='' ): + """Displays a history item - simulates eye icon click""" + self.visit_url( '%s/datasets/%s/display/index' % ( self.url, id ) ) + if check_str: + self.check_page_for_string( check_str ) + self.home() def edit_metadata( self, hid=None, form_no=0, **kwd ): - """ - Edits the metadata associated with a history item.""" + """Edits the metadata associated with a history item.""" # There are currently 4 forms on the edit page: # 0. name="edit_attributes" # 1. name="auto_detect" @@ -324,7 +328,6 @@ button = "change" #Change data type form if kwd: self.submit_form( form_no=form_no, button=button, **kwd) - def get_dataset_ids_in_history( self ): """Returns the ids of datasets in a history""" data_list = self.get_history() diff -r 73847f425801 -r 9a71b89082fe test/functional/test_history_functions.py --- a/test/functional/test_history_functions.py Mon Jun 01 10:27:04 2009 -0400 +++ b/test/functional/test_history_functions.py Fri Jun 05 11:15:25 2009 -0400 @@ -4,74 +4,367 @@ class TestHistory( TwillTestCase ): - def test_00_history_options_when_not_logged_in( self ): + def test_000_history_options_when_not_logged_in( self ): """Testing history options when not logged in""" - self.logout() #Ensure we are not logged in - self.history_options() - self.check_page_for_string( 'logged in</a> to store or switch histories.' ) - self.login( email='test2@bx.psu.edu' ) #Just to make sure we have created this account since it is used to share histories self.logout() - def test_05_new_history_then_delete( self ): - """Testing creating a new history and then deleting it""" - self.login() - self.new_history() - if len(self.get_history()) > 0: - raise AssertionError("test_new_history_then_delete failed") - self.delete_history() - self.check_page_for_string( 'Deleted 1 histories' ) - def test_10_history_options_when_logged_in( self ): + check_str = 'logged in</a> to store or switch histories.' + self.history_options( check_str=check_str ) + # Make sure we have created the following accounts + self.login( email='test1@bx.psu.edu' ) + global regular_user1 + regular_user1 = galaxy.model.User.filter( galaxy.model.User.table.c.email=='test1@bx.psu.edu' ).first() + assert regular_user1 is not None, 'Problem retrieving user with email "test1@bx.psu.edu" from the database' + self.logout() + self.login( email='test2@bx.psu.edu' ) + global regular_user2 + regular_user2 = galaxy.model.User.filter( galaxy.model.User.table.c.email=='test2@bx.psu.edu' ).first() + assert regular_user2 is not None, 'Problem retrieving user with email "test2@bx.psu.edu" from the database' + self.logout() + self.login( email='test3@bx.psu.edu' ) + global regular_user3 + regular_user3 = galaxy.model.User.filter( galaxy.model.User.table.c.email=='test3@bx.psu.edu' ).first() + assert regular_user3 is not None, 'Problem retrieving user with email "test3@bx.psu.edu" from the database' + self.logout() + def test_005_deleting_histories( self ): + """Testing deleting histories""" + self.login( email='test@bx.psu.edu' ) + global admin_user + admin_user = galaxy.model.User.filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ).first() + assert admin_user is not None, 'Problem retrieving user with email "test@bx.psu.edu" from the database' + # Get the admin_user private role + global admin_user_private_role + admin_user_private_role = None + for role in admin_user.all_roles(): + if role.name == admin_user.email and role.description == 'Private Role for %s' % admin_user.email: + admin_user_private_role = role + break + if not admin_user_private_role: + raise AssertionError( "Private role not found for user '%s'" % admin_user.email ) + latest_history = galaxy.model.History.query().order_by( desc( galaxy.model.History.table.c.create_time ) ).first() + assert latest_history is not None, "Problem retrieving latest history from database" + assert not latest_history.deleted, "After login, associated history is deleted" + self.delete_history( str( latest_history.id ) ) + latest_history.refresh() + if not latest_history.deleted: + raise AssertionError, "Problem deleting history id %d" % latest_history.id + # We'll now test deleting a list of histories + # After deleting the current history, a new one should have been created + global history1 + history1 = galaxy.model.History.query().order_by( desc( galaxy.model.History.table.c.create_time ) ).first() + assert history1 is not None, "Problem retrieving history1 from database" + self.upload_file( '1.bed', dbkey='hg18' ) + self.new_history( name='history2' ) + global history2 + history2 = galaxy.model.History.query().order_by( desc( galaxy.model.History.table.c.create_time ) ).first() + assert history2 is not None, "Problem retrieving history2 from database" + self.upload_file( '2.bed', dbkey='hg18' ) + ids = '%s,%s' % ( str( history1.id ), str( history2.id ) ) + self.delete_history( ids ) + try: + self.view_stored_active_histories( check_str=history1.name ) + raise AssertionError, "History %s is displayed in the active history list after it was deleted" % history1.name + except: + pass + self.view_stored_deleted_histories( check_str=history1.name ) + try: + self.view_stored_active_histories( check_str=history2.name ) + raise AssertionError, "History %s is displayed in the active history list after it was deleted" % history2.name + except: + pass + self.view_stored_deleted_histories( check_str=history2.name ) + history1.refresh() + if not history1.deleted: + raise AssertionError, "Problem deleting history id %d" % history1.id + if not history1.default_permissions: + raise AssertionError, "Default permissions were incorrectly deleted from the db for history id %d when it was deleted" % history1.id + history2.refresh() + if not history2.deleted: + raise AssertionError, "Problem deleting history id %d" % history2.id + if not history2.default_permissions: + raise AssertionError, "Default permissions were incorrectly deleted from the db for history id %d when it was deleted" % history2.id + def test_010_history_options_when_logged_in( self ): """Testing history options when logged in""" self.history_options() - self.check_page_for_string( 'Rename</a> current history' ) - self.check_page_for_string( 'List</a> previously stored histories' ) - self.check_page_for_string( 'Construct workflow</a> from the current history' ) - self.check_page_for_string( 'Share</a> current history' ) - # Tests for changing default history permissions are done in test_security_and_libraries.py - self.check_page_for_string( 'Change default permissions</a> for the current history' ) - self.check_page_for_string( 'Show deleted</a> datasets in history' ) - self.check_page_for_string( 'Delete</a> current history' ) - # Need to add a history item in order to create a new empty history + def test_015_history_rename( self ): + """Testing renaming a history""" + global history3 + history3 = galaxy.model.History.query().order_by( desc( galaxy.model.History.table.c.create_time ) ).first() + assert history3 is not None, "Problem retrieving history3 from database" + if history3.deleted: + raise AssertionError, "History id %d deleted when it should not be" % latest_history.id + self.rename_history( str( history3.id ), history3.name, new_name='history3' ) + def test_020_history_list( self ): + """Testing viewing previously stored histories""" + self.view_stored_active_histories() + def test_025_history_share( self ): + """Testing sharing histories containing only public datasets""" + history3.refresh() + self.upload_file( '1.bed', dbkey='hg18' ) + # Test sharing a history with yourself + check_str = "You can't send histories to yourself." + self.share_history( str( history3.id ), 'test@bx.psu.edu', check_str ) + # Share a history with 1 valid user + name = history3.name + email = 'test1@bx.psu.edu' + check_str = 'Histories (%s) have been shared with: %s' % ( name, email ) + self.share_history( str( history3.id ), email, check_str ) + # We need to keep track of all shared histories so they can later be deleted + global history3_copy1 + history3_copy1 = galaxy.model.History.query().order_by( desc( galaxy.model.History.table.c.create_time ) ).first() + assert history3_copy1 is not None, "Problem retrieving history3_copy1 from database" + self.logout() + self.login( email='test1@bx.psu.edu' ) + check_str = '%s from test@bx.psu.edu' % history3.name + self.view_stored_active_histories( check_str=check_str ) + self.logout() + self.login( email='test@bx.psu.edu' ) + # Need to delete history3_copy1 + history3_copy1.deleted = True + history3_copy1.flush() + # Test sharing a history with an invalid user + email = 'jack@jill.com' + check_str = '%s is not a valid Galaxy user.' % email + self.share_history( str( history3.id ), email, check_str ) + # Test sharing multiple histories with multiple users + self.new_history() + global history4 + history4 = galaxy.model.History.query().order_by( desc( galaxy.model.History.table.c.create_time ) ).first() + assert history4 is not None, "Problem retrieving history4 from database" + self.rename_history( str( history4.id ), history4.name, new_name='history4' ) + history4.refresh() + self.upload_file( '2.bed', dbkey='hg18' ) + id = '%s,%s' % ( str( history3.id ), str( history4.id ) ) + name = '%s,%s' % ( history3.name, history4.name ) + email = 'test2@bx.psu.edu,test3@bx.psu.edu' + check_str = 'Histories (%s) have been shared with: %s' % ( name, email ) + self.share_history( id, email, check_str ) + # We need to keep track of all shared histories so they can later be deleted + history3_copy_name = "%s from %s" % ( history3.name, admin_user.email ) + history3_copies = galaxy.model.History \ + .filter( and_( galaxy.model.History.table.c.name==history3_copy_name, + galaxy.model.History.table.c.deleted==False ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .limit( 2 ) \ + .all() + history3_copy2 = history3_copies[0] + history3_copy3 = history3_copies[1] + history4_copy_name = "%s from %s" % ( history4.name, admin_user.email ) + history4_copyies = galaxy.model.History \ + .filter( and_( galaxy.model.History.table.c.name==history4_copy_name, + galaxy.model.History.table.c.deleted==False ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .limit( 2 ) \ + .all() + history4_copy1 = history4_copyies[0] + history4_copy2 = history4_copyies[1] + self.logout() + self.login( email='test2@bx.psu.edu' ) + check_str = '%s from %s' % ( history3.name, admin_user.email ) + self.view_stored_active_histories( check_str=check_str ) + check_str = '%s from %s' % ( history4.name, admin_user.email ) + self.view_stored_active_histories( check_str=check_str ) + self.logout() + self.login( email='test3@bx.psu.edu' ) + check_str = '%s from %s' % ( history3.name, admin_user.email ) + self.view_stored_active_histories( check_str=check_str ) + check_str = '%s from %s' % ( history4.name, admin_user.email ) + self.view_stored_active_histories( check_str=check_str ) + self.logout() + self.login( email='test@bx.psu.edu' ) + # Need to delete the copied histories, so later test runs are valid + history3_copy2.deleted = True + history3_copy2.flush() + history3_copy3.deleted = True + history3_copy3.flush() + history4_copy1.deleted = True + history4_copy1.flush() + history4_copy1.deleted = True + history4_copy1.flush() + history4_copy2.deleted = True + history4_copy2.flush() + def test_030_change_permissions_on_current_history( self ): + """Testing changing permissions on the current history""" + global history5 + history5 = galaxy.model.History.query().order_by( desc( galaxy.model.History.table.c.create_time ) ).first() + assert history5 is not None, "Problem retrieving history5 from database" + self.rename_history( str( history5.id ), history5.name, new_name='history5' ) + history5.refresh() + # Due to the limitations of twill ( not functional with the permissions forms ), we're forced + # to do this manually. At this point, we just want to restrict the access permission on history5 + # to the admin_user + global access_action + access_action = galaxy.model.Dataset.permitted_actions.DATASET_ACCESS.action + dhp = galaxy.model.DefaultHistoryPermissions( history5, access_action, admin_user_private_role ) + dhp.flush() + self.upload_file( '1.bed', dbkey='hg18' ) + history5_dataset1 = None + for hda in history5.datasets: + if hda.name == '1.bed': + history5_dataset1 = hda.dataset + assert history5_dataset1 is not None, "Problem retrieving history5_dataset1 from the database" + # The permissions on the dataset should be restricted from sharing with anyone due to the + # inherited history permissions + restricted = False + for action in history5_dataset1.actions: + if action.action == access_action: + restricted = True + break + if not restricted: + raise AssertionError, "The 'access' permission is not set for history5_dataset1.actions" + def test_035_sharing_history_by_making_datasets_public( self ): + """Testing sharing a restricted history by making the datasets public""" + check_str = 'The following datasets can be shared with %s by updating their permissions' % regular_user1.email + action_check_str = 'Histories (%s) have been shared with: %s' % ( history5.name, regular_user1.email ) + self.share_history( str( history5.id ), regular_user1.email, check_str, action='public', action_check_str=action_check_str ) + history5_copy1 = galaxy.model.History.query().order_by( desc( galaxy.model.History.table.c.create_time ) ).first() + assert history5_copy1 is not None, "Problem retrieving history5_copy1 from database" + self.logout() + self.login( email=regular_user1.email ) + self.visit_url( "%s/history/list" % self.url ) + self.check_page_for_string( history5_copy1.name ) + # Need to delete history5_copy1 on the history list page for regular_user1 + history5_copy1.deleted = True + history5_copy1.flush() + self.logout() + self.login( email=admin_user.email ) + def test_040_sharing_history_by_making_new_sharing_role( self ): + """Testing sharing a restricted history by associating a new sharing role with protected datasets""" + self.switch_history( id=str( history5.id ), name=history5.name ) + # At this point, history5 should have 1 item, 1.bed, which is public. We'll add another + # item which will be private to admin_user due to the permissions on history5 + self.upload_file( '2.bed', dbkey='hg18' ) + check_str = 'The following datasets can be shared with %s with no changes' % regular_user1.email + check_str2 = 'The following datasets can be shared with %s by updating their permissions' % regular_user1.email + action_check_str = 'Histories (%s) have been shared with: %s' % ( history5.name, regular_user1.email ) + self.share_history( str( history5.id ), + regular_user1.email, + check_str, + check_str2=check_str2, + action='private', + action_check_str=action_check_str ) + history5_copy2 = galaxy.model.History.query().order_by( desc( galaxy.model.History.table.c.create_time ) ).first() + assert history5_copy2 is not None, "Problem retrieving history5_copy2 from database" + # We should now have a new sharing role + global sharing_role + role_name = 'Sharing role for: %s, %s' % ( admin_user.email, regular_user1.email ) + sharing_role = galaxy.model.Role.filter( galaxy.model.Role.table.c.name==role_name ).first() + assert sharing_role is not None, "Problem retrieving sharing_role from the database" + self.logout() + self.login( email=regular_user1.email ) + self.visit_url( "%s/history/list" % self.url ) + self.check_page_for_string( history5_copy2.name ) + self.switch_history( id=str( history5_copy2.id ), name=history5_copy2.name ) + # Make sure both datasets are in the history + self.check_history_for_string( '1.bed' ) + self.check_history_for_string( '2.bed' ) + # Get both new hdas from the db that were created for the shared history + hda_1_bed = galaxy.model.HistoryDatasetAssociation \ + .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_copy2.id, + galaxy.model.HistoryDatasetAssociation.table.c.name=='1.bed' ) ) \ + .first() + assert hda_1_bed is not None, "Problem retrieving hda_1_bed from database" + hda_2_bed = galaxy.model.HistoryDatasetAssociation \ + .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_copy2.id, + galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) ) \ + .first() + assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database" + # Make sure 1.bed is accessible since it is public + self.display_history_item( str( hda_1_bed.id ), check_str='chr1' ) + # Make sure 2.bed is accessible since it is associated with a sharing role + self.display_history_item( str( hda_2_bed.id ), check_str='chr1' ) + # Need to delete history5_copy2 on the history list page for regular_user1 + history5_copy2.deleted = True + history5_copy2.flush() + self.logout() + self.login( email=admin_user.email ) + def test_045_sharing_private_history_with_multiple_users_by_changing_no_permissions( self ): + """Testing sharing a restricted history with multiple users, making no permission changes""" + # History5 can be shared with any user, since it contains a public dataset. However, only + # regular_user1 should be able to access history5's 2.bed dataset since it is associated with a + # sharing role, and regular_user2 should be able to access history5's 1.bed, but not 2.bed even + # though they can see it in their shared history. + self.switch_history( id=str( history5.id ), name=history5.name ) + email = '%s,%s' % ( regular_user1.email, regular_user2.email ) + check_str = 'The following datasets can be shared with %s with no changes' % email + check_str2 = 'The following datasets can be shared with %s by updating their permissions' % email + action_check_str = 'Histories (%s) have been shared with: %s' % ( history5.name, regular_user1.email ) + self.share_history( str( history5.id ), + email, + check_str, + check_str2=check_str2, + action='share', + action_check_str=action_check_str ) + # We need to keep track of all shared histories so they can later be deleted + history5_copy_name = "%s from %s" % ( history5.name, admin_user.email ) + history5_copies = galaxy.model.History \ + .filter( and_( galaxy.model.History.table.c.name==history5_copy_name, + galaxy.model.History.table.c.deleted==False ) ) \ + .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ + .limit( 2 ) \ + .all() + history5_copy3 = history5_copies[0] + assert history5_copy3 is not None, "Problem retrieving history5_copy3 from database" + history5_copy4 = history5_copies[1] + assert history5_copy4 is not None, "Problem retrieving history5_copy4 from database" + # Make sure test1@bx.psu.edu received a copy of history5 with both datasets accessible + self.login( email=regular_user1.email ) + check_str = '%s from %s' % ( history5.name, admin_user.email ) + self.view_stored_active_histories( check_str=check_str ) + self.switch_history( id=str( history5_copy3.id ), name=history5_copy3.name ) + self.check_history_for_string( '1.bed' ) + self.check_history_for_string( '2.bed' ) + self.logout() + # Make sure test2@bx.psu.edu received a copy of history5, with only 1.bed accessible + self.login( email=regular_user2.email ) + self.view_stored_active_histories( check_str=check_str ) + self.switch_history( id=str( history5_copy4.id ), name=history5_copy4.name ) + self.check_history_for_string( '1.bed' ) + self.check_history_for_string( '2.bed' ) + # Get both new hdas from the db that were created for the shared history + hda_1_bed = galaxy.model.HistoryDatasetAssociation \ + .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_copy4.id, + galaxy.model.HistoryDatasetAssociation.table.c.name=='1.bed' ) ) \ + .first() + assert hda_1_bed is not None, "Problem retrieving hda_1_bed from database" + hda_2_bed = galaxy.model.HistoryDatasetAssociation \ + .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_copy4.id, + galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) ) \ + .first() + assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database" + # Make sure 1.bed is accessible since it is public + self.display_history_item( str( hda_1_bed.id ), check_str='chr1' ) + # Make sure 2.bed is not accessible since it is protected try: - self.check_page_for_string( 'Create</a> a new empty history' ) - raise AssertionError, "Incorrectly able to create a new empty history when the current history is empty." + self.display_history_item( str( hda_2_bed.id ), check_str='chr1' ) + raise AssertionError, "History item 2.bed is accessible by user %s when is should not be" % regular_user2.email except: pass - self.upload_file( '1.bed', dbkey='hg18' ) - self.history_options() - self.check_page_for_string( 'Create</a> a new empty history' ) - def test_15_history_rename( self ): - """Testing renaming a history""" - id, old_name, new_name = self.rename_history() - self.check_page_for_string( 'History: %s renamed to: %s' %(old_name, new_name) ) - def test_20_history_list( self ): - """Testing viewing previously stored histories""" - self.view_stored_histories() - self.check_page_for_string( 'Stored histories' ) - self.check_page_for_string( '<input type="checkbox" name="id" value=' ) - self.check_page_for_string( 'operation=Rename&id' ) - self.check_page_for_string( 'operation=Switch&id' ) - self.check_page_for_string( 'operation=Delete&id' ) - def test_25_history_share( self ): - """Testing sharing a history with another user""" - self.upload_file('1.bed', dbkey='hg18') - id, name, email = self.share_history() - self.logout() - self.login( email=email ) - self.home() - check_str = 'Unnamed history from test@bx.psu.edu' - self.view_stored_histories( check_str=check_str ) - histories = self.get_histories() - for history in histories: - if history.get( 'name' ) == 'Unnamed history from test@bx.psu.edu': - id = history.get( 'id' ) - break - self.assertTrue( id ) - self.delete_history( id ) + self.check_history_for_string( 'You do not have permission to view this dataset' ) self.logout() self.login( email='test@bx.psu.edu' ) - def test_30_history_show_and_hide_deleted_datasets( self ): + # Need to delete the copied histories, so later test runs are valid + history5_copy3.deleted = True + history5_copy3.flush() + history5_copy4.deleted = True + history5_copy4.flush() + + + + def test_050_sharing_private_history_by_choosing_to_not_share( self ): + """Testing sharing a restricted history with multiple users by choosing not to share""" + self.switch_history( id=str( history5.id ), name=history5.name ) + email = '%s,%s' % ( regular_user1.email, regular_user2.email ) + check_str = 'The following datasets can be shared with %s with no changes' % email + check_str2 = 'The following datasets can be shared with %s by updating their permissions' % email + action_check_str = 'History Options' + self.share_history( str( history5.id ), + email, + check_str, + check_str2=check_str2, + action='no_share' ) + def test_055_history_show_and_hide_deleted_datasets( self ): """Testing displaying deleted history items""" - self.new_history() + self.new_history( name='temp_history1' ) self.upload_file('1.bed', dbkey='hg18') latest_hda = galaxy.model.HistoryDatasetAssociation.query() \ .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ).first() @@ -85,9 +378,9 @@ self.home() self.visit_url( "%s/history/?show_deleted=False" % self.url ) self.check_page_for_string( 'Your history is empty' ) - def test_35_deleting_and_undeleting_history_items( self ): + def test_060_deleting_and_undeleting_history_items( self ): """Testing deleting and un-deleting history items""" - self.new_history() + self.new_history( name='temp_history2' ) # Add a new history item self.upload_file( '1.bed', dbkey='hg15' ) self.home() @@ -110,6 +403,8 @@ self.visit_url( "%s/history/?show_deleted=False" % self.url ) self.check_page_for_string( '1.bed' ) self.check_page_for_string( 'hg15' ) - def test_9999_clean_up( self ): - self.delete_history() - self.logout() + def test_065_reset_data_for_later_test_runs( self ): + """Reseting data to enable later test runs to pass""" + self.delete_history( id=str( history3.id ) ) + self.delete_history( id=str( history4.id ) ) + self.delete_history( id=str( history5.id ) ) diff -r 73847f425801 -r 9a71b89082fe test/functional/test_security_and_libraries.py --- a/test/functional/test_security_and_libraries.py Mon Jun 01 10:27:04 2009 -0400 +++ b/test/functional/test_security_and_libraries.py Fri Jun 05 11:15:25 2009 -0400 @@ -40,7 +40,7 @@ global admin_user admin_user = galaxy.model.User.filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ).first() assert admin_user is not None, 'Problem retrieving user with email "test@bx.psu.edu" from the database' - # Get the admin user's privat role for later use + # Get the admin user's private role for later use global admin_user_private_role admin_user_private_role = None for role in admin_user.all_roles(): @@ -136,7 +136,7 @@ dps.append( dp.action ) # Sort actions for later comparison dps.sort() - # Compare DatasetPermissionss with permissions_in - should be the same + # Compare DatasetPermissions with permissions_in - should be the same if dps != actions_in: raise AssertionError( 'DatasetPermissionss "%s" for dataset id %d differ from changed default permissions "%s"' \ % ( str( dps ), latest_dataset.id, str( actions_in ) ) ) @@ -145,14 +145,26 @@ raise AssertionError( 'DatasetPermissionss "%s" for dataset id %d differ from DefaultHistoryPermissions "%s" for history id %d' \ % ( str( dps ), latest_dataset.id, str( dhps ), latest_history.id ) ) # Since the dataset in the history is now private, we can test sharing with another user - self.share_history_containing_private_datasets( str( latest_history.id ), email=admin_user.email ) # Test making the dataset in the history public - self.make_datasets_public( str( latest_history.id ), email=admin_user.email ) + check_str = 'The following datasets can be shared with %s by updating their permissions' % admin_user.email + action_check_str = 'Histories (%s) have been shared with: %s' % ( latest_history.name, admin_user.email ) + self.share_history( str( latest_history.id ), + admin_user.email, + check_str, + action='public', + action_check_str=action_check_str ) # Add another dataset to the history, it should be private since that is now our default self.upload_file( '2.bed' ) - self.share_history_containing_private_datasets( str( latest_history.id ), email=admin_user.email ) - # Test creating a new sharing role for sharing the private datasets - self.privately_share_dataset( str( latest_history.id ), email=admin_user.email ) + # Test creating a new sharing role for the private dataset + check_str = 'The following datasets can be shared with %s with no changes' % admin_user.email + check_str2 = 'The following datasets can be shared with %s by updating their permissions' % admin_user.email + action_check_str = 'Histories (%s) have been shared with: %s' % ( latest_history.name, admin_user.email ) + self.share_history( str( latest_history.id ), + admin_user.email, + check_str, + check_str2=check_str2, + action='private', + action_check_str=action_check_str ) role_type = 'sharing' role_name = 'Sharing role for: %s, %s' % ( regular_user1.email, admin_user.email ) global sharing_role diff -r 73847f425801 -r 9a71b89082fe tool_conf.xml.sample --- a/tool_conf.xml.sample Mon Jun 01 10:27:04 2009 -0400 +++ b/tool_conf.xml.sample Fri Jun 05 11:15:25 2009 -0400 @@ -13,6 +13,7 @@ <tool file="data_source/wormbase_test.xml" /> <tool file="data_source/flymine.xml" /> <tool file="data_source/flymine_test.xml" /> + <tool file="data_source/eupathdb.xml" /> <tool file="data_source/encode_db.xml" /> <tool file="data_source/epigraph_import.xml" /> <tool file="data_source/epigraph_import_test.xml" />
participants (1)
-
Greg Von Kuster