How to use asyncio gRPC in AioHTTP microservices
Usually I have a lot of microservices that communicate using a REST API. For communication with the frontend I still need REST, but between microservices gRPC looks more promising.
Some of its benefits:
- it is built on HTTP/2 and uses multiplexed streams, which gives better connection management
- performance benefits thanks to the Protobuf binary format
- code generation for a multilingual environment
Here is my first attempt to use gRPC in AioHTTP applications with some explanations.
Structure and code generation
You can find the code on GitHub. There are two services, service1 and service2; you can see how they communicate in the diagram.
The Protobuf files I put in a protos directory at the same level as the services; I think it will be easier to manage them this way when there are many services. The directory structure inside protos should mirror the app directory of each service, so that the imports are correct after code generation.
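The layout then looks roughly like this (service2 mirrors service1; the exact tree is in the repository):

protos/
    service1/
        app/
            grpc/
                service1.proto
    service2/
        app/
            grpc/
                service2.proto
service1/
    app/
        grpc/    # generated modules end up here
    ...
service2/
    ...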
The command for code generation looks like:
python -m grpc_tools.protoc --proto_path=../protos/service1 --python_out=. --grpc_python_out=. --mypy_out=. ../protos/service1/app/grpc/service1.proto
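Because of the --proto_path prefix, the generated service1_pb2.py, service1_pb2_grpc.py and service1_pb2.pyi modules land in app/grpc of the service. The proto itself is in the repository; a minimal sketch of it, matching the ProcessName call used below (field numbers are illustrative), would look something like:

syntax = "proto3";

service Service1 {
    rpc ProcessName (ProcessNameRequest) returns (ProcessNameResponse);
}

message ProcessNameRequest {
    string name = 1;
}

message ProcessNameResponse {
    string name = 1;
}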
I put it in a Makefile for each service; the required libraries are grpcio-tools and mypy-protobuf.
I used python:slim-bullseye as the base image in the Dockerfile for the services; with alpine it tries to build grpcio from source and it's impossibly slow.
To bypass the flake8 linter for the generated code I excluded it in setup.cfg:
[flake8]
max-line-length = 120
exclude =
    app/grpc
gRPC server
To run the gRPC server I used an aiohttp cleanup context.
To track the calls between services I add a request_id to the call metadata, and to process it transparently there is a RequestIdInterceptor.
For the servicer I keep a reference to the aiohttp app, so if I need access to the app config or to objects like the db or cache, I'll have it.
class RequestIdInterceptor(grpc.aio.ServerInterceptor):
    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        # pick the request_id out of the incoming metadata and store it in the context variable
        for (header, value) in handler_call_details.invocation_metadata:
            if header == "request_id":
                contextvars.REQUEST_ID.set(value)
                break
        return await continuation(handler_call_details)


class Service1(service1_pb2_grpc.Service1Servicer):
    def __init__(self, app: web.Application) -> None:
        # reference to the aiohttp app gives access to config, db, cache, etc.
        self.app = app

    async def ProcessName(
        self, request: service1_pb2.ProcessNameRequest, context: grpc.aio.ServicerContext
    ) -> service1_pb2.ProcessNameResponse:
        name = await services.process_name(name=request.name)
        return service1_pb2.ProcessNameResponse(name=name)


def _init(app: web.Application, listen_addr: str) -> grpc.aio.Server:
    server = grpc.aio.server(interceptors=(RequestIdInterceptor(),))
    server.add_insecure_port(listen_addr)
    service1_pb2_grpc.add_Service1Servicer_to_server(Service1(app), server)
    return server


async def _start_grpc_server(server: grpc.aio.Server) -> None:
    await server.start()
    await server.wait_for_termination()


async def grpc_server_ctx(app: web.Application) -> AsyncGenerator:
    # startup: run the gRPC server as a background task next to the aiohttp app
    listen_addr = "[::]:50051"
    server = _init(app, listen_addr)
    task = asyncio.create_task(_start_grpc_server(server))
    logger.info(f"action=init_grpc_server, address={listen_addr}")
    yield
    # cleanup: stop the server and let the background task finish
    await server.stop(grace=None)
    task.cancel()
    await task
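The context just needs to be registered on the application; in the app factory it is the usual aiohttp one-liner (a simplified sketch, the real factory is in the repository):

def create_app() -> web.Application:
    app = web.Application()
    # runs the gRPC server for the whole lifetime of the aiohttp app
    app.cleanup_ctx.append(grpc_server_ctx)
    return app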
gRPC client
I also used a cleanup context to set up the channel and stub so they can be reused, as recommended in the gRPC performance best practices.
There is also a RequestIdInterceptor on the client side to add the request_id to the call metadata.
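Its code is not shown in this post; a minimal sketch, assuming the same REQUEST_ID context variable as on the server, could look like this (the actual implementation is in the repository):

class RequestIdInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
    async def intercept_unary_unary(self, continuation, client_call_details, request):
        # copy the current request_id into the outgoing call metadata
        metadata = grpc.aio.Metadata(*(client_call_details.metadata or ()))
        metadata.add("request_id", contextvars.REQUEST_ID.get())
        # grpc.aio.ClientCallDetails is a NamedTuple, so _replace builds an updated copy
        new_details = client_call_details._replace(metadata=metadata)
        return await continuation(new_details, request)

The channel and stub setup itself looks like this: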
async def grpc_channels_ctx(app: aiohttp.web.Application) -> AsyncGenerator:
    grpc_channel_service1 = grpc.aio.insecure_channel(
        app["config"].SERVICE1_CHANNEL, interceptors=(RequestIdInterceptor(),)
    )
    app["service1_stub"] = service1_pb2_grpc.Service1Stub(grpc_channel_service1)
    yield
    await grpc_channel_service1.close()
In the end the call looks very simple, but I haven't worked on the error handling part yet. It's always important to have a timeout in calls to other services; I put it in the call itself, but maybe it's better to create an interceptor.
name_response = await app["service1_stub"].ProcessName(service1_pb2.ProcessNameRequest(name=name), timeout=10)
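If I go the interceptor route, it could be a small client interceptor that fills in a default timeout; a rough sketch (DefaultTimeoutInterceptor is a hypothetical name, not something from the example repository):

class DefaultTimeoutInterceptor(grpc.aio.UnaryUnaryClientInterceptor):
    def __init__(self, timeout: float = 10.0) -> None:
        self._timeout = timeout

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        # only apply the default when the caller did not pass a timeout explicitly
        if client_call_details.timeout is None:
            client_call_details = client_call_details._replace(timeout=self._timeout)
        return await continuation(client_call_details, request)

It would be added to the interceptors tuple when the channel is created.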
The questions
There are some questions I still haven't worked on; I'll post an update later.
- How to organize a pool of channels https://github.com/grpc/grpc/issues/21386
- How to work with errors, reconnect to the server, retry requests
- How to transparently encode/decode bson types like ObjectId
- How to set up TLS between microservices
Performance
I made some quick tests with ab -c 100 -n 1000 to compare gRPC and REST calls for a case like the one in this example.
I got 509.53 requests/sec for gRPC and 280.60 for REST, so it's faster and it uses fewer connections; I can push the concurrency parameter higher with the same ulimit.
To run the example
Just clone the repository and run docker compose up, then make a call with any HTTP client; I like to use httpie.
> http http://localhost:8081/api/v1/say-hello\?name\=test
HTTP/1.1 200 OK
Content-Length: 14
Content-Type: application/json; charset=utf-8
Date: Sun, 23 Jan 2022 10:26:42 GMT
Server: Python/3.9 aiohttp/3.8.1
X-Request-ID: c4550306-a96e-4e98-9134-ca76adeb51a3
"Hello, TEST!"
And the logs look like this:
aiohttp-grpcio-example-service1-1 | {"levelname": "INFO", "name": "app.services", "lineno": 9, "message": "action=process_name, status=success, name=TEST", "request_id": "c4550306-a96e-4e98-9134-ca76adeb51a3"}
aiohttp-grpcio-example-service2-1 | {"levelname": "INFO", "name": "app.services", "lineno": 12, "message": "action=say_hello, status=success, hello=Hello, TEST!", "request_id": "c4550306-a96e-4e98-9134-ca76adeb51a3"}
aiohttp-grpcio-example-service2-1 | {"levelname": "INFO", "name": "aiohttp.access", "lineno": 206, "message": "request: 172.18.0.1 GET /api/v1/say-hello?name=test HTTP/1.1 200 223 0.005508 200 223", "request_id": "c4550306-a96e-4e98-9134-ca76adeb51a3", "remote_address": "172.18.0.1", "first_request_line": "GET /api/v1/say-hello?name=test HTTP/1.1", "response_status": 200, "response_size": 223, "request_time_frac": "0.005508"}