Perfect 👌 Let’s make this the ultimate “Asyncio in Python” guide, explained the way a teacher would in an interactive class: from basics → practical → real industry usage → interview prep.


🧠 1. What is asyncio?

Think of Python code as a line of people at a single counter (like in a bank).
Each person (task) must wait for the previous one to finish — this is synchronous programming.

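Now imagine the same clerk works smarter: while one customer steps aside to fill in a form, the clerk serves the next person and comes back when the form is ready. Nobody stands idle waiting, so everyone finishes sooner.
That’s asynchronous programming (one worker, many tasks taking turns whenever someone is waiting), and asyncio is the framework that makes this possible in Python.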

Definition:

asyncio is a Python library used to write concurrent (non-blocking) code using async/await syntax.

It helps run I/O-bound tasks efficiently — like:

  • Downloading multiple files
  • Calling multiple APIs
  • Reading from databases or web sockets
  • Handling multiple users in servers (FastAPI, aiohttp, etc.)

⚙️ 2. The Key Idea

Normally, Python waits for each function to finish before going to the next.

But with asyncio, we can say:

“When one task is waiting (for network, disk, or database), let another task run.”

This uses an event loop: a scheduler that keeps track of all tasks and switches between them. The switching is cooperative, meaning a task only hands control back to the loop when it reaches an await.
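To see cooperative switching in action, here is a minimal sketch (the worker name and step counts are illustrative): two tasks share one thread, and control changes hands only at each await asyncio.sleep(0).

```python
import asyncio

async def worker(name: str, steps: int, log: list[str]) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        # await hands control back to the event loop, which then runs the other task
        await asyncio.sleep(0)

async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(worker("A", 3, log), worker("B", 3, log))
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The steps come out interleaved (A0, B0, A1, B1, A2, B2). Delete the await line and task A runs all of its steps before B starts, because nothing ever yields to the loop.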


🪄 3. Basic Example: Without and With asyncio

🐢 Normal (Synchronous) version

import time

def download_file(name):
    print(f"Downloading {name}...")
    time.sleep(2)
    print(f"Finished {name}")

def main():
    download_file("File1")
    download_file("File2")
    download_file("File3")

main()

⏱️ Takes 6 seconds total (2 + 2 + 2)


⚡ Asynchronous version

import asyncio

async def download_file(name):
    print(f"Downloading {name}...")
    await asyncio.sleep(2)
    print(f"Finished {name}")

async def main():
    tasks = [
        asyncio.create_task(download_file("File1")),
        asyncio.create_task(download_file("File2")),
        asyncio.create_task(download_file("File3"))
    ]
    await asyncio.gather(*tasks)

asyncio.run(main())

⏱️ Takes only ~2 seconds total!

Because while each task is paused at await asyncio.sleep() (a non-blocking wait), the event loop runs the others, so the three 2-second waits overlap.


🔍 4. Important Keywords

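| Keyword | Meaning |
|---|---|
| async def | Declares an asynchronous function (coroutine function); calling it returns a coroutine object |
| await | Suspends the current coroutine until the awaited object finishes, letting other tasks run meanwhile |
| asyncio.run() | Starts an event loop, runs the given coroutine to completion, then closes the loop |
| asyncio.create_task() | Schedules a coroutine to run concurrently as a Task |
| asyncio.gather() | Runs multiple awaitables concurrently, waits for all, and returns their results in order |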
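To make the first rows of the table concrete, a minimal sketch (greet and the delays are illustrative): calling an async function only creates a coroutine object, create_task() schedules it, and await drives it.

```python
import asyncio

async def greet(name: str, delay: float, log: list[str]) -> str:
    log.append(f"start {name}")
    await asyncio.sleep(delay)
    log.append(f"end {name}")
    return f"hello {name}"

async def main() -> tuple[list[str], list[str]]:
    log: list[str] = []
    coro = greet("A", 0.01, log)  # just a coroutine object: nothing has run yet
    print("is coroutine:", asyncio.iscoroutine(coro), "| log so far:", log)
    task = asyncio.create_task(greet("B", 0.05, log))  # scheduled; starts once main yields
    first = await coro    # runs greet("A"); while A sleeps, the loop starts task B
    second = await task   # B is already running; just wait for its result
    return [first, second], log

if __name__ == "__main__":
    results, log = asyncio.run(main())
    print(results)
    print(log)
```

The log shows B starting before A finishes, which is exactly the overlap create_task() buys you.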

💼 5. Real-World Use Cases

🔹 Web Scraping / API Calls

import aiohttp
import asyncio

async def fetch_data(url, session):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ["https://example.com", "https://python.org", "https://github.com"]
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*(fetch_data(url, session) for url in urls))
        print([len(r) for r in results])

asyncio.run(main())

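Result: all three requests are in flight at the same time, so the total time is roughly that of the slowest response rather than the sum of all three (which is what sequential requests.get() calls would cost).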


🔹 Database Access (e.g., asyncpg for PostgreSQL)

import asyncio
import asyncpg

async def fetch_users():
    conn = await asyncpg.connect(user='user', password='pass', database='db', host='localhost')
    try:
        return await conn.fetch('SELECT * FROM users')
    finally:
        await conn.close()  # always release the connection, even if the query fails

asyncio.run(fetch_users())

Used in FastAPI and async microservices to handle many DB calls without blocking.


🔹 Chat / WebSocket Server (aiohttp)

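This minimal example is a plain HTTP endpoint; aiohttp serves WebSockets from the same kind of server (via web.WebSocketResponse). Because each connection is handled by a lightweight coroutine rather than an OS thread, one process can hold thousands of simultaneous connections.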

from aiohttp import web
import asyncio

async def handle(request):
    await asyncio.sleep(1)
    return web.Response(text="Hello async world!")

app = web.Application()
app.add_routes([web.get('/', handle)])
web.run_app(app)
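If you want to watch one process serve many connections without installing aiohttp, the standard library's asyncio.start_server shows the same idea at a lower level. This is a swapped-in sketch (a line-based TCP echo server, not WebSockets); handle_client, client and the use of port 0 (let the OS pick a free port) are illustrative choices.

```python
import asyncio

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Each connection gets its own coroutine; awaiting I/O lets other clients proceed
    line = await reader.readline()
    writer.write(b"echo: " + line)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def client(port: int, msg: str) -> str:
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(msg.encode() + b"\n")
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply.decode().strip()

async def main(n_clients: int = 50) -> list[str]:
    server = await asyncio.start_server(handle_client, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:  # closes the server when the block exits
        return await asyncio.gather(*(client(port, f"hi {i}") for i in range(n_clients)))

if __name__ == "__main__":
    replies = asyncio.run(main())
    print(len(replies), replies[:3])
```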

🏭 6. How It’s Used in Industry

| Industry use case | Example library | Description |
|---|---|---|
| Web APIs | FastAPI, aiohttp | Non-blocking API endpoints |
| Data pipelines | Async ETL, Airbyte | Concurrent data fetching & transformation |
| Chat / notification systems | WebSockets, aiohttp | Handle thousands of live users |
| Cloud and microservices | Async microservices | Efficient inter-service calls |
| ML serving | FastAPI async endpoints | Serve models faster by overlapping I/O |

🧩 7. Mixing Sync + Async

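Sometimes you need to call a blocking function (like a synchronous library call, file I/O, or time.sleep()) inside async code.

Use asyncio.to_thread() (Python 3.9+) to run it in a worker thread so the event loop stays free. For CPU-heavy calculations, a thread keeps the loop responsive but won't make the work faster because of the GIL; section 8 covers what to use instead.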

import asyncio
import time

def heavy_task():
    time.sleep(3)
    return "Done"

async def main():
    result = await asyncio.to_thread(heavy_task)
    print(result)

asyncio.run(main())
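Why offloading matters: while a blocking call runs directly inside a coroutine, nothing else on the loop can run. A minimal sketch to observe this, using a "heartbeat" task as a stand-in for other requests your app would be serving (names and timings are illustrative):

```python
import asyncio
import time

def blocking_call(seconds: float) -> str:
    time.sleep(seconds)  # stands in for any synchronous, blocking library call
    return "done"

async def heartbeat(stop: asyncio.Event, interval: float = 0.05) -> int:
    # Counts how often the loop gets a chance to run this task
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(interval)
        ticks += 1
    return ticks

async def measure(offload: bool) -> int:
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(stop))
    await asyncio.sleep(0)  # let the heartbeat start
    if offload:
        await asyncio.to_thread(blocking_call, 0.5)  # loop keeps running meanwhile
    else:
        blocking_call(0.5)  # runs on the loop's own thread: everything else freezes
    stop.set()
    return await hb

if __name__ == "__main__":
    print("ticks while blocking inline:", asyncio.run(measure(offload=False)))
    print("ticks with asyncio.to_thread:", asyncio.run(measure(offload=True)))
```

Inline, the heartbeat barely ticks during the half second; with to_thread() it keeps ticking roughly every 50 ms.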

🚀 8. When NOT to Use asyncio

❌ Not good for CPU-bound tasks (e.g., ML training, image processing).
✅ Use multiprocessing or job queues instead.

✅ Asyncio shines for I/O-bound tasks — API calls, DB access, networking.


💬 9. Interactive Analogy (Easy Way to Remember)

| Scenario | Type |
|---|---|
| You cook pasta 🍝 and wait at the stove till it boils | Synchronous |
| You start boiling pasta, then chop vegetables 🥕 and prepare sauce while waiting | Asynchronous |

That’s exactly how asyncio helps your program multitask efficiently.


🎯 10. Top Interview Questions on asyncio

| 🔢 | Question | Quick hint / answer |
|---|---|---|
| 1 | What is asyncio in Python? | A standard-library module for asynchronous, non-blocking I/O. |
| 2 | Difference between threading and asyncio? | Threading uses OS threads switched preemptively; asyncio uses a single thread with cooperative multitasking (tasks yield at await). |
| 3 | What is an event loop? | The central scheduler that runs async tasks. |
| 4 | What is a coroutine? | A special async function that can pause/resume with await. |
| 5 | How do you run multiple async tasks concurrently? | asyncio.gather() or asyncio.create_task(). |
| 6 | Can you use await in a normal function? | No, it must be inside an async def. |
| 7 | What happens if you block inside an async function? | The event loop freezes; use await asyncio.sleep() instead of time.sleep(), and offload other blocking calls with asyncio.to_thread(). |
| 8 | How do you handle exceptions in async tasks? | Wrap in try/except, or use return_exceptions=True in asyncio.gather(). |
| 9 | Difference between concurrency and parallelism? | Concurrency = many tasks taking turns; parallelism = many tasks literally running at once. |
| 10 | Give real-world use cases of asyncio. | Web servers, web scraping, async DB queries, chatbots. |
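For question 8, a small sketch of the difference return_exceptions makes (fetch_or_fail is a made-up stand-in for a network call):

```python
import asyncio

async def fetch_or_fail(city: str) -> str:
    await asyncio.sleep(0.01)  # simulated I/O
    if city == "Atlantis":
        raise ValueError(f"unknown city: {city}")
    return f"{city}: ok"

async def main() -> list:
    cities = ["London", "Atlantis", "Tokyo"]
    # Exceptions come back as values in their slot instead of propagating
    results = await asyncio.gather(*(fetch_or_fail(c) for c in cities), return_exceptions=True)
    for city, r in zip(cities, results):
        print(f"{city} failed: {r!r}" if isinstance(r, Exception) else r)
    return results

async def main_strict() -> None:
    # Default (return_exceptions=False): the first exception propagates out of gather
    await asyncio.gather(fetch_or_fail("London"), fetch_or_fail("Atlantis"))

if __name__ == "__main__":
    asyncio.run(main())
```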

🧭 11. Recommended Next Steps to Master asyncio

  1. ✅ Practice writing async versions of your existing synchronous scripts.
  2. ⚙️ Use aiohttp for concurrent web requests.
  3. 🧩 Try FastAPI with async endpoints.
  4. 📊 Combine with async database drivers like asyncpg.
  5. 💬 Explore asyncio.Queue for producer-consumer patterns and asyncio.Semaphore for capping concurrency.
  6. 🧠 Build a mini async project — like a concurrent weather fetcher or chatbot API.

Awesome — let’s build the Async Weather API Fetcher end-to-end: clear steps, runnable code, exercises, and next steps so you can practice everything asyncio brings to the table.

I’ll give you:

  1. Project setup (requirements + env)
  2. Simple single-file async fetcher (aiohttp + asyncio)
  3. Concurrency control (Semaphore + rate limiting)
  4. Producer-consumer with asyncio.Queue
  5. Retry & error handling
  6. Caching and writing results to disk (and an example of saving to Postgres with asyncpg)
  7. FastAPI async endpoint that uses the fetcher
  8. Tests with pytest-asyncio
  9. Exercises, improvements, and the Recommended Next Steps you asked for

1. Quick setup

Create a project folder and virtualenv:

mkdir async-weather
cd async-weather
python -m venv venv
source venv/bin/activate     # Windows: venv\Scripts\activate

requirements.txt

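aiohttp==3.8.5
python-dotenv==1.0.0
fastapi==0.95.2
uvicorn==0.22.0
asyncpg==0.29.0     # optional: async Postgres client
pytest==7.4.0
pytest-asyncio==0.22.0

(asyncio ships with Python, so it is deliberately not listed: the asyncio package on PyPI is an old Python 3.3 backport and should not be installed.)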

Install:

pip install -r requirements.txt

Get an API key from a weather API (e.g., OpenWeatherMap). Put it in .env:

OPENWEATHER_API_KEY=your_key_here

(If you prefer another provider, adapt the URL.)


2. Simple async fetcher — the heart of the project

weather_fetcher.py — a simple async fetcher that queries OpenWeatherMap for multiple cities concurrently.

# weather_fetcher.py
import asyncio
import os
from typing import List, Dict, Any
from aiohttp import ClientSession, ClientTimeout
from dotenv import load_dotenv

load_dotenv()
API_KEY = os.getenv("OPENWEATHER_API_KEY")
BASE_URL = "https://api.openweathermap.org/data/2.5/weather"

async def fetch_city_weather(session: ClientSession, city: str) -> Dict[str, Any]:
    params = {"q": city, "appid": API_KEY, "units": "metric"}
    async with session.get(BASE_URL, params=params) as resp:
        resp.raise_for_status()
        data = await resp.json()
        return {"city": city, "data": data}

async def fetch_all(cities: List[str]) -> List[Dict[str, Any]]:
    timeout = ClientTimeout(total=10)
    async with ClientSession(timeout=timeout) as session:
        tasks = [asyncio.create_task(fetch_city_weather(session, c)) for c in cities]
        results = await asyncio.gather(*tasks, return_exceptions=False)
        return results

if __name__ == "__main__":
    cities = ["London", "Paris", "New Delhi", "Bengaluru", "Tokyo"]
    results = asyncio.run(fetch_all(cities))
    for r in results:
        print(r["city"], "->", r["data"]["weather"][0]["description"])

Notes:

  • asyncio.create_task() schedules concurrent coroutines.
  • aiohttp.ClientSession is used for efficient connection pooling.
  • asyncio.run() starts the event loop.

Run:

python weather_fetcher.py

You should see descriptions for each city. If you don’t have a key, the API will return a 401 — later we add a mock fallback.


3. Concurrency control: asyncio.Semaphore for rate limits

APIs often restrict requests per second. Use a Semaphore to limit concurrent requests.

# fetch_with_semaphore.py (expand on previous file)
import asyncio
from aiohttp import ClientSession, ClientTimeout
from weather_fetcher import fetch_city_weather  # reuse the fetcher from step 2

SEM = asyncio.Semaphore(5)  # allow 5 concurrent requests

async def fetch_with_limit(session, city):
    async with SEM:
        return await fetch_city_weather(session, city)

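A Semaphore caps how many requests are in flight at once, not how many start per second. If the provider limits the rate (say 10 requests/sec), tune the Semaphore and also space out request starts, e.g. with asyncio.sleep() between batches or a rate limiter.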
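A minimal sketch of combining the two limits, assuming a hand-rolled limiter is acceptable (libraries such as aiolimiter exist for this). MinIntervalLimiter and fake_request are hypothetical names; the limiter simply spaces request starts at least 1/rate seconds apart, while the Semaphore caps how many run at once.

```python
import asyncio
import time

class MinIntervalLimiter:
    """Hypothetical helper: at most `rate_per_sec` starts per second,
    enforced by spacing start times at least 1/rate_per_sec apart."""

    def __init__(self, rate_per_sec: float) -> None:
        self.interval = 1.0 / rate_per_sec
        self._lock = asyncio.Lock()
        self._next_start = 0.0

    async def wait(self) -> None:
        async with self._lock:  # one caller at a time claims the next slot
            now = time.monotonic()
            if self._next_start > now:
                await asyncio.sleep(self._next_start - now)
                now = time.monotonic()
            self._next_start = now + self.interval

async def fake_request(i: int, limiter: MinIntervalLimiter, sem: asyncio.Semaphore,
                       starts: list[float]) -> int:
    async with sem:                # cap on requests in flight
        await limiter.wait()       # cap on requests started per second
        starts.append(time.monotonic())
        await asyncio.sleep(0.05)  # simulated network latency
        return i

async def main(n: int = 10) -> tuple[list[int], list[float]]:
    limiter = MinIntervalLimiter(rate_per_sec=20)
    sem = asyncio.Semaphore(5)
    starts: list[float] = []
    results = await asyncio.gather(*(fake_request(i, limiter, sem, starts) for i in range(n)))
    return results, starts

if __name__ == "__main__":
    results, starts = asyncio.run(main())
    print(results, f"spread over {starts[-1] - starts[0]:.2f}s")
```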


4. Producer-consumer pattern with asyncio.Queue

Useful if you want a set of producers (e.g., reading city list from a file) and a pool of worker consumers that fetch.

# queue_worker.py
import asyncio
from aiohttp import ClientSession, ClientTimeout
from weather_fetcher import fetch_city_weather  # from step 2

async def worker(name: int, queue: asyncio.Queue, session: ClientSession, results: list):
    while True:
        city = await queue.get()
        if city is None:
            queue.task_done()
            break
        try:
            r = await fetch_city_weather(session, city)
            results.append(r)
        except Exception as e:
            print(f"Worker {name} failed for {city}: {e}")
        finally:
            queue.task_done()

async def run_queue(cities):
    q = asyncio.Queue()
    for c in cities:
        await q.put(c)
    num_workers = 3
    timeout = ClientTimeout(total=10)
    results = []
    async with ClientSession(timeout=timeout) as session:
        workers = [asyncio.create_task(worker(i, q, session, results)) for i in range(num_workers)]
        # add None sentinel per worker to stop
        for _ in range(num_workers):
            await q.put(None)
        await q.join()
        for w in workers:
            w.cancel()
    return results

This pattern cleanly separates producers (putting items into queue) from consumers (workers processing).
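The version above fills the queue before the workers start. A sketch of a true concurrent producer with a bounded queue (backpressure), runnable without the network: fake_fetch is a hypothetical stand-in for fetch_city_weather, and error handling is left out for brevity (the try/except in the worker above still applies).

```python
import asyncio
import random
from typing import Optional

async def fake_fetch(city: str) -> dict:
    # Hypothetical stand-in for fetch_city_weather(): random latency, no network
    await asyncio.sleep(random.uniform(0.01, 0.05))
    return {"city": city, "data": {"weather": [{"description": "mocked"}]}}

async def producer(queue: asyncio.Queue, cities: list[str], num_workers: int) -> None:
    for city in cities:
        await queue.put(city)  # waits here whenever the queue is full (backpressure)
    for _ in range(num_workers):
        await queue.put(None)  # one shutdown sentinel per worker

async def consumer(queue: asyncio.Queue, results: list[dict]) -> None:
    while True:
        city: Optional[str] = await queue.get()
        try:
            if city is None:
                return
            results.append(await fake_fetch(city))
        finally:
            queue.task_done()

async def run_pipeline(cities: list[str], num_workers: int = 3) -> list[dict]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)  # small bound so the producer must wait
    results: list[dict] = []
    workers = [asyncio.create_task(consumer(queue, results)) for _ in range(num_workers)]
    await producer(queue, cities, num_workers)  # runs while the workers consume
    await asyncio.gather(*workers)  # each worker returns after its sentinel
    return results

if __name__ == "__main__":
    print([r["city"] for r in asyncio.run(run_pipeline(["London", "Paris", "Tokyo", "Seoul"]))])
```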


5. Retry logic & error handling

Network calls fail. Use exponential backoff retries.

# with_retries.py
import asyncio
import random
from aiohttp import ClientError
from weather_fetcher import fetch_city_weather  # from step 2

async def fetch_with_retries(session, city, retries=3, backoff_factor=0.5):
    delay = backoff_factor
    for attempt in range(1, retries + 1):
        try:
            return await fetch_city_weather(session, city)
        except (ClientError, asyncio.TimeoutError) as e:
            if attempt == retries:
                raise
            await asyncio.sleep(delay + random.random() * 0.1)
            delay *= 2

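In the caller, wrap fetch_with_retries() in try/except (or use return_exceptions=True in asyncio.gather()) so a city that keeps failing is logged instead of crashing the whole run.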
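You can exercise the backoff loop without touching the network. This sketch generalizes fetch_with_retries into a hypothetical retry_async helper that takes any zero-argument coroutine function; TransientError stands in for ClientError/asyncio.TimeoutError, and make_flaky fakes an endpoint that fails a few times first.

```python
import asyncio
import random

class TransientError(Exception):
    """Hypothetical stand-in for aiohttp.ClientError / asyncio.TimeoutError."""

async def retry_async(op, retries: int = 3, backoff_factor: float = 0.05,
                      retry_on: tuple = (TransientError,)):
    delay = backoff_factor
    for attempt in range(1, retries + 1):
        try:
            return await op()
        except retry_on:
            if attempt == retries:
                raise  # out of attempts: let the caller see the last error
            # exponential backoff (0.05, 0.1, 0.2, ...) plus a little random jitter
            await asyncio.sleep(delay + random.uniform(0, delay * 0.1))
            delay *= 2

def make_flaky(failures: int):
    """Returns an async op that fails `failures` times, then succeeds."""
    calls = {"n": 0}
    async def op() -> str:
        calls["n"] += 1
        if calls["n"] <= failures:
            raise TransientError(f"attempt {calls['n']} failed")
        return "ok"
    return op, calls

if __name__ == "__main__":
    op, calls = make_flaky(failures=2)
    print(asyncio.run(retry_async(op)), "after", calls["n"], "attempts")
```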


6. Simple in-memory caching (avoid repeated calls) & saving results

A naive cache using dict. For production use Redis / memcached.

# caching.py
import time

CACHE = {}  # { "City": (timestamp, result) }
TTL = 60 * 10  # 10 minutes

def get_cached(city):
    entry = CACHE.get(city)
    if not entry:
        return None
    ts, result = entry
    if time.time() - ts > TTL:
        del CACHE[city]
        return None
    return result

def set_cache(city, result):
    CACHE[city] = (time.time(), result)

# integrate into fetch:
async def fetch_cached(session, city):
    cached = get_cached(city)
    if cached is not None:  # an empty payload is still a valid cache hit
        return {"city": city, "data": cached, "cached": True}
    r = await fetch_city_weather(session, city)
    set_cache(city, r["data"])
    return {"city": city, "data": r["data"], "cached": False}

Save results to JSON file asynchronously (writing file is blocking; use asyncio.to_thread to offload):

import json
import asyncio

async def save_results_to_file(results, path="results.json"):
    def write():
        with open(path, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
    await asyncio.to_thread(write)
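The dict cache above has one gap: if several requests for the same uncached city arrive together, each of them calls the API (a "cache stampede"). A sketch of one fix, assuming you are happy to keep in-flight fetches in a second dict: the first caller starts a task and later callers await that same task. fake_fetch, get_weather and IN_FLIGHT are hypothetical names, and time.monotonic() replaces time.time() for the TTL check.

```python
import asyncio
import time

TTL = 600.0  # seconds, the same 10-minute TTL as above
CACHE: dict[str, tuple[float, dict]] = {}
IN_FLIGHT: dict[str, asyncio.Task] = {}
CALLS = {"fetch": 0}  # counts real fetches, for the demo

async def fake_fetch(city: str) -> dict:
    # Hypothetical stand-in for fetch_city_weather(...)["data"]
    CALLS["fetch"] += 1
    await asyncio.sleep(0.05)
    return {"weather": [{"description": f"weather for {city}"}]}

async def _fetch_and_store(city: str) -> dict:
    try:
        data = await fake_fetch(city)
        CACHE[city] = (time.monotonic(), data)
        return data
    finally:
        IN_FLIGHT.pop(city, None)  # allow a new fetch after success or failure

async def get_weather(city: str) -> dict:
    entry = CACHE.get(city)
    if entry is not None and time.monotonic() - entry[0] <= TTL:
        return entry[1]
    task = IN_FLIGHT.get(city)
    if task is None:  # first caller starts the fetch; later callers join it
        task = asyncio.create_task(_fetch_and_store(city))
        IN_FLIGHT[city] = task
    # shield: cancelling one waiting caller must not cancel the shared fetch
    return await asyncio.shield(task)

async def burst(n: int = 5) -> list[dict]:
    return await asyncio.gather(*(get_weather("London") for _ in range(n)))

if __name__ == "__main__":
    asyncio.run(burst())
    print("real fetches:", CALLS["fetch"])
```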

7. Save to Postgres using asyncpg (example)

Create table:

CREATE TABLE weather_cache (
  city TEXT PRIMARY KEY,
  fetched_at TIMESTAMP,
  payload JSONB
);

Insert/update asynchronously:

# db_save.py
import asyncpg
import datetime
import json

DSN = "postgresql://user:password@localhost:5432/weatherdb"

async def upsert_weather(conn, city, payload):
    await conn.execute(
        """
        INSERT INTO weather_cache(city, fetched_at, payload)
        VALUES($1, $2, $3)
        ON CONFLICT (city) DO UPDATE
        SET fetched_at = excluded.fetched_at, payload = excluded.payload
        """,
        city, datetime.datetime.utcnow(), json.dumps(payload)
    )

async def save_many(records):
    conn = await asyncpg.connect(DSN)
    try:
        for r in records:
            await upsert_weather(conn, r["city"], r["data"])
    finally:
        await conn.close()

Call await save_many(results) after fetching. In high-throughput systems, use a connection pool: asyncpg.create_pool().


8. Build a FastAPI endpoint that uses the async fetcher

app.py

# app.py
from fastapi import FastAPI, HTTPException
import asyncio
from typing import List
from weather_fetcher import fetch_all  # import from earlier

app = FastAPI()

@app.post("/fetch")
async def fetch_cities(cities: List[str]):
    try:
        results = await fetch_all(cities)
        return {"count": len(results), "results": results}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

Run:

uvicorn app:app --reload

Curl:

curl -X POST "http://127.0.0.1:8000/fetch" -H "Content-Type: application/json" -d '["Delhi","Mumbai"]'

This demonstrates async end-to-end: FastAPI receives request, calls await fetch_all(...), not blocking other requests.


9. Tests with pytest-asyncio

tests/test_fetcher.py

import pytest
import asyncio
from weather_fetcher import fetch_all

@pytest.mark.asyncio
async def test_fetch_all_returns_list():
    # Use known small list or mock the session for predictable tests
    cities = ["London"]
    results = await fetch_all(cities)
    assert isinstance(results, list)
    assert results[0]["city"] == "London"

For deterministic tests, mock aiohttp.ClientSession using aresponses or monkeypatch.
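One stdlib-only way to get deterministic tests, sketched under an assumption: if the fetching code takes the per-city fetch function as a parameter (fetch_all_with here is a hypothetical refactor, not part of weather_fetcher.py), a test can pass in unittest.mock.AsyncMock instead of touching the network.

```python
import asyncio
from unittest.mock import AsyncMock

async def fetch_all_with(fetch_one, cities: list[str]) -> list[dict]:
    # Same shape as fetch_all, but the per-city fetch is injected
    return await asyncio.gather(*(fetch_one(c) for c in cities))

def test_fetch_all_with_mock() -> None:
    # AsyncMock returns whatever the side_effect function returns for each await
    fake = AsyncMock(side_effect=lambda city: {
        "city": city,
        "data": {"weather": [{"description": "sunny"}]},
    })
    results = asyncio.run(fetch_all_with(fake, ["London", "Paris"]))
    assert [r["city"] for r in results] == ["London", "Paris"]
    assert fake.await_count == 2
    fake.assert_any_await("Paris")
```

pytest collects test_fetch_all_with_mock like any other test; no pytest-asyncio marker is needed because it drives the loop with asyncio.run().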


10. Local run & experiment checklist

  1. Ensure .env has your API key.
  2. Run python weather_fetcher.py for a quick test.
  3. Try python queue_worker.py to test the queue pattern.
  4. Start FastAPI uvicorn app:app --reload and test with curl/postman.
  5. If using Postgres, create DB & table, then test db_save.py.

11. Exercises & practice tasks (progressive)

A — Beginner

  • Convert a synchronous script of yours that calls an API repeatedly to an async version with aiohttp.
  • Add basic caching to avoid re-fetching recent results.

B — Intermediate

  • Implement Semaphore and verify you don’t exceed N concurrent requests.
  • Add exponential backoff & retry for transient errors.
  • Write pytest tests mocking network calls.

C — Advanced

  • Build a FastAPI endpoint that returns cached results immediately and triggers a background refresh (use asyncio.create_task() to refresh cache).
  • Implement a rate-limited queue using aiolimiter or hand-rolled token-bucket logic.
  • Save results to Postgres using a connection pool and ensure idempotency (UPSERT).
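For the first advanced exercise, a sketch of "answer from cache now, refresh in the background". One detail worth knowing: the asyncio documentation advises keeping a reference to tasks made with create_task(), because the event loop only keeps weak references to them. fake_fetch, schedule_refresh and BACKGROUND are hypothetical names.

```python
import asyncio
import time
from typing import Optional

TTL = 600.0  # seconds
CACHE: dict[str, tuple[float, dict]] = {}
BACKGROUND: set[asyncio.Task] = set()

async def fake_fetch(city: str) -> dict:
    # Hypothetical stand-in for a real fetch_city_weather() call
    await asyncio.sleep(0.05)
    return {"city": city, "temp": 21.0}

async def refresh(city: str) -> None:
    CACHE[city] = (time.monotonic(), await fake_fetch(city))

def schedule_refresh(city: str) -> asyncio.Task:
    task = asyncio.create_task(refresh(city))
    BACKGROUND.add(task)                        # strong reference while it runs
    task.add_done_callback(BACKGROUND.discard)  # drop it once finished
    return task

async def get_or_refresh(city: str) -> Optional[dict]:
    entry = CACHE.get(city)
    if entry is None or time.monotonic() - entry[0] > TTL:
        schedule_refresh(city)  # not awaited: the caller gets an answer right away
    return None if entry is None else entry[1]  # may be stale, but never blocks

async def scenario() -> tuple[Optional[dict], Optional[dict]]:
    first = await get_or_refresh("Paris")   # miss: placeholder + refresh scheduled
    await asyncio.gather(*BACKGROUND)       # demo only: wait for the refresh to land
    second = await get_or_refresh("Paris")  # hit
    return first, second
```

Running asyncio.run(scenario()) gives a None placeholder on the first call and the cached payload on the second. A production version would also avoid scheduling duplicate refreshes for the same city.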

D — Real-world stretch

  • Deploy FastAPI app with Gunicorn + Uvicorn workers (example: gunicorn -k uvicorn.workers.UvicornWorker app:app -w 4).
  • Add Prometheus metrics (request latency, queue lengths, success/failure counts).
  • Add logging and structured JSON logs.

12. How this maps to industry usage

  • Web backends: FastAPI endpoints with async DB and HTTP calls for high throughput.
  • Microservices: Async service calling many downstream services concurrently.
  • Data ingestion: Concurrently fetch many sources (APIs, webhooks) into ETL pipelines.
  • Realtime: WebSocket servers handling thousands of connections with asyncio.
  • Bots / agents: Handle concurrent user conversations and external API calls.

13. Top asyncio interview questions (expanded)

  1. What is the event loop?
    Describe how it schedules and runs coroutines and handles I/O readiness.
  2. Difference between coroutine, task, and future?
    • coroutine: function declared by async def (call returns coroutine object).
    • task: wrapper (via asyncio.create_task) scheduled on the event loop.
    • future: low-level awaitable representing an eventual result; Task is a subclass of Future that drives a coroutine.
  3. How does await work?
    await yields control to the event loop and suspends the coroutine until the awaited awaitable is done.
  4. asyncio.gather() vs asyncio.wait()?
    gather aggregates results (and raises first exception unless return_exceptions=True). wait gives sets of done/pending and is more flexible for timeouts or partial handling.
  5. Can asyncio run code in parallel?
    No: asyncio is single-threaded cooperative concurrency (unless combined with threads/processes). For CPU-bound tasks use multiprocessing (e.g., ProcessPoolExecutor); threads only help with blocking I/O, because of the GIL.
  6. How do you handle blocking code in an async app?
    Use asyncio.to_thread(), or loop.run_in_executor() with a thread or process pool.
  7. How to cancel tasks?
    task.cancel() and handle asyncio.CancelledError inside coroutine if needed.
  8. What are common pitfalls?
    • Using time.sleep() inside async code (blocks loop)
    • forgetting await (coroutine never runs)
    • too many concurrent connections (OOM/network throttling)
    • not closing ClientSession (connection leak)
  9. How to use Semaphore and Queue?
    Explain with code and when to prefer each.
  10. How to test async code?
    Use pytest-asyncio or asyncio.run() in tests; mock network calls for determinism.
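For question 7, a minimal sketch of cancellation with cleanup, plus the timeout variant via asyncio.wait_for (worker and the timings are illustrative). The worker catches CancelledError only to clean up, then re-raises so the task really ends up cancelled.

```python
import asyncio

async def worker(log: list[str]) -> None:
    try:
        while True:
            await asyncio.sleep(0.01)  # cancellation is delivered at an await point
            log.append("tick")
    except asyncio.CancelledError:
        log.append("cleanup")  # e.g. close sessions, flush buffers
        raise  # re-raise so the caller sees a cancelled task

async def cancel_demo() -> tuple[bool, list[str]]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.05)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: we cancelled it ourselves
    return task.cancelled(), log

async def timeout_demo() -> str:
    try:
        await asyncio.wait_for(asyncio.sleep(1), timeout=0.05)
        return "finished"
    except asyncio.TimeoutError:
        return "timed out"

if __name__ == "__main__":
    print(asyncio.run(cancel_demo()))
    print(asyncio.run(timeout_demo()))
```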

14. Recommended Next Steps to Master asyncio (your checklist)

  1. Practice writing async versions of existing scripts: Choose 2–3 of your synchronous utilities and convert them to async using aiohttp or asyncpg.
  2. Use aiohttp for concurrent web requests: Build a small scraper/fetcher and measure speed vs sync requests.
  3. Try FastAPI with async endpoints: Build a small REST service that proxies multiple APIs concurrently.
  4. Combine with async databases (e.g., asyncpg): Implement caching/persistence with an async DB driver.
  5. Learn asyncio.Queue and asyncio.Semaphore: Implement producer-consumer and bounded concurrency patterns.
  6. Build a mini async project: This Weather Fetcher — expand it with caching, DB persisting, retries, metrics, and a frontend.
  7. Testing: Add pytest-asyncio tests and practice mocking aiohttp.
  8. Observability & deployment: Add logging, metrics (Prometheus), and deploy with Uvicorn/Gunicorn.
  9. Edge cases & robustness: Learn cancellation, task supervision patterns, graceful shutdown, and resource cleanup.
  10. Concurrency primitives: Study asyncio.Lock, Event, Condition, Barrier for complex coordination.

15. Final: a short action plan for you (6 steps, roughly 1–3 hours each)

  1. 0–1h: Setup project, get API key, run weather_fetcher.py.
  2. 1–2h: Add Semaphore and try different concurrency numbers; measure wall time.
  3. 1–2h: Implement retries and caching; test with flaky network (turn off internet).
  4. 2–3h: Implement queue worker pattern and persist to JSON.
  5. 2–3h: Build FastAPI wrapper and test with multiple concurrent curl requests.
  6. 2–3h: Write tests, add DB save (asyncpg), add a small metric and deploy locally with uvicorn.

Fantastic — we’ll go through each of the 6 action-plan tasks step-by-step, interactively, and I’ll give you the exact code to copy/run at every stage. I’ll also include run commands, what to expect, and small exercises at the end of each step so you get hands-on practice.

Assumptions: you have a Python 3.10+ environment, venv activated, and (optionally) an OpenWeatherMap API key. If you don’t have a key, the code includes a mock fallback so you can still run and learn.


Stage 0 — Project skeleton (do this once)

Run these commands once to create project structure:

mkdir async-weather
cd async-weather
python -m venv venv
# activate venv: Linux/Mac: source venv/bin/activate  Windows: venv\Scripts\activate
pip install aiohttp python-dotenv fastapi uvicorn pytest pytest-asyncio asyncpg

Create .env:

OPENWEATHER_API_KEY=your_api_key_here

If you don’t have a key, leave it blank — code will use mock responses.

Create files we’ll populate:

mkdir -p tests
touch weather_fetcher.py fetch_with_semaphore.py queue_worker.py with_retries.py caching_and_save.py app.py db_save.py tests/test_fetcher.py

Stage 1 — Basic async fetcher (1 hour)

Goal: run a minimal aiohttp + asyncio fetcher for multiple cities.

Create weather_fetcher.py with this content:

# weather_fetcher.py
import os
import asyncio
from typing import List, Dict, Any
from aiohttp import ClientSession, ClientTimeout, ClientResponseError
from dotenv import load_dotenv
import time
import json

load_dotenv()
API_KEY = os.getenv("OPENWEATHER_API_KEY")  # may be None

BASE_URL = "https://api.openweathermap.org/data/2.5/weather"

# --- Mock fallback if no API key provided (so you can run locally) ---
async def _mock_fetch(city: str, delay: float = 0.6) -> Dict[str, Any]:
    await asyncio.sleep(delay)
    return {
        "city": city,
        "data": {
            "weather": [{"description": f"mocked weather for {city}"}],
            "main": {"temp": 25.0}
        },
        "mocked": True
    }

# --- Real network fetch ---
async def fetch_city_weather(session: ClientSession, city: str) -> Dict[str, Any]:
    if not API_KEY:
        return await _mock_fetch(city)
    params = {"q": city, "appid": API_KEY, "units": "metric"}
    async with session.get(BASE_URL, params=params) as resp:
        # raise for status so errors are surfaced
        resp.raise_for_status()
        data = await resp.json()
        return {"city": city, "data": data, "mocked": False}

async def fetch_all(cities: List[str]) -> List[Dict[str, Any]]:
    timeout = ClientTimeout(total=10)
    async with ClientSession(timeout=timeout) as session:
        tasks = [asyncio.create_task(fetch_city_weather(session, c)) for c in cities]
        results = await asyncio.gather(*tasks, return_exceptions=False)
        return results

# Quick-run helper
if __name__ == "__main__":
    cities = ["London", "Paris", "New Delhi", "Bengaluru", "Tokyo"]
    start = time.time()
    results = asyncio.run(fetch_all(cities))
    elapsed = time.time() - start
    print(f"Fetched {len(results)} cities in {elapsed:.2f}s")
    for r in results:
        desc = r["data"]["weather"][0]["description"]
        print(r["city"], "->", desc, "(mocked)" if r.get("mocked") else "")

Run it:

python weather_fetcher.py

Expected:

  • If API key present: you’ll see real weather descriptions (and time ~1–3s depending on network + concurrency).
  • If no key: mock responses, printed in about 0.6 s total, since each mock call sleeps 0.6 s and all of them overlap.

Exercise 1: Change cities, add 20 city names and re-run. Observe wall-clock time.


Stage 2 — Concurrency control with asyncio.Semaphore (30–45 min)

Goal: limit concurrent requests to avoid rate limits.

Create fetch_with_semaphore.py (or expand weather_fetcher.py — here it’s separated):

# fetch_with_semaphore.py
import asyncio
import time
from aiohttp import ClientSession, ClientTimeout
from weather_fetcher import fetch_city_weather  # reuse earlier function

SEM = asyncio.Semaphore(5)  # allow 5 concurrent requests

async def fetch_with_limit(session, city):
    async with SEM:
        return await fetch_city_weather(session, city)

async def fetch_all_limited(cities):
    timeout = ClientTimeout(total=10)
    async with ClientSession(timeout=timeout) as session:
        tasks = [asyncio.create_task(fetch_with_limit(session, c)) for c in cities]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    cities = [f"City{i}" for i in range(1, 21)]
    start = time.time()
    results = asyncio.run(fetch_all_limited(cities))
    print(f"Fetched {len(results)} in {time.time()-start:.2f}s")

Run:

python fetch_with_semaphore.py

What to observe:

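  • With SEM = 5, at most 5 requests are in flight at once. In mock mode (0.6 s per call), 20 cities should take about 4 × 0.6 ≈ 2.4 s. With a real API this helps keep you under rate limits; note that City1 to City20 are placeholder names, so with a real API key they will fail with 404.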

Exercise 2: Change SEM to 1, 5, 20 and measure time — notice tradeoffs between throughput and resource usage.
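Timing runs can be noisy; another way to check Exercise 2 is to count how many calls are in flight at once. A self-contained sketch with a simulated call (gather_limited and measure_peak are hypothetical helpers, not part of the files above):

```python
import asyncio

async def gather_limited(coro_factories, limit: int) -> list:
    # Run every factory's coroutine, but never more than `limit` at once
    sem = asyncio.Semaphore(limit)
    async def run(factory):
        async with sem:
            return await factory()
    return await asyncio.gather(*(run(f) for f in coro_factories))

async def measure_peak(n: int, limit: int, delay: float = 0.01) -> tuple[list[int], int]:
    in_flight = 0
    peak = 0
    async def fake_call(i: int) -> int:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(delay)  # simulated request
        in_flight -= 1
        return i
    results = await gather_limited([lambda i=i: fake_call(i) for i in range(n)], limit)
    return results, peak

if __name__ == "__main__":
    for limit in (1, 5, 20):
        _, peak = asyncio.run(measure_peak(20, limit))
        print(f"limit={limit} peak in flight={peak}")
```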


Stage 3 — Retries & robust error handling (45–60 min)

Goal: add exponential backoff retry for transient network failures.

Create (or append) with_retries.py:

# with_retries.py
import asyncio
import random
from aiohttp import ClientError
from weather_fetcher import fetch_city_weather
from aiohttp import ClientSession, ClientTimeout

async def fetch_with_retries(session: ClientSession, city: str, retries=3, backoff_factor=0.5):
    attempt = 0
    delay = backoff_factor
    while True:
        attempt += 1
        try:
            return await fetch_city_weather(session, city)
        except (ClientError, asyncio.TimeoutError) as e:
            if attempt >= retries:
                raise
            await asyncio.sleep(delay + random.random() * 0.1)
            delay *= 2

async def fetch_all_with_retries(cities):
    timeout = ClientTimeout(total=10)
    async with ClientSession(timeout=timeout) as session:
        tasks = [asyncio.create_task(fetch_with_retries(session, c)) for c in cities]
        return await asyncio.gather(*tasks, return_exceptions=False)

if __name__ == "__main__":
    cities = ["London", "Paris", "InvalidCityName_404", "Tokyo"]
    try:
        res = asyncio.run(fetch_all_with_retries(cities))
        print("Done:", [r["city"] for r in res])
    except Exception as e:
        print("Fetch error:", e)

Notes:

  • If you hit real 404, resp.raise_for_status() in fetch_city_weather will raise ClientResponseError — you might want to treat 404 as permanent failure (no retry). You can check exception status code to avoid retrying for 4xx except 429 (rate-limit).
  • For brevity we retry on ClientError & TimeoutError.

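Exercise 3: Modify code to not retry for 4xx (client) errors other than 429. (Hint: except ClientResponseError as cre: if cre.status < 500 and cre.status != 429: raise. Put this clause before the generic ClientError clause, since ClientResponseError is a subclass of ClientError.)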
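A possible shape for Exercise 3, sketched with a hypothetical is_retryable_status() helper. aiohttp's ClientResponseError carries the HTTP code in its .status attribute; here FakeHTTPError stands in for it so the sketch runs without a server.

```python
import asyncio

def is_retryable_status(status: int) -> bool:
    # 5xx = server trouble, 429 = rate limited: worth retrying.
    # Other 4xx (404 unknown city, 401 bad key) will not fix themselves.
    return status >= 500 or status == 429

class FakeHTTPError(Exception):
    """Stand-in for aiohttp.ClientResponseError (which exposes .status)."""
    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status

async def call_with_retries(op, retries: int = 3, backoff: float = 0.01):
    delay = backoff
    for attempt in range(1, retries + 1):
        try:
            return await op()
        except FakeHTTPError as err:  # with aiohttp: except ClientResponseError as err
            if not is_retryable_status(err.status) or attempt == retries:
                raise
            await asyncio.sleep(delay)
            delay *= 2
```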


Stage 4 — Producer-consumer with asyncio.Queue + save results to JSON (1–2 hours)

Goal: use Queue for scalable worker pool and persist output to disk without blocking loop.

Create queue_worker.py:

# queue_worker.py
import asyncio
import time
import json
from aiohttp import ClientSession, ClientTimeout
from weather_fetcher import fetch_city_weather
from typing import List

NUM_WORKERS = 4

async def worker(name: int, queue: asyncio.Queue, session: ClientSession, results: List[dict]):
    while True:
        city = await queue.get()
        if city is None:  # sentinel for shutdown
            queue.task_done()
            break
        try:
            r = await fetch_city_weather(session, city)
            results.append(r)
            print(f"[worker {name}] got {city}")
        except Exception as e:
            print(f"[worker {name}] failed {city}: {e}")
        finally:
            queue.task_done()

async def run_queue(cities: List[str]):
    q = asyncio.Queue()
    for c in cities:
        await q.put(c)

    timeout = ClientTimeout(total=10)
    results = []
    async with ClientSession(timeout=timeout) as session:
        workers = [asyncio.create_task(worker(i, q, session, results)) for i in range(NUM_WORKERS)]
        # add shutdown sentinels
        for _ in workers:
            await q.put(None)
        await q.join()
        # cancel workers
        for w in workers:
            w.cancel()
    return results

async def save_results_async(results, path="results.json"):
    # writing is blocking; offload to thread pool
    def write():
        with open(path, "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
    await asyncio.to_thread(write)

if __name__ == "__main__":
    cities = ["London", "Paris", "Delhi", "Mumbai", "Bengaluru", "Chennai", "Tokyo", "Seoul"]
    start = time.time()
    results = asyncio.run(run_queue(cities))
    print(f"Fetched {len(results)} in {time.time()-start:.2f}s")
    asyncio.run(save_results_async(results))
    print("Saved to results.json")

Run:

python queue_worker.py

Check results.json — it contains collected JSON (mock or real data). Using asyncio.to_thread(write) ensures file write doesn’t block event loop.

Exercise 4: Extend worker to call fetch_with_retries instead (from Stage 3), and measure reliability under flaky network.


Stage 5 — FastAPI wrapper & background refresh (2–3 hours)

Goal: expose an async endpoint that fetches weather concurrently and returns result. Also show pattern: return cached result fast + background refresh.

Create app.py:

# app.py
from fastapi import FastAPI, HTTPException, BackgroundTasks
from typing import List
import asyncio
from weather_fetcher import fetch_all
from caching_and_save import get_cached, set_cache  # we'll create this next

app = FastAPI()

@app.post("/fetch")
async def fetch_cities(cities: List[str]):
    """
    Fetch list of cities concurrently and return results (fresh fetch).
    """
    try:
        results = await fetch_all(cities)
        return {"count": len(results), "results": results}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Endpoint to return cached quickly and optionally refresh in background
@app.post("/fetch_cached")
async def fetch_cached_endpoint(cities: List[str], background_tasks: BackgroundTasks):
    results = []
    to_refresh = []
    for c in cities:
        cached = get_cached(c)
        if cached is not None:
            results.append({"city": c, "data": cached, "cached": True})
        else:
            # return a quick placeholder and schedule background refresh
            results.append({"city": c, "data": None, "cached": False})
            to_refresh.append(c)
    if to_refresh:
        # schedule background refresh; do not await
        background_tasks.add_task(background_refresh, to_refresh)
    return {"count": len(results), "results": results}

async def background_refresh(cities):
    # fetch and update cache
    fresh = await fetch_all(cities)
    for r in fresh:
        set_cache(r["city"], r["data"])

Now create caching_and_save.py to provide get_cached/set_cache used above:

# caching_and_save.py
import time
from typing import Any, Optional

CACHE = {}  # city -> (timestamp, data)
TTL = 60 * 10  # 10 minutes

def get_cached(city: str) -> Optional[Any]:
    entry = CACHE.get(city)
    if not entry:
        return None
    ts, data = entry
    if time.time() - ts > TTL:
        del CACHE[city]
        return None
    return data

def set_cache(city: str, data: Any):
    CACHE[city] = (time.time(), data)

Run the FastAPI app:

uvicorn app:app --reload

Test with curl (fresh fetch):

curl -X POST "http://127.0.0.1:8000/fetch" -H "Content-Type: application/json" -d '["London","Tokyo"]'

Test the cached endpoint:

curl -X POST "http://127.0.0.1:8000/fetch_cached" -H "Content-Type: application/json" -d '["London","Tokyo"]'
  • The first call returns cached: False for each city and schedules a background refresh.
  • Calls made after that refresh has finished (and within the 10-minute TTL) return cached: True.

Exercise 5: Add simple in-memory metrics counters (total requests served, cache hits, cache misses). Print them or expose them via a /metrics endpoint.
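One possible starting point for Exercise 5, assuming the endpoints stay async def so they all run on the event loop thread of each worker. The Metrics class and its method names are hypothetical. Plain integer counters need no lock in that setting, because an increment contains no await and so no other coroutine can run in the middle of one. Under Gunicorn, each worker process keeps its own counters.

```python
from dataclasses import asdict, dataclass


@dataclass
class Metrics:
    """Hypothetical in-memory counters for Exercise 5 (one instance per process)."""
    requests: int = 0
    cache_hits: int = 0
    cache_misses: int = 0

    def record_request(self) -> None:
        self.requests += 1

    def record_lookup(self, hit: bool) -> None:
        if hit:
            self.cache_hits += 1
        else:
            self.cache_misses += 1

    def snapshot(self) -> dict:
        lookups = self.cache_hits + self.cache_misses
        data = asdict(self)
        data["hit_ratio"] = self.cache_hits / lookups if lookups else 0.0
        return data


METRICS = Metrics()
# Wiring into app.py (sketch): call METRICS.record_request() at the top of each endpoint,
# METRICS.record_lookup(cached is not None) after each get_cached(c), and add:
#   @app.get("/metrics")
#   async def metrics():
#       return METRICS.snapshot()
```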


Stage 6 — Tests, DB persistence (asyncpg), and deployment notes (2–4 hours)

Goal: test the fetcher and, optionally, store results in Postgres using asyncpg.

A — Test with pytest-asyncio

Create tests/test_fetcher.py:

# tests/test_fetcher.py
import pytest
import asyncio
from weather_fetcher import fetch_all

@pytest.mark.asyncio
async def test_fetch_all_returns_list():
    cities = ["MockCity1", "MockCity2"]
    results = await fetch_all(cities)
    assert isinstance(results, list)
    assert len(results) == 2
    assert results[0]["city"] == "MockCity1"

Run:

pytest -q

Notes: this simple test passes only because of the mock fallback in weather_fetcher, so it needs no network access. For tests that would hit the real API, either skip them or mock the aiohttp session.
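A minimal sketch of the "mock the aiohttp session" option, using only unittest.mock from the standard library. It assumes the fetch code receives the session as an argument; fetch_city, fetch_all_with, make_fake_session, and the api.example.com URL are hypothetical stand-ins, not the real weather_fetcher API. Since Python 3.8, MagicMock supports async with out of the box, so session.get(...) can stand in for aiohttp's response context manager.

```python
import asyncio
from typing import Dict, List
from unittest.mock import AsyncMock, MagicMock


# Hypothetical fetcher that takes the session as a parameter so tests can pass a fake.
async def fetch_city(session, city: str) -> Dict:
    async with session.get("https://api.example.com/weather", params={"q": city}) as resp:
        resp.raise_for_status()
        return {"city": city, "data": await resp.json()}


async def fetch_all_with(session, cities: List[str]) -> List[Dict]:
    return await asyncio.gather(*(fetch_city(session, c) for c in cities))


def make_fake_session(payload: Dict) -> MagicMock:
    """Stand-in for aiohttp.ClientSession: every get() yields a response whose json() returns payload."""
    resp = MagicMock()
    resp.json = AsyncMock(return_value=payload)
    session = MagicMock()
    session.get.return_value.__aenter__.return_value = resp
    return session


def test_fetch_all_without_network():
    session = make_fake_session({"temp_c": 18})
    results = asyncio.run(fetch_all_with(session, ["London", "Tokyo"]))
    assert [r["city"] for r in results] == ["London", "Tokyo"]
    assert results[0]["data"] == {"temp_c": 18}
    assert session.get.call_count == 2  # one request per city, none hit the network
```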

B — Persisting to Postgres (asyncpg)

Create db_save.py:

# db_save.py
import asyncpg
import datetime
import os
import json

DSN = os.getenv("DATABASE_DSN", "postgresql://user:pass@localhost:5432/weatherdb")

async def create_table(pool):
    async with pool.acquire() as conn:
        await conn.execute("""
        CREATE TABLE IF NOT EXISTS weather_cache (
            city TEXT PRIMARY KEY,
            fetched_at TIMESTAMP,
            payload JSONB
        )
        """)

async def upsert_weather(pool, city, payload):
    async with pool.acquire() as conn:
        await conn.execute("""
            INSERT INTO weather_cache(city, fetched_at, payload)
            VALUES($1, $2, $3)
            ON CONFLICT (city) DO UPDATE
            SET fetched_at = EXCLUDED.fetched_at, payload = EXCLUDED.payload
        """, city, datetime.datetime.utcnow(), json.dumps(payload))

async def save_many(records):
    pool = await asyncpg.create_pool(DSN, min_size=1, max_size=5)
    try:
        await create_table(pool)
        for r in records:
            await upsert_weather(pool, r["city"], r["data"])
    finally:
        await pool.close()

# CLI helper
if __name__ == "__main__":
    import asyncio, sys  # json is already imported at the top of the module
    # usage: python db_save.py results.json
    if len(sys.argv) < 2:
        print("pass a json file with results")
        raise SystemExit(1)
    path = sys.argv[1]
    with open(path, "r", encoding="utf-8") as f:
        records = json.load(f)
    asyncio.run(save_many(records))
    print("Saved to DB")
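save_many sends each city as a separate execute() call on a freshly acquired connection. A hedged alternative for larger results.json files, sketched as a hypothetical upsert_many helper, passes all rows to asyncpg's Connection.executemany on one pooled connection, which may cut per-call overhead. It reuses the same upsert SQL and, like upsert_weather, sends the payload as a JSON string; the timestamp is naive UTC to match the TIMESTAMP column, computed without the utcnow() call that Python 3.12 deprecates.

```python
import datetime
import json
from typing import Any, Dict, List

UPSERT_SQL = """
    INSERT INTO weather_cache(city, fetched_at, payload)
    VALUES($1, $2, $3)
    ON CONFLICT (city) DO UPDATE
    SET fetched_at = EXCLUDED.fetched_at, payload = EXCLUDED.payload
"""


async def upsert_many(pool, records: List[Dict[str, Any]]) -> None:
    """Hypothetical batch version of the save_many loop: one connection, one executemany call."""
    # Naive UTC timestamp, matching the TIMESTAMP (without time zone) column
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    rows = [(r["city"], now, json.dumps(r["data"])) for r in records]
    async with pool.acquire() as conn:
        await conn.executemany(UPSERT_SQL, rows)
```

Inside save_many, the for loop could then be replaced by `await upsert_many(pool, records)`.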

Run sequence if using DB:

  1. Start Postgres locally and set the DATABASE_DSN environment variable (or edit DSN in db_save.py).
  2. Fetch results using queue_worker.py to create results.json.
  3. python db_save.py results.json

Deployment notes:

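  • For a production FastAPI deployment, run Gunicorn with Uvicorn workers: gunicorn -k uvicorn.workers.UvicornWorker app:app -w 4
  • Use environment variables for secrets and configuration.
  • Use Redis for caching in multi-instance deployments: the in-memory CACHE dict lives inside a single process, so each Gunicorn worker (and each server) would otherwise keep its own separate copy.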
  • Add monitoring (Prometheus + Grafana) for latency, success/failure, queue lengths.

Summary checklist — what you now have

  • ✅ Stage 1: async fetcher using aiohttp (weather_fetcher.py)
  • ✅ Stage 2: concurrency control with Semaphore (fetch_with_semaphore.py)
  • ✅ Stage 3: retry/backoff logic (with_retries.py)
  • ✅ Stage 4: producer-consumer with Queue and async file save (queue_worker.py)
  • ✅ Stage 5: FastAPI endpoints + cached fast-path + background refresh (app.py)
  • ✅ Stage 6: tests and optional DB persistence via asyncpg (tests/test_fetcher.py, db_save.py)

Short practice plan (next 5 sessions)

  1. Session 1 (1 hour): Run Stage 1 → Stage 2, experiment with different SEM values and longer city lists, and track timings.
  2. Session 2 (1 hour): Add the Stage 3 retry behavior and test it with simulated network failures.
  3. Session 3 (1.5 hours): Implement the Stage 4 queue + JSON persistence and integrate retries into the workers.
  4. Session 4 (2 hours): Build and run the FastAPI app (Stage 5); add cache metrics and try concurrent curl clients.
  5. Session 5 (2–3 hours): Add Postgres persistence, write a test suite, and containerize the app (Dockerfile + docker-compose).

Quick extras / tips

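  • Avoid time.sleep() in async code: it blocks the whole event loop. Use await asyncio.sleep() instead.
  • For blocking calls (e.g., a synchronous library), use asyncio.to_thread(). For CPU-heavy work, to_thread() keeps the loop responsive but, because of the GIL, does not run pure-Python code in parallel; use loop.run_in_executor() with a concurrent.futures.ProcessPoolExecutor for that.
  • For true horizontal scaling, use Redis for the cache and a message queue (RabbitMQ/Kafka) between producers and consumers.
  • To handle rate limits, check response headers such as Retry-After and honor the provider's limits.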
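Honoring Retry-After means parsing it first: the header carries either a number of seconds or an HTTP-date. A minimal sketch under stated assumptions: retry_after_seconds and call_with_retry_after are hypothetical helpers, send stands for any coroutine function returning (status, headers, body), and only 429 and 503 responses are retried. With aiohttp, resp.headers is case-insensitive; a plain dict, as used in a quick test, is not.

```python
import asyncio
import datetime
import email.utils
import time
from typing import Any, Awaitable, Callable, Mapping, Optional, Tuple

Response = Tuple[int, Mapping[str, str], Any]  # (status, headers, body)


def retry_after_seconds(value: Optional[str], default: float = 1.0,
                        now: Optional[float] = None) -> float:
    """Turn a Retry-After header ("120" or an HTTP-date) into a delay in seconds."""
    if not value:
        return default
    value = value.strip()
    if value.isdigit():                  # delay-seconds form
        return float(value)
    try:                                 # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
        when = email.utils.parsedate_to_datetime(value)
    except (TypeError, ValueError):      # unparseable (older Pythons raise TypeError)
        return default
    if when.tzinfo is None:              # "-0000" dates parse as naive; treat them as UTC
        when = when.replace(tzinfo=datetime.timezone.utc)
    current = time.time() if now is None else now
    return max(0.0, when.timestamp() - current)  # a date in the past means "retry now"


async def call_with_retry_after(send: Callable[[], Awaitable[Response]],
                                max_attempts: int = 3) -> Tuple[int, Any]:
    """Retry on 429/503, sleeping for the server-requested delay between attempts."""
    for attempt in range(1, max_attempts + 1):
        status, headers, body = await send()
        if status not in (429, 503) or attempt == max_attempts:
            return status, body
        await asyncio.sleep(retry_after_seconds(headers.get("Retry-After")))
    raise ValueError("max_attempts must be at least 1")
```

A fixed default delay is the simplest fallback when the header is missing; combining it with the Stage 3 backoff logic would be a natural next step.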

