map/reduce result objects now only have 'key', 'value', and 'object' properties; MapReduceDocument.key_object now returns proper Document subclass; added finalize with Reddit ranking simulation; MapReduceDocuments now yielded;

This commit is contained in:
blackbrrr 2010-02-12 14:39:08 -06:00
parent 5c311eefb1
commit 9be6c41af7
3 changed files with 132 additions and 21 deletions

View File

@ -124,34 +124,31 @@ class MapReduceDocument(object):
:param key: Document/result key, often an instance of :param key: Document/result key, often an instance of
:class:`~pymongo.objectid.ObjectId`. If supplied as :class:`~pymongo.objectid.ObjectId`. If supplied as
an ``ObjectId`` found in the given ``collection``, an ``ObjectId`` found in the given ``collection``,
the object can be accessed via the ``key_object`` property. the object can be accessed via the ``object`` property.
:param value: The result(s) for this key. If given as a dictionary, :param value: The result(s) for this key.
each key in the dictionary will be available as
an instance attribute.
.. versionadded:: 0.2.2 .. versionadded:: 0.2.2
""" """
def __init__(self, collection, key, value): def __init__(self, document, collection, key, value):
self._document = document
self._collection = collection self._collection = collection
self.key = key self.key = key
self.value = value self.value = value
if isinstance(value, dict):
# create attributes for each named result
for k, v in value.iteritems():
setattr(self, k, v)
@property @property
def object(self): def object(self):
"""Lazy-load the object referenced by ``self.key``. If ``self.key`` """Lazy-load the object referenced by ``self.key``. If ``self.key``
is not an ``ObjectId``, simply return ``self.key``. is not an ``ObjectId``, simply return ``self.key``.
""" """
if not isinstance(self.key, pymongo.objectid.ObjectId): if not isinstance(self.key, (pymongo.objectid.ObjectId)):
return self.key try:
self.key = pymongo.objectid.ObjectId(self.key)
except:
return self.key
if not hasattr(self, "_key_object"): if not hasattr(self, "_key_object"):
self._key_object = self._collection.find_one(self.key) self._key_object = self._document.objects.with_id(self.key)
return self._key_object return self._key_object
return self._key_object return self._key_object

View File

@ -125,6 +125,7 @@ class QuerySet(object):
self._query = {} self._query = {}
self._where_clause = None self._where_clause = None
self._ordering = [] self._ordering = []
self._limit = None
# If inheritance is allowed, only return instances and instances of # If inheritance is allowed, only return instances and instances of
# subclasses of the class being used # subclasses of the class being used
@ -380,7 +381,8 @@ class QuerySet(object):
def __len__(self): def __len__(self):
return self.count() return self.count()
def map_reduce(self, map_f, reduce_f, scope=None, keep_temp=False): def map_reduce(self, map_f, reduce_f, finalize_f=None, limit=None,
scope=None, keep_temp=False):
"""Perform a map/reduce query using the current query spec """Perform a map/reduce query using the current query spec
and ordering. While ``map_reduce`` respects ``QuerySet`` chaining, and ordering. While ``map_reduce`` respects ``QuerySet`` chaining,
it must be the last call made, as it does not return a maleable it must be the last call made, as it does not return a maleable
@ -402,6 +404,8 @@ class QuerySet(object):
:param reduce_f: reduce function, as :param reduce_f: reduce function, as
:class:`~pymongo.code.Code` or string :class:`~pymongo.code.Code` or string
:param scope: values to insert into map/reduce global scope. Optional. :param scope: values to insert into map/reduce global scope. Optional.
:param limit: number of objects from current query to provide
to map/reduce method
:param keep_temp: keep temporary table (boolean, default ``True``) :param keep_temp: keep temporary table (boolean, default ``True``)
Returns a list of :class:`~mongoengine.document.MapReduceDocument`. Returns a list of :class:`~mongoengine.document.MapReduceDocument`.
@ -427,13 +431,16 @@ class QuerySet(object):
mr_args = {'query': self._query, 'keeptemp': keep_temp} mr_args = {'query': self._query, 'keeptemp': keep_temp}
if finalize_f:
if not isinstance(finalize_f, pymongo.code.Code):
finalize_f = pymongo.code.Code(finalize_f)
mr_args['finalize'] = finalize_f
if scope: if scope:
mr_args['scope'] = scope mr_args['scope'] = scope
if limit: if limit:
mr_args['limit'] = limit mr_args['limit'] = limit
docs = []
results = self._collection.map_reduce(map_f, reduce_f, **mr_args) results = self._collection.map_reduce(map_f, reduce_f, **mr_args)
results = results.find() results = results.find()
@ -441,10 +448,8 @@ class QuerySet(object):
results = results.sort(self._ordering) results = results.sort(self._ordering)
for doc in results: for doc in results:
mrd = MapReduceDocument(self._collection, doc['_id'], doc['value']) yield MapReduceDocument(self._document, self._collection,
docs.append(mrd) doc['_id'], doc['value'])
return docs
def limit(self, n): def limit(self, n):
"""Limit the number of returned documents to `n`. This may also be """Limit the number of returned documents to `n`. This may also be
@ -452,6 +457,7 @@ class QuerySet(object):
:param n: the maximum number of objects to return :param n: the maximum number of objects to return
""" """
self._limit = n
self._cursor.limit(n) self._cursor.limit(n)
# Return self to allow chaining # Return self to allow chaining
return self return self

View File

@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-
import unittest import unittest
import pymongo import pymongo
from datetime import datetime from datetime import datetime, timedelta
from mongoengine.queryset import (QuerySet, MultipleObjectsReturned, from mongoengine.queryset import (QuerySet, MultipleObjectsReturned,
DoesNotExist) DoesNotExist)
@ -489,16 +492,121 @@ class QuerySetTest(unittest.TestCase):
# ensure both artists are found # ensure both artists are found
results = Song.objects.map_reduce(map_f, reduce_f) results = Song.objects.map_reduce(map_f, reduce_f)
results = list(results)
self.assertEqual(len(results), 2) self.assertEqual(len(results), 2)
# query for a count of Songs per artist, ordered by -count. # query for a count of Songs per artist, ordered by -count.
# Patti Smith has 3 song credits, and should therefore be first. # Patti Smith has 3 song credits, and should therefore be first.
results = Song.objects.order_by("-value").map_reduce(map_f, reduce_f) results = Song.objects.order_by("-value").map_reduce(map_f, reduce_f)
results = list(results)
self.assertEqual(results[0].key, "Patti Smith") self.assertEqual(results[0].key, "Patti Smith")
self.assertEqual(results[0].value, 3.0) self.assertEqual(results[0].value, 3.0)
Song.drop_collection() Song.drop_collection()
def test_map_reduce_finalize(self):
"""Ensure scope and finalize are working correctly by simulating
"hotness" ranking with Reddit algorithm.
"""
from time import mktime
class Link(Document):
title = StringField()
up_votes = IntField()
down_votes = IntField()
submitted = DateTimeField()
Link.drop_collection()
now = datetime.utcnow()
# Note: Test data taken from a custom Reddit homepage on
# Fri, 12 Feb 2010 14:36:00 -0600.
Link(title = "Google Buzz auto-followed a woman's abusive ex ...",
up_votes = 1079,
down_votes = 553,
submitted = now-timedelta(hours=4)).save()
Link(title = "We did it! Barbie is a computer engineer.",
up_votes = 481,
down_votes = 124,
submitted = now-timedelta(hours=2)).save()
Link(title = "This Is A Mosquito Getting Killed By A Laser",
up_votes = 1446,
down_votes = 530,
submitted=now-timedelta(hours=13)).save()
Link(title = "Arabic flashcards land physics student in jail.",
up_votes = 215,
down_votes = 105,
submitted = now-timedelta(hours=6)).save()
Link(title = "The Burger Lab: Presenting, the Flood Burger",
up_votes = 48,
down_votes = 17,
submitted = now-timedelta(hours=5)).save()
Link(title="How to see polarization with the naked eye",
up_votes = 74,
down_votes = 13,
submitted = now-timedelta(hours=10)).save()
map_f = """
function() {
emit(this._id, {up_delta: this.up_votes - this.down_votes,
reddit_epoch: new Date(2005, 12, 8, 7, 46, 43, 0).getTime(),
sub_date: this.submitted.getTime()})
}
"""
reduce_f = """
function(key, values) {
data = values[0];
x = data.up_delta;
// calculate time diff between reddit epoch and submission
sec_since_epoch = data.sub_date - data.reddit_epoch;
sec_since_epoch /= 1000;
// calculate 'Y'
if(x > 0) {
y = 1;
} else if (x = 0) {
y = 0;
} else {
y = -1;
}
// calculate 'Z', the maximal value
if(Math.abs(x) >= 1) {
z = Math.abs(x);
} else {
z = 1;
}
return {x: x, y: y, z: z, t_s: sec_since_epoch};
}
"""
finalize_f = """
function(key, value) {
// f(sec_since_epoch,y,z) = log10(z) + ((y*sec_since_epoch) / 45000)
z_10 = Math.log(value.z) / Math.log(10);
weight = z_10 + ((value.y * value.t_s) / 45000);
return weight;
}
"""
# ensure both artists are found
results = Link.objects.order_by("-value")
results = results.map_reduce(map_f, reduce_f, finalize_f=finalize_f)
results = list(results)
self.assertTrue(results[0].object.title.startswith("Google Buzz"))
self.assertTrue(results[-1].object.title.startswith("How to see"))
Link.drop_collection()
def test_item_frequencies(self): def test_item_frequencies(self):
"""Ensure that item frequencies are properly generated from lists. """Ensure that item frequencies are properly generated from lists.
""" """