Perfect 👌 Let's make this the ultimate "Asyncio in Python" guide, explained the way a teacher would in an interactive class: from basics → practical → real industry usage → interview prep.
🧠 1. What is asyncio?
Think of Python code as a line of people at a single counter (like in a bank).
Each person (task) must wait for the previous one to finish — this is synchronous programming.
Now imagine a single clerk who, while one customer is busy filling out a form, starts serving the next customer instead of standing idle. Nobody is served faster individually, but the whole line moves much faster.
That’s asynchronous programming, and asyncio is the framework that makes this possible in Python.
✅ Definition:
asyncio is a Python standard-library module for writing concurrent (non-blocking) code using async/await syntax.
It helps run I/O-bound tasks efficiently — like:
- Downloading multiple files
- Calling multiple APIs
- Reading from databases or web sockets
- Handling multiple users in servers (FastAPI, aiohttp, etc.)
⚙️ 2. The Key Idea
Normally, Python waits for each function to finish before going to the next.
But with asyncio, we can say:
“When one task is waiting (for network, disk, or database), let another task run.”
This uses an event loop — a manager that keeps track of all tasks and switches between them efficiently.
🪄 3. Basic Example: Without and With asyncio
🐢 Normal (Synchronous) version
```python
import time

def download_file(name):
    print(f"Downloading {name}...")
    time.sleep(2)  # blocking: nothing else can run during this wait
    print(f"Finished {name}")

def main():
    download_file("File1")
    download_file("File2")
    download_file("File3")

main()
```
⏱️ Takes 6 seconds total (2 + 2 + 2)
⚡ Asynchronous version
```python
import asyncio

async def download_file(name):
    print(f"Downloading {name}...")
    await asyncio.sleep(2)  # non-blocking: the event loop runs other tasks meanwhile
    print(f"Finished {name}")

async def main():
    tasks = [
        asyncio.create_task(download_file("File1")),
        asyncio.create_task(download_file("File2")),
        asyncio.create_task(download_file("File3")),
    ]
    await asyncio.gather(*tasks)

asyncio.run(main())
```
⏱️ Takes only ~2 seconds total!
That's because the three tasks wait at the same time: while one is paused on await asyncio.sleep() (a non-blocking wait), the event loop runs the others.
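To check the timing claim yourself, here is a minimal sketch that times both styles. It assumes shorter delays (0.2 s instead of 2 s) so it runs quickly; fake_download, run_sequential, run_concurrent and timed are illustrative names, not part of asyncio.

```python
import asyncio
import time

async def fake_download(name: str, delay: float) -> str:
    # Non-blocking wait: yields control to the event loop
    await asyncio.sleep(delay)
    return name

async def run_sequential(names, delay):
    # Each await finishes before the next download starts
    return [await fake_download(n, delay) for n in names]

async def run_concurrent(names, delay):
    # gather() wraps each coroutine in a task, so the waits overlap
    return await asyncio.gather(*(fake_download(n, delay) for n in names))

def timed(coro_fn, names, delay):
    start = time.perf_counter()
    result = asyncio.run(coro_fn(names, delay))
    return result, time.perf_counter() - start

if __name__ == "__main__":
    files = ["File1", "File2", "File3"]
    _, seq = timed(run_sequential, files, 0.2)
    _, conc = timed(run_concurrent, files, 0.2)
    print(f"sequential: {seq:.2f}s, concurrent: {conc:.2f}s")
```

Expect the sequential run to take roughly three times the delay and the concurrent run roughly one delay.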
🔍 4. Important Keywords
| Keyword | Meaning |
|---|---|
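| async def | Declares an asynchronous function (coroutine function) |
| await | Suspends the coroutine until the awaited object finishes, letting other tasks run meanwhile |
| asyncio.run() | Creates an event loop, runs the given coroutine to completion, then closes the loop |
| asyncio.create_task() | Schedules a coroutine to run concurrently as a Task |
| asyncio.gather() | Runs multiple awaitables concurrently and waits for all of them |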
💼 5. Real-World Use Cases
🔹 Web Scraping / API Calls
```python
import aiohttp
import asyncio

async def fetch_data(url, session):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["https://example.com", "https://python.org", "https://github.com"]
    # one shared session reuses connections across requests
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch_data(url, session) for url in urls))
        print([len(r) for r in results])

asyncio.run(main())
```
Result: Fetches all URLs at once — much faster than sequential requests.get() calls.
🔹 Database Access (e.g., asyncpg for PostgreSQL)
```python
import asyncio
import asyncpg

async def fetch_users():
    conn = await asyncpg.connect(user='user', password='pass', database='db', host='localhost')
    try:
        rows = await conn.fetch('SELECT * FROM users')
    finally:
        await conn.close()  # always release the connection, even on errors
    return rows

rows = asyncio.run(fetch_users())
print(len(rows), "users")
```
Used in FastAPI and async microservices to handle many DB calls without blocking.
🔹 Chat / WebSocket Server (aiohttp)
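Handles thousands of simultaneous users efficiently. The minimal server below is a plain HTTP endpoint; aiohttp's WebSocket support (web.WebSocketResponse) builds on the same async handler model.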
```python
from aiohttp import web
import asyncio

async def handle(request):
    await asyncio.sleep(1)  # simulated I/O; other requests are served meanwhile
    return web.Response(text="Hello async world!")

app = web.Application()
app.add_routes([web.get('/', handle)])
web.run_app(app)
```
🏭 6. How It’s Used in Industry
| Industry Use Case | Example Library | Description |
|---|---|---|
| Web APIs | FastAPI, aiohttp | Non-blocking API endpoints |
| Data Pipelines | Async ETL, Airbyte | Concurrent data fetching & transformation |
| Chat / Notification systems | WebSockets, aiohttp | Handle thousands of live users |
| Cloud and Microservices | Async microservices | Efficient inter-service calls |
| ML Serving | FastAPI async endpoints | Serve models faster by overlapping I/O |
🧩 7. Mixing Sync + Async
Sometimes you need to call a blocking function (like a synchronous library call or file I/O) from async code.
Use asyncio.to_thread() (Python 3.9+) to offload it to a worker thread so the event loop keeps running:
```python
import asyncio
import time

def heavy_task():
    time.sleep(3)  # blocking call; runs in a worker thread, not on the event loop
    return "Done"

async def main():
    result = await asyncio.to_thread(heavy_task)
    print(result)

asyncio.run(main())
```
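One way to see that the offloaded call does not freeze the event loop is to run it next to a coroutine that keeps ticking. This sketch assumes Python 3.9+ (where asyncio.to_thread() exists); blocking_io and ticker are illustrative names.

```python
import asyncio
import time

def blocking_io(seconds: float) -> str:
    # Stands in for a blocking call (e.g. a sync HTTP client or file read)
    time.sleep(seconds)
    return "Done"

async def ticker(count: int, interval: float) -> int:
    # Keeps running on the event loop while the worker thread is busy
    ticks = 0
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks += 1
    return ticks

async def main():
    start = time.perf_counter()
    result, ticks = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.3),
        ticker(3, 0.1),
    )
    return result, ticks, time.perf_counter() - start

if __name__ == "__main__":
    print(asyncio.run(main()))
```

If blocking_io were called directly instead of through asyncio.to_thread(), the ticker could not advance until it returned, and the total time would be roughly the sum of both instead of the maximum.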
🚀 8. When NOT to Use asyncio
❌ Not good for CPU-bound tasks (e.g., ML training, image processing).
✅ Use multiprocessing or job queues instead.
✅ Asyncio shines for I/O-bound tasks — API calls, DB access, networking.
💬 9. Interactive Analogy (Easy Way to Remember)
| Scenario | Type |
|---|---|
| You cook pasta 🍝 and wait at the stove till it boils | Synchronous |
| You start boiling pasta, then chop vegetables 🥕 and prepare sauce while waiting | Asynchronous |
That’s exactly how asyncio helps your program multitask efficiently.
🎯 10. Top Interview Questions on asyncio
| 🔢 | Question | Quick Hint / Answer |
|---|---|---|
| 1 | What is asyncio in Python? | A library for asynchronous, non-blocking I/O. |
| 2 | Difference between threading and asyncio? | Threading uses OS threads; asyncio uses a single thread with cooperative multitasking. |
| 3 | What is an event loop? | The central scheduler that runs async tasks. |
| 4 | What is a coroutine? | A special async function that can pause/resume with await. |
| 5 | How do you run multiple async tasks concurrently? | asyncio.gather() or asyncio.create_task(). |
| 6 | Can you use await in a normal function? | No, must be inside an async def. |
| 7 | What happens if you block inside an async function? | The event loop freezes; use await asyncio.sleep() instead of time.sleep(). |
| 8 | How do you handle exceptions in async tasks? | Wrap in try/except, or use return_exceptions=True in asyncio.gather(). |
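For question 8, here is a small sketch of return_exceptions=True: failures come back as items in the result list instead of propagating out of gather(). might_fail is an illustrative coroutine, not a library function.

```python
import asyncio

async def might_fail(n: int) -> int:
    await asyncio.sleep(0.01)
    if n == 2:
        raise ValueError(f"bad input {n}")
    return n * 10

async def main() -> list:
    # With return_exceptions=True, exceptions are returned as list items
    # in the same position as the coroutine that raised them
    results = await asyncio.gather(
        *(might_fail(n) for n in range(4)), return_exceptions=True
    )
    for n, r in enumerate(results):
        if isinstance(r, Exception):
            print(f"task {n} failed: {r!r}")
    return results

if __name__ == "__main__":
    asyncio.run(main())
```

Without return_exceptions=True, the first ValueError would be raised from the await asyncio.gather(...) line, and the successful results would not be returned.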
| 9 | Difference between concurrency and parallelism? | Concurrency = many tasks taking turns; Parallelism = many tasks literally running at once. |
| 10 | Give real-world use cases of asyncio. | Web servers, web scraping, async DB queries, chatbots. |
🧭 11. Recommended Next Steps to Master asyncio
- ✅ Practice writing async versions of your existing synchronous scripts.
- ⚙️ Use aiohttp for concurrent web requests.
- 🧩 Try FastAPI with async endpoints.
- 📊 Combine with async database drivers like asyncpg.
- 💬 Explore asyncio.Queue for producer-consumer patterns and asyncio.Semaphore for bounded concurrency.
- 🧠 Build a mini async project, like a concurrent weather fetcher or chatbot API.
Awesome — let’s build the Async Weather API Fetcher end-to-end: clear steps, runnable code, exercises, and next steps so you can practice everything asyncio brings to the table.
I’ll give you:
- Project setup (requirements + env)
- Simple single-file async fetcher (aiohttp + asyncio)
- Concurrency control (Semaphore + rate limiting)
- Producer-consumer with asyncio.Queue
- Retry & error handling
- Caching and writing results to disk (and an example of saving to Postgres with asyncpg)
- FastAPI async endpoint that uses the fetcher
- Tests with pytest-asyncio
- Exercises, improvements, and the Recommended Next Steps you asked for
1. Quick setup
Create a project folder and virtualenv:
mkdir async-weather
cd async-weather
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
requirements.txt
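```text
aiohttp==3.8.5
python-dotenv==1.0.0
fastapi==0.95.2
uvicorn==0.22.0
asyncpg==0.29.0  # optional: async Postgres client
pytest==7.4.0
pytest-asyncio==0.22.0
```
asyncio ships with Python's standard library, so it is deliberately not listed: adding "asyncio" here would make pip install an outdated third-party package from PyPI that can shadow the built-in module.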
Install:
pip install -r requirements.txt
Get an API key from a weather API (e.g., OpenWeatherMap). Put it in .env:
OPENWEATHER_API_KEY=your_key_here
(If you prefer another provider, adapt the URL.)
2. Simple async fetcher — the heart of the project
weather_fetcher.py — a simple async fetcher that queries OpenWeatherMap for multiple cities concurrently.
```python
# weather_fetcher.py
import asyncio
import os
from typing import List, Dict, Any

from aiohttp import ClientSession, ClientTimeout
from dotenv import load_dotenv

load_dotenv()
API_KEY = os.getenv("OPENWEATHER_API_KEY", "")  # empty key -> the API answers 401
BASE_URL = "https://api.openweathermap.org/data/2.5/weather"

async def fetch_city_weather(session: ClientSession, city: str) -> Dict[str, Any]:
    params = {"q": city, "appid": API_KEY, "units": "metric"}
    async with session.get(BASE_URL, params=params) as resp:
        resp.raise_for_status()  # turn 4xx/5xx into an exception
        data = await resp.json()
        return {"city": city, "data": data}

async def fetch_all(cities: List[str]) -> List[Dict[str, Any]]:
    timeout = ClientTimeout(total=10)
    async with ClientSession(timeout=timeout) as session:
        tasks = [asyncio.create_task(fetch_city_weather(session, c)) for c in cities]
        results = await asyncio.gather(*tasks, return_exceptions=False)
        return results

if __name__ == "__main__":
    cities = ["London", "Paris", "New Delhi", "Bengaluru", "Tokyo"]
    results = asyncio.run(fetch_all(cities))
    for r in results:
        print(r["city"], "->", r["data"]["weather"][0]["description"])
```
Notes:
- asyncio.create_task() schedules coroutines to run concurrently.
- aiohttp.ClientSession is reused across requests for efficient connection pooling.
- asyncio.run() starts the event loop.
Run:
python weather_fetcher.py
You should see descriptions for each city. If you don’t have a key, the API will return a 401 — later we add a mock fallback.
3. Concurrency control: asyncio.Semaphore for rate limits
APIs often restrict requests per second. Use a Semaphore to limit concurrent requests.
```python
# fetch_with_semaphore.py (expands on the previous file)
import asyncio
from weather_fetcher import fetch_city_weather

SEM = asyncio.Semaphore(5)  # allow at most 5 concurrent requests

async def fetch_with_limit(session, city):
    async with SEM:  # waits here while 5 requests are already in flight
        return await fetch_city_weather(session, city)
```
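A Semaphore caps how many requests are in flight at once, not how many start per second. If the provider allows 10 requests/sec, combine the Semaphore with asyncio.sleep() spacing between request starts (or between batches).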
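A minimal sketch of that spacing, assuming a simple "at most one start per interval" policy rather than a full token bucket. MinIntervalLimiter and limited_call are hypothetical helpers, and asyncio.sleep(0.05) stands in for the real HTTP call.

```python
import asyncio
import time

class MinIntervalLimiter:
    """Hypothetical helper: lets at most one request start per `interval` seconds."""

    def __init__(self, rate_per_sec: float):
        self.interval = 1.0 / rate_per_sec
        self._lock = asyncio.Lock()
        self._next_start = 0.0

    async def wait(self) -> None:
        # The lock serializes slot reservation, so starts happen in order
        async with self._lock:
            loop = asyncio.get_running_loop()
            now = loop.time()
            if self._next_start > now:
                await asyncio.sleep(self._next_start - now)
                now = loop.time()
            self._next_start = now + self.interval

async def limited_call(limiter, sem, i, starts):
    async with sem:              # caps requests in flight
        await limiter.wait()     # spaces out request starts
        starts.append(time.perf_counter())
        await asyncio.sleep(0.05)  # stand-in for the real HTTP call
        return i

async def main(n: int = 5, rate: float = 10.0):
    limiter = MinIntervalLimiter(rate)
    sem = asyncio.Semaphore(5)
    starts: list = []
    results = await asyncio.gather(*(limited_call(limiter, sem, i, starts) for i in range(n)))
    return results, starts

if __name__ == "__main__":
    results, starts = asyncio.run(main())
    print([round(b - a, 3) for a, b in zip(starts, starts[1:])])
```

The two primitives answer different questions: the Semaphore limits concurrency, the limiter limits start rate. Libraries such as aiolimiter (mentioned in the exercises) package the rate part for you.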
4. Producer-consumer pattern with asyncio.Queue
Useful if you want a set of producers (e.g., reading city list from a file) and a pool of worker consumers that fetch.
```python
# queue_worker.py
import asyncio
from aiohttp import ClientSession, ClientTimeout
from weather_fetcher import fetch_city_weather

async def worker(name: int, queue: asyncio.Queue, session: ClientSession, results: list):
    while True:
        city = await queue.get()
        if city is None:  # shutdown sentinel
            queue.task_done()
            break
        try:
            r = await fetch_city_weather(session, city)
            results.append(r)
        except Exception as e:
            print(f"Worker {name} failed for {city}: {e}")
        finally:
            queue.task_done()

async def run_queue(cities):
    q = asyncio.Queue()
    for c in cities:
        await q.put(c)
    num_workers = 3
    timeout = ClientTimeout(total=10)
    results = []
    async with ClientSession(timeout=timeout) as session:
        workers = [asyncio.create_task(worker(i, q, session, results)) for i in range(num_workers)]
        # add one None sentinel per worker so each stops after the real work
        for _ in range(num_workers):
            await q.put(None)
        await q.join()
        # every worker has consumed its sentinel and returned; gather surfaces any crash
        await asyncio.gather(*workers)
    return results
```
This pattern cleanly separates producers (putting items into queue) from consumers (workers processing).
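The run_queue() above fills the queue up front with no size limit. When the producer is slow or the input is large (for example, streaming city names from a file), a bounded queue applies backpressure: put() waits while the queue is full. This sketch assumes a fake fetch so it runs without the network; producer, consumer, run and fake_fetch are hypothetical names.

```python
import asyncio
import random

async def fake_fetch(city: str) -> dict:
    # Stand-in for fetch_city_weather(); no network needed
    await asyncio.sleep(random.uniform(0.01, 0.05))
    return {"city": city}

async def producer(queue: asyncio.Queue, cities, num_workers: int) -> None:
    for city in cities:
        await queue.put(city)   # waits while the queue is full (backpressure)
    for _ in range(num_workers):
        await queue.put(None)   # one shutdown sentinel per worker

async def consumer(queue: asyncio.Queue, results: list) -> None:
    while True:
        city = await queue.get()
        try:
            if city is None:
                return
            results.append(await fake_fetch(city))
        finally:
            queue.task_done()   # runs for real items and for the sentinel

async def run(cities, num_workers: int = 3, maxsize: int = 2) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    results: list = []
    # start consumers first so the bounded queue can drain while we produce
    workers = [asyncio.create_task(consumer(queue, results)) for _ in range(num_workers)]
    await producer(queue, cities, num_workers)
    await asyncio.gather(*workers)  # workers exit after reading their sentinel
    return results

if __name__ == "__main__":
    print(asyncio.run(run([f"City{i}" for i in range(10)])))
```

Results arrive in completion order, not input order, because each worker appends as soon as its fetch finishes.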
5. Retry logic & error handling
Network calls fail. Use exponential backoff retries.
```python
# with_retries.py
import asyncio
import random

from aiohttp import ClientError
from weather_fetcher import fetch_city_weather

async def fetch_with_retries(session, city, retries=3, backoff_factor=0.5):
    delay = backoff_factor
    for attempt in range(1, retries + 1):
        try:
            return await fetch_city_weather(session, city)
        except (ClientError, asyncio.TimeoutError):
            if attempt == retries:
                raise  # out of attempts: let the caller see the error
            # exponential backoff with a little random jitter
            await asyncio.sleep(delay + random.random() * 0.1)
            delay *= 2
```
Wrap the call in try/except to log errors instead of crashing the whole run.
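To exercise the retry loop without a flaky network, here is a sketch with a fake fetch that fails a set number of times. TransientError stands in for aiohttp.ClientError / asyncio.TimeoutError, and retry_async and make_flaky are hypothetical helpers. One design point: retry_async takes a function that creates a fresh coroutine per attempt, because a coroutine object can only be awaited once.

```python
import asyncio
import random

class TransientError(Exception):
    """Hypothetical stand-in for aiohttp.ClientError / asyncio.TimeoutError."""

async def retry_async(make_call, retries: int = 3, backoff_factor: float = 0.05):
    # make_call is a zero-argument function returning a NEW coroutine each time
    delay = backoff_factor
    for attempt in range(1, retries + 1):
        try:
            return await make_call()
        except TransientError as exc:
            if attempt == retries:
                raise
            print(f"attempt {attempt} failed ({exc}); retrying in ~{delay:.2f}s")
            await asyncio.sleep(delay + random.random() * 0.01)  # backoff + jitter
            delay *= 2

def make_flaky(fail_times: int):
    calls = {"n": 0}

    async def flaky_fetch():
        calls["n"] += 1
        if calls["n"] <= fail_times:
            raise TransientError(f"simulated failure #{calls['n']}")
        return "ok"

    return flaky_fetch, calls

if __name__ == "__main__":
    fetch, calls = make_flaky(2)
    print(asyncio.run(retry_async(fetch)), "after", calls["n"], "calls")
```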
6. Simple in-memory caching (avoid repeated calls) & saving results
A naive cache using dict. For production use Redis / memcached.
```python
# caching.py
import time

from weather_fetcher import fetch_city_weather

CACHE = {}  # { "City": (timestamp, result) }
TTL = 60 * 10  # 10 minutes

def get_cached(city):
    entry = CACHE.get(city)
    if not entry:
        return None
    ts, result = entry
    if time.time() - ts > TTL:
        del CACHE[city]  # expired
        return None
    return result

def set_cache(city, result):
    CACHE[city] = (time.time(), result)

# integrate into fetch:
async def fetch_cached(session, city):
    cached = get_cached(city)
    if cached is not None:
        return {"city": city, "data": cached, "cached": True}
    r = await fetch_city_weather(session, city)
    set_cache(city, r["data"])
    return {"city": city, "data": r["data"], "cached": False}
```
Save results to JSON file asynchronously (writing file is blocking; use asyncio.to_thread to offload):
```python
import json
import asyncio

async def save_results_to_file(results, path="results.json"):
    def write():
        with open(path, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
    await asyncio.to_thread(write)  # the blocking write runs in a worker thread
```
7. Save to Postgres using asyncpg (example)
Create table:
```sql
CREATE TABLE weather_cache (
    city TEXT PRIMARY KEY,
    fetched_at TIMESTAMP,
    payload JSONB
);
```
Insert/update asynchronously:
```python
# db_save.py
import asyncpg
import datetime
import json

DSN = "postgresql://user:password@localhost:5432/weatherdb"

async def upsert_weather(conn, city, payload):
    await conn.execute(
        """
        INSERT INTO weather_cache(city, fetched_at, payload)
        VALUES($1, $2, $3)
        ON CONFLICT (city) DO UPDATE
        SET fetched_at = excluded.fetched_at, payload = excluded.payload
        """,
        city, datetime.datetime.utcnow(), json.dumps(payload)
    )

async def save_many(records):
    conn = await asyncpg.connect(DSN)
    try:
        for r in records:
            await upsert_weather(conn, r["city"], r["data"])
    finally:
        await conn.close()
```
Call await save_many(results) after fetching. In high-throughput systems, use a connection pool: asyncpg.create_pool().
8. Build a FastAPI endpoint that uses the async fetcher
app.py
```python
# app.py
from typing import List

from fastapi import FastAPI, HTTPException
from weather_fetcher import fetch_all  # import from earlier

app = FastAPI()

@app.post("/fetch")
async def fetch_cities(cities: List[str]):
    try:
        results = await fetch_all(cities)
        return {"count": len(results), "results": results}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```
Run:
uvicorn app:app --reload
Curl:
curl -X POST "http://127.0.0.1:8000/fetch" -H "Content-Type: application/json" -d '["Delhi","Mumbai"]'
This demonstrates async end to end: FastAPI receives the request and awaits fetch_all(...), so the server keeps handling other requests while the weather calls are in flight.
9. Tests with pytest-asyncio
tests/test_fetcher.py
```python
import pytest
import asyncio
from weather_fetcher import fetch_all

@pytest.mark.asyncio
async def test_fetch_all_returns_list():
    # Use known small list or mock the session for predictable tests
    cities = ["London"]
    results = await fetch_all(cities)
    assert isinstance(results, list)
    assert results[0]["city"] == "London"
```
For deterministic tests, mock aiohttp.ClientSession using aresponses or monkeypatch.
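Another option that avoids patching aiohttp entirely is to inject the per-city fetch function, so a test can pass unittest.mock.AsyncMock (standard library, Python 3.8+). fetch_all_with is a hypothetical variant of fetch_all(), not code from earlier in this guide.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List
from unittest.mock import AsyncMock

FetchOne = Callable[[str], Awaitable[Dict[str, Any]]]

async def fetch_all_with(fetch_one: FetchOne, cities: List[str]) -> List[Dict[str, Any]]:
    # Same shape as fetch_all(), but the per-city fetch is injected,
    # so tests can pass a fake instead of a real aiohttp call
    return list(await asyncio.gather(*(fetch_one(c) for c in cities)))

async def check_with_mock() -> None:
    fake = AsyncMock(
        side_effect=lambda city: {"city": city, "data": {"weather": [{"description": "sunny"}]}}
    )
    results = await fetch_all_with(fake, ["London", "Paris"])
    assert [r["city"] for r in results] == ["London", "Paris"]
    assert fake.await_count == 2

if __name__ == "__main__":
    asyncio.run(check_with_mock())
    print("mock-based check passed")
```

In a pytest-asyncio suite, the body of check_with_mock() would go into an async test function decorated with @pytest.mark.asyncio.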
10. Local run & experiment checklist
- Ensure .env has your API key.
- Run python weather_fetcher.py for a quick test.
- Try python queue_worker.py to test the queue pattern.
- Start FastAPI with uvicorn app:app --reload and test with curl/Postman.
- If using Postgres, create the DB & table, then test db_save.py.
11. Exercises & practice tasks (progressive)
A — Beginner
- Convert a synchronous script of yours that calls an API repeatedly to an async version with aiohttp.
- Add basic caching to avoid re-fetching recent results.
B — Intermediate
- Implement a Semaphore and verify you don't exceed N concurrent requests.
- Add exponential backoff & retry for transient errors.
- Write pytest tests that mock network calls.
C — Advanced
- Build a FastAPI endpoint that returns cached results immediately and triggers a background refresh (use asyncio.create_task() to refresh the cache).
- Implement a rate-limited queue using aiolimiter or hand-rolled token-bucket logic.
- Save results to Postgres using a connection pool and ensure idempotency (UPSERT).
D — Real-world stretch
- Deploy the FastAPI app with Gunicorn + Uvicorn workers (example: gunicorn -k uvicorn.workers.UvicornWorker app:app -w 4).
- Add Prometheus metrics (request latency, queue lengths, success/failure counts).
- Add logging and structured JSON logs.
12. How this maps to industry usage
- Web backends: FastAPI endpoints with async DB and HTTP calls for high throughput.
- Microservices: Async service calling many downstream services concurrently.
- Data ingestion: Concurrently fetch many sources (APIs, webhooks) into ETL pipelines.
- Realtime: WebSocket servers handling thousands of connections with asyncio.
- Bots / agents: Handle concurrent user conversations and external API calls.
13. Top asyncio interview questions (expanded)
- What is the event loop?
  Describe how it schedules and runs coroutines and handles I/O readiness.
- Difference between coroutine, task, and future?
  - coroutine: function declared by async def (calling it returns a coroutine object).
  - task: wrapper (via asyncio.create_task) scheduled on the event loop.
  - future: low-level awaitable representing an eventual result; asyncio.Task is a subclass of Future.
- How does await work?
  await yields control to the event loop and suspends the coroutine until the awaited awaitable is done.
- asyncio.gather() vs asyncio.wait()?
  gather aggregates results (and raises the first exception unless return_exceptions=True). wait returns sets of done/pending tasks and is more flexible for timeouts or partial handling.
- Can asyncio run code in parallel?
  No. asyncio is single-threaded cooperative concurrency (unless combined with threads/processes). For CPU-bound work, use multiprocessing (e.g. a ProcessPoolExecutor); threads only help when the work releases the GIL.
- How do you handle blocking code in an async app?
  Use asyncio.to_thread() or run it in a separate executor (loop.run_in_executor()).
- How to cancel tasks?
  Call task.cancel() and handle asyncio.CancelledError inside the coroutine if it needs cleanup (then re-raise it).
- What are common pitfalls?
  - Using time.sleep() inside async code (blocks the loop)
  - Forgetting await (the coroutine never runs)
  - Too many concurrent connections (OOM/network throttling)
  - Not closing ClientSession (connection leak)
- How to use Semaphore and Queue?
  Explain with code and when to prefer each.
- How to test async code?
  Use pytest-asyncio or asyncio.run() in tests; mock network calls for determinism.
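A compact sketch covering two of the questions above: asyncio.wait() with a timeout returns (done, pending) sets, and pending tasks are then cancelled and awaited so cancellation actually completes. slow_job is an illustrative coroutine.

```python
import asyncio

async def slow_job(name: str, delay: float) -> str:
    try:
        await asyncio.sleep(delay)
        return name
    except asyncio.CancelledError:
        # Clean up here if needed, then re-raise so the task ends up cancelled
        print(f"{name} cancelled")
        raise

async def main():
    tasks = [asyncio.create_task(slow_job(f"job{i}", d)) for i, d in enumerate([0.05, 0.1, 2.0])]
    # wait() does not raise on timeout; it just reports what finished
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    for t in pending:
        t.cancel()
    # Await the cancelled tasks so their cleanup runs before we return
    await asyncio.gather(*pending, return_exceptions=True)
    finished = sorted(t.result() for t in done)
    cancelled = [t.cancelled() for t in tasks]
    return finished, cancelled

if __name__ == "__main__":
    print(asyncio.run(main()))
```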
14. Recommended Next Steps to Master asyncio (your checklist)
- Practice writing async versions of existing scripts: Choose 2–3 of your synchronous utilities and convert them to async using aiohttp or asyncpg.
- Use aiohttp for concurrent web requests: Build a small scraper/fetcher and measure speed vs sync requests.
- Try FastAPI with async endpoints: Build a small REST service that proxies multiple APIs concurrently.
- Combine with async databases (e.g., asyncpg): Implement caching/persistence with an async DB driver.
- Learn asyncio.Queue and asyncio.Semaphore: Implement producer-consumer and bounded concurrency patterns.
- Build a mini async project: This Weather Fetcher; expand it with caching, DB persistence, retries, metrics, and a frontend.
- Testing: Add pytest-asyncio tests and practice mocking aiohttp.
- Observability & deployment: Add logging, metrics (Prometheus), and deploy with Uvicorn/Gunicorn.
- Edge cases & robustness: Learn cancellation, task supervision patterns, graceful shutdown, and resource cleanup.
- Concurrency primitives: Study asyncio.Lock, Event, Condition, and Barrier (Python 3.11+) for complex coordination.
15. Final: a short action plan for you (6 steps, 1 hour each)
- 0–1h: Set up the project, get an API key, run weather_fetcher.py.
- 1–2h: Add a Semaphore and try different concurrency numbers; measure wall time.
- 2–3h: Implement retries and caching; test with a flaky network (turn off internet).
- 3–4h: Implement the queue worker pattern and persist to JSON.
- 4–5h: Build the FastAPI wrapper and test with multiple concurrent curl requests.
- 5–6h: Write tests, add DB save (asyncpg), add a small metric and deploy locally with uvicorn.
Fantastic — we’ll go through each of the 6 action-plan tasks step-by-step, interactively, and I’ll give you the exact code to copy/run at every stage. I’ll also include run commands, what to expect, and small exercises at the end of each step so you get hands-on practice.
Assumptions: you have a Python 3.10+ environment, venv activated, and (optionally) an OpenWeatherMap API key. If you don’t have a key, the code includes a mock fallback so you can still run and learn.
Stage 0 — Project skeleton (do this once)
Run these commands once to create project structure:
mkdir async-weather
cd async-weather
python -m venv venv
# activate venv: Linux/Mac: source venv/bin/activate Windows: venv\Scripts\activate
pip install aiohttp python-dotenv fastapi uvicorn pytest pytest-asyncio asyncpg
Create .env:
OPENWEATHER_API_KEY=your_api_key_here
If you don’t have a key, leave it blank — code will use mock responses.
Create files we’ll populate:
mkdir tests
touch weather_fetcher.py fetch_with_semaphore.py queue_worker.py with_retries.py caching_and_save.py app.py db_save.py tests/test_fetcher.py
Stage 1 — Basic async fetcher (1 hour)
Goal: run a minimal aiohttp + asyncio fetcher for multiple cities.
Create weather_fetcher.py with this content:
```python
# weather_fetcher.py
import os
import asyncio
import time
from typing import List, Dict, Any

from aiohttp import ClientSession, ClientTimeout
from dotenv import load_dotenv

load_dotenv()
API_KEY = os.getenv("OPENWEATHER_API_KEY")  # may be None or empty
BASE_URL = "https://api.openweathermap.org/data/2.5/weather"

# --- Mock fallback if no API key provided (so you can run locally) ---
async def _mock_fetch(city: str, delay: float = 0.6) -> Dict[str, Any]:
    await asyncio.sleep(delay)  # simulate network latency
    return {
        "city": city,
        "data": {
            "weather": [{"description": f"mocked weather for {city}"}],
            "main": {"temp": 25.0},
        },
        "mocked": True,
    }

# --- Real network fetch ---
async def fetch_city_weather(session: ClientSession, city: str) -> Dict[str, Any]:
    if not API_KEY:
        return await _mock_fetch(city)
    params = {"q": city, "appid": API_KEY, "units": "metric"}
    async with session.get(BASE_URL, params=params) as resp:
        # raise for status so errors are surfaced
        resp.raise_for_status()
        data = await resp.json()
        return {"city": city, "data": data, "mocked": False}

async def fetch_all(cities: List[str]) -> List[Dict[str, Any]]:
    timeout = ClientTimeout(total=10)
    async with ClientSession(timeout=timeout) as session:
        tasks = [asyncio.create_task(fetch_city_weather(session, c)) for c in cities]
        results = await asyncio.gather(*tasks, return_exceptions=False)
        return results

# Quick-run helper
if __name__ == "__main__":
    cities = ["London", "Paris", "New Delhi", "Bengaluru", "Tokyo"]
    start = time.time()
    results = asyncio.run(fetch_all(cities))
    elapsed = time.time() - start
    print(f"Fetched {len(results)} cities in {elapsed:.2f}s")
    for r in results:
        desc = r["data"]["weather"][0]["description"]
        print(r["city"], "->", desc, "(mocked)" if r.get("mocked") else "")
```
Run it:
python weather_fetcher.py
Expected:
- If API key present: you’ll see real weather descriptions (and time ~1–3s depending on network + concurrency).
- If no key: mock responses, printed quickly.
Exercise 1: Change cities, add 20 city names and re-run. Observe wall-clock time.
Stage 2 — Concurrency control with asyncio.Semaphore (30–45 min)
Goal: limit concurrent requests to avoid rate limits.
Create fetch_with_semaphore.py (or expand weather_fetcher.py — here it’s separated):
```python
# fetch_with_semaphore.py
import asyncio
import time

from aiohttp import ClientSession, ClientTimeout
from weather_fetcher import fetch_city_weather  # reuse earlier function

SEM = asyncio.Semaphore(5)  # allow 5 concurrent requests

async def fetch_with_limit(session, city):
    async with SEM:
        return await fetch_city_weather(session, city)

async def fetch_all_limited(cities):
    timeout = ClientTimeout(total=10)
    async with ClientSession(timeout=timeout) as session:
        tasks = [asyncio.create_task(fetch_with_limit(session, c)) for c in cities]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    cities = [f"City{i}" for i in range(1, 21)]
    start = time.time()
    results = asyncio.run(fetch_all_limited(cities))
    print(f"Fetched {len(results)} in {time.time()-start:.2f}s")
```
Run:
python fetch_with_semaphore.py
What to observe:
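- With SEM = 5, at most 5 requests are in flight at any moment, so the 20 tasks effectively run in waves of up to 5. If you use the real API, this helps keep you under rate limits (swap the placeholder City1…City20 names for real cities, since those only work with the mock fallback).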
Exercise 2: Change SEM to 1, 5, 20 and measure time — notice tradeoffs between throughput and resource usage.
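If you don't want to spend API quota while experimenting, this sketch replaces the request with asyncio.sleep() (an assumption) and records the peak number of requests in flight, which also covers the Intermediate exercise of verifying you never exceed N concurrent requests. run_with_limit is a hypothetical helper.

```python
import asyncio
import time

async def run_with_limit(limit: int, n_requests: int = 20, delay: float = 0.05):
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_request(i: int) -> int:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)  # highest concurrency seen so far
            await asyncio.sleep(delay)   # stand-in for the HTTP call
            in_flight -= 1
            return i

    start = time.perf_counter()
    await asyncio.gather(*(fake_request(i) for i in range(n_requests)))
    return peak, time.perf_counter() - start

if __name__ == "__main__":
    for limit in (1, 5, 20):
        peak, elapsed = asyncio.run(run_with_limit(limit))
        print(f"limit={limit:>2}  peak in flight={peak:>2}  elapsed={elapsed:.2f}s")
```

Because everything runs on one event loop thread, the plain in_flight counter needs no lock: it is only changed between awaits.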
Stage 3 — Retries & robust error handling (45–60 min)
Goal: add exponential backoff retry for transient network failures.
Create (or append) with_retries.py:
```python
# with_retries.py
import asyncio
import random

from aiohttp import ClientError, ClientSession, ClientTimeout
from weather_fetcher import fetch_city_weather

async def fetch_with_retries(session: ClientSession, city: str, retries=3, backoff_factor=0.5):
    attempt = 0
    delay = backoff_factor
    while True:
        attempt += 1
        try:
            return await fetch_city_weather(session, city)
        except (ClientError, asyncio.TimeoutError):
            if attempt >= retries:
                raise
            await asyncio.sleep(delay + random.random() * 0.1)  # backoff + jitter
            delay *= 2

async def fetch_all_with_retries(cities):
    timeout = ClientTimeout(total=10)
    async with ClientSession(timeout=timeout) as session:
        tasks = [asyncio.create_task(fetch_with_retries(session, c)) for c in cities]
        return await asyncio.gather(*tasks, return_exceptions=False)

if __name__ == "__main__":
    cities = ["London", "Paris", "InvalidCityName_404", "Tokyo"]
    try:
        res = asyncio.run(fetch_all_with_retries(cities))
        print("Done:", [r["city"] for r in res])
    except Exception as e:
        print("Fetch error:", e)
```
Notes:
- If you hit a real 404, resp.raise_for_status() in fetch_city_weather will raise ClientResponseError; you might want to treat 404 as a permanent failure (no retry). You can check the exception's status code to avoid retrying 4xx errors, except 429 (rate limit).
- For brevity, we retry on ClientError and asyncio.TimeoutError.
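Exercise 3: Modify the code to not retry for 4xx (client) errors other than 429. (Hint: add an except ClientResponseError as cre: clause before the ClientError one, since ClientResponseError is a subclass of ClientError, and inside it: if cre.status < 500 and cre.status != 429: raise)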
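One possible shape for the Exercise 3 solution, written without aiohttp so the decision logic can be tested on its own. HTTPStatusError is a hypothetical stand-in for aiohttp.ClientResponseError (which also exposes a .status attribute); is_retryable and fetch_with_status_aware_retries are illustrative names.

```python
import asyncio

class HTTPStatusError(Exception):
    """Hypothetical stand-in for aiohttp.ClientResponseError."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status

def is_retryable(status: int) -> bool:
    # Retry server errors (5xx) and rate limiting (429); other 4xx are permanent
    return status >= 500 or status == 429

async def fetch_with_status_aware_retries(call, retries: int = 3, backoff: float = 0.01):
    # call is a zero-argument function returning a fresh coroutine per attempt
    delay = backoff
    for attempt in range(1, retries + 1):
        try:
            return await call()
        except HTTPStatusError as err:
            if not is_retryable(err.status) or attempt == retries:
                raise
            await asyncio.sleep(delay)
            delay *= 2

if __name__ == "__main__":
    for code in (400, 404, 429, 500, 503):
        print(code, "retry" if is_retryable(code) else "give up")
```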
Stage 4 — Producer-consumer with asyncio.Queue + save results to JSON (1–2 hours)
Goal: use Queue for scalable worker pool and persist output to disk without blocking loop.
Create queue_worker.py:
```python
# queue_worker.py
import asyncio
import json
import time
from typing import List

from aiohttp import ClientSession, ClientTimeout
from weather_fetcher import fetch_city_weather

NUM_WORKERS = 4

async def worker(name: int, queue: asyncio.Queue, session: ClientSession, results: List[dict]):
    while True:
        city = await queue.get()
        if city is None:  # sentinel for shutdown
            queue.task_done()
            break
        try:
            r = await fetch_city_weather(session, city)
            results.append(r)
            print(f"[worker {name}] got {city}")
        except Exception as e:
            print(f"[worker {name}] failed {city}: {e}")
        finally:
            queue.task_done()

async def run_queue(cities: List[str]):
    q = asyncio.Queue()
    for c in cities:
        await q.put(c)
    timeout = ClientTimeout(total=10)
    results = []
    async with ClientSession(timeout=timeout) as session:
        workers = [asyncio.create_task(worker(i, q, session, results)) for i in range(NUM_WORKERS)]
        # add shutdown sentinels (one per worker)
        for _ in workers:
            await q.put(None)
        await q.join()
        # cancel workers (they have already exited; this is a safety net)
        for w in workers:
            w.cancel()
    return results

async def save_results_async(results, path="results.json"):
    # writing is blocking; offload to a worker thread
    def write():
        with open(path, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
    await asyncio.to_thread(write)

if __name__ == "__main__":
    cities = ["London", "Paris", "Delhi", "Mumbai", "Bengaluru", "Chennai", "Tokyo", "Seoul"]
    start = time.time()
    results = asyncio.run(run_queue(cities))
    print(f"Fetched {len(results)} in {time.time()-start:.2f}s")
    asyncio.run(save_results_async(results))
    print("Saved to results.json")
```
Run:
python queue_worker.py
Check results.json — it contains collected JSON (mock or real data). Using asyncio.to_thread(write) ensures file write doesn’t block event loop.
Exercise 4: Extend worker to call fetch_with_retries instead (from Stage 3), and measure reliability under flaky network.
Stage 5 — FastAPI wrapper & background refresh (2–3 hours)
Goal: expose an async endpoint that fetches weather concurrently and returns result. Also show pattern: return cached result fast + background refresh.
Create app.py:
# app.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from typing import List
import asyncio
from weather_fetcher import fetch_all
from caching_and_save import get_cached, set_cache # we'll create this next
app = FastAPI()
@app.post("/fetch")
async def fetch_cities(cities: List[str]):
"""
Fetch list of cities concurrently and return results (fresh fetch).
"""
try:
results = await fetch_all(cities)
return {"count": len(results), "results": results}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Endpoint to return cached quickly and optionally refresh in background
@app.post("/fetch_cached")
async def fetch_cached_endpoint(cities: List[str], background_tasks: BackgroundTasks):
results = []
to_refresh = []
for c in cities:
cached = get_cached(c)
if cached:
results.append({"city": c, "data": cached, "cached": True})
else:
# return a quick placeholder and schedule background refresh
results.append({"city": c, "data": None, "cached": False})
to_refresh.append(c)
if to_refresh:
# schedule background refresh; do not await
background_tasks.add_task(background_refresh, to_refresh)
return {"count": len(results), "results": results}
async def background_refresh(cities):
# fetch and update cache
fresh = await fetch_all(cities)
for r in fresh:
set_cache(r["city"], r["data"])
Now create caching_and_save.py to provide get_cached/set_cache used above:
# caching_and_save.py
import time
from typing import Any, Optional
CACHE = {} # city -> (timestamp, data)
TTL = 60 * 10 # 10 minutes
def get_cached(city: str) -> Optional[Any]:
entry = CACHE.get(city)
if not entry:
return None
ts, data = entry
if time.time() - ts > TTL:
del CACHE[city]
return None
return data
def set_cache(city: str, data: Any):
CACHE[city] = (time.time(), data)
Run the FastAPI app:
uvicorn app:app --reload
Test with curl (fresh fetch):
curl -X POST "http://127.0.0.1:8000/fetch" -H "Content-Type: application/json" -d '["London","Tokyo"]'
Test cached endpoint:
curl -X POST "http://127.0.0.1:8000/fetch_cached" -H "Content-Type: application/json" -d '["London","Tokyo"]'
- The first call returns "cached": false (with "data": null) for each city and schedules a background refresh.
- Subsequent calls within the TTL return "cached": true.
Exercise 5: Add simple in-memory metrics counters: total requests served, cache hits, cache misses. Print them or expose /metrics endpoint.
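A starting point for Exercise 5, under the assumption that a single process serves requests (the counters live in memory and reset on restart). Metrics and lookup_all are hypothetical helpers; lookup_all mirrors the loop inside /fetch_cached.

```python
from collections import Counter
from typing import Any, Callable, Dict, List, Optional

class Metrics:
    """Hypothetical in-memory counters; per-process only, reset on restart."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def record_request(self) -> None:
        self.counts["requests_total"] += 1

    def record_lookup(self, hit: bool) -> None:
        self.counts["cache_hits" if hit else "cache_misses"] += 1

    def snapshot(self) -> Dict[str, int]:
        return {k: self.counts[k] for k in ("requests_total", "cache_hits", "cache_misses")}

def lookup_all(
    cities: List[str], get_cached: Callable[[str], Optional[Any]], metrics: Metrics
) -> List[Dict[str, Any]]:
    # Same lookup as /fetch_cached, counting hits and misses as it goes
    metrics.record_request()
    out = []
    for city in cities:
        cached = get_cached(city)
        metrics.record_lookup(cached is not None)
        out.append({"city": city, "data": cached, "cached": cached is not None})
    return out

if __name__ == "__main__":
    m = Metrics()
    lookup_all(["London", "Tokyo"], {"London": {"temp": 20}}.get, m)
    print(m.snapshot())
```

In app.py you could create one Metrics instance at module level and expose it with @app.get("/metrics") returning metrics.snapshot(). Since all async endpoints run on one event loop thread, the counters need no lock; with several Gunicorn workers, each process would keep its own counts.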
Stage 6 — Tests, DB persistence (asyncpg), and deployment notes (2–4 hours)
Goal: test the fetcher and optionally store results to Postgres using asyncpg.
A — Test with pytest-asyncio
Create tests/test_fetcher.py:
```python
# tests/test_fetcher.py
import pytest
import asyncio
from weather_fetcher import fetch_all

@pytest.mark.asyncio
async def test_fetch_all_returns_list():
    cities = ["MockCity1", "MockCity2"]
    results = await fetch_all(cities)
    assert isinstance(results, list)
    assert len(results) == 2
    assert results[0]["city"] == "MockCity1"
```
Run:
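python -m pytest -q
Notes: python -m pytest adds the current directory (the project root) to sys.path, so tests/test_fetcher.py can import weather_fetcher; a plain pytest call may not find the module without extra configuration. This simple test passes offline only because of the mock fallback, i.e. when OPENWEATHER_API_KEY is unset or empty; with a key set, it makes real requests and an unknown city like "MockCity1" fails with a 404. For real-network tests, either skip network tests or mock the aiohttp session.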
B — Persisting to Postgres (asyncpg)
Create db_save.py:
```python
# db_save.py
import asyncio
import datetime
import json
import os
import sys

import asyncpg

DSN = os.getenv("DATABASE_DSN", "postgresql://user:pass@localhost:5432/weatherdb")

async def create_table(pool):
    async with pool.acquire() as conn:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS weather_cache (
                city TEXT PRIMARY KEY,
                fetched_at TIMESTAMP,
                payload JSONB
            )
        """)

async def upsert_weather(pool, city, payload):
    async with pool.acquire() as conn:
        await conn.execute("""
            INSERT INTO weather_cache(city, fetched_at, payload)
            VALUES($1, $2, $3)
            ON CONFLICT (city) DO UPDATE
            SET fetched_at = EXCLUDED.fetched_at, payload = EXCLUDED.payload
        """, city, datetime.datetime.utcnow(), json.dumps(payload))

async def save_many(records):
    pool = await asyncpg.create_pool(DSN, min_size=1, max_size=5)
    try:
        await create_table(pool)
        for r in records:
            await upsert_weather(pool, r["city"], r["data"])
    finally:
        await pool.close()

# CLI helper
if __name__ == "__main__":
    # usage: python db_save.py results.json
    if len(sys.argv) < 2:
        print("pass a json file with results")
        raise SystemExit(1)
    path = sys.argv[1]
    with open(path, "r", encoding="utf-8") as f:
        records = json.load(f)
    asyncio.run(save_many(records))
    print("Saved to DB")
```
Run sequence if using DB:
- Start Postgres locally and set the DATABASE_DSN env var (or edit DSN in the file).
- Fetch results using queue_worker.py to create results.json.
- Run python db_save.py results.json
Deployment notes:
- For production FastAPI deploys, use Gunicorn + Uvicorn workers: gunicorn -k uvicorn.workers.UvicornWorker app:app -w 4
- Use environment variables for secrets and configuration.
- Use Redis for caching in multi-instance deployments.
- Add monitoring (Prometheus + Grafana) for latency, success/failure, queue lengths.
Summary checklist — what you now have
- ✅ Stage 1: async fetcher using aiohttp (weather_fetcher.py)
- ✅ Stage 2: concurrency control with Semaphore (fetch_with_semaphore.py)
- ✅ Stage 3: retry/backoff logic (with_retries.py)
- ✅ Stage 4: producer-consumer with Queue and async file save (queue_worker.py)
- ✅ Stage 5: FastAPI endpoints + cached fast path + background refresh (app.py)
- ✅ Stage 6: tests and optional DB persistence via asyncpg (tests/test_fetcher.py, db_save.py)
Short practice plan (next 5 sessions)
- Session 1 (1 hour): Run Stage1 → Stage2, play with SEM values and many cities. Track timings.
- Session 2 (1 hour): Add Stage3 retry behavior, test with simulated network failures.
- Session 3 (1.5 hours): Implement Stage 4 queue + persist JSON; integrate retry into workers.
- Session 4 (2 hours): Build and run FastAPI (Stage5); add cache metrics and try concurrent curl clients.
- Session 5 (2–3 hours): Add Postgres persistence, write a test suite, and containerize app (Dockerfile + docker-compose).
Quick extras / tips
- Avoid time.sleep() in async code; use await asyncio.sleep().
- For blocking I/O calls, use asyncio.to_thread() or loop.run_in_executor(); for CPU-heavy work, use run_in_executor() with a ProcessPoolExecutor, because threads share the GIL.
- For true horizontal scaling, use Redis for the cache and a message queue (RabbitMQ/Kafka) for producers/consumers.
- For request rate-limit handling: check headers (e.g., Retry-After) and honor provider limits.
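Retry-After can carry either a number of seconds or an HTTP date, so honoring it means handling both forms. This is a sketch using only the standard library; retry_after_seconds is a hypothetical helper, and returning None for a missing or unparseable header (so the caller falls back to its own backoff) is a design choice, not a rule.

```python
import email.utils
from datetime import datetime, timezone
from typing import Optional

def retry_after_seconds(header_value: Optional[str], now: Optional[datetime] = None) -> Optional[float]:
    """Parse a Retry-After header into seconds to wait (None if absent or unparseable)."""
    if not header_value:
        return None
    value = header_value.strip()
    if value.isdigit():
        return float(value)  # delay-seconds form, e.g. "120"
    try:
        when = email.utils.parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())

if __name__ == "__main__":
    print(retry_after_seconds("120"))
    print(retry_after_seconds("Wed, 21 Oct 2015 07:28:00 GMT"))  # 0.0: date is in the past
```

With aiohttp, you would read the header with resp.headers.get("Retry-After") on a 429 response and, if a value comes back, await asyncio.sleep() for that long before retrying.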
If you want, I can:
- generate a single ZIP of the repo with all files filled in, or
- step you through Session 1 now and run the exact code snippets interactively (I’ll provide the exact terminal commands and expected console output),
- or produce a Dockerfile + docker-compose.yml to run FastAPI + Postgres locally.
Which of those do you want right now?