commit/galaxy-central: 2 new changesets
2 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/dd4ecbbbbab3/ changeset: dd4ecbbbbab3 user: jmchilton date: 2013-02-07 21:18:33 summary: Update lwr_client through https://bitbucket.org/jmchilton/lwr/commits/3034b5cb789a6c96b005b838542c6e7c.... UChicago reported some issues with the use of mmap in the LWR client for large files. To get around this, I have implemented an optional alternative transport layer for the LWR client that is backed by pycurl instead of urllib2. This can be enabled by setting the environment variable LWR_CURL_TRANSPORT=1 for the Galaxy process. If LWR_CURL_TRANSPORT is set, the python pycurl package must be installed. affected #: 4 files diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r dd4ecbbbbab33020fb5ff482fc302f092824e514 lib/galaxy/jobs/runners/lwr_client/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -5,15 +5,15 @@ This module contains logic for interfacing with an external LWR server. """ -import mmap import os import re import time import urllib -import urllib2 import simplejson +from transport import get_transport + class JobInputs(object): """ @@ -254,6 +254,18 @@ return self.job_inputs.rewritten_command_line +class parseJson(object): + + def __init__(self): + pass + + def __call__(self, func): + def replacement(*args, **kwargs): + response = func(*args, **kwargs) + return simplejson.loads(response) + return replacement + + class Client(object): """ Objects of this client class perform low-level communication with a remote LWR server. @@ -283,9 +295,7 @@ self.remote_host = remote_host self.job_id = job_id self.private_key = private_key - - def _url_open(self, request, data): - return urllib2.urlopen(request, data) + self.transport = get_transport() def __build_url(self, command, args): if self.private_key: @@ -294,29 +304,20 @@ url = self.remote_host + command + "?" 
+ data return url - def __raw_execute(self, command, args={}, data=None): + def __raw_execute(self, command, args={}, data=None, input_path=None, output_path=None): url = self.__build_url(command, args) - request = urllib2.Request(url=url, data=data) - response = self._url_open(request, data) + response = self.transport.execute(url, data=data, input_path=input_path, output_path=output_path) return response - def __raw_execute_and_parse(self, command, args={}, data=None): - response = self.__raw_execute(command, args, data) - return simplejson.loads(response.read()) - + @parseJson() def __upload_file(self, action, path, name=None, contents=None): - input = open(path, 'rb') - try: - mmapped_input = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ) - return self.__upload_contents(action, path, mmapped_input, name) - finally: - input.close() - - def __upload_contents(self, action, path, contents, name=None): if not name: name = os.path.basename(path) args = {"job_id": self.job_id, "name": name} - return self.__raw_execute_and_parse(action, args, contents) + input_path = path + if contents: + input_path = None + return self.__raw_execute(action, args, contents, input_path) def upload_tool_file(self, path): """ @@ -364,7 +365,7 @@ contents : str Rewritten contents of the config file to upload. 
""" - return self.__upload_contents("upload_config_file", path, contents) + return self.__upload_file("upload_config_file", path, contents=contents) def upload_working_directory_file(self, path): """ @@ -378,9 +379,10 @@ """ return self.__upload_file("upload_working_directory_file", path) + @parseJson() def _get_output_type(self, name): - return self.__raw_execute_and_parse("get_output_type", {"name": name, - "job_id": self.job_id}) + return self.__raw_execute("get_output_type", {"name": name, + "job_id": self.job_id}) def download_work_dir_output(self, source, working_directory, output_path): """ @@ -414,25 +416,19 @@ name = os.path.basename(path) output_type = self._get_output_type(name) if output_type == "direct": - output = open(path, "wb") + output_path = path elif output_type == "task": - output = open(os.path.join(working_directory, name), "wb") + output_path = os.path.join(working_directory, name) else: raise Exception("No remote output found for dataset with path %s" % path) - self.__raw_download_output(name, self.job_id, output_type, output) + self.__raw_download_output(name, self.job_id, output_type, output_path) - def __raw_download_output(self, name, job_id, output_type, output_file): - response = self.__raw_execute("download_output", {"name": name, - "job_id": self.job_id, - "output_type": output_type}) - try: - while True: - buffer = response.read(1024) - if buffer == "": - break - output_file.write(buffer) - finally: - output_file.close() + def __raw_download_output(self, name, job_id, output_type, output_path): + self.__raw_execute("download_output", + {"name": name, + "job_id": self.job_id, + "output_type": output_type}, + output_path=output_path) def launch(self, command_line): """ @@ -463,11 +459,12 @@ return complete_response time.sleep(1) + @parseJson() def raw_check_complete(self): """ Get check_complete response from the remote server. 
""" - check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id": self.job_id}) + check_complete_response = self.__raw_execute("check_complete", {"job_id": self.job_id}) return check_complete_response def check_complete(self): @@ -482,11 +479,12 @@ """ self.__raw_execute("clean", {"job_id": self.job_id}) + @parseJson() def setup(self): """ Setup remote LWR server to run this job. """ - return self.__raw_execute_and_parse("setup", {"job_id": self.job_id}) + return self.__raw_execute("setup", {"job_id": self.job_id}) def _read(path): diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r dd4ecbbbbab33020fb5ff482fc302f092824e514 lib/galaxy/jobs/runners/lwr_client/transport/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/transport/__init__.py @@ -0,0 +1,16 @@ +from standard import Urllib2Transport +from curl import PycurlTransport +import os + + +def get_transport(os_module=os): + use_curl = os_module.getenv('LWR_CURL_TRANSPORT', "0") + ## If LWR_CURL_TRANSPORT is unset or set to 0, use default, + ## else use curl. + if use_curl.isdigit() and not int(use_curl): + return Urllib2Transport() + else: + return PycurlTransport() + + +__all__ = [get_transport] diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r dd4ecbbbbab33020fb5ff482fc302f092824e514 lib/galaxy/jobs/runners/lwr_client/transport/curl.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/transport/curl.py @@ -0,0 +1,42 @@ +from cStringIO import StringIO +try: + from pycurl import Curl +except: + pass +from os.path import getsize + + +PYCURL_UNAVAILABLE_MESSAGE = \ + "You are attempting to use the Pycurl version of the LWR client by pycurl is unavailable." 
+ + +class PycurlTransport(object): + + def execute(self, url, data=None, input_path=None, output_path=None): + buf = self._open_output(output_path) + try: + c = self._new_curl_object() + c.setopt(c.URL, url.encode('ascii')) + c.setopt(c.WRITEFUNCTION, buf.write) + if input_path: + c.setopt(c.UPLOAD, 1) + c.setopt(c.READFUNCTION, open(input_path, 'rb').read) + filesize = getsize(input_path) + c.setopt(c.INFILESIZE, filesize) + if data: + c.setopt(c.POST, 1) + c.setopt(c.POSTFIELDS, data) + c.perform() + if not output_path: + return buf.getvalue() + finally: + buf.close() + + def _new_curl_object(self): + try: + return Curl() + except NameError: + raise ImportError(PYCURL_UNAVAILABLE_MESSAGE) + + def _open_output(self, output_path): + return open(output_path, 'wb') if output_path else StringIO() diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r dd4ecbbbbab33020fb5ff482fc302f092824e514 lib/galaxy/jobs/runners/lwr_client/transport/standard.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/transport/standard.py @@ -0,0 +1,33 @@ +""" +LWR HTTP Client layer based on Python Standard Library (urllib2) +""" +import mmap +import urllib2 + + +class Urllib2Transport(object): + + def _url_open(self, request, data): + return urllib2.urlopen(request, data) + + def execute(self, url, data=None, input_path=None, output_path=None): + request = urllib2.Request(url=url, data=data) + input = None + try: + if input_path: + input = open(input_path, 'rb') + data = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ) + response = self._url_open(request, data) + finally: + if input: + input.close() + if output_path: + with open(output_path, 'wb') as output: + while True: + buffer = response.read(1024) + if buffer == "": + break + output.write(buffer) + return response + else: + return response.read() https://bitbucket.org/galaxy/galaxy-central/commits/ecbfab5f9f1b/ changeset: ecbfab5f9f1b user: natefoo date: 2013-02-25 21:16:30 summary: Merged in jmchilton/galaxy-central-lwr 
(pull request #118) Implement optional, alternative pycurl backend for LWR client. affected #: 4 files diff -r fa34924860aaa282fe3c3021a257f2523848a6e6 -r ecbfab5f9f1b070bda03520700335d800b8fc761 lib/galaxy/jobs/runners/lwr_client/__init__.py --- a/lib/galaxy/jobs/runners/lwr_client/__init__.py +++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py @@ -5,15 +5,15 @@ This module contains logic for interfacing with an external LWR server. """ -import mmap import os import re import time import urllib -import urllib2 import simplejson +from transport import get_transport + class JobInputs(object): """ @@ -254,6 +254,18 @@ return self.job_inputs.rewritten_command_line +class parseJson(object): + + def __init__(self): + pass + + def __call__(self, func): + def replacement(*args, **kwargs): + response = func(*args, **kwargs) + return simplejson.loads(response) + return replacement + + class Client(object): """ Objects of this client class perform low-level communication with a remote LWR server. @@ -283,9 +295,7 @@ self.remote_host = remote_host self.job_id = job_id self.private_key = private_key - - def _url_open(self, request, data): - return urllib2.urlopen(request, data) + self.transport = get_transport() def __build_url(self, command, args): if self.private_key: @@ -294,29 +304,20 @@ url = self.remote_host + command + "?" 
+ data return url - def __raw_execute(self, command, args={}, data=None): + def __raw_execute(self, command, args={}, data=None, input_path=None, output_path=None): url = self.__build_url(command, args) - request = urllib2.Request(url=url, data=data) - response = self._url_open(request, data) + response = self.transport.execute(url, data=data, input_path=input_path, output_path=output_path) return response - def __raw_execute_and_parse(self, command, args={}, data=None): - response = self.__raw_execute(command, args, data) - return simplejson.loads(response.read()) - + @parseJson() def __upload_file(self, action, path, name=None, contents=None): - input = open(path, 'rb') - try: - mmapped_input = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ) - return self.__upload_contents(action, path, mmapped_input, name) - finally: - input.close() - - def __upload_contents(self, action, path, contents, name=None): if not name: name = os.path.basename(path) args = {"job_id": self.job_id, "name": name} - return self.__raw_execute_and_parse(action, args, contents) + input_path = path + if contents: + input_path = None + return self.__raw_execute(action, args, contents, input_path) def upload_tool_file(self, path): """ @@ -364,7 +365,7 @@ contents : str Rewritten contents of the config file to upload. 
""" - return self.__upload_contents("upload_config_file", path, contents) + return self.__upload_file("upload_config_file", path, contents=contents) def upload_working_directory_file(self, path): """ @@ -378,9 +379,10 @@ """ return self.__upload_file("upload_working_directory_file", path) + @parseJson() def _get_output_type(self, name): - return self.__raw_execute_and_parse("get_output_type", {"name": name, - "job_id": self.job_id}) + return self.__raw_execute("get_output_type", {"name": name, + "job_id": self.job_id}) def download_work_dir_output(self, source, working_directory, output_path): """ @@ -414,25 +416,19 @@ name = os.path.basename(path) output_type = self._get_output_type(name) if output_type == "direct": - output = open(path, "wb") + output_path = path elif output_type == "task": - output = open(os.path.join(working_directory, name), "wb") + output_path = os.path.join(working_directory, name) else: raise Exception("No remote output found for dataset with path %s" % path) - self.__raw_download_output(name, self.job_id, output_type, output) + self.__raw_download_output(name, self.job_id, output_type, output_path) - def __raw_download_output(self, name, job_id, output_type, output_file): - response = self.__raw_execute("download_output", {"name": name, - "job_id": self.job_id, - "output_type": output_type}) - try: - while True: - buffer = response.read(1024) - if buffer == "": - break - output_file.write(buffer) - finally: - output_file.close() + def __raw_download_output(self, name, job_id, output_type, output_path): + self.__raw_execute("download_output", + {"name": name, + "job_id": self.job_id, + "output_type": output_type}, + output_path=output_path) def launch(self, command_line): """ @@ -463,11 +459,12 @@ return complete_response time.sleep(1) + @parseJson() def raw_check_complete(self): """ Get check_complete response from the remote server. 
""" - check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id": self.job_id}) + check_complete_response = self.__raw_execute("check_complete", {"job_id": self.job_id}) return check_complete_response def check_complete(self): @@ -482,11 +479,12 @@ """ self.__raw_execute("clean", {"job_id": self.job_id}) + @parseJson() def setup(self): """ Setup remote LWR server to run this job. """ - return self.__raw_execute_and_parse("setup", {"job_id": self.job_id}) + return self.__raw_execute("setup", {"job_id": self.job_id}) def _read(path): diff -r fa34924860aaa282fe3c3021a257f2523848a6e6 -r ecbfab5f9f1b070bda03520700335d800b8fc761 lib/galaxy/jobs/runners/lwr_client/transport/__init__.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/transport/__init__.py @@ -0,0 +1,16 @@ +from standard import Urllib2Transport +from curl import PycurlTransport +import os + + +def get_transport(os_module=os): + use_curl = os_module.getenv('LWR_CURL_TRANSPORT', "0") + ## If LWR_CURL_TRANSPORT is unset or set to 0, use default, + ## else use curl. + if use_curl.isdigit() and not int(use_curl): + return Urllib2Transport() + else: + return PycurlTransport() + + +__all__ = [get_transport] diff -r fa34924860aaa282fe3c3021a257f2523848a6e6 -r ecbfab5f9f1b070bda03520700335d800b8fc761 lib/galaxy/jobs/runners/lwr_client/transport/curl.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/transport/curl.py @@ -0,0 +1,42 @@ +from cStringIO import StringIO +try: + from pycurl import Curl +except: + pass +from os.path import getsize + + +PYCURL_UNAVAILABLE_MESSAGE = \ + "You are attempting to use the Pycurl version of the LWR client by pycurl is unavailable." 
+ + +class PycurlTransport(object): + + def execute(self, url, data=None, input_path=None, output_path=None): + buf = self._open_output(output_path) + try: + c = self._new_curl_object() + c.setopt(c.URL, url.encode('ascii')) + c.setopt(c.WRITEFUNCTION, buf.write) + if input_path: + c.setopt(c.UPLOAD, 1) + c.setopt(c.READFUNCTION, open(input_path, 'rb').read) + filesize = getsize(input_path) + c.setopt(c.INFILESIZE, filesize) + if data: + c.setopt(c.POST, 1) + c.setopt(c.POSTFIELDS, data) + c.perform() + if not output_path: + return buf.getvalue() + finally: + buf.close() + + def _new_curl_object(self): + try: + return Curl() + except NameError: + raise ImportError(PYCURL_UNAVAILABLE_MESSAGE) + + def _open_output(self, output_path): + return open(output_path, 'wb') if output_path else StringIO() diff -r fa34924860aaa282fe3c3021a257f2523848a6e6 -r ecbfab5f9f1b070bda03520700335d800b8fc761 lib/galaxy/jobs/runners/lwr_client/transport/standard.py --- /dev/null +++ b/lib/galaxy/jobs/runners/lwr_client/transport/standard.py @@ -0,0 +1,33 @@ +""" +LWR HTTP Client layer based on Python Standard Library (urllib2) +""" +import mmap +import urllib2 + + +class Urllib2Transport(object): + + def _url_open(self, request, data): + return urllib2.urlopen(request, data) + + def execute(self, url, data=None, input_path=None, output_path=None): + request = urllib2.Request(url=url, data=data) + input = None + try: + if input_path: + input = open(input_path, 'rb') + data = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ) + response = self._url_open(request, data) + finally: + if input: + input.close() + if output_path: + with open(output_path, 'wb') as output: + while True: + buffer = response.read(1024) + if buffer == "": + break + output.write(buffer) + return response + else: + return response.read() Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. 
You are receiving this because you have the service enabled and this address is configured as its recipient.
participants (1)
-
commits-noreply@bitbucket.org