[hg] galaxy 2653: Removing the galaxy.web.framework.servers mod...
details: http://www.bx.psu.edu/hg/galaxy/rev/6b924dd68e77 changeset: 2653:6b924dd68e77 user: James Taylor <james@jamestaylor.org> date: Fri Aug 28 18:09:43 2009 -0400 description: Removing the galaxy.web.framework.servers module (unused for a long time) 6 file(s) affected in this change: lib/galaxy/web/framework/servers/__init__.py lib/galaxy/web/framework/servers/flup/__init__.py lib/galaxy/web/framework/servers/flup/ajp_forkthreaded.py lib/galaxy/web/framework/servers/flup/preforkthreadedserver.py lib/galaxy/web/framework/servers/fork_server.py lib/galaxy/web/framework/servers/threadpool_server.py diffs (1154 lines): diff -r fba947d16fa7 -r 6b924dd68e77 lib/galaxy/web/framework/servers/__init__.py --- a/lib/galaxy/web/framework/servers/__init__.py Fri Aug 28 18:03:28 2009 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,3 +0,0 @@ -""" -Various WSGI webserver implementations. -""" \ No newline at end of file diff -r fba947d16fa7 -r 6b924dd68e77 lib/galaxy/web/framework/servers/flup/ajp_forkthreaded.py --- a/lib/galaxy/web/framework/servers/flup/ajp_forkthreaded.py Fri Aug 28 18:03:28 2009 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,211 +0,0 @@ -# Copyright (c) 2005, 2006 Allan Saddi <allan@saddi.com> -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. 
-# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. -# -# $Id: ajp_fork.py 2188 2006-12-05 22:11:45Z asaddi $ - -""" -ajp - an AJP 1.3/WSGI gateway. - -For more information about AJP and AJP connectors for your web server, see -<http://jakarta.apache.org/tomcat/connectors-doc/>. - -For more information about the Web Server Gateway Interface, see -<http://www.python.org/peps/pep-0333.html>. - -Example usage: - - #!/usr/bin/env python - import sys - from myapplication import app # Assume app is your WSGI application object - from ajp import WSGIServer - ret = WSGIServer(app).run() - sys.exit(ret and 42 or 0) - -See the documentation for WSGIServer for more information. - -About the bit of logic at the end: -Upon receiving SIGHUP, the python script will exit with status code 42. This -can be used by a wrapper script to determine if the python script should be -re-run. When a SIGINT or SIGTERM is received, the script exits with status -code 0, possibly indicating a normal exit. - -Example wrapper script: - - #!/bin/sh - STATUS=42 - while test $STATUS -eq 42; do - python "$@" that_script_above.py - STATUS=$? 
- done - -Example workers.properties (for mod_jk): - - worker.list=foo - worker.foo.port=8009 - worker.foo.host=localhost - worker.foo.type=ajp13 - -Example httpd.conf (for mod_jk): - - JkWorkersFile /path/to/workers.properties - JkMount /* foo - -Note that if you mount your ajp application anywhere but the root ("/"), you -SHOULD specifiy scriptName to the WSGIServer constructor. This will ensure -that SCRIPT_NAME/PATH_INFO are correctly deduced. -""" - -__author__ = 'Allan Saddi <allan@saddi.com>' -__version__ = '$Revision: 2188 $' - -import socket -import logging - -from flup.server.ajp_base import BaseAJPServer, Connection -from preforkthreadedserver import PreforkThreadedServer - -__all__ = ['WSGIServer'] - -class WSGIServer(BaseAJPServer, PreforkThreadedServer): - """ - AJP1.3/WSGI server. Runs your WSGI application as a persistant program - that understands AJP1.3. Opens up a TCP socket, binds it, and then - waits for forwarded requests from your webserver. - - Why AJP? Two good reasons are that AJP provides load-balancing and - fail-over support. Personally, I just wanted something new to - implement. :) - - Of course you will need an AJP1.3 connector for your webserver (e.g. - mod_jk) - see <http://jakarta.apache.org/tomcat/connectors-doc/>. - """ - def __init__(self, application, scriptName='', environ=None, - bindAddress=('localhost', 8009), allowedServers=None, - loggingLevel=logging.INFO, debug=True, **kw): - """ - scriptName is the initial portion of the URL path that "belongs" - to your application. It is used to determine PATH_INFO (which doesn't - seem to be passed in). An empty scriptName means your application - is mounted at the root of your virtual host. - - environ, which must be a dictionary, can contain any additional - environment variables you want to pass to your application. - - bindAddress is the address to bind to, which must be a tuple of - length 2. 
The first element is a string, which is the host name - or IPv4 address of a local interface. The 2nd element is the port - number. - - allowedServers must be None or a list of strings representing the - IPv4 addresses of servers allowed to connect. None means accept - connections from anywhere. - - loggingLevel sets the logging level of the module-level logger. - """ - BaseAJPServer.__init__(self, application, - scriptName=scriptName, - environ=environ, - multithreaded=False, - multiprocess=True, - bindAddress=bindAddress, - allowedServers=allowedServers, - loggingLevel=loggingLevel, - debug=debug) - for key in ('multithreaded', 'multiprocess', 'jobClass', 'jobArgs'): - if kw.has_key(key): - del kw[key] - PreforkThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,), **kw) - - def run(self): - """ - Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT, - SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP - is caught, this method returns True. Returns False otherwise.) 
- """ - self.logger.info('%s starting up', self.__class__.__name__) - - try: - sock = self._setupSocket() - except socket.error, e: - self.logger.error('Failed to bind socket (%s), exiting', e[1]) - return False - - ret = PreforkThreadedServer.run(self, sock) - - self._cleanupSocket(sock) - - self.logger.info('%s shutting down%s', self.__class__.__name__, - self._hupReceived and ' (reload requested)' or '') - - return ret - -def paste_factory_helper(wsgiServerClass, global_conf, host, port, **local_conf): - # I think I can't write a tuple for bindAddress in .ini file - host = host or global_conf.get('host', 'localhost') - port = port or global_conf.get('port', 4000) - - local_conf['bindAddress'] = (host, int(port)) - - def server(application): - server = wsgiServerClass(application, **local_conf) - server.run() - - return server - -def factory(global_conf, host=None, port=None, **local): - return paste_factory_helper(WSGIServer, global_conf, host, port, **local) - -if __name__ == '__main__': - def test_app(environ, start_response): - """Probably not the most efficient example.""" - import cgi - start_response('200 OK', [('Content-Type', 'text/html')]) - yield '<html><head><title>Hello World!</title></head>\n' \ - '<body>\n' \ - '<p>Hello World!</p>\n' \ - '<table border="1">' - names = environ.keys() - names.sort() - for name in names: - yield '<tr><td>%s</td><td>%s</td></tr>\n' % ( - name, cgi.escape(`environ[name]`)) - - form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ, - keep_blank_values=1) - if form.list: - yield '<tr><th colspan="2">Form data</th></tr>' - - for field in form.list: - yield '<tr><td>%s</td><td>%s</td></tr>\n' % ( - field.name, field.value) - - yield '</table>\n' \ - '</body></html>\n' - - from wsgiref import validate - test_app = validate.validator(test_app) - # Explicitly set bindAddress to *:4001 for testing. 
- WSGIServer(test_app, - bindAddress=('', 4001), allowedServers=None, - loggingLevel=logging.DEBUG).run() diff -r fba947d16fa7 -r 6b924dd68e77 lib/galaxy/web/framework/servers/flup/preforkthreadedserver.py --- a/lib/galaxy/web/framework/servers/flup/preforkthreadedserver.py Fri Aug 28 18:03:28 2009 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,469 +0,0 @@ -# Copyright (c) 2005 Allan Saddi <allan@saddi.com> -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND -# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS -# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) -# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT -# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY -# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF -# SUCH DAMAGE. 
-# -# $Id: preforkserver.py 2311 2007-01-23 00:05:04Z asaddi $ - -__author__ = 'Allan Saddi <allan@saddi.com>' -__version__ = '$Revision: 2311 $' - -import sys -import os -import socket -import select -import errno -import signal -import threading - -try: - import fcntl -except ImportError: - def setCloseOnExec(sock): - pass -else: - def setCloseOnExec(sock): - fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC) - -from flup.server.threadpool import ThreadPool - -# If running Python < 2.4, require eunuchs module for socket.socketpair(). -# See <http://www.inoi.fi/open/trac/eunuchs>. -if not hasattr(socket, 'socketpair'): - try: - import eunuchs.socketpair - except ImportError: - # TODO: Other alternatives? Perhaps using os.pipe()? - raise ImportError, 'Requires eunuchs module for Python < 2.4' - - def socketpair(): - s1, s2 = eunuchs.socketpair.socketpair() - p, c = (socket.fromfd(s1, socket.AF_UNIX, socket.SOCK_STREAM), - socket.fromfd(s2, socket.AF_UNIX, socket.SOCK_STREAM)) - os.close(s1) - os.close(s2) - return p, c - - socket.socketpair = socketpair - -class PreforkThreadedServer(object): - """ - A preforked server model conceptually similar to Apache httpd(2). At - any given time, ensures there are at least minSpare children ready to - process new requests (up to a maximum of maxChildren children total). - If the number of idle children is ever above maxSpare, the extra - children are killed. - - If maxRequests is positive, each child will only handle that many - requests in its lifetime before exiting. - - jobClass should be a class whose constructor takes at least two - arguments: the client socket and client address. jobArgs, which - must be a list or tuple, is any additional (static) arguments you - wish to pass to the constructor. - - jobClass should have a run() method (taking no arguments) that does - the actual work. When run() returns, the request is considered - complete and the child process moves to idle state. 
- """ - def __init__(self, minSpare=32, maxSpare=32, maxChildren=32, - maxRequests=0, maxThreads=10, jobClass=None, jobArgs=()): - self._minSpare = int( minSpare ) - self._maxSpare = int( maxSpare ) - self._maxChildren = max(maxSpare, int( maxChildren ) ) - self._maxRequests = int( maxRequests ) - self._maxThreads = int( maxThreads ) - self._jobClass = jobClass - self._jobArgs = jobArgs - - # Internal state of children. Maps pids to dictionaries with two - # members: 'file' and 'avail'. 'file' is the socket to that - # individidual child and 'avail' is whether or not the child is - # free to process requests. - self._children = {} - - def run(self, sock): - """ - The main loop. Pass a socket that is ready to accept() client - connections. Return value will be True or False indiciating whether - or not the loop was exited due to SIGHUP. - """ - # Set up signal handlers. - self._keepGoing = True - self._hupReceived = False - self._installSignalHandlers() - - # Don't want operations on main socket to block. - sock.setblocking(0) - - # Set close-on-exec - setCloseOnExec(sock) - - # Main loop. - while self._keepGoing: - # Maintain minimum number of children. - while len(self._children) < self._maxSpare: - if not self._spawnChild(sock): break - - # Wait on any socket activity from live children. - r = [x['file'] for x in self._children.values() - if x['file'] is not None] - - if len(r) == len(self._children): - timeout = None - else: - # There are dead children that need to be reaped, ensure - # that they are by timing out, if necessary. - timeout = 2 - - try: - r, w, e = select.select(r, [], [], timeout) - except select.error, e: - if e[0] != errno.EINTR: - raise - - # Scan child sockets and tend to those that need attention. - for child in r: - # Receive status byte. - try: - state = child.recv(1) - except socket.error, e: - if e[0] in (errno.EAGAIN, errno.EINTR): - # Guess it really didn't need attention? - continue - raise - # Try to match it with a child. 
(Do we need a reverse map?) - for pid,d in self._children.items(): - if child is d['file']: - if state: - # Set availability status accordingly. - self._children[pid]['avail'] = state != '\x00' - else: - # Didn't receive anything. Child is most likely - # dead. - d = self._children[pid] - d['file'].close() - d['file'] = None - d['avail'] = False - - # Reap children. - self._reapChildren() - - # See who and how many children are available. - availList = filter(lambda x: x[1]['avail'], self._children.items()) - avail = len(availList) - - if avail < self._minSpare: - # Need to spawn more children. - while avail < self._minSpare and \ - len(self._children) < self._maxChildren: - if not self._spawnChild(sock): break - avail += 1 - elif avail > self._maxSpare: - # Too many spares, kill off the extras. - pids = [x[0] for x in availList] - pids.sort() - pids = pids[self._maxSpare:] - for pid in pids: - d = self._children[pid] - d['file'].close() - d['file'] = None - d['avail'] = False - - # Clean up all child processes. - self._cleanupChildren() - - # Restore signal handlers. - self._restoreSignalHandlers() - - # Return bool based on whether or not SIGHUP was received. - return self._hupReceived - - def _cleanupChildren(self): - """ - Closes all child sockets (letting those that are available know - that it's time to exit). Sends SIGINT to those that are currently - processing (and hopes that it finishses ASAP). - - Any children remaining after 10 seconds is SIGKILLed. - """ - # Let all children know it's time to go. - for pid,d in self._children.items(): - if d['file'] is not None: - d['file'].close() - d['file'] = None - if not d['avail']: - # Child is unavailable. SIGINT it. - try: - os.kill(pid, signal.SIGINT) - except OSError, e: - if e[0] != errno.ESRCH: - raise - - def alrmHandler(signum, frame): - pass - - # Set up alarm to wake us up after 10 seconds. 
- oldSIGALRM = signal.getsignal(signal.SIGALRM) - signal.signal(signal.SIGALRM, alrmHandler) - signal.alarm(10) - - # Wait for all children to die. - while len(self._children): - try: - pid, status = os.wait() - except OSError, e: - if e[0] in (errno.ECHILD, errno.EINTR): - break - if self._children.has_key(pid): - del self._children[pid] - - signal.signal(signal.SIGALRM, oldSIGALRM) - - # Forcefully kill any remaining children. - for pid in self._children.keys(): - try: - os.kill(pid, signal.SIGKILL) - except OSError, e: - if e[0] != errno.ESRCH: - raise - - def _reapChildren(self): - """Cleans up self._children whenever children die.""" - while True: - try: - pid, status = os.waitpid(-1, os.WNOHANG) - except OSError, e: - if e[0] == errno.ECHILD: - break - raise - if pid <= 0: - break - if self._children.has_key(pid): # Sanity check. - if self._children[pid]['file'] is not None: - self._children[pid]['file'].close() - del self._children[pid] - - def _spawnChild(self, sock): - """ - Spawn a single child. Returns True if successful, False otherwise. - """ - # This socket pair is used for very simple communication between - # the parent and its children. - parent, child = socket.socketpair() - parent.setblocking(0) - setCloseOnExec(parent) - child.setblocking(0) - setCloseOnExec(child) - try: - pid = os.fork() - except OSError, e: - if e[0] in (errno.EAGAIN, errno.ENOMEM): - return False # Can't fork anymore. - raise - if not pid: - # Child - child.close() - # Put child into its own process group. - pid = os.getpid() - os.setpgid(pid, pid) - # Restore signal handlers. - self._restoreSignalHandlers() - # Close copies of child sockets. - for f in [x['file'] for x in self._children.values() - if x['file'] is not None]: - f.close() - self._children = {} - try: - # Enter main loop. 
- self._child(sock, parent) - except KeyboardInterrupt: - pass - sys.exit(0) - else: - # Parent - parent.close() - d = self._children[pid] = {} - d['file'] = child - d['avail'] = True - return True - - def _isClientAllowed(self, addr): - """Override to provide access control.""" - return True - - def _child(self, sock, parent): - """Main loop for children.""" - requestCount = 0 - - # For the moment we fix the number of threads per process exactly - threadPool = ThreadPool( minSpare=self._maxThreads, - maxSpare=self._maxThreads, - maxThreads=self._maxThreads ) - - activeThreads = [0] - activeThreadsLock = threading.Lock() - - underCapacity = threading.Event() - underCapacity.set() - - def jobFinished(): - activeThreadsLock.acquire() - try: - if activeThreads[0] == self._maxThreads: - underCapacity.set() - # Tell parent we're free again. - try: - parent.send('\xff') - except socket.error, e: - if e[0] == errno.EPIPE: - # Parent is gone. - return - raise - activeThreads[0] -= 1 - finally: - activeThreadsLock.release() - - class JobClassWrapper( object ): - def __init__( self, job ): - self.job = job - def run( self ): - self.job.run() - jobFinished() - - while True: - - # If all threads are busy, block - underCapacity.wait() - - # Wait for any activity on the main socket or parent socket. - r, w, e = select.select([sock, parent], [], []) - - for f in r: - # If there's any activity on the parent socket, it - # means the parent wants us to die or has died itself. - # Either way, exit. - if f is parent: - return - - # Otherwise, there's activity on the main socket... - try: - clientSock, addr = sock.accept() - except socket.error, e: - if e[0] == errno.EAGAIN: - # Or maybe not. - continue - raise - - setCloseOnExec(clientSock) - - # Check if this client is allowed. - if not self._isClientAllowed(addr): - clientSock.close() - continue - - # Notify parent if we're no longer available. 
- activeThreadsLock.acquire() - try: - activeThreads[0] += 1 - if activeThreads[0] == self._maxThreads: - # No longer under capacity - underCapacity.clear() - # Tell parent - try: - parent.send('\x00') - except socket.error, e: - # If parent is gone, finish up this request. - if e[0] != errno.EPIPE: - raise - finally: - activeThreadsLock.release() - - ## # Do the job. - ## self._jobClass(clientSock, addr, *self._jobArgs).run() - - ## print "Dispatching job" - - # Hand off to Connection. - conn = JobClassWrapper( self._jobClass(clientSock, addr, *self._jobArgs) ) - # Since we track maxThreads we can allow queueing here, just queues - # long enough for the callback above to finish. - if not threadPool.addJob(conn, allowQueuing=True): - # Should never happen since we track maxThreads carefully - # outside of the pool - raise Exception( "Something has gone terribly wrong" ) - - # If we've serviced the maximum number of requests, exit. - if self._maxRequests > 0: - requestCount += 1 - if requestCount >= self._maxRequests: - # Need to allow threads to finish up here. - break - - # Signal handlers - - def _hupHandler(self, signum, frame): - self._keepGoing = False - self._hupReceived = True - - def _intHandler(self, signum, frame): - self._keepGoing = False - - def _chldHandler(self, signum, frame): - # Do nothing (breaks us out of select and allows us to reap children). 
- pass - - def _installSignalHandlers(self): - supportedSignals = [signal.SIGINT, signal.SIGTERM] - if hasattr(signal, 'SIGHUP'): - supportedSignals.append(signal.SIGHUP) - - self._oldSIGs = [(x,signal.getsignal(x)) for x in supportedSignals] - - for sig in supportedSignals: - if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP: - signal.signal(sig, self._hupHandler) - else: - signal.signal(sig, self._intHandler) - - def _restoreSignalHandlers(self): - """Restores previous signal handlers.""" - for signum,handler in self._oldSIGs: - signal.signal(signum, handler) - -if __name__ == '__main__': - class TestJob(object): - def __init__(self, sock, addr): - self._sock = sock - self._addr = addr - def run(self): - print "Client connection opened from %s:%d" % self._addr - self._sock.send('Hello World!\n') - self._sock.setblocking(1) - self._sock.recv(1) - self._sock.close() - print "Client connection closed from %s:%d" % self._addr - sock = socket.socket() - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('', 8080)) - sock.listen(socket.SOMAXCONN) - PreforkThreadedServer(maxChildren=10, jobClass=TestJob).run(sock) diff -r fba947d16fa7 -r 6b924dd68e77 lib/galaxy/web/framework/servers/fork_server.py --- a/lib/galaxy/web/framework/servers/fork_server.py Fri Aug 28 18:03:28 2009 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,231 +0,0 @@ -""" -HTTPServer implementation that uses a thread pool based SocketServer (similar -to the approach used by CherryPy) and the WSGIHandler request handler from -Paste. - -NOTE: NOT HEAVILY TESTED, DO NOT USE IN PRODUCTION! 
-""" - -import SocketServer -import Queue -import threading -import thread -import sys -import socket -import select -import os -import signal -import errno - -import logging -log = logging.getLogger( __name__ ) - -import pkg_resources; -pkg_resources.require( "Paste" ) -from paste.httpserver import WSGIHandler - -class ThreadPool( object ): - """ - Generic thread pool with a queue of callables to consume - """ - SHUTDOWN = object() - def __init__( self, nworkers, name="ThreadPool" ): - """ - Create thread pool with `nworkers` worker threads - """ - self.nworkers = nworkers - self.name = name - self.queue = Queue.Queue() - self.workers = [] - self.worker_tracker = {} - for i in range( self.nworkers ): - worker = threading.Thread( target=self.worker_thread_callback, - name=( "%s worker %d" % ( self.name, i ) ) ) - worker.start() - self.workers.append( worker ) - def worker_thread_callback( self ): - """ - Worker thread should call this method to get and process queued - callables - """ - while 1: - runnable = self.queue.get() - if runnable is ThreadPool.SHUTDOWN: - return - else: - self.worker_tracker[thread.get_ident()] = [None, None] - try: - runnable() - finally: - try: - del self.worker_tracker[thread.get_ident()] - except KeyError: - pass - def shutdown( self ): - """ - Shutdown the queue (after finishing any pending requests) - """ - # Add a shutdown request for every worker - for i in range( self.nworkers ): - self.queue.put( ThreadPool.SHUTDOWN ) - # Wait for each thread to terminate - for worker in self.workers: - worker.join() - -class PreforkThreadPoolServer( SocketServer.TCPServer ): - """ - Server that uses a pool of threads for request handling - """ - allow_reuse_address = 1 - def __init__( self, server_address, request_handler, nworkers, nprocesses ): - # Create and start the workers - self.nprocesses = nprocesses - self.nworkers = nworkers - self.running = True - assert nworkers > 0, "ThreadPoolServer must have at least one worker" - # Call the base 
class constructor - SocketServer.TCPServer.__init__( self, server_address, request_handler ) - - def get_request( self ): - self.socket_lock.acquire() - try: - return self.socket.accept() - finally: - self.socket_lock.release() - - def serve_forever(self): - """ - Overrides `serve_forever` to shutdown cleanly. - """ - log.info( "Serving requests..." ) - # Pre-fork each child - children = [] - for i in range( self.nprocesses ): - pid = os.fork() - if pid: - # We are in the parent process - children.append( pid ) - else: - # We are in the child process - signal.signal( signal.SIGINT, self.child_sigint_handler ) - self.time_to_terminate = threading.Event() - self.socket_lock = threading.Lock() - self.pid = os.getpid() - self.serve_forever_child() - sys.exit( 0 ) - # Wait - try: - while len( children ) > 0: - pid, status = os.wait() - children.remove( pid ) - except KeyboardInterrupt: - # Cleanup, kill all children - print "Killing Children" - for child in children: - os.kill( child, signal.SIGINT ) - # Setup and alarm for 10 seconds - signal.signal( signal.SIGALRM, lambda x, y: None ) - signal.alarm( 10 ) - # Wait - while len( children ) > 0: - try: - pid, status = os.wait() - children.remove( pid ) - except OSError, e: - if e[0] in (errno.ECHILD, errno.EINTR): - break - # Kill any left - print "Killing" - for child in children: - os.kill( child, signal.SIGKILL ) - log.info( "Shutting down..." 
) - - def serve_forever_child( self ): - # self.thread_pool = ThreadPool( self.nworkers, "ThreadPoolServer on %s:%d" % self.server_address ) - self.workers = [] - for i in range( self.nworkers ): - worker = threading.Thread( target=self.serve_forever_thread ) - worker.start() - self.workers.append( worker ) - self.time_to_terminate.wait() - print "Terminating" - for thread in self.workers: - thread.join() - self.socket.close() - - def serve_forever_thread( self ): - while self.running: - self.handle_request() - - def child_sigint_handler( self, signum, frame ): - print "Shutting down child" - self.shutdown() - - def shutdown( self ): - """ - Finish pending requests and shutdown the server - """ - self.running = False - self.time_to_terminate.set() - - ## def server_activate(self): - ## """ - ## Overrides server_activate to set timeout on our listener socket - ## """ - ## # We set the timeout here so that we can trap ^C on windows - ## self.socket.settimeout(1) - ## SocketServer.TCPServer.server_activate(self) - -class WSGIPreforkThreadPoolServer( PreforkThreadPoolServer ): - """ - Server that mixes ThreadPoolServer and WSGIHandler - """ - def __init__( self, wsgi_application, server_address, *args, **kwargs ): - PreforkThreadPoolServer.__init__( self, server_address, WSGIHandler, *args, **kwargs ) - self.wsgi_application = wsgi_application - self.wsgi_socket_timeout = None - def get_request(self): - # If there is a socket_timeout, set it on the accepted - (conn,info) = PreforkThreadPoolServer.get_request(self) - if self.wsgi_socket_timeout: - conn.settimeout(self.wsgi_socket_timeout) - return (conn, info) - - - - - -def serve( wsgi_app, global_conf, host="127.0.0.1", port="8080", - server_version=None, protocol_version=None, start_loop=True, - daemon_threads=None, socket_timeout=None, nworkers=10, nprocesses=10 ): - """ - Similar to `paste.httpserver.serve` but using the thread pool server - """ - server_address = ( host, int( port ) ) - - # if server_version: - # 
handler.server_version = server_version - # handler.sys_version = None - # if protocol_version: - # assert protocol_version in ('HTTP/0.9','HTTP/1.0','HTTP/1.1') - # handler.protocol_version = protocol_version - - server = WSGIPreforkThreadPoolServer( wsgi_app, server_address, int( nworkers ), int( nprocesses ) ) - if daemon_threads: - server.daemon_threads = daemon_threads - if socket_timeout: - server.wsgi_socket_timeout = int(socket_timeout) - - print "serving on %s:%s" % server.server_address - if start_loop: - try: - server.serve_forever() - except KeyboardInterrupt: - # allow CTRL+C to shutdown - pass - return server - -if __name__ == '__main__': - from paste.wsgilib import dump_environ - serve(dump_environ, {}, server_version="Wombles/1.0", - protocol_version="HTTP/1.1", port="8881") diff -r fba947d16fa7 -r 6b924dd68e77 lib/galaxy/web/framework/servers/threadpool_server.py --- a/lib/galaxy/web/framework/servers/threadpool_server.py Fri Aug 28 18:03:28 2009 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,219 +0,0 @@ -""" -HTTPServer implementation that uses a thread pool based SocketServer (similar -to the approach used by CherryPy) and the WSGIHandler request handler from -Paste. - -NOTE: Most of the improvments from this implementation have been moved into - the Paste HTTP server, this should be considered deprecated. - -Preliminary numbers from "ab -c 50 -n 500 http://localhost:8080/", all tests -with transaction level logging. Application processes a simple cheetah -template (using compiled NameMapper). 
- -CherryPy 2.1 ------------- - -Percentage of the requests served within a certain time (ms) - 50% 354 - 66% 452 - 75% 601 - 80% 674 - 90% 2868 - 95% 3000 - 98% 3173 - 99% 3361 - 100% 6145 (last request) - -Paste with Paste#http server (ThreadingMixIn based) ---------------------------------------------------- - -Percentage of the requests served within a certain time (ms) - 50% 84 - 66% 84 - 75% 84 - 80% 84 - 90% 85 - 95% 86 - 98% 92 - 99% 97 - 100% 99 (last request) - -This module ------------ - -Percentage of the requests served within a certain time (ms) - 50% 19 - 66% 23 - 75% 26 - 80% 29 - 90% 41 - 95% 50 - 98% 70 - 99% 80 - 100% 116 (last request) - -""" - -import SocketServer -import Queue -import threading -import socket - -import logging -log = logging.getLogger( __name__ ) - -import pkg_resources; -pkg_resources.require( "Paste" ) -from paste.httpserver import WSGIHandler - -class ThreadPool( object ): - """ - Generic thread pool with a queue of callables to consume - """ - SHUTDOWN = object() - def __init__( self, nworkers, name="ThreadPool" ): - """ - Create thread pool with `nworkers` worker threads - """ - self.nworkers = nworkers - self.name = name - self.queue = Queue.Queue() - self.workers = [] - for i in range( self.nworkers ): - worker = threading.Thread( target=self.worker_thread_callback, - name=( "%s worker %d" % ( self.name, i ) ) ) - worker.start() - self.workers.append( worker ) - def worker_thread_callback( self ): - """ - Worker thread should call this method to get and process queued - callables - """ - while 1: - runnable = self.queue.get() - if runnable is ThreadPool.SHUTDOWN: - return - else: - runnable() - def shutdown( self ): - """ - Shutdown the queue (after finishing any pending requests) - """ - # Add a shutdown request for every worker - for i in range( self.nworkers ): - self.queue.put( ThreadPool.SHUTDOWN ) - # Wait for each thread to terminate - for worker in self.workers: - worker.join() - -class ThreadPoolServer( 
SocketServer.TCPServer ): - """ - Server that uses a pool of threads for request handling - """ - allow_reuse_address = 1 - def __init__( self, server_address, request_handler, nworkers ): - # Create and start the workers - self.running = True - assert nworkers > 0, "ThreadPoolServer must have at least one worker" - self.thread_pool = ThreadPool( nworkers, "ThreadPoolServer on %s:%d" % server_address ) - # Call the base class constructor - SocketServer.TCPServer.__init__( self, server_address, request_handler ) - def process_request( self, request, client_address ): - """ - Queue the request to be processed by on of the thread pool threads - """ - # This sets the socket to blocking mode (and no timeout) since it - # may take the thread pool a little while to get back to it. (This - # is the default but since we set a timeout on the parent socket so - # that we can trap interrupts we need to restore this,.) - request.setblocking( 1 ) - # Queue processing of the request - self.thread_pool.queue.put( lambda: self.process_request_in_thread( request, client_address ) ) - def process_request_in_thread( self, request, client_address ): - """ - The worker thread should call back here to do the rest of the - request processing. - """ - try: - self.finish_request( request, client_address ) - self.close_request( request) - except: - self.handle_error( request, client_address ) - self.close_request( request ) - def serve_forever(self): - """ - Overrides `serve_forever` to shutdown cleanly. - """ - try: - log.info( "Serving requests..." ) - while self.running: - try: - self.handle_request() - except socket.timeout: - # Timeout is expected, gives interrupts a chance to - # propogate, just keep handling - pass - log.info( "Shutting down..." 
) - finally: - self.thread_pool.shutdown() - def shutdown( self ): - """ - Finish pending requests and shutdown the server - """ - self.running = False - self.socket.close() - def server_activate(self): - """ - Overrides server_activate to set timeout on our listener socket - """ - # We set the timeout here so that we can trap ^C on windows - self.socket.settimeout(1) - SocketServer.TCPServer.server_activate(self) - -class WSGIThreadPoolServer( ThreadPoolServer ): - """ - Server that mixes ThreadPoolServer and WSGIHandler - """ - def __init__( self, wsgi_application, server_address, *args, **kwargs ): - ThreadPoolServer.__init__( self, server_address, WSGIHandler, *args, **kwargs ) - self.wsgi_application = wsgi_application - self.wsgi_socket_timeout = None - def get_request(self): - # If there is a socket_timeout, set it on the accepted - (conn,info) = ThreadPoolServer.get_request(self) - if self.wsgi_socket_timeout: - conn.settimeout(self.wsgi_socket_timeout) - return (conn, info) - -def serve( wsgi_app, global_conf, host="127.0.0.1", port="8080", - server_version=None, protocol_version=None, start_loop=True, - daemon_threads=None, socket_timeout=None, nworkers=10 ): - """ - Similar to `paste.httpserver.serve` but using the thread pool server - """ - server_address = ( host, int( port ) ) - - if server_version: - handler.server_version = server_version - handler.sys_version = None - if protocol_version: - assert protocol_version in ('HTTP/0.9','HTTP/1.0','HTTP/1.1') - handler.protocol_version = protocol_version - - server = WSGIThreadPoolServer( wsgi_app, server_address, int( nworkers ) ) - if daemon_threads: - server.daemon_threads = daemon_threads - if socket_timeout: - server.wsgi_socket_timeout = int(socket_timeout) - - print "serving on %s:%s" % server.server_address - if start_loop: - try: - server.serve_forever() - except KeyboardInterrupt: - # allow CTRL+C to shutdown - pass - return server - - -
participants (1)
-
Greg Von Kuster