commit/galaxy-central: 3 new changesets
3 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/fa9860db38e2/ Changeset: fa9860db38e2 User: dannon Date: 2014-03-18 19:55:11 Summary: Pep8, cruft cleanup -- s3_multipart_upload Affected #: 1 file diff -r 2043fb8cec50582ba5620b399bb55ad7dffbd72a -r fa9860db38e2209d91648f09e5fbf9937a2ff7f9 lib/galaxy/objectstore/s3_multipart_upload.py --- a/lib/galaxy/objectstore/s3_multipart_upload.py +++ b/lib/galaxy/objectstore/s3_multipart_upload.py @@ -5,13 +5,13 @@ Code mostly taken form CloudBioLinux. """ -import os -import glob -import subprocess import contextlib import functools +import glob +import multiprocessing +import os +import subprocess -import multiprocessing from multiprocessing.pool import IMapIterator try: @@ -32,6 +32,7 @@ return apply(f, *args, **kwargs) return wrapper + def mp_from_ids(mp_id, mp_keyname, mp_bucketname): """Get the multipart upload from the bucket and multipart IDs. @@ -45,21 +46,22 @@ mp.id = mp_id return mp + @map_wrap def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part): """Transfer a part of a multipart upload. Designed to be run in parallel. """ mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname) - #print " Transferring", i, part with open(part) as t_handle: - mp.upload_part_from_file(t_handle, i+1) + mp.upload_part_from_file(t_handle, i + 1) os.remove(part) + def multipart_upload(bucket, s3_key_name, tarball, mb_size, use_rr=True): """Upload large files using Amazon's multipart upload functionality. """ cores = multiprocessing.cpu_count() - #print "Initiating multipart upload using %s cores" % cores + def split_file(in_file, mb_size, split_num=5): prefix = os.path.join(os.path.dirname(in_file), "%sS3PART" % (os.path.basename(s3_key_name))) @@ -78,6 +80,7 @@ pass mp.complete_upload() + @contextlib.contextmanager def multimap(cores=None): """Provide multiprocessing imap like function. @@ -87,6 +90,7 @@ """ if cores is None: cores = max(multiprocessing.cpu_count() - 1, 1) + def wrapper(func): def wrap(self, timeout=None): return func(self, timeout=timeout if timeout is not None else 1e100) https://bitbucket.org/galaxy/galaxy-central/commits/c115742f0c63/ Changeset: c115742f0c63 User: dannon Date: 2014-03-18 20:41:06 Summary: Force multipart upload to, instead of using environment variables via connect_s3, use keys provided in the objectstore. Affected #: 1 file diff -r fa9860db38e2209d91648f09e5fbf9937a2ff7f9 -r c115742f0c6319b823c277c80cfece08150bea45 lib/galaxy/objectstore/s3_multipart_upload.py --- a/lib/galaxy/objectstore/s3_multipart_upload.py +++ b/lib/galaxy/objectstore/s3_multipart_upload.py @@ -22,6 +22,7 @@ try: import boto + from boto.s3.connection import S3Connection except ImportError: boto = None @@ -33,13 +34,13 @@ return wrapper -def mp_from_ids(mp_id, mp_keyname, mp_bucketname): +def mp_from_ids(mp_id, mp_keyname, mp_bucketname, aws_access_key_id, aws_secret_access_key): """Get the multipart upload from the bucket and multipart IDs. This allows us to reconstitute a connection to the upload from within multiprocessing functions. """ - conn = boto.connect_s3() + conn = S3Connection(aws_access_key_id, aws_secret_access_key) bucket = conn.lookup(mp_bucketname) mp = boto.s3.multipart.MultiPartUpload(bucket) mp.key_name = mp_keyname @@ -48,16 +49,16 @@ @map_wrap -def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part): +def transfer_part(mp_id, mp_keyname, mp_bucketname, i, part, aws_access_key_id, aws_secret_access_key): """Transfer a part of a multipart upload. Designed to be run in parallel. """ - mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname) + mp = mp_from_ids(mp_id, mp_keyname, mp_bucketname, aws_access_key_id, aws_secret_access_key) with open(part) as t_handle: mp.upload_part_from_file(t_handle, i + 1) os.remove(part) -def multipart_upload(bucket, s3_key_name, tarball, mb_size, use_rr=True): +def multipart_upload(bucket, s3_key_name, tarball, mb_size, aws_access_key_id, aws_secret_access_key, use_rr=True): """Upload large files using Amazon's multipart upload functionality. """ cores = multiprocessing.cpu_count() @@ -74,7 +75,7 @@ mp = bucket.initiate_multipart_upload(s3_key_name, reduced_redundancy=use_rr) with multimap(cores) as pmap: - for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part) + for _ in pmap(transfer_part, ((mp.id, mp.key_name, mp.bucket_name, i, part, aws_access_key_id, aws_secret_access_key) for (i, part) in enumerate(split_file(tarball, mb_size, cores)))): pass https://bitbucket.org/galaxy/galaxy-central/commits/c2ee53fa3589/ Changeset: c2ee53fa3589 User: dannon Date: 2014-03-18 20:42:08 Summary: S3ObjectStore import cleanup, fix (forgotten) new multipart upload params. Affected #: 1 file diff -r c115742f0c6319b823c277c80cfece08150bea45 -r c2ee53fa3589fa2852c056ac65c4656e1d88632a lib/galaxy/objectstore/s3.py --- a/lib/galaxy/objectstore/s3.py +++ b/lib/galaxy/objectstore/s3.py @@ -2,22 +2,23 @@ Object Store plugin for the Amazon Simple Storage Service (S3) """ +import logging +import multiprocessing import os +import shutil +import subprocess +import threading import time -import shutil -import logging -import threading -import subprocess + from datetime import datetime +from galaxy.exceptions import ObjectNotFound from galaxy.util import umask_fix_perms +from galaxy.util.directory_hash import directory_hash_id from galaxy.util.sleeper import Sleeper -from galaxy.util.directory_hash import directory_hash_id +from .s3_multipart_upload import multipart_upload from ..objectstore import ObjectStore, convert_bytes -from galaxy.exceptions import ObjectNotFound -import multiprocessing -from .s3_multipart_upload import multipart_upload try: import boto from boto.s3.key import Key @@ -326,7 +327,7 @@ cb=self._transfer_cb, num_cb=10) else: - multipart_upload(self.bucket, key.name, source_file, mb_size, use_rr=self.use_rr) + multipart_upload(self.bucket, key.name, source_file, mb_size, self.access_key, self.secret_key, use_rr=self.use_rr) end_time = datetime.now() log.debug("Pushed cache file '%s' to key '%s' (%s bytes transfered in %s sec)" % (source_file, rel_path, os.path.getsize(source_file), end_time - start_time)) return True Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org