4 new commits in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/e3e3576a3124/ Changeset: e3e3576a3124 Branch: central-eliminate-fabric User: inithello Date: 2014-02-20 15:47:35 Summary: Eliminate the use of fabric when executing commands defined in an <action type="shell_command"> tag. Affected #: 1 file diff -r 74ba9422f7f9e01ed59d24163e1873875bb34d42 -r e3e3576a31247e390b3dfafd24d7523c78fa4afc lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py --- a/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py +++ b/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py @@ -4,9 +4,14 @@ import logging import os import shutil +import sys import tempfile -import shutil import td_common_util +import time +import shlex + +from subprocess import PIPE, Popen +from threading import Thread from contextlib import contextmanager from galaxy.util import unicodify from galaxy.util.template import fill_template @@ -16,12 +21,19 @@ eggs.require( 'paramiko' ) eggs.require( 'Fabric' ) +from fabric import state from fabric.api import env from fabric.api import hide from fabric.api import lcd from fabric.api import local from fabric.api import settings from fabric.api import prefix +from fabric.operations import _AttributeString + +try: + from Queue import Queue, Empty +except ImportError: + from queue import Queue, Empty log = logging.getLogger( __name__ ) @@ -120,6 +132,22 @@ if int( version.split( "." )[ 0 ] ) < 1: raise NotImplementedError( "Install Fabric version 1.0 or later." ) +def enqueue_output( stdout, stdout_queue, stderr, stderr_queue ): + stdout_logger = logging.getLogger( 'fabric_util.STDOUT' ) + stderr_logger = logging.getLogger( 'fabric_util.STDERR' ) + for line in iter( stdout.readline, b'' ): + output = line.rstrip() + stdout_logger.debug( output ) + stdout_queue.put( output ) + stdout.close() + stdout_queue.put(None) + for line in iter( stderr.readline, b'' ): + output = line.rstrip() + stderr_logger.debug( output ) + stderr_queue.put( output ) + stderr.close() + stderr_queue.put(None) + def file_append( text, file_path, skip_if_contained=True, make_executable=True ): ''' Append a line to a file unless skip_if_contained is True and the line already exists in the file. This method creates the file @@ -168,10 +196,9 @@ for shell_file_path in shell_file_paths: env_file_builder.append_line( action="source", value=shell_file_path ) -def handle_command( app, tool_dependency, install_dir, cmd, return_output=False ): +def handle_command( app, tool_dependency, install_dir, cmd, return_output=False, use_fabric=False ): context = app.install_model.context - with settings( warn_only=True ): - output = local( cmd, capture=True ) + output = run_local_command( cmd, capture_output=True, stream_output=True ) log_results( cmd, output, os.path.join( install_dir, INSTALLATION_LOG ) ) if output.return_code: tool_dependency.status = app.install_model.ToolDependency.installation_status.ERROR @@ -263,7 +290,7 @@ set_prior_environment_commands.append( 'echo "%s: $%s"' % ( inherited_env_var_name, inherited_env_var_name ) ) command = ' ; '.join( set_prior_environment_commands ) # Run the command and capture the output. - command_return = handle_command( app, tool_dependency, install_dir, command, return_output=True ) + command_return = handle_command( app, tool_dependency, install_dir, command, return_output=True, use_fabric=True ) # And extract anything labeled with the name of the environment variable we're populating here. if '%s: ' % inherited_env_var_name in command_return: environment_variable_value = command_return.split( '\n' ) @@ -370,7 +397,7 @@ filtered_actions = actions[ 1: ] return_code = handle_command( app, tool_dependency, install_dir, action_dict[ 'command' ] ) if return_code: - return tool_dependency + return tool_dependency dir = package_name elif action_type == 'download_file': # <action type="download_file">http://effectors.org/download/version/TTSS_GUI-1.0.1.jar</action> @@ -645,7 +672,7 @@ elif action_type == 'shell_command': with settings( warn_only=True ): cmd = install_environment.build_command( action_dict[ 'command' ] ) - return_code = handle_command( app, tool_dependency, install_dir, cmd ) + return_code = handle_command( app, tool_dependency, install_dir, cmd, use_fabric=False ) if return_code: return tool_dependency elif action_type == 'template_command': @@ -755,6 +782,30 @@ if os.path.exists( work_dir ): local( 'rm -rf %s' % work_dir ) +def run_local_command( command, capture_output=True, stream_output=True ): + wrapped_command = shlex.split( "/bin/sh -c '%s'" % command ) + stdout_queue = Queue() + stderr_queue = Queue() + process_handle = Popen( wrapped_command, stdout=PIPE, stderr=PIPE, bufsize=1, close_fds=False, cwd=state.env['lcwd'] ) + stdio_thread = Thread( target=enqueue_output, args=( process_handle.stdout, stdout_queue, process_handle.stderr, stderr_queue ) ) + stdio_thread.daemon = True + stdio_thread.start() + stdout, stderr = wait_for_process( process_handle, stream_output, stdout_queue, stderr_queue ) + # Handle error condition (deal with stdout being None, too) + output = _AttributeString( stdout.strip() if stdout else "" ) + errors = _AttributeString( stderr.strip() if stderr else "" ) + output.failed = False + output.return_code = process_handle.returncode + output.stderr = errors + if process_handle.returncode not in env.ok_ret_codes: + output.failed = True + message = "Encountered an error (return code %s) while executing '%s'" % ( process_handle.returncode, command ) + log.error( message ) + output.succeeded = not output.failed + # If we were capturing, this will be a string; otherwise it will be None. + if capture_output: + return output + def set_galaxy_environment( galaxy_user, tool_dependency_dir, host='localhost', shell='/bin/bash -l -c' ): """General Galaxy environment configuration. This method is not currently used.""" env.user = galaxy_user @@ -764,3 +815,21 @@ env.use_sudo = False env.safe_cmd = local return env + +def wait_for_process( process_handle, stream_output, stdout_queue, stderr_queue ): + standard_out = [] + standard_err = [] + process_handle.wait() + while True: + line = stdout_queue.get() + if line is None: + break + standard_out.append(line) + while True: + line = stderr_queue.get() + if line is None: + break + standard_err.append(line) + stdout = '\n'.join( standard_out ) + stderr = '\n'.join( standard_err ) + return stdout, stderr https://bitbucket.org/galaxy/galaxy-central/commits/496e1d36231c/ Changeset: 496e1d36231c Branch: central-eliminate-fabric User: inithello Date: 2014-02-20 21:45:42 Summary: Code cleanup, documentation. Affected #: 1 file diff -r e3e3576a31247e390b3dfafd24d7523c78fa4afc -r 496e1d36231c143333c972fb9f78961b27ce210e lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py --- a/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py +++ b/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py @@ -133,6 +133,14 @@ raise NotImplementedError( "Install Fabric version 1.0 or later." ) def enqueue_output( stdout, stdout_queue, stderr, stderr_queue ): + ''' + This method places streamed stdout and stderr into a threaded IPC queue target, defined as follows: + + stdio_thread = Thread( target=enqueue_output, args=( process_handle.stdout, stdout_queue, process_handle.stderr, stderr_queue ) ) + + When input is received, it logs that input to the defined logger, and saves it to that thread's queue. The calling thread + can then retrieve that data using thread.stdout and thread.stderr. + ''' stdout_logger = logging.getLogger( 'fabric_util.STDOUT' ) stderr_logger = logging.getLogger( 'fabric_util.STDERR' ) for line in iter( stdout.readline, b'' ): @@ -140,13 +148,13 @@ stdout_logger.debug( output ) stdout_queue.put( output ) stdout.close() - stdout_queue.put(None) + stdout_queue.put( None ) for line in iter( stderr.readline, b'' ): output = line.rstrip() stderr_logger.debug( output ) stderr_queue.put( output ) stderr.close() - stderr_queue.put(None) + stderr_queue.put( None ) def file_append( text, file_path, skip_if_contained=True, make_executable=True ): ''' @@ -786,7 +794,7 @@ wrapped_command = shlex.split( "/bin/sh -c '%s'" % command ) stdout_queue = Queue() stderr_queue = Queue() - process_handle = Popen( wrapped_command, stdout=PIPE, stderr=PIPE, bufsize=1, close_fds=False, cwd=state.env['lcwd'] ) + process_handle = Popen( wrapped_command, stdout=PIPE, stderr=PIPE, bufsize=1, close_fds=False, cwd=state.env[ 'lcwd' ] ) stdio_thread = Thread( target=enqueue_output, args=( process_handle.stdout, stdout_queue, process_handle.stderr, stderr_queue ) ) stdio_thread.daemon = True stdio_thread.start() https://bitbucket.org/galaxy/galaxy-central/commits/df8d1dfe007e/ Changeset: df8d1dfe007e Branch: central-eliminate-fabric User: inithello Date: 2014-02-20 22:00:21 Summary: Use the provided use_fabric parameter in handle_command. Affected #: 1 file diff -r 496e1d36231c143333c972fb9f78961b27ce210e -r df8d1dfe007e56529a26f696a97a4159827d706b lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py --- a/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py +++ b/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py @@ -206,7 +206,11 @@ def handle_command( app, tool_dependency, install_dir, cmd, return_output=False, use_fabric=False ): context = app.install_model.context - output = run_local_command( cmd, capture_output=True, stream_output=True ) + if use_fabric: + with settings( warn_only=True ): + output = local( cmd, capture=True ) + else: + output = run_local_command( cmd, capture_output=True, stream_output=True ) log_results( cmd, output, os.path.join( install_dir, INSTALLATION_LOG ) ) if output.return_code: tool_dependency.status = app.install_model.ToolDependency.installation_status.ERROR https://bitbucket.org/galaxy/galaxy-central/commits/647946821504/ Changeset: 647946821504 User: davebgx Date: 2014-02-20 22:27:11 Summary: Merged in davebgx/galaxy-central/central-eliminate-fabric (pull request #330) Eliminate use of fabric when executing tool dependency recipes' shell_command actions. Affected #: 1 file diff -r 2efd48ff20dc24d7c6ccd94e4abe1319aa52f25d -r 647946821504986a4941738cab0ec52be2c295cf lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py --- a/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py +++ b/lib/tool_shed/galaxy_install/tool_dependencies/fabric_util.py @@ -4,9 +4,14 @@ import logging import os import shutil +import sys import tempfile -import shutil import td_common_util +import time +import shlex + +from subprocess import PIPE, Popen +from threading import Thread from contextlib import contextmanager from galaxy.util import unicodify from galaxy.util.template import fill_template @@ -16,12 +21,19 @@ eggs.require( 'paramiko' ) eggs.require( 'Fabric' ) +from fabric import state from fabric.api import env from fabric.api import hide from fabric.api import lcd from fabric.api import local from fabric.api import settings from fabric.api import prefix +from fabric.operations import _AttributeString + +try: + from Queue import Queue, Empty +except ImportError: + from queue import Queue, Empty log = logging.getLogger( __name__ ) @@ -120,6 +132,30 @@ if int( version.split( "." )[ 0 ] ) < 1: raise NotImplementedError( "Install Fabric version 1.0 or later." ) +def enqueue_output( stdout, stdout_queue, stderr, stderr_queue ): + ''' + This method places streamed stdout and stderr into a threaded IPC queue target, defined as follows: + + stdio_thread = Thread( target=enqueue_output, args=( process_handle.stdout, stdout_queue, process_handle.stderr, stderr_queue ) ) + + When input is received, it logs that input to the defined logger, and saves it to that thread's queue. The calling thread + can then retrieve that data using thread.stdout and thread.stderr. + ''' + stdout_logger = logging.getLogger( 'fabric_util.STDOUT' ) + stderr_logger = logging.getLogger( 'fabric_util.STDERR' ) + for line in iter( stdout.readline, b'' ): + output = line.rstrip() + stdout_logger.debug( output ) + stdout_queue.put( output ) + stdout.close() + stdout_queue.put( None ) + for line in iter( stderr.readline, b'' ): + output = line.rstrip() + stderr_logger.debug( output ) + stderr_queue.put( output ) + stderr.close() + stderr_queue.put( None ) + def file_append( text, file_path, skip_if_contained=True, make_executable=True ): ''' Append a line to a file unless skip_if_contained is True and the line already exists in the file. This method creates the file @@ -168,10 +204,13 @@ for shell_file_path in shell_file_paths: env_file_builder.append_line( action="source", value=shell_file_path ) -def handle_command( app, tool_dependency, install_dir, cmd, return_output=False ): +def handle_command( app, tool_dependency, install_dir, cmd, return_output=False, use_fabric=False ): context = app.install_model.context - with settings( warn_only=True ): - output = local( cmd, capture=True ) + if use_fabric: + with settings( warn_only=True ): + output = local( cmd, capture=True ) + else: + output = run_local_command( cmd, capture_output=True, stream_output=True ) log_results( cmd, output, os.path.join( install_dir, INSTALLATION_LOG ) ) if output.return_code: tool_dependency.status = app.install_model.ToolDependency.installation_status.ERROR @@ -263,7 +302,7 @@ set_prior_environment_commands.append( 'echo "%s: $%s"' % ( inherited_env_var_name, inherited_env_var_name ) ) command = ' ; '.join( set_prior_environment_commands ) # Run the command and capture the output. - command_return = handle_command( app, tool_dependency, install_dir, command, return_output=True ) + command_return = handle_command( app, tool_dependency, install_dir, command, return_output=True, use_fabric=True ) # And extract anything labeled with the name of the environment variable we're populating here. if '%s: ' % inherited_env_var_name in command_return: environment_variable_value = command_return.split( '\n' ) @@ -370,7 +409,7 @@ filtered_actions = actions[ 1: ] return_code = handle_command( app, tool_dependency, install_dir, action_dict[ 'command' ] ) if return_code: - return tool_dependency + return tool_dependency dir = package_name elif action_type == 'download_file': # <action type="download_file">http://effectors.org/download/version/TTSS_GUI-1.0.1.jar</action> @@ -645,7 +684,7 @@ elif action_type == 'shell_command': with settings( warn_only=True ): cmd = install_environment.build_command( action_dict[ 'command' ] ) - return_code = handle_command( app, tool_dependency, install_dir, cmd ) + return_code = handle_command( app, tool_dependency, install_dir, cmd, use_fabric=False ) if return_code: return tool_dependency elif action_type == 'template_command': @@ -755,6 +794,30 @@ if os.path.exists( work_dir ): local( 'rm -rf %s' % work_dir ) +def run_local_command( command, capture_output=True, stream_output=True ): + wrapped_command = shlex.split( "/bin/sh -c '%s'" % command ) + stdout_queue = Queue() + stderr_queue = Queue() + process_handle = Popen( wrapped_command, stdout=PIPE, stderr=PIPE, bufsize=1, close_fds=False, cwd=state.env[ 'lcwd' ] ) + stdio_thread = Thread( target=enqueue_output, args=( process_handle.stdout, stdout_queue, process_handle.stderr, stderr_queue ) ) + stdio_thread.daemon = True + stdio_thread.start() + stdout, stderr = wait_for_process( process_handle, stream_output, stdout_queue, stderr_queue ) + # Handle error condition (deal with stdout being None, too) + output = _AttributeString( stdout.strip() if stdout else "" ) + errors = _AttributeString( stderr.strip() if stderr else "" ) + output.failed = False + output.return_code = process_handle.returncode + output.stderr = errors + if process_handle.returncode not in env.ok_ret_codes: + output.failed = True + message = "Encountered an error (return code %s) while executing '%s'" % ( process_handle.returncode, command ) + log.error( message ) + output.succeeded = not output.failed + # If we were capturing, this will be a string; otherwise it will be None. + if capture_output: + return output + def set_galaxy_environment( galaxy_user, tool_dependency_dir, host='localhost', shell='/bin/bash -l -c' ): """General Galaxy environment configuration. This method is not currently used.""" env.user = galaxy_user @@ -764,3 +827,21 @@ env.use_sudo = False env.safe_cmd = local return env + +def wait_for_process( process_handle, stream_output, stdout_queue, stderr_queue ): + standard_out = [] + standard_err = [] + process_handle.wait() + while True: + line = stdout_queue.get() + if line is None: + break + standard_out.append(line) + while True: + line = stderr_queue.get() + if line is None: + break + standard_err.append(line) + stdout = '\n'.join( standard_out ) + stderr = '\n'.join( standard_err ) + return stdout, stderr Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.