From 8e17e42e26d944d90bdd16bfacb8d07879768055 Mon Sep 17 00:00:00 2001 From: Agustin Date: Fri, 24 Jan 2020 13:11:07 -0300 Subject: [PATCH] Allow setting read_concern --- mongoengine/context_managers.py | 8 ++++++ mongoengine/queryset/base.py | 31 +++++++++++++++++---- tests/queryset/test_queryset.py | 49 +++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 6 deletions(-) diff --git a/mongoengine/context_managers.py b/mongoengine/context_managers.py index 1592ceef..0f6c8698 100644 --- a/mongoengine/context_managers.py +++ b/mongoengine/context_managers.py @@ -257,3 +257,11 @@ def set_write_concern(collection, write_concerns): combined_concerns = dict(collection.write_concern.document.items()) combined_concerns.update(write_concerns) yield collection.with_options(write_concern=WriteConcern(**combined_concerns)) + + +@contextmanager +def set_read_write_concern(collection, write_concerns, read_concern): + combined_write_concerns = dict(collection.write_concern.document.items()) + combined_write_concerns.update(write_concerns) + + yield collection.with_options(write_concern=WriteConcern(**combined_write_concerns), read_concern=read_concern) diff --git a/mongoengine/queryset/base.py b/mongoengine/queryset/base.py index 50cb37ac..0b76235c 100644 --- a/mongoengine/queryset/base.py +++ b/mongoengine/queryset/base.py @@ -11,6 +11,7 @@ import pymongo import pymongo.errors from pymongo.collection import ReturnDocument from pymongo.common import validate_read_preference +from pymongo.read_concern import ReadConcern import six from six import iteritems @@ -18,7 +19,7 @@ from mongoengine import signals from mongoengine.base import get_document from mongoengine.common import _import_class from mongoengine.connection import get_db -from mongoengine.context_managers import set_write_concern, switch_db +from mongoengine.context_managers import set_write_concern, set_read_write_concern, switch_db from mongoengine.errors import ( BulkWriteError, InvalidQueryError, @@ -62,6 +63,7 @@ class BaseQuerySet(object): self._timeout = True self._slave_okay = False self._read_preference = None + self._read_concern = None self._iter = False self._scalar = [] self._none = False @@ -490,7 +492,7 @@ class BaseQuerySet(object): return result.deleted_count def update( - self, upsert=False, multi=True, write_concern=None, full_result=False, **update + self, upsert=False, multi=True, write_concern=None, read_concern=None, full_result=False, **update ): """Perform an atomic update on the fields matched by the query. @@ -502,6 +504,7 @@ class BaseQuerySet(object): ``save(..., write_concern={w: 2, fsync: True}, ...)`` will wait until at least two servers have recorded the write and will force an fsync on the primary server. + :param read_concern: Override the read concern for the operation :param full_result: Return the associated ``pymongo.UpdateResult`` rather than just the number updated items :param update: Django-style update keyword arguments @@ -528,7 +531,7 @@ class BaseQuerySet(object): else: update["$set"] = {"_cls": queryset._document._class_name} try: - with set_write_concern(queryset._collection, write_concern) as collection: + with set_read_write_concern(queryset._collection, write_concern, read_concern) as collection: update_func = collection.update_one if multi: update_func = collection.update_many @@ -545,7 +548,7 @@ class BaseQuerySet(object): raise OperationError(message) raise OperationError(u"Update failed (%s)" % six.text_type(err)) - def upsert_one(self, write_concern=None, **update): + def upsert_one(self, write_concern=None, read_concern=None, **update): """Overwrite or add the first document matched by the query. :param write_concern: Extra keyword arguments are passed down which @@ -554,6 +557,7 @@ class BaseQuerySet(object): ``save(..., write_concern={w: 2, fsync: True}, ...)`` will wait until at least two servers have recorded the write and will force an fsync on the primary server. + :param read_concern: Override the read concern for the operation :param update: Django-style update keyword arguments :returns the new or overwritten document @@ -565,6 +569,7 @@ class BaseQuerySet(object): multi=False, upsert=True, write_concern=write_concern, + read_concern=read_concern, full_result=True, **update ) @@ -1196,6 +1201,20 @@ class BaseQuerySet(object): queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_preference return queryset + def read_concern(self, read_concern): + """Change the read_concern when querying. + + :param read_concern: override ReplicaSetConnection-level + preference. + """ + if read_concern is not None and not isinstance(read_concern, ReadConcern): + raise TypeError("%r is not a read concern." % (read_concern,)) + + queryset = self.clone() + queryset._read_concern = read_concern + queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_concern + return queryset + def scalar(self, *fields): """Instead of returning Document instances, return either a specific value or a tuple of values in order. @@ -1642,9 +1661,9 @@ class BaseQuerySet(object): # XXX In PyMongo 3+, we define the read preference on a collection # level, not a cursor level. Thus, we need to get a cloned collection # object using `with_options` first. - if self._read_preference is not None: + if self._read_preference is not None or self._read_concern is not None: self._cursor_obj = self._collection.with_options( - read_preference=self._read_preference + read_preference=self._read_preference, read_concern=self._read_concern ).find(self._query, **self._cursor_args) else: self._cursor_obj = self._collection.find(self._query, **self._cursor_args) diff --git a/tests/queryset/test_queryset.py b/tests/queryset/test_queryset.py index b30350e6..c238752d 100644 --- a/tests/queryset/test_queryset.py +++ b/tests/queryset/test_queryset.py @@ -7,6 +7,7 @@ from decimal import Decimal from bson import DBRef, ObjectId import pymongo +from pymongo.read_concern import ReadConcern from pymongo.read_preferences import ReadPreference from pymongo.results import UpdateResult import pytest @@ -4658,6 +4659,54 @@ class TestQueryset(unittest.TestCase): ) assert_read_pref(bars, ReadPreference.SECONDARY_PREFERRED) + def test_read_concern(self): + class Bar(Document): + txt = StringField() + + meta = {"indexes": ["txt"]} + + Bar.drop_collection() + bar = Bar.objects.create(txt="xyz") + + bars = list(Bar.objects.read_concern(None)) + assert bars == [bar] + + bars = Bar.objects.read_concern(ReadConcern(level='local')) + assert bars._read_concern == ReadConcern(level='local') + assert ( + bars._cursor.collection.read_concern + == ReadConcern(level='local') + ) + + # Make sure that `.read_concern(...)` does accept string values. + with pytest.raises(TypeError): + Bar.objects.read_concern('local') + + def assert_read_concern(qs, expected_read_concern): + assert qs._read_concern == expected_read_concern + assert qs._cursor.collection.read_concern == expected_read_concern + + # Make sure read concern is respected after a `.skip(...)`. + bars = Bar.objects.skip(1).read_concern(ReadConcern('majority')) + assert_read_concern(bars, ReadConcern('majority')) + + # Make sure read concern is respected after a `.limit(...)`. + bars = Bar.objects.limit(1).read_concern(ReadConcern('majority')) + assert_read_concern(bars, ReadConcern('majority')) + + # Make sure read concern is respected after an `.order_by(...)`. + bars = Bar.objects.order_by("txt").read_concern( + ReadConcern('majority') + ) + assert_read_concern(bars, ReadConcern('majority')) + + # Make sure read concern is respected after a `.hint(...)`. + bars = Bar.objects.hint([("txt", 1)]).read_concern( + ReadConcern('majority') + ) + assert_read_concern(bars, ReadConcern('majority')) + + def test_json_simple(self): class Embedded(EmbeddedDocument): string = StringField()