Learn how Kafka Streams topologies can now be defined entirely in WarpScript and benefit from its vast number of functions
WarpScript is the analytics language of the Warp 10 Time Series Platform. But it really is a general-purpose data flow programming language and as such, it can ease many data manipulation tasks.
How is WarpScript integrated?
The integration allows the definition of Kafka Streams topologies entirely in WarpScript, meaning a topology can be defined simply with a .mc2 file making use of WarpScript functions dedicated to building and launching Kafka Streams topologies.
The processor nodes of the topologies are WarpScript macros and can make use of any function, whether built-in or from extensions.
The integration of a scripting language into Kafka Streams makes it really easy to build topologies. Besides, the power and flexibility of WarpScript make almost anything possible.
The integration also offers a Warp 10 plugin which allows running Kafka Streams topologies in a Java Virtual Machine running a Warp 10 instance. This has the added benefit of enabling your topology to directly access the Warp 10 Storage Engine via UPDATE rather than via REXEC calls to a remote instance.
A simple example
The rest of this post will walk you through the process of running Kafka and a simple Kafka Streams topology defined in WarpScript. It is assumed you use a Unix-like operating system, such as Linux or OS X.
In order to test a Kafka Streams topology, Kafka needs to be running. You can either launch docker containers or download, install and start Kafka as below:
curl -O -L "https://downloads.apache.org/kafka/2.4.1/kafka_2.12-2.4.1.tgz"
tar zxpf kafka_2.12-2.4.1.tgz
nohup ./kafka_2.12-2.4.1/bin/zookeeper-server-start.sh ./kafka_2.12-2.4.1/config/zookeeper.properties &
nohup ./kafka_2.12-2.4.1/bin/kafka-server-start.sh ./kafka_2.12-2.4.1/config/server.properties &
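Since the broker needs a moment to come up after those commands return, a quick reachability check can save some head-scratching. Here is a minimal sketch, assuming the default listener on localhost:9092:

```python
import socket

def broker_reachable(host: str = "localhost", port: int = 9092, timeout: float = 2.0) -> bool:
    # Try to open a TCP connection to the broker's listener port;
    # success only means Kafka is accepting connections, not that it
    # is fully initialized, but it is usually a good enough signal.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Run it in a loop until it returns True before moving on to topic creation.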
Creating the topics
Once the Kafka broker is running, create two topics named source and sink:
./kafka_2.12-2.4.1/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic source
./kafka_2.12-2.4.1/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sink
You can verify that the two topics were correctly created by running:
./kafka_2.12-2.4.1/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Add a few messages to the source topic with the following commands:
echo "Message#1" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#2" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#3" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#4" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#5" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
Defining a Kafka Streams topology
The topology we will test is very simple: it will have one source node consuming the source topic, one sink node writing to the sink topic, and two processor nodes in between. The first processor node will reverse the input message, pack it and the current timestamp into a list, and pass the list to the second processor. The second processor will create a JSON string from the list and forward it to the sink node.
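To make the two processing steps concrete, here is a plain-Python sketch of the same per-record logic (the function names are illustrative, not part of the plugin's API; timestamps are in microseconds, as WarpScript's NOW returns them):

```python
import json
import time

def processor_1(message: str) -> list:
    # Reverse the message and pack it with the current timestamp
    # (microseconds, mirroring WarpScript's NOW) into a list
    return [message[::-1], int(time.time() * 1_000_000)]

def processor_2(record: list) -> bytes:
    # Turn the list into a JSON string, then encode it as UTF-8
    # bytes ready to be written to the 'sink' topic
    return json.dumps(record).encode("utf-8")

# Example: "Message#1" becomes b'["1#egasseM", <timestamp>]'
```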
The WarpScript code defining this topology is below:
// processor-1 macro: reverse the message, pack it with the current
// timestamp into a list, and pass it on to 'processor-2'
REVERSE NOW 2 ->LIST 'processor-2' -1
// processor-1 consumes records from the 'source' node
'parents' [ 'source' ]
// processor-2 macro: serialize the list to JSON and encode it as
// UTF-8 bytes destined for the 'sink' topic
->JSON 'UTF8' ->BYTES 'sink' NULL
// processor-2 consumes records from 'processor-1'
'parents' [ 'processor-1' ]
// the sink node consumes records from 'processor-2'
'parents' [ 'processor-2' ]
The documentation of these functions is available alongside the plugin on WarpFleet.
Running the topology
Download the uberuberjar version of the warp10-plugin-kstreams plugin from WarpFleet. This version contains the full WarpScript library and its dependencies. It is, therefore, able to launch Kafka Streams topologies without the need for a Warp 10 instance.
A minimal Warp 10 configuration file needs to be set up. Copy the content below into a file named warp10.conf:
warpscript.extension.debug = io.warp10.script.ext.debug.DebugWarpScriptExtension
Assuming the topology definition was saved in the file kstopology.mc2, the topology can be launched using the following command:
java -Dwarp10.config=warp10.conf -cp warp10-plugin-kstreams-x.y.z-uberuberjar.jar io.warp10.plugins.kstreams.KSLaunch kstopology.mc2
where x.y.z is the version of the plugin you downloaded.
And voilà! Your topology is running, consuming messages from source, transforming them, pushing the result to sink, and displaying some intermediate debug information on standard output.
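If you attach a console consumer to the sink topic, each record should decode along these lines. The sample payload and timestamp below are illustrative, based on the processors described earlier:

```python
import json

# Hypothetical raw value as the topology would produce it for the
# input "Message#1" (the timestamp value is made up for the example)
raw = b'["1#egasseM", 1587549600000000]'

# The record is a UTF-8 encoded JSON list: [reversed message, timestamp]
reversed_msg, timestamp_us = json.loads(raw.decode("utf-8"))
print(reversed_msg)   # -> 1#egasseM
print(timestamp_us)   # -> 1587549600000000
```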
WarpScript is now integrated with Kafka Streams, enabling the definition of topologies entirely in WarpScript to speed up development time and leverage the power of more than 1000 functions.
We hope you will find this new integration useful and will create many interesting topologies.