WarpScript ❤️ Kafka Streams

Learn how Kafka Streams topologies can now be defined entirely in WarpScript and benefit from its extensive function library.

WarpScript is the analytics language of the Warp 10 Time Series Platform. It is also a general-purpose data flow programming language, and as such it can ease many data manipulation tasks.

WarpScript has long been integrated with Pig, Spark, NiFi, Storm and other products. Today we are introducing the integration of WarpScript with Kafka Streams.

How is WarpScript integrated?

The integration lets you define Kafka Streams topologies entirely in WarpScript: a topology is simply a .mc2 file that calls the WarpScript functions dedicated to building and launching Kafka Streams topologies.

The processor nodes of the topologies are WarpScript macros and can make use of any function, whether built-in or from extensions.

Integrating a scripting language into Kafka Streams makes it really easy to build topologies. In addition, the power and flexibility of WarpScript make almost anything possible.

The integration also provides a Warp 10 plugin which runs Kafka Streams topologies in the same Java Virtual Machine as a Warp 10 instance. This has the added benefit of letting your topology access the Warp 10 Storage Engine directly via FETCH or UPDATE, rather than going through REXEC to reach a remote instance.

A simple example

The rest of this post walks you through running Kafka and a simple Kafka Streams topology defined in WarpScript. It assumes you use a Unix-like operating system such as Linux or OS X.

Launching Kafka

To test a Kafka Streams topology, Kafka needs to be running. You can either launch Docker containers or download, install and start Kafka as below:

curl -O -L 'https://downloads.apache.org/kafka/2.4.1/kafka_2.12-2.4.1.tgz'
tar zxpf kafka_2.12-2.4.1.tgz
nohup ./kafka_2.12-2.4.1/bin/zookeeper-server-start.sh ./kafka_2.12-2.4.1/config/zookeeper.properties &
nohup ./kafka_2.12-2.4.1/bin/kafka-server-start.sh ./kafka_2.12-2.4.1/config/server.properties &

Creating the topics

Once the Kafka broker is running, create two topics named source and sink:

./kafka_2.12-2.4.1/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic source
./kafka_2.12-2.4.1/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic sink

You can verify that the two topics were correctly created by running:

./kafka_2.12-2.4.1/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Populating the source topic

Add a few messages to the source topic with the following commands:

echo "Message#1" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#2" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#3" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#4" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
echo "Message#5" | ./kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source
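The same five messages can also be sent with a single producer invocation, since the console producer turns each line read on standard input into one record. The sketch below assumes Kafka was unpacked in the current directory as above; numbered_messages and produce_lines are hypothetical helper names, not part of Kafka.

```shell
# Assumption: Kafka lives in ./kafka_2.12-2.4.1 unless KAFKA_HOME says otherwise.
KAFKA_HOME="${KAFKA_HOME:-./kafka_2.12-2.4.1}"

# Hypothetical helper: print "Message#1" .. "Message#N", one per line.
numbered_messages() {
  i=1
  while [ "$i" -le "$1" ]; do
    printf 'Message#%d\n' "$i"
    i=$((i + 1))
  done
}

# Hypothetical helper: send every line read on standard input
# to the topic given as first argument, one record per line.
produce_lines() {
  "$KAFKA_HOME/bin/kafka-console-producer.sh" \
    --broker-list localhost:9092 \
    --topic "$1"
}
```

With these helpers defined, running numbered_messages 5 | produce_lines source should populate the topic in one go.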

Defining a Kafka Streams topology

The topology we will test is very simple: one source node consuming the source topic, one sink node writing to the sink topic, and two processor nodes in between. The first processor node reverses the input message, packs it and the current timestamp into a list, and passes the list to the second processor. The second processor creates a JSON string from the list and forwards it to the sink node.

The WarpScript code defining this topology is below:

KSTOPOLOGY
{
  'name' 'source'
  'topics' 'source'
} KSSOURCE
{
  'name' 'processor-1'
  'process' <%
    REVERSE NOW 2 ->LIST 'processor-2' -1
    SNAPSHOTCOPY STDOUT
    KSFORWARD
  %>
  'parents' [ 'source' ]
} KSPROCESSOR
{
  'name' 'processor-2'
  'process' <%
    ->JSON 'UTF8' ->BYTES 'sink' NULL
    SNAPSHOTCOPY STDOUT
    KSFORWARD
  %>
  'parents' [ 'processor-1' ]
} KSPROCESSOR
{
  'name' 'sink'
  'topic' 'sink'
  'parents' [ 'processor-2' ]
} KSSINK
{
  'application.id' 'WarpScript-KafkaStreams'
  'bootstrap.servers' 'localhost:9092'
} KSSTART

The documentation of these functions is available on this link.
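To get a feel for the transformation before running anything, the two processor nodes can be mimicked locally in the shell. This is only an illustration of the steps described above, not output produced by WarpScript: simulate_topology is a hypothetical helper, the timestamp is passed by hand instead of coming from NOW, the message is assumed to be plain text, and no JSON escaping is performed. The exact bytes written to the sink topic may differ.

```shell
# Hypothetical helper mimicking the two processor nodes for one message.
# $1 is the message text, $2 a timestamp standing in for NOW.
simulate_topology() {
  # processor-1: reverse the message (rev reverses each input line)
  reversed=$(printf '%s\n' "$1" | rev)
  # processor-2: serialize the list [ reversed message, timestamp ] as JSON
  # (assumption: the message contains no character needing JSON escaping)
  printf '["%s",%s]\n' "$reversed" "$2"
}

simulate_topology 'Message#1' 1585742400000000
# prints ["1#egasseM",1585742400000000]
```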

Running the topology

Download the uberuberjar version of the warp10-plugin-kstreams plugin from WarpFleet. This version contains the full WarpScript library and its dependencies. It is therefore able to launch Kafka Streams topologies without the need for a Warp 10 instance.

A minimal Warp 10 configuration file needs to be set up. Copy the content below into a file named warp10.conf:

warp.timeunits=us
warpscript.extension.debug = io.warp10.script.ext.debug.DebugWarpScriptExtension

Assuming the topology definition was saved in the file kstopology.mc2, the topology can be launched using the following command:

java -Dwarp10.config=warp10.conf -cp warp10-plugin-kstreams-x.y.z-uberuberjar.jar io.warp10.plugins.kstreams.KSLaunch kstopology.mc2

where x.y.z is the version of the plugin you downloaded.

And voilà! Your topology is running, consuming messages from source, transforming them and pushing the results to sink, while displaying some intermediate debug information on standard output.

Takeaway
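To check the result, the sink topic can be read back with the console consumer shipped with Kafka. The sketch below assumes the same Kafka directory and broker address as earlier in this post; read_topic is a hypothetical helper name, and the 10 second timeout is an arbitrary choice.

```shell
# Assumption: Kafka lives in ./kafka_2.12-2.4.1 unless KAFKA_HOME says otherwise.
KAFKA_HOME="${KAFKA_HOME:-./kafka_2.12-2.4.1}"

# Hypothetical helper: print every record of the topic given as first
# argument, starting from the earliest offset, and exit once no new
# record has arrived for 10 seconds.
read_topic() {
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" \
    --bootstrap-server localhost:9092 \
    --topic "$1" \
    --from-beginning \
    --timeout-ms 10000
}
```

Calling read_topic sink in a second terminal while the topology is running should print one line per message pushed to source.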

WarpScript is now integrated with Kafka Streams, enabling the definition of topologies entirely in WarpScript to speed up development and leverage the power of more than 1000 functions.

We hope you will find this new integration useful and will create many interesting topologies.