424 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			424 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import datetime
 | |
| import re
 | |
| import unittest
 | |
| 
 | |
| from bson import ObjectId
 | |
| import pytest
 | |
| 
 | |
| from mongoengine import *
 | |
| from mongoengine.errors import InvalidQueryError
 | |
| from mongoengine.queryset import Q
 | |
| 
 | |
| 
 | |
| class TestQ(unittest.TestCase):
 | |
|     def setUp(self):
 | |
|         connect(db="mongoenginetest")
 | |
| 
 | |
|         class Person(Document):
 | |
|             name = StringField()
 | |
|             age = IntField()
 | |
|             meta = {"allow_inheritance": True}
 | |
| 
 | |
|         Person.drop_collection()
 | |
|         self.Person = Person
 | |
| 
 | |
|     def test_empty_q(self):
 | |
|         """Ensure that empty Q objects won't hurt.
 | |
|         """
 | |
|         q1 = Q()
 | |
|         q2 = Q(age__gte=18)
 | |
|         q3 = Q()
 | |
|         q4 = Q(name="test")
 | |
|         q5 = Q()
 | |
| 
 | |
|         class Person(Document):
 | |
|             name = StringField()
 | |
|             age = IntField()
 | |
| 
 | |
|         query = {"$or": [{"age": {"$gte": 18}}, {"name": "test"}]}
 | |
|         assert (q1 | q2 | q3 | q4 | q5).to_query(Person) == query
 | |
| 
 | |
|         query = {"age": {"$gte": 18}, "name": "test"}
 | |
|         assert (q1 & q2 & q3 & q4 & q5).to_query(Person) == query
 | |
| 
 | |
|     def test_q_with_dbref(self):
 | |
|         """Ensure Q objects handle DBRefs correctly"""
 | |
|         connect(db="mongoenginetest")
 | |
| 
 | |
|         class User(Document):
 | |
|             pass
 | |
| 
 | |
|         class Post(Document):
 | |
|             created_user = ReferenceField(User)
 | |
| 
 | |
|         user = User.objects.create()
 | |
|         Post.objects.create(created_user=user)
 | |
| 
 | |
|         assert Post.objects.filter(created_user=user).count() == 1
 | |
|         assert Post.objects.filter(Q(created_user=user)).count() == 1
 | |
| 
 | |
|     def test_and_combination(self):
 | |
|         """Ensure that Q-objects correctly AND together.
 | |
|         """
 | |
| 
 | |
|         class TestDoc(Document):
 | |
|             x = IntField()
 | |
|             y = StringField()
 | |
| 
 | |
|         query = (Q(x__lt=7) & Q(x__lt=3)).to_query(TestDoc)
 | |
|         assert query == {"$and": [{"x": {"$lt": 7}}, {"x": {"$lt": 3}}]}
 | |
| 
 | |
|         query = (Q(y="a") & Q(x__lt=7) & Q(x__lt=3)).to_query(TestDoc)
 | |
|         assert query == {"$and": [{"y": "a"}, {"x": {"$lt": 7}}, {"x": {"$lt": 3}}]}
 | |
| 
 | |
|         # Check normal cases work without an error
 | |
|         query = Q(x__lt=7) & Q(x__gt=3)
 | |
| 
 | |
|         q1 = Q(x__lt=7)
 | |
|         q2 = Q(x__gt=3)
 | |
|         query = (q1 & q2).to_query(TestDoc)
 | |
|         assert query == {"x": {"$lt": 7, "$gt": 3}}
 | |
| 
 | |
|         # More complex nested example
 | |
|         query = Q(x__lt=100) & Q(y__ne="NotMyString")
 | |
|         query &= Q(y__in=["a", "b", "c"]) & Q(x__gt=-100)
 | |
|         mongo_query = {
 | |
|             "x": {"$lt": 100, "$gt": -100},
 | |
|             "y": {"$ne": "NotMyString", "$in": ["a", "b", "c"]},
 | |
|         }
 | |
|         assert query.to_query(TestDoc) == mongo_query
 | |
| 
 | |
|     def test_or_combination(self):
 | |
|         """Ensure that Q-objects correctly OR together.
 | |
|         """
 | |
| 
 | |
|         class TestDoc(Document):
 | |
|             x = IntField()
 | |
| 
 | |
|         q1 = Q(x__lt=3)
 | |
|         q2 = Q(x__gt=7)
 | |
|         query = (q1 | q2).to_query(TestDoc)
 | |
|         assert query == {"$or": [{"x": {"$lt": 3}}, {"x": {"$gt": 7}}]}
 | |
| 
 | |
|     def test_and_or_combination(self):
 | |
|         """Ensure that Q-objects handle ANDing ORed components.
 | |
|         """
 | |
| 
 | |
|         class TestDoc(Document):
 | |
|             x = IntField()
 | |
|             y = BooleanField()
 | |
| 
 | |
|         TestDoc.drop_collection()
 | |
| 
 | |
|         query = Q(x__gt=0) | Q(x__exists=False)
 | |
|         query &= Q(x__lt=100)
 | |
|         assert query.to_query(TestDoc) == {
 | |
|             "$and": [
 | |
|                 {"$or": [{"x": {"$gt": 0}}, {"x": {"$exists": False}}]},
 | |
|                 {"x": {"$lt": 100}},
 | |
|             ]
 | |
|         }
 | |
| 
 | |
|         q1 = Q(x__gt=0) | Q(x__exists=False)
 | |
|         q2 = Q(x__lt=100) | Q(y=True)
 | |
|         query = (q1 & q2).to_query(TestDoc)
 | |
| 
 | |
|         TestDoc(x=101).save()
 | |
|         TestDoc(x=10).save()
 | |
|         TestDoc(y=True).save()
 | |
| 
 | |
|         assert query == {
 | |
|             "$and": [
 | |
|                 {"$or": [{"x": {"$gt": 0}}, {"x": {"$exists": False}}]},
 | |
|                 {"$or": [{"x": {"$lt": 100}}, {"y": True}]},
 | |
|             ]
 | |
|         }
 | |
|         assert 2 == TestDoc.objects(q1 & q2).count()
 | |
| 
 | |
|     def test_or_and_or_combination(self):
 | |
|         """Ensure that Q-objects handle ORing ANDed ORed components. :)
 | |
|         """
 | |
| 
 | |
|         class TestDoc(Document):
 | |
|             x = IntField()
 | |
|             y = BooleanField()
 | |
| 
 | |
|         TestDoc.drop_collection()
 | |
|         TestDoc(x=-1, y=True).save()
 | |
|         TestDoc(x=101, y=True).save()
 | |
|         TestDoc(x=99, y=False).save()
 | |
|         TestDoc(x=101, y=False).save()
 | |
| 
 | |
|         q1 = Q(x__gt=0) & (Q(y=True) | Q(y__exists=False))
 | |
|         q2 = Q(x__lt=100) & (Q(y=False) | Q(y__exists=False))
 | |
|         query = (q1 | q2).to_query(TestDoc)
 | |
| 
 | |
|         assert query == {
 | |
|             "$or": [
 | |
|                 {
 | |
|                     "$and": [
 | |
|                         {"x": {"$gt": 0}},
 | |
|                         {"$or": [{"y": True}, {"y": {"$exists": False}}]},
 | |
|                     ]
 | |
|                 },
 | |
|                 {
 | |
|                     "$and": [
 | |
|                         {"x": {"$lt": 100}},
 | |
|                         {"$or": [{"y": False}, {"y": {"$exists": False}}]},
 | |
|                     ]
 | |
|                 },
 | |
|             ]
 | |
|         }
 | |
|         assert 2 == TestDoc.objects(q1 | q2).count()
 | |
| 
 | |
|     def test_multiple_occurence_in_field(self):
 | |
|         class Test(Document):
 | |
|             name = StringField(max_length=40)
 | |
|             title = StringField(max_length=40)
 | |
| 
 | |
|         q1 = Q(name__contains="te") | Q(title__contains="te")
 | |
|         q2 = Q(name__contains="12") | Q(title__contains="12")
 | |
| 
 | |
|         q3 = q1 & q2
 | |
| 
 | |
|         query = q3.to_query(Test)
 | |
|         assert query["$and"][0] == q1.to_query(Test)
 | |
|         assert query["$and"][1] == q2.to_query(Test)
 | |
| 
 | |
|     def test_q_clone(self):
 | |
|         class TestDoc(Document):
 | |
|             x = IntField()
 | |
| 
 | |
|         TestDoc.drop_collection()
 | |
|         for i in range(1, 101):
 | |
|             t = TestDoc(x=i)
 | |
|             t.save()
 | |
| 
 | |
|         # Check normal cases work without an error
 | |
|         test = TestDoc.objects(Q(x__lt=7) & Q(x__gt=3))
 | |
| 
 | |
|         assert test.count() == 3
 | |
| 
 | |
|         test2 = test.clone()
 | |
|         assert test2.count() == 3
 | |
|         assert test2 != test
 | |
| 
 | |
|         test3 = test2.filter(x=6)
 | |
|         assert test3.count() == 1
 | |
|         assert test.count() == 3
 | |
| 
 | |
|     def test_q(self):
 | |
|         """Ensure that Q objects may be used to query for documents.
 | |
|         """
 | |
| 
 | |
|         class BlogPost(Document):
 | |
|             title = StringField()
 | |
|             publish_date = DateTimeField()
 | |
|             published = BooleanField()
 | |
| 
 | |
|         BlogPost.drop_collection()
 | |
| 
 | |
|         post1 = BlogPost(
 | |
|             title="Test 1", publish_date=datetime.datetime(2010, 1, 8), published=False
 | |
|         )
 | |
|         post1.save()
 | |
| 
 | |
|         post2 = BlogPost(
 | |
|             title="Test 2", publish_date=datetime.datetime(2010, 1, 15), published=True
 | |
|         )
 | |
|         post2.save()
 | |
| 
 | |
|         post3 = BlogPost(title="Test 3", published=True)
 | |
|         post3.save()
 | |
| 
 | |
|         post4 = BlogPost(title="Test 4", publish_date=datetime.datetime(2010, 1, 8))
 | |
|         post4.save()
 | |
| 
 | |
|         post5 = BlogPost(title="Test 1", publish_date=datetime.datetime(2010, 1, 15))
 | |
|         post5.save()
 | |
| 
 | |
|         post6 = BlogPost(title="Test 1", published=False)
 | |
|         post6.save()
 | |
| 
 | |
|         # Check ObjectId lookup works
 | |
|         obj = BlogPost.objects(id=post1.id).first()
 | |
|         assert obj == post1
 | |
| 
 | |
|         # Check Q object combination with one does not exist
 | |
|         q = BlogPost.objects(Q(title="Test 5") | Q(published=True))
 | |
|         posts = [post.id for post in q]
 | |
| 
 | |
|         published_posts = (post2, post3)
 | |
|         assert all(obj.id in posts for obj in published_posts)
 | |
| 
 | |
|         q = BlogPost.objects(Q(title="Test 1") | Q(published=True))
 | |
|         posts = [post.id for post in q]
 | |
|         published_posts = (post1, post2, post3, post5, post6)
 | |
|         assert all(obj.id in posts for obj in published_posts)
 | |
| 
 | |
|         # Check Q object combination
 | |
|         date = datetime.datetime(2010, 1, 10)
 | |
|         q = BlogPost.objects(Q(publish_date__lte=date) | Q(published=True))
 | |
|         posts = [post.id for post in q]
 | |
| 
 | |
|         published_posts = (post1, post2, post3, post4)
 | |
|         assert all(obj.id in posts for obj in published_posts)
 | |
| 
 | |
|         assert not any(obj.id in posts for obj in [post5, post6])
 | |
| 
 | |
|         BlogPost.drop_collection()
 | |
| 
 | |
|         # Check the 'in' operator
 | |
|         self.Person(name="user1", age=20).save()
 | |
|         self.Person(name="user2", age=20).save()
 | |
|         self.Person(name="user3", age=30).save()
 | |
|         self.Person(name="user4", age=40).save()
 | |
| 
 | |
|         assert self.Person.objects(Q(age__in=[20])).count() == 2
 | |
|         assert self.Person.objects(Q(age__in=[20, 30])).count() == 3
 | |
| 
 | |
|         # Test invalid query objs
 | |
|         with pytest.raises(InvalidQueryError):
 | |
|             self.Person.objects("user1")
 | |
| 
 | |
|         # filter should fail, too
 | |
|         with pytest.raises(InvalidQueryError):
 | |
|             self.Person.objects.filter("user1")
 | |
| 
 | |
|     def test_q_regex(self):
 | |
|         """Ensure that Q objects can be queried using regexes.
 | |
|         """
 | |
|         person = self.Person(name="Guido van Rossum")
 | |
|         person.save()
 | |
| 
 | |
|         obj = self.Person.objects(Q(name=re.compile("^Gui"))).first()
 | |
|         assert obj == person
 | |
|         obj = self.Person.objects(Q(name=re.compile("^gui"))).first()
 | |
|         assert obj is None
 | |
| 
 | |
|         obj = self.Person.objects(Q(name=re.compile("^gui", re.I))).first()
 | |
|         assert obj == person
 | |
| 
 | |
|         obj = self.Person.objects(Q(name__not=re.compile("^bob"))).first()
 | |
|         assert obj == person
 | |
| 
 | |
|         obj = self.Person.objects(Q(name__not=re.compile("^Gui"))).first()
 | |
|         assert obj is None
 | |
| 
 | |
|     def test_q_repr(self):
 | |
|         assert repr(Q()) == "Q(**{})"
 | |
|         assert repr(Q(name="test")) == "Q(**{'name': 'test'})"
 | |
| 
 | |
|         assert (
 | |
|             repr(Q(name="test") & Q(age__gte=18))
 | |
|             == "(Q(**{'name': 'test'}) & Q(**{'age__gte': 18}))"
 | |
|         )
 | |
| 
 | |
|         assert (
 | |
|             repr(Q(name="test") | Q(age__gte=18))
 | |
|             == "(Q(**{'name': 'test'}) | Q(**{'age__gte': 18}))"
 | |
|         )
 | |
| 
 | |
|     def test_q_lists(self):
 | |
|         """Ensure that Q objects query ListFields correctly.
 | |
|         """
 | |
| 
 | |
|         class BlogPost(Document):
 | |
|             tags = ListField(StringField())
 | |
| 
 | |
|         BlogPost.drop_collection()
 | |
| 
 | |
|         BlogPost(tags=["python", "mongo"]).save()
 | |
|         BlogPost(tags=["python"]).save()
 | |
| 
 | |
|         assert BlogPost.objects(Q(tags="mongo")).count() == 1
 | |
|         assert BlogPost.objects(Q(tags="python")).count() == 2
 | |
| 
 | |
|         BlogPost.drop_collection()
 | |
| 
 | |
|     def test_q_merge_queries_edge_case(self):
 | |
|         class User(Document):
 | |
|             email = EmailField(required=False)
 | |
|             name = StringField()
 | |
| 
 | |
|         User.drop_collection()
 | |
|         pk = ObjectId()
 | |
|         User(email="example@example.com", pk=pk).save()
 | |
| 
 | |
|         assert (
 | |
|             1
 | |
|             == User.objects.filter(Q(email="example@example.com") | Q(name="John Doe"))
 | |
|             .limit(2)
 | |
|             .filter(pk=pk)
 | |
|             .count()
 | |
|         )
 | |
| 
 | |
|     def test_chained_q_or_filtering(self):
 | |
|         class Post(EmbeddedDocument):
 | |
|             name = StringField(required=True)
 | |
| 
 | |
|         class Item(Document):
 | |
|             postables = ListField(EmbeddedDocumentField(Post))
 | |
| 
 | |
|         Item.drop_collection()
 | |
| 
 | |
|         Item(postables=[Post(name="a"), Post(name="b")]).save()
 | |
|         Item(postables=[Post(name="a"), Post(name="c")]).save()
 | |
|         Item(postables=[Post(name="a"), Post(name="b"), Post(name="c")]).save()
 | |
| 
 | |
|         assert (
 | |
|             Item.objects(Q(postables__name="a") & Q(postables__name="b")).count() == 2
 | |
|         )
 | |
|         assert (
 | |
|             Item.objects.filter(postables__name="a").filter(postables__name="b").count()
 | |
|             == 2
 | |
|         )
 | |
| 
 | |
|     def test_equality(self):
 | |
|         assert Q(name="John") == Q(name="John")
 | |
|         assert Q() == Q()
 | |
| 
 | |
|     def test_inequality(self):
 | |
|         assert Q(name="John") != Q(name="Ralph")
 | |
| 
 | |
|     def test_operation_equality(self):
 | |
|         q1 = Q(name="John") | Q(title="Sir") & Q(surname="Paul")
 | |
|         q2 = Q(name="John") | Q(title="Sir") & Q(surname="Paul")
 | |
|         assert q1 == q2
 | |
| 
 | |
|     def test_operation_inequality(self):
 | |
|         q1 = Q(name="John") | Q(title="Sir")
 | |
|         q2 = Q(title="Sir") | Q(name="John")
 | |
|         assert q1 != q2
 | |
| 
 | |
|     def test_combine_and_empty(self):
 | |
|         q = Q(x=1)
 | |
|         assert q & Q() == q
 | |
|         assert Q() & q == q
 | |
| 
 | |
|     def test_combine_and_both_empty(self):
 | |
|         assert Q() & Q() == Q()
 | |
| 
 | |
|     def test_combine_or_empty(self):
 | |
|         q = Q(x=1)
 | |
|         assert q | Q() == q
 | |
|         assert Q() | q == q
 | |
| 
 | |
|     def test_combine_or_both_empty(self):
 | |
|         assert Q() | Q() == Q()
 | |
| 
 | |
|     def test_q_bool(self):
 | |
|         assert Q(name="John")
 | |
|         assert not Q()
 | |
| 
 | |
|     def test_combine_bool(self):
 | |
|         assert not Q() & Q()
 | |
|         assert Q() & Q(name="John")
 | |
|         assert Q(name="John") & Q()
 | |
|         assert Q() | Q(name="John")
 | |
|         assert Q(name="John") | Q()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     unittest.main()
 |