Run Snowpark queries with the ExternalPythonOperator in Apache Airflow
It is common for a task to need different dependencies than your Airflow environment. Your task might need a different Python version than core Airflow, or it might use packages that conflict with those of your other tasks. In these cases, running tasks in an isolated environment can help you manage dependency conflicts and stay compatible with your execution environments.
In this tutorial, you'll learn how to use the ExternalPythonOperator to run a task that leverages the Snowpark API for data transformations. Snowpark allows you to run queries and transformations on your data using different programming languages, making it a flexible addition to traditional Snowflake operators.
Snowpark requires Python 3.8, while the Astro Runtime uses Python 3.9. The ExternalPythonOperator can run your Snowpark query in a Python 3.8 virtual environment, allowing your task to use a different Python version than the rest of the Airflow environment. You can follow the same general steps for any use case that requires running a task in a reusable Python virtual environment.
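As a quick illustration of the pattern this tutorial builds toward, the @task.external_python decorator (the TaskFlow API version of the ExternalPythonOperator) points a task at an existing Python interpreter. The following is a minimal sketch with a hypothetical interpreter path; later in this tutorial, the path comes from an environment variable instead:
from airflow.decorators import task

@task.external_python(python="/usr/local/airflow/.venv/example/bin/python")  # hypothetical path
def run_in_isolated_env():
    # This function executes inside the external interpreter, so import
    # environment-specific packages here rather than at the top of the DAG file.
    import sys

    print(sys.version)
Like any TaskFlow task, the decorated function is then called inside a DAG to create the task.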
Time to complete
This tutorial takes approximately one hour to complete.
Assumed knowledge
To get the most out of this tutorial, you should be familiar with:
- Airflow operators. See Operators 101.
- Python virtual environments. See Python Virtual Environments: A Primer.
Prerequisites
- The Astro CLI.
- A Snowflake Enterprise account. If you don't already have one, Snowflake offers a free 30-day trial.
Step 1: Set up your data stores
For this example, you will need data in a Snowflake table to query using Snowpark. To create and populate a table, run the following queries in Snowflake:
Create a table:
CREATE TABLE dog_intelligence (
BREED varchar(50),
HEIGHT_LOW_INCHES INT,
HEIGHT_HIGH_INCHES INT,
WEIGHT_LOW_LBS INT,
WEIGHT_HIGH_LBS INT,
REPS_LOWER INT,
REPS_UPPER INT
);
Populate the table:
INSERT INTO dog_intelligence
VALUES
('Akita', 26, 28, 80, 120, 1, 4),
('Great Dane', 32, 32, 120, 160, 1, 4),
('Weimaraner', 25, 27, 70, 85, 16, 25),
('Vizsla', 48, 66, 22, 25, 26, 40)
;
Step 2: Configure your Astro project
Now that you have your Snowflake resources configured, you can set up Airflow.
Create a new Astro project:
$ mkdir astro-snowpark-tutorial && cd astro-snowpark-tutorial
$ astro dev init
Add a new file to the root folder of your project called snowpark-requirements.txt and add the following text:
snowflake-snowpark-python[pandas]
apache-airflow
psycopg2-binary
apache-airflow-providers-snowflake
The packages in this file will be installed in your virtual environment. Your Airflow task requires the snowflake-snowpark-python package to run Snowpark queries in the virtual environment. The virtual environment uses the other packages to access the Snowflake connection you defined in Airflow. If you are using a different method of connecting to Snowflake, such as a secrets manager or managing secrets locally, you can update or remove these lines.
Add the following to your packages.txt file:
build-essential
Update the Dockerfile of your Astro project to install pyenv and its requirements using Astronomer's Docker BuildKit frontend for Airflow:
# syntax=quay.io/astronomer/airflow-extensions:v1
FROM quay.io/astronomer/astro-runtime:7.2.0-base
PYENV 3.8 snowpark snowpark-requirements.txt
These commands install pyenv in your Airflow environment and create a Python 3.8 virtual environment called snowpark with the required packages to run Snowpark. The pyenv environment is initialized when you start your Airflow project and can be used by any ExternalPythonOperator tasks.
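The Astro BuildKit makes the new environment available to your DAGs through an ASTRO_PYENV_snowpark environment variable that points to the environment's Python interpreter; this is how the DAG in Step 4 finds it. As a small sketch (assuming the Dockerfile above), any task in this project can opt into the snowpark environment like this:
import os

from airflow.decorators import task

@task.external_python(python=os.environ["ASTRO_PYENV_snowpark"])
def run_in_snowpark_env():
    # Imports here resolve against the Python 3.8 environment built from
    # snowpark-requirements.txt, not against core Airflow's environment.
    import snowflake.snowpark

    print(snowflake.snowpark)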
Creating a pyenv environment in your Airflow project requires installing multiple operating system level packages and running a series of Docker commands. The open source Astro BuildKit simplifies this process and allows you to create a Python virtual environment with only two lines of code.
Note: To modify this example for other use cases, you can update 3.8 in the Dockerfile to a different version of Python. There are some limitations when using a Python version greater than the version used by Airflow; for more details, see the project Readme.
Ensure Docker BuildKit is enabled. To enable BuildKit by default, you can update the parameter in Docker Desktop or modify your /etc/docker/daemon.json file with the following:
{
"features": {
"buildkit" : true
}
}
Run the following command to start your project in a local environment:
astro dev start
Info: The build of this project's Dockerfile can take up to 20 minutes due to the pyenv and Python 3.8 installation. If you are an Astronomer customer and will be deploying this project to Astro, you can use a dag-only deploy after the initial deployment to avoid rebuilding the Dockerfile when making changes to DAGs in the project.
Step 3: Create a connection to Snowflake
In the Airflow UI, go to Admin > Connections and click +.
Create a new connection named snowflake_default and choose the Snowflake connection type. Enter the following information:
- Schema: Your Snowflake schema.
- Login: Your Snowflake login username.
- Password: Your Snowflake password.
- Account: Your Snowflake account in the format xy12345.
- Database: Your Snowflake database.
- Region: Your Snowflake region, for example us-east-1.
The following example shows an Airflow Connection configuration in the Airflow UI.
Step 4: Create your DAG
In your Astro project dags folder, create a new file called external-python-pipeline.py. Paste the following code into the file:
from __future__ import annotations

import os
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task

with DAG(
    "py_virtual_env",
    schedule_interval=None,
    start_date=pendulum.datetime(2022, 10, 10, tz="UTC"),
    catchup=False,
    tags=["pythonvirtualenv"],
) as dag:

    @task(task_id="print_the_context")
    def print_context(ds=None, **kwargs):
        """Print the Airflow context and ds variable from the context."""
        pprint(kwargs)
        print(ds)
        return "Whatever you return gets printed in the logs"

    @task.external_python(
        task_id="external_python", python=os.environ["ASTRO_PYENV_snowpark"]
    )
    def callable_external_python():
        from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
        from snowflake.snowpark import Session

        hook = SnowflakeHook("snowflake_default")
        conn_params = hook._get_conn_params()
        session = Session.builder.configs(conn_params).create()

        query = """
            select avg(reps_upper), avg(reps_lower)
            from dog_intelligence;
        """

        df = session.sql(query)
        print(df)
        print(df.collect())
        session.close()

    task_print = print_context()
    task_external_python = callable_external_python()

    task_print >> task_external_python
This DAG prints the context of your Airflow environment before using the @task.external_python decorator to run a Snowpark query in the virtual environment you created in Step 2.
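If downstream tasks need the query results, one option is to return a serializable value from the external task so Airflow passes it along through XCom. The following is a sketch of a variant of callable_external_python; the task_id, function name, and dictionary keys are illustrative, and the task would be added inside the same with DAG block as above, reusing that file's os and task imports:
@task.external_python(
    task_id="external_python_with_xcom", python=os.environ["ASTRO_PYENV_snowpark"]
)
def callable_external_python_with_xcom():
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
    from snowflake.snowpark import Session

    hook = SnowflakeHook("snowflake_default")
    session = Session.builder.configs(hook._get_conn_params()).create()
    row = session.sql(
        "select avg(reps_upper), avg(reps_lower) from dog_intelligence"
    ).collect()[0]
    session.close()

    # Convert the Decimal aggregates to floats so the return value can be
    # serialized and pushed to XCom for downstream tasks.
    return {"avg_reps_upper": float(row[0]), "avg_reps_lower": float(row[1])}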
Step 5: Run your DAG to execute your Snowpark query in a virtual environment
Go to the Airflow UI, unpause your py_virtual_env DAG, and trigger it to run your Snowpark query in an isolated Python virtual environment. Open the task's logs to see the results of your query printed:
[2023-04-07, 17:10:50 UTC] {process_utils.py:187} INFO - <snowflake.snowpark.dataframe.DataFrame object at 0x7fb86f142a90>
[2023-04-07, 17:10:50 UTC] {process_utils.py:187} INFO - [2023-04-07 17:10:50,271] {cursor.py:738} INFO - query: [select avg(reps_upper), avg(reps_lower) from dog_intelligence;]
[2023-04-07, 17:10:50 UTC] {process_utils.py:187} INFO - [2023-04-07 17:10:50,814] {cursor.py:751} INFO - query execution done
[2023-04-07, 17:10:50 UTC] {process_utils.py:187} INFO - [2023-04-07 17:10:50,815] {cursor.py:890} INFO - Number of results in first chunk: 1
[2023-04-07, 17:10:50 UTC] {process_utils.py:187} INFO - [Row(AVG(REPS_UPPER)=Decimal('41.507353'), AVG(REPS_LOWER)=Decimal('25.588235'))]
[2023-04-07, 17:10:50 UTC] {process_utils.py:187} INFO - [2023-04-07 17:10:50,827] {session.py:373} INFO - Closing session: 114491144789286
Other methods for running tasks in isolated environments
Airflow has several other options for running tasks in isolated environments:
- The KubernetesPodOperator. This operator is ideal for users who are running Airflow on Kubernetes and want more control over the resources and infrastructure used to run the task in addition to package management. Downsides include more complex setup and higher task latency.
- The PythonVirtualenvOperator. This operator works similarly to the ExternalPythonOperator, but it creates and destroys a new virtual environment for each task. This operator is ideal if you don't want to persist your virtual environment. Downsides include higher task latency since the environment must be created each time the task is run.
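For comparison, here is a minimal sketch of the second option using the @task.virtualenv decorator (the TaskFlow form of the PythonVirtualenvOperator). The requirements list and function body are illustrative assumptions rather than part of this tutorial, and python_version="3.8" only works if a Python 3.8 binary is available in your Airflow image:
from airflow.decorators import task

@task.virtualenv(
    task_id="snowpark_in_temporary_venv",
    requirements=["snowflake-snowpark-python[pandas]"],  # installed fresh on every run
    python_version="3.8",  # assumes a python3.8 binary exists in the image
    system_site_packages=False,
)
def run_snowpark_in_temporary_venv():
    # The virtual environment is created before this function runs and removed
    # afterward, which adds latency compared to the reusable pyenv environment
    # used elsewhere in this tutorial.
    from snowflake.snowpark import Session

    print(Session)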