diff --git a/AUTHORS b/AUTHORS index 7d3000ce..02e43955 100644 --- a/AUTHORS +++ b/AUTHORS @@ -256,3 +256,4 @@ that much better: * Eric Timmons (https://github.com/daewok) * Matthew Simpson (https://github.com/mcsimps2) * Leonardo Domingues (https://github.com/leodmgs) + * Agustin Barto (https://github.com/abarto) diff --git a/docs/changelog.rst b/docs/changelog.rst index 76545559..625526a3 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -21,6 +21,7 @@ Development - ``Queryset._ensure_indexes`` and ``Queryset.ensure_indexes``, the right method to use is ``Document.ensure_indexes`` - Added pre-commit #2212 - Renamed requirements-lint.txt to requirements-dev.txt #2212 +- Support for setting ReadConcern #2255 Changes in 0.19.1 ================= diff --git a/mongoengine/context_managers.py b/mongoengine/context_managers.py index 8bfb902b..5f2b5229 100644 --- a/mongoengine/context_managers.py +++ b/mongoengine/context_managers.py @@ -1,5 +1,6 @@ from contextlib import contextmanager +from pymongo.read_concern import ReadConcern from pymongo.write_concern import WriteConcern from mongoengine.common import _import_class @@ -13,6 +14,7 @@ __all__ = ( "no_sub_classes", "query_counter", "set_write_concern", + "set_read_write_concern", ) @@ -256,3 +258,21 @@ def set_write_concern(collection, write_concerns): combined_concerns = dict(collection.write_concern.document.items()) combined_concerns.update(write_concerns) yield collection.with_options(write_concern=WriteConcern(**combined_concerns)) + + +@contextmanager +def set_read_write_concern(collection, write_concerns, read_concerns): + combined_write_concerns = dict(collection.write_concern.document.items()) + + if write_concerns is not None: + combined_write_concerns.update(write_concerns) + + combined_read_concerns = dict(collection.read_concern.document.items()) + + if read_concerns is not None: + combined_read_concerns.update(read_concerns) + + yield collection.with_options( + write_concern=WriteConcern(**combined_write_concerns), + read_concern=ReadConcern(**combined_read_concerns), + ) diff --git a/mongoengine/queryset/base.py b/mongoengine/queryset/base.py index 23cb79c5..39c44b29 100644 --- a/mongoengine/queryset/base.py +++ b/mongoengine/queryset/base.py @@ -3,18 +3,25 @@ import itertools import re import warnings +from collections.abc import Mapping + from bson import SON, json_util from bson.code import Code import pymongo import pymongo.errors from pymongo.collection import ReturnDocument from pymongo.common import validate_read_preference +from pymongo.read_concern import ReadConcern from mongoengine import signals from mongoengine.base import get_document from mongoengine.common import _import_class from mongoengine.connection import get_db -from mongoengine.context_managers import set_write_concern, switch_db +from mongoengine.context_managers import ( + set_read_write_concern, + set_write_concern, + switch_db, +) from mongoengine.errors import ( BulkWriteError, InvalidQueryError, @@ -57,6 +64,7 @@ class BaseQuerySet: self._snapshot = False self._timeout = True self._read_preference = None + self._read_concern = None self._iter = False self._scalar = [] self._none = False @@ -484,7 +492,13 @@ class BaseQuerySet: return result.deleted_count def update( - self, upsert=False, multi=True, write_concern=None, full_result=False, **update + self, + upsert=False, + multi=True, + write_concern=None, + read_concern=None, + full_result=False, + **update ): """Perform an atomic update on the fields matched by the query. @@ -496,6 +510,7 @@ class BaseQuerySet: ``save(..., write_concern={w: 2, fsync: True}, ...)`` will wait until at least two servers have recorded the write and will force an fsync on the primary server. + :param read_concern: Override the read concern for the operation :param full_result: Return the associated ``pymongo.UpdateResult`` rather than just the number updated items :param update: Django-style update keyword arguments @@ -522,7 +537,9 @@ class BaseQuerySet: else: update["$set"] = {"_cls": queryset._document._class_name} try: - with set_write_concern(queryset._collection, write_concern) as collection: + with set_read_write_concern( + queryset._collection, write_concern, read_concern + ) as collection: update_func = collection.update_one if multi: update_func = collection.update_many @@ -539,7 +556,7 @@ class BaseQuerySet: raise OperationError(message) raise OperationError("Update failed (%s)" % err) - def upsert_one(self, write_concern=None, **update): + def upsert_one(self, write_concern=None, read_concern=None, **update): """Overwrite or add the first document matched by the query. :param write_concern: Extra keyword arguments are passed down which @@ -548,6 +565,7 @@ class BaseQuerySet: ``save(..., write_concern={w: 2, fsync: True}, ...)`` will wait until at least two servers have recorded the write and will force an fsync on the primary server. + :param read_concern: Override the read concern for the operation :param update: Django-style update keyword arguments :returns the new or overwritten document @@ -559,6 +577,7 @@ class BaseQuerySet: multi=False, upsert=True, write_concern=write_concern, + read_concern=read_concern, full_result=True, **update ) @@ -1177,6 +1196,22 @@ class BaseQuerySet: queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_preference return queryset + def read_concern(self, read_concern): + """Change the read_concern when querying. + + :param read_concern: override ReplicaSetConnection-level + preference. + """ + if read_concern is not None and not isinstance(read_concern, Mapping): + raise TypeError("%r is not a valid read concern." % (read_concern,)) + + queryset = self.clone() + queryset._read_concern = ( + ReadConcern(**read_concern) if read_concern is not None else None + ) + queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_concern + return queryset + def scalar(self, *fields): """Instead of returning Document instances, return either a specific value or a tuple of values in order. @@ -1623,9 +1658,9 @@ class BaseQuerySet: # XXX In PyMongo 3+, we define the read preference on a collection # level, not a cursor level. Thus, we need to get a cloned collection # object using `with_options` first. - if self._read_preference is not None: + if self._read_preference is not None or self._read_concern is not None: self._cursor_obj = self._collection.with_options( - read_preference=self._read_preference + read_preference=self._read_preference, read_concern=self._read_concern ).find(self._query, **self._cursor_args) else: self._cursor_obj = self._collection.find(self._query, **self._cursor_args) diff --git a/tests/queryset/test_queryset.py b/tests/queryset/test_queryset.py index cb8e7bba..6b6000c9 100644 --- a/tests/queryset/test_queryset.py +++ b/tests/queryset/test_queryset.py @@ -7,6 +7,7 @@ from decimal import Decimal from bson import DBRef, ObjectId import pymongo +from pymongo.read_concern import ReadConcern from pymongo.read_preferences import ReadPreference from pymongo.results import UpdateResult import pytest @@ -4726,6 +4727,46 @@ class TestQueryset(unittest.TestCase): ) assert_read_pref(bars, ReadPreference.SECONDARY_PREFERRED) + def test_read_concern(self): + class Bar(Document): + txt = StringField() + + meta = {"indexes": ["txt"]} + + Bar.drop_collection() + bar = Bar.objects.create(txt="xyz") + + bars = list(Bar.objects.read_concern(None)) + assert bars == [bar] + + bars = Bar.objects.read_concern({"level": "local"}) + assert bars._read_concern.document == {"level": "local"} + assert bars._cursor.collection.read_concern.document == {"level": "local"} + + # Make sure that `.read_concern(...)` does not accept string values. + with pytest.raises(TypeError): + Bar.objects.read_concern("local") + + def assert_read_concern(qs, expected_read_concern): + assert qs._read_concern.document == expected_read_concern + assert qs._cursor.collection.read_concern.document == expected_read_concern + + # Make sure read concern is respected after a `.skip(...)`. + bars = Bar.objects.skip(1).read_concern({"level": "local"}) + assert_read_concern(bars, {"level": "local"}) + + # Make sure read concern is respected after a `.limit(...)`. + bars = Bar.objects.limit(1).read_concern({"level": "local"}) + assert_read_concern(bars, {"level": "local"}) + + # Make sure read concern is respected after an `.order_by(...)`. + bars = Bar.objects.order_by("txt").read_concern({"level": "local"}) + assert_read_concern(bars, {"level": "local"}) + + # Make sure read concern is respected after a `.hint(...)`. + bars = Bar.objects.hint([("txt", 1)]).read_concern({"level": "majority"}) + assert_read_concern(bars, {"level": "majority"}) + def test_json_simple(self): class Embedded(EmbeddedDocument): string = StringField() diff --git a/tests/test_context_managers.py b/tests/test_context_managers.py index 4410fa90..a4864c40 100644 --- a/tests/test_context_managers.py +++ b/tests/test_context_managers.py @@ -8,6 +8,8 @@ from mongoengine.context_managers import ( no_dereference, no_sub_classes, query_counter, + set_read_write_concern, + set_write_concern, switch_collection, switch_db, ) @@ -15,6 +17,52 @@ from mongoengine.pymongo_support import count_documents class TestContextManagers: + def test_set_write_concern(self): + connect("mongoenginetest") + + class User(Document): + name = StringField() + + collection = User._get_collection() + original_write_concern = collection.write_concern + + with set_write_concern( + collection, {"w": "majority", "j": True, "wtimeout": 1234} + ) as updated_collection: + assert updated_collection.write_concern.document == { + "w": "majority", + "j": True, + "wtimeout": 1234, + } + + assert original_write_concern.document == collection.write_concern.document + + def test_set_read_write_concern(self): + connect("mongoenginetest") + + class User(Document): + name = StringField() + + collection = User._get_collection() + + original_read_concern = collection.read_concern + original_write_concern = collection.write_concern + + with set_read_write_concern( + collection, + {"w": "majority", "j": True, "wtimeout": 1234}, + {"level": "local"}, + ) as update_collection: + assert update_collection.read_concern.document == {"level": "local"} + assert update_collection.write_concern.document == { + "w": "majority", + "j": True, + "wtimeout": 1234, + } + + assert original_read_concern.document == collection.read_concern.document + assert original_write_concern.document == collection.write_concern.document + def test_switch_db_context_manager(self): connect("mongoenginetest") register_connection("testdb-1", "mongoenginetest2")