let’s add JWT authentication, password hashing, protected routes, and a Dockerized pgAdmin to your FastAPI + PostgreSQL stack. I’ll give you a focused, ready-to-run implementation (files + explanations + Docker changes).
I’ll also briefly explain how to do the same for Flask at the end.


🚀 What you’ll get

  1. Signup (/signup) — creates user with hashed password
  2. Login (/token) — returns JWT access token
  3. Protected route (/users/me) — requires valid token
  4. Token creation/validation using python-jose
  5. Password hashing using passlib
  6. docker-compose updated to run PostgreSQL, FastAPI, and pgAdmin
  7. Environment-driven secrets (.env) for safety

🗂️ Updated project structure (FastAPI)

fastapi_postgres_docker/
├── app/
│   ├── main.py
│   ├── models.py
│   ├── database.py
│   ├── schemas.py
│   ├── crud.py
│   ├── security.py
│   └── __init__.py
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
└── .env

1) requirements.txt

Add these packages (append to existing file):

fastapi
uvicorn
sqlalchemy
psycopg2-binary
pydantic
python-jose[cryptography]
passlib[bcrypt]
python-dotenv
alembic

2) .env (example)

Do not commit this file to git.

POSTGRES_DB=testdb
POSTGRES_USER=postgres
POSTGRES_PASSWORD=yourpassword
POSTGRES_HOST=db
POSTGRES_PORT=5432

SECRET_KEY=your-very-secret-key-please-change
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=60
PGADMIN_DEFAULT_EMAIL=admin@local
PGADMIN_DEFAULT_PASSWORD=pgadminpassword

3) database.py (unchanged except use env)

# app/database.py
import os
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv

load_dotenv()

DB_USER = os.getenv("POSTGRES_USER", "postgres")
DB_PASS = os.getenv("POSTGRES_PASSWORD", "yourpassword")
DB_HOST = os.getenv("POSTGRES_HOST", "db")
DB_PORT = os.getenv("POSTGRES_PORT", "5432")
DB_NAME = os.getenv("POSTGRES_DB", "testdb")

DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

4) models.py (user table)

# app/models.py
from sqlalchemy import Column, Integer, String
from .database import Base

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    full_name = Column(String(255), nullable=True)
    hashed_password = Column(String(255), nullable=False)

5) schemas.py (pydantic)

# app/schemas.py
from pydantic import BaseModel, EmailStr

class UserBase(BaseModel):
    email: EmailStr
    full_name: str | None = None

class UserCreate(UserBase):
    password: str

class UserOut(UserBase):
    id: int

    class Config:
        orm_mode = True

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    email: str | None = None

6) crud.py (user operations)

# app/crud.py
from sqlalchemy.orm import Session
from . import models, schemas

def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()

def get_user(db: Session, user_id: int):
    return db.query(models.User).get(user_id)

def create_user(db: Session, user: schemas.UserCreate, hashed_password: str):
    db_user = models.User(email=user.email, full_name=user.full_name, hashed_password=hashed_password)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

7) security.py (hashing & JWT helpers)

# app/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import jwt, JWTError
from passlib.context import CryptContext
import os
from dotenv import load_dotenv

load_dotenv()

SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "60"))

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def decode_access_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except JWTError:
        return None

8) main.py (routes + auth)

# app/main.py
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from . import models, schemas, crud
from .database import SessionLocal, engine
from .security import get_password_hash, verify_password, create_access_token, decode_access_token
from dotenv import load_dotenv
import os

load_dotenv()

models.Base.metadata.create_all(bind=engine)

app = FastAPI(title="FastAPI Auth Example")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Signup - create user
@app.post("/signup", response_model=schemas.UserOut)
def signup(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="Email already registered")
    hashed_password = get_password_hash(user.password)
    created = crud.create_user(db, user, hashed_password)
    return created

# Token (login) - OAuth2 password grant
@app.post("/token", response_model=schemas.Token)
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    # OAuth2PasswordRequestForm has 'username' and 'password' fields (username -> we use email)
    user = crud.get_user_by_email(db, email=form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Incorrect email or password",
                            headers={"WWW-Authenticate": "Bearer"})
    access_token = create_access_token(data={"sub": user.email})
    return {"access_token": access_token, "token_type": "bearer"}

# Helper to get current user from token
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    payload = decode_access_token(token)
    if not payload:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication")
    email: str | None = payload.get("sub")
    if email is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication")
    user = crud.get_user_by_email(db, email=email)
    if user is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
    return user

# Protected route
@app.get("/users/me", response_model=schemas.UserOut)
def read_users_me(current_user = Depends(get_current_user)):
    return current_user

Notes:

  • Login endpoint uses OAuth2 password grant form (/token) — standard and compatible with Swagger UI.
  • The JWT payload stores sub = user email; adjust to include user id if preferred.

9) Docker Compose — add pgAdmin and keep DB volume

Update (or replace) your docker-compose.yml:

version: '3.8'

services:
  db:
    image: postgres:15
    container_name: postgres_db
    environment:
      POSTGRES_DB: ${POSTGRES_DB:-testdb}
      POSTGRES_USER: ${POSTGRES_USER:-postgres}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-yourpassword}
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  api:
    build: .
    container_name: fastapi_app
    ports:
      - "8000:8000"
    depends_on:
      - db
    env_file:
      - .env

  pgadmin:
    image: dpage/pgadmin4:latest
    container_name: pgadmin
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8080:80"
    depends_on:
      - db

volumes:
  pgdata:
  • Access pgAdmin at: http://localhost:8080
    Login with PGADMIN_DEFAULT_EMAIL / PGADMIN_DEFAULT_PASSWORD.
    Then add a server using:
    • Host: db
    • Port: 5432
    • Maintenance DB: ${POSTGRES_DB}
    • Username/Password from .env

10) Dockerfile tweak (run alembic migrations then start)

If you want migrations applied at container start, add Alembic and run migrations before uvicorn:

FROM python:3.10-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY ./app /app
COPY alembic.ini /app
COPY alembic /app/alembic

EXPOSE 8000

CMD ["sh", "-c", "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"]

(Ensure alembic.ini has correct sqlalchemy.url set to the same env DB url or modify env injection in alembic/env.py.)


11) Swagger UI interaction (login & test)

  • Start docker-compose up --build
  • Open http://localhost:8000/docs
    1. Use /signup to create a user (POST JSON body: {"email":"me@example.com","full_name":"Me","password":"pass"})
    2. Use /token → click Try it out and enter:
      • username: the email you used
      • password: the password
      • response returns access_token
    3. Click Authorize button in Swagger UI and paste Bearer <access_token> or (Swagger expects just the token) — enter Bearer <token> depending on UI; after authorize, call /users/me.

12) Security notes & best practices

  • Rotate SECRET_KEY and keep it secret (use secret manager in prod).
  • Use short token lifetimes and refresh tokens pattern for long sessions. (I can show refresh token flow if you want.)
  • Store only hashed passwords (done via bcrypt via passlib).
  • Consider HTTPS for transport security. In prod, terminate TLS via a reverse proxy (NGINX) or use cloud load balancer.
  • Use role-based access if you need admin vs normal users.

13) Brief: How to implement the same in Flask

  • Use Flask-JWT-Extended for JWT handling (or use python-jose manually).
  • Use werkzeug.security.generate_password_hash and check_password_hash or passlib.
  • Typical steps:
    • Create User model with hashed_password using SQLAlchemy.
    • /signup — hash and save password.
    • /login — verify and issue JWT with create_access_token.
    • Protect routes using @jwt_required() decorator.
  • If you want, I can provide the Flask code too (full example similar to above).

Awesome — I’ll deliver both:

  1. A ready-to-run Flask implementation (signup/login with JWT, password hashing, PostgreSQL, Docker + pgAdmin + migrations), and
  2. Extended FastAPI features added to your existing FastAPI stack: refresh tokens, role-based access, logout (token revocation), and small notes on secure production behavior.

I’ll give you project layouts, full code files, requirements.txt, Dockerfile, docker-compose.yml snippets, and short explanations for each part so you can run them locally or adapt to your project.


Part A — Complete Flask implementation (Auth + PostgreSQL + Docker + Migrations)

Project structure

flask_auth_postgres/
├── app/
│   ├── __init__.py
│   ├── models.py
│   ├── schemas.py
│   ├── database.py
│   ├── auth.py
│   ├── routes.py
│   └── migrate.py
├── requirements.txt
├── Dockerfile
├── docker-compose.yml
└── .env

requirements.txt

Flask
Flask-SQLAlchemy
Flask-Migrate
Flask-JWT-Extended
python-dotenv
psycopg2-binary
passlib[bcrypt]

.env (example — DON’T commit)

POSTGRES_DB=testdb
POSTGRES_USER=postgres
POSTGRES_PASSWORD=yourpassword
POSTGRES_HOST=db
POSTGRES_PORT=5432

SECRET_KEY=change_this_secret
JWT_SECRET_KEY=change_this_jwt_secret
ACCESS_EXPIRES_MINUTES=60
REFRESH_EXPIRES_DAYS=7

PGADMIN_DEFAULT_EMAIL=admin@local
PGADMIN_DEFAULT_PASSWORD=pgadminpassword

app/database.py

# app/database.py
import os
from flask_sqlalchemy import SQLAlchemy
from dotenv import load_dotenv

load_dotenv()
db = SQLAlchemy()

app/models.py

# app/models.py
from .database import db
from datetime import datetime

class User(db.Model):
    __tablename__ = "users"
    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), unique=True, index=True, nullable=False)
    full_name = db.Column(db.String(255))
    hashed_password = db.Column(db.String(255), nullable=False)
    role = db.Column(db.String(50), default="user")  # 'user' or 'admin'
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

class TokenBlocklist(db.Model):
    __tablename__ = "token_blocklist"
    id = db.Column(db.Integer, primary_key=True)
    jti = db.Column(db.String(36), nullable=False, index=True)  # JWT ID
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

app/schemas.py

(simple data containers used in routes — optional but useful)

# app/schemas.py
from dataclasses import dataclass

@dataclass
class UserCreate:
    email: str
    full_name: str | None
    password: str

@dataclass
class UserOut:
    id: int
    email: str
    full_name: str | None
    role: str

app/auth.py

# app/auth.py
import os
from passlib.context import CryptContext
from datetime import timedelta
from flask_jwt_extended import (
    create_access_token, create_refresh_token, decode_token, get_jwt, get_jwt_identity
)
from dotenv import load_dotenv

load_dotenv()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

ACCESS_EXPIRES_MINUTES = int(os.getenv("ACCESS_EXPIRES_MINUTES", "60"))
REFRESH_EXPIRES_DAYS = int(os.getenv("REFRESH_EXPIRES_DAYS", "7"))

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

def make_access_token(identity: str, additional_claims: dict = None):
    expires = timedelta(minutes=ACCESS_EXPIRES_MINUTES)
    return create_access_token(identity=identity, additional_claims=additional_claims or {}, expires_delta=expires)

def make_refresh_token(identity: str):
    expires = timedelta(days=REFRESH_EXPIRES_DAYS)
    return create_refresh_token(identity=identity, expires_delta=expires)

app/routes.py

# app/routes.py
from flask import Blueprint, request, jsonify
from .database import db
from .models import User, TokenBlocklist
from .auth import hash_password, verify_password, make_access_token, make_refresh_token
from flask_jwt_extended import (
    jwt_required, get_jwt_identity, get_jwt, jwt_refresh_token_required
)
from sqlalchemy.exc import IntegrityError

bp = Blueprint("routes", __name__)

@bp.route("/signup", methods=["POST"])
def signup():
    data = request.get_json()
    email = data.get("email")
    password = data.get("password")
    full_name = data.get("full_name")
    if not email or not password:
        return jsonify({"msg": "email and password required"}), 400
    user = User(email=email, full_name=full_name, hashed_password=hash_password(password))
    db.session.add(user)
    try:
        db.session.commit()
    except IntegrityError:
        db.session.rollback()
        return jsonify({"msg": "email already exists"}), 400
    return jsonify({"id": user.id, "email": user.email}), 201

@bp.route("/login", methods=["POST"])
def login():
    data = request.get_json()
    email = data.get("email")
    password = data.get("password")
    user = User.query.filter_by(email=email).first()
    if not user or not verify_password(password, user.hashed_password):
        return jsonify({"msg": "Bad credentials"}), 401
    access = make_access_token(identity=str(user.id), additional_claims={"role": user.role})
    refresh = make_refresh_token(identity=str(user.id))
    return jsonify({"access_token": access, "refresh_token": refresh})

@bp.route("/refresh", methods=["POST"])
@jwt_refresh_token_required
def refresh():
    # Note: Flask-JWT-Extended v4 uses different decorators — adjust to your version
    identity = get_jwt_identity()
    access = make_access_token(identity=identity)
    return jsonify({"access_token": access})

@bp.route("/logout", methods=["POST"])
@jwt_required()
def logout():
    jti = get_jwt()["jti"]
    db.session.add(TokenBlocklist(jti=jti))
    db.session.commit()
    return jsonify({"msg": "Token revoked"}), 200

@bp.route("/users/me", methods=["GET"])
@jwt_required()
def me():
    user_id = get_jwt_identity()
    user = User.query.get(user_id)
    if not user:
        return jsonify({"msg": "User not found"}), 404
    return jsonify({"id": user.id, "email": user.email, "full_name": user.full_name, "role": user.role})

# Example admin-protected route
@bp.route("/admin-only", methods=["GET"])
@jwt_required()
def admin_only():
    claims = get_jwt()
    if claims.get("role") != "admin":
        return jsonify({"msg": "Admins only"}), 403
    return jsonify({"msg": "Welcome admin"})

Note: jwt_refresh_token_required may vary by flask-jwt-extended version. In v4+, use @jwt_required(refresh=True) and check docs. Adjust decorators accordingly.


app/init.py

# app/__init__.py
import os
from flask import Flask, jsonify
from flask_migrate import Migrate
from flask_jwt_extended import JWTManager, get_jwt
from dotenv import load_dotenv
from .database import db
from .routes import bp
from .models import TokenBlocklist

load_dotenv()

def create_app():
    app = Flask(__name__)
    app.config["SQLALCHEMY_DATABASE_URI"] = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/{os.getenv('POSTGRES_DB')}"
    app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
    app.config["SECRET_KEY"] = os.getenv("SECRET_KEY")
    app.config["JWT_SECRET_KEY"] = os.getenv("JWT_SECRET_KEY")

    db.init_app(app)
    migrate = Migrate(app, db)
    jwt = JWTManager(app)

    # Token revocation check
    @jwt.token_in_blocklist_loader
    def check_if_token_revoked(jwt_header, jwt_payload):
        jti = jwt_payload["jti"]
        entry = TokenBlocklist.query.filter_by(jti=jti).first()
        return entry is not None

    app.register_blueprint(bp)
    return app

app/migrate.py (run migrations programmatically — optional)

# app/migrate.py
# You can also use flask db CLI commands. This file is optional.
from app import create_app
from flask_migrate import upgrade
app = create_app()
with app.app_context():
    upgrade()

Dockerfile

FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
ENV FLASK_APP=app
EXPOSE 5000
CMD ["flask", "run", "--host=0.0.0.0"]

docker-compose.yml

version: '3.8'
services:
  db:
    image: postgres:15
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    volumes:
      - pgdata:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  api:
    build: .
    environment:
      - FLASK_ENV=development
    env_file:
      - .env
    ports:
      - "5000:5000"
    depends_on:
      - db

  pgadmin:
    image: dpage/pgadmin4:latest
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD}
    ports:
      - "8080:80"
    depends_on:
      - db

volumes:
  pgdata:

Migrations (Flask-Migrate)

After containers start (or locally), run migrations:

  1. Initialize (only once)
flask db init
flask db migrate -m "initial"
flask db upgrade

(If using inside Docker, you can run these commands in the container: docker exec -it <api_container> flask db migrate ...)


How to use (quick)

  1. docker-compose up --build
  2. POST /signup with JSON: {"email":"a@b.com","password":"pass","full_name":"A B"}
  3. POST /login with same JSON → returns access_token and refresh_token
  4. Use Authorization: Bearer <access_token> to call protected endpoints /users/me or /admin-only (admin-only needs role claim admin)
  5. Use /refresh to exchange refresh for new access token. Use /logout to revoke the current token.

Part B — FastAPI: Extended features (refresh tokens, role-based access, revocation, Alembic + Docker notes)

You already have the FastAPI + SQLModel + Docker stack. I’ll show the minimal additions/changes to support:

  • Refresh tokens (longer-lived, stored in DB)
  • Role-based access (admin/user) via token claims or DB lookup
  • Logout / revoke refresh token (DB token status)
  • Endpoints: /token (login), /refresh, /logout, /users/me, /admin-only

Additions to models.py (SQLModel)

# app/models.py (extend existing)
from sqlmodel import SQLModel, Field
from datetime import datetime

class User(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    email: str
    full_name: str | None = None
    hashed_password: str
    role: str = "user"
    created_at: datetime | None = Field(default_factory=datetime.utcnow)

class RefreshToken(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    jti: str  # JWT id for refresh token
    user_id: int
    revoked: bool = False
    created_at: datetime | None = Field(default_factory=datetime.utcnow)

Run SQLModel.metadata.create_all(engine) (or via Alembic autogenerate/upgrade).


security.py (extended – store refresh token jti in DB)

# app/security.py (add DB interactions)
from jose import jwt, JWTError
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
import os
from dotenv import load_dotenv
from .models import RefreshToken
from .database import SessionLocal

load_dotenv()
SECRET_KEY = os.getenv("SECRET_KEY")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "60"))
REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", "7"))

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_password_hash(password): return pwd_context.hash(password)
def verify_password(plain, hashed): return pwd_context.verify(plain, hashed)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS))
    to_encode.update({"exp": expire, "type": "refresh"})
    token = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    # store jti in db for revocation (we can use a uuid stored in 'jti' claim)
    payload = jwt.get_unverified_claims(token)
    jti = payload.get("jti") or payload.get("jti", str(datetime.utcnow().timestamp()))
    # Save refresh token record to DB
    with SessionLocal() as session:
        session.add(RefreshToken(jti=jti, user_id=int(data["sub"])))
        session.commit()
    return token

def decode_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except JWTError:
        return None

def revoke_refresh_token(jti: str):
    with SessionLocal() as session:
        toks = session.query(RefreshToken).filter(RefreshToken.jti == jti).all()
        for t in toks:
            t.revoked = True
        session.commit()

Note: depending on JWT library, you may want to put a jti claim when creating tokens (set to_encode['jti'] = str(uuid4())). The above example assumes you add jti explicitly.


main.py (FastAPI) — added endpoints

# app/main.py (only showing new/changed endpoints/concepts)
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from .database import SessionLocal, engine
from .models import User, RefreshToken
from .schemas import Token, TokenData, UserCreate, UserOut
from .security import (
    create_access_token, create_refresh_token, decode_token,
    verify_password, get_password_hash, revoke_refresh_token
)
from sqlalchemy.orm import Session
from datetime import timedelta
from jose import jwt

app = FastAPI()
# ... existing get_db, create tables etc.

@app.post("/token", response_model=Token)
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    user = db.query(User).filter(User.email == form_data.username).first()
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Incorrect credentials")
    data = {"sub": str(user.id), "role": user.role}
    access_token = create_access_token(data)
    refresh_token = create_refresh_token(data)  # stored in DB inside function
    return {"access_token": access_token, "token_type": "bearer", "refresh_token": refresh_token}

@app.post("/refresh", response_model=Token)
def refresh_token(refresh_token: str):
    payload = decode_token(refresh_token)
    if not payload or payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="Invalid refresh token")
    jti = payload.get("jti")
    # check DB if revoked
    with SessionLocal() as session:
        dbt = session.query(RefreshToken).filter(RefreshToken.jti == jti).first()
        if not dbt or dbt.revoked:
            raise HTTPException(status_code=401, detail="Refresh token revoked")
        user_id = payload.get("sub")
        user = session.get(User, int(user_id))
        if not user:
            raise HTTPException(status_code=401, detail="User not found")
        data = {"sub": str(user.id), "role": user.role}
        access_token = create_access_token(data)
        return {"access_token": access_token, "token_type": "bearer"}

@app.post("/logout")
def logout(refresh_token: str):
    payload = decode_token(refresh_token)
    if not payload:
        raise HTTPException(status_code=400, detail="Invalid token")
    jti = payload.get("jti")
    revoke_refresh_token(jti)
    return {"msg": "Logged out"}

Role-based dependency

# app/deps.py
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from .security import decode_token
from .database import SessionLocal
from .models import User

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def get_current_user(token: str = Depends(oauth2_scheme)):
    payload = decode_token(token)
    if not payload or payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="Invalid token")
    user_id = int(payload.get("sub"))
    with SessionLocal() as session:
        user = session.get(User, user_id)
        if not user:
            raise HTTPException(status_code=401, detail="User not found")
        return user

def require_admin(current_user: User = Depends(get_current_user)):
    if current_user.role != "admin":
        raise HTTPException(status_code=403, detail="Admin required")
    return current_user

Usage:

@app.get("/admin-only")
def admin_route(user = Depends(require_admin)):
    return {"msg": f"Hello {user.email}, you are admin"}

Docker & Alembic notes (FastAPI)

  • Keep alembic configured to target your app.models metadata so migrations can detect RefreshToken table and role column changes.
  • Add the Alembic auto-upgrade command to the container CMD before starting uvicorn (as shown previously).
  • Make sure the alembic.ini uses the Docker service name db as host (or set sqlalchemy.url in env.py using DATABASE_URL env var).

Security / Production Tips (both Flask & FastAPI)

  • Use HTTPS (TLS) in production — with a reverse proxy (Nginx) or cloud load balancer. Never transmit credentials over HTTP.
  • Use short-lived access tokens and store refresh tokens securely (HTTP-only secure cookies or a protected DB record).
  • Protect Against CSRF if sending tokens in cookies.
  • Rate-limit authentication endpoints.
  • Use a secret manager (AWS Secrets Manager, Azure KeyVault) for secret keys — do not keep secrets in .env in production.
  • Consider storing a hashed refresh token fingerprint in DB instead of a full token, to reduce attack surface.
  • Use rotating refresh tokens (issue new refresh token on refresh and revoke the old one).

Quick run checklist (both stacks)

  1. Create .env with correct DB credentials and secrets.
  2. Start docker stack: docker-compose up --build
  3. Apply migrations if needed (alembic upgrade head or flask db upgrade).
  4. FastAPI: visit http://localhost:8000/docs to test login, refresh, protected endpoints.
    Flask: use Postman/curl to call /signup, /login, then pass Authorization: Bearer <token>.

Posted in

Leave a Reply

Your email address will not be published. Required fields are marked *