Send lineage metadata to Astro
Data lineage is the concept of tracking data from its origin to wherever it is consumed downstream as it flows through a data pipeline. This includes connections between datasets and tables in a database as well as rich metadata about the tasks that create and transform data. You can observe data lineage to:
- Trace the history of a dataset.
- Troubleshoot run failures.
- Manage personally identifiable information (PII).
- Ensure compliance with data regulations.
This guide provides information about how lineage metadata is automatically extracted from Apache Airflow tasks on Astro and how to integrate external systems, including Databricks and dbt, that require additional configuration. To learn about how to view data lineage on Astro, see View data lineage.
Extract lineage metadata from Airflow operators using supported extractors
Astro uses the OpenLineage Airflow library (openlineage-airflow) to extract lineage from Airflow tasks and stores that data in the Astro control plane. This package includes default extractors for popular Airflow operators.
The latest version of the OpenLineage Airflow library is installed on Astro Runtime by default, so you can use all default extractors without additional configuration. If your DAG uses an Airflow operator that has a default extractor, the operator automatically emits lineage metadata, which appears on the Lineage page in the Cloud UI.
Each operator generates different lineage metadata based on its default extractor. For more information about which operators have default extractors and what lineage metadata they generate, see the OpenLineage documentation.
Extract lineage metadata from Airflow operators using custom extractors
If you want to extract lineage metadata from an Airflow operator that doesn't have a default extractor, you can write a custom extractor and add it to your Astro project.
To write a custom extractor, see the OpenLineage documentation. To add a custom extractor to an Astro Deployment:
Add your custom extractor files to the include folder of your local Astro project.
Deploy your project. See Deploy code.
Set the following environment variable in your Astro Deployment:
Specify the path to your extractor class relative to the base of your Astro project directory (for example, include/myExtractorClass). To import multiple custom extractors, separate their paths with semicolons. If you import only one custom extractor, do not include a semicolon after the file path.
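The semicolon rule is easy to get wrong when you maintain several extractors. The following is a hypothetical helper, not part of Astro or OpenLineage, that builds the variable value for you. It assumes the variable is OPENLINEAGE_EXTRACTORS (the name the openlineage-airflow library reads; confirm it in the OpenLineage documentation for your version) and that multiple paths are joined with semicolons and carry no trailing semicolon.

```python
from __future__ import annotations


def build_extractors_value(extractor_paths: list[str]) -> str:
    """Join custom extractor paths into a single environment variable value.

    Hypothetical helper: paths are separated by semicolons and no trailing
    semicolon is added, so a single path is returned unchanged.
    """
    # Drop surrounding whitespace and any stray semicolons a user typed.
    cleaned = [path.strip().strip(";") for path in extractor_paths]
    if not cleaned or any(not path for path in cleaned):
        raise ValueError("extractor paths must be non-empty")
    return ";".join(cleaned)


if __name__ == "__main__":
    # One extractor: the value is just the path, with no semicolon.
    print("OPENLINEAGE_EXTRACTORS=" + build_extractors_value(["include/myExtractorClass"]))
```

Copy the printed value into the Deployment's environment variables. With two hypothetical extractors, build_extractors_value(["include/a", "include/b"]) yields include/a;include/b.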
Extract lineage metadata from Airflow operators using custom inlets and outlets
An alternative to writing a custom extractor is to specify dataset inlets and outlets directly in your task parameters. These inlets and outlets appear as dependency lines on the lineage graph for your DAG. This option is suitable if your priority is rendering an accurate lineage graph of your DAG, and you don't need to generate specific facets from your operators.
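To picture how declared inlets and outlets become dependency lines, the following toy model draws an edge from every inlet dataset to its task and from the task to every outlet dataset. It is plain Python for illustration only, not Airflow, OpenLineage, or Astro code. The TaskLineage class, the task IDs, and the dataset names are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class TaskLineage:
    """Hypothetical stand-in for a task with declared inlets and outlets."""

    task_id: str
    inlets: list[str] = field(default_factory=list)
    outlets: list[str] = field(default_factory=list)


def lineage_edges(tasks: list[TaskLineage]) -> list[tuple[str, str]]:
    """Return (source, target) edges: inlet -> task and task -> outlet."""
    edges: list[tuple[str, str]] = []
    for task in tasks:
        edges.extend((dataset, task.task_id) for dataset in task.inlets)
        edges.extend((task.task_id, dataset) for dataset in task.outlets)
    return edges


if __name__ == "__main__":
    tasks = [
        TaskLineage("extract", inlets=["raw.orders"], outlets=["staging.orders"]),
        TaskLineage("transform", inlets=["staging.orders"], outlets=["mart.daily_orders"]),
    ]
    for source, target in lineage_edges(tasks):
        print(f"{source} -> {target}")
```

Because staging.orders is an outlet of one task and an inlet of the next, the two tasks connect through that dataset, which is the kind of dependency line the lineage graph shows.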
To specify inlets and outlets, see the OpenLineage documentation and Apache Airflow documentation. Note that OpenLineage only supports specifying inlets and outlets using
Extract lineage metadata from external systems to Astro
To send lineage metadata from an external system to Astro, you need to configure the external system's OpenLineage integration with a Deployment namespace, your Organization's OpenLineage URL, and your Organization's OpenLineage API key. This information is used to send OpenLineage data to your Astro lineage backend.
To locate your Deployment namespace in the Cloud UI, open the Deployment and copy the value in Namespace. To locate your Organization's OpenLineage URL and OpenLineage API key, go to https://cloud.<your-astro-base-domain>.io/settings and copy the values in the Lineage API Key and OpenLineage URL fields.
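Most OpenLineage integrations send events for you, but it can help to see where the three values end up. The following standard-library sketch builds a minimal OpenLineage RunEvent and prepares, without sending, an HTTP request for it. It assumes the common OpenLineage HTTP convention of POSTing JSON to <OpenLineage URL>/api/v1/lineage with the API key as a bearer token; confirm the endpoint, authentication scheme, and spec version against the OpenLineage documentation. The job name, producer URI, and example URL are hypothetical.

```python
from __future__ import annotations

import json
import urllib.request
import uuid
from datetime import datetime, timezone

# Assumed spec version for schemaURL; match the version your OpenLineage tooling targets.
SCHEMA_URL = "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"


def build_run_event(namespace: str, job_name: str, event_type: str = "START",
                    run_id: str | None = None) -> dict:
    """Build a minimal OpenLineage RunEvent for one job run."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id or str(uuid.uuid4())},
        # The Deployment namespace groups this job with others in the lineage graph.
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [],
        "outputs": [],
        # Hypothetical producer URI identifying the code that emitted the event.
        "producer": "https://example.com/my-lineage-producer",
        "schemaURL": SCHEMA_URL,
    }


def build_lineage_request(openlineage_url: str, api_key: str,
                          event: dict) -> urllib.request.Request:
    """Prepare (but do not send) a POST of the event to the lineage endpoint."""
    endpoint = openlineage_url.rstrip("/") + "/api/v1/lineage"  # assumed endpoint path
    return urllib.request.Request(
        endpoint,
        data=json.dumps(event).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",  # assumed authentication scheme
        },
        method="POST",
    )


if __name__ == "__main__":
    # Placeholder values: substitute your Deployment namespace, OpenLineage URL, and API key.
    event = build_run_event("<deployment-namespace>", "my_external_job")
    request = build_lineage_request("https://lineage.example.com", "<api-key>", event)
    print(request.get_method(), request.full_url)
    # Sending is left to the caller, for example: urllib.request.urlopen(request)
```

Keeping event construction separate from the request makes it possible to inspect the payload before anything leaves your system.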
Use the following topics to configure these values in supported external systems and send lineage metadata from those systems to Astro.
Snowflake and OpenLineage with Airflow
Lineage data emitted from Snowflake is similar to what is collected from other SQL databases, including Amazon Redshift and Google BigQuery. However, Snowflake is unique in that it emits query tags that provide additional task execution details.
When you run a task in Airflow that interacts with Snowflake, the query tag allows each task to be directly matched with the Snowflake query or queries that are run by that task. If the task fails, for example, you can look up the Snowflake query that was executed by that task and reduce the time required to troubleshoot the task failure.
To emit lineage metadata from Snowflake:
- Add a Snowflake connection to Airflow. See Snowflake connection.
- Run an Airflow DAG or task with the SnowflakeOperatorAsync. This operator is officially supported by OpenLineage and does not require additional configuration. If you don't run Airflow on Astro, see Extract lineage metadata from external systems to Astro.
When you run an Airflow task with the SnowflakeOperator, the following data is collected:
- Task duration
- SQL queries. For a list of supported queries, see the OpenLineage
- Query duration, which differs from the Airflow task duration
- Input datasets
- Output datasets
- Quality metrics based on dataset and column-level checks, including successes and failures per run
To view this data in the Cloud UI, click Lineage, select a SnowflakeOperator task, and then click the dataset. See View data lineage.
Airflow tasks run with the SnowflakeOperator emit SQL source code that you can view in the Cloud UI. See View SQL source code.
OpenLineage and Databricks with Airflow
Use the information provided here to set up lineage collection for Spark running on a Databricks cluster.
In your Databricks File System (DBFS), create a new directory at
Download the latest OpenLineage jar file to the new directory. See Maven Central Repository.
Download the open-lineage-init-script.sh file to the new directory. See OpenLineage GitHub.
In Databricks, run this command to create a cluster-scoped init script and install the openlineage-spark library at cluster initialization:
In the cluster configuration page for your Databricks cluster, specify the following Spark configuration:
spark.openlineage.namespace <NAMESPACE_NAME> // Astronomer recommends using a meaningful namespace like `spark-dev` or `spark-prod`.
Note: This configuration overrides the JVM security properties for the Spark driver and executor with an empty string because some TLS algorithms are disabled by default. For more information, see this discussion.
After you save this configuration, lineage is enabled for all Spark jobs running on your cluster.
To test that lineage was configured correctly on your Databricks cluster, run a test Spark job on Databricks. After your job runs, click Lineage in the Cloud UI and then click Runs in the left menu. If your configuration is successful, your Spark job appears in the table of most recent runs. Click a job run to see it within a lineage graph.
OpenLineage and dbt Core with Airflow
Use the information provided here to set up lineage collection for dbt Core tasks. To learn how to create and productionize dbt tasks in Airflow, and how to automatically create dbt Core tasks based on a manifest, see Orchestrate dbt with Airflow.
If your organization wants to orchestrate dbt Cloud jobs with Airflow, contact Astronomer support.
Prerequisites:
- A dbt project.
- The dbt CLI v0.20+.
Add the following line to the requirements.txt file of your Astro project:
Run the following command to generate the catalog.json file for your dbt project:
$ dbt docs generate
In your dbt project, run the OpenLineage wrapper script:
$ dbt-ol run
Optional. Run the following command to test your setup:
$ dbt-ol test
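The dbt-ol wrapper reads its connection settings from environment variables. As a sketch, assuming the variable names used by the openlineage-dbt package and its OpenLineage client (OPENLINEAGE_URL, OPENLINEAGE_API_KEY, and OPENLINEAGE_NAMESPACE; verify them against the OpenLineage dbt documentation for your version), the following standard-library Python prepares the command and environment for a dbt-ol run without executing it. The function name and placeholder values are hypothetical.

```python
from __future__ import annotations

import os


def dbt_ol_invocation(openlineage_url: str, api_key: str, namespace: str,
                      dbt_args: list[str] | None = None,
                      base_env: dict[str, str] | None = None) -> tuple[list[str], dict[str, str]]:
    """Return the dbt-ol command and an environment carrying the lineage settings."""
    # Start from the current environment (or a caller-supplied one) so dbt still finds its profile.
    env = dict(os.environ if base_env is None else base_env)
    env.update({
        "OPENLINEAGE_URL": openlineage_url,   # your Organization's OpenLineage URL
        "OPENLINEAGE_API_KEY": api_key,       # your Organization's OpenLineage API key
        "OPENLINEAGE_NAMESPACE": namespace,   # your Deployment namespace
    })
    # dbt-ol wraps the dbt subcommand you pass, "run" by default.
    command = ["dbt-ol", *(dbt_args or ["run"])]
    return command, env


if __name__ == "__main__":
    command, env = dbt_ol_invocation("https://lineage.example.com", "<api-key>", "<namespace>")
    print(" ".join(command))
    # To execute from the dbt project directory: subprocess.run(command, env=env, check=True)
```

Passing dbt_args=["test"] produces the optional test command shown above.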
To confirm that your setup is successful, run a dbt model in your project. After you run this model, click Lineage in the Cloud UI and then click Runs in the left menu. If the setup is successful, the run that you triggered appears in the table of most recent runs.
OpenLineage and Great Expectations with Airflow
Use the information provided here to set up lineage collection for a Great Expectations project.
Prerequisites:
- A Great Expectations Data Context.
- If you use a Checkpoint or Checkpoint Config, your Astro base domain and OpenLineage API key.
Make your Data Context accessible to your DAGs. For most use cases, Astronomer recommends adding the Data Context to the include folder of your Astro project. The GreatExpectationsOperator accesses include/great_expectations/great_expectations.yml and uses that configuration to run your Expectations. Then, add the following lines to your DAGs:
# Required imports for Great Expectations
import os
from pathlib import Path

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

# Set base path for Data Context: the Astro project root, one level above the dags folder
base_path = Path(__file__).parents[1]

# Example task using GreatExpectationsOperator
ge_task = GreatExpectationsOperator(
    task_id="ge_task",  # hypothetical task ID
    # Set directory for the Data Context
    data_context_root_dir=os.path.join(base_path, "include", "great_expectations"),
    checkpoint_name="my_checkpoint",  # hypothetical Checkpoint name
)
If you use the GreatExpectationsOperator version 0.2.0 or later and don't provide a Checkpoint file or Checkpoint Config, you can skip steps 2 and 3.
In each of your Checkpoint files, add an open_lineage action to action_list, as in the following example:
- name: open_lineage
openlineage_namespace: <namespace-name> # Replace with your job namespace. Astronomer recommends using a meaningful namespace such as `dev` or `prod`.
Deploy your changes to Astro. See Deploy code.
To confirm that your setup is successful, click Lineage in the Cloud UI and then click Issues in the left menu. Recent data quality assertion issues appear in the All Issues table.
If your code hasn't produced any data quality assertion issues, use the search bar to search for a dataset and view its node on the lineage graph for a recent job run. Click Quality to view metrics and assertion pass or fail counts.
OpenLineage and Spark
Use the information provided here to set up lineage collection for Spark.
Prerequisites:
- A Spark application.
- A Spark job.
- Your Astro base domain.
- Your Organization's OpenLineage API key.
In your Spark application, set the following properties to configure your lineage endpoint, install the openlineage-spark library, and configure an OpenLineageSparkListener:
.config('spark.openlineage.namespace', '<namespace-name>') # Replace with the name of your Spark cluster. Astronomer recommends using a meaningful namespace such as `spark-dev` or `spark-prod`.
.getOrCreate()
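The builder calls above follow a simple pattern: each lineage property is one .config(key, value) call before .getOrCreate(). As a hedged sketch, the following collects the properties in a dictionary and applies them to any object with a config(key, value) method, such as pyspark.sql.SparkSession.builder. Only spark.openlineage.namespace comes from this guide; spark.extraListeners is the standard Spark property for registering listeners, and io.openlineage.spark.agent.OpenLineageSparkListener is assumed to be the listener's class name. Endpoint and API key property names vary between openlineage-spark releases, so pass them through extra after checking the OpenLineage Spark documentation. The helper names are hypothetical.

```python
from __future__ import annotations

from typing import Any, Mapping

LISTENER_CLASS = "io.openlineage.spark.agent.OpenLineageSparkListener"  # assumed class name


def openlineage_spark_conf(namespace: str,
                           extra: Mapping[str, str] | None = None) -> dict[str, str]:
    """Collect OpenLineage-related Spark properties in one place."""
    conf = {
        "spark.extraListeners": LISTENER_CLASS,
        # Astronomer recommends a meaningful namespace, for example spark-dev or spark-prod.
        "spark.openlineage.namespace": namespace,
    }
    # Endpoint and API key properties for your openlineage-spark version go here.
    conf.update(extra or {})
    return conf


def apply_conf(builder: Any, conf: Mapping[str, str]) -> Any:
    """Call builder.config(key, value) for every property and return the builder."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


# Usage with PySpark (requires pyspark and the openlineage-spark library):
# from pyspark.sql import SparkSession
# spark = apply_conf(SparkSession.builder, openlineage_spark_conf("spark-dev")).getOrCreate()
```

Keeping the properties in a dictionary lets you reuse one definition across development and production sessions while changing only the namespace.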
To confirm that your setup is successful, run a Spark job after you save your configuration. After the job runs, click Lineage in the Cloud UI and then click Runs in the left menu. Your recent Spark job run appears in the table of most recent runs.
View SQL source code
The SQL source code view for supported Airflow operators in the Cloud UI Lineage page is off by default for all Workspace users. To enable the source code view, set the following environment variable for each Astro Deployment:
Astronomer recommends enabling this feature only for Deployments with non-sensitive code. For more information about Workspace permissions, see Workspace roles.
Generate custom facets for OpenLineage events
OpenLineage facets are JSON objects that provide additional context about a given job run. By default, a job run for an Airflow task includes facets that show the source code for the task, whether the task run was successful, and who owns the task. All default facets for a job run appear as Standard Facets in the Info tab of your data pipeline's lineage graph.
You can configure both Airflow and external systems to generate custom facets that contain more specific information about job runs. Custom facets appear as Custom Facets in the Info tab of your data pipeline's lineage graph. To create a custom facet, see the OpenLineage documentation.
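In the OpenLineage specification, every facet carries _producer and _schemaURL fields alongside its own data, and a custom facet is stored under a name of your choosing in a facets map, for example on the run. The following sketch adds a custom run facet to an event represented as a plain dictionary. The facet name myCompany_runContext, its fields, and the URLs are hypothetical; to emit such a facet from Airflow or another system, use that system's OpenLineage integration as described in the OpenLineage documentation.

```python
from __future__ import annotations

import copy
import json
from typing import Any


def make_custom_facet(producer: str, schema_url: str, **fields: Any) -> dict[str, Any]:
    """Build a facet: the _producer and _schemaURL keys plus your custom fields."""
    return {"_producer": producer, "_schemaURL": schema_url, **fields}


def with_run_facet(event: dict[str, Any], name: str, facet: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of the event with the facet added under run.facets[name]."""
    updated = copy.deepcopy(event)  # leave the caller's event untouched
    updated.setdefault("run", {}).setdefault("facets", {})[name] = facet
    return updated


if __name__ == "__main__":
    event = {
        "eventType": "COMPLETE",
        "run": {"runId": "<run-id>"},
        "job": {"namespace": "<namespace>", "name": "my_job"},
    }
    facet = make_custom_facet(
        producer="https://example.com/my-producer",                     # hypothetical
        schema_url="https://example.com/schemas/RunContextFacet.json",  # hypothetical
        team="analytics",
        cost_center="1234",
    )
    print(json.dumps(with_run_facet(event, "myCompany_runContext", facet), indent=2))
```

Prefixing the facet name with your company or project name, as in this example, reduces the chance of colliding with standard facet names.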