Using Merge for Incremental Models
The merge method uses your database's MERGE statement (or equivalent) to update existing records and insert new ones. This method provides the most control over how your incremental model is updated but requires more computational resources.
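Conceptually, each incremental run stages the new rows and then merges them into the existing table on the unique key. The statement below is only an illustrative sketch with placeholder table and column names; the actual SQL dbt generates varies by adapter:
MERGE INTO analytics.my_model AS dest              -- the existing incremental table
USING analytics.my_model__dbt_tmp AS src           -- the batch of new/changed rows
    ON dest.id = src.id                            -- join on the configured unique_key
WHEN MATCHED THEN UPDATE SET                       -- matching rows are updated
    status = src.status,
    amount = src.amount
WHEN NOT MATCHED THEN INSERT (id, status, amount)  -- everything else is inserted
    VALUES (src.id, src.status, src.amount);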
When to Use the Merge Method
Tables requiring both inserts and updates
When data consistency is critical
When specific columns need updating
For maintaining referential integrity
Advantages and Trade-offs
Advantages | Trade-offs
Most flexible method | Slower than other methods
Precise control over column updates | More resource-intensive
Maintains data consistency | Can be costly for very large datasets
Handles complex update patterns | Requires a unique key (a composite-key example follows below)
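If a single column doesn't uniquely identify a row, unique_key also accepts a list of columns. The column names below are hypothetical:
{{
  config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key=['order_id', 'order_line_number']
  )
}}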
Example Implementation
{{
  config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge'
  )
}}
SELECT
    event_id,
    event_type,
    event_time,
    user_id,
    properties
FROM {{ ref('stg_events') }}
{% if is_incremental() %}
WHERE event_time > (SELECT MAX(event_time) FROM {{ this }})
{% endif %}
In this example:
unique_key='event_id' identifies which records should be updated.
incremental_strategy='merge' is the default strategy on adapters that support MERGE (such as Snowflake), so it can be omitted.
The WHERE clause in the is_incremental() block filters for new records.
Existing records with matching event_id values will be updated; new records will be inserted.
Fine-Tuning Merge Operations
You can precisely control which columns are updated during merge operations:
{{
  config(
    materialized='incremental',
    unique_key='user_id',
    incremental_strategy='merge',
    merge_update_columns=['email', 'last_login_at', 'profile_updated_at']
  )
}}
With merge_update_columns, only the listed columns are updated when a key matches; everything else is left untouched. The inverse option, merge_exclude_columns=['created_at', 'signup_source'], updates every column except those listed. The two settings are mutually exclusive, so configure only one of them. Either way, this gives you fine-grained control over the merge process.
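Under the hood, a configuration like the one above restricts the UPDATE branch of the generated MERGE to the listed columns. The sketch below is illustrative only (the dim_users table name is made up), but it shows the shape of the statement:
MERGE INTO analytics.dim_users AS dest
USING analytics.dim_users__dbt_tmp AS src
    ON dest.user_id = src.user_id
WHEN MATCHED THEN UPDATE SET               -- only the columns in merge_update_columns
    email = src.email,
    last_login_at = src.last_login_at,
    profile_updated_at = src.profile_updated_at
WHEN NOT MATCHED THEN INSERT (user_id, email, last_login_at, profile_updated_at, created_at, signup_source)
    VALUES (src.user_id, src.email, src.last_login_at, src.profile_updated_at, src.created_at, src.signup_source);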
Step-by-Step Guide: Merge Method Implementation
Let's see how the merge method works with a practical example.
Step 1: Initial Setup and Testing
First, create the staging model:
-- models/staging/stg_test_source.sql
WITH source_data AS (
    SELECT 1 as id,
           'active' as status,
           100 as amount,
           CURRENT_TIMESTAMP() as updated_at
    UNION ALL
    SELECT 2, 'pending', 200, CURRENT_TIMESTAMP()
    UNION ALL
    SELECT 3, 'active', 300, CURRENT_TIMESTAMP()
)
SELECT * FROM source_data
Run staging model:
dbt run --select stg_test_source
Verify staging data in Snowflake:
SELECT *
FROM your_database.your_schema.stg_test_source
ORDER BY id;
Expected Result:
ID | STATUS | AMOUNT | UPDATED_AT
1 | active | 100 | [timestamp]
2 | pending | 200 | [timestamp]
3 | active | 300 | [timestamp]
Create the incremental model:
-- models/incremental_test.sql
{{
  config(
    materialized='incremental',
    unique_key='id'
  )
}}
SELECT
    id,
    status,
    amount,
    updated_at
FROM {{ ref('stg_test_source') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
Run incremental model:
dbt run --select incremental_test
The result should match the staging model (3 rows).
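To confirm, query the incremental model the same way you verified the staging model (adjust the database and schema names to your environment):
SELECT *
FROM your_database.your_schema.incremental_test
ORDER BY id;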
Step 2: Testing Merge Method
Update staging model with changes:
-- models/staging/stg_test_source.sql
WITH source_data AS (
    SELECT 1 as id,
           'inactive' as status,            -- Changed status
           150 as amount,                   -- Changed amount
           CURRENT_TIMESTAMP() as updated_at
    UNION ALL
    SELECT 4, 'new', 400, CURRENT_TIMESTAMP()  -- New record
)
SELECT * FROM source_data
Run staging model:
dbt run --select stg_test_source
Verify staging data:
SELECT *
FROM your_database.your_schema.stg_test_source
ORDER BY id;
Expected Result:
ID | STATUS | AMOUNT | UPDATED_AT
1 | inactive | 150 | [new_timestamp]
4 | new | 400 | [new_timestamp]
Run incremental model:
dbt run --select incremental_test
Verify incremental model:
SELECT *
FROM your_database.your_schema.incremental_test
ORDER BY id;
Expected Result:
ID | STATUS | AMOUNT | UPDATED_AT
1 | inactive | 150 | [new_timestamp] -- Updated
2 | pending | 200 | [old_timestamp] -- Unchanged
3 | active | 300 | [old_timestamp] -- Unchanged
4 | new | 400 | [new_timestamp] -- New
Notice that the record with ID 1 was updated with the new status and amount, while the new record with ID 4 was inserted. Records with IDs 2 and 3 remained unchanged because they weren't in the latest staging data.
Common Issues and Solutions
Merge performance with large tables: use incremental_predicates to limit the scope of the merge operation (see the sketch after this list).
Unexpected column updates: use merge_update_columns or merge_exclude_columns for precise control.
Duplicate key errors: ensure your unique_key truly identifies records uniquely.
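A minimal sketch of incremental_predicates, assuming Snowflake: dbt adds each predicate string to the matching condition of the generated MERGE, and DBT_INTERNAL_DEST is the alias dbt gives the existing table. The 7-day window here is just an illustration:
{{
  config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge',
    incremental_predicates=[
      "DBT_INTERNAL_DEST.event_time > dateadd(day, -7, current_date)"
    ]
  )
}}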
The merge method offers the most flexibility for incremental models, but comes with higher computational costs. Consider it your default choice unless you have specific performance requirements or data characteristics that favor another method.