RabbitMQ with AioHttp example

RabbitMQ with AioHttp example

In this example I have two services:

  • auth which sends an event that user is authenticated
  • users which listen for this event and update user’s last online time.

These services use aio_pika library for async RabbitMQ connection and topic exchange for the message routing.

To setup and cleanup RabbitMQ in the app initialization process.

app.on_startup.append(events.setup_rabbitmq)
app.on_cleanup.append(events.close_rabbitmq)

Auth service events.py

import asyncio
import aio_pika
import aiormq
import logging
from datetime import datetime
from typing import Dict
from aiohttp import web
from bson import ObjectId, json_util


logger = logging.getLogger(__name__)


async def setup_rabbitmq(app: web.Application) -> None:
    """
    Open connection to RabbitMQ and declare auth exchange, 
    try to reconnect every 10 seconds if there is a problem.
    """
    config = app["config"]
    loop = asyncio.get_event_loop()
    try:
        connection: aio_pika.Connection = await aio_pika.connect_robust(config.RABBITMQ_URI, loop=loop)
    except (ConnectionError, aiormq.exceptions.IncompatibleProtocolError) as e:
        logger.error(f"action=setup_rabbitmq, status=fail, retry=10s, {e}")
        await asyncio.sleep(10)
        await setup_rabbitmq(app)
        return None

    channel = await connection.channel()
    auth_exchange = await channel.declare_exchange(name="auth", type=aio_pika.ExchangeType.TOPIC, durable=True)

    app["rabbitmq"] = connection
    app["rabbitmq_auth_exchange"] = auth_exchange
    logger.info(f"action=setup_rabbitmq, status=success")


async def close_rabbitmq(app: web.Application) -> None:
    if app.get("rabbitmq"):
        await app["rabbitmq"].close()
    logger.info("action=close_rabbitmq, status=success")


async def send_event(auth_exchange: aio_pika.Exchange, routing_key: str, message: Dict) -> None:
    """ Publish a message serialized to json to auth exchange. """
    await auth_exchange.publish(
        aio_pika.Message(
            body=json_util.dumps(message, json_options=json_util.RELAXED_JSON_OPTIONS).encode(),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        ),
        routing_key=routing_key,
    )


async def send_authenticated_event(app: web.Application, user_id: ObjectId) -> None:
    await send_event(
        app["rabbitmq_auth_exchange"], "auth.event.authenticated", {"user_id": user_id, "ts": datetime.utcnow()}
    )

Then in the login view, I can send the event:

@routes.post("/login")
async def login(request: web.Request) -> web.Response:
    ...
    user_id = await services.login(email, password)
    await event.send_authenticated_event(request.app, user_id)
    ...

Users service events.py

import asyncio
import aio_pika
import aiormq
import logging
from typing import Dict
from aiohttp import web
from bson import json_util
from . import services


logger = logging.getLogger(__name__)


async def setup_rabbitmq(app: web.Application) -> None:
    """
    Open connection to RabbitMQ and create listener task, 
    try to reconnect every 10 seconds if there is a problem.
    """
    config = app["config"]
    loop = asyncio.get_event_loop()
    try:
        connection: aio_pika.Connection = await aio_pika.connect_robust(config.RABBITMQ_URI, loop=loop)
    except (ConnectionError, aiormq.exceptions.IncompatibleProtocolError) as e:
        logger.error(f"action=setup_rabbitmq, status=fail, retry=10s, {e}")
        await asyncio.sleep(10)
        await setup_rabbitmq(app)
        return None

    app["rabbitmq"] = connection
    app["rabbitmq_listener"] = asyncio.create_task(listen_events(app))

    logger.info(f"action=setup_rabbitmq, status=success")


async def close_rabbitmq(app: web.Application) -> None:
    if app.get("rabbitmq_listener"):
        app["rabbitmq_listener"].cancel()
        await app["rabbitmq_listener"]
    if app.get("rabbitmq"):
        await app["rabbitmq"].close()
    logger.info("action=close_rabbitmq, status=success")


async def on_message(message: aio_pika.IncomingMessage) -> None:
    """ Choose a handler for message processing by routing key. """
    handlers = {"auth.event.authenticated": services.update_user_online_time}
    async with message.process():
        handler = handlers[message.routing_key]
        body = json_util.loads(message.body, json_options=json_util.RELAXED_JSON_OPTIONS)
        await handler(**body)


async def listen_events(app: web.Application) -> None:
    """ Declare queue, message binding and start the listener. """
    connection = app["rabbitmq"]
    try:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=100)

        auth_exchange = await channel.declare_exchange(name="auth", type=aio_pika.ExchangeType.TOPIC, durable=True)

        queue = await channel.declare_queue(name=f"user.set-online-state", durable=True)
        await queue.bind(auth_exchange, routing_key="auth.event.authenticated")

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                await on_message(message)
    except asyncio.CancelledError:
        pass
comments powered by Disqus