Typed Spark ML

The frameless-ml module provides a strongly typed Spark ML API leveraging TypedDatasets. It introduces TypedTransformers and TypedEstimators, the type-safe equivalents of Spark ML's Transformer and Estimator.

A TypedEstimator fits a model to data, i.e., it trains an ML model from an input TypedDataset. A TypedTransformer transforms one TypedDataset into another, usually by appending column(s) to it.

Calling the fit method of a TypedEstimator trains an ML model using the TypedDataset passed as input (representing the training data) and returns a TypedTransformer that represents the trained model. This TypedTransformer can then be used to make predictions on an input TypedDataset (representing the test data) via its transform method, which returns a new TypedDataset with appended prediction column(s).

Both TypedEstimator and TypedTransformer check the correctness of their input's field names and types at compile time, in contrast to the Spark ML API, which only deals with DataFrames (the Spark data structure with the lowest level of type safety).

frameless-ml adds type safety to the Spark ML API but stays very close to it in terms of abstractions and API calls, so please refer to the Spark ML documentation for more details on Transformers and Estimators.
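The overall lifecycle can be sketched as follows. This is a simplified sketch: `estimator`, `trainingData`, and `testData` are hypothetical values standing in for any concrete TypedEstimator and TypedDatasets; a complete, compilable version is worked through in Example 1 below.

```scala
// Sketch of the TypedEstimator / TypedTransformer lifecycle.
// `estimator`, `trainingData` and `testData` are placeholder values here.
// fit returns its result wrapped in a frameless job, hence the .run() call.
val model = estimator.fit(trainingData).run()  // train: returns a TypedTransformer (the trained model)
val predictions = model.transform(testData)    // predict: returns a TypedDataset with appended prediction column(s)
```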

Example 1: predict a continuous value using a TypedRandomForestRegressor

In this example, we want to predict the sale price of a house based on its square footage and whether it has a garden. We will use a TypedRandomForestRegressor.

Training

As with the Spark ML API, we use a TypedVectorAssembler (the type-safe equivalent of VectorAssembler) to compute feature vectors:

import frameless._
import frameless.syntax._
import frameless.ml._
import frameless.ml.feature._
import frameless.ml.regression._
import org.apache.spark.ml.linalg.Vector
case class HouseData(squareFeet: Double, hasGarden: Boolean, price: Double)
// defined class HouseData

val trainingData = TypedDataset.create(Seq(
  HouseData(20, false, 100000),
  HouseData(50, false, 200000),
  HouseData(50, true, 250000),
  HouseData(100, true, 500000)
))
// trainingData: frameless.TypedDataset[HouseData] = [squareFeet: double, hasGarden: boolean ... 1 more field]

case class Features(squareFeet: Double, hasGarden: Boolean)
// defined class Features

val assembler = TypedVectorAssembler[Features]
// assembler: frameless.ml.feature.TypedVectorAssembler[Features] = frameless.ml.feature.TypedVectorAssembler@45e2fd11

case class HouseDataWithFeatures(squareFeet: Double, hasGarden: Boolean, price: Double, features: Vector)
// defined class HouseDataWithFeatures

val trainingDataWithFeatures = assembler.transform(trainingData).as[HouseDataWithFeatures]
// trainingDataWithFeatures: frameless.TypedDataset[HouseDataWithFeatures] = [squareFeet: double, hasGarden: boolean ... 2 more fields]

In the above code snippet, .as[HouseDataWithFeatures] is a TypedDataset's type-safe cast (see TypedDataset: Feature Overview):

case class WrongHouseFeatures(
  squareFeet: Double,
  hasGarden: Int, // hasGarden has wrong type
  price: Double,
  features: Vector
)
assembler.transform(trainingData).as[WrongHouseFeatures]
// <console>:39: error: could not find implicit value for parameter as: frameless.ops.As[(Double, Boolean, Double, org.apache.spark.ml.linalg.Vector),WrongHouseFeatures]
//        assembler.transform(trainingData).as[WrongHouseFeatures]
//                                            ^

Moreover, TypedVectorAssembler[Features] will compile only if Features exclusively contains fields of numeric or boolean types:

case class WrongFeatures(squareFeet: Double, hasGarden: Boolean, city: String)
TypedVectorAssembler[WrongFeatures]
// <console>:37: error: Cannot prove that WrongFeatures is a valid input type. Input type must only contain fields of numeric or boolean types.
//        TypedVectorAssembler[WrongFeatures]
//                            ^

The subsequent call assembler.transform(trainingData) compiles only if trainingData contains all fields (names and types) of Features:

case class WrongHouseData(squareFeet: Double, price: Double) // hasGarden is missing
// defined class WrongHouseData

val wrongTrainingData = TypedDataset.create(Seq(WrongHouseData(20, 100000)))
// wrongTrainingData: frameless.TypedDataset[WrongHouseData] = [squareFeet: double, price: double]
assembler.transform(wrongTrainingData)
// <console>:37: error: Cannot prove that WrongHouseData can be projected to Features. Perhaps not all member names and types of Features are the same in WrongHouseData?
//        assembler.transform(wrongTrainingData)
//                           ^

Then, we train the model. To train a Random Forest, one needs to feed it with features (what we predict from) and with a label (what we predict). In our example, price is the label and features is the feature vector:

case class RFInputs(price: Double, features: Vector)
// defined class RFInputs

val rf = TypedRandomForestRegressor[RFInputs]
// rf: frameless.ml.regression.TypedRandomForestRegressor[RFInputs] = frameless.ml.regression.TypedRandomForestRegressor@564fdc23

val model = rf.fit(trainingDataWithFeatures).run()
// model: frameless.ml.AppendTransformer[RFInputs,frameless.ml.regression.TypedRandomForestRegressor.Outputs,org.apache.spark.ml.regression.RandomForestRegressionModel] = frameless.ml.TypedEstimator$$anonfun$fit$1$$anon$1@77489a50

TypedRandomForestRegressor[RFInputs] compiles only if RFInputs contains only one field of type Double (the label) and one field of type Vector (the features):

case class WrongRFInputs(labelOfWrongType: String, features: Vector)
TypedRandomForestRegressor[WrongRFInputs]
// <console>:37: error: Cannot prove that WrongRFInputs is a valid input type. Input type must only contain a field of type Double (the label) and a field of type org.apache.spark.ml.linalg.Vector (the features).
//        TypedRandomForestRegressor[WrongRFInputs]
//                                  ^

The subsequent rf.fit(trainingDataWithFeatures) call compiles only if trainingDataWithFeatures contains the same fields (names and types) as RFInputs.

val wrongTrainingDataWithFeatures = TypedDataset.create(Seq(HouseData(20, false, 100000))) // features are missing
// wrongTrainingDataWithFeatures: frameless.TypedDataset[HouseData] = [squareFeet: double, hasGarden: boolean ... 1 more field]
rf.fit(wrongTrainingDataWithFeatures) 
// <console>:37: error: Cannot prove that HouseData can be projected to RFInputs. Perhaps not all member names and types of RFInputs are the same in HouseData?
//        rf.fit(wrongTrainingDataWithFeatures)
//              ^

Prediction

We now want to predict price for testData using the previously trained model. As with the Spark ML API, testData needs a placeholder value for price (0 in our case) that will be ignored at prediction time. We reuse our assembler to compute the feature vector of testData.

val testData = TypedDataset.create(Seq(HouseData(70, true, 0)))
// testData: frameless.TypedDataset[HouseData] = [squareFeet: double, hasGarden: boolean ... 1 more field]

val testDataWithFeatures = assembler.transform(testData).as[HouseDataWithFeatures]
// testDataWithFeatures: frameless.TypedDataset[HouseDataWithFeatures] = [squareFeet: double, hasGarden: boolean ... 2 more fields]

case class HousePricePrediction(
  squareFeet: Double,
  hasGarden: Boolean,
  price: Double,
  features: Vector,
  predictedPrice: Double
)
// defined class HousePricePrediction

val predictions = model.transform(testDataWithFeatures).as[HousePricePrediction]
// predictions: frameless.TypedDataset[HousePricePrediction] = [squareFeet: double, hasGarden: boolean ... 3 more fields]

predictions.select(predictions.col('predictedPrice)).collect.run()
// res6: Seq[Double] = WrappedArray(420000.0)

model.transform(testDataWithFeatures) will only compile if testDataWithFeatures contains a field price of type Double and a field features of type Vector:

model.transform(testData)
// <console>:37: error: Cannot prove that HouseData can be projected to RFInputs. Perhaps not all member names and types of RFInputs are the same in HouseData?
//        model.transform(testData)
//                       ^

Example 2: predict a categorical value using a TypedRandomForestClassifier

In this example, we want to predict in which city a house is located depending on its price and its square footage. We use a TypedRandomForestClassifier.

Training

As with the Spark ML API, we use a TypedVectorAssembler to compute feature vectors and a TypedStringIndexer to index city values so that they can be passed to a TypedRandomForestClassifier (which only accepts Double values as label):

import frameless.ml.classification._
case class HouseData(squareFeet: Double, city: String, price: Double)
// defined class HouseData

val trainingData = TypedDataset.create(Seq(
  HouseData(100, "lyon", 100000),
  HouseData(200, "lyon", 200000),
  HouseData(100, "san francisco", 500000),
  HouseData(150, "san francisco", 900000)
))
// trainingData: frameless.TypedDataset[HouseData] = [squareFeet: double, city: string ... 1 more field]

case class Features(price: Double, squareFeet: Double)
// defined class Features

val vectorAssembler = TypedVectorAssembler[Features]
// vectorAssembler: frameless.ml.feature.TypedVectorAssembler[Features] = frameless.ml.feature.TypedVectorAssembler@39bf007a

case class HouseDataWithFeatures(squareFeet: Double, city: String, price: Double, features: Vector)
// defined class HouseDataWithFeatures

val dataWithFeatures = vectorAssembler.transform(trainingData).as[HouseDataWithFeatures]
// dataWithFeatures: frameless.TypedDataset[HouseDataWithFeatures] = [squareFeet: double, city: string ... 2 more fields]

case class StringIndexerInput(city: String)
// defined class StringIndexerInput

val indexer = TypedStringIndexer[StringIndexerInput]
// indexer: frameless.ml.feature.TypedStringIndexer[StringIndexerInput] = frameless.ml.feature.TypedStringIndexer@296abbf6

val indexerModel = indexer.fit(dataWithFeatures).run()
// indexerModel: frameless.ml.AppendTransformer[StringIndexerInput,frameless.ml.feature.TypedStringIndexer.Outputs,org.apache.spark.ml.feature.StringIndexerModel] = frameless.ml.TypedEstimator$$anonfun$fit$1$$anon$1@56fd3ce9

case class HouseDataWithFeaturesAndIndex(
  squareFeet: Double,
  city: String,
  price: Double,
  features: Vector,
  cityIndexed: Double
)
// defined class HouseDataWithFeaturesAndIndex

val indexedData = indexerModel.transform(dataWithFeatures).as[HouseDataWithFeaturesAndIndex]
// indexedData: frameless.TypedDataset[HouseDataWithFeaturesAndIndex] = [squareFeet: double, city: string ... 3 more fields]

Then, we train the model:

case class RFInputs(cityIndexed: Double, features: Vector)
// defined class RFInputs

val rf = TypedRandomForestClassifier[RFInputs]
// rf: frameless.ml.classification.TypedRandomForestClassifier[RFInputs] = frameless.ml.classification.TypedRandomForestClassifier@19a5665

val model = rf.fit(indexedData).run()
// model: frameless.ml.AppendTransformer[RFInputs,frameless.ml.classification.TypedRandomForestClassifier.Outputs,org.apache.spark.ml.classification.RandomForestClassificationModel] = frameless.ml.TypedEstimator$$anonfun$fit$1$$anon$1@125bc973

Prediction

We now want to predict city for testData using the previously trained model. As with the Spark ML API, testData needs a placeholder value for city (an empty string in our case) that will be ignored at prediction time. We reuse our vectorAssembler to compute the feature vector of testData and our indexerModel to index city.

val testData = TypedDataset.create(Seq(HouseData(120, "", 800000)))
// testData: frameless.TypedDataset[HouseData] = [squareFeet: double, city: string ... 1 more field]

val testDataWithFeatures = vectorAssembler.transform(testData).as[HouseDataWithFeatures]
// testDataWithFeatures: frameless.TypedDataset[HouseDataWithFeatures] = [squareFeet: double, city: string ... 2 more fields]

val indexedTestData = indexerModel.transform(testDataWithFeatures).as[HouseDataWithFeaturesAndIndex]
// indexedTestData: frameless.TypedDataset[HouseDataWithFeaturesAndIndex] = [squareFeet: double, city: string ... 3 more fields]

case class HouseCityPredictionInputs(features: Vector, cityIndexed: Double)
// defined class HouseCityPredictionInputs

val testInput = indexedTestData.project[HouseCityPredictionInputs]
// testInput: frameless.TypedDataset[HouseCityPredictionInputs] = [features: vector, cityIndexed: double]

case class HouseCityPredictionIndexed(
  features: Vector,
  cityIndexed: Double,
  rawPrediction: Vector,
  probability: Vector,
  predictedCityIndexed: Double
)
// defined class HouseCityPredictionIndexed

val indexedPredictions = model.transform(testInput).as[HouseCityPredictionIndexed]
// indexedPredictions: frameless.TypedDataset[HouseCityPredictionIndexed] = [features: vector, cityIndexed: double ... 3 more fields]

Then, we use a TypedIndexToString to get back a String value from predictedCityIndexed. TypedIndexToString takes as input the label array computed by our previous indexerModel:

case class IndexToStringInput(predictedCityIndexed: Double)
// defined class IndexToStringInput

val indexToString = TypedIndexToString[IndexToStringInput](indexerModel.transformer.labels)
// indexToString: frameless.ml.feature.TypedIndexToString[IndexToStringInput] = frameless.ml.feature.TypedIndexToString@2fb5a588

case class HouseCityPrediction(
  features: Vector,
  cityIndexed: Double,
  rawPrediction: Vector,
  probability: Vector,
  predictedCityIndexed: Double,
  predictedCity: String
)
// defined class HouseCityPrediction

val predictions = indexToString.transform(indexedPredictions).as[HouseCityPrediction]
// predictions: frameless.TypedDataset[HouseCityPrediction] = [features: vector, cityIndexed: double ... 4 more fields]

predictions.select(predictions.col('predictedCity)).collect.run()
// res8: Seq[String] = WrappedArray(san francisco)

List of currently implemented TypedEstimators

- TypedRandomForestClassifier
- TypedRandomForestRegressor
- TypedStringIndexer

List of currently implemented TypedTransformers

- TypedIndexToString
- TypedVectorAssembler

Using Vector and Matrix with TypedDataset

frameless-ml provides TypedEncoder instances for org.apache.spark.ml.linalg.Vector and org.apache.spark.ml.linalg.Matrix:

import frameless._
import frameless.ml._
import org.apache.spark.ml.linalg._
val vector = Vectors.dense(1, 2, 3)
// vector: org.apache.spark.ml.linalg.Vector = [1.0,2.0,3.0]

val vectorDs = TypedDataset.create(Seq("label" -> vector))
// vectorDs: frameless.TypedDataset[(String, org.apache.spark.ml.linalg.Vector)] = [_1: string, _2: vector]

val matrix = Matrices.dense(2, 1, Array(1, 2))
// matrix: org.apache.spark.ml.linalg.Matrix =
// 1.0
// 2.0

val matrixDs = TypedDataset.create(Seq("label" -> matrix))
// matrixDs: frameless.TypedDataset[(String, org.apache.spark.ml.linalg.Matrix)] = [_1: string, _2: matrix]

Under the hood, Vector and Matrix are encoded using org.apache.spark.ml.linalg.VectorUDT and org.apache.spark.ml.linalg.MatrixUDT. This is possible thanks to the implicit derivation from org.apache.spark.sql.types.UserDefinedType[A] to TypedEncoder[A] defined in the TypedEncoder companion object.
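To see this derivation in action, the encoder instances can be summoned explicitly. This is a minimal sketch, assuming the frameless-ml imports shown above are in scope (which provide the implicit UserDefinedType instances for Vector and Matrix):

```scala
import frameless.TypedEncoder
import frameless.ml._ // brings the implicit UDT instances for Vector and Matrix into scope
import org.apache.spark.ml.linalg.{Matrix, Vector}

// Both encoders resolve through the UserDefinedType[A] => TypedEncoder[A]
// derivation, backed by VectorUDT and MatrixUDT respectively.
val vectorEncoder = implicitly[TypedEncoder[Vector]]
val matrixEncoder = implicitly[TypedEncoder[Matrix]]
```

If either implicit fails to resolve, the required UDT instance is not in scope, which is usually a sign that the frameless.ml._ import is missing.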
