add fk and drop fk

This commit is contained in:
long2ice 2021-02-02 20:35:05 +08:00
parent f443dc68db
commit c60bdd290e
9 changed files with 876 additions and 111 deletions

View File

@ -61,12 +61,12 @@ class BaseDDL:
def _get_default(self, model: "Type[Model]", field_describe: dict): def _get_default(self, model: "Type[Model]", field_describe: dict):
db_table = model._meta.db_table db_table = model._meta.db_table
default = field_describe.get('default') default = field_describe.get("default")
db_column = field_describe.get('db_column') db_column = field_describe.get("db_column")
auto_now_add = field_describe.get("auto_now_add", False) auto_now_add = field_describe.get("auto_now_add", False)
auto_now = field_describe.get("auto_now", False) auto_now = field_describe.get("auto_now", False)
if default is not None or auto_now_add: if default is not None or auto_now_add:
if field_describe.get('field_type')in ['UUIDField', 'TextField', 'JSONField']: if field_describe.get("field_type") in ["UUIDField", "TextField", "JSONField"]:
default = "" default = ""
else: else:
try: try:
@ -109,7 +109,7 @@ class BaseDDL:
def drop_column(self, model: "Type[Model]", field_describe: dict): def drop_column(self, model: "Type[Model]", field_describe: dict):
return self._DROP_COLUMN_TEMPLATE.format( return self._DROP_COLUMN_TEMPLATE.format(
table_name=model._meta.db_table, column_name=field_describe.get('db_column') table_name=model._meta.db_table, column_name=field_describe.get("db_column")
) )
def modify_column(self, model: "Type[Model]", field_object: Field): def modify_column(self, model: "Type[Model]", field_object: Field):
@ -168,34 +168,35 @@ class BaseDDL:
table_name=model._meta.db_table, table_name=model._meta.db_table,
) )
def add_fk(self, model: "Type[Model]", field_describe: dict, field_describe_target: dict): def add_fk(self, model: "Type[Model]", field_describe: dict, reference_table_describe: dict):
db_table = model._meta.db_table db_table = model._meta.db_table
db_column = field_describe.get("raw_field") db_column = field_describe.get("raw_field")
reference_id = reference_table_describe.get("pk_field").get("db_column")
fk_name = self.schema_generator._generate_fk_name( fk_name = self.schema_generator._generate_fk_name(
from_table=db_table, from_table=db_table,
from_field=db_column, from_field=db_column,
to_table=field_describe.get('name'), to_table=reference_table_describe.get("table"),
to_field=db_column, to_field=reference_table_describe.get("pk_field").get("db_column"),
) )
return self._ADD_FK_TEMPLATE.format( return self._ADD_FK_TEMPLATE.format(
table_name=db_table, table_name=db_table,
fk_name=fk_name, fk_name=fk_name,
db_column=db_column, db_column=db_column,
table=field_describe.get('name'), table=field_describe.get("name"),
field=db_column, field=reference_id,
on_delete=field_describe.get('on_delete'), on_delete=field_describe.get("on_delete"),
) )
def drop_fk(self, model: "Type[Model]", field_describe: dict, field_describe_target: dict): def drop_fk(self, model: "Type[Model]", field_describe: dict, reference_table_describe: dict):
db_table = model._meta.db_table db_table = model._meta.db_table
return self._DROP_FK_TEMPLATE.format( return self._DROP_FK_TEMPLATE.format(
table_name=db_table, table_name=db_table,
fk_name=self.schema_generator._generate_fk_name( fk_name=self.schema_generator._generate_fk_name(
from_table=db_table, from_table=db_table,
from_field=field_describe.get('raw_field'), from_field=field_describe.get("raw_field"),
to_table=field_describe.get('name'), to_table=reference_table_describe.get("table"),
to_field=field_describe_target.get('db_column'), to_field=reference_table_describe.get("pk_field").get("db_column"),
), ),
) )

View File

@ -177,7 +177,86 @@ class Migrate:
if new_model_str not in old_models.keys(): if new_model_str not in old_models.keys():
cls._add_operator(cls.add_model(cls._get_model(new_model_str)), upgrade) cls._add_operator(cls.add_model(cls._get_model(new_model_str)), upgrade)
else: else:
cls.diff_model(old_models.get(new_model_str), new_model_describe, upgrade) old_model_describe = old_models.get(new_model_str)
old_unique_together = old_model_describe.get("unique_together")
new_unique_together = new_model_describe.get("unique_together")
old_data_fields = old_model_describe.get("data_fields")
new_data_fields = new_model_describe.get("data_fields")
old_data_fields_name = list(map(lambda x: x.get("name"), old_data_fields))
new_data_fields_name = list(map(lambda x: x.get("name"), new_data_fields))
model = cls._get_model(new_model_describe.get("name").split(".")[1])
# add fields
for new_data_field_name in set(new_data_fields_name).difference(
set(old_data_fields_name)
):
cls._add_operator(
cls._add_field(
model,
next(
filter(
lambda x: x.get("name") == new_data_field_name, new_data_fields
)
),
),
upgrade,
)
# remove fields
for old_data_field_name in set(old_data_fields_name).difference(
set(new_data_fields_name)
):
cls._add_operator(
cls._remove_field(
model,
next(
filter(
lambda x: x.get("name") == old_data_field_name, old_data_fields
)
),
),
upgrade,
)
old_fk_fields = old_model_describe.get("fk_fields")
new_fk_fields = new_model_describe.get("fk_fields")
old_fk_fields_name = list(map(lambda x: x.get("name"), old_fk_fields))
new_fk_fields_name = list(map(lambda x: x.get("name"), new_fk_fields))
# add fk
for new_fk_field_name in set(new_fk_fields_name).difference(
set(old_fk_fields_name)
):
fk_field = next(
filter(lambda x: x.get("name") == new_fk_field_name, new_fk_fields)
)
cls._add_operator(
cls._add_fk(model, fk_field, old_models.get(fk_field.get("python_type"))),
upgrade,
)
# drop fk
for old_fk_field_name in set(old_fk_fields_name).difference(
set(new_fk_fields_name)
):
old_fk_field = next(
filter(lambda x: x.get("name") == old_fk_field_name, old_fk_fields)
)
cls._add_operator(
cls._drop_fk(
model, old_fk_field, old_models.get(old_fk_field.get("python_type"))
),
upgrade,
)
# change fields
for field_name in set(new_data_fields_name).intersection(set(old_data_fields_name)):
old_data_field = next(
filter(lambda x: x.get("name") == field_name, old_data_fields)
)
new_data_field = next(
filter(lambda x: x.get("name") == field_name, new_data_fields)
)
for old_model in old_models: for old_model in old_models:
if old_model not in new_models.keys(): if old_model not in new_models.keys():
@ -195,59 +274,6 @@ class Migrate:
def remove_model(cls, model: Type[Model]): def remove_model(cls, model: Type[Model]):
return cls.ddl.drop_table(model) return cls.ddl.drop_table(model)
@classmethod
def diff_model(cls, old_model_describe: dict, new_model_describe: dict, upgrade=True):
"""
diff single model
:param old_model_describe:
:param new_model_describe:
:param upgrade:
:return:
"""
old_unique_together = old_model_describe.get('unique_together')
new_unique_together = new_model_describe.get('unique_together')
old_data_fields = old_model_describe.get('data_fields')
new_data_fields = new_model_describe.get('data_fields')
old_data_fields_name = list(map(lambda x: x.get('name'), old_data_fields))
new_data_fields_name = list(map(lambda x: x.get('name'), new_data_fields))
model = cls._get_model(new_model_describe.get('name').split('.')[1])
# add fields
for new_data_field_name in set(new_data_fields_name).difference(set(old_data_fields_name)):
cls._add_operator(
cls._add_field(model, next(filter(lambda x: x.get('name') == new_data_field_name, new_data_fields))),
upgrade)
# remove fields
for old_data_field_name in set(old_data_fields_name).difference(set(new_data_fields_name)):
cls._add_operator(
cls._remove_field(model, next(filter(lambda x: x.get('name') == old_data_field_name, old_data_fields))),
upgrade)
old_fk_fields = old_model_describe.get('fk_fields')
new_fk_fields = new_model_describe.get('fk_fields')
old_fk_fields_name = list(map(lambda x: x.get('name'), old_fk_fields))
new_fk_fields_name = list(map(lambda x: x.get('name'), new_fk_fields))
# add fk
for new_fk_field_name in set(new_fk_fields_name).difference(set(old_fk_fields_name)):
fk_field = next(filter(lambda x: x.get('name') == new_fk_field_name, new_fk_fields))
cls._add_operator(
cls._add_fk(model, fk_field,
next(filter(lambda x: x.get('db_column') == fk_field.get('raw_field'), new_data_fields))),
upgrade)
# drop fk
for old_fk_field_name in set(old_fk_fields_name).difference(set(new_fk_fields_name)):
old_fk_field = next(filter(lambda x: x.get('name') == old_fk_field_name, old_fk_fields))
cls._add_operator(
cls._drop_fk(
model, old_fk_field,
next(filter(lambda x: x.get('db_column') == old_fk_field.get('raw_field'), old_data_fields))),
upgrade)
@classmethod @classmethod
def _resolve_fk_fields_name(cls, model: Type[Model], fields_name: Tuple[str]): def _resolve_fk_fields_name(cls, model: Type[Model], fields_name: Tuple[str]):
ret = [] ret = []
@ -312,8 +338,8 @@ class Migrate:
return cls.ddl.modify_column(model, field) return cls.ddl.modify_column(model, field)
@classmethod @classmethod
def _drop_fk(cls, model: Type[Model], field_describe: dict, field_describe_target: dict): def _drop_fk(cls, model: Type[Model], field_describe: dict, reference_table_describe: dict):
return cls.ddl.drop_fk(model, field_describe, field_describe_target) return cls.ddl.drop_fk(model, field_describe, reference_table_describe)
@classmethod @classmethod
def _remove_field(cls, model: Type[Model], field_describe: dict): def _remove_field(cls, model: Type[Model], field_describe: dict):
@ -333,14 +359,14 @@ class Migrate:
) )
@classmethod @classmethod
def _add_fk(cls, model: Type[Model], field_describe: dict, field_describe_target: dict): def _add_fk(cls, model: Type[Model], field_describe: dict, reference_table_describe: dict):
""" """
add fk add fk
:param model: :param model:
:param field: :param field:
:return: :return:
""" """
return cls.ddl.add_fk(model, field_describe, field_describe_target) return cls.ddl.add_fk(model, field_describe, reference_table_describe)
@classmethod @classmethod
def _merge_operators(cls): def _merge_operators(cls):

32
poetry.lock generated
View File

@ -495,23 +495,30 @@ python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*"
[[package]] [[package]]
name = "tortoise-orm" name = "tortoise-orm"
version = "0.16.20" version = "0.16.21"
description = "Easy async ORM for python, built with relations in mind" description = "Easy async ORM for python, built with relations in mind"
category = "main" category = "main"
optional = false optional = false
python-versions = ">=3.7,<4.0" python-versions = "^3.7"
develop = false
[package.dependencies] [package.dependencies]
aiosqlite = ">=0.16.0,<0.17.0" aiosqlite = "^0.16.0"
iso8601 = ">=0.1.13,<0.2.0" iso8601 = "^0.1.13"
pypika = ">=0.44.0,<0.45.0" pypika = "^0.44.0"
pytz = ">=2020.4,<2021.0" pytz = "^2020.4"
[package.extras] [package.extras]
docs = ["pygments", "cloud-sptheme", "docutils", "sphinx"] accel = ["ciso8601 (>=2.1.2,<3.0.0)", "uvloop (>=0.14.0,<0.15.0)", "python-rapidjson"]
aiomysql = ["aiomysql"]
asyncpg = ["asyncpg"] asyncpg = ["asyncpg"]
accel = ["ciso8601 (>=2.1.2,<3.0.0)", "python-rapidjson", "uvloop (>=0.14.0,<0.15.0)"] aiomysql = ["aiomysql"]
docs = ["sphinx", "cloud-sptheme", "pygments", "docutils"]
[package.source]
type = "git"
url = "https://github.com/tortoise/tortoise-orm.git"
reference = "develop"
resolved_reference = "37bb36ef3a715b03d18c30452764b348eac21c21"
[[package]] [[package]]
name = "typed-ast" name = "typed-ast"
@ -547,7 +554,7 @@ dbdrivers = ["aiomysql", "asyncpg"]
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = "^3.7" python-versions = "^3.7"
content-hash = "9adf7beba99d615c71a9148391386c9016cbafc7c11c5fc3ad81c8ec61026236" content-hash = "57603697a31bfe9829bf3706b607c62edb7a3c1b18f45db6752e2d2261f0db41"
[metadata.files] [metadata.files]
aiomysql = [ aiomysql = [
@ -828,10 +835,7 @@ toml = [
{file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"}, {file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"},
{file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"}, {file = "toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f"},
] ]
tortoise-orm = [ tortoise-orm = []
{file = "tortoise-orm-0.16.20.tar.gz", hash = "sha256:a501eab941fcf2cea5bf58d2ac6cb3420457c307106985c06aa6282f1ac86687"},
{file = "tortoise_orm-0.16.20-py3-none-any.whl", hash = "sha256:de8802b2d03a3b6802e684d4afef9e86ae5ffed3b83ad096ef22723a066b54bb"},
]
typed-ast = [ typed-ast = [
{file = "typed_ast-1.4.2-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:7703620125e4fb79b64aa52427ec192822e9f45d37d4b6625ab37ef403e1df70"}, {file = "typed_ast-1.4.2-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:7703620125e4fb79b64aa52427ec192822e9f45d37d4b6625ab37ef403e1df70"},
{file = "typed_ast-1.4.2-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:c9aadc4924d4b5799112837b226160428524a9a45f830e0d0f184b19e4090487"}, {file = "typed_ast-1.4.2-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:c9aadc4924d4b5799112837b226160428524a9a45f830e0d0f184b19e4090487"},

View File

@ -16,7 +16,7 @@ include = ["CHANGELOG.md", "LICENSE", "README.md"]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.7" python = "^3.7"
tortoise-orm = "^0.16.21" tortoise-orm = { git = "https://github.com/tortoise/tortoise-orm.git", branch = "develop" }
click = "*" click = "*"
pydantic = "*" pydantic = "*"
aiomysql = { version = "*", optional = true } aiomysql = { version = "*", optional = true }

View File

@ -27,13 +27,13 @@ class User(Model):
last_login = fields.DatetimeField(description="Last Login", default=datetime.datetime.now) last_login = fields.DatetimeField(description="Last Login", default=datetime.datetime.now)
is_active = fields.BooleanField(default=True, description="Is Active") is_active = fields.BooleanField(default=True, description="Is Active")
is_superuser = fields.BooleanField(default=False, description="Is SuperUser") is_superuser = fields.BooleanField(default=False, description="Is SuperUser")
avatar = fields.CharField(max_length=200, default="")
intro = fields.TextField(default="") intro = fields.TextField(default="")
class Email(Model): class Email(Model):
email = fields.CharField(max_length=200) email = fields.CharField(max_length=200, index=True)
is_primary = fields.BooleanField(default=False) is_primary = fields.BooleanField(default=False)
address = fields.CharField(max_length=200)
class Category(Model): class Category(Model):
@ -60,3 +60,4 @@ class Config(Model):
key = fields.CharField(max_length=20) key = fields.CharField(max_length=20)
value = fields.JSONField() value = fields.JSONField()
status: Status = fields.IntEnumField(Status, default=Status.on) status: Status = fields.IntEnumField(Status, default=Status.on)
user = fields.ForeignKeyField("models.User", description="User")

View File

@ -24,7 +24,7 @@ class Status(IntEnum):
class User(Model): class User(Model):
username = fields.CharField(max_length=20) username = fields.CharField(max_length=20)
password = fields.CharField(max_length=200) password = fields.CharField(max_length=200)
last_login_at = fields.DatetimeField(description="Last Login", default=datetime.datetime.now) last_login = fields.DatetimeField(description="Last Login", default=datetime.datetime.now)
is_active = fields.BooleanField(default=True, description="Is Active") is_active = fields.BooleanField(default=True, description="Is Active")
is_superuser = fields.BooleanField(default=False, description="Is SuperUser") is_superuser = fields.BooleanField(default=False, description="Is SuperUser")
avatar = fields.CharField(max_length=200, default="") avatar = fields.CharField(max_length=200, default="")
@ -34,17 +34,18 @@ class User(Model):
class Email(Model): class Email(Model):
email = fields.CharField(max_length=200) email = fields.CharField(max_length=200)
is_primary = fields.BooleanField(default=False) is_primary = fields.BooleanField(default=False)
user = fields.ForeignKeyField("diff_models.User", db_constraint=True) user = fields.ForeignKeyField("models.User", db_constraint=False)
class Category(Model): class Category(Model):
slug = fields.CharField(max_length=200) slug = fields.CharField(max_length=200)
user = fields.ForeignKeyField("diff_models.User", description="User") name = fields.CharField(max_length=200)
user = fields.ForeignKeyField("models.User", description="User")
created_at = fields.DatetimeField(auto_now_add=True) created_at = fields.DatetimeField(auto_now_add=True)
class Product(Model): class Product(Model):
categories = fields.ManyToManyField("diff_models.Category") categories = fields.ManyToManyField("models.Category")
name = fields.CharField(max_length=50) name = fields.CharField(max_length=50)
view_num = fields.IntField(description="View Num") view_num = fields.IntField(description="View Num")
sort = fields.IntField() sort = fields.IntField()

View File

@ -180,7 +180,9 @@ def test_drop_index():
def test_add_fk(): def test_add_fk():
ret = Migrate.ddl.add_fk(Category, Category._meta.fields_map.get("user").describe(False)) ret = Migrate.ddl.add_fk(
Category, Category._meta.fields_map.get("user").describe(False), User.describe(False)
)
if isinstance(Migrate.ddl, MysqlDDL): if isinstance(Migrate.ddl, MysqlDDL):
assert ( assert (
ret ret
@ -194,7 +196,9 @@ def test_add_fk():
def test_drop_fk(): def test_drop_fk():
ret = Migrate.ddl.drop_fk(Category, Category._meta.fields_map.get("user")) ret = Migrate.ddl.drop_fk(
Category, Category._meta.fields_map.get("user").describe(False), User.describe(False)
)
if isinstance(Migrate.ddl, MysqlDDL): if isinstance(Migrate.ddl, MysqlDDL):
assert ret == "ALTER TABLE `category` DROP FOREIGN KEY `fk_category_user_e2e3874c`" assert ret == "ALTER TABLE `category` DROP FOREIGN KEY `fk_category_user_e2e3874c`"
elif isinstance(Migrate.ddl, PostgresDDL): elif isinstance(Migrate.ddl, PostgresDDL):

View File

@ -1,25 +1,753 @@
import pytest import pytest
from pytest_mock import MockerFixture
from tortoise import Tortoise
from aerich.ddl.mysql import MysqlDDL from aerich.ddl.mysql import MysqlDDL
from aerich.ddl.postgres import PostgresDDL from aerich.ddl.postgres import PostgresDDL
from aerich.ddl.sqlite import SqliteDDL from aerich.ddl.sqlite import SqliteDDL
from aerich.exceptions import NotSupportError from aerich.exceptions import NotSupportError
from aerich.migrate import Migrate from aerich.migrate import Migrate
from aerich.utils import get_models_describe
old_models_describe = {
"models.Category": {
"name": "models.Category",
"app": "models",
"table": "category",
"abstract": False,
"description": None,
"docstring": None,
"unique_together": [],
"pk_field": {
"name": "id",
"field_type": "IntField",
"db_column": "id",
"python_type": "int",
"generated": True,
"Noneable": False,
"unique": True,
"indexed": True,
"default": None,
"description": None,
"docstring": None,
"constraints": {"ge": 1, "le": 2147483647},
"db_field_types": {"": "INT"},
},
"data_fields": [
{
"name": "slug",
"field_type": "CharField",
"db_column": "slug",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 200},
"db_field_types": {"": "VARCHAR(200)"},
},
{
"name": "name",
"field_type": "CharField",
"db_column": "name",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 200},
"db_field_types": {"": "VARCHAR(200)"},
},
{
"name": "created_at",
"field_type": "DatetimeField",
"db_column": "created_at",
"python_type": "datetime.datetime",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"readOnly": True},
"db_field_types": {
"": "TIMESTAMP",
"mysql": "DATETIME(6)",
"postgres": "TIMESTAMPTZ",
},
},
{
"name": "user_id",
"field_type": "IntField",
"db_column": "user_id",
"python_type": "int",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": "User",
"docstring": None,
"constraints": {"ge": 1, "le": 2147483647},
"db_field_types": {"": "INT"},
},
],
"fk_fields": [
{
"name": "user",
"field_type": "ForeignKeyFieldInstance",
"python_type": "models.User",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": "User",
"docstring": None,
"constraints": {},
"raw_field": "user_id",
"on_delete": "CASCADE",
}
],
"backward_fk_fields": [],
"o2o_fields": [],
"backward_o2o_fields": [],
"m2m_fields": [
{
"name": "products",
"field_type": "ManyToManyFieldInstance",
"python_type": "models.Product",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {},
}
],
},
"models.Config": {
"name": "models.Config",
"app": "models",
"table": "config",
"abstract": False,
"description": None,
"docstring": None,
"unique_together": [],
"pk_field": {
"name": "id",
"field_type": "IntField",
"db_column": "id",
"python_type": "int",
"generated": True,
"Noneable": False,
"unique": True,
"indexed": True,
"default": None,
"description": None,
"docstring": None,
"constraints": {"ge": 1, "le": 2147483647},
"db_field_types": {"": "INT"},
},
"data_fields": [
{
"name": "label",
"field_type": "CharField",
"db_column": "label",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 200},
"db_field_types": {"": "VARCHAR(200)"},
},
{
"name": "key",
"field_type": "CharField",
"db_column": "key",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 20},
"db_field_types": {"": "VARCHAR(20)"},
},
{
"name": "value",
"field_type": "JSONField",
"db_column": "value",
"python_type": "Union[dict, list]",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {},
"db_field_types": {"": "TEXT", "postgres": "JSONB"},
},
{
"name": "status",
"field_type": "IntEnumFieldInstance",
"db_column": "status",
"python_type": "int",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": 1,
"description": "on: 1\noff: 0",
"docstring": None,
"constraints": {"ge": -32768, "le": 32767},
"db_field_types": {"": "SMALLINT"},
},
],
"fk_fields": [],
"backward_fk_fields": [],
"o2o_fields": [],
"backward_o2o_fields": [],
"m2m_fields": [],
},
"models.Email": {
"name": "models.Email",
"app": "models",
"table": "email",
"abstract": False,
"description": None,
"docstring": None,
"unique_together": [],
"pk_field": {
"name": "id",
"field_type": "IntField",
"db_column": "id",
"python_type": "int",
"generated": True,
"Noneable": False,
"unique": True,
"indexed": True,
"default": None,
"description": None,
"docstring": None,
"constraints": {"ge": 1, "le": 2147483647},
"db_field_types": {"": "INT"},
},
"data_fields": [
{
"name": "email",
"field_type": "CharField",
"db_column": "email",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 200},
"db_field_types": {"": "VARCHAR(200)"},
},
{
"name": "is_primary",
"field_type": "BooleanField",
"db_column": "is_primary",
"python_type": "bool",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": False,
"description": None,
"docstring": None,
"constraints": {},
"db_field_types": {"": "BOOL", "sqlite": "INT"},
},
{
"name": "user_id",
"field_type": "IntField",
"db_column": "user_id",
"python_type": "int",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"ge": 1, "le": 2147483647},
"db_field_types": {"": "INT"},
},
],
"fk_fields": [
{
"name": "user",
"field_type": "ForeignKeyFieldInstance",
"python_type": "models.User",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {},
"raw_field": "user_id",
"on_delete": "CASCADE",
}
],
"backward_fk_fields": [],
"o2o_fields": [],
"backward_o2o_fields": [],
"m2m_fields": [],
},
"models.Product": {
"name": "models.Product",
"app": "models",
"table": "product",
"abstract": False,
"description": None,
"docstring": None,
"unique_together": [],
"pk_field": {
"name": "id",
"field_type": "IntField",
"db_column": "id",
"python_type": "int",
"generated": True,
"Noneable": False,
"unique": True,
"indexed": True,
"default": None,
"description": None,
"docstring": None,
"constraints": {"ge": 1, "le": 2147483647},
"db_field_types": {"": "INT"},
},
"data_fields": [
{
"name": "name",
"field_type": "CharField",
"db_column": "name",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 50},
"db_field_types": {"": "VARCHAR(50)"},
},
{
"name": "view_num",
"field_type": "IntField",
"db_column": "view_num",
"python_type": "int",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": "View Num",
"docstring": None,
"constraints": {"ge": -2147483648, "le": 2147483647},
"db_field_types": {"": "INT"},
},
{
"name": "sort",
"field_type": "IntField",
"db_column": "sort",
"python_type": "int",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"ge": -2147483648, "le": 2147483647},
"db_field_types": {"": "INT"},
},
{
"name": "is_reviewed",
"field_type": "BooleanField",
"db_column": "is_reviewed",
"python_type": "bool",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": "Is Reviewed",
"docstring": None,
"constraints": {},
"db_field_types": {"": "BOOL", "sqlite": "INT"},
},
{
"name": "type",
"field_type": "IntEnumFieldInstance",
"db_column": "type",
"python_type": "int",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": "Product Type",
"docstring": None,
"constraints": {"ge": -32768, "le": 32767},
"db_field_types": {"": "SMALLINT"},
},
{
"name": "image",
"field_type": "CharField",
"db_column": "image",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 200},
"db_field_types": {"": "VARCHAR(200)"},
},
{
"name": "body",
"field_type": "TextField",
"db_column": "body",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {},
"db_field_types": {"": "TEXT", "mysql": "LONGTEXT"},
},
{
"name": "created_at",
"field_type": "DatetimeField",
"db_column": "created_at",
"python_type": "datetime.datetime",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"readOnly": True},
"db_field_types": {
"": "TIMESTAMP",
"mysql": "DATETIME(6)",
"postgres": "TIMESTAMPTZ",
},
},
],
"fk_fields": [],
"backward_fk_fields": [],
"o2o_fields": [],
"backward_o2o_fields": [],
"m2m_fields": [
{
"name": "categories",
"field_type": "ManyToManyFieldInstance",
"python_type": "models.Category",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {},
}
],
},
"models.User": {
"name": "models.User",
"app": "models",
"table": "user",
"abstract": False,
"description": None,
"docstring": None,
"unique_together": [],
"pk_field": {
"name": "id",
"field_type": "IntField",
"db_column": "id",
"python_type": "int",
"generated": True,
"Noneable": False,
"unique": True,
"indexed": True,
"default": None,
"description": None,
"docstring": None,
"constraints": {"ge": 1, "le": 2147483647},
"db_field_types": {"": "INT"},
},
"data_fields": [
{
"name": "username",
"field_type": "CharField",
"db_column": "username",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": True,
"indexed": True,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 20},
"db_field_types": {"": "VARCHAR(20)"},
},
{
"name": "password",
"field_type": "CharField",
"db_column": "password",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 200},
"db_field_types": {"": "VARCHAR(200)"},
},
{
"name": "last_login",
"field_type": "DatetimeField",
"db_column": "last_login",
"python_type": "datetime.datetime",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": "<function None.now>",
"description": "Last Login",
"docstring": None,
"constraints": {},
"db_field_types": {
"": "TIMESTAMP",
"mysql": "DATETIME(6)",
"postgres": "TIMESTAMPTZ",
},
},
{
"name": "is_active",
"field_type": "BooleanField",
"db_column": "is_active",
"python_type": "bool",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": True,
"description": "Is Active",
"docstring": None,
"constraints": {},
"db_field_types": {"": "BOOL", "sqlite": "INT"},
},
{
"name": "is_superuser",
"field_type": "BooleanField",
"db_column": "is_superuser",
"python_type": "bool",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": False,
"description": "Is SuperUser",
"docstring": None,
"constraints": {},
"db_field_types": {"": "BOOL", "sqlite": "INT"},
},
{
"name": "avatar",
"field_type": "CharField",
"db_column": "avatar",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": "",
"description": None,
"docstring": None,
"constraints": {"max_length": 200},
"db_field_types": {"": "VARCHAR(200)"},
},
{
"name": "intro",
"field_type": "TextField",
"db_column": "intro",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": "",
"description": None,
"docstring": None,
"constraints": {},
"db_field_types": {"": "TEXT", "mysql": "LONGTEXT"},
},
],
"fk_fields": [],
"backward_fk_fields": [
{
"name": "categorys",
"field_type": "BackwardFKRelation",
"python_type": "models.Category",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": "User",
"docstring": None,
"constraints": {},
},
{
"name": "emails",
"field_type": "BackwardFKRelation",
"python_type": "models.Email",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {},
},
],
"o2o_fields": [],
"backward_o2o_fields": [],
"m2m_fields": [],
},
"models.Aerich": {
"name": "models.Aerich",
"app": "models",
"table": "aerich",
"abstract": False,
"description": None,
"docstring": None,
"unique_together": [],
"pk_field": {
"name": "id",
"field_type": "IntField",
"db_column": "id",
"python_type": "int",
"generated": True,
"Noneable": False,
"unique": True,
"indexed": True,
"default": None,
"description": None,
"docstring": None,
"constraints": {"ge": 1, "le": 2147483647},
"db_field_types": {"": "INT"},
},
"data_fields": [
{
"name": "version",
"field_type": "CharField",
"db_column": "version",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 255},
"db_field_types": {"": "VARCHAR(255)"},
},
{
"name": "app",
"field_type": "CharField",
"db_column": "app",
"python_type": "str",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {"max_length": 20},
"db_field_types": {"": "VARCHAR(20)"},
},
{
"name": "content",
"field_type": "JSONField",
"db_column": "content",
"python_type": "Union[dict, list]",
"generated": False,
"Noneable": False,
"unique": False,
"indexed": False,
"default": None,
"description": None,
"docstring": None,
"constraints": {},
"db_field_types": {"": "TEXT", "postgres": "JSONB"},
},
],
"fk_fields": [],
"backward_fk_fields": [],
"o2o_fields": [],
"backward_o2o_fields": [],
"m2m_fields": [],
},
}
def test_migrate(mocker: MockerFixture): def test_migrate():
mocker.patch("click.prompt", return_value=True) """
apps = Tortoise.apps models.py diff with old_models.py
models = apps.get("models") - add field: Email.address
diff_models = apps.get("diff_models") - add fk: Config.user
Migrate.diff_models(diff_models, models) - drop fk: Email.user
- drop field: User.avatar
- add index: Email.email
- remove unique: User.username
"""
models_describe = get_models_describe("models")
Migrate.diff_models(old_models_describe, models_describe)
if isinstance(Migrate.ddl, SqliteDDL): if isinstance(Migrate.ddl, SqliteDDL):
with pytest.raises(NotSupportError): with pytest.raises(NotSupportError):
Migrate.diff_models(models, diff_models, False) Migrate.diff_models(models_describe, old_models_describe, False)
else: else:
Migrate.diff_models(models, diff_models, False) Migrate.diff_models(models_describe, old_models_describe, False)
Migrate._merge_operators() Migrate._merge_operators()
if isinstance(Migrate.ddl, MysqlDDL): if isinstance(Migrate.ddl, MysqlDDL):
assert Migrate.upgrade_operators == [ assert Migrate.upgrade_operators == [