Automating a Complete Pipeline with an Agent
You have an AI agent that answers questions. That's good. But the real power of agents lies in putting them to work on complete production pipelines — chains of operations that transform a raw idea into a final deliverable, automatically, without human intervention.
In this article, we build a concrete case together: a video production pipeline automated by an AI agent. From the initial idea to uploading on YouTube, including script generation, image creation, and video editing. We cover the architecture, error handling, retries, and orchestration with cron + agent + database.
🎯 The Concrete Case: Automated Video Pipeline
Here's the pipeline we're going to build:
```
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│   IDEA   │──▶│  SCRIPT  │──▶│  IMAGES  │──▶│  VIDEO   │──▶│  UPLOAD  │
│          │   │          │   │          │   │          │   │          │
│ Topic +  │   │ Text +   │   │ Visuals  │   │ Automated│   │ YouTube  │
│ angle    │   │ narration│   │ per scene│   │ editing  │   │ + meta   │
└──────────┘   └──────────┘   └──────────┘   └──────────┘   └──────────┘
```
Each step can fail. The agent must:
- Orchestrate the tools in the right order
- Detect errors and retry intelligently
- Persist the state to resume after a crash
- Notify in case of a blocking problem
🏗️ Global Architecture
The Three Pillars
The architecture relies on three collaborating components:
```
┌───────────────────────────────────────────────┐
│                     CRON                      │
│  Triggers the pipeline at regular intervals   │
│  e.g. every day at 8 a.m.                     │
└──────────────────────┬────────────────────────┘
                       │
                       ▼
┌───────────────────────────────────────────────┐
│                   AI AGENT                    │
│  Orchestrates tools, manages the logic        │
│  Makes decisions, handles errors              │
└──────────────────────┬────────────────────────┘
                       │
                       ▼
┌───────────────────────────────────────────────┐
│                   DATABASE                    │
│  Stores the state of each task                │
│  History, statuses, intermediate results      │
└───────────────────────────────────────────────┘
```
Why This Architecture?
| Component | Role | Why It's Necessary |
|---|---|---|
| Cron | Trigger | The agent doesn't run 24/7; cron wakes it up |
| Agent | Brain | Makes decisions, adapts to errors |
| DB | Memory | Survives crashes, enables tracking and debugging |
Without the DB, a crash means starting over from scratch. Without cron, someone has to launch the pipeline by hand. Without the agent, you have a rigid script that breaks at the first unexpected event.
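The cron side is a one-liner. As a sketch (the paths, script name, and log file here are assumptions; adapt them to your setup), a crontab entry that wakes the agent every day at 8 a.m. and appends stdout/stderr to a log file:

```
# Edit the current user's crontab with `crontab -e` and add:
# ┌ minute  ┌ hour  ┌ day-of-month  ┌ month  ┌ day-of-week
0 8 * * * /usr/bin/python3 /opt/video-pipeline/run_agent.py >> /var/log/video-pipeline.log 2>&1
```

Redirecting both streams matters: cron has no terminal, so without the `>> … 2>&1` the script's output is silently mailed or lost, and you lose your first debugging clue when a run fails.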
💾 Data Model
Let's start with the database. We use SQLite for simplicity, but the concept applies to PostgreSQL or any other DBMS.
```sql
-- Main pipelines table
CREATE TABLE pipelines (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    -- pending, running, completed, failed, paused
    current_step TEXT,
    config JSON,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    completed_at DATETIME,
    error_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3
);

-- Individual steps table
CREATE TABLE pipeline_steps (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pipeline_id INTEGER REFERENCES pipelines(id),
    step_name TEXT NOT NULL,
    step_order INTEGER NOT NULL,
    status TEXT DEFAULT 'pending',
    -- pending, running, completed, failed, skipped
    input_data JSON,
    output_data JSON,
    error_message TEXT,
    attempts INTEGER DEFAULT 0,
    max_attempts INTEGER DEFAULT 3,
    started_at DATETIME,
    completed_at DATETIME,
    duration_seconds REAL
);

-- Logs table for debugging
CREATE TABLE pipeline_logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    pipeline_id INTEGER REFERENCES pipelines(id),
    step_name TEXT,
    level TEXT DEFAULT 'info',
    -- debug, info, warning, error
    message TEXT,
    metadata JSON,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- Indexes for frequent queries
CREATE INDEX idx_pipelines_status ON pipelines(status);
CREATE INDEX idx_steps_pipeline ON pipeline_steps(pipeline_id, step_order);
CREATE INDEX idx_logs_pipeline ON pipeline_logs(pipeline_id, created_at);
```
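Applying the schema from Python is straightforward with `sqlite3.executescript`, which runs multiple statements in one call. A minimal sketch (the schema string here is abbreviated to the `pipelines` table; `IF NOT EXISTS` makes the call idempotent, so the agent can run it at every startup):

```python
import sqlite3

# Abbreviated schema: only a subset of the columns shown in the article
SCHEMA = """
CREATE TABLE IF NOT EXISTS pipelines (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    status TEXT DEFAULT 'pending',
    error_count INTEGER DEFAULT 0,
    max_retries INTEGER DEFAULT 3
);
CREATE INDEX IF NOT EXISTS idx_pipelines_status ON pipelines(status);
"""

def init_db(db_path: str) -> sqlite3.Connection:
    """Open the database and make sure the schema exists."""
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row  # access columns by name, like the agent does
    conn.executescript(SCHEMA)      # runs all statements in the script
    return conn

conn = init_db(":memory:")
tables = [r["name"] for r in conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")]
print(tables)
```

Note that SQLite has no native `JSON` column type; the `JSON` declarations above are stored with `TEXT` affinity, which is exactly why the agent code serializes with `json.dumps` before writing.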
Why Persist Each Step?
Imagine image generation works (step 3) but video editing fails (step 4). Without persistence, you have to redo everything — including images that cost API credits. With the DB:
```python
# The agent checks the state and resumes where it left off
pipeline = db.get_pipeline(pipeline_id)
steps = db.get_steps(pipeline_id)

for step in steps:
    if step.status == "completed":
        continue  # Already done, skip
    if step.status == "failed" and step.attempts >= step.max_attempts:
        notify_human(f"Step {step.step_name} failed permanently")
        break
    # Execute or re-execute this step
    execute_step(step)
```
🤖 The Orchestrating Agent
Code Structure
Here's the complete architecture of the orchestrating agent:
```python
import sqlite3
import json
import time
from datetime import datetime
from typing import Optional
class PipelineAgent:
"""Agent that orchestrates a video production pipeline."""
def __init__(self, db_path: str, llm_client, tools: dict):
self.db_path = db_path
self.llm = llm_client
self.tools = tools
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row
def log(self, pipeline_id: int, step: str, level: str, message: str, metadata=None):
"""Records a log in the DB."""
self.conn.execute(
"INSERT INTO pipeline_logs (pipeline_id, step_name, level, message, metadata) VALUES (?, ?, ?, ?, ?)",
(pipeline_id, step, level, message, json.dumps(metadata) if metadata else None)
)
self.conn.commit()
def create_pipeline(self, name: str, config: dict) -> int:
"""Creates a new pipeline with its steps."""
cursor = self.conn.execute(
"INSERT INTO pipelines (name, config) VALUES (?, ?)",
(name, json.dumps(config))
)
pipeline_id = cursor.lastrowid
steps = [
("generate_idea", 1),
("write_script", 2),
("generate_images", 3),
("create_video", 4),
("upload_publish", 5),
]
for step_name, order in steps:
self.conn.execute(
"INSERT INTO pipeline_steps (pipeline_id, step_name, step_order) VALUES (?, ?, ?)",
(pipeline_id, step_name, order)
)
self.conn.commit()
self.log(pipeline_id, "init", "info", f"Pipeline '{name}' created with {len(steps)} steps")
return pipeline_id
def run_pipeline(self, pipeline_id: int):
"""Executes (or resumes) a pipeline."""
pipeline = self.conn.execute(
"SELECT * FROM pipelines WHERE id = ?", (pipeline_id,)
).fetchone()
if not pipeline:
raise ValueError(f"Pipeline {pipeline_id} not found")
if pipeline["status"] == "completed":
return {"status": "already_completed"}
# Mark as running
self.conn.execute(
"UPDATE pipelines SET status='running', updated_at=CURRENT_TIMESTAMP WHERE id=?",
(pipeline_id,)
)
self.conn.commit()
steps = self.conn.execute(
"SELECT * FROM pipeline_steps WHERE pipeline_id=? ORDER BY step_order",
(pipeline_id,)
).fetchall()
config = json.loads(pipeline["config"])
context = {} # Accumulated results
for step in steps:
if step["status"] == "completed":
# Load previous result into context
if step["output_data"]:
context[step["step_name"]] = json.loads(step["output_data"])
continue
# Execute step
success = self.execute_step(pipeline_id, step, config, context)
if not success:
self.conn.execute(
"UPDATE pipelines SET status='failed', current_step=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
(step["step_name"], pipeline_id)
)
self.conn.commit()
self.log(pipeline_id, step["step_name"], "error",
         f"Pipeline stopped: step {step['step_name']} failed permanently")
return {"status": "failed", "failed_step": step["step_name"]}
# Everything is done
self.conn.execute(
"UPDATE pipelines SET status='completed', completed_at=CURRENT_TIMESTAMP, updated_at=CURRENT_TIMESTAMP WHERE id=?",
(pipeline_id,)
)
self.conn.commit()
self.log(pipeline_id, "final", "info", "Pipeline completed successfully")
return {"status": "completed", "context": context}
def execute_step(self, pipeline_id: int, step, config: dict, context: dict) -> bool:
"""Executes a step with retry and error handling."""
step_name = step["step_name"]
max_attempts = step["max_attempts"]
current_attempts = step["attempts"]
for attempt in range(current_attempts, max_attempts):
self.log(pipeline_id, step_name, "info",
f"Attempt {attempt + 1}/{max_attempts}")
# Mark as running
self.conn.execute(
"""UPDATE pipeline_steps
SET status='running', attempts=?, started_at=CURRENT_TIMESTAMP
WHERE id=?""",
(attempt + 1, step["id"])
)
self.conn.commit()
start_time = time.time()
try:
# Call corresponding function
handler = getattr(self, f"step_{step_name}")
result = handler(config, context)
duration = time.time() - start_time
# Success!
self.conn.execute(
"""UPDATE pipeline_steps
SET status='completed', output_data=?,
completed_at=CURRENT_TIMESTAMP, duration_seconds=?
WHERE id=?""",
(json.dumps(result), duration, step["id"])
)
self.conn.commit()
context[step_name] = result
self.log(pipeline_id, step_name, "info",
f"Step succeeded in {duration:.1f}s")
return True
except Exception as e:
duration = time.time() - start_time
error_msg = str(e)
self.conn.execute(
"""UPDATE pipeline_steps
SET status='failed', error_message=?, duration_seconds=?
WHERE id=?""",
(error_msg, duration, step["id"])
)
self.conn.commit()
self.log(pipeline_id, step_name, "error",
f"Attempt {attempt + 1} failed: {error_msg}")
# Exponential backoff before retry
if attempt < max_attempts - 1:
wait_time = 2 ** attempt * 5 # 5s, 10s, 20s
self.log(pipeline_id, step_name, "info",
f"Waiting {wait_time}s before retry")