[z3-checkins] r26206 - in z3/sqlos/trunk/src/sqlos: . ftests

jinty at codespeak.net jinty at codespeak.net
Mon Apr 24 03:18:57 CEST 2006


Author: jinty
Date: Mon Apr 24 03:18:48 2006
New Revision: 26206

Modified:
   z3/sqlos/trunk/src/sqlos/_transaction.py
   z3/sqlos/trunk/src/sqlos/adapter.py
   z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py
   z3/sqlos/trunk/src/sqlos/zsqlobject.py
Log:
Implement a dirty object registry that keeps track of dirty SQLOS objects and registers a beforeCommitHook. This allows us to expose a syncUpdateAll function which can be used to send all outstanding SQL to the database in mid-transaction.

Modified: z3/sqlos/trunk/src/sqlos/_transaction.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/_transaction.py	(original)
+++ z3/sqlos/trunk/src/sqlos/_transaction.py	Mon Apr 24 03:18:48 2006
@@ -15,48 +15,25 @@
     * Creates a thread local cache of SQLObjects so that cached objects do not
       leak into other threads as they would in pure SQLObject.
     * Clears the thread local cache at the start of each new transaction.
-    * When an object is modified, it registers itself with the data manager
-      which, in turn, registers a pre-commit hook. This hook sync's the object
-      sending all the SQL down the line before the two phase commit starts.
-
-XXX - There was a reason why we couldn't do this without a data manager, but I
-      cannot remember it now. -jinty
+    * When an object is modified, it registers itself with the dirty object
+      registry which, in turn, registers a pre-commit hook. This hook calls
+      syncUpdate on the object, sending all the SQL down the line before the
+      commit starts.
 
 $Id$
 """
 
 __metaclass__ = type
 
-import sets
-
 import transaction
-from transaction import get
-from transaction.interfaces import IDataManager, ISynchronizer
+from transaction.interfaces import ISynchronizer
 from zope.interface import implements
-from zope.app.event.objectevent import modified
-from sqlobject import SQLObjectNotFound
 from sqlobject.cache import CacheSet
-from zope.security.proxy import removeSecurityProxy
 from zope.thread import local
 
 from sqlos.interfaces import ISQLObject
 
 
-def beforeCommitHook(obj):
-    """Called before transactions are started.
-
-    obj is a SQLObject
-
-    This normally generates the database activity that pulls the zope.app.rdb
-    data manager into the transaction, thus must be called before the
-    transaction commits.
-
-    Note that it is only called on commit.
-    """
-    if not obj.sqlmeta._obsolete:
-        obj.sync()
-
-
 class CacheSynchronizer:
     """Synchronizer to expire the Global per thread cache at transaction start.
 
@@ -126,144 +103,190 @@
                                        # there must be a dead simple way.
 
 
-class SQLObjectTransactionManager:
-    """
-    This is a very simple Data Manager that just takes registrations
-    of ``ISQLObject`` objects and calls their ``sync()`` method when
-    needed.
-
-    In addition to that, when the transaction is aborted, all modified
-    objects will be expired.
+class DirtyObjectRegistry(local):
+    """A thread local registry of dirty SQLObjects.
 
-    Let's see how it works.
+    Define a stub person class:
 
-    First of all, setup the environment:
-
-        >>> from zope.app.testing.placelesssetup import setUp, tearDown
-        >>> setUp()
-
-    First, register the subscribers and make a test data base:
-
-        >>> from sqlos import testing
-        >>> from sqlos.testing.sampleperson import SamplePerson
-        >>> testdb = testing.TestDB([SamplePerson])
+        >>> synced = []
+        >>> class StubPerson:
+        ...     implements(ISQLObject)
+        ...     class sqlmeta:
+        ...         _obsolete = False
+        ...     def __init__(self, name):
+        ...         self.name = name
+        ...     def syncUpdate(self):
+        ...         synced.append(self.name)
+        ...         synced.sort()
+
+    Let's get some sample people:
+
+        >>> jhon = StubPerson('jhon')
+        >>> jane = StubPerson('jane')
+
+    Stub out some methods and make a registry:
+
+        >>> oldhook = DirtyObjectRegistry._addBeforeCommitHook
+        >>> def hook(self):
+        ...     print 'adding before commit hook'
+        ...     assert transaction.get() is self._txn
+        ...     oldhook(self)
+        >>> DirtyObjectRegistry._addBeforeCommitHook = hook
+        >>> reg = DirtyObjectRegistry()
+
+    Objects not implementing ISQLObject fail to register:
+
+        >>> reg.register(object()) # doctest: +ELLIPSIS
+        Traceback (most recent call last):
+            ...
+        ValueError: ...
+
+    Register some people and check the internal state:
+
+        >>> reg.register(jhon)
+        adding before commit hook
+        >>> len(reg._objects)
+        1
+        >>> reg._txn is transaction.get()
+        True
 
-    Now create some SamplePeople for testing:
+        >>> reg.register(jhon)
+        >>> len(reg._objects)
+        1
+
+        >>> reg.register(jane)
+        >>> len(reg._objects)
+        2
+
+    Test the thread localness of the registry:
+
+        >>> log = []
+        >>> import threading
+        >>> def logRegisterState():
+        ...     log.append(reg._txn)
+        ...     log.append(reg._objects)
+        ...     log.append(reg._registered)
+        >>> thread = threading.Thread(target=logRegisterState)
+        >>> thread.start()
+        >>> thread.join()
+        >>> log
+        [None, set([]), False]
 
-        >>> person = SamplePerson(fullname='Sidnei', username='sidnei',
-        ... password='123')
-        >>> person1 = SamplePerson(fullname='Brian', username='jinty',
-        ... password='456')
 
-    And commit everything:
+    After a commit we should be able to do it again:
 
-        >>> get().commit()
+        >>> transaction.get().commit()
+        >>> synced
+        ['jane', 'jhon']
 
-    Monkeypatch some methods to ease testing:
+    Register some people and check the internal state:
 
         >>> synced = []
-
-        >>> oldsync = SamplePerson.sync
-        >>> def sync(self):
-        ...     synced.append(self.username)
-        ...     synced.sort()
-        ...     oldsync(self)
-        >>> SamplePerson.sync = sync
-
-    Now, we check the initial DataManager state:
-
-        >>> person.dirty
-        False
-        >>> dm = SamplePerson._connection._dm
-        >>> dm._objects == sets.Set([])
+        >>> reg.register(jhon)
+        adding before commit hook
+        >>> len(reg._objects)
+        1
+        >>> reg._txn is transaction.get()
         True
 
-    Change something on the people and make sure that they register:
+        >>> reg.register(jhon)
+        >>> len(reg._objects)
+        1
 
-        >>> person.set(fullname='Sidnei da Silva')
-        >>> person.dirty
-        True
-        >>> dm._objects == sets.Set([person])
-        True
-
-        >>> person1.set(fullname='Brian Sutherland')
-        >>> person1.dirty
-        True
-        >>> dm._objects == sets.Set([person, person1])
-        True
+        >>> reg.register(jane)
+        >>> len(reg._objects)
+        2
 
-    Commit the transaction:
+    Test that after an abort everything works as expected:
 
+        >>> transaction.get().abort()
         >>> synced
         []
-        >>> get().commit()
-        >>> synced
-        ['jinty', 'sidnei']
-
-        >>> synced[:] = []
 
-    Check the state:
+    Register some people and check the internal state:
 
-        >>> person.dirty
-        False
-        >>> dm._objects == sets.Set([])
+        >>> reg.register(jhon)
+        adding before commit hook
+        >>> len(reg._objects)
+        1
+        >>> reg._txn is transaction.get()
         True
 
-    Now, we change something again to test abort():
+        >>> reg.register(jhon)
+        >>> len(reg._objects)
+        1
 
-        >>> person.set(fullname='Alan Runyan')
-        >>> person.dirty
-        True
-        >>> dm._objects == sets.Set([person])
-        True
+        >>> reg.register(jane)
+        >>> len(reg._objects)
+        2
 
-    Lets abort the current transaction:
+    We can manually sync all the objects:
 
+        >>> reg.syncUpdateAll()
         >>> synced
-        []
-        >>> get().abort()
+        ['jane', 'jhon']
+        >>> len(reg._objects)
+        0
+
+    The before-commit hook is not re-added after a manual sync, but new
+    objects can still be registered. A committed transaction will sync them:
+
+        >>> synced = []
+        >>> reg.register(jhon)
+        >>> len(reg._objects)
+        1
+        >>> transaction.get().commit()
         >>> synced
-        []
+        ['jhon']
 
-    And then cleanup the monkeypatched method:
+    After an abort, nothing can be synced:
 
-        >>> SamplePerson.sync = oldsync
+        >>> synced = []
+        >>> reg.register(jhon)
+        adding before commit hook
+        >>> len(reg._objects)
+        1
+        >>> transaction.get().abort()
+        >>> reg.syncUpdateAll()
+        >>> synced
+        []
 
-    And finally call tearDown and cleanup:
+    TearDown:
 
-        >>> testdb.tearDown()
-        >>> tearDown()
+        >>> DirtyObjectRegistry._addBeforeCommitHook = oldhook
     """
 
-    implements(IDataManager)
-
     def __init__(self):
-        self._objects = sets.Set()
-        self._joined_txn = False
-
-    def prepare(self, txn):
-        return True
+        self._txn = None
+        self._objects = set([])
+        self._registered = False
+
+    def syncUpdateAll(self):
+        self._ensureCurrentTxn()
+        while self._objects:
+            obj = self._objects.pop()
+            if not obj.sqlmeta._obsolete:
+                obj.syncUpdate()
 
-    def abort(self, txn):
-        self._objects.clear()
-        self._joined_txn = False
-
-    def commit(self, txn):
-        self._objects.clear()
-        self._joined_txn = False
+    def  _addBeforeCommitHook(self):
+        self._txn.addBeforeCommitHook(self.syncUpdateAll, ())
 
     def register(self, obj):
         if not ISQLObject.providedBy(obj):
             raise ValueError, ("Only objects that implement ISQLObject "
-                               "can be registered with this transaction "
-                               "manager")
-        if obj not in self._objects:
-            self._objects.add(obj)
-            txn = get()
-            txn.addBeforeCommitHook(beforeCommitHook, (obj, ))
-            if not self._joined_txn:
-                txn.join(self)
-                self._joined_txn = True
+                               "can be registered with this registry.")
+        self._ensureCurrentTxn()
+        if not self._registered:
+            self._addBeforeCommitHook()
+            self._registered = True
+        self._objects.add(obj)
+
+    def _ensureCurrentTxn(self):
+        txn = transaction.get()
+        if self._txn is not txn:
+            # clean up after the last transaction
+            self._objects.clear()
+            self._txn = txn
+            self._registered = False
 
-    def sortKey(self):
-        return str(id(self))
+dirty_object_registry = DirtyObjectRegistry()

Modified: z3/sqlos/trunk/src/sqlos/adapter.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/adapter.py	(original)
+++ z3/sqlos/trunk/src/sqlos/adapter.py	Mon Apr 24 03:18:48 2006
@@ -26,7 +26,6 @@
 from zope.interface import implements
 
 from sqlos.interfaces import ISQLObject
-from sqlos._transaction import SQLObjectTransactionManager
 from sqlos._transaction import cache_manager
 
 # TODO: it is probably possible to optimize this by not creating a
@@ -43,7 +42,6 @@
         self.autoCommit = None
         self.debug = 0
         self.supportTransactions = False
-        self._dm = SQLObjectTransactionManager()
 
     def _get_cache(self):
         return cache_manager.cache

Modified: z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py	(original)
+++ z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py	Mon Apr 24 03:18:48 2006
@@ -78,7 +78,8 @@
         person.fullname = 'Sidnei Silva'
         person.username = 'dreamcatcher'
         person.password = 'pass'
-        person.sync() # Sunc to make sure that the DB is sent the statements
+        assert person.dirty is True
+        person.sync() # Sync to ensure that the DB is sent the SQL statements
         get().abort()
         begin()
         person = SamplePerson.get(self.personid)
@@ -86,12 +87,62 @@
             self.assertEqual(person.fullname, 'Sidnei da Silva')
             self.assertEqual(person.username, 'sidnei')
             self.assertEqual(person.password, 'test')
+            # commit the next transaction so we can paranoically check
+            get().commit()
+            begin()
+            person = SamplePerson.get(self.personid)
+            self.assertEqual(person.fullname, 'Sidnei da Silva')
         else:
             # ya well no fine
+            import warnings
+            warnings.warn('SQLObject connections for this database do not '
+                          'support transactions. Not testing if transaction '
+                          'abort works.')
             self.assertEqual(person.fullname, 'Sidnei Silva')
             self.assertEqual(person.username, 'dreamcatcher')
             self.assertEqual(person.password, 'pass')
 
+    def testAbortAndCommitDirty(self):
+        """Test the commit after an abort in the presence of dirty SQLObjects.
+
+        Here we commit the next transaction with another dirty object
+        just to make sure that remnants from the previous transaction are
+        not committed.
+        Yeah, this is paranoid, but sometimes it pays to be paranoid.
+        """
+        if self.supportTransactions():
+            # make a dirty object
+            person = SamplePerson.get(self.personid)
+            self.assertEqual(person.fullname, 'Sidnei da Silva')
+            person.fullname = 'Sidnei Silva'
+            assert person.dirty is True
+            person.sync() # Sync to ensure that the DB is sent the SQL
+            # abort transaction and start a new one
+            get().abort()
+            begin()
+            # make another dirty object
+            person = SamplePerson.get(self.personid)
+            self.assertEqual(person.fullname, 'Sidnei da Silva')
+            brian = SamplePerson(fullname='Brian Sutherland',
+                                 username='brian',
+                                 password='test')
+            brian.fullname = "B. Sutherland" # make the object dirty
+            assert brian.dirty is True
+            brianid = brian.id
+            # commit the second transaction
+            get().commit()
+            begin()
+            person = SamplePerson.get(self.personid)
+            brian = SamplePerson.get(brianid)
+            self.assertEqual(person.fullname, 'Sidnei da Silva')
+            self.assertEqual(brian.fullname, 'B. Sutherland')
+        else:
+            # ya well no fine
+            import warnings
+            warnings.warn('SQLObject connections for this database do not '
+                          'support transactions. Not testing if transaction '
+                          'abort works.')
+
     def testCacheThreadIsolation(self):
         """Tests that the changes we make in one thread don't appear in another.
 

Modified: z3/sqlos/trunk/src/sqlos/zsqlobject.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/zsqlobject.py	(original)
+++ z3/sqlos/trunk/src/sqlos/zsqlobject.py	Mon Apr 24 03:18:48 2006
@@ -16,6 +16,15 @@
 
 from sqlos.connection import ConnectionDescriptor
 from sqlos.interfaces import ISQLObject
+from sqlos import _transaction
+
+def syncUpdateAll():
+    """Calls syncUpdate on all dirty SQLOS objects, sending all SQL to the DB.
+
+        >>> syncUpdateAll()
+    """
+    _transaction.dirty_object_registry.syncUpdateAll()
+
 
 class SQLOS(SQLObject):
     """Subclass SQLObject to enable ``lazy updates`` by default,
@@ -47,14 +56,7 @@
 
     def _set_dirty(self, value):
         if value:
-            self._connection._dm.register(self)
-        # This breaks the transaction functional tests and seems to be an
-        # optimization only. So I've commented it out. - jinty 2005-10-5
-        #else:
-        #    # XXX: 'objects' shouldn't really be fiddled directly like this.
-        #    # Probably there should be an unregister function.
-        #    #   - Andrew Bennetts, 2005-01-05
-        #    self._connection._dm.objects.discard(self)
+            _transaction.dirty_object_registry.register(self)
         self._dirty = value
 
     def _get_dirty(self):


More information about the z3-checkins mailing list