Run Snowpark queries with the ExternalPythonOperator in Apache Airflow
It is common to need to run a task with different dependencies than the rest of your Airflow environment. Your task might need a different Python version than core Airflow, or packages that conflict with those of your other tasks. In these cases, running tasks in an isolated environment can help manage dependency conflicts and enable compatibility with your execution environments.
In this tutorial, you'll learn how to use the ExternalPythonOperator to run a task that leverages the Snowpark API for data transformations. Snowpark allows you to run queries and transformations on your data using different programming languages, making it a flexible addition to traditional Snowflake operators.
Snowpark requires Python 3.8, while Astro Runtime uses Python 3.9. The ExternalPythonOperator runs your Snowpark query in a Python 3.8 virtual environment, allowing you to use a different Python version than Airflow. You can follow the same general steps for any use case that runs a task in a reusable Python virtual environment.
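To preview the pattern before diving in, here is a minimal, hypothetical sketch of a task bound to a pre-built virtual environment with the @task.external_python decorator. The interpreter path and task name are assumptions; you'll build a complete, working DAG in Step 4.

from airflow.decorators import task

@task.external_python(
    task_id="run_in_venv",
    # Hypothetical path to the Python binary of a pre-built virtual environment
    python="/home/astro/.pyenv/versions/snowpark_env/bin/python",
)
def run_in_venv():
    # This function executes in the external interpreter, so any imports
    # must be installed in that environment rather than in core Airflow.
    import sys
    print(sys.version)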
Time to complete
This tutorial takes approximately one hour to complete.
Assumed knowledge
To get the most out of this tutorial, you should be familiar with:
- Airflow operators. See Operators 101.
- Python virtual environments. See Python Virtual Environments: A Primer.
Prerequisites
- The Astro CLI.
- A Snowflake Enterprise account. If you don't already have an account, Snowflake offers a free 30-day trial.
- An external secrets manager of your choice (optional). This tutorial uses AWS Parameter Store.
Step 1: Set up your data stores
For this example, you will need data in a Snowflake table to query using Snowpark. To create a table, run the following queries in Snowflake:
Create a table:
CREATE TABLE dog_intelligence (
BREED varchar(50),
HEIGHT_LOW_INCHES INT,
HEIGHT_HIGH_INCHES INT,
WEIGHT_LOW_LBS INT,
WEIGHT_HIGH_LBS INT,
REPS_LOWER INT,
REPS_UPPER INT
);

Populate the table:
INSERT INTO dog_intelligence
VALUES
('Akita', 26, 28, 80, 120, 1, 4),
('Great Dane', 32, 32, 120, 160, 1, 4),
('Weimaraner', 25, 27, 70, 85, 16, 25),
('Vizsla', 48, 66, 22, 25, 26, 40)
;
Step 2: Create a connection to Snowflake in your secrets manager (optional)
Your Python virtual environment needs access to Snowflake through a connection. If you are using an external secrets manager, add a new secret called /airflow/connections/snowflake with your connection information in the following JSON format:
{
"conn_type": "snowflake",
"login": "your-login",
"password": "your-password",
"schema": "your-schema",
"extra": {
"account": "<your-account>",
"database": "<your-database>",
"region": "<your-region>",
"warehouse": "<your-warehouse>",
"role": "<your-role>"
}
}
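If you'd rather create this secret programmatically than through the AWS console, a minimal sketch using boto3 might look like the following. The region and the placeholder values are assumptions; adapt them to your account.

import json
import boto3

# Assumed region; use the region where you want to store the parameter
ssm = boto3.client("ssm", region_name="us-east-1")

connection = {
    "conn_type": "snowflake",
    "login": "your-login",
    "password": "your-password",
    "schema": "your-schema",
    "extra": {
        "account": "<your-account>",
        "database": "<your-database>",
        "region": "<your-region>",
        "warehouse": "<your-warehouse>",
        "role": "<your-role>",
    },
}

# Store the connection as an encrypted parameter so the DAG in Step 4 can retrieve it
ssm.put_parameter(
    Name="/airflow/connections/snowflake",
    Value=json.dumps(connection),
    Type="SecureString",
    Overwrite=True,
)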
If you are not using an external secrets manager, you can skip this step.
Step 3: Configure your Astro project
Now that you have your Snowflake resources configured, you can set up Airflow.
Create a new Astro project:
$ mkdir astro-snowpark-tutorial && cd astro-snowpark-tutorial
$ astro dev init

Add a new file to the root folder of your project called snowpark_requirements.txt and add the following text:

snowflake-snowpark-python[pandas]
boto3

The packages in this file will be installed in your virtual environment. The snowflake-snowpark-python package is required to run Snowpark queries. The boto3 package is used to interact with AWS Parameter Store to retrieve credentials. If you are using a different secrets manager or are managing secrets locally, you can update or remove this line.

Update the Dockerfile of your Astro project to install pyenv and its requirements:

FROM quay.io/astronomer/astro-runtime:6.0.3
##### Docker Customizations below this line #####
## The directory where `pyenv` will be installed. You can update the path as needed
ENV PYENV_ROOT="/home/astro/.pyenv"
ENV PATH=${PYENV_ROOT}/bin:${PATH}
## If you want to check your dependency conflicts for extra packages that you may require for your
## venv, uncomment the following two lines to install pip-tools and compile your requirements
# RUN pip install pip-tools
# RUN pip-compile snowpark_requirements.txt
## Install the required version of pyenv and create the virtual environment
RUN curl https://pyenv.run | bash && \
eval "$(pyenv init -)" && \
pyenv install 3.8.14 && \
pyenv virtualenv 3.8.14 snowpark_env && \
pyenv activate snowpark_env && \
pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r snowpark_requirements.txt

These commands install pyenv in your Airflow environment and create a Python 3.8 virtual environment called snowpark_env with the required packages to run Snowpark. The pyenv environment is initialized when you start your Airflow project and can be used by any ExternalPythonOperator tasks. If you use a different virtual environment package (such as venv or conda), you might need to update this configuration.

(Optional) If you set up a Snowflake connection with a secrets manager as described in Step 2, add a new file to the root folder of your project called secrets-manager.env and add environment variables that can be used to connect to your secrets manager. For example, if you use AWS Parameter Store, you might add the following:

AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-secret-access-key>

Add the secrets-manager.env file to your project's .gitignore file so sensitive credentials aren't tracked in git, and update the last two lines of your Dockerfile to the following:

pip install --no-cache-dir -r snowpark_requirements.txt && \
source secrets-manager.env

If you are not using an external secrets manager, you can skip this step.

Note: There are many ways to connect your virtual environment to your secrets manager, including passing a profile with a shared credentials file or having your environment assume a role that has access to your secrets manager. The access key and secret method described here is the most straightforward when working with a local project, but might not work for production in some organizations.
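For example, the profile-based approach mentioned above might look like the following hypothetical sketch inside your task, assuming a profile named airflow-local exists in a shared AWS credentials file that is available inside your Airflow containers:

import json
import boto3

# Hypothetical named profile defined in a shared AWS credentials file;
# no access keys need to be stored in the project itself
session = boto3.Session(profile_name="airflow-local", region_name="us-east-1")
ssm = session.client("ssm")

parameter = ssm.get_parameter(Name="/airflow/connections/snowflake", WithDecryption=True)
conn = json.loads(parameter["Parameter"]["Value"])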
Add the following to your packages.txt file:

git
make
build-essential
libssl-dev
zlib1g-dev
libbz2-dev
libreadline-dev
libsqlite3-dev
wget
curl
llvm
libncurses5-dev
libncursesw5-dev
xz-utils
tk-dev
libpq-dev
krb5-config

This installs all the packages needed to run pyenv in your Airflow environment.

Run the following command to start your project in a local environment:
astro dev start
Note: The build of this project's Dockerfile can take up to 20 minutes due to the pyenv and Python 3.8 installation. If you are an Astronomer customer and will be deploying this project to Astro, you can use a DAG-only deploy after the initial deployment to avoid rebuilding the Dockerfile when making changes to DAGs in the project.
Step 4: Create your DAG
In your Astro project dags folder, create a new file called external-python-pipeline.py. Paste the following code into the file:
from __future__ import annotations
import logging
import os
import sys
import tempfile
import time
import shutil
from pprint import pprint
import pendulum
from airflow import DAG
from airflow.decorators import task
log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()
with DAG(
    dag_id='py_virtual_env',
    schedule=None,
    start_date=pendulum.datetime(2022, 10, 10, tz="UTC"),
    catchup=False,
    tags=['pythonvirtualenv']
):

    @task(task_id='print_the_context')
    def print_context(ds=None, **kwargs):
        """Print the Airflow context and ds variable from the context."""
        print(kwargs)
        print(ds)
        return 'Whatever you return gets printed in the logs'

    @task.external_python(
        task_id='external_python',
        python='/home/astro/.pyenv/versions/snowpark_env/bin/python'
    )
    def callable_external_python():
        from time import sleep
        import pkg_resources
        from snowflake.snowpark import Session
        import boto3
        import json

        ## Checking for the correct venv packages - this is useful for debugging
        installed_packages = pkg_resources.working_set
        installed_packages_list = sorted(
            ['%s==%s' % (i.key, i.version) for i in installed_packages]
        )
        print(installed_packages_list)

        # Retrieving connection information from the external secrets manager
        ssm = boto3.client('ssm', region_name='us-east-1')
        parameter = ssm.get_parameter(Name='/airflow/connections/snowflake', WithDecryption=True)
        conn = json.loads(parameter['Parameter']['Value'])

        # Defining parameters for the Snowpark connection to Snowflake
        connection_parameters = {
            "account": conn['extra']['account'],
            "user": conn['login'],
            "password": conn['password'],
            "role": conn['extra']['role'],
            "warehouse": conn['extra']['warehouse'],
            "database": conn['extra']['database'],
            "schema": conn['schema'],
            "region": conn['extra']['region']
        }

        # Creating a Snowpark session with the connection parameters
        session = Session.builder.configs(connection_parameters).create()

        # Running a SQL query in Snowpark
        df = session.sql('select avg(reps_upper), avg(reps_lower) from dog_intelligence;')
        print(df.collect())

        # Closing the session
        session.close()

    task_print = print_context()
    task_external_python = callable_external_python()

    task_print >> task_external_python
This DAG prints the context of your Airflow environment before using the @task.external_python
decorator to run a Snowpark query in the virtual environment you created in Step 3. The ExternalPythonOperator task also prints a list of packages installed in the virtual environment, which can be helpful for debugging.
This example pulls Snowflake connection information from AWS Parameter Store. If you are using a different secrets manager, you will need to update the following lines:
import boto3
import json
ssm = boto3.client('ssm', region_name='us-east-1')
parameter = ssm.get_parameter(Name='/airflow/connections/snowflake', WithDecryption=True)
conn = json.loads(parameter['Parameter']['Value'])
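For example, if you stored the same JSON in AWS Secrets Manager instead of Parameter Store, a hypothetical replacement for those lines might look like this (the secret name and region are assumptions):

import json
import boto3

# Hypothetical secret name in AWS Secrets Manager
secrets_client = boto3.client("secretsmanager", region_name="us-east-1")
response = secrets_client.get_secret_value(SecretId="airflow/connections/snowflake")
conn = json.loads(response["SecretString"])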
To run the DAG without an external secrets manager, simply provide your connection information directly in the connection_parameters
dictionary (note that this is not best practice as sensitive information will be stored in your DAG file).
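If you'd rather keep credentials out of the DAG file without a secrets manager, one possible middle ground is to read them from environment variables inside the task, as in this sketch. The variable names are assumptions; you would need to define them in your Airflow environment.

import os

# Hypothetical environment variable names; define these in your Airflow environment
connection_parameters = {
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "user": os.environ["SNOWFLAKE_USER"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    "role": os.environ["SNOWFLAKE_ROLE"],
    "warehouse": os.environ["SNOWFLAKE_WAREHOUSE"],
    "database": os.environ["SNOWFLAKE_DATABASE"],
    "schema": os.environ["SNOWFLAKE_SCHEMA"],
    "region": os.environ["SNOWFLAKE_REGION"],
}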
Step 5: Run your DAG to execute your Snowpark query in a virtual environment
Go to the Airflow UI, unpause your py_virtual_env
DAG, and trigger it to run your Snowpark query in an isolated Python virtual environment. Open your task logs to see the list of installed packages and the results of your query:
[2022-11-17, 18:55:56 UTC] {process_utils.py:179} INFO - Executing cmd: /home/astro/.pyenv/versions/snowpark_env/bin/python /tmp/tmd_3cp_1al/script.py /tmp/tmd_3cp_1al/script.in /tmp/tmd_3cp_1al/script.out /tmp/tmd_3cp_1al/string_args.txt
[2022-11-17, 18:55:56 UTC] {process_utils.py:183} INFO - Output:
[2022-11-17, 18:56:02 UTC] {process_utils.py:187} INFO - ['asn1crypto==1.5.1', 'boto3==1.26.10', 'botocore==1.29.10', 'certifi==2022.9.24', 'cffi==1.15.1', 'charset-normalizer==2.1.1', 'cloudpickle==2.0.0', 'cryptography==38.0.3', 'filelock==3.8.0', 'idna==3.4', 'jmespath==1.0.1', 'numpy==1.23.4', 'oscrypto==1.3.0', 'pandas==1.5.1', 'pip==22.3.1', 'pyarrow==8.0.0', 'pycparser==2.21', 'pycryptodomex==3.15.0', 'pyjwt==2.6.0', 'pyopenssl==22.1.0', 'python-dateutil==2.8.2', 'pytz==2022.6', 'requests==2.28.1', 's3transfer==0.6.0', 'setuptools==56.0.0', 'six==1.16.0', 'snowflake-connector-python==2.8.1', 'snowflake-snowpark-python==1.0.0', 'typing-extensions==4.4.0', 'urllib3==1.26.12', 'wheel==0.38.4']
[2022-11-17, 18:56:02 UTC] {process_utils.py:187} INFO - <snowflake.snowpark.dataframe.DataFrame object at 0x7f69710e1d60>
[2022-11-17, 18:56:02 UTC] {process_utils.py:187} INFO - [Row(AVG(REPS_UPPER)=Decimal('41.507353'), AVG(REPS_LOWER)=Decimal('25.588235'))]
[2022-11-17, 18:56:02 UTC] {python.py:177} INFO - Done. Returned value was: None
[2022-11-17, 18:56:02 UTC] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=py_virtual_env, task_id=external_python, execution_date=20221117T185554, start_date=20221117T185555, end_date=20221117T185602
Other methods for running tasks in isolated environments
Airflow has several other options for running tasks in isolated environments:
- The KubernetesPodOperator. This operator is ideal for users who are running Airflow on Kubernetes and want more control over the resources and infrastructure used to run the task in addition to package management. Downsides include more complex setup and higher task latency.
- The PythonVirtualenvOperator. This operator works similarly to the ExternalPythonOperator, but it creates and destroys a new virtual environment for each task. This operator is ideal if you don't want to persist your virtual environment. Downsides include higher task latency, since the environment must be created each time the task runs. A sketch of this approach follows below.
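As a hypothetical sketch of the PythonVirtualenvOperator approach using the @task.virtualenv decorator, and assuming a Python 3.8 interpreter is available to the Airflow worker, the Snowpark dependency could be declared per task like this:

from airflow.decorators import task

@task.virtualenv(
    task_id="snowpark_in_throwaway_venv",
    # The environment is created fresh for every run of this task
    requirements=["snowflake-snowpark-python[pandas]"],
    python_version="3.8",
    system_site_packages=False,
)
def snowpark_in_throwaway_venv():
    # Imports must resolve inside the newly created virtual environment
    from snowflake.snowpark import Session
    print("Snowpark is importable in this temporary environment")

The trade-off is that dependency resolution and installation happen on every run, which is what drives the higher task latency mentioned above.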