Integrate data lineage from external systems to Astro

Data lineage is the concept of tracking data from its origin to wherever it is consumed downstream as it flows through a data pipeline. This includes connections between datasets and tables in a database as well as rich metadata about the tasks that create and transform data. You can observe data lineage to:

  • Trace the history of a dataset.
  • Troubleshoot run failures.
  • Manage personally identifiable information (PII).
  • Ensure compliance with data regulations.

This guide provides information about how lineage data is automatically extracted from Apache Airflow tasks on Astro and how to integrate external systems, including Databricks and dbt, that require additional configuration. To learn about how to view data lineage on Astro, see View data lineage.

Data lineage on Astro

To view lineage data, it first needs to be extracted from an external application and then stored in a lineage backend. Astro uses the OpenLineage Airflow library (openlineage-airflow) to extract lineage from Airflow tasks and stores that data in the Astro control plane. The latest version of the OpenLineage Airflow library is installed on Astro Runtime by default.

There are two ways to emit lineage data to Astro:

  • Run a task on Astro with a supported Airflow operator, such as the SnowflakeOperator. These operators include extractors that automatically emit lineage data and don’t require additional configuration. See Supported Airflow operators.
  • Integrate OpenLineage with an external service, such as dbt or Apache Spark, to emit data lineage outside of an Airflow DAG or task using an OpenLineage API key.

The data lineage graph in the Cloud UI shows lineage data that is emitted with both methods, including jobs that are not run on the Astro data plane. This graph can provide context to your data before, during, and after it reaches your Deployment.

Extract lineage data from external systems to Astro

When you integrate an external data lineage system with Astro, or when you work with Astro locally and are not using a supported Airflow operator, you need to provide a Deployment namespace, your Organization's OpenLineage URL, and your Organization's OpenLineage API key. This information is used to send OpenLineage data to the correct place in Astro.

To locate your Deployment namespace in the Cloud UI, select a Workspace and then copy the value with the format <text>-<text>-<four-digit-number> next to the Deployment name. To locate your Organization's OpenLineage URL and OpenLineage API key, go to https://cloud.<your-astro-base-domain>.io/settings and copy the values in the Lineage API Key and OpenLineage URL fields.
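
For example, if you emit lineage from a custom script or an external system that isn't covered below, these three values are what the OpenLineage client needs to reach Astro. The following is a hedged sketch using the openlineage-python client; the job name and producer URI are hypothetical placeholders, and the exact module paths and signatures can differ between openlineage-python releases:

    from datetime import datetime, timezone
    from uuid import uuid4

    from openlineage.client.client import OpenLineageClient, OpenLineageClientOptions
    from openlineage.client.run import Job, Run, RunEvent, RunState

    # Point the client at your Organization's OpenLineage URL and Lineage API Key.
    client = OpenLineageClient(
        url="https://<your-openlineage-url>",
        options=OpenLineageClientOptions(api_key="<your-lineage-api-key>"),
    )

    # Emit a START event for a hypothetical job, scoped to your Deployment namespace.
    client.emit(
        RunEvent(
            eventType=RunState.START,
            eventTime=datetime.now(timezone.utc).isoformat(),
            run=Run(runId=str(uuid4())),
            job=Job(namespace="<your-deployment-namespace>", name="example_job"),
            producer="https://github.com/example/lineage-producer",  # Placeholder producer URI
        )
    )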

Snowflake and OpenLineage with Airflow

Lineage data emitted from Snowflake is similar to what is collected from other SQL databases, including Amazon Redshift and Google BigQuery. However, Snowflake is unique in that it emits query tags that provide additional task execution details.

When you run a task in Airflow that interacts with Snowflake, the query tag allows each task to be directly matched with the Snowflake query or queries that are run by that task. If the task fails, for example, you can look up the Snowflake query that was executed by that task and reduce the time required to troubleshoot the task failure.

To emit lineage data from Snowflake:

  1. Add a Snowflake connection to Airflow. See Snowflake connection.
  2. Run an Airflow DAG or task with the SnowflakeOperator or SnowflakeOperatorAsync. This operator is officially supported by OpenLineage and does not require additional configuration. If you don't run Airflow on Astro, see Extract lineage data from external systems to Astro.
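
For reference, a DAG like the following emits lineage automatically once the Snowflake connection exists. This is a minimal sketch: the connection ID, schedule, and SQL statement are placeholder assumptions, not values from your project.

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

    with DAG(
        dag_id="example_snowflake_lineage",
        start_date=datetime(2023, 1, 1),
        schedule=None,
        catchup=False,
    ):
        # No OpenLineage-specific arguments are needed; the extractor runs automatically.
        load_orders = SnowflakeOperator(
            task_id="load_orders",
            snowflake_conn_id="snowflake_default",  # Assumed connection ID
            sql="INSERT INTO analytics.orders SELECT * FROM raw.orders;",  # Placeholder SQL
        )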

Data collected

When you run an Airflow task with the SnowflakeOperator, the following data is collected:

  • Task duration
  • SQL queries. For a list of supported queries, see the OpenLineage tests repository.
  • Query duration. This is different from the Airflow task duration.
  • Input datasets
  • Output datasets
  • Quality metrics based on dataset and column-level checks, including successes and failures per run

To view this data in the Cloud UI, click Lineage, select a SnowflakeOperator task, and then click the dataset. See View data lineage.

Tip: Airflow tasks run with the SnowflakeOperator emit SQL source code that you can view in the Cloud UI. See View SQL source code.

OpenLineage and Databricks with Airflow

Use the information provided here to set up lineage collection for Spark running on a Databricks cluster.

Prerequisites

Setup

  1. In your Databricks File System (DBFS), create a new directory at dbfs:/databricks/openlineage/.

  2. Download the latest OpenLineage jar file to the new directory. See Maven Central Repository.

  3. Download the open-lineage-init-script.sh file to the new directory. See OpenLineage GitHub.

  4. In Databricks, configure a cluster-scoped init script that installs the openlineage-spark library at cluster initialization. Use the path of the script you downloaded:

    dbfs:/databricks/openlineage/open-lineage-init-script.sh
  5. In the cluster configuration page for your Databricks cluster, specify the following Spark configuration:

    spark.driver.extraJavaOptions -Djava.security.properties=
    spark.executor.extraJavaOptions -Djava.security.properties=
    spark.openlineage.url https://<your-astro-base-domain>
    spark.openlineage.apiKey <your-lineage-api-key>
    spark.openlineage.namespace <NAMESPACE_NAME> // Astronomer recommends using a meaningful namespace like `spark-dev` or `spark-prod`.

Note: You override the JVM security properties for the Spark driver and executor with an empty string because some TLS algorithms are disabled by default. For more information, see this discussion.

After you save this configuration, lineage is enabled for all Spark jobs running on your cluster.

Verify setup

To test that lineage was configured correctly on your Databricks cluster, run a test Spark job on Databricks. After your job runs, click Lineage in the Cloud UI and then click Runs in the left menu. If your configuration is successful, your Spark job appears in the table of most recent runs. Click a job run to see it within a lineage graph.
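
For example, a minimal PySpark job like the following should produce a run that you can find on the Lineage page. This is a hedged sketch that assumes you run it in a Databricks notebook, where the spark session is already defined; the output path is a hypothetical placeholder:

    # Minimal job to generate a lineage event on the configured cluster.
    # The output path is a placeholder; any writable DBFS location works.
    df = spark.range(0, 100).withColumnRenamed("id", "order_id")
    df.write.mode("overwrite").format("delta").save("dbfs:/tmp/openlineage_smoke_test")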

OpenLineage and dbt Core with Airflow

Use the information provided here to set up lineage collection for dbt Core tasks. To learn how to create and productionize dbt tasks in Airflow, and how to automatically create dbt Core tasks based on a manifest, see Orchestrate dbt with Airflow.

If your organization wants to orchestrate dbt Cloud jobs with Airflow, contact Astronomer support.

Prerequisites

Setup

  1. Add the following line to the requirements.txt file of your Astro project:

     openlineage-dbt
  2. Run the following command to generate the catalog.json file for your dbt project:

    $ dbt docs generate
  3. In your dbt project, run the OpenLineage wrapper script in place of the dbt run command:

    $ dbt-ol run
  4. Optional. Run the following command to test your setup:

    $ dbt-ol test
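
If you orchestrate dbt Core from Airflow instead of running it from your terminal, one approach is to wrap the dbt-ol command in a BashOperator and pass the OpenLineage settings as environment variables. This is a hedged sketch: the project path, DAG settings, and placeholder values are assumptions, not values from your project.

    import os
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="example_dbt_ol",
        start_date=datetime(2023, 1, 1),
        schedule=None,
        catchup=False,
    ):
        dbt_run = BashOperator(
            task_id="dbt_run",
            # Assumed location of the dbt project inside the Astro project image.
            bash_command="dbt-ol run --project-dir /usr/local/airflow/include/dbt",
            env={
                **os.environ,  # Keep the existing environment (PATH, dbt profile variables, and so on)
                # dbt-ol reads these variables to know where to send lineage events.
                "OPENLINEAGE_URL": "<your-openlineage-url>",
                "OPENLINEAGE_API_KEY": "<your-lineage-api-key>",
                "OPENLINEAGE_NAMESPACE": "<your-deployment-namespace>",
            },
        )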

Verify setup

To confirm that your setup is successful, run a dbt model in your project. After you run this model, click Lineage in the Cloud UI and then click Runs in the left menu. If the setup is successful, the run that you triggered appears in the table of most recent runs.

OpenLineage and Great Expectations with Airflow

Use the information provided here to set up lineage collection for a Great Expectations project.

Prerequisites

Setup

  1. Make your Data Context accessible to your DAGs. For most use cases, Astronomer recommends adding the Data Context to your Astro project include folder. The GreatExpectationsOperator will access include/great_expectations/great_expectations.yml and use the configuration to run your Expectations. Then, add the following lines to your DAGs:

    # Required imports for Great Expectations
    import os
    from pathlib import Path

    from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

    # Set base path for Data Context
    base_path = Path(__file__).parents[2]

    ...

    # Example task using GreatExpectationsOperator
    ge_task = GreatExpectationsOperator(
        task_id="ge_task",
        # Set directory for the Data Context
        ge_root_dir=os.path.join(base_path, "include", "great_expectations"),
        ...
    )

    If you use the GreatExpectationsOperator version 0.2.0 or later and don't provide a Checkpoint file or Checkpoint Config, you can skip steps 2 and 3.

  2. In each of your Checkpoint files, add OpenLineageValidationAction to your action_list like in the following example:

    name: my.simple.chk
    config_version: 1.0
    template_name:
    module_name: great_expectations.checkpoint
    class_name: Checkpoint
    run_name_template:
    expectation_suite_name:
    batch_request: {}
    action_list:
      - name: open_lineage
        action:
          class_name: OpenLineageValidationAction
          module_name: openlineage.common.provider.great_expectations
          openlineage_host: https://astro-<your-astro-base-domain>.datakin.com
          openlineage_apiKey: <your-openlineage-api-key>
          openlineage_namespace: <namespace-name> # Replace with your job namespace; Astronomer recommends using a meaningful namespace such as `dev` or `prod`
          job_name: validate_task_name
  3. Deploy your changes to Astro. See Deploy code.

Verify

To confirm that your setup is successful, click Lineage in the Cloud UI and then click Issues in the left menu. Recent data quality assertion issues appear in the All Issues table.

If your code hasn't produced any data quality assertion issues, use the search bar to search for a dataset and view its node on the lineage graph for a recent job run. Click Quality to view metrics and assertion pass or fail counts.

OpenLineage and Spark

Use the information provided here to set up lineage collection for Spark.

Prerequisites

  • A Spark application.
  • A Spark job.
  • Your Astro base domain.
  • Your Organization's OpenLineage API key.

Setup

In your Spark application, set the following properties to configure your lineage endpoint, install the openlineage-spark library, and configure an OpenLineageSparkListener:

from pyspark.sql import SparkSession

# Replace <namespace-name> with the name of your Spark cluster. Astronomer recommends
# using a meaningful namespace such as `spark-dev` or `spark-prod`.
spark = SparkSession.builder \
    .config('spark.jars.packages', 'io.openlineage:openlineage-spark:0.2.+') \
    .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener') \
    .config('spark.openlineage.host', 'https://astro-<your-astro-base-domain>.datakin.com') \
    .config('spark.openlineage.apiKey', '<your-openlineage-api-key>') \
    .config('spark.openlineage.namespace', '<namespace-name>') \
    .getOrCreate()

Verify

To confirm that your setup is successful, run a Spark job after you save your configuration. After the job runs, click Lineage in the Cloud UI and then click Runs in the left menu. Your recent Spark job run appears in the table of most recent runs.

View SQL source code

The SQL source code view for supported Airflow operators in the Cloud UI Lineage page is off by default for all Workspace users. To enable the source code view, set the following environment variable for each Astro Deployment:

  • Key: OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE
  • Value: False

Astronomer recommends enabling this feature only for Deployments with non-sensitive code. For more information about Workspace permissions, see Workspace roles.