mongoengine/tests/queryset/test_queryset_aggregation.py
2021-02-23 21:49:45 +01:00

282 lines
9.4 KiB
Python

import unittest
import warnings
from pymongo.read_preferences import ReadPreference
from mongoengine import *
from tests.utils import MongoDBTestCase
class TestQuerysetAggregate(MongoDBTestCase):
    """Tests for ``QuerySet.aggregate``.

    Each test declares its own documents, inserts a small fixture and compares
    the raw dicts yielded by the aggregation cursor with the expected result.
    The tests cover how queryset modifiers (filters, ``order_by``, ``skip``,
    ``limit``, ``read_preference``) combine with a user supplied pipeline.
    """

    @staticmethod
    def _insert_people():
        """Create an empty ``Person`` collection and insert three people.

        Returns:
            A 4-tuple ``(Person, p1, p2, p3)``: the document class followed
            by the inserted documents, in insertion order.
        """

        class Person(Document):
            name = StringField()
            age = IntField()

        Person.drop_collection()

        p1 = Person(name="Isabella Luanna", age=16)
        p2 = Person(name="Wilson Junior", age=21)
        p3 = Person(name="Sandra Mara", age=37)
        Person.objects.insert([p1, p2, p3])
        return Person, p1, p2, p3

    def test_read_preference_aggregation_framework(self):
        """``read_preference`` set on the queryset is used by ``aggregate``."""

        class Bar(Document):
            txt = StringField()

            meta = {"indexes": ["txt"]}

        # Aggregates with read_preference
        pipeline = []
        bars = Bar.objects.read_preference(
            ReadPreference.SECONDARY_PREFERRED
        ).aggregate(pipeline)
        # NOTE(review): this reads a name-mangled private attribute of the
        # returned cursor (presumably pymongo's CommandCursor); it may break
        # with other pymongo versions -- confirm when upgrading.
        assert (
            bars._CommandCursor__collection.read_preference
            == ReadPreference.SECONDARY_PREFERRED
        )

    def test_queryset_aggregation_framework(self):
        """Filters and ordering of the queryset are applied to the pipeline."""
        Person, p1, p2, _ = self._insert_people()

        upper_name = [{"$project": {"name": {"$toUpper": "$name"}}}]

        # Queryset filter only
        data = Person.objects(age__lte=22).aggregate(upper_name)
        assert list(data) == [
            {"_id": p1.pk, "name": "ISABELLA LUANNA"},
            {"_id": p2.pk, "name": "WILSON JUNIOR"},
        ]

        # Queryset filter + descending sort on name
        data = Person.objects(age__lte=22).order_by("-name").aggregate(upper_name)
        assert list(data) == [
            {"_id": p2.pk, "name": "WILSON JUNIOR"},
            {"_id": p1.pk, "name": "ISABELLA LUANNA"},
        ]

        # $group over the documents selected by the queryset filter
        pipeline = [
            {"$group": {"_id": None, "total": {"$sum": 1}, "avg": {"$avg": "$age"}}}
        ]
        data = (
            Person.objects(age__gte=17, age__lte=40)
            .order_by("-age")
            .aggregate(pipeline)
        )
        assert list(data) == [{"_id": None, "avg": 29, "total": 2}]

        # $match stage on an unfiltered queryset
        pipeline = [{"$match": {"name": "Isabella Luanna"}}]
        data = Person.objects().aggregate(pipeline)
        assert list(data) == [{"_id": p1.pk, "age": 16, "name": "Isabella Luanna"}]

    def test_queryset_aggregation_with_skip(self):
        """``skip`` on the queryset drops the leading documents."""
        Person, _, p2, p3 = self._insert_people()

        pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
        data = Person.objects.skip(1).aggregate(pipeline)

        assert list(data) == [
            {"_id": p2.pk, "name": "WILSON JUNIOR"},
            {"_id": p3.pk, "name": "SANDRA MARA"},
        ]

    def test_queryset_aggregation_with_limit(self):
        """``limit`` on the queryset caps the number of aggregated documents."""
        Person, p1, _, _ = self._insert_people()

        pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
        data = Person.objects.limit(1).aggregate(pipeline)

        assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}]

    def test_queryset_aggregation_with_sort(self):
        """``order_by`` on the queryset orders the aggregated documents."""
        Person, p1, p2, p3 = self._insert_people()

        pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
        data = Person.objects.order_by("name").aggregate(pipeline)

        assert list(data) == [
            {"_id": p1.pk, "name": "ISABELLA LUANNA"},
            {"_id": p3.pk, "name": "SANDRA MARA"},
            {"_id": p2.pk, "name": "WILSON JUNIOR"},
        ]

    def test_queryset_aggregation_with_skip_with_limit(self):
        """``skip`` and ``limit`` combine, whatever their chaining order."""
        Person, _, p2, _ = self._insert_people()

        pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
        data = list(Person.objects.skip(1).limit(1).aggregate(pipeline))

        assert data == [{"_id": p2.pk, "name": "WILSON JUNIOR"}]

        # Make sure limit/skip chaining order has no impact
        data2 = Person.objects.limit(1).skip(1).aggregate(pipeline)

        assert data == list(data2)

    def test_queryset_aggregation_with_sort_with_limit(self):
        """``order_by`` + ``limit`` apply before the user supplied stages."""
        Person, p1, _, p3 = self._insert_people()

        pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
        data = Person.objects.order_by("name").limit(2).aggregate(pipeline)

        assert list(data) == [
            {"_id": p1.pk, "name": "ISABELLA LUANNA"},
            {"_id": p3.pk, "name": "SANDRA MARA"},
        ]

        # Verify adding limit/skip steps works as expected
        pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}, {"$limit": 1}]
        data = Person.objects.order_by("name").limit(2).aggregate(pipeline)

        assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}]

        pipeline = [
            {"$project": {"name": {"$toUpper": "$name"}}},
            {"$skip": 1},
            {"$limit": 1},
        ]
        data = Person.objects.order_by("name").limit(2).aggregate(pipeline)

        assert list(data) == [{"_id": p3.pk, "name": "SANDRA MARA"}]

    def test_queryset_aggregation_with_sort_with_skip(self):
        """``order_by`` + ``skip`` on the queryset are both honoured."""
        Person, _, p2, _ = self._insert_people()

        pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
        data = Person.objects.order_by("name").skip(2).aggregate(pipeline)

        assert list(data) == [{"_id": p2.pk, "name": "WILSON JUNIOR"}]

    def test_queryset_aggregation_with_sort_with_skip_with_limit(self):
        """``order_by`` + ``skip`` + ``limit`` on the queryset are all honoured."""
        Person, _, _, p3 = self._insert_people()

        pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]
        data = Person.objects.order_by("name").skip(1).limit(1).aggregate(pipeline)

        assert list(data) == [{"_id": p3.pk, "name": "SANDRA MARA"}]

    def test_queryset_aggregation_deprecated_interface(self):
        """``aggregate(*pipeline)`` still works but emits a DeprecationWarning."""

        class Person(Document):
            name = StringField()

        Person.drop_collection()

        p1 = Person(name="Isabella Luanna")
        p2 = Person(name="Wilson Junior")
        p3 = Person(name="Sandra Mara")
        Person.objects.insert([p1, p2, p3])

        pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}]

        # Make sure a warning is emitted: turn DeprecationWarning into an
        # exception for the duration of the call so it can be asserted on.
        with warnings.catch_warnings():
            warnings.simplefilter("error", DeprecationWarning)
            with self.assertRaises(DeprecationWarning):
                Person.objects.order_by("name").limit(2).aggregate(*pipeline)

        # Make sure old interface works as expected with a 1-step pipeline
        data = Person.objects.order_by("name").limit(2).aggregate(*pipeline)

        assert list(data) == [
            {"_id": p1.pk, "name": "ISABELLA LUANNA"},
            {"_id": p3.pk, "name": "SANDRA MARA"},
        ]

        # Make sure old interface works as expected with a 2-steps pipeline
        pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}, {"$limit": 1}]
        data = Person.objects.order_by("name").limit(2).aggregate(*pipeline)

        assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}]

    def test_queryset_aggregation_geonear_aggregation_on_pointfield(self):
        """test ensures that $geonear can be used as a 1-stage pipeline and that
        MongoEngine does not interfere with such pipeline (#2473)
        """

        class Aggr(Document):
            name = StringField()
            c = PointField()

        Aggr.drop_collection()

        agg1 = Aggr(name="X", c=[10.634584, 35.8245029]).save()
        agg2 = Aggr(name="Y", c=[10.634584, 35.8245029]).save()

        pipeline = [
            {
                "$geoNear": {
                    "near": {"type": "Point", "coordinates": [10.634584, 35.8245029]},
                    "distanceField": "c",
                    "spherical": True,
                }
            }
        ]
        # Pass the pipeline as a list: unpacking it (*pipeline) is the
        # deprecated interface and would emit a DeprecationWarning.
        assert list(Aggr.objects.aggregate(pipeline)) == [
            {"_id": agg1.id, "c": 0.0, "name": "X"},
            {"_id": agg2.id, "c": 0.0, "name": "Y"},
        ]
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()