python-betterproto/betterproto/tests/test_service_stub.py
Nat Noordanus c762c9c549 Add test for generated service stub
- Create one simple test for generated Service stubs in preparation
for making more changes in this area.
- Add dev dependency on pytest-asyncio in order to use ChannelFor
from grpclib.testing more easily.
- Create a new example proto containing a minimal rpc example.
2020-04-12 19:37:39 +02:00

36 lines
1.1 KiB
Python

import betterproto
import grpclib
from grpclib.testing import ChannelFor
import pytest
from typing import Dict
from .service import DoThingResponse, DoThingRequest, ExampleServiceStub
class ExampleService:
async def DoThing(self, stream: 'grpclib.server.Stream[DoThingRequest, DoThingResponse]'):
request = await stream.recv_message()
for iteration in range(request.iterations):
pass
await stream.send_message(DoThingResponse(request.iterations))
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
return {
'/service.ExampleService/DoThing': grpclib.const.Handler(
self.DoThing,
grpclib.const.Cardinality.UNARY_UNARY,
DoThingRequest,
DoThingResponse,
),
}
@pytest.mark.asyncio
async def test_simple_service_call():
ITERATIONS = 42
async with ChannelFor([ExampleService()]) as channel:
stub = ExampleServiceStub(channel)
response = await stub.do_thing(iterations=ITERATIONS)
assert response.successful_iterations == ITERATIONS