Custom Transformers for Spark Dataframes
In Spark, a transformer is used to convert one Dataframe into another. But because Dataframes are immutable (i.e. the existing values of a Dataframe cannot be changed), if we need to transform the values in a column, we have to compute a new column holding the transformed values and append it to the existing Dataframe.
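For example, with the Dataframe API (a minimal sketch, assuming a Dataframe df that has a numeric column named "value" — the names here are illustrative):

```java
// withColumn() does not mutate df; it returns a brand-new Dataframe
// with the extra "doubled" column appended to the existing ones.
DataFrame withDoubled = df.withColumn("doubled", df.col("value").multiply(2));
```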
To create a transformer, we simply need to extend the org.apache.spark.ml.Transformer class and write our transformation logic inside the transform() method. Below are a couple of examples:
A simple transformer
This is a simple transformer that raises each value of a given column to a given power.
import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class CustomTransformer extends Transformer {

    private static final long serialVersionUID = 5545470640951989469L;

    String column;
    int power = 1;

    CustomTransformer(String column, int power) {
        this.column = column;
        this.power = power;
    }

    @Override
    public String uid() {
        return "CustomTransformer" + serialVersionUID;
    }

    @Override
    public Transformer copy(ParamMap extra) {
        // Return a new instance with the same settings.
        return new CustomTransformer(this.column, this.power);
    }

    @Override
    public DataFrame transform(DataFrame data) {
        // Dataframes are immutable, so append the transformed values
        // as a new "power" column instead of modifying the existing one.
        return data.withColumn("power", functions.pow(data.col(this.column), this.power));
    }

    @Override
    public StructType transformSchema(StructType schema) {
        // The output schema is the input schema plus the new "power" column.
        return schema.add("power", DataTypes.DoubleType, false);
    }
}
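A transformer like this can be applied directly, or chained with other stages inside an ML Pipeline. A minimal usage sketch (assuming a Dataframe df with a numeric column named "value" — the column name is illustrative):

```java
// Cube the values of the "value" column; the result appears in a
// new "power" column appended to a new Dataframe.
CustomTransformer cuber = new CustomTransformer("value", 3);
DataFrame transformed = cuber.transform(df);
transformed.show();
```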
You can refer to [1] for another similar example.
UDF transformer
We can also register some custom logic as a UDF in the Spark SQL context, and then transform the Dataframe with a Spark SQL query inside our transformer.
Refer [2] for a sample which uses a UDF to extract part of a string in a column.
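As a sketch of this idea (not the sample from [2] — the UDF here simply upper-cases a string column, and the names UDFTransformer, toUpper, and "upper" are illustrative), such a transformer registers its UDF on the Dataframe's SQL context and runs a SQL query inside transform():

```java
import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class UDFTransformer extends Transformer {

    private static final long serialVersionUID = 1L;

    String column;

    UDFTransformer(String column) {
        this.column = column;
    }

    @Override
    public String uid() {
        return "UDFTransformer" + serialVersionUID;
    }

    @Override
    public Transformer copy(ParamMap extra) {
        return new UDFTransformer(this.column);
    }

    @Override
    public DataFrame transform(DataFrame data) {
        // Register the custom logic as a UDF in the SQL context ...
        data.sqlContext().udf().register("toUpper",
                (UDF1<String, String>) s -> s.toUpperCase(), DataTypes.StringType);
        // ... then transform with Spark SQL, appending the UDF's
        // result as a new "upper" column.
        data.registerTempTable("data");
        return data.sqlContext().sql(
                "SELECT *, toUpper(" + this.column + ") AS upper FROM data");
    }

    @Override
    public StructType transformSchema(StructType schema) {
        // Output schema is the input schema plus the new "upper" column.
        return schema.add("upper", DataTypes.StringType, true);
    }
}
```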
References:
[1] https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/MeanImputer.java