Merge branch 'new-q-objects' into v0.4
This commit is contained in:
commit
b2b4456f74
@ -7,6 +7,7 @@ import pymongo.dbref
|
|||||||
import pymongo.objectid
|
import pymongo.objectid
|
||||||
import re
|
import re
|
||||||
import copy
|
import copy
|
||||||
|
import itertools
|
||||||
|
|
||||||
__all__ = ['queryset_manager', 'Q', 'InvalidQueryError',
|
__all__ = ['queryset_manager', 'Q', 'InvalidQueryError',
|
||||||
'InvalidCollectionError']
|
'InvalidCollectionError']
|
||||||
@ -18,6 +19,7 @@ REPR_OUTPUT_SIZE = 20
|
|||||||
class DoesNotExist(Exception):
|
class DoesNotExist(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class MultipleObjectsReturned(Exception):
|
class MultipleObjectsReturned(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -29,13 +31,244 @@ class InvalidQueryError(Exception):
|
|||||||
class OperationError(Exception):
|
class OperationError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class InvalidCollectionError(Exception):
|
class InvalidCollectionError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
RE_TYPE = type(re.compile(''))
|
RE_TYPE = type(re.compile(''))
|
||||||
|
|
||||||
|
|
||||||
class Q(object):
|
class QNodeVisitor(object):
|
||||||
|
"""Base visitor class for visiting Q-object nodes in a query tree.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def visit_combination(self, combination):
|
||||||
|
"""Called by QCombination objects.
|
||||||
|
"""
|
||||||
|
return combination
|
||||||
|
|
||||||
|
def visit_query(self, query):
|
||||||
|
"""Called by (New)Q objects.
|
||||||
|
"""
|
||||||
|
return query
|
||||||
|
|
||||||
|
|
||||||
|
class SimplificationVisitor(QNodeVisitor):
|
||||||
|
"""Simplifies query trees by combinging unnecessary 'and' connection nodes
|
||||||
|
into a single Q-object.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def visit_combination(self, combination):
|
||||||
|
if combination.operation == combination.AND:
|
||||||
|
# The simplification only applies to 'simple' queries
|
||||||
|
if all(isinstance(node, Q) for node in combination.children):
|
||||||
|
queries = [node.query for node in combination.children]
|
||||||
|
return Q(**self._query_conjunction(queries))
|
||||||
|
return combination
|
||||||
|
|
||||||
|
def _query_conjunction(self, queries):
|
||||||
|
"""Merges query dicts - effectively &ing them together.
|
||||||
|
"""
|
||||||
|
query_ops = set()
|
||||||
|
combined_query = {}
|
||||||
|
for query in queries:
|
||||||
|
ops = set(query.keys())
|
||||||
|
# Make sure that the same operation isn't applied more than once
|
||||||
|
# to a single field
|
||||||
|
intersection = ops.intersection(query_ops)
|
||||||
|
if intersection:
|
||||||
|
msg = 'Duplicate query contitions: '
|
||||||
|
raise InvalidQueryError(msg + ', '.join(intersection))
|
||||||
|
|
||||||
|
query_ops.update(ops)
|
||||||
|
combined_query.update(copy.deepcopy(query))
|
||||||
|
return combined_query
|
||||||
|
|
||||||
|
|
||||||
|
class QueryTreeTransformerVisitor(QNodeVisitor):
|
||||||
|
"""Transforms the query tree in to a form that may be used with MongoDB.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def visit_combination(self, combination):
|
||||||
|
if combination.operation == combination.AND:
|
||||||
|
# MongoDB doesn't allow us to have too many $or operations in our
|
||||||
|
# queries, so the aim is to move the ORs up the tree to one
|
||||||
|
# 'master' $or. Firstly, we must find all the necessary parts (part
|
||||||
|
# of an AND combination or just standard Q object), and store them
|
||||||
|
# separately from the OR parts.
|
||||||
|
or_groups = []
|
||||||
|
and_parts = []
|
||||||
|
for node in combination.children:
|
||||||
|
if isinstance(node, QCombination):
|
||||||
|
if node.operation == node.OR:
|
||||||
|
# Any of the children in an $or component may cause
|
||||||
|
# the query to succeed
|
||||||
|
or_groups.append(node.children)
|
||||||
|
elif node.operation == node.AND:
|
||||||
|
and_parts.append(node)
|
||||||
|
elif isinstance(node, Q):
|
||||||
|
and_parts.append(node)
|
||||||
|
|
||||||
|
# Now we combine the parts into a usable query. AND together all of
|
||||||
|
# the necessary parts. Then for each $or part, create a new query
|
||||||
|
# that ANDs the necessary part with the $or part.
|
||||||
|
clauses = []
|
||||||
|
for or_group in itertools.product(*or_groups):
|
||||||
|
q_object = reduce(lambda a, b: a & b, and_parts, Q())
|
||||||
|
q_object = reduce(lambda a, b: a & b, or_group, q_object)
|
||||||
|
clauses.append(q_object)
|
||||||
|
|
||||||
|
# Finally, $or the generated clauses in to one query. Each of the
|
||||||
|
# clauses is sufficient for the query to succeed.
|
||||||
|
return reduce(lambda a, b: a | b, clauses, Q())
|
||||||
|
|
||||||
|
if combination.operation == combination.OR:
|
||||||
|
children = []
|
||||||
|
# Crush any nested ORs in to this combination as MongoDB doesn't
|
||||||
|
# support nested $or operations
|
||||||
|
for node in combination.children:
|
||||||
|
if (isinstance(node, QCombination) and
|
||||||
|
node.operation == combination.OR):
|
||||||
|
children += node.children
|
||||||
|
else:
|
||||||
|
children.append(node)
|
||||||
|
combination.children = children
|
||||||
|
|
||||||
|
return combination
|
||||||
|
|
||||||
|
|
||||||
|
class QueryCompilerVisitor(QNodeVisitor):
|
||||||
|
"""Compiles the nodes in a query tree to a PyMongo-compatible query
|
||||||
|
dictionary.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, document):
|
||||||
|
self.document = document
|
||||||
|
|
||||||
|
def visit_combination(self, combination):
|
||||||
|
if combination.operation == combination.OR:
|
||||||
|
return {'$or': combination.children}
|
||||||
|
elif combination.operation == combination.AND:
|
||||||
|
return self._mongo_query_conjunction(combination.children)
|
||||||
|
return combination
|
||||||
|
|
||||||
|
def visit_query(self, query):
|
||||||
|
return QuerySet._transform_query(self.document, **query.query)
|
||||||
|
|
||||||
|
def _mongo_query_conjunction(self, queries):
|
||||||
|
"""Merges Mongo query dicts - effectively &ing them together.
|
||||||
|
"""
|
||||||
|
combined_query = {}
|
||||||
|
for query in queries:
|
||||||
|
for field, ops in query.items():
|
||||||
|
if field not in combined_query:
|
||||||
|
combined_query[field] = ops
|
||||||
|
else:
|
||||||
|
# The field is already present in the query the only way
|
||||||
|
# we can merge is if both the existing value and the new
|
||||||
|
# value are operation dicts, reject anything else
|
||||||
|
if (not isinstance(combined_query[field], dict) or
|
||||||
|
not isinstance(ops, dict)):
|
||||||
|
message = 'Conflicting values for ' + field
|
||||||
|
raise InvalidQueryError(message)
|
||||||
|
|
||||||
|
current_ops = set(combined_query[field].keys())
|
||||||
|
new_ops = set(ops.keys())
|
||||||
|
# Make sure that the same operation isn't applied more than
|
||||||
|
# once to a single field
|
||||||
|
intersection = current_ops.intersection(new_ops)
|
||||||
|
if intersection:
|
||||||
|
msg = 'Duplicate query contitions: '
|
||||||
|
raise InvalidQueryError(msg + ', '.join(intersection))
|
||||||
|
|
||||||
|
# Right! We've got two non-overlapping dicts of operations!
|
||||||
|
combined_query[field].update(copy.deepcopy(ops))
|
||||||
|
return combined_query
|
||||||
|
|
||||||
|
|
||||||
|
class QNode(object):
|
||||||
|
"""Base class for nodes in query trees.
|
||||||
|
"""
|
||||||
|
|
||||||
|
AND = 0
|
||||||
|
OR = 1
|
||||||
|
|
||||||
|
def to_query(self, document):
|
||||||
|
query = self.accept(SimplificationVisitor())
|
||||||
|
query = query.accept(QueryTreeTransformerVisitor())
|
||||||
|
query = query.accept(QueryCompilerVisitor(document))
|
||||||
|
return query
|
||||||
|
|
||||||
|
def accept(self, visitor):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def _combine(self, other, operation):
|
||||||
|
"""Combine this node with another node into a QCombination object.
|
||||||
|
"""
|
||||||
|
if other.empty:
|
||||||
|
return self
|
||||||
|
|
||||||
|
if self.empty:
|
||||||
|
return other
|
||||||
|
|
||||||
|
return QCombination(operation, [self, other])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def empty(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __or__(self, other):
|
||||||
|
return self._combine(other, self.OR)
|
||||||
|
|
||||||
|
def __and__(self, other):
|
||||||
|
return self._combine(other, self.AND)
|
||||||
|
|
||||||
|
|
||||||
|
class QCombination(QNode):
|
||||||
|
"""Represents the combination of several conditions by a given logical
|
||||||
|
operator.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, operation, children):
|
||||||
|
self.operation = operation
|
||||||
|
self.children = []
|
||||||
|
for node in children:
|
||||||
|
# If the child is a combination of the same type, we can merge its
|
||||||
|
# children directly into this combinations children
|
||||||
|
if isinstance(node, QCombination) and node.operation == operation:
|
||||||
|
self.children += node.children
|
||||||
|
else:
|
||||||
|
self.children.append(node)
|
||||||
|
|
||||||
|
def accept(self, visitor):
|
||||||
|
for i in range(len(self.children)):
|
||||||
|
self.children[i] = self.children[i].accept(visitor)
|
||||||
|
|
||||||
|
return visitor.visit_combination(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def empty(self):
|
||||||
|
return not bool(self.children)
|
||||||
|
|
||||||
|
|
||||||
|
class Q(QNode):
|
||||||
|
"""A simple query object, used in a query tree to build up more complex
|
||||||
|
query structures.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, **query):
|
||||||
|
self.query = query
|
||||||
|
|
||||||
|
def accept(self, visitor):
|
||||||
|
return visitor.visit_query(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def empty(self):
|
||||||
|
return not bool(self.query)
|
||||||
|
|
||||||
|
|
||||||
|
class OldQ(object):
|
||||||
|
|
||||||
OR = '||'
|
OR = '||'
|
||||||
AND = '&&'
|
AND = '&&'
|
||||||
@ -162,7 +395,9 @@ class QuerySet(object):
|
|||||||
self._document = document
|
self._document = document
|
||||||
self._collection_obj = collection
|
self._collection_obj = collection
|
||||||
self._accessed_collection = False
|
self._accessed_collection = False
|
||||||
self._query = {}
|
self._mongo_query = None
|
||||||
|
self._query_obj = Q()
|
||||||
|
self._initial_query = {}
|
||||||
self._where_clause = None
|
self._where_clause = None
|
||||||
self._loaded_fields = []
|
self._loaded_fields = []
|
||||||
self._ordering = []
|
self._ordering = []
|
||||||
@ -170,11 +405,18 @@ class QuerySet(object):
|
|||||||
# If inheritance is allowed, only return instances and instances of
|
# If inheritance is allowed, only return instances and instances of
|
||||||
# subclasses of the class being used
|
# subclasses of the class being used
|
||||||
if document._meta.get('allow_inheritance'):
|
if document._meta.get('allow_inheritance'):
|
||||||
self._query = {'_types': self._document._class_name}
|
self._initial_query = {'_types': self._document._class_name}
|
||||||
self._cursor_obj = None
|
self._cursor_obj = None
|
||||||
self._limit = None
|
self._limit = None
|
||||||
self._skip = None
|
self._skip = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _query(self):
|
||||||
|
if self._mongo_query is None:
|
||||||
|
self._mongo_query = self._query_obj.to_query(self._document)
|
||||||
|
self._mongo_query.update(self._initial_query)
|
||||||
|
return self._mongo_query
|
||||||
|
|
||||||
def ensure_index(self, key_or_list, drop_dups=False, background=False,
|
def ensure_index(self, key_or_list, drop_dups=False, background=False,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
"""Ensure that the given indexes are in place.
|
"""Ensure that the given indexes are in place.
|
||||||
@ -233,10 +475,14 @@ class QuerySet(object):
|
|||||||
objects, only the last one will be used
|
objects, only the last one will be used
|
||||||
:param query: Django-style query keyword arguments
|
:param query: Django-style query keyword arguments
|
||||||
"""
|
"""
|
||||||
|
#if q_obj:
|
||||||
|
#self._where_clause = q_obj.as_js(self._document)
|
||||||
|
query = Q(**query)
|
||||||
if q_obj:
|
if q_obj:
|
||||||
self._where_clause = q_obj.as_js(self._document)
|
query &= q_obj
|
||||||
query = QuerySet._transform_query(_doc_cls=self._document, **query)
|
self._query_obj &= query
|
||||||
self._query.update(query)
|
self._mongo_query = None
|
||||||
|
self._cursor_obj = None
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def filter(self, *q_objs, **query):
|
def filter(self, *q_objs, **query):
|
||||||
@ -338,7 +584,7 @@ class QuerySet(object):
|
|||||||
"""Transform a query from Django-style format to Mongo format.
|
"""Transform a query from Django-style format to Mongo format.
|
||||||
"""
|
"""
|
||||||
operators = ['ne', 'gt', 'gte', 'lt', 'lte', 'in', 'nin', 'mod',
|
operators = ['ne', 'gt', 'gte', 'lt', 'lte', 'in', 'nin', 'mod',
|
||||||
'all', 'size', 'exists']
|
'all', 'size', 'exists', 'not']
|
||||||
geo_operators = ['within_distance', 'within_box', 'near']
|
geo_operators = ['within_distance', 'within_box', 'near']
|
||||||
match_operators = ['contains', 'icontains', 'startswith',
|
match_operators = ['contains', 'icontains', 'startswith',
|
||||||
'istartswith', 'endswith', 'iendswith',
|
'istartswith', 'endswith', 'iendswith',
|
||||||
@ -354,6 +600,11 @@ class QuerySet(object):
|
|||||||
if parts[-1] in operators + match_operators + geo_operators:
|
if parts[-1] in operators + match_operators + geo_operators:
|
||||||
op = parts.pop()
|
op = parts.pop()
|
||||||
|
|
||||||
|
negate = False
|
||||||
|
if parts[-1] == 'not':
|
||||||
|
parts.pop()
|
||||||
|
negate = True
|
||||||
|
|
||||||
if _doc_cls:
|
if _doc_cls:
|
||||||
# Switch field names to proper names [set in Field(name='foo')]
|
# Switch field names to proper names [set in Field(name='foo')]
|
||||||
fields = QuerySet._lookup_field(_doc_cls, parts)
|
fields = QuerySet._lookup_field(_doc_cls, parts)
|
||||||
@ -361,7 +612,7 @@ class QuerySet(object):
|
|||||||
|
|
||||||
# Convert value to proper value
|
# Convert value to proper value
|
||||||
field = fields[-1]
|
field = fields[-1]
|
||||||
singular_ops = [None, 'ne', 'gt', 'gte', 'lt', 'lte']
|
singular_ops = [None, 'ne', 'gt', 'gte', 'lt', 'lte', 'not']
|
||||||
singular_ops += match_operators
|
singular_ops += match_operators
|
||||||
if op in singular_ops:
|
if op in singular_ops:
|
||||||
value = field.prepare_query_value(op, value)
|
value = field.prepare_query_value(op, value)
|
||||||
@ -386,6 +637,9 @@ class QuerySet(object):
|
|||||||
"been implemented" % op)
|
"been implemented" % op)
|
||||||
elif op not in match_operators:
|
elif op not in match_operators:
|
||||||
value = {'$' + op: value}
|
value = {'$' + op: value}
|
||||||
|
|
||||||
|
if negate:
|
||||||
|
value = {'$not': value}
|
||||||
|
|
||||||
for i, part in indices:
|
for i, part in indices:
|
||||||
parts.insert(i, part)
|
parts.insert(i, part)
|
||||||
|
@ -53,9 +53,6 @@ class QuerySetTest(unittest.TestCase):
|
|||||||
person2 = self.Person(name="User B", age=30)
|
person2 = self.Person(name="User B", age=30)
|
||||||
person2.save()
|
person2.save()
|
||||||
|
|
||||||
q1 = Q(name='test')
|
|
||||||
q2 = Q(age__gte=18)
|
|
||||||
|
|
||||||
# Find all people in the collection
|
# Find all people in the collection
|
||||||
people = self.Person.objects
|
people = self.Person.objects
|
||||||
self.assertEqual(len(people), 2)
|
self.assertEqual(len(people), 2)
|
||||||
@ -156,7 +153,8 @@ class QuerySetTest(unittest.TestCase):
|
|||||||
|
|
||||||
# Retrieve the first person from the database
|
# Retrieve the first person from the database
|
||||||
self.assertRaises(MultipleObjectsReturned, self.Person.objects.get)
|
self.assertRaises(MultipleObjectsReturned, self.Person.objects.get)
|
||||||
self.assertRaises(self.Person.MultipleObjectsReturned, self.Person.objects.get)
|
self.assertRaises(self.Person.MultipleObjectsReturned,
|
||||||
|
self.Person.objects.get)
|
||||||
|
|
||||||
# Use a query to filter the people found to just person2
|
# Use a query to filter the people found to just person2
|
||||||
person = self.Person.objects.get(age=30)
|
person = self.Person.objects.get(age=30)
|
||||||
@ -234,7 +232,8 @@ class QuerySetTest(unittest.TestCase):
|
|||||||
self.assertEqual(created, False)
|
self.assertEqual(created, False)
|
||||||
|
|
||||||
# Try retrieving when no objects exists - new doc should be created
|
# Try retrieving when no objects exists - new doc should be created
|
||||||
person, created = self.Person.objects.get_or_create(age=50, defaults={'name': 'User C'})
|
kwargs = dict(age=50, defaults={'name': 'User C'})
|
||||||
|
person, created = self.Person.objects.get_or_create(**kwargs)
|
||||||
self.assertEqual(created, True)
|
self.assertEqual(created, True)
|
||||||
|
|
||||||
person = self.Person.objects.get(age=50)
|
person = self.Person.objects.get(age=50)
|
||||||
@ -337,6 +336,18 @@ class QuerySetTest(unittest.TestCase):
|
|||||||
obj = self.Person.objects(Q(name__icontains='[.\'Geek')).first()
|
obj = self.Person.objects(Q(name__icontains='[.\'Geek')).first()
|
||||||
self.assertEqual(obj, person)
|
self.assertEqual(obj, person)
|
||||||
|
|
||||||
|
def test_not(self):
|
||||||
|
"""Ensure that the __not operator works as expected.
|
||||||
|
"""
|
||||||
|
alice = self.Person(name='Alice', age=25)
|
||||||
|
alice.save()
|
||||||
|
|
||||||
|
obj = self.Person.objects(name__iexact='alice').first()
|
||||||
|
self.assertEqual(obj, alice)
|
||||||
|
|
||||||
|
obj = self.Person.objects(name__not__iexact='alice').first()
|
||||||
|
self.assertEqual(obj, None)
|
||||||
|
|
||||||
def test_filter_chaining(self):
|
def test_filter_chaining(self):
|
||||||
"""Ensure filters can be chained together.
|
"""Ensure filters can be chained together.
|
||||||
"""
|
"""
|
||||||
@ -546,9 +557,10 @@ class QuerySetTest(unittest.TestCase):
|
|||||||
obj = self.Person.objects(Q(name=re.compile('^gui', re.I))).first()
|
obj = self.Person.objects(Q(name=re.compile('^gui', re.I))).first()
|
||||||
self.assertEqual(obj, person)
|
self.assertEqual(obj, person)
|
||||||
|
|
||||||
obj = self.Person.objects(Q(name__ne=re.compile('^bob'))).first()
|
obj = self.Person.objects(Q(name__not=re.compile('^bob'))).first()
|
||||||
self.assertEqual(obj, person)
|
self.assertEqual(obj, person)
|
||||||
obj = self.Person.objects(Q(name__ne=re.compile('^Gui'))).first()
|
|
||||||
|
obj = self.Person.objects(Q(name__not=re.compile('^Gui'))).first()
|
||||||
self.assertEqual(obj, None)
|
self.assertEqual(obj, None)
|
||||||
|
|
||||||
def test_q_lists(self):
|
def test_q_lists(self):
|
||||||
@ -1346,43 +1358,6 @@ class QuerySetTest(unittest.TestCase):
|
|||||||
|
|
||||||
class QTest(unittest.TestCase):
|
class QTest(unittest.TestCase):
|
||||||
|
|
||||||
def test_or_and(self):
|
|
||||||
"""Ensure that Q objects may be combined correctly.
|
|
||||||
"""
|
|
||||||
q1 = Q(name='test')
|
|
||||||
q2 = Q(age__gte=18)
|
|
||||||
|
|
||||||
query = ['(', {'name': 'test'}, '||', {'age__gte': 18}, ')']
|
|
||||||
self.assertEqual((q1 | q2).query, query)
|
|
||||||
|
|
||||||
query = ['(', {'name': 'test'}, '&&', {'age__gte': 18}, ')']
|
|
||||||
self.assertEqual((q1 & q2).query, query)
|
|
||||||
|
|
||||||
query = ['(', '(', {'name': 'test'}, '&&', {'age__gte': 18}, ')', '||',
|
|
||||||
{'name': 'example'}, ')']
|
|
||||||
self.assertEqual((q1 & q2 | Q(name='example')).query, query)
|
|
||||||
|
|
||||||
def test_item_query_as_js(self):
|
|
||||||
"""Ensure that the _item_query_as_js utilitiy method works properly.
|
|
||||||
"""
|
|
||||||
q = Q()
|
|
||||||
examples = [
|
|
||||||
|
|
||||||
({'name': 'test'}, ('((this.name instanceof Array) && '
|
|
||||||
'this.name.indexOf(i0f0) != -1) || this.name == i0f0'),
|
|
||||||
{'i0f0': 'test'}),
|
|
||||||
({'age': {'$gt': 18}}, 'this.age > i0f0o0', {'i0f0o0': 18}),
|
|
||||||
({'name': 'test', 'age': {'$gt': 18, '$lte': 65}},
|
|
||||||
('this.age <= i0f0o0 && this.age > i0f0o1 && '
|
|
||||||
'((this.name instanceof Array) && '
|
|
||||||
'this.name.indexOf(i0f1) != -1) || this.name == i0f1'),
|
|
||||||
{'i0f0o0': 65, 'i0f0o1': 18, 'i0f1': 'test'}),
|
|
||||||
]
|
|
||||||
for item, js, scope in examples:
|
|
||||||
test_scope = {}
|
|
||||||
self.assertEqual(q._item_query_as_js(item, test_scope, 0), js)
|
|
||||||
self.assertEqual(scope, test_scope)
|
|
||||||
|
|
||||||
def test_empty_q(self):
|
def test_empty_q(self):
|
||||||
"""Ensure that empty Q objects won't hurt.
|
"""Ensure that empty Q objects won't hurt.
|
||||||
"""
|
"""
|
||||||
@ -1392,11 +1367,15 @@ class QTest(unittest.TestCase):
|
|||||||
q4 = Q(name='test')
|
q4 = Q(name='test')
|
||||||
q5 = Q()
|
q5 = Q()
|
||||||
|
|
||||||
query = ['(', {'age__gte': 18}, '||', {'name': 'test'}, ')']
|
class Person(Document):
|
||||||
self.assertEqual((q1 | q2 | q3 | q4 | q5).query, query)
|
name = StringField()
|
||||||
|
age = IntField()
|
||||||
|
|
||||||
query = ['(', {'age__gte': 18}, '&&', {'name': 'test'}, ')']
|
query = {'$or': [{'age': {'$gte': 18}}, {'name': 'test'}]}
|
||||||
self.assertEqual((q1 & q2 & q3 & q4 & q5).query, query)
|
self.assertEqual((q1 | q2 | q3 | q4 | q5).to_query(Person), query)
|
||||||
|
|
||||||
|
query = {'age': {'$gte': 18}, 'name': 'test'}
|
||||||
|
self.assertEqual((q1 & q2 & q3 & q4 & q5).to_query(Person), query)
|
||||||
|
|
||||||
def test_q_with_dbref(self):
|
def test_q_with_dbref(self):
|
||||||
"""Ensure Q objects handle DBRefs correctly"""
|
"""Ensure Q objects handle DBRefs correctly"""
|
||||||
@ -1415,5 +1394,107 @@ class QTest(unittest.TestCase):
|
|||||||
self.assertEqual(Post.objects.filter(Q(created_user=user)).count(), 1)
|
self.assertEqual(Post.objects.filter(Q(created_user=user)).count(), 1)
|
||||||
|
|
||||||
|
|
||||||
|
class NewQTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_and_combination(self):
|
||||||
|
"""Ensure that Q-objects correctly AND together.
|
||||||
|
"""
|
||||||
|
class TestDoc(Document):
|
||||||
|
x = IntField()
|
||||||
|
y = StringField()
|
||||||
|
|
||||||
|
# Check than an error is raised when conflicting queries are anded
|
||||||
|
def invalid_combination():
|
||||||
|
query = Q(x__lt=7) & Q(x__lt=3)
|
||||||
|
query.to_query(TestDoc)
|
||||||
|
self.assertRaises(InvalidQueryError, invalid_combination)
|
||||||
|
|
||||||
|
# Check normal cases work without an error
|
||||||
|
query = Q(x__lt=7) & Q(x__gt=3)
|
||||||
|
|
||||||
|
q1 = Q(x__lt=7)
|
||||||
|
q2 = Q(x__gt=3)
|
||||||
|
query = (q1 & q2).to_query(TestDoc)
|
||||||
|
self.assertEqual(query, {'x': {'$lt': 7, '$gt': 3}})
|
||||||
|
|
||||||
|
# More complex nested example
|
||||||
|
query = Q(x__lt=100) & Q(y__ne='NotMyString')
|
||||||
|
query &= Q(y__in=['a', 'b', 'c']) & Q(x__gt=-100)
|
||||||
|
mongo_query = {
|
||||||
|
'x': {'$lt': 100, '$gt': -100},
|
||||||
|
'y': {'$ne': 'NotMyString', '$in': ['a', 'b', 'c']},
|
||||||
|
}
|
||||||
|
self.assertEqual(query.to_query(TestDoc), mongo_query)
|
||||||
|
|
||||||
|
def test_or_combination(self):
|
||||||
|
"""Ensure that Q-objects correctly OR together.
|
||||||
|
"""
|
||||||
|
class TestDoc(Document):
|
||||||
|
x = IntField()
|
||||||
|
|
||||||
|
q1 = Q(x__lt=3)
|
||||||
|
q2 = Q(x__gt=7)
|
||||||
|
query = (q1 | q2).to_query(TestDoc)
|
||||||
|
self.assertEqual(query, {
|
||||||
|
'$or': [
|
||||||
|
{'x': {'$lt': 3}},
|
||||||
|
{'x': {'$gt': 7}},
|
||||||
|
]
|
||||||
|
})
|
||||||
|
|
||||||
|
def test_and_or_combination(self):
|
||||||
|
"""Ensure that Q-objects handle ANDing ORed components.
|
||||||
|
"""
|
||||||
|
class TestDoc(Document):
|
||||||
|
x = IntField()
|
||||||
|
y = BooleanField()
|
||||||
|
|
||||||
|
query = (Q(x__gt=0) | Q(x__exists=False))
|
||||||
|
query &= Q(x__lt=100)
|
||||||
|
self.assertEqual(query.to_query(TestDoc), {
|
||||||
|
'$or': [
|
||||||
|
{'x': {'$lt': 100, '$gt': 0}},
|
||||||
|
{'x': {'$lt': 100, '$exists': False}},
|
||||||
|
]
|
||||||
|
})
|
||||||
|
|
||||||
|
q1 = (Q(x__gt=0) | Q(x__exists=False))
|
||||||
|
q2 = (Q(x__lt=100) | Q(y=True))
|
||||||
|
query = (q1 & q2).to_query(TestDoc)
|
||||||
|
|
||||||
|
self.assertEqual(['$or'], query.keys())
|
||||||
|
conditions = [
|
||||||
|
{'x': {'$lt': 100, '$gt': 0}},
|
||||||
|
{'x': {'$lt': 100, '$exists': False}},
|
||||||
|
{'x': {'$gt': 0}, 'y': True},
|
||||||
|
{'x': {'$exists': False}, 'y': True},
|
||||||
|
]
|
||||||
|
self.assertEqual(len(conditions), len(query['$or']))
|
||||||
|
for condition in conditions:
|
||||||
|
self.assertTrue(condition in query['$or'])
|
||||||
|
|
||||||
|
def test_or_and_or_combination(self):
|
||||||
|
"""Ensure that Q-objects handle ORing ANDed ORed components. :)
|
||||||
|
"""
|
||||||
|
class TestDoc(Document):
|
||||||
|
x = IntField()
|
||||||
|
y = BooleanField()
|
||||||
|
|
||||||
|
q1 = (Q(x__gt=0) & (Q(y=True) | Q(y__exists=False)))
|
||||||
|
q2 = (Q(x__lt=100) & (Q(y=False) | Q(y__exists=False)))
|
||||||
|
query = (q1 | q2).to_query(TestDoc)
|
||||||
|
|
||||||
|
self.assertEqual(['$or'], query.keys())
|
||||||
|
conditions = [
|
||||||
|
{'x': {'$gt': 0}, 'y': True},
|
||||||
|
{'x': {'$gt': 0}, 'y': {'$exists': False}},
|
||||||
|
{'x': {'$lt': 100}, 'y':False},
|
||||||
|
{'x': {'$lt': 100}, 'y': {'$exists': False}},
|
||||||
|
]
|
||||||
|
self.assertEqual(len(conditions), len(query['$or']))
|
||||||
|
for condition in conditions:
|
||||||
|
self.assertTrue(condition in query['$or'])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user