import os.path import sgmllib import random import time import re import mechanize from datetime import timedelta, datetime from log import log from scheduler import Scheduler, ServerTime, OneShotTask, RepetitiveTask, CannotDoTask from settings import SERVER, USERNAME, PASSWORD, MIN_WAIT, MAX_WAIT from model import Army, SendArmyType # TODO: set ServerTime.DELTA WOOD, CLAY, IRON, CROP = range(1, 5) LOGIN = SERVER + 'login.php' SEND_ARMY = SERVER + 'a2b.php' SEND_TRADER = SERVER + 'build.php?id=19' # XXX FIELDS = SERVER + 'dorf1.php' BUILD = SERVER + 'build.php?id=%d' LEGIONNAIR = '1' SETTLER_COST = [0, 5800, 5300, 7200, 5500] RESIDENCE_ID = 25 # should go to settings CAMELOT = '2435' MEDIOBOGDUM = '11709' VERULAMIUM = '13693' AQUAE_SULIS = '15941' AVALON = '17766' GLEVUM = '2810' CORINIUM = '21438' ISCA = '13114' LONDINIUM = '14675' def format_timestamp(totalsecs): hrs = totalsecs/3600 min = (totalsecs/60) % 60; sec = totalsecs % 60 return '%02d:%02d:%02d' % (hrs, min, sec) class FIELDS_parser(sgmllib.SGMLParser): from htmlentitydefs import entitydefs def __init__(self, data): self.nbuild = None self.fields = {WOOD: {}, CLAY: {}, IRON: {}, CROP: {}} self.troopkind = None self.troops = {} sgmllib.SGMLParser.__init__(self) self.feed(data) def start_a(self, attr): attrs = dict(attr) href = attrs.get('href', None) if 'build.php?id=' in href: _, nbuild = href.split('id=') self.nbuild = int(nbuild) def end_a(self): self.nbuild = None def start_img(self, attr): attrs = dict(attr) if self.nbuild is not None: self._parse_nbuild(attrs) # try to see if it's an troop image (only legionnaires so far) src = attrs.get('src', '') _, filename = os.path.split(src) if filename == '1.gif': self.troopkind = LEGIONNAIR else: self.troopkind = None def _parse_nbuild(self, attrs): src = attrs['src'] _, filename = os.path.split(src) assert filename.startswith('r') assert filename.endswith('.gif') filename = filename[1:-4] kind, level = map(int, filename.split('_')) self.fields[kind][self.nbuild] = level def handle_data(self, data): if self.troopkind is not None: try: num = int(data) except ValueError: return else: self.troops[self.troopkind] = int(data) self.troopkind = None class BUILD_parser(sgmllib.SGMLParser): from htmlentitydefs import entitydefs def __init__(self, data): self.needed_kind = None self.current_kind = None self.needed_table = False self.production = [None] * 5 # the element with index 0 is simply ignored self.available = [None] * 5 self.needed = [None] * 5 sgmllib.SGMLParser.__init__(self) self.feed(data) def calc_maxtime(self, needed=None): if needed is None: needed = self.needed maxtime = 0 for i in range(1, 5): if needed[i] is None: continue missing = needed[i] - self.available[i] if missing > 0: time_needed = float(missing)/self.production[i] maxtime = max(maxtime, time_needed) return int(maxtime * 3600) # hours to seconds def start_td(self, attr): attrs = dict(attr) id = attrs.get('id', None) if id in ('l1', 'l2', 'l3', 'l4'): kind = int(id[-1]) self.production[kind] = int(attrs['title']) self.current_kind = kind def end_td(self): self.current_kind = None def start_table(self, attr): attrs = dict(attr) if attrs.get('class', None) == 'f10': self.needed_table = True def end_table(self): self.needed_kind = None self.needed_table = False def start_img(self, attr): if not self.needed_table: return attrs = dict(attr) src = attrs['src'] _, filename = os.path.split(src) assert filename.endswith('.gif') filename = filename[:-4] if filename in ('1', '2', '3', '4'): self.needed_kind = int(filename) def handle_data(self, data): if self.current_kind is not None: avail, maximum = map(int, data.split('/')) self.available[self.current_kind] = avail elif self.needed_kind is not None: data = data.replace('|', '').strip() try: self.needed[self.needed_kind] = int(data) except ValueError: self.needed[self.needed_kind] = 0 # ?? self.needed_kind = None class Bot: def __init__(self): log('Bot starting...') log('Username: %s' % USERNAME) log('Server: %s' % LOGIN) self.br = mechanize.Browser() self._login = False self.target_idx = 0 def login(self): if self._login: return self.br.open(LOGIN) self.br.select_form(name='snd') for control in self.br.form.controls: # XXX: it's a bit weak as a control, but it works (at least for now) if hasattr(control, 'attrs'): if control.attrs.get('maxlength', None) == '15': control._value = USERNAME elif control.attrs.get('maxlength', None) == '20': control._value = PASSWORD self.br.submit() log('Login') self._login = True self.set_server_time() self.select_town(NEWDID) def set_server_time(self): content = self.br.response().read() match = re.search(r'([0-9:]*)', content) if not match: log('WARNING: cannot find server time') return stringtime = match.group(1) log('Setting server time to %s' % stringtime) hh, mm, ss = map(int, stringtime.split(':')) now = datetime.now() server_time = datetime(now.year, now.month, now.day, hh, mm, ss) ServerTime.set_server_time(server_time) def select_town(self, newdid): self.br.open(FIELDS + '?newdid=' + newdid) def select_and_send_army(self): strategy = self.load_strategy() TARGETS = strategy['TARGETS'] if self.target_idx >= len(TARGETS): self.target_idx = 0 name, x, y, wait = TARGETS[self.target_idx] self.target_idx += 1 if self.target_idx >= len(TARGETS): self.target_idx = 0 self.send_army(x, y, 5, SendArmyType.RAID, name) return wait * 2 # back and forth def send_army(self, newdid, x, y, army, type, name=''): self.login() self.select_town(newdid) self.br.open(SEND_ARMY) self.br.select_form(nr=0) self.br['x'] = str(x) self.br['y'] = str(y) for key, value in army.troops.iteritems(): self.br['t%d' % key] = str(value) self.br['c'] = [type] self.br.submit() # ok, now confirm the sending self.br.select_form(nr=0) self.br.submit() log('Army sent to (%d, %d, %s), TYPE=%s' % (x, y, name, type)) def send_trader(self, x, y, wood, clay, iron, crop): self.login() self.br.open(SEND_TRADER) self.br.select_form(nr=0) self.br['x'] = str(x) self.br['y'] = str(y) self.br['r1'] = str(wood) self.br['r2'] = str(clay) self.br['r3'] = str(iron) self.br['r4'] = str(crop) self.br.submit() # ok, now confirm the sending self.br.select_form(nr=0) self.br.submit() log('Traders sent to (%d, %d) --> (wood=%d, clay=%d, iron=%d, crop=%d)' % (x, y, wood, clay, iron, crop)) def build_settler(self, residence_id): # check if we have enough resources self.login() self.br.open(BUILD % residence_id) parser = BUILD_parser(self.br.response().read()) maxtime = parser.calc_maxtime(SETTLER_COST) if maxtime == 0: # we are ready... improve it! log('Building settler...') self.br.select_form(nr=0) self.br['t10'] = '1' self.br.submit() return None else: log('Cannot build settler now. Time needed: %s' % format_timestamp(maxtime)) raise CannotDoTask(wait=timedelta(seconds=maxtime)) def improve_build(self, id): self.login() self.br.open(FIELDS) self.br.open(BUILD % id) try: link = self.br.follow_link(text_regex = re.compile('Amplia al livello')) log('Improving build number %d' % id) return None except mechanize.LinkNotFoundError: parser = BUILD_parser(self.br.response().read()) maxtime = parser.calc_maxtime() if maxtime == 0: # we have resources, but we can't improve because we are improving another build # wait for 10 minutes maxtime = 10*60 log('Cannot improve build number %d now. Time needed: %s' % (id, format_timestamp(maxtime))) raise CannotDoTask(wait=timedelta(seconds=maxtime)) def improve_build_in_town(self, id, newdid): self.login() self.select_town(newdid) return self.improve_build(id) def improve_field(self, kind): self.login() self.br.open(FIELDS) parser = FIELDS_parser(self.br.response().read()) levels = parser.fields[kind] min_level = min(levels.values()) for nbuild, level in levels.iteritems(): if level == min_level: break return self.improve_build(nbuild) def improve_greedy(self): # improve the first field it can maxtimes = [] for kind in range(1, 5): try: self.improve_field(kind) return None # don't try to improve other fields except CannotDoTask, e: maxtimes.append(e.wait) return min(maxtimes) def improve_greedy_or_build(self, newdid): self.login() self.select_town(newdid) strategy = self.load_strategy() builds = strategy['BUILDS'] if builds: build_task = builds[0] try: build_task.do(self) strategy['BUILDS'] = builds[1:] self.save_strategy(strategy) return None except CannotDoTask, e: return e.wait else: # fall back to greedy return self.improve_greedy() def load_strategy(self): glob = { 'Build': Build, 'Settler': Settler } code = file('strategy.py').read() try: strategy = eval(code, glob) except Exception, e: log('WARNING: Error while evaluating strategy.py: %s' % e) strategy = {'BUILDS': [], 'TARGETS': []} return strategy def save_strategy(self, strategy): from pprint import pprint f = file('strategy.py', 'w') pprint(strategy, f) f.close() class AbstractBuildTask: def do(self, bot): raise NotImplementedError class Build(AbstractBuildTask): def __init__(self, build_id): self.build_id = build_id def do(self, bot): return bot.improve_build(self.build_id) def __repr__(self): return '%s(%s)' % (self.__class__.__name__, self.build_id) class Settler(AbstractBuildTask): def do(self, bot): return bot.build_settler(RESIDENCE_ID) def __repr__(self): return '%s()' % self.__class__.__name__ def main(testing, second): try: t = Bot() sched = Scheduler() t.login() # to set the server time ## sched.schedule(OneShotTask('camelot', t.send_army, CAMELOT, 56, 6, ## Army(imperian=Army.ALL), ## SendArmyType.RAID), ## ServerTime.abstime(11, 28, 51)) ## sched.schedule(OneShotTask('aquae', t.send_army, AQUAE_SULIS, 56, 6, ## Army(eq_caesaris=Army.ALL, senator=1), ## SendArmyType.RAID), ## ServerTime.abstime(10, 41, 20)) ## sched.schedule(OneShotTask('camelot', t.send_army, VERULAMIUM, 56, 6, ## Army(imperian=Army.ALL, eq_imperatoris=Army.ALL, senator=1), ## SendArmyType.RAID), ## ServerTime.abstime(11, 07, 11)) sched.schedule(RepetitiveTask('improve', t.improve_greedy_or_build, NEWDID, max_wait=timedelta(minutes=10), default_wait=timedelta(minutes=10)), ServerTime.now()) ## sched.schedule(RepetitiveTask('accademia', ## t.improve_build_in_town, 30, AQUAE_SULIS, ## max_wait=timedelta(minutes=10), ## default_wait=timedelta(minutes=10)), ## ServerTime.now()) sched.schedule(RepetitiveTask('residence', t.improve_build_in_town, 25, GLEVUM, max_wait=timedelta(minutes=10), default_wait=timedelta(minutes=10)), ServerTime.now()) ## sched.schedule(OneShotTask('magazzino', ## t.improve_build_in_town, 28, CORINIUM), ## ServerTime.now()) sched.schedule(RepetitiveTask('residence', t.improve_build_in_town, 25, ISCA, max_wait=timedelta(minutes=10), default_wait=timedelta(minutes=10)), ServerTime.now()) sched.schedule(RepetitiveTask('centro', t.improve_build_in_town, 26, LONDINIUM, max_wait=timedelta(minutes=10), default_wait=timedelta(minutes=10)), ServerTime.now()) sched.run() except KeyboardInterrupt: return 0 except Exception, e: if testing: raise log('WARNING! Exception caught') log(' %s' % e) return 1 return 0 if __name__ == '__main__': import sys testing = False second = False # default village -- Camelot NEWDID = '2435' OTHER_X = 54 OTHER_Y = 1 testing = True try: arg = sys.argv[1] except IndexError: arg = None if arg == '--test': testing = True print 'Testing enabled, not catching exceptions' elif arg == '--2': print 'Targeting Mediobogdum' NEWDID = '11709' second = True elif arg == '--3': print 'Targeting Verulamium' NEWDID = '13693' second = True elif arg == '--4': print 'Targeting Aquae Sulis' NEWDID = '15941' second = True elif arg == '--5': print 'Targeting Avalon' NEWDID = '17766' second = True elif arg == '--6': print 'Targeting Glevum' NEWDID = GLEVUM second = True elif arg == '--7': print 'Targeting Corinium' NEWDID = CORINIUM second = True sys.exit(main(testing, second))