Orchestration Model

Ключевое решение

Агенты НЕ вызывают друг друга напрямую. Вся коммуникация — через таблицу tasks в PostgreSQL.

Почему:

  • Отказоустойчивость — упавший агент не ломает цепочку, задача остаётся в очереди
  • Retry — повторный запуск без дублирования
  • Наблюдаемость — каждая задача имеет статус, время, стоимость
  • Масштабирование — добавляем воркеров без изменения логики
  • Backpressure — очередь растёт, но система не падёт

SQL-схема задач

CREATE TABLE tasks (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    parent_id       UUID REFERENCES tasks(id),
    root_id         UUID,
    from_agent      TEXT,
    to_agent        TEXT NOT NULL,
    type            TEXT NOT NULL,
    hypothesis_id   UUID,
    payload         JSONB NOT NULL,
    success_criteria JSONB,
    kill_criteria   JSONB,
    budget_tokens   INT DEFAULT 50000,
    budget_seconds  INT DEFAULT 300,
    budget_usd      NUMERIC(10,4) DEFAULT 1.0,
    status          TEXT NOT NULL DEFAULT 'pending',
    priority        INT DEFAULT 5,
    created_at      TIMESTAMPTZ DEFAULT now(),
    started_at      TIMESTAMPTZ,
    completed_at    TIMESTAMPTZ,
    result          JSONB,
    error           TEXT
);
 
CREATE INDEX idx_tasks_status_agent ON tasks(status, to_agent, priority);
CREATE INDEX idx_tasks_root ON tasks(root_id);

Жизненный цикл задачи

pending → running → judging → done | failed | escalated
СтатусОписание
pendingВ очереди, ожидает воркера
runningВоркер взял, агент выполняет
judgingВыполнено, Agent-Judge оценивает
doneЗавершено успешно
failedОшибка или kill criteria
escalatedПередано на уровень выше или human

Детали: Process-TaskLifecycle.

Worker Loop

Псевдокод обработки очереди:

while True:
    # Атомарно взять задачу — SKIP LOCKED ключевой
    task = db.execute("""
        UPDATE tasks
        SET status = 'running', started_at = now()
        WHERE id = (
            SELECT id FROM tasks
            WHERE status = 'pending'
              AND to_agent = ANY(%s)
            ORDER BY priority, created_at
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        RETURNING *
    """, [worker.supported_agents])
 
    if not task:
        sleep(1)
        continue
 
    # Pre-execution policy check
    violation = policy.validate_input(task)
    if violation:
        task.status = 'escalated'
        task.error = violation
        continue
 
    # Run agent
    result = agent_runner.run(task)
 
    # Post-execution policy check
    violation = policy.validate_output(task, result)
    if violation:
        task.status = 'escalated'
        task.error = violation
        continue
 
    task.result = result
    task.status = 'judging'  # → Judge оценит

FOR UPDATE SKIP LOCKED — ключевой паттерн. Несколько воркеров берут разные задачи, не блокируя друг друга. См. ADR-0001-Use-Postgres-As-Queue.

Иерархия делегирования

Пример полной цепочки:

User (Telegram)
  └→ OpenClaw (gateway)
       └→ CEO Agent
            └→ Intel Director
                 ├→ Competitor Scout  (parallel)
                 ├→ Market Researcher (parallel)
                 └→ Trend Analyst     (parallel)

Fan-out (делегирование)

Director создаёт N child tasks с parent_id = self.task.id. Все дочерние задачи выполняются параллельно.

Fan-in (агрегация)

Отдельный worker периодически проверяет:

SELECT parent_id FROM tasks
WHERE parent_id IS NOT NULL
GROUP BY parent_id
HAVING COUNT(*) = COUNT(*) FILTER (WHERE status IN ('done', 'failed'))

Когда все children завершены → родительская задача переходит в ready_to_aggregate → Director агрегирует результаты.

Таймауты

  • budget_seconds — максимальное время выполнения
  • Watchdog — фоновый процесс проверяет:
    UPDATE tasks SET status = 'failed', error = 'timeout'
    WHERE status = 'running'
      AND started_at < now() - (budget_seconds || ' seconds')::interval
  • Retry — НЕ автоматический. Задача помечается failed, CEO Agent решает: retry, reassign, или escalate.

Связанные документы