
Datasets and data-aware scheduling in Airflow

Datasets and data-aware scheduling were made available in Airflow 2.4. DAGs that access the same data now have explicit, visible relationships, and DAGs can be scheduled based on updates to these datasets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron.

Datasets can help resolve common issues. For example, consider a data engineering team with a DAG that creates a dataset and an analytics team with a DAG that analyzes the dataset. Using datasets, the data analytics DAG runs only when the data engineering team's DAG publishes the dataset.

In this guide, you'll learn about datasets in Airflow and how to use them to implement triggering of DAGs based on dataset updates. You'll also learn how datasets work with the Astro Python SDK.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

Dataset concepts

You can define datasets in your Airflow environment and use them to create dependencies between DAGs. To define a dataset, instantiate the Dataset class and provide a string to identify the location of the dataset. This string must be in the form of a valid Uniform Resource Identifier (URI).
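
As a minimal sketch, a dataset definition is just an instantiation of the class with a URI string (the URI below is only an illustrative placeholder):

from airflow import Dataset

# A sketch of a dataset definition; the URI is an identifier you choose,
# not a connection that Airflow opens.
example_dataset = Dataset("s3://dataset-bucket/example_file.csv")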

In Airflow 2.4, the URI is not used to connect to an external system, and Airflow has no awareness of the content or location of the dataset. However, using this naming convention helps you easily identify the datasets that your DAG accesses and ensures compatibility with future Airflow features.

The dataset URI is saved as plain text, so it is recommended that you hide sensitive values using environment variables or a secrets backend.
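
For example, one way to keep a sensitive portion of a URI out of your DAG file is to read it from an environment variable. This is a sketch only; the variable name and bucket below are hypothetical:

import os

from airflow import Dataset

# Hypothetical environment variable holding the sensitive part of the URI,
# for example a customer-specific bucket name set in your deployment.
bucket = os.environ.get("REPORTS_BUCKET", "example-bucket")

REPORTS = Dataset(f"s3://{bucket}/reports/daily.csv")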

You can reference the dataset in a task by passing it to the task's outlets parameter. outlets is part of the BaseOperator, so it's available to every Airflow operator.
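
Because outlets comes from BaseOperator, traditional operators can declare datasets as well. The following standalone sketch shows a BashOperator producing the same INSTRUCTIONS dataset used in the example below; it would need to be placed inside a DAG definition to run:

from airflow import Dataset
from airflow.operators.bash import BashOperator

INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")

# Any operator that inherits from BaseOperator accepts outlets, so this task
# is also considered a producer task for the INSTRUCTIONS dataset.
append_note = BashOperator(
    task_id="append_note",
    bash_command="echo 'Serve chilled.' >> include/cocktail_instructions.txt",
    outlets=[INSTRUCTIONS],
)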

When you define a task's outlets parameter, Airflow labels the task as a producer task that updates the datasets. It is up to you to determine which tasks should be considered producer tasks for a dataset. As long as a task has an outlet dataset, Airflow considers it a producer task even if that task doesn't operate on the referenced dataset. In the following example, the write_instructions_to_file and write_info_to_file tasks are both producer tasks because they have defined outlets.

from pendulum import datetime

from airflow import DAG, Dataset
from airflow.decorators import task

API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")

with DAG(
    dag_id="datasets_producer_dag",
    start_date=datetime(2022, 10, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):

    @task
    def get_cocktail(api):
        import requests

        r = requests.get(api)
        return r.json()

    @task(outlets=[INSTRUCTIONS])
    def write_instructions_to_file(response):
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_instructions = response["drinks"][0]["strInstructions"]
        msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"

        with open("include/cocktail_instructions.txt", "a") as f:
            f.write(msg)

    @task(outlets=[INFO])
    def write_info_to_file(response):
        import time

        time.sleep(30)
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_category = response["drinks"][0]["strCategory"]
        alcohol = response["drinks"][0]["strAlcoholic"]
        msg = f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."

        with open("include/cocktail_info.txt", "a") as f:
            f.write(msg)

    cocktail = get_cocktail(api=API)

    write_instructions_to_file(cocktail)
    write_info_to_file(cocktail)

After a dataset is defined in one or more producer tasks, consumer DAGs in your Airflow environment listen for the completion of those producer tasks and run when the datasets they are scheduled on are updated, rather than running on a time-based schedule. For example, if you have a DAG that should run when the INSTRUCTIONS and INFO datasets are updated, you define the DAG's schedule using those datasets.

Any DAG that is scheduled on a dataset is considered a consumer DAG, even if that DAG doesn't actually consume the referenced dataset. In other words, it is up to you as the DAG author to correctly reference and use the dataset that the consumer DAG is scheduled on.

from pendulum import datetime

from airflow import DAG, Dataset
from airflow.decorators import task

INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")

with DAG(
    dag_id="datasets_consumer_dag",
    start_date=datetime(2022, 10, 1, tz="UTC"),
    schedule=[INSTRUCTIONS, INFO],  # Scheduled on both datasets
    tags=["datasets", "cross-DAG dependencies"],
    catchup=False,
):

    @task
    def read_about_cocktail():
        cocktail = []
        for filename in ("info", "instructions"):
            with open(f"include/cocktail_{filename}.txt", "r") as f:
                contents = f.readlines()
                cocktail.append(contents)

        return [item for sublist in cocktail for item in sublist]

    read_about_cocktail()

Any number of datasets can be provided to the schedule parameter as a list. The DAG is triggered after all of the datasets have received at least one update from a producer task completing successfully.

When you work with datasets, keep the following considerations in mind:

  • Datasets can only be used by DAGs in the same Airflow environment.
  • Airflow monitors datasets only within the context of DAGs and tasks. It does not monitor updates to datasets that occur outside of Airflow.
  • Consumer DAGs that are scheduled on a dataset are triggered every time a task that updates that dataset completes successfully. For example, if task1 and task2 both produce dataset_a, a consumer DAG of dataset_a runs twice: first when task1 completes, and again when task2 completes.
  • Consumer DAGs scheduled on a dataset are triggered as soon as the first task with that dataset as an outlet finishes, even if there are downstream producer tasks that also operate on the dataset.
  • Scheduling a DAG on a dataset update cannot currently be combined with any other type of schedule. For example, you can't schedule a DAG on an update to a dataset and a timetable.

For more information about datasets, see Data-aware scheduling.

The Datasets tab and the DAG Dependencies view in the Airflow UI give you observability for datasets and for the data dependencies in your DAGs' schedules.

On the DAGs view, you can see that your dataset_downstream_1_2 DAG is scheduled on two producer datasets (one each from dataset_upstream1 and dataset_upstream2), and that its next run is pending one dataset update. At this point, the dataset_upstream1 DAG has run and updated its dataset, but the dataset_upstream2 DAG has not.

DAGs View

The Datasets tab shows a list of all datasets in your Airflow environment and a graph showing how your DAGs and datasets are connected. You can filter the list of datasets by recent updates.

Datasets View

Click one of the datasets to display a list of task instances that updated the dataset and a highlighted view of that dataset and its connections on the graph.

Datasets Highlight

The DAG Dependencies view (found under the Browse tab) shows a graph of all dependencies between DAGs (in green) and datasets (in orange) in your Airflow environment.

DAG Dependencies View

Datasets with the Astro Python SDK

If you are using the Astro Python SDK version 1.1 or later, you do not need to make any code updates to use datasets. Datasets are automatically registered for any functions with output tables, and you do not need to define any outlet parameters.

The following example DAG results in three registered datasets: one for each load_file function and one for the resulting data from the transform function.

from pendulum import datetime
from airflow.decorators import dag
from astro.files import File
from astro.sql import (
    load_file,
    transform,
)
from astro.sql.table import Table

SNOWFLAKE_CONN_ID = "snowflake_conn"
AWS_CONN_ID = "aws_conn"


# The first transformation combines data from the two source tables
@transform
def extract_data(homes1: Table, homes2: Table):
    return """
    SELECT *
    FROM {{homes1}}
    UNION
    SELECT *
    FROM {{homes2}}
    """


@dag(start_date=datetime(2021, 12, 1), schedule="@daily", catchup=False)
def example_sdk_datasets():
    # Initial load of homes data CSVs from S3 into Snowflake
    homes_data1 = load_file(
        task_id="load_homes1",
        input_file=File(path="s3://airflow-kenten/homes1.csv", conn_id=AWS_CONN_ID),
        output_table=Table(name="HOMES1", conn_id=SNOWFLAKE_CONN_ID),
        if_exists="replace",
    )

    homes_data2 = load_file(
        task_id="load_homes2",
        input_file=File(path="s3://airflow-kenten/homes2.csv", conn_id=AWS_CONN_ID),
        output_table=Table(name="HOMES2", conn_id=SNOWFLAKE_CONN_ID),
        if_exists="replace",
    )

    # Define task dependencies
    extracted_data = extract_data(
        homes1=homes_data1,
        homes2=homes_data2,
        output_table=Table(name="combined_homes_data"),
    )


example_sdk_datasets = example_sdk_datasets()

SDK datasets
