2 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/0405422ab342/ Changeset: 0405422ab342 User: natefoo Date: 2014-05-13 22:22:03 Summary: Allow specification of SSL client authentication parameters to the kombu connection. This is done via the `amqp_connect_ssl_*` options in `server.ini`. Affected #: 5 files diff -r e5b545b85b836b6f1be76164543d7e4613f25d87 -r 0405422ab3422f1b1373c0b00398885a4d8423f6 job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -41,6 +41,16 @@ <!-- If multiple managers configured on the LWR, specify which one this plugin targets. --><!-- <param id="manager">_default_</param> --> + <!-- The AMQP client can provide an SSL client certificate (e.g. for + validation), the following options configure that certificate + (see for reference: + http://kombu.readthedocs.org/en/latest/reference/kombu.connection.html + ). If you simply want to use SSL but not use/validate a client + cert, just use the ?ssl=1 query on the amqp URL instead. --> + <!-- <param id="amqp_connect_ssl_ca_certs">/path/to/cacert.pem</param> --> + <!-- <param id="amqp_connect_ssl_keyfile">/path/to/key.pem</param> --> + <!-- <param id="amqp_connect_ssl_certfile">/path/to/cert.pem</param> --> + <!-- <param id="amqp_connect_ssl_cert_reqs">cert_required</param> --></plugin><plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /><plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> diff -r e5b545b85b836b6f1be76164543d7e4613f25d87 -r 0405422ab3422f1b1373c0b00398885a4d8423f6 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -40,13 +40,17 @@ """ runner_name = "LWRRunner" - def __init__( self, app, nworkers, transport=None, cache=None, url=None, galaxy_url=DEFAULT_GALAXY_URL ): + def __init__( self, app, nworkers, transport=None, cache=None, url=None, galaxy_url=DEFAULT_GALAXY_URL, **kwds ): """Start the job runner """ super( LwrJobRunner, self ).__init__( app, nworkers ) self.async_status_updates = dict() self._init_monitor_thread() self._init_worker_threads() - client_manager_kwargs = {'transport_type': transport, 'cache': string_as_bool_or_none(cache), "url": url} + amqp_connect_ssl_args = {} + for kwd in kwds.keys(): + if kwd.startswith('amqp_connect_ssl_'): + amqp_connect_ssl_args[kwd] = kwds[kwd] + client_manager_kwargs = {'transport_type': transport, 'cache': string_as_bool_or_none(cache), "url": url, 'amqp_connect_ssl_args': amqp_connect_ssl_args or None} self.galaxy_url = galaxy_url self.client_manager = build_client_manager(**client_manager_kwargs) diff -r e5b545b85b836b6f1be76164543d7e4613f25d87 -r 0405422ab3422f1b1373c0b00398885a4d8423f6 lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py --- a/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py +++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py @@ -30,13 +30,14 @@ name _default_. """ - def __init__(self, url, manager_name, timeout=DEFAULT_TIMEOUT): + def __init__(self, url, manager_name, connect_ssl=None, timeout=DEFAULT_TIMEOUT): """ """ if not kombu: raise Exception(KOMBU_UNAVAILABLE) self.__url = url self.__manager_name = manager_name + self.__connect_ssl = connect_ssl self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE) self.__timeout = timeout @@ -46,7 +47,7 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): queue = self.__queue(queue_name) - with self.connection(self.__url, **connection_kwargs) as connection: + with self.connection(self.__url, ssl=self.__connect_ssl, **connection_kwargs) as connection: with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']): while check: try: @@ -55,7 +56,7 @@ pass def publish(self, name, payload): - with self.connection(self.__url) as connection: + with self.connection(self.__url, ssl=self.__connect_ssl) as connection: with pools.producers[connection].acquire() as producer: key = self.__queue_name(name) producer.publish( diff -r e5b545b85b836b6f1be76164543d7e4613f25d87 -r 0405422ab3422f1b1373c0b00398885a4d8423f6 lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -13,6 +13,7 @@ from .object_client import ObjectStoreClient from .transport import get_transport from .util import TransferEventManager +from .util import parse_amqp_connect_ssl_params from .destination import url_to_destination_params from .amqp_exchange import LwrExchange @@ -76,7 +77,8 @@ def __init__(self, **kwds): self.url = kwds.get('url') self.manager_name = kwds.get("manager", "_default_") - self.exchange = LwrExchange(self.url, self.manager_name) + self.connect_ssl = parse_amqp_connect_ssl_params(kwds.get('amqp_connect_ssl_args', None)) + self.exchange = LwrExchange(self.url, self.manager_name, self.connect_ssl) self.status_cache = {} self.callback_lock = threading.Lock() self.callback_thread = None diff -r e5b545b85b836b6f1be76164543d7e4613f25d87 -r 0405422ab3422f1b1373c0b00398885a4d8423f6 lib/galaxy/jobs/runners/lwr_client/util.py --- a/lib/galaxy/jobs/runners/lwr_client/util.py +++ b/lib/galaxy/jobs/runners/lwr_client/util.py @@ -62,6 +62,21 @@ contents.append(name) return contents +def parse_amqp_connect_ssl_params(params): + ssl = None + rval = None + ssl_options = filter(lambda x: x.startswith('amqp_connect_ssl_'), params.keys()) + if ssl_options: + ssl = __import__('ssl') + rval = {} + for option in ssl_options: + value = params.get(option) + option = option.replace('amqp_connect_ssl_', '', 1) + if option == 'cert_reqs': + value = getattr(ssl, value.upper()) + rval[option] = value + return rval + def filter_destination_params(destination_params, prefix): destination_params = destination_params or {} https://bitbucket.org/galaxy/galaxy-central/commits/b4680b54994a/ Changeset: b4680b54994a User: jmchilton Date: 2014-05-13 22:38:04 Summary: Merged in natefoo/galaxy-central (pull request #392) SSL client certificate support for LWR AMQP connections Affected #: 5 files diff -r d963b05c497cc17d36a16696077e5a0f3158e52b -r b4680b54994ad6c62f05d354a2af5743344152e8 job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -41,6 +41,16 @@ <!-- If multiple managers configured on the LWR, specify which one this plugin targets. --><!-- <param id="manager">_default_</param> --> + <!-- The AMQP client can provide an SSL client certificate (e.g. for + validation), the following options configure that certificate + (see for reference: + http://kombu.readthedocs.org/en/latest/reference/kombu.connection.html + ). If you simply want to use SSL but not use/validate a client + cert, just use the ?ssl=1 query on the amqp URL instead. --> + <!-- <param id="amqp_connect_ssl_ca_certs">/path/to/cacert.pem</param> --> + <!-- <param id="amqp_connect_ssl_keyfile">/path/to/key.pem</param> --> + <!-- <param id="amqp_connect_ssl_certfile">/path/to/cert.pem</param> --> + <!-- <param id="amqp_connect_ssl_cert_reqs">cert_required</param> --></plugin><plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /><plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> diff -r d963b05c497cc17d36a16696077e5a0f3158e52b -r b4680b54994ad6c62f05d354a2af5743344152e8 lib/galaxy/jobs/runners/lwr.py --- a/lib/galaxy/jobs/runners/lwr.py +++ b/lib/galaxy/jobs/runners/lwr.py @@ -40,13 +40,17 @@ """ runner_name = "LWRRunner" - def __init__( self, app, nworkers, transport=None, cache=None, url=None, galaxy_url=DEFAULT_GALAXY_URL ): + def __init__( self, app, nworkers, transport=None, cache=None, url=None, galaxy_url=DEFAULT_GALAXY_URL, **kwds ): """Start the job runner """ super( LwrJobRunner, self ).__init__( app, nworkers ) self.async_status_updates = dict() self._init_monitor_thread() self._init_worker_threads() - client_manager_kwargs = {'transport_type': transport, 'cache': string_as_bool_or_none(cache), "url": url} + amqp_connect_ssl_args = {} + for kwd in kwds.keys(): + if kwd.startswith('amqp_connect_ssl_'): + amqp_connect_ssl_args[kwd] = kwds[kwd] + client_manager_kwargs = {'transport_type': transport, 'cache': string_as_bool_or_none(cache), "url": url, 'amqp_connect_ssl_args': amqp_connect_ssl_args or None} self.galaxy_url = galaxy_url self.client_manager = build_client_manager(**client_manager_kwargs) diff -r d963b05c497cc17d36a16696077e5a0f3158e52b -r b4680b54994ad6c62f05d354a2af5743344152e8 lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py --- a/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py +++ b/lib/galaxy/jobs/runners/lwr_client/amqp_exchange.py @@ -30,13 +30,14 @@ name _default_. """ - def __init__(self, url, manager_name, timeout=DEFAULT_TIMEOUT): + def __init__(self, url, manager_name, connect_ssl=None, timeout=DEFAULT_TIMEOUT): """ """ if not kombu: raise Exception(KOMBU_UNAVAILABLE) self.__url = url self.__manager_name = manager_name + self.__connect_ssl = connect_ssl self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE) self.__timeout = timeout @@ -46,7 +47,7 @@ def consume(self, queue_name, callback, check=True, connection_kwargs={}): queue = self.__queue(queue_name) - with self.connection(self.__url, **connection_kwargs) as connection: + with self.connection(self.__url, ssl=self.__connect_ssl, **connection_kwargs) as connection: with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']): while check: try: @@ -55,7 +56,7 @@ pass def publish(self, name, payload): - with self.connection(self.__url) as connection: + with self.connection(self.__url, ssl=self.__connect_ssl) as connection: with pools.producers[connection].acquire() as producer: key = self.__queue_name(name) producer.publish( diff -r d963b05c497cc17d36a16696077e5a0f3158e52b -r b4680b54994ad6c62f05d354a2af5743344152e8 lib/galaxy/jobs/runners/lwr_client/manager.py --- a/lib/galaxy/jobs/runners/lwr_client/manager.py +++ b/lib/galaxy/jobs/runners/lwr_client/manager.py @@ -13,6 +13,7 @@ from .object_client import ObjectStoreClient from .transport import get_transport from .util import TransferEventManager +from .util import parse_amqp_connect_ssl_params from .destination import url_to_destination_params from .amqp_exchange import LwrExchange @@ -76,7 +77,8 @@ def __init__(self, **kwds): self.url = kwds.get('url') self.manager_name = kwds.get("manager", "_default_") - self.exchange = LwrExchange(self.url, self.manager_name) + self.connect_ssl = parse_amqp_connect_ssl_params(kwds.get('amqp_connect_ssl_args', None)) + self.exchange = LwrExchange(self.url, self.manager_name, self.connect_ssl) self.status_cache = {} self.callback_lock = threading.Lock() self.callback_thread = None diff -r d963b05c497cc17d36a16696077e5a0f3158e52b -r b4680b54994ad6c62f05d354a2af5743344152e8 lib/galaxy/jobs/runners/lwr_client/util.py --- a/lib/galaxy/jobs/runners/lwr_client/util.py +++ b/lib/galaxy/jobs/runners/lwr_client/util.py @@ -62,6 +62,21 @@ contents.append(name) return contents +def parse_amqp_connect_ssl_params(params): + ssl = None + rval = None + ssl_options = filter(lambda x: x.startswith('amqp_connect_ssl_'), params.keys()) + if ssl_options: + ssl = __import__('ssl') + rval = {} + for option in ssl_options: + value = params.get(option) + option = option.replace('amqp_connect_ssl_', '', 1) + if option == 'cert_reqs': + value = getattr(ssl, value.upper()) + rval[option] = value + return rval + def filter_destination_params(destination_params, prefix): destination_params = destination_params or {} Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.