From b06d7948543870cc4ca0bb41a4450e18d76053ec Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 5 Dec 2010 23:43:19 -0800 Subject: [PATCH] Implementation of DENY rules. --- mongoengine/document.py | 14 +++++++++++++- tests/document.py | 31 ++++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/mongoengine/document.py b/mongoengine/document.py index 39442f6f..38831b22 100644 --- a/mongoengine/document.py +++ b/mongoengine/document.py @@ -99,6 +99,17 @@ class Document(BaseDocument): :param safe: check if the operation succeeded before returning """ + # Check for DENY rules before actually deleting/nullifying any other + # references + for rule_entry in self._meta['delete_rules']: + document_cls, field_name = rule_entry + rule = self._meta['delete_rules'][rule_entry] + if rule == DENY and document_cls.objects(**{field_name: self.id}).count() > 0: + msg = u'Could not delete document (at least %s.%s refers to it)' % \ + (document_cls.__name__, field_name) + logging.error(msg) + raise OperationError(msg) + for rule_entry in self._meta['delete_rules']: document_cls, field_name = rule_entry rule = self._meta['delete_rules'][rule_entry] @@ -106,7 +117,8 @@ class Document(BaseDocument): if rule == CASCADE: document_cls.objects(**{field_name: self.id}).delete(safe=safe) elif rule == NULLIFY: - document_cls.objects(**{field_name: self.id}).update(**{'unset__%s' % field_name: 1}) + document_cls.objects(**{field_name: + self.id}).update(**{'unset__%s' % field_name: 1}) id_field = self._meta['id_field'] object_id = self._fields[id_field].to_mongo(self[id_field]) diff --git a/tests/document.py b/tests/document.py index e5ff3b12..99657993 100644 --- a/tests/document.py +++ b/tests/document.py @@ -667,7 +667,36 @@ class DocumentTest(unittest.TestCase): """Ensure that a document cannot be referenced if there are still documents referring to it. """ - self.fail() + + class BlogPost(Document): + content = StringField() + author = ReferenceField(self.Person, delete_rule=DENY) + + self.Person.drop_collection() + BlogPost.drop_collection() + + author = self.Person(name='Test User') + author.save() + + post = BlogPost(content = 'Watched some TV') + post.author = author + post.save() + + # Delete the Person should be denied + self.assertRaises(OperationError, author.delete) # Should raise denied error + self.assertEqual(len(BlogPost.objects), 1) # No objects may have been deleted + self.assertEqual(len(self.Person.objects), 1) + + # Other users, that don't have BlogPosts must be removable, like normal + author = self.Person(name='Another User') + author.save() + + self.assertEqual(len(self.Person.objects), 2) + author.delete() + self.assertEqual(len(self.Person.objects), 1) + + self.Person.drop_collection() + BlogPost.drop_collection() def tearDown(self):