1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/566388d62374/
Changeset: 566388d62374
User: jmchilton
Date: 2014-06-24 04:27:51
Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #422)
Implement Pulsar job runners.
Affected #: 32 files
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -19,27 +19,30 @@
<!-- Override the $DRMAA_LIBRARY_PATH environment variable --><param
id="drmaa_library_path">/sge/lib/libdrmaa.so</param></plugin>
- <plugin id="lwr" type="runner"
load="galaxy.jobs.runners.lwr:LwrJobRunner">
- <!-- More information on LWR can be found at
https://lwr.readthedocs.org
-->
- <!-- Uncomment following line to use libcurl to perform HTTP calls (defaults
to urllib) -->
+ <plugin id="cli" type="runner"
load="galaxy.jobs.runners.cli:ShellJobRunner" />
+ <plugin id="condor" type="runner"
load="galaxy.jobs.runners.condor:CondorJobRunner" />
+ <plugin id="slurm" type="runner"
load="galaxy.jobs.runners.slurm:SlurmJobRunner" />
+ <!-- Pulsar runners (see more at
https://pulsar.readthedocs.org) -->
+ <plugin id="pulsar_rest" type="runner"
load="galaxy.jobs.runners.pulsar:PulsarRESTJobRunner">
+ <!-- Allow optimized HTTP calls with libcurl (defaults to urllib)
--><!-- <param id="transport">curl</param> -->
- <!-- *Experimental Caching*: Uncomment next parameters to enable
- caching and specify the number of caching threads to enable on Galaxy
- side. Likely will not work with newer features such as MQ support.
- If this is enabled be sure to specify a `file_cache_dir` in the remote
- LWR's main configuration file.
+
+ <!-- *Experimental Caching*: Next parameter enables caching.
+ Likely will not work with newer features such as MQ support.
+
+ If this is enabled be sure to specify a `file_cache_dir` in
+         the remote Pulsar server's main configuration file.
--><!-- <param id="cache">True</param> -->
- <!-- <param id="transfer_threads">2</param>
--></plugin>
- <plugin id="amqp_lwr" type="runner"
load="galaxy.jobs.runners.lwr:LwrJobRunner">
- <param
id="url">amqp://guest:guest@localhost:5672//</param>
- <!-- If using message queue driven LWR - the LWR will generally
- initiate file transfers so a the URL of this Galaxy instance
- must be configured. -->
+ <plugin id="pulsar_mq" type="runner"
load="galaxy.jobs.runners.pulsar:PulsarMQJobRunner">
+ <!-- AMQP URL to connect to. -->
+ <param
id="amqp_url">amqp://guest:guest@localhost:5672//</param>
+      <!-- URL that remote Pulsar apps should use to transfer files
+           to/from this Galaxy instance. --><param
id="galaxy_url">http://localhost:8080</param>
- <!-- If multiple managers configured on the LWR, specify which one
- this plugin targets. -->
+ <!-- Pulsar job manager to communicate with (see Pulsar
+ docs for information on job managers). --><!-- <param
id="manager">_default_</param> --><!-- The AMQP client can
provide an SSL client certificate (e.g. for
validation), the following options configure that certificate
@@ -58,9 +61,17 @@
higher value (in seconds) (or `None` to use blocking connections).
--><!-- <param id="amqp_consumer_timeout">None</param>
--></plugin>
- <plugin id="cli" type="runner"
load="galaxy.jobs.runners.cli:ShellJobRunner" />
- <plugin id="condor" type="runner"
load="galaxy.jobs.runners.condor:CondorJobRunner" />
- <plugin id="slurm" type="runner"
load="galaxy.jobs.runners.slurm:SlurmJobRunner" />
+ <plugin id="pulsar_legacy" type="runner"
load="galaxy.jobs.runners.pulsar:PulsarLegacyJobRunner">
+ <!-- Pulsar job runner with default parameters matching those
+ of old LWR job runner. If your Pulsar server is running on a
+ Windows machine for instance this runner should still be used.
+
+          These destinations still need to target a Pulsar server;
+          older LWR plugins and destinations that still work in Galaxy
+          can target LWR servers, but this support should be considered
+          deprecated and will disappear with a future release of Galaxy.
+ -->
+ </plugin></plugins><handlers
default="handlers"><!-- Additional job handlers - the id should match the
name of a
@@ -125,8 +136,8 @@
$galaxy_root:ro,$tool_directory:ro,$working_directory:rw,$default_file_path:ro
- If using the LWR, defaults will be even further restricted because the
- LWR will (by default) stage all needed inputs into the job's
job_directory
+ If using the Pulsar, defaults will be even further restricted because the
+ Pulsar will (by default) stage all needed inputs into the job's
job_directory
(so there is no need to allow the docker container to read all the
files - let alone write over them). Defaults in this case becomes:
@@ -135,7 +146,7 @@
Python string.Template is used to expand volumes and values $defaults,
$galaxy_root, $default_file_path, $tool_directory, $working_directory,
are available to all jobs and $job_directory is also available for
- LWR jobs.
+ Pulsar jobs.
--><!-- Control memory allocatable by docker container with following
option:
-->
@@ -213,87 +224,71 @@
<!-- A destination that represents a method in the dynamic runner.
--><param id="function">foo</param></destination>
- <destination id="secure_lwr" runner="lwr">
- <param
id="url">https://windowshost.examle.com:8913/</param>
- <!-- If set, private_token must match token remote LWR server configured
with. -->
+ <destination id="secure_pulsar_rest_dest"
runner="pulsar_rest">
+        <param id="url">https://example.com:8913/</param>
+ <!-- If set, private_token must match token in remote Pulsar's
+ configuration. --><param
id="private_token">123456789changeme</param><!-- Uncomment the
following statement to disable file staging (e.g.
- if there is a shared file system between Galaxy and the LWR
+ if there is a shared file system between Galaxy and the Pulsar
server). Alternatively action can be set to 'copy' - to replace
http transfers with file system copies, 'remote_transfer' to
cause
- the lwr to initiate HTTP transfers instead of Galaxy, or
- 'remote_copy' to cause lwr to initiate file system copies.
+ the Pulsar to initiate HTTP transfers instead of Galaxy, or
+ 'remote_copy' to cause Pulsar to initiate file system copies.
If setting this to 'remote_transfer' be sure to specify a
'galaxy_url' attribute on the runner plugin above. --><!--
<param id="default_file_action">none</param> --><!-- The above
option is just the default, the transfer behavior
none|copy|http can be configured on a per path basis via the
- following file. See lib/galaxy/jobs/runners/lwr_client/action_mapper.py
- for examples of how to configure this file. This is very beta
- and nature of file will likely change.
+ following file. See Pulsar documentation for more details and
+ examples.
-->
- <!-- <param
id="file_action_config">file_actions.json</param> -->
- <!-- Uncomment following option to disable Galaxy tool dependency
- resolution and utilize remote LWR's configuraiton of tool
- dependency resolution instead (same options as Galaxy for
- dependency resolution are available in LWR). At a minimum
- the remote LWR server should define a tool_dependencies_dir in
- its `server.ini` configuration. The LWR will not attempt to
- stage dependencies - so ensure the the required galaxy or tool
- shed packages are available remotely (exact same tool shed
- installed changesets are required).
+ <!-- <param
id="file_action_config">file_actions.yaml</param> -->
+ <!-- The non-legacy Pulsar runners will attempt to resolve Galaxy
+ dependencies remotely - to enable this set a tool_dependency_dir
+ in Pulsar's configuration (can work with all the same dependency
+           resolution mechanisms as Galaxy - Tool Shed installs, Galaxy
+           packages, etc...). To disable this behavior, set the following
+           parameter to none. To generate the dependency resolution
+           command locally, set the following parameter to local.
-->
- <!-- <param
id="dependency_resolution">remote</params> -->
- <!-- Traditionally, the LWR allow Galaxy to generate a command line
- as if it were going to run the command locally and then the
- LWR client rewrites it after the fact using regular
- expressions. Setting the following value to true causes the
- LWR runner to insert itself into the command line generation
- process and generate the correct command line from the get go.
- This will likely be the default someday - but requires a newer
- LWR version and is less well tested. -->
- <!-- <param id="rewrite_parameters">true</params>
-->
+        <!-- <param id="dependency_resolution">none</param>
--><!-- Uncomment following option to enable setting metadata on remote
- LWR server. The 'use_remote_datatypes' option is available for
+ Pulsar server. The 'use_remote_datatypes' option is available
for
determining whether to use remotely configured datatypes or local
ones (both alternatives are a little brittle). --><!-- <param
id="remote_metadata">true</param> --><!-- <param
id="use_remote_datatypes">false</param> --><!-- <param
id="remote_property_galaxy_home">/path/to/remote/galaxy-central</param>
-->
- <!-- If remote LWR server is configured to run jobs as the real user,
+ <!-- If remote Pulsar server is configured to run jobs as the real user,
uncomment the following line to pass the current Galaxy user
along. --><!-- <param
id="submit_user">$__user_name__</param> -->
- <!-- Various other submission parameters can be passed along to the LWR
- whose use will depend on the remote LWR's configured job manager.
+ <!-- Various other submission parameters can be passed along to the
Pulsar
+ whose use will depend on the remote Pulsar's configured job
manager.
For instance:
-->
- <!-- <param id="submit_native_specification">-P bignodes
-R y -pe threads 8</param> -->
+ <!-- <param id="submit_native_specification">-P bignodes
-R y -pe threads 8</param> -->
+ <!-- Disable parameter rewriting and rewrite generated commands
+ instead. This may be required if remote host is Windows machine
+ but probably not otherwise.
+ -->
+        <!-- <param id="rewrite_parameters">false</param>
--></destination>
- <destination id="amqp_lwr_dest" runner="amqp_lwr" >
- <!-- url and private_token are not valid when using MQ driven LWR. The
plugin above
- determines which queue/manager to target and the underlying MQ server
should be
- used to configure security.
- -->
- <!-- Traditionally, the LWR client sends request to LWR
- server to populate various system properties. This
+    <destination id="pulsar_mq_dest" runner="pulsar_mq"
>
+ <!-- The RESTful Pulsar client sends a request to Pulsar
+ to populate various system properties. This
extra step can be disabled and these calculated here
on client by uncommenting jobs_directory and
specifying any additional remote_property_ of
interest, this is not optional when using message
queues.
-->
- <param
id="jobs_directory">/path/to/remote/lwr/lwr_staging/</param>
- <!-- Default the LWR send files to and pull files from Galaxy when
- using message queues (in the more traditional mode Galaxy sends
- files to and pull files from the LWR - this is obviously less
- appropriate when using a message queue).
-
- The default_file_action currently requires pycurl be available
- to Galaxy (presumably in its virtualenv). Making this dependency
- optional is an open task.
+ <param
id="jobs_directory">/path/to/remote/pulsar/files/staging/</param>
+      <!-- Otherwise, MQ and Legacy Pulsar destinations can be supplied
+           all the same destination parameters as the RESTful client
+           documented above (though url and private_token are ignored
+           when using a MQ).
-->
- <param
id="default_file_action">remote_transfer</param></destination><destination
id="ssh_torque" runner="cli"><param
id="shell_plugin">SecureShell</param>
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/pulsar.py
--- /dev/null
+++ b/lib/galaxy/jobs/runners/pulsar.py
@@ -0,0 +1,707 @@
+from __future__ import absolute_import # Need to import pulsar_client absolutely.
+
+import logging
+
+from galaxy import model
+from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner
+from galaxy.jobs import ComputeEnvironment
+from galaxy.jobs import JobDestination
+from galaxy.jobs.command_factory import build_command
+from galaxy.tools.deps import dependencies
+from galaxy.util import string_as_bool_or_none
+from galaxy.util.bunch import Bunch
+from galaxy.util import specs
+
+import errno
+from time import sleep
+import os
+
+from pulsar.client import build_client_manager
+from pulsar.client import url_to_destination_params
+from pulsar.client import finish_job as pulsar_finish_job
+from pulsar.client import submit_job as pulsar_submit_job
+from pulsar.client import ClientJobDescription
+from pulsar.client import PulsarOutputs
+from pulsar.client import ClientOutputs
+from pulsar.client import PathMapper
+
+log = logging.getLogger( __name__ )
+
+__all__ = [ 'PulsarLegacyJobRunner', 'PulsarRESTJobRunner',
'PulsarMQJobRunner' ]
+
+NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "Pulsar misconfiguration - Pulsar client
configured to set metadata remotely, but remote Pulsar isn't properly configured with
a galaxy_home directory."
+NO_REMOTE_DATATYPES_CONFIG = "Pulsar client is configured to use remote datatypes
configuration when setting metadata externally, but Pulsar is not configured with this
information. Defaulting to datatypes_conf.xml."
+GENERIC_REMOTE_ERROR = "Failed to communicate with remote job server."
+
+# Is there a good way to infer some default for this? Can only use
+# url_for from web threads.
https://gist.github.com/jmchilton/9098762
+DEFAULT_GALAXY_URL = "http://localhost:8080"
+
+PULSAR_PARAM_SPECS = dict(
+ transport=dict(
+ map=specs.to_str_or_none,
+ valid=specs.is_in("urllib", "curl", None),
+ default=None
+ ),
+ cache=dict(
+ map=specs.to_bool_or_none,
+ default=None,
+ ),
+ amqp_url=dict(
+ map=specs.to_str_or_none,
+ default=None,
+ ),
+ galaxy_url=dict(
+ map=specs.to_str_or_none,
+ default=DEFAULT_GALAXY_URL,
+ ),
+ manager=dict(
+ map=specs.to_str_or_none,
+ default=None,
+ ),
+ amqp_consumer_timeout=dict(
+ map=lambda val: None if val == "None" else float(val),
+ default=None,
+ ),
+ amqp_connect_ssl_ca_certs=dict(
+ map=specs.to_str_or_none,
+ default=None,
+ ),
+ amqp_connect_ssl_keyfile=dict(
+ map=specs.to_str_or_none,
+ default=None,
+ ),
+ amqp_connect_ssl_certfile=dict(
+ map=specs.to_str_or_none,
+ default=None,
+ ),
+ amqp_connect_ssl_cert_reqs=dict(
+ map=specs.to_str_or_none,
+ default=None,
+ ),
+ #
http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Produce...
+ amqp_publish_retry=dict(
+ map=specs.to_bool,
+ default=False,
+ ),
+ amqp_publish_priority=dict(
+ map=int,
+ valid=lambda x: 0 <= x and x <= 9,
+ default=0,
+ ),
+ #
http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Exchang...
+ amqp_publish_delivery_mode=dict(
+ map=str,
+ valid=specs.is_in("transient", "persistent"),
+ default="persistent",
+ ),
+ amqp_publish_retry_max_retries=dict(
+ map=int,
+ default=None,
+ ),
+ amqp_publish_retry_interval_start=dict(
+ map=int,
+ default=None,
+ ),
+ amqp_publish_retry_interval_step=dict(
+ map=int,
+ default=None,
+ ),
+ amqp_publish_retry_interval_max=dict(
+ map=int,
+ default=None,
+ ),
+)
+
+
+PARAMETER_SPECIFICATION_REQUIRED = object()
+PARAMETER_SPECIFICATION_IGNORED = object()
+
+
+class PulsarJobRunner( AsynchronousJobRunner ):
+ """
+ Pulsar Job Runner
+ """
+ runner_name = "PulsarJobRunner"
+
+ def __init__( self, app, nworkers, **kwds ):
+ """Start the job runner """
+ super( PulsarJobRunner, self ).__init__( app, nworkers,
runner_param_specs=PULSAR_PARAM_SPECS, **kwds )
+ self._init_worker_threads()
+ galaxy_url = self.runner_params.galaxy_url
+ if galaxy_url:
+ galaxy_url = galaxy_url.rstrip("/")
+ self.galaxy_url = galaxy_url
+ self.__init_client_manager()
+ self._monitor()
+
+ def _monitor( self ):
+ # Extension point allow MQ variant to setup callback instead
+ self._init_monitor_thread()
+
+ def __init_client_manager( self ):
+ client_manager_kwargs = {}
+ for kwd in 'manager', 'cache', 'transport':
+ client_manager_kwargs[ kwd ] = self.runner_params[ kwd ]
+ for kwd in self.runner_params.keys():
+ if kwd.startswith( 'amqp_' ):
+ client_manager_kwargs[ kwd ] = self.runner_params[ kwd ]
+ self.client_manager = build_client_manager(**client_manager_kwargs)
+
+ def url_to_destination( self, url ):
+ """Convert a legacy URL to a job destination"""
+ return JobDestination( runner="pulsar",
params=url_to_destination_params( url ) )
+
+ def check_watched_item(self, job_state):
+ try:
+ client = self.get_client_from_state(job_state)
+ status = client.get_status()
+ except Exception:
+ # An orphaned job was put into the queue at app startup, so remote server
went down
+ # either way we are done I guess.
+ self.mark_as_finished(job_state)
+ return None
+ job_state = self._update_job_state_for_status(job_state, status)
+ return job_state
+
+ def _update_job_state_for_status(self, job_state, pulsar_status):
+ if pulsar_status == "complete":
+ self.mark_as_finished(job_state)
+ return None
+ if pulsar_status == "failed":
+ self.fail_job(job_state)
+ return None
+ if pulsar_status == "running" and not job_state.running:
+ job_state.running = True
+ job_state.job_wrapper.change_state( model.Job.states.RUNNING )
+ return job_state
+
+ def queue_job(self, job_wrapper):
+ job_destination = job_wrapper.job_destination
+ self._populate_parameter_defaults( job_destination )
+
+ command_line, client, remote_job_config, compute_environment =
self.__prepare_job( job_wrapper, job_destination )
+
+ if not command_line:
+ return
+
+ try:
+ dependencies_description = PulsarJobRunner.__dependencies_description(
client, job_wrapper )
+ rewrite_paths = not PulsarJobRunner.__rewrite_parameters( client )
+ unstructured_path_rewrites = {}
+ if compute_environment:
+ unstructured_path_rewrites =
compute_environment.unstructured_path_rewrites
+
+ client_job_description = ClientJobDescription(
+ command_line=command_line,
+ input_files=self.get_input_files(job_wrapper),
+ client_outputs=self.__client_outputs(client, job_wrapper),
+ working_directory=job_wrapper.working_directory,
+ tool=job_wrapper.tool,
+ config_files=job_wrapper.extra_filenames,
+ dependencies_description=dependencies_description,
+ env=client.env,
+ rewrite_paths=rewrite_paths,
+ arbitrary_files=unstructured_path_rewrites,
+ )
+ job_id = pulsar_submit_job(client, client_job_description,
remote_job_config)
+ log.info("Pulsar job submitted with job_id %s" % job_id)
+ job_wrapper.set_job_destination( job_destination, job_id )
+ job_wrapper.change_state( model.Job.states.QUEUED )
+ except Exception:
+ job_wrapper.fail( "failure running job", exception=True )
+ log.exception("failure running job %d" % job_wrapper.job_id)
+ return
+
+ pulsar_job_state = AsynchronousJobState()
+ pulsar_job_state.job_wrapper = job_wrapper
+ pulsar_job_state.job_id = job_id
+ pulsar_job_state.old_state = True
+ pulsar_job_state.running = False
+ pulsar_job_state.job_destination = job_destination
+ self.monitor_job(pulsar_job_state)
+
+ def __prepare_job(self, job_wrapper, job_destination):
+ """ Build command-line and Pulsar client for this job.
"""
+ command_line = None
+ client = None
+ remote_job_config = None
+ compute_environment = None
+ try:
+ client = self.get_client_from_wrapper(job_wrapper)
+ tool = job_wrapper.tool
+ remote_job_config = client.setup(tool.id, tool.version)
+ rewrite_parameters = PulsarJobRunner.__rewrite_parameters( client )
+ prepare_kwds = {}
+ if rewrite_parameters:
+ compute_environment = PulsarComputeEnvironment( client, job_wrapper,
remote_job_config )
+ prepare_kwds[ 'compute_environment' ] = compute_environment
+ job_wrapper.prepare( **prepare_kwds )
+ self.__prepare_input_files_locally(job_wrapper)
+ remote_metadata = PulsarJobRunner.__remote_metadata( client )
+ dependency_resolution = PulsarJobRunner.__dependency_resolution( client )
+ metadata_kwds = self.__build_metadata_configuration(client, job_wrapper,
remote_metadata, remote_job_config)
+ remote_command_params = dict(
+ working_directory=remote_job_config['working_directory'],
+ metadata_kwds=metadata_kwds,
+ dependency_resolution=dependency_resolution,
+ )
+ remote_working_directory = remote_job_config['working_directory']
+ # TODO: Following defs work for Pulsar, always worked for Pulsar but should
be
+ # calculated at some other level.
+ remote_job_directory = os.path.abspath(os.path.join(remote_working_directory,
os.path.pardir))
+ remote_tool_directory = os.path.abspath(os.path.join(remote_job_directory,
"tool_files"))
+ container = self._find_container(
+ job_wrapper,
+ compute_working_directory=remote_working_directory,
+ compute_tool_directory=remote_tool_directory,
+ compute_job_directory=remote_job_directory,
+ )
+ command_line = build_command(
+ self,
+ job_wrapper=job_wrapper,
+ container=container,
+ include_metadata=remote_metadata,
+ include_work_dir_outputs=False,
+ remote_command_params=remote_command_params,
+ )
+ except Exception:
+ job_wrapper.fail( "failure preparing job", exception=True )
+ log.exception("failure running job %d" % job_wrapper.job_id)
+
+ # If we were able to get a command line, run the job
+ if not command_line:
+ job_wrapper.finish( '', '' )
+
+ return command_line, client, remote_job_config, compute_environment
+
+ def __prepare_input_files_locally(self, job_wrapper):
+ """Run task splitting commands locally."""
+ prepare_input_files_cmds = getattr(job_wrapper,
'prepare_input_files_cmds', None)
+ if prepare_input_files_cmds is not None:
+ for cmd in prepare_input_files_cmds: # run the commands to stage the input
files
+ if 0 != os.system(cmd):
+ raise Exception('Error running file staging command: %s' %
cmd)
+ job_wrapper.prepare_input_files_cmds = None # prevent them from being used
in-line
+
+ def _populate_parameter_defaults( self, job_destination ):
+ updated = False
+ params = job_destination.params
+ for key, value in self.destination_defaults.iteritems():
+ if key in params:
+ if value is PARAMETER_SPECIFICATION_IGNORED:
+ log.warn( "Pulsar runner in selected configuration ignores
parameter %s" % key )
+ continue
+ #if self.runner_params.get( key, None ):
+ # # Let plugin define defaults for some parameters -
+ # # for instance that way jobs_directory can be
+ # # configured next to AMQP url (where it belongs).
+ # params[ key ] = self.runner_params[ key ]
+ # continue
+
+ if not value:
+ continue
+
+ if value is PARAMETER_SPECIFICATION_REQUIRED:
+ raise Exception( "Pulsar destination does not define required
parameter %s" % key )
+ elif value is not PARAMETER_SPECIFICATION_IGNORED:
+ params[ key ] = value
+ updated = True
+ return updated
+
+ def get_output_files(self, job_wrapper):
+ output_paths = job_wrapper.get_output_fnames()
+ return [ str( o ) for o in output_paths ] # Force job_path from DatasetPath
objects.
+
+ def get_input_files(self, job_wrapper):
+ input_paths = job_wrapper.get_input_paths()
+ return [ str( i ) for i in input_paths ] # Force job_path from DatasetPath
objects.
+
+ def get_client_from_wrapper(self, job_wrapper):
+ job_id = job_wrapper.job_id
+ if hasattr(job_wrapper, 'task_id'):
+ job_id = "%s_%s" % (job_id, job_wrapper.task_id)
+ params = job_wrapper.job_destination.params.copy()
+ for key, value in params.iteritems():
+ if value:
+ params[key] = model.User.expand_user_properties(
job_wrapper.get_job().user, value )
+
+ env = getattr( job_wrapper.job_destination, "env", [] )
+ return self.get_client( params, job_id, env )
+
+ def get_client_from_state(self, job_state):
+ job_destination_params = job_state.job_destination.params
+ job_id = job_state.job_id
+ return self.get_client( job_destination_params, job_id )
+
+ def get_client( self, job_destination_params, job_id, env=[] ):
+ # Cannot use url_for outside of web thread.
+ #files_endpoint = url_for( controller="job_files",
job_id=encoded_job_id )
+
+ encoded_job_id = self.app.security.encode_id(job_id)
+ job_key = self.app.security.encode_id( job_id, kind="jobs_files" )
+ files_endpoint = "%s/api/jobs/%s/files?job_key=%s" % (
+ self.galaxy_url,
+ encoded_job_id,
+ job_key
+ )
+ get_client_kwds = dict(
+ job_id=str( job_id ),
+ files_endpoint=files_endpoint,
+ env=env
+ )
+ return self.client_manager.get_client( job_destination_params, **get_client_kwds
)
+
+ def finish_job( self, job_state ):
+ stderr = stdout = ''
+ job_wrapper = job_state.job_wrapper
+ try:
+ client = self.get_client_from_state(job_state)
+ run_results = client.full_status()
+ remote_working_directory = run_results.get("working_directory",
None)
+ stdout = run_results.get('stdout', '')
+ stderr = run_results.get('stderr', '')
+ exit_code = run_results.get('returncode', None)
+ pulsar_outputs = PulsarOutputs.from_status_response(run_results)
+ # Use Pulsar client code to transfer/copy files back
+ # and cleanup job if needed.
+ completed_normally = \
+ job_wrapper.get_state() not in [ model.Job.states.ERROR,
model.Job.states.DELETED ]
+ cleanup_job = self.app.config.cleanup_job
+ client_outputs = self.__client_outputs(client, job_wrapper)
+ finish_args = dict( client=client,
+ job_completed_normally=completed_normally,
+ cleanup_job=cleanup_job,
+ client_outputs=client_outputs,
+ pulsar_outputs=pulsar_outputs )
+ failed = pulsar_finish_job( **finish_args )
+ if failed:
+ job_wrapper.fail("Failed to find or download one or more job outputs
from remote server.", exception=True)
+ except Exception:
+ message = GENERIC_REMOTE_ERROR
+ job_wrapper.fail( message, exception=True )
+ log.exception("failure finishing job %d" % job_wrapper.job_id)
+ return
+ if not PulsarJobRunner.__remote_metadata( client ):
+ self._handle_metadata_externally( job_wrapper, resolve_requirements=True )
+ # Finish the job
+ try:
+ job_wrapper.finish(
+ stdout,
+ stderr,
+ exit_code,
+ remote_working_directory=remote_working_directory
+ )
+ except Exception:
+ log.exception("Job wrapper finish method failed")
+ job_wrapper.fail("Unable to finish job", exception=True)
+
+ def fail_job( self, job_state ):
+ """
+ Separated out so we can use the worker threads for it.
+ """
+ self.stop_job( self.sa_session.query( self.app.model.Job ).get(
job_state.job_wrapper.job_id ) )
+ job_state.job_wrapper.fail( getattr( job_state, "fail_message",
GENERIC_REMOTE_ERROR ) )
+
+ def check_pid( self, pid ):
+ try:
+ os.kill( pid, 0 )
+ return True
+ except OSError, e:
+ if e.errno == errno.ESRCH:
+ log.debug( "check_pid(): PID %d is dead" % pid )
+ else:
+ log.warning( "check_pid(): Got errno %s when attempting to check PID
%d: %s" % ( errno.errorcode[e.errno], pid, e.strerror ) )
+ return False
+
+ def stop_job( self, job ):
+ #if our local job has JobExternalOutputMetadata associated, then our primary job
has to have already finished
+ job_ext_output_metadata = job.get_external_output_metadata()
+ if job_ext_output_metadata:
+ pid = job_ext_output_metadata[0].job_runner_external_pid # every
JobExternalOutputMetadata has a pid set, we just need to take from one of them
+ if pid in [ None, '' ]:
+ log.warning( "stop_job(): %s: no PID in database for job, unable to
stop" % job.id )
+ return
+ pid = int( pid )
+ if not self.check_pid( pid ):
+ log.warning( "stop_job(): %s: PID %d was already dead or can't
be signaled" % ( job.id, pid ) )
+ return
+ for sig in [ 15, 9 ]:
+ try:
+ os.killpg( pid, sig )
+ except OSError, e:
+ log.warning( "stop_job(): %s: Got errno %s when attempting to
signal %d to PID %d: %s" % ( job.id, errno.errorcode[e.errno], sig, pid, e.strerror )
)
+ return # give up
+ sleep( 2 )
+ if not self.check_pid( pid ):
+ log.debug( "stop_job(): %s: PID %d successfully killed with
signal %d" % ( job.id, pid, sig ) )
+ return
+ else:
+ log.warning( "stop_job(): %s: PID %d refuses to die after
signaling TERM/KILL" % ( job.id, pid ) )
+ else:
+ # Remote kill
+ pulsar_url = job.job_runner_name
+ job_id = job.job_runner_external_id
+ log.debug("Attempt remote Pulsar kill of job with url %s and id %s"
% (pulsar_url, job_id))
+ client = self.get_client(job.destination_params, job_id)
+ client.kill()
+
+ def recover( self, job, job_wrapper ):
+ """Recovers jobs stuck in the queued/running state when Galaxy
started"""
+ job_state = self._job_state( job, job_wrapper )
+ job_wrapper.command_line = job.get_command_line()
+ state = job.get_state()
+ if state in [model.Job.states.RUNNING, model.Job.states.QUEUED]:
+ log.debug( "(Pulsar/%s) is still in running state, adding to the Pulsar
queue" % ( job.get_id()) )
+ job_state.old_state = True
+ job_state.running = state == model.Job.states.RUNNING
+ self.monitor_queue.put( job_state )
+
+ def shutdown( self ):
+ super( PulsarJobRunner, self ).shutdown()
+ self.client_manager.shutdown()
+
+ def _job_state( self, job, job_wrapper ):
+ job_state = AsynchronousJobState()
+ # TODO: Determine why this is set when using normal message queue updates
+ # but not CLI submitted MQ updates...
+ raw_job_id = job.get_job_runner_external_id() or job_wrapper.job_id
+ job_state.job_id = str( raw_job_id )
+ job_state.runner_url = job_wrapper.get_job_runner_url()
+ job_state.job_destination = job_wrapper.job_destination
+ job_state.job_wrapper = job_wrapper
+ return job_state
+
+ def __client_outputs( self, client, job_wrapper ):
+ work_dir_outputs = self.get_work_dir_outputs( job_wrapper )
+ output_files = self.get_output_files( job_wrapper )
+ client_outputs = ClientOutputs(
+ working_directory=job_wrapper.working_directory,
+ work_dir_outputs=work_dir_outputs,
+ output_files=output_files,
+ version_file=job_wrapper.get_version_string_path(),
+ )
+ return client_outputs
+
+ @staticmethod
+ def __dependencies_description( pulsar_client, job_wrapper ):
+ dependency_resolution = PulsarJobRunner.__dependency_resolution( pulsar_client )
+ remote_dependency_resolution = dependency_resolution == "remote"
+ if not remote_dependency_resolution:
+ return None
+ requirements = job_wrapper.tool.requirements or []
+ installed_tool_dependencies = job_wrapper.tool.installed_tool_dependencies or []
+ return dependencies.DependenciesDescription(
+ requirements=requirements,
+ installed_tool_dependencies=installed_tool_dependencies,
+ )
+
+ @staticmethod
+ def __dependency_resolution( pulsar_client ):
+ dependency_resolution = pulsar_client.destination_params.get(
"dependency_resolution", "local" )
+ if dependency_resolution not in ["none", "local",
"remote"]:
+ raise Exception("Unknown dependency_resolution value encountered
%s" % dependency_resolution)
+ return dependency_resolution
+
+ @staticmethod
+ def __remote_metadata( pulsar_client ):
+ remote_metadata = string_as_bool_or_none( pulsar_client.destination_params.get(
"remote_metadata", False ) )
+ return remote_metadata
+
+ @staticmethod
+ def __use_remote_datatypes_conf( pulsar_client ):
+ """ When setting remote metadata, use integrated datatypes from
this
+ Galaxy instance or use the datatypes config configured via the remote
+ Pulsar.
+
+ Both options are broken in different ways for the same reason - datatypes
+ may not match. One can push the local datatypes config to the remote
+ server - but there is no guarantee these datatypes will be defined
+ there. Alternatively, one can use the remote datatype config - but
+ there is no guarantee that it will contain all the datatypes available
+ to this Galaxy.
+ """
+ use_remote_datatypes = string_as_bool_or_none(
pulsar_client.destination_params.get( "use_remote_datatypes", False ) )
+ return use_remote_datatypes
+
+ @staticmethod
+ def __rewrite_parameters( pulsar_client ):
+ return string_as_bool_or_none( pulsar_client.destination_params.get(
"rewrite_parameters", False ) ) or False
+
+ def __build_metadata_configuration(self, client, job_wrapper, remote_metadata,
remote_job_config):
+ metadata_kwds = {}
+ if remote_metadata:
+ remote_system_properties =
remote_job_config.get("system_properties", {})
+ remote_galaxy_home = remote_system_properties.get("galaxy_home",
None)
+ if not remote_galaxy_home:
+ raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE)
+ metadata_kwds['exec_dir'] = remote_galaxy_home
+ outputs_directory = remote_job_config['outputs_directory']
+ configs_directory = remote_job_config['configs_directory']
+ working_directory = remote_job_config['working_directory']
+ # For metadata calculation, we need to build a list of output
+ # file objects with real path indicating location on Galaxy server
+ # and false path indicating location on compute server. Since the
+ # Pulsar disables from_work_dir copying as part of the job command
+ # line we need to take the list of output locations on the Pulsar
+ # server (produced by self.get_output_files(job_wrapper)) and for
+ # each work_dir output substitute the effective path on the Pulsar
+ # server relative to the remote working directory as the
+ # false_path to send the metadata command generation module.
+ work_dir_outputs = self.get_work_dir_outputs(job_wrapper,
job_working_directory=working_directory)
+ outputs = [Bunch(false_path=os.path.join(outputs_directory,
os.path.basename(path)), real_path=path) for path in self.get_output_files(job_wrapper)]
+ for output in outputs:
+ for pulsar_workdir_path, real_path in work_dir_outputs:
+ if real_path == output.real_path:
+ output.false_path = pulsar_workdir_path
+ metadata_kwds['output_fnames'] = outputs
+ metadata_kwds['compute_tmp_dir'] = working_directory
+ metadata_kwds['config_root'] = remote_galaxy_home
+ default_config_file = os.path.join(remote_galaxy_home,
'universe_wsgi.ini')
+ metadata_kwds['config_file'] =
remote_system_properties.get('galaxy_config_file', default_config_file)
+ metadata_kwds['dataset_files_path'] =
remote_system_properties.get('galaxy_dataset_files_path', None)
+ if PulsarJobRunner.__use_remote_datatypes_conf( client ):
+ remote_datatypes_config =
remote_system_properties.get('galaxy_datatypes_config_file', None)
+ if not remote_datatypes_config:
+ log.warn(NO_REMOTE_DATATYPES_CONFIG)
+ remote_datatypes_config = os.path.join(remote_galaxy_home,
'datatypes_conf.xml')
+ metadata_kwds['datatypes_config'] = remote_datatypes_config
+ else:
+ integrates_datatypes_config =
self.app.datatypes_registry.integrated_datatypes_configs
+ # Ensure this file gets pushed out to the remote config dir.
+ job_wrapper.extra_filenames.append(integrates_datatypes_config)
+
+ metadata_kwds['datatypes_config'] =
os.path.join(configs_directory, os.path.basename(integrates_datatypes_config))
+ return metadata_kwds
+
+
+class PulsarLegacyJobRunner( PulsarJobRunner ):
+ destination_defaults = dict(
+ rewrite_parameters="false",
+ dependency_resolution="local",
+ )
+
+
+class PulsarMQJobRunner( PulsarJobRunner ):
+ destination_defaults = dict(
+ default_file_action="remote_transfer",
+ rewrite_parameters="true",
+ dependency_resolution="remote",
+ jobs_directory=PARAMETER_SPECIFICATION_REQUIRED,
+ url=PARAMETER_SPECIFICATION_IGNORED,
+ private_token=PARAMETER_SPECIFICATION_IGNORED
+ )
+
+ def _monitor( self ):
+ # This is a message queue driven runner, don't monitor
+ # just setup required callback.
+ self.client_manager.ensure_has_status_update_callback(self.__async_update)
+
+ def __async_update( self, full_status ):
+ job_id = None
+ try:
+ job_id = full_status[ "job_id" ]
+ job, job_wrapper =
self.app.job_manager.job_handler.job_queue.job_pair_for_id( job_id )
+ job_state = self._job_state( job, job_wrapper )
+ self._update_job_state_for_status(job_state, full_status[ "status"
] )
+ except Exception:
+ log.exception( "Failed to update Pulsar job status for job_id %s" %
job_id )
+ raise
+ # Nothing else to do? - Attempt to fail the job?
+
+
+class PulsarRESTJobRunner( PulsarJobRunner ):
+ destination_defaults = dict(
+ default_file_action="transfer",
+ rewrite_parameters="true",
+ dependency_resolution="remote",
+ url=PARAMETER_SPECIFICATION_REQUIRED,
+ )
+
+
+class PulsarComputeEnvironment( ComputeEnvironment ):
+
+ def __init__( self, pulsar_client, job_wrapper, remote_job_config ):
+ self.pulsar_client = pulsar_client
+ self.job_wrapper = job_wrapper
+ self.local_path_config = job_wrapper.default_compute_environment()
+ self.unstructured_path_rewrites = {}
+ # job_wrapper.prepare is going to expunge the job backing the following
+ # computations, so precalculate these paths.
+ self._wrapper_input_paths = self.local_path_config.input_paths()
+ self._wrapper_output_paths = self.local_path_config.output_paths()
+ self.path_mapper = PathMapper(pulsar_client, remote_job_config,
self.local_path_config.working_directory())
+ self._config_directory = remote_job_config[ "configs_directory" ]
+ self._working_directory = remote_job_config[ "working_directory" ]
+ self._sep = remote_job_config[ "system_properties" ][
"separator" ]
+ self._tool_dir = remote_job_config[ "tools_directory" ]
+ version_path = self.local_path_config.version_path()
+ new_version_path = self.path_mapper.remote_version_path_rewrite(version_path)
+ if new_version_path:
+ version_path = new_version_path
+ self._version_path = version_path
+
+ def output_paths( self ):
+ local_output_paths = self._wrapper_output_paths
+
+ results = []
+ for local_output_path in local_output_paths:
+ wrapper_path = str( local_output_path )
+ remote_path = self.path_mapper.remote_output_path_rewrite( wrapper_path )
+ results.append( self._dataset_path( local_output_path, remote_path ) )
+ return results
+
+ def input_paths( self ):
+ local_input_paths = self._wrapper_input_paths
+
+ results = []
+ for local_input_path in local_input_paths:
+ wrapper_path = str( local_input_path )
+ # This will over-copy in some cases. For instance in the case of task
+ # splitting, this input will be copied even though only the work dir
+ # input will actually be used.
+ remote_path = self.path_mapper.remote_input_path_rewrite( wrapper_path )
+ results.append( self._dataset_path( local_input_path, remote_path ) )
+ return results
+
+ def _dataset_path( self, local_dataset_path, remote_path ):
+ remote_extra_files_path = None
+ if remote_path:
+ remote_extra_files_path = "%s_files" % remote_path[ 0:-len(
".dat" ) ]
+ return local_dataset_path.with_path_for_job( remote_path, remote_extra_files_path
)
+
+ def working_directory( self ):
+ return self._working_directory
+
+ def config_directory( self ):
+ return self._config_directory
+
+ def new_file_path( self ):
+ return self.working_directory() # Problems with doing this?
+
+ def sep( self ):
+ return self._sep
+
+ def version_path( self ):
+ return self._version_path
+
+ def rewriter( self, parameter_value ):
+ unstructured_path_rewrites = self.unstructured_path_rewrites
+ if parameter_value in unstructured_path_rewrites:
+ # Path previously mapped, use previous mapping.
+ return unstructured_path_rewrites[ parameter_value ]
+ if parameter_value in unstructured_path_rewrites.itervalues():
+ # Path is a rewritten remote path (this might never occur,
+ # consider dropping check...)
+ return parameter_value
+
+ rewrite, new_unstructured_path_rewrites =
self.path_mapper.check_for_arbitrary_rewrite( parameter_value )
+ if rewrite:
+ unstructured_path_rewrites.update(new_unstructured_path_rewrites)
+ return rewrite
+ else:
+ # Did not need to rewrite, use original path or value.
+ return parameter_value
+
+ def unstructured_path_rewriter( self ):
+ return self.rewriter
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/__init__.py
--- a/lib/galaxy/jobs/runners/util/__init__.py
+++ b/lib/galaxy/jobs/runners/util/__init__.py
@@ -1,7 +1,7 @@
"""
This module and its submodules contain utilities for running external
processes and interfacing with job managers. This module should contain
-functionality shared between Galaxy and the LWR.
+functionality shared between Galaxy and the Pulsar.
"""
from galaxy.util.bunch import Bunch
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/cli/factory.py
--- a/lib/galaxy/jobs/runners/util/cli/factory.py
+++ b/lib/galaxy/jobs/runners/util/cli/factory.py
@@ -5,7 +5,7 @@
)
code_dir = 'lib'
except ImportError:
- from lwr.managers.util.cli import (
+ from pulsar.managers.util.cli import (
CliInterface,
split_params
)
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/cli/job/slurm.py
--- a/lib/galaxy/jobs/runners/util/cli/job/slurm.py
+++ b/lib/galaxy/jobs/runners/util/cli/job/slurm.py
@@ -5,7 +5,7 @@
from galaxy.model import Job
job_states = Job.states
except ImportError:
- # Not in Galaxy, map Galaxy job states to LWR ones.
+ # Not in Galaxy, map Galaxy job states to Pulsar ones.
from galaxy.util import enum
job_states = enum(RUNNING='running', OK='complete',
QUEUED='queued')
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/cli/job/torque.py
--- a/lib/galaxy/jobs/runners/util/cli/job/torque.py
+++ b/lib/galaxy/jobs/runners/util/cli/job/torque.py
@@ -7,7 +7,7 @@
from galaxy.model import Job
job_states = Job.states
except ImportError:
- # Not in Galaxy, map Galaxy job states to LWR ones.
+ # Not in Galaxy, map Galaxy job states to Pulsar ones.
from galaxy.util import enum
job_states = enum(RUNNING='running', OK='complete',
QUEUED='queued')
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/objectstore/__init__.py
--- a/lib/galaxy/objectstore/__init__.py
+++ b/lib/galaxy/objectstore/__init__.py
@@ -623,9 +623,9 @@
elif store == 'irods':
from .rods import IRODSObjectStore
return IRODSObjectStore(config=config, config_xml=config_xml)
- elif store == 'lwr':
- from .lwr import LwrObjectStore
- return LwrObjectStore(config=config, config_xml=config_xml)
+ elif store == 'pulsar':
+ from .pulsar import PulsarObjectStore
+ return PulsarObjectStore(config=config, config_xml=config_xml)
else:
log.error("Unrecognized object store definition: {0}".format(store))
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/objectstore/lwr.py
--- a/lib/galaxy/objectstore/lwr.py
+++ /dev/null
@@ -1,76 +0,0 @@
-from __future__ import absolute_import # Need to import lwr_client absolutely.
-from ..objectstore import ObjectStore
-try:
- from galaxy.jobs.runners.lwr_client.manager import ObjectStoreClientManager
-except ImportError:
- from lwr.lwr_client.manager import ObjectStoreClientManager
-
-
-class LwrObjectStore(ObjectStore):
- """
- Object store implementation that delegates to a remote LWR server.
-
- This may be more aspirational than practical for now, it would be good to
- Galaxy to a point that a handler thread could be setup that doesn't attempt
- to access the disk files returned by a (this) object store - just passing
- them along to the LWR unmodified. That modification - along with this
- implementation and LWR job destinations would then allow Galaxy to fully
- manage jobs on remote servers with completely different mount points.
-
- This implementation should be considered beta and may be dropped from
- Galaxy at some future point or significantly modified.
- """
-
- def __init__(self, config, config_xml):
- self.lwr_client = self.__build_lwr_client(config_xml)
-
- def exists(self, obj, **kwds):
- return self.lwr_client.exists(**self.__build_kwds(obj, **kwds))
-
- def file_ready(self, obj, **kwds):
- return self.lwr_client.file_ready(**self.__build_kwds(obj, **kwds))
-
- def create(self, obj, **kwds):
- return self.lwr_client.create(**self.__build_kwds(obj, **kwds))
-
- def empty(self, obj, **kwds):
- return self.lwr_client.empty(**self.__build_kwds(obj, **kwds))
-
- def size(self, obj, **kwds):
- return self.lwr_client.size(**self.__build_kwds(obj, **kwds))
-
- def delete(self, obj, **kwds):
- return self.lwr_client.delete(**self.__build_kwds(obj, **kwds))
-
- # TODO: Optimize get_data.
- def get_data(self, obj, **kwds):
- return self.lwr_client.get_data(**self.__build_kwds(obj, **kwds))
-
- def get_filename(self, obj, **kwds):
- return self.lwr_client.get_filename(**self.__build_kwds(obj, **kwds))
-
- def update_from_file(self, obj, **kwds):
- return self.lwr_client.update_from_file(**self.__build_kwds(obj, **kwds))
-
- def get_store_usage_percent(self):
- return self.lwr_client.get_store_usage_percent()
-
- def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False,
alt_name=None):
- return None
-
- def __build_kwds(self, obj, **kwds):
- kwds['object_id'] = obj.id
- return kwds
- pass
-
- def __build_lwr_client(self, config_xml):
- url = config_xml.get("url")
- private_token = config_xml.get("private_token", None)
- transport = config_xml.get("transport", None)
- manager_options = dict(transport=transport)
- client_options = dict(url=url, private_token=private_token)
- lwr_client =
ObjectStoreClientManager(**manager_options).get_client(client_options)
- return lwr_client
-
- def shutdown(self):
- pass
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/objectstore/pulsar.py
--- /dev/null
+++ b/lib/galaxy/objectstore/pulsar.py
@@ -0,0 +1,73 @@
+from __future__ import absolute_import # Need to import pulsar_client absolutely.
+from ..objectstore import ObjectStore
+from pulsar.client.manager import ObjectStoreClientManager
+
+
+class PulsarObjectStore(ObjectStore):
+ """
+ Object store implementation that delegates to a remote Pulsar server.
+
+ This may be more aspirational than practical for now, it would be good to
+ get Galaxy to a point where a handler thread could be set up that doesn't attempt
+ to access the disk files returned by a (this) object store - just passing
+ them along to the Pulsar unmodified. That modification - along with this
+ implementation and Pulsar job destinations would then allow Galaxy to fully
+ manage jobs on remote servers with completely different mount points.
+
+ This implementation should be considered beta and may be dropped from
+ Galaxy at some future point or significantly modified.
+ """
+
+ def __init__(self, config, config_xml):
+ self.pulsar_client = self.__build_pulsar_client(config_xml)
+
+ def exists(self, obj, **kwds):
+ return self.pulsar_client.exists(**self.__build_kwds(obj, **kwds))
+
+ def file_ready(self, obj, **kwds):
+ return self.pulsar_client.file_ready(**self.__build_kwds(obj, **kwds))
+
+ def create(self, obj, **kwds):
+ return self.pulsar_client.create(**self.__build_kwds(obj, **kwds))
+
+ def empty(self, obj, **kwds):
+ return self.pulsar_client.empty(**self.__build_kwds(obj, **kwds))
+
+ def size(self, obj, **kwds):
+ return self.pulsar_client.size(**self.__build_kwds(obj, **kwds))
+
+ def delete(self, obj, **kwds):
+ return self.pulsar_client.delete(**self.__build_kwds(obj, **kwds))
+
+ # TODO: Optimize get_data.
+ def get_data(self, obj, **kwds):
+ return self.pulsar_client.get_data(**self.__build_kwds(obj, **kwds))
+
+ def get_filename(self, obj, **kwds):
+ return self.pulsar_client.get_filename(**self.__build_kwds(obj, **kwds))
+
+ def update_from_file(self, obj, **kwds):
+ return self.pulsar_client.update_from_file(**self.__build_kwds(obj, **kwds))
+
+ def get_store_usage_percent(self):
+ return self.pulsar_client.get_store_usage_percent()
+
+ def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False,
alt_name=None):
+ return None
+
+ def __build_kwds(self, obj, **kwds):
+ kwds['object_id'] = obj.id
+ return kwds
+ pass
+
+ def __build_pulsar_client(self, config_xml):
+ url = config_xml.get("url")
+ private_token = config_xml.get("private_token", None)
+ transport = config_xml.get("transport", None)
+ manager_options = dict(transport=transport)
+ client_options = dict(url=url, private_token=private_token)
+ pulsar_client =
ObjectStoreClientManager(**manager_options).get_client(client_options)
+ return pulsar_client
+
+ def shutdown(self):
+ pass
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/tools/deps/dependencies.py
--- a/lib/galaxy/tools/deps/dependencies.py
+++ b/lib/galaxy/tools/deps/dependencies.py
@@ -8,7 +8,7 @@
related context required to resolve dependencies via the
ToolShedPackageDependencyResolver.
- This is meant to enable remote resolution of dependencies, by the LWR or
+ This is meant to enable remote resolution of dependencies, by the Pulsar or
other potential remote execution mechanisms.
"""
@@ -39,7 +39,7 @@
@staticmethod
def _toolshed_install_dependency_from_dict(as_dict):
- # Rather than requiring full models in LWR, just use simple objects
+ # Rather than requiring full models in Pulsar, just use simple objects
# containing only properties and associations used to resolve
# dependencies for tool execution.
repository_object = bunch.Bunch(
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/__init__.py
--- /dev/null
+++ b/lib/pulsar/client/__init__.py
@@ -0,0 +1,62 @@
+"""
+pulsar client
+=============
+
+This module contains logic for interfacing with an external Pulsar server.
+
+------------------
+Configuring Galaxy
+------------------
+
+Galaxy job runners are configured in Galaxy's ``job_conf.xml`` file. See
``job_conf.xml.sample_advanced``
+in your Galaxy code base or on
+`Bitbucket
<
https://bitbucket.org/galaxy/galaxy-dist/src/tip/job_conf.xml.sample_adva...
+for information on how to configure Galaxy to interact with the Pulsar.
+
+Galaxy also supports an older, less rich configuration of job runners directly
+in its main ``universe_wsgi.ini`` file. The following section describes how to
+configure Galaxy to communicate with the Pulsar in this legacy mode.
+
+Legacy
+------
+
+A Galaxy tool can be configured to be executed remotely via Pulsar by
+adding a line to the ``universe_wsgi.ini`` file under the
+``galaxy:tool_runners`` section with the format::
+
+ <tool_id> = pulsar://http://<pulsar_host>:<pulsar_port>
+
+As an example, if a host named remotehost is running the Pulsar server
+application on port ``8913``, then the tool with id ``test_tool`` can
+be configured to run remotely on remotehost by adding the following
+line to ``universe_wsgi.ini``::
+
+ test_tool = pulsar://http://remotehost:8913
+
+Remember this must be added after the ``[galaxy:tool_runners]`` header
+in the ``universe_wsgi.ini`` file.
+
+
+"""
+
+from .staging.down import finish_job
+from .staging.up import submit_job
+from .staging import ClientJobDescription
+from .staging import PulsarOutputs
+from .staging import ClientOutputs
+from .client import OutputNotFoundException
+from .manager import build_client_manager
+from .destination import url_to_destination_params
+from .path_mapper import PathMapper
+
+__all__ = [
+ build_client_manager,
+ OutputNotFoundException,
+ url_to_destination_params,
+ finish_job,
+ submit_job,
+ ClientJobDescription,
+ PulsarOutputs,
+ ClientOutputs,
+ PathMapper,
+]
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/action_mapper.py
--- /dev/null
+++ b/lib/pulsar/client/action_mapper.py
@@ -0,0 +1,567 @@
+from json import load
+from os import makedirs
+from os.path import exists
+from os.path import abspath
+from os.path import dirname
+from os.path import join
+from os.path import basename
+from os.path import sep
+import fnmatch
+from re import compile
+from re import escape
+import galaxy.util
+from galaxy.util.bunch import Bunch
+from .config_util import read_file
+from .util import directory_files
+from .util import unique_path_prefix
+from .transport import get_file
+from .transport import post_file
+
+
+DEFAULT_MAPPED_ACTION = 'transfer' # Not really clear to me what this should be,
exception?
+DEFAULT_PATH_MAPPER_TYPE = 'prefix'
+
+STAGING_ACTION_REMOTE = "remote"
+STAGING_ACTION_LOCAL = "local"
+STAGING_ACTION_NONE = None
+STAGING_ACTION_DEFAULT = "default"
+
+# Poor man's enum.
+path_type = Bunch(
+ # Galaxy input datasets and extra files.
+ INPUT="input",
+ # Galaxy config and param files.
+ CONFIG="config",
+ # Files from tool's tool_dir (for now just wrapper if available).
+ TOOL="tool",
+ # Input work dir files - e.g. metadata files, task-split input files, etc..
+ WORKDIR="workdir",
+ # Galaxy output datasets in their final home.
+ OUTPUT="output",
+ # Galaxy from_work_dir output paths and other files (e.g. galaxy.json)
+ OUTPUT_WORKDIR="output_workdir",
+ # Other fixed tool parameter paths (likely coming from tool data, but not
+ necessarily). Not sure this is the best name...
+ UNSTRUCTURED="unstructured",
+)
+
+
+ACTION_DEFAULT_PATH_TYPES = [
+ path_type.INPUT,
+ path_type.CONFIG,
+ path_type.TOOL,
+ path_type.WORKDIR,
+ path_type.OUTPUT,
+ path_type.OUTPUT_WORKDIR,
+]
+ALL_PATH_TYPES = ACTION_DEFAULT_PATH_TYPES + [path_type.UNSTRUCTURED]
+
+
+class FileActionMapper(object):
+ """
+ Objects of this class define how paths are mapped to actions.
+
+ >>> json_string = r'''{"paths": [ \
+ {"path": "/opt/galaxy", "action": "none"},
\
+ {"path": "/galaxy/data", "action":
"transfer"}, \
+ {"path": "/cool/bamfiles/**/*.bam", "action":
"copy", "match_type": "glob"}, \
+ {"path": ".*/dataset_\\\\d+.dat", "action":
"copy", "match_type": "regex"} \
+ ]}'''
+ >>> from tempfile import NamedTemporaryFile
+ >>> from os import unlink
+ >>> def mapper_for(default_action, config_contents):
+ ... f = NamedTemporaryFile(delete=False)
+ ... f.write(config_contents.encode('UTF-8'))
+ ... f.close()
+ ... mock_client = Bunch(default_file_action=default_action,
action_config_path=f.name, files_endpoint=None)
+ ... mapper = FileActionMapper(mock_client)
+ ... mapper = FileActionMapper(config=mapper.to_dict()) # Serialize and
deserialize it to make sure still works
+ ... unlink(f.name)
+ ... return mapper
+ >>> mapper = mapper_for(default_action='none',
config_contents=json_string)
+ >>> # Test first config line above, implicit path prefix mapper
+ >>> action =
mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input')
+ >>> action.action_type == u'none'
+ True
+ >>> action.staging_needed
+ False
+ >>> # Test another (2nd) mapper, this one with a different action
+ >>> action = mapper.action('/galaxy/data/files/000/dataset_1.dat',
'input')
+ >>> action.action_type == u'transfer'
+ True
+ >>> action.staging_needed
+ True
+ >>> # Always at least copy work_dir outputs.
+ >>> action =
mapper.action('/opt/galaxy/database/working_directory/45.sh', 'workdir')
+ >>> action.action_type == u'copy'
+ True
+ >>> action.staging_needed
+ True
+ >>> # Test glob mapper (matching test)
+ >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam',
'input').action_type == u'copy'
+ True
+ >>> # Test glob mapper (non-matching test)
+ >>>
mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam.bai',
'input').action_type == u'none'
+ True
+ >>> # Regex mapper test.
+ >>> mapper.action('/old/galaxy/data/dataset_10245.dat',
'input').action_type == u'copy'
+ True
+ >>> # Doesn't map unstructured paths by default
+ >>> mapper.action('/old/galaxy/data/dataset_10245.dat',
'unstructured').action_type == u'none'
+ True
+ >>> input_only_mapper = mapper_for(default_action="none",
config_contents=r'''{"paths": [ \
+ {"path": "/", "action": "transfer",
"path_types": "input"} \
+ ] }''')
+ >>> input_only_mapper.action('/dataset_1.dat',
'input').action_type == u'transfer'
+ True
+ >>> input_only_mapper.action('/dataset_1.dat',
'output').action_type == u'none'
+ True
+ >>> unstructured_mapper = mapper_for(default_action="none",
config_contents=r'''{"paths": [ \
+ {"path": "/", "action": "transfer",
"path_types": "*any*"} \
+ ] }''')
+ >>> unstructured_mapper.action('/old/galaxy/data/dataset_10245.dat',
'unstructured').action_type == u'transfer'
+ True
+ """
+
+ def __init__(self, client=None, config=None):
+ if config is None and client is None:
+ message = "FileActionMapper must be constructed from either a client or
a config dictionary."
+ raise Exception(message)
+ if config is None:
+ config = self.__client_to_config(client)
+ self.default_action = config.get("default_action",
"transfer")
+ self.mappers = mappers_from_dicts(config.get("paths", []))
+ self.files_endpoint = config.get("files_endpoint", None)
+
+ def action(self, path, type, mapper=None):
+ mapper = self.__find_mapper(path, type, mapper)
+ action_class = self.__action_class(path, type, mapper)
+ file_lister = DEFAULT_FILE_LISTER
+ action_kwds = {}
+ if mapper:
+ file_lister = mapper.file_lister
+ action_kwds = mapper.action_kwds
+ action = action_class(path, file_lister=file_lister, **action_kwds)
+ self.__process_action(action, type)
+ return action
+
+ def unstructured_mappers(self):
+ """ Return mappers that will map 'unstructured' files
(i.e. go beyond
+ mapping inputs, outputs, and config files).
+ """
+ return filter(lambda m: path_type.UNSTRUCTURED in m.path_types, self.mappers)
+
+ def to_dict(self):
+ return dict(
+ default_action=self.default_action,
+ files_endpoint=self.files_endpoint,
+ paths=map(lambda m: m.to_dict(), self.mappers)
+ )
+
+ def __client_to_config(self, client):
+ action_config_path = client.action_config_path
+ if action_config_path:
+ config = read_file(action_config_path)
+ else:
+ config = dict()
+ config["default_action"] = client.default_file_action
+ config["files_endpoint"] = client.files_endpoint
+ return config
+
+ def __load_action_config(self, path):
+ config = load(open(path, 'rb'))
+ self.mappers = mappers_from_dicts(config.get('paths', []))
+
+ def __find_mapper(self, path, type, mapper=None):
+ if not mapper:
+ normalized_path = abspath(path)
+ for query_mapper in self.mappers:
+ if query_mapper.matches(normalized_path, type):
+ mapper = query_mapper
+ break
+ return mapper
+
+ def __action_class(self, path, type, mapper):
+ action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else
"none"
+ if mapper:
+ action_type = mapper.action_type
+ if type in ["workdir", "output_workdir"] and action_type ==
"none":
+ # We are changing the working_directory relative to what
+ # Galaxy would use, these need to be copied over.
+ action_type = "copy"
+ action_class = actions.get(action_type, None)
+ if action_class is None:
+ message_template = "Unknown action_type encountered %s while trying to
map path %s"
+ message_args = (action_type, path)
+ raise Exception(message_template % message_args)
+ return action_class
+
+ def __process_action(self, action, file_type):
+ """ Extension point to populate extra action information after an
+ action has been created.
+ """
+ if action.action_type == "remote_transfer":
+ url_base = self.files_endpoint
+ if not url_base:
+ raise Exception("Attempted to use remote_transfer action without
defining a files_endpoint")
+ if "?" not in url_base:
+ url_base = "%s?" % url_base
+ # TODO: URL encode path.
+ url = "%s&path=%s&file_type=%s" % (url_base, action.path,
file_type)
+ action.url = url
+
+REQUIRED_ACTION_KWD = object()
+
+
+class BaseAction(object):
+ action_spec = {}
+
+ def __init__(self, path, file_lister=None):
+ self.path = path
+ self.file_lister = file_lister or DEFAULT_FILE_LISTER
+
+ def unstructured_map(self, path_helper):
+ unstructured_map = self.file_lister.unstructured_map(self.path)
+ if self.staging_needed:
+ # To ensure uniqueness, prepend unique prefix to each name
+ prefix = unique_path_prefix(self.path)
+ for path, name in unstructured_map.iteritems():
+ unstructured_map[path] = join(prefix, name)
+ else:
+ path_rewrites = {}
+ for path in unstructured_map:
+ rewrite = self.path_rewrite(path_helper, path)
+ if rewrite:
+ path_rewrites[path] = rewrite
+ unstructured_map = path_rewrites
+ return unstructured_map
+
+ @property
+ def staging_needed(self):
+ return self.staging != STAGING_ACTION_NONE
+
+ @property
+ def staging_action_local(self):
+ return self.staging == STAGING_ACTION_LOCAL
+
+
+class NoneAction(BaseAction):
+ """ This action indicates the corresponding path does not require any
+ additional action. This should indicate paths that are available both on
+ the Pulsar client (i.e. Galaxy server) and remote Pulsar server with the same
+ paths. """
+ action_type = "none"
+ staging = STAGING_ACTION_NONE
+
+ def to_dict(self):
+ return dict(path=self.path, action_type=self.action_type)
+
+ @classmethod
+ def from_dict(cls, action_dict):
+ return NoneAction(path=action_dict["path"])
+
+ def path_rewrite(self, path_helper, path=None):
+ return None
+
+
+class RewriteAction(BaseAction):
+ """ This actin indicates the Pulsar server should simply rewrite the
path
+ to the specified file.
+ """
+ action_spec = dict(
+ source_directory=REQUIRED_ACTION_KWD,
+ destination_directory=REQUIRED_ACTION_KWD
+ )
+ action_type = "rewrite"
+ staging = STAGING_ACTION_NONE
+
+ def __init__(self, path, file_lister=None, source_directory=None,
destination_directory=None):
+ self.path = path
+ self.file_lister = file_lister or DEFAULT_FILE_LISTER
+ self.source_directory = source_directory
+ self.destination_directory = destination_directory
+
+ def to_dict(self):
+ return dict(
+ path=self.path,
+ action_type=self.action_type,
+ source_directory=self.source_directory,
+ destination_directory=self.destination_directory,
+ )
+
+ @classmethod
+ def from_dict(cls, action_dict):
+ return RewriteAction(
+ path=action_dict["path"],
+ source_directory=action_dict["source_directory"],
+ destination_directory=action_dict["destination_directory"],
+ )
+
+ def path_rewrite(self, path_helper, path=None):
+ if not path:
+ path = self.path
+ new_path = path_helper.from_posix_with_new_base(self.path, self.source_directory,
self.destination_directory)
+ return None if new_path == self.path else new_path
+
+
+class TransferAction(BaseAction):
+ """ This action indicates that the Pulsar client should initiate an
HTTP
+ transfer of the corresponding path to the remote Pulsar server before
+ launching the job. """
+ action_type = "transfer"
+ staging = STAGING_ACTION_LOCAL
+
+
+class CopyAction(BaseAction):
+ """ This action indicates that the Pulsar client should execute a file
system
+ copy of the corresponding path to the Pulsar staging directory prior to
+ launching the corresponding job. """
+ action_type = "copy"
+ staging = STAGING_ACTION_LOCAL
+
+
+class RemoteCopyAction(BaseAction):
+ """ This action indicates the Pulsar server should copy the file
before
+ execution via direct file system copy. This is like a CopyAction, but
+ it indicates the action should occur on the Pulsar server instead of on
+ the client.
+ """
+ action_type = "remote_copy"
+ staging = STAGING_ACTION_REMOTE
+
+ def to_dict(self):
+ return dict(path=self.path, action_type=self.action_type)
+
+ @classmethod
+ def from_dict(cls, action_dict):
+ return RemoteCopyAction(path=action_dict["path"])
+
+ def write_to_path(self, path):
+ galaxy.util.copy_to_path(open(self.path, "rb"), path)
+
+ def write_from_path(self, pulsar_path):
+ destination = self.path
+ parent_directory = dirname(destination)
+ if not exists(parent_directory):
+ makedirs(parent_directory)
+ with open(pulsar_path, "rb") as f:
+ galaxy.util.copy_to_path(f, destination)
+
+
+class RemoteTransferAction(BaseAction):
+ """ This action indicates the Pulsar server should transfer the file
before
+ execution via an HTTP download. This is like a TransferAction, but
+ it indicates the action should occur on the Pulsar server instead of on
+ the client.
+ """
+ action_type = "remote_transfer"
+ staging = STAGING_ACTION_REMOTE
+
+ def __init__(self, path, file_lister=None, url=None):
+ super(RemoteTransferAction, self).__init__(path, file_lister=file_lister)
+ self.url = url
+
+ def to_dict(self):
+ return dict(path=self.path, action_type=self.action_type, url=self.url)
+
+ @classmethod
+ def from_dict(cls, action_dict):
+ return RemoteTransferAction(path=action_dict["path"],
url=action_dict["url"])
+
+ def write_to_path(self, path):
+ get_file(self.url, path)
+
+ def write_from_path(self, pulsar_path):
+ post_file(self.url, pulsar_path)
+
+
+class MessageAction(object):
+ """ Sort of pseudo action describing "files" stored in memory
and
+ transferred via message (HTTP, Python-call, MQ, etc...)
+ """
+ action_type = "message"
+ staging = STAGING_ACTION_DEFAULT
+
+ def __init__(self, contents, client=None):
+ self.contents = contents
+ self.client = client
+
+ @property
+ def staging_needed(self):
+ return True
+
+ @property
+ def staging_action_local(self):
+ # Ekkk, cannot be called if created through from_dict.
+ # Shouldn't be a problem the way it is used - but is an
+ # object design problem.
+ return self.client.prefer_local_staging
+
+ def to_dict(self):
+ return dict(contents=self.contents, action_type=MessageAction.action_type)
+
+ @classmethod
+ def from_dict(cls, action_dict):
+ return MessageAction(contents=action_dict["contents"])
+
+ def write_to_path(self, path):
+ open(path, "w").write(self.contents)
+
+
+DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, RemoteTransferAction, MessageAction]
+
+
+def from_dict(action_dict):
+ action_type = action_dict.get("action_type", None)
+ target_class = None
+ for action_class in DICTIFIABLE_ACTION_CLASSES:
+ if action_type == action_class.action_type:
+ target_class = action_class
+ if not target_class:
+ message = "Failed to recover action from dictionary - invalid action type
specified %s." % action_type
+ raise Exception(message)
+ return target_class.from_dict(action_dict)
+
+
+class BasePathMapper(object):
+
+ def __init__(self, config):
+ action_type = config.get('action', DEFAULT_MAPPED_ACTION)
+ action_class = actions.get(action_type, None)
+ action_kwds = action_class.action_spec.copy()
+ for key, value in action_kwds.items():
+ if key in config:
+ action_kwds[key] = config[key]
+ elif value is REQUIRED_ACTION_KWD:
+ message_template = "action_type %s requires key word argument
%s"
+ message = message_template % (action_type, key)
+ raise Exception(message)
+ self.action_type = action_type
+ self.action_kwds = action_kwds
+ path_types_str = config.get('path_types', "*defaults*")
+ path_types_str = path_types_str.replace("*defaults*",
",".join(ACTION_DEFAULT_PATH_TYPES))
+ path_types_str = path_types_str.replace("*any*",
",".join(ALL_PATH_TYPES))
+ self.path_types = path_types_str.split(",")
+ self.file_lister = FileLister(config)
+
+ def matches(self, path, path_type):
+ path_type_matches = path_type in self.path_types
+ return path_type_matches and self._path_matches(path)
+
+ def _extend_base_dict(self, **kwds):
+ base_dict = dict(
+ action=self.action_type,
+ path_types=",".join(self.path_types),
+ match_type=self.match_type
+ )
+ base_dict.update(self.file_lister.to_dict())
+ base_dict.update(self.action_kwds)
+ base_dict.update(**kwds)
+ return base_dict
+
+
+class PrefixPathMapper(BasePathMapper):
+ match_type = 'prefix'
+
+ def __init__(self, config):
+ super(PrefixPathMapper, self).__init__(config)
+ self.prefix_path = abspath(config['path'])
+
+ def _path_matches(self, path):
+ return path.startswith(self.prefix_path)
+
+ def to_pattern(self):
+ pattern_str = "(%s%s[^\s,\"\']+)" % (escape(self.prefix_path),
escape(sep))
+ return compile(pattern_str)
+
+ def to_dict(self):
+ return self._extend_base_dict(path=self.prefix_path)
+
+
+class GlobPathMapper(BasePathMapper):
+ match_type = 'glob'
+
+ def __init__(self, config):
+ super(GlobPathMapper, self).__init__(config)
+ self.glob_path = config['path']
+
+ def _path_matches(self, path):
+ return fnmatch.fnmatch(path, self.glob_path)
+
+ def to_pattern(self):
+ return compile(fnmatch.translate(self.glob_path))
+
+ def to_dict(self):
+ return self._extend_base_dict(path=self.glob_path)
+
+
+class RegexPathMapper(BasePathMapper):
+ match_type = 'regex'
+
+ def __init__(self, config):
+ super(RegexPathMapper, self).__init__(config)
+ self.pattern_raw = config['path']
+ self.pattern = compile(self.pattern_raw)
+
+ def _path_matches(self, path):
+ return self.pattern.match(path) is not None
+
+ def to_pattern(self):
+ return self.pattern
+
+ def to_dict(self):
+ return self._extend_base_dict(path=self.pattern_raw)
+
+MAPPER_CLASSES = [PrefixPathMapper, GlobPathMapper, RegexPathMapper]
+MAPPER_CLASS_DICT = dict(map(lambda c: (c.match_type, c), MAPPER_CLASSES))
+
+
+def mappers_from_dicts(mapper_def_list):
+ return map(lambda m: __mappper_from_dict(m), mapper_def_list)
+
+
+def __mappper_from_dict(mapper_dict):
+ map_type = mapper_dict.get('match_type', DEFAULT_PATH_MAPPER_TYPE)
+ return MAPPER_CLASS_DICT[map_type](mapper_dict)
+
+
+class FileLister(object):
+
+ def __init__(self, config):
+ self.depth = int(config.get("depth", "0"))
+
+ def to_dict(self):
+ return dict(
+ depth=self.depth
+ )
+
+ def unstructured_map(self, path):
+ depth = self.depth
+ if self.depth == 0:
+ return {path: basename(path)}
+ else:
+ while depth > 0:
+ path = dirname(path)
+ depth -= 1
+ return dict([(join(path, f), f) for f in directory_files(path)])
+
+DEFAULT_FILE_LISTER = FileLister(dict(depth=0))
+
+ACTION_CLASSES = [
+ NoneAction,
+ RewriteAction,
+ TransferAction,
+ CopyAction,
+ RemoteCopyAction,
+ RemoteTransferAction,
+]
+actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES])
+
+
+__all__ = [
+ FileActionMapper,
+ path_type,
+ from_dict,
+ MessageAction,
+ RemoteTransferAction, # For testing
+]
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/amqp_exchange.py
--- /dev/null
+++ b/lib/pulsar/client/amqp_exchange.py
@@ -0,0 +1,139 @@
+try:
+ import kombu
+ from kombu import pools
+except ImportError:
+ kombu = None
+
+import socket
+import logging
+import threading
+from time import sleep
+log = logging.getLogger(__name__)
+
+
+KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but kombu dependency
unavailable"
+
+DEFAULT_EXCHANGE_NAME = "pulsar"
+DEFAULT_EXCHANGE_TYPE = "direct"
+# Set timeout to periodically give up looking and check if polling should end.
+DEFAULT_TIMEOUT = 0.2
+DEFAULT_HEARTBEAT = 580
+
+DEFAULT_RECONNECT_CONSUMER_WAIT = 1
+DEFAULT_HEARTBEAT_WAIT = 1
+
+
+class PulsarExchange(object):
+ """ Utility for publishing and consuming structured Pulsar queues
using kombu.
+ This is shared between the server and client - an exchange should be setup
+ for each manager (or in the case of the client, each manager one wished to
+ communicate with.)
+
+ Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar
+ should target each AMQP endpoint or care should be taken that unique
+ manager names are used across Pulsar servers targeting the same AMQP endpoint -
+ and in particular only one such Pulsar should define a default manager with
+ name _default_.
+ """
+
+ def __init__(
+ self,
+ url,
+ manager_name,
+ connect_ssl=None,
+ timeout=DEFAULT_TIMEOUT,
+ publish_kwds={},
+ ):
+ """
+ """
+ if not kombu:
+ raise Exception(KOMBU_UNAVAILABLE)
+ self.__url = url
+ self.__manager_name = manager_name
+ self.__connect_ssl = connect_ssl
+ self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE)
+ self.__timeout = timeout
+ # Be sure to log message publishing failures.
+ if publish_kwds.get("retry", False):
+ if "retry_policy" not in publish_kwds:
+ publish_kwds["retry_policy"] = {}
+ if "errback" not in publish_kwds["retry_policy"]:
+ publish_kwds["retry_policy"]["errback"] =
self.__publish_errback
+ self.__publish_kwds = publish_kwds
+
+ @property
+ def url(self):
+ return self.__url
+
+ def consume(self, queue_name, callback, check=True, connection_kwargs={}):
+ queue = self.__queue(queue_name)
+ log.debug("Consuming queue '%s'", queue)
+ while check:
+ heartbeat_thread = None
+ try:
+ with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT,
**connection_kwargs) as connection:
+ with kombu.Consumer(connection, queues=[queue], callbacks=[callback],
accept=['json']):
+ heartbeat_thread = self.__start_heartbeat(queue_name,
connection)
+ while check and connection.connected:
+ try:
+ connection.drain_events(timeout=self.__timeout)
+ except socket.timeout:
+ pass
+ except (IOError, socket.error), exc:
+ # In testing, errno is None
+ log.warning('Got %s, will retry: %s', exc.__class__.__name__,
exc)
+ if heartbeat_thread:
+ heartbeat_thread.join()
+ sleep(DEFAULT_RECONNECT_CONSUMER_WAIT)
+
+ def heartbeat(self, connection):
+ log.debug('AMQP heartbeat thread alive')
+ while connection.connected:
+ connection.heartbeat_check()
+ sleep(DEFAULT_HEARTBEAT_WAIT)
+ log.debug('AMQP heartbeat thread exiting')
+
+ def publish(self, name, payload):
+ with self.connection(self.__url) as connection:
+ with pools.producers[connection].acquire() as producer:
+ key = self.__queue_name(name)
+ producer.publish(
+ payload,
+ serializer='json',
+ exchange=self.__exchange,
+ declare=[self.__exchange],
+ routing_key=key,
+ **self.__publish_kwds
+ )
+
+ def __publish_errback(self, exc, interval):
+ log.error("Connection error while publishing: %r", exc, exc_info=1)
+ log.info("Retrying in %s seconds", interval)
+
+ def connection(self, connection_string, **kwargs):
+ if "ssl" not in kwargs:
+ kwargs["ssl"] = self.__connect_ssl
+ return kombu.Connection(connection_string, **kwargs)
+
+ def __queue(self, name):
+ queue_name = self.__queue_name(name)
+ queue = kombu.Queue(queue_name, self.__exchange, routing_key=queue_name)
+ return queue
+
+ def __queue_name(self, name):
+ key_prefix = self.__key_prefix()
+ queue_name = '%s_%s' % (key_prefix, name)
+ return queue_name
+
+ def __key_prefix(self):
+ if self.__manager_name == "_default_":
+ key_prefix = "pulsar_"
+ else:
+ key_prefix = "pulsar_%s_" % self.__manager_name
+ return key_prefix
+
+ def __start_heartbeat(self, queue_name, connection):
+ thread_name = "consume-heartbeat-%s" % (self.__queue_name(queue_name))
+ thread = threading.Thread(name=thread_name, target=self.heartbeat,
args=(connection,))
+ thread.start()
+ return thread
diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r
566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/amqp_exchange_factory.py
--- /dev/null
+++ b/lib/pulsar/client/amqp_exchange_factory.py
@@ -0,0 +1,41 @@
+from .amqp_exchange import PulsarExchange
+from .util import filter_destination_params
+
+
+def get_exchange(url, manager_name, params):
+ connect_ssl = parse_amqp_connect_ssl_params(params)
+ exchange_kwds = dict(
+ manager_name=manager_name,
+ connect_ssl=connect_ssl,
+ publish_kwds=parse_amqp_publish_kwds(params)
+ )
+ timeout = params.get('amqp_consumer_timeout', False)
+ if timeout is not False:
+ exchange_kwds['timeout'] = timeout
+ exchange = PulsarExchange(url, **exchange_kwds)
+ return exchange
+
+
+def parse_amqp_connect_ssl_params(params):
+ ssl_params = filter_destination_params(params, "amqp_connect_ssl_")
+ if not ssl_params:
+ return
+
+ ssl = __import__('ssl')
+ if 'cert_reqs' in ssl_params:
+ value = ssl_params['cert_reqs']
+ ssl_params['cert_reqs'] = getattr(ssl, value.upper())
+ return ssl_params
+
+
+def parse_amqp_publish_kwds(params):
+ all_publish_params = filter_destination_params(params, "amqp_publish_")
+ retry_policy_params = {}
+ for key in all_publish_params.keys():
+ if key.startswith("retry_"):
+ value = all_publish_params[key]
+ retry_policy_params[key[len("retry_"):]] = value
+ del all_publish_params[key]
+ if retry_policy_params:
+ all_publish_params["retry_policy"] = retry_policy_params
+ return all_publish_params
This diff is so big that we needed to truncate the remainder.
Repository URL:
https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from
bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.