Compare commits
3 Commits
fix-iterat
...
batch-size
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
34ba527e6d | ||
|
|
ea9027755f | ||
|
|
43668a93a2 |
@@ -82,6 +82,7 @@ class BaseQuerySet(object):
|
|||||||
self._limit = None
|
self._limit = None
|
||||||
self._skip = None
|
self._skip = None
|
||||||
self._hint = -1 # Using -1 as None is a valid value for hint
|
self._hint = -1 # Using -1 as None is a valid value for hint
|
||||||
|
self._batch_size = None
|
||||||
self.only_fields = []
|
self.only_fields = []
|
||||||
self._max_time_ms = None
|
self._max_time_ms = None
|
||||||
|
|
||||||
@@ -275,8 +276,6 @@ class BaseQuerySet(object):
|
|||||||
except StopIteration:
|
except StopIteration:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
# If we were able to retrieve the 2nd doc, rewind the cursor and
|
|
||||||
# raise the MultipleObjectsReturned exception.
|
|
||||||
queryset.rewind()
|
queryset.rewind()
|
||||||
message = u'%d items returned, instead of 1' % queryset.count()
|
message = u'%d items returned, instead of 1' % queryset.count()
|
||||||
raise queryset._document.MultipleObjectsReturned(message)
|
raise queryset._document.MultipleObjectsReturned(message)
|
||||||
@@ -783,6 +782,19 @@ class BaseQuerySet(object):
|
|||||||
queryset._hint = index
|
queryset._hint = index
|
||||||
return queryset
|
return queryset
|
||||||
|
|
||||||
|
def batch_size(self, size):
|
||||||
|
"""Limit the number of documents returned in a single batch (each
|
||||||
|
batch requires a round trip to the server).
|
||||||
|
|
||||||
|
See http://api.mongodb.com/python/current/api/pymongo/cursor.html#pymongo.cursor.Cursor.batch_size
|
||||||
|
for details.
|
||||||
|
|
||||||
|
:param size: desired size of each batch.
|
||||||
|
"""
|
||||||
|
queryset = self.clone()
|
||||||
|
queryset._batch_size = size
|
||||||
|
return queryset
|
||||||
|
|
||||||
def distinct(self, field):
|
def distinct(self, field):
|
||||||
"""Return a list of distinct values for a given field.
|
"""Return a list of distinct values for a given field.
|
||||||
|
|
||||||
@@ -1469,6 +1481,9 @@ class BaseQuerySet(object):
|
|||||||
if self._hint != -1:
|
if self._hint != -1:
|
||||||
self._cursor_obj.hint(self._hint)
|
self._cursor_obj.hint(self._hint)
|
||||||
|
|
||||||
|
if self._batch_size is not None:
|
||||||
|
self._cursor_obj.batch_size(self._batch_size)
|
||||||
|
|
||||||
return self._cursor_obj
|
return self._cursor_obj
|
||||||
|
|
||||||
def __deepcopy__(self, memo):
|
def __deepcopy__(self, memo):
|
||||||
|
|||||||
@@ -27,10 +27,9 @@ class QuerySet(BaseQuerySet):
|
|||||||
in batches of ``ITER_CHUNK_SIZE``.
|
in batches of ``ITER_CHUNK_SIZE``.
|
||||||
|
|
||||||
If ``self._has_more`` the cursor hasn't been exhausted so cache the
|
If ``self._has_more`` the cursor hasn't been exhausted so cache the
|
||||||
batch. Otherwise iterate the result_cache.
|
batch. Otherwise iterate the result_cache.
|
||||||
"""
|
"""
|
||||||
self._iter = True
|
self._iter = True
|
||||||
|
|
||||||
if self._has_more:
|
if self._has_more:
|
||||||
return self._iter_results()
|
return self._iter_results()
|
||||||
|
|
||||||
@@ -43,12 +42,10 @@ class QuerySet(BaseQuerySet):
|
|||||||
"""
|
"""
|
||||||
if self._len is not None:
|
if self._len is not None:
|
||||||
return self._len
|
return self._len
|
||||||
|
|
||||||
# Populate the result cache with *all* of the docs in the cursor
|
|
||||||
if self._has_more:
|
if self._has_more:
|
||||||
|
# populate the cache
|
||||||
list(self._iter_results())
|
list(self._iter_results())
|
||||||
|
|
||||||
# Cache the length of the complete result cache and return it
|
|
||||||
self._len = len(self._result_cache)
|
self._len = len(self._result_cache)
|
||||||
return self._len
|
return self._len
|
||||||
|
|
||||||
@@ -67,33 +64,18 @@ class QuerySet(BaseQuerySet):
|
|||||||
def _iter_results(self):
|
def _iter_results(self):
|
||||||
"""A generator for iterating over the result cache.
|
"""A generator for iterating over the result cache.
|
||||||
|
|
||||||
Also populates the cache if there are more possible results to
|
Also populates the cache if there are more possible results to yield.
|
||||||
yield. Raises StopIteration when there are no more results.
|
Raises StopIteration when there are no more results"""
|
||||||
"""
|
|
||||||
if self._result_cache is None:
|
if self._result_cache is None:
|
||||||
self._result_cache = []
|
self._result_cache = []
|
||||||
|
|
||||||
pos = 0
|
pos = 0
|
||||||
while True:
|
while True:
|
||||||
|
upper = len(self._result_cache)
|
||||||
# For all positions lower than the length of the current result
|
while pos < upper:
|
||||||
# cache, serve the docs straight from the cache w/o hitting the
|
|
||||||
# database.
|
|
||||||
# XXX it's VERY important to compute the len within the `while`
|
|
||||||
# condition because the result cache might expand mid-iteration
|
|
||||||
# (e.g. if we call len(qs) inside a loop that iterates over the
|
|
||||||
# queryset). Fortunately len(list) is O(1) in Python, so this
|
|
||||||
# doesn't cause performance issues.
|
|
||||||
while pos < len(self._result_cache):
|
|
||||||
yield self._result_cache[pos]
|
yield self._result_cache[pos]
|
||||||
pos += 1
|
pos += 1
|
||||||
|
|
||||||
# Raise StopIteration if we already established there were no more
|
|
||||||
# docs in the db cursor.
|
|
||||||
if not self._has_more:
|
if not self._has_more:
|
||||||
raise StopIteration
|
raise StopIteration
|
||||||
|
|
||||||
# Otherwise, populate more of the cache and repeat.
|
|
||||||
if len(self._result_cache) <= pos:
|
if len(self._result_cache) <= pos:
|
||||||
self._populate_cache()
|
self._populate_cache()
|
||||||
|
|
||||||
@@ -104,22 +86,12 @@ class QuerySet(BaseQuerySet):
|
|||||||
"""
|
"""
|
||||||
if self._result_cache is None:
|
if self._result_cache is None:
|
||||||
self._result_cache = []
|
self._result_cache = []
|
||||||
|
if self._has_more:
|
||||||
# Skip populating the cache if we already established there are no
|
try:
|
||||||
# more docs to pull from the database.
|
for i in xrange(ITER_CHUNK_SIZE):
|
||||||
if not self._has_more:
|
self._result_cache.append(self.next())
|
||||||
return
|
except StopIteration:
|
||||||
|
self._has_more = False
|
||||||
# Pull in ITER_CHUNK_SIZE docs from the database and store them in
|
|
||||||
# the result cache.
|
|
||||||
try:
|
|
||||||
for i in xrange(ITER_CHUNK_SIZE):
|
|
||||||
self._result_cache.append(self.next())
|
|
||||||
except StopIteration:
|
|
||||||
# Getting this exception means there are no more docs in the
|
|
||||||
# db cursor. Set _has_more to False so that we can use that
|
|
||||||
# information in other places.
|
|
||||||
self._has_more = False
|
|
||||||
|
|
||||||
def count(self, with_limit_and_skip=False):
|
def count(self, with_limit_and_skip=False):
|
||||||
"""Count the selected elements in the query.
|
"""Count the selected elements in the query.
|
||||||
|
|||||||
@@ -337,6 +337,34 @@ class QuerySetTest(unittest.TestCase):
|
|||||||
query = query.filter(boolfield=True)
|
query = query.filter(boolfield=True)
|
||||||
self.assertEqual(query.count(), 1)
|
self.assertEqual(query.count(), 1)
|
||||||
|
|
||||||
|
def test_batch_size(self):
|
||||||
|
"""Ensure that batch_size works."""
|
||||||
|
class A(Document):
|
||||||
|
s = StringField()
|
||||||
|
|
||||||
|
A.drop_collection()
|
||||||
|
|
||||||
|
for i in range(100):
|
||||||
|
A.objects.create(s=str(i))
|
||||||
|
|
||||||
|
# test iterating over the result set
|
||||||
|
cnt = 0
|
||||||
|
for a in A.objects.batch_size(10):
|
||||||
|
cnt += 1
|
||||||
|
self.assertEqual(cnt, 100)
|
||||||
|
|
||||||
|
# test chaining
|
||||||
|
qs = A.objects.all()
|
||||||
|
qs = qs.limit(10).batch_size(20).skip(91)
|
||||||
|
cnt = 0
|
||||||
|
for a in qs:
|
||||||
|
cnt += 1
|
||||||
|
self.assertEqual(cnt, 9)
|
||||||
|
|
||||||
|
# test invalid batch size
|
||||||
|
qs = A.objects.batch_size(-1)
|
||||||
|
self.assertRaises(ValueError, lambda: list(qs))
|
||||||
|
|
||||||
def test_update_write_concern(self):
|
def test_update_write_concern(self):
|
||||||
"""Test that passing write_concern works"""
|
"""Test that passing write_concern works"""
|
||||||
self.Person.drop_collection()
|
self.Person.drop_collection()
|
||||||
@@ -4890,56 +4918,6 @@ class QuerySetTest(unittest.TestCase):
|
|||||||
|
|
||||||
self.assertEqual(1, Doc.objects(item__type__="axe").count())
|
self.assertEqual(1, Doc.objects(item__type__="axe").count())
|
||||||
|
|
||||||
def test_len_during_iteration(self):
|
|
||||||
"""Tests that calling len on a queryset during iteration doesn't
|
|
||||||
stop paging.
|
|
||||||
"""
|
|
||||||
class Data(Document):
|
|
||||||
pass
|
|
||||||
|
|
||||||
for i in xrange(300):
|
|
||||||
Data().save()
|
|
||||||
|
|
||||||
records = Data.objects.limit(250)
|
|
||||||
|
|
||||||
# This should pull all 250 docs from mongo and populate the result
|
|
||||||
# cache
|
|
||||||
len(records)
|
|
||||||
|
|
||||||
# Assert that iterating over documents in the qs touches every
|
|
||||||
# document even if we call len(qs) midway through the iteration.
|
|
||||||
for i, r in enumerate(records):
|
|
||||||
if i == 58:
|
|
||||||
len(records)
|
|
||||||
self.assertEqual(i, 249)
|
|
||||||
|
|
||||||
# Assert the same behavior is true even if we didn't pre-populate the
|
|
||||||
# result cache.
|
|
||||||
records = Data.objects.limit(250)
|
|
||||||
for i, r in enumerate(records):
|
|
||||||
if i == 58:
|
|
||||||
len(records)
|
|
||||||
self.assertEqual(i, 249)
|
|
||||||
|
|
||||||
def test_iteration_within_iteration(self):
|
|
||||||
"""You should be able to reliably iterate over all the documents
|
|
||||||
in a given queryset even if there are multiple iterations of it
|
|
||||||
happening at the same time.
|
|
||||||
"""
|
|
||||||
class Data(Document):
|
|
||||||
pass
|
|
||||||
|
|
||||||
for i in xrange(300):
|
|
||||||
Data().save()
|
|
||||||
|
|
||||||
qs = Data.objects.limit(250)
|
|
||||||
for i, doc in enumerate(qs):
|
|
||||||
for j, doc2 in enumerate(qs):
|
|
||||||
pass
|
|
||||||
|
|
||||||
self.assertEqual(i, 249)
|
|
||||||
self.assertEqual(j, 249)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
Reference in New Issue
Block a user