""" The recordbot is a (it used to say 'simple' here) bot that reads SDA's history.txt file and can answer questions about records on the levels in SDA's tables. """ TODO = """\ # more player/level info? # dcc records to people? (looks tricky) """ REFRESH_DELAY = 86400 REFRESH_MIN = 10 # twisted imports from twisted.internet import reactor, protocol, defer, utils from twisted.web.client import downloadPage from twisted.internet.task import LoopingCall # system imports import time, sys, os, random, re, sets, datetime import string, cPickle, urllib, fnmatch import irc2, metaplay, feature, flattxt, parseHistory # i'm tempted to change this to random.choice(['utf-8', 'latin-1']) IRC_ENCODING = "utf-8" BASE_URL = 'http://speeddemosarchive.com/quake/' #BASE_URL = 'http://127.0.0.1/~mwh/' HISTORY_TXT_URL = BASE_URL + 'history.txt' CHISTORY_TXT_URL = BASE_URL + 'otherdemos/c-history.txt' CONFIG_URL = BASE_URL + 'config' basiccats = ['ER', 'EH', 'NR', 'NH'] def plausible_cat(cat): if cat == 'CAREFUL': return True if cat.startswith('C') or cat.startswith('NE') or cat.startswith('NN'): cat = cat[1:] return cat[:2] in ['ER', 'EH', 'NR', 'NH'] \ and ''.join(cat[2:]) in '23456789' def skillflavour(s): if s.startswith('CAREFUL'): return 3, '' elif s.startswith('NE') or s.startswith('NN'): return 2, s[1:] elif s.startswith('C'): return 1, s[1:] else: return 0, s def cmpskill(s, t): if s == t: return 0 sf, s = skillflavour(s) tf, t = skillflavour(t) if sf > tf: return 1 elif sf < tf: return -1 i = len(s) - len(t) if i < 0: return -1 elif i > 0: return 1 if len(s) == 3: i = ord(s[-1]) - ord(t[-1]) if i < 0: return -1 elif i > 0: return 1 s = s[:2] t = t[:2] if s not in basiccats: if t in basiccats: return 1 else: return cmp(s, t) else: if t not in basiccats: return -1 i = basiccats.index(s) - basiccats.index(t) if i < 0: return -1 elif i > 0: return 1 else: return 0 def dems_for_year(config, yr): d = [] for l in config.levels.itervalues(): for cat in l.demos.itervalues(): for demo in cat: if demo.date.split('.')[2] == yr: d.append(demo) return d def tabulate(data): lines = [] data = [[str(d) for d in row] for row in data] widths = [0]*len(data[0]) for i in range(len(data[0])): widths[i] = max([len(r[i]) for r in data]) dwidths = [0]*len(data[0]) for i in range(len(data[0])): dwidths[i] = max([len(r[i]) for r in data[1:]]) r = [] for w, d in zip(widths, data[0]): r.append(d.center(w)) lines.append(" | ".join(r)) r = [] for w in widths: r.append('-'*w) lines.append("-+-".join(r)) for row in data[1:]: r = [] for w, dw, d in zip(widths, dwidths, row): r.append(("%*s"%(dw, d)).center(w)) lines.append(" | ".join(r)) return lines def years(): return ['96', '97', '98', '99'] + \ ["%02d"%y for y in range(datetime.date.today().year+1 - 2000)] def table_data_id(config): r = [('Year', '# of demos', 'id demos (%)', 'coops (%)', 'id coops (%)')] for yr in years(): ds = dems_for_year(config, yr) n = len(ds) c = len([d for d in ds if d.level.series == "id"]) c = str(c) + " %5.1f"%(100.0*c/n) s = len([d for d in ds if len(d.players) > 1]) s = str(s) + " %4.1f"%(100.0*s/n) t = len([d for d in ds if len(d.players) > 1 and d.level.series == "id"]) t = str(t) + " %4.1f"%(100.0*t/n) r.append((yr, n, c, s, t)) return r def table_data_coops(config): r = [('Year', '# of demos', '# of coops (%)', '2p', '3p', '4p', '5p', '6p', '7p')] for yr in years(): ds = dems_for_year(config, yr) n = len(ds) c = len([d for d in ds if len(d.players) > 1]) c = str(c) + " %4.1f"%(100.0*c/n) ms = tuple([len([d for d in ds if len(d.players) == p]) for p in range(2, 8)]) r.append((yr, n, c,) + ms) return r def table_data_features(config, featss): firstpatann = feature.parse_feature_flags(featss[0]) patanns = [feature.parse_feature_flags(featss[0] + f) for f in featss[1:]] r = [['Year']] for fres in [firstpatann] + patanns: r[0].append('# %s'%(', '.join(fres.annotation),)) for yr in years(): ds = dems_for_year(config, yr) r.append([yr]) n = len(firstpatann.filter_demos(ds)) r[-1].append(n) for p in patanns: c = len(p.filter_demos(ds)) if n: c = str(c) + " %5.1f"%(100.0*c/n) else: c = str(c) + " ---" r[-1].append(c) return r def fmt_players(players, sep='/'): return sep.join([p.name.encode(IRC_ENCODING) for p in players]) class Reply(object): def __init__(self, ircclient, channel, line, theuser): self.ircclient = ircclient self.channel = channel self.theuser = theuser self.line = line self.lines = [] def __call__(self, *args): self.lines.append(flattxt.render(args)) def getlines(self): if len(self.lines) <= 15: for line in self.lines: yield line else: for line in self.lines[:7]: yield line yield ("[... skipping %s lines to avoid excess flood ...]" %(len(self.lines) - 14,)) for line in self.lines[-7:]: yield line class RecordBot(irc2.AdvancedClient, metaplay.AutoReloader): """ """ performLogin = 0 lineRate = 0.1 def __init__(self): self.nickname = "recordbot" irc2.AdvancedClient.__init__(self) self.ldefer = None self.orbsslap = None def connectionMade(self): irc2.AdvancedClient.connectionMade(self) self.sendLine("USER %s foo bar :%s" % (self.nickname, self.realname)) self.nick(self.nickname).addErrback( self.tryNick, self.nickname, 0) self.factory.activeClient = self self.pinger = LoopingCall(self.ping, self.nickname) self.pinger.start(120, now=False) def tryNick(self, failure, triedNick, count): print failure, triedNick, count failure.trap(irc2.NicknameInUse) if count > 4: return failure newNick = triedNick+'_' return self.nick(newNick).addErrback(self.tryNick, newNick, count + 1) def signedOn(self): irc2.AdvancedClient.signedOn(self) """Called when bot has succesfully signed on to server.""" def _joinQDQCB(*args): self.slappers = {} self.join("qdq").addCallback(_joinQDQCB) passfile = os.path.expanduser("~/.recordbotpass") if os.path.exists(passfile): passwd = open(passfile).read().strip("\n") self.msg("Q@CServe.quakenet.org", "auth recordbot %s"%(passwd,)) def noticed(self, user, channel, msg): theuser = user.split('!', 1)[0] if theuser == 'Q' and msg == "AUTH'd successfully.": self.msg('L', 'invite #sda') #self.join('sda') elif theuser == 'L': self.handleLMsg(msg) def privmsg(self, user, channel, msg): """This will get called when the bot receives a message.""" theuser = user.split('!', 1)[0] respond = False if channel == self.nickname: channel = theuser respond = True if msg.startswith("%s:" % self.nickname): self.msg(channel, "don't need '%s:' prefix for private messages!" %(self.nickname,)) msg = msg.split(':', 1)[1] commands = msg.split() if commands and commands[0].startswith('@'): respond = False elif msg.startswith("%s:" % self.nickname): respond = True msg = msg.split(':', 1)[1] commands = msg.split() else: commands = msg.split() reply = Reply(self, channel, msg, theuser) if respond: print theuser, channel, commands d = self.factory.command(reply, commands) elif commands and commands[0].startswith('@'): d = self.factory.at_command(reply, commands) else: return def _cb(x): for line in reply.getlines(): self.msg(channel, line) del reply.lines[:] _cb('prelim') if isinstance(d, defer.Deferred): d.addCallback(_cb) def handleLMsg(self, msg): if self.ldefer: if msg.startswith('#'): chan, flags = msg.split() self.ldefer.data[chan] = flags elif msg.endswith("is not authed") or msg == "End of list.": self.ldefer.callback(self.ldefer.data) self.ldefer = None self.ldefer_later.cancel() def action(self, user, channel, msg): """This will get called when the bot sees someone do an action.""" user = user.split('!', 1)[0] if "slaps %s around a bit"%(self.nickname,) in msg \ or "slaps %s about"%(self.nickname,) in msg: if user == "Doctor43": self.msg(channel, "greetings my fellow Python-implemented friend!") return t = time.time() d = t - 60*10 c = 0 for t1 in self.slappers.get(user, []): if t1 > d: c += 1 self.slappers.setdefault(user, []).append(t) if c == 0 or channel != '#qdq': self.msg(channel, '%s: watch it, punk'%(user,)) elif c == 1: self.msg(channel, '%s: do you really want to do that ' 'when I have ops?'%(user,)) elif c == 2: self.msg(channel, '%s: tick, tick, motherfucker'%(user,)) elif c > 2: self.kick(channel, user, 'never liked you anyway!') self.msg(channel, "my, it smells better now %s's gone"%(user,)) self.slappers[user] = [] def userJoined(self, user, channel): if channel != '#qdq': return print user if user == 'Orbs' and getattr(self, 'orbsslap', None) is not None: self.me('#qdq', 'slaps Orbs about a bit with %s'%(self.orbsslap,)) self.orbsslap = None g = self.factory.greets def cb(u): if u in g: self.msg(channel, '[%s] %s'%(user, g[u])) reactor.callLater(0.5, lambda :self.authName(user).addCallback(cb)) def authName(self, user): def cb(info): if 330 in info: return info[330][0][2] else: return None return self.whois(user).addCallback(cb) def timeoutL(self): if self.ldefer: self.ldefer.errback() self.ldefer = None def queryLFor(self, user): if self.ldefer: raise RuntimeError, "too fast" self.msg('L', 'whois '+user) self.ldefer = defer.Deferred() self.ldefer.data = {} self.ldefer_later = reactor.callLater(2.0, self.timeoutL) return self.ldefer def irc_INVITE(self, prefix, params): user = prefix channel = params[1] self.invited(user, channel) def invited(self, user, channel): if user.startswith('L!'): self.join(channel) def authenticated_command(*users): def wrap(func): def command(self, reply, commands): def ircshowlines(): for line in reply.getlines(): reply.ircclient.msg(reply.channel, line) del reply.lines[:] if not users and hasattr(reply, 'ircclient'): def cb(data): chan = reply.channel if not chan.startswith('#'): chan = '#qdq' if 'o' in data.get(chan, ''): d = func(self, reply, commands) ircshowlines() return d else: reply("you can't do that") return return reply.ircclient.queryLFor(reply.theuser).addCallback(cb) elif hasattr(reply, 'ircclient'): def cb(user): if user in users: d = func(self, reply, commands) ircshowlines() return d else: reply("you can't do that") return return reply.ircclient.authName(reply.theuser).addCallback(cb) elif hasattr(reply, 'session'): sess = reply.session authed = getattr(reply.session, 'authed', False) if authed: return func(self, reply, commands) else: reply('you need to auth first') return else: reply("ok, you've really confused me now") command.nodoc = True return command return wrap def hidden_command(cmd): def command(self, reply, commands): if reply.channel == '#qdq': return self.command_LEVEL(reply, ['level'] + commands) return cmd(self, reply, commands) return command class RecordBotFactory(protocol.ReconnectingClientFactory, metaplay.AutoReloader): """A factory for RecordBots. A new protocol instance will be created each time we connect to the server. """ def __init__(self, config): self.callID = reactor.callLater(0, self.refreshData) self.config = config self.greets = {} self.loadGreets() self.loadEggs() self.last_commands = [] self.last_page_commands = [] self.confirmations = {} # the class of the protocol to build when new connection is made protocol = RecordBot def buildProtocol(self, addr): self.resetDelay() return protocol.ReconnectingClientFactory.buildProtocol(self, addr) def refreshData(self): if self.callID and self.callID.active(): self.callID.cancel() dlist = defer.DeferredList([downloadPage(HISTORY_TXT_URL, "history.txt"), downloadPage(CONFIG_URL, "config"), downloadPage(CHISTORY_TXT_URL, "c-history.txt")]) dlist.addCallback(self.parseData) return dlist def parseData(self, reply): parseHistory.parseConfig(open('config', 'U'), self.config) parseHistory.parseHistory(open('history.txt', 'U'), self.config) parseHistory.parseHistory(open('c-history.txt', 'U'), self.config) self.callID = reactor.callLater(REFRESH_DELAY, self.refreshData) def saveGreets(self): cPickle.dump(self.greets, open('greets.pickle', 'wb')) def loadGreets(self): try: f = open('greets.pickle', 'rb') except IOError: self.greets = {} self.saveGreets() else: self.greets = cPickle.load(f) def saveEggs(self): cPickle.dump(self.eggs, open('eggs.pickle', 'wb')) def loadEggs(self): try: f = open('eggs.pickle', 'rb') except IOError: self.eggs = {} self.saveEggs() else: self.eggs = cPickle.load(f) def command(self, reply, commands): cmdname = commands[0] m = getattr(self, 'command_' + cmdname.upper(), None) c = commands[:] if m: r = m(reply, commands) else: line = reply.line.strip().lower() if line in self.eggs: reply(self.eggs[line]) return r = self.command_LEVEL(reply, ["level"] + commands) self.last_commands = c return r def command_HELP(self, reply, commands): reply("http://starship.python.net/crew/mwh/hacks/recordbotdocs.html") def command_TODO(self, reply, commands): for line in TODO.split("\n"): reply(line) def command_TABLEFILLER(self, reply, commands): cands = [] if len(commands) > 1: cat = commands[1].upper() if plausible_cat(cat): for level in self.config.levels: if len(self.config.levels[level].demos.get(cat, [])) == 1: cands.append((level, cat)) else: for level in self.config.levels: for cat in self.config.levels[level].demos: if len(self.config.levels[level].demos[cat]) == 1: cands.append((level, cat)) if cands: l, c = random.choice(cands) if c in self.config.levels[l].demos \ and self.config.levels[l].demos[c]: demo = self.config.levels[l].demos[c][-1] reply('%s %s (%s, %s, %s)'%( l, c, demo.date, fmt_players(demo.players), demo.time)) else: reply('%s %s (unrecorded!)'%(l, c)) def summarize_runs(self, reply, player, runs, annotation): t = 0 for demo in runs: t += parseHistory.parseTime(demo.time) tt = '' hrs = t/3600 if hrs: tt = str(hrs) + ':' t -= hrs*3600 tt += "%02d:%02d"%divmod(t, 60) if annotation: annotation = '[%s] '%(', '.join(annotation),) else: annotation = '' if len(runs) == 1: dS = '' else: dS = 's' if player: reply("%s: %s%s demo%s, total time of demos %s"%( player, annotation, len(runs), dS, tt)) else: reply("%s%s demo%s, total time of demos %s"%( annotation, len(runs), dS, tt)) def print_demolist(self, reply, demos): w = max([len(demo.level.name) for demo in demos] + [0]) for demo in demos: level = demo.level.name if demo.location: t = Link(demo.time, link=BASE_URL + demo.location) else: t = demo.time reply(fmt_stan( "%-*s %-5s %-3s (%s, %s)", w, Link(level, commands=[level, 'all']), t, Link(demo.cat, commands=[level, demo.cat, 'history']), fmt_players(demo.players), fmt_date(demo))) def demos_matching(self, feats, names, possibilities=None): if names: player = " ".join(names) if player and player not in self.config.lc_aliases: raise feature.NumbskullError("don't know player " + player) if player: player = self.config.lc_aliases[player] else: player = None if possibilities is None: possibilities = feature.possibilities[:] possibilities[feature.FEAT_TYPE] = set('abcd') fres = feature.parse_feature_flags(feats, possibilities) if player: runs = fres.filter_demos(player.runs) else: runs = fres.filter_demos(self.config.all_demos()) return runs, player, fres def command_BOTALIKE(self, reply, commands): if len(commands) == 1: commands = self.last_commands else: commands = commands[1:] link = 'http://python.net/~mwh/recordbot/botalike?command=' + \ urllib.quote(' '.join(commands)) reply(Link(link, link=link)) command_WEB = command_BOTALIKE def command_DEMOS(self, reply, commands): feats = [c for c in commands[1:] if c[0] in '+-'] names = [c for c in commands[1:] if not c[0] in '+-'] listem = False for thing in "+list", "-list": while thing in feats: listem = True feats.remove(thing) try: runs, player, fres = self.demos_matching(feats, names) except feature.NumbskullError, e: reply(e.args[0]) return if player: player = fmt_players([player]) self.summarize_runs(reply, player, runs, fres.annotation) if listem: def c(x, y): r = cmp(x.level.seqno, y.level.seqno) if r: return r r = cmpskill(x.cat, y.cat) if r: return r return cmp(x.parsedtime, y.parsedtime) runs.sort(c) self.print_demolist(reply, runs) def command_OLDEST(self, reply, commands, startat=0): feats = [c for c in commands[1:] if c[0] in '+-'] names = [c for c in commands[1:] if not c[0] in '+-'] for n in names: try: N = int(n) except ValueError: pass else: names.remove(n) break else: N = 5 try: runs, player, fres = self.demos_matching(feats, names) except feature.NumbskullError, e: reply(e.args[0]) return #runs = filter(lambda x: not x.level.variantOf, runs) cmd = commands[0].upper() if cmd in ['OLDEST', 'NEWEST']: runs.sort(key=lambda x:x.parseddate) if cmd == 'NEWEST': runs.reverse() elif cmd in ['LONGEST', 'SHORTEST']: runs.sort(key=lambda x:x.parsedtime) if cmd == 'LONGEST': runs.reverse() ann = fres.annotation if ann: ann1 = ' satisfying ' + ', '.join(ann) else: ann1 = '' if player: from1 = (' from ', player.name) else: from1 = '' reply('The ', 'next '*bool(startat), N, ' ', cmd.lower(), ' demos', ann1, from1) self.print_demolist(reply, runs[startat:startat+N]) self.last_page_lim = startat + N self.last_page_commands = commands[:] command_NEWEST = command_LONGEST = command_SHORTEST = command_OLDEST def command_PAGE(self, reply, commands): if self.last_commands[0].upper() not in ('NEWEST', 'LONGEST', 'SHORTEST', 'OLDEST', 'TOP', 'TOPLEVEL', 'PAGE') or not self.last_page_commands: reply('huh?') return meth = getattr(self, 'command_' + self.last_page_commands[0].upper()) meth(reply, self.last_page_commands, startat=self.last_page_lim) def command_RECORDS(self, reply, commands): commands.insert(1, '+records') self.command_DEMOS(reply, commands) def command_RECORDLIST(self, reply, commands): commands.insert(1, '+records') commands.insert(1, '+list') self.command_DEMOS(reply, commands) def command_TABLE(self, reply, commands): if commands[1] == 'id': for line in tabulate(table_data_id(self.config)): reply(line) elif commands[1] == 'coop': for line in tabulate(table_data_coops(self.config)): reply(line) elif commands[1:]: tail = reply.line.split(None, 1)[1] featss = [s.split() for s in tail.split(',')] try: for line in tabulate(table_data_features(self.config, featss)): reply(line) except feature.NumbskullError, e: reply(e.args[0]) else: reply("what?") @authenticated_command() def command_RELOAD(self, reply, commands): import flattxt reload(flattxt) import parseHistory reload(parseHistory) import recordbot reload(recordbot) import feature reload(feature) import webby reload(webby) import unparseHistory reload(unparseHistory) import nolans reload(nolans) import linecache linecache.clearcache() import notification reload(notification) reply("done") @authenticated_command("micahel") def command_UPDATE(self, reply, commands): def cb(output): for line in output.strip('\n').split("\n"): reply(line) return self.command_RELOAD(reply, commands) return utils.getProcessOutput("/bin/sh", args=("-c", "LC_ALL= svn up")).addCallback(cb) def command_SOURCE(self, reply, commands): reply("http://codespeak.net/svn/user/mwh/recordbot") def command_REFRESH(self, reply, commands): if self.callID.getTime() - REFRESH_DELAY + REFRESH_MIN > time.time(): reply('not so fast') else: reply('refreshing...') return self.refreshData().addCallback(lambda x:reply(" ...done!")) @authenticated_command("micahel") def command_REPARSE(self, reply, commands): self.command_RELOAD(reply, commands) self.parseData(None) def demo_to_beat(self, level, cat): if cat[-1] in '2345678': n = int(cat[-1]) spcat = cat[:-1] cats = [spcat] for i in range(2, n): cats.append(spcat + str(i)) lessplayers = filter(None, map(level.records.get, cats)) if lessplayers: return min([(d.parsedtime, d) for d in lessplayers])[1] return None def reply_unrecorded_cat(self, reply, level, cat): if level.no100 and 'H' in cat: reply("It can't be done, I tell you!") return dem2beat = self.demo_to_beat(level, cat) if dem2beat: reply("don't know ", level.name, " ", cat, " -- go beat ", dem2beat.time, "!") return reply("don't know ", level.name, " ", cat, " -- go record it!") return def command_LEVEL(self, reply, commands): options = {'all': False, 'full':False, 'history': False, 'link': False, 'long': False, 'other': False} cats = [] lname = commands[1] if lname in self.config.levels: level = self.config.levels[lname] else: for l in self.config.levels: l = self.config.levels[l] if l.file == lname: level = l break else: reply("don't know level ", lname) return for subcmd in commands[2:]: if subcmd in options: options[subcmd] = True elif plausible_cat(subcmd.upper()): cats.append(subcmd.upper()) else: reply("don't understand ", subcmd) if options['long']: if cats: reply("you didn't need that long!") cats = ['ER', 'EH', 'NR', 'NH'] if level.no100: cats = [c for c in cats if 'H' not in c] if options['all']: cats = [cat for cat in level.demos if skillflavour(cat)[0] == 0] if options['other']: cats = [cat for cat in level.demos if skillflavour(cat)[0] > 0] if options['full']: cats = list(level.demos) cats.sort(cmpskill) if cats: if not options['history']: for cat in cats: if cat not in level.demos: self.reply_unrecorded_cat(reply, level, cat) continue demo = level.demos[cat][-1] if options['link']: if demo.location is None: reply("that demo is not available " "for download, sorry") else: url = BASE_URL + demo.location reply(T.a(href=url)[url]) else: optm = '' if demo.optimal: optm = ' (believed optimal!)' if demo.location: l = Link(demo.time, link=BASE_URL+demo.location) else: l = demo.time reply('the ', Link(level.name, commands=[level.name, 'all']), ' ', Link(cat, commands=[level.name, cat, 'history']), ' is ', l, ' (', fmt_players(demo.players), ', ', fmt_date(demo), ')', optm) else: if len(cats) > 1: reply("can't list more than one history at once") return cat = cats[0] if cat not in level.demos: self.reply_unrecorded_cat(reply, level, cat) return reply(fmt_stan("history of the %s %s:", Link(level.name, commands=[level.name, 'all']), Link(cat, commands=[level.name, cat]))) pnames = [fmt_players(demo.players) for demo in level.demos[cat]] maxlen = max(map(len, pnames)) flags = [False]*len(pnames) flags[-1] = True zipped = zip(level.demos[cat], pnames, flags) bef = after = " " for demo, pname, last in zipped: if demo.location: l = Link(demo.time, link=BASE_URL+demo.location) else: l = demo.time if last: bef = "--> " after = " <--" if demo.kills: reply(bef, fmt_date(demo), fmt_stan(" %-*s %s [%s]", maxlen, pname, l, demo.kills), after) else: reply(bef, fmt_date(demo), fmt_stan(" %-*s %s", maxlen, pname, l), after) elif options['link']: if not level.url and level.variantOf: reply("assuming you meant %s..."%level.variantOf) level = self.config.levels[level.variantOf] if level.series in ['id', 'hip', 'rogue']: reply("that's copyright material that is!!!1!") elif level.url: reply(Link(level.url, link=level.url)) else: reply("http://speeddemosarchive.com/quake/maps/%s.zip"%level.name) else: msg = [level.name] if options['all']: cats = [cat for cat in level.demos if skillflavour(cat)[0] == 0] elif options['other']: cats = [cat for cat in level.demos if skillflavour(cat)[0] > 0] elif options['full']: cats = list(level.demos) else: cats = ['ER', 'EH', 'NR', 'NH'] cats.sort(cmpskill) for cat in cats: if cat in level.demos: msg.append(Link('%s=%s'%(cat, level.demos[cat][-1].time), commands=[level.name, cat, 'history'])) if len(msg) == 1: reply("no records on %s yet"%(level.name,)) else: r = [''] * (2*len(msg) - 1) r[::2] = msg r[1::2] = [' : '] * (len(msg) - 1) reply(*r) def command_0WNAGE(self, reply, commands): feats = [c for c in commands[1:] if c[0] in '+-'] names = [c for c in commands[1:] if not c[0] in '+-'] listem = False for thing in "+list", "-list": while thing in feats: listem = True feats.remove(thing) possibilities = feature.possibilities[:] possibilities[feature.FEAT_TYPE] = set('abcd') possibilities[feature.FEAT_RECORD] = set('T') possibilities[feature.FEAT_NPLAYERS] = set('1') try: runs, player, fres = self.demos_matching(feats, names, possibilities) except feature.NumbskullError, e: reply(e.args[0]) return if fres.alloweds[feature.FEAT_RECORD] != set('T'): reply("you can't 0wn without records") return if len(fres.alloweds[feature.FEAT_NPLAYERS]) != 1: reply("only works with a fixed number of players for now") return target = len(fres.alloweds[feature.FEAT_TYPE]) if target == 1: reply("having one record isn't much 0wnage") return bylevel = {} for run in runs: bylevel.setdefault((run.level, tuple(sorted(run.players))), []).append(run) TOs = [] for k, v in bylevel.iteritems(): if len(v) == target: TOs.append(k) if not player: reply(len(TOs), " TO", "s"*(len(TOs)!=1), " of specified type") else: reply("%s has TO of %d level%s"%( fmt_players([player]), len(TOs), "s"*(len(TOs) != 1))) if listem: def c(x, y): return cmp(x[0].seqno, y[0].seqno) TOs.sort(c) for l, ps in TOs: if not player: reply(l, ' ', fmt_players(ps)) else: reply(l) command_OWNAGE = command_0WNAGE def command_WHOIS(self, reply, commands): def _cb(user): if user is not None: reply("%s is authed as %s"%(commands[1], user)) else: reply("%s does not appear to be authed"%(commands[1],)) def _eb(fail): reply("who!?") d = reply.ircclient.authName(commands[1]) d.addCallback(_cb).addErrback(_eb) return d command_WHOIS.nodoc = True @authenticated_command() def command_SETGREETING(self, reply, commands): def _cb2(user): if user is None: reply("can only set greetings for authed users") return self.greets[user] = g reply("done " + repr(g)) def _eb2(f): self.greets[commands[1]] = g reply("done " + repr(g)) try: g = reply.line.split(None, 2)[2] except IndexError: g = '' d = reply.ircclient.authName(commands[1]) d.addCallback(_cb2).addErrback(_eb2) d.addCallback(lambda x:self.saveGreets()) return d def command_HOLES(self, reply, commands): feats = [c for c in commands[1:] if c[0] in '+-'] notfeats = [c for c in commands[1:] if not c[0] in '+-'] if notfeats: reply('huh?') return listem = False for thing in "+list", "-list": while thing in feats: listem = True feats.remove(thing) possibilities = feature.possibilities[:] possibilities[feature.FEAT_TYPE] = set('abcd') possibilities[feature.FEAT_MARATHON] = set('F') possibilities[feature.FEAT_NPLAYERS] = set('1') try: fres = feature.parse_feature_flags( feats, possibilities, feature.LEVEL_FEATURES|feature.SLOT_FEATURES) except feature.NumbskullError, e: reply(e.args[0]) return pskill = [] for p in fres.alloweds[feature.FEAT_NPLAYERS]: if p == '1': pskill.append('') else: pskill.append(p) cats = [feature.typecode2skill[k] for k in fres.alloweds[feature.FEAT_TYPE]] allcats = set() for cat in cats: for p in pskill: allcats.add(cat + p) levels = [l for l in self.config.levels.itervalues() if not l.variantOf and not l.no100 and l.name != 'pgexit' and str(l.marathon)[0] in fres.alloweds[feature.FEAT_MARATHON] and str(l.series)[0] in fres.alloweds[feature.FEAT_SERIES] and (not fres.levelglob or fnmatch.fnmatchcase(l.name, fres.levelglob)) ] cmpfunc = lambda x,y: cmp(x.seqno, y.seqno) levels.sort(cmpfunc) holecount = 0 slots = [] for level in levels: missingcats = allcats - set([r for (r, d) in level.records.iteritems() if d.record]) missingcats = sorted(missingcats, cmp=cmpskill) if missingcats: holecount += len(missingcats) m = [] for c in missingcats: b = self.demo_to_beat(level, c) if b: c = (c, ' (need to beat ', Link(b.time, commands=[level.name, b.cat]), ') ') else: c += ' ' m.append(c) slots.append((level, m)) if listem: if fres.randommax and len(slots) > fres.randommax: tail = ", displaying them on %s of %s levels:"%(fres.randommax, len(slots)) slots = feature.mrrandom.sample(slots, fres.randommax) else: tail = " on %s levels:"%(len(slots),) else: tail = " on %s levels"%(len(slots),) reply("There are %s matching holes%s"%(holecount, tail)) if listem: m = 0 for l, c in slots: m = max(len(l.name), m) for l, c in slots: reply(fmt_stan("%-*s ", m, Link(l.name, commands=[l.name, 'all'])), c) def command_AUTH(self, reply, commands): if hasattr(reply, 'ircclient'): passfile = os.path.expanduser("~/.recordbotpass") if os.path.exists(passfile): passwd = open(passfile).read().strip("\n") self.msg("Q@CServe.quakenet.org", "auth recordbot %s"%(passwd,)) else: fname = os.path.expanduser("~/.recordbotauth") if not os.path.exists(fname): reply("i don't think this is going to work") return thePass = open(fname).read().strip() if len(commands) != 2 or not hasattr(reply, 'session'): reply('wtf?') return if thePass == commands[1]: reply.session.authed = True reply('authed') else: reply('nope') command_AUTH.nodoc = True @authenticated_command("micahel") def command_QUIT(self, reply, commands): reply.ircclient.quit() @authenticated_command("micahel") def command_JOIN(self, reply, commands): if len(commands) != 2: return channel = commands[1] reply.ircclient.join(channel) #reactor.connectTCP("uk.quakenet.org", 6667, self) @authenticated_command("micahel") def command_KICK(self, reply, commands): parts = reply.line.split(None, 2) if len(parts) == 3: reason = parts[-1] else: reason = None reply.ircclient.kick('#qdq', commands[1], reason) @authenticated_command() def command_NICK(self, reply, commands): reply.ircclient.setNick(commands[1]) @authenticated_command() @hidden_command def command_EGGS(self, reply, commands): if not self.eggs: reply('no eggs!') for k, v in sorted(self.eggs.items()): reply(k, ' : ', v) @authenticated_command() @hidden_command def command_DELEGG(self, reply, commands): cmd = commands[0] line = reply.line.strip()[len(cmd):].strip() if line in self.eggs: reply('undefining egg definition of "%s" as "%s"'%(line, self.eggs[line])) del self.eggs[line] self.saveEggs() else: reply('what?') @hidden_command def command_Y(self, reply, commands): d = self.confirmations.get((reply.theuser, reply.channel)) if d: d.callback(None) del self.confirmations[(reply.theuser, reply.channel)] else: reply('what?') @hidden_command def command_N(self, reply, commands): if reply.channel == '#qdq': reply("don't know level n") return d = self.confirmations.get((reply.theuser, reply.channel)) if d: d.errback(None) del self.confirmations[(reply.theuser, reply.channel)] else: reply('what?') @hidden_command @authenticated_command() def command_EGG(self, reply, commands): cmd = commands[0] line = reply.line.strip()[len(cmd):].strip() if hasattr(reply, 'ircclient') and reply.channel.startswith('#'): nick = reply.ircclient.nickname + ': ' else: nick = '' if ':' not in line: reply("usage: %segg : "%nick) return term, defn = line.split(':', 1) term = term.strip().lower() defn = defn.strip() if term in self.config.levels: reply(term, ' is a level') return for l in self.config.levels: l = self.config.levels[l] if l.file == term: reply(term, ' is a level') return reply('defining ', repr(term), ' as ', repr(defn)) reply('say "%sy" to confirm or "%sn" to cancel'%(nick, nick)) d = defer.Deferred() self.confirmations[(reply.theuser, reply.channel)] = d def _cb(value): self.eggs[term] = defn self.saveEggs() reply('done') def _eb(value): reply('ok') d.addCallbacks(_cb, _eb) return d @authenticated_command() @hidden_command def command_SLAP(self, reply, commands): action = 'slaps %s about a bit with a large trout'%commands[1] reply.ircclient.me('#qdq', action) @authenticated_command() @hidden_command def command_ORBSSLAP(self, reply, commands): if commands[1:]: reply.ircclient.orbsslap = ' '.join(commands[1:]) reply("Orbs will be slapped with %r when he next joins #qdq"%(reply.ircclient.orbsslap,)) @authenticated_command() @hidden_command def command_SAY(self, reply, commands): cmd = commands[0] line = reply.line.strip()[len(cmd):].strip() reply.ircclient.msg('#qdq', line) def command_TOP(self, reply, commands, startat=0): feats = [c for c in commands[1:] if c[0] in '+-'] notfeats = [c for c in commands[1:] if not c[0] in '+-'] possibilities = feature.possibilities[:] possibilities[feature.FEAT_TYPE] = set('abcd') try: fres = feature.parse_feature_flags(feats, possibilities) except feature.NumbskullError, e: reply(e.args[0]) return if notfeats: lim = int(notfeats[0]) else: lim = 5 d = {} for r in fres.filter_demos(self.config.all_demos()): for p in r.players: d[p] = d.get(p, 0) + 1 l = [(v,k) for (k,v) in d.iteritems()] l.sort() l.reverse() if fres.alloweds[feature.FEAT_RECORD] == set('TF'): desc = 'demos' elif fres.alloweds[feature.FEAT_RECORD] == set('T'): desc = 'records' else: desc = 'beaten records' ann = [] for a in fres.annotation: if 'Record' not in a: ann.append(a) if startat: first = ("Next ", lim, " top") else: first = ("Top ", lim) if ann: reply(first, " players by ", desc, " satisfying ", ', '.join(ann)) else: reply(first, " players by ", desc) for c, p in l[startat:startat+lim]: p = fmt_players([p]) reply(Link(str(c) + ' ' + p, commands=['demos', p, '+list']+feats)) self.last_page_commands = commands[:] self.last_page_lim = startat + lim def command_TOPLEVEL(self, reply, commands, startat=0): feats = [c for c in commands[1:] if c[0] in '+-'] notfeats = [c for c in commands[1:] if not c[0] in '+-'] lim = None names = [] for notfeat in notfeats: try: lim = int(notfeat) except ValueError: names.append(notfeat) if lim is None: lim = 5 d = {} runs, player, fres = self.demos_matching(feats, names) for r in runs: d[r.level] = d.get(r.level, 0) + 1 l = [(v,k) for (k,v) in d.iteritems()] l.sort() l.reverse() if fres.alloweds[feature.FEAT_RECORD] == set('TF'): desc = 'demos' elif fres.alloweds[feature.FEAT_RECORD] == set('T'): desc = 'records' else: desc = 'beaten records' ann = [] for a in fres.annotation: if 'Record' not in a: ann.append(a) if player: player = ' by ' + fmt_players([player]) if startat: first = ("Next ", lim, " top") else: first = ("Top ", lim) line = [first, " levels by ", desc] if ann: line.extend((" satisfying ", ', '.join(ann))) if player: line.append(' and') if player: line.append(player) reply(*line) for c, level in l[startat:startat+lim]: reply(Link(str(c) + ' ' + level.name, commands=['demos', '+level:'+level.name, '+list']+feats)) self.last_page_commands = commands[:] self.last_page_lim = startat + lim def no_command_LOTW(self, reply, commands): d = cPickle.load(open('/home/crew/mwh/lotw2.pickle', 'rb')) cats = [] options = {'table':False, 'long':False, 'all':False, 'link':False} for thing in commands[1:]: if plausible_cat(thing.upper()): cats.append(thing.upper()) elif thing in options: options[thing] = True else: reply("what do you mean by %s?"%thing) return if options['link']: if options.values().count(True) != 1 or cats: reply("you're confusing") else: reply("http://speeddemosarchive.com/quake/maps/sm116_adamllis.zip") return if not cats: if options['all']: cats = sorted(d, cmpskill) else: cats = [c for c in basiccats if c in d] else: if options['all'] or options['long']: reply("you're being inconsistent") return options['long'] = True if options['table']: if len(cats) != 1: reply('only one table at a time') return cat, = cats if cat not in d: reply('no entries for %s yet'%cat) else: l = [(y,x) for (x,y) in d[cat].iteritems()] l.sort() for i, (t, p) in enumerate(l): reply('# ', i+1, ': ', p, ' in ', t) else: if options['long']: for cat in cats: if cat not in d: reply('no entries for %s yet'%cat) else: l = [(y,x) for (x,y) in d[cat].iteritems()] l.sort() reply('LOTW ', cat, ': ', l[0][1], ' in ', l[0][0]) else: r = [] for cat in cats: l = [(y,x) for (x,y) in d[cat].iteritems()] l.sort() r.append('%s = %04.2f'%(cat, l[0][0])) reply(' : '.join(r)) def at_command(self, reply, commands): cmdname = commands[0][1:] m = getattr(self, 'at_command_' + cmdname.upper(), None) if m: m(reply, commands) else: self.at_command_default(reply, commands) def at_command_default(self, reply, commands): if commands[0].endswith('quote'): who = commands[0][1:-5] quotes = open("quote.txt").read().rstrip("\n").split('\n\n') quotes = [q for q in quotes if q.lower().startswith("<"+who.lower())] if not quotes: reply("either you're making ", who, " up, or they are too boring to be quoted") return if len(commands) > 1: try: i = int(commands[1]) - 1 quote = quotes[i] except (ValueError, IndexError): reply("you speak nonsense") return else: quote = random.choice(quotes) for line in quote.split("\n"): reply(line) def at_command_QUOTE2(self, reply, commands): quotes = open("quote.txt").read().rstrip("\n").split('\n\n') if len(commands) > 1: try: i = int(commands[1]) - 1 quote = quotes[i] except (ValueError, IndexError): reply("you speak nonsense") return else: quote = random.choice(quotes) for line in quote.split("\n"): reply(line) at_command_QUOTE = at_command_QUOTE2 def at_command_ADDQUOTE(self, reply, commands): i = len("@addquote") q = reply.line[i:].strip() print repr(q) quotes = open("quote.txt").read().rstrip("\n").split('\n\n') quotes.append(q) open("quote.txt", "w").write("\n\n".join(quotes)) reply("added quote ", len(quotes)) def at_command_DENNQUOTE(self, reply, commands): quotes = open("quote.txt").read().rstrip("\n").split('\n\n') quotes = [q for q in quotes if q.startswith(" 1: try: i = int(commands[1]) - 1 quote = quotes[i] except (ValueError, IndexError): reply("you speak nonsense") return else: quote = random.choice(quotes) for line in quote.split("\n"): reply(line) def at_command_SEARCHQUOTES(self, reply, commands): if len(commands) < 2: reply("you have to search for something") return quotes = open("quote.txt").read().rstrip("\n").split('\n\n') term = reply.line.split(None, 1)[1].lower() indexes = [i+1 for (i, q) in enumerate(quotes) if term in q.lower()] if indexes: reply(' '.join(map(str, indexes))) else: reply('no hits found') from nevow.flat import registerFlattener from nevow import tags as T def flattener(*typ): def _(f): for t in typ: registerFlattener(f, t) return f return _ def makeUrl(commands): return 'botalike?command='+urllib.quote(' '.join(commands)) @flattener(parseHistory.Level) @flattxt.renderer(parseHistory.Level) def flattenLevel(level, context=None): return T.a(href=makeUrl([level.name]))[level.name] class LevelWithWidth(object): def __init__(self, level, width): self.level = level self.width = width @flattener(LevelWithWidth) @flattxt.renderer(LevelWithWidth) def flattenLevelWW(levelww, context=None): level = levelww.level width = levelww.width return T.a(href=makeUrl([level.name]))["%-*s"%(width, level.name)] @flattener(parseHistory.Demo) @flattxt.renderer(parseHistory.Demo) def flattenDemo(demo, context=None): return T.a(href=makeUrl([demo.level.name, demo.cat, 'history']))[ "%-5s"%(demo.time,), ' ', "%-4s"%demo.cat] def Link(text, commands=None, link=None): if link is None and commands is not None: link = 'botalike?command=' + urllib.quote(' '.join(commands)) elif link is None: assert 0 return T.a(href=link)[text] def fmt_date(demo): return Link(demo.date, commands=['demos', '+all', '+list', '+%s'%demo.parseddate.year, '+%s'%feature.months[demo.parseddate.month-1], '+day%s'%demo.parseddate.day]) class fmt_stan(object): def __init__(self, fmt, *stuff): self.fmt = fmt self.stuff = stuff @flattener(fmt_stan) def fmt_stan_flattener(fs, context): fmt = fs.fmt stuff = fs.stuff it = iter(stuff) parts = fmt.split('%') r = [parts[0]] for p in parts[1:]: s_index = p.index('s') f = '%' + p[:s_index + 1] if '*' in f: w = it.next() v = it.next() if isinstance(v, T.Tag): rhs = (flattxt.render(v), ) v.clear() if '*' in f: rhs = (w,) + rhs r.append(v[f%rhs]) else: rhs = (v, ) if '*' in f: rhs = (w,) + rhs r.append(f%rhs) r.append(p[s_index+1:]) return r @flattxt.renderer(fmt_stan) def fmt_stan_renderer(fs): def r(x): if isinstance(x, int): return x else: return flattxt.render(x) return fs.fmt%tuple(map(r, fs.stuff))