Instead of delete key 'definitions' from schema, bc schema was "cached" and if you try to load swagger twice, you got "no definitions" exception
187 lines
6.1 KiB
Python
187 lines
6.1 KiB
Python
import typing
|
|
from datetime import date, datetime
|
|
from inspect import getdoc
|
|
from itertools import count
|
|
from typing import List, Type
|
|
from uuid import UUID
|
|
|
|
from aiohttp.web import Response, json_response
|
|
from aiohttp.web_app import Application
|
|
|
|
from aiohttp_pydantic.oas.struct import OpenApiSpec3, OperationObject, PathItem
|
|
|
|
from ..injectors import _parse_func_signature
|
|
from ..utils import is_pydantic_base_model
|
|
from ..view import PydanticView, is_pydantic_view
|
|
from .typing import is_status_code_type
|
|
|
|
JSON_SCHEMA_TYPES = {
|
|
float: {"type": "number"},
|
|
str: {"type": "string"},
|
|
int: {"type": "integer"},
|
|
UUID: {"type": "string", "format": "uuid"},
|
|
bool: {"type": "boolean"},
|
|
datetime: {"type": "string", "format": "date-time"},
|
|
date: {"type": "string", "format": "date"},
|
|
}
|
|
|
|
|
|
def _handle_optional(type_):
|
|
"""
|
|
Returns the type wrapped in Optional or None.
|
|
|
|
>>> _handle_optional(int)
|
|
>>> _handle_optional(Optional[str])
|
|
<class 'str'>
|
|
"""
|
|
if typing.get_origin(type_) is typing.Union:
|
|
args = typing.get_args(type_)
|
|
if len(args) == 2 and type(None) in args:
|
|
return next(iter(set(args) - {type(None)}))
|
|
return None
|
|
|
|
|
|
class _OASResponseBuilder:
|
|
"""
|
|
Parse the type annotated as returned by a function and
|
|
generate the OAS operation response.
|
|
"""
|
|
|
|
def __init__(self, oas_operation, definitions):
|
|
self._oas_operation = oas_operation
|
|
self._definitions = definitions
|
|
|
|
def _process_definitions(self, schema):
|
|
if 'definitions' in schema:
|
|
for k, v in schema['definitions'].items():
|
|
self._definitions[k] = v
|
|
|
|
return {i:schema[i] for i in schema if i!='definitions'}
|
|
|
|
def _handle_pydantic_base_model(self, obj):
|
|
if is_pydantic_base_model(obj):
|
|
return self._process_definitions(obj.schema())
|
|
|
|
return {}
|
|
|
|
def _handle_list(self, obj):
|
|
if typing.get_origin(obj) is list:
|
|
return {
|
|
"type": "array",
|
|
"items": self._handle_pydantic_base_model(typing.get_args(obj)[0]),
|
|
}
|
|
return self._handle_pydantic_base_model(obj)
|
|
|
|
def _handle_status_code_type(self, obj):
|
|
if is_status_code_type(typing.get_origin(obj)):
|
|
status_code = typing.get_origin(obj).__name__[1:]
|
|
self._oas_operation.responses[status_code].content = {
|
|
"application/json": {
|
|
"schema": self._handle_list(typing.get_args(obj)[0])
|
|
}
|
|
}
|
|
|
|
elif is_status_code_type(obj):
|
|
status_code = obj.__name__[1:]
|
|
self._oas_operation.responses[status_code].content = {}
|
|
|
|
def _handle_union(self, obj):
|
|
if typing.get_origin(obj) is typing.Union:
|
|
for arg in typing.get_args(obj):
|
|
self._handle_status_code_type(arg)
|
|
self._handle_status_code_type(obj)
|
|
|
|
def build(self, obj):
|
|
self._handle_union(obj)
|
|
|
|
|
|
def _add_http_method_to_oas(
|
|
oas_path: PathItem, http_method: str, view: Type[PydanticView], definitions: dict
|
|
):
|
|
http_method = http_method.lower()
|
|
oas_operation: OperationObject = getattr(oas_path, http_method)
|
|
handler = getattr(view, http_method)
|
|
path_args, body_args, qs_args, header_args = _parse_func_signature(handler)
|
|
description = getdoc(handler)
|
|
if description:
|
|
oas_operation.description = description
|
|
|
|
if body_args:
|
|
oas_operation.request_body.content = {
|
|
"application/json": {"schema": next(iter(body_args.values())).schema()}
|
|
}
|
|
|
|
indexes = count()
|
|
for args_location, args in (
|
|
("path", path_args.items()),
|
|
("query", qs_args.items()),
|
|
("header", header_args.items()),
|
|
):
|
|
for name, type_ in args:
|
|
i = next(indexes)
|
|
oas_operation.parameters[i].in_ = args_location
|
|
oas_operation.parameters[i].name = name
|
|
optional_type = _handle_optional(type_)
|
|
if optional_type is None:
|
|
oas_operation.parameters[i].schema = JSON_SCHEMA_TYPES[type_]
|
|
oas_operation.parameters[i].required = True
|
|
else:
|
|
oas_operation.parameters[i].schema = JSON_SCHEMA_TYPES[optional_type]
|
|
oas_operation.parameters[i].required = False
|
|
|
|
return_type = handler.__annotations__.get("return")
|
|
if return_type is not None:
|
|
_OASResponseBuilder(oas_operation, definitions).build(return_type)
|
|
|
|
|
|
def generate_oas(apps: List[Application]) -> dict:
|
|
"""
|
|
Generate and return Open Api Specification from PydanticView in application.
|
|
"""
|
|
oas = OpenApiSpec3()
|
|
|
|
for app in apps:
|
|
for resources in app.router.resources():
|
|
for resource_route in resources:
|
|
if is_pydantic_view(resource_route.handler):
|
|
view: Type[PydanticView] = resource_route.handler
|
|
info = resource_route.get_info()
|
|
path = oas.paths[info.get("path", info.get("formatter"))]
|
|
if resource_route.method == "*":
|
|
for method_name in view.allowed_methods:
|
|
_add_http_method_to_oas(path, method_name, view, oas.definitions)
|
|
else:
|
|
_add_http_method_to_oas(path, resource_route.method, view, oas.definitions)
|
|
|
|
return oas.spec
|
|
|
|
|
|
async def get_oas(request):
|
|
"""
|
|
View to generate the Open Api Specification from PydanticView in application.
|
|
"""
|
|
apps = request.app["apps to expose"]
|
|
return json_response(generate_oas(apps))
|
|
|
|
|
|
async def oas_ui(request):
|
|
"""
|
|
View to serve the swagger-ui to read open api specification of application.
|
|
"""
|
|
template = request.app["index template"]
|
|
|
|
static_url = request.app.router["static"].url_for(filename="")
|
|
spec_url = request.app.router["spec"].url_for()
|
|
host = request.url.origin()
|
|
|
|
return Response(
|
|
text=template.render(
|
|
{
|
|
"openapi_spec_url": host.with_path(str(spec_url)),
|
|
"static_url": host.with_path(str(static_url)),
|
|
}
|
|
),
|
|
content_type="text/html",
|
|
charset="utf-8",
|
|
)
|