import os, os.path, struct, time, datetime, gc, types def timestamp_from_datetime(d): return time.mktime(d.timetuple()) + d.microsecond / 1000000.0 def datetime_from_timestamp(t): return datetime.datetime.fromtimestamp(t) def _decode_entry(data): recvd, timestamp, content_len = Entry.struct.unpack_from(data) recvd = bool(recvd) when = datetime_from_timestamp(timestamp) return recvd, when, content_len class Entry(object): struct = struct.Struct('!BdI') @classmethod def read_from(cls, file): header = file.read(cls.struct.size) recvd, timestamp, content_len = _decode_entry(header) content = file.read(content_len) if len(content) != content_len: raise ValueError('Only read %i of %i charcters'%(len(content), content_len)) return cls(recvd, timestamp, content) def __init__(self, recvd=None, when=None, content=None, data = None): self.recvd = recvd self.when = when self.content = content if data: self.data = data def __repr__(self): return 'Entry(%r, %r, %r)'%(self.recvd, self.when, self.content) def __eq__(self, other): return self.recvd == other.recvd and self.when == other.when and self.content == other.content # Properties def get_data(self): return self.struct.pack(int(self.recvd), timestamp_from_datetime(self.when), len(self.content))+self.content def set_data(self, data): self.recvd, self.when, content_len = _decode_entry(data) self.content = data[self.struct.size : self.struct.size + content_len] data = property(get_data, set_data) class Logger(object): def __init__(self, file): self.file = file def recv(self, data, when=None): entry = Entry(True, when or datetime.datetime.now(), data) print 'logging received entry', repr(entry) self.file.write(entry.data) def send(self, data, when=None): entry = Entry(False, when or datetime.datetime.now(), data) print 'logging sent entry', repr(entry) self.file.write(entry.data) class LogReader(object): def __init__(self, file): self.file = file def __iter__(self): self.file.seek(0) while True: try: yield Entry.read_from(self.file) except ValueError: if file.read() != '': raise packet_types={} def register_type(number, cls): packet_types[number]=cls class PacketParsingError(ValueError): pass class Packet(object): struct=struct.Struct('!BiI') def __init__(self, id_hash=None, content=None, data=None): self.id_hash = id_hash self.content = content if data: self.data = data def __repr__(self): return 'Packet(%r, %r)'%(self.id_hash, self.content) def __eq__(self, other): return self.id_hash == other.id_hash and self.content == other.content # Properties def get_type(self): '''Shortcut for content.type''' return self.content.type type = property(get_type) def get_data(self): content_data=self.content.data data = self.struct.pack(self.type, self.id_hash, len(content_data))+content_data return data def set_data(self, data, strict=True): type, self.id_hash, content_len = self.struct.unpack_from(data) i=self.struct.size j=i+content_len self.content = packet_types[type](data=data[i:j]) data=property(get_data, set_data) class Message(object): type=0 struct=struct.Struct('!I') def __init__(self, message=None, data=None): self.message=message if data: self.data=data def __repr__(self): return 'Message(%r)'%(self.message) def __eq__(self, other): return self.message == other.message # Properties def get_data(self): data=self.struct.pack(len(self.message))+str(self.message) return data def set_data(self, data): message_len, = self.struct.unpack_from(data) # Note tuple i = self.struct.size j = i+message_len self.message=data[i:j] data=property(get_data, set_data) register_type(Message.type, Message) packets = [Packet(727508507, Message('testing'))] def format_referrers(referrers): message = [] for referrer in referrers: if isinstance(referrer, types.FrameType): message.append('frame: %s:%i (%s)'%(referrer.f_code.co_filename, referrer.f_lineno, referrer.f_code.co_name)) else: message.append(repr(referrer)) return '\n'.join(message) class TestLoggerAndReader: filename = 'TestLoggerAndReader.test' subjects = packets def test_sanity(self): try: assert not os.path.exists(self.filename) logger = Logger(open(self.filename, 'wb')) for packet in self.subjects: logger.recv(packet.data) logger.file.close() reader = LogReader(open(self.filename, 'rb')) for entry, packet in zip(reader, self.subjects): print 'reading received entry:', repr(entry) assert entry.recvd == True assert Packet(data=entry.content) == packet reader.file.close() finally: try: print 'logger.file referrers:' print format_referrers(gc.get_referrers(logger.file)) del logger.file print 'reader.file referrers:' print format_referrers(gc.get_referrers(reader.file)) del reader.file except UnboundLocalError: pass gc.collect() os.remove(self.filename)