#!/usr/bin/env python
"""Train a user's spambayes database from the folders of their maildir.

The maildir holds two list files, ``listspam`` and ``listham``, naming the
folders that are expected to contain only spam respectively only ham.
Messages whose ``X-Spambayes-Classification`` header disagrees with the
folder they live in are trained towards the folder's classification and
rewritten in place with the refreshed header.

The script targets Python 2 (it relies on the ``user`` module).
"""

import email
import os
import subprocess
import sys

import py
import spambayes.hammie


# The output helpers use ``.write()`` rather than the ``print`` statement so
# that the module parses under both Python 2 and Python 3; the emitted text
# is the same as ``print >>stream, prefix, msg``.

def warning(msg):
    """Write *msg* to stderr, prefixed with ``warning:``."""
    sys.stderr.write("warning: %s\n" % (msg,))


def report(msg):
    """Write *msg* to stdout, prefixed with ``report:``."""
    sys.stdout.write("report: %s\n" % (msg,))


def progress(msg):
    """Write *msg* to stdout, prefixed with ``...``."""
    sys.stdout.write("... %s\n" % (msg,))


class Classi:
    """A spambayes classification: a tag plus an optional score.

    ``tag`` is a lower-cased string such as 'spam', 'ham' or 'unsure', or
    None when no classification is available.  ``prob`` is the score as a
    float, or None when it is unknown.
    """

    def __init__(self, msg):
        """Build a classification from a tag name or from a message.

        *msg* is either one of the strings 'spam', 'unsure', 'ham', or an
        object supporting ``msg['X-Spambayes-Classification']`` (such as an
        email message).  A missing or empty header yields an unclassified
        instance (``tag`` and ``prob`` both None).
        """
        self.tag = self.prob = None
        if msg in ('spam', 'unsure', 'ham'):
            self.tag = msg
            return
        try:
            value = msg['X-Spambayes-Classification']
        except KeyError:
            value = None
        if value is None:
            return
        # Expected shape is "<tag>; <score>".  A header without a usable
        # score keeps its tag instead of aborting the whole run with a
        # ValueError.
        parts = value.split(';', 1)
        tag = parts[0].strip().lower()
        if not tag:
            return
        self.tag = tag
        if len(parts) == 2:
            try:
                self.prob = float(parts[1])
            except ValueError:
                # Unparseable score: the tag alone is still meaningful.
                self.prob = None

    def __nonzero__(self):
        """True when a classification tag is present."""
        return self.tag is not None

    # Python 3 spelling of the truth-value hook.
    __bool__ = __nonzero__

    def __eq__(self, other):
        """Two classifications are equal when their tags match."""
        if not isinstance(other, Classi):
            return NotImplemented
        return self.tag == other.tag

    def __ne__(self, other):
        return not (self == other)

    def __str__(self):
        if self.prob is not None:
            return '<%s %.2f>' % (self.tag, self.prob)
        return '<%s>' % self.tag

    def check(self, **kw):
        """Return ``bool(kw[tag])`` if this tag is among the keywords.

        Returns None (falsy) when the tag is not one of the keyword names,
        so ``check(spam=1)`` is True for spam and None for anything else.
        """
        if self.tag in kw:
            return bool(kw[self.tag])
        return None


spamclassi = Classi('spam')
hamclassi = Classi('ham')
unsureclassi = Classi('unsure')


class Trainer:
    """Trains the spambayes database found inside one user's maildir."""

    def __init__(self, username, userdir):
        """Locate the maildir and open its spambayes database.

        *userdir* is a ``py.path.local`` of the user's home directory and
        *username* the owner handed to ``chown`` for files this class
        writes.  The first of ``.maildir`` / ``Maildir`` below *userdir*
        that exists and contains ``spambayes.db`` is used.

        Raises ``py.error.ENOENT`` when no candidate qualifies.
        """
        self.userdir = userdir
        self.username = username
        for cand in '.maildir', 'Maildir':
            maildir = userdir.join(cand)
            if maildir.check():
                spambayesdb = maildir.join('spambayes.db')
                if spambayesdb.check():
                    break
        else:
            raise py.error.ENOENT(
                "could not locate maildir for %s" % userdir)
        self.maildir = maildir
        self.spambayesdb = spambayesdb
        self.hammie = spambayes.hammie.open(
            str(self.spambayesdb), 'dbm', 'w')

    def run(self):
        """Train every configured spam and ham folder, then store the db."""
        if not self.maildir.check(dir=1):
            warning("Maildir %r does not exist" % self.maildir)
            return
        spamlist = self.configinfo('spam', 'spam')
        for name in spamlist:
            self.trainboxas(name, spamclassi)
        hamlist = self.configinfo('ham', '.')
        for name in hamlist:
            self.trainboxas(name, hamclassi)
        self.hammie.store()

    def _chown(self, path):
        """Best-effort ``chown`` of *path* to ``self.username``.

        The command is run from an argument list instead of a shell string
        so that paths containing spaces or shell metacharacters are passed
        through intact.  As with the previous ``os.system`` call, a failing
        ``chown`` does not abort the run.
        """
        try:
            subprocess.call(['chown', str(self.username), str(path)])
        except OSError:
            warning("could not run chown on %s" % (path,))

    def configinfo(self, typ, defaultcontent):
        """Return the folder names listed in the maildir's ``list<typ>`` file.

        When the file does not exist yet it is created with a short comment
        header followed by *defaultcontent*, and handed over to the user.
        Blank lines and lines starting with ``#`` are ignored; surrounding
        whitespace is stripped from each entry.
        """
        listfile = self.maildir.join('list' + typ)
        if not listfile.check():
            lines = [
                "# list of %s folders" % typ,
                "# each line holds a folder name",
            ]
            if typ != 'spam':
                lines.append("# '.' means your inbox. ")
            lines.append(defaultcontent)
            listfile.write("\n".join(lines))
            self._chown(listfile)
            progress('created default list: %s' % listfile)
        boxes = []
        for line in listfile.readlines(cr=0):
            name = line.strip()
            if name and not name.startswith('#'):
                boxes.append(name)
        return boxes

    def _readmessage(self, msgpath):
        """Parse the file at *msgpath* into an email message and close it."""
        f = msgpath.open('rb')
        try:
            return email.message_from_file(f)
        finally:
            f.close()

    def getnewermessages(self, mbox, lasttime):
        """Yield ``(path, message)`` for files modified after *lasttime*.

        The ``cur`` and then the ``new`` subdirectory of *mbox* are
        visited.  Files that parse to a falsy message are skipped.
        """
        for subdir in ('cur', 'new'):
            entries = mbox.join(subdir).visit(fil=lambda x: x.check(file=1))
            for msgpath in entries:
                if msgpath.mtime() <= lasttime:
                    continue
                msg = self._readmessage(msgpath)
                if msg:
                    yield msgpath, msg

    def checkmboxformat(self, mbox):
        """Return True if *mbox* exists and has ``cur`` and ``new`` entries.

        Emits a warning and returns False otherwise.
        """
        if not mbox.check():
            warning('listed mbox %s does not exist' % mbox)
            return False
        if not (mbox.join('cur').check() and mbox.join('new').check()):
            warning('mbox %s has no maildir structure' % mbox)
            return False
        return True

    def getflags(self, path):
        """Return the maildir flags encoded in *path*'s file name.

        The flags are the part after the comma in the text following the
        last ``:`` of the basename (``<unique>:2,<flags>``).  An empty
        string is returned when the name has no ``:`` section or that
        section does not consist of exactly two comma-separated parts.
        """
        bn = path.basename
        if ':' not in bn:
            # No info section at all; without this guard a comma elsewhere
            # in the name (e.g. ",S=1234") would be mistaken for flags.
            return ''
        info = bn.split(':')[-1]
        fields = info.split(',')
        if len(fields) != 2:
            return ''
        return fields[1]

    def trainboxas(self, boxname, targetclassi):
        """Check every message of folder *boxname* against *targetclassi*.

        ``'.'`` names the maildir itself; any other name maps to the
        ``.<boxname>`` subfolder.  Messages carrying the ``T`` flag are
        skipped.
        """
        if boxname == '.':
            # that's the maildir itself
            mbox = self.maildir
        else:
            mbox = self.maildir.join('.' + boxname)
        if not self.checkmboxformat(mbox):
            return
        progress("checking if %s contains only %s" % (mbox, targetclassi))
        # lasttime is fixed at 0, so every message is looked at on each run.
        lasttime = 0
        for path, message in self.getnewermessages(mbox, lasttime):
            if 'T' in self.getflags(path):
                continue
            self.consider(path, message, targetclassi)

    def consider(self, path, message, targetclassi):
        """Classify or retrain one message according to its folder.

        An unclassified message is run through the filter; if the result
        agrees with *targetclassi* it is written back to *path*, otherwise
        it is left alone with a warning.  An already classified message
        that disagrees with *targetclassi* is retrained.
        """
        classi = Classi(message)
        if classi:
            if targetclassi != classi:
                self.train(path, message, targetclassi, classi)
            return

        try:
            result = self.hammie.filter(message)
        except TypeError:
            sys.stdout.write(
                "spambayes-filtering of message %r failed\n" % (path,))
            return
        newmessage = email.message_from_string(result)
        classi = Classi(newmessage)
        if classi != targetclassi:
            # Report the path: the repr of the parsed message object does
            # not identify which file was skipped.
            warning(('message %r was not classified and mismatches '
                     'its folder classification, ignoring it') % (path,))
            return
        path.write(result)
        warning("classified previously unclassified message "
                "and wrote back '%s'" % path)

    def train(self, path, message, classi, oldclassi):
        """Train *message* towards *classi* and rewrite it at *path*.

        After training, the message is filtered again so the stored copy
        carries the new classification header, and the file is handed back
        to the user.
        """
        # NOTE(review): check(unsure=0) yields False for 'unsure' and None
        # for every other tag, so this branch never runs and nothing is
        # ever untrained.  Left unchanged because enabling it would alter
        # what is written to the database -- confirm the intended
        # behaviour before changing the condition.
        if oldclassi.check(unsure=0):
            self.hammie.untrain(message, oldclassi.check(spam=1))
        self.hammie.train(message, classi.check(spam=1), True)
        result = self.hammie.filter(message)
        newmessage = email.message_from_string(result)
        newclassi = Classi(newmessage)
        progress("retrained %r (towards %s) ... was %s, is %s" % (
            path.relto(self.userdir), classi, oldclassi, newclassi))
        path.write(result)
        self._chown(path)


if __name__ == '__main__':
    import pwd
    import user
    userdir = py.path.local(user.home)
    username = userdir.basename
    t = Trainer(username, userdir)
    t.run()