Integrate data lineage from external systems to Astro
Data lineage is the concept of tracking data from its origin to wherever it is consumed downstream as it flows through a data pipeline. This includes connections between datasets and tables in a database as well as rich metadata about the tasks that create and transform data. You can observe data lineage to:
- Trace the history of a dataset.
- Troubleshoot run failures.
- Manage personally identifiable information (PII).
- Ensure compliance with data regulations.
This guide provides information about how lineage data is automatically extracted from Apache Airflow tasks on Astro and how to integrate external systems, including Databricks and dbt, that require additional configuration. To learn about how to view data lineage on Astro, see View data lineage.
Data lineage on Astro
To view lineage data, it first needs to be extracted from an external application and then stored in a lineage backend. Astro uses the OpenLineage Airflow library (`openlineage-airflow`) to extract lineage from Airflow tasks and stores that data in the Astro control plane. The latest version of the OpenLineage Airflow library is installed on Astro Runtime by default.
There are two ways to emit lineage data to Astro:
- Run a task on Astro with a supported Airflow operator, such as the SnowflakeOperator. These operators include extractors that automatically emit lineage data and don’t require additional configuration. See Supported Airflow operators.
- Integrate OpenLineage with an external service, such as dbt or Apache Spark, to emit data lineage outside of an Airflow DAG or task using an OpenLineage API key.
The data lineage graph in the Cloud UI shows lineage data that is emitted with both methods, including jobs that are not run on the Astro data plane. This graph can provide context to your data before, during, and after it reaches your Deployment.
Extract lineage data from external systems to Astro
When you integrate an external data lineage system with Astro, or you are working with Astro locally and are not using a supported Airflow operator, you need to provide a Deployment namespace, your Organization's OpenLineage URL, and your Organization's OpenLineage API key. This information is used to send OpenLineage data to the correct place in Astro.
To locate your Deployment namespace in the Cloud UI, select a Workspace and then copy the value with the format `<text>-<text>-<four-digit-number>` next to the Deployment name. To locate your Organization's OpenLineage URL and OpenLineage API key, go to `https://cloud.<your-astro-base-domain>.io/settings` and copy the values in the Lineage API Key and OpenLineage URL fields.
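If you emit lineage from a system that isn't covered by one of the integrations below, you can also send events to this endpoint directly with the OpenLineage Python client. The following is a minimal sketch rather than an Astro-specific requirement; the job name and run ID are illustrative placeholders, and class names can vary between `openlineage-python` versions:

```python
# Minimal sketch: emit a single OpenLineage run event to the Astro lineage backend.
# The URL, API key, and namespace are the values described above; the job name
# and run ID are placeholders.
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient, OpenLineageClientOptions
from openlineage.client.run import Job, Run, RunEvent, RunState

client = OpenLineageClient(
    url="<your-openlineage-url>",
    options=OpenLineageClientOptions(api_key="<your-openlineage-api-key>"),
)

client.emit(
    RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(runId=str(uuid4())),
        job=Job(namespace="<deployment-namespace>", name="external_system.example_job"),
        producer="https://github.com/OpenLineage/OpenLineage/tree/main/client/python",
    )
)
```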
Snowflake and OpenLineage with Airflow
Lineage data emitted from Snowflake is similar to what is collected from other SQL databases, including Amazon Redshift and Google BigQuery. However, Snowflake is unique in that it emits query tags that provide additional task execution details.
When you run a task in Airflow that interacts with Snowflake, the query tag allows each task to be directly matched with the Snowflake query or queries that are run by that task. If the task fails, for example, you can look up the Snowflake query that was executed by that task and reduce the time required to troubleshoot the task failure.
To emit lineage data from Snowflake:
- Add a Snowflake connection to Airflow. See Snowflake connection.
- Run an Airflow DAG or task with the `SnowflakeOperator` or `SnowflakeOperatorAsync`. These operators are officially supported by OpenLineage and don't require additional configuration. If you don't run Airflow on Astro, see Extract lineage data from external systems to Astro.
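For example, the following DAG sketch runs a single query with the `SnowflakeOperator`. It assumes an Airflow connection named `snowflake_default` and a table that your Snowflake role can query; both are placeholders to adjust for your environment.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

with DAG(
    dag_id="snowflake_lineage_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
):
    # Lineage for this task is extracted automatically by the SnowflakeOperator
    # extractor; no lineage-specific configuration is required on Astro.
    count_orders = SnowflakeOperator(
        task_id="count_orders",
        snowflake_conn_id="snowflake_default",
        sql="SELECT COUNT(*) FROM <your_database>.<your_schema>.<your_table>;",
    )
```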
Data collected
When you run an Airflow task with the `SnowflakeOperator`, the following data is collected:
- Task duration
- SQL queries. For a list of supported queries, see the OpenLineage tests repository.
- Query duration. This is different from the Airflow task duration.
- Input datasets
- Output datasets
- Quality metrics based on dataset and column-level checks, including successes and failures per run
To view this data in the Cloud UI, click Lineage, select a SnowflakeOperator task, and then click the dataset. See View data lineage.
Airflow tasks run with the `SnowflakeOperator` emit SQL source code that you can view in the Cloud UI. See View SQL source code.
OpenLineage and Databricks with Airflow
Use the information provided here to set up lineage collection for Spark running on a Databricks cluster.
Prerequisites
- A Databricks cluster.
- Your Astro base domain.
- Your Organization's OpenLineage API key.
Setup
1. In your Databricks File System (DBFS), create a new directory at `dbfs:/databricks/openlineage/`.
2. Download the latest OpenLineage `jar` file to the new directory. See Maven Central Repository.
3. Download the `open-lineage-init-script.sh` file to the new directory. See OpenLineage GitHub.
4. In Databricks, create a cluster-scoped init script that points to `dbfs:/databricks/openlineage/open-lineage-init-script.sh` so that the `openlineage-spark` library is installed at cluster initialization.
5. In the cluster configuration page for your Databricks cluster, specify the following Spark configuration:
```text
spark.driver.extraJavaOptions -Djava.security.properties=
spark.executor.extraJavaOptions -Djava.security.properties=
spark.openlineage.url https://<your-astro-base-domain>
spark.openlineage.apiKey <your-lineage-api-key>
spark.openlineage.namespace <NAMESPACE_NAME> // Astronomer recommends using a meaningful namespace like `spark-dev` or `spark-prod`.
```
Note: You override the JVM security properties for the Spark driver and executor with an empty string because some TLS algorithms are disabled by default. For more information, see this discussion.
After you save this configuration, lineage is enabled for all Spark jobs running on your cluster.
Verify Setup
To test that lineage was configured correctly on your Databricks cluster, run a test Spark job on Databricks. After your job runs, click Lineage in the Cloud UI and then click Runs in the left menu. If your configuration is successful, your Spark job appears in the table of most recent runs. Click a job run to see it within a lineage graph.
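If you don't have a job handy, a simple read-and-write job like the following sketch is enough to produce input and output datasets on the lineage graph. The paths are placeholders for data in your own workspace.

```python
# Minimal smoke-test job: read one dataset and write another so that the
# OpenLineageSparkListener has inputs and outputs to report.
# Replace the placeholder paths with locations in your own workspace.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("openlineage-smoke-test").getOrCreate()

orders = spark.read.option("header", "true").csv("dbfs:/<input-path>/orders.csv")
orders.write.mode("overwrite").parquet("dbfs:/<output-path>/orders_parquet/")
```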
OpenLineage and dbt Core with Airflow
Use the information provided here to set up lineage collection for dbt Core tasks. To learn how to create and productionize dbt tasks in Airflow, and how to automatically create dbt Core tasks based on a manifest, see Orchestrate dbt with Airflow.
If your organization wants to orchestrate dbt Cloud jobs with Airflow, contact Astronomer support.
Prerequisites
- A dbt project.
- The dbt CLI v0.20+.
Setup
1. Add the following line to the `requirements.txt` file of your Astro project: `openlineage-dbt`
2. Run the following command to generate the `catalog.json` file for your dbt project: `dbt docs generate`
3. In your dbt project, run the OpenLineage wrapper script in place of the `dbt run` command: `dbt-ol run`
4. Optional. Run the following command to test your setup: `dbt-ol test`
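If you orchestrate dbt from Airflow on Astro, you can run the same wrapper from a task. The following sketch uses a `BashOperator` and assumes the dbt project is stored at `include/dbt` in your Astro project; the path and DAG names are illustrative only.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dbt_lineage_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
):
    # `dbt-ol run` wraps `dbt run` and emits OpenLineage events for each model.
    # The project directory below is an assumed location; adjust it to match
    # where your dbt project lives in the Astro project image.
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="dbt-ol run --project-dir /usr/local/airflow/include/dbt",
    )
```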
Verify setup
To confirm that your setup is successful, run a dbt model in your project. After you run this model, click Lineage in the Cloud UI and then click Runs in the left menu. If the setup is successful, the run that you triggered appears in the table of most recent runs.
OpenLineage and Great Expectations with Airflow
Use the information provided here to set up lineage collection for a running Great Expectations suite.
Prerequisites
- A Great Expectations Data Context
- If using a Checkpoint or Checkpoint config, your Astro base domain and OpenLineage API key.
Setup
1. Make your Data Context accessible to your DAGs. For most use cases, Astronomer recommends adding the Data Context to your Astro project `include` folder. The `GreatExpectationsOperator` will access `include/great_expectations/great_expectations.yml` and use the configuration to run your Expectations. Then, add the following lines to your DAGs:

   ```python
   # Required imports for Great Expectations
   import os
   from pathlib import Path

   from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

   # Set base path for Data Context
   base_path = Path(__file__).parents[2]

   ...

   # Example task using GreatExpectationsOperator
   ge_task = GreatExpectationsOperator(
       task_id="ge_task",
       # Set directory for the Data Context
       ge_root_dir=os.path.join(base_path, "include", "great_expectations"),
       ...
   )
   ```

   If you use the `GreatExpectationsOperator` version 0.2.0 or later and don't provide a Checkpoint file or Checkpoint Config, you can skip steps 2 and 3.

2. In each of your Checkpoint files, add `OpenLineageValidationAction` to your `action_list` like in the following example:

   ```yaml
   name: my.simple.chk
   config_version: 1.0
   template_name:
   module_name: great_expectations.checkpoint
   class_name: Checkpoint
   run_name_template:
   expectation_suite_name:
   batch_request: {}
   action_list:
     - name: open_lineage
       action:
         class_name: OpenLineageValidationAction
         module_name: openlineage.common.provider.great_expectations
         openlineage_host: https://astro-<your-astro-base-domain>.datakin.com
         openlineage_apiKey: <your-openlineage-api-key>
         openlineage_namespace: <namespace-name> # Replace with your job namespace; Astronomer recommends using a meaningful namespace such as `dev` or `prod`.
         job_name: validate_task_name
   ```

3. Deploy your changes to Astro. See Deploy code.
Verify
To confirm that your setup is successful, click Lineage in the Cloud UI and then click Issues in the left menu. Recent data quality assertion issues appear in the All Issues table.
If your code hasn't produced any data quality assertion issues, use the search bar to search for a dataset and view its node on the lineage graph for a recent job run. Click Quality to view metrics and assertion pass or fail counts.
OpenLineage and Spark
Use the information provided here to set up lineage collection for Spark.
Prerequisites
- A Spark application.
- A Spark job.
- Your Astro base domain.
- Your Organization's OpenLineage API key.
Setup
In your Spark application, set the following properties to configure your lineage endpoint, install the `openlineage-spark` library, and configure an `OpenLineageSparkListener`:
```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config('spark.jars.packages', 'io.openlineage:openlineage-spark:0.2.+')
    .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
    .config('spark.openlineage.host', 'https://astro-<your-astro-base-domain>.datakin.com')
    .config('spark.openlineage.apiKey', '<your-openlineage-api-key>')
    # Replace <namespace-name> with the name of your Spark cluster.
    # Astronomer recommends using a meaningful namespace such as `spark-dev` or `spark-prod`.
    .config('spark.openlineage.namespace', '<namespace-name>')
    .getOrCreate()
)
```
Verify
To confirm that your setup is successful, run a Spark job after you save your configuration. After the job runs, click Lineage in the Cloud UI and then click Runs in the left menu. Your recent Spark job run appears in the table of most recent runs.
View SQL source code
The SQL source code view for supported Airflow operators in the Cloud UI Lineage page is off by default for all Workspace users. To enable the source code view, set the following environment variable for each Astro Deployment:
- Key: `OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE`
- Value: `False`
Astronomer recommends enabling this feature only for Deployments with non-sensitive code. For more information about Workspace permissions, see Workspace roles.