Added inline_map_reduce functionality

Also added map_reduce method for calculating item_frequencies
Closes #183
This commit is contained in:
Ross Lawley 2011-05-26 15:44:43 +01:00
parent 9dd3504765
commit c903af032f
4 changed files with 104 additions and 28 deletions

View File

@ -5,6 +5,8 @@ Changelog
Changes in dev Changes in dev
============== ==============
- Added optional map_reduce method item_frequencies
- Added inline_map_reduce option to map_reduce
- Updated connection exception so it provides more info on the cause. - Updated connection exception so it provides more info on the cause.
- Added searching multiple levels deep in ``DictField`` - Added searching multiple levels deep in ``DictField``
- Added ``DictField`` entries containing strings to use matching operators - Added ``DictField`` entries containing strings to use matching operators

View File

@ -774,7 +774,8 @@ class QuerySet(object):
:param map_f: map function, as :class:`~pymongo.code.Code` or string :param map_f: map function, as :class:`~pymongo.code.Code` or string
:param reduce_f: reduce function, as :param reduce_f: reduce function, as
:class:`~pymongo.code.Code` or string :class:`~pymongo.code.Code` or string
:param output: output collection name :param output: output collection name, if set to 'inline' will try to
use :class:`~pymongo.collection.Collection.inline_map_reduce`
:param finalize_f: finalize function, an optional function that :param finalize_f: finalize function, an optional function that
performs any post-reduction processing. performs any post-reduction processing.
:param scope: values to insert into map/reduce global scope. Optional. :param scope: values to insert into map/reduce global scope. Optional.
@ -824,8 +825,17 @@ class QuerySet(object):
if limit: if limit:
mr_args['limit'] = limit mr_args['limit'] = limit
results = self._collection.map_reduce(map_f, reduce_f, output, **mr_args)
results = results.find() if output == 'inline' or (not keep_temp and not self._ordering):
map_reduce_function = 'inline_map_reduce'
else:
map_reduce_function = 'map_reduce'
mr_args['out'] = output
results = getattr(self._collection, map_reduce_function)(map_f, reduce_f, **mr_args)
if map_reduce_function == 'map_reduce':
results = results.find()
if self._ordering: if self._ordering:
results = results.sort(self._ordering) results = results.sort(self._ordering)
@ -1266,7 +1276,7 @@ class QuerySet(object):
""" """
return self.exec_js(average_func, field) return self.exec_js(average_func, field)
def item_frequencies(self, field, normalize=False): def item_frequencies(self, field, normalize=False, map_reduce=False):
"""Returns a dictionary of all items present in a field across """Returns a dictionary of all items present in a field across
the whole queried set of documents, and their corresponding frequency. the whole queried set of documents, and their corresponding frequency.
This is useful for generating tag clouds, or searching documents. This is useful for generating tag clouds, or searching documents.
@ -1276,7 +1286,52 @@ class QuerySet(object):
:param field: the field to use :param field: the field to use
:param normalize: normalize the results so they add to 1.0 :param normalize: normalize the results so they add to 1.0
:param map_reduce: Use map_reduce over exec_js
""" """
if map_reduce:
return self._item_frequencies_map_reduce(field, normalize=normalize)
return self._item_frequencies_exec_js(field, normalize=normalize)
def _item_frequencies_map_reduce(self, field, normalize=False):
map_func = """
function() {
if (this[~%(field)s].constructor == Array) {
this[~%(field)s].forEach(function(item) {
emit(item, 1);
});
} else {
emit(this[~%(field)s], 1);
}
}
""" % dict(field=field)
reduce_func = """
function(key, values) {
var total = 0;
var valuesSize = values.length;
for (var i=0; i < valuesSize; i++) {
total += parseInt(values[i], 10);
}
return total;
}
"""
values = self.map_reduce(map_func, reduce_func, 'inline', keep_temp=False)
frequencies = {}
for f in values:
key = f.key
if isinstance(key, float):
if int(key) == key:
key = int(key)
key = str(key)
frequencies[key] = f.value
if normalize:
count = sum(frequencies.values())
frequencies = dict([(k, v/count) for k,v in frequencies.items()])
return frequencies
def _item_frequencies_exec_js(self, field, normalize=False):
"""Uses exec_js to execute"""
freq_func = """ freq_func = """
function(field) { function(field) {
if (options.normalize) { if (options.normalize) {

View File

@ -15,7 +15,7 @@ def get_version(version_tuple):
version = '%s.%s' % (version, version_tuple[2]) version = '%s.%s' % (version, version_tuple[2])
return version return version
# Dirty hack to get version number from monogengine/__init__.py - we can't # Dirty hack to get version number from monogengine/__init__.py - we can't
# import it as it depends on PyMongo and PyMongo isn't installed until this # import it as it depends on PyMongo and PyMongo isn't installed until this
# file is read # file is read
init = os.path.join(os.path.dirname(__file__), 'mongoengine', '__init__.py') init = os.path.join(os.path.dirname(__file__), 'mongoengine', '__init__.py')

View File

@ -1466,35 +1466,54 @@ class QuerySetTest(unittest.TestCase):
BlogPost(hits=2, tags=['music', 'watch']).save() BlogPost(hits=2, tags=['music', 'watch']).save()
BlogPost(hits=2, tags=['music', 'actors']).save() BlogPost(hits=2, tags=['music', 'actors']).save()
f = BlogPost.objects.item_frequencies('tags') def test_assertions(f):
f = dict((key, int(val)) for key, val in f.items()) f = dict((key, int(val)) for key, val in f.items())
self.assertEqual(set(['music', 'film', 'actors', 'watch']), set(f.keys())) self.assertEqual(set(['music', 'film', 'actors', 'watch']), set(f.keys()))
self.assertEqual(f['music'], 3) self.assertEqual(f['music'], 3)
self.assertEqual(f['actors'], 2) self.assertEqual(f['actors'], 2)
self.assertEqual(f['watch'], 2) self.assertEqual(f['watch'], 2)
self.assertEqual(f['film'], 1) self.assertEqual(f['film'], 1)
exec_js = BlogPost.objects.item_frequencies('tags')
map_reduce = BlogPost.objects.item_frequencies('tags', map_reduce=True)
test_assertions(exec_js)
test_assertions(map_reduce)
# Ensure query is taken into account # Ensure query is taken into account
f = BlogPost.objects(hits__gt=1).item_frequencies('tags') def test_assertions(f):
f = dict((key, int(val)) for key, val in f.items()) f = dict((key, int(val)) for key, val in f.items())
self.assertEqual(set(['music', 'actors', 'watch']), set(f.keys())) self.assertEqual(set(['music', 'actors', 'watch']), set(f.keys()))
self.assertEqual(f['music'], 2) self.assertEqual(f['music'], 2)
self.assertEqual(f['actors'], 1) self.assertEqual(f['actors'], 1)
self.assertEqual(f['watch'], 1) self.assertEqual(f['watch'], 1)
exec_js = BlogPost.objects(hits__gt=1).item_frequencies('tags')
map_reduce = BlogPost.objects(hits__gt=1).item_frequencies('tags', map_reduce=True)
test_assertions(exec_js)
test_assertions(map_reduce)
# Check that normalization works # Check that normalization works
f = BlogPost.objects.item_frequencies('tags', normalize=True) def test_assertions(f):
self.assertAlmostEqual(f['music'], 3.0/8.0) self.assertAlmostEqual(f['music'], 3.0/8.0)
self.assertAlmostEqual(f['actors'], 2.0/8.0) self.assertAlmostEqual(f['actors'], 2.0/8.0)
self.assertAlmostEqual(f['watch'], 2.0/8.0) self.assertAlmostEqual(f['watch'], 2.0/8.0)
self.assertAlmostEqual(f['film'], 1.0/8.0) self.assertAlmostEqual(f['film'], 1.0/8.0)
exec_js = BlogPost.objects.item_frequencies('tags', normalize=True)
map_reduce = BlogPost.objects.item_frequencies('tags', normalize=True, map_reduce=True)
test_assertions(exec_js)
test_assertions(map_reduce)
# Check item_frequencies works for non-list fields # Check item_frequencies works for non-list fields
f = BlogPost.objects.item_frequencies('hits') def test_assertions(f):
f = dict((key, int(val)) for key, val in f.items()) self.assertEqual(set(['1', '2']), set(f.keys()))
self.assertEqual(set(['1', '2']), set(f.keys())) self.assertEqual(f['1'], 1)
self.assertEqual(f['1'], 1) self.assertEqual(f['2'], 2)
self.assertEqual(f['2'], 2)
exec_js = BlogPost.objects.item_frequencies('hits')
map_reduce = BlogPost.objects.item_frequencies('hits', map_reduce=True)
test_assertions(exec_js)
test_assertions(map_reduce)
BlogPost.drop_collection() BlogPost.drop_collection()