# dbt™ Model Query Cost Optimizer — BigQuery

Automatically detect your most expensive dbt™ models, diagnose SQL anti-patterns in the model source files, apply targeted rewrites, and open a pull request — all triggered by a single DinoAI agent session. The agent reads BigQuery performance stats to understand where bytes and slot time are being wasted, makes the fixes directly in your repository, and posts the PR link to Slack without any manual intervention.

{% hint style="info" icon="compass" %}
**Before You Start**

**Paradime**

* Your Paradime API endpoint, API key, and API secret — generate these under Workspace Settings → API. Make sure to enable `DinoAI agent API` capabilities. Requires Admin access.

**BigQuery**

* Your Paradime workspace must have an active BigQuery connection with sufficient permissions to read job metadata. The agent calls `get_bigquery_query_performance_stats` to retrieve performance stats for the BigQuery jobs that each dbt™ model produced.

**Model source**

You need at least one of the following so the script knows which dbt™ models to analyse:

* A **Bolt schedule name** — the script reads job IDs directly from the latest run's `run_results.json` artifact, which maps each dbt™ model to the BigQuery job it produced
* One or more **BigQuery job IDs** supplied directly via CLI flag or environment variable, if you already know which models are expensive

**Integrations**

The following must already be connected in Paradime:

* **Slack** — the agent posts the PR link and a cost summary to `#finops` via `post_slack_message`
  {% endhint %}

## What You'll Build

By the end of this guide you'll have:

* A Poetry project wired to the Paradime SDK
* A `run_query_optimizer.py` trigger script that reads dbt™ model results from Bolt artifacts (or accepts job IDs directly), filters to the top costly models by bytes processed, and hands the full context to the agent
* A `query-optimizer` DinoAI agent YAML that diagnoses anti-patterns, rewrites dbt™ model SQL and config, opens a PR, and notifies Slack

### What the Agent Does

Once triggered, the agent works through five phases without stopping:

```
PHASE 1 — Fetch BigQuery performance stats for each dbt™ model's job ID
PHASE 2 — Locate the dbt™ model .sql and YAML files in the repository
PHASE 3 — Diagnose SQL anti-patterns and build a concrete optimisation plan
PHASE 4 — Create a branch, apply all changes to the model files, commit and push
PHASE 5 — Open a PR and post a cost summary to #finops on Slack
```

The agent never drops columns referenced by downstream models, never pushes directly to `main`, and never guesses — if a business decision is required it leaves a `TODO` comment in the SQL and calls it out in the PR description.

### Anti-Patterns the Agent Detects and Fixes

| Anti-Pattern               | What It Means                                                | Fix Applied                                                       |
| -------------------------- | ------------------------------------------------------------ | ----------------------------------------------------------------- |
| `PARTITION_FILTER_MISSING` | Partitioned table scanned without a partition filter         | Adds `partition_by` config and an `is_incremental()` WHERE clause |
| `NO_CLUSTERING`            | High-cardinality filter columns not declared as `cluster_by` | Adds `cluster_by` to the config block                             |
| `WRONG_MATERIALIZATION`    | Frequently queried view causing repeated full scans          | Changes to `table` or `incremental` materialization               |
| `INCREMENTAL_OPPORTUNITY`  | Full table with a natural time-based key                     | Converts to incremental with `is_incremental()` filter            |
| `SELECT_STAR`              | All columns selected, pulling unnecessary bytes              | Enumerates only columns used by downstream models                 |
| `CARTESIAN_FAN_OUT_JOIN`   | JOIN on non-unique key causing row multiplication            | Adds `QUALIFY ROW_NUMBER()` deduplication                         |
| `REPEATED_CTE`             | CTE referenced more than twice in the same query             | Materialises as a separate intermediate model                     |
| `UNFILTERED_LARGE_TABLE`   | Large source pulled with no predicate                        | Pushes a `WHERE` clause into the CTE                              |

### Architecture Overview

```mermaid
flowchart TD
    A([run_query_optimizer.py]) --> B[Bolt artifact fetch\nreads run_results.json\nfrom latest Bolt run]
    A --> C[Model filter\ndeduplicate by model name\napply GB threshold · select top N]
    A --> D[DinoAI agent trigger\nparadime.dinoai_agents\n.trigger_run / .get_run]

    B --> B1[paradime.bolt.*\nlist_runs\nget_run_status\nget_latest_artifact_url]
    B1 --> B2[(run_results.json\ndbt™ model → BigQuery job ID)]

    D --> E([query-optimizer agent])

    E --> E1[get_bigquery_query_performance_stats\nreads job stats per model]
    E --> E2[read_file · write_file\nrewrites .sql and YAML]
    E --> E3[run_terminal_command\ngit branch · commit · push]
    E --> E4[gh pr create]
    E --> E5[post_slack_message\n→ #finops]
```

## How It Works

`run_query_optimizer.py` loads configuration from environment variables, optionally reads the latest Bolt run's `run_results.json` artifact to extract the BigQuery job ID produced by each dbt™ model, filters to the top-N costliest models above a configurable bytes threshold (default: 10 GB), then triggers a single `query-optimizer` DinoAI agent session with the full model context pre-embedded in the opening message. The script polls every 20 seconds until the agent completes or times out (default: 60 minutes), then prints the full optimisation report to stdout. An optional follow-up drill-down on a single model can be sent within the same session.

{% stepper %}
{% step %}

### Set Up the Poetry Project

Create the following `pyproject.toml` at the **same directory level as your `dbt_project.yml`**. This is required so the agent can locate and rewrite dbt™ model files correctly during the optimisation pass.

{% code title="pyproject.toml" lineNumbers="true" %}

```toml
[tool.poetry]
name = "query-optimizer"
version = "0.1.0"
description = "dbt Model Query Cost Optimizer — DinoAI Agent Trigger"
authors = ["Your Name <you@example.com>"]
package-mode = false

[tool.poetry.dependencies]
python = ">=3.11,<3.13"
requests = "^2.28.1"
paradime-io = "^5.3.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
```

{% endcode %}

Install dependencies by running:

```bash
poetry install
```

{% hint style="info" %}
The only two third-party dependencies are `requests` (for downloading Bolt artifacts) and `paradime-io` (the Paradime SDK). All other imports — `argparse`, `os`, `sys`, `textwrap`, `time` — are part of the Python standard library.
{% endhint %}
{% endstep %}

{% step %}

### Create the Agent YAML

Create the following file in your repository at `.dinoai/agents/query-optimizer.yml`. This defines the agent's role, five-phase goal, anti-pattern knowledge, guardrails, and Slack output channel.

{% code title=".dinoai/agents/query-optimizer.yml" lineNumbers="true" %}

```yaml
name: query-optimizer
version: 1

role: >
  BigQuery Query Optimization Engineer with deep expertise in dbt™ model
  performance tuning, SQL anti-patterns, and BigQuery-specific optimization
  strategies including partitioning, clustering, materialization changes,
  incremental strategies, and slot usage reduction.

goal: >
  You will be given one or more BigQuery job IDs. Your mission is not to
  report problems — it is to fix them. Locate the dbt™ model source files,
  diagnose the inefficiencies, rewrite the SQL and/or YAML config directly
  in the repository, open a pull request with all changes, and post the PR
  link to Slack.

  ── PHASE 1: GATHER PERFORMANCE DATA ──────────────────────────────────────

  For each provided job ID, call get_bigquery_query_performance_stats to
  retrieve bytes_processed, bytes_billed, slot_milliseconds, elapsed_ms,
  shuffle_output_bytes, spill_to_disk, query_plan stages, and referenced
  tables. Rank jobs by bytes_processed descending. Skip any job where
  bytes_processed < 10 GB.

  ── PHASE 2: LOCATE THE dbt™ MODELS ──────────────────────────────────────

  For each job above the threshold, find the .sql file that produces the
  destination table. Then find the corresponding YAML documentation file
  (e.g. _<model_name>.yml or schema.yml) so config blocks and descriptions
  can be updated together.

  ── PHASE 3: DIAGNOSE AND PLAN ───────────────────────────────────────────

  Read the full SQL of each model. Identify which anti-patterns apply,
  citing exact line numbers. For each model, write a concise plan with:
    - Current cost metrics (bytes, slot ms, duration)
    - List of issues found (label them by anti-pattern name)
    - Proposed changes (exact config block additions, SQL rewrites)
    - Expected impact (estimated bytes reduction or slot ms saved)

  ── PHASE 4: IMPLEMENT THE CHANGES ───────────────────────────────────────

  Create a new branch via run_terminal_command:
    git checkout -b perf/query-optimizer-<YYYYMMDD>-<5-random-chars>

  Apply every planned change directly to the repository files via
  read_file + write_file. Update both the config block and the SQL body
  as needed. After modifying any .sql file, update the corresponding YAML
  documentation if columns, config, or descriptions have changed.
  Stage and commit all changes.

  ── PHASE 5: OPEN A PR AND NOTIFY SLACK ──────────────────────────────────

  Create the pull request via `gh pr create` with title:
    perf: optimise BigQuery cost — <comma-separated model names>

  After the PR is created, post to #finops via post_slack_message:
    🚀 *Query Optimizer — Optimisation PR Opened*
    *Models optimised:* `<model_1>`, `<model_2>`, ...
    *Est. total bytes saved:* ~<X> GB per run
    *Changes made:* one-line summary per model
    *Pull Request:* <PR URL>

  Do not stop after Phase 4. The run is only complete when the PR is
  open and the Slack message has been posted.

  ── GUARDRAILS ────────────────────────────────────────────────────────────

  - NEVER drop columns referenced by downstream models.
  - NEVER change a model's grain unless explicitly fixing a fan-out join.
  - NEVER push directly to main. Always use a feature branch and PR.
  - If a model cannot be safely converted to incremental without more
    business context, change only the config block and leave a TODO
    comment in the SQL.
  - If bytes_processed < 10 GB for a given job, skip it and say so.
  - Be surgical. Do not reformat the entire file — only touch what needs
    to change, preserving existing style.

backstory: >
  You are a senior BigQuery performance engineer embedded in the analytics
  team. You have a bias for action — you do not write reports, you write
  code. When you identify an inefficiency you fix it in the file, commit
  it to a branch, open a PR, and notify the team on Slack — all without
  human intervention.

  You think in bytes and slot-milliseconds, not seconds. You always
  quantify the expected impact of each change. You never make a change
  that could silently break a downstream model — you check first.

tools:
  mode: allowlist
  list:
    - get_bigquery_query_performance_stats
    - run_sql_query
    - read_file
    - write_file
    - search_files_and_directories
    - ripgrep_search
    - list_all_bigquery_projects_and_datasets
    - list_all_tables_in_bigquery_dataset
    - list_all_columns_in_bigquery_table
    - run_terminal_command
    - post_slack_message

slack:
  channel: "#finops"
```

{% endcode %}

{% hint style="info" %}
`tools.mode: allowlist` restricts the agent to only the tools listed. This is important for a write-capable agent — it prevents accidental access to tools outside the intended optimisation workflow.
{% endhint %}

{% hint style="info" %}
The Slack channel is set to `#finops` by default. Update `slack.channel` before committing if your team routes cost alerts to a different channel.
{% endhint %}
{% endstep %}

{% step %}

### Create the Trigger Script

Create `scripts/run_query_optimizer.py`. This script reads dbt™ model results from Bolt artifacts (or accepts BigQuery job IDs directly via CLI), filters to the top costly models by bytes processed, builds a structured opening message with all pre-fetched metadata, triggers the `query-optimizer` agent, and polls until completion.

{% code title="scripts/run:query:optimizer.py" lineNumbers="true" %}

```python
"""
run_query_optimizer.py
----------------------
Trigger script for the `query-optimizer` DinoAI agent.

Environment variables (required)
=================================
  PARADIME_API_ENDPOINT  – Paradime GraphQL endpoint
  PARADIME_API_KEY       – Paradime API key
  PARADIME_API_SECRET    – Paradime API secret

  At least ONE of the following must also be set (or passed via CLI):
  BOLT_SCHEDULE_NAME        – Bolt schedule to pull job IDs from automatically
  QUERY_OPTIMIZER_JOB_IDS   – Comma-separated BigQuery job IDs (skips Bolt fetch)

Environment variables (optional)
=================================
  QUERY_OPTIMIZER_TOP_N          – Max models to optimise (default: 5)
  QUERY_OPTIMIZER_MIN_BYTES_GB   – Skip jobs below this GB threshold (default: 10)
  QUERY_OPTIMIZER_DRILL_MODEL    – dbt™ model to drill into after the main pass
  QUERY_OPTIMIZER_TIMEOUT        – Max seconds to wait for the agent (default: 3600)
  QUERY_OPTIMIZER_POLL_INTERVAL  – Polling cadence in seconds (default: 20)
"""

import argparse
import os
import sys
import textwrap
import time

import requests

from paradime import Paradime
from paradime.apis.dinoai_agents.exception import DinoaiAgentRunFailedException
from paradime.apis.dinoai_agents.types import DinoaiAgentRunStatus

AGENT_NAME    = "query-optimizer"
ARTIFACT_PATH = "target/run_results.json"
_GB           = 1024 ** 3

TOP_N         = int(os.environ.get("QUERY_OPTIMIZER_TOP_N", 5))
MIN_BYTES_GB  = float(os.environ.get("QUERY_OPTIMIZER_MIN_BYTES_GB", 10))
DRILL_MODEL   = os.environ.get("QUERY_OPTIMIZER_DRILL_MODEL", "").strip()
TIMEOUT       = int(os.environ.get("QUERY_OPTIMIZER_TIMEOUT", 3600))
POLL_INTERVAL = int(os.environ.get("QUERY_OPTIMIZER_POLL_INTERVAL", 20))


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Trigger the query-optimizer DinoAI agent."
    )
    source = parser.add_mutually_exclusive_group()
    source.add_argument(
        "--job-ids", nargs="+", metavar="JOB_ID",
        default=_split_env("QUERY_OPTIMIZER_JOB_IDS"),
        help="One or more BigQuery job IDs. (env: QUERY_OPTIMIZER_JOB_IDS)",
    )
    source.add_argument(
        "--schedule-name",
        default=os.environ.get("BOLT_SCHEDULE_NAME", "").strip(),
        help="Bolt schedule name — job IDs fetched from latest run artifact. (env: BOLT_SCHEDULE_NAME)",
    )
    parser.add_argument("--top-n", type=int, default=TOP_N)
    parser.add_argument("--min-bytes-gb", type=float, default=MIN_BYTES_GB)
    parser.add_argument("--drill-model", default=DRILL_MODEL)
    return parser.parse_args()


def _split_env(var: str) -> list[str]:
    raw = os.environ.get(var, "").strip()
    return [v.strip() for v in raw.split(",") if v.strip()] if raw else []


def _build_client() -> Paradime:
    required = {
        "PARADIME_API_ENDPOINT": os.environ.get("PARADIME_API_ENDPOINT"),
        "PARADIME_API_KEY":      os.environ.get("PARADIME_API_KEY"),
        "PARADIME_API_SECRET":   os.environ.get("PARADIME_API_SECRET"),
    }
    missing = [k for k, v in required.items() if not v]
    if missing:
        print(f"ERROR: Missing env vars: {', '.join(missing)}", file=sys.stderr)
        raise SystemExit(1)
    return Paradime(
        api_endpoint=required["PARADIME_API_ENDPOINT"],
        api_key=required["PARADIME_API_KEY"],
        api_secret=required["PARADIME_API_SECRET"],
    )


def _fetch_run_results_from_bolt(paradime: Paradime, schedule_name: str) -> list[dict]:
    print(f"[query-optimizer] 🔍  Fetching latest Bolt run for '{schedule_name}' …")
    try:
        latest_runs = paradime.bolt.list_runs(schedule_name=schedule_name, limit=1)
        latest_run  = latest_runs.runs[0] if latest_runs.runs else None
    except Exception as exc:
        print(f"[query-optimizer] ⚠️   Could not fetch run list: {exc}", file=sys.stderr)
        return []

    if not latest_run:
        print("[query-optimizer] ⚠️   No runs found for this schedule.", file=sys.stderr)
        return []

    run_id = latest_run.id
    print(f"[query-optimizer] 🏃  Run ID: {run_id} | status: {latest_run.state}")

    try:
        run_status     = paradime.bolt.get_run_status(run_id)
        commands       = getattr(run_status, "commands", None) or []
        command_indices = list(range(len(commands))) if commands else list(range(5))
    except Exception:
        command_indices = list(range(5))

    all_model_results: list[dict] = []
    seen_urls: set[str] = set()

    for command_index in command_indices:
        try:
            url = paradime.bolt.get_latest_artifact_url(
                schedule_name=schedule_name,
                artifact_path=ARTIFACT_PATH,
                command_index=command_index,
            )
        except Exception:
            continue

        if not url or url in seen_urls:
            continue
        seen_urls.add(url)

        try:
            resp = requests.get(url, timeout=30)
            resp.raise_for_status()
            data = resp.json()
        except Exception as exc:
            print(f"[query-optimizer] ⚠️   Failed to download artifact: {exc}", file=sys.stderr)
            continue

        for result in data.get("results", []):
            if not result.get("unique_id", "").startswith("model."):
                continue
            adapter = result.get("adapter_response") or {}
            all_model_results.append({
                "name":             result["unique_id"].split(".")[-1],
                "unique_id":        result["unique_id"],
                "status":           result.get("status", "—"),
                "execution_time_s": round(result.get("execution_time") or 0, 2),
                "job_id":           adapter.get("job_id", "—"),
                "bytes_billed":     adapter.get("bytes_billed", 0),
                "bytes_processed":  adapter.get("bytes_processed", 0),
                "rows_affected":    adapter.get("rows_affected", "—"),
                "command_index":    command_index,
                "generated_at":     data.get("metadata", {}).get("generated_at", "unknown"),
            })

    return all_model_results


def _top_costly_models(all_results: list[dict], top_n: int, min_bytes_gb: float) -> list[dict]:
    deduped: dict[str, dict] = {}
    for row in all_results:
        name = row["name"]
        if name not in deduped or row["bytes_processed"] > deduped[name]["bytes_processed"]:
            deduped[name] = row
    min_bytes = int(min_bytes_gb * _GB)
    filtered  = [r for r in deduped.values() if r["bytes_processed"] >= min_bytes]
    return sorted(filtered, key=lambda r: r["bytes_processed"], reverse=True)[:top_n]


def _rows_from_job_ids(job_ids: list[str]) -> list[dict]:
    return [
        {
            "name": f"job_{i + 1}", "unique_id": "—", "status": "unknown",
            "execution_time_s": 0, "job_id": jid, "bytes_billed": 0,
            "bytes_processed": 2 ** 63, "rows_affected": "—",
            "command_index": 0, "generated_at": "—",
        }
        for i, jid in enumerate(job_ids)
    ]


def _format_table(rows: list[dict]) -> str:
    if not rows:
        return "(no model results to display)"
    headers = {
        "name": "Model", "execution_time_s": "Time (s)",
        "bytes_processed": "Bytes Processed", "bytes_billed": "Bytes Billed",
        "job_id": "BigQuery Job ID", "command_index": "Cmd",
    }
    widths = {col: len(label) for col, label in headers.items()}
    for row in rows:
        for col in headers:
            widths[col] = max(widths[col], len(str(row.get(col, ""))))
    def _row_str(values):
        return "  ".join(str(values.get(col, "")).ljust(widths[col]) for col in headers)
    header_line = _row_str(headers)
    return "\n".join([header_line, "-" * len(header_line)] + [_row_str(r) for r in rows])


def _build_opening_message(rows: list[dict], source_label: str) -> str:
    job_id_lines = "\n".join(
        f"  {i + 1}. model=`{r['name']}`  job_id=`{r['job_id']}`  "
        f"bytes_processed={r['bytes_processed']:,}  bytes_billed={r['bytes_billed']:,}  "
        f"execution_time={r['execution_time_s']}s"
        for i, r in enumerate(rows)
    )
    return textwrap.dedent(f"""
        You are being triggered to analyse and fix costly dbt™ models running on BigQuery.

        SOURCE: {source_label}

        ── PRE-FETCHED MODEL PERFORMANCE DATA ────────────────────────────────────

        {_format_table(rows)}

        ── JOB IDs TO OPTIMISE ───────────────────────────────────────────────────

        {job_id_lines}

        Work through all five phases (GATHER → LOCATE → DIAGNOSE → IMPLEMENT →
        PR + SLACK) in order without stopping. The run is only complete when the
        PR is open and the Slack message has been posted to #finops.
    """).strip()


def _build_drill_message(drill_model: str, rows: list[dict]) -> str:
    drill_row = next((r for r in rows if r["name"] == drill_model), None)
    job_hint  = (
        f"Its BigQuery job ID is `{drill_row['job_id']}` ({drill_row['execution_time_s']}s). "
        f"Use get_bigquery_query_performance_stats to retrieve the full query plan. "
        if drill_row and drill_row["job_id"] != "—"
        else f"No pre-fetched job ID for `{drill_model}` — analyse the .sql file directly. "
    )
    return (
        f"Now drill deeper into `{drill_model}` specifically. {job_hint}"
        f"Show the exact query plan stage(s) consuming the most slot ms, identify "
        f"the bottleneck type, and apply any additional targeted fixes not already "
        f"covered in the main pass. Commit changes to the existing branch, update "
        f"the PR description, and post a follow-up to #finops with the updated PR link."
    )


def _poll_until_done(paradime: Paradime, session_id: str, label: str) -> object:
    start = time.time()
    while True:
        run = paradime.dinoai_agents.get_run(agent_session_id=session_id)
        if run.status == DinoaiAgentRunStatus.COMPLETED:
            print(f"[query-optimizer] ✅  {label} completed")
            return run
        if run.status == DinoaiAgentRunStatus.FAILED:
            last_msg = run.messages[-1].content if run.messages else "no messages"
            raise DinoaiAgentRunFailedException(f"{label} FAILED. Last: {last_msg}")
        elapsed = time.time() - start
        if elapsed > TIMEOUT:
            raise TimeoutError(f"Timed out after {TIMEOUT}s for session {session_id}")
        print(f"[query-optimizer]    {label} — {run.status.value} ({int(elapsed)}s) … {POLL_INTERVAL}s")
        time.sleep(POLL_INTERVAL)


def main() -> None:
    args = _parse_args()
    paradime = _build_client()

    if args.job_ids:
        rows         = _rows_from_job_ids(args.job_ids)
        source_label = f"explicit job IDs: {', '.join(args.job_ids)}"
    elif args.schedule_name:
        all_results  = _fetch_run_results_from_bolt(paradime, args.schedule_name)
        rows         = _top_costly_models(all_results, args.top_n, args.min_bytes_gb)
        source_label = f"Bolt schedule '{args.schedule_name}' — top {len(rows)} models above {args.min_bytes_gb} GB"
        if not rows:
            print("[query-optimizer] ⚠️   No models above threshold. Exiting.")
            raise SystemExit(0)
    else:
        print("ERROR: Provide --job-ids or --schedule-name.", file=sys.stderr)
        raise SystemExit(1)

    print("\n" + "=" * 72)
    print(_format_table(rows))
    print("=" * 72 + "\n")

    trigger    = paradime.dinoai_agents.trigger_run(
        agent=AGENT_NAME,
        message=_build_opening_message(rows, source_label),
    )
    session_id = trigger.agent_session_id
    print(f"[query-optimizer] 📋  Session ID: {session_id}")

    main_run = _poll_until_done(paradime, session_id, "Main optimisation pass")
    print(main_run.messages[-1].content if main_run.messages else "(no report)")

    if args.drill_model:
        paradime.dinoai_agents.send_message(
            agent_session_id=session_id,
            message=_build_drill_message(args.drill_model, rows),
        )
        drill_run = _poll_until_done(paradime, session_id, f"Drill-down on '{args.drill_model}'")
        print(drill_run.messages[-1].content if drill_run.messages else "(no drill report)")

    print("[query-optimizer] ✅  All done.")


if __name__ == "__main__":
    main()
```

{% endcode %}

{% hint style="info" %}
The script accepts `--schedule-name` or `--job-ids` but not both — they are mutually exclusive. When `--schedule-name` is used, the script reads `run_results.json` from the latest Bolt run and maps each dbt™ model name to the BigQuery job ID it produced. When `--job-ids` is used, the agent receives the job IDs directly and resolves the model files itself via performance stats.
{% endhint %}

{% hint style="info" %}
When no models are found above the bytes threshold the script exits cleanly with code `0` — this makes it safe to run on a schedule even when recent Bolt runs had no costly models.
{% endhint %}
{% endstep %}

{% step %}

### Set Your Environment Variables

Store your secrets in Paradime before scheduling. Go to **Account Settings → Environment Variables** and add the following:

| Variable                       | Description                                                                                                                                            |
| ------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `PARADIME_API_ENDPOINT`        | Your Paradime GraphQL endpoint                                                                                                                         |
| `PARADIME_API_KEY`             | Your Paradime API key                                                                                                                                  |
| `PARADIME_API_SECRET`          | Your Paradime API secret                                                                                                                               |
| `BOLT_SCHEDULE_NAME`           | Name of the Bolt schedule whose `run_results.json` maps dbt™ models to their BigQuery job IDs                                                          |
| `QUERY_OPTIMIZER_TOP_N`        | *(optional)* Max models per run — defaults to `5`                                                                                                      |
| `QUERY_OPTIMIZER_MIN_BYTES_GB` | *(optional)* GB threshold — defaults to `10`                                                                                                           |
| `QUERY_OPTIMIZER_DRILL_MODEL`  | *(optional)* Model name to drill into after the main pass                                                                                              |
| `QUERY_OPTIMIZER_TIMEOUT`      | *(optional)* Max seconds to wait per session — defaults to `3600`                                                                                      |
| `QUERY_OPTIMIZER_JOB_IDS`      | *(optional)* Comma-separated BigQuery job IDs to analyse directly — bypasses Bolt artifact fetch when you already know which dbt™ models are expensive |

{% hint style="info" %}
Your Paradime API endpoint, key, and secret are available under Workspace Settings → API. Make sure the key has `DinoAI agent API` capabilities enabled.
{% endhint %}
{% endstep %}

{% step %}

### Run the Optimizer Manually

**Option A — Automatically detect costly dbt™ models from a Bolt schedule:**

```bash
poetry run python scripts/run_query_optimizer.py \
  --schedule-name "my_dbt_production_schedule" \
  --top-n 5 \
  --min-bytes-gb 10
```

**Option B — Supply BigQuery job IDs directly (when you already know which models are expensive):**

```bash
poetry run python scripts/run_query_optimizer.py \
  --job-ids bqjob_r1234abcd bqjob_r5678efgh bqjob_r9012ijkl
```

**Option C — Include a follow-up drill-down on a specific dbt™ model:**

```bash
poetry run python scripts/run_query_optimizer.py \
  --schedule-name "my_dbt_production_schedule" \
  --drill-model fct_orders
```

You should see output like:

```
========================================================================
Model              Time (s)  Bytes Processed    Bytes Billed       BigQuery Job ID       Cmd
------------------------------------------------------------------------
fct_orders         142.30    85,899,345,920     85,899,345,920     bqjob_r1234abcd       0
dim_sessions       98.50     32,212,254,720     32,212,254,720     bqjob_r5678efgh       0
========================================================================

[query-optimizer] 📋  Session ID: agt_sess_abc123xyz
[query-optimizer]    Main optimisation pass — running (0s) … 20s
[query-optimizer]    Main optimisation pass — running (20s) … 20s
[query-optimizer] ✅  Main optimisation pass completed
```

The agent then posts its full optimisation plan and PR link to stdout, and a cost summary to `#finops` on Slack.
{% endstep %}
{% endstepper %}

## Schedule with Bolt

Run the optimizer automatically on a recurring cadence so costly dbt™ models are caught and fixed without any manual intervention. The schedule reads the latest Bolt run's `run_results.json`, identifies the top offending models by bytes processed, and triggers the agent to fix and PR them.

### Add Environment Variables to Paradime

Go to **Account Settings → Environment Variables** and confirm the variables from the previous step are saved. Bolt schedules read from the same environment variable store, so no additional setup is needed.

### Create the Bolt Schedule

Go to **Bolt → Schedules** and click **New Schedule**. Name it something like `Query Cost Optimizer` and add the following two commands in order:

```bash
poetry install
```

```bash
poetry run python scripts/run_query_optimizer.py --schedule-name "your_dbt_production_schedule" --top-n 5 --min-bytes-gb 10
```

{% hint style="info" %}
Replace `your_dbt_production_schedule` with the exact name of the Bolt schedule that runs your dbt™ models in production. The script reads that schedule's `run_results.json` artifact, which contains a record per dbt™ model including the BigQuery job ID it produced.
{% endhint %}

{% hint style="info" %}
`poetry install` runs first on every execution so that any dependency updates committed to `pyproject.toml` are picked up automatically — no manual intervention needed after a dependency bump.
{% endhint %}

#### Choose a Schedule Frequency

The right cadence depends on how frequently your production dbt™ models run and how quickly costs accumulate in your warehouse. The table below covers the most common options:

| Cadence                   | Cron expression | When it runs                 |
| ------------------------- | --------------- | ---------------------------- |
| Every day at 8 AM         | `0 8 * * *`     | Monday–Sunday, 8:00 AM       |
| Weekdays at 8 AM          | `0 8 * * 1-5`   | Monday–Friday, 8:00 AM       |
| Once a week (Monday 8 AM) | `0 8 * * 1`     | Every Monday, 8:00 AM        |
| Twice a week (Mon & Thu)  | `0 8 * * 1,4`   | Monday and Thursday, 8:00 AM |
| Every 12 hours            | `0 */12 * * *`  | Midnight and noon daily      |

{% hint style="info" %}
For most teams, **weekdays at 8 AM** (`0 8 * * 1-5`) is a sensible default — it runs after overnight production dbt™ models have completed, so the latest `run_results.json` artifacts are always available. The script exits cleanly with code `0` when no dbt™ models exceed the bytes threshold, so there is no cost or noise on quiet days.
{% endhint %}

#### File Structure

Your repository should look like this after completing the setup:

```
your-repo/
├── dbt_project.yml
├── pyproject.toml                      ← same level as dbt_project.yml
├── .dinoai/
│   └── agents/
│       └── query-optimizer.yml
└── scripts/
    └── run_query_optimizer.py          ← trigger script
```

{% hint style="info" %}
`pyproject.toml` must sit at the same directory level as `dbt_project.yml`. The agent uses `run_terminal_command`, `read_file`, and `write_file` to navigate and rewrite your dbt™ project files, so the working directory at session start must be the repo root.
{% endhint %}

## Related Docs

* [**Programmable Agents — Quick Start** — getting started with DinoAI agents](/app-help/products/dino-ai/programmable-agents/quick-start.md)
* [**Programmable Agents — YAML Configuration** — full reference for agent config options](/app-help/products/dino-ai/programmable-agents/yaml-configuration.md)
* [**Programmable Agents — Tools Reference** — all available tools including `get_bigquery_query_performance_stats` and `post_slack_message`](/app-help/products/dino-ai/programmable-agents/tools-reference.md)
* [**Bolt — Schedules** — creating and managing Bolt schedules](/app-help/products/bolt/creating-schedules.md)
* [**Paradime SDK — Artifacts** — accessing run artifacts including `run_results.json`](/app-help/developers/python-sdk/modules/bolt.md)
* [**Slack Integration** — connecting Slack to Paradime](/app-help/integrations/slack.md)
* [**Paradime API & Credentials** — where to find your API endpoint, key, and secret](/app-help/developers/generate-api-keys.md)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.paradime.io/app-help/guides-new/porgrammable-agents/dbt-tm-model-query-cost-optimizer-bigquery.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
