[z3-checkins] r21348 - z3/sqlos/trunk/src/sqlos
andres at codespeak.net
andres at codespeak.net
Tue Dec 20 14:53:06 CET 2005
Author: andres
Date: Tue Dec 20 14:53:05 2005
New Revision: 21348
Modified:
z3/sqlos/trunk/src/sqlos/_transaction.py
z3/sqlos/trunk/src/sqlos/adapter.py
Log:
Merged jinty's transaction branch.
Modified: z3/sqlos/trunk/src/sqlos/_transaction.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/_transaction.py (original)
+++ z3/sqlos/trunk/src/sqlos/_transaction.py Tue Dec 20 14:53:05 2005
@@ -15,13 +15,51 @@
import sets
from transaction import get
-
from transaction.interfaces import IDataManager
from zope.interface import implements
from zope.app.event.objectevent import modified
+from sqlobject import SQLObjectNotFound
+from zope.security.proxy import removeSecurityProxy
from sqlos.interfaces import ISQLObject
-from sqlos.event.interfaces import expired
+from sqlos.connection import connCache
+
+def beforeCommitHook(obj):
+ """Called before transactions are started.
+
+ obj is a SQLObject
+
+ This normally generates the database activity that pulls the zope.app.rdb
+ data manager into the transaction, thus must be called before the
+ transaction commits.
+
+ Note that it is only called on commit.
+ """
+ if not obj.sqlmeta._obsolete:
+ obj.sync()
+
+def expireSQLObject(obj):
+ """Expire the SQLObject.
+
+ Try to make sure none of the data makes it to the next transaction.
+
+ This function should not cause SQL to be sent as it is not defined whether
+ the SQL connection will commit before or after this is executed.
+ """
+ for connection in connCache.values():
+ connection.cache.expire(obj.id, obj.__class__)
+
+ # Expire object values. The transaction has either been aborted or
+ # committed.
+ obj.expire()
+
+ # Calling sync() would commit the changes because expire()
+ # doesn't clear _SO_createValues.
+ # XXX this is a workaround for a bug fixed in SQLObject 0.7.1 it's
+ # kept around for compatibility until we decide not to support 0.7
+ # anymore
+ unproxied = removeSecurityProxy(obj)
+ unproxied._SO_createValues = {}
class SQLObjectTransactionManager:
@@ -37,241 +75,148 @@
First of all, setup the environment:
- >>> from zope.app.testing.placelesssetup import setUp, tearDown
- >>> setUp()
+ >>> from zope.app.testing.placelesssetup import setUp, tearDown
+ >>> setUp()
First, register the subscribers and make a test data base:
- >>> from sqlos import testing
- >>> from sqlos.testing.sampleperson import SamplePerson
- >>> testing.setUpSubscribers()
- >>> testdb = testing.TestDB([SamplePerson])
-
- Now create a SamplePerson for testing:
-
- >>> person = SamplePerson(fullname='Sidnei', username='sidnei',
- ... password='123')
- >>> person.id
- 1
- >>> person.fullname
- 'Sidnei'
- >>> person.username
- 'sidnei'
- >>> person.password
- '123'
-
- Create a temp method to ease testing:
-
- >>> oldsync = SamplePerson.syncUpdate
- >>> def syncUpdate(self):
- ... self._sync_called = True
- ... oldsync(self)
- ...
- >>> SamplePerson.syncUpdate = syncUpdate
- >>> SamplePerson._sync_called = False
-
- >>> oldexpire = SamplePerson.expire
- >>> def expire(self):
- ... self._expire_called = True
- ... oldexpire(self)
- ...
- >>> SamplePerson.expire = expire
- >>> SamplePerson._expire_called = False
-
- Now, we check the DataManager state:
-
- >>> person.dirty
- False
- >>> dm = SamplePerson._connection._dm
- >>> dm.state, dm.delta
- (0, 1)
- >>> person.set(fullname='Sidnei da Silva')
- >>> person.dirty
- True
- >>> dm.state, dm.delta
- (0, 2)
-
- Check that syncUpdate() hasn't been called yet:
-
- >>> person._sync_called
- False
- >>> person._expire_called
- False
-
- Prepare the transaction, make sure we get the expected values:
-
- >>> t1 = '1'
- >>> dm.prepare(t1)
- True
- >>> dm.state, dm.delta
- (2, 2)
-
- Make sure prepare can't be called twice:
-
- >>> dm.prepare(t1)
- Traceback (most recent call last):
- ...
- TypeError: Already prepared
-
- Check that syncUpdate() has been called, but not expire:
- XXX I am not sure why syncUpdate is called rather than just sync????
-
- >>> person._sync_called
- True
- >>> person._expire_called
- False
-
- Commit, and make sure syncUpdate and expire have been called:
-
- >>> dm.commit(t1)
- >>> dm.state, dm.delta
- (0, 0)
- >>> person._sync_called
- True
- >>> person._expire_called
- True
- >>> person.dirty
- False
-
- To clean up we also commit the overall transaction:
-
- >>> get().commit()
-
- Now, we do everything again to test abort():
-
- >>> person._sync_called = False
- >>> person._expire_called = False
- >>> person.set(fullname='Alan Runyan')
- >>> person.dirty
- True
- >>> dm.state, dm.delta
- (0, 1)
- >>> dm.prepare(t1)
- True
- >>> dm.state, dm.delta
- (1, 1)
- >>> dm.abort(t1)
- >>> dm.state, dm.delta
- (0, 0)
-
- Make sure syncUpdate and expire have been called:
-
- >>> person._sync_called
- True
- >>> person._expire_called
- True
-
- But the object is not dirty nor expired, because sync() has been
- called to refetch the values from the database:
-
- >>> person.dirty, person._expired
- (False, False)
-
- Lets abort the current transaction to cleanup:
-
- >>> get().abort()
-
- Now lets test the integration with the zope transaction machinery, to make
- sure that the Zope DA is registered for committing and that our transaction
- manager is also called:
-
- >>> oldcommit = SQLObjectTransactionManager.commit
- >>> def commit(self, txn):
- ... print 'committing'
- ... oldcommit(self, txn)
- >>> SQLObjectTransactionManager.commit = commit
- >>> from zope.app.rdb import ZopeConnection
- >>> oldregister = ZopeConnection.registerForTxn
- >>> def register(self):
- ... if not self._txn_registered:
- ... print 'registering zope da for txn'
- ... oldregister(self)
- >>> ZopeConnection.registerForTxn = register
-
- change the sample person and try another commit
- (regression test for http://codespeak.net/issues/z3/issue9):
-
- >>> people = list(SamplePerson.select(SamplePerson.q.username=='sidnei'))
- registering zope da for txn
- >>> len(people)
- 1
- >>> people[0].username = 'alan'
- >>> get().commit()
- committing
+ >>> from sqlos import testing
+ >>> from sqlos.testing.sampleperson import SamplePerson
+ >>> testdb = testing.TestDB([SamplePerson])
+
+ Now create some SamplePeople for testing:
+
+ >>> person = SamplePerson(fullname='Sidnei', username='sidnei',
+ ... password='123')
+ >>> person1 = SamplePerson(fullname='Brian', username='jinty',
+ ... password='456')
+
+ And commit everything:
+
+ >>> get().commit()
+
+ Monkeypatch some methods to ease testing:
+
+ >>> synced = []
+ >>> expired = []
+
+ >>> oldsync = SamplePerson.sync
+ >>> def sync(self):
+ ... synced.append(self.username)
+ ... synced.sort()
+ ... oldsync(self)
+ >>> SamplePerson.sync = sync
+
+ >>> oldexpire = SamplePerson.expire
+ >>> def expire(self):
+ ... expired.append(self.username)
+ ... expired.sort()
+ ... oldexpire(self)
+ >>> SamplePerson.expire = expire
+
+ Now, we check the initial DataManager state:
+
+ >>> person.dirty
+ False
+ >>> dm = SamplePerson._connection._dm
+ >>> dm._objects == sets.Set([])
+ True
+
+ Change something on the people and make sure that they register:
+
+ >>> person.set(fullname='Sidnei da Silva')
+ >>> person.dirty
+ True
+ >>> dm._objects == sets.Set([person])
+ True
+
+ >>> person1.set(fullname='Brian Sutherland')
+ >>> person1.dirty
+ True
+ >>> dm._objects == sets.Set([person, person1])
+ True
+
+ Commit the transaction:
+
+ >>> synced
+ []
+ >>> expired
+ []
+ >>> get().commit()
+ >>> synced
+ ['jinty', 'sidnei']
+ >>> expired
+ ['jinty', 'sidnei']
+
+ >>> synced[:] = []
+ >>> expired[:] = []
+
+ Check the state:
+
+ >>> person.dirty
+ False
+ >>> dm._objects == sets.Set([])
+ True
+
+ Now, we change something again to test abort():
+
+ >>> person.set(fullname='Alan Runyan')
+ >>> person.dirty
+ True
+ >>> dm._objects == sets.Set([person])
+ True
+
+ Lets abort the current transaction:
+
+ >>> synced
+ []
+ >>> expired
+ []
+ >>> get().abort()
+ >>> synced
+ []
+ >>> expired
+ ['sidnei']
And then cleanup the monkeypatched method:
- >>> SamplePerson.syncUpdate = oldsync
- >>> del SamplePerson._sync_called
- >>> SamplePerson.expire = oldexpire
- >>> del SamplePerson._expire_called
- >>> SQLObjectTransactionManager.commit = oldcommit
- >>> ZopeConnection.registerForTxn = oldregister
+ >>> SamplePerson.sync = oldsync
+ >>> SamplePerson.expire = oldexpire
And finally call tearDown and cleanup:
- >>> testdb.tearDown()
- >>> tearDown()
+ >>> testdb.tearDown()
+ >>> tearDown()
"""
implements(IDataManager)
- def __init__(self, sqlconnection):
- self.objects = sets.Set()
- self.sqlconnection = sqlconnection
- self.state = 0
- self.prepared = False
+ def __init__(self):
+ self._objects = sets.Set()
self._joined_txn = False
- self.delta = 0
- self.transaction = None
- def _checkTransaction(self, txn):
- if (txn is not self.transaction
- and self.transaction is not None):
- raise TypeError("Transaction missmatch",
- txn, self.transaction)
def prepare(self, txn):
- if self.prepared:
- raise TypeError('Already prepared')
- self.prepared = True
- self._checkTransaction(txn)
- self.transaction = txn
- self.state += self.delta
- if self.objects:
- for obj in list(self.objects):
- # XXX: This _SO_obsolete check is a workaround until we get a
- # newer version of sqlos.
- # - Andrew Bennetts, 2004-10-25
- if not obj.sqlmeta._obsolete:
- obj.sync()
return True
+ def _expireObjects(self):
+ for obj in self._objects:
+ expireSQLObject(obj)
+ self._objects.clear()
+
def abort(self, txn):
- self._checkTransaction(txn)
- if self.transaction is not None:
- self.transaction = None
-
- self.state -= self.delta
- self.prepared = False
- self.delta = 0
- for obj in self.objects:
- expired(obj)
- self.objects.clear()
+ self._expireObjects()
self._joined_txn = False
def commit(self, txn):
- if not self.prepared:
- raise TypeError('Not prepared to commit')
- self._checkTransaction(txn)
- self.state -= self.delta
- self.delta = 0
- self.transaction = None
- self.prepared = False
- for obj in self.objects:
- # XXX Can't know if modified has been fired yet. Maybe we
- # should *not* fire a modified event here?
- modified(obj)
- self.objects.clear()
+ # Excerpt from a mail by Stuart Bishop:
+ # >>| * Why do we expire objects in a successful transaction? I can't think
+ # >>| of a reason why we need to.
+ #
+ # You need to expire objects after a successful transaction so you can see
+ # database changes made by other processes or threads. This is particularly
+ # important for Zope where you have a number of handler threads and possibly
+ # multiple application servers in a ZEO environment.
+ self._expireObjects()
self._joined_txn = False
def register(self, obj):
@@ -279,15 +224,16 @@
raise ValueError, ("Only objects that implement ISQLObject "
"can be registered with this transaction "
"manager")
- self.objects.add(obj)
- self.delta += 1
- if not self._joined_txn:
- # register the Zope DA connection as well, becuse when we prepare,
- # and cause DB activity, it might try to register as well. which
- # fails. not very nice at all.
- self.sqlconnection.registerForTxn()
- get().join(self)
- self._joined_txn = True
+ if obj not in self._objects:
+ self._objects.add(obj)
+ txn = get()
+ txn.addBeforeCommitHook(beforeCommitHook, (obj, ))
+ if not self._joined_txn:
+ # register the Zope DA connection as well, becuse when we prepare,
+ # and cause DB activity, it might try to register as well. which
+ # fails. not very nice at all.
+ txn.join(self)
+ self._joined_txn = True
def sortKey(self):
return str(id(self))
Modified: z3/sqlos/trunk/src/sqlos/adapter.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/adapter.py (original)
+++ z3/sqlos/trunk/src/sqlos/adapter.py Tue Dec 20 14:53:05 2005
@@ -32,7 +32,7 @@
self.autoCommit = None
self.debug = 0
self.supportTransactions = False
- self._dm = SQLObjectTransactionManager(connection)
+ self._dm = SQLObjectTransactionManager()
def makeConnection(self):
return self._connection
More information about the z3-checkins
mailing list