Skip to main content

The Astro Python SDK for ETL

The Astro Python SDK is an open source tool and Python package for DAG development that is built and maintained by Astronomer. The purpose of the SDK is to remove the complexity of writing DAGs in Apache Airflow, particularly in the context of Extract, Load, Transform (ELT) use cases. This enables pipeline authors to focus more on writing business logic in Python, and less on setting Airflow configurations.

The Astro SDK uses Python decorators and the TaskFlow API to simplify Python functions for common data orchestration use cases. Specifically, the Astro SDK decorators include eight python functions that make it easier to:

  • Extract a file from a remote object store, such as Amazon S3 or Google Cloud Storage (GCS).
  • Load that file to a new or existing table in a data warehouse, such as Snowflake.
  • Transform the data in that file with SQL written by your team.

These functions make your DAGs easier to write and read with less code. In this guide, you’ll learn about how to install the Python SDK and how to use it in practice. The Astro SDK should feel more similar to writing a traditional Python script than writing a DAG in Airflow.

tip

To get the most out of this guide, you should have an understanding of Airflow decorators. See Introduction to Airflow Decorators guide.

Python SDK functions

The Astro Python SDK makes implementing ELT use cases easier by allowing you to seamlessly transition between Python and SQL for each step in your process. Details like creating dataframes, storing intermediate results, passing context and data between tasks, and creating task dependencies are all managed automatically.

More specifically, the Astro Python SDK includes several functions that are helpful when implementing an ETL framework:

  • load_file: Loads a given file into a SQL table. The file should be in CSV, JSON, or parquet files stored in Amazon S3 or GCS.
  • transform: Applies a SQL select statement to a source table and saves the result to a destination table. This function allows you to transform your data with a SQL query. It uses a SELECT statement that you define to automatically store your results in a new table. By default, the output_table is given a unique name each time the DAG runs, but you can overwrite this behavior by defining a specific output_table in your function. You can then pass the results of the transform downstream to the next task as if it were a native Python object.
  • dataframe: Exports a specific SQL table into an in-memory pandas DataFrame. Similar to transform for SQL, the dataframe function allows you to implement a transformation of your data using Python. You can easily store the results of the dataframe function in your database by specifying an output_table, which is useful if you want to switch back to SQL in the next step or load your final results to your database.
  • append: Inserts rows from the source SQL table into the destination SQL table, if there are no conflicts. This function allows you to take resulting data from another function and append it to an existing table in your database. It is particularly useful in ETL scenarios and when dealing with reporting data.

For a full list of functions, see the Astro Python SDK README in GitHub.

Installation

Using the Astro Python SDK requires configuring a few things in your Airflow project.

  1. Install the Astro Python SDK package in your Airflow environment. If you're using the Astro CLI, add the following to the requirements.txt file of your Astro project:

    astro-sdk-python
  2. If you're using the Astro CLI locally, add the following variable to the .env file of your Astro project:

    export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True

    Setting AIRFLOW__CORE__ENABLE_XCOM_PICKLING to True is not recommended in production environments (See here) and should be avoided. Instead, Astronomer recommends using AIRFLOW__CORE__XCOM_BACKEND, AIRFLOW__ASTRO_SDK__XCOM_STORAGE_CONN_ID, and AIRFLOW__ASTRO_SDK__XCOM_STORAGE_URL to allow XCOM data to be saved to a S3 or GCS location. For example:

    export AIRFLOW__CORE__XCOM_BACKEND='astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend'
    export AIRFLOW__ASTRO_SDK__XCOM_STORAGE_CONN_ID=<my_aws_conn>
    export AIRFLOW__ASTRO_SDK__XCOM_STORAGE_URL='s3://my-bucket/xcom/'

    To store all of the intermediary tables generated by astro-sdk in a specific schema, use the variable AIRFLOW__ASTRO_SDK__SQL_SCHEMA to specify the schema. To deploy a pipeline written with the Astro Python SDK to Astro, add the environment variables to your Deployment. See Environment variables.

For a guided experience to get started, see the Astro Python SDK tutorial.

Before and after the Astro Python SDK

To highlight how the Astro Python SDK results in simpler DAG code, we'll show a direct comparison of a DAG written with the SDK to one written with traditional operators. The following DAG is a complete implementation of an ETL pipeline using the Astro Python SDK. In order, the DAG:

  • Loads .csv files from Amazon S3 into two tables that contain data about the housing market. Tables are objects that contain all of the necessary functionality to pass database contexts between functions without reconfiguration.
  • Combines the two tables of home data using aql.transform.
  • Turns the combined into a DataFrame, melts the values using aql.dataframe, and returns the results as a Table object.
  • Creates a new reporting table in Snowflake using aql.run_raw_sql.
  • Appends the table of transformed home data to a reporting table with aql.append.
import os
from datetime import datetime
import pandas as pd
from airflow.decorators import dag
from astro.files import File
from astro import sql as aql
from astro.sql.table import Metadata, Table
SNOWFLAKE_CONN_ID = "snowflake_conn"
AWS_CONN_ID = "aws_conn"
# The first transformation combines data from the two source tables
@aql.transform
def combine_tables(homes1: Table, homes2: Table):
return """
SELECT *
FROM {{homes1}}
UNION
SELECT *
FROM {{homes2}}
"""
# Switch to Python (Pandas) for melting transformation to get data into long format
@aql.dataframe
def transform_data(df: pd.DataFrame):
df.columns = df.columns.str.lower()
melted_df = df.melt(
id_vars=["sell", "list"], value_vars=["living", "rooms", "beds", "baths", "age"]
)
return melted_df
# Run a raw SQL statement to create the reporting table if it doesn't already exist
@aql.run_raw_sql
def create_reporting_table():
"""Create the reporting data which will be the target of the append method"""
return """
CREATE TABLE IF NOT EXISTS homes_reporting (
sell number,
list number,
variable varchar,
value number
);
"""
@dag(start_date=datetime(2021, 12, 1), schedule="@daily", catchup=False)
def example_s3_to_snowflake_etl():
# Initial load of homes data csv's from S3 into Snowflake
homes_data1 = aql.load_file(
task_id="load_homes1",
input_file=File(path="s3://airflow-kenten/homes1.csv", conn_id=AWS_CONN_ID),
output_table=Table(name="HOMES1", conn_id=SNOWFLAKE_CONN_ID)
)
homes_data2 = aql.load_file(
task_id="load_homes2",
input_file=File(path="s3://airflow-kenten/homes2.csv", conn_id=AWS_CONN_ID),
output_table=Table(name="HOMES2", conn_id=SNOWFLAKE_CONN_ID)
)
# Define task dependencies
extracted_data = combine_tables(
homes1=homes_data1,
homes2=homes_data2,
output_table=Table(name="combined_homes_data"),
)
transformed_data = transform_data(
df=extracted_data, output_table=Table(name="homes_data_long")
)
create_reporting_table = create_reporting_table(conn_id=SNOWFLAKE_CONN_ID)
# Append transformed data to reporting table
# Dependency is inferred by passing the previous `transformed_data` task to `source_table` param
record_results = aql.append(
source_table=transformed_data,
target_table=Table(name="homes_reporting", conn_id=SNOWFLAKE_CONN_ID),
columns=["sell", "list", "variable", "value"],
)
record_results.set_upstream(create_results_table)
example_s3_to_snowflake_etl_dag = example_s3_to_snowflake_etl()

Astro Graph

The following sections break down each step of this DAG and compare the Astro Python SDK implementation to one using traditional operators.

Load data

The first step in the pipeline is to load the data from S3 to Snowflake. With the Astro Python SDK, the load_file and append functions take care of loading your raw data from Amazon S3 and appending data to your reporting table. You don't have to write any extra code to get the data into Snowflake.

homes_data1 = aql.load_file(
task_id="load_homes1",
input_file=File(path="s3://airflow-kenten/homes1.csv", conn_id=AWS_CONN_ID),
output_table=Table(name="HOMES1", conn_id=SNOWFLAKE_CONN_ID)
)
homes_data2 = aql.load_file(
task_id="load_homes2",
input_file=File(path="s3://airflow-kenten/homes2.csv", conn_id=AWS_CONN_ID),
output_table=Table(name="HOMES2", conn_id=SNOWFLAKE_CONN_ID)
)

Without the SDK, the easiest way to accomplish this is using the traditional S3ToSnowflakeOperator.

def classic_etl_dag():
load_data = S3ToSnowflakeOperator(
task_id='load_homes_data',
snowflake_conn_id=SNOWFLAKE_CONN_ID,
s3_keys=[S3_FILE_PATH + '/homes.csv'],
table=SNOWFLAKE_SAMPLE_TABLE,
schema=SNOWFLAKE_SCHEMA,
stage=SNOWFLAKE_STAGE,
file_format="(type = 'CSV',field_delimiter = ',')",
)

While this operator is straight-forward, it requires knowledge of Snowflake and S3-specific parameters. The Astro Python SDK takes care of all of nuances of different systems for you under the hood.

Combine data

The next step in the pipeline is to combine data. With the Astro Python SDK transform function, you can execute SQL to combine your data from multiple tables. The results are automatically stored in a Snowflake table.

@aql.transform
def combine_tables(homes1: Table, homes2: Table):
return """
SELECT *
FROM {{homes1}}
UNION
SELECT *
FROM {{homes2}}
"""

Without the SDK, you need to write explicit code to complete this step.

@task(task_id='extract_data')
def extract_data():
# Join data from two tables and save to dataframe to process
query = ''''
SELECT *
FROM HOMES1
UNION
SELECT *
FROM HOMES2
'''
# Make connection to Snowflake and execute query
hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID)
conn = hook.get_conn()
cur = conn.cursor()
cur.execute(query)
results = cur.fetchall()
column_names = list(map(lambda t: t[0], cur.description))
df = pd.DataFrame(results)
df.columns = column_names
return df.to_json()

Since there is no way to pass results from the SnowflakeOperator query to the next task, you have to write a query in a _DecoratedPythonOperator function using the SnowflakeHook and explicitly do the conversion from SQL to a dataframe yourself.

Transform data

The third step in the pipeline is transforming the data. The transformations required for this pipeline are easier to implement in Python than in SQL. With the Astro Python SDK, you can run a transformation in Python with the dataframe function, meaning that you don't need to explicitly convert the results of your previous task to a Pandas DataFrame. You can then write output of your transformation to your aggregated reporting table in Snowflake using the target_table parameter, so you don't have to worry about storing the data in XCom.

@aql.dataframe
def transform_data(df: pd.DataFrame):
df.columns = df.columns.str.lower()
melted_df = df.melt(
id_vars=["sell", "list"], value_vars=["living", "rooms", "beds", "baths", "age"]
)
return melted_df

transformed_data = transform_data(
df=extracted_data, output_table=Table(name="homes_data_long")
)
# Append transformed data to reporting table
# Dependency is inferred by passing the previous `transformed_data` task to `source_table` param
record_results = aql.append(
source_table=transformed_data,
target_table=Table(name="homes_reporting", conn_id=SNOWFLAKE_CONN_ID),
columns=["sell", "list", "variable", "value"],
)

Implementing this transformation without the SDK is more challenging.

@task(task_id='transform_data')
def transform_data(xcom: str) -> str:
# Transform data by melting
df = pd.read_json(xcom)
melted_df = df.melt(
id_vars=["sell", "list"], value_vars=["living", "rooms", "beds", "baths", "age"]
)
melted_str = melted_df.to_string()
# Save results to Amazon S3 so they can be loaded back to Snowflake
s3_hook = S3Hook(aws_conn_id="s3_conn")
s3_hook.load_string(melted_str, 'transformed_file_name.csv', bucket_name=S3_BUCKET, replace=True)

load_transformed_data = S3ToSnowflakeOperator(
task_id='load_transformed_data',
snowflake_conn_id=SNOWFLAKE_CONN_ID,
s3_keys=[S3_FILE_PATH + '/transformed_file_name.csv'],
table=SNOWFLAKE_RESULTS_TABLE,
schema=SNOWFLAKE_SCHEMA,
stage=SNOWFLAKE_STAGE,
file_format="(type = 'CSV',field_delimiter = ',')",
)

Transitioning between Python to complete the transformation and SQL to load the results back to Snowflake requires extra boilerplate code to explicitly make the conversions. You also have to use a S3 as intermediary storage for the results and implement another S3ToSnowflakeOperator to load them, because there is no traditional operator to load data from a Pandas dataframe directly to Snowflake.

Overall, your DAG with the Astro Python SDK is shorter, simpler to implement, and easier to read. This allows you to implement even more complicated use cases easily while focusing on the movement of your data.

Learn more

To learn more about the Astro Python SDK, see: