From 68447af1277b72ed9338abd0949d8716b9b135e3 Mon Sep 17 00:00:00 2001 From: lukasloetkolben Date: Mon, 22 Feb 2021 21:42:02 +0100 Subject: [PATCH 1/5] - feature: allow_disk_use --- mongoengine/queryset/base.py | 64 ++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/mongoengine/queryset/base.py b/mongoengine/queryset/base.py index c4a7e771..2c2f5f74 100644 --- a/mongoengine/queryset/base.py +++ b/mongoengine/queryset/base.py @@ -34,7 +34,6 @@ from mongoengine.queryset import transform from mongoengine.queryset.field_list import QueryFieldList from mongoengine.queryset.visitor import Q, QNode - __all__ = ("BaseQuerySet", "DO_NOTHING", "NULLIFY", "CASCADE", "DENY", "PULL") # Delete rules @@ -63,6 +62,7 @@ class BaseQuerySet: self._loaded_fields = QueryFieldList() self._ordering = None self._snapshot = False + self._allow_disk_use = False self._timeout = True self._read_preference = None self._read_concern = None @@ -113,8 +113,8 @@ class BaseQuerySet: # Make sure proper query object is passed. if not isinstance(q_obj, QNode): msg = ( - "Not a query object: %s. " - "Did you intend to use key=value?" % q_obj + "Not a query object: %s. " + "Did you intend to use key=value?" % q_obj ) raise InvalidQueryError(msg) query &= q_obj @@ -294,7 +294,7 @@ class BaseQuerySet: return result def insert( - self, doc_or_docs, load_bulk=True, write_concern=None, signal_kwargs=None + self, doc_or_docs, load_bulk=True, write_concern=None, signal_kwargs=None ): """bulk insert documents @@ -397,10 +397,10 @@ class BaseQuerySet: # mimic the fact that setting .limit(0) in pymongo sets no limit # https://docs.mongodb.com/manual/reference/method/cursor.limit/#zero-value if ( - self._limit == 0 - and with_limit_and_skip is False - or self._none - or self._empty + self._limit == 0 + and with_limit_and_skip is False + or self._none + or self._empty ): return 0 @@ -450,13 +450,13 @@ class BaseQuerySet: # Handle deletes where skips or limits have been applied or # there is an untriggered delete signal has_delete_signal = signals.signals_available and ( - signals.pre_delete.has_receivers_for(doc) - or signals.post_delete.has_receivers_for(doc) + signals.pre_delete.has_receivers_for(doc) + or signals.post_delete.has_receivers_for(doc) ) call_document_delete = ( - queryset._skip or queryset._limit or has_delete_signal - ) and not _from_doc_delete + queryset._skip or queryset._limit or has_delete_signal + ) and not _from_doc_delete if call_document_delete: cnt = 0 @@ -519,13 +519,13 @@ class BaseQuerySet: return result.deleted_count def update( - self, - upsert=False, - multi=True, - write_concern=None, - read_concern=None, - full_result=False, - **update, + self, + upsert=False, + multi=True, + write_concern=None, + read_concern=None, + full_result=False, + **update, ): """Perform an atomic update on the fields matched by the query. @@ -563,7 +563,7 @@ class BaseQuerySet: update["$set"] = {"_cls": queryset._document._class_name} try: with set_read_write_concern( - queryset._collection, write_concern, read_concern + queryset._collection, write_concern, read_concern ) as collection: update_func = collection.update_one if multi: @@ -637,7 +637,7 @@ class BaseQuerySet: ) def modify( - self, upsert=False, full_response=False, remove=False, new=False, **update + self, upsert=False, full_response=False, remove=False, new=False, **update ): """Update and return the updated document. @@ -798,6 +798,7 @@ class BaseQuerySet: "_loaded_fields", "_ordering", "_snapshot", + "_allow_disk_use", "_timeout", "_read_preference", "_read_concern", @@ -967,7 +968,7 @@ class BaseQuerySet: for field_part in field.split(".")[1:]: # if looping on embedded document, get the document type instance if instance and isinstance( - doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) + doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) ): doc_field = instance # now get the subdocument @@ -976,12 +977,12 @@ class BaseQuerySet: if isinstance(doc_field, ListField): doc_field = getattr(doc_field, "field", doc_field) if isinstance( - doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) + doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) ): instance = getattr(doc_field, "document_type", None) if instance and isinstance( - doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) + doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) ): distinct = [instance(**doc) for doc in distinct] @@ -1165,6 +1166,16 @@ class BaseQuerySet: queryset._snapshot = enabled return queryset + def allow_disk_use(self, enabled): + """Enable or disable the use of temporary files on disk while processing a blocking sort operation. + (To store data exceeding the 100 megabyte system memory limit) + + :param enabled: wether or not temporary files on disk are used + """ + queryset = self.clone() + queryset._allow_disk_use = enabled + return queryset + def timeout(self, enabled): """Enable or disable the default mongod timeout when querying. (no_cursor_timeout option) @@ -1306,7 +1317,7 @@ class BaseQuerySet: # JS functionality def map_reduce( - self, map_f, reduce_f, output, finalize_f=None, limit=None, scope=None + self, map_f, reduce_f, output, finalize_f=None, limit=None, scope=None ): """Perform a map/reduce query using the current query spec and ordering. While ``map_reduce`` respects ``QuerySet`` chaining, @@ -1604,6 +1615,9 @@ class BaseQuerySet: if not self._timeout: cursor_args["no_cursor_timeout"] = True + if self._allow_disk_use: + cursor_args["allow_disk_use"] = True + if self._loaded_fields: cursor_args[fields_name] = self._loaded_fields.as_dict() From 80a3b1c88cd8747b7fe4cd7203a3f2d654b7df0e Mon Sep 17 00:00:00 2001 From: lukasloetkolben Date: Mon, 22 Feb 2021 22:06:10 +0100 Subject: [PATCH 2/5] - feature: allow_disk_use --- mongoengine/queryset/base.py | 53 ++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/mongoengine/queryset/base.py b/mongoengine/queryset/base.py index 2c2f5f74..c5193ed9 100644 --- a/mongoengine/queryset/base.py +++ b/mongoengine/queryset/base.py @@ -34,6 +34,7 @@ from mongoengine.queryset import transform from mongoengine.queryset.field_list import QueryFieldList from mongoengine.queryset.visitor import Q, QNode + __all__ = ("BaseQuerySet", "DO_NOTHING", "NULLIFY", "CASCADE", "DENY", "PULL") # Delete rules @@ -62,8 +63,8 @@ class BaseQuerySet: self._loaded_fields = QueryFieldList() self._ordering = None self._snapshot = False - self._allow_disk_use = False self._timeout = True + self._allow_disk_use = False self._read_preference = None self._read_concern = None self._iter = False @@ -113,8 +114,8 @@ class BaseQuerySet: # Make sure proper query object is passed. if not isinstance(q_obj, QNode): msg = ( - "Not a query object: %s. " - "Did you intend to use key=value?" % q_obj + "Not a query object: %s. " + "Did you intend to use key=value?" % q_obj ) raise InvalidQueryError(msg) query &= q_obj @@ -294,7 +295,7 @@ class BaseQuerySet: return result def insert( - self, doc_or_docs, load_bulk=True, write_concern=None, signal_kwargs=None + self, doc_or_docs, load_bulk=True, write_concern=None, signal_kwargs=None ): """bulk insert documents @@ -397,10 +398,10 @@ class BaseQuerySet: # mimic the fact that setting .limit(0) in pymongo sets no limit # https://docs.mongodb.com/manual/reference/method/cursor.limit/#zero-value if ( - self._limit == 0 - and with_limit_and_skip is False - or self._none - or self._empty + self._limit == 0 + and with_limit_and_skip is False + or self._none + or self._empty ): return 0 @@ -450,13 +451,13 @@ class BaseQuerySet: # Handle deletes where skips or limits have been applied or # there is an untriggered delete signal has_delete_signal = signals.signals_available and ( - signals.pre_delete.has_receivers_for(doc) - or signals.post_delete.has_receivers_for(doc) + signals.pre_delete.has_receivers_for(doc) + or signals.post_delete.has_receivers_for(doc) ) call_document_delete = ( - queryset._skip or queryset._limit or has_delete_signal - ) and not _from_doc_delete + queryset._skip or queryset._limit or has_delete_signal + ) and not _from_doc_delete if call_document_delete: cnt = 0 @@ -519,13 +520,13 @@ class BaseQuerySet: return result.deleted_count def update( - self, - upsert=False, - multi=True, - write_concern=None, - read_concern=None, - full_result=False, - **update, + self, + upsert=False, + multi=True, + write_concern=None, + read_concern=None, + full_result=False, + **update, ): """Perform an atomic update on the fields matched by the query. @@ -563,7 +564,7 @@ class BaseQuerySet: update["$set"] = {"_cls": queryset._document._class_name} try: with set_read_write_concern( - queryset._collection, write_concern, read_concern + queryset._collection, write_concern, read_concern ) as collection: update_func = collection.update_one if multi: @@ -637,7 +638,7 @@ class BaseQuerySet: ) def modify( - self, upsert=False, full_response=False, remove=False, new=False, **update + self, upsert=False, full_response=False, remove=False, new=False, **update ): """Update and return the updated document. @@ -798,8 +799,8 @@ class BaseQuerySet: "_loaded_fields", "_ordering", "_snapshot", - "_allow_disk_use", "_timeout", + "_allow_disk_use", "_read_preference", "_read_concern", "_iter", @@ -968,7 +969,7 @@ class BaseQuerySet: for field_part in field.split(".")[1:]: # if looping on embedded document, get the document type instance if instance and isinstance( - doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) + doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) ): doc_field = instance # now get the subdocument @@ -977,12 +978,12 @@ class BaseQuerySet: if isinstance(doc_field, ListField): doc_field = getattr(doc_field, "field", doc_field) if isinstance( - doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) + doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) ): instance = getattr(doc_field, "document_type", None) if instance and isinstance( - doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) + doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField) ): distinct = [instance(**doc) for doc in distinct] @@ -1317,7 +1318,7 @@ class BaseQuerySet: # JS functionality def map_reduce( - self, map_f, reduce_f, output, finalize_f=None, limit=None, scope=None + self, map_f, reduce_f, output, finalize_f=None, limit=None, scope=None ): """Perform a map/reduce query using the current query spec and ordering. While ``map_reduce`` respects ``QuerySet`` chaining, From 0d5e028c55f3f78c896f1881be3a942717cf6cc2 Mon Sep 17 00:00:00 2001 From: lukasloetkolben Date: Fri, 26 Feb 2021 18:11:32 +0100 Subject: [PATCH 3/5] - tests for feature allow_disk_use --- tests/queryset/test_queryset.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/queryset/test_queryset.py b/tests/queryset/test_queryset.py index 079447bc..6310b9cd 100644 --- a/tests/queryset/test_queryset.py +++ b/tests/queryset/test_queryset.py @@ -5656,6 +5656,31 @@ class TestQueryset(unittest.TestCase): qs = self.Person.objects().timeout(False) assert qs._cursor_args == {"no_cursor_timeout": True} + def test_allow_disk_use(self): + qs = self.Person.objects() + assert qs._cursor_args == {} + + qs = self.Person.objects().allow_disk_use(False) + assert qs._cursor_args == {} + + qs = self.Person.objects().allow_disk_use(True) + assert qs._cursor_args == {"allow_disk_use": True} + + # Test if allow_disk_use changes the results + self.Person.drop_collection() + self.Person.objects.create(name="Foo", age=12) + self.Person.objects.create(name="Baz", age=17) + self.Person.objects.create(name="Bar", age=13) + + qs_disk = self.Person.objects().order_by("age").allow_disk_use(True) + qs = self.Person.objects().order_by("age") + + assert qs_disk.count() == qs.count() + + for index in range(qs_disk.count()): + assert qs_disk[index] == qs[index] + + if __name__ == "__main__": unittest.main() From 467e9c3ddf80b15ea93e61b2dc96e1d0e67074e3 Mon Sep 17 00:00:00 2001 From: lukasloetkolben Date: Fri, 26 Feb 2021 18:12:58 +0100 Subject: [PATCH 4/5] typo wether --> whether --- mongoengine/queryset/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mongoengine/queryset/base.py b/mongoengine/queryset/base.py index c5193ed9..ae8cd407 100644 --- a/mongoengine/queryset/base.py +++ b/mongoengine/queryset/base.py @@ -1171,7 +1171,7 @@ class BaseQuerySet: """Enable or disable the use of temporary files on disk while processing a blocking sort operation. (To store data exceeding the 100 megabyte system memory limit) - :param enabled: wether or not temporary files on disk are used + :param enabled: whether or not temporary files on disk are used """ queryset = self.clone() queryset._allow_disk_use = enabled From 9ff5d8426c3d156001048f23e67709174a0329af Mon Sep 17 00:00:00 2001 From: Bastien Gerard Date: Sat, 27 Feb 2021 21:30:07 +0100 Subject: [PATCH 5/5] restrict test on allowDiskUse to mongoDB >= 4.4 in CI + ran black --- tests/queryset/test_queryset.py | 3 ++- tests/utils.py | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/queryset/test_queryset.py b/tests/queryset/test_queryset.py index 6310b9cd..4d281c60 100644 --- a/tests/queryset/test_queryset.py +++ b/tests/queryset/test_queryset.py @@ -21,6 +21,7 @@ from mongoengine.queryset import ( QuerySetManager, queryset_manager, ) +from tests.utils import requires_mongodb_gte_44 class db_ops_tracker(query_counter): @@ -5656,6 +5657,7 @@ class TestQueryset(unittest.TestCase): qs = self.Person.objects().timeout(False) assert qs._cursor_args == {"no_cursor_timeout": True} + @requires_mongodb_gte_44 def test_allow_disk_use(self): qs = self.Person.objects() assert qs._cursor_args == {} @@ -5681,6 +5683,5 @@ class TestQueryset(unittest.TestCase): assert qs_disk[index] == qs[index] - if __name__ == "__main__": unittest.main() diff --git a/tests/utils.py b/tests/utils.py index 0899c208..adb0bdb4 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,3 +1,4 @@ +import operator import unittest import pytest @@ -33,6 +34,10 @@ def get_as_pymongo(doc): return doc.__class__.objects.as_pymongo().get(id=doc.id) +def requires_mongodb_gte_44(func): + return _decorated_with_ver_requirement(func, (4, 4), oper=operator.ge) + + def _decorated_with_ver_requirement(func, mongo_version_req, oper): """Return a MongoDB version requirement decorator.