Run tasks in an isolated environment in Apache Airflow

It is very common to run a task with different dependencies than your Airflow environment. Your task might need a different Python version than core Airflow, or it might need packages that conflict with those of your other tasks. In these cases, running tasks in an isolated environment can help manage dependency conflicts and enable compatibility with your execution environments.

In Airflow, you have several options for running custom Python code in isolated environments. This guide teaches you how to choose the right isolated environment option for your use case, implement different virtual environment operators and decorators, and access Airflow context and variables in isolated environments.

Other ways to learn

There are multiple resources for learning about this topic. See also:

info

This guide covers options to isolate individual tasks in Airflow. If you want to run all of your Airflow tasks in dedicated Kubernetes pods, consider using the Kubernetes Executor. Astronomer customers can set their Deployments to use the KubernetesExecutor in the Astro UI. See Manage Airflow executors on Astro.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

When to use isolated environments

There are two situations when you might want to run a task in an isolated environment:

  • Your task requires a different version of Python than your Airflow environment. Apache Airflow is compatible with and available in Python 3.8, 3.9, 3.10 and 3.11. The Astro Runtime has images available for all supported Python versions, so you can run Airflow inside Docker in a reproducible environment. See Prerequisites for more information.

  • Your task requires different versions of Python packages that conflict with the package versions installed in your Airflow environment. To know which Python packages are pinned to which versions within Airflow, you can retrieve the full list of constraints for each Airflow version by going to:

    https://raw.githubusercontent.com/apache/airflow/constraints-<AIRFLOW VERSION>/constraints-<PYTHON VERSION>.txt

Airflow Best Practice

Make sure to pin all package versions, both in your core Airflow environment (requirements.txt) and in your isolated environments. This helps you avoid unexpected behavior due to package updates that might create version conflicts.
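
As a small illustration of the two points above, the following sketch builds the constraints URL for a given Airflow and Python version and flags requirement lines that are not pinned with ==. The helper names constraints_url and find_unpinned are hypothetical and not part of Airflow, and the pin check is deliberately simple: it treats every requirement without == as unpinned.

```python
from typing import List


def constraints_url(airflow_version: str, python_version: str) -> str:
    """Build the URL of the constraints file for an Airflow/Python version pair."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


def find_unpinned(requirement_lines: List[str]) -> List[str]:
    """Return requirement lines that do not pin an exact version with '=='."""
    unpinned = []
    for line in requirement_lines:
        # Drop trailing comments and surrounding whitespace before checking.
        requirement = line.split("#", 1)[0].strip()
        if requirement and "==" not in requirement:
            unpinned.append(requirement)
    return unpinned
```

You could run a check like find_unpinned over the lines of requirements.txt and of each isolated environment's requirements file, for example in a CI step.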

Limitations

When creating isolated environments in Airflow, you might not be able to use common Airflow features or connect to your Airflow environment in the same way you would in a regular Airflow task.

Common limitations include:

Choosing an isolated environment option

Airflow provides several options for running tasks in isolated environments.

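To run tasks in a dedicated Kubernetes Pod you can use:

  • @task.kubernetes decorator
  • KubernetesPodOperator

To run tasks in a Python virtual environment you can use:

  • @task.external_python decorator or ExternalPythonOperator
  • @task.virtualenv decorator or PythonVirtualenvOperator
  • @task.branch_external_python decorator or BranchExternalPythonOperator
  • @task.branch_virtualenv decorator or BranchPythonVirtualenvOperator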

The virtual environment decorators have operator equivalents with the same functionality. Astronomer recommends using decorators where possible because they simplify the handling of XCom.

[Figure: Graph of options for isolated environments in Airflow.]

Which option you choose depends on your use case and the requirements of your task. The table below shows which decorators and operators are best for particular use cases.

| Use Case | Implementation Options |
|---|---|
| Run a Python task in a K8s Pod | @task.kubernetes, KubernetesPodOperator |
| Run a Docker image without additional Python code in a K8s Pod | KubernetesPodOperator |
| Run a Python task in an existing (reusable) virtual environment | @task.external_python, ExternalPythonOperator |
| Run a Python task in a new virtual environment | @task.virtualenv, PythonVirtualenvOperator |
| Run branching code in an existing (reusable) virtual environment | @task.branch_external_python, BranchExternalPythonOperator |
| Run branching code in a new virtual environment | @task.branch_virtualenv, BranchPythonVirtualenvOperator |
| Install different packages for each run of a task | PythonVirtualenvOperator, BranchPythonVirtualenvOperator |

Another consideration when choosing an operator is the infrastructure you have available. Operators that run tasks in Kubernetes pods allow you to have full control over the environment and resources used, but they require a Kubernetes cluster. Operators that run tasks in Python virtual environments are easier to set up, but do not provide the same level of control over the environment and resources used.

| Requirements | Decorators | Operators |
|---|---|---|
| A Kubernetes cluster | @task.kubernetes | KubernetesPodOperator |
| A Docker image | @task.kubernetes (with Python installed) | KubernetesPodOperator (with or without Python installed) |
| A Python binary | @task.external_python, @task.branch_external_python, @task.virtualenv (*), @task.branch_virtualenv (*) | ExternalPythonOperator, BranchExternalPythonOperator, PythonVirtualenvOperator (*), BranchPythonVirtualenvOperator (*) |

*Only required if you need to use a different Python version than your Airflow environment.

External Python operator

The External Python operator (@task.external_python decorator or ExternalPythonOperator) runs a Python function in an existing Python virtual environment, isolated from your Airflow environment. To use the @task.external_python decorator or the ExternalPythonOperator, you need to create a separate Python environment to reference. You can use any Python binary created by any means.

The easiest way to create a Python environment when using the Astro CLI is with the Astronomer PYENV BuildKit. The BuildKit can be used by adding a comment on the first line of the Dockerfile as shown in the following example. Adding this comment enables you to create virtual environments with the PYENV keyword.

# syntax=quay.io/astronomer/airflow-extensions:v1

FROM quay.io/astronomer/astro-runtime:10.3.0-python-3.11

# create a virtual environment for the ExternalPythonOperator and @task.external_python decorator
# using Python 3.9 and install the packages from epo_requirements.txt
PYENV 3.9 epo_pyenv epo_requirements.txt

note

To use the BuildKit, the Docker BuildKit Backend needs to be enabled. This is the default as of Docker Desktop version 23.0, but might need to be enabled manually in older versions of Docker.

You can add any Python packages to the virtual environment by putting them into a separate requirements file, named epo_requirements.txt in this example. Make sure to pin all package versions.

pandas==1.4.4

warning

Installing Airflow itself and Airflow provider packages in isolated environments can lead to unexpected behavior and is not recommended. If you need to use Airflow or Airflow provider modules inside your virtual environment, Astronomer recommends choosing the @task.virtualenv decorator or the PythonVirtualenvOperator. See Use Airflow packages in isolated environments.

After restarting your Airflow environment, you can use this Python binary by referencing the environment variable ASTRO_PYENV_<my-pyenv-name>. If you choose an alternative method to create your Python binary, you need to set the python parameter of the decorator or operator to the location of your Python binary.
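
Reading the environment variable directly with os.environ[...] raises a bare KeyError if the variable is missing. The following sketch wraps the lookup in a hypothetical helper, resolve_pyenv_binary (not part of Airflow or the Astro CLI), that fails with a more descriptive message. It assumes only the ASTRO_PYENV_<my-pyenv-name> naming pattern described above.

```python
import os


def resolve_pyenv_binary(pyenv_name: str) -> str:
    """Return the Python binary path stored in ASTRO_PYENV_<pyenv_name>."""
    variable_name = f"ASTRO_PYENV_{pyenv_name}"
    binary_path = os.environ.get(variable_name)
    if not binary_path:
        # Fail early with a hint instead of a bare KeyError at DAG parse time.
        raise RuntimeError(
            f"{variable_name} is not set. Check the PYENV line in your "
            "Dockerfile and restart your Airflow environment."
        )
    return binary_path
```

The returned path can then be passed to the python parameter, for example python=resolve_pyenv_binary("epo_pyenv").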

To run any Python function in your virtual environment, use the @task.external_python decorator on it and set the python parameter to the location of your Python binary.

import os

from airflow.decorators import task


@task.external_python(python=os.environ["ASTRO_PYENV_epo_pyenv"])
def my_isolated_task():
    # import inside the function so the packages resolve in the virtual environment
    import pandas as pd
    import sys

    print(f"The python version in the virtual env is: {sys.version}")
    print(f"The pandas version in the virtual env is: {pd.__version__}")
    # your code to run in the isolated environment

To get a list of all parameters of the @task.external_python decorator / ExternalPythonOperator, see the Astronomer Registry.
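
To build intuition for what "running a function with another Python binary" means, here is a conceptual sketch using only the standard library. It is not how Airflow implements the ExternalPythonOperator. The helper run_in_external_python is hypothetical, arguments and results are passed as JSON purely for simplicity, and sys.executable stands in for a separate binary so that the example runs anywhere.

```python
import json
import subprocess
import sys
import textwrap


def run_in_external_python(python_binary: str, source: str, func_name: str, *args):
    """Run func_name (defined in source) with another interpreter and return its result."""
    # Append a small runner that reads JSON arguments and prints a JSON result.
    runner = textwrap.dedent(f"""
        import json, sys
        _args = json.loads(sys.argv[1])
        print(json.dumps({func_name}(*_args)))
    """)
    script = textwrap.dedent(source).rstrip() + "\n" + runner
    completed = subprocess.run(
        [python_binary, "-c", script, json.dumps(args)],
        capture_output=True,
        text=True,
        check=True,
    )
    # The runner prints the result last, so earlier output from the function is ignored.
    return json.loads(completed.stdout.strip().splitlines()[-1])


SOURCE = """
def add_one(num):
    return num + 1
"""

if __name__ == "__main__":
    print(run_in_external_python(sys.executable, SOURCE, "add_one", 41))
```

The sketch shows why the function body must be self-contained: it runs in a different process, so imports belong inside the function, and only serializable values can be passed in or returned.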

Virtualenv operator

The Virtualenv operator (@task.virtualenv or PythonVirtualenvOperator) creates a new virtual environment each time the task runs. If you only specify different package versions and use the same Python version as your Airflow environment, you do not need to create or specify a Python binary.

warning

Installing Airflow itself and Airflow provider packages in isolated environments can lead to unexpected behavior and is generally not recommended. See Use Airflow packages in isolated environments.

Add the pinned versions of the packages to the requirements parameter of the @task.virtualenv decorator. The decorator creates a new virtual environment at runtime.

from airflow.decorators import task


@task.virtualenv(requirements=["pandas==1.5.1"])  # add your requirements to the list
def my_isolated_task():
    import pandas as pd

    print(f"The pandas version in the virtual env is: {pd.__version__}")
    # your code to run in the isolated environment
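
To make "creates a new virtual environment at runtime" more concrete, the following standard-library sketch creates a throwaway virtual environment, runs a snippet inside it, and deletes it again. This illustrates the general idea only and is not Airflow's implementation. The helper name run_in_fresh_venv is hypothetical, and the environment is created without pip so that the example works offline.

```python
import os
import subprocess
import tempfile
import venv


def run_in_fresh_venv(code: str) -> str:
    """Create a temporary virtual environment, run code in it, and return stdout."""
    with tempfile.TemporaryDirectory() as env_dir:
        # with_pip=False keeps the example fast and offline; a real task
        # would install its pinned requirements into the environment here.
        venv.create(env_dir, with_pip=False, symlinks=(os.name != "nt"))
        if os.name == "nt":
            python_binary = os.path.join(env_dir, "Scripts", "python.exe")
        else:
            python_binary = os.path.join(env_dir, "bin", "python")
        completed = subprocess.run(
            [python_binary, "-c", code],
            capture_output=True,
            text=True,
            check=True,
        )
        return completed.stdout.strip()


if __name__ == "__main__":
    # Inside a virtual environment, sys.prefix differs from sys.base_prefix.
    print(run_in_fresh_venv("import sys; print(sys.prefix != sys.base_prefix)"))
```

Because the environment is rebuilt on every run, package installation time is added to each task run, which is the trade-off for being able to change requirements between runs.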

Since the requirements parameter of the PythonVirtualenvOperator is templatable, you can use Jinja templating to pass information at runtime. For example, you can use a Jinja template to install a different version of pandas for each run of the task.

from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonVirtualenvOperator


@task
def get_pandas_version():
    pandas_version = "1.5.1"  # retrieve the pandas version according to your logic
    return pandas_version


def my_isolated_function():
    # placeholder callable; replace with your code to run in the isolated environment
    import pandas as pd

    print(f"The pandas version in the virtual env is: {pd.__version__}")


my_isolated_task = PythonVirtualenvOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    requirements=[
        # rendered at runtime from the XCom pushed by get_pandas_version
        "pandas=={{ ti.xcom_pull(task_ids='get_pandas_version') }}",
    ],
)

chain(get_pandas_version(), my_isolated_task)
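
Because the version string is only known at runtime and is interpolated into a pip requirement, it can be worth validating it before returning it from the upstream task. The following sketch shows one possible check. The helper validate_version is hypothetical, and the pattern is a deliberately strict simplification that accepts only dotted numeric versions such as 1.5.1, not every valid Python package version.

```python
import re

# Simplified pattern: digits separated by dots, for example "1.5.1".
# Pre-releases, ranges, and anything with spaces or operators are rejected.
_VERSION_PATTERN = re.compile(r"\d+(\.\d+)*")


def validate_version(version: str) -> str:
    """Return version unchanged if it looks like a plain release number."""
    if not _VERSION_PATTERN.fullmatch(version):
        raise ValueError(f"Unexpected version string: {version!r}")
    return version
```

In the example above, get_pandas_version could end with return validate_version(pandas_version) so that a malformed value fails in the upstream task rather than during package installation.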

If your task requires a different Python version than your Airflow environment, you need to install the Python version your task requires in your Airflow environment so the Virtualenv task can use it. Use the Astronomer PYENV BuildKit to install a different Python version in your Dockerfile.

# syntax=quay.io/astronomer/airflow-extensions:v1

FROM quay.io/astronomer/astro-runtime:10.3.0-python-3.11

PYENV 3.10 pyenv_3_10

note

To use the BuildKit, the Docker BuildKit Backend needs to be enabled. This is the default starting in Docker Desktop version 23.0, but might need to be enabled manually in older versions of Docker.

The Python version can be referenced directly using the python_version parameter of the decorator/operator.

from airflow.decorators import task


@task.virtualenv(
    requirements=["pandas==1.5.1"],
    python_version="3.10",  # specify the Python version
)
def my_isolated_task():
    import pandas as pd
    import sys

    print(f"The python version in the virtual env is: {sys.version}")
    print(f"The pandas version in the virtual env is: {pd.__version__}")
    # your code to run in the isolated environment

To get a list of all parameters of the @task.virtualenv decorator or PythonVirtualenvOperator, see the Astronomer Registry.
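
If a task fails because the requested Python version cannot be found, a quick check like the following can help confirm whether the interpreter is actually available. This sketch assumes the interpreter is exposed on PATH under the common name python<version> (for example python3.10), which may not match your setup. The helper find_python_binary is hypothetical.

```python
import shutil
from typing import Optional


def find_python_binary(python_version: str) -> Optional[str]:
    """Return the path of python<version> on PATH, or None if it is not found."""
    return shutil.which(f"python{python_version}")


if __name__ == "__main__":
    path = find_python_binary("3.10")
    print(path if path else "python3.10 was not found on PATH")
```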

Kubernetes pod operator

The Kubernetes pod operator (@task.kubernetes decorator or KubernetesPodOperator) runs an Airflow task in a dedicated Kubernetes pod. You can use the @task.kubernetes decorator to run any custom Python code in a separate Kubernetes pod on a Docker image with Python installed, while the KubernetesPodOperator runs any existing Docker image.

To use the @task.kubernetes decorator or the KubernetesPodOperator, you need to provide a Docker image and have access to a Kubernetes cluster. The following example shows how to use the modules to run a task in a separate Kubernetes pod in the same namespace and Kubernetes cluster as your Airflow environment. For more information on how to use the KubernetesPodOperator, see Use the KubernetesPodOperator and Run the KubernetesPodOperator on Astro.

from airflow.configuration import conf
from airflow.decorators import task

# if you are running Airflow on Kubernetes, you can get
# the current namespace from the Airflow conf
namespace = conf.get("kubernetes", "NAMESPACE")


@task.kubernetes(
    image="<YOUR IMAGE>",
    in_cluster=True,
    namespace=namespace,
    name="<YOUR POD NAME>",
    get_logs=True,
    log_events_on_failure=True,
    do_xcom_push=True,
)
def my_isolated_task(num: int):
    return num + 1
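
With the @task.kubernetes decorator, the return value is pushed to XCom for you. If you instead run your own image with the KubernetesPodOperator and do_xcom_push=True, the container is expected to write its result as JSON to /airflow/xcom/return.json. The following sketch shows what such a script inside the container might do. The helper write_xcom_result is hypothetical, and the directory is a parameter only so that the example can run outside a pod.

```python
import json
import os


def write_xcom_result(value, xcom_dir: str = "/airflow/xcom") -> str:
    """Write value as JSON to <xcom_dir>/return.json and return the file path."""
    os.makedirs(xcom_dir, exist_ok=True)
    path = os.path.join(xcom_dir, "return.json")
    with open(path, "w", encoding="utf-8") as result_file:
        json.dump(value, result_file)
    return path
```

The value must be JSON-serializable. For other kinds of results, a common approach is to write them to external storage and return only a reference.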

Virtual branching operators

Virtual branching operators allow you to run conditional task logic in an isolated Python environment.

  • @task.branch_external_python decorator or BranchExternalPythonOperator: Run conditional task logic in an existing virtual Python environment.
  • @task.branch_virtualenv decorator or BranchPythonVirtualenvOperator: Run conditional task logic in a newly created virtual Python environment.

To run conditional task logic in an isolated environment, use the branching versions of the virtual environment decorators and operators. You can learn more about branching in Airflow in the Branching in Airflow guide.

import os

from airflow.decorators import task


@task.branch_external_python(python=os.environ["ASTRO_PYENV_epo_pyenv"])
def my_isolated_task():
    import pandas as pd
    import random

    print(f"The pandas version in the virtual env is: {pd.__version__}")

    num = random.randint(0, 100)

    if num > 50:
        # return the task_id of the downstream task that should be executed
        return "downstream_task_a"
    else:
        return "downstream_task_b"
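
A branching function only needs to return the task_id (or a list of task_ids) of the downstream tasks to run. One design option, sketched below, is to keep the decision itself in a small pure function so it can be unit tested without Airflow or random input. The name choose_branch and the threshold parameter are hypothetical, and the task ids are the ones from the example above.

```python
def choose_branch(num: int, threshold: int = 50) -> str:
    """Return the task_id of the downstream task to run for a given number."""
    if num > threshold:
        return "downstream_task_a"
    return "downstream_task_b"


if __name__ == "__main__":
    print(choose_branch(51))
    print(choose_branch(50))
```

Note that a function defined at module level is not available inside the isolated environment, so in practice this logic would live inside the decorated function or in a package installed in the virtual environment.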

Use Airflow context variables in isolated environments

Some variables from the Airflow context can be passed to isolated environments, for example the logical_date of the DAG run. Due to compatibility issues, other objects from the context such as ti cannot be passed to isolated environments. For more information, see the Airflow documentation.

import os

from airflow.decorators import task


# note that to be able to use the logical date, pendulum needs to be installed in the epo_pyenv
@task.external_python(python=os.environ["ASTRO_PYENV_epo_pyenv"])
def my_isolated_task(logical_date):
    print(f"The logical date is: {logical_date}")
    # your code to run in the isolated environment


my_isolated_task()
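
One way to think about why only some context entries can be passed is that values must be serialized to cross the process boundary into the isolated environment. The following standard-library sketch illustrates that general constraint with pickle. It is an assumption-laden illustration and not Airflow's actual rule set, which also depends on which packages (such as pendulum) are installed in the target environment. The helper can_cross_process_boundary is hypothetical.

```python
import pickle
import threading
from datetime import datetime, timezone


def can_cross_process_boundary(value) -> bool:
    """Return True if value can be pickled, a rough proxy for being passable."""
    try:
        pickle.dumps(value)
    except Exception:
        return False
    return True


if __name__ == "__main__":
    # A plain timestamp serializes without problems.
    print(can_cross_process_boundary(datetime(2024, 1, 1, tzinfo=timezone.utc)))
    # Objects that hold live resources, such as locks, do not.
    print(can_cross_process_boundary(threading.Lock()))
```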

Use Airflow variables in isolated environments

You can inject Airflow variables into isolated environments by using Jinja templating in the op_kwargs argument of the PythonVirtualenvOperator or ExternalPythonOperator. This strategy lets you pass secrets into your isolated environment, which are masked in the logs according to rules described in Hide sensitive information in Airflow variables.

from airflow.operators.python import PythonVirtualenvOperator


def my_isolated_function(password_from_op_kwargs):
    print(f"The password is: {password_from_op_kwargs}")


my_isolated_task = PythonVirtualenvOperator(
    task_id="my_isolated_task",
    python_callable=my_isolated_function,
    requirements=["pandas==1.5.1"],
    python_version="3.10",
    op_kwargs={
        # rendered at runtime from the Airflow variable my_secret
        "password_from_op_kwargs": "{{ var.value.my_secret }}",
    },
)

Use Airflow packages in isolated environments

warning

Using Airflow packages inside of isolated environments can lead to unexpected behavior and is not recommended.

If you need to use Airflow or an Airflow provider module inside your virtual environment, use the @task.virtualenv decorator or the PythonVirtualenvOperator instead of the @task.external_python decorator or the ExternalPythonOperator. As of Airflow 2.8, you can cache the virtual environment for reuse by providing a venv_cache_path to the @task.virtualenv decorator or PythonVirtualenvOperator, to speed up subsequent runs of your task.

from airflow.decorators import task


@task.virtualenv(
    requirements=[
        "apache-airflow-providers-snowflake==5.3.0",
        "apache-airflow==2.8.1",
        "pandas==1.5.3",
    ],
    venv_cache_path="/tmp/venv_cache",  # optional caching of the virtual environment
)
def my_isolated_task():
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
    import pandas as pd

    hook = SnowflakeHook(snowflake_conn_id="MY_SNOWFLAKE_CONN_ID")
    result = hook.get_first("SELECT * FROM MY_TABLE LIMIT 1")
    print(f"The pandas version in the virtual env is: {pd.__version__}")

    return result


my_isolated_task()
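
Caching a virtual environment only helps if the same inputs map to the same cached environment. The following sketch shows the general idea of deriving a cache directory from the requirements and Python version. It is a conceptual illustration and Airflow's real naming scheme may differ. The helper venv_cache_dir and the venv- prefix are assumptions made for this example.

```python
import hashlib
import json
import os
from typing import List


def venv_cache_dir(cache_path: str, requirements: List[str], python_version: str) -> str:
    """Derive a stable cache directory name from the environment's inputs."""
    # Sort the requirements so that their order does not change the hash.
    payload = json.dumps(
        {"requirements": sorted(requirements), "python_version": python_version},
        sort_keys=True,
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]
    return os.path.join(cache_path, f"venv-{digest}")
```

With a scheme like this, changing any pinned version yields a new directory, so a stale environment is never reused for a different set of requirements.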
