1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/56dd83576ce6/ Changeset: 56dd83576ce6 User: jmchilton Date: 2014-04-27 03:27:07 Summary: Refactor managers into their own module. Allows cleaner reuse outside of controllers. Small PEP8 fixes. Affected #: 5 files diff -r 9dbd0de27e53a2cac3c604c0aa6fb70ffb5ac7d1 -r 56dd83576ce67763863b61ad85104f8d14d978e0 lib/galaxy/managers/__init__.py --- /dev/null +++ b/lib/galaxy/managers/__init__.py @@ -0,0 +1,4 @@ +""" 'Business logic' independent of web transactions/user context (trans) +should be pushed into models - but logic that requires the context trans +should be placed under this module. +""" diff -r 9dbd0de27e53a2cac3c604c0aa6fb70ffb5ac7d1 -r 56dd83576ce67763863b61ad85104f8d14d978e0 lib/galaxy/managers/hdas.py --- /dev/null +++ b/lib/galaxy/managers/hdas.py @@ -0,0 +1,67 @@ +from galaxy import exceptions +from ..managers import histories + + +class HDAManager( object ): + + def __init__( self ): + self.histories_mgr = histories.HistoryManager() + + def get( self, trans, unencoded_id, check_ownership=True, check_accessible=True ): + """ + """ + # this is a replacement for UsesHistoryDatasetAssociationMixin because mixins are a bad soln/structure + hda = trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get( unencoded_id ) + if hda is None: + raise exceptions.ObjectNotFound() + hda = self.secure( trans, hda, check_ownership, check_accessible ) + return hda + + def secure( self, trans, hda, check_ownership=True, check_accessible=True ): + """ + checks if (a) user owns item or (b) item is accessible to user. + """ + # all items are accessible to an admin + if trans.user and trans.user_is_admin(): + return hda + if check_ownership: + hda = self.check_ownership( trans, hda ) + if check_accessible: + hda = self.check_accessible( trans, hda ) + return hda + + def can_access_dataset( self, trans, hda ): + current_user_roles = trans.get_current_user_roles() + return trans.app.security_agent.can_access_dataset( current_user_roles, hda.dataset ) + + #TODO: is_owner, is_accessible + + def check_ownership( self, trans, hda ): + if not trans.user: + #if hda.history == trans.history: + # return hda + raise exceptions.AuthenticationRequired( "Must be logged in to manage Galaxy datasets", type='error' ) + if trans.user_is_admin(): + return hda + # check for ownership of the containing history and accessibility of the underlying dataset + if( self.histories_mgr.is_owner( trans, hda.history ) + and self.can_access_dataset( trans, hda ) ): + return hda + raise exceptions.ItemOwnershipException( + "HistoryDatasetAssociation is not owned by the current user", type='error' ) + + def check_accessible( self, trans, hda ): + if trans.user and trans.user_is_admin(): + return hda + # check for access of the containing history... + self.histories_mgr.check_accessible( trans, hda.history ) + # ...then the underlying dataset + if self.can_access_dataset( trans, hda ): + return hda + raise exceptions.ItemAccessibilityException( + "HistoryDatasetAssociation is not accessible to the current user", type='error' ) + + def err_if_uploading( self, trans, hda ): + if hda.state == trans.model.Dataset.states.UPLOAD: + raise exceptions.Conflict( "Please wait until this dataset finishes uploading" ) + return hda diff -r 9dbd0de27e53a2cac3c604c0aa6fb70ffb5ac7d1 -r 56dd83576ce67763863b61ad85104f8d14d978e0 lib/galaxy/managers/histories.py --- /dev/null +++ b/lib/galaxy/managers/histories.py @@ -0,0 +1,93 @@ +from galaxy import exceptions +from galaxy.model import orm + + +class HistoryManager( object ): + #TODO: all the following would be more useful if passed the user instead of defaulting to trans.user + + def get( self, trans, unencoded_id, check_ownership=True, check_accessible=True, deleted=None ): + """ + Get a History from the database by id, verifying ownership. + """ + # this is a replacement for UsesHistoryMixin because mixins are a bad soln/structure + history = trans.sa_session.query( trans.app.model.History ).get( unencoded_id ) + if history is None: + raise exceptions.ObjectNotFound() + if deleted is True and not history.deleted: + raise exceptions.ItemDeletionException( 'History "%s" is not deleted' % ( history.name ), type="error" ) + elif deleted is False and history.deleted: + raise exceptions.ItemDeletionException( 'History "%s" is deleted' % ( history.name ), type="error" ) + + history = self.secure( trans, history, check_ownership, check_accessible ) + return history + + def by_user( self, trans, user=None, include_deleted=False, only_deleted=False ): + """ + Get all the histories for a given user (defaulting to `trans.user`) + ordered by update time and filtered on whether they've been deleted. + """ + # handle default and/or anonymous user (which still may not have a history yet) + user = user or trans.user + if not user: + current_history = trans.get_history() + return [ current_history ] if current_history else [] + + history_model = trans.model.History + query = ( trans.sa_session.query( history_model ) + .filter( history_model.user == user ) + .order_by( orm.desc( history_model.table.c.update_time ) ) ) + if only_deleted: + query = query.filter( history_model.deleted == True ) + elif not include_deleted: + query = query.filter( history_model.deleted == False ) + return query.all() + + def secure( self, trans, history, check_ownership=True, check_accessible=True ): + """ + checks if (a) user owns item or (b) item is accessible to user. + """ + # all items are accessible to an admin + if trans.user and trans.user_is_admin(): + return history + if check_ownership: + history = self.check_ownership( trans, history ) + if check_accessible: + history = self.check_accessible( trans, history ) + return history + + def is_current( self, trans, history ): + return trans.history == history + + def is_owner( self, trans, history ): + # anon users are only allowed to view their current history + if not trans.user: + return self.is_current( trans, history ) + return trans.user == history.user + + def check_ownership( self, trans, history ): + if trans.user and trans.user_is_admin(): + return history + if not trans.user and not self.is_current( trans, history ): + raise exceptions.AuthenticationRequired( "Must be logged in to manage Galaxy histories", type='error' ) + if self.is_owner( trans, history ): + return history + raise exceptions.ItemOwnershipException( "History is not owned by the current user", type='error' ) + + def is_accessible( self, trans, history ): + # admin always have access + if trans.user and trans.user_is_admin(): + return True + # owner has implicit access + if self.is_owner( trans, history ): + return True + # importable and shared histories are always accessible + if history.importable: + return True + if trans.user in history.users_shared_with_dot_users: + return True + return False + + def check_accessible( self, trans, history ): + if self.is_accessible( trans, history ): + return history + raise exceptions.ItemAccessibilityException( "History is not accessible to the current user", type='error' ) diff -r 9dbd0de27e53a2cac3c604c0aa6fb70ffb5ac7d1 -r 56dd83576ce67763863b61ad85104f8d14d978e0 lib/galaxy/webapps/galaxy/api/histories.py --- a/lib/galaxy/webapps/galaxy/api/histories.py +++ b/lib/galaxy/webapps/galaxy/api/histories.py @@ -18,7 +18,7 @@ from galaxy.web.base.controller import ExportsHistoryMixin from galaxy.web.base.controller import ImportsHistoryMixin -from galaxy.model import orm +from galaxy.managers import histories from galaxy import util from galaxy.util import string_as_bool @@ -35,7 +35,7 @@ def __init__( self, app ): super( HistoriesController, self ).__init__( app ) self.mgrs = util.bunch.Bunch( - histories = HistoryManager() + histories=histories.HistoryManager() ) def _decode_id( self, trans, id ): @@ -404,96 +404,3 @@ pass #log.warn( 'unknown key: %s', str( key ) ) return validated_payload - - - - -class HistoryManager( object ): - #TODO: all the following would be more useful if passed the user instead of defaulting to trans.user - - def get( self, trans, unencoded_id, check_ownership=True, check_accessible=True, deleted=None ): - """ - Get a History from the database by id, verifying ownership. - """ - # this is a replacement for UsesHistoryMixin because mixins are a bad soln/structure - history = trans.sa_session.query( trans.app.model.History ).get( unencoded_id ) - if history is None: - raise exceptions.ObjectNotFound() - if deleted == True and not history.deleted: - raise exceptions.ItemDeletionException( 'History "%s" is not deleted' % ( history.name ), type="error" ) - elif deleted == False and history.deleted: - raise exceptions.ItemDeletionException( 'History "%s" is deleted' % ( history.name ), type="error" ) - - history = self.secure( trans, history, check_ownership, check_accessible ) - return history - - def by_user( self, trans, user=None, include_deleted=False, only_deleted=False ): - """ - Get all the histories for a given user (defaulting to `trans.user`) - ordered by update time and filtered on whether they've been deleted. - """ - # handle default and/or anonymous user (which still may not have a history yet) - user = user or trans.user - if not user: - current_history = trans.get_history() - return [ current_history ] if current_history else [] - - history_model = trans.model.History - query = ( trans.sa_session.query( history_model ) - .filter( history_model.user == user ) - .order_by( orm.desc( history_model.table.c.update_time ) ) ) - if only_deleted: - query = query.filter( history_model.deleted == True ) - elif not include_deleted: - query = query.filter( history_model.deleted == False ) - return query.all() - - def secure( self, trans, history, check_ownership=True, check_accessible=True ): - """ - checks if (a) user owns item or (b) item is accessible to user. - """ - # all items are accessible to an admin - if trans.user and trans.user_is_admin(): - return history - if check_ownership: - history = self.check_ownership( trans, history ) - if check_accessible: - history = self.check_accessible( trans, history ) - return history - - def is_current( self, trans, history ): - return trans.history == history - - def is_owner( self, trans, history ): - # anon users are only allowed to view their current history - if not trans.user: - return self.is_current( trans, history ) - return trans.user == history.user - - def check_ownership( self, trans, history ): - if trans.user and trans.user_is_admin(): - return history - if not trans.user and not self.is_current( trans, history ): - raise exceptions.AuthenticationRequired( "Must be logged in to manage Galaxy histories", type='error' ) - if self.is_owner( trans, history ): - return history - raise exceptions.ItemOwnershipException( "History is not owned by the current user", type='error' ) - - def is_accessible( self, trans, history ): - # admin always have access - if trans.user and trans.user_is_admin(): - return True - # owner has implicit access - if self.is_owner( trans, history ): - return True - # importable and shared histories are always accessible - if history.importable: - return True - if trans.user in history.users_shared_with_dot_users: - return True - return False - - def check_accessible( self, trans, history ): - if self.is_accessible( trans, history ): - return history - raise exceptions.ItemAccessibilityException( "History is not accessible to the current user", type='error' ) diff -r 9dbd0de27e53a2cac3c604c0aa6fb70ffb5ac7d1 -r 56dd83576ce67763863b61ad85104f8d14d978e0 lib/galaxy/webapps/galaxy/api/history_contents.py --- a/lib/galaxy/webapps/galaxy/api/history_contents.py +++ b/lib/galaxy/webapps/galaxy/api/history_contents.py @@ -7,7 +7,6 @@ from galaxy.web import _future_expose_api as expose_api from galaxy.web import _future_expose_api_anonymous as expose_api_anonymous -from galaxy.web import _future_expose_api_raw as expose_api_raw from galaxy.web.base.controller import BaseAPIController from galaxy.web.base.controller import UsesHistoryDatasetAssociationMixin @@ -18,7 +17,8 @@ from galaxy.web.base.controller import url_for -from galaxy.webapps.galaxy.api import histories +from galaxy.managers import histories +from galaxy.managers import hdas import logging log = logging.getLogger( __name__ ) @@ -30,8 +30,8 @@ def __init__( self, app ): super( HistoryContentsController, self ).__init__( app ) self.mgrs = util.bunch.Bunch( - histories = histories.HistoryManager(), - hdas = HDAManager() + histories=histories.HistoryManager(), + hdas=hdas.HDAManager() ) def _decode_id( self, trans, id ): @@ -434,67 +434,3 @@ def __handle_unknown_contents_type( self, trans, contents_type ): raise exceptions.UnknownContentsType('Unknown contents type: %s' % type) - -class HDAManager( object ): - - def __init__( self ): - self.histories_mgr = histories.HistoryManager() - - def get( self, trans, unencoded_id, check_ownership=True, check_accessible=True ): - """ - """ - # this is a replacement for UsesHistoryDatasetAssociationMixin because mixins are a bad soln/structure - hda = trans.sa_session.query( trans.app.model.HistoryDatasetAssociation ).get( unencoded_id ) - if hda is None: - raise exceptions.ObjectNotFound() - hda = self.secure( trans, hda, check_ownership, check_accessible ) - return hda - - def secure( self, trans, hda, check_ownership=True, check_accessible=True ): - """ - checks if (a) user owns item or (b) item is accessible to user. - """ - # all items are accessible to an admin - if trans.user and trans.user_is_admin(): - return hda - if check_ownership: - hda = self.check_ownership( trans, hda ) - if check_accessible: - hda = self.check_accessible( trans, hda ) - return hda - - def can_access_dataset( self, trans, hda ): - current_user_roles = trans.get_current_user_roles() - return trans.app.security_agent.can_access_dataset( current_user_roles, hda.dataset ) - - #TODO: is_owner, is_accessible - - def check_ownership( self, trans, hda ): - if not trans.user: - #if hda.history == trans.history: - # return hda - raise exceptions.AuthenticationRequired( "Must be logged in to manage Galaxy datasets", type='error' ) - if trans.user_is_admin(): - return hda - # check for ownership of the containing history and accessibility of the underlying dataset - if( self.histories_mgr.is_owner( trans, hda.history ) - and self.can_access_dataset( trans, hda ) ): - return hda - raise exceptions.ItemOwnershipException( - "HistoryDatasetAssociation is not owned by the current user", type='error' ) - - def check_accessible( self, trans, hda ): - if trans.user and trans.user_is_admin(): - return hda - # check for access of the containing history... - self.histories_mgr.check_accessible( trans, hda.history ) - # ...then the underlying dataset - if self.can_access_dataset( trans, hda ): - return hda - raise exceptions.ItemAccessibilityException( - "HistoryDatasetAssociation is not accessible to the current user", type='error' ) - - def err_if_uploading( self, trans, hda ): - if hda.state == trans.model.Dataset.states.UPLOAD: - raise exceptions.Conflict( "Please wait until this dataset finishes uploading" ) - return hda Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.