📑 Table des matières

Automatiser un pipeline complet avec un agent

Agents IA 🔴 Avancé ⏱️ 16 min de lecture 📅 2026-02-24

Automatiser un pipeline complet avec un agent

Vous avez un agent IA qui répond à des questions. C'est bien. Mais le vrai pouvoir des agents, c'est de les mettre au travail sur des pipelines de production complets — des chaînes d'opérations qui transforment une idée brute en livrable final, automatiquement, sans intervention humaine.

Dans cet article, on construit ensemble un cas concret : un pipeline de production vidéo automatisé par un agent IA. De l'idée initiale jusqu'à l'upload sur YouTube, en passant par la génération du script, des images et du montage. On couvre l'architecture, la gestion d'erreurs, le retry, et l'orchestration avec cron + agent + base de données.

🎯 Le cas concret : pipeline vidéo automatisé

Voici le pipeline qu'on va construire :

┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│  IDÉE   │───▶│ SCRIPT  │───▶│ IMAGES  │───▶│ VIDÉO   │───▶│ UPLOAD  │
│         │    │         │    │         │    │         │    │         │
│ Sujet + │    │ Texte + │    │ Visuels │    │ Montage │    │ YouTube │
│ angle   │    │ narrat° │    │ par     │    │ auto    │    │ + meta  │
│         │    │         │    │ scène   │    │         │    │         │
└─────────┘    └─────────┘    └─────────┘    └─────────┘    └─────────┘

Chaque étape peut échouer. L'agent doit :

  • Orchestrer les outils dans le bon ordre
  • Détecter les erreurs et réessayer intelligemment
  • Persister l'état pour reprendre après un crash
  • Notifier en cas de problème bloquant

🏗️ Architecture globale

Les trois piliers

L'architecture repose sur trois composants qui collaborent :

┌───────────────────────────────────────────────┐
│                    CRON                         │
│  Déclenche le pipeline à intervalles réguliers  │
│  Ex: tous les jours à 8h                        │
└──────────────────────┬────────────────────────┘
                       │
                       ▼
┌───────────────────────────────────────────────┐
│                   AGENT IA                      │
│  Orchestre les outils, gère la logique          │
│  Prend les décisions, gère les erreurs          │
└──────────────────────┬────────────────────────┘
                       │
                       ▼
┌───────────────────────────────────────────────┐
│              BASE DE DONNÉES                    │
│  Stocke l'état de chaque tâche                  │
│  Historique, statuts, résultats intermédiaires   │
└───────────────────────────────────────────────┘

Pourquoi cette architecture ?

Composant Rôle Pourquoi c'est nécessaire
Cron Déclencheur L'agent ne tourne pas 24/7, le cron le réveille
Agent Cerveau Prend les décisions, s'adapte aux erreurs
BDD Mémoire Survit aux crashes, permet le suivi et le debug

Sans la BDD, un crash = tout recommencer. Sans le cron, il faut lancer manuellement. Sans l'agent, c'est un script rigide qui casse au moindre imprévu.

💾 Modèle de données

Commençons par la base de données. On utilise SQLite pour la simplicité, mais le concept s'applique à PostgreSQL ou tout autre SGBD.

-- Table principale des pipelines
CREATE TABLE pipelines (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    -- pending, running, completed, failed, paused
    current_step TEXT,
    config JSON,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    completed_at DATETIME,
    error_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3
);

-- Table des étapes individuelles
CREATE TABLE pipeline_steps (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pipeline_id INTEGER REFERENCES pipelines(id),
    step_name TEXT NOT NULL,
    step_order INTEGER NOT NULL,
    status TEXT DEFAULT 'pending',
    -- pending, running, completed, failed, skipped
    input_data JSON,
    output_data JSON,
    error_message TEXT,
    attempts INTEGER DEFAULT 0,
    max_attempts INTEGER DEFAULT 3,
    started_at DATETIME,
    completed_at DATETIME,
    duration_seconds REAL
);

-- Table des logs pour le debug
CREATE TABLE pipeline_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pipeline_id INTEGER REFERENCES pipelines(id),
    step_name TEXT,
    level TEXT DEFAULT 'info',
    -- debug, info, warning, error
    message TEXT,
    metadata JSON,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- Index pour les requêtes fréquentes
CREATE INDEX idx_pipelines_status ON pipelines(status);
CREATE INDEX idx_steps_pipeline ON pipeline_steps(pipeline_id, step_order);
CREATE INDEX idx_logs_pipeline ON pipeline_logs(pipeline_id, created_at);

Pourquoi persister chaque étape ?

Imaginez que la génération d'images fonctionne (étape 3) mais que le montage vidéo échoue (étape 4). Sans persistance, il faut tout refaire — y compris les images qui ont coûté des crédits API. Avec la BDD :

# L'agent vérifie l'état et reprend où il en était
pipeline = db.get_pipeline(pipeline_id)
steps = db.get_steps(pipeline_id)

for step in steps:
    if step.status == "completed":
        continue  # Déjà fait, on passe
    if step.status == "failed" and step.attempts >= step.max_attempts:
        notify_human(f"Étape {step.step_name} en échec définitif")
        break
    # Exécuter ou ré-exécuter cette étape
    execute_step(step)

🤖 L'agent orchestrateur

Structure du code

Voici l'architecture complète de l'agent orchestrateur :

import sqlite3
import json
import time
from datetime import datetime
from typing import Optional

class PipelineAgent:
    """Agent qui orchestre un pipeline de production vidéo."""

    def __init__(self, db_path: str, llm_client, tools: dict):
        self.db_path = db_path
        self.llm = llm_client
        self.tools = tools
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row

    def log(self, pipeline_id: int, step: str, level: str, message: str, metadata=None):
        """Enregistre un log dans la BDD."""
        self.conn.execute(
            "INSERT INTO pipeline_logs (pipeline_id, step_name, level, message, metadata) VALUES (?, ?, ?, ?, ?)",
            (pipeline_id, step, level, message, json.dumps(metadata) if metadata else None)
        )
        self.conn.commit()

    def create_pipeline(self, name: str, config: dict) -> int:
        """Crée un nouveau pipeline avec ses étapes."""
        cursor = self.conn.execute(
            "INSERT INTO pipelines (name, config) VALUES (?, ?)",
            (name, json.dumps(config))
        )
        pipeline_id = cursor.lastrowid

        steps = [
            ("generate_idea", 1),
            ("write_script", 2),
            ("generate_images", 3),
            ("create_video", 4),
            ("upload_publish", 5),
        ]

        for step_name, order in steps:
            self.conn.execute(
                "INSERT INTO pipeline_steps (pipeline_id, step_name, step_order) VALUES (?, ?, ?)",
                (pipeline_id, step_name, order)
            )

        self.conn.commit()
        self.log(pipeline_id, "init", "info", f"Pipeline '{name}' créé avec {len(steps)} étapes")
        return pipeline_id

    def run_pipeline(self, pipeline_id: int):
        """Exécute (ou reprend) un pipeline."""
        pipeline = self.conn.execute(
            "SELECT * FROM pipelines WHERE id = ?", (pipeline_id,)
        ).fetchone()

        if not pipeline:
            raise ValueError(f"Pipeline {pipeline_id} non trouvé")

        if pipeline["status"] == "completed":
            return {"status": "already_completed"}

        # Marquer comme en cours
        self.conn.execute(
            "UPDATE pipelines SET status='running', updated_at=CURRENT_TIMESTAMP WHERE id=?",
            (pipeline_id,)
        )
        self.conn.commit()

        steps = self.conn.execute(
            "SELECT * FROM pipeline_steps WHERE pipeline_id=? ORDER BY step_order",
            (pipeline_id,)
        ).fetchall()

        config = json.loads(pipeline["config"])
        context = {}  # Résultats accumulés

        for step in steps:
            if step["status"] == "completed":
                # Charger le résultat précédent dans le contexte
                if step["output_data"]:
                    context[step["step_name"]] = json.loads(step["output_data"])
                continue

            # Exécuter l'étape
            success = self.execute_step(pipeline_id, step, config, context)

            if not success:
                self.conn.execute(
                    "UPDATE pipelines SET status='failed', current_step=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
                    (step["step_name"], pipeline_id)
                )
                self.conn.commit()
                self.log(pipeline_id, step["step_name"], "error",
                        f"Pipeline arrêté: étape {step['step_name']} en échec définitif")
                return {"status": "failed", "failed_step": step["step_name"]}

        # Tout est terminé
        self.conn.execute(
            "UPDATE pipelines SET status='completed', completed_at=CURRENT_TIMESTAMP, updated_at=CURRENT_TIMESTAMP WHERE id=?",
            (pipeline_id,)
        )
        self.conn.commit()
        self.log(pipeline_id, "final", "info", "Pipeline terminé avec succès")
        return {"status": "completed", "context": context}

    def execute_step(self, pipeline_id: int, step, config: dict, context: dict) -> bool:
        """Exécute une étape avec retry et gestion d'erreurs."""
        step_name = step["step_name"]
        max_attempts = step["max_attempts"]
        current_attempts = step["attempts"]

        for attempt in range(current_attempts, max_attempts):
            self.log(pipeline_id, step_name, "info",
                    f"Tentative {attempt + 1}/{max_attempts}")

            # Marquer comme en cours
            self.conn.execute(
                """UPDATE pipeline_steps
                   SET status='running', attempts=?, started_at=CURRENT_TIMESTAMP
                   WHERE id=?""",
                (attempt + 1, step["id"])
            )
            self.conn.commit()

            start_time = time.time()

            try:
                # Appeler la fonction correspondante
                handler = getattr(self, f"step_{step_name}")
                result = handler(config, context)

                duration = time.time() - start_time

                # Succès !
                self.conn.execute(
                    """UPDATE pipeline_steps
                       SET status='completed', output_data=?,
                           completed_at=CURRENT_TIMESTAMP, duration_seconds=?
                       WHERE id=?""",
                    (json.dumps(result), duration, step["id"])
                )
                self.conn.commit()

                context[step_name] = result
                self.log(pipeline_id, step_name, "info",
                        f"Étape réussie en {duration:.1f}s")
                return True

            except Exception as e:
                duration = time.time() - start_time
                error_msg = str(e)

                self.conn.execute(
                    """UPDATE pipeline_steps
                       SET status='failed', error_message=?, duration_seconds=?
                       WHERE id=?""",
                    (error_msg, duration, step["id"])
                )
                self.conn.commit()

                self.log(pipeline_id, step_name, "error",
                        f"Échec tentative {attempt + 1}: {error_msg}")

                # Backoff exponentiel avant retry
                if attempt < max_attempts - 1:
                    wait_time = 2 ** attempt * 5  # 5s, 10s, 20s
                    self.log(pipeline_id, step_name, "info",
                            f"Attente de {wait_time}s avant retry")
                    time.sleep(wait_time)

        return False  # Toutes les tentatives épuisées

Les étapes du pipeline

Maintenant, implémentons chaque étape :

    # --- Étape 1 : Générer l'idée ---
    def step_generate_idea(self, config: dict, context: dict) -> dict:
        """Génère une idée de vidéo basée sur la thématique."""
        theme = config.get("theme", "intelligence artificielle")
        audience = config.get("audience", "développeurs francophones")

        prompt = f"""Génère une idée de vidéo YouTube sur le thème "{theme}"
pour une audience de {audience}.

Retourne un JSON avec:
- title: titre accrocheur (max 60 chars)
- hook: phrase d'accroche (max 150 chars)
- angle: angle unique/original
- key_points: liste de 5-7 points clés à couvrir
- estimated_duration: durée estimée en minutes"""

        response = self.llm.complete(prompt)
        idea = json.loads(response)

        # Validation
        if len(idea.get("title", "")) > 80:
            raise ValueError("Titre trop long")
        if len(idea.get("key_points", [])) < 3:
            raise ValueError("Pas assez de points clés")

        return idea

    # --- Étape 2 : Écrire le script ---
    def step_write_script(self, config: dict, context: dict) -> dict:
        """Écrit le script complet de la vidéo."""
        idea = context["generate_idea"]

        prompt = f"""Écris un script vidéo YouTube complet basé sur cette idée:

Titre: {idea['title']}
Hook: {idea['hook']}
Angle: {idea['angle']}
Points clés: {json.dumps(idea['key_points'])}
Durée cible: {idea['estimated_duration']} minutes

Le script doit inclure:
1. Introduction avec hook (15 secondes)
2. Développement structuré par scènes
3. Conclusion avec call-to-action

Pour chaque scène, indique:
- narration: le texte à dire
- visual: description de l'image/animation à montrer
- duration_seconds: durée estimée

Retourne un JSON avec: title, description, tags, scenes[]"""

        response = self.llm.complete(prompt)
        script = json.loads(response)

        # Validation
        total_duration = sum(s.get("duration_seconds", 0) for s in script["scenes"])
        if total_duration < 60:
            raise ValueError(f"Script trop court: {total_duration}s")

        return script

    # --- Étape 3 : Générer les images ---
    def step_generate_images(self, config: dict, context: dict) -> dict:
        """Génère les visuels pour chaque scène."""
        script = context["write_script"]
        images = []

        for i, scene in enumerate(script["scenes"]):
            visual_desc = scene.get("visual", "")

            # Utiliser l'outil de génération d'images
            prompt = f"""Crée un prompt de génération d''image pour cette scène:
Description: {visual_desc}
Style: {config.get('visual_style', 'moderne, flat design, couleurs vives')}

Le prompt doit être en anglais, détaillé, pour Stable Diffusion ou DALL-E."""

            image_prompt = self.llm.complete(prompt)

            # Appel à l'API de génération d'images
            image_result = self.tools["generate_image"](
                prompt=image_prompt,
                style=config.get("image_style", "digital-art"),
                size="1920x1080"
            )

            images.append({
                "scene_index": i,
                "prompt_used": image_prompt,
                "image_path": image_result["path"],
                "image_url": image_result.get("url")
            })

            self.log(0, "generate_images", "debug",
                    f"Image {i+1}/{len(script['scenes'])} générée")

        return {"images": images, "count": len(images)}

    # --- Étape 4 : Créer la vidéo ---
    def step_create_video(self, config: dict, context: dict) -> dict:
        """Assemble les images et l'audio en vidéo."""
        script = context["write_script"]
        images = context["generate_images"]["images"]

        # Générer la narration audio (TTS)
        narration_parts = []
        for i, scene in enumerate(script["scenes"]):
            audio_result = self.tools["text_to_speech"](
                text=scene["narration"],
                voice=config.get("voice", "fr-FR-DeniseNeural"),
                output_format="mp3"
            )
            narration_parts.append({
                "scene_index": i,
                "audio_path": audio_result["path"],
                "duration": audio_result["duration"]
            })

        # Assembler avec FFmpeg
        video_result = self.tools["assemble_video"](
            images=[img["image_path"] for img in images],
            audio_parts=[n["audio_path"] for n in narration_parts],
            durations=[n["duration"] for n in narration_parts],
            output_path=f"/tmp/video_{int(time.time())}.mp4",
            resolution="1920x1080",
            fps=30
        )

        return {
            "video_path": video_result["path"],
            "duration": video_result["total_duration"],
            "file_size_mb": video_result["file_size_mb"]
        }

    # --- Étape 5 : Upload et publication ---
    def step_upload_publish(self, config: dict, context: dict) -> dict:
        """Upload la vidéo sur YouTube."""
        script = context["write_script"]
        video = context["create_video"]

        upload_result = self.tools["youtube_upload"](
            video_path=video["video_path"],
            title=script["title"],
            description=script["description"],
            tags=script["tags"],
            category="28",  # Science & Technology
            privacy="private",  # D'abord en privé pour review
            thumbnail=context["generate_images"]["images"][0]["image_path"]
        )

        return {
            "video_id": upload_result["video_id"],
            "url": f"https://youtube.com/watch?v={upload_result['video_id']}",
            "status": "uploaded_private"
        }

⚡ Gestion intelligente des erreurs

Pour aller plus loin sur ce sujet, consultez notre guide Créer son premier agent IA autonome.

La gestion d'erreurs est ce qui différencie un pipeline amateur d'un pipeline de production. Voici les stratégies clés.

Pour aller plus loin sur ce sujet, consultez notre guide MCP, Function Calling, Tool Use : le guide complet.

Retry avec backoff exponentiel

import random

def retry_with_backoff(func, max_retries=3, base_delay=5):
    """Retry avec backoff exponentiel et jitter."""
    for attempt in range(max_retries):
        try:
            return func()
        except RateLimitError:
            # Backoff exponentiel + jitter aléatoire
            delay = base_delay * (2 ** attempt) + random.uniform(0, 2)
            print(f"Rate limit - attente {delay:.1f}s (tentative {attempt + 1})")
            time.sleep(delay)
        except (TimeoutError, ConnectionError) as e:
            # Erreurs réseau : retry rapide
            delay = base_delay * (attempt + 1)
            print(f"Erreur réseau: {e} - retry dans {delay}s")
            time.sleep(delay)
        except ValidationError as e:
            # Erreur de validation : pas de retry, c'est une erreur logique
            raise
    raise MaxRetriesExceeded(f"Échec après {max_retries} tentatives")

Catégories d'erreurs

Toutes les erreurs ne se traitent pas de la même façon :

Type d'erreur Action Retry ?
Rate limit (429) Backoff exponentiel ✅ Oui
Timeout réseau Retry rapide ✅ Oui
Erreur API (500) Retry avec délai ✅ Oui
Contenu invalide Re-générer avec un prompt ajusté ✅ Oui (avec adaptation)
Auth expirée (401) Rafraîchir le token ✅ Oui (1 fois)
Erreur de validation Corriger la logique ❌ Non
Quota dépassé Notifier l'humain ❌ Non
Fichier manquant Vérifier l'étape précédente ❌ Non (re-run étape précédente)

L'agent qui s'adapte

L'avantage d'utiliser un agent IA (plutôt qu'un script) pour l'orchestration, c'est qu'il peut raisonner sur les erreurs :

def handle_error_with_llm(self, step_name: str, error: str, context: dict) -> dict:
    """L'agent analyse l'erreur et décide de la stratégie."""
    prompt = f"""Une erreur s'est produite dans le pipeline vidéo.

Étape: {step_name}
Erreur: {error}
Contexte: {json.dumps(context, indent=2)}

Analyse l'erreur et recommande une action:
1. RETRY - Réessayer la même étape
2. RETRY_MODIFIED - Réessayer avec des paramètres modifiés (précise lesquels)
3. SKIP - Sauter cette étape (si non critique)
4. ROLLBACK - Refaire l'étape précédente
5. ABORT - Arrêter le pipeline et notifier un humain

Retourne un JSON: {{"action": "...", "reason": "...", "modifications": {{}}}}"""

    response = self.llm.complete(prompt)
    decision = json.loads(response)

    self.log(0, step_name, "info",
            f"Décision de l''agent: {decision['action']} - {decision['reason']}")

    return decision

Exemples concrets d'adaptation

Scénario 1 : Image générée de mauvaise qualité

Erreur: "Image quality score below threshold (0.3/1.0)"
→ Agent décide: RETRY_MODIFIED
→ Modification: reformuler le prompt en ajoutant "high quality, detailed, 4K"

Scénario 2 : API de TTS temporairement indisponible

Erreur: "503 Service Unavailable"
→ Agent décide: RETRY avec backoff
→ Si 3 échecs: tente un TTS alternatif (fallback)

Scénario 3 : Vidéo trop longue pour YouTube

Erreur: "Video exceeds 15 minute limit for unverified accounts"
→ Agent décide: RETRY_MODIFIED
→ Modification: couper la vidéo en 2 parties

⏰ Orchestration avec Cron

Configuration du cron

Le cron déclenche le pipeline selon un planning défini. Avec OpenClaw, c'est natif :

# Dans la configuration OpenClaw
cron:
  video_pipeline:
    schedule: "0 8 * * 1-5"  # Lundi-Vendredi à 8h
    task: "Lance le pipeline vidéo du jour"
    enabled: true

Pour un cron système classique :

# crontab -e
# Pipeline vidéo tous les jours à 8h
0 8 * * * /usr/bin/python3 /opt/pipeline/run.py >> /var/log/pipeline.log 2>&1

Le script de lancement

#!/usr/bin/env python3
"""Script lancé par cron pour exécuter le pipeline."""

import sys
import sqlite3
from datetime import datetime, timedelta
from pipeline_agent import PipelineAgent

DB_PATH = "/opt/pipeline/database.db"

def main():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row

    # 1. Vérifier s'il y a un pipeline en échec à reprendre
    failed = conn.execute(
        "SELECT id FROM pipelines WHERE status='failed' AND error_count < max_retries ORDER BY updated_at LIMIT 1"
    ).fetchone()

    if failed:
        print(f"Reprise du pipeline #{failed['id']} en échec")
        agent = PipelineAgent(DB_PATH, llm, tools)
        result = agent.run_pipeline(failed["id"])
        print(f"Résultat: {result['status']}")
        return

    # 2. Vérifier qu'on n'a pas déjà un pipeline aujourd'hui
    today = datetime.now().strftime("%Y-%m-%d")
    existing = conn.execute(
        "SELECT id FROM pipelines WHERE date(created_at)=? AND status IN ('running','completed')",
        (today,)
    ).fetchone()

    if existing:
        print(f"Pipeline du jour déjà existant (#{existing['id']})")
        return

    # 3. Créer et lancer un nouveau pipeline
    agent = PipelineAgent(DB_PATH, llm, tools)

    # Configuration du jour
    config = get_daily_config()

    pipeline_id = agent.create_pipeline(
        name=f"video_{today}",
        config=config
    )

    print(f"Pipeline #{pipeline_id} créé, lancement...")
    result = agent.run_pipeline(pipeline_id)
    print(f"Résultat: {result['status']}")

    # Notification
    if result["status"] == "completed":
        notify_success(pipeline_id, result)
    elif result["status"] == "failed":
        notify_failure(pipeline_id, result)

def get_daily_config():
    """Retourne la configuration du pipeline du jour."""
    # On peut varier les thèmes, les styles, etc.
    themes = [
        {"theme": "nouveautés IA", "audience": "tech enthusiasts"},
        {"theme": "tutoriel Python", "audience": "développeurs débutants"},
        {"theme": "outils productivité IA", "audience": "entrepreneurs"},
        {"theme": "actualité tech", "audience": "grand public tech"},
        {"theme": "deep dive architecture", "audience": "développeurs seniors"},
    ]
    day_of_week = datetime.now().weekday()
    return themes[day_of_week % len(themes)]

if __name__ == "__main__":
    main()

📊 Monitoring et dashboard

Un pipeline de production a besoin de visibilité. Voici les métriques essentielles à suivre.

Requêtes de monitoring

-- Taux de succès sur les 30 derniers jours
SELECT
    COUNT(CASE WHEN status = 'completed' THEN 1 END) as success,
    COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed,
    COUNT(*) as total,
    ROUND(100.0 * COUNT(CASE WHEN status = 'completed' THEN 1 END) / COUNT(*), 1) as success_rate
FROM pipelines
WHERE created_at >= datetime('now', '-30 days');

-- Durée moyenne par étape
SELECT
    step_name,
    ROUND(AVG(duration_seconds), 1) as avg_duration,
    ROUND(MIN(duration_seconds), 1) as min_duration,
    ROUND(MAX(duration_seconds), 1) as max_duration,
    COUNT(CASE WHEN status = 'failed' THEN 1 END) as failures
FROM pipeline_steps
WHERE completed_at IS NOT NULL
GROUP BY step_name
ORDER BY step_order;

-- Pipelines qui ont nécessité des retries
SELECT
    p.id,
    p.name,
    p.status,
    SUM(ps.attempts - 1) as total_retries,
    GROUP_CONCAT(
        CASE WHEN ps.attempts > 1
        THEN ps.step_name || '(' || ps.attempts || ')'
        END
    ) as retried_steps
FROM pipelines p
JOIN pipeline_steps ps ON p.id = ps.pipeline_id
WHERE ps.attempts > 1
GROUP BY p.id
ORDER BY p.created_at DESC
LIMIT 20;

Dashboard simple en Python

def print_dashboard(db_path: str):
    """Affiche un dashboard texte du pipeline."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row

    # Stats globales
    stats = conn.execute("""
        SELECT
            COUNT(*) as total,
            COUNT(CASE WHEN status='completed' THEN 1 END) as completed,
            COUNT(CASE WHEN status='failed' THEN 1 END) as failed,
            COUNT(CASE WHEN status='running' THEN 1 END) as running
        FROM pipelines
        WHERE created_at >= datetime('now', '-7 days')
    """).fetchone()

    print("=" * 50)
    print("📊 PIPELINE DASHBOARD (7 derniers jours)")
    print("=" * 50)
    print(f"  Total:    {stats['total']}")
    print(f"  ✅ OK:     {stats['completed']}")
    print(f"  ❌ Échec:  {stats['failed']}")
    print(f"  🔄 En cours: {stats['running']}")
    if stats['total'] > 0:
        rate = 100 * stats['completed'] / stats['total']
        print(f"  📈 Taux:   {rate:.0f}%")
    print()

    # Derniers pipelines
    recent = conn.execute("""
        SELECT id, name, status, created_at,
               ROUND((julianday(COALESCE(completed_at, datetime('now')))
                     - julianday(created_at)) * 86400) as duration_s
        FROM pipelines
        ORDER BY created_at DESC
        LIMIT 5
    """).fetchall()

    print("📋 Derniers pipelines:")
    for p in recent:
        icon = {"completed": "✅", "failed": "❌", "running": "🔄", "pending": "⏳"}.get(p["status"], "❓")
        duration = f"{int(p['duration_s'] or 0)}s"
        print(f"  {icon} #{p['id']} {p['name']} ({duration})")

    conn.close()

🔒 Bonnes pratiques de production

1. Idempotence

Chaque étape doit être idempotente : l'exécuter deux fois produit le même résultat. C'est crucial pour le retry.

def step_generate_images(self, config, context):
    """Idempotent : vérifie si les images existent déjà."""
    output_dir = f"/data/pipeline_{context['pipeline_id']}/images"
    os.makedirs(output_dir, exist_ok=True)

    expected_count = len(context["write_script"]["scenes"])
    existing = glob.glob(f"{output_dir}/*.png")

    if len(existing) == expected_count:
        # Déjà générées, on retourne les chemins existants
        return {"images": sorted(existing), "count": expected_count}

    # Sinon, générer les manquantes
    ...

2. Timeouts sur tout

import signal

class TimeoutError(Exception):
    pass

def with_timeout(func, timeout_seconds=300):
    """Exécute une fonction avec un timeout."""
    def handler(signum, frame):
        raise TimeoutError(f"Timeout après {timeout_seconds}s")

    signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout_seconds)
    try:
        return func()
    finally:
        signal.alarm(0)

# Utilisation
result = with_timeout(
    lambda: generate_image(prompt),
    timeout_seconds=120
)

3. Nettoyage des fichiers temporaires

def cleanup_pipeline(pipeline_id: int, keep_final=True):
    """Nettoie les fichiers temporaires après un pipeline terminé."""
    work_dir = f"/data/pipeline_{pipeline_id}"

    if keep_final:
        # Garder uniquement la vidéo finale
        final_video = glob.glob(f"{work_dir}/final_*.mp4")
        temp_files = glob.glob(f"{work_dir}/tmp_*")
        for f in temp_files:
            os.remove(f)
    else:
        shutil.rmtree(work_dir, ignore_errors=True)

4. Notifications intelligentes

def notify_on_status(pipeline_id: int, status: str, details: dict):
    """Notifie selon le statut — pas de spam."""
    if status == "completed":
        # Notification simple de succès
        send_notification(
            title="✅ Vidéo prête",
            body=f"Pipeline #{pipeline_id} terminé. URL: {details.get('url')}",
            priority="normal"
        )
    elif status == "failed":
        # Notification urgente avec détails
        send_notification(
            title=f"❌ Pipeline #{pipeline_id} en échec",
            body=f"Étape: {details['failed_step']}\nErreur: {details['error'][:200]}",
            priority="high"
        )
    # Pas de notification pour "running" — trop de bruit

🚀 Aller plus loin

Parallélisation des étapes indépendantes

Si certaines étapes sont indépendantes, vous pouvez les paralléliser :

import asyncio

async def run_parallel_steps(self, steps, config, context):
    """Exécute des étapes indépendantes en parallèle."""
    tasks = [
        asyncio.create_task(self.async_execute_step(step, config, context))
        for step in steps
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    for step, result in zip(steps, results):
        if isinstance(result, Exception):
            raise result
        context[step["step_name"]] = result

Pipeline conditionnel

L'agent peut décider de modifier le pipeline en cours de route :

def step_quality_check(self, config, context):
    """Vérifie la qualité et décide s'il faut des étapes supplémentaires."""
    video = context["create_video"]

    quality = self.tools["analyze_video"](video["video_path"])

    if quality["audio_score"] < 0.7:
        # Ajouter une étape de nettoyage audio
        self.add_dynamic_step("clean_audio", after="create_video")

    if quality["visual_score"] < 0.7:
        # Re-générer les images de mauvaise qualité
        bad_scenes = [s for s in quality["scenes"] if s["score"] < 0.5]
        return {"needs_regen": True, "bad_scenes": bad_scenes}

    return {"quality": "ok", "scores": quality}

Intégration avec OpenClaw

OpenClaw est parfaitement adapté pour orchestrer ce type de pipeline grâce à ses cron jobs natifs et son accès aux outils via MCP :

# Configuration OpenClaw pour le pipeline
cron:
  daily_video:
    schedule: "0 8 * * 1-5"
    task: |
      Vérifie la table pipelines dans la BDD.
      S'il y a un pipeline en échec, reprends-le.
      Sinon, crée un nouveau pipeline vidéo du jour
      et exécute-le étape par étape.
      Notifie-moi du résultat.

L'avantage d'OpenClaw : l'agent a accès à tous les outils (shell, web, BDD, API) nativement, et le cron est géré par la plateforme. Pas besoin de scripts de lancement complexes.

📚 Articles liés