From 295ef3dc1deb1a805f8b19e3e859a381f825f119 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wilson=20J=C3=BAnior?= Date: Tue, 25 Feb 2014 15:36:30 -0300 Subject: [PATCH] db_alias support and fixes for custom map/reduce output --- mongoengine/queryset/base.py | 33 ++++++++- tests/queryset/queryset.py | 136 ++++++++++++++++++++++++++++++++++- 2 files changed, 167 insertions(+), 2 deletions(-) diff --git a/mongoengine/queryset/base.py b/mongoengine/queryset/base.py index c2ad027e..4bd7128e 100644 --- a/mongoengine/queryset/base.py +++ b/mongoengine/queryset/base.py @@ -7,12 +7,14 @@ import pprint import re import warnings +from bson import SON from bson.code import Code from bson import json_util import pymongo from pymongo.common import validate_read_preference from mongoengine import signals +from mongoengine.connection import get_db from mongoengine.common import _import_class from mongoengine.base.common import get_document from mongoengine.errors import (OperationError, NotUniqueError, @@ -923,7 +925,36 @@ class BaseQuerySet(object): map_reduce_function = 'inline_map_reduce' else: map_reduce_function = 'map_reduce' - mr_args['out'] = output + + if isinstance(output, basestring): + mr_args['out'] = output + + elif isinstance(output, dict): + ordered_output = [] + + for part in ('replace', 'merge', 'reduce'): + value = output.get(part) + if value: + ordered_output.append((part, value)) + break + + else: + raise OperationError("actionData not specified for output") + + db_alias = output.get('db_alias') + remaing_args = ['db', 'sharded', 'nonAtomic'] + + if db_alias: + ordered_output.append(('db', get_db(db_alias).name)) + del remaing_args[0] + + + for part in remaing_args: + value = output.get(part) + if value: + ordered_output.append((part, value)) + + mr_args['out'] = SON(ordered_output) results = getattr(queryset._collection, map_reduce_function)( map_f, reduce_f, **mr_args) diff --git a/tests/queryset/queryset.py b/tests/queryset/queryset.py index 7ff2965d..2fcd466c 100644 --- a/tests/queryset/queryset.py +++ b/tests/queryset/queryset.py @@ -14,7 +14,7 @@ from pymongo.read_preferences import ReadPreference from bson import ObjectId from mongoengine import * -from mongoengine.connection import get_connection +from mongoengine.connection import get_connection, get_db from mongoengine.python_support import PY3 from mongoengine.context_managers import query_counter from mongoengine.queryset import (QuerySet, QuerySetManager, @@ -1925,6 +1925,140 @@ class QuerySetTest(unittest.TestCase): BlogPost.drop_collection() + def test_map_reduce_custom_output(self): + """ + Test map/reduce custom output + """ + register_connection('test2', 'mongoenginetest2') + + class Family(Document): + id = IntField( + primary_key=True) + log = StringField() + + class Person(Document): + id = IntField( + primary_key=True) + name = StringField() + age = IntField() + family = ReferenceField(Family) + + Family.drop_collection() + Person.drop_collection() + + # creating first family + f1 = Family(id=1, log="Trav 02 de Julho") + f1.save() + + # persons of first family + Person(id=1, family=f1, name=u"Wilson Jr", age=21).save() + Person(id=2, family=f1, name=u"Wilson Father", age=45).save() + Person(id=3, family=f1, name=u"Eliana Costa", age=40).save() + Person(id=4, family=f1, name=u"Tayza Mariana", age=17).save() + + # creating second family + f2 = Family(id=2, log="Av prof frasc brunno") + f2.save() + + #persons of second family + Person(id=5, family=f2, name="Isabella Luanna", age=16).save() + Person(id=6, family=f2, name="Sandra Mara", age=36).save() + Person(id=7, family=f2, name="Igor Gabriel", age=10).save() + + # creating third family + f3 = Family(id=3, log="Av brazil") + f3.save() + + #persons of thrird family + Person(id=8, family=f3, name="Arthur WA", age=30).save() + Person(id=9, family=f3, name="Paula Leonel", age=25).save() + + # executing join map/reduce + map_person = """ + function () { + emit(this.family, { + totalAge: this.age, + persons: [{ + name: this.name, + age: this.age + }]}); + } + """ + + map_family = """ + function () { + emit(this._id, { + totalAge: 0, + persons: [] + }); + } + """ + + reduce_f = """ + function (key, values) { + var family = {persons: [], totalAge: 0}; + + values.forEach(function(value) { + if (value.persons) { + value.persons.forEach(function (person) { + family.persons.push(person); + family.totalAge += person.age; + }); + } + }); + + return family; + } + """ + cursor = Family.objects.map_reduce( + map_f=map_family, + reduce_f=reduce_f, + output={'replace': 'family_map', 'db_alias': 'test2'}) + + # start a map/reduce + cursor.next() + + results = Person.objects.map_reduce( + map_f=map_person, + reduce_f=reduce_f, + output={'reduce': 'family_map', 'db_alias': 'test2'}) + + results = list(results) + collection = get_db('test2').family_map + + self.assertEqual( + collection.find_one({'_id': 1}), { + '_id': 1, + 'value': { + 'persons': [ + {'age': 21, 'name': u'Wilson Jr'}, + {'age': 45, 'name': u'Wilson Father'}, + {'age': 40, 'name': u'Eliana Costa'}, + {'age': 17, 'name': u'Tayza Mariana'}], + 'totalAge': 123} + }) + + self.assertEqual( + collection.find_one({'_id': 2}), { + '_id': 2, + 'value': { + 'persons': [ + {'age': 16, 'name': u'Isabella Luanna'}, + {'age': 36, 'name': u'Sandra Mara'}, + {'age': 10, 'name': u'Igor Gabriel'}], + 'totalAge': 62} + }) + + self.assertEqual( + collection.find_one({'_id': 3}), { + '_id': 3, + 'value': { + 'persons': [ + {'age': 30, 'name': u'Arthur WA'}, + {'age': 25, 'name': u'Paula Leonel'}], + 'totalAge': 55} + }) + def test_map_reduce_finalize(self): """Ensure that map, reduce, and finalize run and introduce "scope" by simulating "hotness" ranking with Reddit algorithm.