Aller au contenu

Authentification

Système d'authentification JWT de JARVIS.

Architecture

graph LR
    subgraph "Tokens"
        AT[Access Token<br/>15 min]
        RT[Refresh Token<br/>7 jours]
    end

    subgraph "Storage"
        H[Header Authorization]
        C[HttpOnly Cookie]
    end

    AT --> H
    RT --> C

JWT Configuration

Génération des Tokens

from datetime import datetime, timedelta, timezone

from jose import jwt

# Signing key, read from the application settings object.
SECRET_KEY = settings.JWT_SECRET_KEY
ALGORITHM = "HS256"  # passed as `algorithm` to jwt.encode by the token builders
ACCESS_TOKEN_EXPIRE_MINUTES = 15  # access-token lifetime, in minutes
REFRESH_TOKEN_EXPIRE_DAYS = 7  # refresh-token lifetime, in days

def create_access_token(data: dict) -> str:
    """Build a signed, short-lived access JWT.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is never mutated.

    Returns:
        The JWT produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``. It carries ``exp`` (now +
        ``ACCESS_TOKEN_EXPIRE_MINUTES``), ``iat`` and ``type="access"``.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easily mistaken for local time.
    issued_at = datetime.now(timezone.utc)
    expire = issued_at + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode = data.copy()
    # The documented payload includes "iat"; keep a caller-supplied value.
    to_encode.setdefault("iat", issued_at)
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def create_refresh_token(data: dict) -> str:
    """Build a signed, long-lived refresh JWT.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is never mutated.

    Returns:
        The JWT produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``. It carries ``exp`` (now +
        ``REFRESH_TOKEN_EXPIRE_DAYS``), ``iat`` and ``type="refresh"``.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easily mistaken for local time.
    issued_at = datetime.now(timezone.utc)
    expire = issued_at + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode = data.copy()
    # Record the issue time as well; keep a caller-supplied value if present.
    to_encode.setdefault("iat", issued_at)
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

Payload JWT

{
  "sub": "user-uuid",
  "tenant_id": "tenant-uuid",
  "role": "user",
  "exp": 1705400000,
  "iat": 1705399100,
  "type": "access"
}

Endpoints

POST /auth/register

curl -X POST http://localhost:8000/api/v1/auth/register \
  -H "Content-Type: application/json" \
  -d '{
    "email": "user@example.com",
    "password": "SecureP@ss123",
    "first_name": "John",
    "last_name": "Doe"
  }'

POST /auth/login

curl -X POST http://localhost:8000/api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{
    "email": "user@example.com",
    "password": "SecureP@ss123"
  }'

# Response
{
  "access_token": "eyJhbGc...",
  "refresh_token": "eyJhbGc...",
  "token_type": "bearer"
}

POST /auth/refresh

curl -X POST http://localhost:8000/api/v1/auth/refresh \
  -H "Authorization: Bearer <refresh_token>"

# Response
{
  "access_token": "eyJhbGc...",
  "token_type": "bearer"
}

Password Policy

Règles de Validation

class PasswordPolicy:
    """Password complexity rules enforced by ``validate_password``."""

    MIN_LENGTH = 8  # minimum number of characters
    REQUIRE_UPPERCASE = True  # at least one uppercase character
    REQUIRE_LOWERCASE = True  # at least one lowercase character
    REQUIRE_DIGIT = True  # at least one digit
    REQUIRE_SPECIAL = True  # at least one character from SPECIAL_CHARS
    SPECIAL_CHARS = "!@#$%^&*()_+-=[]{}|;:,.<>?"


def validate_password(password: str) -> bool:
    """Check ``password`` against ``PasswordPolicy``.

    Each ``REQUIRE_*`` flag is honoured, so switching a flag off on
    ``PasswordPolicy`` disables the corresponding check. Checks run in a
    fixed order (length, uppercase, lowercase, digit, special) and the
    first failure is reported.

    Args:
        password: Candidate password in clear text.

    Returns:
        ``True`` when every enabled rule is satisfied.

    Raises:
        ValueError: With a user-facing message naming the first rule that
            failed.
    """
    if len(password) < PasswordPolicy.MIN_LENGTH:
        # Build the message from the policy so it cannot drift from MIN_LENGTH.
        raise ValueError(
            f"Mot de passe trop court (min {PasswordPolicy.MIN_LENGTH} caractères)"
        )
    if PasswordPolicy.REQUIRE_UPPERCASE and not any(c.isupper() for c in password):
        raise ValueError("Doit contenir une majuscule")
    if PasswordPolicy.REQUIRE_LOWERCASE and not any(c.islower() for c in password):
        raise ValueError("Doit contenir une minuscule")
    if PasswordPolicy.REQUIRE_DIGIT and not any(c.isdigit() for c in password):
        raise ValueError("Doit contenir un chiffre")
    if PasswordPolicy.REQUIRE_SPECIAL and not any(
        c in PasswordPolicy.SPECIAL_CHARS for c in password
    ):
        raise ValueError("Doit contenir un caractère spécial")
    return True

Hashing (bcrypt)

from passlib.context import CryptContext

# Shared passlib context used by the helpers below; bcrypt is the only
# configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str) -> str:
    """Return the hash of ``password`` as produced by ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest

def verify_password(plain: str, hashed: str) -> bool:
    """Tell whether ``plain`` matches the stored ``hashed`` value.

    The comparison is delegated to ``pwd_context.verify``.
    """
    is_match = pwd_context.verify(plain, hashed)
    return is_match

RBAC (Roles)

Rôles Disponibles

| Rôle | Permissions |
|------|-------------|
| `user` | CRUD sur ses propres ressources |
| `admin` | Gestion des utilisateurs du tenant |
| `super_admin` | Accès complet, multi-tenant |

Protection des Routes

from fastapi import Depends, HTTPException
from app.middleware.auth import get_current_user

def require_role(required_roles: list[str]):
    """Build a FastAPI dependency that only admits users with an allowed role.

    Args:
        required_roles: Roles permitted to access the protected route.

    Returns:
        An async dependency. It obtains the current user through
        ``get_current_user`` and returns that user when ``user.role`` is one
        of ``required_roles``; otherwise it raises ``HTTPException`` with
        status 403.
    """
    # Builtin generic ``list[str]`` is used because ``typing.List`` is not
    # imported alongside this code.
    async def role_checker(user: User = Depends(get_current_user)):
        if user.role in required_roles:
            return user
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return role_checker

# Usage
@router.get("/admin/users")
async def list_users(user: User = Depends(require_role(["admin", "super_admin"]))):
    # `user` is whatever the require_role dependency returned, i.e. a caller
    # whose role is "admin" or "super_admin". Handler body elided.
    ...

Session Management

Redis Storage

async def store_session(user_id: str, token_jti: str, ttl: int = 604800):
    """Record a session marker in Redis.

    The entry is stored under ``session:{user_id}:{token_jti}`` with the
    value ``"active"`` and expires after ``ttl`` (default 604800, i.e.
    7 days).
    """
    session_key = "session:{}:{}".format(user_id, token_jti)
    await redis.setex(session_key, ttl, "active")

async def invalidate_session(user_id: str, token_jti: str):
    """Invalidate a single session (logout).

    Deletes the ``session:{user_id}:{token_jti}`` key from Redis.
    """
    session_key = "session:{}:{}".format(user_id, token_jti)
    await redis.delete(session_key)

async def invalidate_all_sessions(user_id: str):
    """Invalidate every session belonging to a user.

    Deletes all Redis keys matching ``session:{user_id}:*``.

    Args:
        user_id: Identifier whose sessions must be removed.
    """
    pattern = f"session:{user_id}:*"
    # SCAN walks the keyspace incrementally, whereas KEYS blocks the server
    # while it inspects every key.
    # NOTE(review): assumes `redis` is a redis-py asyncio client exposing
    # `scan_iter` — confirm against the client actually in use.
    keys = [key async for key in redis.scan_iter(match=pattern)]
    if keys:
        await redis.delete(*keys)

Sécurité Supplémentaire

Rate Limiting Login

from slowapi import Limiter

# Rate limiter whose bucket key is computed by `get_remote_address`.
# NOTE(review): `get_remote_address` is not imported in this snippet —
# presumably it comes from `slowapi.util`; confirm.
limiter = Limiter(key_func=get_remote_address)

@router.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, ...):
    # Login endpoint limited to "5/minute" by `limiter`; the remaining
    # parameters and the handler body are elided in this example.
    ...

Brute Force Protection

MAX_FAILED_ATTEMPTS = 5  # counter value at which check_login_attempts rejects
LOCKOUT_DURATION = 900  # 15 minutes; TTL applied to the attempts counter

async def check_login_attempts(email: str) -> bool:
    """Reject the login when too many failures are recorded for ``email``.

    Reads the ``login_attempts:{email}`` counter from Redis.

    Returns:
        ``True`` when the counter is absent or below ``MAX_FAILED_ATTEMPTS``.

    Raises:
        HTTPException: Status 429 once the counter has reached
            ``MAX_FAILED_ATTEMPTS``.
    """
    raw_count = await redis.get("login_attempts:{}".format(email))
    # A missing (or empty) counter means no recorded failures.
    if not raw_count or int(raw_count) < MAX_FAILED_ATTEMPTS:
        return True
    raise HTTPException(
        status_code=429,
        detail="Compte temporairement verrouillé",
    )

async def record_failed_attempt(email: str):
    """Count one more failed login for ``email``.

    Increments the ``login_attempts:{email}`` counter in Redis, then sets
    its expiry to ``LOCKOUT_DURATION``.
    """
    counter_key = "login_attempts:{}".format(email)
    await redis.incr(counter_key)
    # The expiry is re-applied on every failure.
    await redis.expire(counter_key, LOCKOUT_DURATION)