Increase OAS description

Parce docstring of http handlers to increase OAS
Remove the not expected definitions key in the OAS
This commit is contained in:
Vincent Maillol
2020-12-20 11:05:24 +01:00
parent 25fcac18ec
commit 070d7e7259
12 changed files with 470 additions and 116 deletions

View File

@@ -1,10 +1,28 @@
import argparse
import importlib
import json
from typing import Dict, Protocol, Optional, Callable
import sys
from .view import generate_oas
class YamlModule(Protocol):
"""
Yaml Module type hint
"""
def dump(self, data) -> str:
pass
yaml: Optional[YamlModule]
try:
import yaml
except ImportError:
yaml = None
def application_type(value):
"""
Return aiohttp application defined in the value.
@@ -26,6 +44,35 @@ def application_type(value):
raise argparse.ArgumentTypeError(error) from error
def base_oas_file_type(value) -> Dict:
"""
Load base oas file
"""
try:
with open(value) as oas_file:
data = oas_file.read()
except OSError as error:
raise argparse.ArgumentTypeError(error) from error
return json.loads(data)
def format_type(value) -> Callable:
"""
Date Dumper one of (json, yaml)
"""
dumpers = {"json": lambda data: json.dumps(data, sort_keys=True, indent=4)}
if yaml is not None:
dumpers["yaml"] = yaml.dump
try:
return dumpers[value]
except KeyError:
raise argparse.ArgumentTypeError(
f"Wrong format value. (allowed values: {tuple(dumpers.keys())})"
) from None
def setup(parser: argparse.ArgumentParser):
parser.add_argument(
"apps",
@@ -35,11 +82,52 @@ def setup(parser: argparse.ArgumentParser):
help="The name of the module containing the asyncio.web.Application."
" By default the variable named 'app' is loaded but you can define"
" an other variable name ending the name of module with : characters"
" and the name of variable. Example: my_package.my_module:my_app",
" and the name of variable. Example: my_package.my_module:my_app"
" If your asyncio.web.Application is returned by a function, you can"
" use the syntax: my_package.my_module:my_app()",
)
parser.add_argument(
"-b",
"--base-oas-file",
metavar="FILE",
dest="base",
type=base_oas_file_type,
help="A file that will be used as base to generate OAS",
default={},
)
parser.add_argument(
"-o",
"--output",
metavar="FILE",
type=argparse.FileType("w"),
help="File to write the output",
default=sys.stdout,
)
if yaml:
help_output_format = (
"The output format, can be 'json' or 'yaml' (default is json)"
)
else:
help_output_format = "The output format, only 'json' is available install pyyaml to have yaml output format"
parser.add_argument(
"-f",
"--format",
metavar="FORMAT",
dest="formatter",
type=format_type,
help=help_output_format,
default=format_type("json"),
)
parser.set_defaults(func=show_oas)
def show_oas(args: argparse.Namespace):
print(json.dumps(generate_oas(args.apps), sort_keys=True, indent=4))
"""
Display Open API Specification on the stdout.
"""
spec = args.base
spec.update(generate_oas(args.apps))
print(args.formatter(spec), file=args.output)

View File

@@ -0,0 +1,120 @@
"""
Utility to extract extra OAS description from docstring.
"""
import re
import textwrap
from typing import Dict
class LinesIterator:
def __init__(self, lines: str):
self._lines = lines.splitlines()
self._i = -1
def next_line(self) -> str:
if self._i == len(self._lines) - 1:
raise StopIteration from None
self._i += 1
return self._lines[self._i]
def rewind(self) -> str:
if self._i == -1:
raise StopIteration from None
self._i -= 1
return self._lines[self._i]
def __iter__(self):
return self
def __next__(self):
return self.next_line()
def _i_extract_block(lines: LinesIterator):
"""
Iter the line within an indented block and dedent them.
"""
# Go to the first not empty or not white space line.
try:
line = next(lines)
except StopIteration:
return # No block to extract.
while line.strip() == "":
try:
line = next(lines)
except StopIteration:
return
# Get the size of the indentation.
if (match := re.search("^ +", line)) is None:
return # No block to extract.
indent = match.group()
yield line[len(indent) :]
# Yield lines until the indentation is the same or is greater than
# the first block line.
try:
line = next(lines)
except StopIteration:
return
while (is_empty := line.strip() == "") or line.startswith(indent):
yield "" if is_empty else line[len(indent) :]
try:
line = next(lines)
except StopIteration:
return
lines.rewind()
def _dedent_under_first_line(text: str) -> str:
"""
Apply textwrap.dedent ignoring the first line.
"""
lines = text.splitlines()
other_lines = "\n".join(lines[1:])
if other_lines:
return f"{lines[0]}\n{textwrap.dedent(other_lines)}"
return text
def status_code(docstring: str) -> Dict[int, str]:
"""
Extract the "Status Code:" block of the docstring.
"""
iterator = LinesIterator(docstring)
for line in iterator:
if re.fullmatch("status\\s+codes?\\s*:", line, re.IGNORECASE):
blocks = []
lines = []
for line_of_block in _i_extract_block(iterator):
if re.search("^\\d{3}\\s*:", line_of_block):
if lines:
blocks.append("\n".join(lines))
lines = []
lines.append(line_of_block)
if lines:
blocks.append("\n".join(lines))
return {
int(status.strip()): _dedent_under_first_line(desc.strip())
for status, desc in (block.split(":", 1) for block in blocks)
}
return {}
def operation(docstring: str) -> str:
"""
Extract all docstring except the "Status Code:" block.
"""
lines = LinesIterator(docstring)
ret = []
for line in lines:
if re.fullmatch("status\\s+codes?\\s*:", line, re.IGNORECASE):
for _ in _i_extract_block(lines):
pass
else:
ret.append(line)
return ("\n".join(ret)).strip()

View File

@@ -133,6 +133,7 @@ class Parameters:
class Response:
def __init__(self, spec: dict):
self._spec = spec
self._spec.setdefault("description", "")
@property
def description(self) -> str:

View File

@@ -8,6 +8,7 @@ from aiohttp.web_app import Application
from pydantic import BaseModel
from aiohttp_pydantic.oas.struct import OpenApiSpec3, OperationObject, PathItem
from . import docstring_parser
from ..injectors import _parse_func_signature
from ..utils import is_pydantic_base_model
@@ -18,7 +19,7 @@ from .typing import is_status_code_type
def _handle_optional(type_):
"""
Returns the type wrapped in Optional or None.
>>> from typing import Optional
>>> _handle_optional(int)
>>> _handle_optional(Optional[str])
<class 'str'>
@@ -36,14 +37,15 @@ class _OASResponseBuilder:
generate the OAS operation response.
"""
def __init__(self, oas: OpenApiSpec3, oas_operation):
def __init__(self, oas: OpenApiSpec3, oas_operation, status_code_descriptions):
self._oas_operation = oas_operation
self._oas = oas
self._status_code_descriptions = status_code_descriptions
def _handle_pydantic_base_model(self, obj):
if is_pydantic_base_model(obj):
response_schema = obj.schema(ref_template="#/components/schemas/{model}")
if def_sub_schemas := response_schema.get("definitions", None):
if def_sub_schemas := response_schema.pop("definitions", None):
self._oas.components.schemas.update(def_sub_schemas)
return response_schema
return {}
@@ -64,10 +66,16 @@ class _OASResponseBuilder:
"schema": self._handle_list(typing.get_args(obj)[0])
}
}
desc = self._status_code_descriptions.get(int(status_code))
if desc:
self._oas_operation.responses[status_code].description = desc
elif is_status_code_type(obj):
status_code = obj.__name__[1:]
self._oas_operation.responses[status_code].content = {}
desc = self._status_code_descriptions.get(int(status_code))
if desc:
self._oas_operation.responses[status_code].description = desc
def _handle_union(self, obj):
if typing.get_origin(obj) is typing.Union:
@@ -90,13 +98,16 @@ def _add_http_method_to_oas(
)
description = getdoc(handler)
if description:
oas_operation.description = description
oas_operation.description = docstring_parser.operation(description)
status_code_descriptions = docstring_parser.status_code(description)
else:
status_code_descriptions = {}
if body_args:
body_schema = next(iter(body_args.values())).schema(
ref_template="#/components/schemas/{model}"
)
if def_sub_schemas := body_schema.get("definitions", None):
if def_sub_schemas := body_schema.pop("definitions", None):
oas.components.schemas.update(def_sub_schemas)
oas_operation.request_body.content = {
@@ -127,7 +138,9 @@ def _add_http_method_to_oas(
return_type = handler.__annotations__.get("return")
if return_type is not None:
_OASResponseBuilder(oas, oas_operation).build(return_type)
_OASResponseBuilder(oas, oas_operation, status_code_descriptions).build(
return_type
)
def generate_oas(apps: List[Application]) -> dict:

View File

@@ -9,8 +9,14 @@ from aiohttp.web_exceptions import HTTPMethodNotAllowed
from aiohttp.web_response import StreamResponse
from pydantic import ValidationError
from .injectors import (AbstractInjector, BodyGetter, HeadersGetter,
MatchInfoGetter, QueryGetter, _parse_func_signature)
from .injectors import (
AbstractInjector,
BodyGetter,
HeadersGetter,
MatchInfoGetter,
QueryGetter,
_parse_func_signature,
)
class PydanticView(AbstractView):