Fixed GridFSProxy __getattr__ behaviour (#196)
This commit is contained in:
parent
485b811bd0
commit
c5b047d0cd
3
AUTHORS
3
AUTHORS
@ -130,4 +130,5 @@ that much better:
|
|||||||
* Jakub Kot
|
* Jakub Kot
|
||||||
* Jorge Bastida
|
* Jorge Bastida
|
||||||
* Stefan Wójcik
|
* Stefan Wójcik
|
||||||
* Pete Campton
|
* Pete Campton
|
||||||
|
* Martyn Smith
|
@ -22,6 +22,7 @@ Changes in 0.8.X
|
|||||||
- Only allow QNode instances to be passed as query objects (#199)
|
- Only allow QNode instances to be passed as query objects (#199)
|
||||||
- Dynamic fields are now validated on save (#153) (#154)
|
- Dynamic fields are now validated on save (#153) (#154)
|
||||||
- Added support for multiple slices and made slicing chainable. (#170) (#190) (#191)
|
- Added support for multiple slices and made slicing chainable. (#170) (#190) (#191)
|
||||||
|
- Fixed GridFSProxy __getattr__ behaviour (#196)
|
||||||
|
|
||||||
Changes in 0.7.9
|
Changes in 0.7.9
|
||||||
================
|
================
|
||||||
|
@ -18,20 +18,10 @@ a document is created to store details about animals, including a photo::
|
|||||||
family = StringField()
|
family = StringField()
|
||||||
photo = FileField()
|
photo = FileField()
|
||||||
|
|
||||||
marmot = Animal('Marmota', 'Sciuridae')
|
marmot = Animal(genus='Marmota', family='Sciuridae')
|
||||||
|
|
||||||
marmot_photo = open('marmot.jpg', 'r') # Retrieve a photo from disk
|
|
||||||
marmot.photo = marmot_photo # Store photo in the document
|
|
||||||
marmot.photo.content_type = 'image/jpeg' # Store metadata
|
|
||||||
|
|
||||||
marmot.save()
|
|
||||||
|
|
||||||
Another way of writing to a :class:`~mongoengine.FileField` is to use the
|
|
||||||
:func:`put` method. This allows for metadata to be stored in the same call as
|
|
||||||
the file::
|
|
||||||
|
|
||||||
marmot.photo.put(marmot_photo, content_type='image/jpeg')
|
|
||||||
|
|
||||||
|
marmot_photo = open('marmot.jpg', 'r')
|
||||||
|
marmot.photo.put(marmot_photo, content_type = 'image/jpeg')
|
||||||
marmot.save()
|
marmot.save()
|
||||||
|
|
||||||
Retrieval
|
Retrieval
|
||||||
|
@ -969,7 +969,7 @@ class GridFSProxy(object):
|
|||||||
if name in attrs:
|
if name in attrs:
|
||||||
return self.__getattribute__(name)
|
return self.__getattribute__(name)
|
||||||
obj = self.get()
|
obj = self.get()
|
||||||
if name in dir(obj):
|
if hasattr(obj, name):
|
||||||
return getattr(obj, name)
|
return getattr(obj, name)
|
||||||
raise AttributeError
|
raise AttributeError
|
||||||
|
|
||||||
|
@ -19,7 +19,8 @@ from mongoengine.queryset import NULLIFY, Q
|
|||||||
from mongoengine.connection import get_db
|
from mongoengine.connection import get_db
|
||||||
from mongoengine.base import get_document
|
from mongoengine.base import get_document
|
||||||
|
|
||||||
TEST_IMAGE_PATH = os.path.join(os.path.dirname(__file__), 'mongoengine.png')
|
TEST_IMAGE_PATH = os.path.join(os.path.dirname(__file__),
|
||||||
|
'../fields/mongoengine.png')
|
||||||
|
|
||||||
__all__ = ("InstanceTest",)
|
__all__ = ("InstanceTest",)
|
||||||
|
|
||||||
|
2
tests/fields/__init__.py
Normal file
2
tests/fields/__init__.py
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
from fields import *
|
||||||
|
from file import *
|
@ -4,24 +4,21 @@ import sys
|
|||||||
sys.path[0:0] = [""]
|
sys.path[0:0] = [""]
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
import os
|
|
||||||
import unittest
|
import unittest
|
||||||
import uuid
|
import uuid
|
||||||
import tempfile
|
|
||||||
|
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
|
|
||||||
from bson import Binary, DBRef, ObjectId
|
from bson import Binary, DBRef, ObjectId
|
||||||
import gridfs
|
|
||||||
|
|
||||||
from nose.plugins.skip import SkipTest
|
|
||||||
from mongoengine import *
|
from mongoengine import *
|
||||||
from mongoengine.connection import get_db
|
from mongoengine.connection import get_db
|
||||||
from mongoengine.base import _document_registry
|
from mongoengine.base import _document_registry
|
||||||
from mongoengine.errors import NotRegistered
|
from mongoengine.errors import NotRegistered
|
||||||
from mongoengine.python_support import PY3, b, StringIO, bin_type
|
from mongoengine.python_support import PY3, b, bin_type
|
||||||
|
|
||||||
|
__all__ = ("FieldTest", )
|
||||||
|
|
||||||
TEST_IMAGE_PATH = os.path.join(os.path.dirname(__file__), 'document/mongoengine.png')
|
|
||||||
|
|
||||||
class FieldTest(unittest.TestCase):
|
class FieldTest(unittest.TestCase):
|
||||||
|
|
||||||
@ -1728,302 +1725,6 @@ class FieldTest(unittest.TestCase):
|
|||||||
|
|
||||||
Shirt.drop_collection()
|
Shirt.drop_collection()
|
||||||
|
|
||||||
def test_file_fields(self):
|
|
||||||
"""Ensure that file fields can be written to and their data retrieved
|
|
||||||
"""
|
|
||||||
class PutFile(Document):
|
|
||||||
the_file = FileField()
|
|
||||||
|
|
||||||
class StreamFile(Document):
|
|
||||||
the_file = FileField()
|
|
||||||
|
|
||||||
class SetFile(Document):
|
|
||||||
the_file = FileField()
|
|
||||||
|
|
||||||
text = b('Hello, World!')
|
|
||||||
more_text = b('Foo Bar')
|
|
||||||
content_type = 'text/plain'
|
|
||||||
|
|
||||||
PutFile.drop_collection()
|
|
||||||
StreamFile.drop_collection()
|
|
||||||
SetFile.drop_collection()
|
|
||||||
|
|
||||||
putfile = PutFile()
|
|
||||||
putfile.the_file.put(text, content_type=content_type)
|
|
||||||
putfile.save()
|
|
||||||
putfile.validate()
|
|
||||||
result = PutFile.objects.first()
|
|
||||||
self.assertTrue(putfile == result)
|
|
||||||
self.assertEqual(result.the_file.read(), text)
|
|
||||||
self.assertEqual(result.the_file.content_type, content_type)
|
|
||||||
result.the_file.delete() # Remove file from GridFS
|
|
||||||
PutFile.objects.delete()
|
|
||||||
|
|
||||||
# Ensure file-like objects are stored
|
|
||||||
putfile = PutFile()
|
|
||||||
putstring = StringIO()
|
|
||||||
putstring.write(text)
|
|
||||||
putstring.seek(0)
|
|
||||||
putfile.the_file.put(putstring, content_type=content_type)
|
|
||||||
putfile.save()
|
|
||||||
putfile.validate()
|
|
||||||
result = PutFile.objects.first()
|
|
||||||
self.assertTrue(putfile == result)
|
|
||||||
self.assertEqual(result.the_file.read(), text)
|
|
||||||
self.assertEqual(result.the_file.content_type, content_type)
|
|
||||||
result.the_file.delete()
|
|
||||||
|
|
||||||
streamfile = StreamFile()
|
|
||||||
streamfile.the_file.new_file(content_type=content_type)
|
|
||||||
streamfile.the_file.write(text)
|
|
||||||
streamfile.the_file.write(more_text)
|
|
||||||
streamfile.the_file.close()
|
|
||||||
streamfile.save()
|
|
||||||
streamfile.validate()
|
|
||||||
result = StreamFile.objects.first()
|
|
||||||
self.assertTrue(streamfile == result)
|
|
||||||
self.assertEqual(result.the_file.read(), text + more_text)
|
|
||||||
self.assertEqual(result.the_file.content_type, content_type)
|
|
||||||
result.the_file.seek(0)
|
|
||||||
self.assertEqual(result.the_file.tell(), 0)
|
|
||||||
self.assertEqual(result.the_file.read(len(text)), text)
|
|
||||||
self.assertEqual(result.the_file.tell(), len(text))
|
|
||||||
self.assertEqual(result.the_file.read(len(more_text)), more_text)
|
|
||||||
self.assertEqual(result.the_file.tell(), len(text + more_text))
|
|
||||||
result.the_file.delete()
|
|
||||||
|
|
||||||
# Ensure deleted file returns None
|
|
||||||
self.assertTrue(result.the_file.read() == None)
|
|
||||||
|
|
||||||
setfile = SetFile()
|
|
||||||
setfile.the_file = text
|
|
||||||
setfile.save()
|
|
||||||
setfile.validate()
|
|
||||||
result = SetFile.objects.first()
|
|
||||||
self.assertTrue(setfile == result)
|
|
||||||
self.assertEqual(result.the_file.read(), text)
|
|
||||||
|
|
||||||
# Try replacing file with new one
|
|
||||||
result.the_file.replace(more_text)
|
|
||||||
result.save()
|
|
||||||
result.validate()
|
|
||||||
result = SetFile.objects.first()
|
|
||||||
self.assertTrue(setfile == result)
|
|
||||||
self.assertEqual(result.the_file.read(), more_text)
|
|
||||||
result.the_file.delete()
|
|
||||||
|
|
||||||
PutFile.drop_collection()
|
|
||||||
StreamFile.drop_collection()
|
|
||||||
SetFile.drop_collection()
|
|
||||||
|
|
||||||
# Make sure FileField is optional and not required
|
|
||||||
class DemoFile(Document):
|
|
||||||
the_file = FileField()
|
|
||||||
DemoFile.objects.create()
|
|
||||||
|
|
||||||
def test_file_field_no_default(self):
|
|
||||||
|
|
||||||
class GridDocument(Document):
|
|
||||||
the_file = FileField()
|
|
||||||
|
|
||||||
GridDocument.drop_collection()
|
|
||||||
|
|
||||||
with tempfile.TemporaryFile() as f:
|
|
||||||
f.write(b("Hello World!"))
|
|
||||||
f.flush()
|
|
||||||
|
|
||||||
# Test without default
|
|
||||||
doc_a = GridDocument()
|
|
||||||
doc_a.save()
|
|
||||||
|
|
||||||
doc_b = GridDocument.objects.with_id(doc_a.id)
|
|
||||||
doc_b.the_file.replace(f, filename='doc_b')
|
|
||||||
doc_b.save()
|
|
||||||
self.assertNotEqual(doc_b.the_file.grid_id, None)
|
|
||||||
|
|
||||||
# Test it matches
|
|
||||||
doc_c = GridDocument.objects.with_id(doc_b.id)
|
|
||||||
self.assertEqual(doc_b.the_file.grid_id, doc_c.the_file.grid_id)
|
|
||||||
|
|
||||||
# Test with default
|
|
||||||
doc_d = GridDocument(the_file=b(''))
|
|
||||||
doc_d.save()
|
|
||||||
|
|
||||||
doc_e = GridDocument.objects.with_id(doc_d.id)
|
|
||||||
self.assertEqual(doc_d.the_file.grid_id, doc_e.the_file.grid_id)
|
|
||||||
|
|
||||||
doc_e.the_file.replace(f, filename='doc_e')
|
|
||||||
doc_e.save()
|
|
||||||
|
|
||||||
doc_f = GridDocument.objects.with_id(doc_e.id)
|
|
||||||
self.assertEqual(doc_e.the_file.grid_id, doc_f.the_file.grid_id)
|
|
||||||
|
|
||||||
db = GridDocument._get_db()
|
|
||||||
grid_fs = gridfs.GridFS(db)
|
|
||||||
self.assertEqual(['doc_b', 'doc_e'], grid_fs.list())
|
|
||||||
|
|
||||||
def test_file_uniqueness(self):
|
|
||||||
"""Ensure that each instance of a FileField is unique
|
|
||||||
"""
|
|
||||||
class TestFile(Document):
|
|
||||||
name = StringField()
|
|
||||||
the_file = FileField()
|
|
||||||
|
|
||||||
# First instance
|
|
||||||
test_file = TestFile()
|
|
||||||
test_file.name = "Hello, World!"
|
|
||||||
test_file.the_file.put(b('Hello, World!'))
|
|
||||||
test_file.save()
|
|
||||||
|
|
||||||
# Second instance
|
|
||||||
test_file_dupe = TestFile()
|
|
||||||
data = test_file_dupe.the_file.read() # Should be None
|
|
||||||
|
|
||||||
self.assertTrue(test_file.name != test_file_dupe.name)
|
|
||||||
self.assertTrue(test_file.the_file.read() != data)
|
|
||||||
|
|
||||||
TestFile.drop_collection()
|
|
||||||
|
|
||||||
def test_file_boolean(self):
|
|
||||||
"""Ensure that a boolean test of a FileField indicates its presence
|
|
||||||
"""
|
|
||||||
class TestFile(Document):
|
|
||||||
the_file = FileField()
|
|
||||||
|
|
||||||
test_file = TestFile()
|
|
||||||
self.assertFalse(bool(test_file.the_file))
|
|
||||||
test_file.the_file = b('Hello, World!')
|
|
||||||
test_file.the_file.content_type = 'text/plain'
|
|
||||||
test_file.save()
|
|
||||||
self.assertTrue(bool(test_file.the_file))
|
|
||||||
|
|
||||||
TestFile.drop_collection()
|
|
||||||
|
|
||||||
def test_file_cmp(self):
|
|
||||||
"""Test comparing against other types"""
|
|
||||||
class TestFile(Document):
|
|
||||||
the_file = FileField()
|
|
||||||
|
|
||||||
test_file = TestFile()
|
|
||||||
self.assertFalse(test_file.the_file in [{"test": 1}])
|
|
||||||
|
|
||||||
def test_image_field(self):
|
|
||||||
if PY3:
|
|
||||||
raise SkipTest('PIL does not have Python 3 support')
|
|
||||||
|
|
||||||
class TestImage(Document):
|
|
||||||
image = ImageField()
|
|
||||||
|
|
||||||
TestImage.drop_collection()
|
|
||||||
|
|
||||||
t = TestImage()
|
|
||||||
t.image.put(open(TEST_IMAGE_PATH, 'r'))
|
|
||||||
t.save()
|
|
||||||
|
|
||||||
t = TestImage.objects.first()
|
|
||||||
|
|
||||||
self.assertEqual(t.image.format, 'PNG')
|
|
||||||
|
|
||||||
w, h = t.image.size
|
|
||||||
self.assertEqual(w, 371)
|
|
||||||
self.assertEqual(h, 76)
|
|
||||||
|
|
||||||
t.image.delete()
|
|
||||||
|
|
||||||
def test_image_field_resize(self):
|
|
||||||
if PY3:
|
|
||||||
raise SkipTest('PIL does not have Python 3 support')
|
|
||||||
|
|
||||||
class TestImage(Document):
|
|
||||||
image = ImageField(size=(185, 37))
|
|
||||||
|
|
||||||
TestImage.drop_collection()
|
|
||||||
|
|
||||||
t = TestImage()
|
|
||||||
t.image.put(open(TEST_IMAGE_PATH, 'r'))
|
|
||||||
t.save()
|
|
||||||
|
|
||||||
t = TestImage.objects.first()
|
|
||||||
|
|
||||||
self.assertEqual(t.image.format, 'PNG')
|
|
||||||
w, h = t.image.size
|
|
||||||
|
|
||||||
self.assertEqual(w, 185)
|
|
||||||
self.assertEqual(h, 37)
|
|
||||||
|
|
||||||
t.image.delete()
|
|
||||||
|
|
||||||
def test_image_field_resize_force(self):
|
|
||||||
if PY3:
|
|
||||||
raise SkipTest('PIL does not have Python 3 support')
|
|
||||||
|
|
||||||
class TestImage(Document):
|
|
||||||
image = ImageField(size=(185, 37, True))
|
|
||||||
|
|
||||||
TestImage.drop_collection()
|
|
||||||
|
|
||||||
t = TestImage()
|
|
||||||
t.image.put(open(TEST_IMAGE_PATH, 'r'))
|
|
||||||
t.save()
|
|
||||||
|
|
||||||
t = TestImage.objects.first()
|
|
||||||
|
|
||||||
self.assertEqual(t.image.format, 'PNG')
|
|
||||||
w, h = t.image.size
|
|
||||||
|
|
||||||
self.assertEqual(w, 185)
|
|
||||||
self.assertEqual(h, 37)
|
|
||||||
|
|
||||||
t.image.delete()
|
|
||||||
|
|
||||||
def test_image_field_thumbnail(self):
|
|
||||||
if PY3:
|
|
||||||
raise SkipTest('PIL does not have Python 3 support')
|
|
||||||
|
|
||||||
class TestImage(Document):
|
|
||||||
image = ImageField(thumbnail_size=(92, 18))
|
|
||||||
|
|
||||||
TestImage.drop_collection()
|
|
||||||
|
|
||||||
t = TestImage()
|
|
||||||
t.image.put(open(TEST_IMAGE_PATH, 'r'))
|
|
||||||
t.save()
|
|
||||||
|
|
||||||
t = TestImage.objects.first()
|
|
||||||
|
|
||||||
self.assertEqual(t.image.thumbnail.format, 'PNG')
|
|
||||||
self.assertEqual(t.image.thumbnail.width, 92)
|
|
||||||
self.assertEqual(t.image.thumbnail.height, 18)
|
|
||||||
|
|
||||||
t.image.delete()
|
|
||||||
|
|
||||||
def test_file_multidb(self):
|
|
||||||
register_connection('test_files', 'test_files')
|
|
||||||
class TestFile(Document):
|
|
||||||
name = StringField()
|
|
||||||
the_file = FileField(db_alias="test_files",
|
|
||||||
collection_name="macumba")
|
|
||||||
|
|
||||||
TestFile.drop_collection()
|
|
||||||
|
|
||||||
# delete old filesystem
|
|
||||||
get_db("test_files").macumba.files.drop()
|
|
||||||
get_db("test_files").macumba.chunks.drop()
|
|
||||||
|
|
||||||
# First instance
|
|
||||||
test_file = TestFile()
|
|
||||||
test_file.name = "Hello, World!"
|
|
||||||
test_file.the_file.put(b('Hello, World!'),
|
|
||||||
name="hello.txt")
|
|
||||||
test_file.save()
|
|
||||||
|
|
||||||
data = get_db("test_files").macumba.files.find_one()
|
|
||||||
self.assertEqual(data.get('name'), 'hello.txt')
|
|
||||||
|
|
||||||
test_file = TestFile.objects.first()
|
|
||||||
self.assertEqual(test_file.the_file.read(),
|
|
||||||
b('Hello, World!'))
|
|
||||||
|
|
||||||
def test_geo_indexes(self):
|
def test_geo_indexes(self):
|
||||||
"""Ensure that indexes are created automatically for GeoPointFields.
|
"""Ensure that indexes are created automatically for GeoPointFields.
|
||||||
"""
|
"""
|
370
tests/fields/file.py
Normal file
370
tests/fields/file.py
Normal file
@ -0,0 +1,370 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
from __future__ import with_statement
|
||||||
|
import sys
|
||||||
|
sys.path[0:0] = [""]
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
import os
|
||||||
|
import unittest
|
||||||
|
import uuid
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
from decimal import Decimal
|
||||||
|
|
||||||
|
from bson import Binary, DBRef, ObjectId
|
||||||
|
import gridfs
|
||||||
|
|
||||||
|
from nose.plugins.skip import SkipTest
|
||||||
|
from mongoengine import *
|
||||||
|
from mongoengine.connection import get_db
|
||||||
|
from mongoengine.base import _document_registry
|
||||||
|
from mongoengine.errors import NotRegistered
|
||||||
|
from mongoengine.python_support import PY3, b, StringIO, bin_type
|
||||||
|
|
||||||
|
TEST_IMAGE_PATH = os.path.join(os.path.dirname(__file__), 'mongoengine.png')
|
||||||
|
|
||||||
|
|
||||||
|
class FileTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
connect(db='mongoenginetest')
|
||||||
|
self.db = get_db()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.db.drop_collection('fs.files')
|
||||||
|
self.db.drop_collection('fs.chunks')
|
||||||
|
|
||||||
|
def test_file_field_optional(self):
|
||||||
|
# Make sure FileField is optional and not required
|
||||||
|
class DemoFile(Document):
|
||||||
|
the_file = FileField()
|
||||||
|
DemoFile.objects.create()
|
||||||
|
|
||||||
|
def test_file_fields(self):
|
||||||
|
"""Ensure that file fields can be written to and their data retrieved
|
||||||
|
"""
|
||||||
|
|
||||||
|
class PutFile(Document):
|
||||||
|
the_file = FileField()
|
||||||
|
|
||||||
|
PutFile.drop_collection()
|
||||||
|
|
||||||
|
text = b('Hello, World!')
|
||||||
|
more_text = b('Foo Bar')
|
||||||
|
content_type = 'text/plain'
|
||||||
|
|
||||||
|
putfile = PutFile()
|
||||||
|
putfile.the_file.put(text, content_type=content_type)
|
||||||
|
putfile.save()
|
||||||
|
putfile.validate()
|
||||||
|
result = PutFile.objects.first()
|
||||||
|
self.assertTrue(putfile == result)
|
||||||
|
self.assertEqual(result.the_file.read(), text)
|
||||||
|
self.assertEqual(result.the_file.content_type, content_type)
|
||||||
|
result.the_file.delete() # Remove file from GridFS
|
||||||
|
PutFile.objects.delete()
|
||||||
|
|
||||||
|
# Ensure file-like objects are stored
|
||||||
|
PutFile.drop_collection()
|
||||||
|
|
||||||
|
putfile = PutFile()
|
||||||
|
putstring = StringIO()
|
||||||
|
putstring.write(text)
|
||||||
|
putstring.seek(0)
|
||||||
|
putfile.the_file.put(putstring, content_type=content_type)
|
||||||
|
putfile.save()
|
||||||
|
putfile.validate()
|
||||||
|
result = PutFile.objects.first()
|
||||||
|
self.assertTrue(putfile == result)
|
||||||
|
self.assertEqual(result.the_file.read(), text)
|
||||||
|
self.assertEqual(result.the_file.content_type, content_type)
|
||||||
|
result.the_file.delete()
|
||||||
|
|
||||||
|
def test_file_fields_stream(self):
|
||||||
|
"""Ensure that file fields can be written to and their data retrieved
|
||||||
|
"""
|
||||||
|
class StreamFile(Document):
|
||||||
|
the_file = FileField()
|
||||||
|
|
||||||
|
StreamFile.drop_collection()
|
||||||
|
|
||||||
|
text = b('Hello, World!')
|
||||||
|
more_text = b('Foo Bar')
|
||||||
|
content_type = 'text/plain'
|
||||||
|
|
||||||
|
streamfile = StreamFile()
|
||||||
|
streamfile.the_file.new_file(content_type=content_type)
|
||||||
|
streamfile.the_file.write(text)
|
||||||
|
streamfile.the_file.write(more_text)
|
||||||
|
streamfile.the_file.close()
|
||||||
|
streamfile.save()
|
||||||
|
streamfile.validate()
|
||||||
|
result = StreamFile.objects.first()
|
||||||
|
self.assertTrue(streamfile == result)
|
||||||
|
self.assertEqual(result.the_file.read(), text + more_text)
|
||||||
|
self.assertEqual(result.the_file.content_type, content_type)
|
||||||
|
result.the_file.seek(0)
|
||||||
|
self.assertEqual(result.the_file.tell(), 0)
|
||||||
|
self.assertEqual(result.the_file.read(len(text)), text)
|
||||||
|
self.assertEqual(result.the_file.tell(), len(text))
|
||||||
|
self.assertEqual(result.the_file.read(len(more_text)), more_text)
|
||||||
|
self.assertEqual(result.the_file.tell(), len(text + more_text))
|
||||||
|
result.the_file.delete()
|
||||||
|
|
||||||
|
# Ensure deleted file returns None
|
||||||
|
self.assertTrue(result.the_file.read() == None)
|
||||||
|
|
||||||
|
def test_file_fields_set(self):
|
||||||
|
|
||||||
|
class SetFile(Document):
|
||||||
|
the_file = FileField()
|
||||||
|
|
||||||
|
text = b('Hello, World!')
|
||||||
|
more_text = b('Foo Bar')
|
||||||
|
|
||||||
|
SetFile.drop_collection()
|
||||||
|
|
||||||
|
setfile = SetFile()
|
||||||
|
setfile.the_file = text
|
||||||
|
setfile.save()
|
||||||
|
|
||||||
|
result = SetFile.objects.first()
|
||||||
|
self.assertTrue(setfile == result)
|
||||||
|
self.assertEqual(result.the_file.read(), text)
|
||||||
|
|
||||||
|
# Try replacing file with new one
|
||||||
|
result.the_file.replace(more_text)
|
||||||
|
result.save()
|
||||||
|
result.validate()
|
||||||
|
result = SetFile.objects.first()
|
||||||
|
self.assertTrue(setfile == result)
|
||||||
|
self.assertEqual(result.the_file.read(), more_text)
|
||||||
|
result.the_file.delete()
|
||||||
|
|
||||||
|
def test_file_field_no_default(self):
|
||||||
|
|
||||||
|
class GridDocument(Document):
|
||||||
|
the_file = FileField()
|
||||||
|
|
||||||
|
GridDocument.drop_collection()
|
||||||
|
|
||||||
|
with tempfile.TemporaryFile() as f:
|
||||||
|
f.write(b("Hello World!"))
|
||||||
|
f.flush()
|
||||||
|
|
||||||
|
# Test without default
|
||||||
|
doc_a = GridDocument()
|
||||||
|
doc_a.save()
|
||||||
|
|
||||||
|
doc_b = GridDocument.objects.with_id(doc_a.id)
|
||||||
|
doc_b.the_file.replace(f, filename='doc_b')
|
||||||
|
doc_b.save()
|
||||||
|
self.assertNotEqual(doc_b.the_file.grid_id, None)
|
||||||
|
|
||||||
|
# Test it matches
|
||||||
|
doc_c = GridDocument.objects.with_id(doc_b.id)
|
||||||
|
self.assertEqual(doc_b.the_file.grid_id, doc_c.the_file.grid_id)
|
||||||
|
|
||||||
|
# Test with default
|
||||||
|
doc_d = GridDocument(the_file=b(''))
|
||||||
|
doc_d.save()
|
||||||
|
|
||||||
|
doc_e = GridDocument.objects.with_id(doc_d.id)
|
||||||
|
self.assertEqual(doc_d.the_file.grid_id, doc_e.the_file.grid_id)
|
||||||
|
|
||||||
|
doc_e.the_file.replace(f, filename='doc_e')
|
||||||
|
doc_e.save()
|
||||||
|
|
||||||
|
doc_f = GridDocument.objects.with_id(doc_e.id)
|
||||||
|
self.assertEqual(doc_e.the_file.grid_id, doc_f.the_file.grid_id)
|
||||||
|
|
||||||
|
db = GridDocument._get_db()
|
||||||
|
grid_fs = gridfs.GridFS(db)
|
||||||
|
self.assertEqual(['doc_b', 'doc_e'], grid_fs.list())
|
||||||
|
|
||||||
|
def test_file_uniqueness(self):
|
||||||
|
"""Ensure that each instance of a FileField is unique
|
||||||
|
"""
|
||||||
|
class TestFile(Document):
|
||||||
|
name = StringField()
|
||||||
|
the_file = FileField()
|
||||||
|
|
||||||
|
# First instance
|
||||||
|
test_file = TestFile()
|
||||||
|
test_file.name = "Hello, World!"
|
||||||
|
test_file.the_file.put(b('Hello, World!'))
|
||||||
|
test_file.save()
|
||||||
|
|
||||||
|
# Second instance
|
||||||
|
test_file_dupe = TestFile()
|
||||||
|
data = test_file_dupe.the_file.read() # Should be None
|
||||||
|
|
||||||
|
self.assertTrue(test_file.name != test_file_dupe.name)
|
||||||
|
self.assertTrue(test_file.the_file.read() != data)
|
||||||
|
|
||||||
|
TestFile.drop_collection()
|
||||||
|
|
||||||
|
def test_file_saving(self):
|
||||||
|
"""Ensure you can add meta data to file"""
|
||||||
|
|
||||||
|
class Animal(Document):
|
||||||
|
genus = StringField()
|
||||||
|
family = StringField()
|
||||||
|
photo = FileField()
|
||||||
|
|
||||||
|
Animal.drop_collection()
|
||||||
|
marmot = Animal(genus='Marmota', family='Sciuridae')
|
||||||
|
|
||||||
|
marmot_photo = open(TEST_IMAGE_PATH, 'r') # Retrieve a photo from disk
|
||||||
|
marmot.photo.put(marmot_photo, content_type='image/jpeg', foo='bar')
|
||||||
|
marmot.photo.close()
|
||||||
|
marmot.save()
|
||||||
|
|
||||||
|
marmot = Animal.objects.get()
|
||||||
|
self.assertEqual(marmot.photo.content_type, 'image/jpeg')
|
||||||
|
self.assertEqual(marmot.photo.foo, 'bar')
|
||||||
|
|
||||||
|
def test_file_boolean(self):
|
||||||
|
"""Ensure that a boolean test of a FileField indicates its presence
|
||||||
|
"""
|
||||||
|
class TestFile(Document):
|
||||||
|
the_file = FileField()
|
||||||
|
TestFile.drop_collection()
|
||||||
|
|
||||||
|
test_file = TestFile()
|
||||||
|
self.assertFalse(bool(test_file.the_file))
|
||||||
|
test_file.the_file.put(b('Hello, World!'), content_type='text/plain')
|
||||||
|
test_file.save()
|
||||||
|
self.assertTrue(bool(test_file.the_file))
|
||||||
|
|
||||||
|
test_file = TestFile.objects.first()
|
||||||
|
self.assertEqual(test_file.the_file.content_type, "text/plain")
|
||||||
|
|
||||||
|
def test_file_cmp(self):
|
||||||
|
"""Test comparing against other types"""
|
||||||
|
class TestFile(Document):
|
||||||
|
the_file = FileField()
|
||||||
|
|
||||||
|
test_file = TestFile()
|
||||||
|
self.assertFalse(test_file.the_file in [{"test": 1}])
|
||||||
|
|
||||||
|
def test_image_field(self):
|
||||||
|
if PY3:
|
||||||
|
raise SkipTest('PIL does not have Python 3 support')
|
||||||
|
|
||||||
|
class TestImage(Document):
|
||||||
|
image = ImageField()
|
||||||
|
|
||||||
|
TestImage.drop_collection()
|
||||||
|
|
||||||
|
t = TestImage()
|
||||||
|
t.image.put(open(TEST_IMAGE_PATH, 'r'))
|
||||||
|
t.save()
|
||||||
|
|
||||||
|
t = TestImage.objects.first()
|
||||||
|
|
||||||
|
self.assertEqual(t.image.format, 'PNG')
|
||||||
|
|
||||||
|
w, h = t.image.size
|
||||||
|
self.assertEqual(w, 371)
|
||||||
|
self.assertEqual(h, 76)
|
||||||
|
|
||||||
|
t.image.delete()
|
||||||
|
|
||||||
|
def test_image_field_resize(self):
|
||||||
|
if PY3:
|
||||||
|
raise SkipTest('PIL does not have Python 3 support')
|
||||||
|
|
||||||
|
class TestImage(Document):
|
||||||
|
image = ImageField(size=(185, 37))
|
||||||
|
|
||||||
|
TestImage.drop_collection()
|
||||||
|
|
||||||
|
t = TestImage()
|
||||||
|
t.image.put(open(TEST_IMAGE_PATH, 'r'))
|
||||||
|
t.save()
|
||||||
|
|
||||||
|
t = TestImage.objects.first()
|
||||||
|
|
||||||
|
self.assertEqual(t.image.format, 'PNG')
|
||||||
|
w, h = t.image.size
|
||||||
|
|
||||||
|
self.assertEqual(w, 185)
|
||||||
|
self.assertEqual(h, 37)
|
||||||
|
|
||||||
|
t.image.delete()
|
||||||
|
|
||||||
|
def test_image_field_resize_force(self):
|
||||||
|
if PY3:
|
||||||
|
raise SkipTest('PIL does not have Python 3 support')
|
||||||
|
|
||||||
|
class TestImage(Document):
|
||||||
|
image = ImageField(size=(185, 37, True))
|
||||||
|
|
||||||
|
TestImage.drop_collection()
|
||||||
|
|
||||||
|
t = TestImage()
|
||||||
|
t.image.put(open(TEST_IMAGE_PATH, 'r'))
|
||||||
|
t.save()
|
||||||
|
|
||||||
|
t = TestImage.objects.first()
|
||||||
|
|
||||||
|
self.assertEqual(t.image.format, 'PNG')
|
||||||
|
w, h = t.image.size
|
||||||
|
|
||||||
|
self.assertEqual(w, 185)
|
||||||
|
self.assertEqual(h, 37)
|
||||||
|
|
||||||
|
t.image.delete()
|
||||||
|
|
||||||
|
def test_image_field_thumbnail(self):
|
||||||
|
if PY3:
|
||||||
|
raise SkipTest('PIL does not have Python 3 support')
|
||||||
|
|
||||||
|
class TestImage(Document):
|
||||||
|
image = ImageField(thumbnail_size=(92, 18))
|
||||||
|
|
||||||
|
TestImage.drop_collection()
|
||||||
|
|
||||||
|
t = TestImage()
|
||||||
|
t.image.put(open(TEST_IMAGE_PATH, 'r'))
|
||||||
|
t.save()
|
||||||
|
|
||||||
|
t = TestImage.objects.first()
|
||||||
|
|
||||||
|
self.assertEqual(t.image.thumbnail.format, 'PNG')
|
||||||
|
self.assertEqual(t.image.thumbnail.width, 92)
|
||||||
|
self.assertEqual(t.image.thumbnail.height, 18)
|
||||||
|
|
||||||
|
t.image.delete()
|
||||||
|
|
||||||
|
def test_file_multidb(self):
|
||||||
|
register_connection('test_files', 'test_files')
|
||||||
|
|
||||||
|
class TestFile(Document):
|
||||||
|
name = StringField()
|
||||||
|
the_file = FileField(db_alias="test_files",
|
||||||
|
collection_name="macumba")
|
||||||
|
|
||||||
|
TestFile.drop_collection()
|
||||||
|
|
||||||
|
# delete old filesystem
|
||||||
|
get_db("test_files").macumba.files.drop()
|
||||||
|
get_db("test_files").macumba.chunks.drop()
|
||||||
|
|
||||||
|
# First instance
|
||||||
|
test_file = TestFile()
|
||||||
|
test_file.name = "Hello, World!"
|
||||||
|
test_file.the_file.put(b('Hello, World!'),
|
||||||
|
name="hello.txt")
|
||||||
|
test_file.save()
|
||||||
|
|
||||||
|
data = get_db("test_files").macumba.files.find_one()
|
||||||
|
self.assertEqual(data.get('name'), 'hello.txt')
|
||||||
|
|
||||||
|
test_file = TestFile.objects.first()
|
||||||
|
self.assertEqual(test_file.the_file.read(),
|
||||||
|
b('Hello, World!'))
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Before Width: | Height: | Size: 8.1 KiB After Width: | Height: | Size: 8.1 KiB |
Loading…
x
Reference in New Issue
Block a user