Authentification¶
Système d'authentification JWT de JARVIS.
Architecture¶
graph LR
subgraph "Tokens"
AT[Access Token<br/>15 min]
RT[Refresh Token<br/>7 jours]
end
subgraph "Storage"
H[Header Authorization]
C[HttpOnly Cookie]
end
AT --> H
RT --> C
JWT Configuration¶
Génération des Tokens¶
from datetime import datetime, timedelta, timezone

from jose import jwt
# Key used to sign tokens, taken from the `settings` object.
SECRET_KEY = settings.JWT_SECRET_KEY
# Algorithm name passed to jwt.encode() when signing tokens.
ALGORITHM = "HS256"
# Lifetime of an access token, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 15
# Lifetime of a refresh token, in days.
REFRESH_TOKEN_EXPIRE_DAYS = 7
def create_access_token(data: dict) -> str:
    """Build a signed, short-lived access JWT.

    Args:
        data: Claims to embed in the token (for example ``sub``,
            ``tenant_id``, ``role``). The dict is copied, so the caller's
            object is never modified.

    Returns:
        The encoded JWT string, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easily mistaken for local time.
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    # The documented payload carries "iat"; keep a caller-supplied value if any.
    to_encode.setdefault("iat", issued_at)
    to_encode.update({"exp": expires_at, "type": "access"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: dict) -> str:
    """Build a signed, long-lived refresh JWT.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is never modified.

    Returns:
        The encoded JWT string, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easily mistaken for local time.
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)

    to_encode = data.copy()
    # The documented payload carries "iat"; keep a caller-supplied value if any.
    to_encode.setdefault("iat", issued_at)
    to_encode.update({"exp": expires_at, "type": "refresh"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
Payload JWT¶
{
"sub": "user-uuid",
"tenant_id": "tenant-uuid",
"role": "user",
"exp": 1705400000,
"iat": 1705399100,
"type": "access"
}
Endpoints¶
POST /auth/register¶
curl -X POST http://localhost:8000/api/v1/auth/register \
-H "Content-Type: application/json" \
-d '{
"email": "user@example.com",
"password": "SecureP@ss123",
"first_name": "John",
"last_name": "Doe"
}'
POST /auth/login¶
curl -X POST http://localhost:8000/api/v1/auth/login \
-H "Content-Type: application/json" \
-d '{
"email": "user@example.com",
"password": "SecureP@ss123"
}'
# Response
{
"access_token": "eyJhbGc...",
"refresh_token": "eyJhbGc...",
"token_type": "bearer"
}
POST /auth/refresh¶
curl -X POST http://localhost:8000/api/v1/auth/refresh \
-H "Authorization: Bearer <refresh_token>"
# Response
{
"access_token": "eyJhbGc...",
"token_type": "bearer"
}
Password Policy¶
Règles de Validation¶
class PasswordPolicy:
    """Password complexity rules read by ``validate_password``."""

    # Minimum number of characters a password must have.
    MIN_LENGTH = 8
    # Character classes that must each appear at least once when enabled.
    REQUIRE_UPPERCASE = True
    REQUIRE_LOWERCASE = True
    REQUIRE_DIGIT = True
    REQUIRE_SPECIAL = True
    # Characters that count as "special".
    SPECIAL_CHARS = "!@#$%^&*()_+-=[]{}|;:,.<>?"


def validate_password(password: str) -> bool:
    """Check ``password`` against ``PasswordPolicy``.

    Each ``REQUIRE_*`` rule is enforced only when its flag is enabled, and the
    length error message reflects the configured ``MIN_LENGTH``.

    Args:
        password: Candidate password in clear text.

    Returns:
        ``True`` when every enabled rule is satisfied.

    Raises:
        ValueError: On the first rule that fails, with a user-facing message.
    """
    if len(password) < PasswordPolicy.MIN_LENGTH:
        raise ValueError(
            f"Mot de passe trop court (min {PasswordPolicy.MIN_LENGTH} caractères)"
        )
    if PasswordPolicy.REQUIRE_UPPERCASE and not any(c.isupper() for c in password):
        raise ValueError("Doit contenir une majuscule")
    if PasswordPolicy.REQUIRE_LOWERCASE and not any(c.islower() for c in password):
        raise ValueError("Doit contenir une minuscule")
    if PasswordPolicy.REQUIRE_DIGIT and not any(c.isdigit() for c in password):
        raise ValueError("Doit contenir un chiffre")
    if PasswordPolicy.REQUIRE_SPECIAL and not any(
        c in PasswordPolicy.SPECIAL_CHARS for c in password
    ):
        raise ValueError("Doit contenir un caractère spécial")
    return True
Hashing (bcrypt)¶
from passlib.context import CryptContext
# Shared passlib context used by the hashing helpers; bcrypt is the only
# configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
    """Return the hash of ``password`` produced by the shared ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Tell whether ``plain`` matches ``hashed`` according to ``pwd_context``."""
    is_match = pwd_context.verify(plain, hashed)
    return is_match
RBAC (Roles)¶
Rôles Disponibles¶
| Role | Permissions |
|---|---|
| user | CRUD propres ressources |
| admin | Gestion utilisateurs du tenant |
| super_admin | Accès complet, multi-tenant |
Protection des Routes¶
from fastapi import Depends, HTTPException
from app.middleware.auth import get_current_user
def require_role(required_roles: List[str]):
    """Build a FastAPI dependency that only lets the listed roles through.

    Args:
        required_roles: Role names allowed to access the protected route.

    Returns:
        An async dependency that returns the current user when ``user.role``
        is in ``required_roles`` and raises HTTP 403 otherwise.
    """

    async def role_checker(user: User = Depends(get_current_user)):
        # Guard clause: an allowed role exits early with the user object.
        if user.role in required_roles:
            return user
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    return role_checker
# Usage: the route only accepts users whose role is "admin" or "super_admin",
# enforced through the dependency returned by require_role().
@router.get("/admin/users")
async def list_users(user: User = Depends(require_role(["admin", "super_admin"]))):
    ...
Session Management¶
Redis Storage¶
async def store_session(user_id: str, token_jti: str, ttl: int = 604800):
    """Record a session in Redis (expires after 7 days by default).

    Args:
        user_id: Identifier of the session's owner.
        token_jti: Token identifier used to tell this session apart.
        ttl: Expiry of the Redis key, in seconds.
    """
    session_key = f"session:{user_id}:{token_jti}"
    # The stored value is a fixed marker; the key itself identifies the session.
    await redis.setex(session_key, ttl, "active")
async def invalidate_session(user_id: str, token_jti: str):
    """Invalidate a single session (logout) by deleting its Redis key.

    Args:
        user_id: Identifier of the session's owner.
        token_jti: Token identifier of the session to drop.
    """
    session_key = f"session:{user_id}:{token_jti}"
    await redis.delete(session_key)
async def invalidate_all_sessions(user_id: str):
    """Invalidate every session belonging to a user.

    Args:
        user_id: Identifier of the user whose ``session:{user_id}:*`` keys
            are deleted.
    """
    pattern = f"session:{user_id}:*"
    # SCAN walks the keyspace incrementally; KEYS would block the Redis server
    # while it inspects every key, which is unsafe on a production dataset.
    # NOTE(review): assumes a redis-py style async client exposing scan_iter().
    keys = [key async for key in redis.scan_iter(match=pattern)]
    if keys:
        await redis.delete(*keys)
Sécurité Supplémentaire¶
Rate Limiting Login¶
from slowapi import Limiter
# Limiter keyed by whatever get_remote_address returns for a request
# (presumably the client address — the helper's import is not shown here).
limiter = Limiter(key_func=get_remote_address)


# Login route limited to "5/minute" per limiter key. The signature is
# abbreviated: the trailing "..." stands for the endpoint's other parameters.
@router.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, ...):
    ...
Brute Force Protection¶
# Number of recorded failures at which check_login_attempts() starts rejecting.
MAX_FAILED_ATTEMPTS = 5
# Expiry, in seconds, applied to the failure counter after each failure.
LOCKOUT_DURATION = 900 # 15 minutes
async def check_login_attempts(email: str) -> bool:
    """Reject the login when too many failures are recorded for ``email``.

    Args:
        email: Address whose ``login_attempts:{email}`` counter is checked.

    Returns:
        ``True`` when the counter is missing or below ``MAX_FAILED_ATTEMPTS``.

    Raises:
        HTTPException: Status 429 once the counter reaches the maximum.
    """
    raw_count = await redis.get(f"login_attempts:{email}")
    # A missing or empty counter means no failure is currently recorded.
    if not raw_count or int(raw_count) < MAX_FAILED_ATTEMPTS:
        return True
    raise HTTPException(
        status_code=429,
        detail="Compte temporairement verrouillé",
    )
async def record_failed_attempt(email: str):
    """Count one more failed login for ``email``.

    Args:
        email: Address whose ``login_attempts:{email}`` counter is incremented.
    """
    counter_key = f"login_attempts:{email}"
    await redis.incr(counter_key)
    # The expiry is re-applied on every failure, so the window restarts each time.
    await redis.expire(counter_key, LOCKOUT_DURATION)