Adding a concurrency limit to Python's asyncio.as_completed
June 27, 2017 [Performance, Programming, Programming Languages, Python]
Series: asyncio basics, large numbers in parallel, parallel HTTP requests, adding to stdlib
In the previous post I demonstrated how the limited_as_completed function lets us run a very large number of tasks concurrently while capping how many run at any one time, so that we don't exhaust resources like memory or operating system file handles.
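As a reminder of the idea, here is a minimal sketch of a polling-style limited_as_completed. It is an illustration written for this recap under my own assumptions, not a copy of the code from the previous post: the demo helper and its job coroutine are hypothetical stand-ins for real HTTP requests, and the generator has to be iterated from inside a running event loop, awaiting each yielded item before asking for the next.

```python
import asyncio
from itertools import islice


def limited_as_completed(coros, limit):
    """Yield awaitables for results, keeping at most `limit` coroutines scheduled."""
    coros = iter(coros)
    # Only the first `limit` coroutines are turned into futures up front.
    running = [asyncio.ensure_future(c) for c in islice(coros, limit)]

    async def first_to_finish():
        while True:
            await asyncio.sleep(0)  # give the event loop a chance to run tasks
            for fut in running:
                if fut.done():
                    running.remove(fut)
                    # Pull one more coroutine from the lazy source, if any remain.
                    for nxt in islice(coros, 1):
                        running.append(asyncio.ensure_future(nxt))
                    return fut.result()

    while running:
        yield first_to_finish()


async def demo(n=20, limit=3):
    """Hypothetical driver: run n sleeping jobs and record the peak concurrency."""
    active = peak = 0

    async def job(i):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stand-in for an HTTP request
        active -= 1
        return i

    results = []
    for res in limited_as_completed((job(i) for i in range(n)), limit):
        results.append(await res)
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(demo())
    print(len(results), "results; peak concurrency", peak)
```

Because the coroutines come from a generator expression, only about `limit` of them exist as futures at any moment, which is what keeps memory use bounded.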
I think this could be a useful addition to the Python standard library, so I have been working on a modification to the current asyncio.as_completed function. My work so far is here: limited-as_completed.
To validate that the modified standard library version achieves the same goals as before, I ran tests with this code similar to the ones I ran for the last blog post.
I used an identical copy of timed from the previous post, and updated versions of the other files, because I was using a much newer version of aiohttp along with the custom-built Python I was running.
server looked like:
    #!/usr/bin/env python3

    from aiohttp import web
    import asyncio
    import random

    async def handle(request):
        await asyncio.sleep(random.randint(0, 3))
        return web.Response(text="Hello, World!")

    app = web.Application()
    app.router.add_get('/{name}', handle)
    web.run_app(app)
For client-async-sem I needed to add a custom TCPConnector to get around the new limit on the number of concurrent connections that was added to aiohttp in version 2.0. I also needed to move the ClientSession usage inside a coroutine to avoid a warning:
    #!/usr/bin/env python3

    from aiohttp import ClientSession, TCPConnector
    import asyncio
    import sys

    limit = 1000

    async def fetch(url, session):
        async with session.get(url) as response:
            return await response.read()

    async def bound_fetch(sem, url, session):
        # Getter function with semaphore.
        async with sem:
            await fetch(url, session)

    async def run(r):
        async with ClientSession(connector=TCPConnector(limit=limit)) as session:
            url = "http://localhost:8080/{}"
            tasks = []
            # create instance of Semaphore
            sem = asyncio.Semaphore(limit)
            for i in range(r):
                # pass Semaphore and session to every GET request
                task = asyncio.ensure_future(
                    bound_fetch(sem, url.format(i), session))
                tasks.append(task)
            responses = asyncio.gather(*tasks)
            await responses

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.ensure_future(run(int(sys.argv[1]))))
My new code that uses my proposed extension to as_completed looked like:
    #!/usr/bin/env python3

    from aiohttp import ClientSession, TCPConnector
    import asyncio
    import sys

    async def fetch(url, session):
        async with session.get(url) as response:
            return await response.read()

    limit = 1000

    async def print_when_done():
        async with ClientSession(connector=TCPConnector(limit=limit)) as session:
            # A generator, so coroutines are only created as they are needed
            tasks = (fetch(url.format(i), session) for i in range(r))
            # limit= is the proposed new argument; it only exists in the
            # custom-built Python, not in a released version
            for res in asyncio.as_completed(tasks, limit=limit):
                await res

    r = int(sys.argv[1])
    url = "http://localhost:8080/{}"
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_when_done())
    loop.close()
With these, we get similar behaviour to the previous post:
    $ ./timed ./client-async-sem 10000
    Memory usage: 73640KB
    Time: 19.18 seconds
    $ ./timed ./client-async-stdlib 10000
    Memory usage: 49332KB
    Time: 18.97 seconds
So the implementation I plan to submit to the Python standard library appears to work well. In fact, I think it is better than the one I presented in the previous post, because it uses on_complete callbacks to notice when futures have completed, which reduces the busy-looping we were doing to check for and yield finished tasks.
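To illustrate the difference, here is a rough sketch of a callback-driven version. It is not the code from the pull request: as_completed_with_limit and demo_callbacks are hypothetical names made up for this example, and the approach (a done-callback that pushes finished tasks onto an asyncio.Queue and schedules the next coroutine) is my own simplification. Like the polling version, it must be iterated inside a running event loop, awaiting each item before requesting the next.

```python
import asyncio
from itertools import islice


def as_completed_with_limit(coros, limit):
    """Hypothetical helper: yield results as they complete, running at most `limit` at once."""
    loop = asyncio.get_running_loop()
    coros = iter(coros)
    done = asyncio.Queue()
    pending = 0  # tasks scheduled whose results have not been consumed yet

    def schedule_next():
        nonlocal pending
        # Take at most one more coroutine from the lazy source.
        for coro in islice(coros, 1):
            task = loop.create_task(coro)
            task.add_done_callback(on_done)
            pending += 1

    def on_done(task):
        # The event loop calls this when a task finishes, so nothing has to poll.
        done.put_nowait(task)
        schedule_next()

    async def next_result():
        nonlocal pending
        task = await done.get()
        pending -= 1
        return task.result()

    for _ in range(limit):
        schedule_next()
    while pending:
        yield next_result()


async def demo_callbacks(n=20, limit=3):
    """Hypothetical driver: run n sleeping jobs and record the peak concurrency."""
    active = peak = 0

    async def job(i):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stand-in for an HTTP request
        active -= 1
        return i

    results = []
    for res in as_completed_with_limit((job(i) for i in range(n)), limit):
        results.append(await res)
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(demo_callbacks())
    print(len(results), "results; peak concurrency", peak)
```

The consumer simply sleeps on the queue until a callback wakes it, instead of repeatedly yielding to the event loop and scanning the list of futures.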
The Python issue is bpo-30782 and the pull request is #2424.
Note: at first glance, it looks like aiohttp's limit on the number of connections (the TCPConnector limit argument used above, introduced in version 1.0 and then updated in version 2.0) gives us what we want without any of this extra code. In fact it only limits the number of connections, not the number of futures we create, so it has the same problem of unbounded memory use as the semaphore-based implementation.
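The point about futures can be seen without aiohttp at all. The following is a small sketch under my own assumptions (semaphore_demo is a hypothetical name, and asyncio.sleep stands in for a request): a semaphore caps how many jobs run at once, but every task object is still created before any of them finishes.

```python
import asyncio


async def semaphore_demo(n=1000, limit=10):
    """Return (tasks alive right after creation, peak number running at once)."""
    sem = asyncio.Semaphore(limit)
    running = peak_running = 0

    async def job():
        nonlocal running, peak_running
        async with sem:  # only `limit` jobs get past this point at a time
            running += 1
            peak_running = max(peak_running, running)
            await asyncio.sleep(0.001)  # stand-in for an HTTP request
            running -= 1

    # All n tasks are created here, long before most of them can start work.
    tasks = [asyncio.ensure_future(job()) for _ in range(n)]
    alive = sum(1 for t in tasks if not t.done())
    await asyncio.gather(*tasks)
    return alive, peak_running


if __name__ == "__main__":
    alive, peak = asyncio.run(semaphore_demo())
    print("tasks alive at once:", alive, "peak running:", peak)
```

So the number of live task objects grows with the size of the job list, even though the amount of work in progress stays bounded. A connection limit behaves the same way in this respect.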