607 lines
20 KiB
Python
607 lines
20 KiB
Python
import unittest
|
|
from datetime import datetime
|
|
import pymongo
|
|
|
|
from mongoengine import *
|
|
from mongoengine.connection import _get_db
|
|
|
|
|
|
class DocumentTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
connect(db='mongoenginetest')
|
|
self.db = _get_db()
|
|
|
|
class Person(Document):
|
|
name = StringField()
|
|
age = IntField()
|
|
self.Person = Person
|
|
|
|
def test_drop_collection(self):
|
|
"""Ensure that the collection may be dropped from the database.
|
|
"""
|
|
self.Person(name='Test').save()
|
|
|
|
collection = self.Person._meta['collection']
|
|
self.assertTrue(collection in self.db.collection_names())
|
|
|
|
self.Person.drop_collection()
|
|
self.assertFalse(collection in self.db.collection_names())
|
|
|
|
def test_definition(self):
|
|
"""Ensure that document may be defined using fields.
|
|
"""
|
|
name_field = StringField()
|
|
age_field = IntField()
|
|
|
|
class Person(Document):
|
|
name = name_field
|
|
age = age_field
|
|
non_field = True
|
|
|
|
self.assertEqual(Person._fields['name'], name_field)
|
|
self.assertEqual(Person._fields['age'], age_field)
|
|
self.assertFalse('non_field' in Person._fields)
|
|
self.assertTrue('id' in Person._fields)
|
|
# Test iteration over fields
|
|
fields = list(Person())
|
|
self.assertTrue('name' in fields and 'age' in fields)
|
|
# Ensure Document isn't treated like an actual document
|
|
self.assertFalse(hasattr(Document, '_fields'))
|
|
|
|
def test_get_superclasses(self):
|
|
"""Ensure that the correct list of superclasses is assembled.
|
|
"""
|
|
class Animal(Document): pass
|
|
class Fish(Animal): pass
|
|
class Mammal(Animal): pass
|
|
class Human(Mammal): pass
|
|
class Dog(Mammal): pass
|
|
|
|
mammal_superclasses = {'Animal': Animal}
|
|
self.assertEqual(Mammal._superclasses, mammal_superclasses)
|
|
|
|
dog_superclasses = {
|
|
'Animal': Animal,
|
|
'Animal.Mammal': Mammal,
|
|
}
|
|
self.assertEqual(Dog._superclasses, dog_superclasses)
|
|
|
|
def test_get_subclasses(self):
|
|
"""Ensure that the correct list of subclasses is retrieved by the
|
|
_get_subclasses method.
|
|
"""
|
|
class Animal(Document): pass
|
|
class Fish(Animal): pass
|
|
class Mammal(Animal): pass
|
|
class Human(Mammal): pass
|
|
class Dog(Mammal): pass
|
|
|
|
mammal_subclasses = {
|
|
'Animal.Mammal.Dog': Dog,
|
|
'Animal.Mammal.Human': Human
|
|
}
|
|
self.assertEqual(Mammal._get_subclasses(), mammal_subclasses)
|
|
|
|
animal_subclasses = {
|
|
'Animal.Fish': Fish,
|
|
'Animal.Mammal': Mammal,
|
|
'Animal.Mammal.Dog': Dog,
|
|
'Animal.Mammal.Human': Human
|
|
}
|
|
self.assertEqual(Animal._get_subclasses(), animal_subclasses)
|
|
|
|
def test_polymorphic_queries(self):
|
|
"""Ensure that the correct subclasses are returned from a query"""
|
|
class Animal(Document): pass
|
|
class Fish(Animal): pass
|
|
class Mammal(Animal): pass
|
|
class Human(Mammal): pass
|
|
class Dog(Mammal): pass
|
|
|
|
Animal().save()
|
|
Fish().save()
|
|
Mammal().save()
|
|
Human().save()
|
|
Dog().save()
|
|
|
|
classes = [obj.__class__ for obj in Animal.objects]
|
|
self.assertEqual(classes, [Animal, Fish, Mammal, Human, Dog])
|
|
|
|
classes = [obj.__class__ for obj in Mammal.objects]
|
|
self.assertEqual(classes, [Mammal, Human, Dog])
|
|
|
|
classes = [obj.__class__ for obj in Human.objects]
|
|
self.assertEqual(classes, [Human])
|
|
|
|
Animal.drop_collection()
|
|
|
|
def test_inheritance(self):
|
|
"""Ensure that document may inherit fields from a superclass document.
|
|
"""
|
|
class Employee(self.Person):
|
|
salary = IntField()
|
|
|
|
self.assertTrue('name' in Employee._fields)
|
|
self.assertTrue('salary' in Employee._fields)
|
|
self.assertEqual(Employee._meta['collection'],
|
|
self.Person._meta['collection'])
|
|
|
|
# Ensure that MRO error is not raised
|
|
class A(Document): pass
|
|
class B(A): pass
|
|
class C(B): pass
|
|
|
|
def test_allow_inheritance(self):
|
|
"""Ensure that inheritance may be disabled on simple classes and that
|
|
_cls and _types will not be used.
|
|
"""
|
|
class Animal(Document):
|
|
meta = {'allow_inheritance': False}
|
|
name = StringField()
|
|
|
|
Animal.drop_collection()
|
|
|
|
def create_dog_class():
|
|
class Dog(Animal):
|
|
pass
|
|
self.assertRaises(ValueError, create_dog_class)
|
|
|
|
# Check that _cls etc aren't present on simple documents
|
|
dog = Animal(name='dog')
|
|
dog.save()
|
|
collection = self.db[Animal._meta['collection']]
|
|
obj = collection.find_one()
|
|
self.assertFalse('_cls' in obj)
|
|
self.assertFalse('_types' in obj)
|
|
|
|
Animal.drop_collection()
|
|
|
|
def create_employee_class():
|
|
class Employee(self.Person):
|
|
meta = {'allow_inheritance': False}
|
|
self.assertRaises(ValueError, create_employee_class)
|
|
|
|
# Test the same for embedded documents
|
|
class Comment(EmbeddedDocument):
|
|
content = StringField()
|
|
meta = {'allow_inheritance': False}
|
|
|
|
def create_special_comment():
|
|
class SpecialComment(Comment):
|
|
pass
|
|
self.assertRaises(ValueError, create_special_comment)
|
|
|
|
comment = Comment(content='test')
|
|
self.assertFalse('_cls' in comment.to_mongo())
|
|
self.assertFalse('_types' in comment.to_mongo())
|
|
|
|
def test_collection_name(self):
|
|
"""Ensure that a collection with a specified name may be used.
|
|
"""
|
|
collection = 'personCollTest'
|
|
if collection in self.db.collection_names():
|
|
self.db.drop_collection(collection)
|
|
|
|
class Person(Document):
|
|
name = StringField()
|
|
meta = {'collection': collection}
|
|
|
|
user = Person(name="Test User")
|
|
user.save()
|
|
self.assertTrue(collection in self.db.collection_names())
|
|
|
|
user_obj = self.db[collection].find_one()
|
|
self.assertEqual(user_obj['name'], "Test User")
|
|
|
|
user_obj = Person.objects[0]
|
|
self.assertEqual(user_obj.name, "Test User")
|
|
|
|
Person.drop_collection()
|
|
self.assertFalse(collection in self.db.collection_names())
|
|
|
|
def test_inherited_collections(self):
|
|
"""Ensure that subclassed documents don't override parents' collections.
|
|
"""
|
|
class Drink(Document):
|
|
name = StringField()
|
|
|
|
class AlcoholicDrink(Drink):
|
|
meta = {'collection': 'booze'}
|
|
|
|
class Drinker(Document):
|
|
drink = GenericReferenceField()
|
|
|
|
Drink.drop_collection()
|
|
AlcoholicDrink.drop_collection()
|
|
Drinker.drop_collection()
|
|
|
|
red_bull = Drink(name='Red Bull')
|
|
red_bull.save()
|
|
|
|
programmer = Drinker(drink=red_bull)
|
|
programmer.save()
|
|
|
|
beer = AlcoholicDrink(name='Beer')
|
|
beer.save()
|
|
|
|
real_person = Drinker(drink=beer)
|
|
real_person.save()
|
|
|
|
self.assertEqual(Drinker.objects[0].drink.name, red_bull.name)
|
|
self.assertEqual(Drinker.objects[1].drink.name, beer.name)
|
|
|
|
def test_capped_collection(self):
|
|
"""Ensure that capped collections work properly.
|
|
"""
|
|
class Log(Document):
|
|
date = DateTimeField(default=datetime.now)
|
|
meta = {
|
|
'max_documents': 10,
|
|
'max_size': 90000,
|
|
}
|
|
|
|
Log.drop_collection()
|
|
|
|
# Ensure that the collection handles up to its maximum
|
|
for i in range(10):
|
|
Log().save()
|
|
|
|
self.assertEqual(len(Log.objects), 10)
|
|
|
|
# Check that extra documents don't increase the size
|
|
Log().save()
|
|
self.assertEqual(len(Log.objects), 10)
|
|
|
|
options = Log.objects._collection.options()
|
|
self.assertEqual(options['capped'], True)
|
|
self.assertEqual(options['max'], 10)
|
|
self.assertEqual(options['size'], 90000)
|
|
|
|
# Check that the document cannot be redefined with different options
|
|
def recreate_log_document():
|
|
class Log(Document):
|
|
date = DateTimeField(default=datetime.now)
|
|
meta = {
|
|
'max_documents': 11,
|
|
}
|
|
# Create the collection by accessing Document.objects
|
|
Log.objects
|
|
self.assertRaises(InvalidCollectionError, recreate_log_document)
|
|
|
|
Log.drop_collection()
|
|
|
|
def test_indexes(self):
|
|
"""Ensure that indexes are used when meta[indexes] is specified.
|
|
"""
|
|
class BlogPost(Document):
|
|
date = DateTimeField(db_field='addDate', default=datetime.now)
|
|
category = StringField()
|
|
tags = ListField(StringField())
|
|
meta = {
|
|
'indexes': [
|
|
'-date',
|
|
'tags',
|
|
('category', '-date')
|
|
],
|
|
}
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
info = BlogPost.objects._collection.index_information()
|
|
# _id, types, '-date', 'tags', ('cat', 'date')
|
|
self.assertEqual(len(info), 5)
|
|
|
|
# Indexes are lazy so use list() to perform query
|
|
list(BlogPost.objects)
|
|
info = BlogPost.objects._collection.index_information()
|
|
info = [value['key'] for key, value in info.iteritems()]
|
|
self.assertTrue([('_types', 1), ('category', 1), ('addDate', -1)]
|
|
in info)
|
|
self.assertTrue([('_types', 1), ('addDate', -1)] in info)
|
|
# tags is a list field so it shouldn't have _types in the index
|
|
self.assertTrue([('tags', 1)] in info)
|
|
|
|
class ExtendedBlogPost(BlogPost):
|
|
title = StringField()
|
|
meta = {'indexes': ['title']}
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
list(ExtendedBlogPost.objects)
|
|
info = ExtendedBlogPost.objects._collection.index_information()
|
|
info = [value['key'] for key, value in info.iteritems()]
|
|
self.assertTrue([('_types', 1), ('category', 1), ('addDate', -1)]
|
|
in info)
|
|
self.assertTrue([('_types', 1), ('addDate', -1)] in info)
|
|
self.assertTrue([('_types', 1), ('title', 1)] in info)
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
def test_unique(self):
|
|
"""Ensure that uniqueness constraints are applied to fields.
|
|
"""
|
|
class BlogPost(Document):
|
|
title = StringField()
|
|
slug = StringField(unique=True)
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
post1 = BlogPost(title='test1', slug='test')
|
|
post1.save()
|
|
|
|
# Two posts with the same slug is not allowed
|
|
post2 = BlogPost(title='test2', slug='test')
|
|
self.assertRaises(OperationError, post2.save)
|
|
|
|
class Date(EmbeddedDocument):
|
|
year = IntField(db_field='yr')
|
|
|
|
class BlogPost(Document):
|
|
title = StringField()
|
|
date = EmbeddedDocumentField(Date)
|
|
slug = StringField(unique_with='date.year')
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
post1 = BlogPost(title='test1', date=Date(year=2009), slug='test')
|
|
post1.save()
|
|
|
|
# day is different so won't raise exception
|
|
post2 = BlogPost(title='test2', date=Date(year=2010), slug='test')
|
|
post2.save()
|
|
|
|
# Now there will be two docs with the same slug and the same day: fail
|
|
post3 = BlogPost(title='test3', date=Date(year=2010), slug='test')
|
|
self.assertRaises(OperationError, post3.save)
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
def test_custom_id_field(self):
|
|
"""Ensure that documents may be created with custom primary keys.
|
|
"""
|
|
class User(Document):
|
|
username = StringField(primary_key=True)
|
|
name = StringField()
|
|
|
|
User.drop_collection()
|
|
|
|
self.assertEqual(User._fields['username'].db_field, '_id')
|
|
self.assertEqual(User._meta['id_field'], 'username')
|
|
|
|
def create_invalid_user():
|
|
User(name='test').save() # no primary key field
|
|
self.assertRaises(ValidationError, create_invalid_user)
|
|
|
|
def define_invalid_user():
|
|
class EmailUser(User):
|
|
email = StringField(primary_key=True)
|
|
self.assertRaises(ValueError, define_invalid_user)
|
|
|
|
class EmailUser(User):
|
|
email = StringField()
|
|
|
|
user = User(username='test', name='test user')
|
|
user.save()
|
|
|
|
user_obj = User.objects.first()
|
|
self.assertEqual(user_obj.id, 'test')
|
|
|
|
user_son = User.objects._collection.find_one()
|
|
self.assertEqual(user_son['_id'], 'test')
|
|
self.assertTrue('username' not in user_son['_id'])
|
|
|
|
User.drop_collection()
|
|
|
|
def test_creation(self):
|
|
"""Ensure that document may be created using keyword arguments.
|
|
"""
|
|
person = self.Person(name="Test User", age=30)
|
|
self.assertEqual(person.name, "Test User")
|
|
self.assertEqual(person.age, 30)
|
|
|
|
def test_reload(self):
|
|
"""Ensure that attributes may be reloaded.
|
|
"""
|
|
person = self.Person(name="Test User", age=20)
|
|
person.save()
|
|
|
|
person_obj = self.Person.objects.first()
|
|
person_obj.name = "Mr Test User"
|
|
person_obj.age = 21
|
|
person_obj.save()
|
|
|
|
self.assertEqual(person.name, "Test User")
|
|
self.assertEqual(person.age, 20)
|
|
|
|
person.reload()
|
|
self.assertEqual(person.name, "Mr Test User")
|
|
self.assertEqual(person.age, 21)
|
|
|
|
def test_dictionary_access(self):
|
|
"""Ensure that dictionary-style field access works properly.
|
|
"""
|
|
person = self.Person(name='Test User', age=30)
|
|
self.assertEquals(person['name'], 'Test User')
|
|
|
|
self.assertRaises(KeyError, person.__getitem__, 'salary')
|
|
self.assertRaises(KeyError, person.__setitem__, 'salary', 50)
|
|
|
|
person['name'] = 'Another User'
|
|
self.assertEquals(person['name'], 'Another User')
|
|
|
|
# Length = length(assigned fields + id)
|
|
self.assertEquals(len(person), 3)
|
|
|
|
self.assertTrue('age' in person)
|
|
person.age = None
|
|
self.assertFalse('age' in person)
|
|
self.assertFalse('nationality' in person)
|
|
|
|
def test_embedded_document(self):
|
|
"""Ensure that embedded documents are set up correctly.
|
|
"""
|
|
class Comment(EmbeddedDocument):
|
|
content = StringField()
|
|
|
|
self.assertTrue('content' in Comment._fields)
|
|
self.assertFalse('id' in Comment._fields)
|
|
self.assertFalse('collection' in Comment._meta)
|
|
|
|
def test_embedded_document_validation(self):
|
|
"""Ensure that embedded documents may be validated.
|
|
"""
|
|
class Comment(EmbeddedDocument):
|
|
date = DateTimeField()
|
|
content = StringField(required=True)
|
|
|
|
comment = Comment()
|
|
self.assertRaises(ValidationError, comment.validate)
|
|
|
|
comment.content = 'test'
|
|
comment.validate()
|
|
|
|
comment.date = 4
|
|
self.assertRaises(ValidationError, comment.validate)
|
|
|
|
comment.date = datetime.now()
|
|
comment.validate()
|
|
|
|
def test_save(self):
|
|
"""Ensure that a document may be saved in the database.
|
|
"""
|
|
# Create person object and save it to the database
|
|
person = self.Person(name='Test User', age=30)
|
|
person.save()
|
|
# Ensure that the object is in the database
|
|
collection = self.db[self.Person._meta['collection']]
|
|
person_obj = collection.find_one({'name': 'Test User'})
|
|
self.assertEqual(person_obj['name'], 'Test User')
|
|
self.assertEqual(person_obj['age'], 30)
|
|
self.assertEqual(person_obj['_id'], person.id)
|
|
# Test skipping validation on save
|
|
class Recipient(Document):
|
|
email = EmailField(required=True)
|
|
|
|
recipient = Recipient(email='root@localhost')
|
|
self.assertRaises(ValidationError, recipient.save)
|
|
try:
|
|
recipient.save(validate=False)
|
|
except ValidationError:
|
|
fail()
|
|
|
|
def test_delete(self):
|
|
"""Ensure that document may be deleted using the delete method.
|
|
"""
|
|
person = self.Person(name="Test User", age=30)
|
|
person.save()
|
|
self.assertEqual(len(self.Person.objects), 1)
|
|
person.delete()
|
|
self.assertEqual(len(self.Person.objects), 0)
|
|
|
|
def test_save_custom_id(self):
|
|
"""Ensure that a document may be saved with a custom _id.
|
|
"""
|
|
# Create person object and save it to the database
|
|
person = self.Person(name='Test User', age=30,
|
|
id='497ce96f395f2f052a494fd4')
|
|
person.save()
|
|
# Ensure that the object is in the database with the correct _id
|
|
collection = self.db[self.Person._meta['collection']]
|
|
person_obj = collection.find_one({'name': 'Test User'})
|
|
self.assertEqual(str(person_obj['_id']), '497ce96f395f2f052a494fd4')
|
|
|
|
def test_save_list(self):
|
|
"""Ensure that a list field may be properly saved.
|
|
"""
|
|
class Comment(EmbeddedDocument):
|
|
content = StringField()
|
|
|
|
class BlogPost(Document):
|
|
content = StringField()
|
|
comments = ListField(EmbeddedDocumentField(Comment))
|
|
tags = ListField(StringField())
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
post = BlogPost(content='Went for a walk today...')
|
|
post.tags = tags = ['fun', 'leisure']
|
|
comments = [Comment(content='Good for you'), Comment(content='Yay.')]
|
|
post.comments = comments
|
|
post.save()
|
|
|
|
collection = self.db[BlogPost._meta['collection']]
|
|
post_obj = collection.find_one()
|
|
self.assertEqual(post_obj['tags'], tags)
|
|
for comment_obj, comment in zip(post_obj['comments'], comments):
|
|
self.assertEqual(comment_obj['content'], comment['content'])
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
def test_save_embedded_document(self):
|
|
"""Ensure that a document with an embedded document field may be
|
|
saved in the database.
|
|
"""
|
|
class EmployeeDetails(EmbeddedDocument):
|
|
position = StringField()
|
|
|
|
class Employee(self.Person):
|
|
salary = IntField()
|
|
details = EmbeddedDocumentField(EmployeeDetails)
|
|
|
|
# Create employee object and save it to the database
|
|
employee = Employee(name='Test Employee', age=50, salary=20000)
|
|
employee.details = EmployeeDetails(position='Developer')
|
|
employee.save()
|
|
|
|
# Ensure that the object is in the database
|
|
collection = self.db[self.Person._meta['collection']]
|
|
employee_obj = collection.find_one({'name': 'Test Employee'})
|
|
self.assertEqual(employee_obj['name'], 'Test Employee')
|
|
self.assertEqual(employee_obj['age'], 50)
|
|
# Ensure that the 'details' embedded object saved correctly
|
|
self.assertEqual(employee_obj['details']['position'], 'Developer')
|
|
|
|
def test_save_reference(self):
|
|
"""Ensure that a document reference field may be saved in the database.
|
|
"""
|
|
|
|
class BlogPost(Document):
|
|
meta = {'collection': 'blogpost_1'}
|
|
content = StringField()
|
|
author = ReferenceField(self.Person)
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
author = self.Person(name='Test User')
|
|
author.save()
|
|
|
|
post = BlogPost(content='Watched some TV today... how exciting.')
|
|
# Should only reference author when saving
|
|
post.author = author
|
|
post.save()
|
|
|
|
post_obj = BlogPost.objects.first()
|
|
|
|
# Test laziness
|
|
self.assertTrue(isinstance(post_obj._data['author'],
|
|
pymongo.dbref.DBRef))
|
|
self.assertTrue(isinstance(post_obj.author, self.Person))
|
|
self.assertEqual(post_obj.author.name, 'Test User')
|
|
|
|
# Ensure that the dereferenced object may be changed and saved
|
|
post_obj.author.age = 25
|
|
post_obj.author.save()
|
|
|
|
author = list(self.Person.objects(name='Test User'))[-1]
|
|
self.assertEqual(author.age, 25)
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
def tearDown(self):
|
|
self.Person.drop_collection()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|