# Pipeline Incident Commander: Airflow

Automatically triage a data pipeline incident by spawning three specialist sub-agents in parallel — a log analyser, a query profiler, and an owner notifier — then composing their findings into a single structured Slack post. The orchestrator runs the moment a failure is detected, with no manual intervention required.

This tutorial covers two trigger paths: an **Airflow `on_failure_callback`** that fires the incident commander automatically when any DAG task fails, and a **manual CLI trigger** for teams who want to invoke triage on demand.

{% hint style="info" icon="compass" %}
**Before You Start**

**Paradime**

* Your Paradime API endpoint, API key, and API secret — generate these under Workspace Settings → API. Make sure to enable `DinoAI agent API` capabilities. Requires Admin access.

**Recommended reading**

Before proceeding, read the Programmable Agents section under **Products → DinoAI**:

* Quick Start
* YAML Configuration
* Tools Reference
* Agent-to-Agent Delegation

This tutorial uses `invoke_agent`, `notify_parent_session`, and `child_session_ids` — all covered in the Agent-to-Agent Delegation guide.

**Integrations**

The following must already be connected in Paradime:

* **Slack** — the orchestrator and sub-agents post to `#incidents` via `post_slack_message`
  {% endhint %}

## What You'll Build

By the end of this guide you'll have:

* Four DinoAI agent YAMLs — one orchestrator (`incident-commander`) and three specialists (`log-analyzer`, `query-profiler`, `owner-notifier`)
* An Airflow `on_failure_callback` function that fires the incident commander automatically when any task in a DAG fails, pre-populated with the DAG name, task ID, run ID, and execution date
* A manual trigger script for ad-hoc incident triage

### What Happens During a Triage

Once the incident commander is triggered:

```
incident-commander (session A)
    ├─ invoke_agent("log-analyzer")    ─► session B  ─► pulls Bolt run logs
    ├─ invoke_agent("query-profiler")  ─► session C  ─► queries warehouse history
    └─ invoke_agent("owner-notifier")  ─► session D  ─► pings model owner in Slack

    B, C, D run in parallel
        ├─ each posts updates into the #incidents Slack thread
        └─ each calls notify_parent_session with their findings

    incident-commander resumes once all three have reported
        └─ posts unified ROOT CAUSE / IMPACT / OWNER / NEXT ACTION to #incidents
```

The commander does no investigating of its own: it only delegates and composes. None of the sub-agents has `invoke_agent` in its tool allowlist, so the delegation graph is exactly two levels deep. This keeps incident response predictable and prevents runaway delegation chains.

### The Slack Incident Post

Once all three sub-agents have reported back, the incident commander posts a single structured message to `#incidents`:

```
🚨 INCIDENT TRIAGE — hourly_marts / fct_orders

ROOT CAUSE
  [log-analyzer findings]

IMPACT
  [query-profiler findings — top 3 slow/failing queries]

OWNER
  [owner-notifier findings — who was pinged and confirmed]

NEXT ACTION
  [commander synthesis — one concrete action item]
```
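To make the layout concrete, here is a minimal Python sketch that renders the four sections from a dictionary of findings. The commander agent composes the real post itself; `render_sections` and `SECTION_ORDER` are hypothetical names used only for illustration, and the title line is left out. The sketch also refuses to render when a section is empty, mirroring the rule that a partial report is never posted.

```python
SECTION_ORDER = ("ROOT CAUSE", "IMPACT", "OWNER", "NEXT ACTION")


def render_sections(findings: dict[str, str]) -> str:
    """Render the four incident sections, each body indented by two spaces."""
    missing = [name for name in SECTION_ORDER if not findings.get(name, "").strip()]
    if missing:
        raise ValueError(f"Refusing to render a partial report; missing: {', '.join(missing)}")

    blocks = []
    for name in SECTION_ORDER:
        # Indent every line of the finding under its heading
        body = "\n".join(f"  {line}" for line in findings[name].strip().splitlines())
        blocks.append(f"{name}\n{body}")
    return "\n\n".join(blocks)


if __name__ == "__main__":
    # Illustrative placeholder findings, not real output
    print(render_sections({
        "ROOT CAUSE": "[log-analyzer findings]",
        "IMPACT": "[query-profiler findings]",
        "OWNER": "[owner-notifier findings]",
        "NEXT ACTION": "[commander synthesis]",
    }))
```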

## Architecture Overview

```mermaid
flowchart TD
    AF([Airflow on_failure_callback\nor manual CLI trigger]) --> IC

    IC([incident-commander\nsession A])
    IC --> LA([log-analyzer\nsession B])
    IC --> QP([query-profiler\nsession C])
    IC --> ON([owner-notifier\nsession D])

    LA --> LA1[list_bolt_schedules\nget_bolt_run_logs]
    LA --> LA2[notify_parent_session\n→ session A]

    QP --> QP1[run_sql_query\nlist_all_snowflake_databases]
    QP --> QP2[notify_parent_session\n→ session A]

    ON --> ON1[read_file · ripgrep_search\nfind meta.owner in schema YAML]
    ON --> ON2[post_slack_message\nping owner in #incidents]
    ON --> ON3[notify_parent_session\n→ session A]

    LA2 & QP2 & ON3 --> IC2([incident-commander resumes\nposts unified summary → #incidents])
```

{% stepper %}
{% step %}

### Create the Agent YAMLs

Create all four agent files. The orchestrator and three sub-agents must all exist in `.dinoai/agents/` before the first trigger.

**Orchestrator — `incident-commander`**

{% code title=".dinoai/agents/incident-commander.yml" lineNumbers="true" %}

```yaml
name: incident-commander
version: 1

role: >
  Data Platform Incident Commander coordinating triage across log,
  warehouse, and people axes during a pipeline incident.

goal: >
  When triggered with an incident summary: spawn the log-analyzer,
  query-profiler, and owner-notifier sub-agents in parallel via
  invoke_agent. Wait for their findings via the callback pattern and
  post a unified incident summary to the incident Slack thread with
  ROOT CAUSE / IMPACT / OWNER / NEXT ACTION.

  Structure the final Slack post as follows:

  🚨 INCIDENT TRIAGE — <schedule or DAG name> / <suspect model>

  ROOT CAUSE
    <log-analyzer findings verbatim>

  IMPACT
    <query-profiler findings verbatim — top 3 queries>

  OWNER
    <owner-notifier findings — who was pinged and confirmed>

  NEXT ACTION
    <one concrete action item synthesised from all three findings>

backstory: >
  You delegate aggressively — you do not investigate yourself. You only
  compose the final report. You always include the three sub-agent
  findings verbatim under their respective headings before writing the
  synthesis. You never post a partial report — wait for all three
  sub-agents to notify before composing.

tools:
  mode: allowlist
  list:
    - post_slack_message
    - invoke_agent

agents_squad:
  - log-analyzer
  - query-profiler
  - owner-notifier

slack:
  channel: "#incidents"
```

{% endcode %}

**Sub-agent — `log-analyzer`**

{% code title=".dinoai/agents/log-analyzer.yml" lineNumbers="true" %}

```yaml
name: log-analyzer
version: 1

role: Bolt log specialist.

goal: >
  Pull the relevant Bolt run logs for the failing schedule and report the
  failing step plus a one-line root cause. Notify the parent session with
  your findings once complete.

backstory: >
  You are concise. One paragraph maximum. You only report facts from the
  logs — never speculation.

tools:
  mode: allowlist
  list:
    - list_bolt_schedules
    - get_bolt_run_logs
    - notify_parent_session
```

{% endcode %}

**Sub-agent — `query-profiler`**

{% code title=".dinoai/agents/query-profiler.yml" lineNumbers="true" %}

```yaml
name: query-profiler
version: 1

role: Warehouse query specialist.

goal: >
  Run information-schema queries to identify slow or failing warehouse
  queries in the last hour. Report the top 3 with their owner model.
  Notify the parent session with your findings once complete.

backstory: >
  You only return facts from the warehouse, never speculation. Always
  include query ID, execution time, and model name in your findings.

tools:
  mode: allowlist
  list:
    - run_sql_query
    - list_all_snowflake_databases
    - notify_parent_session
```

{% endcode %}

**Sub-agent — `owner-notifier`**

{% code title=".dinoai/agents/owner-notifier.yml" lineNumbers="true" %}

```yaml
name: owner-notifier
version: 1

role: People-routing specialist.

goal: >
  For the model named in the trigger, read its schema YAML to find the
  owner field (meta.owner) and post a direct Slack ping in the incident
  channel. Notify the parent session with the owner you contacted.

backstory: >
  You always confirm the owner exists in the YAML before pinging. If no
  owner is found, report that explicitly rather than pinging a guess.

tools:
  mode: allowlist
  list:
    - read_file
    - search_files_and_directories
    - ripgrep_search
    - post_slack_message
    - notify_parent_session
```

{% endcode %}

{% hint style="info" %}
None of the sub-agents have `invoke_agent` in their tool allowlist. This keeps the delegation graph exactly two levels deep — the commander is the only delegator. Sub-agents cannot spawn further children, which makes incident response predictable and prevents runaway chains.
{% endhint %}
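
If you want to guard this property in CI, a check along the following lines could work. This is a sketch under assumptions: `check_two_level_graph` is a hypothetical helper, and `AGENTS` is a hand-written dictionary that mirrors the four YAML files above. Loading the real files would need a YAML parser, which is left out here.

```python
# Hand-written mirror of the four agent YAMLs (only the fields the check needs)
AGENTS = {
    "incident-commander": {
        "tools": {"mode": "allowlist", "list": ["post_slack_message", "invoke_agent"]},
        "agents_squad": ["log-analyzer", "query-profiler", "owner-notifier"],
    },
    "log-analyzer": {
        "tools": {"mode": "allowlist", "list": [
            "list_bolt_schedules", "get_bolt_run_logs", "notify_parent_session"]},
    },
    "query-profiler": {
        "tools": {"mode": "allowlist", "list": [
            "run_sql_query", "list_all_snowflake_databases", "notify_parent_session"]},
    },
    "owner-notifier": {
        "tools": {"mode": "allowlist", "list": [
            "read_file", "search_files_and_directories", "ripgrep_search",
            "post_slack_message", "notify_parent_session"]},
    },
}


def check_two_level_graph(agents: dict[str, dict], orchestrator: str) -> None:
    """Raise ValueError unless the orchestrator is the only agent that can delegate."""
    delegators = sorted(
        name for name, cfg in agents.items()
        if "invoke_agent" in cfg.get("tools", {}).get("list", [])
    )
    if delegators != [orchestrator]:
        raise ValueError(f"expected only {orchestrator!r} to delegate, found {delegators}")

    # Every squad member must be a defined agent
    squad = agents[orchestrator].get("agents_squad", [])
    unknown = [name for name in squad if name not in agents]
    if unknown:
        raise ValueError(f"agents_squad references undefined agents: {unknown}")


if __name__ == "__main__":
    check_two_level_graph(AGENTS, "incident-commander")
    print("delegation graph is two levels deep")
```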
{% endstep %}

{% step %}

### Trigger Path A — Airflow `on_failure_callback`

This is the recommended path for teams running Airflow. The callback fires automatically when any task in your DAG fails, extracts the task and DAG context from Airflow's `context` dict, and hands it directly to the incident commander so the triage message is pre-populated with real incident details.

Create `dags/callbacks/incident_commander.py`:

{% code title="dags/callbacks/incident\_commander.py" lineNumbers="true" %}

```python
"""
incident_commander.py
---------------------
Airflow on_failure_callback that triggers the `incident-commander`
DinoAI agent when a DAG task fails.

Attach to any DAG or individual task:

    # Whole DAG — fires on any task failure
    with DAG(
        "hourly_marts",
        on_failure_callback=trigger_incident_commander,
        ...
    ) as dag:
        ...

    # Single task — fires only when this task fails
    run_fct_orders = DbtRunOperator(
        task_id="fct_orders",
        on_failure_callback=trigger_incident_commander,
        ...
    )

Environment variables (required; read via os.environ, so they must be set in
the environment of the Airflow process that runs the callback):
    PARADIME_API_ENDPOINT  – Paradime API endpoint
    PARADIME_API_KEY       – Paradime API key
    PARADIME_API_SECRET    – Paradime API secret

Optional:
    INCIDENT_SLACK_CHANNEL – Slack channel for incident posts (default: #incidents)
    SUSPECT_MODEL          – dbt™ model to focus triage on (default: extracted
                             from failed task_id if it matches a model name)
"""

from __future__ import annotations

import os
import threading
from typing import Any

from paradime import Paradime

AGENT_NAME     = "incident-commander"
SLACK_CHANNEL  = os.environ.get("INCIDENT_SLACK_CHANNEL", "#incidents")


def _build_client() -> Paradime:
    return Paradime(
        api_endpoint=os.environ["PARADIME_API_ENDPOINT"],
        api_key=os.environ["PARADIME_API_KEY"],
        api_secret=os.environ["PARADIME_API_SECRET"],
    )


def _build_incident_message(context: dict[str, Any]) -> str:
    """
    Build the incident trigger message from Airflow task context.
    Airflow passes a rich context dict to on_failure_callback — we extract
    the most useful fields for the triage agent.
    """
    dag = context.get("dag")
    ti  = context.get("task_instance")

    dag_id  = dag.dag_id if dag else "unknown"
    task_id = ti.task_id if ti else "unknown"
    run_id  = context.get("run_id", "unknown")
    # Airflow 2.2+ exposes `logical_date`; fall back to the older `execution_date` key
    execution_date = str(context.get("logical_date") or context.get("execution_date") or "unknown")
    exception = str(context.get("exception", "unknown"))
    log_url   = ti.log_url if ti else "unavailable"

    # Try to infer the suspect dbt™ model from the task ID or env override
    suspect_model = (
        os.environ.get("SUSPECT_MODEL")
        or task_id  # Airflow task IDs often match dbt™ model names directly
    )

    return (
        f"INCIDENT: Airflow task failure detected.\n\n"
        f"DAG:            {dag_id}\n"
        f"Failed task:    {task_id}\n"
        f"Run ID:         {run_id}\n"
        f"Execution date: {execution_date}\n"
        f"Exception:      {exception}\n"
        f"Log URL:        {log_url}\n"
        f"Suspect model:  {suspect_model}\n\n"
        f"Coordinate triage across log-analyzer, query-profiler, and "
        f"owner-notifier. Post a unified ROOT CAUSE / IMPACT / OWNER / "
        f"NEXT ACTION summary to {SLACK_CHANNEL}."
    )


def trigger_incident_commander(context: dict[str, Any]) -> None:
    """
    Airflow on_failure_callback. Fires asynchronously so it does not block
    Airflow's task runner thread while the agent session runs.

    Attach to a DAG:   on_failure_callback=trigger_incident_commander
    Attach to a task:  on_failure_callback=trigger_incident_commander
    """
    def _run() -> None:
        try:
            paradime = _build_client()
            message  = _build_incident_message(context)

            dag_id  = context.get("dag").dag_id if context.get("dag") else "unknown"
            task_id = context.get("task_instance").task_id if context.get("task_instance") else "unknown"

            print(f"[incident-commander] 🚨  Triggering triage for {dag_id}/{task_id} …")

            run = paradime.dinoai_agents.trigger_run_and_wait(
                agent=AGENT_NAME,
                message=message,
                slack_channel=SLACK_CHANNEL,
                timeout=1800,   # 30 minutes max
            )

            print(
                f"[incident-commander] ✅  Triage complete for {dag_id}/{task_id}.\n"
                f"{run.messages[-1].content}"
            )

        except Exception as exc:  # noqa: BLE001
            # Log but never re-raise — a callback failure must not mask the original error
            print(f"[incident-commander] ❌  Failed to trigger triage: {exc}")

    # Run in a daemon thread so Airflow doesn't wait for it
    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
```

{% endcode %}
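
One way to exercise the field extraction without running Airflow is to pass a hand-built context. The sketch below mirrors the lookups in `_build_incident_message` using a hypothetical helper, `summarize_context`, and stand-in objects built with `SimpleNamespace`. All values in `fake_context`, including the log URL, are made up for illustration.

```python
from types import SimpleNamespace
from typing import Any, Mapping


def summarize_context(context: Mapping[str, Any]) -> dict[str, str]:
    """Extract the incident fields, falling back to placeholders when a key is absent."""
    dag = context.get("dag")
    ti = context.get("task_instance")
    return {
        "dag_id": dag.dag_id if dag else "unknown",
        "task_id": ti.task_id if ti else "unknown",
        "run_id": str(context.get("run_id", "unknown")),
        "exception": str(context.get("exception", "unknown")),
        "log_url": ti.log_url if ti else "unavailable",
    }


# Stand-ins for the objects Airflow would place in the callback context
fake_context = {
    "dag": SimpleNamespace(dag_id="hourly_marts"),
    "task_instance": SimpleNamespace(
        task_id="fct_orders",
        log_url="http://localhost:8080/log",  # made-up URL
    ),
    "run_id": "manual__example",
    "exception": ValueError("partition filter missing"),
}

if __name__ == "__main__":
    for key, value in summarize_context(fake_context).items():
        print(f"{key}: {value}")
```

Calling the helper with an empty dictionary returns only placeholder values, which is a quick way to confirm the callback cannot raise on a sparse context.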

Then attach the callback to your DAG:

{% code title="dags/hourly\_marts.py" lineNumbers="true" %}

```python
from airflow import DAG
from airflow.utils.dates import days_ago
from callbacks.incident_commander import trigger_incident_commander

with DAG(
    dag_id="hourly_marts",
    schedule_interval="@hourly",
    start_date=days_ago(1),
    on_failure_callback=trigger_incident_commander,   # ← fires on any task failure
    catchup=False,
) as dag:
    ...
```

{% endcode %}

{% hint style="info" %}
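The callback runs in a **daemon thread** so it does not block Airflow's task runner while the agent session runs (which can take several minutes). Because the callback catches and logs its own exceptions, the original task failure is surfaced normally in Airflow whether the triage agent succeeds or fails. Python stops daemon threads when their host process exits, so the final `Triage complete` log line is not guaranteed to appear.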
{% endhint %}

{% hint style="info" %}
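A DAG-level `on_failure_callback` runs when the DAG run is marked as failed. To attach the callback to a **single task** instead, set `on_failure_callback=trigger_incident_commander` on the operator directly; to run it for every failing task, pass it through the DAG's `default_args`. Task-level attachment is useful when only certain high-priority models should trigger a full triage.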
{% endhint %}
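
If you would rather keep a single callback and filter inside it, a small wrapper can restrict triage to a named set of tasks. This is a sketch, not part of the Paradime SDK or Airflow: `only_for_tasks` is a hypothetical helper, and the demo uses a recording function in place of `trigger_incident_commander`.

```python
from types import SimpleNamespace
from typing import Any, Callable, Iterable, Mapping

Callback = Callable[[Mapping[str, Any]], None]


def only_for_tasks(callback: Callback, task_ids: Iterable[str]) -> Callback:
    """Wrap a failure callback so it runs only for the given task IDs."""
    allowed = frozenset(task_ids)

    def _wrapped(context: Mapping[str, Any]) -> None:
        ti = context.get("task_instance")
        # Skip silently when the context has no task instance or the task is not listed
        if ti is not None and ti.task_id in allowed:
            callback(context)

    return _wrapped


if __name__ == "__main__":
    triggered: list[str] = []
    demo = only_for_tasks(
        lambda ctx: triggered.append(ctx["task_instance"].task_id),
        {"fct_orders"},
    )
    demo({"task_instance": SimpleNamespace(task_id="fct_orders")})
    demo({"task_instance": SimpleNamespace(task_id="stg_users")})
    print(triggered)
```

In a DAG file this would be used as `on_failure_callback=only_for_tasks(trigger_incident_commander, {"fct_orders"})`.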

{% hint style="info" %}
`PARADIME_API_ENDPOINT`, `PARADIME_API_KEY`, and `PARADIME_API_SECRET` should be stored as **Airflow Variables** or in an **Airflow Connection**, not hardcoded. Retrieve them with `Variable.get("PARADIME_API_KEY")` if you prefer the Airflow Variables pattern over environment variables.
{% endhint %}
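
A credential loader that prefers environment variables and falls back to another store might look like the sketch below. `load_credentials` and its `fallback` parameter are hypothetical names. In an Airflow 2 deployment the fallback could be something like `lambda name: Variable.get(name, default_var=None)`, but that wiring is an assumption about your Airflow version and is not exercised here.

```python
import os
from typing import Callable, Mapping, Optional

REQUIRED = ("PARADIME_API_ENDPOINT", "PARADIME_API_KEY", "PARADIME_API_SECRET")


def load_credentials(
    env: Mapping[str, str] = os.environ,
    fallback: Optional[Callable[[str], Optional[str]]] = None,
) -> dict[str, str]:
    """Return the three Paradime credentials, or raise if any cannot be found."""
    creds: dict[str, str] = {}
    missing: list[str] = []
    for name in REQUIRED:
        # Environment first, then the optional fallback store
        value = env.get(name) or (fallback(name) if fallback else None)
        if value:
            creds[name] = value
        else:
            missing.append(name)
    if missing:
        raise RuntimeError(f"Missing credentials: {', '.join(missing)}")
    return creds


if __name__ == "__main__":
    # Placeholder values for illustration only
    demo_env = {"PARADIME_API_ENDPOINT": "https://example.invalid", "PARADIME_API_KEY": "key"}
    demo_store = {"PARADIME_API_SECRET": "secret"}
    print(sorted(load_credentials(env=demo_env, fallback=demo_store.get)))
```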
{% endstep %}

{% step %}

### Trigger Path B — Manual CLI Trigger

For teams not running Airflow, or for ad-hoc incident triage when a failure is spotted manually, create `scripts/trigger_incident.py`:

{% code title="scripts/trigger\_incident.py" lineNumbers="true" %}

```python
"""
trigger_incident.py
-------------------
Manual trigger for the `incident-commander` DinoAI agent.

Usage:
    python scripts/trigger_incident.py \
        --schedule "hourly_marts" \
        --model fct_orders \
        --context "Failed 3 times in the last 30 minutes"

Environment variables (required):
    PARADIME_API_ENDPOINT  – Paradime API endpoint
    PARADIME_API_KEY       – Paradime API key
    PARADIME_API_SECRET    – Paradime API secret
"""

import argparse
import os
import sys

from paradime import Paradime

AGENT_NAME    = "incident-commander"
SLACK_CHANNEL = os.environ.get("INCIDENT_SLACK_CHANNEL", "#incidents")


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Trigger the incident-commander agent.")
    parser.add_argument("--schedule", required=True, help="Bolt schedule or DAG name that is failing")
    parser.add_argument("--model",    required=True, help="Suspect dbt™ model name (e.g. fct_orders)")
    parser.add_argument("--context",  default="",    help="Any additional context about the failure")
    return parser.parse_args()


def main() -> None:
    args = _parse_args()

    required = {
        "PARADIME_API_ENDPOINT": os.environ.get("PARADIME_API_ENDPOINT"),
        "PARADIME_API_KEY":      os.environ.get("PARADIME_API_KEY"),
        "PARADIME_API_SECRET":   os.environ.get("PARADIME_API_SECRET"),
    }
    missing = [k for k, v in required.items() if not v]
    if missing:
        print(f"ERROR: Missing env vars: {', '.join(missing)}", file=sys.stderr)
        raise SystemExit(1)

    paradime = Paradime(
        api_endpoint=required["PARADIME_API_ENDPOINT"],
        api_key=required["PARADIME_API_KEY"],
        api_secret=required["PARADIME_API_SECRET"],
    )

    message = (
        f"INCIDENT: schedule '{args.schedule}' is failing. "
        f"Suspect model is {args.model}. "
        f"{args.context}\n\n"
        f"Coordinate triage across log-analyzer, query-profiler, and "
        f"owner-notifier. Post a unified ROOT CAUSE / IMPACT / OWNER / "
        f"NEXT ACTION summary to {SLACK_CHANNEL}."
    )

    print(f"[incident-commander] 🚨  Triggering triage for {args.schedule}/{args.model} …")

    run = paradime.dinoai_agents.trigger_run_and_wait(
        agent=AGENT_NAME,
        message=message,
        slack_channel=SLACK_CHANNEL,
        timeout=1800,
    )

    print("\n" + "=" * 72)
    print("INCIDENT TRIAGE REPORT")
    print("=" * 72)
    print(run.messages[-1].content)
    print("=" * 72)


if __name__ == "__main__":
    main()
```

{% endcode %}

Run it from your terminal:

```bash
export PARADIME_API_ENDPOINT=<your-endpoint>
export PARADIME_API_KEY=<your-key>
export PARADIME_API_SECRET=<your-secret>

python scripts/trigger_incident.py \
  --schedule "hourly_marts" \
  --model fct_orders \
  --context "Failed 3 times in the last 30 minutes. Suspect partition filter issue."
```

{% endstep %}

{% step %}

### Watch Sub-Agents in Flight

Both trigger paths call `trigger_run_and_wait`, which blocks the calling thread until the full triage is complete. To stream sub-agent progress in real time as each specialist reports back, use the non-blocking `trigger_run` pattern and poll `child_session_ids`:

{% code title="scripts/watch\_incident.py" lineNumbers="true" %}

```python
"""
watch_incident.py
-----------------
Triggers the incident commander and streams sub-agent progress in real time.
Each child session is polled independently so you see findings as they arrive
rather than waiting for all three to complete.
"""

import os
import time

from paradime import Paradime
from paradime.apis.dinoai_agents.types import DinoaiAgentRunStatus

paradime = Paradime(
    api_endpoint=os.environ["PARADIME_API_ENDPOINT"],
    api_key=os.environ["PARADIME_API_KEY"],
    api_secret=os.environ["PARADIME_API_SECRET"],
)

# Trigger without waiting — returns immediately with the parent session ID
result    = paradime.dinoai_agents.trigger_run(
    agent="incident-commander",
    message=(
        "INCIDENT: schedule 'hourly_marts' has failed 3 times in the last "
        "30 minutes. Suspect model is fct_orders. Coordinate the triage "
        "and post a unified summary in #incidents."
    ),
)
parent_id = result.agent_session_id
print(f"[commander] 🚨  Session started: {parent_id}\n")

# Poll the parent; child_session_ids fills in as sub-agents are spawned
printed: dict[str, int] = {}  # child session ID -> number of messages already printed

while True:
    parent = paradime.dinoai_agents.get_run(agent_session_id=parent_id)

    # Re-poll every known child so new messages are printed as they arrive
    for child_id in (parent.child_session_ids or []):
        child = paradime.dinoai_agents.get_run(agent_session_id=child_id)
        if child_id not in printed:
            printed[child_id] = 0
            print(f"--- sub-agent spawned: {child_id} ({child.status.value}) ---")

        # Print only the messages that arrived since the previous poll
        for msg in child.messages[printed[child_id]:]:
            print(f"  [{child_id}] [{msg.role}] {msg.content[:200]}")
        printed[child_id] = len(child.messages)

    if parent.status in (DinoaiAgentRunStatus.COMPLETED, DinoaiAgentRunStatus.FAILED):
        break

    time.sleep(15)

print("\n" + "=" * 72)
print("FINAL TRIAGE REPORT")
print("=" * 72)
print(parent.messages[-1].content)
```

{% endcode %}

{% hint style="info" %}
`child_session_ids` populates progressively as the orchestrator spawns each sub-agent. The first poll may return an empty list — this is expected. The three child sessions will appear within the first 30–60 seconds as the commander issues its `invoke_agent` calls.
{% endhint %}
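
The polling loop above runs until the parent session reaches a terminal status. If you want an upper bound on the wait, a generic helper with a deadline is one option. This is a sketch: `poll_until` is a hypothetical function, and the `sleep` and `clock` parameters exist only so the loop can be tested without real delays.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll_until(
    fetch: Callable[[], T],
    is_done: Callable[[T], bool],
    timeout_s: float = 1800.0,
    interval_s: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> T:
    """Call fetch() every interval_s seconds until is_done(state) or the deadline passes."""
    deadline = clock() + timeout_s
    while True:
        state = fetch()
        if is_done(state):
            return state
        # Give up if the next poll would land after the deadline
        if clock() + interval_s > deadline:
            raise TimeoutError(f"not done after {timeout_s} seconds")
        sleep(interval_s)


if __name__ == "__main__":
    # Simulated status sequence standing in for repeated get_run() calls
    statuses = iter(["RUNNING", "RUNNING", "COMPLETED"])
    final = poll_until(
        lambda: next(statuses),
        lambda s: s == "COMPLETED",
        interval_s=0.01,
    )
    print(final)
```

With the script above, `fetch` would wrap `paradime.dinoai_agents.get_run(agent_session_id=parent_id)` and `is_done` would test `parent.status` against the two terminal statuses.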
{% endstep %}
{% endstepper %}

## Execution Flow

When the incident commander receives the trigger message:

1. It calls `invoke_agent` three times in parallel — one for each specialist — passing its own session ID as the callback target
2. Each sub-agent runs independently, posting updates to the `#incidents` Slack thread as it works
3. Each sub-agent calls `notify_parent_session` with its findings when complete
4. The commander resumes once all three callbacks have arrived, then composes and posts the unified triage report

The entire triage typically completes in **3–8 minutes** depending on log volume and warehouse query history depth.
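
The fan-out and fan-in described in the steps above can be pictured with a local analogy in plain Python threads. This is only an illustration of the coordination pattern: DinoAI runs each specialist as a separate agent session via `invoke_agent` and `notify_parent_session`, not as threads, and `fan_out_fan_in` and the stand-in specialist functions are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def fan_out_fan_in(
    specialists: dict[str, Callable[[str], str]],
    incident: str,
) -> dict[str, str]:
    """Run every specialist in parallel and return once all have reported."""
    with ThreadPoolExecutor(max_workers=max(1, len(specialists))) as pool:
        # Fan out: submit one job per specialist
        futures = {name: pool.submit(fn, incident) for name, fn in specialists.items()}
        # Fan in: result() blocks, so the dict is complete only when every job has finished
        return {name: future.result() for name, future in futures.items()}


if __name__ == "__main__":
    # Stand-in specialists that return placeholder findings
    findings = fan_out_fan_in(
        {
            "log-analyzer": lambda incident: f"logs checked for {incident}",
            "query-profiler": lambda incident: f"queries profiled for {incident}",
            "owner-notifier": lambda incident: f"owner pinged for {incident}",
        },
        "hourly_marts/fct_orders",
    )
    for name, finding in findings.items():
        print(f"{name}: {finding}")
```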

### Set Your Environment Variables

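The callback and trigger scripts require three variables, which the code reads from `os.environ`. For the Airflow path, set them in the environment of the Airflow processes that run callbacks, or adapt `_build_client` to read them from Airflow Variables. For the manual path, export them in your shell.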

| Variable                 | Description                                                                                 |
| ------------------------ | ------------------------------------------------------------------------------------------- |
| `PARADIME_API_ENDPOINT`  | Your Paradime API endpoint                                                                  |
| `PARADIME_API_KEY`       | Your Paradime API key                                                                       |
| `PARADIME_API_SECRET`    | Your Paradime API secret                                                                    |
| `INCIDENT_SLACK_CHANNEL` | *(optional)* Slack channel for incident posts — defaults to `#incidents`                    |
| `SUSPECT_MODEL`          | *(optional, Airflow only)* Override the suspect model name — defaults to the failed task ID |

{% hint style="info" %}
Your Paradime API endpoint, key, and secret are available under Workspace Settings → API. Make sure the key has `DinoAI agent API` capabilities enabled.
{% endhint %}

### File Structure

Your repository should look like this after completing the setup:

```
your-repo/
├── dbt_project.yml
├── .dinoai/
│   └── agents/
│       ├── incident-commander.yml
│       ├── log-analyzer.yml
│       ├── query-profiler.yml
│       └── owner-notifier.yml
├── dags/
│   ├── hourly_marts.py                   ← example DAG with callback attached
│   └── callbacks/
│       └── incident_commander.py         ← Airflow on_failure_callback
└── scripts/
    ├── trigger_incident.py               ← manual CLI trigger
    └── watch_incident.py                 ← real-time sub-agent streaming
```

#### Related Docs

* [**Agent-to-Agent Delegation** — how `invoke_agent` and `notify_parent_session` work](/app-help/products/dino-ai/programmable-agents/agent-to-agent-delegation.md)
* [**Programmable Agents — Quick Start** — getting started with DinoAI agents](/app-help/products/dino-ai/programmable-agents/quick-start.md)
* [**Programmable Agents — YAML Configuration** — full reference for agent config options](/app-help/products/dino-ai/programmable-agents/yaml-configuration.md)
* [**Programmable Agents — Tools Reference** — all available tools](/app-help/products/dino-ai/programmable-agents/tools-reference.md)
* [**Slack Integration** — connecting Slack to Paradime](/app-help/integrations/slack.md)
* [**Paradime API & Credentials** — where to find your API endpoint, key, and secret](/app-help/developers/generate-api-keys.md)


