Added Custom Objects Managers

Managers can now be directly declared in a Document eg::

```python
    class CustomQuerySetManager(QuerySetManager):

        @staticmethod
        def get_queryset(doc_cls, queryset):
            return queryset(is_published=True)

    class Post(Document):
        is_published = BooleanField(default=False)
        published = CustomQuerySetManager()
```

Refactored the name of the `_manager_func` to `get_queryset` to mark it as
part the public API.  If declaring a Manager with a get_queryset method, it
should be a staticmethod, that accepts the document_class and the queryset.

Note - you can still use decorators in fact in the example below,
we effectively do the same thing as the first example and is much less verbose.

```python

    class Post(Document):
        is_published = BooleanField(default=False)

        @queryset_manager
        def published(doc_cls, queryset):
            return queryset(is_published=True)
```

Thanks to @theojulienne for the initial impetus and code sample #108
This commit is contained in:
Ross Lawley 2011-05-24 11:26:46 +01:00
parent 1b72ea9cc1
commit 1126c85903
3 changed files with 63 additions and 12 deletions

View File

@ -299,8 +299,10 @@ class TopLevelDocumentMetaclass(DocumentMetaclass):
new_class = super_new(cls, name, bases, attrs)
# Provide a default queryset unless one has been manually provided
if not hasattr(new_class, 'objects'):
new_class.objects = QuerySetManager()
manager = attrs.get('objects', QuerySetManager())
if hasattr(manager, 'queryset_class'):
meta['queryset_class'] = manager.queryset_class
new_class.objects = manager
user_indexes = [QuerySet._build_index_spec(new_class, spec)
for spec in meta['indexes']] + base_indexes

View File

@ -428,8 +428,6 @@ class QuerySet(object):
querying collection
:param query: Django-style query keyword arguments
"""
#if q_obj:
#self._where_clause = q_obj.as_js(self._document)
query = Q(**query)
if q_obj:
query &= q_obj
@ -1308,8 +1306,11 @@ class QuerySet(object):
class QuerySetManager(object):
def __init__(self, manager_func=None):
self._manager_func = manager_func
get_queryset = None
def __init__(self, queryset_func=None):
if queryset_func:
self.get_queryset = queryset_func
self._collections = {}
def __get__(self, instance, owner):
@ -1353,11 +1354,11 @@ class QuerySetManager(object):
# owner is the document that contains the QuerySetManager
queryset_class = owner._meta['queryset_class'] or QuerySet
queryset = queryset_class(owner, self._collections[(db, collection)])
if self._manager_func:
if self._manager_func.func_code.co_argcount == 1:
queryset = self._manager_func(queryset)
if self.get_queryset:
if self.get_queryset.func_code.co_argcount == 1:
queryset = self.get_queryset(queryset)
else:
queryset = self._manager_func(owner, queryset)
queryset = self.get_queryset(owner, queryset)
return queryset

View File

@ -5,8 +5,9 @@ import unittest
import pymongo
from datetime import datetime, timedelta
from mongoengine.queryset import (QuerySet, MultipleObjectsReturned,
DoesNotExist, QueryFieldList)
from mongoengine.queryset import (QuerySet, QuerySetManager,
MultipleObjectsReturned, DoesNotExist,
QueryFieldList)
from mongoengine import *
@ -1737,6 +1738,53 @@ class QuerySetTest(unittest.TestCase):
Post.drop_collection()
def test_custom_querysets_set_manager_directly(self):
"""Ensure that custom QuerySet classes may be used.
"""
class CustomQuerySet(QuerySet):
def not_empty(self):
return len(self) > 0
class CustomQuerySetManager(QuerySetManager):
queryset_class = CustomQuerySet
class Post(Document):
objects = CustomQuerySetManager()
Post.drop_collection()
self.assertTrue(isinstance(Post.objects, CustomQuerySet))
self.assertFalse(Post.objects.not_empty())
Post().save()
self.assertTrue(Post.objects.not_empty())
Post.drop_collection()
def test_custom_querysets_managers_directly(self):
"""Ensure that custom QuerySet classes may be used.
"""
class CustomQuerySetManager(QuerySetManager):
@staticmethod
def get_queryset(doc_cls, queryset):
return queryset(is_published=True)
class Post(Document):
is_published = BooleanField(default=False)
published = CustomQuerySetManager()
Post.drop_collection()
Post().save()
Post(is_published=True).save()
self.assertEquals(Post.objects.count(), 2)
self.assertEquals(Post.published.count(), 1)
Post.drop_collection()
def test_call_after_limits_set(self):
"""Ensure that re-filtering after slicing works
"""