Unlocking the power of asyncio Semaphore

When building asynchronous applications, oftentimes you need to limit the number of simultaneous connections to a shared resource. It can be your internal server, or an API that has usage limits.

asyncio library provides a dedicated synchronization primitive Semaphore created exactly for this purpose. However, let’s first try to solve this problem without using it, in order to fully appreciate the value of this mechanism.

We can limit the number of simultaneous connections by using a counter variable that will be incremented whenever we start making a request and decremented when we receive our response. Let’s look at the example code:

import asyncio

import aiohttp

URL = "https://google.com/"

connections = 0
connections_limit = 3


async def make_request(session: aiohttp.ClientSession):
    global connections
    while connections >= connections_limit:
        await asyncio.sleep(0.1)

    connections += 1

    async with session.get(URL) as response:
        await response.text()

    connections -= 1


async def main():
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(make_request(session) for _ in range(10)))


asyncio.run(main())

You may have guessed that this code has a problem - it allows possible race conditions when multiple coroutines try to concurrently mutate our connections variable. This can lead to inaccurate connections counting. We can fix that by introducing a Lock:

import asyncio

import aiohttp

URL = "https://google.com/"

connections = 0
connections_limit = 3
connections_lock = asyncio.Lock()


async def make_request(session: aiohttp.ClientSession):
    global connections
    while connections >= connections_limit:
        await asyncio.sleep(0.1)

    async with connections_lock:
        connections += 1

    async with session.get(URL) as response:
        await response.text()

    async with connections_lock:
        connections -= 1


async def main():
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(make_request(session) for _ in range(10)))


asyncio.run(main())

Now the connections_lock ensures that only a single coroutine can mutate the connections variable at any moment in time. Even though this example now works, the code is far from being beautiful. It can however be simplified significantly by using the above mentioned Semaphore primitive:

import asyncio

import aiohttp

URL = "https://google.com/"

connections = 0
sem = asyncio.Semaphore(3)


async def make_request(session: aiohttp.ClientSession):
    async with sem:
        async with session.get(URL) as response:
            await response.text()


async def main():
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*(make_request(session) for _ in range(10)))


asyncio.run(main())

This code does pretty much the same job as the example with the Lock. asyncio.Semaphore automatically blocks the execution of the coroutine if the number of currently acquired locks that were not released exceeds the specified capacity.