165 lines
5.9 KiB
Python
165 lines
5.9 KiB
Python
import unittest
|
|
import pymongo
|
|
|
|
from mongoengine.queryset import QuerySet
|
|
from mongoengine import *
|
|
|
|
|
|
class QuerySetTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
connect(db='mongoenginetest')
|
|
|
|
class Person(Document):
|
|
name = StringField()
|
|
age = IntField()
|
|
self.Person = Person
|
|
|
|
def test_initialisation(self):
|
|
"""Ensure that CollectionManager is correctly initialised.
|
|
"""
|
|
self.assertTrue(isinstance(self.Person.objects, QuerySet))
|
|
self.assertEqual(self.Person.objects._collection.name(),
|
|
self.Person._meta['collection'])
|
|
self.assertTrue(isinstance(self.Person.objects._collection,
|
|
pymongo.collection.Collection))
|
|
|
|
def test_transform_query(self):
|
|
"""Ensure that the _transform_query function operates correctly.
|
|
"""
|
|
self.assertEqual(QuerySet._transform_query(name='test', age=30),
|
|
{'name': 'test', 'age': 30})
|
|
self.assertEqual(QuerySet._transform_query(age__lt=30),
|
|
{'age': {'$lt': 30}})
|
|
self.assertEqual(QuerySet._transform_query(age__gt=20, age__lt=50),
|
|
{'age': {'$gt': 20, '$lt': 50}})
|
|
self.assertEqual(QuerySet._transform_query(age=20, age__gt=50),
|
|
{'age': 20})
|
|
self.assertEqual(QuerySet._transform_query(friend__age__gte=30),
|
|
{'friend.age': {'$gte': 30}})
|
|
self.assertEqual(QuerySet._transform_query(name__exists=True),
|
|
{'name': {'$exists': True}})
|
|
|
|
def test_find(self):
|
|
"""Ensure that a query returns a valid set of results.
|
|
"""
|
|
person1 = self.Person(name="User A", age=20)
|
|
person1.save()
|
|
person2 = self.Person(name="User B", age=30)
|
|
person2.save()
|
|
|
|
# Find all people in the collection
|
|
people = self.Person.objects
|
|
self.assertEqual(people.count(), 2)
|
|
results = list(people)
|
|
self.assertTrue(isinstance(results[0], self.Person))
|
|
self.assertTrue(isinstance(results[0].id, (pymongo.objectid.ObjectId,
|
|
str, unicode)))
|
|
self.assertEqual(results[0].name, "User A")
|
|
self.assertEqual(results[0].age, 20)
|
|
self.assertEqual(results[1].name, "User B")
|
|
self.assertEqual(results[1].age, 30)
|
|
|
|
# Use a query to filter the people found to just person1
|
|
people = self.Person.objects(age=20)
|
|
self.assertEqual(people.count(), 1)
|
|
person = people.next()
|
|
self.assertEqual(person.name, "User A")
|
|
self.assertEqual(person.age, 20)
|
|
|
|
# Test limit
|
|
people = list(self.Person.objects.limit(1))
|
|
self.assertEqual(len(people), 1)
|
|
self.assertEqual(people[0].name, 'User A')
|
|
|
|
# Test skip
|
|
people = list(self.Person.objects.skip(1))
|
|
self.assertEqual(len(people), 1)
|
|
self.assertEqual(people[0].name, 'User B')
|
|
|
|
def test_find_one(self):
|
|
"""Ensure that a query using find_one returns a valid result.
|
|
"""
|
|
person1 = self.Person(name="User A", age=20)
|
|
person1.save()
|
|
person2 = self.Person(name="User B", age=30)
|
|
person2.save()
|
|
|
|
# Retrieve the first person from the database
|
|
person = self.Person.objects.first()
|
|
self.assertTrue(isinstance(person, self.Person))
|
|
self.assertEqual(person.name, "User A")
|
|
self.assertEqual(person.age, 20)
|
|
|
|
# Use a query to filter the people found to just person2
|
|
person = self.Person.objects(age=30).first()
|
|
self.assertEqual(person.name, "User B")
|
|
|
|
person = self.Person.objects(age__lt=30).first()
|
|
self.assertEqual(person.name, "User A")
|
|
|
|
# Find a document using just the object id
|
|
person = self.Person.objects.with_id(person1.id)
|
|
self.assertEqual(person.name, "User A")
|
|
|
|
def test_find_embedded(self):
|
|
"""Ensure that an embedded document is properly returned from a query.
|
|
"""
|
|
class User(EmbeddedDocument):
|
|
name = StringField()
|
|
|
|
class BlogPost(Document):
|
|
content = StringField()
|
|
author = EmbeddedDocumentField(User)
|
|
|
|
post = BlogPost(content='Had a good coffee today...')
|
|
post.author = User(name='Test User')
|
|
post.save()
|
|
|
|
result = BlogPost.objects.first()
|
|
self.assertTrue(isinstance(result.author, User))
|
|
self.assertEqual(result.author.name, 'Test User')
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
def test_delete(self):
|
|
"""Ensure that documents are properly deleted from the database.
|
|
"""
|
|
self.Person(name="User A", age=20).save()
|
|
self.Person(name="User B", age=30).save()
|
|
self.Person(name="User C", age=40).save()
|
|
|
|
self.assertEqual(self.Person.objects.count(), 3)
|
|
|
|
self.Person.objects(age__lt=30).delete()
|
|
self.assertEqual(self.Person.objects.count(), 2)
|
|
|
|
self.Person.objects.delete()
|
|
self.assertEqual(self.Person.objects.count(), 0)
|
|
|
|
def test_sort(self):
|
|
"""Ensure that QuerySets may be sorted.
|
|
"""
|
|
self.Person(name="User A", age=20).save()
|
|
self.Person(name="User B", age=40).save()
|
|
self.Person(name="User C", age=30).save()
|
|
|
|
names = [p.name for p in self.Person.objects.sort('-age')]
|
|
self.assertEqual(names, ['User B', 'User C', 'User A'])
|
|
|
|
names = [p.name for p in self.Person.objects.sort('+age')]
|
|
self.assertEqual(names, ['User A', 'User C', 'User B'])
|
|
|
|
names = [p.name for p in self.Person.objects.sort('age')]
|
|
self.assertEqual(names, ['User A', 'User C', 'User B'])
|
|
|
|
ages = [p.age for p in self.Person.objects.sort('-name')]
|
|
self.assertEqual(ages, [30, 40, 20])
|
|
|
|
def tearDown(self):
|
|
self.Person.drop_collection()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|