Tuesday, December 16, 2014

Simple Python application on Apache Spark Cluster

Overview


As an exercise, I am working on duplicating my previous examples in Python.  It is clear that Python has, and continues to gain, traction in the data world, so it makes sense to have a working knowledge of it.

As with my other examples, everything will find its way to my Github repositories - forking and enhancements welcome.

This effort is about the simplest possible example of submitting a Python application to a Spark cluster.  But when you move beyond the REPL, you have to start somewhere, right?

Prerequisites:

  • Java 1.7+ (Oracle JDK required)
  • A Spark cluster (how to do it here.)
  • git


Clone the Example

Begin by cloning the example project from github - super-simple-spark-python-app and cd into the project directory.


[bkarels@ahimsa work]$ git clone git@github.com:bradkarels/super-simple-spark-python-app.git
Initialized empty Git repository in /home/bkarels/work/super-simple-spark-python-app/.git/
remote: Counting objects: 13, done.
remote: Compressing objects: 100% (12/12), done.
remote: Total 13 (delta 3), reused 6 (delta 1)
Receiving objects: 100% (13/13), done.
Resolving deltas: 100% (3/3), done.
[bkarels@ahimsa work]$ cd super-simple-spark-python-app/


Move the file tenzingyatso.txt to your home directory.
[bkarels@ahimsa super-simple-spark-python-app]$ mv tenzingyatso.txt ~

Modify the path to the sample file in simple.py (save and close).

file = sc.textFile("/home/bkarels/tenzingyatso.txt")
becomes...
file = sc.textFile("/home/yourUserNameHere/tenzingyatso.txt")
...or some such similar thing.

Spark it up (with python)!

If your local Spark cluster is not up and running, do that now.  If you need to review how to go about that, you can look here.

Make Sparks fly! (i.e. run it)


Since this example does not have a packaged application (e.g. jar, egg, etc.), we can invoke spark-submit with just our simple python file.


[bkarels@ahimsa super-simple-spark-python-app]$ $SPARK_HOME/bin/spark-submit --master spark://127.0.0.1:7077 ./simple.py
Your expected console output is a line count of 7 wrapped in a nice battery of asterisks, followed by the text of the first line of the example file.  If you see that, this has worked.

Tuesday, December 9, 2014

Writing to Cassandra 2.0.x with Spark 1.1.1 - moving beyond Tuple2

UPDATE! (2014-12-12): Wide tuple write example added.  I won't lie, it is not elegant - but it is worth knowing the basic concepts of how to work with tuples bigger than two.  Pull the updated project code to check it out.

Overview

The most rudimentary mechanisms for writing Tuple2 to Cassandra are pretty well beaten to a pulp.  In this post I hope to shine light on a few additional, more complex ways to write data to Cassandra, as not everything we do with a Spark/Cassandra stack will be reduceByKey on key/value pairs.

In the first example in this series we will fetch some data from Cassandra directly into a case class.  Then we will transform that RDD into a more compact version of itself and write the resulting collection of case classes back to a different table in our Cassandra cluster.

We will be sticking to our 'human' example from previous posts; however, I have tuned the schema a bit.  To keep your life simple, I recommend executing the updated CQL, which will create a new keyspace and tables for this example.

Prerequisites:

  • Java 1.7+ (Oracle JDK required)
  • Scala 2.10.4
  • SBT 0.13.x
  • A Spark cluster (how to do it here.)
  • A Cassandra cluster (how to do it here.)
  • git

Clone the Example

Begin by cloning the example project from github - sbt-spark-cassandra-writing, & cd into the project directory.

[bkarels@ahimsa work]$ git clone https://github.com/bradkarels/sbt-spark-cassandra-writing.git simple-writing
Initialized empty Git repository in /home/bkarels/work/simple-writing/.git/
remote: Counting objects: 21, done.
remote: Compressing objects: 100% (15/15), done.
remote: Total 21 (delta 0), reused 17 (delta 0)
Unpacking objects: 100% (21/21), done.
[bkarels@ahimsa work]$ cd simple-writing/
[bkarels@ahimsa simple-writing]$

Prepare the Data

At the root of the project you will see two CQL files: sparkfu.cql and populateHumans.cql.  You will need to execute these two files against your local Cassandra instance from Datastax DevCenter or cqlsh (or some other tool) to set things up.

Begin by executing sparkfu.cql to create your keyspace and tables:

CREATE KEYSPACE sparkfu WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE sparkfu.human (
    id TIMEUUID,
    firstname TEXT,
    lastname TEXT,
    gender TEXT,
    address0 TEXT,
    address1 TEXT,
    city TEXT,
    stateprov TEXT,
    zippostal TEXT,
    country TEXT,
    phone TEXT,
    isgoodperson BOOLEAN,
    PRIMARY KEY(id)
);

// Letting Cassandra use default names for the indexes for ease.
CREATE INDEX ON sparkfu.human ( isgoodperson ); // We want to be able to find good people quickly

CREATE INDEX ON sparkfu.human ( stateprov ); // Maybe we need good people by state?

CREATE INDEX ON sparkfu.human ( firstname ); // Good people tend to be named "Brad" - let's find them fast too!

// Clearly this is a horrible model you would never ever use in production, but it will do for a simple example.
CREATE TABLE sparkfu.goodhuman (
    firstname TEXT,
    lastname TEXT,
    PRIMARY KEY(firstname,lastname)
);
Next, load up your table with the sample data by executing populateHumans.cql:

INSERT INTO sparkfu.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, isgoodperson)
  VALUES (now(),'Pete', 'Jones', 'm', '555 Astor Lane',null,'Minneapolis','MN','55401','USA','6125551212',True);
...
[Some CQL removed for brevity - file in project source. ]
...
INSERT INTO sparkfu.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, isgoodperson)
  VALUES (now(),'Brad', 'Karels', 'm', '123 Nice Guy Blvd.',null,'Minneapolis','MN','55402','USA','6125551212',True); 
INSERT INTO sparkfu.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, isgoodperson)
  VALUES (now(),'Alysia', 'Yeoh', 't', '1 Bat Girl Way',null,'Metropolis','YX','55666','USA','3215551212',True);
Errors?  No.  Good - we are set to proceed.

Prepare the Application

In a terminal, from the project root, fire up SBT and assembly the project to create your application jar file.
[bkarels@ahimsa simple-writing]$ sbt
[info] Loading project definition from /home/bkarels/work/simple-writing/project
[info] Updating {file:/home/bkarels/work/simple-writing/project/}simple-writing-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                                    
[info] Done updating.                                                                  
[info] Set current project to Sandy Author (in build file:/home/bkarels/work/simple-writing/)
> assembly                                                                                  
[info] Updating {file:/home/bkarels/work/simple-writing/}simple-writing...                  
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                                         
[info] Done updating.                                                                       
[info] Compiling 1 Scala source to /home/bkarels/work/simple-writing/target/scala-2.10/classes...
...
[ a whole bunch of assembly output will be here... ]
...
[info] SHA-1: 0202b523259e5688311e4b2bcb16c63ade4b7067
[info] Packaging /home/bkarels/work/simple-writing/target/scala-2.10/SandyAuthor.jar ...
[info] Done packaging.
[success] Total time: 19 s, completed Dec 10, 2014 4:41:29 PM
>
As per normal, take note of where your jar file gets put (the 'Packaging' line above).  Also, note that we have set the resulting jar file name in assembly.sbt - here we have set it to SandyAuthor.jar.  (You can only say Cassandra and Writer so many times - having some fun with jar names...)
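
For reference, the relevant line in assembly.sbt looks roughly like this - a sketch in the sbt-assembly style of the era, so check the project's assembly.sbt for the exact form:

// assembly.sbt (sketch) - the sbt-assembly setting that names the output jar
import AssemblyKeys._

assemblySettings

jarName in assembly := "SandyAuthor.jar"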

Spark it up!

If your local Spark cluster is not up and running, do that now.  If you need to review how to go about that, you can look here.

Make Sparks fly! (i.e. run it)

In keeping with our theme of finding good people, we'll do it again.  But this time, as you will see in the project source, we will fetch only good people in the first place, transform them into a simplified RDD, and store them off safely, separated from all the not-good people.
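
Conceptually, the core of the job looks something like the following.  This is a hedged sketch - the case classes and names are assumed from sparkfu.cql above, not lifted from the project source:

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

// Assumed shapes - the project's actual model may differ.
case class Human(firstname: String, lastname: String, isgoodperson: Boolean)
case class GoodHuman(firstname: String, lastname: String)

object Writer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext("spark://127.0.0.1:7077", "SandyAuthor", conf)

    // Fetch only the good people (the secondary index on isgoodperson makes
    // this query valid), compact each row down to the two columns we care
    // about, and write the result back out.
    val good = sc.cassandraTable[Human]("sparkfu", "human")
      .select("firstname", "lastname", "isgoodperson")
      .where("isgoodperson = ?", true)
      .map(h => GoodHuman(h.firstname, h.lastname))

    good.saveToCassandra("sparkfu", "goodhuman", SomeColumns("firstname", "lastname"))
    sc.stop()
  }
}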

From your terminal, using the location of SandyAuthor.jar from above, submit the application to your Spark cluster:

[bkarels@ahimsa simple-writing]$ $SPARK_HOME/bin/spark-submit --class com.sparkfu.simple.Writer --master spark://127.0.0.1:7077 /home/bkarels/dev/simple-writing/target/scala-2.10/SandyAuthor.jar
...
[ lots of Spark output dumped to terminal here...]
...
14/12/10 16:48:51 INFO spark.SparkContext: Job finished: toArray at SimpleWriting.scala:34, took 0.641733148 s
Alysia Yeoh is a good, simple person.
Edward Snowden is a good, simple person.
Fatima Nagossa is a good, simple person.
Pete Jones is a good, simple person.
Brad Karels is a good, simple person.
Mother Theresa is a good, simple person.
Hiro Ryoshi is a good, simple person.
Neil Harris is a good, simple person.
B Real is a good, simple person.
...
[and then the wide tuple example output]
...

Alysia Yeoh is transgender and can be reached at 3215551212.                                                                                                                                                                                
Neil Harris is male and can be reached at 9045551212.                                                                                                                                                                                       
Mother Theresa is female and can be reached at null.                                                                                                                                                                                        
Hiro Ryoshi is female and can be reached at 7155551212.                                                                                                                                                                                     
Brad Karels is male and can be reached at 6125551212.                                                                                                                                                                                       
Pete Jones is male and can be reached at 6125551212.                                                                                                                                                                                        
Edward Snowden is male and can be reached at null.                                                                                                                                                                                          
Fatima Nagossa is female and can be reached at 7895551212.                                                                                                                                                                                  
B Real is male and can be reached at 9995551212.

[bkarels@ahimsa simple-writing]$
If all has gone well you should see output like the above.  Please note, we are also TRUNCATING the goodperson and personfromtuple tables just before the program exits, so if you look in Cassandra those tables will be empty.  Feel free to comment out those lines and validate directly in Cassandra.
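
For the curious, the wide-tuple pass and that cleanup might look roughly like this - a hedged sketch using the job's SparkContext sc, with table and column names assumed from the output above rather than taken from the project source:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

// Read selected columns straight into a Tuple4; phone is nullable, so it
// comes back as an Option.
val wide = sc.cassandraTable[(String, String, String, Option[String])]("sparkfu", "human")
  .select("firstname", "lastname", "gender", "phone")

// Write the tuple RDD to the (assumed) personfromtuple table.
wide.saveToCassandra("sparkfu", "personfromtuple",
  SomeColumns("firstname", "lastname", "gender", "phone"))

// Empty the tables on the way out - comment these out to inspect the data in Cassandra.
CassandraConnector(sc.getConf).withSessionDo { session =>
  session.execute("TRUNCATE sparkfu.goodhuman")
  session.execute("TRUNCATE sparkfu.personfromtuple")
}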

Then play around with it.  Extend the case classes, add columns to the schema, add more transformations, etc.  These simple examples will serve you best if you extend them and make them your own.

What's next?

There is likely good cause to start playing with creating and altering table structures dynamically.

Monday, December 8, 2014

Spark 1.1.1 to read from Cassandra into scala case class - Simple Example

As we continue to advance our use cases for Apache Spark on Cassandra, it seems only right that our next case puts data into a case class.

Following previous examples, this will be the most simple use case - more elegant and advanced usages are coming...

Update! (2014-12-10): I have added fetching Cassandra data directly into a case class, as intended by the connector.  The source for the example has been updated and pushed to the repository.

Prerequisites:

  • Java 1.7+ (Oracle JDK required)
  • Scala 2.10.4
  • SBT 0.13.x
  • A Spark cluster (how to do it here.)
  • A Cassandra cluster (how to do it here.)
  • git

Overview

For this example we will pull data for various humans out of Cassandra and put the relevant bits into our model.  Then we will see about working with only good persons (as determined by the creators of our data - our algorithm is not that advanced...yet).

Clone the Example

Begin by cloning the example project from github - our project is spark-cassandra-to-scala-case-class, & cd into the project directory.
[bkarels@ahimsa work]$ git clone https://github.com/bradkarels/spark-cassandra-to-scala-case-class.git simple-case
Initialized empty Git repository in /home/bkarels/work/simple-case/.git/
remote: Counting objects: 21, done.
remote: Compressing objects: 100% (15/15), done.
remote: Total 21 (delta 0), reused 17 (delta 0)
Unpacking objects: 100% (21/21), done.
[bkarels@ahimsa work]$ cd simple-case/
[bkarels@ahimsa simple-case]$ l
total 32K
-rw-rw-r--. 1 bkarels bkarels 1.2K Dec  8 12:24 assembly.sbt
-rw-rw-r--. 1 bkarels bkarels  298 Dec  8 12:24 build.sbt
-rw-rw-r--. 1 bkarels bkarels 1.1K Dec  8 12:24 LICENSE
-rw-rw-r--. 1 bkarels bkarels 1.1K Dec  8 12:24 nicecase.cql
-rw-rw-r--. 1 bkarels bkarels 3.2K Dec  8 12:24 populateHumans.cql
drwxrwxr-x. 2 bkarels bkarels 4.0K Dec  8 12:24 project
-rw-rw-r--. 1 bkarels bkarels  228 Dec  8 12:24 README.md
drwxrwxr-x. 3 bkarels bkarels 4.0K Dec  8 12:24 src

Prepare the Data

At the root of the project you will see two CQL files: nicecase.cql & populateHumans.cql.  You will need to execute these two files against your local Cassandra instance from Datastax DevCenter or cqlsh (or some other tool) to set things up.

Begin by executing nicecase.cql to create your keyspace and tables:
CREATE KEYSPACE nicecase WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE nicecase.human (
    id TIMEUUID,
    firstname TEXT,
    lastname TEXT,
    gender TEXT,
    address0 TEXT,
    address1 TEXT,
    city TEXT,
    stateprov TEXT,
    zippostal TEXT,
    country TEXT,
    phone TEXT,
    "isGoodPerson" BOOLEAN, // Case sensitive column name - generally not recommended, but possible.
    PRIMARY KEY(id)
);

CREATE INDEX good_person ON nicecase.human ( "isGoodPerson" ); // We want to be able to find good people quickly

CREATE INDEX person_state ON nicecase.human ( stateprov ); // Maybe we need good people by state?

CREATE INDEX ON nicecase.human ( firstname ); // Let Cassandra use the default name for the index.  Good people tend to be named "Brad" - let's find them fast too!

CREATE TABLE nicecase.goodhuman (
    human_id TIMEUUID,
    PRIMARY KEY(human_id)
);

CREATE TABLE nicecase.badhuman (
    human_id TIMEUUID,
    PRIMARY KEY(human_id)
);

CREATE TABLE nicecase.goodbrad (
    human_id TIMEUUID,
    firstname TEXT,
    PRIMARY KEY(human_id)
);
There are a few extra bits in here that we won't be using in this example (indexes, extra tiny tables), but they won't hurt anything and we will likely use them in our next example or two.

Next, execute populateHumans.cql to load up our human table with some data.
 INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Pete', 'Jones', 'm', '555 Astor Lane',null,'Minneapolis','MN','55401','USA','6125551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Neil', 'Harris', 'm', '123 Doogie Howser Way',null,'Los Angeles','CA','90211','USA','9045551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Hiro', 'Ryoshi', 'f', '42 Cemetary Road','Apt. 23','River Falls','WI','55301','USA','7155551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Biel', 'SaBubb', 'm', '666 Hellpass',null,'Demonville','KY','32054','USA','3125551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Edward', 'Snowden', 'm', null,null,null,null,null,'RU',null,True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Mother', 'Theresa', 'f', null,null,null,null,null,null,null,True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Fatima', 'Nagossa', 'f', '689 First Ave.','Apt 1b','Orlando','FL','32822','USA','7895551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Casey', 'Steals-a-lot', 'f', '71 Buster Lane',null,'Denver','CO','74811','USA','7895551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'B', 'Real', 'm', '420 High Way','I forgot','Palo Alto','CA','90255','USA','9995551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'JJ', 'Jones', 't', '123 Sycamore Way',null,'Las Cruces','CA','91553','USA','9995551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Diane', 'Feinstein', '?', 'Do not care',null,'Some City','CA','99999','USA','1235551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Brad', 'Karels', 'm', '123 Nice Guy Blvd.',null,'Minneapolis','MN','55402','USA','6125551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Alysia', 'Yeoh', 't', '1 Bat Girl Way',null,'Metropolis','YX','55666','USA','3215551212',True);
There is a lot more data here than we will use for this example, but again, the near future is coming...  Most notable here is how we use the CQL function now() to set the value for our primary key of type TIMEUUID.  We are also, quite intentionally, leaving many of the data values null.  Real data has nulls; we might as well get the hang of dealing with them in this context as well.
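
On the Scala side, those nulls are most comfortably handled with Option fields.  A hedged sketch (not the project's exact model):

// Nullable columns map to Option fields, so a Cassandra null becomes None
// instead of breaking the row mapping.
case class Human(
  id: java.util.UUID,
  firstname: String,
  lastname: String,
  phone: Option[String],     // e.g. Mother Theresa's phone above is null
  stateprov: Option[String]) // several rows have no state either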

Assuming you did not encounter any errors, your Cassandra instance is now ready for this example.

Prepare the Application

In a terminal, from the project root, fire up SBT and assembly the project to create your application jar file.
[bkarels@ahimsa simple-case]$ sbt
[info] Loading project definition from /home/bkarels/work/simple-case/project
[info] Updating {file:/home/bkarels/work/simple-case/project/}simple-case-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                              
[info] Done updating.                                                            
[info] Set current project to Simple Case (in build file:/home/bkarels/work/simple-case/)
> assembly
[info] Updating {file:/home/bkarels/work/simple-case/}simple-case...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                
[info] Done updating.                                              
[info] Compiling 1 Scala source to /home/bkarels/work/simple-case/target/scala-2.10/classes...                                                                 
[info] Including: minlog-1.2.jar                                                             
...                                               
[info] Checking every *.class/*.jar file's SHA-1.                                            
...
[info] SHA-1: 5b17b5b5ccb29df92a662ad3f404573e7470d576
[info] Packaging /home/bkarels/work/simple-case/target/scala-2.10/CaseStudy.jar ...
[info] Done packaging.
[success] Total time: 28 s, completed Dec 8, 2014 12:37:30 PM
>
As per normal, take note of where your jar file gets put (the 'Packaging' line above).  Also, note that we have set the resulting jar file name in assembly.sbt - here we have set it to CaseStudy.jar.

Spark it up!

If your local Spark cluster is not up and running, do that now.  If you need to review how to go about that, you can look here.

Make Sparks fly! (i.e. run it)

OK, time to find out who the good people are...
This very simple example will pull a set of CassandraRows out of our human table in Cassandra, iterate over the rows and create a list of Human case classes.  It will then rip through our list of Humans and nicely tell us who is a good person and who is not so much.

An interesting preview of things to come is commented out at or about line 29 of SimpleApp.scala:
row.columnNames
As Cassandra is NoSQL, a different set of columns could exist for each row.  For our software to be reactive, we need mechanisms to detect these differences so that they can be handled.  So play with that - it will likely be part of your future.
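
Side by side, the two fetch mechanisms look roughly like the following hedged sketch, which reuses the Option-based Human shape from earlier rather than the project's exact source:

import com.datastax.spark.connector._

// Mechanism one: direct fetch - the connector maps the selected columns
// onto Human's constructor fields.
val humans = sc.cassandraTable[Human]("nicecase", "human")
  .select("id", "firstname", "lastname", "phone", "stateprov")

// Mechanism two: pull raw CassandraRows and parse them by hand.
val parsed = sc.cassandraTable("nicecase", "human").map { row =>
  Human(
    row.getUUID("id"),
    row.getString("firstname"),
    row.getString("lastname"),
    row.get[Option[String]]("phone"),
    row.get[Option[String]]("stateprov"))
}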

With that noted, let's run things.  Using the location of our CaseStudy.jar from above, open a terminal and use spark-submit to submit the application to your Spark cluster:
[bkarels@ahimsa simple-case]$ $SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.CaseStudy --master spark://127.0.0.1:7077 /home/bkarels/work/simple-case/target/scala-2.10/CaseStudy.jar
...
Fatima Nagossa is a good person.
Neil Harris is a good person.
Pete Jones is a good person.
Joe Jones is not a good person.
Edward Snowden is a good person.
Brad Karels is a good person.
Biel SaBubb is not a good person.
Mother Theresa is a good person.
Alysia Yeoh is a good person.
Hiro Ryoshi is a good person.
B Real is a good person.
Casey Steals-a-lot is not a good person.
Diane Feinstein is not a good person.
If all has gone to plan, you should see output like the above.  You will actually see two sets of this output, as the example does the same operation two ways - a direct fetch into a case class and a more verbose row-parsing mechanism.

What's next?

Next we hope to look into writing data to Cassandra from scala case classes.  Or some other interesting thing...

FIN

Thursday, December 4, 2014

Spark 1.1.1 to read & write to Cassandra 2.0.11 - Simple Example


Overview


In my last post we looked at the most simple way to read some data from Apache Cassandra using Apache Spark from your local machine.  Taking the next logical step, we will now write some data to Cassandra.  The setup for this post is nearly identical to what we did here.  Assuming you have done that work, this should only take a couple of minutes.

Fetch the example from github

From github, clone the sbt-spark-cassandra-rw project.

Assembly

From a terminal, cd into the sbt-spark-cassandra-rw project and fire up sbt.  Once ready, call assembly to create your application jar file.
[bkarels@ahimsa simple-rw]$ sbt            
[info] Loading project definition from /home/bkarels/dev/simple-rw/project
[info] Set current project to Simple RW Project (in build file:/home/bkarels/dev/simple-rw/)                              
> assembly
...
[info] Packaging /home/bkarels/dev/simple-rw/target/scala-2.10/simpleSpark-RW.jar ...                                       
[info] Done packaging.                                                                                                      
[success] Total time: 21 s, completed Dec 3, 2014 10:52:33 AM
As before, take note of where your jar is put (the 'Packaging' line above).

Prepare the data

At the sbt-spark-cassandra-rw project root you will find the file things.cql.  Using DevCenter or cqlsh, execute this script against your target Cassandra cluster.  If you need to set-up a local cluster for development look here.

In this example we will look at a group of things.  Things have keys and values.  But, for a thing to matter it must have a value greater than one.  So, we will pull down all things, filter down to the things that matter, and write only the things that matter into their own table (thingsthatmatter).

It is worth noting that the target table must exist for us to write to it.  Unlike some other NoSQL data stores, we must plan ahead a bit more with Cassandra.
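
The heart of the job is then roughly the following hedged sketch, using the job's SparkContext sc.  The keyspace name is assumed - check things.cql for the real identifiers:

import com.datastax.spark.connector._

// Pull every (key, value) pair, keep only the things that matter,
// and write them out to their own table.
val things = sc.cassandraTable[(String, Int)]("sparkfu", "things")
  .select("key", "value")

val mattering = things.filter { case (_, value) => value > 1 }

mattering.saveToCassandra("sparkfu", "thingsthatmatter", SomeColumns("key", "value"))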

Spark it up!

If your local Spark cluster is not up and running, do that now.  If you need to review how to go about that, you can look here.

Make Sparks fly! (i.e. run it)

This bit is identical to the previous example.  With your application assembled, Cassandra up and prepared, and your Spark cluster humming, go to a terminal and submit your job.
[bkarels@ahimsa simple-rw]$ $SPARK_HOME/bin/spark-submit --class com.sparkfu.simple.SimpleApp --master spark://127.0.0.1:7077 /home/bkarels/dev/simple-rw/target/scala-2.10/simpleSpark-RW.jar
...
14/12/03 20:42:45 INFO spark.SparkContext: Job finished: toArray at SimpleApp.scala:27, took 0.641849986 s
(key8,27)
(key7,6)
(key1,28)
(key3,7)
(key4,99)
(key0,42)
(key9,1975)
(key6,100)
If all has gone well your terminal will spit out a list of all the things that matter, as above.  Unaltered, the application will truncate the thingsthatmatter table just before it exits.  If you comment out that section in the source, re-assemble, and re-submit the job, you can further confirm the results by querying the table directly.

Note how all things that matter have a value greater than zero.

What's next?

To make this work we used a simple tuple to read our data into, transform it, and write it back.  While this works for very simple examples, we will outgrow it very quickly... in fact, we have already outgrown it.  That said, next we'll look to read data directly into a Scala case class, transform that data (perhaps into a new class), and write it back to Cassandra.

Tuesday, December 2, 2014

Simple example using Apache Spark 1.1.1 to read from Cassandra 2.0.11


Overview

Building on what we have done in previous posts, we will now put together an Apache Spark application that will read from an Apache Cassandra table and spit out a bit of output.  Here again, this example will only scratch the surface of what is possible in hopes of getting you up and running quickly reading from Cassandra with Spark.

In this example:
  1. Set up a local, three node Cassandra cluster
    1. Create a keyspace
    2. Add and populate a simple key/value table
  2. Set up a local, two worker Spark cluster
  3. Build a simple Spark application
  4. Submit the Spark application to the local cluster
  5. See simple output in the console
  6. Grab a drink and celebrate - your world just got more awesome!

Set up your local Cassandra cluster

ACHTUNG! (2014-12-12)  As noted here, we now run all these examples against a local, single node Cassandra cluster.  You can use the tutorial from Datastax Academy to set this up.

If you have not done so already, go to my post on setting up a local Cassandra cluster.  Once that is done, come back here and continue to the next steps.

You can verify you are ready to go by executing ccm status from a terminal.  You should see output very similar to the following:
[bkarels@ahimsa simple]$ ccm status
Cluster: 'cluster3'
-------------------
node1: UP
node3: UP
node2: UP
Also, at least one of these nodes should be running on 127.0.0.1, which you can verify using ccm [node name] show.
[bkarels@ahimsa simple]$ ccm node1 show
node1: UP
       cluster=cluster3
       auto_bootstrap=False
       thrift=('127.0.0.1', 9160)
       binary=('127.0.0.1', 9042)
       storage=('127.0.0.1', 7000)
       jmx_port=7100
       remote_debug_port=0
       initial_token=-9223372036854775808
       pid=3137

Create your keyspace

For this next set of tasks I use Datastax DevCenter - a free tool available from Datastax that you can download here.  DevCenter is a very intuitive tool that makes working with Cassandra clusters a breeze.  Take a little tour of DevCenter here.  Of course, if you prefer other mechanisms for interacting with Cassandra, feel free - the next bits of CQL will work regardless.

(NOTE: The following CQL is available in userdb.cql at the root of the example project that can be found here.)

Create a keyspace named 'userdb' (you can name it whatever you like, but that is the name used in this example) using the following CQL:
CREATE KEYSPACE userdb
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2};
If you are using DevCenter, you should be able to see your new keyspace in the schema viewer.

Create and populate a table

Execute the following CQL to create a table named 'mycount' and insert 10 rows of data into it:
CREATE TABLE mycount (
  key TEXT PRIMARY KEY,
  value INT
);

INSERT INTO mycount (key,value) VALUES ('key0', 42);
INSERT INTO mycount (key,value) VALUES ('key1', 27);
INSERT INTO mycount (key,value) VALUES ('key2', 12);
INSERT INTO mycount (key,value) VALUES ('key3', 7);
INSERT INTO mycount (key,value) VALUES ('key4', 99);
INSERT INTO mycount (key,value) VALUES ('key5', 666);
INSERT INTO mycount (key,value) VALUES ('key6', 1000);
INSERT INTO mycount (key,value) VALUES ('key7', 0);
INSERT INTO mycount (key,value) VALUES ('key8', 6);
INSERT INTO mycount (key,value) VALUES ('key9', 1975);
You can verify this any number of ways, but again, in DevCenter's schema view you should now see the mycount table.

OK, you should now have a local, three node Cassandra cluster up and running with a keyspace, a table, and a tiny bit of data.

Spark Cluster Setup

If you have not done so already, go back to my post on setting up a local Apache Spark cluster.  Once you're done with that, come back here and continue.

If all has gone well, you should be able to open up a browser and see the Spark master UI at http://127.0.0.1:8080.


The Spark Application

Prerequisites

For this application, you will need the following:
  1. Java 7 or higher (Oracle JDK recommended)
  2. Scala 2.10.4
  3. SBT 0.13.5
  4. Git
  5. Your running Cassandra cluster from above
  6. Your running Spark cluster from above
Begin by cloning the sbt-spark-cassandra-simple project from github.
[bkarels@ahimsa work]$ git clone git@github.com:bradkarels/sbt-spark-cassandra-simple.git simple
Initialized empty Git repository in /home/bkarels/work/simple/.git/
Enter passphrase for key '/home/bkarels/.ssh/id_rsa':
remote: Counting objects: 30, done.
remote: Compressing objects: 100% (6/6), done.
remote: Total 30 (delta 0), reused 0 (delta 0)
Receiving objects: 100% (30/30), 4.56 KiB, done.
Resolving deltas: 100% (2/2), done.
Change directory into the project and start sbt:

(NOTE: Above, the project was cloned into a directory named simple.  By default the project directory would be named sbt-spark-cassandra-simple.)
[bkarels@ahimsa work]$ cd simple
[bkarels@ahimsa simple]$ sbt
[info] Loading project definition from /home/bkarels/work/simple/project
[info] Updating {file:/home/bkarels/work/simple/project/}simple-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to Simple Project (in build file:/home/bkarels/work/simple/)
>
The project has the assembly plugin built in, so from your sbt prompt you should be able to execute assembly to build the uber jar for the project:
> assembly
[ sbt will download the necessary files - lots of output removed here... ]
[info] SHA-1: 20faa94edf488c440ac372ba79f402e759a1a2a7
[info] Packaging /home/bkarels/work/simple/target/scala-2.10/simpleSpark.jar ...
[info] Done packaging.
[success] Total time: 13 s, completed Dec 2, 2014 11:10:08 AM
>
Take note of the path above (the 'Packaging' line).  This is where your Spark application has been placed.  The project is set up to name the resulting jar file 'simpleSpark.jar'.  You can set this value to whatever you like in assembly.sbt; however, for this example we will use 'simpleSpark.jar'.

Now, if you have been following the example, you should be able to execute the following command from your terminal:
[bkarels@ahimsa simple]$ $SPARK_HOME/bin/spark-submit --class SimpleApp --master spark://127.0.0.1:7077 /home/bkarels/work/simple/target/scala-2.10/simpleSpark.jar
...and if the bits are in the right order you should see output like this:
Spark assembly has been built with Hive, including Datanucleus jars on classpath                                                                                                                              
14/12/02 11:25:13 WARN util.Utils: Your hostname, ahimsa resolves to a loopback address: 127.0.0.1; using 192.168.254.137 instead (on interface eth0)                                                         
14/12/02 11:25:13 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address                                                                                                                  
14/12/02 11:25:13 INFO spark.SecurityManager: Changing view acls to: bkarels,                                                                                                                                 
14/12/02 11:25:13 INFO spark.SecurityManager: Changing modify acls to: bkarels,
...
14/12/02 11:27:50 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/02 11:27:50 INFO spark.SparkContext: Job finished: first at SimpleApp.scala:18, took 48.70891727 s
Row count is: 10
The first of which is:
CassandraRow{key: key4, value: 99}
Voilà!  Welcome to the bleeding edge.  The last three lines of output above come from SimpleApp.scala, where the following line puts the bits together:
val out = s"Row count is: %d\nThe first of which is:\n%s".format(cnt, row.toString)
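
That line lives in something shaped roughly like the following hedged sketch - the structure is assumed, and the project source is authoritative:

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    // Point the connector at the local Cassandra cluster.
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext("spark://127.0.0.1:7077", "Simple Project", conf)

    // Read the whole mycount table, count the rows, and grab one to show.
    val rdd = sc.cassandraTable("userdb", "mycount")
    val cnt = rdd.count()
    val row = rdd.first()

    val out = s"Row count is: %d\nThe first of which is:\n%s".format(cnt, row.toString)
    println(out)
    sc.stop()
  }
}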
Here again, as with the other examples, this is very nearly the most simple example possible to use a Spark application to read data from Cassandra and do something with it.  Where you go from here is near limitless.