Added tests for custom db-backend

This commit is contained in:
Swen Kooij 2017-02-02 14:36:22 +02:00
parent 5ba9be4aeb
commit 1ef02616ec
5 changed files with 162 additions and 42 deletions

View File

@ -117,11 +117,8 @@ class SchemaEditor(_get_schema_editor_base()):
) )
self.execute(sql) self.execute(sql)
def _update_hstore_constraints(self, model, old_field, new_field): def _apply_hstore_constraints(self, method, model, field):
"""Updates the UNIQUE constraints for the specified field.""" """Creates/drops UNIQUE constraints for a field."""
old_uniqueness = getattr(old_field, 'uniqueness', None)
new_uniqueness = getattr(new_field, 'uniqueness', None)
def _compose_keys(constraint): def _compose_keys(constraint):
if isinstance(constraint, str): if isinstance(constraint, str):
@ -129,22 +126,37 @@ class SchemaEditor(_get_schema_editor_base()):
return constraint return constraint
uniqueness = getattr(field, 'uniqueness', None)
if not uniqueness:
return
for keys in uniqueness:
method(
model,
field,
_compose_keys(keys)
)
def _update_hstore_constraints(self, model, old_field, new_field):
"""Updates the UNIQUE constraints for the specified field."""
old_uniqueness = getattr(old_field, 'uniqueness', None)
new_uniqueness = getattr(new_field, 'uniqueness', None)
# drop any old uniqueness constraints # drop any old uniqueness constraints
if old_uniqueness: if old_uniqueness:
for keys in old_uniqueness: self._apply_hstore_constraints(
self._drop_hstore_unique( self._drop_hstore_unique,
model, model,
old_field, old_field
_compose_keys(keys)
) )
# (re-)create uniqueness constraints # (re-)create uniqueness constraints
if new_uniqueness: if new_uniqueness:
for keys in new_uniqueness: self._apply_hstore_constraints(
self._create_hstore_unique( self._create_hstore_unique,
model, model,
old_field, new_field
_compose_keys(keys)
) )
def _alter_field(self, model, old_field, new_field, *args, **kwargs): def _alter_field(self, model, old_field, new_field, *args, **kwargs):
@ -170,7 +182,26 @@ class SchemaEditor(_get_schema_editor_base()):
if not isinstance(field, LocalizedField): if not isinstance(field, LocalizedField):
continue continue
self._update_hstore_constraints(model, field, field) self._apply_hstore_constraints(
self._create_hstore_unique,
model,
field
)
def delete_model(self, model):
"""Ran when a model is being deleted."""
super().delete_model(model)
for field in model._meta.local_fields:
if not isinstance(field, LocalizedField):
continue
self._apply_hstore_constraints(
self._drop_hstore_unique,
model,
field
)
class DatabaseWrapper(_get_backend_base()): class DatabaseWrapper(_get_backend_base()):

View File

@ -1,6 +1,5 @@
from django.db import models from django.db import models, transaction
from django.db.utils import IntegrityError from django.db.utils import IntegrityError
from django.db import transaction
from .fields import LocalizedField from .fields import LocalizedField
from .localized_value import LocalizedValue from .localized_value import LocalizedValue
@ -41,7 +40,6 @@ class LocalizedModel(models.Model):
if not hasattr(self, 'retries'): if not hasattr(self, 'retries'):
self.retries = 0 self.retries = 0
error = None
with transaction.atomic(): with transaction.atomic():
try: try:
return super(LocalizedModel, self).save(*args, **kwargs) return super(LocalizedModel, self).save(*args, **kwargs)
@ -52,12 +50,8 @@ class LocalizedModel(models.Model):
# that apply to slug fields... so yea.. this is as # that apply to slug fields... so yea.. this is as
# retarded as it gets... i am sorry :( # retarded as it gets... i am sorry :(
if 'slug' not in str(ex): if 'slug' not in str(ex):
raise ex
error = ex
if self.retries >= 100: if self.retries >= 100:
raise error raise ex
self.retries += 1 self.retries += 1
return self.save() return self.save()

View File

@ -7,7 +7,7 @@ with open(os.path.join(os.path.dirname(__file__), 'README.rst')) as readme:
setup( setup(
name='django-localized-fields', name='django-localized-fields',
version='2.3', version='2.4',
packages=find_packages(), packages=find_packages(),
include_package_data=True, include_package_data=True,
license='MIT License', license='MIT License',

View File

@ -1,38 +1,46 @@
from django.db import connection, migrations from django.db import connection, migrations
from localized_fields import LocalizedModel
from django.db.migrations.executor import MigrationExecutor from django.db.migrations.executor import MigrationExecutor
from django.contrib.postgres.operations import HStoreExtension from django.contrib.postgres.operations import HStoreExtension
from localized_fields import LocalizedModel
def get_fake_model(name='TestModel', fields={}):
"""Creates a fake model to use during unit tests."""
def define_fake_model(name='TestModel', fields=None):
attributes = { attributes = {
'app_label': 'localized_fields', 'app_label': 'localized_fields',
'__module__': __name__, '__module__': __name__,
'__name__': name '__name__': name
} }
if fields:
attributes.update(fields) attributes.update(fields)
TestModel = type(name, (LocalizedModel,), attributes) model = type(name, (LocalizedModel,), attributes)
return model
def get_fake_model(name='TestModel', fields=None):
"""Creates a fake model to use during unit tests."""
model = define_fake_model(name, fields)
class TestProject: class TestProject:
def clone(self, *args, **kwargs): def clone(self, *_args, **_kwargs):
return self
@property
def apps(self):
return self return self
class TestMigration(migrations.Migration): class TestMigration(migrations.Migration):
operations = [ operations = [HStoreExtension()]
HStoreExtension()
]
with connection.schema_editor() as schema_editor: with connection.schema_editor() as schema_editor:
migration_executor = MigrationExecutor(schema_editor.connection) migration_executor = MigrationExecutor(schema_editor.connection)
migration_executor.apply_migration( migration_executor.apply_migration(
TestProject(), TestProject(), TestMigration('eh', 'localized_fields'))
TestMigration('eh', 'localized_fields')
)
schema_editor.create_model(TestModel) schema_editor.create_model(model)
return TestModel return model

87
tests/test_db_backend.py Normal file
View File

@ -0,0 +1,87 @@
from unittest import mock
from django.db import connection
from django.conf import settings
from django.test import TestCase
from django.db.backends.base.schema import BaseDatabaseSchemaEditor
from localized_fields import LocalizedField, get_language_codes
from .fake_model import define_fake_model
from django.apps import apps
class DBBackendTestCase(TestCase):
"""Tests the custom database back-end."""
@staticmethod
def test_hstore_extension_enabled():
"""Tests whether the `hstore` extension was
enabled automatically."""
with connection.cursor() as cursor:
cursor.execute((
'SELECT count(*) FROM pg_extension '
'WHERE extname = \'hstore\''
))
assert cursor.fetchone()[0] == 1
@classmethod
def test_migration_create_drop_model(cls):
"""Tests whether models containing a :see:LocalizedField
with a `uniqueness` constraint get created properly,
with the contraints in the database."""
model = define_fake_model('NewModel', {
'title': LocalizedField(uniqueness=get_language_codes())
})
# create the model in db and verify the indexes are being created
with mock.patch.object(BaseDatabaseSchemaEditor, 'execute') as execute:
with connection.schema_editor() as schema_editor:
schema_editor.create_model(model)
create_index_calls = [
call for call in execute.mock_calls if 'CREATE UNIQUE INDEX' in str(call)
]
assert len(create_index_calls) == len(settings.LANGUAGES)
# delete the model in the db and verify the indexes are being deleted
with mock.patch.object(BaseDatabaseSchemaEditor, 'execute') as execute:
with connection.schema_editor() as schema_editor:
schema_editor.delete_model(model)
drop_index_calls = [
call for call in execute.mock_calls if 'DROP INDEX' in str(call)
]
assert len(drop_index_calls) == len(settings.LANGUAGES)
@classmethod
def test_migration_alter_field(cls):
"""Tests whether the back-end correctly removes and
adds `uniqueness` constraints when altering a :see:LocalizedField."""
define_fake_model('ExistingModel', {
'title': LocalizedField(uniqueness=get_language_codes())
})
app_config = apps.get_app_config('tests')
with mock.patch.object(BaseDatabaseSchemaEditor, 'execute') as execute:
with connection.schema_editor() as schema_editor:
dynmodel = app_config.get_model('ExistingModel')
schema_editor.alter_field(
dynmodel,
dynmodel._meta.fields[1],
dynmodel._meta.fields[1]
)
index_calls = [
call for call in execute.mock_calls
if 'INDEX' in str(call) and 'title' in str(call)
]
assert len(index_calls) == len(settings.LANGUAGES) * 2