1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/40ab1d00a1b6/
Changeset:   40ab1d00a1b6
User:        dannon
Date:        2015-02-10 14:49:01+00:00
Summary:     Merged in charles_hsu/galaxy-central/fix-multipart-upload-in-swift-backend (pull request #648)

             Fixed multipart-upload issue while using swift as object storage backend
Affected #:  3 files

diff -r c3eefbdaaa1ab242a1c81b65482ef2fbe943a390 -r 40ab1d00a1b6ef6749998d70cdd5bf7d8e19befa config/object_store_conf.xml.sample
--- a/config/object_store_conf.xml.sample
+++ b/config/object_store_conf.xml.sample
@@ -24,8 +24,8 @@
 <object_store type="s3">
     <auth access_key="...." secret_key="....." />
-    <bucket name="unique_bucket_name" use_reduced_redundancy="False" />
-    <connection host="" port="" is_secure="" conn_path="" />
+    <bucket name="unique_bucket_name" use_reduced_redundancy="False" max_chunk_size="250"/>
+    <connection host="" port="" is_secure="" conn_path="" multipart="True"/>
     <cache path="database/files/" size="100" />
 </object_store>
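For context, these new attributes are what let multipart uploads work against a Swift endpoint. A minimal sketch of a Swift-flavoured configuration follows; it assumes the swift object store accepts the same <auth>/<bucket>/<connection>/<cache> elements as the s3 sample above, and the host, port, conn_path and credential values are placeholders only:

    <object_store type="swift">
        <auth access_key="...." secret_key="....." />
        <!-- max_chunk_size bounds the size (in MB) of each uploaded part -->
        <bucket name="unique_bucket_name" use_reduced_redundancy="False" max_chunk_size="250"/>
        <!-- multipart="True" enables chunked uploads; "False" falls back to a single PUT per object -->
        <connection host="swift.example.org" port="8080" is_secure="False" conn_path="/" multipart="True"/>
        <cache path="database/files/" size="100" />
    </object_store>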
log.exception("Malformed ObjectStore Configuration XML -- unable to continue") @@ -328,14 +343,14 @@ start_time = datetime.now() log.debug("Pushing cache file '%s' of size %s bytes to key '%s'" % (source_file, os.path.getsize(source_file), rel_path)) mb_size = os.path.getsize(source_file) / 1e6 - if mb_size < 10 or type(self) == SwiftObjectStore: + if mb_size < 10 or (not self.multipart): self.transfer_progress = 0 # Reset transfer progress counter key.set_contents_from_filename(source_file, reduced_redundancy=self.use_rr, cb=self._transfer_cb, num_cb=10) else: - multipart_upload(self.bucket, key.name, source_file, mb_size, self.access_key, self.secret_key, use_rr=self.use_rr) + multipart_upload(self.s3server, self.bucket, key.name, source_file, mb_size) end_time = datetime.now() log.debug("Pushed cache file '%s' to key '%s' (%s bytes transfered in %s sec)" % (source_file, rel_path, os.path.getsize(source_file), end_time - start_time)) return True diff -r c3eefbdaaa1ab242a1c81b65482ef2fbe943a390 -r 40ab1d00a1b6ef6749998d70cdd5bf7d8e19befa lib/galaxy/objectstore/s3_multipart_upload.py --- a/lib/galaxy/objectstore/s3_multipart_upload.py +++ b/lib/galaxy/objectstore/s3_multipart_upload.py @@ -34,13 +34,23 @@ return wrapper -def mp_from_ids(mp_id, mp_keyname, mp_bucketname, aws_access_key_id, aws_secret_access_key): +def mp_from_ids(s3server, mp_id, mp_keyname, mp_bucketname): """Get the multipart upload from the bucket and multipart IDs. This allows us to reconstitute a connection to the upload from within multiprocessing functions. """ - conn = S3Connection(aws_access_key_id, aws_secret_access_key) + if s3server['host']: + conn = boto.connect_s3(aws_access_key_id=s3server['access_key'], + aws_secret_access_key=s3server['secret_key'], + is_secure=s3server['is_secure'], + host=s3server['host'], + port=s3server['port'], + calling_format=boto.s3.connection.OrdinaryCallingFormat(), + path=s3server['conn_path']) + else: + conn = S3Connection(s3server['access_key'], s3server['secret_key']) + bucket = conn.lookup(mp_bucketname) mp = boto.s3.multipart.MultiPartUpload(bucket) mp.key_name = mp_keyname @@ -49,33 +59,36 @@ @map_wrap -def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part, aws_access_key_id, aws_secret_access_key): +def transfer_part(s3server, mp_id, mp_keyname, mp_bucketname, i, part): """Transfer a part of a multipart upload. Designed to be run in parallel. """ - mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname, aws_access_key_id, aws_secret_access_key) + mp = mp_from_ids(s3server, mp_id, mp_keyname, mp_bucketname) with open(part) as t_handle: mp.upload_part_from_file(t_handle, i + 1) os.remove(part) -def multipart_upload(bucket, s3_key_name, tarball, mb_size, aws_access_key_id, aws_secret_access_key, use_rr=True): +def multipart_upload(s3server, bucket, s3_key_name, tarball, mb_size): """Upload large files using Amazon's multipart upload functionality. 
""" cores = multiprocessing.cpu_count() def split_file(in_file, mb_size, split_num=5): prefix = os.path.join(os.path.dirname(in_file), - "%sS3PART" % (os.path.basename(s3_key_name))) - # Split chunks so they are 5MB < chunk < 250MB - split_size = int(max(min(mb_size / (split_num * 2.0), 250), 5)) + "%sS3PART" % (os.path.basename(s3_key_name))) + max_chunk = s3server['max_chunk_size'] + # Split chunks so they are 5MB < chunk < 250MB(max_chunk_size) + split_size = int(max(min(mb_size / (split_num * 2.0), max_chunk), 5)) if not os.path.exists("%saa" % prefix): cl = ["split", "-b%sm" % split_size, in_file, prefix] subprocess.check_call(cl) return sorted(glob.glob("%s*" % prefix)) - mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr) + mp = bucket.initiate_multipart_upload(s3_key_name, + reduced_redundancy=s3server['use_rr']) + with multimap(cores) as pmap: - for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part, aws_access_key_id, aws_secret_access_key) + for _ in pmap(transfer_part, ((s3server, mp.id, mp.key_name, mp.bucket_name, i, part) for (i, part) in enumerate(split_file(tarball, mb_size, cores)))): pass Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.