Use Apache Kafka with Apache Airflow
Apache Kafka is an open source tool for handling event streaming. Combining Kafka and Airflow allows you to build powerful pipelines that integrate streaming data with batch processing. In this tutorial, you'll learn how to install and use the Airflow Kafka provider to interact directly with Kafka topics.
While it is possible to manage a Kafka cluster with Airflow, be aware that Airflow itself should not be used for streaming or low-latency processes. See the Best practices section for more information.
Time to complete
This tutorial takes approximately 1 hour to complete.
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- The basics of Apache Kafka. See the official Introduction to Kafka.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
Prerequisites
- A Kafka cluster with a topic. This tutorial uses a cluster hosted by Confluent Cloud, which has a free trial option. See the Confluent documentation for how to create a Kafka cluster and topic in Confluent Cloud.
- The Astro CLI.
To connect a local Kafka cluster to an Airflow instance running in Docker, set the following properties in your Kafka cluster's server.properties file before starting your Kafka cluster:

```text
listeners=PLAINTEXT://:9092,DOCKER_HACK://:19092
advertised.listeners=PLAINTEXT://localhost:9092,DOCKER_HACK://host.docker.internal:19092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,DOCKER_HACK:PLAINTEXT
```

You can learn more about connecting to local Kafka from within a Docker container in Confluent's documentation.
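If you want to sanity-check this dual-listener setup, a quick script like the following, run from inside any Docker container on the same host that has the confluent-kafka Python client installed (it is added to requirements.txt in Step 1), can confirm the DOCKER_HACK listener is reachable. This is a minimal sketch that only fetches cluster metadata:

```python
# hedged sketch: connectivity check for the DOCKER_HACK listener
# run from inside a Docker container on the same host as the Kafka broker
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "host.docker.internal:19092"})

# request cluster metadata; this raises an exception if the listener is not reachable
metadata = producer.list_topics(timeout=10)
print(f"Connected. Topics visible to this client: {list(metadata.topics.keys())}")
```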
Step 1: Configure your Astro project
Create a new Astro project:

```sh
$ mkdir astro-kafka-tutorial && cd astro-kafka-tutorial
$ astro dev init
```

Add the following packages to your packages.txt file:

```text
build-essential
librdkafka-dev
```

Add the following packages to your requirements.txt file:

```text
confluent-kafka==1.8.2
airflow-provider-kafka
```
If you are running Airflow as a standalone application and are using an M1 Mac, complete the additional setup in the airflow-provider-kafka
README.
Add the following environment variables to your .env file. Provide your own Kafka topic name, bootstrap server, API key, and API secret:

```text
KAFKA_TOPIC_NAME=<your-kafka-topic-name>
BOOTSTRAP_SERVER=<your-bootstrap-server>
SECURITY_PROTOCOL=SASL_SSL
KAFKA_API_KEY=<your-api-key>
KAFKA_API_SECRET=<your-api-secret>
```

If you are connecting to the local Kafka cluster created with the server.properties file from the info box in the Prerequisites section, set BOOTSTRAP_SERVER=host.docker.internal:19092 and SECURITY_PROTOCOL=PLAINTEXT, and provide your topic name. You can set the API key and API secret to None.
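For that local setup, the .env file might look like the following sketch; the topic name here is just a placeholder:

```text
KAFKA_TOPIC_NAME=<your-local-topic-name>
BOOTSTRAP_SERVER=host.docker.internal:19092
SECURITY_PROTOCOL=PLAINTEXT
KAFKA_API_KEY=None
KAFKA_API_SECRET=None
```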
Run the following command to start your project in a local environment:
```sh
astro dev start
```
Step 2: Create a DAG with a producer task
The Airflow Kafka provider package contains the ProduceToTopicOperator, which you can use to produce events directly to a Kafka topic.
Create a new file in your dags folder called kafka_example_dag_1.py.

Copy and paste the following code into the file:
```python
# kafka_example_dag_1.py

import os
import json
import logging
import functools
from pendulum import datetime

from airflow import DAG
from airflow_provider_kafka.operators.produce_to_topic import ProduceToTopicOperator

# get the topic name from .env
my_topic = os.environ["KAFKA_TOPIC_NAME"]

# get Kafka configuration information
connection_config = {
    "bootstrap.servers": os.environ["BOOTSTRAP_SERVER"],
    "security.protocol": os.environ["SECURITY_PROTOCOL"],
    "sasl.mechanism": "PLAIN",
    "sasl.username": os.environ["KAFKA_API_KEY"],
    "sasl.password": os.environ["KAFKA_API_SECRET"],
}

with DAG(
    dag_id="kafka_example_dag_1",
    start_date=datetime(2022, 11, 1),
    schedule=None,
    catchup=False,
):

    # define the producer function
    def producer_function():
        for i in range(5):
            yield (json.dumps(i), json.dumps(i + 1))

    # define the producer task
    producer_task = ProduceToTopicOperator(
        task_id=f"produce_to_{my_topic}",
        topic=my_topic,
        producer_function=producer_function,
        kafka_config=connection_config,
    )
```

The code above retrieves the environment variables you defined in Step 1 and packages them into a configuration dictionary that the ProduceToTopicOperator can use. Any Python function that returns a generator can be passed to the producer_function parameter of the ProduceToTopicOperator. Make sure your producer function returns a generator that yields key-value pairs where the value is in a format your Kafka topic accepts as input. In this example, the generator produces JSON values. Additionally, if you have defined a schema for your Kafka topic, the generator needs to return compatible objects.

Run your DAG.
View the logs of your task instance. The 5 produced events will be listed.
View the produced events in your Kafka cluster, for example in the Confluent Cloud UI.
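If your producer function takes arguments, you don't have to hardcode them: the ProduceToTopicOperator also accepts producer_function_args and producer_function_kwargs. The following is a minimal sketch, assuming the my_topic and connection_config variables from the DAG above and placed inside the same with DAG block:

```python
# hypothetical parameterized producer function
def producer_function_with_args(n_messages, start=0):
    # yield (key, value) tuples; both are serialized as JSON strings
    for i in range(start, start + n_messages):
        yield (json.dumps(i), json.dumps(i + 1))

producer_task_with_args = ProduceToTopicOperator(
    task_id=f"produce_to_{my_topic}_with_args",
    topic=my_topic,
    producer_function=producer_function_with_args,
    producer_function_kwargs={"n_messages": 5, "start": 100},
    kafka_config=connection_config,
)
```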
Step 3: Add a consumer task
The ConsumeFromTopicOperator enables Airflow to consume messages from topics.
Add the following import statement to kafka_example_dag_1.py:

```python
from airflow_provider_kafka.operators.consume_from_topic import ConsumeFromTopicOperator
```

Add the following code after the producer_task, inside the with DAG block of kafka_example_dag_1.py:

```python
    consumer_logger = logging.getLogger("airflow")

    def consumer_function(message, prefix=None):
        try:
            key = json.loads(message.key())
            value = json.loads(message.value())
            consumer_logger.info(f"{prefix} {message.topic()} @ {message.offset()}; {key} : {value}")
            return
        except:
            consumer_logger.info("Unable to consume message!")
            return

    consumer_task = ConsumeFromTopicOperator(
        task_id=f"consume_from_{my_topic}",
        topics=[my_topic],
        apply_function=functools.partial(consumer_function, prefix="consumed:::"),
        consumer_config={
            **connection_config,
            "group.id": "consume",
            "enable.auto.commit": False,
            "auto.offset.reset": "beginning",
        },
        max_messages=30,
        max_batch_size=10,
    )

    producer_task >> consumer_task
```

The consumer_task includes a function that reads from my_topic and prints the messages it consumes to the Airflow task log. The consumer_task uses the same connection_config as the producer_task, with added configurations specific to Kafka consumers. You can read more about consumer configuration in the Kafka documentation.

Run your DAG.
View the consumed messages in your Airflow task logs.
A common use case is to directly connect a blob storage (for example an Amazon S3 bucket) to your Kafka topic as a consumer. The ConsumeFromTopicOperator is helpful if you want to use Airflow to schedule the consuming task. Instead of writing the messages retrieved to the Airflow logs, you can for example write them to S3 using the S3CreateObjectOperator.
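For example, instead of logging each message, the apply function could write it to S3. The sketch below is a variation of the consumer task above that uses the S3Hook from the Amazon provider rather than the S3CreateObjectOperator; it assumes the apache-airflow-providers-amazon package is installed, and the bucket name and aws_conn_id are placeholders:

```python
import functools

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def consumer_function_to_s3(message, bucket_name=None):
    # write each consumed message to S3 as its own object, keyed by topic and offset
    s3_hook = S3Hook(aws_conn_id="aws_default")
    s3_hook.load_string(
        string_data=message.value().decode("utf-8"),
        key=f"kafka/{message.topic()}/{message.offset()}.json",
        bucket_name=bucket_name,
        replace=True,
    )

consume_to_s3_task = ConsumeFromTopicOperator(
    task_id=f"consume_from_{my_topic}_to_s3",
    topics=[my_topic],
    apply_function=functools.partial(consumer_function_to_s3, bucket_name="my-example-bucket"),
    consumer_config={
        **connection_config,
        "group.id": "consume_to_s3",
        "enable.auto.commit": False,
        "auto.offset.reset": "beginning",
    },
    max_messages=30,
    max_batch_size=10,
)
```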
Step 4: Listen for a message in the stream
A common use case is to run a downstream task when a specific message appears in your Kafka topic. The AwaitKafkaMessageOperator is a deferrable operator that listens to your Kafka topic for a message that fulfills specific criteria.
A deferrable operator is a sensor that will go into a deferred state in between checking for its condition in the target system. While in the deferred state the operator does not take up a worker slot, offering a significant efficiency improvement. See Deferrable operators.
In kafka_example_dag_1.py, add the following import statement:

```python
from airflow_provider_kafka.operators.await_message import AwaitKafkaMessageOperator
```

Copy and paste the following code at the end of your DAG, inside the with DAG block:

```python
    def await_function(message):
        if isinstance(json.loads(message.value()), int):
            if json.loads(message.value()) % 5 == 0:
                return f" Got the following message: {json.loads(message.value())}"

    await_message = AwaitKafkaMessageOperator(
        task_id=f"awaiting_message_in_{my_topic}",
        topics=[my_topic],
        # the apply function needs to be passed with its location for the triggerer
        apply_function="kafka_example_dag_1.await_function",
        kafka_config={
            **connection_config,
            "group.id": "awaiting_message",
            "enable.auto.commit": False,
            "auto.offset.reset": "beginning",
        },
        xcom_push_key="retrieved_message",
    )

    consumer_task >> await_message
```

This code snippet includes an await_function that parses each message in the Kafka topic and returns the message if its value is an integer divisible by 5. The AwaitKafkaMessageOperator runs this function over messages polled from the Kafka topic. If no matching message is found, it continues to poll in a deferred state until it finds one.

Run the DAG. Notice how the task instance of the await_message task goes into a deferred state (purple square).

(Optional) Add a downstream task to the await_message task that only runs once await_message has completed successfully, as in the sketch below.
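A minimal sketch of such a downstream task, assuming it is added inside the with DAG block of the tutorial DAG: the PythonOperator pulls the message that await_message pushed to XCom under the retrieved_message key.

```python
from airflow.operators.python import PythonOperator

def print_retrieved_message(**context):
    # pull the data the deferrable operator pushed to XCom
    retrieved = context["ti"].xcom_pull(
        task_ids=f"awaiting_message_in_{my_topic}",
        key="retrieved_message",
    )
    print(f"The awaited message was: {retrieved}")

downstream_task = PythonOperator(
    task_id="print_retrieved_message",
    python_callable=print_retrieved_message,
)

await_message >> downstream_task
```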
How it works
The airflow-provider-kafka package contains three hooks:

- KafkaAdminClientHook
- KafkaConsumerHook
- KafkaProducerHook

It also contains four operators and one trigger:

- ProduceToTopicOperator
- ConsumeFromTopicOperator
- AwaitKafkaMessageOperator
- EventTriggersFunctionOperator
- AwaitMessageTrigger

The following sections provide more detailed information on the parameters of each operator. For more information on the other modules in this provider, see the airflow-provider-kafka source code.
ProduceToTopicOperator
The ProduceToTopicOperator can be used to create a Kafka producer to produce messages to a Kafka topic. You can define the following parameters:
- topic: The Kafka topic you want to produce to.
- producer_function: A Python function that returns a generator which creates the key/value pairs to be produced to Kafka as messages.
- producer_function_args: Positional arguments for the producer_function.
- producer_function_kwargs: Keyword arguments for the producer_function.
- delivery_callback: A custom function to be executed after each message that was produced to the Kafka topic (in case of success and failure). If no function is provided, the ProduceToTopicOperator logs the produced record in case of success and an error message in case of failure.
- synchronous: Specifies whether writing to Kafka should be fully synchronous. True by default.
- kafka_config: The configuration for the Kafka client, including the connection information. For a full list of parameters, refer to the librdkafka GitHub repository.
- poll_timeout: The delay between production to Kafka and calling poll on the producer.
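As an illustration of delivery_callback, the sketch below logs a custom line per produced message. It assumes the standard confluent-kafka callback signature of (error, message) and reuses my_topic, producer_function, and connection_config from the tutorial DAG:

```python
import logging

def my_delivery_callback(err, msg):
    # called once per produced message, on both success and failure
    if err is not None:
        logging.error(f"Delivery failed for key {msg.key()}: {err}")
    else:
        logging.info(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

producer_with_callback = ProduceToTopicOperator(
    task_id="produce_with_custom_callback",
    topic=my_topic,
    producer_function=producer_function,
    delivery_callback=my_delivery_callback,
    synchronous=True,
    kafka_config=connection_config,
)
```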
ConsumeFromTopicOperator
The ConsumeFromTopicOperator can be used to create a Kafka consumer to read batches of messages and process them. You can define the following parameters:
- topics: A list of topics or regex patterns for the consumer to subscribe to (that is, read from).
- apply_function: A Python function that is applied to all messages that are read.
- apply_function_args: Positional arguments for the apply_function.
- apply_function_kwargs: Keyword arguments for the apply_function.
- consumer_config: The configuration for the Kafka client, including the connection information. For a full list of parameters, refer to the librdkafka GitHub repository.
- commit_cadence: When the Kafka consumer should commit offsets. The three options end_of_operator (default), never, and end_of_batch are available.
- max_messages: The maximum number of messages the Kafka consumer created by this instance of the ConsumeFromTopicOperator can read from its topics.
- max_batch_size: The maximum number of messages the Kafka consumer can read when polling. The default is 1000.
- poll_timeout: How long the Kafka consumer should wait for potentially incoming messages after having read all currently available messages, before ending its task. The default is 60 seconds.
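The following sketch shows how the batching parameters interact, reusing the consumer_function and connection_config from the tutorial DAG; the task_id and group.id are placeholders:

```python
batch_consumer_task = ConsumeFromTopicOperator(
    task_id="consume_in_small_batches",
    topics=[my_topic],
    apply_function=functools.partial(consumer_function, prefix="batch:::"),
    consumer_config={
        **connection_config,
        "group.id": "batch_consumer",
        "enable.auto.commit": False,
        "auto.offset.reset": "beginning",
    },
    commit_cadence="end_of_batch",  # commit offsets after each batch instead of at the end of the task
    max_messages=100,               # read at most 100 messages in this task run
    max_batch_size=20,              # poll at most 20 messages at a time
    poll_timeout=30,                # wait up to 30 seconds for new messages before ending the task
)
```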
AwaitKafkaMessageOperator
The AwaitKafkaMessageOperator is a deferrable operator that can be used to wait for a specific message to be published to one or more Kafka topics. You can define the following parameters:
- topics: A list of topics or regex patterns to read from.
- apply_function: A Python function that is applied to all messages that are read. If the function returns any data, the task ends and is marked as successful. The returned data is pushed to XCom unless the BaseOperator argument do_xcom_push is set to False.
- apply_function_args: Positional arguments for the apply_function.
- apply_function_kwargs: Keyword arguments for the apply_function.
- kafka_config: The configuration for the Kafka client, including the connection information. For a full list of parameters, refer to the librdkafka GitHub repository.
- poll_timeout: The amount of time in seconds that the task should wait for a message in its active state.
- poll_interval: The amount of time in seconds that the task should wait in the deferred state.
- xcom_push_key: The key under which to save the returned data to XCom.
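For example, poll_timeout and poll_interval control how the sensor alternates between actively polling and deferring. A sketch reusing the Step 4 configuration, with illustrative timing values:

```python
await_message_tuned = AwaitKafkaMessageOperator(
    task_id=f"awaiting_message_in_{my_topic}_tuned",
    topics=[my_topic],
    apply_function="kafka_example_dag_1.await_function",
    kafka_config={
        **connection_config,
        "group.id": "awaiting_message_tuned",
        "enable.auto.commit": False,
        "auto.offset.reset": "beginning",
    },
    poll_timeout=1,    # actively poll for 1 second at a time
    poll_interval=10,  # then defer for 10 seconds before polling again
    xcom_push_key="retrieved_message",
)
```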
EventTriggersFunctionOperator
The EventTriggersFunctionOperator is a deferrable operator that waits for a specific message to be published to one or more Kafka topics, similar to the AwaitKafkaMessageOperator. Unlike the AwaitKafkaMessageOperator, the EventTriggersFunctionOperator will continue listening until the task is stopped by an external criterion such as a timeout of the DAG itself. If this external criterion isn't met, this task will stay in a deferred state indefinitely as long as the DAG is running. You can view an example DAG using this operator in the astronomer-providers
repository.
You can define the following parameters for this operator:
- topics: A list of topics or regex patterns to read from.
- apply_function: A Python function that is applied to all messages that are read. If the function returns any data, the event_triggered_function runs.
- event_triggered_function: A Python function that runs every time the operator consumes a message that causes the apply_function to return any data. The value returned by the apply_function is passed to the event_triggered_function as the first positional argument.
- apply_function_args: Positional arguments for the apply_function.
- apply_function_kwargs: Keyword arguments for the apply_function.
- kafka_config: The configuration for the Kafka client, including the connection information. For a full list of parameters, see the librdkafka GitHub repository.
- poll_timeout: The amount of time in seconds that the task should wait for a message in its active state.
- poll_interval: The amount of time in seconds that the task should wait in the deferred state.
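The sketch below shows how these parameters could fit together. It assumes the import path mirrors the other operators in the provider and bases the event_triggered_function signature on the parameter descriptions above (the apply function's return value as the first positional argument); check the provider source and the astronomer-providers example DAG for the exact interface in your installed version.

```python
# hedged sketch: import path and function signatures may differ between provider versions
from airflow_provider_kafka.operators.event_triggers_function import EventTriggersFunctionOperator

def apply_function_for_event(message):
    # return data only for integer values divisible by 5
    value = json.loads(message.value())
    if isinstance(value, int) and value % 5 == 0:
        return value

def react_to_message(event, **kwargs):
    # runs every time apply_function_for_event returns data; `event` is that return value
    print(f"Received a multiple of five: {event}")

listen_and_react = EventTriggersFunctionOperator(
    task_id="listen_and_react",
    topics=[my_topic],
    apply_function="kafka_example_dag_1.apply_function_for_event",
    event_triggered_function=react_to_message,
    kafka_config={
        **connection_config,
        "group.id": "event_triggers_function",
        "enable.auto.commit": False,
        "auto.offset.reset": "beginning",
    },
    poll_interval=10,
)
```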
Best practices
Apache Kafka is a tool optimized for streaming messages at high frequencies, for example in an IoT application. Airflow is designed to handle orchestration of data pipelines in batches.
Astronomer recommends combining these two open source tools by handling low-latency processes with Kafka and data orchestration with Airflow.
Common patterns include:
- Configuring a Kafka cluster with a blob storage like S3 as a sink. Batch process data from S3 at regular intervals.
- Using the ProduceToTopicOperator in Airflow to produce messages to a Kafka cluster as one of several producers.
- Consuming data from a Kafka cluster via the ConsumeFromTopicOperator in batches using the apply function to extract and load information to a blob storage or data warehouse.
- Listening for specific messages in a data stream running through a Kafka cluster using the AwaitKafkaMessageOperator to trigger downstream tasks once the message appears.
Conclusion
The Airflow Kafka provider offers easy-to-use operators to interact with topics and messages in Kafka. You now know how to use these operators to connect Kafka and Airflow.