commit/galaxy-central: natefoo: Move S3 Object Store to its own module (more to come, probably).
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/c732173231b8/ Changeset: c732173231b8 User: natefoo Date: 2013-08-20 18:05:30 Summary: Move S3 Object Store to its own module (more to come, probably). Affected #: 2 files diff -r 267d294711e9f72dcfbff842ce263a1050be058f -r c732173231b89ce28bd7b8ee629b1af32c0b37e7 lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -5,14 +5,11 @@ """ import os -import sys import time import random import shutil import logging import threading -import subprocess -from datetime import datetime from galaxy import util from galaxy.jobs import Sleeper @@ -21,15 +18,7 @@ from sqlalchemy.orm import object_session -import multiprocessing -from galaxy.objectstore.s3_multipart_upload import multipart_upload -import boto -from boto.s3.key import Key -from boto.s3.connection import S3Connection -from boto.exception import S3ResponseError - log = logging.getLogger( __name__ ) -logging.getLogger('boto').setLevel(logging.INFO) # Otherwise boto is quite noisy class ObjectStore(object): @@ -373,490 +362,6 @@ super(CachingObjectStore, self).__init__(self, path, backend) -class S3ObjectStore(ObjectStore): - """ - Object store that stores objects as items in an AWS S3 bucket. A local - cache exists that is used as an intermediate location for files between - Galaxy and S3. - """ - def __init__(self, config): - super(S3ObjectStore, self).__init__() - self.config = config - self.staging_path = self.config.file_path - self.s3_conn = get_OS_connection(self.config) - self.bucket = self._get_bucket(self.config.os_bucket_name) - self.use_rr = self.config.os_use_reduced_redundancy - self.cache_size = self.config.object_store_cache_size - self.transfer_progress = 0 - # Clean cache only if value is set in universe_wsgi.ini - if self.cache_size != -1: - # Convert GBs to bytes for comparison - self.cache_size = self.cache_size * 1073741824 - # Helper for interruptable sleep - self.sleeper = Sleeper() - self.cache_monitor_thread = threading.Thread(target=self.__cache_monitor) - self.cache_monitor_thread.start() - log.info("Cache cleaner manager started") - # Test if 'axel' is available for parallel download and pull the key into cache - try: - subprocess.call('axel') - self.use_axel = True - except OSError: - self.use_axel = False - - def __cache_monitor(self): - time.sleep(2) # Wait for things to load before starting the monitor - while self.running: - total_size = 0 - # Is this going to be too expensive of an operation to be done frequently? - file_list = [] - for dirpath, dirnames, filenames in os.walk(self.staging_path): - for f in filenames: - fp = os.path.join(dirpath, f) - file_size = os.path.getsize(fp) - total_size += file_size - # Get the time given file was last accessed - last_access_time = time.localtime(os.stat(fp)[7]) - # Compose a tuple of the access time and the file path - file_tuple = last_access_time, fp, file_size - file_list.append(file_tuple) - # Sort the file list (based on access time) - file_list.sort() - # Initiate cleaning once within 10% of the defined cache size? - cache_limit = self.cache_size * 0.9 - if total_size > cache_limit: - log.info("Initiating cache cleaning: current cache size: %s; clean until smaller than: %s" \ - % (convert_bytes(total_size), convert_bytes(cache_limit))) - # How much to delete? 
If simply deleting up to the cache-10% limit, - # is likely to be deleting frequently and may run the risk of hitting - # the limit - maybe delete additional #%? - # For now, delete enough to leave at least 10% of the total cache free - delete_this_much = total_size - cache_limit - self.__clean_cache(file_list, delete_this_much) - self.sleeper.sleep(30) # Test cache size every 30 seconds? - - def __clean_cache(self, file_list, delete_this_much): - """ Keep deleting files from the file_list until the size of the deleted - files is greater than the value in delete_this_much parameter. - - :type file_list: list - :param file_list: List of candidate files that can be deleted. This method - will start deleting files from the beginning of the list so the list - should be sorted accordingly. The list must contains 3-element tuples, - positioned as follows: position 0 holds file last accessed timestamp - (as time.struct_time), position 1 holds file path, and position 2 has - file size (e.g., (<access time>, /mnt/data/dataset_1.dat), 472394) - - :type delete_this_much: int - :param delete_this_much: Total size of files, in bytes, that should be deleted. - """ - # Keep deleting datasets from file_list until deleted_amount does not - # exceed delete_this_much; start deleting from the front of the file list, - # which assumes the oldest files come first on the list. - deleted_amount = 0 - for i, f in enumerate(file_list): - if deleted_amount < delete_this_much: - deleted_amount += f[2] - os.remove(f[1]) - # Debugging code for printing deleted files' stats - # folder, file_name = os.path.split(f[1]) - # file_date = time.strftime("%m/%d/%y %H:%M:%S", f[0]) - # log.debug("%s. %-25s %s, size %s (deleted %s/%s)" \ - # % (i, file_name, convert_bytes(f[2]), file_date, \ - # convert_bytes(deleted_amount), convert_bytes(delete_this_much))) - else: - log.debug("Cache cleaning done. Total space freed: %s" % convert_bytes(deleted_amount)) - return - - def _get_bucket(self, bucket_name): - """ Sometimes a handle to a bucket is not established right away so try - it a few times. Raise error is connection is not established. 
""" - for i in range(5): - try: - bucket = self.s3_conn.get_bucket(bucket_name) - log.debug("Using cloud object store with bucket '%s'" % bucket.name) - return bucket - except S3ResponseError: - log.debug("Could not get bucket '%s', attempt %s/5" % (bucket_name, i+1)) - time.sleep(2) - # All the attempts have been exhausted and connection was not established, - # raise error - raise S3ResponseError - - def _fix_permissions(self, rel_path): - """ Set permissions on rel_path""" - for basedir, dirs, files in os.walk(rel_path): - util.umask_fix_perms(basedir, self.config.umask, 0777, self.config.gid) - for f in files: - path = os.path.join(basedir, f) - # Ignore symlinks - if os.path.islink(path): - continue - util.umask_fix_perms( path, self.config.umask, 0666, self.config.gid ) - - def _construct_path(self, obj, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, **kwargs): - rel_path = os.path.join(*directory_hash_id(obj.id)) - if extra_dir is not None: - if extra_dir_at_root: - rel_path = os.path.join(extra_dir, rel_path) - else: - rel_path = os.path.join(rel_path, extra_dir) - # S3 folders are marked by having trailing '/' so add it now - rel_path = '%s/' % rel_path - if not dir_only: - rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % obj.id) - return rel_path - - def _get_cache_path(self, rel_path): - return os.path.abspath(os.path.join(self.staging_path, rel_path)) - - def _get_transfer_progress(self): - return self.transfer_progress - - def _get_size_in_s3(self, rel_path): - try: - key = self.bucket.get_key(rel_path) - if key: - return key.size - except S3ResponseError, ex: - log.error("Could not get size of key '%s' from S3: %s" % (rel_path, ex)) - except Exception, ex: - log.error("Could not get reference to the key object '%s'; returning -1 for key size: %s" % (rel_path, ex)) - return -1 - - def _key_exists(self, rel_path): - exists = False - try: - # A hackish way of testing if the rel_path is a folder vs a file - is_dir = rel_path[-1] == '/' - if is_dir: - rs = self.bucket.get_all_keys(prefix=rel_path) - if len(rs) > 0: - exists = True - else: - exists = False - else: - key = Key(self.bucket, rel_path) - exists = key.exists() - except S3ResponseError, ex: - log.error("Trouble checking existence of S3 key '%s': %s" % (rel_path, ex)) - return False - #print "Checking if '%s' exists in S3: %s" % (rel_path, exists) - if rel_path[0] == '/': - raise - return exists - - def _in_cache(self, rel_path): - """ Check if the given dataset is in the local cache and return True if so. """ - # log.debug("------ Checking cache for rel_path %s" % rel_path) - cache_path = self._get_cache_path(rel_path) - return os.path.exists(cache_path) - # TODO: Part of checking if a file is in cache should be to ensure the - # size of the cached file matches that on S3. Once the upload tool explicitly - # creates, this check sould be implemented- in the mean time, it's not - # looking likely to be implementable reliably. 
- # if os.path.exists(cache_path): - # # print "***1 %s exists" % cache_path - # if self._key_exists(rel_path): - # # print "***2 %s exists in S3" % rel_path - # # Make sure the size in cache is available in its entirety - # # print "File '%s' cache size: %s, S3 size: %s" % (cache_path, os.path.getsize(cache_path), self._get_size_in_s3(rel_path)) - # if os.path.getsize(cache_path) == self._get_size_in_s3(rel_path): - # # print "***2.1 %s exists in S3 and the size is the same as in cache (in_cache=True)" % rel_path - # exists = True - # else: - # # print "***2.2 %s exists but differs in size from cache (in_cache=False)" % cache_path - # exists = False - # else: - # # Although not perfect decision making, this most likely means - # # that the file is currently being uploaded - # # print "***3 %s found in cache but not in S3 (in_cache=True)" % cache_path - # exists = True - # else: - # return False - - def _pull_into_cache(self, rel_path): - # Ensure the cache directory structure exists (e.g., dataset_#_files/) - rel_path_dir = os.path.dirname(rel_path) - if not os.path.exists(self._get_cache_path(rel_path_dir)): - os.makedirs(self._get_cache_path(rel_path_dir)) - # Now pull in the file - ok = self._download(rel_path) - self._fix_permissions(self._get_cache_path(rel_path_dir)) - return ok - - def _transfer_cb(self, complete, total): - self.transfer_progress += 10 - - def _download(self, rel_path): - try: - log.debug("Pulling key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) - key = self.bucket.get_key(rel_path) - # Test if cache is large enough to hold the new file - if self.cache_size > 0 and key.size > self.cache_size: - log.critical("File %s is larger (%s) than the cache size (%s). Cannot download." \ - % (rel_path, key.size, self.cache_size)) - return False - if self.use_axel: - log.debug("Parallel pulled key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) - ncores = multiprocessing.cpu_count() - url = key.generate_url(7200) - ret_code = subprocess.call("axel -a -n %s '%s'" % (ncores, url)) - if ret_code == 0: - return True - else: - log.debug("Pulled key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) - self.transfer_progress = 0 # Reset transfer progress counter - key.get_contents_to_filename(self._get_cache_path(rel_path), cb=self._transfer_cb, num_cb=10) - return True - except S3ResponseError, ex: - log.error("Problem downloading key '%s' from S3 bucket '%s': %s" % (rel_path, self.bucket.name, ex)) - return False - - def _push_to_os(self, rel_path, source_file=None, from_string=None): - """ - Push the file pointed to by ``rel_path`` to the object store naming the key - ``rel_path``. If ``source_file`` is provided, push that file instead while - still using ``rel_path`` as the key name. - If ``from_string`` is provided, set contents of the file to the value of - the string. - """ - try: - source_file = source_file if source_file else self._get_cache_path(rel_path) - if os.path.exists(source_file): - key = Key(self.bucket, rel_path) - if os.path.getsize(source_file) == 0 and key.exists(): - log.debug("Wanted to push file '%s' to S3 key '%s' but its size is 0; skipping." 
% (source_file, rel_path)) - return True - if from_string: - key.set_contents_from_string(from_string, reduced_redundancy=self.use_rr) - log.debug("Pushed data from string '%s' to key '%s'" % (from_string, rel_path)) - else: - start_time = datetime.now() - # print "Pushing cache file '%s' of size %s bytes to key '%s'" % (source_file, os.path.getsize(source_file), rel_path) - # print "+ Push started at '%s'" % start_time - mb_size = os.path.getsize(source_file) / 1e6 - if mb_size < 60 or self.config.object_store == 'swift': - self.transfer_progress = 0 # Reset transfer progress counter - key.set_contents_from_filename(source_file, reduced_redundancy=self.use_rr, - cb=self._transfer_cb, num_cb=10) - else: - multipart_upload(self.bucket, key.name, source_file, mb_size, use_rr=self.use_rr) - end_time = datetime.now() - # print "+ Push ended at '%s'; %s bytes transfered in %ssec" % (end_time, os.path.getsize(source_file), end_time-start_time) - log.debug("Pushed cache file '%s' to key '%s' (%s bytes transfered in %s sec)" % (source_file, rel_path, os.path.getsize(source_file), end_time-start_time)) - return True - else: - log.error("Tried updating key '%s' from source file '%s', but source file does not exist." - % (rel_path, source_file)) - except S3ResponseError, ex: - log.error("Trouble pushing S3 key '%s' from file '%s': %s" % (rel_path, source_file, ex)) - return False - - def file_ready(self, obj, **kwargs): - """ - A helper method that checks if a file corresponding to a dataset is - ready and available to be used. Return ``True`` if so, ``False`` otherwise. - """ - rel_path = self._construct_path(obj, **kwargs) - # Make sure the size in cache is available in its entirety - if self._in_cache(rel_path): - if os.path.getsize(self._get_cache_path(rel_path)) == self._get_size_in_s3(rel_path): - return True - log.debug("Waiting for dataset {0} to transfer from OS: {1}/{2}".format(rel_path, - os.path.getsize(self._get_cache_path(rel_path)), self._get_size_in_s3(rel_path))) - return False - - def exists(self, obj, **kwargs): - in_cache = in_s3 = False - rel_path = self._construct_path(obj, **kwargs) - # Check cache - if self._in_cache(rel_path): - in_cache = True - # Check S3 - in_s3 = self._key_exists(rel_path) - # log.debug("~~~~~~ File '%s' exists in cache: %s; in s3: %s" % (rel_path, in_cache, in_s3)) - # dir_only does not get synced so shortcut the decision - dir_only = kwargs.get('dir_only', False) - if dir_only: - if in_cache or in_s3: - return True - else: - return False - # TODO: Sync should probably not be done here. Add this to an async upload stack? 
- if in_cache and not in_s3: - self._push_to_os(rel_path, source_file=self._get_cache_path(rel_path)) - return True - elif in_s3: - return True - else: - return False - - def create(self, obj, **kwargs): - if not self.exists(obj, **kwargs): - #print "S3 OS creating a dataset with ID %s" % kwargs - # Pull out locally used fields - extra_dir = kwargs.get('extra_dir', None) - extra_dir_at_root = kwargs.get('extra_dir_at_root', False) - dir_only = kwargs.get('dir_only', False) - alt_name = kwargs.get('alt_name', None) - # print "---- Processing: %s; %s" % (alt_name, locals()) - # Construct hashed path - rel_path = os.path.join(*directory_hash_id(obj.id)) - - # Optionally append extra_dir - if extra_dir is not None: - if extra_dir_at_root: - rel_path = os.path.join(extra_dir, rel_path) - else: - rel_path = os.path.join(rel_path, extra_dir) - # Create given directory in cache - cache_dir = os.path.join(self.staging_path, rel_path) - if not os.path.exists(cache_dir): - os.makedirs(cache_dir) - # Although not really necessary to create S3 folders (because S3 has - # flat namespace), do so for consistency with the regular file system - # S3 folders are marked by having trailing '/' so add it now - # s3_dir = '%s/' % rel_path - # self._push_to_os(s3_dir, from_string='') - # If instructed, create the dataset in cache & in S3 - if not dir_only: - rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % obj.id) - open(os.path.join(self.staging_path, rel_path), 'w').close() - self._push_to_os(rel_path, from_string='') - - def empty(self, obj, **kwargs): - if self.exists(obj, **kwargs): - return bool(self.size(obj, **kwargs) > 0) - else: - raise ObjectNotFound( 'objectstore.empty, object does not exist: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) - - def size(self, obj, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - if self._in_cache(rel_path): - try: - return os.path.getsize(self._get_cache_path(rel_path)) - except OSError, ex: - log.info("Could not get size of file '%s' in local cache, will try S3. Error: %s" % (rel_path, ex)) - elif self.exists(obj, **kwargs): - return self._get_size_in_s3(rel_path) - log.warning("Did not find dataset '%s', returning 0 for size" % rel_path) - return 0 - - def delete(self, obj, entire_dir=False, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - extra_dir = kwargs.get('extra_dir', None) - try: - # For the case of extra_files, because we don't have a reference to - # individual files/keys we need to remove the entire directory structure - # with all the files in it. This is easy for the local file system, - # but requires iterating through each individual key in S3 and deleing it. 
- if entire_dir and extra_dir: - shutil.rmtree(self._get_cache_path(rel_path)) - rs = self.bucket.get_all_keys(prefix=rel_path) - for key in rs: - log.debug("Deleting key %s" % key.name) - key.delete() - return True - else: - # Delete from cache first - os.unlink(self._get_cache_path(rel_path)) - # Delete from S3 as well - if self._key_exists(rel_path): - key = Key(self.bucket, rel_path) - log.debug("Deleting key %s" % key.name) - key.delete() - return True - except S3ResponseError, ex: - log.error("Could not delete key '%s' from S3: %s" % (rel_path, ex)) - except OSError, ex: - log.error('%s delete error %s' % (self._get_filename(obj, **kwargs), ex)) - return False - - def get_data(self, obj, start=0, count=-1, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - # Check cache first and get file if not there - if not self._in_cache(rel_path): - self._pull_into_cache(rel_path) - #else: - # print "(cccc) Getting '%s' from cache" % self._get_cache_path(rel_path) - # Read the file content from cache - data_file = open(self._get_cache_path(rel_path), 'r') - data_file.seek(start) - content = data_file.read(count) - data_file.close() - return content - - def get_filename(self, obj, **kwargs): - #print "S3 get_filename for dataset: %s" % dataset_id - dir_only = kwargs.get('dir_only', False) - rel_path = self._construct_path(obj, **kwargs) - cache_path = self._get_cache_path(rel_path) - # S3 does not recognize directories as files so cannot check if those exist. - # So, if checking dir only, ensure given dir exists in cache and return - # the expected cache path. - # dir_only = kwargs.get('dir_only', False) - # if dir_only: - # if not os.path.exists(cache_path): - # os.makedirs(cache_path) - # return cache_path - # Check if the file exists in the cache first - if self._in_cache(rel_path): - return cache_path - # Check if the file exists in persistent storage and, if it does, pull it into cache - elif self.exists(obj, **kwargs): - if dir_only: # Directories do not get pulled into cache - return cache_path - else: - if self._pull_into_cache(rel_path): - return cache_path - # For the case of retrieving a directory only, return the expected path - # even if it does not exist. - # if dir_only: - # return cache_path - raise ObjectNotFound( 'objectstore.get_filename, no cache_path: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) - # return cache_path # Until the upload tool does not explicitly create the dataset, return expected path - - def update_from_file(self, obj, file_name=None, create=False, **kwargs): - if create: - self.create(obj, **kwargs) - if self.exists(obj, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - # Chose whether to use the dataset file itself or an alternate file - if file_name: - source_file = os.path.abspath(file_name) - # Copy into cache - cache_file = self._get_cache_path(rel_path) - try: - if source_file != cache_file: - # FIXME? Should this be a `move`? 
- shutil.copy2(source_file, cache_file) - self._fix_permissions(cache_file) - except OSError, ex: - log.error("Trouble copying source file '%s' to cache '%s': %s" % (source_file, cache_file, ex)) - else: - source_file = self._get_cache_path(rel_path) - # Update the file on S3 - self._push_to_os(rel_path, source_file) - else: - raise ObjectNotFound( 'objectstore.update_from_file, object does not exist: %s, kwargs: %s' - %( str( obj ), str( kwargs ) ) ) - - def get_object_url(self, obj, **kwargs): - if self.exists(obj, **kwargs): - rel_path = self._construct_path(obj, **kwargs) - try: - key = Key(self.bucket, rel_path) - return key.generate_url(expires_in = 86400) # 24hrs - except S3ResponseError, ex: - log.warning("Trouble generating URL for dataset '%s': %s" % (rel_path, ex)) - return None - - def get_store_usage_percent(self): - return 0.0 - class DistributedObjectStore(ObjectStore): """ ObjectStore that defers to a list of backends, for getting objects the @@ -1011,6 +516,7 @@ return id return None + class HierarchicalObjectStore(ObjectStore): """ ObjectStore that defers to a list of backends, for getting objects the @@ -1021,6 +527,7 @@ def __init__(self, backends=[]): super(HierarchicalObjectStore, self).__init__() + def build_object_store_from_config(config, fsmon=False): """ Depending on the configuration setting, invoke the appropriate object store """ @@ -1028,6 +535,7 @@ if store == 'disk': return DiskObjectStore(config=config) elif store == 's3' or store == 'swift': + from galaxy.objectstore.s3 import S3ObjectStore return S3ObjectStore(config=config) elif store == 'distributed': return DistributedObjectStore(config=config, fsmon=fsmon) diff -r 267d294711e9f72dcfbff842ce263a1050be058f -r c732173231b89ce28bd7b8ee629b1af32c0b37e7 lib/galaxy/objectstore/s3.py --- /dev/null +++ b/lib/galaxy/objectstore/s3.py @@ -0,0 +1,535 @@ +""" +Object Store plugin for the Amazon Simple Storage Service (S3) +""" + +import os +import time +import shutil +import logging +import threading +import subprocess +from datetime import datetime + +from galaxy import util +from galaxy.jobs import Sleeper +from galaxy.model import directory_hash_id +from galaxy.objectstore import ObjectStore, convert_bytes +from galaxy.exceptions import ObjectNotFound, ObjectInvalid + +import multiprocessing +from galaxy.objectstore.s3_multipart_upload import multipart_upload +import boto +from boto.s3.key import Key +from boto.s3.connection import S3Connection +from boto.exception import S3ResponseError + +log = logging.getLogger( __name__ ) +logging.getLogger('boto').setLevel(logging.INFO) # Otherwise boto is quite noisy + + +class S3ObjectStore(ObjectStore): + """ + Object store that stores objects as items in an AWS S3 bucket. A local + cache exists that is used as an intermediate location for files between + Galaxy and S3. 
+ """ + def __init__(self, config): + super(S3ObjectStore, self).__init__() + self.config = config + self.staging_path = self.config.file_path + self.s3_conn = get_OS_connection(self.config) + self.bucket = self._get_bucket(self.config.os_bucket_name) + self.use_rr = self.config.os_use_reduced_redundancy + self.cache_size = self.config.object_store_cache_size + self.transfer_progress = 0 + # Clean cache only if value is set in universe_wsgi.ini + if self.cache_size != -1: + # Convert GBs to bytes for comparison + self.cache_size = self.cache_size * 1073741824 + # Helper for interruptable sleep + self.sleeper = Sleeper() + self.cache_monitor_thread = threading.Thread(target=self.__cache_monitor) + self.cache_monitor_thread.start() + log.info("Cache cleaner manager started") + # Test if 'axel' is available for parallel download and pull the key into cache + try: + subprocess.call('axel') + self.use_axel = True + except OSError: + self.use_axel = False + + def __cache_monitor(self): + time.sleep(2) # Wait for things to load before starting the monitor + while self.running: + total_size = 0 + # Is this going to be too expensive of an operation to be done frequently? + file_list = [] + for dirpath, dirnames, filenames in os.walk(self.staging_path): + for f in filenames: + fp = os.path.join(dirpath, f) + file_size = os.path.getsize(fp) + total_size += file_size + # Get the time given file was last accessed + last_access_time = time.localtime(os.stat(fp)[7]) + # Compose a tuple of the access time and the file path + file_tuple = last_access_time, fp, file_size + file_list.append(file_tuple) + # Sort the file list (based on access time) + file_list.sort() + # Initiate cleaning once within 10% of the defined cache size? + cache_limit = self.cache_size * 0.9 + if total_size > cache_limit: + log.info("Initiating cache cleaning: current cache size: %s; clean until smaller than: %s" \ + % (convert_bytes(total_size), convert_bytes(cache_limit))) + # How much to delete? If simply deleting up to the cache-10% limit, + # is likely to be deleting frequently and may run the risk of hitting + # the limit - maybe delete additional #%? + # For now, delete enough to leave at least 10% of the total cache free + delete_this_much = total_size - cache_limit + self.__clean_cache(file_list, delete_this_much) + self.sleeper.sleep(30) # Test cache size every 30 seconds? + + def __clean_cache(self, file_list, delete_this_much): + """ Keep deleting files from the file_list until the size of the deleted + files is greater than the value in delete_this_much parameter. + + :type file_list: list + :param file_list: List of candidate files that can be deleted. This method + will start deleting files from the beginning of the list so the list + should be sorted accordingly. The list must contains 3-element tuples, + positioned as follows: position 0 holds file last accessed timestamp + (as time.struct_time), position 1 holds file path, and position 2 has + file size (e.g., (<access time>, /mnt/data/dataset_1.dat), 472394) + + :type delete_this_much: int + :param delete_this_much: Total size of files, in bytes, that should be deleted. + """ + # Keep deleting datasets from file_list until deleted_amount does not + # exceed delete_this_much; start deleting from the front of the file list, + # which assumes the oldest files come first on the list. 
+ deleted_amount = 0 + for i, f in enumerate(file_list): + if deleted_amount < delete_this_much: + deleted_amount += f[2] + os.remove(f[1]) + # Debugging code for printing deleted files' stats + # folder, file_name = os.path.split(f[1]) + # file_date = time.strftime("%m/%d/%y %H:%M:%S", f[0]) + # log.debug("%s. %-25s %s, size %s (deleted %s/%s)" \ + # % (i, file_name, convert_bytes(f[2]), file_date, \ + # convert_bytes(deleted_amount), convert_bytes(delete_this_much))) + else: + log.debug("Cache cleaning done. Total space freed: %s" % convert_bytes(deleted_amount)) + return + + def _get_bucket(self, bucket_name): + """ Sometimes a handle to a bucket is not established right away so try + it a few times. Raise error is connection is not established. """ + for i in range(5): + try: + bucket = self.s3_conn.get_bucket(bucket_name) + log.debug("Using cloud object store with bucket '%s'" % bucket.name) + return bucket + except S3ResponseError: + log.debug("Could not get bucket '%s', attempt %s/5" % (bucket_name, i+1)) + time.sleep(2) + # All the attempts have been exhausted and connection was not established, + # raise error + raise S3ResponseError + + def _fix_permissions(self, rel_path): + """ Set permissions on rel_path""" + for basedir, dirs, files in os.walk(rel_path): + util.umask_fix_perms(basedir, self.config.umask, 0777, self.config.gid) + for f in files: + path = os.path.join(basedir, f) + # Ignore symlinks + if os.path.islink(path): + continue + util.umask_fix_perms( path, self.config.umask, 0666, self.config.gid ) + + def _construct_path(self, obj, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, **kwargs): + rel_path = os.path.join(*directory_hash_id(obj.id)) + if extra_dir is not None: + if extra_dir_at_root: + rel_path = os.path.join(extra_dir, rel_path) + else: + rel_path = os.path.join(rel_path, extra_dir) + # S3 folders are marked by having trailing '/' so add it now + rel_path = '%s/' % rel_path + if not dir_only: + rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % obj.id) + return rel_path + + def _get_cache_path(self, rel_path): + return os.path.abspath(os.path.join(self.staging_path, rel_path)) + + def _get_transfer_progress(self): + return self.transfer_progress + + def _get_size_in_s3(self, rel_path): + try: + key = self.bucket.get_key(rel_path) + if key: + return key.size + except S3ResponseError, ex: + log.error("Could not get size of key '%s' from S3: %s" % (rel_path, ex)) + except Exception, ex: + log.error("Could not get reference to the key object '%s'; returning -1 for key size: %s" % (rel_path, ex)) + return -1 + + def _key_exists(self, rel_path): + exists = False + try: + # A hackish way of testing if the rel_path is a folder vs a file + is_dir = rel_path[-1] == '/' + if is_dir: + rs = self.bucket.get_all_keys(prefix=rel_path) + if len(rs) > 0: + exists = True + else: + exists = False + else: + key = Key(self.bucket, rel_path) + exists = key.exists() + except S3ResponseError, ex: + log.error("Trouble checking existence of S3 key '%s': %s" % (rel_path, ex)) + return False + #print "Checking if '%s' exists in S3: %s" % (rel_path, exists) + if rel_path[0] == '/': + raise + return exists + + def _in_cache(self, rel_path): + """ Check if the given dataset is in the local cache and return True if so. 
""" + # log.debug("------ Checking cache for rel_path %s" % rel_path) + cache_path = self._get_cache_path(rel_path) + return os.path.exists(cache_path) + # TODO: Part of checking if a file is in cache should be to ensure the + # size of the cached file matches that on S3. Once the upload tool explicitly + # creates, this check sould be implemented- in the mean time, it's not + # looking likely to be implementable reliably. + # if os.path.exists(cache_path): + # # print "***1 %s exists" % cache_path + # if self._key_exists(rel_path): + # # print "***2 %s exists in S3" % rel_path + # # Make sure the size in cache is available in its entirety + # # print "File '%s' cache size: %s, S3 size: %s" % (cache_path, os.path.getsize(cache_path), self._get_size_in_s3(rel_path)) + # if os.path.getsize(cache_path) == self._get_size_in_s3(rel_path): + # # print "***2.1 %s exists in S3 and the size is the same as in cache (in_cache=True)" % rel_path + # exists = True + # else: + # # print "***2.2 %s exists but differs in size from cache (in_cache=False)" % cache_path + # exists = False + # else: + # # Although not perfect decision making, this most likely means + # # that the file is currently being uploaded + # # print "***3 %s found in cache but not in S3 (in_cache=True)" % cache_path + # exists = True + # else: + # return False + + def _pull_into_cache(self, rel_path): + # Ensure the cache directory structure exists (e.g., dataset_#_files/) + rel_path_dir = os.path.dirname(rel_path) + if not os.path.exists(self._get_cache_path(rel_path_dir)): + os.makedirs(self._get_cache_path(rel_path_dir)) + # Now pull in the file + ok = self._download(rel_path) + self._fix_permissions(self._get_cache_path(rel_path_dir)) + return ok + + def _transfer_cb(self, complete, total): + self.transfer_progress += 10 + + def _download(self, rel_path): + try: + log.debug("Pulling key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) + key = self.bucket.get_key(rel_path) + # Test if cache is large enough to hold the new file + if self.cache_size > 0 and key.size > self.cache_size: + log.critical("File %s is larger (%s) than the cache size (%s). Cannot download." \ + % (rel_path, key.size, self.cache_size)) + return False + if self.use_axel: + log.debug("Parallel pulled key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) + ncores = multiprocessing.cpu_count() + url = key.generate_url(7200) + ret_code = subprocess.call("axel -a -n %s '%s'" % (ncores, url)) + if ret_code == 0: + return True + else: + log.debug("Pulled key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) + self.transfer_progress = 0 # Reset transfer progress counter + key.get_contents_to_filename(self._get_cache_path(rel_path), cb=self._transfer_cb, num_cb=10) + return True + except S3ResponseError, ex: + log.error("Problem downloading key '%s' from S3 bucket '%s': %s" % (rel_path, self.bucket.name, ex)) + return False + + def _push_to_os(self, rel_path, source_file=None, from_string=None): + """ + Push the file pointed to by ``rel_path`` to the object store naming the key + ``rel_path``. If ``source_file`` is provided, push that file instead while + still using ``rel_path`` as the key name. + If ``from_string`` is provided, set contents of the file to the value of + the string. 
+ """ + try: + source_file = source_file if source_file else self._get_cache_path(rel_path) + if os.path.exists(source_file): + key = Key(self.bucket, rel_path) + if os.path.getsize(source_file) == 0 and key.exists(): + log.debug("Wanted to push file '%s' to S3 key '%s' but its size is 0; skipping." % (source_file, rel_path)) + return True + if from_string: + key.set_contents_from_string(from_string, reduced_redundancy=self.use_rr) + log.debug("Pushed data from string '%s' to key '%s'" % (from_string, rel_path)) + else: + start_time = datetime.now() + # print "Pushing cache file '%s' of size %s bytes to key '%s'" % (source_file, os.path.getsize(source_file), rel_path) + # print "+ Push started at '%s'" % start_time + mb_size = os.path.getsize(source_file) / 1e6 + if mb_size < 60 or self.config.object_store == 'swift': + self.transfer_progress = 0 # Reset transfer progress counter + key.set_contents_from_filename(source_file, reduced_redundancy=self.use_rr, + cb=self._transfer_cb, num_cb=10) + else: + multipart_upload(self.bucket, key.name, source_file, mb_size, use_rr=self.use_rr) + end_time = datetime.now() + # print "+ Push ended at '%s'; %s bytes transfered in %ssec" % (end_time, os.path.getsize(source_file), end_time-start_time) + log.debug("Pushed cache file '%s' to key '%s' (%s bytes transfered in %s sec)" % (source_file, rel_path, os.path.getsize(source_file), end_time-start_time)) + return True + else: + log.error("Tried updating key '%s' from source file '%s', but source file does not exist." + % (rel_path, source_file)) + except S3ResponseError, ex: + log.error("Trouble pushing S3 key '%s' from file '%s': %s" % (rel_path, source_file, ex)) + return False + + def file_ready(self, obj, **kwargs): + """ + A helper method that checks if a file corresponding to a dataset is + ready and available to be used. Return ``True`` if so, ``False`` otherwise. + """ + rel_path = self._construct_path(obj, **kwargs) + # Make sure the size in cache is available in its entirety + if self._in_cache(rel_path): + if os.path.getsize(self._get_cache_path(rel_path)) == self._get_size_in_s3(rel_path): + return True + log.debug("Waiting for dataset {0} to transfer from OS: {1}/{2}".format(rel_path, + os.path.getsize(self._get_cache_path(rel_path)), self._get_size_in_s3(rel_path))) + return False + + def exists(self, obj, **kwargs): + in_cache = in_s3 = False + rel_path = self._construct_path(obj, **kwargs) + # Check cache + if self._in_cache(rel_path): + in_cache = True + # Check S3 + in_s3 = self._key_exists(rel_path) + # log.debug("~~~~~~ File '%s' exists in cache: %s; in s3: %s" % (rel_path, in_cache, in_s3)) + # dir_only does not get synced so shortcut the decision + dir_only = kwargs.get('dir_only', False) + if dir_only: + if in_cache or in_s3: + return True + else: + return False + # TODO: Sync should probably not be done here. Add this to an async upload stack? 
+ if in_cache and not in_s3: + self._push_to_os(rel_path, source_file=self._get_cache_path(rel_path)) + return True + elif in_s3: + return True + else: + return False + + def create(self, obj, **kwargs): + if not self.exists(obj, **kwargs): + #print "S3 OS creating a dataset with ID %s" % kwargs + # Pull out locally used fields + extra_dir = kwargs.get('extra_dir', None) + extra_dir_at_root = kwargs.get('extra_dir_at_root', False) + dir_only = kwargs.get('dir_only', False) + alt_name = kwargs.get('alt_name', None) + # print "---- Processing: %s; %s" % (alt_name, locals()) + # Construct hashed path + rel_path = os.path.join(*directory_hash_id(obj.id)) + + # Optionally append extra_dir + if extra_dir is not None: + if extra_dir_at_root: + rel_path = os.path.join(extra_dir, rel_path) + else: + rel_path = os.path.join(rel_path, extra_dir) + # Create given directory in cache + cache_dir = os.path.join(self.staging_path, rel_path) + if not os.path.exists(cache_dir): + os.makedirs(cache_dir) + # Although not really necessary to create S3 folders (because S3 has + # flat namespace), do so for consistency with the regular file system + # S3 folders are marked by having trailing '/' so add it now + # s3_dir = '%s/' % rel_path + # self._push_to_os(s3_dir, from_string='') + # If instructed, create the dataset in cache & in S3 + if not dir_only: + rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % obj.id) + open(os.path.join(self.staging_path, rel_path), 'w').close() + self._push_to_os(rel_path, from_string='') + + def empty(self, obj, **kwargs): + if self.exists(obj, **kwargs): + return bool(self.size(obj, **kwargs) > 0) + else: + raise ObjectNotFound( 'objectstore.empty, object does not exist: %s, kwargs: %s' + %( str( obj ), str( kwargs ) ) ) + + def size(self, obj, **kwargs): + rel_path = self._construct_path(obj, **kwargs) + if self._in_cache(rel_path): + try: + return os.path.getsize(self._get_cache_path(rel_path)) + except OSError, ex: + log.info("Could not get size of file '%s' in local cache, will try S3. Error: %s" % (rel_path, ex)) + elif self.exists(obj, **kwargs): + return self._get_size_in_s3(rel_path) + log.warning("Did not find dataset '%s', returning 0 for size" % rel_path) + return 0 + + def delete(self, obj, entire_dir=False, **kwargs): + rel_path = self._construct_path(obj, **kwargs) + extra_dir = kwargs.get('extra_dir', None) + try: + # For the case of extra_files, because we don't have a reference to + # individual files/keys we need to remove the entire directory structure + # with all the files in it. This is easy for the local file system, + # but requires iterating through each individual key in S3 and deleing it. 
+ if entire_dir and extra_dir: + shutil.rmtree(self._get_cache_path(rel_path)) + rs = self.bucket.get_all_keys(prefix=rel_path) + for key in rs: + log.debug("Deleting key %s" % key.name) + key.delete() + return True + else: + # Delete from cache first + os.unlink(self._get_cache_path(rel_path)) + # Delete from S3 as well + if self._key_exists(rel_path): + key = Key(self.bucket, rel_path) + log.debug("Deleting key %s" % key.name) + key.delete() + return True + except S3ResponseError, ex: + log.error("Could not delete key '%s' from S3: %s" % (rel_path, ex)) + except OSError, ex: + log.error('%s delete error %s' % (self._get_filename(obj, **kwargs), ex)) + return False + + def get_data(self, obj, start=0, count=-1, **kwargs): + rel_path = self._construct_path(obj, **kwargs) + # Check cache first and get file if not there + if not self._in_cache(rel_path): + self._pull_into_cache(rel_path) + #else: + # print "(cccc) Getting '%s' from cache" % self._get_cache_path(rel_path) + # Read the file content from cache + data_file = open(self._get_cache_path(rel_path), 'r') + data_file.seek(start) + content = data_file.read(count) + data_file.close() + return content + + def get_filename(self, obj, **kwargs): + #print "S3 get_filename for dataset: %s" % dataset_id + dir_only = kwargs.get('dir_only', False) + rel_path = self._construct_path(obj, **kwargs) + cache_path = self._get_cache_path(rel_path) + # S3 does not recognize directories as files so cannot check if those exist. + # So, if checking dir only, ensure given dir exists in cache and return + # the expected cache path. + # dir_only = kwargs.get('dir_only', False) + # if dir_only: + # if not os.path.exists(cache_path): + # os.makedirs(cache_path) + # return cache_path + # Check if the file exists in the cache first + if self._in_cache(rel_path): + return cache_path + # Check if the file exists in persistent storage and, if it does, pull it into cache + elif self.exists(obj, **kwargs): + if dir_only: # Directories do not get pulled into cache + return cache_path + else: + if self._pull_into_cache(rel_path): + return cache_path + # For the case of retrieving a directory only, return the expected path + # even if it does not exist. + # if dir_only: + # return cache_path + raise ObjectNotFound( 'objectstore.get_filename, no cache_path: %s, kwargs: %s' + %( str( obj ), str( kwargs ) ) ) + # return cache_path # Until the upload tool does not explicitly create the dataset, return expected path + + def update_from_file(self, obj, file_name=None, create=False, **kwargs): + if create: + self.create(obj, **kwargs) + if self.exists(obj, **kwargs): + rel_path = self._construct_path(obj, **kwargs) + # Chose whether to use the dataset file itself or an alternate file + if file_name: + source_file = os.path.abspath(file_name) + # Copy into cache + cache_file = self._get_cache_path(rel_path) + try: + if source_file != cache_file: + # FIXME? Should this be a `move`? 
+                        shutil.copy2(source_file, cache_file)
+                    self._fix_permissions(cache_file)
+                except OSError, ex:
+                    log.error("Trouble copying source file '%s' to cache '%s': %s" % (source_file, cache_file, ex))
+            else:
+                source_file = self._get_cache_path(rel_path)
+            # Update the file on S3
+            self._push_to_os(rel_path, source_file)
+        else:
+            raise ObjectNotFound( 'objectstore.update_from_file, object does not exist: %s, kwargs: %s'
+                %( str( obj ), str( kwargs ) ) )
+
+    def get_object_url(self, obj, **kwargs):
+        if self.exists(obj, **kwargs):
+            rel_path = self._construct_path(obj, **kwargs)
+            try:
+                key = Key(self.bucket, rel_path)
+                return key.generate_url(expires_in = 86400) # 24hrs
+            except S3ResponseError, ex:
+                log.warning("Trouble generating URL for dataset '%s': %s" % (rel_path, ex))
+        return None
+
+    def get_store_usage_percent(self):
+        return 0.0
+
+
+def get_OS_connection(config):
+    """
+    Get a connection object for a cloud Object Store specified in the config.
+    Currently, this is a ``boto`` connection object.
+    """
+    log.debug("Getting a connection object for '{0}' object store".format(config.object_store))
+    a_key = config.os_access_key
+    s_key = config.os_secret_key
+    if config.object_store == 's3':
+        return S3Connection(a_key, s_key)
+    else:
+        # Establish the connection now
+        calling_format = boto.s3.connection.OrdinaryCallingFormat()
+        s3_conn = boto.connect_s3(aws_access_key_id=a_key,
+                                  aws_secret_access_key=s_key,
+                                  is_secure=config.os_is_secure,
+                                  host=config.os_host,
+                                  port=int(config.os_port),
+                                  calling_format=calling_format,
+                                  path=config.os_conn_path)
+        return s3_conn

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this because
you have the service enabled, addressing the recipient of this email.
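
The cache monitor that moves with this changeset implements a simple least-recently-accessed
eviction policy: walk the staging directory, record each file's access time and size, sort
oldest-first, and delete from the front of the list until usage falls back under 90% of the
configured cache size. Below is a standalone sketch of that policy; only the 0.9 threshold and
the access-time ordering (os.stat(fp)[7], i.e. st_atime) come from the diff, while the function
name and signature are illustrative rather than Galaxy's API.

import os

def clean_cache(staging_path, cache_size_bytes):
    """Delete least-recently-accessed files until usage is at most 90% of cache_size_bytes."""
    file_list = []
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(staging_path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            file_size = os.path.getsize(fp)
            total_size += file_size
            # st_atime is the same field the monitor reads via os.stat(fp)[7]
            file_list.append((os.stat(fp).st_atime, fp, file_size))
    cache_limit = cache_size_bytes * 0.9
    if total_size <= cache_limit:
        return 0
    file_list.sort()  # oldest access time first
    delete_this_much = total_size - cache_limit
    deleted_amount = 0
    for access_time, fp, file_size in file_list:
        if deleted_amount >= delete_this_much:
            break
        os.remove(fp)
        deleted_amount += file_size
    return deleted_amount

Run against a scratch directory with a small limit (for example
clean_cache('/tmp/cache-test', 10 * 1024 ** 2)) it deletes the longest-untouched files first,
which is the behaviour __cache_monitor relies on when it sorts file_list before calling
__clean_cache.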
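
Outside the move itself, the only functional change is in build_object_store_from_config:
S3ObjectStore is now imported from galaxy.objectstore.s3 inside the 's3'/'swift' branch, so
boto and the s3_multipart_upload helper are only imported when an S3-backed store is actually
configured. A minimal sketch of that deferred-import dispatch follows; it assumes a Galaxy
checkout is importable, and the trailing ValueError is illustrative only -- the committed
function goes on to handle the remaining backends shown in the diff context.

from galaxy.objectstore import DiskObjectStore, DistributedObjectStore

def build_object_store_from_config(config, fsmon=False):
    """Sketch of the backend dispatch after this changeset."""
    store = config.object_store
    if store == 'disk':
        return DiskObjectStore(config=config)
    elif store == 's3' or store == 'swift':
        # Deferred import: boto is only pulled in for S3/Swift deployments.
        from galaxy.objectstore.s3 import S3ObjectStore
        return S3ObjectStore(config=config)
    elif store == 'distributed':
        return DistributedObjectStore(config=config, fsmon=fsmon)
    raise ValueError("Unrecognized object_store setting: %r" % store)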