How to orchestrate Azure Data Explorer queries with Airflow

  • Kenten Danas

Azure Data Explorer (ADX) is a managed data analytics service used for performing real-time analysis of large volumes of streaming data. It’s particularly useful for IoT applications, big data logging platforms, and SaaS applications.

You can use the ADX Hook and Operator, which are part of the Azure provider package, to integrate ADX queries into your DAGs. In this post, we show how to make your ADX cluster work with Airflow and create a DAG that runs a query against a database in that cluster.

Prerequisites

To complete the example in this post, you need:

  • An Azure account and an ADX cluster. See Quickstart: Create an Azure Data Explorer cluster and database for instructions.
  • The Astro CLI, to run Airflow locally.

Step 1: Configure your ADX cluster to work with Airflow

To allow Airflow to communicate with your ADX database, you need to configure service principal authentication. To do this, create and register a Microsoft Entra ID service principal, then give that principal permission to access your ADX database. See Create a Microsoft Entra ID application registration in Azure Data Explorer for more details.
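
For example, once the application is registered, you can grant it access from the ADX query editor with a management command. This is a minimal sketch that assumes your database is named storm_demo (the name used later in this post); substitute your own database name, application (client) ID, and tenant ID:

.add database storm_demo viewers ('aadapp=<application-client-id>;<tenant-id>')

The viewers role is sufficient for the read-only query in this example.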

Step 2: Populate your ADX database

Populate your ADX database with demo data. See Quickstart: Ingest sample data into ADX for instructions on ingesting the StormEvents sample dataset. If you are working with an existing ADX cluster that already contains data, you can skip this step.
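
To verify the ingestion, you can run a quick KQL query in the ADX web UI. For example, this counts the rows in the StormEvents table:

StormEvents
| count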

Step 3: Configure your Astro project

Now that you have your Azure resources configured, you can move on to setting up Airflow.

  1. Create a new Astro project:

    $ mkdir astro-adx-tutorial && cd astro-adx-tutorial
    $ astro dev init
  2. Add the following line to the requirements.txt file of your Astro project:

    apache-airflow-providers-microsoft-azure

    This installs the Azure provider package that contains all of the relevant ADX modules.

  3. Run the following command to start your project in a local environment:

    astro dev start

Step 4: Add an Airflow connection to ADX

Add a connection that Airflow will use to connect to ADX. In the Airflow UI, go to Admin -> Connections.

Create a new connection named adx and choose the Azure Data Explorer connection type. For service principal authentication, enter the following information (field names can vary slightly between provider versions):

  • Host: your cluster URL, in the form https://<cluster>.<region>.kusto.windows.net
  • Login: the client ID of your service principal
  • Password: the client secret of your service principal
  • Tenant ID: your Microsoft Entra tenant ID
  • Authentication Method: AAD_APP

For more information on setting up this connection, including available authentication methods, see the ADX hook documentation.
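
As an alternative to the UI, you can define the connection as an environment variable in your Astro project's .env file. This is a sketch that assumes service principal authentication and uses Airflow's JSON connection format; the exact names of the extra fields can vary between provider versions, so check the hook documentation for your version:

AIRFLOW_CONN_ADX='{"conn_type": "azure_data_explorer", "host": "https://<cluster>.<region>.kusto.windows.net", "login": "<client-id>", "password": "<client-secret>", "extra": {"tenant": "<tenant-id>", "auth_method": "AAD_APP"}}'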

Your connection should look similar to this:

ADX Connection

Step 5: Create your DAG

In the dags/ folder of your Astro project, create a new file called adx-pipeline.py and paste the following code into it:

from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.microsoft.azure.operators.adx import AzureDataExplorerQueryOperator

# KQL query to run against the ADX database: the 10 most recent storm events
adx_query = '''StormEvents
| sort by StartTime desc
| take 10'''

with DAG(
    'azure_data_explorer',
    start_date=datetime(2020, 12, 1),
    max_active_runs=1,
    schedule='@daily',
    default_args={
        'depends_on_past': False,
        'retries': 0,
    },
    catchup=False,
) as dag:

    # Run the KQL query against the storm_demo database using the 'adx' connection
    opr_adx_query = AzureDataExplorerQueryOperator(
        task_id='adx_query',
        query=adx_query,
        database='storm_demo',
        azure_data_explorer_conn_id='adx',
    )

Update the database parameter in the AzureDataExplorerQueryOperator to be the name of your database. If you are not working with the StormEvents demo dataset, you can also update the adx_query to something appropriate for your data.
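
If you need the query results as native Python objects instead of relying on the operator's XCom return value, you can call the ADX hook directly from a task. This is a minimal sketch, not part of the tutorial DAG, that assumes the same adx connection and storm_demo database, and that the hook returns results through the azure-kusto-data client API:

from airflow.decorators import task
from airflow.providers.microsoft.azure.hooks.adx import AzureDataExplorerHook

@task
def get_storm_events():
    # Reuses the 'adx' connection configured in Step 4
    hook = AzureDataExplorerHook(azure_data_explorer_conn_id='adx')
    response = hook.run_query(adx_query, database='storm_demo', options={})
    # primary_results[0] is the first result table; convert each row to a
    # dictionary so the return value serializes cleanly to XCom
    return [row.to_dict() for row in response.primary_results[0]]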

Step 6: Run your DAG and review XComs

Go to the Airflow UI, unpause your azure_data_explorer DAG, and trigger it to run the query in your ADX cluster. The results of the query will automatically be pushed to XCom. Go to Admin -> XComs to view the results.

ADX Xcom Results
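
You can also trigger the DAG from the command line by passing an Airflow CLI command through the Astro CLI, assuming your local environment from Step 3 is still running:

astro dev run dags trigger azure_data_explorer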
