Merge branch 'feature/allow-setting-read-concern-queryset' of git://github.com/abarto/mongoengine into abarto-feature/allow-setting-read-concern-queryset

This commit is contained in:
Bastien Gérard 2020-03-02 23:04:42 +01:00
commit d85f0e6226
3 changed files with 82 additions and 6 deletions

View File

@ -257,3 +257,11 @@ def set_write_concern(collection, write_concerns):
combined_concerns = dict(collection.write_concern.document.items()) combined_concerns = dict(collection.write_concern.document.items())
combined_concerns.update(write_concerns) combined_concerns.update(write_concerns)
yield collection.with_options(write_concern=WriteConcern(**combined_concerns)) yield collection.with_options(write_concern=WriteConcern(**combined_concerns))
@contextmanager
def set_read_write_concern(collection, write_concerns, read_concern):
combined_write_concerns = dict(collection.write_concern.document.items())
combined_write_concerns.update(write_concerns)
yield collection.with_options(write_concern=WriteConcern(**combined_write_concerns), read_concern=read_concern)

View File

@ -11,6 +11,7 @@ import pymongo
import pymongo.errors import pymongo.errors
from pymongo.collection import ReturnDocument from pymongo.collection import ReturnDocument
from pymongo.common import validate_read_preference from pymongo.common import validate_read_preference
from pymongo.read_concern import ReadConcern
import six import six
from six import iteritems from six import iteritems
@ -18,7 +19,7 @@ from mongoengine import signals
from mongoengine.base import get_document from mongoengine.base import get_document
from mongoengine.common import _import_class from mongoengine.common import _import_class
from mongoengine.connection import get_db from mongoengine.connection import get_db
from mongoengine.context_managers import set_write_concern, switch_db from mongoengine.context_managers import set_write_concern, set_read_write_concern, switch_db
from mongoengine.errors import ( from mongoengine.errors import (
BulkWriteError, BulkWriteError,
InvalidQueryError, InvalidQueryError,
@ -62,6 +63,7 @@ class BaseQuerySet(object):
self._timeout = True self._timeout = True
self._slave_okay = False self._slave_okay = False
self._read_preference = None self._read_preference = None
self._read_concern = None
self._iter = False self._iter = False
self._scalar = [] self._scalar = []
self._none = False self._none = False
@ -490,7 +492,7 @@ class BaseQuerySet(object):
return result.deleted_count return result.deleted_count
def update( def update(
self, upsert=False, multi=True, write_concern=None, full_result=False, **update self, upsert=False, multi=True, write_concern=None, read_concern=None, full_result=False, **update
): ):
"""Perform an atomic update on the fields matched by the query. """Perform an atomic update on the fields matched by the query.
@ -502,6 +504,7 @@ class BaseQuerySet(object):
``save(..., write_concern={w: 2, fsync: True}, ...)`` will ``save(..., write_concern={w: 2, fsync: True}, ...)`` will
wait until at least two servers have recorded the write and wait until at least two servers have recorded the write and
will force an fsync on the primary server. will force an fsync on the primary server.
:param read_concern: Override the read concern for the operation
:param full_result: Return the associated ``pymongo.UpdateResult`` rather than just the number :param full_result: Return the associated ``pymongo.UpdateResult`` rather than just the number
updated items updated items
:param update: Django-style update keyword arguments :param update: Django-style update keyword arguments
@ -528,7 +531,7 @@ class BaseQuerySet(object):
else: else:
update["$set"] = {"_cls": queryset._document._class_name} update["$set"] = {"_cls": queryset._document._class_name}
try: try:
with set_write_concern(queryset._collection, write_concern) as collection: with set_read_write_concern(queryset._collection, write_concern, read_concern) as collection:
update_func = collection.update_one update_func = collection.update_one
if multi: if multi:
update_func = collection.update_many update_func = collection.update_many
@ -545,7 +548,7 @@ class BaseQuerySet(object):
raise OperationError(message) raise OperationError(message)
raise OperationError(u"Update failed (%s)" % six.text_type(err)) raise OperationError(u"Update failed (%s)" % six.text_type(err))
def upsert_one(self, write_concern=None, **update): def upsert_one(self, write_concern=None, read_concern=None, **update):
"""Overwrite or add the first document matched by the query. """Overwrite or add the first document matched by the query.
:param write_concern: Extra keyword arguments are passed down which :param write_concern: Extra keyword arguments are passed down which
@ -554,6 +557,7 @@ class BaseQuerySet(object):
``save(..., write_concern={w: 2, fsync: True}, ...)`` will ``save(..., write_concern={w: 2, fsync: True}, ...)`` will
wait until at least two servers have recorded the write and wait until at least two servers have recorded the write and
will force an fsync on the primary server. will force an fsync on the primary server.
:param read_concern: Override the read concern for the operation
:param update: Django-style update keyword arguments :param update: Django-style update keyword arguments
:returns the new or overwritten document :returns the new or overwritten document
@ -565,6 +569,7 @@ class BaseQuerySet(object):
multi=False, multi=False,
upsert=True, upsert=True,
write_concern=write_concern, write_concern=write_concern,
read_concern=read_concern,
full_result=True, full_result=True,
**update **update
) )
@ -1196,6 +1201,20 @@ class BaseQuerySet(object):
queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_preference queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_preference
return queryset return queryset
def read_concern(self, read_concern):
"""Change the read_concern when querying.
:param read_concern: override ReplicaSetConnection-level
preference.
"""
if read_concern is not None and not isinstance(read_concern, ReadConcern):
raise TypeError("%r is not a read concern." % (read_concern,))
queryset = self.clone()
queryset._read_concern = read_concern
queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_concern
return queryset
def scalar(self, *fields): def scalar(self, *fields):
"""Instead of returning Document instances, return either a specific """Instead of returning Document instances, return either a specific
value or a tuple of values in order. value or a tuple of values in order.
@ -1642,9 +1661,9 @@ class BaseQuerySet(object):
# XXX In PyMongo 3+, we define the read preference on a collection # XXX In PyMongo 3+, we define the read preference on a collection
# level, not a cursor level. Thus, we need to get a cloned collection # level, not a cursor level. Thus, we need to get a cloned collection
# object using `with_options` first. # object using `with_options` first.
if self._read_preference is not None: if self._read_preference is not None or self._read_concern is not None:
self._cursor_obj = self._collection.with_options( self._cursor_obj = self._collection.with_options(
read_preference=self._read_preference read_preference=self._read_preference, read_concern=self._read_concern
).find(self._query, **self._cursor_args) ).find(self._query, **self._cursor_args)
else: else:
self._cursor_obj = self._collection.find(self._query, **self._cursor_args) self._cursor_obj = self._collection.find(self._query, **self._cursor_args)

View File

@ -7,6 +7,7 @@ from decimal import Decimal
from bson import DBRef, ObjectId from bson import DBRef, ObjectId
import pymongo import pymongo
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference from pymongo.read_preferences import ReadPreference
from pymongo.results import UpdateResult from pymongo.results import UpdateResult
import pytest import pytest
@ -4658,6 +4659,54 @@ class TestQueryset(unittest.TestCase):
) )
assert_read_pref(bars, ReadPreference.SECONDARY_PREFERRED) assert_read_pref(bars, ReadPreference.SECONDARY_PREFERRED)
def test_read_concern(self):
class Bar(Document):
txt = StringField()
meta = {"indexes": ["txt"]}
Bar.drop_collection()
bar = Bar.objects.create(txt="xyz")
bars = list(Bar.objects.read_concern(None))
assert bars == [bar]
bars = Bar.objects.read_concern(ReadConcern(level='local'))
assert bars._read_concern == ReadConcern(level='local')
assert (
bars._cursor.collection.read_concern
== ReadConcern(level='local')
)
# Make sure that `.read_concern(...)` does accept string values.
with pytest.raises(TypeError):
Bar.objects.read_concern('local')
def assert_read_concern(qs, expected_read_concern):
assert qs._read_concern == expected_read_concern
assert qs._cursor.collection.read_concern == expected_read_concern
# Make sure read concern is respected after a `.skip(...)`.
bars = Bar.objects.skip(1).read_concern(ReadConcern('majority'))
assert_read_concern(bars, ReadConcern('majority'))
# Make sure read concern is respected after a `.limit(...)`.
bars = Bar.objects.limit(1).read_concern(ReadConcern('majority'))
assert_read_concern(bars, ReadConcern('majority'))
# Make sure read concern is respected after an `.order_by(...)`.
bars = Bar.objects.order_by("txt").read_concern(
ReadConcern('majority')
)
assert_read_concern(bars, ReadConcern('majority'))
# Make sure read concern is respected after a `.hint(...)`.
bars = Bar.objects.hint([("txt", 1)]).read_concern(
ReadConcern('majority')
)
assert_read_concern(bars, ReadConcern('majority'))
def test_json_simple(self): def test_json_simple(self):
class Embedded(EmbeddedDocument): class Embedded(EmbeddedDocument):
string = StringField() string = StringField()