Scala example of a Decision Tree classification algorithm used to predict prospective customer behavior
To run the following example, some prerequisites are required:
- AdventureWorks database,
- JDBC driver for Microsoft SQL Server.
After downloading, install the database and copy the driver, e.g. to the jars folder, so that it can be distributed to the Spark cluster.
A user with limited database rights can be created with this script:
CREATE LOGIN [aw-user] WITH PASSWORD = 'aw-pass'
USE AdventureWorksDW2008R2
CREATE USER [aw-user] FOR LOGIN [aw-user]
GRANT SELECT ON vTargetMail TO [aw-user]
GRANT SELECT ON ProspectiveBuyer TO [aw-user]
The JDBC URL for SQL Server Express (the free edition of Microsoft SQL Server) can look like this:
val url = "jdbc:sqlserver://zxspectrum\\SQL2008R2EXPRESS;databaseName=AdventureWorksDW2008R2;user=aw-user;password=aw-pass"
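In a Scala string literal, the backslash between the host name and the SQL Server instance name must be escaped, which is why the URL above contains `\\`. A minimal sketch that assembles the same URL from its parts; `SqlServerUrl` and its `build` method are hypothetical helpers, not part of Spark or the Microsoft driver:

```scala
object SqlServerUrl {
  // Hypothetical helper: builds a Microsoft JDBC URL for a named SQL Server instance.
  // The "\\" in the interpolated string becomes a single backslash separating host and instance.
  def build(host: String, instance: String, database: String, user: String, password: String): String =
    s"jdbc:sqlserver://$host\\$instance;databaseName=$database;user=$user;password=$password"
}
```

For example, `SqlServerUrl.build("zxspectrum", "SQL2008R2EXPRESS", "AdventureWorksDW2008R2", "aw-user", "aw-pass")` yields the same string as `url`.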
SQL query that provides input data about customers, including whether each of them bought a bicycle (column BikeBuyer). Because it is passed where Spark expects a table name, the query is wrapped in parentheses and given an alias:
val queryBikeBuyers = "(SELECT CustomerKey, Age, BikeBuyer, CommuteDistance, EnglishEducation, Gender, HouseOwnerFlag, MaritalStatus, NumberCarsOwned, NumberChildrenAtHome, EnglishOccupation, Region, TotalChildren, YearlyIncome FROM dbo.vTargetMail) as bikebuyers"
The connection properties must contain the class name of the Microsoft JDBC driver. Together with the JDBC URL and the query, this is enough to load the data into a DataFrame (in spark-shell, `spark` is the predefined SparkSession):
import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
val bikeBuyers = spark.read.jdbc(url, queryBikeBuyers, connectionProperties)
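Since Spark 2.4, the JDBC data source also accepts a `query` option, so the `(...) as alias` wrapper is not needed, and credentials can be passed as the `user` and `password` options instead of being embedded in the URL. A sketch under those assumptions (Spark 2.4 or later; `JdbcLoad`, `jdbcOptions` and `load` are hypothetical names):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcLoad {
  // Collects the reader options; credentials travel as options, not inside the URL.
  def jdbcOptions(url: String, query: String, user: String, password: String): Map[String, String] =
    Map(
      "url" -> url,
      "query" -> query, // Spark 2.4+: a plain SELECT, no "(...) as alias" wrapper
      "user" -> user,
      "password" -> password,
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    )

  // Loads the query result through the generic "jdbc" format.
  def load(spark: SparkSession, options: Map[String, String]): DataFrame =
    spark.read.format("jdbc").options(options).load()
}
```

Usage would look like `JdbcLoad.load(spark, JdbcLoad.jdbcOptions("jdbc:sqlserver://zxspectrum\\SQL2008R2EXPRESS;databaseName=AdventureWorksDW2008R2", "SELECT CustomerKey, BikeBuyer FROM dbo.vTargetMail", "aw-user", "aw-pass"))`. Note that `query` and `dbtable` cannot be combined.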
The show function can be used on bikeBuyers to display sample rows, and printSchema prints the schema to the console in a readable tree format.
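Before training, it can also help to check how the BikeBuyer label is distributed across customers. A small sketch, assuming the label column is named BikeBuyer as in the query above; `InspectBikeBuyers.inspect` is a hypothetical helper that prints the schema and a sample, then returns the row count per label value:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object InspectBikeBuyers {
  // Prints the schema and a few rows, then counts rows per BikeBuyer value.
  def inspect(df: DataFrame, sampleRows: Int = 5): Map[Int, Long] = {
    df.printSchema()
    df.show(sampleRows, truncate = false)
    df.groupBy("BikeBuyer").count()
      .collect()
      // Number covers int, smallint or tinyint label columns coming from JDBC
      .map(r => r.getAs[Number](0).intValue -> r.getLong(1))
      .toMap
  }
}
```

Calling `InspectBikeBuyers.inspect(bikeBuyers)` shows whether the two classes are roughly balanced.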
In Spark 2, the central concept of an ML workflow is the Pipeline, which chains multiple Transformers and Estimators.
Here the pipeline has three stages, all of them Estimators: RFormula selects a subset of columns and produces a features vector and a label column, VectorIndexer indexes categorical features in the intermediate dataset, and DecisionTreeClassifier trains the classification model. Fitting the pipeline replaces each stage with the Transformer it produces (RFormulaModel, VectorIndexerModel and DecisionTreeClassificationModel).
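To see what the first two stages produce, here is a sketch on a few hand-made rows (toy values, not AdventureWorks data). RFormula string-indexes a string column such as Gender and one-hot encodes it with one level dropped, so a two-valued column contributes a single 0/1 feature; with `setMaxCategories(2)`, VectorIndexer then treats that feature as categorical and leaves YearlyIncome continuous. `FeatureStagesDemo` is a hypothetical name:

```scala
import org.apache.spark.ml.feature.{RFormula, VectorIndexer, VectorIndexerModel}
import org.apache.spark.sql.SparkSession

object FeatureStagesDemo {
  def run(spark: SparkSession): VectorIndexerModel = {
    import spark.implicits._
    // Toy rows: one binary string column and one numeric column
    val toy = Seq(
      (1, "M", 50000.0),
      (0, "F", 30000.0),
      (1, "F", 70000.0),
      (0, "M", 10000.0)
    ).toDF("BikeBuyer", "Gender", "YearlyIncome")

    // Gender -> one 0/1 feature, YearlyIncome -> one numeric feature, BikeBuyer -> double label
    val encoded = new RFormula()
      .setFormula("BikeBuyer ~ Gender + YearlyIncome")
      .setFeaturesCol("features")
      .setLabelCol("label")
      .fit(toy)
      .transform(toy)
    encoded.select("features", "label").show(false)

    // Features with at most 2 distinct values are marked categorical
    new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(2)
      .fit(encoded)
  }
}
```

The returned model's `categoryMaps` lists which feature indices were treated as categorical.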
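import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.feature.{RFormula, VectorIndexer}

// BikeBuyer becomes the label; the listed columns are assembled into a features vector
val rformula = new RFormula()
  .setFormula("BikeBuyer ~ EnglishEducation + Gender + HouseOwnerFlag + MaritalStatus + NumberCarsOwned + NumberChildrenAtHome + EnglishOccupation + TotalChildren + YearlyIncome")
  .setFeaturesCol("features")
  .setLabelCol("label")
// Features with at most 2 distinct values are treated as categorical
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(2)
// Entropy-based decision tree trained on the indexed features
val dtree = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxBins(34)
  .setMaxDepth(20)
  .setImpurity("entropy")
// Fitting runs the stages in order and returns a PipelineModel
val pipeline = new Pipeline().setStages(Array(rformula, featureIndexer, dtree))
val model = pipeline.fit(bikeBuyers)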
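After fitting, the trained tree is the last stage of the returned PipelineModel and can be inspected directly. A sketch, assuming the classifier remains the last stage as in the pipeline above (`TreeSummary` is a hypothetical helper):

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.DecisionTreeClassificationModel

object TreeSummary {
  // Assumes the DecisionTreeClassifier is the final pipeline stage.
  def describe(model: PipelineModel): String = {
    val tree = model.stages.last.asInstanceOf[DecisionTreeClassificationModel]
    // depth and node count, followed by the learned if/else split structure
    s"depth=${tree.depth}, nodes=${tree.numNodes}\n${tree.toDebugString}"
  }
}
```

`println(TreeSummary.describe(model))` prints the tree; with `setMaxDepth(20)` the output can be long, which may also hint at overfitting.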
To check how the trained model works, it is applied to a dataset of potential customers. The following SQL query returns the list of potential customers together with their characteristics:
val queryProspectiveBuyers = """(SELECT 0 as BikeBuyer, LastName, FirstName, MaritalStatus, Gender, YearlyIncome, TotalChildren, NumberChildrenAtHome,
EnglishEducation = CASE Education
WHEN 'High Schoo' THEN 'High School'
WHEN 'Partial Hi' THEN 'Partial High School'
WHEN 'Partial Co' THEN 'Partial College'
WHEN 'Graduate D' THEN 'Graduate Degree'
WHEN 'Bachelors' THEN 'Bachelors'
END, Occupation as EnglishOccupation, HouseOwnerFlag, NumberCarsOwned FROM dbo.ProspectiveBuyer) as prospectivebuyers"""
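The CASE expression expands the truncated Education codes stored in ProspectiveBuyer (for example 'High Schoo') into the full EnglishEducation values used in vTargetMail, and the constant 0 as BikeBuyer keeps the column set aligned with the training data.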
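The same expansion could instead be done on the DataFrame side, selecting the raw Education column in SQL and mapping it with Spark's `na.replace`. One difference: the CASE expression has no ELSE branch, so unlisted codes become NULL, while `na.replace` leaves them unchanged. A sketch; `EducationCodes` is a hypothetical name and the mapping is copied from the query:

```scala
import org.apache.spark.sql.DataFrame

object EducationCodes {
  // Truncated ProspectiveBuyer codes -> full vTargetMail values ("Bachelors" already matches)
  val fullNames: Map[String, String] = Map(
    "High Schoo" -> "High School",
    "Partial Hi" -> "Partial High School",
    "Partial Co" -> "Partial College",
    "Graduate D" -> "Graduate Degree"
  )

  // Replaces matching values in Education, then renames the column to match the formula
  def expand(df: DataFrame): DataFrame =
    df.na.replace("Education", fullNames).withColumnRenamed("Education", "EnglishEducation")
}
```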
val prospectiveBuyers = spark.read.jdbc(url, queryProspectiveBuyers, connectionProperties)
val prospective = model.transform(prospectiveBuyers)
// Show prospective buyers the model predicts will buy a bike (prediction == 1)
prospective.filter("prediction == 1").select("LastName", "FirstName", "features").show(false)
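The prospective buyers carry no real label, so the predictions above cannot be scored directly. One way to estimate accuracy is to hold out part of the labelled vTargetMail data. A sketch using `randomSplit` and `MulticlassClassificationEvaluator`; the 80/20 split and the seed are arbitrary choices, and `HoldOut` is a hypothetical name:

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.DataFrame

object HoldOut {
  // Fits the pipeline on roughly 80% of the labelled rows and reports accuracy on the rest.
  def accuracy(pipeline: Pipeline, labelled: DataFrame, seed: Long = 42L): Double = {
    val Array(trainSet, testSet) = labelled.randomSplit(Array(0.8, 0.2), seed)
    val predictions = pipeline.fit(trainSet).transform(testSet)
    new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
      .evaluate(predictions)
  }
}
```

For instance, `HoldOut.accuracy(pipeline, bikeBuyers)` reuses the pipeline defined earlier; comparing several `setMaxDepth` values this way is a simple check against overfitting.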