mongoengine/tests/test_dynamic_document.py
2012-08-13 16:53:36 +01:00

534 lines
18 KiB
Python

import unittest
from mongoengine import *
from mongoengine.connection import get_db
class DynamicDocTest(unittest.TestCase):
    """Tests for ``DynamicDocument`` and ``DynamicEmbeddedDocument``.

    Every test talks to the ``mongoenginetest`` database opened in
    ``setUp``; the document classes used are declared locally and their
    collections are dropped before use so each test starts from empty.
    """

    def setUp(self):
        """Connect to the test database and register a fresh ``Person``."""
        connect(db='mongoenginetest')
        self.db = get_db()

        class Person(DynamicDocument):
            name = StringField()
            meta = {'allow_inheritance': True}

        Person.drop_collection()

        self.Person = Person

    def test_simple_dynamic_document(self):
        """Ensures simple dynamic documents are saved correctly"""
        p = self.Person()
        p.name = "James"
        p.age = 34

        self.assertEqual(
            p.to_mongo(),
            {"_types": ["Person"], "_cls": "Person",
             "name": "James", "age": 34}
        )

        p.save()

        self.assertEqual(self.Person.objects.first().age, 34)

        # Confirm no changes to self.Person: the dynamic attribute lives on
        # the instance only, never on the document class.
        self.assertFalse(hasattr(self.Person, 'age'))

    def test_dynamic_document_delta(self):
        """Ensures simple dynamic documents can delta correctly"""
        p = self.Person(name="James", age=34)
        self.assertEqual(
            p._delta(),
            ({'_types': ['Person'], 'age': 34, 'name': 'James',
              '_cls': 'Person'}, {})
        )

        # Setting and then deleting a dynamic field must show up as an
        # unset in the second element of the delta.
        p.doc = 123
        del p.doc
        self.assertEqual(
            p._delta(),
            ({'_types': ['Person'], 'age': 34, 'name': 'James',
              '_cls': 'Person'}, {'doc': 1})
        )

    def test_change_scope_of_variable(self):
        """Test changing the scope of a dynamic field has no adverse effects"""
        p = self.Person()
        p.name = "Dean"
        p.misc = 22
        p.save()

        # Re-fetch and replace the int value with a dict.
        p = self.Person.objects.get()
        p.misc = {'hello': 'world'}
        p.save()

        p = self.Person.objects.get()
        self.assertEqual(p.misc, {'hello': 'world'})

    def test_delete_dynamic_field(self):
        """Test deleting a dynamic field works"""
        self.Person.drop_collection()
        p = self.Person()
        p.name = "Dean"
        p.misc = 22
        p.save()

        p = self.Person.objects.get()
        p.misc = {'hello': 'world'}
        p.save()

        p = self.Person.objects.get()
        self.assertEqual(p.misc, {'hello': 'world'})

        # Inspect the raw stored document, bypassing the document class.
        collection = self.db[self.Person._get_collection_name()]
        obj = collection.find_one()
        self.assertEqual(sorted(obj.keys()),
                         ['_cls', '_id', '_types', 'misc', 'name'])

        del p.misc
        p.save()

        p = self.Person.objects.get()
        self.assertFalse(hasattr(p, 'misc'))

        # The key must be gone from the stored document as well.
        obj = collection.find_one()
        self.assertEqual(sorted(obj.keys()),
                         ['_cls', '_id', '_types', 'name'])

    def test_dynamic_document_queries(self):
        """Ensure we can query dynamic fields"""
        p = self.Person()
        p.name = "Dean"
        p.age = 22
        p.save()

        self.assertEqual(1, self.Person.objects(age=22).count())
        p = self.Person.objects(age=22)
        p = p.get()
        self.assertEqual(22, p.age)

    def test_complex_dynamic_document_queries(self):
        """Ensure operator queries work on a mixed-type dynamic field"""
        class Person(DynamicDocument):
            name = StringField()

        Person.drop_collection()

        # The same dynamic field holds strings on two documents and an int
        # on the third.
        p = Person(name="test")
        p.age = "ten"
        p.save()

        p1 = Person(name="test1")
        p1.age = "less then ten and a half"
        p1.save()

        p2 = Person(name="test2")
        p2.age = 10
        p2.save()

        self.assertEqual(Person.objects(age__icontains='ten').count(), 2)
        self.assertEqual(Person.objects(age__gte=10).count(), 1)

    def test_complex_data_lookups(self):
        """Ensure you can query dynamic document dynamic fields"""
        p = self.Person()
        p.misc = {'hello': 'world'}
        p.save()

        self.assertEqual(1, self.Person.objects(misc__hello='world').count())

    def test_inheritance(self):
        """Ensure that dynamic document plays nice with inheritance"""
        class Employee(self.Person):
            salary = IntField()

        Employee.drop_collection()

        self.assertTrue('name' in Employee._fields)
        self.assertTrue('salary' in Employee._fields)
        self.assertEqual(Employee._get_collection_name(),
                         self.Person._get_collection_name())

        joe_bloggs = Employee()
        joe_bloggs.name = "Joe Bloggs"
        joe_bloggs.salary = 10
        joe_bloggs.age = 20
        joe_bloggs.save()

        self.assertEqual(1, self.Person.objects(age=20).count())
        self.assertEqual(1, Employee.objects(age=20).count())

        # Loading through the base class must give back the subclass.
        joe_bloggs = self.Person.objects.first()
        self.assertTrue(isinstance(joe_bloggs, Employee))

    def test_embedded_dynamic_document(self):
        """Test dynamic embedded documents"""
        class Embedded(DynamicEmbeddedDocument):
            pass

        class Doc(DynamicDocument):
            pass

        Doc.drop_collection()
        doc = Doc()

        embedded_1 = Embedded()
        embedded_1.string_field = 'hello'
        embedded_1.int_field = 1
        embedded_1.dict_field = {'hello': 'world'}
        embedded_1.list_field = ['1', 2, {'hello': 'world'}]
        doc.embedded_field = embedded_1

        self.assertEqual(doc.to_mongo(), {
            "_types": ['Doc'], "_cls": "Doc",
            "embedded_field": {
                "_types": ['Embedded'], "_cls": "Embedded",
                "string_field": "hello",
                "int_field": 1,
                "dict_field": {"hello": "world"},
                "list_field": ['1', 2, {'hello': 'world'}]
            }
        })
        doc.save()

        doc = Doc.objects.first()
        self.assertEqual(doc.embedded_field.__class__, Embedded)
        self.assertEqual(doc.embedded_field.string_field, "hello")
        self.assertEqual(doc.embedded_field.int_field, 1)
        self.assertEqual(doc.embedded_field.dict_field, {'hello': 'world'})
        self.assertEqual(doc.embedded_field.list_field,
                         ['1', 2, {'hello': 'world'}])

    def test_complex_embedded_documents(self):
        """Test complex dynamic embedded documents setups"""
        class Embedded(DynamicEmbeddedDocument):
            pass

        class Doc(DynamicDocument):
            pass

        Doc.drop_collection()
        doc = Doc()

        embedded_1 = Embedded()
        embedded_1.string_field = 'hello'
        embedded_1.int_field = 1
        embedded_1.dict_field = {'hello': 'world'}

        embedded_2 = Embedded()
        embedded_2.string_field = 'hello'
        embedded_2.int_field = 1
        embedded_2.dict_field = {'hello': 'world'}
        embedded_2.list_field = ['1', 2, {'hello': 'world'}]

        # Nest the second embedded document inside the first one's list.
        embedded_1.list_field = ['1', 2, embedded_2]
        doc.embedded_field = embedded_1

        self.assertEqual(doc.to_mongo(), {
            "_types": ['Doc'], "_cls": "Doc",
            "embedded_field": {
                "_types": ['Embedded'], "_cls": "Embedded",
                "string_field": "hello",
                "int_field": 1,
                "dict_field": {"hello": "world"},
                "list_field": [
                    '1', 2,
                    {"_types": ['Embedded'], "_cls": "Embedded",
                     "string_field": "hello",
                     "int_field": 1,
                     "dict_field": {"hello": "world"},
                     "list_field": ['1', 2, {'hello': 'world'}]}
                ]
            }
        })
        doc.save()

        doc = Doc.objects.first()
        self.assertEqual(doc.embedded_field.__class__, Embedded)
        self.assertEqual(doc.embedded_field.string_field, "hello")
        self.assertEqual(doc.embedded_field.int_field, 1)
        self.assertEqual(doc.embedded_field.dict_field, {'hello': 'world'})
        self.assertEqual(doc.embedded_field.list_field[0], '1')
        self.assertEqual(doc.embedded_field.list_field[1], 2)

        embedded_field = doc.embedded_field.list_field[2]

        self.assertEqual(embedded_field.__class__, Embedded)
        self.assertEqual(embedded_field.string_field, "hello")
        self.assertEqual(embedded_field.int_field, 1)
        self.assertEqual(embedded_field.dict_field, {'hello': 'world'})
        self.assertEqual(embedded_field.list_field,
                         ['1', 2, {'hello': 'world'}])

    def test_delta_for_dynamic_documents(self):
        """Ensure changing a saved dynamic field is tracked in the delta"""
        p = self.Person()
        p.name = "Dean"
        p.age = 22
        p.save()

        # Change on the instance that was just saved (not written back).
        p.age = 24
        self.assertEqual(p.age, 24)
        self.assertEqual(p._get_changed_fields(), ['age'])
        self.assertEqual(p._delta(), ({'age': 24}, {}))

        # Same change on an instance loaded from the database; the stored
        # age is still 22 because the change above was never saved.
        p = self.Person.objects(age=22).get()
        p.age = 24
        self.assertEqual(p.age, 24)
        self.assertEqual(p._get_changed_fields(), ['age'])
        self.assertEqual(p._delta(), ({'age': 24}, {}))

        p.save()
        self.assertEqual(1, self.Person.objects(age=24).count())

    def test_delta(self):
        """Ensure deltas are computed for each kind of dynamic value"""
        class Doc(DynamicDocument):
            pass

        Doc.drop_collection()
        doc = Doc()
        doc.save()

        doc = Doc.objects.first()
        self.assertEqual(doc._get_changed_fields(), [])
        self.assertEqual(doc._delta(), ({}, {}))

        doc.string_field = 'hello'
        self.assertEqual(doc._get_changed_fields(), ['string_field'])
        self.assertEqual(doc._delta(), ({'string_field': 'hello'}, {}))

        # Change tracking is reset by hand between steps so that each
        # assertion only sees the field assigned in that step.
        doc._changed_fields = []
        doc.int_field = 1
        self.assertEqual(doc._get_changed_fields(), ['int_field'])
        self.assertEqual(doc._delta(), ({'int_field': 1}, {}))

        doc._changed_fields = []
        dict_value = {'hello': 'world', 'ping': 'pong'}
        doc.dict_field = dict_value
        self.assertEqual(doc._get_changed_fields(), ['dict_field'])
        self.assertEqual(doc._delta(), ({'dict_field': dict_value}, {}))

        doc._changed_fields = []
        list_value = ['1', 2, {'hello': 'world'}]
        doc.list_field = list_value
        self.assertEqual(doc._get_changed_fields(), ['list_field'])
        self.assertEqual(doc._delta(), ({'list_field': list_value}, {}))

        # Test unsetting: assigning an empty container is expected to be
        # reported as an unset rather than a set.
        doc._changed_fields = []
        doc.dict_field = {}
        self.assertEqual(doc._get_changed_fields(), ['dict_field'])
        self.assertEqual(doc._delta(), ({}, {'dict_field': 1}))

        doc._changed_fields = []
        doc.list_field = []
        self.assertEqual(doc._get_changed_fields(), ['list_field'])
        self.assertEqual(doc._delta(), ({}, {'list_field': 1}))

    def test_delta_recursive(self):
        """Testing deltaing works with dynamic documents"""
        class Embedded(DynamicEmbeddedDocument):
            pass

        class Doc(DynamicDocument):
            pass

        Doc.drop_collection()
        doc = Doc()
        doc.save()

        doc = Doc.objects.first()
        self.assertEqual(doc._get_changed_fields(), [])
        self.assertEqual(doc._delta(), ({}, {}))

        embedded_1 = Embedded()
        embedded_1.string_field = 'hello'
        embedded_1.int_field = 1
        embedded_1.dict_field = {'hello': 'world'}
        embedded_1.list_field = ['1', 2, {'hello': 'world'}]
        doc.embedded_field = embedded_1

        self.assertEqual(doc._get_changed_fields(), ['embedded_field'])

        embedded_delta = {
            'string_field': 'hello',
            'int_field': 1,
            'dict_field': {'hello': 'world'},
            'list_field': ['1', 2, {'hello': 'world'}]
        }
        self.assertEqual(doc.embedded_field._delta(), (embedded_delta, {}))

        # Seen from the parent, the embedded document's delta also carries
        # its class markers.
        embedded_delta.update({
            '_types': ['Embedded'],
            '_cls': 'Embedded',
        })
        self.assertEqual(doc._delta(),
                         ({'embedded_field': embedded_delta}, {}))

        doc.save()
        doc.reload()

        doc.embedded_field.dict_field = {}
        self.assertEqual(doc._get_changed_fields(),
                         ['embedded_field.dict_field'])
        self.assertEqual(doc.embedded_field._delta(),
                         ({}, {'dict_field': 1}))
        self.assertEqual(doc._delta(),
                         ({}, {'embedded_field.dict_field': 1}))
        doc.save()
        doc.reload()

        doc.embedded_field.list_field = []
        self.assertEqual(doc._get_changed_fields(),
                         ['embedded_field.list_field'])
        self.assertEqual(doc.embedded_field._delta(),
                         ({}, {'list_field': 1}))
        self.assertEqual(doc._delta(),
                         ({}, {'embedded_field.list_field': 1}))
        doc.save()
        doc.reload()

        embedded_2 = Embedded()
        embedded_2.string_field = 'hello'
        embedded_2.int_field = 1
        embedded_2.dict_field = {'hello': 'world'}
        embedded_2.list_field = ['1', 2, {'hello': 'world'}]

        doc.embedded_field.list_field = ['1', 2, embedded_2]
        self.assertEqual(doc._get_changed_fields(),
                         ['embedded_field.list_field'])
        self.assertEqual(doc.embedded_field._delta(), ({
            'list_field': ['1', 2, {
                '_cls': 'Embedded',
                '_types': ['Embedded'],
                'string_field': 'hello',
                'dict_field': {'hello': 'world'},
                'int_field': 1,
                'list_field': ['1', 2, {'hello': 'world'}],
            }]
        }, {}))

        self.assertEqual(doc._delta(), ({
            'embedded_field.list_field': ['1', 2, {
                '_cls': 'Embedded',
                '_types': ['Embedded'],
                'string_field': 'hello',
                'dict_field': {'hello': 'world'},
                'int_field': 1,
                'list_field': ['1', 2, {'hello': 'world'}],
            }]
        }, {}))
        doc.save()
        doc.reload()

        self.assertEqual(doc.embedded_field.list_field[2]._changed_fields, [])
        self.assertEqual(doc.embedded_field.list_field[0], '1')
        self.assertEqual(doc.embedded_field.list_field[1], 2)
        for k in doc.embedded_field.list_field[2]._fields:
            self.assertEqual(doc.embedded_field.list_field[2][k],
                             embedded_2[k])

        doc.embedded_field.list_field[2].string_field = 'world'
        self.assertEqual(doc._get_changed_fields(),
                         ['embedded_field.list_field.2.string_field'])
        self.assertEqual(doc.embedded_field._delta(),
                         ({'list_field.2.string_field': 'world'}, {}))
        self.assertEqual(
            doc._delta(),
            ({'embedded_field.list_field.2.string_field': 'world'}, {})
        )
        doc.save()
        doc.reload()
        self.assertEqual(doc.embedded_field.list_field[2].string_field,
                         'world')

        # Test multiple assignments
        doc.embedded_field.list_field[2].string_field = 'hello world'
        doc.embedded_field.list_field[2] = doc.embedded_field.list_field[2]
        self.assertEqual(doc._get_changed_fields(),
                         ['embedded_field.list_field'])
        self.assertEqual(doc.embedded_field._delta(), ({
            'list_field': ['1', 2, {
                '_types': ['Embedded'],
                '_cls': 'Embedded',
                'string_field': 'hello world',
                'int_field': 1,
                'list_field': ['1', 2, {'hello': 'world'}],
                'dict_field': {'hello': 'world'}}]}, {}))
        self.assertEqual(doc._delta(), ({
            'embedded_field.list_field': ['1', 2, {
                '_types': ['Embedded'],
                '_cls': 'Embedded',
                'string_field': 'hello world',
                'int_field': 1,
                'list_field': ['1', 2, {'hello': 'world'}],
                'dict_field': {'hello': 'world'}}
            ]}, {}))
        doc.save()
        doc.reload()
        self.assertEqual(doc.embedded_field.list_field[2].string_field,
                         'hello world')

        # Test list native methods
        doc.embedded_field.list_field[2].list_field.pop(0)
        self.assertEqual(
            doc._delta(),
            ({'embedded_field.list_field.2.list_field':
              [2, {'hello': 'world'}]}, {})
        )
        doc.save()
        doc.reload()

        doc.embedded_field.list_field[2].list_field.append(1)
        self.assertEqual(
            doc._delta(),
            ({'embedded_field.list_field.2.list_field':
              [2, {'hello': 'world'}, 1]}, {})
        )
        doc.save()
        doc.reload()
        self.assertEqual(doc.embedded_field.list_field[2].list_field,
                         [2, {'hello': 'world'}, 1])

        # Use str as the sort key so that values of incomparable types
        # (ints and a dict) can still be ordered.
        doc.embedded_field.list_field[2].list_field.sort(key=str)
        doc.save()
        doc.reload()
        self.assertEqual(doc.embedded_field.list_field[2].list_field,
                         [1, 2, {'hello': 'world'}])

        del doc.embedded_field.list_field[2].list_field[2]['hello']
        self.assertEqual(
            doc._delta(),
            ({'embedded_field.list_field.2.list_field': [1, 2, {}]}, {})
        )
        doc.save()
        doc.reload()

        del doc.embedded_field.list_field[2].list_field
        self.assertEqual(
            doc._delta(),
            ({}, {'embedded_field.list_field.2.list_field': 1})
        )

        doc.save()
        doc.reload()

        # An embedded document stored inside a dict must be tracked too.
        doc.dict_field = {'embedded': embedded_1}
        doc.save()
        doc.reload()

        doc.dict_field['embedded'].string_field = 'Hello World'
        self.assertEqual(doc._get_changed_fields(),
                         ['dict_field.embedded.string_field'])
        self.assertEqual(
            doc._delta(),
            ({'dict_field.embedded.string_field': 'Hello World'}, {})
        )

    def test_indexes(self):
        """Ensure that indexes are used when meta[indexes] is specified.
        """
        class BlogPost(DynamicDocument):
            meta = {
                'indexes': [
                    '-date',
                    ('category', '-date')
                ],
            }

        BlogPost.drop_collection()

        info = BlogPost.objects._collection.index_information()
        # _id, '-date', ('category', '-date')
        # NB: there is no index on _types by itself, since
        # the indices on -date and (category, -date) will both contain
        # _types as first element in the key
        self.assertEqual(len(info), 3)

        # Indexes are lazy so use list() to perform query
        list(BlogPost.objects)
        info = BlogPost.objects._collection.index_information()
        # Only the key specifications are needed; .values() behaves the
        # same on Python 2 and 3, unlike the Python-2-only .iteritems().
        info = [value['key'] for value in info.values()]
        self.assertTrue([('_types', 1), ('category', 1), ('date', -1)]
                        in info)
        self.assertTrue([('_types', 1), ('date', -1)] in info)

    def test_dynamic_and_embedded(self):
        """Ensure embedded documents play nicely"""
        class Address(EmbeddedDocument):
            city = StringField()

        class Person(DynamicDocument):
            name = StringField()
            meta = {'allow_inheritance': True}

        Person.drop_collection()

        Person(name="Ross", address=Address(city="London")).save()

        # Mutate a field of the embedded document in place.
        person = Person.objects.first()
        person.address.city = "Lundenne"
        person.save()

        self.assertEqual(Person.objects.first().address.city, "Lundenne")

        # Replace the embedded document wholesale.
        person = Person.objects.first()
        person.address = Address(city="Londinium")
        person.save()

        self.assertEqual(Person.objects.first().address.city, "Londinium")

        # Add a new dynamic field next to the embedded document.
        person = Person.objects.first()
        person.age = 35
        person.save()
        self.assertEqual(Person.objects.first().age, 35)