How to send a Celery task from async code

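The simpler way is to add an endpoint to the service that runs Celery and call it from the async code. Below is an example of a lower-level approach based on the Celery message protocol: the task message is built by hand and pushed straight into Redis.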

import base64
import json
import logging
import os
import socket
import uuid
from typing import Dict, List, Optional
import aioredis
from aiohttp import web


logger = logging.getLogger(__name__)


async def setup_redis(app: web.Application) -> None:
    """Attach a Redis connection pool to ``app["redis"]`` unless one is already set.

    Nothing happens when ``app`` already holds a truthy ``"redis"`` entry.
    Otherwise a pool is created from ``app["config"].REDIS_URL``. Any error
    raised while doing so is logged and ``app["redis"]`` is set to ``None``
    rather than propagating the exception.
    """
    if app.get("redis"):
        return  # already connected
    try:
        redis_url = app["config"].REDIS_URL
        pool = await aioredis.create_redis_pool(redis_url)
        app["redis"] = pool
    except Exception as e:
        # Redis is optional here: record the failure and let the application keep running.
        logger.exception(f"action=setup_redis, status=fail, {e}")
        app["redis"] = None

def _build_task_message(
    queue: str, task_name: str, task_args: List, task_kwargs: Dict
) -> Dict:
    """Build the broker message dict for a single invocation of ``task_name``."""
    task_id = str(uuid.uuid4())
    delivery_tag = str(uuid.uuid4())
    # The payload is the JSON triple (args, kwargs, {}), base64-wrapped to match
    # the "body_encoding" declared in the properties below.
    payload = json.dumps((task_args, task_kwargs, {}))
    body = base64.b64encode(payload.encode("utf-8")).decode("utf-8")
    return {
        "body": body,
        "content-encoding": "utf-8",
        "content-type": "application/json",
        "headers": {
            "lang": "py",
            "task": task_name,
            "id": task_id,
            "shadow": None,
            "eta": None,
            "expires": None,
            "group": None,
            "retries": 0,
            "timelimit": [None, None],
            "root_id": task_id,  # this task starts its own workflow, so it is its own root
            "parent_id": None,
            "argsrepr": repr(task_args),
            "kwargsrepr": repr(task_kwargs),
            "origin": f"{os.getpid()}@{socket.gethostname()}",
        },
        "properties": {
            "correlation_id": task_id,
            "delivery_mode": 2,  # persistent
            "delivery_info": {"exchange": "", "routing_key": queue},
            "priority": 0,
            "body_encoding": "base64",
            "delivery_tag": delivery_tag,
        },
    }


async def send_task(
    *,
    app: web.Application,
    queue: str,
    task_name: str,
    task_args: Optional[List] = None,
    task_kwargs: Optional[Dict] = None,
) -> None:
    """Publish a task message to a Redis list without going through Celery itself.

    Args:
        app: The aiohttp application; its ``"redis"`` entry is used as the
            connection and is created on demand via ``setup_redis``.
        queue: Name of the Redis list to push to; also used as the routing key.
        task_name: Name of the task, stored in the ``task`` header.
        task_args: Positional arguments for the task (defaults to ``[]``).
        task_kwargs: Keyword arguments for the task (defaults to ``{}``).

    Returns:
        Always ``None``. Sending is best-effort: if Redis is unavailable the
        task is dropped, and a failure while pushing is logged rather than raised.

    Raises:
        TypeError: If the arguments cannot be serialised by ``json.dumps``
            (serialisation happens before the guarded push).
    """
    # Use .get() so an app on which setup_redis never ran (no "redis" key at
    # all) triggers the lazy connect instead of failing with KeyError.
    if not app.get("redis"):
        await setup_redis(app)
    if not app.get("redis"):
        return None
    if not task_args:
        task_args = []
    if not task_kwargs:
        task_kwargs = {}
    message = _build_task_message(queue, task_name, task_args, task_kwargs)
    try:
        await app["redis"].lpush(queue, json.dumps(message))
    except Exception as e:
        logger.exception(f"action=send_task, status=fail, {e}")
comments powered by Disqus