RabbitMQ with AioHttp example
In this example I have two services:
- auth which sends an event that user is authenticated
- users which listen for this event and update user’s last online time.
These services use aio_pika library for async RabbitMQ connection and topic exchange for the message routing.
To setup and cleanup RabbitMQ in the app initialization process.
app.on_startup.append(events.setup_rabbitmq)
app.on_cleanup.append(events.close_rabbitmq)
Auth service events.py
import asyncio
import aio_pika
import aiormq
import logging
from datetime import datetime
from typing import Dict
from aiohttp import web
from bson import ObjectId, json_util
logger = logging.getLogger(__name__)
async def setup_rabbitmq(app: web.Application) -> None:
"""
Open connection to RabbitMQ and declare auth exchange,
try to reconnect every 10 seconds if there is a problem.
"""
config = app["config"]
loop = asyncio.get_event_loop()
try:
connection: aio_pika.Connection = await aio_pika.connect_robust(config.RABBITMQ_URI, loop=loop)
except (ConnectionError, aiormq.exceptions.IncompatibleProtocolError) as e:
logger.error(f"action=setup_rabbitmq, status=fail, retry=10s, {e}")
await asyncio.sleep(10)
await setup_rabbitmq(app)
return None
channel = await connection.channel()
auth_exchange = await channel.declare_exchange(name="auth", type=aio_pika.ExchangeType.TOPIC, durable=True)
app["rabbitmq"] = connection
app["rabbitmq_auth_exchange"] = auth_exchange
logger.info(f"action=setup_rabbitmq, status=success")
async def close_rabbitmq(app: web.Application) -> None:
if app.get("rabbitmq"):
await app["rabbitmq"].close()
logger.info("action=close_rabbitmq, status=success")
async def send_event(auth_exchange: aio_pika.Exchange, routing_key: str, message: Dict) -> None:
""" Publish a message serialized to json to auth exchange. """
await auth_exchange.publish(
aio_pika.Message(
body=json_util.dumps(message, json_options=json_util.RELAXED_JSON_OPTIONS).encode(),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
),
routing_key=routing_key,
)
async def send_authenticated_event(app: web.Application, user_id: ObjectId) -> None:
await send_event(
app["rabbitmq_auth_exchange"], "auth.event.authenticated", {"user_id": user_id, "ts": datetime.utcnow()}
)
Then in the login view, I can send the event:
@routes.post("/login")
async def login(request: web.Request) -> web.Response:
...
user_id = await services.login(email, password)
await event.send_authenticated_event(request.app, user_id)
...
Users service events.py
import asyncio
import aio_pika
import aiormq
import logging
from typing import Dict
from aiohttp import web
from bson import json_util
from . import services
logger = logging.getLogger(__name__)
async def setup_rabbitmq(app: web.Application) -> None:
"""
Open connection to RabbitMQ and create listener task,
try to reconnect every 10 seconds if there is a problem.
"""
config = app["config"]
loop = asyncio.get_event_loop()
try:
connection: aio_pika.Connection = await aio_pika.connect_robust(config.RABBITMQ_URI, loop=loop)
except (ConnectionError, aiormq.exceptions.IncompatibleProtocolError) as e:
logger.error(f"action=setup_rabbitmq, status=fail, retry=10s, {e}")
await asyncio.sleep(10)
await setup_rabbitmq(app)
return None
app["rabbitmq"] = connection
app["rabbitmq_listener"] = asyncio.create_task(listen_events(app))
logger.info(f"action=setup_rabbitmq, status=success")
async def close_rabbitmq(app: web.Application) -> None:
if app.get("rabbitmq_listener"):
app["rabbitmq_listener"].cancel()
await app["rabbitmq_listener"]
if app.get("rabbitmq"):
await app["rabbitmq"].close()
logger.info("action=close_rabbitmq, status=success")
async def on_message(message: aio_pika.IncomingMessage) -> None:
""" Choose a handler for message processing by routing key. """
handlers = {"auth.event.authenticated": services.update_user_online_time}
async with message.process():
handler = handlers[message.routing_key]
body = json_util.loads(message.body, json_options=json_util.RELAXED_JSON_OPTIONS)
await handler(**body)
async def listen_events(app: web.Application) -> None:
""" Declare queue, message binding and start the listener. """
connection = app["rabbitmq"]
try:
channel = await connection.channel()
await channel.set_qos(prefetch_count=100)
auth_exchange = await channel.declare_exchange(name="auth", type=aio_pika.ExchangeType.TOPIC, durable=True)
queue = await channel.declare_queue(name=f"user.set-online-state", durable=True)
await queue.bind(auth_exchange, routing_key="auth.event.authenticated")
async with queue.iterator() as queue_iter:
async for message in queue_iter:
await on_message(message)
except asyncio.CancelledError:
pass