10 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/fe2c12b2eb22/ Changeset: fe2c12b2eb22 User: jmchilton Date: 2013-12-02 02:07:03 Summary: Refactor directory_hash_id into its own module. Also add PEP-8 cleanup and unit tests. This eliminates explicit dependency between galaxy.objectstore and galaxy.model - with the ultimate goal of being able to utilize all object store functionality except dataset creation from a remote LWR instance without a database connection. Affected #: 4 files diff -r 752929cea6d37423c14f46f317ef3815cfc3c3d0 -r fe2c12b2eb22a5cab876629593e91079e27f2492 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -29,6 +29,7 @@ from galaxy.util import asbool, is_multi_byte, nice_size, Params, restore_text, send_mail from galaxy.util.bunch import Bunch from galaxy.util.hash_util import new_secure_hash +from galaxy.util.directory_hash import directory_hash_id from galaxy.web.framework.helpers import to_unicode from galaxy.web.form_builder import (AddressField, CheckboxField, HistoryField, PasswordField, SelectField, TextArea, TextField, WorkflowField, @@ -4029,18 +4030,3 @@ self.repository_id = repository_id self.repository_path = repository_path self.version = version - -## ---- Utility methods ------------------------------------------------------- - -def directory_hash_id( id ): - s = str( id ) - l = len( s ) - # Shortcut -- ids 0-999 go under ../000/ - if l < 4: - return [ "000" ] - # Pad with zeros until a multiple of three - padded = ( ( 3 - len( s ) % 3 ) * "0" ) + s - # Drop the last three digits -- 1000 files per directory - padded = padded[:-3] - # Break into chunks of three - return [ padded[i*3:(i+1)*3] for i in range( len( padded ) // 3 ) ] diff -r 752929cea6d37423c14f46f317ef3815cfc3c3d0 -r fe2c12b2eb22a5cab876629593e91079e27f2492 lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -13,7 +13,7 @@ from galaxy import util from galaxy.exceptions import ObjectInvalid, ObjectNotFound from galaxy.jobs import Sleeper -from galaxy.model import directory_hash_id +from galaxy.util.directory_hash import directory_hash_id from galaxy.util.odict import odict from sqlalchemy.orm import object_session diff -r 752929cea6d37423c14f46f317ef3815cfc3c3d0 -r fe2c12b2eb22a5cab876629593e91079e27f2492 lib/galaxy/objectstore/s3.py --- a/lib/galaxy/objectstore/s3.py +++ b/lib/galaxy/objectstore/s3.py @@ -12,7 +12,7 @@ from galaxy import util from galaxy.jobs import Sleeper -from galaxy.model import directory_hash_id +from galaxy.util.directory_hash import directory_hash_id from galaxy.objectstore import ObjectStore, convert_bytes from galaxy.exceptions import ObjectNotFound diff -r 752929cea6d37423c14f46f317ef3815cfc3c3d0 -r fe2c12b2eb22a5cab876629593e91079e27f2492 lib/galaxy/util/directory_hash.py --- /dev/null +++ b/lib/galaxy/util/directory_hash.py @@ -0,0 +1,23 @@ + + +def directory_hash_id( id ): + """ + + >>> directory_hash_id( 100 ) + ['000'] + >>> directory_hash_id( "90000" ) + ['090'] + >>> directory_hash_id("777777777") + ['000', '777', '777'] + """ + s = str( id ) + l = len( s ) + # Shortcut -- ids 0-999 go under ../000/ + if l < 4: + return [ "000" ] + # Pad with zeros until a multiple of three + padded = ( ( 3 - len( s ) % 3 ) * "0" ) + s + # Drop the last three digits -- 1000 files per directory + padded = padded[:-3] + # Break into chunks of three + return [ padded[ i * 3 : (i + 1 ) * 3 ] for i in range( len( padded ) // 3 ) 
] https://bitbucket.org/galaxy/galaxy-central/commits/433d5f4f5c66/ Changeset: 433d5f4f5c66 User: jmchilton Date: 2013-12-02 02:07:04 Summary: PEP-8 fixes for object store module. Affected #: 3 files diff -r fe2c12b2eb22a5cab876629593e91079e27f2492 -r 433d5f4f5c66c17701d84b08185ca2b733664d6b lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -358,7 +358,7 @@ def get_store_usage_percent(self): st = os.statvfs(self.file_path) - return (float(st.f_blocks - st.f_bavail)/st.f_blocks) * 100 + return ( float( st.f_blocks - st.f_bavail ) / st.f_blocks ) * 100 class CachingObjectStore(ObjectStore): @@ -428,7 +428,7 @@ return store.__getattribute__(method)(obj, **kwargs) if default_is_exception: raise default( 'objectstore, __call_method failed: %s on %s, kwargs: %s' - %( method, str( obj ), str( kwargs ) ) ) + % ( method, str( obj ), str( kwargs ) ) ) else: return default @@ -462,7 +462,7 @@ self.filesystem_monitor_thread.start() log.info("Filesystem space monitor started") - def __parse_distributed_config(self, config, config_xml = None): + def __parse_distributed_config(self, config, config_xml=None): if not config_xml: tree = util.parse_xml(self.distributed_config) root = tree.getroot() @@ -512,7 +512,7 @@ if pct > maxpct: new_weighted_backend_ids = filter(lambda x: x != id, new_weighted_backend_ids) self.weighted_backend_ids = new_weighted_backend_ids - self.sleeper.sleep(120) # Test free space every 2 minutes + self.sleeper.sleep(120) # Test free space every 2 minutes def create(self, obj, **kwargs): """ @@ -524,7 +524,7 @@ obj.object_store_id = random.choice(self.weighted_backend_ids) except IndexError: raise ObjectInvalid( 'objectstore.create, could not generate obj.object_store_id: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) + % ( str( obj ), str( kwargs ) ) ) object_session( obj ).add( obj ) object_session( obj ).flush() log.debug("Selected backend '%s' for creation of %s %s" % (obj.object_store_id, obj.__class__.__name__, obj.id)) @@ -538,7 +538,7 @@ return self.backends[object_store_id].__getattribute__(method)(obj, **kwargs) if default_is_exception: raise default( 'objectstore, __call_method failed: %s on %s, kwargs: %s' - %( method, str( obj ), str( kwargs ) ) ) + % ( method, str( obj ), str( kwargs ) ) ) else: return default @@ -625,6 +625,7 @@ else: log.error("Unrecognized object store definition: {0}".format(store)) + def local_extra_dirs( func ): """ A decorator for non-local plugins to utilize local directories for their extra_dirs (job_working_directory and temp). """ @@ -638,6 +639,7 @@ raise Exception( "Could not call DiskObjectStore's %s method, does your Object Store plugin inherit from DiskObjectStore?" 
% func.__name__ ) return wraps + def convert_bytes(bytes): """ A helper function used for pretty printing disk usage """ if bytes is None: diff -r fe2c12b2eb22a5cab876629593e91079e27f2492 -r 433d5f4f5c66c17701d84b08185ca2b733664d6b lib/galaxy/objectstore/rods.py --- a/lib/galaxy/objectstore/rods.py +++ b/lib/galaxy/objectstore/rods.py @@ -146,7 +146,7 @@ doi = irods.dataObjInp_t() doi.objPath = rods_path doi.createMode = 0640 - doi.dataSize = 0 # 0 actually means "unknown", although literally 0 would be preferable + doi.dataSize = 0 # 0 actually means "unknown", although literally 0 would be preferable irods.addKeyVal( doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource ) status = irods.rcDataObjCreate( self.rods_conn, doi ) assert status >= 0, 'create(): rcDataObjCreate() failed: %s: %s: %s' % ( rods_path, status, irods.strerror( status ) ) @@ -237,7 +237,7 @@ incoming_path = os.path.join( os.path.dirname( cached_path ), "__incoming_%s" % os.path.basename( cached_path ) ) doi = irods.dataObjInp_t() doi.objPath = self.__get_rods_path( obj, **kwargs ) - doi.dataSize = 0 # TODO: does this affect performance? should we get size? + doi.dataSize = 0 # TODO: does this affect performance? should we get size? doi.numThreads = 0 # TODO: might want to VERIFY_CHKSUM_KW log.debug( 'get_filename(): caching %s to %s', doi.objPath, incoming_path ) @@ -296,6 +296,7 @@ def get_store_usage_percent(self): return 0.0 + # monkeypatch an strerror method into the irods module def _rods_strerror( errno ): """ @@ -311,6 +312,7 @@ irods.strerror = _rods_strerror + def rods_connect(): """ A basic iRODS connection mechanism that connects using the current iRODS diff -r fe2c12b2eb22a5cab876629593e91079e27f2492 -r 433d5f4f5c66c17701d84b08185ca2b733664d6b lib/galaxy/objectstore/s3.py --- a/lib/galaxy/objectstore/s3.py +++ b/lib/galaxy/objectstore/s3.py @@ -24,7 +24,7 @@ from boto.exception import S3ResponseError log = logging.getLogger( __name__ ) -logging.getLogger('boto').setLevel(logging.INFO) # Otherwise boto is quite noisy +logging.getLogger('boto').setLevel(logging.INFO) # Otherwise boto is quite noisy class S3ObjectStore(ObjectStore): @@ -82,7 +82,7 @@ raise def __cache_monitor(self): - time.sleep(2) # Wait for things to load before starting the monitor + time.sleep(2) # Wait for things to load before starting the monitor while self.running: total_size = 0 # Is this going to be too expensive of an operation to be done frequently? @@ -110,7 +110,7 @@ # For now, delete enough to leave at least 10% of the total cache free delete_this_much = total_size - cache_limit self.__clean_cache(file_list, delete_this_much) - self.sleeper.sleep(30) # Test cache size every 30 seconds? + self.sleeper.sleep(30) # Test cache size every 30 seconds? 
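For reference, the directory_hash_id helper refactored in the first changeset fixes the on-disk layout that the disk-backed stores build with os.path.join. A minimal standalone sketch of the mapping (ids hypothetical, behavior matching the doctests above):

import os
from galaxy.util.directory_hash import directory_hash_id

# ids 0-999 collapse into a single "000" bucket; larger ids are
# zero-padded to a multiple of three digits, the last three digits are
# dropped (1000 files per leaf directory), and the remainder is split
# into chunks of three.
assert directory_hash_id(42) == ["000"]
assert directory_hash_id(1234567) == ["001", "234"]

# The stores then compose the relative dataset path from the chunks:
rel_path = os.path.join(*directory_hash_id(1234567))  # 001/234
dataset_file = os.path.join(rel_path, "dataset_%s.dat" % 1234567)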
def __clean_cache(self, file_list, delete_this_much): """ Keep deleting files from the file_list until the size of the deleted @@ -154,7 +154,7 @@ log.debug("Using cloud object store with bucket '%s'" % bucket.name) return bucket except S3ResponseError: - log.debug("Could not get bucket '%s', attempt %s/5" % (bucket_name, i+1)) + log.debug("Could not get bucket '%s', attempt %s/5" % (bucket_name, i + 1)) time.sleep(2) # All the attempts have been exhausted and connection was not established, # raise error @@ -283,7 +283,7 @@ return True else: log.debug("Pulled key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) - self.transfer_progress = 0 # Reset transfer progress counter + self.transfer_progress = 0 # Reset transfer progress counter key.get_contents_to_filename(self._get_cache_path(rel_path), cb=self._transfer_cb, num_cb=10) return True except S3ResponseError, ex: @@ -315,14 +315,14 @@ mb_size = os.path.getsize(source_file) / 1e6 #DBTODO Hack, refactor this logic. if mb_size < 60 or type(self) == SwiftObjectStore: - self.transfer_progress = 0 # Reset transfer progress counter + self.transfer_progress = 0 # Reset transfer progress counter key.set_contents_from_filename(source_file, reduced_redundancy=self.use_rr, cb=self._transfer_cb, num_cb=10) else: multipart_upload(self.bucket, key.name, source_file, mb_size, use_rr=self.use_rr) end_time = datetime.now() # print "+ Push ended at '%s'; %s bytes transfered in %ssec" % (end_time, os.path.getsize(source_file), end_time-start_time) - log.debug("Pushed cache file '%s' to key '%s' (%s bytes transfered in %s sec)" % (source_file, rel_path, os.path.getsize(source_file), end_time-start_time)) + log.debug("Pushed cache file '%s' to key '%s' (%s bytes transfered in %s sec)" % (source_file, rel_path, os.path.getsize(source_file), end_time - start_time)) return True else: log.error("Tried updating key '%s' from source file '%s', but source file does not exist." 
@@ -408,7 +408,7 @@ return bool(self.size(obj, **kwargs) > 0) else: raise ObjectNotFound( 'objectstore.empty, object does not exist: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) + % ( str( obj ), str( kwargs ) ) ) def size(self, obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) @@ -484,7 +484,7 @@ return cache_path # Check if the file exists in persistent storage and, if it does, pull it into cache elif self.exists(obj, **kwargs): - if dir_only: # Directories do not get pulled into cache + if dir_only: # Directories do not get pulled into cache return cache_path else: if self._pull_into_cache(rel_path): @@ -494,7 +494,7 @@ # if dir_only: # return cache_path raise ObjectNotFound( 'objectstore.get_filename, no cache_path: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) + % ( str( obj ), str( kwargs ) ) ) # return cache_path # Until the upload tool does not explicitly create the dataset, return expected path def update_from_file(self, obj, file_name=None, create=False, **kwargs): @@ -520,14 +520,14 @@ self._push_to_os(rel_path, source_file) else: raise ObjectNotFound( 'objectstore.update_from_file, object does not exist: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) + % ( str( obj ), str( kwargs ) ) ) def get_object_url(self, obj, **kwargs): if self.exists(obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) try: key = Key(self.bucket, rel_path) - return key.generate_url(expires_in = 86400) # 24hrs + return key.generate_url(expires_in=86400) # 24hrs except S3ResponseError, ex: log.warning("Trouble generating URL for dataset '%s': %s" % (rel_path, ex)) return None @@ -552,4 +552,3 @@ port=self.port, calling_format=boto.s3.connection.OrdinaryCallingFormat(), path=self.conn_path) - https://bitbucket.org/galaxy/galaxy-central/commits/1f62ef450cd7/ Changeset: 1f62ef450cd7 User: jmchilton Date: 2013-12-02 02:07:04 Summary: Refactor jobs.Sleeper into its own module. This eliminates explicit dependency between galaxy.objectstore and galaxy.job - with the ultimate goal of being able to utilize all object store functionality except dataset creation from a remote LWR instance with minimal dependencies. Affected #: 6 files diff -r 433d5f4f5c66c17701d84b08185ca2b733664d6b -r 1f62ef450cd7c79af0de6a22f8c2b1c1b6cb9114 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -14,7 +14,6 @@ import shutil import subprocess import sys -import threading import traceback from galaxy import model, util from galaxy.datatypes import metadata @@ -39,21 +38,6 @@ # and should eventually become API'd TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json' -class Sleeper( object ): - """ - Provides a 'sleep' method that sleeps for a number of seconds *unless* - the notify method is called (from a different thread). 
- """ - def __init__( self ): - self.condition = threading.Condition() - def sleep( self, seconds ): - self.condition.acquire() - self.condition.wait( seconds ) - self.condition.release() - def wake( self ): - self.condition.acquire() - self.condition.notify() - self.condition.release() class JobDestination( Bunch ): """ diff -r 433d5f4f5c66c17701d84b08185ca2b733664d6b -r 1f62ef450cd7c79af0de6a22f8c2b1c1b6cb9114 lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -11,7 +11,8 @@ from sqlalchemy.sql.expression import and_, or_, select, func from galaxy import model -from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper, JobDestination +from galaxy.util.sleeper import Sleeper +from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination log = logging.getLogger( __name__ ) diff -r 433d5f4f5c66c17701d84b08185ca2b733664d6b -r 1f62ef450cd7c79af0de6a22f8c2b1c1b6cb9114 lib/galaxy/jobs/manager.py --- a/lib/galaxy/jobs/manager.py +++ b/lib/galaxy/jobs/manager.py @@ -11,7 +11,8 @@ from Queue import Empty, Queue from galaxy import model -from galaxy.jobs import handler, JobWrapper, NoopQueue, Sleeper +from galaxy.util.sleeper import Sleeper +from galaxy.jobs import handler, JobWrapper, NoopQueue from galaxy.util.json import from_json_string log = logging.getLogger( __name__ ) diff -r 433d5f4f5c66c17701d84b08185ca2b733664d6b -r 1f62ef450cd7c79af0de6a22f8c2b1c1b6cb9114 lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -12,7 +12,7 @@ from galaxy import util from galaxy.exceptions import ObjectInvalid, ObjectNotFound -from galaxy.jobs import Sleeper +from galaxy.util.sleeper import Sleeper from galaxy.util.directory_hash import directory_hash_id from galaxy.util.odict import odict diff -r 433d5f4f5c66c17701d84b08185ca2b733664d6b -r 1f62ef450cd7c79af0de6a22f8c2b1c1b6cb9114 lib/galaxy/objectstore/s3.py --- a/lib/galaxy/objectstore/s3.py +++ b/lib/galaxy/objectstore/s3.py @@ -11,7 +11,7 @@ from datetime import datetime from galaxy import util -from galaxy.jobs import Sleeper +from galaxy.util.sleeper import Sleeper from galaxy.util.directory_hash import directory_hash_id from galaxy.objectstore import ObjectStore, convert_bytes from galaxy.exceptions import ObjectNotFound diff -r 433d5f4f5c66c17701d84b08185ca2b733664d6b -r 1f62ef450cd7c79af0de6a22f8c2b1c1b6cb9114 lib/galaxy/util/sleeper.py --- /dev/null +++ b/lib/galaxy/util/sleeper.py @@ -0,0 +1,22 @@ +import threading + + +class Sleeper( object ): + """ + Provides a 'sleep' method that sleeps for a number of seconds *unless* + the notify method is called (from a different thread). + """ + def __init__( self ): + self.condition = threading.Condition() + + def sleep( self, seconds ): + # Should this be in a try/finally block? -John + self.condition.acquire() + self.condition.wait( seconds ) + self.condition.release() + + def wake( self ): + # Should this be in a try/finally block? -John + self.condition.acquire() + self.condition.notify() + self.condition.release() https://bitbucket.org/galaxy/galaxy-central/commits/b78fc1c8b14e/ Changeset: b78fc1c8b14e User: jmchilton Date: 2013-12-02 02:07:04 Summary: More work on objectstore dependencies. Be explicit about what in galaxy.util is being used, don't fail if galaxy.eggs is unavailable - assume the actual dependencies will be there (boto and rods) if not fail just a little later. 
If nothing else, this should fix some failing buildbot tests (nosetests records a failure if it cannot import a file and it cannot import the s3 or irods object store files because of these imports). Affected #: 4 files diff -r 1f62ef450cd7c79af0de6a22f8c2b1c1b6cb9114 -r b78fc1c8b14e7926d3551f5a676d107fc0a4ed89 lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -10,7 +10,7 @@ import logging import threading -from galaxy import util +from galaxy.util import umask_fix_perms, parse_xml, force_symlink from galaxy.exceptions import ObjectInvalid, ObjectNotFound from galaxy.util.sleeper import Sleeper from galaxy.util.directory_hash import directory_hash_id @@ -291,7 +291,7 @@ # Create the file if it does not exist if not dir_only: open(path, 'w').close() - util.umask_fix_perms(path, self.config.umask, 0666) + umask_fix_perms(path, self.config.umask, 0666) def empty(self, obj, **kwargs): return os.path.getsize(self.get_filename(obj, **kwargs)) == 0 @@ -345,7 +345,7 @@ if file_name and self.exists(obj, **kwargs): try: if preserve_symlinks and os.path.islink( file_name ): - util.force_symlink( os.readlink( file_name ), self.get_filename( obj, **kwargs ) ) + force_symlink( os.readlink( file_name ), self.get_filename( obj, **kwargs ) ) else: shutil.copy( file_name, self.get_filename( obj, **kwargs ) ) except IOError, ex: @@ -464,7 +464,7 @@ def __parse_distributed_config(self, config, config_xml=None): if not config_xml: - tree = util.parse_xml(self.distributed_config) + tree = parse_xml(self.distributed_config) root = tree.getroot() log.debug('Loading backends for distributed object store from %s' % self.distributed_config) else: @@ -598,7 +598,7 @@ # This is a top level invocation of build_object_store_from_config, and # we have an object_store_conf.xml -- read the .xml and build # accordingly - tree = util.parse_xml(config.object_store_config_file) + tree = parse_xml(config.object_store_config_file) root = tree.getroot() store = root.get('type') config_xml = root diff -r 1f62ef450cd7c79af0de6a22f8c2b1c1b6cb9114 -r b78fc1c8b14e7926d3551f5a676d107fc0a4ed89 lib/galaxy/objectstore/rods.py --- a/lib/galaxy/objectstore/rods.py +++ b/lib/galaxy/objectstore/rods.py @@ -15,9 +15,14 @@ from galaxy.objectstore import DiskObjectStore, ObjectStore, local_extra_dirs from galaxy.exceptions import ObjectNotFound -import galaxy.eggs -galaxy.eggs.require( 'PyRods' ) -import irods +try: + import galaxy.eggs + galaxy.eggs.require( 'PyRods' ) + import irods +except ImportError: + irods = None + +NO_PYRODS_ERROR_MESSAGE = "IRODS object store configured, but no PyRods dependency available. Please install and properly configure PyRods or modify object store configuration." 
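The pattern above - tolerate the ImportError, remember that the dependency is missing, and raise only when the store is actually constructed - is what keeps the module importable for anything that merely collects it, such as nosetests. Distilled into a standalone sketch with hypothetical names:

try:
    import somelib  # optional backend dependency
except ImportError:
    somelib = None

NO_SOMELIB_ERROR_MESSAGE = "somelib-backed store configured, but the somelib dependency is unavailable."


class SomeLibObjectStore(object):
    def __init__(self, config):
        # Fail at construction time, not at import time, so importing
        # this module never requires the optional dependency.
        if somelib is None:
            raise Exception(NO_SOMELIB_ERROR_MESSAGE)
        self.config = config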
log = logging.getLogger( __name__ ) @@ -27,6 +32,8 @@ Galaxy object store based on iRODS """ def __init__( self, config, file_path=None, extra_dirs=None ): + if irods is None: + raise Exception(NO_PYRODS_ERROR_MESSAGE) super( IRODSObjectStore, self ).__init__( config, file_path=file_path, extra_dirs=extra_dirs ) self.cache_path = config.object_store_cache_path self.default_resource = config.irods_default_resource or None @@ -310,7 +317,8 @@ irods.__rods_strerror_map[ v ] = name return irods.__rods_strerror_map.get( errno, 'GALAXY_NO_ERRNO_MAPPING_FOUND' ) -irods.strerror = _rods_strerror +if irods is not None: + irods.strerror = _rods_strerror def rods_connect(): diff -r 1f62ef450cd7c79af0de6a22f8c2b1c1b6cb9114 -r b78fc1c8b14e7926d3551f5a676d107fc0a4ed89 lib/galaxy/objectstore/s3.py --- a/lib/galaxy/objectstore/s3.py +++ b/lib/galaxy/objectstore/s3.py @@ -10,7 +10,7 @@ import subprocess from datetime import datetime -from galaxy import util +from galaxy.util import umask_fix_perms from galaxy.util.sleeper import Sleeper from galaxy.util.directory_hash import directory_hash_id from galaxy.objectstore import ObjectStore, convert_bytes @@ -18,10 +18,16 @@ import multiprocessing from galaxy.objectstore.s3_multipart_upload import multipart_upload -import boto -from boto.s3.key import Key -from boto.s3.connection import S3Connection -from boto.exception import S3ResponseError +try: + import boto + from boto.s3.key import Key + from boto.s3.connection import S3Connection + from boto.exception import S3ResponseError +except ImportError: + boto = None + +NO_BOTO_ERROR_MESSAGE = "S3/Swift object store configured, but no boto dependency available. Please install and properly configure boto or modify object store configuration." + log = logging.getLogger( __name__ ) logging.getLogger('boto').setLevel(logging.INFO) # Otherwise boto is quite noisy @@ -34,6 +40,8 @@ Galaxy and S3. """ def __init__(self, config, config_xml): + if boto is None: + raise Exception(NO_BOTO_ERROR_MESSAGE) super(S3ObjectStore, self).__init__(config, config_xml) self.config = config self.staging_path = self.config.file_path @@ -163,13 +171,13 @@ def _fix_permissions(self, rel_path): """ Set permissions on rel_path""" for basedir, dirs, files in os.walk(rel_path): - util.umask_fix_perms(basedir, self.config.umask, 0777, self.config.gid) + umask_fix_perms(basedir, self.config.umask, 0777, self.config.gid) for f in files: path = os.path.join(basedir, f) # Ignore symlinks if os.path.islink(path): continue - util.umask_fix_perms( path, self.config.umask, 0666, self.config.gid ) + umask_fix_perms( path, self.config.umask, 0666, self.config.gid ) def _construct_path(self, obj, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, **kwargs): rel_path = os.path.join(*directory_hash_id(obj.id)) diff -r 1f62ef450cd7c79af0de6a22f8c2b1c1b6cb9114 -r b78fc1c8b14e7926d3551f5a676d107fc0a4ed89 lib/galaxy/objectstore/s3_multipart_upload.py --- a/lib/galaxy/objectstore/s3_multipart_upload.py +++ b/lib/galaxy/objectstore/s3_multipart_upload.py @@ -4,10 +4,8 @@ This parallelizes the task over available cores using multiprocessing. Code mostly taken form CloudBioLinux. 
""" -from __future__ import with_statement import os -import sys import glob import subprocess import contextlib @@ -16,10 +14,17 @@ import multiprocessing from multiprocessing.pool import IMapIterator -from galaxy import eggs -eggs.require('boto') +try: + from galaxy import eggs + eggs.require('boto') +except ImportError: + pass -import boto +try: + import boto +except ImportError: + boto = None + def map_wrap(f): @functools.wraps(f) https://bitbucket.org/galaxy/galaxy-central/commits/a03f4f2f46dd/ Changeset: a03f4f2f46dd User: jmchilton Date: 2013-12-02 02:07:04 Summary: Use relative imports in objectstore code. Goal is to enable the whole module to be placed in lwr.objectstore. Affected #: 3 files diff -r b78fc1c8b14e7926d3551f5a676d107fc0a4ed89 -r a03f4f2f46dd783bd183def7ade25d0e10017ced lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -610,17 +610,17 @@ if store == 'disk': return DiskObjectStore(config=config, config_xml=config_xml) elif store == 's3': - from galaxy.objectstore.s3 import S3ObjectStore + from .s3 import S3ObjectStore return S3ObjectStore(config=config, config_xml=config_xml) elif store == 'swift': - from galaxy.objectstore.s3 import SwiftObjectStore + from .s3 import SwiftObjectStore return SwiftObjectStore(config=config, config_xml=config_xml) elif store == 'distributed': return DistributedObjectStore(config=config, fsmon=fsmon, config_xml=config_xml) elif store == 'hierarchical': return HierarchicalObjectStore(config=config, config_xml=config_xml) elif store == 'irods': - from galaxy.objectstore.rods import IRODSObjectStore + from .rods import IRODSObjectStore return IRODSObjectStore(config=config, config_xml=config_xml) else: log.error("Unrecognized object store definition: {0}".format(store)) diff -r b78fc1c8b14e7926d3551f5a676d107fc0a4ed89 -r a03f4f2f46dd783bd183def7ade25d0e10017ced lib/galaxy/objectstore/rods.py --- a/lib/galaxy/objectstore/rods.py +++ b/lib/galaxy/objectstore/rods.py @@ -12,8 +12,8 @@ from posixpath import basename as path_basename from posixpath import dirname as path_dirname -from galaxy.objectstore import DiskObjectStore, ObjectStore, local_extra_dirs from galaxy.exceptions import ObjectNotFound +from ..objectstore import DiskObjectStore, ObjectStore, local_extra_dirs try: import galaxy.eggs diff -r b78fc1c8b14e7926d3551f5a676d107fc0a4ed89 -r a03f4f2f46dd783bd183def7ade25d0e10017ced lib/galaxy/objectstore/s3.py --- a/lib/galaxy/objectstore/s3.py +++ b/lib/galaxy/objectstore/s3.py @@ -13,11 +13,11 @@ from galaxy.util import umask_fix_perms from galaxy.util.sleeper import Sleeper from galaxy.util.directory_hash import directory_hash_id -from galaxy.objectstore import ObjectStore, convert_bytes +from ..objectstore import ObjectStore, convert_bytes from galaxy.exceptions import ObjectNotFound import multiprocessing -from galaxy.objectstore.s3_multipart_upload import multipart_upload +from .s3_multipart_upload import multipart_upload try: import boto from boto.s3.key import Key @@ -28,7 +28,6 @@ NO_BOTO_ERROR_MESSAGE = "S3/Swift object store configured, but no boto dependency available. Please install and properly configure boto or modify object store configuration." 
- log = logging.getLogger( __name__ ) logging.getLogger('boto').setLevel(logging.INFO) # Otherwise boto is quite noisy https://bitbucket.org/galaxy/galaxy-central/commits/10701b684e73/ Changeset: 10701b684e73 User: jmchilton Date: 2013-12-02 02:07:04 Summary: Make sqlalchemy dependency in object store optional unless create is used. Affected #: 1 file diff -r a03f4f2f46dd783bd183def7ade25d0e10017ced -r 10701b684e73af78f9688ded379e82b3417c86bf lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -15,8 +15,12 @@ from galaxy.util.sleeper import Sleeper from galaxy.util.directory_hash import directory_hash_id from galaxy.util.odict import odict +try: + from sqlalchemy.orm import object_session +except ImportError: + object_session = None -from sqlalchemy.orm import object_session +NO_SESSION_ERROR_MESSAGE = "Attempted to 'create' object store entity in configuration with no database session present." log = logging.getLogger( __name__ ) @@ -525,8 +529,7 @@ except IndexError: raise ObjectInvalid( 'objectstore.create, could not generate obj.object_store_id: %s, kwargs: %s' % ( str( obj ), str( kwargs ) ) ) - object_session( obj ).add( obj ) - object_session( obj ).flush() + create_object_in_session( obj ) log.debug("Selected backend '%s' for creation of %s %s" % (obj.object_store_id, obj.__class__.__name__, obj.id)) else: log.debug("Using preferred backend '%s' for creation of %s %s" % (obj.object_store_id, obj.__class__.__name__, obj.id)) @@ -554,8 +557,7 @@ if store.exists(obj, **kwargs): log.warning('%s object with ID %s found in backend object store with ID %s' % (obj.__class__.__name__, obj.id, id)) obj.object_store_id = id - object_session( obj ).add( obj ) - object_session( obj ).flush() + create_object_in_session( obj ) return id return None @@ -661,3 +663,12 @@ else: size = '%.2fb' % bytes return size + + +def create_object_in_session( obj ): + session = object_session( obj ) if object_session is not None else None + if session is not None: + object_session( obj ).add( obj ) + object_session( obj ).flush() + else: + raise Exception( NO_SESSION_ERROR_MESSAGE ) https://bitbucket.org/galaxy/galaxy-central/commits/1ba3ef72b736/ Changeset: 1ba3ef72b736 User: jmchilton Date: 2013-12-02 02:07:04 Summary: Update objectstore XML parsing to stdlib's ElementTree implementation. Eliminates dependency on older elementtree egg. Affected #: 1 file diff -r 10701b684e73af78f9688ded379e82b3417c86bf -r 1ba3ef72b7364678ab87d95412164eee89f8913e lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -9,8 +9,9 @@ import shutil import logging import threading +from xml.etree import ElementTree -from galaxy.util import umask_fix_perms, parse_xml, force_symlink +from galaxy.util import umask_fix_perms, force_symlink from galaxy.exceptions import ObjectInvalid, ObjectNotFound from galaxy.util.sleeper import Sleeper from galaxy.util.directory_hash import directory_hash_id @@ -206,7 +207,7 @@ self.extra_dirs['job_work'] = config.job_working_directory self.extra_dirs['temp'] = config.new_file_path #The new config_xml overrides universe settings. - if config_xml: + if config_xml is not None: for e in config_xml: if e.tag == 'files_dir': self.file_path = e.get('path') @@ -294,7 +295,7 @@ os.makedirs(dir) # Create the file if it does not exist if not dir_only: - open(path, 'w').close() + open(path, 'w').close() # Should be rb? 
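Note that the 'if config_xml:' to 'if config_xml is not None:' changes above are not merely stylistic: an ElementTree Element with no child elements evaluates as false, so truth-testing a parsed root can silently take the "no config" branch for a perfectly valid empty element. A quick demonstration:

from xml.etree import ElementTree

root = ElementTree.fromstring('<object_store type="disk"/>')
# A childless Element is falsy, hence the explicit None comparisons.
assert root is not None
assert not root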
umask_fix_perms(path, self.config.umask, 0666) def empty(self, obj, **kwargs): @@ -324,7 +325,7 @@ return False def get_data(self, obj, start=0, count=-1, **kwargs): - data_file = open(self.get_filename(obj, **kwargs), 'r') + data_file = open(self.get_filename(obj, **kwargs), 'r') # Should be rb? data_file.seek(start) content = data_file.read(count) data_file.close() @@ -446,7 +447,7 @@ def __init__(self, config, config_xml=None, fsmon=False): super(DistributedObjectStore, self).__init__(config, config_xml=config_xml) - if not config_xml: + if config_xml is None: self.distributed_config = config.distributed_object_store_config_file assert self.distributed_config is not None, "distributed object store ('object_store = distributed') " \ "requires a config file, please set one in " \ @@ -467,9 +468,8 @@ log.info("Filesystem space monitor started") def __parse_distributed_config(self, config, config_xml=None): - if not config_xml: - tree = parse_xml(self.distributed_config) - root = tree.getroot() + if config_xml is None: + root = ElementTree.parse(self.distributed_config).getroot() log.debug('Loading backends for distributed object store from %s' % self.distributed_config) else: root = config_xml.find('backends') @@ -596,15 +596,14 @@ Depending on the configuration setting, invoke the appropriate object store """ - if not config_xml and config.object_store_config_file: + if config_xml is None and config.object_store_config_file: # This is a top level invocation of build_object_store_from_config, and # we have an object_store_conf.xml -- read the .xml and build # accordingly - tree = parse_xml(config.object_store_config_file) - root = tree.getroot() + root = ElementTree.parse(config.object_store_config_file).getroot() store = root.get('type') config_xml = root - elif config_xml: + elif config_xml is not None: store = config_xml.get('type') else: store = config.object_store https://bitbucket.org/galaxy/galaxy-central/commits/a194847d84d3/ Changeset: a194847d84d3 User: jmchilton Date: 2013-12-02 02:07:04 Summary: Add test cases for some basic object store types. Namely tests for disk, hierarchical, and distributed stores - as well parsing. Affected #: 1 file diff -r 1ba3ef72b7364678ab87d95412164eee89f8913e -r a194847d84d373cd7efdb581819115e7303e0568 test/unit/test_objectstore.py --- /dev/null +++ b/test/unit/test_objectstore.py @@ -0,0 +1,210 @@ +import os +from shutil import rmtree +from string import Template +from tempfile import mkdtemp +try: + from galaxy import objectstore +except ImportError: + from lwr import objectstore +from contextlib import contextmanager + +DISK_TEST_CONFIG = """<?xml version="1.0"?> +<object_store type="disk"> + <files_dir path="${temp_directory}/files1"/> + <extra_dir type="temp" path="${temp_directory}/tmp1"/> + <extra_dir type="job_work" path="${temp_directory}/job_working_directory1"/> +</object_store> +""" + + +def test_disk_store(): + with TestConfig(DISK_TEST_CONFIG) as (directory, object_store): + # Test no dataset with id 1 exists. + absent_dataset = MockDataset(1) + assert not object_store.exists(absent_dataset) + + # Write empty dataset 2 in second backend, ensure it is empty and + # exists. + empty_dataset = MockDataset(2) + directory.write(b"", "files1/000/dataset_2.dat") + assert object_store.exists(empty_dataset) + assert object_store.empty(empty_dataset) + + # Write non-empty dataset in backend 1, test it is not emtpy & exists. 
+ hello_world_dataset = MockDataset(3) + directory.write(b"Hello World!", "files1/000/dataset_3.dat") + assert object_store.exists(hello_world_dataset) + assert not object_store.empty(hello_world_dataset) + + # Test get_data + data = object_store.get_data(hello_world_dataset) + assert data == b"Hello World!" + + data = object_store.get_data(hello_world_dataset, start=1, count=6) + assert data == b"ello W" + + # Test Size + + # Test absent and empty datasets yield size of 0. + assert object_store.size(absent_dataset) == 0 + assert object_store.size(empty_dataset) == 0 + # Elsewise + assert object_store.size(hello_world_dataset) > 0 # Should this always be the number of bytes? + + # Test percent used (to some degree) + percent_store_used = object_store.get_store_usage_percent() + assert percent_store_used > 0.0 + assert percent_store_used < 100.0 + + # Test update_from_file test + output_dataset = MockDataset(4) + output_real_path = os.path.join(directory.temp_directory, "files1", "000", "dataset_4.dat") + assert not os.path.exists(output_real_path) + output_working_path = directory.write(b"NEW CONTENTS", "job_working_directory1/example_output") + object_store.update_from_file(output_dataset, file_name=output_working_path, create=True) + assert os.path.exists(output_real_path) + + # Test delete + to_delete_dataset = MockDataset(5) + to_delete_real_path = directory.write(b"content to be deleted!", "files1/000/dataset_5.dat") + assert object_store.exists(to_delete_dataset) + assert object_store.delete(to_delete_dataset) + assert not object_store.exists(to_delete_dataset) + assert not os.path.exists(to_delete_real_path) + + +HIERARCHICAL_TEST_CONFIG = """<?xml version="1.0"?> +<object_store type="hierarchical"> + <backends> + <backend id="files1" type="disk" weight="1" order="0"> + <files_dir path="${temp_directory}/files1"/> + <extra_dir type="temp" path="${temp_directory}/tmp1"/> + <extra_dir type="job_work" path="${temp_directory}/job_working_directory1"/> + </backend> + <backend id="files2" type="disk" weight="1" order="1"> + <files_dir path="${temp_directory}/files2"/> + <extra_dir type="temp" path="${temp_directory}/tmp2"/> + <extra_dir type="job_work" path="${temp_directory}/job_working_directory2"/> + </backend> + </backends> +</object_store> +""" + + +def test_hierarchical_store(): + with TestConfig(HIERARCHICAL_TEST_CONFIG) as (directory, object_store): + + # Test no dataset with id 1 exists. + assert not object_store.exists(MockDataset(1)) + + # Write empty dataset 2 in second backend, ensure it is empty and + # exists. + directory.write("", "files2/000/dataset_2.dat") + assert object_store.exists(MockDataset(2)) + assert object_store.empty(MockDataset(2)) + + # Write non-empty dataset in backend 1, test it is not emtpy & exists. + directory.write("Hello World!", "files1/000/dataset_3.dat") + assert object_store.exists(MockDataset(3)) + assert not object_store.empty(MockDataset(3)) + + # Assert creation always happens in first backend. 
+ for i in range(100): + dataset = MockDataset(100 + i) + object_store.create(dataset) + assert object_store.get_filename(dataset).find("files1") > 0 + + +DISTRIBUTED_TEST_CONFIG = """<?xml version="1.0"?> +<object_store type="distributed"> + <backends> + <backend id="files1" type="disk" weight="2" order="0"> + <files_dir path="${temp_directory}/files1"/> + <extra_dir type="temp" path="${temp_directory}/tmp1"/> + <extra_dir type="job_work" path="${temp_directory}/job_working_directory1"/> + </backend> + <backend id="files2" type="disk" weight="1" order="1"> + <files_dir path="${temp_directory}/files2"/> + <extra_dir type="temp" path="${temp_directory}/tmp2"/> + <extra_dir type="job_work" path="${temp_directory}/job_working_directory2"/> + </backend> + </backends> +</object_store> +""" + + +def test_distributed_store(): + with TestConfig(DISTRIBUTED_TEST_CONFIG) as (directory, object_store): + with __stubbed_persistence() as persisted_ids: + for i in range(100): + dataset = MockDataset(100 + i) + object_store.create(dataset) + + ## Test distributes datasets between backends according to weights + backend_1_count = len([v for v in persisted_ids.values() if v == "files1"]) + backend_2_count = len([v for v in persisted_ids.values() if v == "files2"]) + + assert backend_1_count > 0 + assert backend_2_count > 0 + assert backend_1_count > backend_2_count + + +class TestConfig(object): + def __init__(self, config_xml): + self.temp_directory = mkdtemp() + self.write(config_xml, "store.xml") + config = MockConfig(self.temp_directory) + self.object_store = objectstore.build_object_store_from_config(config) + + def __enter__(self): + return self, self.object_store + + def __exit__(self, type, value, tb): + rmtree(self.temp_directory) + + def write(self, contents, name): + path = os.path.join(self.temp_directory, name) + directory = os.path.dirname(path) + if not os.path.exists(directory): + os.makedirs(directory) + expanded_contents = Template(contents).safe_substitute(temp_directory=self.temp_directory) + open(path, "w").write(expanded_contents) + return path + + +class MockConfig(object): + + def __init__(self, temp_directory): + self.file_path = temp_directory + self.object_store_config_file = os.path.join(temp_directory, "store.xml") + self.object_store_check_old_style = False + self.job_working_directory = temp_directory + self.new_file_path = temp_directory + self.umask = 0000 + + +class MockDataset(object): + + def __init__(self, id): + self.id = id + self.object_store_id = None + + +## Poor man's mocking. Need to get a real mocking library as real Galaxy development +## dependnecy. +PERSIST_METHOD_NAME = "create_object_in_session" + + +@contextmanager +def __stubbed_persistence(): + real_method = getattr(objectstore, PERSIST_METHOD_NAME) + try: + persisted_ids = {} + + def persist(object): + persisted_ids[object.id] = object.object_store_id + setattr(objectstore, PERSIST_METHOD_NAME, persist) + yield persisted_ids + + finally: + setattr(objectstore, PERSIST_METHOD_NAME, real_method) https://bitbucket.org/galaxy/galaxy-central/commits/5c62540e3f59/ Changeset: 5c62540e3f59 User: jmchilton Date: 2013-12-02 02:07:04 Summary: Add an LWR object store implementation. This changeset implements an object store implementation that delegates to a concrete remote object store instance via an LWR server. Can utilize existing LWR security mechanisms and multiple transport options. Updates the Galaxy LWR client through LWR changeset 2333f57. 
This changeset contains no tests for this new implementation, but the LWR contains the same code along with an integration test that spins up a temp LWR to test against - this test case demonstrating the lwr object store client and server can be found here : https://bitbucket.org/jmchilton/lwr/src/default/test/lwr_objectstore_test.py. Affected #: 4 files diff -r a194847d84d373cd7efdb581819115e7303e0568 -r 5c62540e3f5999fe70b3eeff73bee8cce5aa24c3 lib/galaxy/jobs/runners/lwr_client/client.py --- a/lib/galaxy/jobs/runners/lwr_client/client.py +++ b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -50,6 +50,7 @@ return "No remote output found for path %s" % self.path +# TODO: Rename to job client. class Client(object): """ Objects of this client class perform low-level communication with a remote LWR server. @@ -326,3 +327,55 @@ @parseJson() def file_available(self, path): return self._raw_execute("file_available", {"path": path}) + + +class ObjectStoreClient(object): + + def __init__(self, lwr_interface): + self.lwr_interface = lwr_interface + + @parseJson() + def exists(self, **kwds): + return self._raw_execute("object_store_exists", args=self.__data(**kwds)) + + @parseJson() + def file_ready(self, **kwds): + return self._raw_execute("object_store_file_ready", args=self.__data(**kwds)) + + @parseJson() + def create(self, **kwds): + return self._raw_execute("object_store_create", args=self.__data(**kwds)) + + @parseJson() + def empty(self, **kwds): + return self._raw_execute("object_store_empty", args=self.__data(**kwds)) + + @parseJson() + def size(self, **kwds): + return self._raw_execute("object_store_size", args=self.__data(**kwds)) + + @parseJson() + def delete(self, **kwds): + return self._raw_execute("object_store_delete", args=self.__data(**kwds)) + + @parseJson() + def get_data(self, **kwds): + return self._raw_execute("object_store_get_data", args=self.__data(**kwds)) + + @parseJson() + def get_filename(self, **kwds): + return self._raw_execute("object_store_get_filename", args=self.__data(**kwds)) + + @parseJson() + def update_from_file(self, **kwds): + return self._raw_execute("object_store_update_from_file", args=self.__data(**kwds)) + + @parseJson() + def get_store_usage_percent(self): + return self._raw_execute("object_store_get_store_usage_percent", args={}) + + def __data(self, **kwds): + return kwds + + def _raw_execute(self, command, args={}): + return self.lwr_interface.execute(command, args, data=None, input_path=None, output_path=None) diff -r a194847d84d373cd7efdb581819115e7303e0568 -r 5c62540e3f5999fe70b3eeff73bee8cce5aa24c3 lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -19,6 +19,7 @@ from galaxy.util import unicodify as text_type from .client import Client, InputCachingClient +from .client import ObjectStoreClient from .transport import get_transport from .util import TransferEventManager from .destination import url_to_destination_params @@ -74,6 +75,26 @@ return destination_params +class ObjectStoreClientManager(object): + + def __init__(self, **kwds): + if 'object_store' in kwds: + self.interface_class = LocalLwrInterface + self.interface_args = dict(object_store=kwds['object_store']) + else: + self.interface_class = HttpLwrInterface + transport_type = kwds.get('transport_type', None) + transport = get_transport(transport_type) + self.interface_args = dict(transport=transport) + self.extra_client_kwds = {} + + def get_client(self, client_params): + 
interface_class = self.interface_class + interface_args = dict(destination_params=client_params, **self.interface_args) + interface = interface_class(**interface_args) + return ObjectStoreClient(interface) + + class JobManagerInteface(object): """ Abstract base class describes how client communicates with remote job @@ -114,9 +135,10 @@ class LocalLwrInterface(object): - def __init__(self, destination_params, job_manager, file_cache): + def __init__(self, destination_params, job_manager=None, file_cache=None, object_store=None): self.job_manager = job_manager self.file_cache = file_cache + self.object_store = object_store def __app_args(self): ## Arguments that would be specified from LwrApp if running @@ -124,6 +146,7 @@ return { 'manager': self.job_manager, 'file_cache': self.file_cache, + 'object_store': self.object_store, 'ip': None } @@ -204,4 +227,4 @@ int_val = int(val) return int_val -__all__ = [ClientManager, HttpLwrInterface] +__all__ = [ClientManager, ObjectStoreClientManager, HttpLwrInterface] diff -r a194847d84d373cd7efdb581819115e7303e0568 -r 5c62540e3f5999fe70b3eeff73bee8cce5aa24c3 lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -623,6 +623,9 @@ elif store == 'irods': from .rods import IRODSObjectStore return IRODSObjectStore(config=config, config_xml=config_xml) + elif store == 'lwr': + from .lwr import LwrObjectStore + return LwrObjectStore(config=config, config_xml=config_xml) else: log.error("Unrecognized object store definition: {0}".format(store)) diff -r a194847d84d373cd7efdb581819115e7303e0568 -r 5c62540e3f5999fe70b3eeff73bee8cce5aa24c3 lib/galaxy/objectstore/lwr.py --- /dev/null +++ b/lib/galaxy/objectstore/lwr.py @@ -0,0 +1,76 @@ +from __future__ import absolute_import # Need to import lwr_client absolutely. +from ..objectstore import ObjectStore +try: + from galaxy.jobs.runners.lwr_client.manager import ObjectStoreClientManager +except ImportError: + from lwr.lwr_client.manager import ObjectStoreClientManager + + +class LwrObjectStore(ObjectStore): + """ + Object store implementation that delegates to a remote LWR server. + + This may be more aspirational than practical for now, it would be good to + Galaxy to a point that a handler thread could be setup that doesn't attempt + to access the disk files returned by a (this) object store - just passing + them along to the LWR unmodified. That modification - along with this + implementation and LWR job destinations would then allow Galaxy to fully + manage jobs on remote servers with completely different mount points. + + This implementation should be considered beta and may be dropped from + Galaxy at some future point or significantly modified. + """ + + def __init__(self, config, config_xml): + self.lwr_client = self.__build_lwr_client(config_xml) + + def exists(self, obj, **kwds): + return self.lwr_client.exists(**self.__build_kwds(obj, **kwds)) + + def file_ready(self, obj, **kwds): + return self.lwr_client.file_ready(**self.__build_kwds(obj, **kwds)) + + def create(self, obj, **kwds): + return self.lwr_client.create(**self.__build_kwds(obj, **kwds)) + + def empty(self, obj, **kwds): + return self.lwr_client.empty(**self.__build_kwds(obj, **kwds)) + + def size(self, obj, **kwds): + return self.lwr_client.size(**self.__build_kwds(obj, **kwds)) + + def delete(self, obj, **kwds): + return self.lwr_client.delete(**self.__build_kwds(obj, **kwds)) + + # TODO: Optimize get_data. 
+ def get_data(self, obj, **kwds): + return self.lwr_client.get_data(**self.__build_kwds(obj, **kwds)) + + def get_filename(self, obj, **kwds): + return self.lwr_client.get_filename(**self.__build_kwds(obj, **kwds)) + + def update_from_file(self, obj, **kwds): + return self.lwr_client.update_from_file(**self.__build_kwds(obj, **kwds)) + + def get_store_usage_percent(self): + return self.lwr_client.get_store_usage_percent() + + def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None): + return None + + def __build_kwds(self, obj, **kwds): + kwds['object_id'] = obj.id + return kwds + pass + + def __build_lwr_client(self, config_xml): + url = config_xml.get("url") + private_token = config_xml.get("private_token", None) + transport = config_xml.get("transport", None) + manager_options = dict(transport=transport) + client_options = dict(url=url, private_token=private_token) + lwr_client = ObjectStoreClientManager(**manager_options).get_client(client_options) + return lwr_client + + def shutdown(self): + pass https://bitbucket.org/galaxy/galaxy-central/commits/a540510a3091/ Changeset: a540510a3091 User: dannon Date: 2013-12-03 17:12:57 Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #269) Object Store dependency refactoring, testing, and added LWR implementation. Affected #: 14 files diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -14,7 +14,6 @@ import shutil import subprocess import sys -import threading import traceback from galaxy import model, util from galaxy.datatypes import metadata @@ -39,21 +38,6 @@ # and should eventually become API'd TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json' -class Sleeper( object ): - """ - Provides a 'sleep' method that sleeps for a number of seconds *unless* - the notify method is called (from a different thread). 
- """ - def __init__( self ): - self.condition = threading.Condition() - def sleep( self, seconds ): - self.condition.acquire() - self.condition.wait( seconds ) - self.condition.release() - def wake( self ): - self.condition.acquire() - self.condition.notify() - self.condition.release() class JobDestination( Bunch ): """ diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -11,7 +11,8 @@ from sqlalchemy.sql.expression import and_, or_, select, func from galaxy import model -from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper, JobDestination +from galaxy.util.sleeper import Sleeper +from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination log = logging.getLogger( __name__ ) diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/jobs/manager.py --- a/lib/galaxy/jobs/manager.py +++ b/lib/galaxy/jobs/manager.py @@ -11,7 +11,8 @@ from Queue import Empty, Queue from galaxy import model -from galaxy.jobs import handler, JobWrapper, NoopQueue, Sleeper +from galaxy.util.sleeper import Sleeper +from galaxy.jobs import handler, JobWrapper, NoopQueue from galaxy.util.json import from_json_string log = logging.getLogger( __name__ ) diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/jobs/runners/lwr_client/client.py --- a/lib/galaxy/jobs/runners/lwr_client/client.py +++ b/lib/galaxy/jobs/runners/lwr_client/client.py @@ -50,6 +50,7 @@ return "No remote output found for path %s" % self.path +# TODO: Rename to job client. class Client(object): """ Objects of this client class perform low-level communication with a remote LWR server. 
@@ -326,3 +327,55 @@ @parseJson() def file_available(self, path): return self._raw_execute("file_available", {"path": path}) + + +class ObjectStoreClient(object): + + def __init__(self, lwr_interface): + self.lwr_interface = lwr_interface + + @parseJson() + def exists(self, **kwds): + return self._raw_execute("object_store_exists", args=self.__data(**kwds)) + + @parseJson() + def file_ready(self, **kwds): + return self._raw_execute("object_store_file_ready", args=self.__data(**kwds)) + + @parseJson() + def create(self, **kwds): + return self._raw_execute("object_store_create", args=self.__data(**kwds)) + + @parseJson() + def empty(self, **kwds): + return self._raw_execute("object_store_empty", args=self.__data(**kwds)) + + @parseJson() + def size(self, **kwds): + return self._raw_execute("object_store_size", args=self.__data(**kwds)) + + @parseJson() + def delete(self, **kwds): + return self._raw_execute("object_store_delete", args=self.__data(**kwds)) + + @parseJson() + def get_data(self, **kwds): + return self._raw_execute("object_store_get_data", args=self.__data(**kwds)) + + @parseJson() + def get_filename(self, **kwds): + return self._raw_execute("object_store_get_filename", args=self.__data(**kwds)) + + @parseJson() + def update_from_file(self, **kwds): + return self._raw_execute("object_store_update_from_file", args=self.__data(**kwds)) + + @parseJson() + def get_store_usage_percent(self): + return self._raw_execute("object_store_get_store_usage_percent", args={}) + + def __data(self, **kwds): + return kwds + + def _raw_execute(self, command, args={}): + return self.lwr_interface.execute(command, args, data=None, input_path=None, output_path=None) diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -19,6 +19,7 @@ from galaxy.util import unicodify as text_type from .client import Client, InputCachingClient +from .client import ObjectStoreClient from .transport import get_transport from .util import TransferEventManager from .destination import url_to_destination_params @@ -74,6 +75,26 @@ return destination_params +class ObjectStoreClientManager(object): + + def __init__(self, **kwds): + if 'object_store' in kwds: + self.interface_class = LocalLwrInterface + self.interface_args = dict(object_store=kwds['object_store']) + else: + self.interface_class = HttpLwrInterface + transport_type = kwds.get('transport_type', None) + transport = get_transport(transport_type) + self.interface_args = dict(transport=transport) + self.extra_client_kwds = {} + + def get_client(self, client_params): + interface_class = self.interface_class + interface_args = dict(destination_params=client_params, **self.interface_args) + interface = interface_class(**interface_args) + return ObjectStoreClient(interface) + + class JobManagerInteface(object): """ Abstract base class describes how client communicates with remote job @@ -114,9 +135,10 @@ class LocalLwrInterface(object): - def __init__(self, destination_params, job_manager, file_cache): + def __init__(self, destination_params, job_manager=None, file_cache=None, object_store=None): self.job_manager = job_manager self.file_cache = file_cache + self.object_store = object_store def __app_args(self): ## Arguments that would be specified from LwrApp if running @@ -124,6 +146,7 @@ return { 'manager': self.job_manager, 'file_cache': self.file_cache, + 'object_store': 
self.object_store, 'ip': None } @@ -204,4 +227,4 @@ int_val = int(val) return int_val -__all__ = [ClientManager, HttpLwrInterface] +__all__ = [ClientManager, ObjectStoreClientManager, HttpLwrInterface] diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -29,6 +29,7 @@ from galaxy.util import asbool, is_multi_byte, nice_size, Params, restore_text, send_mail from galaxy.util.bunch import Bunch from galaxy.util.hash_util import new_secure_hash +from galaxy.util.directory_hash import directory_hash_id from galaxy.web.framework.helpers import to_unicode from galaxy.web.form_builder import (AddressField, CheckboxField, HistoryField, PasswordField, SelectField, TextArea, TextField, WorkflowField, @@ -4012,18 +4013,3 @@ self.repository_id = repository_id self.repository_path = repository_path self.version = version - -## ---- Utility methods ------------------------------------------------------- - -def directory_hash_id( id ): - s = str( id ) - l = len( s ) - # Shortcut -- ids 0-999 go under ../000/ - if l < 4: - return [ "000" ] - # Pad with zeros until a multiple of three - padded = ( ( 3 - len( s ) % 3 ) * "0" ) + s - # Drop the last three digits -- 1000 files per directory - padded = padded[:-3] - # Break into chunks of three - return [ padded[i*3:(i+1)*3] for i in range( len( padded ) // 3 ) ] diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -9,14 +9,19 @@ import shutil import logging import threading +from xml.etree import ElementTree -from galaxy import util +from galaxy.util import umask_fix_perms, force_symlink from galaxy.exceptions import ObjectInvalid, ObjectNotFound -from galaxy.jobs import Sleeper -from galaxy.model import directory_hash_id +from galaxy.util.sleeper import Sleeper +from galaxy.util.directory_hash import directory_hash_id from galaxy.util.odict import odict +try: + from sqlalchemy.orm import object_session +except ImportError: + object_session = None -from sqlalchemy.orm import object_session +NO_SESSION_ERROR_MESSAGE = "Attempted to 'create' object store entity in configuration with no database session present." log = logging.getLogger( __name__ ) @@ -202,7 +207,7 @@ self.extra_dirs['job_work'] = config.job_working_directory self.extra_dirs['temp'] = config.new_file_path #The new config_xml overrides universe settings. - if config_xml: + if config_xml is not None: for e in config_xml: if e.tag == 'files_dir': self.file_path = e.get('path') @@ -290,8 +295,8 @@ os.makedirs(dir) # Create the file if it does not exist if not dir_only: - open(path, 'w').close() - util.umask_fix_perms(path, self.config.umask, 0666) + open(path, 'w').close() # Should be rb? + umask_fix_perms(path, self.config.umask, 0666) def empty(self, obj, **kwargs): return os.path.getsize(self.get_filename(obj, **kwargs)) == 0 @@ -320,7 +325,7 @@ return False def get_data(self, obj, start=0, count=-1, **kwargs): - data_file = open(self.get_filename(obj, **kwargs), 'r') + data_file = open(self.get_filename(obj, **kwargs), 'r') # Should be rb? 
data_file.seek(start) content = data_file.read(count) data_file.close() @@ -345,7 +350,7 @@ if file_name and self.exists(obj, **kwargs): try: if preserve_symlinks and os.path.islink( file_name ): - util.force_symlink( os.readlink( file_name ), self.get_filename( obj, **kwargs ) ) + force_symlink( os.readlink( file_name ), self.get_filename( obj, **kwargs ) ) else: shutil.copy( file_name, self.get_filename( obj, **kwargs ) ) except IOError, ex: @@ -358,7 +363,7 @@ def get_store_usage_percent(self): st = os.statvfs(self.file_path) - return (float(st.f_blocks - st.f_bavail)/st.f_blocks) * 100 + return ( float( st.f_blocks - st.f_bavail ) / st.f_blocks ) * 100 class CachingObjectStore(ObjectStore): @@ -428,7 +433,7 @@ return store.__getattribute__(method)(obj, **kwargs) if default_is_exception: raise default( 'objectstore, __call_method failed: %s on %s, kwargs: %s' - %( method, str( obj ), str( kwargs ) ) ) + % ( method, str( obj ), str( kwargs ) ) ) else: return default @@ -442,7 +447,7 @@ def __init__(self, config, config_xml=None, fsmon=False): super(DistributedObjectStore, self).__init__(config, config_xml=config_xml) - if not config_xml: + if config_xml is None: self.distributed_config = config.distributed_object_store_config_file assert self.distributed_config is not None, "distributed object store ('object_store = distributed') " \ "requires a config file, please set one in " \ @@ -462,10 +467,9 @@ self.filesystem_monitor_thread.start() log.info("Filesystem space monitor started") - def __parse_distributed_config(self, config, config_xml = None): - if not config_xml: - tree = util.parse_xml(self.distributed_config) - root = tree.getroot() + def __parse_distributed_config(self, config, config_xml=None): + if config_xml is None: + root = ElementTree.parse(self.distributed_config).getroot() log.debug('Loading backends for distributed object store from %s' % self.distributed_config) else: root = config_xml.find('backends') @@ -512,7 +516,7 @@ if pct > maxpct: new_weighted_backend_ids = filter(lambda x: x != id, new_weighted_backend_ids) self.weighted_backend_ids = new_weighted_backend_ids - self.sleeper.sleep(120) # Test free space every 2 minutes + self.sleeper.sleep(120) # Test free space every 2 minutes def create(self, obj, **kwargs): """ @@ -524,9 +528,8 @@ obj.object_store_id = random.choice(self.weighted_backend_ids) except IndexError: raise ObjectInvalid( 'objectstore.create, could not generate obj.object_store_id: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) - object_session( obj ).add( obj ) - object_session( obj ).flush() + % ( str( obj ), str( kwargs ) ) ) + create_object_in_session( obj ) log.debug("Selected backend '%s' for creation of %s %s" % (obj.object_store_id, obj.__class__.__name__, obj.id)) else: log.debug("Using preferred backend '%s' for creation of %s %s" % (obj.object_store_id, obj.__class__.__name__, obj.id)) @@ -538,7 +541,7 @@ return self.backends[object_store_id].__getattribute__(method)(obj, **kwargs) if default_is_exception: raise default( 'objectstore, __call_method failed: %s on %s, kwargs: %s' - %( method, str( obj ), str( kwargs ) ) ) + % ( method, str( obj ), str( kwargs ) ) ) else: return default @@ -554,8 +557,7 @@ if store.exists(obj, **kwargs): log.warning('%s object with ID %s found in backend object store with ID %s' % (obj.__class__.__name__, obj.id, id)) obj.object_store_id = id - object_session( obj ).add( obj ) - object_session( obj ).flush() + create_object_in_session( obj ) return id return None @@ -594,15 +596,14 @@ Depending on the 
configuration setting, invoke the appropriate object store """ - if not config_xml and config.object_store_config_file: # This is a top level invocation of build_object_store_from_config, and # we have an object_store_conf.xml -- read the .xml and build # accordingly - tree = util.parse_xml(config.object_store_config_file) - root = tree.getroot() + root = ElementTree.parse(config.object_store_config_file).getroot() store = root.get('type') config_xml = root - elif config_xml: + elif config_xml is not None: store = config_xml.get('type') else: store = config.object_store @@ -610,21 +611,25 @@ if store == 'disk': return DiskObjectStore(config=config, config_xml=config_xml) elif store == 's3': - from galaxy.objectstore.s3 import S3ObjectStore + from .s3 import S3ObjectStore return S3ObjectStore(config=config, config_xml=config_xml) elif store == 'swift': - from galaxy.objectstore.s3 import SwiftObjectStore + from .s3 import SwiftObjectStore return SwiftObjectStore(config=config, config_xml=config_xml) elif store == 'distributed': return DistributedObjectStore(config=config, fsmon=fsmon, config_xml=config_xml) elif store == 'hierarchical': return HierarchicalObjectStore(config=config, config_xml=config_xml) elif store == 'irods': - from galaxy.objectstore.rods import IRODSObjectStore + from .rods import IRODSObjectStore return IRODSObjectStore(config=config, config_xml=config_xml) + elif store == 'lwr': + from .lwr import LwrObjectStore + return LwrObjectStore(config=config, config_xml=config_xml) else: log.error("Unrecognized object store definition: {0}".format(store)) + def local_extra_dirs( func ): """ A decorator for non-local plugins to utilize local directories for their extra_dirs (job_working_directory and temp). """ @@ -638,6 +643,7 @@ raise Exception( "Could not call DiskObjectStore's %s method, does your Object Store plugin inherit from DiskObjectStore?" % func.__name__ ) return wraps + def convert_bytes(bytes): """ A helper function used for pretty printing disk usage """ if bytes is None: @@ -659,3 +665,12 @@ else: size = '%.2fb' % bytes return size + + +def create_object_in_session( obj ): + session = object_session( obj ) if object_session is not None else None + if session is not None: + object_session( obj ).add( obj ) + object_session( obj ).flush() + else: + raise Exception( NO_SESSION_ERROR_MESSAGE ) diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/objectstore/lwr.py --- /dev/null +++ b/lib/galaxy/objectstore/lwr.py @@ -0,0 +1,76 @@ +from __future__ import absolute_import # Need to import lwr_client absolutely. +from ..objectstore import ObjectStore +try: + from galaxy.jobs.runners.lwr_client.manager import ObjectStoreClientManager +except ImportError: + from lwr.lwr_client.manager import ObjectStoreClientManager + + +class LwrObjectStore(ObjectStore): + """ + Object store implementation that delegates to a remote LWR server. + + This may be more aspirational than practical for now; it would be good to + get Galaxy to a point where a handler thread could be set up that doesn't + attempt to access the disk files returned by a (this) object store - just + passing them along to the LWR unmodified. That modification - along with + this implementation and LWR job destinations - would then allow Galaxy to + fully manage jobs on remote servers with completely different mount points.
+ + This implementation should be considered beta and may be dropped from + Galaxy at some future point or significantly modified. + """ + + def __init__(self, config, config_xml): + self.lwr_client = self.__build_lwr_client(config_xml) + + def exists(self, obj, **kwds): + return self.lwr_client.exists(**self.__build_kwds(obj, **kwds)) + + def file_ready(self, obj, **kwds): + return self.lwr_client.file_ready(**self.__build_kwds(obj, **kwds)) + + def create(self, obj, **kwds): + return self.lwr_client.create(**self.__build_kwds(obj, **kwds)) + + def empty(self, obj, **kwds): + return self.lwr_client.empty(**self.__build_kwds(obj, **kwds)) + + def size(self, obj, **kwds): + return self.lwr_client.size(**self.__build_kwds(obj, **kwds)) + + def delete(self, obj, **kwds): + return self.lwr_client.delete(**self.__build_kwds(obj, **kwds)) + + # TODO: Optimize get_data. + def get_data(self, obj, **kwds): + return self.lwr_client.get_data(**self.__build_kwds(obj, **kwds)) + + def get_filename(self, obj, **kwds): + return self.lwr_client.get_filename(**self.__build_kwds(obj, **kwds)) + + def update_from_file(self, obj, **kwds): + return self.lwr_client.update_from_file(**self.__build_kwds(obj, **kwds)) + + def get_store_usage_percent(self): + return self.lwr_client.get_store_usage_percent() + + def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None): + return None + + def __build_kwds(self, obj, **kwds): + kwds['object_id'] = obj.id + return kwds + pass + + def __build_lwr_client(self, config_xml): + url = config_xml.get("url") + private_token = config_xml.get("private_token", None) + transport = config_xml.get("transport", None) + manager_options = dict(transport=transport) + client_options = dict(url=url, private_token=private_token) + lwr_client = ObjectStoreClientManager(**manager_options).get_client(client_options) + return lwr_client + + def shutdown(self): + pass diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/objectstore/rods.py --- a/lib/galaxy/objectstore/rods.py +++ b/lib/galaxy/objectstore/rods.py @@ -12,12 +12,17 @@ from posixpath import basename as path_basename from posixpath import dirname as path_dirname -from galaxy.objectstore import DiskObjectStore, ObjectStore, local_extra_dirs from galaxy.exceptions import ObjectNotFound +from ..objectstore import DiskObjectStore, ObjectStore, local_extra_dirs -import galaxy.eggs -galaxy.eggs.require( 'PyRods' ) -import irods +try: + import galaxy.eggs + galaxy.eggs.require( 'PyRods' ) + import irods +except ImportError: + irods = None + +NO_PYRODS_ERROR_MESSAGE = "IRODS object store configured, but no PyRods dependency available. Please install and properly configure PyRods or modify object store configuration." 
log = logging.getLogger( __name__ ) @@ -27,6 +32,8 @@ Galaxy object store based on iRODS """ def __init__( self, config, file_path=None, extra_dirs=None ): + if irods is None: + raise Exception(NO_PYRODS_ERROR_MESSAGE) super( IRODSObjectStore, self ).__init__( config, file_path=file_path, extra_dirs=extra_dirs ) self.cache_path = config.object_store_cache_path self.default_resource = config.irods_default_resource or None @@ -146,7 +153,7 @@ doi = irods.dataObjInp_t() doi.objPath = rods_path doi.createMode = 0640 - doi.dataSize = 0 # 0 actually means "unknown", although literally 0 would be preferable + doi.dataSize = 0 # 0 actually means "unknown", although literally 0 would be preferable irods.addKeyVal( doi.condInput, irods.DEST_RESC_NAME_KW, self.default_resource ) status = irods.rcDataObjCreate( self.rods_conn, doi ) assert status >= 0, 'create(): rcDataObjCreate() failed: %s: %s: %s' % ( rods_path, status, irods.strerror( status ) ) @@ -237,7 +244,7 @@ incoming_path = os.path.join( os.path.dirname( cached_path ), "__incoming_%s" % os.path.basename( cached_path ) ) doi = irods.dataObjInp_t() doi.objPath = self.__get_rods_path( obj, **kwargs ) - doi.dataSize = 0 # TODO: does this affect performance? should we get size? + doi.dataSize = 0 # TODO: does this affect performance? should we get size? doi.numThreads = 0 # TODO: might want to VERIFY_CHKSUM_KW log.debug( 'get_filename(): caching %s to %s', doi.objPath, incoming_path ) @@ -296,6 +303,7 @@ def get_store_usage_percent(self): return 0.0 + # monkeypatch an strerror method into the irods module def _rods_strerror( errno ): """ @@ -309,7 +317,9 @@ irods.__rods_strerror_map[ v ] = name return irods.__rods_strerror_map.get( errno, 'GALAXY_NO_ERRNO_MAPPING_FOUND' ) -irods.strerror = _rods_strerror +if irods is not None: + irods.strerror = _rods_strerror + def rods_connect(): """ diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/objectstore/s3.py --- a/lib/galaxy/objectstore/s3.py +++ b/lib/galaxy/objectstore/s3.py @@ -10,21 +10,26 @@ import subprocess from datetime import datetime -from galaxy import util -from galaxy.jobs import Sleeper -from galaxy.model import directory_hash_id -from galaxy.objectstore import ObjectStore, convert_bytes +from galaxy.util import umask_fix_perms +from galaxy.util.sleeper import Sleeper +from galaxy.util.directory_hash import directory_hash_id +from ..objectstore import ObjectStore, convert_bytes from galaxy.exceptions import ObjectNotFound import multiprocessing -from galaxy.objectstore.s3_multipart_upload import multipart_upload -import boto -from boto.s3.key import Key -from boto.s3.connection import S3Connection -from boto.exception import S3ResponseError +from .s3_multipart_upload import multipart_upload +try: + import boto + from boto.s3.key import Key + from boto.s3.connection import S3Connection + from boto.exception import S3ResponseError +except ImportError: + boto = None + +NO_BOTO_ERROR_MESSAGE = "S3/Swift object store configured, but no boto dependency available. Please install and properly configure boto or modify object store configuration." log = logging.getLogger( __name__ ) -logging.getLogger('boto').setLevel(logging.INFO) # Otherwise boto is quite noisy +logging.getLogger('boto').setLevel(logging.INFO) # Otherwise boto is quite noisy class S3ObjectStore(ObjectStore): @@ -34,6 +39,8 @@ Galaxy and S3. 
""" def __init__(self, config, config_xml): + if boto is None: + raise Exception(NO_BOTO_ERROR_MESSAGE) super(S3ObjectStore, self).__init__(config, config_xml) self.config = config self.staging_path = self.config.file_path @@ -82,7 +89,7 @@ raise def __cache_monitor(self): - time.sleep(2) # Wait for things to load before starting the monitor + time.sleep(2) # Wait for things to load before starting the monitor while self.running: total_size = 0 # Is this going to be too expensive of an operation to be done frequently? @@ -110,7 +117,7 @@ # For now, delete enough to leave at least 10% of the total cache free delete_this_much = total_size - cache_limit self.__clean_cache(file_list, delete_this_much) - self.sleeper.sleep(30) # Test cache size every 30 seconds? + self.sleeper.sleep(30) # Test cache size every 30 seconds? def __clean_cache(self, file_list, delete_this_much): """ Keep deleting files from the file_list until the size of the deleted @@ -154,7 +161,7 @@ log.debug("Using cloud object store with bucket '%s'" % bucket.name) return bucket except S3ResponseError: - log.debug("Could not get bucket '%s', attempt %s/5" % (bucket_name, i+1)) + log.debug("Could not get bucket '%s', attempt %s/5" % (bucket_name, i + 1)) time.sleep(2) # All the attempts have been exhausted and connection was not established, # raise error @@ -163,13 +170,13 @@ def _fix_permissions(self, rel_path): """ Set permissions on rel_path""" for basedir, dirs, files in os.walk(rel_path): - util.umask_fix_perms(basedir, self.config.umask, 0777, self.config.gid) + umask_fix_perms(basedir, self.config.umask, 0777, self.config.gid) for f in files: path = os.path.join(basedir, f) # Ignore symlinks if os.path.islink(path): continue - util.umask_fix_perms( path, self.config.umask, 0666, self.config.gid ) + umask_fix_perms( path, self.config.umask, 0666, self.config.gid ) def _construct_path(self, obj, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, **kwargs): rel_path = os.path.join(*directory_hash_id(obj.id)) @@ -283,7 +290,7 @@ return True else: log.debug("Pulled key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) - self.transfer_progress = 0 # Reset transfer progress counter + self.transfer_progress = 0 # Reset transfer progress counter key.get_contents_to_filename(self._get_cache_path(rel_path), cb=self._transfer_cb, num_cb=10) return True except S3ResponseError, ex: @@ -315,14 +322,14 @@ mb_size = os.path.getsize(source_file) / 1e6 #DBTODO Hack, refactor this logic. if mb_size < 60 or type(self) == SwiftObjectStore: - self.transfer_progress = 0 # Reset transfer progress counter + self.transfer_progress = 0 # Reset transfer progress counter key.set_contents_from_filename(source_file, reduced_redundancy=self.use_rr, cb=self._transfer_cb, num_cb=10) else: multipart_upload(self.bucket, key.name, source_file, mb_size, use_rr=self.use_rr) end_time = datetime.now() # print "+ Push ended at '%s'; %s bytes transfered in %ssec" % (end_time, os.path.getsize(source_file), end_time-start_time) - log.debug("Pushed cache file '%s' to key '%s' (%s bytes transfered in %s sec)" % (source_file, rel_path, os.path.getsize(source_file), end_time-start_time)) + log.debug("Pushed cache file '%s' to key '%s' (%s bytes transfered in %s sec)" % (source_file, rel_path, os.path.getsize(source_file), end_time - start_time)) return True else: log.error("Tried updating key '%s' from source file '%s', but source file does not exist." 
@@ -408,7 +415,7 @@ return bool(self.size(obj, **kwargs) > 0) else: raise ObjectNotFound( 'objectstore.empty, object does not exist: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) + % ( str( obj ), str( kwargs ) ) ) def size(self, obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) @@ -484,7 +491,7 @@ return cache_path # Check if the file exists in persistent storage and, if it does, pull it into cache elif self.exists(obj, **kwargs): - if dir_only: # Directories do not get pulled into cache + if dir_only:  # Directories do not get pulled into cache return cache_path else: if self._pull_into_cache(rel_path): @@ -494,7 +501,7 @@ # if dir_only: # return cache_path raise ObjectNotFound( 'objectstore.get_filename, no cache_path: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) + % ( str( obj ), str( kwargs ) ) ) # return cache_path # Until the upload tool does not explicitly create the dataset, return expected path def update_from_file(self, obj, file_name=None, create=False, **kwargs): @@ -520,14 +527,14 @@ self._push_to_os(rel_path, source_file) else: raise ObjectNotFound( 'objectstore.update_from_file, object does not exist: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) + % ( str( obj ), str( kwargs ) ) ) def get_object_url(self, obj, **kwargs): if self.exists(obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) try: key = Key(self.bucket, rel_path) - return key.generate_url(expires_in = 86400) # 24hrs + return key.generate_url(expires_in=86400)  # 24hrs except S3ResponseError, ex: log.warning("Trouble generating URL for dataset '%s': %s" % (rel_path, ex)) return None @@ -552,4 +559,3 @@ port=self.port, calling_format=boto.s3.connection.OrdinaryCallingFormat(), path=self.conn_path) - diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/objectstore/s3_multipart_upload.py --- a/lib/galaxy/objectstore/s3_multipart_upload.py +++ b/lib/galaxy/objectstore/s3_multipart_upload.py @@ -4,10 +4,8 @@ This parallelizes the task over available cores using multiprocessing. Code mostly taken from CloudBioLinux.
""" -from __future__ import with_statement import os -import sys import glob import subprocess import contextlib @@ -16,10 +14,17 @@ import multiprocessing from multiprocessing.pool import IMapIterator -from galaxy import eggs -eggs.require('boto') +try: + from galaxy import eggs + eggs.require('boto') +except ImportError: + pass -import boto +try: + import boto +except ImportError: + boto = None + def map_wrap(f): @functools.wraps(f) diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/util/directory_hash.py --- /dev/null +++ b/lib/galaxy/util/directory_hash.py @@ -0,0 +1,23 @@ + + +def directory_hash_id( id ): + """ + + >>> directory_hash_id( 100 ) + ['000'] + >>> directory_hash_id( "90000" ) + ['090'] + >>> directory_hash_id("777777777") + ['000', '777', '777'] + """ + s = str( id ) + l = len( s ) + # Shortcut -- ids 0-999 go under ../000/ + if l < 4: + return [ "000" ] + # Pad with zeros until a multiple of three + padded = ( ( 3 - len( s ) % 3 ) * "0" ) + s + # Drop the last three digits -- 1000 files per directory + padded = padded[:-3] + # Break into chunks of three + return [ padded[ i * 3 : (i + 1 ) * 3 ] for i in range( len( padded ) // 3 ) ] diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f lib/galaxy/util/sleeper.py --- /dev/null +++ b/lib/galaxy/util/sleeper.py @@ -0,0 +1,22 @@ +import threading + + +class Sleeper( object ): + """ + Provides a 'sleep' method that sleeps for a number of seconds *unless* + the notify method is called (from a different thread). + """ + def __init__( self ): + self.condition = threading.Condition() + + def sleep( self, seconds ): + # Should this be in a try/finally block? -John + self.condition.acquire() + self.condition.wait( seconds ) + self.condition.release() + + def wake( self ): + # Should this be in a try/finally block? -John + self.condition.acquire() + self.condition.notify() + self.condition.release() diff -r 3116103966fa1cd6c942aa918bcd647f39c0aae3 -r a540510a3091ad7cfe3c4680c8bb04a20679e44f test/unit/test_objectstore.py --- /dev/null +++ b/test/unit/test_objectstore.py @@ -0,0 +1,210 @@ +import os +from shutil import rmtree +from string import Template +from tempfile import mkdtemp +try: + from galaxy import objectstore +except ImportError: + from lwr import objectstore +from contextlib import contextmanager + +DISK_TEST_CONFIG = """<?xml version="1.0"?> +<object_store type="disk"> + <files_dir path="${temp_directory}/files1"/> + <extra_dir type="temp" path="${temp_directory}/tmp1"/> + <extra_dir type="job_work" path="${temp_directory}/job_working_directory1"/> +</object_store> +""" + + +def test_disk_store(): + with TestConfig(DISK_TEST_CONFIG) as (directory, object_store): + # Test no dataset with id 1 exists. + absent_dataset = MockDataset(1) + assert not object_store.exists(absent_dataset) + + # Write empty dataset 2 in second backend, ensure it is empty and + # exists. + empty_dataset = MockDataset(2) + directory.write(b"", "files1/000/dataset_2.dat") + assert object_store.exists(empty_dataset) + assert object_store.empty(empty_dataset) + + # Write non-empty dataset in backend 1, test it is not emtpy & exists. + hello_world_dataset = MockDataset(3) + directory.write(b"Hello World!", "files1/000/dataset_3.dat") + assert object_store.exists(hello_world_dataset) + assert not object_store.empty(hello_world_dataset) + + # Test get_data + data = object_store.get_data(hello_world_dataset) + assert data == b"Hello World!" 
+ + data = object_store.get_data(hello_world_dataset, start=1, count=6) + assert data == b"ello W" + + # Test Size + + # Test absent and empty datasets yield size of 0. + assert object_store.size(absent_dataset) == 0 + assert object_store.size(empty_dataset) == 0 + # Elsewise + assert object_store.size(hello_world_dataset) > 0 # Should this always be the number of bytes? + + # Test percent used (to some degree) + percent_store_used = object_store.get_store_usage_percent() + assert percent_store_used > 0.0 + assert percent_store_used < 100.0 + + # Test update_from_file + output_dataset = MockDataset(4) + output_real_path = os.path.join(directory.temp_directory, "files1", "000", "dataset_4.dat") + assert not os.path.exists(output_real_path) + output_working_path = directory.write(b"NEW CONTENTS", "job_working_directory1/example_output") + object_store.update_from_file(output_dataset, file_name=output_working_path, create=True) + assert os.path.exists(output_real_path) + + # Test delete + to_delete_dataset = MockDataset(5) + to_delete_real_path = directory.write(b"content to be deleted!", "files1/000/dataset_5.dat") + assert object_store.exists(to_delete_dataset) + assert object_store.delete(to_delete_dataset) + assert not object_store.exists(to_delete_dataset) + assert not os.path.exists(to_delete_real_path) + + +HIERARCHICAL_TEST_CONFIG = """<?xml version="1.0"?> +<object_store type="hierarchical"> + <backends> + <backend id="files1" type="disk" weight="1" order="0"> + <files_dir path="${temp_directory}/files1"/> + <extra_dir type="temp" path="${temp_directory}/tmp1"/> + <extra_dir type="job_work" path="${temp_directory}/job_working_directory1"/> + </backend> + <backend id="files2" type="disk" weight="1" order="1"> + <files_dir path="${temp_directory}/files2"/> + <extra_dir type="temp" path="${temp_directory}/tmp2"/> + <extra_dir type="job_work" path="${temp_directory}/job_working_directory2"/> + </backend> + </backends> +</object_store> +""" + + +def test_hierarchical_store(): + with TestConfig(HIERARCHICAL_TEST_CONFIG) as (directory, object_store): + + # Test no dataset with id 1 exists. + assert not object_store.exists(MockDataset(1)) + + # Write empty dataset 2 in second backend, ensure it is empty and + # exists. + directory.write("", "files2/000/dataset_2.dat") + assert object_store.exists(MockDataset(2)) + assert object_store.empty(MockDataset(2)) + + # Write non-empty dataset in backend 1, test it is not empty & exists. + directory.write("Hello World!", "files1/000/dataset_3.dat") + assert object_store.exists(MockDataset(3)) + assert not object_store.empty(MockDataset(3)) + + # Assert creation always happens in first backend.
+ for i in range(100): + dataset = MockDataset(100 + i) + object_store.create(dataset) + assert object_store.get_filename(dataset).find("files1") > 0 + + +DISTRIBUTED_TEST_CONFIG = """<?xml version="1.0"?> +<object_store type="distributed"> + <backends> + <backend id="files1" type="disk" weight="2" order="0"> + <files_dir path="${temp_directory}/files1"/> + <extra_dir type="temp" path="${temp_directory}/tmp1"/> + <extra_dir type="job_work" path="${temp_directory}/job_working_directory1"/> + </backend> + <backend id="files2" type="disk" weight="1" order="1"> + <files_dir path="${temp_directory}/files2"/> + <extra_dir type="temp" path="${temp_directory}/tmp2"/> + <extra_dir type="job_work" path="${temp_directory}/job_working_directory2"/> + </backend> + </backends> +</object_store> +""" + + +def test_distributed_store(): + with TestConfig(DISTRIBUTED_TEST_CONFIG) as (directory, object_store): + with __stubbed_persistence() as persisted_ids: + for i in range(100): + dataset = MockDataset(100 + i) + object_store.create(dataset) + + ## Test that datasets are distributed between backends according to weights + backend_1_count = len([v for v in persisted_ids.values() if v == "files1"]) + backend_2_count = len([v for v in persisted_ids.values() if v == "files2"]) + + assert backend_1_count > 0 + assert backend_2_count > 0 + assert backend_1_count > backend_2_count + + +class TestConfig(object): + def __init__(self, config_xml): + self.temp_directory = mkdtemp() + self.write(config_xml, "store.xml") + config = MockConfig(self.temp_directory) + self.object_store = objectstore.build_object_store_from_config(config) + + def __enter__(self): + return self, self.object_store + + def __exit__(self, type, value, tb): + rmtree(self.temp_directory) + + def write(self, contents, name): + path = os.path.join(self.temp_directory, name) + directory = os.path.dirname(path) + if not os.path.exists(directory): + os.makedirs(directory) + expanded_contents = Template(contents).safe_substitute(temp_directory=self.temp_directory) + open(path, "w").write(expanded_contents) + return path + + +class MockConfig(object): + + def __init__(self, temp_directory): + self.file_path = temp_directory + self.object_store_config_file = os.path.join(temp_directory, "store.xml") + self.object_store_check_old_style = False + self.job_working_directory = temp_directory + self.new_file_path = temp_directory + self.umask = 0000 + + +class MockDataset(object): + + def __init__(self, id): + self.id = id + self.object_store_id = None + + +## Poor man's mocking. Need to get a real mocking library as a real Galaxy +## development dependency. +PERSIST_METHOD_NAME = "create_object_in_session" + + +@contextmanager +def __stubbed_persistence(): + real_method = getattr(objectstore, PERSIST_METHOD_NAME) + try: + persisted_ids = {} + + def persist(object): + persisted_ids[object.id] = object.object_store_id + setattr(objectstore, PERSIST_METHOD_NAME, persist) + yield persisted_ids + + finally: + setattr(objectstore, PERSIST_METHOD_NAME, real_method)
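To make the new 'lwr' store type above concrete, here is a minimal usage sketch. It is illustrative only and not part of any changeset in this batch: the server URL and private token below are placeholder values, the MockDataset stand-in mirrors the one in test/unit/test_objectstore.py, and a running LWR server that answers the object_store_* messages sent by ObjectStoreClient is assumed. As the lwr.py diff shows, LwrObjectStore never reads its config argument, so no real Galaxy configuration object is needed for the demonstration.

from xml.etree import ElementTree

from galaxy.objectstore import build_object_store_from_config

# Placeholder endpoint and token -- substitute values for a real LWR server.
LWR_STORE_CONFIG = """<?xml version="1.0"?>
<object_store type="lwr" url="https://lwr.example.org/" private_token="changeme"/>
"""


class MockDataset(object):
    # Minimal stand-in mirroring test/unit/test_objectstore.py; the LWR
    # store only reads obj.id (see __build_kwds in the lwr.py diff).
    def __init__(self, id):
        self.id = id


root = ElementTree.fromstring(LWR_STORE_CONFIG)
# build_object_store_from_config dispatches on the root element's 'type'
# attribute; 'lwr' yields an LwrObjectStore wrapping an ObjectStoreClient.
store = build_object_store_from_config(None, config_xml=root)
# Each call is proxied to the remote server, e.g. exists() issues an
# "object_store_exists" message carrying object_id=1 over HTTP.
dataset_exists = store.exists(MockDataset(1))

The same client machinery also supports in-process wiring: constructing ObjectStoreClientManager(object_store=...) selects LocalLwrInterface instead of HttpLwrInterface, which is how a remote LWR instance can serve these calls against its own local store without a database connection.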