Spark 1.x basic introduction
The first thing to do in an Apache Spark application is to create a SparkConf object that contains information about the application; it is then required by the SparkContext object as its constructor parameter.
It is important that only one SparkContext may be active per JVM, and it should be stopped before creating a new context or before exiting the current application.
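A minimal sketch of that lifecycle, assuming a hypothetical application name, could look like this:
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application name; the configuration is passed to the SparkContext constructor.
val conf = new SparkConf().setAppName("spark-examples").setMaster("local[*]")
val sc = new SparkContext(conf)

// ... run jobs on sc ...

// Stop the active context before creating a new one or exiting the application.
sc.stop()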
Functions that return the configuration for different modes are kept in the Application object. The first one configures the application for local mode with as many worker threads as there are logical cores available:
def configLocalMode(appName: String): SparkConf = {
  val conf = new SparkConf().setAppName(appName)
  // "local[*]" runs Spark locally with as many worker threads as logical cores.
  conf.setMaster("local[*]")
  conf.set("spark.cassandra.connection.host", cassandraHost)
  conf
}
The parameter that sets access to the Apache Cassandra database is optional.
To run the application in Standalone Cluster mode, the following function can be used:
def configStandaloneClusterMode(appName: String): SparkConf = {
  val conf = new SparkConf().setAppName(appName)
  conf.setMaster(sparkMaster)
  conf.setJars(Array("build/libs/spark-examples-1.0.jar"))
  conf.set("spark.cassandra.connection.host", cassandraHost)
  conf
}
The sparkMaster variable should point at an existing Spark Standalone cluster.
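The cassandraHost and sparkMaster values referenced by these functions are not shown above; a sketch of how they might be defined in the Application object, and how a configuration is turned into a context, could look like this (both values are hypothetical):
import org.apache.spark.SparkContext

// Hypothetical values kept in the Application object; adjust them to your environment.
val cassandraHost = "192.168.1.34"
val sparkMaster = "spark://192.168.1.34:7077"

// Build the context from whichever configuration matches the target mode.
val sc = new SparkContext(Application.configStandaloneClusterMode("spark-examples"))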
To leverage existing Hadoop infrastructure, a Spark application can be executed on YARN. According to the documentation, two options are available: yarn-client and yarn-cluster. The most noticeable difference between them is whether you want to receive computational results in your driver application or not. The whole thing is well described on the Cloudera blog. Executing any Spark application on YARN can be done either by using the spark-submit script or the SparkLauncher class (available since version 1.4). SparkLauncher uses the Java ProcessBuilder class to execute the spark-submit script in a separate process. It is convenient to read the streams connected to the created process's outputs. A small utility class for this task can look like this:
import java.io.{BufferedReader, InputStream, InputStreamReader}

// Reads the given input stream line by line and forwards it to standard output.
class RunnableInputStreamReader(is: InputStream, name: String) extends Runnable {
  val reader = new BufferedReader(new InputStreamReader(is))

  def run() = {
    var line = reader.readLine()
    while (line != null) {
      System.out.println(line)
      line = reader.readLine()
    }
    reader.close()
  }
}
To run Spark in yarn-client or yarn-cluster mode, the SPARK_HOME and HADOOP_CONF_DIR or YARN_CONF_DIR variables must be set. Environment variables can also be provided through the SparkLauncher constructor parameter or, in the case of SPARK_HOME, through the setSparkHome method (a sketch of that variant follows the listing below). A utility class executing a Spark application on an existing Hadoop cluster can look like this:
import java.util.concurrent.Executors
import org.apache.spark.launcher.SparkLauncher

object YarnLauncher {
  val mode = "yarn-client"
  val mainClass = "examples.regression.HousePricesPrediction"

  def main(args: Array[String]): Unit = {
    // launch() returns a java.lang.Process running the spark-submit script.
    val launcher = new SparkLauncher()
      .setAppResource("build/libs/spark-examples-1.0.jar")
      .setMainClass(mainClass)
      .setMaster(mode)
      .launch()

    // Forward the child process's output and error streams to the console.
    val tf = Executors.defaultThreadFactory()
    tf.newThread(new RunnableInputStreamReader(launcher.getInputStream(), "input")).start()
    tf.newThread(new RunnableInputStreamReader(launcher.getErrorStream(), "error")).start()

    println("Executing ...")
    val exitCode = launcher.waitFor()
    println(s"Finished, exit code: $exitCode")
  }
}
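The environment-variable variant mentioned above, with values passed through the SparkLauncher constructor and setSparkHome, could be sketched like this (the paths are hypothetical):
import java.util.{HashMap => JHashMap}
import org.apache.spark.launcher.SparkLauncher

// Hypothetical paths; point them at your own Spark and Hadoop installations.
val env = new JHashMap[String, String]()
env.put("HADOOP_CONF_DIR", "/etc/hadoop/conf")

val process = new SparkLauncher(env)   // extra environment variables for the child process
  .setSparkHome("/opt/spark")          // SPARK_HOME set programmatically instead of via the environment
  .setAppResource("build/libs/spark-examples-1.0.jar")
  .setMainClass("examples.regression.HousePricesPrediction")
  .setMaster("yarn-client")
  .launch()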
When Spark works in standalone mode, files should be located on network storage available to every Spark worker or copied to exactly the same location on every node. Using Spark on YARN requires putting data on HDFS or some other distributed storage like S3. Reading a local file goes as follows:
def localFile(fileName: String): (SparkContext => RDD[String]) = sc => {
  sc.textFile("data/" + fileName)
}
Using HDFS requires Apache Hadoop to be configured and running. The only difference in the code is that instead of a file path, an HDFS URL is supplied. The address 192.168.1.34:19000 reflects my local network Hadoop cluster configuration, so it ought to be replaced with an appropriate alternative.
def hdfsFile(fileName: String): (SparkContext => RDD[String]) = sc => {
  sc.textFile("hdfs://192.168.1.34:19000/spark/" + fileName)
}
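Both functions return a SparkContext => RDD[String], so the choice of data source stays independent of how the context was created. A hypothetical usage, assuming an already created sc and an example file name:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// The file name is hypothetical; sc is a previously created SparkContext.
val readLines: SparkContext => RDD[String] = hdfsFile("house_prices.csv")
val lines: RDD[String] = readLines(sc)
println(s"Line count: ${lines.count()}")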