commit/galaxy-central: greg: Improve time management and process handling when installing repositories and tool dependencies from the tool shed and completely eliminate the use of fabric.local when installing tool dependencies.
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/cb00a217028c/ Changeset: cb00a217028c User: greg Date: 2014-02-27 18:14:53 Summary: Improve time management and process handling when installing repositories and tool dependencies from the tool shed and completely eliminate the tuse of fabric.local when installing tool dependencies. Affected #: 2 files diff -r a3db4f68660384e96212f7ebb0ff1856db40278b -r cb00a217028c903a0868bad0b370b325893eb708 lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py --- a/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py +++ b/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py @@ -29,7 +29,6 @@ from fabric.api import env from fabric.api import hide from fabric.api import lcd -from fabric.api import local from fabric.api import settings from fabric.api import prefix from fabric.operations import _AttributeString @@ -37,7 +36,6 @@ log = logging.getLogger( __name__ ) INSTALLATION_LOG = 'INSTALLATION.log' -NO_CMD_OUTPUT_TIMEOUT = 600.0 VIRTUALENV_URL = 'https://pypi.python.org/packages/source/v/virtualenv/virtualenv-1.9.1.tar.gz' @@ -55,10 +53,13 @@ def run( self ): """Read lines and put them on the queue.""" + thread_lock = threading.Lock() + thread_lock.acquire() for line in iter( self._fd.readline, '' ): stripped_line = line.rstrip() self.lines.append( stripped_line ) self._queue.put( stripped_line ) + thread_lock.release() def installation_complete( self ): """Make sure there is more installation and compilation logging content expected.""" @@ -156,6 +157,24 @@ if int( version.split( "." )[ 0 ] ) < 1: raise NotImplementedError( "Install Fabric version 1.0 or later." ) +def close_file_descriptor( fd ): + """Attempt to close a file descriptor.""" + start_timer = time.time() + error = '' + while True: + try: + fd.close() + break + except IOError, e: + # Undoubtedly close() was called during a concurrent operation on the same file object. 
+ log.debug( 'Error closing file descriptor: %s' % str( e ) ) + time.sleep( .5 ) + current_wait_time = time.time() - start_timer + if current_wait_time >= 600: + error = 'Error closing file descriptor: %s' % str( e ) + break + return error + def enqueue_output( stdout, stdout_queue, stderr, stderr_queue ): """ This method places streamed stdout and stderr into a threaded IPC queue target. Received data @@ -177,16 +196,20 @@ def file_append( text, file_path, skip_if_contained=True, make_executable=True ): """ - Append a line to a file unless skip_if_contained is True and the line already exists in the file. This method creates the file - if it doesn't exist. If make_executable is True, the permissions on the file are set to executable by the owner. This method - is similar to a local version of fabric.contrib.files.append. + Append a line to a file unless skip_if_contained is True and the line already exists in the file. + This method creates the file if it doesn't exist. If make_executable is True, the permissions on + the file are set to executable by the owner. """ if not os.path.exists( file_path ): - local( 'touch %s' % file_path ) + return_code = handle_simple_command( 'touch %s' % file_path ) + if return_code: + return return_code if make_executable: # Explicitly set the file to the received mode if valid. with settings( hide( 'everything' ), warn_only=True ): - local( 'chmod +x %s' % file_path ) + return_code = handle_simple_command( 'chmod +x %s' % file_path ) + if return_code: + return return_code return_code = 0 # Convert the received text to a list, in order to support adding one or more lines to the file. if isinstance( text, basestring ): @@ -198,12 +221,12 @@ # If the line exists in the file, egrep will return a success. 
with settings( hide( 'everything' ), warn_only=True ): egrep_cmd = 'egrep "^%s$" %s' % ( regex, file_path ) - contains_line = local( egrep_cmd ).succeeded - if contains_line: + return_code = handle_simple_command( egrep_cmd ) + if return_code == 0: continue # Append the current line to the file, escaping any single quotes in the line. line = line.replace( "'", r"'\\''" ) - return_code = local( "echo '%s' >> %s" % ( line, file_path ) ).return_code + return_code = handle_simple_command( "echo '%s' >> %s" % ( line, file_path ) ) if return_code: # Return upon the first error encountered. return return_code @@ -225,7 +248,7 @@ def handle_command( app, tool_dependency, install_dir, cmd, return_output=False ): context = app.install_model.context - output = run_local_command( cmd ) + output = handle_complex_command( cmd ) log_results( cmd, output, os.path.join( install_dir, INSTALLATION_LOG ) ) stdout = output.stdout stderr = output.stderr @@ -251,6 +274,96 @@ return output return output.return_code +def handle_complex_command( command ): + """ + Wrap subprocess.Popen in such a way that the stderr and stdout from running a shell command will + be captured and logged in nearly real time. This is similar to fabric.local, but allows us to + retain control over the process. This method is considered "complex" because it uses queues and + threads - see handle_simple_command(). + """ + wrapped_command = shlex.split( "/bin/sh -c '%s'" % str( command ) ) + # Launch the command as subprocess. A bufsize of 1 means line buffered. + process_handle = subprocess.Popen( wrapped_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=1, + close_fds=False, + cwd=state.env[ 'lcwd' ] ) + pid = process_handle.pid + # Launch the asynchronous readers of the process' stdout and stderr. 
+ stdout_queue = Queue.Queue() + stdout_reader = AsynchronousReader( process_handle.stdout, stdout_queue ) + stdout_reader.start() + stderr_queue = Queue.Queue() + stderr_reader = AsynchronousReader( process_handle.stderr, stderr_queue ) + stderr_reader.start() + # Place streamed stdout and stderr into a threaded IPC queue target so it can + # be printed and stored for later retrieval when generating the INSTALLATION.log. + stdio_thread = threading.Thread( target=enqueue_output, + args=( process_handle.stdout, + stdout_queue, + process_handle.stderr, + stderr_queue ) ) + thread_lock = threading.Lock() + thread_lock.acquire() + stdio_thread.start() + # Check the queues for output until there is nothing more to get. + start_timer = time.time() + while not stdout_reader.installation_complete() or not stderr_reader.installation_complete(): + # Show what we received from standard output. + while not stdout_queue.empty(): + try: + line = stdout_queue.get() + except Queue.Empty: + line = None + break + if line: + print line + start_timer = time.time() + else: + break + # Show what we received from standard error. + while not stderr_queue.empty(): + try: + line = stderr_queue.get() + except Queue.Empty: + line = None + break + if line: + print line + start_timer = time.time() + else: + stderr_queue.task_done() + break + # Sleep a bit before asking the readers again. + time.sleep( .1 ) + current_wait_time = time.time() - start_timer + if stdout_queue.empty() and stderr_queue.empty() and current_wait_time > td_common_util.NO_OUTPUT_TIMEOUT: + err_msg = "\nShutting down process id %s because it generated no output for the defined timeout period of %.1f seconds.\n" % \ + ( pid, td_common_util.NO_OUTPUT_TIMEOUT ) + stderr_reader.lines.append( err_msg ) + process_handle.kill() + break + thread_lock.release() + # Wait until each of the threads we've started terminate. 
The following calls will block each thread + # until it terminates either normally, through an unhandled exception, or until the timeout occurs. + stdio_thread.join( td_common_util.NO_OUTPUT_TIMEOUT ) + stdout_reader.join( td_common_util.NO_OUTPUT_TIMEOUT ) + stderr_reader.join( td_common_util.NO_OUTPUT_TIMEOUT ) + # Close subprocess' file descriptors. + error = close_file_descriptor( process_handle.stdout ) + error = close_file_descriptor( process_handle.stderr ) + stdout = '\n'.join( stdout_reader.lines ) + stderr = '\n'.join( stderr_reader.lines ) + # Handle error condition (deal with stdout being None, too) + output = _AttributeString( stdout.strip() if stdout else "" ) + errors = _AttributeString( stderr.strip() if stderr else "" ) + # Make sure the process has finished. + process_handle.poll() + output.return_code = process_handle.returncode + output.stderr = errors + return output + def handle_environment_variables( app, tool_dependency, install_dir, env_var_dict, set_prior_environment_commands ): """ This method works with with a combination of three tool dependency definition tag sets, which are defined in the tool_dependencies.xml file in the @@ -343,6 +456,30 @@ env_var_dict[ 'value' ] = env_var_value return env_var_dict +def handle_simple_command( command ): + """ + Use this instead of fabric.local to retain control over the process spawned to run the + received command. + """ + wrapped_command = shlex.split( "/bin/sh -c '%s'" % str( command ) ) + # Launch the command as subprocess. A bufsize of 1 means line buffered. + try: + process_handle = subprocess.Popen( wrapped_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=1, + close_fds=False, + cwd=state.env[ 'lcwd' ] ) + except OSError, e: + # We don't have a working directory. 
+ process_handle = subprocess.Popen( wrapped_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE ) + stdout, stderr = process_handle.communicate() + if stderr: + log.debug( "Problem executing command %s: %s." % ( wrapped_command, stderr ) ) + return process_handle.returncode + def install_virtualenv( app, venv_dir ): if not os.path.exists( venv_dir ): with make_tmp_dir() as work_dir: @@ -623,8 +760,6 @@ # </action> filtered_actions = [ a for a in actions ] dir = install_dir - # We need to be careful in determining if the value of dir is a valid directory because we're dealing with 2 environments, the fabric local - # environment and the python environment. Checking the path as follows should work. full_path_to_dir = os.path.abspath( os.path.join( work_dir, dir ) ) if not os.path.exists( full_path_to_dir ): os.makedirs( full_path_to_dir ) @@ -795,8 +930,7 @@ def log_results( command, fabric_AttributeString, file_path ): """ - Write attributes of fabric.operations._AttributeString (which is the output of executing command using fabric's local() method) - to a specified log file. + Write attributes of fabric.operations._AttributeString to a specified log file. """ if os.path.exists( file_path ): logfile = open( file_path, 'ab' ) @@ -817,104 +951,7 @@ work_dir = tempfile.mkdtemp( prefix="tmp-toolshed-mtd" ) yield work_dir if os.path.exists( work_dir ): - local( 'rm -rf %s' % work_dir ) - -def run_local_command( command ): - """ - Wrap subprocess.Popen in such a way that the stderr and stdout from running a shell command will - be captured and logged in nearly real time. This is similar to fabric.local, but allows us to - retain control over the process. - """ - wrapped_command = shlex.split( "/bin/sh -c '%s'" % str( command ) ) - # Launch the command as subprocess. A bufsize of 1 means line buffered. 
- process_handle = subprocess.Popen( wrapped_command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - bufsize=1, - close_fds=False, - cwd=state.env[ 'lcwd' ] ) - pid = process_handle.pid - # Launch the asynchronous readers of the process' stdout and stderr. - stdout_queue = Queue.Queue() - stdout_reader = AsynchronousReader( process_handle.stdout, stdout_queue ) - stdout_reader.start() - stderr_queue = Queue.Queue() - stderr_reader = AsynchronousReader( process_handle.stderr, stderr_queue ) - stderr_reader.start() - # Place streamed stdout and stderr into a threaded IPC queue target so it can - # be printed and stored for later retrieval when generating the INSTALLATION.log. - stdio_thread = threading.Thread( target=enqueue_output, - args=( process_handle.stdout, - stdout_queue, - process_handle.stderr, - stderr_queue ) ) - stdio_thread.start() - # Check the queues for output until there is nothing more to get. - start_timer = time.time() - while not stdout_reader.installation_complete() or not stderr_reader.installation_complete(): - # Show what we received from standard output. - while not stdout_queue.empty(): - try: - line = stdout_queue.get() - except Queue.Empty: - err_msg = "\nShutting down process id %s because it generated no stdout for the defined timout period of %d seconds.\n" % \ - ( pid, NO_CMD_OUTPUT_TIMEOUT ) - print err_msg - stdout_reader.lines.append( err_msg ) - # Close subprocess' file descriptors. - process_handle.stdout.close() - process_handle.stderr.close() - process_handle.kill() - line = None - break - if line: - print line - start_timer = time.time() - else: - break - # Show what we received from standard error. 
- while not stderr_queue.empty(): - try: - line = stderr_queue.get() - except Queue.Empty: - err_msg = "\nShutting down process id %s because it generated no stderr for the defined timout period of %d seconds.\n" % \ - ( pid, NO_CMD_OUTPUT_TIMEOUT ) - print err_msg - stderr_reader.lines.append( err_msg ) - # Close subprocess' file descriptor. - process_handle.stdout.close() - process_handle.stderr.close() - process_handle.kill() - line = None - break - if line: - print line - start_timer = time.time() - else: - stderr_queue.task_done() - break - # Sleep a bit before asking the readers again. - time.sleep( .1 ) - current_wait_time = time.time() - start_timer - if stdout_queue.empty() and stderr_queue.empty() and current_wait_time < NO_CMD_OUTPUT_TIMEOUT: - break - # Wait until each of the threads we've started terminate. The following calls will block each thread - # until it terminates either normally, through an unhandled exception, or until the timeout occurs. - stdio_thread.join( NO_CMD_OUTPUT_TIMEOUT ) - stdout_reader.join( NO_CMD_OUTPUT_TIMEOUT ) - stderr_reader.join( NO_CMD_OUTPUT_TIMEOUT ) - # Close subprocess' file descriptors. - process_handle.stdout.close() - process_handle.stderr.close() - stdout = '\n'.join( stdout_reader.lines ) - stderr = '\n'.join( stderr_reader.lines ) - # Handle error condition (deal with stdout being None, too) - output = _AttributeString( stdout.strip() if stdout else "" ) - errors = _AttributeString( stderr.strip() if stderr else "" ) - process_handle.poll() - output.return_code = process_handle.returncode - output.stderr = errors - return output + return_code = handle_simple_command( 'rm -rf %s' % work_dir ) def set_galaxy_environment( galaxy_user, tool_dependency_dir, host='localhost', shell='/bin/bash -l -c' ): """General Galaxy environment configuration. 
This method is not currently used.""" @@ -923,5 +960,4 @@ env.host_string = host env.shell = shell env.use_sudo = False - env.safe_cmd = local return env diff -r a3db4f68660384e96212f7ebb0ff1856db40278b -r cb00a217028c903a0868bad0b370b325893eb708 lib/tool_shed/galaxy_install/tool_dependencies/td_common_util.py --- a/lib/tool_shed/galaxy_install/tool_dependencies/td_common_util.py +++ b/lib/tool_shed/galaxy_install/tool_dependencies/td_common_util.py @@ -4,6 +4,7 @@ import shutil import sys import tarfile +import time import traceback import urllib2 import zipfile @@ -13,9 +14,12 @@ log = logging.getLogger( __name__ ) +# Set no activity timeout to 20 minutes. +NO_OUTPUT_TIMEOUT = 1200 + class CompressedFile( object ): - + def __init__( self, file_path, mode='r' ): if istar( file_path ): self.file_type = 'tar' @@ -30,7 +34,7 @@ self.archive = getattr( self, method )( file_path, mode ) else: raise NameError( 'File type %s specified, no open method found.' % self.file_type ) - + def extract( self, path ): '''Determine the path to which the archive should be extracted.''' contents = self.getmembers() @@ -58,36 +62,36 @@ os.makedirs( extraction_path ) self.archive.extractall( os.path.join( extraction_path ) ) return os.path.abspath( extraction_path ) - + def getmembers_tar( self ): return self.archive.getmembers() - + def getmembers_zip( self ): return self.archive.infolist() - + def getname_tar( self, item ): return item.name - + def getname_zip( self, item ): return item.filename - + def getmember( self, name ): for member in self.getmembers(): if self.getname( member ) == name: return member - + def getmembers( self ): return getattr( self, 'getmembers_%s' % self.type )() - + def getname( self, member ): return getattr( self, 'getname_%s' % self.type )( member ) - + def isdir( self, member ): return getattr( self, 'isdir_%s' % self.type )( member ) - + def isdir_tar( self, member ): return member.isdir() - + def isdir_zip( self, member ): if member.filename.endswith( 
os.sep ): return True @@ -97,7 +101,7 @@ if not self.isdir( member ): return True return False - + def open_tar( self, filepath, mode ): return tarfile.open( filepath, mode, errorlevel=0 ) @@ -146,9 +150,7 @@ return None def download_binary( url, work_dir ): - ''' - Download a pre-compiled binary from the specified URL. - ''' + """Download a pre-compiled binary from the specified URL.""" downloaded_filename = os.path.split( url )[ -1 ] dir = url_download( work_dir, downloaded_filename, url, extract=False ) return downloaded_filename @@ -179,7 +181,6 @@ return os.path.abspath( os.path.join( root, name ) ) return None - def get_env_shell_file_paths( app, elem ): # Currently only the following tag set is supported. # <repository toolshed="http://localhost:9009/" name="package_numpy_1_7" owner="test" changeset_revision="c84c6a8be056"> @@ -455,6 +456,8 @@ file_path = os.path.join( install_dir, downloaded_file_name ) src = None dst = None + # Set a timer so we don't sit here forever. + start_time = time.time() try: src = urllib2.urlopen( download_url ) dst = open( file_path, 'wb' ) @@ -464,8 +467,14 @@ dst.write( chunk ) else: break - except: - raise + time_taken = time.time() - start_time + if time_taken > NO_OUTPUT_TIMEOUT: + err_msg = 'Downloading from URL %s took longer than the defined timeout period of %.1f seconds.' % \ + ( str( download_url ), NO_OUTPUT_TIMEOUT ) + raise Exception( err_msg ) + except Exception, e: + err_msg = err_msg = 'Error downloading from URL %s: %s' % ( str( download_url ), NO_OUTPUT_TIMEOUT ) + raise Exception( err_msg ) finally: if src: src.close() Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org