Merge branch 'master' into pr/368

This commit is contained in:
Ross Lawley
2014-06-27 11:32:19 +01:00
76 changed files with 5414 additions and 2203 deletions

View File

@@ -3,6 +3,7 @@ import sys
sys.path[0:0] = [""]
import unittest
from bson import SON
from mongoengine import *
from mongoengine.connection import get_db
@@ -312,29 +313,24 @@ class DeltaTest(unittest.TestCase):
self.circular_reference_deltas_2(DynamicDocument, Document)
self.circular_reference_deltas_2(DynamicDocument, DynamicDocument)
def circular_reference_deltas_2(self, DocClass1, DocClass2):
def circular_reference_deltas_2(self, DocClass1, DocClass2, dbref=True):
class Person(DocClass1):
name = StringField()
owns = ListField(ReferenceField('Organization'))
employer = ReferenceField('Organization')
owns = ListField(ReferenceField('Organization', dbref=dbref))
employer = ReferenceField('Organization', dbref=dbref)
class Organization(DocClass2):
name = StringField()
owner = ReferenceField('Person')
employees = ListField(ReferenceField('Person'))
owner = ReferenceField('Person', dbref=dbref)
employees = ListField(ReferenceField('Person', dbref=dbref))
Person.drop_collection()
Organization.drop_collection()
person = Person(name="owner")
person.save()
employee = Person(name="employee")
employee.save()
organization = Organization(name="company")
organization.save()
person = Person(name="owner").save()
employee = Person(name="employee").save()
organization = Organization(name="company").save()
person.owns.append(organization)
organization.owner = person
@@ -354,6 +350,8 @@ class DeltaTest(unittest.TestCase):
self.assertEqual(o.owner, p)
self.assertEqual(e.employer, o)
return person, organization, employee
def test_delta_db_field(self):
self.delta_db_field(Document)
self.delta_db_field(DynamicDocument)
@@ -613,13 +611,13 @@ class DeltaTest(unittest.TestCase):
Person.drop_collection()
p = Person(name="James", age=34)
self.assertEqual(p._delta(), ({'age': 34, 'name': 'James',
'_cls': 'Person'}, {}))
self.assertEqual(p._delta(), (
SON([('_cls', 'Person'), ('name', 'James'), ('age', 34)]), {}))
p.doc = 123
del(p.doc)
self.assertEqual(p._delta(), ({'age': 34, 'name': 'James',
'_cls': 'Person'}, {'doc': 1}))
self.assertEqual(p._delta(), (
SON([('_cls', 'Person'), ('name', 'James'), ('age', 34)]), {}))
p = Person()
p.name = "Dean"
@@ -631,14 +629,14 @@ class DeltaTest(unittest.TestCase):
self.assertEqual(p._get_changed_fields(), ['age'])
self.assertEqual(p._delta(), ({'age': 24}, {}))
p = self.Person.objects(age=22).get()
p = Person.objects(age=22).get()
p.age = 24
self.assertEqual(p.age, 24)
self.assertEqual(p._get_changed_fields(), ['age'])
self.assertEqual(p._delta(), ({'age': 24}, {}))
p.save()
self.assertEqual(1, self.Person.objects(age=24).count())
self.assertEqual(1, Person.objects(age=24).count())
def test_dynamic_delta(self):
@@ -685,6 +683,99 @@ class DeltaTest(unittest.TestCase):
self.assertEqual(doc._get_changed_fields(), ['list_field'])
self.assertEqual(doc._delta(), ({}, {'list_field': 1}))
def test_delta_with_dbref_true(self):
person, organization, employee = self.circular_reference_deltas_2(Document, Document, True)
employee.name = 'test'
self.assertEqual(organization._get_changed_fields(), [])
updates, removals = organization._delta()
self.assertEqual({}, removals)
self.assertEqual({}, updates)
organization.employees.append(person)
updates, removals = organization._delta()
self.assertEqual({}, removals)
self.assertTrue('employees' in updates)
def test_delta_with_dbref_false(self):
person, organization, employee = self.circular_reference_deltas_2(Document, Document, False)
employee.name = 'test'
self.assertEqual(organization._get_changed_fields(), [])
updates, removals = organization._delta()
self.assertEqual({}, removals)
self.assertEqual({}, updates)
organization.employees.append(person)
updates, removals = organization._delta()
self.assertEqual({}, removals)
self.assertTrue('employees' in updates)
def test_nested_nested_fields_mark_as_changed(self):
class EmbeddedDoc(EmbeddedDocument):
name = StringField()
class MyDoc(Document):
subs = MapField(MapField(EmbeddedDocumentField(EmbeddedDoc)))
name = StringField()
MyDoc.drop_collection()
mydoc = MyDoc(name='testcase1', subs={'a': {'b': EmbeddedDoc(name='foo')}}).save()
mydoc = MyDoc.objects.first()
subdoc = mydoc.subs['a']['b']
subdoc.name = 'bar'
self.assertEqual(["name"], subdoc._get_changed_fields())
self.assertEqual(["subs.a.b.name"], mydoc._get_changed_fields())
mydoc._clear_changed_fields()
self.assertEqual([], mydoc._get_changed_fields())
def test_referenced_object_changed_attributes(self):
"""Ensures that when you save a new reference to a field, the referenced object isn't altered"""
class Organization(Document):
name = StringField()
class User(Document):
name = StringField()
org = ReferenceField('Organization', required=True)
Organization.drop_collection()
User.drop_collection()
org1 = Organization(name='Org 1')
org1.save()
org2 = Organization(name='Org 2')
org2.save()
user = User(name='Fred', org=org1)
user.save()
org1.reload()
org2.reload()
user.reload()
self.assertEqual(org1.name, 'Org 1')
self.assertEqual(org2.name, 'Org 2')
self.assertEqual(user.name, 'Fred')
user.name = 'Harold'
user.org = org2
org2.name = 'New Org 2'
self.assertEqual(org2.name, 'New Org 2')
user.save()
org2.save()
self.assertEqual(org2.name, 'New Org 2')
org2.reload()
self.assertEqual(org2.name, 'New Org 2')
if __name__ == '__main__':
unittest.main()

View File

@@ -292,6 +292,44 @@ class DynamicTest(unittest.TestCase):
person.save()
self.assertEqual(Person.objects.first().age, 35)
def test_dynamic_and_embedded_dict_access(self):
"""Ensure embedded dynamic documents work with dict[] style access"""
class Address(EmbeddedDocument):
city = StringField()
class Person(DynamicDocument):
name = StringField()
Person.drop_collection()
Person(name="Ross", address=Address(city="London")).save()
person = Person.objects.first()
person.attrval = "This works"
person["phone"] = "555-1212" # but this should too
# Same thing two levels deep
person["address"]["city"] = "Lundenne"
person.save()
self.assertEqual(Person.objects.first().address.city, "Lundenne")
self.assertEqual(Person.objects.first().phone, "555-1212")
person = Person.objects.first()
person.address = Address(city="Londinium")
person.save()
self.assertEqual(Person.objects.first().address.city, "Londinium")
person = Person.objects.first()
person["age"] = 35
person.save()
self.assertEqual(Person.objects.first().age, 35)
if __name__ == '__main__':
unittest.main()

View File

@@ -156,6 +156,25 @@ class IndexesTest(unittest.TestCase):
self.assertEqual([{'fields': [('_cls', 1), ('title', 1)]}],
A._meta['index_specs'])
def test_index_no_cls(self):
"""Ensure index specs are inhertited correctly"""
class A(Document):
title = StringField()
meta = {
'indexes': [
{'fields': ('title',), 'cls': False},
],
'allow_inheritance': True,
'index_cls': False
}
self.assertEqual([('title', 1)], A._meta['index_specs'][0]['fields'])
A._get_collection().drop_indexes()
A.ensure_indexes()
info = A._get_collection().index_information()
self.assertEqual(len(info.keys()), 2)
def test_build_index_spec_is_not_destructive(self):
class MyDoc(Document):
@@ -472,7 +491,7 @@ class IndexesTest(unittest.TestCase):
def invalid_index_2():
return BlogPost.objects.hint(('tags', 1))
self.assertRaises(TypeError, invalid_index_2)
self.assertRaises(Exception, invalid_index_2)
def test_unique(self):
"""Ensure that uniqueness constraints are applied to fields.

View File

@@ -10,11 +10,12 @@ import uuid
from datetime import datetime
from bson import DBRef
from tests.fixtures import PickleEmbedded, PickleTest, PickleSignalsTest
from tests.fixtures import (PickleEmbedded, PickleTest, PickleSignalsTest,
PickleDyanmicEmbedded, PickleDynamicTest)
from mongoengine import *
from mongoengine.errors import (NotRegistered, InvalidDocumentError,
InvalidQueryError)
InvalidQueryError, NotUniqueError)
from mongoengine.queryset import NULLIFY, Q
from mongoengine.connection import get_db
from mongoengine.base import get_document
@@ -56,7 +57,7 @@ class InstanceTest(unittest.TestCase):
date = DateTimeField(default=datetime.now)
meta = {
'max_documents': 10,
'max_size': 90000,
'max_size': 4096,
}
Log.drop_collection()
@@ -74,7 +75,7 @@ class InstanceTest(unittest.TestCase):
options = Log.objects._collection.options()
self.assertEqual(options['capped'], True)
self.assertEqual(options['max'], 10)
self.assertEqual(options['size'], 90000)
self.assertTrue(options['size'] >= 4096)
# Check that the document cannot be redefined with different options
def recreate_log_document():
@@ -352,6 +353,14 @@ class InstanceTest(unittest.TestCase):
self.assertEqual(person.name, "Test User")
self.assertEqual(person.age, 20)
person.reload('age')
self.assertEqual(person.name, "Test User")
self.assertEqual(person.age, 21)
person.reload()
self.assertEqual(person.name, "Mr Test User")
self.assertEqual(person.age, 21)
person.reload()
self.assertEqual(person.name, "Mr Test User")
self.assertEqual(person.age, 21)
@@ -401,6 +410,7 @@ class InstanceTest(unittest.TestCase):
'embedded_field.dict_field'])
doc.save()
self.assertEqual(len(doc.list_field), 4)
doc = doc.reload(10)
self.assertEqual(doc._get_changed_fields(), [])
self.assertEqual(len(doc.list_field), 4)
@@ -408,6 +418,37 @@ class InstanceTest(unittest.TestCase):
self.assertEqual(len(doc.embedded_field.list_field), 4)
self.assertEqual(len(doc.embedded_field.dict_field), 2)
doc.list_field.append(1)
doc.save()
doc.dict_field['extra'] = 1
doc = doc.reload(10, 'list_field')
self.assertEqual(doc._get_changed_fields(), [])
self.assertEqual(len(doc.list_field), 5)
self.assertEqual(len(doc.dict_field), 3)
self.assertEqual(len(doc.embedded_field.list_field), 4)
self.assertEqual(len(doc.embedded_field.dict_field), 2)
def test_reload_doesnt_exist(self):
class Foo(Document):
pass
f = Foo()
try:
f.reload()
except Foo.DoesNotExist:
pass
except Exception as ex:
self.assertFalse("Threw wrong exception")
f.save()
f.delete()
try:
f.reload()
except Foo.DoesNotExist:
pass
except Exception as ex:
self.assertFalse("Threw wrong exception")
def test_dictionary_access(self):
"""Ensure that dictionary-style field access works properly.
"""
@@ -443,6 +484,13 @@ class InstanceTest(unittest.TestCase):
self.assertEqual(Employee(name="Bob", age=35, salary=0).to_mongo().keys(),
['_cls', 'name', 'age', 'salary'])
def test_embedded_document_to_mongo_id(self):
class SubDoc(EmbeddedDocument):
id = StringField(required=True)
sub_doc = SubDoc(id="abc")
self.assertEqual(sub_doc.to_mongo().keys(), ['id'])
def test_embedded_document(self):
"""Ensure that embedded documents are set up correctly.
"""
@@ -482,6 +530,23 @@ class InstanceTest(unittest.TestCase):
doc = Doc.objects.get()
self.assertEqual(doc, doc.embedded_field[0]._instance)
def test_instance_is_set_on_setattr(self):
class Email(EmbeddedDocument):
email = EmailField()
class Account(Document):
email = EmbeddedDocumentField(Email)
Account.drop_collection()
acc = Account()
acc.email = Email(email='test@example.com')
self.assertTrue(hasattr(acc._data["email"], "_instance"))
acc.save()
acc1 = Account.objects.first()
self.assertTrue(hasattr(acc1._data["email"], "_instance"))
def test_document_clean(self):
class TestDocument(Document):
status = StringField()
@@ -771,6 +836,80 @@ class InstanceTest(unittest.TestCase):
p1.reload()
self.assertEqual(p1.name, p.parent.name)
def test_save_atomicity_condition(self):
class Widget(Document):
toggle = BooleanField(default=False)
count = IntField(default=0)
save_id = UUIDField()
def flip(widget):
widget.toggle = not widget.toggle
widget.count += 1
def UUID(i):
return uuid.UUID(int=i)
Widget.drop_collection()
w1 = Widget(toggle=False, save_id=UUID(1))
# ignore save_condition on new record creation
w1.save(save_condition={'save_id':UUID(42)})
w1.reload()
self.assertFalse(w1.toggle)
self.assertEqual(w1.save_id, UUID(1))
self.assertEqual(w1.count, 0)
# mismatch in save_condition prevents save
flip(w1)
self.assertTrue(w1.toggle)
self.assertEqual(w1.count, 1)
w1.save(save_condition={'save_id':UUID(42)})
w1.reload()
self.assertFalse(w1.toggle)
self.assertEqual(w1.count, 0)
# matched save_condition allows save
flip(w1)
self.assertTrue(w1.toggle)
self.assertEqual(w1.count, 1)
w1.save(save_condition={'save_id':UUID(1)})
w1.reload()
self.assertTrue(w1.toggle)
self.assertEqual(w1.count, 1)
# save_condition can be used to ensure atomic read & updates
# i.e., prevent interleaved reads and writes from separate contexts
w2 = Widget.objects.get()
self.assertEqual(w1, w2)
old_id = w1.save_id
flip(w1)
w1.save_id = UUID(2)
w1.save(save_condition={'save_id':old_id})
w1.reload()
self.assertFalse(w1.toggle)
self.assertEqual(w1.count, 2)
flip(w2)
flip(w2)
w2.save(save_condition={'save_id':old_id})
w2.reload()
self.assertFalse(w2.toggle)
self.assertEqual(w2.count, 2)
# save_condition uses mongoengine-style operator syntax
flip(w1)
w1.save(save_condition={'count__lt':w1.count})
w1.reload()
self.assertTrue(w1.toggle)
self.assertEqual(w1.count, 3)
flip(w1)
w1.save(save_condition={'count__gte':w1.count})
w1.reload()
self.assertTrue(w1.toggle)
self.assertEqual(w1.count, 3)
def test_update(self):
"""Ensure that an existing document is updated instead of be
overwritten."""
@@ -941,6 +1080,16 @@ class InstanceTest(unittest.TestCase):
self.assertRaises(InvalidQueryError, update_no_op_raises)
def test_update_unique_field(self):
class Doc(Document):
name = StringField(unique=True)
doc1 = Doc(name="first").save()
doc2 = Doc(name="second").save()
self.assertRaises(NotUniqueError, lambda:
doc2.update(set__name=doc1.name))
def test_embedded_update(self):
"""
Test update on `EmbeddedDocumentField` fields
@@ -1827,6 +1976,29 @@ class InstanceTest(unittest.TestCase):
self.assertEqual(pickle_doc.string, "Two")
self.assertEqual(pickle_doc.lists, ["1", "2", "3"])
def test_dynamic_document_pickle(self):
pickle_doc = PickleDynamicTest(name="test", number=1, string="One", lists=['1', '2'])
pickle_doc.embedded = PickleDyanmicEmbedded(foo="Bar")
pickled_doc = pickle.dumps(pickle_doc) # make sure pickling works even before the doc is saved
pickle_doc.save()
pickled_doc = pickle.dumps(pickle_doc)
resurrected = pickle.loads(pickled_doc)
self.assertEqual(resurrected, pickle_doc)
self.assertEqual(resurrected._fields_ordered,
pickle_doc._fields_ordered)
self.assertEqual(resurrected._dynamic_fields.keys(),
pickle_doc._dynamic_fields.keys())
self.assertEqual(resurrected.embedded, pickle_doc.embedded)
self.assertEqual(resurrected.embedded._fields_ordered,
pickle_doc.embedded._fields_ordered)
self.assertEqual(resurrected.embedded._dynamic_fields.keys(),
pickle_doc.embedded._dynamic_fields.keys())
def test_picklable_on_signals(self):
pickle_doc = PickleSignalsTest(number=1, string="One", lists=['1', '2'])
pickle_doc.embedded = PickleEmbedded()
@@ -2209,6 +2381,8 @@ class InstanceTest(unittest.TestCase):
log.machine = "Localhost"
log.save()
self.assertTrue(log.id is not None)
log.log = "Saving"
log.save()
@@ -2232,6 +2406,8 @@ class InstanceTest(unittest.TestCase):
log.machine = "Localhost"
log.save()
self.assertTrue(log.id is not None)
log.log = "Saving"
log.save()
@@ -2289,6 +2465,16 @@ class InstanceTest(unittest.TestCase):
self.assertEqual(person.name, "Test User")
self.assertEqual(person.age, 42)
def test_mixed_creation_dynamic(self):
"""Ensure that document may be created using mixed arguments.
"""
class Person(DynamicDocument):
name = StringField()
person = Person("Test User", age=42)
self.assertEqual(person.name, "Test User")
self.assertEqual(person.age, 42)
def test_bad_mixed_creation(self):
"""Ensure that document gives correct error when duplicating arguments
"""
@@ -2329,7 +2515,7 @@ class InstanceTest(unittest.TestCase):
for parameter_name, parameter in self.parameters.iteritems():
parameter.expand()
class System(Document):
class NodesSystem(Document):
name = StringField(required=True)
nodes = MapField(ReferenceField(Node, dbref=False))
@@ -2337,20 +2523,38 @@ class InstanceTest(unittest.TestCase):
for node_name, node in self.nodes.iteritems():
node.expand()
node.save(*args, **kwargs)
super(System, self).save(*args, **kwargs)
super(NodesSystem, self).save(*args, **kwargs)
System.drop_collection()
NodesSystem.drop_collection()
Node.drop_collection()
system = System(name="system")
system = NodesSystem(name="system")
system.nodes["node"] = Node()
system.save()
system.nodes["node"].parameters["param"] = Parameter()
system.save()
system = System.objects.first()
system = NodesSystem.objects.first()
self.assertEqual("UNDEFINED", system.nodes["node"].parameters["param"].macros["test"].value)
def test_embedded_document_equality(self):
class Test(Document):
field = StringField(required=True)
class Embedded(EmbeddedDocument):
ref = ReferenceField(Test)
Test.drop_collection()
test = Test(field='123').save() # has id
e = Embedded(ref=test)
f1 = Embedded._from_son(e.to_mongo())
f2 = Embedded._from_son(e.to_mongo())
self.assertEqual(f1, f2)
f1.ref # Dereferences lazily
self.assertEqual(f1, f2)
if __name__ == '__main__':
unittest.main()

View File

@@ -31,6 +31,10 @@ class TestJson(unittest.TestCase):
doc = Doc(string="Hi", embedded_field=Embedded(string="Hi"))
doc_json = doc.to_json(sort_keys=True, separators=(',', ':'))
expected_json = """{"embedded_field":{"string":"Hi"},"string":"Hi"}"""
self.assertEqual(doc_json, expected_json)
self.assertEqual(doc, Doc.from_json(doc.to_json()))
def test_json_complex(self):

View File

@@ -384,6 +384,9 @@ class FieldTest(unittest.TestCase):
person.height = 4.0
self.assertRaises(ValidationError, person.validate)
person_2 = Person(height='something invalid')
self.assertRaises(ValidationError, person_2.validate)
def test_decimal_validation(self):
"""Ensure that invalid values cannot be assigned to decimal fields.
"""
@@ -405,6 +408,11 @@ class FieldTest(unittest.TestCase):
self.assertRaises(ValidationError, person.validate)
person.height = Decimal('4.0')
self.assertRaises(ValidationError, person.validate)
person.height = 'something invalid'
self.assertRaises(ValidationError, person.validate)
person_2 = Person(height='something invalid')
self.assertRaises(ValidationError, person_2.validate)
Person.drop_collection()
@@ -1018,6 +1026,32 @@ class FieldTest(unittest.TestCase):
e.mapping = {}
self.assertEqual([], e._changed_fields)
def test_slice_marks_field_as_changed(self):
class Simple(Document):
widgets = ListField()
simple = Simple(widgets=[1, 2, 3, 4]).save()
simple.widgets[:3] = []
self.assertEqual(['widgets'], simple._changed_fields)
simple.save()
simple = simple.reload()
self.assertEqual(simple.widgets, [4])
def test_del_slice_marks_field_as_changed(self):
class Simple(Document):
widgets = ListField()
simple = Simple(widgets=[1, 2, 3, 4]).save()
del simple.widgets[:3]
self.assertEqual(['widgets'], simple._changed_fields)
simple.save()
simple = simple.reload()
self.assertEqual(simple.widgets, [4])
def test_list_field_complex(self):
"""Ensure that the list fields can handle the complex types."""
@@ -1083,9 +1117,15 @@ class FieldTest(unittest.TestCase):
post.info = {'$title': 'test'}
self.assertRaises(ValidationError, post.validate)
post.info = {'nested': {'$title': 'test'}}
self.assertRaises(ValidationError, post.validate)
post.info = {'the.title': 'test'}
self.assertRaises(ValidationError, post.validate)
post.info = {'nested': {'the.title': 'test'}}
self.assertRaises(ValidationError, post.validate)
post.info = {1: 'test'}
self.assertRaises(ValidationError, post.validate)
@@ -1864,6 +1904,37 @@ class FieldTest(unittest.TestCase):
Post.drop_collection()
User.drop_collection()
def test_generic_reference_list_item_modification(self):
"""Ensure that modifications of related documents (through generic reference) don't influence on querying
"""
class Post(Document):
title = StringField()
class User(Document):
username = StringField()
bookmarks = ListField(GenericReferenceField())
Post.drop_collection()
User.drop_collection()
post_1 = Post(title="Behind the Scenes of the Pavement Reunion")
post_1.save()
user = User(bookmarks=[post_1])
user.save()
post_1.title = "Title was modified"
user.username = "New username"
user.save()
user = User.objects(bookmarks__all=[post_1]).first()
self.assertNotEqual(user, None)
self.assertEqual(user.bookmarks[0], post_1)
Post.drop_collection()
User.drop_collection()
def test_binary_fields(self):
"""Ensure that binary fields can be stored and retrieved.
"""
@@ -2428,6 +2499,9 @@ class FieldTest(unittest.TestCase):
user = User(email="ross@example.com")
self.assertTrue(user.validate() is None)
user = User(email="ross@example.co.uk")
self.assertTrue(user.validate() is None)
user = User(email=("Kofq@rhom0e4klgauOhpbpNdogawnyIKvQS0wk2mjqrgGQ5S"
"ucictfqpdkK9iS1zeFw8sg7s7cwAF7suIfUfeyueLpfosjn3"
"aJIazqqWkm7.net"))
@@ -2436,6 +2510,9 @@ class FieldTest(unittest.TestCase):
user = User(email='me@localhost')
self.assertRaises(ValidationError, user.validate)
user = User(email="ross@example.com.")
self.assertRaises(ValidationError, user.validate)
def test_email_field_honors_regex(self):
class User(Document):
email = EmailField(regex=r'\w+@example.com')
@@ -2448,6 +2525,92 @@ class FieldTest(unittest.TestCase):
user = User(email='me@example.com')
self.assertTrue(user.validate() is None)
def test_tuples_as_tuples(self):
"""
Ensure that tuples remain tuples when they are
inside a ComplexBaseField
"""
from mongoengine.base import BaseField
class EnumField(BaseField):
def __init__(self, **kwargs):
super(EnumField, self).__init__(**kwargs)
def to_mongo(self, value):
return value
def to_python(self, value):
return tuple(value)
class TestDoc(Document):
items = ListField(EnumField())
TestDoc.drop_collection()
tuples = [(100, 'Testing')]
doc = TestDoc()
doc.items = tuples
doc.save()
x = TestDoc.objects().get()
self.assertTrue(x is not None)
self.assertTrue(len(x.items) == 1)
self.assertTrue(tuple(x.items[0]) in tuples)
self.assertTrue(x.items[0] in tuples)
def test_dynamic_fields_class(self):
class Doc2(Document):
field_1 = StringField(db_field='f')
class Doc(Document):
my_id = IntField(required=True, unique=True, primary_key=True)
embed_me = DynamicField(db_field='e')
field_x = StringField(db_field='x')
Doc.drop_collection()
Doc2.drop_collection()
doc2 = Doc2(field_1="hello")
doc = Doc(my_id=1, embed_me=doc2, field_x="x")
self.assertRaises(OperationError, doc.save)
doc2.save()
doc.save()
doc = Doc.objects.get()
self.assertEqual(doc.embed_me.field_1, "hello")
def test_dynamic_fields_embedded_class(self):
class Embed(EmbeddedDocument):
field_1 = StringField(db_field='f')
class Doc(Document):
my_id = IntField(required=True, unique=True, primary_key=True)
embed_me = DynamicField(db_field='e')
field_x = StringField(db_field='x')
Doc.drop_collection()
Doc(my_id=1, embed_me=Embed(field_1="hello"), field_x="x").save()
doc = Doc.objects.get()
self.assertEqual(doc.embed_me.field_1, "hello")
def test_invalid_dict_value(self):
class DictFieldTest(Document):
dictionary = DictField(required=True)
DictFieldTest.drop_collection()
test = DictFieldTest(dictionary=None)
test.dictionary # Just access to test getter
self.assertRaises(ValidationError, test.validate)
test = DictFieldTest(dictionary=False)
test.dictionary # Just access to test getter
self.assertRaises(ValidationError, test.validate)
if __name__ == '__main__':
unittest.main()

View File

@@ -53,11 +53,12 @@ class FileTest(unittest.TestCase):
content_type = 'text/plain'
putfile = PutFile()
putfile.the_file.put(text, content_type=content_type)
putfile.the_file.put(text, content_type=content_type, filename="hello")
putfile.save()
result = PutFile.objects.first()
self.assertTrue(putfile == result)
self.assertEqual("%s" % result.the_file, "<GridFSProxy: hello>")
self.assertEqual(result.the_file.read(), text)
self.assertEqual(result.the_file.content_type, content_type)
result.the_file.delete() # Remove file from GridFS
@@ -278,7 +279,7 @@ class FileTest(unittest.TestCase):
t.image.put(f)
self.fail("Should have raised an invalidation error")
except ValidationError, e:
self.assertEquals("%s" % e, "Invalid image: cannot identify image file")
self.assertEqual("%s" % e, "Invalid image: cannot identify image file %s" % f)
t = TestImage()
t.image.put(open(TEST_IMAGE_PATH, 'rb'))
@@ -455,5 +456,31 @@ class FileTest(unittest.TestCase):
self.assertEqual(1, TestImage.objects(Q(image1=grid_id)
or Q(image2=grid_id)).count())
def test_complex_field_filefield(self):
"""Ensure you can add meta data to file"""
class Animal(Document):
genus = StringField()
family = StringField()
photos = ListField(FileField())
Animal.drop_collection()
marmot = Animal(genus='Marmota', family='Sciuridae')
marmot_photo = open(TEST_IMAGE_PATH, 'rb') # Retrieve a photo from disk
photos_field = marmot._fields['photos'].field
new_proxy = photos_field.get_proxy_obj('photos', marmot)
new_proxy.put(marmot_photo, content_type='image/jpeg', foo='bar')
marmot_photo.close()
marmot.photos.append(new_proxy)
marmot.save()
marmot = Animal.objects.get()
self.assertEqual(marmot.photos[0].content_type, 'image/jpeg')
self.assertEqual(marmot.photos[0].foo, 'bar')
self.assertEqual(marmot.photos[0].get().length, 8313)
if __name__ == '__main__':
unittest.main()

View File

@@ -75,6 +75,12 @@ class GeoFieldTest(unittest.TestCase):
self._test_for_expected_error(Location, coord, expected)
Location(loc=[1, 2]).validate()
Location(loc={
"type": "Point",
"coordinates": [
81.4471435546875,
23.61432859499169
]}).validate()
def test_linestring_validation(self):
class Location(Document):

View File

@@ -17,6 +17,14 @@ class PickleTest(Document):
photo = FileField()
class PickleDyanmicEmbedded(DynamicEmbeddedDocument):
date = DateTimeField(default=datetime.now)
class PickleDynamicTest(DynamicDocument):
number = IntField()
class PickleSignalsTest(Document):
number = IntField()
string = StringField(choices=(('One', '1'), ('Two', '2')))

View File

@@ -3,3 +3,4 @@ from field_list import *
from queryset import *
from visitor import *
from geo import *
from modify import *

View File

@@ -162,6 +162,10 @@ class OnlyExcludeAllTest(unittest.TestCase):
self.assertEqual(obj.name, person.name)
self.assertEqual(obj.age, person.age)
obj = self.Person.objects.only(*('id', 'name',)).get()
self.assertEqual(obj.name, person.name)
self.assertEqual(obj.age, None)
# Check polymorphism still works
class Employee(self.Person):
salary = IntField(db_field='wage')
@@ -395,5 +399,28 @@ class OnlyExcludeAllTest(unittest.TestCase):
numbers = Numbers.objects.fields(embedded__n={"$slice": [-5, 10]}).get()
self.assertEqual(numbers.embedded.n, [-5, -4, -3, -2, -1])
def test_exclude_from_subclasses_docs(self):
class Base(Document):
username = StringField()
meta = {'allow_inheritance': True}
class Anon(Base):
anon = BooleanField()
class User(Base):
password = StringField()
wibble = StringField()
Base.drop_collection()
User(username="mongodb", password="secret").save()
user = Base.objects().exclude("password", "wibble").first()
self.assertEqual(user.password, None)
self.assertRaises(LookUpError, Base.objects.exclude, "made_up")
if __name__ == '__main__':
unittest.main()

View File

@@ -5,6 +5,8 @@ import unittest
from datetime import datetime, timedelta
from mongoengine import *
from nose.plugins.skip import SkipTest
__all__ = ("GeoQueriesTest",)
@@ -139,6 +141,7 @@ class GeoQueriesTest(unittest.TestCase):
def test_spherical_geospatial_operators(self):
"""Ensure that spherical geospatial queries are working
"""
raise SkipTest("https://jira.mongodb.org/browse/SERVER-14039")
class Point(Document):
location = GeoPointField()
@@ -414,5 +417,47 @@ class GeoQueriesTest(unittest.TestCase):
roads = Road.objects.filter(poly__geo_intersects={"$geometry": polygon}).count()
self.assertEqual(1, roads)
def test_2dsphere_point_sets_correctly(self):
class Location(Document):
loc = PointField()
Location.drop_collection()
Location(loc=[1,2]).save()
loc = Location.objects.as_pymongo()[0]
self.assertEqual(loc["loc"], {"type": "Point", "coordinates": [1, 2]})
Location.objects.update(set__loc=[2,1])
loc = Location.objects.as_pymongo()[0]
self.assertEqual(loc["loc"], {"type": "Point", "coordinates": [2, 1]})
def test_2dsphere_linestring_sets_correctly(self):
class Location(Document):
line = LineStringField()
Location.drop_collection()
Location(line=[[1, 2], [2, 2]]).save()
loc = Location.objects.as_pymongo()[0]
self.assertEqual(loc["line"], {"type": "LineString", "coordinates": [[1, 2], [2, 2]]})
Location.objects.update(set__line=[[2, 1], [1, 2]])
loc = Location.objects.as_pymongo()[0]
self.assertEqual(loc["line"], {"type": "LineString", "coordinates": [[2, 1], [1, 2]]})
def test_geojson_PolygonField(self):
class Location(Document):
poly = PolygonField()
Location.drop_collection()
Location(poly=[[[40, 5], [40, 6], [41, 6], [40, 5]]]).save()
loc = Location.objects.as_pymongo()[0]
self.assertEqual(loc["poly"], {"type": "Polygon", "coordinates": [[[40, 5], [40, 6], [41, 6], [40, 5]]]})
Location.objects.update(set__poly=[[[40, 4], [40, 6], [41, 6], [40, 4]]])
loc = Location.objects.as_pymongo()[0]
self.assertEqual(loc["poly"], {"type": "Polygon", "coordinates": [[[40, 4], [40, 6], [41, 6], [40, 4]]]})
if __name__ == '__main__':
unittest.main()

102
tests/queryset/modify.py Normal file
View File

@@ -0,0 +1,102 @@
import sys
sys.path[0:0] = [""]
import unittest
from mongoengine import connect, Document, IntField
__all__ = ("FindAndModifyTest",)
class Doc(Document):
id = IntField(primary_key=True)
value = IntField()
class FindAndModifyTest(unittest.TestCase):
def setUp(self):
connect(db="mongoenginetest")
Doc.drop_collection()
def assertDbEqual(self, docs):
self.assertEqual(list(Doc._collection.find().sort("id")), docs)
def test_modify(self):
Doc(id=0, value=0).save()
doc = Doc(id=1, value=1).save()
old_doc = Doc.objects(id=1).modify(set__value=-1)
self.assertEqual(old_doc.to_json(), doc.to_json())
self.assertDbEqual([{"_id": 0, "value": 0}, {"_id": 1, "value": -1}])
def test_modify_with_new(self):
Doc(id=0, value=0).save()
doc = Doc(id=1, value=1).save()
new_doc = Doc.objects(id=1).modify(set__value=-1, new=True)
doc.value = -1
self.assertEqual(new_doc.to_json(), doc.to_json())
self.assertDbEqual([{"_id": 0, "value": 0}, {"_id": 1, "value": -1}])
def test_modify_not_existing(self):
Doc(id=0, value=0).save()
self.assertEqual(Doc.objects(id=1).modify(set__value=-1), None)
self.assertDbEqual([{"_id": 0, "value": 0}])
def test_modify_with_upsert(self):
Doc(id=0, value=0).save()
old_doc = Doc.objects(id=1).modify(set__value=1, upsert=True)
self.assertEqual(old_doc, None)
self.assertDbEqual([{"_id": 0, "value": 0}, {"_id": 1, "value": 1}])
def test_modify_with_upsert_existing(self):
Doc(id=0, value=0).save()
doc = Doc(id=1, value=1).save()
old_doc = Doc.objects(id=1).modify(set__value=-1, upsert=True)
self.assertEqual(old_doc.to_json(), doc.to_json())
self.assertDbEqual([{"_id": 0, "value": 0}, {"_id": 1, "value": -1}])
def test_modify_with_upsert_with_new(self):
Doc(id=0, value=0).save()
new_doc = Doc.objects(id=1).modify(upsert=True, new=True, set__value=1)
self.assertEqual(new_doc.to_mongo(), {"_id": 1, "value": 1})
self.assertDbEqual([{"_id": 0, "value": 0}, {"_id": 1, "value": 1}])
def test_modify_with_remove(self):
Doc(id=0, value=0).save()
doc = Doc(id=1, value=1).save()
old_doc = Doc.objects(id=1).modify(remove=True)
self.assertEqual(old_doc.to_json(), doc.to_json())
self.assertDbEqual([{"_id": 0, "value": 0}])
def test_find_and_modify_with_remove_not_existing(self):
Doc(id=0, value=0).save()
self.assertEqual(Doc.objects(id=1).modify(remove=True), None)
self.assertDbEqual([{"_id": 0, "value": 0}])
def test_modify_with_order_by(self):
Doc(id=0, value=3).save()
Doc(id=1, value=2).save()
Doc(id=2, value=1).save()
doc = Doc(id=3, value=0).save()
old_doc = Doc.objects().order_by("-id").modify(set__value=-1)
self.assertEqual(old_doc.to_json(), doc.to_json())
self.assertDbEqual([
{"_id": 0, "value": 3}, {"_id": 1, "value": 2},
{"_id": 2, "value": 1}, {"_id": 3, "value": -1}])
def test_modify_with_fields(self):
Doc(id=0, value=0).save()
Doc(id=1, value=1).save()
old_doc = Doc.objects(id=1).only("id").modify(set__value=-1)
self.assertEqual(old_doc.to_mongo(), {"_id": 1})
self.assertDbEqual([{"_id": 0, "value": 0}, {"_id": 1, "value": -1}])
if __name__ == '__main__':
unittest.main()

View File

@@ -14,9 +14,9 @@ from pymongo.read_preferences import ReadPreference
from bson import ObjectId
from mongoengine import *
from mongoengine.connection import get_connection
from mongoengine.connection import get_connection, get_db
from mongoengine.python_support import PY3
from mongoengine.context_managers import query_counter
from mongoengine.context_managers import query_counter, switch_db
from mongoengine.queryset import (QuerySet, QuerySetManager,
MultipleObjectsReturned, DoesNotExist,
queryset_manager)
@@ -25,17 +25,29 @@ from mongoengine.errors import InvalidQueryError
__all__ = ("QuerySetTest",)
class db_ops_tracker(query_counter):
def get_ops(self):
ignore_query = {"ns": {"$ne": "%s.system.indexes" % self.db.name}}
return list(self.db.system.profile.find(ignore_query))
class QuerySetTest(unittest.TestCase):
def setUp(self):
connect(db='mongoenginetest')
connect(db='mongoenginetest2', alias='test2')
class PersonMeta(EmbeddedDocument):
weight = IntField()
class Person(Document):
name = StringField()
age = IntField()
person_meta = EmbeddedDocumentField(PersonMeta)
meta = {'allow_inheritance': True}
Person.drop_collection()
self.PersonMeta = PersonMeta
self.Person = Person
def test_initialisation(self):
@@ -536,6 +548,23 @@ class QuerySetTest(unittest.TestCase):
self.assertEqual(club.members['John']['gender'], "F")
self.assertEqual(club.members['John']['age'], 14)
def test_update_results(self):
self.Person.drop_collection()
result = self.Person(name="Bob", age=25).update(upsert=True, full_result=True)
self.assertTrue(isinstance(result, dict))
self.assertTrue("upserted" in result)
self.assertFalse(result["updatedExisting"])
bob = self.Person.objects.first()
result = bob.update(set__age=30, full_result=True)
self.assertTrue(isinstance(result, dict))
self.assertTrue(result["updatedExisting"])
self.Person(name="Bob", age=20).save()
result = self.Person.objects(name="Bob").update(set__name="bobby", multi=True)
self.assertEqual(result, 2)
def test_upsert(self):
self.Person.drop_collection()
@@ -628,7 +657,10 @@ class QuerySetTest(unittest.TestCase):
blogs.append(Blog(title="post %s" % i, posts=[post1, post2]))
Blog.objects.insert(blogs, load_bulk=False)
self.assertEqual(q, 1) # 1 for the insert
if (get_connection().max_wire_version <= 1):
self.assertEqual(q, 1)
else:
self.assertEqual(q, 99) # profiling logs each doc now in the bulk op
Blog.drop_collection()
Blog.ensure_indexes()
@@ -637,7 +669,10 @@ class QuerySetTest(unittest.TestCase):
self.assertEqual(q, 0)
Blog.objects.insert(blogs)
self.assertEqual(q, 2) # 1 for insert, and 1 for in bulk fetch
if (get_connection().max_wire_version <= 1):
self.assertEqual(q, 2) # 1 for insert, and 1 for in bulk fetch
else:
self.assertEqual(q, 100) # 99 for insert, and 1 for in bulk fetch
Blog.drop_collection()
@@ -1018,6 +1053,54 @@ class QuerySetTest(unittest.TestCase):
expected = [blog_post_1, blog_post_2, blog_post_3]
self.assertSequence(qs, expected)
def test_clear_ordering(self):
""" Ensure that the default ordering can be cleared by calling order_by().
"""
class BlogPost(Document):
title = StringField()
published_date = DateTimeField()
meta = {
'ordering': ['-published_date']
}
BlogPost.drop_collection()
with db_ops_tracker() as q:
BlogPost.objects.filter(title='whatever').first()
self.assertEqual(len(q.get_ops()), 1)
self.assertEqual(q.get_ops()[0]['query']['$orderby'], {u'published_date': -1})
with db_ops_tracker() as q:
BlogPost.objects.filter(title='whatever').order_by().first()
self.assertEqual(len(q.get_ops()), 1)
print q.get_ops()[0]['query']
self.assertFalse('$orderby' in q.get_ops()[0]['query'])
def test_no_ordering_for_get(self):
""" Ensure that Doc.objects.get doesn't use any ordering.
"""
class BlogPost(Document):
title = StringField()
published_date = DateTimeField()
meta = {
'ordering': ['-published_date']
}
BlogPost.objects.create(title='whatever', published_date=datetime.utcnow())
with db_ops_tracker() as q:
BlogPost.objects.get(title='whatever')
self.assertEqual(len(q.get_ops()), 1)
self.assertFalse('$orderby' in q.get_ops()[0]['query'])
# Ordering should be ignored for .get even if we set it explicitly
with db_ops_tracker() as q:
BlogPost.objects.order_by('-title').get(title='whatever')
self.assertEqual(len(q.get_ops()), 1)
self.assertFalse('$orderby' in q.get_ops()[0]['query'])
def test_find_embedded(self):
"""Ensure that an embedded document is properly returned from a query.
"""
@@ -1475,9 +1558,6 @@ class QuerySetTest(unittest.TestCase):
def test_pull_nested(self):
class User(Document):
name = StringField()
class Collaborator(EmbeddedDocument):
user = StringField()
@@ -1492,8 +1572,7 @@ class QuerySetTest(unittest.TestCase):
Site.drop_collection()
c = Collaborator(user='Esteban')
s = Site(name="test", collaborators=[c])
s.save()
s = Site(name="test", collaborators=[c]).save()
Site.objects(id=s.id).update_one(pull__collaborators__user='Esteban')
self.assertEqual(Site.objects.first().collaborators, [])
@@ -1503,6 +1582,71 @@ class QuerySetTest(unittest.TestCase):
self.assertRaises(InvalidQueryError, pull_all)
def test_pull_from_nested_embedded(self):
class User(EmbeddedDocument):
name = StringField()
def __unicode__(self):
return '%s' % self.name
class Collaborator(EmbeddedDocument):
helpful = ListField(EmbeddedDocumentField(User))
unhelpful = ListField(EmbeddedDocumentField(User))
class Site(Document):
name = StringField(max_length=75, unique=True, required=True)
collaborators = EmbeddedDocumentField(Collaborator)
Site.drop_collection()
c = User(name='Esteban')
f = User(name='Frank')
s = Site(name="test", collaborators=Collaborator(helpful=[c], unhelpful=[f])).save()
Site.objects(id=s.id).update_one(pull__collaborators__helpful=c)
self.assertEqual(Site.objects.first().collaborators['helpful'], [])
Site.objects(id=s.id).update_one(pull__collaborators__unhelpful={'name': 'Frank'})
self.assertEqual(Site.objects.first().collaborators['unhelpful'], [])
def pull_all():
Site.objects(id=s.id).update_one(pull_all__collaborators__helpful__name=['Ross'])
self.assertRaises(InvalidQueryError, pull_all)
def test_pull_from_nested_mapfield(self):
class Collaborator(EmbeddedDocument):
user = StringField()
def __unicode__(self):
return '%s' % self.user
class Site(Document):
name = StringField(max_length=75, unique=True, required=True)
collaborators = MapField(ListField(EmbeddedDocumentField(Collaborator)))
Site.drop_collection()
c = Collaborator(user='Esteban')
f = Collaborator(user='Frank')
s = Site(name="test", collaborators={'helpful':[c],'unhelpful':[f]})
s.save()
Site.objects(id=s.id).update_one(pull__collaborators__helpful__user='Esteban')
self.assertEqual(Site.objects.first().collaborators['helpful'], [])
Site.objects(id=s.id).update_one(pull__collaborators__unhelpful={'user':'Frank'})
self.assertEqual(Site.objects.first().collaborators['unhelpful'], [])
def pull_all():
Site.objects(id=s.id).update_one(pull_all__collaborators__helpful__user=['Ross'])
self.assertRaises(InvalidQueryError, pull_all)
def test_update_one_pop_generic_reference(self):
class BlogTag(Document):
@@ -1596,6 +1740,32 @@ class QuerySetTest(unittest.TestCase):
self.assertEqual(message.authors[1].name, "Ross")
self.assertEqual(message.authors[2].name, "Adam")
def test_reload_embedded_docs_instance(self):
class SubDoc(EmbeddedDocument):
val = IntField()
class Doc(Document):
embedded = EmbeddedDocumentField(SubDoc)
doc = Doc(embedded=SubDoc(val=0)).save()
doc.reload()
self.assertEqual(doc.pk, doc.embedded._instance.pk)
def test_reload_list_embedded_docs_instance(self):
class SubDoc(EmbeddedDocument):
val = IntField()
class Doc(Document):
embedded = ListField(EmbeddedDocumentField(SubDoc))
doc = Doc(embedded=[SubDoc(val=0)]).save()
doc.reload()
self.assertEqual(doc.pk, doc.embedded[0]._instance.pk)
def test_order_by(self):
"""Ensure that QuerySets may be ordered.
"""
@@ -1816,6 +1986,140 @@ class QuerySetTest(unittest.TestCase):
BlogPost.drop_collection()
def test_map_reduce_custom_output(self):
"""
Test map/reduce custom output
"""
register_connection('test2', 'mongoenginetest2')
class Family(Document):
id = IntField(
primary_key=True)
log = StringField()
class Person(Document):
id = IntField(
primary_key=True)
name = StringField()
age = IntField()
family = ReferenceField(Family)
Family.drop_collection()
Person.drop_collection()
# creating first family
f1 = Family(id=1, log="Trav 02 de Julho")
f1.save()
# persons of first family
Person(id=1, family=f1, name=u"Wilson Jr", age=21).save()
Person(id=2, family=f1, name=u"Wilson Father", age=45).save()
Person(id=3, family=f1, name=u"Eliana Costa", age=40).save()
Person(id=4, family=f1, name=u"Tayza Mariana", age=17).save()
# creating second family
f2 = Family(id=2, log="Av prof frasc brunno")
f2.save()
#persons of second family
Person(id=5, family=f2, name="Isabella Luanna", age=16).save()
Person(id=6, family=f2, name="Sandra Mara", age=36).save()
Person(id=7, family=f2, name="Igor Gabriel", age=10).save()
# creating third family
f3 = Family(id=3, log="Av brazil")
f3.save()
#persons of thrird family
Person(id=8, family=f3, name="Arthur WA", age=30).save()
Person(id=9, family=f3, name="Paula Leonel", age=25).save()
# executing join map/reduce
map_person = """
function () {
emit(this.family, {
totalAge: this.age,
persons: [{
name: this.name,
age: this.age
}]});
}
"""
map_family = """
function () {
emit(this._id, {
totalAge: 0,
persons: []
});
}
"""
reduce_f = """
function (key, values) {
var family = {persons: [], totalAge: 0};
values.forEach(function(value) {
if (value.persons) {
value.persons.forEach(function (person) {
family.persons.push(person);
family.totalAge += person.age;
});
}
});
return family;
}
"""
cursor = Family.objects.map_reduce(
map_f=map_family,
reduce_f=reduce_f,
output={'replace': 'family_map', 'db_alias': 'test2'})
# start a map/reduce
cursor.next()
results = Person.objects.map_reduce(
map_f=map_person,
reduce_f=reduce_f,
output={'reduce': 'family_map', 'db_alias': 'test2'})
results = list(results)
collection = get_db('test2').family_map
self.assertEqual(
collection.find_one({'_id': 1}), {
'_id': 1,
'value': {
'persons': [
{'age': 21, 'name': u'Wilson Jr'},
{'age': 45, 'name': u'Wilson Father'},
{'age': 40, 'name': u'Eliana Costa'},
{'age': 17, 'name': u'Tayza Mariana'}],
'totalAge': 123}
})
self.assertEqual(
collection.find_one({'_id': 2}), {
'_id': 2,
'value': {
'persons': [
{'age': 16, 'name': u'Isabella Luanna'},
{'age': 36, 'name': u'Sandra Mara'},
{'age': 10, 'name': u'Igor Gabriel'}],
'totalAge': 62}
})
self.assertEqual(
collection.find_one({'_id': 3}), {
'_id': 3,
'value': {
'persons': [
{'age': 30, 'name': u'Arthur WA'},
{'age': 25, 'name': u'Paula Leonel'}],
'totalAge': 55}
})
def test_map_reduce_finalize(self):
"""Ensure that map, reduce, and finalize run and introduce "scope"
by simulating "hotness" ranking with Reddit algorithm.
@@ -2165,6 +2469,19 @@ class QuerySetTest(unittest.TestCase):
self.Person(name='ageless person').save()
self.assertEqual(int(self.Person.objects.average('age')), avg)
# dot notation
self.Person(name='person meta', person_meta=self.PersonMeta(weight=0)).save()
self.assertAlmostEqual(int(self.Person.objects.average('person_meta.weight')), 0)
for i, weight in enumerate(ages):
self.Person(name='test meta%i', person_meta=self.PersonMeta(weight=weight)).save()
self.assertAlmostEqual(int(self.Person.objects.average('person_meta.weight')), avg)
self.Person(name='test meta none').save()
self.assertEqual(int(self.Person.objects.average('person_meta.weight')), avg)
def test_sum(self):
"""Ensure that field can be summed over correctly.
"""
@@ -2177,6 +2494,153 @@ class QuerySetTest(unittest.TestCase):
self.Person(name='ageless person').save()
self.assertEqual(int(self.Person.objects.sum('age')), sum(ages))
for i, age in enumerate(ages):
self.Person(name='test meta%s' % i, person_meta=self.PersonMeta(weight=age)).save()
self.assertEqual(int(self.Person.objects.sum('person_meta.weight')), sum(ages))
self.Person(name='weightless person').save()
self.assertEqual(int(self.Person.objects.sum('age')), sum(ages))
def test_embedded_average(self):
class Pay(EmbeddedDocument):
value = DecimalField()
class Doc(Document):
name = StringField()
pay = EmbeddedDocumentField(
Pay)
Doc.drop_collection()
Doc(name=u"Wilson Junior",
pay=Pay(value=150)).save()
Doc(name=u"Isabella Luanna",
pay=Pay(value=530)).save()
Doc(name=u"Tayza mariana",
pay=Pay(value=165)).save()
Doc(name=u"Eliana Costa",
pay=Pay(value=115)).save()
self.assertEqual(
Doc.objects.average('pay.value'),
240)
def test_embedded_array_average(self):
class Pay(EmbeddedDocument):
values = ListField(DecimalField())
class Doc(Document):
name = StringField()
pay = EmbeddedDocumentField(
Pay)
Doc.drop_collection()
Doc(name=u"Wilson Junior",
pay=Pay(values=[150, 100])).save()
Doc(name=u"Isabella Luanna",
pay=Pay(values=[530, 100])).save()
Doc(name=u"Tayza mariana",
pay=Pay(values=[165, 100])).save()
Doc(name=u"Eliana Costa",
pay=Pay(values=[115, 100])).save()
self.assertEqual(
Doc.objects.average('pay.values'),
170)
def test_array_average(self):
class Doc(Document):
values = ListField(DecimalField())
Doc.drop_collection()
Doc(values=[150, 100]).save()
Doc(values=[530, 100]).save()
Doc(values=[165, 100]).save()
Doc(values=[115, 100]).save()
self.assertEqual(
Doc.objects.average('values'),
170)
def test_embedded_sum(self):
class Pay(EmbeddedDocument):
value = DecimalField()
class Doc(Document):
name = StringField()
pay = EmbeddedDocumentField(
Pay)
Doc.drop_collection()
Doc(name=u"Wilson Junior",
pay=Pay(value=150)).save()
Doc(name=u"Isabella Luanna",
pay=Pay(value=530)).save()
Doc(name=u"Tayza mariana",
pay=Pay(value=165)).save()
Doc(name=u"Eliana Costa",
pay=Pay(value=115)).save()
self.assertEqual(
Doc.objects.sum('pay.value'),
960)
def test_embedded_array_sum(self):
class Pay(EmbeddedDocument):
values = ListField(DecimalField())
class Doc(Document):
name = StringField()
pay = EmbeddedDocumentField(
Pay)
Doc.drop_collection()
Doc(name=u"Wilson Junior",
pay=Pay(values=[150, 100])).save()
Doc(name=u"Isabella Luanna",
pay=Pay(values=[530, 100])).save()
Doc(name=u"Tayza mariana",
pay=Pay(values=[165, 100])).save()
Doc(name=u"Eliana Costa",
pay=Pay(values=[115, 100])).save()
self.assertEqual(
Doc.objects.sum('pay.values'),
1360)
def test_array_sum(self):
class Doc(Document):
values = ListField(DecimalField())
Doc.drop_collection()
Doc(values=[150, 100]).save()
Doc(values=[530, 100]).save()
Doc(values=[165, 100]).save()
Doc(values=[115, 100]).save()
self.assertEqual(
Doc.objects.sum('values'),
1360)
def test_distinct(self):
"""Ensure that the QuerySet.distinct method works.
"""
@@ -2250,6 +2714,27 @@ class QuerySetTest(unittest.TestCase):
Product.drop_collection()
def test_distinct_ListField_EmbeddedDocumentField(self):
class Author(EmbeddedDocument):
name = StringField()
class Book(Document):
title = StringField()
authors = ListField(EmbeddedDocumentField(Author))
Book.drop_collection()
mark_twain = Author(name="Mark Twain")
john_tolkien = Author(name="John Ronald Reuel Tolkien")
book = Book(title="Tom Sawyer", authors=[mark_twain]).save()
book = Book(title="The Lord of the Rings", authors=[john_tolkien]).save()
book = Book(title="The Stories", authors=[mark_twain, john_tolkien]).save()
authors = Book.objects.distinct("authors")
self.assertEqual(authors, [mark_twain, john_tolkien])
def test_custom_manager(self):
"""Ensure that custom QuerySetManager instances work as expected.
"""
@@ -2592,6 +3077,19 @@ class QuerySetTest(unittest.TestCase):
self.assertEqual(10, Post.objects.limit(5).skip(5).count(with_limit_and_skip=False))
def test_count_and_none(self):
"""Test count works with None()"""
class MyDoc(Document):
pass
MyDoc.drop_collection()
for i in xrange(0, 10):
MyDoc().save()
self.assertEqual(MyDoc.objects.count(), 10)
self.assertEqual(MyDoc.objects.none().count(), 0)
def test_call_after_limits_set(self):
"""Ensure that re-filtering after slicing works
"""
@@ -2654,6 +3152,23 @@ class QuerySetTest(unittest.TestCase):
Number.drop_collection()
def test_using(self):
"""Ensure that switching databases for a queryset is possible
"""
class Number2(Document):
n = IntField()
Number2.drop_collection()
with switch_db(Number2, 'test2') as Number2:
Number2.drop_collection()
for i in range(1, 10):
t = Number2(n=i)
t.switch_db('test2')
t.save()
self.assertEqual(len(Number2.objects.using('test2')), 9)
def test_unset_reference(self):
class Comment(Document):
text = StringField()
@@ -3048,7 +3563,7 @@ class QuerySetTest(unittest.TestCase):
class Foo(EmbeddedDocument):
shape = StringField()
color = StringField()
trick = BooleanField()
thick = BooleanField()
meta = {'allow_inheritance': False}
class Bar(Document):
@@ -3057,17 +3572,20 @@ class QuerySetTest(unittest.TestCase):
Bar.drop_collection()
b1 = Bar(foo=[Foo(shape= "square", color ="purple", thick = False),
Foo(shape= "circle", color ="red", thick = True)])
b1 = Bar(foo=[Foo(shape="square", color="purple", thick=False),
Foo(shape="circle", color="red", thick=True)])
b1.save()
b2 = Bar(foo=[Foo(shape= "square", color ="red", thick = True),
Foo(shape= "circle", color ="purple", thick = False)])
b2 = Bar(foo=[Foo(shape="square", color="red", thick=True),
Foo(shape="circle", color="purple", thick=False)])
b2.save()
ak = list(Bar.objects(foo__match={'shape': "square", "color": "purple"}))
self.assertEqual([b1], ak)
ak = list(Bar.objects(foo__match=Foo(shape="square", color="purple")))
self.assertEqual([b1], ak)
def test_upsert_includes_cls(self):
"""Upserts should include _cls information for inheritable classes
"""
@@ -3088,6 +3606,24 @@ class QuerySetTest(unittest.TestCase):
Test.objects(test='foo').update_one(upsert=True, set__test='foo')
self.assertTrue('_cls' in Test._collection.find_one())
def test_update_upsert_looks_like_a_digit(self):
class MyDoc(DynamicDocument):
pass
MyDoc.drop_collection()
self.assertEqual(1, MyDoc.objects.update_one(upsert=True, inc__47=1))
self.assertEqual(MyDoc.objects.get()['47'], 1)
def test_dictfield_key_looks_like_a_digit(self):
"""Only should work with DictField even if they have numeric keys."""
class MyDoc(Document):
test = DictField()
MyDoc.drop_collection()
doc = MyDoc(test={'47': 1})
doc.save()
self.assertEqual(MyDoc.objects.only('test__47').get().test['47'], 1)
def test_read_preference(self):
class Bar(Document):
pass
@@ -3116,7 +3652,7 @@ class QuerySetTest(unittest.TestCase):
Doc(string="Bye", embedded_field=Embedded(string="Bye")).save()
Doc().save()
json_data = Doc.objects.to_json()
json_data = Doc.objects.to_json(sort_keys=True, separators=(',', ':'))
doc_objects = list(Doc.objects)
self.assertEqual(doc_objects, Doc.objects.from_json(json_data))
@@ -3181,6 +3717,9 @@ class QuerySetTest(unittest.TestCase):
User(name="Bob Dole", age=89, price=Decimal('1.11')).save()
User(name="Barack Obama", age=51, price=Decimal('2.22')).save()
results = User.objects.only('id', 'name').as_pymongo()
self.assertEqual(sorted(results[0].keys()), sorted(['_id', 'name']))
users = User.objects.only('name', 'price').as_pymongo()
results = list(users)
self.assertTrue(isinstance(results[0], dict))
@@ -3241,6 +3780,8 @@ class QuerySetTest(unittest.TestCase):
self.assertTrue(isinstance(qs.first().organization, Organization))
self.assertFalse(isinstance(qs.no_dereference().first().organization,
Organization))
self.assertFalse(isinstance(qs.no_dereference().get().organization,
Organization))
self.assertTrue(isinstance(qs.first().organization, Organization))
def test_cached_queryset(self):
@@ -3257,7 +3798,13 @@ class QuerySetTest(unittest.TestCase):
[x for x in people]
self.assertEqual(100, len(people._result_cache))
self.assertEqual(None, people._len)
import platform
if platform.python_implementation() != "PyPy":
# PyPy evaluates __len__ when iterating with list comprehensions while CPython does not.
# This may be a bug in PyPy (PyPy/#1802) but it does not affect the behavior of MongoEngine.
self.assertEqual(None, people._len)
self.assertEqual(q, 1)
list(people)
@@ -3267,6 +3814,27 @@ class QuerySetTest(unittest.TestCase):
people.count() # count is cached
self.assertEqual(q, 1)
def test_no_cached_queryset(self):
class Person(Document):
name = StringField()
Person.drop_collection()
for i in xrange(100):
Person(name="No: %s" % i).save()
with query_counter() as q:
self.assertEqual(q, 0)
people = Person.objects.no_cache()
[x for x in people]
self.assertEqual(q, 1)
list(people)
self.assertEqual(q, 2)
people.count()
self.assertEqual(q, 3)
def test_cache_not_cloned(self):
class User(Document):
@@ -3288,6 +3856,34 @@ class QuerySetTest(unittest.TestCase):
self.assertEqual("%s" % users, "[<User: Bob>]")
self.assertEqual(1, len(users._result_cache))
def test_no_cache(self):
"""Ensure you can add meta data to file"""
class Noddy(Document):
fields = DictField()
Noddy.drop_collection()
for i in xrange(100):
noddy = Noddy()
for j in range(20):
noddy.fields["key"+str(j)] = "value "+str(j)
noddy.save()
docs = Noddy.objects.no_cache()
counter = len([1 for i in docs])
self.assertEqual(counter, 100)
self.assertEqual(len(list(docs)), 100)
self.assertRaises(TypeError, lambda: len(docs))
with query_counter() as q:
self.assertEqual(q, 0)
list(docs)
self.assertEqual(q, 1)
list(docs)
self.assertEqual(q, 2)
def test_nested_queryset_iterator(self):
# Try iterating the same queryset twice, nested.
names = ['Alice', 'Bob', 'Chuck', 'David', 'Eric', 'Francis', 'George']
@@ -3419,6 +4015,128 @@ class QuerySetTest(unittest.TestCase):
'_cls': 'Animal.Cat'
})
def test_can_have_field_same_name_as_query_operator(self):
class Size(Document):
name = StringField()
class Example(Document):
size = ReferenceField(Size)
Size.drop_collection()
Example.drop_collection()
instance_size = Size(name="Large").save()
Example(size=instance_size).save()
self.assertEqual(Example.objects(size=instance_size).count(), 1)
self.assertEqual(Example.objects(size__in=[instance_size]).count(), 1)
def test_cursor_in_an_if_stmt(self):
class Test(Document):
test_field = StringField()
Test.drop_collection()
queryset = Test.objects
if queryset:
raise AssertionError('Empty cursor returns True')
test = Test()
test.test_field = 'test'
test.save()
queryset = Test.objects
if not test:
raise AssertionError('Cursor has data and returned False')
queryset.next()
if not queryset:
raise AssertionError('Cursor has data and it must returns True,'
' even in the last item.')
def test_bool_performance(self):
class Person(Document):
name = StringField()
Person.drop_collection()
for i in xrange(100):
Person(name="No: %s" % i).save()
with query_counter() as q:
if Person.objects:
pass
self.assertEqual(q, 1)
op = q.db.system.profile.find({"ns":
{"$ne": "%s.system.indexes" % q.db.name}})[0]
self.assertEqual(op['nreturned'], 1)
def test_bool_with_ordering(self):
class Person(Document):
name = StringField()
Person.drop_collection()
Person(name="Test").save()
qs = Person.objects.order_by('name')
with query_counter() as q:
if qs:
pass
op = q.db.system.profile.find({"ns":
{"$ne": "%s.system.indexes" % q.db.name}})[0]
self.assertFalse('$orderby' in op['query'],
'BaseQuerySet cannot use orderby in if stmt')
with query_counter() as p:
for x in qs:
pass
op = p.db.system.profile.find({"ns":
{"$ne": "%s.system.indexes" % q.db.name}})[0]
self.assertTrue('$orderby' in op['query'],
'BaseQuerySet cannot remove orderby in for loop')
def test_bool_with_ordering_from_meta_dict(self):
class Person(Document):
name = StringField()
meta = {
'ordering': ['name']
}
Person.drop_collection()
Person(name="B").save()
Person(name="C").save()
Person(name="A").save()
with query_counter() as q:
if Person.objects:
pass
op = q.db.system.profile.find({"ns":
{"$ne": "%s.system.indexes" % q.db.name}})[0]
self.assertFalse('$orderby' in op['query'],
'BaseQuerySet must remove orderby from meta in boolen test')
self.assertEqual(Person.objects.first().name, 'A')
self.assertTrue(Person.objects._has_data(),
'Cursor has data and returned False')
if __name__ == '__main__':
unittest.main()

View File

@@ -31,6 +31,31 @@ class TransformTest(unittest.TestCase):
self.assertEqual(transform.query(name__exists=True),
{'name': {'$exists': True}})
def test_transform_update(self):
class DicDoc(Document):
dictField = DictField()
class Doc(Document):
pass
DicDoc.drop_collection()
Doc.drop_collection()
doc = Doc().save()
dic_doc = DicDoc().save()
for k, v in (("set", "$set"), ("set_on_insert", "$setOnInsert"), ("push", "$push")):
update = transform.update(DicDoc, **{"%s__dictField__test" % k: doc})
self.assertTrue(isinstance(update[v]["dictField.test"], dict))
# Update special cases
update = transform.update(DicDoc, unset__dictField__test=doc)
self.assertEqual(update["$unset"]["dictField.test"], 1)
update = transform.update(DicDoc, pull__dictField__test=doc)
self.assertTrue(isinstance(update["$pull"]["dictField"]["test"], dict))
def test_query_field_name(self):
"""Ensure that the correct field name is used when querying.
"""
@@ -142,6 +167,35 @@ class TransformTest(unittest.TestCase):
{'attachments.views.extracted': 'no'}]}
self.assertEqual(expected, raw_query)
def test_geojson_PointField(self):
class Location(Document):
loc = PointField()
update = transform.update(Location, set__loc=[1, 2])
self.assertEqual(update, {'$set': {'loc': {"type": "Point", "coordinates": [1,2]}}})
update = transform.update(Location, set__loc={"type": "Point", "coordinates": [1,2]})
self.assertEqual(update, {'$set': {'loc': {"type": "Point", "coordinates": [1,2]}}})
def test_geojson_LineStringField(self):
class Location(Document):
line = LineStringField()
update = transform.update(Location, set__line=[[1, 2], [2, 2]])
self.assertEqual(update, {'$set': {'line': {"type": "LineString", "coordinates": [[1, 2], [2, 2]]}}})
update = transform.update(Location, set__line={"type": "LineString", "coordinates": [[1, 2], [2, 2]]})
self.assertEqual(update, {'$set': {'line': {"type": "LineString", "coordinates": [[1, 2], [2, 2]]}}})
def test_geojson_PolygonField(self):
class Location(Document):
poly = PolygonField()
update = transform.update(Location, set__poly=[[[40, 5], [40, 6], [41, 6], [40, 5]]])
self.assertEqual(update, {'$set': {'poly': {"type": "Polygon", "coordinates": [[[40, 5], [40, 6], [41, 6], [40, 5]]]}}})
update = transform.update(Location, set__poly={"type": "Polygon", "coordinates": [[[40, 5], [40, 6], [41, 6], [40, 5]]]})
self.assertEqual(update, {'$set': {'poly': {"type": "Polygon", "coordinates": [[[40, 5], [40, 6], [41, 6], [40, 5]]]}}})
if __name__ == '__main__':
unittest.main()

View File

@@ -1,6 +1,11 @@
import sys
sys.path[0:0] = [""]
import unittest
try:
import unittest2 as unittest
except ImportError:
import unittest
import datetime
import pymongo
@@ -34,6 +39,17 @@ class ConnectionTest(unittest.TestCase):
conn = get_connection('testdb')
self.assertTrue(isinstance(conn, pymongo.mongo_client.MongoClient))
def test_sharing_connections(self):
"""Ensure that connections are shared when the connection settings are exactly the same
"""
connect('mongoenginetest', alias='testdb1')
expected_connection = get_connection('testdb1')
connect('mongoenginetest', alias='testdb2')
actual_connection = get_connection('testdb2')
self.assertIs(expected_connection, actual_connection)
def test_connect_uri(self):
"""Ensure that the connect() method works properly with uri's
"""
@@ -56,6 +72,35 @@ class ConnectionTest(unittest.TestCase):
self.assertTrue(isinstance(db, pymongo.database.Database))
self.assertEqual(db.name, 'mongoenginetest')
c.admin.system.users.remove({})
c.mongoenginetest.system.users.remove({})
def test_connect_uri_without_db(self):
"""Ensure that the connect() method works properly with uri's
without database_name
"""
c = connect(db='mongoenginetest', alias='admin')
c.admin.system.users.remove({})
c.mongoenginetest.system.users.remove({})
c.admin.add_user("admin", "password")
c.admin.authenticate("admin", "password")
c.mongoenginetest.add_user("username", "password")
self.assertRaises(ConnectionError, connect, "testdb_uri_bad", host='mongodb://test:password@localhost')
connect("mongoenginetest", host='mongodb://localhost/')
conn = get_connection()
self.assertTrue(isinstance(conn, pymongo.mongo_client.MongoClient))
db = get_db()
self.assertTrue(isinstance(db, pymongo.database.Database))
self.assertEqual(db.name, 'mongoenginetest')
c.admin.system.users.remove({})
c.mongoenginetest.system.users.remove({})
def test_register_connection(self):
"""Ensure that connections with different aliases may be registered.
"""
@@ -69,6 +114,14 @@ class ConnectionTest(unittest.TestCase):
self.assertTrue(isinstance(db, pymongo.database.Database))
self.assertEqual(db.name, 'mongoenginetest2')
def test_register_connection_defaults(self):
"""Ensure that defaults are used when the host and port are None.
"""
register_connection('testdb', 'mongoenginetest', host=None, port=None)
conn = get_connection('testdb')
self.assertTrue(isinstance(conn, pymongo.mongo_client.MongoClient))
def test_connection_kwargs(self):
"""Ensure that connection kwargs get passed to pymongo.
"""

View File

@@ -0,0 +1,107 @@
import unittest
from mongoengine.base.datastructures import StrictDict, SemiStrictDict
class TestStrictDict(unittest.TestCase):
def strict_dict_class(self, *args, **kwargs):
return StrictDict.create(*args, **kwargs)
def setUp(self):
self.dtype = self.strict_dict_class(("a", "b", "c"))
def test_init(self):
d = self.dtype(a=1, b=1, c=1)
self.assertEqual((d.a, d.b, d.c), (1, 1, 1))
def test_init_fails_on_nonexisting_attrs(self):
self.assertRaises(AttributeError, lambda: self.dtype(a=1, b=2, d=3))
def test_eq(self):
d = self.dtype(a=1, b=1, c=1)
dd = self.dtype(a=1, b=1, c=1)
e = self.dtype(a=1, b=1, c=3)
f = self.dtype(a=1, b=1)
g = self.strict_dict_class(("a", "b", "c", "d"))(a=1, b=1, c=1, d=1)
h = self.strict_dict_class(("a", "c", "b"))(a=1, b=1, c=1)
i = self.strict_dict_class(("a", "c", "b"))(a=1, b=1, c=2)
self.assertEqual(d, dd)
self.assertNotEqual(d, e)
self.assertNotEqual(d, f)
self.assertNotEqual(d, g)
self.assertNotEqual(f, d)
self.assertEqual(d, h)
self.assertNotEqual(d, i)
def test_setattr_getattr(self):
d = self.dtype()
d.a = 1
self.assertEqual(d.a, 1)
self.assertRaises(AttributeError, lambda: d.b)
def test_setattr_raises_on_nonexisting_attr(self):
d = self.dtype()
def _f():
d.x=1
self.assertRaises(AttributeError, _f)
def test_setattr_getattr_special(self):
d = self.strict_dict_class(["items"])
d.items = 1
self.assertEqual(d.items, 1)
def test_get(self):
d = self.dtype(a=1)
self.assertEqual(d.get('a'), 1)
self.assertEqual(d.get('b', 'bla'), 'bla')
def test_items(self):
d = self.dtype(a=1)
self.assertEqual(d.items(), [('a', 1)])
d = self.dtype(a=1, b=2)
self.assertEqual(d.items(), [('a', 1), ('b', 2)])
def test_mappings_protocol(self):
d = self.dtype(a=1, b=2)
assert dict(d) == {'a': 1, 'b': 2}
assert dict(**d) == {'a': 1, 'b': 2}
class TestSemiSrictDict(TestStrictDict):
def strict_dict_class(self, *args, **kwargs):
return SemiStrictDict.create(*args, **kwargs)
def test_init_fails_on_nonexisting_attrs(self):
# disable irrelevant test
pass
def test_setattr_raises_on_nonexisting_attr(self):
# disable irrelevant test
pass
def test_setattr_getattr_nonexisting_attr_succeeds(self):
d = self.dtype()
d.x = 1
self.assertEqual(d.x, 1)
def test_init_succeeds_with_nonexisting_attrs(self):
d = self.dtype(a=1, b=1, c=1, x=2)
self.assertEqual((d.a, d.b, d.c, d.x), (1, 1, 1, 2))
def test_iter_with_nonexisting_attrs(self):
d = self.dtype(a=1, b=1, c=1, x=2)
self.assertEqual(list(d), ['a', 'b', 'c', 'x'])
def test_iteritems_with_nonexisting_attrs(self):
d = self.dtype(a=1, b=1, c=1, x=2)
self.assertEqual(list(d.iteritems()), [('a', 1), ('b', 1), ('c', 1), ('x', 2)])
def tets_cmp_with_strict_dicts(self):
d = self.dtype(a=1, b=1, c=1)
dd = StrictDict.create(("a", "b", "c"))(a=1, b=1, c=1)
self.assertEqual(d, dd)
def test_cmp_with_strict_dict_with_nonexisting_attrs(self):
d = self.dtype(a=1, b=1, c=1, x=2)
dd = StrictDict.create(("a", "b", "c", "x"))(a=1, b=1, c=1, x=2)
self.assertEqual(d, dd)
if __name__ == '__main__':
unittest.main()

View File

@@ -1145,37 +1145,32 @@ class FieldTest(unittest.TestCase):
self.assertEqual(q, 2)
def test_tuples_as_tuples(self):
"""
Ensure that tuples remain tuples when they are
inside a ComplexBaseField
"""
from mongoengine.base import BaseField
def test_objectid_reference_across_databases(self):
# mongoenginetest - Is default connection alias from setUp()
# Register Aliases
register_connection('testdb-1', 'mongoenginetest2')
class EnumField(BaseField):
class User(Document):
name = StringField()
meta = {"db_alias": "testdb-1"}
def __init__(self, **kwargs):
super(EnumField, self).__init__(**kwargs)
class Book(Document):
name = StringField()
author = ReferenceField(User)
def to_mongo(self, value):
return value
# Drops
User.drop_collection()
Book.drop_collection()
def to_python(self, value):
return tuple(value)
user = User(name="Ross").save()
Book(name="MongoEngine for pros", author=user).save()
class TestDoc(Document):
items = ListField(EnumField())
# Can't use query_counter across databases - so test the _data object
book = Book.objects.first()
self.assertFalse(isinstance(book._data['author'], User))
TestDoc.drop_collection()
tuples = [(100, 'Testing')]
doc = TestDoc()
doc.items = tuples
doc.save()
x = TestDoc.objects().get()
self.assertTrue(x is not None)
self.assertTrue(len(x.items) == 1)
self.assertTrue(tuple(x.items[0]) in tuples)
self.assertTrue(x.items[0] in tuples)
book.select_related()
self.assertTrue(isinstance(book._data['author'], User))
def test_non_ascii_pk(self):
"""
@@ -1200,6 +1195,30 @@ class FieldTest(unittest.TestCase):
self.assertEqual(2, len([brand for bg in brand_groups for brand in bg.brands]))
def test_dereferencing_embedded_listfield_referencefield(self):
class Tag(Document):
meta = {'collection': 'tags'}
name = StringField()
class Post(EmbeddedDocument):
body = StringField()
tags = ListField(ReferenceField("Tag", dbref=True))
class Page(Document):
meta = {'collection': 'pages'}
tags = ListField(ReferenceField("Tag", dbref=True))
posts = ListField(EmbeddedDocumentField(Post))
Tag.drop_collection()
Page.drop_collection()
tag = Tag(name='test').save()
post = Post(body='test body', tags=[tag])
Page(tags=[tag], posts=[post]).save()
page = Page.objects.first()
self.assertEqual(page.tags[0], page.posts[0].tags[0])
if __name__ == '__main__':
unittest.main()

View File

@@ -2,48 +2,45 @@ import sys
sys.path[0:0] = [""]
import unittest
from nose.plugins.skip import SkipTest
from mongoengine.python_support import PY3
from mongoengine import *
from mongoengine.django.shortcuts import get_document_or_404
from django.http import Http404
from django.template import Context, Template
from django.conf import settings
from django.core.paginator import Paginator
settings.configure(
USE_TZ=True,
INSTALLED_APPS=('django.contrib.auth', 'mongoengine.django.mongo_auth'),
AUTH_USER_MODEL=('mongo_auth.MongoUser'),
AUTHENTICATION_BACKENDS = ('mongoengine.django.auth.MongoEngineBackend',)
)
try:
from mongoengine.django.shortcuts import get_document_or_404
from django.http import Http404
from django.template import Context, Template
from django.conf import settings
from django.core.paginator import Paginator
settings.configure(
USE_TZ=True,
INSTALLED_APPS=('django.contrib.auth', 'mongoengine.django.mongo_auth'),
AUTH_USER_MODEL=('mongo_auth.MongoUser'),
from django.contrib.auth import authenticate, get_user_model
from mongoengine.django.auth import User
from mongoengine.django.mongo_auth.models import (
MongoUser,
MongoUserManager,
get_user_document,
)
try:
from django.contrib.auth import authenticate, get_user_model
from mongoengine.django.auth import User
from mongoengine.django.mongo_auth.models import MongoUser, MongoUserManager
DJ15 = True
except Exception:
DJ15 = False
from django.contrib.sessions.tests import SessionTestsMixin
from mongoengine.django.sessions import SessionStore, MongoSession
except Exception, err:
if PY3:
SessionTestsMixin = type # dummy value so no error
SessionStore = None # dummy value so no error
else:
raise err
DJ15 = True
except Exception:
DJ15 = False
from django.contrib.sessions.tests import SessionTestsMixin
from mongoengine.django.sessions import SessionStore, MongoSession
from datetime import tzinfo, timedelta
ZERO = timedelta(0)
class FixedOffset(tzinfo):
"""Fixed offset in minutes east from UTC."""
def __init__(self, offset, name):
self.__offset = timedelta(minutes = offset)
self.__offset = timedelta(minutes=offset)
self.__name = name
def utcoffset(self, dt):
@@ -70,8 +67,6 @@ def activate_timezone(tz):
class QuerySetTest(unittest.TestCase):
def setUp(self):
if PY3:
raise SkipTest('django does not have Python 3 support')
connect(db='mongoenginetest')
class Person(Document):
@@ -173,6 +168,8 @@ class QuerySetTest(unittest.TestCase):
class Note(Document):
text = StringField()
Note.drop_collection()
for i in xrange(1, 101):
Note(name="Note: %s" % i).save()
@@ -223,8 +220,6 @@ class MongoDBSessionTest(SessionTestsMixin, unittest.TestCase):
backend = SessionStore
def setUp(self):
if PY3:
raise SkipTest('django does not have Python 3 support')
connect(db='mongoenginetest')
MongoSession.drop_collection()
super(MongoDBSessionTest, self).setUp()
@@ -262,17 +257,18 @@ class MongoAuthTest(unittest.TestCase):
}
def setUp(self):
if PY3:
raise SkipTest('django does not have Python 3 support')
if not DJ15:
raise SkipTest('mongo_auth requires Django 1.5')
connect(db='mongoenginetest')
User.drop_collection()
super(MongoAuthTest, self).setUp()
def test_user_model(self):
def test_get_user_model(self):
self.assertEqual(get_user_model(), MongoUser)
def test_get_user_document(self):
self.assertEqual(get_user_document(), User)
def test_user_manager(self):
manager = get_user_model()._default_manager
self.assertTrue(isinstance(manager, MongoUserManager))

View File

@@ -54,7 +54,9 @@ class SignalTests(unittest.TestCase):
@classmethod
def post_save(cls, sender, document, **kwargs):
dirty_keys = document._delta()[0].keys() + document._delta()[1].keys()
signal_output.append('post_save signal, %s' % document)
signal_output.append('post_save dirty keys, %s' % dirty_keys)
if 'created' in kwargs:
if kwargs['created']:
signal_output.append('Is created')
@@ -203,6 +205,7 @@ class SignalTests(unittest.TestCase):
"pre_save_post_validation signal, Bill Shakespeare",
"Is created",
"post_save signal, Bill Shakespeare",
"post_save dirty keys, ['name']",
"Is created"
])
@@ -213,6 +216,7 @@ class SignalTests(unittest.TestCase):
"pre_save_post_validation signal, William Shakespeare",
"Is updated",
"post_save signal, William Shakespeare",
"post_save dirty keys, ['name']",
"Is updated"
])