The integration of WarpScript with Pig brings the power of advanced time series analytics to big data payloads
In the data processing world, Pig is a little-known gem. Long before Spark or Flink came along, in 2008, Yahoo! designed a language to express data processing tasks without having to write full-blown MapReduce jobs. This language is called Pig Latin, and it is executed by a system called Pig, now an Apache project.
While Pig is great for manipulating data, it lacks, just like Spark and Flink, support for time series as first-class citizens. This is what the integration of WarpScript in Pig aims to solve, by making time series available as values in Pig and bringing along the power and flexibility of over 1000 native WarpScript functions and an ocean of macros.
This article shows you how to leverage WarpScript from Pig.
Making WarpScript visible to Pig
The integration of WarpScript in Pig is done via a .jar file available on Bintray which can be retrieved like other Maven artifacts. Pig has the ability to fetch such dependencies at runtime. In order for this feature to work, you need to create a file called ivysettings.xml with the following content:
<ivysettings>
  <settings defaultResolver="downloadGrapes"/>
  <resolvers>
    <chain name="downloadGrapes">
      <ibiblio name="central" m2compatible="true"/>
      <ibiblio name="senx-bintray" root="https://dl.bintray.com/senx/maven" m2compatible="true" checkmodified="true" changingMatcher="regexp"/>
      <ibiblio name="hbs-bintray" root="https://dl.bintray.com/hbs/maven" m2compatible="true" checkmodified="true" changingMatcher="regexp"/>
    </chain>
  </resolvers>
</ivysettings>
Then you need to set the PIG_OPTS environment variable to the following value:
export PIG_OPTS=-Dgrape.config=/path/to/your/ivysettings.xml
This will instruct Pig to use your newly created ivysettings.xml file to resolve dependencies.
You then need to specify the .jar file in your Pig script via a REGISTER command:
REGISTER ivy://io.warp10:warp10-pig:1.0.6;
This will instruct Pig to fetch the io.warp10:warp10-pig:1.0.6 artifact and all its dependencies.
Lastly, you need to instruct Pig not to combine input splits. This is needed because recent versions of Pig ignore splits which report a length of 0, and all splits from Warp 10 up to 2.7.0 do exactly that. So add the following to your Pig script:
SET pig.splitCombination false;
What is provided?
The warp10-pig dependency provides a WarpScript runtime and a set of classes for accessing this runtime from Pig Latin scripts. Those classes add loading functions and a UDF for executing WarpScript code.
The loading functions allow retrieving data from a Warp 10 instance and applying WarpScript code to key/value pairs as they are being loaded, thus enabling load-time transformation of data. We will demonstrate both later.
The UDF allows calling any WarpScript code, including external macros retrieved from WarpFleet repositories.
Configuring Warp 10
Before you can use WarpScript, some configuration needs to be specified so that the runtime environment knows how to behave. The bare minimum that needs to be specified is the Warp 10 time units to use.
These and other settings are specified using Pig Latin SET statements:
SET warp.timeunits 'us';
SET warpscript.extension.debug 'io.warp10.script.ext.debug.DebugWarpScriptExtension';
SET warpscript.update.endpoint 'http://127.0.0.1:8080/api/v0/update';
Here we instruct the WarpScript runtime to use microseconds as the time units, we load the DebugWarpScriptExtension, and we set the update endpoint so the UPDATE function can be used. Add any other configuration you wish.
Calling WarpScript on data
A dedicated UDF called WarpScriptRun allows you to process data from Pig Latin relations using WarpScript. You need to register this UDF first using:
DEFINE WarpScriptRun io.warp10.pig.WarpScriptRun();
The UDF is then called with 1 to N parameters. The first parameter is the WarpScript code to execute, either provided as-is or read from a file using the syntax %file.mc2 (if the file contains a macro) or @file.mc2 (if it contains code outside a macro definition). Note that using these syntaxes requires that you REGISTER the files in your Pig Latin script. The other parameters are pushed onto the WarpScript stack, starting with the last one.
The return value of the call will be a tuple containing one element per stack level with a value. The top of the stack will be the first element of the output tuple.
List values are converted to Pig Latin tuples, while vectors, created with ->V, are converted to Pig Latin data bags.
Conversely, parameters of type data bag are available in WarpScript as a value which can be iterated using FOREACH.
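As an illustration, here is a minimal sketch which doubles a numeric column via WarpScript; the relation READINGS and its value field are hypothetical, and the DEFINE statement above is assumed:
-- Push 'value' onto the stack, execute '2.0 *', and flatten the resulting
-- one-element output tuple back into a scalar (READINGS is hypothetical)
DOUBLED = FOREACH READINGS GENERATE FLATTEN(WarpScriptRun('2.0 *', value));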
Loading data from Warp 10
Geo Time Series can be fetched from a Warp 10 instance using the Warp10LoadFunc. This loading function outputs key/value pairs where keys are ids unique to a given GTS and values are byte arrays containing wrapped chunks of Geo Time Series. The schema of the output tuples is (id:chararray,data:bytearray). A single series might span multiple chunks depending on its size. Each wrapped chunk can be decoded using the UNWRAP function.
The use of the Warp10LoadFunc relies on parameters set in the Pig Latin script, suffixed with a key which is passed to the Warp10LoadFunc call:
--
-- LOAD parameters
--
SET warp10.splits.endpoint.LOADSUFFIX 'http://127.0.0.1:8080/api/v0/splits';
SET warp10.fetcher.fallbacks.LOADSUFFIX '127.0.0.1';
SET warp10.fetcher.fallbacksonly.LOADSUFFIX 'true';
SET warp10.fetcher.protocol.LOADSUFFIX 'http';
SET warp10.fetcher.port.LOADSUFFIX '8080';
SET warp10.fetcher.path.LOADSUFFIX '/api/v0/sfetch';
SET warp10.splits.selector.LOADSUFFIX 'test{}';
SET warp10.splits.token.LOADSUFFIX 'READ';
SET warp10.fetch.now.LOADSUFFIX '9223372036854775807';
SET warp10.fetch.timespan.LOADSUFFIX '-9223372036854775807';
SET warp10.max.splits.LOADSUFFIX '10';
--
-- Load data from Warp 10, using the configuration above suffixed with '.LOADSUFFIX'
--
GTS = LOAD 'xxx' USING io.warp10.pig.Warp10LoadFunc('LOADSUFFIX') AS (id:chararray, wrapper: bytearray);
If you want to test this on a standalone Warp 10 instance, you have to enable splits on your instance by setting:
standalone.splits.enable = true
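Once loaded, each wrapped chunk can be decoded and inspected directly with WarpScriptRun. The sketch below assumes the GTS relation loaded above, and that UNWRAP accepts the byte array form of the wrapper; it counts the datapoints of each chunk:
-- UNWRAP decodes the wrapped chunk into a GTS, SIZE counts its datapoints
SIZES = FOREACH GTS GENERATE id, FLATTEN(WarpScriptRun('UNWRAP SIZE', wrapper));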
Processing data at load time
It may happen that your data does not reside in Warp 10 but in another data source that Pig can read. Very often, those data sources contain one record per observation (timestamp, value). So in order to process those data efficiently, chunks of Geo Time Series need to be reconstructed.
This can be done by grouping all records together and using WarpScriptRun to execute code that iterates over a data bag and creates those chunks, as sketched below, or it can be done directly at load time using the WarpScriptInputFormat. The former solution requires a grouping step which may involve shuffling a significant amount of data around and may therefore prove suboptimal. The latter is performed at the map stage of the Pig job and is therefore far more efficient, especially as your data volume grows.
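For reference, the grouping approach could look like the following sketch, where ROWS is a hypothetical relation of raw observations and chunks.mc2 a hypothetical macro file of yours which iterates over the bag with FOREACH and emits wrapped chunks:
REGISTER chunks.mc2;
-- Group all records into a single bag, then hand the bag to WarpScript
GROUPED = GROUP ROWS ALL;
CHUNKS = FOREACH GROUPED GENERATE FLATTEN(WarpScriptRun('%chunks.mc2', ROWS));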
The WarpScriptInputFormat wraps another Hadoop InputFormat and can therefore process a very diverse set of data. For every key/value pair read by the wrapped InputFormat, the configured WarpScript code is called with a flag indicating whether the underlying InputFormat is done, and, if it is not, with the last key/value pair it read. This code can then issue from 0 to N key/value pairs of its own.
This mechanism allows, for example, the WarpScript code to build Geo Time Series from individual observations and output those as chunks, either when all input has been processed or periodically when a size or count threshold is reached.
The WarpScriptInputFormat is leveraged as depicted below:
-- Register all .mc2 files needed by the job
REGISTER wsif.mc2;
-- Warp 10 configuration
SET warp.timeunits 'us';
-- Configuration for the WarpScriptInputFormat
SET pig.genericload.inputformat.XXX 'io.warp10.hadoop.WarpScriptInputFormat';
-- Class of the wrapped InputFormat
SET warpscript.inputformat.class 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat';
-- WarpScriptCode to execute on each input tuple
SET warpscript.inputformat.script '%wsif.mc2';
-- Parameter for the wrapped InputFormat
SET mapreduce.input.fileinputformat.inputdir '${input}';
-- Load CSV data using the XXX configuration
GTS = LOAD '/dev/null' USING io.warp10.pig.GenericLoadFunc('XXX') AS (discard,wrapper:bytearray);
The wsif.mc2 script is shown below. It consumes CSV lines which are expected to contain a latitude, a longitude, and a timestamp in ISO 8601 format, and it outputs chunks of GTS at the end of a split:
<%
  // Create the encoder once per split (CSTORE only stores if 'encoder' is not yet set)
  NEWENCODER 'encoder' CSTORE
  // Store the 'done' flag provided by the WarpScriptInputFormat
  'done' STORE
  $done NOT
  <%
    // Not done yet: store key and value (offset and line)
    [ 'key' 'value' ] STORE
    // Split the CSV line and store lat, lon and timestamp
    $value ',' SPLIT LIST-> DROP [ 'lat' 'lon' 'ts' ] STORE
    // Add the observation to the GTS Encoder
    $encoder $ts TOTIMESTAMP $lat TODOUBLE $lon TODOUBLE NaN T ADDVALUE DROP
  %>
  <%
    // Done: emit a single key/value pair containing the wrapped encoder
    [ NULL $encoder WRAP ]
  %>
  IFTE
%>
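For illustration, a hypothetical input line for this script could look like:
48.3905,-4.4860,2020-06-15T12:00:00Z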
Storing data in Warp 10
The last piece of the puzzle is the output of your job. While you can store the results in files using any of the output options supported by Pig, you can also push your data to Warp 10.
You can achieve this in two ways. The first involves calling UPDATE from WarpScript code executed in a call to WarpScriptRun inside a Pig Latin FOREACH ... GENERATE construct. For this option to work, make sure you have configured the warpscript.update.endpoint as shown above.
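A minimal sketch of this first option is shown below; the relation WRAPPERS and the file update.mc2 are hypothetical, and 'WRITETOKEN' stands for an actual write token:
REGISTER update.mc2;
-- update.mc2 contains code outside a macro definition, for instance:
--   UNWRAP 'WRITETOKEN' UPDATE
RES = FOREACH WRAPPERS GENERATE WarpScriptRun('@update.mc2', wrapper);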
The second possibility is to use the Warp10StoreFunc provided by the Pig integration. You can use this StoreFunc as below:
--
-- STORE parameters
--
SET warp10.endpoint.STORESUFFIX 'http://127.0.0.1:8080/api/v0/update';
SET warp10.token.STORESUFFIX 'WRITE';
SET warp10.gzip.STORESUFFIX 'true';
SET warp10.maxrate.STORESUFFIX '1000000'; -- 1 million datapoints/s limit
--
-- Build your WRAPPERS here
--
...
--
-- Store data in Warp 10
--
STORE WRAPPERS INTO 'xxx' USING io.warp10.pig.Warp10StoreFunc('STORESUFFIX');
The relation to store must contain either chararray elements, in the Geo Time Series input format (TS/LAT:LON/ELEV CLASS{LABELS} VALUE) or GTS wrappers produced by WRAP; bytearray elements containing GTS wrappers produced by WRAPRAW; or bags of such elements.
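For reference, a chararray element in the Geo Time Series input format could look like this hypothetical line, with location and elevation omitted and the timestamp expressed in the configured time units:
1440000000000000// temperature{room=kitchen} 21.5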
Putting it all together
Once you have written your Pig Latin script, run it using the following pig command:
pig -x local -F -l pig.log script.pig
You can replace local with mapreduce or tez if you have such an environment available. You also need to pass any parameter you are using in your script via the relevant -p param=value options to pig.
Lastly, you need to have set the JAVA_HOME and PIG_OPTS environment variables prior to calling pig.
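Putting it together, a complete invocation could look like the following, where the paths and the input parameter are placeholders:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PIG_OPTS=-Dgrape.config=/path/to/your/ivysettings.xml
pig -x local -F -l pig.log -p input=/path/to/data.csv script.pig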
Takeaways
The integration of WarpScript with Pig opens a world of possibilities. In our own experience, Pig has proven easier to use than Spark or Flink, mostly because it is far more tolerant when it comes to specifying schemas, which are almost always optional.
Also, Pig's spilling-to-disk mechanism makes it less vulnerable to out-of-memory errors.
As Pig can be leveraged from data science environments such as Zeppelin or Jupyter, it is the perfect arrow to add to your quiver for manipulating large volumes of data.