1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/5a7a7003fbc3/
Changeset: 5a7a7003fbc3
User: martenson
Date: 2014-08-27 19:26:36
Summary: initial implementation of library folder manager
Affected #: 2 files
diff -r 8c0b86881425de9ba43c6aa604e868236cb7a9c9 -r 5a7a7003fbc3bc8f9d56f18a8d4b8f2d5b28fe42 lib/galaxy/managers/folders.py
--- /dev/null
+++ b/lib/galaxy/managers/folders.py
@@ -0,0 +1,160 @@
+"""
+Manager and Serializer for Library Folders.
+"""
+
+import galaxy.web
+from galaxy import exceptions
+from galaxy.model import orm
+
+import logging
+log = logging.getLogger( __name__ )
+
+
+# =============================================================================
+class FolderManager( object ):
+ """
+ Interface/service object for interacting with folders.
+ """
+
+ def get( self, trans, decoded_folder_id, check_ownership=False, check_accessible=True):
+ """
+ Get the folder from the DB.
+
+ :param decoded_folder_id: decoded folder id
+ :type decoded_folder_id: int
+ :param check_ownership: flag whether the check that user is owner
+ :type check_ownership: bool
+ :param check_accessible: flag whether to check that user can access item
+ :type check_accessible: bool
+
+ :returns: the requested folder
+ :rtype: LibraryFolder
+ """
+ try:
+ folder = trans.sa_session.query( trans.app.model.LibraryFolder ).filter( trans.app.model.LibraryFolder.table.c.id == decoded_folder_id ).one()
+ except MultipleResultsFound:
+ raise exceptions.InconsistentDatabase( 'Multiple folders found with the same id.' )
+ except NoResultFound:
+ raise exceptions.RequestParameterInvalidException( 'No folder found with the id provided.' )
+ except Exception, e:
+ raise exceptions.InternalServerError( 'Error loading from the database.' + str( e ) )
+
+ folder = self.secure( trans, folder, check_ownership, check_accessible )
+
+ return folder
+
+ def secure( self, trans, folder, check_ownership=True, check_accessible=True ):
+ """
+ Check if (a) user owns folder or (b) folder is accessible to user.
+
+ :param folder: folder item
+ :type folder: LibraryFolder
+ :param check_ownership: flag whether the check that user is owner
+ :type check_ownership: bool
+ :param check_accessible: flag whether to check that user can access item
+ :type check_accessible: bool
+
+ :returns: the original folder
+ :rtype: LibraryFolder
+ """
+ # all folders are accessible to an admin
+ if trans.user_is_admin():
+ return folder
+ if check_ownership:
+ folder = self.check_ownership( trans, folder )
+ if check_accessible:
+ folder = self.check_accessible( trans, folder )
+ return folder
+
+ def check_ownership( self, trans, folder ):
+ """
+ Check whether the user is owner of the folder.
+
+ :returns: the original folder
+ :rtype: LibraryFolder
+ """
+ if not trans.user:
+ raise exceptions.AuthenticationRequired( "Must be logged in to manage Galaxy items", type='error' )
+ if folder.user != trans.user:
+ raise exceptions.ItemOwnershipException( "Folder is not owned by the current user", type='error' )
+ else:
+ return folder
+
+ def check_accessible( self, trans, folder ):
+ """
+ Check whether the folder is accessible to current user.
+ By default every folder is accessible (contents have their own permissions).
+ """
+ return True
+
+ def get_folder_dict( self, trans, folder ):
+ """
+ Return folder data in the form of a dictionary.
+
+ :param folder: folder item
+ :type folder: LibraryFolder
+
+ :returns: dict with data about the new folder
+ :rtype: dictionary
+
+ """
+ folder_dict = folder.to_dict( view='element' )
+ folder_dict = trans.security.encode_all_ids( folder_dict, True )
+ folder_dict[ 'id' ] = 'F' + folder_dict[ 'id' ]
+ if folder_dict[ 'parent_id' ] is not None:
+ folder_dict[ 'parent_id' ] = 'F' + folder_dict[
'parent_id' ]
+ return folder_dict
+
+ def create( self, trans, parent_folder_id, new_folder_name, new_folder_description='' ):
+ """
+ Create a new folder under the given folder.
+
+ :param parent_folder_id: decoded id
+ :type parent_folder_id: int
+ :param new_folder_name: name of the new folder
+ :type new_folder_name: str
+ :param new_folder_description: description of the folder (optional, defaults to
empty string)
+ :type new_folder_description: str
+
+ :returns: the new folder
+ :rtype: LibraryFolder
+
+ :raises: InsufficientPermissionsException
+ """
+ parent_folder = self.get( trans, parent_folder_id )
+ current_user_roles = trans.get_current_user_roles()
+ if not ( trans.user_is_admin or trans.app.security_agent.can_add_library_item( current_user_roles, parent_folder ) ):
+ raise exceptions.InsufficientPermissionsException( 'You do not have proper permission to create folders under given folder.' )
+ new_folder = trans.app.model.LibraryFolder( name=new_folder_name,
description=new_folder_description )
+ # We are associating the last used genome build with folders, so we will always
+ # initialize a new folder with the first dbkey in genome builds list which is
currently
+ # ? unspecified (?)
+ new_folder.genome_build = trans.app.genome_builds.default_value
+ parent_folder.add_folder( new_folder )
+ trans.sa_session.add( new_folder )
+ trans.sa_session.flush()
+ # New folders default to having the same permissions as their parent folder
+ trans.app.security_agent.copy_library_permissions( trans, parent_folder,
new_folder )
+ return new_folder
+
+ def get_current_roles( self, trans, folder ):
+ """
+ Find all roles currently connected to relevant permissions
+ on the folder.
+
+ :param folder: the model object
+ :type folder: LibraryFolder
+
+ :returns: dict of current roles for all available permission types
+ :rtype: dictionary
+ """
+ # Omit duplicated roles by converting to set
+ modify_roles = set( trans.app.security_agent.get_roles_for_action( folder,
trans.app.security_agent.permitted_actions.LIBRARY_MODIFY ) )
+ manage_roles = set( trans.app.security_agent.get_roles_for_action( folder,
trans.app.security_agent.permitted_actions.LIBRARY_MANAGE ) )
+ add_roles = set( trans.app.security_agent.get_roles_for_action( folder,
trans.app.security_agent.permitted_actions.LIBRARY_ADD ) )
+
+ modify_folder_role_list = [ modify_role.name for modify_role in modify_roles ]
+ manage_folder_role_list = [ manage_role.name for manage_role in manage_roles ]
+ add_library_item_role_list = [ add_role.name for add_role in add_roles ]
+
+ return dict( modify_folder_role_list=modify_folder_role_list,
manage_folder_role_list=manage_folder_role_list,
add_library_item_role_list=add_library_item_role_list )
diff -r 8c0b86881425de9ba43c6aa604e868236cb7a9c9 -r 5a7a7003fbc3bc8f9d56f18a8d4b8f2d5b28fe42 lib/galaxy/webapps/galaxy/api/folders.py
--- a/lib/galaxy/webapps/galaxy/api/folders.py
+++ b/lib/galaxy/webapps/galaxy/api/folders.py
@@ -1,16 +1,10 @@
"""
API operations on library folders
"""
-# import os
-# import shutil
-# import urllib
-# import re
-# import socket
-# import traceback
-# import string
from galaxy import util
from galaxy import web
from galaxy import exceptions
+from galaxy.managers import folders
from galaxy.web import _future_expose_api as expose_api
from galaxy.web.base.controller import BaseAPIController, UsesLibraryMixin,
UsesLibraryMixinItems
from sqlalchemy.orm.exc import MultipleResultsFound
@@ -22,10 +16,14 @@
class FoldersController( BaseAPIController, UsesLibraryMixin, UsesLibraryMixinItems ):
+ def __init__( self, app ):
+ super( FoldersController, self ).__init__( app )
+ self.folder_manager = folders.FolderManager()
+
@web.expose_api
def index( self, trans, **kwd ):
"""
- GET /api/folders/
+ *GET /api/folders/
This would normally display a list of folders. However, that would
be across multiple libraries, so it's not implemented.
"""
@@ -45,12 +43,8 @@
:returns: dictionary including details of the folder
:rtype: dict
"""
- folder_id_without_prefix = self.__cut_the_prefix( id )
- content = self.get_library_folder( trans, folder_id_without_prefix,
check_ownership=False, check_accessible=True )
- return_dict = self.encode_all_ids( trans, content.to_dict( view='element'
) )
- return_dict[ 'id' ] = 'F' + return_dict[ 'id' ]
- if return_dict[ 'parent_id' ] is not None:
- return_dict[ 'parent_id' ] = 'F' + return_dict[
'parent_id' ]
+ folder = self.folder_manager.get( trans, self.__cut_and_decode( trans, id ),
check_ownership=False, check_accessible=True )
+ return_dict = self.folder_manager.get_folder_dict( trans, folder )
return return_dict
@expose_api
@@ -74,47 +68,20 @@
:returns: information about newly created folder, notably including ID
:rtype: dictionary
- :raises: RequestParameterMissingException, MalformedId, InternalServerError
+ :raises: RequestParameterMissingException
"""
-
payload = kwd.get( 'payload', None )
if payload is None:
- raise exceptions.RequestParameterMissingException( "Missing required
parameters 'encoded_parent_folder_id' and 'name'." )
+ raise exceptions.RequestParameterMissingException( "Missing required
parameter 'name'." )
name = payload.get( 'name', None )
+ if name is None:
+ raise exceptions.RequestParameterMissingException( "Missing required
parameter 'name'." )
description = payload.get( 'description', '' )
- if encoded_parent_folder_id is None:
- raise exceptions.RequestParameterMissingException( "Missing required
parameter 'encoded_parent_folder_id'." )
- elif name is None:
- raise exceptions.RequestParameterMissingException( "Missing required
parameter 'name'." )
-
- encoded_parent_folder_id = self.__cut_the_prefix( encoded_parent_folder_id )
- decoded_parent_folder_id = self.__decode_folder_id( trans,
encoded_parent_folder_id )
- parent_folder = self.__load_folder( trans, decoded_parent_folder_id )
+ decoded_parent_folder_id = self.__cut_and_decode( trans, encoded_parent_folder_id
)
+ parent_folder = self.folder_manager.get( trans, decoded_parent_folder_id )
+ new_folder = self.folder_manager.create( trans, parent_folder.id, name,
description )
+ return self.folder_manager.get_folder_dict( trans, new_folder )
- library = parent_folder.parent_library
- if library.deleted:
- raise exceptions.ObjectAttributeInvalidException( 'You cannot create
folder within a deleted library. Undelete it first.' )
-
- # TODO: refactor the functionality for use in manager instead of calling another
controller
- params = dict( [ ( "name", name ), ( "description",
description ) ] )
- status, output =
trans.webapp.controllers['library_common'].create_folder( trans, 'api',
encoded_parent_folder_id, '', **params )
-
- if 200 == status and len( output.items() ) == 1:
- for k, v in output.items():
- try:
- folder = trans.sa_session.query( trans.app.model.LibraryFolder ).get(
v.id )
- except Exception, e:
- raise exceptions.InternalServerError( 'Error loading from the
database.' + str( e ))
- if folder:
- update_time = folder.update_time.strftime( "%Y-%m-%d %I:%M
%p" )
- return_dict = self.encode_all_ids( trans, folder.to_dict(
view='element' ) )
- return_dict[ 'update_time' ] = update_time
- return_dict[ 'parent_id' ] = 'F' + return_dict[
'parent_id' ]
- return_dict[ 'id' ] = 'F' + return_dict[ 'id'
]
- return return_dict
- else:
- raise exceptions.InternalServerError( 'Error while creating a
folder.' + str( e ) )
-
@expose_api
def get_permissions( self, trans, encoded_folder_id, **kwd ):
"""
@@ -135,19 +102,16 @@
"""
current_user_roles = trans.get_current_user_roles()
is_admin = trans.user_is_admin()
-
encoded_folder_id = self.__cut_the_prefix( encoded_folder_id )
decoded_folder_id = self.__decode_folder_id( trans, encoded_folder_id )
- folder = self.__load_folder( trans, decoded_folder_id )
+ folder = self.folder_manager.get( trans, decoded_folder_id )
- if not ( is_admin or trans.app.security_agent.can_manage_library_item(
current_user_roles, library ) ):
+ if not ( is_admin or trans.app.security_agent.can_manage_library_item(
current_user_roles, folder ) ):
raise exceptions.InsufficientPermissionsException( 'You do not have
proper permission to access permissions of this folder.' )
scope = kwd.get( 'scope', None )
-
if scope == 'current' or scope is None:
- return self._get_current_roles( trans, folder )
-
+ return self.folder_manager.get_current_roles( trans, folder )
# Return roles that are available to select.
elif scope == 'available':
page = kwd.get( 'page', None )
@@ -155,17 +119,13 @@
page = int( page )
else:
page = 1
-
page_limit = kwd.get( 'page_limit', None )
if page_limit is not None:
page_limit = int( page_limit )
else:
page_limit = 10
-
query = kwd.get( 'q', None )
-
roles, total_roles = trans.app.security_agent.get_valid_roles( trans, folder,
query, page, page_limit )
-
return_roles = []
for role in roles:
return_roles.append( dict( id=role.name, name=role.name, type=role.type )
)
@@ -203,7 +163,7 @@
current_user_roles = trans.get_current_user_roles()
decoded_folder_id = self.__decode_folder_id( trans, self.__cut_the_prefix(
encoded_folder_id ) )
- folder = self.__load_folder( trans, decoded_folder_id )
+ folder = self.folder_manager.get( trans, decoded_folder_id )
if not ( is_admin or trans.app.security_agent.can_manage_library_item(
current_user_roles, folder ) ):
raise exceptions.InsufficientPermissionsException( 'You do not have
proper permission to modify permissions of this folder.' )
@@ -266,8 +226,7 @@
else:
raise exceptions.RequestParameterInvalidException( 'The mandatory
parameter "action" has an invalid value.'
'Allowed values are: "set_permissions"'
)
-
- return self._get_current_roles( trans, folder )
+ return self.folder_manager.get_current_roles( trans, folder )
@web.expose_api
def update( self, trans, id, library_id, payload, **kwd ):
@@ -297,42 +256,11 @@
raise exceptions.MalformedId( "Malformed folder id ( %s ) specified,
unable to decode" % ( str( encoded_id ) ) )
return decoded_id
- def __load_folder( self, trans, folder_id ):
+ def __cut_and_decode( self, trans, encoded_folder_id ):
"""
- Load the folder from the DB.
+ Cuts the prefix (the prepended 'F') and returns the decoded id.
"""
- try:
- folder = trans.sa_session.query( trans.app.model.LibraryFolder ).filter(
trans.app.model.LibraryFolder.table.c.id == folder_id ).one()
- except MultipleResultsFound:
- raise exceptions.InconsistentDatabase( 'Multiple folders found with the
same id.' )
- except NoResultFound:
- raise exceptions.RequestParameterInvalidException( 'No folder found with
the id provided.' )
- except Exception, e:
- raise exceptions.InternalServerError( 'Error loading from the
database.' + str( e ) )
- return folder
-
-
- def _get_current_roles( self, trans, folder ):
- """
- Find all roles currently connected to relevant permissions
- on the folder.
-
- :param folder: the model object
- :type folder: LibraryFolder
-
- :rtype: dictionary
- :returns: dict of current roles for all available permission types
- """
- # Omit duplicated roles by converting to set
- modify_roles = set( trans.app.security_agent.get_roles_for_action( folder,
trans.app.security_agent.permitted_actions.LIBRARY_MODIFY ) )
- manage_roles = set( trans.app.security_agent.get_roles_for_action( folder,
trans.app.security_agent.permitted_actions.LIBRARY_MANAGE ) )
- add_roles = set( trans.app.security_agent.get_roles_for_action( folder,
trans.app.security_agent.permitted_actions.LIBRARY_ADD ) )
-
- modify_folder_role_list = [ modify_role.name for modify_role in modify_roles ]
- manage_folder_role_list = [ manage_role.name for manage_role in manage_roles ]
- add_library_item_role_list = [ add_role.name for add_role in add_roles ]
-
- return dict( modify_folder_role_list=modify_folder_role_list,
manage_folder_role_list=manage_folder_role_list,
add_library_item_role_list=add_library_item_role_list )
+ return self.__decode_folder_id( trans, self.__cut_the_prefix( encoded_folder_id )
)
def _load_role( self, trans, role_name ):
"""
Repository URL:
https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.