7 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/90a490a788c1/ Changeset: 90a490a788c1 User: dannon Date: 2014-06-20 16:16:40 Summary: Prevent possible XSS in reset_password. Affected #: 1 file diff -r c972ce1e3e8840309dceaa40ac6858c3156fdff3 -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 lib/galaxy/webapps/galaxy/controllers/user.py --- a/lib/galaxy/webapps/galaxy/controllers/user.py +++ b/lib/galaxy/webapps/galaxy/controllers/user.py @@ -1092,7 +1092,7 @@ def reset_password( self, trans, email=None, **kwd ): if trans.app.config.smtp_server is None: return trans.show_error_message( "Mail is not configured for this Galaxy instance. Please contact your local Galaxy administrator." ) - message = util.restore_text( kwd.get( 'message', '' ) ) + message = util.sanitize_text(util.restore_text( kwd.get( 'message', '' ) )) status = 'done' if kwd.get( 'reset_password_button', False ): reset_user = trans.sa_session.query( trans.app.model.User ).filter( trans.app.model.User.table.c.email == email ).first() https://bitbucket.org/galaxy/galaxy-central/commits/7b4946192456/ Changeset: 7b4946192456 User: dannon Date: 2014-06-24 23:02:28 Summary: Merge. Affected #: 140 files diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d job_conf.xml.sample_advanced --- a/job_conf.xml.sample_advanced +++ b/job_conf.xml.sample_advanced @@ -19,27 +19,30 @@ <!-- Override the $DRMAA_LIBRARY_PATH environment variable --><param id="drmaa_library_path">/sge/lib/libdrmaa.so</param></plugin> - <plugin id="lwr" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner"> - <!-- More information on LWR can be found at https://lwr.readthedocs.org --> - <!-- Uncomment following line to use libcurl to perform HTTP calls (defaults to urllib) --> + <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /> + <plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> + <plugin id="slurm" type="runner" load="galaxy.jobs.runners.slurm:SlurmJobRunner" /> + <!-- Pulsar runners (see more at https://pulsar.readthedocs.org) --> + <plugin id="pulsar_rest" type="runner" load="galaxy.jobs.runners.pulsar:PulsarRESTJobRunner"> + <!-- Allow optimized HTTP calls with libcurl (defaults to urllib) --><!-- <param id="transport">curl</param> --> - <!-- *Experimental Caching*: Uncomment next parameters to enable - caching and specify the number of caching threads to enable on Galaxy - side. Likely will not work with newer features such as MQ support. - If this is enabled be sure to specify a `file_cache_dir` in the remote - LWR's main configuration file. + + <!-- *Experimental Caching*: Next parameter enables caching. + Likely will not work with newer features such as MQ support. + + If this is enabled be sure to specify a `file_cache_dir` in + the remote Pulsar's servers main configuration file. --><!-- <param id="cache">True</param> --> - <!-- <param id="transfer_threads">2</param> --></plugin> - <plugin id="amqp_lwr" type="runner" load="galaxy.jobs.runners.lwr:LwrJobRunner"> - <param id="url">amqp://guest:guest@localhost:5672//</param> - <!-- If using message queue driven LWR - the LWR will generally - initiate file transfers so a the URL of this Galaxy instance - must be configured. --> + <plugin id="pulsar_mq" type="runner" load="galaxy.jobs.runners.pulsar:PulsarMQJobRunner"> + <!-- AMQP URL to connect to. 
--> + <param id="amqp_url">amqp://guest:guest@localhost:5672//</param> + <!-- URL remote Pulsar apps should transfer files to this Galaxy + instance to/from. --><param id="galaxy_url">http://localhost:8080</param> - <!-- If multiple managers configured on the LWR, specify which one - this plugin targets. --> + <!-- Pulsar job manager to communicate with (see Pulsar + docs for information on job managers). --><!-- <param id="manager">_default_</param> --><!-- The AMQP client can provide an SSL client certificate (e.g. for validation), the following options configure that certificate @@ -58,9 +61,17 @@ higher value (in seconds) (or `None` to use blocking connections). --><!-- <param id="amqp_consumer_timeout">None</param> --></plugin> - <plugin id="cli" type="runner" load="galaxy.jobs.runners.cli:ShellJobRunner" /> - <plugin id="condor" type="runner" load="galaxy.jobs.runners.condor:CondorJobRunner" /> - <plugin id="slurm" type="runner" load="galaxy.jobs.runners.slurm:SlurmJobRunner" /> + <plugin id="pulsar_legacy" type="runner" load="galaxy.jobs.runners.pulsar:PulsarLegacyJobRunner"> + <!-- Pulsar job runner with default parameters matching those + of old LWR job runner. If your Pulsar server is running on a + Windows machine for instance this runner should still be used. + + These destinations still needs to target a Pulsar server, + older LWR plugins and destinations still work in Galaxy can + target LWR servers, but this support should be considered + deprecated and will disappear with a future release of Galaxy. + --> + </plugin></plugins><handlers default="handlers"><!-- Additional job handlers - the id should match the name of a @@ -125,8 +136,8 @@ $galaxy_root:ro,$tool_directory:ro,$working_directory:rw,$default_file_path:ro - If using the LWR, defaults will be even further restricted because the - LWR will (by default) stage all needed inputs into the job's job_directory + If using the Pulsar, defaults will be even further restricted because the + Pulsar will (by default) stage all needed inputs into the job's job_directory (so there is not need to allow the docker container to read all the files - let alone write over them). Defaults in this case becomes: @@ -135,7 +146,7 @@ Python string.Template is used to expand volumes and values $defaults, $galaxy_root, $default_file_path, $tool_directory, $working_directory, are available to all jobs and $job_directory is also available for - LWR jobs. + Pulsar jobs. --><!-- Control memory allocatable by docker container with following option: --> @@ -178,7 +189,7 @@ does trust tool's specified container - but also wants tool's not configured to run in a container the following option can provide a fallback. --> - <!-- <param id="dockers_default_container_id">busybox:ubuntu-14.04</param> --> + <!-- <param id="docker_default_container_id">busybox:ubuntu-14.04</param> --></destination><destination id="pbs" runner="pbs" tags="mycluster"/> @@ -213,87 +224,71 @@ <!-- A destination that represents a method in the dynamic runner. --><param id="function">foo</param></destination> - <destination id="secure_lwr" runner="lwr"> - <param id="url">https://windowshost.examle.com:8913/</param> - <!-- If set, private_token must match token remote LWR server configured with. --> + <destination id="secure_pulsar_rest_dest" runner="pulsar_rest"> + <param id="url">https://examle.com:8913/</param> + <!-- If set, private_token must match token in remote Pulsar's + configuration. 
--><param id="private_token">123456789changeme</param><!-- Uncomment the following statement to disable file staging (e.g. - if there is a shared file system between Galaxy and the LWR + if there is a shared file system between Galaxy and the Pulsar server). Alternatively action can be set to 'copy' - to replace http transfers with file system copies, 'remote_transfer' to cause - the lwr to initiate HTTP transfers instead of Galaxy, or - 'remote_copy' to cause lwr to initiate file system copies. + the Pulsar to initiate HTTP transfers instead of Galaxy, or + 'remote_copy' to cause Pulsar to initiate file system copies. If setting this to 'remote_transfer' be sure to specify a 'galaxy_url' attribute on the runner plugin above. --><!-- <param id="default_file_action">none</param> --><!-- The above option is just the default, the transfer behavior none|copy|http can be configured on a per path basis via the - following file. See lib/galaxy/jobs/runners/lwr_client/action_mapper.py - for examples of how to configure this file. This is very beta - and nature of file will likely change. + following file. See Pulsar documentation for more details and + examples. --> - <!-- <param id="file_action_config">file_actions.json</param> --> - <!-- Uncomment following option to disable Galaxy tool dependency - resolution and utilize remote LWR's configuraiton of tool - dependency resolution instead (same options as Galaxy for - dependency resolution are available in LWR). At a minimum - the remote LWR server should define a tool_dependencies_dir in - its `server.ini` configuration. The LWR will not attempt to - stage dependencies - so ensure the the required galaxy or tool - shed packages are available remotely (exact same tool shed - installed changesets are required). + <!-- <param id="file_action_config">file_actions.yaml</param> --> + <!-- The non-legacy Pulsar runners will attempt to resolve Galaxy + dependencies remotely - to enable this set a tool_dependency_dir + in Pulsar's configuration (can work with all the same dependency + resolutions mechanisms as Galaxy - tool Shed installs, Galaxy + packages, etc...). To disable this behavior, set the follow parameter + to none. To generate the dependency resolution command locally + set the following parameter local. --> - <!-- <param id="dependency_resolution">remote</params> --> - <!-- Traditionally, the LWR allow Galaxy to generate a command line - as if it were going to run the command locally and then the - LWR client rewrites it after the fact using regular - expressions. Setting the following value to true causes the - LWR runner to insert itself into the command line generation - process and generate the correct command line from the get go. - This will likely be the default someday - but requires a newer - LWR version and is less well tested. --> - <!-- <param id="rewrite_parameters">true</params> --> + <!-- <param id="dependency_resolution">none</params> --><!-- Uncomment following option to enable setting metadata on remote - LWR server. The 'use_remote_datatypes' option is available for + Pulsar server. The 'use_remote_datatypes' option is available for determining whether to use remotely configured datatypes or local ones (both alternatives are a little brittle). 
--><!-- <param id="remote_metadata">true</param> --><!-- <param id="use_remote_datatypes">false</param> --><!-- <param id="remote_property_galaxy_home">/path/to/remote/galaxy-central</param> --> - <!-- If remote LWR server is configured to run jobs as the real user, + <!-- If remote Pulsar server is configured to run jobs as the real user, uncomment the following line to pass the current Galaxy user along. --><!-- <param id="submit_user">$__user_name__</param> --> - <!-- Various other submission parameters can be passed along to the LWR - whose use will depend on the remote LWR's configured job manager. + <!-- Various other submission parameters can be passed along to the Pulsar + whose use will depend on the remote Pulsar's configured job manager. For instance: --> - <!-- <param id="submit_native_specification">-P bignodes -R y -pe threads 8</param> --> + <!-- <param id="submit_native_specification">-P bignodes -R y -pe threads 8</param> --> + <!-- Disable parameter rewriting and rewrite generated commands + instead. This may be required if remote host is Windows machine + but probably not otherwise. + --> + <!-- <param id="rewrite_parameters">false</params> --></destination> - <destination id="amqp_lwr_dest" runner="amqp_lwr" > - <!-- url and private_token are not valid when using MQ driven LWR. The plugin above - determines which queue/manager to target and the underlying MQ server should be - used to configure security. - --> - <!-- Traditionally, the LWR client sends request to LWR - server to populate various system properties. This + <destination id="pulsar_mq_dest" runner="amqp_pulsar" > + <!-- The RESTful Pulsar client sends a request to Pulsar + to populate various system properties. This extra step can be disabled and these calculated here on client by uncommenting jobs_directory and specifying any additional remote_property_ of interest, this is not optional when using message queues. --> - <param id="jobs_directory">/path/to/remote/lwr/lwr_staging/</param> - <!-- Default the LWR send files to and pull files from Galaxy when - using message queues (in the more traditional mode Galaxy sends - files to and pull files from the LWR - this is obviously less - appropriate when using a message queue). - - The default_file_action currently requires pycurl be available - to Galaxy (presumably in its virtualenv). Making this dependency - optional is an open task. + <param id="jobs_directory">/path/to/remote/pulsar/files/staging/</param> + <!-- Otherwise MQ and Legacy pulsar destinations can be supplied + all the same destination parameters as the RESTful client documented + above (though url and private_token are ignored when using a MQ). --> - <param id="default_file_action">remote_transfer</param></destination><destination id="ssh_torque" runner="cli"><param id="shell_plugin">SecureShell</param> diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -13,6 +13,7 @@ from galaxy import model from galaxy.util.sleeper import Sleeper from galaxy.jobs import JobWrapper, TaskWrapper, JobDestination +from galaxy.jobs.mapper import JobNotReadyException log = logging.getLogger( __name__ ) @@ -263,7 +264,7 @@ try: # Check the job's dependencies, requeue if they're not done. 
# Some of these states will only happen when using the in-memory job queue - job_state = self.__check_if_ready_to_run( job ) + job_state = self.__check_job_state( job ) if job_state == JOB_WAIT: new_waiting_jobs.append( job.id ) elif job_state == JOB_INPUT_ERROR: @@ -304,7 +305,7 @@ # Done with the session self.sa_session.remove() - def __check_if_ready_to_run( self, job ): + def __check_job_state( self, job ): """ Check if a job is ready to run by verifying that each of its input datasets is ready (specifically in the OK state). If any input dataset @@ -314,62 +315,97 @@ job can be dispatched. Otherwise, return JOB_WAIT indicating that input datasets are still being prepared. """ - # If tracking in the database, job.state is guaranteed to be NEW and the inputs are guaranteed to be OK if not self.track_jobs_in_database: - if job.state == model.Job.states.DELETED: - return JOB_DELETED - elif job.state == model.Job.states.ERROR: - return JOB_ADMIN_DELETED - for dataset_assoc in job.input_datasets + job.input_library_datasets: - idata = dataset_assoc.dataset - if not idata: - continue - # don't run jobs for which the input dataset was deleted - if idata.deleted: - self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) - return JOB_INPUT_DELETED - # an error in the input data causes us to bail immediately - elif idata.state == idata.states.ERROR: - self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) ) - return JOB_INPUT_ERROR - elif idata.state == idata.states.FAILED_METADATA: - self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) ) - return JOB_INPUT_ERROR - elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): - # need to requeue - return JOB_WAIT + in_memory_not_ready_state = self.__verify_in_memory_job_inputs( job ) + if in_memory_not_ready_state: + return in_memory_not_ready_state + + # Else, if tracking in the database, job.state is guaranteed to be NEW and + # the inputs are guaranteed to be OK. + # Create the job wrapper so that the destination can be set - if job.id not in self.job_wrappers: - self.job_wrappers[job.id] = self.job_wrapper( job ) - # Cause the job_destination to be set and cached by the mapper + job_id = job.id + job_wrapper = self.job_wrappers.get( job_id, None ) + if not job_wrapper: + job_wrapper = self.job_wrapper( job ) + self.job_wrappers[ job_id ] = job_wrapper + + # If state == JOB_READY, assume job_destination also set - otherwise + # in case of various error or cancelled states do not assume + # destination has been set. + state, job_destination = self.__verify_job_ready( job, job_wrapper ) + + if state == JOB_READY: + # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration + self.increase_running_job_count(job.user_id, job_destination.id ) + return state + + def __verify_job_ready( self, job, job_wrapper ): + """ Compute job destination and verify job is ready at that + destination by checking job limits and quota. If this method + return a job state of JOB_READY - it MUST also return a job + destination. 
+ """ + job_destination = None try: - self.job_wrappers[job.id].job_destination + # Cause the job_destination to be set and cached by the mapper + job_destination = job_wrapper.job_destination + except JobNotReadyException as e: + job_state = e.job_state or JOB_WAIT + return job_state, None except Exception, e: failure_message = getattr( e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE ) if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE: log.exception( 'Failed to generate job destination' ) else: log.debug( "Intentionally failing job with message (%s)" % failure_message ) - self.job_wrappers[job.id].fail( failure_message ) - return JOB_ERROR + job_wrapper.fail( failure_message ) + return JOB_ERROR, job_destination # job is ready to run, check limits # TODO: these checks should be refactored to minimize duplication and made more modular/pluggable - state = self.__check_destination_jobs( job, self.job_wrappers[job.id] ) + state = self.__check_destination_jobs( job, job_wrapper ) if state == JOB_READY: - state = self.__check_user_jobs( job, self.job_wrappers[job.id] ) + state = self.__check_user_jobs( job, job_wrapper ) if state == JOB_READY and self.app.config.enable_quotas: quota = self.app.quota_agent.get_quota( job.user ) if quota is not None: try: usage = self.app.quota_agent.get_usage( user=job.user, history=job.history ) if usage > quota: - return JOB_USER_OVER_QUOTA + return JOB_USER_OVER_QUOTA, job_destination except AssertionError, e: pass # No history, should not happen with an anon user - if state == JOB_READY: - # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration - self.increase_running_job_count(job.user_id, self.job_wrappers[job.id].job_destination.id) - return state + return state, job_destination + + def __verify_in_memory_job_inputs( self, job ): + """ Perform the same checks that happen via SQL for in-memory managed + jobs. + """ + if job.state == model.Job.states.DELETED: + return JOB_DELETED + elif job.state == model.Job.states.ERROR: + return JOB_ADMIN_DELETED + for dataset_assoc in job.input_datasets + job.input_library_datasets: + idata = dataset_assoc.dataset + if not idata: + continue + # don't run jobs for which the input dataset was deleted + if idata.deleted: + self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) ) + return JOB_INPUT_DELETED + # an error in the input data causes us to bail immediately + elif idata.state == idata.states.ERROR: + self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s is in error state" % ( idata.hid ) ) + return JOB_INPUT_ERROR + elif idata.state == idata.states.FAILED_METADATA: + self.job_wrappers.pop(job.id, self.job_wrapper( job )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) ) + return JOB_INPUT_ERROR + elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ): + # need to requeue + return JOB_WAIT + + # All inputs ready to go. 
+ return None def __clear_job_count( self ): self.user_job_count = None diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/jobs/mapper.py --- a/lib/galaxy/jobs/mapper.py +++ b/lib/galaxy/jobs/mapper.py @@ -16,6 +16,13 @@ self.failure_message = failure_message +class JobNotReadyException( Exception ): + + def __init__( self, job_state=None, message=None ): + self.job_state = job_state + self.message = message + + class JobRunnerMapper( object ): """ This class is responsible to managing the mapping of jobs diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/jobs/runners/pulsar.py --- /dev/null +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -0,0 +1,707 @@ +from __future__ import absolute_import # Need to import pulsar_client absolutely. + +import logging + +from galaxy import model +from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner +from galaxy.jobs import ComputeEnvironment +from galaxy.jobs import JobDestination +from galaxy.jobs.command_factory import build_command +from galaxy.tools.deps import dependencies +from galaxy.util import string_as_bool_or_none +from galaxy.util.bunch import Bunch +from galaxy.util import specs + +import errno +from time import sleep +import os + +from pulsar.client import build_client_manager +from pulsar.client import url_to_destination_params +from pulsar.client import finish_job as pulsar_finish_job +from pulsar.client import submit_job as pulsar_submit_job +from pulsar.client import ClientJobDescription +from pulsar.client import PulsarOutputs +from pulsar.client import ClientOutputs +from pulsar.client import PathMapper + +log = logging.getLogger( __name__ ) + +__all__ = [ 'PulsarLegacyJobRunner', 'PulsarRESTJobRunner', 'PulsarMQJobRunner' ] + +NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "Pulsar misconfiguration - Pulsar client configured to set metadata remotely, but remote Pulsar isn't properly configured with a galaxy_home directory." +NO_REMOTE_DATATYPES_CONFIG = "Pulsar client is configured to use remote datatypes configuration when setting metadata externally, but Pulsar is not configured with this information. Defaulting to datatypes_conf.xml." +GENERIC_REMOTE_ERROR = "Failed to communicate with remote job server." + +# Is there a good way to infer some default for this? Can only use +# url_for from web threads. https://gist.github.com/jmchilton/9098762 +DEFAULT_GALAXY_URL = "http://localhost:8080" + +PULSAR_PARAM_SPECS = dict( + transport=dict( + map=specs.to_str_or_none, + valid=specs.is_in("urllib", "curl", None), + default=None + ), + cache=dict( + map=specs.to_bool_or_none, + default=None, + ), + amqp_url=dict( + map=specs.to_str_or_none, + default=None, + ), + galaxy_url=dict( + map=specs.to_str_or_none, + default=DEFAULT_GALAXY_URL, + ), + manager=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_consumer_timeout=dict( + map=lambda val: None if val == "None" else float(val), + default=None, + ), + amqp_connect_ssl_ca_certs=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_keyfile=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_certfile=dict( + map=specs.to_str_or_none, + default=None, + ), + amqp_connect_ssl_cert_reqs=dict( + map=specs.to_str_or_none, + default=None, + ), + # http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Producer.p... 
+ amqp_publish_retry=dict( + map=specs.to_bool, + default=False, + ), + amqp_publish_priority=dict( + map=int, + valid=lambda x: 0 <= x and x <= 9, + default=0, + ), + # http://kombu.readthedocs.org/en/latest/reference/kombu.html#kombu.Exchange.d... + amqp_publish_delivery_mode=dict( + map=str, + valid=specs.is_in("transient", "persistent"), + default="persistent", + ), + amqp_publish_retry_max_retries=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_start=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_step=dict( + map=int, + default=None, + ), + amqp_publish_retry_interval_max=dict( + map=int, + default=None, + ), +) + + +PARAMETER_SPECIFICATION_REQUIRED = object() +PARAMETER_SPECIFICATION_IGNORED = object() + + +class PulsarJobRunner( AsynchronousJobRunner ): + """ + Pulsar Job Runner + """ + runner_name = "PulsarJobRunner" + + def __init__( self, app, nworkers, **kwds ): + """Start the job runner """ + super( PulsarJobRunner, self ).__init__( app, nworkers, runner_param_specs=PULSAR_PARAM_SPECS, **kwds ) + self._init_worker_threads() + galaxy_url = self.runner_params.galaxy_url + if galaxy_url: + galaxy_url = galaxy_url.rstrip("/") + self.galaxy_url = galaxy_url + self.__init_client_manager() + self._monitor() + + def _monitor( self ): + # Extension point allow MQ variant to setup callback instead + self._init_monitor_thread() + + def __init_client_manager( self ): + client_manager_kwargs = {} + for kwd in 'manager', 'cache', 'transport': + client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] + for kwd in self.runner_params.keys(): + if kwd.startswith( 'amqp_' ): + client_manager_kwargs[ kwd ] = self.runner_params[ kwd ] + self.client_manager = build_client_manager(**client_manager_kwargs) + + def url_to_destination( self, url ): + """Convert a legacy URL to a job destination""" + return JobDestination( runner="pulsar", params=url_to_destination_params( url ) ) + + def check_watched_item(self, job_state): + try: + client = self.get_client_from_state(job_state) + status = client.get_status() + except Exception: + # An orphaned job was put into the queue at app startup, so remote server went down + # either way we are done I guess. 
+ self.mark_as_finished(job_state) + return None + job_state = self._update_job_state_for_status(job_state, status) + return job_state + + def _update_job_state_for_status(self, job_state, pulsar_status): + if pulsar_status == "complete": + self.mark_as_finished(job_state) + return None + if pulsar_status == "failed": + self.fail_job(job_state) + return None + if pulsar_status == "running" and not job_state.running: + job_state.running = True + job_state.job_wrapper.change_state( model.Job.states.RUNNING ) + return job_state + + def queue_job(self, job_wrapper): + job_destination = job_wrapper.job_destination + self._populate_parameter_defaults( job_destination ) + + command_line, client, remote_job_config, compute_environment = self.__prepare_job( job_wrapper, job_destination ) + + if not command_line: + return + + try: + dependencies_description = PulsarJobRunner.__dependencies_description( client, job_wrapper ) + rewrite_paths = not PulsarJobRunner.__rewrite_parameters( client ) + unstructured_path_rewrites = {} + if compute_environment: + unstructured_path_rewrites = compute_environment.unstructured_path_rewrites + + client_job_description = ClientJobDescription( + command_line=command_line, + input_files=self.get_input_files(job_wrapper), + client_outputs=self.__client_outputs(client, job_wrapper), + working_directory=job_wrapper.working_directory, + tool=job_wrapper.tool, + config_files=job_wrapper.extra_filenames, + dependencies_description=dependencies_description, + env=client.env, + rewrite_paths=rewrite_paths, + arbitrary_files=unstructured_path_rewrites, + ) + job_id = pulsar_submit_job(client, client_job_description, remote_job_config) + log.info("Pulsar job submitted with job_id %s" % job_id) + job_wrapper.set_job_destination( job_destination, job_id ) + job_wrapper.change_state( model.Job.states.QUEUED ) + except Exception: + job_wrapper.fail( "failure running job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + return + + pulsar_job_state = AsynchronousJobState() + pulsar_job_state.job_wrapper = job_wrapper + pulsar_job_state.job_id = job_id + pulsar_job_state.old_state = True + pulsar_job_state.running = False + pulsar_job_state.job_destination = job_destination + self.monitor_job(pulsar_job_state) + + def __prepare_job(self, job_wrapper, job_destination): + """ Build command-line and Pulsar client for this job. 
""" + command_line = None + client = None + remote_job_config = None + compute_environment = None + try: + client = self.get_client_from_wrapper(job_wrapper) + tool = job_wrapper.tool + remote_job_config = client.setup(tool.id, tool.version) + rewrite_parameters = PulsarJobRunner.__rewrite_parameters( client ) + prepare_kwds = {} + if rewrite_parameters: + compute_environment = PulsarComputeEnvironment( client, job_wrapper, remote_job_config ) + prepare_kwds[ 'compute_environment' ] = compute_environment + job_wrapper.prepare( **prepare_kwds ) + self.__prepare_input_files_locally(job_wrapper) + remote_metadata = PulsarJobRunner.__remote_metadata( client ) + dependency_resolution = PulsarJobRunner.__dependency_resolution( client ) + metadata_kwds = self.__build_metadata_configuration(client, job_wrapper, remote_metadata, remote_job_config) + remote_command_params = dict( + working_directory=remote_job_config['working_directory'], + metadata_kwds=metadata_kwds, + dependency_resolution=dependency_resolution, + ) + remote_working_directory = remote_job_config['working_directory'] + # TODO: Following defs work for Pulsar, always worked for Pulsar but should be + # calculated at some other level. + remote_job_directory = os.path.abspath(os.path.join(remote_working_directory, os.path.pardir)) + remote_tool_directory = os.path.abspath(os.path.join(remote_job_directory, "tool_files")) + container = self._find_container( + job_wrapper, + compute_working_directory=remote_working_directory, + compute_tool_directory=remote_tool_directory, + compute_job_directory=remote_job_directory, + ) + command_line = build_command( + self, + job_wrapper=job_wrapper, + container=container, + include_metadata=remote_metadata, + include_work_dir_outputs=False, + remote_command_params=remote_command_params, + ) + except Exception: + job_wrapper.fail( "failure preparing job", exception=True ) + log.exception("failure running job %d" % job_wrapper.job_id) + + # If we were able to get a command line, run the job + if not command_line: + job_wrapper.finish( '', '' ) + + return command_line, client, remote_job_config, compute_environment + + def __prepare_input_files_locally(self, job_wrapper): + """Run task splitting commands locally.""" + prepare_input_files_cmds = getattr(job_wrapper, 'prepare_input_files_cmds', None) + if prepare_input_files_cmds is not None: + for cmd in prepare_input_files_cmds: # run the commands to stage the input files + if 0 != os.system(cmd): + raise Exception('Error running file staging command: %s' % cmd) + job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line + + def _populate_parameter_defaults( self, job_destination ): + updated = False + params = job_destination.params + for key, value in self.destination_defaults.iteritems(): + if key in params: + if value is PARAMETER_SPECIFICATION_IGNORED: + log.warn( "Pulsar runner in selected configuration ignores parameter %s" % key ) + continue + #if self.runner_params.get( key, None ): + # # Let plugin define defaults for some parameters - + # # for instance that way jobs_directory can be + # # configured next to AMQP url (where it belongs). 
+ # params[ key ] = self.runner_params[ key ] + # continue + + if not value: + continue + + if value is PARAMETER_SPECIFICATION_REQUIRED: + raise Exception( "Pulsar destination does not define required parameter %s" % key ) + elif value is not PARAMETER_SPECIFICATION_IGNORED: + params[ key ] = value + updated = True + return updated + + def get_output_files(self, job_wrapper): + output_paths = job_wrapper.get_output_fnames() + return [ str( o ) for o in output_paths ] # Force job_path from DatasetPath objects. + + def get_input_files(self, job_wrapper): + input_paths = job_wrapper.get_input_paths() + return [ str( i ) for i in input_paths ] # Force job_path from DatasetPath objects. + + def get_client_from_wrapper(self, job_wrapper): + job_id = job_wrapper.job_id + if hasattr(job_wrapper, 'task_id'): + job_id = "%s_%s" % (job_id, job_wrapper.task_id) + params = job_wrapper.job_destination.params.copy() + for key, value in params.iteritems(): + if value: + params[key] = model.User.expand_user_properties( job_wrapper.get_job().user, value ) + + env = getattr( job_wrapper.job_destination, "env", [] ) + return self.get_client( params, job_id, env ) + + def get_client_from_state(self, job_state): + job_destination_params = job_state.job_destination.params + job_id = job_state.job_id + return self.get_client( job_destination_params, job_id ) + + def get_client( self, job_destination_params, job_id, env=[] ): + # Cannot use url_for outside of web thread. + #files_endpoint = url_for( controller="job_files", job_id=encoded_job_id ) + + encoded_job_id = self.app.security.encode_id(job_id) + job_key = self.app.security.encode_id( job_id, kind="jobs_files" ) + files_endpoint = "%s/api/jobs/%s/files?job_key=%s" % ( + self.galaxy_url, + encoded_job_id, + job_key + ) + get_client_kwds = dict( + job_id=str( job_id ), + files_endpoint=files_endpoint, + env=env + ) + return self.client_manager.get_client( job_destination_params, **get_client_kwds ) + + def finish_job( self, job_state ): + stderr = stdout = '' + job_wrapper = job_state.job_wrapper + try: + client = self.get_client_from_state(job_state) + run_results = client.full_status() + remote_working_directory = run_results.get("working_directory", None) + stdout = run_results.get('stdout', '') + stderr = run_results.get('stderr', '') + exit_code = run_results.get('returncode', None) + pulsar_outputs = PulsarOutputs.from_status_response(run_results) + # Use Pulsar client code to transfer/copy files back + # and cleanup job if needed. 
+ completed_normally = \ + job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ] + cleanup_job = self.app.config.cleanup_job + client_outputs = self.__client_outputs(client, job_wrapper) + finish_args = dict( client=client, + job_completed_normally=completed_normally, + cleanup_job=cleanup_job, + client_outputs=client_outputs, + pulsar_outputs=pulsar_outputs ) + failed = pulsar_finish_job( **finish_args ) + if failed: + job_wrapper.fail("Failed to find or download one or more job outputs from remote server.", exception=True) + except Exception: + message = GENERIC_REMOTE_ERROR + job_wrapper.fail( message, exception=True ) + log.exception("failure finishing job %d" % job_wrapper.job_id) + return + if not PulsarJobRunner.__remote_metadata( client ): + self._handle_metadata_externally( job_wrapper, resolve_requirements=True ) + # Finish the job + try: + job_wrapper.finish( + stdout, + stderr, + exit_code, + remote_working_directory=remote_working_directory + ) + except Exception: + log.exception("Job wrapper finish method failed") + job_wrapper.fail("Unable to finish job", exception=True) + + def fail_job( self, job_state ): + """ + Seperated out so we can use the worker threads for it. + """ + self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) ) + job_state.job_wrapper.fail( getattr( job_state, "fail_message", GENERIC_REMOTE_ERROR ) ) + + def check_pid( self, pid ): + try: + os.kill( pid, 0 ) + return True + except OSError, e: + if e.errno == errno.ESRCH: + log.debug( "check_pid(): PID %d is dead" % pid ) + else: + log.warning( "check_pid(): Got errno %s when attempting to check PID %d: %s" % ( errno.errorcode[e.errno], pid, e.strerror ) ) + return False + + def stop_job( self, job ): + #if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished + job_ext_output_metadata = job.get_external_output_metadata() + if job_ext_output_metadata: + pid = job_ext_output_metadata[0].job_runner_external_pid # every JobExternalOutputMetadata has a pid set, we just need to take from one of them + if pid in [ None, '' ]: + log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id ) + return + pid = int( pid ) + if not self.check_pid( pid ): + log.warning( "stop_job(): %s: PID %d was already dead or can't be signaled" % ( job.id, pid ) ) + return + for sig in [ 15, 9 ]: + try: + os.killpg( pid, sig ) + except OSError, e: + log.warning( "stop_job(): %s: Got errno %s when attempting to signal %d to PID %d: %s" % ( job.id, errno.errorcode[e.errno], sig, pid, e.strerror ) ) + return # give up + sleep( 2 ) + if not self.check_pid( pid ): + log.debug( "stop_job(): %s: PID %d successfully killed with signal %d" % ( job.id, pid, sig ) ) + return + else: + log.warning( "stop_job(): %s: PID %d refuses to die after signaling TERM/KILL" % ( job.id, pid ) ) + else: + # Remote kill + pulsar_url = job.job_runner_name + job_id = job.job_runner_external_id + log.debug("Attempt remote Pulsar kill of job with url %s and id %s" % (pulsar_url, job_id)) + client = self.get_client(job.destination_params, job_id) + client.kill() + + def recover( self, job, job_wrapper ): + """Recovers jobs stuck in the queued/running state when Galaxy started""" + job_state = self._job_state( job, job_wrapper ) + job_wrapper.command_line = job.get_command_line() + state = job.get_state() + if state in [model.Job.states.RUNNING, model.Job.states.QUEUED]: + log.debug( "(Pulsar/%s) is still in running 
state, adding to the Pulsar queue" % ( job.get_id()) ) + job_state.old_state = True + job_state.running = state == model.Job.states.RUNNING + self.monitor_queue.put( job_state ) + + def shutdown( self ): + super( PulsarJobRunner, self ).shutdown() + self.client_manager.shutdown() + + def _job_state( self, job, job_wrapper ): + job_state = AsynchronousJobState() + # TODO: Determine why this is set when using normal message queue updates + # but not CLI submitted MQ updates... + raw_job_id = job.get_job_runner_external_id() or job_wrapper.job_id + job_state.job_id = str( raw_job_id ) + job_state.runner_url = job_wrapper.get_job_runner_url() + job_state.job_destination = job_wrapper.job_destination + job_state.job_wrapper = job_wrapper + return job_state + + def __client_outputs( self, client, job_wrapper ): + work_dir_outputs = self.get_work_dir_outputs( job_wrapper ) + output_files = self.get_output_files( job_wrapper ) + client_outputs = ClientOutputs( + working_directory=job_wrapper.working_directory, + work_dir_outputs=work_dir_outputs, + output_files=output_files, + version_file=job_wrapper.get_version_string_path(), + ) + return client_outputs + + @staticmethod + def __dependencies_description( pulsar_client, job_wrapper ): + dependency_resolution = PulsarJobRunner.__dependency_resolution( pulsar_client ) + remote_dependency_resolution = dependency_resolution == "remote" + if not remote_dependency_resolution: + return None + requirements = job_wrapper.tool.requirements or [] + installed_tool_dependencies = job_wrapper.tool.installed_tool_dependencies or [] + return dependencies.DependenciesDescription( + requirements=requirements, + installed_tool_dependencies=installed_tool_dependencies, + ) + + @staticmethod + def __dependency_resolution( pulsar_client ): + dependency_resolution = pulsar_client.destination_params.get( "dependency_resolution", "local" ) + if dependency_resolution not in ["none", "local", "remote"]: + raise Exception("Unknown dependency_resolution value encountered %s" % dependency_resolution) + return dependency_resolution + + @staticmethod + def __remote_metadata( pulsar_client ): + remote_metadata = string_as_bool_or_none( pulsar_client.destination_params.get( "remote_metadata", False ) ) + return remote_metadata + + @staticmethod + def __use_remote_datatypes_conf( pulsar_client ): + """ When setting remote metadata, use integrated datatypes from this + Galaxy instance or use the datatypes config configured via the remote + Pulsar. + + Both options are broken in different ways for same reason - datatypes + may not match. One can push the local datatypes config to the remote + server - but there is no guarentee these datatypes will be defined + there. Alternatively, one can use the remote datatype config - but + there is no guarentee that it will contain all the datatypes available + to this Galaxy. 
+ """ + use_remote_datatypes = string_as_bool_or_none( pulsar_client.destination_params.get( "use_remote_datatypes", False ) ) + return use_remote_datatypes + + @staticmethod + def __rewrite_parameters( pulsar_client ): + return string_as_bool_or_none( pulsar_client.destination_params.get( "rewrite_parameters", False ) ) or False + + def __build_metadata_configuration(self, client, job_wrapper, remote_metadata, remote_job_config): + metadata_kwds = {} + if remote_metadata: + remote_system_properties = remote_job_config.get("system_properties", {}) + remote_galaxy_home = remote_system_properties.get("galaxy_home", None) + if not remote_galaxy_home: + raise Exception(NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE) + metadata_kwds['exec_dir'] = remote_galaxy_home + outputs_directory = remote_job_config['outputs_directory'] + configs_directory = remote_job_config['configs_directory'] + working_directory = remote_job_config['working_directory'] + # For metadata calculation, we need to build a list of of output + # file objects with real path indicating location on Galaxy server + # and false path indicating location on compute server. Since the + # Pulsar disables from_work_dir copying as part of the job command + # line we need to take the list of output locations on the Pulsar + # server (produced by self.get_output_files(job_wrapper)) and for + # each work_dir output substitute the effective path on the Pulsar + # server relative to the remote working directory as the + # false_path to send the metadata command generation module. + work_dir_outputs = self.get_work_dir_outputs(job_wrapper, job_working_directory=working_directory) + outputs = [Bunch(false_path=os.path.join(outputs_directory, os.path.basename(path)), real_path=path) for path in self.get_output_files(job_wrapper)] + for output in outputs: + for pulsar_workdir_path, real_path in work_dir_outputs: + if real_path == output.real_path: + output.false_path = pulsar_workdir_path + metadata_kwds['output_fnames'] = outputs + metadata_kwds['compute_tmp_dir'] = working_directory + metadata_kwds['config_root'] = remote_galaxy_home + default_config_file = os.path.join(remote_galaxy_home, 'universe_wsgi.ini') + metadata_kwds['config_file'] = remote_system_properties.get('galaxy_config_file', default_config_file) + metadata_kwds['dataset_files_path'] = remote_system_properties.get('galaxy_dataset_files_path', None) + if PulsarJobRunner.__use_remote_datatypes_conf( client ): + remote_datatypes_config = remote_system_properties.get('galaxy_datatypes_config_file', None) + if not remote_datatypes_config: + log.warn(NO_REMOTE_DATATYPES_CONFIG) + remote_datatypes_config = os.path.join(remote_galaxy_home, 'datatypes_conf.xml') + metadata_kwds['datatypes_config'] = remote_datatypes_config + else: + integrates_datatypes_config = self.app.datatypes_registry.integrated_datatypes_configs + # Ensure this file gets pushed out to the remote config dir. 
+ job_wrapper.extra_filenames.append(integrates_datatypes_config) + + metadata_kwds['datatypes_config'] = os.path.join(configs_directory, os.path.basename(integrates_datatypes_config)) + return metadata_kwds + + +class PulsarLegacyJobRunner( PulsarJobRunner ): + destination_defaults = dict( + rewrite_parameters="false", + dependency_resolution="local", + ) + + +class PulsarMQJobRunner( PulsarJobRunner ): + destination_defaults = dict( + default_file_action="remote_transfer", + rewrite_parameters="true", + dependency_resolution="remote", + jobs_directory=PARAMETER_SPECIFICATION_REQUIRED, + url=PARAMETER_SPECIFICATION_IGNORED, + private_token=PARAMETER_SPECIFICATION_IGNORED + ) + + def _monitor( self ): + # This is a message queue driven runner, don't monitor + # just setup required callback. + self.client_manager.ensure_has_status_update_callback(self.__async_update) + + def __async_update( self, full_status ): + job_id = None + try: + job_id = full_status[ "job_id" ] + job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id( job_id ) + job_state = self._job_state( job, job_wrapper ) + self._update_job_state_for_status(job_state, full_status[ "status" ] ) + except Exception: + log.exception( "Failed to update Pulsar job status for job_id %s" % job_id ) + raise + # Nothing else to do? - Attempt to fail the job? + + +class PulsarRESTJobRunner( PulsarJobRunner ): + destination_defaults = dict( + default_file_action="transfer", + rewrite_parameters="true", + dependency_resolution="remote", + url=PARAMETER_SPECIFICATION_REQUIRED, + ) + + +class PulsarComputeEnvironment( ComputeEnvironment ): + + def __init__( self, pulsar_client, job_wrapper, remote_job_config ): + self.pulsar_client = pulsar_client + self.job_wrapper = job_wrapper + self.local_path_config = job_wrapper.default_compute_environment() + self.unstructured_path_rewrites = {} + # job_wrapper.prepare is going to expunge the job backing the following + # computations, so precalculate these paths. + self._wrapper_input_paths = self.local_path_config.input_paths() + self._wrapper_output_paths = self.local_path_config.output_paths() + self.path_mapper = PathMapper(pulsar_client, remote_job_config, self.local_path_config.working_directory()) + self._config_directory = remote_job_config[ "configs_directory" ] + self._working_directory = remote_job_config[ "working_directory" ] + self._sep = remote_job_config[ "system_properties" ][ "separator" ] + self._tool_dir = remote_job_config[ "tools_directory" ] + version_path = self.local_path_config.version_path() + new_version_path = self.path_mapper.remote_version_path_rewrite(version_path) + if new_version_path: + version_path = new_version_path + self._version_path = version_path + + def output_paths( self ): + local_output_paths = self._wrapper_output_paths + + results = [] + for local_output_path in local_output_paths: + wrapper_path = str( local_output_path ) + remote_path = self.path_mapper.remote_output_path_rewrite( wrapper_path ) + results.append( self._dataset_path( local_output_path, remote_path ) ) + return results + + def input_paths( self ): + local_input_paths = self._wrapper_input_paths + + results = [] + for local_input_path in local_input_paths: + wrapper_path = str( local_input_path ) + # This will over-copy in some cases. For instance in the case of task + # splitting, this input will be copied even though only the work dir + # input will actually be used. 
+ remote_path = self.path_mapper.remote_input_path_rewrite( wrapper_path ) + results.append( self._dataset_path( local_input_path, remote_path ) ) + return results + + def _dataset_path( self, local_dataset_path, remote_path ): + remote_extra_files_path = None + if remote_path: + remote_extra_files_path = "%s_files" % remote_path[ 0:-len( ".dat" ) ] + return local_dataset_path.with_path_for_job( remote_path, remote_extra_files_path ) + + def working_directory( self ): + return self._working_directory + + def config_directory( self ): + return self._config_directory + + def new_file_path( self ): + return self.working_directory() # Problems with doing this? + + def sep( self ): + return self._sep + + def version_path( self ): + return self._version_path + + def rewriter( self, parameter_value ): + unstructured_path_rewrites = self.unstructured_path_rewrites + if parameter_value in unstructured_path_rewrites: + # Path previously mapped, use previous mapping. + return unstructured_path_rewrites[ parameter_value ] + if parameter_value in unstructured_path_rewrites.itervalues(): + # Path is a rewritten remote path (this might never occur, + # consider dropping check...) + return parameter_value + + rewrite, new_unstructured_path_rewrites = self.path_mapper.check_for_arbitrary_rewrite( parameter_value ) + if rewrite: + unstructured_path_rewrites.update(new_unstructured_path_rewrites) + return rewrite + else: + # Did need to rewrite, use original path or value. + return parameter_value + + def unstructured_path_rewriter( self ): + return self.rewriter diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/jobs/runners/util/__init__.py --- a/lib/galaxy/jobs/runners/util/__init__.py +++ b/lib/galaxy/jobs/runners/util/__init__.py @@ -1,7 +1,7 @@ """ This module and its submodules contains utilities for running external processes and interfacing with job managers. This module should contain -functionality shared between Galaxy and the LWR. +functionality shared between Galaxy and the Pulsar. """ from galaxy.util.bunch import Bunch diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/jobs/runners/util/cli/factory.py --- a/lib/galaxy/jobs/runners/util/cli/factory.py +++ b/lib/galaxy/jobs/runners/util/cli/factory.py @@ -5,7 +5,7 @@ ) code_dir = 'lib' except ImportError: - from lwr.managers.util.cli import ( + from pulsar.managers.util.cli import ( CliInterface, split_params ) diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/jobs/runners/util/cli/job/slurm.py --- a/lib/galaxy/jobs/runners/util/cli/job/slurm.py +++ b/lib/galaxy/jobs/runners/util/cli/job/slurm.py @@ -5,7 +5,7 @@ from galaxy.model import Job job_states = Job.states except ImportError: - # Not in Galaxy, map Galaxy job states to LWR ones. + # Not in Galaxy, map Galaxy job states to Pulsar ones. from galaxy.util import enum job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/jobs/runners/util/cli/job/torque.py --- a/lib/galaxy/jobs/runners/util/cli/job/torque.py +++ b/lib/galaxy/jobs/runners/util/cli/job/torque.py @@ -7,7 +7,7 @@ from galaxy.model import Job job_states = Job.states except ImportError: - # Not in Galaxy, map Galaxy job states to LWR ones. + # Not in Galaxy, map Galaxy job states to Pulsar ones. 
from galaxy.util import enum job_states = enum(RUNNING='running', OK='complete', QUEUED='queued') diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/objectstore/__init__.py --- a/lib/galaxy/objectstore/__init__.py +++ b/lib/galaxy/objectstore/__init__.py @@ -623,9 +623,9 @@ elif store == 'irods': from .rods import IRODSObjectStore return IRODSObjectStore(config=config, config_xml=config_xml) - elif store == 'lwr': - from .lwr import LwrObjectStore - return LwrObjectStore(config=config, config_xml=config_xml) + elif store == 'pulsar': + from .pulsar import PulsarObjectStore + return PulsarObjectStore(config=config, config_xml=config_xml) else: log.error("Unrecognized object store definition: {0}".format(store)) diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/objectstore/lwr.py --- a/lib/galaxy/objectstore/lwr.py +++ /dev/null @@ -1,76 +0,0 @@ -from __future__ import absolute_import # Need to import lwr_client absolutely. -from ..objectstore import ObjectStore -try: - from galaxy.jobs.runners.lwr_client.manager import ObjectStoreClientManager -except ImportError: - from lwr.lwr_client.manager import ObjectStoreClientManager - - -class LwrObjectStore(ObjectStore): - """ - Object store implementation that delegates to a remote LWR server. - - This may be more aspirational than practical for now, it would be good to - Galaxy to a point that a handler thread could be setup that doesn't attempt - to access the disk files returned by a (this) object store - just passing - them along to the LWR unmodified. That modification - along with this - implementation and LWR job destinations would then allow Galaxy to fully - manage jobs on remote servers with completely different mount points. - - This implementation should be considered beta and may be dropped from - Galaxy at some future point or significantly modified. - """ - - def __init__(self, config, config_xml): - self.lwr_client = self.__build_lwr_client(config_xml) - - def exists(self, obj, **kwds): - return self.lwr_client.exists(**self.__build_kwds(obj, **kwds)) - - def file_ready(self, obj, **kwds): - return self.lwr_client.file_ready(**self.__build_kwds(obj, **kwds)) - - def create(self, obj, **kwds): - return self.lwr_client.create(**self.__build_kwds(obj, **kwds)) - - def empty(self, obj, **kwds): - return self.lwr_client.empty(**self.__build_kwds(obj, **kwds)) - - def size(self, obj, **kwds): - return self.lwr_client.size(**self.__build_kwds(obj, **kwds)) - - def delete(self, obj, **kwds): - return self.lwr_client.delete(**self.__build_kwds(obj, **kwds)) - - # TODO: Optimize get_data. 
- def get_data(self, obj, **kwds): - return self.lwr_client.get_data(**self.__build_kwds(obj, **kwds)) - - def get_filename(self, obj, **kwds): - return self.lwr_client.get_filename(**self.__build_kwds(obj, **kwds)) - - def update_from_file(self, obj, **kwds): - return self.lwr_client.update_from_file(**self.__build_kwds(obj, **kwds)) - - def get_store_usage_percent(self): - return self.lwr_client.get_store_usage_percent() - - def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None): - return None - - def __build_kwds(self, obj, **kwds): - kwds['object_id'] = obj.id - return kwds - pass - - def __build_lwr_client(self, config_xml): - url = config_xml.get("url") - private_token = config_xml.get("private_token", None) - transport = config_xml.get("transport", None) - manager_options = dict(transport=transport) - client_options = dict(url=url, private_token=private_token) - lwr_client = ObjectStoreClientManager(**manager_options).get_client(client_options) - return lwr_client - - def shutdown(self): - pass diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/objectstore/pulsar.py --- /dev/null +++ b/lib/galaxy/objectstore/pulsar.py @@ -0,0 +1,73 @@ +from __future__ import absolute_import # Need to import pulsar_client absolutely. +from ..objectstore import ObjectStore +from pulsar.client.manager import ObjectStoreClientManager + + +class PulsarObjectStore(ObjectStore): + """ + Object store implementation that delegates to a remote Pulsar server. + + This may be more aspirational than practical for now, it would be good to + Galaxy to a point that a handler thread could be setup that doesn't attempt + to access the disk files returned by a (this) object store - just passing + them along to the Pulsar unmodified. That modification - along with this + implementation and Pulsar job destinations would then allow Galaxy to fully + manage jobs on remote servers with completely different mount points. + + This implementation should be considered beta and may be dropped from + Galaxy at some future point or significantly modified. + """ + + def __init__(self, config, config_xml): + self.pulsar_client = self.__build_pulsar_client(config_xml) + + def exists(self, obj, **kwds): + return self.pulsar_client.exists(**self.__build_kwds(obj, **kwds)) + + def file_ready(self, obj, **kwds): + return self.pulsar_client.file_ready(**self.__build_kwds(obj, **kwds)) + + def create(self, obj, **kwds): + return self.pulsar_client.create(**self.__build_kwds(obj, **kwds)) + + def empty(self, obj, **kwds): + return self.pulsar_client.empty(**self.__build_kwds(obj, **kwds)) + + def size(self, obj, **kwds): + return self.pulsar_client.size(**self.__build_kwds(obj, **kwds)) + + def delete(self, obj, **kwds): + return self.pulsar_client.delete(**self.__build_kwds(obj, **kwds)) + + # TODO: Optimize get_data. 
+ def get_data(self, obj, **kwds): + return self.pulsar_client.get_data(**self.__build_kwds(obj, **kwds)) + + def get_filename(self, obj, **kwds): + return self.pulsar_client.get_filename(**self.__build_kwds(obj, **kwds)) + + def update_from_file(self, obj, **kwds): + return self.pulsar_client.update_from_file(**self.__build_kwds(obj, **kwds)) + + def get_store_usage_percent(self): + return self.pulsar_client.get_store_usage_percent() + + def get_object_url(self, obj, extra_dir=None, extra_dir_at_root=False, alt_name=None): + return None + + def __build_kwds(self, obj, **kwds): + kwds['object_id'] = obj.id + return kwds + pass + + def __build_pulsar_client(self, config_xml): + url = config_xml.get("url") + private_token = config_xml.get("private_token", None) + transport = config_xml.get("transport", None) + manager_options = dict(transport=transport) + client_options = dict(url=url, private_token=private_token) + pulsar_client = ObjectStoreClientManager(**manager_options).get_client(client_options) + return pulsar_client + + def shutdown(self): + pass diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/tools/__init__.py --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -1264,7 +1264,7 @@ executable = self.version_string_cmd.split()[0] abs_executable = os.path.abspath(os.path.join(self.tool_dir, executable)) command_line = self.version_string_cmd.replace(executable, abs_executable, 1) - self.version_string_cmd = self.interpreter + " " + command_line + self.version_string_cmd = version_cmd_interpreter + " " + command_line # Parallelism for tasks, read from tool config. parallelism = root.find("parallelism") if parallelism is not None and parallelism.get("method"): diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/tools/deps/dependencies.py --- a/lib/galaxy/tools/deps/dependencies.py +++ b/lib/galaxy/tools/deps/dependencies.py @@ -8,7 +8,7 @@ related context required to resolve dependencies via the ToolShedPackageDependencyResolver. - This is meant to enable remote resolution of dependencies, by the LWR or + This is meant to enable remote resolution of dependencies, by the Pulsar or other potential remote execution mechanisms. """ @@ -39,7 +39,7 @@ @staticmethod def _toolshed_install_dependency_from_dict(as_dict): - # Rather than requiring full models in LWR, just use simple objects + # Rather than requiring full models in Pulsar, just use simple objects # containing only properties and associations used to resolve # dependencies for tool execution. repository_object = bunch.Bunch( diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/util/__init__.py --- a/lib/galaxy/util/__init__.py +++ b/lib/galaxy/util/__init__.py @@ -417,17 +417,16 @@ return slug_base -def in_directory( file, directory ): +def in_directory( file, directory, local_path_module=os.path ): """ Return true, if the common prefix of both is equal to directory e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b """ # Make both absolute. 
- directory = os.path.abspath( directory ) - file = os.path.abspath( file ) - - return os.path.commonprefix( [ file, directory ] ) == directory + directory = local_path_module.abspath(directory) + file = local_path_module.abspath(file) + return local_path_module.commonprefix([file, directory]) == directory def merge_sorted_iterables( operator, *iterables ): diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/webapps/galaxy/controllers/admin_toolshed.py --- a/lib/galaxy/webapps/galaxy/controllers/admin_toolshed.py +++ b/lib/galaxy/webapps/galaxy/controllers/admin_toolshed.py @@ -1359,9 +1359,8 @@ else: # Entering this else block occurs only if the tool_shed_repository does not include any valid tools. if install_repository_dependencies: - repository_dependencies = \ - repository_dependency_util.get_repository_dependencies_for_installed_tool_shed_repository( trans.app, - tool_shed_repository ) + repository_dependencies = rdm.get_repository_dependencies_for_installed_tool_shed_repository( trans.app, + tool_shed_repository ) else: repository_dependencies = None if metadata: @@ -1579,9 +1578,9 @@ raw_text = common_util.tool_shed_get( trans.app, tool_shed_url, url ) readme_files_dict = json.from_json_string( raw_text ) tool_dependencies = metadata.get( 'tool_dependencies', None ) - repository_dependencies = \ - repository_dependency_util.get_repository_dependencies_for_installed_tool_shed_repository( trans.app, - tool_shed_repository ) + rdm = RepositoryDependencyManager( trans.app ) + repository_dependencies = rdm.get_repository_dependencies_for_installed_tool_shed_repository( trans.app, + tool_shed_repository ) repo_info_dict = \ repository_maintenance_util.create_repo_info_dict( app=trans.app, repository_clone_url=repository_clone_url, diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/webapps/tool_shed/api/repositories.py --- a/lib/galaxy/webapps/tool_shed/api/repositories.py +++ b/lib/galaxy/webapps/tool_shed/api/repositories.py @@ -10,14 +10,14 @@ from galaxy.web.base.controller import BaseAPIController from galaxy.web.base.controller import HTTPBadRequest from galaxy.web.framework.helpers import time_ago +from tool_shed.capsule import capsule_manager import tool_shed.repository_types.util as rt_util -import tool_shed.util.shed_util_common as suc from tool_shed.util import basic_util from tool_shed.util import encoding_util from tool_shed.util import hg_util -from tool_shed.util import import_util from tool_shed.util import metadata_util from tool_shed.util import repository_maintenance_util +from tool_shed.util import shed_util_common as suc from tool_shed.util import tool_util log = logging.getLogger( __name__ ) @@ -248,10 +248,14 @@ except tarfile.ReadError, e: log.debug( 'Error opening capsule file %s: %s' % ( str( capsule_file_name ), str( e ) ) ) return {} + irm = capsule_manager.ImportRepositoryManager( trans.app, + trans.request.host, + trans.user, + trans.user_is_admin() ) capsule_dict[ 'tar_archive' ] = tar_archive capsule_dict[ 'capsule_file_name' ] = capsule_file_name - capsule_dict = import_util.extract_capsule_files( **capsule_dict ) - capsule_dict = import_util.validate_capsule( **capsule_dict ) + capsule_dict = irm.extract_capsule_files( **capsule_dict ) + capsule_dict = irm.validate_capsule( **capsule_dict ) status = capsule_dict.get( 'status', 'error' ) if status == 'error': log.debug( 'The capsule contents are invalid and cannot be imported:<br/>%s' % \ @@ -263,15 
+267,12 @@ return {} file_path = encoding_util.tool_shed_decode( encoded_file_path ) export_info_file_path = os.path.join( file_path, 'export_info.xml' ) - export_info_dict = import_util.get_export_info_dict( export_info_file_path ) + export_info_dict = irm.get_export_info_dict( export_info_file_path ) manifest_file_path = os.path.join( file_path, 'manifest.xml' ) # The manifest.xml file has already been validated, so no error_message should be returned here. - repository_info_dicts, error_message = import_util.get_repository_info_from_manifest( manifest_file_path ) + repository_info_dicts, error_message = irm.get_repository_info_from_manifest( manifest_file_path ) # Determine the status for each exported repository archive contained within the capsule. - repository_status_info_dicts = import_util.get_repository_status_from_tool_shed( trans.app, - trans.user, - trans.user_is_admin(), - repository_info_dicts ) + repository_status_info_dicts = irm.get_repository_status_from_tool_shed( repository_info_dicts ) # Generate a list of repository name / import results message tuples for display after the capsule is imported. import_results_tups = [] # Only create repositories that do not yet exist and that the current user is authorized to create. The @@ -280,12 +281,9 @@ # Add the capsule_file_name and encoded_file_path to the repository_status_info_dict. repository_status_info_dict[ 'capsule_file_name' ] = capsule_file_name repository_status_info_dict[ 'encoded_file_path' ] = encoded_file_path - import_results_tups = import_util.create_repository_and_import_archive( trans.app, - trans.request.host, - trans.user, - repository_status_info_dict, - import_results_tups ) - import_util.check_status_and_reset_downloadable( trans.app, import_results_tups ) + import_results_tups = irm.create_repository_and_import_archive( repository_status_info_dict, + import_results_tups ) + irm.check_status_and_reset_downloadable( import_results_tups ) basic_util.remove_dir( file_path ) # NOTE: the order of installation is defined in import_results_tups, but order will be lost # when transferred to return_dict. 
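The call sites above (and the matching ones in the repository controller later in this diff) pin down the interface of the new capsule_manager.ImportRepositoryManager even though its class body is not part of any hunk shown here. A minimal sketch of the shape those call sites imply, with attribute names assumed:

# Sketch only -- the real class is added in tool_shed/capsule/capsule_manager.py
# and is not shown in this diff; attribute names below are assumptions.
class ImportRepositoryManager( object ):

    def __init__( self, app, host, user, user_is_admin ):
        # Request context is captured once here instead of being threaded
        # through every former import_util function call.
        self.app = app
        self.host = host
        self.user = user
        self.user_is_admin = user_is_admin

    def get_repository_status_from_tool_shed( self, repository_info_dicts ):
        # Old call: import_util.get_repository_status_from_tool_shed(
        #     trans.app, trans.user, trans.user_is_admin(), repository_info_dicts )
        raise NotImplementedError( 'illustrative stub' )

    def create_repository_and_import_archive( self, repository_status_info_dict, import_results_tups ):
        # Old call also passed trans.app, trans.request.host and trans.user explicitly.
        raise NotImplementedError( 'illustrative stub' )

Construction matches the hunks above: irm = ImportRepositoryManager( trans.app, trans.request.host, trans.user, trans.user_is_admin() ), after which the context no longer needs to be passed to each method.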
diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/webapps/tool_shed/api/repository_revisions.py --- a/lib/galaxy/webapps/tool_shed/api/repository_revisions.py +++ b/lib/galaxy/webapps/tool_shed/api/repository_revisions.py @@ -5,7 +5,7 @@ from galaxy import util from galaxy.model.orm import and_, not_, select from galaxy.web.base.controller import BaseAPIController, HTTPBadRequest -from tool_shed.util import export_util +from tool_shed.capsule import capsule_manager from tool_shed.util import hg_util import tool_shed.util.shed_util_common as suc @@ -53,14 +53,14 @@ log.debug( error_message ) return None, error_message repository_id = trans.security.encode_id( repository.id ) - return export_util.export_repository( trans, - tool_shed_url, - repository_id, - str( repository.name ), - changeset_revision, - file_type, - export_repository_dependencies, - api=True ) + erm = capsule_manager.ExportRepositoryManager( app=trans.app, + user=trans.user, + tool_shed_url=tool_shed_url, + repository=repository, + changeset_revision=changeset_revision, + export_repository_dependencies=export_repository_dependencies, + using_api=True ) + return erm.export_repository() def __get_value_mapper( self, trans ): value_mapper = { 'id' : trans.security.encode_id, diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/webapps/tool_shed/controllers/repository.py --- a/lib/galaxy/webapps/tool_shed/controllers/repository.py +++ b/lib/galaxy/webapps/tool_shed/controllers/repository.py @@ -14,20 +14,19 @@ from galaxy.web.framework.helpers import grids from galaxy.util import json from galaxy.model.orm import and_ -import tool_shed.util.shed_util_common as suc +from tool_shed.capsule import capsule_manager from tool_shed.util import basic_util from tool_shed.util import common_util from tool_shed.util import container_util from tool_shed.util import encoding_util -from tool_shed.util import export_util from tool_shed.util import hg_util -from tool_shed.util import import_util from tool_shed.util import metadata_util from tool_shed.util import readme_util from tool_shed.util import repository_dependency_util from tool_shed.util import repository_maintenance_util from tool_shed.util import review_util from tool_shed.util import search_util +from tool_shed.util import shed_util_common as suc from tool_shed.util import tool_dependency_util from tool_shed.util import tool_util from tool_shed.util import workflow_util @@ -1149,7 +1148,7 @@ # server account's .hgrc file to include the following setting: # [web] # allow_archive = bz2, gz, zip - file_type_str = export_util.get_file_type_str( changeset_revision, file_type ) + file_type_str = basic_util.get_file_type_str( changeset_revision, file_type ) repository.times_downloaded += 1 trans.sa_session.add( repository ) trans.sa_session.flush() @@ -1172,14 +1171,14 @@ file_type = 'gz' export_repository_dependencies = CheckboxField.is_checked( export_repository_dependencies ) tool_shed_url = web.url_for( '/', qualified=True ) - repositories_archive, error_message = export_util.export_repository( trans, - tool_shed_url, - repository_id, - str( repository.name ), - changeset_revision, - file_type, - export_repository_dependencies, - api=False ) + erm = capsule_manager.ExportRepositoryManager( app=trans.app, + user=trans.user, + tool_shed_url=tool_shed_url, + repository=repository, + changeset_revision=changeset_revision, + export_repository_dependencies=export_repository_dependencies, 
+ using_api=False ) + repositories_archive, error_message = erm.export_repository() repositories_archive_filename = os.path.basename( repositories_archive.name ) if error_message: message = error_message @@ -1968,15 +1967,16 @@ encoded_file_path = kwd.get( 'encoded_file_path', None ) file_path = encoding_util.tool_shed_decode( encoded_file_path ) export_info_file_path = os.path.join( file_path, 'export_info.xml' ) - export_info_dict = import_util.get_export_info_dict( export_info_file_path ) + irm = capsule_manager.ImportRepositoryManager( trans.app, + trans.request.host, + trans.user, + trans.user_is_admin() ) + export_info_dict = irm.get_export_info_dict( export_info_file_path ) manifest_file_path = os.path.join( file_path, 'manifest.xml' ) # The manifest.xml file has already been validated, so no error_message should be returned here. - repository_info_dicts, error_message = import_util.get_repository_info_from_manifest( manifest_file_path ) + repository_info_dicts, error_message = irm.get_repository_info_from_manifest( manifest_file_path ) # Determine the status for each exported repository archive contained within the capsule. - repository_status_info_dicts = import_util.get_repository_status_from_tool_shed( trans.app, - trans.user, - trans.user_is_admin(), - repository_info_dicts ) + repository_status_info_dicts = irm.get_repository_status_from_tool_shed( repository_info_dicts ) if 'import_capsule_button' in kwd: # Generate a list of repository name / import results message tuples for display after the capsule is imported. import_results_tups = [] @@ -1986,12 +1986,9 @@ # Add the capsule_file_name and encoded_file_path to the repository_status_info_dict. repository_status_info_dict[ 'capsule_file_name' ] = capsule_file_name repository_status_info_dict[ 'encoded_file_path' ] = encoded_file_path - import_results_tups = import_util.create_repository_and_import_archive( trans.app, - trans.request.host, - trans.user, - repository_status_info_dict, - import_results_tups ) - import_util.check_status_and_reset_downloadable( trans.app, import_results_tups ) + import_results_tups = irm.create_repository_and_import_archive( repository_status_info_dict, + import_results_tups ) + irm.check_status_and_reset_downloadable( import_results_tups ) basic_util.remove_dir( file_path ) return trans.fill_template( '/webapps/tool_shed/repository/import_capsule_results.mako', export_info_dict=export_info_dict, @@ -3080,13 +3077,17 @@ status = kwd.get( 'status', 'done' ) url = kwd.get( 'url', '' ) if 'upload_capsule_button' in kwd: - capsule_dict = import_util.upload_capsule( **kwd ) + irm = capsule_manager.ImportRepositoryManager( trans.app, + trans.request.host, + trans.user, + trans.user_is_admin() ) + capsule_dict = irm.upload_capsule( **kwd ) status = capsule_dict.get( 'status', 'error' ) if status == 'error': message = capsule_dict.get( 'error_message', '' ) else: - capsule_dict = import_util.extract_capsule_files( **capsule_dict ) - capsule_dict = import_util.validate_capsule( **capsule_dict ) + capsule_dict = irm.extract_capsule_files( **capsule_dict ) + capsule_dict = irm.validate_capsule( **capsule_dict ) status = capsule_dict.get( 'status', 'error' ) if status == 'ok': return trans.response.send_redirect( web.url_for( controller='repository', diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/webapps/tool_shed/controllers/upload.py --- a/lib/galaxy/webapps/tool_shed/controllers/upload.py +++ b/lib/galaxy/webapps/tool_shed/controllers/upload.py @@ 
-8,13 +8,14 @@ from galaxy import util from galaxy import web from galaxy.datatypes import checkers +from tool_shed.dependencies import dependency_manager import tool_shed.repository_types.util as rt_util -import tool_shed.util.shed_util_common as suc from tool_shed.util import basic_util from tool_shed.util import commit_util from tool_shed.util import hg_util from tool_shed.util import metadata_util from tool_shed.util import repository_dependency_util +from tool_shed.util import shed_util_common as suc from tool_shed.util import tool_dependency_util from tool_shed.util import tool_util from tool_shed.util import xml_util @@ -95,6 +96,8 @@ uploaded_file_filename = os.path.split( file_data.filename )[ -1 ] isempty = os.path.getsize( os.path.abspath( uploaded_file_name ) ) == 0 if uploaded_file or uploaded_directory: + rdah = dependency_manager.RepositoryDependencyAttributeHandler( trans.app, unpopulate=False ) + tdah = dependency_manager.ToolDependencyAttributeHandler( trans.app, unpopulate=False ) ok = True isgzip = False isbz2 = False @@ -124,6 +127,8 @@ if istar: ok, message, files_to_remove, content_alert_str, undesirable_dirs_removed, undesirable_files_removed = \ self.upload_tar( trans, + rdah, + tdah, repository, tar, uploaded_file, @@ -134,6 +139,8 @@ elif uploaded_directory: ok, message, files_to_remove, content_alert_str, undesirable_dirs_removed, undesirable_files_removed = \ self.upload_directory( trans, + rdah, + tdah, repository, uploaded_directory, upload_point, @@ -164,12 +171,9 @@ full_path = os.path.abspath( os.path.join( repo_dir, uploaded_file_filename ) ) # Move some version of the uploaded file to the load_point within the repository hierarchy. if uploaded_file_filename in [ rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME ]: - # Inspect the contents of the file to see if changeset_revision values are missing and if so, - # set them appropriately. - altered, root_elem, error_message = \ - commit_util.handle_repository_dependencies_definition( trans.app, - uploaded_file_name, - unpopulate=False ) + # Inspect the contents of the file to see if toolshed or changeset_revision attributes + # are missing and if so, set them appropriately. + altered, root_elem, error_message = rdah.handle_tag_attributes( uploaded_file_name ) if error_message: ok = False message = error_message @@ -179,12 +183,10 @@ shutil.move( tmp_filename, full_path ) else: shutil.move( uploaded_file_name, full_path ) - elif uploaded_file_filename in [ rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME, - rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME ]: + elif uploaded_file_filename in [ rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME ]: # Inspect the contents of the file to see if changeset_revision values are # missing and if so, set them appropriately. 
- altered, root_elem, error_message = \ - commit_util.handle_tool_dependencies_definition( trans.app, uploaded_file_name ) + altered, root_elem, error_message = tdah.handle_tag_attributes( uploaded_file_name ) if error_message: ok = False message = error_message @@ -333,7 +335,7 @@ message=message, status=status ) - def upload_directory( self, trans, repository, uploaded_directory, upload_point, remove_repo_files_not_in_tar, + def upload_directory( self, trans, rdah, tdah, repository, uploaded_directory, upload_point, remove_repo_files_not_in_tar, commit_message, new_repo_alert ): repo_dir = repository.repo_path( trans.app ) repo = hg_util.get_repo_for_repository( trans.app, repository=None, repo_path=repo_dir, create=False ) @@ -364,20 +366,18 @@ if ok: uploaded_file_name = os.path.abspath( os.path.join( root, uploaded_file ) ) if os.path.split( uploaded_file_name )[ -1 ] == rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME: - # Inspect the contents of the file to see if changeset_revision values are missing and - # if so, set them appropriately. - altered, root_elem, error_message = \ - commit_util.handle_repository_dependencies_definition( trans.app, - uploaded_file_name, - unpopulate=False ) + # Inspect the contents of the file to see if toolshed or changeset_revision + # attributes are missing and if so, set them appropriately. + altered, root_elem, error_message = rdah.handle_tag_attributes( uploaded_file_name ) if error_message: return False, error_message, [], '', [], [] elif altered: tmp_filename = xml_util.create_and_write_tmp_file( root_elem ) shutil.move( tmp_filename, uploaded_file_name ) elif os.path.split( uploaded_file_name )[ -1 ] == rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME: - # Inspect the contents of the file to see if changeset_revision values are missing and if so, set them appropriately. - altered, root_elem, error_message = commit_util.handle_tool_dependencies_definition( trans.app, uploaded_file_name ) + # Inspect the contents of the file to see if toolshed or changeset_revision + # attributes are missing and if so, set them appropriately. + altered, root_elem, error_message = tdah.handle_tag_attributes( uploaded_file_name ) if error_message: return False, error_message, [], '', [], [] if altered: @@ -406,7 +406,8 @@ undesirable_dirs_removed, undesirable_files_removed ) - def upload_tar( self, trans, repository, tar, uploaded_file, upload_point, remove_repo_files_not_in_tar, commit_message, new_repo_alert ): + def upload_tar( self, trans, rdah, tdah, repository, tar, uploaded_file, upload_point, remove_repo_files_not_in_tar, + commit_message, new_repo_alert ): # Upload a tar archive of files. repo_dir = repository.repo_path( trans.app ) repo = hg_util.get_repo_for_repository( trans.app, repository=None, repo_path=repo_dir, create=False ) @@ -442,18 +443,18 @@ for filename in filenames_in_archive: uploaded_file_name = os.path.join( full_path, filename ) if os.path.split( uploaded_file_name )[ -1 ] == rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME: - # Inspect the contents of the file to see if changeset_revision values are missing and if so, set them appropriately. - altered, root_elem, error_message = commit_util.handle_repository_dependencies_definition( trans.app, - uploaded_file_name, - unpopulate=False ) + # Inspect the contents of the file to see if toolshed or changeset_revision attributes + # are missing and if so, set them appropriately. 
+ altered, root_elem, error_message = rdah.handle_tag_attributes( uploaded_file_name ) if error_message: return False, error_message, [], '', [], [] elif altered: tmp_filename = xml_util.create_and_write_tmp_file( root_elem ) shutil.move( tmp_filename, uploaded_file_name ) elif os.path.split( uploaded_file_name )[ -1 ] == rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME: - # Inspect the contents of the file to see if changeset_revision values are missing and if so, set them appropriately. - altered, root_elem, error_message = commit_util.handle_tool_dependencies_definition( trans.app, uploaded_file_name ) + # Inspect the contents of the file to see if toolshed or changeset_revision + # attributes are missing and if so, set them appropriately. + altered, root_elem, error_message = tdah.handle_tag_attributes( uploaded_file_name ) if error_message: return False, error_message, [], '', [], [] if altered: diff -r 90a490a788c1718cd2abe9f5b65bdf8c6a081d82 -r 7b4946192456c1619829b6836a95716d8656817d lib/galaxy/webapps/tool_shed/framework/middleware/hg.py --- a/lib/galaxy/webapps/tool_shed/framework/middleware/hg.py +++ b/lib/galaxy/webapps/tool_shed/framework/middleware/hg.py @@ -11,7 +11,6 @@ from galaxy.util import asbool from galaxy.util.hash_util import new_secure_hash -from tool_shed.util import commit_util from tool_shed.util import hg_util import tool_shed.repository_types.util as rt_util @@ -54,13 +53,15 @@ connection = engine.connect() path_info = environ[ 'PATH_INFO' ].lstrip( '/' ) user_id, repository_name = self.__get_user_id_repository_name_from_path_info( connection, path_info ) - sql_cmd = "SELECT times_downloaded FROM repository WHERE user_id = %d AND name = '%s'" % ( user_id, repository_name.lower() ) + sql_cmd = "SELECT times_downloaded FROM repository WHERE user_id = %d AND name = '%s'" % \ + ( user_id, repository_name.lower() ) result_set = connection.execute( sql_cmd ) for row in result_set: # Should only be 1 row... times_downloaded = row[ 'times_downloaded' ] times_downloaded += 1 - sql_cmd = "UPDATE repository SET times_downloaded = %d WHERE user_id = %d AND name = '%s'" % ( times_downloaded, user_id, repository_name.lower() ) + sql_cmd = "UPDATE repository SET times_downloaded = %d WHERE user_id = %d AND name = '%s'" % \ + ( times_downloaded, user_id, repository_name.lower() ) connection.execute( sql_cmd ) connection.close() elif cmd in [ 'unbundle', 'pushkey' ]: @@ -132,7 +133,7 @@ if filename and isinstance( filename, str ): if filename == rt_util.REPOSITORY_DEPENDENCY_DEFINITION_FILENAME: # Make sure the any complex repository dependency definitions contain valid <repository> tags. - is_valid, error_msg = commit_util.repository_tags_are_valid( filename, change_list ) + is_valid, error_msg = self.repository_tags_are_valid( filename, change_list ) if not is_valid: log.debug( error_msg ) return self.__display_exception_remotely( start_response, error_msg ) @@ -151,7 +152,7 @@ if filename and isinstance( filename, str ): if filename == rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME: # Make sure the any complex repository dependency definitions contain valid <repository> tags. 
- is_valid, error_msg = commit_util.repository_tags_are_valid( filename, change_list ) + is_valid, error_msg = self.repository_tags_are_valid( filename, change_list ) if not is_valid: log.debug( error_msg ) return self.__display_exception_remotely( start_response, error_msg ) @@ -174,7 +175,7 @@ rt_util.TOOL_DEPENDENCY_DEFINITION_FILENAME ]: # We check both files since tool dependency definitions files can contain complex # repository dependency definitions. - is_valid, error_msg = commit_util.repository_tags_are_valid( filename, change_list ) + is_valid, error_msg = self.repository_tags_are_valid( filename, change_list ) if not is_valid: log.debug( error_msg ) return self.__display_exception_remotely( start_response, error_msg ) @@ -186,6 +187,54 @@ return result.wsgi_application( environ, start_response ) return self.app( environ, start_response ) + def __authenticate( self, username, password ): + db_password = None + # Instantiate a database connection + engine = sqlalchemy.create_engine( self.db_url ) + connection = engine.connect() + result_set = connection.execute( "select email, password from galaxy_user where username = '%s'" % username.lower() ) + for row in result_set: + # Should only be 1 row... + db_email = row[ 'email' ] + db_password = row[ 'password' ] + connection.close() + if db_password: + # Check if password matches db_password when hashed. + return new_secure_hash( text_type=password ) == db_password + return False + + def __authenticate_remote_user( self, environ, username, password ): + """ + Look after a remote user and "authenticate" - upstream server should already have achieved + this for us, but we check that the user exists at least. Hg allow_push = must include username + - some versions of mercurial blow up with 500 errors. + """ + db_username = None + ru_email = environ[ 'HTTP_REMOTE_USER' ].lower() + ## Instantiate a database connection... + engine = sqlalchemy.create_engine( self.db_url ) + connection = engine.connect() + result_set = connection.execute( "select email, username, password from galaxy_user where email = '%s'" % ru_email ) + for row in result_set: + # Should only be 1 row... + db_email = row[ 'email' ] + db_password = row[ 'password' ] + db_username = row[ 'username' ] + connection.close() + if db_username: + # We could check the password here except that the function galaxy.web.framework.get_or_create_remote_user() + # does some random generation of a password - so that no-one knows the password and only the hash is stored... + return db_username == username + return False + + def __basic_authentication( self, environ, username, password ): + """The environ parameter is needed in basic authentication. We also check it if use_remote_user is true.""" + if asbool( self.config.get( 'use_remote_user', False ) ): + assert "HTTP_REMOTE_USER" in environ, "use_remote_user is set but no HTTP_REMOTE_USER variable" + return self.__authenticate_remote_user( environ, username, password ) + else: + return self.__authenticate( username, password ) + def __display_exception_remotely( self, start_response, msg ): # Display the exception to the remote user's command line. status = "500 %s" % msg @@ -213,49 +262,37 @@ user_id = row[ 'id' ] return user_id, repository_name - def __basic_authentication( self, environ, username, password ): - """The environ parameter is needed in basic authentication. 
We also check it if use_remote_user is true.""" - if asbool( self.config.get( 'use_remote_user', False ) ): - assert "HTTP_REMOTE_USER" in environ, "use_remote_user is set but no HTTP_REMOTE_USER variable" - return self.__authenticate_remote_user( environ, username, password ) - else: - return self.__authenticate( username, password ) - - def __authenticate( self, username, password ): - db_password = None - # Instantiate a database connection - engine = sqlalchemy.create_engine( self.db_url ) - connection = engine.connect() - result_set = connection.execute( "select email, password from galaxy_user where username = '%s'" % username.lower() ) - for row in result_set: - # Should only be 1 row... - db_email = row[ 'email' ] - db_password = row[ 'password' ] - connection.close() - if db_password: - # Check if password matches db_password when hashed. - return new_secure_hash( text_type=password ) == db_password - return False - - def __authenticate_remote_user( self, environ, username, password ): + def repository_tag_is_valid( self, filename, line ): """ - Look after a remote user and "authenticate" - upstream server should already have achieved this for us, but we check that the - user exists at least. Hg allow_push = must include username - some versions of mercurial blow up with 500 errors. + Checks changes made to <repository> tags in a dependency definition file being pushed to the + Tool Shed from the command line to ensure that all required attributes exist. """ - db_username = None - ru_email = environ[ 'HTTP_REMOTE_USER' ].lower() - ## Instantiate a database connection... - engine = sqlalchemy.create_engine( self.db_url ) - connection = engine.connect() - result_set = connection.execute( "select email, username, password from galaxy_user where email = '%s'" % ru_email ) - for row in result_set: - # Should only be 1 row... - db_email = row[ 'email' ] - db_password = row[ 'password' ] - db_username = row[ 'username' ] - connection.close() - if db_username: - # We could check the password here except that the function galaxy.web.framework.get_or_create_remote_user() does some random generation of - # a password - so that no-one knows the password and only the hash is stored... - return db_username == username - return False + required_attributes = [ 'toolshed', 'name', 'owner', 'changeset_revision' ] + defined_attributes = line.split() + for required_attribute in required_attributes: + defined = False + for defined_attribute in defined_attributes: + if defined_attribute.startswith( required_attribute ): + defined = True + break + if not defined: + error_msg = 'The %s file contains a <repository> tag that is missing the required attribute %s. ' % \ + ( filename, required_attribute ) + error_msg += 'Automatically populating dependency definition attributes occurs only when using ' + error_msg += 'the Tool Shed upload utility. ' + return False, error_msg + return True, '' + + def repository_tags_are_valid( self, filename, change_list ): + """ + Make sure the any complex repository dependency definitions contain valid <repository> tags when pushing + changes to the tool shed on the command line. + """ + tag = '<repository' + for change_dict in change_list: + lines = get_change_lines_in_file_for_tag( tag, change_dict ) + for line in lines: + is_valid, error_msg = repository_tag_is_valid( filename, line ) + if not is_valid: + return False, error_msg + return True, '' This diff is so big that we needed to truncate the remainder. 
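For illustration, the per-line <repository> attribute check performed by repository_tag_is_valid (shown in full in the hunk above) can be exercised on its own. The helper below only mirrors that logic outside the middleware class; the file name and tag values in the example are made up:

# Standalone recreation of the attribute check from the hg middleware above,
# for illustration only.
REQUIRED_ATTRIBUTES = [ 'toolshed', 'name', 'owner', 'changeset_revision' ]

def repository_tag_is_valid( filename, line ):
    """Return ( is_valid, error_msg ) for one changed <repository> line."""
    defined_attributes = line.split()
    for required_attribute in REQUIRED_ATTRIBUTES:
        if not any( token.startswith( required_attribute ) for token in defined_attributes ):
            error_msg = 'The %s file contains a <repository> tag that is missing the required attribute %s. ' % \
                ( filename, required_attribute )
            return False, error_msg
    return True, ''

# A tag pushed from the command line without a toolshed attribute is rejected,
# since automatic attribute population only happens via the Tool Shed upload utility.
print( repository_tag_is_valid( 'repository_dependencies.xml',
                                '<repository name="bwa" owner="devteam" changeset_revision="0123456789ab" />' ) )
# -> ( False, 'The repository_dependencies.xml file contains a <repository> tag
#      that is missing the required attribute toolshed. ' )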
https://bitbucket.org/galaxy/galaxy-central/commits/4a8a3951b020/ Changeset: 4a8a3951b020 User: dannon Date: 2014-06-25 00:34:32 Summary: Make display chunk size configurable in universe. Defaults to 64k. Affected #: 3 files diff -r 7b4946192456c1619829b6836a95716d8656817d -r 4a8a3951b020ae7de3fc74668ab3399e1a4ba3ef lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -376,6 +376,8 @@ self.fluent_port = int( kwargs.get( 'fluent_port', 24224 ) ) # visualization plugin framework self.visualization_plugins_directory = kwargs.get( 'visualization_plugins_directory', None ) + # Default chunk size for chunkable datatypes -- 64k + self.display_chunk_size = int( kwargs.get( 'display_chunk_size', 65536) ) @property def sentry_dsn_public( self ): diff -r 7b4946192456c1619829b6836a95716d8656817d -r 4a8a3951b020ae7de3fc74668ab3399e1a4ba3ef lib/galaxy/datatypes/tabular.py --- a/lib/galaxy/datatypes/tabular.py +++ b/lib/galaxy/datatypes/tabular.py @@ -25,7 +25,6 @@ # All tabular data is chunkable. CHUNKABLE = True - CHUNK_SIZE = 10000 """Add metadata elements""" MetadataElement( name="comment_lines", default=0, desc="Number of comment lines", readonly=False, optional=True, no_value=0 ) @@ -262,13 +261,13 @@ def get_chunk(self, trans, dataset, chunk): ck_index = int(chunk) f = open(dataset.file_name) - f.seek(ck_index * self.CHUNK_SIZE) + f.seek(ck_index * trans.app.config.display_chunk_size) # If we aren't at the start of the file, seek to next newline. Do this better eventually. if f.tell() != 0: cursor = f.read(1) while cursor and cursor != '\n': cursor = f.read(1) - ck_data = f.read(self.CHUNK_SIZE) + ck_data = f.read(trans.app.config.display_chunk_size) cursor = f.read(1) while cursor and ck_data[-1] != '\n': ck_data += cursor diff -r 7b4946192456c1619829b6836a95716d8656817d -r 4a8a3951b020ae7de3fc74668ab3399e1a4ba3ef universe_wsgi.ini.sample --- a/universe_wsgi.ini.sample +++ b/universe_wsgi.ini.sample @@ -462,6 +462,10 @@ #enable_cloud_launch = False #cloudlaunch_default_ami = ami-a7dbf6ce +# Incremental Display Options + +#display_chunk_size = 65536 + # -- Advanced proxy features # For help on configuring the Advanced proxy features, see: https://bitbucket.org/galaxy/galaxy-central/commits/dd315abced98/ Changeset: dd315abced98 User: dannon Date: 2014-06-25 00:34:45 Summary: Merge. Affected #: 2 files diff -r 4a8a3951b020ae7de3fc74668ab3399e1a4ba3ef -r dd315abced98d580b24d8718c65243c4185c0c4c static/scripts/mvc/history/history-panel.js --- a/static/scripts/mvc/history/history-panel.js +++ b/static/scripts/mvc/history/history-panel.js @@ -249,7 +249,7 @@ actions.push( { // TODO: Only show quick pair if two things selected. 
html: _l( 'Build List of Dataset Pairs (Experimental)' ), - func: panel._showPairedCollectionModal.bind( panel ) + func: _.bind( panel._showPairedCollectionModal, panel ) }); return new PopupMenu( $where.find( '.history-dataset-action-popup-btn' ), actions ); }, diff -r 4a8a3951b020ae7de3fc74668ab3399e1a4ba3ef -r dd315abced98d580b24d8718c65243c4185c0c4c test/casperjs/modules/api.js --- a/test/casperjs/modules/api.js +++ b/test/casperjs/modules/api.js @@ -555,7 +555,7 @@ }; /** amount of time allowed to upload a file (before erroring) */ -ToolsAPI.prototype.DEFAULT_UPLOAD_TIMEOUT = 12000; +ToolsAPI.prototype.DEFAULT_UPLOAD_TIMEOUT = 30 * 1000; /** add two casperjs steps - upload a file, wait for the job to complete, and run 'then' when they are */ ToolsAPI.prototype.thenUpload = function thenUpload( historyId, options, then ){ @@ -569,6 +569,9 @@ this.debug( 'uploadedId: ' + uploadedId ); }); + spaceghost.debug( '---------------------------------------------------------- timeout: ' + ( options.timeout || spaceghost.api.tools.DEFAULT_UPLOAD_TIMEOUT ) ); + spaceghost.debug( 'timeout: ' + options.timeout ); + spaceghost.debug( 'timeout: ' + spaceghost.api.tools.DEFAULT_UPLOAD_TIMEOUT ); spaceghost.then( function(){ this.waitFor( function testHdaState(){ https://bitbucket.org/galaxy/galaxy-central/commits/06b64928f4a4/ Changeset: 06b64928f4a4 User: dannon Date: 2014-06-25 00:43:13 Summary: Clean up config.py (pep8, imports, misc). Affected #: 1 file diff -r dd315abced98d580b24d8718c65243c4185c0c4c -r 06b64928f4a4da6384d0717fc9bb650aa5e307eb lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -15,13 +15,12 @@ from galaxy.web.formatting import expand_pretty_datetime_format from galaxy.util import string_as_bool from galaxy.util import listify -from galaxy.util import parse_xml from galaxy.util.dbkeys import GenomeBuilds from galaxy import eggs -import pkg_resources log = logging.getLogger( __name__ ) + def resolve_path( path, root ): """If 'path' is relative make absolute by prepending 'root'""" if not( os.path.isabs( path ) ): @@ -40,9 +39,9 @@ self.config_dict = kwargs self.root = kwargs.get( 'root_dir', '.' 
) # Collect the umask and primary gid from the environment - self.umask = os.umask( 077 ) # get the current umask - os.umask( self.umask ) # can't get w/o set, so set it back - self.gid = os.getgid() # if running under newgrp(1) we'll need to fix the group of data created on the cluster + self.umask = os.umask( 077 ) # get the current umask + os.umask( self.umask ) # can't get w/o set, so set it back + self.gid = os.getgid() # if running under newgrp(1) we'll need to fix the group of data created on the cluster # Database related configuration self.database = resolve_path( kwargs.get( "database_file", "database/universe.sqlite" ), self.root ) @@ -75,7 +74,7 @@ self.enable_unique_workflow_defaults = string_as_bool( kwargs.get( 'enable_unique_workflow_defaults', False ) ) self.tool_path = resolve_path( kwargs.get( "tool_path", "tools" ), self.root ) self.tool_data_path = resolve_path( kwargs.get( "tool_data_path", "tool-data" ), os.getcwd() ) - self.len_file_path = resolve_path( kwargs.get( "len_file_path", os.path.join( self.tool_data_path, 'shared','ucsc','chrom') ), self.root ) + self.len_file_path = resolve_path( kwargs.get( "len_file_path", os.path.join( self.tool_data_path, 'shared', 'ucsc', 'chrom') ), self.root ) self.test_conf = resolve_path( kwargs.get( "test_conf", "" ), self.root ) # The value of migrated_tools_config is the file reserved for containing only those tools that have been eliminated from the distribution # and moved to the tool shed. @@ -169,7 +168,7 @@ self.admin_users = kwargs.get( "admin_users", "" ) self.admin_users_list = [u.strip() for u in self.admin_users.split(',') if u] self.reset_password_length = int( kwargs.get('reset_password_length', '15') ) - self.mailing_join_addr = kwargs.get('mailing_join_addr',"galaxy-announce-join@bx.psu.edu") + self.mailing_join_addr = kwargs.get('mailing_join_addr', 'galaxy-announce-join@bx.psu.edu') self.error_email_to = kwargs.get( 'error_email_to', None ) self.activation_email = kwargs.get( 'activation_email', None ) self.user_activation_on = string_as_bool( kwargs.get( 'user_activation_on', False ) ) @@ -271,9 +270,9 @@ self.object_store_cache_path = resolve_path( kwargs.get( "object_store_cache_path", "database/object_store_cache" ), self.root ) # Handle AWS-specific config options for backward compatibility if kwargs.get( 'aws_access_key', None) is not None: - self.os_access_key= kwargs.get( 'aws_access_key', None ) - self.os_secret_key= kwargs.get( 'aws_secret_key', None ) - self.os_bucket_name= kwargs.get( 's3_bucket', None ) + self.os_access_key = kwargs.get( 'aws_access_key', None ) + self.os_secret_key = kwargs.get( 'aws_secret_key', None ) + self.os_bucket_name = kwargs.get( 's3_bucket', None ) self.os_use_reduced_redundancy = kwargs.get( 'use_reduced_redundancy', False ) else: self.os_access_key = kwargs.get( 'os_access_key', None ) @@ -454,19 +453,13 @@ except Exception, e: raise ConfigurationError( "Unable to create missing directory: %s\n%s" % ( path, e ) ) # Create the directories that it makes sense to create - for path in self.file_path, \ - self.new_file_path, \ - self.job_working_directory, \ - self.cluster_files_directory, \ - self.template_cache, \ - self.ftp_upload_dir, \ - self.library_import_dir, \ - self.user_library_import_dir, \ - self.nginx_upload_store, \ - './static/genetrack/plots', \ - self.whoosh_index_dir, \ - self.object_store_cache_path, \ - os.path.join( self.tool_data_path, 'shared', 'jars' ): + for path in (self.file_path, self.new_file_path, + self.job_working_directory, 
self.cluster_files_directory, + self.template_cache, self.ftp_upload_dir, + self.library_import_dir, self.user_library_import_dir, + self.nginx_upload_store, './static/genetrack/plots', + self.whoosh_index_dir, self.object_store_cache_path, + os.path.join( self.tool_data_path, 'shared', 'jars' )): self._ensure_directory( path ) # Check that required files exist tool_configs = self.tool_configs @@ -482,7 +475,7 @@ if key in self.deprecated_options: log.warning( "Config option '%s' is deprecated and will be removed in a future release. Please consult the latest version of the sample configuration file." % key ) - def is_admin_user( self,user ): + def is_admin_user( self, user ): """ Determine if the provided user is listed in `admin_users`. @@ -497,12 +490,13 @@ """ return resolve_path( path, self.root ) + def get_database_engine_options( kwargs, model_prefix='' ): """ Allow options for the SQLAlchemy database engine to be passed by using the prefix "database_engine_option". """ - conversions = { + conversions = { 'convert_unicode': string_as_bool, 'pool_timeout': int, 'echo': string_as_bool, @@ -524,6 +518,7 @@ rval[ key ] = value return rval + def configure_logging( config ): """ Allow some basic logging configuration to be read from ini file. @@ -558,7 +553,7 @@ root.addHandler( handler ) # If sentry is configured, also log to it if config.sentry_dsn: - pkg_resources.require( "raven" ) + eggs.require( "raven" ) from raven.handlers.logging import SentryHandler sentry_handler = SentryHandler( config.sentry_dsn ) sentry_handler.setLevel( logging.WARN ) https://bitbucket.org/galaxy/galaxy-central/commits/13a9176d15e0/ Changeset: 13a9176d15e0 User: dannon Date: 2014-07-08 19:49:40 Summary: Merge from stable. Affected #: 1 file diff -r 5874f6bc02f92b0d09184e04e8fdc89bd2fda638 -r 13a9176d15e0fe13af359b5494ec2d8830d22e04 lib/galaxy/jobs/metrics/formatting.py --- a/lib/galaxy/jobs/metrics/formatting.py +++ b/lib/galaxy/jobs/metrics/formatting.py @@ -15,4 +15,4 @@ elif value < 3600: return "%s minutes" % ( value / 60 ) else: - return "%s days and %s minutes" % ( value / 3600, ( value % 3600 ) / 60 ) + return "%s hours and %s minutes" % ( value / 3600, ( value % 3600 ) / 60 ) https://bitbucket.org/galaxy/galaxy-central/commits/158302d14057/ Changeset: 158302d14057 User: dannon Date: 2014-07-08 19:50:05 Summary: Merge. Affected #: 4 files diff -r 13a9176d15e0fe13af359b5494ec2d8830d22e04 -r 158302d1405755306533ad44b2284acad31e421a lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -15,13 +15,12 @@ from galaxy.web.formatting import expand_pretty_datetime_format from galaxy.util import string_as_bool from galaxy.util import listify -from galaxy.util import parse_xml from galaxy.util.dbkeys import GenomeBuilds from galaxy import eggs -import pkg_resources log = logging.getLogger( __name__ ) + def resolve_path( path, root ): """If 'path' is relative make absolute by prepending 'root'""" if not( os.path.isabs( path ) ): @@ -40,9 +39,9 @@ self.config_dict = kwargs self.root = kwargs.get( 'root_dir', '.' 
) # Collect the umask and primary gid from the environment - self.umask = os.umask( 077 ) # get the current umask - os.umask( self.umask ) # can't get w/o set, so set it back - self.gid = os.getgid() # if running under newgrp(1) we'll need to fix the group of data created on the cluster + self.umask = os.umask( 077 ) # get the current umask + os.umask( self.umask ) # can't get w/o set, so set it back + self.gid = os.getgid() # if running under newgrp(1) we'll need to fix the group of data created on the cluster # Database related configuration self.database = resolve_path( kwargs.get( "database_file", "database/universe.sqlite" ), self.root ) @@ -75,7 +74,7 @@ self.enable_unique_workflow_defaults = string_as_bool( kwargs.get( 'enable_unique_workflow_defaults', False ) ) self.tool_path = resolve_path( kwargs.get( "tool_path", "tools" ), self.root ) self.tool_data_path = resolve_path( kwargs.get( "tool_data_path", "tool-data" ), os.getcwd() ) - self.len_file_path = resolve_path( kwargs.get( "len_file_path", os.path.join( self.tool_data_path, 'shared','ucsc','chrom') ), self.root ) + self.len_file_path = resolve_path( kwargs.get( "len_file_path", os.path.join( self.tool_data_path, 'shared', 'ucsc', 'chrom') ), self.root ) self.test_conf = resolve_path( kwargs.get( "test_conf", "" ), self.root ) # The value of migrated_tools_config is the file reserved for containing only those tools that have been eliminated from the distribution # and moved to the tool shed. @@ -169,7 +168,7 @@ self.admin_users = kwargs.get( "admin_users", "" ) self.admin_users_list = [u.strip() for u in self.admin_users.split(',') if u] self.reset_password_length = int( kwargs.get('reset_password_length', '15') ) - self.mailing_join_addr = kwargs.get('mailing_join_addr',"galaxy-announce-join@bx.psu.edu") + self.mailing_join_addr = kwargs.get('mailing_join_addr', 'galaxy-announce-join@bx.psu.edu') self.error_email_to = kwargs.get( 'error_email_to', None ) self.activation_email = kwargs.get( 'activation_email', None ) self.user_activation_on = string_as_bool( kwargs.get( 'user_activation_on', False ) ) @@ -271,9 +270,9 @@ self.object_store_cache_path = resolve_path( kwargs.get( "object_store_cache_path", "database/object_store_cache" ), self.root ) # Handle AWS-specific config options for backward compatibility if kwargs.get( 'aws_access_key', None) is not None: - self.os_access_key= kwargs.get( 'aws_access_key', None ) - self.os_secret_key= kwargs.get( 'aws_secret_key', None ) - self.os_bucket_name= kwargs.get( 's3_bucket', None ) + self.os_access_key = kwargs.get( 'aws_access_key', None ) + self.os_secret_key = kwargs.get( 'aws_secret_key', None ) + self.os_bucket_name = kwargs.get( 's3_bucket', None ) self.os_use_reduced_redundancy = kwargs.get( 'use_reduced_redundancy', False ) else: self.os_access_key = kwargs.get( 'os_access_key', None ) @@ -376,6 +375,8 @@ self.fluent_port = int( kwargs.get( 'fluent_port', 24224 ) ) # visualization plugin framework self.visualization_plugins_directory = kwargs.get( 'visualization_plugins_directory', None ) + # Default chunk size for chunkable datatypes -- 64k + self.display_chunk_size = int( kwargs.get( 'display_chunk_size', 65536) ) @property def sentry_dsn_public( self ): @@ -452,19 +453,13 @@ except Exception, e: raise ConfigurationError( "Unable to create missing directory: %s\n%s" % ( path, e ) ) # Create the directories that it makes sense to create - for path in self.file_path, \ - self.new_file_path, \ - self.job_working_directory, \ - self.cluster_files_directory, \ - 
self.template_cache, \ - self.ftp_upload_dir, \ - self.library_import_dir, \ - self.user_library_import_dir, \ - self.nginx_upload_store, \ - './static/genetrack/plots', \ - self.whoosh_index_dir, \ - self.object_store_cache_path, \ - os.path.join( self.tool_data_path, 'shared', 'jars' ): + for path in (self.file_path, self.new_file_path, + self.job_working_directory, self.cluster_files_directory, + self.template_cache, self.ftp_upload_dir, + self.library_import_dir, self.user_library_import_dir, + self.nginx_upload_store, './static/genetrack/plots', + self.whoosh_index_dir, self.object_store_cache_path, + os.path.join( self.tool_data_path, 'shared', 'jars' )): self._ensure_directory( path ) # Check that required files exist tool_configs = self.tool_configs @@ -480,7 +475,7 @@ if key in self.deprecated_options: log.warning( "Config option '%s' is deprecated and will be removed in a future release. Please consult the latest version of the sample configuration file." % key ) - def is_admin_user( self,user ): + def is_admin_user( self, user ): """ Determine if the provided user is listed in `admin_users`. @@ -495,12 +490,13 @@ """ return resolve_path( path, self.root ) + def get_database_engine_options( kwargs, model_prefix='' ): """ Allow options for the SQLAlchemy database engine to be passed by using the prefix "database_engine_option". """ - conversions = { + conversions = { 'convert_unicode': string_as_bool, 'pool_timeout': int, 'echo': string_as_bool, @@ -522,6 +518,7 @@ rval[ key ] = value return rval + def configure_logging( config ): """ Allow some basic logging configuration to be read from ini file. @@ -556,7 +553,7 @@ root.addHandler( handler ) # If sentry is configured, also log to it if config.sentry_dsn: - pkg_resources.require( "raven" ) + eggs.require( "raven" ) from raven.handlers.logging import SentryHandler sentry_handler = SentryHandler( config.sentry_dsn ) sentry_handler.setLevel( logging.WARN ) diff -r 13a9176d15e0fe13af359b5494ec2d8830d22e04 -r 158302d1405755306533ad44b2284acad31e421a lib/galaxy/datatypes/tabular.py --- a/lib/galaxy/datatypes/tabular.py +++ b/lib/galaxy/datatypes/tabular.py @@ -25,7 +25,6 @@ # All tabular data is chunkable. CHUNKABLE = True - CHUNK_SIZE = 10000 """Add metadata elements""" MetadataElement( name="comment_lines", default=0, desc="Number of comment lines", readonly=False, optional=True, no_value=0 ) @@ -262,13 +261,13 @@ def get_chunk(self, trans, dataset, chunk): ck_index = int(chunk) f = open(dataset.file_name) - f.seek(ck_index * self.CHUNK_SIZE) + f.seek(ck_index * trans.app.config.display_chunk_size) # If we aren't at the start of the file, seek to next newline. Do this better eventually. if f.tell() != 0: cursor = f.read(1) while cursor and cursor != '\n': cursor = f.read(1) - ck_data = f.read(self.CHUNK_SIZE) + ck_data = f.read(trans.app.config.display_chunk_size) cursor = f.read(1) while cursor and ck_data[-1] != '\n': ck_data += cursor diff -r 13a9176d15e0fe13af359b5494ec2d8830d22e04 -r 158302d1405755306533ad44b2284acad31e421a lib/galaxy/webapps/galaxy/controllers/user.py --- a/lib/galaxy/webapps/galaxy/controllers/user.py +++ b/lib/galaxy/webapps/galaxy/controllers/user.py @@ -1092,7 +1092,7 @@ def reset_password( self, trans, email=None, **kwd ): if trans.app.config.smtp_server is None: return trans.show_error_message( "Mail is not configured for this Galaxy instance. Please contact your local Galaxy administrator." 
) - message = util.restore_text( kwd.get( 'message', '' ) ) + message = util.sanitize_text(util.restore_text( kwd.get( 'message', '' ) )) status = 'done' if kwd.get( 'reset_password_button', False ): reset_user = trans.sa_session.query( trans.app.model.User ).filter( trans.app.model.User.table.c.email == email ).first() diff -r 13a9176d15e0fe13af359b5494ec2d8830d22e04 -r 158302d1405755306533ad44b2284acad31e421a universe_wsgi.ini.sample --- a/universe_wsgi.ini.sample +++ b/universe_wsgi.ini.sample @@ -462,6 +462,10 @@ #enable_cloud_launch = False #cloudlaunch_default_ami = ami-a7dbf6ce +# Incremental Display Options + +#display_chunk_size = 65536 + # -- Advanced proxy features # For help on configuring the Advanced proxy features, see: Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
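For reference, the newline-aligned chunk read that Tabular.get_chunk performs in the 4a8a3951b020 changeset above can be sketched as a standalone function, with chunk_size standing in for the new display_chunk_size option (default 65536); the dataset path in the usage comment is hypothetical:

def read_chunk( path, chunk_index, chunk_size=65536 ):
    f = open( path )
    f.seek( chunk_index * chunk_size )
    # If we aren't at the start of the file, skip to the next newline so the
    # chunk begins on a full line.
    if f.tell() != 0:
        cursor = f.read( 1 )
        while cursor and cursor != '\n':
            cursor = f.read( 1 )
    ck_data = f.read( chunk_size )
    # Extend the chunk until it also ends on a full line.
    cursor = f.read( 1 )
    while cursor and ck_data[ -1 ] != '\n':
        ck_data += cursor
        cursor = f.read( 1 )
    f.close()
    return ck_data

# e.g. read_chunk( 'dataset.tabular', 0 ) returns roughly the first 64k of whole lines.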