How to use asyncio gRPC in AioHTTP microservices

How to use asyncio gRPC in AioHTTP microservices

Usually I have a lot of microservices and they communicate using REST API. For communication with frontend I still need REST, but between microservices gRPC looks more promising.

Some benefits of it:

  • is built on HTTP2, using multiplexed streams - better connection management
  • performance benefits because of Protobuf binary format
  • code generation for a multilingual environment

Here is my first attempt to use gRPC in AioHTTP applications with some explanations.

Structure and code generation

You can find the code on github. I have there service1 and service2, you can see how they communicate on the schema. The Protobuf files I put in protos directory on the same level. I think it would be easier to manage them when there are many services. The structure of directories in protos should be the same as in app directory in services, to have right imports after the code generation.

The command for code generation looks like:

python -m grpc_tools.protoc --proto_path=../protos/service1 --python_out=. --grpc_python_out=. --mypy_out=. ../protos/service1/app/grpc/service1.proto

I put it in Makefile for each service and required libraries are grpcio-tools, mypy-protobuf.

I used pyton:bullseye-slim as a base image in Dockerfile for services, with alpine it tries to build gprc and it’s imposibly slow.

To bypass flake8 linter for the generated code I excluded it in setup.cfg

[flake8]
max-line-length = 120
exclude =
    app/grpc

gRPC server

To run the gRPC server I used aiohttp cleanup context. To track the calls between services I add request_id to the call metadata and to process it transparently there is RequestIdInterceptor. For the servicer I keep the reference to the aiohttp app, so if I’ll need an access to the app config or objects like db or cache, I’ll have it.

class RequestIdInterceptor(grpc.aio.ServerInterceptor):
    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        for (header, value) in handler_call_details.invocation_metadata:
            if header == "request_id":
                contextvars.REQUEST_ID.set(value)
                break
        return await continuation(handler_call_details)


class Service1(service1_pb2_grpc.Service1Servicer):
    def __init__(self, app: web.Application) -> None:
        self.app = app

    async def ProcessName(
        self, request: service1_pb2.ProcessNameRequest, context: grpc.aio.ServicerContext
    ) -> service1_pb2.ProcessNameResponse:
        name = await services.process_name(name=request.name)
        return service1_pb2.ProcessNameResponse(name=name)


def _init(app: web.Application, listen_addr: str) -> grpc.aio.Server:
    server = grpc.aio.server(interceptors=(RequestIdInterceptor(),))
    server.add_insecure_port(listen_addr)
    service1_pb2_grpc.add_Service1Servicer_to_server(Service1(app), server)
    return server


async def _start_grpc_server(server: grpc.aio.Server) -> None:
    await server.start()
    await server.wait_for_termination()


async def grpc_server_ctx(app: web.Application) -> AsyncGenerator:
    listen_addr = "[::]:50051"

    server = _init(app, listen_addr)
    task = asyncio.create_task(_start_grpc_server(server))
    logger.info(f"action=init_grpc_server, address={listen_addr}")

    yield

    await server.stop(grace=None)
    task.cancel()
    await task

gRPC client

I also used cleanup context to setup channel and stub, to be able reuse them, as recommended in performance best practices. There is also a RequestIdInterceptor to add request_id to the call metadata.

async def grpc_channels_ctx(app: aiohttp.web.Application) -> AsyncGenerator:
    grpc_channel_service1 = grpc.aio.insecure_channel(
        app["config"].SERVICE1_CHANNEL, interceptors=(RequestIdInterceptor(),)
    )
    app["service1_stub"] = service1_pb2_grpc.Service1Stub(grpc_channel_service1)
    yield
    await grpc_channel_service1.close()

At the end the call looks very simple, but I didn’t work on errors part. It’s always important to have a timeout in the calls to the other services, I put it in the call, but maybe it’s better to create an interceptor.

name_response = await app["service1_stub"].ProcessName(service1_pb2.ProcessNameRequest(name=name), timeout=10)

The questions

There left some questions I still did not work on, I’ll put an update later.

  • How to organize pool of channels https://github.com/grpc/grpc/issues/21386
  • How to work with errors, reconnect to the server, retry requests
  • How to transparently encode/decode bson types like ObjectId
  • How to setup TLS between microservices

Performance

I made some quick tests with ab -c 100 -n 1000 to compare the difference between gRPC and REST calls for the case like in this example. I got 509.53 requests/sec for gRPC and 280.60 for REST, so it’s faster and it uses less connections, I can increase the concurrency parameter higher with the same ulimit.

To run the example

Just clone the repository and run docker compose up, then make a call with any http client, I like to use httpie.

> http http://localhost:8081/api/v1/say-hello\?name\=test
HTTP/1.1 200 OK
Content-Length: 14
Content-Type: application/json; charset=utf-8
Date: Sun, 23 Jan 2022 10:26:42 GMT
Server: Python/3.9 aiohttp/3.8.1
X-Request-ID: c4550306-a96e-4e98-9134-ca76adeb51a3

"Hello, TEST!"

And the logs look like

aiohttp-grpcio-example-service1-1  | {"levelname": "INFO", "name": "app.services", "lineno": 9, "message": "action=process_name, status=success, name=TEST", "request_id": "c4550306-a96e-4e98-9134-ca76adeb51a3"}
aiohttp-grpcio-example-service2-1  | {"levelname": "INFO", "name": "app.services", "lineno": 12, "message": "action=say_hello, status=success, hello=Hello, TEST!", "request_id": "c4550306-a96e-4e98-9134-ca76adeb51a3"}
aiohttp-grpcio-example-service2-1  | {"levelname": "INFO", "name": "aiohttp.access", "lineno": 206, "message": "request: 172.18.0.1 GET /api/v1/say-hello?name=test HTTP/1.1 200 223 0.005508 200 223", "request_id": "c4550306-a96e-4e98-9134-ca76adeb51a3", "remote_address": "172.18.0.1", "first_request_line": "GET /api/v1/say-hello?name=test HTTP/1.1", "response_status": 200, "response_size": 223, "request_time_frac": "0.005508"}
comments powered by Disqus