galaxy-commits
Threads by month
- ----- 2024 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2023 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2022 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2021 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2020 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2019 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2018 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2017 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2016 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2015 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2014 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2013 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2012 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2011 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2010 -----
- December
- November
- October
- September
- August
- July
- June
- May
February 2013
- 2 participants
- 189 discussions
11 Feb '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/4f562e319784/
changeset: 4f562e319784
user: greg
date: 2013-02-11 20:43:31
summary: Enhance the new sharable link for repositories in the tool shed to include the changeset revision if it is not the repository tip. Standardize the citable string by changing it to sharable. Fix several method name typos.
affected #: 8 files
diff -r 400659787f76f11ee54fb8e3d97a258ab35843cb -r 4f562e319784d0a316157f8e8f6eea1202c7023e lib/galaxy/util/shed_util_common.py
--- a/lib/galaxy/util/shed_util_common.py
+++ b/lib/galaxy/util/shed_util_common.py
@@ -944,11 +944,6 @@
for repository_components_list in val:
tool_shed, name, owner, changeset_revision = repository_components_list
repository = get_or_create_tool_shed_repository( trans, tool_shed, name, owner, changeset_revision )
-def generate_citable_link_for_repository_in_tool_shed( trans, repository ):
- """Generate the URL for citing a repository that is in the tool shed."""
- base_url = url_for( '/', qualified=True ).rstrip( '/' )
- protocol, base = base_url.split( '://' )
- return '%s://%s/view/%s/%s' % ( protocol, base, repository.user.username, repository.name )
def generate_clone_url_for_installed_repository( app, repository ):
"""Generate the URL for cloning a repository that has been installed into a Galaxy instance."""
tool_shed_url = get_url_from_repository_tool_shed( app, repository )
@@ -1310,6 +1305,14 @@
repository_dependencies=repository_dependencies_tups )
metadata_dict[ 'repository_dependencies' ] = repository_dependencies_dict
return metadata_dict, error_message
+def generate_sharable_link_for_repository_in_tool_shed( trans, repository, changeset_revision=None ):
+ """Generate the URL for sharing a repository that is in the tool shed."""
+ base_url = url_for( '/', qualified=True ).rstrip( '/' )
+ protocol, base = base_url.split( '://' )
+ sharable_url = '%s://%s/view/%s/%s' % ( protocol, base, repository.user.username, repository.name )
+ if changeset_revision:
+ sharable_url += '/%s' % changeset_revision
+ return sharable_url
def generate_tool_dependency_metadata( app, repository, changeset_revision, repository_clone_url, tool_dependencies_config, metadata_dict,
original_repository_metadata=None ):
"""
@@ -1803,7 +1806,7 @@
if parent_id is None:
# The tool did not change through all of the changeset revisions.
return old_id
-def get_previous_downloadable_changset_revision( repository, repo, before_changeset_revision ):
+def get_previous_downloadable_changeset_revision( repository, repo, before_changeset_revision ):
"""
Return the installable changeset_revision in the repository changelog prior to the changeset to which before_changeset_revision
refers. If there isn't one, return the hash value of an empty repository changelog, INITIAL_CHANGELOG_HASH.
@@ -2021,7 +2024,7 @@
def get_repository_metadata_by_id( trans, id ):
"""Get repository metadata from the database"""
return trans.sa_session.query( trans.model.RepositoryMetadata ).get( trans.security.decode_id( id ) )
-def get_repository_metadata_by_repository_id_changset_revision( trans, id, changeset_revision ):
+def get_repository_metadata_by_repository_id_changeset_revision( trans, id, changeset_revision ):
"""Get a specified metadata record for a specified repository."""
return trans.sa_session.query( trans.model.RepositoryMetadata ) \
.filter( and_( trans.model.RepositoryMetadata.table.c.repository_id == trans.security.decode_id( id ),
@@ -2236,9 +2239,9 @@
if tool_shed_is_this_tool_shed( rd_toolshed ):
repository = get_repository_by_name_and_owner( trans.app, rd_name, rd_owner )
if repository:
- repository_metadata = get_repository_metadata_by_repository_id_changset_revision( trans,
- trans.security.encode_id( repository.id ),
- rd_changeset_revision )
+ repository_metadata = get_repository_metadata_by_repository_id_changeset_revision( trans,
+ trans.security.encode_id( repository.id ),
+ rd_changeset_revision )
if repository_metadata:
# The repository changeset_revision is installable, so no updates are available.
new_key_rd_dict = {}
@@ -2249,9 +2252,9 @@
repo_dir = repository.repo_path( trans.app )
repo = hg.repository( get_configured_ui(), repo_dir )
changeset_revision = get_next_downloadable_changeset_revision( repository, repo, rd_changeset_revision )
- repository_metadata = get_repository_metadata_by_repository_id_changset_revision( trans,
- trans.security.encode_id( repository.id ),
- changeset_revision )
+ repository_metadata = get_repository_metadata_by_repository_id_changeset_revision( trans,
+ trans.security.encode_id( repository.id ),
+ changeset_revision )
if repository_metadata:
new_key_rd_dict = {}
new_key_rd_dict[ key ] = [ rd_toolshed, rd_name, rd_owner, repository_metadata.changeset_revision ]
@@ -2450,9 +2453,9 @@
toolshed, name, owner, changeset_revision = repository_dependency
if tool_shed_is_this_tool_shed( toolshed ):
required_repository = get_repository_by_name_and_owner( trans.app, name, owner )
- required_repository_metadata = get_repository_metadata_by_repository_id_changset_revision( trans,
- trans.security.encode_id( required_repository.id ),
- changeset_revision )
+ required_repository_metadata = get_repository_metadata_by_repository_id_changeset_revision( trans,
+ trans.security.encode_id( required_repository.id ),
+ changeset_revision )
if required_repository_metadata:
# The required_repository_metadata changeset_revision is installable.
required_metadata = required_repository_metadata.metadata
@@ -3075,9 +3078,9 @@
if rd_tup not in rd_tups_processed:
toolshed, name, owner, changeset_revision = rd_tup
repository = get_repository_by_name_and_owner( trans.app, name, owner )
- repository_metadata = get_repository_metadata_by_repository_id_changset_revision( trans,
- trans.security.encode_id( repository.id ),
- changeset_revision )
+ repository_metadata = get_repository_metadata_by_repository_id_changeset_revision( trans,
+ trans.security.encode_id( repository.id ),
+ changeset_revision )
if repository_metadata:
metadata = repository_metadata.metadata
if metadata:
@@ -3088,9 +3091,9 @@
if rd_tup not in rd_tups_processed:
toolshed, name, owner, changeset_revision = rd_tup
repository = get_repository_by_name_and_owner( trans.app, name, owner )
- repository_metadata = get_repository_metadata_by_repository_id_changset_revision( trans,
- trans.security.encode_id( repository.id ),
- changeset_revision )
+ repository_metadata = get_repository_metadata_by_repository_id_changeset_revision( trans,
+ trans.security.encode_id( repository.id ),
+ changeset_revision )
if repository_metadata:
metadata = repository_metadata.metadata
if metadata:
@@ -3147,7 +3150,7 @@
metadata = repository_metadata.metadata
tool_dicts = metadata[ 'tools' ]
if index == 0:
- # The first changset_revision is a special case because it will have no ancestor changeset_revisions in which to match tools.
+ # The first changeset_revision is a special case because it will have no ancestor changeset_revisions in which to match tools.
# The parent tool id for tools in the first changeset_revision will be the "old_id" in the tool config.
for tool_dict in tool_dicts:
tool_versions_dict[ tool_dict[ 'guid' ] ] = tool_dict[ 'id' ]
@@ -3298,7 +3301,7 @@
"""
# To set excluded_lower_bounds_changeset_revision, calling methods should do the following, where the value of changeset_revision
# is a downloadable changeset_revision.
- # excluded_lower_bounds_changeset_revision = get_previous_downloadable_changset_revision( repository, repo, changeset_revision )
+ # excluded_lower_bounds_changeset_revision = get_previous_downloadable_changeset_revision( repository, repo, changeset_revision )
if excluded_lower_bounds_changeset_revision == INITIAL_CHANGELOG_HASH:
appending_started = True
else:
diff -r 400659787f76f11ee54fb8e3d97a258ab35843cb -r 4f562e319784d0a316157f8e8f6eea1202c7023e lib/galaxy/webapps/community/buildapp.py
--- a/lib/galaxy/webapps/community/buildapp.py
+++ b/lib/galaxy/webapps/community/buildapp.py
@@ -63,9 +63,9 @@
# Create the universe WSGI application
webapp = CommunityWebApplication( app, session_cookie='galaxycommunitysession', name="community" )
add_ui_controllers( webapp, app )
- webapp.add_route( '/view/{owner}', controller='repository', action='citable_owner' )
- webapp.add_route( '/view/{owner}/{name}', controller='repository', action='citable_repository' )
- webapp.add_route( '/view/{owner}/{name}/{changeset_revision}', controller='repository', action='citable_repository_revision' )
+ webapp.add_route( '/view/{owner}', controller='repository', action='sharable_owner' )
+ webapp.add_route( '/view/{owner}/{name}', controller='repository', action='sharable_repository' )
+ webapp.add_route( '/view/{owner}/{name}/{changeset_revision}', controller='repository', action='sharable_repository_revision' )
webapp.add_route( '/:controller/:action', action='index' )
webapp.add_route( '/:action', controller='repository', action='index' )
webapp.add_route( '/repos/*path_info', controller='hg', action='handle_request', path_info='/' )
diff -r 400659787f76f11ee54fb8e3d97a258ab35843cb -r 4f562e319784d0a316157f8e8f6eea1202c7023e lib/galaxy/webapps/community/controllers/admin.py
--- a/lib/galaxy/webapps/community/controllers/admin.py
+++ b/lib/galaxy/webapps/community/controllers/admin.py
@@ -509,9 +509,9 @@
# know the refresh_on_change occurred, and we have the necessary repository id and change set
# revision to pass on.
for k, v in kwd.items():
- changset_revision_str = 'changeset_revision_'
- if k.startswith( changset_revision_str ):
- repository_id = trans.security.encode_id( int( k.lstrip( changset_revision_str ) ) )
+ changeset_revision_str = 'changeset_revision_'
+ if k.startswith( changeset_revision_str ):
+ repository_id = trans.security.encode_id( int( k.lstrip( changeset_revision_str ) ) )
repository = suc.get_repository_in_tool_shed( trans, repository_id )
if repository.tip( trans.app ) != v:
return trans.response.send_redirect( web.url_for( controller='repository',
diff -r 400659787f76f11ee54fb8e3d97a258ab35843cb -r 4f562e319784d0a316157f8e8f6eea1202c7023e lib/galaxy/webapps/community/controllers/repository.py
--- a/lib/galaxy/webapps/community/controllers/repository.py
+++ b/lib/galaxy/webapps/community/controllers/repository.py
@@ -697,7 +697,7 @@
kwd[ 'message' ] = 'You must be logged in to set email alerts.'
kwd[ 'status' ] = 'error'
del kwd[ 'operation' ]
- selected_changeset_revision, repository = self.get_repository_from_refresh_on_change( trans, **kwd )
+ selected_changeset_revision, repository = self.__get_repository_from_refresh_on_change( trans, **kwd )
if repository:
return trans.response.send_redirect( web.url_for( controller='repository',
action='browse_repositories',
@@ -726,7 +726,7 @@
kwd[ 'user_id' ] = trans.security.encode_id( repository.user.id )
else:
# The user selected a repository revision which results in a refresh_on_change.
- selected_changeset_revision, repository = self.get_repository_from_refresh_on_change( trans, **kwd )
+ selected_changeset_revision, repository = self.__get_repository_from_refresh_on_change( trans, **kwd )
if repository:
return trans.response.send_redirect( web.url_for( controller='repository',
action='view_or_manage_repository',
@@ -818,7 +818,7 @@
category_id = kwd.get( 'id', None )
category = suc.get_category( trans, category_id )
kwd[ 'f-Category.name' ] = category.name
- selected_changeset_revision, repository = self.get_repository_from_refresh_on_change( trans, **kwd )
+ selected_changeset_revision, repository = self.__get_repository_from_refresh_on_change( trans, **kwd )
if repository:
return trans.response.send_redirect( web.url_for( controller='repository',
action='preview_tools_in_changeset',
@@ -930,86 +930,6 @@
url += '&latest_ctx_rev=%s' % str( update_to_ctx.rev() )
return trans.response.send_redirect( url )
@web.expose
- def citable_owner( self, trans, owner ):
- """Support for citable URL for each repository owner's tools, e.g. http://example.org/view/owner."""
- try:
- user = suc.get_user_by_username( trans, owner )
- except:
- user = None
- if user:
- user_id = trans.security.encode_id( user.id )
- return trans.response.send_redirect( web.url_for( controller='repository',
- action='index',
- user_id=user_id ) )
- else:
- return trans.show_error_message( "The tool shed <b>%s</b> contains no repositories owned by <b>%s</b>." % \
- ( web.url_for( '/', qualified=True ).rstrip( '/' ), str( owner ) ) )
- @web.expose
- def citable_repository( self, trans, owner, name ):
- """Support for citable URL for a specified repository, e.g. http://example.org/view/owner/name."""
- try:
- repository = suc.get_repository_by_name_and_owner( trans.app, name, owner )
- except:
- repository = None
- if repository:
- repository_id = trans.security.encode_id( repository.id )
- return trans.response.send_redirect( web.url_for( controller='repository',
- action='index',
- repository_id=repository_id ) )
- else:
- # If the owner is valid, then show all of their repositories.
- try:
- user = suc.get_user_by_username( trans, owner )
- except:
- user = None
- if user:
- user_id = trans.security.encode_id( user.id )
- message = "This list of repositories owned by <b>%s</b>, does not include one named <b>%s</b>." % ( str( owner ), str( name ) )
- return trans.response.send_redirect( web.url_for( controller='repository',
- action='index',
- user_id=user_id,
- message=message,
- status='error' ) )
- else:
- return trans.show_error_message( "The tool shed <b>%s</b> contains no repositories named <b>%s</b> with owner <b>%s</b>." % \
- ( web.url_for( '/', qualified=True ).rstrip( '/' ), str( name ), str( owner ) ) )
- @web.expose
- def citable_repository_revision( self, trans, owner, name, changeset_revision ):
- """Support for citable URL for a specified repository revision, e.g. http://example.org/view/owner/name/changeset_revision."""
- try:
- repository = suc.get_repository_by_name_and_owner( trans.app, name, owner )
- except:
- repository = None
- if repository:
- repository_id = trans.security.encode_id( repository.id )
- repository_metadata = suc.get_repository_metadata_by_repository_id_changset_revision( trans, repository_id, changeset_revision )
- if not repository_metadata:
- # Get updates to the received changeset_revision if any exist.
- repo_dir = repository.repo_path( trans.app )
- repo = hg.repository( suc.get_configured_ui(), repo_dir )
- upper_bound_changeset_revision = suc.get_next_downloadable_changeset_revision( repository, repo, changeset_revision )
- if upper_bound_changeset_revision:
- changeset_revision = upper_bound_changeset_revision
- repository_metadata = suc.get_repository_metadata_by_repository_id_changset_revision( trans, repository_id, changeset_revision )
- if repository_metadata:
- return trans.response.send_redirect( web.url_for( controller='repository',
- action='index',
- repository_id=repository_id,
- changeset_revision=changeset_revision ) )
- else:
- message = "The change log for the repository named <b>%s</b> owned by <b>%s</b> does not include revision <b>%s</b>." % \
- ( str( name ), str( owner ), str( changeset_revision ) )
- return trans.response.send_redirect( web.url_for( controller='repository',
- action='index',
- repository_id=repository_id,
- message=message,
- status='error' ) )
- else:
- # See if the owner is valid.
- return trans.response.send_redirect( web.url_for( controller='repository',
- action='citable_owner',
- owner=owner ) )
- @web.expose
def contact_owner( self, trans, id, **kwd ):
params = util.Params( kwd )
message = util.restore_text( params.get( 'message', '' ) )
@@ -1521,16 +1441,16 @@
if repository_dependencies:
return encoding_util.tool_shed_encode( repository_dependencies )
return ''
- def get_repository_from_refresh_on_change( self, trans, **kwd ):
+ def __get_repository_from_refresh_on_change( self, trans, **kwd ):
# The changeset_revision_select_field in several grids performs a refresh_on_change which sends in request parameters like
# changeset_revison_1, changeset_revision_2, etc. One of the many select fields on the grid performed the refresh_on_change,
# so we loop through all of the received values to see which value is not the repository tip. If we find it, we know the
# refresh_on_change occurred and we have the necessary repository id and change set revision to pass on.
repository_id = None
for k, v in kwd.items():
- changset_revision_str = 'changeset_revision_'
- if k.startswith( changset_revision_str ):
- repository_id = trans.security.encode_id( int( k.lstrip( changset_revision_str ) ) )
+ changeset_revision_str = 'changeset_revision_'
+ if k.startswith( changeset_revision_str ):
+ repository_id = trans.security.encode_id( int( k.lstrip( changeset_revision_str ) ) )
repository = suc.get_repository_in_tool_shed( trans, repository_id )
if repository.tip( trans.app ) != v:
return v, repository
@@ -1807,7 +1727,7 @@
if repository.deprecated:
has_deprecated_repositories = True
break
- # Route in may have been from a citable URL, in whcih case we'll have a user_id and possibly a name
+ # Route in may have been from a sharable URL, in whcih case we'll have a user_id and possibly a name
# The received user_id will be the id of the repository owner.
user_id = params.get( 'user_id', None )
repository_id = params.get( 'repository_id', None )
@@ -2062,7 +1982,7 @@
is_malicious = repository_metadata.malicious
else:
# There is no repository_metadata defined for the changeset_revision, so see if it was defined in a previous changeset in the changelog.
- previous_changeset_revision = suc.get_previous_downloadable_changset_revision( repository, repo, changeset_revision )
+ previous_changeset_revision = suc.get_previous_downloadable_changeset_revision( repository, repo, changeset_revision )
if previous_changeset_revision != suc.INITIAL_CHANGELOG_HASH:
repository_metadata = suc.get_repository_metadata_by_changeset_revision( trans, id, previous_changeset_revision )
if repository_metadata:
@@ -2243,7 +2163,7 @@
repo_dir = repository.repo_path( trans.app )
repo = hg.repository( suc.get_configured_ui(), repo_dir )
# Get the lower bound changeset revision.
- lower_bound_changeset_revision = suc.get_previous_downloadable_changset_revision( repository, repo, changeset_revision )
+ lower_bound_changeset_revision = suc.get_previous_downloadable_changeset_revision( repository, repo, changeset_revision )
# Build the list of changeset revision hashes.
changeset_hashes = []
for changeset in suc.reversed_lower_upper_bounded_changelog( repo, lower_bound_changeset_revision, changeset_revision ):
@@ -2563,6 +2483,86 @@
malicious=malicious,
message=message,
status=status ) )
+ @web.expose
+ def sharable_owner( self, trans, owner ):
+ """Support for sharable URL for each repository owner's tools, e.g. http://example.org/view/owner."""
+ try:
+ user = suc.get_user_by_username( trans, owner )
+ except:
+ user = None
+ if user:
+ user_id = trans.security.encode_id( user.id )
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='index',
+ user_id=user_id ) )
+ else:
+ return trans.show_error_message( "The tool shed <b>%s</b> contains no repositories owned by <b>%s</b>." % \
+ ( web.url_for( '/', qualified=True ).rstrip( '/' ), str( owner ) ) )
+ @web.expose
+ def sharable_repository( self, trans, owner, name ):
+ """Support for sharable URL for a specified repository, e.g. http://example.org/view/owner/name."""
+ try:
+ repository = suc.get_repository_by_name_and_owner( trans.app, name, owner )
+ except:
+ repository = None
+ if repository:
+ repository_id = trans.security.encode_id( repository.id )
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='index',
+ repository_id=repository_id ) )
+ else:
+ # If the owner is valid, then show all of their repositories.
+ try:
+ user = suc.get_user_by_username( trans, owner )
+ except:
+ user = None
+ if user:
+ user_id = trans.security.encode_id( user.id )
+ message = "This list of repositories owned by <b>%s</b>, does not include one named <b>%s</b>." % ( str( owner ), str( name ) )
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='index',
+ user_id=user_id,
+ message=message,
+ status='error' ) )
+ else:
+ return trans.show_error_message( "The tool shed <b>%s</b> contains no repositories named <b>%s</b> with owner <b>%s</b>." % \
+ ( web.url_for( '/', qualified=True ).rstrip( '/' ), str( name ), str( owner ) ) )
+ @web.expose
+ def sharable_repository_revision( self, trans, owner, name, changeset_revision ):
+ """Support for sharable URL for a specified repository revision, e.g. http://example.org/view/owner/name/changeset_revision."""
+ try:
+ repository = suc.get_repository_by_name_and_owner( trans.app, name, owner )
+ except:
+ repository = None
+ if repository:
+ repository_id = trans.security.encode_id( repository.id )
+ repository_metadata = suc.get_repository_metadata_by_repository_id_changeset_revision( trans, repository_id, changeset_revision )
+ if not repository_metadata:
+ # Get updates to the received changeset_revision if any exist.
+ repo_dir = repository.repo_path( trans.app )
+ repo = hg.repository( suc.get_configured_ui(), repo_dir )
+ upper_bound_changeset_revision = suc.get_next_downloadable_changeset_revision( repository, repo, changeset_revision )
+ if upper_bound_changeset_revision:
+ changeset_revision = upper_bound_changeset_revision
+ repository_metadata = suc.get_repository_metadata_by_repository_id_changeset_revision( trans, repository_id, changeset_revision )
+ if repository_metadata:
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='index',
+ repository_id=repository_id,
+ changeset_revision=changeset_revision ) )
+ else:
+ message = "The change log for the repository named <b>%s</b> owned by <b>%s</b> does not include revision <b>%s</b>." % \
+ ( str( name ), str( owner ), str( changeset_revision ) )
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='index',
+ repository_id=repository_id,
+ message=message,
+ status='error' ) )
+ else:
+ # See if the owner is valid.
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='sharable_owner',
+ owner=owner ) )
def __stringify( self, list ):
if list:
return ','.join( list )
diff -r 400659787f76f11ee54fb8e3d97a258ab35843cb -r 4f562e319784d0a316157f8e8f6eea1202c7023e templates/webapps/community/index.mako
--- a/templates/webapps/community/index.mako
+++ b/templates/webapps/community/index.mako
@@ -45,7 +45,7 @@
<div class="toolMenu"><div class="toolSectionList">
%if user_id or repository_id:
- ## The route in was a citable url.
+ ## The route in was a sharable url, and may have included a changeset_revision, although we don't check for it.
<div class="toolSectionPad"></div><div class="toolSectionTitle">
All Repositories
@@ -150,13 +150,13 @@
if trans.app.config.require_login and not trans.user:
center_url = h.url_for( controller='user', action='login', message=message, status=status )
elif repository_id and changeset_revision:
- # Route in was a citable url: /view/{owner}/{name}/{changeset_revision}.
+ # Route in was a sharable url: /view/{owner}/{name}/{changeset_revision}.
center_url = h.url_for( controller='repository', action='view_repository', id=repository_id, changeset_revision=changeset_revision, message=message, status=status )
elif repository_id:
- # Route in was a citable url: /view/{owner}/{name}.
+ # Route in was a sharable url: /view/{owner}/{name}.
center_url = h.url_for( controller='repository', action='view_repository', id=repository_id, message=message, status=status )
elif user_id:
- # Route in was a citable url: /view/{owner}.
+ # Route in was a sharable url: /view/{owner}.
center_url = h.url_for( controller='repository', action='browse_repositories', operation="repositories_by_user", user_id=user_id, message=message, status=status )
else:
center_url = h.url_for( controller='repository', action='browse_categories', message=message, status=status )
diff -r 400659787f76f11ee54fb8e3d97a258ab35843cb -r 4f562e319784d0a316157f8e8f6eea1202c7023e templates/webapps/community/repository/common.mako
--- a/templates/webapps/community/repository/common.mako
+++ b/templates/webapps/community/repository/common.mako
@@ -163,12 +163,12 @@
</script></%def>
-<%def name="render_sharable_str( repository )">
+<%def name="render_sharable_str( repository, changeset_revision=None )"><%
- from galaxy.util.shed_util_common import generate_citable_link_for_repository_in_tool_shed
- citable_str = generate_citable_link_for_repository_in_tool_shed( trans, repository )
+ from galaxy.util.shed_util_common import generate_sharable_link_for_repository_in_tool_shed
+ sharable_link = generate_sharable_link_for_repository_in_tool_shed( trans, repository, changeset_revision=changeset_revision )
%>
- ${citable_str}
+ ${sharable_link}
</%def><%def name="render_clone_str( repository )">
diff -r 400659787f76f11ee54fb8e3d97a258ab35843cb -r 4f562e319784d0a316157f8e8f6eea1202c7023e templates/webapps/community/repository/manage_repository.mako
--- a/templates/webapps/community/repository/manage_repository.mako
+++ b/templates/webapps/community/repository/manage_repository.mako
@@ -21,7 +21,8 @@
can_reset_all_metadata = not is_deprecated and len( repo ) > 0
can_review_repository = has_metadata and not is_deprecated and trans.app.security_agent.user_can_review_repositories( trans.user )
can_set_metadata = not is_new and not is_deprecated
- can_set_malicious = metadata and can_set_metadata and is_admin and changeset_revision == repository.tip( trans.app )
+ changeset_revision_is_repository_tip = changeset_revision == repository.tip( trans.app )
+ can_set_malicious = metadata and can_set_metadata and is_admin and changeset_revision_is_repository_tip
can_undeprecate = trans.user and ( is_admin or repository.user == trans.user ) and is_deprecated
can_upload = can_push
can_view_change_log = not is_new
@@ -30,10 +31,15 @@
browse_label = 'Browse or delete repository tip files'
else:
browse_label = 'Browse repository tip files'
- if changeset_revision == repository.tip( trans.app ):
+
+ if changeset_revision_is_repository_tip:
tip_str = 'repository tip'
+ sharable_link_label = 'Sharable link to this repository:'
+ sharable_link_changeset_revision = None
else:
tip_str = ''
+ sharable_link_label = 'Sharable link to this repository revision:'
+ sharable_link_changeset_revision = changeset_revision
%><%!
@@ -143,8 +149,8 @@
<div class="toolFormBody"><form name="edit_repository" id="edit_repository" action="${h.url_for( controller='repository', action='manage_repository', id=trans.security.encode_id( repository.id ) )}" method="post" ><div class="form-row">
- <label>Sharable link to this repository:</label>
- ${render_sharable_str( repository )}
+ <label>${sharable_link_label}</label>
+ ${render_sharable_str( repository, changeset_revision=sharable_link_changeset_revision )}
</div>
%if can_download:
<div class="form-row">
diff -r 400659787f76f11ee54fb8e3d97a258ab35843cb -r 4f562e319784d0a316157f8e8f6eea1202c7023e templates/webapps/community/repository/view_repository.mako
--- a/templates/webapps/community/repository/view_repository.mako
+++ b/templates/webapps/community/repository/view_repository.mako
@@ -23,11 +23,21 @@
can_review_repository = has_metadata and not is_deprecated and trans.app.security_agent.user_can_review_repositories( trans.user )
can_upload = can_push
can_view_change_log = trans.webapp.name == 'community' and not is_new
+ changeset_revision_is_repository_tip = changeset_revision == repository.tip( trans.app )
if can_push:
browse_label = 'Browse or delete repository tip files'
else:
browse_label = 'Browse repository tip files'
+
+ if changeset_revision_is_repository_tip:
+ tip_str = 'repository tip'
+ sharable_link_label = 'Sharable link to this repository:'
+ sharable_link_changeset_revision = None
+ else:
+ tip_str = ''
+ sharable_link_label = 'Sharable link to this repository revision:'
+ sharable_link_changeset_revision = changeset_revision
%><%!
@@ -119,12 +129,6 @@
<div class="toolFormBody"><form name="change_revision" id="change_revision" action="${h.url_for( controller='repository', action='view_repository', id=trans.security.encode_id( repository.id ) )}" method="post" ><div class="form-row">
- <%
- if changeset_revision == repository.tip( trans.app ):
- tip_str = 'repository tip'
- else:
- tip_str = ''
- %>
${changeset_revision_select_field.get_html()} <i>${tip_str}</i><div class="toolParamHelp" style="clear: both;">
Select a revision to inspect and download versions of tools from this repository.
@@ -139,8 +143,8 @@
<div class="toolFormTitle">Repository '${repository.name}'</div><div class="toolFormBody"><div class="form-row">
- <label>Sharable link to this repository:</label>
- ${render_sharable_str( repository )}
+ <label>${sharable_link_label}</label>
+ ${render_sharable_str( repository, changeset_revision=sharable_link_changeset_revision )}
</div>
%if can_download:
<div class="form-row">
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1
0
commit/galaxy-central: inithello: Make buildbot_setup.sh actually copy the job_conf sample file.
by Bitbucket 11 Feb '13
11 Feb '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/400659787f76/
changeset: 400659787f76
user: inithello
date: 2013-02-11 19:56:11
summary: Make buildbot_setup.sh actually copy the job_conf sample file.
affected #: 1 file
diff -r bc2115a935f3f540dc6c3d1ce7e27894685a0182 -r 400659787f76f11ee54fb8e3d97a258ab35843cb buildbot_setup.sh
--- a/buildbot_setup.sh
+++ b/buildbot_setup.sh
@@ -70,7 +70,6 @@
tool_data_table_conf.xml.sample
shed_tool_data_table_conf.xml.sample
migrated_tools_conf.xml.sample
-job_conf.xml.sample_basic
tool-data/shared/ensembl/builds.txt.sample
tool-data/shared/igv/igv_build_sites.txt.sample
tool-data/shared/ncbi/builds.txt.sample
@@ -115,6 +114,9 @@
cp $sample $file
done
+echo "Copying job_conf.xml.sample_basic to job_conf.xml"
+cp job_conf.xml.sample_basic job_conf.xml
+
for dir in $DIRS; do
if [ ! -d $dir ]; then
echo "Creating $dir"
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1
0
commit/galaxy-central: inithello: Add job config sample file to buildbot setup and job_conf.xml to .hgignore.
by Bitbucket 11 Feb '13
by Bitbucket 11 Feb '13
11 Feb '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/bc2115a935f3/
changeset: bc2115a935f3
user: inithello
date: 2013-02-11 19:52:02
summary: Add job config sample file to buildbot setup and job_conf.xml to .hgignore.
affected #: 2 files
diff -r 5e51b830c2b54a6e62ac3860b0e536205e7844d9 -r bc2115a935f3f540dc6c3d1ce7e27894685a0182 .hgignore
--- a/.hgignore
+++ b/.hgignore
@@ -45,6 +45,7 @@
integrated_tool_panel.xml
openid_conf.xml
shed_tool_data_table_conf.xml
+job_conf.xml
static/welcome.html.*
static/welcome.html
diff -r 5e51b830c2b54a6e62ac3860b0e536205e7844d9 -r bc2115a935f3f540dc6c3d1ce7e27894685a0182 buildbot_setup.sh
--- a/buildbot_setup.sh
+++ b/buildbot_setup.sh
@@ -70,6 +70,7 @@
tool_data_table_conf.xml.sample
shed_tool_data_table_conf.xml.sample
migrated_tools_conf.xml.sample
+job_conf.xml.sample_basic
tool-data/shared/ensembl/builds.txt.sample
tool-data/shared/igv/igv_build_sites.txt.sample
tool-data/shared/ncbi/builds.txt.sample
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1
0
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/5e51b830c2b5/
changeset: 5e51b830c2b5
user: natefoo
date: 2013-02-11 19:38:06
summary: Bug fix for new job features.
affected #: 1 file
diff -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 -r 5e51b830c2b54a6e62ac3860b0e536205e7844d9 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -393,6 +393,8 @@
:returns: str -- A valid job handler ID.
"""
+ if id_or_tag is None:
+ id_or_tag = self.default_handler_id
return self.__get_single_item(self.handlers[id_or_tag])
def get_destination(self, id_or_tag):
@@ -406,6 +408,8 @@
Destinations are deepcopied as they are expected to be passed in to job
runners, which will modify them for persisting params set at runtime.
"""
+ if id_or_tag is None:
+ id_or_tag = self.default_destination_id
return copy.deepcopy(self.__get_single_item(self.destinations[id_or_tag]))
def get_destinations(self, id_or_tag):
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1
0
3 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/37c0fd353dd7/
changeset: 37c0fd353dd7
user: natefoo
date: 2013-02-07 21:31:09
summary: Define and implement job destinations, an improved method for defining resources on which Galaxy will run jobs. Functionality is similar to the current system configured via "URLs" in the Galaxy config file, but job concurrency limits on a per-destination basis works (whereas per-URL limits were broken). Sample configs for the new destination system can be found in job_conf.xml.sample_*. Also, renamed ClusterJobRunner to AsynchronousJobRunner, moved worker threads to BaseJobRunner and significantly expanded both of these classes. Updated the tasks, local, lwr, and pbs runners to use destinations, and to inherit from AsynchronousJobRunner, drmaa coming shortly. This should be entirely backwards-compatible, so if you update to this revision and change nothing, your jobs should all still run exactly as they have before. Regardless, it is not recommended that you upgrade your Galaxy instance with jobs running unless you are willing to risk losing those jobs.
affected #: 25 files
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf job_conf.xml.sample_advanced
--- /dev/null
+++ b/job_conf.xml.sample_advanced
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<job_conf>
+ <plugins workers="4">
+ <!-- "workers" is the number of threads for the runner's work queue.
+ The default from <plugins> is used if not defined for a <plugin>.
+ -->
+ <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner"/>
+ <plugin id="pbs" type="runner" load="galaxy.jobs.runners.pbs:PBSJobRunner" workers="2"/>
+ <plugin id="gridengine" type="runner" load="galaxy.jobs.runners.drmaa:DRMAARunner"/>
+ </plugins>
+ <handlers default="handlers">
+ <!-- Additional job handlers - the id should match the name of a
+ [server:<id>] in universe_wsgi.ini.
+ -->
+ <handler id="handler0" tags="handlers"/>
+ <handler id="handler1" tags="handlers"/>
+ <handler id="special_handler0" tags="special_handlers"/>
+ <handler id="special_handler1" tags="special_handlers"/>
+ <handler id="trackster_handler"/>
+ </handlers>
+ <destinations default="local">
+ <!-- Destinations define details about remote resources and how jobs
+ should be executed on those remote resources.
+ -->
+ <destination id="local" runner="local"/>
+ <destination id="pbs" runner="pbs" tags="mycluster"/>
+ <destination id="pbs_longjobs" runner="pbs" tags="mycluster,longjobs">
+ <!-- Define parameters that are native to the job runner plugin. -->
+ <param id="Execution_Time">72:00:00</param>
+ </destination>
+ <destination id="remote_cluster" runner="drmaa" tags="longjobs"/>
+ <destination id="real_user_cluster" runner="drmaa">
+ <!-- TODO: The real user options should maybe not be considered runner params. -->
+ <param id="galaxy_external_runjob_script">scripts/drmaa_external_runner.py</param>
+ <param id="galaxy_external_killjob_script">scripts/drmaa_external_killer.py</param>
+ <param id="galaxy_external_chown_script">scripts/external_chown_script.py</param>
+ </destination>
+ <destination id="dynamic" runner="dynamic">
+ <!-- A destination that represents a method in the dynamic runner. -->
+ <param id="type">python</param>
+ <param id="function">foo</param>
+ </destination>
+ </destinations>
+ <tools>
+ <!-- Tools can be configured to use specific destinations or handlers,
+ identified by either the "id" or "tags" attribute. If assigned to
+ a tag, a handler or destination that matches that tag will be
+ chosen at random.
+ -->
+ <tool id="foo" handler="trackster_handler">
+ <param id="source">trackster</param>
+ </tool>
+ <tool id="bar" destination="dynamic"/>
+ <tool id="baz" handler="special_handlers" destination="bigmem"/>
+ </tools>
+ <limits>
+ <!-- Certain limits can be defined. -->
+ <limit type="registered_user_concurrent_jobs">2</limit>
+ <limit type="unregistered_user_concurrent_jobs">1</limit>
+ <limit type="job_walltime">24:00:00</limit>
+ <limit type="concurrent_jobs" id="local">1</limit>
+ <limit type="concurrent_jobs" tag="mycluster">2</limit>
+ <limit type="concurrent_jobs" tag="longjobs">1</limit>
+ </limits>
+</job_conf>
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf job_conf.xml.sample_basic
--- /dev/null
+++ b/job_conf.xml.sample_basic
@@ -0,0 +1,13 @@
+<?xml version="1.0"?>
+<!-- A sample job config that explicitly configures job running the way it is configured by default (if there is no explicit config). -->
+<job_conf>
+ <plugins>
+ <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="4"/>
+ </plugins>
+ <handlers>
+ <handler id="main"/>
+ </handlers>
+ <destinations>
+ <destination id="local" runner="local"/>
+ </destinations>
+</job_conf>
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/app.py
--- a/lib/galaxy/app.py
+++ b/lib/galaxy/app.py
@@ -86,6 +86,8 @@
self.tool_data_tables.load_from_config_file( config_filename=self.config.shed_tool_data_table_config,
tool_data_path=self.tool_data_tables.tool_data_path,
from_shed_config=True )
+ # Initialize the job management configuration
+ self.job_config = jobs.JobConfiguration(self)
# Initialize the tools, making sure the list of tool configs includes the reserved migrated_tools_conf.xml file.
tool_configs = self.config.tool_configs
if self.config.migrated_tools_config not in tool_configs:
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -91,6 +91,7 @@
self.collect_outputs_from = [ x.strip() for x in kwargs.get( 'collect_outputs_from', 'new_file_path,job_working_directory' ).lower().split(',') ]
self.template_path = resolve_path( kwargs.get( "template_path", "templates" ), self.root )
self.template_cache = resolve_path( kwargs.get( "template_cache_path", "database/compiled_templates" ), self.root )
+ self.job_config_file = resolve_path( kwargs.get( 'job_config_file', 'job_conf.xml' ), self.root )
self.local_job_queue_workers = int( kwargs.get( "local_job_queue_workers", "5" ) )
self.cluster_job_queue_workers = int( kwargs.get( "cluster_job_queue_workers", "3" ) )
self.job_queue_cleanup_interval = int( kwargs.get("job_queue_cleanup_interval", "5") )
@@ -111,8 +112,8 @@
self.smtp_server = kwargs.get( 'smtp_server', None )
self.smtp_username = kwargs.get( 'smtp_username', None )
self.smtp_password = kwargs.get( 'smtp_password', None )
- self.track_jobs_in_database = kwargs.get( 'track_jobs_in_database', None )
- self.start_job_runners = kwargs.get( 'start_job_runners', None )
+ self.track_jobs_in_database = kwargs.get( 'track_jobs_in_database', 'None' )
+ self.start_job_runners = listify(kwargs.get( 'start_job_runners', '' ))
self.expose_dataset_path = string_as_bool( kwargs.get( 'expose_dataset_path', 'False' ) )
# External Service types used in sample tracking
self.external_service_type_config_file = resolve_path( kwargs.get( 'external_service_type_config_file', 'external_service_types_conf.xml' ), self.root )
@@ -123,8 +124,8 @@
# The transfer manager and deferred job queue
self.enable_beta_job_managers = string_as_bool( kwargs.get( 'enable_beta_job_managers', 'False' ) )
# Per-user Job concurrency limitations
+ self.cache_user_job_count = string_as_bool( kwargs.get( 'cache_user_job_count', False ) )
self.user_job_limit = int( kwargs.get( 'user_job_limit', 0 ) )
- # user_job_limit for backwards-compatibility
self.registered_user_job_limit = int( kwargs.get( 'registered_user_job_limit', self.user_job_limit ) )
self.anonymous_user_job_limit = int( kwargs.get( 'anonymous_user_job_limit', self.user_job_limit ) )
self.default_cluster_job_runner = kwargs.get( 'default_cluster_job_runner', 'local:///' )
@@ -214,28 +215,20 @@
# Crummy, but PasteScript does not give you a way to determine this
if arg.lower().startswith('--server-name='):
self.server_name = arg.split('=', 1)[-1]
+ # Store all configured server names
+ self.server_names = []
+ for section in global_conf_parser.sections():
+ if section.startswith('server:'):
+ self.server_names.append(section.replace('server:', '', 1))
# Store advanced job management config
self.job_manager = kwargs.get('job_manager', self.server_name).strip()
self.job_handlers = [ x.strip() for x in kwargs.get('job_handlers', self.server_name).split(',') ]
self.default_job_handlers = [ x.strip() for x in kwargs.get('default_job_handlers', ','.join( self.job_handlers ) ).split(',') ]
- # parse the [galaxy:job_limits] section
- self.job_limits = {}
- try:
- job_limits = global_conf_parser.items( 'galaxy:job_limits' )
- for k, v in job_limits:
- # Since the URL contains a colon and possibly an equals sign, consider ' = ' the delimiter
- more_k, v = v.split(' = ', 1)
- k = '%s:%s' % (k, more_k.strip())
- v = v.strip().rsplit(None, 1)
- v[1] = int(v[1])
- self.job_limits[k] = v
- except ConfigParser.NoSectionError:
- pass
- # Use database for IPC unless this is a standalone server (or multiple servers doing self dispatching in memory)
- if self.track_jobs_in_database is None or self.track_jobs_in_database == "None":
- self.track_jobs_in_database = True
- if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ):
- self.track_jobs_in_database = False
+ # Use database for job running IPC unless this is a standalone server or explicitly set in the config
+ if self.track_jobs_in_database == 'None':
+ self.track_jobs_in_database = False
+ if len(self.server_names) > 1:
+ self.track_jobs_in_database = True
else:
self.track_jobs_in_database = string_as_bool( self.track_jobs_in_database )
# Store per-tool runner configs
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -6,13 +6,17 @@
import sys
import pwd
import time
+import copy
+import random
import logging
+import datetime
import threading
import traceback
import subprocess
import galaxy
from galaxy import util, model
+from galaxy.util.bunch import Bunch
from galaxy.datatypes.tabular import *
from galaxy.datatypes.interval import *
# tabular/interval imports appear to be unused. Clean up?
@@ -22,6 +26,7 @@
from galaxy.jobs.actions.post import ActionBox
from galaxy.exceptions import ObjectInvalid
from galaxy.jobs.mapper import JobRunnerMapper
+from galaxy.jobs.runners import BaseJobRunner
log = logging.getLogger( __name__ )
@@ -47,6 +52,486 @@
self.condition.notify()
self.condition.release()
+class JobDestination( Bunch ):
+ """
+ Provides details about where a job runs
+ """
+ def __init__(self, **kwds):
+ self['id'] = None
+ self['url'] = None
+ self['tags'] = None
+ self['runner'] = None
+ self['legacy'] = False
+ self['converted'] = False
+ # dict is appropriate (rather than a bunch) since keys may not be valid as attributes
+ self['params'] = dict()
+ super(JobDestination, self).__init__(**kwds)
+
+ # Store tags as a list
+ if self.tags is not None:
+ self['tags'] = [ x.strip() for x in self.tags.split(',') ]
+
+class JobToolConfiguration( Bunch ):
+ """
+ Provides details on what handler and destination a tool should use
+
+ A JobToolConfiguration will have the required attribute 'id' and optional
+ attributes 'handler', 'destination', and 'params'
+ """
+ def __init__(self, **kwds):
+ self['handler'] = None
+ self['destination'] = None
+ self['params'] = dict()
+ super(JobToolConfiguration, self).__init__(**kwds)
+
+class JobConfiguration( object ):
+ """A parser and interface to advanced job management features.
+
+ These features are configured in the job configuration, by default, ``job_conf.xml``
+ """
+ DEFAULT_NWORKERS = 4
+ def __init__(self, app):
+ """Parse the job configuration XML.
+ """
+ self.app = app
+ self.runner_plugins = []
+ self.handlers = {}
+ self.default_handler_id = None
+ self.destinations = {}
+ self.destination_tags = {}
+ self.default_destination_id = None
+ self.tools = {}
+ self.limits = Bunch()
+
+ # Initialize the config
+ try:
+ tree = util.parse_xml(self.app.config.job_config_file)
+ self.__parse_job_conf_xml(tree)
+ except IOError:
+ log.warning( 'Job configuration "%s" does not exist, using legacy job configuration from Galaxy config file "%s" instead' % ( self.app.config.job_config_file, self.app.config.config_file ) )
+ self.__parse_job_conf_legacy()
+
+ def __parse_job_conf_xml(self, tree):
+ """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml).
+
+ :param tree: Object representing the root ``<job_conf>`` object in the job config file.
+ :type tree: ``xml.etree.ElementTree.Element``
+ """
+ root = tree.getroot()
+ log.debug('Loading job configuration from %s' % self.app.config.job_config_file)
+
+ # Parse job plugins
+ plugins = root.find('plugins')
+ if plugins is not None:
+ for plugin in self.__findall_with_required(plugins, 'plugin', ('id', 'type', 'load')):
+ if plugin.get('type') == 'runner':
+ workers = plugin.get('workers', plugins.get('workers', JobConfiguration.DEFAULT_NWORKERS))
+ self.runner_plugins.append(dict(id=plugin.get('id'), load=plugin.get('load'), workers=int(workers)))
+ else:
+ log.error('Unknown plugin type: %s' % plugin.get('type'))
+ # Load tasks if configured
+ if self.app.config.use_tasked_jobs:
+ self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers))
+
+ # Parse handlers
+ handlers = root.find('handlers')
+ if handlers is not None:
+ for handler in self.__findall_with_required(handlers, 'handler'):
+ id = handler.get('id')
+ if id in self.handlers:
+ log.error("Handler '%s' overlaps handler with the same name, ignoring" % id)
+ else:
+ log.debug("Read definition for handler '%s'" % id)
+ self.handlers[id] = (id,)
+ if handler.get('tags', None) is not None:
+ for tag in [ x.strip() for x in handler.get('tags').split(',') ]:
+ if tag in self.handlers:
+ self.handlers[tag].append(id)
+ else:
+ self.handlers[tag] = [id]
+
+ # Determine the default handler(s)
+ self.default_handler_id = self.__get_default(handlers, self.handlers.keys())
+
+ # Parse destinations
+ destinations = root.find('destinations')
+ for destination in self.__findall_with_required(destinations, 'destination', ('id', 'runner')):
+ id = destination.get('id')
+ job_destination = JobDestination(**dict(destination.items()))
+ job_destination['params'] = self.__get_params(destination)
+ self.destinations[id] = (job_destination,)
+ if job_destination.tags is not None:
+ for tag in job_destination.tags:
+ if tag not in self.destinations:
+ self.destinations[tag] = []
+ self.destinations[tag].append(job_destination)
+
+ # Determine the default destination
+ self.default_destination_id = self.__get_default(destinations, self.destinations.keys())
+
+ # Parse tool mappings
+ tools = root.find('tools')
+ if tools is not None:
+ for tool in self.__findall_with_required(tools, 'tool'):
+ # There can be multiple definitions with identical ids, but different params
+ id = tool.get('id')
+ if id not in self.tools:
+ self.tools[id] = list()
+ self.tools[id].append(JobToolConfiguration(**dict(tool.items())))
+ self.tools[id][-1]['params'] = self.__get_params(tool)
+
+ self.limits = Bunch(registered_user_concurrent_jobs = None,
+ anonymous_user_concurrent_jobs = None,
+ walltime = None,
+ walltime_delta = None,
+ output_size = None,
+ concurrent_jobs = {})
+
+ # Parse job limits
+ limits = root.find('limits')
+ if limits is not None:
+ for limit in self.__findall_with_required(limits, 'limit', ('type',)):
+ type = limit.get('type')
+ if type == 'concurrent_jobs':
+ id = limit.get('tag', None) or limit.get('id')
+ self.limits.concurrent_jobs[id] = int(limit.text)
+ elif limit.text:
+ self.limits.__dict__[type] = limit.text
+
+ if self.limits.walltime is not None:
+ h, m, s = [ int( v ) for v in self.limits.walltime.split( ':' ) ]
+ self.limits.walltime_delta = datetime.timedelta( 0, s, 0, 0, m, h )
+
+ log.debug('Done loading job configuration')
+
+ def __parse_job_conf_legacy(self):
+ """Loads the old-style job configuration from options in the galaxy config file (by default, universe_wsgi.ini).
+ """
+ log.debug('Loading job configuration from %s' % self.app.config.config_file)
+
+ # Always load local and lwr
+ self.runner_plugins = [dict(id='local', load='local', workers=self.app.config.local_job_queue_workers), dict(id='lwr', load='lwr', workers=self.app.config.cluster_job_queue_workers)]
+ # Load tasks if configured
+ if self.app.config.use_tasked_jobs:
+ self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers))
+ for runner in self.app.config.start_job_runners:
+ self.runner_plugins.append(dict(id=runner, load=runner, workers=self.app.config.cluster_job_queue_workers))
+
+ # Set the handlers
+ for id in self.app.config.job_handlers:
+ self.handlers[id] = (id,)
+
+ self.handlers['default_job_handlers'] = self.app.config.default_job_handlers
+ self.default_handler_id = 'default_job_handlers'
+
+ # Set tool handler configs
+ for id, tool_handlers in self.app.config.tool_handlers.items():
+ self.tools[id] = list()
+ for handler_config in tool_handlers:
+ # rename the 'name' key to 'handler'
+ handler_config['handler'] = handler_config.pop('name')
+ self.tools[id].append(JobToolConfiguration(**handler_config))
+
+ # Set tool runner configs
+ for id, tool_runners in self.app.config.tool_runners.items():
+ # Might have been created in the handler parsing above
+ if id not in self.tools:
+ self.tools[id] = list()
+ for runner_config in tool_runners:
+ url = runner_config['url']
+ if url not in self.destinations:
+ # Create a new "legacy" JobDestination - it will have its URL converted to a destination params once the appropriate plugin has loaded
+ self.destinations[url] = (JobDestination(id=url, runner=url.split(':', 1)[0], url=url, legacy=True, converted=False),)
+ for tool_conf in self.tools[id]:
+ if tool_conf.params == runner_config.get('params', {}):
+ tool_conf['destination'] = url
+ break
+ else:
+ # There was not an existing config (from the handlers section) with the same params
+ # rename the 'url' key to 'destination'
+ runner_config['destination'] = runner_config.pop('url')
+ self.tools[id].append(JobToolConfiguration(**runner_config))
+
+ self.destinations[self.app.config.default_cluster_job_runner] = (JobDestination(id=self.app.config.default_cluster_job_runner, runner=self.app.config.default_cluster_job_runner.split(':', 1)[0], url=self.app.config.default_cluster_job_runner, legacy=True, converted=False),)
+ self.default_destination_id = self.app.config.default_cluster_job_runner
+
+ # Set the job limits
+ self.limits = Bunch(registered_user_concurrent_jobs = self.app.config.registered_user_job_limit,
+ anonymous_user_concurrent_jobs = self.app.config.anonymous_user_job_limit,
+ walltime = self.app.config.job_walltime,
+ walltime_delta = self.app.config.job_walltime_delta,
+ output_size = self.app.config.output_size_limit,
+ concurrent_jobs = {})
+
+ log.debug('Done loading job configuration')
+
+ def __get_default(self, parent, names):
+ """Returns the default attribute set in a parent tag like <handlers> or <destinations>, or return the ID of the child, if there is no explicit default and only one child.
+
+ :param parent: Object representing a tag that may or may not have a 'default' attribute.
+ :type parent: ``xml.etree.ElementTree.Element``
+ :param names: The list of destination or handler IDs or tags that were loaded.
+ :type names: list of str
+
+ :returns: str -- id or tag representing the default.
+ """
+ rval = parent.get('default')
+ if rval is not None:
+ # If the parent element has a 'default' attribute, use the id or tag in that attribute
+ if rval not in names:
+ raise Exception("<%s> default attribute '%s' does not match a defined id or tag in a child element" % (parent.tag, rval))
+ log.debug("<%s> default set to child with id or tag '%s'" % (parent.tag, rval))
+ elif len(names) == 1:
+ log.info("Setting <%s> default to child with id '%s'" % (parent.tag, names[0]))
+ rval = names[0]
+ else:
+ raise Exception("No <%s> default specified, please specify a valid id or tag with the 'default' attribute" % parent.tag)
+ return rval
+
+ def __findall_with_required(self, parent, match, attribs=None):
+ """Like ``xml.etree.ElementTree.Element.findall()``, except only returns children that have the specified attribs.
+
+ :param parent: Parent element in which to find.
+ :type parent: ``xml.etree.ElementTree.Element``
+ :param match: Name of child elements to find.
+ :type match: str
+ :param attribs: List of required attributes in children elements.
+ :type attribs: list of str
+
+ :returns: list of ``xml.etree.ElementTree.Element``
+ """
+ rval = []
+ if attribs is None:
+ attribs = ('id',)
+ for elem in parent.findall(match):
+ for attrib in attribs:
+ if attrib not in elem.attrib:
+ log.warning("required '%s' attribute is missing from <%s> element" % (attrib, match))
+ break
+ else:
+ rval.append(elem)
+ return rval
+
+ def __get_params(self, parent):
+ """Parses any child <param> tags in to a dictionary suitable for persistence.
+
+ :param parent: Parent element in which to find child <param> tags.
+ :type parent: ``xml.etree.ElementTree.Element``
+
+ :returns: dict
+ """
+ rval = {}
+ for param in parent.findall('param'):
+ rval[param.get('id')] = param.text
+ return rval
+
+ @property
+ def default_job_tool_configuration(self):
+ """The default JobToolConfiguration, used if a tool does not have an explicit definition in the configuration. It consists of a reference to the default handler and default destination.
+
+ :returns: JobToolConfiguration -- a representation of a <tool> element that uses the default handler and destination
+ """
+ return JobToolConfiguration(id='default', handler=self.default_handler_id, destination=self.default_destination_id)
+
+ # Called upon instantiation of a Tool object
+ def get_job_tool_configurations(self, ids):
+ """Get all configured JobToolConfigurations for a tool ID, or, if given a list of IDs, the JobToolConfigurations for the first id in ``ids`` matching a tool definition.
+
+ .. note::
+
+ You should not mix tool shed tool IDs, versionless tool shed IDs, and tool config tool IDs that refer to the same tool.
+
+ :param ids: Tool ID or IDs to fetch the JobToolConfiguration of.
+ :type ids: list or str.
+ :returns: list -- JobToolConfiguration Bunches representing <tool> elements matching the specified ID(s).
+
+ Example tool ID strings include:
+
+ * Full tool shed id: ``toolshed.example.org/repos/nate/filter_tool_repo/filter_tool/1.0.0``
+ * Tool shed id less version: ``toolshed.example.org/repos/nate/filter_tool_repo/filter_tool``
+ * Tool config tool id: ``filter_tool``
+ """
+ rval = []
+ # listify if ids is a single (string) id
+ ids = util.listify(ids)
+ for id in ids:
+ if id in self.tools:
+ # If a tool has definitions that include job params but not a
+ # definition for jobs without params, include the default
+ # config
+ for job_tool_configuration in self.tools[id]:
+ if not job_tool_configuration.params:
+ break
+ else:
+ rval.append(self.default_job_tool_configuration)
+ rval.extend(self.tools[id])
+ break
+ else:
+ rval.append(self.default_job_tool_configuration)
+ return rval
+
+ def __get_single_item(self, collection):
+ """Given a collection of handlers or destinations, return one item from the collection at random.
+ """
+ # Done like this to avoid random under the assumption it's faster to avoid it
+ if len(collection) == 1:
+ return collection[0]
+ else:
+ return random.choice(collection)
+
+ # This is called by Tool.get_job_handler()
+ def get_handler(self, id_or_tag):
+ """Given a handler ID or tag, return the provided ID or an ID matching the provided tag
+
+ :param id_or_tag: A handler ID or tag.
+ :type id_or_tag: str
+
+ :returns: str -- A valid job handler ID.
+ """
+ return self.__get_single_item(self.handlers[id_or_tag])
+
+ def get_destination(self, id_or_tag):
+ """Given a destination ID or tag, return the JobDestination matching the provided ID or tag
+
+ :param id_or_tag: A destination ID or tag.
+ :type id_or_tag: str
+
+ :returns: JobDestination -- A valid destination
+
+ Destinations are deepcopied as they are expected to be passed in to job
+ runners, which will modify them for persisting params set at runtime.
+ """
+ return copy.deepcopy(self.__get_single_item(self.destinations[id_or_tag]))
+
+ def get_destinations(self, id_or_tag):
+ """Given a destination ID or tag, return all JobDestinations matching the provided ID or tag
+
+ :param id_or_tag: A destination ID or tag.
+ :type id_or_tag: str
+
+ :returns: list or tuple of JobDestinations
+
+ Destinations are not deepcopied, so they should not be passed to
+ anything which might modify them.
+ """
+ return self.destinations.get(id_or_tag, None)
+
+ def get_job_runner_plugins(self):
+ """Load all configured job runner plugins
+
+ :returns: list of job runner plugins
+ """
+ rval = {}
+ for runner in self.runner_plugins:
+ class_names = []
+ module = None
+ id = runner['id']
+ load = runner['load']
+ if ':' in load:
+ # Name to load was specified as '<module>:<class>'
+ module_name, class_name = load.rsplit(':', 1)
+ class_names = [ class_name ]
+ module = __import__( module_name )
+ else:
+ # Name to load was specified as '<module>'
+ if '.' not in load:
+ # For legacy reasons, try from galaxy.jobs.runners first if there's no '.' in the name
+ module_name = 'galaxy.jobs.runners.' + load
+ try:
+ module = __import__( module_name )
+ except ImportError:
+ # No such module, we'll retry without prepending galaxy.jobs.runners.
+ # All other exceptions (e.g. something wrong with the module code) will raise
+ pass
+ if module is None:
+ # If the name included a '.' or loading from the static runners path failed, try the original name
+ module = __import__( load )
+ module_name = load
+ if module is None:
+ # Module couldn't be loaded, error should have already been displayed
+ continue
+ for comp in module_name.split( "." )[1:]:
+ module = getattr( module, comp )
+ if not class_names:
+ # If there's not a ':', we check <module>.__all__ for class names
+ try:
+ assert module.__all__
+ class_names = module.__all__
+ except AssertionError:
+ log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % load )
+ continue
+ for class_name in class_names:
+ runner_class = getattr( module, class_name )
+ try:
+ assert issubclass(runner_class, BaseJobRunner)
+ except TypeError:
+ log.warning("A non-class name was found in __all__, ignoring: %s" % id)
+ continue
+ except AssertionError:
+ log.warning("Job runner classes must be subclassed from BaseJobRunner, %s has bases: %s" % (id, runner_class.__bases__))
+ continue
+ try:
+ rval[id] = runner_class( self.app, runner['workers'] )
+ except TypeError:
+ log.warning( "Job runner '%s:%s' has not been converted to a new-style runner" % ( module_name, class_name ) )
+ rval[id] = runner_class( self.app )
+ log.debug( "Loaded job runner '%s:%s' as '%s'" % ( module_name, class_name, id ) )
+ return rval
+
+ def is_id(self, collection):
+ """Given a collection of handlers or destinations, indicate whether the collection represents a tag or a real ID
+
+ :param collection: A representation of a destination or handler
+ :type collection: tuple or list
+
+ :returns: bool
+ """
+ return type(collection) == tuple
+
+ def is_tag(self, collection):
+ """Given a collection of handlers or destinations, indicate whether the collection represents a tag or a real ID
+
+ :param collection: A representation of a destination or handler
+ :type collection: tuple or list
+
+ :returns: bool
+ """
+ return type(collection) == list
+
+ def is_handler(self, server_name):
+ """Given a server name, indicate whether the server is a job handler
+
+ :param server_name: The name to check
+ :type server_name: str
+
+ :return: bool
+ """
+ for collection in self.handlers.values():
+ if server_name in collection:
+ return True
+ return False
+
+ def convert_legacy_destinations(self, job_runners):
+ """Converts legacy (from a URL) destinations to contain the appropriate runner params defined in the URL.
+
+ :param job_runners: All loaded job runner plugins.
+ :type job_runners: list of job runner plugins
+ """
+ for id, destination in [ ( id, destinations[0] ) for id, destinations in self.destinations.items() if self.is_id(destinations) ]:
+ # Only need to deal with real destinations, not members of tags
+ if destination.legacy and not destination.converted:
+ if destination.runner in job_runners:
+ destination.params = job_runners[destination.runner].url_to_destination(destination.url).params
+ destination.converted = True
+ if destination.params:
+ log.debug("Legacy destination with id '%s', url '%s' converted, got params:" % (id, destination.url))
+ for k, v in destination.params.items():
+ log.debug(" %s: %s" % (k, v))
+ else:
+ log.debug("Legacy destination with id '%s', url '%s' converted, got params:" % (id, destination.url))
+ else:
+ log.warning("Legacy destination with id '%s' could not be converted: Unknown runner plugin: %s" % (id, destination.runner))
+
class JobWrapper( object ):
"""
Wraps a 'model.Job' with convenience methods for running processes and
@@ -81,7 +566,7 @@
self.tool_provided_job_metadata = None
# Wrapper holding the info required to restore and clean up from files used for setting metadata externally
self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job )
- self.job_runner_mapper = JobRunnerMapper( self, job.job_runner_name )
+ self.job_runner_mapper = JobRunnerMapper( self, queue.dispatcher.url_to_destination )
self.params = None
if job.params:
self.params = from_json_string( job.params )
@@ -94,7 +579,8 @@
return self.app.config.use_tasked_jobs and self.tool.parallelism
def get_job_runner_url( self ):
- return self.job_runner_mapper.get_job_runner_url( self.params )
+ log.warning('(%s) Job runner URLs are deprecated, use destinations instead.' % self.job_id)
+ return self.job_destination.url
def get_parallelism(self):
return self.tool.parallelism
@@ -102,6 +588,20 @@
# legacy naming
get_job_runner = get_job_runner_url
+ @property
+ def job_destination(self):
+ """Return the JobDestination that this job will use to run. This will
+ either be a configured destination, a randomly selected destination if
+ the configured destination was a tag, or a dynamically generated
+ destination from the dynamic runner.
+
+ The lookup is delegated to ``self.job_runner_mapper.get_job_destination``,
+ which is passed this wrapper's ``params``.
+
+ Accessing this property for the first time causes the dynamic runner to
+ do its calculation, if any.
+
+ :returns: ``JobDestination``
+ """
+ return self.job_runner_mapper.get_job_destination(self.params)
+
def get_job( self ):
return self.sa_session.query( model.Job ).get( self.job_id )
@@ -321,11 +821,24 @@
return job.state
def set_runner( self, runner_url, external_id ):
+ log.warning('set_runner() is deprecated, use set_job_destination()')
+ self.set_job_destination(self.job_destination, external_id)
+
+ def set_job_destination(self, job_destination, external_id):
+ """
+ Persist job destination params in the database for recovery.
+
+ self.job_destination is not used because a runner may choose to rewrite
+ parts of the destination (e.g. the params).
+ """
job = self.get_job()
- self.sa_session.refresh( job )
- job.job_runner_name = runner_url
+ self.sa_session.refresh(job)
+ log.debug('(%s) Persisting job destination (destination id: %s)' % (job.id, job_destination.id))
+ job.destination_id = job_destination.id
+ job.destination_params = job_destination.params
+ job.job_runner_name = job_destination.runner
job.job_runner_external_id = external_id
- self.sa_session.add( job )
+ self.sa_session.add(job)
self.sa_session.flush()
def finish( self, stdout, stderr, tool_exit_code=None ):
@@ -699,6 +1212,28 @@
except:
log.exception( "Unable to cleanup job %d" % self.job_id )
+ def get_output_sizes( self ):
+ """Report the current on-disk size of each of this job's output files.
+
+ :returns: list of ``( path, size )`` tuples, one per entry returned by ``self.get_output_fnames()``; ``size`` is ``st_size`` from ``os.stat``, or 0 when the path does not exist
+ """
+ sizes = []
+ output_paths = self.get_output_fnames()
+ for outfile in [ str( o ) for o in output_paths ]:
+ if os.path.exists( outfile ):
+ sizes.append( ( outfile, os.stat( outfile ).st_size ) )
+ else:
+ # Path does not exist on disk; report it with a size of 0
+ sizes.append( ( outfile, 0 ) )
+ return sizes
+
+ def check_limits(self, runtime=None):
+     """Check this job against the configured output size and walltime limits.
+
+     The output size check only runs when ``job_config.limits.output_size``
+     is greater than 0; the walltime check only runs when both
+     ``job_config.limits.walltime_delta`` and ``runtime`` are not None.
+
+     :param runtime: How long the job has been running, compared against ``job_config.limits.walltime_delta``. ``None`` skips the walltime check.
+
+     :returns: A failure message for the user if a limit was exceeded, otherwise ``None``
+     """
+     limits = self.app.job_config.limits
+     if limits.output_size > 0:
+         for outfile, size in self.get_output_sizes():
+             # Compare against the same limit that enables this check and that is reported in the message below
+             if size > limits.output_size:
+                 log.warning( '(%s) Job output %s is over the output size limit' % ( self.get_id_tag(), os.path.basename( outfile ) ) )
+                 return 'Job output file grew too large (greater than %s), please try different inputs or parameters' % util.nice_size( limits.output_size )
+     if limits.walltime_delta is not None and runtime is not None:
+         if runtime > limits.walltime_delta:
+             log.warning( '(%s) Job has reached walltime, it will be terminated' % ( self.get_id_tag() ) )
+             return 'Job ran longer than the maximum allowed execution time (%s), please try different inputs or parameters' % limits.walltime
+     return None
+
def get_command_line( self ):
return self.command_line
@@ -825,16 +1360,6 @@
return ExpressionContext( meta, job_context )
return job_context
- def check_output_sizes( self ):
- sizes = []
- output_paths = self.get_output_fnames()
- for outfile in [ str( o ) for o in output_paths ]:
- if os.path.exists( outfile ):
- sizes.append( ( outfile, os.stat( outfile ).st_size ) )
- else:
- sizes.append( ( outfile, 0 ) )
- return sizes
-
def setup_external_metadata( self, exec_dir=None, tmp_dir=None, dataset_files_path=None, config_root=None, config_file=None, datatypes_config=None, set_extension=True, **kwds ):
# extension could still be 'auto' if this is the upload tool.
job = self.get_job()
@@ -1148,16 +1673,6 @@
# Handled at the parent job level. Do nothing here.
pass
- def check_output_sizes( self ):
- sizes = []
- output_paths = self.get_output_fnames()
- for outfile in [ str( o ) for o in output_paths ]:
- if os.path.exists( outfile ):
- sizes.append( ( outfile, os.stat( outfile ).st_size ) )
- else:
- sizes.append( ( outfile, 0 ) )
- return sizes
-
def setup_external_metadata( self, exec_dir=None, tmp_dir=None, dataset_files_path=None, config_root=None, config_file=None, datatypes_config=None, set_extension=True, **kwds ):
# There is no metadata setting for tasks. This is handled after the merge, at the job level.
return ""
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -11,7 +11,7 @@
from sqlalchemy.sql.expression import and_, or_, select, func
from galaxy import util, model
-from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper
+from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper, JobDestination
log = logging.getLogger( __name__ )
@@ -51,6 +51,9 @@
self.sa_session = app.model.context
self.track_jobs_in_database = self.app.config.track_jobs_in_database
+ # Initialize structures for handling job limits
+ self.__clear_user_job_count()
+
# Keep track of the pid that started the job manager, only it
# has valid threads
self.parent_pid = os.getpid()
@@ -58,6 +61,8 @@
self.queue = Queue()
# Contains jobs that are waiting (only use from monitor thread)
self.waiting_jobs = []
+ # Contains wrappers of jobs that are limited or ready (so they aren't created unnecessarily/multiple times)
+ self.job_wrappers = {}
# Helper for interruptable sleep
self.sleeper = Sleeper()
self.running = True
@@ -78,7 +83,7 @@
"""
Checks all jobs that are in the 'new', 'queued' or 'running' state in
the database and requeues or cleans up as necessary. Only run as the
- job manager starts.
+ job handler starts.
"""
for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \
.filter( ( ( model.Job.state == model.Job.states.NEW ) \
@@ -88,17 +93,32 @@
if job.tool_id not in self.app.toolbox.tools_by_id:
log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) )
JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )
- elif job.job_runner_name is None or (job.job_runner_name is not None and job.job_runner_external_id is None):
- if job.job_runner_name is None:
- log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) )
+ if job.job_runner_name is not None and job.job_runner_external_id is None:
+ # This could happen during certain revisions of Galaxy where a runner URL was persisted before the job was dispatched to a runner.
+ log.debug( "(%s) Job runner assigned but no external ID recorded, adding to the job handler queue" % job.id )
+ job.job_runner_name = None
+ if self.track_jobs_in_database:
+ job.state = model.Job.states.NEW
else:
- log.debug( "(%s) Job runner assigned but no external ID recorded, adding to the job handler queue" % job.id )
+ self.queue.put( ( job.id, job.tool_id ) )
+ elif job.job_runner_name is not None and job.job_runner_external_id is not None and job.destination_id is None:
+ # This is the first start after upgrading from URLs to destinations, convert the URL to a destination and persist
+ # TODO: test me extensively
+ job_wrapper = JobWrapper( job, self )
+ job_wrapper.set_job_destination(self.dispatcher.url_to_destination(self.job_runner_name))
+ self.dispatcher.recover( job, job_wrapper )
+ log.info('(%s) Converted job from a URL to a destination and recovered' % (job.id))
+ elif job.job_runner_name is None:
+ # Never (fully) dispatched
+ log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) )
if self.track_jobs_in_database:
job.state = model.Job.states.NEW
else:
self.queue.put( ( job.id, job.tool_id ) )
else:
+ # Already dispatched and running
job_wrapper = JobWrapper( job, self )
+ job_wrapper.job_runner_mapper.cached_job_destination = JobDestination(id=job.destination_id, runner=job.job_runner_name, params=job.destination_params)
self.dispatcher.recover( job, job_wrapper )
if self.sa_session.dirty:
self.sa_session.flush()
@@ -156,8 +176,6 @@
~model.Job.table.c.id.in_(hda_not_ready),
~model.Job.table.c.id.in_(ldda_not_ready))) \
.order_by(model.Job.id).all()
- # Ensure that we get new job counts on each iteration
- self.__clear_user_job_count()
else:
# Get job objects and append to watch queue for any which were
# previously waiting
@@ -174,6 +192,8 @@
jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
except Empty:
pass
+ # Ensure that we get new job counts on each iteration
+ self.__clear_user_job_count()
# Iterate over new and waiting jobs and look for any that are
# ready to run
new_waiting_jobs = []
@@ -183,14 +203,13 @@
# Some of these states will only happen when using the in-memory job queue
job_state = self.__check_if_ready_to_run( job )
if job_state == JOB_WAIT:
- if not self.track_jobs_in_database:
- new_waiting_jobs.append( job.id )
+ new_waiting_jobs.append( job.id )
elif job_state == JOB_INPUT_ERROR:
log.info( "(%d) Job unable to run: one or more inputs in error state" % job.id )
elif job_state == JOB_INPUT_DELETED:
log.info( "(%d) Job unable to run: one or more inputs deleted" % job.id )
elif job_state == JOB_READY:
- self.dispatcher.put( JobWrapper( job, self ) )
+ self.dispatcher.put( self.job_wrappers.pop( job.id ) )
log.info( "(%d) Job dispatched" % job.id )
elif job_state == JOB_DELETED:
log.info( "(%d) Job deleted by user while still queued" % job.id )
@@ -204,14 +223,20 @@
dataset_assoc.dataset.info = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run"
self.sa_session.add( dataset_assoc.dataset.dataset )
self.sa_session.add( job )
+ elif job_state == JOB_ERROR:
+ log.error( "(%d) Error checking job readiness" % job.id )
else:
log.error( "(%d) Job in unknown state '%s'" % ( job.id, job_state ) )
- if not self.track_jobs_in_database:
- new_waiting_jobs.append( job.id )
+ new_waiting_jobs.append( job.id )
except Exception:
log.exception( "failure running job %d" % job.id )
# Update the waiting list
- self.waiting_jobs = new_waiting_jobs
+ if not self.track_jobs_in_database:
+ self.waiting_jobs = new_waiting_jobs
+ # Remove cached wrappers for any jobs that are no longer being tracked
+ for id in self.job_wrappers.keys():
+ if id not in new_waiting_jobs:
+ del self.job_wrappers[id]
# Flush, if we updated the state
self.sa_session.flush()
# Done with the session
@@ -239,19 +264,34 @@
continue
# don't run jobs for which the input dataset was deleted
if idata.deleted:
- JobWrapper( job, self ).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
return JOB_INPUT_DELETED
# an error in the input data causes us to bail immediately
elif idata.state == idata.states.ERROR:
- JobWrapper( job, self ).fail( "input data %s is in error state" % ( idata.hid ) )
+ self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s is in error state" % ( idata.hid ) )
return JOB_INPUT_ERROR
elif idata.state == idata.states.FAILED_METADATA:
- JobWrapper( job, self ).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
+ self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
return JOB_INPUT_ERROR
elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
# need to requeue
return JOB_WAIT
- state = self.__check_user_jobs( job )
+ # Create the job wrapper so that the destination can be set
+ if job.id not in self.job_wrappers:
+ self.job_wrappers[job.id] = JobWrapper(job, self)
+ # Cause the job_destination to be set and cached by the mapper
+ try:
+ self.job_wrappers[job.id].job_destination
+ except Exception, e:
+ failure_message = getattr(e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
+ if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
+ log.exception( 'Failed to generate job destination' )
+ else:
+ log.debug( "Intentionally failing job with message (%s)" % failure_message )
+ self.job_wrappers[job.id].fail( failure_message )
+ return JOB_ERROR
+ # job is ready to run, check limits
+ state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
if state == JOB_READY and self.app.config.enable_quotas:
quota = self.app.quota_agent.get_quota( job.user )
if quota is not None:
@@ -264,48 +304,114 @@
return state
def __clear_user_job_count( self ):
- self.user_job_count = {}
- self.user_job_count_per_runner = {}
+ self.user_job_count = None
+ self.user_job_count_per_destination = None
- def __check_user_jobs( self, job ):
+ def get_user_job_count(self, user_id):
+ """Return the number of jobs currently counted against a user.
+
+ When ``config.cache_user_job_count`` is set, the value comes from the
+ in-memory ``self.user_job_count`` alone. Otherwise the in-memory count
+ is added to a fresh database count of the user's QUEUED and RUNNING jobs.
+
+ :param user_id: ID of the user whose jobs are counted
+
+ :returns: int
+ """
+ self.__cache_user_job_count()
+ # This could have been incremented by a previous job dispatched on this iteration, even if we're not caching
+ rval = self.user_job_count.get(user_id, 0)
+ if not self.app.config.cache_user_job_count:
+ result = self.sa_session.execute(select([func.count(model.Job.table.c.id)]).where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id == user_id))))
+ for row in result:
+ # there should only be one row
+ rval += row[0]
+ return rval
+
+ def __cache_user_job_count( self ):
+     """Make sure ``self.user_job_count`` is a dict, filling it from the database if caching is enabled.
+
+     Does nothing when the count has already been initialized. Otherwise it
+     is reset to an empty dict and, if ``config.cache_user_job_count`` is
+     set, populated with the number of QUEUED and RUNNING jobs per user id.
+     """
+     if self.user_job_count is not None:
+         return
+     self.user_job_count = {}
+     if self.app.config.cache_user_job_count:
+         # ``!= None`` is deliberate: it is compiled to IS NOT NULL, whereas ``is not None``
+         # is evaluated by Python and never reaches the query as a filter on user_id.
+         query = self.sa_session.execute(select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)]) \
+             .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id != None))) \
+             .group_by(model.Job.table.c.user_id))
+         for row in query:
+             self.user_job_count[row[0]] = row[1]
+
+ def get_user_job_count_per_destination(self, user_id):
+ """Return a user's job counts broken down by destination id.
+
+ When ``config.cache_user_job_count`` is set, the user's entry in the
+ in-memory ``self.user_job_count_per_destination`` is returned as-is
+ (or an empty dict if the user has no entry). Otherwise a new dict is
+ built from that entry plus a fresh database count of the user's QUEUED
+ and RUNNING jobs grouped by ``destination_id``.
+
+ :param user_id: ID of the user whose jobs are counted
+
+ :returns: dict mapping destination id to job count
+ """
+ self.__cache_user_job_count_per_destination()
+ cached = self.user_job_count_per_destination.get(user_id, {})
+ if self.app.config.cache_user_job_count:
+ rval = cached
+ else:
+ # The cached count is still used even when we're not caching, it is
+ # incremented when a job is run by this handler to ensure that
+ # multiple jobs can't get past the limits in one iteration of the
+ # queue.
+ rval = {}
+ rval.update(cached)
+ result = self.sa_session.execute(select([model.Job.table.c.destination_id, func.count(model.Job.table.c.destination_id).label('job_count')]) \
+ .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id == user_id))) \
+ .group_by(model.Job.table.c.destination_id))
+ for row in result:
+ # Add the count from the database to the cached count
+ rval[row['destination_id']] = rval.get(row['destination_id'], 0) + row['job_count']
+ return rval
+
+ def __cache_user_job_count_per_destination(self):
+ """Make sure ``self.user_job_count_per_destination`` is a dict, filling it from the database if caching is enabled.
+
+ When populated, the structure is ``{ user_id: { destination_id: job_count } }``
+ and counts jobs in the QUEUED and RUNNING states. Nothing is done if
+ the attribute has already been initialized.
+ """
+ # Cache the job count if necessary
+ if self.user_job_count_per_destination is None and self.app.config.cache_user_job_count:
+ self.user_job_count_per_destination = {}
+ result = self.sa_session.execute(select([model.Job.table.c.user_id, model.Job.table.c.destination_id, func.count(model.Job.table.c.user_id).label('job_count')]) \
+ .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)))) \
+ .group_by(model.Job.table.c.user_id, model.Job.table.c.destination_id))
+ for row in result:
+ if row['user_id'] not in self.user_job_count_per_destination:
+ self.user_job_count_per_destination[row['user_id']] = {}
+ self.user_job_count_per_destination[row['user_id']][row['destination_id']] = row['job_count']
+ elif self.user_job_count_per_destination is None:
+ # Not caching: start from an empty dict so callers can still read and increment it
+ self.user_job_count_per_destination = {}
+
+ def increase_running_job_count(self, user_id, destination_id):
+ """Add one job to the in-memory counts for a user and for that user's destination.
+
+ Both ``self.user_job_count`` and ``self.user_job_count_per_destination``
+ are created as empty dicts first if they are still None.
+
+ :param user_id: ID of the user the job belongs to
+ :param destination_id: ID of the destination the job was assigned to
+ """
+ if self.user_job_count is None:
+ self.user_job_count = {}
+ if self.user_job_count_per_destination is None:
+ self.user_job_count_per_destination = {}
+ self.user_job_count[user_id] = self.user_job_count.get(user_id, 0) + 1
+ if user_id not in self.user_job_count_per_destination:
+ self.user_job_count_per_destination[user_id] = {}
+ self.user_job_count_per_destination[user_id][destination_id] = self.user_job_count_per_destination[user_id].get(destination_id, 0) + 1
+
+ def __check_user_jobs( self, job, job_wrapper ):
if job.user:
# Check the hard limit first
- if self.app.config.registered_user_job_limit:
- # Cache the job count if necessary
- if not self.user_job_count:
- query = self.sa_session.execute(select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)]) \
- .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id is not None))) \
- .group_by(model.Job.table.c.user_id))
- for row in query:
- self.user_job_count[row[0]] = row[1]
- if self.user_job_count.get(job.user_id, 0) >= self.app.config.registered_user_job_limit:
+ if self.app.job_config.limits.registered_user_concurrent_jobs:
+ count = self.get_user_job_count(job.user_id)
+ # Check the user's number of dispatched jobs against the overall limit
+ if count >= self.app.job_config.limits.registered_user_concurrent_jobs:
return JOB_WAIT
- # If we pass the hard limit, also check the per-runner count
- if job.job_runner_name in self.app.config.job_limits:
- # Cache the job count if necessary
- if job.job_runner_name not in self.user_job_count_per_runner:
- self.user_job_count_per_runner[job.job_runner_name] = {}
- query_url, limit = self.app.config.job_limits[job.job_runner_name]
- base_query = select([model.Job.table.c.user_id, model.Job.table.c.job_runner_name, func.count(model.Job.table.c.user_id).label('job_count')]) \
- .where(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING))) \
- .group_by(model.Job.table.c.user_id, model.Job.table.c.job_runner_name)
- if '%' in query_url or '_' in query_url:
- subq = base_query.having(model.Job.table.c.job_runner_name.like(query_url)).alias('subq')
- query = self.sa_session.execute(select([subq.c.user_id, func.sum(subq.c.job_count).label('job_count')]).group_by(subq.c.user_id))
- else:
- query = self.sa_session.execute(base_query.having(model.Job.table.c.job_runner_name == query_url))
- for row in query:
- self.user_job_count_per_runner[job.job_runner_name][row['user_id']] = row['job_count']
- if self.user_job_count_per_runner[job.job_runner_name].get(job.user_id, 0) >= self.app.config.job_limits[job.job_runner_name][1]:
+ # If we pass the hard limit, also check the per-destination count
+ id = job_wrapper.job_destination.id
+ count_per_id = self.get_user_job_count_per_destination(job.user_id)
+ if id in self.app.job_config.limits.concurrent_jobs:
+ count = count_per_id.get(id, 0)
+ # Check the user's number of dispatched jobs in the assigned destination id against the limit for that id
+ if count >= self.app.job_config.limits.concurrent_jobs[id]:
return JOB_WAIT
+ # If we pass the destination limit (if there is one), also check limits on any tags (if any)
+ if job_wrapper.job_destination.tags:
+ for tag in job_wrapper.job_destination.tags:
+ # Check each tag for this job's destination
+ if tag in self.app.job_config.limits.concurrent_jobs:
+ # Only if there's a limit defined for this tag
+ count = 0
+ for id in [ d.id for d in self.app.job_config.get_destinations(tag) ]:
+ # Add up the aggregate job total for this tag
+ count += count_per_id.get(id, 0)
+ if count >= self.app.job_config.limits.concurrent_jobs[tag]:
+ return JOB_WAIT
+ # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
+ self.increase_running_job_count(job.user_id, id)
elif job.galaxy_session:
# Anonymous users only get the hard limit
- if self.app.config.anonymous_user_job_limit:
+ if self.app.job_config.limits.anonymous_user_concurrent_jobs:
count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
.filter( and_( model.Job.session_id == job.galaxy_session.id,
or_( model.Job.state == model.Job.states.RUNNING,
model.Job.state == model.Job.states.QUEUED ) ) ).count()
- if count >= self.app.config.anonymous_user_job_limit:
+ if count >= self.app.job_config.limits.anonymous_user_concurrent_jobs:
return JOB_WAIT
else:
log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' % job.id )
@@ -431,58 +537,41 @@
class DefaultJobDispatcher( object ):
def __init__( self, app ):
self.app = app
- self.job_runners = {}
- start_job_runners = ["local", "lwr"]
- if app.config.start_job_runners is not None:
- start_job_runners.extend( [ x.strip() for x in util.listify( app.config.start_job_runners ) ] )
- if app.config.use_tasked_jobs:
- start_job_runners.append("tasks")
- for name in start_job_runners:
- self._load_plugin( name )
- log.debug( "Job runners: " + ':'.join( start_job_runners ) )
-
- def _load_plugin( self, name ):
- module_name = 'galaxy.jobs.runners.' + name
- try:
- module = __import__( module_name )
- except:
- log.exception( 'Job runner is not loadable: %s' % module_name )
- return
- for comp in module_name.split( "." )[1:]:
- module = getattr( module, comp )
- if '__all__' not in dir( module ):
- log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name )
- return
- for obj in module.__all__:
- display_name = ':'.join( ( module_name, obj ) )
- runner = getattr( module, obj )
- self.job_runners[name] = runner( self.app )
- log.debug( 'Loaded job runner: %s' % display_name )
+ self.job_runners = self.app.job_config.get_job_runner_plugins()
+ # Once plugins are loaded, all job destinations that were created from
+ # URLs can have their URL params converted to the destination's param
+ # dict by the plugin.
+ self.app.job_config.convert_legacy_destinations(self.job_runners)
+ log.debug( "Loaded job runners plugins: " + ':'.join( self.job_runners.keys() ) )
def __get_runner_name( self, job_wrapper ):
if job_wrapper.can_split():
runner_name = "tasks"
else:
- runner_name = ( job_wrapper.get_job_runner_url().split(":", 1) )[0]
+ runner_name = job_wrapper.job_destination.runner
return runner_name
+ def url_to_destination( self, url ):
+ """This is used by the runner mapper (a.k.a. dynamic runner) and
+ recovery methods to have runners convert URLs to destinations.
+
+ New-style runner plugin IDs must match the URL's scheme for this to work.
+
+ :param url: Legacy job runner URL; the text before the first ``:`` is used as the runner name
+
+ :returns: The destination produced by that runner's ``url_to_destination``, or a ``JobDestination`` naming only the runner if the conversion raised
+ """
+ runner_name = url.split(':', 1)[0]
+ try:
+ return self.job_runners[runner_name].url_to_destination(url)
+ except Exception, e:
+ # Any failure (including no loaded runner under that name) falls back to a destination with no params
+ log.error("Unable to convert legacy job runner URL to job destination, destination will be the '%s' runner with no params: %s" % (runner_name, e))
+ return JobDestination(runner=runner_name)
+
def put( self, job_wrapper ):
- try:
- runner_name = self.__get_runner_name( job_wrapper )
- except Exception, e:
- failure_message = getattr(e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
- if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
- log.exception( 'Failed to generate job runner name' )
- else:
- log.debug( "Intentionally failing job with message (%s)" % failure_message )
- job_wrapper.fail( failure_message )
- return
+ runner_name = self.__get_runner_name( job_wrapper )
try:
if isinstance(job_wrapper, TaskWrapper):
#DBTODO Refactor
- log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) )
+ log.debug( "(%s) Dispatching task %s to %s runner" %( job_wrapper.job_id, job_wrapper.task_id, runner_name ) )
else:
- log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
+ log.debug( "(%s) Dispatching to %s runner" %( job_wrapper.job_id, runner_name ) )
self.job_runners[runner_name].put( job_wrapper )
except KeyError:
log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/manager.py
--- a/lib/galaxy/jobs/manager.py
+++ b/lib/galaxy/jobs/manager.py
@@ -26,257 +26,18 @@
"""
def __init__( self, app ):
self.app = app
- self.job_handler = NoopHandler()
- if self.app.config.server_name in self.app.config.job_handlers:
+ if (self.app.config.track_jobs_in_database and self.app.job_config.is_handler(self.app.config.server_name)) or not self.app.config.track_jobs_in_database:
+ log.debug("Starting job handler")
self.job_handler = handler.JobHandler( app )
- if self.app.config.server_name == self.app.config.job_manager:
- job_handler = NoopHandler()
- # In the case that webapp == manager == handler, pass jobs in memory
- if not self.app.config.track_jobs_in_database:
- job_handler = self.job_handler
- # Otherwise, even if the manager == one of the handlers, its handler will pick up jobs from the database
- self.job_queue = JobManagerQueue( app, job_handler )
- self.job_stop_queue = JobManagerStopQueue( app, job_handler )
- if self.app.config.enable_beta_job_managers:
- from galaxy.jobs.deferred import DeferredJobQueue
- self.deferred_job_queue = DeferredJobQueue( app )
+ self.job_queue = self.job_handler.job_queue
+ self.job_stop_queue = self.job_handler.job_stop_queue
else:
+ self.job_handler = NoopHandler()
self.job_queue = self.job_stop_queue = NoopQueue()
self.job_handler.start()
def shutdown( self ):
- self.job_queue.shutdown()
- self.job_stop_queue.shutdown()
self.job_handler.shutdown()
-class JobManagerQueue( object ):
- """
- Job manager, waits for jobs to be runnable and then dispatches to a
- JobHandler.
- """
- STOP_SIGNAL = object()
- def __init__( self, app, job_handler ):
- self.app = app
- self.job_handler = job_handler # the (singular) handler if we are passing jobs in memory
-
- self.sa_session = app.model.context
- self.job_lock = False
- # Keep track of the pid that started the job manager, only it
- # has valid threads
- self.parent_pid = os.getpid()
- # Contains new jobs. Note this is not used if track_jobs_in_database is True
- self.queue = Queue()
- # Helper for interruptable sleep
- self.sleeper = Sleeper()
- self.running = True
- self.monitor_thread = threading.Thread( name="JobManagerQueue.monitor_thread", target=self.__monitor )
- self.monitor_thread.setDaemon( True )
- # Recover jobs at startup
- self.__check_jobs_at_startup()
- # Start the queue
- self.monitor_thread.start()
- log.info( "job manager queue started" )
-
- def __check_jobs_at_startup( self ):
- """
- Checks all jobs that are in the 'new', 'queued' or 'running' state in
- the database and requeues or cleans up as necessary. Only run as the
- job manager starts.
- """
- for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( ( ( model.Job.state == model.Job.states.NEW ) \
- | ( model.Job.state == model.Job.states.RUNNING ) \
- | ( model.Job.state == model.Job.states.QUEUED ) ) \
- & ( model.Job.handler == None ) ):
- if job.tool_id not in self.app.toolbox.tools_by_id:
- log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) )
- JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )
- else:
- job.handler = self.__get_handler( job ) # handler's recovery method will take it from here
- log.info( "(%d) Job in '%s' state had no handler at job manager startup, assigned '%s' handler" % ( job.id, job.state, job.handler ) )
- if self.sa_session.dirty:
- self.sa_session.flush()
-
- def __monitor( self ):
- """
- Continually iterate the waiting jobs and dispatch to a handler
- """
- # HACK: Delay until after forking, we need a way to do post fork notification!!!
- time.sleep( 10 )
- while self.running:
- try:
- self.__monitor_step()
- except:
- log.exception( "Exception in monitor_step" )
- # Sleep
- self.sleeper.sleep( 1 )
-
- def __monitor_step( self ):
- """
- Called repeatedly by `monitor` to process waiting jobs. Gets any new
- jobs (either from the database or from its own queue), then assigns a
- handler.
- """
- # Do nothing if the queue is locked
- if self.job_lock:
- log.info( 'Job queue is administratively locked, sleeping...' )
- time.sleep( 10 )
- return
- # Pull all new jobs from the queue at once
- jobs_to_check = []
- if self.app.config.track_jobs_in_database:
- # Clear the session so we get fresh states for job and all datasets
- self.sa_session.expunge_all()
- # Fetch all new jobs
- jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( ( model.Job.state == model.Job.states.NEW ) \
- & ( model.Job.handler == None ) ).all()
- else:
- # Get job objects and append to watch queue for any which were
- # previously waiting
- try:
- while 1:
- message = self.queue.get_nowait()
- if message is self.STOP_SIGNAL:
- return
- # Unpack the message
- job_id, tool_id = message
- # Get the job object and append to watch queue
- jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
- except Empty:
- pass
-
- for job in jobs_to_check:
- job.handler = self.__get_handler( job )
- job.job_runner_name = self.__get_runner_url( job )
- log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) )
- self.sa_session.add( job )
-
- # If tracking in the database, handlers will pick up the job now
- self.sa_session.flush()
-
- time.sleep( 5 )
-
- # This only does something in the case that there is only one handler and it is this Galaxy process
- for job in jobs_to_check:
- self.job_handler.job_queue.put( job.id, job.tool_id )
-
- def __get_handler( self, job ):
- try:
- params = None
- if job.params:
- params = from_json_string( job.params )
- return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_handler( params )
- except:
- log.exception( "(%s) Caught exception attempting to get tool-specific job handler for tool '%s', selecting at random from available handlers instead:" % ( job.id, job.tool_id ) )
- return random.choice( self.app.config.job_handlers )
-
- def __get_runner_url( self, job ):
- """This fetches the raw runner URL, and does not perform any computation e.g. for the dynamic runner"""
- try:
- return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_runner_url( job.params )
- except Exception, e:
- log.warning( 'Unable to determine job runner URL for job %s: %s' % (job.id, str(e)) )
- return None
-
- def put( self, job_id, tool ):
- """Add a job to the queue (by job identifier)"""
- if not self.app.config.track_jobs_in_database:
- self.queue.put( ( job_id, tool.id ) )
- self.sleeper.wake()
-
- def shutdown( self ):
- """Attempts to gracefully shut down the worker thread"""
- if self.parent_pid != os.getpid():
- # We're not the real job queue, do nothing
- return
- else:
- log.info( "sending stop signal to worker thread" )
- self.running = False
- if not self.app.config.track_jobs_in_database:
- self.queue.put( self.STOP_SIGNAL )
- self.sleeper.wake()
- log.info( "job manager queue stopped" )
-
-class JobManagerStopQueue( object ):
- """
- A queue for jobs which need to be terminated prematurely.
- """
- STOP_SIGNAL = object()
- def __init__( self, app, job_handler ):
- self.app = app
- self.job_handler = job_handler
-
- self.sa_session = app.model.context
-
- # Keep track of the pid that started the job manager, only it
- # has valid threads
- self.parent_pid = os.getpid()
- # Contains new jobs. Note this is not used if track_jobs_in_database is True
- self.queue = Queue()
-
- # Contains jobs that are waiting (only use from monitor thread)
- self.waiting = []
-
- # Helper for interruptable sleep
- self.sleeper = Sleeper()
- self.running = True
- self.monitor_thread = threading.Thread( name="JobManagerStopQueue.monitor_thread", target=self.monitor )
- self.monitor_thread.setDaemon( True )
- self.monitor_thread.start()
- log.info( "job manager stop queue started" )
-
- def monitor( self ):
- """
- Continually iterate the waiting jobs, stop any that are found.
- """
- # HACK: Delay until after forking, we need a way to do post fork notification!!!
- time.sleep( 10 )
- while self.running:
- try:
- self.monitor_step()
- except:
- log.exception( "Exception in monitor_step" )
- # Sleep
- self.sleeper.sleep( 1 )
-
- def monitor_step( self ):
- """
- Called repeatedly by `monitor` to stop jobs.
- """
- jobs_to_check = []
- # Pull from the queue even if tracking in the database (in the case of Administrative stopped jobs)
- try:
- while 1:
- message = self.queue.get_nowait()
- if message is self.STOP_SIGNAL:
- return
- # Unpack the message
- job_id, error_msg = message
- # Get the job object and append to watch queue
- jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) )
- except Empty:
- pass
-
- # If tracking in the database, the handler will pick up the stop itself. Otherwise, notify the handler.
- for job, error_msg in jobs_to_check:
- self.job_handler.job_stop_queue.put( job.id, error_msg )
-
- def put( self, job_id, error_msg=None ):
- self.queue.put( ( job_id, error_msg ) )
-
- def shutdown( self ):
- """Attempts to gracefully shut down the worker thread"""
- if self.parent_pid != os.getpid():
- # We're not the real job queue, do nothing
- return
- else:
- log.info( "sending stop signal to worker thread" )
- self.running = False
- if not self.app.config.track_jobs_in_database:
- self.queue.put( self.STOP_SIGNAL )
- self.sleeper.wake()
- log.info( "job manager stop queue stopped" )
-
class NoopHandler( object ):
def __init__( self, *args, **kwargs ):
self.job_queue = NoopQueue()
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -6,7 +6,8 @@
import galaxy.jobs.rules
-DYNAMIC_RUNNER_PREFIX = "dynamic:///"
+DYNAMIC_RUNNER_NAME = "dynamic"
+DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url"
class JobMappingException( Exception ):
@@ -20,9 +21,9 @@
(in the form of job_wrappers) to job runner url strings.
"""
- def __init__( self, job_wrapper, job_runner_name=None ):
+ def __init__( self, job_wrapper, url_to_destination ):
self.job_wrapper = job_wrapper
- self.job_runner_name = job_runner_name
+ self.url_to_destination = url_to_destination
self.rule_modules = self.__get_rule_modules( )
def __get_rule_modules( self ):
@@ -87,11 +88,23 @@
return expand_function( **actual_args )
- def __determine_expand_function_name( self, option_parts ):
+ def __convert_url_to_destination( self, url ):
+ """
+ Job runner URLs are deprecated, but dynamic mapper functions may still
+ be returning them. Runners are expected to be able to convert these to
+ destinations.
+
+ This method calls
+ JobHandlerQueue.DefaultJobDispatcher.url_to_destination, which in turn
+ calls the url_to_destination method for the appropriate runner.
+ """
+ dest = self.url_to_destination( url )
+ dest['id'] = DYNAMIC_DESTINATION_ID
+ return dest
+
+ def __determine_expand_function_name( self, destination ):
# default look for function with same name as tool, unless one specified
- expand_function_name = self.job_wrapper.tool.id
- if len( option_parts ) > 1:
- expand_function_name = option_parts[ 1 ]
+ expand_function_name = destination.params.get('function', self.job_wrapper.tool.id)
return expand_function_name
def __get_expand_function( self, expand_function_name ):
@@ -110,32 +123,57 @@
return rule_module
return None
- def __expand_dynamic_job_runner_url( self, options_str ):
- option_parts = options_str.split( '/' )
- expand_type = option_parts[ 0 ]
+ def __handle_dynamic_job_destination( self, destination ):
+ expand_type = destination.params.get('type', None)
if expand_type == "python":
- expand_function_name = self.__determine_expand_function_name( option_parts )
+ expand_function_name = self.__determine_expand_function_name( destination )
expand_function = self.__get_expand_function( expand_function_name )
- return self.__invoke_expand_function( expand_function )
+ rval = self.__invoke_expand_function( expand_function )
+ # TODO: test me extensively
+ if isinstance(rval, basestring):
+ # If the function returned a string, check if it's a URL, convert if necessary
+ if '://' in rval:
+ return self.__convert_url_to_destination(rval)
+ else:
+ return self.app.job_config.get_destination(rval)
+ elif isinstance(rval, galaxy.jobs.JobDestination):
+ # If the function generated a JobDestination, we'll use that
+ # destination directly. However, for advanced job limiting, a
+ # function may want to set the JobDestination's 'tags'
+ # attribute so that limiting can be done on a destination tag.
+ #id_or_tag = rval.get('id')
+ #if rval.get('tags', None):
+ # # functions that are generating destinations should only define one tag
+ # id_or_tag = rval.get('tags')[0]
+ #return id_or_tag, rval
+ return rval
+ else:
+ raise Exception( 'Dynamic function returned a value that could not be understood: %s' % rval )
+ elif expand_type is None:
+ raise Exception( 'Dynamic function type not specified (hint: add <param id="type">python</param> to your <destination>)' )
else:
raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type )
- def __cache_job_runner_url( self, params ):
- # If there's already a runner set in the Job object, don't overwrite from the tool
- if self.job_runner_name is not None and not self.job_runner_name.startswith('tasks'):
- raw_job_runner_url = self.job_runner_name
+ def __cache_job_destination( self, params ):
+ raw_job_destination = self.job_wrapper.tool.get_job_destination( params )
+ #raw_job_destination_id_or_tag = self.job_wrapper.tool.get_job_destination_id_or_tag( params )
+ if raw_job_destination.runner == DYNAMIC_RUNNER_NAME:
+ job_destination = self.__handle_dynamic_job_destination( raw_job_destination )
else:
- raw_job_runner_url = self.job_wrapper.tool.get_job_runner_url( params )
- if raw_job_runner_url.startswith( DYNAMIC_RUNNER_PREFIX ):
- job_runner_url = self.__expand_dynamic_job_runner_url( raw_job_runner_url[ len( DYNAMIC_RUNNER_PREFIX ) : ] )
- else:
- job_runner_url = raw_job_runner_url
- self.cached_job_runner_url = job_runner_url
+ job_destination = raw_job_destination
+ #job_destination_id_or_tag = raw_job_destination_id_or_tag
+ self.cached_job_destination = job_destination
+ #self.cached_job_destination_id_or_tag = job_destination_id_or_tag
- def get_job_runner_url( self, params ):
+ def get_job_destination( self, params ):
"""
- Cache the job_runner_url string to avoid recalculation.
+ Cache the job_destination to avoid recalculation.
"""
- if not hasattr( self, 'cached_job_runner_url' ):
- self.__cache_job_runner_url( params )
- return self.cached_job_runner_url
+ if not hasattr( self, 'cached_job_destination' ):
+ self.__cache_job_destination( params )
+ return self.cached_job_destination
+
+ #def get_job_destination_id_or_tag( self, params ):
+ # if not hasattr( self, 'cached_job_destination_id_or_tag' ):
+ # self.__cache_job_destination( params )
+ # return self.cached_job_destination_id_or_tag
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -1,13 +1,129 @@
-import os, logging, os.path
+"""
+Base classes for job runner plugins.
+"""
+import os
+import time
+import logging
+import threading
+
+from Queue import Queue, Empty
+
+import galaxy.jobs
from galaxy import model
-from Queue import Queue, Empty
-import time
-import threading
log = logging.getLogger( __name__ )
+STOP_SIGNAL = object()
+
class BaseJobRunner( object ):
+ def __init__( self, app, nworkers ):
+ """Start the job runner
+ """
+ self.app = app
+ self.sa_session = app.model.context
+ self.nworkers = nworkers
+
+ def _init_worker_threads(self):
+ """Start ``nworkers`` worker threads.
+ """
+ self.work_queue = Queue()
+ self.work_threads = []
+ log.debug('Starting %s %s workers' % (self.nworkers, self.runner_name))
+ for i in range(self.nworkers):
+ worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next )
+ worker.setDaemon( True )
+ worker.start()
+ self.work_threads.append( worker )
+
+ def run_next(self):
+ """Run the next item in the work queue (a job waiting to run)
+ """
+ while 1:
+ ( method, arg ) = self.work_queue.get()
+ if method is STOP_SIGNAL:
+ return
+ # id and name are collected first so that the call of method() is the last exception.
+ try:
+ # arg should be a JobWrapper/TaskWrapper
+ job_id = arg.get_id_tag()
+ except:
+ job_id = 'unknown'
+ try:
+ name = method.__name__
+ except:
+ name = 'unknown'
+ try:
+ method(arg)
+ except:
+ log.exception( "(%s) Unhandled exception calling %s" % ( job_id, name ) )
+
+ # Causes a runner's `queue_job` method to be called from a worker thread
+ def put(self, job_wrapper):
+ """Add a job to the queue (by job identifier), indicate that the job is ready to run.
+ """
+ # Change to queued state before handing to worker thread so the runner won't pick it up again
+ job_wrapper.change_state( model.Job.states.QUEUED )
+ # Persist the destination so that the job will be included in counts if using concurrency limits
+ job_wrapper.set_job_destination( job_wrapper.job_destination, None )
+ self.mark_as_queued(job_wrapper)
+
+ def mark_as_queued(self, job_wrapper):
+ self.work_queue.put( ( self.queue_job, job_wrapper ) )
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker threads
+ """
+ log.info( "%s: Sending stop signal to %s worker threads" % ( self.runner_name, len( self.work_threads ) ) )
+ for i in range( len( self.work_threads ) ):
+ self.work_queue.put( ( STOP_SIGNAL, None ) )
+
+ # Most runners should override the legacy URL handler methods and destination param method
+ def url_to_destination(self, url):
+ """
+ Convert a legacy URL to a JobDestination.
+
+ Job runner URLs are deprecated, JobDestinations should be used instead.
+ This base class method converts from a URL to a very basic
+ JobDestination without destination params.
+ """
+ return galaxy.jobs.JobDestination(runner=url.split(':')[0])
+
+ def parse_destination_params(self, params):
+ """Parse the JobDestination ``params`` dict and return the runner's native representation of those params.
+ """
+ raise NotImplementedError()
+
+ # Runners must override the job handling methods
+ def queue_job(self, job_wrapper):
+ """Some sanity checks that all runners' queue_job() methods are likely to want to do
+ """
+ job_id = job_wrapper.get_id_tag()
+ job_state = job_wrapper.get_state()
+ job_wrapper.is_ready = False
+
+ # Make sure the job hasn't been deleted
+ if job_state != model.Job.states.QUEUED:
+ log.info( "(%d) Job is in state %s, skipping execution" % ( job_id, job_state ) )
+ return
+
+ # Prepare the job
+ try:
+ job_wrapper.prepare()
+ job_wrapper.runner_command_line = self.build_command_line( job_wrapper )
+ except:
+ log.exception("(%d) Failure preparing job" % job_id)
+ job_wrapper.fail( "failure preparing job", exception=True )
+ return
+
+ job_wrapper.is_ready = True
+
+ def stop_job(self, job):
+ raise NotImplementedError()
+
+ def recover(self, job, job_wrapper):
+ raise NotImplementedError()
+
def build_command_line( self, job_wrapper, include_metadata=False, include_work_dir_outputs=True ):
"""
Compose the sequence of commands necessary to execute a job. This will
@@ -104,12 +220,11 @@
log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) )
return output_pairs
-
-class ClusterJobState( object ):
+class AsynchronousJobState( object ):
"""
- Encapsulate the state of a cluster job, this should be subclassed as
+ Encapsulate the state of an asynchronous job, this should be subclassed as
needed for various job runners to capture additional information needed
- to communicate with cluster job manager.
+ to communicate with distributed resource manager.
"""
def __init__( self ):
@@ -117,23 +232,22 @@
self.job_id = None
self.old_state = None
self.running = False
- self.runner_url = None
+ self.job_file = None
+ self.output_file = None
+ self.error_file = None
+ self.exit_code_file = None
+ self.check_count = 0
+ self.job_destination = None
-STOP_SIGNAL = object()
-
-JOB_STATUS_QUEUED = 'queue'
-JOB_STATUS_FAILED = 'fail'
-JOB_STATUS_FINISHED = 'finish'
-
-class ClusterJobRunner( BaseJobRunner ):
- """
- Not sure this is the best name for this class, but there is common code
- shared between sge, pbs, drmaa, etc...
+class AsynchronousJobRunner( BaseJobRunner ):
+ """Parent class for any job runner that runs jobs asynchronously (e.g. via
+ a distributed resource manager). Provides general methods for having a
+ thread to monitor the state of asynchronous jobs and submitting those jobs
+ to the correct methods (queue, finish, cleanup) at appropriate times.
"""
- def __init__( self, app ):
- self.app = app
- self.sa_session = app.model.context
+ def __init__( self, app, nworkers ):
+ super( AsynchronousJobRunner, self ).__init__( app, nworkers )
# 'watched' and 'queue' are both used to keep track of jobs to watch.
# 'queue' is used to add new watched jobs, and can be called from
# any thread (usually by the 'queue_job' method). 'watched' must only
@@ -147,82 +261,44 @@
self.monitor_thread.setDaemon( True )
self.monitor_thread.start()
- def _init_worker_threads(self):
- self.work_queue = Queue()
- self.work_threads = []
- nworkers = self.app.config.cluster_job_queue_workers
- for i in range( nworkers ):
- worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next )
- worker.setDaemon( True )
- worker.start()
- self.work_threads.append( worker )
-
def handle_stop(self):
# DRMAA and SGE runners should override this and disconnect.
pass
def monitor( self ):
"""
- Watches jobs currently in the cluster queue and deals with state changes
- (queued to running) and job completion
+ Watches jobs currently in the monitor queue and deals with state
+ changes (queued to running) and job completion.
"""
while 1:
# Take any new watched jobs and put them on the monitor list
try:
while 1:
- cluster_job_state = self.monitor_queue.get_nowait()
- if cluster_job_state is STOP_SIGNAL:
+ async_job_state = self.monitor_queue.get_nowait()
+ if async_job_state is STOP_SIGNAL:
# TODO: This is where any cleanup would occur
self.handle_stop()
return
- self.watched.append( cluster_job_state )
+ self.watched.append( async_job_state )
except Empty:
pass
# Iterate over the list of watched jobs and check state
- self.check_watched_items()
+ try:
+ self.check_watched_items()
+ except Exception, e:
+ log.exception('Unhandled exception checking active jobs')
# Sleep a bit before the next state check
time.sleep( 1 )
- def run_next( self ):
- """
- Run the next item in the queue (a job waiting to run or finish )
- """
- while 1:
- ( op, obj ) = self.work_queue.get()
- if op is STOP_SIGNAL:
- return
- try:
- if op == JOB_STATUS_QUEUED:
- # If the next item is to be run, then only run it if the
- # job state is "queued". Otherwise the next item was either
- # cancelled or one of its siblings encountered an error.
- job_state = obj.get_state()
- if model.Job.states.QUEUED == job_state:
- self.queue_job( obj )
- else:
- log.debug( "Not executing job %d in state %s" % ( obj.get_id_tag(), job_state ) )
- elif op == JOB_STATUS_FINISHED:
- self.finish_job( obj )
- elif op == JOB_STATUS_FAILED:
- self.fail_job( obj )
- except:
- log.exception( "Uncaught exception %sing job" % op )
-
def monitor_job(self, job_state):
self.monitor_queue.put( job_state )
- def put( self, job_wrapper ):
- """Add a job to the queue (by job identifier)"""
- # Change to queued state before handing to worker thread so the runner won't pick it up again
- job_wrapper.change_state( model.Job.states.QUEUED )
- self.mark_as_queued(job_wrapper)
-
def shutdown( self ):
"""Attempts to gracefully shut down the monitor thread"""
- log.info( "sending stop signal to worker threads" )
+ log.info( "%s: Sending stop signal to monitor thread" % self.runner_name )
self.monitor_queue.put( STOP_SIGNAL )
- for i in range( len( self.work_threads ) ):
- self.work_queue.put( ( STOP_SIGNAL, None ) )
+ # Call the parent's shutdown method to stop workers
+ super( AsynchronousJobRunner, self ).shutdown()
def check_watched_items(self):
"""
@@ -233,19 +309,16 @@
reuse the logic here.
"""
new_watched = []
- for cluster_job_state in self.watched:
- new_cluster_job_state = self.check_watched_item(cluster_job_state)
- if new_cluster_job_state:
- new_watched.append(new_cluster_job_state)
+ for async_job_state in self.watched:
+ new_async_job_state = self.check_watched_item(async_job_state)
+ if new_async_job_state:
+ new_watched.append(new_async_job_state)
self.watched = new_watched
# Subclasses should implement this unless they override check_watched_items altogether.
def check_watched_item(self):
raise NotImplementedError()
- def queue_job(self, job_wrapper):
- raise NotImplementedError()
-
def finish_job(self, job_state):
raise NotImplementedError()
@@ -253,10 +326,7 @@
raise NotImplementedError()
def mark_as_finished(self, job_state):
- self.work_queue.put( ( JOB_STATUS_FINISHED, job_state ) )
+ self.work_queue.put( ( self.finish_job, job_state ) )
def mark_as_failed(self, job_state):
- self.work_queue.put( ( JOB_STATUS_FAILED, job_state ) )
-
- def mark_as_queued(self, job_wrapper):
- self.work_queue.put( ( JOB_STATUS_QUEUED, job_wrapper ) )
+ self.work_queue.put( ( self.fail_job, job_state ) )
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 37c0fd353dd778d2de8edf51c8d454dadee26caf lib/galaxy/jobs/runners/drmaa.py
--- a/lib/galaxy/jobs/runners/drmaa.py
+++ b/lib/galaxy/jobs/runners/drmaa.py
@@ -89,7 +89,7 @@
Job runner backed by a finite pool of worker threads. FIFO scheduling
"""
STOP_SIGNAL = object()
- def __init__( self, app ):
+ def __init__( self, app, nworkers ):
"""Initialize this job runner and start the monitor thread"""
# Check if drmaa was importable, fail if not
self.app = app
This diff is so big that we needed to truncate the remainder.
https://bitbucket.org/galaxy/galaxy-central/commits/f6570f99cc5c/
changeset: f6570f99cc5c
user: natefoo
date: 2013-02-11 18:19:44
summary: Fixes for job limiting and recovery with destinations.
affected #: 3 files
diff -r 37c0fd353dd778d2de8edf51c8d454dadee26caf -r f6570f99cc5c6cf7afd7736c8ec0ddd821af758f lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -180,6 +180,11 @@
self.tools[id].append(JobToolConfiguration(**dict(tool.items())))
self.tools[id][-1]['params'] = self.__get_params(tool)
+ types = dict(registered_user_concurrent_jobs = int,
+ anonymous_user_concurrent_jobs = int,
+ walltime = str,
+ output_size = int)
+
self.limits = Bunch(registered_user_concurrent_jobs = None,
anonymous_user_concurrent_jobs = None,
walltime = None,
@@ -196,7 +201,7 @@
id = limit.get('tag', None) or limit.get('id')
self.limits.concurrent_jobs[id] = int(limit.text)
elif limit.text:
- self.limits.__dict__[type] = limit.text
+ self.limits.__dict__[type] = types.get(type, str)(limit.text)
if self.limits.walltime is not None:
h, m, s = [ int( v ) for v in self.limits.walltime.split( ':' ) ]
diff -r 37c0fd353dd778d2de8edf51c8d454dadee26caf -r f6570f99cc5c6cf7afd7736c8ec0ddd821af758f lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -105,7 +105,7 @@
# This is the first start after upgrading from URLs to destinations, convert the URL to a destination and persist
# TODO: test me extensively
job_wrapper = JobWrapper( job, self )
- job_wrapper.set_job_destination(self.dispatcher.url_to_destination(self.job_runner_name))
+ job_wrapper.set_job_destination(self.dispatcher.url_to_destination(job.job_runner_name), job.job_runner_external_id)
self.dispatcher.recover( job, job_wrapper )
log.info('(%s) Converted job from a URL to a destination and recovered' % (job.id))
elif job.job_runner_name is None:
@@ -561,7 +561,7 @@
try:
return self.job_runners[runner_name].url_to_destination(url)
except Exception, e:
- log.error("Unable to convert legacy job runner URL to job destination, destination will be the '%s' runner with no params: %s" % (runner_name, e))
+ log.exception("Unable to convert legacy job runner URL to job destination, destination will be the '%s' runner with no params: %s" % (runner_name, e))
return JobDestination(runner=runner_name)
def put( self, job_wrapper ):
diff -r 37c0fd353dd778d2de8edf51c8d454dadee26caf -r f6570f99cc5c6cf7afd7736c8ec0ddd821af758f lib/galaxy/jobs/runners/pbs.py
--- a/lib/galaxy/jobs/runners/pbs.py
+++ b/lib/galaxy/jobs/runners/pbs.py
@@ -167,6 +167,8 @@
param, value = opt.split( None, 1 )
params[param] = value
+ log.debug("Converted URL '%s' to destination runner=pbs, params=%s" % (url, params))
+
# Create a dynamic JobDestination
return JobDestination(runner='pbs', params=params)
https://bitbucket.org/galaxy/galaxy-central/commits/f3ff8526fde6/
changeset: f3ff8526fde6
user: natefoo
date: 2013-02-11 18:27:37
summary: Merge in job URL->destination changes.
If you have running jobs and are using DRMAA, Condor, or the CLI runner plugins and you are not willing to lose those jobs, please wait for the commit converting those plugins to the new style.
affected #: 25 files
diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 job_conf.xml.sample_advanced
--- /dev/null
+++ b/job_conf.xml.sample_advanced
@@ -0,0 +1,65 @@
+<?xml version="1.0"?>
+<job_conf>
+ <plugins workers="4">
+ <!-- "workers" is the number of threads for the runner's work queue.
+ The default from <plugins> is used if not defined for a <plugin>.
+ -->
+ <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner"/>
+ <plugin id="pbs" type="runner" load="galaxy.jobs.runners.pbs:PBSJobRunner" workers="2"/>
+ <plugin id="gridengine" type="runner" load="galaxy.jobs.runners.drmaa:DRMAARunner"/>
+ </plugins>
+ <handlers default="handlers">
+ <!-- Additional job handlers - the id should match the name of a
+ [server:<id>] in universe_wsgi.ini.
+ -->
+ <handler id="handler0" tags="handlers"/>
+ <handler id="handler1" tags="handlers"/>
+ <handler id="special_handler0" tags="special_handlers"/>
+ <handler id="special_handler1" tags="special_handlers"/>
+ <handler id="trackster_handler"/>
+ </handlers>
+ <destinations default="local">
+ <!-- Destinations define details about remote resources and how jobs
+ should be executed on those remote resources.
+ -->
+ <destination id="local" runner="local"/>
+ <destination id="pbs" runner="pbs" tags="mycluster"/>
+ <destination id="pbs_longjobs" runner="pbs" tags="mycluster,longjobs">
+ <!-- Define parameters that are native to the job runner plugin. -->
+ <param id="Execution_Time">72:00:00</param>
+ </destination>
+ <destination id="remote_cluster" runner="drmaa" tags="longjobs"/>
+ <destination id="real_user_cluster" runner="drmaa">
+ <!-- TODO: The real user options should maybe not be considered runner params. -->
+ <param id="galaxy_external_runjob_script">scripts/drmaa_external_runner.py</param>
+ <param id="galaxy_external_killjob_script">scripts/drmaa_external_killer.py</param>
+ <param id="galaxy_external_chown_script">scripts/external_chown_script.py</param>
+ </destination>
+ <destination id="dynamic" runner="dynamic">
+ <!-- A destination that represents a method in the dynamic runner. -->
+ <param id="type">python</param>
+ <param id="function">foo</param>
+ </destination>
+ </destinations>
+ <tools>
+ <!-- Tools can be configured to use specific destinations or handlers,
+ identified by either the "id" or "tags" attribute. If assigned to
+ a tag, a handler or destination that matches that tag will be
+ chosen at random.
+ -->
+ <tool id="foo" handler="trackster_handler">
+ <param id="source">trackster</param>
+ </tool>
+ <tool id="bar" destination="dynamic"/>
+ <tool id="baz" handler="special_handlers" destination="bigmem"/>
+ </tools>
+ <limits>
+ <!-- Certain limits can be defined. -->
+ <limit type="registered_user_concurrent_jobs">2</limit>
+ <limit type="unregistered_user_concurrent_jobs">1</limit>
+ <limit type="job_walltime">24:00:00</limit>
+ <limit type="concurrent_jobs" id="local">1</limit>
+ <limit type="concurrent_jobs" tag="mycluster">2</limit>
+ <limit type="concurrent_jobs" tag="longjobs">1</limit>
+ </limits>
+</job_conf>
diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 job_conf.xml.sample_basic
--- /dev/null
+++ b/job_conf.xml.sample_basic
@@ -0,0 +1,13 @@
+<?xml version="1.0"?>
+<!-- A sample job config that explicitly configures job running the way it is configured by default (if there is no explicit config). -->
+<job_conf>
+ <plugins>
+ <plugin id="local" type="runner" load="galaxy.jobs.runners.local:LocalJobRunner" workers="4"/>
+ </plugins>
+ <handlers>
+ <handler id="main"/>
+ </handlers>
+ <destinations>
+ <destination id="local" runner="local"/>
+ </destinations>
+</job_conf>
diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/app.py
--- a/lib/galaxy/app.py
+++ b/lib/galaxy/app.py
@@ -86,6 +86,8 @@
self.tool_data_tables.load_from_config_file( config_filename=self.config.shed_tool_data_table_config,
tool_data_path=self.tool_data_tables.tool_data_path,
from_shed_config=True )
+ # Initialize the job management configuration
+ self.job_config = jobs.JobConfiguration(self)
# Initialize the tools, making sure the list of tool configs includes the reserved migrated_tools_conf.xml file.
tool_configs = self.config.tool_configs
if self.config.migrated_tools_config not in tool_configs:
diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/config.py
--- a/lib/galaxy/config.py
+++ b/lib/galaxy/config.py
@@ -91,6 +91,7 @@
self.collect_outputs_from = [ x.strip() for x in kwargs.get( 'collect_outputs_from', 'new_file_path,job_working_directory' ).lower().split(',') ]
self.template_path = resolve_path( kwargs.get( "template_path", "templates" ), self.root )
self.template_cache = resolve_path( kwargs.get( "template_cache_path", "database/compiled_templates" ), self.root )
+ self.job_config_file = resolve_path( kwargs.get( 'job_config_file', 'job_conf.xml' ), self.root )
self.local_job_queue_workers = int( kwargs.get( "local_job_queue_workers", "5" ) )
self.cluster_job_queue_workers = int( kwargs.get( "cluster_job_queue_workers", "3" ) )
self.job_queue_cleanup_interval = int( kwargs.get("job_queue_cleanup_interval", "5") )
@@ -111,8 +112,8 @@
self.smtp_server = kwargs.get( 'smtp_server', None )
self.smtp_username = kwargs.get( 'smtp_username', None )
self.smtp_password = kwargs.get( 'smtp_password', None )
- self.track_jobs_in_database = kwargs.get( 'track_jobs_in_database', None )
- self.start_job_runners = kwargs.get( 'start_job_runners', None )
+ self.track_jobs_in_database = kwargs.get( 'track_jobs_in_database', 'None' )
+ self.start_job_runners = listify(kwargs.get( 'start_job_runners', '' ))
self.expose_dataset_path = string_as_bool( kwargs.get( 'expose_dataset_path', 'False' ) )
# External Service types used in sample tracking
self.external_service_type_config_file = resolve_path( kwargs.get( 'external_service_type_config_file', 'external_service_types_conf.xml' ), self.root )
@@ -123,8 +124,8 @@
# The transfer manager and deferred job queue
self.enable_beta_job_managers = string_as_bool( kwargs.get( 'enable_beta_job_managers', 'False' ) )
# Per-user Job concurrency limitations
+ self.cache_user_job_count = string_as_bool( kwargs.get( 'cache_user_job_count', False ) )
self.user_job_limit = int( kwargs.get( 'user_job_limit', 0 ) )
- # user_job_limit for backwards-compatibility
self.registered_user_job_limit = int( kwargs.get( 'registered_user_job_limit', self.user_job_limit ) )
self.anonymous_user_job_limit = int( kwargs.get( 'anonymous_user_job_limit', self.user_job_limit ) )
self.default_cluster_job_runner = kwargs.get( 'default_cluster_job_runner', 'local:///' )
@@ -214,28 +215,20 @@
# Crummy, but PasteScript does not give you a way to determine this
if arg.lower().startswith('--server-name='):
self.server_name = arg.split('=', 1)[-1]
+ # Store all configured server names
+ self.server_names = []
+ for section in global_conf_parser.sections():
+ if section.startswith('server:'):
+ self.server_names.append(section.replace('server:', '', 1))
# Store advanced job management config
self.job_manager = kwargs.get('job_manager', self.server_name).strip()
self.job_handlers = [ x.strip() for x in kwargs.get('job_handlers', self.server_name).split(',') ]
self.default_job_handlers = [ x.strip() for x in kwargs.get('default_job_handlers', ','.join( self.job_handlers ) ).split(',') ]
- # parse the [galaxy:job_limits] section
- self.job_limits = {}
- try:
- job_limits = global_conf_parser.items( 'galaxy:job_limits' )
- for k, v in job_limits:
- # Since the URL contains a colon and possibly an equals sign, consider ' = ' the delimiter
- more_k, v = v.split(' = ', 1)
- k = '%s:%s' % (k, more_k.strip())
- v = v.strip().rsplit(None, 1)
- v[1] = int(v[1])
- self.job_limits[k] = v
- except ConfigParser.NoSectionError:
- pass
- # Use database for IPC unless this is a standalone server (or multiple servers doing self dispatching in memory)
- if self.track_jobs_in_database is None or self.track_jobs_in_database == "None":
- self.track_jobs_in_database = True
- if ( len( self.job_handlers ) == 1 ) and ( self.job_handlers[0] == self.server_name ) and ( self.job_manager == self.server_name ):
- self.track_jobs_in_database = False
+ # Use database for job running IPC unless this is a standalone server or explicitly set in the config
+ if self.track_jobs_in_database == 'None':
+ self.track_jobs_in_database = False
+ if len(self.server_names) > 1:
+ self.track_jobs_in_database = True
else:
self.track_jobs_in_database = string_as_bool( self.track_jobs_in_database )
# Store per-tool runner configs
diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/__init__.py
--- a/lib/galaxy/jobs/__init__.py
+++ b/lib/galaxy/jobs/__init__.py
@@ -6,13 +6,17 @@
import sys
import pwd
import time
+import copy
+import random
import logging
+import datetime
import threading
import traceback
import subprocess
import galaxy
from galaxy import util, model
+from galaxy.util.bunch import Bunch
from galaxy.datatypes.tabular import *
from galaxy.datatypes.interval import *
# tabular/interval imports appear to be unused. Clean up?
@@ -22,6 +26,7 @@
from galaxy.jobs.actions.post import ActionBox
from galaxy.exceptions import ObjectInvalid
from galaxy.jobs.mapper import JobRunnerMapper
+from galaxy.jobs.runners import BaseJobRunner
log = logging.getLogger( __name__ )
@@ -47,6 +52,491 @@
self.condition.notify()
self.condition.release()
class JobDestination(Bunch):
    """
    Provides details about where a job runs.

    Known fields are ``id``, ``url``, ``tags``, ``runner``, ``legacy``,
    ``converted`` and ``params``; each gets a default before the keyword
    arguments are handed to ``Bunch.__init__`` (presumably that stores them
    as attributes - confirm against ``galaxy.util.bunch``).
    """

    def __init__(self, **kwds):
        """Set defaults, apply ``kwds``, then split a comma-separated ``tags`` string into a list."""
        defaults = (
            ('id', None),
            ('url', None),
            ('tags', None),
            ('runner', None),
            ('legacy', False),
            ('converted', False),
        )
        for key, value in defaults:
            self[key] = value
        # A plain dict (rather than a Bunch) because param keys may not be valid attribute names
        self['params'] = {}
        super(JobDestination, self).__init__(**kwds)

        # Tags arrive as one comma-separated string; keep them as a list of stripped names
        raw_tags = self.tags
        if raw_tags is None:
            return
        self['tags'] = [tag.strip() for tag in raw_tags.split(',')]
+
class JobToolConfiguration(Bunch):
    """
    Provides details on what handler and destination a tool should use.

    A JobToolConfiguration will have the required attribute 'id' and optional
    attributes 'handler', 'destination', and 'params'.
    """

    def __init__(self, **kwds):
        """Default the optional fields, then let the keyword arguments override them."""
        for optional in ('handler', 'destination'):
            self[optional] = None
        # Fresh dict per instance so configurations never share params
        self['params'] = {}
        super(JobToolConfiguration, self).__init__(**kwds)
+
class JobConfiguration(object):
    """A parser and interface to advanced job management features.

    These features are configured in the job configuration, by default, ``job_conf.xml``.
    When that file cannot be read (``IOError``), the legacy options from the
    main Galaxy config file are used instead.

    In ``self.handlers`` and ``self.destinations`` a *tuple* value marks a real
    ID and a *list* value marks a tag (see :meth:`is_id` and :meth:`is_tag`).
    """
    DEFAULT_NWORKERS = 4

    def __init__(self, app):
        """Parse the job configuration XML.

        :param app: The application object; ``app.config`` supplies the config file paths and legacy options.
        """
        self.app = app
        self.runner_plugins = []
        self.handlers = {}
        self.default_handler_id = None
        self.destinations = {}
        self.destination_tags = {}
        self.default_destination_id = None
        self.tools = {}
        self.limits = Bunch()

        # Initialize the config
        try:
            tree = util.parse_xml(self.app.config.job_config_file)
            self.__parse_job_conf_xml(tree)
        except IOError:
            log.warning( 'Job configuration "%s" does not exist, using legacy job configuration from Galaxy config file "%s" instead' % ( self.app.config.job_config_file, self.app.config.config_file ) )
            self.__parse_job_conf_legacy()

    def __parse_job_conf_xml(self, tree):
        """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml).

        :param tree: Parsed job config file; ``tree.getroot()`` yields the root ``<job_conf>`` element.
        :type tree: ``xml.etree.ElementTree.ElementTree``
        """
        root = tree.getroot()
        log.debug('Loading job configuration from %s' % self.app.config.job_config_file)

        # Parse job plugins
        plugins = root.find('plugins')
        if plugins is not None:
            for plugin in self.__findall_with_required(plugins, 'plugin', ('id', 'type', 'load')):
                if plugin.get('type') == 'runner':
                    # Per-plugin worker count, else the <plugins> default, else the class default
                    workers = plugin.get('workers', plugins.get('workers', JobConfiguration.DEFAULT_NWORKERS))
                    self.runner_plugins.append(dict(id=plugin.get('id'), load=plugin.get('load'), workers=int(workers)))
                else:
                    log.error('Unknown plugin type: %s' % plugin.get('type'))
        # Load tasks if configured
        if self.app.config.use_tasked_jobs:
            self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers))

        # Parse handlers
        handlers = root.find('handlers')
        if handlers is not None:
            for handler in self.__findall_with_required(handlers, 'handler'):
                handler_id = handler.get('id')
                if handler_id in self.handlers:
                    log.error("Handler '%s' overlaps handler with the same name, ignoring" % handler_id)
                else:
                    log.debug("Read definition for handler '%s'" % handler_id)
                    self.handlers[handler_id] = (handler_id,)
                    if handler.get('tags', None) is not None:
                        for tag in [ x.strip() for x in handler.get('tags').split(',') ]:
                            existing = self.handlers.get(tag)
                            if existing is None:
                                self.handlers[tag] = [handler_id]
                            elif self.is_tag(existing):
                                existing.append(handler_id)
                            else:
                                # The name is already a real handler id (a tuple); appending would crash
                                log.error("Handler tag '%s' overlaps a handler id, ignoring tag for handler '%s'" % (tag, handler_id))

        # Determine the default handler(s)
        # NOTE(review): a config without a <handlers> element fails here, as it did before - confirm that is intended
        self.default_handler_id = self.__get_default(handlers, self.handlers.keys())

        # Parse destinations
        destinations = root.find('destinations')
        for destination in self.__findall_with_required(destinations, 'destination', ('id', 'runner')):
            destination_id = destination.get('id')
            job_destination = JobDestination(**dict(destination.items()))
            job_destination['params'] = self.__get_params(destination)
            self.destinations[destination_id] = (job_destination,)
            if job_destination.tags is not None:
                for tag in job_destination.tags:
                    existing = self.destinations.get(tag)
                    if existing is None:
                        self.destinations[tag] = [job_destination]
                    elif self.is_tag(existing):
                        existing.append(job_destination)
                    else:
                        # The name is already a real destination id (a tuple); appending would crash
                        log.error("Destination tag '%s' overlaps a destination id, ignoring tag for destination '%s'" % (tag, destination_id))

        # Determine the default destination
        self.default_destination_id = self.__get_default(destinations, self.destinations.keys())

        # Parse tool mappings
        tools = root.find('tools')
        if tools is not None:
            for tool in self.__findall_with_required(tools, 'tool'):
                # There can be multiple definitions with identical ids, but different params
                tool_id = tool.get('id')
                tool_conf = JobToolConfiguration(**dict(tool.items()))
                tool_conf['params'] = self.__get_params(tool)
                self.tools.setdefault(tool_id, []).append(tool_conf)

        # Converters applied to the text of each <limit>; unknown types are kept as str
        converters = dict(registered_user_concurrent_jobs = int,
                          anonymous_user_concurrent_jobs = int,
                          walltime = str,
                          output_size = int)

        self.limits = Bunch(registered_user_concurrent_jobs = None,
                            anonymous_user_concurrent_jobs = None,
                            walltime = None,
                            walltime_delta = None,
                            output_size = None,
                            concurrent_jobs = {})

        # Parse job limits
        limits = root.find('limits')
        if limits is not None:
            for limit in self.__findall_with_required(limits, 'limit', ('type',)):
                limit_type = limit.get('type')
                if limit_type == 'concurrent_jobs':
                    # Keyed by the destination tag if given, otherwise by the destination id
                    limit_key = limit.get('tag', None) or limit.get('id')
                    self.limits.concurrent_jobs[limit_key] = int(limit.text)
                elif limit.text:
                    self.limits.__dict__[limit_type] = converters.get(limit_type, str)(limit.text)

        if self.limits.walltime is not None:
            # walltime is written as H:M:S
            hours, minutes, seconds = [ int( v ) for v in self.limits.walltime.split( ':' ) ]
            self.limits.walltime_delta = datetime.timedelta( hours=hours, minutes=minutes, seconds=seconds )

        log.debug('Done loading job configuration')

    def __parse_job_conf_legacy(self):
        """Loads the old-style job configuration from options in the galaxy config file (by default, universe_wsgi.ini).
        """
        log.debug('Loading job configuration from %s' % self.app.config.config_file)

        # Always load local and lwr
        self.runner_plugins = [dict(id='local', load='local', workers=self.app.config.local_job_queue_workers), dict(id='lwr', load='lwr', workers=self.app.config.cluster_job_queue_workers)]
        # Load tasks if configured
        if self.app.config.use_tasked_jobs:
            self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers))
        for runner in self.app.config.start_job_runners:
            self.runner_plugins.append(dict(id=runner, load=runner, workers=self.app.config.cluster_job_queue_workers))

        # Set the handlers
        for handler_id in self.app.config.job_handlers:
            self.handlers[handler_id] = (handler_id,)

        self.handlers['default_job_handlers'] = self.app.config.default_job_handlers
        self.default_handler_id = 'default_job_handlers'

        # Set tool handler configs
        for tool_id, tool_handlers in self.app.config.tool_handlers.items():
            self.tools[tool_id] = list()
            for handler_config in tool_handlers:
                # Work on a copy so the application's own config structure is not altered
                handler_config = dict(handler_config)
                # rename the 'name' key to 'handler'
                handler_config['handler'] = handler_config.pop('name')
                self.tools[tool_id].append(JobToolConfiguration(**handler_config))

        # Set tool runner configs
        for tool_id, tool_runners in self.app.config.tool_runners.items():
            # Might have been created in the handler parsing above
            if tool_id not in self.tools:
                self.tools[tool_id] = list()
            for runner_config in tool_runners:
                url = runner_config['url']
                if url not in self.destinations:
                    # Create a new "legacy" JobDestination - it will have its URL converted to a destination params once the appropriate plugin has loaded
                    self.destinations[url] = (JobDestination(id=url, runner=url.split(':', 1)[0], url=url, legacy=True, converted=False),)
                for tool_conf in self.tools[tool_id]:
                    if tool_conf.params == runner_config.get('params', {}):
                        tool_conf['destination'] = url
                        break
                else:
                    # There was not an existing config (from the handlers section) with the same params
                    # Work on a copy so the application's own config structure is not altered
                    runner_config = dict(runner_config)
                    # rename the 'url' key to 'destination'
                    runner_config['destination'] = runner_config.pop('url')
                    self.tools[tool_id].append(JobToolConfiguration(**runner_config))

        default_url = self.app.config.default_cluster_job_runner
        self.destinations[default_url] = (JobDestination(id=default_url, runner=default_url.split(':', 1)[0], url=default_url, legacy=True, converted=False),)
        self.default_destination_id = default_url

        # Set the job limits
        self.limits = Bunch(registered_user_concurrent_jobs = self.app.config.registered_user_job_limit,
                            anonymous_user_concurrent_jobs = self.app.config.anonymous_user_job_limit,
                            walltime = self.app.config.job_walltime,
                            walltime_delta = self.app.config.job_walltime_delta,
                            output_size = self.app.config.output_size_limit,
                            concurrent_jobs = {})

        log.debug('Done loading job configuration')

    def __get_default(self, parent, names):
        """Returns the default attribute set in a parent tag like <handlers> or <destinations>, or return the ID of the child, if there is no explicit default and only one child.

        :param parent: Object representing a tag that may or may not have a 'default' attribute.
        :type parent: ``xml.etree.ElementTree.Element``
        :param names: The destination or handler IDs or tags that were loaded (any iterable, e.g. ``dict.keys()``).
        :type names: iterable of str

        :returns: str -- id or tag representing the default.
        :raises Exception: if the default names an unknown child, or no default is given and there is not exactly one child.
        """
        # Materialize so that indexing works for dict key views as well as lists
        names = list(names)
        rval = parent.get('default')
        if rval is not None:
            # If the parent element has a 'default' attribute, use the id or tag in that attribute
            if rval not in names:
                raise Exception("<%s> default attribute '%s' does not match a defined id or tag in a child element" % (parent.tag, rval))
            log.debug("<%s> default set to child with id or tag '%s'" % (parent.tag, rval))
        elif len(names) == 1:
            log.info("Setting <%s> default to child with id '%s'" % (parent.tag, names[0]))
            rval = names[0]
        else:
            raise Exception("No <%s> default specified, please specify a valid id or tag with the 'default' attribute" % parent.tag)
        return rval

    def __findall_with_required(self, parent, match, attribs=None):
        """Like ``xml.etree.ElementTree.Element.findall()``, except only returns children that have the specified attribs.

        :param parent: Parent element in which to find.
        :type parent: ``xml.etree.ElementTree.Element``
        :param match: Name of child elements to find.
        :type match: str
        :param attribs: Required attributes in children elements; defaults to ``('id',)``.
        :type attribs: sequence of str

        :returns: list of ``xml.etree.ElementTree.Element``
        """
        rval = []
        if attribs is None:
            attribs = ('id',)
        for elem in parent.findall(match):
            for attrib in attribs:
                if attrib not in elem.attrib:
                    # Only the first missing attribute is reported; the element is then skipped
                    log.warning("required '%s' attribute is missing from <%s> element" % (attrib, match))
                    break
            else:
                rval.append(elem)
        return rval

    def __get_params(self, parent):
        """Parses any child <param> tags in to a dictionary suitable for persistence.

        :param parent: Parent element in which to find child <param> tags.
        :type parent: ``xml.etree.ElementTree.Element``

        :returns: dict mapping each param's 'id' attribute to its text
        """
        rval = {}
        for param in parent.findall('param'):
            rval[param.get('id')] = param.text
        return rval

    @property
    def default_job_tool_configuration(self):
        """The default JobToolConfiguration, used if a tool does not have an explicit definition in the configuration. It consists of a reference to the default handler and default destination.

        :returns: JobToolConfiguration -- a representation of a <tool> element that uses the default handler and destination
        """
        return JobToolConfiguration(id='default', handler=self.default_handler_id, destination=self.default_destination_id)

    # Called upon instantiation of a Tool object
    def get_job_tool_configurations(self, ids):
        """Get all configured JobToolConfigurations for a tool ID, or, if given a list of IDs, the JobToolConfigurations for the first id in ``ids`` matching a tool definition.

        .. note::

            You should not mix tool shed tool IDs, versionless tool shed IDs, and tool config tool IDs that refer to the same tool.

        :param ids: Tool ID or IDs to fetch the JobToolConfiguration of.
        :type ids: list or str.
        :returns: list -- JobToolConfiguration Bunches representing <tool> elements matching the specified ID(s).

        Example tool ID strings include:

        * Full tool shed id: ``toolshed.example.org/repos/nate/filter_tool_repo/filter_tool/1.0.0``
        * Tool shed id less version: ``toolshed.example.org/repos/nate/filter_tool_repo/filter_tool``
        * Tool config tool id: ``filter_tool``
        """
        rval = []
        # listify if ids is a single (string) id
        ids = util.listify(ids)
        for tool_id in ids:
            if tool_id in self.tools:
                # If a tool has definitions that include job params but not a
                # definition for jobs without params, include the default
                # config
                for job_tool_configuration in self.tools[tool_id]:
                    if not job_tool_configuration.params:
                        break
                else:
                    rval.append(self.default_job_tool_configuration)
                rval.extend(self.tools[tool_id])
                break
        else:
            # None of the ids has a definition, fall back to the default config
            rval.append(self.default_job_tool_configuration)
        return rval

    def __get_single_item(self, collection):
        """Given a collection of handlers or destinations, return one item from the collection at random.
        """
        # Done like this to avoid random under the assumption it's faster to avoid it
        if len(collection) == 1:
            return collection[0]
        else:
            return random.choice(collection)

    # This is called by Tool.get_job_handler()
    def get_handler(self, id_or_tag):
        """Given a handler ID or tag, return the provided ID or an ID matching the provided tag

        :param id_or_tag: A handler ID or tag.
        :type id_or_tag: str

        :returns: str -- A valid job handler ID.
        :raises KeyError: if ``id_or_tag`` is not a known handler ID or tag.
        """
        return self.__get_single_item(self.handlers[id_or_tag])

    def get_destination(self, id_or_tag):
        """Given a destination ID or tag, return the JobDestination matching the provided ID or tag

        :param id_or_tag: A destination ID or tag.
        :type id_or_tag: str

        :returns: JobDestination -- A valid destination
        :raises KeyError: if ``id_or_tag`` is not a known destination ID or tag.

        Destinations are deepcopied as they are expected to be passed in to job
        runners, which will modify them for persisting params set at runtime.
        """
        return copy.deepcopy(self.__get_single_item(self.destinations[id_or_tag]))

    def get_destinations(self, id_or_tag):
        """Given a destination ID or tag, return all JobDestinations matching the provided ID or tag

        :param id_or_tag: A destination ID or tag.
        :type id_or_tag: str

        :returns: list or tuple of JobDestinations, or None if nothing matches

        Destinations are not deepcopied, so they should not be passed to
        anything which might modify them.
        """
        return self.destinations.get(id_or_tag, None)

    def get_job_runner_plugins(self):
        """Load all configured job runner plugins

        :returns: dict mapping each configured plugin id to an instantiated job runner
        """
        rval = {}
        for runner in self.runner_plugins:
            class_names = []
            module = None
            runner_id = runner['id']
            load = runner['load']
            if ':' in load:
                # Name to load was specified as '<module>:<class>'
                module_name, class_name = load.rsplit(':', 1)
                class_names = [ class_name ]
                module = __import__( module_name )
            else:
                # Name to load was specified as '<module>'
                if '.' not in load:
                    # For legacy reasons, try from galaxy.jobs.runners first if there's no '.' in the name
                    module_name = 'galaxy.jobs.runners.' + load
                    try:
                        module = __import__( module_name )
                    except ImportError:
                        # No such module, we'll retry without prepending galaxy.jobs.runners.
                        # All other exceptions (e.g. something wrong with the module code) will raise
                        pass
                if module is None:
                    # If the name included a '.' or loading from the static runners path failed, try the original name
                    module = __import__( load )
                    module_name = load
            # __import__ returns the top-level package, walk down to the requested module
            for comp in module_name.split( "." )[1:]:
                module = getattr( module, comp )
            if not class_names:
                # If there's not a ':', we check <module>.__all__ for class names
                class_names = getattr( module, '__all__', None )
                if not class_names:
                    log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % load )
                    continue
            for class_name in class_names:
                runner_class = getattr( module, class_name )
                # Explicit checks rather than assert, which is stripped when running with -O
                try:
                    is_runner = issubclass(runner_class, BaseJobRunner)
                except TypeError:
                    log.warning("A non-class name was found in __all__, ignoring: %s" % runner_id)
                    continue
                if not is_runner:
                    log.warning("Job runner classes must be subclassed from BaseJobRunner, %s has bases: %s" % (runner_id, runner_class.__bases__))
                    continue
                try:
                    rval[runner_id] = runner_class( self.app, runner['workers'] )
                except TypeError:
                    # NOTE(review): a TypeError raised inside a new-style constructor also lands here
                    log.warning( "Job runner '%s:%s' has not been converted to a new-style runner" % ( module_name, class_name ) )
                    rval[runner_id] = runner_class( self.app )
                log.debug( "Loaded job runner '%s:%s' as '%s'" % ( module_name, class_name, runner_id ) )
        return rval

    def is_id(self, collection):
        """Given a collection of handlers or destinations, indicate whether the collection represents a real ID (stored as a tuple)

        :param collection: A representation of a destination or handler
        :type collection: tuple or list

        :returns: bool
        """
        return isinstance(collection, tuple)

    def is_tag(self, collection):
        """Given a collection of handlers or destinations, indicate whether the collection represents a tag (stored as a list)

        :param collection: A representation of a destination or handler
        :type collection: tuple or list

        :returns: bool
        """
        return isinstance(collection, list)

    def is_handler(self, server_name):
        """Given a server name, indicate whether the server is a job handler

        :param server_name: The name to check
        :type server_name: str

        :return: bool
        """
        for collection in self.handlers.values():
            if server_name in collection:
                return True
        return False

    def convert_legacy_destinations(self, job_runners):
        """Converts legacy (from a URL) destinations to contain the appropriate runner params defined in the URL.

        :param job_runners: All loaded job runner plugins, keyed by runner name.
        :type job_runners: dict of job runner plugins
        """
        # Only need to deal with real destinations, not members of tags
        real_destinations = [ ( dest_id, destinations[0] ) for dest_id, destinations in self.destinations.items() if self.is_id(destinations) ]
        for dest_id, destination in real_destinations:
            if destination.legacy and not destination.converted:
                if destination.runner in job_runners:
                    destination.params = job_runners[destination.runner].url_to_destination(destination.url).params
                    destination.converted = True
                    if destination.params:
                        log.debug("Legacy destination with id '%s', url '%s' converted, got params:" % (dest_id, destination.url))
                        for k, v in destination.params.items():
                            log.debug(" %s: %s" % (k, v))
                    else:
                        log.debug("Legacy destination with id '%s', url '%s' converted, no params" % (dest_id, destination.url))
                else:
                    log.warning("Legacy destination with id '%s' could not be converted: Unknown runner plugin: %s" % (dest_id, destination.runner))
+
class JobWrapper( object ):
"""
Wraps a 'model.Job' with convenience methods for running processes and
@@ -81,7 +571,7 @@
self.tool_provided_job_metadata = None
# Wrapper holding the info required to restore and clean up from files used for setting metadata externally
self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper( job )
- self.job_runner_mapper = JobRunnerMapper( self, job.job_runner_name )
+ self.job_runner_mapper = JobRunnerMapper( self, queue.dispatcher.url_to_destination )
self.params = None
if job.params:
self.params = from_json_string( job.params )
@@ -94,7 +584,8 @@
return self.app.config.use_tasked_jobs and self.tool.parallelism
def get_job_runner_url( self ):
- return self.job_runner_mapper.get_job_runner_url( self.params )
+ log.warning('(%s) Job runner URLs are deprecated, use destinations instead.' % self.job_id)
+ return self.job_destination.url
def get_parallelism(self):
return self.tool.parallelism
@@ -102,6 +593,20 @@
# legacy naming
get_job_runner = get_job_runner_url
@property
def job_destination(self):
    """The JobDestination this job will run on.

    The lookup is delegated to the job runner mapper, which is handed this
    job's params. Per the original author, the result is either a configured
    destination, a randomly selected destination if the configured
    destination was a tag, or a dynamically generated destination from the
    dynamic runner, and the first access triggers the dynamic runner's
    calculation, if any.

    :returns: ``JobDestination``
    """
    mapper = self.job_runner_mapper
    destination = mapper.get_job_destination(self.params)
    return destination
+
def get_job( self ):
return self.sa_session.query( model.Job ).get( self.job_id )
@@ -321,11 +826,24 @@
return job.state
def set_runner( self, runner_url, external_id ):
+ log.warning('set_runner() is deprecated, use set_job_destination()')
+ self.set_job_destination(self.job_destination, external_id)
+
+ def set_job_destination(self, job_destination, external_id):
+ """
+ Persist job destination params in the database for recovery.
+
+ self.job_destination is not used because a runner may choose to rewrite
+ parts of the destination (e.g. the params).
+ """
job = self.get_job()
- self.sa_session.refresh( job )
- job.job_runner_name = runner_url
+ self.sa_session.refresh(job)
+ log.debug('(%s) Persisting job destination (destination id: %s)' % (job.id, job_destination.id))
+ job.destination_id = job_destination.id
+ job.destination_params = job_destination.params
+ job.job_runner_name = job_destination.runner
job.job_runner_external_id = external_id
- self.sa_session.add( job )
+ self.sa_session.add(job)
self.sa_session.flush()
def finish( self, stdout, stderr, tool_exit_code=None ):
@@ -699,6 +1217,28 @@
except:
log.exception( "Unable to cleanup job %d" % self.job_id )
def get_output_sizes( self ):
    """Return a list of ``( path, size_in_bytes )`` tuples, one per job output.

    Paths come from ``self.get_output_fnames()`` converted with ``str()``;
    an output that does not exist on disk is reported with size 0.
    """
    def size_of( path ):
        # A missing output counts as empty rather than as an error
        if os.path.exists( path ):
            return os.stat( path ).st_size
        return 0

    paths = [ str( output ) for output in self.get_output_fnames() ]
    return [ ( path, size_of( path ) ) for path in paths ]
+
def check_limits(self, runtime=None):
    """Check the job against the configured output size and walltime limits.

    Both limits are read from ``self.app.job_config.limits`` so that the
    guard, the comparison and the message all refer to the same value.

    :param runtime: How long the job has been running, or None to skip the walltime check.
    :type runtime: ``datetime.timedelta`` (it is compared against ``limits.walltime_delta``)

    :returns: str -- a failure message if a limit was exceeded, otherwise None
    """
    limits = self.app.job_config.limits
    output_size_limit = limits.output_size
    # An unset (None) or non-positive limit disables the output size check
    if output_size_limit is not None and output_size_limit > 0:
        for outfile, size in self.get_output_sizes():
            if size > output_size_limit:
                log.warning( '(%s) Job output %s is over the output size limit' % ( self.get_id_tag(), os.path.basename( outfile ) ) )
                return 'Job output file grew too large (greater than %s), please try different inputs or parameters' % util.nice_size( output_size_limit )
    walltime_delta = limits.walltime_delta
    if walltime_delta is not None and runtime is not None:
        if runtime > walltime_delta:
            log.warning( '(%s) Job has reached walltime, it will be terminated' % ( self.get_id_tag() ) )
            return 'Job ran longer than the maximum allowed execution time (%s), please try different inputs or parameters' % limits.walltime
    return None
+
def get_command_line( self ):
return self.command_line
@@ -825,16 +1365,6 @@
return ExpressionContext( meta, job_context )
return job_context
- def check_output_sizes( self ):
- sizes = []
- output_paths = self.get_output_fnames()
- for outfile in [ str( o ) for o in output_paths ]:
- if os.path.exists( outfile ):
- sizes.append( ( outfile, os.stat( outfile ).st_size ) )
- else:
- sizes.append( ( outfile, 0 ) )
- return sizes
-
def setup_external_metadata( self, exec_dir=None, tmp_dir=None, dataset_files_path=None, config_root=None, config_file=None, datatypes_config=None, set_extension=True, **kwds ):
# extension could still be 'auto' if this is the upload tool.
job = self.get_job()
@@ -1148,16 +1678,6 @@
# Handled at the parent job level. Do nothing here.
pass
- def check_output_sizes( self ):
- sizes = []
- output_paths = self.get_output_fnames()
- for outfile in [ str( o ) for o in output_paths ]:
- if os.path.exists( outfile ):
- sizes.append( ( outfile, os.stat( outfile ).st_size ) )
- else:
- sizes.append( ( outfile, 0 ) )
- return sizes
-
def setup_external_metadata( self, exec_dir=None, tmp_dir=None, dataset_files_path=None, config_root=None, config_file=None, datatypes_config=None, set_extension=True, **kwds ):
# There is no metadata setting for tasks. This is handled after the merge, at the job level.
return ""
diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/handler.py
--- a/lib/galaxy/jobs/handler.py
+++ b/lib/galaxy/jobs/handler.py
@@ -11,7 +11,7 @@
from sqlalchemy.sql.expression import and_, or_, select, func
from galaxy import util, model
-from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper
+from galaxy.jobs import Sleeper, JobWrapper, TaskWrapper, JobDestination
log = logging.getLogger( __name__ )
@@ -51,6 +51,9 @@
self.sa_session = app.model.context
self.track_jobs_in_database = self.app.config.track_jobs_in_database
+ # Initialize structures for handling job limits
+ self.__clear_user_job_count()
+
# Keep track of the pid that started the job manager, only it
# has valid threads
self.parent_pid = os.getpid()
@@ -58,6 +61,8 @@
self.queue = Queue()
# Contains jobs that are waiting (only use from monitor thread)
self.waiting_jobs = []
+ # Contains wrappers of jobs that are limited or ready (so they aren't created unnecessarily/multiple times)
+ self.job_wrappers = {}
# Helper for interruptable sleep
self.sleeper = Sleeper()
self.running = True
@@ -78,7 +83,7 @@
"""
Checks all jobs that are in the 'new', 'queued' or 'running' state in
the database and requeues or cleans up as necessary. Only run as the
- job manager starts.
+ job handler starts.
"""
for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \
.filter( ( ( model.Job.state == model.Job.states.NEW ) \
@@ -88,17 +93,32 @@
if job.tool_id not in self.app.toolbox.tools_by_id:
log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) )
JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )
- elif job.job_runner_name is None or (job.job_runner_name is not None and job.job_runner_external_id is None):
- if job.job_runner_name is None:
- log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) )
+ if job.job_runner_name is not None and job.job_runner_external_id is None:
+ # This could happen during certain revisions of Galaxy where a runner URL was persisted before the job was dispatched to a runner.
+ log.debug( "(%s) Job runner assigned but no external ID recorded, adding to the job handler queue" % job.id )
+ job.job_runner_name = None
+ if self.track_jobs_in_database:
+ job.state = model.Job.states.NEW
else:
- log.debug( "(%s) Job runner assigned but no external ID recorded, adding to the job handler queue" % job.id )
+ self.queue.put( ( job.id, job.tool_id ) )
+ elif job.job_runner_name is not None and job.job_runner_external_id is not None and job.destination_id is None:
+ # This is the first start after upgrading from URLs to destinations, convert the URL to a destination and persist
+ # TODO: test me extensively
+ job_wrapper = JobWrapper( job, self )
+ job_wrapper.set_job_destination(self.dispatcher.url_to_destination(job.job_runner_name), job.job_runner_external_id)
+ self.dispatcher.recover( job, job_wrapper )
+ log.info('(%s) Converted job from a URL to a destination and recovered' % (job.id))
+ elif job.job_runner_name is None:
+ # Never (fully) dispatched
+ log.debug( "(%s) No job runner assigned and job still in '%s' state, adding to the job handler queue" % ( job.id, job.state ) )
if self.track_jobs_in_database:
job.state = model.Job.states.NEW
else:
self.queue.put( ( job.id, job.tool_id ) )
else:
+ # Already dispatched and running
job_wrapper = JobWrapper( job, self )
+ job_wrapper.job_runner_mapper.cached_job_destination = JobDestination(id=job.destination_id, runner=job.job_runner_name, params=job.destination_params)
self.dispatcher.recover( job, job_wrapper )
if self.sa_session.dirty:
self.sa_session.flush()
@@ -156,8 +176,6 @@
~model.Job.table.c.id.in_(hda_not_ready),
~model.Job.table.c.id.in_(ldda_not_ready))) \
.order_by(model.Job.id).all()
- # Ensure that we get new job counts on each iteration
- self.__clear_user_job_count()
else:
# Get job objects and append to watch queue for any which were
# previously waiting
@@ -174,6 +192,8 @@
jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
except Empty:
pass
+ # Ensure that we get new job counts on each iteration
+ self.__clear_user_job_count()
# Iterate over new and waiting jobs and look for any that are
# ready to run
new_waiting_jobs = []
@@ -183,14 +203,13 @@
# Some of these states will only happen when using the in-memory job queue
job_state = self.__check_if_ready_to_run( job )
if job_state == JOB_WAIT:
- if not self.track_jobs_in_database:
- new_waiting_jobs.append( job.id )
+ new_waiting_jobs.append( job.id )
elif job_state == JOB_INPUT_ERROR:
log.info( "(%d) Job unable to run: one or more inputs in error state" % job.id )
elif job_state == JOB_INPUT_DELETED:
log.info( "(%d) Job unable to run: one or more inputs deleted" % job.id )
elif job_state == JOB_READY:
- self.dispatcher.put( JobWrapper( job, self ) )
+ self.dispatcher.put( self.job_wrappers.pop( job.id ) )
log.info( "(%d) Job dispatched" % job.id )
elif job_state == JOB_DELETED:
log.info( "(%d) Job deleted by user while still queued" % job.id )
@@ -204,14 +223,20 @@
dataset_assoc.dataset.info = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run"
self.sa_session.add( dataset_assoc.dataset.dataset )
self.sa_session.add( job )
+ elif job_state == JOB_ERROR:
+ log.error( "(%d) Error checking job readiness" % job.id )
else:
log.error( "(%d) Job in unknown state '%s'" % ( job.id, job_state ) )
- if not self.track_jobs_in_database:
- new_waiting_jobs.append( job.id )
+ new_waiting_jobs.append( job.id )
except Exception:
log.exception( "failure running job %d" % job.id )
# Update the waiting list
- self.waiting_jobs = new_waiting_jobs
+ if not self.track_jobs_in_database:
+ self.waiting_jobs = new_waiting_jobs
+ # Remove cached wrappers for any jobs that are no longer being tracked
+ for id in self.job_wrappers.keys():
+ if id not in new_waiting_jobs:
+ del self.job_wrappers[id]
# Flush, if we updated the state
self.sa_session.flush()
# Done with the session
@@ -239,19 +264,34 @@
continue
# don't run jobs for which the input dataset was deleted
if idata.deleted:
- JobWrapper( job, self ).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
+ self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s (file: %s) was deleted before the job started" % ( idata.hid, idata.file_name ) )
return JOB_INPUT_DELETED
# an error in the input data causes us to bail immediately
elif idata.state == idata.states.ERROR:
- JobWrapper( job, self ).fail( "input data %s is in error state" % ( idata.hid ) )
+ self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s is in error state" % ( idata.hid ) )
return JOB_INPUT_ERROR
elif idata.state == idata.states.FAILED_METADATA:
- JobWrapper( job, self ).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
+ self.job_wrappers.pop(job.id, JobWrapper( job, self )).fail( "input data %s failed to properly set metadata" % ( idata.hid ) )
return JOB_INPUT_ERROR
elif idata.state != idata.states.OK and not ( idata.state == idata.states.SETTING_METADATA and job.tool_id is not None and job.tool_id == self.app.datatypes_registry.set_external_metadata_tool.id ):
# need to requeue
return JOB_WAIT
- state = self.__check_user_jobs( job )
+ # Create the job wrapper so that the destination can be set
+ if job.id not in self.job_wrappers:
+ self.job_wrappers[job.id] = JobWrapper(job, self)
+ # Cause the job_destination to be set and cached by the mapper
+ try:
+ self.job_wrappers[job.id].job_destination
+ except Exception, e:
+ failure_message = getattr(e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
+ if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
+ log.exception( 'Failed to generate job destination' )
+ else:
+ log.debug( "Intentionally failing job with message (%s)" % failure_message )
+ self.job_wrappers[job.id].fail( failure_message )
+ return JOB_ERROR
+ # job is ready to run, check limits
+ state = self.__check_user_jobs( job, self.job_wrappers[job.id] )
if state == JOB_READY and self.app.config.enable_quotas:
quota = self.app.quota_agent.get_quota( job.user )
if quota is not None:
@@ -264,48 +304,114 @@
return state
def __clear_user_job_count( self ):
- self.user_job_count = {}
- self.user_job_count_per_runner = {}
+ self.user_job_count = None
+ self.user_job_count_per_destination = None
- def __check_user_jobs( self, job ):
+ def get_user_job_count(self, user_id):
+ self.__cache_user_job_count()
+ # This could have been incremented by a previous job dispatched on this iteration, even if we're not caching
+ rval = self.user_job_count.get(user_id, 0)
+ if not self.app.config.cache_user_job_count:
+ result = self.sa_session.execute(select([func.count(model.Job.table.c.id)]).where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id == user_id))))
+ for row in result:
+ # there should only be one row
+ rval += row[0]
+ return rval
+
+ def __cache_user_job_count( self ):
+ # Cache the job count if necessary
+ if self.user_job_count is None and self.app.config.cache_user_job_count:
+ self.user_job_count = {}
+ query = self.sa_session.execute(select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)]) \
+ .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id is not None))) \
+ .group_by(model.Job.table.c.user_id))
+ for row in query:
+ self.user_job_count[row[0]] = row[1]
+ elif self.user_job_count is None:
+ self.user_job_count = {}
+
+ def get_user_job_count_per_destination(self, user_id):
+ self.__cache_user_job_count_per_destination()
+ cached = self.user_job_count_per_destination.get(user_id, {})
+ if self.app.config.cache_user_job_count:
+ rval = cached
+ else:
+ # The cached count is still used even when we're not caching, it is
+ # incremented when a job is run by this handler to ensure that
+ # multiple jobs can't get past the limits in one iteration of the
+ # queue.
+ rval = {}
+ rval.update(cached)
+ result = self.sa_session.execute(select([model.Job.table.c.destination_id, func.count(model.Job.table.c.destination_id).label('job_count')]) \
+ .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id == user_id))) \
+ .group_by(model.Job.table.c.destination_id))
+ for row in result:
+ # Add the count from the database to the cached count
+ rval[row['destination_id']] = rval.get(row['destination_id'], 0) + row['job_count']
+ return rval
+
+ def __cache_user_job_count_per_destination(self):
+ # Cache the job count if necessary
+ if self.user_job_count_per_destination is None and self.app.config.cache_user_job_count:
+ self.user_job_count_per_destination = {}
+ result = self.sa_session.execute(select([model.Job.table.c.user_id, model.Job.table.c.destination_id, func.count(model.Job.table.c.user_id).label('job_count')]) \
+ .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)))) \
+ .group_by(model.Job.table.c.user_id, model.Job.table.c.destination_id))
+ for row in result:
+ if row['user_id'] not in self.user_job_count_per_destination:
+ self.user_job_count_per_destination[row['user_id']] = {}
+ self.user_job_count_per_destination[row['user_id']][row['destination_id']] = row['job_count']
+ elif self.user_job_count_per_destination is None:
+ self.user_job_count_per_destination = {}
+
+ def increase_running_job_count(self, user_id, destination_id):
+ if self.user_job_count is None:
+ self.user_job_count = {}
+ if self.user_job_count_per_destination is None:
+ self.user_job_count_per_destination = {}
+ self.user_job_count[user_id] = self.user_job_count.get(user_id, 0) + 1
+ if user_id not in self.user_job_count_per_destination:
+ self.user_job_count_per_destination[user_id] = {}
+ self.user_job_count_per_destination[user_id][destination_id] = self.user_job_count_per_destination[user_id].get(destination_id, 0) + 1
+
+ def __check_user_jobs( self, job, job_wrapper ):
if job.user:
# Check the hard limit first
- if self.app.config.registered_user_job_limit:
- # Cache the job count if necessary
- if not self.user_job_count:
- query = self.sa_session.execute(select([model.Job.table.c.user_id, func.count(model.Job.table.c.user_id)]) \
- .where(and_(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING)), (model.Job.table.c.user_id is not None))) \
- .group_by(model.Job.table.c.user_id))
- for row in query:
- self.user_job_count[row[0]] = row[1]
- if self.user_job_count.get(job.user_id, 0) >= self.app.config.registered_user_job_limit:
+ if self.app.job_config.limits.registered_user_concurrent_jobs:
+ count = self.get_user_job_count(job.user_id)
+ # Check the user's number of dispatched jobs against the overall limit
+ if count >= self.app.job_config.limits.registered_user_concurrent_jobs:
return JOB_WAIT
- # If we pass the hard limit, also check the per-runner count
- if job.job_runner_name in self.app.config.job_limits:
- # Cache the job count if necessary
- if job.job_runner_name not in self.user_job_count_per_runner:
- self.user_job_count_per_runner[job.job_runner_name] = {}
- query_url, limit = self.app.config.job_limits[job.job_runner_name]
- base_query = select([model.Job.table.c.user_id, model.Job.table.c.job_runner_name, func.count(model.Job.table.c.user_id).label('job_count')]) \
- .where(model.Job.table.c.state.in_((model.Job.states.QUEUED, model.Job.states.RUNNING))) \
- .group_by(model.Job.table.c.user_id, model.Job.table.c.job_runner_name)
- if '%' in query_url or '_' in query_url:
- subq = base_query.having(model.Job.table.c.job_runner_name.like(query_url)).alias('subq')
- query = self.sa_session.execute(select([subq.c.user_id, func.sum(subq.c.job_count).label('job_count')]).group_by(subq.c.user_id))
- else:
- query = self.sa_session.execute(base_query.having(model.Job.table.c.job_runner_name == query_url))
- for row in query:
- self.user_job_count_per_runner[job.job_runner_name][row['user_id']] = row['job_count']
- if self.user_job_count_per_runner[job.job_runner_name].get(job.user_id, 0) >= self.app.config.job_limits[job.job_runner_name][1]:
+ # If we pass the hard limit, also check the per-destination count
+ id = job_wrapper.job_destination.id
+ count_per_id = self.get_user_job_count_per_destination(job.user_id)
+ if id in self.app.job_config.limits.concurrent_jobs:
+ count = count_per_id.get(id, 0)
+ # Check the user's number of dispatched jobs in the assigned destination id against the limit for that id
+ if count >= self.app.job_config.limits.concurrent_jobs[id]:
return JOB_WAIT
+ # If we pass the destination limit (if there is one), also check limits on any tags (if any)
+ if job_wrapper.job_destination.tags:
+ for tag in job_wrapper.job_destination.tags:
+ # Check each tag for this job's destination
+ if tag in self.app.job_config.limits.concurrent_jobs:
+ # Only if there's a limit defined for this tag
+ count = 0
+ for id in [ d.id for d in self.app.job_config.get_destinations(tag) ]:
+ # Add up the aggregate job total for this tag
+ count += count_per_id.get(id, 0)
+ if count >= self.app.job_config.limits.concurrent_jobs[tag]:
+ return JOB_WAIT
+ # PASS. increase usage by one job (if caching) so that multiple jobs aren't dispatched on this queue iteration
+ self.increase_running_job_count(job.user_id, id)
elif job.galaxy_session:
# Anonymous users only get the hard limit
- if self.app.config.anonymous_user_job_limit:
+ if self.app.job_config.limits.anonymous_user_concurrent_jobs:
count = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
.filter( and_( model.Job.session_id == job.galaxy_session.id,
or_( model.Job.state == model.Job.states.RUNNING,
model.Job.state == model.Job.states.QUEUED ) ) ).count()
- if count >= self.app.config.anonymous_user_job_limit:
+ if count >= self.app.job_config.limits.anonymous_user_concurrent_jobs:
return JOB_WAIT
else:
log.warning( 'Job %s is not associated with a user or session so job concurrency limit cannot be checked.' % job.id )
@@ -431,58 +537,41 @@
class DefaultJobDispatcher( object ):
def __init__( self, app ):
self.app = app
- self.job_runners = {}
- start_job_runners = ["local", "lwr"]
- if app.config.start_job_runners is not None:
- start_job_runners.extend( [ x.strip() for x in util.listify( app.config.start_job_runners ) ] )
- if app.config.use_tasked_jobs:
- start_job_runners.append("tasks")
- for name in start_job_runners:
- self._load_plugin( name )
- log.debug( "Job runners: " + ':'.join( start_job_runners ) )
-
- def _load_plugin( self, name ):
- module_name = 'galaxy.jobs.runners.' + name
- try:
- module = __import__( module_name )
- except:
- log.exception( 'Job runner is not loadable: %s' % module_name )
- return
- for comp in module_name.split( "." )[1:]:
- module = getattr( module, comp )
- if '__all__' not in dir( module ):
- log.error( 'Runner "%s" does not contain a list of exported classes in __all__' % module_name )
- return
- for obj in module.__all__:
- display_name = ':'.join( ( module_name, obj ) )
- runner = getattr( module, obj )
- self.job_runners[name] = runner( self.app )
- log.debug( 'Loaded job runner: %s' % display_name )
+ self.job_runners = self.app.job_config.get_job_runner_plugins()
+ # Once plugins are loaded, all job destinations that were created from
+ # URLs can have their URL params converted to the destination's param
+ # dict by the plugin.
+ self.app.job_config.convert_legacy_destinations(self.job_runners)
+ log.debug( "Loaded job runners plugins: " + ':'.join( self.job_runners.keys() ) )
def __get_runner_name( self, job_wrapper ):
if job_wrapper.can_split():
runner_name = "tasks"
else:
- runner_name = ( job_wrapper.get_job_runner_url().split(":", 1) )[0]
+ runner_name = job_wrapper.job_destination.runner
return runner_name
+ def url_to_destination( self, url ):
+ """This is used by the runner mapper (a.k.a. dynamic runner) and
+ recovery methods to have runners convert URLs to destinations.
+
+ New-style runner plugin IDs must match the URL's scheme for this to work.
+ """
+ runner_name = url.split(':', 1)[0]
+ try:
+ return self.job_runners[runner_name].url_to_destination(url)
+ except Exception, e:
+ log.exception("Unable to convert legacy job runner URL to job destination, destination will be the '%s' runner with no params: %s" % (runner_name, e))
+ return JobDestination(runner=runner_name)
+
def put( self, job_wrapper ):
- try:
- runner_name = self.__get_runner_name( job_wrapper )
- except Exception, e:
- failure_message = getattr(e, 'failure_message', DEFAULT_JOB_PUT_FAILURE_MESSAGE )
- if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
- log.exception( 'Failed to generate job runner name' )
- else:
- log.debug( "Intentionally failing job with message (%s)" % failure_message )
- job_wrapper.fail( failure_message )
- return
+ runner_name = self.__get_runner_name( job_wrapper )
try:
if isinstance(job_wrapper, TaskWrapper):
#DBTODO Refactor
- log.debug( "dispatching task %s, of job %d, to %s runner" %( job_wrapper.task_id, job_wrapper.job_id, runner_name ) )
+ log.debug( "(%s) Dispatching task %s to %s runner" %( job_wrapper.job_id, job_wrapper.task_id, runner_name ) )
else:
- log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) )
+ log.debug( "(%s) Dispatching to %s runner" %( job_wrapper.job_id, runner_name ) )
self.job_runners[runner_name].put( job_wrapper )
except KeyError:
log.error( 'put(): (%s) Invalid job runner: %s' % ( job_wrapper.job_id, runner_name ) )
diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/manager.py
--- a/lib/galaxy/jobs/manager.py
+++ b/lib/galaxy/jobs/manager.py
@@ -26,257 +26,18 @@
"""
def __init__( self, app ):
self.app = app
- self.job_handler = NoopHandler()
- if self.app.config.server_name in self.app.config.job_handlers:
+ if (self.app.config.track_jobs_in_database and self.app.job_config.is_handler(self.app.config.server_name)) or not self.app.config.track_jobs_in_database:
+ log.debug("Starting job handler")
self.job_handler = handler.JobHandler( app )
- if self.app.config.server_name == self.app.config.job_manager:
- job_handler = NoopHandler()
- # In the case that webapp == manager == handler, pass jobs in memory
- if not self.app.config.track_jobs_in_database:
- job_handler = self.job_handler
- # Otherwise, even if the manager == one of the handlers, its handler will pick up jobs from the database
- self.job_queue = JobManagerQueue( app, job_handler )
- self.job_stop_queue = JobManagerStopQueue( app, job_handler )
- if self.app.config.enable_beta_job_managers:
- from galaxy.jobs.deferred import DeferredJobQueue
- self.deferred_job_queue = DeferredJobQueue( app )
+ self.job_queue = self.job_handler.job_queue
+ self.job_stop_queue = self.job_handler.job_stop_queue
else:
+ self.job_handler = NoopHandler()
self.job_queue = self.job_stop_queue = NoopQueue()
self.job_handler.start()
def shutdown( self ):
- self.job_queue.shutdown()
- self.job_stop_queue.shutdown()
self.job_handler.shutdown()
-class JobManagerQueue( object ):
- """
- Job manager, waits for jobs to be runnable and then dispatches to a
- JobHandler.
- """
- STOP_SIGNAL = object()
- def __init__( self, app, job_handler ):
- self.app = app
- self.job_handler = job_handler # the (singular) handler if we are passing jobs in memory
-
- self.sa_session = app.model.context
- self.job_lock = False
- # Keep track of the pid that started the job manager, only it
- # has valid threads
- self.parent_pid = os.getpid()
- # Contains new jobs. Note this is not used if track_jobs_in_database is True
- self.queue = Queue()
- # Helper for interruptable sleep
- self.sleeper = Sleeper()
- self.running = True
- self.monitor_thread = threading.Thread( name="JobManagerQueue.monitor_thread", target=self.__monitor )
- self.monitor_thread.setDaemon( True )
- # Recover jobs at startup
- self.__check_jobs_at_startup()
- # Start the queue
- self.monitor_thread.start()
- log.info( "job manager queue started" )
-
- def __check_jobs_at_startup( self ):
- """
- Checks all jobs that are in the 'new', 'queued' or 'running' state in
- the database and requeues or cleans up as necessary. Only run as the
- job manager starts.
- """
- for job in self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( ( ( model.Job.state == model.Job.states.NEW ) \
- | ( model.Job.state == model.Job.states.RUNNING ) \
- | ( model.Job.state == model.Job.states.QUEUED ) ) \
- & ( model.Job.handler == None ) ):
- if job.tool_id not in self.app.toolbox.tools_by_id:
- log.warning( "(%s) Tool '%s' removed from tool config, unable to recover job" % ( job.id, job.tool_id ) )
- JobWrapper( job, self ).fail( 'This tool was disabled before the job completed. Please contact your Galaxy administrator.' )
- else:
- job.handler = self.__get_handler( job ) # handler's recovery method will take it from here
- log.info( "(%d) Job in '%s' state had no handler at job manager startup, assigned '%s' handler" % ( job.id, job.state, job.handler ) )
- if self.sa_session.dirty:
- self.sa_session.flush()
-
- def __monitor( self ):
- """
- Continually iterate the waiting jobs and dispatch to a handler
- """
- # HACK: Delay until after forking, we need a way to do post fork notification!!!
- time.sleep( 10 )
- while self.running:
- try:
- self.__monitor_step()
- except:
- log.exception( "Exception in monitor_step" )
- # Sleep
- self.sleeper.sleep( 1 )
-
- def __monitor_step( self ):
- """
- Called repeatedly by `monitor` to process waiting jobs. Gets any new
- jobs (either from the database or from its own queue), then assigns a
- handler.
- """
- # Do nothing if the queue is locked
- if self.job_lock:
- log.info( 'Job queue is administratively locked, sleeping...' )
- time.sleep( 10 )
- return
- # Pull all new jobs from the queue at once
- jobs_to_check = []
- if self.app.config.track_jobs_in_database:
- # Clear the session so we get fresh states for job and all datasets
- self.sa_session.expunge_all()
- # Fetch all new jobs
- jobs_to_check = self.sa_session.query( model.Job ).enable_eagerloads( False ) \
- .filter( ( model.Job.state == model.Job.states.NEW ) \
- & ( model.Job.handler == None ) ).all()
- else:
- # Get job objects and append to watch queue for any which were
- # previously waiting
- try:
- while 1:
- message = self.queue.get_nowait()
- if message is self.STOP_SIGNAL:
- return
- # Unpack the message
- job_id, tool_id = message
- # Get the job object and append to watch queue
- jobs_to_check.append( self.sa_session.query( model.Job ).get( job_id ) )
- except Empty:
- pass
-
- for job in jobs_to_check:
- job.handler = self.__get_handler( job )
- job.job_runner_name = self.__get_runner_url( job )
- log.debug( "(%s) Job assigned to handler '%s'" % ( job.id, job.handler ) )
- self.sa_session.add( job )
-
- # If tracking in the database, handlers will pick up the job now
- self.sa_session.flush()
-
- time.sleep( 5 )
-
- # This only does something in the case that there is only one handler and it is this Galaxy process
- for job in jobs_to_check:
- self.job_handler.job_queue.put( job.id, job.tool_id )
-
- def __get_handler( self, job ):
- try:
- params = None
- if job.params:
- params = from_json_string( job.params )
- return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_handler( params )
- except:
- log.exception( "(%s) Caught exception attempting to get tool-specific job handler for tool '%s', selecting at random from available handlers instead:" % ( job.id, job.tool_id ) )
- return random.choice( self.app.config.job_handlers )
-
- def __get_runner_url( self, job ):
- """This fetches the raw runner URL, and does not perform any computation e.g. for the dynamic runner"""
- try:
- return self.app.toolbox.tools_by_id.get( job.tool_id, None ).get_job_runner_url( job.params )
- except Exception, e:
- log.warning( 'Unable to determine job runner URL for job %s: %s' % (job.id, str(e)) )
- return None
-
- def put( self, job_id, tool ):
- """Add a job to the queue (by job identifier)"""
- if not self.app.config.track_jobs_in_database:
- self.queue.put( ( job_id, tool.id ) )
- self.sleeper.wake()
-
- def shutdown( self ):
- """Attempts to gracefully shut down the worker thread"""
- if self.parent_pid != os.getpid():
- # We're not the real job queue, do nothing
- return
- else:
- log.info( "sending stop signal to worker thread" )
- self.running = False
- if not self.app.config.track_jobs_in_database:
- self.queue.put( self.STOP_SIGNAL )
- self.sleeper.wake()
- log.info( "job manager queue stopped" )
-
-class JobManagerStopQueue( object ):
- """
- A queue for jobs which need to be terminated prematurely.
- """
- STOP_SIGNAL = object()
- def __init__( self, app, job_handler ):
- self.app = app
- self.job_handler = job_handler
-
- self.sa_session = app.model.context
-
- # Keep track of the pid that started the job manager, only it
- # has valid threads
- self.parent_pid = os.getpid()
- # Contains new jobs. Note this is not used if track_jobs_in_database is True
- self.queue = Queue()
-
- # Contains jobs that are waiting (only use from monitor thread)
- self.waiting = []
-
- # Helper for interruptable sleep
- self.sleeper = Sleeper()
- self.running = True
- self.monitor_thread = threading.Thread( name="JobManagerStopQueue.monitor_thread", target=self.monitor )
- self.monitor_thread.setDaemon( True )
- self.monitor_thread.start()
- log.info( "job manager stop queue started" )
-
- def monitor( self ):
- """
- Continually iterate the waiting jobs, stop any that are found.
- """
- # HACK: Delay until after forking, we need a way to do post fork notification!!!
- time.sleep( 10 )
- while self.running:
- try:
- self.monitor_step()
- except:
- log.exception( "Exception in monitor_step" )
- # Sleep
- self.sleeper.sleep( 1 )
-
- def monitor_step( self ):
- """
- Called repeatedly by `monitor` to stop jobs.
- """
- jobs_to_check = []
- # Pull from the queue even if tracking in the database (in the case of Administrative stopped jobs)
- try:
- while 1:
- message = self.queue.get_nowait()
- if message is self.STOP_SIGNAL:
- return
- # Unpack the message
- job_id, error_msg = message
- # Get the job object and append to watch queue
- jobs_to_check.append( ( self.sa_session.query( model.Job ).get( job_id ), error_msg ) )
- except Empty:
- pass
-
- # If tracking in the database, the handler will pick up the stop itself. Otherwise, notify the handler.
- for job, error_msg in jobs_to_check:
- self.job_handler.job_stop_queue.put( job.id, error_msg )
-
- def put( self, job_id, error_msg=None ):
- self.queue.put( ( job_id, error_msg ) )
-
- def shutdown( self ):
- """Attempts to gracefully shut down the worker thread"""
- if self.parent_pid != os.getpid():
- # We're not the real job queue, do nothing
- return
- else:
- log.info( "sending stop signal to worker thread" )
- self.running = False
- if not self.app.config.track_jobs_in_database:
- self.queue.put( self.STOP_SIGNAL )
- self.sleeper.wake()
- log.info( "job manager stop queue stopped" )
-
class NoopHandler( object ):
def __init__( self, *args, **kwargs ):
self.job_queue = NoopQueue()
diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/mapper.py
--- a/lib/galaxy/jobs/mapper.py
+++ b/lib/galaxy/jobs/mapper.py
@@ -6,7 +6,8 @@
import galaxy.jobs.rules
-DYNAMIC_RUNNER_PREFIX = "dynamic:///"
+DYNAMIC_RUNNER_NAME = "dynamic"
+DYNAMIC_DESTINATION_ID = "dynamic_legacy_from_url"
class JobMappingException( Exception ):
@@ -20,9 +21,9 @@
(in the form of job_wrappers) to job runner url strings.
"""
- def __init__( self, job_wrapper, job_runner_name=None ):
+ def __init__( self, job_wrapper, url_to_destination ):
self.job_wrapper = job_wrapper
- self.job_runner_name = job_runner_name
+ self.url_to_destination = url_to_destination
self.rule_modules = self.__get_rule_modules( )
def __get_rule_modules( self ):
@@ -87,11 +88,23 @@
return expand_function( **actual_args )
- def __determine_expand_function_name( self, option_parts ):
+ def __convert_url_to_destination( self, url ):
+ """
+ Job runner URLs are deprecated, but dynamic mapper functions may still
+ be returning them. Runners are expected to be able to convert these to
+ destinations.
+
+ This method calls
+ JobHandlerQueue.DefaultJobDispatcher.url_to_destination, which in turn
+ calls the url_to_destination method for the appropriate runner.
+ """
+ dest = self.url_to_destination( url )
+ dest['id'] = DYNAMIC_DESTINATION_ID
+ return dest
+
+ def __determine_expand_function_name( self, destination ):
# default look for function with same name as tool, unless one specified
- expand_function_name = self.job_wrapper.tool.id
- if len( option_parts ) > 1:
- expand_function_name = option_parts[ 1 ]
+ expand_function_name = destination.params.get('function', self.job_wrapper.tool.id)
return expand_function_name
def __get_expand_function( self, expand_function_name ):
@@ -110,32 +123,57 @@
return rule_module
return None
- def __expand_dynamic_job_runner_url( self, options_str ):
- option_parts = options_str.split( '/' )
- expand_type = option_parts[ 0 ]
+ def __handle_dynamic_job_destination( self, destination ):
+ expand_type = destination.params.get('type', None)
if expand_type == "python":
- expand_function_name = self.__determine_expand_function_name( option_parts )
+ expand_function_name = self.__determine_expand_function_name( destination )
expand_function = self.__get_expand_function( expand_function_name )
- return self.__invoke_expand_function( expand_function )
+ rval = self.__invoke_expand_function( expand_function )
+ # TODO: test me extensively
+ if isinstance(rval, basestring):
+ # If the function returned a string, check if it's a URL, convert if necessary
+ if '://' in rval:
+ return self.__convert_url_to_destination(rval)
+ else:
+ return self.app.job_config.get_destination(rval)
+ elif isinstance(rval, galaxy.jobs.JobDestination):
+ # If the function generated a JobDestination, we'll use that
+ # destination directly. However, for advanced job limiting, a
+ # function may want to set the JobDestination's 'tags'
+ # attribute so that limiting can be done on a destination tag.
+ #id_or_tag = rval.get('id')
+ #if rval.get('tags', None):
+ # # functions that are generating destinations should only define one tag
+ # id_or_tag = rval.get('tags')[0]
+ #return id_or_tag, rval
+ return rval
+ else:
+ raise Exception( 'Dynamic function returned a value that could not be understood: %s' % rval )
+ elif expand_type is None:
+ raise Exception( 'Dynamic function type not specified (hint: add <param id="type">python</param> to your <destination>)' )
else:
raise Exception( "Unhandled dynamic job runner type specified - %s" % expand_type )
- def __cache_job_runner_url( self, params ):
- # If there's already a runner set in the Job object, don't overwrite from the tool
- if self.job_runner_name is not None and not self.job_runner_name.startswith('tasks'):
- raw_job_runner_url = self.job_runner_name
+ def __cache_job_destination( self, params ):
+ raw_job_destination = self.job_wrapper.tool.get_job_destination( params )
+ #raw_job_destination_id_or_tag = self.job_wrapper.tool.get_job_destination_id_or_tag( params )
+ if raw_job_destination.runner == DYNAMIC_RUNNER_NAME:
+ job_destination = self.__handle_dynamic_job_destination( raw_job_destination )
else:
- raw_job_runner_url = self.job_wrapper.tool.get_job_runner_url( params )
- if raw_job_runner_url.startswith( DYNAMIC_RUNNER_PREFIX ):
- job_runner_url = self.__expand_dynamic_job_runner_url( raw_job_runner_url[ len( DYNAMIC_RUNNER_PREFIX ) : ] )
- else:
- job_runner_url = raw_job_runner_url
- self.cached_job_runner_url = job_runner_url
+ job_destination = raw_job_destination
+ #job_destination_id_or_tag = raw_job_destination_id_or_tag
+ self.cached_job_destination = job_destination
+ #self.cached_job_destination_id_or_tag = job_destination_id_or_tag
- def get_job_runner_url( self, params ):
+ def get_job_destination( self, params ):
"""
- Cache the job_runner_url string to avoid recalculation.
+ Cache the job_destination to avoid recalculation.
"""
- if not hasattr( self, 'cached_job_runner_url' ):
- self.__cache_job_runner_url( params )
- return self.cached_job_runner_url
+ if not hasattr( self, 'cached_job_destination' ):
+ self.__cache_job_destination( params )
+ return self.cached_job_destination
+
+ #def get_job_destination_id_or_tag( self, params ):
+ # if not hasattr( self, 'cached_job_destination_id_or_tag' ):
+ # self.__cache_job_destination( params )
+ # return self.cached_job_destination_id_or_tag
diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -1,13 +1,129 @@
-import os, logging, os.path
+"""
+Base classes for job runner plugins.
+"""
+import os
+import time
+import logging
+import threading
+
+from Queue import Queue, Empty
+
+import galaxy.jobs
from galaxy import model
-from Queue import Queue, Empty
-import time
-import threading
log = logging.getLogger( __name__ )
+STOP_SIGNAL = object()
+
class BaseJobRunner( object ):
+ def __init__( self, app, nworkers ):
+ """Start the job runner
+ """
+ self.app = app
+ self.sa_session = app.model.context
+ self.nworkers = nworkers
+
+ def _init_worker_threads(self):
+ """Start ``nworkers`` worker threads.
+ """
+ self.work_queue = Queue()
+ self.work_threads = []
+ log.debug('Starting %s %s workers' % (self.nworkers, self.runner_name))
+ for i in range(self.nworkers):
+ worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next )
+ worker.setDaemon( True )
+ worker.start()
+ self.work_threads.append( worker )
+
+ def run_next(self):
+ """Run the next item in the work queue (a job waiting to run)
+ """
+ while 1:
+ ( method, arg ) = self.work_queue.get()
+ if method is STOP_SIGNAL:
+ return
+ # id and name are collected first so that the call of method() is the last exception.
+ try:
+ # arg should be a JobWrapper/TaskWrapper
+ job_id = arg.get_id_tag()
+ except:
+ job_id = 'unknown'
+ try:
+ name = method.__name__
+ except:
+ name = 'unknown'
+ try:
+ method(arg)
+ except:
+ log.exception( "(%s) Unhandled exception calling %s" % ( job_id, name ) )
+
+ # Causes a runner's `queue_job` method to be called from a worker thread
+ def put(self, job_wrapper):
+ """Add a job to the queue (by job identifier), indicate that the job is ready to run.
+ """
+ # Change to queued state before handing to worker thread so the runner won't pick it up again
+ job_wrapper.change_state( model.Job.states.QUEUED )
+ # Persist the destination so that the job will be included in counts if using concurrency limits
+ job_wrapper.set_job_destination( job_wrapper.job_destination, None )
+ self.mark_as_queued(job_wrapper)
+
+ def mark_as_queued(self, job_wrapper):
+ self.work_queue.put( ( self.queue_job, job_wrapper ) )
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the worker threads
+ """
+ log.info( "%s: Sending stop signal to %s worker threads" % ( self.runner_name, len( self.work_threads ) ) )
+ for i in range( len( self.work_threads ) ):
+ self.work_queue.put( ( STOP_SIGNAL, None ) )
+
+ # Most runners should override the legacy URL handler methods and destination param method
+ def url_to_destination(self, url):
+ """
+ Convert a legacy URL to a JobDestination.
+
+ Job runner URLs are deprecated, JobDestinations should be used instead.
+ This base class method converts from a URL to a very basic
+ JobDestination without destination params.
+ """
+ return galaxy.jobs.JobDestination(runner=url.split(':')[0])
+
+ def parse_destination_params(self, params):
+ """Parse the JobDestination ``params`` dict and return the runner's native representation of those params.
+ """
+ raise NotImplementedError()
+
+ # Runners must override the job handling methods
+ def queue_job(self, job_wrapper):
+ """Some sanity checks that all runners' queue_job() methods are likely to want to do
+ """
+ job_id = job_wrapper.get_id_tag()
+ job_state = job_wrapper.get_state()
+ job_wrapper.is_ready = False
+
+ # Make sure the job hasn't been deleted
+ if job_state != model.Job.states.QUEUED:
+ log.info( "(%d) Job is in state %s, skipping execution" % ( job_id, job_state ) )
+ return
+
+ # Prepare the job
+ try:
+ job_wrapper.prepare()
+ job_wrapper.runner_command_line = self.build_command_line( job_wrapper )
+ except:
+ log.exception("(%d) Failure preparing job" % job_id)
+ job_wrapper.fail( "failure preparing job", exception=True )
+ return
+
+ job_wrapper.is_ready = True
+
+ def stop_job(self, job):
+ raise NotImplementedError()
+
+ def recover(self, job, job_wrapper):
+ raise NotImplementedError()
+
def build_command_line( self, job_wrapper, include_metadata=False, include_work_dir_outputs=True ):
"""
Compose the sequence of commands necessary to execute a job. This will
@@ -104,12 +220,11 @@
log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) )
return output_pairs
-
-class ClusterJobState( object ):
+class AsynchronousJobState( object ):
"""
- Encapsulate the state of a cluster job, this should be subclassed as
+ Encapsulate the state of an asynchronous job, this should be subclassed as
needed for various job runners to capture additional information needed
- to communicate with cluster job manager.
+ to communicate with distributed resource manager.
"""
def __init__( self ):
@@ -117,23 +232,22 @@
self.job_id = None
self.old_state = None
self.running = False
- self.runner_url = None
+ self.job_file = None
+ self.output_file = None
+ self.error_file = None
+ self.exit_code_file = None
+ self.check_count = 0
+ self.job_destination = None
-STOP_SIGNAL = object()
-
-JOB_STATUS_QUEUED = 'queue'
-JOB_STATUS_FAILED = 'fail'
-JOB_STATUS_FINISHED = 'finish'
-
-class ClusterJobRunner( BaseJobRunner ):
- """
- Not sure this is the best name for this class, but there is common code
- shared between sge, pbs, drmaa, etc...
+class AsynchronousJobRunner( BaseJobRunner ):
+ """Parent class for any job runner that runs jobs asynchronously (e.g. via
+ a distributed resource manager). Provides general methods for having a
+ thread to monitor the state of asynchronous jobs and submitting those jobs
+ to the correct methods (queue, finish, cleanup) at appropriate times..
"""
- def __init__( self, app ):
- self.app = app
- self.sa_session = app.model.context
+ def __init__( self, app, nworkers ):
+ super( AsynchronousJobRunner, self ).__init__( app, nworkers )
# 'watched' and 'queue' are both used to keep track of jobs to watch.
# 'queue' is used to add new watched jobs, and can be called from
# any thread (usually by the 'queue_job' method). 'watched' must only
@@ -147,82 +261,44 @@
self.monitor_thread.setDaemon( True )
self.monitor_thread.start()
- def _init_worker_threads(self):
- self.work_queue = Queue()
- self.work_threads = []
- nworkers = self.app.config.cluster_job_queue_workers
- for i in range( nworkers ):
- worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next )
- worker.setDaemon( True )
- worker.start()
- self.work_threads.append( worker )
-
def handle_stop(self):
# DRMAA and SGE runners should override this and disconnect.
pass
def monitor( self ):
"""
- Watches jobs currently in the cluster queue and deals with state changes
- (queued to running) and job completion
+ Watches jobs currently in the monitor queue and deals with state
+ changes (queued to running) and job completion.
"""
while 1:
# Take any new watched jobs and put them on the monitor list
try:
while 1:
- cluster_job_state = self.monitor_queue.get_nowait()
- if cluster_job_state is STOP_SIGNAL:
+ async_job_state = self.monitor_queue.get_nowait()
+ if async_job_state is STOP_SIGNAL:
# TODO: This is where any cleanup would occur
self.handle_stop()
return
- self.watched.append( cluster_job_state )
+ self.watched.append( async_job_state )
except Empty:
pass
# Iterate over the list of watched jobs and check state
- self.check_watched_items()
+ try:
+ self.check_watched_items()
+ except Exception, e:
+ log.exception('Unhandled exception checking active jobs')
# Sleep a bit before the next state check
time.sleep( 1 )
- def run_next( self ):
- """
- Run the next item in the queue (a job waiting to run or finish )
- """
- while 1:
- ( op, obj ) = self.work_queue.get()
- if op is STOP_SIGNAL:
- return
- try:
- if op == JOB_STATUS_QUEUED:
- # If the next item is to be run, then only run it if the
- # job state is "queued". Otherwise the next item was either
- # cancelled or one of its siblings encountered an error.
- job_state = obj.get_state()
- if model.Job.states.QUEUED == job_state:
- self.queue_job( obj )
- else:
- log.debug( "Not executing job %d in state %s" % ( obj.get_id_tag(), job_state ) )
- elif op == JOB_STATUS_FINISHED:
- self.finish_job( obj )
- elif op == JOB_STATUS_FAILED:
- self.fail_job( obj )
- except:
- log.exception( "Uncaught exception %sing job" % op )
-
def monitor_job(self, job_state):
self.monitor_queue.put( job_state )
- def put( self, job_wrapper ):
- """Add a job to the queue (by job identifier)"""
- # Change to queued state before handing to worker thread so the runner won't pick it up again
- job_wrapper.change_state( model.Job.states.QUEUED )
- self.mark_as_queued(job_wrapper)
-
def shutdown( self ):
"""Attempts to gracefully shut down the monitor thread"""
- log.info( "sending stop signal to worker threads" )
+ log.info( "%s: Sending stop signal to monitor thread" % self.runner_name )
self.monitor_queue.put( STOP_SIGNAL )
- for i in range( len( self.work_threads ) ):
- self.work_queue.put( ( STOP_SIGNAL, None ) )
+ # Call the parent's shutdown method to stop workers
+ super( AsynchronousJobRunner, self ).shutdown()
def check_watched_items(self):
"""
@@ -233,19 +309,16 @@
reuse the logic here.
"""
new_watched = []
- for cluster_job_state in self.watched:
- new_cluster_job_state = self.check_watched_item(cluster_job_state)
- if new_cluster_job_state:
- new_watched.append(new_cluster_job_state)
+ for async_job_state in self.watched:
+ new_async_job_state = self.check_watched_item(async_job_state)
+ if new_async_job_state:
+ new_watched.append(new_async_job_state)
self.watched = new_watched
# Subclasses should implement this unless they override check_watched_items altogether.
def check_watched_item(self):
raise NotImplementedError()
- def queue_job(self, job_wrapper):
- raise NotImplementedError()
-
def finish_job(self, job_state):
raise NotImplementedError()
@@ -253,10 +326,7 @@
raise NotImplementedError()
def mark_as_finished(self, job_state):
- self.work_queue.put( ( JOB_STATUS_FINISHED, job_state ) )
+ self.work_queue.put( ( self.finish_job, job_state ) )
def mark_as_failed(self, job_state):
- self.work_queue.put( ( JOB_STATUS_FAILED, job_state ) )
-
- def mark_as_queued(self, job_wrapper):
- self.work_queue.put( ( JOB_STATUS_QUEUED, job_wrapper ) )
+ self.work_queue.put( ( self.fail_job, job_state ) )
diff -r 396202a06c1d5a195752fe8a561df8a69e8efad3 -r f3ff8526fde6f0aa42733e92537a4268b7fabc30 lib/galaxy/jobs/runners/drmaa.py
--- a/lib/galaxy/jobs/runners/drmaa.py
+++ b/lib/galaxy/jobs/runners/drmaa.py
@@ -89,7 +89,7 @@
Job runner backed by a finite pool of worker threads. FIFO scheduling
"""
STOP_SIGNAL = object()
- def __init__( self, app ):
+ def __init__( self, app, nworkers ):
"""Initialize this job runner and start the monitor thread"""
# Check if drmaa was importable, fail if not
self.app = app
This diff is so big that we needed to truncate the remainder.
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1
0
11 Feb '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/396202a06c1d/
changeset: 396202a06c1d
user: jgoecks
date: 2013-02-11 18:05:50
summary: More refactoring for tabular data display.
affected #: 2 files
diff -r 7ccddea80a25cf7cdb1e4a9df2056153a326737c -r 396202a06c1d5a195752fe8a561df8a69e8efad3 static/scripts/mvc/data.js
--- a/static/scripts/mvc/data.js
+++ b/static/scripts/mvc/data.js
@@ -202,11 +202,40 @@
}
});
+// -- Utility functions. --
+
+/**
+ * Create a model, attach it to a view, render view, and attach it to a parent element.
+ */
+var createModelAndView = function(model, view, model_config, parent_elt) {
+ // Create model, view.
+ var a_view = new view({
+ model: new model(model_config)
+ });
+
+ // Render view and add to parent element.
+ a_view.render();
+ if (parent_elt) {
+ parent_elt.append(a_view.$el);
+ }
+
+ return a_view;
+};
+
+/**
+ * Create a tabular dataset chunked view (and requisite tabular dataset model)
+ * and appends to parent_elt.
+ */
+var createTabularDatasetChunkedView = function(dataset_config, parent_elt) {
+ return createModelAndView(TabularDataset, TabularDatasetChunkedView, dataset_config, parent_elt);
+};
+
return {
Dataset: Dataset,
TabularDataset: TabularDataset,
DatasetCollection: DatasetCollection,
- TabularDatasetChunkedView: TabularDatasetChunkedView
+ TabularDatasetChunkedView: TabularDatasetChunkedView,
+ createTabularDatasetChunkedView: createTabularDatasetChunkedView
};
});
diff -r 7ccddea80a25cf7cdb1e4a9df2056153a326737c -r 396202a06c1d5a195752fe8a561df8a69e8efad3 templates/webapps/galaxy/dataset/tabular_chunked.mako
--- a/templates/webapps/galaxy/dataset/tabular_chunked.mako
+++ b/templates/webapps/galaxy/dataset/tabular_chunked.mako
@@ -16,22 +16,18 @@
});
require(['mvc/data'], function(data) {
-
- // Set up dataset.
- var dataset = new data.TabularDataset( _.extend( ${h.to_json_string( dataset.get_api_value() )},
- {
- chunk_url: "${h.url_for( controller='/dataset', action='display',
+ data.createTabularDatasetChunkedView(
+ // Dataset config. TODO: encode id.
+ _.extend( ${h.to_json_string( dataset.get_api_value() )},
+ {
+ chunk_url: "${h.url_for( controller='/dataset', action='display',
dataset_id=trans.security.encode_id( dataset.id ))}",
- first_data_chunk: ${chunk}
- }
- ) );
-
- // Set up, render, and add view.
- var dataset_view = new data.TabularDatasetChunkedView({
- model: dataset
- });
- dataset_view.render();
- $('body').append(dataset_view.$el);
+ first_data_chunk: ${chunk}
+ }
+ ),
+ // Append view to body.
+ $('body')
+ );
});
</script></%def>
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1
0
commit/galaxy-central: greg: Enhancements and fixes for citable URLs for repositories in the tool shed.
by Bitbucket 11 Feb '13
by Bitbucket 11 Feb '13
11 Feb '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/7ccddea80a25/
changeset: 7ccddea80a25
user: greg
date: 2013-02-11 17:46:08
summary: Enhancements and fixes for citable URLs for repositories in the tool shed.
affected #: 4 files
diff -r 33f735bb3682ab3907de3d6cfdd8171999a645a3 -r 7ccddea80a25cf7cdb1e4a9df2056153a326737c lib/galaxy/webapps/community/buildapp.py
--- a/lib/galaxy/webapps/community/buildapp.py
+++ b/lib/galaxy/webapps/community/buildapp.py
@@ -63,8 +63,9 @@
# Create the universe WSGI application
webapp = CommunityWebApplication( app, session_cookie='galaxycommunitysession', name="community" )
add_ui_controllers( webapp, app )
- webapp.add_route( '/view/{owner}/', controller='repository', action='citable_owner' )
- webapp.add_route( '/view/{owner}/{name}/', controller='repository', action='citable_repository' )
+ webapp.add_route( '/view/{owner}', controller='repository', action='citable_owner' )
+ webapp.add_route( '/view/{owner}/{name}', controller='repository', action='citable_repository' )
+ webapp.add_route( '/view/{owner}/{name}/{changeset_revision}', controller='repository', action='citable_repository_revision' )
webapp.add_route( '/:controller/:action', action='index' )
webapp.add_route( '/:action', controller='repository', action='index' )
webapp.add_route( '/repos/*path_info', controller='hg', action='handle_request', path_info='/' )
diff -r 33f735bb3682ab3907de3d6cfdd8171999a645a3 -r 7ccddea80a25cf7cdb1e4a9df2056153a326737c lib/galaxy/webapps/community/controllers/repository.py
--- a/lib/galaxy/webapps/community/controllers/repository.py
+++ b/lib/galaxy/webapps/community/controllers/repository.py
@@ -225,6 +225,39 @@
.outerjoin( model.RepositoryCategoryAssociation.table ) \
.outerjoin( model.Category.table )
+class RepositoriesByUserGrid( RepositoryGrid ):
+ title = "Repositories by user"
+ columns = [
+ RepositoryGrid.NameColumn( "Name",
+ key="name",
+ link=( lambda item: dict( operation="view_or_manage_repository", id=item.id ) ),
+ attach_popup=False ),
+ RepositoryGrid.MetadataRevisionColumn( "Metadata Revisions" ),
+ RepositoryGrid.TipRevisionColumn( "Tip Revision" ),
+ RepositoryGrid.DescriptionColumn( "Synopsis",
+ key="description",
+ attach_popup=False ),
+ RepositoryGrid.CategoryColumn( "Category",
+ model_class=model.Category,
+ key="Category.name",
+ attach_popup=False )
+ ]
+ operations = []
+ standard_filters = []
+ default_filter = dict( deleted="False" )
+ num_rows_per_page = 50
+ preserve_state = False
+ use_paging = True
+ def build_initial_query( self, trans, **kwd ):
+ decoded_user_id = trans.security.decode_id( kwd[ 'user_id' ] )
+ return trans.sa_session.query( model.Repository ) \
+ .filter( and_( model.Repository.table.c.deleted == False,
+ model.Repository.table.c.deprecated == False,
+ model.Repository.table.c.user_id == decoded_user_id ) ) \
+ .join( model.User.table ) \
+ .outerjoin( model.RepositoryCategoryAssociation.table ) \
+ .outerjoin( model.Category.table )
+
class RepositoriesIOwnGrid( RepositoryGrid ):
title = "Repositories I own"
columns = [
@@ -376,7 +409,7 @@
.filter( model.Repository.table.c.id < 0 )
class ValidRepositoryGrid( RepositoryGrid ):
- # This grid filters out repositories that have been marked as deprecated.
+ # This grid filters out repositories that have been marked as either deleted or deprecated.
class CategoryColumn( grids.TextColumn ):
def get_value( self, trans, grid, repository ):
rval = '<ul>'
@@ -531,6 +564,7 @@
my_writable_repositories_grid = MyWritableRepositoriesGrid()
repositories_i_own_grid = RepositoriesIOwnGrid()
deprecated_repositories_i_own_grid = DeprecatedRepositoriesIOwnGrid()
+ repositories_by_user_grid = RepositoriesByUserGrid()
@web.expose
def browse_categories( self, trans, **kwd ):
@@ -588,12 +622,13 @@
.order_by( trans.model.Repository.table.c.name ):
for downloadable_revision in repository.metadata_revisions:
metadata = downloadable_revision.metadata
- invalid_tools = metadata.get( 'invalid_tools', [] )
- for invalid_tool_config in invalid_tools:
- invalid_tools_dict[ invalid_tool_config ] = ( repository.id,
- repository.name,
- repository.user.username,
- downloadable_revision.changeset_revision )
+ if metadata:
+ invalid_tools = metadata.get( 'invalid_tools', [] )
+ for invalid_tool_config in invalid_tools:
+ invalid_tools_dict[ invalid_tool_config ] = ( repository.id,
+ repository.name,
+ repository.user.username,
+ downloadable_revision.changeset_revision )
return trans.fill_template( '/webapps/community/repository/browse_invalid_tools.mako',
cntrller=cntrller,
invalid_tools_dict=invalid_tools_dict,
@@ -614,19 +649,9 @@
action='edit_repository',
**kwd ) )
elif operation == "repositories_by_user":
- # Eliminate the current filters if any exist.
- for k, v in kwd.items():
- if k.startswith( 'f-' ):
- del kwd[ k ]
- if 'user_id' in kwd:
- user = suc.get_user( trans, kwd[ 'user_id' ] )
- kwd[ 'f-email' ] = user.email
- del kwd[ 'user_id' ]
- else:
- # The received id is the repository id, so we need to get the id of the user that uploaded the repository.
- repository_id = kwd.get( 'id', None )
- repository = suc.get_repository_in_tool_shed( trans, repository_id )
- kwd[ 'f-email' ] = repository.user.email
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='browse_repositories_by_user',
+ **kwd ) )
elif operation == "repositories_i_own":
# Eliminate the current filters if any exist.
for k, v in kwd.items():
@@ -672,24 +697,44 @@
kwd[ 'message' ] = 'You must be logged in to set email alerts.'
kwd[ 'status' ] = 'error'
del kwd[ 'operation' ]
- # The changeset_revision_select_field in the RepositoryGrid performs a refresh_on_change
- # which sends in request parameters like changeset_revison_1, changeset_revision_2, etc. One
- # of the many select fields on the grid performed the refresh_on_change, so we loop through
- # all of the received values to see which value is not the repository tip. If we find it, we
- # know the refresh_on_change occurred, and we have the necessary repository id and change set
- # revision to pass on.
+ selected_changeset_revision, repository = self.get_repository_from_refresh_on_change( trans, **kwd )
+ if repository:
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='browse_repositories',
+ operation='view_or_manage_repository',
+ id=trans.security.encode_id( repository.id ),
+ changeset_revision=selected_changeset_revision ) )
+ return self.repository_grid( trans, **kwd )
+ @web.expose
+ def browse_repositories_by_user( self, trans, **kwd ):
+ """Display the list of repositories owned by a specified user."""
+ # Eliminate the current filters if any exist.
for k, v in kwd.items():
- changset_revision_str = 'changeset_revision_'
- if k.startswith( changset_revision_str ):
- repository_id = trans.security.encode_id( int( k.lstrip( changset_revision_str ) ) )
+ if k.startswith( 'f-' ):
+ del kwd[ k ]
+ if 'operation' in kwd:
+ operation = kwd[ 'operation' ].lower()
+ if operation == "view_or_manage_repository":
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='view_or_manage_repository',
+ **kwd ) )
+ if 'user_id' not in kwd:
+ # The received id is the repository id, so we need to get the id of the user that uploaded the repository.
+ repository_id = kwd.get( 'id', None )
+ if repository_id:
repository = suc.get_repository_in_tool_shed( trans, repository_id )
- if repository.tip( trans.app ) != v:
+ kwd[ 'user_id' ] = trans.security.encode_id( repository.user.id )
+ else:
+ # The user selected a repository revision which results in a refresh_on_change.
+ selected_changeset_revision, repository = self.get_repository_from_refresh_on_change( trans, **kwd )
+ if repository:
return trans.response.send_redirect( web.url_for( controller='repository',
- action='browse_repositories',
- operation='view_or_manage_repository',
+ action='view_or_manage_repository',
id=trans.security.encode_id( repository.id ),
- changeset_revision=v ) )
- return self.repository_grid( trans, **kwd )
+ changeset_revision=selected_changeset_revision ) )
+ user = suc.get_user( trans, kwd[ 'user_id' ] )
+ self.repositories_by_user_grid.title = "Repositories owned by %s" % user.username
+ return self.repositories_by_user_grid( trans, **kwd )
@web.expose
def browse_repository( self, trans, id, **kwd ):
params = util.Params( kwd )
@@ -745,6 +790,7 @@
@web.expose
def browse_valid_repositories( self, trans, **kwd ):
galaxy_url = kwd.get( 'galaxy_url', None )
+ repository_id = kwd.get( 'id', None )
if 'f-free-text-search' in kwd:
if 'f-Category.name' in kwd:
# The user browsed to a category and then entered a search string, so get the category associated with its value.
@@ -757,7 +803,6 @@
if 'operation' in kwd:
operation = kwd[ 'operation' ].lower()
if operation == "preview_tools_in_changeset":
- repository_id = kwd.get( 'id', None )
repository = suc.get_repository_in_tool_shed( trans, repository_id )
repository_metadata = suc.get_latest_repository_metadata( trans, repository.id )
latest_installable_changeset_revision = repository_metadata.changeset_revision
@@ -773,21 +818,12 @@
category_id = kwd.get( 'id', None )
category = suc.get_category( trans, category_id )
kwd[ 'f-Category.name' ] = category.name
- # The changeset_revision_select_field in the ValidRepositoryGrid performs a refresh_on_change which sends in request parameters like
- # changeset_revison_1, changeset_revision_2, etc. One of the many select fields on the grid performed the refresh_on_change, so we loop
- # through all of the received values to see which value is not the repository tip. If we find it, we know the refresh_on_change occurred
- # and we have the necessary repository id and change set revision to pass on.
- repository_id = None
- for k, v in kwd.items():
- changset_revision_str = 'changeset_revision_'
- if k.startswith( changset_revision_str ):
- repository_id = trans.security.encode_id( int( k.lstrip( changset_revision_str ) ) )
- repository = suc.get_repository_in_tool_shed( trans, repository_id )
- if repository.tip( trans.app ) != v:
- return trans.response.send_redirect( web.url_for( controller='repository',
- action='preview_tools_in_changeset',
- repository_id=trans.security.encode_id( repository.id ),
- changeset_revision=v ) )
+ selected_changeset_revision, repository = self.get_repository_from_refresh_on_change( trans, **kwd )
+ if repository:
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='preview_tools_in_changeset',
+ repository_id=trans.security.encode_id( repository.id ),
+ changeset_revision=selected_changeset_revision ) )
url_args = dict( action='browse_valid_repositories',
operation='preview_tools_in_changeset',
repository_id=repository_id )
@@ -895,45 +931,84 @@
return trans.response.send_redirect( url )
@web.expose
def citable_owner( self, trans, owner ):
- """Support for citeable URL for each repository owner's tools, e.g. http://example.org/view/owner."""
+ """Support for citable URL for each repository owner's tools, e.g. http://example.org/view/owner."""
try:
user = suc.get_user_by_username( trans, owner )
except:
user = None
if user:
user_id = trans.security.encode_id( user.id )
- return trans.fill_template( "/webapps/community/citable_repository.mako",
- user_id=user_id )
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='index',
+ user_id=user_id ) )
else:
- message = "No repositories exist with owner <b>%s</b>." % str( owner )
- return trans.response.send_redirect( web.url_for( controller='repository',
- action='browse_categories',
- id=None,
- name=None,
- owner=None,
- message=message,
- status='error' ) )
+ return trans.show_error_message( "The tool shed <b>%s</b> contains no repositories owned by <b>%s</b>." % \
+ ( web.url_for( '/', qualified=True ).rstrip( '/' ), str( owner ) ) )
@web.expose
def citable_repository( self, trans, owner, name ):
- """Support for citeable URL for each repository, e.g. http://example.org/view/owner/name."""
+ """Support for citable URL for a specified repository, e.g. http://example.org/view/owner/name."""
try:
repository = suc.get_repository_by_name_and_owner( trans.app, name, owner )
except:
repository = None
if repository:
repository_id = trans.security.encode_id( repository.id )
- return trans.fill_template( "/webapps/community/citable_repository.mako",
- repository_id=repository_id )
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='index',
+ repository_id=repository_id ) )
else:
- #TODO - If the owner is OK, show their repositories?
- message = "No repositories named <b>%s</b> with owner <b>%s</b> exist." % ( str( name ), str( owner ) )
+ # If the owner is valid, then show all of their repositories.
+ try:
+ user = suc.get_user_by_username( trans, owner )
+ except:
+ user = None
+ if user:
+ user_id = trans.security.encode_id( user.id )
+ message = "This list of repositories owned by <b>%s</b>, does not include one named <b>%s</b>." % ( str( owner ), str( name ) )
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='index',
+ user_id=user_id,
+ message=message,
+ status='error' ) )
+ else:
+ return trans.show_error_message( "The tool shed <b>%s</b> contains no repositories named <b>%s</b> with owner <b>%s</b>." % \
+ ( web.url_for( '/', qualified=True ).rstrip( '/' ), str( name ), str( owner ) ) )
+ @web.expose
+ def citable_repository_revision( self, trans, owner, name, changeset_revision ):
+ """Support for citable URL for a specified repository revision, e.g. http://example.org/view/owner/name/changeset_revision."""
+ try:
+ repository = suc.get_repository_by_name_and_owner( trans.app, name, owner )
+ except:
+ repository = None
+ if repository:
+ repository_id = trans.security.encode_id( repository.id )
+ repository_metadata = suc.get_repository_metadata_by_repository_id_changset_revision( trans, repository_id, changeset_revision )
+ if not repository_metadata:
+ # Get updates to the received changeset_revision if any exist.
+ repo_dir = repository.repo_path( trans.app )
+ repo = hg.repository( suc.get_configured_ui(), repo_dir )
+ upper_bound_changeset_revision = suc.get_next_downloadable_changeset_revision( repository, repo, changeset_revision )
+ if upper_bound_changeset_revision:
+ changeset_revision = upper_bound_changeset_revision
+ repository_metadata = suc.get_repository_metadata_by_repository_id_changset_revision( trans, repository_id, changeset_revision )
+ if repository_metadata:
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='index',
+ repository_id=repository_id,
+ changeset_revision=changeset_revision ) )
+ else:
+ message = "The change log for the repository named <b>%s</b> owned by <b>%s</b> does not include revision <b>%s</b>." % \
+ ( str( name ), str( owner ), str( changeset_revision ) )
+ return trans.response.send_redirect( web.url_for( controller='repository',
+ action='index',
+ repository_id=repository_id,
+ message=message,
+ status='error' ) )
+ else:
+ # See if the owner is valid.
return trans.response.send_redirect( web.url_for( controller='repository',
- action='browse_categories',
- id=None,
- name=None,
- owner=None,
- message=message,
- status='error' ) )
+ action='citable_owner',
+ owner=owner ) )
@web.expose
def contact_owner( self, trans, id, **kwd ):
params = util.Params( kwd )
@@ -1446,6 +1521,21 @@
if repository_dependencies:
return encoding_util.tool_shed_encode( repository_dependencies )
return ''
+ def get_repository_from_refresh_on_change( self, trans, **kwd ):
+ # The changeset_revision_select_field in several grids performs a refresh_on_change which sends in request parameters like
+ # changeset_revision_1, changeset_revision_2, etc. One of the many select fields on the grid performed the refresh_on_change,
+ # so we loop through all of the received values to see which value is not the repository tip. If we find it, we know the
+ # refresh_on_change occurred and we have the necessary repository id and change set revision to pass on.
+ repository_id = None
+ for k, v in kwd.items():
+ changset_revision_str = 'changeset_revision_'
+ if k.startswith( changset_revision_str ):
+ repository_id = trans.security.encode_id( int( k.lstrip( changset_revision_str ) ) )
+ repository = suc.get_repository_in_tool_shed( trans, repository_id )
+ if repository.tip( trans.app ) != v:
+ return v, repository
+ # This should never be reached - raise an exception?
+ return v, None
@web.json
def get_repository_information( self, trans, repository_ids, changeset_revisions, **kwd ):
"""
@@ -1717,10 +1807,18 @@
if repository.deprecated:
has_deprecated_repositories = True
break
+ # Route in may have been from a citable URL, in which case we'll have a user_id and possibly a name.
+ # The received user_id will be the id of the repository owner.
+ user_id = params.get( 'user_id', None )
+ repository_id = params.get( 'repository_id', None )
+ changeset_revision = params.get( 'changeset_revision', None )
return trans.fill_template( '/webapps/community/index.mako',
repository_metadata=repository_metadata,
has_reviewed_repositories=has_reviewed_repositories,
has_deprecated_repositories=has_deprecated_repositories,
+ user_id=user_id,
+ repository_id=repository_id,
+ changeset_revision=changeset_revision,
message=message,
status=status )
@web.expose
@@ -2472,7 +2570,7 @@
@web.expose
def updated_changeset_revisions( self, trans, **kwd ):
"""
- Handle a request from a local Galaxy instance to retrieve the lsit of changeset revisions to which an installed repository can be updated. This
+ Handle a request from a local Galaxy instance to retrieve the list of changeset revisions to which an installed repository can be updated. This
method will return a string of comma-separated changeset revision hashes for all available updates to the received changeset revision. Among
other things , this method handles the scenario where an installed tool shed repository's tool_dependency definition file defines a changeset
revision for a complex repository dependency that is outdated. In other words, a defined changeset revision is older than the current changeset
@@ -2593,17 +2691,6 @@
message=message,
status=status )
@web.expose
- def view_citable_repositories_by_owner( self, trans, user_id, **kwd ):
- return trans.response.send_redirect( web.url_for( controller='repository',
- action='browse_repositories',
- operation="repositories_by_user",
- user_id=user_id ) )
- @web.expose
- def view_citable_repository( self, trans, repository_id, **kwd ):
- return trans.response.send_redirect( web.url_for( controller='repository',
- action='view_repository',
- id=repository_id ) )
- @web.expose
def view_or_manage_repository( self, trans, **kwd ):
repository = suc.get_repository_in_tool_shed( trans, kwd[ 'id' ] )
if trans.user_is_admin() or repository.user == trans.user:
@@ -2693,7 +2780,6 @@
review_id = None
containers_dict = suc.build_repository_containers_for_tool_shed( trans, repository, changeset_revision, repository_dependencies, repository_metadata )
can_browse_repository_reviews = suc.can_browse_repository_reviews( trans, repository )
- log.debug("VVV In view_repository, can_browse_repository_reviews: %s" % str( can_browse_repository_reviews ))
return trans.fill_template( '/webapps/community/repository/view_repository.mako',
repo=repo,
repository=repository,
diff -r 33f735bb3682ab3907de3d6cfdd8171999a645a3 -r 7ccddea80a25cf7cdb1e4a9df2056153a326737c templates/webapps/community/citable_repository.mako
--- a/templates/webapps/community/citable_repository.mako
+++ /dev/null
@@ -1,84 +0,0 @@
-<%inherit file="/webapps/community/base_panels.mako"/>
-<%namespace file="/message.mako" import="render_msg" />
-
-<%def name="stylesheets()">
- ${parent.stylesheets()}
- ## Include "base.css" for styling tool menu and forms (details)
- ${h.css( "base", "autocomplete_tagging", "tool_menu" )}
-
- ## But make sure styles for the layout take precedence
- ${parent.stylesheets()}
-
- <style type="text/css">
- body { margin: 0; padding: 0; overflow: hidden; }
- #left {
- background: #C1C9E5 url(${h.url_for('/static/style/menu_bg.png')}) top repeat-x;
- }
- </style>
-</%def>
-
-<%def name="javascripts()">
- ${parent.javascripts()}
-</%def>
-
-<%def name="init()">
- <%
- self.has_left_panel=True
- self.has_right_panel=False
- self.active_view="tools"
- %>
- %if trans.app.config.require_login and not trans.user:
- <script type="text/javascript">
- if ( window != top ) {
- top.location.href = location.href;
- }
- </script>
- %endif
-</%def>
-
-<%def name="left_panel()">
- <div class="page-container" style="padding: 10px;">
- <div class="toolMenu">
- <div class="toolSectionList">
- <div class="toolSectionPad"></div>
- <div class="toolSectionTitle">
- Search
- </div>
- <div class="toolSectionBody">
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='find_tools' )}">Search for valid tools</a>
- </div>
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='find_workflows' )}">Search for workflows</a>
- </div>
- </div>
- <div class="toolSectionPad"></div>
- <div class="toolSectionTitle">
- All Repositories
- </div>
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_categories' )}">Browse by category</a>
- </div>
- <div class="toolSectionPad"></div>
- <div class="toolSectionTitle">
- Available Actions
- </div>
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='/user', action='login' )}">Login to create a repository</a>
- </div>
- </div>
- </div>
- </div>
-</%def>
-
-<%def name="center_panel()">
- <%
- if trans.app.config.require_login and not trans.user:
- center_url = h.url_for( controller='user', action='login' )
- elif repository_id:
- center_url = h.url_for( controller='repository', action='view_citable_repository', repository_id=repository_id )
- else:
- center_url = h.url_for( controller='repository', action='view_citable_repositories_by_owner', user_id=user_id )
- %>
- <iframe name="galaxy_main" id="galaxy_main" frameborder="0" style="position: absolute; width: 100%; height: 100%;" src="${center_url}"></iframe>
-</%def>
diff -r 33f735bb3682ab3907de3d6cfdd8171999a645a3 -r 7ccddea80a25cf7cdb1e4a9df2056153a326737c templates/webapps/community/index.mako
--- a/templates/webapps/community/index.mako
+++ b/templates/webapps/community/index.mako
@@ -44,90 +44,101 @@
<div class="page-container" style="padding: 10px;"><div class="toolMenu"><div class="toolSectionList">
- %if repository_metadata:
+ %if user_id or repository_id:
+ ## The route in was a citable url.
<div class="toolSectionPad"></div><div class="toolSectionTitle">
- Search
- </div>
- <div class="toolSectionBody">
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='find_tools' )}">Search for valid tools</a>
- </div>
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='find_workflows' )}">Search for workflows</a>
- </div>
- </div>
- %endif
- <div class="toolSectionPad"></div>
- <div class="toolSectionTitle">
- All Repositories
- </div>
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_categories' )}">Browse by category</a>
- </div>
- %if trans.user:
- <div class="toolSectionPad"></div>
- <div class="toolSectionTitle">
- My Repositories and Tools
+ All Repositories
</div><div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_repositories', operation='repositories_i_own' )}">Repositories I own</a>
+ <a href="${h.url_for( controller='repository', action='index' )}">Browse by category</a></div>
- %if has_reviewed_repositories:
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_repositories', operation='reviewed_repositories_i_own' )}">Reviewed repositories I own</a>
- </div>
- %endif
- %if has_deprecated_repositories:
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_repositories', operation='deprecated_repositories_i_own' )}">Deprecated repositories I own</a>
- </div>
- %endif
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_repositories', operation='my_writable_repositories' )}">My writable repositories</a>
- </div>
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_invalid_tools', cntrller='repository' )}">My invalid tools</a>
- </div>
- <div class="toolSectionPad"></div>
- <div class="toolSectionTitle">
- Available Actions
- </div>
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository', action='create_repository' )}">Create new repository</a>
- </div>
- %if can_review_repositories:
+ %else:
+ %if repository_metadata:
<div class="toolSectionPad"></div><div class="toolSectionTitle">
- Reviewing Repositories
+ Search
</div><div class="toolSectionBody">
- <div class="toolSectionBg">
- %if trans.user.repository_reviews:
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository_review', action='manage_repositories_reviewed_by_me' )}">Repositories reviewed by me</a>
- </div>
- %endif
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository_review', action='manage_repositories_with_reviews' )}">All reviewed repositories</a>
- </div>
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository_review', action='manage_repositories_without_reviews' )}">Repositories with no reviews</a>
- </div>
- <div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='repository_review', action='manage_components' )}">Manage review components</a>
- </div>
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository', action='find_tools' )}">Search for valid tools</a>
+ </div>
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository', action='find_workflows' )}">Search for workflows</a></div></div>
%endif
- %else:
<div class="toolSectionPad"></div><div class="toolSectionTitle">
- Available Actions
+ All Repositories
</div><div class="toolTitle">
- <a target="galaxy_main" href="${h.url_for( controller='/user', action='login' )}">Login to create a repository</a>
+ <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_categories' )}">Browse by category</a></div>
+ %if trans.user:
+ <div class="toolSectionPad"></div>
+ <div class="toolSectionTitle">
+ My Repositories and Tools
+ </div>
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_repositories', operation='repositories_i_own' )}">Repositories I own</a>
+ </div>
+ %if has_reviewed_repositories:
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_repositories', operation='reviewed_repositories_i_own' )}">Reviewed repositories I own</a>
+ </div>
+ %endif
+ %if has_deprecated_repositories:
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_repositories', operation='deprecated_repositories_i_own' )}">Deprecated repositories I own</a>
+ </div>
+ %endif
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_repositories', operation='my_writable_repositories' )}">My writable repositories</a>
+ </div>
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository', action='browse_invalid_tools', cntrller='repository' )}">My invalid tools</a>
+ </div>
+ <div class="toolSectionPad"></div>
+ <div class="toolSectionTitle">
+ Available Actions
+ </div>
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository', action='create_repository' )}">Create new repository</a>
+ </div>
+ %if can_review_repositories:
+ <div class="toolSectionPad"></div>
+ <div class="toolSectionTitle">
+ Reviewing Repositories
+ </div>
+ <div class="toolSectionBody">
+ <div class="toolSectionBg">
+ %if trans.user.repository_reviews:
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository_review', action='manage_repositories_reviewed_by_me' )}">Repositories reviewed by me</a>
+ </div>
+ %endif
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository_review', action='manage_repositories_with_reviews' )}">All reviewed repositories</a>
+ </div>
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository_review', action='manage_repositories_without_reviews' )}">Repositories with no reviews</a>
+ </div>
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='repository_review', action='manage_components' )}">Manage review components</a>
+ </div>
+ </div>
+ </div>
+ %endif
+ %else:
+ <div class="toolSectionPad"></div>
+ <div class="toolSectionTitle">
+ Available Actions
+ </div>
+ <div class="toolTitle">
+ <a target="galaxy_main" href="${h.url_for( controller='/user', action='login' )}">Login to create a repository</a>
+ </div>
+ %endif
%endif
</div></div>
@@ -138,8 +149,17 @@
<%
if trans.app.config.require_login and not trans.user:
center_url = h.url_for( controller='user', action='login', message=message, status=status )
+ elif repository_id and changeset_revision:
+ # Route in was a citable url: /view/{owner}/{name}/{changeset_revision}.
+ center_url = h.url_for( controller='repository', action='view_repository', id=repository_id, changeset_revision=changeset_revision, message=message, status=status )
+ elif repository_id:
+ # Route in was a citable url: /view/{owner}/{name}.
+ center_url = h.url_for( controller='repository', action='view_repository', id=repository_id, message=message, status=status )
+ elif user_id:
+ # Route in was a citable url: /view/{owner}.
+ center_url = h.url_for( controller='repository', action='browse_repositories', operation="repositories_by_user", user_id=user_id, message=message, status=status )
else:
center_url = h.url_for( controller='repository', action='browse_categories', message=message, status=status )
%>
- <iframe name="galaxy_main" id="galaxy_main" frameborder="0" style="position: absolute; width: 100%; height: 100%;" src="${center_url}"></iframe>
+ <iframe name="galaxy_main" id="galaxy_main" frameborder="0" style="position: absolute; width: 100%; height: 100%;" src="${center_url}"></iframe></%def>
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1
0
commit/galaxy-central: greg: Eliminate deleted repositories from the valid repository grid.
by Bitbucket 07 Feb '13
by Bitbucket 07 Feb '13
07 Feb '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/1b5a0358c2ba/
changeset: 1b5a0358c2ba
user: greg
date: 2013-02-07 22:38:03
summary: Eliminate deleted repositories from the valid repository grid.
affected #: 1 file
diff -r 2e8ad17c1786b73ca6a5b9d60dcdf69ef7e50f4d -r 1b5a0358c2bad7a857ef1b08f22a795a397b9d54 lib/galaxy/webapps/community/controllers/repository.py
--- a/lib/galaxy/webapps/community/controllers/repository.py
+++ b/lib/galaxy/webapps/community/controllers/repository.py
@@ -76,7 +76,7 @@
viewable_repositories = 0
for rca in category.repositories:
repository = rca.repository
- if not repository.deprecated and repository.downloadable_revisions:
+ if not repository.deleted and not repository.deprecated and repository.downloadable_revisions:
viewable_repositories += 1
return viewable_repositories
return 0
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1
0
commit/galaxy-central: greg: Fix the operation handling in the roles and groups methods of the Admin controller.
by Bitbucket 07 Feb '13
by Bitbucket 07 Feb '13
07 Feb '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/2e8ad17c1786/
changeset: 2e8ad17c1786
user: greg
date: 2013-02-07 22:36:53
summary: Fix the operation handling in the roles and groups methods of the Admin controller.
affected #: 1 file
diff -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 -r 2e8ad17c1786b73ca6a5b9d60dcdf69ef7e50f4d lib/galaxy/web/base/controllers/admin.py
--- a/lib/galaxy/web/base/controllers/admin.py
+++ b/lib/galaxy/web/base/controllers/admin.py
@@ -1,3 +1,4 @@
+import logging
from datetime import datetime, timedelta
from string import punctuation as PUNCTUATION
@@ -7,6 +8,8 @@
from galaxy.util import inflector
from galaxy.web.form_builder import CheckboxField
+log = logging.getLogger( __name__ )
+
class Admin( object ):
# Override these
user_list_grid = None
@@ -76,7 +79,7 @@
@web.require_admin
def roles( self, trans, **kwargs ):
if 'operation' in kwargs:
- operation = kwargs['operation'].lower()
+ operation = kwargs[ 'operation' ].lower().replace( '+', ' ' )
if operation == "roles":
return self.role( trans, **kwargs )
if operation == "create":
@@ -411,7 +414,7 @@
@web.require_admin
def groups( self, trans, **kwargs ):
if 'operation' in kwargs:
- operation = kwargs['operation'].lower()
+ operation = kwargs[ 'operation' ].lower().replace( '+', ' ' )
if operation == "groups":
return self.group( trans, **kwargs )
if operation == "create":
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1
0
commit/galaxy-central: jgoecks: Subtract query tool: make ignoring empty end columns and whitespace optional.
by Bitbucket 07 Feb '13
by Bitbucket 07 Feb '13
07 Feb '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/ce9789a35356/
changeset: ce9789a35356
user: jgoecks
date: 2013-02-07 20:46:49
summary: Subtract query tool: make ignoring empty end columns and whitespace optional.
affected #: 3 files
diff -r fd82f2ff9533fb5e4ef37c0404ede7c58d9234c5 -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 test-data/eq-showbeginning_e.dat
--- /dev/null
+++ b/test-data/eq-showbeginning_e.dat
@@ -0,0 +1,10 @@
+chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -
+chr1 147984545 147984630 CCDS990.1_cds_0_0_chr1_147984546_f 0 +
+chr1 148078400 148078582 CCDS993.1_cds_0_0_chr1_148078401_r 0 -
+chr1 148185136 148185276 CCDS996.1_cds_0_0_chr1_148185137_f 0 +
+chr10 55251623 55253124 CCDS7248.1_cds_0_0_chr10_55251624_r 0 -
+chr11 116124407 116124501 CCDS8374.1_cds_0_0_chr11_116124408_r 0 -
+chr11 116206508 116206563 CCDS8377.1_cds_0_0_chr11_116206509_f 0 +
+chr11 116211733 116212337 CCDS8378.1_cds_0_0_chr11_116211734_r 0 -
+chr11 1812377 1812407 CCDS7726.1_cds_0_0_chr11_1812378_f 0 +
+chr12 38440094 38440321 CCDS8736.1_cds_0_0_chr12_38440095_r 0 -
diff -r fd82f2ff9533fb5e4ef37c0404ede7c58d9234c5 -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 tools/new_operations/subtract_query.py
--- a/tools/new_operations/subtract_query.py
+++ b/tools/new_operations/subtract_query.py
@@ -3,7 +3,8 @@
"""
Subtract an entire query from another query
-usage: %prog in_file_1 in_file_2 begin_col end_col output
+usage: %prog in_file_1 in_file_2 begin_col end_col output
+ --ignore-empty-end-cols: ignore empty end columns when subtracting
"""
import sys, re
from galaxy import eggs
@@ -18,7 +19,7 @@
assert sys.version_info[:2] >= ( 2, 4 )
-def get_lines(fname, begin_col='', end_col=''):
+def get_lines(fname, begin_col='', end_col='', ignore_empty_end_cols=False):
lines = set([])
i = 0
for i, line in enumerate(file(fname)):
@@ -29,12 +30,15 @@
try:
line = line.split('\t')
line = '\t'.join([line[j] for j in range(begin_col-1, end_col)])
- # removing empty fields, we do not compare empty fields at the end of a line.
- line = line.rstrip()
+ if ignore_empty_end_cols:
+ # removing empty fields, we do not compare empty fields at the end of a line.
+ line = line.rstrip()
lines.add( line )
except: pass
else:
- line = line.rstrip()
+ if ignore_empty_end_cols:
+ # removing empty fields, we do not compare empty fields at the end of a line.
+ line = line.rstrip()
lines.add( line )
if i: return (i+1, lines)
else: return (i, lines)
@@ -83,9 +87,9 @@
lines1 is the set of unique lines in inp1_file
diff1 is the number of duplicate lines removed from inp1_file
"""
- len1, lines1 = get_lines(inp1_file, begin_col, end_col)
+ len1, lines1 = get_lines(inp1_file, begin_col, end_col, options.ignore_empty_end_cols)
diff1 = len1 - len(lines1)
- len2, lines2 = get_lines(inp2_file, begin_col, end_col)
+ len2, lines2 = get_lines(inp2_file, begin_col, end_col, options.ignore_empty_end_cols)
lines1.difference_update(lines2)
"""lines1 is now the set of unique lines in inp1_file - the set of unique lines in inp2_file"""
diff -r fd82f2ff9533fb5e4ef37c0404ede7c58d9234c5 -r ce9789a35356da2b2ee4ae723506d5af57a0ce69 tools/new_operations/subtract_query.xml
--- a/tools/new_operations/subtract_query.xml
+++ b/tools/new_operations/subtract_query.xml
@@ -1,11 +1,18 @@
-<tool id="subtract_query1" name="Subtract Whole Dataset">
+<tool id="subtract_query1" name="Subtract Whole Dataset" version="0.1"><description>from another dataset</description>
- <command interpreter="python">subtract_query.py $input1 $input2 $begin_col $end_col $output</command>
+ <command interpreter="python">
+ subtract_query.py $input1 $input2 $begin_col $end_col $output
+ #if str($ignore_empty_end_cols) == 'true':
+ --ignore-empty-end-cols
+ #end if
+
+ </command><inputs><param format="txt" name="input2" type="data" label="Subtract" help="Second dataset" /><param format="txt" name="input1" type="data" label="from" help="First dataset" /><param name="begin_col" type="data_column" data_ref="input1" force_select="False" label="Restrict subtraction between 'begin column'" /><param name="end_col" type="data_column" data_ref="input1" force_select="False" label="and 'end column'" help="Specifying columns for restricting subtraction is available only for tabular formatted datasets" />
+ <param name="ignore_empty_end_cols" type="boolean" label="Ignore empty columns and whitespace at end of line when subtracting"/></inputs><outputs><data format="input" name="output" metadata_source="input1" />
@@ -45,6 +52,15 @@
<param name="end_col" value="None" /><output name="output" file="subtract-query-4.dat" /></test>
+ <!-- Subtract 2 tabular files with no column restrictions, ignoring empty end columns. -->
+ <test>
+ <param name="input1" value="eq-showbeginning_e.dat" />
+ <param name="input2" value="eq-showtail.dat" />
+ <param name="begin_col" value="None" />
+ <param name="end_col" value="None" />
+ <param name="ignore_empty_end_cols" value="true" />
+ <output name="output" file="subtract-query-2.dat" />
+ </test></tests><help>
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
1
0