Allow setting read_concern
This commit is contained in:
		| @@ -257,3 +257,11 @@ def set_write_concern(collection, write_concerns): | ||||
|     combined_concerns = dict(collection.write_concern.document.items()) | ||||
|     combined_concerns.update(write_concerns) | ||||
|     yield collection.with_options(write_concern=WriteConcern(**combined_concerns)) | ||||
|  | ||||
|  | ||||
| @contextmanager | ||||
| def set_read_write_concern(collection, write_concerns, read_concern): | ||||
|     combined_write_concerns = dict(collection.write_concern.document.items()) | ||||
|     combined_write_concerns.update(write_concerns) | ||||
|  | ||||
|     yield collection.with_options(write_concern=WriteConcern(**combined_write_concerns), read_concern=read_concern) | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import pymongo | ||||
| import pymongo.errors | ||||
| from pymongo.collection import ReturnDocument | ||||
| from pymongo.common import validate_read_preference | ||||
| from pymongo.read_concern import ReadConcern | ||||
| import six | ||||
| from six import iteritems | ||||
|  | ||||
| @@ -18,7 +19,7 @@ from mongoengine import signals | ||||
| from mongoengine.base import get_document | ||||
| from mongoengine.common import _import_class | ||||
| from mongoengine.connection import get_db | ||||
| from mongoengine.context_managers import set_write_concern, switch_db | ||||
| from mongoengine.context_managers import set_write_concern, set_read_write_concern, switch_db | ||||
| from mongoengine.errors import ( | ||||
|     BulkWriteError, | ||||
|     InvalidQueryError, | ||||
| @@ -62,6 +63,7 @@ class BaseQuerySet(object): | ||||
|         self._timeout = True | ||||
|         self._slave_okay = False | ||||
|         self._read_preference = None | ||||
|         self._read_concern = None | ||||
|         self._iter = False | ||||
|         self._scalar = [] | ||||
|         self._none = False | ||||
| @@ -490,7 +492,7 @@ class BaseQuerySet(object): | ||||
|                 return result.deleted_count | ||||
|  | ||||
|     def update( | ||||
|         self, upsert=False, multi=True, write_concern=None, full_result=False, **update | ||||
|         self, upsert=False, multi=True, write_concern=None, read_concern=None, full_result=False, **update | ||||
|     ): | ||||
|         """Perform an atomic update on the fields matched by the query. | ||||
|  | ||||
| @@ -502,6 +504,7 @@ class BaseQuerySet(object): | ||||
|             ``save(..., write_concern={w: 2, fsync: True}, ...)`` will | ||||
|             wait until at least two servers have recorded the write and | ||||
|             will force an fsync on the primary server. | ||||
|         :param read_concern: Override the read concern for the operation | ||||
|         :param full_result: Return the associated ``pymongo.UpdateResult`` rather than just the number | ||||
|             updated items | ||||
|         :param update: Django-style update keyword arguments | ||||
| @@ -528,7 +531,7 @@ class BaseQuerySet(object): | ||||
|             else: | ||||
|                 update["$set"] = {"_cls": queryset._document._class_name} | ||||
|         try: | ||||
|             with set_write_concern(queryset._collection, write_concern) as collection: | ||||
|             with set_read_write_concern(queryset._collection, write_concern, read_concern) as collection: | ||||
|                 update_func = collection.update_one | ||||
|                 if multi: | ||||
|                     update_func = collection.update_many | ||||
| @@ -545,7 +548,7 @@ class BaseQuerySet(object): | ||||
|                 raise OperationError(message) | ||||
|             raise OperationError(u"Update failed (%s)" % six.text_type(err)) | ||||
|  | ||||
|     def upsert_one(self, write_concern=None, **update): | ||||
|     def upsert_one(self, write_concern=None, read_concern=None, **update): | ||||
|         """Overwrite or add the first document matched by the query. | ||||
|  | ||||
|         :param write_concern: Extra keyword arguments are passed down which | ||||
| @@ -554,6 +557,7 @@ class BaseQuerySet(object): | ||||
|             ``save(..., write_concern={w: 2, fsync: True}, ...)`` will | ||||
|             wait until at least two servers have recorded the write and | ||||
|             will force an fsync on the primary server. | ||||
|         :param read_concern: Override the read concern for the operation | ||||
|         :param update: Django-style update keyword arguments | ||||
|  | ||||
|         :returns the new or overwritten document | ||||
| @@ -565,6 +569,7 @@ class BaseQuerySet(object): | ||||
|             multi=False, | ||||
|             upsert=True, | ||||
|             write_concern=write_concern, | ||||
|             read_concern=read_concern, | ||||
|             full_result=True, | ||||
|             **update | ||||
|         ) | ||||
| @@ -1196,6 +1201,20 @@ class BaseQuerySet(object): | ||||
|         queryset._cursor_obj = None  # we need to re-create the cursor object whenever we apply read_preference | ||||
|         return queryset | ||||
|  | ||||
|     def read_concern(self, read_concern): | ||||
|         """Change the read_concern when querying. | ||||
|  | ||||
|         :param read_concern: override ReplicaSetConnection-level | ||||
|             preference. | ||||
|         """ | ||||
|         if read_concern is not None and not isinstance(read_concern, ReadConcern): | ||||
|             raise TypeError("%r is not a read concern." % (read_concern,)) | ||||
|  | ||||
|         queryset = self.clone() | ||||
|         queryset._read_concern = read_concern | ||||
|         queryset._cursor_obj = None  # we need to re-create the cursor object whenever we apply read_concern | ||||
|         return queryset | ||||
|      | ||||
|     def scalar(self, *fields): | ||||
|         """Instead of returning Document instances, return either a specific | ||||
|         value or a tuple of values in order. | ||||
| @@ -1642,9 +1661,9 @@ class BaseQuerySet(object): | ||||
|         # XXX In PyMongo 3+, we define the read preference on a collection | ||||
|         # level, not a cursor level. Thus, we need to get a cloned collection | ||||
|         # object using `with_options` first. | ||||
|         if self._read_preference is not None: | ||||
|         if self._read_preference is not None or self._read_concern is not None: | ||||
|             self._cursor_obj = self._collection.with_options( | ||||
|                 read_preference=self._read_preference | ||||
|                 read_preference=self._read_preference, read_concern=self._read_concern | ||||
|             ).find(self._query, **self._cursor_args) | ||||
|         else: | ||||
|             self._cursor_obj = self._collection.find(self._query, **self._cursor_args) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user