# -*- coding: utf-8 -*- from mongoengine import * from mongoengine.base import BaseDict from tests.utils import MongoDBTestCase, get_as_pymongo class TestDictField(MongoDBTestCase): def test_storage(self): class BlogPost(Document): info = DictField() BlogPost.drop_collection() info = {'testkey': 'testvalue'} post = BlogPost(info=info).save() self.assertEqual( get_as_pymongo(post), { '_id': post.id, 'info': info } ) def test_general_things(self): """Ensure that dict types work as expected.""" class BlogPost(Document): info = DictField() BlogPost.drop_collection() post = BlogPost() post.info = 'my post' self.assertRaises(ValidationError, post.validate) post.info = ['test', 'test'] self.assertRaises(ValidationError, post.validate) post.info = {'$title': 'test'} self.assertRaises(ValidationError, post.validate) post.info = {'nested': {'$title': 'test'}} self.assertRaises(ValidationError, post.validate) post.info = {'the.title': 'test'} self.assertRaises(ValidationError, post.validate) post.info = {'nested': {'the.title': 'test'}} self.assertRaises(ValidationError, post.validate) post.info = {1: 'test'} self.assertRaises(ValidationError, post.validate) post.info = {'title': 'test'} post.save() post = BlogPost() post.info = {'title': 'dollar_sign', 'details': {'te$t': 'test'}} post.save() post = BlogPost() post.info = {'details': {'test': 'test'}} post.save() post = BlogPost() post.info = {'details': {'test': 3}} post.save() self.assertEqual(BlogPost.objects.count(), 4) self.assertEqual( BlogPost.objects.filter(info__title__exact='test').count(), 1) self.assertEqual( BlogPost.objects.filter(info__details__test__exact='test').count(), 1) post = BlogPost.objects.filter(info__title__exact='dollar_sign').first() self.assertIn('te$t', post['info']['details']) # Confirm handles non strings or non existing keys self.assertEqual( BlogPost.objects.filter(info__details__test__exact=5).count(), 0) self.assertEqual( BlogPost.objects.filter(info__made_up__test__exact='test').count(), 0) post = BlogPost.objects.create(info={'title': 'original'}) post.info.update({'title': 'updated'}) post.save() post.reload() self.assertEqual('updated', post.info['title']) post.info.setdefault('authors', []) post.save() post.reload() self.assertEqual([], post.info['authors']) def test_dictfield_dump_document(self): """Ensure a DictField can handle another document's dump.""" class Doc(Document): field = DictField() class ToEmbed(Document): id = IntField(primary_key=True, default=1) recursive = DictField() class ToEmbedParent(Document): id = IntField(primary_key=True, default=1) recursive = DictField() meta = {'allow_inheritance': True} class ToEmbedChild(ToEmbedParent): pass to_embed_recursive = ToEmbed(id=1).save() to_embed = ToEmbed( id=2, recursive=to_embed_recursive.to_mongo().to_dict()).save() doc = Doc(field=to_embed.to_mongo().to_dict()) doc.save() assert isinstance(doc.field, dict) assert doc.field == {'_id': 2, 'recursive': {'_id': 1, 'recursive': {}}} # Same thing with a Document with a _cls field to_embed_recursive = ToEmbedChild(id=1).save() to_embed_child = ToEmbedChild( id=2, recursive=to_embed_recursive.to_mongo().to_dict()).save() doc = Doc(field=to_embed_child.to_mongo().to_dict()) doc.save() assert isinstance(doc.field, dict) assert doc.field == { '_id': 2, '_cls': 'ToEmbedParent.ToEmbedChild', 'recursive': {'_id': 1, '_cls': 'ToEmbedParent.ToEmbedChild', 'recursive': {}} } def test_dictfield_strict(self): """Ensure that dict field handles validation if provided a strict field type.""" class Simple(Document): mapping = DictField(field=IntField()) Simple.drop_collection() e = Simple() e.mapping['someint'] = 1 e.save() # try creating an invalid mapping with self.assertRaises(ValidationError): e.mapping['somestring'] = "abc" e.save() def test_dictfield_complex(self): """Ensure that the dict field can handle the complex types.""" class SettingBase(EmbeddedDocument): meta = {'allow_inheritance': True} class StringSetting(SettingBase): value = StringField() class IntegerSetting(SettingBase): value = IntField() class Simple(Document): mapping = DictField() Simple.drop_collection() e = Simple() e.mapping['somestring'] = StringSetting(value='foo') e.mapping['someint'] = IntegerSetting(value=42) e.mapping['nested_dict'] = {'number': 1, 'string': 'Hi!', 'float': 1.001, 'complex': IntegerSetting(value=42), 'list': [IntegerSetting(value=42), StringSetting(value='foo')]} e.save() e2 = Simple.objects.get(id=e.id) self.assertIsInstance(e2.mapping['somestring'], StringSetting) self.assertIsInstance(e2.mapping['someint'], IntegerSetting) # Test querying self.assertEqual( Simple.objects.filter(mapping__someint__value=42).count(), 1) self.assertEqual( Simple.objects.filter(mapping__nested_dict__number=1).count(), 1) self.assertEqual( Simple.objects.filter(mapping__nested_dict__complex__value=42).count(), 1) self.assertEqual( Simple.objects.filter(mapping__nested_dict__list__0__value=42).count(), 1) self.assertEqual( Simple.objects.filter(mapping__nested_dict__list__1__value='foo').count(), 1) # Confirm can update Simple.objects().update( set__mapping={"someint": IntegerSetting(value=10)}) Simple.objects().update( set__mapping__nested_dict__list__1=StringSetting(value='Boo')) self.assertEqual( Simple.objects.filter(mapping__nested_dict__list__1__value='foo').count(), 0) self.assertEqual( Simple.objects.filter(mapping__nested_dict__list__1__value='Boo').count(), 1) def test_ensure_unique_default_instances(self): """Ensure that every field has it's own unique default instance.""" class D(Document): data = DictField() data2 = DictField(default=lambda: {}) d1 = D() d1.data['foo'] = 'bar' d1.data2['foo'] = 'bar' d2 = D() self.assertEqual(d2.data, {}) self.assertEqual(d2.data2, {}) def test_dict_field_invalid_dict_value(self): class DictFieldTest(Document): dictionary = DictField(required=True) DictFieldTest.drop_collection() test = DictFieldTest(dictionary=None) test.dictionary # Just access to test getter self.assertRaises(ValidationError, test.validate) test = DictFieldTest(dictionary=False) test.dictionary # Just access to test getter self.assertRaises(ValidationError, test.validate) def test_dict_field_raises_validation_error_if_wrongly_assign_embedded_doc(self): class DictFieldTest(Document): dictionary = DictField(required=True) DictFieldTest.drop_collection() class Embedded(EmbeddedDocument): name = StringField() embed = Embedded(name='garbage') doc = DictFieldTest(dictionary=embed) with self.assertRaises(ValidationError) as ctx_err: doc.validate() self.assertIn("'dictionary'", str(ctx_err.exception)) self.assertIn('Only dictionaries may be used in a DictField', str(ctx_err.exception)) def test_atomic_update_dict_field(self): """Ensure that the entire DictField can be atomically updated.""" class Simple(Document): mapping = DictField(field=ListField(IntField(required=True))) Simple.drop_collection() e = Simple() e.mapping['someints'] = [1, 2] e.save() e.update(set__mapping={"ints": [3, 4]}) e.reload() self.assertEqual(BaseDict, type(e.mapping)) self.assertEqual({"ints": [3, 4]}, e.mapping) # try creating an invalid mapping with self.assertRaises(ValueError): e.update(set__mapping={"somestrings": ["foo", "bar", ]}) def test_dictfield_with_referencefield_complex_nesting_cases(self): """Ensure complex nesting inside DictField handles dereferencing of ReferenceField(dbref=True | False)""" # Relates to Issue #1453 class Doc(Document): s = StringField() class Simple(Document): mapping0 = DictField(ReferenceField(Doc, dbref=True)) mapping1 = DictField(ReferenceField(Doc, dbref=False)) mapping2 = DictField(ListField(ReferenceField(Doc, dbref=True))) mapping3 = DictField(ListField(ReferenceField(Doc, dbref=False))) mapping4 = DictField(DictField(field=ReferenceField(Doc, dbref=True))) mapping5 = DictField(DictField(field=ReferenceField(Doc, dbref=False))) mapping6 = DictField(ListField(DictField(ReferenceField(Doc, dbref=True)))) mapping7 = DictField(ListField(DictField(ReferenceField(Doc, dbref=False)))) mapping8 = DictField(ListField(DictField(ListField(ReferenceField(Doc, dbref=True))))) mapping9 = DictField(ListField(DictField(ListField(ReferenceField(Doc, dbref=False))))) Doc.drop_collection() Simple.drop_collection() d = Doc(s='aa').save() e = Simple() e.mapping0['someint'] = e.mapping1['someint'] = d e.mapping2['someint'] = e.mapping3['someint'] = [d] e.mapping4['someint'] = e.mapping5['someint'] = {'d': d} e.mapping6['someint'] = e.mapping7['someint'] = [{'d': d}] e.mapping8['someint'] = e.mapping9['someint'] = [{'d': [d]}] e.save() s = Simple.objects.first() self.assertIsInstance(s.mapping0['someint'], Doc) self.assertIsInstance(s.mapping1['someint'], Doc) self.assertIsInstance(s.mapping2['someint'][0], Doc) self.assertIsInstance(s.mapping3['someint'][0], Doc) self.assertIsInstance(s.mapping4['someint']['d'], Doc) self.assertIsInstance(s.mapping5['someint']['d'], Doc) self.assertIsInstance(s.mapping6['someint'][0]['d'], Doc) self.assertIsInstance(s.mapping7['someint'][0]['d'], Doc) self.assertIsInstance(s.mapping8['someint'][0]['d'][0], Doc) self.assertIsInstance(s.mapping9['someint'][0]['d'][0], Doc)