2 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/a1f2539d151b/ Changeset: a1f2539d151b User: dan Date: 2013-05-17 23:10:50 Summary: Add get_file_size and shrink_stream_by_size to galaxy.util Affected #: 1 file diff -r c6299cd86acba6e10befed8a9fbac93a7da5d880 -r a1f2539d151b29397c461135993b8b044d396d5c lib/galaxy/util/__init__.py --- a/lib/galaxy/util/__init__.py +++ b/lib/galaxy/util/__init__.py @@ -24,6 +24,11 @@ log = logging.getLogger(__name__) _lock = threading.RLock() +CHUNK_SIZE = 65536 #64k + +DATABASE_MAX_STRING_SIZE = 32768 +DATABASE_MAX_STRING_SIZE_PRETTY = '32K' + gzip_magic = '\037\213' bz2_magic = 'BZh' DEFAULT_ENCODING = 'utf-8' @@ -102,7 +107,7 @@ if line and line[0] != '#': yield line.split(sep) -def file_reader(fp, chunk_size=65536): +def file_reader( fp, chunk_size=CHUNK_SIZE ): """This generator yields the open fileobject in chunks (default 64k). Closes the file at the end""" while 1: data = fp.read(chunk_size) @@ -152,6 +157,60 @@ elem.tail = i + pad return elem +def get_file_size( value, default=None ): + try: + #try built-in + return os.path.getsize( value ) + except: + try: + #try built-in one name attribute + return os.path.getsize( value.name ) + except: + try: + #try tell() of end of object + offset = value.tell() + value.seek( 0, 2 ) + rval = value.tell() + value.seek( offset ) + return rval + except: + #return default value + return default + +def shrink_stream_by_size( value, size, join_by="..", left_larger=True, beginning_on_size_error=False, end_on_size_error=False ): + rval = '' + if get_file_size( value ) > size: + start = value.tell() + len_join_by = len( join_by ) + min_size = len_join_by + 2 + if size < min_size: + if beginning_on_size_error: + rval = value.read( size ) + value.seek( start ) + return rval + elif end_on_size_error: + value.seek( -size, 2 ) + rval = value.read( size ) + value.seek( start ) + return rval + raise ValueError( 'With the provided join_by value (%s), the minimum size 
value is %i.' % ( join_by, min_size ) ) + left_index = right_index = int( ( size - len_join_by ) / 2 ) + if left_index + right_index + len_join_by < size: + if left_larger: + left_index += 1 + else: + right_index += 1 + rval = value.read( left_index ) + join_by + value.seek( -right_index, 2 ) + rval += value.read( right_index ) + else: + while True: + data = value.read( CHUNK_SIZE ) + if not data: + break + rval += data + return rval + def shrink_string_by_size( value, size, join_by="..", left_larger=True, beginning_on_size_error=False, end_on_size_error=False ): if len( value ) > size: len_join_by = len( join_by ) https://bitbucket.org/galaxy/galaxy-central/commits/0afdd38eb5ec/ Changeset: 0afdd38eb5ec User: dan Date: 2013-05-17 23:10:51 Summary: Update job runner code to use start and end slices of long stderr and stdout. Affected #: 4 files diff -r a1f2539d151b29397c461135993b8b044d396d5c -r 0afdd38eb5ec63708a58b2e2bb8aadecf7f42f72 lib/galaxy/jobs/__init__.py --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -27,15 +27,15 @@ log = logging.getLogger( __name__ ) +DATABASE_MAX_STRING_SIZE = util.DATABASE_MAX_STRING_SIZE +DATABASE_MAX_STRING_SIZE_PRETTY = util.DATABASE_MAX_STRING_SIZE_PRETTY + # This file, if created in the job's working directory, will be used for # setting advanced metadata properties on the job and its associated outputs. 
# This interface is currently experimental, is only used by the upload tool, # and should eventually become API'd TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json' -DATABASE_MAX_STRING_SIZE = 32768 -DATABASE_MAX_STRING_SIZE_PRETTY = '32K' - class Sleeper( object ): """ Provides a 'sleep' method that sleeps for a number of seconds *unless* diff -r a1f2539d151b29397c461135993b8b044d396d5c -r 0afdd38eb5ec63708a58b2e2bb8aadecf7f42f72 lib/galaxy/jobs/runners/__init__.py --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -12,6 +12,7 @@ import galaxy.jobs from galaxy import model +from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size log = logging.getLogger( __name__ ) @@ -383,8 +384,8 @@ which_try = 0 while which_try < (self.app.config.retry_job_output_collection + 1): try: - stdout = file( job_state.output_file, "r" ).read( 32768 ) - stderr = file( job_state.error_file, "r" ).read( 32768 ) + stdout = shrink_stream_by_size( file( job_state.output_file, "r" ), DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True ) + stderr = shrink_stream_by_size( file( job_state.error_file, "r" ), DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True ) which_try = (self.app.config.retry_job_output_collection + 1) except Exception, e: if which_try == self.app.config.retry_job_output_collection: diff -r a1f2539d151b29397c461135993b8b044d396d5c -r 0afdd38eb5ec63708a58b2e2bb8aadecf7f42f72 lib/galaxy/jobs/runners/local.py --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -12,6 +12,7 @@ from galaxy import model from galaxy.jobs.runners import BaseJobRunner +from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size log = logging.getLogger( __name__ ) @@ -78,8 +79,8 @@ exit_code = proc.wait() stdout_file.seek( 0 ) stderr_file.seek( 0 ) - stdout = stdout_file.read( 32768 ) - stderr = stderr_file.read( 32768 ) + stdout = 
shrink_stream_by_size( stdout_file, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True ) + stderr = shrink_stream_by_size( stderr_file, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True ) stdout_file.close() stderr_file.close() log.debug('execution finished: %s' % command_line) diff -r a1f2539d151b29397c461135993b8b044d396d5c -r 0afdd38eb5ec63708a58b2e2bb8aadecf7f42f72 lib/galaxy/jobs/runners/pbs.py --- a/lib/galaxy/jobs/runners/pbs.py +++ b/lib/galaxy/jobs/runners/pbs.py @@ -5,6 +5,7 @@ from galaxy import model from galaxy.datatypes.data import nice_size from galaxy.util.bunch import Bunch +from galaxy.util import DATABASE_MAX_STRING_SIZE, shrink_stream_by_size from galaxy.jobs import JobDestination from galaxy.jobs.runners import AsynchronousJobState, AsynchronousJobRunner @@ -522,8 +523,8 @@ ofh = file(ofile, "r") efh = file(efile, "r") ecfh = file(ecfile, "r") - stdout = ofh.read( 32768 ) - stderr = efh.read( 32768 ) + stdout = shrink_stream_by_size( ofh, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True ) + stderr = shrink_stream_by_size( efh, DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True ) # This should be an 8-bit exit code, but read ahead anyway: exit_code_str = ecfh.read(32) except: Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this email because the commit notification service is enabled for your address.