from xdrlib2 import * # Based on RFC 1831. class AuthFlavor (XDR_enum): AUTH_NONE = 0 AUTH_SYS = 1 # Unix-like authentication. AUTH_SHORT = 2 # Random token authentication. class MsgType (XDR_enum): CALL = 0 REPLY = 1 class ReplyStat (XDR_enum): MSG_ACCEPTED = 0 MSG_DENIED = 1 class AcceptStat (XDR_enum): SUCCESS = 0 # RPC executed successfully PROG_UNAVAIL = 1 # remote hasn't exported program PROG_MISMATCH = 2 # remote can't support version number PROC_UNAVAIL = 3 # program can't support procedure GARBAGE_ARGS = 4 # procedure can't decode params SYSTEM_ERR = 5 # errors like memory allocation failure class RejectStat (XDR_enum): RPC_MISMATCH = 0 # RPC version number != 2 AUTH_ERROR = 1 # remote can't authenticate caller class AuthStat (XDR_enum): AUTH_OK = 0 # Success # Failed at remote end AUTH_BADCRED = 1 # bad credential (seal broken) AUTH_REJECTEDCRED = 2 # client must begin new session AUTH_BADVERF = 3 # bad verifier (seal broken) AUTH_REJECTEDVERF = 4 # verifier expired or replayed AUTH_TOOWEAK = 5 # rejected for security reasons # Failed locally AUTH_INVALIDRESP = 6 # bogus response verifier AUTH_FAILED = 7 # reason unknown class OpaqueAuth (XDRBase): __xdr_fields__ = ( AuthFlavor('flavor'), XDR_opaque('body'), ) class AcceptedReply (XDRBase): reply_stat = ReplyStat.MSG_ACCEPTED __xdr_fields__ = ( XDR_struct('verf', OpaqueAuth), XDR_explicit_union('accept_stat', {AcceptStat.PROG_MISMATCH: (XDR_uint('mismatch_low'), XDR_uint('mismatch_high'),) } ), ) class RejectedReply (XDRBase): reply_stat = ReplyStat.MSG_DENIED __xdr_fields__ = ( XDR_explicit_union('reject_stat', {RejectStat.RPC_MISMATCH: (XDR_uint('mismatch_low'), XDR_uint('mismatch_high')), RejectStat.AUTH_ERROR: (AuthStat('auth_stat'),) } ), ) class ReplyBody (XDRBase): mtype = MsgType.REPLY __xdr_fields__ = ( XDR_implicit_union('reply', (AcceptedReply, RejectedReply), 'reply_stat'), ) class CallBody (XDRBase): mtype = MsgType.CALL __xdr_fields__ = ( XDR_uint('rpcvers'), XDR_uint('prog'), XDR_uint('vers'), XDR_uint('proc'), XDR_struct('cred', OpaqueAuth), XDR_struct('verf', OpaqueAuth), ) class RpcMsg (XDRBase): __xdr_fields__ = ( XDR_uint('xid'), XDR_implicit_union('body', (CallBody, ReplyBody), 'mtype'), ) class UnixAuthParms (XDRBase): __xdr_fields__ = ( XDR_uint('stamp'), XDR_string('machinename', 255), XDR_uint('uid'), XDR_uint('gid'), XDR_array('gids', 'uint', 16), ) class RPCProgramDef: pass def initializeProgram(klass, parent_globals): names = klass.procnames procs = {} for num, name in names.items(): argsClass = parent_globals[name + '_args'] resClass = parent_globals[name + '_res'] need_auth = parent_globals.get(name + '_need_auth', 1) procs[num] = (name, argsClass, resClass, need_auth) klass.procs = procs if __name__ == '__main__': import socket s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.bind(('', 5679)) data, addr = s.recvfrom(8192) ob = fromXDR(RpcMsg, data) print ob.body.verf.__dict__