Split out queryset tests

This commit is contained in:
Ross Lawley 2012-12-19 13:35:37 +00:00
parent 420c3e0073
commit 50b755db0c
6 changed files with 565 additions and 539 deletions

View File

@ -1,2 +1,3 @@
from all_warnings import AllWarnings from all_warnings import AllWarnings
from document import * from document import *
from queryset import *

View File

@ -0,0 +1,5 @@
from transform import *
from field_list import *
from queryset import *
from visitor import *

View File

@ -0,0 +1,67 @@
import sys
sys.path[0:0] = [""]
import unittest
from mongoengine import *
from mongoengine.queryset import QueryFieldList
__all__ = ("QueryFieldListTest",)
class QueryFieldListTest(unittest.TestCase):
def test_empty(self):
q = QueryFieldList()
self.assertFalse(q)
q = QueryFieldList(always_include=['_cls'])
self.assertFalse(q)
def test_include_include(self):
q = QueryFieldList()
q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'a': True, 'b': True})
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'b': True})
def test_include_exclude(self):
q = QueryFieldList()
q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'a': True, 'b': True})
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.EXCLUDE)
self.assertEqual(q.as_dict(), {'a': True})
def test_exclude_exclude(self):
q = QueryFieldList()
q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.EXCLUDE)
self.assertEqual(q.as_dict(), {'a': False, 'b': False})
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.EXCLUDE)
self.assertEqual(q.as_dict(), {'a': False, 'b': False, 'c': False})
def test_exclude_include(self):
q = QueryFieldList()
q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.EXCLUDE)
self.assertEqual(q.as_dict(), {'a': False, 'b': False})
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'c': True})
def test_always_include(self):
q = QueryFieldList(always_include=['x', 'y'])
q += QueryFieldList(fields=['a', 'b', 'x'], value=QueryFieldList.EXCLUDE)
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'c': True})
def test_reset(self):
q = QueryFieldList(always_include=['x', 'y'])
q += QueryFieldList(fields=['a', 'b', 'x'], value=QueryFieldList.EXCLUDE)
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'c': True})
q.reset()
self.assertFalse(q)
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'b': True, 'c': True})
def test_using_a_slice(self):
q = QueryFieldList()
q += QueryFieldList(fields=['a'], value={"$slice": 5})
self.assertEqual(q.as_dict(), {'a': {"$slice": 5}})

View File

@ -18,12 +18,13 @@ from mongoengine import *
from mongoengine.connection import get_connection from mongoengine.connection import get_connection
from mongoengine.python_support import PY3 from mongoengine.python_support import PY3
from mongoengine.tests import query_counter from mongoengine.tests import query_counter
from mongoengine.queryset import (Q, QuerySet, QuerySetManager, from mongoengine.queryset import (QuerySet, QuerySetManager,
MultipleObjectsReturned, DoesNotExist, MultipleObjectsReturned, DoesNotExist,
QueryFieldList, queryset_manager) QueryFieldList, queryset_manager)
from mongoengine.queryset import transform
from mongoengine.errors import InvalidQueryError from mongoengine.errors import InvalidQueryError
__all__ = ("QuerySetTest",)
class QuerySetTest(unittest.TestCase): class QuerySetTest(unittest.TestCase):
@ -47,22 +48,6 @@ class QuerySetTest(unittest.TestCase):
self.assertTrue(isinstance(self.Person.objects._collection, self.assertTrue(isinstance(self.Person.objects._collection,
pymongo.collection.Collection)) pymongo.collection.Collection))
def test_transform_query(self):
"""Ensure that the _transform_query function operates correctly.
"""
self.assertEqual(transform.query(name='test', age=30),
{'name': 'test', 'age': 30})
self.assertEqual(transform.query(age__lt=30),
{'age': {'$lt': 30}})
self.assertEqual(transform.query(age__gt=20, age__lt=50),
{'age': {'$gt': 20, '$lt': 50}})
self.assertEqual(transform.query(age=20, age__gt=50),
{'age': 20})
self.assertEqual(transform.query(friend__age__gte=30),
{'friend.age': {'$gte': 30}})
self.assertEqual(transform.query(name__exists=True),
{'name': {'$exists': True}})
def test_cannot_perform_joins_references(self): def test_cannot_perform_joins_references(self):
class BlogPost(Document): class BlogPost(Document):
@ -264,30 +249,6 @@ class QuerySetTest(unittest.TestCase):
self.assertEqual(list(A.objects.none()), []) self.assertEqual(list(A.objects.none()), [])
self.assertEqual(list(A.objects.none().all()), []) self.assertEqual(list(A.objects.none().all()), [])
def test_chaining(self):
class A(Document):
pass
class B(Document):
a = ReferenceField(A)
A.drop_collection()
B.drop_collection()
a1 = A().save()
a2 = A().save()
B(a=a1).save()
# Works
q1 = B.objects.filter(a__in=[a1, a2], a=a1)._query
# Doesn't work
q2 = B.objects.filter(a__in=[a1, a2])
q2 = q2.filter(a=a1)._query
self.assertEqual(q1, q2)
def test_update_write_options(self): def test_update_write_options(self):
"""Test that passing write_options works""" """Test that passing write_options works"""
@ -830,48 +791,30 @@ class QuerySetTest(unittest.TestCase):
self.assertEqual(obj, person) self.assertEqual(obj, person)
obj = self.Person.objects(name__contains='Van').first() obj = self.Person.objects(name__contains='Van').first()
self.assertEqual(obj, None) self.assertEqual(obj, None)
obj = self.Person.objects(Q(name__contains='van')).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__contains='Van')).first()
self.assertEqual(obj, None)
# Test icontains # Test icontains
obj = self.Person.objects(name__icontains='Van').first() obj = self.Person.objects(name__icontains='Van').first()
self.assertEqual(obj, person) self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__icontains='Van')).first()
self.assertEqual(obj, person)
# Test startswith # Test startswith
obj = self.Person.objects(name__startswith='Guido').first() obj = self.Person.objects(name__startswith='Guido').first()
self.assertEqual(obj, person) self.assertEqual(obj, person)
obj = self.Person.objects(name__startswith='guido').first() obj = self.Person.objects(name__startswith='guido').first()
self.assertEqual(obj, None) self.assertEqual(obj, None)
obj = self.Person.objects(Q(name__startswith='Guido')).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__startswith='guido')).first()
self.assertEqual(obj, None)
# Test istartswith # Test istartswith
obj = self.Person.objects(name__istartswith='guido').first() obj = self.Person.objects(name__istartswith='guido').first()
self.assertEqual(obj, person) self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__istartswith='guido')).first()
self.assertEqual(obj, person)
# Test endswith # Test endswith
obj = self.Person.objects(name__endswith='Rossum').first() obj = self.Person.objects(name__endswith='Rossum').first()
self.assertEqual(obj, person) self.assertEqual(obj, person)
obj = self.Person.objects(name__endswith='rossuM').first() obj = self.Person.objects(name__endswith='rossuM').first()
self.assertEqual(obj, None) self.assertEqual(obj, None)
obj = self.Person.objects(Q(name__endswith='Rossum')).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__endswith='rossuM')).first()
self.assertEqual(obj, None)
# Test iendswith # Test iendswith
obj = self.Person.objects(name__iendswith='rossuM').first() obj = self.Person.objects(name__iendswith='rossuM').first()
self.assertEqual(obj, person) self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__iendswith='rossuM')).first()
self.assertEqual(obj, person)
# Test exact # Test exact
obj = self.Person.objects(name__exact='Guido van Rossum').first() obj = self.Person.objects(name__exact='Guido van Rossum').first()
@ -880,28 +823,18 @@ class QuerySetTest(unittest.TestCase):
self.assertEqual(obj, None) self.assertEqual(obj, None)
obj = self.Person.objects(name__exact='Guido van Rossu').first() obj = self.Person.objects(name__exact='Guido van Rossu').first()
self.assertEqual(obj, None) self.assertEqual(obj, None)
obj = self.Person.objects(Q(name__exact='Guido van Rossum')).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__exact='Guido van rossum')).first()
self.assertEqual(obj, None)
obj = self.Person.objects(Q(name__exact='Guido van Rossu')).first()
self.assertEqual(obj, None)
# Test iexact # Test iexact
obj = self.Person.objects(name__iexact='gUIDO VAN rOSSUM').first() obj = self.Person.objects(name__iexact='gUIDO VAN rOSSUM').first()
self.assertEqual(obj, person) self.assertEqual(obj, person)
obj = self.Person.objects(name__iexact='gUIDO VAN rOSSU').first() obj = self.Person.objects(name__iexact='gUIDO VAN rOSSU').first()
self.assertEqual(obj, None) self.assertEqual(obj, None)
obj = self.Person.objects(Q(name__iexact='gUIDO VAN rOSSUM')).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__iexact='gUIDO VAN rOSSU')).first()
self.assertEqual(obj, None)
# Test unsafe expressions # Test unsafe expressions
person = self.Person(name='Guido van Rossum [.\'Geek\']') person = self.Person(name='Guido van Rossum [.\'Geek\']')
person.save() person.save()
obj = self.Person.objects(Q(name__icontains='[.\'Geek')).first() obj = self.Person.objects(name__icontains='[.\'Geek').first()
self.assertEqual(obj, person) self.assertEqual(obj, person)
def test_not(self): def test_not(self):
@ -971,21 +904,6 @@ class QuerySetTest(unittest.TestCase):
BlogPost.drop_collection() BlogPost.drop_collection()
Blog.drop_collection() Blog.drop_collection()
def test_raw_and_merging(self):
class Doc(Document):
meta = {'allow_inheritance': False}
raw_query = Doc.objects(__raw__={'deleted': False,
'scraped': 'yes',
'$nor': [{'views.extracted': 'no'},
{'attachments.views.extracted':'no'}]
})._query
expected = {'deleted': False, 'scraped': 'yes',
'$nor': [{'views.extracted': 'no'},
{'attachments.views.extracted': 'no'}]}
self.assertEqual(expected, raw_query)
def test_ordering(self): def test_ordering(self):
"""Ensure default ordering is applied and can be overridden. """Ensure default ordering is applied and can be overridden.
""" """
@ -1310,151 +1228,6 @@ class QuerySetTest(unittest.TestCase):
BlogPost.drop_collection() BlogPost.drop_collection()
def test_q(self):
"""Ensure that Q objects may be used to query for documents.
"""
class BlogPost(Document):
title = StringField()
publish_date = DateTimeField()
published = BooleanField()
BlogPost.drop_collection()
post1 = BlogPost(title='Test 1', publish_date=datetime(2010, 1, 8), published=False)
post1.save()
post2 = BlogPost(title='Test 2', publish_date=datetime(2010, 1, 15), published=True)
post2.save()
post3 = BlogPost(title='Test 3', published=True)
post3.save()
post4 = BlogPost(title='Test 4', publish_date=datetime(2010, 1, 8))
post4.save()
post5 = BlogPost(title='Test 1', publish_date=datetime(2010, 1, 15))
post5.save()
post6 = BlogPost(title='Test 1', published=False)
post6.save()
# Check ObjectId lookup works
obj = BlogPost.objects(id=post1.id).first()
self.assertEqual(obj, post1)
# Check Q object combination with one does not exist
q = BlogPost.objects(Q(title='Test 5') | Q(published=True))
posts = [post.id for post in q]
published_posts = (post2, post3)
self.assertTrue(all(obj.id in posts for obj in published_posts))
q = BlogPost.objects(Q(title='Test 1') | Q(published=True))
posts = [post.id for post in q]
published_posts = (post1, post2, post3, post5, post6)
self.assertTrue(all(obj.id in posts for obj in published_posts))
# Check Q object combination
date = datetime(2010, 1, 10)
q = BlogPost.objects(Q(publish_date__lte=date) | Q(published=True))
posts = [post.id for post in q]
published_posts = (post1, post2, post3, post4)
self.assertTrue(all(obj.id in posts for obj in published_posts))
self.assertFalse(any(obj.id in posts for obj in [post5, post6]))
BlogPost.drop_collection()
# Check the 'in' operator
self.Person(name='user1', age=20).save()
self.Person(name='user2', age=20).save()
self.Person(name='user3', age=30).save()
self.Person(name='user4', age=40).save()
self.assertEqual(len(self.Person.objects(Q(age__in=[20]))), 2)
self.assertEqual(len(self.Person.objects(Q(age__in=[20, 30]))), 3)
# Test invalid query objs
def wrong_query_objs():
self.Person.objects('user1')
def wrong_query_objs_filter():
self.Person.objects('user1')
self.assertRaises(InvalidQueryError, wrong_query_objs)
self.assertRaises(InvalidQueryError, wrong_query_objs_filter)
def test_q_regex(self):
"""Ensure that Q objects can be queried using regexes.
"""
person = self.Person(name='Guido van Rossum')
person.save()
import re
obj = self.Person.objects(Q(name=re.compile('^Gui'))).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name=re.compile('^gui'))).first()
self.assertEqual(obj, None)
obj = self.Person.objects(Q(name=re.compile('^gui', re.I))).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__not=re.compile('^bob'))).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__not=re.compile('^Gui'))).first()
self.assertEqual(obj, None)
def test_q_lists(self):
"""Ensure that Q objects query ListFields correctly.
"""
class BlogPost(Document):
tags = ListField(StringField())
BlogPost.drop_collection()
BlogPost(tags=['python', 'mongo']).save()
BlogPost(tags=['python']).save()
self.assertEqual(len(BlogPost.objects(Q(tags='mongo'))), 1)
self.assertEqual(len(BlogPost.objects(Q(tags='python'))), 2)
BlogPost.drop_collection()
def test_raw_query_and_Q_objects(self):
"""
Test raw plays nicely
"""
class Foo(Document):
name = StringField()
a = StringField()
b = StringField()
c = StringField()
meta = {
'allow_inheritance': False
}
query = Foo.objects(__raw__={'$nor': [{'name': 'bar'}]})._query
self.assertEqual(query, {'$nor': [{'name': 'bar'}]})
q1 = {'$or': [{'a': 1}, {'b': 1}]}
query = Foo.objects(Q(__raw__=q1) & Q(c=1))._query
self.assertEqual(query, {'$or': [{'a': 1}, {'b': 1}], 'c': 1})
def test_q_merge_queries_edge_case(self):
class User(Document):
email = EmailField(required=False)
name = StringField()
User.drop_collection()
pk = ObjectId()
User(email='example@example.com', pk=pk).save()
self.assertEqual(1, User.objects.filter(
Q(email='example@example.com') |
Q(name='John Doe')
).limit(2).filter(pk=pk).count())
def test_exec_js_query(self): def test_exec_js_query(self):
"""Ensure that queries are properly formed for use in exec_js. """Ensure that queries are properly formed for use in exec_js.
@ -1491,13 +1264,6 @@ class QuerySetTest(unittest.TestCase):
c = BlogPost.objects(published=False).exec_js(js_func, 'hits') c = BlogPost.objects(published=False).exec_js(js_func, 'hits')
self.assertEqual(c, 1) self.assertEqual(c, 1)
# Ensure that Q object queries work
c = BlogPost.objects(Q(published=True)).exec_js(js_func, 'hits')
self.assertEqual(c, 2)
c = BlogPost.objects(Q(published=False)).exec_js(js_func, 'hits')
self.assertEqual(c, 1)
BlogPost.drop_collection() BlogPost.drop_collection()
def test_exec_js_field_sub(self): def test_exec_js_field_sub(self):
@ -2558,56 +2324,6 @@ class QuerySetTest(unittest.TestCase):
BlogPost.drop_collection() BlogPost.drop_collection()
def test_query_field_name(self):
"""Ensure that the correct field name is used when querying.
"""
class Comment(EmbeddedDocument):
content = StringField(db_field='commentContent')
class BlogPost(Document):
title = StringField(db_field='postTitle')
comments = ListField(EmbeddedDocumentField(Comment),
db_field='postComments')
BlogPost.drop_collection()
data = {'title': 'Post 1', 'comments': [Comment(content='test')]}
post = BlogPost(**data)
post.save()
self.assertTrue('postTitle' in
BlogPost.objects(title=data['title'])._query)
self.assertFalse('title' in
BlogPost.objects(title=data['title'])._query)
self.assertEqual(len(BlogPost.objects(title=data['title'])), 1)
self.assertTrue('_id' in BlogPost.objects(pk=post.id)._query)
self.assertEqual(len(BlogPost.objects(pk=post.id)), 1)
self.assertTrue('postComments.commentContent' in
BlogPost.objects(comments__content='test')._query)
self.assertEqual(len(BlogPost.objects(comments__content='test')), 1)
BlogPost.drop_collection()
def test_query_pk_field_name(self):
"""Ensure that the correct "primary key" field name is used when querying
"""
class BlogPost(Document):
title = StringField(primary_key=True, db_field='postTitle')
BlogPost.drop_collection()
data = { 'title':'Post 1' }
post = BlogPost(**data)
post.save()
self.assertTrue('_id' in BlogPost.objects(pk=data['title'])._query)
self.assertTrue('_id' in BlogPost.objects(title=data['title'])._query)
self.assertEqual(len(BlogPost.objects(pk=data['title'])), 1)
BlogPost.drop_collection()
def test_query_value_conversion(self): def test_query_value_conversion(self):
"""Ensure that query values are properly converted when necessary. """Ensure that query values are properly converted when necessary.
@ -3446,227 +3162,6 @@ class QuerySetTest(unittest.TestCase):
else: else:
self.assertEqual("[u'A1', u'A2']", "%s" % sorted(self.Person.objects.scalar('name').in_bulk(list(pks)).values())) self.assertEqual("[u'A1', u'A2']", "%s" % sorted(self.Person.objects.scalar('name').in_bulk(list(pks)).values()))
class QTest(unittest.TestCase):
def setUp(self):
connect(db='mongoenginetest')
def test_empty_q(self):
"""Ensure that empty Q objects won't hurt.
"""
q1 = Q()
q2 = Q(age__gte=18)
q3 = Q()
q4 = Q(name='test')
q5 = Q()
class Person(Document):
name = StringField()
age = IntField()
query = {'$or': [{'age': {'$gte': 18}}, {'name': 'test'}]}
self.assertEqual((q1 | q2 | q3 | q4 | q5).to_query(Person), query)
query = {'age': {'$gte': 18}, 'name': 'test'}
self.assertEqual((q1 & q2 & q3 & q4 & q5).to_query(Person), query)
def test_q_with_dbref(self):
"""Ensure Q objects handle DBRefs correctly"""
connect(db='mongoenginetest')
class User(Document):
pass
class Post(Document):
created_user = ReferenceField(User)
user = User.objects.create()
Post.objects.create(created_user=user)
self.assertEqual(Post.objects.filter(created_user=user).count(), 1)
self.assertEqual(Post.objects.filter(Q(created_user=user)).count(), 1)
def test_and_combination(self):
"""Ensure that Q-objects correctly AND together.
"""
class TestDoc(Document):
x = IntField()
y = StringField()
# Check than an error is raised when conflicting queries are anded
def invalid_combination():
query = Q(x__lt=7) & Q(x__lt=3)
query.to_query(TestDoc)
self.assertRaises(InvalidQueryError, invalid_combination)
# Check normal cases work without an error
query = Q(x__lt=7) & Q(x__gt=3)
q1 = Q(x__lt=7)
q2 = Q(x__gt=3)
query = (q1 & q2).to_query(TestDoc)
self.assertEqual(query, {'x': {'$lt': 7, '$gt': 3}})
# More complex nested example
query = Q(x__lt=100) & Q(y__ne='NotMyString')
query &= Q(y__in=['a', 'b', 'c']) & Q(x__gt=-100)
mongo_query = {
'x': {'$lt': 100, '$gt': -100},
'y': {'$ne': 'NotMyString', '$in': ['a', 'b', 'c']},
}
self.assertEqual(query.to_query(TestDoc), mongo_query)
def test_or_combination(self):
"""Ensure that Q-objects correctly OR together.
"""
class TestDoc(Document):
x = IntField()
q1 = Q(x__lt=3)
q2 = Q(x__gt=7)
query = (q1 | q2).to_query(TestDoc)
self.assertEqual(query, {
'$or': [
{'x': {'$lt': 3}},
{'x': {'$gt': 7}},
]
})
def test_and_or_combination(self):
"""Ensure that Q-objects handle ANDing ORed components.
"""
class TestDoc(Document):
x = IntField()
y = BooleanField()
query = (Q(x__gt=0) | Q(x__exists=False))
query &= Q(x__lt=100)
self.assertEqual(query.to_query(TestDoc), {
'$or': [
{'x': {'$lt': 100, '$gt': 0}},
{'x': {'$lt': 100, '$exists': False}},
]
})
q1 = (Q(x__gt=0) | Q(x__exists=False))
q2 = (Q(x__lt=100) | Q(y=True))
query = (q1 & q2).to_query(TestDoc)
self.assertEqual(['$or'], query.keys())
conditions = [
{'x': {'$lt': 100, '$gt': 0}},
{'x': {'$lt': 100, '$exists': False}},
{'x': {'$gt': 0}, 'y': True},
{'x': {'$exists': False}, 'y': True},
]
self.assertEqual(len(conditions), len(query['$or']))
for condition in conditions:
self.assertTrue(condition in query['$or'])
def test_or_and_or_combination(self):
"""Ensure that Q-objects handle ORing ANDed ORed components. :)
"""
class TestDoc(Document):
x = IntField()
y = BooleanField()
q1 = (Q(x__gt=0) & (Q(y=True) | Q(y__exists=False)))
q2 = (Q(x__lt=100) & (Q(y=False) | Q(y__exists=False)))
query = (q1 | q2).to_query(TestDoc)
self.assertEqual(['$or'], query.keys())
conditions = [
{'x': {'$gt': 0}, 'y': True},
{'x': {'$gt': 0}, 'y': {'$exists': False}},
{'x': {'$lt': 100}, 'y':False},
{'x': {'$lt': 100}, 'y': {'$exists': False}},
]
self.assertEqual(len(conditions), len(query['$or']))
for condition in conditions:
self.assertTrue(condition in query['$or'])
def test_q_clone(self):
class TestDoc(Document):
x = IntField()
TestDoc.drop_collection()
for i in xrange(1, 101):
t = TestDoc(x=i)
t.save()
# Check normal cases work without an error
test = TestDoc.objects(Q(x__lt=7) & Q(x__gt=3))
self.assertEqual(test.count(), 3)
test2 = test.clone()
self.assertEqual(test2.count(), 3)
self.assertFalse(test2 == test)
test2.filter(x=6)
self.assertEqual(test2.count(), 1)
self.assertEqual(test.count(), 3)
class QueryFieldListTest(unittest.TestCase):
def test_empty(self):
q = QueryFieldList()
self.assertFalse(q)
q = QueryFieldList(always_include=['_cls'])
self.assertFalse(q)
def test_include_include(self):
q = QueryFieldList()
q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'a': True, 'b': True})
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'b': True})
def test_include_exclude(self):
q = QueryFieldList()
q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'a': True, 'b': True})
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.EXCLUDE)
self.assertEqual(q.as_dict(), {'a': True})
def test_exclude_exclude(self):
q = QueryFieldList()
q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.EXCLUDE)
self.assertEqual(q.as_dict(), {'a': False, 'b': False})
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.EXCLUDE)
self.assertEqual(q.as_dict(), {'a': False, 'b': False, 'c': False})
def test_exclude_include(self):
q = QueryFieldList()
q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.EXCLUDE)
self.assertEqual(q.as_dict(), {'a': False, 'b': False})
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'c': True})
def test_always_include(self):
q = QueryFieldList(always_include=['x', 'y'])
q += QueryFieldList(fields=['a', 'b', 'x'], value=QueryFieldList.EXCLUDE)
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'c': True})
def test_reset(self):
q = QueryFieldList(always_include=['x', 'y'])
q += QueryFieldList(fields=['a', 'b', 'x'], value=QueryFieldList.EXCLUDE)
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'c': True})
q.reset()
self.assertFalse(q)
q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY)
self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'b': True, 'c': True})
def test_using_a_slice(self):
q = QueryFieldList()
q += QueryFieldList(fields=['a'], value={"$slice": 5})
self.assertEqual(q.as_dict(), {'a': {"$slice": 5}})
def test_elem_match(self): def test_elem_match(self):
class Foo(EmbeddedDocument): class Foo(EmbeddedDocument):
shape = StringField() shape = StringField()
@ -3691,6 +3186,26 @@ class QueryFieldListTest(unittest.TestCase):
ak = list(Bar.objects(foo__match={'shape': "square", "color": "purple"})) ak = list(Bar.objects(foo__match={'shape': "square", "color": "purple"}))
self.assertEqual([b1], ak) self.assertEqual([b1], ak)
def test_upsert_includes_cls(self):
"""Upserts should include _cls information for inheritable classes
"""
class Test(Document):
test = StringField()
Test.drop_collection()
Test.objects(test='foo').update_one(upsert=True, set__test='foo')
self.assertFalse('_cls' in Test._collection.find_one())
class Test(Document):
meta = {'allow_inheritance': True}
test = StringField()
Test.drop_collection()
Test.objects(test='foo').update_one(upsert=True, set__test='foo')
self.assertTrue('_cls' in Test._collection.find_one())
def test_read_preference(self): def test_read_preference(self):
class Bar(Document): class Bar(Document):
pass pass
@ -3769,26 +3284,6 @@ class QueryFieldListTest(unittest.TestCase):
self.assertEqual(doc_objects, Doc.objects.from_json(json_data)) self.assertEqual(doc_objects, Doc.objects.from_json(json_data))
def test_upsert_includes_cls(self):
"""Upserts should include _cls information for inheritable classes
"""
class Test(Document):
test = StringField()
Test.drop_collection()
Test.objects(test='foo').update_one(upsert=True, set__test='foo')
self.assertFalse('_cls' in Test._collection.find_one())
class Test(Document):
meta = {'allow_inheritance': True}
test = StringField()
Test.drop_collection()
Test.objects(test='foo').update_one(upsert=True, set__test='foo')
self.assertTrue('_cls' in Test._collection.find_one())
def test_as_pymongo(self): def test_as_pymongo(self):
from decimal import Decimal from decimal import Decimal

148
tests/queryset/transform.py Normal file
View File

@ -0,0 +1,148 @@
from __future__ import with_statement
import sys
sys.path[0:0] = [""]
import unittest
from mongoengine import *
from mongoengine.queryset import Q
from mongoengine.queryset import transform
__all__ = ("TransformTest",)
class TransformTest(unittest.TestCase):
def setUp(self):
connect(db='mongoenginetest')
def test_transform_query(self):
"""Ensure that the _transform_query function operates correctly.
"""
self.assertEqual(transform.query(name='test', age=30),
{'name': 'test', 'age': 30})
self.assertEqual(transform.query(age__lt=30),
{'age': {'$lt': 30}})
self.assertEqual(transform.query(age__gt=20, age__lt=50),
{'age': {'$gt': 20, '$lt': 50}})
self.assertEqual(transform.query(age=20, age__gt=50),
{'age': 20})
self.assertEqual(transform.query(friend__age__gte=30),
{'friend.age': {'$gte': 30}})
self.assertEqual(transform.query(name__exists=True),
{'name': {'$exists': True}})
def test_query_field_name(self):
"""Ensure that the correct field name is used when querying.
"""
class Comment(EmbeddedDocument):
content = StringField(db_field='commentContent')
class BlogPost(Document):
title = StringField(db_field='postTitle')
comments = ListField(EmbeddedDocumentField(Comment),
db_field='postComments')
BlogPost.drop_collection()
data = {'title': 'Post 1', 'comments': [Comment(content='test')]}
post = BlogPost(**data)
post.save()
self.assertTrue('postTitle' in
BlogPost.objects(title=data['title'])._query)
self.assertFalse('title' in
BlogPost.objects(title=data['title'])._query)
self.assertEqual(len(BlogPost.objects(title=data['title'])), 1)
self.assertTrue('_id' in BlogPost.objects(pk=post.id)._query)
self.assertEqual(len(BlogPost.objects(pk=post.id)), 1)
self.assertTrue('postComments.commentContent' in
BlogPost.objects(comments__content='test')._query)
self.assertEqual(len(BlogPost.objects(comments__content='test')), 1)
BlogPost.drop_collection()
def test_query_pk_field_name(self):
"""Ensure that the correct "primary key" field name is used when
querying
"""
class BlogPost(Document):
title = StringField(primary_key=True, db_field='postTitle')
BlogPost.drop_collection()
data = {'title': 'Post 1'}
post = BlogPost(**data)
post.save()
self.assertTrue('_id' in BlogPost.objects(pk=data['title'])._query)
self.assertTrue('_id' in BlogPost.objects(title=data['title'])._query)
self.assertEqual(len(BlogPost.objects(pk=data['title'])), 1)
BlogPost.drop_collection()
def test_chaining(self):
class A(Document):
pass
class B(Document):
a = ReferenceField(A)
A.drop_collection()
B.drop_collection()
a1 = A().save()
a2 = A().save()
B(a=a1).save()
# Works
q1 = B.objects.filter(a__in=[a1, a2], a=a1)._query
# Doesn't work
q2 = B.objects.filter(a__in=[a1, a2])
q2 = q2.filter(a=a1)._query
self.assertEqual(q1, q2)
def test_raw_query_and_Q_objects(self):
"""
Test raw plays nicely
"""
class Foo(Document):
name = StringField()
a = StringField()
b = StringField()
c = StringField()
meta = {
'allow_inheritance': False
}
query = Foo.objects(__raw__={'$nor': [{'name': 'bar'}]})._query
self.assertEqual(query, {'$nor': [{'name': 'bar'}]})
q1 = {'$or': [{'a': 1}, {'b': 1}]}
query = Foo.objects(Q(__raw__=q1) & Q(c=1))._query
self.assertEqual(query, {'$or': [{'a': 1}, {'b': 1}], 'c': 1})
def test_raw_and_merging(self):
class Doc(Document):
meta = {'allow_inheritance': False}
raw_query = Doc.objects(__raw__={'deleted': False,
'scraped': 'yes',
'$nor': [{'views.extracted': 'no'},
{'attachments.views.extracted':'no'}]
})._query
expected = {'deleted': False, 'scraped': 'yes',
'$nor': [{'views.extracted': 'no'},
{'attachments.views.extracted': 'no'}]}
self.assertEqual(expected, raw_query)
if __name__ == '__main__':
unittest.main()

310
tests/queryset/visitor.py Normal file
View File

@ -0,0 +1,310 @@
from __future__ import with_statement
import sys
sys.path[0:0] = [""]
import unittest
from bson import ObjectId
from datetime import datetime
from mongoengine import *
from mongoengine.queryset import Q
from mongoengine.errors import InvalidQueryError
__all__ = ("QTest",)
class QTest(unittest.TestCase):
def setUp(self):
connect(db='mongoenginetest')
class Person(Document):
name = StringField()
age = IntField()
meta = {'allow_inheritance': True}
Person.drop_collection()
self.Person = Person
def test_empty_q(self):
"""Ensure that empty Q objects won't hurt.
"""
q1 = Q()
q2 = Q(age__gte=18)
q3 = Q()
q4 = Q(name='test')
q5 = Q()
class Person(Document):
name = StringField()
age = IntField()
query = {'$or': [{'age': {'$gte': 18}}, {'name': 'test'}]}
self.assertEqual((q1 | q2 | q3 | q4 | q5).to_query(Person), query)
query = {'age': {'$gte': 18}, 'name': 'test'}
self.assertEqual((q1 & q2 & q3 & q4 & q5).to_query(Person), query)
def test_q_with_dbref(self):
"""Ensure Q objects handle DBRefs correctly"""
connect(db='mongoenginetest')
class User(Document):
pass
class Post(Document):
created_user = ReferenceField(User)
user = User.objects.create()
Post.objects.create(created_user=user)
self.assertEqual(Post.objects.filter(created_user=user).count(), 1)
self.assertEqual(Post.objects.filter(Q(created_user=user)).count(), 1)
def test_and_combination(self):
"""Ensure that Q-objects correctly AND together.
"""
class TestDoc(Document):
x = IntField()
y = StringField()
# Check than an error is raised when conflicting queries are anded
def invalid_combination():
query = Q(x__lt=7) & Q(x__lt=3)
query.to_query(TestDoc)
self.assertRaises(InvalidQueryError, invalid_combination)
# Check normal cases work without an error
query = Q(x__lt=7) & Q(x__gt=3)
q1 = Q(x__lt=7)
q2 = Q(x__gt=3)
query = (q1 & q2).to_query(TestDoc)
self.assertEqual(query, {'x': {'$lt': 7, '$gt': 3}})
# More complex nested example
query = Q(x__lt=100) & Q(y__ne='NotMyString')
query &= Q(y__in=['a', 'b', 'c']) & Q(x__gt=-100)
mongo_query = {
'x': {'$lt': 100, '$gt': -100},
'y': {'$ne': 'NotMyString', '$in': ['a', 'b', 'c']},
}
self.assertEqual(query.to_query(TestDoc), mongo_query)
def test_or_combination(self):
"""Ensure that Q-objects correctly OR together.
"""
class TestDoc(Document):
x = IntField()
q1 = Q(x__lt=3)
q2 = Q(x__gt=7)
query = (q1 | q2).to_query(TestDoc)
self.assertEqual(query, {
'$or': [
{'x': {'$lt': 3}},
{'x': {'$gt': 7}},
]
})
def test_and_or_combination(self):
"""Ensure that Q-objects handle ANDing ORed components.
"""
class TestDoc(Document):
x = IntField()
y = BooleanField()
query = (Q(x__gt=0) | Q(x__exists=False))
query &= Q(x__lt=100)
self.assertEqual(query.to_query(TestDoc), {
'$or': [
{'x': {'$lt': 100, '$gt': 0}},
{'x': {'$lt': 100, '$exists': False}},
]
})
q1 = (Q(x__gt=0) | Q(x__exists=False))
q2 = (Q(x__lt=100) | Q(y=True))
query = (q1 & q2).to_query(TestDoc)
self.assertEqual(['$or'], query.keys())
conditions = [
{'x': {'$lt': 100, '$gt': 0}},
{'x': {'$lt': 100, '$exists': False}},
{'x': {'$gt': 0}, 'y': True},
{'x': {'$exists': False}, 'y': True},
]
self.assertEqual(len(conditions), len(query['$or']))
for condition in conditions:
self.assertTrue(condition in query['$or'])
def test_or_and_or_combination(self):
"""Ensure that Q-objects handle ORing ANDed ORed components. :)
"""
class TestDoc(Document):
x = IntField()
y = BooleanField()
q1 = (Q(x__gt=0) & (Q(y=True) | Q(y__exists=False)))
q2 = (Q(x__lt=100) & (Q(y=False) | Q(y__exists=False)))
query = (q1 | q2).to_query(TestDoc)
self.assertEqual(['$or'], query.keys())
conditions = [
{'x': {'$gt': 0}, 'y': True},
{'x': {'$gt': 0}, 'y': {'$exists': False}},
{'x': {'$lt': 100}, 'y':False},
{'x': {'$lt': 100}, 'y': {'$exists': False}},
]
self.assertEqual(len(conditions), len(query['$or']))
for condition in conditions:
self.assertTrue(condition in query['$or'])
def test_q_clone(self):
class TestDoc(Document):
x = IntField()
TestDoc.drop_collection()
for i in xrange(1, 101):
t = TestDoc(x=i)
t.save()
# Check normal cases work without an error
test = TestDoc.objects(Q(x__lt=7) & Q(x__gt=3))
self.assertEqual(test.count(), 3)
test2 = test.clone()
self.assertEqual(test2.count(), 3)
self.assertFalse(test2 == test)
test2.filter(x=6)
self.assertEqual(test2.count(), 1)
self.assertEqual(test.count(), 3)
def test_q(self):
"""Ensure that Q objects may be used to query for documents.
"""
class BlogPost(Document):
title = StringField()
publish_date = DateTimeField()
published = BooleanField()
BlogPost.drop_collection()
post1 = BlogPost(title='Test 1', publish_date=datetime(2010, 1, 8), published=False)
post1.save()
post2 = BlogPost(title='Test 2', publish_date=datetime(2010, 1, 15), published=True)
post2.save()
post3 = BlogPost(title='Test 3', published=True)
post3.save()
post4 = BlogPost(title='Test 4', publish_date=datetime(2010, 1, 8))
post4.save()
post5 = BlogPost(title='Test 1', publish_date=datetime(2010, 1, 15))
post5.save()
post6 = BlogPost(title='Test 1', published=False)
post6.save()
# Check ObjectId lookup works
obj = BlogPost.objects(id=post1.id).first()
self.assertEqual(obj, post1)
# Check Q object combination with one does not exist
q = BlogPost.objects(Q(title='Test 5') | Q(published=True))
posts = [post.id for post in q]
published_posts = (post2, post3)
self.assertTrue(all(obj.id in posts for obj in published_posts))
q = BlogPost.objects(Q(title='Test 1') | Q(published=True))
posts = [post.id for post in q]
published_posts = (post1, post2, post3, post5, post6)
self.assertTrue(all(obj.id in posts for obj in published_posts))
# Check Q object combination
date = datetime(2010, 1, 10)
q = BlogPost.objects(Q(publish_date__lte=date) | Q(published=True))
posts = [post.id for post in q]
published_posts = (post1, post2, post3, post4)
self.assertTrue(all(obj.id in posts for obj in published_posts))
self.assertFalse(any(obj.id in posts for obj in [post5, post6]))
BlogPost.drop_collection()
# Check the 'in' operator
self.Person(name='user1', age=20).save()
self.Person(name='user2', age=20).save()
self.Person(name='user3', age=30).save()
self.Person(name='user4', age=40).save()
self.assertEqual(len(self.Person.objects(Q(age__in=[20]))), 2)
self.assertEqual(len(self.Person.objects(Q(age__in=[20, 30]))), 3)
# Test invalid query objs
def wrong_query_objs():
self.Person.objects('user1')
def wrong_query_objs_filter():
self.Person.objects('user1')
self.assertRaises(InvalidQueryError, wrong_query_objs)
self.assertRaises(InvalidQueryError, wrong_query_objs_filter)
def test_q_regex(self):
"""Ensure that Q objects can be queried using regexes.
"""
person = self.Person(name='Guido van Rossum')
person.save()
import re
obj = self.Person.objects(Q(name=re.compile('^Gui'))).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name=re.compile('^gui'))).first()
self.assertEqual(obj, None)
obj = self.Person.objects(Q(name=re.compile('^gui', re.I))).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__not=re.compile('^bob'))).first()
self.assertEqual(obj, person)
obj = self.Person.objects(Q(name__not=re.compile('^Gui'))).first()
self.assertEqual(obj, None)
def test_q_lists(self):
"""Ensure that Q objects query ListFields correctly.
"""
class BlogPost(Document):
tags = ListField(StringField())
BlogPost.drop_collection()
BlogPost(tags=['python', 'mongo']).save()
BlogPost(tags=['python']).save()
self.assertEqual(len(BlogPost.objects(Q(tags='mongo'))), 1)
self.assertEqual(len(BlogPost.objects(Q(tags='python'))), 2)
BlogPost.drop_collection()
def test_q_merge_queries_edge_case(self):
class User(Document):
email = EmailField(required=False)
name = StringField()
User.drop_collection()
pk = ObjectId()
User(email='example@example.com', pk=pk).save()
self.assertEqual(1, User.objects.filter(
Q(email='example@example.com') |
Q(name='John Doe')
).limit(2).filter(pk=pk).count())