Custom Transformers for Spark DataFrames

In Spark, a transformer converts one DataFrame into another. Because DataFrames are immutable (i.e., the existing values of a DataFrame cannot be changed), transforming the values in a column means creating a new column that holds the transformed values and adding it to the DataFrame, which produces a new DataFrame.
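
The immutability point can be illustrated without Spark. The sketch below is a toy, column-oriented table in plain Java (ImmutableTable and its methods are hypothetical, not Spark's API): withColumn never modifies the table it is called on, but returns a new table with the derived column appended, much like DataFrame.withColumn returns a new DataFrame.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.DoubleUnaryOperator;

// Toy immutable, column-oriented table: an analogy for DataFrame immutability, NOT Spark's API.
final class ImmutableTable {
    private final Map<String, List<Double>> columns;

    ImmutableTable(Map<String, List<Double>> columns) {
        // Defensive copies so later changes to the caller's map or lists cannot leak in.
        Map<String, List<Double>> copy = new LinkedHashMap<>();
        for (Map.Entry<String, List<Double>> e : columns.entrySet()) {
            copy.put(e.getKey(), Collections.unmodifiableList(new ArrayList<>(e.getValue())));
        }
        this.columns = Collections.unmodifiableMap(copy);
    }

    boolean hasColumn(String name) {
        return columns.containsKey(name);
    }

    List<Double> col(String name) {
        List<Double> c = columns.get(name);
        if (c == null) {
            throw new IllegalArgumentException("No such column: " + name);
        }
        return c;
    }

    // Returns a NEW table with `newName` derived from `source`; this table is left untouched.
    ImmutableTable withColumn(String newName, String source, DoubleUnaryOperator f) {
        List<Double> derived = new ArrayList<>();
        for (double v : col(source)) {
            derived.add(f.applyAsDouble(v));
        }
        Map<String, List<Double>> next = new LinkedHashMap<>(columns);
        next.put(newName, derived);
        return new ImmutableTable(next);
    }
}
```

For example, if t has a column "x" holding [1.0, 2.0, 3.0], then t.withColumn("power", "x", v -> Math.pow(v, 2)) returns a new table whose "power" column is [1.0, 4.0, 9.0], while t itself still has no "power" column.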

To create a transformer, we extend the abstract org.apache.spark.ml.Transformer class and write the transforming logic inside its transform() method; the class also requires implementations of uid(), copy() and transformSchema(). Below are a couple of examples:

A simple transformer

This simple transformer raises each value of a given column to a given power and stores the result in a new column named "power".

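import java.util.UUID;

import org.apache.spark.ml.Transformer;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Adds a "power" column: each value of `column` raised to `power` (Spark 1.x DataFrame API).
public class CustomTransformer extends Transformer {
    private static final long serialVersionUID = 5545470640951989469L;

    private final String uid;   // unique per instance, preserved by copy()
    private final String column;
    private final int power;

    public CustomTransformer(String column, int power) {
        this("CustomTransformer_" + UUID.randomUUID(), column, power);
    }

    private CustomTransformer(String uid, String column, int power) {
        this.uid = uid;
        this.column = column;
        this.power = power;
    }

    @Override
    public String uid() {
        return uid;
    }

    @Override
    public Transformer copy(ParamMap extra) {
        // No Params are declared, so a copy needs only the same uid, column and power.
        return new CustomTransformer(uid, column, power);
    }

    @Override
    public DataFrame transform(DataFrame data) {
        // withColumn returns a new DataFrame; `data` itself is unchanged.
        return data.withColumn("power", functions.pow(data.col(column), power));
    }

    @Override
    public StructType transformSchema(StructType schema) {
        // Declare the column that transform() adds; pow() yields doubles.
        return schema.add("power", DataTypes.DoubleType, true);
    }
}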
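
transformSchema() is expected to validate the input schema and return the schema that transform() will produce, without touching any data; Spark's Pipeline calls it on each stage to derive schemas before fitting. Since the transformer above adds a "power" column, that column belongs in the returned schema. The rule can be modeled in plain Java as below; PowerSchema, the string type names and the NUMERIC set are illustrative assumptions, not Spark's StructType API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the transformSchema contract: input schema in, output schema out.
// Column types are plain strings here; Spark uses StructType/DataType instead.
final class PowerSchema {
    // Hypothetical set of type names this sketch accepts as numeric.
    private static final Set<String> NUMERIC =
            new HashSet<>(Arrays.asList("int", "long", "float", "double"));

    static Map<String, String> transformSchema(Map<String, String> input,
                                               String inputCol, String outputCol) {
        String type = input.get(inputCol);
        // Fail before any data is touched if the input column is missing or not numeric.
        if (type == null) {
            throw new IllegalArgumentException("Input column not found: " + inputCol);
        }
        if (!NUMERIC.contains(type)) {
            throw new IllegalArgumentException("Expected a numeric column, got: " + type);
        }
        // Copy, then append the new column; the caller's schema is not modified.
        Map<String, String> output = new LinkedHashMap<>(input);
        output.put(outputCol, "double");
        return Collections.unmodifiableMap(output);
    }
}
```

With an input schema {x=int}, transformSchema(input, "x", "power") returns {x=int, power=double}, and asking for a missing column throws IllegalArgumentException. In Spark itself, StructType.add plays the role of the map copy, since it returns a new StructType.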

See [1] for another similar example.

UDF transformer

We can also register custom logic as a UDF in the Spark SQL context and then, within our transformer, transform the DataFrame using Spark SQL.

See [2] for a sample that uses a UDF to extract part of the string in a column.
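
The per-row logic of such a UDF is ordinary Java. The sketch below extracts one capture group from the first regex match; RegexExtract, its pattern and its group index are illustrative assumptions, not the code in [2]. In Spark's Java API, logic like this is typically wrapped in an org.apache.spark.sql.api.java.UDF1<String, String> and registered with sqlContext.udf().register(name, udf, DataTypes.StringType) before the transformer uses it; that wrapper is left out here so the block runs without Spark.

```java
import java.io.Serializable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Per-value logic for a regex-extracting UDF. Hypothetical: not the code from [2].
// Serializable because Spark ships UDF logic to executors.
final class RegexExtract implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Pattern pattern; // compiled once, reused for every value
    private final int group;

    RegexExtract(String regex, int group) {
        this.pattern = Pattern.compile(regex);
        this.group = group;
    }

    // Returns the requested group of the first match, or null for null input or no match.
    String call(String value) {
        if (value == null) {
            return null;
        }
        Matcher m = pattern.matcher(value);
        return m.find() ? m.group(group) : null;
    }
}
```

For instance, new RegexExtract("user-(\\d+)", 1).call("id: user-42, active") returns "42". For this particular task, Spark's functions class also provides a built-in regexp_extract (Spark 1.5 and later), which avoids a custom UDF altogether.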


References:

[1] https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/MeanImputer.java
[2] https://github.com/SupunS/play-ground/blob/master/test.spark.client_2/src/main/java/RegexTransformer.java

