Async Python: Because Waiting Sucks
I was working on a project that involved a large number of LLM API calls. During the PoC phase, the implementation was sequential and simple. Roughly speaking, 1k LLM calls took about an hour, around 4 seconds/request.
At that point, no one really complained. It was a PoC, the focus was correctness.
Things changed once we started thinking about production. An hour for 1k calls wasn’t just slow, it limited how the system could be used. Reducing the time per request became one of the problems we had to solve.
There were a few directions to explore: parallelizing requests, batching calls, and so on. During one of these discussions, an SME suggested trying async.
I hadn’t worked with async before. I’d used multiprocessing in the past. It took some time to understand how async actually behaves, and where it fits. Once that clicked, the changes themselves were surprisingly small, and the impact was hard to ignore. I got our pipeline down to about 10 minutes for those same 1k calls.
This post covers the why and how: why async over other options, the core syntax, how to actually run tasks concurrently, and then some gotchas and advanced patterns I picked up along the way.
Why Async and Not Multiprocessing or Threading
Before writing any async code, I profiled what we had. About 90% of the runtime was wall time: the CPU just sitting there, waiting for API responses, doing absolutely nothing.
That's when it clicked.
My code wasn't slow because it was doing too much work. It was slow because it was waiting too much. And it was waiting one thing at a time, like a person who won't start boiling water until they've finished chopping vegetables.
The rule of thumb I landed on:
- If your code is mostly waiting → use async
- If your code is mostly computing → use multiprocessing
Async isn't true parallelism. It's concurrency. While waiting for one API response, Python fires off another request. When the first one comes back, it picks up where it left off. One worker, many tasks, smart switching.
Parallelism: Multiple workers doing tasks simultaneously (multiprocessing)
Concurrency: One worker switching between tasks during wait times (async)
Threading is also an option for I/O-bound work, but async handles thousands of concurrent waits on a single thread with far less overhead, and the libraries I needed (HTTP clients, LLM SDKs) have first-class async support. So that's what I went with.
The Core Syntax: async, await, and asyncio.run()
The syntax is minimal. You're basically telling Python two things:
- This function involves waiting → mark it with `async def`
- This line is where the waiting happens → put `await` in front of it
```python
import asyncio

async def do_something():
    print("Starting task")
    await asyncio.sleep(1)  # "wait here, go do other stuff if you have it"
    print("Task done")
    return "result"

# To call this from regular (non-async) code:
asyncio.run(do_something())
```

Output:

```
Starting task
Task done
```
That's the skeleton. async def marks the function, await marks the wait point, asyncio.run() kicks it off from synchronous code.
Nothing fancy yet. The real benefit comes when you have multiple tasks.
Running Multiple Tasks Concurrently
A single async function doesn't give you any speedup. The magic happens when you have multiple tasks that can wait at the same time.
You create a list of tasks and use asyncio.gather() to run them concurrently:
```python
import asyncio
import time

async def task(name, duration):
    print(f"Task {name}: starting")
    await asyncio.sleep(duration)
    print(f"Task {name}: done")
    return f"Result from {name}"

async def main():
    start = time.perf_counter()
    tasks = [
        task("A", 2),
        task("B", 2),
        task("C", 2)
    ]
    results = await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"\nAll done in {elapsed:.2f} seconds")
    print(f"Results: {results}")

asyncio.run(main())
```

Output:

```
Task A: starting
Task B: starting
Task C: starting
Task A: done
Task B: done
Task C: done

All done in 2.00 seconds
Results: ['Result from A', 'Result from B', 'Result from C']
```
Three 2-second tasks. Finished in 2 seconds total. They started together, waited together, finished together.
Now compare that to awaiting each task one by one:
```python
async def main_sequential():
    start = time.perf_counter()
    result_a = await task("A", 2)
    result_b = await task("B", 2)
    result_c = await task("C", 2)
    elapsed = time.perf_counter() - start
    print(f"\nAll done in {elapsed:.2f} seconds")

asyncio.run(main_sequential())
```

Output:

```
Task A: starting
Task A: done
Task B: starting
Task B: done
Task C: starting
Task C: done

All done in 6.00 seconds
```
6 seconds. No concurrency. When you await one thing at a time, Python has nothing to switch to while waiting.
The task list + gather() is what makes concurrency actually happen.
Not Everything Works with Await
This part confused me at first.
You can't just slap await in front of any function and expect it to become concurrent. The function has to be async-compatible: it needs to know how to yield control back to the event loop while it waits.
Regular blocking functions don't do this. They just... block.
```python
import asyncio
import time

async def blocking_task(name, duration):
    print(f"Task {name}: starting")
    time.sleep(duration)  # regular sleep - blocks everything
    print(f"Task {name}: done")

async def main():
    start = time.perf_counter()
    tasks = [
        blocking_task("A", 2),
        blocking_task("B", 2),
        blocking_task("C", 2)
    ]
    await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"Done in {elapsed:.2f} seconds")

asyncio.run(main())
```

Output:

```
Task A: starting
Task A: done
Task B: starting
Task B: done
Task C: starting
Task C: done
Done in 6.00 seconds
```
6 seconds again. time.sleep() doesn't know how to tell async "go do something else while I wait." It just holds onto the thread.
The fix: use async-compatible alternatives.
| Blocking | Async Alternative |
|---|---|
| time.sleep() | asyncio.sleep() |
| requests | aiohttp, httpx |
| OpenAI SDK | openai.AsyncOpenAI |
For most common I/O operations, there's an async version available. For LLM calls specifically, OpenAI's SDK has AsyncOpenAI which works great.
What if there's no async alternative?
Sometimes you're stuck with a blocking function that doesn't have an async version. In that case, you can wrap it with asyncio.to_thread():
```python
import asyncio
import time

async def wrapped_task(name, duration):
    print(f"Task {name}: starting")
    await asyncio.to_thread(time.sleep, duration)  # runs in separate thread
    print(f"Task {name}: done")

async def main():
    start = time.perf_counter()
    tasks = [
        wrapped_task("A", 2),
        wrapped_task("B", 2),
        wrapped_task("C", 2)
    ]
    await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"Done in {elapsed:.2f} seconds")

asyncio.run(main())
```

Output:

```
Task A: starting
Task B: starting
Task C: starting
Task A: done
Task B: done
Task C: done
Done in 2.00 seconds
```
Back to 2 seconds. to_thread() offloads the blocking call to a separate thread, freeing up the event loop. It's a good escape hatch when you can't find an async-native library.
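One caveat: asyncio.to_thread() was only added in Python 3.9. On older versions, the same effect is available through loop.run_in_executor(), which is what to_thread() wraps internally. A rough equivalent (task names are mine):

```python
import asyncio
import functools
import time

async def wrapped_task(name, duration):
    loop = asyncio.get_running_loop()
    # None means "use the default ThreadPoolExecutor";
    # functools.partial binds the argument, since run_in_executor
    # takes the callable and its args separately.
    await loop.run_in_executor(None, functools.partial(time.sleep, duration))
    return f"Task {name}: done"

async def main():
    return await asyncio.gather(*(wrapped_task(n, 0.1) for n in "ABC"))

print(asyncio.run(main()))
```

If you're on 3.9+, prefer to_thread(); it's the same mechanism with less ceremony.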
That covers the main stuff: why async, the syntax, running tasks concurrently, and making sure your functions are actually async-compatible.
What follows are some gotchas I ran into and a more advanced pattern that's useful when you're dealing with rate limits.
Notebooks and Scripts Behave Differently
This one caused me some confusion.
In a .py script:

```python
asyncio.run(main())
```

In a Jupyter notebook:

```python
await main()
```
The reason: Jupyter already has an event loop running in the background. Scripts don't. If you try asyncio.run() inside a notebook, you'll get an error saying the event loop is already running.
Not a big deal once you know, but it's easy to get stuck on this if you're switching between environments.
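If you need code that runs in both environments, one pattern is to probe for a running loop before calling asyncio.run(). A sketch (the main() here is a stand-in for your real entry point):

```python
import asyncio

async def main():
    await asyncio.sleep(0)
    return "done"

try:
    # Raises RuntimeError if no event loop is currently running.
    asyncio.get_running_loop()
    loop_running = True   # Jupyter-style environment: use `await main()`
except RuntimeError:
    loop_running = False  # plain script: asyncio.run() is safe

if not loop_running:
    print(asyncio.run(main()))  # done
```

In a script this prints "done"; in a notebook it falls through to the `await` path instead.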
Forgetting Await Fails Silently
If you call an async function without await, Python doesn't throw an error. It just returns a coroutine object and moves on. Your function never actually runs.
```python
async def fetch_data():
    await asyncio.sleep(1)
    return "data"

# Wrong - returns coroutine object, function never executes
result = fetch_data()
print(result)  # <coroutine object fetch_data at 0x...>

# Right (inside an async function, or at the top level of a notebook)
result = await fetch_data()
print(result)  # "data"
```
I've debugged this more times than I'd like to admit. If your async code seems to be doing nothing, check for missing awaits.
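If you suspect this bug, asyncio.iscoroutine() gives a quick check: a forgotten await leaves you holding a coroutine object instead of a value. A small sketch:

```python
import asyncio

async def fetch_data():
    await asyncio.sleep(0)
    return "data"

result = fetch_data()               # missing await
print(asyncio.iscoroutine(result))  # True - nothing has actually run
result.close()                      # silence the "never awaited" warning

async def main():
    return await fetch_data()       # with await, we get the real value

print(asyncio.run(main()))  # data
```

CPython also emits a RuntimeWarning ("coroutine ... was never awaited") when the forgotten coroutine is garbage-collected, which is worth watching for in your logs.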
Controlling Concurrency with Semaphores
Once you've got async working, you might run into a different problem: too much concurrency.
Say you have 1000 API calls to make, but the API only allows 10 requests per second. If you fire all 1000 at once, you'll hit rate limits immediately.
Semaphores let you cap how many tasks can run at the same time:
```python
import asyncio
import time

async def rate_limited_task(name, semaphore):
    async with semaphore:  # waits here if too many tasks are already running
        print(f"Task {name}: starting")
        await asyncio.sleep(1)
        print(f"Task {name}: done")

async def main():
    semaphore = asyncio.Semaphore(3)  # max 3 concurrent
    start = time.perf_counter()
    tasks = [rate_limited_task(i, semaphore) for i in range(9)]
    await asyncio.gather(*tasks)
    elapsed = time.perf_counter() - start
    print(f"\nDone in {elapsed:.2f} seconds")

asyncio.run(main())
```

Output:

```
Task 0: starting
Task 1: starting
Task 2: starting
Task 0: done
Task 1: done
Task 2: done
Task 3: starting
Task 4: starting
Task 5: starting
Task 3: done
Task 4: done
Task 5: done
Task 6: starting
Task 7: starting
Task 8: starting
Task 6: done
Task 7: done
Task 8: done

Done in 3.00 seconds
```
9 tasks, 3 at a time, 1 second each = 3 seconds total.
Without the semaphore, it would finish in ~1 second but you'd probably get rate limited or banned. With sequential code, it would take 9 seconds. Semaphores give you a middle ground.
You can also use multiple semaphores in the same codebase if you're calling different APIs with different rate limits.
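If several functions need the same limit, one option is a small generic wrapper around the semaphore instead of repeating the async with block everywhere. A sketch (bounded and call_api are names I made up, not library functions):

```python
import asyncio

async def call_api(i):
    # stand-in for a real API call
    await asyncio.sleep(0.01)
    return f"response {i}"

async def bounded(semaphore, coro):
    # Generic wrapper: acquire the semaphore, then await the wrapped call.
    # The coroutine object is created by the caller but only runs here.
    async with semaphore:
        return await coro

async def main():
    sem = asyncio.Semaphore(3)
    tasks = [bounded(sem, call_api(i)) for i in range(9)]
    return await asyncio.gather(*tasks)

print(asyncio.run(main()))
```

This keeps the rate-limiting concern out of the task functions themselves, so the same wrapper works for any API call you throw at it.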
Quick Reference
| Want to... | Do this |
|---|---|
| Mark a function as async | async def func(): |
| Wait for an async call | await func() |
| Run async from sync code (script) | asyncio.run(func()) |
| Run async in Jupyter | await func() |
| Run multiple tasks concurrently | await asyncio.gather(*tasks) |
| Limit concurrent tasks | asyncio.Semaphore(n) |
| Make blocking function async-compatible | await asyncio.to_thread(func, args) |
When to Use What
| Situation | Use |
|---|---|
| Mostly waiting (API calls, I/O) | asyncio |
| Mostly computing (data processing) | multiprocessing |
| Simple script, no performance issues | Regular Python |
That's what I learned getting async into production. The core is simple: async, await, gather(). The rest is just knowing the gotchas and when to reach for something like semaphores.
Once it clicks, it's hard to go back to writing sequential I/O code.
Thoughts? Questions?
If something here sparked a thought, or if you have feedback or questions, I'd love to hear from you.