Datasets and data-aware scheduling in Airflow

Datasets and data-aware scheduling were made available in Airflow 2.4. DAGs that access the same data now have explicit, visible relationships, and DAGs can be scheduled based on updates to these datasets. This feature helps make Airflow data-aware and expands Airflow scheduling capabilities beyond time-based methods such as cron.

Datasets can help resolve common issues. For example, consider a data engineering team with a DAG that creates a dataset and an analytics team with a DAG that analyzes the dataset. Using datasets, the data analytics DAG runs only when the data engineering team's DAG publishes the dataset.

In this guide, you'll learn about datasets in Airflow and how to use them to implement triggering of DAGs based on dataset updates. You'll also learn how datasets work with the Astro Python SDK.

info

Datasets are a separate feature from object storage, which allows you to interact with files in cloud and local object storage systems. To learn more about using Airflow to interact with files, see Use Airflow object storage to interact with cloud storage in an ML pipeline.

Other ways to learn

There are multiple resources for learning about this topic. See also:

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

Why use datasets?

Datasets allow you to define explicit dependencies between DAGs and updates to your data. This helps you to:

  • Standardize communication between teams. Datasets can function like an API to communicate when data in a specific location has been updated and is ready for use.
  • Reduce the amount of code necessary to implement cross-DAG dependencies. Even if your DAGs don't depend on data updates, you can create a dependency that triggers a DAG after a task in another DAG updates a dataset.
  • Get better visibility into how your DAGs are connected and how they depend on data. The Datasets tab in the Airflow UI shows a graph of all dependencies between DAGs and datasets in your Airflow environment.
  • Reduce costs, because datasets do not use a worker slot in contrast to sensors or other implementations of cross-DAG dependencies.

Listening for dataset changes

As of Airflow 2.8, you can use listeners to enable Airflow to notify you when certain dataset events occur. There are two listener hooks for the following events:

  • on_dataset_created
  • on_dataset_changed

For examples, refer to our Create Airflow listeners tutorial.
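
The following is a minimal sketch of such a listener, assuming Airflow 2.8+. The file name, plugin name, and log messages are illustrative only; one way to register the hook implementations is through an Airflow plugin, as shown here:

# listener_plugin.py (hypothetical file in your Airflow plugins folder)
import logging
import sys

from airflow.listeners import hookimpl
from airflow.plugins_manager import AirflowPlugin

log = logging.getLogger(__name__)


@hookimpl
def on_dataset_created(dataset):
    # Runs when a dataset is first registered in the Airflow environment
    log.info("Dataset created with URI: %s", dataset.uri)


@hookimpl
def on_dataset_changed(dataset):
    # Runs when a producer task updates the dataset
    log.info("Dataset changed: %s", dataset.uri)


class DatasetListenerPlugin(AirflowPlugin):
    name = "dataset_listener_plugin"
    listeners = [sys.modules[__name__]]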

Dataset concepts

You can define datasets in your Airflow environment and use them to create dependencies between DAGs. To define a dataset, instantiate the Dataset class and provide a string to identify the location of the dataset. This string must be in the form of a valid Uniform Resource Identifier (URI).

In Airflow 2.4, the URI is not used to connect to an external system and there is no awareness of the content or location of the dataset. However, using this naming convention helps you to easily identify the datasets that your DAG accesses and ensures compatibility with future Airflow features.

The dataset URI is saved as plain text, so it is recommended that you hide sensitive values using environment variables or a secrets backend.
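
For example, a dataset definition might look like the following minimal sketch. The bucket and environment variable names are hypothetical; reading part of the URI from an environment variable is one way to keep sensitive values out of your DAG files:

import os

from airflow.datasets import Dataset

# Airflow does not connect to this URI or validate its contents;
# the string only identifies the dataset.
example_dataset = Dataset("s3://my-example-bucket/example_data.csv")

# Hypothetical: pull a sensitive bucket name from an environment variable
# instead of hardcoding it in the DAG file.
secure_dataset = Dataset(f"s3://{os.environ['DATA_BUCKET_NAME']}/example_data.csv")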

You can reference the dataset in a task by passing it to the task's outlets parameter. outlets is part of the BaseOperator, so it's available to every Airflow operator.
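
For example, the following sketch marks a task created with a traditional operator as a dataset producer. The DAG, script path, and dataset URI are hypothetical:

from pendulum import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

REPORT = Dataset("file://localhost/airflow/include/report.csv")

with DAG(
    dag_id="traditional_producer_dag",
    start_date=datetime(2022, 10, 1),
    schedule=None,
    catchup=False,
):
    # Any operator can declare outlets because the parameter is
    # inherited from BaseOperator.
    export_report = BashOperator(
        task_id="export_report",
        bash_command="python /usr/local/airflow/include/export_report.py",
        outlets=[REPORT],
    )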

When you define a task's outlets parameter, Airflow labels the task as a producer task that updates the dataset(s). It is up to you to determine which tasks should be considered producer tasks for a dataset. As long as a task has an outlet dataset, Airflow considers it a producer task even if that task doesn't operate on the referenced dataset. In the following example, the write_instructions_to_file and write_info_to_file tasks are both producer tasks because they define outlets.

from pendulum import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

API = "https://www.thecocktaildb.com/api/json/v1/1/random.php"
INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


@dag(
    start_date=datetime(2022, 10, 1),
    schedule=None,
    catchup=False,
)
def datasets_producer_dag():
    @task
    def get_cocktail(api):
        import requests

        r = requests.get(api)
        return r.json()

    @task(outlets=[INSTRUCTIONS])
    def write_instructions_to_file(response):
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_instructions = response["drinks"][0]["strInstructions"]
        msg = f"See how to prepare {cocktail_name}: {cocktail_instructions}"

        f = open("include/cocktail_instructions.txt", "a")
        f.write(msg)
        f.close()

    @task(outlets=[INFO])
    def write_info_to_file(response):
        import time

        time.sleep(30)
        cocktail_name = response["drinks"][0]["strDrink"]
        cocktail_category = response["drinks"][0]["strCategory"]
        alcohol = response["drinks"][0]["strAlcoholic"]
        msg = f"{cocktail_name} is a(n) {alcohol} cocktail from category {cocktail_category}."
        f = open("include/cocktail_info.txt", "a")
        f.write(msg)
        f.close()

    cocktail = get_cocktail(api=API)

    write_instructions_to_file(cocktail)
    write_info_to_file(cocktail)


datasets_producer_dag()

A consumer DAG runs whenever the datasets it is scheduled on are updated by producer tasks, rather than running on a time-based schedule. For example, if you have a DAG that should run when the INSTRUCTIONS and INFO datasets are updated, you pass both dataset objects to the DAG's schedule parameter.

Any DAG that is scheduled with a dataset is considered a consumer DAG even if that DAG doesn't actually access the referenced dataset. In other words, it's up to you as the DAG author to correctly reference and use datasets.

from pendulum import datetime
from airflow.datasets import Dataset
from airflow.decorators import dag, task

INSTRUCTIONS = Dataset("file://localhost/airflow/include/cocktail_instructions.txt")
INFO = Dataset("file://localhost/airflow/include/cocktail_info.txt")


@dag(
    dag_id="datasets_consumer_dag",
    start_date=datetime(2022, 10, 1),
    schedule=[INSTRUCTIONS, INFO],  # Scheduled on both Datasets
    catchup=False,
)
def datasets_consumer_dag():
    @task
    def read_about_cocktail():
        cocktail = []
        for filename in ("info", "instructions"):
            with open(f"include/cocktail_{filename}.txt", "r") as f:
                contents = f.readlines()
                cocktail.append(contents)

        return [item for sublist in cocktail for item in sublist]

    read_about_cocktail()


datasets_consumer_dag()

Any number of datasets can be provided to the schedule parameter as a list. The DAG is triggered after all of the datasets have received at least one update due to a producing task completing successfully.

When you work with datasets, keep the following considerations in mind:

  • Datasets can only be used by DAGs in the same Airflow environment.
  • Airflow monitors datasets only within the context of DAGs and tasks. It does not monitor updates to datasets that occur outside of Airflow.
  • Consumer DAGs that are scheduled on a dataset are triggered every time a task that updates that dataset completes successfully. For example, if task1 and task2 both produce dataset_a, a consumer DAG of dataset_a runs twice - first when task1 completes, and again when task2 completes.
  • Consumer DAGs scheduled on a dataset are triggered as soon as the first task with that dataset as an outlet finishes, even if there are downstream producer tasks that also operate on the dataset.
  • Scheduling a DAG on a dataset update cannot currently be combined with any other type of schedule. For example, you can't schedule a DAG on an update to a dataset and a timetable.

For more information about datasets, see Data-aware scheduling.

The Datasets tab and the DAG Dependencies view in the Airflow UI give you observability into your datasets and into the data dependencies in your DAGs' schedules.

On the DAGs view, you can see that the dataset_downstream_1_2 DAG is scheduled on two producer datasets, one updated by the dataset_upstream1 DAG and one by the dataset_upstream2 DAG, and that its next run is pending one dataset update. At this point, the dataset_upstream1 DAG has run and updated its dataset, but the dataset_upstream2 DAG has not.

DAGs View

The Datasets tab shows a list of all datasets in your Airflow environment and a graph showing how your DAGs and datasets are connected. You can filter the lists of Datasets by recent updates.

Datasets View

Click one of the datasets to display a list of task instances that updated the dataset and a highlighted view of that dataset and its connections on the graph.

Datasets Highlight

The DAG Dependencies view (found under the Browse tab) shows a graph of all dependencies between DAGs (in green) and datasets (in orange) in your Airflow environment.

DAG Dependencies View

note

DAGs that are triggered by datasets do not have the concept of a data interval. If you need information about the triggering event in your downstream DAG, you can use the triggering_dataset_events parameter from the context. This parameter provides the triggering dataset events, each with the attributes timestamp, source_dag_id, source_task_id, source_run_id, and source_map_index.
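
As a minimal sketch, a TaskFlow task in the consumer DAG could access this context value as follows. The task name is illustrative; triggering_dataset_events maps each dataset URI to its list of triggering events:

from airflow.decorators import task


@task
def print_triggering_dataset_events(triggering_dataset_events=None):
    # triggering_dataset_events is injected from the context and maps
    # each dataset URI to the list of events that triggered this run.
    for uri, events in triggering_dataset_events.items():
        for event in events:
            print(uri, event.timestamp, event.source_dag_id, event.source_task_id)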

Datasets with the Astro Python SDK

If you are using the Astro Python SDK version 1.1 or later, you do not need to make any code updates to use datasets. Datasets are automatically registered for any functions with output tables and you do not need to define any outlet parameters.

The following example DAG results in three registered datasets: one for each load_file function and one for the resulting data from the transform function.

from pendulum import datetime
from airflow.decorators import dag
from astro.files import File
from astro.sql import (
    load_file,
    transform,
)
from astro.sql.table import Table

SNOWFLAKE_CONN_ID = "snowflake_conn"
AWS_CONN_ID = "aws_conn"


# The first transformation combines data from the two source tables
@transform
def extract_data(homes1: Table, homes2: Table):
    return """
    SELECT *
    FROM {{homes1}}
    UNION
    SELECT *
    FROM {{homes2}}
    """


@dag(start_date=datetime(2021, 12, 1), schedule="@daily", catchup=False)
def example_sdk_datasets():
    # Initial load of homes data CSVs from S3 into Snowflake
    homes_data1 = load_file(
        task_id="load_homes1",
        input_file=File(path="s3://airflow-kenten/homes1.csv", conn_id=AWS_CONN_ID),
        output_table=Table(name="HOMES1", conn_id=SNOWFLAKE_CONN_ID),
        if_exists="replace",
    )

    homes_data2 = load_file(
        task_id="load_homes2",
        input_file=File(path="s3://airflow-kenten/homes2.csv", conn_id=AWS_CONN_ID),
        output_table=Table(name="HOMES2", conn_id=SNOWFLAKE_CONN_ID),
        if_exists="replace",
    )

    # Define task dependencies
    extracted_data = extract_data(
        homes1=homes_data1,
        homes2=homes_data2,
        output_table=Table(name="combined_homes_data"),
    )


example_sdk_datasets = example_sdk_datasets()

SDK datasets
