"""Implements Sun XDR -- eXternal Data Representation and prodives a Python binding. See: RFC 1014 / 1832 """ import struct from string import join import new from cStringIO import StringIO POLICE_CONSTRUCTOR = 0 # A debugging feature. FALSE = 0 TRUE = 1 SequenceTypes = (type(()), type([])) class XDRError (Exception): '''Error during XDR packing/unpacking''' class XDRConversionError(XDRError): '''Conversion error''' class XDRCorruptionError(XDRError): '''Error in data stream''' class XDREOFError(XDRCorruptionError): '''Reached end of data stream prematurely''' class XDRPacker: """Pack various data representations into a buffer.""" def __init__(self): self.reset() def reset(self): self.__buf = StringIO() def get_buffer(self): return self.__buf.getvalue() def pack_uint(self, x): self.__buf.write(struct.pack('>L', x)) pack_int = pack_uint pack_enum = pack_int def pack_bool(self, x): if x: self.__buf.write('\0\0\0\1') else: self.__buf.write('\0\0\0\0') def pack_uhyper(self, x): self.pack_uint(x>>32 & 0xffffffffL) self.pack_uint(x & 0xffffffffL) pack_hyper = pack_uhyper def pack_float(self, x): try: self.__buf.write(struct.pack('>f', x)) except struct.error, msg: raise XDRConversionError(msg) def pack_double(self, x): try: self.__buf.write(struct.pack('>d', x)) except struct.error, msg: raise XDRConversionError(msg) def pack_fstring(self, n, s): if n < 0: raise ValueError('fstring size must be nonnegative') n = ((n+3) >> 2) << 2 data = s[:n] data = data + (n - len(data)) * '\0' self.__buf.write(data) pack_fopaque = pack_fstring def pack_string(self, s, max=-1): n = len(s) if max >= 0 and n > max: raise XDRConversionError( 'String of length %d is longer than maximum of %d' % (n, max)) self.pack_uint(n) self.pack_fstring(n, s) pack_opaque = pack_string pack_bytes = pack_string def pack_list(self, list, pack_item): for item in list: self.pack_uint(1) pack_item(item) self.pack_uint(0) def pack_farray(self, n, list, pack_item): if len(list) <> n: raise ValueError('wrong array size') for item in list: pack_item(item) def pack_array(self, list, pack_item, max=-1): n = len(list) if max >= 0 and n > max: raise XDRConversionError( 'Array of length %d is longer than maximum of %d' % (n, max)) self.pack_uint(n) self.pack_farray(n, list, pack_item) class XDRUnpacker: """Unpacks various data representations from the given buffer.""" def __init__(self, data): self.reset(data) def reset(self, data): self.__buf = StringIO(data) def get_position(self): return self.__buf.tell() def get_buffer(self): return self.__buf.getvalue() def done(self): if self.__buf.read(1): raise XDRCorruptionError('unextracted data remains') def get_data(self, l): data = self.__buf.read(l) if len(data) < l: raise XDREOFError return data def unpack_uint(self): data = self.__buf.read(4) if len(data) < 4: raise XDREOFError x = struct.unpack('>L', data)[0] try: return int(x) except OverflowError: return x def unpack_int(self): data = self.__buf.read(4) if len(data) < 4: raise XDREOFError return struct.unpack('>l', data)[0] unpack_enum = unpack_int unpack_bool = unpack_int def unpack_uhyper(self): hi = self.unpack_uint() lo = self.unpack_uint() return long(hi)<<32 | lo def unpack_hyper(self): x = self.unpack_uhyper() if x >= 0x8000000000000000L: x = x - 0x10000000000000000L return x def unpack_float(self): data = self.__buf.read(4) if len(data) < 4: raise XDREOFError return struct.unpack('>f', data)[0] def unpack_double(self): data = self.__buf.read(8) if len(data) < 8: raise XDREOFError return struct.unpack('>d', data)[0] def unpack_fstring(self, n): if n < 0: raise ValueError('fstring size must be nonnegative') stored_len = (((n + 3) >> 2) << 2) data = self.__buf.read(stored_len) if len(data) < stored_len: raise XDREOFError return data[:n] unpack_fopaque = unpack_fstring def unpack_string(self, max=-1): n = self.unpack_uint() if max >= 0 and n > max: raise XDRCorruptionError( 'Packed string of length %d is longer than maximum of %d' % (n, max)) return self.unpack_fstring(n) unpack_opaque = unpack_string unpack_bytes = unpack_string def unpack_list(self, unpack_item): list = [] while 1: x = self.unpack_uint() if x == 0: break if x <> 1: raise XDRConversionError('0 or 1 expected, got ' + `x`) item = unpack_item() list.append(item) return list def unpack_farray(self, n, unpack_item): list = [] for i in xrange(n): list.append(unpack_item()) return list def unpack_array(self, unpack_item, max=-1): n = self.unpack_uint() if max >= 0 and n > max: raise XDRCorruptionError( 'Packed array of length %d is longer than maximum of %d' % (n, max)) return self.unpack_farray(n, unpack_item) def createBlankInstance(klass): if hasattr(klass, '__basicnew__'): return klass.__basicnew__() else: return new.instance(klass, {}) def fromXDR(klass, data): ob = createBlankInstance(klass) u = XDRUnpacker(data) ob.__xdr_unpack__(ob, u) return ob def toXDR(ob): p = XDRPacker() ob.__xdr_pack__(ob, p) return p.get_buffer() class XDRBase: ''' Base class for objects that can save and retrieve state from an XDR data stream. ''' __xdr_fields__ = () def __xdr_pack__(self, instance, packer): for field in self.__xdr_fields__: field.__xdr_pack__(self, packer) def __xdr_unpack__(self, instance, unpacker): for field in self.__xdr_fields__: field.__xdr_unpack__(self, unpacker) def __init__(self, **kw): # Convenience constructor. self.__dict__.update(kw) if POLICE_CONSTRUCTOR: # Verify signature. required = [] for field in self.__xdr_fields__: required.append(field._attr) if hasattr(field, '__xdr_extra_attributes__'): required.extend(field.__xdr_extra_attributes__(self)) todo = kw.copy() for attr in required: if not todo.has_key(attr): raise TypeError('Required argument: ' + attr) del todo[attr] if len(todo) > 0: raise TypeError('Excessive arguments: ' + join(todo.keys(), ', ')) def __str__(self): s = ['<%s (' % self.__class__.__name__] xf = self.__xdr_fields__ for field in xf: s.append('%s = %s' % (field._attr, getattr(self, field._attr))) if hasattr(field, '__xdr_extra_attributes__'): xtras = field.__xdr_extra_attributes__(self) for x in xtras: s.append('%s = %s' % (x, getattr(self, x))) s.append(')>') return join(s, '\n') class XDRField: ''' Base class for XDR field declarations. ''' def __init__(self, attr): self._attr = attr class XDRSimpleField (XDRField): ''' Base class for simple XDR fields. ''' def __init__(self, attr): self._attr = attr def __xdr_pack__(self, instance, packer): getattr(packer, self._packername)(getattr(instance, self._attr)) def __xdr_unpack__(self, instance, unpacker): setattr(instance, self._attr, getattr(unpacker, self._unpackername)()) def createSimpleXDRTypes(): # Creates XDR_uint, XDR_float, etc. types = ('uint', 'int', 'enum', 'bool', 'uhyper', 'hyper', 'float', 'double',) for t in types: classname = 'XDR_%s' % t co = new.classobj(classname, (XDRSimpleField,), {}) co._packername = 'pack_%s' % t co._unpackername = 'unpack_%s' % t globals()[classname] = co createSimpleXDRTypes() del createSimpleXDRTypes XDR_uint32 = XDR_uint XDR_uint64 = XDR_uhyper class XDR_fstring (XDRField): size = -1 def __init__(self, attr, size=-1): XDRField.__init__(self, attr) if size >= 0: self.size = size def __xdr_pack__(self, instance, packer): val = getattr(instance, self._attr) packer.pack_fstring(self.size, val) def __xdr_unpack__(self, instance, unpacker): val = unpacker.unpack_fstring(self.size) setattr(instance, self._attr, val) XDR_fopaque = XDR_fstring class XDR_string (XDRField): max = -1 def __init__(self, attr, max=-1): XDRField.__init__(self, attr) if max >= 0: self.max = max def __xdr_pack__(self, instance, packer): val = getattr(instance, self._attr) packer.pack_string(val, self.max) def __xdr_unpack__(self, instance, unpacker): val = unpacker.unpack_string(self.max) setattr(instance, self._attr, val) XDR_opaque = XDR_string XDR_bytes = XDR_string class XDR_farray (XDRField): size = -1 def __init__(self, attr, typ, size=-1): XDRField.__init__(self, attr) self._typ = typ if size >= 0: self.size = size def __xdr_pack__(self, instance, packer): func = getattr(packer, 'pack_' + self._typ) val = getattr(instance, self._attr) packer.pack_farray(self.size, val, func) def __xdr_unpack__(self, instance, unpacker): func = getattr(unpacker, 'unpack_' + self._typ) val = unpacker.unpack_farray(self.size, func) setattr(instance, self._attr, val) class XDR_array (XDRField): max = -1 def __init__(self, attr, typ, max=-1): XDRField.__init__(self, attr) self._typ = typ if max >= 0: self.max = max def __xdr_pack__(self, instance, packer): func = getattr(packer, 'pack_' + self._typ) val = getattr(instance, self._attr) packer.pack_array(val, func, self.max) def __xdr_unpack__(self, instance, unpacker): func = getattr(unpacker, 'unpack_' + self._typ) val = unpacker.unpack_farray(func, self.max) setattr(instance, self._attr, val) class XDR_list (XDRField): ''' Or, in XDR terms, a linked list composed of "optional data" structures. ''' def __init__(self, attr, klass): XDRField.__init__(self, attr) self._klass = klass def __xdr_pack__(self, instance, packer): lst = getattr(instance, self._attr) for item in lst: packer.pack_uint(1) item.__xdr_pack__(instance, packer) packer.pack_uint(0) def __xdr_unpack__(self, instance, unpacker): lst = [] klass = self._klass while 1: cont = unpacker.unpack_uint() if cont == 0: break elif cont != 1: raise XDRConversionError('0 or 1 expected, got ' + `cont`) ob = createBlankInstance(klass) ob.__xdr_unpack__(instance, unpacker) lst.append(ob) setattr(instance, self._attr, lst) class XDR_struct (XDRField): def __init__(self, attr, klass): XDRField.__init__(self, attr) self._klass = klass def __xdr_pack__(self, instance, packer): ob = getattr(instance, self._attr) ob.__xdr_pack__(ob, packer) def __xdr_unpack__(self, instance, unpacker): klass = self._klass ob = createBlankInstance(klass) ob.__xdr_unpack__(ob, unpacker) setattr(instance, self._attr, ob) class XDRFieldFactory: def __init__(self, klass): self.klass = klass def __call__(self, attr): return XDR_struct(attr, self.klass) class XDR_explicit_union (XDRField): ''' Union discriminated by an attribute stored in the parent class. ''' def __init__(self, attr, selections, default_sel=()): XDRField.__init__(self, attr) sels = {} for k, actions in selections.items(): if type(actions) not in SequenceTypes: actions = (actions,) sels[k] = actions self._sels = sels if type(default_sel) not in SequenceTypes: default_sel = (default_sel,) self._def = default_sel def __xdr_pack__(self, instance, packer): choice = getattr(instance, self._attr) packer.pack_uint(choice) if self._sels.has_key(choice): actions = self._sels[choice] else: actions = self._def for action in actions: action.__xdr_pack__(instance, packer) def __xdr_unpack__(self, instance, unpacker): choice = unpacker.unpack_uint() setattr(instance, self._attr, choice) if self._sels.has_key(choice): actions = self._sels[choice] else: actions = self._def for action in actions: action.__xdr_unpack__(instance, unpacker) def __xdr_extra_attributes__(self, instance): choice = getattr(instance, self._attr) if self._sels.has_key(choice): actions = self._sels[choice] else: actions = self._def attrs = [] for action in actions: attrs.append(action._attr) if hasattr(action, '__xdr_extra_attributes__'): attrs.extend(action.__xdr_extra_attributes__(instance)) return attrs class XDR_implicit_union (XDRField): ''' Union discriminated by a common class attribute. ''' def __init__(self, attr, klasses, subattr): XDRField.__init__(self, attr) self._subattr = subattr sels = {} for klass in klasses: sels[getattr(klass, subattr)] = klass self._sels = sels def __xdr_pack__(self, instance, packer): ob = getattr(instance, self._attr) choice = getattr(ob, self._subattr) packer.pack_uint(choice) ob.__xdr_pack__(ob, packer) def __xdr_unpack__(self, instance, unpacker): choice = unpacker.unpack_uint() klass = self._sels[choice] ob = createBlankInstance(klass) ob.__xdr_unpack__(ob, unpacker) setattr(instance, self._attr, ob) class XDROptUnion (XDRBase): ''' A union similar to the "optional data" structure defined in the RFC. Not an attribute class. ''' __xdr_attrs__ = ('opted', 'data') def _getDataClass(self, opted): return None def __init__(self, opted, *args, **kw): optattr, dataattr = self.__xdr_attrs__ setattr(self, optattr, opted) klass = self._getDataClass(opted) if klass is not None: data = apply(klass, args, kw) else: data = None setattr(self, dataattr, data) def __xdr_pack__(self, instance, packer): optattr, dataattr = self.__xdr_attrs__ opted = getattr(self, optattr) packer.pack_uint(opted) data = getattr(self, dataattr, None) if data is not None: data.__xdr_pack__(data, packer) def __xdr_unpack__(self, instance, unpacker): optattr, dataattr = self.__xdr_attrs__ opted = unpacker.unpack_uint() setattr(self, optattr, opted) klass = self._getDataClass(opted) if klass is not None: data = createBlankInstance(klass) data.__xdr_unpack__(data, unpacker) else: data = None setattr(self, dataattr, data) class XDR_void: def __xdr_pack__(self, instance, packer): pass def __xdr_unpack__(self, instance, unpacker): pass if __name__ == '__main__': # test suite p = XDRPacker() packtest = [ (p.pack_string, ('hello world',)), (p.pack_uint, (9,)), (p.pack_bool, (None,)), (p.pack_bool, ('hello',)), (p.pack_uhyper, (45L,)), (p.pack_float, (1.9,)), (p.pack_double, (1.9,)), (p.pack_list, (range(5), p.pack_uint)), (p.pack_array, (['what', 'is', 'hapnin', 'doctor'], p.pack_string)), ] succeedlist = [1] * len(packtest) count = 0 for method, args in packtest: print 'pack test', count, try: apply(method, args) print 'succeeded' except XDRConversionError, var: print 'ConversionError:', var.msg succeedlist[count] = 0 count = count + 1 data = p.get_buffer() # now verify up = XDRUnpacker(data) unpacktest = [ (up.unpack_string, (), lambda x: x == 'hello world'), (up.unpack_uint, (), lambda x: x == 9), (up.unpack_bool, (), lambda x: not x), (up.unpack_bool, (), lambda x: x), (up.unpack_uhyper, (), lambda x: x == 45L), (up.unpack_float, (), lambda x: 1.89 < x < 1.91), (up.unpack_double, (), lambda x: 1.89 < x < 1.91), (up.unpack_list, (up.unpack_uint,), lambda x: x == range(5)), (up.unpack_array, (up.unpack_string,), lambda x: x == ['what', 'is', 'hapnin', 'doctor']), ] count = 0 for method, args, pred in unpacktest: print 'unpack test', count, try: if succeedlist[count]: x = apply(method, args) print pred(x) and 'succeeded' or 'failed', ':', x else: print 'skipping' except XDRConversionError, var: print 'ConversionError:', var.msg count = count + 1 up.done()