Merge branch 'master' of github.com:MongoEngine/mongoengine into py2py3_improve_compat

This commit is contained in:
Bastien Gérard
2020-03-08 14:44:39 +01:00
23 changed files with 534 additions and 302 deletions

View File

@@ -41,7 +41,7 @@ from tests.utils import MongoDBTestCase, get_as_pymongo
TEST_IMAGE_PATH = os.path.join(os.path.dirname(__file__), "../fields/mongoengine.png")
class TestInstance(MongoDBTestCase):
class TestDocumentInstance(MongoDBTestCase):
def setUp(self):
class Job(EmbeddedDocument):
name = StringField()
@@ -3319,6 +3319,39 @@ class TestInstance(MongoDBTestCase):
f1.ref # Dereferences lazily
assert f1 == f2
def test_embedded_document_equality_with_lazy_ref(self):
class Job(EmbeddedDocument):
boss = LazyReferenceField("Person")
boss_dbref = LazyReferenceField("Person", dbref=True)
class Person(Document):
job = EmbeddedDocumentField(Job)
Person.drop_collection()
boss = Person()
worker = Person(job=Job(boss=boss, boss_dbref=boss))
boss.save()
worker.save()
worker1 = Person.objects.get(id=worker.id)
# worker1.job should be equal to the job used originally to create the
# document.
assert worker1.job == worker.job
# worker1.job should be equal to a newly created Job EmbeddedDocument
# using either the Boss object or his ID.
assert worker1.job == Job(boss=boss, boss_dbref=boss)
assert worker1.job == Job(boss=boss.id, boss_dbref=boss.id)
# The above equalities should also hold after worker1.job.boss has been
# fetch()ed.
worker1.job.boss.fetch()
assert worker1.job == worker.job
assert worker1.job == Job(boss=boss, boss_dbref=boss)
assert worker1.job == Job(boss=boss.id, boss_dbref=boss.id)
def test_dbref_equality(self):
class Test2(Document):
name = StringField()
@@ -3584,6 +3617,51 @@ class TestInstance(MongoDBTestCase):
assert b._instance == a
assert idx == 2
def test_updating_listfield_manipulate_list(self):
class Company(Document):
name = StringField()
employees = ListField(field=DictField())
Company.drop_collection()
comp = Company(name="BigBank", employees=[{"name": "John"}])
comp.save()
comp.employees.append({"name": "Bill"})
comp.save()
stored_comp = get_as_pymongo(comp)
self.assertEqual(
stored_comp,
{
"_id": comp.id,
"employees": [{"name": "John"}, {"name": "Bill"}],
"name": "BigBank",
},
)
comp = comp.reload()
comp.employees[0]["color"] = "red"
comp.employees[-1]["color"] = "blue"
comp.employees[-1].update({"size": "xl"})
comp.save()
assert len(comp.employees) == 2
assert comp.employees[0] == {"name": "John", "color": "red"}
assert comp.employees[1] == {"name": "Bill", "size": "xl", "color": "blue"}
stored_comp = get_as_pymongo(comp)
self.assertEqual(
stored_comp,
{
"_id": comp.id,
"employees": [
{"name": "John", "color": "red"},
{"size": "xl", "color": "blue", "name": "Bill"},
],
"name": "BigBank",
},
)
def test_falsey_pk(self):
"""Ensure that we can create and update a document with Falsey PK."""
@@ -3660,13 +3738,13 @@ class TestInstance(MongoDBTestCase):
value = u"I_should_be_a_dict"
coll.insert_one({"light_saber": value})
with self.assertRaises(InvalidDocumentError) as cm:
with pytest.raises(InvalidDocumentError) as exc_info:
list(Jedi.objects)
self.assertEqual(
str(cm.exception),
"Invalid data to create a `Jedi` instance.\nField 'light_saber' - The source SON object needs to be of type 'dict' but a '%s' was found"
% type(value),
assert str(
exc_info.value
) == "Invalid data to create a `Jedi` instance.\nField 'light_saber' - The source SON object needs to be of type 'dict' but a '%s' was found" % type(
value
)

View File

@@ -3,6 +3,7 @@ import pytest
from mongoengine import *
from mongoengine.base import BaseDict
from mongoengine.mongodb_support import MONGODB_36, get_mongodb_version
from tests.utils import MongoDBTestCase, get_as_pymongo
@@ -43,11 +44,7 @@ class TestDictField(MongoDBTestCase):
with pytest.raises(ValidationError):
post.validate()
post.info = {"the.title": "test"}
with pytest.raises(ValidationError):
post.validate()
post.info = {"nested": {"the.title": "test"}}
post.info = {"$title.test": "test"}
with pytest.raises(ValidationError):
post.validate()
@@ -55,6 +52,20 @@ class TestDictField(MongoDBTestCase):
with pytest.raises(ValidationError):
post.validate()
post.info = {"nested": {"the.title": "test"}}
if get_mongodb_version() < MONGODB_36:
with pytest.raises(ValidationError):
post.validate()
else:
post.validate()
post.info = {"dollar_and_dot": {"te$st.test": "test"}}
if get_mongodb_version() < MONGODB_36:
with pytest.raises(ValidationError):
post.validate()
else:
post.validate()
post.info = {"title": "test"}
post.save()

View File

@@ -151,7 +151,7 @@ class TestFileField(MongoDBTestCase):
result = StreamFile.objects.first()
assert streamfile == result
assert result.the_file.read() == text + more_text
# self.assertEqual(result.the_file.content_type, content_type)
# assert result.the_file.content_type == content_type
result.the_file.seek(0)
assert result.the_file.tell() == 0
assert result.the_file.read(len(text)) == text

View File

@@ -14,7 +14,7 @@ import six
from six import iteritems
from mongoengine import *
from mongoengine.connection import get_connection, get_db
from mongoengine.connection import get_db
from mongoengine.context_managers import query_counter, switch_db
from mongoengine.errors import InvalidQueryError
from mongoengine.mongodb_support import MONGODB_36, get_mongodb_version
@@ -4658,21 +4658,6 @@ class TestQueryset(unittest.TestCase):
)
assert_read_pref(bars, ReadPreference.SECONDARY_PREFERRED)
def test_read_preference_aggregation_framework(self):
class Bar(Document):
txt = StringField()
meta = {"indexes": ["txt"]}
# Aggregates with read_preference
bars = Bar.objects.read_preference(
ReadPreference.SECONDARY_PREFERRED
).aggregate()
assert (
bars._CommandCursor__collection.read_preference
== ReadPreference.SECONDARY_PREFERRED
)
def test_json_simple(self):
class Embedded(EmbeddedDocument):
string = StringField()
@@ -5399,225 +5384,6 @@ class TestQueryset(unittest.TestCase):
assert Person.objects.first().name == "A"
assert Person.objects._has_data(), "Cursor has data and returned False"
def test_queryset_aggregation_framework(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
data = Person.objects(age__lte=22).aggregate(
{"$project": {"name": {"$toUpper": "$name"}}}
)
assert list(data) == [
{"_id": p1.pk, "name": "ISABELLA LUANNA"},
{"_id": p2.pk, "name": "WILSON JUNIOR"},
]
data = (
Person.objects(age__lte=22)
.order_by("-name")
.aggregate({"$project": {"name": {"$toUpper": "$name"}}})
)
assert list(data) == [
{"_id": p2.pk, "name": "WILSON JUNIOR"},
{"_id": p1.pk, "name": "ISABELLA LUANNA"},
]
data = (
Person.objects(age__gte=17, age__lte=40)
.order_by("-age")
.aggregate(
{"$group": {"_id": None, "total": {"$sum": 1}, "avg": {"$avg": "$age"}}}
)
)
assert list(data) == [{"_id": None, "avg": 29, "total": 2}]
data = Person.objects().aggregate({"$match": {"name": "Isabella Luanna"}})
assert list(data) == [{u"_id": p1.pk, u"age": 16, u"name": u"Isabella Luanna"}]
def test_queryset_aggregation_with_skip(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
data = Person.objects.skip(1).aggregate(
{"$project": {"name": {"$toUpper": "$name"}}}
)
assert list(data) == [
{"_id": p2.pk, "name": "WILSON JUNIOR"},
{"_id": p3.pk, "name": "SANDRA MARA"},
]
def test_queryset_aggregation_with_limit(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
data = Person.objects.limit(1).aggregate(
{"$project": {"name": {"$toUpper": "$name"}}}
)
assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}]
def test_queryset_aggregation_with_sort(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
data = Person.objects.order_by("name").aggregate(
{"$project": {"name": {"$toUpper": "$name"}}}
)
assert list(data) == [
{"_id": p1.pk, "name": "ISABELLA LUANNA"},
{"_id": p3.pk, "name": "SANDRA MARA"},
{"_id": p2.pk, "name": "WILSON JUNIOR"},
]
def test_queryset_aggregation_with_skip_with_limit(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
data = list(
Person.objects.skip(1)
.limit(1)
.aggregate({"$project": {"name": {"$toUpper": "$name"}}})
)
assert list(data) == [{"_id": p2.pk, "name": "WILSON JUNIOR"}]
# Make sure limit/skip chaining order has no impact
data2 = (
Person.objects.limit(1)
.skip(1)
.aggregate({"$project": {"name": {"$toUpper": "$name"}}})
)
assert data == list(data2)
def test_queryset_aggregation_with_sort_with_limit(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
data = (
Person.objects.order_by("name")
.limit(2)
.aggregate({"$project": {"name": {"$toUpper": "$name"}}})
)
assert list(data) == [
{"_id": p1.pk, "name": "ISABELLA LUANNA"},
{"_id": p3.pk, "name": "SANDRA MARA"},
]
# Verify adding limit/skip steps works as expected
data = (
Person.objects.order_by("name")
.limit(2)
.aggregate({"$project": {"name": {"$toUpper": "$name"}}}, {"$limit": 1})
)
assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}]
data = (
Person.objects.order_by("name")
.limit(2)
.aggregate(
{"$project": {"name": {"$toUpper": "$name"}}},
{"$skip": 1},
{"$limit": 1},
)
)
assert list(data) == [{"_id": p3.pk, "name": "SANDRA MARA"}]
def test_queryset_aggregation_with_sort_with_skip(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
data = (
Person.objects.order_by("name")
.skip(2)
.aggregate({"$project": {"name": {"$toUpper": "$name"}}})
)
assert list(data) == [{"_id": p2.pk, "name": "WILSON JUNIOR"}]
def test_queryset_aggregation_with_sort_with_skip_with_limit(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
data = (
Person.objects.order_by("name")
.skip(1)
.limit(1)
.aggregate({"$project": {"name": {"$toUpper": "$name"}}})
)
assert list(data) == [{"_id": p3.pk, "name": "SANDRA MARA"}]
def test_delete_count(self):
[self.Person(name="User {0}".format(i), age=i * 10).save() for i in range(1, 4)]
assert (

View File

@@ -0,0 +1,255 @@
# -*- coding: utf-8 -*-
import unittest
import warnings
from pymongo.read_preferences import ReadPreference
from mongoengine import *
from tests.utils import MongoDBTestCase
class TestQuerysetAggregate(MongoDBTestCase):
def test_read_preference_aggregation_framework(self):
class Bar(Document):
txt = StringField()
meta = {"indexes": ["txt"]}
# Aggregates with read_preference
pipeline = []
bars = Bar.objects.read_preference(
ReadPreference.SECONDARY_PREFERRED
).aggregate(pipeline)
assert (
bars._CommandCursor__collection.read_preference
== ReadPreference.SECONDARY_PREFERRED
)
def test_queryset_aggregation_framework(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
data = Person.objects(age__lte=22).aggregate(pipeline)
assert list(data) == [
{"_id": p1.pk, "name": "ISABELLA LUANNA"},
{"_id": p2.pk, "name": "WILSON JUNIOR"},
]
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
data = Person.objects(age__lte=22).order_by("-name").aggregate(pipeline)
assert list(data) == [
{"_id": p2.pk, "name": "WILSON JUNIOR"},
{"_id": p1.pk, "name": "ISABELLA LUANNA"},
]
pipeline = [
{"$group": {"_id": None, "total": {"$sum": 1}, "avg": {"$avg": "$age"}}}
]
data = (
Person.objects(age__gte=17, age__lte=40)
.order_by("-age")
.aggregate(pipeline)
)
assert list(data) == [{"_id": None, "avg": 29, "total": 2}]
pipeline = [{"$match": {"name": "Isabella Luanna"}}]
data = Person.objects().aggregate(pipeline)
assert list(data) == [{u"_id": p1.pk, u"age": 16, u"name": u"Isabella Luanna"}]
def test_queryset_aggregation_with_skip(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
data = Person.objects.skip(1).aggregate(pipeline)
assert list(data) == [
{"_id": p2.pk, "name": "WILSON JUNIOR"},
{"_id": p3.pk, "name": "SANDRA MARA"},
]
def test_queryset_aggregation_with_limit(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
data = Person.objects.limit(1).aggregate(pipeline)
assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}]
def test_queryset_aggregation_with_sort(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
data = Person.objects.order_by("name").aggregate(pipeline)
assert list(data) == [
{"_id": p1.pk, "name": "ISABELLA LUANNA"},
{"_id": p3.pk, "name": "SANDRA MARA"},
{"_id": p2.pk, "name": "WILSON JUNIOR"},
]
def test_queryset_aggregation_with_skip_with_limit(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
data = list(Person.objects.skip(1).limit(1).aggregate(pipeline))
assert list(data) == [{"_id": p2.pk, "name": "WILSON JUNIOR"}]
# Make sure limit/skip chaining order has no impact
data2 = Person.objects.limit(1).skip(1).aggregate(pipeline)
assert data == list(data2)
def test_queryset_aggregation_with_sort_with_limit(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
data = Person.objects.order_by("name").limit(2).aggregate(pipeline)
assert list(data) == [
{"_id": p1.pk, "name": "ISABELLA LUANNA"},
{"_id": p3.pk, "name": "SANDRA MARA"},
]
# Verify adding limit/skip steps works as expected
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}, {"$limit": 1}]
data = Person.objects.order_by("name").limit(2).aggregate(pipeline)
assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}]
pipeline = [
{"$project": {"name": {"$toUpper": "$name"}}},
{"$skip": 1},
{"$limit": 1},
]
data = Person.objects.order_by("name").limit(2).aggregate(pipeline)
assert list(data) == [{"_id": p3.pk, "name": "SANDRA MARA"}]
def test_queryset_aggregation_with_sort_with_skip(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
data = Person.objects.order_by("name").skip(2).aggregate(pipeline)
assert list(data) == [{"_id": p2.pk, "name": "WILSON JUNIOR"}]
def test_queryset_aggregation_with_sort_with_skip_with_limit(self):
class Person(Document):
name = StringField()
age = IntField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna", age=16)
p2 = Person(name="Wilson Junior", age=21)
p3 = Person(name="Sandra Mara", age=37)
Person.objects.insert([p1, p2, p3])
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
data = Person.objects.order_by("name").skip(1).limit(1).aggregate(pipeline)
assert list(data) == [{"_id": p3.pk, "name": "SANDRA MARA"}]
def test_queryset_aggregation_deprecated_interface(self):
class Person(Document):
name = StringField()
Person.drop_collection()
p1 = Person(name="Isabella Luanna")
p2 = Person(name="Wilson Junior")
p3 = Person(name="Sandra Mara")
Person.objects.insert([p1, p2, p3])
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
# Make sure a warning is emitted
with warnings.catch_warnings():
warnings.simplefilter("error", DeprecationWarning)
with self.assertRaises(DeprecationWarning):
Person.objects.order_by("name").limit(2).aggregate(*pipeline)
# Make sure old interface works as expected with a 1-step pipeline
data = Person.objects.order_by("name").limit(2).aggregate(*pipeline)
assert list(data) == [
{"_id": p1.pk, "name": "ISABELLA LUANNA"},
{"_id": p3.pk, "name": "SANDRA MARA"},
]
# Make sure old interface works as expected with a 2-steps pipeline
pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}, {"$limit": 1}]
data = Person.objects.order_by("name").limit(2).aggregate(*pipeline)
assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}]
if __name__ == "__main__":
unittest.main()

View File

@@ -24,6 +24,12 @@ class TestTransform(unittest.TestCase):
}
assert transform.query(friend__age__gte=30) == {"friend.age": {"$gte": 30}}
assert transform.query(name__exists=True) == {"name": {"$exists": True}}
assert transform.query(name=["Mark"], __raw__={"name": {"$in": "Tom"}}) == {
"$and": [{"name": ["Mark"]}, {"name": {"$in": "Tom"}}]
}
assert transform.query(name__in=["Tom"], __raw__={"name": "Mark"}) == {
"$and": [{"name": {"$in": ["Tom"]}}, {"name": "Mark"}]
}
def test_transform_update(self):
class LisDoc(Document):