Incremental Materialization

Incremental models allow you to update only new or modified data in your warehouse instead of rebuilding entire tables. This optimization is particularly valuable when working with:

  • Large datasets (millions/billions of rows)

  • Computationally expensive transformations

Basic Configuration

While these examples use Snowflake syntax, the core concepts apply to most data warehouses. Specific syntax and available features may vary by platform.

Here's a simple incremental model configuration:

{{
  config(
    materialized='incremental',
    unique_key='id'
  )
}}

SELECT 
    id,
    status,
    amount,
    updated_at
FROM {{ ref('stg_source') }}

{% if is_incremental() %}
    -- This filter will only be applied on an incremental run
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

Key Components

  1. Materialization Config: Set materialized='incremental' in your config block

  2. Unique Key: Define what makes each row unique (single column or multiple columns)

  3. Incremental Logic: Use the is_incremental() macro to filter for new/changed records
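
The is_incremental() macro returns true only when the model already exists in the warehouse, is configured as incremental, and the run is not a full refresh. As a result, the compiled SQL differs between the first run and later runs, roughly as follows (schema and table names here are illustrative):

-- First run (or --full-refresh): no filter, the full table is built
SELECT id, status, amount, updated_at
FROM analytics.stg_source;

-- Subsequent runs: only rows newer than the target's high-water mark
SELECT id, status, amount, updated_at
FROM analytics.stg_source
WHERE updated_at > (SELECT MAX(updated_at) FROM analytics.my_model);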


Incremental Strategies

Understanding which strategy to use is crucial for optimal performance. On Snowflake, dbt supports three main incremental strategies, each with specific use cases: merge (the default), delete+insert, and append. The merge strategy is described in detail below, and the walkthrough later in this guide tests all three.

Merge Strategy (Default)

The merge strategy uses Snowflake's MERGE statement to update existing records and insert new ones. It provides the most control over how your incremental model is updated but requires more computational resources.

When to use:

  • Tables requiring both inserts and updates

  • When you need to update specific columns

  • When data consistency is critical

Advantages:

  • Most flexible strategy

  • Precise control over column updates

  • Maintains data consistency

  • Handles complex update patterns

Trade-offs:

  • Slower than other strategies

  • More resource-intensive

  • Can be costly for very large datasets

Example Implementation:

{{
  config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id',
  )
}}

SELECT 
    id,
    status,
    amount,
    updated_at
FROM {{ ref('stg_source') }}

{% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
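
Under the hood, dbt compiles this into a Snowflake MERGE statement roughly like the following (a simplified sketch; the exact SQL varies by dbt version, and the table names are illustrative):

-- dbt stages the filtered SELECT in a temporary table, then merges it
MERGE INTO analytics.my_model AS target
USING analytics.my_model__dbt_tmp AS source
    ON target.id = source.id
WHEN MATCHED THEN UPDATE SET
    status = source.status,
    amount = source.amount,
    updated_at = source.updated_at
WHEN NOT MATCHED THEN INSERT (id, status, amount, updated_at)
    VALUES (source.id, source.status, source.amount, source.updated_at);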

Configuration Examples

This guide walks you through building and testing an incremental model with different strategies. You'll learn how to:

  • Set up an initial incremental model

  • Test how data changes are handled

  • Verify results in your warehouse

  • Try different incremental strategies

Requirements

  • Access to your data warehouse

  • Basic understanding of dbt™ models

  • dbt™ project set up

Step 1: Initial Setup and Testing

  1. First, create the staging model:

-- models/staging/stg_test_source.sql
WITH source_data AS (
    SELECT 1 as id,
           'active' as status,
           100 as amount,
           CURRENT_TIMESTAMP() as updated_at
    UNION ALL
    SELECT 2, 'pending', 200, CURRENT_TIMESTAMP()
    UNION ALL
    SELECT 3, 'active', 300, CURRENT_TIMESTAMP()
)
SELECT * FROM source_data

  2. Run staging model:

dbt run --select stg_test_source

  3. Verify staging data in Snowflake:

SELECT * FROM your_database.your_schema.stg_test_source ORDER BY id;

Expected Result:

ID | STATUS  | AMOUNT | UPDATED_AT
1  | active  | 100    | [timestamp]
2  | pending | 200    | [timestamp]
3  | active  | 300    | [timestamp]

  4. Create the incremental model:

-- models/incremental_test.sql
{{
  config(
    materialized='incremental',
    unique_key='id'
  )
}}

SELECT 
    id,
    status,
    amount,
    updated_at
FROM {{ ref('stg_test_source') }}

{% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

  5. Run incremental model:

dbt run --select incremental_test

  6. Verify incremental model in Snowflake:

SELECT * FROM your_database.your_schema.incremental_test ORDER BY id;

Expected Result: Same as staging model (3 rows)

Step 2: Testing Merge Strategy (Default)

  1. Update staging model with changes:

-- models/staging/stg_test_source.sql
WITH source_data AS (
    SELECT 1 as id,
           'inactive' as status,  -- Changed status
           150 as amount,         -- Changed amount
           CURRENT_TIMESTAMP() as updated_at
    UNION ALL
    SELECT 4, 'new', 400, CURRENT_TIMESTAMP()  -- New record
)
SELECT * FROM source_data

  2. Run staging model:

dbt run --select stg_test_source

  3. Verify staging data:

SELECT * FROM your_database.your_schema.stg_test_source ORDER BY id;

Expected Result:

ID | STATUS   | AMOUNT | UPDATED_AT
1  | inactive | 150    | [new_timestamp]
4  | new      | 400    | [new_timestamp]

  4. Run incremental model:

dbt run --select incremental_test

  5. Verify incremental model:

SELECT * FROM your_database.your_schema.incremental_test ORDER BY id;

Expected Result:

ID | STATUS   | AMOUNT | UPDATED_AT
1  | inactive | 150    | [new_timestamp]  -- Updated
2  | pending  | 200    | [old_timestamp]  -- Unchanged
3  | active   | 300    | [old_timestamp]  -- Unchanged
4  | new      | 400    | [new_timestamp]  -- New

Step 3: Testing Delete+Insert Strategy

  1. Update incremental model config (keep same staging data):

-- models/incremental_test.sql
{{
  config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='id'
  )
}}

[Rest of the model SQL stays the same]

  2. Run incremental model:

dbt run --select incremental_test

  3. Verify in Snowflake:

SELECT * FROM your_database.your_schema.incremental_test ORDER BY id;

Expected Result: the same four rows as with the merge strategy, except rows 1 and 4 carry fresh timestamps because they were deleted and re-inserted rather than updated in place.
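
For reference, delete+insert compiles to two statements roughly like the following (a simplified sketch; dbt stages the filtered SELECT in a temporary table, and the exact SQL varies by version):

-- 1. Remove target rows whose unique key appears in the new batch
DELETE FROM your_database.your_schema.incremental_test
WHERE id IN (
    SELECT id FROM your_database.your_schema.incremental_test__dbt_tmp
);

-- 2. Insert the new batch
INSERT INTO your_database.your_schema.incremental_test
SELECT id, status, amount, updated_at
FROM your_database.your_schema.incremental_test__dbt_tmp;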

Step 4: Testing Append Strategy

  1. Update incremental model config:

-- models/incremental_test.sql
{{
  config(
    materialized='incremental',
    incremental_strategy='append'
  )
}}

[Rest of the model SQL stays the same]

  2. Run the incremental model with a full refresh to give the append test a clean starting point:

dbt run --full-refresh --select incremental_test

  3. Verify initial state:

SELECT * FROM your_database.your_schema.incremental_test ORDER BY id;

Expected Result: 2 rows (ids 1 and 4, matching the current staging data)

  4. Update staging data to trigger append:

-- models/staging/stg_test_source.sql
WITH source_data AS (
    SELECT 1 as id,
           'inactive' as status,
           150 as amount,
           CURRENT_TIMESTAMP() as updated_at
    UNION ALL
    SELECT 5, 'newest', 500, CURRENT_TIMESTAMP()  -- Another new record
)
SELECT * FROM source_data

  5. Run both models:

dbt run --select stg_test_source
dbt run --select incremental_test

  6. Verify append results:

SELECT * FROM your_database.your_schema.incremental_test ORDER BY updated_at;

Expected Result: the previous rows plus the newly appended ones, including a second row for id 1. Append only inserts; it never updates or deduplicates existing records, so re-sent keys create duplicates.
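
A quick duplicate check confirms that id 1 now appears twice, once from the full refresh and once from the append run:

SELECT id, COUNT(*) AS row_count
FROM your_database.your_schema.incremental_test
GROUP BY id
HAVING COUNT(*) > 1;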


Best Practices

1. Handle Late-Arriving Data

Data doesn't always arrive in perfect chronological order due to network delays, system outages, or batch processing. Always include a buffer period in your incremental logic:

{% if is_incremental() %}
    -- Look back 3 days to catch late-arriving data
    WHERE updated_at > DATEADD(days, -3, (SELECT MAX(updated_at) FROM {{ this }}))
{% endif %}

Adjust the look-back period (3 days in this example) based on your data patterns and service level agreements (SLAs).
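
If the window needs to differ between environments, the look-back can be driven by a dbt variable instead of a hard-coded value (a sketch; lookback_days is a variable you would define in your own project):

{% if is_incremental() %}
    -- Look back a configurable number of days (defaults to 3)
    WHERE updated_at > DATEADD(days, -{{ var('lookback_days', 3) }}, (SELECT MAX(updated_at) FROM {{ this }}))
{% endif %}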

2. Manage Schema Changes

Tables evolve over time with new columns being added or modified. Use the on_schema_change configuration to handle these changes:

{{
  config(
    materialized='incremental',
    on_schema_change='sync_all_columns'  -- Handles column changes automatically
  )
}}

Options explained:

  • sync_all_columns: Adds new columns and drops removed ones automatically (recommended)

  • append_new_columns: Adds new columns but keeps columns removed from the model

  • fail: Halts execution when the schema changes (useful during development)

  • ignore: Maintains the existing schema (the default; use cautiously)

3. Optimize Performance

Use appropriate configurations to improve query performance and efficiency:

{{
  config(
    materialized='incremental',
    unique_key='id',
    cluster_by=['date_field', 'customer_id'],  -- Improve query performance
    merge_update_columns=['status', 'amount']  -- Minimize update overhead
  )
}}
  • cluster_by: Organizes data physically for faster filtering

  • merge_update_columns: Specifies which columns to update, reducing overhead

4. Regular Refreshes

Inconsistencies can accumulate over time, for example from late data that falls outside your look-back window or logic changes that only apply to new rows. Periodically rebuild the entire table with a full refresh:

dbt run --full-refresh --select model_name
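
For example, a weekly scheduled job can handle this automatically (an illustrative cron entry; adjust the project path and schedule to your environment):

# Every Sunday at 02:00, rebuild the model from scratch
0 2 * * 0  cd /path/to/dbt_project && dbt run --full-refresh --select model_name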

Advanced Features

1. Multiple Column Unique Keys

When a single column isn't enough to identify unique records:

{{
  config(
    materialized='incremental',
    unique_key=['customer_id', 'order_date']
  )
}}

This is useful for scenarios like daily customer metrics where uniqueness depends on both customer and date.
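
A sketch of such a model (stg_orders is an assumed staging model; the >= filter re-processes the latest day so intraday updates are caught, and the composite key prevents duplicates):

{{
  config(
    materialized='incremental',
    unique_key=['customer_id', 'order_date']
  )
}}

SELECT
    customer_id,
    order_date,
    SUM(amount) AS daily_total
FROM {{ ref('stg_orders') }}

{% if is_incremental() %}
    WHERE order_date >= (SELECT MAX(order_date) FROM {{ this }})
{% endif %}

GROUP BY customer_id, order_date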

2. Custom Update Columns

Control exactly which columns should be updated during merge operations:

{{
  config(
    materialized='incremental',
    merge_update_columns=['status', 'amount', 'updated_at']  -- Only update these
  )
}}

This is particularly useful when you want to preserve certain column values while updating others.
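
For instance, to let merges refresh a record's status and amount while never overwriting its original created_at (a sketch; column names are illustrative):

{{
  config(
    materialized='incremental',
    unique_key='id',
    merge_update_columns=['status', 'amount', 'updated_at']
  )
}}

SELECT
    id,
    status,
    amount,
    created_at,  -- set when the row is first inserted, untouched on updates
    updated_at
FROM {{ ref('stg_source') }}

{% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}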

Remember: While these examples use Snowflake syntax, the core concepts apply to most data warehouses. Specific syntax and available features may vary by platform.
