185 lines
6.0 KiB
Python
185 lines
6.0 KiB
Python
import importlib
|
|
import json
|
|
import os
|
|
import sys
|
|
from enum import Enum
|
|
|
|
import asyncclick as click
|
|
from asyncclick import BadOptionUsage, Context, UsageError
|
|
from tortoise import Tortoise, generate_schema_for_client
|
|
|
|
from alice.migrate import Migrate
|
|
from alice.utils import get_app_connection
|
|
|
|
|
|
class Color(str, Enum):
|
|
green = "green"
|
|
red = "red"
|
|
yellow = "yellow"
|
|
|
|
|
|
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
|
|
@click.option(
|
|
"--config",
|
|
default="settings",
|
|
show_default=True,
|
|
help="Tortoise-ORM config module, will auto read dict config variable from it.",
|
|
)
|
|
@click.option(
|
|
"--tortoise-orm",
|
|
default="TORTOISE_ORM",
|
|
show_default=True,
|
|
help="Tortoise-ORM config dict variable.",
|
|
)
|
|
@click.option(
|
|
"--location", default="./migrations", show_default=True, help="Migrate store location."
|
|
)
|
|
@click.option("--app", default="models", show_default=True, help="Tortoise-ORM app name.")
|
|
@click.pass_context
|
|
async def cli(ctx: Context, config, tortoise_orm, location, app):
|
|
ctx.ensure_object(dict)
|
|
try:
|
|
config_module = importlib.import_module(config, ".")
|
|
except ModuleNotFoundError:
|
|
raise BadOptionUsage(ctx=ctx, message=f'No module named "{config}"', option_name="--config")
|
|
config = getattr(config_module, tortoise_orm, None)
|
|
if not config:
|
|
raise BadOptionUsage(
|
|
option_name="--config",
|
|
message=f'Can\'t get "{tortoise_orm}" from module "{config_module}"',
|
|
ctx=ctx,
|
|
)
|
|
|
|
if app not in config.get("apps").keys():
|
|
raise BadOptionUsage(option_name="--config", message=f'No app found in "{config}"', ctx=ctx)
|
|
|
|
ctx.obj["config"] = config
|
|
ctx.obj["location"] = location
|
|
ctx.obj["app"] = app
|
|
|
|
if ctx.invoked_subcommand == "init":
|
|
await Tortoise.init(config=config)
|
|
else:
|
|
if not os.path.isdir(location):
|
|
raise UsageError("You must exec init first", ctx=ctx)
|
|
await Migrate.init_with_old_models(config, app, location)
|
|
|
|
|
|
@cli.command(help="Generate migrate changes file.")
|
|
@click.option("--name", default="update", show_default=True, help="Migrate name.")
|
|
@click.pass_context
|
|
async def migrate(ctx: Context, name):
|
|
config = ctx.obj["config"]
|
|
location = ctx.obj["location"]
|
|
app = ctx.obj["app"]
|
|
|
|
ret = Migrate.migrate(name)
|
|
if not ret:
|
|
return click.secho("No changes detected", fg=Color.yellow)
|
|
Migrate.write_old_models(config, app, location)
|
|
click.secho(f"Success migrate {ret}", fg=Color.green)
|
|
|
|
|
|
@cli.command(help="Upgrade to latest version.")
|
|
@click.pass_context
|
|
async def upgrade(ctx: Context):
|
|
app = ctx.obj["app"]
|
|
config = ctx.obj["config"]
|
|
connection = get_app_connection(config, app)
|
|
available_versions = Migrate.get_all_version_files(is_all=False)
|
|
if not available_versions:
|
|
return click.secho("No migrate items", fg=Color.yellow)
|
|
async with connection._in_transaction() as conn:
|
|
for file in available_versions:
|
|
file_path = os.path.join(Migrate.migrate_location, file)
|
|
with open(file_path, "r") as f:
|
|
content = json.load(f)
|
|
upgrade_query_list = content.get("upgrade")
|
|
for upgrade_query in upgrade_query_list:
|
|
await conn.execute_query(upgrade_query)
|
|
|
|
with open(file_path, "w") as f:
|
|
content["migrate"] = True
|
|
json.dump(content, f, indent=4)
|
|
click.secho(f"Success upgrade {file}", fg=Color.green)
|
|
|
|
|
|
@cli.command(help="Downgrade to previous version.")
|
|
@click.pass_context
|
|
async def downgrade(ctx: Context):
|
|
app = ctx.obj["app"]
|
|
config = ctx.obj["config"]
|
|
connection = get_app_connection(config, app)
|
|
available_versions = Migrate.get_all_version_files()
|
|
if not available_versions:
|
|
return click.secho("No migrate items", fg=Color.yellow)
|
|
|
|
async with connection._in_transaction() as conn:
|
|
for file in reversed(available_versions):
|
|
file_path = os.path.join(Migrate.migrate_location, file)
|
|
with open(file_path, "r") as f:
|
|
content = json.load(f)
|
|
if content.get("migrate"):
|
|
downgrade_query_list = content.get("downgrade")
|
|
for downgrade_query in downgrade_query_list:
|
|
await conn.execute_query(downgrade_query)
|
|
else:
|
|
continue
|
|
with open(file_path, "w") as f:
|
|
content["migrate"] = False
|
|
json.dump(content, f, indent=4)
|
|
return click.secho(f"Success downgrade {file}", fg=Color.green)
|
|
|
|
|
|
@cli.command(help="Show current available heads in migrate location.")
|
|
@click.pass_context
|
|
def heads(ctx: Context):
|
|
for version in Migrate.get_all_version_files(is_all=False):
|
|
click.secho(version, fg=Color.yellow)
|
|
|
|
|
|
@cli.command(help="List all migrate items.")
|
|
@click.pass_context
|
|
def history(ctx):
|
|
for version in Migrate.get_all_version_files():
|
|
click.secho(version, fg=Color.yellow)
|
|
|
|
|
|
@cli.command(
|
|
help="Init migrate location and generate schema, you must exec first."
|
|
)
|
|
@click.option(
|
|
"--safe",
|
|
is_flag=True,
|
|
default=True,
|
|
help="When set to true, creates the table only when it does not already exist.",
|
|
show_default=True,
|
|
)
|
|
@click.pass_context
|
|
async def init(ctx: Context, safe):
|
|
location = ctx.obj["location"]
|
|
app = ctx.obj["app"]
|
|
config = ctx.obj["config"]
|
|
|
|
if not os.path.isdir(location):
|
|
os.mkdir(location)
|
|
|
|
dirname = os.path.join(location, app)
|
|
if not os.path.isdir(dirname):
|
|
os.mkdir(dirname)
|
|
click.secho(f"Success create migrate location {dirname}", fg=Color.green)
|
|
else:
|
|
return click.secho(f'Already inited app "{app}"', fg=Color.yellow)
|
|
|
|
Migrate.write_old_models(config, app, location)
|
|
|
|
connection = get_app_connection(config, app)
|
|
await generate_schema_for_client(connection, safe)
|
|
|
|
return click.secho(f'Success init for app "{app}"', fg=Color.green)
|
|
|
|
|
|
def main():
|
|
sys.path.insert(0, ".")
|
|
cli(_anyio_backend="asyncio")
|