#! /usr/bin/env python """ For crappy high-packets-dropping connexion. Provides a fast-recovery TCP-like pipe, over UDP. Usage: on a remote server with non-firewalled incoming UDP connexions: udpcrappyline.py --server LISTENPORT REALSERVER:PORT on your local machine with auto-routed-back UDP packets: udpcrappyline.py --client LISTENPORT CRAPPYLINESERVER:PORT """ from socket import * import greensock2 import pipelayer from struct import pack, unpack from time import time as now RETRY_TIMEOUT = 3 # seconds KEEPALIVE = 82 # seconds PACKET_SIZE = 1500 - 32 MAX_SIZE = PACKET_SIZE + 4 INIT_PACKET = '' MAGIC = 'uCl1' KEEPALIVE_PACKET = 'uCl' VERBOSE = False class Runner: def __init__(self, forward): self.forward = forward output_giver, self.output_accepter = greensock2.meetingpoint() self.input_giver, input_accepter = greensock2.meetingpoint() queue_size_trigger, queue_size_waiter = greensock2.meetingpoint() encode_trigger, encode_waiter = greensock2.meetingpoint() self.pipe = pipelayer.PipeLayer() self.pipe.timeout = RETRY_TIMEOUT greensock2.autogreenlet(self.forwardreader, queue_size_waiter, encode_trigger) greensock2.autogreenlet(self.forwardwriter, input_accepter, encode_trigger) greensock2.autogreenlet(self.encoder, encode_waiter, output_giver, queue_size_trigger) if KEEPALIVE: self.last_packet_sent = now() greensock2.autogreenlet(self.keepalive_sender, output_giver) def mainloop(self): greensock2.wait() def add_transport(self, conn): greensock2.autogreenlet(self.locallinereader, conn) greensock2.autogreenlet(self.locallinewriter, conn) def locallinereader(self, conn): while 1: self.input_giver.wait() data = greensock2.recv(conn, MAX_SIZE) if data[:4] == MAGIC: log("[in] %d bytes", len(data)) self.input_giver.give(data[4:]) elif data == KEEPALIVE_PACKET: log("[in] keepalive") else: log("[in] %d bytes - bad magic", len(data)) def locallinewriter(self, conn): while 1: greensock2.wait_output(conn) buffer = self.output_accepter.accept() if buffer[:4] == MAGIC: log("[out] %d bytes", len(buffer)) elif buffer == KEEPALIVE_PACKET: log("[out] keepalive") else: log("[out] %d bytes - bad magic", len(buffer)) conn.send(buffer) del buffer def forwardwriter(self, input_accepter, encode_trigger): while 1: data = input_accepter.accept() self.pipe.settime(now()) data = self.pipe.decode(data) if data: greensock2.sendall(self.forward, data) del data encode_trigger.trigger() def forwardreader(self, queue_size_waiter, encode_trigger): while 1: while self.pipe.queue_size() > 8192: queue_size_waiter.wait_trigger() buffer = greensock2.recv(self.forward, 8192) if not buffer: raise EOFError("local connexion closed") self.pipe.queue(buffer) #print "QUEUED:", self.pipe.queue_size(), self.pipe.settime(now()) del buffer encode_trigger.trigger() def encoder(self, encode_waiter, output_giver, queue_size_trigger): output_giver.wait() while 1: t = now() delay = self.pipe.settime(t) if delay == 0.0: data = self.pipe.encode(PACKET_SIZE) #print "QUEUED:", self.pipe.queue_size(), self.pipe.settime(now()) if data: output_giver.give(MAGIC + data) self.last_packet_sent = t del data queue_size_trigger.trigger() output_giver.wait() else: # wait for the specified period or for more data to encode if delay is not None: delay *= 1.05 encode_waiter.wait_trigger(delay) def keepalive_sender(self, output_giver): while 1: t = now() delay = self.last_packet_sent + KEEPALIVE - t if delay < 0.5: output_giver.give(KEEPALIVE_PACKET) self.last_packet_sent = t else: greensock2.sleep(delay) if VERBOSE: def log(msg, *args): print '*', msg % args else: def log(msg, *args): pass def run_server(port, forward_addr): srv = socket(AF_INET, SOCK_DGRAM) srv.bind(('', port)) print 'waiting for the first packet on %r.' % (srv.getsockname(),) data, client_addr = srv.recvfrom(MAX_SIZE) srv.connect(client_addr) print 'received packet from %r.' % (client_addr,) if data != INIT_PACKET: raise ValueError, "bad init packet" forward = socket(AF_INET, SOCK_STREAM) forward.connect(forward_addr) print 'connected to %r.' % (forward.getpeername(),) runner = Runner(forward) runner.add_transport(srv) runner.mainloop() def run_client(local_port, server_addr, callback=None): local1 = socket(AF_INET, SOCK_STREAM) local1.bind(('', local_port)) local1.listen(1) print 'waiting for local connexion on %r...' % (local1.getsockname(),) if callback: callback(local1) local, addr = local1.accept() print 'accepted connexion from %r.' % (addr,) runner = Runner(local) conn = socket(AF_INET, SOCK_DGRAM) conn.connect(server_addr) conn.send(INIT_PACKET) runner.add_transport(conn) runner.mainloop() if __name__ == '__main__': import sys if len(sys.argv) != 4: print >> sys.stderr, __doc__ sys.exit(2) mode, listenport, forwardaddr = sys.argv[1:] listenport = int(listenport) forwardhost, forwardport = forwardaddr.split(':') forwardport = int(forwardport) forwardaddr = (forwardhost, forwardport) if mode == '--server': run_server(listenport, forwardaddr) elif mode == '--client': run_client(listenport, forwardaddr) else: print >> sys.stderr, __doc__ sys.exit(2)