Airflow

The Paradime Airflow operator allows users to orchestrate and execute actions in Paradime as DAGs. Running dbt™️ with Airflow ensures a reliable, scalable environment for models, as well as the ability to trigger models based on upstream dependencies in your data ecosystem.

Example DAG

Trigger a Bolt Schedule

In the example code below, we have an Airflow DAG to trigger a run for a Bolt schedule, we then check the status of the runID and extract the dbt™️ artefacts.

from airflow.decorators import dag  # type: ignore[import]

from paradime_dbt_provider.operators.paradime import ParadimeBoltDbtScheduleRunArtifactOperator, ParadimeBoltDbtScheduleRunOperator
from paradime_dbt_provider.sensors.paradime import ParadimeBoltDbtScheduleRunSensor

PARADIME_CONN_ID = "your_paradime_conn_id"  # Update this to your connection id
BOLT_SCHEDULE_NAME = "your_schedule_name"  # Update this to your schedule name


@dag(
    default_args={"conn_id": PARADIME_CONN_ID},
)
def run_schedule_and_download_manifest():
    # Run the schedule and return the run id as the xcom return value
    task_run_schedule = ParadimeBoltDbtScheduleRunOperator(task_id="run_schedule", schedule_name=BOLT_SCHEDULE_NAME)

    # Get the run id from the xcom return value
    run_id = "{{ task_instance.xcom_pull(task_ids='run_schedule') }}"

    # Wait for the schedule to complete before continuing
    task_wait_for_schedule = ParadimeBoltDbtScheduleRunSensor(task_id="wait_for_schedule", run_id=run_id)

    # Download the manifest.json file from the schedule run and return the path as the xcom return value
    task_download_manifest = ParadimeBoltDbtScheduleRunArtifactOperator(task_id="download_manifest", run_id=run_id, artifact_path="target/manifest.json")

    # Get the path to the manifest.json file from the xcom return value
    output_path = "{{ task_instance.xcom_pull(task_ids='download_manifest') }}"

    task_run_schedule >> task_wait_for_schedule >> task_download_manifest


run_schedule_and_download_manifest()

Trigger a Bolt Schedule with custom commands

In the example code below, we have an Airflow DAG to trigger a run for a Bolt schedule, we then override the dbt commands that this run will execute at runtime. We then check the status of the runID and extract the dbt™️ artefacts.

from airflow.decorators import dag  # type: ignore[import]

from paradime_dbt_provider.operators.paradime import ParadimeBoltDbtScheduleRunArtifactOperator, ParadimeBoltDbtScheduleRunOperator
from paradime_dbt_provider.sensors.paradime import ParadimeBoltDbtScheduleRunSensor

PARADIME_CONN_ID = "your_paradime_conn_id"  # Update this to your connection id
BOLT_SCHEDULE_NAME = "your_schedule_name"  # Update this to your schedule name


@dag(
    default_args={"conn_id": PARADIME_CONN_ID},
)
def run_schedule_with_custom_commands():
    # Define the custom commands to run
    custom_commands = ["dbt run", "dbt test"]

    # Run the schedule with custom commands and return the run id as the xcom return value
    task_run_schedule = ParadimeBoltDbtScheduleRunOperator(task_id="run_schedule", schedule_name=BOLT_SCHEDULE_NAME, commands=custom_commands)

    # Get the run id from the xcom return value
    run_id = "{{ task_instance.xcom_pull(task_ids='run_schedule') }}"

    # Wait for the schedule to complete before continuing
    task_wait_for_schedule = ParadimeBoltDbtScheduleRunSensor(task_id="wait_for_schedule", run_id=run_id)

    # Download the manifest.json file from the schedule run and return the path as the xcom return value
    task_download_manifest = ParadimeBoltDbtScheduleRunArtifactOperator(task_id="download_manifest", run_id=run_id, artifact_path="target/manifest.json")

    # Get the path to the manifest.json file from the xcom return value
    output_path = "{{ task_instance.xcom_pull(task_ids='download_manifest') }}"

    task_run_schedule >> task_wait_for_schedule >> task_download_manifest


run_schedule_with_custom_commands()

Last updated