From c903af032fd65614585e9b9a377abf14f046fdfc Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Thu, 26 May 2011 15:44:43 +0100 Subject: [PATCH] Added inline_map_reduce functionality Also added map_reduce method for calculating item_frequencies Closes #183 --- docs/changelog.rst | 2 ++ mongoengine/queryset.py | 63 ++++++++++++++++++++++++++++++++++++--- setup.py | 2 +- tests/queryset.py | 65 ++++++++++++++++++++++++++--------------- 4 files changed, 104 insertions(+), 28 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 29d03bc2..686b326f 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -5,6 +5,8 @@ Changelog Changes in dev ============== +- Added optional map_reduce method for calculating item_frequencies +- Added inline_map_reduce option to map_reduce - Updated connection exception so it provides more info on the cause. - Added searching multiple levels deep in ``DictField`` - Added ``DictField`` entries containing strings to use matching operators diff --git a/mongoengine/queryset.py b/mongoengine/queryset.py index f5020ab8..17a1b0da 100644 --- a/mongoengine/queryset.py +++ b/mongoengine/queryset.py @@ -774,7 +774,8 @@ class QuerySet(object): :param map_f: map function, as :class:`~pymongo.code.Code` or string :param reduce_f: reduce function, as :class:`~pymongo.code.Code` or string - :param output: output collection name + :param output: output collection name, if set to 'inline' will try to + use :class:`~pymongo.collection.Collection.inline_map_reduce` :param finalize_f: finalize function, an optional function that performs any post-reduction processing. :param scope: values to insert into map/reduce global scope. Optional. 
@@ -824,8 +825,17 @@ class QuerySet(object): if limit: mr_args['limit'] = limit - results = self._collection.map_reduce(map_f, reduce_f, output, **mr_args) - results = results.find() + + if output == 'inline' or (not keep_temp and not self._ordering): + map_reduce_function = 'inline_map_reduce' + else: + map_reduce_function = 'map_reduce' + mr_args['out'] = output + + results = getattr(self._collection, map_reduce_function)(map_f, reduce_f, **mr_args) + + if map_reduce_function == 'map_reduce': + results = results.find() if self._ordering: results = results.sort(self._ordering) @@ -1266,7 +1276,7 @@ class QuerySet(object): """ return self.exec_js(average_func, field) - def item_frequencies(self, field, normalize=False): + def item_frequencies(self, field, normalize=False, map_reduce=False): """Returns a dictionary of all items present in a field across the whole queried set of documents, and their corresponding frequency. This is useful for generating tag clouds, or searching documents. 
@@ -1276,7 +1286,52 @@ class QuerySet(object): :param field: the field to use :param normalize: normalize the results so they add to 1.0 + :param map_reduce: Use map_reduce over exec_js """ + if map_reduce: + return self._item_frequencies_map_reduce(field, normalize=normalize) + return self._item_frequencies_exec_js(field, normalize=normalize) + + def _item_frequencies_map_reduce(self, field, normalize=False): + map_func = """ + function() { + if (this[~%(field)s].constructor == Array) { + this[~%(field)s].forEach(function(item) { + emit(item, 1); + }); + } else { + emit(this[~%(field)s], 1); + } + } + """ % dict(field=field) + reduce_func = """ + function(key, values) { + var total = 0; + var valuesSize = values.length; + for (var i=0; i < valuesSize; i++) { + total += parseInt(values[i], 10); + } + return total; + } + """ + values = self.map_reduce(map_func, reduce_func, 'inline', keep_temp=False) + frequencies = {} + for f in values: + key = f.key + if isinstance(key, float): + if int(key) == key: + key = int(key) + key = str(key) + frequencies[key] = f.value + + if normalize: + count = sum(frequencies.values()) + frequencies = dict([(k, v/count) for k,v in frequencies.items()]) + + return frequencies + + def _item_frequencies_exec_js(self, field, normalize=False): + """Uses exec_js to execute""" freq_func = """ function(field) { if (options.normalize) { diff --git a/setup.py b/setup.py index e0585b7c..0c19d8d0 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ def get_version(version_tuple): version = '%s.%s' % (version, version_tuple[2]) return version -# Dirty hack to get version number from monogengine/__init__.py - we can't +# Dirty hack to get version number from monogengine/__init__.py - we can't # import it as it depends on PyMongo and PyMongo isn't installed until this # file is read init = os.path.join(os.path.dirname(__file__), 'mongoengine', '__init__.py') diff --git a/tests/queryset.py b/tests/queryset.py index d5611fdf..1f03fbd9 100644 --- 
a/tests/queryset.py +++ b/tests/queryset.py @@ -1466,35 +1466,54 @@ class QuerySetTest(unittest.TestCase): BlogPost(hits=2, tags=['music', 'watch']).save() BlogPost(hits=2, tags=['music', 'actors']).save() - f = BlogPost.objects.item_frequencies('tags') - f = dict((key, int(val)) for key, val in f.items()) - self.assertEqual(set(['music', 'film', 'actors', 'watch']), set(f.keys())) - self.assertEqual(f['music'], 3) - self.assertEqual(f['actors'], 2) - self.assertEqual(f['watch'], 2) - self.assertEqual(f['film'], 1) + def test_assertions(f): + f = dict((key, int(val)) for key, val in f.items()) + self.assertEqual(set(['music', 'film', 'actors', 'watch']), set(f.keys())) + self.assertEqual(f['music'], 3) + self.assertEqual(f['actors'], 2) + self.assertEqual(f['watch'], 2) + self.assertEqual(f['film'], 1) + + exec_js = BlogPost.objects.item_frequencies('tags') + map_reduce = BlogPost.objects.item_frequencies('tags', map_reduce=True) + test_assertions(exec_js) + test_assertions(map_reduce) # Ensure query is taken into account - f = BlogPost.objects(hits__gt=1).item_frequencies('tags') - f = dict((key, int(val)) for key, val in f.items()) - self.assertEqual(set(['music', 'actors', 'watch']), set(f.keys())) - self.assertEqual(f['music'], 2) - self.assertEqual(f['actors'], 1) - self.assertEqual(f['watch'], 1) + def test_assertions(f): + f = dict((key, int(val)) for key, val in f.items()) + self.assertEqual(set(['music', 'actors', 'watch']), set(f.keys())) + self.assertEqual(f['music'], 2) + self.assertEqual(f['actors'], 1) + self.assertEqual(f['watch'], 1) + + exec_js = BlogPost.objects(hits__gt=1).item_frequencies('tags') + map_reduce = BlogPost.objects(hits__gt=1).item_frequencies('tags', map_reduce=True) + test_assertions(exec_js) + test_assertions(map_reduce) # Check that normalization works - f = BlogPost.objects.item_frequencies('tags', normalize=True) - self.assertAlmostEqual(f['music'], 3.0/8.0) - self.assertAlmostEqual(f['actors'], 2.0/8.0) - 
self.assertAlmostEqual(f['watch'], 2.0/8.0) - self.assertAlmostEqual(f['film'], 1.0/8.0) + def test_assertions(f): + self.assertAlmostEqual(f['music'], 3.0/8.0) + self.assertAlmostEqual(f['actors'], 2.0/8.0) + self.assertAlmostEqual(f['watch'], 2.0/8.0) + self.assertAlmostEqual(f['film'], 1.0/8.0) + + exec_js = BlogPost.objects.item_frequencies('tags', normalize=True) + map_reduce = BlogPost.objects.item_frequencies('tags', normalize=True, map_reduce=True) + test_assertions(exec_js) + test_assertions(map_reduce) # Check item_frequencies works for non-list fields - f = BlogPost.objects.item_frequencies('hits') - f = dict((key, int(val)) for key, val in f.items()) - self.assertEqual(set(['1', '2']), set(f.keys())) - self.assertEqual(f['1'], 1) - self.assertEqual(f['2'], 2) + def test_assertions(f): + self.assertEqual(set(['1', '2']), set(f.keys())) + self.assertEqual(f['1'], 1) + self.assertEqual(f['2'], 2) + + exec_js = BlogPost.objects.item_frequencies('hits') + map_reduce = BlogPost.objects.item_frequencies('hits', map_reduce=True) + test_assertions(exec_js) + test_assertions(map_reduce) BlogPost.drop_collection()