# Metadata

## Overview

The Paradime **Python SDK Metadata** module helps you query and analyze **dbt™ metadata** from Bolt runs.

Use it to monitor **model health**, analyze **dbt test results**, track **source freshness**, and explore **upstream/downstream lineage**. The module loads dbt artifacts into a local **DuckDB** database. The typed helper methods return Python objects, and `query_sql()` returns **polars DataFrames**.

* Query dbt artifacts like `manifest.json`, `run_results.json`, and source freshness results
* Build reliability checks around failing models, failing tests, and stale sources
* Run ad-hoc SQL with `query_sql()` against the DuckDB metadata schema

{% hint style="info" %}
The Metadata SDK is available in Python SDK version 4.18.0+ and requires a Paradime account with access to dbt metadata.
{% endhint %}

### Quick Start

#### Basic Setup

```python
from paradime.client.paradime_client import Paradime

client = Paradime(
    api_endpoint="your-api-endpoint",
    api_key="your-api-key",
    api_secret="your-api-secret"
)

metadata = client.metadata
```
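
To avoid hardcoding credentials in the setup snippet above, they can be read from the environment. This is a minimal sketch, not part of the SDK: the helper `load_credentials` and the environment variable names are assumptions chosen for illustration, so substitute the names your deployment uses.

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical variable names; use whatever names your deployment defines.
REQUIRED_VARS = {
    "api_endpoint": "PARADIME_API_ENDPOINT",
    "api_key": "PARADIME_API_KEY",
    "api_secret": "PARADIME_API_SECRET",
}


def load_credentials(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return keyword arguments for Paradime(...) read from the environment."""
    env = os.environ if env is None else env
    # Collect every missing or empty variable so the error lists them all at once.
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {kwarg: env[var] for kwarg, var in REQUIRED_VARS.items()}
```

With this helper, the client could be created as `Paradime(**load_credentials())`.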

#### Simple Health Check

```python
models = metadata.get_model_health("daily_production_run")

successful = len([m for m in models if m.status.value == "success"])
print(f"✅ {successful}/{len(models)} models successful")
```

### Model Health Monitoring

#### Get Model Health Status

Monitor the health of all models in a dbt run, including execution status, test results, and performance metrics.

```python
models = metadata.get_model_health("schedule_name")

for model in models:
    print(f"{model.name}")
    print(f"  Status: {model.status.value}")
    print(f"  Health: {model.health_status.value}")
    print(f"  Tests: {model.total_tests} total, {model.failed_tests} failed")
    print(f"  Execution: {model.execution_time}s")
```
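
When a run contains many models, a per-status count is often easier to read than the full listing. The sketch below assumes only that each returned object exposes `health_status.value`, as in the loop above. `summarize_health` and the stand-in objects are hypothetical helpers, not SDK functions.

```python
from collections import Counter
from types import SimpleNamespace
from typing import Dict, Iterable


def summarize_health(models: Iterable) -> Dict[str, int]:
    """Count models per health status label, e.g. 'Healthy' or 'Critical'."""
    return dict(Counter(model.health_status.value for model in models))


def _stub(name: str, health: str) -> SimpleNamespace:
    # Stand-in for a ModelHealth object; only the attributes used above are mimicked.
    return SimpleNamespace(name=name, health_status=SimpleNamespace(value=health))


sample = [_stub("orders", "Healthy"), _stub("payments", "Critical"), _stub("users", "Healthy")]
summary = summarize_health(sample)
print(summary)  # {'Healthy': 2, 'Critical': 1}
```

In real use, pass the list returned by `metadata.get_model_health("schedule_name")` instead of `sample`.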

#### Filter by Health Status

{% tabs %}
{% tab title="Failing models" %}

```python
failing_models = metadata.get_failing_models("schedule_name")

for model in failing_models:
    print(f"{model.name} — {model.failed_tests} failed tests")
    if model.error_message:
        print(f"  Error: {model.error_message}")
```

{% endtab %}

{% tab title="Slowest models" %}

```python
slowest = metadata.get_slowest_models("schedule_name", limit=10)

for model in slowest:
    print(f"{model.name}: {model.execution_time}s")
```

{% endtab %}
{% endtabs %}

### Test Results

```python
# All test results
tests = metadata.get_test_results("schedule_name")

# Failed tests only
failed_tests = metadata.get_test_results("schedule_name", failed_only=True)

for test in failed_tests:
    print(f"❌ {test.test_name}")
    print(f"   Status: {test.status.value}")
    print(f"   Models: {test.depends_on_nodes}")
```
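
Failed tests are often easier to triage when grouped by the node they reference. The following sketch assumes that `depends_on_nodes` is a list of dbt unique IDs (possibly empty or `None`). That shape is an assumption based on the printout above, and `failed_tests_by_node` is a hypothetical helper.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Dict, Iterable, List


def failed_tests_by_node(tests: Iterable) -> Dict[str, List[str]]:
    """Map each referenced node ID to the names of the tests attached to it."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for test in tests:
        # Assumption: depends_on_nodes is a list of unique IDs, or None when unknown.
        for node_id in test.depends_on_nodes or []:
            grouped[node_id].append(test.test_name)
    return dict(grouped)


# Stand-ins for TestResult objects; only the two attributes used above are mimicked.
sample = [
    SimpleNamespace(test_name="not_null_orders_id", depends_on_nodes=["model.shop.orders"]),
    SimpleNamespace(
        test_name="relationships_orders_user",
        depends_on_nodes=["model.shop.orders", "model.shop.users"],
    ),
    SimpleNamespace(test_name="orphan_test", depends_on_nodes=None),
]
grouped = failed_tests_by_node(sample)
```

In real use, pass the result of `metadata.get_test_results("schedule_name", failed_only=True)`.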

### Source Freshness

```python
sources = metadata.get_source_freshness("schedule_name")

for source in sources:
    print(f"{source.source_name}.{source.table_name}")
    print(f"  Status: {source.freshness_status.value}")
    print(f"  Last loaded: {source.max_loaded_at}")
    print(f"  Hours since load: {source.hours_since_load}")
```

{% hint style="warning" %}
Sources are loaded from a dedicated source freshness schedule (e.g. one running `dbt source freshness`), not from your main dbt run schedule.
{% endhint %}
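
The threshold fields on each source (`hours_since_load`, `warn_after_hours`, `error_after_hours`) can be turned into an alert level in plain Python. This sketch follows the same comparison order as the source freshness alert query in the Example Queries section. The `alert_level` helper and the extra `'UNKNOWN'` label for a missing load time are illustrative assumptions, not SDK behavior.

```python
from typing import Optional


def alert_level(
    hours_since_load: Optional[float],
    warn_after_hours: Optional[int],
    error_after_hours: Optional[int],
) -> str:
    """Classify a source as 'CRITICAL', 'WARNING', 'OK', or 'UNKNOWN'."""
    if hours_since_load is None:
        return "UNKNOWN"  # no load timestamp to compare against
    # Check the stricter error threshold first so it wins over the warning threshold.
    if error_after_hours is not None and hours_since_load > error_after_hours:
        return "CRITICAL"
    if warn_after_hours is not None and hours_since_load > warn_after_hours:
        return "WARNING"
    return "OK"
```

For a source returned by `get_source_freshness`, this could be called as `alert_level(source.hours_since_load, source.warn_after_hours, source.error_after_hours)`.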

### Custom SQL Queries

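The metadata SDK exposes a DuckDB database you can query directly using SQL. Use `?` placeholders in the query and pass the values through `parameters` instead of interpolating them into the SQL string. Results are returned as polars DataFrames.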

```python
sql = """
    SELECT
        schema_name,
        COUNT(*) as model_count,
        AVG(execution_time) as avg_execution_time
    FROM dbt_run_results
    WHERE schedule_name = ? AND resource_type = 'model'
    GROUP BY schema_name
    ORDER BY model_count DESC
"""

results = metadata.query_sql(sql, "schedule_name", ["schedule_name"])

for row in results.iter_rows(named=True):
    print(f"{row['schema_name']}: {row['model_count']} models")
```

{% hint style="info" %}
`query_sql` returns a `polars.DataFrame`. Use `.iter_rows(named=True)` to iterate with dict-style row access.
{% endhint %}
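
When a filter takes a variable number of values, the placeholder list can be generated instead of formatting values into the SQL text. This sketch assumes that `parameters` are bound positionally to the `?` placeholders, as the example above suggests. `build_status_query` is a hypothetical helper, not an SDK function.

```python
from typing import List, Sequence, Tuple


def build_status_query(schedule_name: str, statuses: Sequence[str]) -> Tuple[str, List[str]]:
    """Build SQL with one '?' per value so no input is interpolated into the string."""
    if not statuses:
        raise ValueError("statuses must not be empty")
    placeholders = ", ".join("?" for _ in statuses)
    sql = (
        "SELECT name, status, execution_time "
        "FROM dbt_run_results "
        f"WHERE schedule_name = ? AND status IN ({placeholders}) "
        "ORDER BY execution_time DESC"
    )
    # Parameter order must match the order of the placeholders in the SQL text.
    return sql, [schedule_name, *statuses]


sql, params = build_status_query("schedule_name", ["error", "fail"])
```

The pair could then be passed on as `metadata.query_sql(sql, "schedule_name", params)`.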

### Dependency Analysis

#### Upstream Dependencies

```python
dependencies = metadata.get_upstream_health(
    model_name="my_important_model",
    schedule_name="schedule_name",
    max_depth=5
)

for dep in dependencies:
    print(f"Level {dep.level}: {dep.name}")
    print(f"  Health: {dep.health_status.value}")
    print(f"  Status: {dep.status.value}")
```

#### Downstream Impact

```python
impact = metadata.get_downstream_impact(
    model_name="failed_model",
    schedule_name="schedule_name",
    max_depth=5
)

print(f"Impact of {impact.failed_model} failure:")
print(f"  Critical: {len(impact.critical_models)}")
print(f"  Warning: {len(impact.warning_models)}")
print(f"  Potentially affected: {len(impact.potentially_affected)}")
```
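
To make the effect of `max_depth` concrete, the sketch below walks a dependency graph breadth-first and records the level at which each downstream node is first reached. It is an illustration over a plain dictionary, not the SDK's implementation. `downstream_levels` and the sample graph are hypothetical.

```python
from collections import deque
from typing import Dict, List, Mapping, Sequence


def downstream_levels(
    depends_on: Mapping[str, Sequence[str]], start: str, max_depth: int = 5
) -> Dict[str, int]:
    """Return {node: level} for every node reachable downstream of `start`."""
    # Invert child -> parents into parent -> children.
    children: Dict[str, List[str]] = {}
    for child, parents in depends_on.items():
        for parent in parents:
            children.setdefault(parent, []).append(child)

    levels: Dict[str, int] = {}
    queue = deque([(start, 0)])
    while queue:
        node, level = queue.popleft()
        if level >= max_depth:
            continue  # do not expand beyond the depth limit
        for child in children.get(node, []):
            if child not in levels and child != start:
                levels[child] = level + 1
                queue.append((child, level + 1))
    return levels


# Sample graph: each key lists the nodes it depends on.
graph = {"b": ["a"], "c": ["b"], "d": ["c"], "e": ["a", "c"]}
shallow = downstream_levels(graph, "a", max_depth=2)
deep = downstream_levels(graph, "a", max_depth=5)
```

With `max_depth=2`, node `d` (three hops from `a`) is left out. With `max_depth=5`, it is included at level 3.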

### Health Dashboard

```python
dashboard = metadata.get_health_dashboard("schedule_name")

print(f"Models — Total: {dashboard.total_models}")
print(f"  Healthy: {dashboard.healthy_models}")
print(f"  Warning: {dashboard.warning_models}")
print(f"  Critical: {dashboard.critical_models}")
print(f"Avg execution: {dashboard.avg_execution_time:.1f}s")
print(f"Test success rate: {dashboard.test_success_rate:.1f}%")
```
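
The dashboard fields lend themselves to a simple pass/fail gate, for example in a scheduled check. This is a sketch under assumptions: `test_success_rate` is treated as a percentage between 0 and 100 (as the printout above implies), the default thresholds are arbitrary, and `dashboard_violations` is a hypothetical helper.

```python
from types import SimpleNamespace
from typing import List


def dashboard_violations(
    dashboard, max_critical: int = 0, min_test_success_rate: float = 95.0
) -> List[str]:
    """Return human-readable reasons why the dashboard fails the given thresholds."""
    problems: List[str] = []
    if dashboard.critical_models > max_critical:
        problems.append(f"{dashboard.critical_models} critical models (allowed: {max_critical})")
    if dashboard.test_success_rate < min_test_success_rate:
        problems.append(
            f"test success rate {dashboard.test_success_rate:.1f}% "
            f"is below {min_test_success_rate:.1f}%"
        )
    return problems


# Stand-ins for HealthDashboard objects; only the two fields used above are mimicked.
healthy = SimpleNamespace(critical_models=0, test_success_rate=99.2)
degraded = SimpleNamespace(critical_models=2, test_success_rate=90.0)
```

An empty list means the run passed. Otherwise, the returned messages can be logged or sent to an alerting channel.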

### Advanced Features

#### Performance Summary

```python
perf = metadata.get_performance_summary("schedule_name", days=7)

print(f"Avg execution: {perf.average_execution_time:.1f}s")
print(f"Success rate: {perf.success_rate:.1f}%")

for model in perf.slowest_models[:5]:
    print(f"  {model['name']}: {model['execution_time']:.1f}s")
```

#### Streaming Large Datasets

For large metadata datasets, stream results in batches to avoid memory pressure.

```python
for batch in metadata.get_model_health_stream("schedule_name", batch_size=100):
    for model in batch:
        if model.health_status.value == "Critical":
            print(f"Critical: {model.name}")
```
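
One benefit of consuming batches lazily is that processing can stop early. The sketch below collects the first few critical model names and pulls no more batches than it needs. `first_critical` and `fake_stream` are hypothetical. The latter stands in for `get_model_health_stream` so the example runs without a Paradime connection.

```python
from itertools import islice
from types import SimpleNamespace
from typing import Iterable, Iterator, List


def first_critical(batches: Iterable[List], limit: int = 10) -> List[str]:
    """Collect up to `limit` critical model names, pulling no more batches than needed."""

    def critical_names() -> Iterator[str]:
        for batch in batches:
            for model in batch:
                if model.health_status.value == "Critical":
                    yield model.name

    return list(islice(critical_names(), limit))


def _stub(name: str, health: str) -> SimpleNamespace:
    # Stand-in for a ModelHealth object.
    return SimpleNamespace(name=name, health_status=SimpleNamespace(value=health))


pulled = []  # records which stand-in batches were actually requested


def fake_stream():
    data = [
        [_stub("a", "Critical"), _stub("b", "Healthy")],
        [_stub("c", "Critical"), _stub("d", "Warning")],
        [_stub("e", "Critical")],
    ]
    for index, batch in enumerate(data):
        pulled.append(index)
        yield batch


names = first_critical(fake_stream(), limit=2)
```

Here the third stand-in batch is never requested, because the limit is reached while the second batch is being read.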

#### Cache Management

```python
metadata.clear_cache()                    # clear all
metadata.clear_cache("schedule_name")     # clear one schedule
metadata.refresh_metadata("schedule_name") # force refresh
```

### Configuration

#### Custom Database Connection

By default, the SDK uses an in-memory DuckDB instance. Pass a file path as `db_connection` to persist the database across sessions.

```python
from paradime.apis.metadata.client import MetadataClient

metadata_client = MetadataClient(
    bolt_client=client.bolt,
    db_connection="./metadata.duckdb",  # persistent
    cache_ttl_seconds=600               # 10 min cache
)
```

### Complete Example

```python
from paradime.client.paradime_client import Paradime

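def monitor_dbt_health(schedule_name: str) -> None:
    client = Paradime(
        api_endpoint="your-api-endpoint",
        api_key="your-api-key",
        api_secret="your-api-secret"
    )
    metadata = client.metadata

    try:
        # Overall health counts for the schedule
        dashboard = metadata.get_health_dashboard(schedule_name)
        print(f"Models: {dashboard.healthy_models}✅  {dashboard.warning_models}⚠️  {dashboard.critical_models}❌")

        # Show up to five failing models when anything is critical
        if dashboard.critical_models > 0:
            for model in metadata.get_failing_models(schedule_name)[:5]:
                print(f"  {model.name}: {model.failed_tests} failed tests")

        # execution_time is Optional, so fall back to 0.0 before formatting
        for model in metadata.get_slowest_models(schedule_name, limit=3):
            print(f"  {model.name}: {(model.execution_time or 0.0):.1f}s")

        # Sources whose freshness check did not pass; use your source freshness
        # schedule name here if it differs from the main run schedule
        stale = [s for s in metadata.get_source_freshness(schedule_name)
                 if s.freshness_status.value != "pass"]
        for source in stale[:3]:
            hours = source.hours_since_load
            age = f"{hours:.1f}h stale" if hours is not None else "load time unknown"
            print(f"  {source.source_name}.{source.table_name}: {age}")
    finally:
        # Release the DuckDB connection even if a call above raises
        metadata.close()

monitor_dbt_health("daily_production_run")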
```

### Database Schema

The Metadata SDK loads dbt artifacts into a local DuckDB database. You can query any of these tables directly via `query_sql`.

#### Core Tables

**`dbt_run_results`**

Main table containing execution results for all dbt resources (models, tests, seeds, snapshots).

| Column              | Type       | Description                            |
| ------------------- | ---------- | -------------------------------------- |
| `unique_id`         | VARCHAR    | Unique identifier for the dbt resource |
| `name`              | VARCHAR    | Resource name                          |
| `resource_type`     | VARCHAR    | `model`, `test`, `seed`, `snapshot`    |
| `status`            | VARCHAR    | `success`, `error`, `fail`, `skipped`  |
| `execution_time`    | DOUBLE     | Execution time in seconds              |
| `executed_at`       | TIMESTAMP  | When the resource was executed         |
| `schedule_name`     | VARCHAR    | Schedule identifier                    |
| `depends_on`        | VARCHAR\[] | Array of upstream dependency IDs       |
| `error_message`     | TEXT       | Error details if execution failed      |
| `schema_name`       | VARCHAR    | Database schema name                   |
| `database_name`     | VARCHAR    | Database name                          |
| `materialized_type` | VARCHAR    | Model materialization type             |
| `description`       | TEXT       | Resource description from dbt          |
| `tags`              | VARCHAR\[] | Array of dbt tags                      |
| `meta`              | JSON       | dbt meta configuration                 |
| `owner`             | VARCHAR    | Resource owner                         |
| `compiled_sql`      | TEXT       | Compiled SQL                           |
| `raw_sql`           | TEXT       | Raw SQL from dbt files                 |
| `columns`           | JSON       | Column information                     |
| `children_l1`       | VARCHAR\[] | Direct child dependencies              |
| `parents_models`    | VARCHAR\[] | Parent model dependencies              |
| `parents_sources`   | VARCHAR\[] | Parent source dependencies             |

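**`dbt_source_freshness_results`**

Freshness check results for each dbt source table, including the configured warning and error thresholds.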

| Column              | Type       | Description                |
| ------------------- | ---------- | -------------------------- |
| `unique_id`         | VARCHAR    | Unique source identifier   |
| `source_name`       | VARCHAR    | dbt source name            |
| `table_name`        | VARCHAR    | Source table name          |
| `schedule_name`     | VARCHAR    | Schedule identifier        |
| `freshness_status`  | VARCHAR    | `pass`, `warn`, `error`    |
| `max_loaded_at`     | TIMESTAMP  | Last data load timestamp   |
| `snapshotted_at`    | TIMESTAMP  | When freshness was checked |
| `hours_since_load`  | DOUBLE     | Hours since last data load |
| `error_after_hours` | INTEGER    | Error threshold in hours   |
| `warn_after_hours`  | INTEGER    | Warning threshold in hours |
| `database`          | VARCHAR    | Source database            |
| `schema_name`       | VARCHAR    | Source schema              |
| `description`       | TEXT       | Source description         |
| `meta`              | JSON       | Source metadata            |
| `tags`              | VARCHAR\[] | Source tags                |

**`model_metadata`**

Extended model metadata and lineage information from `manifest.json`.

| Column               | Type       | Description              |
| -------------------- | ---------- | ------------------------ |
| `unique_id`          | VARCHAR    | Model unique identifier  |
| `name`               | VARCHAR    | Model name               |
| `resource_type`      | VARCHAR    | Always `model`           |
| `depends_on`         | VARCHAR\[] | Direct dependencies      |
| `config`             | JSON       | dbt model configuration  |
| `tags`               | VARCHAR\[] | Model tags               |
| `meta`               | JSON       | Model metadata           |
| `description`        | TEXT       | Model description        |
| `columns`            | JSON       | Model column definitions |
| `parents`            | VARCHAR\[] | All parent resources     |
| `children`           | VARCHAR\[] | All child resources      |
| `original_file_path` | VARCHAR    | Path to dbt model file   |

#### Specialized Tables

| Table               | Description                                            |
| ------------------- | ------------------------------------------------------ |
| `dbt_test_data`     | Detailed test execution results with column-level info |
| `dbt_seed_data`     | Seed file load results                                 |
| `dbt_snapshot_data` | Snapshot execution results                             |
| `dbt_exposure_data` | dbt exposure metadata                                  |

#### Optimized Views

{% tabs %}
{% tab title="models\_with\_tests" %}
Pre-calculated model health with test counts.

```sql
SELECT unique_id, name, status, execution_time,
       total_tests, failed_tests,
       health_status  -- 'Healthy', 'Warning', 'Critical'
FROM models_with_tests
WHERE schedule_name = 'your_schedule'
```

{% endtab %}

{% tab title="latest\_model\_results" %}
Most recent result per model.

```sql
SELECT *
FROM latest_model_results
WHERE schedule_name = 'your_schedule'
ORDER BY execution_time DESC
```

{% endtab %}

{% tab title="dashboard\_metrics" %}
Aggregated metrics for dashboards.

```sql
SELECT total_models, healthy_models, warning_models,
       critical_models, avg_execution_time, test_success_rate
FROM dashboard_metrics
WHERE schedule_name = 'your_schedule'
```

{% endtab %}
{% endtabs %}

#### Example Queries

{% tabs %}
{% tab title="Failed tests" %}

```sql
SELECT name, failed_tests, total_tests,
       (failed_tests * 100.0 / total_tests) as failure_rate
FROM models_with_tests
WHERE schedule_name = ?
  AND total_tests > 0
ORDER BY failed_tests DESC
LIMIT 10
```

{% endtab %}

{% tab title="Performance by schema" %}

```sql
SELECT schema_name,
       COUNT(*) as model_count,
       AVG(execution_time) as avg_time,
       MAX(execution_time) as max_time
FROM dbt_run_results
WHERE schedule_name = ?
  AND resource_type = 'model'
GROUP BY schema_name
ORDER BY avg_time DESC
```

{% endtab %}

{% tab title="Source freshness alerts" %}

```sql
SELECT source_name, table_name, hours_since_load,
       CASE
           WHEN hours_since_load > error_after_hours THEN 'CRITICAL'
           WHEN hours_since_load > warn_after_hours  THEN 'WARNING'
           ELSE 'OK'
       END as alert_level
FROM dbt_source_freshness_results
WHERE schedule_name = ?
ORDER BY hours_since_load DESC
```

{% endtab %}

{% tab title="Downstream lineage" %}

```sql
WITH RECURSIVE downstream AS (
    SELECT unique_id, name, 0 as level
    FROM model_metadata
    WHERE name = ? AND schedule_name = ?

    UNION ALL

    SELECT m.unique_id, m.name, d.level + 1
    FROM model_metadata m
    JOIN downstream d ON d.unique_id = ANY(m.depends_on)
    WHERE d.level < 10
)
SELECT d.name, d.level, r.status, r.execution_time
FROM downstream d
LEFT JOIN dbt_run_results r ON d.unique_id = r.unique_id
WHERE d.level > 0
ORDER BY d.level, d.name
```

{% endtab %}
{% endtabs %}

### API Reference

#### MetadataClient

| Method                                                           | Description                 | Returns                       |
| ---------------------------------------------------------------- | --------------------------- | ----------------------------- |
| `get_model_health(schedule_name)`                                | Health status of all models | `List[ModelHealth]`           |
| `get_failing_models(schedule_name)`                              | Models with failed tests    | `List[ModelHealth]`           |
| `get_slowest_models(schedule_name, limit=10)`                    | Slowest running models      | `List[ModelHealth]`           |
| `get_test_results(schedule_name, failed_only=False)`             | Test results                | `List[TestResult]`            |
| `get_source_freshness(schedule_name)`                            | Source freshness status     | `List[SourceFreshness]`       |
| `get_health_dashboard(schedule_name)`                            | Aggregated health metrics   | `HealthDashboard`             |
| `get_performance_summary(schedule_name, days=7)`                 | Performance metrics         | `PerformanceMetrics`          |
| `get_upstream_health(model_name, schedule_name, max_depth=10)`   | Upstream dependencies       | `List[ModelDependency]`       |
| `get_downstream_impact(model_name, schedule_name, max_depth=10)` | Downstream impact           | `DependencyImpact`            |
| `get_model_health_stream(schedule_name, batch_size=100)`         | Stream models in batches    | `Iterator[List[ModelHealth]]` |
| `query_sql(sql, schedule_name, parameters=None)`                 | Custom SQL query            | `polars.DataFrame`            |
| `refresh_metadata(schedule_name)`                                | Force refresh from Bolt     | `None`                        |
| `clear_cache(schedule_name=None)`                                | Clear in-memory cache       | `None`                        |
| `close()`                                                        | Close the DuckDB connection | `None`                        |

#### Data Models

{% tabs %}
{% tab title="ModelHealth" %}

```python
class ModelHealth:
    unique_id: str
    name: str
    resource_type: ResourceType
    status: RunStatus            # success, error, fail, skipped
    execution_time: Optional[float]
    health_status: HealthStatus  # Healthy, Warning, Critical
    total_tests: int
    failed_tests: int
    schema_name: Optional[str]
    database_name: Optional[str]
    error_message: Optional[str]
    description: str
    tags: List[str]
    depends_on: List[str]
```

{% endtab %}

{% tab title="HealthDashboard" %}

```python
class HealthDashboard:
    schedule_name: str
    total_models: int
    healthy_models: int
    warning_models: int
    critical_models: int
    avg_execution_time: float
    total_tests: int
    failed_tests: int
    test_success_rate: float
    total_sources: int
    stale_sources: int
```

{% endtab %}

{% tab title="SourceFreshness" %}

```python
class SourceFreshness:
    unique_id: str
    source_name: str
    name: str
    table_name: str
    freshness_status: str        # pass, warn, error
    max_loaded_at: Optional[datetime]
    hours_since_load: Optional[float]
    error_after_hours: Optional[int]
    warn_after_hours: Optional[int]
```

{% endtab %}
{% endtabs %}

### Troubleshooting

#### Common Issues

{% tabs %}
{% tab title="Schedule not found" %}

List the schedules available in your workspace and check that the name you pass matches exactly:

```python
schedules = client.bolt.list_schedules()
print([s.name for s in schedules])
```

{% endtab %}

{% tab title="No sources returned" %}
Sources come from schedules running `dbt source freshness`. Make sure you're using the correct schedule name — it is likely different from your main dbt run schedule.

```python
# List all available schedules
schedules = client.bolt.list_schedules()
print([s.name for s in schedules])
```

{% endtab %}

{% tab title="Performance issues" %}
Use streaming for large datasets and tune the batch size:

```python
for batch in metadata.get_model_health_stream(schedule, batch_size=50):
    process_batch(batch)
```

Or use persistent DuckDB storage to avoid re-loading artifacts:

```python
MetadataClient(bolt_client=client.bolt, db_connection="./metadata.duckdb")
```

{% endtab %}

{% tab title="Stale data" %}

Clear the cached results for the schedule, then force a refresh from Bolt:

```python
metadata.clear_cache("schedule_name")
metadata.refresh_metadata("schedule_name")
```

{% endtab %}
{% endtabs %}

#### Error Handling

```python
try:
    models = metadata.get_model_health("schedule_name")
except ValueError as e:
    print(f"Schedule not found or no artifacts: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    metadata.close()
```
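
The same cleanup guarantee can be expressed with `contextlib.closing`, which calls `close()` on exit whether or not an exception occurred. This sketch assumes, as the example above does, that a missing schedule surfaces as `ValueError`. `FakeMetadata` is a stand-in so the snippet runs without a Paradime connection, and `safe_model_health` is a hypothetical helper.

```python
from contextlib import closing


class FakeMetadata:
    """Stand-in for the metadata client; mimics only get_model_health() and close()."""

    def __init__(self) -> None:
        self.closed = False

    def get_model_health(self, schedule_name: str):
        raise ValueError(f"no artifacts for {schedule_name}")

    def close(self) -> None:
        self.closed = True


def safe_model_health(metadata, schedule_name: str) -> list:
    """Return the model list, or [] when the schedule has no usable artifacts."""
    with closing(metadata) as md:  # close() runs even if the call raises
        try:
            return md.get_model_health(schedule_name)
        except ValueError as exc:
            print(f"Schedule not found or no artifacts: {exc}")
            return []


fake = FakeMetadata()
result = safe_model_health(fake, "missing_schedule")
```

Because the helper closes the connection, it suits one-off checks. For several calls in a row, keep a single `try`/`finally` around all of them instead.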

{% hint style="success" %}
For advanced use cases, use `query_sql` to run custom SQL directly against the DuckDB metadata database and return results as a polars DataFrame.
{% endhint %}

