Learn how Kafka Streams topologies can now be defined entirely in WarpScript and benefit from its vast number of functions
WarpScript is the analytics language of the Warp 10 Time Series Platform. But it really is a general-purpose data flow programming language and, as such, it can ease many data manipulation tasks.
WarpScript has been integrated with Pig, Spark, NiFi, Storm, and other products for a long time. But today we are introducing the integration of WarpScript with Kafka Streams.
How is WarpScript integrated?
The integration allows Kafka Streams topologies to be defined entirely in WarpScript, meaning a topology is simply a .mc2 file making use of WarpScript functions dedicated to building and launching Kafka Streams topologies.
The processor nodes of the topologies are WarpScript macros and can make use of any function, whether built-in or from extensions.
The integration of a scripting language into Kafka Streams makes it really easy to build topologies. Besides, the power and flexibility of WarpScript make almost anything possible.
The integration also offers a Warp 10 plugin which allows running Kafka Streams topologies in a Java Virtual Machine running a Warp 10 instance. This has the added benefit of enabling your topology to directly access the Warp 10 Storage Engine via FETCH or UPDATE, rather than via the use of REXEC to access a remote instance.
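For instance, when the topology runs inside the plugin, a processor macro can read series directly from the local Storage Engine. The sketch below is only an illustration under assumptions: 'READ_TOKEN' and 'sensor.temperature' are hypothetical placeholders for a valid read token and an existing class name.
<%
  // Hypothetical processor macro running inside the Warp 10 plugin.
  // Fetch the last 10 data points of the series matching the
  // placeholder selector, straight from the local Storage Engine.
  [ 'READ_TOKEN' 'sensor.temperature' {} NOW -10 ] FETCH
  // ... the fetched Geo Time Series can then be combined with the
  // incoming message before forwarding with KSFORWARD ...
%>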
A simple example
The rest of this post will walk you through the process of running Kafka and a simple Kafka Streams topology defined in WarpScript. It is assumed you use a Unix-like operating system, such as Linux or OS X.
Launching Kafka
In order to test a Kafka Streams topology, Kafka needs to be running. You can either launch Docker containers or download, install, and start Kafka as below:
curl -O -L "https://downloads.apache.org/kafka/2.4.1/kafka_2.12-2.4.1.tgz"
tar zxpf kafka_2.12-2.4.1.tgz
nohup ./kafka_2.12-2.4.1/bin/zookeeper-server-start.sh ./kafka_2.12-2.4.1/config/zookeeper.properties &
nohup ./kafka_2.12-2.4.1/bin/kafka-server-start.sh ./kafka_2.12-2.4.1/config/server.properties &
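Before going further, you can make sure both processes are listening (this check assumes nc from the netcat package is installed):
nc -z localhost 2181 && echo "ZooKeeper is up"
nc -z localhost 9092 && echo "Kafka broker is up"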
Creating a topic
Once the Kafka broker is running, create two topics named source and sink:
./kafka_2.12-2.4.1/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic source
./kafka_2.12-2.4.1/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sink
You can verify that the two topics were correctly created by running:
./kafka_2.12-2.4.1/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Populating the source topic
Add a few messages to the source topic with the following commands:
echo "Message#1" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#2" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#3" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#4" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#5" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
Defining a Kafka Streams topology
The topology we will test is very simple: it will have one source node consuming the source topic, one sink node writing to the sink topic, and two processor nodes in between. The first processor node will reverse the input message, pack it and the current timestamp into a list, and pass the list to the second processor. The second processor will create a JSON string from the list and forward it to the sink node.
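To make the data flow concrete, here is roughly what happens to the first message, assuming the value is handled as a string (the timestamp is purely illustrative, expressed in platform time units):
"Message#1"                            consumed from the source topic
"1#egasseM"                            after REVERSE in processor-1
[ "1#egasseM" 1585909419000000 ]       after packing with NOW
["1#egasseM",1585909419000000]         JSON string produced by processor-2, written as UTF-8 bytes to sink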
The WarpScript code defining this topology is below:
// Start the definition of a new Kafka Streams topology
KSTOPOLOGY
// Source node consuming messages from the 'source' topic
{
  'name' 'source'
  'topics' 'source'
} KSSOURCE
// First processor: reverse the message, pack it together with the
// current timestamp into a list and forward it to 'processor-2'
{
  'name' 'processor-1'
  'process' <%
    REVERSE NOW 2 ->LIST 'processor-2' -1
    // Print a snapshot of the stack for debugging
    SNAPSHOTCOPY STDOUT
    KSFORWARD
  %>
  'parents' [ 'source' ]
} KSPROCESSOR
// Second processor: serialize the list to a JSON string, encode it
// as UTF-8 bytes and forward it to the 'sink' node
{
  'name' 'processor-2'
  'process' <%
    ->JSON 'UTF8' ->BYTES 'sink' NULL
    // Print a snapshot of the stack for debugging
    SNAPSHOTCOPY STDOUT
    KSFORWARD
  %>
  'parents' [ 'processor-1' ]
} KSPROCESSOR
// Sink node writing to the 'sink' topic
{
  'name' 'sink'
  'topic' 'sink'
  'parents' [ 'processor-2' ]
} KSSINK
// Launch the topology with this Kafka Streams configuration
{
  'application.id' 'WarpScript-KafkaStreams'
  'bootstrap.servers' 'localhost:9092'
} KSSTART
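Note that the map handed to KSSTART holds the Kafka Streams configuration. It seems reasonable to assume, although you should check the function documentation to confirm, that other standard Kafka Streams properties can be passed the same way, for instance:
{
  'application.id' 'WarpScript-KafkaStreams'
  'bootstrap.servers' 'localhost:9092'
  // Assumption: extra standard properties such as 'num.stream.threads'
  // are forwarded verbatim to Kafka Streams
  'num.stream.threads' '2'
} KSSTART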
The documentation of these functions is available on WarpFleet.
Running the topology
Download the uberuberjar version of the warp10-plugin-kstreams plugin from WarpFleet. This version contains the full WarpScript library and its dependencies. It is therefore able to launch Kafka Streams topologies without the need for a Warp 10 instance.
A minimal Warp 10 configuration file needs to be set up. Copy the content below into a file named warp10.conf:
# Use microseconds as the platform time units
warp.timeunits=us
# Load the Debug extension, which provides the STDOUT function used by the topology
warpscript.extension.debug = io.warp10.script.ext.debug.DebugWarpScriptExtension
Assuming the topology definition was saved in the file kstopology.mc2, the topology can be launched using the following command:
java -Dwarp10.config=warp10.conf -cp warp10-plugin-kstreams-x.y.z-uberuberjar.jar io.warp10.plugins.kstreams.KSLaunch kstopology.mc2
where x.y.z is the version of the plugin you downloaded.
And voilà! Your topology is running, consuming messages from source, transforming them, pushing the result to sink, and displaying some intermediate debug information to standard output.
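To verify the end-to-end result, read the transformed messages back from the sink topic with the console consumer shipped with Kafka:
./kafka_2.12-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sink --from-beginning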
Takeaway
WarpScript is now integrated with Kafka Streams, enabling the definition of topologies entirely in WarpScript to shorten development time and leverage the power of more than 1000 functions.
We hope you will find this new integration useful and will create many interesting topologies.