Orchestration Model
Ключевое решение
Агенты НЕ вызывают друг друга напрямую. Вся коммуникация — через таблицу tasks в PostgreSQL.
Почему:
- Отказоустойчивость — упавший агент не ломает цепочку, задача остаётся в очереди
- Retry — повторный запуск без дублирования
- Наблюдаемость — каждая задача имеет статус, время, стоимость
- Масштабирование — добавляем воркеров без изменения логики
- Backpressure — очередь растёт, но система не падёт
SQL-схема задач
CREATE TABLE tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
parent_id UUID REFERENCES tasks(id),
root_id UUID,
from_agent TEXT,
to_agent TEXT NOT NULL,
type TEXT NOT NULL,
hypothesis_id UUID,
payload JSONB NOT NULL,
success_criteria JSONB,
kill_criteria JSONB,
budget_tokens INT DEFAULT 50000,
budget_seconds INT DEFAULT 300,
budget_usd NUMERIC(10,4) DEFAULT 1.0,
status TEXT NOT NULL DEFAULT 'pending',
priority INT DEFAULT 5,
created_at TIMESTAMPTZ DEFAULT now(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
result JSONB,
error TEXT
);
CREATE INDEX idx_tasks_status_agent ON tasks(status, to_agent, priority);
CREATE INDEX idx_tasks_root ON tasks(root_id);Жизненный цикл задачи
pending → running → judging → done | failed | escalated
| Статус | Описание |
|---|---|
pending | В очереди, ожидает воркера |
running | Воркер взял, агент выполняет |
judging | Выполнено, Agent-Judge оценивает |
done | Завершено успешно |
failed | Ошибка или kill criteria |
escalated | Передано на уровень выше или human |
Детали: Process-TaskLifecycle.
Worker Loop
Псевдокод обработки очереди:
while True:
# Атомарно взять задачу — SKIP LOCKED ключевой
task = db.execute("""
UPDATE tasks
SET status = 'running', started_at = now()
WHERE id = (
SELECT id FROM tasks
WHERE status = 'pending'
AND to_agent = ANY(%s)
ORDER BY priority, created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *
""", [worker.supported_agents])
if not task:
sleep(1)
continue
# Pre-execution policy check
violation = policy.validate_input(task)
if violation:
task.status = 'escalated'
task.error = violation
continue
# Run agent
result = agent_runner.run(task)
# Post-execution policy check
violation = policy.validate_output(task, result)
if violation:
task.status = 'escalated'
task.error = violation
continue
task.result = result
task.status = 'judging' # → Judge оценитFOR UPDATE SKIP LOCKED — ключевой паттерн. Несколько воркеров берут разные задачи, не блокируя друг друга. См. ADR-0001-Use-Postgres-As-Queue.
Иерархия делегирования
Пример полной цепочки:
User (Telegram)
└→ OpenClaw (gateway)
└→ CEO Agent
└→ Intel Director
├→ Competitor Scout (parallel)
├→ Market Researcher (parallel)
└→ Trend Analyst (parallel)
Fan-out (делегирование)
Director создаёт N child tasks с parent_id = self.task.id. Все дочерние задачи выполняются параллельно.
Fan-in (агрегация)
Отдельный worker периодически проверяет:
SELECT parent_id FROM tasks
WHERE parent_id IS NOT NULL
GROUP BY parent_id
HAVING COUNT(*) = COUNT(*) FILTER (WHERE status IN ('done', 'failed'))Когда все children завершены → родительская задача переходит в ready_to_aggregate → Director агрегирует результаты.
Таймауты
budget_seconds— максимальное время выполнения- Watchdog — фоновый процесс проверяет:
UPDATE tasks SET status = 'failed', error = 'timeout' WHERE status = 'running' AND started_at < now() - (budget_seconds || ' seconds')::interval - Retry — НЕ автоматический. Задача помечается failed, CEO Agent решает: retry, reassign, или escalate.
Связанные документы
- ADR-0001-Use-Postgres-As-Queue — обоснование выбора PG
- System-Overview — общая архитектура
- Policy-Layer — pre/post проверки
- Process-TaskLifecycle — детали жизненного цикла
- Agent-CEO — верхний уровень оркестрации