# mongoengine/tests/queryset/test_visitor.py
# 2019-10-31 22:59:49 +01:00
#
# 380 lines
# 11 KiB
# Python

import datetime
import re
import unittest
from bson import ObjectId
from mongoengine import *
from mongoengine.errors import InvalidQueryError
from mongoengine.queryset import Q
import pytest
class TestQ(unittest.TestCase):
    """Tests for composing ``Q`` objects and running queries built from them."""

    def setUp(self):
        """Connect to the test database and expose a fresh ``Person`` document."""
        connect(db="mongoenginetest")

        class Person(Document):
            name = StringField()
            age = IntField()
            meta = {"allow_inheritance": True}

        Person.drop_collection()
        self.Person = Person

    def test_empty_q(self):
        """Ensure that empty Q objects are ignored when combined with others."""
        q1 = Q()
        q2 = Q(age__gte=18)
        q3 = Q()
        q4 = Q(name="test")
        q5 = Q()

        # Local document class; it shadows the ``Person`` created in setUp.
        class Person(Document):
            name = StringField()
            age = IntField()

        query = {"$or": [{"age": {"$gte": 18}}, {"name": "test"}]}
        assert (q1 | q2 | q3 | q4 | q5).to_query(Person) == query

        query = {"age": {"$gte": 18}, "name": "test"}
        assert (q1 & q2 & q3 & q4 & q5).to_query(Person) == query

    def test_q_with_dbref(self):
        """Ensure Q objects handle DBRefs correctly."""
        # The connection is already established in setUp.

        class User(Document):
            pass

        class Post(Document):
            created_user = ReferenceField(User)

        # Start from empty collections, like the other database-backed tests.
        User.drop_collection()
        Post.drop_collection()

        user = User.objects.create()
        Post.objects.create(created_user=user)

        assert Post.objects.filter(created_user=user).count() == 1
        assert Post.objects.filter(Q(created_user=user)).count() == 1

    def test_and_combination(self):
        """Ensure that Q objects correctly AND together."""

        class TestDoc(Document):
            x = IntField()
            y = StringField()

        # Two constraints using the same operator on one field cannot be
        # merged into a single dict, so an explicit $and is expected.
        query = (Q(x__lt=7) & Q(x__lt=3)).to_query(TestDoc)
        assert query == {"$and": [{"x": {"$lt": 7}}, {"x": {"$lt": 3}}]}

        query = (Q(y="a") & Q(x__lt=7) & Q(x__lt=3)).to_query(TestDoc)
        assert query == {"$and": [{"y": "a"}, {"x": {"$lt": 7}}, {"x": {"$lt": 3}}]}

        # Check normal cases work without an error
        q1 = Q(x__lt=7)
        q2 = Q(x__gt=3)
        query = (q1 & q2).to_query(TestDoc)
        assert query == {"x": {"$lt": 7, "$gt": 3}}

        # More complex nested example
        query = Q(x__lt=100) & Q(y__ne="NotMyString")
        query &= Q(y__in=["a", "b", "c"]) & Q(x__gt=-100)
        mongo_query = {
            "x": {"$lt": 100, "$gt": -100},
            "y": {"$ne": "NotMyString", "$in": ["a", "b", "c"]},
        }
        assert query.to_query(TestDoc) == mongo_query

    def test_or_combination(self):
        """Ensure that Q objects correctly OR together."""

        class TestDoc(Document):
            x = IntField()

        q1 = Q(x__lt=3)
        q2 = Q(x__gt=7)
        query = (q1 | q2).to_query(TestDoc)
        assert query == {"$or": [{"x": {"$lt": 3}}, {"x": {"$gt": 7}}]}

    def test_and_or_combination(self):
        """Ensure that Q objects handle ANDing ORed components."""

        class TestDoc(Document):
            x = IntField()
            y = BooleanField()

        TestDoc.drop_collection()

        query = Q(x__gt=0) | Q(x__exists=False)
        query &= Q(x__lt=100)
        assert query.to_query(TestDoc) == {
            "$and": [
                {"$or": [{"x": {"$gt": 0}}, {"x": {"$exists": False}}]},
                {"x": {"$lt": 100}},
            ]
        }

        q1 = Q(x__gt=0) | Q(x__exists=False)
        q2 = Q(x__lt=100) | Q(y=True)
        query = (q1 & q2).to_query(TestDoc)

        TestDoc(x=101).save()
        TestDoc(x=10).save()
        TestDoc(y=True).save()

        assert query == {
            "$and": [
                {"$or": [{"x": {"$gt": 0}}, {"x": {"$exists": False}}]},
                {"$or": [{"x": {"$lt": 100}}, {"y": True}]},
            ]
        }
        assert TestDoc.objects(q1 & q2).count() == 2

    def test_or_and_or_combination(self):
        """Ensure that Q objects handle ORing ANDed ORed components."""

        class TestDoc(Document):
            x = IntField()
            y = BooleanField()

        TestDoc.drop_collection()
        TestDoc(x=-1, y=True).save()
        TestDoc(x=101, y=True).save()
        TestDoc(x=99, y=False).save()
        TestDoc(x=101, y=False).save()

        q1 = Q(x__gt=0) & (Q(y=True) | Q(y__exists=False))
        q2 = Q(x__lt=100) & (Q(y=False) | Q(y__exists=False))
        query = (q1 | q2).to_query(TestDoc)

        assert query == {
            "$or": [
                {
                    "$and": [
                        {"x": {"$gt": 0}},
                        {"$or": [{"y": True}, {"y": {"$exists": False}}]},
                    ]
                },
                {
                    "$and": [
                        {"x": {"$lt": 100}},
                        {"$or": [{"y": False}, {"y": {"$exists": False}}]},
                    ]
                },
            ]
        }
        assert TestDoc.objects(q1 | q2).count() == 2

    def test_multiple_occurence_in_field(self):
        """Ensure ANDing two ORs over the same fields keeps both under $and."""

        class Test(Document):
            name = StringField(max_length=40)
            title = StringField(max_length=40)

        q1 = Q(name__contains="te") | Q(title__contains="te")
        q2 = Q(name__contains="12") | Q(title__contains="12")
        q3 = q1 & q2

        query = q3.to_query(Test)
        assert query["$and"][0] == q1.to_query(Test)
        assert query["$and"][1] == q2.to_query(Test)

    def test_q_clone(self):
        """Ensure a cloned queryset built from Q objects is independent."""

        class TestDoc(Document):
            x = IntField()

        TestDoc.drop_collection()
        for i in range(1, 101):
            TestDoc(x=i).save()

        # Check normal cases work without an error
        test = TestDoc.objects(Q(x__lt=7) & Q(x__gt=3))
        assert test.count() == 3

        test2 = test.clone()
        assert test2.count() == 3
        assert test2 != test

        # Narrowing the clone must leave the original queryset untouched.
        test3 = test2.filter(x=6)
        assert test3.count() == 1
        assert test.count() == 3

    def test_q(self):
        """Ensure that Q objects may be used to query for documents."""

        class BlogPost(Document):
            title = StringField()
            publish_date = DateTimeField()
            published = BooleanField()

        BlogPost.drop_collection()

        post1 = BlogPost(
            title="Test 1", publish_date=datetime.datetime(2010, 1, 8), published=False
        )
        post1.save()

        post2 = BlogPost(
            title="Test 2", publish_date=datetime.datetime(2010, 1, 15), published=True
        )
        post2.save()

        post3 = BlogPost(title="Test 3", published=True)
        post3.save()

        post4 = BlogPost(title="Test 4", publish_date=datetime.datetime(2010, 1, 8))
        post4.save()

        post5 = BlogPost(title="Test 1", publish_date=datetime.datetime(2010, 1, 15))
        post5.save()

        post6 = BlogPost(title="Test 1", published=False)
        post6.save()

        # Check ObjectId lookup works
        obj = BlogPost.objects(id=post1.id).first()
        assert obj == post1

        # Check Q object combination with one does not exist
        q = BlogPost.objects(Q(title="Test 5") | Q(published=True))
        posts = [post.id for post in q]

        published_posts = (post2, post3)
        assert all(obj.id in posts for obj in published_posts)

        q = BlogPost.objects(Q(title="Test 1") | Q(published=True))
        posts = [post.id for post in q]
        published_posts = (post1, post2, post3, post5, post6)
        assert all(obj.id in posts for obj in published_posts)

        # Check Q object combination
        date = datetime.datetime(2010, 1, 10)
        q = BlogPost.objects(Q(publish_date__lte=date) | Q(published=True))
        posts = [post.id for post in q]

        published_posts = (post1, post2, post3, post4)
        assert all(obj.id in posts for obj in published_posts)

        assert not any(obj.id in posts for obj in [post5, post6])

        BlogPost.drop_collection()

        # Check the 'in' operator
        self.Person(name="user1", age=20).save()
        self.Person(name="user2", age=20).save()
        self.Person(name="user3", age=30).save()
        self.Person(name="user4", age=40).save()

        assert self.Person.objects(Q(age__in=[20])).count() == 2
        assert self.Person.objects(Q(age__in=[20, 30])).count() == 3

        # Test invalid query objs
        with pytest.raises(InvalidQueryError):
            self.Person.objects("user1")

        # filter should fail, too
        with pytest.raises(InvalidQueryError):
            self.Person.objects.filter("user1")

    def test_q_regex(self):
        """Ensure that Q objects can be queried using regexes."""
        person = self.Person(name="Guido van Rossum")
        person.save()

        obj = self.Person.objects(Q(name=re.compile("^Gui"))).first()
        assert obj == person
        # The pattern is case-sensitive unless re.I is given.
        obj = self.Person.objects(Q(name=re.compile("^gui"))).first()
        assert obj is None

        obj = self.Person.objects(Q(name=re.compile("^gui", re.I))).first()
        assert obj == person

        obj = self.Person.objects(Q(name__not=re.compile("^bob"))).first()
        assert obj == person

        obj = self.Person.objects(Q(name__not=re.compile("^Gui"))).first()
        assert obj is None

    def test_q_repr(self):
        """Ensure repr() of plain and combined Q objects is as expected."""
        assert repr(Q()) == "Q(**{})"
        assert repr(Q(name="test")) == "Q(**{'name': 'test'})"

        assert (
            repr(Q(name="test") & Q(age__gte=18))
            == "(Q(**{'name': 'test'}) & Q(**{'age__gte': 18}))"
        )

        assert (
            repr(Q(name="test") | Q(age__gte=18))
            == "(Q(**{'name': 'test'}) | Q(**{'age__gte': 18}))"
        )

    def test_q_lists(self):
        """Ensure that Q objects query ListFields correctly."""

        class BlogPost(Document):
            tags = ListField(StringField())

        BlogPost.drop_collection()

        BlogPost(tags=["python", "mongo"]).save()
        BlogPost(tags=["python"]).save()

        assert BlogPost.objects(Q(tags="mongo")).count() == 1
        assert BlogPost.objects(Q(tags="python")).count() == 2

        BlogPost.drop_collection()

    def test_q_merge_queries_edge_case(self):
        """Ensure a pk filter chained after an ORed Q filter and limit matches."""

        class User(Document):
            email = EmailField(required=False)
            name = StringField()

        User.drop_collection()
        pk = ObjectId()
        User(email="example@example.com", pk=pk).save()

        matches = (
            User.objects.filter(Q(email="example@example.com") | Q(name="John Doe"))
            .limit(2)
            .filter(pk=pk)
            .count()
        )
        assert matches == 1

    def test_chained_q_or_filtering(self):
        """Ensure ANDed Q objects and chained filter() calls count the same items."""

        class Post(EmbeddedDocument):
            name = StringField(required=True)

        class Item(Document):
            postables = ListField(EmbeddedDocumentField(Post))

        Item.drop_collection()

        Item(postables=[Post(name="a"), Post(name="b")]).save()
        Item(postables=[Post(name="a"), Post(name="c")]).save()
        Item(postables=[Post(name="a"), Post(name="b"), Post(name="c")]).save()

        assert (
            Item.objects(Q(postables__name="a") & Q(postables__name="b")).count() == 2
        )
        assert (
            Item.objects.filter(postables__name="a").filter(postables__name="b").count()
            == 2
        )
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()