Automatiser un pipeline complet avec un agent
Vous avez un agent IA qui répond à des questions. C'est bien. Mais le vrai pouvoir des agents, c'est de les mettre au travail sur des pipelines de production complets — des chaînes d'opérations qui transforment une idée brute en livrable final, automatiquement, sans intervention humaine.
Dans cet article, on construit ensemble un cas concret : un pipeline de production vidéo automatisé par un agent IA. De l'idée initiale jusqu'à l'upload sur YouTube, en passant par la génération du script, des images et du montage. On couvre l'architecture, la gestion d'erreurs, le retry, et l'orchestration avec cron + agent + base de données.
🎯 Le cas concret : pipeline vidéo automatisé
Voici le pipeline qu'on va construire :
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ IDÉE │───▶│ SCRIPT │───▶│ IMAGES │───▶│ VIDÉO │───▶│ UPLOAD │
│ │ │ │ │ │ │ │ │ │
│ Sujet + │ │ Texte + │ │ Visuels │ │ Montage │ │ YouTube │
│ angle │ │ narrat° │ │ par │ │ auto │ │ + meta │
│ │ │ │ │ scène │ │ │ │ │
└─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘
Chaque étape peut échouer. L'agent doit :
- Orchestrer les outils dans le bon ordre
- Détecter les erreurs et réessayer intelligemment
- Persister l'état pour reprendre après un crash
- Notifier en cas de problème bloquant
🏗️ Architecture globale
Les trois piliers
L'architecture repose sur trois composants qui collaborent :
┌───────────────────────────────────────────────┐
│ CRON │
│ Déclenche le pipeline à intervalles réguliers │
│ Ex: tous les jours à 8h │
└──────────────────────┬────────────────────────┘
│
▼
┌───────────────────────────────────────────────┐
│ AGENT IA │
│ Orchestre les outils, gère la logique │
│ Prend les décisions, gère les erreurs │
└──────────────────────┬────────────────────────┘
│
▼
┌───────────────────────────────────────────────┐
│ BASE DE DONNÉES │
│ Stocke l'état de chaque tâche │
│ Historique, statuts, résultats intermédiaires │
└───────────────────────────────────────────────┘
Pourquoi cette architecture ?
| Composant | Rôle | Pourquoi c'est nécessaire |
|---|---|---|
| Cron | Déclencheur | L'agent ne tourne pas 24/7, le cron le réveille |
| Agent | Cerveau | Prend les décisions, s'adapte aux erreurs |
| BDD | Mémoire | Survit aux crashes, permet le suivi et le debug |
Sans la BDD, un crash = tout recommencer. Sans le cron, il faut lancer manuellement. Sans l'agent, c'est un script rigide qui casse au moindre imprévu.
💾 Modèle de données
Commençons par la base de données. On utilise SQLite pour la simplicité, mais le concept s'applique à PostgreSQL ou tout autre SGBD.
-- Table principale des pipelines
CREATE TABLE pipelines (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
status TEXT DEFAULT 'pending',
-- pending, running, completed, failed, paused
current_step TEXT,
config JSON,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
completed_at DATETIME,
error_count INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3
);
-- Table des étapes individuelles
CREATE TABLE pipeline_steps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id INTEGER REFERENCES pipelines(id),
step_name TEXT NOT NULL,
step_order INTEGER NOT NULL,
status TEXT DEFAULT 'pending',
-- pending, running, completed, failed, skipped
input_data JSON,
output_data JSON,
error_message TEXT,
attempts INTEGER DEFAULT 0,
max_attempts INTEGER DEFAULT 3,
started_at DATETIME,
completed_at DATETIME,
duration_seconds REAL
);
-- Table des logs pour le debug
CREATE TABLE pipeline_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id INTEGER REFERENCES pipelines(id),
step_name TEXT,
level TEXT DEFAULT 'info',
-- debug, info, warning, error
message TEXT,
metadata JSON,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- Index pour les requêtes fréquentes
CREATE INDEX idx_pipelines_status ON pipelines(status);
CREATE INDEX idx_steps_pipeline ON pipeline_steps(pipeline_id, step_order);
CREATE INDEX idx_logs_pipeline ON pipeline_logs(pipeline_id, created_at);
Pourquoi persister chaque étape ?
Imaginez que la génération d'images fonctionne (étape 3) mais que le montage vidéo échoue (étape 4). Sans persistance, il faut tout refaire — y compris les images qui ont coûté des crédits API. Avec la BDD :
# L'agent vérifie l'état et reprend où il en était
pipeline = db.get_pipeline(pipeline_id)
steps = db.get_steps(pipeline_id)
for step in steps:
if step.status == "completed":
continue # Déjà fait, on passe
if step.status == "failed" and step.attempts >= step.max_attempts:
notify_human(f"Étape {step.step_name} en échec définitif")
break
# Exécuter ou ré-exécuter cette étape
execute_step(step)
🤖 L'agent orchestrateur
Structure du code
Voici l'architecture complète de l'agent orchestrateur :
import sqlite3
import json
import time
from datetime import datetime
from typing import Optional
class PipelineAgent:
"""Agent qui orchestre un pipeline de production vidéo."""
def __init__(self, db_path: str, llm_client, tools: dict):
self.db_path = db_path
self.llm = llm_client
self.tools = tools
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
def log(self, pipeline_id: int, step: str, level: str, message: str, metadata=None):
"""Enregistre un log dans la BDD."""
self.conn.execute(
"INSERT INTO pipeline_logs (pipeline_id, step_name, level, message, metadata) VALUES (?, ?, ?, ?, ?)",
(pipeline_id, step, level, message, json.dumps(metadata) if metadata else None)
)
self.conn.commit()
def create_pipeline(self, name: str, config: dict) -> int:
"""Crée un nouveau pipeline avec ses étapes."""
cursor = self.conn.execute(
"INSERT INTO pipelines (name, config) VALUES (?, ?)",
(name, json.dumps(config))
)
pipeline_id = cursor.lastrowid
steps = [
("generate_idea", 1),
("write_script", 2),
("generate_images", 3),
("create_video", 4),
("upload_publish", 5),
]
for step_name, order in steps:
self.conn.execute(
"INSERT INTO pipeline_steps (pipeline_id, step_name, step_order) VALUES (?, ?, ?)",
(pipeline_id, step_name, order)
)
self.conn.commit()
self.log(pipeline_id, "init", "info", f"Pipeline '{name}' créé avec {len(steps)} étapes")
return pipeline_id
def run_pipeline(self, pipeline_id: int):
"""Exécute (ou reprend) un pipeline."""
pipeline = self.conn.execute(
"SELECT * FROM pipelines WHERE id = ?", (pipeline_id,)
).fetchone()
if not pipeline:
raise ValueError(f"Pipeline {pipeline_id} non trouvé")
if pipeline["status"] == "completed":
return {"status": "already_completed"}
# Marquer comme en cours
self.conn.execute(
"UPDATE pipelines SET status='running', updated_at=CURRENT_TIMESTAMP WHERE id=?",
(pipeline_id,)
)
self.conn.commit()
steps = self.conn.execute(
"SELECT * FROM pipeline_steps WHERE pipeline_id=? ORDER BY step_order",
(pipeline_id,)
).fetchall()
config = json.loads(pipeline["config"])
context = {} # Résultats accumulés
for step in steps:
if step["status"] == "completed":
# Charger le résultat précédent dans le contexte
if step["output_data"]:
context[step["step_name"]] = json.loads(step["output_data"])
continue
# Exécuter l'étape
success = self.execute_step(pipeline_id, step, config, context)
if not success:
self.conn.execute(
"UPDATE pipelines SET status='failed', current_step=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
(step["step_name"], pipeline_id)
)
self.conn.commit()
self.log(pipeline_id, step["step_name"], "error",
f"Pipeline arrêté: étape {step['step_name']} en échec définitif")
return {"status": "failed", "failed_step": step["step_name"]}
# Tout est terminé
self.conn.execute(
"UPDATE pipelines SET status='completed', completed_at=CURRENT_TIMESTAMP, updated_at=CURRENT_TIMESTAMP WHERE id=?",
(pipeline_id,)
)
self.conn.commit()
self.log(pipeline_id, "final", "info", "Pipeline terminé avec succès")
return {"status": "completed", "context": context}
def execute_step(self, pipeline_id: int, step, config: dict, context: dict) -> bool:
"""Exécute une étape avec retry et gestion d'erreurs."""
step_name = step["step_name"]
max_attempts = step["max_attempts"]
current_attempts = step["attempts"]
for attempt in range(current_attempts, max_attempts):
self.log(pipeline_id, step_name, "info",
f"Tentative {attempt + 1}/{max_attempts}")
# Marquer comme en cours
self.conn.execute(
"""UPDATE pipeline_steps
SET status='running', attempts=?, started_at=CURRENT_TIMESTAMP
WHERE id=?""",
(attempt + 1, step["id"])
)
self.conn.commit()
start_time = time.time()
try:
# Appeler la fonction correspondante
handler = getattr(self, f"step_{step_name}")
result = handler(config, context)
duration = time.time() - start_time
# Succès !
self.conn.execute(
"""UPDATE pipeline_steps
SET status='completed', output_data=?,
completed_at=CURRENT_TIMESTAMP, duration_seconds=?
WHERE id=?""",
(json.dumps(result), duration, step["id"])
)
self.conn.commit()
context[step_name] = result
self.log(pipeline_id, step_name, "info",
f"Étape réussie en {duration:.1f}s")
return True
except Exception as e:
duration = time.time() - start_time
error_msg = str(e)
self.conn.execute(
"""UPDATE pipeline_steps
SET status='failed', error_message=?, duration_seconds=?
WHERE id=?""",
(error_msg, duration, step["id"])
)
self.conn.commit()
self.log(pipeline_id, step_name, "error",
f"Échec tentative {attempt + 1}: {error_msg}")
# Backoff exponentiel avant retry
if attempt < max_attempts - 1:
wait_time = 2 ** attempt * 5 # 5s, 10s, 20s
self.log(pipeline_id, step_name, "info",
f"Attente de {wait_time}s avant retry")
time.sleep(wait_time)
return False # Toutes les tentatives épuisées
Les étapes du pipeline
Maintenant, implémentons chaque étape :
# --- Étape 1 : Générer l'idée ---
def step_generate_idea(self, config: dict, context: dict) -> dict:
"""Génère une idée de vidéo basée sur la thématique."""
theme = config.get("theme", "intelligence artificielle")
audience = config.get("audience", "développeurs francophones")
prompt = f"""Génère une idée de vidéo YouTube sur le thème "{theme}"
pour une audience de {audience}.
Retourne un JSON avec:
- title: titre accrocheur (max 60 chars)
- hook: phrase d'accroche (max 150 chars)
- angle: angle unique/original
- key_points: liste de 5-7 points clés à couvrir
- estimated_duration: durée estimée en minutes"""
response = self.llm.complete(prompt)
idea = json.loads(response)
# Validation
if len(idea.get("title", "")) > 80:
raise ValueError("Titre trop long")
if len(idea.get("key_points", [])) < 3:
raise ValueError("Pas assez de points clés")
return idea
# --- Étape 2 : Écrire le script ---
def step_write_script(self, config: dict, context: dict) -> dict:
"""Écrit le script complet de la vidéo."""
idea = context["generate_idea"]
prompt = f"""Écris un script vidéo YouTube complet basé sur cette idée:
Titre: {idea['title']}
Hook: {idea['hook']}
Angle: {idea['angle']}
Points clés: {json.dumps(idea['key_points'])}
Durée cible: {idea['estimated_duration']} minutes
Le script doit inclure:
1. Introduction avec hook (15 secondes)
2. Développement structuré par scènes
3. Conclusion avec call-to-action
Pour chaque scène, indique:
- narration: le texte à dire
- visual: description de l'image/animation à montrer
- duration_seconds: durée estimée
Retourne un JSON avec: title, description, tags, scenes[]"""
response = self.llm.complete(prompt)
script = json.loads(response)
# Validation
total_duration = sum(s.get("duration_seconds", 0) for s in script["scenes"])
if total_duration < 60:
raise ValueError(f"Script trop court: {total_duration}s")
return script
# --- Étape 3 : Générer les images ---
def step_generate_images(self, config: dict, context: dict) -> dict:
"""Génère les visuels pour chaque scène."""
script = context["write_script"]
images = []
for i, scene in enumerate(script["scenes"]):
visual_desc = scene.get("visual", "")
# Utiliser l'outil de génération d'images
prompt = f"""Crée un prompt de génération d''image pour cette scène:
Description: {visual_desc}
Style: {config.get('visual_style', 'moderne, flat design, couleurs vives')}
Le prompt doit être en anglais, détaillé, pour Stable Diffusion ou DALL-E."""
image_prompt = self.llm.complete(prompt)
# Appel à l'API de génération d'images
image_result = self.tools["generate_image"](
prompt=image_prompt,
style=config.get("image_style", "digital-art"),
size="1920x1080"
)
images.append({
"scene_index": i,
"prompt_used": image_prompt,
"image_path": image_result["path"],
"image_url": image_result.get("url")
})
self.log(0, "generate_images", "debug",
f"Image {i+1}/{len(script['scenes'])} générée")
return {"images": images, "count": len(images)}
# --- Étape 4 : Créer la vidéo ---
def step_create_video(self, config: dict, context: dict) -> dict:
"""Assemble les images et l'audio en vidéo."""
script = context["write_script"]
images = context["generate_images"]["images"]
# Générer la narration audio (TTS)
narration_parts = []
for i, scene in enumerate(script["scenes"]):
audio_result = self.tools["text_to_speech"](
text=scene["narration"],
voice=config.get("voice", "fr-FR-DeniseNeural"),
output_format="mp3"
)
narration_parts.append({
"scene_index": i,
"audio_path": audio_result["path"],
"duration": audio_result["duration"]
})
# Assembler avec FFmpeg
video_result = self.tools["assemble_video"](
images=[img["image_path"] for img in images],
audio_parts=[n["audio_path"] for n in narration_parts],
durations=[n["duration"] for n in narration_parts],
output_path=f"/tmp/video_{int(time.time())}.mp4",
resolution="1920x1080",
fps=30
)
return {
"video_path": video_result["path"],
"duration": video_result["total_duration"],
"file_size_mb": video_result["file_size_mb"]
}
# --- Étape 5 : Upload et publication ---
def step_upload_publish(self, config: dict, context: dict) -> dict:
"""Upload la vidéo sur YouTube."""
script = context["write_script"]
video = context["create_video"]
upload_result = self.tools["youtube_upload"](
video_path=video["video_path"],
title=script["title"],
description=script["description"],
tags=script["tags"],
category="28", # Science & Technology
privacy="private", # D'abord en privé pour review
thumbnail=context["generate_images"]["images"][0]["image_path"]
)
return {
"video_id": upload_result["video_id"],
"url": f"https://youtube.com/watch?v={upload_result['video_id']}",
"status": "uploaded_private"
}
⚡ Gestion intelligente des erreurs
Pour aller plus loin sur ce sujet, consultez notre guide Créer son premier agent IA autonome.
La gestion d'erreurs est ce qui différencie un pipeline amateur d'un pipeline de production. Voici les stratégies clés.
Pour aller plus loin sur ce sujet, consultez notre guide MCP, Function Calling, Tool Use : le guide complet.
Retry avec backoff exponentiel
import random
def retry_with_backoff(func, max_retries=3, base_delay=5):
"""Retry avec backoff exponentiel et jitter."""
for attempt in range(max_retries):
try:
return func()
except RateLimitError:
# Backoff exponentiel + jitter aléatoire
delay = base_delay * (2 ** attempt) + random.uniform(0, 2)
print(f"Rate limit - attente {delay:.1f}s (tentative {attempt + 1})")
time.sleep(delay)
except (TimeoutError, ConnectionError) as e:
# Erreurs réseau : retry rapide
delay = base_delay * (attempt + 1)
print(f"Erreur réseau: {e} - retry dans {delay}s")
time.sleep(delay)
except ValidationError as e:
# Erreur de validation : pas de retry, c'est une erreur logique
raise
raise MaxRetriesExceeded(f"Échec après {max_retries} tentatives")
Catégories d'erreurs
Toutes les erreurs ne se traitent pas de la même façon :
| Type d'erreur | Action | Retry ? |
|---|---|---|
| Rate limit (429) | Backoff exponentiel | ✅ Oui |
| Timeout réseau | Retry rapide | ✅ Oui |
| Erreur API (500) | Retry avec délai | ✅ Oui |
| Contenu invalide | Re-générer avec un prompt ajusté | ✅ Oui (avec adaptation) |
| Auth expirée (401) | Rafraîchir le token | ✅ Oui (1 fois) |
| Erreur de validation | Corriger la logique | ❌ Non |
| Quota dépassé | Notifier l'humain | ❌ Non |
| Fichier manquant | Vérifier l'étape précédente | ❌ Non (re-run étape précédente) |
L'agent qui s'adapte
L'avantage d'utiliser un agent IA (plutôt qu'un script) pour l'orchestration, c'est qu'il peut raisonner sur les erreurs :
def handle_error_with_llm(self, step_name: str, error: str, context: dict) -> dict:
"""L'agent analyse l'erreur et décide de la stratégie."""
prompt = f"""Une erreur s'est produite dans le pipeline vidéo.
Étape: {step_name}
Erreur: {error}
Contexte: {json.dumps(context, indent=2)}
Analyse l'erreur et recommande une action:
1. RETRY - Réessayer la même étape
2. RETRY_MODIFIED - Réessayer avec des paramètres modifiés (précise lesquels)
3. SKIP - Sauter cette étape (si non critique)
4. ROLLBACK - Refaire l'étape précédente
5. ABORT - Arrêter le pipeline et notifier un humain
Retourne un JSON: {{"action": "...", "reason": "...", "modifications": {{}}}}"""
response = self.llm.complete(prompt)
decision = json.loads(response)
self.log(0, step_name, "info",
f"Décision de l''agent: {decision['action']} - {decision['reason']}")
return decision
Exemples concrets d'adaptation
Scénario 1 : Image générée de mauvaise qualité
Erreur: "Image quality score below threshold (0.3/1.0)"
→ Agent décide: RETRY_MODIFIED
→ Modification: reformuler le prompt en ajoutant "high quality, detailed, 4K"
Scénario 2 : API de TTS temporairement indisponible
Erreur: "503 Service Unavailable"
→ Agent décide: RETRY avec backoff
→ Si 3 échecs: tente un TTS alternatif (fallback)
Scénario 3 : Vidéo trop longue pour YouTube
Erreur: "Video exceeds 15 minute limit for unverified accounts"
→ Agent décide: RETRY_MODIFIED
→ Modification: couper la vidéo en 2 parties
⏰ Orchestration avec Cron
Configuration du cron
Le cron déclenche le pipeline selon un planning défini. Avec OpenClaw, c'est natif :
# Dans la configuration OpenClaw
cron:
video_pipeline:
schedule: "0 8 * * 1-5" # Lundi-Vendredi à 8h
task: "Lance le pipeline vidéo du jour"
enabled: true
Pour un cron système classique :
# crontab -e
# Pipeline vidéo tous les jours à 8h
0 8 * * * /usr/bin/python3 /opt/pipeline/run.py >> /var/log/pipeline.log 2>&1
Le script de lancement
#!/usr/bin/env python3
"""Script lancé par cron pour exécuter le pipeline."""
import sys
import sqlite3
from datetime import datetime, timedelta
from pipeline_agent import PipelineAgent
DB_PATH = "/opt/pipeline/database.db"
def main():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
# 1. Vérifier s'il y a un pipeline en échec à reprendre
failed = conn.execute(
"SELECT id FROM pipelines WHERE status='failed' AND error_count < max_retries ORDER BY updated_at LIMIT 1"
).fetchone()
if failed:
print(f"Reprise du pipeline #{failed['id']} en échec")
agent = PipelineAgent(DB_PATH, llm, tools)
result = agent.run_pipeline(failed["id"])
print(f"Résultat: {result['status']}")
return
# 2. Vérifier qu'on n'a pas déjà un pipeline aujourd'hui
today = datetime.now().strftime("%Y-%m-%d")
existing = conn.execute(
"SELECT id FROM pipelines WHERE date(created_at)=? AND status IN ('running','completed')",
(today,)
).fetchone()
if existing:
print(f"Pipeline du jour déjà existant (#{existing['id']})")
return
# 3. Créer et lancer un nouveau pipeline
agent = PipelineAgent(DB_PATH, llm, tools)
# Configuration du jour
config = get_daily_config()
pipeline_id = agent.create_pipeline(
name=f"video_{today}",
config=config
)
print(f"Pipeline #{pipeline_id} créé, lancement...")
result = agent.run_pipeline(pipeline_id)
print(f"Résultat: {result['status']}")
# Notification
if result["status"] == "completed":
notify_success(pipeline_id, result)
elif result["status"] == "failed":
notify_failure(pipeline_id, result)
def get_daily_config():
"""Retourne la configuration du pipeline du jour."""
# On peut varier les thèmes, les styles, etc.
themes = [
{"theme": "nouveautés IA", "audience": "tech enthusiasts"},
{"theme": "tutoriel Python", "audience": "développeurs débutants"},
{"theme": "outils productivité IA", "audience": "entrepreneurs"},
{"theme": "actualité tech", "audience": "grand public tech"},
{"theme": "deep dive architecture", "audience": "développeurs seniors"},
]
day_of_week = datetime.now().weekday()
return themes[day_of_week % len(themes)]
if __name__ == "__main__":
main()
📊 Monitoring et dashboard
Un pipeline de production a besoin de visibilité. Voici les métriques essentielles à suivre.
Requêtes de monitoring
-- Taux de succès sur les 30 derniers jours
SELECT
COUNT(CASE WHEN status = 'completed' THEN 1 END) as success,
COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed,
COUNT(*) as total,
ROUND(100.0 * COUNT(CASE WHEN status = 'completed' THEN 1 END) / COUNT(*), 1) as success_rate
FROM pipelines
WHERE created_at >= datetime('now', '-30 days');
-- Durée moyenne par étape
SELECT
step_name,
ROUND(AVG(duration_seconds), 1) as avg_duration,
ROUND(MIN(duration_seconds), 1) as min_duration,
ROUND(MAX(duration_seconds), 1) as max_duration,
COUNT(CASE WHEN status = 'failed' THEN 1 END) as failures
FROM pipeline_steps
WHERE completed_at IS NOT NULL
GROUP BY step_name
ORDER BY step_order;
-- Pipelines qui ont nécessité des retries
SELECT
p.id,
p.name,
p.status,
SUM(ps.attempts - 1) as total_retries,
GROUP_CONCAT(
CASE WHEN ps.attempts > 1
THEN ps.step_name || '(' || ps.attempts || ')'
END
) as retried_steps
FROM pipelines p
JOIN pipeline_steps ps ON p.id = ps.pipeline_id
WHERE ps.attempts > 1
GROUP BY p.id
ORDER BY p.created_at DESC
LIMIT 20;
Dashboard simple en Python
def print_dashboard(db_path: str):
"""Affiche un dashboard texte du pipeline."""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
# Stats globales
stats = conn.execute("""
SELECT
COUNT(*) as total,
COUNT(CASE WHEN status='completed' THEN 1 END) as completed,
COUNT(CASE WHEN status='failed' THEN 1 END) as failed,
COUNT(CASE WHEN status='running' THEN 1 END) as running
FROM pipelines
WHERE created_at >= datetime('now', '-7 days')
""").fetchone()
print("=" * 50)
print("📊 PIPELINE DASHBOARD (7 derniers jours)")
print("=" * 50)
print(f" Total: {stats['total']}")
print(f" ✅ OK: {stats['completed']}")
print(f" ❌ Échec: {stats['failed']}")
print(f" 🔄 En cours: {stats['running']}")
if stats['total'] > 0:
rate = 100 * stats['completed'] / stats['total']
print(f" 📈 Taux: {rate:.0f}%")
print()
# Derniers pipelines
recent = conn.execute("""
SELECT id, name, status, created_at,
ROUND((julianday(COALESCE(completed_at, datetime('now')))
- julianday(created_at)) * 86400) as duration_s
FROM pipelines
ORDER BY created_at DESC
LIMIT 5
""").fetchall()
print("📋 Derniers pipelines:")
for p in recent:
icon = {"completed": "✅", "failed": "❌", "running": "🔄", "pending": "⏳"}.get(p["status"], "❓")
duration = f"{int(p['duration_s'] or 0)}s"
print(f" {icon} #{p['id']} {p['name']} ({duration})")
conn.close()
🔒 Bonnes pratiques de production
1. Idempotence
Chaque étape doit être idempotente : l'exécuter deux fois produit le même résultat. C'est crucial pour le retry.
def step_generate_images(self, config, context):
"""Idempotent : vérifie si les images existent déjà."""
output_dir = f"/data/pipeline_{context['pipeline_id']}/images"
os.makedirs(output_dir, exist_ok=True)
expected_count = len(context["write_script"]["scenes"])
existing = glob.glob(f"{output_dir}/*.png")
if len(existing) == expected_count:
# Déjà générées, on retourne les chemins existants
return {"images": sorted(existing), "count": expected_count}
# Sinon, générer les manquantes
...
2. Timeouts sur tout
import signal
class TimeoutError(Exception):
pass
def with_timeout(func, timeout_seconds=300):
"""Exécute une fonction avec un timeout."""
def handler(signum, frame):
raise TimeoutError(f"Timeout après {timeout_seconds}s")
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_seconds)
try:
return func()
finally:
signal.alarm(0)
# Utilisation
result = with_timeout(
lambda: generate_image(prompt),
timeout_seconds=120
)
3. Nettoyage des fichiers temporaires
def cleanup_pipeline(pipeline_id: int, keep_final=True):
"""Nettoie les fichiers temporaires après un pipeline terminé."""
work_dir = f"/data/pipeline_{pipeline_id}"
if keep_final:
# Garder uniquement la vidéo finale
final_video = glob.glob(f"{work_dir}/final_*.mp4")
temp_files = glob.glob(f"{work_dir}/tmp_*")
for f in temp_files:
os.remove(f)
else:
shutil.rmtree(work_dir, ignore_errors=True)
4. Notifications intelligentes
def notify_on_status(pipeline_id: int, status: str, details: dict):
"""Notifie selon le statut — pas de spam."""
if status == "completed":
# Notification simple de succès
send_notification(
title="✅ Vidéo prête",
body=f"Pipeline #{pipeline_id} terminé. URL: {details.get('url')}",
priority="normal"
)
elif status == "failed":
# Notification urgente avec détails
send_notification(
title=f"❌ Pipeline #{pipeline_id} en échec",
body=f"Étape: {details['failed_step']}\nErreur: {details['error'][:200]}",
priority="high"
)
# Pas de notification pour "running" — trop de bruit
🚀 Aller plus loin
Parallélisation des étapes indépendantes
Si certaines étapes sont indépendantes, vous pouvez les paralléliser :
import asyncio
async def run_parallel_steps(self, steps, config, context):
"""Exécute des étapes indépendantes en parallèle."""
tasks = [
asyncio.create_task(self.async_execute_step(step, config, context))
for step in steps
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for step, result in zip(steps, results):
if isinstance(result, Exception):
raise result
context[step["step_name"]] = result
Pipeline conditionnel
L'agent peut décider de modifier le pipeline en cours de route :
def step_quality_check(self, config, context):
"""Vérifie la qualité et décide s'il faut des étapes supplémentaires."""
video = context["create_video"]
quality = self.tools["analyze_video"](video["video_path"])
if quality["audio_score"] < 0.7:
# Ajouter une étape de nettoyage audio
self.add_dynamic_step("clean_audio", after="create_video")
if quality["visual_score"] < 0.7:
# Re-générer les images de mauvaise qualité
bad_scenes = [s for s in quality["scenes"] if s["score"] < 0.5]
return {"needs_regen": True, "bad_scenes": bad_scenes}
return {"quality": "ok", "scores": quality}
Intégration avec OpenClaw
OpenClaw est parfaitement adapté pour orchestrer ce type de pipeline grâce à ses cron jobs natifs et son accès aux outils via MCP :
# Configuration OpenClaw pour le pipeline
cron:
daily_video:
schedule: "0 8 * * 1-5"
task: |
Vérifie la table pipelines dans la BDD.
S'il y a un pipeline en échec, reprends-le.
Sinon, crée un nouveau pipeline vidéo du jour
et exécute-le étape par étape.
Notifie-moi du résultat.
L'avantage d'OpenClaw : l'agent a accès à tous les outils (shell, web, BDD, API) nativement, et le cron est géré par la plateforme. Pas besoin de scripts de lancement complexes.
📚 Articles liés
- Les 5 patterns d'agents IA qui marchent — Les patterns d'architecture utilisés dans ce pipeline
- MCP, Function Calling, Tool Use : le guide complet — Comment l'agent appelle ses outils
- Automatiser sa vie avec OpenClaw — D'autres exemples d'automatisation avec un agent
- Configurer OpenClaw : SOUL, AGENTS et Skills — Configurer l'agent et ses cron jobs
- Installer OpenClaw sur un VPS — Mettre en place l'infrastructure pour votre pipeline
- Sécuriser son installation OpenClaw — Sécuriser un pipeline qui tourne en production