galaxy-commits
August 2014
- 1 participant
- 159 discussions
commit/galaxy-central: dannon: Fix more typos in jobs/rule_helper.py, some new, some old.
by commits-noreply@bitbucket.org 18 Aug '14
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/4637dfed79d3/
Changeset: 4637dfed79d3
User: dannon
Date: 2014-08-18 21:16:13
Summary: Fix more typos in jobs/rule_helper.py, some new, some old.
Affected #: 1 file
diff -r e6d6d98adcf3af152d8da21fd28f0e73c0ee2205 -r 4637dfed79d37c05e3caa3e0d4d5b1d44f892904 lib/galaxy/jobs/rule_helper.py
--- a/lib/galaxy/jobs/rule_helper.py
+++ b/lib/galaxy/jobs/rule_helper.py
@@ -17,7 +17,7 @@
class RuleHelper( object ):
- """ Utillity to allow job rules to interface cleanly with the rest of
+ """ Utility to allow job rules to interface cleanly with the rest of
Galaxy and shield them from low-level details of models, metrics, etc....
Currently focus is on figuring out job statistics for a given user, but
@@ -127,7 +127,7 @@
See stock_rules for an simple example of using this function - but to
get the most out of it - it should probably be used with custom job
- rules that can respond to the bursting by allocatin resources,
+ rules that can respond to the bursting by allocating resources,
launching cloud nodes, etc....
"""
if job_states is None:
@@ -160,9 +160,9 @@
def job_hash( self, job, hash_by=None ):
""" Produce a reproducible hash for the given job on various
criteria - for instance if hash_by is "workflow_invocation,history" -
- all jobs within the same workflow invocation will recieve the same
+ all jobs within the same workflow invocation will receive the same
hash - for jobs outside of workflows all jobs within the same history
- will recieve the same hash, other jobs will be hashed on job's id
+ will receive the same hash, other jobs will be hashed on job's id
randomly.
Primarily intended for use with ``choose_one`` above - to consistent
@@ -176,7 +176,7 @@
if job_hash:
return job_hash
- # Fallback to just hashing by job id, should always return a value.
+ # Fall back to just hashing by job id, should always return a value.
return self._try_hash_for_job( job, "job" )
def _try_hash_for_job( self, job, hash_by ):
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
commit/galaxy-central: dannon: Typo from pr 465 in job_conf sample commentary.
by commits-noreply@bitbucket.org 18 Aug '14
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/e6d6d98adcf3/
Changeset: e6d6d98adcf3
User: dannon
Date: 2014-08-18 21:13:44
Summary: Typo from pr 465 in job_conf sample commentary.
Affected #: 1 file
diff -r 3c9a169936720219b60be32dd790fa2f14a139cd -r e6d6d98adcf3af152d8da21fd28f0e73c0ee2205 job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -237,11 +237,11 @@
</destination><destination id="load_balance" runner="dynamic"><param id="type">choose_one</param>
- <!-- Randomly assign jobs to various static destinatin ids -->
+ <!-- Randomly assign jobs to various static destination ids --><param id="destination_ids">cluster1,cluster2,cluster3</param></destination><destination id="load_balance_with_data_locality" runner="dynamic">
- <!-- Randomly assign jobs to various static destinatin ids,
+ <!-- Randomly assign jobs to various static destination ids,
but keep jobs in the same workflow invocation together and
for those jobs ran outside of workflows keep jobs in same
history together.
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
commit/galaxy-central: dannon: Merged in jmchilton/galaxy-central-fork-1 (pull request #465)
by commits-noreply@bitbucket.org 18 Aug '14
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/3c9a16993672/
Changeset: 3c9a16993672
User: dannon
Date: 2014-08-18 21:05:18
Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #465)
Dynamic destination enhancements aimed at enabling co-locating data and (cloud) bursting.
Affected #: 10 files
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -235,6 +235,48 @@
<!-- A destination that represents a method in the dynamic runner. -->
<param id="function">foo</param>
</destination>
+ <destination id="load_balance" runner="dynamic">
+ <param id="type">choose_one</param>
+ <!-- Randomly assign jobs to various static destinatin ids -->
+ <param id="destination_ids">cluster1,cluster2,cluster3</param>
+ </destination>
+ <destination id="load_balance_with_data_locality" runner="dynamic">
+ <!-- Randomly assign jobs to various static destinatin ids,
+ but keep jobs in the same workflow invocation together and
+ for those jobs ran outside of workflows keep jobs in same
+ history together.
+ -->
+ <param id="type">choose_one</param>
+ <param id="destination_ids">cluster1,cluster2,cluster3</param>
+ <param id="hash_by">workflow_invocation,history</param>
+ </destination>
+ <destination id="burst_out" runner="dynamic">
+ <!-- Burst out from static destination local_cluster_8_core to
+ static destination shared_cluster_8_core when there are about
+ 50 Galaxy jobs assigned to any of the local_cluster_XXX
+ destinations (either running or queued). If there are fewer
+ than 50 jobs, just use local_cluster_8_core destination.
+
+ Uncomment job_state parameter to make this bursting happen when
+ roughly 50 jobs are queued instead.
+ -->
+ <param id="type">burst</param>
+ <param id="from_destination_ids">local_cluster_8_core,local_cluster_1_core,local_cluster_16_core</param>
+ <param id="to_destination_id">shared_cluster_8_core</param>
+ <param id="num_jobs">50</param>
+ <!-- <param id="job_states">queued</param> -->
+ </destination>
+ <destination id="docker_dispatch" runner="dynamic">
+ <!-- Follow dynamic destination type will send all tool's that
+ support docker to static destination defined by
+ docker_destination_id (docker_cluster in this example) and all
+ other tools to default_destination_id (normal_cluster in this
+ example).
+ -->
+ <param id="type">docker_dispatch</param>
+ <param id="docker_destination_id">docker_cluster</param>
+ <param id="default_destination_id">normal_cluster</param>
+ </destination><destination id="secure_pulsar_rest_dest" runner="pulsar_rest"><param id="url">https://examle.com:8913/</param><!-- If set, private_token must match token in remote Pulsar's
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -5,6 +5,8 @@
log = logging.getLogger( __name__ )
import galaxy.jobs.rules
+from galaxy.jobs import stock_rules
+
from .rule_helper import RuleHelper
DYNAMIC_RUNNER_NAME = "dynamic"
@@ -27,6 +29,13 @@
self.message = message
+STOCK_RULES = dict(
+ choose_one=stock_rules.choose_one,
+ burst=stock_rules.burst,
+ docker_dispatch=stock_rules.docker_dispatch,
+)
+
+
class JobRunnerMapper( object ):
"""
This class is responsible to managing the mapping of jobs
@@ -69,7 +78,7 @@
names.append( rule_module_name )
return names
- def __invoke_expand_function( self, expand_function ):
+ def __invoke_expand_function( self, expand_function, destination_params ):
function_arg_names = inspect.getargspec( expand_function ).args
app = self.job_wrapper.app
possible_args = {
@@ -83,6 +92,11 @@
actual_args = {}
+ # Send through any job_conf.xml defined args to function
+ for destination_param in destination_params.keys():
+ if destination_param in function_arg_names:
+ actual_args[ destination_param ] = destination_params[ destination_param ]
+
# Populate needed args
for possible_arg_name in possible_args:
if possible_arg_name in function_arg_names:
@@ -90,7 +104,7 @@
# Don't hit the DB to load the job object if not needed
require_db = False
- for param in ["job", "user", "user_email", "resource_params"]:
+ for param in ["job", "user", "user_email", "resource_params", "workflow_invocation_uuid"]:
if param in function_arg_names:
require_db = True
break
@@ -122,6 +136,11 @@
pass
actual_args[ "resource_params" ] = resource_params
+ if "workflow_invocation_uuid" in function_arg_names:
+ param_values = job.raw_param_dict( )
+ workflow_invocation_uuid = param_values.get( "__workflow_invocation_uuid__", None )
+ actual_args[ "workflow_invocation_uuid" ] = workflow_invocation_uuid
+
return expand_function( **actual_args )
def __job_params( self, job ):
@@ -172,6 +191,7 @@
def __handle_dynamic_job_destination( self, destination ):
expand_type = destination.params.get('type', "python")
+ expand_function = None
if expand_type == "python":
expand_function_name = self.__determine_expand_function_name( destination )
if not expand_function_name:
@@ -179,12 +199,15 @@
raise Exception( message )
expand_function = self.__get_expand_function( expand_function_name )
- return self.__handle_rule( expand_function )
+ elif expand_type in STOCK_RULES:
+ expand_function = STOCK_RULES[ expand_type ]
else:
raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type )
- def __handle_rule( self, rule_function ):
- job_destination = self.__invoke_expand_function( rule_function )
+ return self.__handle_rule( expand_function, destination )
+
+ def __handle_rule( self, rule_function, destination ):
+ job_destination = self.__invoke_expand_function( rule_function, destination.params )
if not isinstance(job_destination, galaxy.jobs.JobDestination):
job_destination_rep = str(job_destination) # Should be either id or url
if '://' in job_destination_rep:
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/rule_helper.py
--- a/lib/galaxy/jobs/rule_helper.py
+++ b/lib/galaxy/jobs/rule_helper.py
@@ -1,4 +1,6 @@
from datetime import datetime
+import hashlib
+import random
from sqlalchemy import (
and_,
@@ -6,10 +8,13 @@
)
from galaxy import model
+from galaxy import util
import logging
log = logging.getLogger( __name__ )
+VALID_JOB_HASH_STRATEGIES = ["job", "user", "history", "workflow_invocation"]
+
class RuleHelper( object ):
""" Utillity to allow job rules to interface cleanly with the rest of
@@ -22,6 +27,24 @@
def __init__( self, app ):
self.app = app
+ def supports_docker( self, job_or_tool ):
+ """ Job rules can pass this function a job, job_wrapper, or tool and
+ determine if the underlying tool believes it can be containered.
+ """
+ # Not a ton of logic in this method - but the idea is to shield rule
+ # developers from the details and they shouldn't have to know how to
+ # interrogate tool or job to figure out if it can be run in a
+ # container.
+ if hasattr( job_or_tool, 'containers' ):
+ tool = job_or_tool
+ elif hasattr( job_or_tool, 'tool' ):
+ # Have a JobWrapper-like
+ tool = job_or_tool.tool
+ else:
+ # Have a Job object.
+ tool = self.app.toolbox.get_tool( job_or_tool.tool_id )
+ return any( [ c.type == "docker" for c in tool.containers ] )
+
def job_count(
self,
**kwds
@@ -58,16 +81,23 @@
query,
for_user_email=None,
for_destination=None,
+ for_destinations=None,
for_job_states=None,
created_in_last=None,
updated_in_last=None,
):
+ if for_destination is not None:
+ for_destinations = [ for_destination ]
+
query = query.join( model.User )
if for_user_email is not None:
query = query.filter( model.User.table.c.email == for_user_email )
- if for_destination is not None:
- query = query.filter( model.Job.table.c.destination_id == for_destination )
+ if for_destinations is not None:
+ if len( for_destinations ) == 1:
+ query = query.filter( model.Job.table.c.destination_id == for_destinations[ 0 ] )
+ else:
+ query = query.filter( model.Job.table.c.destination_id.in_( for_destinations ) )
if created_in_last is not None:
end_date = datetime.now()
@@ -89,3 +119,81 @@
query = query.filter( model.Job.table.c.state.in_( for_job_states ) )
return query
+
+ def should_burst( self, destination_ids, num_jobs, job_states=None ):
+ """ Check if the specified destinations ``destination_ids`` have at
+ least ``num_jobs`` assigned to it - send in ``job_state`` as ``queued``
+ to limit this check to number of jobs queued.
+
+ See stock_rules for an simple example of using this function - but to
+ get the most out of it - it should probably be used with custom job
+ rules that can respond to the bursting by allocatin resources,
+ launching cloud nodes, etc....
+ """
+ if job_states is None:
+ job_states = "queued,running"
+ from_destination_job_count = self.job_count(
+ for_destinations=destination_ids,
+ for_job_states=util.listify( job_states )
+ )
+ # Would this job push us over maximum job count before requiring
+ # bursting (roughly... very roughly given many handler threads may be
+ # scheduling jobs).
+ return ( from_destination_job_count + 1 ) > int( num_jobs )
+
+ def choose_one( self, lst, hash_value=None ):
+ """ Choose a random value from supplied list. If hash_value is passed
+ in then every request with that same hash_value would produce the same
+ choice from the supplied list.
+ """
+ if hash_value is None:
+ return random.choice( lst )
+
+ if not isinstance( hash_value, int ):
+ # Convert hash_value string into index
+ as_hex = hashlib.md5( hash_value ).hexdigest()
+ hash_value = int(as_hex, 16)
+ # else assumed to be 'random' int from 0-~Inf
+ random_index = hash_value % len( lst )
+ return lst[ random_index ]
+
+ def job_hash( self, job, hash_by=None ):
+ """ Produce a reproducible hash for the given job on various
+ criteria - for instance if hash_by is "workflow_invocation,history" -
+ all jobs within the same workflow invocation will recieve the same
+ hash - for jobs outside of workflows all jobs within the same history
+ will recieve the same hash, other jobs will be hashed on job's id
+ randomly.
+
+ Primarily intended for use with ``choose_one`` above - to consistent
+ route or schedule related jobs.
+ """
+ if hash_by is None:
+ hash_by = [ "job" ]
+ hash_bys = util.listify( hash_by )
+ for hash_by in hash_bys:
+ job_hash = self._try_hash_for_job( job, hash_by )
+ if job_hash:
+ return job_hash
+
+ # Fallback to just hashing by job id, should always return a value.
+ return self._try_hash_for_job( job, "job" )
+
+ def _try_hash_for_job( self, job, hash_by ):
+ """ May return False or None if hash type is invalid for that job -
+ e.g. attempting to hash by user for anonymous job or by workflow
+ invocation for jobs outside of workflows.
+ """
+ if hash_by not in VALID_JOB_HASH_STRATEGIES:
+ message = "Do not know how to hash jobs by %s, must be one of %s" % ( hash_by, VALID_JOB_HASH_STRATEGIES )
+ raise Exception( message )
+
+ if hash_by == "workflow_invocation":
+ return job.raw_param_dict().get( "__workflow_invocation_uuid__", None )
+ elif hash_by == "history":
+ return job.history_id
+ elif hash_by == "user":
+ user = job.user
+ return user and user.id
+ elif hash_by == "job":
+ return job.id
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/stock_rules.py
--- /dev/null
+++ b/lib/galaxy/jobs/stock_rules.py
@@ -0,0 +1,25 @@
+""" Stock job 'dynamic' rules for use in job_conf.xml - these may cover some
+simple use cases but will just proxy into functions in rule_helper so similar
+functionality - but more tailored and composable can be utilized in custom
+rules.
+"""
+
+from galaxy import util
+
+
+def choose_one( rule_helper, job, destination_ids, hash_by="job" ):
+ destination_id_list = util.listify( destination_ids )
+ job_hash = rule_helper.job_hash( job, hash_by )
+ return rule_helper.choose_one( destination_id_list, hash_value=job_hash )
+
+
+def burst( rule_helper, job, from_destination_ids, to_destination_id, num_jobs, job_states=None):
+ from_destination_ids = util.listify( from_destination_ids )
+ if rule_helper.should_burst( from_destination_ids, num_jobs=num_jobs, job_states=job_states ):
+ return to_destination_id
+ else:
+ return from_destination_ids[ 0 ]
+
+
+def docker_dispatch( rule_helper, tool, docker_destination_id, default_destination_id ):
+ return docker_destination_id if rule_helper.supports_docker( tool ) else default_destination_id
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -482,10 +482,13 @@
Read encoded parameter values from the database and turn back into a
dict of tool parameter values.
"""
- param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] )
+ param_dict = self.raw_param_dict()
tool = app.toolbox.get_tool( self.tool_id )
param_dict = tool.params_from_strings( param_dict, app, ignore_errors=ignore_errors )
return param_dict
+ def raw_param_dict( self ):
+ param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] )
+ return param_dict
def check_if_output_datasets_deleted( self ):
"""
Return true if all of the output datasets associated with this job are
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/tools/execute.py
--- a/lib/galaxy/tools/execute.py
+++ b/lib/galaxy/tools/execute.py
@@ -10,13 +10,19 @@
log = logging.getLogger( __name__ )
-def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None ):
+def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None, workflow_invocation_uuid=None ):
"""
Execute a tool and return object containing summary (output data, number of
failures, etc...).
"""
execution_tracker = ToolExecutionTracker( tool, param_combinations, collection_info )
for params in execution_tracker.param_combinations:
+ if workflow_invocation_uuid:
+ params[ '__workflow_invocation_uuid__' ] = workflow_invocation_uuid
+ elif '__workflow_invocation_uuid__' in params:
+ # Only workflow invocation code gets to set this, ignore user supplied
+ # values or rerun parameters.
+ del params[ '__workflow_invocation_uuid__' ]
job, result = tool.handle_single_execution( trans, rerun_remap_job_id, params, history )
if job:
execution_tracker.record_success( job, result )
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/workflow/run.py
--- a/lib/galaxy/workflow/run.py
+++ b/lib/galaxy/workflow/run.py
@@ -1,3 +1,5 @@
+import uuid
+
from galaxy import model
from galaxy import exceptions
from galaxy import util
@@ -41,6 +43,8 @@
self.param_map = workflow_run_config.param_map
self.outputs = odict()
+ # TODO: Attach to actual model object and persist someday...
+ self.invocation_uuid = uuid.uuid1().hex
def invoke( self ):
workflow_invocation = model.WorkflowInvocation()
@@ -132,6 +136,7 @@
param_combinations=param_combinations,
history=self.target_history,
collection_info=collection_info,
+ workflow_invocation_uuid=self.invocation_uuid
)
if collection_info:
outputs[ step.id ] = dict( execution_tracker.created_collections )
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_mapper.py
--- a/test/unit/jobs/test_mapper.py
+++ b/test/unit/jobs/test_mapper.py
@@ -1,3 +1,5 @@
+import uuid
+
import jobs.test_rules
from galaxy.jobs.mapper import (
@@ -9,7 +11,7 @@
from galaxy.util import bunch
-
+WORKFLOW_UUID = uuid.uuid1().hex
TOOL_JOB_DESTINATION = JobDestination()
DYNAMICALLY_GENERATED_DESTINATION = JobDestination()
@@ -46,6 +48,12 @@
assert mapper.job_config.rule_response == "tool1_dest_id"
+def test_dynamic_mapping_job_conf_params():
+ mapper = __mapper( __dynamic_destination( dict( function="check_job_conf_params", param1="7" ) ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ assert mapper.job_config.rule_response == "sent_7_dest_id"
+
+
def test_dynamic_mapping_function_parameters():
mapper = __mapper( __dynamic_destination( dict( function="check_rule_params" ) ) )
assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
@@ -58,6 +66,12 @@
assert mapper.job_config.rule_response == "have_resource_params"
+def test_dynamic_mapping_workflow_invocation_parameter():
+ mapper = __mapper( __dynamic_destination( dict( function="check_workflow_invocation_uuid" ) ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ assert mapper.job_config.rule_response == WORKFLOW_UUID
+
+
def test_dynamic_mapping_no_function():
dest = __dynamic_destination( dict( ) )
mapper = __mapper( dest )
@@ -124,21 +138,26 @@
return True
def get_job(self):
+ raw_params = {
+ "threshold": 8,
+ "__workflow_invocation_uuid__": WORKFLOW_UUID,
+ }
+
def get_param_values( app, ignore_errors ):
assert app == self.app
- return {
- "threshold": 8,
- "__job_resource": {
- "__job_resource__select": "True",
- "memory": "8gb"
- }
+ params = raw_params.copy()
+ params[ "__job_resource" ] = {
+ "__job_resource__select": "True",
+ "memory": "8gb"
}
+ return params
return bunch.Bunch(
user=bunch.Bunch(
id=6789,
email="test(a)example.com"
),
+ raw_param_dict=lambda: raw_params,
get_param_values=get_param_values
)
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_rule_helper.py
--- a/test/unit/jobs/test_rule_helper.py
+++ b/test/unit/jobs/test_rule_helper.py
@@ -1,3 +1,5 @@
+import uuid
+
from galaxy.util import bunch
from galaxy import model
from galaxy.model import mapping
@@ -24,9 +26,12 @@
__assert_job_count_is( 2, rule_helper, for_destination="local" )
__assert_job_count_is( 7, rule_helper, for_destination="cluster1" )
+ __assert_job_count_is( 9, rule_helper, for_destinations=["cluster1", "local"] )
+
# Test per user destination counts
__assert_job_count_is( 5, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1 )
__assert_job_count_is( 2, rule_helper, for_destination="local", for_user_email=USER_EMAIL_1 )
+ __assert_job_count_is( 7, rule_helper, for_destinations=["cluster1", "local"], for_user_email=USER_EMAIL_1 )
__assert_job_count_is( 2, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_2 )
__assert_job_count_is( 0, rule_helper, for_destination="local", for_user_email=USER_EMAIL_2 )
@@ -69,10 +74,98 @@
app.add( __new_job( user=user2, destination_id="cluster1", state="running" ) )
-def __rule_helper():
- app = MockApp()
- rule_helper = RuleHelper( app )
- return rule_helper
+def test_choose_one_unhashed():
+ rule_helper = __rule_helper()
+
+ # Random choices if hash not set.
+ chosen_ones = set([])
+ __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'])) )
+
+ assert chosen_ones == set(['a', 'b'])
+
+
+def test_choose_one_hashed():
+ rule_helper = __rule_helper()
+
+ # Hashed, so all choosen ones should be the same...
+ chosen_ones = set([])
+ __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value=1234)) )
+ assert len( chosen_ones ) == 1
+
+ # ... also can verify hashing on strings
+ chosen_ones = set([])
+ __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value="i am a string")) )
+
+ assert len( chosen_ones ) == 1
+
+
+def test_job_hash_unique_by_default( ):
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs_in_a_history()
+
+ rule_helper.job_hash( job1 ) != rule_helper.job_hash( job2 )
+
+
+def test_job_hash_history( ):
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs_in_a_history()
+
+ __assert_same_hash( rule_helper, job1, job2, hash_by="history" )
+
+
+def test_job_hash_workflow_invocation():
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs()
+ wi_uuid = uuid.uuid1().hex
+
+ job1.add_parameter( "__workflow_invocation_uuid__", wi_uuid )
+ job2.add_parameter( "__workflow_invocation_uuid__", wi_uuid )
+
+ __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation" )
+
+
+def test_job_hash_fallback():
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs_in_a_history()
+
+ __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation,history" )
+
+
+def test_should_burst( ):
+ rule_helper = __rule_helper()
+ __setup_fixtures( rule_helper.app )
+ # cluster1 fixture has 4 queued jobs, 3 running
+ assert rule_helper.should_burst( [ "cluster1" ], "7" )
+ assert not rule_helper.should_burst( [ "cluster1" ], "10" )
+
+ assert rule_helper.should_burst( [ "cluster1" ], "2", job_states="queued" )
+ assert not rule_helper.should_burst( [ "cluster1" ], "6", job_states="queued" )
+
+
+def __assert_same_hash( rule_helper, job1, job2, hash_by ):
+ job1_hash = rule_helper.job_hash( job1, hash_by=hash_by )
+ job2_hash = rule_helper.job_hash( job2, hash_by=hash_by )
+ assert job1_hash == job2_hash
+
+
+def __two_jobs_in_a_history():
+ job1, job2 = __two_jobs()
+ job1.history_id = 4
+ job2.history_id = 4
+ return job1, job2
+
+
+def __two_jobs( ):
+ job1 = model.Job()
+ job1.id = 1
+ job2 = model.Job()
+ job2.id = 2
+ return job1, job2
+
+
+def __do_a_bunch( work ):
+ for i in range( 20 ):
+ work()
def __new_job( **kwds ):
@@ -82,6 +175,12 @@
return job
+def __rule_helper():
+ app = MockApp()
+ rule_helper = RuleHelper( app )
+ return rule_helper
+
+
class MockApp( object ):
def __init__( self ):
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_rules/10_site.py
--- a/test/unit/jobs/test_rules/10_site.py
+++ b/test/unit/jobs/test_rules/10_site.py
@@ -40,6 +40,15 @@
return "all_passed"
+def check_job_conf_params( param1 ):
+ assert param1 == "7"
+ return "sent_7_dest_id"
+
+
def check_resource_params( resource_params ):
assert resource_params["memory"] == "8gb"
return "have_resource_params"
+
+
+def check_workflow_invocation_uuid( workflow_invocation_uuid ):
+ return workflow_invocation_uuid
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
7 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/40d8d9e3ebd5/
Changeset: 40d8d9e3ebd5
User: jmchilton
Date: 2014-08-18 00:43:54
Summary: Dynamic destinations - extend RuleHelper for reasoning about groups of destinations...
If multiple destinations map to the same underlying resource (a very typical case), this could allow rule developer to reason about cluster as a whole - though perhaps clunkily - by supplying all destination ids mapping to that cluster.
Affected #: 2 files
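[Editorial aside: a minimal sketch of how a site-specific dynamic rule might use the new for_destinations filter described above, assuming rule_helper is injected into the rule function; the destination ids and the threshold are illustrative, not part of this changeset.]
# Hypothetical custom rule: treat several destination ids that map to the
# same physical cluster as one pool when counting load.
CLUSTER1_DESTINATIONS = [ "cluster1_short", "cluster1_long" ]  # illustrative ids


def route_by_cluster_load( rule_helper ):
    # Count queued jobs across the whole pool rather than a single destination.
    queued = rule_helper.job_count(
        for_destinations=CLUSTER1_DESTINATIONS,
        for_job_states=[ "queued" ],
    )
    # The limit of 100 is an illustrative site policy; returning a plain
    # destination id string is accepted by the dynamic job mapper.
    return "overflow_cluster" if queued > 100 else "cluster1_short"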
diff -r 572a8d5ee218aeb37f77660948cb75a439936868 -r 40d8d9e3ebd57820aac97115950818d97019ba17 lib/galaxy/jobs/rule_helper.py
--- a/lib/galaxy/jobs/rule_helper.py
+++ b/lib/galaxy/jobs/rule_helper.py
@@ -58,16 +58,23 @@
query,
for_user_email=None,
for_destination=None,
+ for_destinations=None,
for_job_states=None,
created_in_last=None,
updated_in_last=None,
):
+ if for_destination is not None:
+ for_destinations = [ for_destination ]
+
query = query.join( model.User )
if for_user_email is not None:
query = query.filter( model.User.table.c.email == for_user_email )
- if for_destination is not None:
- query = query.filter( model.Job.table.c.destination_id == for_destination )
+ if for_destinations is not None:
+ if len( for_destinations ) == 1:
+ query = query.filter( model.Job.table.c.destination_id == for_destinations[ 0 ] )
+ else:
+ query = query.filter( model.Job.table.c.destination_id.in_( for_destinations ) )
if created_in_last is not None:
end_date = datetime.now()
diff -r 572a8d5ee218aeb37f77660948cb75a439936868 -r 40d8d9e3ebd57820aac97115950818d97019ba17 test/unit/jobs/test_rule_helper.py
--- a/test/unit/jobs/test_rule_helper.py
+++ b/test/unit/jobs/test_rule_helper.py
@@ -24,9 +24,12 @@
__assert_job_count_is( 2, rule_helper, for_destination="local" )
__assert_job_count_is( 7, rule_helper, for_destination="cluster1" )
+ __assert_job_count_is( 9, rule_helper, for_destinations=["cluster1", "local"] )
+
# Test per user destination counts
__assert_job_count_is( 5, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1 )
__assert_job_count_is( 2, rule_helper, for_destination="local", for_user_email=USER_EMAIL_1 )
+ __assert_job_count_is( 7, rule_helper, for_destinations=["cluster1", "local"], for_user_email=USER_EMAIL_1 )
__assert_job_count_is( 2, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_2 )
__assert_job_count_is( 0, rule_helper, for_destination="local", for_user_email=USER_EMAIL_2 )
https://bitbucket.org/galaxy/galaxy-central/commits/ae77e648918a/
Changeset: ae77e648918a
User: jmchilton
Date: 2014-08-18 00:43:54
Summary: Dynamic destinations - pass job_conf.xml params to rule functions.
In other words, send extra job destination parameters to dynamic rule functions as arguments (in addition to those dynamically populated by Galaxy itself). This enables greater parameterization of rule functions and should lead to cleaner separation of logic and data (i.e. sites can program rules that restrict access to users, but which users can be populated at a higher level in `job_conf.xml`).
For example the following dynamic job rule:
def cluster1(app, memory="4096", cores="1", hours="48"):
native_spec = "--time=%s:00:00 --nodes=1 --ntasks=%s --mem=%s" % ( hours, cores, memory )
return JobDestination( "cluster1", params=dict( native_specification=native_spec ) )
Could then be called with various parameters in job_conf.xml as follows:
<destination id="short_job" type="dyanmic"><param id="function">cluster1</param><param id="hours">1</param></destination><destination id="big_job" type="dynamic"><param id="function">cluster1</param><param id="cores">8</param><param id="memory">32768</param></destination>
Affected #: 3 files
diff -r 40d8d9e3ebd57820aac97115950818d97019ba17 -r ae77e648918aa881618e196bd6b036ae4095b6db lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -69,7 +69,7 @@
names.append( rule_module_name )
return names
- def __invoke_expand_function( self, expand_function ):
+ def __invoke_expand_function( self, expand_function, destination_params ):
function_arg_names = inspect.getargspec( expand_function ).args
app = self.job_wrapper.app
possible_args = {
@@ -83,6 +83,11 @@
actual_args = {}
+ # Send through any job_conf.xml defined args to function
+ for destination_param in destination_params.keys():
+ if destination_param in function_arg_names:
+ actual_args[ destination_param ] = destination_params[ destination_param ]
+
# Populate needed args
for possible_arg_name in possible_args:
if possible_arg_name in function_arg_names:
@@ -179,12 +184,12 @@
raise Exception( message )
expand_function = self.__get_expand_function( expand_function_name )
- return self.__handle_rule( expand_function )
+ return self.__handle_rule( expand_function, destination )
else:
raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type )
- def __handle_rule( self, rule_function ):
- job_destination = self.__invoke_expand_function( rule_function )
+ def __handle_rule( self, rule_function, destination ):
+ job_destination = self.__invoke_expand_function( rule_function, destination.params )
if not isinstance(job_destination, galaxy.jobs.JobDestination):
job_destination_rep = str(job_destination) # Should be either id or url
if '://' in job_destination_rep:
diff -r 40d8d9e3ebd57820aac97115950818d97019ba17 -r ae77e648918aa881618e196bd6b036ae4095b6db test/unit/jobs/test_mapper.py
--- a/test/unit/jobs/test_mapper.py
+++ b/test/unit/jobs/test_mapper.py
@@ -46,6 +46,12 @@
assert mapper.job_config.rule_response == "tool1_dest_id"
+def test_dynamic_mapping_job_conf_params():
+ mapper = __mapper( __dynamic_destination( dict( function="check_job_conf_params", param1="7" ) ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ assert mapper.job_config.rule_response == "sent_7_dest_id"
+
+
def test_dynamic_mapping_function_parameters():
mapper = __mapper( __dynamic_destination( dict( function="check_rule_params" ) ) )
assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
diff -r 40d8d9e3ebd57820aac97115950818d97019ba17 -r ae77e648918aa881618e196bd6b036ae4095b6db test/unit/jobs/test_rules/10_site.py
--- a/test/unit/jobs/test_rules/10_site.py
+++ b/test/unit/jobs/test_rules/10_site.py
@@ -40,6 +40,11 @@
return "all_passed"
+def check_job_conf_params( param1 ):
+ assert param1 == "7"
+ return "sent_7_dest_id"
+
+
def check_resource_params( resource_params ):
assert resource_params["memory"] == "8gb"
return "have_resource_params"
https://bitbucket.org/galaxy/galaxy-central/commits/ec6a5e37d428/
Changeset: ec6a5e37d428
User: jmchilton
Date: 2014-08-18 00:43:54
Summary: Dynamic destinations - allow dispatching on workflow invocation.
Generate a workflow invocation UUID that is stored with each job in the workflow and allow dynamic job destinations to consume these. Should allow for grouping jobs from the same workflow together during resource allocation (a sort of first attempt at dealing with data locality in workflows).
Affected #: 6 files
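[Editorial aside: a minimal sketch of a custom rule consuming the new workflow_invocation_uuid argument to keep a workflow's jobs together; the destination ids are illustrative, and md5 bucketing is just one way to turn the UUID into a stable choice.]
import hashlib

DESTINATIONS = [ "cluster1", "cluster2" ]  # illustrative destination ids


def route_by_invocation( job, workflow_invocation_uuid ):
    if not workflow_invocation_uuid:
        # Job was not launched from a workflow - fall back to the job id.
        index = job.id % len( DESTINATIONS )
    else:
        # Jobs sharing an invocation UUID always land on the same destination.
        as_hex = hashlib.md5( workflow_invocation_uuid ).hexdigest()
        index = int( as_hex, 16 ) % len( DESTINATIONS )
    return DESTINATIONS[ index ]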
diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -95,7 +95,7 @@
# Don't hit the DB to load the job object if not needed
require_db = False
- for param in ["job", "user", "user_email", "resource_params"]:
+ for param in ["job", "user", "user_email", "resource_params", "workflow_invocation_uuid"]:
if param in function_arg_names:
require_db = True
break
@@ -127,6 +127,11 @@
pass
actual_args[ "resource_params" ] = resource_params
+ if "workflow_invocation_uuid" in function_arg_names:
+ param_values = job.raw_param_dict( )
+ workflow_invocation_uuid = param_values.get( "__workflow_invocation_uuid__", None )
+ actual_args[ "workflow_invocation_uuid" ] = workflow_invocation_uuid
+
return expand_function( **actual_args )
def __job_params( self, job ):
diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -482,10 +482,13 @@
Read encoded parameter values from the database and turn back into a
dict of tool parameter values.
"""
- param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] )
+ param_dict = self.raw_param_dict()
tool = app.toolbox.get_tool( self.tool_id )
param_dict = tool.params_from_strings( param_dict, app, ignore_errors=ignore_errors )
return param_dict
+ def raw_param_dict( self ):
+ param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] )
+ return param_dict
def check_if_output_datasets_deleted( self ):
"""
Return true if all of the output datasets associated with this job are
diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd lib/galaxy/tools/execute.py
--- a/lib/galaxy/tools/execute.py
+++ b/lib/galaxy/tools/execute.py
@@ -10,13 +10,19 @@
log = logging.getLogger( __name__ )
-def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None ):
+def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None, workflow_invocation_uuid=None ):
"""
Execute a tool and return object containing summary (output data, number of
failures, etc...).
"""
execution_tracker = ToolExecutionTracker( tool, param_combinations, collection_info )
for params in execution_tracker.param_combinations:
+ if workflow_invocation_uuid:
+ params[ '__workflow_invocation_uuid__' ] = workflow_invocation_uuid
+ elif '__workflow_invocation_uuid__' in params:
+ # Only workflow invocation code gets to set this, ignore user supplied
+ # values or rerun parameters.
+ del params[ '__workflow_invocation_uuid__' ]
job, result = tool.handle_single_execution( trans, rerun_remap_job_id, params, history )
if job:
execution_tracker.record_success( job, result )
diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd lib/galaxy/workflow/run.py
--- a/lib/galaxy/workflow/run.py
+++ b/lib/galaxy/workflow/run.py
@@ -1,3 +1,5 @@
+import uuid
+
from galaxy import model
from galaxy import exceptions
from galaxy import util
@@ -41,6 +43,8 @@
self.param_map = workflow_run_config.param_map
self.outputs = odict()
+ # TODO: Attach to actual model object and persist someday...
+ self.invocation_uuid = uuid.uuid1().hex
def invoke( self ):
workflow_invocation = model.WorkflowInvocation()
@@ -132,6 +136,7 @@
param_combinations=param_combinations,
history=self.target_history,
collection_info=collection_info,
+ workflow_invocation_uuid=self.invocation_uuid
)
if collection_info:
outputs[ step.id ] = dict( execution_tracker.created_collections )
diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd test/unit/jobs/test_mapper.py
--- a/test/unit/jobs/test_mapper.py
+++ b/test/unit/jobs/test_mapper.py
@@ -1,3 +1,5 @@
+import uuid
+
import jobs.test_rules
from galaxy.jobs.mapper import (
@@ -9,7 +11,7 @@
from galaxy.util import bunch
-
+WORKFLOW_UUID = uuid.uuid1().hex
TOOL_JOB_DESTINATION = JobDestination()
DYNAMICALLY_GENERATED_DESTINATION = JobDestination()
@@ -64,6 +66,12 @@
assert mapper.job_config.rule_response == "have_resource_params"
+def test_dynamic_mapping_workflow_invocation_parameter():
+ mapper = __mapper( __dynamic_destination( dict( function="check_workflow_invocation_uuid" ) ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ assert mapper.job_config.rule_response == WORKFLOW_UUID
+
+
def test_dynamic_mapping_no_function():
dest = __dynamic_destination( dict( ) )
mapper = __mapper( dest )
@@ -130,21 +138,26 @@
return True
def get_job(self):
+ raw_params = {
+ "threshold": 8,
+ "__workflow_invocation_uuid__": WORKFLOW_UUID,
+ }
+
def get_param_values( app, ignore_errors ):
assert app == self.app
- return {
- "threshold": 8,
- "__job_resource": {
- "__job_resource__select": "True",
- "memory": "8gb"
- }
+ params = raw_params.copy()
+ params[ "__job_resource" ] = {
+ "__job_resource__select": "True",
+ "memory": "8gb"
}
+ return params
return bunch.Bunch(
user=bunch.Bunch(
id=6789,
email="test(a)example.com"
),
+ raw_param_dict=lambda: raw_params,
get_param_values=get_param_values
)
diff -r ae77e648918aa881618e196bd6b036ae4095b6db -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd test/unit/jobs/test_rules/10_site.py
--- a/test/unit/jobs/test_rules/10_site.py
+++ b/test/unit/jobs/test_rules/10_site.py
@@ -48,3 +48,7 @@
def check_resource_params( resource_params ):
assert resource_params["memory"] == "8gb"
return "have_resource_params"
+
+
+def check_workflow_invocation_uuid( workflow_invocation_uuid ):
+ return workflow_invocation_uuid
https://bitbucket.org/galaxy/galaxy-central/commits/05b550588d9d/
Changeset: 05b550588d9d
User: jmchilton
Date: 2014-08-18 00:43:54
Summary: Dynamic destinations - easier, high-level reasoning about data locality.
... if only in a limitted sort of way. Provide ability to hash jobs by history, workflow, user, etc... and choose among various destinations semi-randomly based on that to distribute work across destinations based on these factors.
Add a stock rule as in the form of a new dynamic destination type ("choose_one") that demonstrate using this to quickly proxy to some fixed static destinations. Add examples to job_conf.xml.sample_advanced.
Affected #: 5 files
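[Editorial aside: beyond the stock choose_one destination type, a minimal sketch of a custom rule combining the new job_hash and choose_one helpers; the destination ids and hash_by order are illustrative.]
def spread_with_locality( rule_helper, job ):
    # Jobs in the same workflow invocation (or, failing that, the same
    # history) hash identically and therefore pick the same destination.
    job_hash = rule_helper.job_hash( job, hash_by="workflow_invocation,history" )
    return rule_helper.choose_one( [ "cluster1", "cluster2", "cluster3" ], hash_value=job_hash )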
diff -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd -r 05b550588d9dcf1502f141e7f1c5c896930e64bc job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -235,6 +235,21 @@
<!-- A destination that represents a method in the dynamic runner. -->
<param id="function">foo</param>
</destination>
+ <destination id="load_balance" runner="dynamic">
+ <param id="type">choose_one</param>
+ <!-- Randomly assign jobs to various static destinatin ids -->
+ <param id="destination_ids">cluster1,cluster2,cluster3</param>
+ </destination>
+ <destination id="load_balance_with_data_locality" runner="dynamic">
+ <!-- Randomly assign jobs to various static destinatin ids,
+ but keep jobs in the same workflow invocation together and
+ for those jobs ran outside of workflows keep jobs in same
+ history together.
+ -->
+ <param id="type">choose_one</param>
+ <param id="destination_ids">cluster1,cluster2,cluster3</param>
+ <param id="hash_by">workflow_invocation,history</param>
+ </destination><destination id="secure_pulsar_rest_dest" runner="pulsar_rest"><param id="url">https://examle.com:8913/</param><!-- If set, private_token must match token in remote Pulsar's
diff -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd -r 05b550588d9dcf1502f141e7f1c5c896930e64bc lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -5,6 +5,8 @@
log = logging.getLogger( __name__ )
import galaxy.jobs.rules
+from galaxy.jobs import stock_rules
+
from .rule_helper import RuleHelper
DYNAMIC_RUNNER_NAME = "dynamic"
@@ -27,6 +29,11 @@
self.message = message
+STOCK_RULES = dict(
+ choose_one=stock_rules.choose_one
+)
+
+
class JobRunnerMapper( object ):
"""
This class is responsible to managing the mapping of jobs
@@ -182,6 +189,7 @@
def __handle_dynamic_job_destination( self, destination ):
expand_type = destination.params.get('type', "python")
+ expand_function = None
if expand_type == "python":
expand_function_name = self.__determine_expand_function_name( destination )
if not expand_function_name:
@@ -189,10 +197,13 @@
raise Exception( message )
expand_function = self.__get_expand_function( expand_function_name )
- return self.__handle_rule( expand_function, destination )
+ elif expand_type in STOCK_RULES:
+ expand_function = STOCK_RULES[ expand_type ]
else:
raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type )
+ return self.__handle_rule( expand_function, destination )
+
def __handle_rule( self, rule_function, destination ):
job_destination = self.__invoke_expand_function( rule_function, destination.params )
if not isinstance(job_destination, galaxy.jobs.JobDestination):
diff -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd -r 05b550588d9dcf1502f141e7f1c5c896930e64bc lib/galaxy/jobs/rule_helper.py
--- a/lib/galaxy/jobs/rule_helper.py
+++ b/lib/galaxy/jobs/rule_helper.py
@@ -1,4 +1,6 @@
from datetime import datetime
+import hashlib
+import random
from sqlalchemy import (
and_,
@@ -6,10 +8,13 @@
)
from galaxy import model
+from galaxy import util
import logging
log = logging.getLogger( __name__ )
+VALID_JOB_HASH_STRATEGIES = ["job", "user", "history", "workflow_invocation"]
+
class RuleHelper( object ):
""" Utillity to allow job rules to interface cleanly with the rest of
@@ -96,3 +101,60 @@
query = query.filter( model.Job.table.c.state.in_( for_job_states ) )
return query
+
+ def choose_one( self, lst, hash_value=None ):
+ """ Choose a random value from supplied list. If hash_value is passed
+ in then every request with that same hash_value would produce the same
+ choice from the supplied list.
+ """
+ if hash_value is None:
+ return random.choice( lst )
+
+ if not isinstance( hash_value, int ):
+ # Convert hash_value string into index
+ as_hex = hashlib.md5( hash_value ).hexdigest()
+ hash_value = int(as_hex, 16)
+ # else assumed to be 'random' int from 0-~Inf
+ random_index = hash_value % len( lst )
+ return lst[ random_index ]
+
+ def job_hash( self, job, hash_by=None ):
+ """ Produce a reproducible hash for the given job on various
+ criteria - for instance if hash_by is "workflow_invocation,history" -
+ all jobs within the same workflow invocation will recieve the same
+ hash - for jobs outside of workflows all jobs within the same history
+ will recieve the same hash, other jobs will be hashed on job's id
+ randomly.
+
+ Primarily intended for use with ``choose_one`` above - to consistent
+ route or schedule related jobs.
+ """
+ if hash_by is None:
+ hash_by = [ "job" ]
+ hash_bys = util.listify( hash_by )
+ for hash_by in hash_bys:
+ job_hash = self._try_hash_for_job( job, hash_by )
+ if job_hash:
+ return job_hash
+
+ # Fallback to just hashing by job id, should always return a value.
+ return self._try_hash_for_job( job, "job" )
+
+ def _try_hash_for_job( self, job, hash_by ):
+ """ May return False or None if hash type is invalid for that job -
+ e.g. attempting to hash by user for anonymous job or by workflow
+ invocation for jobs outside of workflows.
+ """
+ if hash_by not in VALID_JOB_HASH_STRATEGIES:
+ message = "Do not know how to hash jobs by %s, must be one of %s" % ( hash_by, VALID_JOB_HASH_STRATEGIES )
+ raise Exception( message )
+
+ if hash_by == "workflow_invocation":
+ return job.raw_param_dict().get( "__workflow_invocation_uuid__", None )
+ elif hash_by == "history":
+ return job.history_id
+ elif hash_by == "user":
+ user = job.user
+ return user and user.id
+ elif hash_by == "job":
+ return job.id
diff -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd -r 05b550588d9dcf1502f141e7f1c5c896930e64bc lib/galaxy/jobs/stock_rules.py
--- /dev/null
+++ b/lib/galaxy/jobs/stock_rules.py
@@ -0,0 +1,13 @@
+""" Stock job 'dynamic' rules for use in job_conf.xml - these may cover some
+simple use cases but will just proxy into functions in rule_helper so similar
+functionality - but more tailored and composable can be utilized in custom
+rules.
+"""
+
+from galaxy import util
+
+
+def choose_one( rule_helper, job, destination_ids, hash_by="job" ):
+ destination_id_list = util.listify( destination_ids )
+ job_hash = rule_helper.job_hash( job, hash_by )
+ return rule_helper.choose_one( destination_id_list, hash_value=job_hash )
diff -r ec6a5e37d428cc5d97b4d8f65487a7505a0ea5fd -r 05b550588d9dcf1502f141e7f1c5c896930e64bc test/unit/jobs/test_rule_helper.py
--- a/test/unit/jobs/test_rule_helper.py
+++ b/test/unit/jobs/test_rule_helper.py
@@ -1,3 +1,5 @@
+import uuid
+
from galaxy.util import bunch
from galaxy import model
from galaxy.model import mapping
@@ -72,10 +74,87 @@
app.add( __new_job( user=user2, destination_id="cluster1", state="running" ) )
-def __rule_helper():
- app = MockApp()
- rule_helper = RuleHelper( app )
- return rule_helper
+def test_choose_one_unhashed():
+ rule_helper = __rule_helper()
+
+ # Random choices if hash not set.
+ chosen_ones = set([])
+ __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'])) )
+
+ assert chosen_ones == set(['a', 'b'])
+
+
+def test_choose_one_hashed():
+ rule_helper = __rule_helper()
+
+ # Hashed, so all choosen ones should be the same...
+ chosen_ones = set([])
+ __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value=1234)) )
+ assert len( chosen_ones ) == 1
+
+ # ... also can verify hashing on strings
+ chosen_ones = set([])
+ __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value="i am a string")) )
+
+ assert len( chosen_ones ) == 1
+
+
+def test_job_hash_unique_by_default( ):
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs_in_a_history()
+
+ rule_helper.job_hash( job1 ) != rule_helper.job_hash( job2 )
+
+
+def test_job_hash_history( ):
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs_in_a_history()
+
+ __assert_same_hash( rule_helper, job1, job2, hash_by="history" )
+
+
+def test_job_hash_workflow_invocation():
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs()
+ wi_uuid = uuid.uuid1().hex
+
+ job1.add_parameter( "__workflow_invocation_uuid__", wi_uuid )
+ job2.add_parameter( "__workflow_invocation_uuid__", wi_uuid )
+
+ __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation" )
+
+
+def test_job_hash_fallback():
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs_in_a_history()
+
+ __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation,history" )
+
+
+def __assert_same_hash( rule_helper, job1, job2, hash_by ):
+ job1_hash = rule_helper.job_hash( job1, hash_by=hash_by )
+ job2_hash = rule_helper.job_hash( job2, hash_by=hash_by )
+ assert job1_hash == job2_hash
+
+
+def __two_jobs_in_a_history():
+ job1, job2 = __two_jobs()
+ job1.history_id = 4
+ job2.history_id = 4
+ return job1, job2
+
+
+def __two_jobs( ):
+ job1 = model.Job()
+ job1.id = 1
+ job2 = model.Job()
+ job2.id = 2
+ return job1, job2
+
+
+def __do_a_bunch( work ):
+ for i in range( 20 ):
+ work()
def __new_job( **kwds ):
@@ -85,6 +164,12 @@
return job
+def __rule_helper():
+ app = MockApp()
+ rule_helper = RuleHelper( app )
+ return rule_helper
+
+
class MockApp( object ):
def __init__( self ):
https://bitbucket.org/galaxy/galaxy-central/commits/8a56397528dc/
Changeset: 8a56397528dc
User: jmchilton
Date: 2014-08-18 00:43:54
Summary: Dynamic destinations - easier, high-level reasoning about bursting.
Add utility to RuleHelper to simplify bursting reasoning slightly, setup a stock rule for bursting between two static job destinations - mostly for demonstration purprose but it might be useful in some settings.
Affected #: 5 files
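[Editorial aside: a minimal sketch of the kind of custom bursting rule the message above alludes to, built on the new should_burst helper; the destination ids, the threshold of 50, and the provisioning hook are illustrative placeholders.]
LOCAL_DESTINATIONS = [ "local_cluster_1_core", "local_cluster_8_core" ]  # illustrative ids


def burst_when_busy( rule_helper, job ):
    if rule_helper.should_burst( LOCAL_DESTINATIONS, num_jobs=50, job_states="queued" ):
        # A real site rule might provision cloud nodes here before returning
        # the remote destination id.
        return "shared_cluster_8_core"
    return LOCAL_DESTINATIONS[ 0 ]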
diff -r 05b550588d9dcf1502f141e7f1c5c896930e64bc -r 8a56397528dcf8c99862394399db973f926690d2 job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -250,6 +250,22 @@
<param id="destination_ids">cluster1,cluster2,cluster3</param><param id="hash_by">workflow_invocation,history</param></destination>
+ <destination id="burst_out" runner="dynamic">
+ <!-- Burst out from static destination local_cluster_8_core to
+ static destination shared_cluster_8_core when there are about
+ 50 Galaxy jobs assigned to any of the local_cluster_XXX
+ destinations (either running or queued). If there are fewer
+ than 50 jobs, just use local_cluster_8_core destination.
+
+ Uncomment job_state parameter to make this bursting happen when
+ roughly 50 jobs are queued instead.
+ -->
+ <param id="type">burst</param>
+ <param id="from_destination_ids">local_cluster_8_core,local_cluster_1_core,local_cluster_16_core</param>
+ <param id="to_destination_id">shared_cluster_8_core</param>
+ <param id="num_jobs">50</param>
+ <!-- <param id="job_states">queued</param> -->
+ </destination><destination id="secure_pulsar_rest_dest" runner="pulsar_rest"><param id="url">https://examle.com:8913/</param><!-- If set, private_token must match token in remote Pulsar's
diff -r 05b550588d9dcf1502f141e7f1c5c896930e64bc -r 8a56397528dcf8c99862394399db973f926690d2 lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -30,7 +30,8 @@
STOCK_RULES = dict(
- choose_one=stock_rules.choose_one
+ choose_one=stock_rules.choose_one,
+ burst=stock_rules.burst
)
diff -r 05b550588d9dcf1502f141e7f1c5c896930e64bc -r 8a56397528dcf8c99862394399db973f926690d2 lib/galaxy/jobs/rule_helper.py
--- a/lib/galaxy/jobs/rule_helper.py
+++ b/lib/galaxy/jobs/rule_helper.py
@@ -102,6 +102,27 @@
return query
+ def should_burst( self, destination_ids, num_jobs, job_states=None ):
+ """ Check if the specified destinations ``destination_ids`` have at
+ least ``num_jobs`` assigned to it - send in ``job_state`` as ``queued``
+ to limit this check to number of jobs queued.
+
+ See stock_rules for an simple example of using this function - but to
+ get the most out of it - it should probably be used with custom job
+ rules that can respond to the bursting by allocatin resources,
+ launching cloud nodes, etc....
+ """
+ if job_states is None:
+ job_states = "queued,running"
+ from_destination_job_count = self.job_count(
+ for_destinations=destination_ids,
+ for_job_states=util.listify( job_states )
+ )
+ # Would this job push us over maximum job count before requiring
+ # bursting (roughly... very roughly given many handler threads may be
+ # scheduling jobs).
+ return ( from_destination_job_count + 1 ) > int( num_jobs )
+
def choose_one( self, lst, hash_value=None ):
""" Choose a random value from supplied list. If hash_value is passed
in then every request with that same hash_value would produce the same
diff -r 05b550588d9dcf1502f141e7f1c5c896930e64bc -r 8a56397528dcf8c99862394399db973f926690d2 lib/galaxy/jobs/stock_rules.py
--- a/lib/galaxy/jobs/stock_rules.py
+++ b/lib/galaxy/jobs/stock_rules.py
@@ -11,3 +11,11 @@
destination_id_list = util.listify( destination_ids )
job_hash = rule_helper.job_hash( job, hash_by )
return rule_helper.choose_one( destination_id_list, hash_value=job_hash )
+
+
+def burst( rule_helper, job, from_destination_ids, to_destination_id, num_jobs, job_states=None):
+ from_destination_ids = util.listify( from_destination_ids )
+ if rule_helper.should_burst( from_destination_ids, num_jobs=num_jobs, job_states=job_states ):
+ return to_destination_id
+ else:
+ return from_destination_ids[ 0 ]
diff -r 05b550588d9dcf1502f141e7f1c5c896930e64bc -r 8a56397528dcf8c99862394399db973f926690d2 test/unit/jobs/test_rule_helper.py
--- a/test/unit/jobs/test_rule_helper.py
+++ b/test/unit/jobs/test_rule_helper.py
@@ -131,6 +131,17 @@
__assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation,history" )
+def test_should_burst( ):
+ rule_helper = __rule_helper()
+ __setup_fixtures( rule_helper.app )
+ # cluster1 fixture has 4 queued jobs, 3 running
+ assert rule_helper.should_burst( [ "cluster1" ], "7" )
+ assert not rule_helper.should_burst( [ "cluster1" ], "10" )
+
+ assert rule_helper.should_burst( [ "cluster1" ], "2", job_states="queued" )
+ assert not rule_helper.should_burst( [ "cluster1" ], "6", job_states="queued" )
+
+
def __assert_same_hash( rule_helper, job1, job2, hash_by ):
job1_hash = rule_helper.job_hash( job1, hash_by=hash_by )
job2_hash = rule_helper.job_hash( job2, hash_by=hash_by )
https://bitbucket.org/galaxy/galaxy-central/commits/d0d386587231/
Changeset: d0d386587231
User: jmchilton
Date: 2014-08-18 00:43:54
Summary: Dynamic destinations - easier, high-level reasoning about docker tool support.
Added supports_docker function to RuleHelper for use in dynamic destination rules.
Add a very simple stock dynamic destination type (docker_dispatcher) that will send all jobs into either a docker enabled destination or a vanilla destination depending on whether the tool being mapped supports docker (at this time that means is annotated explicitly with a container id).
Affected #: 4 files
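[Editorial aside: a minimal sketch of a custom rule built on the new supports_docker helper, equivalent in spirit to the stock docker_dispatch rule; the destination ids are illustrative.]
def dispatch_on_docker_support( rule_helper, tool ):
    # Route container-annotated tools to a docker-enabled cluster, everything
    # else to the regular cluster.
    if rule_helper.supports_docker( tool ):
        return "docker_cluster"
    return "normal_cluster"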
diff -r 8a56397528dcf8c99862394399db973f926690d2 -r d0d386587231f6b0e6c952122999b095b0e51024 job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -266,6 +266,17 @@
<param id="num_jobs">50</param><!-- <param id="job_states">queued</param> --></destination>
+ <destination id="docker_dispatch" runner="dynamic">
+ <!-- The following dynamic destination type will send all tools that
+ support docker to the static destination defined by
+ docker_destination_id (docker_cluster in this example) and all
+ other tools to default_destination_id (normal_cluster in this
+ example).
+ -->
+ <param id="type">docker_dispatch</param>
+ <param id="docker_destination_id">docker_cluster</param>
+ <param id="default_destination_id">normal_cluster</param>
+ </destination><destination id="secure_pulsar_rest_dest" runner="pulsar_rest"><param id="url">https://examle.com:8913/</param><!-- If set, private_token must match token in remote Pulsar's
diff -r 8a56397528dcf8c99862394399db973f926690d2 -r d0d386587231f6b0e6c952122999b095b0e51024 lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -31,7 +31,8 @@
STOCK_RULES = dict(
choose_one=stock_rules.choose_one,
- burst=stock_rules.burst
+ burst=stock_rules.burst,
+ docker_dispatch=stock_rules.docker_dispatch,
)
diff -r 8a56397528dcf8c99862394399db973f926690d2 -r d0d386587231f6b0e6c952122999b095b0e51024 lib/galaxy/jobs/rule_helper.py
--- a/lib/galaxy/jobs/rule_helper.py
+++ b/lib/galaxy/jobs/rule_helper.py
@@ -27,6 +27,24 @@
def __init__( self, app ):
self.app = app
+ def supports_docker( self, job_or_tool ):
+ """ Job rules can pass this function a job, job_wrapper, or tool and
+ determine if the underlying tool believes it can be containered.
+ """
+ # Not a ton of logic in this method - but the idea is to shield rule
+ # developers from the details and they shouldn't have to know how to
+ # interrogate tool or job to figure out if it can be run in a
+ # container.
+ if hasattr( job_or_tool, 'containers' ):
+ tool = job_or_tool
+ elif hasattr( job_or_tool, 'tool' ):
+ # Have a JobWrapper-like
+ tool = job_or_tool.tool
+ else:
+ # Have a Job object.
+ tool = self.app.toolbox.get_tool( job_or_tool.tool_id )
+ return any( [ c.type == "docker" for c in tool.containers ] )
+
def job_count(
self,
**kwds
diff -r 8a56397528dcf8c99862394399db973f926690d2 -r d0d386587231f6b0e6c952122999b095b0e51024 lib/galaxy/jobs/stock_rules.py
--- a/lib/galaxy/jobs/stock_rules.py
+++ b/lib/galaxy/jobs/stock_rules.py
@@ -19,3 +19,7 @@
return to_destination_id
else:
return from_destination_ids[ 0 ]
+
+
+def docker_dispatch( rule_helper, tool, docker_destination_id, default_destination_id ):
+ return docker_destination_id if rule_helper.supports_docker( tool ) else default_destination_id
https://bitbucket.org/galaxy/galaxy-central/commits/3c9a16993672/
Changeset: 3c9a16993672
User: dannon
Date: 2014-08-18 21:05:18
Summary: Merged in jmchilton/galaxy-central-fork-1 (pull request #465)
Dynamic destination enhancements aimed at enabling co-locating data and (cloud) bursting.
Affected #: 10 files
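The should_burst helper is intended to let deployers write their own bursting rules rather than relying only on the stock burst rule. A minimal sketch of such a custom rule follows; it is not part of the changeset, and the destination ids and the provisioning step mentioned in the comment are placeholders.

# Hypothetical custom rule (not from this changeset) using
# RuleHelper.should_burst; destination ids are placeholders.
def burst_to_shared_cluster( rule_helper ):
    local_destinations = [ "local_cluster_8_core", "local_cluster_16_core" ]
    if rule_helper.should_burst( local_destinations, num_jobs=50, job_states="queued" ):
        # A real rule could allocate resources or launch cloud nodes here
        # before redirecting the job to the shared resource.
        return "shared_cluster_8_core"
    return local_destinations[ 0 ]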
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd job_conf.xml.sample_advanced
--- a/job_conf.xml.sample_advanced
+++ b/job_conf.xml.sample_advanced
@@ -235,6 +235,48 @@
<!-- A destination that represents a method in the dynamic runner. --><param id="function">foo</param></destination>
+ <destination id="load_balance" runner="dynamic">
+ <param id="type">choose_one</param>
+ <!-- Randomly assign jobs to various static destination ids -->
+ <param id="destination_ids">cluster1,cluster2,cluster3</param>
+ </destination>
+ <destination id="load_balance_with_data_locality" runner="dynamic">
+ <!-- Randomly assign jobs to various static destination ids,
+ but keep jobs in the same workflow invocation together and,
+ for those jobs run outside of workflows, keep jobs in the same
+ history together.
+ -->
+ <param id="type">choose_one</param>
+ <param id="destination_ids">cluster1,cluster2,cluster3</param>
+ <param id="hash_by">workflow_invocation,history</param>
+ </destination>
+ <destination id="burst_out" runner="dynamic">
+ <!-- Burst out from static destination local_cluster_8_core to
+ static destination shared_cluster_8_core when there are about
+ 50 Galaxy jobs assigned to any of the local_cluster_XXX
+ destinations (either running or queued). If there are fewer
+ than 50 jobs, just use local_cluster_8_core destination.
+
+ Uncomment the job_states parameter to make this bursting happen when
+ roughly 50 jobs are queued instead.
+ -->
+ <param id="type">burst</param>
+ <param id="from_destination_ids">local_cluster_8_core,local_cluster_1_core,local_cluster_16_core</param>
+ <param id="to_destination_id">shared_cluster_8_core</param>
+ <param id="num_jobs">50</param>
+ <!-- <param id="job_states">queued</param> -->
+ </destination>
+ <destination id="docker_dispatch" runner="dynamic">
+ <!-- The following dynamic destination type will send all tools that
+ support docker to the static destination defined by
+ docker_destination_id (docker_cluster in this example) and all
+ other tools to default_destination_id (normal_cluster in this
+ example).
+ -->
+ <param id="type">docker_dispatch</param>
+ <param id="docker_destination_id">docker_cluster</param>
+ <param id="default_destination_id">normal_cluster</param>
+ </destination><destination id="secure_pulsar_rest_dest" runner="pulsar_rest"><param id="url">https://examle.com:8913/</param><!-- If set, private_token must match token in remote Pulsar's
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -5,6 +5,8 @@
log = logging.getLogger( __name__ )
import galaxy.jobs.rules
+from galaxy.jobs import stock_rules
+
from .rule_helper import RuleHelper
DYNAMIC_RUNNER_NAME = "dynamic"
@@ -27,6 +29,13 @@
self.message = message
+STOCK_RULES = dict(
+ choose_one=stock_rules.choose_one,
+ burst=stock_rules.burst,
+ docker_dispatch=stock_rules.docker_dispatch,
+)
+
+
class JobRunnerMapper( object ):
"""
This class is responsible to managing the mapping of jobs
@@ -69,7 +78,7 @@
names.append( rule_module_name )
return names
- def __invoke_expand_function( self, expand_function ):
+ def __invoke_expand_function( self, expand_function, destination_params ):
function_arg_names = inspect.getargspec( expand_function ).args
app = self.job_wrapper.app
possible_args = {
@@ -83,6 +92,11 @@
actual_args = {}
+ # Send through any job_conf.xml defined args to function
+ for destination_param in destination_params.keys():
+ if destination_param in function_arg_names:
+ actual_args[ destination_param ] = destination_params[ destination_param ]
+
# Populate needed args
for possible_arg_name in possible_args:
if possible_arg_name in function_arg_names:
@@ -90,7 +104,7 @@
# Don't hit the DB to load the job object if not needed
require_db = False
- for param in ["job", "user", "user_email", "resource_params"]:
+ for param in ["job", "user", "user_email", "resource_params", "workflow_invocation_uuid"]:
if param in function_arg_names:
require_db = True
break
@@ -122,6 +136,11 @@
pass
actual_args[ "resource_params" ] = resource_params
+ if "workflow_invocation_uuid" in function_arg_names:
+ param_values = job.raw_param_dict( )
+ workflow_invocation_uuid = param_values.get( "__workflow_invocation_uuid__", None )
+ actual_args[ "workflow_invocation_uuid" ] = workflow_invocation_uuid
+
return expand_function( **actual_args )
def __job_params( self, job ):
@@ -172,6 +191,7 @@
def __handle_dynamic_job_destination( self, destination ):
expand_type = destination.params.get('type', "python")
+ expand_function = None
if expand_type == "python":
expand_function_name = self.__determine_expand_function_name( destination )
if not expand_function_name:
@@ -179,12 +199,15 @@
raise Exception( message )
expand_function = self.__get_expand_function( expand_function_name )
- return self.__handle_rule( expand_function )
+ elif expand_type in STOCK_RULES:
+ expand_function = STOCK_RULES[ expand_type ]
else:
raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type )
- def __handle_rule( self, rule_function ):
- job_destination = self.__invoke_expand_function( rule_function )
+ return self.__handle_rule( expand_function, destination )
+
+ def __handle_rule( self, rule_function, destination ):
+ job_destination = self.__invoke_expand_function( rule_function, destination.params )
if not isinstance(job_destination, galaxy.jobs.JobDestination):
job_destination_rep = str(job_destination) # Should be either id or url
if '://' in job_destination_rep:
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/rule_helper.py
--- a/lib/galaxy/jobs/rule_helper.py
+++ b/lib/galaxy/jobs/rule_helper.py
@@ -1,4 +1,6 @@
from datetime import datetime
+import hashlib
+import random
from sqlalchemy import (
and_,
@@ -6,10 +8,13 @@
)
from galaxy import model
+from galaxy import util
import logging
log = logging.getLogger( __name__ )
+VALID_JOB_HASH_STRATEGIES = ["job", "user", "history", "workflow_invocation"]
+
class RuleHelper( object ):
""" Utillity to allow job rules to interface cleanly with the rest of
@@ -22,6 +27,24 @@
def __init__( self, app ):
self.app = app
+ def supports_docker( self, job_or_tool ):
+ """ Job rules can pass this function a job, job_wrapper, or tool and
+ determine if the underlying tool believes it can be containered.
+ """
+ # Not a ton of logic in this method - but the idea is to shield rule
+ # developers from the details and they shouldn't have to know how to
+ # interrogate tool or job to figure out if it can be run in a
+ # container.
+ if hasattr( job_or_tool, 'containers' ):
+ tool = job_or_tool
+ elif hasattr( job_or_tool, 'tool' ):
+ # Have a JobWrapper-like
+ tool = job_or_tool.tool
+ else:
+ # Have a Job object.
+ tool = self.app.toolbox.get_tool( job_or_tool.tool_id )
+ return any( [ c.type == "docker" for c in tool.containers ] )
+
def job_count(
self,
**kwds
@@ -58,16 +81,23 @@
query,
for_user_email=None,
for_destination=None,
+ for_destinations=None,
for_job_states=None,
created_in_last=None,
updated_in_last=None,
):
+ if for_destination is not None:
+ for_destinations = [ for_destination ]
+
query = query.join( model.User )
if for_user_email is not None:
query = query.filter( model.User.table.c.email == for_user_email )
- if for_destination is not None:
- query = query.filter( model.Job.table.c.destination_id == for_destination )
+ if for_destinations is not None:
+ if len( for_destinations ) == 1:
+ query = query.filter( model.Job.table.c.destination_id == for_destinations[ 0 ] )
+ else:
+ query = query.filter( model.Job.table.c.destination_id.in_( for_destinations ) )
if created_in_last is not None:
end_date = datetime.now()
@@ -89,3 +119,81 @@
query = query.filter( model.Job.table.c.state.in_( for_job_states ) )
return query
+
+ def should_burst( self, destination_ids, num_jobs, job_states=None ):
+ """ Check if the specified destinations ``destination_ids`` have at
+ least ``num_jobs`` assigned to it - send in ``job_state`` as ``queued``
+ to limit this check to number of jobs queued.
+
+ See stock_rules for an simple example of using this function - but to
+ get the most out of it - it should probably be used with custom job
+ rules that can respond to the bursting by allocatin resources,
+ launching cloud nodes, etc....
+ """
+ if job_states is None:
+ job_states = "queued,running"
+ from_destination_job_count = self.job_count(
+ for_destinations=destination_ids,
+ for_job_states=util.listify( job_states )
+ )
+ # Would this job push us over maximum job count before requiring
+ # bursting (roughly... very roughly given many handler threads may be
+ # scheduling jobs).
+ return ( from_destination_job_count + 1 ) > int( num_jobs )
+
+ def choose_one( self, lst, hash_value=None ):
+ """ Choose a random value from supplied list. If hash_value is passed
+ in then every request with that same hash_value would produce the same
+ choice from the supplied list.
+ """
+ if hash_value is None:
+ return random.choice( lst )
+
+ if not isinstance( hash_value, int ):
+ # Convert hash_value string into index
+ as_hex = hashlib.md5( hash_value ).hexdigest()
+ hash_value = int(as_hex, 16)
+ # else assumed to be 'random' int from 0-~Inf
+ random_index = hash_value % len( lst )
+ return lst[ random_index ]
+
+ def job_hash( self, job, hash_by=None ):
+ """ Produce a reproducible hash for the given job on various
+ criteria - for instance if hash_by is "workflow_invocation,history" -
+ all jobs within the same workflow invocation will recieve the same
+ hash - for jobs outside of workflows all jobs within the same history
+ will recieve the same hash, other jobs will be hashed on job's id
+ randomly.
+
+ Primarily intended for use with ``choose_one`` above - to consistent
+ route or schedule related jobs.
+ """
+ if hash_by is None:
+ hash_by = [ "job" ]
+ hash_bys = util.listify( hash_by )
+ for hash_by in hash_bys:
+ job_hash = self._try_hash_for_job( job, hash_by )
+ if job_hash:
+ return job_hash
+
+ # Fallback to just hashing by job id, should always return a value.
+ return self._try_hash_for_job( job, "job" )
+
+ def _try_hash_for_job( self, job, hash_by ):
+ """ May return False or None if hash type is invalid for that job -
+ e.g. attempting to hash by user for anonymous job or by workflow
+ invocation for jobs outside of workflows.
+ """
+ if hash_by not in VALID_JOB_HASH_STRATEGIES:
+ message = "Do not know how to hash jobs by %s, must be one of %s" % ( hash_by, VALID_JOB_HASH_STRATEGIES )
+ raise Exception( message )
+
+ if hash_by == "workflow_invocation":
+ return job.raw_param_dict().get( "__workflow_invocation_uuid__", None )
+ elif hash_by == "history":
+ return job.history_id
+ elif hash_by == "user":
+ user = job.user
+ return user and user.id
+ elif hash_by == "job":
+ return job.id
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/jobs/stock_rules.py
--- /dev/null
+++ b/lib/galaxy/jobs/stock_rules.py
@@ -0,0 +1,25 @@
+""" Stock job 'dynamic' rules for use in job_conf.xml - these may cover some
+simple use cases but will just proxy into functions in rule_helper so similar
+functionality - but more tailored and composable can be utilized in custom
+rules.
+"""
+
+from galaxy import util
+
+
+def choose_one( rule_helper, job, destination_ids, hash_by="job" ):
+ destination_id_list = util.listify( destination_ids )
+ job_hash = rule_helper.job_hash( job, hash_by )
+ return rule_helper.choose_one( destination_id_list, hash_value=job_hash )
+
+
+def burst( rule_helper, job, from_destination_ids, to_destination_id, num_jobs, job_states=None):
+ from_destination_ids = util.listify( from_destination_ids )
+ if rule_helper.should_burst( from_destination_ids, num_jobs=num_jobs, job_states=job_states ):
+ return to_destination_id
+ else:
+ return from_destination_ids[ 0 ]
+
+
+def docker_dispatch( rule_helper, tool, docker_destination_id, default_destination_id ):
+ return docker_destination_id if rule_helper.supports_docker( tool ) else default_destination_id
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -482,10 +482,13 @@
Read encoded parameter values from the database and turn back into a
dict of tool parameter values.
"""
- param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] )
+ param_dict = self.raw_param_dict()
tool = app.toolbox.get_tool( self.tool_id )
param_dict = tool.params_from_strings( param_dict, app, ignore_errors=ignore_errors )
return param_dict
+ def raw_param_dict( self ):
+ param_dict = dict( [ ( p.name, p.value ) for p in self.parameters ] )
+ return param_dict
def check_if_output_datasets_deleted( self ):
"""
Return true if all of the output datasets associated with this job are
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/tools/execute.py
--- a/lib/galaxy/tools/execute.py
+++ b/lib/galaxy/tools/execute.py
@@ -10,13 +10,19 @@
log = logging.getLogger( __name__ )
-def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None ):
+def execute( trans, tool, param_combinations, history, rerun_remap_job_id=None, collection_info=None, workflow_invocation_uuid=None ):
"""
Execute a tool and return object containing summary (output data, number of
failures, etc...).
"""
execution_tracker = ToolExecutionTracker( tool, param_combinations, collection_info )
for params in execution_tracker.param_combinations:
+ if workflow_invocation_uuid:
+ params[ '__workflow_invocation_uuid__' ] = workflow_invocation_uuid
+ elif '__workflow_invocation_uuid__' in params:
+ # Only workflow invocation code gets to set this, ignore user supplied
+ # values or rerun parameters.
+ del params[ '__workflow_invocation_uuid__' ]
job, result = tool.handle_single_execution( trans, rerun_remap_job_id, params, history )
if job:
execution_tracker.record_success( job, result )
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd lib/galaxy/workflow/run.py
--- a/lib/galaxy/workflow/run.py
+++ b/lib/galaxy/workflow/run.py
@@ -1,3 +1,5 @@
+import uuid
+
from galaxy import model
from galaxy import exceptions
from galaxy import util
@@ -41,6 +43,8 @@
self.param_map = workflow_run_config.param_map
self.outputs = odict()
+ # TODO: Attach to actual model object and persist someday...
+ self.invocation_uuid = uuid.uuid1().hex
def invoke( self ):
workflow_invocation = model.WorkflowInvocation()
@@ -132,6 +136,7 @@
param_combinations=param_combinations,
history=self.target_history,
collection_info=collection_info,
+ workflow_invocation_uuid=self.invocation_uuid
)
if collection_info:
outputs[ step.id ] = dict( execution_tracker.created_collections )
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_mapper.py
--- a/test/unit/jobs/test_mapper.py
+++ b/test/unit/jobs/test_mapper.py
@@ -1,3 +1,5 @@
+import uuid
+
import jobs.test_rules
from galaxy.jobs.mapper import (
@@ -9,7 +11,7 @@
from galaxy.util import bunch
-
+WORKFLOW_UUID = uuid.uuid1().hex
TOOL_JOB_DESTINATION = JobDestination()
DYNAMICALLY_GENERATED_DESTINATION = JobDestination()
@@ -46,6 +48,12 @@
assert mapper.job_config.rule_response == "tool1_dest_id"
+def test_dynamic_mapping_job_conf_params():
+ mapper = __mapper( __dynamic_destination( dict( function="check_job_conf_params", param1="7" ) ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ assert mapper.job_config.rule_response == "sent_7_dest_id"
+
+
def test_dynamic_mapping_function_parameters():
mapper = __mapper( __dynamic_destination( dict( function="check_rule_params" ) ) )
assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
@@ -58,6 +66,12 @@
assert mapper.job_config.rule_response == "have_resource_params"
+def test_dynamic_mapping_workflow_invocation_parameter():
+ mapper = __mapper( __dynamic_destination( dict( function="check_workflow_invocation_uuid" ) ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ assert mapper.job_config.rule_response == WORKFLOW_UUID
+
+
def test_dynamic_mapping_no_function():
dest = __dynamic_destination( dict( ) )
mapper = __mapper( dest )
@@ -124,21 +138,26 @@
return True
def get_job(self):
+ raw_params = {
+ "threshold": 8,
+ "__workflow_invocation_uuid__": WORKFLOW_UUID,
+ }
+
def get_param_values( app, ignore_errors ):
assert app == self.app
- return {
- "threshold": 8,
- "__job_resource": {
- "__job_resource__select": "True",
- "memory": "8gb"
- }
+ params = raw_params.copy()
+ params[ "__job_resource" ] = {
+ "__job_resource__select": "True",
+ "memory": "8gb"
}
+ return params
return bunch.Bunch(
user=bunch.Bunch(
id=6789,
email="test(a)example.com"
),
+ raw_param_dict=lambda: raw_params,
get_param_values=get_param_values
)
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_rule_helper.py
--- a/test/unit/jobs/test_rule_helper.py
+++ b/test/unit/jobs/test_rule_helper.py
@@ -1,3 +1,5 @@
+import uuid
+
from galaxy.util import bunch
from galaxy import model
from galaxy.model import mapping
@@ -24,9 +26,12 @@
__assert_job_count_is( 2, rule_helper, for_destination="local" )
__assert_job_count_is( 7, rule_helper, for_destination="cluster1" )
+ __assert_job_count_is( 9, rule_helper, for_destinations=["cluster1", "local"] )
+
# Test per user destination counts
__assert_job_count_is( 5, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1 )
__assert_job_count_is( 2, rule_helper, for_destination="local", for_user_email=USER_EMAIL_1 )
+ __assert_job_count_is( 7, rule_helper, for_destinations=["cluster1", "local"], for_user_email=USER_EMAIL_1 )
__assert_job_count_is( 2, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_2 )
__assert_job_count_is( 0, rule_helper, for_destination="local", for_user_email=USER_EMAIL_2 )
@@ -69,10 +74,98 @@
app.add( __new_job( user=user2, destination_id="cluster1", state="running" ) )
-def __rule_helper():
- app = MockApp()
- rule_helper = RuleHelper( app )
- return rule_helper
+def test_choose_one_unhashed():
+ rule_helper = __rule_helper()
+
+ # Random choices if hash not set.
+ chosen_ones = set([])
+ __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'])) )
+
+ assert chosen_ones == set(['a', 'b'])
+
+
+def test_choose_one_hashed():
+ rule_helper = __rule_helper()
+
+ # Hashed, so all chosen ones should be the same...
+ chosen_ones = set([])
+ __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value=1234)) )
+ assert len( chosen_ones ) == 1
+
+ # ... also can verify hashing on strings
+ chosen_ones = set([])
+ __do_a_bunch( lambda: chosen_ones.add(rule_helper.choose_one(['a', 'b'], hash_value="i am a string")) )
+
+ assert len( chosen_ones ) == 1
+
+
+def test_job_hash_unique_by_default( ):
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs_in_a_history()
+
+ assert rule_helper.job_hash( job1 ) != rule_helper.job_hash( job2 )
+
+
+def test_job_hash_history( ):
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs_in_a_history()
+
+ __assert_same_hash( rule_helper, job1, job2, hash_by="history" )
+
+
+def test_job_hash_workflow_invocation():
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs()
+ wi_uuid = uuid.uuid1().hex
+
+ job1.add_parameter( "__workflow_invocation_uuid__", wi_uuid )
+ job2.add_parameter( "__workflow_invocation_uuid__", wi_uuid )
+
+ __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation" )
+
+
+def test_job_hash_fallback():
+ rule_helper = __rule_helper()
+ job1, job2 = __two_jobs_in_a_history()
+
+ __assert_same_hash( rule_helper, job1, job2, hash_by="workflow_invocation,history" )
+
+
+def test_should_burst( ):
+ rule_helper = __rule_helper()
+ __setup_fixtures( rule_helper.app )
+ # cluster1 fixture has 4 queued jobs, 3 running
+ assert rule_helper.should_burst( [ "cluster1" ], "7" )
+ assert not rule_helper.should_burst( [ "cluster1" ], "10" )
+
+ assert rule_helper.should_burst( [ "cluster1" ], "2", job_states="queued" )
+ assert not rule_helper.should_burst( [ "cluster1" ], "6", job_states="queued" )
+
+
+def __assert_same_hash( rule_helper, job1, job2, hash_by ):
+ job1_hash = rule_helper.job_hash( job1, hash_by=hash_by )
+ job2_hash = rule_helper.job_hash( job2, hash_by=hash_by )
+ assert job1_hash == job2_hash
+
+
+def __two_jobs_in_a_history():
+ job1, job2 = __two_jobs()
+ job1.history_id = 4
+ job2.history_id = 4
+ return job1, job2
+
+
+def __two_jobs( ):
+ job1 = model.Job()
+ job1.id = 1
+ job2 = model.Job()
+ job2.id = 2
+ return job1, job2
+
+
+def __do_a_bunch( work ):
+ for i in range( 20 ):
+ work()
def __new_job( **kwds ):
@@ -82,6 +175,12 @@
return job
+def __rule_helper():
+ app = MockApp()
+ rule_helper = RuleHelper( app )
+ return rule_helper
+
+
class MockApp( object ):
def __init__( self ):
diff -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 -r 3c9a169936720219b60be32dd790fa2f14a139cd test/unit/jobs/test_rules/10_site.py
--- a/test/unit/jobs/test_rules/10_site.py
+++ b/test/unit/jobs/test_rules/10_site.py
@@ -40,6 +40,15 @@
return "all_passed"
+def check_job_conf_params( param1 ):
+ assert param1 == "7"
+ return "sent_7_dest_id"
+
+
def check_resource_params( resource_params ):
assert resource_params["memory"] == "8gb"
return "have_resource_params"
+
+
+def check_workflow_invocation_uuid( workflow_invocation_uuid ):
+ return workflow_invocation_uuid
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
commit/galaxy-central: dannon: Merged in kellrott/galaxy-farm/federation_ids (pull request #462)
by commits-noreply@bitbucket.org 18 Aug '14
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/0d06d852093f/
Changeset: 0d06d852093f
User: dannon
Date: 2014-08-18 19:38:24
Summary: Merged in kellrott/galaxy-farm/federation_ids (pull request #462)
UUIDs as valid inputs for workflows
Affected #: 2 files
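With this change a workflow run request can reference an input dataset by its UUID instead of an encoded HDA id. A rough sketch of such an API call follows; the endpoint and ds_map payload shape are assumed from the era's workflow run API, and the workflow id, history id, UUID, and API key are all placeholders rather than verified values.

# Hypothetical API call (placeholders throughout) referencing a workflow
# input by dataset UUID via src='uuid'.
import requests

payload = {
    "workflow_id": "ebfb8f50c6abde6d",      # encoded workflow id (placeholder)
    "history": "hist_id=f2db41e1fa331b3e",  # existing target history (placeholder)
    "ds_map": {
        # step input index -> dataset reference; 'src': 'uuid' triggers the new lookup
        "0": { "src": "uuid", "id": "3f4d6399-8bd6-4d34-b9d9-9ba4a9f3d0b9" },
    },
}
response = requests.post(
    "https://galaxy.example.org/api/workflows",
    params={ "key": "YOUR_API_KEY" },
    json=payload,
)
print( response.json() )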
diff -r eaf8920f999bab18a44c938565b643e0612f6e15 -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 lib/galaxy/workflow/run_request.py
--- a/lib/galaxy/workflow/run_request.py
+++ b/lib/galaxy/workflow/run_request.py
@@ -188,6 +188,15 @@
content = trans.sa_session.query(app.model.HistoryDatasetAssociation).get(
trans.security.decode_id(input_id))
assert trans.user_is_admin() or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), content.dataset )
+ elif input_source == 'uuid':
+ dataset = trans.sa_session.query(app.model.Dataset).filter(app.model.Dataset.uuid==input_id).first()
+ if dataset is None:
+ #this will need to be changed later. If federation code is available, then a missing UUID
+ #could be found among federation partners
+ message = "Input cannot find UUID: %s." % input_id
+ raise exceptions.RequestParameterInvalidException( message )
+ assert trans.user_is_admin() or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), dataset )
+ content = history.add_dataset(dataset)
elif input_source == 'hdca':
content = app.dataset_collections_service.get_dataset_collection_instance(
trans,
diff -r eaf8920f999bab18a44c938565b643e0612f6e15 -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 templates/show_params.mako
--- a/templates/show_params.mako
+++ b/templates/show_params.mako
@@ -23,7 +23,7 @@
${ inputs_recursive(input.inputs, param_values[input.name][i], depth=depth+1) }
%endfor
%elif input.type == "conditional":
- <%
+ <%
try:
current_case = param_values[input.name]['__current_case__']
is_valid = True
@@ -53,7 +53,7 @@
<td></td></tr>
%elif input.visible:
- <%
+ <%
if hasattr( input, "label" ) and input.label:
label = input.label
else:
@@ -70,7 +70,7 @@
## Parameter does not have a stored value.
<tr><%
- # Get parameter label.
+ # Get parameter label.
if input.type == "conditional":
label = input.test_param.label
elif input.type == "repeat":
@@ -83,7 +83,7 @@
<td></td></tr>
%endif
-
+
%endfor
</%def>
@@ -91,7 +91,7 @@
<%def name="inputs_recursive_indent( text, depth )"><td style="padding-left: ${ ( depth - 1 ) * 10 }px">
${text | h}
- </td>
+ </td></%def><table class="tabletip">
@@ -120,6 +120,9 @@
<tr><td>Tool Standard Error:</td><td><a href="${h.url_for( controller='dataset', action='stderr', dataset_id=encoded_hda_id )}">stderr</a></td></tr><tr><td>Tool Exit Code:</td><td>${job.exit_code | h}</td></tr><tr><td>API ID:</td><td>${encoded_hda_id}</td></tr>
+ %if hda.dataset.uuid:
+ <tr><td>UUID:</td><td>${hda.dataset.uuid}</td></tr>
+ %endif
%if trans.user_is_admin() or trans.app.config.expose_dataset_path:
<tr><td>Full Path:</td><td>${hda.file_name | h}</td></tr>
%endif
@@ -168,4 +171,3 @@
'${dep[0].name | h}' in ${dep[1]}<br/></div>
% endfor
-
3 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/ce24adaae994/
Changeset: ce24adaae994
Branch: federation_ids
User: kellrott
Date: 2014-08-14 22:19:43
Summary: Adding a method for workflows to use an input with 'src' set to 'uuid'; it will then scan datasets for a matching UUID.
Affected #: 2 files
diff -r d301ac50aa86a94e230d44eb2ae10c6c0e354b88 -r ce24adaae9941568a414f80b5f16da37e55fa192 lib/galaxy/workflow/run_request.py
--- a/lib/galaxy/workflow/run_request.py
+++ b/lib/galaxy/workflow/run_request.py
@@ -188,6 +188,15 @@
content = trans.sa_session.query(app.model.HistoryDatasetAssociation).get(
trans.security.decode_id(input_id))
assert trans.user_is_admin() or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), content.dataset )
+ elif input_source == 'uuid':
+ dataset = trans.sa_session.query(app.model.Dataset).filter(app.model.Dataset.uuid==input_id).first()
+ if dataset is None:
+ #this will need to be changed later. If federation code is available, then a missing UUID
+ #could be found among federation partners
+ message = "Input cannot find UUID: %s." % input_id
+ raise exceptions.RequestParameterInvalidException( message )
+ content = history.add_dataset(dataset)
+ assert trans.user_is_admin() or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), content.dataset )
elif input_source == 'hdca':
content = app.dataset_collections_service.get_dataset_collection_instance(
trans,
diff -r d301ac50aa86a94e230d44eb2ae10c6c0e354b88 -r ce24adaae9941568a414f80b5f16da37e55fa192 templates/show_params.mako
--- a/templates/show_params.mako
+++ b/templates/show_params.mako
@@ -23,7 +23,7 @@
${ inputs_recursive(input.inputs, param_values[input.name][i], depth=depth+1) }
%endfor
%elif input.type == "conditional":
- <%
+ <%
try:
current_case = param_values[input.name]['__current_case__']
is_valid = True
@@ -53,7 +53,7 @@
<td></td></tr>
%elif input.visible:
- <%
+ <%
if hasattr( input, "label" ) and input.label:
label = input.label
else:
@@ -70,7 +70,7 @@
## Parameter does not have a stored value.
<tr><%
- # Get parameter label.
+ # Get parameter label.
if input.type == "conditional":
label = input.test_param.label
elif input.type == "repeat":
@@ -83,7 +83,7 @@
<td></td></tr>
%endif
-
+
%endfor
</%def>
@@ -91,7 +91,7 @@
<%def name="inputs_recursive_indent( text, depth )"><td style="padding-left: ${ ( depth - 1 ) * 10 }px">
${text | h}
- </td>
+ </td></%def><table class="tabletip">
@@ -120,6 +120,9 @@
<tr><td>Tool Standard Error:</td><td><a href="${h.url_for( controller='dataset', action='stderr', dataset_id=encoded_hda_id )}">stderr</a></td></tr><tr><td>Tool Exit Code:</td><td>${job.exit_code | h}</td></tr><tr><td>API ID:</td><td>${encoded_hda_id}</td></tr>
+ %if hda.dataset.uuid:
+ <tr><td>UUID:</td><td>${hda.dataset.uuid}</td></tr>
+ %endif
%if trans.user_is_admin() or trans.app.config.expose_dataset_path:
<tr><td>Full Path:</td><td>${hda.file_name | h}</td></tr>
%endif
@@ -168,4 +171,3 @@
'${dep[0].name | h}' in ${dep[1]}<br/></div>
% endfor
-
https://bitbucket.org/galaxy/galaxy-central/commits/9440196009e1/
Changeset: 9440196009e1
Branch: federation_ids
User: kellrott
Date: 2014-08-14 22:38:55
Summary: Checking security before creating the HDA
Affected #: 1 file
diff -r ce24adaae9941568a414f80b5f16da37e55fa192 -r 9440196009e1efdd69df90a406e49a4fc6d7655b lib/galaxy/workflow/run_request.py
--- a/lib/galaxy/workflow/run_request.py
+++ b/lib/galaxy/workflow/run_request.py
@@ -195,8 +195,8 @@
#could be found amoung fereration partners
message = "Input cannot find UUID: %s." % input_id
raise exceptions.RequestParameterInvalidException( message )
+ assert trans.user_is_admin() or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), dataset )
content = history.add_dataset(dataset)
- assert trans.user_is_admin() or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), content.dataset )
elif input_source == 'hdca':
content = app.dataset_collections_service.get_dataset_collection_instance(
trans,
https://bitbucket.org/galaxy/galaxy-central/commits/0d06d852093f/
Changeset: 0d06d852093f
User: dannon
Date: 2014-08-18 19:38:24
Summary: Merged in kellrott/galaxy-farm/federation_ids (pull request #462)
UUIDs as valid inputs for workflows
Affected #: 2 files
diff -r eaf8920f999bab18a44c938565b643e0612f6e15 -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 lib/galaxy/workflow/run_request.py
--- a/lib/galaxy/workflow/run_request.py
+++ b/lib/galaxy/workflow/run_request.py
@@ -188,6 +188,15 @@
content = trans.sa_session.query(app.model.HistoryDatasetAssociation).get(
trans.security.decode_id(input_id))
assert trans.user_is_admin() or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), content.dataset )
+ elif input_source == 'uuid':
+ dataset = trans.sa_session.query(app.model.Dataset).filter(app.model.Dataset.uuid==input_id).first()
+ if dataset is None:
+ #this will need to be changed later. If federation code is available, then a missing UUID
+ #could be found among federation partners
+ message = "Input cannot find UUID: %s." % input_id
+ raise exceptions.RequestParameterInvalidException( message )
+ assert trans.user_is_admin() or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), dataset )
+ content = history.add_dataset(dataset)
elif input_source == 'hdca':
content = app.dataset_collections_service.get_dataset_collection_instance(
trans,
diff -r eaf8920f999bab18a44c938565b643e0612f6e15 -r 0d06d852093fcca94adc1a93a0fb9b7efc2880c4 templates/show_params.mako
--- a/templates/show_params.mako
+++ b/templates/show_params.mako
@@ -23,7 +23,7 @@
${ inputs_recursive(input.inputs, param_values[input.name][i], depth=depth+1) }
%endfor
%elif input.type == "conditional":
- <%
+ <%
try:
current_case = param_values[input.name]['__current_case__']
is_valid = True
@@ -53,7 +53,7 @@
<td></td></tr>
%elif input.visible:
- <%
+ <%
if hasattr( input, "label" ) and input.label:
label = input.label
else:
@@ -70,7 +70,7 @@
## Parameter does not have a stored value.
<tr><%
- # Get parameter label.
+ # Get parameter label.
if input.type == "conditional":
label = input.test_param.label
elif input.type == "repeat":
@@ -83,7 +83,7 @@
<td></td></tr>
%endif
-
+
%endfor
</%def>
@@ -91,7 +91,7 @@
<%def name="inputs_recursive_indent( text, depth )"><td style="padding-left: ${ ( depth - 1 ) * 10 }px">
${text | h}
- </td>
+ </td></%def><table class="tabletip">
@@ -120,6 +120,9 @@
<tr><td>Tool Standard Error:</td><td><a href="${h.url_for( controller='dataset', action='stderr', dataset_id=encoded_hda_id )}">stderr</a></td></tr><tr><td>Tool Exit Code:</td><td>${job.exit_code | h}</td></tr><tr><td>API ID:</td><td>${encoded_hda_id}</td></tr>
+ %if hda.dataset.uuid:
+ <tr><td>UUID:</td><td>${hda.dataset.uuid}</td></tr>
+ %endif
%if trans.user_is_admin() or trans.app.config.expose_dataset_path:
<tr><td>Full Path:</td><td>${hda.file_name | h}</td></tr>
%endif
@@ -168,4 +171,3 @@
'${dep[0].name | h}' in ${dep[1]}<br/></div>
% endfor
-
2 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/162b355ae974/
Changeset: 162b355ae974
User: jmchilton
Date: 2014-08-18 16:28:40
Summary: Minor trans-related changes for jobs.
Remove a couple of unused references on the job execution path through the code and require 'less' from the object used to run jobs (i.e. try to use only trans.user and not trans.get_user(), and make access to the underlying session optional in galaxy.tools.actions).
Affected #: 3 files
diff -r e66936a92febf952017586742db36ed44be4d08b -r 162b355ae974e9082ae013640632d534579add6c lib/galaxy/model/__init__.py
--- a/lib/galaxy/model/__init__.py
+++ b/lib/galaxy/model/__init__.py
@@ -1610,7 +1610,7 @@
except KeyError:
depends_list = []
# See if converted dataset already exists, either in metadata in conversions.
- converted_dataset = self.get_metadata_dataset( trans, target_ext )
+ converted_dataset = self.get_metadata_dataset( target_ext )
if converted_dataset:
return converted_dataset
converted_dataset = self.get_converted_files_by_type( target_ext )
@@ -1641,7 +1641,7 @@
session.add( assoc )
session.flush()
return None
- def get_metadata_dataset( self, trans, dataset_ext ):
+ def get_metadata_dataset( self, dataset_ext ):
"""
Returns an HDA that points to a metadata file which contains a
converted data with the requested extension.
diff -r e66936a92febf952017586742db36ed44be4d08b -r 162b355ae974e9082ae013640632d534579add6c lib/galaxy/tools/actions/__init__.py
--- a/lib/galaxy/tools/actions/__init__.py
+++ b/lib/galaxy/tools/actions/__init__.py
@@ -133,7 +133,7 @@
tool.visit_inputs( param_values, visitor )
return input_datasets
- def collect_input_dataset_collections( self, tool, param_values, trans ):
+ def collect_input_dataset_collections( self, tool, param_values ):
input_dataset_collections = dict()
def visitor( prefix, input, value, parent=None ):
@@ -167,7 +167,7 @@
out_data = odict()
# Track input dataset collections - but replace with simply lists so collect
# input datasets can process these normally.
- inp_dataset_collections = self.collect_input_dataset_collections( tool, incoming, trans )
+ inp_dataset_collections = self.collect_input_dataset_collections( tool, incoming )
# Collect any input datasets from the incoming parameters
inp_data = self.collect_input_datasets( tool, incoming, trans )
@@ -331,10 +331,12 @@
trans.sa_session.flush()
# Create the job object
job = trans.app.model.Job()
- galaxy_session = trans.get_galaxy_session()
- # If we're submitting from the API, there won't be a session.
- if type( galaxy_session ) == trans.model.GalaxySession:
- job.session_id = galaxy_session.id
+
+ if hasattr( trans, "get_galaxy_session" ):
+ galaxy_session = trans.get_galaxy_session()
+ # If we're submitting from the API, there won't be a session.
+ if type( galaxy_session ) == trans.model.GalaxySession:
+ job.session_id = galaxy_session.id
if trans.user is not None:
job.user_id = trans.user.id
job.history_id = history.id
diff -r e66936a92febf952017586742db36ed44be4d08b -r 162b355ae974e9082ae013640632d534579add6c lib/galaxy/util/dbkeys.py
--- a/lib/galaxy/util/dbkeys.py
+++ b/lib/galaxy/util/dbkeys.py
@@ -36,7 +36,7 @@
.filter_by( deleted=False, history_id=trans.history.id, extension="len" )
for dataset in datasets:
rval.append( ( dataset.dbkey, "%s (%s) [History]" % ( dataset.name, dataset.dbkey ) ) )
- user = trans.get_user()
+ user = trans.user
if user and 'dbkeys' in user.preferences:
user_keys = from_json_string( user.preferences['dbkeys'] )
for key, chrom_dict in user_keys.iteritems():
https://bitbucket.org/galaxy/galaxy-central/commits/eaf8920f999b/
Changeset: eaf8920f999b
User: jmchilton
Date: 2014-08-18 16:28:40
Summary: Extract some mixins out of trans, allowing trans-like functionality outside web threads...
In particular, break out mixins defining context-based utilities for interacting with an app, a user, and a history. In downstream work on workflow scheduling this proves necessary and sufficient to create a context for 'execute'-ing tools (creating jobs) outside of web threads in response to workflow requests.
Unit tests for some of this.
Affected #: 2 files
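The point of the extracted mixins is that code running outside a web request can assemble a lightweight trans-like context, much as the unit test added in this changeset does. A minimal sketch under that assumption follows; the app object and its attributes are placeholders for a real Galaxy app providing `model` and `config`.

# Hypothetical trans-like context (not from this changeset) built from the
# new mixins; `app` must expose `model` and `config` like a real Galaxy app.
from galaxy.web import framework


class WorkContext( framework.ProvidesAppContext, framework.ProvidesUserContext ):

    def __init__( self, app, user=None ):
        self.app = app
        self.user = user
        # ProvidesUserContext also expects api_inherit_admin on the mixed-in class.
        self.api_inherit_admin = False


# context = WorkContext( app, user=some_user )
# context.log_event( "ran a tool outside of a web thread" )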
diff -r 162b355ae974e9082ae013640632d534579add6c -r eaf8920f999bab18a44c938565b643e0612f6e15 lib/galaxy/web/framework/__init__.py
--- a/lib/galaxy/web/framework/__init__.py
+++ b/lib/galaxy/web/framework/__init__.py
@@ -491,7 +491,178 @@
return T( app )
-class GalaxyWebTransaction( base.DefaultWebTransaction ):
+class ProvidesAppContext( object ):
+ """ For transaction-like objects to provide Galaxy convience layer for
+ database and event handling.
+
+ Mixed in class must provide `app` property.
+ """
+
+ def log_action( self, user=None, action=None, context=None, params=None):
+ """
+ Application-level logging of user actions.
+ """
+ if self.app.config.log_actions:
+ action = self.app.model.UserAction(action=action, context=context, params=unicode( to_json_string( params ) ) )
+ try:
+ if user:
+ action.user = user
+ else:
+ action.user = self.user
+ except:
+ action.user = None
+ try:
+ action.session_id = self.galaxy_session.id
+ except:
+ action.session_id = None
+ self.sa_session.add( action )
+ self.sa_session.flush()
+
+ def log_event( self, message, tool_id=None, **kwargs ):
+ """
+ Application level logging. Still needs fleshing out (log levels and such)
+ Logging events is a config setting - if False, do not log.
+ """
+ if self.app.config.log_events:
+ event = self.app.model.Event()
+ event.tool_id = tool_id
+ try:
+ event.message = message % kwargs
+ except:
+ event.message = message
+ try:
+ event.history = self.get_history()
+ except:
+ event.history = None
+ try:
+ event.history_id = self.history.id
+ except:
+ event.history_id = None
+ try:
+ event.user = self.user
+ except:
+ event.user = None
+ try:
+ event.session_id = self.galaxy_session.id
+ except:
+ event.session_id = None
+ self.sa_session.add( event )
+ self.sa_session.flush()
+
+ @property
+ def sa_session( self ):
+ """
+ Returns a SQLAlchemy session -- currently just gets the current
+ session from the threadlocal session context, but this is provided
+ to allow migration toward a more SQLAlchemy 0.4 style of use.
+ """
+ return self.app.model.context.current
+
+ def expunge_all( self ):
+ app = self.app
+ context = app.model.context
+ context.expunge_all()
+ # This is a bit hacky, should refactor this. Maybe refactor to app -> expunge_all()
+ if hasattr(app, 'install_model'):
+ install_model = app.install_model
+ if install_model != app.model:
+ install_model.context.expunge_all()
+
+ def get_toolbox(self):
+ """Returns the application toolbox"""
+ return self.app.toolbox
+
+ @property
+ def model( self ):
+ return self.app.model
+
+ @property
+ def install_model( self ):
+ return self.app.install_model
+
+ def request_types(self):
+ if self.sa_session.query( self.app.model.RequestType ).filter_by( deleted=False ).count() > 0:
+ return True
+ return False
+
+
+class ProvidesUserContext( object ):
+ """ For transaction-like objects to provide Galaxy convience layer for
+ reasoning about users.
+
+ Mixed in class must provide `user`, `api_inherit_admin`, and `app`
+ properties.
+ """
+
+ @property
+ def anonymous( self ):
+ return self.user is None and not self.api_inherit_admin
+
+ def get_current_user_roles( self ):
+ user = self.user
+ if user:
+ roles = user.all_roles()
+ else:
+ roles = []
+ return roles
+
+ def user_is_admin( self ):
+ if self.api_inherit_admin:
+ return True
+ return self.user and self.user.email in self.app.config.admin_users_list
+
+ def user_can_do_run_as( self ):
+ run_as_users = [ user for user in self.app.config.get( "api_allow_run_as", "" ).split( "," ) if user ]
+ if not run_as_users:
+ return False
+ user_in_run_as_users = self.user and self.user.email in run_as_users
+ # Can do if explicitly in list or master_api_key supplied.
+ can_do_run_as = user_in_run_as_users or self.api_inherit_admin
+ return can_do_run_as
+
+ @property
+ def user_ftp_dir( self ):
+ identifier = self.app.config.ftp_upload_dir_identifier
+ return os.path.join( self.app.config.ftp_upload_dir, getattr(self.user, identifier) )
+
+
+class ProvidesHistoryContext( object ):
+ """ For transaction-like objects to provide Galaxy convience layer for
+ reasoning about histories.
+
+ Mixed in class must provide `user`, `history`, and `app`
+ properties.
+ """
+
+ def db_dataset_for( self, dbkey ):
+ """
+ Returns the db_file dataset associated/needed by `dataset`, or `None`.
+ """
+ # If no history, return None.
+ if self.history is None:
+ return None
+ if isinstance(self.history, Bunch):
+ # The API presents a Bunch for a history. Until the API is
+ # more fully featured for handling this, also return None.
+ return None
+ datasets = self.sa_session.query( self.app.model.HistoryDatasetAssociation ) \
+ .filter_by( deleted=False, history_id=self.history.id, extension="len" )
+ for ds in datasets:
+ if dbkey == ds.dbkey:
+ return ds
+ return None
+
+ @property
+ def db_builds( self ):
+ """
+ Returns the builds defined by galaxy and the builds defined by
+ the user (chromInfo in history).
+ """
+ # FIXME: This method should be removed
+ return self.app.genome_builds.get_genome_build_names( trans=self )
+
+
+class GalaxyWebTransaction( base.DefaultWebTransaction, ProvidesAppContext, ProvidesUserContext, ProvidesHistoryContext ):
"""
Encapsulates web transaction specific state for the Galaxy application
(specifically the user's "cookie" session and history)
@@ -552,29 +723,6 @@
t = Translations.load( dirname='locale', locales=locales, domain='ginga' )
self.template_context.update( dict( _=t.ugettext, n_=t.ugettext, N_=t.ungettext ) )
- @property
- def anonymous( self ):
- return self.user is None and not self.api_inherit_admin
-
- @property
- def sa_session( self ):
- """
- Returns a SQLAlchemy session -- currently just gets the current
- session from the threadlocal session context, but this is provided
- to allow migration toward a more SQLAlchemy 0.4 style of use.
- """
- return self.app.model.context.current
-
- def expunge_all( self ):
- app = self.app
- context = app.model.context
- context.expunge_all()
- # This is a bit hacky, should refctor this. Maybe refactor to app -> expunge_all()
- if hasattr(app, 'install_model'):
- install_model = app.install_model
- if install_model != app.model:
- install_model.context.expunge_all()
-
def get_user( self ):
"""Return the current user if logged in or None."""
if self.galaxy_session:
@@ -592,57 +740,6 @@
user = property( get_user, set_user )
- def log_action( self, user=None, action=None, context=None, params=None):
- """
- Application-level logging of user actions.
- """
- if self.app.config.log_actions:
- action = self.app.model.UserAction(action=action, context=context, params=unicode( to_json_string( params ) ) )
- try:
- if user:
- action.user = user
- else:
- action.user = self.user
- except:
- action.user = None
- try:
- action.session_id = self.galaxy_session.id
- except:
- action.session_id = None
- self.sa_session.add( action )
- self.sa_session.flush()
-
- def log_event( self, message, tool_id=None, **kwargs ):
- """
- Application level logging. Still needs fleshing out (log levels and such)
- Logging events is a config setting - if False, do not log.
- """
- if self.app.config.log_events:
- event = self.app.model.Event()
- event.tool_id = tool_id
- try:
- event.message = message % kwargs
- except:
- event.message = message
- try:
- event.history = self.get_history()
- except:
- event.history = None
- try:
- event.history_id = self.history.id
- except:
- event.history_id = None
- try:
- event.user = self.user
- except:
- event.user = None
- try:
- event.session_id = self.galaxy_session.id
- except:
- event.session_id = None
- self.sa_session.add( event )
- self.sa_session.flush()
-
def get_cookie( self, name='galaxysession' ):
"""Convenience method for getting a session cookie"""
try:
@@ -1100,44 +1197,10 @@
self.sa_session.flush()
return history
- def get_current_user_roles( self ):
- user = self.get_user()
- if user:
- roles = user.all_roles()
- else:
- roles = []
- return roles
-
- def user_is_admin( self ):
- if self.api_inherit_admin:
- return True
- return self.user and self.user.email in self.app.config.admin_users_list
-
- def user_can_do_run_as( self ):
- run_as_users = [ user for user in self.app.config.get( "api_allow_run_as", "" ).split( "," ) if user ]
- if not run_as_users:
- return False
- user_in_run_as_users = self.user and self.user.email in run_as_users
- # Can do if explicitly in list or master_api_key supplied.
- can_do_run_as = user_in_run_as_users or self.api_inherit_admin
- return can_do_run_as
-
- def get_toolbox(self):
- """Returns the application toolbox"""
- return self.app.toolbox
-
@base.lazy_property
def template_context( self ):
return dict()
- @property
- def model( self ):
- return self.app.model
-
- @property
- def install_model( self ):
- return self.app.install_model
-
def make_form_data( self, name, **kwargs ):
rval = self.template_context[name] = FormData()
rval.values.update( kwargs )
@@ -1250,43 +1313,6 @@
searchList=[context or kwargs, dict(caller=self)] )
return str(template)
- @property
- def db_builds( self ):
- """
- Returns the builds defined by galaxy and the builds defined by
- the user (chromInfo in history).
- """
- # FIXME: This method should be removed
- return self.app.genome_builds.get_genome_build_names( trans=self )
-
- @property
- def user_ftp_dir( self ):
- identifier = self.app.config.ftp_upload_dir_identifier
- return os.path.join( self.app.config.ftp_upload_dir, getattr(self.user, identifier) )
-
- def db_dataset_for( self, dbkey ):
- """
- Returns the db_file dataset associated/needed by `dataset`, or `None`.
- """
- # If no history, return None.
- if self.history is None:
- return None
- if isinstance(self.history, Bunch):
- # The API presents a Bunch for a history. Until the API is
- # more fully featured for handling this, also return None.
- return None
- datasets = self.sa_session.query( self.app.model.HistoryDatasetAssociation ) \
- .filter_by( deleted=False, history_id=self.history.id, extension="len" )
- for ds in datasets:
- if dbkey == ds.dbkey:
- return ds
- return None
-
- def request_types(self):
- if self.sa_session.query( self.app.model.RequestType ).filter_by( deleted=False ).count() > 0:
- return True
- return False
-
class FormBuilder( object ):
"""
diff -r 162b355ae974e9082ae013640632d534579add6c -r eaf8920f999bab18a44c938565b643e0612f6e15 test/unit/test_galaxy_transactions.py
--- /dev/null
+++ b/test/unit/test_galaxy_transactions.py
@@ -0,0 +1,74 @@
+from galaxy import model
+from galaxy.model import mapping
+from galaxy.util import bunch
+from galaxy.web import framework
+
+
+class TestTransaction( framework.ProvidesAppContext ):
+
+ def __init__( self ):
+ self.app = TestApp()
+
+
+def test_logging_events_off():
+ trans = TestTransaction()
+ trans.log_event( "test event 123" )
+ assert len( trans.sa_session.query( model.Event ).all() ) == 0
+
+
+def test_logging_events_on():
+ trans = TestTransaction()
+ trans.app.config.log_events = True
+ trans.log_event( "test event 123" )
+ events = trans.sa_session.query( model.Event ).all()
+ assert len( events ) == 1
+ assert events[ 0 ].message == "test event 123"
+
+
+def test_logging_actions_off():
+ trans = TestTransaction()
+ trans.log_action( "test action 123" )
+ assert len( trans.sa_session.query( model.Event ).all() ) == 0
+
+
+def test_logging_actions_on():
+ trans = TestTransaction()
+ trans.app.config.log_actions = True
+ trans.log_action( None, "test action 123", context="the context", params=dict(foo="bar") )
+ actions = trans.sa_session.query( model.UserAction ).all()
+ assert len( actions ) == 1
+ assert actions[ 0 ].action == "test action 123"
+
+
+def test_expunge_all():
+ trans = TestTransaction()
+
+ user = model.User( "foo", "bar1" )
+ trans.sa_session.add( user )
+
+ user.password = "bar2"
+ trans.sa_session.flush()
+
+ assert trans.sa_session.query( model.User ).first().password == "bar2"
+
+ trans.sa_session.expunge_all()
+
+ user.password = "bar3"
+ trans.sa_session.flush()
+
+ # Password unchanged because not attached to session/context.
+ assert trans.sa_session.query( model.User ).first().password == "bar2"
+
+
+class TestApp( object ):
+
+ def __init__( self ):
+ self.config = bunch.Bunch(
+ log_events=False,
+ log_actions=False,
+ )
+ self.model = mapping.init(
+ "/tmp",
+ "sqlite:///:memory:",
+ create_tables=True
+ )
commit/galaxy-central: jmchilton: Fix for running LSF DRMAA jobs as 'real' user.
by commits-noreply@bitbucket.org 18 Aug '14
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/e66936a92feb/
Changeset: e66936a92feb
User: jmchilton
Date: 2014-08-18 14:43:37
Summary: Fix for running LSF DRMAA jobs as 'real' user.
Fix thanks to Chong Chen at IBM - "When invoking the LSF drmma api to submit a job, the API will print the message like 'Job <78> is submitted to default queue <normal>'. So if the Galaxy want to use this API, need modify the drmaa_external_runner.py script to handle this message. LSF already support a env variable to disable this message print out, "BSUB_QUIET", set this into env can disable this message output."
A second issue was related to subsequent querying of the job state as a user other than the submitter - according to Chong Chen this has been fixed in the latest LSF DRMAA changes on github (https://github.com/PlatformLSF/lsf-drmaa/commit/43e62546172b90d9f95e90273d7…)
Affected #: 1 file
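The same workaround applies to any standalone DRMAA submission script whose stdout must stay machine-parseable: set BSUB_QUIET before opening the session so LSF's "Job <id> is submitted ..." banner is suppressed. A small sketch follows, assuming the drmaa-python package; it is illustrative, not part of the changeset.

# Sketch of the BSUB_QUIET workaround in a standalone DRMAA submitter;
# the variable is LSF-specific and harmlessly ignored by other DRMs.
import os
import drmaa

os.environ[ 'BSUB_QUIET' ] = 'Y'  # must be set before the session is created
session = drmaa.Session()
session.initialize()
# ... build a job template and submit it as usual ...
session.exit()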
diff -r 572a8d5ee218aeb37f77660948cb75a439936868 -r e66936a92febf952017586742db36ed44be4d08b scripts/drmaa_external_runner.py
--- a/scripts/drmaa_external_runner.py
+++ b/scripts/drmaa_external_runner.py
@@ -124,6 +124,9 @@
userid, json_filename, assign_all_groups = validate_paramters()
set_user(userid, assign_all_groups)
json_file_exists(json_filename)
+ # Added to disable LSF-generated messages that would interfere with this
+ # script. Fix thanks to Chong Chen at IBM.
+ os.environ['BSUB_QUIET'] = 'Y'
s = drmaa.Session()
s.initialize()
jt = s.createJobTemplate()
6 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/ae0dd02c419c/
Changeset: ae0dd02c419c
User: jmchilton
Date: 2014-08-17 22:38:34
Summary: PEP-8 fixes for galaxy.jobs.mapper.
Affected #: 1 file
diff -r 109f40daa21ad0348662805f93f1cf1c9fbd6003 -r ae0dd02c419c0cc781c8601ffc5fd9bf4f8af2ca lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -10,6 +10,7 @@
DYNAMIC_RUNNER_NAME = "dynamic"
DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url"
+
class JobMappingException( Exception ):
def __init__( self, failure_message ):
@@ -65,12 +66,14 @@
def __invoke_expand_function( self, expand_function ):
function_arg_names = inspect.getargspec( expand_function ).args
app = self.job_wrapper.app
- possible_args = { "job_id" : self.job_wrapper.job_id,
- "tool" : self.job_wrapper.tool,
- "tool_id" : self.job_wrapper.tool.id,
- "job_wrapper" : self.job_wrapper,
- "rule_helper": RuleHelper( app ),
- "app" : app }
+ possible_args = {
+ "job_id": self.job_wrapper.job_id,
+ "tool": self.job_wrapper.tool,
+ "tool_id": self.job_wrapper.tool.id,
+ "job_wrapper": self.job_wrapper,
+ "rule_helper": RuleHelper( app ),
+ "app": app
+ }
actual_args = {}
@@ -82,7 +85,6 @@
# Don't hit the DB to load the job object if not needed
if "job" in function_arg_names or "user" in function_arg_names or "user_email" in function_arg_names or "resource_params" in function_arg_names:
job = self.job_wrapper.get_job()
- history = job.history
user = job.user
user_email = user and str(user.email)
https://bitbucket.org/galaxy/galaxy-central/commits/cfb0c443884f/
Changeset: cfb0c443884f
User: jmchilton
Date: 2014-08-17 22:38:34
Summary: Small refactor of galaxy.jobs.mapper to make it more testable.
Affected #: 1 file
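The refactor exposes the rules package as an instance attribute, so tests (or deployments) can point the mapper at an alternate package of rule modules. A minimal sketch under that assumption; the package name and the constructor arguments passed to build_mapper are placeholders.

# Sketch only: my_site_rules is a hypothetical package of *.py rule files.
from galaxy.jobs.mapper import JobRunnerMapper

import my_site_rules


def build_mapper( job_wrapper, url_to_destination, job_config ):
    mapper = JobRunnerMapper( job_wrapper, url_to_destination, job_config )
    # Rule modules are now discovered under my_site_rules instead of the
    # hard-coded galaxy.jobs.rules package.
    mapper.rules_module = my_site_rules
    return mapper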
diff -r ae0dd02c419c0cc781c8601ffc5fd9bf4f8af2ca -r cfb0c443884f65a6297699b35b323962e2b74539 lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -35,6 +35,8 @@
self.url_to_destination = url_to_destination
self.job_config = job_config
+ self.rules_module = galaxy.jobs.rules
+
def __get_rule_modules( self ):
unsorted_module_names = self.__get_rule_module_names( )
## Load modules in reverse order to allow hierarchical overrides
@@ -55,11 +57,12 @@
return modules
def __get_rule_module_names( self ):
- rules_dir = galaxy.jobs.rules.__path__[0]
+ rules_dir = self.rules_module.__path__[0]
names = []
for fname in os.listdir( rules_dir ):
if not( fname.startswith( "_" ) ) and fname.endswith( ".py" ):
- rule_module_name = "galaxy.jobs.rules.%s" % fname[:-len(".py")]
+ base_name = self.rules_module.__name__
+ rule_module_name = "%s.%s" % (base_name, fname[:-len(".py")])
names.append( rule_module_name )
return names
https://bitbucket.org/galaxy/galaxy-central/commits/949be0b9ef72/
Changeset: 949be0b9ef72
User: jmchilton
Date: 2014-08-17 22:38:34
Summary: Job mapping unit tests.
A bunch of unit tests for galaxy.jobs.mapper and galaxy.jobs.rule_helper testing various aspects of static and dynamic job mapping and the utility class (RuleHelper) that can be supplied to dynamic job rules.
Affected #: 5 files
diff -r cfb0c443884f65a6297699b35b323962e2b74539 -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa test/unit/jobs/test_mapper.py
--- /dev/null
+++ b/test/unit/jobs/test_mapper.py
@@ -0,0 +1,129 @@
+import jobs.test_rules
+
+from galaxy.jobs.mapper import JobRunnerMapper
+from galaxy.jobs import JobDestination
+
+from galaxy.util import bunch
+
+
+TOOL_JOB_DESTINATION = JobDestination()
+DYNAMICALLY_GENERATED_DESTINATION = JobDestination()
+
+
+def test_static_mapping():
+ mapper = __mapper()
+ assert mapper.get_job_destination( {} ) is TOOL_JOB_DESTINATION
+
+
+def test_caching():
+ mapper = __mapper()
+ mapper.get_job_destination( {} )
+ mapper.get_job_destination( {} )
+ assert mapper.job_wrapper.tool.call_count == 1
+
+
+def test_dynamic_mapping():
+ mapper = __mapper( __dynamic_destination( dict( function="upload" ) ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ assert mapper.job_config.rule_response == "local_runner"
+
+
+def test_dynamic_mapping_priorities():
+ mapper = __mapper( __dynamic_destination( dict( function="tophat" ) ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ # Next line verifies we are using the definition in 20_instance.py instead of
+ # 10_site.py.
+ assert mapper.job_config.rule_response == "instance_dest_id"
+
+
+def test_dynamic_mapping_defaults_to_tool_id_as_rule():
+ mapper = __mapper( __dynamic_destination( ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ assert mapper.job_config.rule_response == "tool1_dest_id"
+
+
+def test_dynamic_mapping_function_parameters():
+ mapper = __mapper( __dynamic_destination( dict( function="check_rule_params" ) ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ assert mapper.job_config.rule_response == "all_passed"
+
+
+def test_dynamic_mapping_resource_parameters():
+ mapper = __mapper( __dynamic_destination( dict( function="check_resource_params" ) ) )
+ assert mapper.get_job_destination( {} ) is DYNAMICALLY_GENERATED_DESTINATION
+ assert mapper.job_config.rule_response == "have_resource_params"
+
+
+def __mapper( tool_job_destination=TOOL_JOB_DESTINATION ):
+ job_wrapper = MockJobWrapper( tool_job_destination )
+ job_config = MockJobConfig()
+
+ mapper = JobRunnerMapper(
+ job_wrapper,
+ {},
+ job_config
+ )
+ mapper.rules_module = jobs.test_rules
+ return mapper
+
+
+def __dynamic_destination( params={} ):
+ return JobDestination( runner="dynamic", params=params )
+
+
+class MockJobConfig( object ):
+
+ def __init__( self ):
+ self.rule_response = None
+
+ def get_destination( self, rep ):
+ # Called to transform dynamic job destination rule response
+ # from destination id/runner url into a dynamic job destination.
+ self.rule_response = rep
+ return DYNAMICALLY_GENERATED_DESTINATION
+
+
+class MockJobWrapper( object ):
+
+ def __init__( self, tool_job_destination ):
+ self.tool = MockTool( tool_job_destination )
+ self.job_id = 12345
+ self.app = object()
+
+ def is_mock_job_wrapper( self ):
+ return True
+
+ def get_job(self):
+ def get_param_values( app, ignore_errors ):
+ assert app == self.app
+ return {
+ "threshold": 8,
+ "__job_resource": {
+ "__job_resource__select": "True",
+ "memory": "8gb"
+ }
+ }
+
+ return bunch.Bunch(
+ user=bunch.Bunch(
+ id=6789,
+ email="test@example.com"
+ ),
+ get_param_values=get_param_values
+ )
+
+
+class MockTool( object ):
+
+ def __init__( self, tool_job_destination ):
+ self.id = "testtoolshed/devteam/tool1/23abcd13123"
+ self.call_count = 0
+ self.tool_job_destination = tool_job_destination
+ self.all_ids = [ "testtoolshed/devteam/tool1/23abcd13123", "tool1" ]
+
+ def get_job_destination( self, params ):
+ self.call_count += 1
+ return self.tool_job_destination
+
+ def is_mock_tool( self ):
+ return True
diff -r cfb0c443884f65a6297699b35b323962e2b74539 -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa test/unit/jobs/test_rule_helper.py
--- /dev/null
+++ b/test/unit/jobs/test_rule_helper.py
@@ -0,0 +1,98 @@
+from galaxy.util import bunch
+from galaxy import model
+from galaxy.model import mapping
+
+from galaxy.jobs.rule_helper import RuleHelper
+
+USER_EMAIL_1 = "u1@example.com"
+USER_EMAIL_2 = "u2@example.com"
+USER_EMAIL_3 = "u3@example.com"
+
+
+def test_job_count():
+ rule_helper = __rule_helper()
+ __assert_job_count_is( 0, rule_helper )
+
+ __setup_fixtures( rule_helper.app )
+
+ # Test raw counts for users...
+ __assert_job_count_is( 7, rule_helper, for_user_email=USER_EMAIL_1 )
+ __assert_job_count_is( 2, rule_helper, for_user_email=USER_EMAIL_2 )
+ __assert_job_count_is( 0, rule_helper, for_user_email=USER_EMAIL_3 )
+
+ # Test destination counts
+ __assert_job_count_is( 2, rule_helper, for_destination="local" )
+ __assert_job_count_is( 7, rule_helper, for_destination="cluster1" )
+
+ # Test per user destination counts
+ __assert_job_count_is( 5, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1 )
+ __assert_job_count_is( 2, rule_helper, for_destination="local", for_user_email=USER_EMAIL_1 )
+
+ __assert_job_count_is( 2, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_2 )
+ __assert_job_count_is( 0, rule_helper, for_destination="local", for_user_email=USER_EMAIL_2 )
+
+ # Test per user, per state destination counts
+ __assert_job_count_is( 3, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1, for_job_states=[ "queued" ] )
+ __assert_job_count_is( 2, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1, for_job_states=[ "running" ] )
+ __assert_job_count_is( 0, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1, for_job_states=[ "error" ] )
+ __assert_job_count_is( 5, rule_helper, for_destination="cluster1", for_user_email=USER_EMAIL_1, for_job_states=[ "queued", "running", "error" ] )
+
+
+def __assert_job_count_is( expected_count, rule_helper, **kwds ):
+ actual_count = rule_helper.job_count( **kwds )
+
+ if expected_count != actual_count:
+ template = "Expected job count %d, actual job count %s for params %s"
+ raise AssertionError( template % ( expected_count, actual_count, kwds ) )
+
+
+def __setup_fixtures( app ):
+ # user1 has 3 jobs queued and 2 jobs running on cluster1, plus one queued and
+ # one running job on local. user2 has a queued and a running job on the cluster.
+ # user3 has no jobs.
+ user1 = model.User( email=USER_EMAIL_1, password="pass1" )
+ user2 = model.User( email=USER_EMAIL_2, password="pass2" )
+ user3 = model.User( email=USER_EMAIL_3, password="pass3" )
+
+ app.add( user1, user2, user3 )
+
+ app.add( __new_job( user=user1, destination_id="cluster1", state="queued" ) )
+ app.add( __new_job( user=user1, destination_id="cluster1", state="queued" ) )
+ app.add( __new_job( user=user1, destination_id="cluster1", state="queued" ) )
+ app.add( __new_job( user=user1, destination_id="cluster1", state="running" ) )
+ app.add( __new_job( user=user1, destination_id="cluster1", state="running" ) )
+
+ app.add( __new_job( user=user1, destination_id="local", state="queued" ) )
+ app.add( __new_job( user=user1, destination_id="local", state="running" ) )
+
+ app.add( __new_job( user=user2, destination_id="cluster1", state="queued" ) )
+ app.add( __new_job( user=user2, destination_id="cluster1", state="running" ) )
+
+
+def __rule_helper():
+ app = MockApp()
+ rule_helper = RuleHelper( app )
+ return rule_helper
+
+
+def __new_job( **kwds ):
+ job = model.Job()
+ for key, value in kwds.items():
+ setattr( job, key, value )
+ return job
+
+
+class MockApp( object ):
+
+ def __init__( self ):
+ self.config = bunch.Bunch( )
+ self.model = mapping.init(
+ "/tmp",
+ "sqlite:///:memory:",
+ create_tables=True
+ )
+
+ def add( self, *args ):
+ for arg in args:
+ self.model.context.add( arg )
+ self.model.context.flush()
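These per-user, per-destination, per-state counts are the building blocks for throttling rules; a hedged sketch of how a dynamic rule might use them (the destination ids and the threshold are illustrative, not part of this changeset):

# Hypothetical throttling rule built on RuleHelper.job_count; it overflows a
# busy user's jobs from the cluster to a local destination.
def throttle_cluster( rule_helper, user_email ):
    active = rule_helper.job_count(
        for_user_email=user_email,
        for_destination="cluster1",
        for_job_states=[ "queued", "running" ],
    )
    if active >= 20:  # illustrative threshold
        return "local"
    return "cluster1"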
diff -r cfb0c443884f65a6297699b35b323962e2b74539 -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa test/unit/jobs/test_rules/10_site.py
--- /dev/null
+++ b/test/unit/jobs/test_rules/10_site.py
@@ -0,0 +1,45 @@
+
+
+def upload():
+ return 'local_runner'
+
+
+def tophat():
+ return 'site_dest_id'
+
+
+def tool1():
+ # tool1 is the id of the test tool mocked out in test_mapper.py; without a
+ # function name specified in the dynamic destination, this function should
+ # be used by default.
+ return 'tool1_dest_id'
+
+
+def check_rule_params(
+ job_id,
+ tool,
+ tool_id,
+ job_wrapper,
+ rule_helper,
+ app,
+ job,
+ user,
+ user_email,
+):
+ assert job_id == 12345
+ assert tool.is_mock_tool()
+ assert tool_id == "testtoolshed/devteam/tool1/23abcd13123"
+ assert job_wrapper.is_mock_job_wrapper()
+ assert app == job_wrapper.app
+ assert rule_helper is not None
+
+ assert job.user == user
+ assert user.id == 6789
+ assert user_email == "test@example.com"
+
+ return "all_passed"
+
+
+def check_resource_params( resource_params ):
+ assert resource_params["memory"] == "8gb"
+ return "have_resource_params"
diff -r cfb0c443884f65a6297699b35b323962e2b74539 -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa test/unit/jobs/test_rules/20_instance.py
--- /dev/null
+++ b/test/unit/jobs/test_rules/20_instance.py
@@ -0,0 +1,6 @@
+
+
+def tophat():
+ # This should override definition in 10_site.py
+ return 'instance_dest_id'
+
https://bitbucket.org/galaxy/galaxy-central/commits/47efdb818f41/
Changeset: 47efdb818f41
User: jmchilton
Date: 2014-08-17 22:38:34
Summary: Improved error handling for dynamic job destinations.
More verbose error messages with updated terminology, and the new error conditions are covered by unit tests.
Affected #: 2 files
diff -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa -r 47efdb818f41227c4c4ff179a0c9a28e9e154acd lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -10,6 +10,9 @@
DYNAMIC_RUNNER_NAME = "dynamic"
DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url"
+ERROR_MESSAGE_NO_RULE_FUNCTION = "Galaxy misconfigured - cannot find dynamic rule function name for destination %s."
+ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND = "Galaxy misconfigured - no rule function named %s found in dynamic rule modules."
+
class JobMappingException( Exception ):
@@ -150,7 +153,8 @@
expand_function = getattr( matching_rule_module, expand_function_name )
return expand_function
else:
- raise Exception( "Dynamic job runner cannot find function to expand job runner type - %s" % expand_function_name )
+ message = ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND % ( expand_function_name )
+ raise Exception( message )
def __last_rule_module_with_function( self, function_name ):
# self.rule_modules is sorted in reverse order, so find first
@@ -164,6 +168,10 @@
expand_type = destination.params.get('type', "python")
if expand_type == "python":
expand_function_name = self.__determine_expand_function_name( destination )
+ if not expand_function_name:
+ message = ERROR_MESSAGE_NO_RULE_FUNCTION % destination
+ raise Exception( message )
+
expand_function = self.__get_expand_function( expand_function_name )
job_destination = self.__invoke_expand_function( expand_function )
if not isinstance(job_destination, galaxy.jobs.JobDestination):
diff -r 949be0b9ef72b54ea7d1584055f563e2209aeeaa -r 47efdb818f41227c4c4ff179a0c9a28e9e154acd test/unit/jobs/test_mapper.py
--- a/test/unit/jobs/test_mapper.py
+++ b/test/unit/jobs/test_mapper.py
@@ -1,6 +1,10 @@
import jobs.test_rules
-from galaxy.jobs.mapper import JobRunnerMapper
+from galaxy.jobs.mapper import (
+ JobRunnerMapper,
+ ERROR_MESSAGE_NO_RULE_FUNCTION,
+ ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND,
+)
from galaxy.jobs import JobDestination
from galaxy.util import bunch
@@ -54,6 +58,32 @@
assert mapper.job_config.rule_response == "have_resource_params"
+def test_dynamic_mapping_no_function():
+ dest = __dynamic_destination( dict( ) )
+ mapper = __mapper( dest )
+ mapper.job_wrapper.tool.all_ids = [ "no_such_function" ]
+ error_message = ERROR_MESSAGE_NO_RULE_FUNCTION % dest
+ __assert_mapper_errors_with_message( mapper, error_message )
+
+
+def test_dynamic_mapping_missing_function():
+ dest = __dynamic_destination( dict( function="missing_func" ) )
+ mapper = __mapper( dest )
+ mapper.job_wrapper.tool.all_ids = [ "no_such_function" ]
+ error_message = ERROR_MESSAGE_RULE_FUNCTION_NOT_FOUND % ( "missing_func" )
+ __assert_mapper_errors_with_message( mapper, error_message )
+
+
+def __assert_mapper_errors_with_message( mapper, message ):
+ exception = None
+ try:
+ mapper.get_job_destination( {} )
+ except Exception as e:
+ exception = e
+ assert exception
+ assert str( exception ) == message, "%s != %s" % ( str( exception ), message )
+
+
def __mapper( tool_job_destination=TOOL_JOB_DESTINATION ):
job_wrapper = MockJobWrapper( tool_job_destination )
job_config = MockJobConfig()
https://bitbucket.org/galaxy/galaxy-central/commits/ef7107f56ffb/
Changeset: ef7107f56ffb
User: jmchilton
Date: 2014-08-17 22:38:34
Summary: galaxy.jobs.mapper refactoring and cleanup.
Affected #: 1 file
diff -r 47efdb818f41227c4c4ff179a0c9a28e9e154acd -r ef7107f56ffb6377d0cb0bd04a5cfae9e6b22a2f lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -89,7 +89,12 @@
actual_args[ possible_arg_name ] = possible_args[ possible_arg_name ]
# Don't hit the DB to load the job object if not needed
- if "job" in function_arg_names or "user" in function_arg_names or "user_email" in function_arg_names or "resource_params" in function_arg_names:
+ require_db = False
+ for param in ["job", "user", "user_email", "resource_params"]:
+ if param in function_arg_names:
+ require_db = True
+ break
+ if require_db:
job = self.job_wrapper.get_job()
user = job.user
user_email = user and str(user.email)
@@ -106,16 +111,12 @@
if "resource_params" in function_arg_names:
# Find the dynamically inserted resource parameters and give them
# to the rule.
- app = self.job_wrapper.app
- param_values = job.get_param_values( app, ignore_errors=True )
+ param_values = self.__job_params( job )
resource_params = {}
try:
resource_params_raw = param_values[ "__job_resource" ]
if resource_params_raw[ "__job_resource__select" ].lower() in [ "1", "yes", "true" ]:
for key, value in resource_params_raw.iteritems():
- #if key.startswith( "__job_resource_param__" ):
- # resource_key = key[ len( "__job_resource_param__" ): ]
- # resource_params[ resource_key ] = value
resource_params[ key ] = value
except KeyError:
pass
@@ -123,6 +124,11 @@
return expand_function( **actual_args )
+ def __job_params( self, job ):
+ app = self.job_wrapper.app
+ param_values = job.get_param_values( app, ignore_errors=True )
+ return param_values
+
def __convert_url_to_destination( self, url ):
"""
Job runner URLs are deprecated, but dynamic mapper functions may still
@@ -173,28 +179,28 @@
raise Exception( message )
expand_function = self.__get_expand_function( expand_function_name )
- job_destination = self.__invoke_expand_function( expand_function )
- if not isinstance(job_destination, galaxy.jobs.JobDestination):
- job_destination_rep = str(job_destination) # Should be either id or url
- if '://' in job_destination_rep:
- job_destination = self.__convert_url_to_destination(job_destination_rep)
- else:
- job_destination = self.job_config.get_destination(job_destination_rep)
- return job_destination
+ return self.__handle_rule( expand_function )
else:
raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type )
+ def __handle_rule( self, rule_function ):
+ job_destination = self.__invoke_expand_function( rule_function )
+ if not isinstance(job_destination, galaxy.jobs.JobDestination):
+ job_destination_rep = str(job_destination) # Should be either id or url
+ if '://' in job_destination_rep:
+ job_destination = self.__convert_url_to_destination(job_destination_rep)
+ else:
+ job_destination = self.job_config.get_destination(job_destination_rep)
+ return job_destination
+
def __cache_job_destination( self, params, raw_job_destination=None ):
if raw_job_destination is None:
raw_job_destination = self.job_wrapper.tool.get_job_destination( params )
- #raw_job_destination_id_or_tag = self.job_wrapper.tool.get_job_destination_id_or_tag( params )
if raw_job_destination.runner == DYNAMIC_RUNNER_NAME:
job_destination = self.__handle_dynamic_job_destination( raw_job_destination )
else:
job_destination = raw_job_destination
- #job_destination_id_or_tag = raw_job_destination_id_or_tag
self.cached_job_destination = job_destination
- #self.cached_job_destination_id_or_tag = job_destination_id_or_tag
def get_job_destination( self, params ):
"""
https://bitbucket.org/galaxy/galaxy-central/commits/572a8d5ee218/
Changeset: 572a8d5ee218
User: jmchilton
Date: 2014-08-17 22:38:34
Summary: Small optimization in galaxy.jobs.rule_helper.
Affected #: 1 file
diff -r ef7107f56ffb6377d0cb0bd04a5cfae9e6b22a2f -r 572a8d5ee218aeb37f77660948cb75a439936868 lib/galaxy/jobs/rule_helper.py
--- a/lib/galaxy/jobs/rule_helper.py
+++ b/lib/galaxy/jobs/rule_helper.py
@@ -82,6 +82,10 @@
query = query.filter( model.Job.table.c.update_time >= start_date )
if for_job_states is not None:
- query = query.filter( model.Job.table.c.state.in_( for_job_states ) )
+ # Optimize the singleton case - can be much more performant in my experience.
+ if len( for_job_states ) == 1:
+ query = query.filter( model.Job.table.c.state == for_job_states[ 0 ] )
+ else:
+ query = query.filter( model.Job.table.c.state.in_( for_job_states ) )
return query
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
commit/galaxy-central: dannon: Add c3.2xlarge instances to the cloudlaunch list. This list is being moved to bioblend.cloudman and will be updated there in the future, but this is a stopgap measure enabling use of this particularly useful instance type.
by commits-noreply@bitbucket.org 15 Aug '14
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/109f40daa21a/
Changeset: 109f40daa21a
User: dannon
Date: 2014-08-15 22:35:02
Summary: Add c3.2xlarge instances to the cloudlaunch list. This list is being moved to bioblend.cloudman and will be updated there in the future, but this is a stopgap measure enabling use of this particularly useful instance type.
Affected #: 1 file
diff -r 2d509288d484025de4f882b28b2c22069f12422e -r 109f40daa21ad0348662805f93f1cf1c9fbd6003 templates/webapps/galaxy/cloud/index.mako
--- a/templates/webapps/galaxy/cloud/index.mako
+++ b/templates/webapps/galaxy/cloud/index.mako
@@ -240,6 +240,7 @@
<select name="instance_type" id="id_instance_type">
<option value="m1.large">Large</option>
<option value="m1.xlarge">Extra Large</option>
+ <option value="c3.2xlarge">Compute Optimized Double Extra Large</option>
<option value="m2.4xlarge">High-Memory Quadruple Extra Large</option>
</select>
</div>
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.