# Source listing metadata: 865 lines, 31 KiB, Python
# -*- coding: utf-8 -*-
|
|
import unittest
|
|
|
|
from bson import SON
|
|
from mongoengine import *
|
|
from mongoengine.pymongo_support import list_collection_names
|
|
from tests.utils import MongoDBTestCase
|
|
|
|
|
|
class DeltaTest(MongoDBTestCase):
    """Exercise change tracking and ``_delta()`` on documents.

    ``test_*`` methods are the entry points.  The un-prefixed methods
    (``delta``, ``delta_recursive``, ...) are scenarios that the tests run
    against several document / embedded-document base classes.  Methods
    starting with ``_`` hold assertion sequences shared by those scenarios.

    Throughout, ``_delta()`` is expected to return an ``(updates, removals)``
    pair, and ``_get_changed_fields()`` a list of dotted key paths.
    """

    def setUp(self):
        """Define a small inheritable ``Person`` document for the test case."""
        super(DeltaTest, self).setUp()

        class Person(Document):
            name = StringField()
            age = IntField()

            # Plain class attribute, deliberately not a mongoengine field.
            non_field = True

            meta = {"allow_inheritance": True}

        self.Person = Person

    def tearDown(self):
        """Drop every collection of the test database."""
        for collection in list_collection_names(self.db):
            self.db.drop_collection(collection)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _save_and_reload(self, doc):
        """Save ``doc`` and return the result of ``doc.reload(10)``."""
        doc.save()
        return doc.reload(10)

    def _check_simple_field_deltas(self, doc, prefix=''):
        """Assert changed-fields / delta output for top-level assignments.

        ``doc`` must be a freshly loaded document exposing ``string_field``,
        ``int_field``, ``dict_field`` and ``list_field`` attributes.

        :param doc: the document to mutate (it is not saved here).
        :param prefix: string prepended to each attribute name to obtain the
            key expected in the changed-fields list and in the delta
            (``''`` when no ``db_field`` is used, ``'db_'`` otherwise).
        """
        self.assertEqual(doc._get_changed_fields(), [])
        self.assertEqual(doc._delta(), ({}, {}))

        dict_value = {'hello': 'world', 'ping': 'pong'}
        list_value = ['1', 2, {'hello': 'world'}]
        steps = (
            # (attribute, assigned value, expected as removal?)
            ('string_field', 'hello', False),
            ('int_field', 1, False),
            ('dict_field', dict_value, False),
            ('list_field', list_value, False),
            # Test unsetting: emptied containers show up in the removals.
            ('dict_field', {}, True),
            ('list_field', [], True),
        )

        for position, (attr, value, unset) in enumerate(steps):
            if position:
                # Forget the previous step so each assertion sees one change.
                doc._changed_fields = []
            setattr(doc, attr, value)

            key = prefix + attr
            expected = ({}, {key: 1}) if unset else ({key: value}, {})
            self.assertEqual(doc._get_changed_fields(), [key])
            self.assertEqual(doc._delta(), expected)

    def _check_recursive_deltas(self, doc, Embedded, embedded_1,
                                embedded_delta, prefix=''):
        """Assert deltas for changes nested inside an embedded document.

        :param doc: freshly loaded, unmodified document with an
            ``embedded_field`` attribute.
        :param Embedded: embedded document class used to build list items.
        :param embedded_1: populated ``Embedded`` instance assigned to
            ``doc.embedded_field`` as the first change.
        :param embedded_delta: updates expected from ``embedded_1._delta()``.
        :param prefix: string prepended to every field name to obtain the
            key expected in changed-fields lists and deltas.
        :return: the document after the last assertion; the final deletion
            has *not* been saved.
        """
        embedded_key = prefix + 'embedded_field'
        string_key = prefix + 'string_field'
        int_key = prefix + 'int_field'
        dict_key = prefix + 'dict_field'
        list_key = prefix + 'list_field'

        # Paths relative to the top-level document ...
        nested_dict_path = embedded_key + '.' + dict_key
        nested_list_path = embedded_key + '.' + list_key
        nested_item_path = nested_list_path + '.2'
        inner_list_path = nested_item_path + '.' + list_key
        # ... and relative to the embedded document.
        item_path = list_key + '.2'

        def serialized_item(string_value):
            """Expected stored form of the embedded list item."""
            return {
                '_cls': 'Embedded',
                string_key: string_value,
                int_key: 1,
                dict_key: {'hello': 'world'},
                list_key: ['1', 2, {'hello': 'world'}],
            }

        self.assertEqual(doc._get_changed_fields(), [])
        self.assertEqual(doc._delta(), ({}, {}))

        # Assigning a whole embedded document.
        doc.embedded_field = embedded_1
        self.assertEqual(doc._get_changed_fields(), [embedded_key])
        self.assertEqual(doc.embedded_field._delta(), (embedded_delta, {}))
        self.assertEqual(doc._delta(), ({embedded_key: embedded_delta}, {}))
        doc = self._save_and_reload(doc)

        # Emptying the embedded dict.
        doc.embedded_field.dict_field = {}
        self.assertEqual(doc._get_changed_fields(), [nested_dict_path])
        self.assertEqual(doc.embedded_field._delta(), ({}, {dict_key: 1}))
        self.assertEqual(doc._delta(), ({}, {nested_dict_path: 1}))
        doc = self._save_and_reload(doc)
        self.assertEqual(doc.embedded_field.dict_field, {})

        # Emptying the embedded list.
        doc.embedded_field.list_field = []
        self.assertEqual(doc._get_changed_fields(), [nested_list_path])
        self.assertEqual(doc.embedded_field._delta(), ({}, {list_key: 1}))
        self.assertEqual(doc._delta(), ({}, {nested_list_path: 1}))
        doc = self._save_and_reload(doc)
        self.assertEqual(doc.embedded_field.list_field, [])

        # A list that itself contains an embedded document.
        embedded_2 = Embedded()
        embedded_2.string_field = 'hello'
        embedded_2.int_field = 1
        embedded_2.dict_field = {'hello': 'world'}
        embedded_2.list_field = ['1', 2, {'hello': 'world'}]

        doc.embedded_field.list_field = ['1', 2, embedded_2]
        self.assertEqual(doc._get_changed_fields(), [nested_list_path])

        expected_list = ['1', 2, serialized_item('hello')]
        self.assertEqual(doc.embedded_field._delta(),
                         ({list_key: expected_list}, {}))
        self.assertEqual(doc._delta(), ({nested_list_path: expected_list}, {}))
        doc = self._save_and_reload(doc)

        self.assertEqual(doc.embedded_field.list_field[0], '1')
        self.assertEqual(doc.embedded_field.list_field[1], 2)
        for k in doc.embedded_field.list_field[2]._fields:
            self.assertEqual(doc.embedded_field.list_field[2][k],
                             embedded_2[k])

        # Changing one attribute of the embedded list item.
        doc.embedded_field.list_field[2].string_field = 'world'
        self.assertEqual(doc._get_changed_fields(),
                         [nested_item_path + '.' + string_key])
        self.assertEqual(doc.embedded_field._delta(),
                         ({item_path + '.' + string_key: 'world'}, {}))
        self.assertEqual(doc._delta(),
                         ({nested_item_path + '.' + string_key: 'world'}, {}))
        doc = self._save_and_reload(doc)
        self.assertEqual(doc.embedded_field.list_field[2].string_field,
                         'world')

        # Test multiple assignments: re-assigning the item after editing it
        # is expected to report the whole item rather than the attribute.
        doc.embedded_field.list_field[2].string_field = 'hello world'
        doc.embedded_field.list_field[2] = doc.embedded_field.list_field[2]
        self.assertEqual(doc._get_changed_fields(), [nested_item_path])
        self.assertEqual(doc.embedded_field._delta(),
                         ({item_path: serialized_item('hello world')}, {}))
        self.assertEqual(doc._delta(),
                         ({nested_item_path: serialized_item('hello world')},
                          {}))
        doc = self._save_and_reload(doc)
        self.assertEqual(doc.embedded_field.list_field[2].string_field,
                         'hello world')

        # Test list native methods
        doc.embedded_field.list_field[2].list_field.pop(0)
        self.assertEqual(doc._delta(),
                         ({inner_list_path: [2, {'hello': 'world'}]}, {}))
        doc = self._save_and_reload(doc)

        doc.embedded_field.list_field[2].list_field.append(1)
        self.assertEqual(doc._delta(),
                         ({inner_list_path: [2, {'hello': 'world'}, 1]}, {}))
        doc = self._save_and_reload(doc)
        self.assertEqual(doc.embedded_field.list_field[2].list_field,
                         [2, {'hello': 'world'}, 1])

        doc.embedded_field.list_field[2].list_field.sort(key=str)
        doc = self._save_and_reload(doc)
        self.assertEqual(doc.embedded_field.list_field[2].list_field,
                         [1, 2, {'hello': 'world'}])

        # Deleting a key inside a dict nested in the inner list.
        del doc.embedded_field.list_field[2].list_field[2]['hello']
        self.assertEqual(doc._delta(),
                         ({}, {inner_list_path + '.2.hello': 1}))
        doc = self._save_and_reload(doc)

        # Deleting the inner list attribute altogether.
        del doc.embedded_field.list_field[2].list_field
        self.assertEqual(doc._delta(), ({}, {inner_list_path: 1}))

        return doc

    def _check_delta_with_dbref(self, dbref):
        """Assert an organization's delta ignores changes to referenced docs.

        :param dbref: forwarded to ``circular_reference_deltas_2`` as the
            ``dbref`` option of every reference field.
        """
        person, organization, employee = self.circular_reference_deltas_2(
            Document, Document, dbref)

        # Editing a referenced document must not mark the referrer changed.
        employee.name = 'test'
        self.assertEqual(organization._get_changed_fields(), [])

        updates, removals = organization._delta()
        self.assertEqual({}, removals)
        self.assertEqual({}, updates)

        # Editing the reference list itself must.
        organization.employees.append(person)
        updates, removals = organization._delta()
        self.assertEqual({}, removals)
        self.assertIn('employees', updates)

    # ------------------------------------------------------------------
    # Top-level field deltas
    # ------------------------------------------------------------------

    def test_delta(self):
        """Run ``delta`` for static and dynamic documents."""
        self.delta(Document)
        self.delta(DynamicDocument)

    def delta(self, DocClass):
        """Check deltas of top-level fields on a ``DocClass`` subclass."""

        class Doc(DocClass):
            string_field = StringField()
            int_field = IntField()
            dict_field = DictField()
            list_field = ListField()

        Doc.drop_collection()
        Doc().save()

        self._check_simple_field_deltas(Doc.objects.first())

    # ------------------------------------------------------------------
    # Nested (embedded) deltas
    # ------------------------------------------------------------------

    def test_delta_recursive(self):
        """Run ``delta_recursive`` for every document / embedded pairing."""
        self.delta_recursive(Document, EmbeddedDocument)
        self.delta_recursive(DynamicDocument, EmbeddedDocument)
        self.delta_recursive(Document, DynamicEmbeddedDocument)
        self.delta_recursive(DynamicDocument, DynamicEmbeddedDocument)

    def delta_recursive(self, DocClass, EmbeddedClass):
        """Check deltas for changes nested inside embedded documents."""

        class Embedded(EmbeddedClass):
            id = StringField()
            string_field = StringField()
            int_field = IntField()
            dict_field = DictField()
            list_field = ListField()

        class Doc(DocClass):
            string_field = StringField()
            int_field = IntField()
            dict_field = DictField()
            list_field = ListField()
            embedded_field = EmbeddedDocumentField(Embedded)

        Doc.drop_collection()
        Doc().save()
        doc = Doc.objects.first()

        embedded_1 = Embedded()
        embedded_1.id = "010101"
        embedded_1.string_field = 'hello'
        embedded_1.int_field = 1
        embedded_1.dict_field = {'hello': 'world'}
        embedded_1.list_field = ['1', 2, {'hello': 'world'}]

        embedded_delta = {
            'id': "010101",
            'string_field': 'hello',
            'int_field': 1,
            'dict_field': {'hello': 'world'},
            'list_field': ['1', 2, {'hello': 'world'}],
        }

        doc = self._check_recursive_deltas(doc, Embedded, embedded_1,
                                           embedded_delta)
        doc = self._save_and_reload(doc)

        # An embedded document stored under a key of a top-level DictField.
        doc.dict_field['Embedded'] = embedded_1
        doc = self._save_and_reload(doc)

        doc.dict_field['Embedded'].string_field = 'Hello World'
        self.assertEqual(doc._get_changed_fields(),
                         ['dict_field.Embedded.string_field'])
        self.assertEqual(doc._delta(),
                         ({'dict_field.Embedded.string_field': 'Hello World'},
                          {}))

    # ------------------------------------------------------------------
    # Circular references
    # ------------------------------------------------------------------

    def test_circular_reference_deltas(self):
        """Run ``circular_reference_deltas`` for every base-class pairing."""
        self.circular_reference_deltas(Document, Document)
        self.circular_reference_deltas(Document, DynamicDocument)
        self.circular_reference_deltas(DynamicDocument, Document)
        self.circular_reference_deltas(DynamicDocument, DynamicDocument)

    def circular_reference_deltas(self, DocClass1, DocClass2):
        """Save two documents referencing each other and read them back."""

        class Person(DocClass1):
            name = StringField()
            owns = ListField(ReferenceField('Organization'))

        class Organization(DocClass2):
            name = StringField()
            owner = ReferenceField('Person')

        Person.drop_collection()
        Organization.drop_collection()

        person = Person(name="owner").save()
        organization = Organization(name="company").save()

        # Close the cycle: person -> organization -> person.
        person.owns.append(organization)
        organization.owner = person

        person.save()
        organization.save()

        p = Person.objects[0].select_related()
        o = Organization.objects.first()
        self.assertEqual(p.owns[0], o)
        self.assertEqual(o.owner, p)

    def test_circular_reference_deltas_2(self):
        """Run ``circular_reference_deltas_2`` for every base-class pairing."""
        self.circular_reference_deltas_2(Document, Document)
        self.circular_reference_deltas_2(Document, DynamicDocument)
        self.circular_reference_deltas_2(DynamicDocument, Document)
        self.circular_reference_deltas_2(DynamicDocument, DynamicDocument)

    def circular_reference_deltas_2(self, DocClass1, DocClass2, dbref=True):
        """Save an owner, an employee and an organization that cross-reference.

        :param DocClass1: base class for the ``Person`` document.
        :param DocClass2: base class for the ``Organization`` document.
        :param dbref: ``dbref`` option passed to every ``ReferenceField``.
        :return: ``(person, organization, employee)`` - the in-memory
            instances that were saved (not the re-fetched copies).
        """

        class Person(DocClass1):
            name = StringField()
            owns = ListField(ReferenceField('Organization', dbref=dbref))
            employer = ReferenceField('Organization', dbref=dbref)

        class Organization(DocClass2):
            name = StringField()
            owner = ReferenceField('Person', dbref=dbref)
            employees = ListField(ReferenceField('Person', dbref=dbref))

        Person.drop_collection()
        Organization.drop_collection()

        person = Person(name="owner").save()
        employee = Person(name="employee").save()
        organization = Organization(name="company").save()

        person.owns.append(organization)
        organization.owner = person

        organization.employees.append(employee)
        employee.employer = organization

        person.save()
        organization.save()
        employee.save()

        p = Person.objects.get(name="owner")
        e = Person.objects.get(name="employee")
        o = Organization.objects.first()

        self.assertEqual(p.owns[0], o)
        self.assertEqual(o.owner, p)
        self.assertEqual(e.employer, o)

        return person, organization, employee

    # ------------------------------------------------------------------
    # Fields with an explicit db_field
    # ------------------------------------------------------------------

    def test_delta_db_field(self):
        """Run ``delta_db_field`` for static and dynamic documents."""
        self.delta_db_field(Document)
        self.delta_db_field(DynamicDocument)

    def delta_db_field(self, DocClass):
        """Check that deltas are keyed by ``db_field`` and that data saves."""

        class Doc(DocClass):
            string_field = StringField(db_field='db_string_field')
            int_field = IntField(db_field='db_int_field')
            dict_field = DictField(db_field='db_dict_field')
            list_field = ListField(db_field='db_list_field')

        Doc.drop_collection()
        Doc().save()

        self._check_simple_field_deltas(Doc.objects.first(), prefix='db_')

        # Test it saves that data
        doc = Doc()
        doc.save()

        doc.string_field = 'hello'
        doc.int_field = 1
        doc.dict_field = {'hello': 'world'}
        doc.list_field = ['1', 2, {'hello': 'world'}]
        doc = self._save_and_reload(doc)

        self.assertEqual(doc.string_field, 'hello')
        self.assertEqual(doc.int_field, 1)
        self.assertEqual(doc.dict_field, {'hello': 'world'})
        self.assertEqual(doc.list_field, ['1', 2, {'hello': 'world'}])

    def test_delta_recursive_db_field(self):
        """Run ``delta_recursive_db_field`` for every class pairing."""
        self.delta_recursive_db_field(Document, EmbeddedDocument)
        self.delta_recursive_db_field(Document, DynamicEmbeddedDocument)
        self.delta_recursive_db_field(DynamicDocument, EmbeddedDocument)
        self.delta_recursive_db_field(DynamicDocument, DynamicEmbeddedDocument)

    def delta_recursive_db_field(self, DocClass, EmbeddedClass):
        """Check nested deltas when every field has an explicit ``db_field``."""

        class Embedded(EmbeddedClass):
            string_field = StringField(db_field='db_string_field')
            int_field = IntField(db_field='db_int_field')
            dict_field = DictField(db_field='db_dict_field')
            list_field = ListField(db_field='db_list_field')

        class Doc(DocClass):
            string_field = StringField(db_field='db_string_field')
            int_field = IntField(db_field='db_int_field')
            dict_field = DictField(db_field='db_dict_field')
            list_field = ListField(db_field='db_list_field')
            embedded_field = EmbeddedDocumentField(
                Embedded, db_field='db_embedded_field')

        Doc.drop_collection()
        Doc().save()
        doc = Doc.objects.first()

        embedded_1 = Embedded()
        embedded_1.string_field = 'hello'
        embedded_1.int_field = 1
        embedded_1.dict_field = {'hello': 'world'}
        embedded_1.list_field = ['1', 2, {'hello': 'world'}]

        embedded_delta = {
            'db_string_field': 'hello',
            'db_int_field': 1,
            'db_dict_field': {'hello': 'world'},
            'db_list_field': ['1', 2, {'hello': 'world'}],
        }

        self._check_recursive_deltas(doc, Embedded, embedded_1,
                                     embedded_delta, prefix='db_')

    # ------------------------------------------------------------------
    # Dynamic documents
    # ------------------------------------------------------------------

    def test_delta_for_dynamic_documents(self):
        """Check deltas of undeclared attributes on a ``DynamicDocument``."""

        class Person(DynamicDocument):
            name = StringField()
            meta = {'allow_inheritance': True}

        Person.drop_collection()

        unsaved = Person(name="James", age=34)
        self.assertEqual(unsaved._delta(), (
            SON([('_cls', 'Person'), ('name', 'James'), ('age', 34)]), {}))

        # Adding then deleting a dynamic attribute leaves the delta as it was.
        unsaved.doc = 123
        del unsaved.doc
        self.assertEqual(unsaved._delta(), (
            SON([('_cls', 'Person'), ('name', 'James'), ('age', 34)]), {}))

        saved = Person()
        saved.name = "Dean"
        saved.age = 22
        saved.save()

        saved.age = 24
        self.assertEqual(saved.age, 24)
        self.assertEqual(saved._get_changed_fields(), ['age'])
        self.assertEqual(saved._delta(), ({'age': 24}, {}))

        # Same change, this time on an instance fetched from the database.
        fetched = Person.objects(age=22).get()
        fetched.age = 24
        self.assertEqual(fetched.age, 24)
        self.assertEqual(fetched._get_changed_fields(), ['age'])
        self.assertEqual(fetched._delta(), ({'age': 24}, {}))

        fetched.save()
        self.assertEqual(1, Person.objects(age=24).count())

    def test_dynamic_delta(self):
        """Check top-level deltas when no field is declared at all."""

        class Doc(DynamicDocument):
            pass

        Doc.drop_collection()
        Doc().save()

        self._check_simple_field_deltas(Doc.objects.first())

    # ------------------------------------------------------------------
    # Reference fields
    # ------------------------------------------------------------------

    def test_delta_with_dbref_true(self):
        """Run the reference-delta scenario with ``dbref=True``."""
        self._check_delta_with_dbref(True)

    def test_delta_with_dbref_false(self):
        """Run the reference-delta scenario with ``dbref=False``."""
        self._check_delta_with_dbref(False)

    # ------------------------------------------------------------------
    # Propagation of changes through MapFields
    # ------------------------------------------------------------------

    def test_nested_nested_fields_mark_as_changed(self):
        """A change two map levels deep is reported with its full path."""

        class EmbeddedDoc(EmbeddedDocument):
            name = StringField()

        class MyDoc(Document):
            subs = MapField(MapField(EmbeddedDocumentField(EmbeddedDoc)))
            name = StringField()

        MyDoc.drop_collection()

        MyDoc(name='testcase1',
              subs={'a': {'b': EmbeddedDoc(name='foo')}}).save()

        mydoc = MyDoc.objects.first()
        subdoc = mydoc.subs['a']['b']
        subdoc.name = 'bar'

        self.assertEqual(["name"], subdoc._get_changed_fields())
        self.assertEqual(["subs.a.b.name"], mydoc._get_changed_fields())

        mydoc._clear_changed_fields()
        self.assertEqual([], mydoc._get_changed_fields())

    def test_lower_level_mark_as_changed(self):
        """Editing a just-assigned map entry keeps only the entry's path."""

        class EmbeddedDoc(EmbeddedDocument):
            name = StringField()

        class MyDoc(Document):
            subs = MapField(EmbeddedDocumentField(EmbeddedDoc))

        MyDoc.drop_collection()

        MyDoc().save()

        mydoc = MyDoc.objects.first()
        mydoc.subs['a'] = EmbeddedDoc()
        self.assertEqual(["subs.a"], mydoc._get_changed_fields())

        subdoc = mydoc.subs['a']
        subdoc.name = 'bar'

        self.assertEqual(["name"], subdoc._get_changed_fields())
        # The parent path "subs.a" is still the only entry reported.
        self.assertEqual(["subs.a"], mydoc._get_changed_fields())
        mydoc.save()

        mydoc._clear_changed_fields()
        self.assertEqual([], mydoc._get_changed_fields())

    def test_upper_level_mark_as_changed(self):
        """Replacing a map entry supersedes an earlier change inside it."""

        class EmbeddedDoc(EmbeddedDocument):
            name = StringField()

        class MyDoc(Document):
            subs = MapField(EmbeddedDocumentField(EmbeddedDoc))

        MyDoc.drop_collection()

        MyDoc(subs={'a': EmbeddedDoc(name='foo')}).save()

        mydoc = MyDoc.objects.first()
        subdoc = mydoc.subs['a']
        subdoc.name = 'bar'

        self.assertEqual(["name"], subdoc._get_changed_fields())
        self.assertEqual(["subs.a.name"], mydoc._get_changed_fields())

        # After replacing the entry only "subs.a" is reported.
        mydoc.subs['a'] = EmbeddedDoc()
        self.assertEqual(["subs.a"], mydoc._get_changed_fields())
        mydoc.save()

        mydoc._clear_changed_fields()
        self.assertEqual([], mydoc._get_changed_fields())

    def test_referenced_object_changed_attributes(self):
        """Ensures that when you save a new reference to a field, the referenced object isn't altered"""

        class Organization(Document):
            name = StringField()

        class User(Document):
            name = StringField()
            org = ReferenceField('Organization', required=True)

        Organization.drop_collection()
        User.drop_collection()

        org1 = Organization(name='Org 1')
        org1.save()

        org2 = Organization(name='Org 2')
        org2.save()

        user = User(name='Fred', org=org1)
        user.save()

        org1.reload()
        org2.reload()
        user.reload()
        self.assertEqual(org1.name, 'Org 1')
        self.assertEqual(org2.name, 'Org 2')
        self.assertEqual(user.name, 'Fred')

        user.name = 'Harold'
        user.org = org2

        # Unsaved edit on the organization the user now points at.
        org2.name = 'New Org 2'
        self.assertEqual(org2.name, 'New Org 2')

        # Saving the user first must not discard the pending org2 edit.
        user.save()
        org2.save()

        self.assertEqual(org2.name, 'New Org 2')
        org2.reload()
        self.assertEqual(org2.name, 'New Org 2')

    def test_delta_for_nested_map_fields(self):
        """Check delta keys for changes inside an embedded doc held in a map."""

        class UInfoDocument(Document):
            phone = StringField()

        class EmbeddedRole(EmbeddedDocument):
            type = StringField()

        class EmbeddedUser(EmbeddedDocument):
            name = StringField()
            roles = MapField(field=EmbeddedDocumentField(EmbeddedRole))
            rolist = ListField(field=EmbeddedDocumentField(EmbeddedRole))
            info = ReferenceField(UInfoDocument)

        class Doc(Document):
            users = MapField(field=EmbeddedDocumentField(EmbeddedUser))
            num = IntField(default=-1)

        Doc.drop_collection()

        doc = Doc(num=1)
        doc.users["007"] = EmbeddedUser(name="Agent007")
        doc.save()

        uinfo = UInfoDocument(phone="79089269066")
        uinfo.save()

        d = Doc.objects(num=1).first()
        d.users["007"]["roles"]["666"] = EmbeddedRole(type="superadmin")
        d.users["007"]["rolist"].append(EmbeddedRole(type="oops"))
        d.users["007"]["info"] = uinfo

        updates = d._delta()[0]
        # assertIn reports the container's contents when the key is missing.
        self.assertIn("users.007.roles.666", updates)
        self.assertIn("users.007.rolist", updates)
        self.assertIn("users.007.info", updates)
        self.assertEqual('superadmin', updates["users.007.roles.666"]["type"])
        self.assertEqual('oops', updates["users.007.rolist"][0]["type"])
        self.assertEqual(uinfo.id, updates["users.007.info"])
|
|
|
|
|
|
# Run the test suite when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()
|