From aa02f87b69787ac678d4ee740e1cb2e5e6753fc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastien=20G=C3=A9rard?= Date: Fri, 27 Dec 2019 09:23:15 +0100 Subject: [PATCH 1/2] change & deprecate .aggregate api to mimic pymongo's interface + separate the aggregation tests from the large test_queryset.py file --- docs/guide/querying.rst | 4 +- mongoengine/queryset/base.py | 29 ++- tests/queryset/test_queryset.py | 236 +----------------- tests/queryset/test_queryset_aggregation.py | 255 ++++++++++++++++++++ 4 files changed, 277 insertions(+), 247 deletions(-) create mode 100644 tests/queryset/test_queryset_aggregation.py diff --git a/docs/guide/querying.rst b/docs/guide/querying.rst index 121325ae..07de0378 100644 --- a/docs/guide/querying.rst +++ b/docs/guide/querying.rst @@ -400,7 +400,7 @@ would be generating "tag-clouds":: MongoDB aggregation API ----------------------- -If you need to run aggregation pipelines, MongoEngine provides an entry point `Pymongo's aggregation framework `_ +If you need to run aggregation pipelines, MongoEngine provides an entry point to `Pymongo's aggregation framework `_ through :meth:`~mongoengine.queryset.QuerySet.aggregate`. Check out Pymongo's documentation for the syntax and pipeline. 
An example of its use would be:: @@ -414,7 +414,7 @@ An example of its use would be:: {"$sort" : {"name" : -1}}, {"$project": {"_id": 0, "name": {"$toUpper": "$name"}}} ] - data = Person.objects().aggregate(*pipeline) + data = Person.objects().aggregate(pipeline) assert data == [{'name': 'BOB'}, {'name': 'JOHN'}] Query efficiency and performance diff --git a/mongoengine/queryset/base.py b/mongoengine/queryset/base.py index a648391e..aa5f2584 100644 --- a/mongoengine/queryset/base.py +++ b/mongoengine/queryset/base.py @@ -1255,16 +1255,25 @@ class BaseQuerySet(object): for data in son_data ] - def aggregate(self, *pipeline, **kwargs): - """ - Perform a aggregate function based in your queryset params + def aggregate(self, pipeline, *suppl_pipeline, **kwargs): + """Perform a aggregate function based in your queryset params + :param pipeline: list of aggregation commands,\ see: http://docs.mongodb.org/manual/core/aggregation-pipeline/ - + :param suppl_pipeline: unpacked list of pipeline (added to support deprecation of the old interface) + parameter will be removed shortly .. 
versionadded:: 0.9 """ - initial_pipeline = [] + using_deprecated_interface = isinstance(pipeline, dict) or bool(suppl_pipeline) + user_pipeline = [pipeline] if isinstance(pipeline, dict) else list(pipeline) + if using_deprecated_interface: + msg = "Calling .aggregate() with un unpacked list (*pipeline) is deprecated, it will soon change and will expect a list (similar to pymongo.Collection.aggregate interface), see documentation" + warnings.warn(msg, DeprecationWarning) + + user_pipeline += suppl_pipeline + + initial_pipeline = [] if self._query: initial_pipeline.append({"$match": self._query}) @@ -1281,14 +1290,14 @@ class BaseQuerySet(object): if self._skip is not None: initial_pipeline.append({"$skip": self._skip}) - pipeline = initial_pipeline + list(pipeline) + final_pipeline = initial_pipeline + user_pipeline + collection = self._collection if self._read_preference is not None: - return self._collection.with_options( + collection = self._collection.with_options( read_preference=self._read_preference - ).aggregate(pipeline, cursor={}, **kwargs) - - return self._collection.aggregate(pipeline, cursor={}, **kwargs) + ) + return collection.aggregate(final_pipeline, cursor={}, **kwargs) # JS functionality def map_reduce( diff --git a/tests/queryset/test_queryset.py b/tests/queryset/test_queryset.py index 7812ab66..b30350e6 100644 --- a/tests/queryset/test_queryset.py +++ b/tests/queryset/test_queryset.py @@ -14,7 +14,7 @@ import six from six import iteritems from mongoengine import * -from mongoengine.connection import get_connection, get_db +from mongoengine.connection import get_db from mongoengine.context_managers import query_counter, switch_db from mongoengine.errors import InvalidQueryError from mongoengine.mongodb_support import MONGODB_36, get_mongodb_version @@ -4658,21 +4658,6 @@ class TestQueryset(unittest.TestCase): ) assert_read_pref(bars, ReadPreference.SECONDARY_PREFERRED) - def test_read_preference_aggregation_framework(self): - class 
Bar(Document): - txt = StringField() - - meta = {"indexes": ["txt"]} - - # Aggregates with read_preference - bars = Bar.objects.read_preference( - ReadPreference.SECONDARY_PREFERRED - ).aggregate() - assert ( - bars._CommandCursor__collection.read_preference - == ReadPreference.SECONDARY_PREFERRED - ) - def test_json_simple(self): class Embedded(EmbeddedDocument): string = StringField() @@ -5399,225 +5384,6 @@ class TestQueryset(unittest.TestCase): assert Person.objects.first().name == "A" assert Person.objects._has_data(), "Cursor has data and returned False" - def test_queryset_aggregation_framework(self): - class Person(Document): - name = StringField() - age = IntField() - - Person.drop_collection() - - p1 = Person(name="Isabella Luanna", age=16) - p2 = Person(name="Wilson Junior", age=21) - p3 = Person(name="Sandra Mara", age=37) - Person.objects.insert([p1, p2, p3]) - - data = Person.objects(age__lte=22).aggregate( - {"$project": {"name": {"$toUpper": "$name"}}} - ) - - assert list(data) == [ - {"_id": p1.pk, "name": "ISABELLA LUANNA"}, - {"_id": p2.pk, "name": "WILSON JUNIOR"}, - ] - - data = ( - Person.objects(age__lte=22) - .order_by("-name") - .aggregate({"$project": {"name": {"$toUpper": "$name"}}}) - ) - - assert list(data) == [ - {"_id": p2.pk, "name": "WILSON JUNIOR"}, - {"_id": p1.pk, "name": "ISABELLA LUANNA"}, - ] - - data = ( - Person.objects(age__gte=17, age__lte=40) - .order_by("-age") - .aggregate( - {"$group": {"_id": None, "total": {"$sum": 1}, "avg": {"$avg": "$age"}}} - ) - ) - assert list(data) == [{"_id": None, "avg": 29, "total": 2}] - - data = Person.objects().aggregate({"$match": {"name": "Isabella Luanna"}}) - assert list(data) == [{u"_id": p1.pk, u"age": 16, u"name": u"Isabella Luanna"}] - - def test_queryset_aggregation_with_skip(self): - class Person(Document): - name = StringField() - age = IntField() - - Person.drop_collection() - - p1 = Person(name="Isabella Luanna", age=16) - p2 = Person(name="Wilson Junior", age=21) - p3 = 
Person(name="Sandra Mara", age=37) - Person.objects.insert([p1, p2, p3]) - - data = Person.objects.skip(1).aggregate( - {"$project": {"name": {"$toUpper": "$name"}}} - ) - - assert list(data) == [ - {"_id": p2.pk, "name": "WILSON JUNIOR"}, - {"_id": p3.pk, "name": "SANDRA MARA"}, - ] - - def test_queryset_aggregation_with_limit(self): - class Person(Document): - name = StringField() - age = IntField() - - Person.drop_collection() - - p1 = Person(name="Isabella Luanna", age=16) - p2 = Person(name="Wilson Junior", age=21) - p3 = Person(name="Sandra Mara", age=37) - Person.objects.insert([p1, p2, p3]) - - data = Person.objects.limit(1).aggregate( - {"$project": {"name": {"$toUpper": "$name"}}} - ) - - assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}] - - def test_queryset_aggregation_with_sort(self): - class Person(Document): - name = StringField() - age = IntField() - - Person.drop_collection() - - p1 = Person(name="Isabella Luanna", age=16) - p2 = Person(name="Wilson Junior", age=21) - p3 = Person(name="Sandra Mara", age=37) - Person.objects.insert([p1, p2, p3]) - - data = Person.objects.order_by("name").aggregate( - {"$project": {"name": {"$toUpper": "$name"}}} - ) - - assert list(data) == [ - {"_id": p1.pk, "name": "ISABELLA LUANNA"}, - {"_id": p3.pk, "name": "SANDRA MARA"}, - {"_id": p2.pk, "name": "WILSON JUNIOR"}, - ] - - def test_queryset_aggregation_with_skip_with_limit(self): - class Person(Document): - name = StringField() - age = IntField() - - Person.drop_collection() - - p1 = Person(name="Isabella Luanna", age=16) - p2 = Person(name="Wilson Junior", age=21) - p3 = Person(name="Sandra Mara", age=37) - Person.objects.insert([p1, p2, p3]) - - data = list( - Person.objects.skip(1) - .limit(1) - .aggregate({"$project": {"name": {"$toUpper": "$name"}}}) - ) - - assert list(data) == [{"_id": p2.pk, "name": "WILSON JUNIOR"}] - - # Make sure limit/skip chaining order has no impact - data2 = ( - Person.objects.limit(1) - .skip(1) - 
.aggregate({"$project": {"name": {"$toUpper": "$name"}}}) - ) - - assert data == list(data2) - - def test_queryset_aggregation_with_sort_with_limit(self): - class Person(Document): - name = StringField() - age = IntField() - - Person.drop_collection() - - p1 = Person(name="Isabella Luanna", age=16) - p2 = Person(name="Wilson Junior", age=21) - p3 = Person(name="Sandra Mara", age=37) - Person.objects.insert([p1, p2, p3]) - - data = ( - Person.objects.order_by("name") - .limit(2) - .aggregate({"$project": {"name": {"$toUpper": "$name"}}}) - ) - - assert list(data) == [ - {"_id": p1.pk, "name": "ISABELLA LUANNA"}, - {"_id": p3.pk, "name": "SANDRA MARA"}, - ] - - # Verify adding limit/skip steps works as expected - data = ( - Person.objects.order_by("name") - .limit(2) - .aggregate({"$project": {"name": {"$toUpper": "$name"}}}, {"$limit": 1}) - ) - - assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}] - - data = ( - Person.objects.order_by("name") - .limit(2) - .aggregate( - {"$project": {"name": {"$toUpper": "$name"}}}, - {"$skip": 1}, - {"$limit": 1}, - ) - ) - - assert list(data) == [{"_id": p3.pk, "name": "SANDRA MARA"}] - - def test_queryset_aggregation_with_sort_with_skip(self): - class Person(Document): - name = StringField() - age = IntField() - - Person.drop_collection() - - p1 = Person(name="Isabella Luanna", age=16) - p2 = Person(name="Wilson Junior", age=21) - p3 = Person(name="Sandra Mara", age=37) - Person.objects.insert([p1, p2, p3]) - - data = ( - Person.objects.order_by("name") - .skip(2) - .aggregate({"$project": {"name": {"$toUpper": "$name"}}}) - ) - - assert list(data) == [{"_id": p2.pk, "name": "WILSON JUNIOR"}] - - def test_queryset_aggregation_with_sort_with_skip_with_limit(self): - class Person(Document): - name = StringField() - age = IntField() - - Person.drop_collection() - - p1 = Person(name="Isabella Luanna", age=16) - p2 = Person(name="Wilson Junior", age=21) - p3 = Person(name="Sandra Mara", age=37) - 
Person.objects.insert([p1, p2, p3]) - - data = ( - Person.objects.order_by("name") - .skip(1) - .limit(1) - .aggregate({"$project": {"name": {"$toUpper": "$name"}}}) - ) - - assert list(data) == [{"_id": p3.pk, "name": "SANDRA MARA"}] - def test_delete_count(self): [self.Person(name="User {0}".format(i), age=i * 10).save() for i in range(1, 4)] assert ( diff --git a/tests/queryset/test_queryset_aggregation.py b/tests/queryset/test_queryset_aggregation.py new file mode 100644 index 00000000..00e04a36 --- /dev/null +++ b/tests/queryset/test_queryset_aggregation.py @@ -0,0 +1,255 @@ +# -*- coding: utf-8 -*- + +import unittest +import warnings + +from pymongo.read_preferences import ReadPreference + +from mongoengine import * +from tests.utils import MongoDBTestCase + + +class TestQuerysetAggregate(MongoDBTestCase): + def test_read_preference_aggregation_framework(self): + class Bar(Document): + txt = StringField() + + meta = {"indexes": ["txt"]} + + # Aggregates with read_preference + pipeline = [] + bars = Bar.objects.read_preference( + ReadPreference.SECONDARY_PREFERRED + ).aggregate(pipeline) + assert ( + bars._CommandCursor__collection.read_preference + == ReadPreference.SECONDARY_PREFERRED + ) + + def test_queryset_aggregation_framework(self): + class Person(Document): + name = StringField() + age = IntField() + + Person.drop_collection() + + p1 = Person(name="Isabella Luanna", age=16) + p2 = Person(name="Wilson Junior", age=21) + p3 = Person(name="Sandra Mara", age=37) + Person.objects.insert([p1, p2, p3]) + + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}] + data = Person.objects(age__lte=22).aggregate(pipeline) + + assert list(data) == [ + {"_id": p1.pk, "name": "ISABELLA LUANNA"}, + {"_id": p2.pk, "name": "WILSON JUNIOR"}, + ] + + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}] + data = Person.objects(age__lte=22).order_by("-name").aggregate(pipeline) + + assert list(data) == [ + {"_id": p2.pk, "name": "WILSON JUNIOR"}, + {"_id": p1.pk, 
"name": "ISABELLA LUANNA"}, + ] + + pipeline = [ + {"$group": {"_id": None, "total": {"$sum": 1}, "avg": {"$avg": "$age"}}} + ] + data = ( + Person.objects(age__gte=17, age__lte=40) + .order_by("-age") + .aggregate(pipeline) + ) + assert list(data) == [{"_id": None, "avg": 29, "total": 2}] + + pipeline = [{"$match": {"name": "Isabella Luanna"}}] + data = Person.objects().aggregate(pipeline) + assert list(data) == [{u"_id": p1.pk, u"age": 16, u"name": u"Isabella Luanna"}] + + def test_queryset_aggregation_with_skip(self): + class Person(Document): + name = StringField() + age = IntField() + + Person.drop_collection() + + p1 = Person(name="Isabella Luanna", age=16) + p2 = Person(name="Wilson Junior", age=21) + p3 = Person(name="Sandra Mara", age=37) + Person.objects.insert([p1, p2, p3]) + + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}] + data = Person.objects.skip(1).aggregate(pipeline) + + assert list(data) == [ + {"_id": p2.pk, "name": "WILSON JUNIOR"}, + {"_id": p3.pk, "name": "SANDRA MARA"}, + ] + + def test_queryset_aggregation_with_limit(self): + class Person(Document): + name = StringField() + age = IntField() + + Person.drop_collection() + + p1 = Person(name="Isabella Luanna", age=16) + p2 = Person(name="Wilson Junior", age=21) + p3 = Person(name="Sandra Mara", age=37) + Person.objects.insert([p1, p2, p3]) + + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}] + data = Person.objects.limit(1).aggregate(pipeline) + + assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}] + + def test_queryset_aggregation_with_sort(self): + class Person(Document): + name = StringField() + age = IntField() + + Person.drop_collection() + + p1 = Person(name="Isabella Luanna", age=16) + p2 = Person(name="Wilson Junior", age=21) + p3 = Person(name="Sandra Mara", age=37) + Person.objects.insert([p1, p2, p3]) + + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}] + data = Person.objects.order_by("name").aggregate(pipeline) + + assert list(data) 
== [ + {"_id": p1.pk, "name": "ISABELLA LUANNA"}, + {"_id": p3.pk, "name": "SANDRA MARA"}, + {"_id": p2.pk, "name": "WILSON JUNIOR"}, + ] + + def test_queryset_aggregation_with_skip_with_limit(self): + class Person(Document): + name = StringField() + age = IntField() + + Person.drop_collection() + + p1 = Person(name="Isabella Luanna", age=16) + p2 = Person(name="Wilson Junior", age=21) + p3 = Person(name="Sandra Mara", age=37) + Person.objects.insert([p1, p2, p3]) + + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}] + data = list(Person.objects.skip(1).limit(1).aggregate(pipeline)) + + assert list(data) == [{"_id": p2.pk, "name": "WILSON JUNIOR"}] + + # Make sure limit/skip chaining order has no impact + data2 = Person.objects.limit(1).skip(1).aggregate(pipeline) + + assert data == list(data2) + + def test_queryset_aggregation_with_sort_with_limit(self): + class Person(Document): + name = StringField() + age = IntField() + + Person.drop_collection() + + p1 = Person(name="Isabella Luanna", age=16) + p2 = Person(name="Wilson Junior", age=21) + p3 = Person(name="Sandra Mara", age=37) + Person.objects.insert([p1, p2, p3]) + + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}] + data = Person.objects.order_by("name").limit(2).aggregate(pipeline) + + assert list(data) == [ + {"_id": p1.pk, "name": "ISABELLA LUANNA"}, + {"_id": p3.pk, "name": "SANDRA MARA"}, + ] + + # Verify adding limit/skip steps works as expected + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}, {"$limit": 1}] + data = Person.objects.order_by("name").limit(2).aggregate(pipeline) + + assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}] + + pipeline = [ + {"$project": {"name": {"$toUpper": "$name"}}}, + {"$skip": 1}, + {"$limit": 1}, + ] + data = Person.objects.order_by("name").limit(2).aggregate(pipeline) + + assert list(data) == [{"_id": p3.pk, "name": "SANDRA MARA"}] + + def test_queryset_aggregation_with_sort_with_skip(self): + class Person(Document): + name = 
StringField() + age = IntField() + + Person.drop_collection() + + p1 = Person(name="Isabella Luanna", age=16) + p2 = Person(name="Wilson Junior", age=21) + p3 = Person(name="Sandra Mara", age=37) + Person.objects.insert([p1, p2, p3]) + + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}] + data = Person.objects.order_by("name").skip(2).aggregate(pipeline) + + assert list(data) == [{"_id": p2.pk, "name": "WILSON JUNIOR"}] + + def test_queryset_aggregation_with_sort_with_skip_with_limit(self): + class Person(Document): + name = StringField() + age = IntField() + + Person.drop_collection() + + p1 = Person(name="Isabella Luanna", age=16) + p2 = Person(name="Wilson Junior", age=21) + p3 = Person(name="Sandra Mara", age=37) + Person.objects.insert([p1, p2, p3]) + + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}] + data = Person.objects.order_by("name").skip(1).limit(1).aggregate(pipeline) + + assert list(data) == [{"_id": p3.pk, "name": "SANDRA MARA"}] + + def test_queryset_aggregation_deprecated_interface(self): + class Person(Document): + name = StringField() + + Person.drop_collection() + + p1 = Person(name="Isabella Luanna") + p2 = Person(name="Wilson Junior") + p3 = Person(name="Sandra Mara") + Person.objects.insert([p1, p2, p3]) + + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}] + + # Make sure a warning is emitted + with warnings.catch_warnings(): + warnings.simplefilter("error", DeprecationWarning) + with self.assertRaises(DeprecationWarning): + Person.objects.order_by("name").limit(2).aggregate(*pipeline) + + # Make sure old interface works as expected with a 1-step pipeline + data = Person.objects.order_by("name").limit(2).aggregate(*pipeline) + + assert list(data) == [ + {"_id": p1.pk, "name": "ISABELLA LUANNA"}, + {"_id": p3.pk, "name": "SANDRA MARA"}, + ] + + # Make sure old interface works as expected with a 2-steps pipeline + pipeline = [{"$project": {"name": {"$toUpper": "$name"}}}, {"$limit": 1}] + data = 
Person.objects.order_by("name").limit(2).aggregate(*pipeline) + + assert list(data) == [{"_id": p1.pk, "name": "ISABELLA LUANNA"}] + + +if __name__ == "__main__": + unittest.main() From 99e660c66d85620b427f2ab48eba557a6faa5a43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastien=20G=C3=A9rard?= Date: Fri, 27 Dec 2019 09:32:05 +0100 Subject: [PATCH 2/2] update changelog --- docs/changelog.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index 06eb8d0c..5fe7d6b4 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -6,6 +6,8 @@ Changelog Development =========== - (Fill this out as you fix issues and develop your features). +- DEPRECATION: The interface of the ``QuerySet.aggregate`` method was changed: it no longer takes an unpacked list of + pipeline steps (``*pipeline``) but simply takes the pipeline list, just like ``pymongo.Collection.aggregate`` does. #2079 Changes in 0.19.0 =================