# File-viewer metadata (not part of the module): 587 lines, 17 KiB, Python
import copy
|
|
import os
|
|
import tempfile
|
|
import unittest
|
|
from io import BytesIO
|
|
|
|
import gridfs
|
|
import pytest
|
|
|
|
from mongoengine import *
|
|
from mongoengine.connection import get_db
|
|
|
|
# Pillow is an optional dependency: record whether it can be imported so that
# the image tests can be skipped when it is missing.
try:
    from PIL import Image
except ImportError:
    HAS_PIL = False
else:
    HAS_PIL = True
|
|
|
|
from tests.utils import MongoDBTestCase
|
|
|
|
# Marker that skips a test when Pillow could not be imported.
require_pil = pytest.mark.skipif(not HAS_PIL, reason="PIL not installed")

# Image fixtures, resolved relative to the directory holding this module.
TEST_IMAGE_PATH, TEST_IMAGE2_PATH = (
    os.path.join(os.path.dirname(__file__), image_name)
    for image_name in ("mongoengine.png", "mongodb_leaf.png")
)
|
|
|
|
|
|
def get_file(path):
    """Load the file at ``path`` into an in-memory ``BytesIO``.

    Handing back a buffer rather than an open file object lets callers use
    this as a one-liner without leaving a file handle open.

    :param path: path of the file to read; it is opened in binary mode.
    :return: a ``BytesIO`` holding the file's bytes, positioned at offset 0.
    """
    with open(path, "rb") as source:
        payload = source.read()
    return BytesIO(payload)
|
|
|
|
|
|
class TestFileField(MongoDBTestCase):
    """Tests for ``FileField`` and ``ImageField`` documents backed by GridFS.

    Each test declares its own ``Document`` subclass. ``tearDown`` drops the
    ``fs.files`` and ``fs.chunks`` collections after every test.
    """

    def tearDown(self):
        """Drop the default GridFS collections."""
        self.db.drop_collection("fs.files")
        self.db.drop_collection("fs.chunks")

    def test_file_field_optional(self):
        """Ensure a document can be created without giving its FileField a value."""

        # Make sure FileField is optional and not required
        class DemoFile(Document):
            the_file = FileField()

        DemoFile.objects.create()

    def test_file_fields(self):
        """Ensure that file fields can be written to and their data retrieved."""

        class PutFile(Document):
            the_file = FileField()

        PutFile.drop_collection()

        text = b"Hello, World!"
        content_type = "text/plain"

        putfile = PutFile()
        putfile.the_file.put(text, content_type=content_type, filename="hello")
        putfile.save()

        result = PutFile.objects.first()
        assert putfile == result
        assert (
            "%s" % result.the_file
            == "<GridFSProxy: hello (%s)>" % result.the_file.grid_id
        )
        assert result.the_file.read() == text
        assert result.the_file.content_type == content_type
        result.the_file.delete()  # Remove file from GridFS
        PutFile.objects.delete()

        # Ensure file-like objects are stored
        PutFile.drop_collection()

        putfile = PutFile()
        putstring = BytesIO(text)
        putfile.the_file.put(putstring, content_type=content_type)
        putfile.save()

        result = PutFile.objects.first()
        assert putfile == result
        assert result.the_file.read() == text
        assert result.the_file.content_type == content_type
        result.the_file.delete()

    def test_file_fields_stream(self):
        """Ensure a file field can be written in several chunks and read back."""

        class StreamFile(Document):
            the_file = FileField()

        StreamFile.drop_collection()

        text = b"Hello, World!"
        more_text = b"Foo Bar"
        content_type = "text/plain"

        streamfile = StreamFile()
        streamfile.the_file.new_file(content_type=content_type)
        streamfile.the_file.write(text)
        streamfile.the_file.write(more_text)
        streamfile.the_file.close()
        streamfile.save()

        result = StreamFile.objects.first()
        assert streamfile == result
        assert result.the_file.read() == text + more_text
        assert result.the_file.content_type == content_type
        result.the_file.seek(0)
        assert result.the_file.tell() == 0
        assert result.the_file.read(len(text)) == text
        assert result.the_file.tell() == len(text)
        assert result.the_file.read(len(more_text)) == more_text
        assert result.the_file.tell() == len(text + more_text)
        result.the_file.delete()

        # Ensure deleted file returns None
        assert result.the_file.read() is None

    def test_file_fields_stream_after_none(self):
        """Ensure that a file field can be written to after it has been saved
        as None.
        """

        class StreamFile(Document):
            the_file = FileField()

        StreamFile.drop_collection()

        text = b"Hello, World!"
        more_text = b"Foo Bar"

        streamfile = StreamFile()
        streamfile.save()
        # new_file() is called without a content_type here, so unlike
        # test_file_fields_stream no content_type is asserted below.
        streamfile.the_file.new_file()
        streamfile.the_file.write(text)
        streamfile.the_file.write(more_text)
        streamfile.the_file.close()
        streamfile.save()

        result = StreamFile.objects.first()
        assert streamfile == result
        assert result.the_file.read() == text + more_text
        result.the_file.seek(0)
        assert result.the_file.tell() == 0
        assert result.the_file.read(len(text)) == text
        assert result.the_file.tell() == len(text)
        assert result.the_file.read(len(more_text)) == more_text
        assert result.the_file.tell() == len(text + more_text)
        result.the_file.delete()

        # Ensure deleted file returns None
        assert result.the_file.read() is None

    def test_file_fields_set(self):
        """Ensure a file field can be set by assigning bytes, then replaced."""

        class SetFile(Document):
            the_file = FileField()

        text = b"Hello, World!"
        more_text = b"Foo Bar"

        SetFile.drop_collection()

        setfile = SetFile()
        setfile.the_file = text
        setfile.save()

        result = SetFile.objects.first()
        assert setfile == result
        assert result.the_file.read() == text

        # Try replacing file with new one
        result.the_file.replace(more_text)
        result.save()

        result = SetFile.objects.first()
        assert setfile == result
        assert result.the_file.read() == more_text
        result.the_file.delete()

    def test_file_field_no_default(self):
        """Ensure replace() works on documents saved with and without a file."""

        class GridDocument(Document):
            the_file = FileField()

        GridDocument.drop_collection()

        with tempfile.TemporaryFile() as f:
            f.write(b"Hello World!")
            f.flush()
            # NOTE(review): f is never rewound after the write, so replace()
            # presumably reads from EOF; only grid ids and filenames are
            # asserted below, not file contents - confirm this is intended.

            # Test without default
            doc_a = GridDocument()
            doc_a.save()

            doc_b = GridDocument.objects.with_id(doc_a.id)
            doc_b.the_file.replace(f, filename="doc_b")
            doc_b.save()
            assert doc_b.the_file.grid_id is not None

            # Test it matches
            doc_c = GridDocument.objects.with_id(doc_b.id)
            assert doc_b.the_file.grid_id == doc_c.the_file.grid_id

            # Test with default
            doc_d = GridDocument(the_file=b"")
            doc_d.save()

            doc_e = GridDocument.objects.with_id(doc_d.id)
            assert doc_d.the_file.grid_id == doc_e.the_file.grid_id

            doc_e.the_file.replace(f, filename="doc_e")
            doc_e.save()

            doc_f = GridDocument.objects.with_id(doc_e.id)
            assert doc_e.the_file.grid_id == doc_f.the_file.grid_id

        db = GridDocument._get_db()
        grid_fs = gridfs.GridFS(db)
        assert ["doc_b", "doc_e"] == grid_fs.list()

    def test_file_uniqueness(self):
        """Ensure that each instance of a FileField is unique."""

        class TestFile(Document):
            name = StringField()
            the_file = FileField()

        # First instance
        test_file = TestFile()
        test_file.name = "Hello, World!"
        test_file.the_file.put(b"Hello, World!")
        test_file.save()

        # Second instance
        test_file_dupe = TestFile()
        data = test_file_dupe.the_file.read()  # Should be None

        assert test_file.name != test_file_dupe.name
        assert test_file.the_file.read() != data

        TestFile.drop_collection()

    def test_file_saving(self):
        """Ensure you can add meta data to file"""

        class Animal(Document):
            genus = StringField()
            family = StringField()
            photo = FileField()

        Animal.drop_collection()
        marmot = Animal(genus="Marmota", family="Sciuridae")

        marmot_photo_content = get_file(TEST_IMAGE_PATH)  # Retrieve a photo from disk
        marmot.photo.put(marmot_photo_content, content_type="image/jpeg", foo="bar")
        marmot.photo.close()
        marmot.save()

        marmot = Animal.objects.get()
        assert marmot.photo.content_type == "image/jpeg"
        assert marmot.photo.foo == "bar"

    def test_file_reassigning(self):
        """Ensure assigning a new file to a saved document replaces the old one."""

        class TestFile(Document):
            the_file = FileField()

        TestFile.drop_collection()

        test_file = TestFile(the_file=get_file(TEST_IMAGE_PATH)).save()
        assert test_file.the_file.get().length == 8313

        test_file = TestFile.objects.first()
        test_file.the_file = get_file(TEST_IMAGE2_PATH)
        test_file.save()
        assert test_file.the_file.get().length == 4971

    def test_file_boolean(self):
        """Ensure that a boolean test of a FileField indicates its presence."""

        class TestFile(Document):
            the_file = FileField()

        TestFile.drop_collection()

        test_file = TestFile()
        assert not bool(test_file.the_file)
        test_file.the_file.put(b"Hello, World!", content_type="text/plain")
        test_file.save()
        assert bool(test_file.the_file)

        test_file = TestFile.objects.first()
        assert test_file.the_file.content_type == "text/plain"

    def test_file_cmp(self):
        """Test comparing against other types"""

        class TestFile(Document):
            the_file = FileField()

        test_file = TestFile()
        assert test_file.the_file not in [{"test": 1}]

    def test_file_disk_space(self):
        """Test disk space usage when we delete/replace a file"""

        class TestFile(Document):
            the_file = FileField()

        text = b"Hello, World!"
        content_type = "text/plain"

        testfile = TestFile()
        testfile.the_file.put(text, content_type=content_type, filename="hello")
        testfile.save()

        db = TestFile._get_db()

        def assert_gridfs_counts(expected):
            """Assert fs.files and fs.chunks each hold ``expected`` documents."""
            assert len(list(db.fs.files.find())) == expected
            assert len(list(db.fs.chunks.find())) == expected

        # Now check fs.files and fs.chunks
        assert_gridfs_counts(1)

        # Deleting the document should delete the files
        testfile.delete()
        assert_gridfs_counts(0)

        # Test case where we don't store a file in the first place
        testfile = TestFile()
        testfile.save()
        assert_gridfs_counts(0)

        testfile.delete()
        assert_gridfs_counts(0)

        # Test case where we overwrite the file
        testfile = TestFile()
        testfile.the_file.put(text, content_type=content_type, filename="hello")
        testfile.save()

        new_text = b"Bonjour, World!"
        testfile.the_file.replace(new_text, content_type=content_type, filename="hello")
        testfile.save()

        # Still a single stored file after the replace.
        assert_gridfs_counts(1)

        testfile.delete()
        assert_gridfs_counts(0)

    @require_pil
    def test_image_field(self):
        """Ensure ImageField rejects non-image data and exposes format and size."""

        class TestImage(Document):
            image = ImageField()

        TestImage.drop_collection()

        with tempfile.TemporaryFile() as f:
            f.write(b"Hello World!")
            f.flush()

            t = TestImage()
            # The temporary file holds plain text, so put() must be rejected.
            with pytest.raises(ValidationError) as exc_info:
                t.image.put(f)
            assert (
                "%s" % exc_info.value
                == "Invalid image: cannot identify image file %s" % f
            )

        t = TestImage()
        t.image.put(get_file(TEST_IMAGE_PATH))
        t.save()

        t = TestImage.objects.first()

        assert t.image.format == "PNG"

        w, h = t.image.size
        assert w == 371
        assert h == 76

        t.image.delete()

    @require_pil
    def test_image_field_reassigning(self):
        """Ensure assigning a new image to a saved document replaces the old one."""

        class TestFile(Document):
            the_file = ImageField()

        TestFile.drop_collection()

        test_file = TestFile(the_file=get_file(TEST_IMAGE_PATH)).save()
        assert test_file.the_file.size == (371, 76)

        test_file = TestFile.objects.first()
        test_file.the_file = get_file(TEST_IMAGE2_PATH)
        test_file.save()
        assert test_file.the_file.size == (45, 101)

    @require_pil
    def test_image_field_resize(self):
        """Ensure an ImageField declared with ``size`` stores the resized image."""

        class TestImage(Document):
            image = ImageField(size=(185, 37, True))

        TestImage.drop_collection()

        t = TestImage()
        t.image.put(get_file(TEST_IMAGE_PATH))
        t.save()

        t = TestImage.objects.first()

        assert t.image.format == "PNG"
        w, h = t.image.size

        assert w == 185
        assert h == 37

        t.image.delete()

    @require_pil
    def test_image_field_resize_force(self):
        """Ensure an ImageField declared with ``size`` stores the resized image.

        NOTE(review): this body is identical to ``test_image_field_resize``
        (both pass ``size=(185, 37, True)``); one of the two was presumably
        meant to use a different ``size`` tuple - confirm.
        """

        class TestImage(Document):
            image = ImageField(size=(185, 37, True))

        TestImage.drop_collection()

        t = TestImage()
        t.image.put(get_file(TEST_IMAGE_PATH))
        t.save()

        t = TestImage.objects.first()

        assert t.image.format == "PNG"
        w, h = t.image.size

        assert w == 185
        assert h == 37

        t.image.delete()

    @require_pil
    def test_image_field_thumbnail(self):
        """Ensure an ImageField declared with ``thumbnail_size`` has a thumbnail."""

        class TestImage(Document):
            image = ImageField(thumbnail_size=(92, 18, True))

        TestImage.drop_collection()

        t = TestImage()
        t.image.put(get_file(TEST_IMAGE_PATH))
        t.save()

        t = TestImage.objects.first()

        assert t.image.thumbnail.format == "PNG"
        assert t.image.thumbnail.width == 92
        assert t.image.thumbnail.height == 18

        t.image.delete()

    def test_file_multidb(self):
        """Ensure a FileField can use its own db alias and collection name."""
        register_connection("test_files", "test_files")

        class TestFile(Document):
            name = StringField()
            the_file = FileField(db_alias="test_files", collection_name="macumba")

        TestFile.drop_collection()

        # delete old filesystem
        get_db("test_files").macumba.files.drop()
        get_db("test_files").macumba.chunks.drop()

        # First instance
        test_file = TestFile()
        test_file.name = "Hello, World!"
        test_file.the_file.put(b"Hello, World!", name="hello.txt")
        test_file.save()

        data = get_db("test_files").macumba.files.find_one()
        assert data.get("name") == "hello.txt"

        test_file = TestFile.objects.first()
        assert test_file.the_file.read() == b"Hello, World!"

        test_file = TestFile.objects.first()
        test_file.the_file = b"Hello, World!"
        test_file.save()

        test_file = TestFile.objects.first()
        assert test_file.the_file.read() == b"Hello, World!"

    def test_copyable(self):
        """Ensure a document holding a file equals its shallow and deep copies."""

        class PutFile(Document):
            the_file = FileField()

        PutFile.drop_collection()

        text = b"Hello, World!"
        content_type = "text/plain"

        putfile = PutFile()
        putfile.the_file.put(text, content_type=content_type)
        putfile.save()

        # NOTE(review): TestFile is never referenced below. It is kept because
        # declaring a Document subclass may have side effects - confirm before
        # removing.
        class TestFile(Document):
            name = StringField()

        assert putfile == copy.copy(putfile)
        assert putfile == copy.deepcopy(putfile)

    @require_pil
    def test_get_image_by_grid_id(self):
        """Ensure a document can be queried by the grid id of either image."""

        class TestImage(Document):
            image1 = ImageField()
            image2 = ImageField()

        TestImage.drop_collection()

        t = TestImage()
        t.image1.put(get_file(TEST_IMAGE_PATH))
        t.image2.put(get_file(TEST_IMAGE2_PATH))
        t.save()

        test = TestImage.objects.first()
        grid_id = test.image1.grid_id

        # Combine with ``|``: Python's ``or`` evaluates to just one of its
        # operands, so the other Q object would never reach the query.
        assert 1 == TestImage.objects(Q(image1=grid_id) | Q(image2=grid_id)).count()

    def test_complex_field_filefield(self):
        """Ensure a file with meta data can be stored in a ListField(FileField())."""

        class Animal(Document):
            genus = StringField()
            family = StringField()
            photos = ListField(FileField())

        Animal.drop_collection()
        marmot = Animal(genus="Marmota", family="Sciuridae")

        with open(TEST_IMAGE_PATH, "rb") as marmot_photo:  # Retrieve a photo from disk
            photos_field = marmot._fields["photos"].field
            new_proxy = photos_field.get_proxy_obj("photos", marmot)
            new_proxy.put(marmot_photo, content_type="image/jpeg", foo="bar")

        marmot.photos.append(new_proxy)
        marmot.save()

        marmot = Animal.objects.get()
        assert marmot.photos[0].content_type == "image/jpeg"
        assert marmot.photos[0].foo == "bar"
        assert marmot.photos[0].get().length == 8313
|
|
|
|
|
|
# Run this module's tests with unittest when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
|