[z3-checkins] r26206 - in z3/sqlos/trunk/src/sqlos: . ftests
jinty at codespeak.net
jinty at codespeak.net
Mon Apr 24 03:18:57 CEST 2006
Author: jinty
Date: Mon Apr 24 03:18:48 2006
New Revision: 26206
Modified:
z3/sqlos/trunk/src/sqlos/_transaction.py
z3/sqlos/trunk/src/sqlos/adapter.py
z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py
z3/sqlos/trunk/src/sqlos/zsqlobject.py
Log:
Implement a dirty object registry that keeps track of dirty SQLOS objects and registers a beforeCommitHook. This allows us to expose a syncUpdateAll function which can be used to send all outstanding SQL to the database in mid-transaction.
Modified: z3/sqlos/trunk/src/sqlos/_transaction.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/_transaction.py (original)
+++ z3/sqlos/trunk/src/sqlos/_transaction.py Mon Apr 24 03:18:48 2006
@@ -15,48 +15,25 @@
* Creates a thread local cache of SQLObjects so that cached objects do not
leak into other threads as they would in pure SQLObject.
* Clears the thread local cache at the start of each new transaction.
- * When an object is modified, it registers itself with the data manager
- which, in turn, registers a pre-commit hook. This hook sync's the object
- sending all the SQL down the line before the two phase commit starts.
-
-XXX - There was a reason why we couldn't do this without a data manager, but I
- cannot remember it now. -jinty
+ * When an object is modified, it registers itself with the dirty object
+ registry which, in turn, registers a pre-commit hook. This hook
+ calls syncUpdate on the object, sending all the SQL down the line before the
+ commit starts.
$Id$
"""
__metaclass__ = type
-import sets
-
import transaction
-from transaction import get
-from transaction.interfaces import IDataManager, ISynchronizer
+from transaction.interfaces import ISynchronizer
from zope.interface import implements
-from zope.app.event.objectevent import modified
-from sqlobject import SQLObjectNotFound
from sqlobject.cache import CacheSet
-from zope.security.proxy import removeSecurityProxy
from zope.thread import local
from sqlos.interfaces import ISQLObject
-def beforeCommitHook(obj):
- """Called before transactions are started.
-
- obj is a SQLObject
-
- This normally generates the database activity that pulls the zope.app.rdb
- data manager into the transaction, thus must be called before the
- transaction commits.
-
- Note that it is only called on commit.
- """
- if not obj.sqlmeta._obsolete:
- obj.sync()
-
-
class CacheSynchronizer:
"""Synchronizer to expire the Global per thread cache at transaction start.
@@ -126,144 +103,190 @@
# there must be a dead simple way.
-class SQLObjectTransactionManager:
- """
- This is a very simple Data Manager that just takes registrations
- of ``ISQLObject`` objects and calls their ``sync()`` method when
- needed.
-
- In addition to that, when the transaction is aborted, all modified
- objects will be expired.
+class DirtyObjectRegistry(local):
+ """A thread local registry of dirty SQLObjects.
- Let's see how it works.
+ StubPeople:
- First of all, setup the environment:
-
- >>> from zope.app.testing.placelesssetup import setUp, tearDown
- >>> setUp()
-
- First, register the subscribers and make a test data base:
-
- >>> from sqlos import testing
- >>> from sqlos.testing.sampleperson import SamplePerson
- >>> testdb = testing.TestDB([SamplePerson])
+ >>> synced = []
+ >>> class StubPerson:
+ ... implements(ISQLObject)
+ ... class sqlmeta:
+ ... _obsolete = False
+ ... def __init__(self, name):
+ ... self.name = name
+ ... def syncUpdate(self):
+ ... synced.append(self.name)
+ ... synced.sort()
+
+ Let's get some sample people:
+
+ >>> jhon = StubPerson('jhon')
+ >>> jane = StubPerson('jane')
+
+ Stub out some methods and make a registry:
+
+ >>> oldhook = DirtyObjectRegistry._addBeforeCommitHook
+ >>> def hook(self):
+ ... print 'adding before commit hook'
+ ... assert transaction.get() is self._txn
+ ... oldhook(self)
+ >>> DirtyObjectRegistry._addBeforeCommitHook = hook
+ >>> reg = DirtyObjectRegistry()
+
+ Objects not implementing ISQLObject fail to register:
+
+ >>> reg.register(object()) # doctest: +ELLIPSIS
+ Traceback (most recent call last):
+ ...
+ ValueError: ...
+
+ Register some people and check the internal state:
+
+ >>> reg.register(jhon)
+ adding before commit hook
+ >>> len(reg._objects)
+ 1
+ >>> reg._txn is transaction.get()
+ True
- Now create some SamplePeople for testing:
+ >>> reg.register(jhon)
+ >>> len(reg._objects)
+ 1
+
+ >>> reg.register(jane)
+ >>> len(reg._objects)
+ 2
+
+ Test the thread localness of the registry:
+
+ >>> log = []
+ >>> import threading
+ >>> def logRegisterState():
+ ... log.append(reg._txn)
+ ... log.append(reg._objects)
+ ... log.append(reg._registered)
+ >>> thread = threading.Thread(target=logRegisterState)
+ >>> thread.start()
+ >>> thread.join()
+ >>> log
+ [None, set([]), False]
- >>> person = SamplePerson(fullname='Sidnei', username='sidnei',
- ... password='123')
- >>> person1 = SamplePerson(fullname='Brian', username='jinty',
- ... password='456')
- And commit everything:
+ After a commit we should be able to do it again:
- >>> get().commit()
+ >>> transaction.get().commit()
+ >>> synced
+ ['jane', 'jhon']
- Monkeypatch some methods to ease testing:
+ Register some people and check the internal state:
>>> synced = []
-
- >>> oldsync = SamplePerson.sync
- >>> def sync(self):
- ... synced.append(self.username)
- ... synced.sort()
- ... oldsync(self)
- >>> SamplePerson.sync = sync
-
- Now, we check the initial DataManager state:
-
- >>> person.dirty
- False
- >>> dm = SamplePerson._connection._dm
- >>> dm._objects == sets.Set([])
+ >>> reg.register(jhon)
+ adding before commit hook
+ >>> len(reg._objects)
+ 1
+ >>> reg._txn is transaction.get()
True
- Change something on the people and make sure that they register:
+ >>> reg.register(jhon)
+ >>> len(reg._objects)
+ 1
- >>> person.set(fullname='Sidnei da Silva')
- >>> person.dirty
- True
- >>> dm._objects == sets.Set([person])
- True
-
- >>> person1.set(fullname='Brian Sutherland')
- >>> person1.dirty
- True
- >>> dm._objects == sets.Set([person, person1])
- True
+ >>> reg.register(jane)
+ >>> len(reg._objects)
+ 2
- Commit the transaction:
+ Test that after an abort everything works as expected:
+ >>> transaction.get().abort()
>>> synced
[]
- >>> get().commit()
- >>> synced
- ['jinty', 'sidnei']
-
- >>> synced[:] = []
- Check the state:
+ Register some people and check the internal state:
- >>> person.dirty
- False
- >>> dm._objects == sets.Set([])
+ >>> reg.register(jhon)
+ adding before commit hook
+ >>> len(reg._objects)
+ 1
+ >>> reg._txn is transaction.get()
True
- Now, we change something again to test abort():
+ >>> reg.register(jhon)
+ >>> len(reg._objects)
+ 1
- >>> person.set(fullname='Alan Runyan')
- >>> person.dirty
- True
- >>> dm._objects == sets.Set([person])
- True
+ >>> reg.register(jane)
+ >>> len(reg._objects)
+ 2
- Lets abort the current transaction:
+ We can manually sync all the objects:
+ >>> reg.syncUpdateAll()
>>> synced
- []
- >>> get().abort()
+ ['jane', 'jhon']
+ >>> len(reg._objects)
+ 0
+
+ The before-commit hook is not re-registered after a manual sync, but new
+ objects can still be registered. A committed transaction will sync them:
+
+ >>> synced = []
+ >>> reg.register(jhon)
+ >>> len(reg._objects)
+ 1
+ >>> transaction.get().commit()
>>> synced
- []
+ ['jhon']
- And then cleanup the monkeypatched method:
+ After an abort, nothing can be synced:
- >>> SamplePerson.sync = oldsync
+ >>> synced = []
+ >>> reg.register(jhon)
+ adding before commit hook
+ >>> len(reg._objects)
+ 1
+ >>> transaction.get().abort()
+ >>> reg.syncUpdateAll()
+ >>> synced
+ []
- And finally call tearDown and cleanup:
+ TearDown:
- >>> testdb.tearDown()
- >>> tearDown()
+ >>> DirtyObjectRegistry._addBeforeCommitHook = oldhook
"""
- implements(IDataManager)
-
def __init__(self):
- self._objects = sets.Set()
- self._joined_txn = False
-
- def prepare(self, txn):
- return True
+ self._txn = None
+ self._objects = set([])
+ self._registered = False
+
+ def syncUpdateAll(self):
+ self._ensureCurrentTxn()
+ while self._objects:
+ obj = self._objects.pop()
+ if not obj.sqlmeta._obsolete:
+ obj.syncUpdate()
- def abort(self, txn):
- self._objects.clear()
- self._joined_txn = False
-
- def commit(self, txn):
- self._objects.clear()
- self._joined_txn = False
+ def _addBeforeCommitHook(self):
+ self._txn.addBeforeCommitHook(self.syncUpdateAll, ())
def register(self, obj):
if not ISQLObject.providedBy(obj):
raise ValueError, ("Only objects that implement ISQLObject "
- "can be registered with this transaction "
- "manager")
- if obj not in self._objects:
- self._objects.add(obj)
- txn = get()
- txn.addBeforeCommitHook(beforeCommitHook, (obj, ))
- if not self._joined_txn:
- txn.join(self)
- self._joined_txn = True
+ "can be registered with this registry.")
+ self._ensureCurrentTxn()
+ if not self._registered:
+ self._addBeforeCommitHook()
+ self._registered = True
+ self._objects.add(obj)
+
+ def _ensureCurrentTxn(self):
+ txn = transaction.get()
+ if self._txn is not txn:
+ # clean up after the last transaction
+ self._objects.clear()
+ self._txn = txn
+ self._registered = False
- def sortKey(self):
- return str(id(self))
+dirty_object_registry = DirtyObjectRegistry()
Modified: z3/sqlos/trunk/src/sqlos/adapter.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/adapter.py (original)
+++ z3/sqlos/trunk/src/sqlos/adapter.py Mon Apr 24 03:18:48 2006
@@ -26,7 +26,6 @@
from zope.interface import implements
from sqlos.interfaces import ISQLObject
-from sqlos._transaction import SQLObjectTransactionManager
from sqlos._transaction import cache_manager
# TODO: it is probably possible to optimize this by not creating a
@@ -43,7 +42,6 @@
self.autoCommit = None
self.debug = 0
self.supportTransactions = False
- self._dm = SQLObjectTransactionManager()
def _get_cache(self):
return cache_manager.cache
Modified: z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py (original)
+++ z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py Mon Apr 24 03:18:48 2006
@@ -78,7 +78,8 @@
person.fullname = 'Sidnei Silva'
person.username = 'dreamcatcher'
person.password = 'pass'
- person.sync() # Sunc to make sure that the DB is sent the statements
+ assert person.dirty is True
+ person.sync() # Sync to ensure that the DB is sent the SQL statements
get().abort()
begin()
person = SamplePerson.get(self.personid)
@@ -86,12 +87,62 @@
self.assertEqual(person.fullname, 'Sidnei da Silva')
self.assertEqual(person.username, 'sidnei')
self.assertEqual(person.password, 'test')
+ # commit the next transaction so we can paranoically check
+ get().commit()
+ begin()
+ person = SamplePerson.get(self.personid)
+ self.assertEqual(person.fullname, 'Sidnei da Silva')
else:
# ya well no fine
+ import warnings
+ warnings.warn('SQLObject connections for this database do not '
+ 'support transactions. Not testing if transaction '
+ 'abort works.')
self.assertEqual(person.fullname, 'Sidnei Silva')
self.assertEqual(person.username, 'dreamcatcher')
self.assertEqual(person.password, 'pass')
+ def testAbortAndCommitDirty(self):
+ """Test the commit after an abort in the presence of dirty SQLObjects.
+
+ Here we commit the next transaction with another dirty object
+ just to make sure that remnants from the previous transaction are
+ not committed.
+ Yeah, this is paranoid, but sometimes it pays to be paranoid.
+ """
+ if self.supportTransactions():
+ # make a dirty object
+ person = SamplePerson.get(self.personid)
+ self.assertEqual(person.fullname, 'Sidnei da Silva')
+ person.fullname = 'Sidnei Silva'
+ assert person.dirty is True
+ person.sync() # Sync to ensure that the DB is sent the SQL
+ # abort transaction and start a new one
+ get().abort()
+ begin()
+ # make another dirty object
+ person = SamplePerson.get(self.personid)
+ self.assertEqual(person.fullname, 'Sidnei da Silva')
+ brian = SamplePerson(fullname='Brian Sutherland',
+ username='brian',
+ password='test')
+ brian.fullname = "B. Sutherland" # make the object dirty
+ assert brian.dirty is True
+ brianid = brian.id
+ # commit the second transaction
+ get().commit()
+ begin()
+ person = SamplePerson.get(self.personid)
+ brian = SamplePerson.get(brianid)
+ self.assertEqual(person.fullname, 'Sidnei da Silva')
+ self.assertEqual(brian.fullname, 'B. Sutherland')
+ else:
+ # ya well no fine
+ import warnings
+ warnings.warn('SQLObject connections for this database do not '
+ 'support transactions. Not testing if transaction '
+ 'abort works.')
+
def testCacheThreadIsolation(self):
"""Tests that the changes we make in one thread don't appear in another.
Modified: z3/sqlos/trunk/src/sqlos/zsqlobject.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/zsqlobject.py (original)
+++ z3/sqlos/trunk/src/sqlos/zsqlobject.py Mon Apr 24 03:18:48 2006
@@ -16,6 +16,15 @@
from sqlos.connection import ConnectionDescriptor
from sqlos.interfaces import ISQLObject
+from sqlos import _transaction
+
+def syncUpdateAll():
+ """Calls syncUpdate on all dirty SQLOS objects, sending all SQL to the DB.
+
+ >>> syncUpdateAll()
+ """
+ _transaction.dirty_object_registry.syncUpdateAll()
+
class SQLOS(SQLObject):
"""Subclass SQLObject to enable ``lazy updates`` by default,
@@ -47,14 +56,7 @@
def _set_dirty(self, value):
if value:
- self._connection._dm.register(self)
- # This breaks the transaction functional tests and seems to be an
- # optimization only. So I've commented it out. - jinty 2005-10-5
- #else:
- # # XXX: 'objects' shouldn't really be fiddled directly like this.
- # # Probably there should be an unregister function.
- # # - Andrew Bennetts, 2005-01-05
- # self._connection._dm.objects.discard(self)
+ _transaction.dirty_object_registry.register(self)
self._dirty = value
def _get_dirty(self):
More information about the z3-checkins
mailing list