Using Apache Cassandra with Apache Spark

Cassandra listens on port 9042 by default, on a host such as:

  val cassandraHost = "192.168.1.34"

To use it from Spark, the location of Cassandra must be added to the Spark configuration, as shown in the introduction.
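For reference, a minimal sketch of such a configuration (the application name is a placeholder; spark.cassandra.connection.host is the property read by the Spark Cassandra Connector):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-example") // placeholder application name
  .set("spark.cassandra.connection.host", cassandraHost) // where the connector finds Cassandra
val sc = new SparkContext(conf)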

A function to load data from Cassandra goes as follows:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def cassandraFile: (SparkContext => RDD[String]) = sc => {
  import com.datastax.spark.connector._
  // each CassandraRow is flattened back into a tab-separated line
  sc.cassandraTable("spark", "bike_buyers").map { row => row.columnValues.mkString("\t") }
}

CassandraRows are mapped into Strings only to keep the same form as after reading from a text file. A more reasonable solution could transform the rows directly into something more useful, as sketched below.
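For illustration, a sketch of such a direct mapping, assuming a hypothetical BikeBuyer case class whose fields correspond to columns of the bike_buyers table:

import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical case class; field names and types must line up with the bike_buyers columns
case class BikeBuyer(id: Int, income: Float, occupation: String)

// The connector derives the row-to-case-class mapping from the import above
def bikeBuyers: (SparkContext => RDD[BikeBuyer]) = sc =>
  sc.cassandraTable[BikeBuyer]("spark", "bike_buyers")

To load data into Cassandra, a simple ETL program written in Scala can look like this: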

import java.net.InetAddress

import scala.collection.JavaConverters._
import scala.io.Source

import com.datastax.driver.core.DataType
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.spark.connector.cql.CassandraConnector

object LoadBikeBuyers {

  def main(args: Array[String]): Unit = {

    org.apache.log4j.BasicConfigurator.configure()
    val host = args(0)
    val cc = CassandraConnector(Set(InetAddress.getByName(host)))

    // CQL scripts creating the keyspace and the table are read from the classpath
    val keyspaceCql = Source.fromInputStream(getClass.getResourceAsStream("/create_spark_keyspace.cql")).mkString
    val tableCql = Source.fromInputStream(getClass.getResourceAsStream("/create_bike_buyers_table.cql")).mkString

    val bbFile = Source.fromFile("data/bike-buyers.txt", "utf8").getLines()

    cc.withSessionDo(s => {
      s.execute(keyspaceCql)
      s.execute(tableCql)

      // table metadata drives the mapping from file fields to CQL columns
      val columns = s.getCluster.getMetadata.getKeyspace("spark").getTable("bike_buyers").getColumns.asScala

      bbFile.map(_.split("\\t")).foreach(row => {

        val insert = QueryBuilder.insertInto("spark", "bike_buyers")
        row.zipWithIndex.foreach { case (value, i) =>
          val column = columns(i)
          // convert each raw text field to the type of its target column
          insert.value(column.getName,
            if (column.getType == DataType.text()) value
            else if (column.getType == DataType.cfloat()) value.replaceFirst(",", ".").toFloat
            else value.toInt)
        }
        s.execute(insert)
      })
    })
  }
}

Both CQL scripts (create_spark_keyspace.cql and create_bike_buyers_table.cql) are available on GitHub.

After executing LoadBikeBuyers, the keyspace “spark” and the table “bike_buyers” are created, and the content of the bike-buyers file is loaded into the table.
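A quick way to verify the load from a Spark shell (a sketch, assuming the connector is on the classpath and spark.cassandra.connection.host is set):

import com.datastax.spark.connector._

// the count should match the number of lines in data/bike-buyers.txt
println(sc.cassandraTable("spark", "bike_buyers").count())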
