Tuesday, December 16, 2014

Simple Python application on Apache Spark Cluster

Overview


As an exercise, I am working on duplicating my previous examples in Python.  Python clearly has traction in the data world and is gaining more, so it makes sense to have a working knowledge of it.

As with my other examples, everything will find its way to my Github repositories - forking and enhancements welcome.

This effort is about the simplest possible example of submitting a Python application to a Spark cluster.  But when you move beyond the REPL, you have to start somewhere, right?

Prerequisites:

  • Java 1.7+ (Oracle JDK required)
  • A Spark cluster (how to do it here.)
  • git


Clone the Example

Begin by cloning the example project from github - super-simple-spark-python-app and cd into the project directory.


[bkarels@ahimsa work]$ git clone git@github.com:bradkarels/super-simple-spark-python-app.git
Initialized empty Git repository in /home/bkarels/work/super-simple-spark-python-app/.git/
remote: Counting objects: 13, done.
remote: Compressing objects: 100% (12/12), done.
remote: Total 13 (delta 3), reused 6 (delta 1)
Receiving objects: 100% (13/13), done.
Resolving deltas: 100% (3/3), done.
[bkarels@ahimsa work]$ cd super-simple-spark-python-app/


Move the file tenzingyatso.txt to your home directory.
[bkarels@ahimsa super-simple-spark-python-app]$ mv tenzingyatso.txt ~

Modify the path to the sample file in simple.py (save and close).

file = sc.textFile("/home/bkarels/tenzingyatso.txt")
becomes...
file = sc.textFile("/home/yourUserNameHere/tenzingyatso.txt")
...or some such similar thing.

Spark it up (with python)!

If your local Spark cluster is not up and running, do that now.  If you need to review how to go about that, you can look here.

Make Sparks fly! (i.e. run it)


Since this example does not have a packaged application (e.g. jar, egg, etc.), we can invoke spark-submit with just our simple python file.


[bkarels@ahimsa super-simple-spark-python-app]$ $SPARK_HOME/bin/spark-submit --master spark://127.0.0.1:7077 ./simple.py
Your expected console output is a line count of 7 wrapped in a nice battery of asterisks, followed by the text of the first line of the example file.  If you see that, this has worked.

Tuesday, December 9, 2014

Writing to Cassandra 2.0.x with Spark 1.1.1 - moving beyond Tuple2

UPDATE! (2014-12-12): Wide tuple write example added.  I won't lie, it is not elegant - but it is worth knowing the basic concepts of how to work with tuples bigger than Tuple2.  Pull the updated project code to check it out.

Overview

The most rudimentary mechanisms for writing a Tuple2 to Cassandra are pretty well beaten to a pulp.  In this post I hope to shine light on a few additional, more complex ways to write data to Cassandra, as not everything we do with a Spark/Cassandra stack will be reduceByKey on key/value pairs.

In the first example in this series we will fetch some data from Cassandra directly into a case class.  Then we will transform that RDD into a more compact version of itself and write the resulting collection of case classes back to a different table in our Cassandra cluster.
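
In connector terms, the "fetch directly into a case class" step looks roughly like the sketch below.  This is not the project source, just an outline under stated assumptions: the field names mirror the lowercase column names defined in sparkfu.cql further down (which is what lets the connector map columns to fields automatically), and sc is assumed to be a SparkContext configured with spark.cassandra.connection.host.

import com.datastax.spark.connector._

// Field names match the lowercase column names, so the connector maps them for us.
// select() keeps the read narrow - we only pull the columns we actually need.
case class Human(firstname: String, lastname: String, isgoodperson: Boolean)

val humans = sc.cassandraTable[Human]("sparkfu", "human")
  .select("firstname", "lastname", "isgoodperson")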

We will be sticking to our 'human' example from previous posts; however, I have tuned the schema a bit.  To keep your life simple, I recommend executing the updated CQL, which will create a new keyspace and tables for this example.

Prerequisites:

  • Java 1.7+ (Oracle JDK required)
  • Scala 2.10.4
  • SBT 0.13.x
  • A Spark cluster (how to do it here.)
  • A Cassandra cluster (how to do it here.)
  • git

Clone the Example

Begin by cloning the example project from github - sbt-spark-cassandra-writing - and cd into the project directory.

[bkarels@ahimsa work]$ git clone https://github.com/bradkarels/sbt-spark-cassandra-writing.git simple-writing
Initialized empty Git repository in /home/bkarels/work/simple-writing/.git/
remote: Counting objects: 21, done.
remote: Compressing objects: 100% (15/15), done.
remote: Total 21 (delta 0), reused 17 (delta 0)
Unpacking objects: 100% (21/21), done.
[bkarels@ahimsa work]$ cd simple-writing/
[bkarels@ahimsa simple-writing]$

Prepare the Data

At the root of the project you will see two CQL files: sparkfu.cql and populateHumans.cql.  You will need to execute these two files against your local Cassandra instance from Datastax DevCenter or cqlsh (or some other tool) to set things up.

Begin by executing sparkfu.cql to create your keyspace and tables:

CREATE KEYSPACE sparkfu WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE sparkfu.human (
    id TIMEUUID,
    firstname TEXT,
    lastname TEXT,
    gender TEXT,
    address0 TEXT,
    address1 TEXT,
    city TEXT,
    stateprov TEXT,
    zippostal TEXT,
    country TEXT,
    phone TEXT,
    isgoodperson BOOLEAN,
    PRIMARY KEY(id)
);

// Letting Cassandra use default names for the indexes for ease.
CREATE INDEX ON sparkfu.human ( isgoodperson ); // We want to be able to find good people quickly

CREATE INDEX ON sparkfu.human ( stateprov ); // Maybe we need good people by state?

CREATE INDEX ON sparkfu.human ( firstname ); // Good people tend to be named "Brad" - let's find them fast too!

// Clearly this is a horrible model that you would never use in production, but this is just a simple example.
CREATE TABLE sparkfu.goodhuman (
    firstname TEXT,
    lastname TEXT,
    PRIMARY KEY(firstname,lastname)
);
Next, load up your table with the sample data by executing populateHumans.cql:

INSERT INTO sparkfu.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, isgoodperson)
  VALUES (now(),'Pete', 'Jones', 'm', '555 Astor Lane',null,'Minneapolis','MN','55401','USA','6125551212',True);
...
[Some CQL removed for brevity - file in project source. ]
...
INSERT INTO sparkfu.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, isgoodperson)
  VALUES (now(),'Brad', 'Karels', 'm', '123 Nice Guy Blvd.',null,'Minneapolis','MN','55402','USA','6125551212',True); 
INSERT INTO sparkfu.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, isgoodperson)
  VALUES (now(),'Alysia', 'Yeoh', 't', '1 Bat Girl Way',null,'Metropolis','YX','55666','USA','3215551212',True);
Errors?  No.  Good - we are set to proceed.

Prepare the Application

In a terminal, from the project root, fire up SBT and run assembly to create your application jar file.
[bkarels@ahimsa simple-writing]$ sbt
[info] Loading project definition from /home/bkarels/work/simple-writing/project
[info] Updating {file:/home/bkarels/work/simple-writing/project/}simple-writing-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                                    
[info] Done updating.                                                                  
[info] Set current project to Sandy Author (in build file:/home/bkarels/work/simple-writing/)
> assembly                                                                                  
[info] Updating {file:/home/bkarels/work/simple-writing/}simple-writing...                  
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                                         
[info] Done updating.                                                                       
[info] Compiling 1 Scala source to /home/bkarels/work/simple-writing/target/scala-2.10/classes...
...
[ a whole bunch of assembly output will be here... ]
...
[info] SHA-1: 0202b523259e5688311e4b2bcb16c63ade4b7067
[info] Packaging /home/bkarels/work/simple-writing/target/scala-2.10/SandyAuthor.jar ...
[info] Done packaging.
[success] Total time: 19 s, completed Dec 10, 2014 4:41:29 PM
>
As per normal, take note of where your jar file gets put (highlighted above).  Also, note that we have set the resulting jar file name in assembly.sbt - here we have set it to SandyAuthor.jar.  (You can only say Cassandra and Writer so many times - having some fun with jar names...)

Spark it up!

If your local Spark cluster is not up and running, do that now.  If you need to review how to go about that, you can look here.

Make Sparks fly! (i.e. run it)

In keeping with our theme of finding good people, we'll do it again.  But this time, as you will see in the project source, we will fetch only good people in the first place, transform them into a simplified RDD, and store them off, safely separated from all the not-good people.
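
Under the hood, the fetch-transform-write pipeline presumably boils down to something like this hedged sketch (again, not the project source - the table and column names come from sparkfu.cql above, and Human is the case class sketched in the overview):

import com.datastax.spark.connector._

case class GoodHuman(firstname: String, lastname: String)

// Push the predicate down to Cassandra (the secondary index on isgoodperson keeps it cheap),
// compact each Human to just a name, then write the result to sparkfu.goodhuman.
val goodHumans = sc.cassandraTable[Human]("sparkfu", "human")
  .select("firstname", "lastname", "isgoodperson")
  .where("isgoodperson = ?", true)
  .map(h => GoodHuman(h.firstname, h.lastname))

goodHumans.saveToCassandra("sparkfu", "goodhuman")
goodHumans.collect().foreach(h => println(s"${h.firstname} ${h.lastname} is a good, simple person."))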

From your terminal, using the location of SandyAuthor.jar from above, submit the application to your Spark cluster:

[bkarels@ahimsa simple-writing]$ $SPARK_HOME/bin/spark-submit --class com.sparkfu.simple.Writer --master spark://127.0.0.1:7077 /home/bkarels/dev/simple-writing/target/scala-2.10/SandyAuthor.jar
...
[ lots of Spark output dumped to terminal here...]
...
14/12/10 16:48:51 INFO spark.SparkContext: Job finished: toArray at SimpleWriting.scala:34, took 0.641733148 s
Alysia Yeoh is a good, simple person.
Edward Snowden is a good, simple person.
Fatima Nagossa is a good, simple person.
Pete Jones is a good, simple person.
Brad Karels is a good, simple person.
Mother Theresa is a good, simple person.
Hiro Ryoshi is a good, simple person.
Neil Harris is a good, simple person.
B Real is a good, simple person.
...
[and then the wide tuple example output]
...

Alysia Yeoh is transgender and can be reached at 3215551212.                                                                                                                                                                                
Neil Harris is male and can be reached at 9045551212.                                                                                                                                                                                       
Mother Theresa is female and can be reached at null.                                                                                                                                                                                        
Hiro Ryoshi is female and can be reached at 7155551212.                                                                                                                                                                                     
Brad Karels is male and can be reached at 6125551212.                                                                                                                                                                                       
Pete Jones is male and can be reached at 6125551212.                                                                                                                                                                                        
Edward Snowden is male and can be reached at null.                                                                                                                                                                                          
Fatima Nagossa is female and can be reached at 7895551212.                                                                                                                                                                                  
B Real is male and can be reached at 9995551212.

[bkarels@ahimsa simple-writing]$
If all has gone well you should see output like the above.  Please note, we are also TRUNCATING the goodperson and personfromtuple tables just before the program exits, so if you look in Cassandra those tables will be empty.  Feel free to comment out those lines and validate directly in Cassandra.
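
The wide-tuple variant is conceptually the same, only positional: the tuple's elements are written to whichever columns you name, left to right, via SomeColumns.  A hedged sketch - the personfromtuple column names here are assumptions based on the output above, not pulled from the project:

import com.datastax.spark.connector._

// A "wide" tuple per person: (firstname, lastname, gender, phone).
val people = sc.cassandraTable("sparkfu", "human").map { row =>
  (row.getString("firstname"), row.getString("lastname"),
   row.getString("gender"), row.getStringOption("phone").orNull)
}

// SomeColumns maps tuple positions to columns, left to right.
people.saveToCassandra("sparkfu", "personfromtuple",
  SomeColumns("firstname", "lastname", "gender", "phone"))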

Then play around with it.  Extend the case classes, add columns to the schema, add more transformations, etc.  These simple examples will serve you best if you extend them and make them your own.

What's next?

There is likely good cause to start playing with creating and altering table structures dynamically.

Monday, December 8, 2014

Spark 1.1.1 to read from Cassandra into scala case class - Simple Example

As we continue to advance our use cases for Apache Spark on Cassandra it seems only right that our next case puts data into a case class.

Following previous examples, this will be the most simple use case - more elegant and advanced usages are coming...

Update! (2014-12-10): I have added fetching Cassandra data directly into a case class as intended by the connector.  The source for the example has been updated and pushed to the repository.

Prerequisites:

  • Java 1.7+ (Oracle JDK required)
  • Scala 2.10.4
  • SBT 0.13.x
  • A Spark cluster (how to do it here.)
  • A Cassandra cluster (how to do it here.)
  • git

Overview

For this example we will pull data for various humans out of Cassandra and put the relevant bits into our model.  Then we will see about working with only good persons (as determined by the creators of our data - our algorithm is not that advanced...yet).

Clone the Example

Begin by cloning the example project from github - our project is spark-cassandra-to-scala-case-class - and cd into the project directory.
[bkarels@ahimsa work]$ git clone https://github.com/bradkarels/spark-cassandra-to-scala-case-class.git simple-case
Initialized empty Git repository in /home/bkarels/work/simple-case/.git/
remote: Counting objects: 21, done.
remote: Compressing objects: 100% (15/15), done.
remote: Total 21 (delta 0), reused 17 (delta 0)
Unpacking objects: 100% (21/21), done.
[bkarels@ahimsa work]$ cd simple-case/
[bkarels@ahimsa simple-case]$ l
total 32K
-rw-rw-r--. 1 bkarels bkarels 1.2K Dec  8 12:24 assembly.sbt
-rw-rw-r--. 1 bkarels bkarels  298 Dec  8 12:24 build.sbt
-rw-rw-r--. 1 bkarels bkarels 1.1K Dec  8 12:24 LICENSE
-rw-rw-r--. 1 bkarels bkarels 1.1K Dec  8 12:24 nicecase.cql
-rw-rw-r--. 1 bkarels bkarels 3.2K Dec  8 12:24 populateHumans.cql
drwxrwxr-x. 2 bkarels bkarels 4.0K Dec  8 12:24 project
-rw-rw-r--. 1 bkarels bkarels  228 Dec  8 12:24 README.md
drwxrwxr-x. 3 bkarels bkarels 4.0K Dec  8 12:24 src

Prepare the Data

At the root of the project you will see two CQL files: nicecase.cql & populateHumans.cql.  You will need to execute these two files against your local Cassandra instance from Datastax DevCenter or cqlsh (or some other tool) to set things up.

Begin by executing nicecase.cql to create your keyspace and tables:
CREATE KEYSPACE nicecase WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE nicecase.human (
    id TIMEUUID,
    firstname TEXT,
    lastname TEXT,
    gender TEXT,
    address0 TEXT,
    address1 TEXT,
    city TEXT,
    stateprov TEXT,
    zippostal TEXT,
    country TEXT,
    phone TEXT,
    "isGoodPerson" BOOLEAN, // Case sensitive column name - generally not recommended, but possible.
    PRIMARY KEY(id)
);

CREATE INDEX good_person ON nicecase.human ( "isGoodPerson" ); // We want to be able to find good people quickly

CREATE INDEX person_state ON nicecase.human ( stateprov ); // Maybe we need good people by state?

CREATE INDEX ON nicecase.human ( firstname ); // Let Cassandra use the default name for the index.  Good people tend to be named "Brad" - let's find them fast too!

CREATE TABLE nicecase.goodhuman (
    human_id TIMEUUID,
    PRIMARY KEY(human_id)
);

CREATE TABLE nicecase.badhuman (
    human_id TIMEUUID,
    PRIMARY KEY(human_id)
);

CREATE TABLE nicecase.goodbrad (
    human_id TIMEUUID,
    firstname TEXT,
    PRIMARY KEY(human_id)
);
There are a few extra bits in here that we won't be using in this example (indexes, extra tiny tables), but they won't hurt anything and we will likely use them in our next example or two.

Next, execute populateHumans.cql to load up our human table with some data.
 INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Pete', 'Jones', 'm', '555 Astor Lane',null,'Minneapolis','MN','55401','USA','6125551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Neil', 'Harris', 'm', '123 Doogie Howser Way',null,'Los Angeles','CA','90211','USA','9045551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Hiro', 'Ryoshi', 'f', '42 Cemetary Road','Apt. 23','River Falls','WI','55301','USA','7155551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Biel', 'SaBubb', 'm', '666 Hellpass',null,'Demonville','KY','32054','USA','3125551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Edward', 'Snowden', 'm', null,null,null,null,null,'RU',null,True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Mother', 'Theresa', 'f', null,null,null,null,null,null,null,True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Fatima', 'Nagossa', 'f', '689 First Ave.','Apt 1b','Orlando','FL','32822','USA','7895551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Casey', 'Steals-a-lot', 'f', '71 Buster Lane',null,'Denver','CO','74811','USA','7895551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'B', 'Real', 'm', '420 High Way','I forgot','Palo Alto','CA','90255','USA','9995551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'JJ', 'Jones', 't', '123 Sycamore Way',null,'Las Cruces','CA','91553','USA','9995551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Diane', 'Feinstein', '?', 'Do not care',null,'Some City','CA','99999','USA','1235551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Brad', 'Karels', 'm', '123 Nice Guy Blvd.',null,'Minneapolis','MN','55402','USA','6125551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Alysia', 'Yeoh', 't', '1 Bat Girl Way',null,'Metropolis','YX','55666','USA','3215551212',True);
There is a lot more data here than we will use for this example, but again, the near future is coming...  Most notable here is how we use the CQL function now() to set the value of our primary key of type TIMEUUID.  We are also, quite intentionally, leaving many of the data values null.  Real data has nulls; we might as well get the hang of dealing with them in this context as well.
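
On the Spark side, the connector's Option-returning getters are a tidy way to cope with those nulls.  A small hedged sketch (not project source; sc is an already-configured SparkContext):

import com.datastax.spark.connector._

// getStringOption returns None for a null column instead of handing back a raw null.
val contacts = sc.cassandraTable("nicecase", "human").map { row =>
  val name  = row.getString("firstname")
  val phone = row.getStringOption("phone").getOrElse("no phone on record")
  s"$name can be reached at $phone"
}
contacts.collect().foreach(println)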

Assuming you did not encounter any errors, your Cassandra instance is now ready for this example.

Prepare the Application

In a terminal, from the project root, fire up SBT and run assembly to create your application jar file.
[bkarels@ahimsa simple-case]$ sbt
[info] Loading project definition from /home/bkarels/work/simple-case/project
[info] Updating {file:/home/bkarels/work/simple-case/project/}simple-case-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                              
[info] Done updating.                                                            
[info] Set current project to Simple Case (in build file:/home/bkarels/work/simple-case/)
> assembly
[info] Updating {file:/home/bkarels/work/simple-case/}simple-case...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                
[info] Done updating.                                              
[info] Compiling 1 Scala source to /home/bkarels/work/simple-case/target/scala-2.10/classes...                                                                 
[info] Including: minlog-1.2.jar                                                             
...                                               
[info] Checking every *.class/*.jar file's SHA-1.                                            
...
[info] SHA-1: 5b17b5b5ccb29df92a662ad3f404573e7470d576
[info] Packaging /home/bkarels/work/simple-case/target/scala-2.10/CaseStudy.jar ...
[info] Done packaging.
[success] Total time: 28 s, completed Dec 8, 2014 12:37:30 PM
>
As per normal, take note of where your jar file gets put (highlighted above).  Also, note that we have set the resulting jar file name in assembly.sbt - here we have set it to CaseStudy.jar.

Spark it up!

If your local Spark cluster is not up and running, do that now.  If you need to review how to go about that, you can look here.

Make Sparks fly! (i.e. run it)

OK, time to find out who the good people are...
This very simple example will pull a set of CassandraRows out of our human table in Cassandra, iterate over the rows, and create a list of Human case classes.  It will then rip through our list of Humans and nicely tell us who is a good person and who is not so much.
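
The row-parsing mechanism amounts to picking typed values out of each CassandraRow by column name.  Roughly like this hedged sketch (trimmed to three fields - the real Human case class in the project carries more of the columns):

import com.datastax.spark.connector._

case class Human(firstname: String, lastname: String, isGoodPerson: Boolean)

val humans = sc.cassandraTable("nicecase", "human").collect().map { row =>
  Human(
    row.getString("firstname"),
    row.getString("lastname"),
    row.getBoolean("isGoodPerson")) // the quoted, case-sensitive column from nicecase.cql
}.toList

humans.foreach { h =>
  if (h.isGoodPerson) println(s"${h.firstname} ${h.lastname} is a good person.")
  else println(s"${h.firstname} ${h.lastname} is not a good person.")
}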

An interesting preview of things to come is commented out at or about line 29 of SimpleApp.scala:
row.columnNames
As Cassandra is NoSQL, a different set of columns could exist for each row.  For our software to be reactive, we need mechanisms to detect these differences so that they can be handled.  So play with that - it likely will be part of your future.
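
For instance, something along these lines (a hedged sketch, not the project code) lets you branch on what a given row actually carries:

// columnNames lists the columns present in this row, so we can check before reading.
val phone =
  if (row.columnNames.contains("phone")) row.getStringOption("phone")
  else None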

With that noted, let's run things.  Using the location of our CaseStudy.jar from above, open a terminal and use spark-submit to submit the application to your Spark cluster:
[bkarels@ahimsa simple-case]$ $SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.CaseStudy --master spark://127.0.0.1:7077 /home/bkarels/work/simple-case/target/scala-2.10/CaseStudy.jar
...
Fatima Nagossa is a good person.
Neil Harris is a good person.
Pete Jones is a good person.
Joe Jones is not a good person.
Edward Snowden is a good person.
Brad Karels is a good person.
Biel SaBubb is not a good person.
Mother Theresa is a good person.
Alysia Yeoh is a good person.
Hiro Ryoshi is a good person.
B Real is a good person.
Casey Steals-a-lot is not a good person.
Diane Feinstein is not a good person.
If all has gone to plan, you should see output like the above.  You will actually see two sets of this output, as the example does the same operation two ways - a direct fetch into case classes and a more verbose row-parsing mechanism.

What's next?

Next we hope to look into writing data to Cassandra from Scala case classes.  Or some other interesting thing...

FIN

Thursday, December 4, 2014

Spark 1.1.1 to read & write to Cassandra 2.0.11 - Simple Example


Overview


In my last post we looked at the simplest way to read some data from Apache Cassandra using Apache Spark from your local machine.  Taking the next logical step, we will now write some data to Cassandra.  The setup for this post is nearly identical to what we did here.  Assuming you have done that work, this should only take a couple of minutes.

Fetch the example from github

From github, clone the sbt-spark-cassandra-rw project.

Assembly

From a terminal, cd into the sbt-spark-cassandra-rw project and fire up sbt.  Once ready, call assembly to create your application jar file.
[bkarels@ahimsa simple-rw]$ sbt            
[info] Loading project definition from /home/bkarels/dev/simple-rw/project
[info] Set current project to Simple RW Project (in build file:/home/bkarels/dev/simple-rw/)                              
> assembly
...
[info] Packaging /home/bkarels/dev/simple-rw/target/scala-2.10/simpleSpark-RW.jar ...                                       
[info] Done packaging.                                                                                                      
[success] Total time: 21 s, completed Dec 3, 2014 10:52:33 AM
As before, take note of where your jar is put (highlighted bit above).

Prepare the data

At the sbt-spark-cassandra-rw project root you will find the file things.cql.  Using DevCenter or cqlsh, execute this script against your target Cassandra cluster.  If you need to set up a local cluster for development, look here.

In this example we will look at a group of things.  Things have keys and values.  But, for a thing to matter, it must have a value greater than one.  So, we will pull down all the things, filter down to the things that matter, and write only the things that matter into their own table (thingsthatmatter).

It is worth noting that the target table must exist for us to write to it.  Unlike some other NoSQL data stores, we must plan ahead a bit more with Cassandra.
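
As a rough sketch of that read-filter-write round trip (hedged - the keyspace and column names here are placeholders for illustration; check things.cql in the project for the real ones):

import com.datastax.spark.connector._

// Read (key, value) pairs, keep only the things that matter, write them to their own table.
// "yourkeyspace", "key", and "value" are placeholder names.
sc.cassandraTable[(String, Int)]("yourkeyspace", "things")
  .select("key", "value")
  .filter { case (_, value) => value > 1 }
  .saveToCassandra("yourkeyspace", "thingsthatmatter", SomeColumns("key", "value"))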

Spark it up!

If your local Spark cluster is not up and running, do that now.  If you need to review how to go about that, you can look here.

Make Sparks fly! (i.e. run it)

This bit is identical to the previous example.  With your application assembled, Cassandra up and prepared, and your Spark cluster humming, go to a terminal and submit your job.
[bkarels@ahimsa simple-rw]$ $SPARK_HOME/bin/spark-submit --class com.sparkfu.simple.SimpleApp --master spark://127.0.0.1:7077 /home/bkarels/dev/simple-rw/target/scala-2.10/simpleSpark-RW.jar
...
14/12/03 20:42:45 INFO spark.SparkContext: Job finished: toArray at SimpleApp.scala:27, took 0.641849986 s
(key8,27)
(key7,6)
(key1,28)
(key3,7)
(key4,99)
(key0,42)
(key9,1975)
(key6,100)
If all has gone well, your terminal will spit out a list of all the things that matter, as above.  Unaltered, the application will truncate the thingsthatmatter table just before it exits.  If you comment out that section in the source, re-assemble, and re-submit the job, you can further confirm this by querying the thingsthatmatter table directly.  Note how all the things that matter have a value greater than one.

What's next?

To make this work we used a simple tuple to read our data into, transform it, and write it back.  While this works for very simple examples, we will outgrow it very quickly...in fact, we already have.  That said, next we'll look to read data directly into a Scala case class, transform that data (perhaps into a new class), and write it back to Cassandra.

Tuesday, December 2, 2014

Simple example using Apache Spark 1.1.1 to read from Cassandra 2.0.11


Overview

Building on what we have done in previous posts, we will now put together an Apache Spark application that will read from an Apache Cassandra table and spit out a bit of output.  Here again, this example will only scratch the surface of what is possible in hopes of getting you up and running quickly reading from Cassandra with Spark.

In this example:
  1. Set up a local, three node Cassandra cluster
    1. Create a keyspace
    2. Add and populate a simple key/value table
  2. Set up a local, two worker Spark cluster
  3. Build a simple Spark application
  4. Submit the Spark application to the local cluster
  5. See simple output in the console
  6. Grab a drink and celebrate - your world just got more awesome!

Set up your local Cassandra cluster

ACHTUNG! (2014-12-12)  As noted here, we now run all these examples against a local, single node Cassandra cluster.  You can use the tutorial from Datastax Academy to set this up.

If you have not done so already, go to my post on setting up a local Cassandra cluster.  Once that is done, come back here and continue to the next steps.

You can verify you are ready to go by executing ccm status from a terminal.  You should see output very similar to the following:
[bkarels@ahimsa simple]$ ccm status
Cluster: 'cluster3'
-------------------
node1: UP
node3: UP
node2: UP
Also, at least one of these nodes should be running on 127.0.0.1, which you can verify using ccm [node name] show.
[bkarels@ahimsa simple]$ ccm node1 show
node1: UP
       cluster=cluster3
       auto_bootstrap=False
       thrift=('127.0.0.1', 9160)
       binary=('127.0.0.1', 9042)
       storage=('127.0.0.1', 7000)
       jmx_port=7100
       remote_debug_port=0
       initial_token=-9223372036854775808
       pid=3137

Create your keyspace

For this next set of tasks I use Datastax DevCenter - a free tool available from Datastax that you can download here.  DevCenter is a very intuitive tool that makes working with Cassandra clusters a breeze.  Take a little tour of DevCenter here.  Of course, if you prefer other mechanisms for interacting with Cassandra, feel free - the next bits of CQL will work regardless.

(NOTE: The following CQL is available in userdb.cql at the root of the example project that can be found here.)

Create a keyspace named 'userdb' (you can name it whatever you like, but that is the name used in this example) using the following CQL:
CREATE KEYSPACE userdb
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2};
If you are using DevCenter, you should be able to see your new keyspace in the schema viewer.

Create and populate a table

Execute the following CQL to create a table named 'mycount' and insert 10 rows of data into it:
CREATE TABLE mycount (
  key TEXT PRIMARY KEY,
  value INT
);

INSERT INTO mycount (key,value) VALUES ('key0', 42);
INSERT INTO mycount (key,value) VALUES ('key1', 27);
INSERT INTO mycount (key,value) VALUES ('key2', 12);
INSERT INTO mycount (key,value) VALUES ('key3', 7);
INSERT INTO mycount (key,value) VALUES ('key4', 99);
INSERT INTO mycount (key,value) VALUES ('key5', 666);
INSERT INTO mycount (key,value) VALUES ('key6', 1000);
INSERT INTO mycount (key,value) VALUES ('key7', 0);
INSERT INTO mycount (key,value) VALUES ('key8', 6);
INSERT INTO mycount (key,value) VALUES ('key9', 1975);
You can verify this any number of ways, but again, the DevCenter schema view should show the mycount table and its data.

OK, you should now have a local, three node Cassandra cluster up and running with a keyspace, a table, and a tiny bit of data.

Spark Cluster Setup

If you have not done so already, go back to my post on setting up a local Apache Spark cluster.  Once you're done with that, come back here and continue.

If all has gone well, you should be able to open up a browser and see the Spark master web UI at http://127.0.0.1:8080.


The Spark Application

Prerequisites

For this application, you will need the following:
  1. Java 7 or higher (Oracle JDK recommended)
  2. Scala 2.10.4
  3. SBT 0.13.5
  4. Git
  5. Your running Cassandra cluster from above
  6. Your running Spark cluster from above
Begin by cloning the sbt-spark-cassandra-simple project from github.
[bkarels@ahimsa work]$ git clone git@github.com:bradkarels/sbt-spark-cassandra-simple.git simple
Initialized empty Git repository in /home/bkarels/work/simple/.git/
Enter passphrase for key '/home/bkarels/.ssh/id_rsa':
remote: Counting objects: 30, done.
remote: Compressing objects: 100% (6/6), done.
remote: Total 30 (delta 0), reused 0 (delta 0)
Receiving objects: 100% (30/30), 4.56 KiB, done.
Resolving deltas: 100% (2/2), done.
Change directory into the project and start sbt:

(NOTE: Above, the project was cloned into a directory named simple.  By default the project directory would be named sbt-spark-cassandra-simple.)
[bkarels@ahimsa work]$ cd simple
[bkarels@ahimsa simple]$ sbt
[info] Loading project definition from /home/bkarels/work/simple/project
[info] Updating {file:/home/bkarels/work/simple/project/}simple-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to Simple Project (in build file:/home/bkarels/work/simple/)
>
The project has the assembly plugin built in, so from your sbt prompt you should be able to execute assembly to build the uber jar for the project:
> assembly
sbt will download the necessary files, lots of removed output here...
[info] SHA-1: 20faa94edf488c440ac372ba79f402e759a1a2a7
[info] Packaging /home/bkarels/work/simple/target/scala-2.10/simpleSpark.jar ...
[info] Done packaging.
[success] Total time: 13 s, completed Dec 2, 2014 11:10:08 AM
>
Take note of the highlighted path above.  This is where your Spark application has been placed.  The project is set up to name the resulting jar file 'simpleSpark.jar'.  You can set this value to whatever you like in assembly.sbt; however, for this example we will use 'simpleSpark.jar'.

Now, if you have been following the example, you should be able to execute the following command from your terminal:
[bkarels@ahimsa simple]$ $SPARK_HOME/bin/spark-submit --class SimpleApp --master spark://127.0.0.1:7077 /home/bkarels/work/simple/target/scala-2.10/simpleSpark.jar
...and if the bits are in the right order you should see output like this:
Spark assembly has been built with Hive, including Datanucleus jars on classpath                                                                                                                              
14/12/02 11:25:13 WARN util.Utils: Your hostname, ahimsa resolves to a loopback address: 127.0.0.1; using 192.168.254.137 instead (on interface eth0)                                                         
14/12/02 11:25:13 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address                                                                                                                  
14/12/02 11:25:13 INFO spark.SecurityManager: Changing view acls to: bkarels,                                                                                                                                 
14/12/02 11:25:13 INFO spark.SecurityManager: Changing modify acls to: bkarels,
...
14/12/02 11:27:50 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/12/02 11:27:50 INFO spark.SparkContext: Job finished: first at SimpleApp.scala:18, took 48.70891727 s
Row count is: 10
The first of which is:
CassandraRow{key: key4, value: 99}
Voilà!  Welcome to the bleeding edge.  The last three lines of output above (highlighted) come from SimpleApp.scala, where the following line puts the bits together:
val out = s"Row count is: %d\nThe first of which is:\n%s".format(cnt, row.toString)
Here again, as with the other examples, this is very nearly the most simple example possible to use a Spark application to read data from Cassandra and do something with it.  Where you go from here is near limitless.
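
For reference, the whole read is only a handful of lines.  A hedged sketch of what a SimpleApp like this might look like (the same shape as the project, if not the exact source, using the userdb.mycount table created above):

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("SimpleApp")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)

    // Pull the whole (tiny) table back as CassandraRows.
    val rdd = sc.cassandraTable("userdb", "mycount")
    val cnt = rdd.count()
    val row = rdd.first()

    val out = "Row count is: %d\nThe first of which is:\n%s".format(cnt, row.toString)
    println(out)
  }
}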

Tuesday, November 25, 2014

Using spark-submit to send an application to a local Spark cluster

In my last post (Running a local Apache Spark Cluster) I went over how to spin up a local Spark cluster for development and prototyping.  Now it is time to build the most basic Spark application to submit to your local cluster.  While this application is heavily based on an existing example, we will tweak a couple of bits to make it just slightly more interesting.

What you should expect:
  1. Pull down and quickly modify the source.
  2. Package the application into a jar file.
  3. Submit the application using spark-submit to your locally running cluster (or any cluster where the sample file exists on all nodes).
  4. View the expected results in your terminal.

The ready to consume application can be found at:
https://github.com/bradkarels/super-simple-spark-app

See the README.md file for direction on how to modify the application to run on your environment.

You will need to have Java, Scala, and SBT installed locally.

(From the README.md file)

Step 1:
Move the file tenzingyatso.txt to a known location on your file system (E.g. /tmp/tenzingyatso.txt)

Step 2:
Modify SuperSimple.scala so the path to tenzingyatso.txt is correct for your system.

From:
val compassionFile = "/home/bkarels/tenzingyatso.txt"

To:
val compassionFile = "/tmp/tenzingyatso.txt"

Step 3:
From the root of this project run package from within SBT:

$sbt
...
> package
...
*** Take note of where the application jar is written ***
[info] Packaging /home/bkarels/dev/super-simple-spark-app/target/scala-2.10/super-simple-spark-app_2.10-0.1.jar ...
[info] Done packaging.

> exit

Step 4:
Since this has been designed to run against a local cluster, navigate to your $SPARK_HOME and use spark-submit to send the application to your cluster:

(example)
[bkarels@ahimsa spark_1.1.0]$ ./bin/spark-submit --class com.bradkarels.spark.simple.SuperSimple --master spark://127.0.0.1:7077 /home/bkarels/dev/super-simple-spark-app/target/scala-2.10/super-simple-spark-app_2.10-0.1.jar
...
Talks of peace: 3
Speaks of love: 2
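
Those two counts come from SuperSimple.scala itself.  Stripped down, the application is presumably something like this hedged sketch (the exact search terms "peace" and "love" are assumptions based on the output above):

package com.bradkarels.spark.simple

import org.apache.spark.{SparkConf, SparkContext}

object SuperSimple {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Super Simple Spark App")
    val sc = new SparkContext(conf)

    val compassionFile = "/tmp/tenzingyatso.txt"
    val lines = sc.textFile(compassionFile).cache()

    // Count lines mentioning each theme, then print the totals seen above.
    val peace = lines.filter(_.contains("peace")).count()
    val love = lines.filter(_.contains("love")).count()

    println(s"Talks of peace: $peace")
    println(s"Speaks of love: $love")
  }
}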

FIN

Running a local Apache Spark cluster

We can't all have a dedicated cluster to play with, and even if we do, having complete control of a disposable environment has its advantages.  Here we will examine the simplest path to setting up a local cluster on your machine.  Remember, one of the great powers of Spark is that the same code you run on your underpowered single machine on a tiny dataset will run on hundreds of nodes and petabytes of data.  So let's make your laptop useful for development and prototyping, shall we...

For this example I am running a CentOS 6.5 virtual machine (VMware) set to use a single processor with four cores and ~12Gb RAM.  So we'll give one core to our Spark master and one core each to two worker nodes with 1Gb of memory.  Clearly, you could do more if you have more cores and more memory, but we're not looking to break processing speed records - just to move your Spark knowledge to the next level.

(How fun would it be to do this on a stack of Raspberry Pis?)

The state of the art Apache Spark release, as of the time of this writing, is 1.1.0 and that is what will be used.

You probably have a binary Spark distribution downloaded to your machine, but if you do not, do that now.  Apache Spark can be downloaded here.  Once downloaded, extract it to a local directory - mine is at:

/home/bkarels/spark_1.1.0

(this will become $SPARK_HOME)

The Spark developers have done you a huge favour and have added a set of scripts at SPARK_HOME/sbin/ to do most of what we want to accomplish (get a local spark cluster on a single machine).  Again, in the spirit of getting you up and running without exploring every possibility, here is what you need to do.

You can tip up a Spark master and worker independently using SPARK_HOME/sbin/start-master.sh and SPARK_HOME/sbin/start-slaves.sh.  But we're going to add a couple of environment variables and let SPARK_HOME/sbin/start-all.sh tip up a master and two workers in a single step.

The official Apache Spark documentation for this can be found here if you want to dig deeper.  But for now use your favorite editor to pop open ~/.bashrc.

Add the following items (mind where your SPARK_HOME actually is):

# Spark local environment variables
export SPARK_HOME=/home/bkarels/spark_1.1.0
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_LOCAL_DIRS=$SPARK_HOME/work
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=1G
export SPARK_WORKER_INSTANCES=2
export SPARK_DAEMON_MEMORY=384m
SPARK_MASTER_IP, SPARK_MASTER_PORT, & SPARK_MASTER_WEBUI_PORT
Here, these are explicitly set with the defaults.  This is just to illustrate that you could easily customize these values.  The remaining elements are where we get what we are looking for in our local cluster.

By default, start-all.sh fires up a single worker that uses all available cores and all of your available memory minus 1Gb.  (E.g. on a four core machine with 12Gb RAM, the worker would use all four cores and 11Gb of memory.)  If that works for you, great!  If not, let's tune things a bit.

SPARK_WORKER_INSTANCES
Defaults to 1.  If you set this to anything greater than one, be sure that the value multiplied by the value of SPARK_WORKER_CORES is less than or equal to the total number of cores available on your machine.

Also, if you set it to a value greater than one, verify that the value multiplied by the value of SPARK_WORKER_MEMORY is less than your total system memory.

On my example machine with four cores and 12Gb of memory we could do:
(workers X cores X memory in Gb)
4x1x3
2x2x6
1x4x12
...and other combos that do not max things out:
3x1x1
2x1x2
2x1x1 (our example)

SPARK_WORKER_CORES & SPARK_WORKER_MEMORY
Hopefully these are self-explanatory - so I'll let the Apache docs stand.  Just be mindful of the notes above.

It is worth noting that for development and prototyping a small worker memory should be sufficient.  (E.g. Worker memory of 256Mb would be more than enough if your sample data set was a 64Mb log file or 150Mb of emails.)  Remember, failing fast in development is a good thing!

SPARK_DAEMON_MEMORY
By default this value is 512m.  I have turned it down here just to illustrate that it can be tuned.  So if you are getting tight on resources running your IDE, Cassandra, Spark, etc., you could turn this down a bit.  (This and memory per worker, perhaps.)

Enough with the details, let's run this thing...

First, don't forget to source your .bashrc file:
[bkarels@ahimsa ~]$ . .bashrc

Navigate to SPARK_HOME/sbin:
[bkarels@ahimsa ~]$ cd $SPARK_HOME/sbin
[bkarels@ahimsa sbin]$

Run start-all.sh entering your password when prompted:
[bkarels@ahimsa sbin]$ ./start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /home/bkarels/spark_1.1.0/sbin/../logs/spark-bkarels-org.apache.spark.deploy.master.Master-1-ahimsa.out                                                     
bkarels@localhost's password:                                                                              
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /home/bkarels/spark_1.1.0/sbin/../logs/spark-bkarels-org.apache.spark.deploy.worker.Worker-1-ahimsa.out

Looks good - let's verify by checking out the master's web UI at localhost:8080.


Voilà!  Your completely customizable, local, disposable Spark cluster is up and running and ready to accept jobs.  (We'll get to that bit soon!)

Lastly - shutting it down.  Here again the Spark engineers have done the work; you need only call stop-all.sh.

[bkarels@ahimsa sbin]$ ./stop-all.sh
bkarels@localhost's password:
localhost: stopping org.apache.spark.deploy.worker.Worker
bkarels@localhost's password:
localhost: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
[bkarels@ahimsa sbin]$
FIN

Friday, November 21, 2014

Get your Cassandra on with ccm (Cassandra Cluster Manager)

UPDATE! (2014-12-9)  After a small bit of experimentation it seems that running the Spark-Fu examples against Cassandra clusters spun up using CCM may be the cause of the performance issues I have been experiencing.  Using a single node Cassandra "cluster" from the tarball has presented the kind of performance I would expect for these simple examples on a laptop.  I followed the great tutorial from Datastax Academy to set this up.  You will need to sign up for the Academy - but it's free and has much great content so I have no issues recommending it.  That said, using CCM to experiment with Cassandra clusters locally seems to be otherwise wonderful and stable.

To do things with Apache Spark on Cassandra, we first need to have Cassandra.  The best/fastest way (that I know of) to get a Cassandra cluster locally to prototype with is to use CCM.

What is CCM? 

CCM is the Cassandra Cluster Manager.

What does CCM do?

CCM creates multi-node clusters for development and testing on a local machine.  It is not intended for production use.

How do I get started with all this CCM voodoo?

I am running this all on CentOS 6.5 - directions will vary for other environments.  We also assume you have Java 7 or higher installed.

Step 1: Download & install the epel packages:

See -> https://fedoraproject.org/wiki/EPEL

Download the package (e.g. CentOS 6.x)

http://mirror.metrocast.net/fedora/epel/6/i386/epel-release-6-8.noarch.rpm

Install:
[bkarels@ahimsa ~]$ sudo rpm -Uvh epel-release-6-8.noarch.rpm

Step 2: Install python-pip:

[bkarels@ahimsa ~]$ sudo yum -y install python-pip

[bkarels@ahimsa ~]$ pip install cql PyYAML

Step 3: Install Apache Ant (CCM depends on ant)

See ant.apache.org for install instructions.

Step 4: Install ccm: (Cassandra Cluster Manager)


[bkarels@ahimsa ~]$ git clone https://github.com/pcmanus/ccm.git
[bkarels@ahimsa ~]$ cd ccm/
[bkarels@ahimsa ~]$ sudo ./setup.py install

Step 5 (Optional): Get Help

To get help: (this is really a great way to dig in to ccm)
[bkarels@ahimsa ~]$  ccm -help
[bkarels@ahimsa ~]$  ccm [command] -help

Step 6: Do some stuff
CCM has two primary types of operations: 
  1. Cluster commands
  2. Node commands
Cluster commands take the form:
$ ccm [cluster command] [options]

Node commands take the form:
$ ccm [node name] [node command] [options]

So, let's spin up a three node local cluster real quick like:

[bkarels@ahimsa ~]$ ccm create cluster0 -v 2.0.11
Downloading http://archive.apache.org/dist/cassandra/2.0.11/apache-cassandra-2.0.11-src.tar.gz to /tmp/ccm-bwFLa4.tar.gz (10.836MB)
  11362079  [100.00%]
Extracting /tmp/ccm-bwFLa4.tar.gz as version 2.0.11 ...
Compiling Cassandra 2.0.11 ...
Current cluster is now: cluster0
[bkarels@ahimsa ~]$ ccm list
 *cluster0
[bkarels@ahimsa ~]$ ccm populate --nodes 3
[bkarels@ahimsa ~]$ ccm start
[bkarels@ahimsa ~]$ ccm status
Cluster: 'cluster0'
-------------------
node1: UP
node3: UP
node2: UP
[bkarels@ahimsa ~]$ ccm node2 stop
[bkarels@ahimsa ~]$ ccm status
Cluster: 'cluster0'
-------------------
node1: UP
node3: UP
node2: DOWN
[bkarels@ahimsa ~]$

And just like that, you have a three node cluster on your local machine that you can start to play with.  Of course, this set of instructions barely scratches the surface of what is possible, but our focus is Spark, so this is just to give us something we can read from and write to.

FIN