From 68447af1277b72ed9338abd0949d8716b9b135e3 Mon Sep 17 00:00:00 2001
From: lukasloetkolben
Date: Mon, 22 Feb 2021 21:42:02 +0100
Subject: [PATCH] - feature: allow_disk_use

---
 mongoengine/queryset/base.py | 64 ++++++++++++++++++++++--------------
 1 file changed, 39 insertions(+), 25 deletions(-)

diff --git a/mongoengine/queryset/base.py b/mongoengine/queryset/base.py
index c4a7e771..2c2f5f74 100644
--- a/mongoengine/queryset/base.py
+++ b/mongoengine/queryset/base.py
@@ -34,7 +34,6 @@ from mongoengine.queryset import transform
 from mongoengine.queryset.field_list import QueryFieldList
 from mongoengine.queryset.visitor import Q, QNode
 
-
 __all__ = ("BaseQuerySet", "DO_NOTHING", "NULLIFY", "CASCADE", "DENY", "PULL")
 
 # Delete rules
@@ -63,6 +62,7 @@ class BaseQuerySet:
         self._loaded_fields = QueryFieldList()
         self._ordering = None
         self._snapshot = False
+        self._allow_disk_use = False
         self._timeout = True
         self._read_preference = None
         self._read_concern = None
@@ -113,8 +113,8 @@ class BaseQuerySet:
         # Make sure proper query object is passed.
         if not isinstance(q_obj, QNode):
             msg = (
-                "Not a query object: %s. "
-                "Did you intend to use key=value?" % q_obj
+                    "Not a query object: %s. "
+                    "Did you intend to use key=value?" % q_obj
             )
             raise InvalidQueryError(msg)
         query &= q_obj
@@ -294,7 +294,7 @@ class BaseQuerySet:
         return result
 
     def insert(
-        self, doc_or_docs, load_bulk=True, write_concern=None, signal_kwargs=None
+            self, doc_or_docs, load_bulk=True, write_concern=None, signal_kwargs=None
     ):
         """bulk insert documents
 
@@ -397,10 +397,10 @@ class BaseQuerySet:
         # mimic the fact that setting .limit(0) in pymongo sets no limit
         # https://docs.mongodb.com/manual/reference/method/cursor.limit/#zero-value
         if (
-            self._limit == 0
-            and with_limit_and_skip is False
-            or self._none
-            or self._empty
+                self._limit == 0
+                and with_limit_and_skip is False
+                or self._none
+                or self._empty
         ):
             return 0
 
@@ -450,13 +450,13 @@ class BaseQuerySet:
         # Handle deletes where skips or limits have been applied or
         # there is an untriggered delete signal
         has_delete_signal = signals.signals_available and (
-            signals.pre_delete.has_receivers_for(doc)
-            or signals.post_delete.has_receivers_for(doc)
+                signals.pre_delete.has_receivers_for(doc)
+                or signals.post_delete.has_receivers_for(doc)
         )
 
         call_document_delete = (
-            queryset._skip or queryset._limit or has_delete_signal
-        ) and not _from_doc_delete
+                queryset._skip or queryset._limit or has_delete_signal
+            ) and not _from_doc_delete
 
         if call_document_delete:
             cnt = 0
@@ -519,13 +519,13 @@ class BaseQuerySet:
         return result.deleted_count
 
     def update(
-        self,
-        upsert=False,
-        multi=True,
-        write_concern=None,
-        read_concern=None,
-        full_result=False,
-        **update,
+            self,
+            upsert=False,
+            multi=True,
+            write_concern=None,
+            read_concern=None,
+            full_result=False,
+            **update,
     ):
         """Perform an atomic update on the fields matched by the query.
 
@@ -563,7 +563,7 @@ class BaseQuerySet:
             update["$set"] = {"_cls": queryset._document._class_name}
         try:
             with set_read_write_concern(
-                queryset._collection, write_concern, read_concern
+                    queryset._collection, write_concern, read_concern
             ) as collection:
                 update_func = collection.update_one
                 if multi:
@@ -637,7 +637,7 @@ class BaseQuerySet:
         )
 
     def modify(
-        self, upsert=False, full_response=False, remove=False, new=False, **update
+            self, upsert=False, full_response=False, remove=False, new=False, **update
     ):
         """Update and return the updated document.
 
@@ -798,6 +798,7 @@ class BaseQuerySet:
             "_loaded_fields",
             "_ordering",
             "_snapshot",
+            "_allow_disk_use",
             "_timeout",
             "_read_preference",
             "_read_concern",
@@ -967,7 +968,7 @@ class BaseQuerySet:
         for field_part in field.split(".")[1:]:
             # if looping on embedded document, get the document type instance
             if instance and isinstance(
-                doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField)
+                    doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField)
             ):
                 doc_field = instance
             # now get the subdocument
@@ -976,12 +977,12 @@ class BaseQuerySet:
         if isinstance(doc_field, ListField):
             doc_field = getattr(doc_field, "field", doc_field)
         if isinstance(
-            doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField)
+                doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField)
         ):
             instance = getattr(doc_field, "document_type", None)
 
         if instance and isinstance(
-            doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField)
+                doc_field, (EmbeddedDocumentField, GenericEmbeddedDocumentField)
         ):
             distinct = [instance(**doc) for doc in distinct]
 
@@ -1165,6 +1166,16 @@ class BaseQuerySet:
         queryset._snapshot = enabled
         return queryset
 
+    def allow_disk_use(self, enabled):
+        """Enable or disable the use of temporary files on disk while processing
+        a blocking sort operation (to store data exceeding the 100 megabyte memory limit).
+
+        :param enabled: whether or not temporary files on disk are used
+        """
+        queryset = self.clone()
+        queryset._allow_disk_use = enabled
+        return queryset
+
     def timeout(self, enabled):
         """Enable or disable the default mongod timeout when querying.
         (no_cursor_timeout option)
@@ -1306,7 +1317,7 @@ class BaseQuerySet:
 
     # JS functionality
     def map_reduce(
-        self, map_f, reduce_f, output, finalize_f=None, limit=None, scope=None
+            self, map_f, reduce_f, output, finalize_f=None, limit=None, scope=None
     ):
         """Perform a map/reduce query using the current query spec
         and ordering. While ``map_reduce`` respects ``QuerySet`` chaining,
@@ -1604,6 +1615,9 @@ class BaseQuerySet:
         if not self._timeout:
             cursor_args["no_cursor_timeout"] = True
 
+        if self._allow_disk_use:
+            cursor_args["allow_disk_use"] = True
+
         if self._loaded_fields:
             cursor_args[fields_name] = self._loaded_fields.as_dict()
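
Usage note for reviewers: a minimal sketch of how the new chainable method is meant to be used. The `Ticket` document, its fields, and the database name are hypothetical and purely for illustration; the underlying `allow_disk_use` find option requires PyMongo 3.11+ and MongoDB 4.4+.

```python
from mongoengine import Document, IntField, StringField, connect

connect("example_db")  # hypothetical database name


class Ticket(Document):  # hypothetical document, for illustration only
    title = StringField()
    priority = IntField()


# Like timeout() or snapshot(), allow_disk_use() clones the queryset,
# so the original Ticket.objects queryset is left unchanged.
qs = Ticket.objects.order_by("priority").allow_disk_use(True)

# When the cursor is created, _cursor_args now carries
# allow_disk_use=True, which PyMongo forwards to the server so a
# blocking sort may spill to temporary files on disk instead of
# failing once it exceeds the 100 MB in-memory sort limit.
for ticket in qs:
    print(ticket.title)
```

Because the flag is only added to `_cursor_args` when truthy, existing queries send the same find command as before, which keeps the default behavior backwards compatible with older servers.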