Microsoft Graph API: UnknownError in batch requests

You can optimize your use of the Microsoft Graph API by batching requests. A single batch request can contain at most 20 individual requests.

Here is a simplified example:

# Sample sub-requests for one batch call. The mapping is keyed by the same
# value each entry carries in its own 'id' field.
# NOTE(review): Microsoft Graph documents getMemberGroups as a POST action
# that takes a JSON body - confirm the 'GET' method used here.
api_requests = {
    entry['id']: entry
    for entry in (
        {
            'id': 1,
            'url': '/myorganization/users/<user_id1>/getMemberGroups',
            'method': 'GET',
        },
        {
            'id': 2,
            'url': '/myorganization/users/<user_id2>/getMemberGroups',
            'method': 'GET',
        },
    )
}

def batch_requests(api_requests, access_token):
    """Send several Microsoft Graph sub-requests in a single ``$batch`` call.

    Args:
        api_requests: Mapping whose values are request dicts. Each dict must
            contain ``'id'`` and ``'url'``; ``'method'`` defaults to
            ``'GET'``, ``'body'`` to ``None`` and ``'headers'`` to a JSON
            ``Content-Type`` header. The caller is responsible for keeping
            the number of entries within the batch size limit.
        access_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        Dict mapping every sub-response ``id`` to a ``(status, body)`` tuple.
        ``body`` is ``None`` for a sub-response that carries no body. An
        empty ``api_requests`` returns ``{}`` without any HTTP call.

    Raises:
        KeyError: If the decoded batch response has no ``'responses'`` key,
            or a sub-response lacks ``'id'`` or ``'status'``.
    """
    # Nothing to send: skip the round trip instead of posting an empty batch.
    if not api_requests:
        return {}

    request_json = {'requests': [{
        "id": request['id'],
        "url": request['url'],
        "method": request.get('method', 'GET'),
        "body": request.get('body'),
        "headers": request.get('headers', {"Content-Type": "application/json"})
    } for request in api_requests.values()]}

    batch_response = requests.request(
        'POST',
        url='https://graph.microsoft.com/beta/$batch',
        json=request_json,
        headers={'Authorization': 'Bearer ' + access_token}).json()
    if 'error' in batch_response:
        # process error
        # NOTE(review): presumably a top-level error payload has no
        # 'responses' key, so the loop below raises KeyError unless the
        # error is handled (or re-raised) here - confirm and fill in.
        pass

    results = {}
    # A distinct loop name keeps the batch-level payload from being shadowed.
    for sub_response in batch_response['responses']:
        # Not every sub-response has a body, so read it without raising.
        results[sub_response['id']] = (
            sub_response['status'],
            sub_response.get('body'),
        )

    return results

There is a list of possible errors, with recommendations on how to handle them. However, it contains no information about UnknownError, which arrives with status code 503 or 504 and does not occur often. In my experience, it is enough to simply repeat the failed requests.

def batch_requests(api_requests, attempt=0, access_token=None):
    """Send a Microsoft Graph ``$batch`` call, retrying ``UnknownError`` once.

    Args:
        api_requests: Mapping whose values are request dicts. Each dict must
            contain ``'id'`` and ``'url'``; ``'method'`` defaults to
            ``'GET'``, ``'body'`` to ``None`` and ``'headers'`` to a JSON
            ``Content-Type`` header.
        attempt: ``0`` for the initial call. Sub-requests that fail with
            ``UnknownError`` are re-sent only when ``attempt`` is ``0``.
        access_token: Bearer token sent in the ``Authorization`` header.
            When omitted, a module-level ``access_token`` is used so that
            existing ``batch_requests(api_requests)`` calls keep working.

    Returns:
        Dict mapping every sub-response ``id`` to a ``(status, body)`` tuple.
        Error sub-responses are included too, so the caller can see which
        requests failed; this covers ``UnknownError`` sub-responses that
        still fail after the retry. An empty ``api_requests`` returns ``{}``
        without any HTTP call.

    Raises:
        ValueError: If no access token is passed and none is defined at
            module level.
        KeyError: If the decoded batch response has no ``'responses'`` key,
            or a sub-response lacks ``'id'`` or ``'status'``.
    """
    # Nothing to send: skip the round trip instead of posting an empty batch.
    if not api_requests:
        return {}

    if access_token is None:
        # Backward compatibility: the token used to be read from a global
        # because it was not part of the signature.
        access_token = globals().get('access_token')
    if access_token is None:
        raise ValueError('access_token is required')

    request_json = {'requests': [{
        "id": request['id'],
        "url": request['url'],
        "method": request.get('method', 'GET'),
        "body": request.get('body'),
        "headers": request.get('headers', {"Content-Type": "application/json"})
    } for request in api_requests.values()]}

    batch_response = requests.request(
        'POST',
        url='https://graph.microsoft.com/beta/$batch',
        json=request_json,
        headers={'Authorization': 'Bearer ' + access_token}).json()
    if 'error' in batch_response:
        # process error
        # NOTE(review): presumably a top-level error payload has no
        # 'responses' key, so the loop below raises KeyError unless the
        # error is handled (or re-raised) here - confirm and fill in.
        pass

    # Match sub-responses to their requests through each request's own 'id',
    # compared as strings.
    # NOTE(review): Graph presumably echoes ids back as strings even when
    # they were sent as integers - confirm against the batching docs.
    requests_by_id = {
        str(request['id']): request for request in api_requests.values()
    }

    failed_requests = {}
    results = {}
    for sub_response in batch_response['responses']:
        response_id = sub_response['id']
        # The body can be absent or not a JSON object, so probe it defensively.
        body = sub_response.get('body')
        error = body.get('error') if isinstance(body, dict) else None
        is_unknown_error = (
            isinstance(error, dict) and error.get('code') == 'UnknownError'
        )
        if is_unknown_error and attempt == 0:
            failed_requests[response_id] = requests_by_id[str(response_id)]
        else:
            # Successes, other errors, and UnknownError after the retry are
            # all reported rather than silently dropped.
            results[response_id] = (sub_response['status'], body)

    if failed_requests:
        results.update(
            batch_requests(failed_requests, attempt + 1, access_token))

    return results
comments powered by Disqus