Deferrable operators
With Airflow 2.2 and later, you can use deferrable operators to run tasks in your Airflow environment. These operators leverage the Python asyncio library to efficiently run tasks waiting for an external resource to finish. This frees up your workers and allows you to utilize resources more effectively. In this guide, you'll review deferrable operator concepts and learn which operators are deferrable.
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Airflow operators. See Operators 101.
- Airflow sensors. See Sensors 101.
Terms and concepts
Review the following terms and concepts to gain a better understanding of deferrable operator functionality:
- asyncio: A Python library used as the foundation for multiple asynchronous frameworks. This library is core to deferrable operator functionality, and is used when writing triggers.
- Triggers: Small, asynchronous sections of Python code. Due to their asynchronous nature, they coexist efficiently in a single process known as the triggerer.
- Triggerer: An Airflow service similar to a scheduler or a worker that runs an asyncio event loop in your Airflow environment. Running a triggerer is essential for using deferrable operators.
- Deferred: An Airflow task state indicating that a task has paused its execution, released the worker slot, and submitted a trigger to be picked up by the triggerer process.
The terms deferrable, async, and asynchronous are used interchangeably and have the same meaning.
With traditional operators, a task submits a job to an external system such as a Spark cluster and then polls the job status until it is completed. Although the task isn't doing significant work, it still occupies a worker slot during the polling process. As worker slots are occupied, tasks are queued and start times are delayed. The following image illustrates this process:
With deferrable operators, worker slots are released when a task is polling for job status. When the task is deferred, the polling process is offloaded as a trigger to the triggerer, and the worker slot becomes available. The triggerer can run many asynchronous polling tasks concurrently, and this prevents polling tasks from occupying your worker resources. When the terminal status for the job is received, the task resumes, taking a worker slot while it finishes. The following image illustrates this process:
Use deferrable operators
Deferrable operators should be used whenever you have tasks that occupy a worker slot while polling for a condition in an external system. For example, using deferrable operators for sensor tasks can provide efficiency gains and reduce operational costs. Smart Sensors were deprecated in Airflow 2.2.4, and were removed in Airflow 2.4.0. Use deferrable operators instead of Smart Sensors because they provide greater functionality and they are supported by Airflow.
To use a deferrable version of a core Airflow operator in your DAG, you only need to replace the import statement for the existing operator. For example, Airflow's TimeSensorAsync
is a replacement of the non-deferrable TimeSensor
(source). To use TimeSensorAsync
, remove your existing import
and replace it with the following:
# Remove this import:
# from airflow.operators.sensors import TimeSensor
# Replace with:
from airflow.sensors.time_sensor import TimeSensorAsync as TimeSensor
If you are using a deferrable operator that is part of the Astronomer Providers package, you will also need to ensure that package is installed in your Airflow environment. For example, to use the Snowflake deferrable operator:
Add the following to your
requirements.txt
file:astronomer-providers[snowflake]
Update the import statement in your DAG:
# Remove this import:
# from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
# Replace with:
from astronomer.providers.snowflake.operators.snowflake import (
SnowflakeOperatorAsync as SnowflakeOperator,
)
Note that importing the asynchronous operator using the alias of the analogous traditional operator (e.g. import SnowflakeOperatorAsync as SnowflakeOperator
) is simply to make updating existing DAGs easier. This is not required, and may not be preferrable when authoring a new DAG.
There are numerous benefits to using deferrable operators including:
- Reduced resource consumption: Depending on the available resources and the workload of your triggers, you can run hundreds to thousands of deferred tasks in a single triggerer process. This can lead to a reduction in the number of workers needed to run tasks during periods of high concurrency. With less workers needed, you are able to scale down the underlying infrastructure of your Airflow environment.
- Resiliency against restarts: Triggers are stateless by design. This means your deferred tasks are not set to a failure state if a triggerer needs to be restarted due to a deployment or infrastructure issue. When a triggerer is back up and running in your environment, your deferred tasks will resume.
- Gateway to event-based DAGs: The presence of
asyncio
in core Airflow is a potential foundation for event-triggered DAGs.
Some additional notes about using deferrable operators:
- If you want to replace non-deferrable operators in an existing project with deferrable operators, Astronomer recommends importing the deferrable operator class as its non-deferrable class name. If you don't include this part of the import statement, you need to replace all instances of non-deferrable operators in your DAGs. In the above example, that would require replacing all instances of
TimeSensor
withTimeSensorAsync
. - Currently, not all operators have a deferrable version. There are a few open source deferrable operators, plus additional operators designed and maintained by Astronomer.
- If you're interested in the deferrable version of an operator that is not generally available, you can write your own and contribute it back to the open source project.
- There are some use cases where it can be more appropriate to use a traditional sensor instead of a deferrable operator. For example, if your task needs to wait only a few seconds for a condition to be met, Astronomer recommends using a Sensor in
reschedule
mode to avoid unnecessary resource overhead.
Available deferrable operators
The following deferrable operators are installed by default in Airflow:
You can use additional deferrable operators built and maintained by Astronomer by installing the open source Astronomer Providers Python package. The operators and sensors in this package are deferrable versions of commonly used operators. For example, the package includes:
SnowflakeOperatorAsync
DatabricksSubmitRunOperatorAsync
HttpSensorAsync
For a full list of deferrable operators and sensors available in the astronomer-providers
package, see Changelog. You can also create your own deferrable operator for your use case.
Example workflow
The following example DAG is scheduled to run every minute between its start_date
and its end_date
. Every DAG run contains one sensor task that will potentially take up to 20 minutes to complete.
from pendulum import datetime
from airflow import DAG
from airflow.sensors.date_time import DateTimeSensor
with DAG(
"sync_dag_2",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 19),
schedule="* * * * *",
catchup=True,
) as dag:
sync_sensor = DateTimeSensor(
task_id="sync_task",
target_time="""{{ macros.datetime.utcnow() + macros.timedelta(minutes=20) }}""",
)
Using DateTimeSensor
, one worker slot is taken up by every sensor that runs. By using the deferrable version of this sensor, DateTimeSensorAsync
, you can achieve full concurrency while freeing up your workers to complete additional tasks across your Airflow environment.
In the following screenshot, running the DAG produces 16 running task instances, each containing one active DateTimeSensor
taking up one worker slot.
Because Airflow imposes default limits on the number of active runs of the same DAG or number of active tasks in a DAG across all runs, you'll have to scale up Airflow to concurrently run any other DAGs and tasks as described in the Scaling Airflow to optimize performance guide.
Switching out the DateTimeSensor
for DateTimeSensorAsync
will create 16 running DAG instances, but the tasks for these DAGs are in a deferred state which does not take up a worker slot. The only difference in the DAG code is using the deferrable operator DateTimeSensorAsync
over DateTimeSensor
:
from pendulum import datetime
from airflow import DAG
from airflow.sensors.date_time import DateTimeSensorAsync
with DAG(
"async_dag_2",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 19),
schedule="* * * * *",
catchup=True,
) as dag:
async_sensor = DateTimeSensorAsync(
task_id="async_task",
target_time="""{{ macros.datetime.utcnow() + macros.timedelta(minutes=20) }}""",
)
In the following screenshot, all tasks are shown in a deferred (violet) state. Tasks in other DAGs can use the available worker slots, making the deferrable operator more cost and time-efficient.
Run deferrable tasks
To start a triggerer process, run airflow triggerer
in your Airflow environment. Your output should be similar to the following image.
If you are running Airflow on Astro, the triggerer runs automatically if you are on Astro Runtime 4.0 and later. If you are using Astronomer Software 0.26 and later, you can add a triggerer to an Airflow 2.2 and later deployment in the Deployment Settings tab. See Configure a Deployment on Astronomer Software - Triggerer to configure the triggerer.
As tasks are raised into a deferred state, triggers are registered in the triggerer. You can set the number of concurrent triggers that can run in a single triggerer process with the default_capacity
configuration setting in Airflow. This can also be set with the AIRFLOW__TRIGGERER__DEFAULT_CAPACITY
environment variable. The default value is 1,000
.
High availability
Triggers are designed to be highly available. You can implement this by starting multiple triggerer processes. Similar to the HA scheduler introduced in Airflow 2.0, Airflow ensures that they co-exist with correct locking and high availability. See High Availability for more information on this topic.
Create a deferrable operator
If you have an operator that would benefit from being asynchronous but does not yet exist in OSS Airflow or Astronomer Providers, you can create your own. See Writing Deferrable Operators.