
Use Airflow templates

Templating allows you to pass dynamic information into task instances at runtime. For example, the following task prints the day of the week every time it runs:

BashOperator(
    task_id="print_day_of_week",
    bash_command="echo Today is {{ execution_date.format('dddd') }}",
)

In this example, the value in the double curly braces {{ }} is the templated code that is evaluated at runtime. If you execute this code on a Wednesday, the BashOperator prints Today is Wednesday. Templates have numerous applications. For example, you can use templating to create a new directory named after a task's execution date for storing daily data (/data/path/20210824). Alternatively, you can select a specific partition (/data/path/yyyy=2021/mm=08/dd=24) so that only the relevant data for a given execution date is scanned.
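For example, a minimal sketch of the dated-directory use case (the /data/path location is illustrative):

MkdirOperator = BashOperator(
    task_id="make_dated_dir",
    # {{ ds_nodash }} renders the logical date as YYYYMMDD, e.g. /data/path/20210824
    bash_command="mkdir -p /data/path/{{ ds_nodash }}",
)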

Airflow leverages Jinja, a Python templating framework, as its templating engine. In this guide, you'll learn the following:

  • How to apply Jinja templates in your code.
  • Which variables and functions are available when templating.
  • Which operator fields can be templated and which cannot.
  • How to validate templates.
  • How to apply custom variables and functions when templating.
  • How to render templates to strings and native Python code.
Templating variables in Airflow

Templating in Airflow works the same as Jinja templating in Python. You enclose the code you want evaluated between double curly braces, and the expression is evaluated at runtime.
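Because the syntax is plain Jinja, you can see the same behavior outside of Airflow with a minimal standalone sketch (assuming the jinja2 package is installed):

from jinja2 import Template

# The expression between double curly braces is evaluated when the template renders
template = Template("Today is {{ day }}")
print(template.render(day="Wednesday"))  # Today is Wednesday

In Airflow, you don't call render() yourself; Airflow renders templated fields for you at runtime and supplies the available variables.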

Some of the most commonly used Airflow variables that you can use in templates are:

  • {{ ds }}: The DAG run's logical date, formatted as YYYY-MM-DD.
  • {{ ds_nodash }}: The DAG run's logical date, formatted as YYYYMMDD.
  • {{ data_interval_start }}: The start of the data interval.
  • {{ data_interval_end }}: The end of the data interval.

For a complete list of the available variables, see the Airflow Templates reference.
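For example, a short sketch combining several of these variables in one command (the task_id is illustrative):

BashOperator(
    task_id="print_data_interval",
    # Renders to e.g.: 2021-08-24: 2021-08-24T00:00:00+00:00 to 2021-08-25T00:00:00+00:00
    bash_command="echo {{ ds }}: {{ data_interval_start }} to {{ data_interval_end }}",
)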

Templateable fields and scripts

Templates cannot be applied to all arguments of an operator. Two attributes in the BaseOperator define where you can use templated values:

  • template_fields: Defines which operator arguments can use templated values.
  • template_ext: Defines which file extensions can use templated values.

The following example shows a simplified version of the BashOperator:

class BashOperator(BaseOperator):
    template_fields = ('bash_command', 'env')  # defines which fields are templateable
    template_ext = ('.sh', '.bash')  # defines which file extensions are templateable

    def __init__(
        self,
        *,
        bash_command,
        env=None,
        output_encoding='utf-8',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.bash_command = bash_command  # templateable (can also give path to .sh or .bash script)
        self.env = env  # templateable
        self.output_encoding = output_encoding  # not templateable

The template_fields attribute holds a list of attributes that can use templated values. You can also find this list in the Airflow documentation or in the Airflow UI as shown in the following image:

[Image: Rendered Template view]

template_ext contains a list of file extensions that can be read and templated at runtime. For example, instead of providing a Bash command to bash_command, you could provide a .sh script that contains a templated value:

run_this = BashOperator(
    task_id="run_this",
    bash_command="script.sh",  # the .sh extension can be read and templated
)

The BashOperator takes the contents of the following script, templates it, and executes it:

# script.sh
echo "Today is {{ execution_date.format('dddd') }}"

Templating from files speeds up development because an integrated development environment (IDE) can apply language-specific syntax highlighting to the script. This wouldn't be possible if the script were defined as one long string inside your DAG code.

By default, Airflow searches for the location of your scripts relative to the directory the DAG file is defined in. So, if your DAG is stored in /path/to/dag.py and your script is stored in /path/to/scripts/script.sh, you would update the value of bash_command in the previous example to scripts/script.sh.
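For example, given the layout described above (/path/to/dag.py with the script at /path/to/scripts/script.sh), the operator would look like this:

run_this = BashOperator(
    task_id="run_this",
    bash_command="scripts/script.sh",  # resolved relative to the directory containing the DAG file
)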

Alternatively, you can set a base path for templates at the DAG-level with the template_searchpath argument. For example, the following DAG would look for script.sh at /tmp/script.sh:

@dag(..., template_searchpath="/tmp")
def my_dag():
    run_this = BashOperator(task_id="run_this", bash_command="script.sh")

Disable templating

As of Airflow 2.8, it is possible to use a wrapper class to disable templating for the input to a templateable field without modifying the operator itself. This is useful when you want to pass a string that contains Jinja syntax to an operator without the string being rendered. For example, you might want to pass a Jinja template to a BashOperator that should not be rendered. You can achieve this by wrapping the string in the literal function:

from airflow.utils.template import literal

BashOperator(
    task_id="use_literal_wrapper_to_ignore_jinja_template",
    bash_command=literal("echo {{ params.the_best_number }}"),
)

The code above will print {{ params.the_best_number }} to the logs instead of showing the rendered value of params.the_best_number.

Validate templates

The output of templates can be checked in both the Airflow UI and Airflow CLI. One advantage of the Airflow CLI is that you don't need to run any tasks before seeing the result.

The Airflow CLI command airflow tasks render renders all templateable attributes of a given task. Given a dag_id, task_id, and an arbitrary execution_date, the command output is similar to the following example:

$ airflow tasks render example_dag run_this 2021-01-01

# ----------------------------------------------------------
# property: bash_command
# ----------------------------------------------------------
echo "Today is Friday"

# ----------------------------------------------------------
# property: env
# ----------------------------------------------------------
None

For this command to work, Airflow needs access to a metadata database. To set up a local SQLite database, run the following commands:

cd <your-project-directory>
export AIRFLOW_HOME=$(pwd)
airflow db migrate # generates airflow.db, airflow.cfg, and webserver_config.py in your project dir
# note that in Airflow versions pre-2.7 you'll need to use `airflow db init` instead

# airflow tasks render [dag_id] [task_id] [execution_date]

If you use the Astro CLI, a Postgres metadata database is automatically configured for you after you run astro dev start in your project directory. From there, you can run astro dev run tasks render <parameters> to test your templated values.

For most templates, this is sufficient. However, if your templating logic reaches an external system, for example to fetch a variable from your production Airflow metadata database, you must have connectivity to that system.

To view the result of templated attributes after running a task in the Airflow UI, click a task and then click Rendered as shown in the following image:

[Image: Rendered button in the task instance popup]

The Rendered Template view and the output of the templated attributes are shown in the following image:

[Image: Rendered Template view]

Macros: using custom functions and variables in templates

As discussed previously, several variables are available during templating. However, a Jinja environment is not the same as the Airflow runtime. You can think of a Jinja environment as a very stripped-down Python environment, which, among other things, means that modules cannot be imported. For example, the following command won't work in a Jinja template:

from datetime import datetime

BashOperator(
    task_id="print_now",
    # raises jinja2.exceptions.UndefinedError: 'datetime' is undefined
    bash_command="echo It is currently {{ datetime.now() }}",
)

However, it is possible to inject functions into your Jinja environment. In Airflow, several standard Python modules are injected by default for templating, under the name macros. For example, the previous code example can be updated to use macros.datetime:

BashOperator(
    task_id="print_now",
    # It is currently 2021-08-30 13:51:55.820299
    bash_command="echo It is currently {{ macros.datetime.now() }}",
)

Airflow includes some pre-injected functions out of the box for you to use in your templates. See the Airflow documentation for a list of available functions. You can also load information in JSON format using "{{ macros.json.loads(...) }}" and information in YAML format using "{{ macros.yaml.safe_load(...) }}".
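For example, a minimal sketch that parses a JSON string at runtime (the raw_config param and its value are hypothetical):

BashOperator(
    task_id="print_json_value",
    # macros.json.loads parses the JSON string; the template then reads one key from it
    bash_command="echo {{ macros.json.loads(params.raw_config)['name'] }}",
    params={"raw_config": '{"name": "example"}'},
)

Besides pre-injected functions, you can also use self-defined variables and functions in your templates. Airflow provides a convenient way to inject these into the Jinja environment. In the following example, a function is added to the DAG to print the number of days since May 1st, 2015: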

def days_to_now(starting_date):
    return (datetime.now() - starting_date).days

To use this inside a Jinja template, you can pass a dict to user_defined_macros in the DAG. For example:

from datetime import datetime

from airflow.decorators import dag
from airflow.operators.bash import BashOperator


def days_to_now(starting_date):
    return (datetime.now() - starting_date).days


@dag(
    start_date=datetime(2021, 1, 1),
    schedule=None,
    user_defined_macros={
        "starting_date": datetime(2015, 5, 1),  # Macro can be a variable
        "days_to_now": days_to_now,  # Macro can also be a function
    },
)
def demo_template():
    print_days = BashOperator(
        task_id="print_days",
        # Call user defined macros
        bash_command="echo Days since {{ starting_date }} is {{ days_to_now(starting_date) }}",
    )
    # Days since 2015-05-01 00:00:00 is 2313


demo_template()

It's also possible to inject functions as Jinja filters using user_defined_filters. Filters are applied with the pipe operator (|). The following example accomplishes the same work as the previous example, only this time a filter is used:

@dag(
    start_date=datetime(2021, 1, 1),
    schedule=None,
    # Set user_defined_filters to use the function as a pipe-operation
    user_defined_filters={"days_to_now": days_to_now},
    user_defined_macros={"starting_date": datetime(2015, 5, 1)},
)
def bash_script_template():
    print_days = BashOperator(
        task_id="print_days",
        # Pipe the value to the function
        bash_command="echo Days since {{ starting_date }} is {{ starting_date | days_to_now }}",
    )
    # Days since 2015-05-01 00:00:00 is 2313


bash_script_template()

Functions injected with user_defined_filters and user_defined_macros are both usable in the Jinja environment. While they achieve the same result, Astronomer recommends using filters when you need to apply multiple custom functions, because the filter formatting improves the readability of your code. You can see this when comparing the two techniques side by side:

"{{ name | striptags | title }}"  # chained filters are read naturally from left to right
"{{ title(striptags(name)) }}" # multiple functions are more difficult to interpret because reading right to left

Render native Python code

By default, Jinja templates always render to Python strings. This can cause issues when the code consuming the rendered value doesn't work with strings. In these cases, it's desirable to render templates to native Python objects instead. For example:

def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total


sum_numbers(1, 2, 3)
# returns 6
sum_numbers("1", "2", "3")
# TypeError: unsupported operand type(s) for +=: 'int' and 'str'

Consider a scenario where you're passing a list of values to this function by triggering a DAG with a config that holds some numbers:

@dag(
    start_date=datetime.datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
)
def failing_template():
    PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
    )


failing_template()

You would trigger the DAG with the following JSON as the DAG run configuration:

{"numbers": [1,2,3]}

The rendered value is a string. Since the sum_numbers function unpacks the given string, it ends up trying to add up every character in the string:

('[', '1', ',', ' ', '2', ',', ' ', '3', ']')

This is not going to work, so you must tell Jinja to return a native Python list instead of a string. Jinja supports this with Environments. The default Jinja environment outputs strings, but you can configure a NativeEnvironment to render templates as native Python code.
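As a standalone sketch of the difference (outside of Airflow, assuming the jinja2 package is installed):

from jinja2 import Environment
from jinja2.nativetypes import NativeEnvironment

# The default environment always renders to a string
print(Environment().from_string("{{ data }}").render(data=[1, 2, 3]))
# "[1, 2, 3]" (a str)

# The native environment renders to the underlying Python object
print(NativeEnvironment().from_string("{{ data }}").render(data=[1, 2, 3]))
# [1, 2, 3] (a list)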

Support for Jinja's NativeEnvironment was added in Airflow 2.1.0 with the render_template_as_native_obj argument on the DAG class. This argument takes a boolean value which determines whether to render templates with Jinja's default Environment or NativeEnvironment. For example:

import datetime

from airflow.decorators import dag
from airflow.operators.python import PythonOperator


def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total


@dag(
    dag_id="native_templating",
    start_date=datetime.datetime(2021, 1, 1),
    schedule=None,
    # Render templates using Jinja's NativeEnvironment
    render_template_as_native_obj=True,
)
def native_templating():
    sumnumbers = PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
    )


native_templating()

Passing the same JSON configuration {"numbers": [1,2,3]} now renders a list of integers which the sum_numbers function processes correctly:

[2021-08-26 11:53:12,872] {python.py:151} INFO - Done. Returned value was: 6

The Jinja environment must be configured at the DAG level. This means that all tasks in a DAG render their templates using either the default Jinja environment or the NativeEnvironment, never a mix of both.
