
Set up a custom XCom backend using cloud-based or local object storage

Airflow XComs allow you to pass data between tasks. By default, Airflow uses the metadata database to store XComs, which works well for local development but has limited performance. For production environments, you can configure a custom XCom backend.

This allows you to push and pull XComs to and from an external system such as AWS S3, GCP Cloud Storage, Azure Blob Storage, or a MinIO instance.

Common reasons to use a custom XCom backend include:

  • Needing more storage space for XCom than the metadata database can offer.
  • Running a production environment where you require custom retention, deletion, and backup policies for XComs. With a custom XCom backend, you don't need to worry about periodically cleaning up the metadata database.
  • Utilizing custom serialization and deserialization methods. By default, Airflow uses JSON serialization, which puts limits on the type of data that you can pass through XComs. Pickling is also available, but it has known security implications. A custom XCom backend allows you to implement your own serialization and deserialization methods.
  • Accessing XCom without accessing the metadata database.
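To make the JSON limitation mentioned above concrete, the following minimal sketch uses only Python's standard json module to check which values survive JSON serialization. This is an illustration, not Airflow's own XCom encoder, which may handle some additional types. The helper name is_json_serializable is hypothetical.

```python
import json
from datetime import datetime


def is_json_serializable(value):
    """Return True if the standard json module can serialize the value."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


# plain dicts, lists, strings and numbers serialize without problems
print(is_json_serializable({"pokemon": ["pikachu", "vulpix"], "height": [4, 6]}))

# sets, bytes and datetime objects are rejected by the standard json module
print(is_json_serializable({1, 2, 3}))
print(is_json_serializable(b"raw bytes"))
print(is_json_serializable(datetime(2022, 12, 20)))
```

Values of the second kind are the ones that need a custom serialization method in the XCom backend.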

After you complete this tutorial, you'll be able to:

  • Create a custom XCom backend using cloud-based or local object storage.
  • Use JSON serialization and deserialization in a custom XCom backend.
  • Add custom logic to the serialization and deserialization methods to store Pandas dataframes as CSVs in your custom XCom backend.
  • Explain best practices of using custom XCom backends.
  • Explain the possibility of customizing other BaseXCom methods for extended functionality.
caution

While a custom XCom backend allows you to store virtually unlimited amounts of data as XCom, you will also need to scale other Airflow components to pass large amounts of data between tasks. For help running Airflow at scale, reach out to Astronomer.

Time to complete

This tutorial takes approximately 1.5 hours to complete.

Assumed knowledge

To get the most out of this tutorial, make sure you have an understanding of:

Prerequisites

  • The Astro CLI.
  • A local or cloud-based object storage account. This tutorial has instructions for AWS, GCP, Azure, and MinIO.

Step 1: Create an Astro project

  1. Set up Airflow by creating a new Astro project:

    $ mkdir airflow-custom-xcom-example && cd airflow-custom-xcom-example
    $ astro dev init
  2. Ensure that the provider of your cloud-based object storage account is installed in your Airflow instance. If you are using the Astro CLI, the Amazon, Google, and Azure provider packages come pre-installed in your Astro runtime image. If you are working with a local MinIO instance, add the minio Python package to your requirements.txt file.

  3. Start your Airflow project by running:

    $ astro dev start

Step 2: Set up your object storage account

  1. Log into your AWS account and create a new S3 bucket called s3-xcom-backend-example. Ensure that public access to the bucket is blocked.

  2. Create a new IAM policy for Airflow to access your bucket. You can use the JSON configuration below or use the AWS GUI to replicate what you see in the screenshot.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "s3:ReplicateObject",
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:RestoreObject",
                    "s3:ListBucket",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::s3-xcom-backend-example/*",
                    "arn:aws:s3:::s3-xcom-backend-example"
                ]
            }
        ]
    }

    AWS IAM policy for the XCom backend

  3. Save your policy under the name AirflowXComBackendAWSS3.

  4. Create an IAM user called airflow-xcom with the AWS credential type Access key - Programmatic access. Attach the AirflowXComBackendAWSS3 policy to this user as shown in the screenshot below. Make sure to save the Access Key ID and the Secret Access Key.

    AWS IAM user for the XCom backend

Step 3: Create a connection

To give Airflow access to your S3 bucket you need to define an Airflow connection.

  1. In the Airflow UI, go to Admin -> Connections and create a new connection (+) with the Connection Type Amazon Web Services. Provide the AWS Access Key ID and AWS Secret Access Key from the IAM user you created in Step 2. The following screenshot shows how you would configure a connection with the ID s3_xcom_backend_conn.

    Airflow Connection to S3

  2. Test and save your connection.
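As an alternative to the UI, Airflow can also read connections from environment variables named AIRFLOW_CONN_ followed by the upper-cased connection ID. The sketch below builds such a variable, assuming an Airflow version that accepts JSON-formatted connection variables (2.3 or later). The credential values are placeholders, and how the value needs to be quoted depends on where you set the variable.

```python
import json

CONN_ID = "s3_xcom_backend_conn"

# placeholders: replace with the key pair of the airflow-xcom IAM user
connection = {
    "conn_type": "aws",
    "login": "<your AWS Access Key ID>",
    "password": "<your AWS Secret Access Key>",
}

# Airflow looks up connections under AIRFLOW_CONN_ + the upper-cased conn id
env_line = f"AIRFLOW_CONN_{CONN_ID.upper()}={json.dumps(connection)}"
print(env_line)
```

Connections defined this way are not listed in the Airflow UI, so the UI route from this step remains the easier one to verify.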

Step 4: Define a custom XCom class using JSON serialization

For Airflow to use your custom XCom backend, you need to define an XCom backend class which inherits from the BaseXCom class.

  1. In your Astro project, create a new file in the include directory called xcom_backend_json.py.

  2. Copy and paste the following code into the file:

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import json
import uuid
import os

class CustomXComBackendJSON(BaseXCom):
    # the prefix is optional and used to make it easier to recognize
    # which reference strings in the Airflow metadata database
    # refer to an XCom that has been stored in an S3 bucket
    PREFIX = "xcom_s3://"
    BUCKET_NAME = "s3-xcom-backend-example"

    @staticmethod
    def serialize_value(
        value,
        key=None,
        task_id=None,
        dag_id=None,
        run_id=None,
        map_index=None,
        **kwargs
    ):

        # the connection to AWS is created by using the S3 hook with
        # the conn id configured in Step 3
        hook = S3Hook(aws_conn_id="s3_xcom_backend_conn")
        # make sure the file_id is unique, either by using combinations of
        # the task_id, run_id and map_index parameters or by using a uuid
        filename = "data_" + str(uuid.uuid4()) + ".json"
        # define the full S3 key where the file should be stored
        s3_key = f"{run_id}/{task_id}/{filename}"

        # write the value to a local temporary JSON file
        with open(filename, 'a+') as f:
            json.dump(value, f)

        # load the local JSON file into the S3 bucket
        hook.load_file(
            filename=filename,
            key=s3_key,
            bucket_name=CustomXComBackendJSON.BUCKET_NAME,
            replace=True
        )

        # remove the local temporary JSON file
        os.remove(filename)

        # define the string that will be saved to the Airflow metadata
        # database to refer to this XCom
        reference_string = CustomXComBackendJSON.PREFIX + s3_key

        # use JSON serialization to write the reference string to the
        # Airflow metadata database (like a regular XCom)
        return BaseXCom.serialize_value(value=reference_string)

    @staticmethod
    def deserialize_value(result):
        # retrieve the relevant reference string from the metadata database
        reference_string = BaseXCom.deserialize_value(result=result)

        # create the S3 connection using the S3Hook and recreate the S3 key
        hook = S3Hook(aws_conn_id="s3_xcom_backend_conn")
        key = reference_string.replace(CustomXComBackendJSON.PREFIX, "")

        # download the JSON file found at the location described by the
        # reference string to a temporary local folder
        filename = hook.download_file(
            key=key,
            bucket_name=CustomXComBackendJSON.BUCKET_NAME,
            local_path="/tmp"
        )

        # load the content of the local JSON file and return it to be used by
        # the operator
        with open(filename, 'r') as f:
            output = json.load(f)

        # remove the local temporary JSON file
        os.remove(filename)

        return output
  3. Review the copied code. It defines a class called CustomXComBackendJSON. The class has two methods: .serialize_value() defines how to handle the value that is pushed to XCom from an Airflow task, and .deserialize_value() defines the logic to retrieve information from the XCom backend.

    The .serialize_value() method:

    • Creates the connection to the external tool, either by using the Hook from the tool's provider package (S3Hook, GCSHook, WasbHook) or by providing credentials directly (MinIO).
    • Creates a unique filename using the uuid package.
    • Uses the run_id and task_id from the Airflow context to define the key under which the file will be saved in the object storage.
    • Writes the value that is being pushed to XCom to the object storage using JSON serialization.
    • Creates a unique reference_string that is written to the Airflow metadata database as a regular XCom.

    The .deserialize_value() method:

    • Retrieves the reference_string for a given entry (result) from the Airflow metadata database using regular XCom.
    • Downloads the JSON file at the key contained in the reference_string.
    • Retrieves the information from the JSON file.
  4. Open the .env file of your Astro project and add the following line to set your XCom backend to the custom class:

    AIRFLOW__CORE__XCOM_BACKEND=include.xcom_backend_json.CustomXComBackendJSON

    If you use Astro, set this environment variable in your Deployment instead. See Environment variables.

  5. Restart your Airflow instance using astro dev restart.
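The pattern behind the two methods reviewed in this step (store the payload externally, keep only a reference string in the metadata database) can be tried without any cloud account. The following is a minimal sketch in which a plain dictionary named fake_bucket stands in for the S3 bucket; the function signatures are simplified and are not those of BaseXCom.

```python
import json
import uuid

PREFIX = "xcom_s3://"

# stand-in for the S3 bucket: maps object keys to file contents
fake_bucket = {}


def serialize_value(value, task_id=None, run_id=None):
    """Store the value in the fake bucket and return only a reference."""
    filename = "data_" + str(uuid.uuid4()) + ".json"
    object_key = f"{run_id}/{task_id}/{filename}"
    fake_bucket[object_key] = json.dumps(value)
    # this short string is all that would reach the metadata database
    return PREFIX + object_key


def deserialize_value(reference_string):
    """Resolve a reference string back to the stored value."""
    object_key = reference_string.replace(PREFIX, "", 1)
    return json.loads(fake_bucket[object_key])


reference = serialize_value(
    {"number": 7}, task_id="pick_a_random_number", run_id="manual_run"
)
print(reference)
print(deserialize_value(reference))
```

The size of the reference string does not depend on the size of the payload, which is why the metadata database stays small even when tasks exchange large objects.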

Step 5: Create and run your DAG to generate XComs

To test your custom XCom backend you will run a simple DAG which pushes a random number to your custom XCom backend and then retrieves it again.

  1. Create a new file in your dags folder named simple_xcom_dag.py.

  2. Copy and paste the code below into the file.

    from airflow.decorators import dag, task
    from pendulum import datetime
    import random

    @dag(
        start_date=datetime(2022, 12, 20),
        schedule="@daily",
        catchup=False
    )
    def simple_xcom_dag():

        @task
        def pick_a_random_number():
            return random.randint(1, 10) # push to XCom

        @task
        def print_a_number(num): # retrieve from XCom
            print(num)

        print_a_number(pick_a_random_number())

    simple_xcom_dag()
  3. Run the DAG.

  4. View the logs of both tasks. The logs will include information about the custom XCom backend. The print_a_number task includes the full path to the file stored in the custom backend.

    Logs mentioning custom XCom backend

  5. View the XCom in your object storage.

    XCom in the S3 bucket
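Step 4 names each file with a random UUID, so a rerun of a task adds a new object to the bucket instead of replacing the previous one. The code comments in Step 4 mention building the key from the task_id, run_id, and map_index parameters as an alternative. A minimal sketch of that idea follows; the helper build_s3_key and its key layout are hypothetical and not part of the tutorial code.

```python
def build_s3_key(dag_id, run_id, task_id, key="return_value", map_index=-1, suffix="json"):
    """Build an object key that is stable across retries of the same task."""
    parts = [dag_id, run_id, task_id]
    # assumption: Airflow passes a map_index of -1 for tasks that are not mapped
    if map_index is not None and map_index >= 0:
        parts.append(f"map_{map_index}")
    return "/".join(parts) + f"/{key}.{suffix}"


print(build_s3_key("simple_xcom_dag", "manual_run", "pick_a_random_number"))
print(build_s3_key("simple_xcom_dag", "manual_run", "pick_a_random_number", map_index=2))
```

With a deterministic key, uploading with replace=True overwrites the object from an earlier attempt, at the cost of losing the history of previous values.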

Step 6: Create a custom serialization method to handle Pandas dataframes

A powerful feature of custom XCom backends is the possibility to create custom serialization and deserialization methods. This is particularly useful for handling objects that cannot be JSON-serialized. In this step, you will create a new custom XCom backend that can save the contents of a Pandas dataframe as a CSV file.

  1. Create a second file in your include folder called xcom_backend_pandas.py.

  2. Copy and paste the following code into the file.

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
import pandas as pd
import json
import uuid
import os

class CustomXComBackendPandas(BaseXCom):
    PREFIX = "xcom_s3://"
    BUCKET_NAME = "s3-xcom-backend-example"

    @staticmethod
    def serialize_value(
        value,
        key=None,
        task_id=None,
        dag_id=None,
        run_id=None,
        map_index=None,
        **kwargs
    ):

        hook = S3Hook(aws_conn_id="s3_xcom_backend_conn")

        # added serialization method if the value passed is a Pandas dataframe
        # the contents are written to a local temporary csv file
        if isinstance(value, pd.DataFrame):
            filename = "data_" + str(uuid.uuid4()) + ".csv"
            s3_key = f"{run_id}/{task_id}/{filename}"

            value.to_csv(filename)

        # if the value passed is not a Pandas dataframe, attempt to use
        # JSON serialization
        else:
            filename = "data_" + str(uuid.uuid4()) + ".json"
            s3_key = f"{run_id}/{task_id}/{filename}"

            with open(filename, 'a+') as f:
                json.dump(value, f)

        hook.load_file(
            filename=filename,
            key=s3_key,
            bucket_name=CustomXComBackendPandas.BUCKET_NAME,
            replace=True
        )

        # remove the local temporary file
        os.remove(filename)

        reference_string = CustomXComBackendPandas.PREFIX + s3_key

        return BaseXCom.serialize_value(value=reference_string)

    @staticmethod
    def deserialize_value(result):
        result = BaseXCom.deserialize_value(result)

        hook = S3Hook(aws_conn_id="s3_xcom_backend_conn")
        key = result.replace(CustomXComBackendPandas.PREFIX, "")

        filename = hook.download_file(
            key=key,
            bucket_name=CustomXComBackendPandas.BUCKET_NAME,
            local_path="/tmp"
        )

        # added deserialization option to convert a CSV back to a dataframe
        if key.split(".")[-1] == "csv":
            output = pd.read_csv(filename)
        # if the key does not end in 'csv' use JSON deserialization
        else:
            with open(filename, 'r') as f:
                output = json.load(f)

        # remove the local temporary file
        os.remove(filename)

        return output
  3. Review the copied code. It creates a custom XCom backend called CustomXComBackendPandas with added logic to convert a Pandas dataframe to a CSV file, which gets written to object storage and converted back to a Pandas dataframe upon retrieval. If the value passed is not a Pandas dataframe, the .serialize_value() and .deserialize_value() methods use JSON serialization like the custom XCom backend in Step 4.

  4. In the .env file of your Astro project, replace the XCom backend variable to use the newly created CustomXComBackendPandas.

    AIRFLOW__CORE__XCOM_BACKEND=include.xcom_backend_pandas.CustomXComBackendPandas

  5. Restart your Airflow instance by running astro dev restart.
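One detail of the CSV round trip in CustomXComBackendPandas is worth checking before relying on it: to_csv() is called with its default arguments, so the dataframe index is written as the first column, and read_csv() with default arguments reads it back as a regular column. The sketch below reproduces this with an in-memory buffer instead of S3. The sample data is made up for illustration, and passing index_col=0 is one possible adjustment, not something the tutorial code does.

```python
import io

import pandas as pd

# made-up sample data for illustration
df = pd.DataFrame({"pokemon": ["pikachu", "vulpix"], "height": [4, 6]})

# same calls as in the backend, but written to an in-memory buffer
buffer = io.StringIO()
df.to_csv(buffer)
buffer.seek(0)
restored = pd.read_csv(buffer)

# the former index comes back as an extra, unnamed column
print(list(restored.columns))

# option: tell pandas that the first column holds the index
buffer.seek(0)
restored_with_index = pd.read_csv(buffer, index_col=0)
print(list(restored_with_index.columns))
```

The DAG in Step 7 only accesses columns by name, so the extra column does not affect it, but downstream code that depends on column positions would notice the difference.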

tip

You can add custom logic inside your .serialize_value() and .deserialize_value() methods to send XComs to several custom XCom backend solutions, for example based on their object type.
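Routing by object type, as suggested in the tip above, could look roughly like the following sketch. The target names binary_store and json_store, the ROUTES table, and the route function are all hypothetical; in a real backend each target would correspond to a hook or client for a different storage system.

```python
import json


def to_json_bytes(value):
    """Serialize a JSON-compatible value to UTF-8 encoded bytes."""
    return json.dumps(value).encode("utf-8")


def passthrough_bytes(value):
    """Bytes need no further serialization."""
    return value


# hypothetical routing table: Python type -> (target name, serializer)
ROUTES = [
    (bytes, ("binary_store", passthrough_bytes)),
    (dict, ("json_store", to_json_bytes)),
    (list, ("json_store", to_json_bytes)),
]


def route(value):
    """Return the target name and payload for the first matching type."""
    for value_type, (target, serializer) in ROUTES:
        if isinstance(value, value_type):
            return target, serializer(value)
    # everything else falls back to JSON serialization
    return "json_store", to_json_bytes(value)


print(route({"a": 1}))
print(route(b"\x00\x01"))
```

Whichever target is chosen needs to be recoverable during deserialization, for example by encoding it in the prefix of the reference string.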

Step 7: Run a DAG passing Pandas dataframes via XCom

Test the new custom XCom backend by running a DAG that passes a Pandas dataframe between tasks.

  1. Create a new file called fetch_pokemon_data_dag.py in the dags folder of your Astro project.

  2. Copy and paste the DAG below. Make sure to enter your favorite Pokémon.

    from airflow.decorators import dag, task
    from pendulum import datetime
    import pandas as pd
    import requests

    MY_FAVORITE_POKEMON = "pikachu"
    MY_OTHER_FAVORITE_POKEMON = "vulpix"

    @dag(
        start_date=datetime(2022, 12, 20),
        schedule="@daily",
        catchup=False
    )
    def fetch_pokemon_data_dag():

        @task
        def extract_data():
            """Extracts data from the Pokémon API. Returns a JSON serializable dict."""

            r1 = requests.get(f"https://pokeapi.co/api/v2/pokemon/{MY_FAVORITE_POKEMON}")
            r2 = requests.get(f"https://pokeapi.co/api/v2/pokemon/{MY_OTHER_FAVORITE_POKEMON}")

            return {
                "pokemon": [f"{MY_FAVORITE_POKEMON}", f"{MY_OTHER_FAVORITE_POKEMON}"],
                "base_experience": [r1.json()["base_experience"], r2.json()["base_experience"]],
                "height" : [r1.json()["height"], r2.json()["height"]]
            }

        @task
        def calculate_xp_per_height(pokemon_data_dict):
            """Calculates base XP per height and returns a pandas dataframe."""

            df = pd.DataFrame(pokemon_data_dict)

            df["xp_per_height"] = df["base_experience"] / df["height"]

            return df

        @task
        def print_xp_per_height(pokemon_data_df):
            """Retrieves information from a pandas dataframe in the custom XCom
            backend. Prints out Pokémon information."""

            for i in pokemon_data_df.index:
                pokemon = pokemon_data_df.loc[i, 'pokemon']
                xph = pokemon_data_df.loc[i, 'xp_per_height']
                print(f"{pokemon} has a base xp to height ratio of {xph}")

        print_xp_per_height(calculate_xp_per_height(extract_data()))

    fetch_pokemon_data_dag()

    The extract_data task pushes a dictionary to XCom, which is saved to your object storage as a JSON file and retrieved by the calculate_xp_per_height task. This second task pushes a Pandas dataframe to XCom, which is only possible when using a custom XCom backend with a serialization method for this type of object. The last task, print_xp_per_height, retrieves the CSV and recreates the Pandas dataframe before printing out the Pokémon and their base experience to height ratio.

  3. View the information about your favorite Pokémon in the task log of the print_xp_per_height task.

    Pokémon Information in logs

Overriding additional BaseXCom methods

In this tutorial, you added custom logic to the .serialize_value() and .deserialize_value() methods. If you want to further customize the functionality for your custom XCom backend, you can override additional methods of the XCom module (source code).

A common use case for this is removing stored XComs upon clearing and rerunning a task in both the Airflow metadata database and the custom XCom backend. To do so, the .clear() method needs to be overridden to include the removal of the referenced XCom in the custom XCom backend. The code below shows an example of a .clear() method that includes the deletion of an XCom stored in a custom S3 backend, using the AWS version of the CustomXComBackendJSON XCom backend from Step 4 of the tutorial.

from airflow.utils.session import NEW_SESSION, provide_session

# add this method to the CustomXComBackendJSON class from Step 4;
# it reuses the json and S3Hook imports of that file
@classmethod
@provide_session
def clear(
    cls,
    execution_date = None,
    dag_id = None,
    task_id = None,
    session = NEW_SESSION,
    *,
    run_id = None,
    map_index = None,
) -> None:

    from airflow.models import DagRun
    from airflow.utils.helpers import exactly_one
    import warnings
    from airflow.exceptions import RemovedInAirflow3Warning

    if dag_id is None:
        raise TypeError("clear() missing required argument: dag_id")
    if task_id is None:
        raise TypeError("clear() missing required argument: task_id")

    if not exactly_one(execution_date is not None, run_id is not None):
        raise ValueError(
            f"Exactly one of run_id or execution_date must be passed. "
            f"Passed execution_date={execution_date}, run_id={run_id}"
        )

    if execution_date is not None:
        message = "Passing 'execution_date' to 'XCom.clear()' is deprecated. Use 'run_id' instead."
        warnings.warn(message, RemovedInAirflow3Warning, stacklevel=3)
        run_id = (
            session.query(DagRun.run_id)
            .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
            .scalar()
        )

    #### Customization start

    # get the reference string from the Airflow metadata database
    if map_index is not None:
        reference_string = session.query(cls.value).filter_by(
            dag_id=dag_id,
            task_id=task_id,
            run_id=run_id,
            map_index=map_index
        ).scalar()
    else:
        reference_string = session.query(cls.value).filter_by(
            dag_id=dag_id,
            task_id=task_id,
            run_id=run_id
        ).scalar()

    if reference_string is not None:

        # decode the XCom binary to UTF-8
        reference_string = reference_string.decode('utf-8')

        hook = S3Hook(aws_conn_id="s3_xcom_backend_conn")
        key = reference_string.replace(CustomXComBackendJSON.PREFIX, '')

        # use the reference string to delete the object from the S3 bucket
        hook.delete_objects(
            bucket=CustomXComBackendJSON.BUCKET_NAME,
            keys=json.loads(key)
        )

    # retrieve the XCom record from the metadata database containing the reference string
    query = session.query(cls).filter_by(
        dag_id=dag_id,
        task_id=task_id,
        run_id=run_id
    )
    if map_index is not None:
        query = query.filter_by(map_index=map_index)

    # delete the XCom containing the reference string from metadata database
    query.delete()
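The .clear() example passes json.loads(key) rather than key to the hook. The reason is that the value read directly from the metadata database is still JSON-encoded, so the decoded string carries surrounding quotes. The following sketch walks through that transformation in isolation. It assumes that the stored value is the UTF-8 encoded JSON string produced by BaseXCom.serialize_value(); the sample key is made up.

```python
import json

PREFIX = "xcom_s3://"

# assumption: the metadata database holds the JSON-encoded reference as bytes
stored_value = json.dumps(
    PREFIX + "manual_run/pick_a_random_number/data_1234.json"
).encode("utf-8")

# decode the XCom binary to UTF-8; the JSON quotes are still part of the string
reference_string = stored_value.decode("utf-8")
print(reference_string)

# removing the prefix leaves a quoted key, which json.loads turns into a plain string
quoted_key = reference_string.replace(PREFIX, "")
s3_key = json.loads(quoted_key)
print(s3_key)
```

In .deserialize_value() this extra step is not needed, because BaseXCom.deserialize_value() already returns the decoded reference string.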

Conclusion

Congratulations! You learned how to set up a custom XCom backend and how to define your own serialization and deserialization methods.