WarpScript in PySpark
The integration of WarpScript in PySpark is provided by the
warp10-spark-x.y.z.jar built from source (use the
pack Gradle task).
This artifact defines both User Defined Functions (UDFs) and a User Defined Aggregate Function (UDAF) which can be used in PySpark jobs to execute WarpScript code.
Those functions are implemented as Java classes and thus do not suffer from the Py4J overhead experienced when executing Python functions in Spark jobs.
Using a WarpScript UDF
The warp10-spark2 package defines a range of Spark SQL User Defined Functions which accept from 1 to 22 parameters (the maximum allowed by the Spark SQL API). The first of those parameters is expected to be the WarpScript code fragment to execute. The extra parameters will be pushed onto the stack in reverse order, i.e. the parameter immediately following the WarpScript fragment will be on top of the stack when the fragment is executed.
After the WarpScript code has been executed, the content of the stack is returned to the Spark job. If the stack contains a single level after execution, the object on top of the stack is returned; otherwise an array with the levels of the stack is returned, top first.
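The calling convention above can be sketched in pure Python. This is an illustrative model of the semantics only, not the actual Java implementation of the WarpScriptUDFx classes:

```python
# Illustrative model of the UDF calling convention: extra arguments are
# pushed onto the stack in reverse order, and the result is the top of the
# stack when a single level remains, else the whole stack top-first.

def call_udf_model(args):
    """Model WarpScriptUDFx: args[0] is the code, args[1:] the parameters."""
    stack = []
    # Parameters following the code fragment are pushed in reverse order,
    # so the first of them (args[1]) ends up on top of the stack.
    for arg in reversed(args[1:]):
        stack.append(arg)
    # ... the WarpScript fragment args[0] would execute against `stack` here ...
    # Single level left: return the object itself; otherwise the levels top-first.
    if len(stack) == 1:
        return stack[-1]
    return list(reversed(stack))

# With foo('<code>', 3.14, 'pi'), 3.14 is on top when the code runs.
result = call_udf_model(['<code>', 3.14, 'pi'])
```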
PySpark expects the datasets to be strongly typed, therefore when declaring the UDF in your job, you must also specify the types of its return values, with arrays and maps being strongly typed too.
The types supported by PySpark are defined in the Python package
pyspark.sql.types; the Catalyst source code can be consulted to understand the details of type conversion. The following table gives some correspondences between Java and PySpark types:
|Java type|PySpark type|
|---|---|
|java.lang.String|StringType|
|java.lang.Boolean|BooleanType|
|java.lang.Long|LongType|
|java.lang.Double|DoubleType|
|byte[]|BinaryType|
|java.util.List|ArrayType|
|java.util.Map|MapType|
Registering the UDF
The registration of a User Defined Function is done with the following code in PySpark:
```python
sqlContext.registerJavaFunction("name", "io.warp10.spark.WarpScriptUDFx", TYPE)
```

where:

name is the name under which the function will be available in PySpark,
x is the number of arguments of the function, from 1 to 22, and
TYPE is the type of its return value.
As a more detailed example, the following will define a
foo function which returns an array of strings and expects 3 parameters:
```python
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType

spark = SparkSession.builder.appName("WarpScript Spark Test").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

sqlContext.registerJavaFunction("foo", "io.warp10.spark.WarpScriptUDF3", ArrayType(StringType()))
```
Using the UDF
Once registered, a UDF can be called in your PySpark job just like built-in functions.
Building upon the previous example, adding the following line demonstrates the function invocation:
```python
print(sqlContext.sql("SELECT foo('SNAPSHOT \"Easy!\"', 3.14, 'pi')").collect())
```
It will simply convert the state of the stack to a STRING (via the
SNAPSHOT function) and will add the
Easy! STRING on top of the stack.
Instead of explicit WarpScript code, you could have invoked WarpScript code stored in an external file by using the @ syntax (e.g. '@macro.mc2').
Using a WarpScript User Defined Aggregate Function
User Defined Aggregate Functions are used to aggregate a sequence of values and then emit a final value.
The lifecycle of those functions contains four steps: initialize, update, merge and evaluate.
The link between the steps is an aggregation buffer which is passed at each step along with other parameters.
The initialize step is meant to set an initial value into the aggregation buffer. The WarpScript code is called with the following items on the stack:
```
TOP: the STRING 'initialize'
2:   the aggregation buffer (a list)
```
The WarpScript code is expected to leave a stack with as many values as the size of the aggregation buffer. The values on the stack will be put into the aggregation buffer.
The update step is meant to update the aggregation buffer given a row of input. The code is called with the following items on the stack:
```
TOP: the STRING 'update'
2:   a row of input (a list)
3:   the aggregation buffer (a list)
```
There again, the code is expected to leave a stack with as many values as the size of the aggregation buffer. The aggregation buffer will be updated accordingly.
The merge step is meant to merge two aggregation buffers. The code is called with the following input:
```
TOP: the STRING 'merge'
2:   an aggregation buffer (a list)
3:   the target aggregation buffer (a list)
```
As before, the code is expected to leave a stack with as many values as the size of the aggregation buffer. The aggregation buffer will be updated with those values.
Lastly, the evaluate step is meant to emit a final value. The code is called with the following stack:
```
TOP: the STRING 'evaluate'
2:   the aggregation buffer (a list)
```
It is expected to leave the final value (possibly a compound one) on top of the stack, and nothing else.
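The four-step lifecycle above can be sketched in pure Python with a simple sum aggregation. This models the calling convention only; it is not the actual Java implementation, and the step functions stand in for the WarpScript code invoked at each step:

```python
# Illustrative model of the UDAF lifecycle: initialize / update / merge /
# evaluate, here computing the sum of a single double column.

def initialize(buffer):
    # Leave as many values as the aggregation buffer has slots.
    return [0.0]

def update(buffer, row):
    # Fold one row of input into the aggregation buffer.
    return [buffer[0] + row[0]]

def merge(target, other):
    # Combine two aggregation buffers into the target buffer.
    return [target[0] + other[0]]

def evaluate(buffer):
    # Emit the final value, and nothing else.
    return buffer[0]

def run_udaf(partitions):
    """Drive the four steps over partitioned input rows."""
    merged = initialize([None])
    for rows in partitions:
        buf = initialize([None])
        for row in rows:
            buf = update(buf, row)
        merged = merge(merged, buf)
    return evaluate(merged)

total = run_udaf([[(1.0,), (2.0,)], [(3.0,)]])
```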
Creating a User Defined Aggregate Function
A User Defined Aggregate Function is created by instructing the JVM running the Py4J gateway to create an instance of
io.warp10.spark.WarpScriptUDAF with the WarpScript code the UDAF will run.
Then the input, buffer and output types must be specified, and the
apply method must be called on the created object.
The example below creates a WarpScript UDAF which executes the WarpScript code in file
macro.mc2 with a row containing a column
input of type
double as the input schema, an aggregation buffer which is a row containing a single field
agg of type
string, and a return value of type
string too. The UDAF can also be flagged as deterministic, meaning that it will always produce the same output for a given input. This is to help the Spark executor make decisions when it has to reschedule a task.
```python
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()
sc = spark.sparkContext

insch = StructType([StructField("input", DoubleType(), True)]).json()
buf = StructType([StructField("agg", StringType(), True)]).json()
datatype = StringType().json()

udaf = sc._jvm.io.warp10.spark.WarpScriptUDAF('@macro.mc2')
udaf.setDeterministic(True)
udaf.setInputSchema(insch)
udaf.setBufferSchema(buf)
udaf.setDataType(datatype)
udaf = udaf.apply
```
Using a User Defined Aggregate Function
Once a User Defined Aggregate Function is defined, it must be wrapped into a
Column before it can be applied to data.
The wrapping process will also specify the input parameters to your WarpScript code, i.e. columns from the DataFrame which will be passed on the stack when your code is called.
```python
from pyspark.sql.column import Column
from pyspark.sql.column import _to_java_column
from pyspark.sql.column import _to_seq

# Create the wrapped UDAF, assuming the UDAF is the one defined above
cudaf = Column(udaf(_to_seq(sc, ['col1', 'col2', ...], _to_java_column)))
```
The wrapped UDAF can then be used in your actual Spark analytics pipeline, for instance via df.agg(cudaf).
The UDAF can also be used with a window definition:
```python
from pyspark.sql.window import Window

window_spec = Window.partitionBy("col1").orderBy("col2").rowsBetween(-1, 1)
result = df.withColumn("foo", cudaf.over(window_spec))
```
Note: starting with version 2.3, Spark makes it easier to register UDAFs in PySpark; we will update this page when this new way of doing things becomes available.
Running the job
A PySpark job is launched using the
spark-submit command. Some specific options must be passed to it to enable the WarpScript integration.
The warp10-spark2 jar must be added to the job via the --jars option of spark-submit. The
warp.timeunits property must be set on both the driver and the executors via the following options of spark-submit:
```
--conf 'spark.driver.extraJavaOptions=-Dwarp.timeunits=us'
--conf 'spark.executor.extraJavaOptions=-Dwarp.timeunits=us'
```
Alternatively, if additional Warp 10 configuration properties must be set, for example to load specific WarpScript extensions, the following options should be used:
```
--conf spark.executor.extraJavaOptions=-Dwarp10.config=warp10-spark.conf
--conf spark.driver.extraJavaOptions=-Dwarp10.config=warp10-spark.conf
--files warp10-spark.conf
```
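The referenced warp10-spark.conf file is a standard Warp 10 properties file. A minimal sketch is shown below; the extension class name is a placeholder, to be replaced with the extension you actually want to load:

```
// Time units used by the WarpScript execution environment
warp.timeunits = us
// Load an additional WarpScript extension (class name is a placeholder)
warpscript.extensions = com.example.MyWarpScriptExtension
```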
Any WarpScript code referenced (via
@) in function invocations must be shipped to the executors using the --files option of spark-submit.
Lastly, the actual Python (.py) file of the job should be the last parameter of the call to spark-submit.