# Metadata

## Overview

The Paradime **Python SDK Metadata** module helps you query and analyze **dbt™ metadata** from Bolt runs.

Use it to monitor **model health**, analyze **dbt test results**, track **source freshness**, and explore **upstream/downstream lineage**. It uses a local **DuckDB** database and returns results as high-performance **polars DataFrames**.

* Query dbt artifacts like `manifest.json`, `run_results.json`, and source freshness results
* Build reliability checks around failing models, failing tests, and stale sources
* Run ad-hoc SQL with `query_sql()` against the DuckDB metadata schema

{% hint style="info" %}
The Metadata SDK is available in Python SDK version 4.18.0+ and requires a Paradime account with access to dbt metadata.
{% endhint %}

### Quick Start

#### Basic Setup

```python
from paradime.client.paradime_client import Paradime

client = Paradime(
    api_endpoint="your-api-endpoint",
    api_key="your-api-key",
    api_secret="your-api-secret"
)

metadata = client.metadata
```

#### Simple Health Check

```python
models = metadata.get_model_health("daily_production_run")

successful = len([m for m in models if m.status.value == "success"])
print(f"✅ {successful}/{len(models)} models successful")
```

### Model Health Monitoring

#### Get Model Health Status

Monitor the health of all models in a dbt run, including execution status, test results, and performance metrics.

```python
models = metadata.get_model_health("schedule_name")

for model in models:
    print(f"{model.name}")
    print(f"  Status: {model.status.value}")
    print(f"  Health: {model.health_status.value}")
    print(f"  Tests: {model.total_tests} total, {model.failed_tests} failed")
    print(f"  Execution: {model.execution_time}s")
```
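
It is often useful to roll the per-model loop above into summary counts. The sketch below uses plain dicts in place of `ModelHealth` objects; the field names mirror the SDK attributes shown above, but the data is illustrative.

```python
from collections import Counter

# Plain dicts stand in for ModelHealth objects (illustrative data).
models = [
    {"name": "stg_orders", "health_status": "Healthy"},
    {"name": "fct_sales", "health_status": "Warning"},
    {"name": "dim_users", "health_status": "Critical"},
    {"name": "stg_payments", "health_status": "Healthy"},
]

# Tally models per health bucket, mirroring the loop above.
counts = Counter(m["health_status"] for m in models)
print(f"Healthy: {counts['Healthy']}, "
      f"Warning: {counts['Warning']}, "
      f"Critical: {counts['Critical']}")
```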

#### Filter by Health Status

{% tabs %}
{% tab title="Failing models" %}

```python
failing_models = metadata.get_failing_models("schedule_name")

for model in failing_models:
    print(f"{model.name} — {model.failed_tests} failed tests")
    if model.error_message:
        print(f"  Error: {model.error_message}")
```

{% endtab %}

{% tab title="Slowest models" %}

```python
slowest = metadata.get_slowest_models("schedule_name", limit=10)

for model in slowest:
    print(f"{model.name}: {model.execution_time}s")
```

{% endtab %}
{% endtabs %}

### Test Results

```python
# All test results
tests = metadata.get_test_results("schedule_name")

# Failed tests only
failed_tests = metadata.get_test_results("schedule_name", failed_only=True)

for test in failed_tests:
    print(f"❌ {test.test_name}")
    print(f"   Status: {test.status.value}")
    print(f"   Models: {test.depends_on_nodes}")
```
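
Because each test lists the nodes it depends on, failed tests can be grouped by model. The sketch below uses plain dicts in place of `TestResult` objects, with field names following the attributes shown above; the model IDs are hypothetical.

```python
from collections import defaultdict

# Stand-ins for TestResult objects (illustrative data, hypothetical IDs).
failed_tests = [
    {"test_name": "not_null_orders_id", "depends_on_nodes": ["model.jaffle.orders"]},
    {"test_name": "unique_orders_id", "depends_on_nodes": ["model.jaffle.orders"]},
    {"test_name": "accepted_values_status", "depends_on_nodes": ["model.jaffle.payments"]},
]

# Index failing tests by the model(s) they test.
tests_by_model = defaultdict(list)
for test in failed_tests:
    for node in test["depends_on_nodes"]:
        tests_by_model[node].append(test["test_name"])

for model, names in sorted(tests_by_model.items()):
    print(f"{model}: {len(names)} failing ({', '.join(names)})")
```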

### Source Freshness

```python
sources = metadata.get_source_freshness("schedule_name")

for source in sources:
    print(f"{source.source_name}.{source.table_name}")
    print(f"  Status: {source.freshness_status.value}")
    print(f"  Last loaded: {source.max_loaded_at}")
    print(f"  Hours since load: {source.hours_since_load}")
```

{% hint style="warning" %}
Sources are loaded from a dedicated source freshness schedule (e.g. one running `dbt source freshness`), not from your main dbt run schedule.
{% endhint %}
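
The `hours_since_load` value and the warn/error thresholds combine as in the sketch below. The comparison order (error first, then warn) mirrors dbt's freshness semantics, but treat the exact evaluation as an assumption rather than the SDK's documented logic.

```python
from datetime import datetime, timedelta, timezone

def alert_level(max_loaded_at: datetime,
                warn_after_hours: int,
                error_after_hours: int,
                now: datetime) -> str:
    """Classify a source against its freshness thresholds (sketch)."""
    hours_since_load = (now - max_loaded_at).total_seconds() / 3600
    if hours_since_load > error_after_hours:
        return "error"
    if hours_since_load > warn_after_hours:
        return "warn"
    return "pass"

now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
loaded = now - timedelta(hours=30)
print(alert_level(loaded, warn_after_hours=24, error_after_hours=48, now=now))  # warn
```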

### Custom SQL Queries

The metadata SDK exposes a DuckDB database you can query directly using SQL. Results are returned as polars DataFrames.

```python
sql = """
    SELECT
        schema_name,
        COUNT(*) as model_count,
        AVG(execution_time) as avg_execution_time
    FROM dbt_run_results
    WHERE schedule_name = ? AND resource_type = 'model'
    GROUP BY schema_name
    ORDER BY model_count DESC
"""

# The second argument selects which schedule's metadata to load;
# the parameter list binds the `?` placeholder in the SQL above.
results = metadata.query_sql(sql, "schedule_name", ["schedule_name"])

for row in results.iter_rows(named=True):
    print(f"{row['schema_name']}: {row['model_count']} models")
```

{% hint style="info" %}
`query_sql` returns a `polars.DataFrame`. Use `.iter_rows(named=True)` to iterate with dict-style row access.
{% endhint %}
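
`query_sql` binds positional `?` placeholders, the same style as Python's DB-API. The runnable sketch below demonstrates the pattern with the standard-library `sqlite3` module standing in for the SDK's DuckDB connection; the table schema is a simplified, illustrative subset of `dbt_run_results`.

```python
import sqlite3

# In-memory table standing in for dbt_run_results (illustrative subset).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dbt_run_results (
        schedule_name TEXT, resource_type TEXT,
        schema_name TEXT, execution_time REAL
    )
""")
conn.executemany(
    "INSERT INTO dbt_run_results VALUES (?, ?, ?, ?)",
    [
        ("daily", "model", "staging", 2.0),
        ("daily", "model", "staging", 4.0),
        ("daily", "model", "marts", 10.0),
        ("daily", "test", "marts", 0.5),  # excluded by the resource_type filter
    ],
)

# Same shape as the query_sql call above: `?` binds the schedule name.
rows = conn.execute("""
    SELECT schema_name, COUNT(*) AS model_count, AVG(execution_time) AS avg_time
    FROM dbt_run_results
    WHERE schedule_name = ? AND resource_type = 'model'
    GROUP BY schema_name
    ORDER BY model_count DESC
""", ["daily"]).fetchall()

for schema_name, model_count, avg_time in rows:
    print(f"{schema_name}: {model_count} models, {avg_time:.1f}s avg")
```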

### Dependency Analysis

#### Upstream Dependencies

```python
dependencies = metadata.get_upstream_health(
    model_name="my_important_model",
    schedule_name="schedule_name",
    max_depth=5
)

for dep in dependencies:
    print(f"Level {dep.level}: {dep.name}")
    print(f"  Health: {dep.health_status.value}")
    print(f"  Status: {dep.status.value}")
```

#### Downstream Impact

```python
impact = metadata.get_downstream_impact(
    model_name="failed_model",
    schedule_name="schedule_name",
    max_depth=5
)

print(f"Impact of {impact.failed_model} failure:")
print(f"  Critical: {len(impact.critical_models)}")
print(f"  Warning: {len(impact.warning_models)}")
print(f"  Potentially affected: {len(impact.potentially_affected)}")
```
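
Conceptually, downstream impact is a graph traversal bounded by `max_depth`. The sketch below walks a plain adjacency dict (model → direct children) breadth-first; the model names are hypothetical and the SDK's internals may differ.

```python
from collections import deque

# Hypothetical child map: model -> direct downstream models.
children = {
    "failed_model": ["fct_orders", "fct_revenue"],
    "fct_orders": ["dashboard_orders"],
    "fct_revenue": [],
    "dashboard_orders": [],
}

def downstream(start: str, max_depth: int) -> list[tuple[str, int]]:
    """Breadth-first walk of downstream models, up to max_depth levels."""
    seen = {start}
    queue = deque([(start, 0)])
    out = []
    while queue:
        node, level = queue.popleft()
        if level > 0:
            out.append((node, level))
        if level < max_depth:
            for child in children.get(node, []):
                if child not in seen:
                    seen.add(child)
                    queue.append((child, level + 1))
    return out

print(downstream("failed_model", max_depth=5))
```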

### Health Dashboard

```python
dashboard = metadata.get_health_dashboard("schedule_name")

print(f"Models — Total: {dashboard.total_models}")
print(f"  Healthy: {dashboard.healthy_models}")
print(f"  Warning: {dashboard.warning_models}")
print(f"  Critical: {dashboard.critical_models}")
print(f"Avg execution: {dashboard.avg_execution_time:.1f}s")
print(f"Test success rate: {dashboard.test_success_rate:.1f}%")
```

### Advanced Features

#### Performance Summary

```python
perf = metadata.get_performance_summary("schedule_name", days=7)

print(f"Avg execution: {perf.average_execution_time:.1f}s")
print(f"Success rate: {perf.success_rate:.1f}%")

for model in perf.slowest_models[:5]:
    print(f"  {model['name']}: {model['execution_time']:.1f}s")
```

#### Streaming Large Datasets

For large metadata datasets, stream results in batches to avoid memory pressure.

```python
for batch in metadata.get_model_health_stream("schedule_name", batch_size=100):
    for model in batch:
        if model.health_status.value == "Critical":
            print(f"Critical: {model.name}")
```
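
The streaming pattern generalizes to any iterable: a generator that yields fixed-size batches keeps only one batch in memory at a time. A minimal stdlib sketch (the SDK's internals may differ):

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batched(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most batch_size items."""
    it = iter(items)
    while batch := list(islice(it, batch_size)):
        yield batch

batches = list(batched(range(7), batch_size=3))
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```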

#### Cache Management

```python
metadata.clear_cache()                      # clear all
metadata.clear_cache("schedule_name")       # clear one schedule
metadata.refresh_metadata("schedule_name")  # force refresh
```

### Configuration

#### Custom Database Connection

By default the SDK uses an in-memory DuckDB instance. Use a file path for persistent storage across sessions.

```python
from paradime.apis.metadata.client import MetadataClient

metadata_client = MetadataClient(
    bolt_client=client.bolt,
    db_connection="./metadata.duckdb",  # persistent
    cache_ttl_seconds=600               # 10 min cache
)
```

### Complete Example

```python
from paradime.client.paradime_client import Paradime

def monitor_dbt_health(schedule_name: str) -> None:
    client = Paradime(
        api_endpoint="your-api-endpoint",
        api_key="your-api-key",
        api_secret="your-api-secret"
    )
    metadata = client.metadata

    dashboard = metadata.get_health_dashboard(schedule_name)
    print(f"Models: {dashboard.healthy_models}✅  {dashboard.warning_models}⚠️  {dashboard.critical_models}❌")

    if dashboard.critical_models > 0:
        for model in metadata.get_failing_models(schedule_name)[:5]:
            print(f"  {model.name}: {model.failed_tests} failed tests")

    for model in metadata.get_slowest_models(schedule_name, limit=3):
        print(f"  {model.name}: {model.execution_time:.1f}s")

    stale = [s for s in metadata.get_source_freshness(schedule_name)
             if s.freshness_status.value != "pass"]
    for source in stale[:3]:
        print(f"  {source.source_name}.{source.table_name}: {source.hours_since_load:.1f}h stale")

    metadata.close()

monitor_dbt_health("daily_production_run")
```

### Database Schema

The Metadata SDK loads dbt artifacts into a local DuckDB database. You can query any of these tables directly via `query_sql`.

#### Core Tables

**`dbt_run_results`**

Main table containing execution results for all dbt resources (models, tests, seeds, snapshots).

| Column              | Type       | Description                            |
| ------------------- | ---------- | -------------------------------------- |
| `unique_id`         | VARCHAR    | Unique identifier for the dbt resource |
| `name`              | VARCHAR    | Resource name                          |
| `resource_type`     | VARCHAR    | `model`, `test`, `seed`, `snapshot`    |
| `status`            | VARCHAR    | `success`, `error`, `fail`, `skipped`  |
| `execution_time`    | DOUBLE     | Execution time in seconds              |
| `executed_at`       | TIMESTAMP  | When the resource was executed         |
| `schedule_name`     | VARCHAR    | Schedule identifier                    |
| `depends_on`        | VARCHAR\[] | Array of upstream dependency IDs       |
| `error_message`     | TEXT       | Error details if execution failed      |
| `schema_name`       | VARCHAR    | Database schema name                   |
| `database_name`     | VARCHAR    | Database name                          |
| `materialized_type` | VARCHAR    | Model materialization type             |
| `description`       | TEXT       | Resource description from dbt          |
| `tags`              | VARCHAR\[] | Array of dbt tags                      |
| `meta`              | JSON       | dbt meta configuration                 |
| `owner`             | VARCHAR    | Resource owner                         |
| `compiled_sql`      | TEXT       | Compiled SQL                           |
| `raw_sql`           | TEXT       | Raw SQL from dbt files                 |
| `columns`           | JSON       | Column information                     |
| `children_l1`       | VARCHAR\[] | Direct child dependencies              |
| `parents_models`    | VARCHAR\[] | Parent model dependencies              |
| `parents_sources`   | VARCHAR\[] | Parent source dependencies             |

**`dbt_source_freshness_results`**

| Column              | Type       | Description                |
| ------------------- | ---------- | -------------------------- |
| `unique_id`         | VARCHAR    | Unique source identifier   |
| `source_name`       | VARCHAR    | dbt source name            |
| `table_name`        | VARCHAR    | Source table name          |
| `schedule_name`     | VARCHAR    | Schedule identifier        |
| `freshness_status`  | VARCHAR    | `pass`, `warn`, `error`    |
| `max_loaded_at`     | TIMESTAMP  | Last data load timestamp   |
| `snapshotted_at`    | TIMESTAMP  | When freshness was checked |
| `hours_since_load`  | DOUBLE     | Hours since last data load |
| `error_after_hours` | INTEGER    | Error threshold in hours   |
| `warn_after_hours`  | INTEGER    | Warning threshold in hours |
| `database`          | VARCHAR    | Source database            |
| `schema_name`       | VARCHAR    | Source schema              |
| `description`       | TEXT       | Source description         |
| `meta`              | JSON       | Source metadata            |
| `tags`              | VARCHAR\[] | Source tags                |

**`model_metadata`**

Extended model metadata and lineage information from `manifest.json`.

| Column               | Type       | Description              |
| -------------------- | ---------- | ------------------------ |
| `unique_id`          | VARCHAR    | Model unique identifier  |
| `name`               | VARCHAR    | Model name               |
| `resource_type`      | VARCHAR    | Always `model`           |
| `depends_on`         | VARCHAR\[] | Direct dependencies      |
| `config`             | JSON       | dbt model configuration  |
| `tags`               | VARCHAR\[] | Model tags               |
| `meta`               | JSON       | Model metadata           |
| `description`        | TEXT       | Model description        |
| `columns`            | JSON       | Model column definitions |
| `parents`            | VARCHAR\[] | All parent resources     |
| `children`           | VARCHAR\[] | All child resources      |
| `original_file_path` | VARCHAR    | Path to dbt model file   |

#### Specialized Tables

| Table               | Description                                            |
| ------------------- | ------------------------------------------------------ |
| `dbt_test_data`     | Detailed test execution results with column-level info |
| `dbt_seed_data`     | Seed file load results                                 |
| `dbt_snapshot_data` | Snapshot execution results                             |
| `dbt_exposure_data` | dbt exposure metadata                                  |

#### Optimized Views

{% tabs %}
{% tab title="models\_with\_tests" %}
Pre-calculated model health with test counts.

```sql
SELECT unique_id, name, status, execution_time,
       total_tests, failed_tests,
       health_status  -- 'Healthy', 'Warning', 'Critical'
FROM models_with_tests
WHERE schedule_name = 'your_schedule'
```

{% endtab %}

{% tab title="latest\_model\_results" %}
Most recent result per model.

```sql
SELECT *
FROM latest_model_results
WHERE schedule_name = 'your_schedule'
ORDER BY execution_time DESC
```

{% endtab %}

{% tab title="dashboard\_metrics" %}
Aggregated metrics for dashboards.

```sql
SELECT total_models, healthy_models, warning_models,
       critical_models, avg_execution_time, test_success_rate
FROM dashboard_metrics
WHERE schedule_name = 'your_schedule'
```

{% endtab %}
{% endtabs %}

#### Example Queries

{% tabs %}
{% tab title="Failed tests" %}

```sql
SELECT name, failed_tests, total_tests,
       (failed_tests * 100.0 / total_tests) as failure_rate
FROM models_with_tests
WHERE schedule_name = ?
  AND total_tests > 0
ORDER BY failed_tests DESC
LIMIT 10
```

{% endtab %}

{% tab title="Performance by schema" %}

```sql
SELECT schema_name,
       COUNT(*) as model_count,
       AVG(execution_time) as avg_time,
       MAX(execution_time) as max_time
FROM dbt_run_results
WHERE schedule_name = ?
  AND resource_type = 'model'
GROUP BY schema_name
ORDER BY avg_time DESC
```

{% endtab %}

{% tab title="Source freshness alerts" %}

```sql
SELECT source_name, table_name, hours_since_load,
       CASE
           WHEN hours_since_load > error_after_hours THEN 'CRITICAL'
           WHEN hours_since_load > warn_after_hours  THEN 'WARNING'
           ELSE 'OK'
       END as alert_level
FROM dbt_source_freshness_results
WHERE schedule_name = ?
ORDER BY hours_since_load DESC
```

{% endtab %}

{% tab title="Downstream lineage" %}

```sql
WITH RECURSIVE downstream AS (
    SELECT unique_id, name, 0 as level
    FROM model_metadata
    WHERE name = ? AND schedule_name = ?

    UNION ALL

    SELECT m.unique_id, m.name, d.level + 1
    FROM model_metadata m
    JOIN downstream d ON list_contains(m.depends_on, d.unique_id)
    WHERE d.level < 10
)
SELECT d.name, d.level, r.status, r.execution_time
FROM downstream d
LEFT JOIN dbt_run_results r ON d.unique_id = r.unique_id
WHERE d.level > 0
ORDER BY d.level, d.name
```

{% endtab %}
{% endtabs %}

### API Reference

#### MetadataClient

| Method                                                           | Description                 | Returns                       |
| ---------------------------------------------------------------- | --------------------------- | ----------------------------- |
| `get_model_health(schedule_name)`                                | Health status of all models | `List[ModelHealth]`           |
| `get_failing_models(schedule_name)`                              | Models with failed tests    | `List[ModelHealth]`           |
| `get_slowest_models(schedule_name, limit=10)`                    | Slowest running models      | `List[ModelHealth]`           |
| `get_test_results(schedule_name, failed_only=False)`             | Test results                | `List[TestResult]`            |
| `get_source_freshness(schedule_name)`                            | Source freshness status     | `List[SourceFreshness]`       |
| `get_health_dashboard(schedule_name)`                            | Aggregated health metrics   | `HealthDashboard`             |
| `get_performance_summary(schedule_name, days=7)`                 | Performance metrics         | `PerformanceMetrics`          |
| `get_upstream_health(model_name, schedule_name, max_depth=10)`   | Upstream dependencies       | `List[ModelDependency]`       |
| `get_downstream_impact(model_name, schedule_name, max_depth=10)` | Downstream impact           | `DependencyImpact`            |
| `get_model_health_stream(schedule_name, batch_size=100)`         | Stream models in batches    | `Iterator[List[ModelHealth]]` |
| `query_sql(sql, schedule_name, parameters=None)`                 | Custom SQL query            | `polars.DataFrame`            |
| `refresh_metadata(schedule_name)`                                | Force refresh from Bolt     | `None`                        |
| `clear_cache(schedule_name=None)`                                | Clear in-memory cache       | `None`                        |
| `close()`                                                        | Close the DuckDB connection | `None`                        |

#### Data Models

{% tabs %}
{% tab title="ModelHealth" %}

```python
class ModelHealth:
    unique_id: str
    name: str
    resource_type: ResourceType
    status: RunStatus            # success, error, fail, skipped
    execution_time: Optional[float]
    health_status: HealthStatus  # Healthy, Warning, Critical
    total_tests: int
    failed_tests: int
    schema_name: Optional[str]
    database_name: Optional[str]
    error_message: Optional[str]
    description: str
    tags: List[str]
    depends_on: List[str]
```

{% endtab %}

{% tab title="HealthDashboard" %}

```python
class HealthDashboard:
    schedule_name: str
    total_models: int
    healthy_models: int
    warning_models: int
    critical_models: int
    avg_execution_time: float
    total_tests: int
    failed_tests: int
    test_success_rate: float
    total_sources: int
    stale_sources: int
```

{% endtab %}

{% tab title="SourceFreshness" %}

```python
class SourceFreshness:
    unique_id: str
    source_name: str
    name: str
    table_name: str
    freshness_status: str        # pass, warn, error
    max_loaded_at: Optional[datetime]
    hours_since_load: Optional[float]
    error_after_hours: Optional[int]
    warn_after_hours: Optional[int]
```

{% endtab %}
{% endtabs %}

### Troubleshooting

#### Common Issues

{% tabs %}
{% tab title="Schedule not found" %}

```python
schedules = client.bolt.list_schedules()
print([s.name for s in schedules])
```

{% endtab %}

{% tab title="No sources returned" %}
Sources come from schedules running `dbt source freshness`. Make sure you're using the correct schedule name — it is likely different from your main dbt run schedule.

```python
# List all available schedules
schedules = client.bolt.list_schedules()
print([s.name for s in schedules])
```

{% endtab %}

{% tab title="Performance issues" %}
Use streaming for large datasets and tune the batch size:

```python
for batch in metadata.get_model_health_stream(schedule, batch_size=50):
    process_batch(batch)
```

Or use persistent DuckDB storage to avoid re-loading artifacts:

```python
MetadataClient(bolt_client=client.bolt, db_connection="./metadata.duckdb")
```

{% endtab %}

{% tab title="Stale data" %}

```python
metadata.clear_cache("schedule_name")
metadata.refresh_metadata("schedule_name")
```

{% endtab %}
{% endtabs %}

#### Error Handling

```python
try:
    models = metadata.get_model_health("schedule_name")
except ValueError as e:
    print(f"Schedule not found or no artifacts: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    metadata.close()
```

{% hint style="success" %}
For advanced use cases, use `query_sql` to run custom SQL directly against the DuckDB metadata database and return results as a polars DataFrame.
{% endhint %}
