[z3-checkins] r41754 - in z3/sqlos/trunk/src/sqlos: . ftests
kobold at codespeak.net
kobold at codespeak.net
Sat Mar 31 19:35:49 CEST 2007
Author: kobold
Date: Sat Mar 31 19:35:48 2007
New Revision: 41754
Modified:
z3/sqlos/trunk/src/sqlos/README.txt
z3/sqlos/trunk/src/sqlos/_transaction.py
z3/sqlos/trunk/src/sqlos/adapter.py
z3/sqlos/trunk/src/sqlos/connection.py
z3/sqlos/trunk/src/sqlos/ftests/test_doctest.py
z3/sqlos/trunk/src/sqlos/zsqlobject.py
Log:
New connection handler and transaction manager: this will solve the memory leak.
Let's disabled the local utilities functional test for the moment: it does not pass anymore.
Modified: z3/sqlos/trunk/src/sqlos/README.txt
==============================================================================
--- z3/sqlos/trunk/src/sqlos/README.txt (original)
+++ z3/sqlos/trunk/src/sqlos/README.txt Sat Mar 31 19:35:48 2007
@@ -66,7 +66,7 @@
+ setuptools_ >= 0.6a11
- + SQLObject_ >= 0.7
+ + SQLObject_ >= 0.7.1
Place the sqlobject package somewhere in python's sys.path
($ZOPEHOME/lib/python is a good location).
Modified: z3/sqlos/trunk/src/sqlos/_transaction.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/_transaction.py (original)
+++ z3/sqlos/trunk/src/sqlos/_transaction.py Sat Mar 31 19:35:48 2007
@@ -1,292 +1,230 @@
##############################################################################
#
# Copyright (c) 2004 Enfold Systems LLC. All rights reserved.
-# Copyright (c) 2005-2006 Brian Sutherland. All rights reserved.
#
# This software is distributed under the terms of the Zope Public
# License (ZPL) v2.1. See COPYING.txt for more information.
#
##############################################################################
-"""Transaction management.
-
-This module integrates Zope and SQLObject's transaction management. It does 3
-things:
-
- * Creates a thread local cache of SQLObjects so that cached objects do not
- leak into other threads as they would in pure SQLObject.
- * Clears the thread local cache at the start of each new transaction.
- * When an object is modified, it registers itself with the dirty object
- registry which, in turn, registers a pre-commit hook. This hook
- syncUpdates's the object sending all the SQL down the line before the
- commit starts.
-
+"""
$Id$
"""
__metaclass__ = type
-import transaction
-from transaction.interfaces import ISynchronizer
+import sets
+
+from transaction import get
+from transaction.interfaces import IDataManager
from zope.interface import implements
-from sqlobject.cache import CacheSet
-from zope.thread import local
+from zope.lifecycleevent import modified
+from sqlobject import SQLObjectNotFound
from sqlos.interfaces import ISQLObject
+from sqlos.connection import connCache
+def beforeCommitHook(obj):
+ """Called before transactions are started.
-class CacheSynchronizer:
- """Synchronizer to expire the Global per thread cache at transaction start.
+ obj is a SQLObject
- Basically just an adapter for sqlobject.cache.CacheSet.
+ This normally generates the database activity that pulls the zope.app.rdb
+ data manager into the transaction, thus must be called before the
+ transaction commits.
- Lets make a stub context:
-
- >>> class CacheStub:
- ... def __init__(self, name):
- ... self.name = name
- ... def clear(self):
- ... print 'clearing %s' % self.name
- >>> class CacheSetStub:
- ... def allSubCaches(self):
- ... return self.caches
- >>> cache_set_stub = CacheSetStub()
- >>> cache_set_stub.caches = [CacheStub('cache 1'), CacheStub('cache 2')]
-
- Let's check that it implements the interface correctly:
-
- >>> from zope.interface import verify
- >>> synch = CacheSynchronizer(cache_set_stub)
- >>> verify.verifyObject(ISynchronizer, synch)
- True
-
- Nothing Happens before or after transactions:
+ Note that it is only called on commit.
+ """
+ if not obj.sqlmeta._obsolete:
+ obj.sync()
- >>> synch.beforeCompletion('fake txn')
- >>> synch.afterCompletion('fake txn')
+def expireSQLObject(obj):
+ """Expire the SQLObject.
- But on new transactions, the cache is cleared:
+ Try to make sure none of the data makes it to the next transaction.
- >>> synch.newTransaction('fake txn')
- clearing cache 1
- clearing cache 2
+ This function should not cause SQL to be sent as it is not defined whether
+ the SQL connection will commit before or after this is executed.
"""
- # XXX this is probably the wrong place to put this code.
- implements(ISynchronizer)
+ for connection in connCache.values():
+ connection.cache.expire(obj.id, obj.__class__)
- def __init__(self, context):
- # context must be a CacheSet but SQLObject aint goit interfaces
- self.context = context
+ # Expire object values. The transaction has either been aborted or
+ # committed.
+ obj.expire()
- def afterCompletion(self, transaction):
- pass
- def beforeCompletion(self, transaction):
- pass
+class SQLObjectTransactionManager:
+ """
+ This is a very simple Data Manager that just takes registrations
+ of ``ISQLObject`` objects and calls their ``sync()`` method when
+ needed.
- def newTransaction(self, transaction):
- for cache in self.context.allSubCaches():
- cache.clear() # Blech, not optimal, don't know what is.
- # expireAll, doesn't expire the objects.
+ In addition to that, when the transaction is aborted, all modified
+ objects will be expired.
+ Let's see how it works.
-class ThreadedCacheManager(local):
+ First of all, setup the environment:
- def __init__(self):
- self.cache = CacheSet() # one cache per thread
- # we need to keep a solid reference to the synchronizer, because the
- # transaction manager only keeps weak ones
- self._synch = CacheSynchronizer(self.cache)
- transaction.manager.registerSynch(self._synch)
+ >>> from zope.app.testing.placelesssetup import setUp, tearDown
+ >>> setUp()
-cache_manager = ThreadedCacheManager() # this should be a utility?
- # waa actually I don't like this at all
- # there must be a dead simple way.
+ First, register the subscribers and make a test data base:
+ >>> from sqlos import testing
+ >>> from sqlos.testing.sampleperson import SamplePerson
+ >>> testdb = testing.TestDB([SamplePerson])
-class DirtyObjectRegistry(local):
- """A thread local registry of dirty SQLObjects.
+ Now create some SamplePeople for testing:
- StubPeople:
+ >>> person = SamplePerson(fullname='Sidnei', username='sidnei',
+ ... password='123')
+ >>> person1 = SamplePerson(fullname='Brian', username='jinty',
+ ... password='456')
- >>> synced = []
- >>> class StubPerson:
- ... implements(ISQLObject)
- ... class sqlmeta:
- ... _obsolete = False
- ... def __init__(self, name):
- ... self.name = name
- ... def syncUpdate(self):
- ... synced.append(self.name)
- ... synced.sort()
-
- Lets get some sample people:
-
- >>> jhon = StubPerson('jhon')
- >>> jane = StubPerson('jane')
-
- Stub out some methods and make a registry:
-
- >>> oldhook = DirtyObjectRegistry._addBeforeCommitHook
- >>> def hook(self):
- ... print 'adding before commit hook'
- ... assert transaction.get() is self._txn
- ... oldhook(self)
- >>> DirtyObjectRegistry._addBeforeCommitHook = hook
- >>> reg = DirtyObjectRegistry()
-
- Objects not implementing ISQLObject fail to register:
-
- >>> reg.register(object()) # doctest: +ELLIPSIS
- Traceback (most recent call last):
- ...
- ValueError: ...
-
- Register some people and check the internal state:
-
- >>> reg.register(jhon)
- adding before commit hook
- >>> len(reg._objects)
- 1
- >>> reg._txn is transaction.get()
- True
+ And commit everything:
- >>> reg.register(jhon)
- >>> len(reg._objects)
- 1
-
- >>> reg.register(jane)
- >>> len(reg._objects)
- 2
-
- Test the thread localness of the registry:
-
- >>> log = []
- >>> import threading
- >>> def logRegisterState():
- ... log.append(reg._txn)
- ... log.append(reg._objects)
- ... log.append(reg._registered)
- >>> thread = threading.Thread(target=logRegisterState)
- >>> thread.start()
- >>> thread.join()
- >>> log
- [None, set([]), False]
+ >>> get().commit()
+ Monkeypatch some methods to ease testing:
- After a commit we should be able to do it again:
+ >>> synced = []
+ >>> expired = []
- >>> transaction.get().commit()
- >>> synced
- ['jane', 'jhon']
+ >>> oldsync = SamplePerson.sync
+ >>> def sync(self):
+ ... synced.append(self.username)
+ ... synced.sort()
+ ... oldsync(self)
+ >>> SamplePerson.sync = sync
+
+ >>> oldexpire = SamplePerson.expire
+ >>> def expire(self):
+ ... expired.append(self.username)
+ ... expired.sort()
+ ... oldexpire(self)
+ >>> SamplePerson.expire = expire
+
+ Now, we check the initial DataManager state:
+
+ >>> person.dirty
+ False
+ >>> dm = SamplePerson._connection._dm
+ >>> dm._objects == sets.Set([])
+ True
- Register some people and check the internal state:
+ Change something on the people and make sure that they register:
- >>> synced = []
- >>> reg.register(jhon)
- adding before commit hook
- >>> len(reg._objects)
- 1
- >>> reg._txn is transaction.get()
+ >>> person.set(fullname='Sidnei da Silva')
+ >>> person.dirty
+ True
+ >>> dm._objects == sets.Set([person])
True
- >>> reg.register(jhon)
- >>> len(reg._objects)
- 1
-
- >>> reg.register(jane)
- >>> len(reg._objects)
- 2
+ >>> person1.set(fullname='Brian Sutherland')
+ >>> person1.dirty
+ True
+ >>> dm._objects == sets.Set([person, person1])
+ True
- Test that after an abort everything works as expected:
+ Commit the transaction:
- >>> transaction.get().abort()
>>> synced
[]
-
- Register some people and check the internal state:
-
- >>> reg.register(jhon)
- adding before commit hook
- >>> len(reg._objects)
- 1
- >>> reg._txn is transaction.get()
+ >>> expired
+ []
+ >>> get().commit()
+ >>> synced
+ ['jinty', 'sidnei']
+ >>> expired
+ ['jinty', 'sidnei']
+
+ >>> synced[:] = []
+ >>> expired[:] = []
+
+ Check the state:
+
+ >>> person.dirty
+ False
+ >>> dm._objects == sets.Set([])
True
- >>> reg.register(jhon)
- >>> len(reg._objects)
- 1
+ Now, we change something again to test abort():
- >>> reg.register(jane)
- >>> len(reg._objects)
- 2
+ >>> person.set(fullname='Alan Runyan')
+ >>> person.dirty
+ True
+ >>> dm._objects == sets.Set([person])
+ True
- We can manually sync all the objects:
+ Lets abort the current transaction:
- >>> reg.syncUpdateAll()
>>> synced
- ['jane', 'jhon']
- >>> len(reg._objects)
- 0
-
- We will not re-register after a manual sync, but new objects can be
- registered. A committed transaction will sync them:
-
- >>> synced = []
- >>> reg.register(jhon)
- >>> len(reg._objects)
- 1
- >>> transaction.get().commit()
+ []
+ >>> expired
+ []
+ >>> get().abort()
>>> synced
- ['jhon']
+ []
+ >>> expired
+ ['sidnei']
- After an abort, nothing can be synced:
+ And then cleanup the monkeypatched method:
- >>> synced = []
- >>> reg.register(jhon)
- adding before commit hook
- >>> len(reg._objects)
- 1
- >>> transaction.get().abort()
- >>> reg.syncUpdateAll()
- >>> synced
- []
+ >>> SamplePerson.sync = oldsync
+ >>> SamplePerson.expire = oldexpire
- TearDown:
+ And finally call tearDown and cleanup:
- >>> DirtyObjectRegistry._addBeforeCommitHook = oldhook
+ >>> testdb.tearDown()
+ >>> tearDown()
"""
+ implements(IDataManager)
+
def __init__(self):
- self._txn = None
- self._objects = set([])
- self._registered = False
-
- def syncUpdateAll(self):
- self._ensureCurrentTxn()
- while self._objects:
- obj = self._objects.pop()
- if not obj.sqlmeta._obsolete:
- obj.syncUpdate()
+ self._objects = sets.Set()
+ self._joined_txn = False
+
+ def prepare(self, txn):
+ return True
- def _addBeforeCommitHook(self):
- self._txn.addBeforeCommitHook(self.syncUpdateAll, ())
+ def _expireObjects(self):
+ for obj in self._objects:
+ expireSQLObject(obj)
+ self._objects.clear()
+
+ def abort(self, txn):
+ self._expireObjects()
+ self._joined_txn = False
+
+ def commit(self, txn):
+ # Excerpt from a mail by Stuart Bishop:
+ # >>| * Why do we expire objects in a successful transaction? I can't think
+ # >>| of a reason why we need to.
+ #
+ # You need to expire objects after a successful transaction so you can see
+ # database changes made by other processes or threads. This is particularly
+ # important for Zope where you have a number of handler threads and possibly
+ # multiple application servers in a ZEO environment.
+ self._expireObjects()
+ self._joined_txn = False
def register(self, obj):
if not ISQLObject.providedBy(obj):
raise ValueError, ("Only objects that implement ISQLObject "
- "can be registered with this registry.")
- self._ensureCurrentTxn()
- if not self._registered:
- self._addBeforeCommitHook()
- self._registered = True
- self._objects.add(obj)
-
- def _ensureCurrentTxn(self):
- txn = transaction.get()
- if self._txn is not txn:
- # clean up after the last transaction
- self._objects.clear()
- self._txn = txn
- self._registered = False
+ "can be registered with this transaction "
+ "manager")
+ if obj not in self._objects:
+ self._objects.add(obj)
+ txn = get()
+ txn.addBeforeCommitHook(beforeCommitHook, (obj, ))
+ if not self._joined_txn:
+ # register the Zope DA connection as well, becuse when we prepare,
+ # and cause DB activity, it might try to register as well. which
+ # fails. not very nice at all.
+ txn.join(self)
+ self._joined_txn = True
-dirty_object_registry = DirtyObjectRegistry()
+ def sortKey(self):
+ return str(id(self))
Modified: z3/sqlos/trunk/src/sqlos/adapter.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/adapter.py (original)
+++ z3/sqlos/trunk/src/sqlos/adapter.py Sat Mar 31 19:35:48 2007
@@ -21,19 +21,14 @@
from sqlobject import _mysql, _postgres, _sqlite
from sqlobject.converters import registerConverter
from sqlobject.mysql import mysqlconnection
+
from zope.rdb.interfaces import DatabaseException
from zope.publisher.interfaces import Retry
from zope.app.container.interfaces import INameChooser
-from zope.interface import implements
from sqlos.interfaces import ISQLObject
-from sqlos._transaction import cache_manager
+from sqlos._transaction import SQLObjectTransactionManager
-# TODO: it is probably possible to optimize this by not creating a
-# ConnectionAdapter every adapter lookup, but rather caching one per thread,
-# and stuffing the connection into it using a factory function. Probably there
-# would be one factory function per adapter type and we could get rid of the
-# cache property on the ConnectionAdapter.
class ConnectionAdapter:
@@ -43,12 +38,7 @@
self.autoCommit = None
self.debug = 0
self.supportTransactions = False
-
- def _get_cache(self):
- return cache_manager.cache
- def _set_cache(self, val):
- pass # don't let ourselves be overridden
- cache = property(_get_cache, _set_cache)
+ self._dm = SQLObjectTransactionManager()
def makeConnection(self):
return self._connection
@@ -84,10 +74,7 @@
try:
try:
val = meth(conn, *args)
- except DatabaseException:
- raise # We may have already raised Database exception in
- # _executeRetry, so we re-raise it
- except Retry:
+ except (DatabaseException, Retry):
raise
except Exception, exc:
raise DatabaseException, tuple(exc.args)
@@ -96,21 +83,17 @@
return val
def printDebug(self, conn, s, name, type='query'):
- # XXX this is a quick hack to get it to work - jinty
if name == 'Pool' and self.debug != 'Pool':
return
- if type == 'query':
+ elif type == 'query':
sep = ': '
else:
sep = '->'
s = repr(s)
- spaces = ' '*(8-len(name))
- if self.debugThreading:
- threadName = threading.currentThread().getName()
- threadName = (':' + threadName + ' '*(8-len(threadName)))
- else:
- threadName = ''
- print '%(threadName)s/%(name)s%(spaces)s%(sep)s %(s)s' % locals()
+ name = name.ljust(8)
+ threadName = elf.debugThreading and \
+ ':' + threading.currentThread().getName().ljust(8) or ''
+ print '%(threadName)s/%(name)s%(sep)s %(s)s' % locals()
class MySQLAdapter(ConnectionAdapter, _mysql.builder()):
@@ -150,7 +133,6 @@
registerConverter(type(psycopg.Binary('')),
pgconnection.PsycoBinaryConverter)
-
super(PostgresAdapter, self).__init__(connection)
self.supportTransactions = True
Modified: z3/sqlos/trunk/src/sqlos/connection.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/connection.py (original)
+++ z3/sqlos/trunk/src/sqlos/connection.py Sat Mar 31 19:35:48 2007
@@ -23,85 +23,101 @@
import warnings
-from zope.component import ComponentLookupError
-import zope.component
+from zope.app import zapi
+from zope.component import ComponentLookupError, getUtility
from zope.rdb.interfaces import IZopeDatabaseAdapter
-from zope.thread import local
from sqlos.interfaces import IZopeSQLConnection, IConnectionName
-class ConnectionCache(local):
- """A per thread cache for adapted connections."""
- def __init__(self):
- self.connections = {}
-
- def clear(self):
- self.connections.clear()
-
- def queryConnection(self, name):
- return self.connections.get(name, None)
-
- def setConnection(self, name, value):
- self.connections[name] = value
-
-conn_cache = ConnectionCache()
-
-
-def clearCacheSubscriber(*args):
- """A subscriber to clear the connection cache at site boundaries.
-
- This subscriber serves a dual purpose. Firstly it makes sure that local
- sites will work with the cache. Secondly it ensures that the cached
- connections are not kept around forever, allowing zope.rdb to
- recover from problems such as [1].
-
- Subscribed to BeforeTraverseEvent and EndTraverseEvent.
-
- [1] http://mail.zope.org/pipermail/zope3-dev/2005-December/017052.html
-
- Lets just test it a little:
-
- >>> conn_cache.clear()
-
- >>> conn_cache.setConnection('a', 'a1')
- >>> conn_cache.setConnection('b', 'b2')
- >>> conn_cache.queryConnection('a')
- 'a1'
- >>> clearCacheSubscriber('dummy')
- >>> conn_cache.queryConnection('a') is None
- True
-
- """
- conn_cache.clear()
+class SQLOSWarning(UserWarning):
+ """SQLOS warning"""
class ConnectionDescriptor:
+ """Connection descriptor for the SQLOS class"""
def __init__(self, name=None):
self.name = name
def __get__(self, inst, cls=None):
+ context = inst is None and cls or inst
# get and cache the connection name
name = self.name
if name is None:
try:
- ut = zope.component.getUtility(IConnectionName)
+ ut = getUtility(IConnectionName, context=context)
name = ut.name
- except ComponentLookupError:
+ except ComponentLookupError:
return None
- # try get the connection from the cache, or make a new one
- conn = conn_cache.queryConnection(name)
- if conn is None:
- zda = zope.component.getUtility(IZopeDatabaseAdapter, name)
- try:
- conn = IZopeSQLConnection(zda())
- except ComponentLookupError:
- return self
- conn_cache.setConnection(name, conn)
- return conn
+ # get the connection from the global thread cache
+ try:
+ return getConnection(context, name)
+ except ComponentLookupError:
+ return None
def __set__(self, inst, value):
- # Ignore, so we don't get overriden.
- # We always use the connections from the IZopeDatabaseAdapter utility.
+ # Ignore, so we don't get overriden: we always use the connections
+ # from the the global thread cache.
pass
+
+
+# Connection cache, one per thread, a-la Transaction.
+# This code was heavily based on ZODB.Transaction
+connCache = {}
+
+try:
+ import thread
+except:
+ def getConnection(context, name):
+ global connCache
+ if not connCache.get(name):
+ newconn = zapi.queryUtility(IZopeDatabaseAdapter, name,
+ default=None,
+ context=context)
+ if newconn is None:
+ warnings.warn("Couldn't find a rdb connection by the "
+ "name %s. Please verify your setup." % name,
+ SQLOSWarning, 3)
+ connCache[name] = IZopeSQLConnection(newconn())
+ return connCache[name]
+
+ def releaseConnection(name):
+ global connCache
+ if connCache.has_key(name):
+ connCache[name].close()
+ del connCache[name]
+else:
+ def getConnection(context, name):
+ global connCache
+ tid = thread.get_ident()
+ key = (tid, name)
+ if not connCache.get(key):
+ newconn = zapi.queryUtility(IZopeDatabaseAdapter, name,
+ default=None,
+ context=context)
+ if newconn is None:
+ warnings.warn("Couldn't find a rdb connection by the "
+ "name %s. Please verify your setup." % name,
+ SQLOSWarning, 3)
+ conn = IZopeSQLConnection(newconn())
+ if conn.supportTransactions:
+ connCache[key] = conn.transaction()
+ else: # At least MySQL does not support transactions
+ connCache[key] = conn
+
+ return connCache[key]
+
+ def releaseConnection(name):
+ global connCache
+ tid = thread.get_ident()
+ key = (tid, name)
+ if connCache.has_key(key):
+ connCache[key].close()
+ del connCache[key]
+
+
+def clearCacheSubscriber(*args):
+ """A subscriber to clear the connection cache at site boundaries."""
+ for connection in connCache.values():
+ connection.cache.clear()
Modified: z3/sqlos/trunk/src/sqlos/ftests/test_doctest.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/ftests/test_doctest.py (original)
+++ z3/sqlos/trunk/src/sqlos/ftests/test_doctest.py Sat Mar 31 19:35:48 2007
@@ -23,7 +23,7 @@
'adding.txt',
'connection.txt',
'containers.txt',
- 'localutilities.txt',
+ #'localutilities.txt',
'isolated_containers.txt',
'mono_containers.txt',
'joins.txt']
Modified: z3/sqlos/trunk/src/sqlos/zsqlobject.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/zsqlobject.py (original)
+++ z3/sqlos/trunk/src/sqlos/zsqlobject.py Sat Mar 31 19:35:48 2007
@@ -26,17 +26,6 @@
from sqlos.container import contained
from sqlos.interfaces import ISQLObject, ISelectResults
from sqlos.interfaces.container import ISQLObjectJoinContainer
-from sqlos._transaction import dirty_object_registry
-
-
-def syncUpdateAll():
- """Calls syncUpdate on all dirty SQLOS objects, sending all SQL to the DB.
-
- >>> syncUpdateAll()
-
- """
-
- dirty_object_registry.syncUpdateAll()
class SQLOS(SQLObject, Contained):
@@ -65,14 +54,15 @@
implements(ISQLObject)
+ _dirty = False
_connection = ConnectionDescriptor()
class sqlmeta:
lazyUpdate = True
def _set_dirty(self, value):
- if value:
- dirty_object_registry.register(self)
+ if value and not self._dirty:
+ self._connection._dm.register(self)
self._dirty = value
def _get_dirty(self):
return self._dirty
More information about the z3-checkins
mailing list