Learn how you can connect to Kafka and consume messages directly from your Warp 10 instance.
Kafka is a message broker whose popularity has grown rapidly over the last few years. It is now part of many infrastructure stacks, and a lot of data transits through a Kafka cluster at one point or another.
This makes Kafka a very interesting middleware to connect to in order to fetch and act upon important data in an organization. Whether it is logs, metrics, transaction records, or alarms, if you push that data into Kafka, being able to consume it opens many opportunities.
This blog post will describe the Warp 10 Kafka Plugin, a module you can add to your Warp 10 instance to consume data from Kafka and manipulate it on the fly using WarpScript.
Learn how Kafka Streams topologies can now be defined entirely in WarpScript.
Installing the Warp 10 Kafka Plugin
The plugin is available on WarpFleet, or, if you want to build it from source, you can go to its GitHub repository.
Installing from WarpFleet
You need the WarpFleet CLI client to be installed; if you have not done so yet, run npm i -g @senx/warpfleet. Once wf is installed, simply execute the following:
wf g -d CONFDIR -l LIBDIR io.warp10 warp10-plugin-kafka
Where CONFDIR is the configuration directory of your Warp 10 instance (or use -c CONFFILE with a release prior to 2.1), and LIBDIR is the lib directory where the downloaded jar file should be copied.
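For example, on a standalone Warp 10 instance deployed under /opt/warp10 (paths assumed here for illustration), the command could look like this:
wf g -d /opt/warp10/etc/conf.d -l /opt/warp10/lib io.warp10 warp10-plugin-kafka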
Building from source
First, clone the repository and build it:
git clone https://github.com/senx/warp10-plugin-kafka.git
cd warp10-plugin-kafka
./gradlew -Duberjar shadowJar
Then copy the jar (suffixed with uberjar) from the build/libs directory into the lib directory of your Warp 10 deployment.
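Assuming a deployment under /opt/warp10 (a path chosen for illustration; the exact jar name depends on the version you built), that copy step could look like this:
cp build/libs/warp10-plugin-kafka-*-uberjar.jar /opt/warp10/lib/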
Configuring the Plugin
In order to use the Kafka Plugin, it must be enabled in the Warp 10 configuration file:
warp10.plugin.kafka = io.warp10.plugins.kafka.KafkaWarp10Plugin
The Kafka plugin will scan the directory specified in the kafka.dir configuration key at the period configured in kafka.period (expressed in milliseconds, defaulting to one minute), reloading consumer specifications from the .mc2 files found in that directory.
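For instance, the scan directory and period could be configured as follows (the directory path is an assumption for illustration):
kafka.dir = /opt/warp10/etc/kafka
kafka.period = 60000
With these values, the plugin rescans /opt/warp10/etc/kafka for .mc2 specification files every 60 seconds.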
Using the Plugin
The plugin will interact with Kafka according to specification files. A specification file is a valid WarpScript (.mc2) file producing a map with the following fields:
{
  'topics' [ 'topic1' 'topic2' ] // List of Kafka topics to subscribe to
  'parallelism' 1 // Number of threads to start for processing the incoming messages. Each thread will handle a certain number of partitions.
  'config' { // Map of Kafka consumer parameters
    'bootstrap.servers' '127.0.0.1:9092'
    'group.id' 'senx-consumer'
    'enable.auto.commit' 'true'
  }
  'macro' <% %> // Macro to execute for each incoming message
  'timeout' 10000 // Polling timeout (in ms), if no message is received within this delay, the macro will be called with an empty map as input
}
Once loaded, this specification will trigger the consumption of the specified Kafka topics. You should set group.id in the config map. For each received message, the macro specified in macro will be called, with the execution environment (the stack) preserved between calls. The message is pushed onto the stack prior to calling the macro, as a map with the following fields:
{
  'timestamp' 123 // The record timestamp
  'timestampType' 'type' // The type of timestamp, can be one of 'NoTimestampType', 'CreateTime', 'LogAppendTime'
  'topic' 'topic_name' // Name of the topic which received the message
  'offset' 123 // Offset of the message in 'topic'
  'partition' 123 // Id of the partition which received the message
  'key' ... // Byte array of the message key
  'value' ... // Byte array of the message value
  'headers' { } // Map of message headers
}
When the timeout has expired, instead of a message map like the one above, the macro is called with an empty map.
The configured macro can perform any WarpScript operation. If you wish to store some data, you can use the UPDATE function.
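To put it all together, here is a sketch of a complete specification file which decodes each message value and stores it as a data point with UPDATE. The topic name, the GTS class name, the assumption that values are numbers encoded as UTF-8 strings, and the write token placeholder are all illustrative, not part of the plugin:
{
  'topics' [ 'sensors' ] // Hypothetical topic name
  'parallelism' 1
  'config' {
    'bootstrap.servers' '127.0.0.1:9092'
    'group.id' 'senx-consumer'
    'enable.auto.commit' 'true'
  }
  'timeout' 10000
  'macro' <%
    'msg' STORE // The message map, or an empty map on timeout
    $msg SIZE 0 !=
    <%
      NEWGTS 'kafka.sensor.value' RENAME // Hypothetical class name
      $msg 'timestamp' GET 1000 * // Kafka timestamps are in ms, assuming the default µs time units of Warp 10
      NaN NaN NaN // No latitude, longitude, or elevation
      $msg 'value' GET 'UTF-8' BYTES-> TODOUBLE // Decode the value byte array as a numeric UTF-8 string
      ADDVALUE
      'WRITE_TOKEN' UPDATE // Replace with a valid write token
    %> IFT // Skip the empty map pushed when the timeout expires
  %>
}
Since the stack is preserved between calls, the macro is written so that it leaves nothing behind: UPDATE consumes both the GTS and the token.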
Build a real-time system monitoring dashboard using a Kafka broker and WebSockets: a Warp 10 tutorial with Discovery to read here.
Takeaways
The Warp 10 Kafka Plugin allows you to consume data from one of the most popular message brokers and process those messages using WarpScript.
Share what you build using this plugin!
Read more
Getting help from Warp 10 community
Build a WarpScript extension which embeds a C library
Build a BACnet datalogger with a Raspberry