Reading data from Warp 10
The integration of WarpScript in Spark opens a vast number of opportunities for analyzing your time series data. Since the integration happens at the DAG level, you can manipulate data coming from any source. For fast batch execution we recommend reading data from files on disk, but it is sometimes necessary to read hot data directly from the Warp 10 Storage Engine.
Such access is possible as Warp 10 provides a Warp10InputFormat class, which is a standard Hadoop InputFormat.
Using the Warp10InputFormat
The complete class name of the Warp 10 Input Format is io.warp10.hadoop.Warp10InputFormat. It can be leveraged within Spark using the newAPIHadoopRDD method of the SparkContext class. The created RDD can then be converted into a DataFrame for processing in Spark SQL.
The Warp10InputFormat produces records with a Text key and a BytesWritable value, which are converted to String and binary values when accessed in Spark.
The syntax for reading from Warp 10 is:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WarpScript Spark App").getOrCreate()
sc = spark.sparkContext
rdd = sc.newAPIHadoopRDD('io.warp10.hadoop.Warp10InputFormat', 'org.apache.hadoop.io.Text', 'org.apache.hadoop.io.BytesWritable', conf=conf)
```
Here conf is a dict holding the configuration passed to the Warp10InputFormat; an example conf is shown after the table below.
The supported configuration keys are:
| Key | Description |
|---|---|
| warp10.splits.endpoint | URL of the endpoint to access for retrieving splits, typically http://HOST:PORT/api/v0/splits |
| warp10.fetcher.fallbacks | Comma separated list of hosts which can serve as fallback fetchers in case one of the fetchers defined for a split is unavailable |
| warp10.fetcher.fallbacksonly | Boolean indicating whether to use the fetchers ('false') or only the fallbacks ('true'). Set to 'true' when retrieving data from a standalone Warp 10 |
| warp10.fetcher.protocol | Protocol to use when talking to the fetchers, defaults to http |
| warp10.fetcher.port | Port to use when talking to the fetchers, defaults to 8881 |
| warp10.fetcher.path | URL path of the fetcher endpoint, defaults to /api/v0/sfetch |
| warp10.splits.selector | Geo Time Series selector used to retrieve the list of GTS, for example class{label1~regexp1} |
| warp10.splits.token | Token to use for retrieving the list of Geo Time Series and later their datapoints |
| warp10.http.connect.timeout | Connection timeout to the splits and sfetch endpoints, defaults to 10000 ms |
| warp10.http.read.timeout | Read timeout for the splits and sfetch endpoints, also defaults to 10000 ms |
| warp10.fetch.now | Timestamp to use as the now parameter for the datapoints retrieval |
| warp10.fetch.timespan | Timespan to use for the datapoints retrieval |
| warp10.max.combined.splits | Maximum number of splits to combine into a single split. Each original split corresponds to a single Geo Time Series; leave unset to let the InputFormat infer the right number for you |
| warp10.max.splits | Maximum number of splits to produce. The InputFormat will combine the individual splits to produce that many splits |
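Putting these keys together, the following is a minimal sketch of a conf dict for a standalone Warp 10 instance; the host, port, selector, token, now and timespan values are placeholders to adapt to your deployment.

```python
# Minimal sketch of a Warp10InputFormat configuration for a standalone Warp 10.
# All values below (host, port, selector, token, now, timespan) are placeholders.
conf = {
    'warp10.splits.endpoint': 'http://127.0.0.1:8080/api/v0/splits',
    'warp10.fetcher.fallbacks': '127.0.0.1',
    'warp10.fetcher.fallbacksonly': 'true',   # standalone Warp 10: use only the fallbacks
    'warp10.fetcher.protocol': 'http',
    'warp10.fetcher.port': '8080',
    'warp10.fetcher.path': '/api/v0/sfetch',
    'warp10.splits.selector': 'class{label1~regexp1}',
    'warp10.splits.token': 'READ_TOKEN',
    'warp10.fetch.now': '1672531200000000',   # placeholder 'now' timestamp, in the platform time units
    'warp10.fetch.timespan': '3600000000',    # placeholder timespan before 'now', in the platform time units
}
```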
The returned tuples have a key set to a wrapper ID, which can be safely ignored, and values which are wrapped Geo Time Series encoders that can be unwrapped using UNWRAP or UNWRAPENCODER.
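As an illustration, here is one possible way to expose those tuples to Spark SQL as a DataFrame; the column names id and wrapped and the view name gts_wrappers are arbitrary choices for this sketch, and the wrapped column holds the bytes you would later hand to UNWRAP or UNWRAPENCODER in WarpScript.

```python
from pyspark.sql.types import StructType, StructField, StringType, BinaryType

# Arbitrary schema for this example: wrapper id and the wrapped GTS encoder bytes.
schema = StructType([
    StructField('id', StringType(), False),
    StructField('wrapped', BinaryType(), False),
])

# Keys arrive as str, values as bytearray; convert values to bytes for the BinaryType column.
df = spark.createDataFrame(rdd.map(lambda kv: (kv[0], bytes(kv[1]))), schema)
df.createOrReplaceTempView('gts_wrappers')
```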