Aller au contenu

Multi-tenancy & Row-Level Security

Isolation des données entre tenants dans JARVIS.

Architecture Multi-tenant

graph TB
    subgraph "Tenant A"
        UA[Users A]
        DA[Data A]
    end

    subgraph "Tenant B"
        UB[Users B]
        DB[Data B]
    end

    subgraph "Shared Infrastructure"
        API[FastAPI]
        PG[(PostgreSQL)]
        R[(Redis)]
    end

    UA --> API
    UB --> API
    API --> PG
    PG --> DA
    PG --> DB

Row-Level Security (RLS)

Activation sur PostgreSQL

-- Activer RLS sur les tables
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
ALTER TABLE conversations ENABLE ROW LEVEL SECURITY;
ALTER TABLE messages ENABLE ROW LEVEL SECURITY;
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- Forcer RLS même pour le owner
ALTER TABLE users FORCE ROW LEVEL SECURITY;

Policies RLS

-- Policy pour les utilisateurs
CREATE POLICY tenant_isolation_users ON users
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

-- Policy pour les conversations
CREATE POLICY tenant_isolation_conversations ON conversations
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

-- Policy pour les messages
CREATE POLICY tenant_isolation_messages ON messages
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

-- Policy pour les documents
CREATE POLICY tenant_isolation_documents ON documents
    USING (tenant_id = current_setting('app.current_tenant_id')::uuid);

Injection du Contexte Tenant

Middleware FastAPI

from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession

class TenantMiddleware:
    async def __call__(self, request: Request, call_next):
        # Extraire tenant_id du JWT
        user = request.state.user
        tenant_id = user.tenant_id

        # Injecter dans le contexte de la requête
        request.state.tenant_id = tenant_id

        response = await call_next(request)
        return response

Session Database avec Contexte

from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession

@asynccontextmanager
async def get_tenant_session(tenant_id: str) -> AsyncSession:
    """Obtenir une session DB avec le contexte tenant."""
    async with async_session() as session:
        # Définir le tenant_id pour RLS
        await session.execute(
            text(f"SET app.current_tenant_id = '{tenant_id}'")
        )
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Utilisation dans les Routes

@router.get("/conversations")
async def list_conversations(
    request: Request,
    db: AsyncSession = Depends(get_db)
):
    tenant_id = request.state.tenant_id

    # RLS filtre automatiquement par tenant_id
    result = await db.execute(
        select(Conversation).order_by(Conversation.created_at.desc())
    )
    return result.scalars().all()

Schéma des Tables

Table Tenants

CREATE TABLE tenants (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(100) UNIQUE NOT NULL,
    plan VARCHAR(50) DEFAULT 'free',
    settings JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

Tables avec tenant_id

-- Exemple: table users
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL REFERENCES tenants(id),
    email VARCHAR(255) NOT NULL,
    -- ... autres colonnes
    UNIQUE(tenant_id, email)  -- Email unique par tenant
);

-- Index pour performance
CREATE INDEX idx_users_tenant_id ON users(tenant_id);

Isolation des Ressources

MinIO (Stockage)

def get_bucket_name(tenant_id: str) -> str:
    """Un bucket par tenant pour isolation complète."""
    return f"tenant-{tenant_id}"

async def upload_document(tenant_id: str, file: UploadFile):
    bucket = get_bucket_name(tenant_id)
    # S'assurer que le bucket existe
    await minio.ensure_bucket(bucket)
    # Upload dans le bucket du tenant
    await minio.upload(bucket, file.filename, file.file)

Redis (Cache)

def get_cache_key(tenant_id: str, key: str) -> str:
    """Préfixer les clés Redis par tenant."""
    return f"tenant:{tenant_id}:{key}"

async def get_cached(tenant_id: str, key: str):
    full_key = get_cache_key(tenant_id, key)
    return await redis.get(full_key)

Tests d'Isolation

@pytest.mark.asyncio
async def test_tenant_isolation():
    """Vérifier que les données sont isolées entre tenants."""

    # Créer 2 tenants
    tenant_a = await create_tenant("Tenant A")
    tenant_b = await create_tenant("Tenant B")

    # Créer un document pour tenant A
    async with get_tenant_session(tenant_a.id) as session:
        doc = Document(title="Secret A", tenant_id=tenant_a.id)
        session.add(doc)
        await session.commit()

    # Vérifier que tenant B ne peut pas voir le document
    async with get_tenant_session(tenant_b.id) as session:
        result = await session.execute(select(Document))
        docs = result.scalars().all()
        assert len(docs) == 0  # Aucun document visible

    # Vérifier que tenant A voit son document
    async with get_tenant_session(tenant_a.id) as session:
        result = await session.execute(select(Document))
        docs = result.scalars().all()
        assert len(docs) == 1
        assert docs[0].title == "Secret A"

Bonnes Pratiques

  1. Toujours vérifier le tenant_id dans les opérations critiques
  2. Ne jamais exposer les tenant_id dans les URLs publiques
  3. Utiliser des slugs pour les URLs tenant-specific
  4. Auditer les accès cross-tenant (logs d'audit)
  5. Tester l'isolation dans la CI/CD