Leveraging WarpScript from Pig to analyze your time series

The integration of WarpScript with Pig brings the power of advanced time series analytics to big data payloads

Leveraging WarpScript with Pig

In the data processing world, Pig is a little known gem. Long before Spark or Flink came along, in 2008, Yahoo! designed a language to express data processing tasks without having to write full-blown Map Reduce jobs. This language is called Pig Latin and is executed by a system called Pig, now an Apache project.

While Pig is great for manipulating data, it lacks, just like Spark and Flink, provision for time series as first-class citizens. This is what the integration of WarpScript in Pig aims at solving by providing the ability to have time series as values in Pig and bringing the power and flexibility of its 1000+ native functions and an ocean of macros.

This article aims at showing you how you can leverage WarpScript from Pig.

Learn more about WarpScript with this tutorial using a tropical cyclones dataset.

Making WarpScript visible to Pig

The integration of WarpScript in Pig is done via a .jar file available on Bintray which can be retrieved like other Maven artifacts. Pig has the ability to fetch at runtime such dependencies. In order for this feature to work, you need to create a file called ivysettings.xml with the following content:

<ivysettings>
  <settings defaultResolver="downloadGrapes"/>
  <resolvers>
    <chain name="downloadGrapes">
      <ibiblio name="central" m2compatible="true"/>
      <ibiblio name="senx-bintray" root="https://dl.bintray.com/senx/maven" m2compatible="true" checkmodified="true" changingMatcher="regexp" />
      <ibiblio name="hbs-bintray" root="https://dl.bintray.com/hbs/maven" m2compatible="true" checkmodified="true" changingMatcher="regexp" />
    </chain>
  </resolvers>
</ivysettings>

Then you need to set the PIG_OPTS environment variable to the following value:

export PIG_OPTS=-Dgrape.config=/path/to/your/ivysettings.xml

This will instruct Pig to use your newly created ivysettings.xml file to resolve dependencies.

You then need to specify the .jar file in your Pig Script via a REGISTER command:

REGISTER ivy://io.warp10:warp10-pig:1.0.6;

This will instruct Pig to fetch the io.warp10:warp10-pig:1.0.6 artifact and all its dependencies.

Lastly, you need to instruct Pig to not combine input splits. This is needed because the recent version of Pig ignores splits which report a length of 0, and all splits from Warp 10 up to 2.7.0 do exactly that. So add the following to your Pig Script:

SET pig.splitCombination false;

What is provided?

The warp10-pig dependency provides a WarpScript runtime and a set of classes for accessing this runtime from Pig Latin scripts. Those classes add loading functions and a UDF for executing WarpScript code.

The loading functions allow retrieving data from a Warp 10 instance and applying WarpScript code to key/value pairs as they are being loaded thus leading to load time transformation of data. We will demonstrate both later.

The UDF allows calling any WarpScript code, including external macros retrieved from WarpFleet repositories.

Configuring Warp 10

Before you can use WarpScript, some configuration needs to be specified, so the runtime environment knows how to behave. The bare minimum that needs to be specified is the Warp 10 time units to use.

This, and other configurations are specified using Pig Latin SET statements:

SET warp.timeunits 'us';
SET warpscript.extension.debug 'io.warp10.script.ext.debug.DebugWarpScriptExtension';
SET warpscript.update.endpoint 'http://127.0.0.1:8080/api/v0/update';

Here we instruct the WarpScript runtime to use microseconds as the time units, we load the DebugWarpScriptExtension and we set the update endpoint so the UPDATE function can be used. Add any other configuration you wish.

Discover how to use Apache Arrow format with WarpScript.

Calling WarpScript on data

A dedicated UDF called WarpScriptRun allows you to process data from Pig Latin relations using WarpScript. You need to register this UDF first using:

DEFINE WarpScriptRun io.warp10.pig.WarpScriptRun();

The UDF is then called with 1 to N parameters, the first parameter is the WarpScript code to execute, either as-is or via the execution of code contained in a file when using the syntax %file.mc2 (if the file contains a macro) or @file.mc2 if it contains code outside a macro definition. Note that using these syntaxes requires that you REGISTER the files in your Pig Latin script. The other parameters will be pushed onto the WarpScript stack starting with the last.

The return value of the call will be a tuple containing one element per stack level with a value. The top of the stack will be the first element of the output tuple.

List values are converted to a Pig Latin tuples and vectors, created with ->V, are converted to Pig Latin data bags.

Conversely, parameters of type data bag are available in WarpScript as a value which can be iterated using FOREACH.

Loading data from Warp 10

Geo Time Series can be fetched from a Warp 10 instance using the Warp10LoadFunc. This loading function outputs key/value pairs where keys are ids unique for a given GTS and the values are byte arrays containing wrapped chunks of Geo Time Series. The schema of the output tuples is (id:chararray,data:bytearray). A single series might span multiple chunks depending on its size. Each wrapped chunk can be decoded using the UNWRAP function.

The use of the Warp10LoadFunc relies on parameters set in the Pig Latin script and suffixed with a key passed to the Warp10LoadFunc call:

--
-- LOAD parameters
--
SET warp10.splits.endpoint.LOADSUFFIX 'http://127.0.0.1:8080/api/v0/splits';
SET warp10.fetcher.fallbacks.LOADSUFFIX '127.0.0.1';
SET warp10.fetcher.fallbacksonly.LOADSUFFIX 'true';
SET warp10.fetcher.protocol.LOADSUFFIX 'http';
SET warp10.fetcher.port.LOADSUFFIX '8080';
SET warp10.fetcher.path.LOADSUFFIX '/api/v0/sfetch';
SET warp10.splits.selector.LOADSUFFIX 'test{}';
SET warp10.splits.token.LOADSUFFIX 'READ';
SET warp10.fetch.now.LOADSUFFIX '9223372036854775807';
SET warp10.fetch.timespan.LOADSUFFIX '-9223372036854775807';
SET warp10.max.splits.LOADSUFFIX '10';

--
-- Load data from Warp 10, using the configuration above suffixed with '.LOADSUFFIX'
--
GTS = LOAD 'xxx' USING io.warp10.pig.Warp10LoadFunc('LOADSUFFIX') AS (id:chararray, wrapper: bytearray);

If you want to test it on a standalone, you have to enable splits on your instance configuring: standalone.splits.enable = true

Processing data at load time

It may happen that your data does not reside in Warp 10 but in another data source that Pig Latin may be able to read. Very often, those data sources contain one record per observation (timestamp, value). So in order to be able to efficiently process those data, chunks of Geo Time Series need to be reconstructed.

This can be done by grouping all records together and using WarpScriptRun to execute code that will iterate on a data bag and create those chunks, or it can be done directly at load time using the WarpScriptInputFormat. The former solution requires a grouping step which may involve copying quite some amount of data around and may, therefore, prove suboptimal.

The latter is performed at the map stage of the Pig job and is therefore way more efficient especially when your data volume grows.

The WarpScriptInputFormat wraps another Hadoop InputFormat and can, therefore, process a very diverse set of data. For every key/value pair read by the wrapped InputFormat, the configured WarpScript code is called with a flag indicating whether the underlying input format is done and if not with the last key/value pair it read. This code can then issue from 0 to N key/value pairs of its own.

This mechanism allows, for example, the WarpScript code to build Geo Time Series from individual observations and output those as chunks either when all input has been processed or periodically when a size or count threshold is reached.

The WarpScriptInputFormat is leveraged as depicted below:

-- Register all .mc2 files needed by the job
REGISTER wsif.mc2;

-- Warp 10 configuration
SET warp.timeunits 'us';

-- Configuration for the WarpScriptInputFormat
SET pig.genericload.inputformat.XXX 'io.warp10.hadoop.WarpScriptInputFormat';

-- Class of the wrapped InputFormat
SET warpscript.inputformat.class 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat';

-- WarpScriptCode to execute on each input tuple
SET warpscript.inputformat.script '%wsif.mc2';

-- Parameter for the wrapped InputFormat
SET mapreduce.input.fileinputformat.inputdir '${input}';

-- Load CSV data using the XXX configuration
GTS = LOAD '/dev/null' USING io.warp10.pig.GenericLoadFunc('XXX') AS (discard,wrapper:bytearray);

The wsif.mc2 script is shown below, it consumes CSV lines which are expected to contain a latitude, a longitude, and a timestamp in ISO8601 format and outputs chunks of GTS at the end of a split:

<%
  NEWENCODER 'encoder' CSTORE  
  'done' STORE

  $done NOT
  <%
    // Store key and value (offset and line)
    [ 'key' 'value' ] STORE        

    // Split CSV
    $value ',' SPLIT LIST-> DROP [ 'lat' 'lon' 'ts' ] STORE    

    // Add value to GTS Encoder
    $encoder $ts TOTIMESTAMP $lat TODOUBLE $lon TODOUBLE NaN T ADDVALUE DROP
  %>
  <%
    [ NULL $encoder WRAP ]
  %>
  IFTE
%>

Storing data in Warp 10

Last piece of the puzzle, the output of your job. While you can store the results in files using any of the output options supported by Pig, you can also push your data to Warp 10.

You can achieve this in two ways. The first involves calling UPDATE from WarpScript code executed in a call to WarpScriptRun inside a Pig Latin FOREACH ... GENERATE construct. For this option to work, make sure you have configured the warpscript.update.endpoint as shown above.

The second possibility is to use the Warp10StoreFunc provided by the Pig integration. You can use this StoreFunc as below:

--
-- STORE parameters
--
SET warp10.endpoint.STORESUFFIX 'http://127.0.0.1:8080/api/v0/update';
SET warp10.token.STORESUFFIX 'WRITE';
SET warp10.gzip.STORESUFFIX 'true';
SET warp10.maxrate.STORESUFFIX '1000000'; -- 1 million datapoints/s limit

--
-- Build your WRAPPERS here
--

...

--
-- Store data in Warp 10
--
STORE WRAPPERS INTO 'xxx' USING io.warp10.pig.Warp10StoreFunc('STORESUFFIX');

The relation to store must contain either chararray elements in the Geo Time Series format (TS/LAT:LON/ELEV CLASS{LABELS} VALUE) or GTS wrappers produced by WRAP, bytearray elements containing GTS wrapper produced by WRAPRAW or bags of such elements.

Putting it all together

Once you have written your Pig Latin script, run it using the following pig command:

pig -x local -F -l pig.log script.pig

You can replace local by mapreduce or tez if you have such an environment available. You also need to add any parameter you are using in your script using relevant -p param=value options passed to pig.

Lastly, you need to have set the JAVA_HOME and PIG_OPTS environment variables prior to calling pig.

Takeaways

The integration of WarpScript with Pig opens a world of possibilities. In our own experience Pig has proven easier to use than Spark or Flink, mostly because it is far more tolerant when it comes to specifying schemas which are almost always optional.

Also, Pig's overflowing to disk mechanism makes it less vulnerable to out-of-memory errors.

As Pig can be leveraged from data science environments such as Zeppelin or Jupyter, it is the perfect arrow to add to your quiver for manipulating large volumes of data.

Keep reading on using Pig with Warp 10 and WarpScript.