[pypy-svn] r40870 - in pypy/dist/pypy/lib: . app_test
afayolle at codespeak.net
afayolle at codespeak.net
Wed Mar 21 00:33:32 CET 2007
Author: afayolle
Date: Wed Mar 21 00:33:30 2007
New Revision: 40870
Modified:
pypy/dist/pypy/lib/aop.py
pypy/dist/pypy/lib/app_test/sample_aop_code.py
pypy/dist/pypy/lib/app_test/test_aop.py
Log:
added compound point cuts and introduce advice
Modified: pypy/dist/pypy/lib/aop.py
==============================================================================
--- pypy/dist/pypy/lib/aop.py (original)
+++ pypy/dist/pypy/lib/aop.py Wed Mar 21 00:33:30 2007
@@ -2,6 +2,7 @@
heavily influenced by Aspect++"""
+
__all__ = ('around', 'before', 'after', 'introduce', 'PointCut', 'Aspect')
###########################
# API
@@ -49,8 +50,13 @@
PointCut: self.weave_at_static,
}
self.weave_at = dispatch[pointcut.__class__]
+ self.woven_code = None
+ def __repr__(self):
+ return '<%s: %s at %s>' % (self.__class__.__name__,
+ self.woven_code,
+ self.pointcut)
def __call__(self, function):
debug('wrapping advice %s on %s', self.pointcut, function.__name__)
@@ -58,7 +64,6 @@
return self
def weave(self, ast, enc, modulename):
- debug("modulename = %s", modulename)
self.curr_encoding = enc
if self.pointcut.match_module(modulename):
return ast.mutate(self)
@@ -68,14 +73,9 @@
def default(self, node):
if self.pointcut.match(node):
node = self.weave_at(node,
- self.pointcut.joinpoint(node))
+ self.pointcut.joinpoint(node))
return node
-## def visitClass(self, node):
-## if self.pointcut.match(node):
-## print("found match", node.name)
-## return node
-
def weave_at_execution(self, node, tjp):
raise NotImplementedError("abstract method")
@@ -234,7 +234,37 @@
requires_dynamic_pointcut=False
def weave_at_static(self, node, tjp):
debug("WEAVE introduce!!!")
- pass # XXX WRITEME
+ p = parser
+ id = __aop__.register_joinpoint(self.woven_code, tjp)
+ if node.code.__class__ == p.ASTPass:
+ node.code = p.ASTStmt([])
+
+ methods = node.code.nodes
+
+ newmethod = p.ASTFunction(None,
+ self.woven_code.func_name,
+ [p.ASTAssName('self', 0),
+ p.ASTAssName('args', 0),
+ ],
+ [],
+ p.CO_VARARGS,
+ self.woven_code.func_doc,
+ p.ASTStmt([p.ASTPrintnl([p.ASTName('self')], None),
+ p.ASTPrintnl([p.ASTName('args')], None),
+ p.ASTReturn(p.ASTCallFunc(p.ASTGetattr(p.ASTName('__aop__'), 'call_introduced'),
+ [p.ASTConst(id),
+ p.ASTAdd(p.ASTTuple([p.ASTName('self')]), p.ASTName('args')),
+ ], None, None)
+ )
+ ]
+ ),
+ node.lineno
+ )
+
+
+
+ methods.append(newmethod)
+ node.code.nodes = methods
debug('newnode: %s', node)
return node
@@ -242,7 +272,6 @@
# JoinPoint
# --------
-
class JoinPoint:
# API for use within advices
def signature(self):
@@ -250,6 +279,9 @@
point"""
return self._signature
+ def name(self):
+ return self._name
+
def that(self):
"""return: a reference on the object initiating the call, or
None if it is a static method or a global function"""
@@ -287,6 +319,7 @@
self._arguments = arguments
self._argnames = None
self.func = func
+ self._name = None
# PointCut
@@ -307,7 +340,7 @@
The created point cut is static.
The pointcut argument can also be a pointcut instance"""
- info('%r %r %r %s', module, klass, func, pointcut)
+ ##debug('%s %s %s %s', module, klass, func, pointcut)
if pointcut is None:
self.func_re = re.compile(func)
self.module_re = re.compile(module)
@@ -316,20 +349,28 @@
self.func_re = pointcut.func_re
self.module_re = pointcut.module_re
self.class_re = pointcut.class_re
+ ##debug('*** %s %s %s', self.func_re, self.module_re, self.class_re)
else:
raise TypeError(type(pointcut))
self.isdynamic = False
+ def __repr__(self):
+ return '<%s on %s:%s:%s>' % (self.__class__.__name__,
+ self.module_re.pattern,
+ self.class_re.pattern,
+ self.func_re.pattern)
+
def __and__(self, other):
"""return: new pointcut, intersection of the join points in the self and other"""
- if other.__class__ != self.__class__:
+ if not isinstance(other, PointCut):
raise TypeError(other.__class__.__name__)
- pass
+ return _AndPC(self, other)
+
def __or__(self, other):
"""return: new pointcut, union of the join points in the self and other"""
- if other.__class__ != self.__class__:
+ if not isinstance(other, PointCut):
raise TypeError(other.__class__.__name__)
- pass
+ return _OrPC(self, other)
## def __not__(self):
## """return: new pointcut, exclusion of the join points in self"""
@@ -358,16 +399,64 @@
def match_module(self, modulename):
return self.module_re.match(modulename)
- def match(self, astnode):
+ def find_classname(self, node):
+ while node is not None:
+ node = node.parent
+ if isinstance(node, parser.ASTClass):
+ return node.name
+ return ''
+
+ def match(self, node):
"a static point cut only matches classes: the function part is not used"
assert self.func_re.pattern == '.*'
- return self.class_re.match(astnode.name)
+ return isinstance(node, parser.ASTClass) and \
+ self.class_re.match(node.name)
def joinpoint(self, node):
"""returns a join point instance for the node"""
# assert self.match(node)
return JoinPoint()
+class _CompoundPC(PointCut):
+ def __init__(self, pc1, pc2):
+ self.pc1 = pc1
+ self.pc2 = pc2
+
+ def __repr__(self):
+ return "<%s(%r, %r)>" % (self.__class__.__name__, self.pc1, self.pc2)
+ def call(self):
+ """return a dynamic pointcut representing places where the pointcut is called"""
+ return self.__class__(self.pc1.call(), self.pc2.call())
+
+ def execution(self):
+ """return a dynamic pointcut representing places where the pointcut is executed"""
+ return self.__class__(self.pc1.execution(), self.pc2.execution())
+
+ def initialization(self):
+ """return a dynamic pointcut representing places where the pointcut is instantiated"""
+ return self.__class__(self.pc1.initialization(), self.pc2.initialization())
+
+ def destruction(self):
+ """return a dynamic pointcut representing places where the pointcut is destroyed"""
+ return self.__class__(self.pc1.destruction(), self.pc2.destruction())
+
+class _AndPC(_CompoundPC):
+ def match_module(self, modulename):
+ return self.pc1.match_module(modulename) and self.pc2.match_module(modulename)
+
+ def match(self, node):
+ return self.pc1.match(node) and self.pc2.match(node)
+
+class _OrPC(_CompoundPC):
+ def __init__(self, pc1, pc2):
+ self.pc1 = pc1
+ self.pc2 = pc2
+
+ def match_module(self, modulename):
+ return self.pc1.match_module(modulename) or self.pc2.match_module(modulename)
+
+ def match(self, node):
+ return self.pc1.match(node) or self.pc2.match(node)
class AbstractDynamicPointCut(PointCut):
def __init__(self, pointcut):
@@ -393,15 +482,18 @@
matching func_re, defined within a class matching class_re written
in a module matching module_re"""
- def match(self, astnode):
- # FIXME: class_re
- return isinstance(astnode, parser.ASTFunction) and \
- self.func_re.match(astnode.name)
+ def match(self, node):
+ if not isinstance(node, parser.ASTFunction):
+ return False
+ classname = self.find_classname(node)
+ return self.class_re.match(classname) and \
+ self.func_re.match(node.name)
def joinpoint(self, node):
"""returns a join point instance for the node"""
# assert self.match(node)
jp = JoinPoint()
+ jp._name = node.name
jp._flags = node.flags
jp._argnames = [a.name for a in node.argnames]
jp._defaultargvalues = [d.value for d in node.defaults]
@@ -414,9 +506,12 @@
module matching module_re"""
def match(self, node):
- # FIXME: class_re
+ if not isinstance(node, parser.ASTCallFunc):
+ return False
+ classname = self.find_classname(node)
return isinstance(node, parser.ASTCallFunc) and \
isinstance(node.node, parser.ASTName) and \
+ self.class_re.match(classname) and \
self.func_re.match(node.node.varname)
@@ -426,7 +521,7 @@
method in a class matching class_re in a module matching module_re"""
def __init__(self, pointcut):
ExecutionPointCut.__init__(self, pointcut=pointcut)
- self.func_re='^__del__$'
+ self.func_re = re.compile('^__del__$')
### XXX: won't match anything if no __init__ method exists (or only on a parent class)
class InitializationPointCut(ExecutionPointCut):
@@ -435,29 +530,7 @@
module_re"""
def __init__(self, pointcut):
ExecutionPointCut.__init__(self, pointcut=pointcut)
- self.func_re='^__init__$'
-
-
-### make these class methods of PointCut ?
-def within(pointcutstring):
- """return point cut filtering joinpoints on lexical scope"""
- pass
-
-def base(pointcutstring):
- """return class pointcuts based on the class hierarchy"""
- pass
-def derived(pointcutstring):
- """return class pointcuts based on the class hierarchy"""
- pass
-
-def that(typepattern):
- pass
-def target(typepattern):
- pass
-def result(typepattern):
- pass
-def args(typepattern):
- pass
+ self.func_re = re.compile('^__init__$')
class _UndefinedResult:
"""used to denote that the result of a call to a aspectised
@@ -496,8 +569,8 @@
if not self.advices:
return ast
try:
- info('Weaving on %s %s', filename, sys.path)
modulename = self._guessmodule(filename)
+ info('Looking for something to weave on %s', modulename)
for aspect, advice in self.advices:
self._curr_aspect = aspect
ast = advice.weave(ast, enc, modulename)
@@ -539,6 +612,13 @@
args = (aspect, joinpoint,) + arguments
return woven_code(*args)
+ def call_introduced(self, id, args):
+ info('call to __aop__.call_introduced(%d, *%s)', id, args)
+ woven_code, (aspect, joinpoint, arguments) = self.joinpoints[id]
+ debug('woven_code: %s', woven_code)
+ return woven_code(aspect, *args)
+
+
import __builtin__
__builtin__.__aop__ = Weaver()
del __builtin__
@@ -556,7 +636,7 @@
instance = super(Aspect, cls).__call__(*args, **kwargs)
for name, advice in cls.__dict__.iteritems():
if isinstance(advice, Advice):
- info("registering advice %s.%s", instance.__class__.__name__, name)
+ info("registering advice %s", advice)
__aop__.register_advice(instance, advice)
return instance
Modified: pypy/dist/pypy/lib/app_test/sample_aop_code.py
==============================================================================
--- pypy/dist/pypy/lib/app_test/sample_aop_code.py (original)
+++ pypy/dist/pypy/lib/app_test/sample_aop_code.py Wed Mar 21 00:33:30 2007
@@ -24,10 +24,16 @@
class Mumble:
def __init__(self, param):
self.p = param
- def frobble(self):
- return 3 * self.p
+ def frobble(self, b):
+ return 3 * self.p + b
def __del__(self):
- print 'poof'
+ print 'Mumble goes poof'
+
+def truc():
+ m = Mumble(2)
+ r = m.frobble(1)
+ print 'truc', r, 'expected 7'
+ return r
"""
import os
import os.path as osp
@@ -38,10 +44,15 @@
return osp.join(osp.dirname(__file__), name)
def write_module(name):
+ clean_module(name)
f = open(_make_filename(name), 'w')
f.write(code)
f.close()
def clean_module(name):
- os.unlink(_make_filename(name))
+ name = _make_filename(name)
+ if os.path.isfile(name):
+ os.unlink(name)
+ if os.path.isfile(name+'c'):
+ os.unlink(name+'c')
Modified: pypy/dist/pypy/lib/app_test/test_aop.py
==============================================================================
--- pypy/dist/pypy/lib/app_test/test_aop.py (original)
+++ pypy/dist/pypy/lib/app_test/test_aop.py Wed Mar 21 00:33:30 2007
@@ -60,11 +60,93 @@
assert pc.match_module('logilab.common')
assert not pc.match_module('logilab')
assert not pc.match_module('common.logilab')
+
+ def test_static_pointcut_match(self):
+ from aop import PointCut
+ from parser import ASTClass, ASTPass, ASTFunction
+ pc = PointCut(klass="Mumble")
+ assert pc.match(ASTClass('Mumble', [], None, ASTPass()))
+ assert pc.match(ASTClass('MumblesALot', [], None, ASTPass()))
+ f = ASTFunction(None, 'MumblesALot', [], [], 0, '', ASTPass())
+ f.parent = ASTClass('MumblesALot', [], None, ASTPass())
+ assert not pc.match(f)
+
+ def test_exec_pointcut_match(self):
+ from aop import PointCut
+ from parser import ASTClass, ASTPass, ASTFunction
+ pc = PointCut(klass="Mumble", func='frobble').execution()
+ f = ASTFunction(None, 'frobble', [], [], 0, '', ASTPass())
+ f.parent = ASTClass('MumblesALot', [], None, ASTPass())
+ assert pc.match(f)
+ f.parent.name = 'Babble'
+ assert not pc.match(f)
+ c = ASTClass('frobbles_a_bit', [], None, ASTPass())
+ c.parent = ASTClass('MumblesALot', [], None, ASTPass())
+ assert not pc.match(c)
+
+ def test_call_pointcut_match(self):
+ from aop import PointCut
+ from parser import ASTClass, ASTPass, ASTFunction, ASTName, ASTCallFunc
+ pc = PointCut(klass="Mumble", func='frobble').call()
+ cf = ASTCallFunc( ASTName('frobble'), [], None, None)
+ c = ASTClass('MumblesALot', [], None, ASTPass())
+ cf.parent = c
+ assert pc.match(cf)
+ f = ASTFunction(None, 'frobble', [], [], 0, '', ASTPass())
+ f.parent = c
+ assert not pc.match(f)
+ c2 = ASTClass('frobbles_a_bit', [], None, ASTPass())
+ c2.parent = c
+ assert not pc.match(c2)
+ c.name = 'Babble'
+ assert not pc.match(cf)
+
+ def test_init_pointcut_match(self):
+ from aop import PointCut
+ from parser import ASTClass, ASTPass, ASTFunction
+ pc = PointCut(klass="Mumble").initialization()
+ init = ASTFunction(None, '__init__', [], [], 0, '', ASTPass())
+ c = ASTClass('MumblesALot', [], None, ASTPass())
+ init.parent = c
+ assert pc.match(init)
+ c2 = ASTClass('__init__', [], None, ASTPass())
+ c2.parent = c
+ assert not pc.match(c2)
+ init.name = 'frobble'
+ assert not pc.match(init)
+
+ def test_destruction_pointcut_match(self):
+ from aop import PointCut
+ from parser import ASTClass, ASTPass, ASTFunction, ASTCallFunc, ASTName
+ pc = PointCut(klass="Mumble").destruction()
+ delete = ASTFunction(None, '__del__', [], [], 0, '', ASTPass())
+ c = ASTClass('MumblesALot', [], None, ASTPass())
+ delete.parent = c
+ assert pc.match(delete)
+ c2 = ASTClass('__del__', [], None, ASTPass())
+ c2.parent = c
+ assert not pc.match(c2)
+ delete.name = 'frobble'
+ assert not pc.match(delete)
+
+
+ def test_and_compound_pointcut_match(self):
+ from aop import PointCut
+ from parser import ASTClass, ASTPass, ASTFunction, ASTCallFunc, ASTName
+ pc1 = PointCut(klass="Mumble")
+ pc2 = PointCut(func="frobble")
+ pc = (pc1 & pc2).execution()
+ f = ASTFunction(None, 'frobble', [], [], 0, '', ASTPass())
+ f.parent = ASTClass('MumblesALot', [], None, ASTPass())
+ assert pc.match(f)
+ f.parent.name = 'Babble'
+ assert not pc.match(f)
+ c = ASTClass('frobbles_a_bit', [], None, ASTPass())
+ c.parent = ASTClass('MumblesALot', [], None, ASTPass())
+ assert not pc.match(c)
+
class AppTestWeavingAtExecution(object):
- def setup_class(cls):
- cls.space = gettestobjspace(**{'objspace.usepycfiles':False})
-
def test_simple_aspect_before_execution(self):
from aop import PointCut, Aspect, before
from app_test import sample_aop_code
@@ -95,6 +177,36 @@
assert answ == 47
sample_aop_code.clean_module('aop_before_execution')
+ def test_aspect_before_meth_execution(self):
+ from aop import PointCut, Aspect, before
+ from app_test import sample_aop_code
+ __aop__._clear_all()
+ sample_aop_code.write_module('aop_before_meth_execution')
+
+ class AspectTest:
+ __metaclass__ = Aspect
+ def __init__(self):
+ self.executed = False
+ @before(PointCut(func='frobble', klass='Mumble').execution())
+ def advice_before_meth_execution(self, tjp):
+ self.executed = True
+ self.argnames = tjp._argnames
+ self.flags = tjp._flags
+
+ assert __aop__.advices == []
+ aspect = AspectTest()
+ assert __aop__.advices == [(aspect, AspectTest.advice_before_meth_execution)]
+ assert not aspect.executed
+
+ from app_test import aop_before_meth_execution
+ assert aspect.executed == 0
+ answ = aop_before_meth_execution.truc()
+ assert aspect.executed == 1
+ assert aspect.argnames == ['self', 'b']
+ assert aspect.flags == 0
+ assert answ == 7
+ sample_aop_code.clean_module('aop_before_meth_execution')
+
def test_simple_aspect_after_execution(self):
from aop import PointCut, Aspect, after
from app_test import sample_aop_code
@@ -153,9 +265,6 @@
class AppTestWeavingAtCall(object):
- def setup_class(cls):
- cls.space = gettestobjspace(**{'objspace.usepycfiles':False})
-
def test_simple_aspect_before_call(self):
from aop import PointCut, Aspect, before
from app_test import sample_aop_code
@@ -251,3 +360,27 @@
assert aspect.result == 42
assert answ == 47
sample_aop_code.clean_module('aop_around_call')
+
+
+
+class AppTestWeavingIntroduce(object):
+ def test_introduce(self):
+ from aop import PointCut, Aspect, introduce
+ from app_test import sample_aop_code
+ __aop__._clear_all()
+ sample_aop_code.write_module('aop_introduce')
+ class AspectTest:
+ __metaclass__ = Aspect
+ @introduce(PointCut(klass='Mumble'))
+ def newmethod(self, it, a, b):
+ return it.p*a+b
+
+ aspect = AspectTest()
+ from app_test import aop_introduce
+ c = aop_introduce.Mumble(2)
+ try:
+ answ = c.newmethod(1,3)
+ except Exception, exc:
+ print exc.__class__.__name__, exc
+ assert False
+ assert answ == 5
More information about the pypy-svn
mailing list