Calling a Python async function from sync code

This is an example of how to turn sequential requests into parallel ones in legacy code by running an asyncio event loop inside an otherwise synchronous function, when refactoring the whole codebase to async is not an option.
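
The core trick, shown first as a minimal sketch (fetch_all and process are placeholder names, not anything from the real code), is to keep the synchronous signature and spin up the event loop inside the function. The sketch uses asyncio.run for brevity; the full example below manages the loop explicitly so it can delay closing it.

import asyncio


async def process(item: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for an async HTTP request
    return item.upper()


def fetch_all(items):
    # The synchronous signature stays the same for existing callers;
    # the work inside runs concurrently on a private event loop.
    async def runner():
        return await asyncio.gather(*(process(item) for item in items))

    return asyncio.run(runner())


print(fetch_all(["a", "b", "c"]))  # -> ['A', 'B', 'C']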

To show the idea in a more realistic setting, I'm using a simplified version of Microsoft Graph API requests that fetch each user's manager by user ID.

The code before


from typing import Dict, List, Optional

import requests


def get_manager(user_id: str) -> Optional[Dict]:
    headers = get_auth_headers()
    url = f"https://graph.microsoft.com/beta/users/{user_id}/manager"
    response = requests.request("GET", url=url, headers=headers, timeout=5)
    if response.status_code == 200:
        return response.json()
    return None


def get_managers(user_ids: List[str]) -> Dict[str, Dict]:
    user_managers: Dict[str, Dict] = {}
    for user_id in user_ids:
        manager = get_manager(user_id)
        if manager:
            user_managers[user_id] = manager
    return user_managers

The code after

The important part is the RateLimiter class, which keeps the request rate down to avoid too many 429 (throttling) responses.


import asyncio
import socket
import time
from typing import Dict, List, Optional, Tuple

import aiohttp


class RateLimiter:
    """Rate limits for aiohttp session requests."""

    RATE = 100  # number of requests per second

    def __init__(self, session):
        self.session = session
        self.tokens = self.RATE
        self.updated_at = time.monotonic()

    async def request(self, *args, **kwargs):
        await self.wait_for_token()
        return self.session.request(*args, **kwargs)

    async def wait_for_token(self):
        while self.tokens < 1:
            self.add_new_tokens()
            await asyncio.sleep(0.1)
        self.tokens -= 1

    def add_new_tokens(self):
        now = time.monotonic()
        time_since_update = now - self.updated_at
        new_tokens = time_since_update * self.RATE
        if self.tokens + new_tokens >= 1:
            self.tokens = min(self.tokens + new_tokens, self.RATE)
            self.updated_at = now


async def get_manager(session: RateLimiter, user_id: str) -> Optional[Tuple[str, Dict]]:
    headers = get_auth_headers()
    url = f"https://graph.microsoft.com/beta/users/{user_id}/manager"
    timeout = aiohttp.ClientTimeout(sock_connect=2, sock_read=5)
    # RateLimiter.request is a coroutine, so await it first; it returns
    # aiohttp's request context manager, which is then used with "async with".
    async with await session.request("GET", url=url, headers=headers, timeout=timeout) as response:
        if response.status == 429:  # throttling
            await asyncio.sleep(120)  # wait two minutes before retrying
            return await get_manager(session, user_id)

        if response.status == 200:
            return user_id, await response.json()

        return None


def get_managers(user_ids: List[str]) -> Dict[str, Dict]:
    async def batch_tasks():
        tasks = []
        conn = aiohttp.TCPConnector(ttl_dns_cache=300, family=socket.AF_INET)
        async with aiohttp.ClientSession(connector=conn) as session:
            rate_limiter_session = RateLimiter(session)
            for user_id in user_ids:
                tasks.append(get_manager(rate_limiter_session, user_id))
            return await asyncio.gather(*tasks)

    user_managers: Dict[str, Dict] = {}
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    results = loop.run_until_complete(batch_tasks())
    for result in results:
        if result is not None:
            user_managers[result[0]] = result[1]
    # Wait 250 ms for the underlying SSL connections to close https://github.com/aio-libs/aiohttp/issues/1925
    loop.run_until_complete(asyncio.sleep(0.250))
    loop.close()
    return user_managers

This is roughly four times more code and it is more complex, but it is faster and changes nothing for the consumers of the get_managers function.
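
For example, a hypothetical consumer keeps calling it as plain synchronous code (the user IDs and the displayName field below are only illustrative):

# Hypothetical consumer code: nothing changes on the calling side.
user_managers = get_managers(["user-id-1", "user-id-2"])
for user_id, manager in user_managers.items():
    print(user_id, manager.get("displayName"))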
