Merge in upstream dev

tests/dereference.py (new file, 288 lines)
@@ -0,0 +1,288 @@
import unittest

from mongoengine import *
from mongoengine.connection import _get_db
from mongoengine.tests import query_counter


class FieldTest(unittest.TestCase):

    def setUp(self):
        connect(db='mongoenginetest')
        self.db = _get_db()

    def test_list_item_dereference(self):
        """Ensure that DBRef items in ListFields are dereferenced.
        """
        class User(Document):
            name = StringField()

        class Group(Document):
            members = ListField(ReferenceField(User))

        User.drop_collection()
        Group.drop_collection()

        for i in xrange(1, 51):
            user = User(name='user %s' % i)
            user.save()

        group = Group(members=User.objects)
        group.save()

        with query_counter() as q:
            self.assertEqual(q, 0)

            group_obj = Group.objects.first()
            self.assertEqual(q, 1)

            [m for m in group_obj.members]
            self.assertEqual(q, 2)

        User.drop_collection()
        Group.drop_collection()

    def test_recursive_reference(self):
        """Ensure that ReferenceFields can reference their own documents.
        """
        class Employee(Document):
            name = StringField()
            boss = ReferenceField('self')
            friends = ListField(ReferenceField('self'))

        bill = Employee(name='Bill Lumbergh')
        bill.save()

        michael = Employee(name='Michael Bolton')
        michael.save()

        samir = Employee(name='Samir Nagheenanajar')
        samir.save()

        friends = [michael, samir]
        peter = Employee(name='Peter Gibbons', boss=bill, friends=friends)
        peter.save()

        with query_counter() as q:
            self.assertEqual(q, 0)

            peter = Employee.objects.with_id(peter.id)
            self.assertEqual(q, 1)

            peter.boss
            self.assertEqual(q, 2)

            peter.friends
            self.assertEqual(q, 3)

    def test_generic_reference(self):
        """Ensure that generic references in a ListField are dereferenced
        (one extra query per referenced collection).
        """
        class UserA(Document):
            name = StringField()

        class UserB(Document):
            name = StringField()

        class UserC(Document):
            name = StringField()

        class Group(Document):
            members = ListField(GenericReferenceField())

        UserA.drop_collection()
        UserB.drop_collection()
        UserC.drop_collection()
        Group.drop_collection()

        members = []
        for i in xrange(1, 51):
            a = UserA(name='User A %s' % i)
            a.save()

            b = UserB(name='User B %s' % i)
            b.save()

            c = UserC(name='User C %s' % i)
            c.save()

            members += [a, b, c]

        group = Group(members=members)
        group.save()

        with query_counter() as q:
            self.assertEqual(q, 0)

            group_obj = Group.objects.first()
            self.assertEqual(q, 1)

            [m for m in group_obj.members]
            self.assertEqual(q, 4)

            # A second iteration must not trigger any further queries
            [m for m in group_obj.members]
            self.assertEqual(q, 4)

        UserA.drop_collection()
        UserB.drop_collection()
        UserC.drop_collection()
        Group.drop_collection()

    def test_map_field_reference(self):

        class User(Document):
            name = StringField()

        class Group(Document):
            members = MapField(ReferenceField(User))

        User.drop_collection()
        Group.drop_collection()

        members = []
        for i in xrange(1, 51):
            user = User(name='user %s' % i)
            user.save()
            members.append(user)

        group = Group(members=dict([(str(u.id), u) for u in members]))
        group.save()

        with query_counter() as q:
            self.assertEqual(q, 0)

            group_obj = Group.objects.first()
            self.assertEqual(q, 1)

            [m for m in group_obj.members]
            self.assertEqual(q, 2)

        User.drop_collection()
        Group.drop_collection()

    def ztest_generic_reference_dict_field(self):
        # NOTE: the 'z' prefix keeps unittest from collecting this test;
        # it only picks up methods whose names start with 'test'.

        class UserA(Document):
            name = StringField()

        class UserB(Document):
            name = StringField()

        class UserC(Document):
            name = StringField()

        class Group(Document):
            members = DictField()

        UserA.drop_collection()
        UserB.drop_collection()
        UserC.drop_collection()
        Group.drop_collection()

        members = []
        for i in xrange(1, 51):
            a = UserA(name='User A %s' % i)
            a.save()

            b = UserB(name='User B %s' % i)
            b.save()

            c = UserC(name='User C %s' % i)
            c.save()

            members += [a, b, c]

        group = Group(members=dict([(str(u.id), u) for u in members]))
        group.save()

        with query_counter() as q:
            self.assertEqual(q, 0)

            group_obj = Group.objects.first()
            self.assertEqual(q, 1)

            [m for m in group_obj.members]
            self.assertEqual(q, 4)

            [m for m in group_obj.members]
            self.assertEqual(q, 4)

        group.members = {}
        group.save()

        with query_counter() as q:
            self.assertEqual(q, 0)

            group_obj = Group.objects.first()
            self.assertEqual(q, 1)

            [m for m in group_obj.members]
            self.assertEqual(q, 1)

        UserA.drop_collection()
        UserB.drop_collection()
        UserC.drop_collection()
        Group.drop_collection()

    def test_generic_reference_map_field(self):

        class UserA(Document):
            name = StringField()

        class UserB(Document):
            name = StringField()

        class UserC(Document):
            name = StringField()

        class Group(Document):
            members = MapField(GenericReferenceField())

        UserA.drop_collection()
        UserB.drop_collection()
        UserC.drop_collection()
        Group.drop_collection()

        members = []
        for i in xrange(1, 51):
            a = UserA(name='User A %s' % i)
            a.save()

            b = UserB(name='User B %s' % i)
            b.save()

            c = UserC(name='User C %s' % i)
            c.save()

            members += [a, b, c]

        group = Group(members=dict([(str(u.id), u) for u in members]))
        group.save()

        with query_counter() as q:
            self.assertEqual(q, 0)

            group_obj = Group.objects.first()
            self.assertEqual(q, 1)

            [m for m in group_obj.members]
            self.assertEqual(q, 4)

            [m for m in group_obj.members]
            self.assertEqual(q, 4)

        group.members = {}
        group.save()

        with query_counter() as q:
            self.assertEqual(q, 0)

            group_obj = Group.objects.first()
            self.assertEqual(q, 1)

            [m for m in group_obj.members]
            self.assertEqual(q, 1)

        UserA.drop_collection()
        UserB.drop_collection()
        UserC.drop_collection()
        Group.drop_collection()

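The dereferencing assertions above all compare q from the query_counter helper imported from mongoengine.tests, whose implementation is not part of this hunk. As a rough sketch of the idea only (hypothetical code built on MongoDB's profiling log, not the library's actual helper):

from contextlib import contextmanager

@contextmanager
def profile_query_counter(db):
    """Hypothetical stand-in for mongoengine.tests.query_counter: count the
    operations MongoDB logs to db.system.profile while the block runs."""
    db.set_profiling_level(0)          # profiling must be off to reset the log
    db.system.profile.drop()
    db.set_profiling_level(2)          # now log every operation

    class Counter(object):
        def _count(self):
            # ignore reads of the profile collection itself
            spec = {'ns': {'$ne': '%s.system.profile' % db.name}}
            return db.system.profile.find(spec).count()

        def __eq__(self, other):
            return self._count() == other

        def __repr__(self):
            return str(self._count())

    try:
        yield Counter()
    finally:
        db.set_profiling_level(0)      # restore the default profiling level
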
@@ -377,6 +377,40 @@ class DocumentTest(unittest.TestCase):

        BlogPost.drop_collection()


    def test_dictionary_indexes(self):
        """Ensure that indexes are used when meta['indexes'] contains
        dictionaries instead of lists.
        """
        class BlogPost(Document):
            date = DateTimeField(db_field='addDate', default=datetime.now)
            category = StringField()
            tags = ListField(StringField())
            meta = {
                'indexes': [
                    {'fields': ['-date'], 'unique': True,
                     'sparse': True, 'types': False},
                ],
            }

        BlogPost.drop_collection()

        info = BlogPost.objects._collection.index_information()
        # _id, '_types' and '-date'
        self.assertEqual(len(info), 3)

        # Indexes are lazy, so use list() to force a query
        list(BlogPost.objects)
        info = BlogPost.objects._collection.index_information()
        info = [(value['key'],
                 value.get('unique', False),
                 value.get('sparse', False))
                for key, value in info.iteritems()]
        self.assertTrue(([('addDate', -1)], True, True) in info)

        BlogPost.drop_collection()


    def test_unique(self):
        """Ensure that uniqueness constraints are applied to fields.
        """

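For orientation only: an index declaration like the one above is presumably equivalent to a direct pymongo call along these lines (a hypothetical sketch; the blog_post collection name and the pre-MongoClient Connection class are assumptions, not part of the commit):

from pymongo import Connection, DESCENDING

db = Connection()['mongoenginetest']
# Roughly what {'fields': ['-date'], 'unique': True, 'sparse': True} should
# translate to, using the custom db_field name 'addDate':
db.blog_post.ensure_index([('addDate', DESCENDING)], unique=True, sparse=True)
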
@@ -187,6 +187,66 @@ class FieldTest(unittest.TestCase):
        log.time = '1pm'
        self.assertRaises(ValidationError, log.validate)

    def test_datetime(self):
        """Tests showing pymongo datetime fields' handling of microseconds.
        Microseconds are rounded to the nearest millisecond and pre-UTC
        handling is wonky.

        See: http://api.mongodb.org/python/current/api/bson/son.html#dt
        """
        class LogEntry(Document):
            date = DateTimeField()

        LogEntry.drop_collection()

        # Post UTC - microseconds are rounded (down) to the nearest millisecond and dropped
        d1 = datetime.datetime(1970, 01, 01, 00, 00, 01, 999)
        d2 = datetime.datetime(1970, 01, 01, 00, 00, 01)
        log = LogEntry()
        log.date = d1
        log.save()
        log.reload()
        self.assertNotEquals(log.date, d1)
        self.assertEquals(log.date, d2)

        # Post UTC - microseconds are rounded (down) to the nearest millisecond
        d1 = datetime.datetime(1970, 01, 01, 00, 00, 01, 9999)
        d2 = datetime.datetime(1970, 01, 01, 00, 00, 01, 9000)
        log.date = d1
        log.save()
        log.reload()
        self.assertNotEquals(log.date, d1)
        self.assertEquals(log.date, d2)

        # Pre UTC dates - microseconds below 1000 are dropped
        d1 = datetime.datetime(1969, 12, 31, 23, 59, 59, 999)
        d2 = datetime.datetime(1969, 12, 31, 23, 59, 59)
        log.date = d1
        log.save()
        log.reload()
        self.assertNotEquals(log.date, d1)
        self.assertEquals(log.date, d2)

        # Pre UTC, microseconds above 1000 are wonky.
        # log.date has an invalid microsecond value so I can't construct
        # a date to compare.
        #
        # However, the timedelta is predictable with pre-UTC timestamps:
        # it always adds 16 seconds and 777216 - (i % 1000) microseconds.
        for i in xrange(1001, 3113, 33):
            d1 = datetime.datetime(1969, 12, 31, 23, 59, 59, i)
            log.date = d1
            log.save()
            log.reload()
            self.assertNotEquals(log.date, d1)

            delta = log.date - d1
            self.assertEquals(delta.seconds, 16)
            microseconds = 777216 - (i % 1000)
            self.assertEquals(delta.microseconds, microseconds)

        LogEntry.drop_collection()

    def test_list_validation(self):
        """Ensure that a list field only accepts lists with valid elements.
        """

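Background for test_datetime: BSON stores datetimes as a signed 64-bit count of milliseconds since the Unix epoch, so sub-millisecond precision cannot survive a save/reload round trip. A standalone illustration of the post-UTC truncation (an approximation for illustration only; it does not reproduce pymongo's pre-UTC quirk measured in the loop above):

import datetime

EPOCH = datetime.datetime(1970, 1, 1)

def millisecond_round_trip(dt):
    # Collapse a datetime to millisecond precision, mimicking a round trip
    # through an integer-milliseconds representation.
    delta = dt - EPOCH
    millis = (delta.days * 86400000 + delta.seconds * 1000 +
              delta.microseconds // 1000)
    return EPOCH + datetime.timedelta(milliseconds=millis)

# 999 microseconds fall below one millisecond and are dropped entirely,
# matching the first assertion in test_datetime.
assert millisecond_round_trip(datetime.datetime(1970, 1, 1, 0, 0, 1, 999)) == \
    datetime.datetime(1970, 1, 1, 0, 0, 1)

# 9999 microseconds are truncated to 9 milliseconds (9000 microseconds).
assert millisecond_round_trip(datetime.datetime(1970, 1, 1, 0, 0, 1, 9999)) == \
    datetime.datetime(1970, 1, 1, 0, 0, 1, 9000)
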
@@ -9,6 +9,7 @@ from mongoengine.queryset import (QuerySet, QuerySetManager,
                                  MultipleObjectsReturned, DoesNotExist,
                                  QueryFieldList)
from mongoengine import *
from mongoengine.tests import query_counter


class QuerySetTest(unittest.TestCase):

@@ -331,6 +332,125 @@ class QuerySetTest(unittest.TestCase):
        person = self.Person.objects.get(age=50)
        self.assertEqual(person.name, "User C")

    def test_bulk_insert(self):
        """Ensure that bulk insert works.
        """

        class Comment(EmbeddedDocument):
            name = StringField()

        class Post(EmbeddedDocument):
            comments = ListField(EmbeddedDocumentField(Comment))

        class Blog(Document):
            title = StringField()
            tags = ListField(StringField())
            posts = ListField(EmbeddedDocumentField(Post))

        Blog.drop_collection()

        with query_counter() as q:
            self.assertEqual(q, 0)

            comment1 = Comment(name='testa')
            comment2 = Comment(name='testb')
            post1 = Post(comments=[comment1, comment2])
            post2 = Post(comments=[comment2, comment2])

            blogs = []
            for i in xrange(1, 100):
                blogs.append(Blog(title="post %s" % i, posts=[post1, post2]))

            Blog.objects.insert(blogs, load_bulk=False)
            self.assertEqual(q, 2)  # 1 for the initial connection and 1 for the insert

            Blog.objects.insert(blogs)
            self.assertEqual(q, 4)  # 1 for the insert and 1 for the bulk load of the results

        Blog.drop_collection()

        comment1 = Comment(name='testa')
        comment2 = Comment(name='testb')
        post1 = Post(comments=[comment1, comment2])
        post2 = Post(comments=[comment2, comment2])
        blog1 = Blog(title="code", posts=[post1, post2])
        blog2 = Blog(title="mongodb", posts=[post2, post1])
        blog1, blog2 = Blog.objects.insert([blog1, blog2])
        self.assertEqual(blog1.title, "code")
        self.assertEqual(blog2.title, "mongodb")

        self.assertEqual(Blog.objects.count(), 2)

        # test handles people trying to upsert
        def throw_operation_error():
            blogs = Blog.objects
            Blog.objects.insert(blogs)

        self.assertRaises(OperationError, throw_operation_error)

        # test handles other classes being inserted
        def throw_operation_error_wrong_doc():
            class Author(Document):
                pass
            Blog.objects.insert(Author())

        self.assertRaises(OperationError, throw_operation_error_wrong_doc)

        def throw_operation_error_not_a_document():
            Blog.objects.insert("HELLO WORLD")

        self.assertRaises(OperationError, throw_operation_error_not_a_document)

        Blog.drop_collection()

        # A single-document insert returns the loaded document by default...
        blog1 = Blog(title="code", posts=[post1, post2])
        blog1 = Blog.objects.insert(blog1)
        self.assertEqual(blog1.title, "code")
        self.assertEqual(Blog.objects.count(), 1)

        Blog.drop_collection()
        # ...and only the ObjectId when load_bulk=False
        blog1 = Blog(title="code", posts=[post1, post2])
        obj_id = Blog.objects.insert(blog1, load_bulk=False)
        self.assertEquals(obj_id.__class__.__name__, 'ObjectId')

    def test_slave_okay(self):
        """Ensures that a query can take slave_okay syntax
        """
        person1 = self.Person(name="User A", age=20)
        person1.save()
        person2 = self.Person(name="User B", age=30)
        person2.save()

        # Retrieve the first person from the database
        person = self.Person.objects.slave_okay(True).first()
        self.assertTrue(isinstance(person, self.Person))
        self.assertEqual(person.name, "User A")
        self.assertEqual(person.age, 20)

    def test_cursor_args(self):
        """Ensures the cursor args can be set as expected
        """
        p = self.Person.objects
        # Check default
        self.assertEqual(p._cursor_args,
                         {'snapshot': False, 'slave_okay': False, 'timeout': True})

        p.snapshot(False).slave_okay(False).timeout(False)
        self.assertEqual(p._cursor_args,
                         {'snapshot': False, 'slave_okay': False, 'timeout': False})

        p.snapshot(True).slave_okay(False).timeout(False)
        self.assertEqual(p._cursor_args,
                         {'snapshot': True, 'slave_okay': False, 'timeout': False})

        p.snapshot(True).slave_okay(True).timeout(False)
        self.assertEqual(p._cursor_args,
                         {'snapshot': True, 'slave_okay': True, 'timeout': False})

        p.snapshot(True).slave_okay(True).timeout(True)
        self.assertEqual(p._cursor_args,
                         {'snapshot': True, 'slave_okay': True, 'timeout': True})

    def test_repeated_iteration(self):
        """Ensure that QuerySet rewinds itself once iteration finishes.
        """

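A note on test_cursor_args: _cursor_args is presumably the set of options the QuerySet hands to pymongo when it builds its cursor. The sketch below is a hypothetical illustration of that wiring (the class and method bodies are assumptions, not the actual QuerySet internals; it only relies on snapshot, slave_okay and timeout being accepted as keyword arguments by the pymongo find() of this era):

class CursorArgsMixin(object):
    # Hypothetical illustration of how chainable cursor options can be
    # accumulated and handed to pymongo's Collection.find().
    def __init__(self, collection):
        self._collection = collection
        self._cursor_args = {'snapshot': False, 'slave_okay': False, 'timeout': True}

    def snapshot(self, enabled):
        self._cursor_args['snapshot'] = enabled
        return self

    def slave_okay(self, enabled):
        self._cursor_args['slave_okay'] = enabled
        return self

    def timeout(self, enabled):
        self._cursor_args['timeout'] = enabled
        return self

    def _build_cursor(self, spec):
        # forward the accumulated options when the query is actually run
        return self._collection.find(spec, **self._cursor_args)
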
@@ -2115,8 +2235,27 @@ class QuerySetTest(unittest.TestCase):
        Number.drop_collection()


    def test_ensure_index(self):
        """Ensure that manual creation of indexes works.
        """
        class Comment(Document):
            message = StringField()

        Comment.objects.ensure_index('message')

        info = Comment.objects._collection.index_information()
        info = [(value['key'],
                 value.get('unique', False),
                 value.get('sparse', False))
                for key, value in info.iteritems()]
        self.assertTrue(([('_types', 1), ('message', 1)], False, False) in info)


class QTest(unittest.TestCase):

    def setUp(self):
        connect(db='mongoenginetest')

    def test_empty_q(self):
        """Ensure that empty Q objects won't hurt.
        """

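Both index tests above parse pymongo's index_information(). For reference, a sketch of the shape those list comprehensions assume (illustrative values only, including the index names):

# Illustrative only - roughly what Comment.objects._collection.index_information()
# is expected to return for the assertion above.
example_index_info = {
    '_id_': {'key': [('_id', 1)]},
    '_types_1_message_1': {'key': [('_types', 1), ('message', 1)]},
}

summary = [(value['key'],
            value.get('unique', False),
            value.get('sparse', False))
           for key, value in example_index_info.iteritems()]
assert ([('_types', 1), ('message', 1)], False, False) in summary
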
tests/signals.py (new file, 130 lines)
@@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
import unittest

from mongoengine import *
from mongoengine import signals

signal_output = []


class SignalTests(unittest.TestCase):
    """
    Testing signals before/after saving and deleting.
    """

    def get_signal_output(self, fn, *args, **kwargs):
        # Flush any existing signal output
        global signal_output
        signal_output = []
        fn(*args, **kwargs)
        return signal_output

    def setUp(self):
        connect(db='mongoenginetest')
        class Author(Document):
            name = StringField()

            def __unicode__(self):
                return self.name

            @classmethod
            def pre_init(cls, instance, **kwargs):
                signal_output.append('pre_init signal, %s' % cls.__name__)
                signal_output.append(str(kwargs['values']))

            @classmethod
            def post_init(cls, instance, **kwargs):
                signal_output.append('post_init signal, %s' % instance)

            @classmethod
            def pre_save(cls, instance, **kwargs):
                signal_output.append('pre_save signal, %s' % instance)

            @classmethod
            def post_save(cls, instance, **kwargs):
                signal_output.append('post_save signal, %s' % instance)
                if 'created' in kwargs:
                    if kwargs['created']:
                        signal_output.append('Is created')
                    else:
                        signal_output.append('Is updated')

            @classmethod
            def pre_delete(cls, instance, **kwargs):
                signal_output.append('pre_delete signal, %s' % instance)

            @classmethod
            def post_delete(cls, instance, **kwargs):
                signal_output.append('post_delete signal, %s' % instance)

        self.Author = Author

        # Save up the number of connected signals so that we can check at the end
        # that all the signals we register get properly unregistered
        self.pre_signals = (
            len(signals.pre_init.receivers),
            len(signals.post_init.receivers),
            len(signals.pre_save.receivers),
            len(signals.post_save.receivers),
            len(signals.pre_delete.receivers),
            len(signals.post_delete.receivers)
        )

        signals.pre_init.connect(Author.pre_init)
        signals.post_init.connect(Author.post_init)
        signals.pre_save.connect(Author.pre_save)
        signals.post_save.connect(Author.post_save)
        signals.pre_delete.connect(Author.pre_delete)
        signals.post_delete.connect(Author.post_delete)

    def tearDown(self):
        signals.pre_init.disconnect(self.Author.pre_init)
        signals.post_init.disconnect(self.Author.post_init)
        signals.post_delete.disconnect(self.Author.post_delete)
        signals.pre_delete.disconnect(self.Author.pre_delete)
        signals.post_save.disconnect(self.Author.post_save)
        signals.pre_save.disconnect(self.Author.pre_save)

        # Check that all our signals got disconnected properly.
        post_signals = (
            len(signals.pre_init.receivers),
            len(signals.post_init.receivers),
            len(signals.pre_save.receivers),
            len(signals.post_save.receivers),
            len(signals.pre_delete.receivers),
            len(signals.post_delete.receivers)
        )

        self.assertEqual(self.pre_signals, post_signals)

    def test_model_signals(self):
        """Model saves should fire the appropriate signals."""

        def create_author():
            a1 = self.Author(name='Bill Shakespeare')

        self.assertEqual(self.get_signal_output(create_author), [
            "pre_init signal, Author",
            "{'name': 'Bill Shakespeare'}",
            "post_init signal, Bill Shakespeare",
        ])

        a1 = self.Author(name='Bill Shakespeare')
        self.assertEqual(self.get_signal_output(a1.save), [
            "pre_save signal, Bill Shakespeare",
            "post_save signal, Bill Shakespeare",
            "Is created"
        ])

        a1.reload()
        a1.name = 'William Shakespeare'
        self.assertEqual(self.get_signal_output(a1.save), [
            "pre_save signal, William Shakespeare",
            "post_save signal, William Shakespeare",
            "Is updated"
        ])

        self.assertEqual(self.get_signal_output(a1.delete), [
            'pre_delete signal, William Shakespeare',
            'post_delete signal, William Shakespeare',
        ])

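A brief usage note, not part of the commit: outside the test suite, handlers are registered the same way. The sketch below simply mirrors the handler shape exercised above; the Article class is a hypothetical example document.

from mongoengine import Document, StringField, connect, signals

class Article(Document):
    # Hypothetical document used only for this illustration
    title = StringField()

    @classmethod
    def post_save(cls, instance, **kwargs):
        # Mirrors the handler signature used in SignalTests; the 'created'
        # flag distinguishes inserts from updates.
        if kwargs.get('created'):
            print 'new article saved: %s' % instance.title

signals.post_save.connect(Article.post_save)

connect(db='mongoenginetest')
Article(title='Signals in practice').save()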