Skip to main content

Orchestrate dbt with Airflow

dbt is an open-source library for analytics engineering that helps users build interdependent SQL models for in-warehouse data transformation. As ephemeral compute becomes more readily available in data warehouses thanks to tools like Snowflake, dbt has become a key component of the modern data engineering workflow. Now, data engineers can use dbt to write, organize, and run in-warehouse transformations of raw data.

Organizations can use Airflow to orchestrate and execute dbt models as DAGs. Running dbt with Airflow ensures a reliable, scalable environment for models, as well as the ability to trigger models only after every prerequisite task is met. Airflow also gives you fine-grained control over dbt tasks such that teams have observability over every step in their dbt models.

In this guide, you'll:

  • Use the dbt Cloud Provider to orchestrate dbt Cloud with Airflow.
  • Review two common use cases for orchestrating dbt Core with Airflow with the BashOperator:
  • Learn how to extend the model-level use case by automating changes to a dbt model.
info

The dbt Core sections of this guide summarize some of the key practices and findings from our blog series with Updater and Sam Bail about using dbt in Airflow. For more information, check out Part 1, Part 2, and Part 3 of the series.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

dbt Cloud

To orchestrate dbt Cloud jobs with Airflow, you can use the dbt Cloud provider, which contains the following useful modules:

  • DbtCloudRunJobOperator: Executes a dbt Cloud job.
  • DbtCloudGetJobRunArtifactOperator: Downloads artifacts from a dbt Cloud job run.
  • DbtCloudJobRunSensor: Waits for a dbt Cloud job run to complete.
  • DbtCloudHook: Interacts with dbt Cloud using the V2 API.

To use the dbt Cloud provider in your DAGs, you'll need to complete the following steps:

  1. Add the apache-airflow-providers-dbt-cloud package to your Airflow environment. If you are working in an Astro project, you can add the package to your requirements.txt file.
  2. Set up an Airflow connection to your dbt Cloud instance. The connection type should be dbt Cloud, and it should include an API token from your dbt Cloud account. If you want your dbt Cloud Provider tasks to use a default account ID, you can add that to the connection, but it is not required.

In the DAG below, you'll review a simple implementation of the dbt Cloud provider. This example showcases how to run a dbt Cloud job from Airflow, while adding an operational check to ensure the dbt Cloud job is not running prior to triggering. The DbtCloudHook provides a list_job_runs() method which can be used to retrieve all runs for a given job. The operational check uses this method to retrieve the latest triggered run for a job and check its status. If the job is currently not in a state of 10 (Success), 20 (Error), or 30 (Canceled), the pipeline will not try to trigger another run.

from pendulum import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.utils.edgemodifier import Label

DBT_CLOUD_CONN_ID = "dbt"
JOB_ID = "{{ var.value.dbt_cloud_job_id }}"

def _check_job_not_running(job_id):
"""
Retrieves the last run for a given dbt Cloud job and checks to see if the job is not currently running.
"""
hook = DbtCloudHook(DBT_CLOUD_CONN_ID)
runs = hook.list_job_runs(job_definition_id=job_id, order_by="-id")
latest_run = runs[0].json()["data"][0]

return DbtCloudJobRunStatus.is_terminal(latest_run["status"])

@dag(
start_date=datetime(2022, 2, 10),
schedule_interval="@daily",
catchup=False,
default_view="graph",
doc_md=__doc__,
)
def check_before_running_dbt_cloud_job():
begin, end = [EmptyOperator(task_id=id) for id in ["begin", "end"]]

check_job = ShortCircuitOperator(
task_id="check_job_is_not_running",
python_callable=_check_job_not_running,
op_kwargs={"job_id": JOB_ID},
)

trigger_job = DbtCloudRunJobOperator(
task_id="trigger_dbt_cloud_job",
dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
job_id=JOB_ID,
check_interval=600,
timeout=3600,
)

begin >> check_job >> Label("Job not currently running. Proceeding.") >> trigger_job >> end

dag = check_before_running_dbt_cloud_job()

In the DbtCloudRunJobOperator you must provide the dbt connection ID as well as the job_id of the job you are triggering.

info

The full code for this example, along with other DAGs that implement the dbt Cloud Provider, can be found on the Astronomer Registry.

dbt Core

When orchestrating dbt Core with Airflow, a straightforward DAG design is to run dbt commands directly through the BashOperator. In this section you'll review two use cases that demonstrate how it's done.

Use case 1: dbt Core and Airflow at the project level

For this example you'll use the BashOperator, which simply executes a shell command, because it lets us run specific dbt commands. The primary dbt interface is the command line, so the BashOperator is one of the best tools for managing dbt. You can execute dbt run or dbt test directly in Airflow as you would with any other shell.

Note: The code for this example can be found on the Astronomer Registry.

The DAG below uses the BashOperator to run a dbt project and the models' associated tests, each in a single Task:

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import datetime
from airflow.utils.dates import timedelta


with DAG(
dag_id='dbt_dag',
start_date=datetime(2021, 12, 23),
description='An Airflow DAG to invoke simple dbt commands',
schedule_interval=timedelta(days=1),
) as dag:

dbt_run = BashOperator(
task_id='dbt_run',
bash_command='dbt run'
)

dbt_test = BashOperator(
task_id='dbt_test',
bash_command='dbt test'
)

dbt_run >> dbt_test

Using the BashOperator to run dbt run and dbt test is a working solution for simple use cases or when you would rather have dbt manage dependencies between models. If you need something quick to develop and deploy that has the full power of dbt behind it, then this is the solution for you. However, running dbt at the project-level has several issues:

  • Low observability into what execution state the project is in.
  • Failures are absolute and require the whole dbt group of models to be run again, which can be costly.

Use case 2: dbt Core and Airflow at the model level

What if you need more visibility into the steps dbt is running in each task? Instead of running a group of dbt models on a single task, you can write a DAG that runs a task for each model. Using this method, our dbt workflow is more controllable because you can see the successes, failures, and retries of each dbt model in its corresponding Airflow task. If a model near the end of our dbt pipeline fails, you can fix the broken model and retry that individual task without having to rerun the entire workflow. Also, you no longer have to worry about defining Sensors to configure interdependency between Airflow DAGs because you've consolidated your work into a single DAG. Our friends at Updater came up with this solution.

To make this work, you need a file that's generated by dbt called manifest.json. This file is generated in the target directory of your dbt project and contains a full representation of your dbt project. For more information on this file, see the dbt documentation.

Our DAG will read the manifest.json file, parse it, create the necessary BashOperator Airflow tasks, and then set the dependencies to match those of your dbt project. The end result is that each model in your dbt project maps to two tasks in your Airflow DAG: one task to run the model, and another task to run the tests associated with that model. All of these models will run in the appropriate order thanks to the task dependencies you've set. You implement this workflow using the following DAG:

import datetime
import json

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import datetime
from airflow.utils.dates import timedelta


dag = DAG(
dag_id='dbt_dag',
start_date=datetime(2020, 12, 23),
description='A dbt wrapper for Airflow',
schedule_interval=timedelta(days=1),
)

def load_manifest():
local_filepath = "/usr/local/airflow/dags/dbt/target/manifest.json"
with open(local_filepath) as f:
data = json.load(f)

return data

def make_dbt_task(node, dbt_verb):
"""Returns an Airflow operator either run and test an individual model"""
DBT_DIR = "/usr/local/airflow/dags/dbt"
GLOBAL_CLI_FLAGS = "--no-write-json"
model = node.split(".")[-1]

if dbt_verb == "run":
dbt_task = BashOperator(
task_id=node,
bash_command=f"""
cd {DBT_DIR} &&
dbt {GLOBAL_CLI_FLAGS} {dbt_verb} --target prod --models {model}
""",
dag=dag,
)

elif dbt_verb == "test":
node_test = node.replace("model", "test")
dbt_task = BashOperator(
task_id=node_test,
bash_command=f"""
cd {DBT_DIR} &&
dbt {GLOBAL_CLI_FLAGS} {dbt_verb} --target prod --models {model}
""",
dag=dag,
)

return dbt_task

data = load_manifest()

dbt_tasks = {}
for node in data["nodes"].keys():
if node.split(".")[0] == "model":
node_test = node.replace("model", "test")

dbt_tasks[node] = make_dbt_task(node, "run")
dbt_tasks[node_test] = make_dbt_task(node, "test")

for node in data["nodes"].keys():
if node.split(".")[0] == "model":

# Set dependency to run tests on a model after model runs finishes
node_test = node.replace("model", "test")
dbt_tasks[node] >> dbt_tasks[node_test]

# Set all model -> model dependencies
for upstream_node in data["nodes"][node]["depends_on"]["nodes"]:

upstream_node_type = upstream_node.split(".")[0]
if upstream_node_type == "model":
dbt_tasks[upstream_node] >> dbt_tasks[node]
info

The code for this example can be found on the Astronomer Registry.

Now you have a solution that orchestrates a dbt project in detail, giving data engineers visibility into each dbt model without affecting each model's dependencies. In the next section, you'll see how to make this DAG resilient to change.

Bonus 1: Productionizing with CI/CD

For the team at Updater, splitting the dbt models into tasks was only the first step. The next questions they tackled are:

  • How do you automatically update manifest.json so that you don't need to manually copy it, paste it into our Airflow project, and redeploy our Airflow environment every time our dbt project changes?
  • How do you extend our workflows to accommodate multiple DAGs and schedule intervals?
  • How do you contextualize this approach in the case of a broader ELT pipeline?

Running our dbt models on a single DAG has some limitations. Specifically, this implementation cannot handle running different groups of dbt models on different schedules.

To add this functionality, you can take a group of models defined by some selector, such as dbt run --models tag:hourly, and deploy that set of models as their own Airflow DAG with its own defined schedule. You can then use your manifest.json file to set dependencies between these groups of models and build out a robust CI process. To do this, you can:

  1. Use the selectors.yml file (introduced in dbt 0.18) to define a set of model selectors for each Airflow DAG schedule you want to create. You can then use dbt's tagging feature to tag every model with a desired schedule interval.

  2. Use a CI/CD provider to run a Python script that:

    1. Runs dbt compile to create a fresh copy of manifest.json

    2. Reads the model selectors defined in the YAML file

    3. Uses the dbt ls command to list all of the models associated with each model selector in the YAML file

    4. Turns the dbt DAG from manifest.json into a Graph object with the networkx library

    5. Uses the methods available on the Graph object to figure out the correct set of dependencies for each group of models defined in the YAML file

    6. Writes the dependencies for each group of models (stored as a list of tuples) as a pickle file to local storage.

      Here is what that Python script looks like in practice:

      import yaml
      import os
      import json
      import networkx as nx
      import pickle

      # README
      # This file is a utility script that is run using CircleCI in the deploy
      # step. It is not run by Airflow in any way. The point of this script is
      # to generate a pickle file that contains all of the dependencies between dbt models
      # for each DAG (usually corresponding to a different schedule) that you want
      # to run.

      def load_manifest():
      """Load manifest.json """
      local_filepath = f"{DBT_DIR}/target/manifest.json"
      with open(local_filepath) as f:
      data = json.load(f)
      return data

      def load_model_selectors():
      """Load the dbt selectors from YAML file to be used with dbt ls command"""
      with open(f"{DBT_DIR}/selectors.yml") as f:
      dag_model_selectors = yaml.full_load(f)
      selected_models = {}
      for selector in dag_model_selectors["selectors"]:
      selector_name = selector["name"]
      selector_def = selector["definition"]
      selected_models[selector_name] = selector_def
      return selected_models

      def parse_model_selector(selector_def):
      """Run the dbt ls command which returns all dbt models associated with a particular
      selection syntax"""
      models = os.popen(f"cd {DBT_DIR} && dbt ls --models {selector_def}").read()
      models = models.splitlines()
      return models

      def generate_all_model_dependencies(all_models, manifest_data):
      """Generate dependencies for entire project by creating a list of tuples that
      represent the edges of the DAG"""
      dependency_list = []
      for node in all_models:
      # Cleaning things up to match node format in manifest.json
      split_node = node.split(".")
      length_split_node = len(split_node)
      node = split_node[0] + "." + split_node[length_split_node - 1]
      node = "model." + node
      node_test = node.replace("model", "test")
      # Set dependency to run tests on a model after model runs finishes
      dependency_list.append((node, node_test))
      # Set all model -> model dependencies
      for upstream_node in manifest_data["nodes"][node]["depends_on"]["nodes"]:
      upstream_node_type = upstream_node.split(".")[0]
      upstream_node_name = upstream_node.split(".")[2]
      if upstream_node_type == "model":
      dependency_list.append((upstream_node, node))
      return dependency_list

      def clean_selected_task_nodes(selected_models):
      """Clean up the naming of the "selected" nodes so they match the structure of what
      is coming out of the generate_all_model_dependencies function. This function doesn't create
      a list of dependencies between selected nodes (that happens in generate_dag_dependencies), rather
      it's just cleaning up the naming of the nodes and outputting them as a list"""
      selected_nodes = []
      for node in selected_models:
      # Cleaning things up to match node format in manifest.json
      split_node = node.split(".")
      length_split_node = len(split_node)
      node = split_node[0] + "." + split_node[length_split_node - 1]
      # Adding run model nodes
      node = "model." + node
      selected_nodes.append(node)
      # Set test model nodes
      node_test = node.replace("model", "test")
      selected_nodes.append(node_test)
      return selected_nodes

      def generate_dag_dependencies(selected_nodes, all_model_dependencies):
      """Return dependencies as list of tuples for a given DAG (set of models)"""
      G = nx.DiGraph()
      G.add_edges_from(all_model_dependencies)
      G_subset = G.copy()
      for node in G:
      if node not in selected_nodes:
      G_subset.remove_node(node)
      selected_dependencies = list(G_subset.edges())
      return selected_dependencies

      def run():
      """Gets a list of all models in the project and creates dependencies.
      You want to load all the models first because the logic to properly set
      dependencies between subsets of models is basically
      removing nodes from the complete DAG. This logic can be found in the
      generate_dag_dependencies function. The networkx graph object is smart
      enough that if you remove nodes with remove_node method that the dependencies
      of the remaining nodes are what you would expect.
      """
      manifest_data = load_manifest()
      all_models = parse_model_selector("updater_data_model")
      all_model_dependencies = generate_all_model_dependencies(all_models, manifest_data)
      # Load model selectors
      dag_model_selectors = load_model_selectors()
      for dag_name, selector in dag_model_selectors.items():
      selected_models = parse_model_selector(selector)
      selected_nodes = clean_selected_task_nodes(selected_models)
      dag_dependencies = generate_dag_dependencies(selected_nodes, all_model_dependencies)
      with open(f"{DBT_DIR}/dbt_dags/data/{dag_name}.pickle", "wb") as f:
      pickle.dump(dag_dependencies, f)

      # RUN IT
      DBT_DIR = "./dags/dbt"
      run()
  3. Create an Airflow DAG file for each group of models. Each DAG reads the associated pickle file, creates the required dbt model run/test tasks, and sets dependencies between them as specified in the pickle file. One of those DAGs might look something like this:

      with DAG(
    dag_id="dbt_dag",
    schedule_interval="@daily",
    max_active_runs=1,
    catchup=False,
    start_date=datetime(2021, 1, 1)
    ) as dag:
    # Load dependencies from configuration file
    dag_def = load_dag_def_pickle(f"{DAG_NAME}.pickle")

    # Returns a dictionary of bash operators corresponding to dbt models/tests
    dbt_tasks = create_task_dict(dag_def)

    # Set dependencies between tasks according to config file
    for edge in dag_def:
    dbt_tasks[edge[0]] >> dbt_tasks[edge[1]]
info

The functions in the DAG file above have been split out for simplicity, but the logic can be found in the dbt_advanced.py DAG.

Putting all of this together, you end up with multiple Airflow DAGs, each running on its own defined schedule, with a specified group of interdependent dbt models running as individual tasks. With this system, running a production dbt model in Airflow is simple: all you need to do is tag a model with the appropriate schedule interval, and it will automatically get picked up and executed by the corresponding Airflow DAG.

Ultimately, this gives us a robust, end-to-end solution that captures the ideal scheduling, execution, and observability experience for running dbt models with Airflow.

With that said, this implementation still has some limitations. For a more in-depth consideration of the benefits and downsides of each implementation, see Part 2 of our Airflow/dbt blog series.

Bonus 2: DbtDagParser utility

The sample code provided in the previous section demonstrates how to loop through the manifest.json file of your DAG to parse out individual models and map them to Airflow tasks. To simplify the DAG code when using this pattern, you can use a convenient utility method that takes care of the parsing. The DbtDagParser utility, developed and explained by Sam Bail, works as follows:

  • The parser takes the dbt project path containing the dbt_project.yml file, as well as the path to the profiles.yml file, as inputs. Note that this setup assumes that you have a single repo that contains both your dbt project and Airflow code.
  • By providing a "dbt_tag" parameter, you can select a subset of models to run. This means you can specify multiple DAGs for different subsets of the dbt models, for example to run them on different schedules, as described in Part 2 of our blog series.
  • The utility returns a task group containing all dbt run tasks for the models in the specified dbt DAG, and optionally another task group for all test tasks.

When used as shown in the sample code below, the utility provides a shortcut to creating Airflow task groups for dbt models. Note that this code snippet only shows part of the DAG file; you can find the whole file in the demo repo.

with dag:

start_empty = EmptyOperator(task_id='start')

dbt_seed = BashOperator(
task_id='dbt_seed',
bash_command=f'dbt {DBT_GLOBAL_CLI_FLAGS} seed --profiles-dir {DBT_PROJECT_DIR} --project-dir {DBT_PROJECT_DIR}'
)
end_empty = EmptyOperator(task_id='end')

dag_parser = DbtDagParser(dag=dag,
dbt_global_cli_flags=DBT_GLOBAL_CLI_FLAGS,
dbt_project_dir=DBT_PROJECT_DIR,
dbt_profiles_dir=DBT_PROJECT_DIR,
dbt_target=DBT_TARGET
)
dbt_run_group = dag_parser.get_dbt_run_group()
dbt_test_group = dag_parser.get_dbt_test_group()

start_empty >> dbt_seed >> dbt_run_group >> dbt_test_group >> end_empty

Using the jaffleshop demo dbt project, the parser creates the following DAG including two task groups for the dbt_run and dbt_test tasks:

One important fact to note here is that the DbtDagParser does not include a dbt compile step that updates the manifest.json file. Since the Airflow Scheduler parses the DAG file periodically, having a compile step as part of the DAG creation could incur some unnecessary load for the scheduler. Astronomer recommends adding a dbt compile step either as part of a CI/CD pipeline, or as part of a pipeline run in production before the Airflow DAG is run.

With regards to the dbt test runs:

  • The test runs are optional. You can simply skip the tests by not using getdbttest_group().
  • The dbt test task group depends entirely on the dbt run group. In this example, the DAG will run all models first, then all tests.

Conclusion

To recap, in this guide you have learned about dbt Cloud and dbt Core, how to create and productionize dbt tasks in Airflow, and how to automatically create dbt Core tasks based on a manifest. For a more detailed discussion on trade-offs, limitations, and adding dbt Core or dbt Cloud to a full ELT pipeline, see our blog posts. To see more examples of how to use dbt and Airflow to build pipelines, check out our dbt DAGs on the Registry.