Some thoughts on asyncio and a code example where I refactored the code to use it.
- Asyncio is one of the three common ways to parallelize API calls in Python (the other two being threading and multiprocessing). Asyncio has been available since Python 3.4.
- Why do all the examples online pair asyncio with aiohttp? You can use the regular Python 'requests' library with asyncio; there is a sketch of one way to do that after the refactored code below.
- I hadn't explicitly used the * operator to unpack a list into separate arguments in Python before (there is a small example right after this list):
- https://realpython.com/async-io-python/#the-asyncawait-syntax-and-native-coroutines
- https://stackoverflow.com/questions/71235585/passing-in-a-list-of-functions-to-asyncio-gather
- This article actually uses the word 'unpack' rather than just showing the * without explaining it - https://superfastpython.com/asyncio-gather/
- "There is no current event loop in thread": create a new event loop if one doesn't exist (handled in the refactored get_results below).
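Since the * unpacking was new to me, here is a tiny self-contained sketch of the idea (the fake_fetch coroutine, its delays, and the printed output are made up purely for illustration; asyncio.run() needs Python 3.7+):

import asyncio

async def fake_fetch(name, delay):
    # stand-in for a real API call; just sleeps and returns a label
    await asyncio.sleep(delay)
    return name + ' done'

async def main():
    request_list = [fake_fetch('daily', 0.2), fake_fetch('hourly', 0.1)]
    # the * unpacks the list so gather() receives each coroutine as a separate argument,
    # i.e. asyncio.gather(fake_fetch('daily', 0.2), fake_fetch('hourly', 0.1))
    result_list = await asyncio.gather(*request_list)
    print(result_list)  # ['daily done', 'hourly done']

asyncio.run(main())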
The code example below refactors the get_results function to use asyncio. It uses both the _get_info_lookup and _retrieve_aggregation_results functions; their implementation details are not shown. _retrieve_aggregation_results ultimately makes the HTTP API call to get data from an external source.
Old Code Snippet:
def get_results(metadata, granularity):
    [some code]
    combined_result = {}
    info_lookup_list = _get_info_lookup(granularity)
    for retrieval_info in info_lookup_list:
        result_info = _retrieve_aggregation_results(retrieval_info, metadata)
        combined_result[retrieval_info['result_key']] = result_info['result']
    [some code]
Refactored Code Snippet:
import asyncio

async def _retrieve_aggregation_results_async_coroutine(retrieval_info, metadata):
    """
    Native coroutine, modern syntax. Retrieve aggregation results.
    Return a tuple=(
        <retrieval_info['result_key']>, (String) e.g. daily, hourly
        <dictionary of lists> (Dictionary) e.g. { result_list_1, result_list_2, result_list_3 }
    )
    """
    # note: _retrieve_aggregation_results is an ordinary blocking call here;
    # see the run_in_executor sketch after this refactor for one way to avoid blocking the event loop
    result_info = _retrieve_aggregation_results(retrieval_info, metadata)
    return (retrieval_info['result_key'], result_info)
async def _create_aggregation_results_async_scheduler(info_lookup_list, metadata):
    """
    Create an async scheduler for retrieving aggregation results.
    Return a list of tuples, one per task created:
    tuple=(
        <retrieval_info['result_key']>, (String) e.g. daily, hourly
        <dictionary of lists> (Dictionary) e.g. { result_list_1, result_list_2, result_list_3 }
    )
    """
    request_list = []
    for retrieval_info in info_lookup_list:
        # asyncio.create_task() requires Python 3.7+; in Python 3.6 or lower, use asyncio.ensure_future() instead
        task = asyncio.ensure_future(_retrieve_aggregation_results_async_coroutine(retrieval_info, metadata))
        request_list.append(task)
    # gather all results by unpacking request_list and passing each task as a separate argument to asyncio.gather()
    result_list = await asyncio.gather(*request_list)
    return result_list
def get_results(metadata, granularity):
    [some code]
    combined_result = {}
    info_lookup_list = _get_info_lookup(granularity)
    # in Python 3.6 or lower, use asyncio.get_event_loop() and run_until_complete() instead of asyncio.run()
    try:
        event_loop = asyncio.get_event_loop()
    except RuntimeError as e:
        if str(e).startswith('There is no current event loop in thread'):
            event_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(event_loop)
        else:
            raise
    result_list = event_loop.run_until_complete(_create_aggregation_results_async_scheduler(info_lookup_list, metadata))
    for tuple_result in result_list:
        # tuple_result[0] is a String that holds the result_key (e.g. daily, hourly)
        # tuple_result[1] is a Dictionary of lists (e.g. { result_list_1, result_list_2, result_list_3 })
        result_key = tuple_result[0]
        performance_result = tuple_result[1]
        combined_result[result_key] = performance_result
    [some code]
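One caveat with the refactor: if _retrieve_aggregation_results does its HTTP work synchronously (e.g. with requests), the coroutines above never actually yield to the event loop, so gather() ends up running them one after another. A minimal sketch of one way to keep requests but overlap the calls is to push the blocking call onto the default thread pool with run_in_executor(); the fetch_one/fetch_all names and the httpbin URLs below are placeholders for illustration, not part of my code:

import asyncio
import requests

def fetch_one(url):
    # ordinary blocking call using the requests library
    return requests.get(url, timeout=10).json()

async def fetch_all(urls):
    event_loop = asyncio.get_event_loop()
    # run each blocking requests call in the default thread pool executor
    # so the event loop can overlap the HTTP waits
    future_list = [event_loop.run_in_executor(None, fetch_one, url) for url in urls]
    return await asyncio.gather(*future_list)

urls = ['https://httpbin.org/get?key=daily', 'https://httpbin.org/get?key=hourly']
event_loop = asyncio.get_event_loop()
result_list = event_loop.run_until_complete(fetch_all(urls))

The same pattern would drop into _retrieve_aggregation_results_async_coroutine by awaiting run_in_executor(None, _retrieve_aggregation_results, retrieval_info, metadata) instead of calling the function directly.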