Tuesday, December 26, 2023

Parallelizing API calls with Python asyncio

Some thoughts on asyncio, plus a code example where I refactored existing code to use it.


The code example refactors the get_results function to use asyncio. It relies on two helpers whose implementation details are not shown: _get_info_lookup and _retrieve_aggregation_results. The latter is an ordinary synchronous function that ultimately makes the HTTP API call to fetch data from an external source.

Old Code Snippet:
def get_results(metadata, granularity):
    [some code]
    combined_result = {}
    info_lookup_list = _get_info_lookup(granularity)
    for retrieval_info in info_lookup_list:
        result_info = _retrieve_aggregation_results(retrieval_info, metadata)
        combined_result[retrieval_info['result_key']] = result_info['result']
    [some code]
Refactored Code Snippet:
import asyncio

async def _retrieve_aggregation_results_async_coroutine(retrieval_info, metadata):
    """
    Native coroutine, modern syntax. Retrieve aggregation results.
    Return a tuple=(
        <retrieval_info['result_key']>, (String) e.g. daily, hourly
        <dictionary of lists> (Dictionary) e.g. { 'key_1': result_list_1, 'key_2': result_list_2 }
    )
    """
    # _retrieve_aggregation_results is a plain blocking function, so run it in
    # the default thread pool executor; if it were awaited directly on the event
    # loop thread, the gathered tasks would run one after another and nothing
    # would actually be parallelized.
    loop = asyncio.get_event_loop()
    result_info = await loop.run_in_executor(
        None, _retrieve_aggregation_results, retrieval_info, metadata)
    # return result_info['result'] (not the whole result_info) to match the
    # old code, which stored result_info['result'] in combined_result
    return (retrieval_info['result_key'], result_info['result'])

async def _create_aggregation_results_async_scheduler(info_lookup_list, metadata):
    """
    Create an async scheduler for retrieving aggregation results.
    Return a list of tuples for each task created:
    tuple=(
        <retrieval_info['result_key']>, (String) e.g. daily, hourly
        <dictionary of lists> (Dictionary) e.g. { 'key_1': result_list_1, 'key_2': result_list_2 }
    )
    """
    request_list = []

    for retrieval_info in info_lookup_list:
        # asyncio.create_task() is preferred on Python 3.7+;
        # asyncio.ensure_future() below also works on Python 3.6
        task = asyncio.ensure_future(_retrieve_aggregation_results_async_coroutine(retrieval_info, metadata))
        request_list.append(task)

    # gather all results by unpacking the request_list and passing them each as a parameter to asyncio.gather()
    result_list = await asyncio.gather(*request_list)
    return result_list

def get_results(metadata, granularity):
    [some code]

    combined_result = {}
    info_lookup_list = _get_info_lookup(granularity)

    # in Python 3.6 or lower, use asyncio.get_event_loop and run_until_complete instead of asyncio.run
    try:
        event_loop = asyncio.get_event_loop()
    except RuntimeError as e:
        if str(e).startswith('There is no current event loop in thread'):
            event_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(event_loop)
        else:
            # re-raise anything else, so event_loop is never left unbound
            raise

    result_list = event_loop.run_until_complete(_create_aggregation_results_async_scheduler(info_lookup_list, metadata))

    for tuple_result in result_list:
        # tuple_result[0] is a String that holds the result_key (e.g. daily, hourly)
        # tuple_result[1] is a Dictionary of lists (e.g. { 'key_1': result_list_1, 'key_2': result_list_2 })
        result_key = tuple_result[0]
        performance_result = tuple_result[1]

        combined_result[result_key] = performance_result
        
    [some code]
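To see the pattern end to end, here is a self-contained sketch of the same fan-out. The names fetch, fetch_async, gather_all, and get_all are hypothetical stand-ins for the helpers above, and the blocking HTTP call is simulated with time.sleep:

```python
import asyncio
import time

def fetch(result_key):
    # Hypothetical stand-in for the blocking HTTP call made by
    # _retrieve_aggregation_results.
    time.sleep(0.2)
    return {'result': ['data for ' + result_key]}

async def fetch_async(loop, result_key):
    # Offload the blocking call to the default thread pool so tasks overlap.
    result_info = await loop.run_in_executor(None, fetch, result_key)
    return (result_key, result_info['result'])

async def gather_all(loop, result_keys):
    # Mirrors the scheduler above: create one task per key, gather them all.
    tasks = [asyncio.ensure_future(fetch_async(loop, key)) for key in result_keys]
    return await asyncio.gather(*tasks)

def get_all(result_keys):
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return dict(loop.run_until_complete(gather_all(loop, result_keys)))
    finally:
        loop.close()

start = time.monotonic()
combined_result = get_all(['daily', 'hourly', 'monthly'])
elapsed = time.monotonic() - start
# the three simulated 0.2 s calls overlap, so wall time is
# roughly 0.2 s instead of the 0.6 s a sequential loop would take
```

Because run_in_executor hands each blocking call to a worker thread, gather can wait on all of them at once, which is where the speedup over the original sequential for-loop comes from.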
