3 new commits in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/d1593c067833/
Changeset:   d1593c067833
User:        natefoo
Date:        2013-08-28 22:08:21
Summary:     Reorganize object store options in sample config file.
Affected #:  1 file

diff -r 4fea98291cbce9a0e8d580bfd9c4ef4a5184c680 -r d1593c067833c8dc4c85bdcc7b4d90b79d8ab01c universe_wsgi.ini.sample
--- a/universe_wsgi.ini.sample
+++ b/universe_wsgi.ini.sample
@@ -203,6 +203,41 @@
 # breaking existing tools.
 #collect_outputs_from = new_file_path,job_working_directory
 
+# -- Data Storage (Object Store)
+
+# Object store backend module (valid options are: disk, s3, swift, irods,
+# distributed, hierarchical)
+#object_store = disk
+
+# Credentials used by certain (s3, swift) object store backends
+#os_access_key = <your cloud object store access key>
+#os_secret_key = <your cloud object store secret key>
+#os_bucket_name = <name of an existing object store bucket or container>
+
+# If using 'swift' object store, you must specify the following connection
+# properties
+#os_host = swift.rc.nectar.org.au
+#os_port = 8888
+#os_is_secure = False
+#os_conn_path = /
+
+# Reduced redundancy can be used only with the 's3' object store
+#os_use_reduced_redundancy = False
+
+# Path to cache directory for object store backends that utilize a cache (s3,
+# swift, irods)
+#object_store_cache_path = database/files/
+
+# Size (in GB) that the cache used by object store should be limited to.
+# If the value is not specified, the cache size will be limited only by the
+# file system size.
+#object_store_cache_size = 100
+
+# Configuration file for the distributed object store, if object_store =
+# distributed. See the sample at distributed_object_store_conf.xml.sample
+#distributed_object_store_config_file = None
+
+
 # -- Mail and notification
 
 # Galaxy sends mail for various things: Subscribing users to the mailing list
@@ -524,28 +559,6 @@
 
 # -- Beta features
 
-# Object store mode (valid options are: disk, s3, swift, distributed, hierarchical)
-#object_store = disk
-#os_access_key = <your cloud object store access key>
-#os_secret_key = <your cloud object store secret key>
-#os_bucket_name = <name of an existing object store bucket or container>
-# If using 'swift' object store, you must specify the following connection properties
-#os_host = swift.rc.nectar.org.au
-#os_port = 8888
-#os_is_secure = False
-#os_conn_path = /
-# Reduced redundancy can be used only with the 's3' object store
-#os_use_reduced_redundancy = False
-# Size (in GB) that the cache used by object store should be limited to.
-# If the value is not specified, the cache size will be limited only by the
-# file system size. The file system location of the cache is considered the
-# configuration of the ``file_path`` directive defined above.
-#object_store_cache_size = 100
-
-# Configuration file for the distributed object store, if object_store =
-# distributed. See the sample at distributed_object_store_conf.xml.sample
-#distributed_object_store_config_file = None
-
 # Enable Galaxy to communicate directly with a sequencer
 #enable_sequencer_communication = False
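With the options promoted out of "Beta features" into their own section, a remote backend needs only a handful of uncommented lines in universe_wsgi.ini. A minimal sketch of an S3 setup under these options (the credentials and bucket name below are placeholders, not working values):

    object_store = s3
    os_access_key = YOUR-ACCESS-KEY
    os_secret_key = YOUR-SECRET-KEY
    os_bucket_name = my-galaxy-datasets
    object_store_cache_path = database/files/
    object_store_cache_size = 100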
https://bitbucket.org/galaxy/galaxy-central/commits/75bcfb029e8c/
Changeset:   75bcfb029e8c
User:        natefoo
Date:        2013-08-28 22:11:29
Summary:     Implement a local_extra_dirs decorator in the object store that object store plugins (e.g. cloud object stores) can use to cause things like the job_working_directory to be stored on local disk.
Affected #:  1 file

diff -r d1593c067833c8dc4c85bdcc7b4d90b79d8ab01c -r 75bcfb029e8c5dd8e3d78829469a8583c73055ae lib/galaxy/objectstore/__init__.py
--- a/lib/galaxy/objectstore/__init__.py
+++ b/lib/galaxy/objectstore/__init__.py
@@ -25,7 +25,7 @@
     """
    ObjectStore abstract interface
     """
-    def __init__(self):
+    def __init__(self, config, **kwargs):
         self.running = True
         self.extra_dirs = {}
 
@@ -195,7 +195,7 @@
     >>> assert s.get_filename(obj) == file_path + '/000/dataset_1.dat'
     """
     def __init__(self, config, file_path=None, extra_dirs=None):
-        super(DiskObjectStore, self).__init__()
+        super(DiskObjectStore, self).__init__(config, file_path=file_path, extra_dirs=extra_dirs)
         self.file_path = file_path or config.file_path
         self.config = config
         self.extra_dirs['job_work'] = config.job_working_directory
@@ -213,6 +213,7 @@
         if not os.path.exists(path):
             return self._construct_path(obj, base_dir=base_dir, dir_only=dir_only, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name)
 
+    # TODO: rename to _disk_path or something like that to avoid conflicts with children that'll use the local_extra_dirs decorator, e.g. S3
     def _construct_path(self, obj, old_style=False, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None, **kwargs):
         """ Construct the expected absolute path for accessing the object
             identified by `obj`.id.
@@ -544,6 +545,19 @@
     else:
         log.error("Unrecognized object store definition: {0}".format(store))
 
+def local_extra_dirs( func ):
+    """ A decorator for non-local plugins to utilize local directories for their extra_dirs (job_working_directory and temp).
+    """
+    def wraps( self, *args, **kwargs ):
+        if kwargs.get( 'base_dir', None ) is None:
+            return func( self, *args, **kwargs )
+        else:
+            for c in self.__class__.__mro__:
+                if c.__name__ == 'DiskObjectStore':
+                    return getattr( c, func.__name__ )( self, *args, **kwargs )
+            raise Exception( "Could not call DiskObjectStore's %s method, does your Object Store plugin inherit from DiskObjectStore?" % func.__name__ )
+    return wraps
+
 def convert_bytes(bytes):
     """ A helper function used for pretty printing disk usage """
     if bytes is None:
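The decorator works by inspecting the base_dir keyword: calls without it (ordinary dataset access) go to the plugin's own method, while calls carrying base_dir (job working directories, temp files) are dispatched up the MRO to the DiskObjectStore implementation of the same method. A minimal sketch of the intended usage in a remote-backed plugin; the class and its _remote_exists helper are hypothetical, only the decorator wiring is from this commit:

    from galaxy.objectstore import DiskObjectStore, local_extra_dirs

    class ExampleRemoteObjectStore( DiskObjectStore ):
        @local_extra_dirs
        def exists( self, obj, **kwargs ):
            # reached only when no base_dir kwarg was supplied; calls with a
            # base_dir are routed by the decorator to DiskObjectStore.exists()
            return self._remote_exists( obj, **kwargs )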
https://bitbucket.org/galaxy/galaxy-central/commits/fc4e1a162195/
Changeset:   fc4e1a162195
User:        natefoo
Date:        2013-08-28 22:20:19
Summary:     Experimental/alpha/in-development/incomplete object store plugin for iRODS. This plugin will probably set your iRODS instance on fire: do not use except in development/testing.
Affected #:  6 files

diff -r 75bcfb029e8c5dd8e3d78829469a8583c73055ae -r fc4e1a162195e45f69573724adec564ffe1ea9ee eggs.ini
--- a/eggs.ini
+++ b/eggs.ini
@@ -18,6 +18,7 @@
 MarkupSafe = 0.12
 mercurial = 2.2.3
 MySQL_python = 1.2.3c1
+PyRods = 3.2.4
 numpy = 1.6.0
 pbs_python = 4.3.5
 psycopg2 = 2.0.13

diff -r 75bcfb029e8c5dd8e3d78829469a8583c73055ae -r fc4e1a162195e45f69573724adec564ffe1ea9ee lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -209,6 +209,7 @@
         if self.nginx_upload_store:
             self.nginx_upload_store = os.path.abspath( self.nginx_upload_store )
         self.object_store = kwargs.get( 'object_store', 'disk' )
+        self.object_store_cache_path = resolve_path( kwargs.get( "object_store_cache_path", "database/object_store_cache" ), self.root )
         # Handle AWS-specific config options for backward compatibility
         if kwargs.get( 'aws_access_key', None) is not None:
             self.os_access_key= kwargs.get( 'aws_access_key', None )
@@ -228,6 +229,8 @@
         self.distributed_object_store_config_file = kwargs.get( 'distributed_object_store_config_file', None )
         if self.distributed_object_store_config_file is not None:
             self.distributed_object_store_config_file = resolve_path( self.distributed_object_store_config_file, self.root )
+        self.irods_root_collection_path = kwargs.get( 'irods_root_collection_path', None )
+        self.irods_default_resource = kwargs.get( 'irods_default_resource', None )
         # Parse global_conf and save the parser
         global_conf = kwargs.get( 'global_conf', None )
         global_conf_parser = ConfigParser.ConfigParser()
@@ -369,6 +372,7 @@
                         self.nginx_upload_store, \
                         './static/genetrack/plots', \
                         self.whoosh_index_dir, \
+                        self.object_store_cache_path, \
                         os.path.join( self.tool_data_path, 'shared', 'jars' ):
             if path not in [ None, False ] and not os.path.isdir( path ):
                 try:

diff -r 75bcfb029e8c5dd8e3d78829469a8583c73055ae -r fc4e1a162195e45f69573724adec564ffe1ea9ee lib/galaxy/eggs/__init__.py
--- a/lib/galaxy/eggs/__init__.py
+++ b/lib/galaxy/eggs/__init__.py
@@ -387,7 +387,8 @@
             "guppy": lambda: self.config.get( "app:main", "use_memdump" ),
             "python_openid": lambda: self.config.get( "app:main", "enable_openid" ),
             "python_daemon": lambda: sys.version_info[:2] >= ( 2, 5 ),
-            "pysam": lambda: check_pysam()
+            "pysam": lambda: check_pysam(),
+            "PyRods": lambda: self.config.get( "app:main", "object_store" ) == "irods"
             }.get( egg_name, lambda: True )()
         except:
             return False

diff -r 75bcfb029e8c5dd8e3d78829469a8583c73055ae -r fc4e1a162195e45f69573724adec564ffe1ea9ee lib/galaxy/objectstore/__init__.py
--- a/lib/galaxy/objectstore/__init__.py
+++ b/lib/galaxy/objectstore/__init__.py
@@ -542,6 +542,9 @@
         return DistributedObjectStore(config=config, fsmon=fsmon)
     elif store == 'hierarchical':
         return HierarchicalObjectStore()
+    elif store == 'irods':
+        from galaxy.objectstore.rods import IRODSObjectStore
+        return IRODSObjectStore(config=config)
     else:
         log.error("Unrecognized object store definition: {0}".format(store))
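The sample config is not updated in this commit, but based on the kwargs read in lib/galaxy/config.py above, enabling the new backend would look something like the following in universe_wsgi.ini (values are illustrative placeholders; both iRODS options may be omitted, in which case the plugin falls back to rodsHome/galaxy_data and the environment's default resource, as rods.py below shows):

    object_store = irods
    irods_root_collection_path = /exampleZone/home/galaxy/galaxy_data
    irods_default_resource = exampleResc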
diff -r 75bcfb029e8c5dd8e3d78829469a8583c73055ae -r fc4e1a162195e45f69573724adec564ffe1ea9ee lib/galaxy/objectstore/rods.py
--- /dev/null
+++ b/lib/galaxy/objectstore/rods.py
@@ -0,0 +1,332 @@
+"""
+Object Store plugin for the Integrated Rule-Oriented Data Store (iRODS)
+
+The module is named rods to avoid conflicting with the PyRods module, irods
+"""
+
+import os
+import time
+import errno
+import logging
+#import traceback
+
+from posixpath import join as path_join
+from posixpath import basename as path_basename
+from posixpath import dirname as path_dirname
+
+from galaxy.objectstore import DiskObjectStore, ObjectStore, local_extra_dirs
+from galaxy.exceptions import ObjectNotFound, ObjectInvalid
+
+import galaxy.eggs
+galaxy.eggs.require( 'PyRods' )
+import irods
+
+log = logging.getLogger( __name__ )
+
+
+class IRODSObjectStore( DiskObjectStore, ObjectStore ):
+    """
+    Galaxy object store based on iRODS
+    """
+    def __init__( self, config, file_path=None, extra_dirs=None ):
+        super( IRODSObjectStore, self ).__init__( config, file_path=file_path, extra_dirs=extra_dirs )
+        self.cache_path = config.object_store_cache_path
+        self.default_resource = config.irods_default_resource or None
+
+        # Connect to iRODS (AssertionErrors will be raised if anything goes wrong)
+        self.rods_env, self.rods_conn = rods_connect()
+
+        # if the root collection path in the config is unset or relative, try to use a sensible default
+        if config.irods_root_collection_path is None or ( config.irods_root_collection_path is not None and not config.irods_root_collection_path.startswith( '/' ) ):
+            rods_home = self.rods_env.rodsHome
+            assert rods_home != '', "Unable to initialize iRODS Object Store: rodsHome cannot be determined and irods_root_collection_path in Galaxy config is unset or not absolute."
+            if config.irods_root_collection_path is None:
+                self.root_collection_path = path_join( rods_home, 'galaxy_data' )
+            else:
+                self.root_collection_path = path_join( rods_home, config.irods_root_collection_path )
+        else:
+            self.root_collection_path = config.irods_root_collection_path
+
+        # will return a collection object regardless of whether it exists
+        self.root_collection = irods.irodsCollection( self.rods_conn, self.root_collection_path )
+
+        if self.root_collection.getId() == -1:
+            log.warning( "iRODS root collection does not exist, will attempt to create: %s", self.root_collection_path )
+            self.root_collection.upCollection()
+            assert self.root_collection.createCollection( os.path.basename( self.root_collection_path ) ) == 0, "iRODS root collection creation failed: %s" % self.root_collection_path
+            self.root_collection = irods.irodsCollection( self.rods_conn, self.root_collection_path )
+            assert self.root_collection.getId() != -1, "iRODS root collection creation claimed success but still does not exist"
+
+        if self.default_resource is None:
+            self.default_resource = self.rods_env.rodsDefResource
+
+        log.info( "iRODS data for this instance will be stored in collection: %s, resource: %s", self.root_collection_path, self.default_resource )
+
+    def __get_rods_path( self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None, strip_dat=True, **kwargs ):
+        path = ""
+        if extra_dir is not None:
+            path = extra_dir
+
+        # extra_dir_at_root is ignored - since the iRODS plugin does not use
+        # the directory hash, there is only one level of subdirectory.
+
+        if not dir_only:
+            # the .dat extension is stripped when stored in iRODS
+            # TODO: is the strip_dat kwarg the best way to implement this?
+            if strip_dat and alt_name and alt_name.endswith( '.dat' ):
+                alt_name = os.path.splitext( alt_name )[0]
+            default_name = 'dataset_%s' % obj.id
+            if not strip_dat:
+                default_name += '.dat'
+            path = path_join( path, alt_name if alt_name else default_name )
+
+        path = path_join( self.root_collection_path, path )
+
+        #log.debug( 'iRODS path for %s %s is %s', obj.__class__.__name__, obj.id, path )
+
+        return path
+
+    def __get_cache_path( self, obj, **kwargs ):
+        # FIXME: does not handle collections
+        # FIXME: collisions could occur here
+        return os.path.join( self.cache_path, path_basename( self.__get_rods_path( obj, strip_dat=False, **kwargs ) ) )
+
+    def __clean_cache_entry( self, obj, **kwargs ):
+        # FIXME: does not handle collections
+        try:
+            os.unlink( self.__get_cache_path( obj, **kwargs ) )
+        except OSError:
+            # it is expected that we'll call this method a lot regardless of
+            # whether we think the cached file exists
+            pass
+
+    def __get_rods_handle( self, obj, mode='r', **kwargs ):
+        if kwargs.get( 'dir_only', False ):
+            return irods.irodsCollection( self.rods_conn, self.__get_rods_path( obj, **kwargs ) )
+        else:
+            return irods.irodsOpen( self.rods_conn, self.__get_rods_path( obj, **kwargs ), mode )
+
+    def __mkcolls( self, rods_path ):
+        """
+        An os.makedirs() for iRODS collections. `rods_path` is the desired collection to create.
+        """
+        assert rods_path.startswith( self.root_collection_path + '/' ), '__mkcolls(): Creating collections outside the root collection is not allowed (requested path was: %s)' % rods_path
+        mkcolls = []
+        c = irods.irodsCollection( self.rods_conn, rods_path )
+        while c.getId() == -1:
+            assert c.getCollName().startswith( self.root_collection_path + '/' ), '__mkcolls(): Attempted to move above the root collection: %s' % c.getCollName()
+            mkcolls.append( c.getCollName() )
+            c.upCollection()
+        for collname in reversed( mkcolls ):
+            log.debug( 'Creating collection %s' % collname )
+            ci = irods.collInp_t()
+            ci.collName = collname
+            status = irods.rcCollCreate( self.rods_conn, ci )
+            assert status == 0, '__mkcolls(): Failed to create collection: %s' % collname
+
+    @local_extra_dirs
+    def exists( self, obj, **kwargs ):
+        doi = irods.dataObjInp_t()
+        doi.objPath = self.__get_rods_path( obj, **kwargs )
+        log.debug( 'exists(): checking: %s', doi.objPath )
+        return irods.rcObjStat( self.rods_conn, doi ) is not None
+
+    @local_extra_dirs
+    def create(self, obj, **kwargs):
+        if not self.exists( obj, **kwargs ):
+            rods_path = self.__get_rods_path( obj, **kwargs )
+            log.debug( 'create(): %s', rods_path )
+            dir_only = kwargs.get( 'dir_only', False )
+            # short circuit collection creation since most of the time it will
+            # be the root collection which already exists
+            collection_path = rods_path if dir_only else path_dirname( rods_path )
+            if collection_path != self.root_collection_path:
+                self.__mkcolls( collection_path )
+            if not dir_only:
+                # rcDataObjCreate is used instead of the irodsOpen wrapper so
+                # that we can prevent overwriting
+                doi = irods.dataObjInp_t()
+                doi.objPath = rods_path
+                doi.createMode = 0640
+                doi.dataSize = 0 # 0 actually means "unknown", although literally 0 would be preferable
+                irods.addKeyVal( doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource )
+                status = irods.rcDataObjCreate( self.rods_conn, doi )
+                assert status >= 0, 'create(): rcDataObjCreate() failed: %s: %s: %s' % ( rods_path, status, irods.strerror( status ) )
+
+    @local_extra_dirs
+    def empty( self, obj, **kwargs ):
+        assert 'dir_only' not in kwargs, 'empty(): `dir_only` parameter is invalid here'
+        h = self.__get_rods_handle( obj, **kwargs )
+        try:
+            return h.getSize() == 0
+        except AttributeError:
+            # h is None
+            raise ObjectNotFound()
+
+    def size( self, obj, **kwargs ):
+        assert 'dir_only' not in kwargs, 'size(): `dir_only` parameter is invalid here'
+        h = self.__get_rods_handle( obj, **kwargs )
+        try:
+            return h.getSize()
+        except AttributeError:
+            # h is None
+            return 0
+
+    @local_extra_dirs
+    def delete( self, obj, entire_dir=False, **kwargs ):
+        assert 'dir_only' not in kwargs, 'delete(): `dir_only` parameter is invalid here'
+        rods_path = self.__get_rods_path( obj, **kwargs )
+        # __get_rods_path prepends self.root_collection_path but we are going
+        # to ensure that it's valid anyway for safety's sake
+        assert rods_path.startswith( self.root_collection_path + '/' ), 'ERROR: attempt to delete object outside root collection (path was: %s)' % rods_path
+        if entire_dir:
+            # TODO
+            raise NotImplementedError()
+        h = self.__get_rods_handle( obj, **kwargs )
+        try:
+            # note: PyRods' irodsFile.delete() does not set force
+            status = h.delete()
+            assert status == 0, '%d: %s' % ( status, irods.strerror( status ) )
+            return True
+        except AttributeError:
+            log.warning( 'delete(): operation failed: object does not exist: %s', rods_path )
+        except AssertionError, e:
+            # delete() does not raise on deletion failure
+            log.error( 'delete(): operation failed: %s', e )
+        finally:
+            # remove the cached entry (finally is executed even when the try
+            # contains a return)
+            self.__clean_cache_entry( obj, **kwargs )
+        return False
+
+    @local_extra_dirs
+    def get_data( self, obj, start=0, count=-1, **kwargs ):
+        log.debug( 'get_data(): %s %s', obj.__class__.__name__, obj.id )
+        h = self.__get_rods_handle( obj, **kwargs )
+        try:
+            h.seek( start )
+        except AttributeError:
+            raise ObjectNotFound()
+        if count == -1:
+            return h.read()
+        else:
+            return h.read( count )
+        # TODO: make sure implicit close is okay, DiskObjectStore actually
+        # reads data into a var, closes, and returns the var
+
+    @local_extra_dirs
+    def get_filename( self, obj, **kwargs ):
+        log.debug( "get_filename(): called on %s %s. For better performance, avoid this method and use get_data() instead.", obj.__class__.__name__, obj.id )
+
+        # For finding all places where get_filename is called...
+        #log.debug( ''.join( traceback.format_stack() ) )
+
+        cached_path = self.__get_cache_path( obj, **kwargs )
+
+        if not self.exists( obj, **kwargs ):
+            raise ObjectNotFound()
+
+        # TODO: implement or define whether dir_only is valid
+        if 'dir_only' in kwargs:
+            raise NotImplementedError()
+
+        # cache hit
+        if os.path.exists( cached_path ):
+            return os.path.abspath( cached_path )
+
+        # cache miss
+        # TODO: thread this
+        incoming_path = os.path.join( os.path.dirname( cached_path ), "__incoming_%s" % os.path.basename( cached_path ) )
+        doi = irods.dataObjInp_t()
+        doi.objPath = self.__get_rods_path( obj, **kwargs )
+        doi.dataSize = 0 # TODO: does this affect performance? should we get size?
+        doi.numThreads = 0
+        # TODO: might want to VERIFY_CHKSUM_KW
+        log.debug( 'get_filename(): caching %s to %s', doi.objPath, incoming_path )
+
+        # do the iget
+        status = irods.rcDataObjGet( self.rods_conn, doi, incoming_path )
+
+        # if incoming already exists, we'll wait for another process or thread
+        # to finish caching
+        if status != irods.OVERWRITE_WITHOUT_FORCE_FLAG:
+            assert status == 0, 'get_filename(): iget %s failed (%s): %s' % ( doi.objPath, status, irods.strerror( status ) )
+            # POSIX rename is atomic
+            # TODO: rename without clobbering
+            os.rename( incoming_path, cached_path )
+            log.debug( 'get_filename(): cached %s to %s', doi.objPath, cached_path )
+
+        # another process or thread is caching, wait for it
+        while not os.path.exists( cached_path ):
+            # TODO: force restart after mod time > some configurable, or
+            # otherwise deal with this potential deadlock and interrupted
+            # transfers
+            time.sleep( 5 )
+            log.debug( "get_filename(): waiting on incoming '%s' for %s %s", incoming_path, obj.__class__.__name__, obj.id )
+
+        return os.path.abspath( cached_path )
+
+    @local_extra_dirs
+    def update_from_file(self, obj, file_name=None, create=False, **kwargs):
+        assert 'dir_only' not in kwargs, 'update_from_file(): `dir_only` parameter is invalid here'
+
+        # do not create if not requested
+        if not create and not self.exists( obj, **kwargs ):
+            raise ObjectNotFound()
+
+        if file_name is None:
+            file_name = self.__get_cache_path( obj, **kwargs )
+
+        # put will create if necessary
+        doi = irods.dataObjInp_t()
+        doi.objPath = self.__get_rods_path( obj, **kwargs )
+        doi.createMode = 0640
+        doi.dataSize = os.stat( file_name ).st_size
+        doi.numThreads = 0
+        irods.addKeyVal( doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource )
+        irods.addKeyVal( doi.condInput, irods.FORCE_FLAG_KW, '' )
+        # TODO: might want to VERIFY_CHKSUM_KW
+        log.debug( 'update_from_file(): updating %s to %s', file_name, doi.objPath )
+
+        # do the iput
+        status = irods.rcDataObjPut( self.rods_conn, doi, file_name )
+        assert status == 0, 'update_from_file(): iput %s failed (%s): %s' % ( doi.objPath, status, irods.strerror( status ) )
+
+    def get_object_url(self, obj, **kwargs):
+        return None
+
+    def get_store_usage_percent(self):
+        return 0.0
+
+# monkeypatch an strerror method into the irods module
+def _rods_strerror( errno ):
+    """
+    The missing `strerror` for iRODS error codes
+    """
+    if not hasattr( irods, '__rods_strerror_map' ):
+        irods.__rods_strerror_map = {}
+        for name in dir( irods ):
+            v = getattr( irods, name )
+            if type( v ) == int and v < 0:
+                irods.__rods_strerror_map[ v ] = name
+    return irods.__rods_strerror_map.get( errno, 'GALAXY_NO_ERRNO_MAPPING_FOUND' )
+
+irods.strerror = _rods_strerror
+
+def rods_connect():
+    """
+    A basic iRODS connection mechanism that connects using the current iRODS
+    environment
+    """
+    status, env = irods.getRodsEnv()
+    assert status == 0, 'connect(): getRodsEnv() failed (%s): %s' % ( status, irods.strerror( status ) )
+    conn, err = irods.rcConnect( env.rodsHost,
+                                 env.rodsPort,
+                                 env.rodsUserName,
+                                 env.rodsZone )
+    assert err.status == 0, 'connect(): rcConnect() failed (%s): %s' % ( err.status, err.msg )
+    status, pw = irods.obfGetPw()
+    assert status == 0, 'connect(): getting password with obfGetPw() failed (%s): %s' % ( status, irods.strerror( status ) )
+    status = irods.clientLoginWithObfPassword( conn, pw )
+    assert status == 0, 'connect(): logging in with clientLoginWithObfPassword() failed (%s): %s' % ( status, irods.strerror( status ) )
+    return env, conn
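The cache-miss path in get_filename() above is a small publish-by-rename protocol: the object is downloaded to a sibling __incoming_* file, then moved into place with os.rename(), which is atomic on POSIX filesystems, while any competing process polls until the final path appears. The same pattern reduced to a standalone sketch (fetch() is a hypothetical stand-in for rcDataObjGet(); real code would also handle the interrupted-transfer deadlock noted in the TODO above):

    import os
    import time

    def fill_cache( fetch, remote_path, cached_path, poll_interval=5 ):
        if os.path.exists( cached_path ):
            return cached_path  # cache hit
        incoming_path = os.path.join( os.path.dirname( cached_path ),
                                      '__incoming_%s' % os.path.basename( cached_path ) )
        # fetch() returns False when another process already owns incoming_path,
        # mirroring the OVERWRITE_WITHOUT_FORCE_FLAG status check above
        if fetch( remote_path, incoming_path ):
            os.rename( incoming_path, cached_path )  # atomic publish
        while not os.path.exists( cached_path ):
            time.sleep( poll_interval )  # another process is still caching
        return cached_path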
diff -r 75bcfb029e8c5dd8e3d78829469a8583c73055ae -r fc4e1a162195e45f69573724adec564ffe1ea9ee scripts/scramble/scripts/PyRods.py
--- /dev/null
+++ b/scripts/scramble/scripts/PyRods.py
@@ -0,0 +1,33 @@
+import os, sys
+import subprocess
+
+# change back to the build dir
+if os.path.dirname( sys.argv[0] ) != "":
+    os.chdir( os.path.dirname( sys.argv[0] ) )
+
+# find setuptools
+sys.path.append( os.path.join( '..', '..', '..', 'lib' ) )
+from scramble_lib import *
+
+tag = get_tag() # get the tag
+get_deps() # require any dependent eggs
+clean() # clean up any existing stuff (could happen if you run scramble.py by hand)
+
+subprocess.check_call( "./scripts/configure", shell=True )
+subprocess.check_call( "CFLAGS='-fPIC' make clients", shell=True )
+
+# reset args for distutils
+me = sys.argv[0]
+sys.argv = [ me ]
+sys.argv.append( "egg_info" )
+if tag is not None:
+    sys.argv.append( "--tag-build=%s" % tag )
+# svn revision (if any) is handled directly in tag-build
+sys.argv.append( "--no-svn-revision" )
+sys.argv.append( "bdist_egg" )
+
+# apply patches (if any)
+apply_patches()
+
+# do it
+execfile( "setup.py", globals(), locals() )

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--
This is a commit notification from bitbucket.org. You are receiving this
because you have the service enabled, addressing the recipient of this
email.