#!/usr/bin/env python
"""Retrain a user's SpamBayes database from the folders of their Maildir.

Two plain-text files inside ``~/Maildir`` (``listspam`` and ``listham``) name
the folders that are expected to contain only spam respectively only ham.
Every message in those folders whose ``X-Spambayes-Classification`` header
disagrees with its folder is retrained and rewritten in place.
"""
import email
import os
import subprocess
import sys

import spambayes.hammie
from vpath import newpath, error, checker

# Header from which a message's previous classification is read.
CLASSIFICATION_HEADER = 'X-Spambayes-Classification'


def _emit(stream, prefix, msg):
    """Write ``prefix``, a space, ``msg`` and a newline to ``stream``."""
    stream.write("%s %s\n" % (prefix, msg))


def warning(msg):
    """Print a warning line on stderr."""
    _emit(sys.stderr, "warning:", msg)


def report(msg):
    """Print a report line on stdout."""
    _emit(sys.stdout, "report:", msg)


def progress(msg):
    """Print a progress line on stdout."""
    _emit(sys.stdout, "...", msg)


class Classi:
    """A spam/ham/unsure classification, optionally with a probability.

    Constructed either from one of the literal tags ``'spam'``, ``'unsure'``
    or ``'ham'`` (no probability), or from a message object whose
    ``X-Spambayes-Classification`` header has the form ``"<tag>; <prob>"``.
    An instance built from a message without a usable header is falsy.
    """

    def __init__(self, msg):
        self.tag = None
        self.prob = None
        if msg in ('spam', 'unsure', 'ham'):
            self.tag = msg
            return
        try:
            value = msg[CLASSIFICATION_HEADER]
        except KeyError:
            return
        # email message objects return None for a missing header instead of
        # raising KeyError, so that case has to be handled explicitly.
        if value is None:
            return
        try:
            tag, prob = [part.strip() for part in value.split(';')]
            prob = float(prob)
        except ValueError:
            # Malformed header: treat the message as unclassified.
            return
        self.tag = tag.lower()
        self.prob = prob

    def __nonzero__(self):
        return self.tag is not None

    # Python 3 spelling of the truth-value hook.
    __bool__ = __nonzero__

    def __eq__(self, other):
        if not isinstance(other, Classi):
            return NotImplemented
        return self.tag == other.tag

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __str__(self):
        if self.prob is not None:
            return '<%s %.2f>' % (self.tag, self.prob)
        return '<%s - >' % self.tag

    def check(self, **kw):
        """Return the truth value of the keyword named like this tag.

        ``Classi('spam').check(spam=1)`` is True; a tag that is not among
        the keywords yields False.
        """
        return bool(kw.get(self.tag))


spamclassi = Classi('spam')
hamclassi = Classi('ham')
unsureclassi = Classi('unsure')


class Trainer:
    """Retrains one user's SpamBayes database from their Maildir folders."""

    def __init__(self, username, userdir):
        """Remember the user and open the database under ``userdir/Maildir``.

        username -- account name that should own files written by the trainer
        userdir  -- vpath path object of the user's home directory
        """
        self.userdir = userdir
        self.username = username
        self.maildir = userdir.join('Maildir')
        self.spambayesdb = self.maildir.join('spambayes.db')
        # NOTE(review): the database is opened before run() verifies that
        # the Maildir exists -- confirm how hammie.open behaves in that case.
        self.hammie = spambayes.hammie.open(str(self.spambayesdb), 'dbm', 'c')

    def run(self):
        """Train on all configured spam folders, then on all ham folders."""
        if not self.maildir.check(dir=1):
            warning("Maildir %r does not exist" % self.maildir)
            return
        try:
            for name in self.configinfo('spam', 'spam'):
                self.trainboxas(name, spamclassi)
            for name in self.configinfo('ham', '.'):
                self.trainboxas(name, hamclassi)
        finally:
            # Persist whatever has been trained so far; without this the
            # changes are not explicitly written to the database file.
            self.hammie.store()

    def _chown(self, path):
        """Give ``path`` to ``self.username`` (best effort, no shell)."""
        try:
            subprocess.call(['chown', self.username, str(path)])
        except OSError:
            warning('could not run chown on %s' % path)

    def configinfo(self, typ, defaultcontent):
        """Return the folder names listed in ``Maildir/list<typ>``.

        If the list file does not exist yet it is created with a short
        explanatory header and ``defaultcontent`` as its only entry.
        Blank lines and lines starting with ``#`` are ignored.
        """
        listfile = self.maildir.join('list' + typ)
        if not listfile.check():
            lines = ["# list of %s folders" % typ,
                     "# each line holds a folder name"]
            if typ != 'spam':
                # The trailing newline leaves a blank line before the entry.
                lines.append("# '.' means your inbox. \n")
            lines.append(defaultcontent)
            listfile.write("\n".join(lines))
            self._chown(listfile)
            progress('created default list: %s' % listfile)
        boxes = []
        for line in listfile.readlines(cr=0):
            name = line.strip()
            if name and not name.startswith('#'):
                boxes.append(name)
        # NOTE: a per-list "lastrun" timestamp file was sketched here at some
        # point but never enabled; every run considers all messages.
        return boxes

    def _readmessage(self, msgpath):
        """Parse the file at ``msgpath`` into an email message object."""
        f = msgpath.open('rb')
        try:
            return email.message_from_file(f)
        finally:
            f.close()

    def getnewermessages(self, mbox, lasttime):
        """Yield ``(path, message)`` for files modified after ``lasttime``.

        Visits the ``cur`` subdirectory first, then ``new``.  Messages that
        evaluate as false after parsing are skipped.
        """
        for subdir in ('cur', 'new'):
            for msgpath in mbox.join(subdir).visit(fil=checker(file=1)):
                if msgpath.mtime() <= lasttime:
                    continue
                msg = self._readmessage(msgpath)
                if msg:
                    yield msgpath, msg

    def checkmboxformat(self, mbox):
        """Return True if ``mbox`` exists and has ``cur`` and ``new`` entries.

        Emits a warning and returns False otherwise.
        """
        if not mbox.check():
            warning('listed mbox %s does not exist' % mbox)
            return False
        if not (mbox.join('cur').check() and mbox.join('new').check()):
            warning('mbox %s has no maildir structure' % mbox)
            return False
        return True

    def getflags(self, path):
        """Return the maildir flags encoded in the basename of ``path``.

        The flags are the part after the comma in the ``:<version>,<flags>``
        suffix of the filename.  An empty string is returned when the
        filename has no such suffix.
        """
        bn = path.get('basename')
        if ':' not in bn:
            # No info section at all; a comma elsewhere in the name (for
            # example a ",S=<size>" field) must not be mistaken for flags.
            return ''
        info = bn.split(':')[-1]
        parts = info.split(',')
        if len(parts) != 2:
            return ''
        return parts[1]

    def trainboxas(self, boxname, targetclassi):
        """Check every message of folder ``boxname`` against ``targetclassi``.

        ``'.'`` denotes the Maildir itself; any other name maps to the
        subfolder ``Maildir/.<boxname>``.  Messages carrying the ``T`` flag
        (trashed, in the Maildir convention) are skipped.
        """
        if boxname == '.':
            mbox = self.maildir
        else:
            mbox = self.maildir.join('.' + boxname)
        if not self.checkmboxformat(mbox):
            return
        progress("checking if %s contains only %s" % (mbox, targetclassi))
        lasttime = 0  # no "last run" bookkeeping yet: look at everything
        for path, message in self.getnewermessages(mbox, lasttime):
            if 'T' in self.getflags(path):
                continue
            self.consider(path, message, targetclassi)

    def consider(self, path, message, targetclassi):
        """Classify or retrain one message so it agrees with its folder.

        * Unclassified message: run it through the filter.  If the fresh
          classification matches the folder the filtered text is written
          back, otherwise the message is left alone with a warning.
        * Classified message that disagrees with the folder: retrain it.
        """
        classi = Classi(message)
        if classi:
            if targetclassi != classi:
                self.train(path, message, targetclassi, classi)
            return
        result = self.hammie.filter(message)
        classi = Classi(email.message_from_string(result))
        if classi != targetclassi:
            # Report the file path; the repr of the message object itself
            # does not identify the message.
            warning(('message %r was not classified and mismatches '
                     'its folder classification, ignoring it') % path)
            return
        path.write(result)
        warning('classified previously unclassified message and wrote back %r'
                % path)

    def train(self, path, message, classi, oldclassi):
        """Retrain ``message`` as ``classi`` and rewrite the file at ``path``.

        The previous training recorded in the message headers is undone
        first; afterwards the message is filtered again so the text written
        back reflects the new training state.
        """
        self.hammie.untrain_from_header(message)
        self.hammie.train(message, classi.check(spam=1), True)
        result = self.hammie.filter(message)
        newclassi = Classi(email.message_from_string(result))
        progress("retrained %r as %s (was %s)" % (
            path.relto(self.maildir), newclassi, oldclassi))
        path.write(result)
        self._chown(path)


def main():
    """Run the trainer for the current user's home directory."""
    import pwd
    try:
        username = os.getlogin()
    except OSError:
        # No controlling terminal (e.g. cron): fall back to the environment,
        # then to the passwd entry of the current uid.
        username = os.environ.get('USER') or pwd.getpwuid(os.getuid()).pw_name
    userdir = newpath(xxlocal=pwd.getpwnam(username).pw_dir)
    Trainer(username, userdir).run()


if __name__ == '__main__':
    main()