Automating a Complete Pipeline with an Agent

AI Agents 🔴 Advanced ⏱️ 16 min read 📅 2026-02-24

You have an AI agent that answers questions. That's good. But the real power of agents lies in putting them to work on complete production pipelines — chains of operations that transform a raw idea into a final deliverable, automatically, without human intervention.

In this article, we build a concrete case together: a video production pipeline automated by an AI agent. From the initial idea to uploading on YouTube, including script generation, image creation, and video editing. We cover the architecture, error handling, retries, and orchestration with cron + agent + database.

🎯 The Concrete Case: Automated Video Pipeline

Here's the pipeline we're going to build:

┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│  IDEA   │───▶│ SCRIPT  │───▶│ IMAGES  │───▶│ VIDEO   │───▶│ UPLOAD  │
│         │    │         │    │         │    │         │    │         │
│ Topic + │    │ Text +  │    │ Visuals │    │  Auto   │    │ YouTube │
│ angle   │    │narration│    │ per     │    │ editing │    │ + meta  │
│         │    │         │    │ scene   │    │         │    │         │
└─────────┘    └─────────┘    └─────────┘    └─────────┘    └─────────┘

Each step can fail. The agent must:

  • Orchestrate the tools in the right order
  • Detect errors and retry intelligently
  • Persist the state to resume after a crash
  • Notify in case of a blocking problem

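The "retry intelligently" requirement is a pattern worth seeing in isolation before the full agent. Here is a minimal sketch; `with_retries` and its parameters are illustrative helpers, not a specific library's API:

```python
import time

def with_retries(fn, max_attempts=3, base_delay=5):
    """Call fn(), retrying with exponential backoff (5s, 10s, 20s by default)."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # attempts exhausted: let the caller notify a human
            time.sleep(base_delay * 2 ** attempt)
```

A call like `with_retries(lambda: upload(video))` then absorbs transient API failures; in tests, pass `base_delay=0` to skip the waits.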
🏗️ Global Architecture

The Three Pillars

The architecture relies on three collaborating components:

┌───────────────────────────────────────────────┐
│                     CRON                      │
│  Triggers the pipeline at regular intervals   │
│  e.g. every day at 8am                        │
└──────────────────────┬────────────────────────┘
                       │
                       ▼
┌───────────────────────────────────────────────┐
│                   AI AGENT                    │
│  Orchestrates tools, manages logic            │
│  Makes decisions, handles errors              │
└──────────────────────┬────────────────────────┘
                       │
                       ▼
┌───────────────────────────────────────────────┐
│                   DATABASE                    │
│  Stores the state of each task                │
│  History, statuses, intermediate results      │
└───────────────────────────────────────────────┘

Why This Architecture?

| Component | Role    | Why It's Necessary                               |
|-----------|---------|--------------------------------------------------|
| Cron      | Trigger | The agent doesn't run 24/7; cron wakes it up     |
| Agent     | Brain   | Makes decisions, adapts to errors                |
| DB        | Memory  | Survives crashes, enables tracking and debugging |

Without the DB, a crash = starting over. Without cron, manual launch is required. Without the agent, it's a rigid script that breaks at the slightest unexpected event.
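On the cron side, a single crontab entry is enough to wake the agent. The script path and log location below are hypothetical placeholders; adapt them to your setup:

```
# m h dom mon dow  command — run the pipeline runner every day at 8am
0 8 * * * /usr/bin/python3 /opt/pipeline/run_agent.py >> /var/log/pipeline.log 2>&1
```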

💾 Data Model

Let's start with the database. We use SQLite for simplicity, but the concept applies to PostgreSQL or any other DBMS.

```sql
-- Main pipelines table
CREATE TABLE pipelines (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    -- pending, running, completed, failed, paused
    current_step TEXT,
    config JSON,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    completed_at DATETIME,
    error_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3
);

-- Individual steps table
CREATE TABLE pipeline_steps (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pipeline_id INTEGER REFERENCES pipelines(id),
    step_name TEXT NOT NULL,
    step_order INTEGER NOT NULL,
    status TEXT DEFAULT 'pending',
    -- pending, running, completed, failed, skipped
    input_data JSON,
    output_data JSON,
    error_message TEXT,
    attempts INTEGER DEFAULT 0,
    max_attempts INTEGER DEFAULT 3,
    started_at DATETIME,
    completed_at DATETIME,
    duration_seconds REAL
);

-- Logs table for debugging
CREATE TABLE pipeline_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pipeline_id INTEGER REFERENCES pipelines(id),
    step_name TEXT,
    level TEXT DEFAULT 'info',
    -- debug, info, warning, error
    message TEXT,
    metadata JSON,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- Indexes for frequent queries
CREATE INDEX idx_pipelines_status ON pipelines(status);
CREATE INDEX idx_steps_pipeline ON pipeline_steps(pipeline_id, step_order);
CREATE INDEX idx_logs_pipeline ON pipeline_logs(pipeline_id, created_at);
```
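Before wiring up the agent, the schema's defaults can be sanity-checked with Python's built-in `sqlite3` module (shortened here to the `pipelines` table only; the pipeline name and config are made up for the example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript("""
CREATE TABLE pipelines (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    current_step TEXT,
    config JSON,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    error_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3
);
""")
conn.execute("INSERT INTO pipelines (name, config) VALUES (?, ?)",
             ("daily-video", '{"topic": "ai-agents"}'))
row = conn.execute("SELECT * FROM pipelines").fetchone()
print(row["status"], row["error_count"])  # defaults filled in by SQLite
```

Note that SQLite has no native JSON column type; `config JSON` is accepted through type affinity and simply stores the serialized text, which is why the agent code below calls `json.dumps`/`json.loads` explicitly.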

Why Persist Each Step?

Imagine image generation works (step 3) but video editing fails (step 4). Without persistence, you have to redo everything — including images that cost API credits. With the DB:

```python
# The agent checks the state and resumes where it left off
pipeline = db.get_pipeline(pipeline_id)
steps = db.get_steps(pipeline_id)

for step in steps:
    if step.status == "completed":
        continue  # Already done, skip
    if step.status == "failed" and step.attempts >= step.max_attempts:
        notify_human(f"Step {step.step_name} failed definitively")
        break
    # Execute or re-execute this step
    execute_step(step)
```

🤖 The Orchestrating Agent

Code Structure

Here's the complete architecture of the orchestrating agent:

```python
import sqlite3
import json
import time
from datetime import datetime
from typing import Optional

class PipelineAgent:
    """Agent that orchestrates a video production pipeline."""

    def __init__(self, db_path: str, llm_client, tools: dict):
        self.db_path = db_path
        self.llm = llm_client
        self.tools = tools
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row

    def log(self, pipeline_id: int, step: str, level: str, message: str, metadata=None):
        """Records a log in the DB."""
        self.conn.execute(
            "INSERT INTO pipeline_logs (pipeline_id, step_name, level, message, metadata) VALUES (?, ?, ?, ?, ?)",
            (pipeline_id, step, level, message, json.dumps(metadata) if metadata else None)
        )
        self.conn.commit()

    def create_pipeline(self, name: str, config: dict) -> int:
        """Creates a new pipeline with its steps."""
        cursor = self.conn.execute(
            "INSERT INTO pipelines (name, config) VALUES (?, ?)",
            (name, json.dumps(config))
        )
        pipeline_id = cursor.lastrowid

        steps = [
            ("generate_idea", 1),
            ("write_script", 2),
            ("generate_images", 3),
            ("create_video", 4),
            ("upload_publish", 5),
        ]

        for step_name, order in steps:
            self.conn.execute(
                "INSERT INTO pipeline_steps (pipeline_id, step_name, step_order) VALUES (?, ?, ?)",
                (pipeline_id, step_name, order)
            )

        self.conn.commit()
        self.log(pipeline_id, "init", "info", f"Pipeline '{name}' created with {len(steps)} steps")
        return pipeline_id

    def run_pipeline(self, pipeline_id: int):
        """Executes (or resumes) a pipeline."""
        pipeline = self.conn.execute(
            "SELECT * FROM pipelines WHERE id = ?", (pipeline_id,)
        ).fetchone()

        if not pipeline:
            raise ValueError(f"Pipeline {pipeline_id} not found")

        if pipeline["status"] == "completed":
            return {"status": "already_completed"}

        # Mark as running
        self.conn.execute(
            "UPDATE pipelines SET status='running', updated_at=CURRENT_TIMESTAMP WHERE id=?",
            (pipeline_id,)
        )
        self.conn.commit()

        steps = self.conn.execute(
            "SELECT * FROM pipeline_steps WHERE pipeline_id=? ORDER BY step_order",
            (pipeline_id,)
        ).fetchall()

        config = json.loads(pipeline["config"])
        context = {}  # Accumulated results

        for step in steps:
            if step["status"] == "completed":
                # Load previous result into context
                if step["output_data"]:
                    context[step["step_name"]] = json.loads(step["output_data"])
                continue

            # Execute step
            success = self.execute_step(pipeline_id, step, config, context)

            if not success:
                self.conn.execute(
                    "UPDATE pipelines SET status='failed', current_step=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
                    (step["step_name"], pipeline_id)
                )
                self.conn.commit()
                self.log(pipeline_id, step["step_name"], "error",
                         f"Pipeline stopped: step {step['step_name']} failed definitively")
                return {"status": "failed", "failed_step": step["step_name"]}

        # Everything is done
        self.conn.execute(
            "UPDATE pipelines SET status='completed', completed_at=CURRENT_TIMESTAMP, updated_at=CURRENT_TIMESTAMP WHERE id=?",
            (pipeline_id,)
        )
        self.conn.commit()
        self.log(pipeline_id, "final", "info", "Pipeline completed successfully")
        return {"status": "completed", "context": context}

    def execute_step(self, pipeline_id: int, step, config: dict, context: dict) -> bool:
        """Executes a step with retry and error handling."""
        step_name = step["step_name"]
        max_attempts = step["max_attempts"]
        current_attempts = step["attempts"]

        for attempt in range(current_attempts, max_attempts):
            self.log(pipeline_id, step_name, "info",
                     f"Attempt {attempt + 1}/{max_attempts}")

            # Mark as running
            self.conn.execute(
                """UPDATE pipeline_steps
                   SET status='running', attempts=?, started_at=CURRENT_TIMESTAMP
                   WHERE id=?""",
                (attempt + 1, step["id"])
            )
            self.conn.commit()

            start_time = time.time()

            try:
                # Call corresponding function
                handler = getattr(self, f"step_{step_name}")
                result = handler(config, context)

                duration = time.time() - start_time

                # Success!
                self.conn.execute(
                    """UPDATE pipeline_steps
                       SET status='completed', output_data=?,
                           completed_at=CURRENT_TIMESTAMP, duration_seconds=?
                       WHERE id=?""",
                    (json.dumps(result), duration, step["id"])
                )
                self.conn.commit()

                context[step_name] = result
                self.log(pipeline_id, step_name, "info",
                         f"Step succeeded in {duration:.1f}s")
                return True

            except Exception as e:
                duration = time.time() - start_time
                error_msg = str(e)

                self.conn.execute(
                    """UPDATE pipeline_steps
                       SET status='failed', error_message=?, duration_seconds=?
                       WHERE id=?""",
                    (error_msg, duration, step["id"])
                )
                self.conn.commit()

                self.log(pipeline_id, step_name, "error",
                         f"Attempt {attempt + 1} failed: {error_msg}")

                # Exponential backoff before retry
                if attempt < max_attempts - 1:
                    wait_time = 2 ** attempt * 5  # 5s, 10s, 20s
                    self.log(pipeline_id, step_name, "info",
                             f"Waiting {wait_time}s before retry")