from twisted.internet.protocol import ServerFactory, ReconnectingClientFactory from twisted.protocols.basic import LineOnlyReceiver from twisted.words.protocols.irc import IRCClient from twisted.internet.task import LoopingCall from twisted.internet import reactor, defer, utils from twisted.web.client import getPage from twisted.python import log import metaplay, random, os, cPickle, re class Reply(object): def __init__(self, ircclient, channel, line, theuser): self.ircclient = ircclient self.channel = channel self.theuser = theuser self.line = line self.lines = [] def __call__(self, *args): self.lines.append(''.join(args)) def getlines(self): if len(self.lines) <= 15: for line in self.lines: yield line else: for line in self.lines[:7]: yield line yield ("[... skipping %s lines to avoid excess flood ...]" %(len(self.lines) - 14,)) for line in self.lines[-7:]: yield line def authenticated_command(*users): def wrap(func): def command(self, reply, commands): def ircshowlines(): for line in reply.getlines(): reply.ircclient.msg(reply.channel, line) del reply.lines[:] if not users and hasattr(reply, 'ircclient'): def cb(data): chan = reply.channel if not chan.startswith('#'): chan = '#qdq' if 'o' in data.get(chan, ''): d = func(self, reply, commands) ircshowlines() return d else: reply("you can't do that") return return reply.ircclient.queryLFor(reply.theuser).addCallback(cb) elif hasattr(reply, 'ircclient'): def cb(user): if user in users: d = func(self, reply, commands) ircshowlines() return d else: reply("you can't do that") return return defer.succeed(reply.theuser).addCallback(cb) else: reply("ok, you've really confused me now") command.nodoc = True return command return wrap class ElbowtoneResponder(metaplay.AutoReloader): def __init__(self): self.renamers = {} if os.path.exists("renamers.pickle"): self.renamers = cPickle.load(open("renamers.pickle", "rb")) self.loadEggs() def saveEggs(self): cPickle.dump(self.eggs, open('eggs.pickle', 'wb')) def loadEggs(self): try: f = open('eggs.pickle', 'rb') except IOError: self.eggs = {} self.saveEggs() else: self.eggs = cPickle.load(f) def command_EGG(self, reply, commands): key, value = reply.line[3:].split(':') key = key.strip() value = value.strip() self.eggs[key] = value self.saveEggs() reply('defining %r as %r'%(key, value)) def command_RENAME(self, reply, commands): if not reply.channel.startswith('#'): reply("no") reply.ircclient.msg("#pypy", '%s is trying to fiddle with me privately' % channel) return prefix = None if len(commands) > 1: prefix = commands[1][:10] self.renamers[reply.theuser] = self.renamers.get(reply.theuser, 0) + 1 cPickle.dump(self.renamers, open("renamers.pickle", "wb")) reply.ircclient.rename(prefix) def command_RENAMERS(self, reply, commands): renamers = sorted(self.renamers.items(), key=lambda (k,v):v, reverse=True) reply_ = [] for renamer, count in renamers: reply_.append('%s: %s'%(renamer, count)) if len(', '.join(reply_)) > 100: reply_.append('...') break reply("Renamers: ", ', '.join(reply_)) def command_RELOAD(self, reply, commands): import elbowtone reload(elbowtone) reply("ok") def command_VCS(self, reply, commands): vcs = random.choice(['subversion', 'darcs', 'bzr', 'arch', 'CVS', 'Visual SourceSafe', 'SCCS', 'mercurial', 'copying files around', 'monotone', 'codeville']) adjective = random.choice(["best", "worst"]) reply("%s is clearly the %s VCS ever"%(vcs, adjective)) def command_OS(self, reply, commands): os = random.choice(['Mac OS X', 'Linux', 'Multics', 'TOPS-20', 'Windows for Workgroups', 'DOS', 'AT&T Unix', 'Sun Solaris', 'VISTA!?!!', 'Copland', 'unununium', 'Plan9', 'Windows NT 3.1', 'Xenix', 'Minix', 'Tunes', 'VXWorks', "movitz", "genera", 'Debian GNU/HURD']) adjective = random.choice(["best", "worst", "only", "most vaporous", "least useful"]) reply("%s is clearly the %s operating system ever"%(os, adjective)) def command_COMPANY(self, reply, commands): words = list(open('/usr/share/dict/words')) def w(): while 1: r = random.choice(words) if r[0].islower() and "'" not in r: return r[:-1] company = w()+w() reply(company[0].upper() + company[1:]) def command_EDITOR(self, reply, commands): editor = random.choice(['vim', 'emacs', 'ed', 'jed', 'nano', 'pico', 'notepad']) adjective = random.choice(["worst", "best", "funkiest", "most efficient", "most obnoxious"]) reply("%s is clearly the %s editor ever"%(editor, adjective)) def command_HELP(self, reply, commands): reply("optimist") def command_SOURCE(self, reply, commands): reply("http://codespeak.net/svn/user/mwh/elbowtone/") @authenticated_command("mwh", "mwhudson") def command_COMMIT(self, reply, commands): def cb(output): for line in output.strip('\n').split("\n"): reply(line) msg = ' '.join(commands[1:]) if not msg: msg = 'irc commit' return utils.getProcessOutput("/usr/bin/svn", args=("ci", "-m", msg) ).addCallback(cb) def command_CLASSNAME(self, reply, commands): def cb(output): reply(re.search('^

([a-zA-Z0-9_]*)

$', output, re.MULTILINE).group(1)) d = getPage('http://www.classnamer.com/') return d.addCallback(cb).addErrback(log.err) def command_default(self, reply, commands): response = self.eggs.get(reply.line.strip()) if response is not None: reply(response) def command_LANGUAGE(self, reply, commands): reply("Python.") def command(self, reply, commands): cmdname = commands[0] m = getattr(self, 'command_' + cmdname.upper(), None) if m: r = m(reply, commands) else: line = reply.line.strip().lower() r = self.command_default(reply, commands) return r class NotificationProtocol(LineOnlyReceiver, metaplay.AutoReloader): delimiter = '\n' def lineReceived(self, line): for d in self.factory.dests: self.ircclient.msg(d, line) class NotificationFactory(ServerFactory, metaplay.AutoReloader): protocol = NotificationProtocol def __init__(self, botfactory, dests=['#pypy']): self.botfactory = botfactory self.dests = dests def buildProtocol(self, addr): r = ServerFactory.buildProtocol(self, addr) r.ircclient = self.botfactory.activeClient return r class MyIRC(IRCClient, metaplay.AutoReloader): performLogin = 0 lineRate = 0.1 def __init__(self): self.nickname = "elbowtone" self.lastnick = '!!!tone' def update_instance(self): if not hasattr(self, 'lastnick'): self.lastnick = '!!!tone' def connectionMade(self): IRCClient.connectionMade(self) self.sendLine("USER %s foo bar :%s" % (self.nickname, self.realname)) self.setNick(self.nickname) self.factory.activeClient = self self.pinger = LoopingCall(self._ping) self.pinger.start(60, now=False) self.next_rename = reactor.callLater(86400, self.rename) self.responder = ElbowtoneResponder() def _ping(self): self.ping(self.nickname) def signedOn(self): IRCClient.signedOn(self) self.join("pypy") def noticed(self, user, channel, msg): pass def rename(self, prefix=None): if self.next_rename and self.next_rename.active(): self.next_rename.cancel() self.next_rename = reactor.callLater(86400, self.rename) organs = ['elbow', 'chin', 'neck', 'heart', 'lung', 'knee', 'skull', 'foot', 'nose', 'ear', 'eye', 'arm', 'hand', 'chest', 'back', 'head', 'waist', 'wrist', 'face', 'lip', 'liver', 'kidney', 'spleen', 'skin', 'tongue'] if prefix is None: nick = self.nickname while nick == self.nickname: nick = random.choice(organs) + 'tone' else: nick = prefix + 'tone' if nick != self.nickname: self.lastnick = self.nickname self.sendLine("NICK %s" % nick) def privmsg(self, user, channel, msg): prefix = "%s:"%self.nickname usernick = user.split('!', 1)[0] if channel == self.nickname: channel = usernick respond = True if msg.startswith("%s:" % self.nickname): self.msg(channel, "don't need '%s:' prefix for private messages!" %(self.nickname,)) msg = msg.split(':', 1)[1] msg = msg.strip() else: if msg == 'elbowtone: who do you think you are?': self.msg(channel, self.nickname) respond = msg.startswith(prefix) if respond: msg = msg[len(prefix):].strip() if not respond and msg.startswith(self.lastnick + ':'): self.msg(channel, "hi " + usernick + " i'm over here now!") return commands = msg.split() if respond and commands: print usernick, channel, msg reply = Reply(self, channel, msg, usernick) d = self.responder.command(reply, commands) def _cb(x): for line in reply.getlines(): self.msg(channel, line) del reply.lines[:] _cb('prelim') if isinstance(d, defer.Deferred): d.addCallback(_cb) def pong(self, user, secs): pass class IRCClientFactory(ReconnectingClientFactory, metaplay.AutoReloader): protocol = MyIRC def buildProtocol(self, addr): self.resetDelay() return ReconnectingClientFactory.buildProtocol(self, addr)