2 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/494b51bda9c1/ Changeset: 494b51bda9c1 User: jmchilton Date: 2014-01-12 05:40:18 Summary: Rework workflow functional testing so it isn't tested by default... Meant to be used with a specific workflow, but nose is picking it up by default when no workflow has been specified. Affected #: 4 files diff -r c8c00d106c7a6883feaacca809b42fd7c107fe2a -r 494b51bda9c1a319672e92cce4ec17386cd65390 run_functional_tests.sh --- a/run_functional_tests.sh +++ b/run_functional_tests.sh @@ -51,7 +51,7 @@ python ./test/tool_shed/functional_tests.py -v --with-nosehtml --html-report-file ./test/tool_shed/run_functional_tests.html $2 fi elif [ $1 = '-workflow' ]; then - python ./scripts/functional_tests.py -v functional.test_workflow:WorkflowTestCase --with-nosehtml --html-report-file ./test/tool_shed/run_functional_tests.html -workflow $2 + python ./scripts/functional_tests.py -v functional.workflow:WorkflowTestCase --with-nosehtml --html-report-file ./test/tool_shed/run_functional_tests.html -workflow $2 elif [ $1 = '-data_managers' ]; then if [ ! $2 ]; then python ./scripts/functional_tests.py -v functional.test_data_managers --with-nosehtml --html-report-file run_functional_tests.html -data_managers diff -r c8c00d106c7a6883feaacca809b42fd7c107fe2a -r 494b51bda9c1a319672e92cce4ec17386cd65390 scripts/functional_tests.py --- a/scripts/functional_tests.py +++ b/scripts/functional_tests.py @@ -475,10 +475,10 @@ def _run_functional_test( testing_shed_tools=None ): workflow_test = __check_arg( '-workflow', param=True ) if workflow_test: - import functional.test_workflow - functional.test_workflow.WorkflowTestCase.workflow_test_file = workflow_test - functional.test_workflow.WorkflowTestCase.master_api_key = master_api_key - functional.test_workflow.WorkflowTestCase.user_api_key = get_user_api_key() + import functional.workflow + functional.workflow.WorkflowTestCase.workflow_test_file = workflow_test + functional.workflow.WorkflowTestCase.master_api_key = master_api_key + functional.workflow.WorkflowTestCase.user_api_key = get_user_api_key() data_manager_test = __check_arg( '-data_managers', param=False ) if data_manager_test: import functional.test_data_managers diff -r c8c00d106c7a6883feaacca809b42fd7c107fe2a -r 494b51bda9c1a319672e92cce4ec17386cd65390 test/functional/test_workflow.py --- a/test/functional/test_workflow.py +++ /dev/null @@ -1,185 +0,0 @@ -import os -import sys -from base.twilltestcase import TwillTestCase -from base.interactor import GalaxyInteractorApi, stage_data_in_history - -from galaxy.util import parse_xml -from galaxy.tools.test import parse_param_elem, require_file, test_data_iter, parse_output_elems -from json import load, dumps - -from logging import getLogger -log = getLogger( __name__ ) - - -class WorkflowTestCase( TwillTestCase ): - """ - Kind of a shell of a test case for running workflow tests. Probably - needs to look more like test_toolbox. - """ - workflow_test_file = None - user_api_key = None - master_api_key = None - - def test_workflow( self, workflow_test_file=None ): - maxseconds = 120 - workflow_test_file = workflow_test_file or WorkflowTestCase.workflow_test_file - assert workflow_test_file - workflow_test = parse_test_file( workflow_test_file ) - galaxy_interactor = GalaxyWorkflowInteractor( self ) - - # Calling workflow https://github.com/jmchilton/blend4j/blob/master/src/test/java/com/github/jm...
- - # Import workflow - workflow_id, step_id_map, output_defs = self.__import_workflow( galaxy_interactor, workflow_test.workflow ) - - # Stage data and history for workflow - test_history = galaxy_interactor.new_history() - stage_data_in_history( galaxy_interactor, workflow_test.test_data(), test_history ) - - # Build workflow parameters - uploads = galaxy_interactor.uploads - ds_map = {} - for step_index, input_dataset_label in workflow_test.input_datasets(): - # Upload is {"src": "hda", "id": hid} - try: - upload = uploads[ workflow_test.upload_name( input_dataset_label ) ] - except KeyError: - raise AssertionError( "Failed to find upload with label %s in uploaded datasets %s" % ( input_dataset_label, uploads ) ) - - ds_map[ step_id_map[ step_index ] ] = upload - - payload = { - "history": "hist_id=%s" % test_history, - "ds_map": dumps( ds_map ), - "workflow_id": workflow_id, - } - run_response = galaxy_interactor.run_workflow( payload ).json() - - outputs = run_response[ 'outputs' ] - if not len( outputs ) == len( output_defs ): - msg_template = "Number of outputs [%d] created by workflow execution does not equal expected number from input file [%d]." - msg = msg_template % ( len( outputs ), len( output_defs ) ) - raise AssertionError( msg ) - - galaxy_interactor.wait_for_ids( test_history, outputs ) - - for expected_output_def in workflow_test.outputs: - # Get the correct hid - name, outfile, attributes = expected_output_def - - output_data = outputs[ int( name ) ] - try: - galaxy_interactor.verify_output( test_history, output_data, outfile, attributes=attributes, shed_tool_id=None, maxseconds=maxseconds ) - except Exception: - for stream in ['stdout', 'stderr']: - stream_output = galaxy_interactor.get_job_stream( test_history, output_data, stream=stream ) - print >>sys.stderr, self._format_stream( stream_output, stream=stream, format=True ) - raise - - def __import_workflow( self, galaxy_interactor, workflow ): - """ - Import workflow into Galaxy and return id and mapping of step ids. - """ - workflow_info = galaxy_interactor.import_workflow( workflow ).json() - try: - workflow_id = workflow_info[ 'id' ] - except KeyError: - raise AssertionError( "Failed to find id for workflow import response %s" % workflow_info ) - - # Well ideally the local copy of the workflow would have the same step ids - # as the one imported through the API, but API workflow imports are 1-indexed - # and GUI exports 0-indexed as of mid-november 2013. 
- - imported_workflow = galaxy_interactor.read_workflow( workflow_id ) - #log.info("local %s\nimported%s" % (workflow, imported_workflow)) - step_id_map = {} - local_steps_ids = sorted( [ int( step_id ) for step_id in workflow[ 'steps' ].keys() ] ) - imported_steps_ids = sorted( [ int( step_id ) for step_id in imported_workflow[ 'steps' ].keys() ] ) - for local_step_id, imported_step_id in zip( local_steps_ids, imported_steps_ids ): - step_id_map[ local_step_id ] = imported_step_id - - output_defs = [] - for local_step_id in local_steps_ids: - step_def = workflow['steps'][ str( local_step_id ) ] - output_defs.extend( step_def.get( "outputs", [] ) ) - - return workflow_id, step_id_map, output_defs - - -def parse_test_file( workflow_test_file ): - tree = parse_xml( workflow_test_file ) - root = tree.getroot() - input_elems = root.findall( "input" ) - required_files = [] - dataset_dict = {} - for input_elem in input_elems: - name, value, attrib = parse_param_elem( input_elem ) - require_file( name, value, attrib, required_files ) - dataset_dict[ name ] = value - - outputs = parse_output_elems( root ) - - workflow_file_rel_path = root.get( 'file' ) - if not workflow_file_rel_path: - raise Exception( "Workflow test XML must declare file attribute pointing to workflow under test." ) - - # TODO: Normalize this path, prevent it from accessing arbitrary files on system. - worfklow_file_abs_path = os.path.join( os.path.dirname( workflow_test_file ), workflow_file_rel_path ) - - return WorkflowTest( - dataset_dict, - required_files, - worfklow_file_abs_path, - outputs=outputs, - ) - - -class WorkflowTest( object ): - - def __init__( self, dataset_dict, required_files, workflow_file, outputs ): - self.dataset_dict = dataset_dict - self.required_files = required_files - self.workflow = load( open( workflow_file, "r" ) ) - self.outputs = outputs - - def test_data( self ): - return test_data_iter( self.required_files ) - - def upload_name( self, input_dataset_label ): - return self.dataset_dict[ input_dataset_label ] - - def input_datasets( self ): - steps = self.workflow[ "steps" ] - log.info("in input_datasets with steps %s" % steps) - for step_index, step_dict in steps.iteritems(): - if step_dict.get( "name", None ) == "Input dataset": - yield int( step_index ), step_dict[ "inputs" ][0][ "name" ] - - -class GalaxyWorkflowInteractor(GalaxyInteractorApi): - - def __init__( self, twill_test_case ): - super(GalaxyWorkflowInteractor, self).__init__( twill_test_case ) - - def import_workflow( self, workflow_rep ): - payload = { "workflow": dumps( workflow_rep ) } - return self._post( "workflows/upload", data=payload ) - - def run_workflow( self, data ): - return self._post( "workflows", data=data ) - - def read_workflow( self, id ): - return self._get( "workflows/%s" % id ).json() - - def wait_for_ids( self, history_id, ids ): - self.twill_test_case.wait_for( lambda: not all( [ self.__dataset_ready( history_id, id ) for id in ids ] ), maxseconds=120 ) - - def __dataset_ready( self, history_id, id ): - contents = self._get( 'histories/%s/contents' % history_id ).json() - for content in contents: - - if content["id"] == id: - state = content[ 'state' ] - state_ready = self._state_ready( state, error_msg="Dataset creation failed for dataset with name %s." 
% content[ 'name' ] ) - return state_ready - return False diff -r c8c00d106c7a6883feaacca809b42fd7c107fe2a -r 494b51bda9c1a319672e92cce4ec17386cd65390 test/functional/workflow.py --- /dev/null +++ b/test/functional/workflow.py @@ -0,0 +1,185 @@ +import os +import sys +from base.twilltestcase import TwillTestCase +from base.interactor import GalaxyInteractorApi, stage_data_in_history + +from galaxy.util import parse_xml +from galaxy.tools.test import parse_param_elem, require_file, test_data_iter, parse_output_elems +from json import load, dumps + +from logging import getLogger +log = getLogger( __name__ ) + + +class WorkflowTestCase( TwillTestCase ): + """ + Kind of a shell of a test case for running workflow tests. Probably + needs to look more like test_toolbox. + """ + workflow_test_file = None + user_api_key = None + master_api_key = None + + def test_workflow( self, workflow_test_file=None ): + maxseconds = 120 + workflow_test_file = workflow_test_file or WorkflowTestCase.workflow_test_file + assert workflow_test_file + workflow_test = parse_test_file( workflow_test_file ) + galaxy_interactor = GalaxyWorkflowInteractor( self ) + + # Calling workflow https://github.com/jmchilton/blend4j/blob/master/src/test/java/com/github/jm... + + # Import workflow + workflow_id, step_id_map, output_defs = self.__import_workflow( galaxy_interactor, workflow_test.workflow ) + + # Stage data and history for workflow + test_history = galaxy_interactor.new_history() + stage_data_in_history( galaxy_interactor, workflow_test.test_data(), test_history ) + + # Build workflow parameters + uploads = galaxy_interactor.uploads + ds_map = {} + for step_index, input_dataset_label in workflow_test.input_datasets(): + # Upload is {"src": "hda", "id": hid} + try: + upload = uploads[ workflow_test.upload_name( input_dataset_label ) ] + except KeyError: + raise AssertionError( "Failed to find upload with label %s in uploaded datasets %s" % ( input_dataset_label, uploads ) ) + + ds_map[ step_id_map[ step_index ] ] = upload + + payload = { + "history": "hist_id=%s" % test_history, + "ds_map": dumps( ds_map ), + "workflow_id": workflow_id, + } + run_response = galaxy_interactor.run_workflow( payload ).json() + + outputs = run_response[ 'outputs' ] + if not len( outputs ) == len( output_defs ): + msg_template = "Number of outputs [%d] created by workflow execution does not equal expected number from input file [%d]." + msg = msg_template % ( len( outputs ), len( output_defs ) ) + raise AssertionError( msg ) + + galaxy_interactor.wait_for_ids( test_history, outputs ) + + for expected_output_def in workflow_test.outputs: + # Get the correct hid + name, outfile, attributes = expected_output_def + + output_data = outputs[ int( name ) ] + try: + galaxy_interactor.verify_output( test_history, output_data, outfile, attributes=attributes, shed_tool_id=None, maxseconds=maxseconds ) + except Exception: + for stream in ['stdout', 'stderr']: + stream_output = galaxy_interactor.get_job_stream( test_history, output_data, stream=stream ) + print >>sys.stderr, self._format_stream( stream_output, stream=stream, format=True ) + raise + + def __import_workflow( self, galaxy_interactor, workflow ): + """ + Import workflow into Galaxy and return id and mapping of step ids. 
+ """ + workflow_info = galaxy_interactor.import_workflow( workflow ).json() + try: + workflow_id = workflow_info[ 'id' ] + except KeyError: + raise AssertionError( "Failed to find id for workflow import response %s" % workflow_info ) + + # Well ideally the local copy of the workflow would have the same step ids + # as the one imported through the API, but API workflow imports are 1-indexed + # and GUI exports 0-indexed as of mid-november 2013. + + imported_workflow = galaxy_interactor.read_workflow( workflow_id ) + #log.info("local %s\nimported%s" % (workflow, imported_workflow)) + step_id_map = {} + local_steps_ids = sorted( [ int( step_id ) for step_id in workflow[ 'steps' ].keys() ] ) + imported_steps_ids = sorted( [ int( step_id ) for step_id in imported_workflow[ 'steps' ].keys() ] ) + for local_step_id, imported_step_id in zip( local_steps_ids, imported_steps_ids ): + step_id_map[ local_step_id ] = imported_step_id + + output_defs = [] + for local_step_id in local_steps_ids: + step_def = workflow['steps'][ str( local_step_id ) ] + output_defs.extend( step_def.get( "outputs", [] ) ) + + return workflow_id, step_id_map, output_defs + + +def parse_test_file( workflow_test_file ): + tree = parse_xml( workflow_test_file ) + root = tree.getroot() + input_elems = root.findall( "input" ) + required_files = [] + dataset_dict = {} + for input_elem in input_elems: + name, value, attrib = parse_param_elem( input_elem ) + require_file( name, value, attrib, required_files ) + dataset_dict[ name ] = value + + outputs = parse_output_elems( root ) + + workflow_file_rel_path = root.get( 'file' ) + if not workflow_file_rel_path: + raise Exception( "Workflow test XML must declare file attribute pointing to workflow under test." ) + + # TODO: Normalize this path, prevent it from accessing arbitrary files on system. 
+ worfklow_file_abs_path = os.path.join( os.path.dirname( workflow_test_file ), workflow_file_rel_path ) + + return WorkflowTest( + dataset_dict, + required_files, + worfklow_file_abs_path, + outputs=outputs, + ) + + +class WorkflowTest( object ): + + def __init__( self, dataset_dict, required_files, workflow_file, outputs ): + self.dataset_dict = dataset_dict + self.required_files = required_files + self.workflow = load( open( workflow_file, "r" ) ) + self.outputs = outputs + + def test_data( self ): + return test_data_iter( self.required_files ) + + def upload_name( self, input_dataset_label ): + return self.dataset_dict[ input_dataset_label ] + + def input_datasets( self ): + steps = self.workflow[ "steps" ] + log.info("in input_datasets with steps %s" % steps) + for step_index, step_dict in steps.iteritems(): + if step_dict.get( "name", None ) == "Input dataset": + yield int( step_index ), step_dict[ "inputs" ][0][ "name" ] + + +class GalaxyWorkflowInteractor(GalaxyInteractorApi): + + def __init__( self, twill_test_case ): + super(GalaxyWorkflowInteractor, self).__init__( twill_test_case ) + + def import_workflow( self, workflow_rep ): + payload = { "workflow": dumps( workflow_rep ) } + return self._post( "workflows/upload", data=payload ) + + def run_workflow( self, data ): + return self._post( "workflows", data=data ) + + def read_workflow( self, id ): + return self._get( "workflows/%s" % id ).json() + + def wait_for_ids( self, history_id, ids ): + self.twill_test_case.wait_for( lambda: not all( [ self.__dataset_ready( history_id, id ) for id in ids ] ), maxseconds=120 ) + + def __dataset_ready( self, history_id, id ): + contents = self._get( 'histories/%s/contents' % history_id ).json() + for content in contents: + + if content["id"] == id: + state = content[ 'state' ] + state_ready = self._state_ready( state, error_msg="Dataset creation failed for dataset with name %s." % content[ 'name' ] ) + return state_ready + return False https://bitbucket.org/galaxy/galaxy-central/commits/c7986c31fd74/ Changeset: c7986c31fd74 User: jmchilton Date: 2014-01-12 05:40:18 Summary: More work on eliminating references to test_db_util.sa_session. ... tweaked styling of several functional tests (the non-toolbox,datamanager,workflow,api functional tests) along the way - mostly PEP-8 fixes and eliminating * imports. Did not modify tests which do not pass - but did note in the source where they fail inside a TODO.
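The core pattern in this changeset: test/base/test_db_util.py had been doing "from functional.database_contexts import galaxy_context as sa_session", which freezes whatever value galaxy_context holds at import time - presumably None, since the test framework assigns the real session only after these test modules are imported. Replacing the module-level alias with a gx_context() accessor defers the lookup to call time. A minimal sketch of the idea, assuming Galaxy's test tree is on sys.path; find_user is a hypothetical helper mirroring the query style of the real helpers in the diff below:

    import galaxy.model
    from functional import database_contexts

    # Deprecated - a module-level alias captures the value of
    # galaxy_context at import time (likely still None):
    sa_session = database_contexts.galaxy_context

    # Preferred - resolve the context on every call:
    def gx_context():
        return database_contexts.galaxy_context

    def find_user( email ):
        # Hypothetical usage: same late-bound session, Galaxy-style query.
        return gx_context().query( galaxy.model.User ) \
                           .filter( galaxy.model.User.table.c.email == email ) \
                           .first()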
Affected #: 14 files diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/base/test_db_util.py --- a/test/base/test_db_util.py +++ b/test/base/test_db_util.py @@ -1,153 +1,223 @@ import galaxy.model from galaxy.model.orm import * -from functional.database_contexts import galaxy_context as sa_session +from functional import database_contexts +# Deprecated - import database_contexts and use galaxy_context +sa_session = database_contexts.galaxy_context from base.twilltestcase import * -import sys + + +def gx_context(): + return database_contexts.galaxy_context + def delete_obj( obj ): - sa_session.delete( obj ) - sa_session.flush() + gx_context().delete( obj ) + gx_context().flush() + + def delete_request_type_permissions( id ): - rtps = sa_session.query( galaxy.model.RequestTypePermissions ) \ - .filter( and_( galaxy.model.RequestTypePermissions.table.c.request_type_id==id ) ) \ + rtps = gx_context().query( galaxy.model.RequestTypePermissions ) \ + .filter( and_( galaxy.model.RequestTypePermissions.table.c.request_type_id == id ) ) \ .order_by( desc( galaxy.model.RequestTypePermissions.table.c.create_time ) ) for rtp in rtps: - sa_session.delete( rtp ) - sa_session.flush() + gx_context().delete( rtp ) + gx_context().flush() + + def delete_user_roles( user ): for ura in user.roles: - sa_session.delete( ura ) - sa_session.flush() + gx_context().delete( ura ) + gx_context().flush() + + def flush( obj ): - sa_session.add( obj ) - sa_session.flush() + gx_context().add( obj ) + gx_context().flush() + + def get_all_histories_for_user( user ): - return sa_session.query( galaxy.model.History ) \ - .filter( and_( galaxy.model.History.table.c.user_id==user.id, - galaxy.model.History.table.c.deleted==False ) ) \ + return gx_context().query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.user_id == user.id, + galaxy.model.History.table.c.deleted == False ) ) \ .all() + + def get_dataset_permissions_by_dataset( dataset ): - return sa_session.query( galaxy.model.DatasetPermissions ) \ - .filter( galaxy.model.DatasetPermissions.table.c.dataset_id==dataset.id ) \ + return gx_context().query( galaxy.model.DatasetPermissions ) \ + .filter( galaxy.model.DatasetPermissions.table.c.dataset_id == dataset.id ) \ .all() + + def get_dataset_permissions_by_role( role ): - return sa_session.query( galaxy.model.DatasetPermissions ) \ + return gx_context().query( galaxy.model.DatasetPermissions ) \ .filter( galaxy.model.DatasetPermissions.table.c.role_id == role.id ) \ .first() + + def get_default_history_permissions_by_history( history ): - return sa_session.query( galaxy.model.DefaultHistoryPermissions ) \ - .filter( galaxy.model.DefaultHistoryPermissions.table.c.history_id==history.id ) \ + return gx_context().query( galaxy.model.DefaultHistoryPermissions ) \ + .filter( galaxy.model.DefaultHistoryPermissions.table.c.history_id == history.id ) \ .all() + + def get_default_history_permissions_by_role( role ): - return sa_session.query( galaxy.model.DefaultHistoryPermissions ) \ + return gx_context().query( galaxy.model.DefaultHistoryPermissions ) \ .filter( galaxy.model.DefaultHistoryPermissions.table.c.role_id == role.id ) \ .all() + + def get_default_user_permissions_by_role( role ): - return sa_session.query( galaxy.model.DefaultUserPermissions ) \ + return gx_context().query( galaxy.model.DefaultUserPermissions ) \ .filter( galaxy.model.DefaultUserPermissions.table.c.role_id == role.id ) \ .all() + + def get_default_user_permissions_by_user( user 
): - return sa_session.query( galaxy.model.DefaultUserPermissions ) \ - .filter( galaxy.model.DefaultUserPermissions.table.c.user_id==user.id ) \ + return gx_context().query( galaxy.model.DefaultUserPermissions ) \ + .filter( galaxy.model.DefaultUserPermissions.table.c.user_id == user.id ) \ .all() + + def get_form( name ): - fdc_list = sa_session.query( galaxy.model.FormDefinitionCurrent ) \ + fdc_list = gx_context().query( galaxy.model.FormDefinitionCurrent ) \ .filter( galaxy.model.FormDefinitionCurrent.table.c.deleted == False ) \ .order_by( galaxy.model.FormDefinitionCurrent.table.c.create_time.desc() ) for fdc in fdc_list: - sa_session.refresh( fdc ) - sa_session.refresh( fdc.latest_form ) + gx_context().refresh( fdc ) + gx_context().refresh( fdc.latest_form ) if fdc.latest_form.name == name: return fdc.latest_form return None + + def get_folder( parent_id, name, description ): - return sa_session.query( galaxy.model.LibraryFolder ) \ - .filter( and_( galaxy.model.LibraryFolder.table.c.parent_id==parent_id, - galaxy.model.LibraryFolder.table.c.name==name, - galaxy.model.LibraryFolder.table.c.description==description ) ) \ + return gx_context().query( galaxy.model.LibraryFolder ) \ + .filter( and_( galaxy.model.LibraryFolder.table.c.parent_id == parent_id, + galaxy.model.LibraryFolder.table.c.name == name, + galaxy.model.LibraryFolder.table.c.description == description ) ) \ .first() + + def get_group_by_name( name ): - return sa_session.query( galaxy.model.Group ).filter( galaxy.model.Group.table.c.name==name ).first() + return gx_context().query( galaxy.model.Group ).filter( galaxy.model.Group.table.c.name == name ).first() + + def get_group_role_associations_by_group( group ): - return sa_session.query( galaxy.model.GroupRoleAssociation ) \ + return gx_context().query( galaxy.model.GroupRoleAssociation ) \ .filter( galaxy.model.GroupRoleAssociation.table.c.group_id == group.id ) \ .all() + + def get_group_role_associations_by_role( role ): - return sa_session.query( galaxy.model.GroupRoleAssociation ) \ + return gx_context().query( galaxy.model.GroupRoleAssociation ) \ .filter( galaxy.model.GroupRoleAssociation.table.c.role_id == role.id ) \ .all() + + def get_latest_dataset(): - return sa_session.query( galaxy.model.Dataset ) \ + return gx_context().query( galaxy.model.Dataset ) \ .order_by( desc( galaxy.model.Dataset.table.c.create_time ) ) \ .first() + + def get_latest_hda(): - return sa_session.query( galaxy.model.HistoryDatasetAssociation ) \ + return gx_context().query( galaxy.model.HistoryDatasetAssociation ) \ .order_by( desc( galaxy.model.HistoryDatasetAssociation.table.c.create_time ) ) \ .first() + + def get_latest_history_for_user( user ): - return sa_session.query( galaxy.model.History ) \ - .filter( and_( galaxy.model.History.table.c.deleted==False, - galaxy.model.History.table.c.user_id==user.id ) ) \ + return gx_context().query( galaxy.model.History ) \ + .filter( and_( galaxy.model.History.table.c.deleted == False, + galaxy.model.History.table.c.user_id == user.id ) ) \ .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ .first() + + def get_latest_ldda_by_name( name ): - return sa_session.query( galaxy.model.LibraryDatasetDatasetAssociation ) \ - .filter( and_( galaxy.model.LibraryDatasetDatasetAssociation.table.c.name==name, + return gx_context().query( galaxy.model.LibraryDatasetDatasetAssociation ) \ + .filter( and_( galaxy.model.LibraryDatasetDatasetAssociation.table.c.name == name, galaxy.model.LibraryDatasetDatasetAssociation.table.c.deleted == 
False ) ) \ .order_by( desc( galaxy.model.LibraryDatasetDatasetAssociation.table.c.create_time ) ) \ .first() + + def get_latest_lddas( limit ): - return sa_session.query( galaxy.model.LibraryDatasetDatasetAssociation ) \ + return gx_context().query( galaxy.model.LibraryDatasetDatasetAssociation ) \ .order_by( desc( galaxy.model.LibraryDatasetDatasetAssociation.table.c.update_time ) ) \ .limit( limit ) + + def get_library( name, description, synopsis ): - return sa_session.query( galaxy.model.Library ) \ - .filter( and_( galaxy.model.Library.table.c.name==name, - galaxy.model.Library.table.c.description==description, - galaxy.model.Library.table.c.synopsis==synopsis, - galaxy.model.Library.table.c.deleted==False ) ) \ + return gx_context().query( galaxy.model.Library ) \ + .filter( and_( galaxy.model.Library.table.c.name == name, + galaxy.model.Library.table.c.description == description, + galaxy.model.Library.table.c.synopsis == synopsis, + galaxy.model.Library.table.c.deleted == False ) ) \ .first() + + def get_private_role( user ): for role in user.all_roles(): if role.name == user.email and role.description == 'Private Role for %s' % user.email: return role raise AssertionError( "Private role not found for user '%s'" % user.email ) + + def get_request_by_name( name ): - return sa_session.query( galaxy.model.Request ) \ - .filter( and_( galaxy.model.Request.table.c.name==name, - galaxy.model.Request.table.c.deleted==False ) ) \ - .first() + return gx_context().query( galaxy.model.Request ) \ + .filter( and_( galaxy.model.Request.table.c.name == name, + galaxy.model.Request.table.c.deleted == False ) ) \ + .first() + + def get_request_type_by_name( name ): - return sa_session.query( galaxy.model.RequestType ) \ - .filter( and_( galaxy.model.RequestType.table.c.name==name ) ) \ + return gx_context().query( galaxy.model.RequestType ) \ + .filter( and_( galaxy.model.RequestType.table.c.name == name ) ) \ .order_by( desc( galaxy.model.RequestType.table.c.create_time ) ) \ .first() + + def get_role_by_name( name ): - return sa_session.query( galaxy.model.Role ).filter( galaxy.model.Role.table.c.name==name ).first() + return gx_context().query( galaxy.model.Role ).filter( galaxy.model.Role.table.c.name == name ).first() + + def get_user( email ): - return sa_session.query( galaxy.model.User ) \ - .filter( galaxy.model.User.table.c.email==email ) \ + return gx_context().query( galaxy.model.User ) \ + .filter( galaxy.model.User.table.c.email == email ) \ .first() + + def get_user_address( user, short_desc ): - return sa_session.query( galaxy.model.UserAddress ) \ - .filter( and_( galaxy.model.UserAddress.table.c.user_id==user.id, - galaxy.model.UserAddress.table.c.desc==short_desc, - galaxy.model.UserAddress.table.c.deleted==False ) ) \ + return gx_context().query( galaxy.model.UserAddress ) \ + .filter( and_( galaxy.model.UserAddress.table.c.user_id == user.id, + galaxy.model.UserAddress.table.c.desc == short_desc, + galaxy.model.UserAddress.table.c.deleted == False ) ) \ .order_by( desc( galaxy.model.UserAddress.table.c.create_time ) ) \ - .first() + .first() + + def get_user_group_associations_by_group( group ): - return sa_session.query( galaxy.model.UserGroupAssociation ) \ + return gx_context().query( galaxy.model.UserGroupAssociation ) \ .filter( galaxy.model.UserGroupAssociation.table.c.group_id == group.id ) \ .all() + + def get_user_info_form_definition(): return galaxy.model.FormDefinition.types.USER_INFO + + def get_user_role_associations_by_role( role ): - return sa_session.query( 
galaxy.model.UserRoleAssociation ) \ + return gx_context().query( galaxy.model.UserRoleAssociation ) \ .filter( galaxy.model.UserRoleAssociation.table.c.role_id == role.id ) \ .all() + + def mark_obj_deleted( obj ): obj.deleted = True - sa_session.add( obj ) - sa_session.flush() + gx_context().add( obj ) + gx_context().flush() + + def refresh( obj ): - sa_session.refresh( obj ) + gx_context().refresh( obj ) diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_DNAse_flanked_genes.py --- a/test/functional/test_DNAse_flanked_genes.py +++ b/test/functional/test_DNAse_flanked_genes.py @@ -1,21 +1,24 @@ import galaxy.model from galaxy.model.orm import * -from base.test_db_util import sa_session +import database_contexts from base.twilltestcase import TwillTestCase """ A sample analysis""" + class AnalysisDNAseHSSFlankedGenes( TwillTestCase ): + def test_get_DNAseHSS_flanked_genes( self ): + sa_session = database_contexts.galaxy_context self.logout() self.login( email='test@bx.psu.edu' ) admin_user = sa_session.query( galaxy.model.User ) \ - .filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ) \ + .filter( galaxy.model.User.table.c.email == 'test@bx.psu.edu' ) \ .one() self.new_history( name='DNAseHSS_flanked_genes' ) history1 = sa_session.query( galaxy.model.History ) \ - .filter( and_( galaxy.model.History.table.c.deleted==False, - galaxy.model.History.table.c.user_id==admin_user.id ) ) \ + .filter( and_( galaxy.model.History.table.c.deleted == False, + galaxy.model.History.table.c.user_id == admin_user.id ) ) \ .order_by( desc( galaxy.model.History.table.c.create_time ) ) \ .first() track_params = dict( diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_admin_features.py --- a/test/functional/test_admin_features.py +++ b/test/functional/test_admin_features.py @@ -1,5 +1,32 @@ -from base.twilltestcase import * -from base.test_db_util import * +from base.twilltestcase import TwillTestCase +from functional import database_contexts +import galaxy.model +from base.test_db_util import ( + get_user, + get_private_role, + get_all_histories_for_user, + get_latest_history_for_user, + get_default_history_permissions_by_history, + get_latest_dataset, + refresh, + flush, + get_group_by_name, + get_role_by_name, + get_user_group_associations_by_group, + get_default_history_permissions_by_role, + get_default_user_permissions_by_role, + get_user_role_associations_by_role, + get_group_role_associations_by_group, + get_dataset_permissions_by_role, + get_group_role_associations_by_role, +) + + +# Globals setup by these tests. 
+regular_user1 = regular_user2 = regular_user3 = admin_user = None +role_one = role_two = role_three = None +group_zero = group_one = group_two = None + class TestDataSecurity( TwillTestCase ): def test_000_initiate_users( self ): @@ -19,6 +46,7 @@ global admin_user admin_user = get_user( 'test@bx.psu.edu' ) assert admin_user is not None, 'Problem retrieving user with email "test@bx.psu.edu" from the database' + def test_005_create_new_user_account_as_admin( self ): """Testing creating a new user account as admin""" # Logged in as admin_user @@ -31,8 +59,9 @@ username='admin-user', redirect='' ) if not username_taken: - raise AssertionError, "The public name (%s) is already being used by another user, but no error was displayed" \ - % 'admin-user' + error_msg = "The public name (%s) is already being used by another user, but no error was displayed" % 'admin-user' + raise AssertionError( error_msg ) + # Test setting the user name to an invalid one. Note that the account must not exist in order # for this test to work as desired, so the email we're passing is important... previously_created, username_taken, invalid_username = self.create_new_account_as_admin( email='diff@you.com', @@ -40,7 +69,7 @@ username='h', redirect='' ) if not invalid_username: - raise AssertionError, "The public name (%s) is is invalid, but no error was displayed" % username + raise AssertionError( "The public name (%s) is is invalid, but no error was displayed" % 'diff@you.com' ) previously_created, username_taken, invalid_username = self.create_new_account_as_admin( email=email, password=password, username='regular-user3', @@ -61,7 +90,7 @@ raise AssertionError( '%d UserRoleAssociations were created for user %s when the admin created the account ( should have been 1 )' \ % ( len( regular_user3.roles ), regular_user3.email ) ) for ura in regular_user3.roles: - role = sa_session.query( galaxy.model.Role ).get( ura.role_id ) + role = database_contexts.galaxy_context.query( galaxy.model.Role ).get( ura.role_id ) if not previously_created and role.type != 'private': raise AssertionError( 'Role created for user %s when the admin created the account is not private, type is' \ % str( role.type ) ) @@ -73,9 +102,11 @@ # Make sure the user was not associated with any groups if regular_user3.groups: raise AssertionError( 'Groups were incorrectly associated with user %s when the admin created the account' % email ) + def test_010_reset_password_as_admin( self ): """Testing reseting a user password as admin""" self.reset_password_as_admin( user_id=self.security.encode_id( regular_user3.id ), password='testreset' ) + def test_015_login_after_password_reset( self ): """Testing logging in after an admin reset a password - tests DefaultHistoryPermissions for accounts created by an admin""" # logged in as admin_user @@ -84,7 +115,7 @@ # Make sure a History and HistoryDefaultPermissions exist for the user latest_history = get_latest_history_for_user( regular_user3 ) if not latest_history.user_id == regular_user3.id: - raise AssertionError( 'A history was not created for user %s when he logged in' % email ) + raise AssertionError( 'A history was not created for user %s when he logged in' % regular_user3.email ) if not latest_history.default_permissions: raise AssertionError( 'No DefaultHistoryPermissions were created for history id %d when it was created' % latest_history.id ) dhps = get_default_history_permissions_by_history( latest_history ) @@ -102,22 +133,25 @@ if dp.action != 
galaxy.model.Dataset.permitted_actions.DATASET_MANAGE_PERMISSIONS.action: raise AssertionError( 'The DatasetPermissions for dataset id %d is %s ( should have been %s )' \ % ( latest_dataset.id, - latest_dataset.actions.action, + latest_dataset.actions.action, galaxy.model.Dataset.permitted_actions.DATASET_MANAGE_PERMISSIONS.action ) ) self.logout() # Reset the password to the default for later tests self.login( email='test@bx.psu.edu' ) self.reset_password_as_admin( user_id=self.security.encode_id( regular_user3.id ), password='testuser' ) + def test_020_mark_user_deleted( self ): """Testing marking a user account as deleted""" # Logged in as admin_user self.mark_user_deleted( user_id=self.security.encode_id( regular_user3.id ), email=regular_user3.email ) if not regular_user3.active_histories: raise AssertionError( 'HistoryDatasetAssociations for regular_user3 were incorrectly deleted when the user was marked deleted' ) + def test_025_undelete_user( self ): """Testing undeleting a user account""" # Logged in as admin_user self.undelete_user( user_id=self.security.encode_id( regular_user3.id ), email=regular_user3.email ) + def test_030_create_role( self ): """Testing creating new role with 3 members ( and a new group named the same ), then renaming the role""" # Logged in as admin_user @@ -137,7 +171,7 @@ "One of the groups associated with this role is the newly created group with the same name." ] ) # Get the role object for later tests global role_one - role_one = sa_session.query( galaxy.model.Role ).filter( galaxy.model.Role.table.c.name==name ).first() + role_one = database_contexts.galaxy_context.query( galaxy.model.Role ).filter( galaxy.model.Role.table.c.name == name ).first() assert role_one is not None, 'Problem retrieving role named "Role One" from the database' # Make sure UserRoleAssociations are correct if len( role_one.users ) != len( in_user_ids ): @@ -156,13 +190,14 @@ group_zero = get_group_by_name( name ) # Rename the role rename = "Role One's been Renamed" - new_description="This is Role One's Re-described" + new_description = "This is Role One's Re-described" self.rename_role( self.security.encode_id( role_one.id ), name=rename, description=new_description ) self.visit_url( '%s/admin/roles' % self.url ) self.check_page_for_string( rename ) self.check_page_for_string( new_description ) # Reset the role back to the original name and description self.rename_role( self.security.encode_id( role_one.id ), name=name, description=description ) + def test_035_create_group( self ): """Testing creating new group with 3 members and 2 associated roles, then renaming it""" # Logged in as admin_user @@ -202,6 +237,7 @@ self.check_page_for_string( rename ) # Reset the group back to the original name self.rename_group( self.security.encode_id( group_one.id ), name=name ) + def test_040_add_members_and_role_to_group( self ): """Testing editing user membership and role associations of an existing group""" # Logged in as admin_user @@ -224,6 +260,7 @@ group_two.name, user_ids=user_ids, role_ids=role_ids ) + def test_045_create_role_with_user_and_group_associations( self ): """Testing creating a role with user and group associations""" # Logged in as admin_user @@ -233,9 +270,9 @@ # associate_users_and_groups_with_role() method. 
name = 'Role Two' description = 'This is Role Two' - user_ids=[ str( admin_user.id ) ] - group_ids=[ str( group_two.id ) ] - private_role=admin_user.email + user_ids = [ str( admin_user.id ) ] + group_ids = [ str( group_two.id ) ] + private_role = admin_user.email # Create the role self.create_role( name=name, description=description, @@ -263,15 +300,16 @@ refresh( group_two ) if len( group_two.roles ) != 2: raise AssertionError( '%d GroupRoleAssociations are associated with group id %d ( should be 2 )' % ( len( group_two.roles ), group_two.id ) ) + def test_050_change_user_role_associations( self ): """Testing changing roles associated with a user""" # Logged in as admin_user # Create a new role with no associations name = 'Role Three' description = 'This is Role Three' - user_ids=[] - group_ids=[] - private_role=admin_user.email + user_ids = [] + group_ids = [] + private_role = admin_user.email self.create_role( name=name, description=description, in_user_ids=user_ids, @@ -301,6 +339,7 @@ if len( admin_user.roles ) != 4: raise AssertionError( '%d UserRoleAssociations are associated with %s ( should be 4 )' % \ ( len( admin_user.roles ), admin_user.email ) ) + def test_055_mark_group_deleted( self ): """Testing marking a group as deleted""" # Logged in as admin_user @@ -314,6 +353,7 @@ raise AssertionError( '%s incorrectly lost all members when it was marked as deleted.' % group_two.name ) if not group_two.roles: raise AssertionError( '%s incorrectly lost all role associations when it was marked as deleted.' % group_two.name ) + def test_060_undelete_group( self ): """Testing undeleting a deleted group""" # Logged in as admin_user @@ -321,6 +361,7 @@ refresh( group_two ) if group_two.deleted: raise AssertionError( '%s was not correctly marked as not deleted.' % group_two.name ) + def test_065_mark_role_deleted( self ): """Testing marking a role as deleted""" # Logged in as admin_user @@ -335,10 +376,12 @@ raise AssertionError( '%s incorrectly lost all user associations when it was marked as deleted.' % role_two.name ) if not role_two.groups: raise AssertionError( '%s incorrectly lost all group associations when it was marked as deleted.' % role_two.name ) + def test_070_undelete_role( self ): """Testing undeleting a deleted role""" # Logged in as admin_user self.undelete_role( self.security.encode_id( role_two.id ), role_two.name ) + def test_075_purge_user( self ): """Testing purging a user account""" # Logged in as admin_user @@ -352,26 +395,27 @@ if len( regular_user3.default_permissions ) != 1: raise AssertionError( 'DefaultUserPermissions for user %s were not deleted.' % regular_user3.email ) for dup in regular_user3.default_permissions: - role = sa_session.query( galaxy.model.Role ).get( dup.role_id ) + role = database_contexts.galaxy_context.query( galaxy.model.Role ).get( dup.role_id ) if role.type != 'private': raise AssertionError( 'DefaultUserPermissions for user %s are not related with the private role.' % regular_user3.email ) # Make sure History deleted for history in regular_user3.histories: refresh( history ) if not history.deleted: - raise AssertionError( 'User %s has active history id %d after their account was marked as purged.' % ( regular_user3.email, hda.id ) ) + raise AssertionError( 'User %s has active history id %d after their account was marked as purged.' % ( regular_user3.email, history.id ) ) # NOTE: Not all hdas / datasets will be deleted at the time a history is deleted - the cleanup_datasets.py script # is responsible for this. 
# Make sure UserGroupAssociations deleted if regular_user3.groups: - raise AssertionError( 'User %s has active group id %d after their account was marked as purged.' % ( regular_user3.email, uga.id ) ) + raise AssertionError( 'User %s has active group after their account was marked as purged.' % ( regular_user3.email ) ) # Make sure UserRoleAssociations deleted EXCEPT FOR THE PRIVATE ROLE if len( regular_user3.roles ) != 1: raise AssertionError( 'UserRoleAssociations for user %s were not deleted.' % regular_user3.email ) for ura in regular_user3.roles: - role = sa_session.query( galaxy.model.Role ).get( ura.role_id ) + role = database_contexts.galaxy_context.query( galaxy.model.Role ).get( ura.role_id ) if role.type != 'private': raise AssertionError( 'UserRoleAssociations for user %s are not related with the private role.' % regular_user3.email ) + def test_080_manually_unpurge_user( self ): """Testing manually un-purging a user account""" # Logged in as admin_user @@ -381,6 +425,7 @@ regular_user3.purged = False regular_user3.deleted = False flush( regular_user3 ) + def test_085_purge_group( self ): """Testing purging a group""" # Logged in as admin_user @@ -394,6 +439,7 @@ raise AssertionError( "Purging the group did not delete the GroupRoleAssociations for group_id '%s'" % group_two.id ) # Undelete the group for later test runs self.undelete_group( self.security.encode_id( group_two.id ), group_two.name ) + def test_090_purge_role( self ): """Testing purging a role""" # Logged in as admin_user @@ -414,6 +460,7 @@ # Make sure there are no DatasetPermissionss if get_dataset_permissions_by_role( role_two ): raise AssertionError( "Purging the role did not delete the DatasetPermissionss for role_id '%s'" % role_two.id ) + def test_095_manually_unpurge_role( self ): """Testing manually un-purging a role""" # Logged in as admin_user @@ -422,6 +469,7 @@ role_two.purged = False flush( role_two ) self.undelete_role( self.security.encode_id( role_two.id ), role_two.name ) + def test_999_reset_data_for_later_test_runs( self ): """Reseting data to enable later test runs to pass""" # Logged in as admin_user @@ -433,8 +481,8 @@ self.purge_role( self.security.encode_id( role.id ), role.name ) # Manually delete the role from the database refresh( role ) - sa_session.delete( role ) - sa_session.flush() + database_contexts.galaxy_context.delete( role ) + database_contexts.galaxy_context.flush() ################## # Eliminate all groups ################## @@ -443,8 +491,8 @@ self.purge_group( self.security.encode_id( group.id ), group.name ) # Manually delete the group from the database refresh( group ) - sa_session.delete( group ) - sa_session.flush() + database_contexts.galaxy_context.delete( group ) + database_contexts.galaxy_context.flush() ################## # Make sure all users are associated only with their private roles ################## diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_data_security.py --- a/test/functional/test_data_security.py +++ b/test/functional/test_data_security.py @@ -1,7 +1,24 @@ -from base.twilltestcase import * -from base.test_db_util import * +import galaxy.model +from base.twilltestcase import TwillTestCase +from base.test_db_util import ( + get_user, + get_private_role, + get_latest_history_for_user, + get_default_history_permissions_by_history, + get_latest_dataset, + refresh, + get_default_user_permissions_by_user, + get_dataset_permissions_by_dataset, +) + +regular_user1 = regular_user2 = 
regular_user3 = None +admin_user = None +admin_user_private_role = regular_user1_private_role = None +regular_user2_private_role = None + class TestDataSecurity( TwillTestCase ): + def test_000_initiate_users( self ): """Ensuring all required user accounts exist""" self.logout() @@ -32,6 +49,7 @@ assert admin_user is not None, 'Problem retrieving user with email "test@bx.psu.edu" from the database' global admin_user_private_role admin_user_private_role = get_private_role( admin_user ) + def test_005_default_permissions( self ): """Testing initial settings for DefaultUserPermissions and DefaultHistoryPermissions""" # Logged in as admin_user @@ -59,7 +77,8 @@ # Try deleting the admin_user's private role self.manage_roles_and_groups_for_user( self.security.encode_id( admin_user.id ), out_role_ids=str( admin_user_private_role.id ), - strings_displayed = [ "You cannot eliminate a user's private role association." ] ) + strings_displayed=[ "You cannot eliminate a user's private role association." ] ) + def test_010_private_role_creation_and_default_history_permissions( self ): """Testing private role creation and changing DefaultHistoryPermissions for new histories""" # Logged in as admin_user @@ -127,6 +146,7 @@ if dps != dhps: raise AssertionError( 'DatasetPermissions "%s" for dataset id %d differ from DefaultHistoryPermissions "%s" for history id %d' \ % ( str( dps ), latest_dataset.id, str( dhps ), latest_history.id ) ) + def test_015_change_default_permissions_for_current_history( self ): """Testing changing DefaultHistoryPermissions for the current history""" # logged in a regular_user1 @@ -139,7 +159,6 @@ # Make sure these are in sorted order for later comparison actions_in = [ 'manage permissions' ] permissions_out = [ 'DATASET_ACCESS' ] - actions_out = [ 'access' ] # Change DefaultHistoryPermissions for the current history self.history_set_default_permissions( permissions_out=permissions_out, permissions_in=permissions_in, role_id=str( regular_user2_private_role.id ) ) if len( latest_history.default_permissions ) != len( actions_in ): @@ -168,6 +187,7 @@ if dps != dhps: raise AssertionError( 'DatasetPermissionss "%s" for dataset id %d differ from DefaultHistoryPermissions "%s"' \ % ( str( dps ), latest_dataset.id, str( dhps ) ) ) + def test_999_reset_data_for_later_test_runs( self ): """Reseting data to enable later test runs to pass""" # Logged in as regular_user2 diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_dataset_features.py --- a/test/functional/test_dataset_features.py +++ b/test/functional/test_dataset_features.py @@ -3,7 +3,7 @@ class TestDatasetFeatures( TwillTestCase ): - + def test_0000_initiate_users( self ): """Ensuring all required user accounts exist""" self.logout() @@ -18,18 +18,18 @@ self.new_history() latest_history = test_db_util.get_latest_history_for_user( admin_user ) assert latest_history is not None, "Problem retrieving latest_history from database" - + def test_0005_initiate_data( self ): '''Ensure that data exists for this test suite.''' self.upload_file( '1.bed' ) - + def test_0010_view_dataset_params( self ): '''Test viewing a dataset's parameters.''' hda = self.find_hda_by_dataset_name( '1.bed' ) assert hda is not None, 'Could not retrieve latest hda from history API.' self.visit_url( '/datasets/%s/show_params' % hda[ 'id'] ) self.check_for_strings( strings_displayed=[ '1.bed', 'uploaded' ] ) - + def test_0015_report_dataset_error( self ): '''Load and submit the report error form. 
This should show an error message, as the functional test instance should not be configured for email.''' hda = test_db_util.get_latest_hda() diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_get_data.py --- a/test/functional/test_get_data.py +++ b/test/functional/test_get_data.py @@ -1,7 +1,12 @@ -import galaxy.model -from galaxy.model.orm import * from base.twilltestcase import TwillTestCase -from base.test_db_util import * +from base.test_db_util import ( + get_user, + get_latest_history_for_user, + get_latest_hda, +) + +admin_user = None + class UploadData( TwillTestCase ): @@ -25,7 +30,7 @@ self.is_history_empty() return get_latest_history_for_user( user ) - def test_0005_upload_file( self ): + def test_0005_upload_file( self ): """ Test uploading 1.bed, NOT setting the file format """ @@ -39,13 +44,13 @@ self.delete_history( id=self.security.encode_id( history.id ) ) - def test_0006_upload_file( self ): + def test_0006_upload_file( self ): """ Test uploading 1.bed.spaces, with space to tab selected, NOT setting the file format """ history = self.create_fresh_history( admin_user ) - self.upload_file( '1.bed.spaces', space_to_tab = True ) + self.upload_file( '1.bed.spaces', space_to_tab=True ) hda = get_latest_hda() assert hda is not None, "Problem retrieving hda from database" self.verify_dataset_correctness( '1.bed', hid=str( hda.hid ) ) @@ -209,13 +214,13 @@ history = self.create_fresh_history( admin_user ) # lped data types include a ped_file and a map_file ( which is binary ) - self.upload_file( None, ftype='lped', metadata = [ { 'name':'base_name', 'value':'rgenetics' } ], composite_data = [ { 'name':'ped_file', 'value':'tinywga.ped' }, { 'name':'map_file', 'value':'tinywga.map'} ] ) + self.upload_file( None, ftype='lped', metadata=[ { 'name':'base_name', 'value':'rgenetics' } ], composite_data=[ { 'name':'ped_file', 'value':'tinywga.ped' }, { 'name':'map_file', 'value':'tinywga.map'} ] ) # Get the latest hid for testing hda = get_latest_hda() assert hda is not None, "Problem retrieving hda from database" # We'll test against the resulting ped file and map file for correctness - self.verify_composite_datatype_file_content( 'tinywga.ped', str( hda.id ), base_name = 'rgenetics.ped' ) - self.verify_composite_datatype_file_content( 'tinywga.map', str( hda.id ), base_name = 'rgenetics.map' ) + self.verify_composite_datatype_file_content( 'tinywga.ped', str( hda.id ), base_name='rgenetics.ped' ) + self.verify_composite_datatype_file_content( 'tinywga.map', str( hda.id ), base_name='rgenetics.map' ) self.check_hda_json_for_key_value( self.security.encode_id( hda.id ), "metadata_base_name", "rgenetics", use_string_contains=True ) @@ -229,13 +234,13 @@ history = self.create_fresh_history( admin_user ) # lped data types include a ped_file and a map_file ( which is binary ) - self.upload_file( None, ftype='lped', metadata = [ { 'name':'base_name', 'value':'rgenetics' } ], composite_data = [ { 'name':'ped_file', 'value':'tinywga.ped', 'space_to_tab':True }, { 'name':'map_file', 'value':'tinywga.map'} ] ) + self.upload_file( None, ftype='lped', metadata=[ { 'name':'base_name', 'value':'rgenetics' } ], composite_data=[ { 'name':'ped_file', 'value':'tinywga.ped', 'space_to_tab':True }, { 'name':'map_file', 'value':'tinywga.map'} ] ) # Get the latest hid for testing hda = get_latest_hda() assert hda is not None, "Problem retrieving hda from database" # We'll test against the resulting ped file and map file for correctness - 
self.verify_composite_datatype_file_content( 'tinywga.ped.space_to_tab', str( hda.id ), base_name = 'rgenetics.ped' )
-        self.verify_composite_datatype_file_content( 'tinywga.map', str( hda.id ), base_name = 'rgenetics.map' )
+        self.verify_composite_datatype_file_content( 'tinywga.ped.space_to_tab', str( hda.id ), base_name='rgenetics.ped' )
+        self.verify_composite_datatype_file_content( 'tinywga.map', str( hda.id ), base_name='rgenetics.map' )
         self.check_hda_json_for_key_value( self.security.encode_id( hda.id ), "metadata_base_name", "rgenetics", use_string_contains=True )
@@ -250,8 +255,8 @@
         # pbed data types include a bim_file, a bed_file and a fam_file
         self.upload_file( None,
                           ftype='pbed',
-                          metadata = [ { 'name':'base_name', 'value':'rgenetics' } ],
-                          composite_data = [
+                          metadata=[ { 'name':'base_name', 'value':'rgenetics' } ],
+                          composite_data=[
                               { 'name':'bim_file', 'value':'tinywga.bim' },
                               { 'name':'bed_file', 'value':'tinywga.bed' },
                               { 'name':'fam_file', 'value':'tinywga.fam' } ])
@@ -259,9 +264,9 @@
         hda = get_latest_hda()
         assert hda is not None, "Problem retrieving hda from database"
         # We'll test against the resulting ped file and map file for correctness
-        self.verify_composite_datatype_file_content( 'tinywga.bim', str( hda.id ), base_name = 'rgenetics.bim' )
-        self.verify_composite_datatype_file_content( 'tinywga.bed', str( hda.id ), base_name = 'rgenetics.bed' )
-        self.verify_composite_datatype_file_content( 'tinywga.fam', str( hda.id ), base_name = 'rgenetics.fam' )
+        self.verify_composite_datatype_file_content( 'tinywga.bim', str( hda.id ), base_name='rgenetics.bim' )
+        self.verify_composite_datatype_file_content( 'tinywga.bed', str( hda.id ), base_name='rgenetics.bed' )
+        self.verify_composite_datatype_file_content( 'tinywga.fam', str( hda.id ), base_name='rgenetics.fam' )
         self.check_hda_json_for_key_value( self.security.encode_id( hda.id ), "metadata_base_name", "rgenetics", use_string_contains=True )

diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_history_functions.py
--- a/test/functional/test_history_functions.py
+++ b/test/functional/test_history_functions.py
@@ -4,6 +4,7 @@
 from base.test_db_util import sa_session
 from base.twilltestcase import *
 
+
 class TestHistory( TwillTestCase ):
 
     def test_000_history_behavior_between_logout_login( self ):
@@ -16,7 +17,7 @@
         global anonymous_history
         anonymous_history = ( sa_session.query( galaxy.model.History )
-                                        .filter( and_( galaxy.model.History.table.c.deleted==False, galaxy.model.History.table.c.name==name ) )
+                                        .filter( and_( galaxy.model.History.table.c.deleted == False, galaxy.model.History.table.c.name == name ) )
                                         .order_by( desc( galaxy.model.History.table.c.create_time ) )
                                         .first() )
@@ -26,7 +27,7 @@
         self.login( email='test1@bx.psu.edu', username='regular-user1' )
         global regular_user1
         regular_user1 = sa_session.query( galaxy.model.User ) \
-                                  .filter( galaxy.model.User.table.c.email=='test1@bx.psu.edu' ) \
+                                  .filter( galaxy.model.User.table.c.email == 'test1@bx.psu.edu' ) \
                                   .first()
         assert regular_user1 is not None, 'Problem retrieving user with email "test1@bx.psu.edu" from the database'
         # Current history should be anonymous_history
@@ -39,21 +40,21 @@
         self.login( email='test2@bx.psu.edu', username='regular-user2' )
         global regular_user2
         regular_user2 = sa_session.query( galaxy.model.User ) \
-                                  .filter( galaxy.model.User.table.c.email=='test2@bx.psu.edu' ) \
+                                  .filter( galaxy.model.User.table.c.email == 'test2@bx.psu.edu' ) \
                                   .first()
         assert regular_user2 is not None, 'Problem retrieving user with email "test2@bx.psu.edu" from the database'
         self.logout()
         self.login( email='test3@bx.psu.edu', username='regular-user3' )
         global regular_user3
         regular_user3 = sa_session.query( galaxy.model.User ) \
-                                  .filter( galaxy.model.User.table.c.email=='test3@bx.psu.edu' ) \
+                                  .filter( galaxy.model.User.table.c.email == 'test3@bx.psu.edu' ) \
                                   .first()
         assert regular_user3 is not None, 'Problem retrieving user with email "test3@bx.psu.edu" from the database'
         self.logout()
         self.login( email='test@bx.psu.edu', username='admin-user' )
         global admin_user
         admin_user = sa_session.query( galaxy.model.User ) \
-                               .filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ) \
+                               .filter( galaxy.model.User.table.c.email == 'test@bx.psu.edu' ) \
                                .one()
         assert admin_user is not None, 'Problem retrieving user with email "test@bx.psu.edu" from the database'
         # Get the admin_user private role for later use
@@ -66,8 +67,8 @@
         if not admin_user_private_role:
             raise AssertionError( "Private role not found for user '%s'" % admin_user.email )
         historyA = sa_session.query( galaxy.model.History ) \
-                             .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                            galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                             .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                            galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         assert historyA is not None, "Problem retrieving historyA from database"
@@ -76,8 +77,8 @@
         self.logout()
         self.login( email=admin_user.email )
         historyB = sa_session.query( galaxy.model.History ) \
-                             .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                            galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                             .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                            galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         assert historyB is not None, "Problem retrieving historyB from database"
@@ -87,23 +88,23 @@
         """Testing deleting histories"""
         # Logged in as admin_user
         historyB = sa_session.query( galaxy.model.History ) \
-                             .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                            galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                             .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                            galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         assert historyB is not None, "Problem retrieving historyB from database"
         self.delete_history( self.security.encode_id( historyB.id ) )
         sa_session.refresh( historyB )
         if not historyB.deleted:
-            raise AssertionError, "Problem deleting history id %d" % historyB.id
+            raise AssertionError( "Problem deleting history id %d" % historyB.id )
         # Since we deleted the current history, make sure the history frame was refreshed
         self.check_history_for_string( 'Your history is empty.' )
         # We'll now test deleting a list of histories
         # After deleting the current history, a new one should have been created
         global history1
         history1 = sa_session.query( galaxy.model.History ) \
-                             .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                            galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                             .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                            galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         assert history1 is not None, "Problem retrieving history1 from database"
@@ -111,8 +112,8 @@
         self.new_history( name=urllib.quote( 'history2' ) )
         global history2
         history2 = sa_session.query( galaxy.model.History ) \
-                             .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                            galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                             .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                            galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         assert history2 is not None, "Problem retrieving history2 from database"
@@ -123,26 +124,26 @@
         self.check_history_for_string( 'Your history is empty.' )
         try:
             self.view_stored_active_histories( strings_displayed=[ history1.name ] )
-            raise AssertionError, "History %s is displayed in the active history list after it was deleted" % history1.name
+            raise AssertionError( "History %s is displayed in the active history list after it was deleted" % history1.name )
         except:
             pass
         self.view_stored_deleted_histories( strings_displayed=[ history1.name ] )
         try:
             self.view_stored_active_histories( strings_displayed=[ history2.name ] )
-            raise AssertionError, "History %s is displayed in the active history list after it was deleted" % history2.name
+            raise AssertionError( "History %s is displayed in the active history list after it was deleted" % history2.name )
         except:
             pass
         self.view_stored_deleted_histories( strings_displayed=[ history2.name ] )
         sa_session.refresh( history1 )
         if not history1.deleted:
-            raise AssertionError, "Problem deleting history id %d" % history1.id
+            raise AssertionError( "Problem deleting history id %d" % history1.id )
         if not history1.default_permissions:
-            raise AssertionError, "Default permissions were incorrectly deleted from the db for history id %d when it was deleted" % history1.id
+            raise AssertionError( "Default permissions were incorrectly deleted from the db for history id %d when it was deleted" % history1.id )
         sa_session.refresh( history2 )
         if not history2.deleted:
-            raise AssertionError, "Problem deleting history id %d" % history2.id
+            raise AssertionError( "Problem deleting history id %d" % history2.id )
         if not history2.default_permissions:
-            raise AssertionError, "Default permissions were incorrectly deleted from the db for history id %d when it was deleted" % history2.id
+            raise AssertionError( "Default permissions were incorrectly deleted from the db for history id %d when it was deleted" % history2.id )
         # Current history is empty
         self.history_options( user=True )
@@ -151,12 +152,12 @@
         # Logged in as admin_user
         global history3
         history3 = sa_session.query( galaxy.model.History ) \
-                             .filter( galaxy.model.History.table.c.deleted==False ) \
+                             .filter( galaxy.model.History.table.c.deleted == False ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         assert history3 is not None, "Problem retrieving history3 from database"
         if history3.deleted:
-            raise AssertionError, "History id %d deleted when it should not be" % latest_history.id
+            raise AssertionError( "History id %d deleted when it should not be" % latest_history.id )
         self.rename_history( self.security.encode_id( history3.id ), history3.name, new_name=urllib.quote( 'history 3' ) )
         sa_session.refresh( history3 )
@@ -192,7 +193,7 @@
         # Make sure history3 is now accessible.
         sa_session.refresh( history3 )
         if not history3.importable:
-            raise AssertionError, "History 3 is not marked as importable after make_accessible_via_link"
+            raise AssertionError( "History 3 is not marked as importable after make_accessible_via_link" )
         # Try importing history3
         #Importing your own history was enabled in 5248:dc9efb540f61.
         #self.import_history_via_url( self.security.encode_id( history3.id ),
@@ -235,13 +236,13 @@
                                     strings_displayed_after_submit=[ 'has been created.' ] )
         global history3_clone1
         history3_clone1 = sa_session.query( galaxy.model.History ) \
-                                    .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                                   galaxy.model.History.table.c.user_id==regular_user1.id ) ) \
+                                    .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                                   galaxy.model.History.table.c.user_id == regular_user1.id ) ) \
                                     .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                                     .first()
         assert history3_clone1 is not None, "Problem retrieving history3_clone1 from database"
         # Check list of histories to make sure shared history3 was cloned
-        strings_displayed=[ "Copy of '%s' shared by '%s'" % ( history3.name, admin_user.email ) ]
+        strings_displayed = [ "Copy of '%s' shared by '%s'" % ( history3.name, admin_user.email ) ]
         self.view_stored_active_histories( strings_displayed=strings_displayed )
 
     def test_035_clone_current_history( self ):
@@ -255,8 +256,8 @@
         self.upload_file( '2.bed', dbkey='hg18' )
         hda_2_bed = ( sa_session.query( galaxy.model.HistoryDatasetAssociation )
-                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3.id,
-                                               galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) )
+                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id == history3.id,
+                                               galaxy.model.HistoryDatasetAssociation.table.c.name == '2.bed' ) )
                                 .first() )
         assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database"
         self.delete_history_item( str( hda_2_bed.id ) )
@@ -264,8 +265,8 @@
         self.upload_file( '3.bed', dbkey='hg18' )
         hda_3_bed = ( sa_session.query( galaxy.model.HistoryDatasetAssociation )
-                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3.id,
-                                               galaxy.model.HistoryDatasetAssociation.table.c.name=='3.bed' ) )
+                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id == history3.id,
+                                               galaxy.model.HistoryDatasetAssociation.table.c.name == '3.bed' ) )
                                 .first() )
         assert hda_3_bed is not None, "Problem retrieving hda_3_bed from database"
         self.delete_history_item( str( hda_3_bed.id ) )
@@ -276,8 +277,8 @@
                                     strings_displayed_after_submit=['has been created.' ] )
         global history3_clone2
         history3_clone2 = sa_session.query( galaxy.model.History ) \
-                                    .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                                   galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                                    .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                                   galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                                     .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                                     .first()
         assert history3_clone2 is not None, "Problem retrieving history3_clone2 from database"
@@ -288,14 +289,14 @@
         self.switch_history( id=self.security.encode_id( history3_clone2.id ), name=history3_clone2.name )
         hda_2_bed = ( sa_session.query( galaxy.model.HistoryDatasetAssociation )
-                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3_clone2.id,
-                                               galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) )
+                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id == history3_clone2.id,
+                                               galaxy.model.HistoryDatasetAssociation.table.c.name == '2.bed' ) )
                                 .first() )
         assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database"
         hda_3_bed = ( sa_session.query( galaxy.model.HistoryDatasetAssociation )
-                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history3_clone2.id,
-                                               galaxy.model.HistoryDatasetAssociation.table.c.name=='3.bed' ) )
+                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id == history3_clone2.id,
+                                               galaxy.model.HistoryDatasetAssociation.table.c.name == '3.bed' ) )
                                 .first() )
         assert hda_3_bed is not None, "Problem retrieving hda_3_bed from database"
@@ -315,15 +316,15 @@
         global history3_clone3
         history3_clone3 = ( sa_session.query( galaxy.model.History )
-                                      .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                                     galaxy.model.History.table.c.user_id==admin_user.id ) )
+                                      .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                                     galaxy.model.History.table.c.user_id == admin_user.id ) )
                                       .order_by( desc( galaxy.model.History.table.c.create_time ) )
                                       .first() )
         assert history3_clone3 is not None, "Problem retrieving history3_clone3 from database"
         # Check list of histories to make sure shared history3 was cloned
-        self.view_stored_active_histories( strings_displayed = ["Copy of '%s'" % history3.name ] )
+        self.view_stored_active_histories( strings_displayed=[ "Copy of '%s'" % history3.name ] )
         # Switch to the cloned history to make sure ONLY activatable datasets were cloned
         self.switch_history( id=self.security.encode_id( history3_clone3.id ) )
@@ -332,7 +333,7 @@
         try:
             self.check_history_for_exact_string( '"deleted": true', show_deleted=True )
             #self.check_history_for_string( 'This dataset has been deleted.', show_deleted=True )
-            raise AssertionError, "Deleted datasets incorrectly included in cloned history history3_clone3"
+            raise AssertionError( "Deleted datasets incorrectly included in cloned history history3_clone3" )
         except:
             pass
@@ -342,14 +343,14 @@
         self.new_history()
         global history4
         history4 = sa_session.query( galaxy.model.History ) \
-                             .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                            galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                             .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                            galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         assert history4 is not None, "Problem retrieving history4 from database"
         self.rename_history( self.security.encode_id( history4.id ), history4.name, new_name=urllib.quote( 'history 4' ) )
         sa_session.refresh( history4 )
-        # Galaxy's new history sharing code does not yet support sharing multiple histories; when support for sharing multiple histories is added, 
+        # Galaxy's new history sharing code does not yet support sharing multiple histories; when support for sharing multiple histories is added,
         # this test will be uncommented and updated.
         """
         self.upload_file( '2.bed', dbkey='hg18' )
@@ -377,8 +378,8 @@
         self.new_history()
         global history5
         history5 = sa_session.query( galaxy.model.History ) \
-                             .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                            galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                             .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                            galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         assert history5 is not None, "Problem retrieving history5 from database"
@@ -405,14 +406,14 @@
                 history5_dataset1 = hda.dataset
                 break
         assert history5_dataset1 is not None, "Problem retrieving history5_dataset1 from the database"
-        # The permissions on the dataset should be restricted from sharing with anyone due to the 
+        # The permissions on the dataset should be restricted from sharing with anyone due to the
         # inherited history permissions
         dataset_permissions = [ a.action for a in history5_dataset1.actions ]
         dataset_permissions.sort()
         if dataset_permissions != history5_default_permissions:
             err_msg = "Dataset permissions for history5_dataset1 (%s) were not correctly inherited from history permissions (%s)" \
                 % ( str( dataset_permissions ), str( history5_default_permissions ) )
-            raise AssertionError, err_msg
+            raise AssertionError( err_msg )
         # Make sure when we logout and login, the history default permissions are preserved
         self.logout()
         self.login( email=admin_user.email )
@@ -420,7 +421,7 @@
         current_history_permissions = [ dhp.action for dhp in history5.default_permissions ]
         current_history_permissions.sort()
         if current_history_permissions != history5_default_permissions:
-            raise AssertionError, "With logout and login, the history default permissions are not preserved"
+            raise AssertionError( "With logout and login, the history default permissions are not preserved" )
 
     def test_050_sharing_restricted_history_by_making_datasets_public( self ):
         """Testing sharing a restricted history by making the datasets public"""
@@ -440,8 +441,8 @@
                                     strings_displayed_after_submit=[ 'has been created.' ] )
         global history5_clone1
         history5_clone1 = sa_session.query( galaxy.model.History ) \
-                                    .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                                   galaxy.model.History.table.c.user_id==regular_user1.id ) ) \
+                                    .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                                   galaxy.model.History.table.c.user_id == regular_user1.id ) ) \
                                     .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                                     .first()
         assert history5_clone1 is not None, "Problem retrieving history5_clone1 from database"
@@ -462,21 +463,21 @@
                                            'The following datasets can be shared with %s by updating their permissions' % regular_user2.email ]
         self.share_current_history( regular_user2.email,
                                     strings_displayed_after_submit=strings_displayed_after_submit,
-                                    action='private' )
+                                    action='private' )
         # We should now have a new sharing role
         global sharing_role
         role_name = 'Sharing role for: %s, %s' % ( admin_user.email, regular_user2.email )
-        sharing_role = sa_session.query( galaxy.model.Role ).filter( galaxy.model.Role.table.c.name==role_name ).first()
+        sharing_role = sa_session.query( galaxy.model.Role ).filter( galaxy.model.Role.table.c.name == role_name ).first()
         if not sharing_role:
             # May have created a sharing role in a previous functional test suite from the opposite direction.
             role_name = 'Sharing role for: %s, %s' % ( regular_user2.email, admin_user.email )
             sharing_role = sa_session.query( galaxy.model.Role ) \
-                                     .filter( and_( galaxy.model.Role.table.c.type==role_type,
-                                                    galaxy.model.Role.table.c.name==role_name ) ) \
+                                     .filter( and_( galaxy.model.Role.table.c.type == role_type,
+                                                    galaxy.model.Role.table.c.name == role_name ) ) \
                                      .first()
         if not sharing_role:
             raise AssertionError( "Privately sharing a dataset did not properly create a sharing role" )
-        # The DATASET_ACCESS permission on 2.bed was originally associated with admin_user's private role. 
+        # The DATASET_ACCESS permission on 2.bed was originally associated with admin_user's private role.
         # Since we created a new sharing role for 2.bed, the original permission should have been eliminated,
         # replaced with the sharing role.
         history5_dataset2 = None
@@ -498,8 +499,8 @@
                                     strings_displayed_after_submit=[ 'has been created.' ] )
         global history5_clone2
         history5_clone2 = sa_session.query( galaxy.model.History ) \
-                                    .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                                   galaxy.model.History.table.c.user_id==regular_user2.id ) ) \
+                                    .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                                   galaxy.model.History.table.c.user_id == regular_user2.id ) ) \
                                     .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                                     .first()
         assert history5_clone2 is not None, "Problem retrieving history5_clone2 from database"
@@ -512,13 +513,13 @@
         self.check_history_for_string( '2.bed' )
         # Get both new hdas from the db that were created for the shared history
         hda_1_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \
-                              .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone2.id,
-                                             galaxy.model.HistoryDatasetAssociation.table.c.name=='1.bed' ) ) \
+                              .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id == history5_clone2.id,
+                                             galaxy.model.HistoryDatasetAssociation.table.c.name == '1.bed' ) ) \
                              .first()
         assert hda_1_bed is not None, "Problem retrieving hda_1_bed from database"
         hda_2_bed = sa_session.query( galaxy.model.HistoryDatasetAssociation ) \
-                              .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone2.id,
-                                             galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) ) \
+                              .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id == history5_clone2.id,
+                                             galaxy.model.HistoryDatasetAssociation.table.c.name == '2.bed' ) ) \
                              .first()
         assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database"
         # Make sure 1.bed is accessible since it is public
@@ -549,7 +550,7 @@
         self.visit_page( "root/history_options" )
         try:
             self.check_page_for_string( 'List</a> histories shared with you by others' )
-            raise AssertionError, "history5 still shared with regular_user2 after unsharing it with that user."
+            raise AssertionError( "history5 still shared with regular_user2 after unsharing it with that user." )
         except:
             pass
@@ -558,7 +559,8 @@
         email = '%s,%s' % ( regular_user2.email, regular_user3.email )
         strings_displayed_after_submit = [ 'The following datasets can be shared with %s with no changes' % email,
-                                           'The following datasets can be shared with %s by updating their permissions' % email ]
+                                           'The following datasets can be shared with %s by updating their permissions' % email
+                                           ]
         # history5 will be shared with regular_user1, regular_user2 and regular_user3
         self.share_current_history( email,
                                     strings_displayed_after_submit=strings_displayed_after_submit,
@@ -576,8 +578,8 @@
         global history5_clone3
         history5_clone3 = ( sa_session.query( galaxy.model.History )
-                                      .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                                     galaxy.model.History.table.c.user_id==regular_user2.id ) )
+                                      .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                                     galaxy.model.History.table.c.user_id == regular_user2.id ) )
                                       .order_by( desc( galaxy.model.History.table.c.create_time ) )
                                       .first() )
         assert history5_clone3 is not None, "Problem retrieving history5_clone3 from database"
@@ -592,14 +594,14 @@
         # Get both new hdas from the db that were created for the shared history
         hda_1_bed = ( sa_session.query( galaxy.model.HistoryDatasetAssociation )
-                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone3.id,
-                                               galaxy.model.HistoryDatasetAssociation.table.c.name=='1.bed' ) )
+                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id == history5_clone3.id,
+                                               galaxy.model.HistoryDatasetAssociation.table.c.name == '1.bed' ) )
                                 .first() )
         assert hda_1_bed is not None, "Problem retrieving hda_1_bed from database"
         hda_2_bed = ( sa_session.query( galaxy.model.HistoryDatasetAssociation )
-                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone3.id,
-                                               galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) )
+                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id == history5_clone3.id,
+                                               galaxy.model.HistoryDatasetAssociation.table.c.name == '2.bed' ) )
                                 .first() )
         assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database"
@@ -622,8 +624,8 @@
         global history5_clone4
         history5_clone4 = ( sa_session.query( galaxy.model.History )
-                                      .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                                     galaxy.model.History.table.c.user_id==regular_user3.id ) )
+                                      .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                                     galaxy.model.History.table.c.user_id == regular_user3.id ) )
                                       .order_by( desc( galaxy.model.History.table.c.create_time ) )
                                       .first() )
         assert history5_clone4 is not None, "Problem retrieving history5_clone4 from database"
@@ -638,14 +640,14 @@
         # Get both new hdas from the db that were created for the shared history
         hda_1_bed = ( sa_session.query( galaxy.model.HistoryDatasetAssociation )
-                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone4.id,
-                                               galaxy.model.HistoryDatasetAssociation.table.c.name=='1.bed' ) )
+                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id == history5_clone4.id,
+                                               galaxy.model.HistoryDatasetAssociation.table.c.name == '1.bed' ) )
                                 .first() )
         assert hda_1_bed is not None, "Problem retrieving hda_1_bed from database"
         hda_2_bed = ( sa_session.query( galaxy.model.HistoryDatasetAssociation )
-                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id==history5_clone4.id,
-                                               galaxy.model.HistoryDatasetAssociation.table.c.name=='2.bed' ) )
+                                .filter( and_( galaxy.model.HistoryDatasetAssociation.table.c.history_id == history5_clone4.id,
+                                               galaxy.model.HistoryDatasetAssociation.table.c.name == '2.bed' ) )
                                 .first() )
         assert hda_2_bed is not None, "Problem retrieving hda_2_bed from database"
         # Make sure 1.bed is accessible since it is public
@@ -653,7 +655,7 @@
         # Make sure 2.bed is not accessible since it is protected
         try:
             self.display_history_item( str( hda_2_bed.id ), strings_displayed=[ 'chr1' ] )
-            raise AssertionError, "History item 2.bed is accessible by user %s when it should not be" % regular_user3.email
+            raise AssertionError( "History item 2.bed is accessible by user %s when it should not be" % regular_user3.email )
         except:
             pass
@@ -696,7 +698,7 @@
         self.visit_page( "root/history_options" )
         try:
             self.check_page_for_string( 'List</a> histories shared with you by others' )
-            raise AssertionError, "history5 still shared with regular_user2 after unsharing it with that user."
+            raise AssertionError( "history5 still shared with regular_user2 after unsharing it with that user." )
         except:
             pass
@@ -705,7 +707,7 @@
         self.visit_page( "root/history_options" )
         try:
             self.check_page_for_string( 'List</a> histories shared with you by others' )
-            raise AssertionError, "history5 still shared with regular_user3 after unsharing it with that user."
+            raise AssertionError( "history5 still shared with regular_user3 after unsharing it with that user." )
         except:
             pass
         self.logout()
@@ -725,8 +727,8 @@
         self.new_history( name=urllib.quote( 'show hide deleted datasets' ) )
         latest_history = ( sa_session.query( galaxy.model.History )
-                                     .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                                    galaxy.model.History.table.c.user_id==admin_user.id ) )
+                                     .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                                    galaxy.model.History.table.c.user_id == admin_user.id ) )
                                      .order_by( desc( galaxy.model.History.table.c.create_time ) )
                                      .first() )
         assert latest_history is not None, "Problem retrieving latest_history from database"
@@ -769,8 +771,8 @@
         # Deleting the current history in the last method created a new history
         latest_history = ( sa_session.query( galaxy.model.History )
-                                     .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                                    galaxy.model.History.table.c.user_id==admin_user.id ) )
+                                     .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                                    galaxy.model.History.table.c.user_id == admin_user.id ) )
                                      .order_by( desc( galaxy.model.History.table.c.create_time ) )
                                      .first() )
         assert latest_history is not None, "Problem retrieving latest_history from database"
@@ -786,14 +788,14 @@
         self.visit_url( "%s/history/?show_deleted=False" % self.url )
         self.check_page_for_string( '1.bed' )
         self.check_page_for_string( 'hg15' )
-        self.assertEqual ( len( self.get_history_as_data_list() ), 1 )
+        self.assertEqual( len( self.get_history_as_data_list() ), 1 )
         # Delete the history item
         self.delete_history_item( str( latest_hda.id ), strings_displayed=[ "Your history is empty" ] )
-        self.assertEqual ( len( self.get_history_as_data_list() ), 0 )
+        self.assertEqual( len( self.get_history_as_data_list() ), 0 )
         # Try deleting an invalid hid
         try:
             self.delete_history_item( 'XXX' )
-            raise AssertionError, "Improperly able to delete hda_id 'XXX' which is not an integer"
+            raise AssertionError( "Improperly able to delete hda_id 'XXX' which is not an integer" )
         except:
             pass
         # Undelete the history item
@@ -809,8 +811,8 @@
         # logged in as admin_user
         self.new_history( name=urllib.quote( 'copy history items' ) )
         history6 = sa_session.query( galaxy.model.History ) \
-                             .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                            galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                             .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                            galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         assert history6 is not None, "Problem retrieving history6 from database"
@@ -820,7 +822,7 @@
                          .first()
         assert hda1 is not None, "Problem retrieving hda1 from database"
         # We'll just test copying 1 hda
-        source_dataset_ids=self.security.encode_id( hda1.id )
+        source_dataset_ids = self.security.encode_id( hda1.id )
         # The valid list of target histories is only the user's active histories
         all_target_history_ids = [ self.security.encode_id( hda.id ) for hda in admin_user.active_histories ]
         # Since history1 and history2 have been deleted, they should not be displayed in the list of target histories
@@ -834,12 +836,12 @@
                                 deleted_history_ids=deleted_history_ids )
         sa_session.refresh( history6 )
         if len( history6.datasets ) != 2:
-            raise AssertionError, "Copying hda1 to the current history failed, history 6 has %d datasets, but should have 2" % len( history6.datasets )
+            raise AssertionError( "Copying hda1 to the current history failed, history 6 has %d datasets, but should have 2" % len( history6.datasets ) )
         # Test copying 1 hda to another history
         self.new_history( name=urllib.quote( 'copy history items - 2' ) )
         history7 = sa_session.query( galaxy.model.History ) \
-                             .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                            galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                             .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                            galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         assert history7 is not None, "Problem retrieving history7 from database"
@@ -848,7 +850,7 @@
         target_history_id = self.security.encode_id( history7.id )
         all_target_history_ids = [ self.security.encode_id( hda.id ) for hda in admin_user.active_histories ]
         # Test copying to a history that is not the current history
-        target_history_ids=[ self.security.encode_id( history7.id ) ]
+        self.security.encode_id( history7.id )
         self.copy_history_item( source_dataset_id=source_dataset_ids,
                                 target_history_id=target_history_id,
                                 all_target_history_ids=all_target_history_ids,

diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_library_features.py
--- a/test/functional/test_library_features.py
+++ b/test/functional/test_library_features.py
@@ -1,6 +1,8 @@
 from base.twilltestcase import *
 from base.test_db_util import *
 
+
+# TODO: Functional tests start failing at 070, fix or eliminate rest of tests.
 class TestLibraryFeatures( TwillTestCase ):
     def test_000_initiate_users( self ):
         """Ensuring all required user accounts exist"""
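Two mechanical patterns account for most of the test_history_functions.py hunks above. First, the exception style: `raise AssertionError, msg` is the Python 2-only statement form of raise and a SyntaxError under Python 3, while the call form parses under both. A minimal sketch of the two forms (the id value and message are illustrative, not taken from the suite):

    # Old statement form -- valid only in Python 2:
    #     raise AssertionError, "Problem deleting history id %d" % history_id
    # Portable call form, as used throughout this commit:
    history_id = 42  # hypothetical id for illustration
    try:
        raise AssertionError( "Problem deleting history id %d" % history_id )
    except AssertionError as err:
        print( err )

Second, the comparison spacing: the filters keep `== False` with added whitespace rather than switching to the more Pythonic `is False` or `not ...`, because SQLAlchemy overloads `==` on column objects to build SQL; `is` cannot be overloaded, so only the `==` spelling produces the intended WHERE clause. A self-contained sketch under that assumption (the model and column are illustrative stand-ins, not Galaxy's mapping; the period-appropriate declarative import is used):

    from sqlalchemy import Boolean, Column, Integer
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class History( Base ):  # stand-in for galaxy.model.History
        __tablename__ = 'history'
        id = Column( Integer, primary_key=True )
        deleted = Column( Boolean )

    # The overloaded '==' yields a SQL expression rather than a bool,
    # rendering roughly as: history.deleted = false
    # (exact rendering varies by SQLAlchemy version and dialect).
    print( History.deleted == False )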
diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_library_security.py
--- a/test/functional/test_library_security.py
+++ b/test/functional/test_library_security.py
@@ -1,6 +1,8 @@
 from base.twilltestcase import *
 from base.test_db_util import *
 
+
+# TODO: Functional tests start failing at 050, fix or eliminate rest of tests.
 class TestLibrarySecurity( TwillTestCase ):
     def test_000_initiate_users( self ):
         """Ensuring all required user accounts exist"""

diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_library_templates.py
--- a/test/functional/test_library_templates.py
+++ b/test/functional/test_library_templates.py
@@ -1,7 +1,35 @@
-from base.twilltestcase import *
-from base.test_db_util import *
+from base.twilltestcase import TwillTestCase
+from base.test_db_util import (
+    get_user,
+    get_private_role,
+    get_form,
+    get_library,
+    get_folder,
+    get_user_address,
+    get_latest_ldda_by_name,
+    get_latest_hda,
+    mark_obj_deleted,
+    refresh
+)
+import galaxy.model
+
+AddressField_form = None
+CheckboxField_form = None
+SelectField_form = None
+TextArea_form = None
+TextField_form = None
+WorkflowField_form = None
+address_field_name = checkbox_field_name = select_field_name = None
+workflow_field_name = textfield_name = textarea_name = None
+user_address1 = user_address2 = None
+ldda1 = library1 = library2 = library3 = library4 = library5 = library6 = None
+folder1 = folder2 = folder3 = folder4 = folder5 = folder6 = None
+admin_user = None
+regular_user1 = regular_user2 = regular_user3 = None
+
 class TestLibraryFeatures( TwillTestCase ):
+
     def test_000_initiate_users( self ):
         """Ensuring all required user accounts exist"""
         self.logout()
@@ -32,6 +60,7 @@
         assert admin_user is not None, 'Problem retrieving user with email "test@bx.psu.edu" from the database'
         global admin_user_private_role
         admin_user_private_role = get_private_role( admin_user )
+
     def test_005_create_library_templates( self ):
         """Testing creating several LibraryInformationTemplate form definitions"""
         # Logged in as admin_user
@@ -109,6 +138,7 @@
         library5 = get_library( 'library5', 'library5 description', 'library5 synopsis' )
         global library6
         library6 = get_library( 'library6', 'library6 description', 'library6 synopsis' )
+
     def test_015_add_template_to_library1( self ):
         """Testing add an inheritable template containing an AddressField to library1"""
         # Logged in as admin_user
@@ -119,6 +149,7 @@
                                            form_id=self.security.encode_id( AddressField_form.id ),
                                            form_name=AddressField_form.name,
                                            library_id=self.security.encode_id( library1.id ) )
+
     def test_020_add_folder_to_library1( self ):
         """Testing adding a folder to library1"""
         # Logged in as admin_user
@@ -133,6 +164,7 @@
                          description=description )
         global folder1
         folder1 = get_folder( folder.id, name, description )
+
     def test_025_check_library1( self ):
         """Checking library1 and its root folder"""
         # Logged in as admin_user
@@ -146,6 +178,7 @@
                                template_refresh_field_name=address_field_name,
                                strings_displayed=[ AddressField_form.name,
                                                    'This is an inherited template and is not required to be used with this folder' ] )
+
     def test_030_add_dataset_to_folder1( self ):
         """Testing adding a ldda1 to folder1, and adding a new UserAddress on the upload form."""
         # Logged in as admin_user
@@ -189,6 +222,7 @@
                                          self.security.encode_id( ldda1.id ),
                                          ldda1.name,
                                          strings_displayed=[ 'Dick' ] )
+
     def test_035_edit_contents_of_ldda1_tempplate( self ):
         """Testing editing the contents of ldda1 AddressField template by adding a new user_address"""
         short_desc = 'Home'
@@ -212,6 +246,7 @@
         global user_address2
         user_address2 = get_user_address( admin_user, short_desc )
         assert user_address2 is not None, 'Problem retrieving user_address2 from the database'
+
     def test_040_edit_contents_of_folder1_template( self ):
         """Testing editing the contents of folder1 AddressField template"""
         # Make sure the template and contents were inherited to folder1
@@ -223,6 +258,7 @@
                                strings_displayed=[ AddressField_form.name,
                                                    'This is an inherited template and is not required to be used with this folder' ],
                                strings_displayed_after_submit=[ 'Richard' ] )
+
     def test_045_add_dataset_to_folder1( self ):
         """Testing adding another ldda to folder1"""
         # The upload form should now inherit user_address2 on the upload form
@@ -245,6 +281,7 @@
                                          ldda_name=ldda1.name,
                                          template_refresh_field_name=address_field_name,
                                          strings_displayed=[ user_address2.desc ] )
+
     def test_050_add_template_to_library2( self ):
         """ Testing add an inheritable template containing a CheckboxField to library2"""
         # Add a template containing a CheckboxField to library1
@@ -257,7 +294,8 @@
         # Check the CheckboxField to make sure the template contents are inherited
         self.library_info( 'library_admin',
                            self.security.encode_id( library2.id ),
-                           template_fields = [ ( checkbox_field_name, '1' ) ] )
+                           template_fields=[ ( checkbox_field_name, '1' ) ] )
+
     def test_055_add_folder2_to_library2( self ):
         """Testing adding a folder to library2"""
         # Logged in as admin_user
@@ -272,12 +310,14 @@
                          description=description )
         global folder2
         folder2 = get_folder( folder.id, name, description )
+
     def test_060_check_library2( self ):
         """Checking library2 and its root folder"""
         # Logged in as admin_user
         self.browse_library( cntrller='library_admin',
                              library_id=self.security.encode_id( library2.id ),
                              strings_displayed=[ folder2.name, folder2.description ] )
+
     def test_065_save_folder2_inherited_template( self ):
         """Saving the inherited template for folder2"""
         # Logged in as admin_user
@@ -288,6 +328,7 @@
                                template_fields=[ ( checkbox_field_name, '1' ) ],
                                strings_displayed=[ CheckboxField_form.name,
                                                    'This is an inherited template and is not required to be used with this folder' ] )
+
     def test_070_add_ldda_to_folder2( self ):
         """
         Testing adding a new library dataset to library2's folder, making sure the CheckboxField is
@@ -316,6 +357,7 @@
                                          self.security.encode_id( ldda.id ),
                                          ldda.name,
                                          strings_displayed=[ 'CheckboxField', 'checked' ] )
+
     def test_080_add_template_to_library3( self ):
         """ Testing add an inheritable template containing a SelectField to library3"""
         # Logged in as admin_user
@@ -330,6 +372,7 @@
         self.library_info( 'library_admin',
                            self.security.encode_id( library3.id ),
                            template_fields=[ ( select_field_name, 'Option1' ) ] )
+
     def test_085_add_folder3_to_library3( self ):
         """Testing adding a folder to library3"""
         # Logged in as admin_user
@@ -344,12 +387,14 @@
                          description=description )
         global folder3
         folder3 = get_folder( folder.id, name, description )
+
     def test_090_check_library3( self ):
         """Checking library3 and its root folder"""
         # Logged in as admin_user
         self.browse_library( cntrller='library_admin',
                              library_id=self.security.encode_id( library3.id ),
                              strings_displayed=[ folder3.name, folder3.description ] )
+
     def test_095_save_folder3_inherited_template( self ):
         """Saving the inherited template for folder3"""
         # Logged in as admin_user
@@ -361,6 +406,7 @@
                                strings_displayed=[ SelectField_form.name,
                                                    'This is an inherited template and is not required to be used with this folder',
                                                    'Option1' ] )
+
     def test_100_add_ldda_to_folder3( self ):
         """
         Testing adding a new library dataset to library3's folder, making sure the SelectField setting is correct on the upload form.
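The import rewrite at the top of the test_library_templates.py diff above trades wildcard imports for named ones and predeclares every module-level name that later tests assign through `global`, so each name has a visible origin before the first test runs. A self-contained sketch of the pattern (the helper, class, and variable names here are illustrative stand-ins, not the real base.test_db_util API):

    def get_user( email ):
        # stand-in for a database lookup helper such as base.test_db_util.get_user
        return { 'email': email }

    # Predeclare cross-test state instead of letting 'global' create it
    # implicitly on first assignment.
    admin_user = None

    class TestLibraryFeatures( object ):  # TwillTestCase in the real suite

        def test_000_initiate_users( self ):
            global admin_user
            admin_user = get_user( 'test@bx.psu.edu' )

        def test_005_use_admin_user( self ):
            # Later tests can rely on the name existing, even if still unset.
            assert admin_user is not None

    # Sequential run, mimicking nose's in-order execution of numbered tests:
    suite = TestLibraryFeatures()
    suite.test_000_initiate_users()
    suite.test_005_use_admin_user()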
@@ -411,6 +457,7 @@
                                          self.security.encode_id( ldda.id ),
                                          ldda.name,
                                          strings_displayed=[ 'SelectField', 'Option1' ] )
+
     def test_105_add_template_to_library4( self ):
         """ Testing add an inheritable template containing a TextArea to library4"""
         # Logged in as admin_user
@@ -425,6 +472,7 @@
         self.library_info( 'library_admin',
                            self.security.encode_id( library4.id ),
                            template_fields=[ ( textarea_name, 'This text should be inherited' ) ] )
+
     def test_110_add_folder4_to_library4( self ):
         """Testing adding a folder to library4"""
         # Logged in as admin_user
@@ -439,6 +487,7 @@
                          description=description )
         global folder4
         folder4 = get_folder( folder.id, name, description )
+
     def test_115_save_folder4_inherited_template( self ):
         """Saving the inherited template for folder4"""
         # Logged in as admin_user
@@ -450,6 +499,7 @@
                                strings_displayed=[ TextArea_form.name,
                                                    'This is an inherited template and is not required to be used with this folder',
                                                    'This text should be inherited' ] )
+
     def test_120_add_ldda_to_folder4( self ):
         """
         Testing adding a new library dataset to library4's folder, making sure the TextArea setting is correct on the upload form.
@@ -477,6 +527,7 @@
                                          self.security.encode_id( ldda.id ),
                                          ldda.name,
                                          strings_displayed=[ 'TextArea', 'This text should be inherited' ] )
+
     def test_125_add_template_to_library5( self ):
         """ Testing add an inheritable template containing a TextField to library5"""
         # Add an inheritable template to library5
@@ -490,6 +541,7 @@
         self.library_info( 'library_admin',
                            self.security.encode_id( library5.id ),
                            template_fields=[ ( textfield_name, 'This text should be inherited' ) ] )
+
     def test_130_add_folder5_to_library5( self ):
         """Testing adding a folder to library5"""
         # Logged in as admin_user
@@ -504,6 +556,7 @@
                          description=description )
         global folder5
         folder5 = get_folder( folder.id, name, description )
+
     def test_135_save_folder5_inherited_template( self ):
         """Saving the inherited template for folder5"""
         # Logged in as admin_user
@@ -515,6 +568,7 @@
                                strings_displayed=[ TextField_form.name,
                                                    'This is an inherited template and is not required to be used with this folder',
                                                    'This text should be inherited' ] )
+
     def test_140_add_ldda_to_folder5( self ):
         """
         Testing adding a new library dataset to library5's folder, making sure the TextField setting is correct on the upload form.
@@ -542,6 +596,7 @@
                                          self.security.encode_id( ldda.id ),
                                          ldda.name,
                                          strings_displayed=[ 'TextField', 'This text should be inherited' ] )
+
     def test_145_edit_library5_template_layout( self ):
         """Test editing the layout of library5's template"""
         # Currently there is only a TextField, and we'll add a TextArea.
@@ -553,6 +608,7 @@
                             field_label_1=TextArea_form.name,
                             field_helptext_1='%s help' % TextArea_form.name,
                             field_default_1='%s default' % TextArea_form.name )
+
     def test_150_add_ldda_to_library5( self ):
         """
         Testing adding a new library dataset to library5's folder, making sure the TextField and new TextArea settings are correct on the upload form.
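The whitespace churn in these hunks follows two opposite PEP 8 rules: keyword arguments lose the spaces around `=` (`template_fields = [ ... ]` becomes `template_fields=[ ... ]`, flagged by pep8/pycodestyle as E251), while binary operators gain them (`deleted==False` becomes `deleted == False`, E225). A minimal sketch of both, using an illustrative function rather than the real library_info signature:

    def library_info( cntrller, library_id, template_fields=None ):
        # illustrative stand-in; simply echoes its inputs
        return cntrller, library_id, template_fields

    # E251: no spaces around '=' when passing a keyword argument...
    library_info( 'library_admin', 'abc123', template_fields=[ ( 'field_0', '1' ) ] )

    # ...but E225: spaces are required around comparison operators.
    count = 2
    assert count == 2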
@@ -584,6 +640,7 @@
                                          strings_displayed=[ 'TextField',
                                                              'This text should be inherited',
                                                              'TextArea' ] )
+
     def test_155_add_template_to_library6( self ):
         """ Testing add an inheritable template containing a WorkflowField to library6"""
         # Add an inheritable template to library6
@@ -594,6 +651,7 @@
                                            form_id=self.security.encode_id( WorkflowField_form.id ),
                                            form_name=WorkflowField_form.name,
                                            library_id=self.security.encode_id( library6.id ) )
+
     def test_160_add_folder6_to_library6( self ):
         """Testing adding a folder to library6"""
         # Logged in as admin_user
@@ -608,6 +666,7 @@
                          description=description )
         global folder6
         folder6 = get_folder( folder.id, name, description )
+
     def test_165_save_folder6_inherited_template( self ):
         """Saving the inherited template for folder6"""
         # Logged in as admin_user
@@ -619,6 +678,7 @@
                                strings_displayed=[ WorkflowField_form.name,
                                                    'This is an inherited template and is not required to be used with this folder',
                                                    'none' ] )
+
     def test_170_add_ldda_to_folder6( self ):
         """
         Testing adding a new library dataset to library6's folder, making sure the WorkflowField setting is correct on the upload form.
@@ -646,6 +706,7 @@
                                          self.security.encode_id( ldda.id ),
                                          ldda.name,
                                          strings_displayed=[ 'WorkflowField', 'none' ] )
+
     def test_999_reset_data_for_later_test_runs( self ):
         """Resetting data to enable later test runs to pass"""
         # Logged in as admin_user

diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_metadata_editing.py
--- a/test/functional/test_metadata_editing.py
+++ b/test/functional/test_metadata_editing.py
@@ -1,21 +1,26 @@
+from base.twilltestcase import TwillTestCase
+from functional import database_contexts
 import galaxy.model
-from galaxy.model.orm import *
-from base.test_db_util import sa_session
-from base.twilltestcase import TwillTestCase
+from galaxy.model.orm import (
+    and_,
+    desc,
+)
+
 
 class TestMetadataEdit( TwillTestCase ):
 
     def test_00_metadata_edit( self ):
         """test_metadata_edit: Testing metadata editing"""
+        sa_session = database_contexts.galaxy_context
         self.logout()
         self.login( email='test@bx.psu.edu', username='admin-user' )
         admin_user = sa_session.query( galaxy.model.User ) \
-                               .filter( galaxy.model.User.table.c.email=='test@bx.psu.edu' ) \
+                               .filter( galaxy.model.User.table.c.email == 'test@bx.psu.edu' ) \
                                .one()
         self.new_history( name='Test Metadata Edit' )
         history1 = sa_session.query( galaxy.model.History ) \
-                             .filter( and_( galaxy.model.History.table.c.deleted==False,
-                                            galaxy.model.History.table.c.user_id==admin_user.id ) ) \
+                             .filter( and_( galaxy.model.History.table.c.deleted == False,
+                                            galaxy.model.History.table.c.user_id == admin_user.id ) ) \
                              .order_by( desc( galaxy.model.History.table.c.create_time ) ) \
                              .first()
         self.upload_file( '1.bed' )
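In the test_metadata_editing.py diff above, the database session is no longer bound at import time (the old `from base.test_db_util import sa_session`); the test instead reads `database_contexts.galaxy_context` inside its body, presumably so the session is resolved only after the test framework has assigned it. A self-contained sketch of that deferral (the startup function and session object are hypothetical stand-ins, not Galaxy's API):

    class database_contexts( object ):
        # stand-in for functional.database_contexts; the real test
        # framework assigns galaxy_context during startup, after imports.
        galaxy_context = None

    def framework_startup():
        database_contexts.galaxy_context = object()  # hypothetical session

    def test_00_metadata_edit():
        # Looked up at call time, so module import order no longer matters.
        sa_session = database_contexts.galaxy_context
        assert sa_session is not None, "galaxy_context must be configured first"

    framework_startup()
    test_00_metadata_edit()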
diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_sample_tracking.py
--- a/test/functional/test_sample_tracking.py
+++ b/test/functional/test_sample_tracking.py
@@ -3,6 +3,8 @@
 from base.twilltestcase import *
 from base.test_db_util import *
 
+
+# TODO: Functional tests start failing at 025, fix or eliminate rest of tests.
 class TestFormsAndSampleTracking( TwillTestCase ):
     # ====== Setup Users, Groups & Roles required for this test suite =========
     def test_000_initiate_users( self ):

diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_tags.py
--- a/test/functional/test_tags.py
+++ b/test/functional/test_tags.py
@@ -1,7 +1,17 @@
-from base.twilltestcase import *
-from base.test_db_util import *
+from base.twilltestcase import TwillTestCase
+from base.test_db_util import (
+    get_user,
+    get_private_role,
+    get_latest_history_for_user,
+    get_latest_hda,
+)
+
+history1 = None
+admin_user = None
+
 class TestTags( TwillTestCase ):
+    # TODO: Add more functional test coverage for tags
     def test_000_initiate_users( self ):
         """Ensuring all required user accounts exist"""

diff -r 494b51bda9c1a319672e92cce4ec17386cd65390 -r c7986c31fd7447f7d1e2a85bdfa43be6bc4b3b8a test/functional/test_user_info.py
--- a/test/functional/test_user_info.py
+++ b/test/functional/test_user_info.py
@@ -1,6 +1,8 @@
 from base.twilltestcase import *
 from base.test_db_util import *
 
+
+# TODO: Functional tests start failing at 020, fix or eliminate rest of tests.
 class TestUserInfo( TwillTestCase ):
     def test_000_initiate_users( self ):
         """Ensuring all required user accounts exist"""

Repository URL: https://bitbucket.org/galaxy/galaxy-central/

--

This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.