[z3-checkins] r26126 - z3/jsonserver/branch/merge/utils

reebalazs at codespeak.net reebalazs at codespeak.net
Sat Apr 22 11:47:05 CEST 2006


Author: reebalazs
Date: Sat Apr 22 11:47:04 2006
New Revision: 26126

Added:
   z3/jsonserver/branch/merge/utils/tcpwatch   (contents, props changed)
Log:
Include tcpwatch original version

Added: z3/jsonserver/branch/merge/utils/tcpwatch
==============================================================================
--- (empty file)
+++ z3/jsonserver/branch/merge/utils/tcpwatch	Sat Apr 22 11:47:04 2006
@@ -0,0 +1,1485 @@
+#!/usr/bin/python
+
+#############################################################################
+# 
+# Zope Public License (ZPL) Version 2.0
+# -----------------------------------------------
+# 
+# This software is Copyright (c) Zope Corporation (tm) and
+# Contributors. All rights reserved.
+# 
+# This license has been certified as open source. It has also
+# been designated as GPL compatible by the Free Software
+# Foundation (FSF).
+# 
+# Redistribution and use in source and binary forms, with or
+# without modification, are permitted provided that the
+# following conditions are met:
+# 
+# 1. Redistributions in source code must retain the above
+#    copyright notice, this list of conditions, and the following
+#    disclaimer.
+# 
+# 2. Redistributions in binary form must reproduce the above
+#    copyright notice, this list of conditions, and the following
+#    disclaimer in the documentation and/or other materials
+#    provided with the distribution.
+# 
+# 3. The name Zope Corporation (tm) must not be used to
+#    endorse or promote products derived from this software
+#    without prior written permission from Zope Corporation.
+# 
+# 4. The right to distribute this software or to use it for
+#    any purpose does not give you the right to use Servicemarks
+#    (sm) or Trademarks (tm) of Zope Corporation. Use of them is
+#    covered in a separate agreement (see
+#    http://www.zope.com/Marks).
+# 
+# 5. If any files are modified, you must cause the modified
+#    files to carry prominent notices stating that you changed
+#    the files and the date of any change.
+# 
+# Disclaimer
+# 
+#   THIS SOFTWARE IS PROVIDED BY ZOPE CORPORATION ``AS IS''
+#   AND ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT
+#   NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+#   AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN
+#   NO EVENT SHALL ZOPE CORPORATION OR ITS CONTRIBUTORS BE
+#   LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+#   EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+#   LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+#   HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+#   CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+#   OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+#   SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
+#   DAMAGE.
+# 
+# 
+# This software consists of contributions made by Zope
+# Corporation and many individuals on behalf of Zope
+# Corporation.  Specific attributions are listed in the
+# accompanying credits file.
+# 
+#############################################################################
+"""TCPWatch, a connection forwarder and HTTP proxy for monitoring connections.
+
+Requires Python 2.1 or above.
+
+Revision information:
+$Id: tcpwatch.py,v 1.9 2004/06/17 00:03:46 shane Exp $
+"""
+
+from __future__ import nested_scopes
+
+VERSION = '1.3'
+COPYRIGHT = (
+    'TCPWatch %s Copyright 2001 Shane Hathaway, Zope Corporation'
+    % VERSION)
+
+import sys
+import os
+import socket
+import asyncore
+import getopt
+from time import time, localtime
+
+
+RECV_BUFFER_SIZE = 8192
+show_cr = 0
+
+
+#############################################################################
+#
+# Connection forwarder
+#
+#############################################################################
+
+
+class ForwardingEndpoint (asyncore.dispatcher):
+    """A socket wrapper that accepts and generates stream messages.
+    """
+    _dests = ()
+
+    def __init__(self, conn=None):
+        self._outbuf = []
+        asyncore.dispatcher.__init__(self, conn)
+
+    def set_dests(self, dests):
+        """Sets the destination streams.
+        """
+        self._dests = dests
+
+    def write(self, data):
+        if data:
+            self._outbuf.append(data)
+            self.handle_write()
+
+    def readable(self):
+        return 1
+
+    def writable(self):
+        return not self.connected or len(self._outbuf) > 0
+
+    def handle_connect(self):
+        for d in self._dests:
+            d.write('')  # A blank string means the socket just connected.
+
+    def received(self, data):
+        if data:
+            for d in self._dests:
+                d.write(data)
+
+    def handle_read(self):
+        data = self.recv(RECV_BUFFER_SIZE)
+        self.received(data)
+
+    def handle_write(self):
+        if not self.connected:
+            # Wait for a connection.
+            return
+        buf = self._outbuf
+        while buf:
+            data = buf.pop(0)
+            if data:
+                sent = self.send(data)
+                if sent < len(data):
+                    buf.insert(0, data[sent:])
+                    break
+
+    def handle_close (self):
+        dests = self._dests
+        self._dests = ()
+        for d in dests:
+            d.close()
+        self.close()
+
+    def handle_error(self):
+        t, v = sys.exc_info()[:2]
+        for d in self._dests:
+            if hasattr(d, 'error'):
+                d.error(t, v)
+        self.handle_close()
+
+
+
+class EndpointObserver:
+    """Sends stream events to a ConnectionObserver.
+
+    Streams don't distinguish sources, while ConnectionObservers do.
+    This adapter adds source information to stream events.
+    """
+
+    def __init__(self, obs, from_client):
+        self.obs = obs
+        self.from_client = from_client
+
+    def write(self, data):
+        if data:
+            self.obs.received(data, self.from_client)
+        else:
+            self.obs.connected(self.from_client)
+
+    def close(self):
+        self.obs.closed(self.from_client)
+
+    def error(self, t, v):
+        self.obs.error(self.from_client, t, v)
+
+
+
+class ForwardedConnectionInfo:
+    transaction = 1
+
+    def __init__(self, connection_number, client_addr, server_addr=None):
+        self.opened = time()
+        self.connection_number = connection_number
+        self.client_addr = client_addr
+        self.server_addr = server_addr
+
+    def dup(self):
+        return ForwardedConnectionInfo(self.connection_number,
+                                       self.client_addr,
+                                       self.server_addr)
+
+
+
+class ForwardingService (asyncore.dispatcher):
+
+    _counter = 0
+
+    def __init__(self, listen_host, listen_port, dest_host, dest_port,
+                 observer_factory=None):
+        self._obs_factory = observer_factory
+        self._dest = (dest_host, dest_port)
+        asyncore.dispatcher.__init__(self)
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.set_reuse_addr()
+        self.bind((listen_host, listen_port))
+        self.listen(5)
+
+    def handle_accept(self):
+        info = self.accept()
+        if info:
+            # Got a connection.
+            conn, addr = info
+            conn.setblocking(0)
+
+            ep1 = ForwardingEndpoint()  # connects client to self
+            ep2 = ForwardingEndpoint()  # connects self to server
+
+            counter = self._counter + 1
+            self._counter = counter
+            factory = self._obs_factory
+            if factory is not None:
+                fci = ForwardedConnectionInfo(counter, addr, self._dest)
+                obs = factory(fci)
+                dests1 = (ep2, EndpointObserver(obs, 1))
+                dests2 = (ep1, EndpointObserver(obs, 0))
+            else:
+                dests1 = (ep2,)
+                dests2 = (ep1,)
+
+            ep1.set_dests(dests1)
+            ep2.set_dests(dests2)
+
+            # Now everything is hooked up.  Let data pass.
+            ep2.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+            ep1.set_socket(conn)
+            ep1.connected = 1  # We already know the client connected.
+            ep2.connect(self._dest)
+
+    def handle_error(self):
+        # Don't stop the server.
+        import traceback
+        traceback.print_exc()
+
+
+
+class IConnectionObserver:
+
+    def connected(from_client):
+        """Called when the client or the server connects.
+        """
+
+    def received(data, from_client):
+        """Called when the client or the server sends data.
+        """
+
+    def closed(from_client):
+        """Called when the client or the server closes the channel.
+        """
+
+    def error(from_client, type, value):
+        """Called when an error occurs in the client or the server channel.
+        """
+
+
+#############################################################################
+#
+# Basic abstract connection observer and stdout observer
+#
+#############################################################################
+
+
+def escape(s):
+    # XXX This might be a brittle trick. :-(
+    return repr('"\'' + str(s))[4:-1]
+
+
+class BasicObserver:
+
+    continuing_line = -1  # Tracks when a line isn't finished.
+    arrows = ('<==', '==>')
+
+    def __init__(self):
+        self._start = time()
+
+    def _output_message(self, m, from_client):
+        if self.continuing_line >= 0:
+            self.write('\n')
+            self.continuing_line = -1
+        if from_client:
+            who = 'client'
+        else:
+            who = 'server'
+
+        t = time() - self._start
+        min, sec = divmod(t, 60)
+        self.write('[%02d:%06.3f - %s %s]\n' % (min, sec, who, m))
+        self.flush()
+
+    def connection_from(self, fci):
+        if fci.server_addr is not None:
+            self._output_message(
+                '%s:%s forwarded to %s:%s' %
+                (tuple(fci.client_addr) + tuple(fci.server_addr)), 1)
+        else:
+            self._output_message(
+                'connection from %s:%s' %
+                (tuple(fci.client_addr)), 1)
+
+        if fci.transaction > 1:
+            self._output_message(
+                ('HTTP transaction #%d' % fci.transaction), 1)
+
+    def connected(self, from_client):
+        self._output_message('connected', from_client)
+
+    def received(self, data, from_client):
+        arrow = self.arrows[from_client]
+        cl = self.continuing_line
+        if cl >= 0:
+            if cl != from_client:
+                # Switching directions.
+                self.write('\n%s' % arrow)
+        else:
+            self.write(arrow)
+
+        if data.endswith('\n'):
+            data = data[:-1]
+            newline = 1
+        else:
+            newline = 0
+
+        if not show_cr:
+            data = data.replace('\r', '')
+        lines = data.split('\n')
+        lines = map(escape, lines)
+        s = ('\n%s' % arrow).join(lines)
+        self.write(s)
+
+        if newline:
+            self.write('\n')
+            self.continuing_line = -1
+        else:
+            self.continuing_line = from_client
+        self.flush()
+
+    def closed(self, from_client):
+        self._output_message('closed', from_client)
+
+    def error(self, from_client, type, value):
+        self._output_message(
+            'connection error %s: %s' % (type, value), from_client)
+    
+    def write(self, s):
+        raise NotImplementedError
+
+    def flush(self):
+        raise NotImplementedError
+            
+
+class StdoutObserver (BasicObserver):
+
+    # __implements__ = IConnectionObserver
+
+    def __init__(self, fci):
+        BasicObserver.__init__(self)
+        self.connection_from(fci)
+
+    def write(self, s):
+        sys.stdout.write(s)
+
+    def flush(self):
+        sys.stdout.flush()
+
+
+# 'log_number' is a log file counter used for naming log files.
+log_number = 0
+
+def nextLogNumber():
+    global log_number
+    log_number = log_number + 1
+    return log_number    
+
+
+class RecordingObserver (BasicObserver):
+    """Log request to a file.
+
+    o Filenames mangle connection and transaction numbers from the
+      ForwardedConnectionInfo passed as 'fci'.
+
+    o Decorates an underlying observer, created via the passed 'sub_factory'.
+
+    o Files are created in the supplied 'record_directory'.
+
+    o Unless suppressed, log response and error to corresponding files.
+    """
+    _ERROR_SOURCES = ('Server', 'Client')
+
+    # __implements__ = IConnectionObserver
+
+    def __init__(self, fci, sub_factory, record_directory,
+                 record_prefix='watch', record_responses=1, record_errors=1):
+        self._log_number = nextLogNumber()
+        self._decorated = sub_factory(fci)
+        self._directory = record_directory
+        self._prefix = record_prefix
+        self._response = record_responses
+        self._errors = record_errors
+
+    def connected(self, from_client):
+        """See IConnectionObserver.
+        """
+        self._decorated.connected(from_client)
+
+    def received(self, data, from_client):
+        """See IConnectionObserver.
+        """
+        if from_client or self._response:
+            extension = from_client and 'request' or 'response'
+            file = self._openForAppend(extension=extension)
+            file.write(data)
+            file.close()
+        self._decorated.received(data, from_client)
+
+    def closed(self, from_client):
+        """See IConnectionObserver.
+        """
+        self._decorated.closed(from_client)
+
+    def error(self, from_client, type, value):
+        """See IConnectionObserver.
+        """
+        if self._errors:
+            file = self._openForAppend(extension='errors')
+            file.write('(%s) %s: %s\n' % (self._ERROR_SOURCES[from_client],
+                                          type, value))
+        self._decorated.error(from_client, type, value)
+
+    def _openForAppend(self, extension):
+        """Open a file with the given extension for appending.
+
+        o File should be in the directory indicated by self._directory.
+
+        o File should have a filename '<prefix>_<conn #>.<extension>'.
+        """
+        filename = '%s%04d.%s' % (self._prefix, self._log_number, extension)
+        fqpath = os.path.join(self._directory, filename)
+        return open(fqpath, 'a')
+
+
+#############################################################################
+#
+# Tkinter GUI
+#
+#############################################################################
+
+
+def setupTk(titlepart, config_info, colorized=1):
+    """Starts the Tk application and returns an observer factory.
+    """
+
+    import Tkinter
+    from ScrolledText import ScrolledText
+    from Queue import Queue, Empty
+    try:
+        from cStringIO import StringIO
+    except ImportError:
+        from StringIO import StringIO
+
+    startup_text = COPYRIGHT + ("""
+
+Use your client to connect to the proxied port(s) then click
+the list on the left to see the data transferred.
+
+%s
+""" % config_info)
+
+
+    class TkTCPWatch (Tkinter.Frame):
+        '''The tcpwatch top-level window.
+        '''
+        def __init__(self, master):
+            Tkinter.Frame.__init__(self, master)
+            self.createWidgets()
+            # connections maps ids to TkConnectionObservers.
+            self.connections = {}
+            self.showingid = ''
+            self.queue = Queue()
+            self.processQueue()
+
+        def createWidgets(self):
+            listframe = Tkinter.Frame(self)
+            listframe.pack(side=Tkinter.LEFT, fill=Tkinter.BOTH, expand=1)
+            scrollbar = Tkinter.Scrollbar(listframe, orient=Tkinter.VERTICAL)
+            self.connectlist = Tkinter.Listbox(
+                listframe, yscrollcommand=scrollbar.set, exportselection=0)
+            scrollbar.config(command=self.connectlist.yview)
+            scrollbar.pack(side=Tkinter.RIGHT, fill=Tkinter.Y)
+            self.connectlist.pack(
+                side=Tkinter.LEFT, fill=Tkinter.BOTH, expand=1)
+            self.connectlist.bind('<Button-1>', self.mouseListSelect)
+            self.textbox = ScrolledText(self, background="#ffffff")
+            self.textbox.tag_config("message", foreground="#000000")
+            self.textbox.tag_config("client", foreground="#007700")
+            self.textbox.tag_config(
+                "clientesc", foreground="#007700", background="#dddddd")
+            self.textbox.tag_config("server", foreground="#770000")
+            self.textbox.tag_config(
+                "serveresc", foreground="#770000", background="#dddddd")
+            self.textbox.insert(Tkinter.END, startup_text, "message")
+            self.textbox.pack(side='right', fill=Tkinter.BOTH, expand=1)
+            self.pack(fill=Tkinter.BOTH, expand=1)
+
+        def addConnection(self, id, conn):
+            self.connections[id] = conn
+            connectlist = self.connectlist
+            connectlist.insert(Tkinter.END, id)
+
+        def updateConnection(self, id, output):
+            if id == self.showingid:
+                textbox = self.textbox
+                for data, style in output:
+                    textbox.insert(Tkinter.END, data, style)
+
+        def mouseListSelect(self, event=None):
+            connectlist = self.connectlist
+            idx = connectlist.nearest(event.y)
+            sel = connectlist.get(idx)
+            connections = self.connections
+            if connections.has_key(sel):
+                self.showingid = ''
+                output = connections[sel].getOutput()
+                self.textbox.delete(1.0, Tkinter.END)
+                for data, style in output:
+                    self.textbox.insert(Tkinter.END, data, style)
+                self.showingid = sel
+
+        def processQueue(self):
+            try:
+                if not self.queue.empty():
+                    # Process messages for up to 1/4 second
+                    from time import time
+                    limit = time() + 0.25
+                    while time() < limit:
+                        try:
+                            f, args = self.queue.get_nowait()
+                        except Empty:
+                            break
+                        f(*args)
+            finally:
+                self.master.after(50, self.processQueue)
+
+
+    class TkConnectionObserver (BasicObserver):
+        '''A connection observer which shows captured data in a TCPWatch
+        frame.  The data is mangled for presentation.
+        '''
+        # __implements__ = IConnectionObserver
+
+        def __init__(self, frame, fci, colorized=1):
+            BasicObserver.__init__(self)
+            self._output = []  # list of tuples containing (text, style)
+            self._frame = frame
+            self._colorized = colorized
+            t = localtime(fci.opened)
+            if fci.transaction > 1:
+                base_id = '%03d-%02d' % (
+                    fci.connection_number, fci.transaction)
+            else:
+                base_id = '%03d' % fci.connection_number
+            id = '%s (%02d:%02d:%02d)' % (base_id, t[3], t[4], t[5])
+            self._id = id
+            frame.queue.put((frame.addConnection, (id, self)))
+            self.connection_from(fci)
+
+        def write(self, s):
+            output = [(s, "message")]
+            self._output.extend(output)
+            self._frame.queue.put(
+                (self._frame.updateConnection, (self._id, output)))
+
+        def flush(self):
+            pass
+
+        def received(self, data, from_client):
+            if not self._colorized:
+                BasicObserver.received(self, data, from_client)
+                return
+
+            if not show_cr:
+                data = data.replace('\r', '')
+
+            output = []
+
+            extra_color = (self._colorized == 2)
+
+            if extra_color:
+                # 4 colors: Change the color client/server and escaped chars
+                def append(ss, escaped, output=output,
+                           from_client=from_client, escape=escape):
+                    if escaped:
+                        output.append((escape(ss), from_client
+                                       and 'clientesc' or 'serveresc'))
+                    else:
+                        output.append((ss, from_client
+                                       and 'client' or 'server'))
+            else:
+                # 2 colors: Only change color for client/server
+                segments = []
+                def append(ss, escaped, segments=segments,
+                           escape=escape):
+                    if escaped:
+                        segments.append(escape(ss))
+                    else:
+                        segments.append(ss)
+
+            # Escape the input data.
+            was_escaped = 0
+            start_idx = 0
+            for idx in xrange(len(data)):
+                c = data[idx]
+                escaped = (c < ' ' and c != '\n') or c >= '\x80'
+                if was_escaped != escaped:
+                    ss = data[start_idx:idx]
+                    if ss:
+                        append(ss, was_escaped)
+                    was_escaped = escaped
+                    start_idx = idx
+            ss = data[start_idx:]
+            if ss:
+                append(ss, was_escaped)
+
+            if not extra_color:
+                output.append((''.join(segments),
+                               from_client and 'client' or 'server'))
+
+            # Send output to the frame.
+            self._output.extend(output)
+            self._frame.queue.put(
+                (self._frame.updateConnection, (self._id, output)))
+            if data.endswith('\n'):
+                self.continuing_line = -1
+            else:
+                self.continuing_line = from_client
+
+        def getOutput(self):
+            return self._output
+
+
+    def createApp(titlepart):
+        master = Tkinter.Tk()
+        app = TkTCPWatch(master)
+        try:
+            wm_title = app.master.wm_title
+        except AttributeError:
+            pass  # No wm_title method available.
+        else:
+            wm_title('TCPWatch [%s]' % titlepart)
+        return app
+
+    app = createApp(titlepart)
+
+    def tkObserverFactory(fci, app=app, colorized=colorized):
+        return TkConnectionObserver(app, fci, colorized)
+
+    return tkObserverFactory, app.mainloop
+
+
+
+#############################################################################
+#
+# The HTTP splitter
+#
+# Derived from Zope.Server.HTTPServer.
+#
+#############################################################################
+
+
+def find_double_newline(s):
+    """Returns the position just after the double newline."""
+    pos1 = s.find('\n\r\n')  # One kind of double newline
+    if pos1 >= 0:
+        pos1 += 3
+    pos2 = s.find('\n\n')    # Another kind of double newline
+    if pos2 >= 0:
+        pos2 += 2
+
+    if pos1 >= 0:
+        if pos2 >= 0:
+            return min(pos1, pos2)
+        else:
+            return pos1
+    else:
+        return pos2
+
+
+
+class StreamedReceiver:
+    """Accepts data up to a specific limit."""
+
+    completed = 0
+
+    def __init__(self, cl, buf=None):
+        self.remain = cl
+        self.buf = buf
+        if cl < 1:
+            self.completed = 1
+
+    def received(self, data):
+        rm = self.remain
+        if rm < 1:
+            self.completed = 1  # Avoid any chance of spinning
+            return 0
+        buf = self.buf
+        datalen = len(data)
+        if rm <= datalen:
+            if buf is not None:
+                buf.append(data[:rm])
+            self.remain = 0
+            self.completed = 1
+            return rm
+        else:
+            if buf is not None:
+                buf.append(data)
+            self.remain -= datalen
+            return datalen
+
+
+
+class UnlimitedReceiver:
+    """Accepts data without limits."""
+
+    completed = 0
+
+    def received(self, data):
+        # always consume everything
+        return len(data)
+
+
+
+class ChunkedReceiver:
+    """Accepts all chunks."""
+
+    chunk_remainder = 0
+    control_line = ''
+    all_chunks_received = 0
+    trailer = ''
+    completed = 0
+
+
+    def __init__(self, buf=None):
+        self.buf = buf
+
+    def received(self, s):
+        # Returns the number of bytes consumed.
+        if self.completed:
+            return 0
+        orig_size = len(s)
+        while s:
+            rm = self.chunk_remainder
+            if rm > 0:
+                # Receive the remainder of a chunk.
+                to_write = s[:rm]
+                if self.buf is not None:
+                    self.buf.append(to_write)
+                written = len(to_write)
+                s = s[written:]
+                self.chunk_remainder -= written
+            elif not self.all_chunks_received:
+                # Receive a control line.
+                s = self.control_line + s
+                pos = s.find('\n')
+                if pos < 0:
+                    # Control line not finished.
+                    self.control_line = s
+                    s = ''
+                else:
+                    # Control line finished.
+                    line = s[:pos]
+                    s = s[pos + 1:]
+                    self.control_line = ''
+                    line = line.strip()
+                    if line:
+                        # Begin a new chunk.
+                        semi = line.find(';')
+                        if semi >= 0:
+                            # discard extension info.
+                            line = line[:semi]
+                        sz = int(line.strip(), 16)  # hexadecimal
+                        if sz > 0:
+                            # Start a new chunk.
+                            self.chunk_remainder = sz
+                        else:
+                            # Finished chunks.
+                            self.all_chunks_received = 1
+                    # else expect a control line.
+            else:
+                # Receive the trailer.
+                trailer = self.trailer + s
+                if trailer[:2] == '\r\n':
+                    # No trailer.
+                    self.completed = 1
+                    return orig_size - (len(trailer) - 2)
+                elif trailer[:1] == '\n':
+                    # No trailer.
+                    self.completed = 1
+                    return orig_size - (len(trailer) - 1)
+                pos = find_double_newline(trailer)
+                if pos < 0:
+                    # Trailer not finished.
+                    self.trailer = trailer
+                    s = ''
+                else:
+                    # Finished the trailer.
+                    self.completed = 1
+                    self.trailer = trailer[:pos]
+                    return orig_size - (len(trailer) - pos)
+        return orig_size
+
+
+
+class HTTPStreamParser:
+    """A structure that parses the HTTP stream.
+    """
+
+    completed = 0    # Set once request is completed.
+    empty = 0        # Set if no request was made.
+    header_plus = ''
+    chunked = 0
+    content_length = 0
+    body_rcv = None
+
+    # headers is a mapping containing keys translated to uppercase
+    # with dashes turned into underscores.
+
+    def __init__(self, is_a_request):
+        self.headers = {}
+        self.is_a_request = is_a_request
+        self.body_data = []
+
+    def received(self, data):
+        """Receives the HTTP stream for one request.
+
+        Returns the number of bytes consumed.
+        Sets the completed flag once both the header and the
+        body have been received.
+        """
+        if self.completed:
+            return 0  # Can't consume any more.
+        datalen = len(data)
+        br = self.body_rcv
+        if br is None:
+            # In header.
+            s = self.header_plus + data
+            index = find_double_newline(s)
+            if index >= 0:
+                # Header finished.
+                header_plus = s[:index]
+                consumed = len(data) - (len(s) - index)
+                self.in_header = 0
+                # Remove preceeding blank lines.
+                header_plus = header_plus.lstrip()
+                if not header_plus:
+                    self.empty = 1
+                    self.completed = 1
+                else:
+                    self.parse_header(header_plus)
+                    if self.body_rcv is None or self.body_rcv.completed:
+                        self.completed = 1
+                return consumed
+            else:
+                # Header not finished yet.
+                self.header_plus = s
+                return datalen
+        else:
+            # In body.
+            consumed = br.received(data)
+            self.body_data.append(data[:consumed])
+            if br.completed:
+                self.completed = 1
+            return consumed
+
+
+    def parse_header(self, header_plus):
+        """Parses the header_plus block of text.
+
+        (header_plus is the headers plus the first line of the request).
+        """
+        index = header_plus.find('\n')
+        if index >= 0:
+            first_line = header_plus[:index]
+            header = header_plus[index + 1:]
+        else:
+            first_line = header_plus
+            header = ''
+        self.first_line = first_line
+        self.header = header
+
+        lines = self.get_header_lines()
+        headers = self.headers
+        for line in lines:
+            index = line.find(':')
+            if index > 0:
+                key = line[:index]
+                value = line[index + 1:].strip()
+                key1 = key.upper().replace('-', '_')
+                headers[key1] = value
+            # else there's garbage in the headers?
+
+        if not self.is_a_request:
+            # Check for a 304 response.
+            parts = first_line.split()
+            if len(parts) >= 2 and parts[1] == '304':
+                # Expect no body.
+                self.body_rcv = StreamedReceiver(0)
+
+        if self.body_rcv is None:
+            # Ignore the HTTP version and just assume
+            # that the Transfer-Encoding header, when supplied, is valid.
+            te = headers.get('TRANSFER_ENCODING', '')
+            if te == 'chunked':
+                self.chunked = 1
+                self.body_rcv = ChunkedReceiver()
+            if not self.chunked:
+                cl = int(headers.get('CONTENT_LENGTH', -1))
+                self.content_length = cl
+                if cl >= 0 or self.is_a_request:
+                    self.body_rcv = StreamedReceiver(cl)
+                else:
+                    # No content length and this is a response.
+                    # We have to assume unlimited content length.
+                    self.body_rcv = UnlimitedReceiver()
+
+
+    def get_header_lines(self):
+        """Splits the header into lines, putting multi-line headers together.
+        """
+        r = []
+        lines = self.header.split('\n')
+        for line in lines:
+            if line.endswith('\r'):
+                line = line[:-1]
+            if line and line[0] in ' \t':
+                r[-1] = r[-1] + line[1:]
+            else:
+                r.append(line)
+        return r
+
+
+
+class HTTPConnectionSplitter:
+    """Makes a new observer for each HTTP subconnection and forwards events.
+    """
+
+    # __implements__ = IConnectionObserver
+    req_index = 0
+    resp_index = 0
+
+    def __init__(self, sub_factory, fci):
+        self.sub_factory = sub_factory
+        self.transactions = []  # (observer, request_data, response_data)
+        self.fci = fci
+        self._newTransaction()
+
+    def _newTransaction(self):
+        fci = self.fci.dup()
+        fci.transaction = len(self.transactions) + 1
+        obs = self.sub_factory(fci)
+        req = HTTPStreamParser(1)
+        resp = HTTPStreamParser(0)
+        self.transactions.append((obs, req, resp))
+
+    def _mostRecentObs(self):
+        return self.transactions[-1][0]
+
+    def connected(self, from_client):
+        self._mostRecentObs().connected(from_client)
+
+    def closed(self, from_client):
+        self._mostRecentObs().closed(from_client)
+
+    def error(self, from_client, type, value):
+        self._mostRecentObs().error(from_client, type, value)
+
+    def received(self, data, from_client):
+        transactions = self.transactions
+        while data:
+            if from_client:
+                index = self.req_index
+            else:
+                index = self.resp_index
+            if index >= len(transactions):
+                self._newTransaction()
+
+            obs, req, resp = transactions[index]
+            if from_client:
+                parser = req
+            else:
+                parser = resp
+
+            consumed = parser.received(data)
+            obs.received(data[:consumed], from_client)
+            data = data[consumed:]
+            if parser.completed:
+                new_index = index + 1
+                if from_client:
+                    self.req_index = new_index
+                else:
+                    self.resp_index = new_index
+
+
+#############################################################################
+#
+# HTTP proxy
+#
+#############################################################################
+
+
+class HTTPProxyToServerConnection (ForwardingEndpoint):
+    """Ensures that responses to a persistent HTTP connection occur
+    in the correct order."""
+
+    finished = 0
+
+    def __init__(self, proxy_conn, dests=()):
+        ForwardingEndpoint.__init__(self)
+        self.response_parser = HTTPStreamParser(0)
+        self.proxy_conn = proxy_conn
+        self.set_dests(dests)
+
+        # Data for the client held until previous responses are sent
+        self.held = []
+
+    def _isMyTurn(self):
+        """Returns a true value if it's time for this response
+        to respond to the client."""
+        order = self.proxy_conn._response_order
+        if order:
+            return (order[0] is self)
+        return 1
+
+    def received(self, data):
+        """Receives data from the HTTP server to be sent back to the client."""
+        while 1:
+            parser = self.response_parser
+            if parser.completed:
+                self.finished = 1
+                self.flush()
+                # Note that any extra data returned from the server is
+                # ignored. Should it be? :-(
+                return
+            if not data:
+                break
+            consumed = parser.received(data)
+            fragment = data[:consumed]
+            data = data[consumed:]
+            ForwardingEndpoint.received(self, fragment)
+            self.held.append(fragment)
+            self.flush()
+
+    def flush(self):
+        """Flushes buffers and, if the response has been sent, allows
+        the next response to take over.
+        """
+        if self.held and self._isMyTurn():
+            data = ''.join(self.held)
+            del self.held[:]
+            self.proxy_conn.write(data)
+        if self.finished:
+            order = self.proxy_conn._response_order
+            if order and order[0] is self:
+                del order[0]
+            if order:
+                order[0].flush()  # kick!
+
+    def handle_close(self):
+        """The HTTP server closed the connection.
+        """
+        ForwardingEndpoint.handle_close(self)
+        if not self.finished:
+            # Cancel the proxy connection, even if there are responses
+            # pending, since the HTTP spec provides no way to recover
+            # from an unfinished response.
+            self.proxy_conn.close()
+
+    def close(self):
+        """Close the connection to the server.
+
+        If there is unsent response data, an error is generated.
+        """
+        self.flush()
+        if not self.finished:
+            t = IOError
+            v = 'Closed without finishing response to client'
+            for d in self._dests:
+                if hasattr(d, 'error'):
+                    d.error(t, v)
+        ForwardingEndpoint.close(self)
+
+
+
+class HTTPProxyToClientConnection (ForwardingEndpoint):
+    """A connection from a client to the proxy server"""
+
+    _req_parser = None
+    _transaction = 0
+    _obs = None
+
+    def __init__(self, conn, factory, counter, addr):
+        ForwardingEndpoint.__init__(self, conn)
+        self._obs_factory = factory
+        self._counter = counter
+        self._client_addr = addr
+        self._response_order = []
+        self._newRequest()
+
+    def _newRequest(self):
+        """Starts a new request on a persistent connection."""
+        if self._req_parser is None:
+            self._req_parser = HTTPStreamParser(1)
+        factory = self._obs_factory
+        if factory is not None:
+            fci = ForwardedConnectionInfo(self._counter, self._client_addr)
+            self._transaction = self._transaction + 1
+            fci.transaction = self._transaction
+            obs = factory(fci)
+            self._obs = obs
+            self.set_dests((EndpointObserver(obs, 1),))
+
+    def received(self, data):
+        """Accepts data received from the client."""
+        while data:
+            parser = self._req_parser
+            if parser is None:
+                # Begin another request.
+                self._newRequest()
+                parser = self._req_parser
+            if not parser.completed:
+                # Waiting for a complete request.
+                consumed = parser.received(data)
+                ForwardingEndpoint.received(self, data[:consumed])
+                data = data[consumed:]
+            if parser.completed:
+                # Connect to a server.
+                self.openProxyConnection(parser)
+                # Expect a new request or a closed connection.
+                self._req_parser = None
+
+    def openProxyConnection(self, request):
+        """Parses the client connection and opens a connection to an
+        HTTP server.
+        """
+        first_line = request.first_line.strip()
+        if not ' ' in first_line:
+            raise ValueError, ('Malformed request: %s' % first_line)
+        command, url = first_line.split(' ', 1)
+        pos = url.rfind(' HTTP/')
+        if pos >= 0:
+            protocol = url[pos + 1:]
+            url = url[:pos].rstrip()
+        else:
+            protocol = 'HTTP/1.0'
+        if url.startswith('http://'):
+            # Standard proxy
+            urlpart = url[7:]
+            if '/' in urlpart:
+                host, path = url[7:].split('/', 1)
+                path = '/' + path
+            else:
+                host = urlpart
+                path = '/'
+        else:
+            # Transparent proxy
+            host = request.headers.get('HOST')
+            path = url
+        if not host:
+            raise ValueError, ('Request type not supported: %s' % url)
+
+        if ':' in host:
+            host, port = host.split(':')
+            port = int(port)
+        else:
+            port = 80
+
+        if '@' in host:
+            username, host = host.split('@')
+
+        obs = self._obs
+        if obs is not None:
+            eo = EndpointObserver(obs, 0)
+            ptos = HTTPProxyToServerConnection(self, (eo,))
+        else:
+            ptos = HTTPProxyToServerConnection(self)
+
+        self._response_order.append(ptos)
+
+        ptos.write('%s %s %s\r\n' % (command, path, protocol))
+        # Duplicate the headers sent by the client.
+        if request.header:
+            ptos.write(request.header)
+        else:
+            ptos.write('\r\n')
+        if request.body_data:
+            ptos.write(''.join(request.body_data))
+        ptos.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        ptos.connect((host, port))
+
+    def close(self):
+        """Closes the connection to the client.
+
+        If there are open connections to proxy servers, the server
+        connections are also closed.
+        """
+        ForwardingEndpoint.close(self)
+        for ptos in self._response_order:
+            ptos.close()
+        del self._response_order[:]
+
+
+class HTTPProxyService (asyncore.dispatcher):
+    """A minimal HTTP proxy server"""
+
+    connection_class = HTTPProxyToClientConnection
+
+    _counter = 0
+
+    def __init__(self, listen_host, listen_port, observer_factory=None):
+        self._obs_factory = observer_factory
+        asyncore.dispatcher.__init__(self)
+        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.set_reuse_addr()
+        self.bind((listen_host, listen_port))
+        self.listen(5)
+
+    def handle_accept(self):
+        info = self.accept()
+        if info:
+            # Got a connection.
+            conn, addr = info
+            conn.setblocking(0)
+            counter = self._counter + 1
+            self._counter = counter
+            self.connection_class(conn, self._obs_factory, counter, addr)
+
+    def handle_error(self):
+        # Don't stop the server.
+        import traceback
+        traceback.print_exc()
+
+
+#############################################################################
+#
+# Command-line interface
+#
+#############################################################################
+
+def usage():
+    sys.stderr.write(COPYRIGHT + '\n')
+    sys.stderr.write(
+        """TCP monitoring and logging tool with support for HTTP 1.1
+Simple usage: tcpwatch.py -L listen_port:dest_hostname:dest_port
+
+TCP forwarded connection setup:
+  -L <listen_port>:<dest_port>
+     Set up a local forwarded connection
+  -L <listen_port>:<dest_host>:<dest_port>
+     Set up a forwarded connection to a specified host
+  -L <listen_host>:<listen_port>:<dest_host>:<dest_port>
+     Set up a forwarded connection to a specified host, bound to an interface
+
+HTTP setup:
+  -h (or --http) Split forwarded HTTP persistent connections
+  -p [<listen_host>:]<listen_port> Run an HTTP proxy
+
+Output options:
+  -s   Output to stdout instead of a Tkinter window
+  -n   No color in GUI (faster and consumes less RAM)
+  -c   Extra color (colorizes escaped characters)
+  --cr     Show carriage returns (ASCII 13)
+  --help   Show usage information
+
+Recording options:
+  -r <path>  (synonyms: -R, --record-directory)
+    Write recorded data to <path>.  By default, creates request and
+    response files for each request, and writes a corresponding error file
+    for any error detected by tcpwatch.
+  --record-prefix=<prefix>
+    Use <prefix> as the file prefix for logged request / response / error
+    files (defaults to 'watch').
+  --no-record-responses
+    Suppress writing '.response' files.
+  --no-record-errors
+    Suppress writing '.error' files.
+""")
+    sys.exit()
+
+
+def usageError(s):
+    sys.stderr.write(str(s) + '\n\n')
+    usage()
+
+
+def main(args):
+    global show_cr
+
+    try:
+        optlist, extra = getopt.getopt(args, 'chL:np:r:R:s',
+                                       ['help', 'http', 'cr',
+                                        'record-directory=',
+                                        'record-prefix=',
+                                        'no-record-responses',
+                                        'no-record-errors',
+                                       ])
+    except getopt.GetoptError, msg:
+        usageError(msg)
+
+    fwd_params = []
+    proxy_params = []
+    obs_factory = None
+    show_config = 0
+    split_http = 0
+    colorized = 1
+    record_directory = None
+    record_prefix = 'watch'
+    record_responses = 1
+    record_errors = 1
+    recording = {}
+
+    for option, value in optlist:
+        if option == '--help':
+            usage()
+        elif option == '--http' or option == '-h':
+            split_http = 1
+        elif option == '-n':
+            colorized = 0
+        elif option == '-c':
+            colorized = 2
+        elif option == '--cr':
+            show_cr = 1
+        elif option == '-s':
+            show_config = 1
+            obs_factory = StdoutObserver
+        elif option == '-p':
+            # HTTP proxy
+            info = value.split(':')
+            listen_host = ''
+            if len(info) == 1:
+                listen_port = int(info[0])
+            elif len(info) == 2:
+                listen_host = info[0]
+                listen_port = int(info[1])
+            else:
+                usageError('-p requires a port or a host:port parameter')
+            proxy_params.append((listen_host, listen_port))
+        elif option == '-L':
+            # TCP forwarder
+            info = value.split(':')
+            listen_host = ''
+            dest_host = ''
+            if len(info) == 2:
+                listen_port = int(info[0])
+                dest_port = int(info[1])
+            elif len(info) == 3:
+                listen_port = int(info[0])
+                dest_host = info[1]
+                dest_port = int(info[2])
+            elif len(info) == 4:
+                listen_host = info[0]
+                listen_port = int(info[1])
+                dest_host = info[2]
+                dest_port = int(info[3])
+            else:
+                usageError('-L requires 2, 3, or 4 colon-separated parameters')
+            fwd_params.append(
+                (listen_host, listen_port, dest_host, dest_port))
+        elif (option == '-r'
+              or option == '-R'
+              or option == '--record-directory'):
+            record_directory = value
+        elif option == '--record-prefix':
+            record_prefix = value
+        elif option == '--no-record-responses':
+            record_responses = 0
+        elif option == '--no-record-errors':
+            record_errors = 0
+
+    if not fwd_params and not proxy_params:
+        usageError("At least one -L or -p option is required.")
+
+    # Prepare the configuration display.
+    config_info_lines = []
+    title_lst = []
+    if fwd_params:
+        config_info_lines.extend(map(
+            lambda args: 'Forwarding %s:%d -> %s:%d' % args, fwd_params))
+        title_lst.extend(map(
+            lambda args: '%s:%d -> %s:%d' % args, fwd_params))
+    if proxy_params:
+        config_info_lines.extend(map(
+            lambda args: 'HTTP proxy listening on %s:%d' % args, proxy_params))
+        title_lst.extend(map(
+            lambda args: '%s:%d -> proxy' % args, proxy_params))
+    if split_http:
+        config_info_lines.append('HTTP connection splitting enabled.')
+    if record_directory:
+        config_info_lines.append(
+            'Recording to directory %s.' % record_directory)
+    config_info = '\n'.join(config_info_lines)
+    titlepart = ', '.join(title_lst)
+    mainloop = None
+
+    if obs_factory is None:
+        # If no observer factory has been specified, use Tkinter.
+        obs_factory, mainloop = setupTk(titlepart, config_info, colorized)
+
+    if record_directory:
+        def _decorateRecorder(fci, sub_factory=obs_factory,
+                              record_directory=record_directory,
+                              record_prefix=record_prefix,
+                              record_responses=record_responses,
+                              record_errors=record_errors):
+            return RecordingObserver(fci, sub_factory, record_directory,
+                                     record_prefix, record_responses,
+                                     record_errors)
+        obs_factory = _decorateRecorder
+
+    chosen_factory = obs_factory
+    if split_http:
+        # Put an HTTPConnectionSplitter between the events and the output.
+        def _factory(fci, sub_factory=obs_factory):
+            return HTTPConnectionSplitter(sub_factory, fci)
+        chosen_factory = _factory
+    # obs_factory is the connection observer factory without HTTP
+    # connection splitting, while chosen_factory may have connection
+    # splitting.  Proxy services use obs_factory rather than the full
+    # chosen_factory because proxy services perform connection
+    # splitting internally.
+
+    services = []
+    try:
+        # Start forwarding services.
+        for params in fwd_params:
+            args = params + (chosen_factory,)
+            s = ForwardingService(*args)
+            services.append(s)
+
+        # Start proxy services.
+        for params in proxy_params:
+            args = params + (obs_factory,)
+            s = HTTPProxyService(*args)
+            services.append(s)
+
+        if show_config:
+            sys.stderr.write(config_info + '\n')
+
+        # Run the main loop.
+        try:
+            if mainloop is not None:
+                import thread
+                thread.start_new_thread(asyncore.loop, (), {'timeout': 1.0})
+                mainloop()
+            else:
+                asyncore.loop(timeout=1.0)
+        except KeyboardInterrupt:
+            sys.stderr.write('TCPWatch finished.\n')
+    finally:
+        for s in services:
+            s.close()
+
+
+if __name__ == '__main__':
+    main(sys.argv[1:])


More information about the z3-checkins mailing list