Merge branch 'mapreduce' of git://github.com/blackbrrr/mongoengine

Conflicts:
	mongoengine/queryset.py
This commit is contained in:
Harry Marr 2010-03-17 12:31:08 +00:00
commit 047cc218a6
5 changed files with 346 additions and 78 deletions

View File

@ -21,6 +21,9 @@ Documents
.. autoclass:: mongoengine.EmbeddedDocument
:members:
.. autoclass:: mongoengine.MapReduceDocument
:members:
Querying
========

View File

@ -22,7 +22,7 @@ sys.path.append(os.path.abspath('..'))
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc']
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.todo']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']

View File

@ -115,3 +115,39 @@ class Document(BaseDocument):
"""
db = _get_db()
db.drop_collection(cls._meta['collection'])
class MapReduceDocument(object):
"""A document returned from a map/reduce query.
:param collection: An instance of :class:`~pymongo.Collection`
:param key: Document/result key, often an instance of
:class:`~pymongo.objectid.ObjectId`. If supplied as
an ``ObjectId`` found in the given ``collection``,
the object can be accessed via the ``object`` property.
:param value: The result(s) for this key.
.. versionadded:: 0.3
"""
def __init__(self, document, collection, key, value):
self._document = document
self._collection = collection
self.key = key
self.value = value
@property
def object(self):
"""Lazy-load the object referenced by ``self.key``. If ``self.key``
is not an ``ObjectId``, simply return ``self.key``.
"""
if not isinstance(self.key, (pymongo.objectid.ObjectId)):
try:
self.key = pymongo.objectid.ObjectId(self.key)
except:
return self.key
if not hasattr(self, "_key_object"):
self._key_object = self._document.objects.with_id(self.key)
return self._key_object
return self._key_object

View File

@ -30,6 +30,7 @@ class OperationError(Exception):
RE_TYPE = type(re.compile(''))
class Q(object):
OR = '||'
@ -143,6 +144,7 @@ class QuerySet(object):
self._query = {}
self._where_clause = None
self._loaded_fields = []
self._ordering = []
# If inheritance is allowed, only return instances and instances of
# subclasses of the class being used
@ -360,7 +362,7 @@ class QuerySet(object):
may be provided as a keyword argument called :attr:`defaults`.
"""
defaults = query.get('defaults', {})
if query.has_key('defaults'):
if 'defaults' in query:
del query['defaults']
self.__call__(*q_objs, **query)
@ -439,6 +441,70 @@ class QuerySet(object):
def __len__(self):
return self.count()
def map_reduce(self, map_f, reduce_f, finalize_f=None, limit=None,
scope=None, keep_temp=False):
"""Perform a map/reduce query using the current query spec
and ordering. While ``map_reduce`` respects ``QuerySet`` chaining,
it must be the last call made, as it does not return a maleable
``QuerySet``.
See the :meth:`~mongoengine.tests.QuerySetTest.test_map_reduce`
and :meth:`~mongoengine.tests.QuerySetTest.test_map_advanced`
tests in ``tests.queryset.QuerySetTest`` for usage examples.
:param map_f: map function, as :class:`~pymongo.code.Code` or string
:param reduce_f: reduce function, as
:class:`~pymongo.code.Code` or string
:param finalize_f: finalize function, an optional function that
performs any post-reduction processing.
:param scope: values to insert into map/reduce global scope. Optional.
:param limit: number of objects from current query to provide
to map/reduce method
:param keep_temp: keep temporary table (boolean, default ``True``)
Returns an iterator yielding
:class:`~mongoengine.document.MapReduceDocument`.
.. note:: Map/Reduce requires server version **>= 1.1.1**. The PyMongo
:meth:`~pymongo.collection.Collection.map_reduce` helper requires
PyMongo version **>= 1.2**.
.. versionadded:: 0.3
"""
from document import MapReduceDocument
if not hasattr(self._collection, "map_reduce"):
raise NotImplementedError("Requires MongoDB >= 1.1.1")
if not isinstance(map_f, pymongo.code.Code):
map_f = pymongo.code.Code(map_f)
if not isinstance(reduce_f, pymongo.code.Code):
reduce_f = pymongo.code.Code(reduce_f)
mr_args = {'query': self._query, 'keeptemp': keep_temp}
if finalize_f:
if not isinstance(finalize_f, pymongo.code.Code):
finalize_f = pymongo.code.Code(finalize_f)
mr_args['finalize'] = finalize_f
if scope:
mr_args['scope'] = scope
if limit:
mr_args['limit'] = limit
results = self._collection.map_reduce(map_f, reduce_f, **mr_args)
results = results.find()
if self._ordering:
results = results.sort(self._ordering)
for doc in results:
yield MapReduceDocument(self._document, self._collection,
doc['_id'], doc['value'])
def limit(self, n):
"""Limit the number of returned documents to `n`. This may also be
achieved using array-slicing syntax (e.g. ``User.objects[:5]``).
@ -450,6 +516,7 @@ class QuerySet(object):
else:
self._cursor.limit(n)
self._limit = n
# Return self to allow chaining
return self
@ -525,6 +592,7 @@ class QuerySet(object):
key = key[1:]
key_list.append((key, direction))
self._ordering = key_list
self._cursor.sort(key_list)
return self
@ -836,6 +904,7 @@ class QuerySetManager(object):
queryset = self._manager_func(owner, queryset)
return queryset
def queryset_manager(func):
"""Decorator that allows you to define custom QuerySet managers on
:class:`~mongoengine.Document` classes. The manager must be a function that

View File

@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-
import unittest
import pymongo
from datetime import datetime
from datetime import datetime, timedelta
from mongoengine.queryset import (QuerySet, MultipleObjectsReturned,
DoesNotExist)
@ -631,6 +634,163 @@ class QuerySetTest(unittest.TestCase):
ages = [p.age for p in self.Person.objects.order_by('-name')]
self.assertEqual(ages, [30, 40, 20])
def test_map_reduce(self):
"""Ensure map/reduce is both mapping and reducing.
"""
class BlogPost(Document):
title = StringField()
tags = ListField(StringField())
BlogPost.drop_collection()
BlogPost(title="Post #1", tags=['music', 'film', 'print']).save()
BlogPost(title="Post #2", tags=['music', 'film']).save()
BlogPost(title="Post #3", tags=['film', 'photography']).save()
map_f = """
function() {
this.tags.forEach(function(tag) {
emit(tag, 1);
});
}
"""
reduce_f = """
function(key, values) {
var total = 0;
for(var i=0; i<values.length; i++) {
total += values[i];
}
return total;
}
"""
# run a map/reduce operation spanning all posts
results = BlogPost.objects.map_reduce(map_f, reduce_f)
results = list(results)
self.assertEqual(len(results), 4)
music = filter(lambda r: r.key == "music", results)[0]
self.assertEqual(music.value, 2)
film = filter(lambda r: r.key == "film", results)[0]
self.assertEqual(film.value, 3)
BlogPost.drop_collection()
def test_map_reduce_finalize(self):
"""Ensure that map, reduce, and finalize run and introduce "scope"
by simulating "hotness" ranking with Reddit algorithm.
"""
from time import mktime
class Link(Document):
title = StringField()
up_votes = IntField()
down_votes = IntField()
submitted = DateTimeField()
Link.drop_collection()
now = datetime.utcnow()
# Note: Test data taken from a custom Reddit homepage on
# Fri, 12 Feb 2010 14:36:00 -0600. Link ordering should
# reflect order of insertion below, but is not influenced
# by insertion order.
Link(title = "Google Buzz auto-followed a woman's abusive ex ...",
up_votes = 1079,
down_votes = 553,
submitted = now-timedelta(hours=4)).save()
Link(title = "We did it! Barbie is a computer engineer.",
up_votes = 481,
down_votes = 124,
submitted = now-timedelta(hours=2)).save()
Link(title = "This Is A Mosquito Getting Killed By A Laser",
up_votes = 1446,
down_votes = 530,
submitted=now-timedelta(hours=13)).save()
Link(title = "Arabic flashcards land physics student in jail.",
up_votes = 215,
down_votes = 105,
submitted = now-timedelta(hours=6)).save()
Link(title = "The Burger Lab: Presenting, the Flood Burger",
up_votes = 48,
down_votes = 17,
submitted = now-timedelta(hours=5)).save()
Link(title="How to see polarization with the naked eye",
up_votes = 74,
down_votes = 13,
submitted = now-timedelta(hours=10)).save()
map_f = """
function() {
emit(this._id, {up_delta: this.up_votes - this.down_votes,
sub_date: this.submitted.getTime() / 1000})
}
"""
reduce_f = """
function(key, values) {
data = values[0];
x = data.up_delta;
// calculate time diff between reddit epoch and submission
sec_since_epoch = data.sub_date - reddit_epoch;
// calculate 'Y'
if(x > 0) {
y = 1;
} else if (x = 0) {
y = 0;
} else {
y = -1;
}
// calculate 'Z', the maximal value
if(Math.abs(x) >= 1) {
z = Math.abs(x);
} else {
z = 1;
}
return {x: x, y: y, z: z, t_s: sec_since_epoch};
}
"""
finalize_f = """
function(key, value) {
// f(sec_since_epoch,y,z) = log10(z) + ((y*sec_since_epoch) / 45000)
z_10 = Math.log(value.z) / Math.log(10);
weight = z_10 + ((value.y * value.t_s) / 45000);
return weight;
}
"""
# provide the reddit epoch (used for ranking) as a variable available
# to all phases of the map/reduce operation: map, reduce, and finalize.
reddit_epoch = mktime(datetime(2005, 12, 8, 7, 46, 43).timetuple())
scope = {'reddit_epoch': reddit_epoch}
# run a map/reduce operation across all links. ordering is set
# to "-value", which orders the "weight" value returned from
# "finalize_f" in descending order.
results = Link.objects.order_by("-value")
results = results.map_reduce(map_f,
reduce_f,
finalize_f=finalize_f,
scope=scope)
results = list(results)
# assert troublesome Buzz article is ranked 1st
self.assertTrue(results[0].object.title.startswith("Google Buzz"))
# assert laser vision is ranked last
self.assertTrue(results[-1].object.title.startswith("How to see"))
Link.drop_collection()
def test_item_frequencies(self):
"""Ensure that item frequencies are properly generated from lists.
"""