7 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/d1864ea200ce/ Changeset: d1864ea200ce User: jmchilton Date: 2014-11-13 18:47:38+00:00 Summary: Dynamically load workflow module descriptions in mako editor. This again falls under the umbrella of making workflow modules more like true plugins. This should allow one to add new modules downstream without needing to modify templates/webapps/galaxy/workflow/editor.mako. One can imagine a new API endpoint for the new load_module_sections method will be useful as the editor UI shifts to a single page app and actual API clients need to determine what workflow modules are available. Such an API endpoint is not included but would be easy to add now. Affected #: 2 files diff -r c9b13e2f3f6bb7f35f6ed9a012159c42519fac02 -r d1864ea200ce16992c26a4a642381be71310c4a2 lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -773,6 +773,24 @@ module_factory = WorkflowModuleFactory( module_types ) +def load_module_sections( trans ): + """ Get abstract description of the workflow modules this Galaxy instance + is configured with. + """ + inputs_section = { + "name": "inputs", + "title": "Inputs", + "modules": [ + {"name": "data_input", "title": "Input Dataset", "description": "Input dataset"}, + {"name": "data_collection_input", "title": "Input Dataset Collection", "description": "Input dataset collection"}, + ], + } + module_sections = [ + inputs_section + ] + return module_sections + + class MissingToolException( Exception ): """ WorkflowModuleInjector will raise this if the tool corresponding to the module is missing. """ diff -r c9b13e2f3f6bb7f35f6ed9a012159c42519fac02 -r d1864ea200ce16992c26a4a642381be71310c4a2 templates/webapps/galaxy/workflow/editor.mako --- a/templates/webapps/galaxy/workflow/editor.mako +++ b/templates/webapps/galaxy/workflow/editor.mako @@ -332,19 +332,27 @@ <em><strong>Search did not match any tools.</strong></em></div><div> </div> + <div class="toolMenuGroupHeader">Workflow control</div> - <div class="toolSectionTitle" id="title___workflow__input__"> - <span>Inputs</span> - </div> - <div id="__workflow__input__" class="toolSectionBody"> + <% + from galaxy.workflow.modules import load_module_sections + %> + %for module_section in load_module_sections( trans ): + <% section_title = module_section["title"] %> + <% section_name = module_section["name"] %> + <div class="toolSectionTitle" id="title___workflow__${section_name}__"> + <span>${section_title}</span> + </div> + <div id="__workflow__${section_name}__" class="toolSectionBody"><div class="toolSectionBg"> + %for module in module_section["modules"]: <div class="toolTitle"> - <a href="#" onclick="add_node_for_module( 'data_input', 'Input Dataset' )">Input dataset</a> - </div> - <div class="toolTitle"> - <a href="#" onclick="add_node_for_module( 'data_collection_input', 'Input Dataset Collection' )">Input dataset collection</a> - </div> + <a href="#" onclick="add_node_for_module( '${module['name']}', '${module['title']}' )">${module['description']}</a> + </div><!-- end toolTitle --> + %endfor </div> + </div> + %endfor </div></div></div> https://bitbucket.org/galaxy/galaxy-central/commits/f6dcea371b03/ Changeset: f6dcea371b03 User: jmchilton Date: 2014-11-13 18:47:38+00:00 Summary: Implement workflow scheduling 'plugin' framework. Models: Workflow invocations have been augmented with significantly more state - inputs, parameters, runtime step state, are all being tracked now. 
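To make the new tracking concrete, here is a minimal sketch (not part of the changeset) of how the new per-invocation state could be inspected. Only the attribute and method names come from the model diff below; the function itself and the session wiring are assumptions for illustration.

# Sketch only - illustrates the WorkflowInvocation fields/helpers added in this changeset.
from galaxy import model

def summarize_invocation( sa_session, invocation_id ):
    invocation = sa_session.query( model.WorkflowInvocation ).get( invocation_id )
    # 'active' is True while the state is NEW or READY - i.e. scheduling may still act on it.
    summary = dict( state=invocation.state, active=invocation.active )
    # Invocation steps are grouped by workflow step id; a mapped-over step may have several entries.
    for step_id, step_invocations in invocation.step_invocations_by_step_id().items():
        summary[ step_id ] = len( step_invocations )
    return summary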
Workflow invocations now have a state that can change over time, and the UUIDs generated for workflow invocations in Pull Request #465 have to be persisted so they can be reused when scheduling new jobs for the workflow invocation. Workflow invocation steps now have an action parameter for persisting state provided by users during the execution of the workflow (see the forthcoming PauseModule for details). Some initial elements of these model changes were based on model changes in Kyle Ellrott's Galaxy farm work (https://bitbucket.org/kellrott/galaxy-farm/branch/workflow_migrate); I made heavy modifications to the model to enforce referential integrity on parameter-to-workflow-step mappings and made cosmetic changes to various other details.

Scheduling Plugins: Used the pattern set up for dependency resolvers and job metrics to build a dynamic plugin infrastructure for defining workflow schedulers. I hesitate to call anything with only one implementation a plugin infrastructure, but I am confident that the combination of a persisted workflow request and a scheduler tag could be used to build a galaxy-farm plugin that waits for another Galaxy instance to become available and pulls the workflow down. This work piggybacks on Galaxy job handlers to have workflows scheduled in the background (i.e. at submission each workflow being scheduled in the background is assigned a unique job handler, and only that job handler thread will process the workflow). It should be fairly easy to allow the definition of a new kind of handler - a workflow handler instead of a job handler - if that is of interest. I will probably move much of what currently happens in workflow/scheduling_manager.py into the scheduler itself so that it can be more configurable and closer to a true plugin.

API: There are a number of new API endpoints here for fleshing out how workflow invocations (called usages in existing parlance) are handled.

- POST /api/workflows/{encoded_workflow_id}/usage
  Schedule a workflow to be run in the background and return just the workflow invocation information. RESTfully speaking this should be plural, but the matching GET endpoint is likewise usage and not usages, so I am favoring consistency over RESTful correctness here. Likewise, creating a 'usage' feels odd - I would like to make all of the usage endpoints aliases for more RESTfully correct invocations endpoints. The existing workflow run API endpoints still work the way they always have, but the output now includes all of the workflow invocation to_dict information as well as the list of outputs it has always returned. Once everything is scheduled this way that list of outputs will have to disappear, but hopefully people can start using the invocation information now to ease the transition.

- DELETE /api/workflows/{workflow_id}/usage/{usage_id}
  Cancel a scheduled workflow invocation.

- GET /api/workflows/{workflow_id}/usage/{usage_id}/steps/{step_id}
  Get information about a workflow invocation step.

- PUT /api/workflows/{workflow_id}/usage/{usage_id}/steps/{step_id}
  Update a workflow invocation step - for ones with modifiable state. An extension point was added to workflow modules to support this, but it is unused by all existing workflow modules. A subsequent PauseModule will use this to either continue or cancel a workflow invocation at a particular step.

Modules: Workflow modules can now define new methods for dealing with recovering state and interacting with user requests.
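To make the API surface above concrete, a hedged client-side sketch follows. It is not part of the changeset; the routes and the 'scheduler' key come from this commit, while the instance URL, API key handling, and any other run parameters are illustrative assumptions.

# Sketch of an API client driving the new usage endpoints.
import requests

galaxy_url = "http://localhost:8080"             # assumed Galaxy instance
params = dict( key="YOUR_API_KEY" )              # assumed API key auth
workflow_id = "ebfb8f50c6abde6d"                 # hypothetical encoded workflow id

# Schedule the workflow in the background; the invocation dict is returned immediately.
payload = dict( scheduler="core" )               # history/input fields omitted here - see build_workflow_run_config
invocation = requests.post( "%s/api/workflows/%s/usage" % ( galaxy_url, workflow_id ),
                            params=params, json=payload ).json()

# ...and cancel it again via the matching DELETE endpoint.
requests.delete( "%s/api/workflows/%s/usage/%s" % ( galaxy_url, workflow_id, invocation[ "id" ] ),
                 params=params )

The GET and PUT .../steps/{step_id} endpoints work the same way, with the PUT payload carrying an "action" value for modules that support it.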
Testing: One can issue a workflow request by running the following test. ./run_tests.sh -api test/api/test_workflows.py:WorkflowsApiTestCase.test_workflow_request Affected #: 23 files diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f .hgignore --- a/.hgignore +++ b/.hgignore @@ -67,6 +67,7 @@ shed_data_manager_conf.xml object_store_conf.xml job_metrics_conf.xml +workflow_schedulers_conf.xml config/* static/welcome.html.* static/welcome.html diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f config/workflow_schedulers_conf.xml.sample --- /dev/null +++ b/config/workflow_schedulers_conf.xml.sample @@ -0,0 +1,14 @@ +<?xml version="1.0"?> +<!-- If workflow_schedulers_conf.xml exists it defines the workflow scheduling + plugins to load and how to configure them. Currently only the core workflow + scheduling is available. +--> +<workflow_schedulers default="core"> + <!-- Each element in this file corresponds to a workflow scheduling plugin + in lib/galaxy/workflow/schedulers. --> + + <!-- Core plugin schedules whole workflow at outset inside Galaxy and doesn't + require any external dependencies. --> + <core id="core" /> + +</workflow_schedulers> diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/app.py --- a/lib/galaxy/app.py +++ b/lib/galaxy/app.py @@ -142,6 +142,11 @@ self.proxy_manager = ProxyManager( self.config ) # Initialize the external service types self.external_service_types = external_service_types.ExternalServiceTypesCollection( self.config.external_service_type_config_file, self.config.external_service_type_path, self ) + + from galaxy.workflow import scheduling_manager + # Must be initialized after job_config. 
+ self.workflow_scheduling_manager = scheduling_manager.WorkflowSchedulingManager( self ) + self.model.engine.dispose() self.control_worker = GalaxyQueueWorker(self, galaxy.queues.control_queue_from_config(self.config), @@ -150,6 +155,7 @@ self.control_worker.start() def shutdown( self ): + self.workflow_scheduling_manager.shutdown() self.job_manager.shutdown() self.object_store.shutdown() if self.heartbeat: @@ -171,3 +177,6 @@ self.trace_logger = FluentTraceLogger( 'galaxy', self.config.fluent_host, self.config.fluent_port ) else: self.trace_logger = None + + def is_job_handler( self ): + return (self.config.track_jobs_in_database and self.job_config.is_handler(self.config.server_name)) or not self.config.track_jobs_in_database diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -139,6 +139,7 @@ self.collect_outputs_from = [ x.strip() for x in kwargs.get( 'collect_outputs_from', 'new_file_path,job_working_directory' ).lower().split(',') ] self.template_path = resolve_path( kwargs.get( "template_path", "templates" ), self.root ) self.template_cache = resolve_path( kwargs.get( "template_cache_path", "database/compiled_templates" ), self.root ) + self.workflow_schedulers_config_file = resolve_path( kwargs.get( 'workflow_schedulers_config_file', 'config/workflow_schedulers_conf.xml' ), self.root ) self.local_job_queue_workers = int( kwargs.get( "local_job_queue_workers", "5" ) ) self.cluster_job_queue_workers = int( kwargs.get( "cluster_job_queue_workers", "3" ) ) self.job_queue_cleanup_interval = int( kwargs.get("job_queue_cleanup_interval", "5") ) diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/jobs/manager.py --- a/lib/galaxy/jobs/manager.py +++ b/lib/galaxy/jobs/manager.py @@ -18,7 +18,7 @@ """ def __init__( self, app ): self.app = app - if (self.app.config.track_jobs_in_database and self.app.job_config.is_handler(self.app.config.server_name)) or not self.app.config.track_jobs_in_database: + if self.app.is_job_handler(): log.debug("Starting job handler") self.job_handler = handler.JobHandler( app ) self.job_queue = self.job_handler.job_queue diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/managers/workflows.py --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -1,5 +1,6 @@ from galaxy import model from galaxy import exceptions +from galaxy.workflow import modules class WorkflowsManager( object ): @@ -49,6 +50,46 @@ self.check_security( trans, workflow_invocation, check_ownership=True, check_accessible=False ) return workflow_invocation + def cancel_invocation( self, trans, decoded_invocation_id ): + workflow_invocation = self.get_invocation( trans, decoded_invocation_id ) + cancelled = workflow_invocation.cancel() + + if cancelled: + trans.sa_session.add( workflow_invocation ) + trans.sa_session.flush() + else: + # TODO: More specific exception? + raise exceptions.MessageException( "Cannot cancel an inactive workflow invocation." 
) + + return workflow_invocation + + def get_invocation_step( self, trans, decoded_workflow_invocation_step_id ): + try: + workflow_invocation_step = trans.sa_session.query( + model.WorkflowInvocationStep + ).get( decoded_workflow_invocation_step_id ) + except Exception: + raise exceptions.ObjectNotFound() + self.check_security( trans, workflow_invocation_step.workflow_invocation, check_ownership=True, check_accessible=False ) + return workflow_invocation_step + + def update_invocation_step( self, trans, decoded_workflow_invocation_step_id, action ): + if action is None: + raise exceptions.RequestParameterMissingException( "Updating workflow invocation step requires an action parameter. " ) + + workflow_invocation_step = self.get_invocation_step( trans, decoded_workflow_invocation_step_id ) + workflow_invocation = workflow_invocation_step.workflow_invocation + if not workflow_invocation.active: + raise exceptions.RequestParameterInvalidException( "Attempting to modify the state of an completed workflow invocation." ) + + step = workflow_invocation_step.workflow_step + module = modules.module_factory.from_workflow_step( trans, step ) + performed_action = module.do_invocation_step_action( step, action ) + workflow_invocation_step.action = performed_action + trans.sa_session.add( workflow_invocation_step ) + trans.sa_session.flush() + return workflow_invocation_step + def build_invocations_query( self, trans, decoded_stored_workflow_id ): try: stored_workflow = trans.sa_session.query( diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -39,6 +39,7 @@ from galaxy.web.form_builder import (AddressField, CheckboxField, HistoryField, PasswordField, SelectField, TextArea, TextField, WorkflowField, WorkflowMappingField) +from galaxy.model.orm import and_, or_ from sqlalchemy.orm import object_session from sqlalchemy.orm import joinedload from sqlalchemy.sql.expression import func @@ -3096,8 +3097,74 @@ class WorkflowInvocation( object, Dictifiable ): - dict_collection_visible_keys = ( 'id', 'update_time', 'workflow_id' ) - dict_element_visible_keys = ( 'id', 'update_time', 'workflow_id' ) + dict_collection_visible_keys = ( 'id', 'update_time', 'workflow_id', 'history_id', 'uuid', 'state' ) + dict_element_visible_keys = ( 'id', 'update_time', 'workflow_id', 'history_id', 'uuid', 'state' ) + states = Bunch( + NEW='new', # Brand new workflow invocation... maybe this should be same as READY + READY='ready', # Workflow ready for another iteration of scheduling. + SCHEDULED='scheduled', # Workflow has been scheduled. + CANCELLED='cancelled', + FAILED='failed', + ) + + @property + def active( self ): + """ Indicates the workflow invocation is somehow active - and in + particular valid actions may be performed on its + ``WorkflowInvocationStep``s. 
+ """ + states = WorkflowInvocation.states + return self.state in [ states.NEW, states.READY ] + + def cancel( self ): + if not self.active: + return False + else: + self.state = WorkflowInvocation.states.CANCELLED + return True + + def fail( self ): + self.state = WorkflowInvocation.states.FAILED + + def step_states_by_step_id( self ): + step_states = {} + for step_state in self.step_states: + step_id = step_state.workflow_step_id + step_states[ step_id ] = step_state + return step_states + + def step_invocations_by_step_id( self ): + step_invocations = {} + for invocation_step in self.steps: + step_id = invocation_step.workflow_step_id + if step_id not in step_invocations: + step_invocations[ step_id ] = [] + step_invocations[ step_id ].append( invocation_step ) + return step_invocations + + @staticmethod + def poll_active_workflow_ids( + sa_session, + scheduler=None, + handler=None + ): + and_conditions = [ + or_( + WorkflowInvocation.state == WorkflowInvocation.states.NEW, + WorkflowInvocation.state == WorkflowInvocation.states.READY + ), + ] + if scheduler is not None: + and_conditions.append( WorkflowInvocation.scheduler == scheduler ) + if handler is not None: + and_conditions.append( WorkflowInvocation.handler == handler ) + + query = sa_session.query( + WorkflowInvocation + ).filter( and_( *and_conditions ) ) + # Immediately just load all ids into memory so time slicing logic + # is relatively intutitive. + return map( lambda wi: wi.id, query.all() ) def to_dict( self, view='collection', value_mapper=None ): rval = super( WorkflowInvocation, self ).to_dict( view=view, value_mapper=value_mapper ) @@ -3123,14 +3190,63 @@ class WorkflowInvocationStep( object, Dictifiable ): - dict_collection_visible_keys = ( 'id', 'update_time', 'job_id', 'workflow_step_id' ) - dict_element_visible_keys = ( 'id', 'update_time', 'job_id', 'workflow_step_id' ) + dict_collection_visible_keys = ( 'id', 'update_time', 'job_id', 'workflow_step_id', 'action' ) + dict_element_visible_keys = ( 'id', 'update_time', 'job_id', 'workflow_step_id', 'action' ) def to_dict( self, view='collection', value_mapper=None ): rval = super( WorkflowInvocationStep, self ).to_dict( view=view, value_mapper=value_mapper ) rval['order_index'] = self.workflow_step.order_index return rval + +class WorkflowRequest( object, Dictifiable ): + dict_collection_visible_keys = [ 'id', 'name', 'type', 'state', 'history_id', 'workflow_id' ] + dict_element_visible_keys = [ 'id', 'name', 'type', 'state', 'history_id', 'workflow_id' ] + + def to_dict( self, view='collection', value_mapper=None ): + rval = super( WorkflowRequest, self ).to_dict( view=view, value_mapper=value_mapper ) + return rval + + +class WorkflowRequestInputParameter(object, Dictifiable): + """ Workflow-related parameters not tied to steps or inputs. + """ + dict_collection_visible_keys = ['id', 'name', 'value', 'type'] + types = Bunch( + REPLACEMENT_PARAMETERS='replacements', + META_PARAMETERS='meta', # + ) + + def __init__( self, name=None, value=None, type=None ): + self.name = name + self.value = value + self.type = type + + +class WorkflowRequestStepState(object, Dictifiable): + """ Workflow step value parameters. 
+ """ + dict_collection_visible_keys = ['id', 'name', 'value', 'workflow_step_id'] + + def __init__( self, workflow_step=None, name=None, value=None ): + self.workflow_step = workflow_step + self.name = name + self.value = value + self.type = type + + +class WorkflowRequestToInputDatasetAssociation(object, Dictifiable): + """ Workflow step input dataset parameters. + """ + dict_collection_visible_keys = ['id', 'workflow_invocation_id', 'workflow_step_id', 'dataset_id', 'name' ] + + +class WorkflowRequestToInputDatasetCollectionAssociation(object, Dictifiable): + """ Workflow step input dataset collection parameters. + """ + dict_collection_visible_keys = ['id', 'workflow_invocation_id', 'workflow_step_id', 'dataset_collection_id', 'name' ] + + class MetadataFile( object ): def __init__( self, dataset=None, name=None ): @@ -4038,6 +4154,18 @@ self.value = None self.user_value = None + +class WorkRequestTagAssociation( ItemTagAssociation ): + def __init__( self, id=None, user=None, workflow_request_id=None, tag_id=None, user_tname=None, value=None ): + self.id = id + self.user = user + self.workflow_request_id = workflow_request_id + self.tag_id = tag_id + self.user_tname = user_tname + self.value = None + self.user_value = None + + # Item annotation classes. class HistoryAnnotationAssociation( object ): diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/model/mapping.py --- a/lib/galaxy/model/mapping.py +++ b/lib/galaxy/model/mapping.py @@ -710,6 +710,46 @@ ## Column( "input_connections", JSONType ) ) + +model.WorkflowRequestStepState.table = Table( + "workflow_request_step_states", metadata, + Column( "id", Integer, primary_key=True ), + Column( "workflow_invocation_id", Integer, ForeignKey("workflow_invocation.id", onupdate="CASCADE", ondelete="CASCADE" )), + Column( "workflow_step_id", Integer, ForeignKey("workflow_step.id" )), + Column( "value", JSONType ), +) + + +model.WorkflowRequestInputParameter.table = Table( + "workflow_request_input_parameters", metadata, + Column( "id", Integer, primary_key=True ), + Column( "workflow_invocation_id", Integer, ForeignKey("workflow_invocation.id", onupdate="CASCADE", ondelete="CASCADE" )), + Column( "name", Unicode(255) ), + Column( "value", TEXT ), + Column( "type", Unicode(255) ), +) + + +model.WorkflowRequestToInputDatasetAssociation.table = Table( + "workflow_request_to_input_dataset", metadata, + Column( "id", Integer, primary_key=True ), + Column( "name", String(255) ), + Column( "workflow_invocation_id", Integer, ForeignKey( "workflow_invocation.id" ), index=True ), + Column( "workflow_step_id", Integer, ForeignKey("workflow_step.id") ), + Column( "dataset_id", Integer, ForeignKey( "history_dataset_association.id" ), index=True ), +) + + +model.WorkflowRequestToInputDatasetCollectionAssociation.table = Table( + "workflow_request_to_input_collection_dataset", metadata, + Column( "id", Integer, primary_key=True ), + Column( "name", String(255) ), + Column( "workflow_invocation_id", Integer, ForeignKey( "workflow_invocation.id" ), index=True ), + Column( "workflow_step_id", Integer, ForeignKey("workflow_step.id") ), + Column( "dataset_collection_id", Integer, ForeignKey( "history_dataset_collection_association.id" ), index=True ), +) + + model.WorkflowStepConnection.table = Table( "workflow_step_connection", metadata, Column( "id", Integer, primary_key=True ), Column( "output_step_id", Integer, ForeignKey( "workflow_step.id" ), index=True ), @@ -728,8 +768,13 @@ Column( "id", Integer, 
primary_key=True ), Column( "create_time", DateTime, default=now ), Column( "update_time", DateTime, default=now, onupdate=now ), - Column( "workflow_id", Integer, ForeignKey( "workflow.id" ), index=True, nullable=False ) - ) + Column( "workflow_id", Integer, ForeignKey( "workflow.id" ), index=True, nullable=False ), + Column( "state", TrimmedString( 64 ), index=True ), + Column( "scheduler", TrimmedString( 255 ), index=True ), + Column( "handler", TrimmedString( 255 ), index=True ), + Column( 'uuid', UUIDType() ), + Column( "history_id", Integer, ForeignKey( "history.id" ), index=True ), +) model.WorkflowInvocationStep.table = Table( "workflow_invocation_step", metadata, Column( "id", Integer, primary_key=True ), @@ -737,8 +782,9 @@ Column( "update_time", DateTime, default=now, onupdate=now ), Column( "workflow_invocation_id", Integer, ForeignKey( "workflow_invocation.id" ), index=True, nullable=False ), Column( "workflow_step_id", Integer, ForeignKey( "workflow_step.id" ), index=True, nullable=False ), - Column( "job_id", Integer, ForeignKey( "job.id" ), index=True, nullable=True ) - ) + Column( "job_id", Integer, ForeignKey( "job.id" ), index=True, nullable=True ), + Column( "action", JSONType, nullable=True ), +) model.StoredWorkflowUserShareAssociation.table = Table( "stored_workflow_user_share_connection", metadata, Column( "id", Integer, primary_key=True ), @@ -1968,6 +2014,11 @@ mapper( model.WorkflowInvocation, model.WorkflowInvocation.table, properties=dict( + history=relation( model.History ), + input_parameters=relation( model.WorkflowRequestInputParameter ), + step_states=relation( model.WorkflowRequestStepState ), + input_datasets=relation( model.WorkflowRequestToInputDatasetAssociation ), + input_dataset_collections=relation( model.WorkflowRequestToInputDatasetCollectionAssociation ), steps=relation( model.WorkflowInvocationStep, backref='workflow_invocation', lazy=False ), workflow=relation( model.Workflow ) ) ) @@ -1976,6 +2027,33 @@ workflow_step = relation( model.WorkflowStep ), job = relation( model.Job, backref=backref( 'workflow_invocation_step', uselist=False ) ) ) ) +simple_mapping( + model.WorkflowRequestInputParameter, + workflow_invocation=relation( model.WorkflowInvocation ), +) + +simple_mapping( + model.WorkflowRequestStepState, + workflow_invocation=relation( model.WorkflowInvocation ), + workflow_step=relation( model.WorkflowStep ), +) + +simple_mapping( + model.WorkflowRequestToInputDatasetAssociation, + workflow_invocation=relation( model.WorkflowInvocation ), + workflow_step=relation( model.WorkflowStep ), + dataset=relation( model.HistoryDatasetAssociation ), +) + + +simple_mapping( + model.WorkflowRequestToInputDatasetCollectionAssociation, + workflow_invocation=relation( model.WorkflowInvocation ), + workflow_step=relation( model.WorkflowStep ), + dataset_collection=relation( model.HistoryDatasetCollectionAssociation ), +) + + mapper( model.MetadataFile, model.MetadataFile.table, properties=dict( history_dataset=relation( model.HistoryDatasetAssociation ), library_dataset=relation( model.LibraryDatasetDatasetAssociation ) ) ) diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/model/migrate/versions/0123_add_workflow_request_tables.py --- /dev/null +++ b/lib/galaxy/model/migrate/versions/0123_add_workflow_request_tables.py @@ -0,0 +1,144 @@ +""" +Migration script for workflow request tables. 
+""" +from sqlalchemy import * +from sqlalchemy.orm import * +from migrate import * +from migrate.changeset import * +from galaxy.model.custom_types import * + +import datetime +now = datetime.datetime.utcnow + +import logging +log = logging.getLogger( __name__ ) + +metadata = MetaData() + + +WorkflowRequestInputParameter_table = Table( + "workflow_request_input_parameters", metadata, + Column( "id", Integer, primary_key=True ), + Column( "workflow_invocation_id", Integer, ForeignKey("workflow_invocation.id", onupdate="CASCADE", ondelete="CASCADE" )), + Column( "name", Unicode(255) ), + Column( "type", Unicode(255) ), + Column( "value", TEXT ), +) + + +WorkflowRequestStepState_table = Table( + "workflow_request_step_states", metadata, + Column( "id", Integer, primary_key=True ), + Column( "workflow_invocation_id", Integer, ForeignKey("workflow_invocation.id", onupdate="CASCADE", ondelete="CASCADE" )), + Column( "workflow_step_id", Integer, ForeignKey("workflow_step.id" )), + Column( "value", JSONType ), +) + + +WorkflowRequestToInputDatasetAssociation_table = Table( + "workflow_request_to_input_dataset", metadata, + Column( "id", Integer, primary_key=True ), + Column( "name", String(255) ), + Column( "workflow_invocation_id", Integer, ForeignKey( "workflow_invocation.id" ), index=True ), + Column( "workflow_step_id", Integer, ForeignKey("workflow_step.id") ), + Column( "dataset_id", Integer, ForeignKey( "history_dataset_association.id" ), index=True ), +) + + +WorkflowRequestToInputDatasetCollectionAssociation_table = Table( + "workflow_request_to_input_collection_dataset", metadata, + Column( "id", Integer, primary_key=True ), + Column( "name", String(255) ), + Column( "workflow_invocation_id", Integer, ForeignKey( "workflow_invocation.id" ), index=True ), + Column( "workflow_step_id", Integer, ForeignKey("workflow_step.id") ), + Column( "dataset_collection_id", Integer, ForeignKey( "history_dataset_collection_association.id" ), index=True ), +) + + +TABLES = [ + WorkflowRequestInputParameter_table, + WorkflowRequestStepState_table, + WorkflowRequestToInputDatasetAssociation_table, + WorkflowRequestToInputDatasetCollectionAssociation_table, +] + + +def upgrade(migrate_engine): + metadata.bind = migrate_engine + print __doc__ + metadata.reflect() + + for table in TABLES: + __create(table) + + History_column = Column( "history_id", Integer, ForeignKey( "history.id" ), nullable=True ) + State_column = Column( "state", TrimmedString( 64 ) ) + + # TODO: Handle indexes correctly + SchedulerId_column = Column( "scheduler", TrimmedString(255) ) + HandlerId_column = Column( "handler", TrimmedString(255) ) + WorkflowUUID_column = Column( "uuid", UUIDType, nullable=True ) + __add_column( History_column, "workflow_invocation", metadata ) + __add_column( State_column, "workflow_invocation", metadata ) + __add_column( SchedulerId_column, "workflow_invocation", metadata, index_nane="id_workflow_invocation_scheduler" ) + __add_column( HandlerId_column, "workflow_invocation", metadata, index_name="id_workflow_invocation_handler" ) + __add_column( WorkflowUUID_column, "workflow_invocation", metadata ) + + # All previous invocations have been scheduled... 
+ cmd = "UPDATE workflow_invocation SET state = 'scheduled'" + try: + migrate_engine.execute( cmd ) + except Exception, e: + log.debug( "failed to update past workflow invocation states: %s" % ( str( e ) ) ) + + WorkflowInvocationStepAction_column = Column( "action", JSONType, nullable=True ) + __add_column( WorkflowInvocationStepAction_column, "workflow_invocation_step", metadata ) + + +def downgrade(migrate_engine): + metadata.bind = migrate_engine + metadata.reflect() + + for table in TABLES: + __drop(table) + + __drop_column( "state", "workflow_invocation", metadata ) + __drop_column( "scheduler_id", "workflow_invocation", metadata ) + __drop_column( "uuid", "workflow_invocation", metadata ) + __drop_column( "history_id", "workflow_invocation", metadata ) + __drop_column( "handler_id", "workflow_invocation", metadata ) + __drop_column( "action", "workflow_invocation_step", metadata ) + + +def __add_column(column, table_name, metadata, **kwds): + try: + table = Table( table_name, metadata, autoload=True ) + column.create( table, **kwds ) + except Exception as e: + print str(e) + log.exception( "Adding column %s column failed." % column) + + +def __drop_column( column_name, table_name, metadata ): + try: + table = Table( table_name, metadata, autoload=True ) + getattr( table.c, column_name ).drop() + except Exception as e: + print str(e) + log.exception( "Dropping column %s failed." % column_name ) + + +def __create(table): + try: + table.create() + except Exception as e: + print str(e) + log.exception("Creating %s table failed: %s" % (table.name, str( e ) ) ) + + +def __drop(table): + try: + table.drop() + except Exception as e: + print str(e) + log.exception("Dropping %s table failed: %s" % (table.name, str( e ) ) ) diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -16,7 +16,7 @@ from galaxy.web.base.controller import UsesHistoryMixin from galaxy.web.base.controller import SharableMixin from galaxy.workflow.extract import extract_workflow -from galaxy.workflow.run import invoke +from galaxy.workflow.run import invoke, queue_invoke from galaxy.workflow.run_request import build_workflow_run_config log = logging.getLogger(__name__) @@ -224,7 +224,7 @@ # invoke may throw MessageExceptions on tool erors, failure # to match up inputs, etc... - outputs = invoke( + outputs, invocation = invoke( trans=trans, workflow=workflow, workflow_run_config=run_config, @@ -235,14 +235,19 @@ # Build legacy output - should probably include more information from # outputs. rval = {} - rval['history'] = trans.security.encode_id(history.id) + rval['history'] = trans.security.encode_id( history.id ) rval['outputs'] = [] for step in workflow.steps: if step.type == 'tool' or step.type is None: for v in outputs[ step.id ].itervalues(): rval[ 'outputs' ].append( trans.security.encode_id( v.id ) ) - return rval + # Newer version of this API just returns the invocation as a dict, to + # facilitate migration - produce the newer style response and blend in + # the older information. 
+ invocation_response = self.__encode_invocation( trans, invocation ) + invocation_response.update( rval ) + return invocation_response @expose_api def workflow_dict( self, trans, workflow_id, **kwd ): @@ -370,6 +375,32 @@ return item @expose_api + def workflow_request( self, trans, workflow_id, payload, **kwd ): + """ + POST /api/workflows/{encoded_workflow_id}/usage + + Schedule the workflow specified by `workflow_id` to run. + """ + # /usage is awkward in this context but is consistent with the rest of + # this module. Would prefer to redo it all to use /invocation(s). + # Get workflow + accessibility check. + stored_workflow = self.__get_stored_accessible_workflow( trans, workflow_id ) + workflow = stored_workflow.latest_workflow + + run_config = build_workflow_run_config( trans, workflow, payload ) + workflow_scheduler_id = payload.get( "scheduler", None ) + # TODO: workflow scheduler hints + work_request_params = dict( scheduler=workflow_scheduler_id ) + + workflow_invocation = queue_invoke( + trans=trans, + workflow=workflow, + workflow_run_config=run_config, + request_params=work_request_params + ) + return self.encode_all_ids( trans, workflow_invocation.to_dict(), recursive=True ) + + @expose_api def workflow_usage(self, trans, workflow_id, **kwd): """ GET /api/workflows/{workflow_id}/usage @@ -407,6 +438,87 @@ return self.__encode_invocation( trans, workflow_invocation ) return None + @expose_api + def cancel_workflow_invocation(self, trans, workflow_id, usage_id, **kwd): + """ + DELETE /api/workflows/{workflow_id}/usage/{usage_id} + Cancel the specified workflow invocation. + + :param workflow_id: the workflow id (required) + :type workflow_id: str + + :param usage_id: the usage id (required) + :type usage_id: str + + :raises: exceptions.MessageException, exceptions.ObjectNotFound + """ + decoded_workflow_invocation_id = self.__decode_id( trans, usage_id ) + workflow_invocation = self.workflow_manager.cancel_invocation( trans, decoded_workflow_invocation_id ) + return self.__encode_invocation( trans, workflow_invocation ) + + @expose_api + def workflow_invocation_step(self, trans, workflow_id, usage_id, step_id, **kwd): + """ + GET /api/workflows/{workflow_id}/usage/{usage_id}/steps/{step_id} + + :param workflow_id: the workflow id (required) + :type workflow_id: str + + :param usage_id: the usage id (required) + :type usage_id: str + + :param step_id: encoded id of the WorkflowInvocationStep (required) + :type step_id: str + + :param payload: payload containing update action information + for running workflow. + + :raises: exceptions.MessageException, exceptions.ObjectNotFound + """ + decoded_invocation_step_id = self.__decode_id( trans, step_id ) + invocation_step = self.workflow_manager.get_invocation_step( + trans, + decoded_invocation_step_id + ) + return self.__encode_invocation_step( trans, invocation_step ) + + @expose_api + def workflow_invocation_step_update(self, trans, workflow_id, usage_id, step_id, payload, **kwd): + """ + PUT /api/workflows/{workflow_id}/usage/{usage_id}/steps/{step_id} + Update state of running workflow step invocation - still very nebulous + but this would be for stuff like confirming paused steps can proceed + etc.... 
+ + + :param workflow_id: the workflow id (required) + :type workflow_id: str + + :param usage_id: the usage id (required) + :type usage_id: str + + :param step_id: encoded id of the WorkflowInvocationStep (required) + :type step_id: str + + :raises: exceptions.MessageException, exceptions.ObjectNotFound + """ + decoded_invocation_step_id = self.__decode_id( trans, step_id ) + action = payload.get( "action", None ) + + invocation_step = self.workflow_manager.update_invocation_step( + trans, + decoded_invocation_step_id, + action=action, + ) + return self.__encode_invocation_step( trans, invocation_step ) + + def __encode_invocation_step( self, trans, invocation_step ): + return self.encode_all_ids( + trans, + invocation_step.to_dict( 'element' ), + True + ) + def __get_stored_accessible_workflow( self, trans, workflow_id ): stored_workflow = self.__get_stored_workflow( trans, workflow_id ) diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/webapps/galaxy/buildapp.py --- a/lib/galaxy/webapps/galaxy/buildapp.py +++ b/lib/galaxy/webapps/galaxy/buildapp.py @@ -227,7 +227,12 @@ webapp.mapper.connect( 'import_shared_workflow_deprecated', '/api/workflows/import', controller='workflows', action='import_shared_workflow_deprecated', conditions=dict( method=['POST'] ) ) webapp.mapper.connect( 'workflow_usage', '/api/workflows/{workflow_id}/usage', controller='workflows', action='workflow_usage', conditions=dict(method=['GET'])) webapp.mapper.connect( 'workflow_usage_contents', '/api/workflows/{workflow_id}/usage/{usage_id}', controller='workflows', action='workflow_usage_contents', conditions=dict(method=['GET'])) + webapp.mapper.connect( 'cancel_workflow_invocation', '/api/workflows/{workflow_id}/usage/{usage_id}', controller='workflows', action='cancel_workflow_invocation', conditions=dict(method=['DELETE'])) + webapp.mapper.connect( 'workflow_invocation_step', '/api/workflows/{workflow_id}/usage/{usage_id}/steps/{step_id}', controller='workflows', action='workflow_invocation_step', conditions=dict(method=['GET'])) + webapp.mapper.connect( 'workflow_invocation_step_update', '/api/workflows/{workflow_id}/usage/{usage_id}/steps/{step_id}', controller='workflows', action='workflow_invocation_step_update', conditions=dict(method=['PUT'])) + + webapp.mapper.connect( 'workflow_request', '/api/workflows/{workflow_id}/usage', controller='workflows', action='workflow_request', conditions=dict( method=['POST'] ) ) # ============================ # ===== AUTHENTICATE API ===== # ============================ diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -1295,7 +1295,7 @@ copy_inputs_to_history=new_history is not None ) - outputs = invoke( + outputs, invocation = invoke( trans=trans, workflow=workflow, workflow_run_config=run_config diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/work/context.py --- /dev/null +++ b/lib/galaxy/work/context.py @@ -0,0 +1,46 @@ +from galaxy.managers.context import ( + ProvidesAppContext, + ProvidesUserContext, + ProvidesHistoryContext +) + + +class WorkRequestContext( ProvidesAppContext, ProvidesUserContext, ProvidesHistoryContext ): + """ Stripped down implementation of Galaxy web transaction god object for + work request handling outside of web threads - uses 
mix-ins shared with + GalaxyWebTransaction to provide app, user, and history context convience + methods - but nothing related to HTTP handling, mako views, etc.... + + Things that only need app shouldn't be consuming trans - but there is a + need for actions potentially tied to users and histories and hopefully + this can define that stripped down interface providing access to user and + history information - but not dealing with web request and response + objects. + """ + + def __init__( self, app, user=None, history=None ): + self.app = app + self.security = app.security + self.__user = user + self.__history = history + self.api_inherit_admin = False + + def get_history( self, create=False ): + if create: + raise NotImplementedError( "Cannot create histories from a work request context." ) + return self.__history + + def set_history( self ): + raise NotImplementedError( "Cannot change histories from a work request context." ) + + history = property( get_history, set_history ) + + def get_user( self ): + """Return the current user if logged in or None.""" + return self.__user + + def set_user( self, user ): + """Set the current user.""" + raise NotImplementedError( "Cannot change users from a work request context." ) + + user = property( get_user, set_user ) diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -171,6 +171,21 @@ """ raise TypeError( "Abstract method" ) + def do_invocation_step_action( self, step, action ): + """ Update or set the workflow invocation state action - generic + extension point meant to allows users to interact with interactive + workflow modules. The action object returned from this method will + be attached to the WorkflowInvocationStep and be available the next + time the workflow scheduler visits the workflow. + """ + raise exceptions.RequestParameterInvalidException( "Attempting to perform invocation step action on module that does not support actions." ) + + def recover_mapping( self, step, step_invocations, progress ): + """ Re-populate progress object with information about connections + from previously executed steps recorded via step_invocations. + """ + raise TypeError( "Abstract method" ) + class InputModule( WorkflowModule ): @@ -240,6 +255,19 @@ state.inputs = dict( input=None ) return state + def recover_runtime_state( self, runtime_state ): + """ Take secure runtime state from persisted invocation and convert it + into a DefaultToolState object for use during workflow invocation. 
+ """ + fake_tool = Bunch( inputs=self.get_runtime_inputs() ) + state = galaxy.tools.DefaultToolState() + state.decode( runtime_state, fake_tool, self.trans.app, secure=False ) + return state + + def normalize_runtime_state( self, runtime_state ): + fake_tool = Bunch( inputs=self.get_runtime_inputs() ) + return runtime_state.encode( fake_tool, self.trans.app, secure=False ) + def encode_runtime_state( self, trans, state ): fake_tool = Bunch( inputs=self.get_runtime_inputs() ) return state.encode( fake_tool, trans.app ) @@ -291,6 +319,9 @@ progress.set_outputs_for_input( step, step_outputs ) return job + def recover_mapping( self, step, step_invocations, progress ): + progress.set_outputs_for_input( step ) + class InputDataModule( InputModule ): type = "data_input" @@ -439,6 +470,18 @@ ) self.state.inputs = self.tool.params_from_strings( state, app, **params_from_kwds ) + def recover_runtime_state( self, runtime_state ): + """ Take secure runtime state from persisted invocation and convert it + into a DefaultToolState object for use during workflow invocation. + """ + state = galaxy.tools.DefaultToolState() + app = self.trans.app + state.decode( runtime_state, self.tool, app, secure=False ) + return state + + def normalize_runtime_state( self, runtime_state ): + return runtime_state.encode( self.tool, self.trans.app, secure=False ) + @classmethod def __get_tool_version( cls, trans, tool_id ): # Return a ToolVersion if one exists for tool_id. @@ -664,7 +707,7 @@ param_combinations=param_combinations, history=invocation.history, collection_info=collection_info, - workflow_invocation_uuid=invocation.uuid + workflow_invocation_uuid=invocation.uuid.hex ) if collection_info: step_outputs = dict( execution_tracker.created_collections ) @@ -731,6 +774,23 @@ visit_input_values( self.tool.inputs, self.state.inputs, callback ) + def recover_mapping( self, step, step_invocations, progress ): + # Grab a job representing this invocation - for normal workflows + # there will be just one job but if this step was mapped over there + # may be many. + job_0 = step_invocations[ 0 ].job + + outputs = {} + for job_output in job_0.output_datasets: + replacement_name = job_output.name + replacement_value = job_output.dataset + # If was a mapping step, grab the output mapped collection for + # replacement instead. + if replacement_value.hidden_beneath_collection_instance: + replacement_value = replacement_value.hidden_beneath_collection_instance + outputs[ replacement_name ] = replacement_value + progress.set_step_outputs( step, outputs ) + class WorkflowModuleFactory( object ): @@ -796,6 +856,14 @@ module is missing. """ +class DelayedWorkflowEvaluation(Exception): + pass + + +class CancelWorkflowEvaluation(Exception): + pass + + class WorkflowModuleInjector(object): """ Injects workflow step objects from the ORM with appropriate module and module generated/influenced state. 
""" diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -6,54 +6,110 @@ from galaxy.util.odict import odict from galaxy.workflow import modules from galaxy.workflow.run_request import WorkflowRunConfig +from galaxy.workflow.run_request import workflow_run_config_to_request import logging log = logging.getLogger( __name__ ) -def invoke( trans, workflow, workflow_run_config, populate_state=False ): +def invoke( trans, workflow, workflow_run_config, workflow_invocation=None, populate_state=False ): """ Run the supplied workflow in the supplied target_history. """ if populate_state: modules.populate_module_and_state( trans, workflow, workflow_run_config.param_map ) - return WorkflowInvoker( + invoker = WorkflowInvoker( trans, workflow, workflow_run_config, - ).invoke() + workflow_invocation=workflow_invocation, + ) + try: + outputs = invoker.invoke() + except modules.CancelWorkflowEvaluation: + if workflow_invocation: + if workflow_invocation.cancel(): + trans.sa_session.add( workflow_invocation ) + outputs = [] + except Exception: + log.exception("Failed to execute scheduled workflow.") + if workflow_invocation: + # Running workflow invocation in background, just mark + # persistent workflow invocation as failed. + workflow_invocation.fail() + trans.sa_session.add( workflow_invocation ) + else: + # Running new transient workflow invocation in legacy + # controller action - propage the exception up. + raise + outputs = [] + + if workflow_invocation: + # Be sure to update state of workflow_invocation. + trans.sa_session.flush() + + return outputs, invoker.workflow_invocation + + +def queue_invoke( trans, workflow, workflow_run_config, request_params ): + modules.populate_module_and_state( trans, workflow, workflow_run_config.param_map ) + workflow_invocation = workflow_run_config_to_request( trans, workflow_run_config, workflow ) + workflow_invocation.workflow = workflow + return trans.app.workflow_scheduling_manager.queue( + workflow_invocation, + request_params + ) class WorkflowInvoker( object ): - def __init__( self, trans, workflow, workflow_run_config ): + def __init__( self, trans, workflow, workflow_run_config, workflow_invocation=None ): self.trans = trans self.workflow = workflow - workflow_invocation = model.WorkflowInvocation() - workflow_invocation.workflow = self.workflow - self.workflow_invocation = workflow_invocation - self.progress = WorkflowProgress( self.workflow_invocation, workflow_run_config.inputs ) + if workflow_invocation is None: + invocation_uuid = uuid.uuid1() - invocation_uuid = uuid.uuid1().hex + workflow_invocation = model.WorkflowInvocation() + workflow_invocation.workflow = self.workflow - # In one way or another, following attributes will become persistent - # so they are available during delayed/revisited workflow scheduling. - self.workflow_invocation.uuid = invocation_uuid - self.workflow_invocation.history = workflow_run_config.target_history + # In one way or another, following attributes will become persistent + # so they are available during delayed/revisited workflow scheduling. 
+ workflow_invocation.uuid = invocation_uuid + workflow_invocation.history = workflow_run_config.target_history + + self.workflow_invocation = workflow_invocation + else: + self.workflow_invocation = workflow_invocation + self.workflow_invocation.copy_inputs_to_history = workflow_run_config.copy_inputs_to_history self.workflow_invocation.replacement_dict = workflow_run_config.replacement_dict + module_injector = modules.WorkflowModuleInjector( trans ) + self.progress = WorkflowProgress( self.workflow_invocation, workflow_run_config.inputs, module_injector ) + def invoke( self ): workflow_invocation = self.workflow_invocation remaining_steps = self.progress.remaining_steps() + delayed_steps = False for step in remaining_steps: - jobs = self._invoke_step( step ) - for job in util.listify( jobs ): - # Record invocation - workflow_invocation_step = model.WorkflowInvocationStep() - workflow_invocation_step.workflow_invocation = workflow_invocation - workflow_invocation_step.workflow_step = step - workflow_invocation_step.job = job + jobs = None + try: + jobs = self._invoke_step( step ) + for job in (util.listify( jobs ) or [None]): + # Record invocation + workflow_invocation_step = model.WorkflowInvocationStep() + workflow_invocation_step.workflow_invocation = workflow_invocation + workflow_invocation_step.workflow_step = step + workflow_invocation_step.job = job + except modules.DelayedWorkflowEvaluation: + delayed_steps = True + self.progress.mark_step_outputs_delayed( step ) + + if delayed_steps: + state = model.WorkflowInvocation.states.READY + else: + state = model.WorkflowInvocation.states.SCHEDULED + workflow_invocation.state = state # All jobs ran successfully, so we can save now self.trans.sa_session.add( workflow_invocation ) @@ -66,18 +122,35 @@ jobs = step.module.execute( self.trans, self.progress, self.workflow_invocation, step ) return jobs +STEP_OUTPUT_DELAYED = object() + class WorkflowProgress( object ): - def __init__( self, workflow_invocation, inputs_by_step_id ): + def __init__( self, workflow_invocation, inputs_by_step_id, module_injector ): self.outputs = odict() + self.module_injector = module_injector self.workflow_invocation = workflow_invocation self.inputs_by_step_id = inputs_by_step_id def remaining_steps(self): + # Previously computed and persisted step states. 
+ step_states = self.workflow_invocation.step_states_by_step_id() steps = self.workflow_invocation.workflow.steps + remaining_steps = [] + step_invocations_by_id = self.workflow_invocation.step_invocations_by_step_id() + for step in steps: + if not hasattr( step, 'module' ): + self.module_injector.inject( step ) + runtime_state = step_states[ step.id ].value + step.state = step.module.recover_runtime_state( runtime_state ) - return steps + invocation_steps = step_invocations_by_id.get( step.id, None ) + if invocation_steps: + self._recover_mapping( step, invocation_steps ) + else: + remaining_steps.append( step ) + return remaining_steps def replacement_for_tool_input( self, step, input, prefixed_name ): """ For given workflow 'step' that has had input_connections_by_name @@ -100,6 +173,8 @@ def replacement_for_connection( self, connection ): step_outputs = self.outputs[ connection.output_step.id ] + if step_outputs is STEP_OUTPUT_DELAYED: + raise modules.DelayedWorkflowEvaluation() return step_outputs[ connection.output_name ] def set_outputs_for_input( self, step, outputs={} ): @@ -111,5 +186,13 @@ def set_step_outputs(self, step, outputs): self.outputs[ step.id ] = outputs + def mark_step_outputs_delayed(self, step): + self.outputs[ step.id ] = STEP_OUTPUT_DELAYED + + def _recover_mapping( self, step, step_invocations ): + try: + step.module.recover_mapping( step, step_invocations, self ) + except modules.DelayedWorkflowEvaluation: + self.mark_step_outputs_delayed( step ) __all__ = [ invoke, WorkflowRunConfig ] diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/workflow/run_request.py --- a/lib/galaxy/workflow/run_request.py +++ b/lib/galaxy/workflow/run_request.py @@ -1,9 +1,15 @@ +import uuid + from galaxy import exceptions +from galaxy import model from galaxy.managers import histories INPUT_STEP_TYPES = [ 'data_input', 'data_collection_input' ] +import logging +log = logging.getLogger( __name__ ) + class WorkflowRunConfig( object ): """ Wrapper around all the ways a workflow execution can be parameterized. 
@@ -231,6 +237,96 @@ return run_config +def workflow_run_config_to_request( trans, run_config, workflow ): + param_types = model.WorkflowRequestInputParameter.types + + workflow_invocation = model.WorkflowInvocation() + workflow_invocation.uuid = uuid.uuid1() + workflow_invocation.history = run_config.target_history + + def add_parameter( name, value, type ): + parameter = model.WorkflowRequestInputParameter( + name=name, + value=value, + type=type, + ) + workflow_invocation.input_parameters.append( parameter ) + + replacement_dict = run_config.replacement_dict + for name, value in replacement_dict.iteritems(): + add_parameter( + name=name, + value=value, + type=param_types.REPLACEMENT_PARAMETERS, + ) + + for step_id, content in run_config.inputs.iteritems(): + if content.history_content_type == "dataset": + request_to_content = model.WorkflowRequestToInputDatasetAssociation() + request_to_content.dataset = content + request_to_content.workflow_step_id = step_id + workflow_invocation.input_datasets.append( request_to_content ) + else: + request_to_content = model.WorkflowRequestToInputDatasetCollectionAssociation() + request_to_content.dataset_collection = content + request_to_content.workflow_step_id = step_id + workflow_invocation.input_dataset_collections.append( request_to_content ) + + for step in workflow.steps: + state = step.state + serializable_runtime_state = step.module.normalize_runtime_state( state ) + step_state = model.WorkflowRequestStepState() + step_state.workflow_step_id = step.id + step_state.value = serializable_runtime_state + workflow_invocation.step_states.append( step_state ) + + add_parameter( "copy_inputs_to_history", "true" if run_config.copy_inputs_to_history else "false", param_types.META_PARAMETERS ) + return workflow_invocation + + +def workflow_request_to_run_config( work_request_context, workflow_invocation ): + param_types = model.WorkflowRequestInputParameter.types + + history = workflow_invocation.history + replacement_dict = {} + inputs = {} + param_map = {} + copy_inputs_to_history = None + + for parameter in workflow_invocation.input_parameters: + parameter_type = parameter.type + + if parameter_type == param_types.REPLACEMENT_PARAMETERS: + replacement_dict[ parameter.name ] = parameter.value + elif parameter_type == param_types.META_PARAMETERS: + if parameter.name == "copy_inputs_to_history": + copy_inputs_to_history = (parameter.value == "true") + + #for parameter in workflow_invocation.step_parameters: + # step_id = parameter.workflow_step_id + # if step_id not in param_map: + # param_map[ step_id ] = {} + # param_map[ step_id ][ parameter.name ] = parameter.value + + for input_association in workflow_invocation.input_datasets: + inputs[ input_association.workflow_step_id ] = input_association.dataset + + for input_association in workflow_invocation.input_dataset_collections: + inputs[ input_association.workflow_step_id ] = input_association.dataset_collection + + if copy_inputs_to_history is None: + raise exceptions.InconsistentDatabase("Failed to find copy_inputs_to_history parameter loading workflow_invocation from database.") + + workflow_run_config = WorkflowRunConfig( + target_history=history, + replacement_dict=replacement_dict, + inputs=inputs, + param_map=param_map, + copy_inputs_to_history=copy_inputs_to_history, + ) + return workflow_run_config + + def __decode_id( trans, workflow_id, model_type="workflow" ): try: return trans.security.decode_id( workflow_id ) diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r 
f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/workflow/schedulers/__init__.py --- /dev/null +++ b/lib/galaxy/workflow/schedulers/__init__.py @@ -0,0 +1,41 @@ +""" Module containing Galaxy workflow scheduling plugins. Galaxy's interface +for workflow scheduling is highly experimental and the interface required for +scheduling plugins will almost certainly change. +""" +from abc import ABCMeta +from abc import abstractmethod + + +class WorkflowSchedulingPlugin( object ): + """ A plugin defining how Galaxy should schedule plugins. By default + plugins are passive and should monitor Galaxy's work queue for + WorkflowRequests. Inherit from ActiveWorkflowSchedulingPlugin instead if + the scheduling plugin should be forced (i.e. if scheduling happen all at + once or the request will be stored and monitored outside of Galaxy.) + """ + __metaclass__ = ABCMeta + + @property + @abstractmethod + def plugin_type( self ): + """ Short string providing labelling this plugin """ + + def startup( self, app ): + """ Called when Galaxy starts up if the plugin is enabled. + """ + + def shutdown( self ): + """ Called when Galaxy is shutting down, workflow scheduling should + end. + """ + + +class ActiveWorkflowSchedulingPlugin( WorkflowSchedulingPlugin ): + __metaclass__ = ABCMeta + + @abstractmethod + def schedule( self, workflow_invocation ): + """ Optionally return one or more commands to instrument job. These + commands will be executed on the compute server prior to the job + running. + """ diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/workflow/schedulers/core.py --- /dev/null +++ b/lib/galaxy/workflow/schedulers/core.py @@ -0,0 +1,46 @@ +""" The class defines the stock Galaxy workflow scheduling plugin - currently +it simply schedules the whole workflow up front when offered. +""" +from ..schedulers import ActiveWorkflowSchedulingPlugin + +from galaxy.work import context + +from galaxy.workflow import run +from galaxy.workflow import run_request + +import logging +log = logging.getLogger( __name__ ) + + +class CoreWorkflowSchedulingPlugin( ActiveWorkflowSchedulingPlugin ): + plugin_type = "core" + + def __init__( self, **kwds ): + pass + + def startup( self, app ): + self.app = app + + def shutdown( self ): + pass + + def schedule( self, workflow_invocation ): + workflow = workflow_invocation.workflow + history = workflow_invocation.history + request_context = context.WorkRequestContext( + app=self.app, + history=history, + user=history.user + ) # trans-like object not tied to a web-thread. + workflow_run_config = run_request.workflow_request_to_run_config( + request_context, + workflow_invocation + ) + run.invoke( + trans=request_context, + workflow=workflow, + workflow_run_config=workflow_run_config, + workflow_invocation=workflow_invocation, + ) + +__all__ = [ CoreWorkflowSchedulingPlugin ] diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f lib/galaxy/workflow/scheduling_manager.py --- /dev/null +++ b/lib/galaxy/workflow/scheduling_manager.py @@ -0,0 +1,197 @@ +import os +import time +import logging +import threading + +from xml.etree import ElementTree + +from galaxy import model +from galaxy.util import plugin_config + +import galaxy.workflow.schedulers + +log = logging.getLogger( __name__ ) + +DEFAULT_SCHEDULER_ID = "default" # well actually this should be called DEFAULT_DEFAULT_SCHEDULER_ID... 
+DEFAULT_SCHEDULER_PLUGIN_TYPE = "core" + +EXCEPTION_MESSAGE_SHUTDOWN = "Exception raised while attempting to shutdown workflow scheduler." +EXCEPTION_MESSAGE_NO_SCHEDULERS = "Failed to define workflow schedulers - no workflow schedulers defined." +EXCEPTION_MESSAGE_NO_DEFAULT_SCHEDULER = "Failed to define workflow schedulers - no workflow scheduler found for default id '%s'." +EXCEPTION_MESSAGE_DUPLICATE_SCHEDULERS = "Failed to define workflow schedulers - workflow scheduling plugin id '%s' duplicated." + + +class WorkflowSchedulingManager( object ): + """ A workflow scheduling manager based loosely on the pattern established by + ``galaxy.manager.JobManager``. Only schedules workflows on handler + processes. + """ + + def __init__( self, app ): + self.app = app + self.__job_config = app.job_config + self.workflow_schedulers = {} + self.active_workflow_schedulers = {} # Passive workflow schedulers + # won't need to be monitored I + # guess. + self.request_monitor = None + + self.__plugin_classes = self.__plugins_dict() + self.__init_schedulers() + + if self._is_workflow_handler(): + log.debug("Starting workflow schedulers") + self.__start_schedulers() + if self.active_workflow_schedulers: + self.__start_request_monitor() + else: + # Process should not schedule workflows - do nothing. + pass + + # Provide a handler config-like interface by delegating to job handler + # config. Perhaps it makes sense to let there be explicit workflow + # handlers? + def _is_workflow_handler( self ): + return self.app.is_job_handler() + + def _get_handler( self ): + return self.__job_config.get_handler( None ) + + def shutdown( self ): + for workflow_scheduler in self.workflow_schedulers.itervalues(): + try: + workflow_scheduler.shutdown() + except Exception: + log.exception( EXCEPTION_MESSAGE_SHUTDOWN ) + if self.request_monitor: + try: + self.request_monitor.shutdown() + except Exception: + log.exception( "Failed to shutdown workflow request monitor." ) + + def queue( self, workflow_invocation, request_params ): + workflow_invocation.state = model.WorkflowInvocation.states.NEW + scheduler = request_params.get( "scheduler", None ) or self.default_scheduler_id + handler = self._get_handler() + + workflow_invocation.scheduler = scheduler + workflow_invocation.handler = handler + + sa_session = self.app.model.context + sa_session.add( workflow_invocation ) + sa_session.flush() + return workflow_invocation + + def __start_schedulers( self ): + for workflow_scheduler in self.workflow_schedulers.itervalues(): + workflow_scheduler.startup( self.app ) + + def __plugins_dict( self ): + return plugin_config.plugins_dict( galaxy.workflow.schedulers, 'plugin_type' ) + + def __init_schedulers( self ): + config_file = self.app.config.workflow_schedulers_config_file + use_default_scheduler = False + if not config_file: + log.info( "No workflow schedulers plugin config file defined, using default scheduler." ) + use_default_scheduler = True + elif not os.path.exists( config_file ): + log.info( "Cannot find workflow schedulers plugin config file '%s', using default scheduler."
% config_file ) + use_default_scheduler = True + + if use_default_scheduler: + self.__init_default_scheduler() + else: + plugins_element = ElementTree.parse( config_file ).getroot() + self.__init_schedulers_for_element( plugins_element ) + + def __init_default_scheduler( self ): + self.default_scheduler_id = DEFAULT_SCHEDULER_ID + self.__init_plugin( DEFAULT_SCHEDULER_PLUGIN_TYPE ) + + def __init_schedulers_for_element( self, plugins_element ): + plugins_kwds = dict( plugins_element.items() ) + self.default_scheduler_id = plugins_kwds.get( 'default', DEFAULT_SCHEDULER_ID ) + + for plugin_element in plugins_element.getchildren(): + plugin_type = plugin_element.tag + plugin_kwds = dict( plugin_element.items() ) + plugin_kwds.update( self.extra_kwargs ) + workflow_scheduler_id = plugin_kwds.get( 'id', None ) + self.__init_plugin( plugin_type, workflow_scheduler_id, **plugin_kwds ) + + if not self.workflow_schedulers: + raise Exception( EXCEPTION_MESSAGE_NO_SCHEDULERS ) + if self.default_scheduler_id not in self.workflow_schedulers: + raise Exception( EXCEPTION_MESSAGE_NO_DEFAULT_SCHEDULER % self.default_scheduler_id ) + + def __init_plugin( self, plugin_type, workflow_scheduler_id=None, **kwds ): + workflow_scheduler_id = workflow_scheduler_id or self.default_scheduler_id + + if workflow_scheduler_id in self.workflow_schedulers: + raise Exception( EXCEPTION_MESSAGE_DUPLICATE_SCHEDULERS % workflow_scheduler_id ) + + workflow_scheduler = self.__plugin_classes[ plugin_type ]( **kwds ) + self.workflow_schedulers[ workflow_scheduler_id ] = workflow_scheduler + if isinstance( workflow_scheduler, galaxy.workflow.schedulers.ActiveWorkflowSchedulingPlugin ): + self.active_workflow_schedulers[ workflow_scheduler_id ] = workflow_scheduler + + def __start_request_monitor( self ): + self.request_monitor = WorkflowRequestMonitor( self.app, self ) + + +class WorkflowRequestMonitor( object ): + + def __init__( self, app, workflow_scheduling_manager ): + self.app = app + self.active = True + self.workflow_scheduling_manager = workflow_scheduling_manager + self.monitor_thread = threading.Thread( name="WorkflowRequestMonitor.monitor_thread", target=self.__monitor ) + self.monitor_thread.setDaemon( True ) + self.monitor_thread.start() + + def __monitor( self ): + to_monitor = self.workflow_scheduling_manager.active_workflow_schedulers + while self.active: + for workflow_scheduler_id, workflow_scheduler in to_monitor.iteritems(): + if not self.active: + return + + self.__schedule( workflow_scheduler_id, workflow_scheduler ) + time.sleep(1) # TODO: wake if stopped + + def __schedule( self, workflow_scheduler_id, workflow_scheduler ): + invocation_ids = self.__active_invocation_ids( workflow_scheduler_id ) + for invocation_id in invocation_ids: + self.__attempt_schedule( invocation_id, workflow_scheduler ) + if not self.active: + return + + def __attempt_schedule( self, invocation_id, workflow_scheduler ): + sa_session = self.app.model.context + workflow_invocation = sa_session.query( model.WorkflowInvocation ).get( invocation_id ) + + if not workflow_invocation or not workflow_invocation.active: + return False + + try: + workflow_scheduler.schedule( workflow_invocation ) + except Exception: + # TODO: eventually fail this - or fail it right away? + log.exception( "Exception raised while attempting to schedule workflow request." ) + return False + + # A workflow was obtained and scheduled... 
+ return True + + def __active_invocation_ids( self, scheduler_id ): + sa_session = self.app.model.context + handler = self.app.config.server_name + return model.WorkflowInvocation.poll_active_workflow_ids( + sa_session, + scheduler=scheduler_id, + handler=handler, + ) + + def shutdown( self ): + self.active = False diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -135,9 +135,28 @@ # TODO: This should really be a post to workflows/<workflow_id>/run or # something like that. run_workflow_response = self._post( "workflows", data=workflow_request ) + + invocation_id = run_workflow_response.json()[ "id" ] + invocation = self._invocation_details( workflow_request[ "workflow_id" ], invocation_id ) + assert invocation[ "state" ] == "scheduled", invocation + self._assert_status_code_is( run_workflow_response, 200 ) self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + def test_workflow_request( self ): + workflow = self.workflow_populator.load_workflow( name="test_for_queue" ) + workflow_request, history_id = self._setup_workflow_run( workflow ) + # TODO: This should really be a post to workflows/<workflow_id>/run or + # something like that. + url = "workflows/%s/request" % ( workflow_request[ "workflow_id" ] ) + del workflow_request[ "workflow_id" ] + run_workflow_response = self._post( url, data=workflow_request ) + + self._assert_status_code_is( run_workflow_response, 200 ) + # Give some time for workflow to get scheduled before scanning the history. + time.sleep( 5 ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + def test_cannot_run_inaccessible_workflow( self ): workflow = self.workflow_populator.load_workflow( name="test_for_run_cannot_access" ) workflow_request, history_id = self._setup_workflow_run( workflow ) @@ -563,11 +582,18 @@ @skip_without_tool( "cat1" ) def test_invocation_usage( self ): workflow_id, usage = self._run_workflow_once_get_invocation( "test_usage") - usage_details = self._invocation_details( workflow_id, usage[ "id" ] ) + invocation_id = usage[ "id" ] + usage_details = self._invocation_details( workflow_id, invocation_id ) # Assert some high-level things about the structure of data returned. 
self._assert_has_keys( usage_details, "inputs", "steps" ) - for step in usage_details[ "steps" ]: + invocation_steps = usage_details[ "steps" ] + for step in invocation_steps: self._assert_has_keys( step, "workflow_step_id", "order_index", "id" ) + an_invocation_step = invocation_steps[ 0 ] + step_id = an_invocation_step[ "id" ] + step_response = self._get( "workflows/%s/usage/%s/steps/%s" % ( workflow_id, invocation_id, step_id ) ) + self._assert_status_code_is( step_response, 200 ) + self._assert_has_keys( step_response.json(), "id", "order_index" ) def _invocation_details( self, workflow_id, invocation_id ): invocation_details_response = self._get( "workflows/%s/usage/%s" % ( workflow_id, invocation_id ) ) diff -r d1864ea200ce16992c26a4a642381be71310c4a2 -r f6dcea371b036e423488bf5478e3395d5501fd8f test/unit/test_galaxy_mapping.py --- a/test/unit/test_galaxy_mapping.py +++ b/test/unit/test_galaxy_mapping.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import unittest import galaxy.model.mapping as mapping +import uuid class MappingTests( unittest.TestCase ): @@ -358,6 +359,54 @@ assert contents_iter_names( ids=[ d1.id, d3.id ] ) == [ "1", "3" ] + def test_workflows( self ): + model = self.model + user = model.User( + email="testworkflows@bx.psu.edu", + password="password" + ) + stored_workflow = model.StoredWorkflow() + stored_workflow.user = user + workflow = model.Workflow() + workflow_step = model.WorkflowStep() + workflow.steps = [ workflow_step ] + workflow.stored_workflow = stored_workflow + + self.persist( workflow ) + assert workflow_step.id is not None + + invocation_uuid = uuid.uuid1() + + workflow_invocation = model.WorkflowInvocation() + workflow_invocation.uuid = invocation_uuid + + workflow_invocation_step1 = model.WorkflowInvocationStep() + workflow_invocation_step1.workflow_invocation = workflow_invocation + workflow_invocation_step1.workflow_step = workflow_step + + workflow_invocation_step2 = model.WorkflowInvocationStep() + workflow_invocation_step2.workflow_invocation = workflow_invocation + workflow_invocation_step2.workflow_step = workflow_step + + workflow_invocation.workflow = workflow + + h1 = model.History( name="WorkflowHistory1", user=user) + d1 = self.new_hda( h1, name="1" ) + workflow_request_dataset = model.WorkflowRequestToInputDatasetAssociation() + workflow_request_dataset.workflow_invocation = workflow_invocation + workflow_request_dataset.workflow_step = workflow_step + workflow_request_dataset.dataset = d1 + self.persist( workflow_invocation ) + assert workflow_request_dataset is not None + assert workflow_invocation.id is not None + + self.expunge() + + loaded_invocation = self.query( model.WorkflowInvocation ).get( workflow_invocation.id ) + assert loaded_invocation.uuid == invocation_uuid, "%s != %s" % (loaded_invocation.uuid, invocation_uuid) + assert loaded_invocation + assert len( loaded_invocation.steps ) == 2 + def new_hda( self, history, **kwds ): return history.add_dataset( self.model.HistoryDatasetAssociation( create_dataset=True, sa_session=self.model.session, **kwds ) ) https://bitbucket.org/galaxy/galaxy-central/commits/504aad0d9688/ Changeset: 504aad0d9688 User: jmchilton Date: 2014-11-13 18:47:39+00:00 Summary: Update workflow invocations when their jobs complete. This could allow more efficient checks when recovering workflow state. 
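To make the usage endpoints exercised in the tests above easier to try outside of the test suite, here is a minimal client-side sketch of polling an invocation and inspecting one of its steps. The galaxy_url, api_key, workflow_id and invocation_id values are placeholders and not defined anywhere in these changesets; only the GET routes and the 'state', 'steps', 'id', 'order_index' and 'action' keys come from the code shown above.

import requests

galaxy_url = "http://localhost:8080"      # placeholder Galaxy instance
api_key = "YOUR_API_KEY"                  # placeholder user API key
workflow_id = "ENCODED_WORKFLOW_ID"       # placeholder encoded id
invocation_id = "ENCODED_INVOCATION_ID"   # placeholder encoded id


def api_get( path ):
    # Every usage endpoint used here is a plain GET authenticated by the API key.
    response = requests.get( "%s/api/%s" % ( galaxy_url, path ), params=dict( key=api_key ) )
    response.raise_for_status()
    return response.json()


invocation = api_get( "workflows/%s/usage/%s" % ( workflow_id, invocation_id ) )
print invocation[ "state" ]  # e.g. "scheduled" or "cancelled" once processed

for step in invocation[ "steps" ]:
    step_details = api_get( "workflows/%s/usage/%s/steps/%s" % ( workflow_id, invocation_id, step[ "id" ] ) )
    print step_details[ "order_index" ], step_details.get( "action" )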
Affected #: 3 files diff -r f6dcea371b036e423488bf5478e3395d5501fd8f -r 504aad0d96887e8eb7e9fee9445bc1a968fccbb8 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -918,7 +918,7 @@ self.pause( dep_job_assoc.job, "Execution of this dataset's job is paused because its input datasets are in an error state." ) self.sa_session.add( dataset ) self.sa_session.flush() - job.state = job.states.ERROR + job.set_final_state( job.states.ERROR ) job.command_line = self.command_line job.info = message # TODO: Put setting the stdout, stderr, and exit code in one place @@ -1243,7 +1243,7 @@ # Finally set the job state. This should only happen *after* all # dataset creation, and will allow us to eliminate force_history_refresh. - job.state = final_job_state + job.set_final_state( final_job_state ) if not job.tasks: # If job was composed of tasks, don't attempt to recollect statisitcs self._collect_metrics( job ) diff -r f6dcea371b036e423488bf5478e3395d5501fd8f -r 504aad0d96887e8eb7e9fee9445bc1a968fccbb8 lib/galaxy/jobs/handler.py --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -680,11 +680,11 @@ except Empty: pass for job, error_msg in jobs_to_check: + final_state = job.states.DELETED if error_msg is not None: - job.state = job.states.ERROR + final_state = job.states.ERROR job.info = error_msg - else: - job.state = job.states.DELETED + job.set_final_state( final_state ) self.sa_session.add( job ) self.sa_session.flush() if job.job_runner_name is not None: diff -r f6dcea371b036e423488bf5478e3395d5501fd8f -r 504aad0d96887e8eb7e9fee9445bc1a968fccbb8 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -28,6 +28,7 @@ import galaxy.security.passwords from galaxy.datatypes.metadata import MetadataCollection from galaxy.model.item_attrs import Dictifiable, UsesAnnotations +import galaxy.model.orm.now from galaxy.security import get_permitted_actions from galaxy.util import is_multi_byte, nice_size, Params, restore_text, send_mail from galaxy.util import ready_name_for_url @@ -551,6 +552,11 @@ return rval + def set_final_state( self, final_state ): + self.state = final_state + if self.workflow_invocation_step: + self.workflow_invocation_step.update() + class Task( object, HasJobMetrics ): """ @@ -3188,11 +3194,17 @@ rval['inputs'] = inputs return rval + def update( self ): + self.update_time = galaxy.model.orm.now.now() + class WorkflowInvocationStep( object, Dictifiable ): dict_collection_visible_keys = ( 'id', 'update_time', 'job_id', 'workflow_step_id', 'action' ) dict_element_visible_keys = ( 'id', 'update_time', 'job_id', 'workflow_step_id', 'action' ) + def update( self ): + self.workflow_invocation.update() + def to_dict( self, view='collection', value_mapper=None ): rval = super( WorkflowInvocationStep, self ).to_dict( view=view, value_mapper=value_mapper ) rval['order_index'] = self.workflow_step.order_index https://bitbucket.org/galaxy/galaxy-central/commits/5324504e9175/ Changeset: 5324504e9175 User: jmchilton Date: 2014-11-13 18:47:39+00:00 Summary: Implement pause module for workflows. New module type that pauses a workflow and gives time for the runner to review it before proceeding with execution. - Introduce concept of beta workflow modules - I guess we should just keep the pause module as beta until there is a UI to support it. - Extracts base class out of InputModule for modules that are "simple" - i.e.
their configuration state is represented as a dictionary and configuration form is rendered via the generic template. - Test Cases (for this changeset and a bunch of stuff that is now testable with the pause module in place from the scheduling framework commit). Affected #: 5 files diff -r 504aad0d96887e8eb7e9fee9445bc1a968fccbb8 -r 5324504e91756088158af4214bbe8592daa879ff lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -192,6 +192,11 @@ self.local_task_queue_workers = int(kwargs.get("local_task_queue_workers", 2)) # The transfer manager and deferred job queue self.enable_beta_job_managers = string_as_bool( kwargs.get( 'enable_beta_job_managers', 'False' ) ) + # These workflow modules should not be considered part of Galaxy's + # public API yet - the module state definitions may chnage and + # workflows built using these modules may not function in the + # future. + self.enable_beta_workflow_modules = string_as_bool( kwargs.get( 'enable_beta_workflow_modules', 'False' ) ) # Per-user Job concurrency limitations self.cache_user_job_count = string_as_bool( kwargs.get( 'cache_user_job_count', False ) ) self.user_job_limit = int( kwargs.get( 'user_job_limit', 0 ) ) diff -r 504aad0d96887e8eb7e9fee9445bc1a968fccbb8 -r 5324504e91756088158af4214bbe8592daa879ff lib/galaxy/workflow/modules.py --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -187,7 +187,7 @@ raise TypeError( "Abstract method" ) -class InputModule( WorkflowModule ): +class SimpleWorkflowModule( WorkflowModule ): @classmethod def new( Class, trans, tool_id=None ): @@ -215,46 +215,17 @@ """ raise TypeError( "Abstract method" ) - def get_runtime_input_dicts( self, step_annotation ): - name = self.state.get( "name", self.default_name ) - return [ dict( name=name, description=step_annotation ) ] - - def recover_state( self, state, **kwds ): - """ Recover state `dict` from simple dictionary describing configuration - state (potentially from persisted step state). - - Sub-classes should supply `default_state` method and `state_fields` - attribute which are used to build up the state `dict`. - """ - self.state = self.default_state() - for key in self.state_fields: - if state and key in state: - self.state[ key ] = state[ key ] - def save_to_step( self, step ): step.type = self.type step.tool_id = None step.tool_inputs = self.state - def get_data_inputs( self ): - return [] - - def get_config_form( self ): - form = self._abstract_config_form( ) - return self.trans.fill_template( "workflow/editor_generic_form.mako", - module=self, form=form ) - def get_state( self, secure=True ): return dumps( self.state ) def update_state( self, incoming ): self.recover_state( incoming ) - def get_runtime_state( self ): - state = galaxy.tools.DefaultToolState() - state.inputs = dict( input=None ) - return state - def recover_runtime_state( self, runtime_state ): """ Take secure runtime state from persisted invocation and convert it into a DefaultToolState object for use during workflow invocation. 
@@ -275,7 +246,8 @@ def decode_runtime_state( self, trans, string ): fake_tool = Bunch( inputs=self.get_runtime_inputs() ) state = galaxy.tools.DefaultToolState() - state.decode( string, fake_tool, trans.app ) + if string: + state.decode( string, fake_tool, trans.app ) return state def update_runtime_state( self, trans, state, values ): @@ -288,7 +260,7 @@ return errors def compute_runtime_state( self, trans, step_updates=None ): - if step_updates: + if step_updates and "tool_state" in step_updates: # Fix this for multiple inputs state = self.decode_runtime_state( trans, step_updates.pop( "tool_state" ) ) step_errors = self.update_runtime_state( trans, state, step_updates ) @@ -298,6 +270,38 @@ return state, step_errors + def recover_state( self, state, **kwds ): + """ Recover state `dict` from simple dictionary describing configuration + state (potentially from persisted step state). + + Sub-classes should supply `default_state` method and `state_fields` + attribute which are used to build up the state `dict`. + """ + self.state = self.default_state() + for key in self.state_fields: + if state and key in state: + self.state[ key ] = state[ key ] + + def get_config_form( self ): + form = self._abstract_config_form( ) + return self.trans.fill_template( "workflow/editor_generic_form.mako", + module=self, form=form ) + + +class InputModule( SimpleWorkflowModule ): + + def get_runtime_state( self ): + state = galaxy.tools.DefaultToolState() + state.inputs = dict( input=None ) + return state + + def get_runtime_input_dicts( self, step_annotation ): + name = self.state.get( "name", self.default_name ) + return [ dict( name=name, description=step_annotation ) ] + + def get_data_inputs( self ): + return [] + def execute( self, trans, progress, invocation, step ): job, step_outputs = None, dict( output=step.state.inputs['input']) @@ -389,6 +393,76 @@ return [ dict( name='output', extensions=['input_collection'], collection_type=self.state[ 'collection_type' ] ) ] +class PauseModule( SimpleWorkflowModule ): + """ Initially this module will unconditionally pause a workflow - will aim + to allow conditional pausing later on. 
+ """ + type = "pause" + name = "Pause for dataset review" + default_name = "Pause for Dataset Review" + state_fields = [ "name" ] + + @classmethod + def default_state( Class ): + return dict( name=Class.default_name ) + + def get_data_inputs( self ): + input = dict( + name="input", + label="Dataset for Review", + multiple=False, + extensions='input', + input_type="dataset", + ) + return [ input ] + + def get_data_outputs( self ): + return [ dict( name="output", label="Reviewed Dataset", extensions=['input'] ) ] + + def _abstract_config_form( self ): + form = formbuilder.FormBuilder( + title=self.name + ).add_text( "name", "Name", value=self.state['name'] ) + return form + + def get_runtime_inputs( self, **kwds ): + return dict( ) + + def get_runtime_input_dicts( self, step_annotation ): + return [] + + def get_runtime_state( self ): + state = galaxy.tools.DefaultToolState() + state.inputs = dict( ) + return state + + def execute( self, trans, progress, invocation, step ): + progress.mark_step_outputs_delayed( step ) + return None + + def recover_mapping( self, step, step_invocations, progress ): + if step_invocations: + step_invocation = step_invocations[0] + action = step_invocation.action + if action: + connection = step.input_connections_by_name[ "input" ][ 0 ] + replacement = progress.replacement_for_connection( connection ) + progress.set_step_outputs( step, { 'output': replacement } ) + return + elif action is False: + raise CancelWorkflowEvaluation() + raise DelayedWorkflowEvaluation() + + def do_invocation_step_action( self, step, action ): + """ Update or set the workflow invocation state action - generic + extension point meant to allows users to interact with interactive + workflow modules. The action object returned from this method will + be attached to the WorkflowInvocationStep and be available the next + time the workflow scheduler visits the workflow. + """ + return bool( action ) + + class ToolModule( WorkflowModule ): type = "tool" @@ -828,6 +902,7 @@ module_types = dict( data_input=InputDataModule, data_collection_input=InputDataCollectionModule, + pause=PauseModule, tool=ToolModule, ) module_factory = WorkflowModuleFactory( module_types ) @@ -848,6 +923,16 @@ module_sections = [ inputs_section ] + if trans.app.config.enable_beta_workflow_modules: + experimental_modules = { + "name": "experimental", + "title": "Experimental", + "modules": [ + {"name": "pause", "title": "Pause Workflow for Dataset Review", "description": "Pause for Review"}, + ], + } + module_sections.append(experimental_modules) + return module_sections @@ -901,7 +986,6 @@ # Populate module. module = step.module = module_factory.from_workflow_step( trans, step ) - # Calculating step errors and state depends on whether step is a tool step or not. 
if not module: step.module = None step.state = None diff -r 504aad0d96887e8eb7e9fee9445bc1a968fccbb8 -r 5324504e91756088158af4214bbe8592daa879ff templates/webapps/galaxy/workflow/editor.mako --- a/templates/webapps/galaxy/workflow/editor.mako +++ b/templates/webapps/galaxy/workflow/editor.mako @@ -353,7 +353,6 @@ </div></div> %endfor - </div></div></div> diff -r 504aad0d96887e8eb7e9fee9445bc1a968fccbb8 -r 5324504e91756088158af4214bbe8592daa879ff test/api/test_workflow_pause.ga --- /dev/null +++ b/test/api/test_workflow_pause.ga @@ -0,0 +1,118 @@ +{ + "a_galaxy_workflow": "true", + "annotation": "", + "format-version": "0.1", + "name": "test_workflow_pause", + "steps": { + "0": { + "annotation": "", + "id": 0, + "input_connections": {}, + "inputs": [ + { + "description": "", + "name": "Input Dataset" + } + ], + "name": "Input dataset", + "outputs": [], + "position": { + "left": 199.9201512336731, + "top": 251.4826512336731 + }, + "tool_errors": null, + "tool_id": null, + "tool_state": "{\"name\": \"Input Dataset\"}", + "tool_version": null, + "type": "data_input", + "user_outputs": [] + }, + "1": { + "annotation": "", + "id": 1, + "input_connections": { + "input1": { + "id": 0, + "output_name": "output" + } + }, + "inputs": [], + "name": "Concatenate datasets (for test workflows)", + "outputs": [ + { + "name": "out_file1", + "type": "input" + } + ], + "position": { + "left": 516.7257237434387, + "top": 187.28126573562622 + }, + "post_job_actions": {}, + "tool_errors": null, + "tool_id": "cat", + "tool_state": "{\"__page__\": 0, \"__rerun_remap_job_id__\": null, \"input1\": \"null\", \"queries\": \"[]\"}", + "tool_version": "1.0.0", + "type": "tool", + "user_outputs": [] + }, + "2": { + "annotation": "", + "id": 2, + "input_connections": { + "input": { + "id": 1, + "output_name": "out_file1" + } + }, + "inputs": [ + { + "description": "", + "name": "Pause for Dataset Review" + } + ], + "name": "Pause for dataset review", + "outputs": [], + "position": { + "left": 862.715301990509, + "top": 197.28126573562622 + }, + "tool_errors": null, + "tool_id": null, + "tool_state": "{\"name\": \"Pause for Dataset Review\"}", + "tool_version": null, + "type": "pause", + "user_outputs": [] + }, + "3": { + "annotation": "", + "id": 3, + "input_connections": { + "input1": { + "id": 2, + "output_name": "output" + } + }, + "inputs": [], + "name": "Concatenate datasets (for test workflows)", + "outputs": [ + { + "name": "out_file1", + "type": "input" + } + ], + "position": { + "left": 1181.9722595214844, + "top": 181.52084350585938 + }, + "post_job_actions": {}, + "tool_errors": null, + "tool_id": "cat1", + "tool_state": "{\"__page__\": 0, \"__rerun_remap_job_id__\": null, \"input1\": \"null\", \"queries\": \"[]\"}", + "tool_version": "1.0.0", + "type": "tool", + "user_outputs": [] + } + }, + "uuid": "9058956e-76b6-4909-bab3-c12b2cc394c7" +} \ No newline at end of file diff -r 504aad0d96887e8eb7e9fee9445bc1a968fccbb8 -r 5324504e91756088158af4214bbe8592daa879ff test/api/test_workflows.py --- a/test/api/test_workflows.py +++ b/test/api/test_workflows.py @@ -9,6 +9,7 @@ from .helpers import skip_without_tool from requests import delete +from requests import put from galaxy.exceptions import error_codes @@ -146,9 +147,7 @@ def test_workflow_request( self ): workflow = self.workflow_populator.load_workflow( name="test_for_queue" ) workflow_request, history_id = self._setup_workflow_run( workflow ) - # TODO: This should really be a post to workflows/<workflow_id>/run or - # something like that. 
- url = "workflows/%s/request" % ( workflow_request[ "workflow_id" ] ) + url = "workflows/%s/usage" % ( workflow_request[ "workflow_id" ] ) del workflow_request[ "workflow_id" ] run_workflow_response = self._post( url, data=workflow_request ) @@ -157,6 +156,110 @@ time.sleep( 5 ) self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + def test_workflow_pause( self ): + workflow = self.workflow_populator.load_workflow_from_resource( "test_workflow_pause" ) + uploaded_workflow_id = self.workflow_populator.create_workflow( workflow ) + history_id = self.dataset_populator.new_history() + hda1 = self.dataset_populator.new_dataset( history_id, content="1 2 3" ) + index_map = { + '0': self._ds_entry(hda1), + } + invocation_id = self.__invoke_workflow( history_id, uploaded_workflow_id, index_map ) + # Give some time for workflow to get scheduled before scanning the history. + time.sleep( 5 ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + + # Wait for all the datasets to complete, make sure the workflow invocation + # is not complete. + invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) + assert invocation[ 'state' ] != 'scheduled', invocation + + self.__review_paused_steps( uploaded_workflow_id, invocation_id, order_index=2, action=True ) + + time.sleep( 5 ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) + assert invocation[ 'state' ] == 'scheduled', invocation + + def test_workflow_pause_cancel( self ): + workflow = self.workflow_populator.load_workflow_from_resource( "test_workflow_pause" ) + uploaded_workflow_id = self.workflow_populator.create_workflow( workflow ) + history_id = self.dataset_populator.new_history() + hda1 = self.dataset_populator.new_dataset( history_id, content="1 2 3" ) + index_map = { + '0': self._ds_entry(hda1), + } + invocation_id = self.__invoke_workflow( history_id, uploaded_workflow_id, index_map ) + # Give some time for workflow to get scheduled before scanning the history. + time.sleep( 5 ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + + # Wait for all the datasets to complete, make sure the workflow invocation + # is not complete. + invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) + assert invocation[ 'state' ] != 'scheduled' + + self.__review_paused_steps( uploaded_workflow_id, invocation_id, order_index=2, action=False ) + # Not immediately cancelled, must wait until workflow scheduled again. + time.sleep( 4 ) + invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) + assert invocation[ 'state' ] == 'cancelled', invocation + + def test_workflow_map_reduce_pause( self ): + workflow = self.workflow_populator.load_workflow_from_resource( "test_workflow_map_reduce_pause" ) + uploaded_workflow_id = self.workflow_populator.create_workflow( workflow ) + history_id = self.dataset_populator.new_history() + hda1 = self.dataset_populator.new_dataset( history_id, content="reviewed\nunreviewed" ) + hdca1 = self.dataset_collection_populator.create_list_in_history( history_id, contents=["1\n2\n3", "4\n5\n6"] ).json() + index_map = { + '0': self._ds_entry(hda1), + '1': self._ds_entry(hdca1), + } + invocation_id = self.__invoke_workflow( history_id, uploaded_workflow_id, index_map ) + # Give some time for workflow to get scheduled before scanning the history. 
+ time.sleep( 5 ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + + # Wait for all the datasets to complete, make sure the workflow invocation + # is not complete. + invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) + assert invocation[ 'state' ] != 'scheduled' + + self.__review_paused_steps( uploaded_workflow_id, invocation_id, order_index=4, action=True ) + + time.sleep( 5 ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) + assert invocation[ 'state' ] == 'scheduled' + self.assertEquals("reviewed\n1\nreviewed\n4\n", self.dataset_populator.get_history_dataset_content( history_id ) ) + + def test_cancel_workflow_invocation( self ): + workflow = self.workflow_populator.load_workflow_from_resource( "test_workflow_pause" ) + uploaded_workflow_id = self.workflow_populator.create_workflow( workflow ) + history_id = self.dataset_populator.new_history() + hda1 = self.dataset_populator.new_dataset( history_id, content="1 2 3" ) + index_map = { + '0': self._ds_entry(hda1), + } + invocation_id = self.__invoke_workflow( history_id, uploaded_workflow_id, index_map ) + # Give some time for workflow to get scheduled before scanning the history. + time.sleep( 5 ) + self.dataset_populator.wait_for_history( history_id, assert_ok=True ) + + # Wait for all the datasets to complete, make sure the workflow invocation + # is not complete. + invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) + assert invocation[ 'state' ] != 'scheduled' + + invocation_url = self._api_url( "workflows/%s/usage/%s" % (uploaded_workflow_id, invocation_id), use_key=True ) + delete_response = delete( invocation_url ) + self._assert_status_code_is( delete_response, 200 ) + + # Wait for all the datasets to complete, make sure the workflow invocation + # is not complete. 
+ invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) + assert invocation[ 'state' ] == 'cancelled' + def test_cannot_run_inaccessible_workflow( self ): workflow = self.workflow_populator.load_workflow( name="test_for_run_cannot_access" ) workflow_request, history_id = self._setup_workflow_run( workflow ) @@ -601,6 +704,21 @@ invocation_details = invocation_details_response.json() return invocation_details + def _invocation_step_details( self, workflow_id, invocation_id, step_id ): + invocation_step_response = self._get( "workflows/%s/usage/%s/steps/%s" % ( workflow_id, invocation_id, step_id ) ) + self._assert_status_code_is( invocation_step_response, 200 ) + invocation_step_details = invocation_step_response.json() + return invocation_step_details + + def _execute_invocation_step_action( self, workflow_id, invocation_id, step_id, action ): + raw_url = "workflows/%s/usage/%s/steps/%s" % ( workflow_id, invocation_id, step_id ) + url = self._api_url( raw_url, use_key=True ) + payload = dumps( dict( action=action ) ) + action_response = put( url, data=payload ) + self._assert_status_code_is( action_response, 200 ) + invocation_step_details = action_response.json() + return invocation_step_details + def _run_workflow_once_get_invocation( self, name ): workflow = self.workflow_populator.load_workflow( name=name ) workflow_request, history_id = self._setup_workflow_run( workflow ) @@ -689,6 +807,15 @@ names = self.__workflow_names() assert name in names, "No workflows with name %s in users workflows <%s>" % ( name, names ) + def __review_paused_steps( self, uploaded_workflow_id, invocation_id, order_index, action=True ): + invocation = self._invocation_details( uploaded_workflow_id, invocation_id ) + invocation_steps = invocation[ "steps" ] + pause_steps = [ s for s in invocation_steps if s[ 'order_index' ] == order_index ] + for pause_step in pause_steps: + pause_step_id = pause_step[ 'id' ] + + self._execute_invocation_step_action( uploaded_workflow_id, invocation_id, pause_step_id, action=action ) + def __assert_lines_hid_line_count_is( self, history, hid, lines ): contents_url = "histories/%s/contents" % history history_contents_response = self._get( contents_url ) @@ -698,6 +825,22 @@ self._assert_status_code_is( hda_info_response, 200 ) self.assertEquals( hda_info_response.json()[ "metadata_data_lines" ], lines ) + def __invoke_workflow( self, history_id, workflow_id, inputs, assert_ok=True ): + workflow_request = dict( + history="hist_id=%s" % history_id, + ) + workflow_request[ "inputs" ] = dumps( inputs ) + workflow_request[ "inputs_by" ] = 'step_index' + url = "workflows/%s/usage" % ( workflow_id ) + + invocation_response = self._post( url, data=workflow_request ) + if assert_ok: + self._assert_status_code_is( invocation_response, 200 ) + invocation_id = invocation_response.json()[ "id" ] + return invocation_id + else: + return invocation_response + def __workflow_names( self ): index_response = self._get( "workflows" ) self._assert_status_code_is( index_response, 200 ) https://bitbucket.org/galaxy/galaxy-central/commits/45eae5a7f3ba/ Changeset: 45eae5a7f3ba User: jmchilton Date: 2014-11-13 18:47:40+00:00 Summary: Conditionally force traditional workflow invocations paths through scheduling framework. Workflow with pause modules have to be run through the new workflow scheduling plugin framework - so these will always force jobs to be queued in the background. 
The deployer can also now choose to have workflows scheduled this way if a workflow has a minimum number of steps (set at a very high 250 right now) or if it uses collections (currently this defaults to False). These are meant as transient options that will disappear once all workflows are scheduled this way - but there needs to be better UI in place before making that switch as well as more testing. These options are just meant to give people running really large workflows a way to start exploring the scheduling framework option. Affected #: 5 files diff -r 5324504e91756088158af4214bbe8592daa879ff -r 45eae5a7f3baff113f5d264a40117e991e083212 lib/galaxy/config.py --- a/lib/galaxy/config.py +++ b/lib/galaxy/config.py @@ -193,10 +193,19 @@ # The transfer manager and deferred job queue self.enable_beta_job_managers = string_as_bool( kwargs.get( 'enable_beta_job_managers', 'False' ) ) # These workflow modules should not be considered part of Galaxy's - # public API yet - the module state definitions may chnage and + # public API yet - the module state definitions may change and # workflows built using these modules may not function in the # future. self.enable_beta_workflow_modules = string_as_bool( kwargs.get( 'enable_beta_workflow_modules', 'False' ) ) + + # Certain modules such as the pause module will automatically cause + # workflows to be scheduled in job handlers the way all workflows will + # be someday - the following two properties can also be used to force this + # behavior under certain conditions - namely for workflows that have a minimum + # number of steps or that consume collections. + self.force_beta_workflow_scheduled_min_steps = int( kwargs.get( 'force_beta_workflow_scheduled_min_steps', '250' ) ) + self.force_beta_workflow_scheduled_for_collections = string_as_bool( kwargs.get( 'force_beta_workflow_scheduled_for_collections', 'False' ) ) + # Per-user Job concurrency limitations self.cache_user_job_count = string_as_bool( kwargs.get( 'cache_user_job_count', False ) ) self.user_job_limit = int( kwargs.get( 'user_job_limit', 0 ) ) diff -r 5324504e91756088158af4214bbe8592daa879ff -r 45eae5a7f3baff113f5d264a40117e991e083212 lib/galaxy/webapps/galaxy/controllers/workflow.py --- a/lib/galaxy/webapps/galaxy/controllers/workflow.py +++ b/lib/galaxy/webapps/galaxy/controllers/workflow.py @@ -1250,6 +1250,7 @@ try: # use a try/finally block to restore the user's current history default_target_history = trans.get_history() module_injector = WorkflowModuleInjector( trans ) + scheduled = True if kwargs: # If kwargs were provided, the states for each step should have # been POSTed @@ -1300,13 +1301,18 @@ workflow=workflow, workflow_run_config=run_config ) - + invocation_state = invocation.state + # Just use last invocation - right now not really + # possible to have some invocations scheduled and not + # others. + scheduled = invocation_state == model.WorkflowInvocation.states.SCHEDULED invocations.append({'outputs': outputs, 'new_history': new_history}) trans.sa_session.flush() if invocations: return trans.fill_template( "workflow/run_complete.mako", workflow=stored, + scheduled=scheduled, invocations=invocations ) else: # Prepare each step diff -r 5324504e91756088158af4214bbe8592daa879ff -r 45eae5a7f3baff113f5d264a40117e991e083212 lib/galaxy/workflow/run.py --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -12,7 +12,48 @@ log = logging.getLogger( __name__ ) +# Entry point for direct invoke via controllers. Deprecated to some degree.
def invoke( trans, workflow, workflow_run_config, workflow_invocation=None, populate_state=False ): + if force_queue( trans, workflow ): + invocation = queue_invoke( trans, workflow, workflow_run_config, populate_state=populate_state ) + return [], invocation + else: + return __invoke( trans, workflow, workflow_run_config, workflow_invocation, populate_state ) + + +# Entry point for core workflow scheduler. +def schedule( trans, workflow, workflow_run_config, workflow_invocation ): + return __invoke( trans, workflow, workflow_run_config, workflow_invocation ) + + +BASIC_WORKFLOW_STEP_TYPES = [ None, "tool", "data_input", "data_collection_input" ] + + +def force_queue( trans, workflow ): + # Default behavior is still to just schedule workflows completely right + # away. This can be modified here in various ways. + config = trans.app.config + force_for_collection = config.force_beta_workflow_scheduled_for_collections + force_min_steps = config.force_beta_workflow_scheduled_min_steps + + step_count = len( workflow.steps ) + if step_count > force_min_steps: + log.info("Workflow has many steps (%d), backgrounding execution" % step_count) + return True + for step in workflow.steps: + if step.type not in BASIC_WORKFLOW_STEP_TYPES: + log.info("Found non-basic workflow step type - backgrounding execution") + # Force workflows containing any new beta module types to be + # queued in the background. + return True + if step.type == "data_collection_input" and force_for_collection: + log.info("Found collection input step - backgrounding execution") + return True + + return False + + +def __invoke( trans, workflow, workflow_run_config, workflow_invocation=None, populate_state=False ): """ Run the supplied workflow in the supplied target_history. """ if populate_state: @@ -51,8 +92,9 @@ return outputs, invoker.workflow_invocation -def queue_invoke( trans, workflow, workflow_run_config, request_params ): - modules.populate_module_and_state( trans, workflow, workflow_run_config.param_map ) +def queue_invoke( trans, workflow, workflow_run_config, request_params={}, populate_state=True ): + if populate_state: + modules.populate_module_and_state( trans, workflow, workflow_run_config.param_map ) workflow_invocation = workflow_run_config_to_request( trans, workflow_run_config, workflow ) workflow_invocation.workflow = workflow return trans.app.workflow_scheduling_manager.queue( diff -r 5324504e91756088158af4214bbe8592daa879ff -r 45eae5a7f3baff113f5d264a40117e991e083212 lib/galaxy/workflow/schedulers/core.py --- a/lib/galaxy/workflow/schedulers/core.py +++ b/lib/galaxy/workflow/schedulers/core.py @@ -36,7 +36,7 @@ request_context, workflow_invocation ) - run.invoke( + run.schedule( trans=request_context, workflow=workflow, workflow_run_config=workflow_run_config, diff -r 5324504e91756088158af4214bbe8592daa879ff -r 45eae5a7f3baff113f5d264a40117e991e083212 templates/webapps/galaxy/workflow/run_complete.mako --- a/templates/webapps/galaxy/workflow/run_complete.mako +++ b/templates/webapps/galaxy/workflow/run_complete.mako @@ -1,6 +1,7 @@ <%inherit file="/base.mako"/><div class="donemessagelarge"> +%if scheduled: Successfully ran workflow "${util.unicodify( workflow.name )}". The following datasets have been added to the queue: %for invocation in invocations: <div class="workflow-invocation-complete"> @@ -21,6 +22,11 @@ </div></div> %endfor +%else: + The requested workflows have been queued and datasets will appear + as jobs are created - you will need to refresh your history panel + to see these.
+%endif </div><script type="text/javascript"> https://bitbucket.org/galaxy/galaxy-central/commits/114b22292fb6/ Changeset: 114b22292fb6 User: kellrott Date: 2014-11-13 18:47:40+00:00 Summary: Fixing id lookup on GET /api/workflows/<uuid>/usage Affected #: 1 file diff -r 45eae5a7f3baff113f5d264a40117e991e083212 -r 114b22292fb6a32b8c6102ff8554872f1977f2a7 lib/galaxy/webapps/galaxy/api/workflows.py --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -411,8 +411,8 @@ :raises: exceptions.MessageException, exceptions.ObjectNotFound """ - decoded_stored_workflow_invocation_id = self.__decode_id( trans, workflow_id ) - results = self.workflow_manager.build_invocations_query( trans, decoded_stored_workflow_invocation_id ) + stored_workflow = self.__get_stored_workflow(trans, workflow_id) + results = self.workflow_manager.build_invocations_query( trans, stored_workflow.id ) out = [] for r in results: out.append( self.__encode_invocation( trans, r ) ) https://bitbucket.org/galaxy/galaxy-central/commits/facd14607d28/ Changeset: facd14607d28 User: kellrott Date: 2014-11-13 18:47:41+00:00 Summary: Adding job state info to WorkflowInvocationStep to_dict Affected #: 1 file diff -r 114b22292fb6a32b8c6102ff8554872f1977f2a7 -r facd14607d28535545e460bd1f4c7a8ef6b8dff5 lib/galaxy/model/__init__.py --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -3208,6 +3208,7 @@ def to_dict( self, view='collection', value_mapper=None ): rval = super( WorkflowInvocationStep, self ).to_dict( view=view, value_mapper=value_mapper ) rval['order_index'] = self.workflow_step.order_index + rval['state'] = self.job.state if self.job is not None else None return rval Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
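As a closing illustration of the scheduling plugin interface introduced in lib/galaxy/workflow/schedulers/__init__.py above, a downstream active scheduler could be sketched roughly as below. The module name, the "delayed" plugin_type and the delay behaviour are all invented for the sake of the example; a real plugin would need to live in a package discoverable by plugin_config and, if it is not the default, be referenced from a workflow schedulers configuration file.

# Hypothetical lib/galaxy/workflow/schedulers/delayed.py - not part of the
# changesets above. It schedules invocations exactly like the core plugin
# but only after they have been waiting a configurable number of seconds,
# showing how attributes on the plugin's XML element reach __init__ as kwds.
import time

from galaxy.work import context
from galaxy.workflow import run
from galaxy.workflow import run_request

from ..schedulers import ActiveWorkflowSchedulingPlugin


class DelayedWorkflowSchedulingPlugin( ActiveWorkflowSchedulingPlugin ):
    plugin_type = "delayed"  # invented type used for plugin lookup

    def __init__( self, **kwds ):
        self.delay = float( kwds.get( "delay", 30 ) )
        self.first_seen = {}  # invocation id -> time schedule() first saw it

    def startup( self, app ):
        self.app = app

    def shutdown( self ):
        pass

    def schedule( self, workflow_invocation ):
        first_seen = self.first_seen.setdefault( workflow_invocation.id, time.time() )
        if time.time() - first_seen < self.delay:
            # Leave the invocation active - the request monitor will call
            # schedule() again on its next pass.
            return
        history = workflow_invocation.history
        request_context = context.WorkRequestContext(
            app=self.app,
            history=history,
            user=history.user
        )  # trans-like object not tied to a web-thread, as in the core plugin.
        workflow_run_config = run_request.workflow_request_to_run_config(
            request_context,
            workflow_invocation
        )
        run.schedule(
            trans=request_context,
            workflow=workflow_invocation.workflow,
            workflow_run_config=workflow_run_config,
            workflow_invocation=workflow_invocation,
        )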