Build a Complete Application with Warp 10, from TCP Stream to Dashboard

Warp 10 offers enough functionalities to build whole applications. Learn how to get, store and display live ship positions.

Warp 10 is truly versatile and offers numerous functionalities. Most of our blog posts focus on one of these functionalities to give you a deep understanding of it. This article takes a different path and describes how to build a complete application with Warp 10, from the data source to the visualization, thus covering several functionalities.

As a reminder, a series of blog posts already uses this approach, with more focus on data exploration: "UFO datasets, the truth is out there…", part 2 and part 3. The tutorial on cyclone data is also worth your time if you want to discover WarpScript.

This blog post will take you through all the steps to build your application, all of them using Warp 10. The idea of the application we will create is to get live ship positions and display them plus other information on a web page:

  1. Get AIS messages from a live TCP stream
  2. Parse these messages
  3. Store the data they contain
  4. Clean the data
  5. Analyze the data
  6. Visualize the results

Read TCP Streams

Warp 10 can create TCP servers and clients with the TCP plugin (it can do UDP too). You first need to enable the plugin by adding to your configuration:

warp10.plugin.tcp = io.warp10.plugins.tcp.TCPWarp10Plugin
tcp.dir = ${standalone.home}/tcp

If you want to keep your configuration clean and easily revert to your previous configuration, you can add all the configuration changes from this blog post to a file named etc/conf.d/99-blog-ais.conf. While you are modifying the configuration, also enable the Debug extension, which will be useful:

warpscript.extension.debug = io.warp10.script.ext.debug.DebugWarpScriptExtension

Create a tcp directory in your Warp 10 home and restart your instance to load your new configuration. We can now add WarpScripts in this directory to make Warp 10 create TCP clients or servers.

Before starting to code WarpScripts, we need a TCP stream of AIS messages to connect to. Fortunately, the Norwegian Coastal Administration provides one. It sends a line of text for each AIS message one of its base stations receives. This is exactly what the TCP plugin expects: lines of UTF-8 text separated by newlines.

Now, we can add our first script to test the connection and visualize the message. Scripts for the TCP plugin must leave a Map on the stack with some configuration. You can create a test.mc2 file in the tcp directory you created earlier and paste the following in this file:

{
  'mode' 'client'
  'host' '153.44.253.27'
  'port' 5631
  'macro' <% STACKTOLIST STDOUT %>
}

If you look into your log file with tail -f logs/warp.log and everything is OK, you will see a lot of messages looking like this:

[[[153.44.253.27, 5631, \s:2573405,c:1615990800*05\!BSVDM,2,1,9,A,53nSd2`00000hGW?C@08u8M<d`4E8000000000160P:44000000000000000,0*58]]]

What does it mean? For each line of text the TCP client receives, the macro defined in the configuration above is called. When it is called, it is given a list of triplets, each triplet containing the IP, the port, and the text message. In the log above, only one triplet is present because the macro is fast and the stream does not send too much data. If the macro takes too much time compared to the flow of incoming messages, the given list of triplets will contain more elements.

The message itself is a text composed of a prefix enclosed between two \ characters, followed by the NMEA message beginning with !. Time to decode this message.
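To make the structure of such a line concrete, here is a minimal Python sketch, run outside of Warp 10, that splits the sample line from the log above into its prefix and its NMEA sentence. The function name split_line and the returned keys are hypothetical. Reading the c field of the prefix as a timestamp in seconds is an assumption based on how the parsing macro later in this post uses it.

```python
def split_line(line):
    """Split a raw line into prefix fields and NMEA sentence fields."""
    # The prefix sits between two backslashes; the sentence follows the last one.
    head, _, sentence = line.rpartition("\\")
    # Drop the leading backslash and the '*05' checksum of the prefix.
    prefix_body = head.lstrip("\\").split("*")[0]
    tags = dict(field.split(":", 1) for field in prefix_body.split(","))
    fields = sentence.split(",")
    return {
        "timestamp": int(tags["c"]),        # assumed to be seconds since epoch
        "fragment_count": int(fields[1]),
        "fragment_number": int(fields[2]),
        "payload": fields[5],
    }


SAMPLE = (r"\s:2573405,c:1615990800*05\!BSVDM,2,1,9,A,"
          r"53nSd2`00000hGW?C@08u8M<d`4E8000000000160P:44000000000000000,0*58")
parsed = split_line(SAMPLE)
print(parsed["timestamp"], parsed["fragment_count"], parsed["fragment_number"])
```

The sample line announces two fragments, so it is a multi-fragment message.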

Parse AIS Messages

Eric S. Raymond wonderfully describes how AIS messages are encoded on this page. As you can see, this is quite complex, so we will only cover the most common messages, types 1, 2, and 3, and only extract the MMSI (ship id), latitude, and longitude.

To do so, we will need some bytes and binary functions most of which are described here. We will also need the neat UNPACK function to extract numbers, coded on X number of bits, and booleans packed in a byte array. This is particularly useful in this case because of how AIS messages are encoded.

In our case, we use the ">U6S2>U30S23>L28>L27" format for UNPACK to decode type 1, 2 and 3 messages:

  1. >U6 decodes the message type as a big-endian, unsigned integer on 6 bits
  2. S2 skips the 2 bits encoding the repeat indicator
  3. >U30 decodes the MMSI as a big-endian, unsigned integer on 30 bits
  4. S23 skips the 23 bits encoding the navigation status, the rate of turn, the speed over ground, and the position accuracy
  5. >L28 decodes the longitude as a big-endian, signed integer on 28 bits
  6. >L27 decodes the latitude as a big-endian, signed integer on 27 bits
  7. The remaining bits are ignored
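The bit layout above can be checked outside of Warp 10 with a small Python sketch. It is only an illustration, not the WarpScript implementation: the helper names (dearmor, read_int, decode_position_report) are hypothetical, and the sample payload is a type 1 payload chosen for illustration, not taken from the Norwegian stream. Longitude and latitude are read as two's-complement values, which is what allows negative (west or south) coordinates.

```python
def dearmor(payload):
    """Turn each armored character into its 6-bit group, as a bit string."""
    groups = []
    for ch in payload:
        value = ord(ch) - 48
        if value > 40:
            value -= 8
        groups.append(format(value, "06b"))
    return "".join(groups)


def read_int(bits, offset, width, signed=False):
    """Read a big-endian integer of `width` bits starting at `offset`."""
    value = int(bits[offset:offset + width], 2)
    if signed and bits[offset] == "1":
        value -= 1 << width  # two's complement
    return value


def decode_position_report(payload):
    bits = dearmor(payload)
    return {
        "message_type": read_int(bits, 0, 6),          # >U6
        "mmsi": read_int(bits, 8, 30),                 # S2 then >U30
        "lon_raw": read_int(bits, 61, 28, signed=True),  # S23 then >L28
        "lat_raw": read_int(bits, 89, 27, signed=True),  # >L27
    }


report = decode_position_report("177KQJ5000G?tO`K>RA1wUbN0TKH")
lon = report["lon_raw"] / 600000.0  # raw values are in 1/10000 of a minute
lat = report["lat_raw"] / 600000.0
print(report["mmsi"], lat, lon)
```

The offsets 8, 61 and 89 are simply the running sums of the field widths in the format string: 6 + 2, then 8 + 30 + 23, then 61 + 28.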

This message must be converted to a GTS to be stored and manipulated by Warp 10. We use a simple model: a single GTS per MMSI, named io.senx.blog.ais{mmsi=messagemmsi}, where messagemmsi is the MMSI found in the message. Each point carries the latitude, the longitude, and a true value. Using boolean values reduces the storage and memory footprint of GTSs.

Now, save the complete macro converting an NMEA message to a GTS in a file located in your Warp 10 home under blog/ais/nmea_to_gts.mc2. We will use it later:

<%
  SAVE '.context' STORE
  ',' SPLIT 'message_split' STORE
  // Ignore NMEA messages spanning several fragments
  $message_split 2 GET '1' != <% 'Unhandled multi-fragment message' MSGFAIL %> IFT
  // Extract timestamp
  $message_split 1 GET 2 10 SUBSTRING TOLONG 'ts' STORE
  // Decode NMEA payload, see https://gpsd.gitlab.io/gpsd/AIVDM.html#_aivdmaivdo_payload_armoring
  ''
  $message_split 6 GET
  <%
    'ASCII' ->BYTES TOLONG 48 -
    DUP 40 > <% 8 - %> IFT
    TOBIN 58 SUBSTRING +
  %>
  FOREACH
  // Decode 1, 2 and 3 messages, see https://gpsd.gitlab.io/gpsd/AIVDM.html#_types_1_2_and_3_position_report_class_a
  BIN-> '>U6S2>U30S23>L28>L27' UNPACK
  LIST-> DROP [ 'message_type' 'mmsi' 'lon_raw' 'lat_raw' ] STORE
  $lon_raw 600000.0 / 'lon' STORE
  $lat_raw 600000.0 / 'lat' STORE
  // Ignore messages other than 1, 2 and 3
  $message_type 3 > <% 'Unhandled message type ' $message_type TOSTRING + MSGFAIL %> IFT
  // Build a GTS holding this single position
  NEWGTS 'io.senx.blog.ais' RENAME
  { 'mmsi' $mmsi TOSTRING } RELABEL
  $ts s $lat $lon NaN T ADDVALUE
  $.context RESTORE
%>

A useful trick, which is very often used in our scripts, is shown here: LIST-> DROP [ ... ] STORE. It allows the storage of each element of a list in a variable.

Store the Data

Now that we are able to read a TCP stream and decode the messages it sends, we need to store the GTSs containing the decoded messages. Because most of the analyses we will do will be on fresh data, we will activate the Warp 10 Accelerator. Add to your configuration:

accelerator = true
accelerator.chunk.count = 16
accelerator.chunk.length = 60000000

This enables the Accelerator, using 16 chunks of 1 minute. This means you will have between 15 and 16 minutes of data in memory. Restart your instance to take this change into account.
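The arithmetic behind these numbers can be sketched in a few lines of Python. This assumes the chunk length is expressed in microseconds (consistent with 60000000 being one minute) and reads the "between 15 and 16 minutes" as the newest chunk being only partially filled. The function name is hypothetical.

```python
US_PER_MINUTE = 60_000_000


def retention_bounds_minutes(chunk_count, chunk_length_us):
    """Return (min, max) minutes of data kept in memory by the chunks."""
    chunk_minutes = chunk_length_us / US_PER_MINUTE
    full_window = chunk_count * chunk_minutes
    # The newest chunk may have just started, so one chunk can be almost empty.
    return (full_window - chunk_minutes, full_window)


bounds = retention_bounds_minutes(16, 60_000_000)
print(bounds)
```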

With the Accelerator, your data can be stored in memory, on disk, or both. Of course, it is much faster to read from and write to memory. This helps make sure the TCP macro reading the stream is not slowed down by writes to the disk. Having a fast TCP macro makes sure the stream is read faster than the data arrives. To store the data only in the cache (in memory) we use the ACCEL.CACHE and ACCEL.NOPERSIST functions.

We also use another neat functionality of Warp 10: secrets in the configuration. The fewer tokens and other secrets are in your macros, the lower the chances that such secrets will leak, because of an erroneous git commit -a for instance. Open your configuration again and add the tokens you intend to use for this application:

readtoken@blog/ais = YOUR_READ_TOKEN
writetoken@blog/ais = YOUR_WRITE_TOKEN

You can now add a macro under blog/ais/store_in_cache.mc2 to store a list of GTS only in the cache of your Accelerated Warp 10.

<%
  ACCEL.CACHE ACCEL.NOPERSIST
  'writetoken' MACROCONFIG
  UPDATE
%>

Combine Reading, Parsing and Storing

We now have all the building blocks to read, parse and store the data in our instance. You can remove the test.mc2 file from the tcp directory and add an ais_sources.mc2 file containing:

{
  'mode' 'client'
  'host' '153.44.253.27'
  'port' 5631
  'macro' <%
    [] 'lgts' STORE // List of GTS resulting from the parsing of the message
    <%
      <%
        // Get TCP payload
        -1 GET 'message' STORE
        $lgts $message @blog/ais/nmea_to_gts +! DROP
      %>
      <% /* Ignore invalid messages */ %>
      <% %>
      TRY
    %>
    FOREACH
    // Update in Accelerator cache
    $lgts @blog/ais/store_in_cache
  %>
}

This file will make your instance ingest the TCP stream and store the data in memory. Notice the negative indexing, which works with GET, ATINDEX, etc. It works like in Python: when the index is negative, the index actually used is this index plus the length of the collection.
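Since the comparison is with Python, here is the rule written out as a short Python sketch. The resolve_index helper is hypothetical and only spells out the "index plus length" rule. The triplet mimics the (IP, port, message) triplets received by the TCP macro, with a placeholder message.

```python
def resolve_index(index, length):
    """Return the effective index for a possibly negative index."""
    return index + length if index < 0 else index


# Same shape as the triplets given to the TCP macro: IP, port, text message.
triplet = ["153.44.253.27", 5631, "placeholder message"]

effective = resolve_index(-1, len(triplet))
print(effective, triplet[-1] == triplet[effective])
```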

Persist the Data

The data is here, but only for 15 minutes! It would be a pity to let this data go to waste, so we will deploy a runner to periodically get the data in memory and persist it. As you remember, we stored the data in memory both to make it rapidly accessible to analysis scripts and to avoid slowing down the handling of TCP messages. By persisting the data with a runner, we don't impact the performance of other scripts because it is done in another thread, and in a more optimized way because we use large batches of data.

We will also protect our secrets here by putting the tokens used by the runner in the configuration. Add the following to the configuration then restart your instance:

readtoken@/blog_ais = "YOUR_READ_TOKEN"
writetoken@/blog_ais = "YOUR_WRITE_TOKEN"

Notice that the mechanism is a bit different because the replacement will be done verbatim in the runner script. The token must be quoted for the replacement to produce a valid WarpScript.

The period at which you persist the data depends on the amount of data you're OK to lose in case of a crash. However, if you update your data too often, it will be less efficient because you will make a lot of calls, each with a small amount of data. We can start with a periodicity of 10 seconds by adding a script in warpscripts/blog_ais/10000/runner_persist.mc2 with the following content:

// Load data in cache
ACCEL.CACHE ACCEL.NOPERSIST
[
  ${readtoken}
  'io.senx.blog.ais'
  {}
  NOW
  $runner.periodicity ms 1.2 * TOLONG // Take a bit more than periodicity to account for imprecision in runner scheduling
]
FETCH
// Rename with same name to allow UPDATE
'+' RENAME
// Store on disk
ACCEL.NOCACHE ACCEL.PERSIST
${writetoken}
UPDATE

Clean the Data

If you look at some data, you rapidly notice that there are some points far from Norway:

[
  'YOUR_READ_TOKEN'
  'io.senx.blog.ais'
  {}
  NOW
  1 m
]
FETCH

This is because some AIS messages provide wrong locations or are corrupted, as we didn't verify the checksum. We have to make a macro to filter out those points to get more meaningful analyses and nicer visualizations. You may already know about the GEO.WKT and GEO.JSON functions, but there is a proper way to use them in macros.

If you call these functions each time you want to remove outliers, it will be slow because it has to compute the cells covering the defined area. A better approach is to compute the shape once and for all, and reuse it for each call, even without using GEOPACK! This can be done with early evaluation, !$. Create a file blog/ais/keep_around_norway.mc2 containing:

'POLYGON((-1.2402343750000178 72.66452333828568,34.70703124999998 72.66452333828568,34.70703124999998 53.96582812906815,-1.2402343750000178 53.96582812906815,-1.2402343750000178 72.66452333828568))'
10 false
GEO.WKT
'shape' STORE
<%
  [ SWAP !$shape mapper.geo.within 0 0 0 ] MAP
%>

Calling this macro reuses the shape instance already created without recomputing it, which saves a lot of execution time.
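The "compute once, reuse on every call" idea is not specific to WarpScript. As a rough Python analogy, a closure can capture a prebuilt shape the same way the macro captures it through early evaluation. This sketch makes a strong simplification: the shape is reduced to the bounding box of the polygon above, whereas Warp 10 computes a set of covering cells. All names are hypothetical.

```python
def make_bbox_filter(min_lon, min_lat, max_lon, max_lat):
    """Build the shape once and return a filter that reuses it."""
    bounds = (min_lon, min_lat, max_lon, max_lat)  # computed a single time

    def keep(points):
        # points is a list of (lat, lon) pairs; keep those inside the box.
        return [
            (lat, lon)
            for lat, lon in points
            if bounds[0] <= lon <= bounds[2] and bounds[1] <= lat <= bounds[3]
        ]

    return keep


# Corners taken from the POLYGON used in keep_around_norway.mc2.
keep_around_norway = make_bbox_filter(
    -1.2402343750000178, 53.96582812906815,
    34.70703124999998, 72.66452333828568,
)

kept = keep_around_norway([(59.91, 10.75), (0.0, 0.0)])
print(kept)
```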

Analyze the Data

In this section, we prepare some macros to analyze and prepare the data for visualization. The simplest query is to get the data for the last 15 minutes, thus making sure it is fetched from the cache. Create a blog/ais/recent_locs.mc2 file containing:

<%
  [ 'readtoken' MACROCONFIG 'io.senx.blog.ais' {} NOW 15 m ] FETCH
  @blog/ais/keep_around_norway
  // Keep a consistent order, by MMSI
  <% LABELS 'mmsi' GET TOLONG %> SORTBY
%>

Nothing fancy here: as FETCH knows the time window is included in the cache time window, it fetches the data from the cache. We also filter out data far from Norway and sort by MMSI so that successive requests return the GTSs in the same order. Finally, we use the MACROCONFIG function to avoid exposing the token.

And we can add a bit more filtering by only keeping ships going above 30 kts, which is pretty fast on water. Add the blog/ais/recent_speeding.mc2 file:

<%
  NOW 'now' STORE
  [ 'readtoken' MACROCONFIG 'io.senx.blog.ais' {} $now 15 m ] FETCH
  @blog/ais/keep_around_norway
  <% LABELS 'mmsi' GET TOLONG %> SORTBY
  // Compute speed using locations, result in m/s
  [ SWAP mapper.hspeed 1 0 0 ] MAP
  // Keep speeds above 30 kts, which is 30 / 1.944 = 15.432 m/s
  [ SWAP 30 1.944 / mapper.gt 1 0 0 ] MAP
  NONEMPTY
%>
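The threshold used in this macro comes from a unit conversion that can be double-checked with a few lines of Python. The sketch uses the exact definition of the knot (1852 metres per hour) rather than the rounded 1.944 factor, so the result differs slightly in the third decimal. The function name is hypothetical.

```python
METERS_PER_NAUTICAL_MILE = 1852
SECONDS_PER_HOUR = 3600


def knots_to_meters_per_second(knots):
    """Convert a speed in knots to metres per second."""
    return knots * METERS_PER_NAUTICAL_MILE / SECONDS_PER_HOUR


threshold = knots_to_meters_per_second(30)
approximation = 30 / 1.944  # the rounded factor used in the macro
print(round(threshold, 3), round(approximation, 3))
```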

Of course, we're not limited to returning GTSs of positions. We can also return a series of values, for instance, the number of emitters per minute. Store that in blog/ais/recent_emissions.mc2:

<%
  NOW 'now' STORE
  [ 'readtoken' MACROCONFIG 'io.senx.blog.ais' {} $now 15 m ] FETCH
  @blog/ais/keep_around_norway
  [ SWAP bucketizer.last $now 1 m / 1 m * 1 m 14 ] BUCKETIZE
  [ SWAP [] reducer.count.exclude-nulls ] REDUCE
%>

This script takes advantage of the BUCKETIZE + REDUCE combination which makes it easy to align and combine ticks. We also align buckets on full minutes using 1 m / 1 m * which snaps a timestamp to the full minute before it.
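The snapping trick relies on integer division dropping the remainder. Here is the same computation as a Python sketch, assuming timestamps in microseconds. The names snap_to_minute and bucket_ends are hypothetical. The second function lists the end ticks of 14 one-minute buckets, the last one ending on the snapped timestamp.

```python
MINUTE_US = 60_000_000  # one minute, assuming microsecond timestamps


def snap_to_minute(ts_us):
    """Floor a timestamp to the full minute before it (1 m / 1 m *)."""
    return ts_us // MINUTE_US * MINUTE_US


def bucket_ends(last_bucket_us, span_us, count):
    """End ticks of `count` buckets of `span_us`, the last ending at last_bucket_us."""
    return [last_bucket_us - i * span_us for i in reversed(range(count))]


now = 1_615_990_845_123_456  # 45.123456 s after a full minute
snapped = snap_to_minute(now)
ends = bucket_ends(snapped, MINUTE_US, 14)
print(snapped, len(ends))
```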

A bit more complex, but very important when monitoring AIS signals: detecting a ship that stops sending AIS messages. The frequency at which a ship sends its messages is not clearly defined. For this example, we consider that a ship has lost its signal when it has not sent any AIS message in the last 5 minutes but has sent at least one message in the last 10 minutes. Create blog/ais/recent_losses.mc2 and add:

<%
  NOW 'now' STORE
  [ 'readtoken' MACROCONFIG 'io.senx.blog.ais' {} $now 10 m ] FETCH
  @blog/ais/keep_around_norway
  $now 'last_bucket' STORE
  [ SWAP bucketizer.last $last_bucket 5 m 2 ] BUCKETIZE
  [] SWAP
  <%
    SORT 'gts' STORE
    [
      <% $gts SIZE 1 == %>
      <% $gts -1 ATINDEX 0 GET $last_bucket != %>
    ] AND
    <% $gts +! %> IFT
  %>
  FOREACH
%>

To solve that, we fetch 10 minutes of data from the cache and make two buckets of 5 minutes, with the last one ending at $now. If the last bucket containing data does not end at $now, the signal has been lost. Notice we use a variable to store NOW: calling NOW several times yields different results, which would break the logic of the script. We also use a little-known feature of the AND function: short-circuit evaluation. When given a list of macros, AND stops at the first macro returning false and returns false. This makes sure that -1 ATINDEX will not fail because the GTS is empty.
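The same decision rule can be written as a Python sketch to make the two-bucket logic and the short-circuit explicit. It is an illustration under simplifying assumptions: each ship is just a list of message timestamps in microseconds, and the names bucket_end and lost_signal are hypothetical.

```python
MINUTE_US = 60_000_000


def bucket_end(ts, last_bucket, span):
    """End tick of the bucket containing ts, buckets being aligned on last_bucket."""
    return last_bucket - (last_bucket - ts) // span * span


def lost_signal(timestamps, now, span=5 * MINUTE_US):
    """True if the ship emitted in the older bucket but not in the latest one."""
    ticks = sorted({
        bucket_end(ts, now, span)
        for ts in timestamps
        if now - 2 * span < ts <= now
    })
    # `and` short-circuits: ticks[-1] is only read when exactly one bucket has data.
    return len(ticks) == 1 and ticks[-1] != now


now = 1_615_990_800_000_000
silent_ship = [now - 7 * MINUTE_US]
active_ship = [now - 7 * MINUTE_US, now - 1 * MINUTE_US]
print(lost_signal(silent_ship, now), lost_signal(active_ship, now), lost_signal([], now))
```

The empty list is the case the short-circuit protects: without it, reading the last tick would raise an error.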

We finish this section with a last script showing how to take advantage of cached and persisted data. Depending on the parameters given to FETCH and the calls to ACCEL.CACHE, ACCEL.NOCACHE, ACCEL.PERSIST and ACCEL.NOPERSIST, FETCH will either get the data from the cache or the persistent storage. We can use that to our advantage to get all known MMSI being more than 5 minutes old from the persistent storage and compare it to the MMSIs in cache. We can then detect MMSIs we never heard of before. Create the blog/ais/new_emitters.mc2 file with the following content:

<%
  NOW 'now' STORE
  [
    'readtoken' MACROCONFIG
    'io.senx.blog.ais'
    {}
    $now 5 m -
    -1 // This will force FETCH to use persistent storage
  ] FETCH
  @blog/ais/keep_around_norway
  'old_emitters' STORE
  ACCEL.CACHE ACCEL.NOPERSIST
  [
    'readtoken' MACROCONFIG
    'io.senx.blog.ais'
    {}
    $now
    -1 // We forced cache/nopersist so even with this parameter, cache is used
  ] FETCH
  @blog/ais/keep_around_norway
  'recent_emitters' STORE
  // Compute the difference of MMSIs using sets
  $recent_emitters <% LABELS 'mmsi' GET %> F LMAP ->SET
  $old_emitters <% LABELS 'mmsi' GET %> F LMAP ->SET
  DIFFERENCE
  SET->
  'new_emitters_mmsi' STORE
  // Fetch the recent data for these MMSIs
  [
    'readtoken' MACROCONFIG
    'io.senx.blog.ais'
    { 'mmsi' '~' $new_emitters_mmsi REOPTALT + }
    $now 15 m
  ] FETCH
  @blog/ais/keep_around_norway
%>

To determine the new MMSIs, we use Sets to compute the difference between lists of MMSIs. On the resulting List, we use the very useful REOPTALT function, which creates a regexp from a list of strings. This regexp matches only the elements of the given List, allowing us to fetch the right MMSIs.
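As an illustration of the set difference followed by a regexp selector, here is a Python sketch with made-up MMSIs. It builds a plain alternation, which is simpler than what REOPTALT does (REOPTALT produces an optimized regexp), so treat it as an approximation of the idea rather than of the function. The function name is hypothetical.

```python
import re


def new_emitters_selector(recent_mmsis, old_mmsis):
    """Return the MMSIs seen only recently and a regexp matching exactly them."""
    new_mmsis = sorted(set(recent_mmsis) - set(old_mmsis))
    # Plain alternation of escaped strings; REOPTALT would optimize this further.
    pattern = "|".join(re.escape(mmsi) for mmsi in new_mmsis)
    return new_mmsis, pattern


# Made-up MMSIs for illustration only.
recent = ["257000001", "257000002", "258000003"]
old = ["257000001"]

new_mmsis, pattern = new_emitters_selector(recent, old)
label_selector = "~" + pattern  # same '~' prefix as in the FETCH label selector
print(new_mmsis, label_selector)
```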

Visualize Analysis

We now have more than enough macros to feed our dashboard, so we will use Discovery to create a dashboard displaying the results of our macros. We create 5 tiles, one for each of our 5 macros, all of type map except for the number of emissions, which is of type line. We also add auto-refresh to all our tiles to be able to visualize the live data.

You first need to enable the HTTP plugin for your Warp 10 instance. Add these lines to your configuration:

warp10.plugin.http = io.warp10.plugins.http.HTTPWarp10Plugin
http.host = 0.0.0.0
http.port = 12345
http.dir = ${standalone.home}/http

Once done, restart your instance. It will be the last time! You can now add a WarpScript in the http directory defining a configuration, in the same way as you did for TCP. Create a blog_ais_dashboard.mc2 file in the http directory and paste this:

{
  'path' '/blog/ais/dashboard'
  'prefix' false
  'parsePayload' true
  'macro' <%
    [
      {
        'x' 0 'y' 0 'w' 5 'h' 4
        'type' 'map'
        'endpoint' 'http://localhost:8888/api/v0/exec'
        'macro' <% @blog/ais/recent_locs %>
        'options' { 'autoRefresh' 300 }
      }
      {
        'title' 'Number of emission per minute'
        'x' 5 'y' 0 'w' 4 'h' 1
        'type' 'line'
        'endpoint' 'http://localhost:8888/api/v0/exec'
        'macro' <% @blog/ais/recent_emissions %>
        'options' { 'autoRefresh' 60 }
      }
      {
        'title' 'Loss of signal'
        'x' 5 'y' 1 'w' 4 'h' 3
        'type' 'map'
        'endpoint' 'http://localhost:8888/api/v0/exec'
        'macro' <% @blog/ais/recent_losses %>
        'options' { 'autoRefresh' 300 }
      }
      {
        'title' 'Over 30 kts'
        'x' 9 'y' 0 'w' 3 'h' 2
        'type' 'map'
        'endpoint' 'http://localhost:8888/api/v0/exec'
        'macro' <% @blog/ais/recent_speeding %>
        'options' { 'autoRefresh' 300 }
      }
      {
        'title' 'New MMSIs'
        'x' 9 'y' 2 'w' 3 'h' 2
        'type' 'map'
        'endpoint' 'http://localhost:8888/api/v0/exec'
        'macro' <% @blog/ais/new_emitters %>
        'options' { 'autoRefresh' 300 }
      }
    ] 'tiles' STORE
    {
      'title' 'My Legendary AIS Dashboard!'
      'tiles' $tiles
    }
    @senx/discovery/render
    'body' STORE
    {
      'body' $body
      'headers' {
        'Content-Type' 'text/html'
        'Access-Control-Allow-Origin' '*'
      }
    }
  %>
}

You can now go to http://127.0.0.1:12345/blog/ais/dashboard and admire the result of all this hard work. If you've just restarted your instance, wait some time for the cache to fill up to get the best results from your macros. Congratulations, you now have live monitoring of ships around Norway!

The resulting AIS dashboard with Discovery.

Takeaways

Although this blog post covered a lot of Warp 10 functionalities, many more exist, either built-in or in extensions! Nonetheless, we showed that it is possible to build a complete application from the data source to the data visualization. The first step is the ingestion of data through TCP, which is a very efficient way of ingesting data. WarpScript also offers byte- and bit-related functions, which allowed us to parse NMEA/AIS messages.

We also covered the use of the Warp 10 Accelerator to optimize the storage both for reads and writes. WarpScript macros optimizations and secret-protection tips and tricks have been described to further improve the quality of your scripts.

Finally, WarpScript functions useful for analysis and Discovery dashboard have been presented. This proves that Warp 10 is not only a platform to read and write data, but also a powerful tool to analyze and visualize data.

Next, you can make an even richer analysis, which can rely on spatio-temporal indexing of the data. You can also improve the new_emitters script to keep the list of known MMSIs in SHM for better performance.