import sys, os, tempfile, atexit, cStringIO class Concept(object): model = None def __init__(self): self.eq = Operation(self, self) def __sub__(self, functor): if not isinstance(functor, Functor): raise TypeError, 'cannot sub Concept and %s' % type(functor).__name__ base, functors = self.functorchain() if functor.cod is not base: raise TypeError, 'functor subtraction' return Subconcept((functor,) + functors) def functorchain(self): return self, () def getbase(self): return self def getabstract(self): return self def __eq__(self, other): if not isinstance(other, Concept): return False base1, functors1 = self.functorchain() base2, functors2 = other.functorchain() return base1 is base2 and functors1 == functors2 def __ne__(self, other): return not (self == other) def __hash__(self): base, functors = self.functorchain() return id(base) ^ hash(functors) def __call__(self, obj): base, functors = self.functorchain() if isinstance(obj, Abstract): if base is not obj.abstract: raise TypeError, "abstract mismatch" base = obj.base functors = obj.functors + functors return Abstract(base, functors, obj) def __repr__(self): if self.model is not None: return self.model._repr_attribute(self) else: return super(Concept, self).__repr__() class Subconcept(Concept): def __init__(self, functors): self.functors = functors def functorchain(self): return self.functors[0].dom, self.functors def getbase(self): return self.functors[0].dom def getabstract(self): return self.functors[-1].cod def __repr__(self): for f in self.functors: if f.model is not None: model = f.model break else: return super(Subconcept, self).__repr__() l = [model._name_attribute(f) for f in self.functors] return '' % (model.__name__, ' '.join(l)) class Functor(object): model = None def __init__(self, dom, cod): self.dom = dom self.cod = cod def __repr__(self): if self.model is not None: return self.model._repr_attribute(self) else: return super(Functor, self).__repr__() class Operation(object): def __init__(self, *args): self.args = args self.impls = {} def define(self, f, *types): if len(self.args) != len(types): raise TypeError, "expected %d types" % len(self.args) for a, t in zip(self.args, types): if t.getabstract() is not a: raise TypeError, "wrong argument type" self.impls[types] = f def __call__(self, *args): if len(self.args)-1 != len(args): raise TypeError, "expected %d args" % (len(self.args)-1) for a, t in zip(args, self.args): if a.abstract != t: raise TypeError, "got %r, expected %r" % (a.abstract, t) for types, f in self.impls.items(): clists = [] for a, t in zip(args, types): base, functors = t.functorchain() clist = a.coerce_to(functors) if clist is None: break clists.append(clist) else: def cname(f): return getattr(f, '__name__', `f`) converted = [] for a, clist in zip(args, clists): a = a.obj for c in clist: print ' * %s(%.80r)' % (cname(c), a) a = c(a) converted.append(a) print '** %s(%s)' % (cname(f), ', '.join(['%.80r' % a for a in converted])) a = f(*converted) return types[-1](a) raise TypeError, "cannot find a way to call this" class Abstract(object): def __init__(self, base, functors, obj): self.base = base self.functors = functors if functors: self.abstract = functors[-1].cod else: self.abstract = base self.obj = obj def is_concrete(self): return not self.functors def more_concrete(self): return Abstract(self.base, self.functors[:-1], self.obj) def coercions(self): """Enumerates (target_functors, conversion_function).""" if not self.is_concrete(): lastf = self.functors[-1] for f, c in self.more_concrete().coercions(): yield f + (lastf,), c for (f_in, f_out), c in self.abstract.eq.impls.items(): base, functors = f_in.functorchain() if functors == self.functors: base, functors = f_out.functorchain() yield functors, c def rec_coercions(self): """Enumerates (target_functors, list_of_conversion_functions).""" pending = [] def reg(functors, clist, seen={}): if functors not in seen: seen[functors] = True pending.append((functors, clist)) return True else: return False if reg(self.functors, ()): yield self.functors, () for from_functors, clist in pending: if from_functors: from_base = from_functors[0].dom else: from_base = self.base a = Abstract(from_base, from_functors, self.obj) for target_functors, c in a.coercions(): clist2 = clist + (c,) if reg(target_functors, clist2): yield target_functors, clist2 def coerce_to(self, target_functors): target_functors = tuple(target_functors) #print 'target is:', target_functors for functors, clist in self.rec_coercions(): #print 'trying:', functors if functors == target_functors: #print 'ok.' return clist #print 'failed.' return None def __repr__(self): for f in self.functors: if f.model is not None: model = f.model break else: if self.base.model is not None: model = self.base.model else: return super(Subconcept, self).__repr__() if self.functors: l = [model._name_attribute(f) for f in self.functors] return '<%r as %s.%s>' % (self.obj, model.__name__, ' '.join(l)) else: return '<%r as %r>' % (self.obj, self.base) class ModelClass(type): def __init__(self, name, bases, dict): super(ModelClass, self).__init__(name, bases, dict) for value in self.__dict__.values(): if isinstance(value, Functor) or isinstance(value, Concept): value.model = self def _name_attribute(self, attr): for name, value in self.__dict__.items(): if value is attr: return name return '?' def _repr_attribute(self, attr): return '<%s %s.%s>' % (attr.__class__.__name__, self.__name__, self._name_attribute(attr)) class Model(object): __metaclass__ = ModelClass class MImages(Model): c_none = Concept() c_dbid = Concept() c_string = Concept() c_file = Concept() c_jpegfile = Concept() c_ppmfile = Concept() c_image = Concept() pathname = Functor(c_string, c_file) strcontent = Functor(c_string, c_file) dbimage = Functor(c_dbid, c_jpegfile) jpegdata = Functor(c_file, c_jpegfile) ppmdata = Functor(c_file, c_ppmfile) jpegimage = Functor(c_jpegfile, c_image) ppmimage = Functor(c_ppmfile, c_image) show = Operation(c_image, c_none) mul = Operation(c_image, c_image, c_image) ## def show1(f): ## g = os.popen('xv -', 'w') ## while 1: ## data = f.read(4096) ## if not data: break ## g.write(data) ## g.close() ## show.define(show1, c_image-jpegimage-jpegdata, c_none) def show1(src): os.system('xv "%s"' % src) show.define(show1, c_image-jpegimage-jpegdata-pathname, c_none) def mul1(fn1, fn2): return os.popen('./and "%s" "%s"' % (fn1, fn2), 'r') mul.define(mul1, c_image-ppmimage-ppmdata-pathname, c_image-ppmimage-ppmdata-pathname, c_image-ppmimage-ppmdata) def encode_jpg(src): return os.popen('cjpeg "%s"' % src, 'r') c_image.eq.define(encode_jpg, c_image-ppmimage-ppmdata-pathname, c_image-jpegimage-jpegdata) ## def decode_jpg(f): ## stdin, stdout = os.popen2('djpeg') ## def write_stuff(): ## while True: ## buf = f.read(4096) ## if not buf: break ## stdin.write(buf) ## stdin.close() ## thread.start_new_thread(write_stuff, ()) ## return stdout ## c_image.eq.define(decode_jpg, ## c_image-jpegimage-jpegdata, ## c_image-ppmimage-ppmdata) def decode_jpg(src): return os.popen('djpeg "%s"' % src, 'r') c_image.eq.define(decode_jpg, c_image-jpegimage-jpegdata-pathname, c_image-ppmimage-ppmdata) def binary_open(fn): return open(fn, 'rb') c_file.eq.define(binary_open, c_file-pathname, c_file) def make_temp_file(f, hack={}): if f in hack: return hack[f] fd, filename = tempfile.mkstemp() def try_unlink(): try: os.unlink(filename) except: pass atexit.register(try_unlink) g = os.fdopen(fd, 'wb') while True: buf = f.read(4096) if not buf: break g.write(buf) g.close() f.close() hack[f] = filename return filename c_file.eq.define(make_temp_file, c_file, c_file-pathname) c_file.eq.define(cStringIO.StringIO, c_file-strcontent, c_file) def db_load(n): if os.curdir not in sys.path: sys.path.insert(0, os.curdir) cwd = os.getcwd() try: os.chdir('../db') import mytest return mytest.load(n, djpeg=None) finally: os.chdir(cwd) c_jpegfile.eq.define(db_load, c_jpegfile-dbimage, c_jpegfile-jpegdata-strcontent) if __name__ == '__main__': jpg = MImages.c_image-MImages.jpegimage-MImages.jpegdata-MImages.pathname ppm = MImages.c_image-MImages.jpegimage-MImages.jpegdata-MImages.pathname dbi = MImages.c_image-MImages.jpegimage-MImages.dbimage ## f1 = jpg('/tmp/home/holrh007.jpg') ## f2 = jpg('/tmp/home/z128.jpg') ## f3 = MImages.mul(f1, f2) ## #MImages.show(f3) ## n1 = dbi(-59551) ## n2 = dbi(-65683) ## n3 = MImages.mul(n1, n2) x1 = jpg('x1.jpg') x2 = jpg('x2.jpg') x3 = MImages.mul(x1, x2)