Manage Airflow DAG notifications
When you're using a data orchestration tool, how do you know when something has gone wrong? Airflow users can check the Airflow UI to determine the status of their DAGs, but this is an inefficient way of managing errors systematically, especially if certain failures need to be addressed promptly or by multiple team members. Fortunately, Airflow has built-in notification mechanisms that can be leveraged to configure error notifications in a way that works for your organization.
In this guide, you'll learn the basics of Airflow notifications and how to set up common notification mechanisms including email, pre-built and custom notifiers, and SLAs. You'll also learn how to leverage Airflow alerting when using Astro.
There are multiple resources for learning about this topic. See also:
Assumed knowledge
To get the most out of this guide, you should have an understanding of:
- Airflow DAGs. See Introduction to Airflow DAGs.
- Task dependencies. See Managing dependencies in Apache Airflow.
Airflow notification types
Airflow has a few options for notifying you on the status of your DAGs and tasks:
- Email notifications: Most Airflow operators have parameters for setting email alerts in case of a task failure or retry. Use email alerts in production pipelines where task failures or retries need immediate attention by a data professional.
- Airflow callbacks: Callback parameters (`*_callback`) exist at both the task and the DAG level. You can pass any callable or Airflow notifier to these parameters, and Airflow runs them when specific events occur, such as a task failure. Airflow callbacks offer a lot of flexibility to execute any code based on the state of a task or DAG. They are often used to define actions for specific instances of task failures or successes.
- Airflow notifiers: Notifiers are custom classes for Airflow callbacks that can be easily reused and standardized. Provider packages can ship pre-built notifiers like the SlackNotifier. Notifiers can be provided to callback parameters to define which task or DAG state should cause them to be executed. A common use case for notifiers is standardizing actions for task failures across several Airflow instances.
- Airflow service-level agreements (SLAs): SLAs define the expected time it takes for a specific task to complete. If an SLA is missed, the callable or notifier provided to the `sla_miss_callback` parameter is executed. If you configure an SMTP connection, an email is sent as well. Because an SLA miss does not stop a task from running, this type of notification is used when intervention is needed if a specific task is taking longer than expected.
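As a sketch of what such a hook can look like: in Airflow 2, the callable passed to `sla_miss_callback` receives five arguments (the DAG, a string listing tasks past their SLA, a string of blocking tasks, the list of SLA miss records, and the list of blocking task instances). The message format below is illustrative, not Airflow's own:

```python
# Minimal sketch of an sla_miss_callback callable. The five-argument
# signature matches what Airflow 2 passes on an SLA miss; the message
# format is an assumption for illustration.
def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    message = (
        f"SLA missed in DAG {dag}.\n"
        f"Tasks past their SLA: {task_list}\n"
        f"Blocking tasks: {blocking_task_list}"
    )
    print(message)
    return message

# Airflow invokes this automatically; wiring it up would look like:
# @dag(..., sla_miss_callback=sla_callback)
```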
Most notifications can be set at both the DAG and the task level. Setting a parameter in a DAG's `default_args` dictionary applies it to all tasks in the DAG. You can see examples of this in the set DAG and task-level callbacks section.
The OSS notification library Apprise contains modules to send notifications to many services. You can use Apprise with Airflow by installing the Apprise Airflow provider which contains the AppriseNotifier. See the Apprise Airflow provider documentation for more information and examples.
Choose a notification type
It's best practice to use pre-built solutions whenever possible. This approach makes your DAGs more robust by reducing custom code and standardizing notifications across different Airflow environments.
If you want to deliver alerts to email, use email notifications for task failures or retries and the SmtpNotifier for other events such as successful task runs.
If a notifier class exists for your use case, use it instead of a custom callback. See the Airflow documentation for an up-to-date list of available notifiers and the Apprise wiki for a list of services the Apprise notifier can connect to.
A notifier can be provided to any callback parameter (`*_callback`). Only use custom Airflow callbacks when no notifier is available for your use case.
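When no notifier fits, a custom callback is simply a Python callable that accepts the Airflow context dictionary. A minimal sketch, assuming standard context keys like `dag_run` and `task_instance` (the message format and delivery logic are placeholders):

```python
# A custom callback is any callable that accepts the Airflow context dict.
# Airflow passes keys such as "dag_run" and "task_instance" in the context.
def notify_on_failure(context):
    dag_run = context.get("dag_run")
    task_instance = context.get("task_instance")
    message = f"Task {task_instance} in DAG run {dag_run} failed."
    # Replace this print with your own delivery logic (HTTP call, pager, etc.)
    print(message)
    return message

# In a DAG file, this would be wired up as, for example:
# @task(on_failure_callback=notify_on_failure)
```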
To execute custom code based on events happening anywhere in your Airflow environment, for example whenever any dataset is updated or any task instance fails, you can use Airflow listeners. See the Use a listener to send a Slack notification when a Dataset is updated tutorial for an example.
Email notifications
If you have an SMTP connection configured in Airflow, you can use the `email`, `email_on_failure`, and `email_on_retry` task parameters to send notification emails from Airflow.
- TaskFlow API
- Traditional syntax
```python
@task(
    email=["noreply@astronomer.io", "noreply2@astronomer.io"],
    email_on_failure=True,
    email_on_retry=True,
)
def t1():
    return "hello"
```
```python
def say_hello():
    return "hello"

t1 = PythonOperator(
    task_id="t1",
    python_callable=say_hello,
    email=["noreply@astronomer.io", "noreply2@astronomer.io"],
    email_on_failure=True,
    email_on_retry=True,
)
```
You can also configure email notifications for all tasks in a DAG by defining the configurations in the `default_args` parameter.
```python
default_args = {
    "email": ["noreply@astronomer.io"],
    "email_on_failure": True,
    "email_on_retry": True,
}

@dag(
    start_date=datetime(2023, 4, 25),
    schedule="@daily",
    catchup=False,
    default_args=default_args,
)
```
To allow Airflow to send emails, you have to provide values for the `[smtp]` section of your `airflow.cfg`, similar to this example:
```text
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = your-smtp-host.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user =
# smtp_password =
smtp_port = 587
smtp_mail_from = noreply@astronomer.io
```
You can also set these values using environment variables. In this case, all parameters are prefixed with `AIRFLOW__SMTP__`. For example, you can set `smtp_host` with the `AIRFLOW__SMTP__SMTP_HOST` variable. For more on Airflow email configuration, see Email Configuration.
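The mapping from config option to environment variable follows the pattern `AIRFLOW__{SECTION}__{KEY}`, with the section and key uppercased. A small helper illustrating the convention (the helper itself is not part of Airflow, just a sketch of the naming rule):

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name Airflow reads for a given
    airflow.cfg section and key: double underscores between the parts,
    everything uppercased."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"

# Example: the [smtp] option smtp_host maps to AIRFLOW__SMTP__SMTP_HOST
```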
If you are using Astro, use environment variables to set up SMTP because the `airflow.cfg` cannot be edited directly.
Custom email notifications
By default, email notifications are sent in a standard format defined in the `email_alert()` and `get_email_subject_content()` methods of the `TaskInstance` class:
```python
default_subject = 'Airflow alert: {{ti}}'
# For reporting purposes, the report is based on 1-indexed,
# not 0-indexed lists (i.e. Try 1 instead of
# Try 0 for the first attempt).
default_html_content = (
    'Try {{try_number}} out of {{max_tries + 1}}<br>'
    'Exception:<br>{{exception_html}}<br>'
    'Log: <a href="{{ti.log_url}}">Link</a><br>'
    'Host: {{ti.hostname}}<br>'
    'Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>'
)
```
To see the full method, see the source code here.
You can customize this content by setting the `subject_template` and/or `html_content_template` variables in your `airflow.cfg` with the paths to your Jinja template files for the subject and content respectively.
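For example, this could look like the following in `airflow.cfg` (the paths are placeholders; in current Airflow versions these options live in the `[email]` section):

```text
[email]
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file
```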
If you want to send emails on a more customizable basis, you can also use Airflow's callback functions to run custom functions that send email notifications. For example, if you want to send emails for successful task runs, you can provide an email function to the `on_success_callback` parameter:
```python
from airflow.utils.email import send_email

def success_email_function(context):
    dag_run = context.get("dag_run")
    msg = "DAG ran successfully"
    subject = f"DAG {dag_run} has completed"
    # your_emails should be a list of recipient addresses
    send_email(to=your_emails, subject=subject, html_content=msg)

@dag(
    start_date=datetime(2023, 4, 26),
    schedule="@daily",
    catchup=False,
    on_success_callback=success_email_function,
)
```
Airflow callbacks
In Airflow, you can define actions to take for different DAG or task states using `*_callback` parameters:
- `on_success_callback`: Invoked when a task or DAG succeeds.
- `on_failure_callback`: Invoked when a task or DAG fails.
- `on_execute_callback`: Invoked right before a task begins executing. This callback only exists at the task level.
- `on_retry_callback`: Invoked when a task is retried. This callback only exists at the task level.
- `sla_miss_callback`: Invoked when a task or DAG misses its defined Service Level Agreement (SLA). This callback is defined at the DAG level for DAGs with defined SLAs and is applied to every task.
You can provide any Python callable to the `*_callback` parameters. As of Airflow 2.6, you can also use notifiers for your callbacks, and you can provide several callback items to the same callback parameter as a list.
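A sketch of passing multiple callback items as a list, where Airflow invokes each item in turn. The callables here are plain illustrative functions, and the final lines simulate how Airflow would call them (a real DAG would pass the list to a callback parameter instead):

```python
# Two plain callables; Airflow calls each with the context dict.
def log_failure(context):
    print(f"Failure in: {context.get('task_instance')}")
    return "logged"

def page_on_call(context):
    # Illustrative placeholder for an on-call paging integration.
    return f"paged for {context.get('task_instance')}"

# As of Airflow 2.6, a list of callables and/or notifiers can be passed:
# @task(on_failure_callback=[log_failure, page_on_call])

# Simulating Airflow invoking each item in the list:
context = {"task_instance": "t1"}
results = [cb(context) for cb in [log_failure, page_on_call]]
```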