From 50b755db0c535fd96b374be88d79410ebc356838 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Wed, 19 Dec 2012 13:35:37 +0000 Subject: [PATCH] Split out queryset tests --- tests/__init__.py | 3 +- tests/queryset/__init__.py | 5 + tests/queryset/field_list.py | 67 ++ .../queryset.py} | 571 +----------------- tests/queryset/transform.py | 148 +++++ tests/queryset/visitor.py | 310 ++++++++++ 6 files changed, 565 insertions(+), 539 deletions(-) create mode 100644 tests/queryset/__init__.py create mode 100644 tests/queryset/field_list.py rename tests/{test_queryset.py => queryset/queryset.py} (85%) create mode 100644 tests/queryset/transform.py create mode 100644 tests/queryset/visitor.py diff --git a/tests/__init__.py b/tests/__init__.py index ccc90f79..152a8ce0 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,2 +1,3 @@ from all_warnings import AllWarnings -from document import * \ No newline at end of file +from document import * +from queryset import * \ No newline at end of file diff --git a/tests/queryset/__init__.py b/tests/queryset/__init__.py new file mode 100644 index 00000000..93cb8c23 --- /dev/null +++ b/tests/queryset/__init__.py @@ -0,0 +1,5 @@ + +from transform import * +from field_list import * +from queryset import * +from visitor import * \ No newline at end of file diff --git a/tests/queryset/field_list.py b/tests/queryset/field_list.py new file mode 100644 index 00000000..f3b457b4 --- /dev/null +++ b/tests/queryset/field_list.py @@ -0,0 +1,67 @@ +import sys +sys.path[0:0] = [""] + +import unittest + +from mongoengine import * +from mongoengine.queryset import QueryFieldList + +__all__ = ("QueryFieldListTest",) + +class QueryFieldListTest(unittest.TestCase): + + def test_empty(self): + q = QueryFieldList() + self.assertFalse(q) + + q = QueryFieldList(always_include=['_cls']) + self.assertFalse(q) + + def test_include_include(self): + q = QueryFieldList() + q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.ONLY) + self.assertEqual(q.as_dict(), {'a': True, 'b': True}) + q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY) + self.assertEqual(q.as_dict(), {'b': True}) + + def test_include_exclude(self): + q = QueryFieldList() + q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.ONLY) + self.assertEqual(q.as_dict(), {'a': True, 'b': True}) + q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.EXCLUDE) + self.assertEqual(q.as_dict(), {'a': True}) + + def test_exclude_exclude(self): + q = QueryFieldList() + q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.EXCLUDE) + self.assertEqual(q.as_dict(), {'a': False, 'b': False}) + q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.EXCLUDE) + self.assertEqual(q.as_dict(), {'a': False, 'b': False, 'c': False}) + + def test_exclude_include(self): + q = QueryFieldList() + q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.EXCLUDE) + self.assertEqual(q.as_dict(), {'a': False, 'b': False}) + q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY) + self.assertEqual(q.as_dict(), {'c': True}) + + def test_always_include(self): + q = QueryFieldList(always_include=['x', 'y']) + q += QueryFieldList(fields=['a', 'b', 'x'], value=QueryFieldList.EXCLUDE) + q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY) + self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'c': True}) + + def test_reset(self): + q = QueryFieldList(always_include=['x', 'y']) + q += QueryFieldList(fields=['a', 'b', 'x'], value=QueryFieldList.EXCLUDE) + q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY) + self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'c': True}) + q.reset() + self.assertFalse(q) + q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY) + self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'b': True, 'c': True}) + + def test_using_a_slice(self): + q = QueryFieldList() + q += QueryFieldList(fields=['a'], value={"$slice": 5}) + self.assertEqual(q.as_dict(), {'a': {"$slice": 5}}) diff --git a/tests/test_queryset.py b/tests/queryset/queryset.py similarity index 85% rename from tests/test_queryset.py rename to tests/queryset/queryset.py index 07725d4c..e3e02158 100644 --- a/tests/test_queryset.py +++ b/tests/queryset/queryset.py @@ -18,12 +18,13 @@ from mongoengine import * from mongoengine.connection import get_connection from mongoengine.python_support import PY3 from mongoengine.tests import query_counter -from mongoengine.queryset import (Q, QuerySet, QuerySetManager, +from mongoengine.queryset import (QuerySet, QuerySetManager, MultipleObjectsReturned, DoesNotExist, QueryFieldList, queryset_manager) -from mongoengine.queryset import transform from mongoengine.errors import InvalidQueryError +__all__ = ("QuerySetTest",) + class QuerySetTest(unittest.TestCase): @@ -47,22 +48,6 @@ class QuerySetTest(unittest.TestCase): self.assertTrue(isinstance(self.Person.objects._collection, pymongo.collection.Collection)) - def test_transform_query(self): - """Ensure that the _transform_query function operates correctly. - """ - self.assertEqual(transform.query(name='test', age=30), - {'name': 'test', 'age': 30}) - self.assertEqual(transform.query(age__lt=30), - {'age': {'$lt': 30}}) - self.assertEqual(transform.query(age__gt=20, age__lt=50), - {'age': {'$gt': 20, '$lt': 50}}) - self.assertEqual(transform.query(age=20, age__gt=50), - {'age': 20}) - self.assertEqual(transform.query(friend__age__gte=30), - {'friend.age': {'$gte': 30}}) - self.assertEqual(transform.query(name__exists=True), - {'name': {'$exists': True}}) - def test_cannot_perform_joins_references(self): class BlogPost(Document): @@ -264,30 +249,6 @@ class QuerySetTest(unittest.TestCase): self.assertEqual(list(A.objects.none()), []) self.assertEqual(list(A.objects.none().all()), []) - def test_chaining(self): - class A(Document): - pass - - class B(Document): - a = ReferenceField(A) - - A.drop_collection() - B.drop_collection() - - a1 = A().save() - a2 = A().save() - - B(a=a1).save() - - # Works - q1 = B.objects.filter(a__in=[a1, a2], a=a1)._query - - # Doesn't work - q2 = B.objects.filter(a__in=[a1, a2]) - q2 = q2.filter(a=a1)._query - - self.assertEqual(q1, q2) - def test_update_write_options(self): """Test that passing write_options works""" @@ -830,48 +791,30 @@ class QuerySetTest(unittest.TestCase): self.assertEqual(obj, person) obj = self.Person.objects(name__contains='Van').first() self.assertEqual(obj, None) - obj = self.Person.objects(Q(name__contains='van')).first() - self.assertEqual(obj, person) - obj = self.Person.objects(Q(name__contains='Van')).first() - self.assertEqual(obj, None) # Test icontains obj = self.Person.objects(name__icontains='Van').first() self.assertEqual(obj, person) - obj = self.Person.objects(Q(name__icontains='Van')).first() - self.assertEqual(obj, person) # Test startswith obj = self.Person.objects(name__startswith='Guido').first() self.assertEqual(obj, person) obj = self.Person.objects(name__startswith='guido').first() self.assertEqual(obj, None) - obj = self.Person.objects(Q(name__startswith='Guido')).first() - self.assertEqual(obj, person) - obj = self.Person.objects(Q(name__startswith='guido')).first() - self.assertEqual(obj, None) # Test istartswith obj = self.Person.objects(name__istartswith='guido').first() self.assertEqual(obj, person) - obj = self.Person.objects(Q(name__istartswith='guido')).first() - self.assertEqual(obj, person) # Test endswith obj = self.Person.objects(name__endswith='Rossum').first() self.assertEqual(obj, person) obj = self.Person.objects(name__endswith='rossuM').first() self.assertEqual(obj, None) - obj = self.Person.objects(Q(name__endswith='Rossum')).first() - self.assertEqual(obj, person) - obj = self.Person.objects(Q(name__endswith='rossuM')).first() - self.assertEqual(obj, None) # Test iendswith obj = self.Person.objects(name__iendswith='rossuM').first() self.assertEqual(obj, person) - obj = self.Person.objects(Q(name__iendswith='rossuM')).first() - self.assertEqual(obj, person) # Test exact obj = self.Person.objects(name__exact='Guido van Rossum').first() @@ -880,28 +823,18 @@ class QuerySetTest(unittest.TestCase): self.assertEqual(obj, None) obj = self.Person.objects(name__exact='Guido van Rossu').first() self.assertEqual(obj, None) - obj = self.Person.objects(Q(name__exact='Guido van Rossum')).first() - self.assertEqual(obj, person) - obj = self.Person.objects(Q(name__exact='Guido van rossum')).first() - self.assertEqual(obj, None) - obj = self.Person.objects(Q(name__exact='Guido van Rossu')).first() - self.assertEqual(obj, None) # Test iexact obj = self.Person.objects(name__iexact='gUIDO VAN rOSSUM').first() self.assertEqual(obj, person) obj = self.Person.objects(name__iexact='gUIDO VAN rOSSU').first() self.assertEqual(obj, None) - obj = self.Person.objects(Q(name__iexact='gUIDO VAN rOSSUM')).first() - self.assertEqual(obj, person) - obj = self.Person.objects(Q(name__iexact='gUIDO VAN rOSSU')).first() - self.assertEqual(obj, None) # Test unsafe expressions person = self.Person(name='Guido van Rossum [.\'Geek\']') person.save() - obj = self.Person.objects(Q(name__icontains='[.\'Geek')).first() + obj = self.Person.objects(name__icontains='[.\'Geek').first() self.assertEqual(obj, person) def test_not(self): @@ -944,14 +877,14 @@ class QuerySetTest(unittest.TestCase): blog_3.save() blog_post_1 = BlogPost(blog=blog_1, title="Blog Post #1", - is_published = True, - published_date=datetime(2010, 1, 5, 0, 0 ,0)) + is_published=True, + published_date=datetime(2010, 1, 5, 0, 0, 0)) blog_post_2 = BlogPost(blog=blog_2, title="Blog Post #2", - is_published = True, - published_date=datetime(2010, 1, 6, 0, 0 ,0)) + is_published=True, + published_date=datetime(2010, 1, 6, 0, 0, 0)) blog_post_3 = BlogPost(blog=blog_3, title="Blog Post #3", - is_published = True, - published_date=datetime(2010, 1, 7, 0, 0 ,0)) + is_published=True, + published_date=datetime(2010, 1, 7, 0, 0, 0)) blog_post_1.save() blog_post_2.save() @@ -971,21 +904,6 @@ class QuerySetTest(unittest.TestCase): BlogPost.drop_collection() Blog.drop_collection() - def test_raw_and_merging(self): - class Doc(Document): - meta = {'allow_inheritance': False} - - raw_query = Doc.objects(__raw__={'deleted': False, - 'scraped': 'yes', - '$nor': [{'views.extracted': 'no'}, - {'attachments.views.extracted':'no'}] - })._query - - expected = {'deleted': False, 'scraped': 'yes', - '$nor': [{'views.extracted': 'no'}, - {'attachments.views.extracted': 'no'}]} - self.assertEqual(expected, raw_query) - def test_ordering(self): """Ensure default ordering is applied and can be overridden. """ @@ -1000,11 +918,11 @@ class QuerySetTest(unittest.TestCase): BlogPost.drop_collection() blog_post_1 = BlogPost(title="Blog Post #1", - published_date=datetime(2010, 1, 5, 0, 0 ,0)) + published_date=datetime(2010, 1, 5, 0, 0, 0)) blog_post_2 = BlogPost(title="Blog Post #2", - published_date=datetime(2010, 1, 6, 0, 0 ,0)) + published_date=datetime(2010, 1, 6, 0, 0, 0)) blog_post_3 = BlogPost(title="Blog Post #3", - published_date=datetime(2010, 1, 7, 0, 0 ,0)) + published_date=datetime(2010, 1, 7, 0, 0, 0)) blog_post_1.save() blog_post_2.save() @@ -1310,151 +1228,6 @@ class QuerySetTest(unittest.TestCase): BlogPost.drop_collection() - def test_q(self): - """Ensure that Q objects may be used to query for documents. - """ - class BlogPost(Document): - title = StringField() - publish_date = DateTimeField() - published = BooleanField() - - BlogPost.drop_collection() - - post1 = BlogPost(title='Test 1', publish_date=datetime(2010, 1, 8), published=False) - post1.save() - - post2 = BlogPost(title='Test 2', publish_date=datetime(2010, 1, 15), published=True) - post2.save() - - post3 = BlogPost(title='Test 3', published=True) - post3.save() - - post4 = BlogPost(title='Test 4', publish_date=datetime(2010, 1, 8)) - post4.save() - - post5 = BlogPost(title='Test 1', publish_date=datetime(2010, 1, 15)) - post5.save() - - post6 = BlogPost(title='Test 1', published=False) - post6.save() - - # Check ObjectId lookup works - obj = BlogPost.objects(id=post1.id).first() - self.assertEqual(obj, post1) - - # Check Q object combination with one does not exist - q = BlogPost.objects(Q(title='Test 5') | Q(published=True)) - posts = [post.id for post in q] - - published_posts = (post2, post3) - self.assertTrue(all(obj.id in posts for obj in published_posts)) - - q = BlogPost.objects(Q(title='Test 1') | Q(published=True)) - posts = [post.id for post in q] - published_posts = (post1, post2, post3, post5, post6) - self.assertTrue(all(obj.id in posts for obj in published_posts)) - - # Check Q object combination - date = datetime(2010, 1, 10) - q = BlogPost.objects(Q(publish_date__lte=date) | Q(published=True)) - posts = [post.id for post in q] - - published_posts = (post1, post2, post3, post4) - self.assertTrue(all(obj.id in posts for obj in published_posts)) - - self.assertFalse(any(obj.id in posts for obj in [post5, post6])) - - BlogPost.drop_collection() - - # Check the 'in' operator - self.Person(name='user1', age=20).save() - self.Person(name='user2', age=20).save() - self.Person(name='user3', age=30).save() - self.Person(name='user4', age=40).save() - - self.assertEqual(len(self.Person.objects(Q(age__in=[20]))), 2) - self.assertEqual(len(self.Person.objects(Q(age__in=[20, 30]))), 3) - - # Test invalid query objs - def wrong_query_objs(): - self.Person.objects('user1') - def wrong_query_objs_filter(): - self.Person.objects('user1') - self.assertRaises(InvalidQueryError, wrong_query_objs) - self.assertRaises(InvalidQueryError, wrong_query_objs_filter) - - def test_q_regex(self): - """Ensure that Q objects can be queried using regexes. - """ - person = self.Person(name='Guido van Rossum') - person.save() - - import re - obj = self.Person.objects(Q(name=re.compile('^Gui'))).first() - self.assertEqual(obj, person) - obj = self.Person.objects(Q(name=re.compile('^gui'))).first() - self.assertEqual(obj, None) - - obj = self.Person.objects(Q(name=re.compile('^gui', re.I))).first() - self.assertEqual(obj, person) - - obj = self.Person.objects(Q(name__not=re.compile('^bob'))).first() - self.assertEqual(obj, person) - - obj = self.Person.objects(Q(name__not=re.compile('^Gui'))).first() - self.assertEqual(obj, None) - - def test_q_lists(self): - """Ensure that Q objects query ListFields correctly. - """ - class BlogPost(Document): - tags = ListField(StringField()) - - BlogPost.drop_collection() - - BlogPost(tags=['python', 'mongo']).save() - BlogPost(tags=['python']).save() - - self.assertEqual(len(BlogPost.objects(Q(tags='mongo'))), 1) - self.assertEqual(len(BlogPost.objects(Q(tags='python'))), 2) - - BlogPost.drop_collection() - - def test_raw_query_and_Q_objects(self): - """ - Test raw plays nicely - """ - class Foo(Document): - name = StringField() - a = StringField() - b = StringField() - c = StringField() - - meta = { - 'allow_inheritance': False - } - - query = Foo.objects(__raw__={'$nor': [{'name': 'bar'}]})._query - self.assertEqual(query, {'$nor': [{'name': 'bar'}]}) - - q1 = {'$or': [{'a': 1}, {'b': 1}]} - query = Foo.objects(Q(__raw__=q1) & Q(c=1))._query - self.assertEqual(query, {'$or': [{'a': 1}, {'b': 1}], 'c': 1}) - - def test_q_merge_queries_edge_case(self): - - class User(Document): - email = EmailField(required=False) - name = StringField() - - User.drop_collection() - pk = ObjectId() - User(email='example@example.com', pk=pk).save() - - self.assertEqual(1, User.objects.filter( - Q(email='example@example.com') | - Q(name='John Doe') - ).limit(2).filter(pk=pk).count()) def test_exec_js_query(self): """Ensure that queries are properly formed for use in exec_js. @@ -1491,13 +1264,6 @@ class QuerySetTest(unittest.TestCase): c = BlogPost.objects(published=False).exec_js(js_func, 'hits') self.assertEqual(c, 1) - # Ensure that Q object queries work - c = BlogPost.objects(Q(published=True)).exec_js(js_func, 'hits') - self.assertEqual(c, 2) - - c = BlogPost.objects(Q(published=False)).exec_js(js_func, 'hits') - self.assertEqual(c, 1) - BlogPost.drop_collection() def test_exec_js_field_sub(self): @@ -2558,56 +2324,6 @@ class QuerySetTest(unittest.TestCase): BlogPost.drop_collection() - def test_query_field_name(self): - """Ensure that the correct field name is used when querying. - """ - class Comment(EmbeddedDocument): - content = StringField(db_field='commentContent') - - class BlogPost(Document): - title = StringField(db_field='postTitle') - comments = ListField(EmbeddedDocumentField(Comment), - db_field='postComments') - - - BlogPost.drop_collection() - - data = {'title': 'Post 1', 'comments': [Comment(content='test')]} - post = BlogPost(**data) - post.save() - - self.assertTrue('postTitle' in - BlogPost.objects(title=data['title'])._query) - self.assertFalse('title' in - BlogPost.objects(title=data['title'])._query) - self.assertEqual(len(BlogPost.objects(title=data['title'])), 1) - - self.assertTrue('_id' in BlogPost.objects(pk=post.id)._query) - self.assertEqual(len(BlogPost.objects(pk=post.id)), 1) - - self.assertTrue('postComments.commentContent' in - BlogPost.objects(comments__content='test')._query) - self.assertEqual(len(BlogPost.objects(comments__content='test')), 1) - - BlogPost.drop_collection() - - def test_query_pk_field_name(self): - """Ensure that the correct "primary key" field name is used when querying - """ - class BlogPost(Document): - title = StringField(primary_key=True, db_field='postTitle') - - BlogPost.drop_collection() - - data = { 'title':'Post 1' } - post = BlogPost(**data) - post.save() - - self.assertTrue('_id' in BlogPost.objects(pk=data['title'])._query) - self.assertTrue('_id' in BlogPost.objects(title=data['title'])._query) - self.assertEqual(len(BlogPost.objects(pk=data['title'])), 1) - - BlogPost.drop_collection() def test_query_value_conversion(self): """Ensure that query values are properly converted when necessary. @@ -3446,227 +3162,6 @@ class QuerySetTest(unittest.TestCase): else: self.assertEqual("[u'A1', u'A2']", "%s" % sorted(self.Person.objects.scalar('name').in_bulk(list(pks)).values())) - -class QTest(unittest.TestCase): - - def setUp(self): - connect(db='mongoenginetest') - - def test_empty_q(self): - """Ensure that empty Q objects won't hurt. - """ - q1 = Q() - q2 = Q(age__gte=18) - q3 = Q() - q4 = Q(name='test') - q5 = Q() - - class Person(Document): - name = StringField() - age = IntField() - - query = {'$or': [{'age': {'$gte': 18}}, {'name': 'test'}]} - self.assertEqual((q1 | q2 | q3 | q4 | q5).to_query(Person), query) - - query = {'age': {'$gte': 18}, 'name': 'test'} - self.assertEqual((q1 & q2 & q3 & q4 & q5).to_query(Person), query) - - def test_q_with_dbref(self): - """Ensure Q objects handle DBRefs correctly""" - connect(db='mongoenginetest') - - class User(Document): - pass - - class Post(Document): - created_user = ReferenceField(User) - - user = User.objects.create() - Post.objects.create(created_user=user) - - self.assertEqual(Post.objects.filter(created_user=user).count(), 1) - self.assertEqual(Post.objects.filter(Q(created_user=user)).count(), 1) - - def test_and_combination(self): - """Ensure that Q-objects correctly AND together. - """ - class TestDoc(Document): - x = IntField() - y = StringField() - - # Check than an error is raised when conflicting queries are anded - def invalid_combination(): - query = Q(x__lt=7) & Q(x__lt=3) - query.to_query(TestDoc) - self.assertRaises(InvalidQueryError, invalid_combination) - - # Check normal cases work without an error - query = Q(x__lt=7) & Q(x__gt=3) - - q1 = Q(x__lt=7) - q2 = Q(x__gt=3) - query = (q1 & q2).to_query(TestDoc) - self.assertEqual(query, {'x': {'$lt': 7, '$gt': 3}}) - - # More complex nested example - query = Q(x__lt=100) & Q(y__ne='NotMyString') - query &= Q(y__in=['a', 'b', 'c']) & Q(x__gt=-100) - mongo_query = { - 'x': {'$lt': 100, '$gt': -100}, - 'y': {'$ne': 'NotMyString', '$in': ['a', 'b', 'c']}, - } - self.assertEqual(query.to_query(TestDoc), mongo_query) - - def test_or_combination(self): - """Ensure that Q-objects correctly OR together. - """ - class TestDoc(Document): - x = IntField() - - q1 = Q(x__lt=3) - q2 = Q(x__gt=7) - query = (q1 | q2).to_query(TestDoc) - self.assertEqual(query, { - '$or': [ - {'x': {'$lt': 3}}, - {'x': {'$gt': 7}}, - ] - }) - - def test_and_or_combination(self): - """Ensure that Q-objects handle ANDing ORed components. - """ - class TestDoc(Document): - x = IntField() - y = BooleanField() - - query = (Q(x__gt=0) | Q(x__exists=False)) - query &= Q(x__lt=100) - self.assertEqual(query.to_query(TestDoc), { - '$or': [ - {'x': {'$lt': 100, '$gt': 0}}, - {'x': {'$lt': 100, '$exists': False}}, - ] - }) - - q1 = (Q(x__gt=0) | Q(x__exists=False)) - q2 = (Q(x__lt=100) | Q(y=True)) - query = (q1 & q2).to_query(TestDoc) - - self.assertEqual(['$or'], query.keys()) - conditions = [ - {'x': {'$lt': 100, '$gt': 0}}, - {'x': {'$lt': 100, '$exists': False}}, - {'x': {'$gt': 0}, 'y': True}, - {'x': {'$exists': False}, 'y': True}, - ] - self.assertEqual(len(conditions), len(query['$or'])) - for condition in conditions: - self.assertTrue(condition in query['$or']) - - def test_or_and_or_combination(self): - """Ensure that Q-objects handle ORing ANDed ORed components. :) - """ - class TestDoc(Document): - x = IntField() - y = BooleanField() - - q1 = (Q(x__gt=0) & (Q(y=True) | Q(y__exists=False))) - q2 = (Q(x__lt=100) & (Q(y=False) | Q(y__exists=False))) - query = (q1 | q2).to_query(TestDoc) - - self.assertEqual(['$or'], query.keys()) - conditions = [ - {'x': {'$gt': 0}, 'y': True}, - {'x': {'$gt': 0}, 'y': {'$exists': False}}, - {'x': {'$lt': 100}, 'y':False}, - {'x': {'$lt': 100}, 'y': {'$exists': False}}, - ] - self.assertEqual(len(conditions), len(query['$or'])) - for condition in conditions: - self.assertTrue(condition in query['$or']) - - - def test_q_clone(self): - - class TestDoc(Document): - x = IntField() - - TestDoc.drop_collection() - for i in xrange(1, 101): - t = TestDoc(x=i) - t.save() - - # Check normal cases work without an error - test = TestDoc.objects(Q(x__lt=7) & Q(x__gt=3)) - - self.assertEqual(test.count(), 3) - - test2 = test.clone() - self.assertEqual(test2.count(), 3) - self.assertFalse(test2 == test) - - test2.filter(x=6) - self.assertEqual(test2.count(), 1) - self.assertEqual(test.count(), 3) - -class QueryFieldListTest(unittest.TestCase): - def test_empty(self): - q = QueryFieldList() - self.assertFalse(q) - - q = QueryFieldList(always_include=['_cls']) - self.assertFalse(q) - - def test_include_include(self): - q = QueryFieldList() - q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.ONLY) - self.assertEqual(q.as_dict(), {'a': True, 'b': True}) - q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY) - self.assertEqual(q.as_dict(), {'b': True}) - - def test_include_exclude(self): - q = QueryFieldList() - q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.ONLY) - self.assertEqual(q.as_dict(), {'a': True, 'b': True}) - q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.EXCLUDE) - self.assertEqual(q.as_dict(), {'a': True}) - - def test_exclude_exclude(self): - q = QueryFieldList() - q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.EXCLUDE) - self.assertEqual(q.as_dict(), {'a': False, 'b': False}) - q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.EXCLUDE) - self.assertEqual(q.as_dict(), {'a': False, 'b': False, 'c': False}) - - def test_exclude_include(self): - q = QueryFieldList() - q += QueryFieldList(fields=['a', 'b'], value=QueryFieldList.EXCLUDE) - self.assertEqual(q.as_dict(), {'a': False, 'b': False}) - q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY) - self.assertEqual(q.as_dict(), {'c': True}) - - def test_always_include(self): - q = QueryFieldList(always_include=['x', 'y']) - q += QueryFieldList(fields=['a', 'b', 'x'], value=QueryFieldList.EXCLUDE) - q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY) - self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'c': True}) - - def test_reset(self): - q = QueryFieldList(always_include=['x', 'y']) - q += QueryFieldList(fields=['a', 'b', 'x'], value=QueryFieldList.EXCLUDE) - q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY) - self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'c': True}) - q.reset() - self.assertFalse(q) - q += QueryFieldList(fields=['b', 'c'], value=QueryFieldList.ONLY) - self.assertEqual(q.as_dict(), {'x': True, 'y': True, 'b': True, 'c': True}) - - def test_using_a_slice(self): - q = QueryFieldList() - q += QueryFieldList(fields=['a'], value={"$slice": 5}) - self.assertEqual(q.as_dict(), {'a': {"$slice": 5}}) - def test_elem_match(self): class Foo(EmbeddedDocument): shape = StringField() @@ -3691,6 +3186,26 @@ class QueryFieldListTest(unittest.TestCase): ak = list(Bar.objects(foo__match={'shape': "square", "color": "purple"})) self.assertEqual([b1], ak) + def test_upsert_includes_cls(self): + """Upserts should include _cls information for inheritable classes + """ + + class Test(Document): + test = StringField() + + Test.drop_collection() + Test.objects(test='foo').update_one(upsert=True, set__test='foo') + self.assertFalse('_cls' in Test._collection.find_one()) + + class Test(Document): + meta = {'allow_inheritance': True} + test = StringField() + + Test.drop_collection() + + Test.objects(test='foo').update_one(upsert=True, set__test='foo') + self.assertTrue('_cls' in Test._collection.find_one()) + def test_read_preference(self): class Bar(Document): pass @@ -3769,26 +3284,6 @@ class QueryFieldListTest(unittest.TestCase): self.assertEqual(doc_objects, Doc.objects.from_json(json_data)) - def test_upsert_includes_cls(self): - """Upserts should include _cls information for inheritable classes - """ - - class Test(Document): - test = StringField() - - Test.drop_collection() - Test.objects(test='foo').update_one(upsert=True, set__test='foo') - self.assertFalse('_cls' in Test._collection.find_one()) - - class Test(Document): - meta = {'allow_inheritance': True} - test = StringField() - - Test.drop_collection() - - Test.objects(test='foo').update_one(upsert=True, set__test='foo') - self.assertTrue('_cls' in Test._collection.find_one()) - def test_as_pymongo(self): from decimal import Decimal diff --git a/tests/queryset/transform.py b/tests/queryset/transform.py new file mode 100644 index 00000000..666b3451 --- /dev/null +++ b/tests/queryset/transform.py @@ -0,0 +1,148 @@ +from __future__ import with_statement +import sys +sys.path[0:0] = [""] + +import unittest + +from mongoengine import * +from mongoengine.queryset import Q +from mongoengine.queryset import transform + +__all__ = ("TransformTest",) + + +class TransformTest(unittest.TestCase): + + def setUp(self): + connect(db='mongoenginetest') + + def test_transform_query(self): + """Ensure that the _transform_query function operates correctly. + """ + self.assertEqual(transform.query(name='test', age=30), + {'name': 'test', 'age': 30}) + self.assertEqual(transform.query(age__lt=30), + {'age': {'$lt': 30}}) + self.assertEqual(transform.query(age__gt=20, age__lt=50), + {'age': {'$gt': 20, '$lt': 50}}) + self.assertEqual(transform.query(age=20, age__gt=50), + {'age': 20}) + self.assertEqual(transform.query(friend__age__gte=30), + {'friend.age': {'$gte': 30}}) + self.assertEqual(transform.query(name__exists=True), + {'name': {'$exists': True}}) + + def test_query_field_name(self): + """Ensure that the correct field name is used when querying. + """ + class Comment(EmbeddedDocument): + content = StringField(db_field='commentContent') + + class BlogPost(Document): + title = StringField(db_field='postTitle') + comments = ListField(EmbeddedDocumentField(Comment), + db_field='postComments') + + BlogPost.drop_collection() + + data = {'title': 'Post 1', 'comments': [Comment(content='test')]} + post = BlogPost(**data) + post.save() + + self.assertTrue('postTitle' in + BlogPost.objects(title=data['title'])._query) + self.assertFalse('title' in + BlogPost.objects(title=data['title'])._query) + self.assertEqual(len(BlogPost.objects(title=data['title'])), 1) + + self.assertTrue('_id' in BlogPost.objects(pk=post.id)._query) + self.assertEqual(len(BlogPost.objects(pk=post.id)), 1) + + self.assertTrue('postComments.commentContent' in + BlogPost.objects(comments__content='test')._query) + self.assertEqual(len(BlogPost.objects(comments__content='test')), 1) + + BlogPost.drop_collection() + + def test_query_pk_field_name(self): + """Ensure that the correct "primary key" field name is used when + querying + """ + class BlogPost(Document): + title = StringField(primary_key=True, db_field='postTitle') + + BlogPost.drop_collection() + + data = {'title': 'Post 1'} + post = BlogPost(**data) + post.save() + + self.assertTrue('_id' in BlogPost.objects(pk=data['title'])._query) + self.assertTrue('_id' in BlogPost.objects(title=data['title'])._query) + self.assertEqual(len(BlogPost.objects(pk=data['title'])), 1) + + BlogPost.drop_collection() + + def test_chaining(self): + class A(Document): + pass + + class B(Document): + a = ReferenceField(A) + + A.drop_collection() + B.drop_collection() + + a1 = A().save() + a2 = A().save() + + B(a=a1).save() + + # Works + q1 = B.objects.filter(a__in=[a1, a2], a=a1)._query + + # Doesn't work + q2 = B.objects.filter(a__in=[a1, a2]) + q2 = q2.filter(a=a1)._query + + self.assertEqual(q1, q2) + + def test_raw_query_and_Q_objects(self): + """ + Test raw plays nicely + """ + class Foo(Document): + name = StringField() + a = StringField() + b = StringField() + c = StringField() + + meta = { + 'allow_inheritance': False + } + + query = Foo.objects(__raw__={'$nor': [{'name': 'bar'}]})._query + self.assertEqual(query, {'$nor': [{'name': 'bar'}]}) + + q1 = {'$or': [{'a': 1}, {'b': 1}]} + query = Foo.objects(Q(__raw__=q1) & Q(c=1))._query + self.assertEqual(query, {'$or': [{'a': 1}, {'b': 1}], 'c': 1}) + + def test_raw_and_merging(self): + class Doc(Document): + meta = {'allow_inheritance': False} + + raw_query = Doc.objects(__raw__={'deleted': False, + 'scraped': 'yes', + '$nor': [{'views.extracted': 'no'}, + {'attachments.views.extracted':'no'}] + })._query + + expected = {'deleted': False, 'scraped': 'yes', + '$nor': [{'views.extracted': 'no'}, + {'attachments.views.extracted': 'no'}]} + self.assertEqual(expected, raw_query) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/queryset/visitor.py b/tests/queryset/visitor.py new file mode 100644 index 00000000..71c35615 --- /dev/null +++ b/tests/queryset/visitor.py @@ -0,0 +1,310 @@ +from __future__ import with_statement +import sys +sys.path[0:0] = [""] + +import unittest + +from bson import ObjectId +from datetime import datetime + +from mongoengine import * +from mongoengine.queryset import Q +from mongoengine.errors import InvalidQueryError + +__all__ = ("QTest",) + + +class QTest(unittest.TestCase): + + def setUp(self): + connect(db='mongoenginetest') + + class Person(Document): + name = StringField() + age = IntField() + meta = {'allow_inheritance': True} + + Person.drop_collection() + self.Person = Person + + def test_empty_q(self): + """Ensure that empty Q objects won't hurt. + """ + q1 = Q() + q2 = Q(age__gte=18) + q3 = Q() + q4 = Q(name='test') + q5 = Q() + + class Person(Document): + name = StringField() + age = IntField() + + query = {'$or': [{'age': {'$gte': 18}}, {'name': 'test'}]} + self.assertEqual((q1 | q2 | q3 | q4 | q5).to_query(Person), query) + + query = {'age': {'$gte': 18}, 'name': 'test'} + self.assertEqual((q1 & q2 & q3 & q4 & q5).to_query(Person), query) + + def test_q_with_dbref(self): + """Ensure Q objects handle DBRefs correctly""" + connect(db='mongoenginetest') + + class User(Document): + pass + + class Post(Document): + created_user = ReferenceField(User) + + user = User.objects.create() + Post.objects.create(created_user=user) + + self.assertEqual(Post.objects.filter(created_user=user).count(), 1) + self.assertEqual(Post.objects.filter(Q(created_user=user)).count(), 1) + + def test_and_combination(self): + """Ensure that Q-objects correctly AND together. + """ + class TestDoc(Document): + x = IntField() + y = StringField() + + # Check than an error is raised when conflicting queries are anded + def invalid_combination(): + query = Q(x__lt=7) & Q(x__lt=3) + query.to_query(TestDoc) + self.assertRaises(InvalidQueryError, invalid_combination) + + # Check normal cases work without an error + query = Q(x__lt=7) & Q(x__gt=3) + + q1 = Q(x__lt=7) + q2 = Q(x__gt=3) + query = (q1 & q2).to_query(TestDoc) + self.assertEqual(query, {'x': {'$lt': 7, '$gt': 3}}) + + # More complex nested example + query = Q(x__lt=100) & Q(y__ne='NotMyString') + query &= Q(y__in=['a', 'b', 'c']) & Q(x__gt=-100) + mongo_query = { + 'x': {'$lt': 100, '$gt': -100}, + 'y': {'$ne': 'NotMyString', '$in': ['a', 'b', 'c']}, + } + self.assertEqual(query.to_query(TestDoc), mongo_query) + + def test_or_combination(self): + """Ensure that Q-objects correctly OR together. + """ + class TestDoc(Document): + x = IntField() + + q1 = Q(x__lt=3) + q2 = Q(x__gt=7) + query = (q1 | q2).to_query(TestDoc) + self.assertEqual(query, { + '$or': [ + {'x': {'$lt': 3}}, + {'x': {'$gt': 7}}, + ] + }) + + def test_and_or_combination(self): + """Ensure that Q-objects handle ANDing ORed components. + """ + class TestDoc(Document): + x = IntField() + y = BooleanField() + + query = (Q(x__gt=0) | Q(x__exists=False)) + query &= Q(x__lt=100) + self.assertEqual(query.to_query(TestDoc), { + '$or': [ + {'x': {'$lt': 100, '$gt': 0}}, + {'x': {'$lt': 100, '$exists': False}}, + ] + }) + + q1 = (Q(x__gt=0) | Q(x__exists=False)) + q2 = (Q(x__lt=100) | Q(y=True)) + query = (q1 & q2).to_query(TestDoc) + + self.assertEqual(['$or'], query.keys()) + conditions = [ + {'x': {'$lt': 100, '$gt': 0}}, + {'x': {'$lt': 100, '$exists': False}}, + {'x': {'$gt': 0}, 'y': True}, + {'x': {'$exists': False}, 'y': True}, + ] + self.assertEqual(len(conditions), len(query['$or'])) + for condition in conditions: + self.assertTrue(condition in query['$or']) + + def test_or_and_or_combination(self): + """Ensure that Q-objects handle ORing ANDed ORed components. :) + """ + class TestDoc(Document): + x = IntField() + y = BooleanField() + + q1 = (Q(x__gt=0) & (Q(y=True) | Q(y__exists=False))) + q2 = (Q(x__lt=100) & (Q(y=False) | Q(y__exists=False))) + query = (q1 | q2).to_query(TestDoc) + + self.assertEqual(['$or'], query.keys()) + conditions = [ + {'x': {'$gt': 0}, 'y': True}, + {'x': {'$gt': 0}, 'y': {'$exists': False}}, + {'x': {'$lt': 100}, 'y':False}, + {'x': {'$lt': 100}, 'y': {'$exists': False}}, + ] + self.assertEqual(len(conditions), len(query['$or'])) + for condition in conditions: + self.assertTrue(condition in query['$or']) + + def test_q_clone(self): + + class TestDoc(Document): + x = IntField() + + TestDoc.drop_collection() + for i in xrange(1, 101): + t = TestDoc(x=i) + t.save() + + # Check normal cases work without an error + test = TestDoc.objects(Q(x__lt=7) & Q(x__gt=3)) + + self.assertEqual(test.count(), 3) + + test2 = test.clone() + self.assertEqual(test2.count(), 3) + self.assertFalse(test2 == test) + + test2.filter(x=6) + self.assertEqual(test2.count(), 1) + self.assertEqual(test.count(), 3) + + def test_q(self): + """Ensure that Q objects may be used to query for documents. + """ + class BlogPost(Document): + title = StringField() + publish_date = DateTimeField() + published = BooleanField() + + BlogPost.drop_collection() + + post1 = BlogPost(title='Test 1', publish_date=datetime(2010, 1, 8), published=False) + post1.save() + + post2 = BlogPost(title='Test 2', publish_date=datetime(2010, 1, 15), published=True) + post2.save() + + post3 = BlogPost(title='Test 3', published=True) + post3.save() + + post4 = BlogPost(title='Test 4', publish_date=datetime(2010, 1, 8)) + post4.save() + + post5 = BlogPost(title='Test 1', publish_date=datetime(2010, 1, 15)) + post5.save() + + post6 = BlogPost(title='Test 1', published=False) + post6.save() + + # Check ObjectId lookup works + obj = BlogPost.objects(id=post1.id).first() + self.assertEqual(obj, post1) + + # Check Q object combination with one does not exist + q = BlogPost.objects(Q(title='Test 5') | Q(published=True)) + posts = [post.id for post in q] + + published_posts = (post2, post3) + self.assertTrue(all(obj.id in posts for obj in published_posts)) + + q = BlogPost.objects(Q(title='Test 1') | Q(published=True)) + posts = [post.id for post in q] + published_posts = (post1, post2, post3, post5, post6) + self.assertTrue(all(obj.id in posts for obj in published_posts)) + + # Check Q object combination + date = datetime(2010, 1, 10) + q = BlogPost.objects(Q(publish_date__lte=date) | Q(published=True)) + posts = [post.id for post in q] + + published_posts = (post1, post2, post3, post4) + self.assertTrue(all(obj.id in posts for obj in published_posts)) + + self.assertFalse(any(obj.id in posts for obj in [post5, post6])) + + BlogPost.drop_collection() + + # Check the 'in' operator + self.Person(name='user1', age=20).save() + self.Person(name='user2', age=20).save() + self.Person(name='user3', age=30).save() + self.Person(name='user4', age=40).save() + + self.assertEqual(len(self.Person.objects(Q(age__in=[20]))), 2) + self.assertEqual(len(self.Person.objects(Q(age__in=[20, 30]))), 3) + + # Test invalid query objs + def wrong_query_objs(): + self.Person.objects('user1') + def wrong_query_objs_filter(): + self.Person.objects('user1') + self.assertRaises(InvalidQueryError, wrong_query_objs) + self.assertRaises(InvalidQueryError, wrong_query_objs_filter) + + def test_q_regex(self): + """Ensure that Q objects can be queried using regexes. + """ + person = self.Person(name='Guido van Rossum') + person.save() + + import re + obj = self.Person.objects(Q(name=re.compile('^Gui'))).first() + self.assertEqual(obj, person) + obj = self.Person.objects(Q(name=re.compile('^gui'))).first() + self.assertEqual(obj, None) + + obj = self.Person.objects(Q(name=re.compile('^gui', re.I))).first() + self.assertEqual(obj, person) + + obj = self.Person.objects(Q(name__not=re.compile('^bob'))).first() + self.assertEqual(obj, person) + + obj = self.Person.objects(Q(name__not=re.compile('^Gui'))).first() + self.assertEqual(obj, None) + + def test_q_lists(self): + """Ensure that Q objects query ListFields correctly. + """ + class BlogPost(Document): + tags = ListField(StringField()) + + BlogPost.drop_collection() + + BlogPost(tags=['python', 'mongo']).save() + BlogPost(tags=['python']).save() + + self.assertEqual(len(BlogPost.objects(Q(tags='mongo'))), 1) + self.assertEqual(len(BlogPost.objects(Q(tags='python'))), 2) + + BlogPost.drop_collection() + + def test_q_merge_queries_edge_case(self): + + class User(Document): + email = EmailField(required=False) + name = StringField() + + User.drop_collection() + pk = ObjectId() + User(email='example@example.com', pk=pk).save() + + self.assertEqual(1, User.objects.filter( + Q(email='example@example.com') | + Q(name='John Doe') + ).limit(2).filter(pk=pk).count()) \ No newline at end of file