Schedule Warp 10 tasks with runners

Schedule Warp 10 tasks with runners, like down-sampling, archiving, deletions, computations and so on.

Schedule Warp 10 tasks with runners

Ever wonder how to downs-sample old data or to do some computation periodically? Warp 10 uses runners for that. We will cover the basics of runners and a simple use case.

Runners are scheduled tasks in Warp 10, written in WarpScript.

How to basic

In your Warp 10 installation folder, there is a directory called warpscripts which contains, by default, this folder hierarchy

/path/to/warp10
       [...]
         |- warpscripts
            |- test
               |- 60000
                  |- runner.mc2.DISABLE

What it means: for the domain “test”, each “60000” ms, all files with the “mc2” extension will be executed as a WarpScript.

Let’s test it: rename runner.mc2.DISABLE to runner.mc2 and edit it:

//
// Warpscript code for this runner
// This script must have a '.mc2' extension.
//
NOW ISO8601 LOGMSG

We will use the LOGMSG function. In order to use it, we have to activate the DebugWarpScriptExtension by creating a custom configuration file:

warpscript.extension.debug = io.warp10.script.ext.debug.DebugWarpScriptExtension

Now restart Warp 10 and have a look at the file: /path/to/warp10/logs/warp10.log:

tail -f -n 100 /path/to/warp10/logs/warp10.log

You will see a line per minute (60000 ms) :

[1632130303832] 2021-09-20T09:31:43.832517Z
[1632130363841] 2021-09-20T09:32:43.841663Z
[1632130423848] 2021-09-20T09:33:43.848062Z
[1632130483852] 2021-09-20T09:34:43.852193Z

Of course, you can add any subdirectory you want according to your domain (instead of using “test”) and any subfolder named by the desired frequency in ms, i.e.:

/path/to/warp10
       [...]
         |- warpscripts
            |- senx
               |- 10000
               |  |- myRunner.mc2
               |  |- myOtherRunner;mc2
               |- 30000
                  |- anotherRunner.mc2 

There are some configuration keys in the Warp 10 configuration file /path/to/warp10/etc/conf.d/10-runner.conf, have a look.

Sample use case

Imagine we have data (random data in our case), we want to do some computations (subsampling with the mean value) and store it for later use.

First, generate a random series with a point every 10 seconds:

NEWGTS 'gts' STORE
0.0 'v' STORE
// for (i = 1; i <= 50000; i++)
1 50000 <% 
  'i' STORE
  NOW $i 10 s * - 'ts' STORE                   // computed timestamp
  $v RAND 0.5 - + 'v' STORE                    // computed value
  $gts $ts NaN NaN NaN $v ADDVALUE 'gts' STORE // add datapoint
%> FOR
// display result
$gts

https://snapshot.senx.io/0005cc912308cdc5-0-2-282dfd693a5467a9

Random data
Random data

Learn more about Random Numbers.

Now test the hourly mean value computation in WarpStudio:

NEWGTS 'gts' STORE
0.0 'v' STORE
// for (i = 1; i <= 50000; i++)
1 50000 <% 
  'i' STORE
  NOW $i 10 s * - 'ts' STORE                   // computed timestamp
  $v RAND 0.5 - + 'v' STORE                    // computed value
  $gts $ts NaN NaN NaN $v ADDVALUE 'gts' STORE // add datapoint
%> FOR
// retrieve the last timestamp
$gts LASTTICK 'last' STORE
// keep only the hourly mean value
[ $gts bucketizer.mean $last 1 h 0 ] BUCKETIZE // the sub sampled series
$gts // the original
original and sub-sampled series
Original and sub-sampled series

It’s time to code our runner.

We will code a runner dedicated to data insertion, a data point each second with a value shared through SHM. (Read this blog post about SHM!)

Now edit /path/to/warp10/warpscripts/test/1000/randomData.mc2:

// Generates a random datapoint each second
<%
  <%
    // try to read last value from SHared Memory
    'value' SHMLOAD 'v' STORE
  %>
  <%
    // when not found, set default value
    0.0 'v' STORE
  %>
  <%
    // finally, load the reference from SHM and store it
    $v RAND 0.5 - + 'v' STORE           // computes the new value
    NULL 'value' SHMSTORE               // empty the store
    $v 'value' SHMSTORE                 // save it in shared memory
    NEWGTS 'random' RENAME 'gts' STORE  // create a new series
    $gts NOW NaN NaN NaN $v ADDVALUE    // add datapoint
    '<your write token>' UPDATE         // insert it
%> 'myMutex' MUTEX // prevent a concurrent execution on the same SHM data

You can have a look at the generated series:

[ '<your read token>' 'random' {}  NOW 1 h ] FETCH
Random series

Now we will code a new runner which:

  • Runs every minute
  • Fetches the last 5 minutes of data
  • Computes a moving average over a window of 5 minutes (or whatever you want)
  • Computes the mean value of all the values of this time window (this is the sub-sampling, 1 point per 5 minutes, but it is just an example)
  • Inserts this new value as a GTS
  • Deletes all data points older than one hour.

Now edit /path/to/warp10/warpscripts/test/60000/moving-avg.mc2

// get the last 5 minutes of data
[ '<your read token>' 'random' {} NOW 5 m ] FETCH 'gts' STORE
// computes the upper time boundary as a plain minute 
$gts LASTTICK ->TSELEMENTS [ 0 4 ] SUBLIST TSELEMENTS-> 'last' STORE
// computes the moving average
[ $gts mapper.mean 5 m 0 0 ] MAP 'gts' STORE
// computes the mean value
[ $gts bucketizer.mean $last 0 1 ] BUCKETIZE 0 GET 
// rename the resulting GTS
'random.avg' RENAME 'gts' STORE 
'<your write token>' 'token' STORE
// insert the resulting GTS
$gts $token UPDATE
// delete old datapoints
$token 'random' 
MINLONG   // start timestamp
NOW 1 h - // end timestamp
MAXLONG   // count
DELETE

After a while, you can see the result directly in WarpStudio:

[
  '<your read token>'
  '~random.*' {} 
  NOW 2 h
] FETCH 
Sub sampled data
Sub sampled data

Going further

In real life, of course, you will have to handle real data and perform more realistic computation. But the concept remains the same.

You should use macros instead of having all your logic in runners. Your runners should just call macros. You should even automate the deployment of your macros.

And finally, do not delete your data, make archives, into a S3 for example! You never know if you will need it someday.