1 new commit in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/35afd36f9014/
Changeset: 35afd36f9014
User: dannon
Date: 2014-05-19 22:56:28
Summary: Merge from next-stable.
Affected #: 7 files

diff -r 174b6a2281e53793a8188ba2b2792d0dd220d362 -r 35afd36f9014915c4c82f65b1c52a6636f48328e lib/galaxy/app.py
--- a/lib/galaxy/app.py
+++ b/lib/galaxy/app.py
@@ -18,6 +18,7 @@
 from galaxy.tools.data_manager.manager import DataManagers
 from galaxy.jobs import metrics as job_metrics
 from galaxy.web.base import pluginframework
+from galaxy.queue_worker import GalaxyQueueWorker
 
 import logging
 log = logging.getLogger( __name__ )
@@ -146,6 +147,10 @@
         # Initialize the external service types
         self.external_service_types = external_service_types.ExternalServiceTypesCollection( self.config.external_service_type_config_file, self.config.external_service_type_path, self )
         self.model.engine.dispose()
+        self.control_worker = GalaxyQueueWorker(self,
+                                                galaxy.queues.control_queue_from_config(self.config),
+                                                galaxy.queue_worker.control_message_to_task)
+        self.control_worker.start()
 
     def shutdown( self ):
         self.job_manager.shutdown()

diff -r 174b6a2281e53793a8188ba2b2792d0dd220d362 -r 35afd36f9014915c4c82f65b1c52a6636f48328e lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -340,6 +340,17 @@
             amqp_config = {}
         for k, v in amqp_config:
             self.amqp[k] = v
+        # Galaxy internal control queue configuration.
+        # If specified in universe, use it; otherwise we use whatever 'real'
+        # database is specified.  Lastly, we create and use a new sqlite
+        # database (to minimize locking) as a final option.
+        if 'amqp_internal_connection' in kwargs:
+            self.amqp_internal_connection = kwargs.get('amqp_internal_connection')
+            # TODO Get extra amqp args as necessary for ssl
+        elif 'database_connection' in kwargs:
+            self.amqp_internal_connection = "sqlalchemy+" + self.database_connection
+        else:
+            self.amqp_internal_connection = "sqlalchemy+sqlite:///%s?isolation_level=IMMEDIATE" % resolve_path( "database/control.sqlite", self.root )
         self.biostar_url = kwargs.get( 'biostar_url', None )
         self.biostar_key_name = kwargs.get( 'biostar_key_name', None )
         self.biostar_key = kwargs.get( 'biostar_key', None )

diff -r 174b6a2281e53793a8188ba2b2792d0dd220d362 -r 35afd36f9014915c4c82f65b1c52a6636f48328e lib/galaxy/queue_worker.py
--- /dev/null
+++ b/lib/galaxy/queue_worker.py
@@ -0,0 +1,94 @@
+"""
+Galaxy control queue and worker. This is used to handle 'app' control like
+reloading the toolbox, etc., across multiple processes.
+"""
+
+import logging
+import threading
+import galaxy.queues
+from galaxy import eggs, util
+eggs.require('kombu')
+
+from kombu import Connection
+from kombu.mixins import ConsumerMixin
+from kombu.pools import producers
+
+
+log = logging.getLogger(__name__)
+
+
+class GalaxyQueueWorker(ConsumerMixin, threading.Thread):
+    """
+    This is a flexible worker for galaxy's queues. Each process, web or
+    handler, will have one of these used for dispatching so-called 'control'
+    tasks.
+    """
+    def __init__(self, app, queue, task_mapping):
+        super(GalaxyQueueWorker, self).__init__()
+        log.info("Initializing Galaxy Queue Worker on %s" % app.config.amqp_internal_connection)
+        self.connection = Connection(app.config.amqp_internal_connection)
+        self.app = app
+        # Eventually we may want different workers w/ their own queues and task
+        # mappings. Right now, there's only the one.
+        self.control_queue = queue
+        self.task_mapping = task_mapping
+        self.declare_queues = galaxy.queues.all_control_queues_for_declare(app.config)
+        # TODO we may want to purge the queue at the start to avoid executing
+        # stale 'reload_tool', etc. messages. This can happen if, say, a web
+        # process goes down and messages get sent before it comes back up.
+        # Those messages will no longer be useful (in any current case)
+
+    def get_consumers(self, Consumer, channel):
+        return [Consumer(queues=self.control_queue,
+                         callbacks=[self.process_task])]
+
+    def process_task(self, body, message):
+        if body['task'] in self.task_mapping:
+            if body.get('noop', None) != self.app.config.server_name:
+                try:
+                    f = self.task_mapping[body['task']]
+                    log.debug("Instance received '%s' task, executing now." % body['task'])
+                    f(self.app, **body['kwargs'])
+                except Exception:
+                    # this shouldn't ever throw an exception, but...
+                    log.exception("Error running control task type: %s" % body['task'])
+        else:
+            log.warning("Received a malformed task message:\n%s" % body)
+        message.ack()
+
+
+def send_control_task(trans, task, noop_self=False, kwargs={}):
+    log.info("Sending %s control task." % task)
+    payload = {'task': task,
+               'kwargs': kwargs}
+    if noop_self:
+        payload['noop'] = trans.app.config.server_name
+    c = Connection(trans.app.config.amqp_internal_connection)
+    with producers[c].acquire(block=True) as producer:
+        producer.publish(payload, exchange=galaxy.queues.galaxy_exchange,
+                         declare=[galaxy.queues.galaxy_exchange] + galaxy.queues.all_control_queues_for_declare(trans.app.config),
+                         routing_key='control')
+
+
+# Tasks -- to be reorganized into a separate module as appropriate. This is
+# just an example method. Ideally this gets pushed into atomic tasks, either
+# where they're currently invoked, or elsewhere (potentially using a dispatch
+# decorator).
+def reload_tool(app, **kwargs):
+    params = util.Params(kwargs)
+    tool_id = params.get('tool_id', None)
+    log.debug("Executing reload tool task for %s" % tool_id)
+    if tool_id:
+        app.toolbox.reload_tool_by_id( tool_id )
+    else:
+        log.error("Reload tool invoked without tool id.")
+
+
+def reload_tool_data_tables(app, **kwargs):
+    params = util.Params(kwargs)
+    log.debug("Executing tool data table reload for %s" % params.get('table_name', 'all tables'))
+    table_names = app.tool_data_tables.reload_tables( table_names=params.get('table_name', None))
+    log.debug("Finished data table reload for %s" % table_names)
+
+control_message_to_task = { 'reload_tool': reload_tool,
+                            'reload_tool_data_tables': reload_tool_data_tables}

diff -r 174b6a2281e53793a8188ba2b2792d0dd220d362 -r 35afd36f9014915c4c82f65b1c52a6636f48328e lib/galaxy/queues.py
--- /dev/null
+++ b/lib/galaxy/queues.py
@@ -0,0 +1,32 @@
+"""
+
+All message queues used by Galaxy
+
+"""
+
+from galaxy import eggs
+
+eggs.require("kombu")
+from kombu import Exchange, Queue
+
+ALL_CONTROL = "control.*"
+galaxy_exchange = Exchange('galaxy_core_exchange', type='topic')
+
+
+def all_control_queues_for_declare(config):
+    """
+    For in-memory routing (used by sqlalchemy-based transports), we need to be able to
+    build the entire routing table in producers.
+
+    Refactor later to actually persist this somewhere instead of building it repeatedly.
+ """ + return [Queue('control.%s' % q, galaxy_exchange, routing_key='control') for q in config.server_names] + + +def control_queue_from_config(config): + """ + Returns a Queue instance with the correct name and routing key for this + galaxy process's config + """ + return Queue("control.%s" % config.server_name, galaxy_exchange, + routing_key='control') diff -r 174b6a2281e53793a8188ba2b2792d0dd220d362 -r 35afd36f9014915c4c82f65b1c52a6636f48328e lib/galaxy/web/base/controllers/admin.py --- a/lib/galaxy/web/base/controllers/admin.py +++ b/lib/galaxy/web/base/controllers/admin.py @@ -5,6 +5,7 @@ from galaxy.util import inflector from galaxy.web.form_builder import CheckboxField from string import punctuation as PUNCTUATION +import galaxy.queue_worker log = logging.getLogger( __name__ ) @@ -62,8 +63,9 @@ toolbox = self.app.toolbox tool_id = None if params.get( 'reload_tool_button', False ): - tool_id = params.tool_id - message, status = toolbox.reload_tool_by_id( tool_id ) + tool_id = params.get('tool_id', None) + galaxy.queue_worker.send_control_task(trans, 'reload_tool', noop_self=True, kwargs={'tool_id': tool_id} ) + message, status = trans.app.toolbox.reload_tool_by_id( tool_id) return trans.fill_template( '/admin/reload_tool.mako', tool_id=tool_id, toolbox=toolbox, diff -r 174b6a2281e53793a8188ba2b2792d0dd220d362 -r 35afd36f9014915c4c82f65b1c52a6636f48328e lib/galaxy/webapps/galaxy/controllers/data_manager.py --- a/lib/galaxy/webapps/galaxy/controllers/data_manager.py +++ b/lib/galaxy/webapps/galaxy/controllers/data_manager.py @@ -1,6 +1,7 @@ +import galaxy.queue_worker from galaxy import web +from galaxy.util.json import from_json_string from galaxy.web.base.controller import BaseUIController -from galaxy.util.json import from_json_string import pkg_resources; pkg_resources.require( "Paste" ) @@ -90,6 +91,9 @@ table_name = table_name.split( "," ) # Reload the tool data tables table_names = self.app.tool_data_tables.reload_tables( table_names=table_name ) + galaxy.queue_worker.send_control_task(trans, 'reload_tool_data_tables', + noop_self=True, + kwargs={'table_name': table_name} ) redirect_url = None if table_names: status = 'done' diff -r 174b6a2281e53793a8188ba2b2792d0dd220d362 -r 35afd36f9014915c4c82f65b1c52a6636f48328e universe_wsgi.ini.sample --- a/universe_wsgi.ini.sample +++ b/universe_wsgi.ini.sample @@ -821,7 +821,21 @@ #user_tool_section_filters = examples:restrict_text #user_tool_label_filters = examples:restrict_upload_to_admins, examples:restrict_encode -# ---- Galaxy Message Queue ------------------------------------------------- +# Galaxy Application Internal Message Queue + +# Galaxy uses AMQP internally TODO more documentation on what for. +# For examples, see http://ask.github.io/kombu/userguide/connections.html +# +# Without specifying anything here, galaxy will first attempt to use your +# specified database_connection above. If that's not specified either, Galaxy +# will automatically create and use a separate sqlite database located in your +# <galaxy>/database folder (indicated in the commented out line below). + +#amqp_internal_connection = "sqlite:///./database/control.sqlite?isolation_level=IMMEDIATE" + + + +# ---- Galaxy External Message Queue ------------------------------------------------- # Galaxy uses AMQ protocol to receive messages from external sources like # bar code scanners. Galaxy has been tested against RabbitMQ AMQP implementation. Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. 
You are receiving this because you have the service enabled, addressing the recipient of this email.
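
For readers who want to see the control-queue pattern from this changeset in isolation, the following is a minimal standalone sketch, not Galaxy code. It uses the same kombu primitives the commit relies on (Connection, Exchange, Queue, ConsumerMixin, the producers pool) and mirrors the exchange/queue naming in lib/galaxy/queues.py, but the server names ('web0', 'handler0'), the control.sqlite path, and the reload_tool stand-in are illustrative assumptions; it assumes kombu and SQLAlchemy are installed.

    # Standalone sketch of the internal control queue introduced in this commit.
    # Assumptions (not Galaxy code): the server names, the control.sqlite path,
    # and the reload_tool stand-in below.
    import threading

    from kombu import Connection, Exchange, Queue
    from kombu.mixins import ConsumerMixin
    from kombu.pools import producers

    # Mirrors galaxy_exchange and the control.<server_name> queues in lib/galaxy/queues.py.
    galaxy_exchange = Exchange('galaxy_core_exchange', type='topic')
    server_names = ['web0', 'handler0']
    control_queues = [Queue('control.%s' % name, galaxy_exchange, routing_key='control')
                      for name in server_names]

    # The commit's fallback transport: kombu's sqlalchemy transport over sqlite.
    amqp_internal_connection = "sqlalchemy+sqlite:///control.sqlite?isolation_level=IMMEDIATE"


    def reload_tool(**kwargs):
        # Stand-in for galaxy.queue_worker.reload_tool.
        print("would reload tool %s" % kwargs.get('tool_id'))

    task_mapping = {'reload_tool': reload_tool}


    class ControlWorker(ConsumerMixin, threading.Thread):
        # Same shape as GalaxyQueueWorker: a thread whose run() consumes one
        # process's control.<server_name> queue and dispatches via task_mapping.
        def __init__(self, connection, queue):
            super(ControlWorker, self).__init__()
            self.connection = connection
            self.queue = queue

        def get_consumers(self, Consumer, channel):
            return [Consumer(queues=self.queue, callbacks=[self.process_task])]

        def process_task(self, body, message):
            task_mapping[body['task']](**body['kwargs'])
            message.ack()
            self.should_stop = True  # one message is enough for this demo


    worker = ControlWorker(Connection(amqp_internal_connection), control_queues[1])
    worker.start()

    # Publish the way send_control_task does: declare the exchange plus every
    # control queue so the producer side has the full routing table.
    connection = Connection(amqp_internal_connection)
    with producers[connection].acquire(block=True) as producer:
        producer.publish({'task': 'reload_tool', 'kwargs': {'tool_id': 'example_tool'}},
                         exchange=galaxy_exchange,
                         declare=[galaxy_exchange] + control_queues,
                         routing_key='control')

    worker.join()

As the docstring of all_control_queues_for_declare notes, the sqlalchemy-based transport does its routing in memory, so in this sketch the producer declares the exchange and every control queue itself; the message is therefore written to the 'control.handler0' queue even though the publisher never talks to that worker directly.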