Multi-tenancy & Row-Level Security¶
Isolation des données entre tenants dans JARVIS.
Architecture Multi-tenant¶
graph TB
subgraph "Tenant A"
UA[Users A]
DA[Data A]
end
subgraph "Tenant B"
UB[Users B]
DB[Data B]
end
subgraph "Shared Infrastructure"
API[FastAPI]
PG[(PostgreSQL)]
R[(Redis)]
end
UA --> API
UB --> API
API --> PG
PG --> DA
PG --> DB Row-Level Security (RLS)¶
Activation sur PostgreSQL¶
-- Activer RLS sur les tables
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
ALTER TABLE conversations ENABLE ROW LEVEL SECURITY;
ALTER TABLE messages ENABLE ROW LEVEL SECURITY;
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
-- Forcer RLS même pour le owner
ALTER TABLE users FORCE ROW LEVEL SECURITY;
Policies RLS¶
-- Policy pour les utilisateurs
CREATE POLICY tenant_isolation_users ON users
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
-- Policy pour les conversations
CREATE POLICY tenant_isolation_conversations ON conversations
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
-- Policy pour les messages
CREATE POLICY tenant_isolation_messages ON messages
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
-- Policy pour les documents
CREATE POLICY tenant_isolation_documents ON documents
USING (tenant_id = current_setting('app.current_tenant_id')::uuid);
Injection du Contexte Tenant¶
Middleware FastAPI¶
from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession
class TenantMiddleware:
async def __call__(self, request: Request, call_next):
# Extraire tenant_id du JWT
user = request.state.user
tenant_id = user.tenant_id
# Injecter dans le contexte de la requête
request.state.tenant_id = tenant_id
response = await call_next(request)
return response
Session Database avec Contexte¶
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession
@asynccontextmanager
async def get_tenant_session(tenant_id: str) -> AsyncSession:
"""Obtenir une session DB avec le contexte tenant."""
async with async_session() as session:
# Définir le tenant_id pour RLS
await session.execute(
text(f"SET app.current_tenant_id = '{tenant_id}'")
)
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Utilisation dans les Routes¶
@router.get("/conversations")
async def list_conversations(
request: Request,
db: AsyncSession = Depends(get_db)
):
tenant_id = request.state.tenant_id
# RLS filtre automatiquement par tenant_id
result = await db.execute(
select(Conversation).order_by(Conversation.created_at.desc())
)
return result.scalars().all()
Schéma des Tables¶
Table Tenants¶
CREATE TABLE tenants (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
slug VARCHAR(100) UNIQUE NOT NULL,
plan VARCHAR(50) DEFAULT 'free',
settings JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
Tables avec tenant_id¶
-- Exemple: table users
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenants(id),
email VARCHAR(255) NOT NULL,
-- ... autres colonnes
UNIQUE(tenant_id, email) -- Email unique par tenant
);
-- Index pour performance
CREATE INDEX idx_users_tenant_id ON users(tenant_id);
Isolation des Ressources¶
MinIO (Stockage)¶
def get_bucket_name(tenant_id: str) -> str:
"""Un bucket par tenant pour isolation complète."""
return f"tenant-{tenant_id}"
async def upload_document(tenant_id: str, file: UploadFile):
bucket = get_bucket_name(tenant_id)
# S'assurer que le bucket existe
await minio.ensure_bucket(bucket)
# Upload dans le bucket du tenant
await minio.upload(bucket, file.filename, file.file)
Redis (Cache)¶
def get_cache_key(tenant_id: str, key: str) -> str:
"""Préfixer les clés Redis par tenant."""
return f"tenant:{tenant_id}:{key}"
async def get_cached(tenant_id: str, key: str):
full_key = get_cache_key(tenant_id, key)
return await redis.get(full_key)
Tests d'Isolation¶
@pytest.mark.asyncio
async def test_tenant_isolation():
"""Vérifier que les données sont isolées entre tenants."""
# Créer 2 tenants
tenant_a = await create_tenant("Tenant A")
tenant_b = await create_tenant("Tenant B")
# Créer un document pour tenant A
async with get_tenant_session(tenant_a.id) as session:
doc = Document(title="Secret A", tenant_id=tenant_a.id)
session.add(doc)
await session.commit()
# Vérifier que tenant B ne peut pas voir le document
async with get_tenant_session(tenant_b.id) as session:
result = await session.execute(select(Document))
docs = result.scalars().all()
assert len(docs) == 0 # Aucun document visible
# Vérifier que tenant A voit son document
async with get_tenant_session(tenant_a.id) as session:
result = await session.execute(select(Document))
docs = result.scalars().all()
assert len(docs) == 1
assert docs[0].title == "Secret A"
Bonnes Pratiques¶
- Toujours vérifier le tenant_id dans les opérations critiques
- Ne jamais exposer les tenant_id dans les URLs publiques
- Utiliser des slugs pour les URLs tenant-specific
- Auditer les accès cross-tenant (logs d'audit)
- Tester l'isolation dans la CI/CD