Reading data from Warp 10
The integration of WarpScript in Spark opens a vast number of opportunities for analyzing your time series data. Since the integration in Spark is at the DAG level, you can manipulate data coming from any source. For fast batch execution we recommend reading data from files on disk, but it is sometimes necessary to read hot data directly from the Warp 10 Storage Engine.
Such access is possible because Warp 10 provides a Warp10InputFormat class, which is a standard Hadoop InputFormat.
Using the Warp10InputFormat
The fully qualified class name of the Warp 10 Input Format is io.warp10.hadoop.Warp10InputFormat. It can be leveraged within Spark using the newAPIHadoopRDD method of the SparkContext. The created RDD can then be converted into a DataFrame for processing in Spark SQL.
Warp10InputFormat produces records with a Text key and a BytesWritable value, which are converted to String and binary when accessed in Spark.
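The DataFrame conversion mentioned above could look like the following minimal sketch. It assumes an RDD of (str, bytearray) pairs as described, and the helper name warp10_rdd_to_dataframe and the column names id and wrapper are hypothetical choices made for illustration, not names defined by Warp 10.

```python
def warp10_rdd_to_dataframe(spark, rdd):
    """Convert an RDD of (key, value) pairs into a two-column DataFrame.

    `spark` is an existing SparkSession and `rdd` is the RDD returned by
    newAPIHadoopRDD. The column names are illustrative assumptions.
    """
    # The schema is given as a DDL string: the Text key becomes a string
    # column and the BytesWritable value becomes a binary column.
    return spark.createDataFrame(rdd, schema="id: string, wrapper: binary")
```

The resulting DataFrame can then be registered with createOrReplaceTempView to make it queryable from Spark SQL.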
The syntax for reading from Warp 10 is:
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("WarpScript Spark App").getOrCreate()
    sc = spark.sparkContext
    rdd = sc.newAPIHadoopRDD(
        'io.warp10.hadoop.Warp10InputFormat',
        'org.apache.hadoop.io.Text',
        'org.apache.hadoop.io.BytesWritable',
        conf=conf)
conf is a dict with the configuration for the input format.
The supported configuration keys are:
|warp10.splits.endpoint||URL of the endpoint to access for retrieving splits. Typically |
|warp10.fetcher.fallbacks||Comma separated list of hosts which can serve as fallback fetchers in case one of the fetchers defined for a split is unavailable|
|warp10.fetcher.fallbacksonly||Boolean indicating whether to use the fetchers ('false') or only the fallbacks ('true'). Set to '|
|warp10.fetcher.protocol||Protocol to use when talking to the fetchers, defaults to |
|warp10.fetcher.port||Port to use when talking to the fetchers, defaults to |
|warp10.fetcher.path||URL path of the fetcher endpoint, defaults to |
|warp10.splits.selector||Geo Time Series selector to use to retrieve the list of GTS, for example |
|warp10.splits.token||Token to use for retrieving the list of Geo Time Series and later their datapoints|
|warp10.http.connect.timeout||Connection timeout to the |
|warp10.http.read.timeout||Read timeout for the |
|warp10.fetch.now||Timestamp to use as the |
|warp10.fetch.timespan||Timespan to use for the datapoints retrieval|
|warp10.max.combined.splits||Maximum number of splits to combine in a single split. Each original split corresponds to a single Geo Time Series, do not set to let the |
|warp10.max.splits||Maximum number of splits to produce. The |
The returned tuples have a key set to a wrapper ID, which can be safely ignored, and the values are wrapped Geo Time Series encoders which can be unwrapped using