[pypy-svn] r39962 - in pypy/dist/pypy/lib: . app_test

afayolle at codespeak.net afayolle at codespeak.net
Mon Mar 5 17:16:15 CET 2007


Author: afayolle
Date: Mon Mar  5 17:16:13 2007
New Revision: 39962

Added:
   pypy/dist/pypy/lib/aop.py   (contents, props changed)
   pypy/dist/pypy/lib/app_test/sample_aop_code.py   (contents, props changed)
   pypy/dist/pypy/lib/app_test/test_aop.py   (contents, props changed)
Log:
AOP in Python: first useful test passing

Added: pypy/dist/pypy/lib/aop.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/lib/aop.py	Mon Mar  5 17:16:13 2007
@@ -0,0 +1,314 @@
+"""tentative API for AOP in python.
+
+heavily influenced by Aspect++"""
+
+
+###########################
+# API
+###########################
+import parser
+
+# advices
+# -------
+
+
+class Debug(parser.ASTVisitor):
+    def __init__(self):
+        self.offset = 0
+    def default(self, node):
+        print ' '*self.offset+str(node)
+        self.offset += 2
+        for child in node.getChildNodes():
+            child.accept(self)
+        self.offset -= 2
+        return node
+
+DEBUGGER= Debug()
+
+class Advice(parser.ASTVisitor):
+    requires_dynamic_pointcut=True
+    def __init__(self, pointcut):
+        if self.requires_dynamic_pointcut != pointcut.isdynamic:
+            raise TypeError('Expecting a static pointcut')
+        self.pointcut = pointcut
+        
+    def __call__(self, function):
+        print 'wrapping advice %s on %s' % (self.pointcut, function.__name__)
+        self.function = function
+        return self
+
+##     def decorated(self,*args, **kwargs):
+##         print 'calling aspectized function %s (with %s)' % (self.function.__name__, self.pointcut)
+##         return self.function(*args, **kwargs)
+
+    def weave(self, ast, enc):
+        return ast.accept(self)
+
+    def visitFunction(self, node):
+        if self.pointcut.match(node):
+            self.weave_at_pointcut(node,
+                                   self.pointcut.joinpoint(node))
+        return node
+
+    def vistClass(self, node):
+        if self.pointcut.match(node):
+            print "found match", node.name
+        return node
+        
+
+def make_aop_call(id):
+    """return an AST for a call to a woven function
+    id is the integer returned when the advice was stored in the registry"""
+    p = parser
+    return p.ASTDiscard(p.ASTCallFunc(p.ASTName('__aop__'),
+                                      [p.ASTConst(id),], # arguments
+                                      None, # *args
+                                      None # *kwargs
+                                      )
+                        )
+                     
+
+def is_aop_call(node):
+    p = parser
+    return node.__class__ == p.ASTDiscard and \
+           node.expr.__class__ == p.ASTCallFunc and \
+           node.expr.node.varname == '__aop__'
+
+class around(Advice):
+    """specify code to be run instead of the pointcut"""
+    def weave_at_pointcut(self, node, tjp):
+        print "WEAVE around!!!"
+        pass # XXX WRITEME
+
+class before(Advice):
+    """specify code to be run before the pointcut"""
+    def weave_at_pointcut(self, node, tjp):
+        print "WEAVE before!!!"
+        id = __aop__.register_joinpoint(self.function, tjp)
+        statement_list = node.code.nodes
+        for idx, stmt in enumerate(statement_list):
+            if is_aop_call(stmt):
+                continue
+            else:
+                break
+        statement_list.insert(idx, make_aop_call(id))
+        node.code.nodes = statement_list
+        
+            
+
+class after(Advice):
+    """specify code to be run after the pointcut"""
+    def weave_at_pointcut(self, node, tjp):
+        print "WEAVE after!!!"
+        pass # XXX WRITEME
+
+class introduce(Advice):
+    """insert new code in the pointcut
+    this is the only advice available on static point cuts"""
+    requires_dynamic_pointcut=False
+    def weave_at_pointcut(self, node, tjp):
+        print "WEAVE introduce!!!"
+        pass # XXX WRITEME
+
+    
+        
+# TODO: add new base classes to a pointcut. Maybe with introduce ?
+
+# JoinPoint
+# --------
+
+class JoinPoint:
+    # API for use within advices
+    def signature(self):
+        """return: string representation of the signature of the joint
+        point"""
+        return self.signature
+
+    def that(self):
+        """return: a reference on the object initiating the call, or
+        None if it is a static method or a global function"""
+        return self.that
+    
+    def target(self):
+        """return: reference the object that is the target of a call
+        or None if it is a static method or a global function"""
+        return self.target
+
+    def result(self):
+        """return: reference on result value or None"""
+        return self.result
+
+    def arguments(self):
+        """return: the (args, kwargs) of the join point"""
+        return self.arguments
+
+    def proceed(self, *args, **kwargs):
+        """execute the original code in an around advice"""
+        self.result = self.func(*args, **kwargs)
+
+    def action(self):
+        """return: the runtime action object containing the execution
+        environment to execute XXX"""
+        pass
+
+    def __init__(self, signature=None, that=None, target=None, result=None, arguments=None, func=None):
+        self.signature = signature
+        self.that = that
+        self.target = target
+        self.result = result
+        self.arguments = arguments
+        self.func = func
+    
+
+# PointCut
+# --------
+
+class PointCut:
+    """a collection of Join Points."""
+    # maybe not managed as a real collection.
+    
+    # API for use in declarative code
+    def __init__(self, pointcut):
+        """if pointcut is a string:
+               * matches a substring without . or ,
+               ** matches any substring
+               pointcut looks like object_name_or_pattern[([argname_or_pattern[, ...]])]
+               the pointcut is static
+           else, pointcut must be a pointcut instance"""
+        if type(pointcut) == str:
+            self.pointcutdef = pointcut
+        elif isinstance(pointcut, PointCut):
+            self.pointcutdef = pointcut.pointcutdef # XXX FIXME
+        else:
+            raise TypeError(type(pointcut))
+        self.isdynamic = False
+
+    def __and__(self, other):
+        """return: new pointcut, intersection of the join points in the self and other"""
+        pass
+    def __or__(self, other):
+        """return: new pointcut, union of the join points in the self and other"""
+        pass
+    def __not__(self):
+        """return: new pointcut, exclusion of the join points in self"""
+        pass
+
+    def call(self):
+        """return a dynamic pointcut representing places where the pointcut is called"""
+        # XXX may be difficult to implement ?
+        self.isdynamic = True
+        #raise NotImplementedError('call')
+        return self
+
+    def execution(self):
+        """return a dynamic pointcut representing places where the pointcut is executed"""
+        self.isdynamic = True
+        return self
+
+    def initialization(self):
+        """return a dynamic pointcut representing places where the pointcut is instantiated"""
+        self.isdynamic = True
+        #raise NotImplementedError('initialization')
+        return self
+    
+    def destruction(self):
+        """return a dynamic pointcut representing places where the pointcut is destroyed"""
+        self.isdynamic = True
+        #raise NotImplementedError('destruction')
+
+        return self
+
+    def match(self, astnode):
+        # FIXME !!! :-)
+        try:
+            return astnode.name == self.pointcutdef
+        except AttributeError:
+            return False
+
+    def joinpoint(self, node):
+        """returns a join point instance for the node"""
+        assert self.match(node)
+        return JoinPoint()
+
+### make these class methods of PointCut ?
+def within(pointcutstring):
+    """return point cut filtering  joinpoints on lexical scope"""
+    pass
+
+def base(pointcutstring):
+    """return class pointcuts based on the class hierarchy"""
+    pass
+def derived(pointcutstring):
+    """return class pointcuts based on the class hierarchy"""
+    pass
+
+def that(typepattern):
+    pass
+def target(typepattern):
+    pass
+def result(typepattern):
+    pass
+def args(typepattern):
+    pass
+
+
+class Weaver:
+    """The weaver is responsible for weaving the Aspects in the code
+    using the compiler_hook. The woven modules will generally use the
+    __aop__ builtin instance of this class to run the advices"""
+    def __init__(self):
+        self.advices = []
+        self.joinpoints = {}
+        self._id = 1
+        parser.install_compiler_hook(self.weave)
+
+    def register_advice(self, aspect, advice):
+        self.advices.append((aspect, advice))
+
+    def weave(self, ast, enc):
+        for aspect, advice in self.advices:
+            self._curr_aspect = aspect
+            ast = advice.weave(ast, enc)
+        self._curr_aspect = None
+        return ast
+
+    def _next_id(self):
+        try:
+            return self._id
+        finally:
+            self._id += 1
+    
+    def register_joinpoint(self, callable, *args):
+        assert self._curr_aspect is not None
+        id = self._next_id()
+        args = (self._curr_aspect,) + args
+        self.joinpoints[id] = callable, args
+        return id
+
+    def __call__(self, id):
+        callable, args = self.joinpoints[id]
+        callable(*args)
+
+import __builtin__
+__builtin__.__aop__ = Weaver()
+del __builtin__
+
+
+
+# Aspect metaclass
+# ----------------
+
+class Aspect(type):
+##     def __init__(cls, name, bases, dct):
+##         super(Aspect, cls).__init__(name, bases, dct)
+
+    def __call__(cls, *args, **kwargs):
+        instance = super(Aspect, cls).__call__(*args, **kwargs)
+        for name, advice in cls.__dict__.iteritems():
+            if isinstance(advice, Advice):
+                print "registering advice %s.%s" % (instance.__class__.__name__, name)
+                __aop__.register_advice(instance, advice)
+
+        return instance
+
+

Added: pypy/dist/pypy/lib/app_test/sample_aop_code.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/lib/app_test/sample_aop_code.py	Mon Mar  5 17:16:13 2007
@@ -0,0 +1,8 @@
+def foo(b,c):
+    a = 2
+    d = bar()
+    return b+a+c+d
+
+
+def bar():
+    return 42

Added: pypy/dist/pypy/lib/app_test/test_aop.py
==============================================================================
--- (empty file)
+++ pypy/dist/pypy/lib/app_test/test_aop.py	Mon Mar  5 17:16:13 2007
@@ -0,0 +1,63 @@
+from pypy.conftest import gettestobjspace
+
+class AppTestAop(object):
+    def setup_class(cls):
+        cls.space = gettestobjspace(**{'objspace.usepycfiles':False})
+
+    def test_init(self):
+        import aop
+
+    def test_static_dynamic_advice_and_pointcut(self):
+        from  aop import PointCut, introduce, before, around, after
+
+        dyn_pc = PointCut('foo').call()
+        assert dyn_pc.isdynamic
+
+        stat_pc = PointCut('bar')
+        assert not stat_pc.isdynamic
+
+        assert not introduce.requires_dynamic_pointcut
+        raises(TypeError, introduce, dyn_pc)
+        adv = introduce(stat_pc)
+        assert adv is not None
+
+        for advice in before, around, after:
+            assert advice.requires_dynamic_pointcut
+            raises(TypeError, advice, stat_pc)
+            adv = advice(dyn_pc)
+            assert adv is not None
+
+    def test_is_aop(self):
+        from aop import is_aop_call
+        import parser
+        func = """
+def f():
+    __aop__(1)
+    g(12)
+    __aop__(12)
+"""
+        funcast = parser.source2ast(func).node.nodes[0]
+        result = [is_aop_call(n) for n in funcast.code.nodes]
+        assert result == [True, False, True]
+
+    def test_simple_aspect(self):
+        from  aop import PointCut, Aspect, before
+        
+        class AspectTest:
+            __metaclass__ = Aspect 
+            def __init__(self):
+                self.executed = False
+            @before(PointCut('foo').execution())
+            def advice_before_excecution(self, tjp):
+                self.executed = True
+
+        assert __aop__.advices == []
+        aspect = AspectTest()
+        assert __aop__.advices == [(aspect, AspectTest.advice_before_excecution)] 
+        assert not aspect.executed
+
+        from app_test import sample_aop_code
+        assert not aspect.executed
+        sample_aop_code.foo(1,2)
+        assert aspect.executed
+


More information about the pypy-svn mailing list