Python's asyncio library has matured into a powerful framework for writing concurrent code using
the async/await syntax. Unlike threading, asyncio uses a single-threaded event loop
that switches between tasks at await points, making it ideal for I/O-bound applications such as web servers,
API clients, database drivers, and microservices. This deep dive explores the core concepts: event loops,
coroutines, tasks, futures, synchronization primitives, error handling, and real-world patterns.
By the end, you'll understand how to design efficient asynchronous systems and avoid common pitfalls.
The event loop is the central execution engine. It manages a queue of tasks, runs them until they await,
then switches to another ready task. You can get the current loop, run tasks until complete, and even
create custom loops (though rarely needed). In modern Python (3.7+), asyncio.run() handles
loop creation and cleanup for you.
import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Python 3.7+ recommended way
asyncio.run(say_hello())

# Manual loop management (older style)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(say_hello())
loop.close()
Prefer asyncio.run() for top-level entry points. It creates a new loop, sets it as the current loop, runs the coroutine, and cleans up resources.
A coroutine is defined with async def. It can suspend its execution using await.
When you call a coroutine, it returns a coroutine object; it does not start running until awaited or scheduled.
Coroutines are lightweight and can be composed.
async def fetch_data(url):
    print(f"Fetching {url}")
    await asyncio.sleep(0.5)  # simulate network
    return {"data": "sample"}

async def process():
    data = await fetch_data("https://api.example.com")
    print(f"Processed: {data}")

asyncio.run(process())
To run multiple coroutines concurrently, wrap them in asyncio.Task objects. Use
asyncio.create_task() to schedule a coroutine on the event loop. Tasks execute in the background
and can be awaited later. This is the primary way to achieve concurrency.
async def worker(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished after {delay}s")
    return f"Result from {name}"

async def main():
    task1 = asyncio.create_task(worker("A", 2))
    task2 = asyncio.create_task(worker("B", 1))
    # Wait for both
    results = await asyncio.gather(task1, task2)
    print(results)

asyncio.run(main())
asyncio.gather() returns the results once all tasks complete, in the order the tasks were passed in.
asyncio.wait() gives more control (e.g. return on first completion, timeouts).
A Future represents a result that will be available in the future. It's the low-level building block
that Task inherits from. You rarely create futures directly, but understanding them helps when
integrating with callback‑based libraries.
async def set_future(fut):
    await asyncio.sleep(1)
    fut.set_result("Done!")

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    asyncio.create_task(set_future(fut))
    result = await fut
    print(result)

asyncio.run(main())
If you call a blocking synchronous function (e.g., requests.get()) from a coroutine, it stalls the entire event loop.
Use asyncio.to_thread() (Python 3.9+) to run blocking I/O in a separate thread without blocking the loop; note that threads won't speed up CPU-bound work because of the GIL.
import time
import asyncio

def blocking_task(duration):
    time.sleep(duration)  # blocks its thread, not the event loop
    return f"Slept {duration}s"

async def main():
    result = await asyncio.to_thread(blocking_task, 2)
    print(result)

asyncio.run(main())
For CPU-bound work, use a concurrent.futures.ProcessPoolExecutor with loop.run_in_executor() instead.
Even in a single-threaded event loop, race conditions can occur when tasks share mutable state and interleave across await points.
Asyncio provides synchronization primitives for coordinating tasks: asyncio.Lock, asyncio.Event,
asyncio.Semaphore, and asyncio.Queue. Note that these are not thread-safe; they are designed for tasks within a single event loop.
async def worker(lock, shared_list, value):
    async with lock:
        # critical section
        shared_list.append(value)
        await asyncio.sleep(0.1)  # simulate work
        print(f"Appended {value}")

async def main():
    lock = asyncio.Lock()
    shared = []
    tasks = [asyncio.create_task(worker(lock, shared, i)) for i in range(5)]
    await asyncio.gather(*tasks)
    print(shared)

asyncio.run(main())
asyncio.Semaphore caps how many tasks can enter a section at once, which makes it a natural rate limiter:

async def fetch_url(sem, url):
    async with sem:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Data from {url}"

async def main():
    sem = asyncio.Semaphore(3)  # max 3 concurrent fetches
    urls = [f"http://example.com/{i}" for i in range(10)]
    tasks = [asyncio.create_task(fetch_url(sem, url)) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())
asyncio.Queue is an async queue for passing items between tasks (like the other primitives, it is not thread-safe). Use it to implement producer-consumer patterns,
where one task produces items and multiple consumer tasks process them concurrently.
async def producer(queue, n):
    for i in range(n):
        await queue.put(i)
        print(f"Produced {i}")
        await asyncio.sleep(0.2)

async def consumer(queue, name):
    while True:
        item = await queue.get()
        print(f"{name} consumed {item}")
        await asyncio.sleep(0.5)
        queue.task_done()

async def main():
    q = asyncio.Queue(maxsize=10)
    prod = asyncio.create_task(producer(q, 10))
    cons1 = asyncio.create_task(consumer(q, "A"))
    cons2 = asyncio.create_task(consumer(q, "B"))
    await prod
    await q.join()  # wait for all items processed
    # With multiple consumers, cancelling after join() is simpler
    # than juggling one sentinel value per consumer
    cons1.cancel()
    cons2.cancel()

asyncio.run(main())
Timeouts prevent hanging operations. Use asyncio.wait_for() or asyncio.timeout() (Python 3.11+).
You can also explicitly cancel tasks with task.cancel() and handle asyncio.CancelledError.
async def slow_operation():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2)
    except asyncio.TimeoutError:
        print("Operation timed out")
    # Python 3.11+ style
    try:
        async with asyncio.timeout(1):
            await asyncio.sleep(2)
    except TimeoutError:
        print("Again timed out")

asyncio.run(main())
Cancellation raises asyncio.CancelledError inside the coroutine.
Use try/finally to clean up resources.
Unhandled exceptions in tasks are propagated when you await them. If you never await a task, the exception
may be lost. Use asyncio.gather(return_exceptions=True) to collect exceptions without raising immediately.
async def might_fail(n):
    await asyncio.sleep(0.2)
    if n % 2 == 0:
        raise ValueError(f"Bad number {n}")
    return n

async def main():
    results = await asyncio.gather(
        might_fail(1), might_fail(2), might_fail(3),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Caught: {r}")
        else:
            print(f"Success: {r}")

asyncio.run(main())
Use async with for resources that need async setup/teardown, like network connections or file handles.
Define your own with __aenter__ and __aexit__.
class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource")
        await asyncio.sleep(0.1)

async def main():
    async with AsyncResource() as res:
        print("Using resource")

asyncio.run(main())
Async iterators allow you to async for over data that becomes available asynchronously.
Define them with __aiter__ and __anext__, or use async generators (async def with yield).
async def counter(start, stop):
    for i in range(start, stop):
        await asyncio.sleep(0.1)
        yield i

async def main():
    async for value in counter(0, 5):
        print(value)

asyncio.run(main())
Asyncio can be tricky to debug. Enable debug mode with asyncio.run(main(), debug=True) or by setting the environment variable
PYTHONASYNCIODEBUG=1. It enables slow callback detection, resource warnings, and more verbose errors.
Use asyncio.current_task() and asyncio.all_tasks() to inspect running tasks.
import asyncio

async def main():
    print(f"Current task: {asyncio.current_task()}")
    tasks = asyncio.all_tasks()
    print(f"All tasks: {tasks}")

asyncio.run(main(), debug=True)
For mixed workloads, use loop.run_in_executor(). The default executor uses threads;
you can also pass concurrent.futures.ProcessPoolExecutor() for CPU-bound tasks.
import asyncio
import concurrent.futures

def cpu_bound(n):
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound, 10_000_000)
        print(result)

asyncio.run(main())
Combine semaphores (rate limiting), queues, and error handling to build a robust scraper.
import aiohttp
import asyncio

async def fetch(session, url, sem):
    async with sem:
        try:
            async with session.get(url) as resp:
                return await resp.text()
        except Exception as e:
            return f"Error: {e}"

async def scrape(urls, max_concurrent=5):
    sem = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, sem) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

urls = ["https://example.com" for _ in range(20)]
results = asyncio.run(scrape(urls))
print(f"Fetched {len([r for r in results if not isinstance(r, Exception)])} pages")
A few performance tips: use asyncio.gather() for many small I/O tasks; never call blocking functions (e.g. time.sleep) inside a coroutine – use asyncio.sleep instead; and consider uvloop, a drop-in replacement for the default event loop, for ultra-high performance. Install it (pip install uvloop) and call
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) for a 2x-4x speedup in high-concurrency scenarios.
Python's asyncio is a paradigm shift from traditional threading. By understanding the event loop, coroutines, tasks, and synchronization primitives, you can write highly concurrent I/O‑bound applications with elegance and efficiency. Start small, use the built‑in high‑level APIs, and gradually adopt patterns like queues and semaphores. Debug with the built‑in tools, and always measure performance. Asyncio is not just a library; it's a mindset – embrace non‑blocking, cooperative multitasking, and your applications will scale gracefully.
The future of Python concurrency is asynchronous. Whether you're building web servers (FastAPI, aiohttp),
database drivers, or real‑time data pipelines, asyncio will be at the core. Dive deeper by exploring
libraries like anyio, trio, and httpx.