# dbt™ Model Query Cost Optimizer — Snowflake

Automatically detect your most expensive dbt™ models, diagnose Snowflake-specific SQL anti-patterns in the model source files, apply targeted rewrites, and open a pull request — all triggered by a single DinoAI agent session. The agent reads Snowflake query performance stats to understand where credits and execution time are being wasted, makes the fixes directly in your repository, and posts the PR link to Slack without any manual intervention.

{% hint style="info" icon="compass" %}
**Before You Start**

**Paradime**

* Your Paradime API endpoint, API key, and API secret — generate these under Workspace Settings → API. Make sure to enable `DinoAI agent API` capabilities. Requires Admin access.

**Snowflake**

* Your Paradime workspace must have an active Snowflake connection with sufficient permissions to read query history metadata. The agent calls `get_snowflake_query_performance_stats` to retrieve performance stats for the Snowflake queries that each dbt™ model produced.

**Model source**

You need at least one of the following so the script knows which dbt™ models to analyse:

* A **Bolt schedule name** — the script reads query IDs directly from the latest run's `run_results.json` artifact, which maps each dbt™ model to the Snowflake query it produced via `adapter_response.query_id`
* One or more **Snowflake query IDs** supplied directly via CLI flag or environment variable, if you already know which models are expensive

**Integrations**

The following must already be connected in Paradime:

* **Slack** — the agent posts the PR link and a cost summary to `#finops` via `post_slack_message`
  {% endhint %}

## What You'll Build

By the end of this guide you'll have:

* A Poetry project wired to the Paradime SDK
* A `run_snowflake_query_optimizer.py` trigger script that reads dbt™ model results from Bolt artifacts (or accepts Snowflake query IDs directly), filters to the top costly models by execution time, and hands the full context to the agent
* A `snowflake-query-optimizer` DinoAI agent YAML that diagnoses Snowflake-specific anti-patterns, rewrites dbt™ model SQL and config, opens a PR, and notifies Slack

### What the Agent Does

Once triggered, the agent works through five phases without stopping:

```
PHASE 1 — Fetch Snowflake performance stats for each dbt™ model's query ID
PHASE 2 — Locate the dbt™ model .sql and YAML files in the repository
PHASE 3 — Diagnose Snowflake SQL anti-patterns and build a concrete optimisation plan
PHASE 4 — Create a branch, apply all changes to the model files, commit and push
PHASE 5 — Open a PR and post a cost summary to #finops on Slack
```

The agent never drops columns referenced by downstream models, never pushes directly to `main`, and never guesses — if a business decision is required it leaves a `TODO` comment in the SQL and calls it out in the PR description.
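The downstream-column guardrail comes down to a search across the project's model files. The agent does it with `ripgrep_search`. The Python sketch below approximates the same check under simplifying assumptions. It takes a hypothetical `sql_files` mapping of path to SQL text, recognises only single-argument `ref('<model>')` calls, and counts any whole-word mention of the column as a usage. A downstream `SELECT *` would not appear as a mention, so an empty result should prompt a review rather than count as proof that the column is unused.

```python
import re


def downstream_usages(model: str, column: str, sql_files: dict[str, str]) -> list[str]:
    """Return paths of models that ref() `model` and mention `column` as a whole word.

    Hypothetical helper: handles only single-argument ref('model') calls and
    does a naive, case-insensitive token match on the column name.
    """
    ref_pattern = re.compile(r"ref\(\s*['\"]" + re.escape(model) + r"['\"]\s*\)")
    column_pattern = re.compile(r"\b" + re.escape(column) + r"\b", re.IGNORECASE)
    return sorted(
        path
        for path, sql in sql_files.items()
        if ref_pattern.search(sql) and column_pattern.search(sql)
    )


if __name__ == "__main__":
    files = {
        "models/marts/fct_orders.sql": "select order_id, customer_id from {{ ref('stg_orders') }}",
        "models/marts/dim_customers.sql": "select customer_id from {{ ref('stg_customers') }}",
        "models/marts/fct_revenue.sql": "select order_id, amount from {{ ref('stg_orders') }}",
    }
    # Prints ['models/marts/fct_orders.sql']
    print(downstream_usages("stg_orders", "customer_id", files))
```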

### Anti-Patterns the Agent Detects and Fixes

| Anti-Pattern                    | What It Means                                                                                                                                              | Fix Applied                                                                           |
| ------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------- |
| `WAREHOUSE_OVERSIZED`           | Model runs on a warehouse larger than required for its data volume, burning unnecessary credits                                                            | Adds a `snowflake_warehouse` config targeting a smaller warehouse size                |
| `NO_CLUSTER_KEY`                | Large table filtered on high-cardinality columns but no clustering key defined, causing full micro-partition scans                                         | Adds `cluster_by` to the dbt™ config block with the appropriate filter columns        |
| `WRONG_MATERIALIZATION`         | Frequently queried view causing repeated full recomputation on every downstream query                                                                      | Changes to `table` or `incremental` materialization                                   |
| `INCREMENTAL_OPPORTUNITY`       | Full table rebuild on every run despite having a natural time-based key, consuming credits unnecessarily                                                   | Converts to incremental with an `is_incremental()` filter on the timestamp column     |
| `RESULT_CACHE_BYPASS`           | Model query is not benefiting from Snowflake's result cache due to non-deterministic functions (e.g. `CURRENT_TIMESTAMP()`, `RANDOM()`) used unnecessarily | Removes or replaces non-deterministic expressions with deterministic equivalents      |
| `SELECT_STAR`                   | All columns selected from a source, scanning micro-partitions that contain unneeded data                                                                   | Enumerates only the columns actually used by downstream models                        |
| `CARTESIAN_FAN_OUT_JOIN`        | JOIN on a non-unique key causing row multiplication and credit waste                                                                                       | Adds `QUALIFY ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) = 1` deduplication before the join |
| `REPEATED_CTE`                  | A CTE is referenced more than twice in the same query; Snowflake re-evaluates it each time                                                                 | Materialises the CTE as a separate intermediate dbt™ model                            |
| `UNFILTERED_LARGE_TABLE`        | A large `source()` or `ref()` pulled with no predicate, scanning all micro-partitions                                                                      | Pushes a `WHERE` clause into the CTE that reads that table                            |
| `SEARCH_OPTIMISATION_CANDIDATE` | Model supports frequent point lookups or selective equality filters but `search_optimization` is not enabled                                               | Adds `post-hook: ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION` to the config block  |
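Several rows of this table map onto statistics the agent reads in Phase 1. As a rough, non-authoritative sketch, four of the detection heuristics can be written out in Python. The `QueryStats` fields, the 0.8 scan ratio, the one-million-row bound for a "small" model, and the warehouse size strings are assumptions chosen for illustration, not values the agent is known to use. The sketch also checks `query_text` for non-deterministic functions directly instead of relying on cache statistics.

```python
import re
from dataclasses import dataclass

# Illustrative thresholds only (assumptions, not agent settings).
MIN_EXEC_MS = 30_000
FULL_SCAN_RATIO = 0.8
LOW_ROW_COUNT = 1_000_000
LARGE_SIZES = {"X-Large", "2X-Large", "3X-Large", "4X-Large", "5X-Large", "6X-Large"}
NON_DETERMINISTIC = re.compile(
    r"\b(CURRENT_TIMESTAMP|CURRENT_DATE|RANDOM|UUID_STRING)\b", re.IGNORECASE
)


@dataclass
class QueryStats:
    execution_time_ms: int
    partitions_scanned: int
    partitions_total: int
    warehouse_size: str
    rows_produced: int
    materialized: str      # dbt materialization of the model
    downstream_refs: int   # number of models that ref() this one
    query_text: str


def candidate_anti_patterns(stats: QueryStats) -> list[str]:
    """Map query statistics to candidate anti-pattern labels from the table."""
    if stats.execution_time_ms < MIN_EXEC_MS:
        return []  # below the threshold: skipped, as in Phase 1
    labels = []
    # Nearly every micro-partition scanned: pruning is not happening.
    if stats.partitions_total and (
        stats.partitions_scanned / stats.partitions_total >= FULL_SCAN_RATIO
    ):
        labels.append("NO_CLUSTER_KEY")
    # Big warehouse, small output.
    if stats.warehouse_size in LARGE_SIZES and stats.rows_produced < LOW_ROW_COUNT:
        labels.append("WAREHOUSE_OVERSIZED")
    # A view recomputed by several downstream models.
    if stats.materialized == "view" and stats.downstream_refs > 1:
        labels.append("WRONG_MATERIALIZATION")
    if NON_DETERMINISTIC.search(stats.query_text):
        labels.append("RESULT_CACHE_BYPASS")
    return labels
```

Each label is only a candidate. The agent still reads the model SQL before it chooses a fix, and a high scan ratio can also come from a query that legitimately needs the whole table.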

### Architecture Overview

```mermaid
flowchart TD
    A([run_snowflake_query_optimizer.py]) --> B[Bolt artifact fetch\nreads run_results.json\nfrom latest Bolt run]
    A --> C[Model filter\ndeduplicate by model name\napply execution time threshold · select top N]
    A --> D[DinoAI agent trigger\nparadime.dinoai_agents\n.trigger_run / .get_run]

    B --> B1[paradime.bolt.*\nlist_runs\nget_run_status\nget_latest_artifact_url]
    B1 --> B2[(run_results.json\ndbt™ model → adapter_response.query_id)]

    D --> E([snowflake-query-optimizer agent])

    E --> E1[get_snowflake_query_performance_stats\nreads query stats per model]
    E --> E2[read_file · write_file\nrewrites .sql and YAML]
    E --> E3[run_terminal_command\ngit branch · commit · push]
    E --> E4[gh pr create]
    E --> E5[post_slack_message\n→ #finops]
```

## How It Works

`run_snowflake_query_optimizer.py` loads its configuration from environment variables and CLI flags. When given a Bolt schedule name, it reads the latest run's `run_results.json` artifact, extracts the Snowflake query ID that each dbt™ model produced from `adapter_response.query_id`, and keeps the top-N slowest models above a configurable execution time threshold (default: 30 seconds). When given explicit query IDs, it skips both the artifact fetch and the threshold filter. It then triggers a single `snowflake-query-optimizer` DinoAI agent session with the model context embedded in the opening message, polls every 20 seconds until the agent completes or times out (default: 60 minutes), and prints the agent's final report to stdout. Optionally, it sends a follow-up drill-down on a single model within the same session.

{% hint style="info" %}
Unlike the BigQuery version, which filters by bytes processed, the Snowflake version filters by **execution time in seconds**. Execution time is the most reliable proxy for credit consumption available in `run_results.json`, and using it avoids an upfront call to the Snowflake query history API.
{% endhint %}
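To see why execution time tracks cost, note that a standard Snowflake warehouse is billed per second of uptime at an hourly credit rate that doubles with each size step (1 credit per hour for X-Small up to 128 for 4X-Large). The sketch below attributes credits to a single model run under two simplifying assumptions: the query had the warehouse to itself, and the 60-second minimum charged each time a warehouse resumes is ignored. Snowflake meters warehouse uptime, not individual queries, so treat the result as a rough attribution. Check the rates against your account's pricing.

```python
# Credits per hour for standard warehouses; each size step doubles the rate.
CREDITS_PER_HOUR = {
    "X-Small": 1, "Small": 2, "Medium": 4, "Large": 8,
    "X-Large": 16, "2X-Large": 32, "3X-Large": 64, "4X-Large": 128,
}


def approx_credits(execution_time_s: float, warehouse_size: str) -> float:
    """Rough credits attributable to one query that ran alone on the warehouse."""
    return CREDITS_PER_HOUR[warehouse_size] * execution_time_s / 3600


if __name__ == "__main__":
    # A 90-second model on a Large warehouse: 8 credits/hour * 90 s / 3600 s.
    print(round(approx_credits(90, "Large"), 3))  # prints 0.2
```

The same arithmetic shows when right-sizing pays off. Dropping one size halves the hourly rate, so the change saves credits only if the model's runtime less than doubles on the smaller warehouse.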

{% stepper %}
{% step %}

### Set Up the Poetry Project

Create the following `pyproject.toml` at the **same directory level as your `dbt_project.yml`**. This is required so the agent can locate and rewrite dbt™ model files correctly during the optimisation pass.

{% code title="pyproject.toml" lineNumbers="true" %}

```toml
[tool.poetry]
name = "snowflake-query-optimizer"
version = "0.1.0"
description = "dbt Model Query Cost Optimizer (Snowflake) — DinoAI Agent Trigger"
authors = ["Your Name <you@example.com>"]
package-mode = false

[tool.poetry.dependencies]
python = ">=3.11,<3.13"
requests = "^2.28.1"
paradime-io = "^5.3.0"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
```

{% endcode %}

Install dependencies by running:

```bash
poetry install
```

{% hint style="info" %}
The only two third-party dependencies are `requests` (for downloading Bolt artifacts) and `paradime-io` (the Paradime SDK). All other imports — `argparse`, `os`, `sys`, `textwrap`, `time` — are part of the Python standard library.
{% endhint %}
{% endstep %}

{% step %}

### Create the Agent YAML

Create the following file in your repository at `.dinoai/agents/snowflake-query-optimizer.yml`. This defines the agent's role, five-phase goal, Snowflake-specific anti-pattern knowledge, guardrails, and Slack output channel.

{% code title=".dinoai/agents/snowflake-query-optimizer.yml" lineNumbers="true" %}

```yaml
name: snowflake-query-optimizer
version: 1

role: >
  Snowflake Query Optimization Engineer with deep expertise in dbt™ model
  performance tuning, Snowflake-specific SQL anti-patterns, and optimization
  strategies including warehouse right-sizing, clustering keys, search
  optimization, result cache utilization, incremental strategies, and
  micro-partition pruning.

goal: >
  You will be given one or more Snowflake query IDs, each corresponding to
  a dbt™ model execution. Your mission is not to report problems — it is to
  fix them. Locate the dbt™ model source files, diagnose the inefficiencies,
  rewrite the SQL and/or YAML config directly in the repository, open a pull
  request with all changes, and post the PR link to Slack.

  ── PHASE 1: GATHER PERFORMANCE DATA ──────────────────────────────────────

  For each provided query ID, call get_snowflake_query_performance_stats to
  retrieve execution_time_ms, credits_used, bytes_scanned, bytes_spilled_to_disk,
  partitions_scanned, partitions_total, percentage_scanned_from_cache,
  warehouse_name, warehouse_size, and query_text. Rank models by
  execution_time_ms descending. Skip any query where execution_time_ms < 30000
  (30 seconds).

  ── PHASE 2: LOCATE THE dbt™ MODELS ──────────────────────────────────────

  For each model above the threshold, find the .sql file that produces the
  destination table. Then find the corresponding YAML documentation file
  (e.g. _<model_name>.yml or schema.yml) so config blocks and descriptions
  can be updated together.

  ── PHASE 3: DIAGNOSE AND PLAN ───────────────────────────────────────────

  Read the full SQL of each model. Identify which anti-patterns apply,
  citing exact line numbers. For each model, write a concise plan with:
    - Current cost metrics (execution_time_ms, credits_used, partitions_scanned
      vs partitions_total, bytes_spilled_to_disk, warehouse_size)
    - List of issues found (label them by anti-pattern name)
    - Proposed changes (exact config block additions, SQL rewrites)
    - Expected impact (estimated credit reduction or execution time saved)

  Key signals to look for:
    - partitions_scanned close to partitions_total → missing cluster key or
      no partition pruning
    - bytes_spilled_to_disk > 0 → warehouse undersized for this query or
      query needs refactoring to reduce data movement
    - percentage_scanned_from_cache = 0 and query_text uses
      non-deterministic functions → RESULT_CACHE_BYPASS
    - warehouse_size is X-Large or above for a model with low row counts →
      WAREHOUSE_OVERSIZED
    - materialized='view' with high execution_time_ms and multiple refs →
      WRONG_MATERIALIZATION

  ── PHASE 4: IMPLEMENT THE CHANGES ───────────────────────────────────────

  Create a new branch via run_terminal_command:
    git checkout -b perf/snowflake-optimizer-<YYYYMMDD>-<5-random-chars>

  Apply every planned change directly to the repository files via
  read_file + write_file. For each model, update both the config block
  and the SQL body as needed. After modifying any .sql file, update the
  corresponding YAML documentation if columns, config, or descriptions
  have changed. Stage and commit all changes, then push the branch with
  git push -u origin <branch-name>.

  ── PHASE 5: OPEN A PR AND NOTIFY SLACK ──────────────────────────────────

  Create the pull request by running `gh pr create` through
  run_terminal_command, with title:
    perf: optimise Snowflake credit usage — <comma-separated model names>

  After the PR is created, post to #finops via post_slack_message:
    🚀 *Snowflake Query Optimizer — Optimisation PR Opened*
    *Models optimised:* `<model_1>`, `<model_2>`, ...
    *Est. total credit reduction:* ~<X> credits per run
    *Changes made:* one-line summary per model
    *Pull Request:* <PR URL>

  Do not stop after Phase 4. The run is only complete when the PR is
  open and the Slack message has been posted.

  ── GUARDRAILS ────────────────────────────────────────────────────────────

  - NEVER drop columns referenced by downstream models. Before removing any
    column, check all downstream .sql files for usage via ripgrep_search.
  - NEVER change a model's grain unless explicitly fixing a fan-out join —
    and always call this out in the PR body.
  - NEVER push directly to main. Always use a feature branch and PR.
  - NEVER change a model's snowflake_warehouse config without confirming
    that its typical row count justifies the change; check via run_sql_query.
  - If a model cannot be safely converted to incremental without more
    business context, change only the config block and leave a TODO
    comment in the SQL explaining what the team needs to decide.
  - If execution_time_ms < 30000 for a given query, skip it and say so.
  - Be surgical. Do not reformat the entire file — only touch what needs
    to change, preserving existing style (uppercase keywords, snake_case
    aliases, CTE structure, column grouping comments).

backstory: >
  You are a senior Snowflake performance engineer embedded in the analytics
  team. You have a bias for action — you do not write reports, you write
  code. When you identify an inefficiency you fix it in the file, commit
  it to a branch, open a PR, and notify the team on Slack — all without
  human intervention.

  You think in credits and partition pruning ratios, not abstract
  performance scores. You always quantify the expected impact of each
  change. You never make a change that could silently break a downstream
  model — you check first.

  You know the following Snowflake anti-patterns and their fixes:

    WAREHOUSE_OVERSIZED — the model runs on a warehouse larger than
    necessary for its data volume. Fix: add a snowflake_warehouse config
    targeting a smaller warehouse.

    NO_CLUSTER_KEY — a large table is filtered on high-cardinality columns
    but no clustering key is defined, causing full micro-partition scans
    (partitions_scanned ≈ partitions_total). Fix: add cluster_by to the
    dbt™ config block using the most selective filter column(s).

    WRONG_MATERIALIZATION — a model is a view but is queried frequently by
    multiple downstream models, causing repeated full recomputation. Fix:
    change to materialized='table' or materialized='incremental'.

    INCREMENTAL_OPPORTUNITY — a model is a full table rebuild every run but
    has a natural time-based incremental key (e.g. created_at, event_date).
    Fix: convert to incremental with an is_incremental() filter.

    RESULT_CACHE_BYPASS — the model does not benefit from Snowflake's result
    cache because it uses non-deterministic functions (CURRENT_TIMESTAMP(),
    CURRENT_DATE(), RANDOM(), UUID_STRING()) unnecessarily.
    Fix: replace with deterministic equivalents or move non-deterministic
    expressions outside the cached layer.

    SELECT_STAR — the model selects all columns from a source, scanning
    micro-partitions that contain unneeded data. Fix: enumerate only the
    columns actually used downstream (check ref() usages via ripgrep_search).

    CARTESIAN_FAN_OUT_JOIN — a JOIN on a non-unique key causing row
    multiplication and excess credit use. Fix: add QUALIFY ROW_NUMBER()
    OVER (PARTITION BY <join_key> ORDER BY <tie_break>) = 1 before the join.

    REPEATED_CTE — a CTE is referenced more than twice in the same query;
    Snowflake re-evaluates it each time. Fix: materialise the CTE as a
    separate intermediate dbt™ model.

    UNFILTERED_LARGE_TABLE — a source() or ref() model pulls a very large
    table with no predicate, scanning all micro-partitions. Fix: push a
    WHERE clause into the CTE that reads that table.

    SEARCH_OPTIMISATION_CANDIDATE — the model supports frequent point
    lookups or selective equality/LIKE filters but search_optimization is
    not enabled. Fix: add a post-hook to the config block:
    post-hook: "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION"

tools:
  mode: allowlist
  list:
    - get_snowflake_query_performance_stats
    - run_sql_query
    - read_file
    - write_file
    - search_files_and_directories
    - ripgrep_search
    - run_terminal_command
    - post_slack_message

slack:
  channel: "#finops"
```

{% endcode %}

{% hint style="info" %}
`tools.mode: allowlist` restricts the agent to only the tools listed. This is important for a write-capable agent — it prevents accidental access to tools outside the intended optimisation workflow.
{% endhint %}
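Because the allowlist is strict, a tool that the goal mentions but the list omits is unavailable to the agent. A small pre-commit check can catch that mismatch. The sketch below assumes you supply the goal text and the allowlist yourself (for example, copied from the YAML), along with a catalogue of candidate tool names. It does not parse YAML or query Paradime for the real tool catalogue.

```python
import re


def tools_missing_from_allowlist(
    prompt_text: str, allowlist: set[str], known_tools: set[str]
) -> set[str]:
    """Return known tool names that appear in the prompt but not in the allowlist."""
    referenced = {
        tool
        for tool in known_tools
        # Whole-word match, so "read_file" does not match inside a longer name.
        if re.search(r"\b" + re.escape(tool) + r"\b", prompt_text)
    }
    return referenced - allowlist
```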

{% hint style="info" %}
The Slack channel is set to `#finops` by default. Update `slack.channel` before committing if your team routes cost alerts to a different channel.
{% endhint %}
{% endstep %}

{% step %}

### Create the Trigger Script

Create `scripts/run_snowflake_query_optimizer.py`. This script reads dbt™ model results from Bolt artifacts (or accepts Snowflake query IDs directly via CLI), extracts `adapter_response.query_id` from each result, filters to the top costly models by execution time, builds a structured opening message with all pre-fetched metadata, triggers the `snowflake-query-optimizer` agent, and polls until completion.

{% hint style="info" %}
In the `run_results.json` produced by dbt-snowflake, the query ID is found at `adapter_response.query_id`, for example `"01c4683c-0003-4ff0-0002-731a00bda6ea"`. BigQuery differs: there the job ID is at `adapter_response.job_id`. The script reads the Snowflake field specifically when parsing Bolt artifacts.
{% endhint %}
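Without the Bolt download logic, the lookup takes only a few lines. The sketch below runs against an inline, abbreviated `run_results.json` payload. The model and test names and the second query ID are made up for illustration. It keeps only model nodes, because tests and other node types may also carry a query ID.

```python
def model_query_ids(run_results: dict) -> dict[str, str]:
    """Map each dbt model's unique_id to its Snowflake query ID."""
    query_ids = {}
    for result in run_results.get("results", []):
        unique_id = result.get("unique_id", "")
        if not unique_id.startswith("model."):
            continue  # skip tests, seeds, snapshots, ...
        query_id = (result.get("adapter_response") or {}).get("query_id")
        if query_id:  # skipped models have an empty adapter_response
            query_ids[unique_id] = query_id
    return query_ids


# Abbreviated, made-up payload for illustration.
SAMPLE = {
    "results": [
        {
            "unique_id": "model.analytics.fct_orders",
            "execution_time": 84.2,
            "adapter_response": {"query_id": "01c4683c-0003-4ff0-0002-731a00bda6ea"},
        },
        {
            "unique_id": "test.analytics.not_null_fct_orders_order_id",
            "adapter_response": {"query_id": "01c4683c-0003-4ff0-0002-731a00bda6eb"},
        },
        {"unique_id": "model.analytics.stg_orders", "adapter_response": {}},
    ]
}

if __name__ == "__main__":
    print(model_query_ids(SAMPLE))
```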

{% code title="scripts/run_snowflake_query_optimizer.py" lineNumbers="true" %}

```python
"""
run_snowflake_query_optimizer.py
---------------------------------
Trigger script for the `snowflake-query-optimizer` DinoAI agent.

Reads dbt™ model results from the latest Bolt run's run_results.json,
extracts adapter_response.query_id per model, filters to the top-N
slowest models above an execution time threshold, and triggers a single
DinoAI agent session to diagnose and fix the anti-patterns.

Environment variables (required)
=================================
  PARADIME_API_ENDPOINT   – Paradime API endpoint
  PARADIME_API_KEY        – Paradime API key
  PARADIME_API_SECRET     – Paradime API secret

  At least ONE of the following must also be set (or passed via CLI):
  BOLT_SCHEDULE_NAME          – Bolt schedule to pull query IDs from automatically
  QUERY_OPTIMIZER_QUERY_IDS   – Comma-separated Snowflake query IDs (skips Bolt fetch)

Environment variables (optional)
=================================
  QUERY_OPTIMIZER_TOP_N           – Max models to optimise in one run (default: 5)
  QUERY_OPTIMIZER_MIN_EXEC_SECS   – Skip models below this execution time in seconds
                                    (default: 30)
  QUERY_OPTIMIZER_DRILL_MODEL     – dbt™ model name to drill into after the main pass
  QUERY_OPTIMIZER_TIMEOUT         – Max seconds to wait for the agent (default: 3600)
  QUERY_OPTIMIZER_POLL_INTERVAL   – Polling cadence in seconds (default: 20)
"""

import argparse
import os
import sys
import textwrap
import time

import requests

from paradime import Paradime
from paradime.apis.dinoai_agents.exception import DinoaiAgentRunFailedException
from paradime.apis.dinoai_agents.types import DinoaiAgentRunStatus

AGENT_NAME    = "snowflake-query-optimizer"
ARTIFACT_PATH = "target/run_results.json"

TOP_N         = int(os.environ.get("QUERY_OPTIMIZER_TOP_N", 5))
MIN_EXEC_SECS = float(os.environ.get("QUERY_OPTIMIZER_MIN_EXEC_SECS", 30))
DRILL_MODEL   = os.environ.get("QUERY_OPTIMIZER_DRILL_MODEL", "").strip()
TIMEOUT       = int(os.environ.get("QUERY_OPTIMIZER_TIMEOUT", 3600))
POLL_INTERVAL = int(os.environ.get("QUERY_OPTIMIZER_POLL_INTERVAL", 20))


def _parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Trigger the snowflake-query-optimizer DinoAI agent."
    )
    source = parser.add_mutually_exclusive_group()
    source.add_argument(
        "--query-ids", nargs="+", metavar="QUERY_ID",
        default=_split_env("QUERY_OPTIMIZER_QUERY_IDS"),
        help="One or more Snowflake query IDs. (env: QUERY_OPTIMIZER_QUERY_IDS)",
    )
    source.add_argument(
        "--schedule-name",
        default=os.environ.get("BOLT_SCHEDULE_NAME", "").strip(),
        help="Bolt schedule name — query IDs fetched from latest run artifact. (env: BOLT_SCHEDULE_NAME)",
    )
    parser.add_argument("--top-n", type=int, default=TOP_N)
    parser.add_argument("--min-exec-secs", type=float, default=MIN_EXEC_SECS)
    parser.add_argument("--drill-model", default=DRILL_MODEL)
    return parser.parse_args()


def _split_env(var: str) -> list[str]:
    raw = os.environ.get(var, "").strip()
    return [v.strip() for v in raw.split(",") if v.strip()] if raw else []


def _build_client() -> Paradime:
    required = {
        "PARADIME_API_ENDPOINT": os.environ.get("PARADIME_API_ENDPOINT"),
        "PARADIME_API_KEY":      os.environ.get("PARADIME_API_KEY"),
        "PARADIME_API_SECRET":   os.environ.get("PARADIME_API_SECRET"),
    }
    missing = [k for k, v in required.items() if not v]
    if missing:
        print(f"ERROR: Missing env vars: {', '.join(missing)}", file=sys.stderr)
        raise SystemExit(1)
    return Paradime(
        api_endpoint=required["PARADIME_API_ENDPOINT"],
        api_key=required["PARADIME_API_KEY"],
        api_secret=required["PARADIME_API_SECRET"],
    )


def _fetch_run_results_from_bolt(paradime: Paradime, schedule_name: str) -> list[dict]:
    """
    Fetch run_results.json from the latest Bolt run and extract one record
    per dbt™ model. For Snowflake, the query ID is at adapter_response.query_id.
    """
    print(f"[snowflake-optimizer] 🔍  Fetching latest Bolt run for '{schedule_name}' …")
    try:
        latest_runs = paradime.bolt.list_runs(schedule_name=schedule_name, limit=1)
        latest_run  = latest_runs.runs[0] if latest_runs.runs else None
    except Exception as exc:
        print(f"[snowflake-optimizer] ⚠️   Could not fetch run list: {exc}", file=sys.stderr)
        return []

    if not latest_run:
        print("[snowflake-optimizer] ⚠️   No runs found for this schedule.", file=sys.stderr)
        return []

    print(f"[snowflake-optimizer] 🏃  Run ID: {latest_run.id} | status: {latest_run.state}")

    try:
        run_status      = paradime.bolt.get_run_status(latest_run.id)
        commands        = getattr(run_status, "commands", None) or []
        command_indices = list(range(len(commands))) if commands else list(range(5))
    except Exception:
        command_indices = list(range(5))

    all_model_results: list[dict] = []
    seen_urls: set[str] = set()

    for command_index in command_indices:
        try:
            url = paradime.bolt.get_latest_artifact_url(
                schedule_name=schedule_name,
                artifact_path=ARTIFACT_PATH,
                command_index=command_index,
            )
        except Exception:
            continue

        if not url or url in seen_urls:
            continue
        seen_urls.add(url)

        try:
            resp = requests.get(url, timeout=30)
            resp.raise_for_status()
            data = resp.json()
        except Exception as exc:
            print(f"[snowflake-optimizer] ⚠️   Failed to download artifact: {exc}", file=sys.stderr)
            continue

        for result in data.get("results", []):
            unique_id = result.get("unique_id", "")
            if not unique_id.startswith("model."):
                continue

            adapter  = result.get("adapter_response") or {}
            # dbt-snowflake records the query ID at adapter_response.query_id.
            # A missing or null value becomes "—", which _top_slow_models drops.
            query_id = adapter.get("query_id") or "—"

            all_model_results.append({
                "name":             unique_id.split(".")[-1],
                "unique_id":        unique_id,
                "status":           result.get("status", "—"),
                "execution_time_s": round(result.get("execution_time") or 0, 2),
                "query_id":         query_id,
                "rows_affected":    adapter.get("rows_affected", "—"),
                "command_index":    command_index,
                "generated_at":     data.get("metadata", {}).get("generated_at", "unknown"),
            })

    print(f"[snowflake-optimizer] 📊  Total model results collected: {len(all_model_results)}")
    return all_model_results


def _top_slow_models(
    all_results: list[dict],
    top_n: int,
    min_exec_secs: float,
) -> list[dict]:
    """
    Deduplicate by model name (keep slowest instance), apply execution time
    threshold, sort by execution_time_s descending, return top N.
    Only includes models where a Snowflake query_id was captured.
    """
    deduped: dict[str, dict] = {}
    for row in all_results:
        name = row["name"]
        if name not in deduped or row["execution_time_s"] > deduped[name]["execution_time_s"]:
            deduped[name] = row

    filtered = [
        r for r in deduped.values()
        if r["execution_time_s"] >= min_exec_secs and r["query_id"] != "—"
    ]
    return sorted(filtered, key=lambda r: r["execution_time_s"], reverse=True)[:top_n]


def _rows_from_query_ids(query_ids: list[str]) -> list[dict]:
    """Build minimal model result dicts from raw Snowflake query IDs."""
    return [
        {
            "name":             f"query_{i + 1}",
            "unique_id":        "—",
            "status":           "unknown",
            "execution_time_s": 0,
            "query_id":         qid,
            "rows_affected":    "—",
            "command_index":    0,
            "generated_at":     "—",
        }
        for i, qid in enumerate(query_ids)
    ]


def _format_table(rows: list[dict]) -> str:
    if not rows:
        return "(no model results to display)"
    headers = {
        "name":             "Model",
        "execution_time_s": "Time (s)",
        "rows_affected":    "Rows Affected",
        "query_id":         "Snowflake Query ID",
        "command_index":    "Cmd",
    }
    widths = {col: len(label) for col, label in headers.items()}
    for row in rows:
        for col in headers:
            widths[col] = max(widths[col], len(str(row.get(col, ""))))

    def _row_str(values):
        return "  ".join(str(values.get(col, "")).ljust(widths[col]) for col in headers)

    header_line = _row_str(headers)
    return "\n".join([header_line, "-" * len(header_line)] + [_row_str(r) for r in rows])


def _build_opening_message(rows: list[dict], source_label: str) -> str:
    query_id_lines = "\n".join(
        f"  {i + 1}. model=`{r['name']}`  "
        f"query_id=`{r['query_id']}`  "
        f"execution_time={r['execution_time_s']}s  "
        f"rows_affected={r['rows_affected']}"
        for i, r in enumerate(rows)
    )
    # Dedent the static template *before* substituting the multi-line table and
    # query list: their unindented lines would otherwise stop textwrap.dedent()
    # from stripping the template's leading whitespace.
    template = textwrap.dedent("""
        You are being triggered to analyse and fix costly dbt™ models running on Snowflake.

        SOURCE: {source_label}

        ── PRE-FETCHED MODEL PERFORMANCE DATA ────────────────────────────────────

        {table}

        ── QUERY IDs TO OPTIMISE ─────────────────────────────────────────────────

        {query_id_lines}

        Work through all five phases (GATHER → LOCATE → DIAGNOSE → IMPLEMENT →
        PR + SLACK) in order without stopping. The run is only complete when the
        PR is open and the Slack message has been posted to #finops.
    """).strip()
    return template.format(
        source_label=source_label,
        table=_format_table(rows),
        query_id_lines=query_id_lines,
    )


def _build_drill_message(drill_model: str, rows: list[dict]) -> str:
    drill_row = next((r for r in rows if r["name"] == drill_model), None)
    query_hint = (
        f"Its Snowflake query ID is `{drill_row['query_id']}` "
        f"({drill_row['execution_time_s']}s execution time). "
        f"Use get_snowflake_query_performance_stats to retrieve the full "
        f"query profile including partition pruning ratios and spill details. "
        if drill_row and drill_row["query_id"] != "—"
        else f"No pre-fetched query ID for `{drill_model}` — analyse the .sql file directly. "
    )
    return (
        f"Now drill deeper into `{drill_model}` specifically. {query_hint}"
        f"Show the exact partition pruning ratio, spill-to-disk bytes, and "
        f"warehouse size for this query. Identify whether the bottleneck is a "
        f"missing cluster key, warehouse sizing, or a data movement issue, and "
        f"apply any additional targeted fixes not already covered in the main pass. "
        f"Commit changes to the existing branch, update the PR description, and "
        f"post a follow-up to #finops with the updated PR link and a summary of "
        f"the extra changes."
    )


def _poll_until_done(paradime: Paradime, session_id: str, label: str) -> object:
    start = time.time()
    while True:
        run = paradime.dinoai_agents.get_run(agent_session_id=session_id)
        if run.status == DinoaiAgentRunStatus.COMPLETED:
            print(f"[snowflake-optimizer] ✅  {label} completed")
            return run
        if run.status == DinoaiAgentRunStatus.FAILED:
            last_msg = run.messages[-1].content if run.messages else "no messages"
            raise DinoaiAgentRunFailedException(f"{label} FAILED. Last: {last_msg}")
        elapsed = time.time() - start
        if elapsed > TIMEOUT:
            raise TimeoutError(f"Timed out after {TIMEOUT}s for session {session_id}")
        print(f"[snowflake-optimizer]    {label} — {run.status.value} ({int(elapsed)}s) … {POLL_INTERVAL}s")
        time.sleep(POLL_INTERVAL)


def main() -> None:
    args     = _parse_args()
    paradime = _build_client()

    if args.query_ids:
        rows         = _rows_from_query_ids(args.query_ids)
        source_label = f"explicit query IDs: {', '.join(args.query_ids)}"
    elif args.schedule_name:
        all_results  = _fetch_run_results_from_bolt(paradime, args.schedule_name)
        rows         = _top_slow_models(all_results, args.top_n, args.min_exec_secs)
        source_label = (
            f"Bolt schedule '{args.schedule_name}' — top {len(rows)} models "
            f"above {args.min_exec_secs}s execution time"
        )
        if not rows:
            print("[snowflake-optimizer] ⚠️   No models above threshold or missing query IDs. Exiting.")
            raise SystemExit(0)
    else:
        print("ERROR: Provide --query-ids or --schedule-name.", file=sys.stderr)
        raise SystemExit(1)

    print("\n" + "=" * 72)
    print(_format_table(rows))
    print("=" * 72 + "\n")

    trigger    = paradime.dinoai_agents.trigger_run(
        agent=AGENT_NAME,
        message=_build_opening_message(rows, source_label),
    )
    session_id = trigger.agent_session_id
    print(f"[snowflake-optimizer] 📋  Session ID: {session_id}")

    main_run = _poll_until_done(paradime, session_id, "Main optimisation pass")
    print(main_run.messages[-1].content if main_run.messages else "(no report)")

    if args.drill_model:
        paradime.dinoai_agents.send_message(
            agent_session_id=session_id,
            message=_build_drill_message(args.drill_model, rows),
        )
        drill_run = _poll_until_done(paradime, session_id, f"Drill-down on '{args.drill_model}'")
        print(drill_run.messages[-1].content if drill_run.messages else "(no drill report)")

    print("[snowflake-optimizer] ✅  All done.")


if __name__ == "__main__":
    main()
```

{% endcode %}

{% hint style="info" %}
The script takes its model list from either `--schedule-name` or `--query-ids`. Supply only one: if both are present, `main()` uses the query IDs and skips the Bolt artifact fetch. When `--schedule-name` is used, the script reads `run_results.json` from the latest Bolt run and extracts `adapter_response.query_id` from each dbt™ model result. Models where `query_id` is absent (e.g. tests or snapshots) are automatically excluded.
{% endhint %}
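
The selection step described above can be sketched as a standalone function. This is a minimal illustration, not the script's own `_top_slow_models` helper: it assumes the standard dbt™ `run_results.json` layout (a top-level `results` list whose entries carry `unique_id`, `execution_time`, and `adapter_response`), and the function name `select_costly_models` and the returned row shape are hypothetical. As an extra guard, it also keeps only entries whose `unique_id` starts with `model.`.

```python
from typing import Any


def select_costly_models(
    run_results: dict[str, Any], top_n: int = 5, min_exec_secs: float = 30.0
) -> list[dict[str, Any]]:
    """Return the slowest dbt models that have a captured Snowflake query ID."""
    candidates = []
    for result in run_results.get("results", []):
        unique_id = result.get("unique_id", "")
        # Keep only models; tests, seeds and snapshots use other prefixes.
        if not unique_id.startswith("model."):
            continue
        adapter_response = result.get("adapter_response") or {}
        query_id = adapter_response.get("query_id")
        if not query_id:
            continue  # no Snowflake query to analyse for this node
        exec_time = float(result.get("execution_time") or 0.0)
        if exec_time <= min_exec_secs:
            continue  # only models strictly above the threshold
        candidates.append(
            {
                # "model.<package>.<name>" -> "<name>"
                "model": unique_id.split(".", 2)[-1],
                "execution_time": exec_time,
                "rows_affected": adapter_response.get("rows_affected"),
                "query_id": query_id,
            }
        )
    # Slowest first, then cap the list at top_n entries.
    candidates.sort(key=lambda row: row["execution_time"], reverse=True)
    return candidates[:top_n]
```

Passing the parsed artifact (for example, the result of `json.load` on a local `run_results.json`) returns at most `top_n` rows, ordered from slowest to fastest. An empty list corresponds to the clean exit path described in the next hint.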

{% hint style="info" %}
When no models are found above the execution time threshold, or none have a captured `query_id`, the script exits cleanly with code `0` — safe to run on a schedule without generating noise.
{% endhint %}
{% endstep %}

{% step %}

### Set Your Environment Variables

Store your secrets in Paradime before scheduling. Go to **Workspace Settings → Environment Variables** and add the following:

| Variable                        | Description                                                                                                                      |
| ------------------------------- | -------------------------------------------------------------------------------------------------------------------------------- |
| `PARADIME_API_ENDPOINT`         | Your Paradime API endpoint                                                                                                       |
| `PARADIME_API_KEY`              | Your Paradime API key                                                                                                            |
| `PARADIME_API_SECRET`           | Your Paradime API secret                                                                                                         |
| `BOLT_SCHEDULE_NAME`            | Name of the Bolt schedule whose `run_results.json` maps dbt™ models to their Snowflake query IDs                                 |
| `QUERY_OPTIMIZER_TOP_N`         | *(optional)* Max models per run — defaults to `5`                                                                                |
| `QUERY_OPTIMIZER_MIN_EXEC_SECS` | *(optional)* Execution time threshold in seconds — defaults to `30`                                                              |
| `QUERY_OPTIMIZER_DRILL_MODEL`   | *(optional)* Model name to drill into after the main pass                                                                        |
| `QUERY_OPTIMIZER_TIMEOUT`       | *(optional)* Max seconds to wait for each agent pass (main and drill-down); defaults to `3600`                                   |
| `QUERY_OPTIMIZER_QUERY_IDS`     | *(optional)* Comma-separated Snowflake query IDs — bypasses Bolt artifact fetch when you already know which dbt™ models are slow |

{% hint style="info" %}
Your Paradime API endpoint, key, and secret are available under Workspace Settings → API. Make sure the key has `DinoAI agent API` capabilities enabled.
{% endhint %}
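
The trigger script's `_parse_args` is not shown in this section. As a hedged sketch of one way the optional variables in the table above could feed CLI defaults, the hypothetical `parse_args` below assumes the flag names used in the examples that follow and the defaults listed in the table. `QUERY_OPTIMIZER_TIMEOUT` is left out because the script above uses a module-level `TIMEOUT` constant rather than a flag.

```python
from __future__ import annotations

import argparse
import os
from collections.abc import Mapping, Sequence


def parse_args(
    argv: Sequence[str] | None = None, env: Mapping[str, str] | None = None
) -> argparse.Namespace:
    """Parse CLI flags, falling back to environment variables for defaults."""
    env = os.environ if env is None else env
    # The env var is comma-separated; the CLI flag takes space-separated values.
    env_ids = [
        qid.strip()
        for qid in env.get("QUERY_OPTIMIZER_QUERY_IDS", "").split(",")
        if qid.strip()
    ]

    parser = argparse.ArgumentParser(description="Snowflake query cost optimizer (sketch)")
    parser.add_argument("--schedule-name", default=env.get("BOLT_SCHEDULE_NAME"))
    parser.add_argument("--query-ids", nargs="+", default=env_ids)
    parser.add_argument(
        "--top-n", type=int, default=int(env.get("QUERY_OPTIMIZER_TOP_N", "5"))
    )
    parser.add_argument(
        "--min-exec-secs",
        type=float,
        default=float(env.get("QUERY_OPTIMIZER_MIN_EXEC_SECS", "30")),
    )
    parser.add_argument("--drill-model", default=env.get("QUERY_OPTIMIZER_DRILL_MODEL"))
    return parser.parse_args(argv)
```

With this shape, an unset or empty `QUERY_OPTIMIZER_QUERY_IDS` yields an empty list, which is falsy, so a `main()` like the one above falls through to the `--schedule-name` branch. Explicit CLI flags always override the environment defaults.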
{% endstep %}

{% step %}

### Run the Optimizer Manually

**Option A — Automatically detect costly dbt™ models from a Bolt schedule:**

```bash
poetry run python scripts/run_snowflake_query_optimizer.py \
  --schedule-name "my_dbt_production_schedule" \
  --top-n 5 \
  --min-exec-secs 30
```

**Option B — Supply Snowflake query IDs directly (when you already know which models are slow):**

```bash
poetry run python scripts/run_snowflake_query_optimizer.py \
  --query-ids 01c4683c-0003-4ff0-0002-731a00bda6ea 01c4683c-0003-4ff0-0002-731a00bda6eb
```

**Option C — Include a follow-up drill-down on a specific dbt™ model:**

```bash
poetry run python scripts/run_snowflake_query_optimizer.py \
  --schedule-name "my_dbt_production_schedule" \
  --drill-model fct_orders
```

You should see output like:

```
========================================================================
Model              Time (s)  Rows Affected  Snowflake Query ID                       Cmd
------------------------------------------------------------------------
fct_orders         142.30    84921          01c4683c-0003-4ff0-0002-731a00bda6ea     0
dim_sessions       98.50     12043          01c4683c-0003-4ff0-0002-731a00bda6eb     0
========================================================================

[snowflake-optimizer] 📋  Session ID: agt_sess_abc123xyz
[snowflake-optimizer]    Main optimisation pass — running (0s) … 20s
[snowflake-optimizer]    Main optimisation pass — running (20s) … 20s
[snowflake-optimizer] ✅  Main optimisation pass completed
```

The script then prints the agent's final message (its optimisation plan and PR link) to stdout, and the agent posts a credit reduction summary to `#finops` on Slack.
{% endstep %}
{% endstepper %}

## Schedule with Bolt

Run the optimizer on a recurring cadence so costly dbt™ models are caught and fixed without manual intervention. Each run reads `run_results.json` from the latest run of your production dbt™ schedule, identifies the models with the longest execution times, and triggers the agent to rewrite them and open a pull request.

### Add Environment Variables to Paradime

Go to **Workspace Settings → Environment Variables** and confirm the variables from the previous step are saved. Bolt schedules read from the same environment variable store, so no additional setup is needed.

### Create the Bolt Schedule

Go to **Bolt → Schedules** and click **New Schedule**. Name it something like `Snowflake Query Cost Optimizer` and add the following two commands in order:

```bash
poetry install
```

```bash
poetry run python scripts/run_snowflake_query_optimizer.py --schedule-name "your_dbt_production_schedule" --top-n 5 --min-exec-secs 30
```

{% hint style="info" %}
Replace `your_dbt_production_schedule` with the exact name of the Bolt schedule that runs your dbt™ models in production. The script reads that schedule's `run_results.json` artifact, which contains a record per dbt™ model including the Snowflake query ID captured at `adapter_response.query_id`.
{% endhint %}

{% hint style="info" %}
`poetry install` runs first on every execution so that any dependency updates committed to `pyproject.toml` are picked up automatically — no manual intervention needed after a dependency bump.
{% endhint %}

#### Choose a Schedule Frequency

The right cadence depends on how frequently your production dbt™ models run and how quickly Snowflake credit costs accumulate. The table below covers the most common options:

| Cadence                   | Cron expression | When it runs                 |
| ------------------------- | --------------- | ---------------------------- |
| Every day at 8 AM         | `0 8 * * *`     | Monday–Sunday, 8:00 AM       |
| Weekdays at 8 AM          | `0 8 * * 1-5`   | Monday–Friday, 8:00 AM       |
| Once a week (Monday 8 AM) | `0 8 * * 1`     | Every Monday, 8:00 AM        |
| Twice a week (Mon & Thu)  | `0 8 * * 1,4`   | Monday and Thursday, 8:00 AM |
| Every 12 hours            | `0 */12 * * *`  | Midnight and noon daily      |

{% hint style="info" %}
For most teams, **weekdays at 8 AM** (`0 8 * * 1-5`) is a sensible default: provided your overnight production dbt™ run finishes before 8 AM, the latest `run_results.json` artifact and the matching Snowflake query history are available when the optimizer starts. The script exits cleanly with code `0` when no dbt™ models exceed the execution time threshold, so on quiet days no agent session is started and nothing is posted to Slack.
{% endhint %}
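
To check exactly which minutes, hours, and days an expression from the table selects, the stdlib-only sketch below expands standard five-field cron syntax. It is an illustration of cron semantics, not how Bolt parses schedules, and `expand_field` and `describe` are hypothetical names. It handles only `*`, `*/n`, `a-b`, comma lists, and plain numbers, and it treats day-of-week as `0`–`6` with `0` meaning Sunday (some cron implementations also accept `7`).

```python
FIELD_RANGES = [
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day of month", 1, 31),
    ("month", 1, 12),
    ("day of week", 0, 6),  # 0 = Sunday
]


def expand_field(field: str, low: int, high: int) -> set[int]:
    """Expand one cron field (e.g. '1-5', '1,4', '*/12') into matching values."""
    values: set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            end = high if step_text else start  # "5/10" means 5, 15, ... up to high
        if not (low <= start <= end <= high):
            raise ValueError(f"{field!r} is outside {low}-{high}")
        values.update(range(start, end + 1, step))
    return values


def describe(expression: str) -> dict[str, list[int]]:
    """Map each cron field name to the sorted values it selects."""
    fields = expression.split()
    if len(fields) != len(FIELD_RANGES):
        raise ValueError("expected 5 fields: minute hour day-of-month month day-of-week")
    return {
        name: sorted(expand_field(field, low, high))
        for field, (name, low, high) in zip(fields, FIELD_RANGES)
    }
```

For example, `describe("0 */12 * * *")["hour"]` is `[0, 12]` (midnight and noon), and `describe("0 8 * * 1-5")["day of week"]` is `[1, 2, 3, 4, 5]` (Monday to Friday).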

#### File Structure

Your repository should look like this after completing the setup:

```
your-repo/
├── dbt_project.yml
├── pyproject.toml                              ← same level as dbt_project.yml
├── .dinoai/
│   └── agents/
│       └── snowflake-query-optimizer.yml
└── scripts/
    └── run_snowflake_query_optimizer.py        ← trigger script
```

{% hint style="info" %}
`pyproject.toml` must sit at the same directory level as `dbt_project.yml`. The agent uses `run_terminal_command`, `read_file`, and `write_file` to navigate and rewrite your dbt™ project files, so the working directory at session start must be the repo root.
{% endhint %}
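
Because these files must sit at fixed paths relative to the repo root, a quick preflight check can catch a misplaced `pyproject.toml` or agent config before a scheduled run fails. The helper below is hypothetical (`check_layout` is not part of the Paradime SDK) and simply assumes the paths shown in the tree above.

```python
from __future__ import annotations

from pathlib import Path

# Paths taken from the file structure above, relative to the repo root.
REQUIRED_FILES = (
    "dbt_project.yml",
    "pyproject.toml",
    ".dinoai/agents/snowflake-query-optimizer.yml",
    "scripts/run_snowflake_query_optimizer.py",
)


def check_layout(repo_root: str | Path = ".") -> list[str]:
    """Return the required paths missing under repo_root (empty list means OK)."""
    root = Path(repo_root)
    return [rel for rel in REQUIRED_FILES if not (root / rel).is_file()]
```

Running `check_layout()` from the repo root and getting back `[]` means every file is where the agent and the Bolt commands expect it; any returned path is the one to move or create.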

## Related Docs

* [**Programmable Agents — Quick Start** — getting started with DinoAI agents](/app-help/products/dino-ai/programmable-agents/quick-start.md)
* [**Programmable Agents — YAML Configuration** — full reference for agent config options](/app-help/products/dino-ai/programmable-agents/yaml-configuration.md)
* [**Programmable Agents — Tools Reference** — all available tools including `get_snowflake_query_performance_stats` and `post_slack_message`](/app-help/products/dino-ai/programmable-agents/tools-reference.md)
* [**Bolt — Schedules** — creating and managing Bolt schedules](/app-help/products/bolt/creating-schedules.md)
* [**Paradime SDK — Artifacts** — accessing run artifacts including `run_results.json`](/app-help/developers/python-sdk/modules/bolt.md)
* [**Slack Integration** — connecting Slack to Paradime](/app-help/integrations/slack.md)
* [**Paradime API & Credentials** — where to find your API endpoint, key, and secret](/app-help/developers/generate-api-keys.md)
