[z3-checkins] r22260 - in z3/sqlos/trunk/src/sqlos: . ftests
testing tests
jinty at codespeak.net
jinty at codespeak.net
Tue Jan 17 21:22:28 CET 2006
Author: jinty
Date: Tue Jan 17 21:22:24 2006
New Revision: 22260
Modified:
z3/sqlos/trunk/src/sqlos/_transaction.py
z3/sqlos/trunk/src/sqlos/adapter.py
z3/sqlos/trunk/src/sqlos/connection.py
z3/sqlos/trunk/src/sqlos/ftests/connection.txt
z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py
z3/sqlos/trunk/src/sqlos/testing/testdb.py
z3/sqlos/trunk/src/sqlos/tests/test_transaction.py
Log:
Well, here goes, hold on to something. Merged from jinty-connection:
------------------------------------------------------------------------
r22132 | jinty | 2006-01-14 07:52:00 +0100 (Sat, 14 Jan 2006) | 6 lines
Clean up the transaction manager so that it doesn't expire changed objects
anymore. Then fix the resuting test breakage.
A caveat of this work is that you should never, never keep a reference to a
SQLOS object over a transaction. Just don't do it.
------------------------------------------------------------------------
r22131 | jinty | 2006-01-14 07:22:29 +0100 (Sat, 14 Jan 2006) | 1 line
Move the cache to _transaction.py and register a synchronizer.
------------------------------------------------------------------------
r22128 | jinty | 2006-01-13 20:34:09 +0100 (Fri, 13 Jan 2006) | 1 line
Add a new test to check that one thread can immediately see the changes committed in another one
------------------------------------------------------------------------
r22125 | jinty | 2006-01-13 20:15:59 +0100 (Fri, 13 Jan 2006) | 1 line
Use one cache per thread, globally. This foxes the last test committed, but is still a bit broken.
------------------------------------------------------------------------
r22124 | jinty | 2006-01-13 20:04:10 +0100 (Fri, 13 Jan 2006) | 1 line
Make a test that shows what happens when you use a global cache shared between threads.
------------------------------------------------------------------------
r22110 | jinty | 2006-01-13 14:34:19 +0100 (Fri, 13 Jan 2006) | 8 lines
Remove our one connection per thread cache. We don't need it anymore as
zope.app.rdb has one as well since 30682. And that's the right place for
this kind of stuff.
Also this solves a serious bug where if the connection was disconnected
and zope.app.rdb dealt with this (though it doesn't yet), then the
connection would stay in the cache forever requiring a server restart.
------------------------------------------------------------------------
r22105 | jinty | 2006-01-13 13:12:43 +0100 (Fri, 13 Jan 2006) | 12 lines
Make changes to the adapter so that it ignores the thread safe pool of
connections that the sqlobject dbconnection maintains. Also fix up the handling
of DatabaseException errors. This brings the number of "thread safe ways of
managing connections" that we use from 3 to 2!!!
With the above changes we can remove the use of the sqlobject Transaction
object for databases that support transactions. This is a preclude to getting
rid of our connection cache.
Seeing as the connection cache will die soon, we rather just test our
connection descripter.
Modified: z3/sqlos/trunk/src/sqlos/_transaction.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/_transaction.py (original)
+++ z3/sqlos/trunk/src/sqlos/_transaction.py Tue Jan 17 21:22:24 2006
@@ -14,15 +14,55 @@
import sets
+import transaction
from transaction import get
-from transaction.interfaces import IDataManager
+from transaction.interfaces import IDataManager, ISynchronizer
from zope.interface import implements
from zope.app.event.objectevent import modified
from sqlobject import SQLObjectNotFound
+from sqlobject.cache import CacheSet
from zope.security.proxy import removeSecurityProxy
+from zope.thread import local
from sqlos.interfaces import ISQLObject
-from sqlos.connection import connCache
+
+cache = local() # global cache of sqlobjects, one per thread
+
+def getGlobalPerThreadCache():
+ c = getattr(cache, 'cacheset', None)
+ if c is None:
+ cache.cacheset = CacheSet()
+ c = cache.cacheset
+ return c
+
+class CacheSynchronizer:
+ """Synchronizer to expire the Global per thread cache at transaction start.
+
+ Let's check that it implements the interface correctly:
+
+ >>> from zope.interface import verify
+ >>> synch = CacheSynchronizer()
+ >>> verify.verifyObject(ISynchronizer, synch)
+ True
+ """
+ # XXX this is probably the wrong place to put this code.
+ implements(ISynchronizer)
+
+ def afterCompletion(self, transaction):
+ pass
+
+ def beforeCompletion(self, transaction):
+ pass
+
+ def newTransaction(self, transaction):
+ cacheset = getGlobalPerThreadCache()
+ for cache in cacheset.allSubCaches():
+ cache.clear() # Blech, not optimal, don't know what is.
+ # expireAll, doesn't expire the objects.
+
+_synch = CacheSynchronizer() # we need to keep a solid reference, because the
+ # transaction manager only keeps weak ones
+transaction.manager.registerSynch(_synch)
def beforeCommitHook(obj):
"""Called before transactions are started.
@@ -38,29 +78,6 @@
if not obj.sqlmeta._obsolete:
obj.sync()
-def expireSQLObject(obj):
- """Expire the SQLObject.
-
- Try to make sure none of the data makes it to the next transaction.
-
- This function should not cause SQL to be sent as it is not defined whether
- the SQL connection will commit before or after this is executed.
- """
- for connection in connCache.values():
- connection.cache.expire(obj.id, obj.__class__)
-
- # Expire object values. The transaction has either been aborted or
- # committed.
- obj.expire()
-
- # Calling sync() would commit the changes because expire()
- # doesn't clear _SO_createValues.
- # XXX this is a workaround for a bug fixed in SQLObject 0.7.1 it's
- # kept around for compatibility until we decide not to support 0.7
- # anymore
- unproxied = removeSecurityProxy(obj)
- unproxied._SO_createValues = {}
-
class SQLObjectTransactionManager:
"""
@@ -98,7 +115,6 @@
Monkeypatch some methods to ease testing:
>>> synced = []
- >>> expired = []
>>> oldsync = SamplePerson.sync
>>> def sync(self):
@@ -107,13 +123,6 @@
... oldsync(self)
>>> SamplePerson.sync = sync
- >>> oldexpire = SamplePerson.expire
- >>> def expire(self):
- ... expired.append(self.username)
- ... expired.sort()
- ... oldexpire(self)
- >>> SamplePerson.expire = expire
-
Now, we check the initial DataManager state:
>>> person.dirty
@@ -140,16 +149,11 @@
>>> synced
[]
- >>> expired
- []
>>> get().commit()
>>> synced
['jinty', 'sidnei']
- >>> expired
- ['jinty', 'sidnei']
>>> synced[:] = []
- >>> expired[:] = []
Check the state:
@@ -170,18 +174,13 @@
>>> synced
[]
- >>> expired
- []
>>> get().abort()
>>> synced
[]
- >>> expired
- ['sidnei']
And then cleanup the monkeypatched method:
>>> SamplePerson.sync = oldsync
- >>> SamplePerson.expire = oldexpire
And finally call tearDown and cleanup:
@@ -198,25 +197,12 @@
def prepare(self, txn):
return True
- def _expireObjects(self):
- for obj in self._objects:
- expireSQLObject(obj)
- self._objects.clear()
-
def abort(self, txn):
- self._expireObjects()
+ self._objects.clear()
self._joined_txn = False
def commit(self, txn):
- # Excerpt from a mail by Stuart Bishop:
- # >>| * Why do we expire objects in a successful transaction? I can't think
- # >>| of a reason why we need to.
- #
- # You need to expire objects after a successful transaction so you can see
- # database changes made by other processes or threads. This is particularly
- # important for Zope where you have a number of handler threads and possibly
- # multiple application servers in a ZEO environment.
- self._expireObjects()
+ self._objects.clear()
self._joined_txn = False
def register(self, obj):
@@ -229,9 +215,6 @@
txn = get()
txn.addBeforeCommitHook(beforeCommitHook, (obj, ))
if not self._joined_txn:
- # register the Zope DA connection as well, becuse when we prepare,
- # and cause DB activity, it might try to register as well. which
- # fails. not very nice at all.
txn.join(self)
self._joined_txn = True
Modified: z3/sqlos/trunk/src/sqlos/adapter.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/adapter.py (original)
+++ z3/sqlos/trunk/src/sqlos/adapter.py Tue Jan 17 21:22:24 2006
@@ -16,13 +16,13 @@
from sqlobject import _mysql, _postgres, _sybase, _sqlite
from sqlobject.converters import registerConverter
from sqlobject.mysql import mysqlconnection
-
from zope.app.rdb.interfaces import DatabaseException
from zope.app.container.interfaces import INameChooser
+from zope.interface import implements
from sqlos.interfaces import ISQLObject
from sqlos._transaction import SQLObjectTransactionManager
-
+from sqlos._transaction import getGlobalPerThreadCache
class ConnectionAdapter:
@@ -34,14 +34,47 @@
self.supportTransactions = False
self._dm = SQLObjectTransactionManager()
+ def _get_cache(self):
+ return getGlobalPerThreadCache()
+ def _set_cache(self, val):
+ pass # don't let ourselves be overridden
+ cache = property(_get_cache, _set_cache)
+
def makeConnection(self):
return self._connection
+ def getConnection(self):
+ # we override this because we really don't care about sqlobjects idea
+ # of threadsafe pool of db connections. zope.app.rdb takes care of
+ # that for us
+ return self.makeConnection()
+
+ def releaseConnection(self, conn, explicit=False):
+ # we simply lobotomize this method because zope.app.rdb
+ # takes care of all commits/rollbacks as well as all threading issues
+ # with connections
+ pass
+
+ def close(self):
+ # there is no sane way of doing this I can see. It would be nice to
+ # raise an error here, but sqlobject.dbconnection registers this method
+ # to be called when python exits.
+ pass
+
+ def _executeRetry(self, conn, cursor, query):
+ try:
+ return cursor.execute(query)
+ except Exception, exc:
+ raise DatabaseException(str(exc.args))
+
def _runWithConnection(self, meth, *args):
conn = self.getConnection()
try:
try:
val = meth(conn, *args)
+ except DatabaseException:
+ raise # We may have already raised Database exception in
+ # _executeRetry, so we re-raise it
except Exception, exc:
raise DatabaseException, tuple(exc.args)
finally:
@@ -59,7 +92,6 @@
super(MySQLAdapter, self).__init__(connection)
def _executeRetry(*args, **kw):
- # XXX - this should be done for all connections -jinty
try:
return mysqlconnection.MySQLConnection._executeRetry(*args, **kw)
except Exception, exc:
@@ -68,7 +100,7 @@
class SybaseAdapter(ConnectionAdapter, _sybase.builder()):
pass
- # XXX - this is probably broken, feel free to fix it - jinty
+ # XXX - this is surely broken, feel free to fix it - jinty
class PostgresAdapter(ConnectionAdapter, _postgres.builder()):
Modified: z3/sqlos/trunk/src/sqlos/connection.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/connection.py (original)
+++ z3/sqlos/trunk/src/sqlos/connection.py Tue Jan 17 21:22:24 2006
@@ -43,67 +43,19 @@
SQLObjectWarning, 2)
return
name = ut.name
+ newconn = zapi.queryUtility(IZopeDatabaseAdapter, name,
+ default=None,
+ context=context)
+ if newconn is None:
+ warnings.warn("Couldn't find a rdb connection by the "
+ "name %s. Please verify your setup." % name,
+ SQLObjectWarning, 3)
try:
- return getConnection(context, name)
+ return IZopeSQLConnection(newconn())
except ComponentLookupError:
return self
def __set__(self, inst, value):
# Ignore, so we don't get overriden.
- # We always use the connections from the
- # global thread cache.
+ # We always use the connections from the IZopeDatabaseAdapter utility.
pass
-
-# Connection cache, one per thread, a-la Transaction.
-# This code was heavily based on ZODB.Transaction
-connCache = {}
-
-try:
- import thread
-except:
- def getConnection(context, name):
- global connCache
- if not connCache.get(name):
- newconn = zapi.queryUtility(IZopeDatabaseAdapter, name,
- default=None,
- context=context)
- if newconn is None:
- warnings.warn("Couldn't find a rdb connection by the "
- "name %s. Please verify your setup." % name,
- SQLObjectWarning, 3)
- connCache[name] = IZopeSQLConnection(newconn())
- return connCache[name]
-
- def releaseConnection(name):
- global connCache
- if connCache.has_key(name):
- connCache[name].close()
- del connCache[name]
-else:
- def getConnection(context, name):
- global connCache
- tid = thread.get_ident()
- key = (tid, name)
- if not connCache.get(key):
- newconn = zapi.queryUtility(IZopeDatabaseAdapter, name,
- default=None,
- context=context)
- if newconn is None:
- warnings.warn("Couldn't find a rdb connection by the "
- "name %s. Please verify your setup." % name,
- SQLObjectWarning, 3)
- conn = IZopeSQLConnection(newconn())
- if conn.supportTransactions:
- connCache[key] = conn.transaction()
- else: # At least MySQL does not support transactions
- connCache[key] = conn
-
- return connCache[key]
-
- def releaseConnection(name):
- global connCache
- tid = thread.get_ident()
- key = (tid, name)
- if connCache.has_key(key):
- connCache[key].close()
- del connCache[key]
Modified: z3/sqlos/trunk/src/sqlos/ftests/connection.txt
==============================================================================
--- z3/sqlos/trunk/src/sqlos/ftests/connection.txt (original)
+++ z3/sqlos/trunk/src/sqlos/ftests/connection.txt Tue Jan 17 21:22:24 2006
@@ -1,21 +1,24 @@
-Here we test how the connections work with threading
+Here we test how the connection descriptor works:
- >>> from sqlos.connection import getConnection, releaseConnection
- >>> from sqlos.interfaces import IConnectionName
- >>> from zope.app import zapi
+ >>> from sqlos.connection import ConnectionDescriptor
+ >>> from sqlos.adapter import ConnectionAdapter
Get the connection name defined in sqlos:
- >>> ut = zapi.getUtility(IConnectionName)
+ >>> class Dummy:
+ ... connection = ConnectionDescriptor()
+ >>> dummy = Dummy()
-Make sure that the connections are released:
+Assert that if we get a the real connection twice, we get the same connection:
- >>> releaseConnection(ut.name)
+ >>> conn1 = dummy.connection.makeConnection()
+ >>> conn2 = dummy.connection.makeConnection()
+ >>> conn1 is conn2
+ True
-Assert that if we get a connection twice, we get the same connection:
+getConnection gives us the same result:
- >>> conn1 = getConnection(None, ut.name)
- >>> conn2 = getConnection(None, ut.name)
+ >>> conn2 = dummy.connection.getConnection()
>>> conn1 is conn2
True
@@ -23,18 +26,10 @@
>>> log = []
>>> def f():
- ... conn = getConnection(None, ut.name)
- ... log.append(conn)
+ ... log.append(dummy.connection.makeConnection())
>>> import threading
>>> thread = threading.Thread(target=f)
>>> thread.start()
>>> thread.join()
>>> log[0] is not conn1
True
-
-Releasing a connection will release it from the current thread:
-
- >>> releaseConnection(ut.name)
- >>> conn = getConnection(None, ut.name)
- >>> conn is not conn1
- True
Modified: z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py (original)
+++ z3/sqlos/trunk/src/sqlos/ftests/test_transaction.py Tue Jan 17 21:22:24 2006
@@ -10,10 +10,13 @@
$Id$
"""
import unittest
-from transaction import get
+from transaction import get, begin
from zope.app.testing.functional import BrowserTestCase
+from zope.app import zapi
+from zope.app.rdb.interfaces import IZopeDatabaseAdapter
+from sqlos.interfaces import IConnectionName
from sqlos.testing.sampleperson import SamplePerson
__metaclass__ = type
@@ -23,17 +26,19 @@
def setUp(self):
super(TestTransaction, self).setUp()
SamplePerson.createTable(ifNotExists=True)
- self.person = SamplePerson(fullname='Sidnei da Silva',
- username='sidnei',
- password='test')
+ person = SamplePerson(fullname='Sidnei da Silva',
+ username='sidnei',
+ password='test')
+ self.personid = person.id
# Commit what we have done
get().commit()
+ begin()
def supportTransactions(self):
return SamplePerson._connection.supportTransactions
def testChange(self):
- person = self.person
+ person = SamplePerson.get(self.personid)
self.assertEqual(person.fullname, 'Sidnei da Silva')
self.assertEqual(person.username, 'sidnei')
self.assertEqual(person.password, 'test')
@@ -48,7 +53,7 @@
self.assertEqual(person.password, 'pass')
def testCommit(self):
- person = self.person
+ person = SamplePerson.get(self.personid)
self.assertEqual(person.fullname, 'Sidnei da Silva')
self.assertEqual(person.username, 'sidnei')
self.assertEqual(person.password, 'test')
@@ -56,13 +61,14 @@
person.username = 'dreamcatcher'
person.password = 'pass'
get().commit()
- person.syncUpdate() # Let's see what is in the database
+ begin()
+ person = SamplePerson.get(self.personid)
self.assertEqual(person.fullname, 'Sidnei Silva')
self.assertEqual(person.username, 'dreamcatcher')
self.assertEqual(person.password, 'pass')
def testAbort(self):
- person = self.person
+ person = SamplePerson.get(self.personid)
self.assertEqual(person.fullname, 'Sidnei da Silva')
self.assertEqual(person.username, 'sidnei')
self.assertEqual(person.password, 'test')
@@ -71,7 +77,8 @@
person.password = 'pass'
person.sync() # Sunc to make sure that the DB is sent the statements
get().abort()
- person.syncUpdate() # Let's see what is in the database
+ begin()
+ person = SamplePerson.get(self.personid)
if self.supportTransactions():
self.assertEqual(person.fullname, 'Sidnei da Silva')
self.assertEqual(person.username, 'sidnei')
@@ -82,10 +89,79 @@
self.assertEqual(person.username, 'dreamcatcher')
self.assertEqual(person.password, 'pass')
+ def testCacheThreadIsolation(self):
+ """Tests that the changes we make in one thread don't appear in another.
+
+ This is a regression test for if the cache makes breaks the isolation
+ between threads.
+ """
+ ut = zapi.getUtility(IConnectionName)
+ adapter = zapi.queryUtility(IZopeDatabaseAdapter, ut.name)
+ if adapter.getDSN() == 'dbi://:memory:':
+ import warnings
+ warnings.warn('Warning, not testing Cache Isolation')
+ return # this test is NOT going to work against the default test
+ # database
+ # warm up the cache by getting an instance
+ person2 = SamplePerson.get(self.personid)
+ self.assertEqual(person2.fullname, 'Sidnei da Silva')
+ self.assertEqual(person2.username, 'sidnei')
+ self.assertEqual(person2.password, 'test')
+ def changePerson():
+ person1 = SamplePerson.get(self.personid)
+ person1.fullname = 'Another Person'
+ person1.username = 'notsidnei'
+ person1.password = 'nottest'
+ import threading
+ thread = threading.Thread(target=changePerson)
+ thread.start()
+ thread.join()
+ self.assertEqual(person2.fullname, 'Sidnei da Silva')
+ self.assertEqual(person2.username, 'sidnei')
+ self.assertEqual(person2.password, 'test')
+
+ def testCleanCacheOnOverTransaction(self):
+ ut = zapi.getUtility(IConnectionName)
+ adapter = zapi.queryUtility(IZopeDatabaseAdapter, ut.name)
+ if adapter.getDSN() == 'dbi://:memory:':
+ import warnings
+ warnings.warn('Warning, not testing Cache Isolation')
+ return # this test is NOT going to work against the default test
+ # database
+ # warm up the cache by getting an instance
+ person2 = SamplePerson.get(self.personid)
+ self.assertEqual(person2.fullname, 'Sidnei da Silva')
+ self.assertEqual(person2.username, 'sidnei')
+ self.assertEqual(person2.password, 'test')
+ # comit this transaction "just in case"
+ get().commit()
+ # get change and commit the person in another thread
+ def changePerson():
+ person1 = SamplePerson.get(self.personid)
+ person1.fullname = 'Another Person'
+ person1.username = 'notsidnei'
+ person1.password = 'nottest'
+ get().commit()
+ import threading
+ thread = threading.Thread(target=changePerson)
+ thread.start()
+ thread.join()
+ # start a new transaction in this thread
+ get().commit()
+ begin()
+ # If we get a new person, we should see the changed person
+ person3 = SamplePerson.get(self.personid)
+ self.assertEqual(person3.fullname, 'Another Person')
+ self.assertEqual(person3.username, 'notsidnei')
+ self.assertEqual(person3.password, 'nottest')
+
def tearDown(self):
- self.person.destroySelf()
+ person = SamplePerson.get(self.personid)
+ person.destroySelf()
super(TestTransaction, self).tearDown()
SamplePerson.dropTable()
+ get().commit()
+ begin()
def test_suite():
suite = unittest.TestSuite()
Modified: z3/sqlos/trunk/src/sqlos/testing/testdb.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/testing/testdb.py (original)
+++ z3/sqlos/trunk/src/sqlos/testing/testdb.py Tue Jan 17 21:22:24 2006
@@ -68,7 +68,7 @@
def getConnection(self):
if self.conn is None:
- self.conn = self.connectionAdapterFactory().transaction()
+ self.conn = self.connectionAdapterFactory()
return self.conn
def tearDown(self, dropTables=True):
Modified: z3/sqlos/trunk/src/sqlos/tests/test_transaction.py
==============================================================================
--- z3/sqlos/trunk/src/sqlos/tests/test_transaction.py (original)
+++ z3/sqlos/trunk/src/sqlos/tests/test_transaction.py Tue Jan 17 21:22:24 2006
@@ -2,7 +2,7 @@
import doctest
from zope.testing.doctestunit import DocTestSuite
-from transaction import get
+from transaction import get, begin
def doctest_KeepValuesOverExpireSync():
"""Regression test for a SQLObject bug in expire.
@@ -25,14 +25,20 @@
>>> person = SamplePerson(fullname='Andres', username='andres',
... password='789')
+ >>> personid = person.id
>>> get().commit()
+ >>> txn = begin()
And make sure the abort/commit cycle works:
+ >>> person = SamplePerson.get(personid)
>>> person.username = "yourname"
>>> get().abort()
+ >>> txn = begin()
+ >>> person = SamplePerson.get(personid)
>>> person.fullname = "Andres Freund"
>>> get().commit()
+ >>> txn = begin()
>>> person.username
'andres'
More information about the z3-checkins
mailing list