add more test
This commit is contained in:
@@ -1,6 +0,0 @@
|
||||
TORTOISE_ORM = {
|
||||
"connections": {"default": "mysql://root:123456@127.0.0.1:3306/test"},
|
||||
"apps": {
|
||||
"models": {"models": ["tests.models", "aerich.models"], "default_connection": "default"}
|
||||
},
|
||||
}
|
||||
|
||||
56
tests/diff_models.py
Normal file
56
tests/diff_models.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import datetime
|
||||
from enum import IntEnum
|
||||
|
||||
from tortoise import Model, fields
|
||||
|
||||
|
||||
class ProductType(IntEnum):
|
||||
article = 1
|
||||
page = 2
|
||||
|
||||
|
||||
class PermissionAction(IntEnum):
|
||||
create = 1
|
||||
delete = 2
|
||||
update = 3
|
||||
read = 4
|
||||
|
||||
|
||||
class Status(IntEnum):
|
||||
on = 1
|
||||
off = 0
|
||||
|
||||
|
||||
class User(Model):
|
||||
username = fields.CharField(max_length=20,)
|
||||
password = fields.CharField(max_length=200)
|
||||
last_login = fields.DatetimeField(description="Last Login", default=datetime.datetime.now)
|
||||
is_active = fields.BooleanField(default=True, description="Is Active")
|
||||
is_superuser = fields.BooleanField(default=False, description="Is SuperUser")
|
||||
avatar = fields.CharField(max_length=200, default="")
|
||||
intro = fields.TextField(default="")
|
||||
|
||||
|
||||
class Category(Model):
|
||||
slug = fields.CharField(max_length=200)
|
||||
user = fields.ForeignKeyField("diff_models.User", description="User")
|
||||
created_at = fields.DatetimeField(auto_now_add=True)
|
||||
|
||||
|
||||
class Product(Model):
|
||||
categories = fields.ManyToManyField("diff_models.Category")
|
||||
name = fields.CharField(max_length=50)
|
||||
view_num = fields.IntField(description="View Num")
|
||||
sort = fields.IntField()
|
||||
is_reviewed = fields.BooleanField(description="Is Reviewed")
|
||||
type = fields.IntEnumField(ProductType, description="Product Type")
|
||||
image = fields.CharField(max_length=200)
|
||||
body = fields.TextField()
|
||||
created_at = fields.DatetimeField(auto_now_add=True)
|
||||
|
||||
|
||||
class Config(Model):
|
||||
label = fields.CharField(max_length=200)
|
||||
key = fields.CharField(max_length=20)
|
||||
value = fields.JSONField()
|
||||
status: Status = fields.IntEnumField(Status, default=Status.on)
|
||||
@@ -1,114 +1,101 @@
|
||||
from tortoise import Tortoise
|
||||
from tortoise.backends.asyncpg.schema_generator import AsyncpgSchemaGenerator
|
||||
from tortoise.backends.mysql.schema_generator import MySQLSchemaGenerator
|
||||
from tortoise.backends.sqlite.schema_generator import SqliteSchemaGenerator
|
||||
from tortoise.contrib import test
|
||||
|
||||
from aerich.ddl.mysql import MysqlDDL
|
||||
from aerich.ddl.postgres import PostgresDDL
|
||||
from aerich.ddl.sqlite import SqliteDDL
|
||||
from aerich.migrate import Migrate
|
||||
from tests.models import Category
|
||||
|
||||
|
||||
class TestDDL(test.TruncationTestCase):
|
||||
maxDiff = None
|
||||
|
||||
def setUp(self) -> None:
|
||||
client = Tortoise.get_connection("models")
|
||||
if client.schema_generator is MySQLSchemaGenerator:
|
||||
self.ddl = MysqlDDL(client)
|
||||
elif client.schema_generator is SqliteSchemaGenerator:
|
||||
self.ddl = SqliteDDL(client)
|
||||
elif client.schema_generator is AsyncpgSchemaGenerator:
|
||||
self.ddl = PostgresDDL(client)
|
||||
|
||||
def test_create_table(self):
|
||||
ret = self.ddl.create_table(Category)
|
||||
if isinstance(self.ddl, MysqlDDL):
|
||||
self.assertEqual(
|
||||
ret,
|
||||
"""CREATE TABLE IF NOT EXISTS `category` (
|
||||
def test_create_table():
|
||||
ret = Migrate.ddl.create_table(Category)
|
||||
if isinstance(Migrate.ddl, MysqlDDL):
|
||||
assert (
|
||||
ret
|
||||
== """CREATE TABLE IF NOT EXISTS `category` (
|
||||
`id` INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
|
||||
`slug` VARCHAR(200) NOT NULL,
|
||||
`name` VARCHAR(200) NOT NULL,
|
||||
`created_at` DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
|
||||
`user_id` INT NOT NULL COMMENT 'User',
|
||||
CONSTRAINT `fk_category_user_e2e3874c` FOREIGN KEY (`user_id`) REFERENCES `user` (`id`) ON DELETE CASCADE
|
||||
) CHARACTER SET utf8mb4;""",
|
||||
)
|
||||
elif isinstance(self.ddl, SqliteDDL):
|
||||
self.assertEqual(
|
||||
ret,
|
||||
"""CREATE TABLE IF NOT EXISTS "category" (
|
||||
) CHARACTER SET utf8mb4;"""
|
||||
)
|
||||
|
||||
elif isinstance(Migrate.ddl, SqliteDDL):
|
||||
assert (
|
||||
ret
|
||||
== """CREATE TABLE IF NOT EXISTS "category" (
|
||||
"id" INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
|
||||
"slug" VARCHAR(200) NOT NULL,
|
||||
"name" VARCHAR(200) NOT NULL,
|
||||
"created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"user_id" INT NOT NULL REFERENCES "user" ("id") ON DELETE CASCADE /* User */
|
||||
);""",
|
||||
)
|
||||
elif isinstance(self.ddl, PostgresDDL):
|
||||
self.assertEqual(
|
||||
ret,
|
||||
"""CREATE TABLE IF NOT EXISTS "category" (
|
||||
);"""
|
||||
)
|
||||
|
||||
elif isinstance(Migrate.ddl, PostgresDDL):
|
||||
assert (
|
||||
ret
|
||||
== """CREATE TABLE IF NOT EXISTS "category" (
|
||||
"id" SERIAL NOT NULL PRIMARY KEY,
|
||||
"slug" VARCHAR(200) NOT NULL,
|
||||
"name" VARCHAR(200) NOT NULL,
|
||||
"created_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"user_id" INT NOT NULL REFERENCES "user" ("id") ON DELETE CASCADE
|
||||
);
|
||||
COMMENT ON COLUMN "category"."user_id" IS 'User';""",
|
||||
)
|
||||
|
||||
def test_drop_table(self):
|
||||
ret = self.ddl.drop_table(Category)
|
||||
self.assertEqual(ret, "DROP TABLE IF EXISTS category")
|
||||
|
||||
def test_add_column(self):
|
||||
ret = self.ddl.add_column(Category, Category._meta.fields_map.get("name"))
|
||||
if isinstance(self.ddl, MysqlDDL):
|
||||
self.assertEqual(ret, "ALTER TABLE category ADD `name` VARCHAR(200) NOT NULL")
|
||||
elif isinstance(self.ddl, PostgresDDL):
|
||||
self.assertEqual(ret, 'ALTER TABLE category ADD "name" VARCHAR(200) NOT NULL')
|
||||
elif isinstance(self.ddl, SqliteDDL):
|
||||
self.assertEqual(ret, 'ALTER TABLE category ADD "name" VARCHAR(200) NOT NULL')
|
||||
|
||||
def test_drop_column(self):
|
||||
ret = self.ddl.drop_column(Category, "name")
|
||||
self.assertEqual(ret, "ALTER TABLE category DROP COLUMN name")
|
||||
self.assertEqual(ret, "ALTER TABLE category DROP COLUMN name")
|
||||
|
||||
def test_add_index(self):
|
||||
index = self.ddl.add_index(Category, ["name"])
|
||||
index_u = self.ddl.add_index(Category, ["name"], True)
|
||||
if isinstance(self.ddl, MysqlDDL):
|
||||
self.assertEqual(
|
||||
index, "ALTER TABLE category ADD INDEX idx_category_name_8b0cb9 (`name`)"
|
||||
)
|
||||
self.assertEqual(
|
||||
index_u, "ALTER TABLE category ADD UNIQUE INDEX uid_category_name_8b0cb9 (`name`)"
|
||||
)
|
||||
elif isinstance(self.ddl, SqliteDDL):
|
||||
self.assertEqual(
|
||||
index_u, 'ALTER TABLE category ADD UNIQUE INDEX uid_category_name_8b0cb9 ("name")'
|
||||
)
|
||||
self.assertEqual(
|
||||
index_u, 'ALTER TABLE category ADD UNIQUE INDEX uid_category_name_8b0cb9 ("name")'
|
||||
)
|
||||
|
||||
def test_drop_index(self):
|
||||
ret = self.ddl.drop_index(Category, ["name"])
|
||||
self.assertEqual(ret, "ALTER TABLE category DROP INDEX idx_category_name_8b0cb9")
|
||||
ret = self.ddl.drop_index(Category, ["name"], True)
|
||||
self.assertEqual(ret, "ALTER TABLE category DROP INDEX uid_category_name_8b0cb9")
|
||||
|
||||
def test_add_fk(self):
|
||||
ret = self.ddl.add_fk(Category, Category._meta.fields_map.get("user"))
|
||||
self.assertEqual(
|
||||
ret,
|
||||
"ALTER TABLE category ADD CONSTRAINT `fk_category_user_e2e3874c` FOREIGN KEY (`user_id`) REFERENCES `user` (`id`) ON DELETE CASCADE",
|
||||
COMMENT ON COLUMN "category"."user_id" IS 'User';"""
|
||||
)
|
||||
|
||||
def test_drop_fk(self):
|
||||
ret = self.ddl.drop_fk(Category, Category._meta.fields_map.get("user"))
|
||||
self.assertEqual(ret, "ALTER TABLE category DROP FOREIGN KEY fk_category_user_e2e3874c")
|
||||
|
||||
def test_drop_table():
|
||||
ret = Migrate.ddl.drop_table(Category)
|
||||
assert ret == "DROP TABLE IF EXISTS category"
|
||||
|
||||
|
||||
def test_add_column():
|
||||
ret = Migrate.ddl.add_column(Category, Category._meta.fields_map.get("name"))
|
||||
if isinstance(Migrate.ddl, MysqlDDL):
|
||||
assert ret == "ALTER TABLE category ADD `name` VARCHAR(200) NOT NULL"
|
||||
elif isinstance(Migrate.ddl, PostgresDDL):
|
||||
assert ret == 'ALTER TABLE category ADD "name" VARCHAR(200) NOT NULL'
|
||||
elif isinstance(Migrate.ddl, SqliteDDL):
|
||||
assert ret == 'ALTER TABLE category ADD "name" VARCHAR(200) NOT NULL'
|
||||
|
||||
|
||||
def test_drop_column():
|
||||
ret = Migrate.ddl.drop_column(Category, "name")
|
||||
assert ret == "ALTER TABLE category DROP COLUMN name"
|
||||
assert ret == "ALTER TABLE category DROP COLUMN name"
|
||||
|
||||
|
||||
def test_add_index():
|
||||
index = Migrate.ddl.add_index(Category, ["name"])
|
||||
index_u = Migrate.ddl.add_index(Category, ["name"], True)
|
||||
if isinstance(Migrate.ddl, MysqlDDL):
|
||||
assert index == "ALTER TABLE category ADD INDEX idx_category_name_8b0cb9 (`name`)"
|
||||
|
||||
assert index_u == "ALTER TABLE category ADD UNIQUE INDEX uid_category_name_8b0cb9 (`name`)"
|
||||
|
||||
elif isinstance(Migrate.ddl, SqliteDDL):
|
||||
assert index_u == 'ALTER TABLE category ADD UNIQUE INDEX uid_category_name_8b0cb9 ("name")'
|
||||
|
||||
assert index_u == 'ALTER TABLE category ADD UNIQUE INDEX uid_category_name_8b0cb9 ("name")'
|
||||
|
||||
|
||||
def test_drop_index():
|
||||
ret = Migrate.ddl.drop_index(Category, ["name"])
|
||||
assert ret == "ALTER TABLE category DROP INDEX idx_category_name_8b0cb9"
|
||||
ret = Migrate.ddl.drop_index(Category, ["name"], True)
|
||||
assert ret == "ALTER TABLE category DROP INDEX uid_category_name_8b0cb9"
|
||||
|
||||
|
||||
def test_add_fk():
|
||||
ret = Migrate.ddl.add_fk(Category, Category._meta.fields_map.get("user"))
|
||||
assert (
|
||||
ret
|
||||
== "ALTER TABLE category ADD CONSTRAINT `fk_category_user_e2e3874c` FOREIGN KEY (`user_id`) REFERENCES `user` (`id`) ON DELETE CASCADE"
|
||||
)
|
||||
|
||||
|
||||
def test_drop_fk():
|
||||
ret = Migrate.ddl.drop_fk(Category, Category._meta.fields_map.get("user"))
|
||||
assert ret == "ALTER TABLE category DROP FOREIGN KEY fk_category_user_e2e3874c"
|
||||
|
||||
41
tests/test_migrate.py
Normal file
41
tests/test_migrate.py
Normal file
@@ -0,0 +1,41 @@
|
||||
from tortoise import Tortoise
|
||||
|
||||
from aerich.ddl.mysql import MysqlDDL
|
||||
from aerich.ddl.postgres import PostgresDDL
|
||||
from aerich.ddl.sqlite import SqliteDDL
|
||||
from aerich.migrate import Migrate
|
||||
|
||||
|
||||
def test_migrate():
|
||||
apps = Tortoise.apps
|
||||
models = apps.get("models")
|
||||
diff_models = apps.get("diff_models")
|
||||
Migrate.diff_models(diff_models, models)
|
||||
Migrate.diff_models(models, diff_models, False)
|
||||
if isinstance(Migrate.ddl, MysqlDDL):
|
||||
assert Migrate.upgrade_operators == [
|
||||
"ALTER TABLE category ADD `name` VARCHAR(200) NOT NULL",
|
||||
"ALTER TABLE user ADD UNIQUE INDEX uid_user_usernam_9987ab (`username`)",
|
||||
]
|
||||
assert Migrate.downgrade_operators == [
|
||||
"ALTER TABLE category DROP COLUMN name",
|
||||
"ALTER TABLE user DROP INDEX uid_user_usernam_9987ab",
|
||||
]
|
||||
elif isinstance(Migrate.ddl, SqliteDDL):
|
||||
assert Migrate.upgrade_operators == [
|
||||
'ALTER TABLE category ADD "name" VARCHAR(200) NOT NULL',
|
||||
'ALTER TABLE user ADD UNIQUE INDEX uid_user_usernam_9987ab ("username")',
|
||||
]
|
||||
assert Migrate.downgrade_operators == [
|
||||
"ALTER TABLE category DROP COLUMN name",
|
||||
"ALTER TABLE user DROP INDEX uid_user_usernam_9987ab",
|
||||
]
|
||||
elif isinstance(Migrate.ddl, PostgresDDL):
|
||||
assert Migrate.upgrade_operators == [
|
||||
'ALTER TABLE category ADD "name" VARCHAR(200) NOT NULL',
|
||||
'ALTER TABLE user ADD UNIQUE INDEX uid_user_usernam_9987ab ("username")',
|
||||
]
|
||||
assert Migrate.downgrade_operators == [
|
||||
"ALTER TABLE category DROP COLUMN name",
|
||||
"ALTER TABLE user DROP INDEX uid_user_usernam_9987ab",
|
||||
]
|
||||
Reference in New Issue
Block a user