############################################################################## # # Copyright (c) 2004-1005 Michel Pelletier # All Rights Reserved. # # This software is subject to the provisions of the Zope Public License, # Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS # FOR A PARTICULAR PURPOSE. # ############################################################################## """ Queries are any callable object (like a function) that query the triple store. These are technically not agents, just agent helpers. QueryChains are queries that chains other queries in various ways (union, intersection). """ __rdf_description__ = '''\ ''' from types import UnicodeType, NoneType, TupleType from rdflib.Identifier import Identifier from rdflib.URIRef import URIRef from rdflib.BNode import BNode from rdflib.Literal import Literal from zope.interface import implements from interfaces import IQuery, IQueryChain, IIdentifier from result import Result import BTrees from BTrees.OOBTree import OOSet as Set class QueryException(Exception): """ Thrown by a query to indicate a query exception.""" class BadQueryException(QueryException): """ Throw by a query to indicate a query is malformed.""" class Query: """ A generic query logic class. A straightforward interface to zemantic.Zemantic.triples() with support for inner queries. This class does simple triple pattern queries on a catalog. If an identifier term is a nested query (3-tuple or IQuery implementor) then it performs the inner query first, applying each of the results of the inner query to the outer query. When called, returns a result generator or None (no results) or throws an exception. >>> from zemantic import Zemantic >>> c = Zemantic() >>> q = Query(None, None, None) >>> c.query(q) is not None True """ implements(IQuery) inner = False def __init__(self, subject=None, predicate=None, object=None): s, p, o = subject, predicate, object ts = type(s) if ts is UnicodeType: if s.startswith("<"): s = URIRef(s[1:-1]) elif s.startswith('"'): raise ValueError, "Literals cannot be subjects." elif s.startswith('_'): s = BNode(s) else: raise ValueError, "Unicode subject term %s is not a well formed N3 identifier" % s elif ts is TupleType or IQuery.providedBy(s): self.inner = True elif ts is NoneType or IIdentifier.providedBy(s): pass # it's cool else: raise ValueError, "Subject term must be None unicode, IIdentifier, IQuery or 3-tuple." tp = type(p) if tp is UnicodeType: if p.startswith("<"): p = URIRef(p[1:-1]) elif p.startswith('"'): raise ValueError, "Literals cannot be predicates." elif p.startswith('_'): raise ValueError, "BNodes cannot be predicates." else: raise ValueError, "Unicode predicate term %s is not a well formed N3 identifier" % p elif tp is TupleType or IQuery.providedBy(p): self.inner = True elif tp is NoneType or IIdentifier.providedBy(p): pass # it's cool else: raise ValueError, "Subject term must be None unicode, IIdentifier, IQuery or 3-tuple." to = type(o) if to is UnicodeType: if o.startswith("<"): o = URIRef(o[1:-1]) elif o.startswith('"'): o = Literal(o[1:-1]) elif o.startswith('_'): o = BNode(o) else: raise ValueError, "Unicode object term %s is not a well formed N3 identifier" % o elif to is TupleType or IQuery.providedBy(o): self.inner = True elif to is NoneType or IIdentifier.providedBy(o): pass # it's cool else: raise ValueError, "Object term must be None, unicode, IIdentifier, IQuery or 3-tuple." self.s = s self.p = p self.o = o def __call__(self, zemantic): # common path, no inner queries try: if not self.inner: for result in zemantic.triples((self.s, self.p, self.o)): yield Result(result) else: # inner query logic here is definitely _not_ optimized, I # imagine there is all kinds of query optimization and # algebraic reduction that can be applied but that's not # the point, users will have to be aware of the fact that # broad inner queries can significantly impact # performance. # first, determine inner subjects if self.s is None: isresult = [] for s in zemantic.uniqueSubjects(): isresult.append(Result((s, None, None))) elif type(self.s) is TupleType or IQuery.providedBy(self.s): if type(self.s) is TupleType: q = apply(Query, self.s) else: q = self.s isresult = zemantic.query(q) else: isresult = [Result((self.s, None, None))] # determine inner predicates if self.p is None: ipresult = [] for p in zemantic.uniquePredicates(): ipresult.append(Result((None, p, None))) elif type(self.p) is TupleType or IQuery.providedBy(self.p): if type(self.p) is TupleType: q = apply(Query, self.p) else: q = self.p ipresult = zemantic.query(q) else: ipresult = [Result((None, self.p, None))] # determine inner objects if self.o is None: ioresult = [] for o in self.uniqueObjects(): ioresult.append(Result((None, None, o))) elif type(self.o) is TupleType or IQuery.providedBy(self.o): if type(self.o) is TupleType: q = apply(Query, self.o) else: q = self.o ioresult = zemantic.query(q) else: ioresult = [Result((None, None, self.o))] # put it all together for sresult in isresult: for presult in ipresult: for oresult in ioresult: yield Result((sresult.subject, presult.predicate, oresult.object)) except KeyError: yield None def __repr__(self): return "" % (self.s, self.p, self.o) class QueryChain(Query): """ Abstract class to combine a "chain" of queries into one result set. QueryChains are themselves Queries, and can be nested. """ implements(IQueryChain) _queries = None def __init__(self, *queries): self._queries = [] for x in queries: self.add(x) def add(self, query): """ Append a query onto the chain """ self._queries.append(query) def getQueries(self): """ Return the chain of queries. """ return self._queries def __call__(self, zemantic): """ Eval a query chain with given args. Queries throw QueryException to stop chain procesing and declare failure.""" raise NotImplementedError class UnionChain(QueryChain): """ Take the union of a chain of queries. >>> from zemantic import Zemantic >>> c = Zemantic() >>> u = UnionChain(c, Query()) >>> u = UnionChain(c, Query(), Query()) """ def __call__(self, zemantic): results = Set() for query in self._queries: BTrees.OOBTree.union(results, Set(query(zemantic))) for result in results: yield result class IntersectionChain(QueryChain): """ Take the intersection of a chain of queries. >>> from zemantic import Zemantic >>> c = Zemantic() >>> i = IntersectionChain(c, Query()) >>> i = IntersectionChain(c, Query(), Query()) """ def __call__(self, zemantic): results = Set() for query in self._queries: results.intersection(query(zemantic)) for result in results: yield result