Great Expectations (GX) is an open-source, Python-based data validation framework. You can test your data by expressing what you “expect” from it as simple declarative statements in JSON or YAML, then run validations using those Expectation Suites against data in a Source Data System or a pandas DataFrame. Astronomer, with help from Superconductive, maintains the Great Expectations Airflow Provider, which gives users a convenient way to run validations directly from their DAGs.
Time to complete
This tutorial takes approximately 20 minutes to complete.
To get the most out of this tutorial, make sure you have an understanding of:
- The basics of Great Expectations. See GX Quickstart.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
- Airflow connections. See Managing your Connections in Apache Airflow.
- The Astro CLI.
- Access to a SQL database. This tutorial uses a local Postgres instance.
- Optional. Local installation of the
Step 1: Configure your Astro project
To use GX with Airflow, install the Great Expectations Airflow Provider in your Astro project.
Create a new Astro project:
$ mkdir astro-gx-tutorial && cd astro-gx-tutorial
$ astro dev init
Add the GX Airflow provider to your Astro project
Step 2: Configure a GX project
The Great Expectations Airflow Provider requires a GX project to be present in your Airflow environment. The easiest way to create a GX project is by using the great_expectations package and following the steps below. If you cannot install the GX package locally, you can copy the great_expectations folder from this GitHub repository into your Astro project include folder instead and continue this tutorial at Step 3.
Initialize a new GX project in your Astro project
$ great_expectations init
Create a new file in your
strawberry_suite.json and copy and paste the following code into the file:
This JSON file defines an Expectation Suite containing one expectation of the type expect_table_row_count_to_be_between. This expectation will check that the number of rows in a table is between 2 and 1000.
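As a rough sketch of such a suite, the snippet below builds it as a Python dictionary and serializes it to JSON. The top-level key names and the suite name strawberry_suite are assumptions based on the layout commonly seen in GX 0.x suite files, so compare the result with a suite generated by GX itself. The helper row_count_passes is hypothetical and only mimics the bounds check locally.

```python
import json

# Hypothetical reconstruction of a minimal Expectation Suite.
# Key names are assumptions; verify them against a suite created by GX.
strawberry_suite = {
    "expectation_suite_name": "strawberry_suite",
    "expectations": [
        {
            "expectation_type": "expect_table_row_count_to_be_between",
            "kwargs": {"min_value": 2, "max_value": 1000},
            "meta": {},
        }
    ],
    "meta": {},
}


def row_count_passes(suite: dict, row_count: int) -> bool:
    """Mimic the check locally: is row_count within the suite's bounds?"""
    kwargs = suite["expectations"][0]["kwargs"]
    return kwargs["min_value"] <= row_count <= kwargs["max_value"]


# Serialize the suite as it could be stored in strawberry_suite.json.
suite_json = json.dumps(strawberry_suite, indent=2)
print(suite_json)
print(row_count_passes(strawberry_suite, 5))
```

Keeping the bounds in kwargs means the same suite can be reused for any table whose row count should fall in that range.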
You should now have the following file structure in your
│ └── strawberry_suite.json
Step 3: Create a database connection
The easiest way to use GX with Airflow is to let the GreatExpectationsOperator create a default Checkpoint and Datasource based on an Airflow connection. To set up a connection to a Postgres database, complete the following steps:
In the Airflow UI, go to Admin -> Connections and click +.
Create a new connection named postgres_default using the following information:
- Connection Id: postgres_default.
- Connection Type: Postgres.
- Host: <your postgres host>.
- Login: <your postgres username>.
- Password: <your postgres password>.
- Port: <your postgres port>.
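As an alternative to the UI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a connection URI. The sketch below builds such a URI from the same fields. The helper name build_postgres_conn_uri and the sample credentials are made up for illustration, and the database name postgres is an assumption.

```python
from urllib.parse import quote


def build_postgres_conn_uri(
    host: str, login: str, password: str, port: int, database: str
) -> str:
    """Build an Airflow-style connection URI (hypothetical helper)."""
    # Percent-encode credentials so characters such as '@' or '/' stay unambiguous.
    safe_login = quote(login, safe="")
    safe_password = quote(password, safe="")
    return f"postgres://{safe_login}:{safe_password}@{host}:{port}/{database}"


# Placeholder values only; substitute your own host, username, password, and port.
uri = build_postgres_conn_uri("localhost", "postgres", "p@ss/word", 5432, "postgres")
print(f"AIRFLOW_CONN_POSTGRES_DEFAULT={uri}")
```

The printed line could be placed in the Astro project's .env file. Connections defined this way are available to tasks but are not listed in the Airflow UI.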
Step 4: Create a DAG
Create a new file in your
Copy and paste the following DAG code into the file:
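from pendulum import datetime
from airflow.decorators import dag
from airflow.providers.postgres.operators.postgres import PostgresOperator
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

POSTGRES_CONN_ID = "postgres_default"
# Assumed values: task_ids, DAG name, schedule/catchup, column types, GX path, return_json_dict.
@dag(start_date=datetime(2023, 7, 1), schedule=None, catchup=False)
def gx_tutorial():
    create_table_pg = PostgresOperator(  # create and fill the table to validate
        task_id="create_table_pg", postgres_conn_id=POSTGRES_CONN_ID,
        sql="""
            CREATE TABLE strawberries (
                id VARCHAR(10) PRIMARY KEY,
                name VARCHAR(100), amount INT);
            INSERT INTO strawberries (id, name, amount)
            VALUES ('001', 'Strawberry Order 1', 10),
                ('002', 'Strawberry Order 2', 5),
                ('003', 'Strawberry Order 3', 8),
                ('004', 'Strawberry Order 4', 3),
                ('005', 'Strawberry Order 5', 12);
            """,
    )
    gx_validate_pg = GreatExpectationsOperator(  # run the suite against the table
        task_id="gx_validate_pg", conn_id=POSTGRES_CONN_ID,
        data_context_root_dir="include/great_expectations",
        data_asset_name="strawberries",
        expectation_suite_name="strawberry_suite", return_json_dict=True,
    )
    drop_table_pg = PostgresOperator(  # clean up after validation
        task_id="drop_table_pg", postgres_conn_id=POSTGRES_CONN_ID,
        sql="DROP TABLE strawberries;",
    )
    create_table_pg >> gx_validate_pg >> drop_table_pg

gx_tutorial()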
This DAG will create a table in your Postgres database, run a GX validation on the table, and then drop the table.
The data in the table is validated using the GreatExpectationsOperator. The operator will automatically create a default Checkpoint and Datasource based on the postgres_default connection and run the Expectations defined in the strawberry_suite.json file on the strawberries table. Note that for some databases you might need to provide the schema name to the data_asset_name parameter in the form of
Open Airflow at http://localhost:8080/. Run the DAG manually by clicking the play button.
By default, the GreatExpectationsOperator pushes a CheckpointResult object to XCom. You can instead return a JSON-serializable dictionary by setting the return_json_dict parameter to True.
If you do not want to use this built-in serialization, you can either enable XCom pickling by setting the environment variable AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True, or use a custom serialization method in a custom XCom backend.
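To see why the return type matters, note that Airflow's default XCom backend serializes values as JSON. The standalone sketch below uses a stand-in class, FakeCheckpointResult, which is hypothetical and unrelated to the real GX class. It shows that an arbitrary Python object cannot be passed to json.dumps, whereas a plain dictionary can.

```python
import json


class FakeCheckpointResult:
    """Hypothetical stand-in for a rich result object; not the real GX class."""

    def __init__(self, success: bool):
        self.success = success

    def to_json_dict(self) -> dict:
        # Reduce the object to plain, JSON-friendly types.
        return {"success": self.success}


result = FakeCheckpointResult(success=True)

try:
    json.dumps(result)  # arbitrary objects are not JSON-serializable
    object_serializable = True
except TypeError:
    object_serializable = False

dict_payload = json.dumps(result.to_json_dict())  # a plain dict works
print(object_serializable, dict_payload)
```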
How it works
The GreatExpectationsOperator is a versatile operator allowing you to integrate GX into your Airflow environment. This tutorial shows the simplest way of using the operator by letting it create a default Checkpoint and Datasource based on an Airflow connection.
The GreatExpectationsOperator also allows you to pass in a CheckpointConfig object using the checkpoint_config parameter or a checkpoint_kwargs dictionary. You can also customize the Execution Engines and pass DataContextConfig objects by configuring Operator parameters.
For more examples, check out the Get Improved Data Quality Checks in Airflow with the Updated Great Expectations Operator blog post and the Airflow data quality demo repository.
The GreatExpectationsOperator is highly customizable so that expert GX users can use their custom objects. This section explains some of the most commonly used parameters. Refer to the GX documentation for in-depth explanations of GX concepts.
When using the GreatExpectationsOperator, you must pass in one of the following parameters:
- data_context_root_dir (str): The path to your GX project directory. This is the directory containing your
- data_context_config (DataContextConfig): To use an in-memory Data Context, define a DataContextConfig and pass it to the operator. See also this example DataContextConfig.
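The requirement to pass one of these two parameters can be pictured as a small validation step. This is an illustrative sketch only, not the provider's code. The function name check_data_context_args is hypothetical, and it assumes that passing both parameters at once is rejected as well.

```python
from typing import Any, Optional


def check_data_context_args(
    data_context_root_dir: Optional[str] = None,
    data_context_config: Optional[Any] = None,
) -> str:
    """Return which Data Context source is in use, or raise if ambiguous."""
    has_dir = data_context_root_dir is not None
    has_config = data_context_config is not None
    if has_dir == has_config:  # neither or both were supplied
        raise ValueError(
            "Pass exactly one of data_context_root_dir or data_context_config."
        )
    return "file-based" if has_dir else "in-memory"


print(check_data_context_args(data_context_root_dir="include/great_expectations"))
```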
Next, the operator will determine the Checkpoint and Datasource used to run your GX validations:
- If your Data Context does not specify a Datasource, and you do not pass in a Checkpoint, then the operator will build a Datasource and Checkpoint for you based on the conn_id you pass in and run the given Expectation Suite. This is the simplest way to use the operator and what is shown in this tutorial.
- If your project's Data Context specifies Datasources already, all you need to pass is the Data Context and the Expectation Suite. The operator will use the Datasources in the Data Context to create a default Checkpoint.
- If your project's CheckpointStore already contains a Checkpoint, you can specify its use by passing the name to the checkpoint_name parameter. The CheckpointStore is often in the great_expectations/checkpoints/ path, so that checkpoint_name = "strawberries.pass.chk" would reference the file
- If you want to run a custom Checkpoint, you can either pass a CheckpointConfig object to checkpoint_config or a dictionary of your checkpoint config to checkpoint_kwargs. checkpoint_kwargs can also be used to specify additional overwriting configurations. See the example CheckpointConfig.
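The cases above amount to a precedence order. The sketch below encodes one plausible reading of it in plain Python. The function describe_checkpoint_source, its context_has_datasource flag, and its return strings are hypothetical, and the real operator may resolve these options differently.

```python
from typing import Optional


def describe_checkpoint_source(
    checkpoint_name: Optional[str] = None,
    checkpoint_config: Optional[dict] = None,
    checkpoint_kwargs: Optional[dict] = None,
    context_has_datasource: bool = False,
    conn_id: Optional[str] = None,
) -> str:
    """Describe which Checkpoint/Datasource path the listed rules select."""
    if checkpoint_name:
        return f"existing Checkpoint '{checkpoint_name}' from the CheckpointStore"
    if checkpoint_config or checkpoint_kwargs:
        return "custom Checkpoint from the supplied configuration"
    if context_has_datasource:
        return "default Checkpoint using the Data Context's Datasource"
    if conn_id:
        return f"default Checkpoint and Datasource built from '{conn_id}'"
    raise ValueError("No Checkpoint, Datasource, or conn_id to work with.")


print(describe_checkpoint_source(conn_id="postgres_default"))
```

Checking the explicit options first reflects the idea that anything you pass to the operator should win over defaults it would otherwise build.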
The Datasource can also be a pandas DataFrame, as shown in Running GX validations on pandas DataFrames. Depending on how you define your Datasource, the data_asset_name parameter has to be adjusted:
- If a pandas DataFrame is passed, the data_asset_name parameter can be any name that will help you identify the DataFrame.
- If a conn_id is supplied, the data_asset_name must be the name of the table the Expectation Suite runs on.
By default, a Great Expectations task runs validations and raises an AirflowException if any of the tests fail. To override this behavior and continue running the pipeline even if tests fail, set the fail_task_on_validation_failure flag to False.
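The effect of this flag can be sketched without Airflow. In this illustration, ValidationFailed stands in for AirflowException and handle_validation_result is a hypothetical helper. Neither is part of the provider.

```python
class ValidationFailed(Exception):
    """Stand-in for AirflowException in this standalone sketch."""


def handle_validation_result(
    success: bool, fail_task_on_validation_failure: bool = True
) -> str:
    """Raise on failed validations unless the flag is switched off."""
    if not success and fail_task_on_validation_failure:
        raise ValidationFailed("Validation with Great Expectations failed.")
    return "passed" if success else "failed, but the pipeline continues"


print(handle_validation_result(success=False, fail_task_on_validation_failure=False))
```

With the flag switched off, downstream tasks still run, so a later task would need to inspect the validation result if failures should not go unnoticed.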
In Astronomer or any environment with OpenLineage configured, the GreatExpectationsOperator will automatically add the OpenLineage action to its default action list when a Checkpoint is not specified to the operator. To turn off this feature, set the use_open_lineage parameter to False.
Running GX validations on pandas DataFrames
The GreatExpectationsOperator can also be used to run validations on CSV files by passing them in as a pandas DataFrame. This pattern is useful to test pipelines locally with small amounts of data. Note that the execution_engine parameter needs to be adjusted.
gx_validate_pg = GreatExpectationsOperator(
Connections and backends
The GreatExpectationsOperator can run a Checkpoint on a dataset stored in any backend compatible with GX. This includes BigQuery, MSSQL, MySQL, PostgreSQL, Redshift, Snowflake, SQLite, and Athena, among others. To point the GreatExpectationsOperator at an external dataset, set up an Airflow Connection to the Datasource and set the conn_id parameter. Connections will still work if you have your connection configured in both Airflow and Great Expectations, as long as the correct Datasource is specified in the Checkpoint passed to the operator.