Added support for the $not operator
This commit is contained in:
parent
76cb851c40
commit
b4c54b1b62
@ -265,6 +265,124 @@ class Q(QNode):
|
||||
return not bool(self.query)
|
||||
|
||||
|
||||
class OldQ(object):
|
||||
|
||||
OR = '||'
|
||||
AND = '&&'
|
||||
OPERATORS = {
|
||||
'eq': ('((this.%(field)s instanceof Array) && '
|
||||
' this.%(field)s.indexOf(%(value)s) != -1) ||'
|
||||
' this.%(field)s == %(value)s'),
|
||||
'ne': 'this.%(field)s != %(value)s',
|
||||
'gt': 'this.%(field)s > %(value)s',
|
||||
'gte': 'this.%(field)s >= %(value)s',
|
||||
'lt': 'this.%(field)s < %(value)s',
|
||||
'lte': 'this.%(field)s <= %(value)s',
|
||||
'lte': 'this.%(field)s <= %(value)s',
|
||||
'in': '%(value)s.indexOf(this.%(field)s) != -1',
|
||||
'nin': '%(value)s.indexOf(this.%(field)s) == -1',
|
||||
'mod': '%(field)s %% %(value)s',
|
||||
'all': ('%(value)s.every(function(a){'
|
||||
'return this.%(field)s.indexOf(a) != -1 })'),
|
||||
'size': 'this.%(field)s.length == %(value)s',
|
||||
'exists': 'this.%(field)s != null',
|
||||
'regex_eq': '%(value)s.test(this.%(field)s)',
|
||||
'regex_ne': '!%(value)s.test(this.%(field)s)',
|
||||
}
|
||||
|
||||
def __init__(self, **query):
|
||||
self.query = [query]
|
||||
|
||||
def _combine(self, other, op):
|
||||
obj = Q()
|
||||
if not other.query[0]:
|
||||
return self
|
||||
if self.query[0]:
|
||||
obj.query = (['('] + copy.deepcopy(self.query) + [op] +
|
||||
copy.deepcopy(other.query) + [')'])
|
||||
else:
|
||||
obj.query = copy.deepcopy(other.query)
|
||||
return obj
|
||||
|
||||
def __or__(self, other):
|
||||
return self._combine(other, self.OR)
|
||||
|
||||
def __and__(self, other):
|
||||
return self._combine(other, self.AND)
|
||||
|
||||
def as_js(self, document):
|
||||
js = []
|
||||
js_scope = {}
|
||||
for i, item in enumerate(self.query):
|
||||
if isinstance(item, dict):
|
||||
item_query = QuerySet._transform_query(document, **item)
|
||||
# item_query will values will either be a value or a dict
|
||||
js.append(self._item_query_as_js(item_query, js_scope, i))
|
||||
else:
|
||||
js.append(item)
|
||||
return pymongo.code.Code(' '.join(js), js_scope)
|
||||
|
||||
def _item_query_as_js(self, item_query, js_scope, item_num):
|
||||
# item_query will be in one of the following forms
|
||||
# {'age': 25, 'name': 'Test'}
|
||||
# {'age': {'$lt': 25}, 'name': {'$in': ['Test', 'Example']}
|
||||
# {'age': {'$lt': 25, '$gt': 18}}
|
||||
js = []
|
||||
for i, (key, value) in enumerate(item_query.items()):
|
||||
op = 'eq'
|
||||
# Construct a variable name for the value in the JS
|
||||
value_name = 'i%sf%s' % (item_num, i)
|
||||
if isinstance(value, dict):
|
||||
# Multiple operators for this field
|
||||
for j, (op, value) in enumerate(value.items()):
|
||||
# Create a custom variable name for this operator
|
||||
op_value_name = '%so%s' % (value_name, j)
|
||||
# Construct the JS that uses this op
|
||||
value, operation_js = self._build_op_js(op, key, value,
|
||||
op_value_name)
|
||||
# Update the js scope with the value for this op
|
||||
js_scope[op_value_name] = value
|
||||
js.append(operation_js)
|
||||
else:
|
||||
# Construct the JS for this field
|
||||
value, field_js = self._build_op_js(op, key, value, value_name)
|
||||
js_scope[value_name] = value
|
||||
js.append(field_js)
|
||||
return ' && '.join(js)
|
||||
|
||||
def _build_op_js(self, op, key, value, value_name):
|
||||
"""Substitute the values in to the correct chunk of Javascript.
|
||||
"""
|
||||
if isinstance(value, RE_TYPE):
|
||||
# Regexes are handled specially
|
||||
if op.strip('$') == 'ne':
|
||||
op_js = Q.OPERATORS['regex_ne']
|
||||
else:
|
||||
op_js = Q.OPERATORS['regex_eq']
|
||||
else:
|
||||
op_js = Q.OPERATORS[op.strip('$')]
|
||||
|
||||
# Comparing two ObjectIds in Javascript doesn't work..
|
||||
if isinstance(value, pymongo.objectid.ObjectId):
|
||||
value = unicode(value)
|
||||
|
||||
# Handle DBRef
|
||||
if isinstance(value, pymongo.dbref.DBRef):
|
||||
op_js = '(this.%(field)s.$id == "%(id)s" &&'\
|
||||
' this.%(field)s.$ref == "%(ref)s")' % {
|
||||
'field': key,
|
||||
'id': unicode(value.id),
|
||||
'ref': unicode(value.collection)
|
||||
}
|
||||
value = None
|
||||
|
||||
# Perform the substitution
|
||||
operation_js = op_js % {
|
||||
'field': key,
|
||||
'value': value_name
|
||||
}
|
||||
return value, operation_js
|
||||
|
||||
class QuerySet(object):
|
||||
"""A set of results returned from a query. Wraps a MongoDB cursor,
|
||||
providing :class:`~mongoengine.Document` objects as the results.
|
||||
@ -462,7 +580,7 @@ class QuerySet(object):
|
||||
"""Transform a query from Django-style format to Mongo format.
|
||||
"""
|
||||
operators = ['ne', 'gt', 'gte', 'lt', 'lte', 'in', 'nin', 'mod',
|
||||
'all', 'size', 'exists']
|
||||
'all', 'size', 'exists', 'not']
|
||||
geo_operators = ['within_distance', 'within_box', 'near']
|
||||
match_operators = ['contains', 'icontains', 'startswith',
|
||||
'istartswith', 'endswith', 'iendswith',
|
||||
@ -478,6 +596,11 @@ class QuerySet(object):
|
||||
if parts[-1] in operators + match_operators + geo_operators:
|
||||
op = parts.pop()
|
||||
|
||||
negate = False
|
||||
if parts[-1] == 'not':
|
||||
parts.pop()
|
||||
negate = True
|
||||
|
||||
if _doc_cls:
|
||||
# Switch field names to proper names [set in Field(name='foo')]
|
||||
fields = QuerySet._lookup_field(_doc_cls, parts)
|
||||
@ -485,7 +608,7 @@ class QuerySet(object):
|
||||
|
||||
# Convert value to proper value
|
||||
field = fields[-1]
|
||||
singular_ops = [None, 'ne', 'gt', 'gte', 'lt', 'lte']
|
||||
singular_ops = [None, 'ne', 'gt', 'gte', 'lt', 'lte', 'not']
|
||||
singular_ops += match_operators
|
||||
if op in singular_ops:
|
||||
value = field.prepare_query_value(op, value)
|
||||
@ -511,6 +634,9 @@ class QuerySet(object):
|
||||
elif op not in match_operators:
|
||||
value = {'$' + op: value}
|
||||
|
||||
if negate:
|
||||
value = {'$not': value}
|
||||
|
||||
for i, part in indices:
|
||||
parts.insert(i, part)
|
||||
key = '.'.join(parts)
|
||||
|
@ -336,6 +336,18 @@ class QuerySetTest(unittest.TestCase):
|
||||
obj = self.Person.objects(Q(name__icontains='[.\'Geek')).first()
|
||||
self.assertEqual(obj, person)
|
||||
|
||||
def test_not(self):
|
||||
"""Ensure that the __not operator works as expected.
|
||||
"""
|
||||
alice = self.Person(name='Alice', age=25)
|
||||
alice.save()
|
||||
|
||||
obj = self.Person.objects(name__iexact='alice').first()
|
||||
self.assertEqual(obj, alice)
|
||||
|
||||
obj = self.Person.objects(name__not__iexact='alice').first()
|
||||
self.assertEqual(obj, None)
|
||||
|
||||
def test_filter_chaining(self):
|
||||
"""Ensure filters can be chained together.
|
||||
"""
|
||||
@ -545,10 +557,10 @@ class QuerySetTest(unittest.TestCase):
|
||||
obj = self.Person.objects(Q(name=re.compile('^gui', re.I))).first()
|
||||
self.assertEqual(obj, person)
|
||||
|
||||
obj = self.Person.objects(Q(name__ne=re.compile('^bob'))).first()
|
||||
obj = self.Person.objects(Q(name__not=re.compile('^bob'))).first()
|
||||
self.assertEqual(obj, person)
|
||||
|
||||
obj = self.Person.objects(Q(name__ne=re.compile('^Gui'))).first()
|
||||
obj = self.Person.objects(Q(name__not=re.compile('^Gui'))).first()
|
||||
self.assertEqual(obj, None)
|
||||
|
||||
def test_q_lists(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user