1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/d15e1876ba0a/ Changeset: d15e1876ba0a User: greg Date: 2014-02-25 17:35:12 Summary: Rework the code allowing us to eliminate fabric.local when installing tool dependency packages. Affected #: 1 file diff -r cab3db6e1d592fe04fbce998eeee0061e6cf3e5b -r d15e1876ba0affc36e73945e587a35e5ff243063 lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py --- a/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py +++ b/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py @@ -3,15 +3,16 @@ import logging import os +import Queue import shutil +import subprocess import sys import tempfile import td_common_util +import threading import time import shlex -from subprocess import PIPE, Popen -from threading import Thread from contextlib import contextmanager from galaxy.util import unicodify from galaxy.util.template import fill_template @@ -37,6 +38,30 @@ VIRTUALENV_URL = 'https://pypi.python.org/packages/source/v/virtualenv/virtualenv-1.9.1.tar.gz' +class AsynchronousReader( threading.Thread ): + """ + A helper class to implement asynchronous reading of a stream in a separate thread. Read lines are pushed + onto a queue to be consumed in another thread. + """ + + def __init__( self, fd, queue ): + threading.Thread.__init__( self ) + self._fd = fd + self._queue = queue + self.lines = [] + + def run( self ): + """Read lines and put them on the queue.""" + for line in iter( self._fd.readline, '' ): + stripped_line = line.rstrip() + self.lines.append( stripped_line ) + self._queue.put( stripped_line ) + + def installation_complete( self ): + """Make sure there is more installation and compilation logging content expected.""" + return not self.is_alive() and self._queue.empty() + + class EnvFileBuilder( object ): def __init__( self, install_dir ): @@ -129,35 +154,28 @@ raise NotImplementedError( "Install Fabric version 1.0 or later." ) def enqueue_output( stdout, stdout_queue, stderr, stderr_queue ): - ''' - This method places streamed stdout and stderr into a threaded IPC queue target, defined as follows: - - stdio_thread = Thread( target=enqueue_output, args=( process_handle.stdout, stdout_queue, process_handle.stderr, stderr_queue ) ) - - When input is received, it logs that input to the defined logger, and saves it to that thread's queue. The calling thread - can then retrieve that data using thread.stdout and thread.stderr. - ''' - stdout_logger = logging.getLogger( 'fabric_util.STDOUT' ) - stderr_logger = logging.getLogger( 'fabric_util.STDERR' ) + """ + This method places streamed stdout and stderr into a threaded IPC queue target. Received data + is printed and saved to that thread's queue. The calling thread can then retrieve the data using + thread.stdout and thread.stderr. + """ for line in iter( stdout.readline, b'' ): output = line.rstrip() - stdout_logger.debug( output ) + print output stdout_queue.put( output ) - stdout.close() stdout_queue.put( None ) for line in iter( stderr.readline, b'' ): output = line.rstrip() - stderr_logger.debug( output ) + print output stderr_queue.put( output ) - stderr.close() stderr_queue.put( None ) def file_append( text, file_path, skip_if_contained=True, make_executable=True ): - ''' + """ Append a line to a file unless skip_if_contained is True and the line already exists in the file. This method creates the file if it doesn't exist. If make_executable is True, the permissions on the file are set to executable by the owner. This method is similar to a local version of fabric.contrib.files.append. - ''' + """ if not os.path.exists( file_path ): local( 'touch %s' % file_path ) if make_executable: @@ -202,10 +220,9 @@ def handle_command( app, tool_dependency, install_dir, cmd, return_output=False ): context = app.install_model.context - with settings( warn_only=True ): - output = local( cmd, capture=True ) + output = run_local_command( cmd ) log_results( cmd, output, os.path.join( install_dir, INSTALLATION_LOG ) ) - if output.return_code: + if output.return_code not in [ None, env.ok_ret_codes ]: tool_dependency.status = app.install_model.ToolDependency.installation_status.ERROR if output.stderr: tool_dependency.error_message = unicodify( str( output.stderr )[ :32768 ] ) @@ -787,31 +804,100 @@ if os.path.exists( work_dir ): local( 'rm -rf %s' % work_dir ) -def run_local_command( command, capture_output=True, stream_output=True ): - # TODO: Overhaul this method. - import Queue +def run_local_command( command ): + """ + Wrap subprocess.Popen in such a way that the stderr and stdout from running a shell command will + be captured and logged in nearly real time. This is similar to fabric.local, but allows us to + retain control over the process. + """ wrapped_command = shlex.split( "/bin/sh -c '%s'" % command ) - stdout_queue = Queue() - stderr_queue = Queue() - process_handle = Popen( wrapped_command, stdout=PIPE, stderr=PIPE, bufsize=1, close_fds=False, cwd=state.env[ 'lcwd' ] ) - stdio_thread = Thread( target=enqueue_output, args=( process_handle.stdout, stdout_queue, process_handle.stderr, stderr_queue ) ) + # Launch the command as subprocess. A bufsize of 1 means line buffered. + process_handle = subprocess.Popen( wrapped_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=1, + close_fds=False, + cwd=state.env[ 'lcwd' ] ) + pid = process_handle.pid + # Launch the asynchronous readers of the process' stdout and stderr. + stdout_queue = Queue.Queue() + stdout_reader = AsynchronousReader( process_handle.stdout, stdout_queue ) + stdout_reader.start() + stderr_queue = Queue.Queue() + stderr_reader = AsynchronousReader( process_handle.stderr, stderr_queue ) + stderr_reader.start() + # Place streamed stdout and stderr into a threaded IPC queue target so it can + # be printed and stored for later retrieval when generating the INSTALLATION.log. + stdio_thread = threading.Thread( target=enqueue_output, + args=( process_handle.stdout, + stdout_queue, + process_handle.stderr, + stderr_queue ) ) stdio_thread.daemon = True stdio_thread.start() - stdout, stderr = wait_for_process( process_handle, stream_output, stdout_queue, stderr_queue ) + # Check the queues for output until there is nothing more to get. + start_timer = time.time() + while not stdout_reader.installation_complete() or not stderr_reader.installation_complete(): + # Show what we received from standard output. + while not stdout_queue.empty(): + try: + line = stdout_queue.get() + except Queue.Empty: + err_msg = "\nShutting down process id %s because it generated no stdout for the defined timout period of %d seconds.\n" % \ + ( pid, NO_CMD_OUTPUT_TIMEOUT ) + print err_msg + stdout_reader.lines.append( err_msg ) + # Close subprocess' file descriptors. + process_handle.stdout.close() + process_handle.stderr.close() + process_handle.kill() + line = None + break + if line: + print line + start_timer = time.time() + else: + break + # Show what we received from standard error. + while not stderr_queue.empty(): + try: + line = stderr_queue.get() + except Queue.Empty: + err_msg = "\nShutting down process id %s because it generated no stderr for the defined timout period of %d seconds.\n" % \ + ( pid, NO_CMD_OUTPUT_TIMEOUT ) + print err_msg + stderr_reader.lines.append( err_msg ) + # Close subprocess' file descriptor. + process_handle.stdout.close() + process_handle.stderr.close() + process_handle.kill() + line = None + break + if line: + print line + start_timer = time.time() + else: + stderr_queue.task_done() + break + # Sleep a bit before asking the readers again. + time.sleep( .1 ) + current_wait_time = time.time() - start_timer + if stdout_queue.empty() and stderr_queue.empty() and current_wait_time < NO_CMD_OUTPUT_TIMEOUT: + break + # Let's be tidy and join the threads we've started. + stdout_reader.join() + stderr_reader.join() + # Close subprocess' file descriptors. + process_handle.stdout.close() + process_handle.stderr.close() + stdout = '\n'.join( stdout_reader.lines ) + stderr = '\n'.join( stderr_reader.lines ) # Handle error condition (deal with stdout being None, too) output = _AttributeString( stdout.strip() if stdout else "" ) errors = _AttributeString( stderr.strip() if stderr else "" ) - output.failed = False output.return_code = process_handle.returncode output.stderr = errors - if process_handle.returncode not in env.ok_ret_codes: - output.failed = True - message = "Encountered an error (return code %s) while executing '%s'" % ( process_handle.returncode, command ) - log.error( message ) - output.succeeded = not output.failed - # If we were capturing, this will be a string; otherwise it will be None. - if capture_output: - return output + return output def set_galaxy_environment( galaxy_user, tool_dependency_dir, host='localhost', shell='/bin/bash -l -c' ): """General Galaxy environment configuration. This method is not currently used.""" @@ -822,50 +908,3 @@ env.use_sudo = False env.safe_cmd = local return env - -def wait_for_process( process_handle, stream_output, stdout_queue, stderr_queue ): - # TODO: Overhaul this method. - import Queue - pid = process_handle.pid - standard_out = [] - standard_err = [] - process_handle.wait() - # Generate stdout. - while True: - try: - line = stdout_queue.get( timeout=NO_CMD_OUTPUT_TIMEOUT ) - except Queue.Empty: - err_msg = "\nShutting down process id %s because it generated no stdout for the defined timout period of %d seconds.\n" % \ - ( pid, NO_CMD_OUTPUT_TIMEOUT ) - print err_msg - standard_out.append( err_msg ) - stdout_queue.task_done() - process_handle.kill() - break - else: - if line: - standard_out.append( line ) - else: - stdout_queue.task_done() - break - # Generate stderr. - while True: - try: - line = stderr_queue.get( timeout=NO_CMD_OUTPUT_TIMEOUT ) - except Queue.Empty: - err_msg = "\nShutting down process id %s because it generated no stderr for the defined timout period of %d seconds.\n" % \ - ( pid, NO_CMD_OUTPUT_TIMEOUT ) - print err_msg - standard_err.append( err_msg ) - stderr_queue.task_done() - process_handle.kill() - break - else: - if line: - standard_err.append( line ) - else: - stderr_queue.task_done() - break - stdout = '\n'.join( standard_out ) - stderr = '\n'.join( standard_err ) - return stdout, stderr Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.