commit/galaxy-central: natefoo: Drop threadframe for heartbeat (its functionality was integrated in sys in Python 2.5...)
1 new commit in galaxy-central: https://bitbucket.org/galaxy/galaxy-central/commits/5d5ed9cbbf9b/ Changeset: 5d5ed9cbbf9b User: natefoo Date: 2014-06-12 15:51:45 Summary: Drop threadframe for heartbeat (its functionality was integrated in sys in Python 2.5...) Affected #: 2 files diff -r 07df86dff47c924830c2912500af356180778162 -r 5d5ed9cbbf9b6a37dbf0777cf4b88015df9135f4 eggs.ini --- a/eggs.ini +++ b/eggs.ini @@ -26,7 +26,6 @@ pysqlite = 2.5.6 python_lzo = 1.08_2.03_static PyYAML = 3.10 -threadframe = 0.2 guppy = 0.1.10 SQLAlchemy = 0.7.9 ; msgpack_python = 0.2.4 diff -r 07df86dff47c924830c2912500af356180778162 -r 5d5ed9cbbf9b6a37dbf0777cf4b88015df9135f4 lib/galaxy/util/heartbeat.py --- a/lib/galaxy/util/heartbeat.py +++ b/lib/galaxy/util/heartbeat.py @@ -1,182 +1,167 @@ +import threading +import time +import traceback +import os +import sys -# Attempt to load threadframe module, and only define Heartbeat class -# if available +def get_current_thread_object_dict(): + """ + Get a dictionary of all 'Thread' objects created via the threading + module keyed by thread_id. Note that not all interpreter threads + have a thread objects, only the main thread and any created via the + 'threading' module. Threads created via the low level 'thread' module + will not be in the returned dictionary. -try: + HACK: This mucks with the internals of the threading module since that + module does not expose any way to match 'Thread' objects with + intepreter thread identifiers (though it should). + """ + rval = dict() + # Acquire the lock and then union the contents of 'active' and 'limbo' + # threads into the return value. + threading._active_limbo_lock.acquire() + rval.update( threading._active ) + rval.update( threading._limbo ) + threading._active_limbo_lock.release() + return rval - import pkg_resources - pkg_resources.require( "threadframe" ) +class Heartbeat( threading.Thread ): + """ + Thread that periodically dumps the state of all threads to a file + """ + def __init__( self, name="Heartbeat Thread", period=20, fname="heartbeat.log" ): + threading.Thread.__init__( self, name=name ) + self.should_stop = False + self.period = period + self.fname = fname + self.file = None + self.fname_nonsleeping = fname + ".nonsleeping" + self.file_nonsleeping = None + self.nonsleeping_heartbeats = { } + # Save process id + self.pid = os.getpid() + # Event to wait on when sleeping, allows us to interrupt for shutdown + self.wait_event = threading.Event() + def run( self ): + self.file = open( self.fname, "a" ) + print >> self.file, "Heartbeat for pid %d thread started at %s" % ( self.pid, time.asctime() ) + print >> self.file + self.file_nonsleeping = open ( self.fname_nonsleeping, "a" ) + print >> self.file_nonsleeping, "Non-Sleeping-threads for pid %d thread started at %s" % ( self.pid, time.asctime() ) + print >> self.file_nonsleeping + try: + while not self.should_stop: + # Print separator with timestamp + print >> self.file, "Traceback dump for all threads at %s:" % time.asctime() + print >> self.file + # Print the thread states + threads = get_current_thread_object_dict() + for thread_id, frame in sys._current_frames().iteritems(): + if thread_id in threads: + object = repr( threads[thread_id] ) + else: + object = "<No Thread object>" + print >> self.file, "Thread %s, %s:" % ( thread_id, object ) + print >> self.file + traceback.print_stack( frame, file=self.file ) + print >> self.file + print >> self.file, "End dump" + print >> self.file + self.file.flush() + self.print_nonsleeping(threads) + # Sleep for a bit + self.wait_event.wait( self.period ) + finally: + print >> self.file, "Heartbeat for pid %d thread stopped at %s" % ( self.pid, time.asctime() ) + print >> self.file + # Cleanup + self.file.close() + self.file_nonsleeping.close() -except: + def shutdown( self ): + self.should_stop = True + self.wait_event.set() + self.join() - import sys - print >> sys.stderr, "No threadframe module, Heartbeat not available" - Heartbeat = None + def thread_is_sleeping ( self, last_stack_frame ): + """ + Returns True if the given stack-frame represents a known + sleeper function (at least in python 2.5) + """ + _filename = last_stack_frame[0] + _line = last_stack_frame[1] + _funcname = last_stack_frame[2] + _text = last_stack_frame[3] + ### Ugly hack to tell if a thread is supposedly sleeping or not + ### These are the most common sleeping functions I've found. + ### Is there a better way? (python interpreter internals?) + ### Tested only with python 2.5 + if _funcname=="wait" and _text=="waiter.acquire()": + return True + if _funcname=="wait" and _text=="_sleep(delay)": + return True + if _funcname=="accept" and _text[-14:]=="_sock.accept()": + return True + if _funcname=="monitor" and _text.startswith("time.sleep( ") and _text.endswith(" )"): + return True + if _funcname=="drain_events" and _text=="sleep(polling_interval)": + return True + #Ugly hack: always skip the heartbeat thread + #TODO: get the current thread-id in python + # skip heartbeat thread by thread-id, not by filename + if _filename.find("/lib/galaxy/util/heartbeat.py")!=-1: + return True + ## By default, assume the thread is not sleeping + return False -else: + def get_interesting_stack_frame ( self, stack_frames ): + """ + Scans a given backtrace stack frames, returns a single + quadraple of [filename, line, function-name, text] of + the single, deepest, most interesting frame. - import threading - import threadframe - import time - import traceback - import os - import sys + Interesting being:: - def get_current_thread_object_dict(): + inside the galaxy source code ("/lib/galaxy"), + prefreably not an egg. """ - Get a dictionary of all 'Thread' objects created via the threading - module keyed by thread_id. Note that not all interpreter threads - have a thread objects, only the main thread and any created via the - 'threading' module. Threads created via the low level 'thread' module - will not be in the returned dictionary. + for _filename, _line, _funcname, _text in reversed(stack_frames): + idx = _filename.find("/lib/galaxy/") + if idx!=-1: + relative_filename = _filename[idx:] + return ( relative_filename, _line, _funcname, _text ) + # no "/lib/galaxy" code found, return the innermost frame + return stack_frames[-1] - HACK: This mucks with the internals of the threading module since that - module does not expose any way to match 'Thread' objects with - intepreter thread identifiers (though it should). - """ - rval = dict() - # Acquire the lock and then union the contents of 'active' and 'limbo' - # threads into the return value. - threading._active_limbo_lock.acquire() - rval.update( threading._active ) - rval.update( threading._limbo ) - threading._active_limbo_lock.release() - return rval + def print_nonsleeping( self, threads_object_dict ): + print >> self.file_nonsleeping, "Non-Sleeping threads at %s:" % time.asctime() + print >> self.file_nonsleeping + all_threads_are_sleeping = True + threads = get_current_thread_object_dict() + for thread_id, frame in sys._current_frames().iteritems(): + if thread_id in threads: + object = repr( threads[thread_id] ) + else: + object = "<No Thread object>" + tb = traceback.extract_stack(frame) + if self.thread_is_sleeping(tb[-1]): + if thread_id in self.nonsleeping_heartbeats: + del self.nonsleeping_heartbeats[thread_id] + continue - class Heartbeat( threading.Thread ): - """ - Thread that periodically dumps the state of all threads to a file using - the `threadframe` extension - """ - def __init__( self, name="Heartbeat Thread", period=20, fname="heartbeat.log" ): - threading.Thread.__init__( self, name=name ) - self.should_stop = False - self.period = period - self.fname = fname - self.file = None - self.fname_nonsleeping = fname + ".nonsleeping" - self.file_nonsleeping = None - self.nonsleeping_heartbeats = { } - # Save process id - self.pid = os.getpid() - # Event to wait on when sleeping, allows us to interrupt for shutdown - self.wait_event = threading.Event() - def run( self ): - self.file = open( self.fname, "a" ) - print >> self.file, "Heartbeat for pid %d thread started at %s" % ( self.pid, time.asctime() ) - print >> self.file - self.file_nonsleeping = open ( self.fname_nonsleeping, "a" ) - print >> self.file_nonsleeping, "Non-Sleeping-threads for pid %d thread started at %s" % ( self.pid, time.asctime() ) - print >> self.file_nonsleeping - try: - while not self.should_stop: - # Print separator with timestamp - print >> self.file, "Traceback dump for all threads at %s:" % time.asctime() - print >> self.file - # Print the thread states - threads = get_current_thread_object_dict() - for thread_id, frame in threadframe.dict().iteritems(): - if thread_id in threads: - object = repr( threads[thread_id] ) - else: - object = "<No Thread object>" - print >> self.file, "Thread %s, %s:" % ( thread_id, object ) - print >> self.file - traceback.print_stack( frame, file=self.file ) - print >> self.file - print >> self.file, "End dump" - print >> self.file - self.file.flush() - self.print_nonsleeping(threads) - # Sleep for a bit - self.wait_event.wait( self.period ) - finally: - print >> self.file, "Heartbeat for pid %d thread stopped at %s" % ( self.pid, time.asctime() ) - print >> self.file - # Cleanup - self.file.close() - self.file_nonsleeping.close() + # Count non-sleeping thread heartbeats + if thread_id in self.nonsleeping_heartbeats: + self.nonsleeping_heartbeats[thread_id] += 1 + else: + self.nonsleeping_heartbeats[thread_id]=1 - def shutdown( self ): - self.should_stop = True - self.wait_event.set() - self.join() + good_frame = self.get_interesting_stack_frame(tb) + print >> self.file_nonsleeping, "Thread %s\t%s\tnon-sleeping for %d heartbeat(s)\n File %s:%d\n Function \"%s\"\n %s" % \ + ( thread_id, object, self.nonsleeping_heartbeats[thread_id], good_frame[0], good_frame[1], good_frame[2], good_frame[3] ) + all_threads_are_sleeping = False - def thread_is_sleeping ( self, last_stack_frame ): - """ - Returns True if the given stack-frame represents a known - sleeper function (at least in python 2.5) - """ - _filename = last_stack_frame[0] - _line = last_stack_frame[1] - _funcname = last_stack_frame[2] - _text = last_stack_frame[3] - ### Ugly hack to tell if a thread is supposedly sleeping or not - ### These are the most common sleeping functions I've found. - ### Is there a better way? (python interpreter internals?) - ### Tested only with python 2.5 - if _funcname=="wait" and _text=="waiter.acquire()": - return True - if _funcname=="wait" and _text=="_sleep(delay)": - return True - if _funcname=="accept" and _text[-14:]=="_sock.accept()": - return True - #Ugly hack: always skip the heartbeat thread - #TODO: get the current thread-id in python - # skip heartbeat thread by thread-id, not by filename - if _filename.find("/lib/galaxy/util/heartbeat.py")!=-1: - return True - ## By default, assume the thread is not sleeping - return False + if all_threads_are_sleeping: + print >> self.file_nonsleeping, "All threads are sleeping." + print >> self.file_nonsleeping + self.file_nonsleeping.flush() - def get_interesting_stack_frame ( self, stack_frames ): - """ - Scans a given backtrace stack frames, returns a single - quadraple of [filename, line, function-name, text] of - the single, deepest, most interesting frame. - - Interesting being:: - - inside the galaxy source code ("/lib/galaxy"), - prefreably not an egg. - """ - for _filename, _line, _funcname, _text in reversed(stack_frames): - idx = _filename.find("/lib/galaxy/") - if idx!=-1: - relative_filename = _filename[idx:] - return ( relative_filename, _line, _funcname, _text ) - # no "/lib/galaxy" code found, return the innermost frame - return stack_frames[-1] - - def print_nonsleeping( self, threads_object_dict ): - print >> self.file_nonsleeping, "Non-Sleeping threads at %s:" % time.asctime() - print >> self.file_nonsleeping - all_threads_are_sleeping = True - threads = get_current_thread_object_dict() - for thread_id, frame in threadframe.dict().iteritems(): - if thread_id in threads: - object = repr( threads[thread_id] ) - else: - object = "<No Thread object>" - tb = traceback.extract_stack(frame) - if self.thread_is_sleeping(tb[-1]): - if thread_id in self.nonsleeping_heartbeats: - del self.nonsleeping_heartbeats[thread_id] - continue - - # Count non-sleeping thread heartbeats - if thread_id in self.nonsleeping_heartbeats: - self.nonsleeping_heartbeats[thread_id] += 1 - else: - self.nonsleeping_heartbeats[thread_id]=1 - - good_frame = self.get_interesting_stack_frame(tb) - print >> self.file_nonsleeping, "Thread %s\t%s\tnon-sleeping for %d heartbeat(s)\n File %s:%d\n Function \"%s\"\n %s" % \ - ( thread_id, object, self.nonsleeping_heartbeats[thread_id], good_frame[0], good_frame[1], good_frame[2], good_frame[3] ) - all_threads_are_sleeping = False - - if all_threads_are_sleeping: - print >> self.file_nonsleeping, "All threads are sleeping." - print >> self.file_nonsleeping - self.file_nonsleeping.flush() - Repository URL: https://bitbucket.org/galaxy/galaxy-central/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.
participants (1)
-
commits-noreply@bitbucket.org