The Paradime Airflow operator allows users to orchestrate and execute actions in Paradime as DAGs. Running dbt™️ with Airflow ensures a reliable, scalable environment for models, as well as the ability to trigger models based on upstream dependencies in your data ecosystem.
In the example code below, we have an Airflow DAG to trigger a run for a Bolt schedule, we then check the status of the runID and extract the dbt™️ artefacts.
from airflow.decorators import dag # type: ignore[import]from paradime_dbt_provider.operators.paradime import ParadimeBoltDbtScheduleRunArtifactOperator, ParadimeBoltDbtScheduleRunOperator
from paradime_dbt_provider.sensors.paradime import ParadimeBoltDbtScheduleRunSensorPARADIME_CONN_ID ="your_paradime_conn_id"# Update this to your connection idBOLT_SCHEDULE_NAME ="your_schedule_name"# Update this to your schedule name@dag( default_args={"conn_id": PARADIME_CONN_ID},)defrun_schedule_and_download_manifest():# Run the schedule and return the run id as the xcom return value task_run_schedule =ParadimeBoltDbtScheduleRunOperator(task_id="run_schedule", schedule_name=BOLT_SCHEDULE_NAME)# Get the run id from the xcom return value run_id ="{{ task_instance.xcom_pull(task_ids='run_schedule') }}"# Wait for the schedule to complete before continuing task_wait_for_schedule =ParadimeBoltDbtScheduleRunSensor(task_id="wait_for_schedule", run_id=run_id)# Download the manifest.json file from the schedule run and return the path as the xcom return value task_download_manifest = ParadimeBoltDbtScheduleRunArtifactOperator(task_id="download_manifest", run_id=run_id, artifact_path="target/manifest.json")
# Get the path to the manifest.json file from the xcom return value output_path ="{{ task_instance.xcom_pull(task_ids='download_manifest') }}" task_run_schedule >> task_wait_for_schedule >> task_download_manifestrun_schedule_and_download_manifest()
Trigger a Bolt Schedule with custom commands
In the example code below, we have an Airflow DAG to trigger a run for a Bolt schedule, we then override the dbt commands that this run will execute at runtime. We then check the status of the runID and extract the dbt™️ artefacts.
from airflow.decorators import dag # type: ignore[import]from paradime_dbt_provider.operators.paradime import ParadimeBoltDbtScheduleRunArtifactOperator, ParadimeBoltDbtScheduleRunOperator
from paradime_dbt_provider.sensors.paradime import ParadimeBoltDbtScheduleRunSensorPARADIME_CONN_ID ="your_paradime_conn_id"# Update this to your connection idBOLT_SCHEDULE_NAME ="your_schedule_name"# Update this to your schedule name@dag( default_args={"conn_id": PARADIME_CONN_ID},)defrun_schedule_with_custom_commands():# Define the custom commands to run custom_commands = ["dbt run","dbt test"]# Run the schedule with custom commands and return the run id as the xcom return value task_run_schedule = ParadimeBoltDbtScheduleRunOperator(task_id="run_schedule", schedule_name=BOLT_SCHEDULE_NAME, commands=custom_commands)
# Get the run id from the xcom return value run_id ="{{ task_instance.xcom_pull(task_ids='run_schedule') }}"# Wait for the schedule to complete before continuing task_wait_for_schedule =ParadimeBoltDbtScheduleRunSensor(task_id="wait_for_schedule", run_id=run_id)# Download the manifest.json file from the schedule run and return the path as the xcom return value task_download_manifest = ParadimeBoltDbtScheduleRunArtifactOperator(task_id="download_manifest", run_id=run_id, artifact_path="target/manifest.json")
# Get the path to the manifest.json file from the xcom return value output_path ="{{ task_instance.xcom_pull(task_ids='download_manifest') }}" task_run_schedule >> task_wait_for_schedule >> task_download_manifestrun_schedule_with_custom_commands()