Orchestrate Great Expectations with Airflow
You can now find the Great Expectations Provider on the Astronomer Registry, the discovery and distribution hub for Apache Airflow integrations created to aggregate and curate the best bits of the ecosystem.
Great Expectations is an open source Python-based data validation framework. You can test your data by expressing what you “expect” from it as simple declarative statements in Python, then run validations using those “expectations” against datasets with Checkpoints. Astronomer, with help from Superconductive, maintains an Airflow provider that gives users a convenient method for running validations directly from their DAGs.
This guide will walk you through how to use the `GreatExpectationsOperator` in an Airflow DAG and in the Astronomer environment.
Assumed knowledge
To get the most out of this guide, make sure you have an understanding of:
- The basics of Great Expectations. See Getting started with Great Expectations.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
- Airflow connections. See Managing your Connections in Apache Airflow.
Great Expectations concepts
Typically, using Great Expectations is a two-step process:
- Expectation Suite creation
- Validation
First, a user creates test suites, or “Expectation Suites”, using Great Expectations methods. These suites are usually stored in JSON and can be checked into version control, just like regular tests. The suites are then loaded by the Great Expectations framework at runtime and validated against a batch of data, for example, when a pipeline processes a new batch of data.
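To make the storage format concrete, the snippet below is an illustrative sketch of what a stored Expectation Suite can look like. The suite name, column, and expectation shown here are placeholders, not part of the demo project.

```json
{
  "expectation_suite_name": "my_suite",
  "expectations": [
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {"column": "passenger_count"},
      "meta": {}
    }
  ],
  "meta": {}
}
```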
For a step-by-step guide on how to configure a simple Great Expectations project, see the “Getting started” tutorial.
Setup
This guide assumes that you have downloaded the code from the demo repository, which contains a sample Great Expectations project.
If you want to use your own Great Expectations project along with this guide, ensure you have completed the following steps:
- Initialized a Great Expectations project
- Created at least one Expectation Suite, `my_suite`
If you set up a project manually, you will see a `great_expectations` directory that contains several sub-directories, as well as the `great_expectations.yml` Data Context file. If you cloned the demo repository, the `great_expectations` directory can be found under `include/`.
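For reference, an initialized project typically looks roughly like the following. The `include/` prefix reflects the demo repository's layout, and the exact sub-directories can vary by Great Expectations version.

```text
include/great_expectations/
├── checkpoints/            # Checkpoint configurations (YAML)
├── expectations/           # Expectation Suites (JSON)
├── plugins/                # custom expectations and other extensions
├── uncommitted/            # local-only config, validation results, Data Docs
└── great_expectations.yml  # the Data Context configuration
```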
The `GreatExpectationsOperator` requires Airflow 2.1 or later, and you will need to change the value of `enable_xcom_pickling` to `true` in your `airflow.cfg` file. If you are using an Astronomer project structure, add `ENV AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True` to your Dockerfile. If you are working from the demo repository, this step has already been completed for you.
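If you manage this setting through `airflow.cfg` instead of an environment variable, it lives in the `[core]` section, as in this minimal sketch:

```ini
# airflow.cfg
[core]
enable_xcom_pickling = True
```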
Use Case: Great Expectations operator
Now that your system is set up to work with Great Expectations, you can start exploring how to use it in your DAGs. The current Great Expectations provider version uses the Great Expectations V3 API.
Configuration
To validate your data, the `GreatExpectationsOperator` runs a Checkpoint against your dataset. Before you start writing your DAG, make sure you have at least a Data Context configured. To do this, either verify the path to your `great_expectations.yml` file or define Python dictionaries containing the necessary Data Context fields and import those dictionaries into your DAG.
A Data Context represents a Great Expectations project. It organizes storage and access for Expectation Suites, data sources, notification settings, and data fixtures.
Our demo repository uses the following configuration:
- The `great_expectations` directory is accessible by your DAG, as it is loaded into the Docker container as part of the `include` directory.
- The Great Expectations provider is installed when you run `astro dev start`, as it is part of `requirements.txt`. For Astronomer projects, a provider version `>=0.1.1` is required.

If you are not using an Astronomer environment, install the Great Expectations provider in your environment manually with `pip install great_expectations airflow-provider-great-expectations==0.2.0`. It's recommended that you specify a version when installing the package.
Using the Great Expectations operator
1. Import the operator into your DAG file. You might also need to import the `DataContextConfig` or `CheckpointConfig` classes depending on how you're using the operator. To import the Great Expectations provider and configurations in a given DAG, add the following lines to the top of the DAG file in your `dags` directory:

    ```python
    from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
    from great_expectations.data_context.types.base import (
        DataContextConfig,
        CheckpointConfig,
    )
    ```

2. Create a task using the `GreatExpectationsOperator`. To use the operator in the DAG, define an instance of the `GreatExpectationsOperator` class and assign it to a variable. In the following example, we define two different instances of the operator to showcase two different ways to pass the Data Context to the operator:

    ```python
    ge_data_context_root_dir_pass = GreatExpectationsOperator(
        task_id="ge_data_context_root_dir_pass",
        data_context_root_dir=ge_root_dir,
        expectation_suite_name="taxi.demo",
        execution_engine="PandasExecutionEngine",
        data_asset_name="taxi_df_pass",
        dataframe_to_validate=pd.read_csv(
            filepath_or_buffer=data_file,
            header=0,
        ),
    )

    ge_data_context_config_pass = GreatExpectationsOperator(
        task_id="ge_data_context_config_with_checkpoint_config_pass",
        data_context_config=example_data_context_config,
        expectation_suite_name="taxi.demo",
        execution_engine="PandasExecutionEngine",
        data_asset_name="taxi_df_pass",
        dataframe_to_validate=pd.read_csv(
            filepath_or_buffer=data_file,
            header=0,
        ),
    )

    ge_data_context_root_dir_pass >> ge_data_context_config_pass
    ```
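The example above references a few names that are defined elsewhere in the demo DAG: `pd`, `ge_root_dir`, `data_file`, and `example_data_context_config`. A minimal sketch of what those definitions can look like follows; the paths are assumptions based on the demo repository's Astronomer project layout, so adjust them to your own project.

```python
import os

import pandas as pd  # used to read the sample data into a DataFrame

# Assumed Astronomer project layout: the Great Expectations project and the
# sample data live under include/ inside the Airflow container.
base_path = os.environ.get("AIRFLOW_HOME", "/usr/local/airflow")
ge_root_dir = os.path.join(base_path, "include", "great_expectations")
data_file = os.path.join(
    base_path, "include", "data", "yellow_tripdata_sample_2019-01.csv"  # substitute your own file
)

# example_data_context_config is an in-memory DataContextConfig; see the
# "Operator parameters" section below for a minimal sketch of one.
```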
Operator parameters
The operator has several optional parameters, but it always requires either a `data_context_root_dir` or a `data_context_config`. Depending on how you have your project configured, other parameters may also be necessary. The less configured your project, the more parameters you'll pass.
For example:
- If your project's data context specifies data sources, all you need to pass in is the data context and the expectation suite.
- If your data context does not specify a data source, and you do not pass in a checkpoint, then the operator will build a checkpoint for you based on the data source you pass in and run the given expectation suite.
The data source can be a dataframe, as shown in the example code, or an Airflow connection specified with the `conn_id` parameter. Depending on how you define your data source, the `data_asset_name` parameter has to be adjusted:
- If a dataframe is passed, the `data_asset_name` parameter can be any name that will help you identify the dataframe.
- If a `conn_id` is supplied, the `data_asset_name` must be the name of the table the Expectation Suite runs on.
The `data_context_root_dir` should point to the `great_expectations` project directory generated when you created the project with the CLI. If you are using an in-memory `data_context_config`, a `DataContextConfig` must be defined, as in this example.
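The following is a minimal sketch of an in-memory `DataContextConfig`, assuming in-memory stores are sufficient for your use case; real projects typically also declare datasources, stores, and Data Docs sites.

```python
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults,
)

# A bare-bones Data Context that keeps everything in memory. Pass this object
# to the operator's data_context_config parameter.
example_data_context_config = DataContextConfig(
    store_backend_defaults=InMemoryStoreBackendDefaults(),
)
```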
While Checkpoints are not a required parameter, if your project already contains them, they can be passed to the operator in two ways:
- A Checkpoint name may be passed with the `checkpoint_name` parameter. The name references a Checkpoint in the project `CheckpointStore` defined in the `DataContext` (which is often in the `great_expectations/checkpoints/` path), so that `checkpoint_name="taxi.pass.chk"` would reference the file `great_expectations/checkpoints/taxi/pass/chk.yml`.
- A `checkpoint_config` may be passed to the operator in place of a name, and can be defined like this example.
Additionally, `checkpoint_kwargs` may be passed to the operator to supply additional configuration that overwrites values in the Checkpoint.
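As an illustration, the following sketch passes an existing Checkpoint by name; the Checkpoint name comes from the demo project, and the `checkpoint_kwargs` override shown is a hypothetical example of a `CheckpointConfig` field you might overwrite at runtime.

```python
ge_checkpoint_pass = GreatExpectationsOperator(
    task_id="ge_checkpoint_pass",
    data_context_root_dir=ge_root_dir,
    # Resolves to great_expectations/checkpoints/taxi/pass/chk.yml in the project.
    checkpoint_name="taxi.pass.chk",
    # Overwrite individual CheckpointConfig fields at runtime.
    checkpoint_kwargs={"expectation_suite_name": "taxi.demo"},
)
```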
By default, a Great Expectations task runs validations and raises an `AirflowException` if any of the tests fail. To override this behavior and continue running the pipeline even if tests fail, set the `fail_task_on_validation_failure` flag to `False`.
By default in Astronomer, or in any environment with OpenLineage configured, the `GreatExpectationsOperator` automatically adds the OpenLineage action to its default action list when a Checkpoint is not specified to the operator. To turn off this feature, set the `use_open_lineage` parameter to `False`.
For a full list of parameters, see the `GreatExpectationsOperator` documentation.
For more information about possible parameters and examples, see the README in the provider repository and the example DAG in the provider package.
Connections and backends
The `GreatExpectationsOperator` can run a Checkpoint on a dataset stored in any backend compatible with Great Expectations. This includes BigQuery, MSSQL, MySQL, PostgreSQL, Redshift, Snowflake, SQLite, and Athena, among others. All that's needed to point the `GreatExpectationsOperator` at an external dataset is to set up an Airflow connection to the data source and set the `conn_id` parameter. Connections will still work if you have your connection configured in both Airflow and Great Expectations, as long as the correct datasource is specified in the Checkpoint passed to the operator.
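As a sketch, pointing the operator at a table through an Airflow connection might look like the following; the connection ID, schema, and table name are hypothetical, and the exact parameters available can vary by provider version.

```python
ge_snowflake_check = GreatExpectationsOperator(
    task_id="ge_snowflake_check",
    data_context_root_dir=ge_root_dir,
    conn_id="snowflake_default",     # Airflow connection to the backend (hypothetical)
    schema="demo_schema",            # schema that contains the table (hypothetical)
    data_asset_name="taxi_table",    # with conn_id, this must be the table name
    expectation_suite_name="taxi.demo",
)
```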
Next steps
In this guide, you learned about the purpose of Great Expectations and how to use the provider operator to create Great Expectations Airflow tasks. For more examples of how to use the `GreatExpectationsOperator` as part of an ELT pipeline, see the Great Expectations Snowflake Example, Great Expectations BigQuery Example, and Great Expectations Redshift Example on the Astronomer Registry.