1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/55e0fe991c42/ Changeset: 55e0fe991c42 User: jmchilton Date: 2013-11-15 00:49:58 Summary: PEP-8 fixes for test/base/twilltestcase.py. Still hundreds of lines to fix. Affected #: 1 file diff -r a231103b3d6e14578a226f9f6d5a3b68c0896854 -r 55e0fe991c42a3d1551ae9965d2186b6870aaba9 test/base/twilltestcase.py --- a/test/base/twilltestcase.py +++ b/test/base/twilltestcase.py @@ -1,7 +1,20 @@ import pkg_resources pkg_resources.require( "twill==0.9" ) -import StringIO, os, filecmp, time, unittest, urllib, logging, difflib, tarfile, zipfile, tempfile, re, shutil, subprocess +import StringIO +import os +import filecmp +import time +import unittest +import urllib +import logging +import difflib +import tarfile +import zipfile +import tempfile +import re +import shutil +import subprocess import pprint import twill @@ -26,6 +39,7 @@ logging.getLogger( "ClientCookie.cookies" ).setLevel( logging.WARNING ) log = logging.getLogger( __name__ ) + class TwillTestCase( unittest.TestCase ): def setUp( self ): @@ -46,10 +60,10 @@ self.shed_tools_dict = {} self.keepOutdir = os.environ.get( 'GALAXY_TEST_SAVE', '' ) if self.keepOutdir > '': - try: - os.makedirs(self.keepOutdir) - except: - pass + try: + os.makedirs(self.keepOutdir) + except: + pass self.home() # Functions associated with files @@ -83,7 +97,7 @@ diff_lines = get_lines_diff( diff ) if diff_lines > allowed_diff_count: diff_slice = diff[0:40] - #FIXME: This pdf stuff is rather special cased and has not been updated to consider lines_diff + #FIXME: This pdf stuff is rather special cased and has not been updated to consider lines_diff #due to unknown desired behavior when used in conjunction with a non-zero lines_diff #PDF forgiveness can probably be handled better by not special casing by __extension__ here #and instead using lines_diff or a regular expression matching @@ -109,13 +123,13 @@ break if not valid_diff: invalid_diff_lines += 1 - log.info('## files diff on %s and %s lines_diff=%d, found diff = %d, found pdf invalid diff = %d' % (file1,file2,allowed_diff_count,diff_lines,invalid_diff_lines)) + log.info('## files diff on %s and %s lines_diff=%d, found diff = %d, found pdf invalid diff = %d' % (file1, file2, allowed_diff_count, diff_lines, invalid_diff_lines)) if invalid_diff_lines > allowed_diff_count: # Print out diff_slice so we can see what failed print "###### diff_slice ######" raise AssertionError( "".join( diff_slice ) ) else: - log.info('## files diff on %s and %s lines_diff=%d, found diff = %d' % (file1,file2,allowed_diff_count,diff_lines)) + log.info('## files diff on %s and %s lines_diff=%d, found diff = %d' % (file1, file2, allowed_diff_count, diff_lines)) for line in diff_slice: for char in line: if ord( char ) > 128: @@ -124,7 +138,7 @@ def files_re_match( self, file1, file2, attributes=None ): """Checks the contents of 2 files for differences using re.match""" - local_file = open( file1, 'U' ).readlines() #regex file + local_file = open( file1, 'U' ).readlines() # regex file history_data = open( file2, 'U' ).readlines() assert len( local_file ) == len( history_data ), 'Data File and Regular Expression File contain a different number of lines (%s != %s)\nHistory Data (first 40 lines):\n%s' % ( len( local_file ), len( history_data ), ''.join( history_data[:40] ) ) if attributes is None: @@ -139,11 +153,11 @@ line_diff_count += 1 diffs.append( 'Regular Expression: %s\nData file : %s' % ( local_file[i].rstrip( '\r\n' ), history_data[i].rstrip( '\r\n' ) ) ) if line_diff_count > lines_diff: - raise AssertionError, "Regular expression did not match data file (allowed variants=%i):\n%s" % ( lines_diff, "".join( diffs ) ) + raise AssertionError( "Regular expression did not match data file (allowed variants=%i):\n%s" % ( lines_diff, "".join( diffs ) ) ) def files_re_match_multiline( self, file1, file2, attributes=None ): """Checks the contents of 2 files for differences using re.match in multiline mode""" - local_file = open( file1, 'U' ).read() #regex file + local_file = open( file1, 'U' ).read() # regex file if attributes is None: attributes = {} if attributes.get( 'sort', False ): @@ -157,7 +171,7 @@ def files_contains( self, file1, file2, attributes=None ): """Checks the contents of file2 for substrings found in file1, on a per-line basis""" - local_file = open( file1, 'U' ).readlines() #regex file + local_file = open( file1, 'U' ).readlines() # regex file #TODO: allow forcing ordering of contains history_data = open( file2, 'U' ).read() lines_diff = int( attributes.get( 'lines_diff', 0 ) ) @@ -167,7 +181,7 @@ if contains not in history_data: line_diff_count += 1 if line_diff_count > lines_diff: - raise AssertionError, "Failed to find '%s' in history data. (lines_diff=%i):\n" % ( contains, lines_diff ) + raise AssertionError( "Failed to find '%s' in history data. (lines_diff=%i):\n" % ( contains, lines_diff ) ) def get_filename( self, filename, shed_tool_id=None ): if shed_tool_id and self.shed_tools_dict: @@ -189,8 +203,8 @@ so the tool-data directory of test data files is contained in the installed tool shed repository. """ self.visit_url( "%s/tool_runner?tool_id=upload1" % self.url ) - try: - self.refresh_form( "file_type", ftype ) #Refresh, to support composite files + try: + self.refresh_form( "file_type", ftype ) # Refresh, to support composite files tc.fv( "tool_form", "dbkey", dbkey ) if metadata: for elem in metadata: @@ -214,9 +228,9 @@ hids = self.get_hids_in_history() for hid in hids: try: - valid_hid = int( hid ) + int( hid ) except: - raise AssertionError, "Invalid hid (%s) created when uploading file %s" % ( hid, filename ) + raise AssertionError( "Invalid hid (%s) created when uploading file %s" % ( hid, filename ) ) # Wait for upload processing to finish (TODO: this should be done in each test case instead) self.wait() @@ -231,21 +245,21 @@ self.home() except Exception, e: errmsg = "Problem executing upload utility using url_paste: %s" % str( e ) - raise AssertionError( e ) + raise AssertionError( errmsg ) # Make sure every history item has a valid hid hids = self.get_hids_in_history() for hid in hids: try: - valid_hid = int( hid ) + int( hid ) except: - raise AssertionError, "Invalid hid (%s) created when pasting %s" % ( hid, url_paste ) + raise AssertionError( "Invalid hid (%s) created when pasting %s" % ( hid, url_paste ) ) # Wait for upload processing to finish (TODO: this should be done in each test case instead) self.wait() def json_from_url( self, url ): self.visit_url( url ) return from_json_string( self.last_page() ) - + # Functions associated with histories def get_history_from_api( self, encoded_history_id=None ): if encoded_history_id is None: @@ -255,7 +269,7 @@ def get_latest_history( self ): return self.json_from_url( '/api/histories' )[ 0 ] - + def find_hda_by_dataset_name( self, name, history=None ): if history is None: history = self.get_history_from_api() @@ -269,7 +283,7 @@ self.visit_page( "history" ) page = self.last_page() if page.find( 'error' ) > -1: - raise AssertionError('Errors in the history for user %s' % self.user ) + raise AssertionError( 'Errors in the history for user %s' % self.user ) def check_history_for_string( self, patt, show_deleted=False ): """Breaks patt on whitespace and searches for each element seperately in the history""" @@ -320,7 +334,7 @@ # twill stores the regex match in a special stack variable match = twill.namespaces.get_twill_glocals()[1][ '__match__' ] json_data = from_json_string( match ) - assert check_fn( json_data ), 'failed check_fn: %s' %( check_fn.func_name ) + assert check_fn( json_data ), 'failed check_fn: %s' % ( check_fn.func_name ) except Exception, exc: log.error( exc, exc_info=True ) @@ -374,6 +388,7 @@ num_deleted = len( id.split( ',' ) ) self.home() self.visit_page( "history/list?operation=delete&id=%s" % ( id ) ) + check_str = 'Deleted %d %s' % ( num_deleted, iff( num_deleted != 1, "histories", "history" ) ) self.check_page_for_string( check_str ) self.home() @@ -423,7 +438,7 @@ if active_datasets: self.check_page_for_string( 'Create</a> a new empty history' ) self.check_page_for_string( 'Construct workflow</a> from current history' ) - self.check_page_for_string( 'Copy</a> current history' ) + self.check_page_for_string( 'Copy</a> current history' ) self.check_page_for_string( 'Share</a> current history' ) self.check_page_for_string( 'Change default permissions</a> for current history' ) if histories_shared_by_others: @@ -447,7 +462,7 @@ def rename_history( self, id, old_name, new_name ): """Rename an existing history""" self.home() - self.visit_page( "history/rename?id=%s&name=%s" %( id, new_name ) ) + self.visit_page( "history/rename?id=%s&name=%s" % ( id, new_name ) ) check_str = 'History: %s renamed to: %s' % ( old_name, urllib.unquote( new_name ) ) self.check_page_for_string( check_str ) self.home() @@ -476,6 +491,7 @@ for check_str in action_strings_displayed: self.check_page_for_string( check_str ) tc.fv( 'share_restricted', 'action', action ) + tc.submit( "share_restricted_button" ) for check_str in action_strings_displayed_after_submit: self.check_page_for_string( check_str ) @@ -495,6 +511,7 @@ # If we have an action, then we are sharing datasets with users that do not have access permissions on them tc.fv( 'share_restricted', 'action', action ) tc.submit( "share_restricted_button" ) + for check_str in action_strings_displayed: self.check_page_for_string( check_str ) self.home() @@ -525,6 +542,7 @@ for check_str in strings_displayed: self.check_page_for_string( check_str ) self.home() + def view_stored_deleted_histories( self, strings_displayed=[] ): self.home() self.visit_page( "history/list?f-deleted=True" ) @@ -534,12 +552,14 @@ for check_str in strings_displayed: self.check_page_for_string( check_str ) self.home() + def view_shared_histories( self, strings_displayed=[] ): self.home() self.visit_page( "history/list_shared" ) for check_str in strings_displayed: self.check_page_for_string( check_str ) self.home() + def copy_history( self, history_id, copy_choice, strings_displayed=[], strings_displayed_after_submit=[] ): self.home() self.visit_page( "history/copy?id=%s" % history_id ) @@ -550,6 +570,7 @@ for check_str in strings_displayed_after_submit: self.check_page_for_string( check_str ) self.home() + def make_accessible_via_link( self, history_id, strings_displayed=[], strings_displayed_after_submit=[] ): self.home() self.visit_page( "history/list?operation=share+or+publish&id=%s" % history_id ) @@ -562,6 +583,7 @@ for check_str in strings_displayed_after_submit: self.check_page_for_string( check_str ) self.home() + def disable_access_via_link( self, history_id, strings_displayed=[], strings_displayed_after_submit=[] ): self.home() self.visit_page( "history/list?operation=share+or+publish&id=%s" % history_id ) @@ -574,6 +596,7 @@ for check_str in strings_displayed_after_submit: self.check_page_for_string( check_str ) self.home() + def import_history_via_url( self, history_id, email, strings_displayed_after_submit=[] ): self.home() self.visit_page( "history/imp?&id=%s" % history_id ) @@ -584,9 +607,10 @@ # Functions associated with datasets (history items) and meta data def _get_job_stream_output( self, hda_id, stream, format ): self.visit_page( "datasets/%s/%s" % ( self.security.encode_id( hda_id ), stream ) ) + output = self.last_page() if format: - msg = "---------------------- >> begin tool %s << -----------------------\n" % stream + msg = "---------------------- >> begin tool %s << -----------------------\n" % stream msg += output + "\n" msg += "----------------------- >> end tool %s << ------------------------\n" % stream else: @@ -609,43 +633,48 @@ """Looks for 'patt' in the edit page when editing a dataset""" data_list = self.get_history_as_data_list() self.assertTrue( data_list ) - if hid is None: # take last hid + if hid is None: # take last hid elem = data_list[-1] hid = int( elem.get('hid') ) self.assertTrue( hid ) self.visit_page( "dataset/edit?hid=%s" % hid ) for subpatt in patt.split(): tc.find(subpatt) + def delete_history_item( self, hda_id, strings_displayed=[] ): """Deletes an item from a history""" try: hda_id = int( hda_id ) except: - raise AssertionError, "Invalid hda_id '%s' - must be int" % hda_id + raise AssertionError( "Invalid hda_id '%s' - must be int" % hda_id ) self.visit_url( "%s/datasets/%s/delete?show_deleted_on_refresh=False" % ( self.url, self.security.encode_id( hda_id ) ) ) for check_str in strings_displayed: self.check_page_for_string( check_str ) + def undelete_history_item( self, hda_id, strings_displayed=[] ): """Un-deletes a deleted item in a history""" try: hda_id = int( hda_id ) except: - raise AssertionError, "Invalid hda_id '%s' - must be int" % hda_id + raise AssertionError( "Invalid hda_id '%s' - must be int" % hda_id ) self.visit_url( "%s/datasets/%s/undelete" % ( self.url, self.security.encode_id( hda_id ) ) ) for check_str in strings_displayed: self.check_page_for_string( check_str ) + def display_history_item( self, hda_id, strings_displayed=[] ): """Displays a history item - simulates eye icon click""" self.visit_url( '%s/datasets/%s/display/' % ( self.url, self.security.encode_id( hda_id ) ) ) for check_str in strings_displayed: self.check_page_for_string( check_str ) self.home() + def view_history( self, history_id, strings_displayed=[] ): """Displays a history for viewing""" self.visit_url( '%s/history/view?id=%s' % ( self.url, self.security.encode_id( history_id ) ) ) for check_str in strings_displayed: self.check_page_for_string( check_str ) self.home() + def edit_hda_attribute_info( self, hda_id, new_name='', new_info='', new_dbkey='', new_startcol='', strings_displayed=[], strings_not_displayed=[] ): """Edit history_dataset_association attribute information""" @@ -673,14 +702,16 @@ for check_str in strings_not_displayed: try: self.check_page_for_string( check_str ) - raise AssertionError, "String (%s) incorrectly displayed on Edit Attributes page." % check_str + raise AssertionError( "String (%s) incorrectly displayed on Edit Attributes page." % check_str ) except: pass self.home() + def check_hda_attribute_info( self, hda_id, strings_displayed=[] ): """Edit history_dataset_association attribute information""" for check_str in strings_displayed: self.check_page_for_string( check_str ) + def auto_detect_metadata( self, hda_id ): """Auto-detect history_dataset_association metadata""" self.home() @@ -695,6 +726,7 @@ self.check_page_for_string( 'Attributes updated' ) #self.check_page_for_string( 'Attributes updated' ) self.home() + def convert_format( self, hda_id, target_type ): """Convert format of history_dataset_association""" self.home() @@ -703,8 +735,9 @@ tc.fv( 'convert_data', 'target_type', target_type ) tc.submit( 'convert_data' ) self.check_page_for_string( 'The file conversion of Convert BED to GFF on data' ) - self.wait() #wait for the format convert tool to finish before returning + self.wait() # wait for the format convert tool to finish before returning self.home() + def change_datatype( self, hda_id, datatype ): """Change format of history_dataset_association""" self.home() @@ -714,6 +747,7 @@ tc.submit( 'change' ) self.check_page_for_string( 'Changed the type of dataset' ) self.home() + def copy_history_item( self, source_dataset_id=None, target_history_id=None, all_target_history_ids=[], deleted_history_ids=[] ): """ @@ -730,10 +764,10 @@ for id in deleted_history_ids: try: self.check_page_for_string( id ) - raise AssertionError, "deleted history id %d displayed in list of target histories" % id + raise AssertionError( "deleted history id %d displayed in list of target histories" % id ) except: pass - + tc.fv( '1', 'target_history_id', target_history_id ) tc.submit( 'do_copy' ) check_str = '1 dataset copied to 1 history' @@ -760,17 +794,17 @@ def makeTfname(self, fname=None): """safe temp name - preserve the file extension for tools that interpret it""" - suffix = os.path.split(fname)[-1] # ignore full path - fd,temp_prefix = tempfile.mkstemp(prefix='tmp',suffix=suffix) + suffix = os.path.split(fname)[-1] # ignore full path + fd, temp_prefix = tempfile.mkstemp(prefix='tmp', suffix=suffix) return temp_prefix def verify_dataset_correctness( self, filename, hid=None, wait=True, maxseconds=120, attributes=None, shed_tool_id=None ): """Verifies that the attributes and contents of a history item meet expectations""" if wait: - self.wait( maxseconds=maxseconds ) #wait for job to finish + self.wait( maxseconds=maxseconds ) # wait for job to finish data_list = self.get_history_as_data_list() self.assertTrue( data_list ) - if hid is None: # take last hid + if hid is None: # take last hid elem = data_list[-1] hid = str( elem.get('hid') ) else: @@ -804,9 +838,9 @@ raise AssertionError( errmsg ) if filename is not None: local_name = self.get_filename( filename, shed_tool_id=shed_tool_id ) - temp_name = self.makeTfname(fname = filename) + temp_name = self.makeTfname(fname=filename) file( temp_name, 'wb' ).write( data ) - + # if the server's env has GALAXY_TEST_SAVE, save the output file to that dir if self.keepOutdir: ofn = os.path.join( self.keepOutdir, os.path.basename( local_name ) ) @@ -814,12 +848,11 @@ try: shutil.copy( temp_name, ofn ) except Exception, exc: - error_log_msg = ( 'TwillTestCase could not save output file %s to %s: ' % ( temp_name, ofn ) ) + error_log_msg = ( 'TwillTestCase could not save output file %s to %s: ' % ( temp_name, ofn ) ) error_log_msg += str( exc ) log.error( error_log_msg, exc_info=True ) else: log.debug('## GALAXY_TEST_SAVE=%s. saved %s' % ( self.keepOutdir, ofn ) ) - try: # have to nest try-except in try-finally to handle 2.4 try: @@ -837,15 +870,15 @@ elif compare == 're_match_multiline': self.files_re_match_multiline( local_name, temp_name, attributes=attributes ) elif compare == 'sim_size': - delta = attributes.get('delta','100') + delta = attributes.get('delta', '100') s1 = len(data) s2 = os.path.getsize(local_name) - if abs(s1-s2) > int(delta): - raise Exception, 'Files %s=%db but %s=%db - compare (delta=%s) failed' % (temp_name,s1,local_name,s2,delta) + if abs(s1 - s2) > int(delta): + raise Exception( 'Files %s=%db but %s=%db - compare (delta=%s) failed' % (temp_name, s1, local_name, s2, delta) ) elif compare == "contains": self.files_contains( local_name, temp_name, attributes=attributes ) else: - raise Exception, 'Unimplemented Compare type: %s' % compare + raise Exception( 'Unimplemented Compare type: %s' % compare ) if extra_files: self.verify_extra_files_content( extra_files, elem.get( 'id' ), shed_tool_id=shed_tool_id ) except AssertionError, err: @@ -876,22 +909,22 @@ for filename in os.listdir( self.get_filename( extra_value, shed_tool_id=shed_tool_id ) ): files_list.append( ( filename, os.path.join( extra_value, filename ), extra_attributes ) ) else: - raise ValueError, 'unknown extra_files type: %s' % extra_type + raise ValueError( 'unknown extra_files type: %s' % extra_type ) for filename, filepath, attributes in files_list: self.verify_composite_datatype_file_content( filepath, hda_id, base_name=filename, attributes=attributes, shed_tool_id=shed_tool_id ) - + def verify_composite_datatype_file_content( self, file_name, hda_id, base_name=None, attributes=None, shed_tool_id=None ): local_name = self.get_filename( file_name, shed_tool_id=shed_tool_id ) if base_name is None: base_name = os.path.split(file_name)[-1] - temp_name = self.makeTfname(fname = base_name) + temp_name = self.makeTfname(fname=base_name) self.visit_url( "%s/datasets/%s/display/%s" % ( self.url, self.security.encode_id( hda_id ), base_name ) ) data = self.last_page() file( temp_name, 'wb' ).write( data ) if self.keepOutdir > '': - ofn = os.path.join(self.keepOutdir,base_name) - shutil.copy(temp_name,ofn) - log.debug('## GALAXY_TEST_SAVE=%s. saved %s' % (self.keepOutdir,ofn)) + ofn = os.path.join(self.keepOutdir, base_name) + shutil.copy(temp_name, ofn) + log.debug('## GALAXY_TEST_SAVE=%s. saved %s' % (self.keepOutdir, ofn)) try: # have to nest try-except in try-finally to handle 2.4 try: @@ -905,13 +938,13 @@ elif compare == 're_match_multiline': self.files_re_match_multiline( local_name, temp_name, attributes=attributes ) elif compare == 'sim_size': - delta = attributes.get('delta','100') + delta = attributes.get('delta', '100') s1 = len(data) s2 = os.path.getsize(local_name) - if abs(s1-s2) > int(delta): - raise Exception, 'Files %s=%db but %s=%db - compare (delta=%s) failed' % (temp_name,s1,local_name,s2,delta) + if abs(s1 - s2) > int(delta): + raise Exception( 'Files %s=%db but %s=%db - compare (delta=%s) failed' % (temp_name, s1, local_name, s2, delta) ) else: - raise Exception, 'Unimplemented Compare type: %s' % compare + raise Exception( 'Unimplemented Compare type: %s' % compare ) except AssertionError, err: errmsg = 'Composite file (%s) of History item %s different than expected, difference (using %s):\n' % ( base_name, hda_id, compare ) errmsg += str( err ) @@ -925,7 +958,7 @@ return True def is_binary( self, filename ): - temp = open( filename, "U" ) # why is this not filename? Where did temp_name come from + temp = open( filename, "U" ) # why is this not filename? Where did temp_name come from lineno = 0 for line in temp: lineno += 1 @@ -949,7 +982,7 @@ # Functions associated with user accounts def create( self, cntrller='user', email='test@bx.psu.edu', password='testuser', username='admin-user', redirect='' ): - # HACK: don't use panels because late_javascripts() messes up the twill browser and it + # HACK: don't use panels because late_javascripts() messes up the twill browser and it # can't find form fields (and hence user can't be logged in). self.visit_url( "%s/user/create?cntrller=%s&use_panels=False" % ( self.url, cntrller ) ) tc.fv( 'registration', 'email', email ) @@ -980,6 +1013,7 @@ except: pass return previously_created, username_taken, invalid_username + def create_user_with_info( self, email, password, username, user_info_values, user_type_fd_id='', cntrller='user', strings_displayed=[], strings_displayed_after_submit=[] ): # This method creates a new user with associated info @@ -998,6 +1032,7 @@ for check_str in strings_displayed: self.check_page_for_string( check_str) tc.submit( "create_user_button" ) + def edit_user_info( self, cntrller='user', id='', new_email='', new_username='', password='', new_password='', info_values=[], strings_displayed=[], strings_displayed_after_submit=[] ): if cntrller == 'admin': @@ -1021,51 +1056,53 @@ tc.submit( "change_password_button" ) if info_values: for index, ( field_name, info_value ) in enumerate( info_values ): - field_index = index + 1 tc.fv( "user_info", field_name, info_value ) tc.submit( "edit_user_info_button" ) for check_str in strings_displayed_after_submit: self.check_page_for_string( check_str ) self.home() + def user_set_default_permissions( self, cntrller='user', permissions_out=[], permissions_in=[], role_id='2' ): - # role.id = 2 is Private Role for test2@bx.psu.edu - # NOTE: Twill has a bug that requires the ~/user/permissions page to contain at least 1 option value + # role.id = 2 is Private Role for test2@bx.psu.edu + # NOTE: Twill has a bug that requires the ~/user/permissions page to contain at least 1 option value # in each select list or twill throws an exception, which is: ParseError: OPTION outside of SELECT - # Due to this bug, we'll bypass visiting the page, and simply pass the permissions on to the + # Due to this bug, we'll bypass visiting the page, and simply pass the permissions on to the # /user/set_default_permissions method. url = "user/set_default_permissions?cntrller=%s&update_roles_button=Save&id=None" % cntrller for po in permissions_out: key = '%s_out' % po - url ="%s&%s=%s" % ( url, key, str( role_id ) ) + url = "%s&%s=%s" % ( url, key, str( role_id ) ) for pi in permissions_in: key = '%s_in' % pi - url ="%s&%s=%s" % ( url, key, str( role_id ) ) + url = "%s&%s=%s" % ( url, key, str( role_id ) ) self.visit_url( "%s/%s" % ( self.url, url ) ) self.check_page_for_string( 'Default new history permissions have been changed.' ) self.home() - def history_set_default_permissions( self, permissions_out=[], permissions_in=[], role_id=3 ): # role.id = 3 is Private Role for test3@bx.psu.edu - # NOTE: Twill has a bug that requires the ~/user/permissions page to contain at least 1 option value + + def history_set_default_permissions( self, permissions_out=[], permissions_in=[], role_id=3 ): # role.id = 3 is Private Role for test3@bx.psu.edu + # NOTE: Twill has a bug that requires the ~/user/permissions page to contain at least 1 option value # in each select list or twill throws an exception, which is: ParseError: OPTION outside of SELECT - # Due to this bug, we'll bypass visiting the page, and simply pass the permissions on to the + # Due to this bug, we'll bypass visiting the page, and simply pass the permissions on to the # /user/set_default_permissions method. url = "root/history_set_default_permissions?update_roles_button=Save&id=None&dataset=True" for po in permissions_out: key = '%s_out' % po - url ="%s&%s=%s" % ( url, key, str( role_id ) ) + url = "%s&%s=%s" % ( url, key, str( role_id ) ) for pi in permissions_in: key = '%s_in' % pi - url ="%s&%s=%s" % ( url, key, str( role_id ) ) + url = "%s&%s=%s" % ( url, key, str( role_id ) ) self.home() self.visit_url( "%s/%s" % ( self.url, url ) ) self.check_page_for_string( 'Default history permissions have been changed.' ) self.home() + def login( self, email='test@bx.psu.edu', password='testuser', username='admin-user', redirect='' ): # test@bx.psu.edu is configured as an admin user previously_created, username_taken, invalid_username = \ self.create( email=email, password=password, username=username, redirect=redirect ) if previously_created: # The acount has previously been created, so just login. - # HACK: don't use panels because late_javascripts() messes up the twill browser and it + # HACK: don't use panels because late_javascripts() messes up the twill browser and it # can't find form fields (and hence user can't be logged in). self.visit_url( "%s/user/login?use_panels=False" % self.url ) self.submit_form( 1, 'login_button', email=email, redirect=redirect, password=password ) @@ -1075,7 +1112,7 @@ self.visit_page( "user/logout" ) self.check_page_for_string( "You have been logged out" ) self.home() - + # Functions associated with browsers, cookies, HTML forms and page visits def check_for_strings( self, strings_displayed=[], strings_not_displayed=[] ): @@ -1087,15 +1124,15 @@ self.check_string_not_in_page( string ) def check_page_for_string( self, patt ): - """Looks for 'patt' in the current browser page""" + """Looks for 'patt' in the current browser page""" page = self.last_page() if page.find( patt ) == -1: fname = self.write_temp_file( page ) errmsg = "no match to '%s'\npage content written to '%s'" % ( patt, fname ) raise AssertionError( errmsg ) - + def check_string_count_in_page( self, patt, min_count ): - """Checks the number of 'patt' occurrences in the current browser page""" + """Checks the number of 'patt' occurrences in the current browser page""" page = self.last_page() patt_count = page.count( patt ) # The number of occurrences of patt in the page should be at least min_count @@ -1104,15 +1141,15 @@ fname = self.write_temp_file( page ) errmsg = "%i occurrences of '%s' found instead of %i.\npage content written to '%s' " % ( min_count, patt, patt_count, fname ) raise AssertionError( errmsg ) - + def check_string_not_in_page( self, patt ): - """Checks to make sure 'patt' is NOT in the page.""" + """Checks to make sure 'patt' is NOT in the page.""" page = self.last_page() if page.find( patt ) != -1: fname = self.write_temp_file( page ) errmsg = "string (%s) incorrectly displayed in page.\npage content written to '%s'" % ( patt, fname ) raise AssertionError( errmsg ) - + def check_page(self, strings_displayed, strings_displayed_count, strings_not_displayed): """Checks a page for strings displayed, not displayed and number of occurrences of a string""" for check_str in strings_displayed: @@ -1122,7 +1159,6 @@ for check_str in strings_not_displayed: self.check_string_not_in_page( check_str ) - def write_temp_file( self, content, suffix='.html' ): fd, fname = tempfile.mkstemp( suffix=suffix, prefix='twilltestcase-' ) f = os.fdopen( fd, "w" ) @@ -1174,50 +1210,50 @@ for i, control in enumerate( f.controls ): formcontrols.append( "control %d: %s" % ( i, str( control ) ) ) for i, control in enumerate( f.controls ): - if not hc_prefix in str( control ): - try: - #check if a repeat element needs to be added - if control.name is not None: - if control.name not in kwd and control.name.endswith( '_add' ): - #control name doesn't exist, could be repeat - repeat_startswith = control.name[0:-4] - if repeat_startswith and not [ c_name for c_name in controls.keys() if c_name.startswith( repeat_startswith ) ] and [ c_name for c_name in kwd.keys() if c_name.startswith( repeat_startswith ) ]: - tc.browser.clicked( f, control ) - tc.submit( control.name ) + if not hc_prefix in str( control ): + try: + #check if a repeat element needs to be added + if control.name is not None: + if control.name not in kwd and control.name.endswith( '_add' ): + #control name doesn't exist, could be repeat + repeat_startswith = control.name[0:-4] + if repeat_startswith and not [ c_name for c_name in controls.keys() if c_name.startswith( repeat_startswith ) ] and [ c_name for c_name in kwd.keys() if c_name.startswith( repeat_startswith ) ]: + tc.browser.clicked( f, control ) + tc.submit( control.name ) + return self.submit_form( form_no=form_no, button=button, **kwd ) + # Check for refresh_on_change attribute, submit a change if required + if hasattr( control, 'attrs' ) and 'refresh_on_change' in control.attrs.keys(): + changed = False + # For DataToolParameter, control.value is the HDA id, but kwd contains the filename. + # This loop gets the filename/label for the selected values. + item_labels = [ item.attrs[ 'label' ] for item in control.get_items() if item.selected ] + for value in kwd[ control.name ]: + if value not in control.value and True not in [ value in item_label for item_label in item_labels ]: + changed = True + break + if changed: + # Clear Control and set to proper value + control.clear() + # kwd[control.name] should be a singlelist + for elem in kwd[ control.name ]: + tc.fv( f.name, control.name, str( elem ) ) + # Create a new submit control, allows form to refresh, instead of going to next page + control = ClientForm.SubmitControl( 'SubmitControl', '___refresh_grouping___', {'name': 'refresh_grouping'} ) + control.add_to_form( f ) + control.fixup() + # Submit for refresh + tc.submit( '___refresh_grouping___' ) return self.submit_form( form_no=form_no, button=button, **kwd ) - # Check for refresh_on_change attribute, submit a change if required - if hasattr( control, 'attrs' ) and 'refresh_on_change' in control.attrs.keys(): - changed = False - # For DataToolParameter, control.value is the HDA id, but kwd contains the filename. - # This loop gets the filename/label for the selected values. - item_labels = [ item.attrs[ 'label' ] for item in control.get_items() if item.selected ] - for value in kwd[ control.name ]: - if value not in control.value and True not in [ value in item_label for item_label in item_labels ]: - changed = True - break - if changed: - # Clear Control and set to proper value - control.clear() - # kwd[control.name] should be a singlelist - for elem in kwd[ control.name ]: - tc.fv( f.name, control.name, str( elem ) ) - # Create a new submit control, allows form to refresh, instead of going to next page - control = ClientForm.SubmitControl( 'SubmitControl', '___refresh_grouping___', {'name':'refresh_grouping'} ) - control.add_to_form( f ) - control.fixup() - # Submit for refresh - tc.submit( '___refresh_grouping___' ) - return self.submit_form( form_no=form_no, button=button, **kwd ) - except Exception, e: - log.exception( "In submit_form, continuing, but caught exception." ) - for formcontrol in formcontrols: - log.debug( formcontrol ) - continue - controls[ control.name ] = control + except Exception: + log.exception( "In submit_form, continuing, but caught exception." ) + for formcontrol in formcontrols: + log.debug( formcontrol ) + continue + controls[ control.name ] = control # No refresh_on_change attribute found in current form, so process as usual for control_name, control_value in kwd.items(): if control_name not in controls: - continue # these cannot be handled safely - cause the test to barf out + continue # these cannot be handled safely - cause the test to barf out if not isinstance( control_value, list ): control_value = [ control_value ] control = controls[ control_name ] @@ -1247,7 +1283,7 @@ else: for elem in control_value: control.get( name=elem ).selected = True - else: # control.is_of_kind( "singlelist" ) + else: # control.is_of_kind( "singlelist" ) for elem in control_value: try: tc.fv( f.name, control.name, str( elem ) ) @@ -1298,15 +1334,16 @@ control.clear() tc.fv( f.name, control.name, value ) # Create a new submit control, allows form to refresh, instead of going to next page - control = ClientForm.SubmitControl( 'SubmitControl', '___refresh_grouping___', {'name':'refresh_grouping'} ) + control = ClientForm.SubmitControl( 'SubmitControl', '___refresh_grouping___', {'name': 'refresh_grouping'} ) control.add_to_form( f ) control.fixup() # Submit for refresh tc.submit( '___refresh_grouping___' ) + def visit_page( self, page ): # tc.go("./%s" % page) if not page.startswith( "/" ): - page = "/" + page + page = "/" + page tc.go( self.url + page ) tc.code( 200 ) @@ -1322,7 +1359,7 @@ if repeat_name is not None: repeat_button = '%s_add' % repeat_name # Must click somewhere in tool_form, to disambiguate what form - # is being targetted. + # is being targetted. tc.browser.clicked( tc.browser.get_form( 'tool_form' ), None ) # Submit the "repeat" form button to add an input) tc.submit( repeat_button ) @@ -1336,9 +1373,9 @@ galaxy_url = urllib.quote_plus( "%s/tool_runner/index?" % self.url ) self.visit_url( "http://genome.ucsc.edu/cgi-bin/hgTables?GALAXY_URL=%s&hgta_compressType=none&tool_id=%s&%s" % ( galaxy_url, tool_id, track_string ) ) tc.fv( "mainForm", "hgta_doTopSubmit", "get output" ) - self.submit_form( button="get output" )#, **track_params ) + self.submit_form( button="get output" ) tc.fv( 2, "hgta_doGalaxyQuery", "Send query to Galaxy" ) - self.submit_form( button="Send query to Galaxy" )#, **output_params ) #AssertionError: Attempting to set field 'fbQual' to value '['whole']' in form 'None' threw exception: no matching forms! control: <RadioControl(fbQual=[whole, upstreamAll, endAll])> + self.submit_form( button="Send query to Galaxy" ) def get_running_datasets( self ): self.visit_url( '/api/histories' ) @@ -1363,7 +1400,7 @@ slept += sleep_amount sleep_amount *= 2 if slept + sleep_amount > maxseconds: - sleep_amount = maxseconds - slept # don't overshoot maxseconds + sleep_amount = maxseconds - slept # don't overshoot maxseconds else: break assert slept < maxseconds @@ -1373,7 +1410,7 @@ def create_new_account_as_admin( self, email='test4@bx.psu.edu', password='testuser', username='regular-user4', redirect='' ): """Create a new account for another user""" - # HACK: don't use panels because late_javascripts() messes up the twill browser and it + # HACK: don't use panels because late_javascripts() messes up the twill browser and it # can't find form fields (and hence user can't be logged in). self.visit_url( "%s/user/create?cntrller=admin" % self.url ) self.submit_form( 1, 'create_user_button', email=email, redirect=redirect, password=password, confirm=password, username=username ) @@ -1409,6 +1446,7 @@ tc.submit( "reset_user_password_button" ) self.check_page_for_string( "Passwords reset for 1 user." ) self.home() + def mark_user_deleted( self, user_id, email='' ): """Mark a user as deleted""" self.home() @@ -1416,6 +1454,7 @@ check_str = "Deleted 1 users" self.check_page_for_string( check_str ) self.home() + def undelete_user( self, user_id, email='' ): """Undelete a user""" self.home() @@ -1423,6 +1462,7 @@ check_str = "Undeleted 1 users" self.check_page_for_string( check_str ) self.home() + def purge_user( self, user_id, email ): """Purge a user account""" self.home() @@ -1430,6 +1470,7 @@ check_str = "Purged 1 users" self.check_page_for_string( check_str ) self.home() + def manage_roles_and_groups_for_user( self, user_id, in_role_ids=[], out_role_ids=[], in_group_ids=[], out_group_ids=[], strings_displayed=[] ): self.home() @@ -1448,12 +1489,13 @@ for check_str in strings_displayed: self.check_page_for_string( check_str ) self.home() + # Tests associated with roles - # Tests associated with roles def browse_roles( self, strings_displayed=[] ): self.visit_url( '%s/admin/roles' % self.url ) for check_str in strings_displayed: self.check_page_for_string( check_str ) + def create_role( self, name='Role One', description="This is Role One", @@ -1488,6 +1530,7 @@ self.visit_url( "%s/admin/roles" % self.url ) self.check_page_for_string( name ) self.home() + def rename_role( self, role_id, name='Role One Renamed', description='This is Role One Re-described' ): """Rename a role""" self.home() @@ -1497,6 +1540,7 @@ tc.fv( "1", "description", description ) tc.submit( "rename_role_button" ) self.home() + def mark_role_deleted( self, role_id, role_name ): """Mark a role as deleted""" self.home() @@ -1504,6 +1548,7 @@ check_str = "Deleted 1 roles: %s" % role_name self.check_page_for_string( check_str ) self.home() + def undelete_role( self, role_id, role_name ): """Undelete an existing role""" self.home() @@ -1511,6 +1556,7 @@ check_str = "Undeleted 1 roles: %s" % role_name self.check_page_for_string( check_str ) self.home() + def purge_role( self, role_id, role_name ): """Purge an existing role""" self.home() @@ -1518,6 +1564,7 @@ check_str = "Purged 1 roles: %s" % role_name self.check_page_for_string( check_str ) self.home() + def associate_users_and_groups_with_role( self, role_id, role_name, user_ids=[], group_ids=[] ): self.home() url = "%s/admin/role?id=%s&role_members_edit_button=Save" % ( self.url, role_id ) @@ -1548,10 +1595,12 @@ self.visit_url( "%s/admin/groups" % self.url ) self.check_page_for_string( name ) self.home() + def browse_groups( self, strings_displayed=[] ): self.visit_url( '%s/admin/groups' % self.url ) for check_str in strings_displayed: self.check_page_for_string( check_str ) + def rename_group( self, group_id, name='Group One Renamed' ): """Rename a group""" self.home() @@ -1560,6 +1609,7 @@ tc.fv( "1", "name", name ) tc.submit( "rename_group_button" ) self.home() + def associate_users_and_roles_with_group( self, group_id, group_name, user_ids=[], role_ids=[] ): self.home() url = "%s/admin/manage_users_and_roles_for_group?id=%s&group_roles_users_edit_button=Save" % ( self.url, group_id ) @@ -1571,6 +1621,7 @@ check_str = "Group '%s' has been updated with %d associated roles and %d associated users" % ( group_name, len( role_ids ), len( user_ids ) ) self.check_page_for_string( check_str ) self.home() + def mark_group_deleted( self, group_id, group_name ): """Mark a group as deleted""" self.home() @@ -1578,6 +1629,7 @@ check_str = "Deleted 1 groups: %s" % group_name self.check_page_for_string( check_str ) self.home() + def undelete_group( self, group_id, group_name ): """Undelete an existing group""" self.home() @@ -1585,6 +1637,7 @@ check_str = "Undeleted 1 groups: %s" % group_name self.check_page_for_string( check_str ) self.home() + def purge_group( self, group_id, group_name ): """Purge an existing group""" self.home() @@ -1628,7 +1681,7 @@ if num_options == 0: # Default to 2 options num_options = 2 - for index2 in range( 1, num_options+1 ): + for index2 in range( 1, num_options + 1 ): tc.submit( "addoption_0" ) # Add contents to the new options fields for index2 in range( num_options ): @@ -1644,6 +1697,7 @@ for check_str in strings_displayed_after_submit: self.check_page_for_string( check_str ) self.home() + def edit_form( self, id, form_type='', new_form_name='', new_form_desc='', field_dicts=[], field_index=0, strings_displayed=[], strings_not_displayed=[], strings_displayed_after_submit=[] ): """Edit form details; name and description""" @@ -1771,10 +1825,10 @@ url = "request_type/request_type_permissions?id=%s&update_roles_button=Save" % ( request_type_id ) for po in permissions_out: key = '%s_out' % po - url ="%s&%s=%s" % ( url, key, role_ids_str ) + url = "%s&%s=%s" % ( url, key, role_ids_str ) for pi in permissions_in: key = '%s_in' % pi - url ="%s&%s=%s" % ( url, key, role_ids_str ) + url = "%s&%s=%s" % ( url, key, role_ids_str ) self.home() self.visit_url( "%s/%s" % ( self.url, url ) ) check_str = "Permissions updated for request type '%s'" % request_type_name @@ -2080,7 +2134,7 @@ for check_str in strings_not_displayed: try: self.check_page_for_string( check_str ) - raise AssertionError, "String (%s) incorrectly displayed when browing library." % check_str + raise AssertionError( "String (%s) incorrectly displayed when browing library." % check_str ) except: pass def browse_libraries_regular_user( self, strings_displayed=[], strings_not_displayed=[] ): @@ -2090,7 +2144,7 @@ for check_str in strings_not_displayed: try: self.check_page_for_string( check_str ) - raise AssertionError, "String (%s) incorrectly displayed when browing library." % check_str + raise AssertionError( "String (%s) incorrectly displayed when browing library." % check_str ) except: pass def browse_library( self, cntrller, library_id, show_deleted=False, strings_displayed=[], strings_not_displayed=[] ): @@ -2100,7 +2154,7 @@ for check_str in strings_not_displayed: try: self.check_page_for_string( check_str ) - raise AssertionError, "String (%s) incorrectly displayed when browing library." % check_str + raise AssertionError( "String (%s) incorrectly displayed when browing library." % check_str ) except: pass def create_library( self, name='Library One', description='This is Library One', synopsis='Synopsis for Library One' ): @@ -2156,10 +2210,10 @@ url = "library_common/library_permissions?id=%s&cntrller=%s&update_roles_button=Save" % ( library_id, cntrller ) for po in permissions_out: key = '%s_out' % po - url ="%s&%s=%s" % ( url, key, role_ids_str ) + url = "%s&%s=%s" % ( url, key, role_ids_str ) for pi in permissions_in: key = '%s_in' % pi - url ="%s&%s=%s" % ( url, key, role_ids_str ) + url = "%s&%s=%s" % ( url, key, role_ids_str ) self.home() self.visit_url( "%s/%s" % ( self.url, url ) ) check_str = "Permissions updated for library '%s'." % library_name @@ -2208,7 +2262,7 @@ for check_str in strings_not_displayed: try: self.check_page_for_string( check_str ) - raise AssertionError, "String (%s) incorrectly displayed." % check_str + raise AssertionError( "String (%s) incorrectly displayed." % check_str ) except: pass if template_refresh_field_contents: @@ -2227,7 +2281,7 @@ for check_str in strings_not_displayed_after_submit: try: self.check_page_for_string( check_str ) - raise AssertionError, "String (%s) incorrectly displayed." % check_str + raise AssertionError( "String (%s) incorrectly displayed." % check_str ) except: pass self.home() @@ -2301,10 +2355,10 @@ ( self.url, cntrller, library_id, folder_id, id ) for po in permissions_out: key = '%s_out' % po - url ="%s&%s=%s" % ( url, key, role_ids_str ) + url = "%s&%s=%s" % ( url, key, role_ids_str ) for pi in permissions_in: key = '%s_in' % pi - url ="%s&%s=%s" % ( url, key, role_ids_str ) + url = "%s&%s=%s" % ( url, key, role_ids_str ) if permissions_in or permissions_out: url += "&update_roles_button=Save" self.visit_url( url ) @@ -2322,7 +2376,7 @@ for check_str in strings_not_displayed: try: self.check_page_for_string( check_str ) - raise AssertionError, "String (%s) should not have been displayed on ldda info page." % check_str + raise AssertionError( "String (%s) should not have been displayed on ldda info page." % check_str ) except: pass self.home() @@ -2354,7 +2408,7 @@ for check_str in strings_not_displayed: try: self.check_page_for_string( check_str ) - raise AssertionError, "String (%s) should not have been displayed on ldda Edit Attributes page." % check_str + raise AssertionError( "String (%s) should not have been displayed on ldda Edit Attributes page." % check_str ) except: pass self.home() Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.