Merge branch 'master' of github.com:MongoEngine/mongoengine into fix_count_documents_deprecation

This commit is contained in:
Bastien Gérard
2020-08-11 23:01:33 +02:00
68 changed files with 1073 additions and 895 deletions

View File

@@ -1,24 +1,27 @@
from __future__ import absolute_import
import copy
import itertools
import re
import warnings
from collections.abc import Mapping
from bson import SON, json_util
from bson.code import Code
import pymongo
import pymongo.errors
from pymongo.collection import ReturnDocument
from pymongo.common import validate_read_preference
import six
from six import iteritems
from pymongo.read_concern import ReadConcern
from mongoengine import signals
from mongoengine.base import get_document
from mongoengine.common import _import_class
from mongoengine.connection import get_db
from mongoengine.context_managers import set_write_concern, switch_db
from mongoengine.context_managers import (
set_read_write_concern,
set_write_concern,
switch_db,
)
from mongoengine.errors import (
BulkWriteError,
InvalidQueryError,
@@ -42,7 +45,7 @@ DENY = 3
PULL = 4
class BaseQuerySet(object):
class BaseQuerySet:
"""A set of results returned from a query. Wraps a MongoDB cursor,
providing :class:`~mongoengine.Document` objects as the results.
"""
@@ -61,8 +64,8 @@ class BaseQuerySet(object):
self._ordering = None
self._snapshot = False
self._timeout = True
self._slave_okay = False
self._read_preference = None
self._read_concern = None
self._iter = False
self._scalar = []
self._none = False
@@ -81,6 +84,7 @@ class BaseQuerySet(object):
self._cursor_obj = None
self._limit = None
self._skip = None
self._hint = -1 # Using -1 as None is a valid value for hint
self._collation = None
self._batch_size = None
@@ -88,6 +92,13 @@ class BaseQuerySet(object):
self._max_time_ms = None
self._comment = None
# Hack - As people expect cursor[5:5] to return
# an empty result set. It's hard to do that right, though, because the
# server uses limit(0) to mean 'no limit'. So we set _empty
# in that case and check for it when iterating. We also unset
# it anytime we change _limit. Inspired by how it is done in pymongo.Cursor
self._empty = False
def __call__(self, q_obj=None, **query):
"""Filter the selected documents by calling the
:class:`~mongoengine.queryset.QuerySet` with a query.
@@ -160,6 +171,7 @@ class BaseQuerySet(object):
[<User: User object>, <User: User object>]
"""
queryset = self.clone()
queryset._empty = False
# Handle a slice
if isinstance(key, slice):
@@ -167,6 +179,8 @@ class BaseQuerySet(object):
queryset._skip, queryset._limit = key.start, key.stop
if key.start and key.stop:
queryset._limit = key.stop - key.start
if queryset._limit == 0:
queryset._empty = True
# Allow further QuerySet modifications to be performed
return queryset
@@ -205,8 +219,6 @@ class BaseQuerySet(object):
"""Avoid to open all records in an if stmt in Py3."""
return self._has_data()
__nonzero__ = __bool__ # For Py2 support
# Core functions
def all(self):
@@ -257,20 +269,21 @@ class BaseQuerySet(object):
queryset = queryset.filter(*q_objs, **query)
try:
result = six.next(queryset)
result = next(queryset)
except StopIteration:
msg = "%s matching query does not exist." % queryset._document._class_name
raise queryset._document.DoesNotExist(msg)
try:
six.next(queryset)
# Check if there is another match
next(queryset)
except StopIteration:
return result
# If we were able to retrieve the 2nd doc, rewind the cursor and
# raise the MultipleObjectsReturned exception.
queryset.rewind()
message = u"%d items returned, instead of 1" % queryset.count()
raise queryset._document.MultipleObjectsReturned(message)
# If we were able to retrieve the 2nd doc, raise the MultipleObjectsReturned exception.
raise queryset._document.MultipleObjectsReturned(
"2 or more items returned, instead of 1"
)
def create(self, **kwargs):
"""Create new object. Returns the saved object instance.
@@ -354,20 +367,20 @@ class BaseQuerySet(object):
)
except pymongo.errors.DuplicateKeyError as err:
message = "Could not save document (%s)"
raise NotUniqueError(message % six.text_type(err))
raise NotUniqueError(message % err)
except pymongo.errors.BulkWriteError as err:
# inserting documents that already have an _id field will
# give huge performance debt or raise
message = u"Bulk write error: (%s)"
raise BulkWriteError(message % six.text_type(err.details))
message = "Bulk write error: (%s)"
raise BulkWriteError(message % err.details)
except pymongo.errors.OperationFailure as err:
message = "Could not save document (%s)"
if re.match("^E1100[01] duplicate key", six.text_type(err)):
if re.match("^E1100[01] duplicate key", str(err)):
# E11000 - duplicate key error index
# E11001 - duplicate key on update
message = u"Tried to save duplicate unique keys (%s)"
raise NotUniqueError(message % six.text_type(err))
raise OperationError(message % six.text_type(err))
message = "Tried to save duplicate unique keys (%s)"
raise NotUniqueError(message % err)
raise OperationError(message % err)
# Apply inserted_ids to documents
for doc, doc_id in zip(docs, ids):
@@ -395,7 +408,12 @@ class BaseQuerySet(object):
"""
# mimic the fact that setting .limit(0) in pymongo sets no limit
# https://docs.mongodb.com/manual/reference/method/cursor.limit/#zero-value
if self._limit == 0 and with_limit_and_skip is False or self._none:
if (
self._limit == 0
and with_limit_and_skip is False
or self._none
or self._empty
):
return 0
kwargs = (
@@ -513,7 +531,13 @@ class BaseQuerySet(object):
return result.deleted_count
def update(
self, upsert=False, multi=True, write_concern=None, full_result=False, **update
self,
upsert=False,
multi=True,
write_concern=None,
read_concern=None,
full_result=False,
**update
):
"""Perform an atomic update on the fields matched by the query.
@@ -525,6 +549,7 @@ class BaseQuerySet(object):
``save(..., write_concern={w: 2, fsync: True}, ...)`` will
wait until at least two servers have recorded the write and
will force an fsync on the primary server.
:param read_concern: Override the read concern for the operation
:param full_result: Return the associated ``pymongo.UpdateResult`` rather than just the number
updated items
:param update: Django-style update keyword arguments
@@ -551,7 +576,9 @@ class BaseQuerySet(object):
else:
update["$set"] = {"_cls": queryset._document._class_name}
try:
with set_write_concern(queryset._collection, write_concern) as collection:
with set_read_write_concern(
queryset._collection, write_concern, read_concern
) as collection:
update_func = collection.update_one
if multi:
update_func = collection.update_many
@@ -561,14 +588,14 @@ class BaseQuerySet(object):
elif result.raw_result:
return result.raw_result["n"]
except pymongo.errors.DuplicateKeyError as err:
raise NotUniqueError(u"Update failed (%s)" % six.text_type(err))
raise NotUniqueError("Update failed (%s)" % err)
except pymongo.errors.OperationFailure as err:
if six.text_type(err) == u"multi not coded yet":
message = u"update() method requires MongoDB 1.1.3+"
if str(err) == "multi not coded yet":
message = "update() method requires MongoDB 1.1.3+"
raise OperationError(message)
raise OperationError(u"Update failed (%s)" % six.text_type(err))
raise OperationError("Update failed (%s)" % err)
def upsert_one(self, write_concern=None, **update):
def upsert_one(self, write_concern=None, read_concern=None, **update):
"""Overwrite or add the first document matched by the query.
:param write_concern: Extra keyword arguments are passed down which
@@ -577,6 +604,7 @@ class BaseQuerySet(object):
``save(..., write_concern={w: 2, fsync: True}, ...)`` will
wait until at least two servers have recorded the write and
will force an fsync on the primary server.
:param read_concern: Override the read concern for the operation
:param update: Django-style update keyword arguments
:returns the new or overwritten document
@@ -588,6 +616,7 @@ class BaseQuerySet(object):
multi=False,
upsert=True,
write_concern=write_concern,
read_concern=read_concern,
full_result=True,
**update
)
@@ -684,9 +713,9 @@ class BaseQuerySet(object):
**self._cursor_args
)
except pymongo.errors.DuplicateKeyError as err:
raise NotUniqueError(u"Update failed (%s)" % err)
raise NotUniqueError("Update failed (%s)" % err)
except pymongo.errors.OperationFailure as err:
raise OperationError(u"Update failed (%s)" % err)
raise OperationError("Update failed (%s)" % err)
if full_response:
if result["value"] is not None:
@@ -715,10 +744,10 @@ class BaseQuerySet(object):
return queryset.filter(pk=object_id).first()
def in_bulk(self, object_ids):
"""Retrieve a set of documents by their ids.
"""Retrieve a set of documents by their ids.
:param object_ids: a list or tuple of ``ObjectId``\ s
:rtype: dict of ObjectIds as keys and collection-specific
:param object_ids: a list or tuple of ObjectId's
:rtype: dict of ObjectId's as keys and collection-specific
Document subclasses as values.
.. versionadded:: 0.3
@@ -745,7 +774,9 @@ class BaseQuerySet(object):
return doc_map
def none(self):
    """Return a clone of this queryset flagged to yield no documents.

    The clone has ``_none`` set; iteration checks that flag and stops
    immediately, and ``count()`` reports 0 for it. Inspired by Django's
    ``none()``:
    https://docs.djangoproject.com/en/dev/ref/models/querysets/#none
    """
    empty_clone = self.clone()
    empty_clone._none = True
    return empty_clone
@@ -798,13 +829,14 @@ class BaseQuerySet(object):
"_ordering",
"_snapshot",
"_timeout",
"_slave_okay",
"_read_preference",
"_read_concern",
"_iter",
"_scalar",
"_as_pymongo",
"_limit",
"_skip",
"_empty",
"_hint",
"_collation",
"_auto_dereference",
@@ -845,6 +877,7 @@ class BaseQuerySet(object):
"""
queryset = self.clone()
queryset._limit = n
queryset._empty = False # cancels the effect of empty
# If a cursor object has already been created, apply the limit to it.
if queryset._cursor_obj:
@@ -1012,7 +1045,7 @@ class BaseQuerySet(object):
.. versionchanged:: 0.5 - Added subfield support
"""
fields = {f: QueryFieldList.ONLY for f in fields}
self.only_fields = fields.keys()
self.only_fields = list(fields.keys())
return self.fields(True, **fields)
def exclude(self, *fields):
@@ -1049,9 +1082,11 @@ class BaseQuerySet(object):
posts = BlogPost.objects(...).fields(comments=0)
To retrieve a subrange of array elements:
To retrieve a subrange or sublist of array elements,
support exists for both the `slice` and `elemMatch` projection operators:
posts = BlogPost.objects(...).fields(slice__comments=5)
posts = BlogPost.objects(...).fields(elemMatch__comments="test")
:param kwargs: A set of keyword arguments identifying what to
include, exclude, or slice.
@@ -1060,7 +1095,7 @@ class BaseQuerySet(object):
"""
# Check for an operator and transform to mongo-style if there is
operators = ["slice"]
operators = ["slice", "elemMatch"]
cleaned_fields = []
for key, value in kwargs.items():
parts = key.split("__")
@@ -1163,7 +1198,7 @@ class BaseQuerySet(object):
def explain(self):
    """Return the explain plan record reported by the cursor of this
    :class:`~mongoengine.queryset.QuerySet`.
    """
    cursor = self._cursor
    return cursor.explain()
@@ -1193,20 +1228,6 @@ class BaseQuerySet(object):
queryset._timeout = enabled
return queryset
# DEPRECATED. Has no more impact on PyMongo 3+
def slave_okay(self, enabled):
    """Record the ``slave_okay`` flag on a copy of this queryset.

    A ``DeprecationWarning`` is emitted on every call.

    :param enabled: whether or not the slave_okay is enabled
    :returns: a clone of this queryset with ``_slave_okay`` set to
        ``enabled``

    .. deprecated:: Ignored with PyMongo 3+
    """
    msg = "slave_okay is deprecated as it has no impact when using PyMongo 3+."
    # stacklevel=2 attributes the warning to the caller's line instead of
    # this method, so the deprecated call site can actually be located.
    warnings.warn(msg, DeprecationWarning, stacklevel=2)
    queryset = self.clone()
    queryset._slave_okay = enabled
    return queryset
def read_preference(self, read_preference):
"""Change the read_preference when querying.
@@ -1219,6 +1240,22 @@ class BaseQuerySet(object):
queryset._cursor_obj = None # we need to re-create the cursor object whenever we apply read_preference
return queryset
def read_concern(self, read_concern):
    """Return a clone of this queryset that queries with ``read_concern``.

    :param read_concern: a mapping of keyword arguments used to build a
        :class:`~pymongo.read_concern.ReadConcern`, or ``None`` to leave
        the clone without a read concern.
    :raises TypeError: if ``read_concern`` is neither ``None`` nor a
        mapping.
    """
    unset = read_concern is None
    if not (unset or isinstance(read_concern, Mapping)):
        raise TypeError("%r is not a valid read concern." % (read_concern,))

    clone = self.clone()
    if unset:
        clone._read_concern = None
    else:
        clone._read_concern = ReadConcern(**read_concern)
    # Drop the cached cursor: it must be re-created whenever a
    # read_concern is applied.
    clone._cursor_obj = None
    return clone
def scalar(self, *fields):
"""Instead of returning Document instances, return either a specific
value or a tuple of values in order.
@@ -1318,10 +1355,11 @@ class BaseQuerySet(object):
final_pipeline = initial_pipeline + user_pipeline
collection = self._collection
if self._read_preference is not None:
if self._read_preference is not None or self._read_concern is not None:
collection = self._collection.with_options(
read_preference=self._read_preference
read_preference=self._read_preference, read_concern=self._read_concern
)
return collection.aggregate(final_pipeline, cursor={}, **kwargs)
# JS functionality
@@ -1375,13 +1413,13 @@ class BaseQuerySet(object):
map_f_scope = {}
if isinstance(map_f, Code):
map_f_scope = map_f.scope
map_f = six.text_type(map_f)
map_f = str(map_f)
map_f = Code(queryset._sub_js_fields(map_f), map_f_scope)
reduce_f_scope = {}
if isinstance(reduce_f, Code):
reduce_f_scope = reduce_f.scope
reduce_f = six.text_type(reduce_f)
reduce_f = str(reduce_f)
reduce_f_code = queryset._sub_js_fields(reduce_f)
reduce_f = Code(reduce_f_code, reduce_f_scope)
@@ -1391,7 +1429,7 @@ class BaseQuerySet(object):
finalize_f_scope = {}
if isinstance(finalize_f, Code):
finalize_f_scope = finalize_f.scope
finalize_f = six.text_type(finalize_f)
finalize_f = str(finalize_f)
finalize_f_code = queryset._sub_js_fields(finalize_f)
finalize_f = Code(finalize_f_code, finalize_f_scope)
mr_args["finalize"] = finalize_f
@@ -1407,7 +1445,7 @@ class BaseQuerySet(object):
else:
map_reduce_function = "map_reduce"
if isinstance(output, six.string_types):
if isinstance(output, str):
mr_args["out"] = output
elif isinstance(output, dict):
@@ -1591,10 +1629,10 @@ class BaseQuerySet(object):
def __next__(self):
"""Wrap the result in a :class:`~mongoengine.Document` object.
"""
if self._limit == 0 or self._none:
if self._none or self._empty:
raise StopIteration
raw_doc = six.next(self._cursor)
raw_doc = next(self._cursor)
if self._as_pymongo:
return raw_doc
@@ -1610,8 +1648,6 @@ class BaseQuerySet(object):
return doc
next = __next__ # For Python2 support
def rewind(self):
"""Rewind the cursor to its unevaluated state.
@@ -1665,9 +1701,9 @@ class BaseQuerySet(object):
# XXX In PyMongo 3+, we define the read preference on a collection
# level, not a cursor level. Thus, we need to get a cloned collection
# object using `with_options` first.
if self._read_preference is not None:
if self._read_preference is not None or self._read_concern is not None:
self._cursor_obj = self._collection.with_options(
read_preference=self._read_preference
read_preference=self._read_preference, read_concern=self._read_concern
).find(self._query, **self._cursor_args)
else:
self._cursor_obj = self._collection.find(self._query, **self._cursor_args)
@@ -1839,13 +1875,13 @@ class BaseQuerySet(object):
}
"""
total, data, types = self.exec_js(freq_func, field)
values = {types.get(k): int(v) for k, v in iteritems(data)}
values = {types.get(k): int(v) for k, v in data.items()}
if normalize:
values = {k: float(v) / total for k, v in values.items()}
frequencies = {}
for k, v in iteritems(values):
for k, v in values.items():
if isinstance(k, float):
if int(k) == k:
k = int(k)
@@ -1865,7 +1901,7 @@ class BaseQuerySet(object):
field_parts = field.split(".")
try:
field = ".".join(
f if isinstance(f, six.string_types) else f.db_field
f if isinstance(f, str) else f.db_field
for f in self._document._lookup_field(field_parts)
)
db_field_paths.append(field)
@@ -1877,7 +1913,7 @@ class BaseQuerySet(object):
for subdoc in subclasses:
try:
subfield = ".".join(
f if isinstance(f, six.string_types) else f.db_field
f if isinstance(f, str) else f.db_field
for f in subdoc._lookup_field(field_parts)
)
db_field_paths.append(subfield)
@@ -1951,7 +1987,7 @@ class BaseQuerySet(object):
field_name = match.group(1).split(".")
fields = self._document._lookup_field(field_name)
# Substitute the correct name for the field into the javascript
return u'["%s"]' % fields[-1].db_field
return '["%s"]' % fields[-1].db_field
def field_path_sub(match):
# Extract just the field name, and look up the field objects
@@ -1981,23 +2017,3 @@ class BaseQuerySet(object):
setattr(queryset, "_" + method_name, val)
return queryset
# Deprecated
def ensure_index(self, **kwargs):
    """Deprecated use :func:`Document.ensure_index`

    Emits a ``DeprecationWarning``, forwards ``kwargs`` to
    ``self._document.__class__.ensure_index`` and returns this queryset.

    :param kwargs: passed through unchanged to the underlying
        ``ensure_index`` call
    :returns: this queryset
    """
    # The message names the public method actually being called
    # (``ensure_index``, without a leading underscore).
    msg = (
        "Doc.objects().ensure_index() is deprecated. "
        "Use Doc.ensure_index() instead."
    )
    # stacklevel=2 attributes the warning to the caller's line.
    warnings.warn(msg, DeprecationWarning, stacklevel=2)
    # NOTE(review): this dispatches through the class of ``_document``
    # rather than ``_document`` itself -- confirm that is intended.
    self._document.__class__.ensure_index(**kwargs)
    return self
def _ensure_indexes(self):
    """Deprecated use :func:`~Document.ensure_indexes`

    Emits a ``DeprecationWarning`` and then calls
    ``self._document.__class__.ensure_indexes()``.
    """
    msg = (
        "Doc.objects()._ensure_indexes() is deprecated. "
        "Use Doc.ensure_indexes() instead."
    )
    # stacklevel=2 attributes the warning to the caller's line.
    warnings.warn(msg, DeprecationWarning, stacklevel=2)
    # NOTE(review): this dispatches through the class of ``_document``
    # rather than ``_document`` itself -- confirm that is intended.
    self._document.__class__.ensure_indexes()