##############################################################################
#
# Copyright (c) 2004-1005 Michel Pelletier
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
Queries are any callable object (like a function) that query the
triple store. These are technically not agents, just agent helpers.
QueryChains are queries that chains other queries in various ways
(union, intersection).
"""
__rdf_description__ = '''\
'''
from types import UnicodeType, NoneType, TupleType
from rdflib.Identifier import Identifier
from rdflib.URIRef import URIRef
from rdflib.BNode import BNode
from rdflib.Literal import Literal
from zope.interface import implements
from interfaces import IQuery, IQueryChain, IIdentifier
from result import Result
import BTrees
from BTrees.OOBTree import OOSet as Set
class QueryException(Exception):
""" Thrown by a query to indicate a query exception."""
class BadQueryException(QueryException):
""" Throw by a query to indicate a query is malformed."""
class Query:
""" A generic query logic class. A straightforward interface to
zemantic.Zemantic.triples() with support for inner queries.
This class does simple triple pattern queries on a catalog. If an
identifier term is a nested query (3-tuple or IQuery implementor)
then it performs the inner query first, applying each of the
results of the inner query to the outer query.
When called, returns a result generator or None (no results) or
throws an exception.
>>> from zemantic import Zemantic
>>> c = Zemantic()
>>> q = Query(None, None, None)
>>> c.query(q) is not None
True
"""
implements(IQuery)
inner = False
def __init__(self, subject=None, predicate=None, object=None):
s, p, o = subject, predicate, object
ts = type(s)
if ts is UnicodeType:
if s.startswith("<"):
s = URIRef(s[1:-1])
elif s.startswith('"'):
raise ValueError, "Literals cannot be subjects."
elif s.startswith('_'):
s = BNode(s)
else:
raise ValueError, "Unicode subject term %s is not a well formed N3 identifier" % s
elif ts is TupleType or IQuery.providedBy(s):
self.inner = True
elif ts is NoneType or IIdentifier.providedBy(s):
pass # it's cool
else:
raise ValueError, "Subject term must be None unicode, IIdentifier, IQuery or 3-tuple."
tp = type(p)
if tp is UnicodeType:
if p.startswith("<"):
p = URIRef(p[1:-1])
elif p.startswith('"'):
raise ValueError, "Literals cannot be predicates."
elif p.startswith('_'):
raise ValueError, "BNodes cannot be predicates."
else:
raise ValueError, "Unicode predicate term %s is not a well formed N3 identifier" % p
elif tp is TupleType or IQuery.providedBy(p):
self.inner = True
elif tp is NoneType or IIdentifier.providedBy(p):
pass # it's cool
else:
raise ValueError, "Subject term must be None unicode, IIdentifier, IQuery or 3-tuple."
to = type(o)
if to is UnicodeType:
if o.startswith("<"):
o = URIRef(o[1:-1])
elif o.startswith('"'):
o = Literal(o[1:-1])
elif o.startswith('_'):
o = BNode(o)
else:
raise ValueError, "Unicode object term %s is not a well formed N3 identifier" % o
elif to is TupleType or IQuery.providedBy(o):
self.inner = True
elif to is NoneType or IIdentifier.providedBy(o):
pass # it's cool
else:
raise ValueError, "Object term must be None, unicode, IIdentifier, IQuery or 3-tuple."
self.s = s
self.p = p
self.o = o
def __call__(self, zemantic):
# common path, no inner queries
try:
if not self.inner:
for result in zemantic.triples((self.s, self.p, self.o)):
yield Result(result)
else:
# inner query logic here is definitely _not_ optimized, I
# imagine there is all kinds of query optimization and
# algebraic reduction that can be applied but that's not
# the point, users will have to be aware of the fact that
# broad inner queries can significantly impact
# performance.
# first, determine inner subjects
if self.s is None:
isresult = []
for s in zemantic.uniqueSubjects():
isresult.append(Result((s, None, None)))
elif type(self.s) is TupleType or IQuery.providedBy(self.s):
if type(self.s) is TupleType:
q = apply(Query, self.s)
else:
q = self.s
isresult = zemantic.query(q)
else:
isresult = [Result((self.s, None, None))]
# determine inner predicates
if self.p is None:
ipresult = []
for p in zemantic.uniquePredicates():
ipresult.append(Result((None, p, None)))
elif type(self.p) is TupleType or IQuery.providedBy(self.p):
if type(self.p) is TupleType:
q = apply(Query, self.p)
else:
q = self.p
ipresult = zemantic.query(q)
else:
ipresult = [Result((None, self.p, None))]
# determine inner objects
if self.o is None:
ioresult = []
for o in self.uniqueObjects():
ioresult.append(Result((None, None, o)))
elif type(self.o) is TupleType or IQuery.providedBy(self.o):
if type(self.o) is TupleType:
q = apply(Query, self.o)
else:
q = self.o
ioresult = zemantic.query(q)
else:
ioresult = [Result((None, None, self.o))]
# put it all together
for sresult in isresult:
for presult in ipresult:
for oresult in ioresult:
yield Result((sresult.subject, presult.predicate, oresult.object))
except KeyError:
yield None
def __repr__(self):
return "" % (self.s, self.p, self.o)
class QueryChain(Query):
""" Abstract class to combine a "chain" of queries into one result
set. QueryChains are themselves Queries, and can be nested. """
implements(IQueryChain)
_queries = None
def __init__(self, *queries):
self._queries = []
for x in queries:
self.add(x)
def add(self, query):
""" Append a query onto the chain """
self._queries.append(query)
def getQueries(self):
""" Return the chain of queries. """
return self._queries
def __call__(self, zemantic):
""" Eval a query chain with given args. Queries throw
QueryException to stop chain procesing and declare failure."""
raise NotImplementedError
class UnionChain(QueryChain):
"""
Take the union of a chain of queries.
>>> from zemantic import Zemantic
>>> c = Zemantic()
>>> u = UnionChain(c, Query())
>>> u = UnionChain(c, Query(), Query())
"""
def __call__(self, zemantic):
results = Set()
for query in self._queries:
BTrees.OOBTree.union(results, Set(query(zemantic)))
for result in results:
yield result
class IntersectionChain(QueryChain):
"""
Take the intersection of a chain of queries.
>>> from zemantic import Zemantic
>>> c = Zemantic()
>>> i = IntersectionChain(c, Query())
>>> i = IntersectionChain(c, Query(), Query())
"""
def __call__(self, zemantic):
results = Set()
for query in self._queries:
results.intersection(query(zemantic))
for result in results:
yield result