Loading

Reading data from Warp 10

The integration of WarpScript in Spark opens a vast number of opportunities for analyzing your time series data. Since the integration in Spark is at the DAG level, you can manipulate data coming from any source. For fast batch execution we recommend reading data from files on disk but it is sometimes necessary to read hot data directly from the Warp 10 Storage Engine.

Such access is possible as Warp 10 provides a Warp10InputFormat class which is a standard Hadoop InputFormat.

Using the Warp10InputFormat

The complete class for the Warp 10 Input Format is io.warp10.hadoop.Warp10InputFormat, it can be leveraged within Spark using the newAPIHadoopRDD method from the RDD class. The created RDD can then be converted into a DataFrame for processing in Spark SQL.

The Warp10InputFormat produces records with a Text key and a BytesWritable value, which are converted to String and binary when accessed in Spark.

The syntax for reading from Warp 10 is:

spark = SparkSession.builder.appName("WarpScript Spark App").getOrCreate()
sc = spark.sparkContext

rdd = sc.newAPIHadoopRDD('io.warp10.hadoop.Warp10InputFormat', 'org.apache.hadoop.io.Text', 'org.apache.hadoop.io.BytesWritable', conf=conf)

Where conf is a dict with the configuration for the Warp10InputFormat execution.

The supported configuration keys are:

KeyDescription
warp10.splits.endpointURL of the endpoint to access for retrieving splits. Typically http://HOST:PORT/api/v0/splits
warp10.fetcher.fallbacksComma separated list of hosts which can server as fallback fetchers in case one of the fetchers defined for a split is unavailable
warp10.fetcher.fallbacksonlyBoolean indicating whether to use the fetchers ('false') or only the fallbacks ('true'). Set to 'true' when retrieving data from a standalone Warp 10
warp10.fetcher.protocolProtocol to use when talking to the fetchers, defaults to http
warp10.fetcher.portPort to use when talking to the fetchers, defaults to 8881
warp10.fetcher.pathURL patch of the fetcher endpoint, defaults to /api/v0/sfetch
warp10.splits.selectorGeo Time Series selector to use to retrieve the list of GTS, for example class{label1~regexp1}
warp10.splits.tokenToken to use for retrieving the list of Geo Time Series and later their datapoints
warp10.http.connect.timeoutConnection timeout to the splits and sfetch endpoints. Defaults to 10000 ms
warp10.http.read.timeoutRead timeout for the splits and sfetch endpoints, also defaults to 10000 ms
warp10.fetch.nowTimestamp to use as the now parameter for the datapoints retrieval
warp10.fetch.timespanTimespan to use for the datapoints retrieval
warp10.max.combined.splitsMaximum number of splits to combine in a single split. Each original split corresponds to a single Geo Time Series, do not set to let the InputFormat infer the right number for you
warp10.max.splitsMaximum number of splits to produce. The InputFormat will combine the individual splits to producer that many splits

The returned tuples have a key set to a wrapper ID, which can be safely ignored, and the values are wrapped Geo Time Series encoders which can be unwrapped using UNWRAP or UNWRAPENCODER.