1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/changeset/8d6138fa2251/ changeset: 8d6138fa2251 user: dannon date: 2012-06-19 22:42:25 summary: S3 object store now functional. Whitespace cleanup. affected #: 1 file diff -r 958d1f8b2caf9595d79e66d1eb1705110a1a6784 -r 8d6138fa2251566b5d17d64e9f56f129d9dc624a lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -40,20 +40,20 @@ def __init__(self): self.running = True self.extra_dirs = {} - + def shutdown(self): self.running = False - + def exists(self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None): """ Returns True if the object identified by `obj` exists in this file store, False otherwise. - + FIELD DESCRIPTIONS (these apply to all the methods in this class): :type obj: object :param obj: A Galaxy object with an assigned database ID accessible via the .id attribute. - + :type base_dir: string :param base_dir: A key in self.extra_dirs corresponding to the base directory in which this object should be created, or @@ -64,19 +64,19 @@ identified by `obj` should be located, not the dataset itself. This option applies to `extra_dir` argument as well. - + :type extra_dir: string :param extra_dir: Append `extra_dir` to the directory structure where the dataset identified by `obj` should be located. (e.g., 000/extra_dir/obj.id) - + :type extra_dir_at_root: bool :param extra_dir_at_root: Applicable only if `extra_dir` is set. If True, the `extra_dir` argument is placed at root of the created directory structure rather than at the end (e.g., extra_dir/000/obj.id vs. 000/extra_dir/obj.id) - + :type alt_name: string :param alt_name: Use this name as the alternative name for the created dataset rather than the default. 
@@ -84,10 +84,10 @@ raise NotImplementedError() def file_ready(self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None): - """ A helper method that checks if a file corresponding to a dataset + """ A helper method that checks if a file corresponding to a dataset is ready and available to be used. Return True if so, False otherwise.""" return True - + def create(self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None): """ Mark the object identified by `obj` as existing in the store, but with @@ -104,7 +104,7 @@ See `exists` method for the description of the fields. """ raise NotImplementedError() - + def size(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None): """ Return size of the object identified by `obj`. @@ -112,13 +112,13 @@ See `exists` method for the description of the fields. """ raise NotImplementedError() - + def delete(self, obj, entire_dir=False, base_dir=None, extra_dir=None, extra_dir_at_root=False, alt_name=None): """ Deletes the object identified by `obj`. See `exists` method for the description of other fields. :type entire_dir: bool - :param entire_dir: If True, delete the entire directory pointed to by + :param entire_dir: If True, delete the entire directory pointed to by extra_dir. For safety reasons, this option applies only for and in conjunction with the extra_dir option. """ @@ -130,15 +130,15 @@ object identified uniquely by `obj`. If the object does not exist raises `ObjectNotFound`. See `exists` method for the description of other fields. 
- + :type start: int :param start: Set the position to start reading the dataset file - + :type count: int :param count: Read at most `count` bytes from the dataset """ raise NotImplementedError() - + def get_filename(self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None): """ Get the expected filename (including the absolute path) which can be used @@ -146,7 +146,7 @@ See `exists` method for the description of the fields. """ raise NotImplementedError() - + def update_from_file(self, obj, base_dir=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, file_name=None, create=False): """ Inform the store that the file associated with the object has been @@ -154,16 +154,16 @@ of the default. If the object does not exist raises `ObjectNotFound`. See `exists` method for the description of other fields. - + :type file_name: string - :param file_name: Use file pointed to by `file_name` as the source for + :param file_name: Use file pointed to by `file_name` as the source for updating the dataset identified by `obj` - + :type create: bool :param create: If True and the default dataset does not exist, create it first. """ raise NotImplementedError() - + def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None): """ If the store supports direct URL access, return a URL. Otherwise return @@ -178,7 +178,7 @@ Return the percentage indicating how full the store is """ raise NotImplementedError() - + ## def get_staging_command( id ): ## """ ## Return a shell command that can be prepended to the job script to stage the @@ -212,21 +212,21 @@ self.extra_dirs['temp'] = config.new_file_path if extra_dirs is not None: self.extra_dirs.update( extra_dirs ) - + def _get_filename(self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None): """Class method that returns the absolute path for the file corresponding - to the `obj`.id regardless of whether the file exists. 
+ to the `obj`.id regardless of whether the file exists. """ path = self._construct_path(obj, base_dir=base_dir, dir_only=dir_only, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name, old_style=True) - # For backward compatibility, check the old style root path first; otherwise, + # For backward compatibility, check the old style root path first; otherwise, # construct hashed path if not os.path.exists(path): return self._construct_path(obj, base_dir=base_dir, dir_only=dir_only, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name) - + def _construct_path(self, obj, old_style=False, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None, **kwargs): """ Construct the expected absolute path for accessing the object identified by `obj`.id. - + :type base_dir: string :param base_dir: A key in self.extra_dirs corresponding to the base directory in which this object should be created, or @@ -237,16 +237,16 @@ identified by `obj` should be located, not the dataset itself. This option applies to `extra_dir` argument as well. - + :type extra_dir: string :param extra_dir: Append the value of this parameter to the expected path used to access the object identified by `obj` (e.g., /files/000/<extra_dir>/dataset_10.dat). - + :type alt_name: string :param alt_name: Use this name as the alternative name for the returned dataset rather than the default. - + :type old_style: bool param old_style: This option is used for backward compatibility. 
If True the composed directory structure does not include a hash id @@ -274,7 +274,7 @@ def exists(self, obj, **kwargs): path = self._construct_path(obj, old_style=True, **kwargs) - # For backward compatibility, check root path first; otherwise, construct + # For backward compatibility, check root path first; otherwise, construct # and check hashed path if os.path.exists(path): return True @@ -292,12 +292,12 @@ os.makedirs(dir) # Create the file if it does not exist if not dir_only: - open(path, 'w').close() + open(path, 'w').close() util.umask_fix_perms(path, self.config.umask, 0666) def empty(self, obj, **kwargs): return os.path.getsize(self.get_filename(obj, **kwargs)) > 0 - + def size(self, obj, **kwargs): if self.exists(obj, **kwargs): try: @@ -306,7 +306,7 @@ return 0 else: return 0 - + def delete(self, obj, entire_dir=False, **kwargs): path = self.get_filename(obj, **kwargs) extra_dir = kwargs.get('extra_dir', None) @@ -327,16 +327,16 @@ content = data_file.read(count) data_file.close() return content - + def get_filename(self, obj, **kwargs): path = self._construct_path(obj, old_style=True, **kwargs) - # For backward compatibility, check root path first; otherwise, construct + # For backward compatibility, check root path first; otherwise, construct # and return hashed path if os.path.exists(path): return path else: return self._construct_path(obj, **kwargs) - + def update_from_file(self, obj, file_name=None, create=False, **kwargs): """ `create` parameter is not used in this implementation """ if create: @@ -345,12 +345,12 @@ try: shutil.copy(file_name, self.get_filename(obj, **kwargs)) except IOError, ex: - log.critical('Error copying %s to %s: %s' % (file_name, + log.critical('Error copying %s to %s: %s' % (file_name, self._get_filename(obj, **kwargs), ex)) - + def get_object_url(self, obj, **kwargs): return None - + def get_store_usage_percent(self): st = os.statvfs(self.file_path) return (float(st.f_blocks - st.f_bavail)/st.f_blocks) * 100 @@ -361,10 
+361,9 @@ Object store that uses a directory for caching files, but defers and writes back to another object store. """ - + def __init__(self, path, backend): super(CachingObjectStore, self).__init__(self, path, backend) - class S3ObjectStore(ObjectStore): @@ -381,16 +380,24 @@ self.s3_conn = S3Connection() self.bucket = self._get_bucket(self.config.s3_bucket) self.use_rr = self.config.use_reduced_redundancy - self.cache_size = self.config.object_store_cache_size * 1073741824 # Convert GBs to bytes + self.cache_size = self.config.object_store_cache_size self.transfer_progress = 0 # Clean cache only if value is set in universe_wsgi.ini if self.cache_size != -1: + # Convert GBs to bytes for comparison + self.cache_size = self.cache_size * 1073741824 # Helper for interruptable sleep self.sleeper = Sleeper() self.cache_monitor_thread = threading.Thread(target=self.__cache_monitor) self.cache_monitor_thread.start() log.info("Cache cleaner manager started") - + # Test if 'axel' is available for parallel download and pull the key into cache + try: + subprocess.call('axel') + self.use_axel = True + except OSError: + self.use_axel = False + def __cache_monitor(self): time.sleep(2) # Wait for things to load before starting the monitor while self.running: @@ -421,19 +428,19 @@ delete_this_much = total_size - cache_limit self.__clean_cache(file_list, delete_this_much) self.sleeper.sleep(30) # Test cache size every 30 seconds? - + def __clean_cache(self, file_list, delete_this_much): """ Keep deleting files from the file_list until the size of the deleted files is greater than the value in delete_this_much parameter. - + :type file_list: list :param file_list: List of candidate files that can be deleted. This method will start deleting files from the beginning of the list so the list should be sorted accordingly. 
The list must contain 3-element tuples, positioned as follows: position 0 holds file last accessed timestamp - (as time.struct_time), position 1 holds file path, and position 2 has + (as time.struct_time), position 1 holds file path, and position 2 has file size (e.g., (<access time>, /mnt/data/dataset_1.dat), 472394) - + :type delete_this_much: int :param delete_this_much: Total size of files, in bytes, that should be deleted. """ @@ -454,7 +461,7 @@ else: log.debug("Cache cleaning done. Total space freed: %s" % convert_bytes(deleted_amount)) return - + def _get_bucket(self, bucket_name): """ Sometimes a handle to a bucket is not established right away so try it a few times. Raise error if connection is not established. """ @@ -463,13 +470,13 @@ bucket = self.s3_conn.get_bucket(bucket_name) log.debug("Using S3 object store; got bucket '%s'" % bucket.name) return bucket - except S3ResponseError: + except S3ResponseError: log.debug("Could not get bucket '%s', attempt %s/5" % (bucket_name, i+1)) time.sleep(2) # All the attempts have been exhausted and connection was not established, # raise error raise S3ResponseError - + def _fix_permissions(self, rel_path): """ Set permissions on rel_path""" for basedir, dirs, files in os.walk(rel_path): @@ -478,10 +485,10 @@ path = os.path.join(basedir, f) # Ignore symlinks if os.path.islink(path): - continue + continue util.umask_fix_perms( path, self.config.umask, 0666, self.config.gid ) - - def _construct_path(self, obj, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None): + + def _construct_path(self, obj, dir_only=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, **kwargs): rel_path = os.path.join(*directory_hash_id(obj.id)) if extra_dir is not None: if extra_dir_at_root: @@ -496,10 +503,10 @@ def _get_cache_path(self, rel_path): return os.path.abspath(os.path.join(self.staging_path, rel_path)) - + def _get_transfer_progress(self): return self.transfer_progress - + def _get_size_in_s3(self, 
rel_path): try: key = self.bucket.get_key(rel_path) @@ -510,7 +517,7 @@ except Exception, ex: log.error("Could not get reference to the key object '%s'; returning -1 for key size: %s" % (rel_path, ex)) return -1 - + def _key_exists(self, rel_path): exists = False try: @@ -532,15 +539,13 @@ if rel_path[0] == '/': raise return exists - + def _in_cache(self, rel_path): """ Check if the given dataset is in the local cache and return True if so. """ # log.debug("------ Checking cache for rel_path %s" % rel_path) cache_path = self._get_cache_path(rel_path) - exists = os.path.exists(cache_path) - # print "Checking chache for %s; returning %s" % (cache_path, exists) - return exists - # EATODO: Part of checking if a file is in cache should be to ensure the + return os.path.exists(cache_path) + # TODO: Part of checking if a file is in cache should be to ensure the # size of the cached file matches that on S3. Once the upload tool explicitly # creates, this check should be implemented - in the meantime, it's not # looking likely to be implementable reliably. 
@@ -562,11 +567,7 @@ # # print "***3 %s found in cache but not in S3 (in_cache=True)" % cache_path # exists = True # else: - # # print "***4 %s does not exist (in_cache=False)" % cache_path - # exists = False - # # print "Checking cache for %s; returning %s" % (cache_path, exists) - # return exists - # # return False + # return False def _pull_into_cache(self, rel_path): # Ensure the cache directory structure exists (e.g., dataset_#_files/) @@ -577,45 +578,39 @@ ok = self._download(rel_path) self._fix_permissions(self._get_cache_path(rel_path_dir)) return ok - + def _transfer_cb(self, complete, total): self.transfer_progress += 10 - # print "Dataset transfer progress: %s" % self.transfer_progress - + def _download(self, rel_path): try: log.debug("Pulling key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) key = self.bucket.get_key(rel_path) - # Test is cache is large enough to hold the new file - if key.size > self.cache_size: + # Test if cache is large enough to hold the new file + if self.cache_size > 0 and key.size > self.cache_size: log.critical("File %s is larger (%s) than the cache size (%s). Cannot download." 
\ % (rel_path, key.size, self.cache_size)) return False - # Test if 'axel' is available for parallel download and pull the key into cache - try: - ret_code = subprocess.call('axel') - except OSError: - ret_code = 127 - if ret_code == 127: - self.transfer_progress = 0 # Reset transfer progress counter - key.get_contents_to_filename(self._get_cache_path(rel_path), cb=self._transfer_cb, num_cb=10) - #print "(ssss1) Pulled key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path)) - return True - else: + if self.use_axel: + log.debug("Parallel pulled key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) ncores = multiprocessing.cpu_count() url = key.generate_url(7200) ret_code = subprocess.call("axel -a -n %s '%s'" % (ncores, url)) if ret_code == 0: - #print "(ssss2) Parallel pulled key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path)) return True + else: + log.debug("Pulled key '%s' into cache to %s" % (rel_path, self._get_cache_path(rel_path))) + self.transfer_progress = 0 # Reset transfer progress counter + key.get_contents_to_filename(self._get_cache_path(rel_path), cb=self._transfer_cb, num_cb=10) + return True except S3ResponseError, ex: log.error("Problem downloading key '%s' from S3 bucket '%s': %s" % (rel_path, self.bucket.name, ex)) return False - + def _push_to_s3(self, rel_path, source_file=None, from_string=None): - """ - Push the file pointed to by `rel_path` to S3 naming the key `rel_path`. - If `source_file` is provided, push that file instead while still using + """ + Push the file pointed to by `rel_path` to S3 naming the key `rel_path`. + If `source_file` is provided, push that file instead while still using `rel_path` as the key name. 
If `from_string` is provided, set contents of the file to the value of the string @@ -651,16 +646,16 @@ except S3ResponseError, ex: log.error("Trouble pushing S3 key '%s' from file '%s': %s" % (rel_path, source_file, ex)) return False - + def file_ready(self, obj, **kwargs): - """ A helper method that checks if a file corresponding to a dataset + """ A helper method that checks if a file corresponding to a dataset is ready and available to be used. Return True if so, False otherwise.""" rel_path = self._construct_path(obj, **kwargs) # Make sure the size in cache is available in its entirety if self._in_cache(rel_path) and os.path.getsize(self._get_cache_path(rel_path)) == self._get_size_in_s3(rel_path): return True return False - + def exists(self, obj, **kwargs): in_cache = in_s3 = False rel_path = self._construct_path(obj, **kwargs) @@ -685,10 +680,10 @@ return True else: return False - + def create(self, obj, **kwargs): if not self.exists(obj, **kwargs): - #print "S3 OS creating a dataset with ID %s" % dataset_id + #print "S3 OS creating a dataset with ID %s" % kwargs # Pull out locally used fields extra_dir = kwargs.get('extra_dir', None) extra_dir_at_root = kwargs.get('extra_dir_at_root', False) @@ -696,7 +691,8 @@ alt_name = kwargs.get('alt_name', None) # print "---- Processing: %s; %s" % (alt_name, locals()) # Construct hashed path - rel_path = os.path.join(*directory_hash_id(obj)) + rel_path = os.path.join(*directory_hash_id(obj.id)) + # Optionally append extra_dir if extra_dir is not None: if extra_dir_at_root: @@ -717,13 +713,13 @@ rel_path = os.path.join(rel_path, alt_name if alt_name else "dataset_%s.dat" % obj.id) open(os.path.join(self.staging_path, rel_path), 'w').close() self._push_to_s3(rel_path, from_string='') - + def empty(self, obj, **kwargs): if self.exists(obj, **kwargs): return bool(self.size(obj, **kwargs) > 0) else: raise ObjectNotFound() - + def size(self, obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) if 
self._in_cache(rel_path): @@ -735,7 +731,7 @@ return self._get_size_in_s3(rel_path) log.warning("Did not find dataset '%s', returning 0 for size" % rel_path) return 0 - + def delete(self, obj, entire_dir=False, **kwargs): rel_path = self._construct_path(obj, **kwargs) extra_dir = kwargs.get('extra_dir', None) @@ -765,7 +761,7 @@ except OSError, ex: log.error('%s delete error %s' % (self._get_filename(obj, **kwargs), ex)) return False - + def get_data(self, obj, start=0, count=-1, **kwargs): rel_path = self._construct_path(obj, **kwargs) # Check cache first and get file if not there @@ -779,7 +775,7 @@ content = data_file.read(count) data_file.close() return content - + def get_filename(self, obj, **kwargs): #print "S3 get_filename for dataset: %s" % dataset_id dir_only = kwargs.get('dir_only', False) @@ -809,8 +805,8 @@ # return cache_path raise ObjectNotFound() # return cache_path # Until the upload tool does not explicitly create the dataset, return expected path - - def update_from_file(self, obj, file_name=None, create=False, **kwargs): + + def update_from_file(self, obj, file_name=None, create=False, **kwargs): if create: self.create(obj, **kwargs) if self.exists(obj, **kwargs): @@ -833,7 +829,7 @@ self._push_to_s3(rel_path, source_file) else: raise ObjectNotFound() - + def get_object_url(self, obj, **kwargs): if self.exists(obj, **kwargs): rel_path = self._construct_path(obj, **kwargs) @@ -854,7 +850,7 @@ first store where the object exists is used, objects are created in a store selected randomly, but with weighting. """ - + def __init__(self, config): super(DistributedObjectStore, self).__init__() self.distributed_config = config.distributed_object_store_config_file @@ -1003,7 +999,7 @@ first store where the object exists is used, objects are always created in the first store. 
""" - + def __init__(self, backends=[]): super(HierarchicalObjectStore, self).__init__() @@ -1027,7 +1023,7 @@ if bytes is None: bytes = 0 bytes = float(bytes) - + if bytes >= 1099511627776: terabytes = bytes / 1099511627776 size = '%.2fTB' % terabytes Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.