2 new commits in galaxy-central:

https://bitbucket.org/galaxy/galaxy-central/commits/8c569cffa967/
Changeset: 8c569cffa967
User: jmchilton
Date: 2014-06-20 07:28:36
Summary: Implement Pulsar job runners.

The LWR runner and client remain in Galaxy as-is and can continue to target old LWR servers, but these should be considered deprecated and will likely be dropped at some point. The LWR documentation in job_conf.xml.sample_advanced has been replaced with Pulsar documentation to reflect this. Pulsar runners will need to target a Pulsar server (now with a RESTful web interface - an MQ option is also available). Information on upgrading an LWR instance to Pulsar can be found in the [Pulsar docs](http://pulsar.readthedocs.org/en/latest/#upgrading-from-the-lwr).

Three new job runners have been added to replace the LWR runner. The `PulsarLegacyJobRunner` more or less has all of the old defaults that the LWR runner had; it should be the easiest to get working for people who are porting over old LWR servers (and will likely be the most compatible with remote Windows hosts). The `PulsarRESTJobRunner` targets a remote Pulsar server over HTTP(S) but has newer standard configuration options, such as rewriting parameters at the tool stage by default and remote dependency evaluation by default. The `PulsarMQJobRunner` targets a remote Pulsar server via AMQP and has the newer defaults of the RESTful runner, as well as targeting the Galaxy job files API by default. The `url` runner parameter has been renamed `amqp_url` for clarity.

These newer runners also improve parameter handling in other ways, such as warning deployers when ignored parameters are supplied.

Affected #: 32 files

diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -19,27 +19,30 @@
         <!-- Override the $DRMAA_LIBRARY_PATH environment variable -->
         <param id="drmaa_library_path">/sge/lib/libdrmaa.so</param>
     </plugin>
-    <plugin id="lwr" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner">
-        <!-- More information on LWR can be found at https://lwr.readthedocs.org -->
-        <!-- Uncomment following line to use libcurl to perform HTTP calls (defaults to urllib) -->
+    <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" />
+    <plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" />
+    <plugin id="slurm" type="runner" load="galaxy.jobs.runners.slurm:SlurmJobRunner" />
+    <!-- Pulsar runners (see more at https://pulsar.readthedocs.org) -->
+    <plugin id="pulsar_rest" type="runner" load="galaxy.jobs.runners.pulsar:PulsarRESTJobRunner">
+        <!-- Allow optimized HTTP calls with libcurl (defaults to urllib) -->
        <!-- <param id="transport">curl</param> -->
-        <!-- *Experimental Caching*: Uncomment next parameters to enable
-             caching and specify the number of caching threads to enable on Galaxy
-             side. Likely will not work with newer features such as MQ support.
-             If this is enabled be sure to specify a `file_cache_dir` in the remote
-             LWR's main configuration file.
+
+        <!-- *Experimental Caching*: Next parameter enables caching.
+             Likely will not work with newer features such as MQ support.
+
+             If this is enabled be sure to specify a `file_cache_dir` in
+             the remote Pulsar server's main configuration file.
--><!-- <param id="cache">True</param> --> - <!-- <param id="transfer_threads">2</param> --></plugin> - <plugin id="amqp_lwr" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner"> - <param id="url">amqp://guest:guest@localhost:5672//</param> - <!-- If using message queue driven LWR - the LWR will generally - initiate file transfers so a the URL of this Galaxy instance - must be configured. --> + <plugin id="pulsar_mq" type="runner" load="galaxy.jobs.runners.pulsar:PulsarMQJobRunner"> + <!-- AMQP URL to connect to. --> + <param id="amqp_url">amqp://guest:guest@localhost:5672//</param> + <!-- URL remote Pulsar apps should transfer files to this Galaxy + instance to/from. --><param id="galaxy_url">http://localhost:8080</param> - <!-- If multiple managers configured on the LWR, specify which one - this plugin targets. --> + <!-- Pulsar job manager to communicate with (see Pulsar + docs for information on job managers). --><!-- <param id="manager">_default_</param> --><!-- The AMQP client can provide an SSL client certificate (e.g. for validation), the following options configure that certificate @@ -58,9 +61,17 @@ higher value (in seconds) (or `None` to use blocking connections). --><!-- <param id="amqp_consumer_timeout">None</param> --></plugin> - <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /> - <plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> - <plugin id="slurm" type="runner" load="galaxy.jobs.runners.slurm:SlurmJobRunner" /> + <plugin id="pulsar_legacy" type="runner" load="galaxy.jobs.runners.pulsar:PulsarLegacyJobRunner"> + <!-- Pulsar job runner with default parameters matching those + of old LWR job runner. If your Pulsar server is running on a + Windows machine for instance this runner should still be used. + + These destinations still needs to target a Pulsar server, + older LWR plugins and destinations still work in Galaxy can + target LWR servers, but this support should be considered + deprecated and will disappear with a future release of Galaxy. + --> + </plugin></plugins><handlers default="handlers"><!-- Additional job handlers - the id should match the name of a @@ -125,8 +136,8 @@ $galaxy_root:ro,$tool_directory:ro,$working_directory:rw,$default_file_path:ro - If using the LWR, defaults will be even further restricted because the - LWR will (by default) stage all needed inputs into the job's job_directory + If using the Pulsar, defaults will be even further restricted because the + Pulsar will (by default) stage all needed inputs into the job's job_directory (so there is not need to allow the docker container to read all the files - let alone write over them). Defaults in this case becomes: @@ -135,7 +146,7 @@ Python string.Template is used to expand volumes and values $defaults, $galaxy_root, $default_file_path, $tool_directory, $working_directory, are available to all jobs and $job_directory is also available for - LWR jobs. + Pulsar jobs. --><!-- Control memory allocatable by docker container with following option: --> @@ -213,87 +224,71 @@ <!-- A destination that represents a method in the dynamic runner. --><param id="function">foo</param></destination> - <destination id="secure_lwr" runner="lwr"> - <param id="url">https://windowshost.examle.com:8913/</param> - <!-- If set, private_token must match token remote LWR server configured with. 
--> + <destination id="secure_pulsar_rest_dest" runner="pulsar_rest"> + <param id="url">https://examle.com:8913/</param> + <!-- If set, private_token must match token in remote Pulsar's + configuration. --><param id="private_token">123456789changeme</param><!-- Uncomment the following statement to disable file staging (e.g. - if there is a shared file system between Galaxy and the LWR + if there is a shared file system between Galaxy and the Pulsar server). Alternatively action can be set to 'copy' - to replace http transfers with file system copies, 'remote_transfer' to cause - the lwr to initiate HTTP transfers instead of Galaxy, or - 'remote_copy' to cause lwr to initiate file system copies. + the Pulsar to initiate HTTP transfers instead of Galaxy, or + 'remote_copy' to cause Pulsar to initiate file system copies. If setting this to 'remote_transfer' be sure to specify a 'galaxy_url' attribute on the runner plugin above. --><!-- <param id="default_file_action">none</param> --><!-- The above option is just the default, the transfer behavior none|copy|http can be configured on a per path basis via the - following file. See lib/galaxy/jobs/runners/lwr_client/action_mapper.py - for examples of how to configure this file. This is very beta - and nature of file will likely change. + following file. See Pulsar documentation for more details and + examples. --> - <!-- <param id="file_action_config">file_actions.json</param> --> - <!-- Uncomment following option to disable Galaxy tool dependency - resolution and utilize remote LWR's configuraiton of tool - dependency resolution instead (same options as Galaxy for - dependency resolution are available in LWR). At a minimum - the remote LWR server should define a tool_dependencies_dir in - its `server.ini` configuration. The LWR will not attempt to - stage dependencies - so ensure the the required galaxy or tool - shed packages are available remotely (exact same tool shed - installed changesets are required). + <!-- <param id="file_action_config">file_actions.yaml</param> --> + <!-- The non-legacy Pulsar runners will attempt to resolve Galaxy + dependencies remotely - to enable this set a tool_dependency_dir + in Pulsar's configuration (can work with all the same dependency + resolutions mechanisms as Galaxy - tool Shed installs, Galaxy + packages, etc...). To disable this behavior, set the follow parameter + to none. To generate the dependency resolution command locally + set the following parameter local. --> - <!-- <param id="dependency_resolution">remote</params> --> - <!-- Traditionally, the LWR allow Galaxy to generate a command line - as if it were going to run the command locally and then the - LWR client rewrites it after the fact using regular - expressions. Setting the following value to true causes the - LWR runner to insert itself into the command line generation - process and generate the correct command line from the get go. - This will likely be the default someday - but requires a newer - LWR version and is less well tested. --> - <!-- <param id="rewrite_parameters">true</params> --> + <!-- <param id="dependency_resolution">none</params> --><!-- Uncomment following option to enable setting metadata on remote - LWR server. The 'use_remote_datatypes' option is available for + Pulsar server. The 'use_remote_datatypes' option is available for determining whether to use remotely configured datatypes or local ones (both alternatives are a little brittle). 
--><!-- <param id="remote_metadata">true</param> --><!-- <param id="use_remote_datatypes">false</param> --><!-- <param id="remote_property_galaxy_home">/path/to/remote/galaxy-central</param> --> - <!-- If remote LWR server is configured to run jobs as the real user, + <!-- If remote Pulsar server is configured to run jobs as the real user, uncomment the following line to pass the current Galaxy user along. --><!-- <param id="submit_user">$__user_name__</param> --> - <!-- Various other submission parameters can be passed along to the LWR - whose use will depend on the remote LWR's configured job manager. + <!-- Various other submission parameters can be passed along to the Pulsar + whose use will depend on the remote Pulsar's configured job manager. For instance: --> - <!-- <param id="submit_native_specification">-P bignodes -R y -pe threads 8</param> --> + <!-- <param id="submit_native_specification">-P bignodes -R y -pe threads 8</param> --> + <!-- Disable parameter rewriting and rewrite generated commands + instead. This may be required if remote host is Windows machine + but probably not otherwise. + --> + <!-- <param id="rewrite_parameters">false</params> --></destination> - <destination id="amqp_lwr_dest" runner="amqp_lwr" > - <!-- url and private_token are not valid when using MQ driven LWR. The plugin above - determines which queue/manager to target and the underlying MQ server should be - used to configure security. - --> - <!-- Traditionally, the LWR client sends request to LWR - server to populate various system properties. This + <destination id="pulsar_mq_dest" runner="amqp_pulsar" > + <!-- The RESTful Pulsar client sends a request to Pulsar + to populate various system properties. This extra step can be disabled and these calculated here on client by uncommenting jobs_directory and specifying any additional remote_property_ of interest, this is not optional when using message queues. --> - <param id="jobs_directory">/path/to/remote/lwr/lwr_staging/</param> - <!-- Default the LWR send files to and pull files from Galaxy when - using message queues (in the more traditional mode Galaxy sends - files to and pull files from the LWR - this is obviously less - appropriate when using a message queue). - - The default_file_action currently requires pycurl be available - to Galaxy (presumably in its virtualenv). Making this dependency - optional is an open task. + <param id="jobs_directory">/path/to/remote/pulsar/files/staging/</param> + <!-- Otherwise MQ and Legacy pulsar destinations can be supplied + all the same destination parameters as the RESTful client documented + above (though url and private_token are ignored when using a MQ). --> - <param id="default_file_action">remote_transfer</param></destination><destination id="ssh_torque" runner="cli"><param id="shell_plugin">SecureShell</param> diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/galaxy/jobs/runners/pulsar.py --- /dev/null +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -0,0 +1,707 @@ +from __future__ import absolute_import # Need to import pulsar_client absolutely. 
+ +import logging + +from galaxy import model +from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner +from galaxy.jobs import ComputeEnvironment +from galaxy.jobs import JobDestination +from galaxy.jobs.command_factory import build_command +from galaxy.tools.deps import dependencies +from galaxy.util import string_as_bool_or_none +from galaxy.util.bunch import Bunch +from galaxy.util import specs + +import errno +from time import sleep +import os + +from pulsar.client import build_client_manager +from pulsar.client import url_to_destination_params +from pulsar.client import finish_job as pulsar_finish_job +from pulsar.client import submit_job as pulsar_submit_job +from pulsar.client import ClientJobDescription +from pulsar.client import PulsarOutputs +from pulsar.client import ClientOutputs +from pulsar.client import PathMapper + +log = logging.getLogger( __name__ ) + +__all__ = [ 'PulsarLegacyJobRunner', 'PulsarRESTJobRunner', 'PulsarMQJobRunner' ] + +NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "Pulsar misconfiguration - Pulsar client configured to set metadata remotely, but remote Pulsar isn't properly configured with a galaxy_home directory." +NO_REMOTE_DATATYPES_CONFIG = "Pulsar client is configured to use remote datatypes configuration when setting metadata externally, but Pulsar is not configured with this information. Defaulting to datatypes_conf.xml." +GENERIC_REMOTE_ERROR = "Failed to communicate with remote job server." + +# Is there a good way to infer some default for this? Can only use +# url_for from web threads. https://gist.github.com/jmchilton/9098762 +DEFAULT_GALAXY_URL = "http://localhost:8080" + +PULSAR_PARAM_SPECS = dict( + transport=dict( + map=specs.to_str_or_none, + valid=specs.is_in("urllib", "curl", None), + default=None + ), + cache=dict( + map=specs.to_bool_or_none, + default=None, + ), + amqp_url=dict( + map=specs.to_str_or_none, + default=None, + ), + galaxy_url=dict( + map=specs.to_str_or_none, + default=DEFAULT_GALAXY_URL, + ), + manager=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_consumer_timeout=dict( + map=lambda val: None if val == "None" else float(val), + default=None, + ), + amqp_connect_ssl_ca_certs=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_keyfile=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_certfile=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_cert_reqs=dict( + map=specs.to_str_or_none, + default=None, + ), + # http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Producer.p... + amqp_publish_retry=dict( + map=specs.to_bool, + default=False, + ), + amqp_publish_priority=dict( + map=int, + valid=lambda x: 0 <= x and x <= 9, + default=0, + ), + # http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Exchange.d... 
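+    # Hedged illustration of how these specs behave at runner startup: a
+    # plugin parameter like
+    #     <param id="amqp_publish_priority">5</param>
+    # arrives as the string "5", is converted by map=int, and must satisfy
+    # valid (0 <= x <= 9) before landing in self.runner_params.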
+ amqp_publish_delivery_mode=dict( + map=str, + valid=specs.is_in("transient", "persistent"), + default="persistent", + ), + amqp_publish_retry_max_retries=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_start=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_step=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_max=dict( + map=int, + default=None, + ), +) + + +PARAMETER_SPECIFICATION_REQUIRED = object() +PARAMETER_SPECIFICATION_IGNORED = object() + + +class PulsarJobRunner( AsynchronousJobRunner ): + """ + Pulsar Job Runner + """ + runner_name = "PulsarJobRunner" + + def __init__( self, app, nworkers, **kwds ): + """Start the job runner """ + super( PulsarJobRunner, self ).__init__( app, nworkers, runner_param_specs=PULSAR_PARAM_SPECS, **kwds ) + self._init_worker_threads() + galaxy_url = self.runner_params.galaxy_url + if galaxy_url: + galaxy_url = galaxy_url.rstrip("/") + self.galaxy_url = galaxy_url + self.__init_client_manager() + self._monitor() + + def _monitor( self ): + # Extension point allow MQ variant to setup callback instead + self._init_monitor_thread() + + def __init_client_manager( self ): + client_manager_kwargs = {} + for kwd in 'manager', 'cache', 'transport': + client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] + for kwd in self.runner_params.keys(): + if kwd.startswith( 'amqp_' ): + client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] + self.client_manager = build_client_manager(**client_manager_kwargs) + + def url_to_destination( self, url ): + """Convert a legacy URL to a job destination""" + return JobDestination( runner="pulsar", params=url_to_destination_params( url ) ) + + def check_watched_item(self, job_state): + try: + client = self.get_client_from_state(job_state) + status = client.get_status() + except Exception: + # An orphaned job was put into the queue at app startup, so remote server went down + # either way we are done I guess. 
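+            # Either way, treat the job as terminal: mark_as_finished() hands
+            # it to the finish queue, and finish_job() will fail the job
+            # cleanly if the Pulsar server still cannot be reached.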
+ self.mark_as_finished(job_state) + return None + job_state = self._update_job_state_for_status(job_state, status) + return job_state + + def _update_job_state_for_status(self, job_state, pulsar_status): + if pulsar_status == "complete": + self.mark_as_finished(job_state) + return None + if pulsar_status == "failed": + self.fail_job(job_state) + return None + if pulsar_status == "running" and not job_state.running: + job_state.running = True + job_state.job_wrapper.change_state( model.Job.states.RUNNING ) + return job_state + + def queue_job(self, job_wrapper): + job_destination = job_wrapper.job_destination + self._populate_parameter_defaults( job_destination ) + + command_line, client, remote_job_config, compute_environment = self.__prepare_job( job_wrapper, job_destination ) + + if not command_line: + return + + try: + dependencies_description = PulsarJobRunner.__dependencies_description( client, job_wrapper ) + rewrite_paths = not PulsarJobRunner.__rewrite_parameters( client ) + unstructured_path_rewrites = {} + if compute_environment: + unstructured_path_rewrites = compute_environment.unstructured_path_rewrites + + client_job_description = ClientJobDescription( + command_line=command_line, + input_files=self.get_input_files(job_wrapper), + client_outputs=self.__client_outputs(client, job_wrapper), + working_directory=job_wrapper.working_directory, + tool=job_wrapper.tool, + config_files=job_wrapper.extra_filenames, + dependencies_description=dependencies_description, + env=client.env, + rewrite_paths=rewrite_paths, + arbitrary_files=unstructured_path_rewrites, + ) + job_id = pulsar_submit_job(client, client_job_description, remote_job_config) + log.info("Pulsar job submitted with job_id %s" % job_id) + job_wrapper.set_job_destination( job_destination, job_id ) + job_wrapper.change_state( model.Job.states.QUEUED ) + except Exception: + job_wrapper.fail( "failure running job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + return + + pulsar_job_state = AsynchronousJobState() + pulsar_job_state.job_wrapper = job_wrapper + pulsar_job_state.job_id = job_id + pulsar_job_state.old_state = True + pulsar_job_state.running = False + pulsar_job_state.job_destination = job_destination + self.monitor_job(pulsar_job_state) + + def __prepare_job(self, job_wrapper, job_destination): + """ Build command-line and Pulsar client for this job. 
""" + command_line = None + client = None + remote_job_config = None + compute_environment = None + try: + client = self.get_client_from_wrapper(job_wrapper) + tool = job_wrapper.tool + remote_job_config = client.setup(tool.id, tool.version) + rewrite_parameters = PulsarJobRunner.__rewrite_parameters( client ) + prepare_kwds = {} + if rewrite_parameters: + compute_environment = PulsarComputeEnvironment( client, job_wrapper, remote_job_config ) + prepare_kwds[ 'compute_environment' ] = compute_environment + job_wrapper.prepare( **prepare_kwds ) + self.__prepare_input_files_locally(job_wrapper) + remote_metadata = PulsarJobRunner.__remote_metadata( client ) + dependency_resolution = PulsarJobRunner.__dependency_resolution( client ) + metadata_kwds = self.__build_metadata_configuration(client, job_wrapper, remote_metadata, remote_job_config) + remote_command_params = dict( + working_directory=remote_job_config['working_directory'], + metadata_kwds=metadata_kwds, + dependency_resolution=dependency_resolution, + ) + remote_working_directory = remote_job_config['working_directory'] + # TODO: Following defs work for Pulsar, always worked for Pulsar but should be + # calculated at some other level. + remote_job_directory = os.path.abspath(os.path.join(remote_working_directory, os.path.pardir)) + remote_tool_directory = os.path.abspath(os.path.join(remote_job_directory, "tool_files")) + container = self._find_container( + job_wrapper, + compute_working_directory=remote_working_directory, + compute_tool_directory=remote_tool_directory, + compute_job_directory=remote_job_directory, + ) + command_line = build_command( + self, + job_wrapper=job_wrapper, + container=container, + include_metadata=remote_metadata, + include_work_dir_outputs=False, + remote_command_params=remote_command_params, + ) + except Exception: + job_wrapper.fail( "failure preparing job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + + # If we were able to get a command line, run the job + if not command_line: + job_wrapper.finish( '', '' ) + + return command_line, client, remote_job_config, compute_environment + + def __prepare_input_files_locally(self, job_wrapper): + """Run task splitting commands locally.""" + prepare_input_files_cmds = getattr(job_wrapper, 'prepare_input_files_cmds', None) + if prepare_input_files_cmds is not None: + for cmd in prepare_input_files_cmds: # run the commands to stage the input files + if 0 != os.system(cmd): + raise Exception('Error running file staging command: %s' % cmd) + job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line + + def _populate_parameter_defaults( self, job_destination ): + updated = False + params = job_destination.params + for key, value in self.destination_defaults.iteritems(): + if key in params: + if value is PARAMETER_SPECIFICATION_IGNORED: + log.warn( "Pulsar runner in selected configuration ignores parameter %s" % key ) + continue + #if self.runner_params.get( key, None ): + # # Let plugin define defaults for some parameters - + # # for instance that way jobs_directory can be + # # configured next to AMQP url (where it belongs). 
+ # params[ key ] = self.runner_params[ key ] + # continue + + if not value: + continue + + if value is PARAMETER_SPECIFICATION_REQUIRED: + raise Exception( "Pulsar destination does not define required parameter %s" % key ) + elif value is not PARAMETER_SPECIFICATION_IGNORED: + params[ key ] = value + updated = True + return updated + + def get_output_files(self, job_wrapper): + output_paths = job_wrapper.get_output_fnames() + return [ str( o ) for o in output_paths ] # Force job_path from DatasetPath objects. + + def get_input_files(self, job_wrapper): + input_paths = job_wrapper.get_input_paths() + return [ str( i ) for i in input_paths ] # Force job_path from DatasetPath objects. + + def get_client_from_wrapper(self, job_wrapper): + job_id = job_wrapper.job_id + if hasattr(job_wrapper, 'task_id'): + job_id = "%s_%s" % (job_id, job_wrapper.task_id) + params = job_wrapper.job_destination.params.copy() + for key, value in params.iteritems(): + if value: + params[key] = model.User.expand_user_properties( job_wrapper.get_job().user, value ) + + env = getattr( job_wrapper.job_destination, "env", [] ) + return self.get_client( params, job_id, env ) + + def get_client_from_state(self, job_state): + job_destination_params = job_state.job_destination.params + job_id = job_state.job_id + return self.get_client( job_destination_params, job_id ) + + def get_client( self, job_destination_params, job_id, env=[] ): + # Cannot use url_for outside of web thread. + #files_endpoint = url_for( controller="job_files", job_id=encoded_job_id ) + + encoded_job_id = self.app.security.encode_id(job_id) + job_key = self.app.security.encode_id( job_id, kind="jobs_files" ) + files_endpoint = "%s/api/jobs/%s/files?job_key=%s" % ( + self.galaxy_url, + encoded_job_id, + job_key + ) + get_client_kwds = dict( + job_id=str( job_id ), + files_endpoint=files_endpoint, + env=env + ) + return self.client_manager.get_client( job_destination_params, **get_client_kwds ) + + def finish_job( self, job_state ): + stderr = stdout = '' + job_wrapper = job_state.job_wrapper + try: + client = self.get_client_from_state(job_state) + run_results = client.full_status() + remote_working_directory = run_results.get("working_directory", None) + stdout = run_results.get('stdout', '') + stderr = run_results.get('stderr', '') + exit_code = run_results.get('returncode', None) + pulsar_outputs = PulsarOutputs.from_status_response(run_results) + # Use Pulsar client code to transfer/copy files back + # and cleanup job if needed. 
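+            # Inferred from the usage below (not a documented client
+            # contract): pulsar_finish_job() returns a truthy value only when
+            # one or more outputs could not be collected.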
+ completed_normally = \ + job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] + cleanup_job = self.app.config.cleanup_job + client_outputs = self.__client_outputs(client, job_wrapper) + finish_args = dict( client=client, + job_completed_normally=completed_normally, + cleanup_job=cleanup_job, + client_outputs=client_outputs, + pulsar_outputs=pulsar_outputs ) + failed = pulsar_finish_job( **finish_args ) + if failed: + job_wrapper.fail("Failed to find or download one or more job outputs from remote server.", exception=True) + except Exception: + message = GENERIC_REMOTE_ERROR + job_wrapper.fail( message, exception=True ) + log.exception("failure finishing job %d" % job_wrapper.job_id) + return + if not PulsarJobRunner.__remote_metadata( client ): + self._handle_metadata_externally( job_wrapper, resolve_requirements=True ) + # Finish the job + try: + job_wrapper.finish( + stdout, + stderr, + exit_code, + remote_working_directory=remote_working_directory + ) + except Exception: + log.exception("Job wrapper finish method failed") + job_wrapper.fail("Unable to finish job", exception=True) + + def fail_job( self, job_state ): + """ + Seperated out so we can use the worker threads for it. + """ + self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) ) + job_state.job_wrapper.fail( getattr( job_state, "fail_message", GENERIC_REMOTE_ERROR ) ) + + def check_pid( self, pid ): + try: + os.kill( pid, 0 ) + return True + except OSError, e: + if e.errno == errno.ESRCH: + log.debug( "check_pid(): PID %d is dead" % pid ) + else: + log.warning( "check_pid(): Got errno %s when attempting to check PID %d: %s" % ( errno.errorcode[e.errno], pid, e.strerror ) ) + return False + + def stop_job( self, job ): + #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished + job_ext_output_metadata = job.get_external_output_metadata() + if job_ext_output_metadata: + pid = job_ext_output_metadata[0].job_runner_external_pid # every JobExternalOutputMetadata has a pid set, we just need to take from one of them + if pid in [ None, '' ]: + log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id ) + return + pid = int( pid ) + if not self.check_pid( pid ): + log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job.id, pid ) ) + return + for sig in [ 15, 9 ]: + try: + os.killpg( pid, sig ) + except OSError, e: + log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job.id, errno.errorcode[e.errno], sig, pid, e.strerror ) ) + return # give up + sleep( 2 ) + if not self.check_pid( pid ): + log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" % ( job.id, pid, sig ) ) + return + else: + log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" % ( job.id, pid ) ) + else: + # Remote kill + pulsar_url = job.job_runner_name + job_id = job.job_runner_external_id + log.debug("Attempt remote Pulsar kill of job with url %s and id %s" % (pulsar_url, job_id)) + client = self.get_client(job.destination_params, job_id) + client.kill() + + def recover( self, job, job_wrapper ): + """Recovers jobs stuck in the queued/running state when Galaxy started""" + job_state = self._job_state( job, job_wrapper ) + job_wrapper.command_line = job.get_command_line() + state = job.get_state() + if state in [model.Job.states.RUNNING, model.Job.states.QUEUED]: + log.debug( "(Pulsar/%s) is still in running 
state, adding to the Pulsar queue" % ( job.get_id()) ) + job_state.old_state = True + job_state.running = state == model.Job.states.RUNNING + self.monitor_queue.put( job_state ) + + def shutdown( self ): + super( PulsarJobRunner, self ).shutdown() + self.client_manager.shutdown() + + def _job_state( self, job, job_wrapper ): + job_state = AsynchronousJobState() + # TODO: Determine why this is set when using normal message queue updates + # but not CLI submitted MQ updates... + raw_job_id = job.get_job_runner_external_id() or job_wrapper.job_id + job_state.job_id = str( raw_job_id ) + job_state.runner_url = job_wrapper.get_job_runner_url() + job_state.job_destination = job_wrapper.job_destination + job_state.job_wrapper = job_wrapper + return job_state + + def __client_outputs( self, client, job_wrapper ): + work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) + output_files = self.get_output_files( job_wrapper ) + client_outputs = ClientOutputs( + working_directory=job_wrapper.working_directory, + work_dir_outputs=work_dir_outputs, + output_files=output_files, + version_file=job_wrapper.get_version_string_path(), + ) + return client_outputs + + @staticmethod + def __dependencies_description( pulsar_client, job_wrapper ): + dependency_resolution = PulsarJobRunner.__dependency_resolution( pulsar_client ) + remote_dependency_resolution = dependency_resolution == "remote" + if not remote_dependency_resolution: + return None + requirements = job_wrapper.tool.requirements or [] + installed_tool_dependencies = job_wrapper.tool.installed_tool_dependencies or [] + return dependencies.DependenciesDescription( + requirements=requirements, + installed_tool_dependencies=installed_tool_dependencies, + ) + + @staticmethod + def __dependency_resolution( pulsar_client ): + dependency_resolution = pulsar_client.destination_params.get( "dependency_resolution", "local" ) + if dependency_resolution not in ["none", "local", "remote"]: + raise Exception("Unknown dependency_resolution value encountered %s" % dependency_resolution) + return dependency_resolution + + @staticmethod + def __remote_metadata( pulsar_client ): + remote_metadata = string_as_bool_or_none( pulsar_client.destination_params.get( "remote_metadata", False ) ) + return remote_metadata + + @staticmethod + def __use_remote_datatypes_conf( pulsar_client ): + """ When setting remote metadata, use integrated datatypes from this + Galaxy instance or use the datatypes config configured via the remote + Pulsar. + + Both options are broken in different ways for same reason - datatypes + may not match. One can push the local datatypes config to the remote + server - but there is no guarentee these datatypes will be defined + there. Alternatively, one can use the remote datatype config - but + there is no guarentee that it will contain all the datatypes available + to this Galaxy. 
+ """ + use_remote_datatypes = string_as_bool_or_none( pulsar_client.destination_params.get( "use_remote_datatypes", False ) ) + return use_remote_datatypes + + @staticmethod + def __rewrite_parameters( pulsar_client ): + return string_as_bool_or_none( pulsar_client.destination_params.get( "rewrite_parameters", False ) ) or False + + def __build_metadata_configuration(self, client, job_wrapper, remote_metadata, remote_job_config): + metadata_kwds = {} + if remote_metadata: + remote_system_properties = remote_job_config.get("system_properties", {}) + remote_galaxy_home = remote_system_properties.get("galaxy_home", None) + if not remote_galaxy_home: + raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE) + metadata_kwds['exec_dir'] = remote_galaxy_home + outputs_directory = remote_job_config['outputs_directory'] + configs_directory = remote_job_config['configs_directory'] + working_directory = remote_job_config['working_directory'] + # For metadata calculation, we need to build a list of of output + # file objects with real path indicating location on Galaxy server + # and false path indicating location on compute server. Since the + # Pulsar disables from_work_dir copying as part of the job command + # line we need to take the list of output locations on the Pulsar + # server (produced by self.get_output_files(job_wrapper)) and for + # each work_dir output substitute the effective path on the Pulsar + # server relative to the remote working directory as the + # false_path to send the metadata command generation module. + work_dir_outputs = self.get_work_dir_outputs(job_wrapper, job_working_directory=working_directory) + outputs = [Bunch(false_path=os.path.join(outputs_directory, os.path.basename(path)), real_path=path) for path in self.get_output_files(job_wrapper)] + for output in outputs: + for pulsar_workdir_path, real_path in work_dir_outputs: + if real_path == output.real_path: + output.false_path = pulsar_workdir_path + metadata_kwds['output_fnames'] = outputs + metadata_kwds['compute_tmp_dir'] = working_directory + metadata_kwds['config_root'] = remote_galaxy_home + default_config_file = os.path.join(remote_galaxy_home, 'universe_wsgi.ini') + metadata_kwds['config_file'] = remote_system_properties.get('galaxy_config_file', default_config_file) + metadata_kwds['dataset_files_path'] = remote_system_properties.get('galaxy_dataset_files_path', None) + if PulsarJobRunner.__use_remote_datatypes_conf( client ): + remote_datatypes_config = remote_system_properties.get('galaxy_datatypes_config_file', None) + if not remote_datatypes_config: + log.warn(NO_REMOTE_DATATYPES_CONFIG) + remote_datatypes_config = os.path.join(remote_galaxy_home, 'datatypes_conf.xml') + metadata_kwds['datatypes_config'] = remote_datatypes_config + else: + integrates_datatypes_config = self.app.datatypes_registry.integrated_datatypes_configs + # Ensure this file gets pushed out to the remote config dir. 
+ job_wrapper.extra_filenames.append(integrates_datatypes_config) + + metadata_kwds['datatypes_config'] = os.path.join(configs_directory, os.path.basename(integrates_datatypes_config)) + return metadata_kwds + + +class PulsarLegacyJobRunner( PulsarJobRunner ): + destination_defaults = dict( + rewrite_parameters="false", + dependency_resolution="local", + ) + + +class PulsarMQJobRunner( PulsarJobRunner ): + destination_defaults = dict( + default_file_action="remote_transfer", + rewrite_parameters="true", + dependency_resolution="remote", + jobs_directory=PARAMETER_SPECIFICATION_REQUIRED, + url=PARAMETER_SPECIFICATION_IGNORED, + private_token=PARAMETER_SPECIFICATION_IGNORED + ) + + def _monitor( self ): + # This is a message queue driven runner, don't monitor + # just setup required callback. + self.client_manager.ensure_has_status_update_callback(self.__async_update) + + def __async_update( self, full_status ): + job_id = None + try: + job_id = full_status[ "job_id" ] + job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id( job_id ) + job_state = self._job_state( job, job_wrapper ) + self._update_job_state_for_status(job_state, full_status[ "status" ] ) + except Exception: + log.exception( "Failed to update Pulsar job status for job_id %s" % job_id ) + raise + # Nothing else to do? - Attempt to fail the job? + + +class PulsarRESTJobRunner( PulsarJobRunner ): + destination_defaults = dict( + default_file_action="transfer", + rewrite_parameters="true", + dependency_resolution="remote", + url=PARAMETER_SPECIFICATION_REQUIRED, + ) + + +class PulsarComputeEnvironment( ComputeEnvironment ): + + def __init__( self, pulsar_client, job_wrapper, remote_job_config ): + self.pulsar_client = pulsar_client + self.job_wrapper = job_wrapper + self.local_path_config = job_wrapper.default_compute_environment() + self.unstructured_path_rewrites = {} + # job_wrapper.prepare is going to expunge the job backing the following + # computations, so precalculate these paths. + self._wrapper_input_paths = self.local_path_config.input_paths() + self._wrapper_output_paths = self.local_path_config.output_paths() + self.path_mapper = PathMapper(pulsar_client, remote_job_config, self.local_path_config.working_directory()) + self._config_directory = remote_job_config[ "configs_directory" ] + self._working_directory = remote_job_config[ "working_directory" ] + self._sep = remote_job_config[ "system_properties" ][ "separator" ] + self._tool_dir = remote_job_config[ "tools_directory" ] + version_path = self.local_path_config.version_path() + new_version_path = self.path_mapper.remote_version_path_rewrite(version_path) + if new_version_path: + version_path = new_version_path + self._version_path = version_path + + def output_paths( self ): + local_output_paths = self._wrapper_output_paths + + results = [] + for local_output_path in local_output_paths: + wrapper_path = str( local_output_path ) + remote_path = self.path_mapper.remote_output_path_rewrite( wrapper_path ) + results.append( self._dataset_path( local_output_path, remote_path ) ) + return results + + def input_paths( self ): + local_input_paths = self._wrapper_input_paths + + results = [] + for local_input_path in local_input_paths: + wrapper_path = str( local_input_path ) + # This will over-copy in some cases. For instance in the case of task + # splitting, this input will be copied even though only the work dir + # input will actually be used. 
+ remote_path = self.path_mapper.remote_input_path_rewrite( wrapper_path ) + results.append( self._dataset_path( local_input_path, remote_path ) ) + return results + + def _dataset_path( self, local_dataset_path, remote_path ): + remote_extra_files_path = None + if remote_path: + remote_extra_files_path = "%s_files" % remote_path[ 0:-len( ".dat" ) ] + return local_dataset_path.with_path_for_job( remote_path, remote_extra_files_path ) + + def working_directory( self ): + return self._working_directory + + def config_directory( self ): + return self._config_directory + + def new_file_path( self ): + return self.working_directory() # Problems with doing this? + + def sep( self ): + return self._sep + + def version_path( self ): + return self._version_path + + def rewriter( self, parameter_value ): + unstructured_path_rewrites = self.unstructured_path_rewrites + if parameter_value in unstructured_path_rewrites: + # Path previously mapped, use previous mapping. + return unstructured_path_rewrites[ parameter_value ] + if parameter_value in unstructured_path_rewrites.itervalues(): + # Path is a rewritten remote path (this might never occur, + # consider dropping check...) + return parameter_value + + rewrite, new_unstructured_path_rewrites = self.path_mapper.check_for_arbitrary_rewrite( parameter_value ) + if rewrite: + unstructured_path_rewrites.update(new_unstructured_path_rewrites) + return rewrite + else: + # Did need to rewrite, use original path or value. + return parameter_value + + def unstructured_path_rewriter( self ): + return self.rewriter diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/galaxy/jobs/runners/util/__init__.py --- a/lib/galaxy/jobs/runners/util/__init__.py +++ b/lib/galaxy/jobs/runners/util/__init__.py @@ -1,7 +1,7 @@ """ This module and its submodules contains utilities for running external processes and interfacing with job managers. This module should contain -functionality shared between Galaxy and the LWR. +functionality shared between Galaxy and the Pulsar. """ from galaxy.util.bunch import Bunch diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/galaxy/jobs/runners/util/cli/factory.py --- a/lib/galaxy/jobs/runners/util/cli/factory.py +++ b/lib/galaxy/jobs/runners/util/cli/factory.py @@ -5,7 +5,7 @@ ) code_dir = 'lib' except ImportError: - from lwr.managers.util.cli import ( + from pulsar.managers.util.cli import ( CliInterface, split_params ) diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/galaxy/jobs/runners/util/cli/job/slurm.py --- a/lib/galaxy/jobs/runners/util/cli/job/slurm.py +++ b/lib/galaxy/jobs/runners/util/cli/job/slurm.py @@ -5,7 +5,7 @@ from galaxy.model import Job job_states = Job.states except ImportError: - # Not in Galaxy, map Galaxy job states to LWR ones. + # Not in Galaxy, map Galaxy job states to Pulsar ones. from galaxy.util import enum job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/galaxy/jobs/runners/util/cli/job/torque.py --- a/lib/galaxy/jobs/runners/util/cli/job/torque.py +++ b/lib/galaxy/jobs/runners/util/cli/job/torque.py @@ -7,7 +7,7 @@ from galaxy.model import Job job_states = Job.states except ImportError: - # Not in Galaxy, map Galaxy job states to LWR ones. + # Not in Galaxy, map Galaxy job states to Pulsar ones. 
from galaxy.util import enum job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -623,9 +623,9 @@ elif store == 'irods': from .rods import IRODSObjectStore return IRODSObjectStore(config=config, config_xml=config_xml) - elif store == 'lwr': - from .lwr import LwrObjectStore - return LwrObjectStore(config=config, config_xml=config_xml) + elif store == 'pulsar': + from .pulsar import PulsarObjectStore + return PulsarObjectStore(config=config, config_xml=config_xml) else: log.error("Unrecognized object store definition: {0}".format(store)) diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/galaxy/objectstore/lwr.py --- a/lib/galaxy/objectstore/lwr.py +++ /dev/null @@ -1,76 +0,0 @@ -from __future__ import absolute_import # Need to import lwr_client absolutely. -from ..objectstore import ObjectStore -try: - from galaxy.jobs.runners.lwr_client.manager import ObjectStoreClientManager -except ImportError: - from lwr.lwr_client.manager import ObjectStoreClientManager - - -class LwrObjectStore(ObjectStore): - """ - Object store implementation that delegates to a remote LWR server. - - This may be more aspirational than practical for now, it would be good to - Galaxy to a point that a handler thread could be setup that doesn't attempt - to access the disk files returned by a (this) object store - just passing - them along to the LWR unmodified. That modification - along with this - implementation and LWR job destinations would then allow Galaxy to fully - manage jobs on remote servers with completely different mount points. - - This implementation should be considered beta and may be dropped from - Galaxy at some future point or significantly modified. - """ - - def __init__(self, config, config_xml): - self.lwr_client = self.__build_lwr_client(config_xml) - - def exists(self, obj, **kwds): - return self.lwr_client.exists(**self.__build_kwds(obj, **kwds)) - - def file_ready(self, obj, **kwds): - return self.lwr_client.file_ready(**self.__build_kwds(obj, **kwds)) - - def create(self, obj, **kwds): - return self.lwr_client.create(**self.__build_kwds(obj, **kwds)) - - def empty(self, obj, **kwds): - return self.lwr_client.empty(**self.__build_kwds(obj, **kwds)) - - def size(self, obj, **kwds): - return self.lwr_client.size(**self.__build_kwds(obj, **kwds)) - - def delete(self, obj, **kwds): - return self.lwr_client.delete(**self.__build_kwds(obj, **kwds)) - - # TODO: Optimize get_data. 
- def get_data(self, obj, **kwds): - return self.lwr_client.get_data(**self.__build_kwds(obj, **kwds)) - - def get_filename(self, obj, **kwds): - return self.lwr_client.get_filename(**self.__build_kwds(obj, **kwds)) - - def update_from_file(self, obj, **kwds): - return self.lwr_client.update_from_file(**self.__build_kwds(obj, **kwds)) - - def get_store_usage_percent(self): - return self.lwr_client.get_store_usage_percent() - - def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None): - return None - - def __build_kwds(self, obj, **kwds): - kwds['object_id'] = obj.id - return kwds - pass - - def __build_lwr_client(self, config_xml): - url = config_xml.get("url") - private_token = config_xml.get("private_token", None) - transport = config_xml.get("transport", None) - manager_options = dict(transport=transport) - client_options = dict(url=url, private_token=private_token) - lwr_client = ObjectStoreClientManager(**manager_options).get_client(client_options) - return lwr_client - - def shutdown(self): - pass diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/galaxy/objectstore/pulsar.py --- /dev/null +++ b/lib/galaxy/objectstore/pulsar.py @@ -0,0 +1,73 @@ +from __future__ import absolute_import # Need to import pulsar_client absolutely. +from ..objectstore import ObjectStore +from pulsar.client.manager import ObjectStoreClientManager + + +class PulsarObjectStore(ObjectStore): + """ + Object store implementation that delegates to a remote Pulsar server. + + This may be more aspirational than practical for now, it would be good to + Galaxy to a point that a handler thread could be setup that doesn't attempt + to access the disk files returned by a (this) object store - just passing + them along to the Pulsar unmodified. That modification - along with this + implementation and Pulsar job destinations would then allow Galaxy to fully + manage jobs on remote servers with completely different mount points. + + This implementation should be considered beta and may be dropped from + Galaxy at some future point or significantly modified. + """ + + def __init__(self, config, config_xml): + self.pulsar_client = self.__build_pulsar_client(config_xml) + + def exists(self, obj, **kwds): + return self.pulsar_client.exists(**self.__build_kwds(obj, **kwds)) + + def file_ready(self, obj, **kwds): + return self.pulsar_client.file_ready(**self.__build_kwds(obj, **kwds)) + + def create(self, obj, **kwds): + return self.pulsar_client.create(**self.__build_kwds(obj, **kwds)) + + def empty(self, obj, **kwds): + return self.pulsar_client.empty(**self.__build_kwds(obj, **kwds)) + + def size(self, obj, **kwds): + return self.pulsar_client.size(**self.__build_kwds(obj, **kwds)) + + def delete(self, obj, **kwds): + return self.pulsar_client.delete(**self.__build_kwds(obj, **kwds)) + + # TODO: Optimize get_data. 
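+    # (Presumably get_data fetches the entire file from the remote Pulsar
+    # even when callers pass start/count kwds for a ranged read; honoring
+    # those kwds remotely would avoid full-file transfers.)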
+ def get_data(self, obj, **kwds): + return self.pulsar_client.get_data(**self.__build_kwds(obj, **kwds)) + + def get_filename(self, obj, **kwds): + return self.pulsar_client.get_filename(**self.__build_kwds(obj, **kwds)) + + def update_from_file(self, obj, **kwds): + return self.pulsar_client.update_from_file(**self.__build_kwds(obj, **kwds)) + + def get_store_usage_percent(self): + return self.pulsar_client.get_store_usage_percent() + + def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None): + return None + + def __build_kwds(self, obj, **kwds): + kwds['object_id'] = obj.id + return kwds + pass + + def __build_pulsar_client(self, config_xml): + url = config_xml.get("url") + private_token = config_xml.get("private_token", None) + transport = config_xml.get("transport", None) + manager_options = dict(transport=transport) + client_options = dict(url=url, private_token=private_token) + pulsar_client = ObjectStoreClientManager(**manager_options).get_client(client_options) + return pulsar_client + + def shutdown(self): + pass diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/galaxy/tools/deps/dependencies.py --- a/lib/galaxy/tools/deps/dependencies.py +++ b/lib/galaxy/tools/deps/dependencies.py @@ -8,7 +8,7 @@ related context required to resolve dependencies via the ToolShedPackageDependencyResolver. - This is meant to enable remote resolution of dependencies, by the LWR or + This is meant to enable remote resolution of dependencies, by the Pulsar or other potential remote execution mechanisms. """ @@ -39,7 +39,7 @@ @staticmethod def _toolshed_install_dependency_from_dict(as_dict): - # Rather than requiring full models in LWR, just use simple objects + # Rather than requiring full models in Pulsar, just use simple objects # containing only properties and associations used to resolve # dependencies for tool execution. repository_object = bunch.Bunch( diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/pulsar/client/__init__.py --- /dev/null +++ b/lib/pulsar/client/__init__.py @@ -0,0 +1,62 @@ +""" +pulsar client +====== + +This module contains logic for interfacing with an external Pulsar server. + +------------------ +Configuring Galaxy +------------------ + +Galaxy job runners are configured in Galaxy's ``job_conf.xml`` file. See ``job_conf.xml.sample_advanced`` +in your Galaxy code base or on +`Bitbucket <https://bitbucket.org/galaxy/galaxy-dist/src/tip/job_conf.xml.sample_advanced?at=default>`_ +for information on how to configure Galaxy to interact with the Pulsar. + +Galaxy also supports an older, less rich configuration of job runners directly +in its main ``universe_wsgi.ini`` file. The following section describes how to +configure Galaxy to communicate with the Pulsar in this legacy mode. + +Legacy +------ + +A Galaxy tool can be configured to be executed remotely via Pulsar by +adding a line to the ``universe_wsgi.ini`` file under the +``galaxy:tool_runners`` section with the format:: + + <tool_id> = pulsar://http://<pulsar_host>:<pulsar_port> + +As an example, if a host named remotehost is running the Pulsar server +application on port ``8913``, then the tool with id ``test_tool`` can +be configured to run remotely on remotehost by adding the following +line to ``universe.ini``:: + + test_tool = pulsar://http://remotehost:8913 + +Remember this must be added after the ``[galaxy:tool_runners]`` header +in the ``universe.ini`` file. 
+ + +""" + +from .staging.down import finish_job +from .staging.up import submit_job +from .staging import ClientJobDescription +from .staging import PulsarOutputs +from .staging import ClientOutputs +from .client import OutputNotFoundException +from .manager import build_client_manager +from .destination import url_to_destination_params +from .path_mapper import PathMapper + +__all__ = [ + build_client_manager, + OutputNotFoundException, + url_to_destination_params, + finish_job, + submit_job, + ClientJobDescription, + PulsarOutputs, + ClientOutputs, + PathMapper, +] diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/pulsar/client/action_mapper.py --- /dev/null +++ b/lib/pulsar/client/action_mapper.py @@ -0,0 +1,567 @@ +from json import load +from os import makedirs +from os.path import exists +from os.path import abspath +from os.path import dirname +from os.path import join +from os.path import basename +from os.path import sep +import fnmatch +from re import compile +from re import escape +import galaxy.util +from galaxy.util.bunch import Bunch +from .config_util import read_file +from .util import directory_files +from .util import unique_path_prefix +from .transport import get_file +from .transport import post_file + + +DEFAULT_MAPPED_ACTION = 'transfer' # Not really clear to me what this should be, exception? +DEFAULT_PATH_MAPPER_TYPE = 'prefix' + +STAGING_ACTION_REMOTE = "remote" +STAGING_ACTION_LOCAL = "local" +STAGING_ACTION_NONE = None +STAGING_ACTION_DEFAULT = "default" + +# Poor man's enum. +path_type = Bunch( + # Galaxy input datasets and extra files. + INPUT="input", + # Galaxy config and param files. + CONFIG="config", + # Files from tool's tool_dir (for now just wrapper if available). + TOOL="tool", + # Input work dir files - e.g. metadata files, task-split input files, etc.. + WORKDIR="workdir", + # Galaxy output datasets in their final home. + OUTPUT="output", + # Galaxy from_work_dir output paths and other files (e.g. galaxy.json) + OUTPUT_WORKDIR="output_workdir", + # Other fixed tool parameter paths (likely coming from tool data, but not + # nessecarily). Not sure this is the best name... + UNSTRUCTURED="unstructured", +) + + +ACTION_DEFAULT_PATH_TYPES = [ + path_type.INPUT, + path_type.CONFIG, + path_type.TOOL, + path_type.WORKDIR, + path_type.OUTPUT, + path_type.OUTPUT_WORKDIR, +] +ALL_PATH_TYPES = ACTION_DEFAULT_PATH_TYPES + [path_type.UNSTRUCTURED] + + +class FileActionMapper(object): + """ + Objects of this class define how paths are mapped to actions. + + >>> json_string = r'''{"paths": [ \ + {"path": "/opt/galaxy", "action": "none"}, \ + {"path": "/galaxy/data", "action": "transfer"}, \ + {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"}, \ + {"path": ".*/dataset_\\\\d+.dat", "action": "copy", "match_type": "regex"} \ + ]}''' + >>> from tempfile import NamedTemporaryFile + >>> from os import unlink + >>> def mapper_for(default_action, config_contents): + ... f = NamedTemporaryFile(delete=False) + ... f.write(config_contents.encode('UTF-8')) + ... f.close() + ... mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None) + ... mapper = FileActionMapper(mock_client) + ... mapper = FileActionMapper(config=mapper.to_dict()) # Serialize and deserialize it to make sure still works + ... unlink(f.name) + ... 
return mapper + >>> mapper = mapper_for(default_action='none', config_contents=json_string) + >>> # Test first config line above, implicit path prefix mapper + >>> action = mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input') + >>> action.action_type == u'none' + True + >>> action.staging_needed + False + >>> # Test another (2nd) mapper, this one with a different action + >>> action = mapper.action('/galaxy/data/files/000/dataset_1.dat', 'input') + >>> action.action_type == u'transfer' + True + >>> action.staging_needed + True + >>> # Always at least copy work_dir outputs. + >>> action = mapper.action('/opt/galaxy/database/working_directory/45.sh', 'workdir') + >>> action.action_type == u'copy' + True + >>> action.staging_needed + True + >>> # Test glob mapper (matching test) + >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam', 'input').action_type == u'copy' + True + >>> # Test glob mapper (non-matching test) + >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam.bai', 'input').action_type == u'none' + True + >>> # Regex mapper test. + >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'input').action_type == u'copy' + True + >>> # Doesn't map unstructured paths by default + >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'none' + True + >>> input_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ + {"path": "/", "action": "transfer", "path_types": "input"} \ + ] }''') + >>> input_only_mapper.action('/dataset_1.dat', 'input').action_type == u'transfer' + True + >>> input_only_mapper.action('/dataset_1.dat', 'output').action_type == u'none' + True + >>> unstructured_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ + {"path": "/", "action": "transfer", "path_types": "*any*"} \ + ] }''') + >>> unstructured_mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'transfer' + True + """ + + def __init__(self, client=None, config=None): + if config is None and client is None: + message = "FileActionMapper must be constructed from either a client or a config dictionary." + raise Exception(message) + if config is None: + config = self.__client_to_config(client) + self.default_action = config.get("default_action", "transfer") + self.mappers = mappers_from_dicts(config.get("paths", [])) + self.files_endpoint = config.get("files_endpoint", None) + + def action(self, path, type, mapper=None): + mapper = self.__find_mapper(path, type, mapper) + action_class = self.__action_class(path, type, mapper) + file_lister = DEFAULT_FILE_LISTER + action_kwds = {} + if mapper: + file_lister = mapper.file_lister + action_kwds = mapper.action_kwds + action = action_class(path, file_lister=file_lister, **action_kwds) + self.__process_action(action, type) + return action + + def unstructured_mappers(self): + """ Return mappers that will map 'unstructured' files (i.e. go beyond + mapping inputs, outputs, and config files). 
+ + """ + return filter(lambda m: path_type.UNSTRUCTURED in m.path_types, self.mappers) + + def to_dict(self): + return dict( + default_action=self.default_action, + files_endpoint=self.files_endpoint, + paths=map(lambda m: m.to_dict(), self.mappers) + ) + + def __client_to_config(self, client): + action_config_path = client.action_config_path + if action_config_path: + config = read_file(action_config_path) + else: + config = dict() + config["default_action"] = client.default_file_action + config["files_endpoint"] = client.files_endpoint + return config + + def __load_action_config(self, path): + config = load(open(path, 'rb')) + self.mappers = mappers_from_dicts(config.get('paths', [])) + + def __find_mapper(self, path, type, mapper=None): + if not mapper: + normalized_path = abspath(path) + for query_mapper in self.mappers: + if query_mapper.matches(normalized_path, type): + mapper = query_mapper + break + return mapper + + def __action_class(self, path, type, mapper): + action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else "none" + if mapper: + action_type = mapper.action_type + if type in ["workdir", "output_workdir"] and action_type == "none": + # We are changing the working_directory relative to what + # Galaxy would use, these need to be copied over. + action_type = "copy" + action_class = actions.get(action_type, None) + if action_class is None: + message_template = "Unknown action_type encountered %s while trying to map path %s" + message_args = (action_type, path) + raise Exception(message_template % message_args) + return action_class + + def __process_action(self, action, file_type): + """ Extension point to populate extra action information after an + action has been created. + """ + if action.action_type == "remote_transfer": + url_base = self.files_endpoint + if not url_base: + raise Exception("Attempted to use remote_transfer action without defining a files_endpoint") + if "?" not in url_base: + url_base = "%s?" % url_base + # TODO: URL encode path. + url = "%s&path=%s&file_type=%s" % (url_base, action.path, file_type) + action.url = url + +REQUIRED_ACTION_KWD = object() + + +class BaseAction(object): + action_spec = {} + + def __init__(self, path, file_lister=None): + self.path = path + self.file_lister = file_lister or DEFAULT_FILE_LISTER + + def unstructured_map(self, path_helper): + unstructured_map = self.file_lister.unstructured_map(self.path) + if self.staging_needed: + # To ensure uniqueness, prepend unique prefix to each name + prefix = unique_path_prefix(self.path) + for path, name in unstructured_map.iteritems(): + unstructured_map[path] = join(prefix, name) + else: + path_rewrites = {} + for path in unstructured_map: + rewrite = self.path_rewrite(path_helper, path) + if rewrite: + path_rewrites[path] = rewrite + unstructured_map = path_rewrites + return unstructured_map + + @property + def staging_needed(self): + return self.staging != STAGING_ACTION_NONE + + @property + def staging_action_local(self): + return self.staging == STAGING_ACTION_LOCAL + + +class NoneAction(BaseAction): + """ This action indicates the corresponding path does not require any + additional action. This should indicate paths that are available both on + the Pulsar client (i.e. Galaxy server) and remote Pulsar server with the same + paths.
""" + action_type = "none" + staging = STAGING_ACTION_NONE + + def to_dict(self): + return dict(path=self.path, action_type=self.action_type) + + @classmethod + def from_dict(cls, action_dict): + return NoneAction(path=action_dict["path"]) + + def path_rewrite(self, path_helper, path=None): + return None + + +class RewriteAction(BaseAction): + """ This action indicates the Pulsar server should simply rewrite the path + to the specified file. + """ + action_spec = dict( + source_directory=REQUIRED_ACTION_KWD, + destination_directory=REQUIRED_ACTION_KWD + ) + action_type = "rewrite" + staging = STAGING_ACTION_NONE + + def __init__(self, path, file_lister=None, source_directory=None, destination_directory=None): + self.path = path + self.file_lister = file_lister or DEFAULT_FILE_LISTER + self.source_directory = source_directory + self.destination_directory = destination_directory + + def to_dict(self): + return dict( + path=self.path, + action_type=self.action_type, + source_directory=self.source_directory, + destination_directory=self.destination_directory, + ) + + @classmethod + def from_dict(cls, action_dict): + return RewriteAction( + path=action_dict["path"], + source_directory=action_dict["source_directory"], + destination_directory=action_dict["destination_directory"], + ) + + def path_rewrite(self, path_helper, path=None): + if not path: + path = self.path + new_path = path_helper.from_posix_with_new_base(self.path, self.source_directory, self.destination_directory) + return None if new_path == self.path else new_path + + +class TransferAction(BaseAction): + """ This action indicates that the Pulsar client should initiate an HTTP + transfer of the corresponding path to the remote Pulsar server before + launching the job. """ + action_type = "transfer" + staging = STAGING_ACTION_LOCAL + + +class CopyAction(BaseAction): + """ This action indicates that the Pulsar client should execute a file system + copy of the corresponding path to the Pulsar staging directory prior to + launching the corresponding job. """ + action_type = "copy" + staging = STAGING_ACTION_LOCAL + + +class RemoteCopyAction(BaseAction): + """ This action indicates the Pulsar server should copy the file before + execution via direct file system copy. This is like a CopyAction, but + it indicates the action should occur on the Pulsar server instead of on + the client. + """ + action_type = "remote_copy" + staging = STAGING_ACTION_REMOTE + + def to_dict(self): + return dict(path=self.path, action_type=self.action_type) + + @classmethod + def from_dict(cls, action_dict): + return RemoteCopyAction(path=action_dict["path"]) + + def write_to_path(self, path): + galaxy.util.copy_to_path(open(self.path, "rb"), path) + + def write_from_path(self, pulsar_path): + destination = self.path + parent_directory = dirname(destination) + if not exists(parent_directory): + makedirs(parent_directory) + with open(pulsar_path, "rb") as f: + galaxy.util.copy_to_path(f, destination) + + +class RemoteTransferAction(BaseAction): + """ This action indicates the Pulsar server should transfer the file + (e.g. via HTTP) before execution. This is like a TransferAction, but + it indicates the transfer should be initiated by the Pulsar server + instead of by the client.
+ """ + action_type = "remote_transfer" + staging = STAGING_ACTION_REMOTE + + def __init__(self, path, file_lister=None, url=None): + super(RemoteTransferAction, self).__init__(path, file_lister=file_lister) + self.url = url + + def to_dict(self): + return dict(path=self.path, action_type=self.action_type, url=self.url) + + @classmethod + def from_dict(cls, action_dict): + return RemoteTransferAction(path=action_dict["path"], url=action_dict["url"]) + + def write_to_path(self, path): + get_file(self.url, path) + + def write_from_path(self, pulsar_path): + post_file(self.url, pulsar_path) + + +class MessageAction(object): + """ Sort of pseudo action describing "files" stored in memory and + transferred via message (HTTP, Python-call, MQ, etc...) + """ + action_type = "message" + staging = STAGING_ACTION_DEFAULT + + def __init__(self, contents, client=None): + self.contents = contents + self.client = client + + @property + def staging_needed(self): + return True + + @property + def staging_action_local(self): + # Eek, cannot be called if created through from_dict. + # Shouldn't be a problem the way it is used - but is an + # object design problem. + return self.client.prefer_local_staging + + def to_dict(self): + return dict(contents=self.contents, action_type=MessageAction.action_type) + + @classmethod + def from_dict(cls, action_dict): + return MessageAction(contents=action_dict["contents"]) + + def write_to_path(self, path): + open(path, "w").write(self.contents) + + +DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, RemoteTransferAction, MessageAction] + + +def from_dict(action_dict): + action_type = action_dict.get("action_type", None) + target_class = None + for action_class in DICTIFIABLE_ACTION_CLASSES: + if action_type == action_class.action_type: + target_class = action_class + if not target_class: + message = "Failed to recover action from dictionary - invalid action type specified %s."
% action_type + raise Exception(message) + return target_class.from_dict(action_dict) + + +class BasePathMapper(object): + + def __init__(self, config): + action_type = config.get('action', DEFAULT_MAPPED_ACTION) + action_class = actions.get(action_type, None) + action_kwds = action_class.action_spec.copy() + for key, value in action_kwds.items(): + if key in config: + action_kwds[key] = config[key] + elif value is REQUIRED_ACTION_KWD: + message_template = "action_type %s requires key word argument %s" + message = message_template % (action_type, key) + raise Exception(message) + self.action_type = action_type + self.action_kwds = action_kwds + path_types_str = config.get('path_types', "*defaults*") + path_types_str = path_types_str.replace("*defaults*", ",".join(ACTION_DEFAULT_PATH_TYPES)) + path_types_str = path_types_str.replace("*any*", ",".join(ALL_PATH_TYPES)) + self.path_types = path_types_str.split(",") + self.file_lister = FileLister(config) + + def matches(self, path, path_type): + path_type_matches = path_type in self.path_types + return path_type_matches and self._path_matches(path) + + def _extend_base_dict(self, **kwds): + base_dict = dict( + action=self.action_type, + path_types=",".join(self.path_types), + match_type=self.match_type + ) + base_dict.update(self.file_lister.to_dict()) + base_dict.update(self.action_kwds) + base_dict.update(**kwds) + return base_dict + + +class PrefixPathMapper(BasePathMapper): + match_type = 'prefix' + + def __init__(self, config): + super(PrefixPathMapper, self).__init__(config) + self.prefix_path = abspath(config['path']) + + def _path_matches(self, path): + return path.startswith(self.prefix_path) + + def to_pattern(self): + pattern_str = "(%s%s[^\s,\"\']+)" % (escape(self.prefix_path), escape(sep)) + return compile(pattern_str) + + def to_dict(self): + return self._extend_base_dict(path=self.prefix_path) + + +class GlobPathMapper(BasePathMapper): + match_type = 'glob' + + def __init__(self, config): + super(GlobPathMapper, self).__init__(config) + self.glob_path = config['path'] + + def _path_matches(self, path): + return fnmatch.fnmatch(path, self.glob_path) + + def to_pattern(self): + return compile(fnmatch.translate(self.glob_path)) + + def to_dict(self): + return self._extend_base_dict(path=self.glob_path) + + +class RegexPathMapper(BasePathMapper): + match_type = 'regex' + + def __init__(self, config): + super(RegexPathMapper, self).__init__(config) + self.pattern_raw = config['path'] + self.pattern = compile(self.pattern_raw) + + def _path_matches(self, path): + return self.pattern.match(path) is not None + + def to_pattern(self): + return self.pattern + + def to_dict(self): + return self._extend_base_dict(path=self.pattern_raw) + +MAPPER_CLASSES = [PrefixPathMapper, GlobPathMapper, RegexPathMapper] +MAPPER_CLASS_DICT = dict(map(lambda c: (c.match_type, c), MAPPER_CLASSES)) + + +def mappers_from_dicts(mapper_def_list): + return map(lambda m: __mappper_from_dict(m), mapper_def_list) + + +def __mappper_from_dict(mapper_dict): + map_type = mapper_dict.get('match_type', DEFAULT_PATH_MAPPER_TYPE) + return MAPPER_CLASS_DICT[map_type](mapper_dict) + + +class FileLister(object): + + def __init__(self, config): + self.depth = int(config.get("depth", "0")) + + def to_dict(self): + return dict( + depth=self.depth + ) + + def unstructured_map(self, path): + depth = self.depth + if self.depth == 0: + return {path: basename(path)} + else: + while depth > 0: + path = dirname(path) + depth -= 1 + return dict([(join(path, f), f) for f in 
directory_files(path)]) + +DEFAULT_FILE_LISTER = FileLister(dict(depth=0)) + +ACTION_CLASSES = [ + NoneAction, + RewriteAction, + TransferAction, + CopyAction, + RemoteCopyAction, + RemoteTransferAction, +] +actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES]) + + +__all__ = [ + FileActionMapper, + path_type, + from_dict, + MessageAction, + RemoteTransferAction, # For testing +] diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/pulsar/client/amqp_exchange.py --- /dev/null +++ b/lib/pulsar/client/amqp_exchange.py @@ -0,0 +1,139 @@ +try: + import kombu + from kombu import pools +except ImportError: + kombu = None + +import socket +import logging +import threading +from time import sleep +log = logging.getLogger(__name__) + + +KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but kombu dependency unavailable" + +DEFAULT_EXCHANGE_NAME = "pulsar" +DEFAULT_EXCHANGE_TYPE = "direct" +# Set timeout to periodically give up looking and check if polling should end. +DEFAULT_TIMEOUT = 0.2 +DEFAULT_HEARTBEAT = 580 + +DEFAULT_RECONNECT_CONSUMER_WAIT = 1 +DEFAULT_HEARTBEAT_WAIT = 1 + + +class PulsarExchange(object): + """ Utility for publishing and consuming structured Pulsar queues using kombu. + This is shared between the server and client - an exchange should be set up + for each manager (or in the case of the client, each manager one wishes to + communicate with). + + Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar + should target each AMQP endpoint or care should be taken that unique + manager names are used across Pulsar servers targeting the same AMQP endpoint - + and in particular only one such Pulsar should define a default manager with + name _default_. + """ + + def __init__( + self, + url, + manager_name, + connect_ssl=None, + timeout=DEFAULT_TIMEOUT, + publish_kwds={}, + ): + """ + """ + if not kombu: + raise Exception(KOMBU_UNAVAILABLE) + self.__url = url + self.__manager_name = manager_name + self.__connect_ssl = connect_ssl + self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE) + self.__timeout = timeout + # Be sure to log message publishing failures.
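To make the retry and errback wiring that continues just below concrete, here is a hedged usage sketch (broker URL and payload contents are placeholders); with retry enabled and no errback supplied, the exchange injects its own logging errback so failed publish attempts are logged rather than silently dropped:

    from pulsar.client.amqp_exchange import PulsarExchange

    exchange = PulsarExchange(
        "amqp://guest:guest@localhost:5672//",  # placeholder broker URL
        manager_name="_default_",
        publish_kwds={
            "retry": True,  # triggers injection of the logging errback
            "retry_policy": {"max_retries": 3, "interval_start": 1},
        },
    )
    # Payload is JSON serialized and routed to the "pulsar_setup" queue.
    exchange.publish("setup", {"job_id": "123"})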
+ if publish_kwds.get("retry", False): + if "retry_policy" not in publish_kwds: + publish_kwds["retry_policy"] = {} + if "errback" not in publish_kwds["retry_policy"]: + publish_kwds["retry_policy"]["errback"] = self.__publish_errback + self.__publish_kwds = publish_kwds + + @property + def url(self): + return self.__url + + def consume(self, queue_name, callback, check=True, connection_kwargs={}): + queue = self.__queue(queue_name) + log.debug("Consuming queue '%s'", queue) + while check: + heartbeat_thread = None + try: + with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection: + with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']): + heartbeat_thread = self.__start_heartbeat(queue_name, connection) + while check and connection.connected: + try: + connection.drain_events(timeout=self.__timeout) + except socket.timeout: + pass + except (IOError, socket.error), exc: + # In testing, errno is None + log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc) + if heartbeat_thread: + heartbeat_thread.join() + sleep(DEFAULT_RECONNECT_CONSUMER_WAIT) + + def heartbeat(self, connection): + log.debug('AMQP heartbeat thread alive') + while connection.connected: + connection.heartbeat_check() + sleep(DEFAULT_HEARTBEAT_WAIT) + log.debug('AMQP heartbeat thread exiting') + + def publish(self, name, payload): + with self.connection(self.__url) as connection: + with pools.producers[connection].acquire() as producer: + key = self.__queue_name(name) + producer.publish( + payload, + serializer='json', + exchange=self.__exchange, + declare=[self.__exchange], + routing_key=key, + **self.__publish_kwds + ) + + def __publish_errback(self, exc, interval): + log.error("Connection error while publishing: %r", exc, exc_info=1) + log.info("Retrying in %s seconds", interval) + + def connection(self, connection_string, **kwargs): + if "ssl" not in kwargs: + kwargs["ssl"] = self.__connect_ssl + return kombu.Connection(connection_string, **kwargs) + + def __queue(self, name): + queue_name = self.__queue_name(name) + queue = kombu.Queue(queue_name, self.__exchange, routing_key=queue_name) + return queue + + def __queue_name(self, name): + key_prefix = self.__key_prefix() + queue_name = '%s_%s' % (key_prefix, name) + return queue_name + + def __key_prefix(self): + if self.__manager_name == "_default_": + key_prefix = "pulsar_" + else: + key_prefix = "pulsar_%s_" % self.__manager_name + return key_prefix + + def __start_heartbeat(self, queue_name, connection): + thread_name = "consume-heartbeat-%s" % (self.__queue_name(queue_name)) + thread = threading.Thread(name=thread_name, target=self.heartbeat, args=(connection,)) + thread.start() + return thread diff -r cfa24d46767a41afa9e83cbf703eef837b47f1e2 -r 8c569cffa9670f765492c8ddc11fb54d9a2ab2f1 lib/pulsar/client/amqp_exchange_factory.py --- /dev/null +++ b/lib/pulsar/client/amqp_exchange_factory.py @@ -0,0 +1,41 @@ +from .amqp_exchange import PulsarExchange +from .util import filter_destination_params + + +def get_exchange(url, manager_name, params): + connect_ssl = parse_amqp_connect_ssl_params(params) + exchange_kwds = dict( + manager_name=manager_name, + connect_ssl=connect_ssl, + publish_kwds=parse_amqp_publish_kwds(params) + ) + timeout = params.get('amqp_consumer_timeout', False) + if timeout is not False: + exchange_kwds['timeout'] = timeout + exchange = PulsarExchange(url, **exchange_kwds) + return exchange + + +def parse_amqp_connect_ssl_params(params): + ssl_params = 
filter_destination_params(params, "amqp_connect_ssl_") + if not ssl_params: + return + + ssl = __import__('ssl') + if 'cert_reqs' in ssl_params: + value = ssl_params['cert_reqs'] + ssl_params['cert_reqs'] = getattr(ssl, value.upper()) + return ssl_params + + +def parse_amqp_publish_kwds(params): + all_publish_params = filter_destination_params(params, "amqp_publish_") + retry_policy_params = {} + for key in all_publish_params.keys(): + if key.startswith("retry_"): + value = all_publish_params[key] + retry_policy_params[key[len("retry_"):]] = value + del all_publish_params[key] + if retry_policy_params: + all_publish_params["retry_policy"] = retry_policy_params + return all_publish_params This diff is so big that we needed to truncate the remainder. https://bitbucket.org/galaxy/galaxy-central/commits/566388d62374/ Changeset: 566388d62374 User: jmchilton Date: 2014-06-24 04:27:51 Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #422) Implement Pulsar job runners. Affected #: 32 files diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -19,27 +19,30 @@ <!-- Override the $DRMAA_LIBRARY_PATH environment variable --><param id="drmaa_library_path">/sge/lib/libdrmaa.so</param></plugin> - <plugin id="lwr" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner"> - <!-- More information on LWR can be found at https://lwr.readthedocs.org --> - <!-- Uncomment following line to use libcurl to perform HTTP calls (defaults to urllib) --> + <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /> + <plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> + <plugin id="slurm" type="runner" load="galaxy.jobs.runners.slurm:SlurmJobRunner" /> + <!-- Pulsar runners (see more at https://pulsar.readthedocs.org) --> + <plugin id="pulsar_rest" type="runner" load="galaxy.jobs.runners.pulsar:PulsarRESTJobRunner"> + <!-- Allow optimized HTTP calls with libcurl (defaults to urllib) --><!-- <param id="transport">curl</param> --> - <!-- *Experimental Caching*: Uncomment next parameters to enable - caching and specify the number of caching threads to enable on Galaxy - side. Likely will not work with newer features such as MQ support. - If this is enabled be sure to specify a `file_cache_dir` in the remote - LWR's main configuration file. + + <!-- *Experimental Caching*: Next parameter enables caching. + Likely will not work with newer features such as MQ support. + + If this is enabled be sure to specify a `file_cache_dir` in + the remote Pulsar's servers main configuration file. --><!-- <param id="cache">True</param> --> - <!-- <param id="transfer_threads">2</param> --></plugin> - <plugin id="amqp_lwr" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner"> - <param id="url">amqp://guest:guest@localhost:5672//</param> - <!-- If using message queue driven LWR - the LWR will generally - initiate file transfers so a the URL of this Galaxy instance - must be configured. --> + <plugin id="pulsar_mq" type="runner" load="galaxy.jobs.runners.pulsar:PulsarMQJobRunner"> + <!-- AMQP URL to connect to. --> + <param id="amqp_url">amqp://guest:guest@localhost:5672//</param> + <!-- URL remote Pulsar apps should transfer files to this Galaxy + instance to/from. --><param id="galaxy_url">http://localhost:8080</param> - <!-- If multiple managers configured on the LWR, specify which one - this plugin targets. 
--> + <!-- Pulsar job manager to communicate with (see Pulsar + docs for information on job managers). --><!-- <param id="manager">_default_</param> --><!-- The AMQP client can provide an SSL client certificate (e.g. for validation), the following options configure that certificate @@ -58,9 +61,17 @@ higher value (in seconds) (or `None` to use blocking connections). --><!-- <param id="amqp_consumer_timeout">None</param> --></plugin> - <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /> - <plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> - <plugin id="slurm" type="runner" load="galaxy.jobs.runners.slurm:SlurmJobRunner" /> + <plugin id="pulsar_legacy" type="runner" load="galaxy.jobs.runners.pulsar:PulsarLegacyJobRunner"> + <!-- Pulsar job runner with default parameters matching those + of the old LWR job runner. If your Pulsar server is running on a + Windows machine, for instance, this runner should still be used. + + These destinations still need to target a Pulsar server; + older LWR plugins and destinations that still work in Galaxy can + target LWR servers, but this support should be considered + deprecated and will disappear with a future release of Galaxy. + --> + </plugin></plugins><handlers default="handlers"><!-- Additional job handlers - the id should match the name of a @@ -125,8 +136,8 @@ $galaxy_root:ro,$tool_directory:ro,$working_directory:rw,$default_file_path:ro - If using the LWR, defaults will be even further restricted because the - LWR will (by default) stage all needed inputs into the job's job_directory + If using the Pulsar, defaults will be even further restricted because the + Pulsar will (by default) stage all needed inputs into the job's job_directory (so there is not need to allow the docker container to read all the files - let alone write over them). Defaults in this case becomes: @@ -135,7 +146,7 @@ Python string.Template is used to expand volumes and values $defaults, $galaxy_root, $default_file_path, $tool_directory, $working_directory, are available to all jobs and $job_directory is also available for - LWR jobs. + Pulsar jobs. --><!-- Control memory allocatable by docker container with following option: --> @@ -213,87 +224,71 @@ <!-- A destination that represents a method in the dynamic runner. --><param id="function">foo</param></destination> - <destination id="secure_lwr" runner="lwr"> - <param id="url">https://windowshost.examle.com:8913/</param> - <!-- If set, private_token must match token remote LWR server configured with. --> + <destination id="secure_pulsar_rest_dest" runner="pulsar_rest"> + <param id="url">https://example.com:8913/</param> + <!-- If set, private_token must match token in remote Pulsar's + configuration. --><param id="private_token">123456789changeme</param><!-- Uncomment the following statement to disable file staging (e.g. - if there is a shared file system between Galaxy and the LWR + if there is a shared file system between Galaxy and the Pulsar server). Alternatively action can be set to 'copy' - to replace http transfers with file system copies, 'remote_transfer' to cause - the lwr to initiate HTTP transfers instead of Galaxy, or - 'remote_copy' to cause lwr to initiate file system copies. + the Pulsar to initiate HTTP transfers instead of Galaxy, or + 'remote_copy' to cause Pulsar to initiate file system copies. If setting this to 'remote_transfer' be sure to specify a 'galaxy_url' attribute on the runner plugin above.
--><!-- <param id="default_file_action">none</param> --><!-- The above option is just the default, the transfer behavior none|copy|http can be configured on a per path basis via the - following file. See lib/galaxy/jobs/runners/lwr_client/action_mapper.py - for examples of how to configure this file. This is very beta - and nature of file will likely change. + following file. See Pulsar documentation for more details and + examples. --> - <!-- <param id="file_action_config">file_actions.json</param> --> - <!-- Uncomment following option to disable Galaxy tool dependency - resolution and utilize remote LWR's configuraiton of tool - dependency resolution instead (same options as Galaxy for - dependency resolution are available in LWR). At a minimum - the remote LWR server should define a tool_dependencies_dir in - its `server.ini` configuration. The LWR will not attempt to - stage dependencies - so ensure the the required galaxy or tool - shed packages are available remotely (exact same tool shed - installed changesets are required). + <!-- <param id="file_action_config">file_actions.yaml</param> --> + <!-- The non-legacy Pulsar runners will attempt to resolve Galaxy + dependencies remotely - to enable this set a tool_dependency_dir + in Pulsar's configuration (can work with all the same dependency + resolution mechanisms as Galaxy - Tool Shed installs, Galaxy + packages, etc...). To disable this behavior, set the following parameter + to none. To generate the dependency resolution command locally, + set the following parameter to local. + --> - <!-- <param id="dependency_resolution">remote</params> --> - <!-- Traditionally, the LWR allow Galaxy to generate a command line - as if it were going to run the command locally and then the - LWR client rewrites it after the fact using regular - expressions. Setting the following value to true causes the - LWR runner to insert itself into the command line generation - process and generate the correct command line from the get go. - This will likely be the default someday - but requires a newer - LWR version and is less well tested. --> - <!-- <param id="rewrite_parameters">true</params> --> + <!-- <param id="dependency_resolution">none</param> --><!-- Uncomment following option to enable setting metadata on remote - LWR server. The 'use_remote_datatypes' option is available for + Pulsar server. The 'use_remote_datatypes' option is available for determining whether to use remotely configured datatypes or local ones (both alternatives are a little brittle). --><!-- <param id="remote_metadata">true</param> --><!-- <param id="use_remote_datatypes">false</param> --><!-- <param id="remote_property_galaxy_home">/path/to/remote/galaxy-central</param> --> - <!-- If remote LWR server is configured to run jobs as the real user, + <!-- If remote Pulsar server is configured to run jobs as the real user, uncomment the following line to pass the current Galaxy user along. --><!-- <param id="submit_user">$__user_name__</param> --> - <!-- Various other submission parameters can be passed along to the LWR - whose use will depend on the remote LWR's configured job manager. + <!-- Various other submission parameters can be passed along to the Pulsar + whose use will depend on the remote Pulsar's configured job manager.
For instance: --> - <!-- <param id="submit_native_specification">-P bignodes -R y -pe threads 8</param> --> + <!-- <param id="submit_native_specification">-P bignodes -R y -pe threads 8</param> --> + <!-- Disable parameter rewriting and rewrite generated commands + instead. This may be required if the remote host is a Windows machine + but probably not otherwise. + --> + <!-- <param id="rewrite_parameters">false</param> --></destination> - <destination id="amqp_lwr_dest" runner="amqp_lwr" > - <!-- url and private_token are not valid when using MQ driven LWR. The plugin above - determines which queue/manager to target and the underlying MQ server should be - used to configure security. - --> - <!-- Traditionally, the LWR client sends request to LWR - server to populate various system properties. This + <destination id="pulsar_mq_dest" runner="pulsar_mq" > + <!-- The RESTful Pulsar client sends a request to Pulsar + to populate various system properties. This extra step can be disabled and these calculated here on client by uncommenting jobs_directory and specifying any additional remote_property_ of interest, this is not optional when using message queues. --> - <param id="jobs_directory">/path/to/remote/lwr/lwr_staging/</param> - <!-- Default the LWR send files to and pull files from Galaxy when - using message queues (in the more traditional mode Galaxy sends - files to and pull files from the LWR - this is obviously less - appropriate when using a message queue). - - The default_file_action currently requires pycurl be available - to Galaxy (presumably in its virtualenv). Making this dependency - optional is an open task. + <param id="jobs_directory">/path/to/remote/pulsar/files/staging/</param> + <!-- MQ and legacy Pulsar destinations can otherwise be supplied + all the same destination parameters as the RESTful client documented + above (though url and private_token are ignored when using a MQ). + --> - <param id="default_file_action">remote_transfer</param></destination><destination id="ssh_torque" runner="cli"><param id="shell_plugin">SecureShell</param> diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/pulsar.py --- /dev/null +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -0,0 +1,707 @@ +from __future__ import absolute_import # Need to import pulsar_client absolutely.
+ +import logging + +from galaxy import model +from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner +from galaxy.jobs import ComputeEnvironment +from galaxy.jobs import JobDestination +from galaxy.jobs.command_factory import build_command +from galaxy.tools.deps import dependencies +from galaxy.util import string_as_bool_or_none +from galaxy.util.bunch import Bunch +from galaxy.util import specs + +import errno +from time import sleep +import os + +from pulsar.client import build_client_manager +from pulsar.client import url_to_destination_params +from pulsar.client import finish_job as pulsar_finish_job +from pulsar.client import submit_job as pulsar_submit_job +from pulsar.client import ClientJobDescription +from pulsar.client import PulsarOutputs +from pulsar.client import ClientOutputs +from pulsar.client import PathMapper + +log = logging.getLogger( __name__ ) + +__all__ = [ 'PulsarLegacyJobRunner', 'PulsarRESTJobRunner', 'PulsarMQJobRunner' ] + +NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "Pulsar misconfiguration - Pulsar client configured to set metadata remotely, but remote Pulsar isn't properly configured with a galaxy_home directory." +NO_REMOTE_DATATYPES_CONFIG = "Pulsar client is configured to use remote datatypes configuration when setting metadata externally, but Pulsar is not configured with this information. Defaulting to datatypes_conf.xml." +GENERIC_REMOTE_ERROR = "Failed to communicate with remote job server." + +# Is there a good way to infer some default for this? Can only use +# url_for from web threads. https://gist.github.com/jmchilton/9098762 +DEFAULT_GALAXY_URL = "http://localhost:8080" + +PULSAR_PARAM_SPECS = dict( + transport=dict( + map=specs.to_str_or_none, + valid=specs.is_in("urllib", "curl", None), + default=None + ), + cache=dict( + map=specs.to_bool_or_none, + default=None, + ), + amqp_url=dict( + map=specs.to_str_or_none, + default=None, + ), + galaxy_url=dict( + map=specs.to_str_or_none, + default=DEFAULT_GALAXY_URL, + ), + manager=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_consumer_timeout=dict( + map=lambda val: None if val == "None" else float(val), + default=None, + ), + amqp_connect_ssl_ca_certs=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_keyfile=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_certfile=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_cert_reqs=dict( + map=specs.to_str_or_none, + default=None, + ), + # http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Producer.p... + amqp_publish_retry=dict( + map=specs.to_bool, + default=False, + ), + amqp_publish_priority=dict( + map=int, + valid=lambda x: 0 <= x and x <= 9, + default=0, + ), + # http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Exchange.d... 
+ amqp_publish_delivery_mode=dict( + map=str, + valid=specs.is_in("transient", "persistent"), + default="persistent", + ), + amqp_publish_retry_max_retries=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_start=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_step=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_max=dict( + map=int, + default=None, + ), +) + + +PARAMETER_SPECIFICATION_REQUIRED = object() +PARAMETER_SPECIFICATION_IGNORED = object() + + +class PulsarJobRunner( AsynchronousJobRunner ): + """ + Pulsar Job Runner + """ + runner_name = "PulsarJobRunner" + + def __init__( self, app, nworkers, **kwds ): + """Start the job runner """ + super( PulsarJobRunner, self ).__init__( app, nworkers, runner_param_specs=PULSAR_PARAM_SPECS, **kwds ) + self._init_worker_threads() + galaxy_url = self.runner_params.galaxy_url + if galaxy_url: + galaxy_url = galaxy_url.rstrip("/") + self.galaxy_url = galaxy_url + self.__init_client_manager() + self._monitor() + + def _monitor( self ): + # Extension point allow MQ variant to setup callback instead + self._init_monitor_thread() + + def __init_client_manager( self ): + client_manager_kwargs = {} + for kwd in 'manager', 'cache', 'transport': + client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] + for kwd in self.runner_params.keys(): + if kwd.startswith( 'amqp_' ): + client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] + self.client_manager = build_client_manager(**client_manager_kwargs) + + def url_to_destination( self, url ): + """Convert a legacy URL to a job destination""" + return JobDestination( runner="pulsar", params=url_to_destination_params( url ) ) + + def check_watched_item(self, job_state): + try: + client = self.get_client_from_state(job_state) + status = client.get_status() + except Exception: + # An orphaned job was put into the queue at app startup, so remote server went down + # either way we are done I guess. 
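The PULSAR_PARAM_SPECS table above drives how raw plugin parameters from job_conf.xml are coerced and validated. The helper below is a simplified illustration of that map/valid/default convention - it is not the actual galaxy.util.specs implementation:

    def apply_spec(spec, raw_value):
        # Unset parameters fall back to the spec's default.
        if raw_value is None:
            return spec.get("default")
        value = spec["map"](raw_value)  # coerce the raw string
        valid = spec.get("valid")
        if valid is not None and not valid(value):
            raise Exception("Invalid runner parameter value %r" % raw_value)
        return value

    priority_spec = dict(map=int, valid=lambda x: 0 <= x <= 9, default=0)
    assert apply_spec(priority_spec, "5") == 5
    assert apply_spec(priority_spec, None) == 0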
+ self.mark_as_finished(job_state) + return None + job_state = self._update_job_state_for_status(job_state, status) + return job_state + + def _update_job_state_for_status(self, job_state, pulsar_status): + if pulsar_status == "complete": + self.mark_as_finished(job_state) + return None + if pulsar_status == "failed": + self.fail_job(job_state) + return None + if pulsar_status == "running" and not job_state.running: + job_state.running = True + job_state.job_wrapper.change_state( model.Job.states.RUNNING ) + return job_state + + def queue_job(self, job_wrapper): + job_destination = job_wrapper.job_destination + self._populate_parameter_defaults( job_destination ) + + command_line, client, remote_job_config, compute_environment = self.__prepare_job( job_wrapper, job_destination ) + + if not command_line: + return + + try: + dependencies_description = PulsarJobRunner.__dependencies_description( client, job_wrapper ) + rewrite_paths = not PulsarJobRunner.__rewrite_parameters( client ) + unstructured_path_rewrites = {} + if compute_environment: + unstructured_path_rewrites = compute_environment.unstructured_path_rewrites + + client_job_description = ClientJobDescription( + command_line=command_line, + input_files=self.get_input_files(job_wrapper), + client_outputs=self.__client_outputs(client, job_wrapper), + working_directory=job_wrapper.working_directory, + tool=job_wrapper.tool, + config_files=job_wrapper.extra_filenames, + dependencies_description=dependencies_description, + env=client.env, + rewrite_paths=rewrite_paths, + arbitrary_files=unstructured_path_rewrites, + ) + job_id = pulsar_submit_job(client, client_job_description, remote_job_config) + log.info("Pulsar job submitted with job_id %s" % job_id) + job_wrapper.set_job_destination( job_destination, job_id ) + job_wrapper.change_state( model.Job.states.QUEUED ) + except Exception: + job_wrapper.fail( "failure running job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + return + + pulsar_job_state = AsynchronousJobState() + pulsar_job_state.job_wrapper = job_wrapper + pulsar_job_state.job_id = job_id + pulsar_job_state.old_state = True + pulsar_job_state.running = False + pulsar_job_state.job_destination = job_destination + self.monitor_job(pulsar_job_state) + + def __prepare_job(self, job_wrapper, job_destination): + """ Build command-line and Pulsar client for this job. 
""" + command_line = None + client = None + remote_job_config = None + compute_environment = None + try: + client = self.get_client_from_wrapper(job_wrapper) + tool = job_wrapper.tool + remote_job_config = client.setup(tool.id, tool.version) + rewrite_parameters = PulsarJobRunner.__rewrite_parameters( client ) + prepare_kwds = {} + if rewrite_parameters: + compute_environment = PulsarComputeEnvironment( client, job_wrapper, remote_job_config ) + prepare_kwds[ 'compute_environment' ] = compute_environment + job_wrapper.prepare( **prepare_kwds ) + self.__prepare_input_files_locally(job_wrapper) + remote_metadata = PulsarJobRunner.__remote_metadata( client ) + dependency_resolution = PulsarJobRunner.__dependency_resolution( client ) + metadata_kwds = self.__build_metadata_configuration(client, job_wrapper, remote_metadata, remote_job_config) + remote_command_params = dict( + working_directory=remote_job_config['working_directory'], + metadata_kwds=metadata_kwds, + dependency_resolution=dependency_resolution, + ) + remote_working_directory = remote_job_config['working_directory'] + # TODO: Following defs work for Pulsar, always worked for Pulsar but should be + # calculated at some other level. + remote_job_directory = os.path.abspath(os.path.join(remote_working_directory, os.path.pardir)) + remote_tool_directory = os.path.abspath(os.path.join(remote_job_directory, "tool_files")) + container = self._find_container( + job_wrapper, + compute_working_directory=remote_working_directory, + compute_tool_directory=remote_tool_directory, + compute_job_directory=remote_job_directory, + ) + command_line = build_command( + self, + job_wrapper=job_wrapper, + container=container, + include_metadata=remote_metadata, + include_work_dir_outputs=False, + remote_command_params=remote_command_params, + ) + except Exception: + job_wrapper.fail( "failure preparing job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + + # If we were able to get a command line, run the job + if not command_line: + job_wrapper.finish( '', '' ) + + return command_line, client, remote_job_config, compute_environment + + def __prepare_input_files_locally(self, job_wrapper): + """Run task splitting commands locally.""" + prepare_input_files_cmds = getattr(job_wrapper, 'prepare_input_files_cmds', None) + if prepare_input_files_cmds is not None: + for cmd in prepare_input_files_cmds: # run the commands to stage the input files + if 0 != os.system(cmd): + raise Exception('Error running file staging command: %s' % cmd) + job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line + + def _populate_parameter_defaults( self, job_destination ): + updated = False + params = job_destination.params + for key, value in self.destination_defaults.iteritems(): + if key in params: + if value is PARAMETER_SPECIFICATION_IGNORED: + log.warn( "Pulsar runner in selected configuration ignores parameter %s" % key ) + continue + #if self.runner_params.get( key, None ): + # # Let plugin define defaults for some parameters - + # # for instance that way jobs_directory can be + # # configured next to AMQP url (where it belongs). 
+ # params[ key ] = self.runner_params[ key ] + # continue + + if not value: + continue + + if value is PARAMETER_SPECIFICATION_REQUIRED: + raise Exception( "Pulsar destination does not define required parameter %s" % key ) + elif value is not PARAMETER_SPECIFICATION_IGNORED: + params[ key ] = value + updated = True + return updated + + def get_output_files(self, job_wrapper): + output_paths = job_wrapper.get_output_fnames() + return [ str( o ) for o in output_paths ] # Force job_path from DatasetPath objects. + + def get_input_files(self, job_wrapper): + input_paths = job_wrapper.get_input_paths() + return [ str( i ) for i in input_paths ] # Force job_path from DatasetPath objects. + + def get_client_from_wrapper(self, job_wrapper): + job_id = job_wrapper.job_id + if hasattr(job_wrapper, 'task_id'): + job_id = "%s_%s" % (job_id, job_wrapper.task_id) + params = job_wrapper.job_destination.params.copy() + for key, value in params.iteritems(): + if value: + params[key] = model.User.expand_user_properties( job_wrapper.get_job().user, value ) + + env = getattr( job_wrapper.job_destination, "env", [] ) + return self.get_client( params, job_id, env ) + + def get_client_from_state(self, job_state): + job_destination_params = job_state.job_destination.params + job_id = job_state.job_id + return self.get_client( job_destination_params, job_id ) + + def get_client( self, job_destination_params, job_id, env=[] ): + # Cannot use url_for outside of web thread. + #files_endpoint = url_for( controller="job_files", job_id=encoded_job_id ) + + encoded_job_id = self.app.security.encode_id(job_id) + job_key = self.app.security.encode_id( job_id, kind="jobs_files" ) + files_endpoint = "%s/api/jobs/%s/files?job_key=%s" % ( + self.galaxy_url, + encoded_job_id, + job_key + ) + get_client_kwds = dict( + job_id=str( job_id ), + files_endpoint=files_endpoint, + env=env + ) + return self.client_manager.get_client( job_destination_params, **get_client_kwds ) + + def finish_job( self, job_state ): + stderr = stdout = '' + job_wrapper = job_state.job_wrapper + try: + client = self.get_client_from_state(job_state) + run_results = client.full_status() + remote_working_directory = run_results.get("working_directory", None) + stdout = run_results.get('stdout', '') + stderr = run_results.get('stderr', '') + exit_code = run_results.get('returncode', None) + pulsar_outputs = PulsarOutputs.from_status_response(run_results) + # Use Pulsar client code to transfer/copy files back + # and cleanup job if needed. 
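The _populate_parameter_defaults logic above relies on the PARAMETER_SPECIFICATION_REQUIRED and PARAMETER_SPECIFICATION_IGNORED sentinels; here is a condensed standalone sketch of that pattern (REQUIRED and IGNORED are stand-ins for those module-level objects):

    REQUIRED, IGNORED = object(), object()  # stand-ins for the sentinels

    def populate_defaults(params, destination_defaults):
        # Mirrors _populate_parameter_defaults in simplified form.
        for key, value in destination_defaults.items():
            if key in params:
                if value is IGNORED:
                    print("runner ignores supplied parameter %s" % key)
                continue
            if value is REQUIRED:
                raise Exception("missing required parameter %s" % key)
            if value is not IGNORED and value:
                params[key] = value

    params = {"jobs_directory": "/data/staging", "url": "ignored-here"}
    defaults = {"jobs_directory": REQUIRED, "url": IGNORED,
                "default_file_action": "remote_transfer"}
    populate_defaults(params, defaults)
    assert params["default_file_action"] == "remote_transfer"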
+ completed_normally = \ + job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] + cleanup_job = self.app.config.cleanup_job + client_outputs = self.__client_outputs(client, job_wrapper) + finish_args = dict( client=client, + job_completed_normally=completed_normally, + cleanup_job=cleanup_job, + client_outputs=client_outputs, + pulsar_outputs=pulsar_outputs ) + failed = pulsar_finish_job( **finish_args ) + if failed: + job_wrapper.fail("Failed to find or download one or more job outputs from remote server.", exception=True) + except Exception: + message = GENERIC_REMOTE_ERROR + job_wrapper.fail( message, exception=True ) + log.exception("failure finishing job %d" % job_wrapper.job_id) + return + if not PulsarJobRunner.__remote_metadata( client ): + self._handle_metadata_externally( job_wrapper, resolve_requirements=True ) + # Finish the job + try: + job_wrapper.finish( + stdout, + stderr, + exit_code, + remote_working_directory=remote_working_directory + ) + except Exception: + log.exception("Job wrapper finish method failed") + job_wrapper.fail("Unable to finish job", exception=True) + + def fail_job( self, job_state ): + """ + Seperated out so we can use the worker threads for it. + """ + self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) ) + job_state.job_wrapper.fail( getattr( job_state, "fail_message", GENERIC_REMOTE_ERROR ) ) + + def check_pid( self, pid ): + try: + os.kill( pid, 0 ) + return True + except OSError, e: + if e.errno == errno.ESRCH: + log.debug( "check_pid(): PID %d is dead" % pid ) + else: + log.warning( "check_pid(): Got errno %s when attempting to check PID %d: %s" % ( errno.errorcode[e.errno], pid, e.strerror ) ) + return False + + def stop_job( self, job ): + #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished + job_ext_output_metadata = job.get_external_output_metadata() + if job_ext_output_metadata: + pid = job_ext_output_metadata[0].job_runner_external_pid # every JobExternalOutputMetadata has a pid set, we just need to take from one of them + if pid in [ None, '' ]: + log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id ) + return + pid = int( pid ) + if not self.check_pid( pid ): + log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job.id, pid ) ) + return + for sig in [ 15, 9 ]: + try: + os.killpg( pid, sig ) + except OSError, e: + log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job.id, errno.errorcode[e.errno], sig, pid, e.strerror ) ) + return # give up + sleep( 2 ) + if not self.check_pid( pid ): + log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" % ( job.id, pid, sig ) ) + return + else: + log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" % ( job.id, pid ) ) + else: + # Remote kill + pulsar_url = job.job_runner_name + job_id = job.job_runner_external_id + log.debug("Attempt remote Pulsar kill of job with url %s and id %s" % (pulsar_url, job_id)) + client = self.get_client(job.destination_params, job_id) + client.kill() + + def recover( self, job, job_wrapper ): + """Recovers jobs stuck in the queued/running state when Galaxy started""" + job_state = self._job_state( job, job_wrapper ) + job_wrapper.command_line = job.get_command_line() + state = job.get_state() + if state in [model.Job.states.RUNNING, model.Job.states.QUEUED]: + log.debug( "(Pulsar/%s) is still in running 
state, adding to the Pulsar queue" % ( job.get_id()) ) + job_state.old_state = True + job_state.running = state == model.Job.states.RUNNING + self.monitor_queue.put( job_state ) + + def shutdown( self ): + super( PulsarJobRunner, self ).shutdown() + self.client_manager.shutdown() + + def _job_state( self, job, job_wrapper ): + job_state = AsynchronousJobState() + # TODO: Determine why this is set when using normal message queue updates + # but not CLI submitted MQ updates... + raw_job_id = job.get_job_runner_external_id() or job_wrapper.job_id + job_state.job_id = str( raw_job_id ) + job_state.runner_url = job_wrapper.get_job_runner_url() + job_state.job_destination = job_wrapper.job_destination + job_state.job_wrapper = job_wrapper + return job_state + + def __client_outputs( self, client, job_wrapper ): + work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) + output_files = self.get_output_files( job_wrapper ) + client_outputs = ClientOutputs( + working_directory=job_wrapper.working_directory, + work_dir_outputs=work_dir_outputs, + output_files=output_files, + version_file=job_wrapper.get_version_string_path(), + ) + return client_outputs + + @staticmethod + def __dependencies_description( pulsar_client, job_wrapper ): + dependency_resolution = PulsarJobRunner.__dependency_resolution( pulsar_client ) + remote_dependency_resolution = dependency_resolution == "remote" + if not remote_dependency_resolution: + return None + requirements = job_wrapper.tool.requirements or [] + installed_tool_dependencies = job_wrapper.tool.installed_tool_dependencies or [] + return dependencies.DependenciesDescription( + requirements=requirements, + installed_tool_dependencies=installed_tool_dependencies, + ) + + @staticmethod + def __dependency_resolution( pulsar_client ): + dependency_resolution = pulsar_client.destination_params.get( "dependency_resolution", "local" ) + if dependency_resolution not in ["none", "local", "remote"]: + raise Exception("Unknown dependency_resolution value encountered %s" % dependency_resolution) + return dependency_resolution + + @staticmethod + def __remote_metadata( pulsar_client ): + remote_metadata = string_as_bool_or_none( pulsar_client.destination_params.get( "remote_metadata", False ) ) + return remote_metadata + + @staticmethod + def __use_remote_datatypes_conf( pulsar_client ): + """ When setting remote metadata, use integrated datatypes from this + Galaxy instance or use the datatypes config configured via the remote + Pulsar. + + Both options are broken in different ways for same reason - datatypes + may not match. One can push the local datatypes config to the remote + server - but there is no guarentee these datatypes will be defined + there. Alternatively, one can use the remote datatype config - but + there is no guarentee that it will contain all the datatypes available + to this Galaxy. 
+ """ + use_remote_datatypes = string_as_bool_or_none( pulsar_client.destination_params.get( "use_remote_datatypes", False ) ) + return use_remote_datatypes + + @staticmethod + def __rewrite_parameters( pulsar_client ): + return string_as_bool_or_none( pulsar_client.destination_params.get( "rewrite_parameters", False ) ) or False + + def __build_metadata_configuration(self, client, job_wrapper, remote_metadata, remote_job_config): + metadata_kwds = {} + if remote_metadata: + remote_system_properties = remote_job_config.get("system_properties", {}) + remote_galaxy_home = remote_system_properties.get("galaxy_home", None) + if not remote_galaxy_home: + raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE) + metadata_kwds['exec_dir'] = remote_galaxy_home + outputs_directory = remote_job_config['outputs_directory'] + configs_directory = remote_job_config['configs_directory'] + working_directory = remote_job_config['working_directory'] + # For metadata calculation, we need to build a list of of output + # file objects with real path indicating location on Galaxy server + # and false path indicating location on compute server. Since the + # Pulsar disables from_work_dir copying as part of the job command + # line we need to take the list of output locations on the Pulsar + # server (produced by self.get_output_files(job_wrapper)) and for + # each work_dir output substitute the effective path on the Pulsar + # server relative to the remote working directory as the + # false_path to send the metadata command generation module. + work_dir_outputs = self.get_work_dir_outputs(job_wrapper, job_working_directory=working_directory) + outputs = [Bunch(false_path=os.path.join(outputs_directory, os.path.basename(path)), real_path=path) for path in self.get_output_files(job_wrapper)] + for output in outputs: + for pulsar_workdir_path, real_path in work_dir_outputs: + if real_path == output.real_path: + output.false_path = pulsar_workdir_path + metadata_kwds['output_fnames'] = outputs + metadata_kwds['compute_tmp_dir'] = working_directory + metadata_kwds['config_root'] = remote_galaxy_home + default_config_file = os.path.join(remote_galaxy_home, 'universe_wsgi.ini') + metadata_kwds['config_file'] = remote_system_properties.get('galaxy_config_file', default_config_file) + metadata_kwds['dataset_files_path'] = remote_system_properties.get('galaxy_dataset_files_path', None) + if PulsarJobRunner.__use_remote_datatypes_conf( client ): + remote_datatypes_config = remote_system_properties.get('galaxy_datatypes_config_file', None) + if not remote_datatypes_config: + log.warn(NO_REMOTE_DATATYPES_CONFIG) + remote_datatypes_config = os.path.join(remote_galaxy_home, 'datatypes_conf.xml') + metadata_kwds['datatypes_config'] = remote_datatypes_config + else: + integrates_datatypes_config = self.app.datatypes_registry.integrated_datatypes_configs + # Ensure this file gets pushed out to the remote config dir. 
+ job_wrapper.extra_filenames.append(integrates_datatypes_config) + + metadata_kwds['datatypes_config'] = os.path.join(configs_directory, os.path.basename(integrates_datatypes_config)) + return metadata_kwds + + +class PulsarLegacyJobRunner( PulsarJobRunner ): + destination_defaults = dict( + rewrite_parameters="false", + dependency_resolution="local", + ) + + +class PulsarMQJobRunner( PulsarJobRunner ): + destination_defaults = dict( + default_file_action="remote_transfer", + rewrite_parameters="true", + dependency_resolution="remote", + jobs_directory=PARAMETER_SPECIFICATION_REQUIRED, + url=PARAMETER_SPECIFICATION_IGNORED, + private_token=PARAMETER_SPECIFICATION_IGNORED + ) + + def _monitor( self ): + # This is a message queue driven runner, don't monitor + # just setup required callback. + self.client_manager.ensure_has_status_update_callback(self.__async_update) + + def __async_update( self, full_status ): + job_id = None + try: + job_id = full_status[ "job_id" ] + job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id( job_id ) + job_state = self._job_state( job, job_wrapper ) + self._update_job_state_for_status(job_state, full_status[ "status" ] ) + except Exception: + log.exception( "Failed to update Pulsar job status for job_id %s" % job_id ) + raise + # Nothing else to do? - Attempt to fail the job? + + +class PulsarRESTJobRunner( PulsarJobRunner ): + destination_defaults = dict( + default_file_action="transfer", + rewrite_parameters="true", + dependency_resolution="remote", + url=PARAMETER_SPECIFICATION_REQUIRED, + ) + + +class PulsarComputeEnvironment( ComputeEnvironment ): + + def __init__( self, pulsar_client, job_wrapper, remote_job_config ): + self.pulsar_client = pulsar_client + self.job_wrapper = job_wrapper + self.local_path_config = job_wrapper.default_compute_environment() + self.unstructured_path_rewrites = {} + # job_wrapper.prepare is going to expunge the job backing the following + # computations, so precalculate these paths. + self._wrapper_input_paths = self.local_path_config.input_paths() + self._wrapper_output_paths = self.local_path_config.output_paths() + self.path_mapper = PathMapper(pulsar_client, remote_job_config, self.local_path_config.working_directory()) + self._config_directory = remote_job_config[ "configs_directory" ] + self._working_directory = remote_job_config[ "working_directory" ] + self._sep = remote_job_config[ "system_properties" ][ "separator" ] + self._tool_dir = remote_job_config[ "tools_directory" ] + version_path = self.local_path_config.version_path() + new_version_path = self.path_mapper.remote_version_path_rewrite(version_path) + if new_version_path: + version_path = new_version_path + self._version_path = version_path + + def output_paths( self ): + local_output_paths = self._wrapper_output_paths + + results = [] + for local_output_path in local_output_paths: + wrapper_path = str( local_output_path ) + remote_path = self.path_mapper.remote_output_path_rewrite( wrapper_path ) + results.append( self._dataset_path( local_output_path, remote_path ) ) + return results + + def input_paths( self ): + local_input_paths = self._wrapper_input_paths + + results = [] + for local_input_path in local_input_paths: + wrapper_path = str( local_input_path ) + # This will over-copy in some cases. For instance in the case of task + # splitting, this input will be copied even though only the work dir + # input will actually be used. 
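The PathMapper-based rewrites used throughout PulsarComputeEnvironment ultimately reduce to prefix substitution between local Galaxy paths and remote staging paths. The helper below is a hypothetical simplification of that idea (it is not the PathMapper API; all paths shown are placeholders):

    import posixpath

    def rewrite_path(local_path, local_prefix, remote_prefix):
        # Hypothetical sketch of prefix-based path mapping.
        if not local_path.startswith(local_prefix):
            return None  # path not managed by Galaxy - no rewrite
        relative = local_path[len(local_prefix):].lstrip("/")
        return posixpath.join(remote_prefix, relative)

    local = "/galaxy/database/job_working_directory/42/galaxy_dataset_1.dat"
    remote = rewrite_path(local, "/galaxy/database/job_working_directory/42",
                          "/pulsar/staging/42/working")
    assert remote == "/pulsar/staging/42/working/galaxy_dataset_1.dat"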
+ remote_path = self.path_mapper.remote_input_path_rewrite( wrapper_path ) + results.append( self._dataset_path( local_input_path, remote_path ) ) + return results + + def _dataset_path( self, local_dataset_path, remote_path ): + remote_extra_files_path = None + if remote_path: + remote_extra_files_path = "%s_files" % remote_path[ 0:-len( ".dat" ) ] + return local_dataset_path.with_path_for_job( remote_path, remote_extra_files_path ) + + def working_directory( self ): + return self._working_directory + + def config_directory( self ): + return self._config_directory + + def new_file_path( self ): + return self.working_directory() # Problems with doing this? + + def sep( self ): + return self._sep + + def version_path( self ): + return self._version_path + + def rewriter( self, parameter_value ): + unstructured_path_rewrites = self.unstructured_path_rewrites + if parameter_value in unstructured_path_rewrites: + # Path previously mapped, use previous mapping. + return unstructured_path_rewrites[ parameter_value ] + if parameter_value in unstructured_path_rewrites.itervalues(): + # Path is a rewritten remote path (this might never occur, + # consider dropping check...) + return parameter_value + + rewrite, new_unstructured_path_rewrites = self.path_mapper.check_for_arbitrary_rewrite( parameter_value ) + if rewrite: + unstructured_path_rewrites.update(new_unstructured_path_rewrites) + return rewrite + else: + # Did need to rewrite, use original path or value. + return parameter_value + + def unstructured_path_rewriter( self ): + return self.rewriter diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/__init__.py --- a/lib/galaxy/jobs/runners/util/__init__.py +++ b/lib/galaxy/jobs/runners/util/__init__.py @@ -1,7 +1,7 @@ """ This module and its submodules contains utilities for running external processes and interfacing with job managers. This module should contain -functionality shared between Galaxy and the LWR. +functionality shared between Galaxy and the Pulsar. """ from galaxy.util.bunch import Bunch diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/cli/factory.py --- a/lib/galaxy/jobs/runners/util/cli/factory.py +++ b/lib/galaxy/jobs/runners/util/cli/factory.py @@ -5,7 +5,7 @@ ) code_dir = 'lib' except ImportError: - from lwr.managers.util.cli import ( + from pulsar.managers.util.cli import ( CliInterface, split_params ) diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/cli/job/slurm.py --- a/lib/galaxy/jobs/runners/util/cli/job/slurm.py +++ b/lib/galaxy/jobs/runners/util/cli/job/slurm.py @@ -5,7 +5,7 @@ from galaxy.model import Job job_states = Job.states except ImportError: - # Not in Galaxy, map Galaxy job states to LWR ones. + # Not in Galaxy, map Galaxy job states to Pulsar ones. from galaxy.util import enum job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/cli/job/torque.py --- a/lib/galaxy/jobs/runners/util/cli/job/torque.py +++ b/lib/galaxy/jobs/runners/util/cli/job/torque.py @@ -7,7 +7,7 @@ from galaxy.model import Job job_states = Job.states except ImportError: - # Not in Galaxy, map Galaxy job states to LWR ones. + # Not in Galaxy, map Galaxy job states to Pulsar ones. 
from galaxy.util import enum job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -623,9 +623,9 @@ elif store == 'irods': from .rods import IRODSObjectStore return IRODSObjectStore(config=config, config_xml=config_xml) - elif store == 'lwr': - from .lwr import LwrObjectStore - return LwrObjectStore(config=config, config_xml=config_xml) + elif store == 'pulsar': + from .pulsar import PulsarObjectStore + return PulsarObjectStore(config=config, config_xml=config_xml) else: log.error("Unrecognized object store definition: {0}".format(store)) diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/objectstore/lwr.py --- a/lib/galaxy/objectstore/lwr.py +++ /dev/null @@ -1,76 +0,0 @@ -from __future__ import absolute_import # Need to import lwr_client absolutely. -from ..objectstore import ObjectStore -try: - from galaxy.jobs.runners.lwr_client.manager import ObjectStoreClientManager -except ImportError: - from lwr.lwr_client.manager import ObjectStoreClientManager - - -class LwrObjectStore(ObjectStore): - """ - Object store implementation that delegates to a remote LWR server. - - This may be more aspirational than practical for now, it would be good to - Galaxy to a point that a handler thread could be setup that doesn't attempt - to access the disk files returned by a (this) object store - just passing - them along to the LWR unmodified. That modification - along with this - implementation and LWR job destinations would then allow Galaxy to fully - manage jobs on remote servers with completely different mount points. - - This implementation should be considered beta and may be dropped from - Galaxy at some future point or significantly modified. - """ - - def __init__(self, config, config_xml): - self.lwr_client = self.__build_lwr_client(config_xml) - - def exists(self, obj, **kwds): - return self.lwr_client.exists(**self.__build_kwds(obj, **kwds)) - - def file_ready(self, obj, **kwds): - return self.lwr_client.file_ready(**self.__build_kwds(obj, **kwds)) - - def create(self, obj, **kwds): - return self.lwr_client.create(**self.__build_kwds(obj, **kwds)) - - def empty(self, obj, **kwds): - return self.lwr_client.empty(**self.__build_kwds(obj, **kwds)) - - def size(self, obj, **kwds): - return self.lwr_client.size(**self.__build_kwds(obj, **kwds)) - - def delete(self, obj, **kwds): - return self.lwr_client.delete(**self.__build_kwds(obj, **kwds)) - - # TODO: Optimize get_data. 
-    def get_data(self, obj, **kwds):
-        return self.lwr_client.get_data(**self.__build_kwds(obj, **kwds))
-
-    def get_filename(self, obj, **kwds):
-        return self.lwr_client.get_filename(**self.__build_kwds(obj, **kwds))
-
-    def update_from_file(self, obj, **kwds):
-        return self.lwr_client.update_from_file(**self.__build_kwds(obj, **kwds))
-
-    def get_store_usage_percent(self):
-        return self.lwr_client.get_store_usage_percent()
-
-    def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None):
-        return None
-
-    def __build_kwds(self, obj, **kwds):
-        kwds['object_id'] = obj.id
-        return kwds
-        pass
-
-    def __build_lwr_client(self, config_xml):
-        url = config_xml.get("url")
-        private_token = config_xml.get("private_token", None)
-        transport = config_xml.get("transport", None)
-        manager_options = dict(transport=transport)
-        client_options = dict(url=url, private_token=private_token)
-        lwr_client = ObjectStoreClientManager(**manager_options).get_client(client_options)
-        return lwr_client
-
-    def shutdown(self):
-        pass
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/objectstore/pulsar.py
--- /dev/null
+++ b/lib/galaxy/objectstore/pulsar.py
@@ -0,0 +1,73 @@
+from __future__ import absolute_import  # Need to import pulsar_client absolutely.
+from ..objectstore import ObjectStore
+from pulsar.client.manager import ObjectStoreClientManager
+
+
+class PulsarObjectStore(ObjectStore):
+    """
+    Object store implementation that delegates to a remote Pulsar server.
+
+    This may be more aspirational than practical for now; it would be good to
+    get Galaxy to a point where a handler thread could be set up that doesn't
+    attempt to access the disk files returned by this object store - just
+    passing them along to the Pulsar server unmodified. That modification -
+    along with this implementation and Pulsar job destinations - would then
+    allow Galaxy to fully manage jobs on remote servers with completely
+    different mount points.
+
+    This implementation should be considered beta and may be dropped from
+    Galaxy at some future point or significantly modified.
+    """
+
+    def __init__(self, config, config_xml):
+        self.pulsar_client = self.__build_pulsar_client(config_xml)
+
+    def exists(self, obj, **kwds):
+        return self.pulsar_client.exists(**self.__build_kwds(obj, **kwds))
+
+    def file_ready(self, obj, **kwds):
+        return self.pulsar_client.file_ready(**self.__build_kwds(obj, **kwds))
+
+    def create(self, obj, **kwds):
+        return self.pulsar_client.create(**self.__build_kwds(obj, **kwds))
+
+    def empty(self, obj, **kwds):
+        return self.pulsar_client.empty(**self.__build_kwds(obj, **kwds))
+
+    def size(self, obj, **kwds):
+        return self.pulsar_client.size(**self.__build_kwds(obj, **kwds))
+
+    def delete(self, obj, **kwds):
+        return self.pulsar_client.delete(**self.__build_kwds(obj, **kwds))
+
+    # TODO: Optimize get_data.
+    def get_data(self, obj, **kwds):
+        return self.pulsar_client.get_data(**self.__build_kwds(obj, **kwds))
+
+    def get_filename(self, obj, **kwds):
+        return self.pulsar_client.get_filename(**self.__build_kwds(obj, **kwds))
+
+    def update_from_file(self, obj, **kwds):
+        return self.pulsar_client.update_from_file(**self.__build_kwds(obj, **kwds))
+
+    def get_store_usage_percent(self):
+        return self.pulsar_client.get_store_usage_percent()
+
+    def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None):
+        return None
+
+    def __build_kwds(self, obj, **kwds):
+        kwds['object_id'] = obj.id
+        return kwds
+
+    def __build_pulsar_client(self, config_xml):
+        url = config_xml.get("url")
+        private_token = config_xml.get("private_token", None)
+        transport = config_xml.get("transport", None)
+        manager_options = dict(transport=transport)
+        client_options = dict(url=url, private_token=private_token)
+        pulsar_client = ObjectStoreClientManager(**manager_options).get_client(client_options)
+        return pulsar_client
+
+    def shutdown(self):
+        pass
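For reference, an object store of this shape would be configured with an XML stanza whose attributes match what `__build_pulsar_client` above reads: `url`, `private_token` and `transport`. A hypothetical construction, assuming the pulsar client library is importable (attribute values invented)::

    from xml.etree import ElementTree

    from galaxy.objectstore.pulsar import PulsarObjectStore

    # Hypothetical stanza; only url is strictly needed, the other
    # attributes default to None.
    config_xml = ElementTree.fromstring(
        '<object_store type="pulsar" url="https://pulsarhost:8913/"'
        ' private_token="changeme" transport="curl" />'
    )
    store = PulsarObjectStore(config=None, config_xml=config_xml)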
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/tools/deps/dependencies.py
--- a/lib/galaxy/tools/deps/dependencies.py
+++ b/lib/galaxy/tools/deps/dependencies.py
@@ -8,7 +8,7 @@
     related context required to resolve dependencies via the
     ToolShedPackageDependencyResolver.
 
-    This is meant to enable remote resolution of dependencies, by the LWR or
+    This is meant to enable remote resolution of dependencies, by the Pulsar or
     other potential remote execution mechanisms.
     """
 
@@ -39,7 +39,7 @@
 
     @staticmethod
    def _toolshed_install_dependency_from_dict(as_dict):
-        # Rather than requiring full models in LWR, just use simple objects
+        # Rather than requiring full models in Pulsar, just use simple objects
         # containing only properties and associations used to resolve
         # dependencies for tool execution.
         repository_object = bunch.Bunch(
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/__init__.py
--- /dev/null
+++ b/lib/pulsar/client/__init__.py
@@ -0,0 +1,62 @@
+"""
+pulsar client
+=============
+
+This module contains logic for interfacing with an external Pulsar server.
+
+------------------
+Configuring Galaxy
+------------------
+
+Galaxy job runners are configured in Galaxy's ``job_conf.xml`` file. See ``job_conf.xml.sample_advanced``
+in your Galaxy code base or on
+`Bitbucket <https://bitbucket.org/galaxy/galaxy-dist/src/tip/job_conf.xml.sample_advanced?at=default>`_
+for information on how to configure Galaxy to interact with the Pulsar.
+
+Galaxy also supports an older, less rich configuration of job runners directly
+in its main ``universe_wsgi.ini`` file. The following section describes how to
+configure Galaxy to communicate with the Pulsar in this legacy mode.
+
+Legacy
+------
+
+A Galaxy tool can be configured to be executed remotely via Pulsar by
+adding a line to the ``universe_wsgi.ini`` file under the
+``galaxy:tool_runners`` section with the format::
+
+    <tool_id> = pulsar://http://<pulsar_host>:<pulsar_port>
+
+As an example, if a host named remotehost is running the Pulsar server
+application on port ``8913``, then the tool with id ``test_tool`` can
+be configured to run remotely on remotehost by adding the following
+line to ``universe_wsgi.ini``::
+
+    test_tool = pulsar://http://remotehost:8913
+
+Remember this must be added after the ``[galaxy:tool_runners]`` header
+in the ``universe_wsgi.ini`` file.
+"""
+
+from .staging.down import finish_job
+from .staging.up import submit_job
+from .staging import ClientJobDescription
+from .staging import PulsarOutputs
+from .staging import ClientOutputs
+from .client import OutputNotFoundException
+from .manager import build_client_manager
+from .destination import url_to_destination_params
+from .path_mapper import PathMapper
+
+__all__ = [
+    build_client_manager,
+    OutputNotFoundException,
+    url_to_destination_params,
+    finish_job,
+    submit_job,
+    ClientJobDescription,
+    PulsarOutputs,
+    ClientOutputs,
+    PathMapper,
+]
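In code, the exports above sketch the client workflow: resolve a destination (or legacy URL) into parameters, build a client manager, and ask it for a per-job client. Roughly as follows (argument names assumed, untested sketch)::

    from pulsar.client import build_client_manager, url_to_destination_params

    # Turn a legacy-style Pulsar URL into a destination params dict.
    destination_params = url_to_destination_params("http://remotehost:8913/")
    # Build a manager and obtain a client bound to a particular job id.
    client_manager = build_client_manager()
    client = client_manager.get_client(destination_params, job_id="42")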
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/action_mapper.py
--- /dev/null
+++ b/lib/pulsar/client/action_mapper.py
@@ -0,0 +1,567 @@
+from json import load
+from os import makedirs
+from os.path import exists
+from os.path import abspath
+from os.path import dirname
+from os.path import join
+from os.path import basename
+from os.path import sep
+import fnmatch
+from re import compile
+from re import escape
+import galaxy.util
+from galaxy.util.bunch import Bunch
+from .config_util import read_file
+from .util import directory_files
+from .util import unique_path_prefix
+from .transport import get_file
+from .transport import post_file
+
+
+DEFAULT_MAPPED_ACTION = 'transfer'  # Not really clear to me what this should be, exception?
+DEFAULT_PATH_MAPPER_TYPE = 'prefix'
+
+STAGING_ACTION_REMOTE = "remote"
+STAGING_ACTION_LOCAL = "local"
+STAGING_ACTION_NONE = None
+STAGING_ACTION_DEFAULT = "default"
+
+# Poor man's enum.
+path_type = Bunch(
+    # Galaxy input datasets and extra files.
+    INPUT="input",
+    # Galaxy config and param files.
+    CONFIG="config",
+    # Files from tool's tool_dir (for now just wrapper if available).
+    TOOL="tool",
+    # Input work dir files - e.g. metadata files, task-split input files, etc.
+    WORKDIR="workdir",
+    # Galaxy output datasets in their final home.
+    OUTPUT="output",
+    # Galaxy from_work_dir output paths and other files (e.g. galaxy.json)
+    OUTPUT_WORKDIR="output_workdir",
+    # Other fixed tool parameter paths (likely coming from tool data, but not
+    # necessarily). Not sure this is the best name...
+    UNSTRUCTURED="unstructured",
+)
+
+
+ACTION_DEFAULT_PATH_TYPES = [
+    path_type.INPUT,
+    path_type.CONFIG,
+    path_type.TOOL,
+    path_type.WORKDIR,
+    path_type.OUTPUT,
+    path_type.OUTPUT_WORKDIR,
+]
+ALL_PATH_TYPES = ACTION_DEFAULT_PATH_TYPES + [path_type.UNSTRUCTURED]
+
+
+class FileActionMapper(object):
+    """
+    Objects of this class define how paths are mapped to actions.
+
+    >>> json_string = r'''{"paths": [ \
+                     {"path": "/opt/galaxy", "action": "none"}, \
+                     {"path": "/galaxy/data", "action": "transfer"}, \
+                     {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"}, \
+                     {"path": ".*/dataset_\\\\d+.dat", "action": "copy", "match_type": "regex"} \
+                    ]}'''
+    >>> from tempfile import NamedTemporaryFile
+    >>> from os import unlink
+    >>> def mapper_for(default_action, config_contents):
+    ...     f = NamedTemporaryFile(delete=False)
+    ...     f.write(config_contents.encode('UTF-8'))
+    ...     f.close()
+    ...     mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None)
+    ...     mapper = FileActionMapper(mock_client)
+    ...     mapper = FileActionMapper(config=mapper.to_dict())  # Serialize and deserialize it to make sure still works
+    ...     unlink(f.name)
+    ...
return mapper + >>> mapper = mapper_for(default_action='none', config_contents=json_string) + >>> # Test first config line above, implicit path prefix mapper + >>> action = mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input') + >>> action.action_type == u'none' + True + >>> action.staging_needed + False + >>> # Test another (2nd) mapper, this one with a different action + >>> action = mapper.action('/galaxy/data/files/000/dataset_1.dat', 'input') + >>> action.action_type == u'transfer' + True + >>> action.staging_needed + True + >>> # Always at least copy work_dir outputs. + >>> action = mapper.action('/opt/galaxy/database/working_directory/45.sh', 'workdir') + >>> action.action_type == u'copy' + True + >>> action.staging_needed + True + >>> # Test glob mapper (matching test) + >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam', 'input').action_type == u'copy' + True + >>> # Test glob mapper (non-matching test) + >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam.bai', 'input').action_type == u'none' + True + >>> # Regex mapper test. + >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'input').action_type == u'copy' + True + >>> # Doesn't map unstructured paths by default + >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'none' + True + >>> input_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ + {"path": "/", "action": "transfer", "path_types": "input"} \ + ] }''') + >>> input_only_mapper.action('/dataset_1.dat', 'input').action_type == u'transfer' + True + >>> input_only_mapper.action('/dataset_1.dat', 'output').action_type == u'none' + True + >>> unstructured_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ + {"path": "/", "action": "transfer", "path_types": "*any*"} \ + ] }''') + >>> unstructured_mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'transfer' + True + """ + + def __init__(self, client=None, config=None): + if config is None and client is None: + message = "FileActionMapper must be constructed from either a client or a config dictionary." + raise Exception(message) + if config is None: + config = self.__client_to_config(client) + self.default_action = config.get("default_action", "transfer") + self.mappers = mappers_from_dicts(config.get("paths", [])) + self.files_endpoint = config.get("files_endpoint", None) + + def action(self, path, type, mapper=None): + mapper = self.__find_mapper(path, type, mapper) + action_class = self.__action_class(path, type, mapper) + file_lister = DEFAULT_FILE_LISTER + action_kwds = {} + if mapper: + file_lister = mapper.file_lister + action_kwds = mapper.action_kwds + action = action_class(path, file_lister=file_lister, **action_kwds) + self.__process_action(action, type) + return action + + def unstructured_mappers(self): + """ Return mappers that will map 'unstructured' files (i.e. go beyond + mapping inputs, outputs, and config files). 
+ """ + return filter(lambda m: path_type.UNSTRUCTURED in m.path_types, self.mappers) + + def to_dict(self): + return dict( + default_action=self.default_action, + files_endpoint=self.files_endpoint, + paths=map(lambda m: m.to_dict(), self.mappers) + ) + + def __client_to_config(self, client): + action_config_path = client.action_config_path + if action_config_path: + config = read_file(action_config_path) + else: + config = dict() + config["default_action"] = client.default_file_action + config["files_endpoint"] = client.files_endpoint + return config + + def __load_action_config(self, path): + config = load(open(path, 'rb')) + self.mappers = mappers_from_dicts(config.get('paths', [])) + + def __find_mapper(self, path, type, mapper=None): + if not mapper: + normalized_path = abspath(path) + for query_mapper in self.mappers: + if query_mapper.matches(normalized_path, type): + mapper = query_mapper + break + return mapper + + def __action_class(self, path, type, mapper): + action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else "none" + if mapper: + action_type = mapper.action_type + if type in ["workdir", "output_workdir"] and action_type == "none": + # We are changing the working_directory relative to what + # Galaxy would use, these need to be copied over. + action_type = "copy" + action_class = actions.get(action_type, None) + if action_class is None: + message_template = "Unknown action_type encountered %s while trying to map path %s" + message_args = (action_type, path) + raise Exception(message_template % message_args) + return action_class + + def __process_action(self, action, file_type): + """ Extension point to populate extra action information after an + action has been created. + """ + if action.action_type == "remote_transfer": + url_base = self.files_endpoint + if not url_base: + raise Exception("Attempted to use remote_transfer action with defining a files_endpoint") + if "?" not in url_base: + url_base = "%s?" % url_base + # TODO: URL encode path. + url = "%s&path=%s&file_type=%s" % (url_base, action.path, file_type) + action.url = url + +REQUIRED_ACTION_KWD = object() + + +class BaseAction(object): + action_spec = {} + + def __init__(self, path, file_lister=None): + self.path = path + self.file_lister = file_lister or DEFAULT_FILE_LISTER + + def unstructured_map(self, path_helper): + unstructured_map = self.file_lister.unstructured_map(self.path) + if self.staging_needed: + # To ensure uniqueness, prepend unique prefix to each name + prefix = unique_path_prefix(self.path) + for path, name in unstructured_map.iteritems(): + unstructured_map[path] = join(prefix, name) + else: + path_rewrites = {} + for path in unstructured_map: + rewrite = self.path_rewrite(path_helper, path) + if rewrite: + path_rewrites[path] = rewrite + unstructured_map = path_rewrites + return unstructured_map + + @property + def staging_needed(self): + return self.staging != STAGING_ACTION_NONE + + @property + def staging_action_local(self): + return self.staging == STAGING_ACTION_LOCAL + + +class NoneAction(BaseAction): + """ This action indicates the corresponding path does not require any + additional action. This should indicate paths that are available both on + the Pulsar client (i.e. Galaxy server) and remote Pulsar server with the same + paths. 
""" + action_type = "none" + staging = STAGING_ACTION_NONE + + def to_dict(self): + return dict(path=self.path, action_type=self.action_type) + + @classmethod + def from_dict(cls, action_dict): + return NoneAction(path=action_dict["path"]) + + def path_rewrite(self, path_helper, path=None): + return None + + +class RewriteAction(BaseAction): + """ This actin indicates the Pulsar server should simply rewrite the path + to the specified file. + """ + action_spec = dict( + source_directory=REQUIRED_ACTION_KWD, + destination_directory=REQUIRED_ACTION_KWD + ) + action_type = "rewrite" + staging = STAGING_ACTION_NONE + + def __init__(self, path, file_lister=None, source_directory=None, destination_directory=None): + self.path = path + self.file_lister = file_lister or DEFAULT_FILE_LISTER + self.source_directory = source_directory + self.destination_directory = destination_directory + + def to_dict(self): + return dict( + path=self.path, + action_type=self.action_type, + source_directory=self.source_directory, + destination_directory=self.destination_directory, + ) + + @classmethod + def from_dict(cls, action_dict): + return RewriteAction( + path=action_dict["path"], + source_directory=action_dict["source_directory"], + destination_directory=action_dict["destination_directory"], + ) + + def path_rewrite(self, path_helper, path=None): + if not path: + path = self.path + new_path = path_helper.from_posix_with_new_base(self.path, self.source_directory, self.destination_directory) + return None if new_path == self.path else new_path + + +class TransferAction(BaseAction): + """ This actions indicates that the Pulsar client should initiate an HTTP + transfer of the corresponding path to the remote Pulsar server before + launching the job. """ + action_type = "transfer" + staging = STAGING_ACTION_LOCAL + + +class CopyAction(BaseAction): + """ This action indicates that the Pulsar client should execute a file system + copy of the corresponding path to the Pulsar staging directory prior to + launching the corresponding job. """ + action_type = "copy" + staging = STAGING_ACTION_LOCAL + + +class RemoteCopyAction(BaseAction): + """ This action indicates the Pulsar server should copy the file before + execution via direct file system copy. This is like a CopyAction, but + it indicates the action should occur on the Pulsar server instead of on + the client. + """ + action_type = "remote_copy" + staging = STAGING_ACTION_REMOTE + + def to_dict(self): + return dict(path=self.path, action_type=self.action_type) + + @classmethod + def from_dict(cls, action_dict): + return RemoteCopyAction(path=action_dict["path"]) + + def write_to_path(self, path): + galaxy.util.copy_to_path(open(self.path, "rb"), path) + + def write_from_path(self, pulsar_path): + destination = self.path + parent_directory = dirname(destination) + if not exists(parent_directory): + makedirs(parent_directory) + with open(pulsar_path, "rb") as f: + galaxy.util.copy_to_path(f, destination) + + +class RemoteTransferAction(BaseAction): + """ This action indicates the Pulsar server should copy the file before + execution via direct file system copy. This is like a CopyAction, but + it indicates the action should occur on the Pulsar server instead of on + the client. 
+ """ + action_type = "remote_transfer" + staging = STAGING_ACTION_REMOTE + + def __init__(self, path, file_lister=None, url=None): + super(RemoteTransferAction, self).__init__(path, file_lister=file_lister) + self.url = url + + def to_dict(self): + return dict(path=self.path, action_type=self.action_type, url=self.url) + + @classmethod + def from_dict(cls, action_dict): + return RemoteTransferAction(path=action_dict["path"], url=action_dict["url"]) + + def write_to_path(self, path): + get_file(self.url, path) + + def write_from_path(self, pulsar_path): + post_file(self.url, pulsar_path) + + +class MessageAction(object): + """ Sort of pseudo action describing "files" store in memory and + transferred via message (HTTP, Python-call, MQ, etc...) + """ + action_type = "message" + staging = STAGING_ACTION_DEFAULT + + def __init__(self, contents, client=None): + self.contents = contents + self.client = client + + @property + def staging_needed(self): + return True + + @property + def staging_action_local(self): + # Ekkk, cannot be called if created through from_dict. + # Shouldn't be a problem the way it is used - but is an + # object design problem. + return self.client.prefer_local_staging + + def to_dict(self): + return dict(contents=self.contents, action_type=MessageAction.action_type) + + @classmethod + def from_dict(cls, action_dict): + return MessageAction(contents=action_dict["contents"]) + + def write_to_path(self, path): + open(path, "w").write(self.contents) + + +DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, RemoteTransferAction, MessageAction] + + +def from_dict(action_dict): + action_type = action_dict.get("action_type", None) + target_class = None + for action_class in DICTIFIABLE_ACTION_CLASSES: + if action_type == action_class.action_type: + target_class = action_class + if not target_class: + message = "Failed to recover action from dictionary - invalid action type specified %s." 
+        raise Exception(message)
+    return target_class.from_dict(action_dict)
+
+
+class BasePathMapper(object):
+
+    def __init__(self, config):
+        action_type = config.get('action', DEFAULT_MAPPED_ACTION)
+        action_class = actions.get(action_type, None)
+        action_kwds = action_class.action_spec.copy()
+        for key, value in action_kwds.items():
+            if key in config:
+                action_kwds[key] = config[key]
+            elif value is REQUIRED_ACTION_KWD:
+                message_template = "action_type %s requires keyword argument %s"
+                message = message_template % (action_type, key)
+                raise Exception(message)
+        self.action_type = action_type
+        self.action_kwds = action_kwds
+        path_types_str = config.get('path_types', "*defaults*")
+        path_types_str = path_types_str.replace("*defaults*", ",".join(ACTION_DEFAULT_PATH_TYPES))
+        path_types_str = path_types_str.replace("*any*", ",".join(ALL_PATH_TYPES))
+        self.path_types = path_types_str.split(",")
+        self.file_lister = FileLister(config)
+
+    def matches(self, path, path_type):
+        path_type_matches = path_type in self.path_types
+        return path_type_matches and self._path_matches(path)
+
+    def _extend_base_dict(self, **kwds):
+        base_dict = dict(
+            action=self.action_type,
+            path_types=",".join(self.path_types),
+            match_type=self.match_type
+        )
+        base_dict.update(self.file_lister.to_dict())
+        base_dict.update(self.action_kwds)
+        base_dict.update(**kwds)
+        return base_dict
+
+
+class PrefixPathMapper(BasePathMapper):
+    match_type = 'prefix'
+
+    def __init__(self, config):
+        super(PrefixPathMapper, self).__init__(config)
+        self.prefix_path = abspath(config['path'])
+
+    def _path_matches(self, path):
+        return path.startswith(self.prefix_path)
+
+    def to_pattern(self):
+        pattern_str = "(%s%s[^\s,\"\']+)" % (escape(self.prefix_path), escape(sep))
+        return compile(pattern_str)
+
+    def to_dict(self):
+        return self._extend_base_dict(path=self.prefix_path)
+
+
+class GlobPathMapper(BasePathMapper):
+    match_type = 'glob'
+
+    def __init__(self, config):
+        super(GlobPathMapper, self).__init__(config)
+        self.glob_path = config['path']
+
+    def _path_matches(self, path):
+        return fnmatch.fnmatch(path, self.glob_path)
+
+    def to_pattern(self):
+        return compile(fnmatch.translate(self.glob_path))
+
+    def to_dict(self):
+        return self._extend_base_dict(path=self.glob_path)
+
+
+class RegexPathMapper(BasePathMapper):
+    match_type = 'regex'
+
+    def __init__(self, config):
+        super(RegexPathMapper, self).__init__(config)
+        self.pattern_raw = config['path']
+        self.pattern = compile(self.pattern_raw)
+
+    def _path_matches(self, path):
+        return self.pattern.match(path) is not None
+
+    def to_pattern(self):
+        return self.pattern
+
+    def to_dict(self):
+        return self._extend_base_dict(path=self.pattern_raw)
+
+MAPPER_CLASSES = [PrefixPathMapper, GlobPathMapper, RegexPathMapper]
+MAPPER_CLASS_DICT = dict(map(lambda c: (c.match_type, c), MAPPER_CLASSES))
+
+
+def mappers_from_dicts(mapper_def_list):
+    return map(lambda m: __mapper_from_dict(m), mapper_def_list)
+
+
+def __mapper_from_dict(mapper_dict):
+    map_type = mapper_dict.get('match_type', DEFAULT_PATH_MAPPER_TYPE)
+    return MAPPER_CLASS_DICT[map_type](mapper_dict)
+
+
+class FileLister(object):
+
+    def __init__(self, config):
+        self.depth = int(config.get("depth", "0"))
+
+    def to_dict(self):
+        return dict(
+            depth=self.depth
+        )
+
+    def unstructured_map(self, path):
+        depth = self.depth
+        if self.depth == 0:
+            return {path: basename(path)}
+        else:
+            while depth > 0:
+                path = dirname(path)
+                depth -= 1
+            return dict([(join(path, f), f) for f in directory_files(path)])
+
+DEFAULT_FILE_LISTER = FileLister(dict(depth=0))
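`FileLister` controls how far `unstructured_map` climbs from a matched path before listing files to stage; with a depth of 1, a matched genome index would be staged along with its sibling files (assuming `directory_files`, from `pulsar.client.util`, enumerates the directory's contents)::

    lister = FileLister(dict(depth=1))
    # For /data/bwa/hg19.fa this climbs to /data/bwa and lists files there,
    # so siblings like hg19.fa.amb and hg19.fa.ann (hypothetical files)
    # would be staged alongside the match.
    file_map = lister.unstructured_map("/data/bwa/hg19.fa")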
+
+ACTION_CLASSES = [
+    NoneAction,
+    RewriteAction,
+    TransferAction,
+    CopyAction,
+    RemoteCopyAction,
+    RemoteTransferAction,
+]
+actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES])
+
+
+__all__ = [
+    FileActionMapper,
+    path_type,
+    from_dict,
+    MessageAction,
+    RemoteTransferAction,  # For testing
+]
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/amqp_exchange.py
--- /dev/null
+++ b/lib/pulsar/client/amqp_exchange.py
@@ -0,0 +1,139 @@
+try:
+    import kombu
+    from kombu import pools
+except ImportError:
+    kombu = None
+
+import socket
+import logging
+import threading
+from time import sleep
+log = logging.getLogger(__name__)
+
+
+KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but kombu dependency unavailable"
+
+DEFAULT_EXCHANGE_NAME = "pulsar"
+DEFAULT_EXCHANGE_TYPE = "direct"
+# Set timeout to periodically give up looking and check if polling should end.
+DEFAULT_TIMEOUT = 0.2
+DEFAULT_HEARTBEAT = 580
+
+DEFAULT_RECONNECT_CONSUMER_WAIT = 1
+DEFAULT_HEARTBEAT_WAIT = 1
+
+
+class PulsarExchange(object):
+    """ Utility for publishing and consuming structured Pulsar queues using kombu.
+    This is shared between the server and the client - an exchange should be set up
+    for each manager (or, in the case of the client, each manager one wishes to
+    communicate with).
+
+    Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar
+    should target each AMQP endpoint, or care should be taken that unique
+    manager names are used across Pulsar servers targeting the same AMQP endpoint -
+    and in particular only one such Pulsar should define a default manager with
+    name _default_.
+    """
+
+    def __init__(
+        self,
+        url,
+        manager_name,
+        connect_ssl=None,
+        timeout=DEFAULT_TIMEOUT,
+        publish_kwds={},
+    ):
+        if not kombu:
+            raise Exception(KOMBU_UNAVAILABLE)
+        self.__url = url
+        self.__manager_name = manager_name
+        self.__connect_ssl = connect_ssl
+        self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE)
+        self.__timeout = timeout
+        # Be sure to log message publishing failures.
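+        # Illustration only (values invented, not part of this changeset):
+        # an exchange whose publishes are retried a few times would be
+        # constructed roughly as
+        #
+        #     exchange = PulsarExchange(
+        #         "amqp://guest:guest@localhost:5672//",
+        #         manager_name="_default_",
+        #         publish_kwds={"retry": True, "retry_policy": {"max_retries": 3}},
+        #     )
+        #
+        # kombu retry policies also accept interval_start, interval_step and
+        # interval_max; the wiring below merges a logging errback into
+        # whatever policy was supplied.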
+ if publish_kwds.get("retry", False): + if "retry_policy" not in publish_kwds: + publish_kwds["retry_policy"] = {} + if "errback" not in publish_kwds["retry_policy"]: + publish_kwds["retry_policy"]["errback"] = self.__publish_errback + self.__publish_kwds = publish_kwds + + @property + def url(self): + return self.__url + + def consume(self, queue_name, callback, check=True, connection_kwargs={}): + queue = self.__queue(queue_name) + log.debug("Consuming queue '%s'", queue) + while check: + heartbeat_thread = None + try: + with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection: + with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']): + heartbeat_thread = self.__start_heartbeat(queue_name, connection) + while check and connection.connected: + try: + connection.drain_events(timeout=self.__timeout) + except socket.timeout: + pass + except (IOError, socket.error), exc: + # In testing, errno is None + log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc) + if heartbeat_thread: + heartbeat_thread.join() + sleep(DEFAULT_RECONNECT_CONSUMER_WAIT) + + def heartbeat(self, connection): + log.debug('AMQP heartbeat thread alive') + while connection.connected: + connection.heartbeat_check() + sleep(DEFAULT_HEARTBEAT_WAIT) + log.debug('AMQP heartbeat thread exiting') + + def publish(self, name, payload): + with self.connection(self.__url) as connection: + with pools.producers[connection].acquire() as producer: + key = self.__queue_name(name) + producer.publish( + payload, + serializer='json', + exchange=self.__exchange, + declare=[self.__exchange], + routing_key=key, + **self.__publish_kwds + ) + + def __publish_errback(self, exc, interval): + log.error("Connection error while publishing: %r", exc, exc_info=1) + log.info("Retrying in %s seconds", interval) + + def connection(self, connection_string, **kwargs): + if "ssl" not in kwargs: + kwargs["ssl"] = self.__connect_ssl + return kombu.Connection(connection_string, **kwargs) + + def __queue(self, name): + queue_name = self.__queue_name(name) + queue = kombu.Queue(queue_name, self.__exchange, routing_key=queue_name) + return queue + + def __queue_name(self, name): + key_prefix = self.__key_prefix() + queue_name = '%s_%s' % (key_prefix, name) + return queue_name + + def __key_prefix(self): + if self.__manager_name == "_default_": + key_prefix = "pulsar_" + else: + key_prefix = "pulsar_%s_" % self.__manager_name + return key_prefix + + def __start_heartbeat(self, queue_name, connection): + thread_name = "consume-heartbeat-%s" % (self.__queue_name(queue_name)) + thread = threading.Thread(name=thread_name, target=self.heartbeat, args=(connection,)) + thread.start() + return thread diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/amqp_exchange_factory.py --- /dev/null +++ b/lib/pulsar/client/amqp_exchange_factory.py @@ -0,0 +1,41 @@ +from .amqp_exchange import PulsarExchange +from .util import filter_destination_params + + +def get_exchange(url, manager_name, params): + connect_ssl = parse_amqp_connect_ssl_params(params) + exchange_kwds = dict( + manager_name=manager_name, + connect_ssl=connect_ssl, + publish_kwds=parse_amqp_publish_kwds(params) + ) + timeout = params.get('amqp_consumer_timeout', False) + if timeout is not False: + exchange_kwds['timeout'] = timeout + exchange = PulsarExchange(url, **exchange_kwds) + return exchange + + +def parse_amqp_connect_ssl_params(params): + ssl_params = 
filter_destination_params(params, "amqp_connect_ssl_") + if not ssl_params: + return + + ssl = __import__('ssl') + if 'cert_reqs' in ssl_params: + value = ssl_params['cert_reqs'] + ssl_params['cert_reqs'] = getattr(ssl, value.upper()) + return ssl_params + + +def parse_amqp_publish_kwds(params): + all_publish_params = filter_destination_params(params, "amqp_publish_") + retry_policy_params = {} + for key in all_publish_params.keys(): + if key.startswith("retry_"): + value = all_publish_params[key] + retry_policy_params[key[len("retry_"):]] = value + del all_publish_params[key] + if retry_policy_params: + all_publish_params["retry_policy"] = retry_policy_params + return all_publish_params This diff is so big that we needed to truncate the remainder. Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.