galaxy-commits
January 2013
commit/galaxy-central: inithello: Tool shed functional tests for automated installation, reinstallation, and reactivation of repository dependencies. Enhancements to tool shed functional test framework.
by Bitbucket 11 Jan '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/2e96c23c760b/
changeset: 2e96c23c760b
user: inithello
date: 2013-01-11 23:02:42
summary: Tool shed functional tests for automated installation, reinstallation, and reactivation of repository dependencies. Enhancements to tool shed functional test framework.
affected #: 6 files
diff -r 9bdc2c2e95aa05e9cc8cd01599b224e1a3c6b26e -r 2e96c23c760b7d9259168e007bec52ffcdfa00a9 test/tool_shed/base/twilltestcase.py
--- a/test/tool_shed/base/twilltestcase.py
+++ b/test/tool_shed/base/twilltestcase.py
@@ -103,10 +103,10 @@
url = '/repository/view_changelog?id=%s' % self.security.encode_id( repository.id )
self.visit_url( url )
self.check_for_strings( strings_displayed, strings_not_displayed )
- def check_repository_dependency( self, repository, depends_on_repository, depends_on_changeset_revision, changeset_revision=None ):
- if changeset_revision is None:
- changeset_revision = self.get_repository_tip( repository )
- strings_displayed = [ depends_on_repository.name, depends_on_repository.user.username, depends_on_changeset_revision ]
+ def check_repository_dependency( self, repository, depends_on_repository, depends_on_changeset_revision=None, changeset_revision=None ):
+ strings_displayed = [ depends_on_repository.name, depends_on_repository.user.username ]
+ if depends_on_changeset_revision:
+ strings_displayed.append( depends_on_changeset_revision )
self.display_manage_repository_page( repository, changeset_revision=changeset_revision, strings_displayed=strings_displayed )
def check_repository_metadata( self, repository, tip_only=True ):
if tip_only:
@@ -178,6 +178,15 @@
return '%s=%s&%s=%s' % ( field_name, field_value, field_name, field_value )
else:
return '%s=%s' % ( field_name, field_value )
+ def create_repository_dependency( self, repository=None, depends_on=[], filepath=None ):
+ dependency_description = '%s depends on %s.' % ( repository.name, ', '.join( repo.name for repo in depends_on ) )
+ self.generate_repository_dependency_xml( depends_on,
+ self.get_filename( 'repository_dependencies.xml', filepath=filepath ),
+ dependency_description=dependency_description )
+ self.upload_file( repository,
+ 'repository_dependencies.xml',
+ filepath=filepath,
+ commit_message='Uploaded dependency on %s.' % ', '.join( repo.name for repo in depends_on ) )
def create_user_in_galaxy( self, cntrller='user', email='test(a)bx.psu.edu', password='testuser', username='admin-user', redirect='' ):
self.visit_galaxy_url( "/user/create?cntrller=%s&use_panels=False" % cntrller )
tc.fv( '1', 'email', email )
@@ -247,11 +256,25 @@
self.check_for_strings( strings_displayed, strings_not_displayed )
def display_manage_repository_page( self, repository, changeset_revision=None, strings_displayed=[], strings_not_displayed=[] ):
base_url = '/repository/manage_repository?id=%s' % self.security.encode_id( repository.id )
- if changeset_revision is not None:
+ if changeset_revision:
url = '%s&changeset_revision=%s' % ( base_url, changeset_revision )
else:
+ changeset_revision = self.get_repository_tip( repository )
url = base_url
self.visit_url( url )
+ metadata = self.get_repository_metadata_by_changeset_revision( repository, changeset_revision )
+ if metadata:
+ if 'tool_dependencies' in metadata.metadata:
+ strings_displayed.append( 'Tool dependencies' )
+ for dependency in metadata.metadata[ 'tool_dependencies' ]:
+ if dependency == 'set_environment':
+ for environment_dependency in metadata.metadata[ 'tool_dependencies' ][ dependency ]:
+ strings_displayed.append( environment_dependency[ 'name' ] )
+ strings_displayed.append( environment_dependency[ 'type' ] )
+ else:
+ strings_displayed.append( metadata.metadata[ 'tool_dependencies' ][ dependency ][ 'name' ] )
+ strings_displayed.append( metadata.metadata[ 'tool_dependencies' ][ dependency ][ 'version' ] )
+ strings_displayed.append( metadata.metadata[ 'tool_dependencies' ][ dependency ][ 'type' ] )
self.check_for_strings( strings_displayed, strings_not_displayed )
def display_repository_clone_page( self, owner_name, repository_name, strings_displayed=[], strings_not_displayed=[] ):
url = '/repos/%s/%s' % ( owner_name, repository_name )
@@ -384,6 +407,9 @@
return self.hgweb_config_manager.get_entry( lhs )
except:
raise Exception( "Entry for repository %s missing in hgweb config file %s." % ( lhs, self.hgweb_config_manager.hgweb_config ) )
+ def get_repository_changelog( self, repository ):
+ repo = hg.repository( ui.ui(), self.get_repo_path( repository ) )
+ return [repo.changectx( changeset ) for changeset in repo.changelog ]
def get_repository_datatypes_count( self, repository ):
metadata = self.get_repository_metadata( repository )[0].metadata
if 'datatypes' not in metadata:
@@ -424,7 +450,7 @@
return [ metadata_revision for metadata_revision in repository.metadata_revisions ]
def get_repository_metadata_by_changeset_revision( self, repository, changeset_revision ):
found = None
- for metadata_revision in self.get_repository_metadata( repository ):
+ for metadata_revision in repository.metadata_revisions:
if metadata_revision.changeset_revision == changeset_revision:
found = metadata_revision
return found
@@ -671,6 +697,30 @@
tc.formfile( "1", "file_data", self.get_filename( filename, filepath ) )
tc.submit( "upload_button" )
self.check_for_strings( strings_displayed, strings_not_displayed )
+ # Uncomment this if it becomes necessary to wait for an asynchronous process to complete after submitting an upload.
+ #for i in range( 5 ):
+ # try:
+ # self.check_for_strings( strings_displayed, strings_not_displayed )
+ # break
+ # except Exception, e:
+ # if i == 4:
+ # raise e
+ # else:
+ # time.sleep( 1 )
+ # continue
+ def verify_installed_uninstalled_repositories( self, installed_repositories=[], uninstalled_repositories=[] ):
+ strings_displayed = []
+ strings_not_displayed = []
+ for repository_name, repository_owner in uninstalled_repositories:
+ repository = test_db_util.get_repository_by_name_and_owner( repository_name, repository_owner )
+ strings_not_displayed.extend( [ repository_name, self.get_repository_tip( repository ) ] )
+ for repository_name, repository_owner in installed_repositories:
+ repository = test_db_util.get_repository_by_name_and_owner( repository_name, repository_owner )
+ galaxy_repository = test_db_util.get_installed_repository_by_name_owner( repository_name, repository_owner )
+ if galaxy_repository:
+ assert galaxy_repository.status == 'Installed', 'Repository %s should be installed, but is %s' % ( repository_name, galaxy_repository.status )
+ strings_displayed.extend( [ repository_name, self.get_repository_tip( repository ) ] )
+ self.display_galaxy_browse_repositories_page( strings_displayed=strings_displayed, strings_not_displayed=strings_not_displayed )
def verify_installed_repository_metadata_unchanged( self, name, owner ):
installed_repository = test_db_util.get_installed_repository_by_name_owner( name, owner )
metadata = installed_repository.metadata
diff -r 9bdc2c2e95aa05e9cc8cd01599b224e1a3c6b26e -r 2e96c23c760b7d9259168e007bec52ffcdfa00a9 test/tool_shed/functional/test_0050_circular_dependencies_4_levels.py
--- a/test/tool_shed/functional/test_0050_circular_dependencies_4_levels.py
+++ b/test/tool_shed/functional/test_0050_circular_dependencies_4_levels.py
@@ -17,8 +17,20 @@
freebayes_repository_description = "Galaxy's freebayes tool"
freebayes_repository_long_description = "Long description of Galaxy's freebayes tool"
-default_category = 'test_0050_repository_n_level_circular_dependencies'
-default_category_description = 'Testing handling of circular repository dependencies to n levels.'
+column_repository_name = 'column_maker_0050'
+column_repository_description = "Add column"
+column_repository_long_description = "Compute an expression on every row"
+
+convert_repository_name = 'convert_chars_0050'
+convert_repository_description = "Convert delimiters"
+convert_repository_long_description = "Convert delimiters to tab"
+
+bismark_repository_name = 'bismark_0050'
+bismark_repository_description = "A flexible aligner."
+bismark_repository_long_description = "A flexible aligner and methylation caller for Bisulfite-Seq applications."
+
+category_name = 'Test 0050 Circular Dependencies 5 Levels'
+category_description = 'Test circular dependency features'
class TestRepositoryCircularDependenciesToNLevels( ShedTwillTestCase ):
'''Verify that the code correctly handles circular dependencies down to n levels.'''
@@ -34,30 +46,57 @@
admin_user = test_db_util.get_user( common.admin_email )
assert admin_user is not None, 'Problem retrieving user with email %s from the database' % admin_email
admin_user_private_role = test_db_util.get_private_role( admin_user )
- def test_0005_create_category( self ):
- """Create a category for this test suite"""
- self.create_category( name=default_category, description=default_category_description )
- def test_0010_create_emboss_datatypes_repository( self ):
+ def test_0005_create_convert_repository( self ):
+ '''Create and populate convert_chars_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
+ self.logout()
+ self.login( email=common.test_user_1_email, username=common.test_user_1_name )
+ repository = self.get_or_create_repository( name=convert_repository_name,
+ description=convert_repository_description,
+ long_description=convert_repository_long_description,
+ owner=common.test_user_1_name,
+ category_id=self.security.encode_id( category.id ),
+ strings_displayed=[] )
+ self.upload_file( repository,
+ 'convert_chars/convert_chars.tar',
+ strings_displayed=[],
+ commit_message='Uploaded convert_chars.tar.' )
+ def test_0010_create_column_repository( self ):
+ '''Create and populate convert_chars_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
+ repository = self.get_or_create_repository( name=column_repository_name,
+ description=column_repository_description,
+ long_description=column_repository_long_description,
+ owner=common.test_user_1_name,
+ category_id=self.security.encode_id( category.id ),
+ strings_displayed=[] )
+ self.upload_file( repository,
+ 'column_maker/column_maker.tar',
+ strings_displayed=[],
+ commit_message='Uploaded column_maker.tar.' )
+ def test_0015_create_emboss_datatypes_repository( self ):
'''Create and populate emboss_datatypes_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
self.logout()
self.login( email=common.test_user_1_email, username=common.test_user_1_name )
repository = self.get_or_create_repository( name=emboss_datatypes_repository_name,
description=emboss_datatypes_repository_description,
long_description=emboss_datatypes_repository_long_description,
owner=common.test_user_1_name,
- categories=[ default_category ],
+ category_id=self.security.encode_id( category.id ),
strings_displayed=[] )
self.upload_file( repository,
'emboss/datatypes/datatypes_conf.xml',
strings_displayed=[],
commit_message='Uploaded datatypes_conf.xml.' )
- def test_0015_create_emboss_repository( self ):
+ def test_0020_create_emboss_repository( self ):
'''Create and populate emboss_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
repository = self.get_or_create_repository( name=emboss_repository_name,
description=emboss_repository_description,
long_description=emboss_repository_long_description,
owner=common.test_user_1_name,
- categories=[ default_category ],
+ category_id=self.security.encode_id( category.id ),
strings_displayed=[] )
self.upload_file( repository,
'emboss/emboss.tar',
@@ -72,13 +111,14 @@
'repository_dependencies.xml',
filepath=repository_dependencies_path,
commit_message='Uploaded dependency on emboss_datatypes.' )
- def test_0020_create_filtering_repository( self ):
+ def test_0025_create_filtering_repository( self ):
'''Create and populate filtering_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
filtering_repository = self.get_or_create_repository( name=filtering_repository_name,
description=filtering_repository_description,
long_description=filtering_repository_long_description,
owner=common.test_user_1_name,
- categories=[ default_category ],
+ category_id=self.security.encode_id( category.id ),
strings_displayed=[] )
self.upload_file( filtering_repository,
'filtering/filtering_1.1.0.tar',
@@ -93,69 +133,95 @@
'repository_dependencies.xml',
filepath=repository_dependencies_path,
commit_message='Uploaded dependency on emboss.' )
- def test_0025_create_freebayes_repository( self ):
+ def test_0030_create_freebayes_repository( self ):
'''Create and populate freebayes_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
repository = self.get_or_create_repository( name=freebayes_repository_name,
description=freebayes_repository_description,
long_description=freebayes_repository_long_description,
owner=common.test_user_1_name,
- categories=[ default_category ],
+ category_id=self.security.encode_id( category.id ),
strings_displayed=[] )
self.upload_file( repository,
'freebayes/freebayes.tar',
strings_displayed=[],
commit_message='Uploaded freebayes.tar.' )
+ def test_0035_create_bismark_repository( self ):
+ '''Create and populate bismark_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
+ repository = self.get_or_create_repository( name=bismark_repository_name,
+ description=bismark_repository_description,
+ long_description=bismark_repository_long_description,
+ owner=common.test_user_1_name,
+ category_id=self.security.encode_id( category.id ),
+ strings_displayed=[] )
+ self.upload_file( repository,
+ 'bismark/bismark.tar',
+ strings_displayed=[],
+ valid_tools_only=False,
+ commit_message='Uploaded bismark.tar.' )
+ def test_0040_create_and_upload_dependency_definitions( self ):
+ column_repository = test_db_util.get_repository_by_name_and_owner( column_repository_name, common.test_user_1_name )
+ convert_repository = test_db_util.get_repository_by_name_and_owner( convert_repository_name, common.test_user_1_name )
emboss_datatypes_repository = test_db_util.get_repository_by_name_and_owner( emboss_datatypes_repository_name, common.test_user_1_name )
emboss_repository = test_db_util.get_repository_by_name_and_owner( emboss_repository_name, common.test_user_1_name )
filtering_repository = test_db_util.get_repository_by_name_and_owner( filtering_repository_name, common.test_user_1_name )
- repository_dependencies_path = self.generate_temp_path( 'test_0050', additional_paths=[ 'freebayes' ] )
- self.generate_repository_dependency_xml( [ filtering_repository ],
- self.get_filename( 'repository_dependencies.xml', filepath=repository_dependencies_path ),
- dependency_description='Emboss depends on the filtering repository.' )
- self.upload_file( emboss_repository,
- 'repository_dependencies.xml',
- filepath=repository_dependencies_path,
- commit_message='Uploaded dependency on filtering.' )
- previous_tip = self.get_repository_tip( repository )
- self.generate_repository_dependency_xml( [ emboss_datatypes_repository, emboss_repository, filtering_repository, repository ],
- self.get_filename( 'repository_dependencies.xml', filepath=repository_dependencies_path ),
- dependency_description='Freebayes depends on the filtering repository.' )
- self.upload_file( repository,
- 'repository_dependencies.xml',
- filepath=repository_dependencies_path,
- commit_message='Uploaded dependency on filtering.' )
- self.display_manage_repository_page( repository, strings_not_displayed=[ previous_tip ] )
- def test_0030_verify_repository_dependencies( self ):
+ freebayes_repository = test_db_util.get_repository_by_name_and_owner( freebayes_repository_name, common.test_user_1_name )
+ bismark_repository = test_db_util.get_repository_by_name_and_owner( bismark_repository_name, common.test_user_1_name )
+ dependency_xml_path = self.generate_temp_path( 'test_0050', additional_paths=[ 'freebayes' ] )
+ self.create_repository_dependency( convert_repository, depends_on=[ column_repository ], filepath=dependency_xml_path )
+ self.create_repository_dependency( column_repository, depends_on=[ convert_repository ], filepath=dependency_xml_path )
+ self.create_repository_dependency( emboss_datatypes_repository, depends_on=[ bismark_repository ], filepath=dependency_xml_path )
+ self.create_repository_dependency( emboss_repository, depends_on=[ emboss_datatypes_repository ], filepath=dependency_xml_path )
+ self.create_repository_dependency( freebayes_repository,
+ depends_on=[ freebayes_repository, emboss_datatypes_repository, emboss_repository, column_repository ],
+ filepath=dependency_xml_path )
+ self.create_repository_dependency( filtering_repository, depends_on=[ emboss_repository ], filepath=dependency_xml_path )
+ def test_0045_verify_repository_dependencies( self ):
'''Verify that the generated dependency circle does not cause an infinite loop.
-
Expected structure:
- id: 2 key: http://localhost:8634__ESEP__freebayes_0050__ESEP__user1__ESEP__2e73d8e1b59d
- ['http://localhost:8634', 'emboss_datatypes_0050', 'user1', '596029c334b1']
- ['http://localhost:8634', 'emboss_0050', 'user1', '9f1503046640']
- id: 3 key: http://localhost:8634__ESEP__filtering_0050__ESEP__user1__ESEP__eefdd8bc0db9
- ['http://localhost:8634', 'emboss_0050', 'user1', '9f1503046640']
- id: 4 key: http://localhost:8634__ESEP__emboss_0050__ESEP__user1__ESEP__9f1503046640
- ['http://localhost:8634', 'emboss_datatypes_0050', 'user1', '596029c334b1']
+ id: 2 key: http://toolshed.local:10001__ESEP__filtering__ESEP__test__ESEP__871602b4276b
+ ['http://toolshed.local:10001', 'emboss_5', 'test', '8de5fe0d7b04']
+ id: 3 key: http://toolshed.local:10001__ESEP__emboss_datatypes__ESEP__test__ESEP__dbd4…
+ ['http://toolshed.local:10001', 'freebayes', 'test', 'f40028114098']
+ id: 4 key: http://toolshed.local:10001__ESEP__freebayes__ESEP__test__ESEP__f40028114098
+ ['http://toolshed.local:10001', 'emboss_datatypes', 'test', 'dbd4f68bf507']
+ ['http://toolshed.local:10001', 'emboss_5', 'test', '8de5fe0d7b04']
+ ['http://toolshed.local:10001', 'column_maker', 'test', '83e956bdbac0']
+ id: 5 key: http://toolshed.local:10001__ESEP__column_maker__ESEP__test__ESEP__83e956bd…
+ ['http://toolshed.local:10001', 'convert_chars', 'test', 'b28134220c8a']
+ id: 6 key: http://toolshed.local:10001__ESEP__convert_chars__ESEP__test__ESEP__b281342…
+ ['http://toolshed.local:10001', 'column_maker', 'test', '83e956bdbac0']
+ id: 7 key: http://toolshed.local:10001__ESEP__emboss_5__ESEP__test__ESEP__8de5fe0d7b04
+ ['http://toolshed.local:10001', 'emboss_datatypes', 'test', 'dbd4f68bf507']
'''
emboss_datatypes_repository = test_db_util.get_repository_by_name_and_owner( emboss_datatypes_repository_name, common.test_user_1_name )
emboss_repository = test_db_util.get_repository_by_name_and_owner( emboss_repository_name, common.test_user_1_name )
filtering_repository = test_db_util.get_repository_by_name_and_owner( filtering_repository_name, common.test_user_1_name )
freebayes_repository = test_db_util.get_repository_by_name_and_owner( freebayes_repository_name, common.test_user_1_name )
- for repository in [ emboss_datatypes_repository, emboss_repository, filtering_repository ]:
- self.check_repository_dependency( freebayes_repository, repository, self.get_repository_tip( repository ) )
- for changeset_revision in self.get_repository_metadata_revisions( emboss_repository ):
- self.check_repository_dependency( freebayes_repository, emboss_repository, changeset_revision )
+ column_repository = test_db_util.get_repository_by_name_and_owner( column_repository_name, common.test_user_1_name )
+ convert_repository = test_db_util.get_repository_by_name_and_owner( convert_repository_name, common.test_user_1_name )
+ bismark_repository = test_db_util.get_repository_by_name_and_owner( bismark_repository_name, common.test_user_1_name )
+ self.check_repository_dependency( convert_repository, column_repository )
+ self.check_repository_dependency( column_repository, convert_repository )
+ self.check_repository_dependency( emboss_datatypes_repository, bismark_repository )
+ self.check_repository_dependency( emboss_repository, emboss_datatypes_repository )
+ self.check_repository_dependency( filtering_repository, emboss_repository )
+ for repository in [ emboss_datatypes_repository, emboss_repository, column_repository ]:
+ self.check_repository_dependency( freebayes_repository, repository )
+ freebayes_dependencies = [ freebayes_repository, emboss_datatypes_repository, emboss_repository, column_repository ]
+ strings_displayed = [ '%s depends on %s.' % ( freebayes_repository.name, ', '.join( repo.name for repo in freebayes_dependencies ) ) ]
self.display_manage_repository_page( freebayes_repository,
- strings_displayed=[ 'Freebayes depends on the filtering repository.' ] )
- def test_0035_verify_tool_dependencies( self ):
+ strings_displayed=strings_displayed )
+ def test_0050_verify_tool_dependencies( self ):
'''Check that freebayes and emboss display tool dependencies.'''
freebayes_repository = test_db_util.get_repository_by_name_and_owner( freebayes_repository_name, common.test_user_1_name )
emboss_repository = test_db_util.get_repository_by_name_and_owner( emboss_repository_name, common.test_user_1_name )
self.display_manage_repository_page( freebayes_repository,
strings_displayed=[ 'freebayes', '0.9.4_9696d0ce8a9', 'samtools', '0.1.18', 'Tool dependencies' ] )
self.display_manage_repository_page( emboss_repository, strings_displayed=[ 'Tool dependencies', 'emboss', '5.0.0', 'package' ] )
- def test_0040_verify_repository_metadata( self ):
+ def test_0055_verify_repository_metadata( self ):
'''Verify that resetting the metadata does not change it.'''
emboss_datatypes_repository = test_db_util.get_repository_by_name_and_owner( emboss_datatypes_repository_name, common.test_user_1_name )
emboss_repository = test_db_util.get_repository_by_name_and_owner( emboss_repository_name, common.test_user_1_name )
diff -r 9bdc2c2e95aa05e9cc8cd01599b224e1a3c6b26e -r 2e96c23c760b7d9259168e007bec52ffcdfa00a9 test/tool_shed/functional/test_0070_invalid_tool.py
--- a/test/tool_shed/functional/test_0070_invalid_tool.py
+++ b/test/tool_shed/functional/test_0070_invalid_tool.py
@@ -45,5 +45,6 @@
remove_repo_files_not_in_tar='No',
commit_message='Uploaded an updated tool xml.' )
valid_revision = self.get_repository_tip( repository )
+ test_db_util.refresh( repository )
self.check_repository_tools_for_changeset_revision( repository, valid_revision )
self.check_repository_invalid_tools_for_changeset_revision( repository, invalid_revision )
diff -r 9bdc2c2e95aa05e9cc8cd01599b224e1a3c6b26e -r 2e96c23c760b7d9259168e007bec52ffcdfa00a9 test/tool_shed/functional/test_1050_circular_dependencies_4_levels.py
--- /dev/null
+++ b/test/tool_shed/functional/test_1050_circular_dependencies_4_levels.py
@@ -0,0 +1,314 @@
+from tool_shed.base.twilltestcase import ShedTwillTestCase, common, os
+import tool_shed.base.test_db_util as test_db_util
+
+emboss_datatypes_repository_name = 'emboss_datatypes_0050'
+emboss_datatypes_repository_description = "Datatypes for emboss"
+emboss_datatypes_repository_long_description = "Long description of Emboss' datatypes"
+
+emboss_repository_name = 'emboss_0050'
+emboss_repository_description = "Galaxy's emboss tool"
+emboss_repository_long_description = "Long description of Galaxy's emboss tool"
+
+filtering_repository_name = 'filtering_0050'
+filtering_repository_description = "Galaxy's filtering tool"
+filtering_repository_long_description = "Long description of Galaxy's filtering tool"
+
+freebayes_repository_name = 'freebayes_0050'
+freebayes_repository_description = "Galaxy's freebayes tool"
+freebayes_repository_long_description = "Long description of Galaxy's freebayes tool"
+
+column_repository_name = 'column_maker_0050'
+column_repository_description = "Add column"
+column_repository_long_description = "Compute an expression on every row"
+
+convert_repository_name = 'convert_chars_0050'
+convert_repository_description = "Convert delimiters"
+convert_repository_long_description = "Convert delimiters to tab"
+
+bismark_repository_name = 'bismark_0050'
+bismark_repository_description = "A flexible aligner."
+bismark_repository_long_description = "A flexible aligner and methylation caller for Bisulfite-Seq applications."
+
+category_name = 'Test 0050 Circular Dependencies 5 Levels'
+category_description = 'Test circular dependency features'
+
+running_standalone = False
+
+class TestInstallRepositoryCircularDependencies( ShedTwillTestCase ):
+ '''Verify that the code correctly handles circular dependencies down to n levels.'''
+ def test_0000_initiate_users( self ):
+ """Create necessary user accounts."""
+ self.logout()
+ self.login( email=common.test_user_1_email, username=common.test_user_1_name )
+ test_user_1 = test_db_util.get_user( common.test_user_1_email )
+ assert test_user_1 is not None, 'Problem retrieving user with email %s from the database' % test_user_1_email
+ test_user_1_private_role = test_db_util.get_private_role( test_user_1 )
+ self.logout()
+ self.login( email=common.admin_email, username=common.admin_username )
+ admin_user = test_db_util.get_user( common.admin_email )
+ assert admin_user is not None, 'Problem retrieving user with email %s from the database' % admin_email
+ admin_user_private_role = test_db_util.get_private_role( admin_user )
+ def test_0005_create_convert_repository( self ):
+ '''Create and populate convert_chars_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
+ global running_standalone
+ self.logout()
+ self.login( email=common.test_user_1_email, username=common.test_user_1_name )
+ repository = self.get_or_create_repository( name=convert_repository_name,
+ description=convert_repository_description,
+ long_description=convert_repository_long_description,
+ owner=common.test_user_1_name,
+ category_id=self.security.encode_id( category.id ),
+ strings_displayed=[] )
+ if self.repository_is_new( repository ):
+ running_standalone = True
+ self.upload_file( repository,
+ 'convert_chars/convert_chars.tar',
+ strings_displayed=[],
+ commit_message='Uploaded convert_chars.tar.' )
+ def test_0010_create_column_repository( self ):
+ '''Create and populate convert_chars_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
+ repository = self.get_or_create_repository( name=column_repository_name,
+ description=column_repository_description,
+ long_description=column_repository_long_description,
+ owner=common.test_user_1_name,
+ category_id=self.security.encode_id( category.id ),
+ strings_displayed=[] )
+ if self.repository_is_new( repository ):
+ self.upload_file( repository,
+ 'column_maker/column_maker.tar',
+ strings_displayed=[],
+ commit_message='Uploaded column_maker.tar.' )
+ def test_0015_create_emboss_datatypes_repository( self ):
+ '''Create and populate emboss_datatypes_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
+ repository = self.get_or_create_repository( name=emboss_datatypes_repository_name,
+ description=emboss_datatypes_repository_description,
+ long_description=emboss_datatypes_repository_long_description,
+ owner=common.test_user_1_name,
+ category_id=self.security.encode_id( category.id ),
+ strings_displayed=[] )
+ if self.repository_is_new( repository ):
+ self.upload_file( repository,
+ 'emboss/datatypes/datatypes_conf.xml',
+ strings_displayed=[],
+ commit_message='Uploaded datatypes_conf.xml.' )
+ def test_0020_create_emboss_repository( self ):
+ '''Create and populate emboss_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
+ repository = self.get_or_create_repository( name=emboss_repository_name,
+ description=emboss_repository_description,
+ long_description=emboss_repository_long_description,
+ owner=common.test_user_1_name,
+ category_id=self.security.encode_id( category.id ),
+ strings_displayed=[] )
+ if self.repository_is_new( repository ):
+ self.upload_file( repository,
+ 'emboss/emboss.tar',
+ strings_displayed=[],
+ commit_message='Uploaded tool tarball.' )
+ def test_0025_create_filtering_repository( self ):
+ '''Create and populate filtering_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
+ repository = self.get_or_create_repository( name=filtering_repository_name,
+ description=filtering_repository_description,
+ long_description=filtering_repository_long_description,
+ owner=common.test_user_1_name,
+ category_id=self.security.encode_id( category.id ),
+ strings_displayed=[] )
+ if self.repository_is_new( repository ):
+ self.upload_file( repository,
+ 'filtering/filtering_1.1.0.tar',
+ strings_displayed=[],
+ commit_message='Uploaded filtering.tar.' )
+ def test_0030_create_freebayes_repository( self ):
+ '''Create and populate freebayes_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
+ repository = self.get_or_create_repository( name=freebayes_repository_name,
+ description=freebayes_repository_description,
+ long_description=freebayes_repository_long_description,
+ owner=common.test_user_1_name,
+ category_id=self.security.encode_id( category.id ),
+ strings_displayed=[] )
+ if self.repository_is_new( repository ):
+ self.upload_file( repository,
+ 'freebayes/freebayes.tar',
+ strings_displayed=[],
+ commit_message='Uploaded freebayes.tar.' )
+ def test_0035_create_bismark_repository( self ):
+ '''Create and populate bismark_0050.'''
+ category = self.create_category( name=category_name, description=category_description )
+ repository = self.get_or_create_repository( name=bismark_repository_name,
+ description=bismark_repository_description,
+ long_description=bismark_repository_long_description,
+ owner=common.test_user_1_name,
+ category_id=self.security.encode_id( category.id ),
+ strings_displayed=[] )
+ if self.repository_is_new( repository ):
+ self.upload_file( repository,
+ 'bismark/bismark.tar',
+ strings_displayed=[],
+ valid_tools_only=False,
+ commit_message='Uploaded bismark.tar.' )
+ def test_0040_create_and_upload_dependency_definitions( self ):
+ '''Set up the dependency structure.'''
+ global running_standalone
+ if running_standalone:
+ column_repository = test_db_util.get_repository_by_name_and_owner( column_repository_name, common.test_user_1_name )
+ convert_repository = test_db_util.get_repository_by_name_and_owner( convert_repository_name, common.test_user_1_name )
+ emboss_datatypes_repository = test_db_util.get_repository_by_name_and_owner( emboss_datatypes_repository_name, common.test_user_1_name )
+ emboss_repository = test_db_util.get_repository_by_name_and_owner( emboss_repository_name, common.test_user_1_name )
+ filtering_repository = test_db_util.get_repository_by_name_and_owner( filtering_repository_name, common.test_user_1_name )
+ freebayes_repository = test_db_util.get_repository_by_name_and_owner( freebayes_repository_name, common.test_user_1_name )
+ bismark_repository = test_db_util.get_repository_by_name_and_owner( bismark_repository_name, common.test_user_1_name )
+ dependency_xml_path = self.generate_temp_path( 'test_1050', additional_paths=[ 'dependencies' ] )
+ self.create_repository_dependency( convert_repository, depends_on=[ column_repository ], filepath=dependency_xml_path )
+ self.create_repository_dependency( column_repository, depends_on=[ convert_repository ], filepath=dependency_xml_path )
+ self.create_repository_dependency( emboss_datatypes_repository, depends_on=[ bismark_repository ], filepath=dependency_xml_path )
+ self.create_repository_dependency( emboss_repository, depends_on=[ emboss_datatypes_repository ], filepath=dependency_xml_path )
+ self.create_repository_dependency( freebayes_repository,
+ depends_on=[ freebayes_repository, emboss_datatypes_repository, emboss_repository, column_repository ],
+ filepath=dependency_xml_path )
+ self.create_repository_dependency( filtering_repository, depends_on=[ emboss_repository ], filepath=dependency_xml_path )
+ def test_0045_verify_repository_dependencies( self ):
+ '''Verify that the generated dependency circle does not cause an infinite loop.
+ Expected structure:
+
+ id: 2 key: http://toolshed.local:10001__ESEP__filtering__ESEP__test__ESEP__871602b4276b
+ ['http://toolshed.local:10001', 'emboss_5', 'test', '8de5fe0d7b04']
+ id: 3 key: http://toolshed.local:10001__ESEP__emboss_datatypes__ESEP__test__ESEP__dbd4…
+ ['http://toolshed.local:10001', 'freebayes', 'test', 'f40028114098']
+ id: 4 key: http://toolshed.local:10001__ESEP__freebayes__ESEP__test__ESEP__f40028114098
+ ['http://toolshed.local:10001', 'emboss_datatypes', 'test', 'dbd4f68bf507']
+ ['http://toolshed.local:10001', 'emboss_5', 'test', '8de5fe0d7b04']
+ ['http://toolshed.local:10001', 'column_maker', 'test', '83e956bdbac0']
+ id: 5 key: http://toolshed.local:10001__ESEP__column_maker__ESEP__test__ESEP__83e956bd…
+ ['http://toolshed.local:10001', 'convert_chars', 'test', 'b28134220c8a']
+ id: 6 key: http://toolshed.local:10001__ESEP__convert_chars__ESEP__test__ESEP__b281342…
+ ['http://toolshed.local:10001', 'column_maker', 'test', '83e956bdbac0']
+ id: 7 key: http://toolshed.local:10001__ESEP__emboss_5__ESEP__test__ESEP__8de5fe0d7b04
+ ['http://toolshed.local:10001', 'emboss_datatypes', 'test', 'dbd4f68bf507']
+ '''
+ emboss_datatypes_repository = test_db_util.get_repository_by_name_and_owner( emboss_datatypes_repository_name, common.test_user_1_name )
+ emboss_repository = test_db_util.get_repository_by_name_and_owner( emboss_repository_name, common.test_user_1_name )
+ filtering_repository = test_db_util.get_repository_by_name_and_owner( filtering_repository_name, common.test_user_1_name )
+ freebayes_repository = test_db_util.get_repository_by_name_and_owner( freebayes_repository_name, common.test_user_1_name )
+ column_repository = test_db_util.get_repository_by_name_and_owner( column_repository_name, common.test_user_1_name )
+ convert_repository = test_db_util.get_repository_by_name_and_owner( convert_repository_name, common.test_user_1_name )
+ bismark_repository = test_db_util.get_repository_by_name_and_owner( bismark_repository_name, common.test_user_1_name )
+ self.check_repository_dependency( convert_repository, column_repository )
+ self.check_repository_dependency( column_repository, convert_repository )
+ self.check_repository_dependency( emboss_datatypes_repository, bismark_repository )
+ self.check_repository_dependency( emboss_repository, emboss_datatypes_repository )
+ self.check_repository_dependency( filtering_repository, emboss_repository )
+ for repository in [ emboss_datatypes_repository, emboss_repository, column_repository ]:
+ self.check_repository_dependency( freebayes_repository, repository )
+ freebayes_dependencies = [ freebayes_repository, emboss_datatypes_repository, emboss_repository, column_repository ]
+ strings_displayed = [ '%s depends on %s.' % ( freebayes_repository.name, ', '.join( repo.name for repo in freebayes_dependencies ) ) ]
+ self.display_manage_repository_page( freebayes_repository, strings_displayed=strings_displayed )
+ def test_0050_verify_tool_dependencies( self ):
+ '''Check that freebayes and emboss display tool dependencies.'''
+ freebayes_repository = test_db_util.get_repository_by_name_and_owner( freebayes_repository_name, common.test_user_1_name )
+ emboss_repository = test_db_util.get_repository_by_name_and_owner( emboss_repository_name, common.test_user_1_name )
+ self.display_manage_repository_page( freebayes_repository,
+ strings_displayed=[ 'freebayes', '0.9.4_9696d0ce8a9', 'samtools', '0.1.18', 'Tool dependencies' ] )
+ self.display_manage_repository_page( emboss_repository, strings_displayed=[ 'Tool dependencies', 'emboss', '5.0.0', 'package' ] )
+ def test_0055_install_column_repository( self ):
+ '''Install column_maker with repository dependencies.'''
+ self.galaxy_logout()
+ self.galaxy_login( email=common.admin_email, username=common.admin_username )
+ self.install_repository( column_repository_name,
+ common.test_user_1_name,
+ category_name,
+ install_tool_dependencies=False,
+ install_repository_dependencies=True,
+ new_tool_panel_section='column_maker' )
+ installed_repositories = [ ( column_repository_name, common.test_user_1_name ),
+ ( convert_repository_name, common.test_user_1_name ) ]
+ uninstalled_repositories = [ ( emboss_datatypes_repository_name, common.test_user_1_name ),
+ ( emboss_repository_name, common.test_user_1_name ),
+ ( filtering_repository_name, common.test_user_1_name ),
+ ( freebayes_repository_name, common.test_user_1_name ),
+ ( bismark_repository_name, common.test_user_1_name ) ]
+ self.verify_installed_uninstalled_repositories( installed_repositories=installed_repositories, uninstalled_repositories=uninstalled_repositories )
+ def test_0060_install_emboss_repository( self ):
+ '''Install emboss_5 with repository dependencies.'''
+ global running_standalone
+ original_datatypes = self.get_datatypes_count()
+ self.install_repository( emboss_repository_name,
+ common.test_user_1_name,
+ category_name,
+ install_tool_dependencies=False,
+ install_repository_dependencies=True,
+ new_tool_panel_section='emboss_5_0050' )
+ if running_standalone:
+ assert original_datatypes < self.get_datatypes_count(), 'Installing a repository that depends on emboss_datatypes did not add datatypes.'
+ installed_repositories = [ ( emboss_datatypes_repository_name, common.test_user_1_name ),
+ ( column_repository_name, common.test_user_1_name ),
+ ( emboss_repository_name, common.test_user_1_name ),
+ ( convert_repository_name, common.test_user_1_name ),
+ ( bismark_repository_name, common.test_user_1_name ) ]
+ uninstalled_repositories = [ ( filtering_repository_name, common.test_user_1_name ),
+ ( freebayes_repository_name, common.test_user_1_name ) ]
+ self.verify_installed_uninstalled_repositories( installed_repositories=installed_repositories, uninstalled_repositories=uninstalled_repositories )
+ def test_0065_deactivate_datatypes_repository( self ):
+ '''Deactivate emboss_datatypes and verify that the datatypes count is reduced.'''
+ original_datatypes = self.get_datatypes_count()
+ repository = test_db_util.get_installed_repository_by_name_owner( emboss_datatypes_repository_name, common.test_user_1_name )
+ self.uninstall_repository( repository, remove_from_disk=False )
+ assert original_datatypes > self.get_datatypes_count(), 'Deactivating emboss_datatypes did not remove datatypes.'
+ installed_repositories = [ ( column_repository_name, common.test_user_1_name ),
+ ( emboss_repository_name, common.test_user_1_name ),
+ ( convert_repository_name, common.test_user_1_name ),
+ ( bismark_repository_name, common.test_user_1_name ) ]
+ uninstalled_repositories = [ ( emboss_datatypes_repository_name, common.test_user_1_name ),
+ ( filtering_repository_name, common.test_user_1_name ),
+ ( freebayes_repository_name, common.test_user_1_name ) ]
+ self.verify_installed_uninstalled_repositories( installed_repositories=installed_repositories, uninstalled_repositories=uninstalled_repositories )
+ strings_not_displayed = [ repository.name, repository.installed_changeset_revision ]
+ self.display_galaxy_browse_repositories_page( strings_not_displayed=strings_not_displayed )
+ def test_0070_uninstall_emboss_repository( self ):
+ '''Uninstall the emboss_5 repository.'''
+ repository = test_db_util.get_installed_repository_by_name_owner( emboss_repository_name, common.test_user_1_name )
+ self.uninstall_repository( repository, remove_from_disk=True )
+ strings_not_displayed = [ repository.name, repository.installed_changeset_revision ]
+ self.display_galaxy_browse_repositories_page( strings_not_displayed=strings_not_displayed )
+ test_db_util.ga_refresh( repository )
+ self.check_galaxy_repository_tool_panel_section( repository, 'emboss_5_0050' )
+ installed_repositories = [ ( column_repository_name, common.test_user_1_name ),
+ ( convert_repository_name, common.test_user_1_name ),
+ ( bismark_repository_name, common.test_user_1_name ) ]
+ uninstalled_repositories = [ ( emboss_datatypes_repository_name, common.test_user_1_name ),
+ ( emboss_repository_name, common.test_user_1_name ),
+ ( filtering_repository_name, common.test_user_1_name ),
+ ( freebayes_repository_name, common.test_user_1_name ) ]
+ self.verify_installed_uninstalled_repositories( installed_repositories=installed_repositories, uninstalled_repositories=uninstalled_repositories )
+ def test_0075_install_freebayes_repository( self ):
+ '''Install freebayes with repository dependencies. This should also automatically reactivate emboss_datatypes and reinstall emboss_5.'''
+ original_datatypes = self.get_datatypes_count()
+ strings_displayed = [ 'Handle', 'tool dependencies', 'freebayes', '0.9.4_9696d0ce8a9', 'samtools', '0.1.18' ]
+ self.install_repository( freebayes_repository_name,
+ common.test_user_1_name,
+ category_name,
+ strings_displayed=strings_displayed,
+ install_tool_dependencies=False,
+ install_repository_dependencies=True,
+ new_tool_panel_section='freebayes' )
+ assert original_datatypes < self.get_datatypes_count(), 'Installing a repository that depends on emboss_datatypes did not add datatypes.'
+ emboss_repository = test_db_util.get_installed_repository_by_name_owner( emboss_repository_name, common.test_user_1_name )
+ datatypes_repository = test_db_util.get_installed_repository_by_name_owner( emboss_datatypes_repository_name, common.test_user_1_name )
+ strings_displayed = [ emboss_repository.name,
+ emboss_repository.installed_changeset_revision,
+ datatypes_repository.name,
+ datatypes_repository.installed_changeset_revision ]
+ self.display_galaxy_browse_repositories_page( strings_displayed=strings_displayed )
+ installed_repositories = [ ( column_repository_name, common.test_user_1_name ),
+ ( emboss_datatypes_repository_name, common.test_user_1_name ),
+ ( emboss_repository_name, common.test_user_1_name ),
+ ( freebayes_repository_name, common.test_user_1_name ),
+ ( convert_repository_name, common.test_user_1_name ),
+ ( bismark_repository_name, common.test_user_1_name ) ]
+ uninstalled_repositories = [ ( filtering_repository_name, common.test_user_1_name ) ]
+ self.verify_installed_uninstalled_repositories( installed_repositories=installed_repositories, uninstalled_repositories=uninstalled_repositories )
+
diff -r 9bdc2c2e95aa05e9cc8cd01599b224e1a3c6b26e -r 2e96c23c760b7d9259168e007bec52ffcdfa00a9 test/tool_shed/functional/test_1080_advanced_circular_dependency_installation.py
--- a/test/tool_shed/functional/test_1080_advanced_circular_dependency_installation.py
+++ b/test/tool_shed/functional/test_1080_advanced_circular_dependency_installation.py
@@ -210,10 +210,8 @@
common.test_user_1_name )
self.uninstall_repository( installed_convert_repository, remove_from_disk=False )
strings_not_displayed = [ installed_column_repository.name,
- installed_column_repository.description,
installed_column_repository.installed_changeset_revision,
installed_convert_repository.name,
- installed_convert_repository.description,
installed_convert_repository.installed_changeset_revision ]
self.display_galaxy_browse_repositories_page( strings_not_displayed=strings_not_displayed )
def test_0050_reactivate_column_repository( self ):
diff -r 9bdc2c2e95aa05e9cc8cd01599b224e1a3c6b26e -r 2e96c23c760b7d9259168e007bec52ffcdfa00a9 test/tool_shed/functional_tests.py
--- a/test/tool_shed/functional_tests.py
+++ b/test/tool_shed/functional_tests.py
@@ -139,6 +139,7 @@
galaxy_file_path = os.path.join( galaxy_db_path, 'files' )
hgweb_config_file_path = tempfile.mkdtemp( dir=tool_shed_test_tmp_dir )
new_repos_path = tempfile.mkdtemp( dir=tool_shed_test_tmp_dir )
+ galaxy_tempfiles = tempfile.mkdtemp( dir=tool_shed_test_tmp_dir )
galaxy_shed_tool_path = tempfile.mkdtemp( dir=tool_shed_test_tmp_dir )
galaxy_tool_dependency_dir = tempfile.mkdtemp( dir=tool_shed_test_tmp_dir )
os.environ[ 'GALAXY_TEST_TOOL_DEPENDENCY_DIR' ] = galaxy_tool_dependency_dir
@@ -269,6 +270,7 @@
database_connection = galaxy_database_connection,
database_engine_option_pool_size = '10',
file_path = galaxy_file_path,
+ new_file_path = galaxy_tempfiles,
tool_path = tool_path,
tool_data_path = tool_data_path,
shed_tool_path = galaxy_shed_tool_path,
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
--
This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.
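The create_repository_dependency helper added to twilltestcase.py above is a convenience wrapper around the existing generate_repository_dependency_xml and upload_file calls. For reference, the XML that step produces follows the Tool Shed's repository_dependencies.xml convention of a <repositories> root with one <repository> element per dependency. The sketch below is a minimal standalone illustration of that step, not code from the test framework; write_repository_dependencies_xml and the example attribute values are hypothetical.

import xml.etree.ElementTree as ET

def write_repository_dependencies_xml( output_path, depends_on, description='' ):
    # depends_on is a list of dicts describing the repositories depended on, e.g.
    # { 'toolshed': 'http://localhost:9009', 'name': 'column_maker_0050',
    #   'owner': 'user1', 'changeset_revision': 'abc123def456' }
    root = ET.Element( 'repositories', description=description )
    for repo in depends_on:
        ET.SubElement( root, 'repository',
                       toolshed=repo[ 'toolshed' ],
                       name=repo[ 'name' ],
                       owner=repo[ 'owner' ],
                       changeset_revision=repo[ 'changeset_revision' ] )
    ET.ElementTree( root ).write( output_path, xml_declaration=True, encoding='utf-8' )

Uploading the generated file to a repository is what the tests rely on to register the dependency for that repository's tip.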
commit/galaxy-central: greg: Fix for rendering tool dependencies defined for tools contained in tool shed repositories.
by Bitbucket 11 Jan '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/9bdc2c2e95aa/
changeset: 9bdc2c2e95aa
user: greg
date: 2013-01-11 22:49:36
summary: Fix for rendering tool dependencies defined for tools contained in tool shed repositories.
affected #: 2 files
diff -r 6cd4058d7b32f788e3c7484f0c5709e89cfbed0f -r 9bdc2c2e95aa05e9cc8cd01599b224e1a3c6b26e lib/galaxy/util/shed_util.py
--- a/lib/galaxy/util/shed_util.py
+++ b/lib/galaxy/util/shed_util.py
@@ -1251,18 +1251,20 @@
old_container_tool_dependencies_root = old_container_dict[ 'tool_dependencies' ]
if old_container_tool_dependencies_root:
if tool_dependencies_root_folder is None:
- tool_dependencies_root_folder = old_container_tool_dependencies_root
- # Change the folder id so it won't confict with others being merged.
- tool_dependencies_root_folder.id = folder_id
+ tool_dependencies_root_folder = container_util.Folder( id=folder_id, key='root', label='root', parent=None )
folder_id += 1
- tool_dependencies_folder = tool_dependencies_root_folder.folders[ 0 ]
- tool_dependencies_folder.id = folder_id
+ tool_dependencies_folder = container_util.Folder( id=folder_id,
+ key='merged',
+ label='Tool dependencies',
+ parent=tool_dependencies_root_folder )
folder_id += 1
else:
+ td_list = [ td.listify for td in tool_dependencies_folder.tool_dependencies ]
# The old_container_tool_dependencies_root will be a root folder containing a single sub_folder.
old_container_tool_dependencies_folder = old_container_tool_dependencies_root.folders[ 0 ]
for td in old_container_tool_dependencies_folder.tool_dependencies:
- tool_dependencies_folder.tool_dependencies.append( td )
+ if td.listify not in td_list:
+ tool_dependencies_folder.tool_dependencies.append( td )
if repository_dependencies_root_folder:
repository_dependencies_root_folder.folders.append( repository_dependencies_folder )
new_containers_dict[ 'repository_dependencies' ] = repository_dependencies_root_folder
diff -r 6cd4058d7b32f788e3c7484f0c5709e89cfbed0f -r 9bdc2c2e95aa05e9cc8cd01599b224e1a3c6b26e lib/galaxy/webapps/community/util/container_util.py
--- a/lib/galaxy/webapps/community/util/container_util.py
+++ b/lib/galaxy/webapps/community/util/container_util.py
@@ -112,6 +112,9 @@
self.installation_status = installation_status
self.repository_id = repository_id
self.tool_dependency_id = tool_dependency_id
+ @property
+ def listify( self ):
+ return [ self.name, self.version, self.type ]
class Workflow( object ):
"""Workflow object."""
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
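The listify property added in container_util.py gives each tool dependency a simple identity key ([ name, version, type ]), which the merge code in shed_util.py uses so the same dependency contributed by several repositories is only listed once. A rough, self-contained sketch of that dedup pattern follows; ToolDependency here is a hypothetical stand-in for the real container class.

class ToolDependency( object ):
    def __init__( self, name, version, type ):
        self.name = name
        self.version = version
        self.type = type
    @property
    def listify( self ):
        return [ self.name, self.version, self.type ]

def merge_tool_dependencies( existing, incoming ):
    # Append only dependencies whose ( name, version, type ) key is not already present.
    seen = [ td.listify for td in existing ]
    for td in incoming:
        if td.listify not in seen:
            existing.append( td )
            seen.append( td.listify )
    return existing

merged = merge_tool_dependencies( [ ToolDependency( 'samtools', '0.1.18', 'package' ) ],
                                   [ ToolDependency( 'samtools', '0.1.18', 'package' ),
                                     ToolDependency( 'freebayes', '0.9.4_9696d0ce8a9', 'package' ) ] )
# merged contains samtools once and freebayes once.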
commit/galaxy-central: dan: Fix for external display applications and permissions.
by Bitbucket 11 Jan '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/6cd4058d7b32/
changeset: 6cd4058d7b32
user: dan
date: 2013-01-11 21:41:29
summary: Fix for external display applications and permissions.
affected #: 1 file
diff -r 3674685eeb0d171b9f4c0b01e2c7f56cfba86ce7 -r 6cd4058d7b32f788e3c7484f0c5709e89cfbed0f lib/galaxy/webapps/galaxy/controllers/dataset.py
--- a/lib/galaxy/webapps/galaxy/controllers/dataset.py
+++ b/lib/galaxy/webapps/galaxy/controllers/dataset.py
@@ -163,8 +163,11 @@
assert hda and self._can_access_dataset( trans, hda )
return hda.creating_job
- def _can_access_dataset( self, trans, dataset_association, allow_admin=True ):
- return ( allow_admin and trans.user_is_admin() ) or trans.app.security_agent.can_access_dataset( trans.get_current_user_roles(), dataset_association.dataset )
+ def _can_access_dataset( self, trans, dataset_association, allow_admin=True, additional_roles=None ):
+ roles = trans.get_current_user_roles()
+ if additional_roles:
+ roles = roles + additional_roles
+ return ( allow_admin and trans.user_is_admin() ) or trans.app.security_agent.can_access_dataset( roles, dataset_association.dataset )
@web.expose
def errors( self, trans, id ):
@@ -736,7 +739,7 @@
link_name = urllib.unquote_plus( link_name )
if None in [ app_name, link_name ]:
return trans.show_error_message( "A display application name and link name must be provided." )
- if self._can_access_dataset( trans, data ):
+ if self._can_access_dataset( trans, data, additional_roles=user_roles ):
msg = []
refresh = False
display_app = trans.app.datatypes_registry.display_applications.get( app_name )
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
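The change above lets the display-application code pass extra roles (the user_roles it has already resolved) into the access check instead of relying only on the roles attached to the current transaction. A minimal sketch of the idea follows; StubTrans and the simple role-membership test are simplifications for illustration, not Galaxy's actual security agent.

class StubTrans( object ):
    '''Hypothetical stand-in for Galaxy's transaction object.'''
    def __init__( self, user_roles, is_admin=False ):
        self._user_roles = user_roles
        self._is_admin = is_admin
    def get_current_user_roles( self ):
        return list( self._user_roles )
    def user_is_admin( self ):
        return self._is_admin

def can_access_dataset( trans, dataset_roles, allow_admin=True, additional_roles=None ):
    # Combine the requesting user's roles with any roles granted for this request,
    # e.g. roles associated with an external display application.
    roles = trans.get_current_user_roles()
    if additional_roles:
        roles = roles + additional_roles
    if allow_admin and trans.user_is_admin():
        return True
    return any( role in dataset_roles for role in roles )

print( can_access_dataset( StubTrans( [ 'role_a' ] ), dataset_roles=[ 'role_b' ] ) )  # False
print( can_access_dataset( StubTrans( [ 'role_a' ] ), dataset_roles=[ 'role_b' ], additional_roles=[ 'role_b' ] ) )  # True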
commit/galaxy-central: greg: Restrict display of certain tool dependency columns when in the tool shed.
by Bitbucket 11 Jan '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/3674685eeb0d/
changeset: 3674685eeb0d
user: greg
date: 2013-01-11 21:09:33
summary: Restrict display of certain tool dependency columns when in the tool shed.
affected #: 1 file
diff -r 1bd3e09417b16e31d2ecce606ac1db5f5564e887 -r 3674685eeb0d171b9f4c0b01e2c7f56cfba86ce7 templates/webapps/community/repository/common.mako
--- a/templates/webapps/community/repository/common.mako
+++ b/templates/webapps/community/repository/common.mako
@@ -507,10 +507,12 @@
${version_str | h}
</${cell_type}><${cell_type}>${tool_dependency.type | h}</${cell_type}>
- %if is_missing:
- <${cell_type}>${tool_dependency.installation_status | h}</${cell_type}>
- %elif tool_dependency.install_dir:
- <${cell_type}>${tool_dependency.install_dir | h}</${cell_type}>
+ %if trans.webapp.name == 'galaxy':
+ %if is_missing:
+ <${cell_type}>${tool_dependency.installation_status | h}</${cell_type}>
+ %elif tool_dependency.install_dir:
+ <${cell_type}>${tool_dependency.install_dir | h}</${cell_type}>
+ %endif
%endif
</tr><%
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
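The template change keys the extra columns off trans.webapp.name, so installation status and install directory are only rendered inside a Galaxy instance, where a tool dependency can actually be installed. Expressed as plain Python rather than Mako, the logic amounts to something like the sketch below; the dict-based tool_dependency is an illustration, not the real object.

def tool_dependency_columns( webapp_name, is_missing, tool_dependency ):
    # Name, version and type are shown in both the Tool Shed and Galaxy.
    columns = [ tool_dependency[ 'name' ], tool_dependency[ 'version' ], tool_dependency[ 'type' ] ]
    # Installation details only make sense inside Galaxy, so hide them in the Tool Shed.
    if webapp_name == 'galaxy':
        if is_missing:
            columns.append( tool_dependency[ 'installation_status' ] )
        elif tool_dependency.get( 'install_dir' ):
            columns.append( tool_dependency[ 'install_dir' ] )
    return columns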
commit/galaxy-central: greg: When installing multiple repositories simultaneously, merge all repository and tool dependencies into a single container for display.
by Bitbucket 11 Jan '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/1bd3e09417b1/
changeset: 1bd3e09417b1
user: greg
date: 2013-01-11 21:06:35
summary: When installing multiple repositories simultaneously, merge all repository and tool dependencies into a single container for display.
affected #: 3 files
diff -r be7625f376d16cefa2fb4984fa6483f37b4b71d4 -r 1bd3e09417b16e31d2ecce606ac1db5f5564e887 lib/galaxy/util/shed_util.py
--- a/lib/galaxy/util/shed_util.py
+++ b/lib/galaxy/util/shed_util.py
@@ -1,4 +1,4 @@
-import os, tempfile, shutil, logging, urllib2
+import os, tempfile, shutil, logging, urllib2, threading
from galaxy.datatypes import checkers
from galaxy.web import url_for
from galaxy import util
@@ -1202,6 +1202,78 @@
def load_installed_display_applications( app, installed_repository_dict, deactivate=False ):
# Load or deactivate proprietary datatype display applications
app.datatypes_registry.load_display_applications( installed_repository_dict=installed_repository_dict, deactivate=deactivate )
+def merge_containers_dicts_for_new_install( containers_dicts ):
+ """
+ When installing one or more tool shed repositories for the first time, the received list of containers_dicts contains a containers_dict for
+ each repository being installed. Since the repositories are being installed for the first time, all entries are None except the repository
+ dependencies and tool dependencies. The entries for missing dependencies are all None since they have previously been merged into the installed
+ dependencies. This method will merge the dependencies entries into a single container and return it for display.
+ """
+ new_containers_dict = dict( readme_files=None,
+ datatypes=None,
+ missing_repository_dependencies=None,
+ repository_dependencies=None,
+ missing_tool_dependencies=None,
+ tool_dependencies=None,
+ invalid_tools=None,
+ valid_tools=None,
+ workflows=None )
+ if containers_dicts:
+ lock = threading.Lock()
+ lock.acquire( True )
+ try:
+ repository_dependencies_root_folder = None
+ tool_dependencies_root_folder = None
+ # Use a unique folder id (hopefully the following is).
+ folder_id = 867
+ for old_container_dict in containers_dicts:
+ # Merge repository_dependencies.
+ old_container_repository_dependencies_root = old_container_dict[ 'repository_dependencies' ]
+ if old_container_repository_dependencies_root:
+ if repository_dependencies_root_folder is None:
+ repository_dependencies_root_folder = container_util.Folder( id=folder_id, key='root', label='root', parent=None )
+ folder_id += 1
+ repository_dependencies_folder = container_util.Folder( id=folder_id,
+ key='merged',
+ label='Repository dependencies',
+ parent=repository_dependencies_root_folder )
+ folder_id += 1
+ # The old_container_repository_dependencies_root will be a root folder containing a single sub_folder.
+ old_container_repository_dependencies_folder = old_container_repository_dependencies_root.folders[ 0 ]
+ # Change the folder id so it won't confict with others being merged.
+ old_container_repository_dependencies_folder.id = folder_id
+ folder_id += 1
+ # Generate the label by retrieving the repository name.
+ toolshed, name, owner, changeset_revision = container_util.get_components_from_key( old_container_repository_dependencies_folder.key )
+ old_container_repository_dependencies_folder.label = str( name )
+ repository_dependencies_folder.folders.append( old_container_repository_dependencies_folder )
+ # Merge tool_dependencies.
+ old_container_tool_dependencies_root = old_container_dict[ 'tool_dependencies' ]
+ if old_container_tool_dependencies_root:
+ if tool_dependencies_root_folder is None:
+ tool_dependencies_root_folder = old_container_tool_dependencies_root
+ # Change the folder id so it won't confict with others being merged.
+ tool_dependencies_root_folder.id = folder_id
+ folder_id += 1
+ tool_dependencies_folder = tool_dependencies_root_folder.folders[ 0 ]
+ tool_dependencies_folder.id = folder_id
+ folder_id += 1
+ else:
+ # The old_container_tool_dependencies_root will be a root folder containing a single sub_folder.
+ old_container_tool_dependencies_folder = old_container_tool_dependencies_root.folders[ 0 ]
+ for td in old_container_tool_dependencies_folder.tool_dependencies:
+ tool_dependencies_folder.tool_dependencies.append( td )
+ if repository_dependencies_root_folder:
+ repository_dependencies_root_folder.folders.append( repository_dependencies_folder )
+ new_containers_dict[ 'repository_dependencies' ] = repository_dependencies_root_folder
+ if tool_dependencies_root_folder:
+ tool_dependencies_root_folder.folders.append( tool_dependencies_folder )
+ new_containers_dict[ 'tool_dependencies' ] = tool_dependencies_root_folder
+ except Exception, e:
+ log.debug( "Exception in merge_containers_dicts_for_new_install: %s" % str( e ) )
+ finally:
+ lock.release()
+ return new_containers_dict
def panel_entry_per_tool( tool_section_dict ):
# Return True if tool_section_dict looks like this.
# {<Tool guid> : [{ tool_config : <tool_config_file>, id: <ToolSection id>, version : <ToolSection version>, name : <TooSection name>}]}
diff -r be7625f376d16cefa2fb4984fa6483f37b4b71d4 -r 1bd3e09417b16e31d2ecce606ac1db5f5564e887 lib/galaxy/webapps/galaxy/controllers/admin_toolshed.py
--- a/lib/galaxy/webapps/galaxy/controllers/admin_toolshed.py
+++ b/lib/galaxy/webapps/galaxy/controllers/admin_toolshed.py
@@ -1266,7 +1266,6 @@
shed_tool_conf_select_field = None
tool_path = suc.get_tool_path_by_shed_tool_conf_filename( trans, shed_tool_conf )
tool_panel_section_select_field = build_tool_panel_section_select_field( trans )
- containers_dicts = []
if len( repo_info_dicts ) == 1:
# If we're installing a single repository, see if it contains a readme or dependencies that we can display.
repo_info_dict = repo_info_dicts[ 0 ]
@@ -1287,13 +1286,9 @@
missing_repository_dependencies=missing_repository_dependencies,
installed_tool_dependencies=installed_tool_dependencies,
missing_tool_dependencies=missing_tool_dependencies )
- containers_dicts.append( containers_dict )
else:
# We're installing a list of repositories, each of which may have tool dependencies or repository dependencies.
- all_installed_repository_dependencies = []
- all_missing_repository_dependencies = []
- all_installed_tool_dependencies = []
- all_missing_tool_dependencies = []
+ containers_dicts = []
for repo_info_dict in repo_info_dicts:
name, repository_owner, changeset_revision, includes_tool_dependencies, installed_repository_dependencies, \
missing_repository_dependencies, installed_tool_dependencies, missing_tool_dependencies = \
@@ -1302,11 +1297,13 @@
tool_shed_url=tool_shed_url,
tool_path=tool_path,
readme_files_dict=None,
- repository_dependencies=installed_repository_dependencies,
+ installed_repository_dependencies=installed_repository_dependencies,
missing_repository_dependencies=missing_repository_dependencies,
- tool_dependencies=installed_tool_dependencies,
+ installed_tool_dependencies=installed_tool_dependencies,
missing_tool_dependencies=missing_tool_dependencies )
containers_dicts.append( containers_dict )
+ # Merge all containers into a single container.
+ containers_dict = shed_util.merge_containers_dicts_for_new_install( containers_dicts )
# Handle tool dependencies chack box.
if trans.app.config.tool_dependency_dir is None:
if includes_tool_dependencies:
@@ -1327,7 +1324,7 @@
includes_repository_dependencies=includes_repository_dependencies,
install_repository_dependencies_check_box=install_repository_dependencies_check_box,
new_tool_panel_section=new_tool_panel_section,
- containers_dicts=containers_dicts,
+ containers_dict=containers_dict,
shed_tool_conf=shed_tool_conf,
shed_tool_conf_select_field=shed_tool_conf_select_field,
tool_panel_section_select_field=tool_panel_section_select_field,
diff -r be7625f376d16cefa2fb4984fa6483f37b4b71d4 -r 1bd3e09417b16e31d2ecce606ac1db5f5564e887 templates/admin/tool_shed_repository/select_tool_panel_section.mako
--- a/templates/admin/tool_shed_repository/select_tool_panel_section.mako
+++ b/templates/admin/tool_shed_repository/select_tool_panel_section.mako
@@ -20,25 +20,24 @@
# this case, the repository metadata is an empty dictionary, but one or both of includes_repository_dependencies
# and includes_tool_dependencies may be True. If either of these are True but we have no metadata, we cannot install
# repository dependencies on this pass.
-
- # TODO: do we need this?
- can_display_repository_dependencies = False
- can_display_tool_dependencies = False
-
- if includes_repository_dependencies and not can_display_repository_dependencies:
- for containers_dict in containers_dicts:
- repository_dependencies = containers_dict[ 'repository_dependencies' ]
- missing_repository_dependencies = containers_dict[ 'missing_repository_dependencies' ]
- if repository_dependencies or missing_repository_dependencies:
- can_display_repository_dependencies = True
- break
- if includes_tool_dependencies and not can_display_tool_dependencies:
- for containers_dict in containers_dicts:
- tool_dependencies = containers_dict[ 'tool_dependencies' ]
- missing_tool_dependencies = containers_dict[ 'missing_tool_dependencies' ]
- if tool_dependencies or missing_tool_dependencies:
- can_display_tool_dependencies = True
- break
+ if includes_repository_dependencies:
+ repository_dependencies = containers_dict[ 'repository_dependencies' ]
+ missing_repository_dependencies = containers_dict[ 'missing_repository_dependencies' ]
+ if repository_dependencies or missing_repository_dependencies:
+ can_display_repository_dependencies = True
+ else:
+ can_display_repository_dependencies = False
+ else:
+ can_display_repository_dependencies = False
+ if includes_tool_dependencies:
+ tool_dependencies = containers_dict[ 'tool_dependencies' ]
+ missing_tool_dependencies = containers_dict[ 'missing_tool_dependencies' ]
+ if tool_dependencies or missing_tool_dependencies:
+ can_display_tool_dependencies = True
+ else:
+ can_display_tool_dependencies = False
+ else:
+ can_display_tool_dependencies = False
%>
%if message:
@@ -63,14 +62,7 @@
<div class="toolFormBody"><form name="select_tool_panel_section" id="select_tool_panel_section" action="${h.url_for( controller='admin_toolshed', action='prepare_for_install', tool_shed_url=tool_shed_url, encoded_repo_info_dicts=encoded_repo_info_dicts, includes_tools=includes_tools, includes_tool_dependencies=includes_tool_dependencies )}" method="post" ><div style="clear: both"></div>
- <%
- if len( containers_dicts ) == 1:
- containers_dict = containers_dicts[ 0 ]
- readme_files_dict = containers_dict.get( 'readme_files', None )
- else:
- containers_dict = None
- readme_files_dict = None
- %>
+ <% readme_files_dict = containers_dict.get( 'readme_files', None ) %>
%if readme_files_dict:
<div class="form-row"><table class="colored" width="100%">
@@ -86,15 +78,8 @@
<th bgcolor="#EBD9B2">Confirm dependency installation</th></table></div>
- %for index, containers_dict in enumerate( containers_dicts ):
- %if index == 0:
- ${render_dependencies_section( install_repository_dependencies_check_box, install_tool_dependencies_check_box, containers_dict )}
- <div style="clear: both"></div>
- %else:
- ${render_dependencies_section( None, None, containers_dict )}
- <div style="clear: both"></div>
- %endif
- %endfor
+ ${render_dependencies_section( install_repository_dependencies_check_box, install_tool_dependencies_check_box, containers_dict )}
+ <div style="clear: both"></div>
%endif
<div class="form-row"><table class="colored" width="100%">
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
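For orientation, the merge performed by the new merge_containers_dicts_for_new_install() boils down to collapsing one root folder per repository into a single merged root. A minimal, self-contained sketch of that pattern follows; the simplified Folder class and field names are assumptions for illustration only, not the real container_util API.

    # Simplified stand-in for container_util.Folder (assumption).
    class Folder(object):
        def __init__(self, id, key, label, parent=None):
            self.id = id
            self.key = key
            self.label = label
            self.parent = parent
            self.folders = []

    def merge_repository_dependency_roots(per_repository_roots):
        # Collapse one root folder per repository into a single merged root,
        # renumbering folder ids so merged sub-folders do not collide.
        if not per_repository_roots:
            return None
        folder_id = 867  # mirrors the "hopefully unique" starting id used in the diff
        merged_root = Folder(folder_id, 'root', 'root')
        folder_id += 1
        merged = Folder(folder_id, 'merged', 'Repository dependencies', parent=merged_root)
        for root in per_repository_roots:
            folder_id += 1
            sub_folder = root.folders[0]  # each per-repository root wraps a single sub-folder
            sub_folder.id = folder_id
            merged.folders.append(sub_folder)
        merged_root.folders.append(merged)
        return merged_root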
commit/galaxy-central: james_taylor: ClusterJobRunner: make sure workers are daemon threads, shutdown now works again
by Bitbucket 11 Jan '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/be7625f376d1/
changeset: be7625f376d1
user: james_taylor
date: 2013-01-11 20:49:24
summary: ClusterJobRunner: make sure workers are daemon threads, shutdown now works again
affected #: 1 file
diff -r 71a616ffce1c85aa31d3e5311d5babfc12c49381 -r be7625f376d16cefa2fb4984fa6483f37b4b71d4 lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -152,6 +152,7 @@
nworkers = self.app.config.cluster_job_queue_workers
for i in range( nworkers ):
worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next )
+ worker.setDaemon( True )
worker.start()
self.work_threads.append( worker )
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
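The one-line fix above matters because a worker thread blocked on Queue.get() never exits on its own; marking it as a daemon thread lets the interpreter shut down without waiting for it. A small standalone sketch of the pattern (the names here are illustrative, not Galaxy code):

    import threading
    try:
        from Queue import Queue   # Python 2, as in the Galaxy code of this era
    except ImportError:
        from queue import Queue   # Python 3

    work_queue = Queue()

    def run_next():
        # Blocks forever waiting for work; without daemon=True this thread
        # would keep the process alive after the main thread finishes.
        while True:
            op, obj = work_queue.get()
            if op is None:
                return

    worker = threading.Thread(name="example.work_thread-0", target=run_next)
    worker.setDaemon(True)   # equivalent to worker.daemon = True
    worker.start()
    # When the main thread exits, daemon workers no longer block shutdown.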
commit/galaxy-central: dan: Fixes for showing job parameters when tools have been updated.
by Bitbucket 11 Jan '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/71a616ffce1c/
changeset: 71a616ffce1c
user: dan
date: 2013-01-11 20:41:12
summary: Fixes for showing job parameters when tools have been updated.
affected #: 3 files
diff -r e9f81b5154bb71c6c12d4591c8f9519e52c93212 -r 71a616ffce1c85aa31d3e5311d5babfc12c49381 lib/galaxy/tools/parameters/basic.py
--- a/lib/galaxy/tools/parameters/basic.py
+++ b/lib/galaxy/tools/parameters/basic.py
@@ -1347,7 +1347,7 @@
rval = []
for val in value:
rval.append( get_option_display( val, self.options ) or val )
- return "\n".join( rval ) + suffix
+ return "\n".join( map( str, rval ) ) + suffix
def get_dependencies( self ):
"""
@@ -1611,9 +1611,11 @@
if value and not isinstance( value, list ):
value = [ value ]
if value:
- return ", ".join( [ "%s: %s" % ( item.hid, item.name ) for item in value ] )
- else:
- return "No dataset"
+ try:
+ return ", ".join( [ "%s: %s" % ( item.hid, item.name ) for item in value ] )
+ except:
+ pass
+ return "No dataset"
def validate( self, value, history=None ):
for validator in self.validators:
diff -r e9f81b5154bb71c6c12d4591c8f9519e52c93212 -r 71a616ffce1c85aa31d3e5311d5babfc12c49381 lib/galaxy/webapps/galaxy/controllers/dataset.py
--- a/lib/galaxy/webapps/galaxy/controllers/dataset.py
+++ b/lib/galaxy/webapps/galaxy/controllers/dataset.py
@@ -997,6 +997,8 @@
params_objects = None
job = None
tool = None
+ upgrade_messages = {}
+ has_parameter_errors = False
inherit_chain = hda.source_dataset_chain
if inherit_chain:
job_dataset_association, dataset_association_container_name = inherit_chain[-1]
@@ -1013,12 +1015,19 @@
toolbox = self.get_toolbox()
tool = toolbox.get_tool( job.tool_id )
assert tool is not None, 'Requested tool has not been loaded.'
- params_objects = job.get_param_values( trans.app )
+ #Load parameter objects, if a parameter type has changed, its possible for the value to no longer be valid
+ try:
+ params_objects = job.get_param_values( trans.app, ignore_errors=False )
+ except:
+ params_objects = job.get_param_values( trans.app, ignore_errors=True )
+ upgrade_messages = tool.check_and_update_param_values( job.get_param_values( trans.app, ignore_errors=True ), trans, update_values=False ) #use different param_objects here, since we want to display original values as much as possible
+ has_parameter_errors = True
except:
pass
if job is None:
return trans.show_error_message( "Job information is not available for this dataset." )
- return trans.fill_template( "show_params.mako", inherit_chain=inherit_chain, history=trans.get_history(), hda=hda, job=job, tool=tool, params_objects=params_objects )
+ #TODO: we should provide the basic values along with the objects, in order to better handle reporting of old values during upgrade
+ return trans.fill_template( "show_params.mako", inherit_chain=inherit_chain, history=trans.get_history(), hda=hda, job=job, tool=tool, params_objects=params_objects, upgrade_messages=upgrade_messages, has_parameter_errors=has_parameter_errors )
@web.expose
def copy_datasets( self, trans, source_history=None, source_dataset_ids="", target_history_id=None, target_history_ids="", new_history_name="", do_copy=False, **kwd ):
diff -r e9f81b5154bb71c6c12d4591c8f9519e52c93212 -r 71a616ffce1c85aa31d3e5311d5babfc12c49381 templates/show_params.mako
--- a/templates/show_params.mako
+++ b/templates/show_params.mako
@@ -1,4 +1,5 @@
<%inherit file="/base.mako"/>
+<%namespace file="/message.mako" import="render_msg" /><% from galaxy.util import nice_size %><style>
@@ -10,7 +11,11 @@
}
</style>
-<%def name="inputs_recursive( input_params, param_values, depth=1 )">
+<%def name="inputs_recursive( input_params, param_values, depth=1, upgrade_messages=None )">
+ <%
+ if upgrade_messages is None:
+ upgrade_messages = {}
+ %>
%for input_index, input in enumerate( input_params.itervalues() ):
%if input.name in param_values:
%if input.type == "repeat":
@@ -18,17 +23,47 @@
${ inputs_recursive(input.inputs, param_values[input.name][i], depth=depth+1) }
%endfor
%elif input.type == "conditional":
- <% current_case = param_values[input.name]['__current_case__'] %>
+ <%
+ try:
+ current_case = param_values[input.name]['__current_case__']
+ is_valid = True
+ except:
+ current_case = None
+ is_valid = False
+ %>
+ %if is_valid:
+ <tr>
+ ${ inputs_recursive_indent( text=input.test_param.label, depth=depth )}
+ ##<!-- Get the value of the current Conditional parameter -->
+ <td>${input.cases[current_case].value | h}</td>
+ <td></td>
+ </tr>
+ ${ inputs_recursive( input.cases[current_case].inputs, param_values[input.name], depth=depth+1, upgrade_messages=upgrade_messages.get( input.name ) ) }
+ %else:
+ <tr>
+ ${ inputs_recursive_indent( text=input.name, depth=depth )}
+ <td><em>The previously used value is no longer valid</em></td>
+ <td></td>
+ </tr>
+ %endif
+ %elif input.type == "upload_dataset":
+ <tr>
+ ${inputs_recursive_indent( text=input.group_title( param_values ), depth=depth )}
+ <td>${ len( param_values[input.name] ) } uploaded datasets</td>
+ <td></td>
+ </tr>
+ %elif input.visible:
+ <%
+ if hasattr( input, "label" ) and input.label:
+ label = input.label
+ else:
+ #value for label not required, fallback to input name (same as tool panel)
+ label = input.name
+ %><tr>
- ${ inputs_recursive_indent( text=input.test_param.label, depth=depth )}
- <!-- Get the value of the current Conditonal parameter -->
- <td>${input.cases[current_case].value}</td>
- </tr>
- ${ inputs_recursive(input.cases[current_case].inputs, param_values[input.name], depth=depth+1) }
- %elif getattr(input, "label", None):
- <tr>
- ${inputs_recursive_indent( text=input.label, depth=depth )}
- <td>${input.value_to_display_text(param_values[input.name], trans.app)}</td>
+ ${inputs_recursive_indent( text=label, depth=depth )}
+ <td>${input.value_to_display_text( param_values[input.name], trans.app ) | h}</td>
+ <td>${ upgrade_messages.get( input.name, '' ) | h }</td></tr>
%endif
%else:
@@ -38,11 +73,14 @@
# Get parameter label.
if input.type == "conditional":
label = input.test_param.label
+ elif input.type == "repeat":
+ label = input.label()
else:
- label = input.label
+ label = input.label or input.name
%>
${inputs_recursive_indent( text=label, depth=depth )}
<td><em>not used (parameter was added after this job was run)</em></td>
+ <td></td></tr>
%endif
@@ -52,7 +90,7 @@
## function to add a indentation depending on the depth in a <tr><%def name="inputs_recursive_indent( text, depth )"><td style="padding-left: ${ ( depth - 1 ) * 10 }px">
- ${text}
+ ${text | h}
</td></%def>
@@ -60,50 +98,57 @@
<thead><tr><th colspan="2" style="font-size: 120%;">
% if tool:
- Tool: ${tool.name}
+ Tool: ${tool.name | h}
% else:
Unknown Tool
% endif
</th></tr></thead><tbody>
- <tr><td>Name:</td><td>${hda.name}</td></tr>
+ <tr><td>Name:</td><td>${hda.name | h}</td></tr><tr><td>Created:</td><td>${hda.create_time.strftime("%b %d, %Y")}</td></tr>
## <tr><td>Copied from another history?</td><td>${hda.source_library_dataset}</td></tr><tr><td>Filesize:</td><td>${nice_size(hda.dataset.file_size)}</td></tr>
- <tr><td>Dbkey:</td><td>${hda.dbkey}</td></tr>
- <tr><td>Format:</td><td>${hda.ext}</td></tr>
- <tr><td>Tool Version:</td><td>${hda.tool_version}</td></tr>
+ <tr><td>Dbkey:</td><td>${hda.dbkey | h}</td></tr>
+ <tr><td>Format:</td><td>${hda.ext | h}</td></tr>
+ <tr><td>Tool Version:</td><td>${hda.tool_version | h}</td></tr><tr><td>Tool Standard Output:</td><td><a href="${h.url_for( controller='dataset', action='stdout')}">stdout</a></td></tr><tr><td>Tool Standard Error:</td><td><a href="${h.url_for( controller='dataset', action='stderr')}">stderr</a></td></tr>
- <tr><td>Tool Exit Code:</td><td>${job.exit_code}</td></tr>
+ <tr><td>Tool Exit Code:</td><td>${job.exit_code | h}</td></tr>
%if trans.user_is_admin() or trans.app.config.expose_dataset_path:
- <tr><td>Full Path:</td><td>${hda.file_name}</td></tr>
+ <tr><td>Full Path:</td><td>${hda.file_name | h}</td></tr>
%endif
</table><br />
+
<table class="tabletip"><thead><tr><th>Input Parameter</th><th>Value</th>
+ <th>Note for rerun</th></tr></thead><tbody>
% if params_objects and tool:
- ${ inputs_recursive(tool.inputs, params_objects, depth=1) }
+ ${ inputs_recursive( tool.inputs, params_objects, depth=1, upgrade_messages=upgrade_messages ) }
+ %elif params_objects is None:
+ <tr><td colspan="2">Unable to load parameters.</td></tr>
% else:
<tr><td colspan="2">No parameters.</td></tr>
% endif
</tbody></table>
-
+%if has_parameter_errors:
+ <br />
+ ${ render_msg( 'One or more of your original parameters may no longer be valid or displayed properly.', status='warning' ) }
+%endif
<h3>Inheritance Chain</h3>
- <div class="inherit" style="background-color: #fff; font-weight:bold;">${hda.name}</div>
+ <div class="inherit" style="background-color: #fff; font-weight:bold;">${hda.name | h}</div>
% for dep in inherit_chain:
<div style="font-size: 36px; text-align: center;">↑</div>
- <div class="inherit">${dep[0].name}<br/>${dep[1]}</div>
+ <div class="inherit">${dep[0].name | h}<br/>${dep[1]}</div>
% endfor
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
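The controller change above follows a "strict first, lenient on failure" pattern when loading job parameters: try to load values normally, and if a parameter type has changed, fall back to best-effort values plus upgrade messages for display. A generic, hedged sketch of that pattern follows; the three helper functions are hypothetical stand-ins for the Galaxy calls in the diff, not real APIs.

    def load_params_strict(job):
        # Stand-in for job.get_param_values(app, ignore_errors=False) (assumption).
        if job.get('stale'):
            raise ValueError("parameter type changed since this job ran")
        return job['params']

    def load_params_lenient(job):
        # Stand-in for job.get_param_values(app, ignore_errors=True) (assumption).
        return job['params']

    def collect_upgrade_messages(job, params):
        # Stand-in for tool.check_and_update_param_values(..., update_values=False) (assumption).
        return dict((name, 'value may no longer be valid') for name in params)

    def load_job_params(job):
        upgrade_messages, has_parameter_errors = {}, False
        try:
            params = load_params_strict(job)
        except Exception:
            params = load_params_lenient(job)
            upgrade_messages = collect_upgrade_messages(job, params)
            has_parameter_errors = True
        return params, upgrade_messages, has_parameter_errors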
commit/galaxy-central: inithello: Fix for installing a repository with set_environment tool dependencies.
by Bitbucket 11 Jan '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/e9f81b5154bb/
changeset: e9f81b5154bb
user: inithello
date: 2013-01-11 19:31:02
summary: Fix for installing a repository with set_environment tool dependencies.
affected #: 1 file
diff -r e5ec6ed8f0337893cf4bc64e61089471b00b28b0 -r e9f81b5154bb71c6c12d4591c8f9519e52c93212 lib/galaxy/util/shed_util.py
--- a/lib/galaxy/util/shed_util.py
+++ b/lib/galaxy/util/shed_util.py
@@ -791,9 +791,17 @@
tool_dependencies = {}
missing_tool_dependencies = {}
for td_key, val in all_tool_dependencies.items():
- # Since we have a new install, missing tool dependencies have never been installed.
- val[ 'status' ] = trans.model.ToolDependency.installation_status.NEVER_INSTALLED
- missing_tool_dependencies[ td_key ] = val
+ # Set environment tool dependencies are a list, set each member to never installed.
+ if td_key == 'set_environment':
+ new_val = []
+ for requirement_dict in val:
+ requirement_dict[ 'status' ] = trans.model.ToolDependency.installation_status.NEVER_INSTALLED
+ new_val.append( requirement_dict )
+ missing_tool_dependencies[ td_key ] = new_val
+ else:
+ # Since we have a new install, missing tool dependencies have never been installed.
+ val[ 'status' ] = trans.model.ToolDependency.installation_status.NEVER_INSTALLED
+ missing_tool_dependencies[ td_key ] = val
else:
tool_dependencies = None
missing_tool_dependencies = None
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
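The shape difference the fix accounts for: most keys in all_tool_dependencies map to a single requirement dict, but the 'set_environment' key maps to a list of requirement dicts, so each list member needs its status set individually. A small sketch of that logic, with a plain string standing in for ToolDependency.installation_status.NEVER_INSTALLED:

    NEVER_INSTALLED = 'Never installed'  # placeholder status value (assumption)

    def mark_missing(all_tool_dependencies):
        missing = {}
        for td_key, val in all_tool_dependencies.items():
            if td_key == 'set_environment':
                # A list of requirement dicts: mark each one individually.
                missing[td_key] = [dict(req, status=NEVER_INSTALLED) for req in val]
            else:
                # A single requirement dict.
                entry = dict(val)
                entry['status'] = NEVER_INSTALLED
                missing[td_key] = entry
        return missing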
11 new commits in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/611bc9467b22/
changeset: 611bc9467b22
user: jmchilton
date: 2013-01-11 17:53:05
summary: Implement a new job runner superclass, ClusterJobRunner, intended to reduce the amount of duplicated code between the drmaa, pbs, and lwr job runners (the cli and condor classes could likely benefit from this as well). This superclass will manage the monitor and worker threads and queues.
I am submitting only changes to the LWR that use this class, but I would encourage the Galaxy team to refactor the drmaa and pbs runners to use this class as well (or I would be happy to make these changes if given access or a promise the changes will be accepted quickly).
A variant of the drmaa runner that has been refactored to use this class can be found here: https://bitbucket.org/jmchilton/galaxy-central-lwr-enhancement-1/src/tip/li… from the now defunct pull request 80.
affected #: 1 file
diff -r 3faa833c15b5162db69f061bdbd4e951ba2ffdc7 -r 611bc9467b222bb1cf0b1ef5d5b6120f210a471f lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -1,5 +1,10 @@
import os, logging, os.path
+from galaxy import model
+from Queue import Queue, Empty
+import time
+import threading
+
log = logging.getLogger( __name__ )
class BaseJobRunner( object ):
@@ -90,3 +95,158 @@
set_extension = False,
kwds = { 'overwrite' : False } )
return commands
+
+class ClusterJobState( object ):
+ """
+ Encapsulate the state of a cluster job, this should be subclassed as
+ needed for various job runners to capture additional information needed
+ to communicate with cluster job manager.
+ """
+
+ def __init__( self ):
+ self.job_wrapper = None
+ self.job_id = None
+ self.old_state = None
+ self.running = False
+ self.runner_url = None
+
+STOP_SIGNAL = object()
+
+JOB_STATUS_QUEUED = 'queue'
+JOB_STATUS_FAILED = 'fail'
+JOB_STATUS_FINISHED = 'finish'
+
+class ClusterJobRunner( BaseJobRunner ):
+ """
+ Not sure this is the best name for this class, but there is common code
+ shared between sge, pbs, drmaa, etc...
+ """
+
+ def __init__( self, app ):
+ self.app = app
+ self.sa_session = app.model.context
+ # 'watched' and 'queue' are both used to keep track of jobs to watch.
+ # 'queue' is used to add new watched jobs, and can be called from
+ # any thread (usually by the 'queue_job' method). 'watched' must only
+ # be modified by the monitor thread, which will move items from 'queue'
+ # to 'watched' and then manage the watched jobs.
+ self.watched = []
+ self.monitor_queue = Queue()
+
+ def _init_monitor_thread(self):
+ self.monitor_thread = threading.Thread( name="%s.monitor_thread" % self.runner_name, target=self.monitor )
+ self.monitor_thread.setDaemon( True )
+ self.monitor_thread.start()
+
+ def _init_worker_threads(self):
+ self.work_queue = Queue()
+ self.work_threads = []
+ nworkers = self.app.config.cluster_job_queue_workers
+ for i in range( nworkers ):
+ worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next )
+ worker.start()
+ self.work_threads.append( worker )
+
+ def handle_stop(self):
+ # DRMAA and SGE runners should override this and disconnect.
+ pass
+
+ def monitor( self ):
+ """
+ Watches jobs currently in the cluster queue and deals with state changes
+ (queued to running) and job completion
+ """
+ while 1:
+ # Take any new watched jobs and put them on the monitor list
+ try:
+ while 1:
+ cluster_job_state = self.monitor_queue.get_nowait()
+ if cluster_job_state is STOP_SIGNAL:
+ # TODO: This is where any cleanup would occur
+ self.handle_stop()
+ return
+ self.watched.append( cluster_job_state )
+ except Empty:
+ pass
+ # Iterate over the list of watched jobs and check state
+ self.check_watched_items()
+ # Sleep a bit before the next state check
+ time.sleep( 1 )
+
+ def run_next( self ):
+ """
+ Run the next item in the queue (a job waiting to run or finish )
+ """
+ while 1:
+ ( op, obj ) = self.work_queue.get()
+ if op is STOP_SIGNAL:
+ return
+ try:
+ if op == JOB_STATUS_QUEUED:
+ # If the next item is to be run, then only run it if the
+ # job state is "queued". Otherwise the next item was either
+ # cancelled or one of its siblings encountered an error.
+ job_state = obj.get_state()
+ if model.Job.states.QUEUED == job_state:
+ self.queue_job( obj )
+ else:
+ log.debug( "Not executing job %d in state %s" % ( obj.get_id_tag(), job_state ) )
+ elif op == JOB_STATUS_FINISHED:
+ self.finish_job( obj )
+ elif op == JOB_STATUS_FAILED:
+ self.fail_job( obj )
+ except:
+ log.exception( "Uncaught exception %sing job" % op )
+
+ def monitor_job(self, job_state):
+ self.monitor_queue.put( job_state )
+
+ def put( self, job_wrapper ):
+ """Add a job to the queue (by job identifier)"""
+ # Change to queued state before handing to worker thread so the runner won't pick it up again
+ job_wrapper.change_state( model.Job.states.QUEUED )
+ self.mark_as_queued(job_wrapper)
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the monitor thread"""
+ log.info( "sending stop signal to worker threads" )
+ self.monitor_queue.put( STOP_SIGNAL )
+ for i in range( len( self.work_threads ) ):
+ self.work_queue.put( ( STOP_SIGNAL, None ) )
+
+ def check_watched_items(self):
+ """
+ This method is responsible for iterating over self.watched and handling
+ state changes and updating self.watched with a new list of watched job
+ states. Subclasses can opt to override this directly (as older job runners will
+ initially) or just override check_watched_item and allow the list processing to
+ reuse the logic here.
+ """
+ new_watched = []
+ for cluster_job_state in self.watched:
+ new_cluster_job_state = self.check_watched_item(cluster_job_state)
+ if new_cluster_job_state:
+ new_watched.append(new_cluster_job_state)
+ self.watched = new_watched
+
+ # Subclasses should implement this unless they override check_watched_items all together.
+ def check_watched_item(self):
+ raise NotImplementedError()
+
+ def queue_job(self, job_wrapper):
+ raise NotImplementedError()
+
+ def finish_job(self, job_state):
+ raise NotImplementedError()
+
+ def fail_job(self, job_state):
+ raise NotImplementedError()
+
+ def mark_as_finished(self, job_state):
+ self.work_queue.put( ( JOB_STATUS_FINISHED, job_state ) )
+
+ def mark_as_failed(self, job_state):
+ self.work_queue.put( ( JOB_STATUS_FAILED, job_state ) )
+
+ def mark_as_queued(self, job_wrapper):
+ self.work_queue.put( ( JOB_STATUS_QUEUED, job_wrapper ) )
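To make the intended division of labor concrete, here is a hedged skeleton of what a concrete runner built on this superclass is expected to override; the stub base class stands in for ClusterJobRunner, and the ExampleRunner method bodies are illustrative only.

    class StubClusterJobRunner(object):
        # Stand-in for galaxy.jobs.runners.ClusterJobRunner (assumption for this sketch).
        runner_name = "StubRunner"
        def mark_as_finished(self, job_state):
            pass

    class ExampleRunner(StubClusterJobRunner):
        runner_name = "ExampleRunner"

        def queue_job(self, job_wrapper):
            # Runs on a worker thread: submit the job to the external resource
            # and hand a job state object to the monitor via monitor_job().
            pass

        def check_watched_item(self, job_state):
            # Called once per monitor pass for each watched job. Return the
            # (possibly updated) state to keep watching it, or None when done.
            remote_job_complete = True
            if remote_job_complete:
                self.mark_as_finished(job_state)
                return None
            return job_state

        def finish_job(self, job_state):
            # Runs on a worker thread: collect outputs and finish the Galaxy job.
            pass

        def fail_job(self, job_state):
            # Runs on a worker thread: mark the Galaxy job as failed.
            pass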
https://bitbucket.org/galaxy/galaxy-central/commits/57d33740aa70/
changeset: 57d33740aa70
user: jmchilton
date: 2013-01-11 17:53:05
summary: Refactor the LWRJobRunner to be a ClusterJobRunner and implement a recover method for this runner; the upshot is that LWR jobs can now survive Galaxy restarts. The downside is that jobs are no longer queued on the Galaxy server, so the LWR server should be updated (to changeset 5213f6d or newer) to queue jobs on the remote server. This is not mandatory, however; things will still work, it is just that more jobs may run simultaneously than is desired.
affected #: 1 file
diff -r 611bc9467b222bb1cf0b1ef5d5b6120f210a471f -r 57d33740aa70c18c0fa082c0e2ef40042edb62e4 lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -7,7 +7,7 @@
from galaxy import model
from galaxy.datatypes.data import nice_size
-from galaxy.jobs.runners import BaseJobRunner
+from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner
import os, errno
from time import sleep
@@ -199,12 +199,18 @@
def wait(self):
""" """
while True:
- check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id })
- complete = check_complete_response["complete"] == "true"
+ complete = self.check_complete()
if complete:
return check_complete_response
time.sleep(1)
+ def raw_check_complete(self):
+ check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id })
+ return check_complete_response
+
+ def check_complete(self):
+ return self.raw_check_complete()["complete"] == "true"
+
def clean(self):
self.__raw_execute("clean", { "job_id" : self.job_id })
@@ -213,51 +219,34 @@
-class LwrJobRunner( BaseJobRunner ):
+class LwrJobRunner( ClusterJobRunner ):
"""
- Lwr Job Runner
+ LWR Job Runner
"""
- STOP_SIGNAL = object()
+ runner_name = "LWRRunner"
+
def __init__( self, app ):
- """Start the job runner with 'nworkers' worker threads"""
- self.app = app
- self.sa_session = app.model.context
+ """Start the job runner """
+ super( LwrJobRunner, self ).__init__( app )
+ self._init_monitor_thread()
+ log.info( "starting LWR workers" )
+ self._init_worker_threads()
- # start workers
- self.queue = Queue()
- self.threads = []
- nworkers = app.config.local_job_queue_workers
- log.info( "starting workers" )
- for i in range( nworkers ):
- worker = threading.Thread( ( name="LwrJobRunner.thread-%d" % i ), target=self.run_next )
- worker.setDaemon( True )
- worker.start()
- self.threads.append( worker )
- log.debug( "%d workers ready", nworkers )
+ def check_watched_item(self, job_state):
+ try:
+ client = self.get_client_from_state(job_state)
+ complete = client.check_complete()
+ except Exception:
+ # An orphaned job was put into the queue at app startup, so remote server went down
+ # either way we are done I guess.
+ self.mark_as_finished(job_state)
+ return None
+ if complete:
+ self.mark_as_finished(job_state)
+ return None
+ return job_state
- def run_next( self ):
- """Run the next job, waiting until one is available if neccesary"""
- while 1:
- job_wrapper = self.queue.get()
- if job_wrapper is self.STOP_SIGNAL:
- return
- try:
- self.run_job( job_wrapper )
- except:
- log.exception( "Uncaught exception running job" )
-
- def determine_lwr_url(self, url):
- lwr_url = url[ len( 'lwr://' ) : ]
- return lwr_url
-
- def get_client_from_wrapper(self, job_wrapper):
- return self.get_client( job_wrapper.get_job_runner_url(), job_wrapper.job_id )
-
- def get_client(self, job_runner, job_id):
- lwr_url = self.determine_lwr_url( job_runner )
- return Client(lwr_url, job_id)
-
- def run_job( self, job_wrapper ):
+ def queue_job(self, job_wrapper):
stderr = stdout = command_line = ''
runner_url = job_wrapper.get_job_runner_url()
@@ -277,35 +266,76 @@
return
# If we were able to get a command line, run the job
- if command_line:
- try:
- #log.debug( 'executing: %s' % command_line )
- client = self.get_client_from_wrapper(job_wrapper)
- output_fnames = job_wrapper.get_output_fnames()
- output_files = [ str( o ) for o in output_fnames ]
- input_files = job_wrapper.get_input_fnames()
- file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir)
- rebuilt_command_line = file_stager.get_rewritten_command_line()
- client.launch( rebuilt_command_line )
+ if not command_line:
+ job_wrapper.finish( '', '' )
+ return
- job_wrapper.set_runner( runner_url, job_wrapper.job_id )
- job_wrapper.change_state( model.Job.states.RUNNING )
+ try:
+ #log.debug( 'executing: %s' % command_line )
+ client = self.get_client_from_wrapper(job_wrapper)
+ output_files = self.get_output_files(job_wrapper)
+ input_files = job_wrapper.get_input_fnames()
+ file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir)
+ rebuilt_command_line = file_stager.get_rewritten_command_line()
+ client.launch( rebuilt_command_line )
+ job_wrapper.set_runner( runner_url, job_wrapper.job_id )
+ job_wrapper.change_state( model.Job.states.RUNNING )
- run_results = client.wait()
- log.debug('run_results %s' % run_results )
- stdout = run_results['stdout']
- stderr = run_results['stderr']
+ except Exception, exc:
+ job_wrapper.fail( "failure running job", exception=True )
+ log.exception("failure running job %d" % job_wrapper.job_id)
+ return
-
- if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]:
- for output_file in output_files:
- client.download_output(output_file)
- client.clean()
- log.debug('execution finished: %s' % command_line)
- except Exception, exc:
- job_wrapper.fail( "failure running job", exception=True )
- log.exception("failure running job %d" % job_wrapper.job_id)
- return
+ lwr_job_state = ClusterJobState()
+ lwr_job_state.job_wrapper = job_wrapper
+ lwr_job_state.job_id = job_wrapper.job_id
+ lwr_job_state.old_state = True
+ lwr_job_state.running = True
+ lwr_job_state.runner_url = runner_url
+ self.monitor_job(lwr_job_state)
+
+ def get_output_files(self, job_wrapper):
+ output_fnames = job_wrapper.get_output_fnames()
+ return [ str( o ) for o in output_fnames ]
+
+
+ def determine_lwr_url(self, url):
+ lwr_url = url[ len( 'lwr://' ) : ]
+ return lwr_url
+
+ def get_client_from_wrapper(self, job_wrapper):
+ return self.get_client( job_wrapper.get_job_runner_url(), job_wrapper.job_id )
+
+ def get_client_from_state(self, job_state):
+ job_runner = job_state.runner_url
+ job_id = job_state.job_id
+ return self.get_client(job_runner, job_id)
+
+ def get_client(self, job_runner, job_id):
+ lwr_url = self.determine_lwr_url( job_runner )
+ return Client(lwr_url, job_id)
+
+ def finish_job( self, job_state ):
+ stderr = stdout = command_line = ''
+ job_wrapper = job_state.job_wrapper
+ try:
+ client = self.get_client_from_state(job_state)
+
+ run_results = client.raw_check_complete()
+ log.debug('run_results %s' % run_results )
+ stdout = run_results['stdout']
+ stderr = run_results['stderr']
+
+ if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]:
+ output_files = self.get_output_files(job_wrapper)
+ for output_file in output_files:
+ client.download_output(output_file)
+ client.clean()
+ log.debug('execution finished: %s' % command_line)
+ except Exception, exc:
+ job_wrapper.fail( "failure running job", exception=True )
+ log.exception("failure running job %d" % job_wrapper.job_id)
+ return
#run the metadata setting script here
#this is terminate-able when output dataset/job is deleted
#so that long running set_meta()s can be canceled without having to reboot the server
@@ -321,7 +351,7 @@
job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session )
external_metadata_proc.wait()
log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id )
-
+
# Finish the job
try:
job_wrapper.finish( stdout, stderr )
@@ -329,12 +359,13 @@
log.exception("Job wrapper finish method failed")
job_wrapper.fail("Unable to finish job", exception=True)
- def put( self, job_wrapper ):
- """Add a job to the queue (by job identifier)"""
- # Change to queued state before handing to worker thread so the runner won't pick it up again
- job_wrapper.change_state( model.Job.states.QUEUED )
- self.queue.put( job_wrapper )
-
+ def fail_job( self, job_state ):
+ """
+ Seperated out so we can use the worker threads for it.
+ """
+ self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) )
+ job_state.job_wrapper.fail( job_state.fail_message )
+
def shutdown( self ):
"""Attempts to gracefully shut down the worker threads"""
log.info( "sending stop signal to worker threads" )
@@ -383,7 +414,21 @@
log.debug("Attempt remote lwr kill of job with url %s and id %s" % (lwr_url, job_id))
client = self.get_client(lwr_url, job_id)
client.kill()
+
+
def recover( self, job, job_wrapper ):
- # local jobs can't be recovered
- job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted. Please retry the job." )
-
+ """Recovers jobs stuck in the queued/running state when Galaxy started"""
+ job_state = ClusterJobState()
+ job_state.job_id = str( job.get_job_runner_external_id() )
+ job_state.runner_url = job_wrapper.get_job_runner_url()
+ job_wrapper.command_line = job.get_command_line()
+ job_state.job_wrapper = job_wrapper
+ if job.get_state() == model.Job.states.RUNNING:
+ log.debug( "(LWR/%s) is still in running state, adding to the LWR queue" % ( job.get_id()) )
+ job_state.old_state = True
+ job_state.running = True
+ self.monitor_queue.put( job_state )
+ elif job.get_state() == model.Job.states.QUEUED:
+ # LWR doesn't queue currently, so this indicates galaxy was shutoff while
+ # job was being staged. Not sure how to recover from that.
+ job_state.job_wrapper.fail( "This job was killed when Galaxy was restarted. Please retry the job." )
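The recover() method added above follows a simple rule: anything the database says was running gets a rebuilt job state and goes back on the monitor queue, while anything that was only queued never reached the remote server and is failed. A generic sketch of that rule (the JobState class and the dict-shaped job record are stand-ins, not Galaxy classes):

    class JobState(object):
        # Stand-in for ClusterJobState (assumption).
        def __init__(self, job_id, runner_url):
            self.job_id = job_id
            self.runner_url = runner_url
            self.old_state = True
            self.running = False

    def recover(job, monitor_queue, fail):
        # 'job' is assumed to be a dict with 'external_id', 'runner_url' and 'state'.
        state = JobState(str(job['external_id']), job['runner_url'])
        if job['state'] == 'running':
            state.running = True
            monitor_queue.put(state)  # resume watching the remote job
        elif job['state'] == 'queued':
            # The job never reached the remote server, so it cannot be resumed.
            fail("This job was killed when Galaxy was restarted. Please retry the job.")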
https://bitbucket.org/galaxy/galaxy-central/commits/0a2976ff3b73/
changeset: 0a2976ff3b73
user: jmchilton
date: 2013-01-11 17:53:05
summary: Allow execution of jobs created by task splitting via the LWR job runner.
affected #: 1 file
diff -r 57d33740aa70c18c0fa082c0e2ef40042edb62e4 -r 0a2976ff3b73d5130674c1e1019e3425b16d1fc0 lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -27,13 +27,14 @@
class FileStager(object):
- def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir):
+ def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory):
self.client = client
self.command_line = command_line
self.config_files = config_files
self.input_files = input_files
self.output_files = output_files
self.tool_dir = os.path.abspath(tool_dir)
+ self.working_directory = working_directory
self.file_renames = {}
@@ -46,7 +47,9 @@
self.__initialize_referenced_tool_files()
self.__upload_tool_files()
self.__upload_input_files()
+ self.__upload_working_directory_files()
self.__initialize_output_file_renames()
+ self.__initialize_task_output_file_renames()
self.__initialize_config_file_renames()
self.__rewrite_and_upload_config_files()
self.__rewrite_command_line()
@@ -69,13 +72,27 @@
for input_file in self.input_files:
input_upload_response = self.client.upload_input(input_file)
self.file_renames[input_file] = input_upload_response['path']
-
+
+ def __upload_working_directory_files(self):
+ # Task manager stages files into working directory, these need to be uploaded
+ for working_directory_file in os.listdir(self.working_directory):
+ path = os.path.join(self.working_directory, working_directory_file)
+ working_file_response = self.client.upload_working_directory_file(path)
+ self.file_renames[path] = working_file_response['path']
+
def __initialize_output_file_renames(self):
for output_file in self.output_files:
self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory,
self.remote_path_separator,
os.path.basename(output_file))
+ def __initialize_task_output_file_renames(self):
+ for output_file in self.output_files:
+ name = os.path.basename(output_file)
+ self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory,
+ self.remote_path_separator,
+ name)
+
def __initialize_config_file_renames(self):
for config_file in self.config_files:
self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory,
@@ -172,13 +189,27 @@
def upload_config_file(self, path, contents):
return self.__upload_contents("upload_config_file", path, contents)
-
- def download_output(self, path):
+
+ def upload_working_directory_file(self, path):
+ return self.__upload_file("upload_working_directory_file", path)
+
+ def _get_output_type(self, name):
+ return self.__raw_execute_and_parse('get_output_type', {'name': name,
+ 'job_id': self.job_id})
+
+ def download_output(self, path, working_directory):
""" """
name = os.path.basename(path)
- response = self.__raw_execute('download_output', {'name' : name,
- "job_id" : self.job_id})
- output = open(path, 'wb')
+ output_type = self._get_output_type(name)
+ response = self.__raw_execute('download_output', {'name' : name,
+ "job_id" : self.job_id,
+ 'output_type': output_type})
+ if output_type == 'direct':
+ output = open(path, 'wb')
+ elif output_type == 'task':
+ output = open(os.path.join(working_directory, name), 'wb')
+ else:
+ raise Exception("No remote output found for dataset with path %s" % path)
try:
while True:
buffer = response.read(1024)
@@ -254,7 +285,7 @@
try:
job_wrapper.prepare()
if hasattr(job_wrapper, 'prepare_input_files_cmds') and job_wrapper.prepare_input_files_cmds is not None:
- for cmd in job_wrapper.prepare_input_file_cmds: # run the commands to stage the input files
+ for cmd in job_wrapper.prepare_input_files_cmds: # run the commands to stage the input files
#log.debug( 'executing: %s' % cmd )
if 0 != os.system(cmd):
raise Exception('Error running file staging command: %s' % cmd)
@@ -275,7 +306,8 @@
client = self.get_client_from_wrapper(job_wrapper)
output_files = self.get_output_files(job_wrapper)
input_files = job_wrapper.get_input_fnames()
- file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir)
+ working_directory = job_wrapper.working_directory
+ file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir, working_directory)
rebuilt_command_line = file_stager.get_rewritten_command_line()
client.launch( rebuilt_command_line )
job_wrapper.set_runner( runner_url, job_wrapper.job_id )
@@ -304,7 +336,10 @@
return lwr_url
def get_client_from_wrapper(self, job_wrapper):
- return self.get_client( job_wrapper.get_job_runner_url(), job_wrapper.job_id )
+ job_id = job_wrapper.job_id
+ if hasattr(job_wrapper, 'task_id'):
+ job_id = "%s_%s" % (job_id, job_wrapper.task_id)
+ return self.get_client( job_wrapper.get_job_runner_url(), job_id )
def get_client_from_state(self, job_state):
job_runner = job_state.runner_url
@@ -329,7 +364,7 @@
if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]:
output_files = self.get_output_files(job_wrapper)
for output_file in output_files:
- client.download_output(output_file)
+ client.download_output(output_file, working_directory=job_wrapper.working_directory)
client.clean()
log.debug('execution finished: %s' % command_line)
except Exception, exc:
@@ -386,8 +421,9 @@
def stop_job( self, job ):
#if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
- if job.external_output_metadata:
- pid = job.external_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them
+ job_ext_output_metadata = job.get_external_output_metadata()
+ if job_ext_output_metadata:
+ pid = job_ext_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them
if pid in [ None, '' ]:
log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id )
return
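The download change above distinguishes two kinds of remote outputs: 'direct' outputs that belong at the dataset path, and 'task' outputs that were produced in the remote working directory and belong in the local one. A small sketch of that dispatch (only the two output_type values come from the diff; the helper itself is illustrative):

    import os

    def target_path_for_output(path, working_directory, output_type):
        name = os.path.basename(path)
        if output_type == 'direct':
            return path
        elif output_type == 'task':
            return os.path.join(working_directory, name)
        raise Exception("No remote output found for dataset with path %s" % path)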
https://bitbucket.org/galaxy/galaxy-central/commits/ed3d477d056f/
changeset: ed3d477d056f
user: jmchilton
date: 2013-01-11 17:53:05
summary: Extend LWR job runner to stage an input's extra_files_path (if present).
affected #: 1 file
diff -r 0a2976ff3b73d5130674c1e1019e3425b16d1fc0 -r ed3d477d056fd77a207e61ae7ee7ed9cc2c1e9b2 lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -72,6 +72,16 @@
for input_file in self.input_files:
input_upload_response = self.client.upload_input(input_file)
self.file_renames[input_file] = input_upload_response['path']
+ # TODO: Determine if this is object store safe and what needs to be
+ # done if it is not.
+ files_path = "%s_files" % input_file[0:-len(".dat")]
+ if os.path.exists(files_path):
+ for extra_file in os.listdir(files_path):
+ extra_file_path = os.path.join(files_path, extra_file)
+ relative_path = os.path.basename(files_path)
+ extra_file_relative_path = os.path.join(relative_path, extra_file)
+ response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path)
+ self.file_renames[extra_file_path] = response['path']
def __upload_working_directory_files(self):
# Task manager stages files into working directory, these need to be uploaded
@@ -167,17 +177,18 @@
response = self.__raw_execute(command, args, data)
return simplejson.loads(response.read())
- def __upload_file(self, action, path, contents = None):
+ def __upload_file(self, action, path, name=None, contents = None):
""" """
input = open(path, 'rb')
try:
mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ)
- return self.__upload_contents(action, path, mmapped_input)
+ return self.__upload_contents(action, path, mmapped_input, name)
finally:
input.close()
- def __upload_contents(self, action, path, contents):
- name = os.path.basename(path)
+ def __upload_contents(self, action, path, contents, name=None):
+ if not name:
+ name = os.path.basename(path)
args = {"job_id" : self.job_id, "name" : name}
return self.__raw_execute_and_parse(action, args, contents)
@@ -187,6 +198,9 @@
def upload_input(self, path):
return self.__upload_file("upload_input", path)
+ def upload_extra_input(self, path, relative_name):
+ return self.__upload_file("upload_extra_input", path, name=relative_name)
+
def upload_config_file(self, path, contents):
return self.__upload_contents("upload_config_file", path, contents)
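The staging change above walks a dataset's companion '<dataset>_files' directory and uploads each extra file under a name relative to that directory. A hedged sketch of that enumeration (the '_files' / '.dat' naming convention is taken from the diff; the helper function is illustrative):

    import os

    def extra_input_uploads(input_file):
        # Yield (absolute_path, relative_name) pairs for a dataset's extra files.
        files_path = "%s_files" % input_file[:-len(".dat")]
        if not os.path.exists(files_path):
            return
        relative_dir = os.path.basename(files_path)
        for extra_file in os.listdir(files_path):
            yield (os.path.join(files_path, extra_file),
                   os.path.join(relative_dir, extra_file))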
https://bitbucket.org/galaxy/galaxy-central/commits/6fa106f2678d/
changeset: 6fa106f2678d
user: jmchilton
date: 2013-01-11 17:53:05
summary: Refactor much of the lwr client code out into its own module. This will make it easier to keep its contents in sync with the client code in the LWR source.
affected #: 2 files
diff -r ed3d477d056fd77a207e61ae7ee7ed9cc2c1e9b2 -r 6fa106f2678da0e0af8d8c25e90928817254c6f5 lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -1,268 +1,18 @@
import logging
import subprocess
-from Queue import Queue
-import threading
-
-import re
from galaxy import model
-from galaxy.datatypes.data import nice_size
from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner
-import os, errno
+import errno
from time import sleep
+from lwr_client import FileStager, Client
+
log = logging.getLogger( __name__ )
__all__ = [ 'LwrJobRunner' ]
-import urllib
-import urllib2
-import httplib
-import mmap
-import tempfile
-import time
-
-import simplejson
-
-class FileStager(object):
-
- def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory):
- self.client = client
- self.command_line = command_line
- self.config_files = config_files
- self.input_files = input_files
- self.output_files = output_files
- self.tool_dir = os.path.abspath(tool_dir)
- self.working_directory = working_directory
-
- self.file_renames = {}
-
- job_config = client.setup()
-
- self.new_working_directory = job_config['working_directory']
- self.new_outputs_directory = job_config['outputs_directory']
- self.remote_path_separator = job_config['path_separator']
-
- self.__initialize_referenced_tool_files()
- self.__upload_tool_files()
- self.__upload_input_files()
- self.__upload_working_directory_files()
- self.__initialize_output_file_renames()
- self.__initialize_task_output_file_renames()
- self.__initialize_config_file_renames()
- self.__rewrite_and_upload_config_files()
- self.__rewrite_command_line()
-
- def __initialize_referenced_tool_files(self):
- pattern = r"(%s%s\S+)" % (self.tool_dir, os.sep)
- referenced_tool_files = []
- referenced_tool_files += re.findall(pattern, self.command_line)
- if self.config_files != None:
- for config_file in self.config_files:
- referenced_tool_files += re.findall(pattern, self.__read(config_file))
- self.referenced_tool_files = referenced_tool_files
-
- def __upload_tool_files(self):
- for referenced_tool_file in self.referenced_tool_files:
- tool_upload_response = self.client.upload_tool_file(referenced_tool_file)
- self.file_renames[referenced_tool_file] = tool_upload_response['path']
-
- def __upload_input_files(self):
- for input_file in self.input_files:
- input_upload_response = self.client.upload_input(input_file)
- self.file_renames[input_file] = input_upload_response['path']
- # TODO: Determine if this is object store safe and what needs to be
- # done if it is not.
- files_path = "%s_files" % input_file[0:-len(".dat")]
- if os.path.exists(files_path):
- for extra_file in os.listdir(files_path):
- extra_file_path = os.path.join(files_path, extra_file)
- relative_path = os.path.basename(files_path)
- extra_file_relative_path = os.path.join(relative_path, extra_file)
- response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path)
- self.file_renames[extra_file_path] = response['path']
-
- def __upload_working_directory_files(self):
- # Task manager stages files into working directory, these need to be uploaded
- for working_directory_file in os.listdir(self.working_directory):
- path = os.path.join(self.working_directory, working_directory_file)
- working_file_response = self.client.upload_working_directory_file(path)
- self.file_renames[path] = working_file_response['path']
-
- def __initialize_output_file_renames(self):
- for output_file in self.output_files:
- self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory,
- self.remote_path_separator,
- os.path.basename(output_file))
-
- def __initialize_task_output_file_renames(self):
- for output_file in self.output_files:
- name = os.path.basename(output_file)
- self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory,
- self.remote_path_separator,
- name)
-
- def __initialize_config_file_renames(self):
- for config_file in self.config_files:
- self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory,
- self.remote_path_separator,
- os.path.basename(config_file))
-
- def __rewrite_paths(self, contents):
- new_contents = contents
- for local_path, remote_path in self.file_renames.iteritems():
- new_contents = new_contents.replace(local_path, remote_path)
- return new_contents
-
- def __rewrite_and_upload_config_files(self):
- for config_file in self.config_files:
- config_contents = self.__read(config_file)
- new_config_contents = self.__rewrite_paths(config_contents)
- self.client.upload_config_file(config_file, new_config_contents)
-
- def __rewrite_command_line(self):
- self.rewritten_command_line = self.__rewrite_paths(self.command_line)
-
- def get_rewritten_command_line(self):
- return self.rewritten_command_line
-
- def __read(self, path):
- input = open(path, "r")
- try:
- return input.read()
- finally:
- input.close()
-
-
-
-class Client(object):
- """
- """
- """
- """
- def __init__(self, remote_host, job_id, private_key=None):
- if not remote_host.endswith("/"):
- remote_host = remote_host + "/"
- ## If we don't have an explicit private_key defined, check for
- ## one embedded in the URL. A URL of the form
- ## https://moo@cow:8913 will try to contact https://cow:8913
- ## with a private key of moo
- private_key_format = "https?://(.*)@.*/?"
- private_key_match= re.match(private_key_format, remote_host)
- if not private_key and private_key_match:
- private_key = private_key_match.group(1)
- remote_host = remote_host.replace("%s@" % private_key, '', 1)
- self.remote_host = remote_host
- self.job_id = job_id
- self.private_key = private_key
-
- def url_open(self, request, data):
- return urllib2.urlopen(request, data)
-
- def __build_url(self, command, args):
- if self.private_key:
- args["private_key"] = self.private_key
- data = urllib.urlencode(args)
- url = self.remote_host + command + "?" + data
- return url
-
- def __raw_execute(self, command, args = {}, data = None):
- url = self.__build_url(command, args)
- request = urllib2.Request(url=url, data=data)
- response = self.url_open(request, data)
- return response
-
- def __raw_execute_and_parse(self, command, args = {}, data = None):
- response = self.__raw_execute(command, args, data)
- return simplejson.loads(response.read())
-
- def __upload_file(self, action, path, name=None, contents = None):
- """ """
- input = open(path, 'rb')
- try:
- mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ)
- return self.__upload_contents(action, path, mmapped_input, name)
- finally:
- input.close()
-
- def __upload_contents(self, action, path, contents, name=None):
- if not name:
- name = os.path.basename(path)
- args = {"job_id" : self.job_id, "name" : name}
- return self.__raw_execute_and_parse(action, args, contents)
-
- def upload_tool_file(self, path):
- return self.__upload_file("upload_tool_file", path)
-
- def upload_input(self, path):
- return self.__upload_file("upload_input", path)
-
- def upload_extra_input(self, path, relative_name):
- return self.__upload_file("upload_extra_input", path, name=relative_name)
-
- def upload_config_file(self, path, contents):
- return self.__upload_contents("upload_config_file", path, contents)
-
- def upload_working_directory_file(self, path):
- return self.__upload_file("upload_working_directory_file", path)
-
- def _get_output_type(self, name):
- return self.__raw_execute_and_parse('get_output_type', {'name': name,
- 'job_id': self.job_id})
-
- def download_output(self, path, working_directory):
- """ """
- name = os.path.basename(path)
- output_type = self._get_output_type(name)
- response = self.__raw_execute('download_output', {'name' : name,
- "job_id" : self.job_id,
- 'output_type': output_type})
- if output_type == 'direct':
- output = open(path, 'wb')
- elif output_type == 'task':
- output = open(os.path.join(working_directory, name), 'wb')
- else:
- raise Exception("No remote output found for dataset with path %s" % path)
- try:
- while True:
- buffer = response.read(1024)
- if buffer == "":
- break
- output.write(buffer)
- finally:
- output.close()
-
- def launch(self, command_line):
- """ """
- return self.__raw_execute("launch", {"command_line" : command_line,
- "job_id" : self.job_id})
-
- def kill(self):
- return self.__raw_execute("kill", {"job_id" : self.job_id})
-
- def wait(self):
- """ """
- while True:
- complete = self.check_complete()
- if complete:
- return check_complete_response
- time.sleep(1)
-
- def raw_check_complete(self):
- check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id })
- return check_complete_response
-
- def check_complete(self):
- return self.raw_check_complete()["complete"] == "true"
-
- def clean(self):
- self.__raw_execute("clean", { "job_id" : self.job_id })
-
- def setup(self):
- return self.__raw_execute_and_parse("setup", { "job_id" : self.job_id })
-
-
class LwrJobRunner( ClusterJobRunner ):
"""
diff -r ed3d477d056fd77a207e61ae7ee7ed9cc2c1e9b2 -r 6fa106f2678da0e0af8d8c25e90928817254c6f5 lib/galaxy/jobs/runners/lwr_client/__init__.py
--- /dev/null
+++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py
@@ -0,0 +1,246 @@
+import mmap
+import os
+import re
+import time
+import urllib
+import urllib2
+
+import simplejson
+
+
+class FileStager(object):
+
+ def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory):
+ self.client = client
+ self.command_line = command_line
+ self.config_files = config_files
+ self.input_files = input_files
+ self.output_files = output_files
+ self.tool_dir = os.path.abspath(tool_dir)
+ self.working_directory = working_directory
+
+ self.file_renames = {}
+
+ job_config = client.setup()
+
+ self.new_working_directory = job_config['working_directory']
+ self.new_outputs_directory = job_config['outputs_directory']
+ self.remote_path_separator = job_config['path_separator']
+
+ self.__initialize_referenced_tool_files()
+ self.__upload_tool_files()
+ self.__upload_input_files()
+ self.__upload_working_directory_files()
+ self.__initialize_output_file_renames()
+ self.__initialize_task_output_file_renames()
+ self.__initialize_config_file_renames()
+ self.__rewrite_and_upload_config_files()
+ self.__rewrite_command_line()
+
+ def __initialize_referenced_tool_files(self):
+ pattern = r"(%s%s\S+)" % (self.tool_dir, os.sep)
+ referenced_tool_files = []
+ referenced_tool_files += re.findall(pattern, self.command_line)
+ if self.config_files != None:
+ for config_file in self.config_files:
+ referenced_tool_files += re.findall(pattern, self.__read(config_file))
+ self.referenced_tool_files = referenced_tool_files
+
+ def __upload_tool_files(self):
+ for referenced_tool_file in self.referenced_tool_files:
+ tool_upload_response = self.client.upload_tool_file(referenced_tool_file)
+ self.file_renames[referenced_tool_file] = tool_upload_response['path']
+
+ def __upload_input_files(self):
+ for input_file in self.input_files:
+ input_upload_response = self.client.upload_input(input_file)
+ self.file_renames[input_file] = input_upload_response['path']
+ # TODO: Determine if this is object store safe and what needs to be
+ # done if it is not.
+ files_path = "%s_files" % input_file[0:-len(".dat")]
+ if os.path.exists(files_path):
+ for extra_file in os.listdir(files_path):
+ extra_file_path = os.path.join(files_path, extra_file)
+ relative_path = os.path.basename(files_path)
+ extra_file_relative_path = os.path.join(relative_path, extra_file)
+ response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path)
+ self.file_renames[extra_file_path] = response['path']
+
+ def __upload_working_directory_files(self):
+ # Task manager stages files into working directory, these need to be uploaded
+ for working_directory_file in os.listdir(self.working_directory):
+ path = os.path.join(self.working_directory, working_directory_file)
+ working_file_response = self.client.upload_working_directory_file(path)
+ self.file_renames[path] = working_file_response['path']
+
+ def __initialize_output_file_renames(self):
+ for output_file in self.output_files:
+ self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory,
+ self.remote_path_separator,
+ os.path.basename(output_file))
+
+ def __initialize_task_output_file_renames(self):
+ for output_file in self.output_files:
+ name = os.path.basename(output_file)
+ self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory,
+ self.remote_path_separator,
+ name)
+
+ def __initialize_config_file_renames(self):
+ for config_file in self.config_files:
+ self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory,
+ self.remote_path_separator,
+ os.path.basename(config_file))
+
+ def __rewrite_paths(self, contents):
+ new_contents = contents
+ for local_path, remote_path in self.file_renames.iteritems():
+ new_contents = new_contents.replace(local_path, remote_path)
+ return new_contents
+
+ def __rewrite_and_upload_config_files(self):
+ for config_file in self.config_files:
+ config_contents = self.__read(config_file)
+ new_config_contents = self.__rewrite_paths(config_contents)
+ self.client.upload_config_file(config_file, new_config_contents)
+
+ def __rewrite_command_line(self):
+ self.rewritten_command_line = self.__rewrite_paths(self.command_line)
+
+ def get_rewritten_command_line(self):
+ return self.rewritten_command_line
+
+ def __read(self, path):
+ input = open(path, "r")
+ try:
+ return input.read()
+ finally:
+ input.close()
+
+
+
+class Client(object):
+ """
+ """
+ """
+ """
+ def __init__(self, remote_host, job_id, private_key=None):
+ if not remote_host.endswith("/"):
+ remote_host = remote_host + "/"
+ ## If we don't have an explicit private_key defined, check for
+ ## one embedded in the URL. A URL of the form
+ ## https://moo@cow:8913 will try to contact https://cow:8913
+ ## with a private key of moo
+ private_key_format = "https?://(.*)@.*/?"
+ private_key_match= re.match(private_key_format, remote_host)
+ if not private_key and private_key_match:
+ private_key = private_key_match.group(1)
+ remote_host = remote_host.replace("%s@" % private_key, '', 1)
+ self.remote_host = remote_host
+ self.job_id = job_id
+ self.private_key = private_key
+
+ def url_open(self, request, data):
+ return urllib2.urlopen(request, data)
+
+ def __build_url(self, command, args):
+ if self.private_key:
+ args["private_key"] = self.private_key
+ data = urllib.urlencode(args)
+ url = self.remote_host + command + "?" + data
+ return url
+
+ def __raw_execute(self, command, args = {}, data = None):
+ url = self.__build_url(command, args)
+ request = urllib2.Request(url=url, data=data)
+ response = self.url_open(request, data)
+ return response
+
+ def __raw_execute_and_parse(self, command, args = {}, data = None):
+ response = self.__raw_execute(command, args, data)
+ return simplejson.loads(response.read())
+
+ def __upload_file(self, action, path, name=None, contents = None):
+ """ """
+ input = open(path, 'rb')
+ try:
+ mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ)
+ return self.__upload_contents(action, path, mmapped_input, name)
+ finally:
+ input.close()
+
+ def __upload_contents(self, action, path, contents, name=None):
+ if not name:
+ name = os.path.basename(path)
+ args = {"job_id" : self.job_id, "name" : name}
+ return self.__raw_execute_and_parse(action, args, contents)
+
+ def upload_tool_file(self, path):
+ return self.__upload_file("upload_tool_file", path)
+
+ def upload_input(self, path):
+ return self.__upload_file("upload_input", path)
+
+ def upload_extra_input(self, path, relative_name):
+ return self.__upload_file("upload_extra_input", path, name=relative_name)
+
+ def upload_config_file(self, path, contents):
+ return self.__upload_contents("upload_config_file", path, contents)
+
+ def upload_working_directory_file(self, path):
+ return self.__upload_file("upload_working_directory_file", path)
+
+ def _get_output_type(self, name):
+ return self.__raw_execute_and_parse('get_output_type', {'name': name,
+ 'job_id': self.job_id})
+
+ def download_output(self, path, working_directory):
+ """ """
+ name = os.path.basename(path)
+ output_type = self._get_output_type(name)
+ response = self.__raw_execute('download_output', {'name' : name,
+ "job_id" : self.job_id,
+ 'output_type': output_type})
+ if output_type == 'direct':
+ output = open(path, 'wb')
+ elif output_type == 'task':
+ output = open(os.path.join(working_directory, name), 'wb')
+ else:
+ raise Exception("No remote output found for dataset with path %s" % path)
+ try:
+ while True:
+ buffer = response.read(1024)
+ if buffer == "":
+ break
+ output.write(buffer)
+ finally:
+ output.close()
+
+ def launch(self, command_line):
+ """ """
+ return self.__raw_execute("launch", {"command_line" : command_line,
+ "job_id" : self.job_id})
+
+ def kill(self):
+ return self.__raw_execute("kill", {"job_id" : self.job_id})
+
+ def wait(self):
+ """ """
+ while True:
+ complete = self.check_complete()
+ if complete:
+ return check_complete_response
+ time.sleep(1)
+
+ def raw_check_complete(self):
+ check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id })
+ return check_complete_response
+
+ def check_complete(self):
+ return self.raw_check_complete()["complete"] == "true"
+
+ def clean(self):
+ self.__raw_execute("clean", { "job_id" : self.job_id })
+
+ def setup(self):
+ return self.__raw_execute_and_parse("setup", { "job_id" : self.job_id })
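For readers following the Client constructor above: the embedded-key URL form (https://moo@cow:8913 contacting https://cow:8913 with a key of "moo") boils down to one regex match and a replace. Below is a minimal standalone sketch of just that step, using the regex shown in the changeset; the helper name itself is illustrative and not part of the code.

import re

def split_embedded_key(remote_host, private_key=None):
    # Mirrors the logic in Client.__init__: if no explicit key is supplied,
    # pull one out of a URL of the form https://KEY@host:port/ and strip it.
    if not remote_host.endswith("/"):
        remote_host = remote_host + "/"
    match = re.match(r"https?://(.*)@.*/?", remote_host)
    if not private_key and match:
        private_key = match.group(1)
        remote_host = remote_host.replace("%s@" % private_key, '', 1)
    return remote_host, private_key

print(split_embedded_key("https://moo@cow:8913"))  # ('https://cow:8913/', 'moo')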
https://bitbucket.org/galaxy/galaxy-central/commits/b109c7db9347/
changeset: b109c7db9347
user: jmchilton
date: 2013-01-11 17:53:05
summary: Documentation and PEP8 fixes for lwr client code.
affected #: 1 file
diff -r 6fa106f2678da0e0af8d8c25e90928817254c6f5 -r b109c7db9347271b35d426d3148120ce359977e6 lib/galaxy/jobs/runners/lwr_client/__init__.py
--- a/lib/galaxy/jobs/runners/lwr_client/__init__.py
+++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py
@@ -1,3 +1,10 @@
+"""
+lwr_client
+==========
+
+This module contains logic for interfacing with an external LWR server.
+
+"""
import mmap
import os
import re
@@ -9,8 +16,32 @@
class FileStager(object):
-
+ """
+ Objects of the FileStager class interact with an LWR client object to
+ stage the files required to run jobs on a remote LWR server.
+
+ **Parameters**
+
+ client : Client
+ LWR client object.
+ command_line : str
+ The local command line to execute, this will be rewritten for the remote server.
+ config_files : list
+ List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server.
+ input_files : list
+ List of input files used by job. These will be transferred and references rewritten.
+ output_files : list
+ List of output_files produced by job.
+ tool_dir : str
+ Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server).
+ working_directory : str
+ Local path created by Galaxy for running this job.
+
+ """
+
def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory):
+ """
+ """
self.client = client
self.command_line = command_line
self.config_files = config_files
@@ -67,7 +98,8 @@
self.file_renames[extra_file_path] = response['path']
def __upload_working_directory_files(self):
- # Task manager stages files into working directory, these need to be uploaded
+ # Task manager stages files into working directory, these need to be
+ # uploaded if present.
for working_directory_file in os.listdir(self.working_directory):
path = os.path.join(self.working_directory, working_directory_file)
working_file_response = self.client.upload_working_directory_file(path)
@@ -75,8 +107,8 @@
def __initialize_output_file_renames(self):
for output_file in self.output_files:
- self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory,
- self.remote_path_separator,
+ self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory,
+ self.remote_path_separator,
os.path.basename(output_file))
def __initialize_task_output_file_renames(self):
@@ -108,6 +140,10 @@
self.rewritten_command_line = self.__rewrite_paths(self.command_line)
def get_rewritten_command_line(self):
+ """
+ Returns the rewritten version of the command line to execute suitable
+ for remote host.
+ """
return self.rewritten_command_line
def __read(self, path):
@@ -117,13 +153,21 @@
finally:
input.close()
-
-
+
class Client(object):
- """
"""
- """
+ Objects of this client class perform low-level communication with a remote LWR server.
+
+ **Parameters**
+
+ remote_host : str
+ Remote URL of the LWR server.
+ job_id : str
+ Galaxy job/task id.
+ private_key : str (optional)
+ Secret key the remote LWR server is configured with.
"""
+
def __init__(self, remote_host, job_id, private_key=None):
if not remote_host.endswith("/"):
remote_host = remote_host + "/"
@@ -132,7 +176,7 @@
## https://moo@cow:8913 will try to contact https://cow:8913
## with a private key of moo
private_key_format = "https?://(.*)@.*/?"
- private_key_match= re.match(private_key_format, remote_host)
+ private_key_match = re.match(private_key_format, remote_host)
if not private_key and private_key_match:
private_key = private_key_match.group(1)
remote_host = remote_host.replace("%s@" % private_key, '', 1)
@@ -140,9 +184,9 @@
self.job_id = job_id
self.private_key = private_key
- def url_open(self, request, data):
+ def _url_open(self, request, data):
return urllib2.urlopen(request, data)
-
+
def __build_url(self, command, args):
if self.private_key:
args["private_key"] = self.private_key
@@ -150,21 +194,20 @@
url = self.remote_host + command + "?" + data
return url
- def __raw_execute(self, command, args = {}, data = None):
+ def __raw_execute(self, command, args={}, data=None):
url = self.__build_url(command, args)
request = urllib2.Request(url=url, data=data)
- response = self.url_open(request, data)
+ response = self._url_open(request, data)
return response
- def __raw_execute_and_parse(self, command, args = {}, data = None):
+ def __raw_execute_and_parse(self, command, args={}, data=None):
response = self.__raw_execute(command, args, data)
return simplejson.loads(response.read())
- def __upload_file(self, action, path, name=None, contents = None):
- """ """
+ def __upload_file(self, action, path, name=None, contents=None):
input = open(path, 'rb')
try:
- mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ)
+ mmapped_input = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ)
return self.__upload_contents(action, path, mmapped_input, name)
finally:
input.close()
@@ -172,39 +215,93 @@
def __upload_contents(self, action, path, contents, name=None):
if not name:
name = os.path.basename(path)
- args = {"job_id" : self.job_id, "name" : name}
+ args = {"job_id": self.job_id, "name": name}
return self.__raw_execute_and_parse(action, args, contents)
-
+
def upload_tool_file(self, path):
+ """
+ Upload a tool related file (e.g. wrapper) required to run job.
+
+ **Parameters**
+
+ path : str
+ Local path tool.
+ """
return self.__upload_file("upload_tool_file", path)
def upload_input(self, path):
+ """
+ Upload input dataset to remote server.
+
+ **Parameters**
+
+ path : str
+ Local path of input dataset.
+ """
return self.__upload_file("upload_input", path)
def upload_extra_input(self, path, relative_name):
+ """
+ Upload extra input file to remote server.
+
+ **Parameters**
+
+ path : str
+ Extra files path of input dataset corresponding to this input.
+ relative_name : str
+ Relative path of extra file to upload relative to inputs extra files path.
+ """
return self.__upload_file("upload_extra_input", path, name=relative_name)
def upload_config_file(self, path, contents):
+ """
+ Upload a job's config file to the remote server.
+
+ **Parameters**
+
+ path : str
+ Local path to the original config file.
+ contents : str
+ Rewritten contents of the config file to upload.
+ """
return self.__upload_contents("upload_config_file", path, contents)
def upload_working_directory_file(self, path):
+ """
+ Upload the supplied file (path) from a job's working directory
+ to remote server.
+
+ **Parameters**
+
+ path : str
+ Path to file to upload.
+ """
return self.__upload_file("upload_working_directory_file", path)
def _get_output_type(self, name):
- return self.__raw_execute_and_parse('get_output_type', {'name': name,
- 'job_id': self.job_id})
+ return self.__raw_execute_and_parse("get_output_type", {"name": name,
+ "job_id": self.job_id})
def download_output(self, path, working_directory):
- """ """
+ """
+ Download an output dataset from the remote server.
+
+ **Parameters**
+
+ path : str
+ Local path of the dataset.
+ working_directory : str
+ Local working_directory for the job.
+ """
name = os.path.basename(path)
output_type = self._get_output_type(name)
- response = self.__raw_execute('download_output', {'name' : name,
- "job_id" : self.job_id,
- 'output_type': output_type})
- if output_type == 'direct':
- output = open(path, 'wb')
- elif output_type == 'task':
- output = open(os.path.join(working_directory, name), 'wb')
+ response = self.__raw_execute("download_output", {"name": name,
+ "job_id": self.job_id,
+ "output_type": output_type})
+ if output_type == "direct":
+ output = open(path, "wb")
+ elif output_type == "task":
+ output = open(os.path.join(working_directory, name), "wb")
else:
raise Exception("No remote output found for dataset with path %s" % path)
try:
@@ -215,32 +312,57 @@
output.write(buffer)
finally:
output.close()
-
+
def launch(self, command_line):
- """ """
- return self.__raw_execute("launch", {"command_line" : command_line,
- "job_id" : self.job_id})
+ """
+ Run or queue up the execution of the supplied
+ `command_line` on the remote server.
+
+ **Parameters**
+
+ command_line : str
+ Command to execute.
+ """
+ return self.__raw_execute("launch", {"command_line": command_line,
+ "job_id": self.job_id})
def kill(self):
- return self.__raw_execute("kill", {"job_id" : self.job_id})
-
+ """
+ Cancel remote job, either removing from the queue or killing it.
+ """
+ return self.__raw_execute("kill", {"job_id": self.job_id})
+
def wait(self):
- """ """
+ """
+ Wait for job to finish.
+ """
while True:
- complete = self.check_complete()
- if complete:
- return check_complete_response
+ complete_response = self.raw_check_complete()
+ if complete_response["complete"] == "true":
+ return complete_response
time.sleep(1)
def raw_check_complete(self):
- check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id })
+ """
+ Get check_complete response from the remote server.
+ """
+ check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id": self.job_id})
return check_complete_response
def check_complete(self):
+ """
+ Return boolean indicating whether the job is complete.
+ """
return self.raw_check_complete()["complete"] == "true"
def clean(self):
- self.__raw_execute("clean", { "job_id" : self.job_id })
+ """
+ Cleanup the remote job.
+ """
+ self.__raw_execute("clean", {"job_id": self.job_id})
def setup(self):
- return self.__raw_execute_and_parse("setup", { "job_id" : self.job_id })
+ """
+ Setup remote LWR server to run this job.
+ """
+ return self.__raw_execute_and_parse("setup", {"job_id": self.job_id})
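With the docstrings above in place, the intended lifecycle of a Client is easier to see: setup, stage files, launch, wait, download outputs, clean. The following is an illustrative sketch of that sequence only; the URL, job id and file paths are invented, in the real runner FileStager performs the staging and path rewriting, and the import path assumes Galaxy's lib directory is on sys.path.

from galaxy.jobs.runners.lwr_client import Client

client = Client("http://lwr.example.org:8913/", "42")
job_config = client.setup()                  # remote working/outputs directories
client.upload_tool_file("/galaxy/tools/my_tool/wrapper.py")
client.upload_input("/galaxy/files/000/dataset_1.dat")
client.launch("python wrapper.py dataset_1.dat output.dat")
results = client.wait()                      # blocks until check_complete reports done
stdout, stderr = results['stdout'], results['stderr']
client.download_output("/galaxy/files/000/dataset_2.dat",
                       working_directory="/galaxy/job_working_dir/000/42")
client.clean()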
https://bitbucket.org/galaxy/galaxy-central/commits/856d9508b97f/
changeset: 856d9508b97f
user: jmchilton
date: 2013-01-11 17:53:05
summary: Extend lwr to allow execution of jobs with outputs specified using 'from_work_dir'.
affected #: 3 files
diff -r b109c7db9347271b35d426d3148120ce359977e6 -r 856d9508b97fe655e9652750b59fbb787c8f632a lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -8,7 +8,7 @@
log = logging.getLogger( __name__ )
class BaseJobRunner( object ):
- def build_command_line( self, job_wrapper, include_metadata=False ):
+ def build_command_line( self, job_wrapper, include_metadata=False, include_work_dir_outputs=True ):
"""
Compose the sequence of commands necessary to execute a job. This will
currently include:
@@ -19,18 +19,6 @@
- commands to set metadata (if include_metadata is True)
"""
- def in_directory( file, directory ):
- """
- Return true, if the common prefix of both is equal to directory
- e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
- """
-
- # Make both absolute.
- directory = os.path.abspath( directory )
- file = os.path.abspath( file )
-
- return os.path.commonprefix( [ file, directory ] ) == directory
-
commands = job_wrapper.get_command_line()
# All job runners currently handle this case which should never
# occur
@@ -47,6 +35,41 @@
commands = "; ".join( job_wrapper.dependency_shell_commands + [ commands ] )
# -- Append commands to copy job outputs based on from_work_dir attribute. --
+ if include_work_dir_outputs:
+ work_dir_outputs = self.get_work_dir_outputs( job_wrapper )
+ if work_dir_outputs:
+ commands += "; " + "; ".join( [ "cp %s %s" % ( source_file, destination ) for ( source_file, destination ) in work_dir_outputs ] )
+
+ # Append metadata setting commands, we don't want to overwrite metadata
+ # that was copied over in init_meta(), as per established behavior
+ if include_metadata and self.app.config.set_metadata_externally:
+ commands += "; cd %s; " % os.path.abspath( os.getcwd() )
+ commands += job_wrapper.setup_external_metadata(
+ exec_dir = os.path.abspath( os.getcwd() ),
+ tmp_dir = job_wrapper.working_directory,
+ dataset_files_path = self.app.model.Dataset.file_path,
+ output_fnames = job_wrapper.get_output_fnames(),
+ set_extension = False,
+ kwds = { 'overwrite' : False } )
+ return commands
+
+ def get_work_dir_outputs( self, job_wrapper ):
+ """
+ Returns list of pairs (source_file, destination) describing path
+ to work_dir output file and ultimate destination.
+ """
+
+ def in_directory( file, directory ):
+ """
+ Return true, if the common prefix of both is equal to directory
+ e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
+ """
+
+ # Make both absolute.
+ directory = os.path.abspath( directory )
+ file = os.path.abspath( file )
+
+ return os.path.commonprefix( [ file, directory ] ) == directory
# Set up dict of dataset id --> output path; output path can be real or
# false depending on outputs_to_working_directory
@@ -57,6 +80,7 @@
path = dataset_path.false_path
output_paths[ dataset_path.dataset_id ] = path
+ output_pairs = []
# Walk job's output associations to find and use from_work_dir attributes.
job = job_wrapper.get_job()
job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None )
@@ -72,30 +96,14 @@
source_file = os.path.join( os.path.abspath( job_wrapper.working_directory ), hda_tool_output.from_work_dir )
destination = output_paths[ dataset.dataset_id ]
if in_directory( source_file, job_wrapper.working_directory ):
- try:
- commands += "; cp %s %s" % ( source_file, destination )
- log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) )
- except ( IOError, OSError ):
- log.debug( "Could not copy %s to %s as directed by from_work_dir" % ( source_file, destination ) )
+ output_pairs.append( ( source_file, destination ) )
+ log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) )
else:
# Security violation.
log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) )
+ return output_pairs
-
- # Append metadata setting commands, we don't want to overwrite metadata
- # that was copied over in init_meta(), as per established behavior
- if include_metadata and self.app.config.set_metadata_externally:
- commands += "; cd %s; " % os.path.abspath( os.getcwd() )
- commands += job_wrapper.setup_external_metadata(
- exec_dir = os.path.abspath( os.getcwd() ),
- tmp_dir = job_wrapper.working_directory,
- dataset_files_path = self.app.model.Dataset.file_path,
- output_fnames = job_wrapper.get_output_fnames(),
- set_extension = False,
- kwds = { 'overwrite' : False } )
- return commands
-
class ClusterJobState( object ):
"""
Encapsulate the state of a cluster job, this should be subclassed as
diff -r b109c7db9347271b35d426d3148120ce359977e6 -r 856d9508b97fe655e9652750b59fbb787c8f632a lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -54,7 +54,7 @@
if 0 != os.system(cmd):
raise Exception('Error running file staging command: %s' % cmd)
job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line
- command_line = self.build_command_line( job_wrapper, include_metadata=False )
+ command_line = self.build_command_line( job_wrapper, include_metadata=False, include_work_dir_outputs=False )
except:
job_wrapper.fail( "failure preparing job", exception=True )
log.exception("failure running job %d" % job_wrapper.job_id)
@@ -126,7 +126,12 @@
stderr = run_results['stderr']
if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]:
+ work_dir_outputs = self.get_work_dir_outputs(job_wrapper)
output_files = self.get_output_files(job_wrapper)
+ for source_file, output_file in work_dir_outputs:
+ client.download_work_dir_output(source_file, job_wrapper.working_directory, output_file)
+ # Remove from full output_files list so don't try to download directly.
+ output_files.remove(output_file)
for output_file in output_files:
client.download_output(output_file, working_directory=job_wrapper.working_directory)
client.clean()
diff -r b109c7db9347271b35d426d3148120ce359977e6 -r 856d9508b97fe655e9652750b59fbb787c8f632a lib/galaxy/jobs/runners/lwr_client/__init__.py
--- a/lib/galaxy/jobs/runners/lwr_client/__init__.py
+++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py
@@ -282,6 +282,24 @@
return self.__raw_execute_and_parse("get_output_type", {"name": name,
"job_id": self.job_id})
+ def download_work_dir_output(self, source, working_directory, output_path):
+ """
+ Download an output dataset specified with from_work_dir from the
+ remote server.
+
+ **Parameters**
+
+ source : str
+ Path in job's working_directory to find output in.
+ working_directory : str
+ Local working_directory for the job.
+ output_path : str
+ Full path to output dataset.
+ """
+ output = open(output_path, "wb")
+ name = os.path.basename(source)
+ self.__raw_download_output(name, self.job_id, "work_dir", output)
+
def download_output(self, path, working_directory):
"""
Download an output dataset from the remote server.
@@ -295,23 +313,26 @@
"""
name = os.path.basename(path)
output_type = self._get_output_type(name)
- response = self.__raw_execute("download_output", {"name": name,
- "job_id": self.job_id,
- "output_type": output_type})
if output_type == "direct":
output = open(path, "wb")
elif output_type == "task":
output = open(os.path.join(working_directory, name), "wb")
else:
raise Exception("No remote output found for dataset with path %s" % path)
+ self.__raw_download_output(name, self.job_id, output_type, output)
+
+ def __raw_download_output(self, name, job_id, output_type, output_file):
+ response = self.__raw_execute("download_output", {"name": name,
+ "job_id": self.job_id,
+ "output_type": output_type})
try:
while True:
buffer = response.read(1024)
if buffer == "":
break
- output.write(buffer)
+ output_file.write(buffer)
finally:
- output.close()
+ output_file.close()
def launch(self, command_line):
"""
https://bitbucket.org/galaxy/galaxy-central/commits/b749bb68c1f7/
changeset: b749bb68c1f7
user: jmchilton
date: 2013-01-11 17:53:05
summary: Implement an optimization that avoids transferring unneeded inputs to the remote LWR server. More general refactoring and testing of the lwr client code.
affected #: 1 file
diff -r 856d9508b97fe655e9652750b59fbb787c8f632a -r b749bb68c1f7a71c775e7486152dcfe93813c668 lib/galaxy/jobs/runners/lwr_client/__init__.py
--- a/lib/galaxy/jobs/runners/lwr_client/__init__.py
+++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py
@@ -15,6 +15,99 @@
import simplejson
+class JobInputs(object):
+ """
+ Abstractions over dynamic inputs created for a given job (namely the command to
+ execute and created configfiles).
+
+ **Parameters**
+
+ command_line : str
+ Local command to execute for this job. (To be rewritten.)
+ config_files : str
+ Config files created for this job. (To be rewritten.)
+
+
+ >>> import tempfile
+ >>> tf = tempfile.NamedTemporaryFile()
+ >>> def setup_inputs(tf):
+ ... open(tf.name, "w").write("world /path/to/input the rest")
+ ... inputs = JobInputs("hello /path/to/input", [tf.name])
+ ... return inputs
+ >>> inputs = setup_inputs(tf)
+ >>> inputs.rewrite_paths("/path/to/input", 'C:\\input')
+ >>> inputs.rewritten_command_line
+ 'hello C:\\\\input'
+ >>> inputs.rewritten_config_files[tf.name]
+ 'world C:\\\\input the rest'
+ >>> tf.close()
+ >>> tf = tempfile.NamedTemporaryFile()
+ >>> inputs = setup_inputs(tf)
+ >>> inputs.find_referenced_subfiles('/path/to')
+ ['/path/to/input']
+ >>> inputs.path_referenced('/path/to')
+ True
+ >>> inputs.path_referenced('/path/to/input')
+ True
+ >>> inputs.path_referenced('/path/to/notinput')
+ False
+ >>> tf.close()
+ """
+
+ def __init__(self, command_line, config_files):
+ self.rewritten_command_line = command_line
+ self.rewritten_config_files = {}
+ for config_file in config_files or []:
+ config_contents = _read(config_file)
+ self.rewritten_config_files[config_file] = config_contents
+
+ def find_referenced_subfiles(self, directory):
+ """
+ Return list of files below specified `directory` in job inputs. Could
+ use more sophisticated logic (match quotes to handle spaces, handle
+ subdirectories, etc...).
+
+ **Parameters**
+
+ directory : str
+ Full path to directory to search.
+
+ """
+ pattern = r"(%s%s\S+)" % (directory, os.sep)
+ referenced_files = set()
+ for input_contents in self.__items():
+ referenced_files.update(re.findall(pattern, input_contents))
+ return list(referenced_files)
+
+ def path_referenced(self, path):
+ pattern = r"%s" % path
+ found = False
+ for input_contents in self.__items():
+ if re.findall(pattern, input_contents):
+ found = True
+ break
+ return found
+
+ def rewrite_paths(self, local_path, remote_path):
+ """
+ Rewrite references to `local_path` with `remote_path` in job inputs.
+ """
+ self.__rewrite_command_line(local_path, remote_path)
+ self.__rewrite_config_files(local_path, remote_path)
+
+ def __rewrite_command_line(self, local_path, remote_path):
+ self.rewritten_command_line = self.rewritten_command_line.replace(local_path, remote_path)
+
+ def __rewrite_config_files(self, local_path, remote_path):
+ for config_file, rewritten_contents in self.rewritten_config_files.iteritems():
+ self.rewritten_config_files[config_file] = rewritten_contents.replace(local_path, remote_path)
+
+ def __items(self):
+ items = [self.rewritten_command_line]
+ items.extend(self.rewritten_config_files.values())
+ return items
+
+
class FileStager(object):
"""
Objects of the FileStager class interact with an LWR client object to
@@ -50,6 +143,10 @@
self.tool_dir = os.path.abspath(tool_dir)
self.working_directory = working_directory
+ # Setup job inputs, these will need to be rewritten before
+ # shipping off to remote LWR server.
+ self.job_inputs = JobInputs(self.command_line, self.config_files)
+
self.file_renames = {}
job_config = client.setup()
@@ -65,17 +162,11 @@
self.__initialize_output_file_renames()
self.__initialize_task_output_file_renames()
self.__initialize_config_file_renames()
- self.__rewrite_and_upload_config_files()
- self.__rewrite_command_line()
+ self.__handle_rewrites()
+ self.__upload_rewritten_config_files()
def __initialize_referenced_tool_files(self):
- pattern = r"(%s%s\S+)" % (self.tool_dir, os.sep)
- referenced_tool_files = []
- referenced_tool_files += re.findall(pattern, self.command_line)
- if self.config_files != None:
- for config_file in self.config_files:
- referenced_tool_files += re.findall(pattern, self.__read(config_file))
- self.referenced_tool_files = referenced_tool_files
+ self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir)
def __upload_tool_files(self):
for referenced_tool_file in self.referenced_tool_files:
@@ -84,18 +175,25 @@
def __upload_input_files(self):
for input_file in self.input_files:
+ self.__upload_input_file(input_file)
+ self.__upload_input_extra_files(input_file)
+
+ def __upload_input_file(self, input_file):
+ if self.job_inputs.path_referenced(input_file):
input_upload_response = self.client.upload_input(input_file)
self.file_renames[input_file] = input_upload_response['path']
- # TODO: Determine if this is object store safe and what needs to be
- # done if it is not.
- files_path = "%s_files" % input_file[0:-len(".dat")]
- if os.path.exists(files_path):
- for extra_file in os.listdir(files_path):
- extra_file_path = os.path.join(files_path, extra_file)
- relative_path = os.path.basename(files_path)
- extra_file_relative_path = os.path.join(relative_path, extra_file)
- response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path)
- self.file_renames[extra_file_path] = response['path']
+
+ def __upload_input_extra_files(self, input_file):
+ # TODO: Determine if this is object store safe and what needs to be
+ # done if it is not.
+ files_path = "%s_files" % input_file[0:-len(".dat")]
+ if os.path.exists(files_path) and self.job_inputs.path_referenced(files_path):
+ for extra_file in os.listdir(files_path):
+ extra_file_path = os.path.join(files_path, extra_file)
+ relative_path = os.path.basename(files_path)
+ extra_file_relative_path = os.path.join(relative_path, extra_file)
+ response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path)
+ self.file_renames[extra_file_path] = response['path']
def __upload_working_directory_files(self):
# Task manager stages files into working directory, these need to be
@@ -130,28 +228,20 @@
new_contents = new_contents.replace(local_path, remote_path)
return new_contents
- def __rewrite_and_upload_config_files(self):
- for config_file in self.config_files:
- config_contents = self.__read(config_file)
- new_config_contents = self.__rewrite_paths(config_contents)
+ def __handle_rewrites(self):
+ for local_path, remote_path in self.file_renames.iteritems():
+ self.job_inputs.rewrite_paths(local_path, remote_path)
+
+ def __upload_rewritten_config_files(self):
+ for config_file, new_config_contents in self.job_inputs.rewritten_config_files.iteritems():
self.client.upload_config_file(config_file, new_config_contents)
- def __rewrite_command_line(self):
- self.rewritten_command_line = self.__rewrite_paths(self.command_line)
-
def get_rewritten_command_line(self):
"""
Returns the rewritten version of the command line to execute suitable
for remote host.
"""
- return self.rewritten_command_line
-
- def __read(self, path):
- input = open(path, "r")
- try:
- return input.read()
- finally:
- input.close()
+ return self.job_inputs.rewritten_command_line
class Client(object):
@@ -387,3 +477,15 @@
Setup remote LWR server to run this job.
"""
return self.__raw_execute_and_parse("setup", {"job_id": self.job_id})
+
+
+def _read(path):
+ """
+ Utility method to quickly read small files (config files and tool
+ wrappers) into memory as strings.
+ """
+ input = open(path, "r")
+ try:
+ return input.read()
+ finally:
+ input.close()
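The upload optimization hinges on the new JobInputs abstraction: an input (or its extra-files directory) is only uploaded if path_referenced finds it in the command line or a configfile. A short usage sketch mirroring the doctest above; the paths are invented and the import path assumes Galaxy's lib directory is on sys.path.

from galaxy.jobs.runners.lwr_client import JobInputs

inputs = JobInputs("tool.py /data/dataset_1.dat > out", [])
inputs.path_referenced("/data/dataset_1.dat")    # True  -> will be staged
inputs.path_referenced("/data/dataset_2.dat")    # False -> upload skipped
inputs.rewrite_paths("/data/dataset_1.dat", "/lwr/staging/42/inputs/dataset_1.dat")
print(inputs.rewritten_command_line)             # tool.py /lwr/staging/42/inputs/dataset_1.dat > out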
https://bitbucket.org/galaxy/galaxy-central/commits/2631faf42a8f/
changeset: 2631faf42a8f
user: jmchilton
date: 2013-01-11 17:53:05
summary: Rework job_id handling in LWR runner, allowing the remote LWR server to assign a job_id during setup; when assigned, that id serves as the job's external id. This change allows multiple Galaxy instances to submit jobs to the same LWR backend server and will prove useful when implementing additional backends (pbs/drmaa/etc...) for the LWR server.
affected #: 2 files
diff -r b749bb68c1f7a71c775e7486152dcfe93813c668 -r 2631faf42a8fe4f3b3f9b556809da07b69ee845a lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -73,8 +73,9 @@
working_directory = job_wrapper.working_directory
file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir, working_directory)
rebuilt_command_line = file_stager.get_rewritten_command_line()
+ job_id = file_stager.job_id
client.launch( rebuilt_command_line )
- job_wrapper.set_runner( runner_url, job_wrapper.job_id )
+ job_wrapper.set_runner( runner_url, job_id )
job_wrapper.change_state( model.Job.states.RUNNING )
except Exception, exc:
@@ -84,7 +85,7 @@
lwr_job_state = ClusterJobState()
lwr_job_state.job_wrapper = job_wrapper
- lwr_job_state.job_id = job_wrapper.job_id
+ lwr_job_state.job_id = job_id
lwr_job_state.old_state = True
lwr_job_state.running = True
lwr_job_state.runner_url = runner_url
diff -r b749bb68c1f7a71c775e7486152dcfe93813c668 -r 2631faf42a8fe4f3b3f9b556809da07b69ee845a lib/galaxy/jobs/runners/lwr_client/__init__.py
--- a/lib/galaxy/jobs/runners/lwr_client/__init__.py
+++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py
@@ -149,12 +149,7 @@
self.file_renames = {}
- job_config = client.setup()
-
- self.new_working_directory = job_config['working_directory']
- self.new_outputs_directory = job_config['outputs_directory']
- self.remote_path_separator = job_config['path_separator']
-
+ self.__handle_setup()
self.__initialize_referenced_tool_files()
self.__upload_tool_files()
self.__upload_input_files()
@@ -165,6 +160,21 @@
self.__handle_rewrites()
self.__upload_rewritten_config_files()
+ def __handle_setup(self):
+ job_config = self.client.setup()
+
+ self.new_working_directory = job_config['working_directory']
+ self.new_outputs_directory = job_config['outputs_directory']
+ self.remote_path_separator = job_config['path_separator']
+ # If remote LWR server assigned job id, use that otherwise
+ # just use local job_id assigned.
+ galaxy_job_id = self.client.job_id
+ self.job_id = job_config.get('job_id', galaxy_job_id)
+ if self.job_id != galaxy_job_id:
+ # Remote LWR server assigned an id different than the
+ # Galaxy job id, update client to reflect this.
+ self.client.job_id = self.job_id
+
def __initialize_referenced_tool_files(self):
self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir)
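The new setup handshake reduces to a simple fallback: prefer an id assigned by the remote LWR server, otherwise keep the id Galaxy assigned locally. A sketch of that rule, assuming job_config is the parsed JSON returned by setup(); the helper name is illustrative.

def resolve_job_id(job_config, galaxy_job_id):
    # If the remote LWR server assigned a job_id during setup, use it as the
    # external id; otherwise fall back to the local Galaxy job id.
    return job_config.get('job_id', galaxy_job_id)

print(resolve_job_id({'path_separator': '/', 'job_id': '7'}, '12345'))   # 7
print(resolve_job_id({'path_separator': '/'}, '12345'))                  # 12345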
https://bitbucket.org/galaxy/galaxy-central/commits/21a91117bcc8/
changeset: 21a91117bcc8
user: jmchilton
date: 2013-01-11 18:02:43
summary: Re-introduce LWR shutdown fixes from Dave B.
affected #: 1 file
diff -r 2631faf42a8fe4f3b3f9b556809da07b69ee845a -r 21a91117bcc80e55aa5a196caa8b801180ed9480 lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -2,7 +2,7 @@
import subprocess
from galaxy import model
-from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner
+from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner, STOP_SIGNAL
import errno
from time import sleep
@@ -174,8 +174,9 @@
def shutdown( self ):
"""Attempts to gracefully shut down the worker threads"""
log.info( "sending stop signal to worker threads" )
- for i in range( len( self.threads ) ):
- self.queue.put( self.STOP_SIGNAL )
+ self.monitor_queue.put( STOP_SIGNAL )
+ for i in range( len( self.work_threads ) ):
+ self.work_queue.put( ( STOP_SIGNAL, None ) )
log.info( "local job runner stopped" )
def check_pid( self, pid ):
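The shutdown fix above follows the sentinel pattern used throughout ClusterJobRunner: a bare STOP_SIGNAL on the monitor queue and one (STOP_SIGNAL, None) tuple per worker on the work queue. A generic, self-contained sketch of the worker half of that pattern; the thread count and handler body are illustrative.

import threading
from Queue import Queue   # Python 2, as in the runner code

STOP_SIGNAL = object()
work_queue = Queue()

def run_next():
    while 1:
        op, obj = work_queue.get()
        if op is STOP_SIGNAL:
            return                    # worker exits cleanly
        # ... dispatch on op (queue/finish/fail) and handle obj ...

work_threads = [threading.Thread(target=run_next) for i in range(4)]
for worker in work_threads:
    worker.start()

# Shutdown: one sentinel per worker thread, then join.
for i in range(len(work_threads)):
    work_queue.put((STOP_SIGNAL, None))
for worker in work_threads:
    worker.join()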
https://bitbucket.org/galaxy/galaxy-central/commits/e5ec6ed8f033/
changeset: e5ec6ed8f033
user: natefoo
date: 2013-01-11 18:25:58
summary: Merged in jmchilton/galaxy-central-lwr (pull request #106: Ongoing LWR Enhancements)
affected #: 3 files
diff -r c2199c181f09fd5b94a9f5ea13fc39b28e803f2c -r e5ec6ed8f0337893cf4bc64e61089471b00b28b0 lib/galaxy/jobs/runners/__init__.py
--- a/lib/galaxy/jobs/runners/__init__.py
+++ b/lib/galaxy/jobs/runners/__init__.py
@@ -1,9 +1,14 @@
import os, logging, os.path
+from galaxy import model
+from Queue import Queue, Empty
+import time
+import threading
+
log = logging.getLogger( __name__ )
class BaseJobRunner( object ):
- def build_command_line( self, job_wrapper, include_metadata=False ):
+ def build_command_line( self, job_wrapper, include_metadata=False, include_work_dir_outputs=True ):
"""
Compose the sequence of commands necessary to execute a job. This will
currently include:
@@ -14,18 +19,6 @@
- commands to set metadata (if include_metadata is True)
"""
- def in_directory( file, directory ):
- """
- Return true, if the common prefix of both is equal to directory
- e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
- """
-
- # Make both absolute.
- directory = os.path.abspath( directory )
- file = os.path.abspath( file )
-
- return os.path.commonprefix( [ file, directory ] ) == directory
-
commands = job_wrapper.get_command_line()
# All job runners currently handle this case which should never
# occur
@@ -42,6 +35,41 @@
commands = "; ".join( job_wrapper.dependency_shell_commands + [ commands ] )
# -- Append commands to copy job outputs based on from_work_dir attribute. --
+ if include_work_dir_outputs:
+ work_dir_outputs = self.get_work_dir_outputs( job_wrapper )
+ if work_dir_outputs:
+ commands += "; " + "; ".join( [ "cp %s %s" % ( source_file, destination ) for ( source_file, destination ) in work_dir_outputs ] )
+
+ # Append metadata setting commands, we don't want to overwrite metadata
+ # that was copied over in init_meta(), as per established behavior
+ if include_metadata and self.app.config.set_metadata_externally:
+ commands += "; cd %s; " % os.path.abspath( os.getcwd() )
+ commands += job_wrapper.setup_external_metadata(
+ exec_dir = os.path.abspath( os.getcwd() ),
+ tmp_dir = job_wrapper.working_directory,
+ dataset_files_path = self.app.model.Dataset.file_path,
+ output_fnames = job_wrapper.get_output_fnames(),
+ set_extension = False,
+ kwds = { 'overwrite' : False } )
+ return commands
+
+ def get_work_dir_outputs( self, job_wrapper ):
+ """
+ Returns list of pairs (source_file, destination) describing path
+ to work_dir output file and ultimate destination.
+ """
+
+ def in_directory( file, directory ):
+ """
+ Return true, if the common prefix of both is equal to directory
+ e.g. /a/b/c/d.rst and directory is /a/b, the common prefix is /a/b
+ """
+
+ # Make both absolute.
+ directory = os.path.abspath( directory )
+ file = os.path.abspath( file )
+
+ return os.path.commonprefix( [ file, directory ] ) == directory
# Set up dict of dataset id --> output path; output path can be real or
# false depending on outputs_to_working_directory
@@ -52,6 +80,7 @@
path = dataset_path.false_path
output_paths[ dataset_path.dataset_id ] = path
+ output_pairs = []
# Walk job's output associations to find and use from_work_dir attributes.
job = job_wrapper.get_job()
job_tool = self.app.toolbox.tools_by_id.get( job.tool_id, None )
@@ -67,26 +96,165 @@
source_file = os.path.join( os.path.abspath( job_wrapper.working_directory ), hda_tool_output.from_work_dir )
destination = output_paths[ dataset.dataset_id ]
if in_directory( source_file, job_wrapper.working_directory ):
- try:
- commands += "; cp %s %s" % ( source_file, destination )
- log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) )
- except ( IOError, OSError ):
- log.debug( "Could not copy %s to %s as directed by from_work_dir" % ( source_file, destination ) )
+ output_pairs.append( ( source_file, destination ) )
+ log.debug( "Copying %s to %s as directed by from_work_dir" % ( source_file, destination ) )
else:
# Security violation.
log.exception( "from_work_dir specified a location not in the working directory: %s, %s" % ( source_file, job_wrapper.working_directory ) )
+ return output_pairs
+class ClusterJobState( object ):
+ """
+ Encapsulate the state of a cluster job, this should be subclassed as
+ needed for various job runners to capture additional information needed
+ to communicate with cluster job manager.
+ """
- # Append metadata setting commands, we don't want to overwrite metadata
- # that was copied over in init_meta(), as per established behavior
- if include_metadata and self.app.config.set_metadata_externally:
- commands += "; cd %s; " % os.path.abspath( os.getcwd() )
- commands += job_wrapper.setup_external_metadata(
- exec_dir = os.path.abspath( os.getcwd() ),
- tmp_dir = job_wrapper.working_directory,
- dataset_files_path = self.app.model.Dataset.file_path,
- output_fnames = job_wrapper.get_output_fnames(),
- set_extension = False,
- kwds = { 'overwrite' : False } )
- return commands
+ def __init__( self ):
+ self.job_wrapper = None
+ self.job_id = None
+ self.old_state = None
+ self.running = False
+ self.runner_url = None
+
+STOP_SIGNAL = object()
+
+JOB_STATUS_QUEUED = 'queue'
+JOB_STATUS_FAILED = 'fail'
+JOB_STATUS_FINISHED = 'finish'
+
+class ClusterJobRunner( BaseJobRunner ):
+ """
+ Not sure this is the best name for this class, but there is common code
+ shared between sge, pbs, drmaa, etc...
+ """
+
+ def __init__( self, app ):
+ self.app = app
+ self.sa_session = app.model.context
+ # 'watched' and 'queue' are both used to keep track of jobs to watch.
+ # 'queue' is used to add new watched jobs, and can be called from
+ # any thread (usually by the 'queue_job' method). 'watched' must only
+ # be modified by the monitor thread, which will move items from 'queue'
+ # to 'watched' and then manage the watched jobs.
+ self.watched = []
+ self.monitor_queue = Queue()
+
+ def _init_monitor_thread(self):
+ self.monitor_thread = threading.Thread( name="%s.monitor_thread" % self.runner_name, target=self.monitor )
+ self.monitor_thread.setDaemon( True )
+ self.monitor_thread.start()
+
+ def _init_worker_threads(self):
+ self.work_queue = Queue()
+ self.work_threads = []
+ nworkers = self.app.config.cluster_job_queue_workers
+ for i in range( nworkers ):
+ worker = threading.Thread( name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next )
+ worker.start()
+ self.work_threads.append( worker )
+
+ def handle_stop(self):
+ # DRMAA and SGE runners should override this and disconnect.
+ pass
+
+ def monitor( self ):
+ """
+ Watches jobs currently in the cluster queue and deals with state changes
+ (queued to running) and job completion
+ """
+ while 1:
+ # Take any new watched jobs and put them on the monitor list
+ try:
+ while 1:
+ cluster_job_state = self.monitor_queue.get_nowait()
+ if cluster_job_state is STOP_SIGNAL:
+ # TODO: This is where any cleanup would occur
+ self.handle_stop()
+ return
+ self.watched.append( cluster_job_state )
+ except Empty:
+ pass
+ # Iterate over the list of watched jobs and check state
+ self.check_watched_items()
+ # Sleep a bit before the next state check
+ time.sleep( 1 )
+
+ def run_next( self ):
+ """
+ Run the next item in the queue (a job waiting to run or finish )
+ """
+ while 1:
+ ( op, obj ) = self.work_queue.get()
+ if op is STOP_SIGNAL:
+ return
+ try:
+ if op == JOB_STATUS_QUEUED:
+ # If the next item is to be run, then only run it if the
+ # job state is "queued". Otherwise the next item was either
+ # cancelled or one of its siblings encountered an error.
+ job_state = obj.get_state()
+ if model.Job.states.QUEUED == job_state:
+ self.queue_job( obj )
+ else:
+ log.debug( "Not executing job %d in state %s" % ( obj.get_id_tag(), job_state ) )
+ elif op == JOB_STATUS_FINISHED:
+ self.finish_job( obj )
+ elif op == JOB_STATUS_FAILED:
+ self.fail_job( obj )
+ except:
+ log.exception( "Uncaught exception %sing job" % op )
+
+ def monitor_job(self, job_state):
+ self.monitor_queue.put( job_state )
+
+ def put( self, job_wrapper ):
+ """Add a job to the queue (by job identifier)"""
+ # Change to queued state before handing to worker thread so the runner won't pick it up again
+ job_wrapper.change_state( model.Job.states.QUEUED )
+ self.mark_as_queued(job_wrapper)
+
+ def shutdown( self ):
+ """Attempts to gracefully shut down the monitor thread"""
+ log.info( "sending stop signal to worker threads" )
+ self.monitor_queue.put( STOP_SIGNAL )
+ for i in range( len( self.work_threads ) ):
+ self.work_queue.put( ( STOP_SIGNAL, None ) )
+
+ def check_watched_items(self):
+ """
+ This method is responsible for iterating over self.watched and handling
+ state changes and updating self.watched with a new list of watched job
+ states. Subclasses can opt to override this directly (as older job runners will
+ initially) or just override check_watched_item and allow the list processing to
+ reuse the logic here.
+ """
+ new_watched = []
+ for cluster_job_state in self.watched:
+ new_cluster_job_state = self.check_watched_item(cluster_job_state)
+ if new_cluster_job_state:
+ new_watched.append(new_cluster_job_state)
+ self.watched = new_watched
+
+ # Subclasses should implement this unless they override check_watched_items all together.
+ def check_watched_item(self):
+ raise NotImplementedError()
+
+ def queue_job(self, job_wrapper):
+ raise NotImplementedError()
+
+ def finish_job(self, job_state):
+ raise NotImplementedError()
+
+ def fail_job(self, job_state):
+ raise NotImplementedError()
+
+ def mark_as_finished(self, job_state):
+ self.work_queue.put( ( JOB_STATUS_FINISHED, job_state ) )
+
+ def mark_as_failed(self, job_state):
+ self.work_queue.put( ( JOB_STATUS_FAILED, job_state ) )
+
+ def mark_as_queued(self, job_wrapper):
+ self.work_queue.put( ( JOB_STATUS_QUEUED, job_wrapper ) )
diff -r c2199c181f09fd5b94a9f5ea13fc39b28e803f2c -r e5ec6ed8f0337893cf4bc64e61089471b00b28b0 lib/galaxy/jobs/runners/lwr.py
--- a/lib/galaxy/jobs/runners/lwr.py
+++ b/lib/galaxy/jobs/runners/lwr.py
@@ -1,263 +1,47 @@
import logging
import subprocess
-from Queue import Queue
-import threading
-
-import re
from galaxy import model
-from galaxy.datatypes.data import nice_size
-from galaxy.jobs.runners import BaseJobRunner
+from galaxy.jobs.runners import ClusterJobState, ClusterJobRunner, STOP_SIGNAL
-import os, errno
+import errno
from time import sleep
+from lwr_client import FileStager, Client
+
log = logging.getLogger( __name__ )
__all__ = [ 'LwrJobRunner' ]
-import urllib
-import urllib2
-import httplib
-import mmap
-import tempfile
-import time
-import simplejson
+class LwrJobRunner( ClusterJobRunner ):
+ """
+ LWR Job Runner
+ """
+ runner_name = "LWRRunner"
-class FileStager(object):
-
- def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir):
- self.client = client
- self.command_line = command_line
- self.config_files = config_files
- self.input_files = input_files
- self.output_files = output_files
- self.tool_dir = os.path.abspath(tool_dir)
+ def __init__( self, app ):
+ """Start the job runner """
+ super( LwrJobRunner, self ).__init__( app )
+ self._init_monitor_thread()
+ log.info( "starting LWR workers" )
+ self._init_worker_threads()
- self.file_renames = {}
+ def check_watched_item(self, job_state):
+ try:
+ client = self.get_client_from_state(job_state)
+ complete = client.check_complete()
+ except Exception:
+ # An orphaned job was put into the queue at app startup, so remote server went down
+ # either way we are done I guess.
+ self.mark_as_finished(job_state)
+ return None
+ if complete:
+ self.mark_as_finished(job_state)
+ return None
+ return job_state
- job_config = client.setup()
-
- self.new_working_directory = job_config['working_directory']
- self.new_outputs_directory = job_config['outputs_directory']
- self.remote_path_separator = job_config['path_separator']
-
- self.__initialize_referenced_tool_files()
- self.__upload_tool_files()
- self.__upload_input_files()
- self.__initialize_output_file_renames()
- self.__initialize_config_file_renames()
- self.__rewrite_and_upload_config_files()
- self.__rewrite_command_line()
-
- def __initialize_referenced_tool_files(self):
- pattern = r"(%s%s\S+)" % (self.tool_dir, os.sep)
- referenced_tool_files = []
- referenced_tool_files += re.findall(pattern, self.command_line)
- if self.config_files != None:
- for config_file in self.config_files:
- referenced_tool_files += re.findall(pattern, self.__read(config_file))
- self.referenced_tool_files = referenced_tool_files
-
- def __upload_tool_files(self):
- for referenced_tool_file in self.referenced_tool_files:
- tool_upload_response = self.client.upload_tool_file(referenced_tool_file)
- self.file_renames[referenced_tool_file] = tool_upload_response['path']
-
- def __upload_input_files(self):
- for input_file in self.input_files:
- input_upload_response = self.client.upload_input(input_file)
- self.file_renames[input_file] = input_upload_response['path']
-
- def __initialize_output_file_renames(self):
- for output_file in self.output_files:
- self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory,
- self.remote_path_separator,
- os.path.basename(output_file))
-
- def __initialize_config_file_renames(self):
- for config_file in self.config_files:
- self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory,
- self.remote_path_separator,
- os.path.basename(config_file))
-
- def __rewrite_paths(self, contents):
- new_contents = contents
- for local_path, remote_path in self.file_renames.iteritems():
- new_contents = new_contents.replace(local_path, remote_path)
- return new_contents
-
- def __rewrite_and_upload_config_files(self):
- for config_file in self.config_files:
- config_contents = self.__read(config_file)
- new_config_contents = self.__rewrite_paths(config_contents)
- self.client.upload_config_file(config_file, new_config_contents)
-
- def __rewrite_command_line(self):
- self.rewritten_command_line = self.__rewrite_paths(self.command_line)
-
- def get_rewritten_command_line(self):
- return self.rewritten_command_line
-
- def __read(self, path):
- input = open(path, "r")
- try:
- return input.read()
- finally:
- input.close()
-
-
-
-class Client(object):
- """
- """
- """
- """
- def __init__(self, remote_host, job_id, private_key=None):
- if not remote_host.endswith("/"):
- remote_host = remote_host + "/"
- ## If we don't have an explicit private_key defined, check for
- ## one embedded in the URL. A URL of the form
- ## https://moo@cow:8913 will try to contact https://cow:8913
- ## with a private key of moo
- private_key_format = "https?://(.*)@.*/?"
- private_key_match= re.match(private_key_format, remote_host)
- if not private_key and private_key_match:
- private_key = private_key_match.group(1)
- remote_host = remote_host.replace("%s@" % private_key, '', 1)
- self.remote_host = remote_host
- self.job_id = job_id
- self.private_key = private_key
-
- def url_open(self, request, data):
- return urllib2.urlopen(request, data)
-
- def __build_url(self, command, args):
- if self.private_key:
- args["private_key"] = self.private_key
- data = urllib.urlencode(args)
- url = self.remote_host + command + "?" + data
- return url
-
- def __raw_execute(self, command, args = {}, data = None):
- url = self.__build_url(command, args)
- request = urllib2.Request(url=url, data=data)
- response = self.url_open(request, data)
- return response
-
- def __raw_execute_and_parse(self, command, args = {}, data = None):
- response = self.__raw_execute(command, args, data)
- return simplejson.loads(response.read())
-
- def __upload_file(self, action, path, contents = None):
- """ """
- input = open(path, 'rb')
- try:
- mmapped_input = mmap.mmap(input.fileno(), 0, access = mmap.ACCESS_READ)
- return self.__upload_contents(action, path, mmapped_input)
- finally:
- input.close()
-
- def __upload_contents(self, action, path, contents):
- name = os.path.basename(path)
- args = {"job_id" : self.job_id, "name" : name}
- return self.__raw_execute_and_parse(action, args, contents)
-
- def upload_tool_file(self, path):
- return self.__upload_file("upload_tool_file", path)
-
- def upload_input(self, path):
- return self.__upload_file("upload_input", path)
-
- def upload_config_file(self, path, contents):
- return self.__upload_contents("upload_config_file", path, contents)
-
- def download_output(self, path):
- """ """
- name = os.path.basename(path)
- response = self.__raw_execute('download_output', {'name' : name,
- "job_id" : self.job_id})
- output = open(path, 'wb')
- try:
- while True:
- buffer = response.read(1024)
- if buffer == "":
- break
- output.write(buffer)
- finally:
- output.close()
-
- def launch(self, command_line):
- """ """
- return self.__raw_execute("launch", {"command_line" : command_line,
- "job_id" : self.job_id})
-
- def kill(self):
- return self.__raw_execute("kill", {"job_id" : self.job_id})
-
- def wait(self):
- """ """
- while True:
- check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id" : self.job_id })
- complete = check_complete_response["complete"] == "true"
- if complete:
- return check_complete_response
- time.sleep(1)
-
- def clean(self):
- self.__raw_execute("clean", { "job_id" : self.job_id })
-
- def setup(self):
- return self.__raw_execute_and_parse("setup", { "job_id" : self.job_id })
-
-
-
-class LwrJobRunner( BaseJobRunner ):
- """
- Lwr Job Runner
- """
- STOP_SIGNAL = object()
- def __init__( self, app ):
- """Start the job runner with 'nworkers' worker threads"""
- self.app = app
- self.sa_session = app.model.context
-
- # start workers
- self.queue = Queue()
- self.threads = []
- nworkers = app.config.local_job_queue_workers
- log.info( "starting workers" )
- for i in range( nworkers ):
- worker = threading.Thread( ( name="LwrJobRunner.thread-%d" % i ), target=self.run_next )
- worker.setDaemon( True )
- worker.start()
- self.threads.append( worker )
- log.debug( "%d workers ready", nworkers )
-
- def run_next( self ):
- """Run the next job, waiting until one is available if neccesary"""
- while 1:
- job_wrapper = self.queue.get()
- if job_wrapper is self.STOP_SIGNAL:
- return
- try:
- self.run_job( job_wrapper )
- except:
- log.exception( "Uncaught exception running job" )
-
- def determine_lwr_url(self, url):
- lwr_url = url[ len( 'lwr://' ) : ]
- return lwr_url
-
- def get_client_from_wrapper(self, job_wrapper):
- return self.get_client( job_wrapper.get_job_runner_url(), job_wrapper.job_id )
-
- def get_client(self, job_runner, job_id):
- lwr_url = self.determine_lwr_url( job_runner )
- return Client(lwr_url, job_id)
-
- def run_job( self, job_wrapper ):
+ def queue_job(self, job_wrapper):
stderr = stdout = command_line = ''
runner_url = job_wrapper.get_job_runner_url()
@@ -265,47 +49,98 @@
try:
job_wrapper.prepare()
if hasattr(job_wrapper, 'prepare_input_files_cmds') and job_wrapper.prepare_input_files_cmds is not None:
- for cmd in job_wrapper.prepare_input_file_cmds: # run the commands to stage the input files
+ for cmd in job_wrapper.prepare_input_files_cmds: # run the commands to stage the input files
#log.debug( 'executing: %s' % cmd )
if 0 != os.system(cmd):
raise Exception('Error running file staging command: %s' % cmd)
job_wrapper.prepare_input_files_cmds = None # prevent them from being used in-line
- command_line = self.build_command_line( job_wrapper, include_metadata=False )
+ command_line = self.build_command_line( job_wrapper, include_metadata=False, include_work_dir_outputs=False )
except:
job_wrapper.fail( "failure preparing job", exception=True )
log.exception("failure running job %d" % job_wrapper.job_id)
return
# If we were able to get a command line, run the job
- if command_line:
- try:
- #log.debug( 'executing: %s' % command_line )
- client = self.get_client_from_wrapper(job_wrapper)
- output_fnames = job_wrapper.get_output_fnames()
- output_files = [ str( o ) for o in output_fnames ]
- input_files = job_wrapper.get_input_fnames()
- file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir)
- rebuilt_command_line = file_stager.get_rewritten_command_line()
- client.launch( rebuilt_command_line )
+ if not command_line:
+ job_wrapper.finish( '', '' )
+ return
- job_wrapper.set_runner( runner_url, job_wrapper.job_id )
- job_wrapper.change_state( model.Job.states.RUNNING )
+ try:
+ #log.debug( 'executing: %s' % command_line )
+ client = self.get_client_from_wrapper(job_wrapper)
+ output_files = self.get_output_files(job_wrapper)
+ input_files = job_wrapper.get_input_fnames()
+ working_directory = job_wrapper.working_directory
+ file_stager = FileStager(client, command_line, job_wrapper.extra_filenames, input_files, output_files, job_wrapper.tool.tool_dir, working_directory)
+ rebuilt_command_line = file_stager.get_rewritten_command_line()
+ job_id = file_stager.job_id
+ client.launch( rebuilt_command_line )
+ job_wrapper.set_runner( runner_url, job_id )
+ job_wrapper.change_state( model.Job.states.RUNNING )
- run_results = client.wait()
- log.debug('run_results %s' % run_results )
- stdout = run_results['stdout']
- stderr = run_results['stderr']
+ except Exception, exc:
+ job_wrapper.fail( "failure running job", exception=True )
+ log.exception("failure running job %d" % job_wrapper.job_id)
+ return
-
- if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]:
- for output_file in output_files:
- client.download_output(output_file)
- client.clean()
- log.debug('execution finished: %s' % command_line)
- except Exception, exc:
- job_wrapper.fail( "failure running job", exception=True )
- log.exception("failure running job %d" % job_wrapper.job_id)
- return
+ lwr_job_state = ClusterJobState()
+ lwr_job_state.job_wrapper = job_wrapper
+ lwr_job_state.job_id = job_id
+ lwr_job_state.old_state = True
+ lwr_job_state.running = True
+ lwr_job_state.runner_url = runner_url
+ self.monitor_job(lwr_job_state)
+
+ def get_output_files(self, job_wrapper):
+ output_fnames = job_wrapper.get_output_fnames()
+ return [ str( o ) for o in output_fnames ]
+
+
+ def determine_lwr_url(self, url):
+ lwr_url = url[ len( 'lwr://' ) : ]
+ return lwr_url
+
+ def get_client_from_wrapper(self, job_wrapper):
+ job_id = job_wrapper.job_id
+ if hasattr(job_wrapper, 'task_id'):
+ job_id = "%s_%s" % (job_id, job_wrapper.task_id)
+ return self.get_client( job_wrapper.get_job_runner_url(), job_id )
+
+ def get_client_from_state(self, job_state):
+ job_runner = job_state.runner_url
+ job_id = job_state.job_id
+ return self.get_client(job_runner, job_id)
+
+ def get_client(self, job_runner, job_id):
+ lwr_url = self.determine_lwr_url( job_runner )
+ return Client(lwr_url, job_id)
+
+ def finish_job( self, job_state ):
+ stderr = stdout = command_line = ''
+ job_wrapper = job_state.job_wrapper
+ try:
+ client = self.get_client_from_state(job_state)
+
+ run_results = client.raw_check_complete()
+ log.debug('run_results %s' % run_results )
+ stdout = run_results['stdout']
+ stderr = run_results['stderr']
+
+ if job_wrapper.get_state() not in [ model.Job.states.ERROR, model.Job.states.DELETED ]:
+ work_dir_outputs = self.get_work_dir_outputs(job_wrapper)
+ output_files = self.get_output_files(job_wrapper)
+ for source_file, output_file in work_dir_outputs:
+ client.download_work_dir_output(source_file, job_wrapper.working_directory, output_file)
+ # Remove from full output_files list so don't try to download directly.
+ output_files.remove(output_file)
+ for output_file in output_files:
+ client.download_output(output_file, working_directory=job_wrapper.working_directory)
+ client.clean()
+ log.debug('execution finished: %s' % command_line)
+ except Exception, exc:
+ job_wrapper.fail( "failure running job", exception=True )
+ log.exception("failure running job %d" % job_wrapper.job_id)
+ return
#run the metadata setting script here
#this is terminate-able when output dataset/job is deleted
#so that long running set_meta()s can be canceled without having to reboot the server
@@ -321,7 +156,7 @@
job_wrapper.external_output_metadata.set_job_runner_external_pid( external_metadata_proc.pid, self.sa_session )
external_metadata_proc.wait()
log.debug( 'execution of external set_meta finished for job %d' % job_wrapper.job_id )
-
+
# Finish the job
try:
job_wrapper.finish( stdout, stderr )
@@ -329,17 +164,19 @@
log.exception("Job wrapper finish method failed")
job_wrapper.fail("Unable to finish job", exception=True)
- def put( self, job_wrapper ):
- """Add a job to the queue (by job identifier)"""
- # Change to queued state before handing to worker thread so the runner won't pick it up again
- job_wrapper.change_state( model.Job.states.QUEUED )
- self.queue.put( job_wrapper )
-
+ def fail_job( self, job_state ):
+ """
+ Separated out so we can use the worker threads for it.
+ """
+ self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) )
+ job_state.job_wrapper.fail( job_state.fail_message )
+
def shutdown( self ):
"""Attempts to gracefully shut down the worker threads"""
log.info( "sending stop signal to worker threads" )
- for i in range( len( self.threads ) ):
- self.queue.put( self.STOP_SIGNAL )
+ self.monitor_queue.put( STOP_SIGNAL )
+ for i in range( len( self.work_threads ) ):
+ self.work_queue.put( ( STOP_SIGNAL, None ) )
log.info( "local job runner stopped" )
def check_pid( self, pid ):
@@ -355,8 +192,9 @@
def stop_job( self, job ):
#if our local job has JobExternalOutputMetadata associated, then our primary job has to have already finished
- if job.external_output_metadata:
- pid = job.external_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them
+ job_ext_output_metadata = job.get_external_output_metadata()
+ if job_ext_output_metadata:
+ pid = job_ext_output_metadata[0].job_runner_external_pid #every JobExternalOutputMetadata has a pid set, we just need to take from one of them
if pid in [ None, '' ]:
log.warning( "stop_job(): %s: no PID in database for job, unable to stop" % job.id )
return
@@ -383,7 +221,21 @@
log.debug("Attempt remote lwr kill of job with url %s and id %s" % (lwr_url, job_id))
client = self.get_client(lwr_url, job_id)
client.kill()
+
+
def recover( self, job, job_wrapper ):
- # local jobs can't be recovered
- job_wrapper.change_state( model.Job.states.ERROR, info = "This job was killed when Galaxy was restarted. Please retry the job." )
-
+ """Recovers jobs stuck in the queued/running state when Galaxy started"""
+ job_state = ClusterJobState()
+ job_state.job_id = str( job.get_job_runner_external_id() )
+ job_state.runner_url = job_wrapper.get_job_runner_url()
+ job_wrapper.command_line = job.get_command_line()
+ job_state.job_wrapper = job_wrapper
+ if job.get_state() == model.Job.states.RUNNING:
+ log.debug( "(LWR/%s) is still in running state, adding to the LWR queue" % ( job.get_id()) )
+ job_state.old_state = True
+ job_state.running = True
+ self.monitor_queue.put( job_state )
+ elif job.get_state() == model.Job.states.QUEUED:
+ # LWR doesn't queue currently, so this indicates Galaxy was shut off while
+ # job was being staged. Not sure how to recover from that.
+ job_state.job_wrapper.fail( "This job was killed when Galaxy was restarted. Please retry the job." )
diff -r c2199c181f09fd5b94a9f5ea13fc39b28e803f2c -r e5ec6ed8f0337893cf4bc64e61089471b00b28b0 lib/galaxy/jobs/runners/lwr_client/__init__.py
--- /dev/null
+++ b/lib/galaxy/jobs/runners/lwr_client/__init__.py
@@ -0,0 +1,501 @@
+"""
+lwr_client
+==========
+
+This module contains logic for interfacing with an external LWR server.
+
+"""
+import mmap
+import os
+import re
+import time
+import urllib
+import urllib2
+
+import simplejson
+
+
+class JobInputs(object):
+ """
+ Abstractions over dynamic inputs created for a given job (namely the command to
+ execute and created configfiles).
+
+ **Parameters**
+
+ command_line : str
+ Local command to execute for this job. (To be rewritten.)
+ config_files : str
+ Config files created for this job. (To be rewritten.)
+
+
+ >>> import tempfile
+ >>> tf = tempfile.NamedTemporaryFile()
+ >>> def setup_inputs(tf):
+ ... open(tf.name, "w").write("world /path/to/input the rest")
+ ... inputs = JobInputs("hello /path/to/input", [tf.name])
+ ... return inputs
+ >>> inputs = setup_inputs(tf)
+ >>> inputs.rewrite_paths("/path/to/input", 'C:\\input')
+ >>> inputs.rewritten_command_line
+ 'hello C:\\\\input'
+ >>> inputs.rewritten_config_files[tf.name]
+ 'world C:\\\\input the rest'
+ >>> tf.close()
+ >>> tf = tempfile.NamedTemporaryFile()
+ >>> inputs = setup_inputs(tf)
+ >>> inputs.find_referenced_subfiles('/path/to')
+ ['/path/to/input']
+ >>> inputs.path_referenced('/path/to')
+ True
+ >>> inputs.path_referenced('/path/to/input')
+ True
+ >>> inputs.path_referenced('/path/to/notinput')
+ False
+ >>> tf.close()
+ """
+
+ def __init__(self, command_line, config_files):
+ self.rewritten_command_line = command_line
+ self.rewritten_config_files = {}
+ for config_file in config_files or []:
+ config_contents = _read(config_file)
+ self.rewritten_config_files[config_file] = config_contents
+
+ def find_referenced_subfiles(self, directory):
+ """
+ Return list of files below specified `directory` in job inputs. Could
+ use more sophisticated logic (match quotes to handle spaces, handle
+ subdirectories, etc...).
+
+ **Parameters**
+
+ directory : str
+ Full path to directory to search.
+
+ """
+ pattern = r"(%s%s\S+)" % (directory, os.sep)
+ referenced_files = set()
+ for input_contents in self.__items():
+ referenced_files.update(re.findall(pattern, input_contents))
+ return list(referenced_files)
+
+ def path_referenced(self, path):
+ pattern = r"%s" % path
+ found = False
+ for input_contents in self.__items():
+ if re.findall(pattern, input_contents):
+ found = True
+ break
+ return found
+
+ def rewrite_paths(self, local_path, remote_path):
+ """
+ Rewrite references to `local_path` with `remote_path` in job inputs.
+ """
+ self.__rewrite_command_line(local_path, remote_path)
+ self.__rewrite_config_files(local_path, remote_path)
+
+ def __rewrite_command_line(self, local_path, remote_path):
+ self.rewritten_command_line = self.rewritten_command_line.replace(local_path, remote_path)
+
+ def __rewrite_config_files(self, local_path, remote_path):
+ for config_file, rewritten_contents in self.rewritten_config_files.iteritems():
+ self.rewritten_config_files[config_file] = rewritten_contents.replace(local_path, remote_path)
+
+ def __items(self):
+ items = [self.rewritten_command_line]
+ items.extend(self.rewritten_config_files.values())
+ return items
+
+
+class FileStager(object):
+ """
+ Objects of the FileStager class interact with an LWR client object to
+ stage the files required to run jobs on a remote LWR server.
+
+ **Parameters**
+
+ client : Client
+ LWR client object.
+ command_line : str
+ The local command line to execute; this will be rewritten for the remote server.
+ config_files : list
+ List of Galaxy 'configfile's produced for this job. These will be rewritten and sent to remote server.
+ input_files : list
+ List of input files used by job. These will be transferred and references rewritten.
+ output_files : list
+ List of output_files produced by job.
+ tool_dir : str
+ Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server).
+ working_directory : str
+ Local path created by Galaxy for running this job.
+
+ """
+
+ def __init__(self, client, command_line, config_files, input_files, output_files, tool_dir, working_directory):
+ """
+ """
+ self.client = client
+ self.command_line = command_line
+ self.config_files = config_files
+ self.input_files = input_files
+ self.output_files = output_files
+ self.tool_dir = os.path.abspath(tool_dir)
+ self.working_directory = working_directory
+
+ # Setup job inputs, these will need to be rewritten before
+ # shipping off to remote LWR server.
+ self.job_inputs = JobInputs(self.command_line, self.config_files)
+
+ self.file_renames = {}
+
+ self.__handle_setup()
+ self.__initialize_referenced_tool_files()
+ self.__upload_tool_files()
+ self.__upload_input_files()
+ self.__upload_working_directory_files()
+ self.__initialize_output_file_renames()
+ self.__initialize_task_output_file_renames()
+ self.__initialize_config_file_renames()
+ self.__handle_rewrites()
+ self.__upload_rewritten_config_files()
+
+ def __handle_setup(self):
+ job_config = self.client.setup()
+
+ self.new_working_directory = job_config['working_directory']
+ self.new_outputs_directory = job_config['outputs_directory']
+ self.remote_path_separator = job_config['path_separator']
+ # If remote LWR server assigned job id, use that otherwise
+ # just use local job_id assigned.
+ galaxy_job_id = self.client.job_id
+ self.job_id = job_config.get('job_id', galaxy_job_id)
+ if self.job_id != galaxy_job_id:
+ # Remote LWR server assigned an id different than the
+ # Galaxy job id, update client to reflect this.
+ self.client.job_id = self.job_id
+
+ def __initialize_referenced_tool_files(self):
+ self.referenced_tool_files = self.job_inputs.find_referenced_subfiles(self.tool_dir)
+
+ def __upload_tool_files(self):
+ for referenced_tool_file in self.referenced_tool_files:
+ tool_upload_response = self.client.upload_tool_file(referenced_tool_file)
+ self.file_renames[referenced_tool_file] = tool_upload_response['path']
+
+ def __upload_input_files(self):
+ for input_file in self.input_files:
+ self.__upload_input_file(input_file)
+ self.__upload_input_extra_files(input_file)
+
+ def __upload_input_file(self, input_file):
+ if self.job_inputs.path_referenced(input_file):
+ input_upload_response = self.client.upload_input(input_file)
+ self.file_renames[input_file] = input_upload_response['path']
+
+ def __upload_input_extra_files(self, input_file):
+ # TODO: Determine if this is object store safe and what needs to be
+ # done if it is not.
+ files_path = "%s_files" % input_file[0:-len(".dat")]
+ if os.path.exists(files_path) and self.job_inputs.path_referenced(files_path):
+ for extra_file in os.listdir(files_path):
+ extra_file_path = os.path.join(files_path, extra_file)
+ relative_path = os.path.basename(files_path)
+ extra_file_relative_path = os.path.join(relative_path, extra_file)
+ response = self.client.upload_extra_input(extra_file_path, extra_file_relative_path)
+ self.file_renames[extra_file_path] = response['path']
+
+ def __upload_working_directory_files(self):
+ # Task manager stages files into working directory, these need to be
+ # uploaded if present.
+ for working_directory_file in os.listdir(self.working_directory):
+ path = os.path.join(self.working_directory, working_directory_file)
+ working_file_response = self.client.upload_working_directory_file(path)
+ self.file_renames[path] = working_file_response['path']
+
+ def __initialize_output_file_renames(self):
+ for output_file in self.output_files:
+ self.file_renames[output_file] = r'%s%s%s' % (self.new_outputs_directory,
+ self.remote_path_separator,
+ os.path.basename(output_file))
+
+ def __initialize_task_output_file_renames(self):
+ for output_file in self.output_files:
+ name = os.path.basename(output_file)
+ self.file_renames[os.path.join(self.working_directory, name)] = r'%s%s%s' % (self.new_working_directory,
+ self.remote_path_separator,
+ name)
+
+ def __initialize_config_file_renames(self):
+ for config_file in self.config_files:
+ self.file_renames[config_file] = r'%s%s%s' % (self.new_working_directory,
+ self.remote_path_separator,
+ os.path.basename(config_file))
+
+ def __rewrite_paths(self, contents):
+ new_contents = contents
+ for local_path, remote_path in self.file_renames.iteritems():
+ new_contents = new_contents.replace(local_path, remote_path)
+ return new_contents
+
+ def __handle_rewrites(self):
+ for local_path, remote_path in self.file_renames.iteritems():
+ self.job_inputs.rewrite_paths(local_path, remote_path)
+
+ def __upload_rewritten_config_files(self):
+ for config_file, new_config_contents in self.job_inputs.rewritten_config_files.iteritems():
+ self.client.upload_config_file(config_file, new_config_contents)
+
+ def get_rewritten_command_line(self):
+ """
+ Returns the rewritten version of the command line to execute suitable
+ for remote host.
+ """
+ return self.job_inputs.rewritten_command_line
+
+
+class Client(object):
+ """
+ Objects of this client class perform low-level communication with a remote LWR server.
+
+ **Parameters**
+
+ remote_host : str
+ Remote URL of the LWR server.
+ job_id : str
+ Galaxy job/task id.
+ private_key : str (optional)
+ Secret key the remote LWR server is configured with.
+ """
+
+ def __init__(self, remote_host, job_id, private_key=None):
+ if not remote_host.endswith("/"):
+ remote_host = remote_host + "/"
+ ## If we don't have an explicit private_key defined, check for
+ ## one embedded in the URL. A URL of the form
+ ## https://moo@cow:8913 will try to contact https://cow:8913
+ ## with a private key of moo
+ private_key_format = "https?://(.*)@.*/?"
+ private_key_match = re.match(private_key_format, remote_host)
+ if not private_key and private_key_match:
+ private_key = private_key_match.group(1)
+ remote_host = remote_host.replace("%s@" % private_key, '', 1)
+ self.remote_host = remote_host
+ self.job_id = job_id
+ self.private_key = private_key
+
+ def _url_open(self, request, data):
+ return urllib2.urlopen(request, data)
+
+ def __build_url(self, command, args):
+ if self.private_key:
+ args["private_key"] = self.private_key
+ data = urllib.urlencode(args)
+ url = self.remote_host + command + "?" + data
+ return url
+
+ def __raw_execute(self, command, args={}, data=None):
+ url = self.__build_url(command, args)
+ request = urllib2.Request(url=url, data=data)
+ response = self._url_open(request, data)
+ return response
+
+ def __raw_execute_and_parse(self, command, args={}, data=None):
+ response = self.__raw_execute(command, args, data)
+ return simplejson.loads(response.read())
+
+ def __upload_file(self, action, path, name=None, contents=None):
+ input = open(path, 'rb')
+ try:
+ mmapped_input = mmap.mmap(input.fileno(), 0, access=mmap.ACCESS_READ)
+ return self.__upload_contents(action, path, mmapped_input, name)
+ finally:
+ input.close()
+
+ def __upload_contents(self, action, path, contents, name=None):
+ if not name:
+ name = os.path.basename(path)
+ args = {"job_id": self.job_id, "name": name}
+ return self.__raw_execute_and_parse(action, args, contents)
+
+ def upload_tool_file(self, path):
+ """
+ Upload a tool related file (e.g. wrapper) required to run job.
+
+ **Parameters**
+
+ path : str
+ Local path of the tool file.
+ """
+ return self.__upload_file("upload_tool_file", path)
+
+ def upload_input(self, path):
+ """
+ Upload input dataset to remote server.
+
+ **Parameters**
+
+ path : str
+ Local path of input dataset.
+ """
+ return self.__upload_file("upload_input", path)
+
+ def upload_extra_input(self, path, relative_name):
+ """
+ Upload extra input file to remote server.
+
+ **Parameters**
+
+ path : str
+ Local path of the extra file to upload.
+ relative_name : str
+ Path of the extra file relative to the directory containing the input's extra files directory.
+ """
+ return self.__upload_file("upload_extra_input", path, name=relative_name)
+
+ def upload_config_file(self, path, contents):
+ """
+ Upload a job's config file to the remote server.
+
+ **Parameters**
+
+ path : str
+ Local path to the original config file.
+ contents : str
+ Rewritten contents of the config file to upload.
+ """
+ return self.__upload_contents("upload_config_file", path, contents)
+
+ def upload_working_directory_file(self, path):
+ """
+ Upload the supplied file (path) from a job's working directory
+ to remote server.
+
+ **Parameters**
+
+ path : str
+ Path to file to upload.
+ """
+ return self.__upload_file("upload_working_directory_file", path)
+
+ def _get_output_type(self, name):
+ return self.__raw_execute_and_parse("get_output_type", {"name": name,
+ "job_id": self.job_id})
+
+ def download_work_dir_output(self, source, working_directory, output_path):
+ """
+ Download an output dataset specified with from_work_dir from the
+ remote server.
+
+ **Parameters**
+
+ source : str
+ Path in job's working_directory to find output in.
+ working_directory : str
+ Local working_directory for the job.
+ output_path : str
+ Full path to output dataset.
+ """
+ output = open(output_path, "wb")
+ name = os.path.basename(source)
+ self.__raw_download_output(name, self.job_id, "work_dir", output)
+
+ def download_output(self, path, working_directory):
+ """
+ Download an output dataset from the remote server.
+
+ **Parameters**
+
+ path : str
+ Local path of the dataset.
+ working_directory : str
+ Local working_directory for the job.
+ """
+ name = os.path.basename(path)
+ output_type = self._get_output_type(name)
+ if output_type == "direct":
+ output = open(path, "wb")
+ elif output_type == "task":
+ output = open(os.path.join(working_directory, name), "wb")
+ else:
+ raise Exception("No remote output found for dataset with path %s" % path)
+ self.__raw_download_output(name, self.job_id, output_type, output)
+
+ def __raw_download_output(self, name, job_id, output_type, output_file):
+ response = self.__raw_execute("download_output", {"name": name,
+ "job_id": self.job_id,
+ "output_type": output_type})
+ try:
+ while True:
+ buffer = response.read(1024)
+ if buffer == "":
+ break
+ output_file.write(buffer)
+ finally:
+ output_file.close()
+
+ def launch(self, command_line):
+ """
+ Run or queue up the execution of the supplied
+ `command_line` on the remote server.
+
+ **Parameters**
+
+ command_line : str
+ Command to execute.
+ """
+ return self.__raw_execute("launch", {"command_line": command_line,
+ "job_id": self.job_id})
+
+ def kill(self):
+ """
+ Cancel remote job, either removing from the queue or killing it.
+ """
+ return self.__raw_execute("kill", {"job_id": self.job_id})
+
+ def wait(self):
+ """
+ Wait for job to finish.
+ """
+ while True:
+ complete_response = self.raw_check_complete()
+ if complete_response["complete"] == "true":
+ return complete_response
+ time.sleep(1)
+
+ def raw_check_complete(self):
+ """
+ Get check_complete response from the remote server.
+ """
+ check_complete_response = self.__raw_execute_and_parse("check_complete", {"job_id": self.job_id})
+ return check_complete_response
+
+ def check_complete(self):
+ """
+ Return boolean indicating whether the job is complete.
+ """
+ return self.raw_check_complete()["complete"] == "true"
+
+ def clean(self):
+ """
+ Cleanup the remote job.
+ """
+ self.__raw_execute("clean", {"job_id": self.job_id})
+
+ def setup(self):
+ """
+ Setup remote LWR server to run this job.
+ """
+ return self.__raw_execute_and_parse("setup", {"job_id": self.job_id})
+
+
+def _read(path):
+ """
+ Utility method to quickly read small files (config files and tool
+ wrappers) into memory as strings.
+ """
+ input = open(path, "r")
+ try:
+ return input.read()
+ finally:
+ input.close()
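
For orientation, a minimal usage sketch of the client module above, outside of Galaxy's runner machinery; the LWR host URL, job id, and file paths are hypothetical, the remote LWR server is assumed to be reachable, and error handling is omitted:

    from galaxy.jobs.runners.lwr_client import Client, FileStager

    # Hypothetical host and job id; a URL of the form https://key@host:port
    # would supply private_key implicitly, as described in the Client docstring.
    client = Client("http://remotehost:8913/", "1234")

    # Stage tool files, inputs, and working directory contents, rewriting
    # local paths in the command line to their remote equivalents.
    stager = FileStager(client,
                        "tool_wrapper.sh /galaxy/files/input1.dat /galaxy/files/output1.dat",
                        [],                            # config_files
                        ["/galaxy/files/input1.dat"],  # input_files
                        ["/galaxy/files/output1.dat"], # output_files
                        "/galaxy/tools/mytool",        # tool_dir
                        "/galaxy/jobs/1234")           # working_directory

    client.launch(stager.get_rewritten_command_line())
    client.wait()                                      # polls check_complete until done
    client.download_output("/galaxy/files/output1.dat",
                           working_directory="/galaxy/jobs/1234")
    client.clean()
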
Repository URL: https://bitbucket.org/galaxy/galaxy-central/
commit/galaxy-central: jgoecks: Change variable names to avoid conflict with built-in functions.
by Bitbucket 11 Jan '13
1 new commit in galaxy-central:
https://bitbucket.org/galaxy/galaxy-central/commits/c2199c181f09/
changeset: c2199c181f09
user: jgoecks
date: 2013-01-11 18:16:44
summary: Change variable names to avoid conflict with built-in functions.
affected #: 1 file
diff -r 3faa833c15b5162db69f061bdbd4e951ba2ffdc7 -r c2199c181f09fd5b94a9f5ea13fc39b28e803f2c lib/galaxy/visualization/data_providers/genome.py
--- a/lib/galaxy/visualization/data_providers/genome.py
+++ b/lib/galaxy/visualization/data_providers/genome.py
@@ -1062,8 +1062,8 @@
summary = _summarize_bbi( bbi, chrom, start, end, 1 )
f.close()
- min = 0
- max = 0
+ min_val = 0
+ max_val = 0
mean = 0
sd = 0
if summary is not None:
@@ -1077,10 +1077,10 @@
if valid_count > 1:
var /= valid_count - 1
sd = numpy.sqrt( var )
- min = summary.min_val[0]
- max = summary.max_val[0]
+ min_val = summary.min_val[0]
+ max_val = summary.max_val[0]
- return dict( data=dict( min=min, max=max, mean=mean, sd=sd ) )
+ return dict( data=dict( min=min_val, max=max_val, mean=mean, sd=sd ) )
def summarize_region( bbi, chrom, start, end, num_points ):
'''
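
The rename matters because assigning to min or max hides the built-in functions for the rest of the scope; a small standalone illustration, not part of the commit above:

    values = [3, 1, 2]

    def summary_with_shadowing(values):
        min = 0    # shadows the built-in min() for the rest of this function
        # Calling min(values) here would raise TypeError: 'int' object is not callable.
        return min

    def summary_without_shadowing(values):
        min_val = 0    # distinct name, so the built-in min() stays usable
        if values:
            min_val = min(values)
        return min_val

    print summary_without_shadowing(values)   # prints 1
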
Repository URL: https://bitbucket.org/galaxy/galaxy-central/