325 lines
12 KiB
Python
325 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
from mongoengine import *
|
|
from mongoengine.base import BaseDict
|
|
|
|
from tests.utils import MongoDBTestCase, get_as_pymongo
|
|
|
|
|
|
class TestDictField(MongoDBTestCase):
|
|
|
|
def test_storage(self):
|
|
class BlogPost(Document):
|
|
info = DictField()
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
info = {'testkey': 'testvalue'}
|
|
post = BlogPost(info=info).save()
|
|
self.assertEqual(
|
|
get_as_pymongo(post),
|
|
{
|
|
'_id': post.id,
|
|
'info': info
|
|
}
|
|
)
|
|
|
|
def test_general_things(self):
|
|
"""Ensure that dict types work as expected."""
|
|
class BlogPost(Document):
|
|
info = DictField()
|
|
|
|
BlogPost.drop_collection()
|
|
|
|
post = BlogPost()
|
|
post.info = 'my post'
|
|
self.assertRaises(ValidationError, post.validate)
|
|
|
|
post.info = ['test', 'test']
|
|
self.assertRaises(ValidationError, post.validate)
|
|
|
|
post.info = {'$title': 'test'}
|
|
self.assertRaises(ValidationError, post.validate)
|
|
|
|
post.info = {'nested': {'$title': 'test'}}
|
|
self.assertRaises(ValidationError, post.validate)
|
|
|
|
post.info = {'the.title': 'test'}
|
|
self.assertRaises(ValidationError, post.validate)
|
|
|
|
post.info = {'nested': {'the.title': 'test'}}
|
|
self.assertRaises(ValidationError, post.validate)
|
|
|
|
post.info = {1: 'test'}
|
|
self.assertRaises(ValidationError, post.validate)
|
|
|
|
post.info = {'title': 'test'}
|
|
post.save()
|
|
|
|
post = BlogPost()
|
|
post.info = {'title': 'dollar_sign', 'details': {'te$t': 'test'}}
|
|
post.save()
|
|
|
|
post = BlogPost()
|
|
post.info = {'details': {'test': 'test'}}
|
|
post.save()
|
|
|
|
post = BlogPost()
|
|
post.info = {'details': {'test': 3}}
|
|
post.save()
|
|
|
|
self.assertEqual(BlogPost.objects.count(), 4)
|
|
self.assertEqual(
|
|
BlogPost.objects.filter(info__title__exact='test').count(), 1)
|
|
self.assertEqual(
|
|
BlogPost.objects.filter(info__details__test__exact='test').count(), 1)
|
|
|
|
post = BlogPost.objects.filter(info__title__exact='dollar_sign').first()
|
|
self.assertIn('te$t', post['info']['details'])
|
|
|
|
# Confirm handles non strings or non existing keys
|
|
self.assertEqual(
|
|
BlogPost.objects.filter(info__details__test__exact=5).count(), 0)
|
|
self.assertEqual(
|
|
BlogPost.objects.filter(info__made_up__test__exact='test').count(), 0)
|
|
|
|
post = BlogPost.objects.create(info={'title': 'original'})
|
|
post.info.update({'title': 'updated'})
|
|
post.save()
|
|
post.reload()
|
|
self.assertEqual('updated', post.info['title'])
|
|
|
|
post.info.setdefault('authors', [])
|
|
post.save()
|
|
post.reload()
|
|
self.assertEqual([], post.info['authors'])
|
|
|
|
def test_dictfield_dump_document(self):
|
|
"""Ensure a DictField can handle another document's dump."""
|
|
class Doc(Document):
|
|
field = DictField()
|
|
|
|
class ToEmbed(Document):
|
|
id = IntField(primary_key=True, default=1)
|
|
recursive = DictField()
|
|
|
|
class ToEmbedParent(Document):
|
|
id = IntField(primary_key=True, default=1)
|
|
recursive = DictField()
|
|
|
|
meta = {'allow_inheritance': True}
|
|
|
|
class ToEmbedChild(ToEmbedParent):
|
|
pass
|
|
|
|
to_embed_recursive = ToEmbed(id=1).save()
|
|
to_embed = ToEmbed(
|
|
id=2, recursive=to_embed_recursive.to_mongo().to_dict()).save()
|
|
doc = Doc(field=to_embed.to_mongo().to_dict())
|
|
doc.save()
|
|
self.assertIsInstance(doc.field, dict)
|
|
self.assertEqual(doc.field, {'_id': 2, 'recursive': {'_id': 1, 'recursive': {}}})
|
|
# Same thing with a Document with a _cls field
|
|
to_embed_recursive = ToEmbedChild(id=1).save()
|
|
to_embed_child = ToEmbedChild(
|
|
id=2, recursive=to_embed_recursive.to_mongo().to_dict()).save()
|
|
doc = Doc(field=to_embed_child.to_mongo().to_dict())
|
|
doc.save()
|
|
self.assertIsInstance(doc.field, dict)
|
|
expected = {
|
|
'_id': 2, '_cls': 'ToEmbedParent.ToEmbedChild',
|
|
'recursive': {'_id': 1, '_cls': 'ToEmbedParent.ToEmbedChild', 'recursive': {}}
|
|
}
|
|
self.assertEqual(doc.field, expected)
|
|
|
|
def test_dictfield_strict(self):
|
|
"""Ensure that dict field handles validation if provided a strict field type."""
|
|
class Simple(Document):
|
|
mapping = DictField(field=IntField())
|
|
|
|
Simple.drop_collection()
|
|
|
|
e = Simple()
|
|
e.mapping['someint'] = 1
|
|
e.save()
|
|
|
|
# try creating an invalid mapping
|
|
with self.assertRaises(ValidationError):
|
|
e.mapping['somestring'] = "abc"
|
|
e.save()
|
|
|
|
def test_dictfield_complex(self):
|
|
"""Ensure that the dict field can handle the complex types."""
|
|
class SettingBase(EmbeddedDocument):
|
|
meta = {'allow_inheritance': True}
|
|
|
|
class StringSetting(SettingBase):
|
|
value = StringField()
|
|
|
|
class IntegerSetting(SettingBase):
|
|
value = IntField()
|
|
|
|
class Simple(Document):
|
|
mapping = DictField()
|
|
|
|
Simple.drop_collection()
|
|
|
|
e = Simple()
|
|
e.mapping['somestring'] = StringSetting(value='foo')
|
|
e.mapping['someint'] = IntegerSetting(value=42)
|
|
e.mapping['nested_dict'] = {'number': 1, 'string': 'Hi!',
|
|
'float': 1.001,
|
|
'complex': IntegerSetting(value=42),
|
|
'list': [IntegerSetting(value=42),
|
|
StringSetting(value='foo')]}
|
|
e.save()
|
|
|
|
e2 = Simple.objects.get(id=e.id)
|
|
self.assertIsInstance(e2.mapping['somestring'], StringSetting)
|
|
self.assertIsInstance(e2.mapping['someint'], IntegerSetting)
|
|
|
|
# Test querying
|
|
self.assertEqual(
|
|
Simple.objects.filter(mapping__someint__value=42).count(), 1)
|
|
self.assertEqual(
|
|
Simple.objects.filter(mapping__nested_dict__number=1).count(), 1)
|
|
self.assertEqual(
|
|
Simple.objects.filter(mapping__nested_dict__complex__value=42).count(), 1)
|
|
self.assertEqual(
|
|
Simple.objects.filter(mapping__nested_dict__list__0__value=42).count(), 1)
|
|
self.assertEqual(
|
|
Simple.objects.filter(mapping__nested_dict__list__1__value='foo').count(), 1)
|
|
|
|
# Confirm can update
|
|
Simple.objects().update(
|
|
set__mapping={"someint": IntegerSetting(value=10)})
|
|
Simple.objects().update(
|
|
set__mapping__nested_dict__list__1=StringSetting(value='Boo'))
|
|
self.assertEqual(
|
|
Simple.objects.filter(mapping__nested_dict__list__1__value='foo').count(), 0)
|
|
self.assertEqual(
|
|
Simple.objects.filter(mapping__nested_dict__list__1__value='Boo').count(), 1)
|
|
|
|
def test_push_dict(self):
|
|
class MyModel(Document):
|
|
events = ListField(DictField())
|
|
|
|
doc = MyModel(events=[{'a': 1}]).save()
|
|
raw_doc = get_as_pymongo(doc)
|
|
expected_raw_doc = {
|
|
'_id': doc.id,
|
|
'events': [{'a': 1}]
|
|
}
|
|
self.assertEqual(raw_doc, expected_raw_doc)
|
|
|
|
MyModel.objects(id=doc.id).update(push__events={})
|
|
raw_doc = get_as_pymongo(doc)
|
|
expected_raw_doc = {
|
|
'_id': doc.id,
|
|
'events': [{'a': 1}, {}]
|
|
}
|
|
self.assertEqual(raw_doc, expected_raw_doc)
|
|
|
|
def test_ensure_unique_default_instances(self):
|
|
"""Ensure that every field has it's own unique default instance."""
|
|
class D(Document):
|
|
data = DictField()
|
|
data2 = DictField(default=lambda: {})
|
|
|
|
d1 = D()
|
|
d1.data['foo'] = 'bar'
|
|
d1.data2['foo'] = 'bar'
|
|
d2 = D()
|
|
self.assertEqual(d2.data, {})
|
|
self.assertEqual(d2.data2, {})
|
|
|
|
def test_dict_field_invalid_dict_value(self):
|
|
class DictFieldTest(Document):
|
|
dictionary = DictField(required=True)
|
|
|
|
DictFieldTest.drop_collection()
|
|
|
|
test = DictFieldTest(dictionary=None)
|
|
test.dictionary # Just access to test getter
|
|
self.assertRaises(ValidationError, test.validate)
|
|
|
|
test = DictFieldTest(dictionary=False)
|
|
test.dictionary # Just access to test getter
|
|
self.assertRaises(ValidationError, test.validate)
|
|
|
|
def test_dict_field_raises_validation_error_if_wrongly_assign_embedded_doc(self):
|
|
class DictFieldTest(Document):
|
|
dictionary = DictField(required=True)
|
|
|
|
DictFieldTest.drop_collection()
|
|
|
|
class Embedded(EmbeddedDocument):
|
|
name = StringField()
|
|
|
|
embed = Embedded(name='garbage')
|
|
doc = DictFieldTest(dictionary=embed)
|
|
with self.assertRaises(ValidationError) as ctx_err:
|
|
doc.validate()
|
|
self.assertIn("'dictionary'", str(ctx_err.exception))
|
|
self.assertIn('Only dictionaries may be used in a DictField', str(ctx_err.exception))
|
|
|
|
def test_atomic_update_dict_field(self):
|
|
"""Ensure that the entire DictField can be atomically updated."""
|
|
class Simple(Document):
|
|
mapping = DictField(field=ListField(IntField(required=True)))
|
|
|
|
Simple.drop_collection()
|
|
|
|
e = Simple()
|
|
e.mapping['someints'] = [1, 2]
|
|
e.save()
|
|
e.update(set__mapping={"ints": [3, 4]})
|
|
e.reload()
|
|
self.assertEqual(BaseDict, type(e.mapping))
|
|
self.assertEqual({"ints": [3, 4]}, e.mapping)
|
|
|
|
# try creating an invalid mapping
|
|
with self.assertRaises(ValueError):
|
|
e.update(set__mapping={"somestrings": ["foo", "bar", ]})
|
|
|
|
def test_dictfield_with_referencefield_complex_nesting_cases(self):
|
|
"""Ensure complex nesting inside DictField handles dereferencing of ReferenceField(dbref=True | False)"""
|
|
# Relates to Issue #1453
|
|
class Doc(Document):
|
|
s = StringField()
|
|
|
|
class Simple(Document):
|
|
mapping0 = DictField(ReferenceField(Doc, dbref=True))
|
|
mapping1 = DictField(ReferenceField(Doc, dbref=False))
|
|
mapping2 = DictField(ListField(ReferenceField(Doc, dbref=True)))
|
|
mapping3 = DictField(ListField(ReferenceField(Doc, dbref=False)))
|
|
mapping4 = DictField(DictField(field=ReferenceField(Doc, dbref=True)))
|
|
mapping5 = DictField(DictField(field=ReferenceField(Doc, dbref=False)))
|
|
mapping6 = DictField(ListField(DictField(ReferenceField(Doc, dbref=True))))
|
|
mapping7 = DictField(ListField(DictField(ReferenceField(Doc, dbref=False))))
|
|
mapping8 = DictField(ListField(DictField(ListField(ReferenceField(Doc, dbref=True)))))
|
|
mapping9 = DictField(ListField(DictField(ListField(ReferenceField(Doc, dbref=False)))))
|
|
|
|
Doc.drop_collection()
|
|
Simple.drop_collection()
|
|
|
|
d = Doc(s='aa').save()
|
|
e = Simple()
|
|
e.mapping0['someint'] = e.mapping1['someint'] = d
|
|
e.mapping2['someint'] = e.mapping3['someint'] = [d]
|
|
e.mapping4['someint'] = e.mapping5['someint'] = {'d': d}
|
|
e.mapping6['someint'] = e.mapping7['someint'] = [{'d': d}]
|
|
e.mapping8['someint'] = e.mapping9['someint'] = [{'d': [d]}]
|
|
e.save()
|
|
|
|
s = Simple.objects.first()
|
|
self.assertIsInstance(s.mapping0['someint'], Doc)
|
|
self.assertIsInstance(s.mapping1['someint'], Doc)
|
|
self.assertIsInstance(s.mapping2['someint'][0], Doc)
|
|
self.assertIsInstance(s.mapping3['someint'][0], Doc)
|
|
self.assertIsInstance(s.mapping4['someint']['d'], Doc)
|
|
self.assertIsInstance(s.mapping5['someint']['d'], Doc)
|
|
self.assertIsInstance(s.mapping6['someint'][0]['d'], Doc)
|
|
self.assertIsInstance(s.mapping7['someint'][0]['d'], Doc)
|
|
self.assertIsInstance(s.mapping8['someint'][0]['d'][0], Doc)
|
|
self.assertIsInstance(s.mapping9['someint'][0]['d'][0], Doc)
|