db_alias support and fixes for custom map/reduce output
This commit is contained in:
parent
d4b3649640
commit
295ef3dc1d
@ -7,12 +7,14 @@ import pprint
|
|||||||
import re
|
import re
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
|
from bson import SON
|
||||||
from bson.code import Code
|
from bson.code import Code
|
||||||
from bson import json_util
|
from bson import json_util
|
||||||
import pymongo
|
import pymongo
|
||||||
from pymongo.common import validate_read_preference
|
from pymongo.common import validate_read_preference
|
||||||
|
|
||||||
from mongoengine import signals
|
from mongoengine import signals
|
||||||
|
from mongoengine.connection import get_db
|
||||||
from mongoengine.common import _import_class
|
from mongoengine.common import _import_class
|
||||||
from mongoengine.base.common import get_document
|
from mongoengine.base.common import get_document
|
||||||
from mongoengine.errors import (OperationError, NotUniqueError,
|
from mongoengine.errors import (OperationError, NotUniqueError,
|
||||||
@ -923,7 +925,36 @@ class BaseQuerySet(object):
|
|||||||
map_reduce_function = 'inline_map_reduce'
|
map_reduce_function = 'inline_map_reduce'
|
||||||
else:
|
else:
|
||||||
map_reduce_function = 'map_reduce'
|
map_reduce_function = 'map_reduce'
|
||||||
mr_args['out'] = output
|
|
||||||
|
if isinstance(output, basestring):
|
||||||
|
mr_args['out'] = output
|
||||||
|
|
||||||
|
elif isinstance(output, dict):
|
||||||
|
ordered_output = []
|
||||||
|
|
||||||
|
for part in ('replace', 'merge', 'reduce'):
|
||||||
|
value = output.get(part)
|
||||||
|
if value:
|
||||||
|
ordered_output.append((part, value))
|
||||||
|
break
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise OperationError("actionData not specified for output")
|
||||||
|
|
||||||
|
db_alias = output.get('db_alias')
|
||||||
|
remaing_args = ['db', 'sharded', 'nonAtomic']
|
||||||
|
|
||||||
|
if db_alias:
|
||||||
|
ordered_output.append(('db', get_db(db_alias).name))
|
||||||
|
del remaing_args[0]
|
||||||
|
|
||||||
|
|
||||||
|
for part in remaing_args:
|
||||||
|
value = output.get(part)
|
||||||
|
if value:
|
||||||
|
ordered_output.append((part, value))
|
||||||
|
|
||||||
|
mr_args['out'] = SON(ordered_output)
|
||||||
|
|
||||||
results = getattr(queryset._collection, map_reduce_function)(
|
results = getattr(queryset._collection, map_reduce_function)(
|
||||||
map_f, reduce_f, **mr_args)
|
map_f, reduce_f, **mr_args)
|
||||||
|
@ -14,7 +14,7 @@ from pymongo.read_preferences import ReadPreference
|
|||||||
from bson import ObjectId
|
from bson import ObjectId
|
||||||
|
|
||||||
from mongoengine import *
|
from mongoengine import *
|
||||||
from mongoengine.connection import get_connection
|
from mongoengine.connection import get_connection, get_db
|
||||||
from mongoengine.python_support import PY3
|
from mongoengine.python_support import PY3
|
||||||
from mongoengine.context_managers import query_counter
|
from mongoengine.context_managers import query_counter
|
||||||
from mongoengine.queryset import (QuerySet, QuerySetManager,
|
from mongoengine.queryset import (QuerySet, QuerySetManager,
|
||||||
@ -1925,6 +1925,140 @@ class QuerySetTest(unittest.TestCase):
|
|||||||
|
|
||||||
BlogPost.drop_collection()
|
BlogPost.drop_collection()
|
||||||
|
|
||||||
|
def test_map_reduce_custom_output(self):
|
||||||
|
"""
|
||||||
|
Test map/reduce custom output
|
||||||
|
"""
|
||||||
|
register_connection('test2', 'mongoenginetest2')
|
||||||
|
|
||||||
|
class Family(Document):
|
||||||
|
id = IntField(
|
||||||
|
primary_key=True)
|
||||||
|
log = StringField()
|
||||||
|
|
||||||
|
class Person(Document):
|
||||||
|
id = IntField(
|
||||||
|
primary_key=True)
|
||||||
|
name = StringField()
|
||||||
|
age = IntField()
|
||||||
|
family = ReferenceField(Family)
|
||||||
|
|
||||||
|
Family.drop_collection()
|
||||||
|
Person.drop_collection()
|
||||||
|
|
||||||
|
# creating first family
|
||||||
|
f1 = Family(id=1, log="Trav 02 de Julho")
|
||||||
|
f1.save()
|
||||||
|
|
||||||
|
# persons of first family
|
||||||
|
Person(id=1, family=f1, name=u"Wilson Jr", age=21).save()
|
||||||
|
Person(id=2, family=f1, name=u"Wilson Father", age=45).save()
|
||||||
|
Person(id=3, family=f1, name=u"Eliana Costa", age=40).save()
|
||||||
|
Person(id=4, family=f1, name=u"Tayza Mariana", age=17).save()
|
||||||
|
|
||||||
|
# creating second family
|
||||||
|
f2 = Family(id=2, log="Av prof frasc brunno")
|
||||||
|
f2.save()
|
||||||
|
|
||||||
|
#persons of second family
|
||||||
|
Person(id=5, family=f2, name="Isabella Luanna", age=16).save()
|
||||||
|
Person(id=6, family=f2, name="Sandra Mara", age=36).save()
|
||||||
|
Person(id=7, family=f2, name="Igor Gabriel", age=10).save()
|
||||||
|
|
||||||
|
# creating third family
|
||||||
|
f3 = Family(id=3, log="Av brazil")
|
||||||
|
f3.save()
|
||||||
|
|
||||||
|
#persons of thrird family
|
||||||
|
Person(id=8, family=f3, name="Arthur WA", age=30).save()
|
||||||
|
Person(id=9, family=f3, name="Paula Leonel", age=25).save()
|
||||||
|
|
||||||
|
# executing join map/reduce
|
||||||
|
map_person = """
|
||||||
|
function () {
|
||||||
|
emit(this.family, {
|
||||||
|
totalAge: this.age,
|
||||||
|
persons: [{
|
||||||
|
name: this.name,
|
||||||
|
age: this.age
|
||||||
|
}]});
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
map_family = """
|
||||||
|
function () {
|
||||||
|
emit(this._id, {
|
||||||
|
totalAge: 0,
|
||||||
|
persons: []
|
||||||
|
});
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
reduce_f = """
|
||||||
|
function (key, values) {
|
||||||
|
var family = {persons: [], totalAge: 0};
|
||||||
|
|
||||||
|
values.forEach(function(value) {
|
||||||
|
if (value.persons) {
|
||||||
|
value.persons.forEach(function (person) {
|
||||||
|
family.persons.push(person);
|
||||||
|
family.totalAge += person.age;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return family;
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
cursor = Family.objects.map_reduce(
|
||||||
|
map_f=map_family,
|
||||||
|
reduce_f=reduce_f,
|
||||||
|
output={'replace': 'family_map', 'db_alias': 'test2'})
|
||||||
|
|
||||||
|
# start a map/reduce
|
||||||
|
cursor.next()
|
||||||
|
|
||||||
|
results = Person.objects.map_reduce(
|
||||||
|
map_f=map_person,
|
||||||
|
reduce_f=reduce_f,
|
||||||
|
output={'reduce': 'family_map', 'db_alias': 'test2'})
|
||||||
|
|
||||||
|
results = list(results)
|
||||||
|
collection = get_db('test2').family_map
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
collection.find_one({'_id': 1}), {
|
||||||
|
'_id': 1,
|
||||||
|
'value': {
|
||||||
|
'persons': [
|
||||||
|
{'age': 21, 'name': u'Wilson Jr'},
|
||||||
|
{'age': 45, 'name': u'Wilson Father'},
|
||||||
|
{'age': 40, 'name': u'Eliana Costa'},
|
||||||
|
{'age': 17, 'name': u'Tayza Mariana'}],
|
||||||
|
'totalAge': 123}
|
||||||
|
})
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
collection.find_one({'_id': 2}), {
|
||||||
|
'_id': 2,
|
||||||
|
'value': {
|
||||||
|
'persons': [
|
||||||
|
{'age': 16, 'name': u'Isabella Luanna'},
|
||||||
|
{'age': 36, 'name': u'Sandra Mara'},
|
||||||
|
{'age': 10, 'name': u'Igor Gabriel'}],
|
||||||
|
'totalAge': 62}
|
||||||
|
})
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
collection.find_one({'_id': 3}), {
|
||||||
|
'_id': 3,
|
||||||
|
'value': {
|
||||||
|
'persons': [
|
||||||
|
{'age': 30, 'name': u'Arthur WA'},
|
||||||
|
{'age': 25, 'name': u'Paula Leonel'}],
|
||||||
|
'totalAge': 55}
|
||||||
|
})
|
||||||
|
|
||||||
def test_map_reduce_finalize(self):
|
def test_map_reduce_finalize(self):
|
||||||
"""Ensure that map, reduce, and finalize run and introduce "scope"
|
"""Ensure that map, reduce, and finalize run and introduce "scope"
|
||||||
by simulating "hotness" ranking with Reddit algorithm.
|
by simulating "hotness" ranking with Reddit algorithm.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user