Test and Troubleshoot the KubernetesPodOperator Locally

The KubernetesPodOperator is an Airflow operator that completes tasks in Kubernetes Pods. The KubernetesPodOperator provides an isolated, containerized execution environment for each task and lets you run custom Docker images and Python versions, set task-level resource requests, and more.

On Astro, the Kubernetes infrastructure required to run the KubernetesPodOperator is built in. To test the KubernetesPodOperator locally, you need a local Kubernetes environment.

Step 1: Set up Kubernetes

The latest versions of Docker for Windows and Mac let you run a single-node Kubernetes cluster locally. If you are using Windows, see Setting Up Docker for Windows and WSL to Work Flawlessly. If you are using Mac, see Docker Desktop for Mac user manual. You don't need to install Docker Compose.

  1. Open Docker and go to Settings > Kubernetes.

  2. Select the Enable Kubernetes checkbox.

  3. Click Apply and Restart.

  4. Click Install in the Kubernetes Cluster Installation dialog.

    Docker restarts and the status indicator changes to green to indicate Kubernetes is running.

Step 2: Update the kubeconfig file

  1. Go to the $HOME/.kube directory that was created when you enabled Kubernetes in Docker and copy the config file into the /include/.kube/ folder in your Astro project. The config file contains all the information the KubernetesPodOperator uses to connect to your cluster. For example:

    clusters:
    - cluster:
        certificate-authority-data: <certificate-authority-data>
        server: https://kubernetes.docker.internal:6443/
      name: docker-desktop
    contexts:
    - context:
        cluster: docker-desktop
        user: docker-desktop
      name: docker-desktop
    current-context: docker-desktop
    kind: Config
    preferences: {}
    users:
    - name: docker-desktop
      user:
        client-certificate-data: <client-certificate-data>
        client-key-data: <client-key-data>

    The cluster name should appear as docker-desktop in your local $HOME/.kube/config file. Do not add any additional data to the config file.

  2. Update the <certificate-authority-data>, <client-certificate-data>, and <client-key-data> values in the config file with the values for your organization.

  3. Under cluster, change server: https://localhost:6445 to server: https://kubernetes.docker.internal:6443 so that the Airflow containers can reach the Kubernetes cluster running on your local machine. If this doesn't work, try server: https://host.docker.internal:6445.

  4. Optional. Add the .kube folder to .gitignore if your Astro project is hosted in a GitHub repository and you want to prevent the file from being tracked by your version control tool.

  5. Optional. Add the .kube folder to .dockerignore to exclude it from the Docker image.
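
The server change in step 3 can also be scripted. The following is a minimal sketch, not part of the Astro CLI: the function name set_cluster_server is hypothetical, and it assumes the kubeconfig has the simple single-cluster layout shown above, so a line-based substitution is enough and no YAML library is needed.

```python
import re


def set_cluster_server(kubeconfig_text: str, new_server: str) -> str:
    """Replace the value of every `server:` key in kubeconfig text.

    Hypothetical helper: works line by line and keeps the original
    indentation, so the rest of the file is left untouched.
    """
    pattern = re.compile(r"^([ \t]*server:[ \t]*)\S+[ \t]*$", flags=re.MULTILINE)
    # A function replacement avoids any escaping issues in the new URL.
    return pattern.sub(lambda match: match.group(1) + new_server, kubeconfig_text)


if __name__ == "__main__":
    # Sample text standing in for the copied include/.kube/config file.
    sample = (
        "clusters:\n"
        "- cluster:\n"
        "    server: https://localhost:6445\n"
        "  name: docker-desktop\n"
    )
    print(set_cluster_server(sample, "https://kubernetes.docker.internal:6443"))
```

To apply this to a real file, you would read the copied config from your project's include/.kube folder, pass its contents through the function, and write the result back. Review the output before overwriting anything.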

Step 3: Run your container

To use the KubernetesPodOperator, you must define the configuration of each task and the Kubernetes Pod in which it runs, including its namespace and Docker image.

This example DAG runs a hello-world Docker image. The namespace is determined dynamically based on whether you're running the DAG in your local environment or on Astro. If you are using Linux, the cluster_context is microk8s. The config_file points to the edited /include/.kube/config file.

Once you've updated the definition of KubernetesPodOperator tasks in your Astro project, run astro dev start with the Astro CLI to test your DAGs in a local Airflow environment.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

namespace = conf.get('kubernetes', 'NAMESPACE')

# This will detect the default namespace locally and read the
# environment namespace when deployed to Astronomer.
if namespace == 'default':
    config_file = '/usr/local/airflow/include/.kube/config'
    in_cluster = False
else:
    in_cluster = True
    config_file = None

dag = DAG('example_kubernetes_pod', schedule_interval='@once', default_args=default_args)


with dag:
    KubernetesPodOperator(
        namespace=namespace,
        image="hello-world",
        labels={"<pod-label>": "<label-name>"},
        name="airflow-test-pod",
        task_id="task-one",
        in_cluster=in_cluster,  # if set to true, will look in the cluster, if false, looks for file
        cluster_context="docker-desktop",  # is ignored when in_cluster is set to True
        config_file=config_file,
        is_delete_operator_pod=True,
        get_logs=True,
    )
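
If several tasks share the same local-versus-Astro branching, it can be factored into one helper. The sketch below is an illustration under stated assumptions: pod_connection_kwargs and LOCAL_KUBECONFIG are hypothetical names, the 'default' namespace check mirrors the DAG above, and the local_context parameter exists so that Linux users can pass microk8s as described earlier.

```python
# Path used by the example DAG for the copied kubeconfig file.
LOCAL_KUBECONFIG = "/usr/local/airflow/include/.kube/config"


def pod_connection_kwargs(namespace: str, local_context: str = "docker-desktop") -> dict:
    """Return connection-related KubernetesPodOperator arguments.

    Hypothetical helper: treats the 'default' namespace as a local run,
    exactly like the branching in the example DAG.
    """
    if namespace == "default":
        # Local run: connect from outside the cluster with the kubeconfig file.
        return {
            "namespace": namespace,
            "in_cluster": False,
            "config_file": LOCAL_KUBECONFIG,
            "cluster_context": local_context,
        }
    # Deployed run: use the in-cluster configuration. The context is left
    # unset because it is ignored when in_cluster is True.
    return {
        "namespace": namespace,
        "in_cluster": True,
        "config_file": None,
        "cluster_context": None,
    }


if __name__ == "__main__":
    print(pod_connection_kwargs("default"))
    # In a DAG, the result could be unpacked into the operator, for example:
    # KubernetesPodOperator(task_id="task-one", name="airflow-test-pod",
    #                       image="hello-world", **pod_connection_kwargs(namespace))
```

Keeping the decision in one function means a change to the kubeconfig path or context name only has to be made once.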

Step 4: View Kubernetes logs

Optional. Use the kubectl command line tool to review the logs of any Pods created by the operator. The logs can help you identify issues and troubleshoot failed tasks. If you haven't installed the kubectl command line tool, see Install Tools.

Run kubectl get pods -n $namespace to list the Pods in the namespace, then run kubectl logs {pod_name} -n $namespace to examine the logs for the Pod that just ran. By default, Docker Desktop runs Pods in the default namespace.
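
The same two commands can be run from a script when you check Pods repeatedly. This is a minimal sketch, assuming kubectl is installed and on your PATH; the helper names get_pods_command and pod_logs_command are hypothetical, and they only assemble the argument lists for the commands shown above.

```python
import shutil
import subprocess


def get_pods_command(namespace: str = "default") -> list:
    """Build the argument list for `kubectl get pods -n <namespace>`."""
    return ["kubectl", "get", "pods", "-n", namespace]


def pod_logs_command(pod_name: str, namespace: str = "default") -> list:
    """Build the argument list for `kubectl logs <pod_name> -n <namespace>`."""
    return ["kubectl", "logs", pod_name, "-n", namespace]


if __name__ == "__main__":
    # Assumption: kubectl is available; otherwise skip instead of failing.
    if shutil.which("kubectl") is None:
        print("kubectl not found on PATH")
    else:
        # List the Pods in the default namespace; check=False keeps the
        # script from raising if the cluster is unreachable.
        subprocess.run(get_pods_command(), check=False)
```

Note that the example DAG sets is_delete_operator_pod=True, so the Pod may already have been removed by the time you look for it. If you need to inspect it afterward, temporarily setting that argument to False is one option.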

Next steps