commit/galaxy-central: jmchilton: Merged in jmchilton/galaxy-central-fork-1 (pull request #422)
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/566388d62374/ Changeset: 566388d62374 User: jmchilton Date: 2014-06-24 04:27:51 Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #422) Implement Pulsar job runners. Affected #: 32 files diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -19,27 +19,30 @@ <!-- Override the $DRMAA_LIBRARY_PATH environment variable --><param id="drmaa_library_path">/sge/lib/libdrmaa.so</param></plugin> - <plugin id="lwr" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner"> - <!-- More information on LWR can be found at https://lwr.readthedocs.org --> - <!-- Uncomment following line to use libcurl to perform HTTP calls (defaults to urllib) --> + <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /> + <plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> + <plugin id="slurm" type="runner" load="galaxy.jobs.runners.slurm:SlurmJobRunner" /> + <!-- Pulsar runners (see more at https://pulsar.readthedocs.org) --> + <plugin id="pulsar_rest" type="runner" load="galaxy.jobs.runners.pulsar:PulsarRESTJobRunner"> + <!-- Allow optimized HTTP calls with libcurl (defaults to urllib) --><!-- <param id="transport">curl</param> --> - <!-- *Experimental Caching*: Uncomment next parameters to enable - caching and specify the number of caching threads to enable on Galaxy - side. Likely will not work with newer features such as MQ support. - If this is enabled be sure to specify a `file_cache_dir` in the remote - LWR's main configuration file. + + <!-- *Experimental Caching*: Next parameter enables caching. + Likely will not work with newer features such as MQ support. + + If this is enabled be sure to specify a `file_cache_dir` in + the remote Pulsar's servers main configuration file. 
--><!-- <param id="cache">True</param> --> - <!-- <param id="transfer_threads">2</param> --></plugin> - <plugin id="amqp_lwr" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner"> - <param id="url">amqp://guest:guest@localhost:5672//</param> - <!-- If using message queue driven LWR - the LWR will generally - initiate file transfers so a the URL of this Galaxy instance - must be configured. --> + <plugin id="pulsar_mq" type="runner" load="galaxy.jobs.runners.pulsar:PulsarMQJobRunner"> + <!-- AMQP URL to connect to. --> + <param id="amqp_url">amqp://guest:guest@localhost:5672//</param> + <!-- URL remote Pulsar apps should transfer files to this Galaxy + instance to/from. --><param id="galaxy_url">http://localhost:8080</param> - <!-- If multiple managers configured on the LWR, specify which one - this plugin targets. --> + <!-- Pulsar job manager to communicate with (see Pulsar + docs for information on job managers). --><!-- <param id="manager">_default_</param> --><!-- The AMQP client can provide an SSL client certificate (e.g. for validation), the following options configure that certificate @@ -58,9 +61,17 @@ higher value (in seconds) (or `None` to use blocking connections). --><!-- <param id="amqp_consumer_timeout">None</param> --></plugin> - <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /> - <plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> - <plugin id="slurm" type="runner" load="galaxy.jobs.runners.slurm:SlurmJobRunner" /> + <plugin id="pulsar_legacy" type="runner" load="galaxy.jobs.runners.pulsar:PulsarLegacyJobRunner"> + <!-- Pulsar job runner with default parameters matching those + of old LWR job runner. If your Pulsar server is running on a + Windows machine for instance this runner should still be used. 
+ + These destinations still needs to target a Pulsar server, + older LWR plugins and destinations still work in Galaxy can + target LWR servers, but this support should be considered + deprecated and will disappear with a future release of Galaxy. + --> + </plugin></plugins><handlers default="handlers"><!-- Additional job handlers - the id should match the name of a @@ -125,8 +136,8 @@ $galaxy_root:ro,$tool_directory:ro,$working_directory:rw,$default_file_path:ro - If using the LWR, defaults will be even further restricted because the - LWR will (by default) stage all needed inputs into the job's job_directory + If using the Pulsar, defaults will be even further restricted because the + Pulsar will (by default) stage all needed inputs into the job's job_directory (so there is not need to allow the docker container to read all the files - let alone write over them). Defaults in this case becomes: @@ -135,7 +146,7 @@ Python string.Template is used to expand volumes and values $defaults, $galaxy_root, $default_file_path, $tool_directory, $working_directory, are available to all jobs and $job_directory is also available for - LWR jobs. + Pulsar jobs. --><!-- Control memory allocatable by docker container with following option: --> @@ -213,87 +224,71 @@ <!-- A destination that represents a method in the dynamic runner. --><param id="function">foo</param></destination> - <destination id="secure_lwr" runner="lwr"> - <param id="url">https://windowshost.examle.com:8913/</param> - <!-- If set, private_token must match token remote LWR server configured with. --> + <destination id="secure_pulsar_rest_dest" runner="pulsar_rest"> + <param id="url">https://examle.com:8913/</param> + <!-- If set, private_token must match token in remote Pulsar's + configuration. --><param id="private_token">123456789changeme</param><!-- Uncomment the following statement to disable file staging (e.g. 
- if there is a shared file system between Galaxy and the LWR + if there is a shared file system between Galaxy and the Pulsar server). Alternatively action can be set to 'copy' - to replace http transfers with file system copies, 'remote_transfer' to cause - the lwr to initiate HTTP transfers instead of Galaxy, or - 'remote_copy' to cause lwr to initiate file system copies. + the Pulsar to initiate HTTP transfers instead of Galaxy, or + 'remote_copy' to cause Pulsar to initiate file system copies. If setting this to 'remote_transfer' be sure to specify a 'galaxy_url' attribute on the runner plugin above. --><!-- <param id="default_file_action">none</param> --><!-- The above option is just the default, the transfer behavior none|copy|http can be configured on a per path basis via the - following file. See lib/galaxy/jobs/runners/lwr_client/action_mapper.py - for examples of how to configure this file. This is very beta - and nature of file will likely change. + following file. See Pulsar documentation for more details and + examples. --> - <!-- <param id="file_action_config">file_actions.json</param> --> - <!-- Uncomment following option to disable Galaxy tool dependency - resolution and utilize remote LWR's configuraiton of tool - dependency resolution instead (same options as Galaxy for - dependency resolution are available in LWR). At a minimum - the remote LWR server should define a tool_dependencies_dir in - its `server.ini` configuration. The LWR will not attempt to - stage dependencies - so ensure the the required galaxy or tool - shed packages are available remotely (exact same tool shed - installed changesets are required). 
+ <!-- <param id="file_action_config">file_actions.yaml</param> --> + <!-- The non-legacy Pulsar runners will attempt to resolve Galaxy + dependencies remotely - to enable this set a tool_dependency_dir + in Pulsar's configuration (can work with all the same dependency + resolutions mechanisms as Galaxy - tool Shed installs, Galaxy + packages, etc...). To disable this behavior, set the follow parameter + to none. To generate the dependency resolution command locally + set the following parameter local. --> - <!-- <param id="dependency_resolution">remote</params> --> - <!-- Traditionally, the LWR allow Galaxy to generate a command line - as if it were going to run the command locally and then the - LWR client rewrites it after the fact using regular - expressions. Setting the following value to true causes the - LWR runner to insert itself into the command line generation - process and generate the correct command line from the get go. - This will likely be the default someday - but requires a newer - LWR version and is less well tested. --> - <!-- <param id="rewrite_parameters">true</params> --> + <!-- <param id="dependency_resolution">none</params> --><!-- Uncomment following option to enable setting metadata on remote - LWR server. The 'use_remote_datatypes' option is available for + Pulsar server. The 'use_remote_datatypes' option is available for determining whether to use remotely configured datatypes or local ones (both alternatives are a little brittle). --><!-- <param id="remote_metadata">true</param> --><!-- <param id="use_remote_datatypes">false</param> --><!-- <param id="remote_property_galaxy_home">/path/to/remote/galaxy-central</param> --> - <!-- If remote LWR server is configured to run jobs as the real user, + <!-- If remote Pulsar server is configured to run jobs as the real user, uncomment the following line to pass the current Galaxy user along. 
--><!-- <param id="submit_user">$__user_name__</param> --> - <!-- Various other submission parameters can be passed along to the LWR - whose use will depend on the remote LWR's configured job manager. + <!-- Various other submission parameters can be passed along to the Pulsar + whose use will depend on the remote Pulsar's configured job manager. For instance: --> - <!-- <param id="submit_native_specification">-P bignodes -R y -pe threads 8</param> --> + <!-- <param id="submit_native_specification">-P bignodes -R y -pe threads 8</param> --> + <!-- Disable parameter rewriting and rewrite generated commands + instead. This may be required if remote host is Windows machine + but probably not otherwise. + --> + <!-- <param id="rewrite_parameters">false</params> --></destination> - <destination id="amqp_lwr_dest" runner="amqp_lwr" > - <!-- url and private_token are not valid when using MQ driven LWR. The plugin above - determines which queue/manager to target and the underlying MQ server should be - used to configure security. - --> - <!-- Traditionally, the LWR client sends request to LWR - server to populate various system properties. This + <destination id="pulsar_mq_dest" runner="amqp_pulsar" > + <!-- The RESTful Pulsar client sends a request to Pulsar + to populate various system properties. This extra step can be disabled and these calculated here on client by uncommenting jobs_directory and specifying any additional remote_property_ of interest, this is not optional when using message queues. --> - <param id="jobs_directory">/path/to/remote/lwr/lwr_staging/</param> - <!-- Default the LWR send files to and pull files from Galaxy when - using message queues (in the more traditional mode Galaxy sends - files to and pull files from the LWR - this is obviously less - appropriate when using a message queue). - - The default_file_action currently requires pycurl be available - to Galaxy (presumably in its virtualenv). Making this dependency - optional is an open task. 
+ <param id="jobs_directory">/path/to/remote/pulsar/files/staging/</param> + <!-- Otherwise MQ and Legacy pulsar destinations can be supplied + all the same destination parameters as the RESTful client documented + above (though url and private_token are ignored when using a MQ). --> - <param id="default_file_action">remote_transfer</param></destination><destination id="ssh_torque" runner="cli"><param id="shell_plugin">SecureShell</param> diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/pulsar.py --- /dev/null +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -0,0 +1,707 @@ +from __future__ import absolute_import # Need to import pulsar_client absolutely. + +import logging + +from galaxy import model +from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner +from galaxy.jobs import ComputeEnvironment +from galaxy.jobs import JobDestination +from galaxy.jobs.command_factory import build_command +from galaxy.tools.deps import dependencies +from galaxy.util import string_as_bool_or_none +from galaxy.util.bunch import Bunch +from galaxy.util import specs + +import errno +from time import sleep +import os + +from pulsar.client import build_client_manager +from pulsar.client import url_to_destination_params +from pulsar.client import finish_job as pulsar_finish_job +from pulsar.client import submit_job as pulsar_submit_job +from pulsar.client import ClientJobDescription +from pulsar.client import PulsarOutputs +from pulsar.client import ClientOutputs +from pulsar.client import PathMapper + +log = logging.getLogger( __name__ ) + +__all__ = [ 'PulsarLegacyJobRunner', 'PulsarRESTJobRunner', 'PulsarMQJobRunner' ] + +NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "Pulsar misconfiguration - Pulsar client configured to set metadata remotely, but remote Pulsar isn't properly configured with a galaxy_home directory." 
+NO_REMOTE_DATATYPES_CONFIG = "Pulsar client is configured to use remote datatypes configuration when setting metadata externally, but Pulsar is not configured with this information. Defaulting to datatypes_conf.xml." +GENERIC_REMOTE_ERROR = "Failed to communicate with remote job server." + +# Is there a good way to infer some default for this? Can only use +# url_for from web threads. https://gist.github.com/jmchilton/9098762 +DEFAULT_GALAXY_URL = "http://localhost:8080" + +PULSAR_PARAM_SPECS = dict( + transport=dict( + map=specs.to_str_or_none, + valid=specs.is_in("urllib", "curl", None), + default=None + ), + cache=dict( + map=specs.to_bool_or_none, + default=None, + ), + amqp_url=dict( + map=specs.to_str_or_none, + default=None, + ), + galaxy_url=dict( + map=specs.to_str_or_none, + default=DEFAULT_GALAXY_URL, + ), + manager=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_consumer_timeout=dict( + map=lambda val: None if val == "None" else float(val), + default=None, + ), + amqp_connect_ssl_ca_certs=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_keyfile=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_certfile=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_cert_reqs=dict( + map=specs.to_str_or_none, + default=None, + ), + # http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Producer.p... + amqp_publish_retry=dict( + map=specs.to_bool, + default=False, + ), + amqp_publish_priority=dict( + map=int, + valid=lambda x: 0 <= x and x <= 9, + default=0, + ), + # http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Exchange.d... 
+ amqp_publish_delivery_mode=dict( + map=str, + valid=specs.is_in("transient", "persistent"), + default="persistent", + ), + amqp_publish_retry_max_retries=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_start=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_step=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_max=dict( + map=int, + default=None, + ), +) + + +PARAMETER_SPECIFICATION_REQUIRED = object() +PARAMETER_SPECIFICATION_IGNORED = object() + + +class PulsarJobRunner( AsynchronousJobRunner ): + """ + Pulsar Job Runner + """ + runner_name = "PulsarJobRunner" + + def __init__( self, app, nworkers, **kwds ): + """Start the job runner """ + super( PulsarJobRunner, self ).__init__( app, nworkers, runner_param_specs=PULSAR_PARAM_SPECS, **kwds ) + self._init_worker_threads() + galaxy_url = self.runner_params.galaxy_url + if galaxy_url: + galaxy_url = galaxy_url.rstrip("/") + self.galaxy_url = galaxy_url + self.__init_client_manager() + self._monitor() + + def _monitor( self ): + # Extension point allow MQ variant to setup callback instead + self._init_monitor_thread() + + def __init_client_manager( self ): + client_manager_kwargs = {} + for kwd in 'manager', 'cache', 'transport': + client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] + for kwd in self.runner_params.keys(): + if kwd.startswith( 'amqp_' ): + client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] + self.client_manager = build_client_manager(**client_manager_kwargs) + + def url_to_destination( self, url ): + """Convert a legacy URL to a job destination""" + return JobDestination( runner="pulsar", params=url_to_destination_params( url ) ) + + def check_watched_item(self, job_state): + try: + client = self.get_client_from_state(job_state) + status = client.get_status() + except Exception: + # An orphaned job was put into the queue at app startup, so remote server went down + # either way we are done I guess. 
+ self.mark_as_finished(job_state) + return None + job_state = self._update_job_state_for_status(job_state, status) + return job_state + + def _update_job_state_for_status(self, job_state, pulsar_status): + if pulsar_status == "complete": + self.mark_as_finished(job_state) + return None + if pulsar_status == "failed": + self.fail_job(job_state) + return None + if pulsar_status == "running" and not job_state.running: + job_state.running = True + job_state.job_wrapper.change_state( model.Job.states.RUNNING ) + return job_state + + def queue_job(self, job_wrapper): + job_destination = job_wrapper.job_destination + self._populate_parameter_defaults( job_destination ) + + command_line, client, remote_job_config, compute_environment = self.__prepare_job( job_wrapper, job_destination ) + + if not command_line: + return + + try: + dependencies_description = PulsarJobRunner.__dependencies_description( client, job_wrapper ) + rewrite_paths = not PulsarJobRunner.__rewrite_parameters( client ) + unstructured_path_rewrites = {} + if compute_environment: + unstructured_path_rewrites = compute_environment.unstructured_path_rewrites + + client_job_description = ClientJobDescription( + command_line=command_line, + input_files=self.get_input_files(job_wrapper), + client_outputs=self.__client_outputs(client, job_wrapper), + working_directory=job_wrapper.working_directory, + tool=job_wrapper.tool, + config_files=job_wrapper.extra_filenames, + dependencies_description=dependencies_description, + env=client.env, + rewrite_paths=rewrite_paths, + arbitrary_files=unstructured_path_rewrites, + ) + job_id = pulsar_submit_job(client, client_job_description, remote_job_config) + log.info("Pulsar job submitted with job_id %s" % job_id) + job_wrapper.set_job_destination( job_destination, job_id ) + job_wrapper.change_state( model.Job.states.QUEUED ) + except Exception: + job_wrapper.fail( "failure running job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + 
return + + pulsar_job_state = AsynchronousJobState() + pulsar_job_state.job_wrapper = job_wrapper + pulsar_job_state.job_id = job_id + pulsar_job_state.old_state = True + pulsar_job_state.running = False + pulsar_job_state.job_destination = job_destination + self.monitor_job(pulsar_job_state) + + def __prepare_job(self, job_wrapper, job_destination): + """ Build command-line and Pulsar client for this job. """ + command_line = None + client = None + remote_job_config = None + compute_environment = None + try: + client = self.get_client_from_wrapper(job_wrapper) + tool = job_wrapper.tool + remote_job_config = client.setup(tool.id, tool.version) + rewrite_parameters = PulsarJobRunner.__rewrite_parameters( client ) + prepare_kwds = {} + if rewrite_parameters: + compute_environment = PulsarComputeEnvironment( client, job_wrapper, remote_job_config ) + prepare_kwds[ 'compute_environment' ] = compute_environment + job_wrapper.prepare( **prepare_kwds ) + self.__prepare_input_files_locally(job_wrapper) + remote_metadata = PulsarJobRunner.__remote_metadata( client ) + dependency_resolution = PulsarJobRunner.__dependency_resolution( client ) + metadata_kwds = self.__build_metadata_configuration(client, job_wrapper, remote_metadata, remote_job_config) + remote_command_params = dict( + working_directory=remote_job_config['working_directory'], + metadata_kwds=metadata_kwds, + dependency_resolution=dependency_resolution, + ) + remote_working_directory = remote_job_config['working_directory'] + # TODO: Following defs work for Pulsar, always worked for Pulsar but should be + # calculated at some other level. 
+ remote_job_directory = os.path.abspath(os.path.join(remote_working_directory, os.path.pardir)) + remote_tool_directory = os.path.abspath(os.path.join(remote_job_directory, "tool_files")) + container = self._find_container( + job_wrapper, + compute_working_directory=remote_working_directory, + compute_tool_directory=remote_tool_directory, + compute_job_directory=remote_job_directory, + ) + command_line = build_command( + self, + job_wrapper=job_wrapper, + container=container, + include_metadata=remote_metadata, + include_work_dir_outputs=False, + remote_command_params=remote_command_params, + ) + except Exception: + job_wrapper.fail( "failure preparing job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + + # If we were able to get a command line, run the job + if not command_line: + job_wrapper.finish( '', '' ) + + return command_line, client, remote_job_config, compute_environment + + def __prepare_input_files_locally(self, job_wrapper): + """Run task splitting commands locally.""" + prepare_input_files_cmds = getattr(job_wrapper, 'prepare_input_files_cmds', None) + if prepare_input_files_cmds is not None: + for cmd in prepare_input_files_cmds: # run the commands to stage the input files + if 0 != os.system(cmd): + raise Exception('Error running file staging command: %s' % cmd) + job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line + + def _populate_parameter_defaults( self, job_destination ): + updated = False + params = job_destination.params + for key, value in self.destination_defaults.iteritems(): + if key in params: + if value is PARAMETER_SPECIFICATION_IGNORED: + log.warn( "Pulsar runner in selected configuration ignores parameter %s" % key ) + continue + #if self.runner_params.get( key, None ): + # # Let plugin define defaults for some parameters - + # # for instance that way jobs_directory can be + # # configured next to AMQP url (where it belongs). 
+ # params[ key ] = self.runner_params[ key ] + # continue + + if not value: + continue + + if value is PARAMETER_SPECIFICATION_REQUIRED: + raise Exception( "Pulsar destination does not define required parameter %s" % key ) + elif value is not PARAMETER_SPECIFICATION_IGNORED: + params[ key ] = value + updated = True + return updated + + def get_output_files(self, job_wrapper): + output_paths = job_wrapper.get_output_fnames() + return [ str( o ) for o in output_paths ] # Force job_path from DatasetPath objects. + + def get_input_files(self, job_wrapper): + input_paths = job_wrapper.get_input_paths() + return [ str( i ) for i in input_paths ] # Force job_path from DatasetPath objects. + + def get_client_from_wrapper(self, job_wrapper): + job_id = job_wrapper.job_id + if hasattr(job_wrapper, 'task_id'): + job_id = "%s_%s" % (job_id, job_wrapper.task_id) + params = job_wrapper.job_destination.params.copy() + for key, value in params.iteritems(): + if value: + params[key] = model.User.expand_user_properties( job_wrapper.get_job().user, value ) + + env = getattr( job_wrapper.job_destination, "env", [] ) + return self.get_client( params, job_id, env ) + + def get_client_from_state(self, job_state): + job_destination_params = job_state.job_destination.params + job_id = job_state.job_id + return self.get_client( job_destination_params, job_id ) + + def get_client( self, job_destination_params, job_id, env=[] ): + # Cannot use url_for outside of web thread. 
+ #files_endpoint = url_for( controller="job_files", job_id=encoded_job_id ) + + encoded_job_id = self.app.security.encode_id(job_id) + job_key = self.app.security.encode_id( job_id, kind="jobs_files" ) + files_endpoint = "%s/api/jobs/%s/files?job_key=%s" % ( + self.galaxy_url, + encoded_job_id, + job_key + ) + get_client_kwds = dict( + job_id=str( job_id ), + files_endpoint=files_endpoint, + env=env + ) + return self.client_manager.get_client( job_destination_params, **get_client_kwds ) + + def finish_job( self, job_state ): + stderr = stdout = '' + job_wrapper = job_state.job_wrapper + try: + client = self.get_client_from_state(job_state) + run_results = client.full_status() + remote_working_directory = run_results.get("working_directory", None) + stdout = run_results.get('stdout', '') + stderr = run_results.get('stderr', '') + exit_code = run_results.get('returncode', None) + pulsar_outputs = PulsarOutputs.from_status_response(run_results) + # Use Pulsar client code to transfer/copy files back + # and cleanup job if needed. 
+ completed_normally = \ + job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] + cleanup_job = self.app.config.cleanup_job + client_outputs = self.__client_outputs(client, job_wrapper) + finish_args = dict( client=client, + job_completed_normally=completed_normally, + cleanup_job=cleanup_job, + client_outputs=client_outputs, + pulsar_outputs=pulsar_outputs ) + failed = pulsar_finish_job( **finish_args ) + if failed: + job_wrapper.fail("Failed to find or download one or more job outputs from remote server.", exception=True) + except Exception: + message = GENERIC_REMOTE_ERROR + job_wrapper.fail( message, exception=True ) + log.exception("failure finishing job %d" % job_wrapper.job_id) + return + if not PulsarJobRunner.__remote_metadata( client ): + self._handle_metadata_externally( job_wrapper, resolve_requirements=True ) + # Finish the job + try: + job_wrapper.finish( + stdout, + stderr, + exit_code, + remote_working_directory=remote_working_directory + ) + except Exception: + log.exception("Job wrapper finish method failed") + job_wrapper.fail("Unable to finish job", exception=True) + + def fail_job( self, job_state ): + """ + Seperated out so we can use the worker threads for it. 
+ """ + self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) ) + job_state.job_wrapper.fail( getattr( job_state, "fail_message", GENERIC_REMOTE_ERROR ) ) + + def check_pid( self, pid ): + try: + os.kill( pid, 0 ) + return True + except OSError, e: + if e.errno == errno.ESRCH: + log.debug( "check_pid(): PID %d is dead" % pid ) + else: + log.warning( "check_pid(): Got errno %s when attempting to check PID %d: %s" % ( errno.errorcode[e.errno], pid, e.strerror ) ) + return False + + def stop_job( self, job ): + #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished + job_ext_output_metadata = job.get_external_output_metadata() + if job_ext_output_metadata: + pid = job_ext_output_metadata[0].job_runner_external_pid # every JobExternalOutputMetadata has a pid set, we just need to take from one of them + if pid in [ None, '' ]: + log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id ) + return + pid = int( pid ) + if not self.check_pid( pid ): + log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job.id, pid ) ) + return + for sig in [ 15, 9 ]: + try: + os.killpg( pid, sig ) + except OSError, e: + log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job.id, errno.errorcode[e.errno], sig, pid, e.strerror ) ) + return # give up + sleep( 2 ) + if not self.check_pid( pid ): + log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" % ( job.id, pid, sig ) ) + return + else: + log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" % ( job.id, pid ) ) + else: + # Remote kill + pulsar_url = job.job_runner_name + job_id = job.job_runner_external_id + log.debug("Attempt remote Pulsar kill of job with url %s and id %s" % (pulsar_url, job_id)) + client = self.get_client(job.destination_params, job_id) + client.kill() + + def recover( self, job, job_wrapper 
): + """Recovers jobs stuck in the queued/running state when Galaxy started""" + job_state = self._job_state( job, job_wrapper ) + job_wrapper.command_line = job.get_command_line() + state = job.get_state() + if state in [model.Job.states.RUNNING, model.Job.states.QUEUED]: + log.debug( "(Pulsar/%s) is still in running state, adding to the Pulsar queue" % ( job.get_id()) ) + job_state.old_state = True + job_state.running = state == model.Job.states.RUNNING + self.monitor_queue.put( job_state ) + + def shutdown( self ): + super( PulsarJobRunner, self ).shutdown() + self.client_manager.shutdown() + + def _job_state( self, job, job_wrapper ): + job_state = AsynchronousJobState() + # TODO: Determine why this is set when using normal message queue updates + # but not CLI submitted MQ updates... + raw_job_id = job.get_job_runner_external_id() or job_wrapper.job_id + job_state.job_id = str( raw_job_id ) + job_state.runner_url = job_wrapper.get_job_runner_url() + job_state.job_destination = job_wrapper.job_destination + job_state.job_wrapper = job_wrapper + return job_state + + def __client_outputs( self, client, job_wrapper ): + work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) + output_files = self.get_output_files( job_wrapper ) + client_outputs = ClientOutputs( + working_directory=job_wrapper.working_directory, + work_dir_outputs=work_dir_outputs, + output_files=output_files, + version_file=job_wrapper.get_version_string_path(), + ) + return client_outputs + + @staticmethod + def __dependencies_description( pulsar_client, job_wrapper ): + dependency_resolution = PulsarJobRunner.__dependency_resolution( pulsar_client ) + remote_dependency_resolution = dependency_resolution == "remote" + if not remote_dependency_resolution: + return None + requirements = job_wrapper.tool.requirements or [] + installed_tool_dependencies = job_wrapper.tool.installed_tool_dependencies or [] + return dependencies.DependenciesDescription( + requirements=requirements, + 
installed_tool_dependencies=installed_tool_dependencies, + ) + + @staticmethod + def __dependency_resolution( pulsar_client ): + dependency_resolution = pulsar_client.destination_params.get( "dependency_resolution", "local" ) + if dependency_resolution not in ["none", "local", "remote"]: + raise Exception("Unknown dependency_resolution value encountered %s" % dependency_resolution) + return dependency_resolution + + @staticmethod + def __remote_metadata( pulsar_client ): + remote_metadata = string_as_bool_or_none( pulsar_client.destination_params.get( "remote_metadata", False ) ) + return remote_metadata + + @staticmethod + def __use_remote_datatypes_conf( pulsar_client ): + """ When setting remote metadata, use integrated datatypes from this + Galaxy instance or use the datatypes config configured via the remote + Pulsar. + + Both options are broken in different ways for same reason - datatypes + may not match. One can push the local datatypes config to the remote + server - but there is no guarentee these datatypes will be defined + there. Alternatively, one can use the remote datatype config - but + there is no guarentee that it will contain all the datatypes available + to this Galaxy. 
+ """ + use_remote_datatypes = string_as_bool_or_none( pulsar_client.destination_params.get( "use_remote_datatypes", False ) ) + return use_remote_datatypes + + @staticmethod + def __rewrite_parameters( pulsar_client ): + return string_as_bool_or_none( pulsar_client.destination_params.get( "rewrite_parameters", False ) ) or False + + def __build_metadata_configuration(self, client, job_wrapper, remote_metadata, remote_job_config): + metadata_kwds = {} + if remote_metadata: + remote_system_properties = remote_job_config.get("system_properties", {}) + remote_galaxy_home = remote_system_properties.get("galaxy_home", None) + if not remote_galaxy_home: + raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE) + metadata_kwds['exec_dir'] = remote_galaxy_home + outputs_directory = remote_job_config['outputs_directory'] + configs_directory = remote_job_config['configs_directory'] + working_directory = remote_job_config['working_directory'] + # For metadata calculation, we need to build a list of of output + # file objects with real path indicating location on Galaxy server + # and false path indicating location on compute server. Since the + # Pulsar disables from_work_dir copying as part of the job command + # line we need to take the list of output locations on the Pulsar + # server (produced by self.get_output_files(job_wrapper)) and for + # each work_dir output substitute the effective path on the Pulsar + # server relative to the remote working directory as the + # false_path to send the metadata command generation module. 
+ work_dir_outputs = self.get_work_dir_outputs(job_wrapper, job_working_directory=working_directory) + outputs = [Bunch(false_path=os.path.join(outputs_directory, os.path.basename(path)), real_path=path) for path in self.get_output_files(job_wrapper)] + for output in outputs: + for pulsar_workdir_path, real_path in work_dir_outputs: + if real_path == output.real_path: + output.false_path = pulsar_workdir_path + metadata_kwds['output_fnames'] = outputs + metadata_kwds['compute_tmp_dir'] = working_directory + metadata_kwds['config_root'] = remote_galaxy_home + default_config_file = os.path.join(remote_galaxy_home, 'universe_wsgi.ini') + metadata_kwds['config_file'] = remote_system_properties.get('galaxy_config_file', default_config_file) + metadata_kwds['dataset_files_path'] = remote_system_properties.get('galaxy_dataset_files_path', None) + if PulsarJobRunner.__use_remote_datatypes_conf( client ): + remote_datatypes_config = remote_system_properties.get('galaxy_datatypes_config_file', None) + if not remote_datatypes_config: + log.warn(NO_REMOTE_DATATYPES_CONFIG) + remote_datatypes_config = os.path.join(remote_galaxy_home, 'datatypes_conf.xml') + metadata_kwds['datatypes_config'] = remote_datatypes_config + else: + integrates_datatypes_config = self.app.datatypes_registry.integrated_datatypes_configs + # Ensure this file gets pushed out to the remote config dir. 
+ job_wrapper.extra_filenames.append(integrates_datatypes_config) + + metadata_kwds['datatypes_config'] = os.path.join(configs_directory, os.path.basename(integrates_datatypes_config)) + return metadata_kwds + + +class PulsarLegacyJobRunner( PulsarJobRunner ): + destination_defaults = dict( + rewrite_parameters="false", + dependency_resolution="local", + ) + + +class PulsarMQJobRunner( PulsarJobRunner ): + destination_defaults = dict( + default_file_action="remote_transfer", + rewrite_parameters="true", + dependency_resolution="remote", + jobs_directory=PARAMETER_SPECIFICATION_REQUIRED, + url=PARAMETER_SPECIFICATION_IGNORED, + private_token=PARAMETER_SPECIFICATION_IGNORED + ) + + def _monitor( self ): + # This is a message queue driven runner, don't monitor + # just setup required callback. + self.client_manager.ensure_has_status_update_callback(self.__async_update) + + def __async_update( self, full_status ): + job_id = None + try: + job_id = full_status[ "job_id" ] + job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id( job_id ) + job_state = self._job_state( job, job_wrapper ) + self._update_job_state_for_status(job_state, full_status[ "status" ] ) + except Exception: + log.exception( "Failed to update Pulsar job status for job_id %s" % job_id ) + raise + # Nothing else to do? - Attempt to fail the job? 
+ + +class PulsarRESTJobRunner( PulsarJobRunner ): + destination_defaults = dict( + default_file_action="transfer", + rewrite_parameters="true", + dependency_resolution="remote", + url=PARAMETER_SPECIFICATION_REQUIRED, + ) + + +class PulsarComputeEnvironment( ComputeEnvironment ): + + def __init__( self, pulsar_client, job_wrapper, remote_job_config ): + self.pulsar_client = pulsar_client + self.job_wrapper = job_wrapper + self.local_path_config = job_wrapper.default_compute_environment() + self.unstructured_path_rewrites = {} + # job_wrapper.prepare is going to expunge the job backing the following + # computations, so precalculate these paths. + self._wrapper_input_paths = self.local_path_config.input_paths() + self._wrapper_output_paths = self.local_path_config.output_paths() + self.path_mapper = PathMapper(pulsar_client, remote_job_config, self.local_path_config.working_directory()) + self._config_directory = remote_job_config[ "configs_directory" ] + self._working_directory = remote_job_config[ "working_directory" ] + self._sep = remote_job_config[ "system_properties" ][ "separator" ] + self._tool_dir = remote_job_config[ "tools_directory" ] + version_path = self.local_path_config.version_path() + new_version_path = self.path_mapper.remote_version_path_rewrite(version_path) + if new_version_path: + version_path = new_version_path + self._version_path = version_path + + def output_paths( self ): + local_output_paths = self._wrapper_output_paths + + results = [] + for local_output_path in local_output_paths: + wrapper_path = str( local_output_path ) + remote_path = self.path_mapper.remote_output_path_rewrite( wrapper_path ) + results.append( self._dataset_path( local_output_path, remote_path ) ) + return results + + def input_paths( self ): + local_input_paths = self._wrapper_input_paths + + results = [] + for local_input_path in local_input_paths: + wrapper_path = str( local_input_path ) + # This will over-copy in some cases. 
For instance in the case of task + # splitting, this input will be copied even though only the work dir + # input will actually be used. + remote_path = self.path_mapper.remote_input_path_rewrite( wrapper_path ) + results.append( self._dataset_path( local_input_path, remote_path ) ) + return results + + def _dataset_path( self, local_dataset_path, remote_path ): + remote_extra_files_path = None + if remote_path: + remote_extra_files_path = "%s_files" % remote_path[ 0:-len( ".dat" ) ] + return local_dataset_path.with_path_for_job( remote_path, remote_extra_files_path ) + + def working_directory( self ): + return self._working_directory + + def config_directory( self ): + return self._config_directory + + def new_file_path( self ): + return self.working_directory() # Problems with doing this? + + def sep( self ): + return self._sep + + def version_path( self ): + return self._version_path + + def rewriter( self, parameter_value ): + unstructured_path_rewrites = self.unstructured_path_rewrites + if parameter_value in unstructured_path_rewrites: + # Path previously mapped, use previous mapping. + return unstructured_path_rewrites[ parameter_value ] + if parameter_value in unstructured_path_rewrites.itervalues(): + # Path is a rewritten remote path (this might never occur, + # consider dropping check...) + return parameter_value + + rewrite, new_unstructured_path_rewrites = self.path_mapper.check_for_arbitrary_rewrite( parameter_value ) + if rewrite: + unstructured_path_rewrites.update(new_unstructured_path_rewrites) + return rewrite + else: + # Did not need to rewrite, use original path or value. 
+ return parameter_value + + def unstructured_path_rewriter( self ): + return self.rewriter diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/__init__.py --- a/lib/galaxy/jobs/runners/util/__init__.py +++ b/lib/galaxy/jobs/runners/util/__init__.py @@ -1,7 +1,7 @@ """ This module and its submodules contains utilities for running external processes and interfacing with job managers. This module should contain -functionality shared between Galaxy and the LWR. +functionality shared between Galaxy and the Pulsar. """ from galaxy.util.bunch import Bunch diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/cli/factory.py --- a/lib/galaxy/jobs/runners/util/cli/factory.py +++ b/lib/galaxy/jobs/runners/util/cli/factory.py @@ -5,7 +5,7 @@ ) code_dir = 'lib' except ImportError: - from lwr.managers.util.cli import ( + from pulsar.managers.util.cli import ( CliInterface, split_params ) diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/cli/job/slurm.py --- a/lib/galaxy/jobs/runners/util/cli/job/slurm.py +++ b/lib/galaxy/jobs/runners/util/cli/job/slurm.py @@ -5,7 +5,7 @@ from galaxy.model import Job job_states = Job.states except ImportError: - # Not in Galaxy, map Galaxy job states to LWR ones. + # Not in Galaxy, map Galaxy job states to Pulsar ones. from galaxy.util import enum job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/jobs/runners/util/cli/job/torque.py --- a/lib/galaxy/jobs/runners/util/cli/job/torque.py +++ b/lib/galaxy/jobs/runners/util/cli/job/torque.py @@ -7,7 +7,7 @@ from galaxy.model import Job job_states = Job.states except ImportError: - # Not in Galaxy, map Galaxy job states to LWR ones. 
+ # Not in Galaxy, map Galaxy job states to Pulsar ones. from galaxy.util import enum job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -623,9 +623,9 @@ elif store == 'irods': from .rods import IRODSObjectStore return IRODSObjectStore(config=config, config_xml=config_xml) - elif store == 'lwr': - from .lwr import LwrObjectStore - return LwrObjectStore(config=config, config_xml=config_xml) + elif store == 'pulsar': + from .pulsar import PulsarObjectStore + return PulsarObjectStore(config=config, config_xml=config_xml) else: log.error("Unrecognized object store definition: {0}".format(store)) diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/objectstore/lwr.py --- a/lib/galaxy/objectstore/lwr.py +++ /dev/null @@ -1,76 +0,0 @@ -from __future__ import absolute_import # Need to import lwr_client absolutely. -from ..objectstore import ObjectStore -try: - from galaxy.jobs.runners.lwr_client.manager import ObjectStoreClientManager -except ImportError: - from lwr.lwr_client.manager import ObjectStoreClientManager - - -class LwrObjectStore(ObjectStore): - """ - Object store implementation that delegates to a remote LWR server. - - This may be more aspirational than practical for now, it would be good to - Galaxy to a point that a handler thread could be setup that doesn't attempt - to access the disk files returned by a (this) object store - just passing - them along to the LWR unmodified. That modification - along with this - implementation and LWR job destinations would then allow Galaxy to fully - manage jobs on remote servers with completely different mount points. 
- - This implementation should be considered beta and may be dropped from - Galaxy at some future point or significantly modified. - """ - - def __init__(self, config, config_xml): - self.lwr_client = self.__build_lwr_client(config_xml) - - def exists(self, obj, **kwds): - return self.lwr_client.exists(**self.__build_kwds(obj, **kwds)) - - def file_ready(self, obj, **kwds): - return self.lwr_client.file_ready(**self.__build_kwds(obj, **kwds)) - - def create(self, obj, **kwds): - return self.lwr_client.create(**self.__build_kwds(obj, **kwds)) - - def empty(self, obj, **kwds): - return self.lwr_client.empty(**self.__build_kwds(obj, **kwds)) - - def size(self, obj, **kwds): - return self.lwr_client.size(**self.__build_kwds(obj, **kwds)) - - def delete(self, obj, **kwds): - return self.lwr_client.delete(**self.__build_kwds(obj, **kwds)) - - # TODO: Optimize get_data. - def get_data(self, obj, **kwds): - return self.lwr_client.get_data(**self.__build_kwds(obj, **kwds)) - - def get_filename(self, obj, **kwds): - return self.lwr_client.get_filename(**self.__build_kwds(obj, **kwds)) - - def update_from_file(self, obj, **kwds): - return self.lwr_client.update_from_file(**self.__build_kwds(obj, **kwds)) - - def get_store_usage_percent(self): - return self.lwr_client.get_store_usage_percent() - - def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None): - return None - - def __build_kwds(self, obj, **kwds): - kwds['object_id'] = obj.id - return kwds - pass - - def __build_lwr_client(self, config_xml): - url = config_xml.get("url") - private_token = config_xml.get("private_token", None) - transport = config_xml.get("transport", None) - manager_options = dict(transport=transport) - client_options = dict(url=url, private_token=private_token) - lwr_client = ObjectStoreClientManager(**manager_options).get_client(client_options) - return lwr_client - - def shutdown(self): - pass diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 
566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/objectstore/pulsar.py --- /dev/null +++ b/lib/galaxy/objectstore/pulsar.py @@ -0,0 +1,73 @@ +from __future__ import absolute_import # Need to import pulsar_client absolutely. +from ..objectstore import ObjectStore +from pulsar.client.manager import ObjectStoreClientManager + + +class PulsarObjectStore(ObjectStore): + """ + Object store implementation that delegates to a remote Pulsar server. + + This may be more aspirational than practical for now, it would be good to + Galaxy to a point that a handler thread could be setup that doesn't attempt + to access the disk files returned by a (this) object store - just passing + them along to the Pulsar unmodified. That modification - along with this + implementation and Pulsar job destinations would then allow Galaxy to fully + manage jobs on remote servers with completely different mount points. + + This implementation should be considered beta and may be dropped from + Galaxy at some future point or significantly modified. + """ + + def __init__(self, config, config_xml): + self.pulsar_client = self.__build_pulsar_client(config_xml) + + def exists(self, obj, **kwds): + return self.pulsar_client.exists(**self.__build_kwds(obj, **kwds)) + + def file_ready(self, obj, **kwds): + return self.pulsar_client.file_ready(**self.__build_kwds(obj, **kwds)) + + def create(self, obj, **kwds): + return self.pulsar_client.create(**self.__build_kwds(obj, **kwds)) + + def empty(self, obj, **kwds): + return self.pulsar_client.empty(**self.__build_kwds(obj, **kwds)) + + def size(self, obj, **kwds): + return self.pulsar_client.size(**self.__build_kwds(obj, **kwds)) + + def delete(self, obj, **kwds): + return self.pulsar_client.delete(**self.__build_kwds(obj, **kwds)) + + # TODO: Optimize get_data. 
+ def get_data(self, obj, **kwds): + return self.pulsar_client.get_data(**self.__build_kwds(obj, **kwds)) + + def get_filename(self, obj, **kwds): + return self.pulsar_client.get_filename(**self.__build_kwds(obj, **kwds)) + + def update_from_file(self, obj, **kwds): + return self.pulsar_client.update_from_file(**self.__build_kwds(obj, **kwds)) + + def get_store_usage_percent(self): + return self.pulsar_client.get_store_usage_percent() + + def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None): + return None + + def __build_kwds(self, obj, **kwds): + kwds['object_id'] = obj.id + return kwds + pass + + def __build_pulsar_client(self, config_xml): + url = config_xml.get("url") + private_token = config_xml.get("private_token", None) + transport = config_xml.get("transport", None) + manager_options = dict(transport=transport) + client_options = dict(url=url, private_token=private_token) + pulsar_client = ObjectStoreClientManager(**manager_options).get_client(client_options) + return pulsar_client + + def shutdown(self): + pass diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/galaxy/tools/deps/dependencies.py --- a/lib/galaxy/tools/deps/dependencies.py +++ b/lib/galaxy/tools/deps/dependencies.py @@ -8,7 +8,7 @@ related context required to resolve dependencies via the ToolShedPackageDependencyResolver. - This is meant to enable remote resolution of dependencies, by the LWR or + This is meant to enable remote resolution of dependencies, by the Pulsar or other potential remote execution mechanisms. """ @@ -39,7 +39,7 @@ @staticmethod def _toolshed_install_dependency_from_dict(as_dict): - # Rather than requiring full models in LWR, just use simple objects + # Rather than requiring full models in Pulsar, just use simple objects # containing only properties and associations used to resolve # dependencies for tool execution. 
repository_object = bunch.Bunch( diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/__init__.py --- /dev/null +++ b/lib/pulsar/client/__init__.py @@ -0,0 +1,62 @@ +""" +pulsar client +====== + +This module contains logic for interfacing with an external Pulsar server. + +------------------ +Configuring Galaxy +------------------ + +Galaxy job runners are configured in Galaxy's ``job_conf.xml`` file. See ``job_conf.xml.sample_advanced`` +in your Galaxy code base or on +`Bitbucket <https://bitbucket.org/galaxy/galaxy-dist/src/tip/job_conf.xml.sample_advanced?at=default>`_ +for information on how to configure Galaxy to interact with the Pulsar. + +Galaxy also supports an older, less rich configuration of job runners directly +in its main ``universe_wsgi.ini`` file. The following section describes how to +configure Galaxy to communicate with the Pulsar in this legacy mode. + +Legacy +------ + +A Galaxy tool can be configured to be executed remotely via Pulsar by +adding a line to the ``universe_wsgi.ini`` file under the +``galaxy:tool_runners`` section with the format:: + + <tool_id> = pulsar://http://<pulsar_host>:<pulsar_port> + +As an example, if a host named remotehost is running the Pulsar server +application on port ``8913``, then the tool with id ``test_tool`` can +be configured to run remotely on remotehost by adding the following +line to ``universe.ini``:: + + test_tool = pulsar://http://remotehost:8913 + +Remember this must be added after the ``[galaxy:tool_runners]`` header +in the ``universe.ini`` file. 
+ + +""" + +from .staging.down import finish_job +from .staging.up import submit_job +from .staging import ClientJobDescription +from .staging import PulsarOutputs +from .staging import ClientOutputs +from .client import OutputNotFoundException +from .manager import build_client_manager +from .destination import url_to_destination_params +from .path_mapper import PathMapper + +__all__ = [ + build_client_manager, + OutputNotFoundException, + url_to_destination_params, + finish_job, + submit_job, + ClientJobDescription, + PulsarOutputs, + ClientOutputs, + PathMapper, +] diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/action_mapper.py --- /dev/null +++ b/lib/pulsar/client/action_mapper.py @@ -0,0 +1,567 @@ +from json import load +from os import makedirs +from os.path import exists +from os.path import abspath +from os.path import dirname +from os.path import join +from os.path import basename +from os.path import sep +import fnmatch +from re import compile +from re import escape +import galaxy.util +from galaxy.util.bunch import Bunch +from .config_util import read_file +from .util import directory_files +from .util import unique_path_prefix +from .transport import get_file +from .transport import post_file + + +DEFAULT_MAPPED_ACTION = 'transfer' # Not really clear to me what this should be, exception? +DEFAULT_PATH_MAPPER_TYPE = 'prefix' + +STAGING_ACTION_REMOTE = "remote" +STAGING_ACTION_LOCAL = "local" +STAGING_ACTION_NONE = None +STAGING_ACTION_DEFAULT = "default" + +# Poor man's enum. +path_type = Bunch( + # Galaxy input datasets and extra files. + INPUT="input", + # Galaxy config and param files. + CONFIG="config", + # Files from tool's tool_dir (for now just wrapper if available). + TOOL="tool", + # Input work dir files - e.g. metadata files, task-split input files, etc.. + WORKDIR="workdir", + # Galaxy output datasets in their final home. 
+ OUTPUT="output", + # Galaxy from_work_dir output paths and other files (e.g. galaxy.json) + OUTPUT_WORKDIR="output_workdir", + # Other fixed tool parameter paths (likely coming from tool data, but not + # nessecarily). Not sure this is the best name... + UNSTRUCTURED="unstructured", +) + + +ACTION_DEFAULT_PATH_TYPES = [ + path_type.INPUT, + path_type.CONFIG, + path_type.TOOL, + path_type.WORKDIR, + path_type.OUTPUT, + path_type.OUTPUT_WORKDIR, +] +ALL_PATH_TYPES = ACTION_DEFAULT_PATH_TYPES + [path_type.UNSTRUCTURED] + + +class FileActionMapper(object): + """ + Objects of this class define how paths are mapped to actions. + + >>> json_string = r'''{"paths": [ \ + {"path": "/opt/galaxy", "action": "none"}, \ + {"path": "/galaxy/data", "action": "transfer"}, \ + {"path": "/cool/bamfiles/**/*.bam", "action": "copy", "match_type": "glob"}, \ + {"path": ".*/dataset_\\\\d+.dat", "action": "copy", "match_type": "regex"} \ + ]}''' + >>> from tempfile import NamedTemporaryFile + >>> from os import unlink + >>> def mapper_for(default_action, config_contents): + ... f = NamedTemporaryFile(delete=False) + ... f.write(config_contents.encode('UTF-8')) + ... f.close() + ... mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None) + ... mapper = FileActionMapper(mock_client) + ... mapper = FileActionMapper(config=mapper.to_dict()) # Serialize and deserialize it to make sure still works + ... unlink(f.name) + ... 
return mapper + >>> mapper = mapper_for(default_action='none', config_contents=json_string) + >>> # Test first config line above, implicit path prefix mapper + >>> action = mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input') + >>> action.action_type == u'none' + True + >>> action.staging_needed + False + >>> # Test another (2nd) mapper, this one with a different action + >>> action = mapper.action('/galaxy/data/files/000/dataset_1.dat', 'input') + >>> action.action_type == u'transfer' + True + >>> action.staging_needed + True + >>> # Always at least copy work_dir outputs. + >>> action = mapper.action('/opt/galaxy/database/working_directory/45.sh', 'workdir') + >>> action.action_type == u'copy' + True + >>> action.staging_needed + True + >>> # Test glob mapper (matching test) + >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam', 'input').action_type == u'copy' + True + >>> # Test glob mapper (non-matching test) + >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam.bai', 'input').action_type == u'none' + True + >>> # Regex mapper test. 
+ >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'input').action_type == u'copy' + True + >>> # Doesn't map unstructured paths by default + >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'none' + True + >>> input_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ + {"path": "/", "action": "transfer", "path_types": "input"} \ + ] }''') + >>> input_only_mapper.action('/dataset_1.dat', 'input').action_type == u'transfer' + True + >>> input_only_mapper.action('/dataset_1.dat', 'output').action_type == u'none' + True + >>> unstructured_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ + {"path": "/", "action": "transfer", "path_types": "*any*"} \ + ] }''') + >>> unstructured_mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'transfer' + True + """ + + def __init__(self, client=None, config=None): + if config is None and client is None: + message = "FileActionMapper must be constructed from either a client or a config dictionary." + raise Exception(message) + if config is None: + config = self.__client_to_config(client) + self.default_action = config.get("default_action", "transfer") + self.mappers = mappers_from_dicts(config.get("paths", [])) + self.files_endpoint = config.get("files_endpoint", None) + + def action(self, path, type, mapper=None): + mapper = self.__find_mapper(path, type, mapper) + action_class = self.__action_class(path, type, mapper) + file_lister = DEFAULT_FILE_LISTER + action_kwds = {} + if mapper: + file_lister = mapper.file_lister + action_kwds = mapper.action_kwds + action = action_class(path, file_lister=file_lister, **action_kwds) + self.__process_action(action, type) + return action + + def unstructured_mappers(self): + """ Return mappers that will map 'unstructured' files (i.e. go beyond + mapping inputs, outputs, and config files). 
+ """ + return filter(lambda m: path_type.UNSTRUCTURED in m.path_types, self.mappers) + + def to_dict(self): + return dict( + default_action=self.default_action, + files_endpoint=self.files_endpoint, + paths=map(lambda m: m.to_dict(), self.mappers) + ) + + def __client_to_config(self, client): + action_config_path = client.action_config_path + if action_config_path: + config = read_file(action_config_path) + else: + config = dict() + config["default_action"] = client.default_file_action + config["files_endpoint"] = client.files_endpoint + return config + + def __load_action_config(self, path): + config = load(open(path, 'rb')) + self.mappers = mappers_from_dicts(config.get('paths', [])) + + def __find_mapper(self, path, type, mapper=None): + if not mapper: + normalized_path = abspath(path) + for query_mapper in self.mappers: + if query_mapper.matches(normalized_path, type): + mapper = query_mapper + break + return mapper + + def __action_class(self, path, type, mapper): + action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else "none" + if mapper: + action_type = mapper.action_type + if type in ["workdir", "output_workdir"] and action_type == "none": + # We are changing the working_directory relative to what + # Galaxy would use, these need to be copied over. + action_type = "copy" + action_class = actions.get(action_type, None) + if action_class is None: + message_template = "Unknown action_type encountered %s while trying to map path %s" + message_args = (action_type, path) + raise Exception(message_template % message_args) + return action_class + + def __process_action(self, action, file_type): + """ Extension point to populate extra action information after an + action has been created. + """ + if action.action_type == "remote_transfer": + url_base = self.files_endpoint + if not url_base: + raise Exception("Attempted to use remote_transfer action with defining a files_endpoint") + if "?" not in url_base: + url_base = "%s?" 
% url_base + # TODO: URL encode path. + url = "%s&path=%s&file_type=%s" % (url_base, action.path, file_type) + action.url = url + +REQUIRED_ACTION_KWD = object() + + +class BaseAction(object): + action_spec = {} + + def __init__(self, path, file_lister=None): + self.path = path + self.file_lister = file_lister or DEFAULT_FILE_LISTER + + def unstructured_map(self, path_helper): + unstructured_map = self.file_lister.unstructured_map(self.path) + if self.staging_needed: + # To ensure uniqueness, prepend unique prefix to each name + prefix = unique_path_prefix(self.path) + for path, name in unstructured_map.iteritems(): + unstructured_map[path] = join(prefix, name) + else: + path_rewrites = {} + for path in unstructured_map: + rewrite = self.path_rewrite(path_helper, path) + if rewrite: + path_rewrites[path] = rewrite + unstructured_map = path_rewrites + return unstructured_map + + @property + def staging_needed(self): + return self.staging != STAGING_ACTION_NONE + + @property + def staging_action_local(self): + return self.staging == STAGING_ACTION_LOCAL + + +class NoneAction(BaseAction): + """ This action indicates the corresponding path does not require any + additional action. This should indicate paths that are available both on + the Pulsar client (i.e. Galaxy server) and remote Pulsar server with the same + paths. """ + action_type = "none" + staging = STAGING_ACTION_NONE + + def to_dict(self): + return dict(path=self.path, action_type=self.action_type) + + @classmethod + def from_dict(cls, action_dict): + return NoneAction(path=action_dict["path"]) + + def path_rewrite(self, path_helper, path=None): + return None + + +class RewriteAction(BaseAction): + """ This action indicates the Pulsar server should simply rewrite the path + to the specified file. 
+ """ + action_spec = dict( + source_directory=REQUIRED_ACTION_KWD, + destination_directory=REQUIRED_ACTION_KWD + ) + action_type = "rewrite" + staging = STAGING_ACTION_NONE + + def __init__(self, path, file_lister=None, source_directory=None, destination_directory=None): + self.path = path + self.file_lister = file_lister or DEFAULT_FILE_LISTER + self.source_directory = source_directory + self.destination_directory = destination_directory + + def to_dict(self): + return dict( + path=self.path, + action_type=self.action_type, + source_directory=self.source_directory, + destination_directory=self.destination_directory, + ) + + @classmethod + def from_dict(cls, action_dict): + return RewriteAction( + path=action_dict["path"], + source_directory=action_dict["source_directory"], + destination_directory=action_dict["destination_directory"], + ) + + def path_rewrite(self, path_helper, path=None): + if not path: + path = self.path + new_path = path_helper.from_posix_with_new_base(self.path, self.source_directory, self.destination_directory) + return None if new_path == self.path else new_path + + +class TransferAction(BaseAction): + """ This actions indicates that the Pulsar client should initiate an HTTP + transfer of the corresponding path to the remote Pulsar server before + launching the job. """ + action_type = "transfer" + staging = STAGING_ACTION_LOCAL + + +class CopyAction(BaseAction): + """ This action indicates that the Pulsar client should execute a file system + copy of the corresponding path to the Pulsar staging directory prior to + launching the corresponding job. """ + action_type = "copy" + staging = STAGING_ACTION_LOCAL + + +class RemoteCopyAction(BaseAction): + """ This action indicates the Pulsar server should copy the file before + execution via direct file system copy. This is like a CopyAction, but + it indicates the action should occur on the Pulsar server instead of on + the client. 
+ """ + action_type = "remote_copy" + staging = STAGING_ACTION_REMOTE + + def to_dict(self): + return dict(path=self.path, action_type=self.action_type) + + @classmethod + def from_dict(cls, action_dict): + return RemoteCopyAction(path=action_dict["path"]) + + def write_to_path(self, path): + galaxy.util.copy_to_path(open(self.path, "rb"), path) + + def write_from_path(self, pulsar_path): + destination = self.path + parent_directory = dirname(destination) + if not exists(parent_directory): + makedirs(parent_directory) + with open(pulsar_path, "rb") as f: + galaxy.util.copy_to_path(f, destination) + + +class RemoteTransferAction(BaseAction): + """ This action indicates the Pulsar server should copy the file before + execution via direct file system copy. This is like a CopyAction, but + it indicates the action should occur on the Pulsar server instead of on + the client. + """ + action_type = "remote_transfer" + staging = STAGING_ACTION_REMOTE + + def __init__(self, path, file_lister=None, url=None): + super(RemoteTransferAction, self).__init__(path, file_lister=file_lister) + self.url = url + + def to_dict(self): + return dict(path=self.path, action_type=self.action_type, url=self.url) + + @classmethod + def from_dict(cls, action_dict): + return RemoteTransferAction(path=action_dict["path"], url=action_dict["url"]) + + def write_to_path(self, path): + get_file(self.url, path) + + def write_from_path(self, pulsar_path): + post_file(self.url, pulsar_path) + + +class MessageAction(object): + """ Sort of pseudo action describing "files" store in memory and + transferred via message (HTTP, Python-call, MQ, etc...) + """ + action_type = "message" + staging = STAGING_ACTION_DEFAULT + + def __init__(self, contents, client=None): + self.contents = contents + self.client = client + + @property + def staging_needed(self): + return True + + @property + def staging_action_local(self): + # Ekkk, cannot be called if created through from_dict. 
+ # Shouldn't be a problem the way it is used - but is an + # object design problem. + return self.client.prefer_local_staging + + def to_dict(self): + return dict(contents=self.contents, action_type=MessageAction.action_type) + + @classmethod + def from_dict(cls, action_dict): + return MessageAction(contents=action_dict["contents"]) + + def write_to_path(self, path): + open(path, "w").write(self.contents) + + +DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, RemoteTransferAction, MessageAction] + + +def from_dict(action_dict): + action_type = action_dict.get("action_type", None) + target_class = None + for action_class in DICTIFIABLE_ACTION_CLASSES: + if action_type == action_class.action_type: + target_class = action_class + if not target_class: + message = "Failed to recover action from dictionary - invalid action type specified %s." % action_type + raise Exception(message) + return target_class.from_dict(action_dict) + + +class BasePathMapper(object): + + def __init__(self, config): + action_type = config.get('action', DEFAULT_MAPPED_ACTION) + action_class = actions.get(action_type, None) + action_kwds = action_class.action_spec.copy() + for key, value in action_kwds.items(): + if key in config: + action_kwds[key] = config[key] + elif value is REQUIRED_ACTION_KWD: + message_template = "action_type %s requires key word argument %s" + message = message_template % (action_type, key) + raise Exception(message) + self.action_type = action_type + self.action_kwds = action_kwds + path_types_str = config.get('path_types', "*defaults*") + path_types_str = path_types_str.replace("*defaults*", ",".join(ACTION_DEFAULT_PATH_TYPES)) + path_types_str = path_types_str.replace("*any*", ",".join(ALL_PATH_TYPES)) + self.path_types = path_types_str.split(",") + self.file_lister = FileLister(config) + + def matches(self, path, path_type): + path_type_matches = path_type in self.path_types + return path_type_matches and self._path_matches(path) + + def _extend_base_dict(self, 
**kwds): + base_dict = dict( + action=self.action_type, + path_types=",".join(self.path_types), + match_type=self.match_type + ) + base_dict.update(self.file_lister.to_dict()) + base_dict.update(self.action_kwds) + base_dict.update(**kwds) + return base_dict + + +class PrefixPathMapper(BasePathMapper): + match_type = 'prefix' + + def __init__(self, config): + super(PrefixPathMapper, self).__init__(config) + self.prefix_path = abspath(config['path']) + + def _path_matches(self, path): + return path.startswith(self.prefix_path) + + def to_pattern(self): + pattern_str = "(%s%s[^\s,\"\']+)" % (escape(self.prefix_path), escape(sep)) + return compile(pattern_str) + + def to_dict(self): + return self._extend_base_dict(path=self.prefix_path) + + +class GlobPathMapper(BasePathMapper): + match_type = 'glob' + + def __init__(self, config): + super(GlobPathMapper, self).__init__(config) + self.glob_path = config['path'] + + def _path_matches(self, path): + return fnmatch.fnmatch(path, self.glob_path) + + def to_pattern(self): + return compile(fnmatch.translate(self.glob_path)) + + def to_dict(self): + return self._extend_base_dict(path=self.glob_path) + + +class RegexPathMapper(BasePathMapper): + match_type = 'regex' + + def __init__(self, config): + super(RegexPathMapper, self).__init__(config) + self.pattern_raw = config['path'] + self.pattern = compile(self.pattern_raw) + + def _path_matches(self, path): + return self.pattern.match(path) is not None + + def to_pattern(self): + return self.pattern + + def to_dict(self): + return self._extend_base_dict(path=self.pattern_raw) + +MAPPER_CLASSES = [PrefixPathMapper, GlobPathMapper, RegexPathMapper] +MAPPER_CLASS_DICT = dict(map(lambda c: (c.match_type, c), MAPPER_CLASSES)) + + +def mappers_from_dicts(mapper_def_list): + return map(lambda m: __mappper_from_dict(m), mapper_def_list) + + +def __mappper_from_dict(mapper_dict): + map_type = mapper_dict.get('match_type', DEFAULT_PATH_MAPPER_TYPE) + return 
MAPPER_CLASS_DICT[map_type](mapper_dict) + + +class FileLister(object): + + def __init__(self, config): + self.depth = int(config.get("depth", "0")) + + def to_dict(self): + return dict( + depth=self.depth + ) + + def unstructured_map(self, path): + depth = self.depth + if self.depth == 0: + return {path: basename(path)} + else: + while depth > 0: + path = dirname(path) + depth -= 1 + return dict([(join(path, f), f) for f in directory_files(path)]) + +DEFAULT_FILE_LISTER = FileLister(dict(depth=0)) + +ACTION_CLASSES = [ + NoneAction, + RewriteAction, + TransferAction, + CopyAction, + RemoteCopyAction, + RemoteTransferAction, +] +actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES]) + + +__all__ = [ + FileActionMapper, + path_type, + from_dict, + MessageAction, + RemoteTransferAction, # For testing +] diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/amqp_exchange.py --- /dev/null +++ b/lib/pulsar/client/amqp_exchange.py @@ -0,0 +1,139 @@ +try: + import kombu + from kombu import pools +except ImportError: + kombu = None + +import socket +import logging +import threading +from time import sleep +log = logging.getLogger(__name__) + + +KOMBU_UNAVAILABLE = "Attempting to bind to AMQP message queue, but kombu dependency unavailable" + +DEFAULT_EXCHANGE_NAME = "pulsar" +DEFAULT_EXCHANGE_TYPE = "direct" +# Set timeout to periodically give up looking and check if polling should end. +DEFAULT_TIMEOUT = 0.2 +DEFAULT_HEARTBEAT = 580 + +DEFAULT_RECONNECT_CONSUMER_WAIT = 1 +DEFAULT_HEARTBEAT_WAIT = 1 + + +class PulsarExchange(object): + """ Utility for publishing and consuming structured Pulsar queues using kombu. + This is shared between the server and client - an exchange should be setup + for each manager (or in the case of the client, each manager one wished to + communicate with.) 
+ + Each Pulsar manager is defined solely by name in the scheme, so only one Pulsar + should target each AMQP endpoint or care should be taken that unique + manager names are used across Pulsar servers targetting same AMQP endpoint - + and in particular only one such Pulsar should define an default manager with + name _default_. + """ + + def __init__( + self, + url, + manager_name, + connect_ssl=None, + timeout=DEFAULT_TIMEOUT, + publish_kwds={}, + ): + """ + """ + if not kombu: + raise Exception(KOMBU_UNAVAILABLE) + self.__url = url + self.__manager_name = manager_name + self.__connect_ssl = connect_ssl + self.__exchange = kombu.Exchange(DEFAULT_EXCHANGE_NAME, DEFAULT_EXCHANGE_TYPE) + self.__timeout = timeout + # Be sure to log message publishing failures. + if publish_kwds.get("retry", False): + if "retry_policy" not in publish_kwds: + publish_kwds["retry_policy"] = {} + if "errback" not in publish_kwds["retry_policy"]: + publish_kwds["retry_policy"]["errback"] = self.__publish_errback + self.__publish_kwds = publish_kwds + + @property + def url(self): + return self.__url + + def consume(self, queue_name, callback, check=True, connection_kwargs={}): + queue = self.__queue(queue_name) + log.debug("Consuming queue '%s'", queue) + while check: + heartbeat_thread = None + try: + with self.connection(self.__url, heartbeat=DEFAULT_HEARTBEAT, **connection_kwargs) as connection: + with kombu.Consumer(connection, queues=[queue], callbacks=[callback], accept=['json']): + heartbeat_thread = self.__start_heartbeat(queue_name, connection) + while check and connection.connected: + try: + connection.drain_events(timeout=self.__timeout) + except socket.timeout: + pass + except (IOError, socket.error), exc: + # In testing, errno is None + log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc) + if heartbeat_thread: + heartbeat_thread.join() + sleep(DEFAULT_RECONNECT_CONSUMER_WAIT) + + def heartbeat(self, connection): + log.debug('AMQP heartbeat thread alive') + while 
connection.connected: + connection.heartbeat_check() + sleep(DEFAULT_HEARTBEAT_WAIT) + log.debug('AMQP heartbeat thread exiting') + + def publish(self, name, payload): + with self.connection(self.__url) as connection: + with pools.producers[connection].acquire() as producer: + key = self.__queue_name(name) + producer.publish( + payload, + serializer='json', + exchange=self.__exchange, + declare=[self.__exchange], + routing_key=key, + **self.__publish_kwds + ) + + def __publish_errback(self, exc, interval): + log.error("Connection error while publishing: %r", exc, exc_info=1) + log.info("Retrying in %s seconds", interval) + + def connection(self, connection_string, **kwargs): + if "ssl" not in kwargs: + kwargs["ssl"] = self.__connect_ssl + return kombu.Connection(connection_string, **kwargs) + + def __queue(self, name): + queue_name = self.__queue_name(name) + queue = kombu.Queue(queue_name, self.__exchange, routing_key=queue_name) + return queue + + def __queue_name(self, name): + key_prefix = self.__key_prefix() + queue_name = '%s_%s' % (key_prefix, name) + return queue_name + + def __key_prefix(self): + if self.__manager_name == "_default_": + key_prefix = "pulsar_" + else: + key_prefix = "pulsar_%s_" % self.__manager_name + return key_prefix + + def __start_heartbeat(self, queue_name, connection): + thread_name = "consume-heartbeat-%s" % (self.__queue_name(queue_name)) + thread = threading.Thread(name=thread_name, target=self.heartbeat, args=(connection,)) + thread.start() + return thread diff -r 4ef50975607ee0d3b4490c9e7a5795d6a04802ee -r 566388d623749dad4259ea04cf44a5d858a56671 lib/pulsar/client/amqp_exchange_factory.py --- /dev/null +++ b/lib/pulsar/client/amqp_exchange_factory.py @@ -0,0 +1,41 @@ +from .amqp_exchange import PulsarExchange +from .util import filter_destination_params + + +def get_exchange(url, manager_name, params): + connect_ssl = parse_amqp_connect_ssl_params(params) + exchange_kwds = dict( + manager_name=manager_name, + 
connect_ssl=connect_ssl, + publish_kwds=parse_amqp_publish_kwds(params) + ) + timeout = params.get('amqp_consumer_timeout', False) + if timeout is not False: + exchange_kwds['timeout'] = timeout + exchange = PulsarExchange(url, **exchange_kwds) + return exchange + + +def parse_amqp_connect_ssl_params(params): + ssl_params = filter_destination_params(params, "amqp_connect_ssl_") + if not ssl_params: + return + + ssl = __import__('ssl') + if 'cert_reqs' in ssl_params: + value = ssl_params['cert_reqs'] + ssl_params['cert_reqs'] = getattr(ssl, value.upper()) + return ssl_params + + +def parse_amqp_publish_kwds(params): + all_publish_params = filter_destination_params(params, "amqp_publish_") + retry_policy_params = {} + for key in all_publish_params.keys(): + if key.startswith("retry_"): + value = all_publish_params[key] + retry_policy_params[key[len("retry_"):]] = value + del all_publish_params[key] + if retry_policy_params: + all_publish_params["retry_policy"] = retry_policy_params + return all_publish_params This diff is so big that we needed to truncate the remainder. Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org