import os.path
import sgmllib
import random
import time
import re
import mechanize
from datetime import timedelta, datetime
from log import log
from scheduler import Scheduler, ServerTime, OneShotTask, RepetitiveTask, CannotDoTask
from settings import SERVER, USERNAME, PASSWORD, MIN_WAIT, MAX_WAIT
from model import Army, SendArmyType
# TODO: set ServerTime.DELTA
# Resource kinds, numbered 1..4. They are used as keys of
# FIELDS_parser.fields and as indices into the 5-element lists kept by
# BUILD_parser (where index 0 is unused).
WOOD, CLAY, IRON, CROP = range(1, 5)
# Server pages, all built on top of settings.SERVER.
LOGIN = SERVER + 'login.php'
SEND_ARMY = SERVER + 'a2b.php'
# XXX: building slot 19 is hard-coded here -- presumably the marketplace of
# the current village; confirm before using in another town.
SEND_TRADER = SERVER + 'build.php?id=19' # XXX
FIELDS = SERVER + 'dorf1.php'
# Template for a single building/field page; filled with the slot number.
BUILD = SERVER + 'build.php?id=%d'
# Troop kind recorded by FIELDS_parser when it sees the image '1.gif'.
LEGIONNAIR = '1'
# Resources needed for one settler, indexed by resource kind (index 0 is
# padding so that WOOD..CROP can be used directly as indices).
SETTLER_COST = [0, 5800, 5300, 7200, 5500]
# Building slot passed to Bot.build_settler by the Settler task.
RESIDENCE_ID = 25 # should go to settings
# Village identifiers, used as the 'newdid' query parameter when switching
# the active village (see Bot.select_town).
CAMELOT = '2435'
MEDIOBOGDUM = '11709'
VERULAMIUM = '13693'
AQUAE_SULIS = '15941'
AVALON = '17766'
GLEVUM = '2810'
CORINIUM = '21438'
ISCA = '13114'
LONDINIUM = '14675'
def format_timestamp(totalsecs):
    """Format a duration given in seconds as 'HH:MM:SS'.

    Hours are not wrapped at 24, so durations longer than a day simply
    produce a larger hour field (e.g. 360000 -> '100:00:00').

    Uses floor division via divmod, so the result does not depend on the
    interpreter's '/' semantics and no builtin name is shadowed.
    """
    minutes, secs = divmod(totalsecs, 60)
    hours, minutes = divmod(minutes, 60)
    return '%02d:%02d:%02d' % (hours, minutes, secs)
class FIELDS_parser(sgmllib.SGMLParser):
    """Parse the fields overview page (FIELDS).

    After construction the instance exposes:

    * ``fields`` -- ``{kind: {slot: level}}`` for the four resource kinds,
      filled from images named ``r<kind>_<level>.gif`` found inside
      ``build.php?id=<slot>`` links;
    * ``troops`` -- ``{troopkind: count}``, filled from the first integer
      text that follows a troop image (only ``1.gif`` / LEGIONNAIR so far).

    The whole document is parsed in the constructor.
    """
    from htmlentitydefs import entitydefs

    # href of a building/field link; group 1 is the slot number.
    _BUILD_HREF = re.compile(r'build\.php\?id=(\d+)')
    # File name of a resource-field image: r<kind>_<level>.gif
    _FIELD_IMG = re.compile(r'r(\d+)_(\d+)\.gif\Z')

    def __init__(self, data):
        self.nbuild = None      # slot of the <a> currently open, if any
        self.fields = {WOOD: {}, CLAY: {}, IRON: {}, CROP: {}}
        self.troopkind = None   # troop kind whose count we expect next
        self.troops = {}
        sgmllib.SGMLParser.__init__(self)
        self.feed(data)

    def start_a(self, attr):
        """Remember the slot number when entering a build.php link."""
        # Anchors without an href (e.g. named anchors) must not crash us.
        href = dict(attr).get('href') or ''
        match = self._BUILD_HREF.search(href)
        if match:
            self.nbuild = int(match.group(1))

    def end_a(self):
        self.nbuild = None

    def start_img(self, attr):
        """Record a field level and/or note that a troop count follows."""
        attrs = dict(attr)
        if self.nbuild is not None:
            self._parse_nbuild(attrs)
        # try to see if it's a troop image (only legionnaires so far)
        src = attrs.get('src', '')
        _, filename = os.path.split(src)
        if filename == '1.gif':
            self.troopkind = LEGIONNAIR
        else:
            self.troopkind = None

    def _parse_nbuild(self, attrs):
        """Store the level of the field shown by an image inside a build link.

        Images that are not named ``r<kind>_<level>.gif``, or whose kind is
        not one of the four resources, are ignored instead of aborting the
        whole parse.
        """
        _, filename = os.path.split(attrs.get('src', ''))
        match = self._FIELD_IMG.match(filename)
        if match is None:
            return
        kind, level = int(match.group(1)), int(match.group(2))
        if kind in self.fields:
            self.fields[kind][self.nbuild] = level

    def handle_data(self, data):
        """Take the first integer text after a troop image as its count."""
        if self.troopkind is None:
            return
        try:
            count = int(data)
        except ValueError:
            # Not the number yet (whitespace, label, ...): keep waiting.
            return
        self.troops[self.troopkind] = count
        self.troopkind = None
class BUILD_parser(sgmllib.SGMLParser):
from htmlentitydefs import entitydefs
def __init__(self, data):
self.needed_kind = None
self.current_kind = None
self.needed_table = False
self.production = [None] * 5 # the element with index 0 is simply ignored
self.available = [None] * 5
self.needed = [None] * 5
sgmllib.SGMLParser.__init__(self)
self.feed(data)
def calc_maxtime(self, needed=None):
if needed is None:
needed = self.needed
maxtime = 0
for i in range(1, 5):
if needed[i] is None:
continue
missing = needed[i] - self.available[i]
if missing > 0:
time_needed = float(missing)/self.production[i]
maxtime = max(maxtime, time_needed)
return int(maxtime * 3600) # hours to seconds
def start_td(self, attr):
attrs = dict(attr)
id = attrs.get('id', None)
if id in ('l1', 'l2', 'l3', 'l4'):
kind = int(id[-1])
self.production[kind] = int(attrs['title'])
self.current_kind = kind
def end_td(self):
self.current_kind = None
def start_table(self, attr):
attrs = dict(attr)
if attrs.get('class', None) == 'f10':
self.needed_table = True
def end_table(self):
self.needed_kind = None
self.needed_table = False
def start_img(self, attr):
if not self.needed_table:
return
attrs = dict(attr)
src = attrs['src']
_, filename = os.path.split(src)
assert filename.endswith('.gif')
filename = filename[:-4]
if filename in ('1', '2', '3', '4'):
self.needed_kind = int(filename)
def handle_data(self, data):
if self.current_kind is not None:
avail, maximum = map(int, data.split('/'))
self.available[self.current_kind] = avail
elif self.needed_kind is not None:
data = data.replace('|', '').strip()
try:
self.needed[self.needed_kind] = int(data)
except ValueError:
self.needed[self.needed_kind] = 0 # ??
self.needed_kind = None
class Bot:
    """Scripted browser session against the game server.

    Wraps a ``mechanize.Browser`` and offers the actions that get
    scheduled from ``main``: logging in, sending armies and traders,
    training settlers and upgrading buildings / resource fields.

    The upgrade methods share one convention: they return None when the
    action was started and raise ``CannotDoTask(wait=timedelta)`` when it
    has to be retried later.
    """

    # First HH:MM:SS clock value in a page.
    # NOTE(review): the pattern found in this file, r'([0-9:]*)', matches
    # the empty string at offset 0 and therefore never yielded a usable
    # time.  This one takes the first clock-looking value of the page;
    # confirm that this is the server clock and not some other timer.
    _CLOCK_RE = re.compile(r'\b([01]?\d|2[0-3]):([0-5]\d):([0-5]\d)\b')

    def __init__(self, newdid=None):
        """Create the browser; nothing is fetched until ``login``.

        newdid -- village selected right after login.  When omitted, the
                  module-level NEWDID set by the command line is used, or
                  CAMELOT if the module was imported rather than run.
        """
        log('Bot starting...')
        log('Username: %s' % USERNAME)
        log('Server: %s' % LOGIN)
        self.br = mechanize.Browser()
        self._login = False
        self.target_idx = 0
        self.newdid = newdid

    def _default_town(self):
        """Return the village id to use when the caller gave none."""
        if self.newdid is not None:
            return self.newdid
        # NEWDID only exists when the file is run as a script.
        return globals().get('NEWDID', CAMELOT)

    def login(self):
        """Log in once; later calls are no-ops.

        Also sets the server time from the page returned by the login
        and selects the default village.
        """
        if self._login:
            return
        self.br.open(LOGIN)
        self.br.select_form(name='snd')
        for control in self.br.form.controls:
            # XXX: it's a bit weak as a control, but it works (at least for now)
            if not hasattr(control, 'attrs'):
                continue
            maxlength = control.attrs.get('maxlength', None)
            if maxlength == '15':
                control._value = USERNAME
            elif maxlength == '20':
                control._value = PASSWORD
        self.br.submit()
        log('Login')
        self._login = True
        self.set_server_time()
        self.select_town(self._default_town())

    def set_server_time(self):
        """Read a HH:MM:SS value from the current page into ServerTime.

        The date part is taken from the local clock.  Logs a warning and
        leaves ServerTime untouched when no time is found.
        """
        content = self.br.response().read()
        match = self._CLOCK_RE.search(content)
        if not match:
            log('WARNING: cannot find server time')
            return
        log('Setting server time to %s' % match.group(0))
        hh, mm, ss = [int(part) for part in match.groups()]
        now = datetime.now()
        server_time = datetime(now.year, now.month, now.day, hh, mm, ss)
        ServerTime.set_server_time(server_time)

    def select_town(self, newdid):
        """Make ``newdid`` the active village."""
        self.br.open(FIELDS + '?newdid=' + newdid)

    def select_and_send_army(self, newdid=None, army=None):
        """Raid the next entry of the strategy's TARGETS list.

        Targets are ``(name, x, y, wait)`` tuples and are visited round
        robin.  Returns ``wait * 2`` (back and forth), or None when no
        target is configured.

        newdid -- village the army leaves from (default: see __init__).
        army   -- what to send, as accepted by ``send_army``.
                  NOTE(review): the previous call passed a bare ``5`` in
                  the army position, so the default is five troops of kind
                  LEGIONNAIR -- confirm that this was the intent.
        """
        strategy = self.load_strategy()
        targets = strategy['TARGETS']
        if not targets:
            log('WARNING: no TARGETS in strategy.py, no army sent')
            return None
        if self.target_idx >= len(targets):
            self.target_idx = 0
        name, x, y, wait = targets[self.target_idx]
        self.target_idx += 1
        if self.target_idx >= len(targets):
            self.target_idx = 0
        if newdid is None:
            newdid = self._default_town()
        if army is None:
            army = {int(LEGIONNAIR): 5}
        self.send_army(newdid, x, y, army, SendArmyType.RAID, name)
        return wait * 2 # back and forth

    def send_army(self, newdid, x, y, army, type, name=''):
        """Send ``army`` from village ``newdid`` to map position (x, y).

        army -- an object with a ``troops`` mapping (e.g. model.Army) or a
                plain ``{troop_index: count}`` dict; each entry fills the
                form field ``t<troop_index>``.
        type -- value for the form's ``c`` control (a SendArmyType).
        name -- only used in the log line.
        """
        self.login()
        self.select_town(newdid)
        self.br.open(SEND_ARMY)
        self.br.select_form(nr=0)
        self.br['x'] = str(x)
        self.br['y'] = str(y)
        troops = getattr(army, 'troops', army)
        for key, value in troops.items():
            self.br['t%d' % key] = str(value)
        self.br['c'] = [type]
        self.br.submit()
        # ok, now confirm the sending
        self.br.select_form(nr=0)
        self.br.submit()
        log('Army sent to (%d, %d, %s), TYPE=%s' % (x, y, name, type))

    def send_trader(self, x, y, wood, clay, iron, crop):
        """Send the given resources to map position (x, y)."""
        self.login()
        self.br.open(SEND_TRADER)
        self.br.select_form(nr=0)
        self.br['x'] = str(x)
        self.br['y'] = str(y)
        self.br['r1'] = str(wood)
        self.br['r2'] = str(clay)
        self.br['r3'] = str(iron)
        self.br['r4'] = str(crop)
        self.br.submit()
        # ok, now confirm the sending
        self.br.select_form(nr=0)
        self.br.submit()
        log('Traders sent to (%d, %d) --> (wood=%d, clay=%d, iron=%d, crop=%d)' % (x, y, wood, clay, iron, crop))

    def build_settler(self, residence_id):
        """Train one settler in building slot ``residence_id``.

        Returns None when the order was submitted; raises CannotDoTask
        with the estimated waiting time when SETTLER_COST is not yet
        covered by the available resources.
        """
        # check if we have enough resources
        self.login()
        self.br.open(BUILD % residence_id)
        parser = BUILD_parser(self.br.response().read())
        maxtime = parser.calc_maxtime(SETTLER_COST)
        if maxtime != 0:
            log('Cannot build settler now. Time needed: %s' % format_timestamp(maxtime))
            raise CannotDoTask(wait=timedelta(seconds=maxtime))
        # we are ready... build it!
        log('Building settler...')
        self.br.select_form(nr=0)
        self.br['t10'] = '1'
        self.br.submit()
        return None

    def improve_build(self, id):
        """Upgrade the building/field in slot ``id`` of the active village.

        Returns None when the upgrade link was followed; otherwise raises
        CannotDoTask with the time needed to gather the missing resources
        (or 10 minutes when resources are not the problem).
        """
        self.login()
        self.br.open(FIELDS)
        self.br.open(BUILD % id)
        try:
            self.br.follow_link(text_regex = re.compile('Amplia al livello'))
        except mechanize.LinkNotFoundError:
            parser = BUILD_parser(self.br.response().read())
            maxtime = parser.calc_maxtime()
            if maxtime == 0:
                # we have resources, but we can't improve because we are improving another build
                # wait for 10 minutes
                maxtime = 10*60
            log('Cannot improve build number %d now. Time needed: %s' % (id, format_timestamp(maxtime)))
            raise CannotDoTask(wait=timedelta(seconds=maxtime))
        log('Improving build number %d' % id)
        return None

    def improve_build_in_town(self, id, newdid):
        """Like ``improve_build``, after switching to village ``newdid``."""
        self.login()
        self.select_town(newdid)
        return self.improve_build(id)

    def improve_field(self, kind):
        """Upgrade a lowest-level field of resource ``kind``.

        Raises ValueError when the fields page shows no field of that
        kind, otherwise behaves like ``improve_build``.
        """
        self.login()
        self.br.open(FIELDS)
        parser = FIELDS_parser(self.br.response().read())
        levels = parser.fields[kind]
        if not levels:
            raise ValueError('no field of kind %r found on the fields page' % (kind,))
        # First slot (in dict order) holding the minimum level.
        nbuild = min(levels, key=levels.get)
        return self.improve_build(nbuild)

    def improve_greedy(self):
        """Upgrade the first resource kind that can be upgraded right now.

        Returns None on success, otherwise the shortest of the waits
        reported for the four kinds.
        """
        # improve the first field it can
        waits = []
        for kind in (WOOD, CLAY, IRON, CROP):
            try:
                self.improve_field(kind)
            except CannotDoTask as e:
                waits.append(e.wait)
            else:
                return None # don't try to improve other fields
        return min(waits)

    def improve_greedy_or_build(self, newdid):
        """Run the next queued BUILDS task, or fall back to improve_greedy.

        A task that succeeds is removed from strategy.py.  Returns None on
        success or the wait reported by the failed attempt.
        """
        self.login()
        self.select_town(newdid)
        strategy = self.load_strategy()
        builds = strategy['BUILDS']
        if not builds:
            # fall back to greedy
            return self.improve_greedy()
        build_task = builds[0]
        try:
            build_task.do(self)
        except CannotDoTask as e:
            return e.wait
        strategy['BUILDS'] = builds[1:]
        self.save_strategy(strategy)
        return None

    def load_strategy(self):
        """Evaluate strategy.py and return the resulting dict.

        On an evaluation error a warning is logged and an empty strategy
        is returned.  A missing or unreadable file still raises.
        """
        glob = {
            'Build': Build,
            'Settler': Settler
            }
        with open('strategy.py') as f:
            code = f.read()
        try:
            # SECURITY: eval() runs whatever expression strategy.py
            # contains; only ever point this at a file you wrote yourself.
            strategy = eval(code, glob)
        except Exception as e:
            log('WARNING: Error while evaluating strategy.py: %s' % e)
            strategy = {'BUILDS': [], 'TARGETS': []}
        return strategy

    def save_strategy(self, strategy):
        """Write ``strategy`` back to strategy.py (pprint format)."""
        from pprint import pprint
        with open('strategy.py', 'w') as f:
            pprint(strategy, f)
class AbstractBuildTask:
    """Base class for the entries of the strategy's BUILDS queue."""

    def do(self, bot):
        """Carry out the task with ``bot``; subclasses must override."""
        raise NotImplementedError()
class Build(AbstractBuildTask):
    """Queued task: upgrade the building/field in slot ``build_id``."""

    def __init__(self, build_id):
        self.build_id = build_id

    def do(self, bot):
        """Delegate to ``bot.improve_build`` and return its result."""
        return bot.improve_build(self.build_id)

    def __repr__(self):
        # strategy.py is written with pprint and read back with eval, so
        # the repr must be a valid constructor call: %r keeps that true
        # for non-integer ids as well (identical output for ints).
        return '%s(%r)' % (self.__class__.__name__, self.build_id)
class Settler(AbstractBuildTask):
    """Queued task: train one settler in slot RESIDENCE_ID."""

    def do(self, bot):
        """Delegate to ``bot.build_settler`` and return its result."""
        slot = RESIDENCE_ID
        return bot.build_settler(slot)

    def __repr__(self):
        # Constructor-call form, as needed by the strategy.py round trip.
        class_name = self.__class__.__name__
        return '%s()' % class_name
def main(testing, second):
    """Log in, schedule the recurring upgrade tasks and run the scheduler.

    testing -- when true, unexpected exceptions propagate instead of
               being logged.
    second  -- accepted for the command line's sake; not used here.

    Returns the process exit status: 0 on normal end or Ctrl-C, 1 when an
    exception was caught and logged.
    """
    try:
        bot = Bot()
        sched = Scheduler()
        bot.login() # to set the server time

        # Examples of one-shot / extra tasks, kept for reference:
        ## sched.schedule(OneShotTask('camelot', t.send_army, CAMELOT, 56, 6,
        ##                            Army(imperian=Army.ALL),
        ##                            SendArmyType.RAID),
        ##                ServerTime.abstime(11, 28, 51))
        ## sched.schedule(OneShotTask('aquae', t.send_army, AQUAE_SULIS, 56, 6,
        ##                            Army(eq_caesaris=Army.ALL, senator=1),
        ##                            SendArmyType.RAID),
        ##                ServerTime.abstime(10, 41, 20))
        ## sched.schedule(OneShotTask('camelot', t.send_army, VERULAMIUM, 56, 6,
        ##                            Army(imperian=Army.ALL, eq_imperatoris=Army.ALL, senator=1),
        ##                            SendArmyType.RAID),
        ##                ServerTime.abstime(11, 07, 11))
        ## sched.schedule(RepetitiveTask('accademia',
        ##                               t.improve_build_in_town, 30, AQUAE_SULIS,
        ##                               max_wait=timedelta(minutes=10),
        ##                               default_wait=timedelta(minutes=10)),
        ##                ServerTime.now())
        ## sched.schedule(OneShotTask('magazzino',
        ##                            t.improve_build_in_town, 28, CORINIUM),
        ##                ServerTime.now())

        # (task name, action, positional arguments), scheduled in order.
        recurring = [
            ('improve', bot.improve_greedy_or_build, (NEWDID,)),
            ('residence', bot.improve_build_in_town, (25, GLEVUM)),
            ('residence', bot.improve_build_in_town, (25, ISCA)),
            ('centro', bot.improve_build_in_town, (26, LONDINIUM)),
        ]
        for label, action, args in recurring:
            waits = {'max_wait': timedelta(minutes=10),
                     'default_wait': timedelta(minutes=10)}
            task = RepetitiveTask(label, action, *args, **waits)
            sched.schedule(task, ServerTime.now())
        sched.run()
    except KeyboardInterrupt:
        return 0
    except Exception as e:
        if testing:
            raise
        log('WARNING! Exception caught')
        log(' %s' % e)
        return 1
    return 0
if __name__ == '__main__':
    # Usage: <script> [--test | --2 | --3 | --4 | --5 | --6 | --7]
    #   --test   do not catch unexpected exceptions in main()
    #   --N      use another village than Camelot as the default one
    # Any other argument is ignored.
    import sys

    # default village -- Camelot
    NEWDID = '2435'
    OTHER_X = 54
    OTHER_Y = 1

    # Exceptions are caught and logged unless --test is given.  (An
    # unconditional "testing = True" used to follow here, which made the
    # --test flag meaningless.)
    testing = False
    second = False

    # flag -> (village name shown on the console, village id)
    towns = {
        '--2': ('Mediobogdum', MEDIOBOGDUM),
        '--3': ('Verulamium', VERULAMIUM),
        '--4': ('Aquae Sulis', AQUAE_SULIS),
        '--5': ('Avalon', AVALON),
        '--6': ('Glevum', GLEVUM),
        '--7': ('Corinium', CORINIUM),
    }

    arg = sys.argv[1] if len(sys.argv) > 1 else None
    if arg == '--test':
        testing = True
        print('Testing enabled, not catching exceptions')
    elif arg in towns:
        town_name, NEWDID = towns[arg]
        print('Targeting %s' % town_name)
        second = True

    sys.exit(main(testing, second))