
Run the KubernetesPodOperator on Astro

The KubernetesPodOperator is one of the most powerful Apache Airflow operators. Similar to the Kubernetes executor, this operator dynamically launches a Pod in Kubernetes for each task and terminates each Pod once the task is complete. This results in an isolated, containerized execution environment for each task that is separate from tasks otherwise being executed by Celery workers.

Benefits

You can use the KubernetesPodOperator to:

  • Execute a custom Docker image per task with Python packages and dependencies that would otherwise conflict with the rest of your Deployment's dependencies. This includes Docker images in a private registry or repository.
  • Run tasks in a Kubernetes cluster outside of Astro. This can be helpful when you need to run individual tasks on infrastructure that isn't currently supported by Astro, such as GPU nodes or other third-party services.
  • Specify CPU and memory as task-level limits or minimums to optimize performance and reduce costs.
  • Write task logic in a language other than Python. This gives you flexibility and can enable new use cases across teams.
  • Scale tasks horizontally in a way that is cost-effective, dynamic, and minimally dependent on worker resources.
  • Set Kubernetes-native configurations in a YAML file, including volumes, secrets, and affinities.

On Astro, the Kubernetes infrastructure required to run the KubernetesPodOperator is built into every cluster and is managed by Astronomer.

Known limitations

  • Cross-account service accounts are not supported on Pods launched in an Astro cluster. To allow access to external data sources, you can provide credentials and secrets to tasks.
  • PersistentVolumes (PVs) are not supported on Pods launched in an Astro cluster.
  • (Hybrid only) You cannot run a KubernetesPodOperator task in a worker queue or node pool that is different than the worker queue of its parent worker. For example, a KubernetesPodOperator task that is triggered by an m5.4xlarge worker on AWS will also be run on an m5.4xlarge node. To run a task on a different node instance type, you must launch it in an external Kubernetes cluster. If you need assistance launching KubernetesPodOperator tasks in external Kubernetes clusters, contact Astronomer support.

Prerequisites

Configure the KubernetesPodOperator in the Cloud UI

While you still need to configure the KubernetesPodOperator in your DAG code to define your task environment, you can set some safeguards on Astro so that tasks in your Deployment don't request more CPU or memory than expected. Set safeguards by configuring default Pod limits and requests from the Cloud UI. If a task requests more CPU or memory than is currently allowed in your configuration, the task fails.

  1. In the Cloud UI, select a Deployment.

  2. Click Resource quotas.

  3. Configure the following values:

    • CPU quota: The maximum amount of CPU for all currently running Pods on your Deployment.
    • Memory Quota: The maximum amount of memory for all currently running Pods on your Deployment.

Your CPU quota and memory quota determine your Max Pod Size, which is the maximum amount of resources that a task can request for its Pod. If the CPU and memory quotas you specify exceed the limits of Astro's infrastructure, your Max Pod Size is determined by the size of the Astro-hosted infrastructure running your tasks.

The Cloud UI also shows the Default CPU and Default Memory for your default Pod. If you don't configure CPU or memory for a task in your DAG code, the task runs in the default Pod with these default resources.

Set up the KubernetesPodOperator

To use the KubernetesPodOperator in a DAG, add the following import statements and instantiation to your DAG file:

from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator


namespace = conf.get("kubernetes", "NAMESPACE")

KubernetesPodOperator(
    namespace=namespace,
    image="<your-docker-image>",
    cmds=["<commands-for-image>"],
    arguments=["<arguments-for-image>"],
    labels={"<pod-label>": "<label-name>"},
    name="<pod-name>",
    task_id="<task-name>",
    get_logs=True,
)

For each instantiation of the KubernetesPodOperator, you must specify the following values:

  • namespace = conf.get("kubernetes", "NAMESPACE"): Every Deployment runs on its own Kubernetes namespace within a cluster. Information about this namespace can be programmatically imported as long as you set this variable.
  • image: This is the Docker image that the operator uses to run its defined task, commands, and arguments. The value you specify is assumed to be an image tag that's publicly available on Docker Hub. To pull an image from a private registry, see Run images from a private registry. For a filled-in example, see the sketch after this list.
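
The following is a minimal sketch of the same template with the placeholders filled in. The image, command, label, and task names here are illustrative assumptions, not required values:

from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# The Deployment's namespace is read from the Airflow configuration.
namespace = conf.get("kubernetes", "NAMESPACE")

# Illustrative values only: run a single Python command in a public Docker Hub image.
say_hello = KubernetesPodOperator(
    namespace=namespace,
    image="python:3.11-slim",
    cmds=["python", "-c"],
    arguments=["print('Hello from an isolated Pod')"],
    labels={"purpose": "example"},
    name="say-hello-pod",
    task_id="say_hello",
    get_logs=True,
)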

This is the minimum configuration required to run tasks with the KubernetesPodOperator on Astro. To further customize the way your tasks are run, see the topics below.

Configure task-level Pod resources

Astro automatically allocates resources to Pods created by the KubernetesPodOperator. Resources used by the KubernetesPodOperator are not technically limited, which means that the operator could theoretically use any CPU and memory that's available in your Deployment to complete a task. Because of this, Astronomer recommends specifying compute resource requests and limits for each task.

To do so, define a kubernetes.client.models.V1ResourceRequirements object and provide that to the container_resources argument of the KubernetesPodOperator. For example:

from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

compute_resources = k8s.V1ResourceRequirements(
    limits={"cpu": "800m", "memory": "3Gi"},
    requests={"cpu": "800m", "memory": "3Gi"},
)

namespace = conf.get("kubernetes", "NAMESPACE")

KubernetesPodOperator(
    namespace=namespace,
    image="<your-docker-image>",
    cmds=["<commands-for-image>"],
    arguments=["<arguments-for-image>"],
    labels={"<pod-label>": "<label-name>"},
    name="<pod-name>",
    container_resources=compute_resources,
    task_id="<task-name>",
    get_logs=True,
)

Applying the previous code example ensures that when this DAG runs, it launches a Kubernetes Pod with exactly 800m of CPU and 3Gi of memory as long as that infrastructure is available in your Deployment. After the task finishes, the Pod will terminate gracefully.
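
Requests and limits don't have to match. As a sketch with illustrative values rather than recommendations, you can request a smaller guaranteed amount of resources while allowing the container to burst up to a higher limit, and pass the object to container_resources in the same way:

from kubernetes.client import models as k8s

# Illustrative values only: the Pod is scheduled based on the requests,
# but the container may use up to the limits if the node has capacity.
burstable_resources = k8s.V1ResourceRequirements(
    requests={"cpu": "500m", "memory": "1Gi"},
    limits={"cpu": "1", "memory": "2Gi"},
)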

Mount a temporary directory

Alternative Astro Hybrid setup

On Astro Hybrid, this configuration works only on AWS clusters where you have enabled m5d and m6id worker types. These worker types have NVMe SSD volumes that can be used by tasks for ephemeral storage. See Amazon EC2 M6i Instances and Amazon EC2 M5 Instances for the amount of available storage in each node type.

The task that mounts a temporary directory must run on a worker queue that uses either m5d or m6id worker types. See Modify a cluster for instructions on enabling m5d and m6id workers on your cluster. See Configure a worker queue to configure a worker queue to use one of these worker types.

To run a KubernetesPodOperator task that uses your Deployment's ephemeral storage, mount an emptyDir volume to the KubernetesPodOperator. For example:

from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

volume = k8s.V1Volume(
    name="cache-volume",
    empty_dir=k8s.V1EmptyDirVolumeSource(),
)

volume_mounts = [
    k8s.V1VolumeMount(mount_path="/cache", name="cache-volume")
]

namespace = conf.get("kubernetes", "NAMESPACE")

example_volume_test = KubernetesPodOperator(
    namespace=namespace,
    image="<your-docker-image>",
    cmds=["<commands-for-image>"],
    arguments=["<arguments-for-image>"],
    labels={"<pod-label>": "<label-name>"},
    name="<pod-name>",
    task_id="<task-name>",
    get_logs=True,
    volume_mounts=volume_mounts,
    volumes=[volume],
)

Run images from a private registry

By default, the KubernetesPodOperator expects to pull a Docker image that's hosted publicly on Docker Hub. If you want to execute a Docker image that's hosted in a private registry, you need to create a Kubernetes Secret and then specify the Kubernetes Secret in your DAG. If your Docker image is hosted in an Amazon Elastic Container Registry (ECR) repository, see Docker images hosted in private Amazon ECR repositories.

Prerequisites

Step 1: Create a Kubernetes Secret

To run Docker images from a private registry on Astro, you must create a Kubernetes Secret that contains credentials to your registry. Injecting this Secret into your Deployment's namespace gives your tasks access to Docker images in your private registry.

  1. Log in to your Docker registry and follow the Kubernetes documentation to produce a ~/.docker/config.json file.
  2. In the Cloud UI, select a Workspace and then select the Deployment you want to use the KubernetesPodOperator with.
  3. Copy the value in the NAMESPACE field.
  4. Contact Astronomer support and provide the namespace of the Deployment.

Astronomer support will give you instructions on how to securely send the output of your ~/.docker/config.json file. Do not send this file by email, as it contains sensitive credentials to your registry. Astronomer uses this file to create a Kubernetes secret and inject it into your Deployment's namespace.

Step 2: Specify the Kubernetes Secret in your DAG

Once Astronomer has added the Kubernetes secret to your Deployment, you will be notified and provided with the name of the secret.

After you receive the name of your Kubernetes secret from Astronomer, you can run images from your private registry by importing models from kubernetes.client and configuring image_pull_secrets in your KubernetesPodOperator instantiation:

from kubernetes.client import models as k8s

KubernetesPodOperator(
    namespace=namespace,
    image_pull_secrets=[k8s.V1LocalObjectReference("<your-secret-name>")],
    image="<your-docker-image>",
    cmds=["<commands-for-image>"],
    arguments=["<arguments-for-image>"],
    labels={"<pod-label>": "<label-name>"},
    name="<pod-name>",
    task_id="<task-name>",
    get_logs=True,
)

Docker images hosted in private Amazon ECR repositories

info

This setup is available only on Astro Hybrid.

If your Docker image is hosted in an Amazon ECR repository, add a permissions policy to the repository to allow the KubernetesPodOperator to pull the Docker image. You don't need to create a Kubernetes secret or specify the Kubernetes secret in your DAG. Docker images hosted on Amazon ECR repositories can only be pulled from AWS clusters.

  1. Log in to the Amazon ECR Dashboard and then select Menu > Repositories.

  2. Click the Private tab and then click the name of the repository that hosts the Docker image.

  3. Click Permissions in the left menu.

  4. Click Edit policy JSON.

  5. Copy and paste the following policy into the Edit JSON pane:

    {
      "Version": "2008-10-17",
      "Statement": [
        {
          "Sid": "AllowImagePullAstro",
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::<AstroAccountID>:root"
          },
          "Action": [
            "ecr:GetDownloadUrlForLayer",
            "ecr:BatchGetImage"
          ]
        }
      ]
    }

    Replace <AstroAccountID> with your Astro AWS account ID.

  6. Click Save to create a new permissions policy named AllowImagePullAstro.

  7. Set up the KubernetesPodOperator.

  8. Replace <your-docker-image> in the instantiation of the KubernetesPodOperator with the Amazon ECR repository URI that hosts the Docker image, as sketched in the example after this list. To locate the URI:

    • In the Amazon ECR Dashboard, click Repositories in the left menu.
    • Click the Private tab and then copy the URI of the repository that hosts the Docker image.
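
As a sketch, the image value follows the standard Amazon ECR URI format. The account ID, region, repository, and tag below are placeholders for your own values, and the other arguments are unchanged from the setup template:

# Sketch only: substitute the URI of the private repository that hosts your image.
KubernetesPodOperator(
    namespace=namespace,
    image="<aws-account-id>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>",
    cmds=["<commands-for-image>"],
    arguments=["<arguments-for-image>"],
    labels={"<pod-label>": "<label-name>"},
    name="<pod-name>",
    task_id="<task-name>",
    get_logs=True,
)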

Use secret environment variables with the KubernetesPodOperator

Astro environment variables marked as secrets are stored in a Kubernetes secret called env-secrets. To use a secret value in a task run with the KubernetesPodOperator, pull the value from env-secrets and mount it to the Pod running your task as a new Kubernetes Secret.

  1. Add the following import to your DAG file:

    from airflow.kubernetes.secret import Secret
  2. Define a Kubernetes Secret in your DAG instantiation using the following format:

    secret_env = Secret(deploy_type="env", deploy_target="<VARIABLE_KEY>", secret="env-secrets", key="<VARIABLE_KEY>")
    namespace = conf.get("kubernetes", "NAMESPACE")
  3. Reference the key for the environment variable, formatted as $<VARIABLE_KEY>, in the task that uses the KubernetesPodOperator.

In the following example, a secret named MY_SECRET is pulled from env-secrets and printed to logs.

import pendulum
from airflow.kubernetes.secret import Secret

from airflow.models import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.configuration import conf

with DAG(
    dag_id='test-kube-pod-secret',
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    end_date=pendulum.datetime(2022, 1, 5, tz="UTC"),
    schedule_interval="@once",
) as dag:

    secret_env = Secret(deploy_type="env", deploy_target="MY_SECRET", secret="env-secrets", key="MY_SECRET")

    namespace = conf.get("kubernetes", "NAMESPACE")

    k = KubernetesPodOperator(
        namespace=namespace,
        image="ubuntu:16.04",
        cmds=["bash", "-cx"],
        arguments=["echo $MY_SECRET && sleep 150"],
        name="test-name",
        task_id="test-task",
        get_logs=True,
        secrets=[secret_env],
    )
