details:
http://www.bx.psu.edu/hg/galaxy/rev/6b924dd68e77
changeset: 2653:6b924dd68e77
user: James Taylor <james(a)jamestaylor.org>
date: Fri Aug 28 18:09:43 2009 -0400
description:
Removing the galalxy.web.framework.servers module (unsued for a long time)
6 file(s) affected in this change:
lib/galaxy/web/framework/servers/__init__.py
lib/galaxy/web/framework/servers/flup/__init__.py
lib/galaxy/web/framework/servers/flup/ajp_forkthreaded.py
lib/galaxy/web/framework/servers/flup/preforkthreadedserver.py
lib/galaxy/web/framework/servers/fork_server.py
lib/galaxy/web/framework/servers/threadpool_server.py
diffs (1154 lines):
diff -r fba947d16fa7 -r 6b924dd68e77 lib/galaxy/web/framework/servers/__init__.py
--- a/lib/galaxy/web/framework/servers/__init__.py Fri Aug 28 18:03:28 2009 -0400
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,3 +0,0 @@
-"""
-Various WSGI webserver implementations.
-"""
\ No newline at end of file
diff -r fba947d16fa7 -r 6b924dd68e77
lib/galaxy/web/framework/servers/flup/ajp_forkthreaded.py
--- a/lib/galaxy/web/framework/servers/flup/ajp_forkthreaded.py Fri Aug 28 18:03:28 2009
-0400
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,211 +0,0 @@
-# Copyright (c) 2005, 2006 Allan Saddi <allan(a)saddi.com>
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
-#
-# $Id: ajp_fork.py 2188 2006-12-05 22:11:45Z asaddi $
-
-"""
-ajp - an AJP 1.3/WSGI gateway.
-
-For more information about AJP and AJP connectors for your web server, see
-<http://jakarta.apache.org/tomcat/connectors-doc/>.
-
-For more information about the Web Server Gateway Interface, see
-<http://www.python.org/peps/pep-0333.html>.
-
-Example usage:
-
- #!/usr/bin/env python
- import sys
- from myapplication import app # Assume app is your WSGI application object
- from ajp import WSGIServer
- ret = WSGIServer(app).run()
- sys.exit(ret and 42 or 0)
-
-See the documentation for WSGIServer for more information.
-
-About the bit of logic at the end:
-Upon receiving SIGHUP, the python script will exit with status code 42. This
-can be used by a wrapper script to determine if the python script should be
-re-run. When a SIGINT or SIGTERM is received, the script exits with status
-code 0, possibly indicating a normal exit.
-
-Example wrapper script:
-
- #!/bin/sh
- STATUS=42
- while test $STATUS -eq 42; do
- python "$@" that_script_above.py
- STATUS=$?
- done
-
-Example workers.properties (for mod_jk):
-
- worker.list=foo
- worker.foo.port=8009
- worker.foo.host=localhost
- worker.foo.type=ajp13
-
-Example httpd.conf (for mod_jk):
-
- JkWorkersFile /path/to/workers.properties
- JkMount /* foo
-
-Note that if you mount your ajp application anywhere but the root ("/"), you
-SHOULD specifiy scriptName to the WSGIServer constructor. This will ensure
-that SCRIPT_NAME/PATH_INFO are correctly deduced.
-"""
-
-__author__ = 'Allan Saddi <allan(a)saddi.com>'
-__version__ = '$Revision: 2188 $'
-
-import socket
-import logging
-
-from flup.server.ajp_base import BaseAJPServer, Connection
-from preforkthreadedserver import PreforkThreadedServer
-
-__all__ = ['WSGIServer']
-
-class WSGIServer(BaseAJPServer, PreforkThreadedServer):
- """
- AJP1.3/WSGI server. Runs your WSGI application as a persistant program
- that understands AJP1.3. Opens up a TCP socket, binds it, and then
- waits for forwarded requests from your webserver.
-
- Why AJP? Two good reasons are that AJP provides load-balancing and
- fail-over support. Personally, I just wanted something new to
- implement. :)
-
- Of course you will need an AJP1.3 connector for your webserver (e.g.
- mod_jk) - see <
http://jakarta.apache.org/tomcat/connectors-doc/>.
- """
- def __init__(self, application, scriptName='', environ=None,
- bindAddress=('localhost', 8009), allowedServers=None,
- loggingLevel=logging.INFO, debug=True, **kw):
- """
- scriptName is the initial portion of the URL path that "belongs"
- to your application. It is used to determine PATH_INFO (which doesn't
- seem to be passed in). An empty scriptName means your application
- is mounted at the root of your virtual host.
-
- environ, which must be a dictionary, can contain any additional
- environment variables you want to pass to your application.
-
- bindAddress is the address to bind to, which must be a tuple of
- length 2. The first element is a string, which is the host name
- or IPv4 address of a local interface. The 2nd element is the port
- number.
-
- allowedServers must be None or a list of strings representing the
- IPv4 addresses of servers allowed to connect. None means accept
- connections from anywhere.
-
- loggingLevel sets the logging level of the module-level logger.
- """
- BaseAJPServer.__init__(self, application,
- scriptName=scriptName,
- environ=environ,
- multithreaded=False,
- multiprocess=True,
- bindAddress=bindAddress,
- allowedServers=allowedServers,
- loggingLevel=loggingLevel,
- debug=debug)
- for key in ('multithreaded', 'multiprocess', 'jobClass',
'jobArgs'):
- if kw.has_key(key):
- del kw[key]
- PreforkThreadedServer.__init__(self, jobClass=Connection, jobArgs=(self,), **kw)
-
- def run(self):
- """
- Main loop. Call this after instantiating WSGIServer. SIGHUP, SIGINT,
- SIGQUIT, SIGTERM cause it to cleanup and return. (If a SIGHUP
- is caught, this method returns True. Returns False otherwise.)
- """
- self.logger.info('%s starting up', self.__class__.__name__)
-
- try:
- sock = self._setupSocket()
- except socket.error, e:
- self.logger.error('Failed to bind socket (%s), exiting', e[1])
- return False
-
- ret = PreforkThreadedServer.run(self, sock)
-
- self._cleanupSocket(sock)
-
- self.logger.info('%s shutting down%s', self.__class__.__name__,
- self._hupReceived and ' (reload requested)' or
'')
-
- return ret
-
-def paste_factory_helper(wsgiServerClass, global_conf, host, port, **local_conf):
- # I think I can't write a tuple for bindAddress in .ini file
- host = host or global_conf.get('host', 'localhost')
- port = port or global_conf.get('port', 4000)
-
- local_conf['bindAddress'] = (host, int(port))
-
- def server(application):
- server = wsgiServerClass(application, **local_conf)
- server.run()
-
- return server
-
-def factory(global_conf, host=None, port=None, **local):
- return paste_factory_helper(WSGIServer, global_conf, host, port, **local)
-
-if __name__ == '__main__':
- def test_app(environ, start_response):
- """Probably not the most efficient example."""
- import cgi
- start_response('200 OK', [('Content-Type',
'text/html')])
- yield '<html><head><title>Hello
World!</title></head>\n' \
- '<body>\n' \
- '<p>Hello World!</p>\n' \
- '<table border="1">'
- names = environ.keys()
- names.sort()
- for name in names:
- yield
'<tr><td>%s</td><td>%s</td></tr>\n' % (
- name, cgi.escape(`environ[name]`))
-
- form = cgi.FieldStorage(fp=environ['wsgi.input'], environ=environ,
- keep_blank_values=1)
- if form.list:
- yield '<tr><th colspan="2">Form
data</th></tr>'
-
- for field in form.list:
- yield
'<tr><td>%s</td><td>%s</td></tr>\n' % (
- field.name, field.value)
-
- yield '</table>\n' \
- '</body></html>\n'
-
- from wsgiref import validate
- test_app = validate.validator(test_app)
- # Explicitly set bindAddress to *:4001 for testing.
- WSGIServer(test_app,
- bindAddress=('', 4001), allowedServers=None,
- loggingLevel=logging.DEBUG).run()
diff -r fba947d16fa7 -r 6b924dd68e77
lib/galaxy/web/framework/servers/flup/preforkthreadedserver.py
--- a/lib/galaxy/web/framework/servers/flup/preforkthreadedserver.py Fri Aug 28 18:03:28
2009 -0400
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,469 +0,0 @@
-# Copyright (c) 2005 Allan Saddi <allan(a)saddi.com>
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
-# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
-# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
-# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
-# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
-# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-# SUCH DAMAGE.
-#
-# $Id: preforkserver.py 2311 2007-01-23 00:05:04Z asaddi $
-
-__author__ = 'Allan Saddi <allan(a)saddi.com>'
-__version__ = '$Revision: 2311 $'
-
-import sys
-import os
-import socket
-import select
-import errno
-import signal
-import threading
-
-try:
- import fcntl
-except ImportError:
- def setCloseOnExec(sock):
- pass
-else:
- def setCloseOnExec(sock):
- fcntl.fcntl(sock.fileno(), fcntl.F_SETFD, fcntl.FD_CLOEXEC)
-
-from flup.server.threadpool import ThreadPool
-
-# If running Python < 2.4, require eunuchs module for socket.socketpair().
-# See <
http://www.inoi.fi/open/trac/eunuchs>.
-if not hasattr(socket, 'socketpair'):
- try:
- import eunuchs.socketpair
- except ImportError:
- # TODO: Other alternatives? Perhaps using os.pipe()?
- raise ImportError, 'Requires eunuchs module for Python < 2.4'
-
- def socketpair():
- s1, s2 = eunuchs.socketpair.socketpair()
- p, c = (socket.fromfd(s1, socket.AF_UNIX, socket.SOCK_STREAM),
- socket.fromfd(s2, socket.AF_UNIX, socket.SOCK_STREAM))
- os.close(s1)
- os.close(s2)
- return p, c
-
- socket.socketpair = socketpair
-
-class PreforkThreadedServer(object):
- """
- A preforked server model conceptually similar to Apache httpd(2). At
- any given time, ensures there are at least minSpare children ready to
- process new requests (up to a maximum of maxChildren children total).
- If the number of idle children is ever above maxSpare, the extra
- children are killed.
-
- If maxRequests is positive, each child will only handle that many
- requests in its lifetime before exiting.
-
- jobClass should be a class whose constructor takes at least two
- arguments: the client socket and client address. jobArgs, which
- must be a list or tuple, is any additional (static) arguments you
- wish to pass to the constructor.
-
- jobClass should have a run() method (taking no arguments) that does
- the actual work. When run() returns, the request is considered
- complete and the child process moves to idle state.
- """
- def __init__(self, minSpare=32, maxSpare=32, maxChildren=32,
- maxRequests=0, maxThreads=10, jobClass=None, jobArgs=()):
- self._minSpare = int( minSpare )
- self._maxSpare = int( maxSpare )
- self._maxChildren = max(maxSpare, int( maxChildren ) )
- self._maxRequests = int( maxRequests )
- self._maxThreads = int( maxThreads )
- self._jobClass = jobClass
- self._jobArgs = jobArgs
-
- # Internal state of children. Maps pids to dictionaries with two
- # members: 'file' and 'avail'. 'file' is the socket to
that
- # individidual child and 'avail' is whether or not the child is
- # free to process requests.
- self._children = {}
-
- def run(self, sock):
- """
- The main loop. Pass a socket that is ready to accept() client
- connections. Return value will be True or False indiciating whether
- or not the loop was exited due to SIGHUP.
- """
- # Set up signal handlers.
- self._keepGoing = True
- self._hupReceived = False
- self._installSignalHandlers()
-
- # Don't want operations on main socket to block.
- sock.setblocking(0)
-
- # Set close-on-exec
- setCloseOnExec(sock)
-
- # Main loop.
- while self._keepGoing:
- # Maintain minimum number of children.
- while len(self._children) < self._maxSpare:
- if not self._spawnChild(sock): break
-
- # Wait on any socket activity from live children.
- r = [x['file'] for x in self._children.values()
- if x['file'] is not None]
-
- if len(r) == len(self._children):
- timeout = None
- else:
- # There are dead children that need to be reaped, ensure
- # that they are by timing out, if necessary.
- timeout = 2
-
- try:
- r, w, e = select.select(r, [], [], timeout)
- except select.error, e:
- if e[0] != errno.EINTR:
- raise
-
- # Scan child sockets and tend to those that need attention.
- for child in r:
- # Receive status byte.
- try:
- state = child.recv(1)
- except socket.error, e:
- if e[0] in (errno.EAGAIN, errno.EINTR):
- # Guess it really didn't need attention?
- continue
- raise
- # Try to match it with a child. (Do we need a reverse map?)
- for pid,d in self._children.items():
- if child is d['file']:
- if state:
- # Set availability status accordingly.
- self._children[pid]['avail'] = state !=
'\x00'
- else:
- # Didn't receive anything. Child is most likely
- # dead.
- d = self._children[pid]
- d['file'].close()
- d['file'] = None
- d['avail'] = False
-
- # Reap children.
- self._reapChildren()
-
- # See who and how many children are available.
- availList = filter(lambda x: x[1]['avail'], self._children.items())
- avail = len(availList)
-
- if avail < self._minSpare:
- # Need to spawn more children.
- while avail < self._minSpare and \
- len(self._children) < self._maxChildren:
- if not self._spawnChild(sock): break
- avail += 1
- elif avail > self._maxSpare:
- # Too many spares, kill off the extras.
- pids = [x[0] for x in availList]
- pids.sort()
- pids = pids[self._maxSpare:]
- for pid in pids:
- d = self._children[pid]
- d['file'].close()
- d['file'] = None
- d['avail'] = False
-
- # Clean up all child processes.
- self._cleanupChildren()
-
- # Restore signal handlers.
- self._restoreSignalHandlers()
-
- # Return bool based on whether or not SIGHUP was received.
- return self._hupReceived
-
- def _cleanupChildren(self):
- """
- Closes all child sockets (letting those that are available know
- that it's time to exit). Sends SIGINT to those that are currently
- processing (and hopes that it finishses ASAP).
-
- Any children remaining after 10 seconds is SIGKILLed.
- """
- # Let all children know it's time to go.
- for pid,d in self._children.items():
- if d['file'] is not None:
- d['file'].close()
- d['file'] = None
- if not d['avail']:
- # Child is unavailable. SIGINT it.
- try:
- os.kill(pid, signal.SIGINT)
- except OSError, e:
- if e[0] != errno.ESRCH:
- raise
-
- def alrmHandler(signum, frame):
- pass
-
- # Set up alarm to wake us up after 10 seconds.
- oldSIGALRM = signal.getsignal(signal.SIGALRM)
- signal.signal(signal.SIGALRM, alrmHandler)
- signal.alarm(10)
-
- # Wait for all children to die.
- while len(self._children):
- try:
- pid, status = os.wait()
- except OSError, e:
- if e[0] in (errno.ECHILD, errno.EINTR):
- break
- if self._children.has_key(pid):
- del self._children[pid]
-
- signal.signal(signal.SIGALRM, oldSIGALRM)
-
- # Forcefully kill any remaining children.
- for pid in self._children.keys():
- try:
- os.kill(pid, signal.SIGKILL)
- except OSError, e:
- if e[0] != errno.ESRCH:
- raise
-
- def _reapChildren(self):
- """Cleans up self._children whenever children
die."""
- while True:
- try:
- pid, status = os.waitpid(-1, os.WNOHANG)
- except OSError, e:
- if e[0] == errno.ECHILD:
- break
- raise
- if pid <= 0:
- break
- if self._children.has_key(pid): # Sanity check.
- if self._children[pid]['file'] is not None:
- self._children[pid]['file'].close()
- del self._children[pid]
-
- def _spawnChild(self, sock):
- """
- Spawn a single child. Returns True if successful, False otherwise.
- """
- # This socket pair is used for very simple communication between
- # the parent and its children.
- parent, child = socket.socketpair()
- parent.setblocking(0)
- setCloseOnExec(parent)
- child.setblocking(0)
- setCloseOnExec(child)
- try:
- pid = os.fork()
- except OSError, e:
- if e[0] in (errno.EAGAIN, errno.ENOMEM):
- return False # Can't fork anymore.
- raise
- if not pid:
- # Child
- child.close()
- # Put child into its own process group.
- pid = os.getpid()
- os.setpgid(pid, pid)
- # Restore signal handlers.
- self._restoreSignalHandlers()
- # Close copies of child sockets.
- for f in [x['file'] for x in self._children.values()
- if x['file'] is not None]:
- f.close()
- self._children = {}
- try:
- # Enter main loop.
- self._child(sock, parent)
- except KeyboardInterrupt:
- pass
- sys.exit(0)
- else:
- # Parent
- parent.close()
- d = self._children[pid] = {}
- d['file'] = child
- d['avail'] = True
- return True
-
- def _isClientAllowed(self, addr):
- """Override to provide access control."""
- return True
-
- def _child(self, sock, parent):
- """Main loop for children."""
- requestCount = 0
-
- # For the moment we fix the number of threads per process exactly
- threadPool = ThreadPool( minSpare=self._maxThreads,
- maxSpare=self._maxThreads,
- maxThreads=self._maxThreads )
-
- activeThreads = [0]
- activeThreadsLock = threading.Lock()
-
- underCapacity = threading.Event()
- underCapacity.set()
-
- def jobFinished():
- activeThreadsLock.acquire()
- try:
- if activeThreads[0] == self._maxThreads:
- underCapacity.set()
- # Tell parent we're free again.
- try:
- parent.send('\xff')
- except socket.error, e:
- if e[0] == errno.EPIPE:
- # Parent is gone.
- return
- raise
- activeThreads[0] -= 1
- finally:
- activeThreadsLock.release()
-
- class JobClassWrapper( object ):
- def __init__( self, job ):
- self.job = job
- def run( self ):
- self.job.run()
- jobFinished()
-
- while True:
-
- # If all threads are busy, block
- underCapacity.wait()
-
- # Wait for any activity on the main socket or parent socket.
- r, w, e = select.select([sock, parent], [], [])
-
- for f in r:
- # If there's any activity on the parent socket, it
- # means the parent wants us to die or has died itself.
- # Either way, exit.
- if f is parent:
- return
-
- # Otherwise, there's activity on the main socket...
- try:
- clientSock, addr = sock.accept()
- except socket.error, e:
- if e[0] == errno.EAGAIN:
- # Or maybe not.
- continue
- raise
-
- setCloseOnExec(clientSock)
-
- # Check if this client is allowed.
- if not self._isClientAllowed(addr):
- clientSock.close()
- continue
-
- # Notify parent if we're no longer available.
- activeThreadsLock.acquire()
- try:
- activeThreads[0] += 1
- if activeThreads[0] == self._maxThreads:
- # No longer under capacity
- underCapacity.clear()
- # Tell parent
- try:
- parent.send('\x00')
- except socket.error, e:
- # If parent is gone, finish up this request.
- if e[0] != errno.EPIPE:
- raise
- finally:
- activeThreadsLock.release()
-
- ## # Do the job.
- ## self._jobClass(clientSock, addr, *self._jobArgs).run()
-
- ## print "Dispatching job"
-
- # Hand off to Connection.
- conn = JobClassWrapper( self._jobClass(clientSock, addr, *self._jobArgs) )
- # Since we track maxThreads we can allow queueing here, just queues
- # long enough for the callback above to finish.
- if not threadPool.addJob(conn, allowQueuing=True):
- # Should never happen since we track maxThreads carefully
- # outside of the pool
- raise Exception( "Something has gone terribly wrong" )
-
- # If we've serviced the maximum number of requests, exit.
- if self._maxRequests > 0:
- requestCount += 1
- if requestCount >= self._maxRequests:
- # Need to allow threads to finish up here.
- break
-
- # Signal handlers
-
- def _hupHandler(self, signum, frame):
- self._keepGoing = False
- self._hupReceived = True
-
- def _intHandler(self, signum, frame):
- self._keepGoing = False
-
- def _chldHandler(self, signum, frame):
- # Do nothing (breaks us out of select and allows us to reap children).
- pass
-
- def _installSignalHandlers(self):
- supportedSignals = [signal.SIGINT, signal.SIGTERM]
- if hasattr(signal, 'SIGHUP'):
- supportedSignals.append(signal.SIGHUP)
-
- self._oldSIGs = [(x,signal.getsignal(x)) for x in supportedSignals]
-
- for sig in supportedSignals:
- if hasattr(signal, 'SIGHUP') and sig == signal.SIGHUP:
- signal.signal(sig, self._hupHandler)
- else:
- signal.signal(sig, self._intHandler)
-
- def _restoreSignalHandlers(self):
- """Restores previous signal handlers."""
- for signum,handler in self._oldSIGs:
- signal.signal(signum, handler)
-
-if __name__ == '__main__':
- class TestJob(object):
- def __init__(self, sock, addr):
- self._sock = sock
- self._addr = addr
- def run(self):
- print "Client connection opened from %s:%d" % self._addr
- self._sock.send('Hello World!\n')
- self._sock.setblocking(1)
- self._sock.recv(1)
- self._sock.close()
- print "Client connection closed from %s:%d" % self._addr
- sock = socket.socket()
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(('', 8080))
- sock.listen(socket.SOMAXCONN)
- PreforkThreadedServer(maxChildren=10, jobClass=TestJob).run(sock)
diff -r fba947d16fa7 -r 6b924dd68e77 lib/galaxy/web/framework/servers/fork_server.py
--- a/lib/galaxy/web/framework/servers/fork_server.py Fri Aug 28 18:03:28 2009 -0400
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,231 +0,0 @@
-"""
-HTTPServer implementation that uses a thread pool based SocketServer (similar
-to the approach used by CherryPy) and the WSGIHandler request handler from
-Paste.
-
-NOTE: NOT HEAVILY TESTED, DO NOT USE IN PRODUCTION!
-"""
-
-import SocketServer
-import Queue
-import threading
-import thread
-import sys
-import socket
-import select
-import os
-import signal
-import errno
-
-import logging
-log = logging.getLogger( __name__ )
-
-import pkg_resources;
-pkg_resources.require( "Paste" )
-from paste.httpserver import WSGIHandler
-
-class ThreadPool( object ):
- """
- Generic thread pool with a queue of callables to consume
- """
- SHUTDOWN = object()
- def __init__( self, nworkers, name="ThreadPool" ):
- """
- Create thread pool with `nworkers` worker threads
- """
- self.nworkers = nworkers
- self.name = name
- self.queue = Queue.Queue()
- self.workers = []
- self.worker_tracker = {}
- for i in range( self.nworkers ):
- worker = threading.Thread( target=self.worker_thread_callback,
- name=( "%s worker %d" % ( self.name, i )
) )
- worker.start()
- self.workers.append( worker )
- def worker_thread_callback( self ):
- """
- Worker thread should call this method to get and process queued
- callables
- """
- while 1:
- runnable = self.queue.get()
- if runnable is ThreadPool.SHUTDOWN:
- return
- else:
- self.worker_tracker[thread.get_ident()] = [None, None]
- try:
- runnable()
- finally:
- try:
- del self.worker_tracker[thread.get_ident()]
- except KeyError:
- pass
- def shutdown( self ):
- """
- Shutdown the queue (after finishing any pending requests)
- """
- # Add a shutdown request for every worker
- for i in range( self.nworkers ):
- self.queue.put( ThreadPool.SHUTDOWN )
- # Wait for each thread to terminate
- for worker in self.workers:
- worker.join()
-
-class PreforkThreadPoolServer( SocketServer.TCPServer ):
- """
- Server that uses a pool of threads for request handling
- """
- allow_reuse_address = 1
- def __init__( self, server_address, request_handler, nworkers, nprocesses ):
- # Create and start the workers
- self.nprocesses = nprocesses
- self.nworkers = nworkers
- self.running = True
- assert nworkers > 0, "ThreadPoolServer must have at least one
worker"
- # Call the base class constructor
- SocketServer.TCPServer.__init__( self, server_address, request_handler )
-
- def get_request( self ):
- self.socket_lock.acquire()
- try:
- return self.socket.accept()
- finally:
- self.socket_lock.release()
-
- def serve_forever(self):
- """
- Overrides `serve_forever` to shutdown cleanly.
- """
- log.info( "Serving requests..." )
- # Pre-fork each child
- children = []
- for i in range( self.nprocesses ):
- pid = os.fork()
- if pid:
- # We are in the parent process
- children.append( pid )
- else:
- # We are in the child process
- signal.signal( signal.SIGINT, self.child_sigint_handler )
- self.time_to_terminate = threading.Event()
- self.socket_lock = threading.Lock()
- self.pid = os.getpid()
- self.serve_forever_child()
- sys.exit( 0 )
- # Wait
- try:
- while len( children ) > 0:
- pid, status = os.wait()
- children.remove( pid )
- except KeyboardInterrupt:
- # Cleanup, kill all children
- print "Killing Children"
- for child in children:
- os.kill( child, signal.SIGINT )
- # Setup and alarm for 10 seconds
- signal.signal( signal.SIGALRM, lambda x, y: None )
- signal.alarm( 10 )
- # Wait
- while len( children ) > 0:
- try:
- pid, status = os.wait()
- children.remove( pid )
- except OSError, e:
- if e[0] in (errno.ECHILD, errno.EINTR):
- break
- # Kill any left
- print "Killing"
- for child in children:
- os.kill( child, signal.SIGKILL )
- log.info( "Shutting down..." )
-
- def serve_forever_child( self ):
- # self.thread_pool = ThreadPool( self.nworkers, "ThreadPoolServer on
%s:%d" % self.server_address )
- self.workers = []
- for i in range( self.nworkers ):
- worker = threading.Thread( target=self.serve_forever_thread )
- worker.start()
- self.workers.append( worker )
- self.time_to_terminate.wait()
- print "Terminating"
- for thread in self.workers:
- thread.join()
- self.socket.close()
-
- def serve_forever_thread( self ):
- while self.running:
- self.handle_request()
-
- def child_sigint_handler( self, signum, frame ):
- print "Shutting down child"
- self.shutdown()
-
- def shutdown( self ):
- """
- Finish pending requests and shutdown the server
- """
- self.running = False
- self.time_to_terminate.set()
-
- ## def server_activate(self):
- ## """
- ## Overrides server_activate to set timeout on our listener socket
- ## """
- ## # We set the timeout here so that we can trap ^C on windows
- ## self.socket.settimeout(1)
- ## SocketServer.TCPServer.server_activate(self)
-
-class WSGIPreforkThreadPoolServer( PreforkThreadPoolServer ):
- """
- Server that mixes ThreadPoolServer and WSGIHandler
- """
- def __init__( self, wsgi_application, server_address, *args, **kwargs ):
- PreforkThreadPoolServer.__init__( self, server_address, WSGIHandler, *args,
**kwargs )
- self.wsgi_application = wsgi_application
- self.wsgi_socket_timeout = None
- def get_request(self):
- # If there is a socket_timeout, set it on the accepted
- (conn,info) = PreforkThreadPoolServer.get_request(self)
- if self.wsgi_socket_timeout:
- conn.settimeout(self.wsgi_socket_timeout)
- return (conn, info)
-
-
-
-
-
-def serve( wsgi_app, global_conf, host="127.0.0.1", port="8080",
- server_version=None, protocol_version=None, start_loop=True,
- daemon_threads=None, socket_timeout=None, nworkers=10, nprocesses=10 ):
- """
- Similar to `paste.httpserver.serve` but using the thread pool server
- """
- server_address = ( host, int( port ) )
-
- # if server_version:
- # handler.server_version = server_version
- # handler.sys_version = None
- # if protocol_version:
- # assert protocol_version in
('HTTP/0.9','HTTP/1.0','HTTP/1.1')
- # handler.protocol_version = protocol_version
-
- server = WSGIPreforkThreadPoolServer( wsgi_app, server_address, int( nworkers ), int(
nprocesses ) )
- if daemon_threads:
- server.daemon_threads = daemon_threads
- if socket_timeout:
- server.wsgi_socket_timeout = int(socket_timeout)
-
- print "serving on %s:%s" % server.server_address
- if start_loop:
- try:
- server.serve_forever()
- except KeyboardInterrupt:
- # allow CTRL+C to shutdown
- pass
- return server
-
-if __name__ == '__main__':
- from paste.wsgilib import dump_environ
- serve(dump_environ, {}, server_version="Wombles/1.0",
- protocol_version="HTTP/1.1", port="8881")
diff -r fba947d16fa7 -r 6b924dd68e77
lib/galaxy/web/framework/servers/threadpool_server.py
--- a/lib/galaxy/web/framework/servers/threadpool_server.py Fri Aug 28 18:03:28 2009
-0400
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,219 +0,0 @@
-"""
-HTTPServer implementation that uses a thread pool based SocketServer (similar
-to the approach used by CherryPy) and the WSGIHandler request handler from
-Paste.
-
-NOTE: Most of the improvments from this implementation have been moved into
- the Paste HTTP server, this should be considered deprecated.
-
-Preliminary numbers from "ab -c 50 -n 500
http://localhost:8080/", all tests
-with transaction level logging. Application processes a simple cheetah
-template (using compiled NameMapper).
-
-CherryPy 2.1
-------------
-
-Percentage of the requests served within a certain time (ms)
- 50% 354
- 66% 452
- 75% 601
- 80% 674
- 90% 2868
- 95% 3000
- 98% 3173
- 99% 3361
- 100% 6145 (last request)
-
-Paste with Paste#http server (ThreadingMixIn based)
----------------------------------------------------
-
-Percentage of the requests served within a certain time (ms)
- 50% 84
- 66% 84
- 75% 84
- 80% 84
- 90% 85
- 95% 86
- 98% 92
- 99% 97
- 100% 99 (last request)
-
-This module
------------
-
-Percentage of the requests served within a certain time (ms)
- 50% 19
- 66% 23
- 75% 26
- 80% 29
- 90% 41
- 95% 50
- 98% 70
- 99% 80
- 100% 116 (last request)
-
-"""
-
-import SocketServer
-import Queue
-import threading
-import socket
-
-import logging
-log = logging.getLogger( __name__ )
-
-import pkg_resources;
-pkg_resources.require( "Paste" )
-from paste.httpserver import WSGIHandler
-
-class ThreadPool( object ):
- """
- Generic thread pool with a queue of callables to consume
- """
- SHUTDOWN = object()
- def __init__( self, nworkers, name="ThreadPool" ):
- """
- Create thread pool with `nworkers` worker threads
- """
- self.nworkers = nworkers
- self.name = name
- self.queue = Queue.Queue()
- self.workers = []
- for i in range( self.nworkers ):
- worker = threading.Thread( target=self.worker_thread_callback,
- name=( "%s worker %d" % ( self.name, i )
) )
- worker.start()
- self.workers.append( worker )
- def worker_thread_callback( self ):
- """
- Worker thread should call this method to get and process queued
- callables
- """
- while 1:
- runnable = self.queue.get()
- if runnable is ThreadPool.SHUTDOWN:
- return
- else:
- runnable()
- def shutdown( self ):
- """
- Shutdown the queue (after finishing any pending requests)
- """
- # Add a shutdown request for every worker
- for i in range( self.nworkers ):
- self.queue.put( ThreadPool.SHUTDOWN )
- # Wait for each thread to terminate
- for worker in self.workers:
- worker.join()
-
-class ThreadPoolServer( SocketServer.TCPServer ):
- """
- Server that uses a pool of threads for request handling
- """
- allow_reuse_address = 1
- def __init__( self, server_address, request_handler, nworkers ):
- # Create and start the workers
- self.running = True
- assert nworkers > 0, "ThreadPoolServer must have at least one
worker"
- self.thread_pool = ThreadPool( nworkers, "ThreadPoolServer on %s:%d" %
server_address )
- # Call the base class constructor
- SocketServer.TCPServer.__init__( self, server_address, request_handler )
- def process_request( self, request, client_address ):
- """
- Queue the request to be processed by on of the thread pool threads
- """
- # This sets the socket to blocking mode (and no timeout) since it
- # may take the thread pool a little while to get back to it. (This
- # is the default but since we set a timeout on the parent socket so
- # that we can trap interrupts we need to restore this,.)
- request.setblocking( 1 )
- # Queue processing of the request
- self.thread_pool.queue.put( lambda: self.process_request_in_thread( request,
client_address ) )
- def process_request_in_thread( self, request, client_address ):
- """
- The worker thread should call back here to do the rest of the
- request processing.
- """
- try:
- self.finish_request( request, client_address )
- self.close_request( request)
- except:
- self.handle_error( request, client_address )
- self.close_request( request )
- def serve_forever(self):
- """
- Overrides `serve_forever` to shutdown cleanly.
- """
- try:
- log.info( "Serving requests..." )
- while self.running:
- try:
- self.handle_request()
- except socket.timeout:
- # Timeout is expected, gives interrupts a chance to
- # propogate, just keep handling
- pass
- log.info( "Shutting down..." )
- finally:
- self.thread_pool.shutdown()
- def shutdown( self ):
- """
- Finish pending requests and shutdown the server
- """
- self.running = False
- self.socket.close()
- def server_activate(self):
- """
- Overrides server_activate to set timeout on our listener socket
- """
- # We set the timeout here so that we can trap ^C on windows
- self.socket.settimeout(1)
- SocketServer.TCPServer.server_activate(self)
-
-class WSGIThreadPoolServer( ThreadPoolServer ):
- """
- Server that mixes ThreadPoolServer and WSGIHandler
- """
- def __init__( self, wsgi_application, server_address, *args, **kwargs ):
- ThreadPoolServer.__init__( self, server_address, WSGIHandler, *args, **kwargs )
- self.wsgi_application = wsgi_application
- self.wsgi_socket_timeout = None
- def get_request(self):
- # If there is a socket_timeout, set it on the accepted
- (conn,info) = ThreadPoolServer.get_request(self)
- if self.wsgi_socket_timeout:
- conn.settimeout(self.wsgi_socket_timeout)
- return (conn, info)
-
-def serve( wsgi_app, global_conf, host="127.0.0.1", port="8080",
- server_version=None, protocol_version=None, start_loop=True,
- daemon_threads=None, socket_timeout=None, nworkers=10 ):
- """
- Similar to `paste.httpserver.serve` but using the thread pool server
- """
- server_address = ( host, int( port ) )
-
- if server_version:
- handler.server_version = server_version
- handler.sys_version = None
- if protocol_version:
- assert protocol_version in
('HTTP/0.9','HTTP/1.0','HTTP/1.1')
- handler.protocol_version = protocol_version
-
- server = WSGIThreadPoolServer( wsgi_app, server_address, int( nworkers ) )
- if daemon_threads:
- server.daemon_threads = daemon_threads
- if socket_timeout:
- server.wsgi_socket_timeout = int(socket_timeout)
-
- print "serving on %s:%s" % server.server_address
- if start_loop:
- try:
- server.serve_forever()
- except KeyboardInterrupt:
- # allow CTRL+C to shutdown
- pass
- return server
-
-
-