Merge pull request #2314 from bagerard/abarto-feature/allow-setting-read-concern-queryset

Abarto feature/allow setting read concern queryset
This commit is contained in:
Bastien Gérard 2020-04-26 22:57:57 +02:00 committed by GitHub
commit 130e9c519c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 152 additions and 6 deletions

View File

@ -256,3 +256,4 @@ that much better:
* Eric Timmons (https://github.com/daewok) * Eric Timmons (https://github.com/daewok)
* Matthew Simpson (https://github.com/mcsimps2) * Matthew Simpson (https://github.com/mcsimps2)
* Leonardo Domingues (https://github.com/leodmgs) * Leonardo Domingues (https://github.com/leodmgs)
* Agustin Barto (https://github.com/abarto)

View File

@ -21,6 +21,7 @@ Development
- ``Queryset._ensure_indexes`` and ``Queryset.ensure_indexes``, the right method to use is ``Document.ensure_indexes`` - ``Queryset._ensure_indexes`` and ``Queryset.ensure_indexes``, the right method to use is ``Document.ensure_indexes``
- Added pre-commit #2212 - Added pre-commit #2212
- Renamed requirements-lint.txt to requirements-dev.txt #2212 - Renamed requirements-lint.txt to requirements-dev.txt #2212
- Support for setting ReadConcern #2255
Changes in 0.19.1 Changes in 0.19.1
================= =================

View File

@ -1,5 +1,6 @@
from contextlib import contextmanager from contextlib import contextmanager
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern from pymongo.write_concern import WriteConcern
from mongoengine.common import _import_class from mongoengine.common import _import_class
@ -13,6 +14,7 @@ __all__ = (
"no_sub_classes", "no_sub_classes",
"query_counter", "query_counter",
"set_write_concern", "set_write_concern",
"set_read_write_concern",
) )
@ -256,3 +258,21 @@ def set_write_concern(collection, write_concerns):
combined_concerns = dict(collection.write_concern.document.items()) combined_concerns = dict(collection.write_concern.document.items())
combined_concerns.update(write_concerns) combined_concerns.update(write_concerns)
yield collection.with_options(write_concern=WriteConcern(**combined_concerns)) yield collection.with_options(write_concern=WriteConcern(**combined_concerns))
@contextmanager
def set_read_write_concern(collection, write_concerns, read_concerns):
combined_write_concerns = dict(collection.write_concern.document.items())
if write_concerns is not None:
combined_write_concerns.update(write_concerns)
combined_read_concerns = dict(collection.read_concern.document.items())
if read_concerns is not None:
combined_read_concerns.update(read_concerns)
yield collection.with_options(
write_concern=WriteConcern(**combined_write_concerns),
read_concern=ReadConcern(**combined_read_concerns),
)

View File

@ -3,18 +3,25 @@ import itertools
import re import re
import warnings import warnings
from collections.abc import Mapping
from bson import SON, json_util from bson import SON, json_util
from bson.code import Code from bson.code import Code
import pymongo import pymongo
import pymongo.errors import pymongo.errors
from pymongo.collection import ReturnDocument from pymongo.collection import ReturnDocument
from pymongo.common import validate_read_preference from pymongo.common import validate_read_preference
from pymongo.read_concern import ReadConcern
from mongoengine import signals from mongoengine import signals
from mongoengine.base import get_document from mongoengine.base import get_document
from mongoengine.common import _import_class from mongoengine.common import _import_class
from mongoengine.connection import get_db from mongoengine.connection import get_db
from mongoengine.context_managers import set_write_concern, switch_db from mongoengine.context_managers import (
set_read_write_concern,
set_write_concern,
switch_db,
)
from mongoengine.errors import ( from mongoengine.errors import (
BulkWriteError, BulkWriteError,
InvalidQueryError, InvalidQueryError,
@ -57,6 +64,7 @@ class BaseQuerySet:
self._snapshot = False self._snapshot = False
self._timeout = True self._timeout = True
self._read_preference = None self._read_preference = None
self._read_concern = None
self._iter = False self._iter = False
self._scalar = [] self._scalar = []
self._none = False self._none = False
@ -484,7 +492,13 @@ class BaseQuerySet:
return result.deleted_count return result.deleted_count
def update( def update(
self, upsert=False, multi=True, write_concern=None, full_result=False, **update self,
upsert=False,
multi=True,
write_concern=None,
read_concern=None,
full_result=False,
**update
): ):
"""Perform an atomic update on the fields matched by the query. """Perform an atomic update on the fields matched by the query.
@ -496,6 +510,7 @@ class BaseQuerySet:
``save(..., write_concern={w: 2, fsync: True}, ...)`` will ``save(..., write_concern={w: 2, fsync: True}, ...)`` will
wait until at least two servers have recorded the write and wait until at least two servers have recorded the write and
will force an fsync on the primary server. will force an fsync on the primary server.
:param read_concern: Override the read concern for the operation
:param full_result: Return the associated ``pymongo.UpdateResult`` rather than just the number :param full_result: Return the associated ``pymongo.UpdateResult`` rather than just the number
updated items updated items
:param update: Django-style update keyword arguments :param update: Django-style update keyword arguments
@ -522,7 +537,9 @@ class BaseQuerySet:
else: else:
update["$set"] = {"_cls": queryset._document._class_name} update["$set"] = {"_cls": queryset._document._class_name}
try: try:
with set_write_concern(queryset._collection, write_concern) as collection: with set_read_write_concern(
queryset._collection, write_concern, read_concern
) as collection:
update_func = collection.update_one update_func = collection.update_one
if multi: if multi:
update_func = collection.update_many update_func = collection.update_many
@ -539,7 +556,7 @@ class BaseQuerySet:
raise OperationError(message) raise OperationError(message)
raise OperationError("Update failed (%s)" % err) raise OperationError("Update failed (%s)" % err)
def upsert_one(self, write_concern=None, **update): def upsert_one(self, write_concern=None, read_concern=None, **update):
"""Overwrite or add the first document matched by the query. """Overwrite or add the first document matched by the query.
:param write_concern: Extra keyword arguments are passed down which :param write_concern: Extra keyword arguments are passed down which
@ -548,6 +565,7 @@ class BaseQuerySet:
``save(..., write_concern={w: 2, fsync: True}, ...)`` will ``save(..., write_concern={w: 2, fsync: True}, ...)`` will
wait until at least two servers have recorded the write and wait until at least two servers have recorded the write and
will force an fsync on the primary server. will force an fsync on the primary server.
:param read_concern: Override the read concern for the operation
:param update: Django-style update keyword arguments :param update: Django-style update keyword arguments
:returns the new or overwritten document :returns the new or overwritten document
@ -559,6 +577,7 @@ class BaseQuerySet:
multi=False, multi=False,
upsert=True, upsert=True,
write_concern=write_concern, write_concern=write_concern,
read_concern=read_concern,
full_result=True, full_result=True,
**update **update
) )
@ -1177,6 +1196,22 @@ class BaseQuerySet:
queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_preference queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_preference
return queryset return queryset
def read_concern(self, read_concern):
"""Change the read_concern when querying.
:param read_concern: override ReplicaSetConnection-level
preference.
"""
if read_concern is not None and not isinstance(read_concern, Mapping):
raise TypeError("%r is not a valid read concern." % (read_concern,))
queryset = self.clone()
queryset._read_concern = (
ReadConcern(**read_concern) if read_concern is not None else None
)
queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_concern
return queryset
def scalar(self, *fields): def scalar(self, *fields):
"""Instead of returning Document instances, return either a specific """Instead of returning Document instances, return either a specific
value or a tuple of values in order. value or a tuple of values in order.
@ -1623,9 +1658,9 @@ class BaseQuerySet:
# XXX In PyMongo 3+, we define the read preference on a collection # XXX In PyMongo 3+, we define the read preference on a collection
# level, not a cursor level. Thus, we need to get a cloned collection # level, not a cursor level. Thus, we need to get a cloned collection
# object using `with_options` first. # object using `with_options` first.
if self._read_preference is not None: if self._read_preference is not None or self._read_concern is not None:
self._cursor_obj = self._collection.with_options( self._cursor_obj = self._collection.with_options(
read_preference=self._read_preference read_preference=self._read_preference, read_concern=self._read_concern
).find(self._query, **self._cursor_args) ).find(self._query, **self._cursor_args)
else: else:
self._cursor_obj = self._collection.find(self._query, **self._cursor_args) self._cursor_obj = self._collection.find(self._query, **self._cursor_args)

View File

@ -7,6 +7,7 @@ from decimal import Decimal
from bson import DBRef, ObjectId from bson import DBRef, ObjectId
import pymongo import pymongo
from pymongo.read_concern import ReadConcern
from pymongo.read_preferences import ReadPreference from pymongo.read_preferences import ReadPreference
from pymongo.results import UpdateResult from pymongo.results import UpdateResult
import pytest import pytest
@ -4726,6 +4727,46 @@ class TestQueryset(unittest.TestCase):
) )
assert_read_pref(bars, ReadPreference.SECONDARY_PREFERRED) assert_read_pref(bars, ReadPreference.SECONDARY_PREFERRED)
def test_read_concern(self):
class Bar(Document):
txt = StringField()
meta = {"indexes": ["txt"]}
Bar.drop_collection()
bar = Bar.objects.create(txt="xyz")
bars = list(Bar.objects.read_concern(None))
assert bars == [bar]
bars = Bar.objects.read_concern({"level": "local"})
assert bars._read_concern.document == {"level": "local"}
assert bars._cursor.collection.read_concern.document == {"level": "local"}
# Make sure that `.read_concern(...)` does not accept string values.
with pytest.raises(TypeError):
Bar.objects.read_concern("local")
def assert_read_concern(qs, expected_read_concern):
assert qs._read_concern.document == expected_read_concern
assert qs._cursor.collection.read_concern.document == expected_read_concern
# Make sure read concern is respected after a `.skip(...)`.
bars = Bar.objects.skip(1).read_concern({"level": "local"})
assert_read_concern(bars, {"level": "local"})
# Make sure read concern is respected after a `.limit(...)`.
bars = Bar.objects.limit(1).read_concern({"level": "local"})
assert_read_concern(bars, {"level": "local"})
# Make sure read concern is respected after an `.order_by(...)`.
bars = Bar.objects.order_by("txt").read_concern({"level": "local"})
assert_read_concern(bars, {"level": "local"})
# Make sure read concern is respected after a `.hint(...)`.
bars = Bar.objects.hint([("txt", 1)]).read_concern({"level": "majority"})
assert_read_concern(bars, {"level": "majority"})
def test_json_simple(self): def test_json_simple(self):
class Embedded(EmbeddedDocument): class Embedded(EmbeddedDocument):
string = StringField() string = StringField()

View File

@ -8,6 +8,8 @@ from mongoengine.context_managers import (
no_dereference, no_dereference,
no_sub_classes, no_sub_classes,
query_counter, query_counter,
set_read_write_concern,
set_write_concern,
switch_collection, switch_collection,
switch_db, switch_db,
) )
@ -15,6 +17,52 @@ from mongoengine.pymongo_support import count_documents
class TestContextManagers: class TestContextManagers:
def test_set_write_concern(self):
connect("mongoenginetest")
class User(Document):
name = StringField()
collection = User._get_collection()
original_write_concern = collection.write_concern
with set_write_concern(
collection, {"w": "majority", "j": True, "wtimeout": 1234}
) as updated_collection:
assert updated_collection.write_concern.document == {
"w": "majority",
"j": True,
"wtimeout": 1234,
}
assert original_write_concern.document == collection.write_concern.document
def test_set_read_write_concern(self):
connect("mongoenginetest")
class User(Document):
name = StringField()
collection = User._get_collection()
original_read_concern = collection.read_concern
original_write_concern = collection.write_concern
with set_read_write_concern(
collection,
{"w": "majority", "j": True, "wtimeout": 1234},
{"level": "local"},
) as update_collection:
assert update_collection.read_concern.document == {"level": "local"}
assert update_collection.write_concern.document == {
"w": "majority",
"j": True,
"wtimeout": 1234,
}
assert original_read_concern.document == collection.read_concern.document
assert original_write_concern.document == collection.write_concern.document
def test_switch_db_context_manager(self): def test_switch_db_context_manager(self):
connect("mongoenginetest") connect("mongoenginetest")
register_connection("testdb-1", "mongoenginetest2") register_connection("testdb-1", "mongoenginetest2")