The Java 8 version can look like this:
// Trains a Naive Bayes spam classifier on TF-IDF features and evaluates it on a held-out split.
// The Spark context is opened in try-with-resources so it is closed when the block exits,
// whether normally or through an exception.
try (JavaSparkContext sc = new JavaSparkContext(configLocalMode("NaiveBayes exploiting TFIDF for spam classification in Java 8"))) {
    // Term-frequency hasher configured with 100000 features.
    HashingTF hash = new HashingTF(100000);
    JavaRDD<String> file = localFile("sms-labeled.txt", sc);

    // Each distinct input line is split on tabs into (label, message); the message is
    // tokenized on whitespace and lower-cased, then hashed into a term-frequency vector.
    JavaRDD<Tuple3<String, List<String>, Vector>> raw = file.distinct().map(
        s -> s.split("\\t+")
    ).filter(
        // Skip malformed records (e.g. blank lines) that have no message field;
        // indexing a[1] on them would otherwise abort the whole job.
        a -> a.length >= 2
    ).map(
        a -> new Tuple2<>(
            a[0],
            Arrays.stream(a[1].split("\\s+"))
                // Locale.ROOT keeps the tokens (and thus their hashes) independent of the JVM default locale.
                .map(w -> w.toLowerCase(java.util.Locale.ROOT))
                .collect(Collectors.toList()))
    ).map(
        t -> new Tuple3<>(t._1(), t._2(), hash.transform(t._2()))
    ).cache(); // cached because it is traversed twice: once to fit the IDF, once to build the data set

    // Fit inverse document frequencies on the term-frequency vectors.
    IDFModel idf = new IDF().fit(raw.map(t -> t._3()).rdd());

    // Label is 1 for "spam" and 0 for everything else; features are the TF-IDF vector.
    JavaRDD<LabeledPoint> data = raw.map(t -> {
        double label = "spam".equals(t._1()) ? 1.0 : 0.0;
        return new LabeledPoint(label, idf.transform(t._3()));
    });

    // 80% of the data for training, 20% for testing.
    JavaRDD<LabeledPoint>[] split = data.randomSplit(new double[] { .8, .2 });
    JavaRDD<LabeledPoint> train = split[0].cache();
    JavaRDD<LabeledPoint> test = split[1].cache();

    NaiveBayesModel model = NaiveBayes.train(train.rdd());
    evaluateModel(model, test);
}
And the method for model evaluation looks like this:
// Print the predicted and actual label for the first five test samples as a quick sanity check.
for (LabeledPoint sample : test.take(5)) {
    String line = String.format("Predicted: %.1f, Label: %.1f", model.predict(sample.features()), sample.label());
    System.out.println(line);
}

// Pair every test sample's prediction with its true label.
JavaPairRDD<Object, Object> predictionsAndLabels = test.mapToPair(sample -> {
    double predicted = model.predict(sample.features());
    return new Tuple2<Object, Object>(predicted, sample.label());
});

// Report the statistics derived from the confusion matrix.
Stats stats = Stats.apply(confusionMatrix(predictionsAndLabels.rdd()));
String report = stats.toString();
System.out.println(report);

// Report the binary-classification metrics computed from the same (prediction, label) pairs.
BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(predictionsAndLabels.rdd());
printMetrics(metrics);