Tuesday, December 2, 2014

Simple example using Apache Spark 1.1.1 to read from Cassandra 2.0.11


Building on what we have done in previous posts, we will now put together an Apache Spark application that will read from an Apache Cassandra table and spit out a bit of output.  Here again, this example will only scratch the surface of what is possible in hopes of getting you up and running quickly reading from Cassandra with Spark.

In this example:
  1. Set up a local, three node Cassandra cluster
    1. Create a keyspace
    2. Add and populate a simple key/value table
  2. Set up a local, two worker Spark cluster
  3. Build a simple Spark application
  4. Submit the Spark application to the local cluster
  5. See simple output in the console
  6. Grab a drink and celebrate - your world just got more awesome!

Set up your local Cassandra cluster

ACHTUNG! (2014-12-12)  As noted here, we now run all these examples against a local, single node Cassandra cluster.  You can use the tutorial from Datastax Academy to set this up.

If you have not done so already, go to my post on setting up a local Cassandra cluster.  Once that is done, come back here and continue to the next steps.

You can verify you are ready to go by executing ccm status from a terminal.  You should see output very similar to the following:
[bkarels@ahimsa simple]$ ccm status
Cluster: 'cluster3'
node1: UP
node3: UP
node2: UP
Also, at least one of these nodes should be running, which you can verify using ccm [node name] show.
[bkarels@ahimsa simple]$ ccm node1 show
node1: UP
       thrift=('', 9160)
       binary=('', 9042)
       storage=('', 7000)

Create your keyspace

For this next set of tasks I use Datastax DevCenter - a free tool available from Datastax that you can download here.  DevCenter is a very intuitive tool that makes working with Cassandra clusters a breeze.  Take a little tour of DevCenter here.  Of course, if you prefer other mechanisms for interacting with Cassandra feel free - the next bits of CQL will work regardless.

(NOTE: The following CQL is available in userdb.cql at the root of the example project that can be found here.)

Create a keyspace named 'userdb' (you can name it whatever you like, but that is the name used in this example) using the following CQL:
CREATE KEYSPACE userdb
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2};
If you are using DevCenter, you should be able to see your new keyspace in the schema viewer.

Create and populate a table

Execute the following CQL to create a table named 'mycount' and insert 10 rows of data into it:
CREATE TABLE mycount (
  key TEXT PRIMARY KEY,
  value INT
);

INSERT INTO mycount (key,value) VALUES ('key0', 42);
INSERT INTO mycount (key,value) VALUES ('key1', 27);
INSERT INTO mycount (key,value) VALUES ('key2', 12);
INSERT INTO mycount (key,value) VALUES ('key3', 7);
INSERT INTO mycount (key,value) VALUES ('key4', 99);
INSERT INTO mycount (key,value) VALUES ('key5', 666);
INSERT INTO mycount (key,value) VALUES ('key6', 1000);
INSERT INTO mycount (key,value) VALUES ('key7', 0);
INSERT INTO mycount (key,value) VALUES ('key8', 6);
INSERT INTO mycount (key,value) VALUES ('key9', 1975);
You can verify this any number of ways, but again, in the DevCenter schema view you should see the new table with its ten rows.

OK, you should now have a local, three node Cassandra cluster up and running with a keyspace, a table, and a tiny bit of data.

Spark Cluster Setup

If you have not done so already, go back to my post on setting up a local Apache Spark cluster.  Once you're done with that, come back here and continue.

If all has gone well, you should be able to open up a browser, point it at the Spark master web UI (http://localhost:8080 by default), and see your master and two workers up and running.

The Spark Application


For this application, you will need the following:
  1. Java 7 or higher (Oracle JDK recommended)
  2. Scala 2.10.4
  3. SBT 0.13.5
  4. Git
  5. Your running Cassandra cluster from above
  6. Your running Spark cluster from above
Begin by cloning the sbt-spark-cassandra-simple project from github.
[bkarels@ahimsa work]$ git clone git@github.com:bradkarels/sbt-spark-cassandra-simple.git simple
Initialized empty Git repository in /home/bkarels/work/simple/.git/
Enter passphrase for key '/home/bkarels/.ssh/id_rsa':
remote: Counting objects: 30, done.
remote: Compressing objects: 100% (6/6), done.
remote: Total 30 (delta 0), reused 0 (delta 0)
Receiving objects: 100% (30/30), 4.56 KiB, done.
Resolving deltas: 100% (2/2), done.
Change directory into the project and start sbt:

(NOTE: Above the project was cloned into a directory named simple.  By default the project directory would be named sbt-spark-cassandra-simple.)
[bkarels@ahimsa work]$ cd simple
[bkarels@ahimsa simple]$ sbt
[info] Loading project definition from /home/bkarels/work/simple/project
[info] Updating {file:/home/bkarels/work/simple/project/}simple-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to Simple Project (in build file:/home/bkarels/work/simple/)
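For reference, the build definition is what pulls Spark and the DataStax Cassandra connector onto the classpath.  A hedged sketch of what the relevant build.sbt entries might look like for this era of Spark - the cloned project is the authoritative source for the exact names and versions:

```scala
// build.sbt (illustrative sketch - check the cloned repo for the real contents)
name := "Simple Project"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // "provided" keeps Spark itself out of the assembled jar;
  // spark-submit supplies it on the cluster at runtime.
  "org.apache.spark" %% "spark-core" % "1.1.1" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0"
)
```

Marking spark-core as provided is what keeps the uber jar small and avoids classpath conflicts with the Spark installation that runs it.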
The project has the assembly plugin built in, so from your sbt prompt you should be able to execute assembly to build the uber jar for the project:
> assembly
sbt will download the necessary files, lots of removed output here...
[info] SHA-1: 20faa94edf488c440ac372ba79f402e759a1a2a7
[info] Packaging /home/bkarels/work/simple/target/scala-2.10/simpleSpark.jar ...
[info] Done packaging.
[success] Total time: 13 s, completed Dec 2, 2014 11:10:08 AM
Take note of the path in the packaging output above.  This is where your Spark application has been placed.  The project is set up to name the resulting jar file 'simpleSpark.jar'.  You can set this value to whatever you like in assembly.sbt; however, for this example we will use 'simpleSpark.jar'.
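If you do want to change the jar name, the sbt-assembly setting responsible looks roughly like this - a sketch based on the sbt-assembly 0.11.x-era syntax, so consult the project's actual assembly.sbt before copying it verbatim:

```scala
// assembly.sbt (illustrative sketch)
import sbtassembly.Plugin._
import AssemblyKeys._

// Bring in the default assembly task settings.
assemblySettings

// Name of the uber jar produced by the 'assembly' task.
jarName in assembly := "simpleSpark.jar"
```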

Now, if you have been following the example, you should be able to execute the following command from your terminal:
[bkarels@ahimsa simple]$ $SPARK_HOME/bin/spark-submit --class SimpleApp --master spark:// /home/bkarels/work/simple/target/scala-2.10/simpleSpark.jar
...and if the bits are in the right order you should see output like this:
Spark assembly has been built with Hive, including Datanucleus jars on classpath                                                                                                                              
14/12/02 11:25:13 WARN util.Utils: Your hostname, ahimsa resolves to a loopback address:; using instead (on interface eth0)                                                         
14/12/02 11:25:13 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address                                                                                                                  
14/12/02 11:25:13 INFO spark.SecurityManager: Changing view acls to: bkarels,                                                                                                                                 
14/12/02 11:25:13 INFO spark.SecurityManager: Changing modify acls to: bkarels,
14/12/02 11:27:50 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/02 11:27:50 INFO spark.SparkContext: Job finished: first at SimpleApp.scala:18, took 48.70891727 s
Row count is: 10
The first of which is:
CassandraRow{key: key4, value: 99}
Voilà!  Welcome to the bleeding edge.  The last three lines of output above come from SimpleApp.scala where the following line puts the bits together:
val out = "Row count is: %d\nThe first of which is:\n%s".format(cnt, row.toString)
Here again, as with the other examples, this is very nearly the simplest possible example of using a Spark application to read data from Cassandra and do something with it.  Where you go from here is nearly limitless.
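To tie it all together, the heart of SimpleApp.scala boils down to something like the following sketch.  The app name and connection host here are placeholders, and the cloned project remains the source of truth - this just shows the shape of a minimal connector-backed Spark job:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._  // adds cassandraTable(...) to SparkContext

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Simple Application")
      // Point the connector at a Cassandra contact node (placeholder host).
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(conf)

    // Build an RDD over userdb.mycount, then pull back a count and one row.
    val rdd = sc.cassandraTable("userdb", "mycount")
    val cnt = rdd.count()
    val row = rdd.first()

    val out = "Row count is: %d\nThe first of which is:\n%s".format(cnt, row.toString)
    println(out)

    sc.stop()
  }
}
```

Note that which row comes back from first() is effectively arbitrary - Cassandra's partitioner orders keys by token, not by name, which is why the output above shows key4 rather than key0.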
