Merge branch 'master' into feature/allow-setting-read-concern-queryset
This commit is contained in:
@@ -11,8 +11,6 @@ from pymongo.read_concern import ReadConcern
|
||||
from pymongo.read_preferences import ReadPreference
|
||||
from pymongo.results import UpdateResult
|
||||
import pytest
|
||||
import six
|
||||
from six import iteritems
|
||||
|
||||
from mongoengine import *
|
||||
from mongoengine.connection import get_db
|
||||
@@ -111,7 +109,7 @@ class TestQueryset(unittest.TestCase):
|
||||
# Filter people by age
|
||||
people = self.Person.objects(age=20)
|
||||
assert people.count() == 1
|
||||
person = people.next()
|
||||
person = next(people)
|
||||
assert person == user_a
|
||||
assert person.name == "User A"
|
||||
assert person.age == 20
|
||||
@@ -119,7 +117,7 @@ class TestQueryset(unittest.TestCase):
|
||||
def test_limit(self):
|
||||
"""Ensure that QuerySet.limit works as expected."""
|
||||
user_a = self.Person.objects.create(name="User A", age=20)
|
||||
user_b = self.Person.objects.create(name="User B", age=30)
|
||||
_ = self.Person.objects.create(name="User B", age=30)
|
||||
|
||||
# Test limit on a new queryset
|
||||
people = list(self.Person.objects.limit(1))
|
||||
@@ -151,6 +149,11 @@ class TestQueryset(unittest.TestCase):
|
||||
user_b = self.Person.objects.create(name="User B", age=30)
|
||||
|
||||
# Test skip on a new queryset
|
||||
people = list(self.Person.objects.skip(0))
|
||||
assert len(people) == 2
|
||||
assert people[0] == user_a
|
||||
assert people[1] == user_b
|
||||
|
||||
people = list(self.Person.objects.skip(1))
|
||||
assert len(people) == 1
|
||||
assert people[0] == user_b
|
||||
@@ -275,32 +278,47 @@ class TestQueryset(unittest.TestCase):
|
||||
with pytest.raises(InvalidQueryError):
|
||||
self.Person.objects(name="User A").with_id(person1.id)
|
||||
|
||||
def test_find_only_one(self):
|
||||
"""Ensure that a query using ``get`` returns at most one result.
|
||||
"""
|
||||
def test_get_no_document_exists_raises_doesnotexist(self):
|
||||
assert self.Person.objects.count() == 0
|
||||
# Try retrieving when no objects exists
|
||||
with pytest.raises(DoesNotExist):
|
||||
self.Person.objects.get()
|
||||
with pytest.raises(self.Person.DoesNotExist):
|
||||
self.Person.objects.get()
|
||||
|
||||
def test_get_multiple_match_raises_multipleobjectsreturned(self):
|
||||
"""Ensure that a query using ``get`` returns at most one result.
|
||||
"""
|
||||
assert self.Person.objects().count() == 0
|
||||
|
||||
person1 = self.Person(name="User A", age=20)
|
||||
person1.save()
|
||||
person2 = self.Person(name="User B", age=30)
|
||||
|
||||
p = self.Person.objects.get()
|
||||
assert p == person1
|
||||
|
||||
person2 = self.Person(name="User B", age=20)
|
||||
person2.save()
|
||||
|
||||
# Retrieve the first person from the database
|
||||
person3 = self.Person(name="User C", age=30)
|
||||
person3.save()
|
||||
|
||||
# .get called without argument
|
||||
with pytest.raises(MultipleObjectsReturned):
|
||||
self.Person.objects.get()
|
||||
with pytest.raises(self.Person.MultipleObjectsReturned):
|
||||
self.Person.objects.get()
|
||||
|
||||
# check filtering
|
||||
with pytest.raises(MultipleObjectsReturned):
|
||||
self.Person.objects.get(age__lt=30)
|
||||
with pytest.raises(MultipleObjectsReturned) as exc_info:
|
||||
self.Person.objects(age__lt=30).get()
|
||||
assert "2 or more items returned, instead of 1" == str(exc_info.value)
|
||||
|
||||
# Use a query to filter the people found to just person2
|
||||
person = self.Person.objects.get(age=30)
|
||||
assert person.name == "User B"
|
||||
|
||||
person = self.Person.objects.get(age__lt=30)
|
||||
assert person.name == "User A"
|
||||
assert person == person3
|
||||
|
||||
def test_find_array_position(self):
|
||||
"""Ensure that query by array position works.
|
||||
@@ -2574,13 +2592,8 @@ class TestQueryset(unittest.TestCase):
|
||||
age = IntField()
|
||||
|
||||
with db_ops_tracker() as q:
|
||||
adult1 = (
|
||||
User.objects.filter(age__gte=18).comment("looking for an adult").first()
|
||||
)
|
||||
|
||||
adult2 = (
|
||||
User.objects.comment("looking for an adult").filter(age__gte=18).first()
|
||||
)
|
||||
User.objects.filter(age__gte=18).comment("looking for an adult").first()
|
||||
User.objects.comment("looking for an adult").filter(age__gte=18).first()
|
||||
|
||||
ops = q.get_ops()
|
||||
assert len(ops) == 2
|
||||
@@ -2769,7 +2782,7 @@ class TestQueryset(unittest.TestCase):
|
||||
)
|
||||
|
||||
# start a map/reduce
|
||||
cursor.next()
|
||||
next(cursor)
|
||||
|
||||
results = Person.objects.map_reduce(
|
||||
map_f=map_person,
|
||||
@@ -4094,7 +4107,7 @@ class TestQueryset(unittest.TestCase):
|
||||
info = Comment.objects._collection.index_information()
|
||||
info = [
|
||||
(value["key"], value.get("unique", False), value.get("sparse", False))
|
||||
for key, value in iteritems(info)
|
||||
for key, value in info.items()
|
||||
]
|
||||
assert ([("_cls", 1), ("message", 1)], False, False) in info
|
||||
|
||||
@@ -4396,7 +4409,7 @@ class TestQueryset(unittest.TestCase):
|
||||
# Use a query to filter the people found to just person1
|
||||
people = self.Person.objects(age=20).scalar("name")
|
||||
assert people.count() == 1
|
||||
person = people.next()
|
||||
person = next(people)
|
||||
assert person == "User A"
|
||||
|
||||
# Test limit
|
||||
@@ -4446,24 +4459,14 @@ class TestQueryset(unittest.TestCase):
|
||||
"A0" == "%s" % self.Person.objects.order_by("name").scalar("name").first()
|
||||
)
|
||||
assert "A0" == "%s" % self.Person.objects.scalar("name").order_by("name")[0]
|
||||
if six.PY3:
|
||||
assert (
|
||||
"['A1', 'A2']"
|
||||
== "%s" % self.Person.objects.order_by("age").scalar("name")[1:3]
|
||||
)
|
||||
assert (
|
||||
"['A51', 'A52']"
|
||||
== "%s" % self.Person.objects.order_by("age").scalar("name")[51:53]
|
||||
)
|
||||
else:
|
||||
assert (
|
||||
"[u'A1', u'A2']"
|
||||
== "%s" % self.Person.objects.order_by("age").scalar("name")[1:3]
|
||||
)
|
||||
assert (
|
||||
"[u'A51', u'A52']"
|
||||
== "%s" % self.Person.objects.order_by("age").scalar("name")[51:53]
|
||||
)
|
||||
assert (
|
||||
"['A1', 'A2']"
|
||||
== "%s" % self.Person.objects.order_by("age").scalar("name")[1:3]
|
||||
)
|
||||
assert (
|
||||
"['A51', 'A52']"
|
||||
== "%s" % self.Person.objects.order_by("age").scalar("name")[51:53]
|
||||
)
|
||||
|
||||
# with_id and in_bulk
|
||||
person = self.Person.objects.order_by("name").first()
|
||||
@@ -4471,10 +4474,7 @@ class TestQueryset(unittest.TestCase):
|
||||
|
||||
pks = self.Person.objects.order_by("age").scalar("pk")[1:3]
|
||||
names = self.Person.objects.scalar("name").in_bulk(list(pks)).values()
|
||||
if six.PY3:
|
||||
expected = "['A1', 'A2']"
|
||||
else:
|
||||
expected = "[u'A1', u'A2']"
|
||||
expected = "['A1', 'A2']"
|
||||
assert expected == "%s" % sorted(names)
|
||||
|
||||
def test_fields(self):
|
||||
@@ -4519,7 +4519,7 @@ class TestQueryset(unittest.TestCase):
|
||||
|
||||
foos_without_y = list(Foo.objects.order_by("y").fields(y=0))
|
||||
|
||||
assert all(o.y is None for o in foos_with_x)
|
||||
assert all(o.y is None for o in foos_without_y)
|
||||
|
||||
foos_with_sliced_items = list(Foo.objects.order_by("y").fields(slice__items=1))
|
||||
|
||||
@@ -5403,7 +5403,7 @@ class TestQueryset(unittest.TestCase):
|
||||
if not test:
|
||||
raise AssertionError("Cursor has data and returned False")
|
||||
|
||||
queryset.next()
|
||||
next(queryset)
|
||||
if not queryset:
|
||||
raise AssertionError(
|
||||
"Cursor has data and it must returns True, even in the last item."
|
||||
@@ -5636,7 +5636,7 @@ class TestQueryset(unittest.TestCase):
|
||||
self.Person.objects.create(name="Baz")
|
||||
assert self.Person.objects.count(with_limit_and_skip=True) == 3
|
||||
|
||||
newPerson = self.Person.objects.create(name="Foo_1")
|
||||
self.Person.objects.create(name="Foo_1")
|
||||
assert self.Person.objects.count(with_limit_and_skip=True) == 4
|
||||
|
||||
def test_no_cursor_timeout(self):
|
||||
|
||||
Reference in New Issue
Block a user