Tuesday, December 9, 2014

Writing to Cassandra 2.0.x with Spark 1.1.1 - moving beyond Tuple2

UPDATE! (2014-12-12): Wide tuple write example added.  I won't lie, it is not elegant - but it's worth knowing the basic concepts of how to work with tuples bigger than two.  Pull the updated project code to check it out.

Overview

The most rudimentary mechanisms for writing Tuple2 to Cassandra have been pretty well beaten to a pulp.  In this post I hope to shine light on a few additional, more complex ways to write data to Cassandra, as not everything we do with a Spark/Cassandra stack will be reduceByKey on key/value pairs.

In the first example in this series we will fetch some data from Cassandra directly into a case class.  Then we will transform that RDD into a more compact version of itself and write the resulting collection of case classes back to a different table in our Cassandra cluster.
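Before we get into the project itself, the heart of that fetch-transform-write cycle can be sketched in plain Scala. The case class fields below mirror (a subset of) the human table used in this example; with the Spark Cassandra connector, the same filter/map logic runs over an RDD instead of a plain List, with the connector handling the actual Cassandra I/O. Treat this as a sketch of the transformation, not the full program:

```scala
// Sketch: compact a wide 'Human' row down to just the fields we want to keep.
// With the connector, the same filter/map runs on an RDD fetched from Cassandra.
case class Human(
  firstname: String,
  lastname: String,
  gender: String,
  phone: String,
  isgoodperson: Boolean
)

case class GoodHuman(firstname: String, lastname: String)

object CompactSketch {
  // Keep only good people, and keep only the columns the target table needs.
  def compact(humans: List[Human]): List[GoodHuman] =
    humans.filter(_.isgoodperson).map(h => GoodHuman(h.firstname, h.lastname))

  def main(args: Array[String]): Unit = {
    val people = List(
      Human("Pete", "Jones", "m", "6125551212", isgoodperson = true),
      Human("Not", "Good", "m", "0005551212", isgoodperson = false)
    )
    compact(people).foreach(g =>
      println(s"${g.firstname} ${g.lastname} is a good, simple person."))
  }
}
```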

We will be sticking to our 'human' example from previous posts; however, I have tuned the schema a bit.  To keep your life simple, I recommend executing the updated CQL, which will create a new keyspace and tables for this example.

Prerequisites:

  • Java 1.7+ (Oracle JDK required)
  • Scala 2.10.4
  • SBT 0.13.x
  • A Spark cluster (how to do it here.)
  • A Cassandra cluster (how to do it here.)
  • git

Clone the Example

Begin by cloning the example project, sbt-spark-cassandra-writing, from GitHub, and cd into the project directory.

[bkarels@ahimsa work]$ git clone https://github.com/bradkarels/sbt-spark-cassandra-writing.git simple-writing
Initialized empty Git repository in /home/bkarels/work/simple-writing/.git/
remote: Counting objects: 21, done.
remote: Compressing objects: 100% (15/15), done.
remote: Total 21 (delta 0), reused 17 (delta 0)
Unpacking objects: 100% (21/21), done.
[bkarels@ahimsa work]$ cd simple-writing/
[bkarels@ahimsa simple-writing]$

Prepare the Data

At the root of the project you will see two CQL files: sparkfu.cql and populateHumans.cql.  You will need to execute these two files against your local Cassandra instance from DataStax DevCenter or cqlsh (or some other tool) to set things up.

Begin by executing sparkfu.cql to create your keyspace and tables:

CREATE KEYSPACE sparkfu WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE sparkfu.human (
    id TIMEUUID,
    firstname TEXT,
    lastname TEXT,
    gender TEXT,
    address0 TEXT,
    address1 TEXT,
    city TEXT,
    stateprov TEXT,
    zippostal TEXT,
    country TEXT,
    phone TEXT,
    isgoodperson BOOLEAN,
    PRIMARY KEY(id)
);

// Letting Cassandra use default names for the indexes for ease.
CREATE INDEX ON sparkfu.human ( isgoodperson ); // We want to be able to find good people quickly

CREATE INDEX ON sparkfu.human ( stateprov ); // Maybe we need good people by state?

CREATE INDEX ON sparkfu.human ( firstname ); // Good people tend to be named "Brad" - let's find them fast too!

// Clearly this is a horrible model you would never use in production, but this is just a simple example.
CREATE TABLE sparkfu.goodhuman (
    firstname TEXT,
    lastname TEXT,
    PRIMARY KEY(firstname,lastname)
);
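As a sketch of how rows from this schema could map onto a Scala case class (the connector matches columns to fields by name): TIMEUUID maps to java.util.UUID, TEXT to String, BOOLEAN to Boolean, and a nullable column like address1 is safest as an Option. The class below is an assumed shape for illustration, not lifted from the project source:

```scala
import java.util.UUID

// Hypothetical case class mirroring sparkfu.human, one field per column.
// Nullable columns (address1 can be null in the sample data) become Options
// so a null in Cassandra does not blow up the mapping.
case class Human(
  id: UUID,
  firstname: String,
  lastname: String,
  gender: String,
  address0: String,
  address1: Option[String],
  city: String,
  stateprov: String,
  zippostal: String,
  country: String,
  phone: String,
  isgoodperson: Boolean
)
```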
Next, load up your table with the sample data by executing populateHumans.cql:

INSERT INTO sparkfu.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, isgoodperson)
  VALUES (now(),'Pete', 'Jones', 'm', '555 Astor Lane',null,'Minneapolis','MN','55401','USA','6125551212',True);
...
[Some CQL removed for brevity - file in project source. ]
...
INSERT INTO sparkfu.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, isgoodperson)
  VALUES (now(),'Brad', 'Karels', 'm', '123 Nice Guy Blvd.',null,'Minneapolis','MN','55402','USA','6125551212',True); 
INSERT INTO sparkfu.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, isgoodperson)
  VALUES (now(),'Alysia', 'Yeoh', 't', '1 Bat Girl Way',null,'Metropolis','YX','55666','USA','3215551212',True);
Errors?  No.  Good - we are set to proceed.

Prepare the Application

In a terminal, from the project root, fire up SBT and run the assembly task to create your application jar file:
[bkarels@ahimsa simple-writing]$ sbt
[info] Loading project definition from /home/bkarels/work/simple-writing/project
[info] Updating {file:/home/bkarels/work/simple-writing/project/}simple-writing-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                                    
[info] Done updating.                                                                  
[info] Set current project to Sandy Author (in build file:/home/bkarels/work/simple-writing/)
> assembly                                                                                  
[info] Updating {file:/home/bkarels/work/simple-writing/}simple-writing...                  
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                                         
[info] Done updating.                                                                       
[info] Compiling 1 Scala source to /home/bkarels/work/simple-writing/target/scala-2.10/classes...
...
[ a whole bunch of assembly output will be here... ]
...
[info] SHA-1: 0202b523259e5688311e4b2bcb16c63ade4b7067
[info] Packaging /home/bkarels/work/simple-writing/target/scala-2.10/SandyAuthor.jar ...
[info] Done packaging.
[success] Total time: 19 s, completed Dec 10, 2014 4:41:29 PM
>
As per normal, take note of where your jar file gets put (the "Packaging" line above).  Also, note that we have set the resulting jar file name in assembly.sbt - here we have set it to SandyAuthor.jar.  (You can only say Cassandra and Writer so many times - having some fun with jar names...)
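For reference, the jar-naming line in assembly.sbt looks roughly like this (the exact key and import depend on your sbt-assembly version; this is the pre-1.x style current in this era, shown as a sketch rather than the project's exact file):

```scala
// assembly.sbt (sketch; key names vary by sbt-assembly version)
import sbtassembly.Plugin.AssemblyKeys._
import sbtassembly.Plugin.assemblySettings

assemblySettings

jarName in assembly := "SandyAuthor.jar"
```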

Spark it up!

If your local Spark cluster is not up and running, do that now.  If you need to review how to go about that, you can look here.

Make Sparks fly! (i.e. run it)

In keeping with our theme of finding good people, we'll do it again.  But this time, as you will see in the project source, we will fetch only good people in the first place, transform them into a simplified RDD, and store them off safely separated from all the not good people.
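The shape of that job, in spark-cassandra-connector 1.1.x style, looks roughly like the sketch below. It needs a live Spark and Cassandra cluster (plus the connector on the classpath) to actually run, and the details differ from the project source, so read it as an illustration of the fetch/transform/write calls rather than a drop-in program:

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object WriterSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SandyAuthor")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext(conf)

    // Fetch only the good people (hitting the isgoodperson index),
    // compacting each row down to a (firstname, lastname) pair.
    val good = sc.cassandraTable("sparkfu", "human")
      .where("isgoodperson = ?", true)
      .map(row => (row.getString("firstname"), row.getString("lastname")))

    // Write the compacted RDD to the goodhuman table.
    good.saveToCassandra("sparkfu", "goodhuman",
      SomeColumns("firstname", "lastname"))

    sc.stop()
  }
}
```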

From your terminal, using the location of SandyAuthor.jar from above, submit the application to your Spark cluster:

[bkarels@ahimsa simple-writing]$ $SPARK_HOME/bin/spark-submit --class com.sparkfu.simple.Writer --master spark://127.0.0.1:7077 /home/bkarels/dev/simple-writing/target/scala-2.10/SandyAuthor.jar
...
[ lots of Spark output dumped to terminal here...]
...
14/12/10 16:48:51 INFO spark.SparkContext: Job finished: toArray at SimpleWriting.scala:34, took 0.641733148 s
Alysia Yeoh is a good, simple person.
Edward Snowden is a good, simple person.
Fatima Nagossa is a good, simple person.
Pete Jones is a good, simple person.
Brad Karels is a good, simple person.
Mother Theresa is a good, simple person.
Hiro Ryoshi is a good, simple person.
Neil Harris is a good, simple person.
B Real is a good, simple person.
...
[and then the wide tuple example output]
...

Alysia Yeoh is transgender and can be reached at 3215551212.                                                                                                                                                                                
Neil Harris is male and can be reached at 9045551212.                                                                                                                                                                                       
Mother Theresa is female and can be reached at null.                                                                                                                                                                                        
Hiro Ryoshi is female and can be reached at 7155551212.                                                                                                                                                                                     
Brad Karels is male and can be reached at 6125551212.                                                                                                                                                                                       
Pete Jones is male and can be reached at 6125551212.                                                                                                                                                                                        
Edward Snowden is male and can be reached at null.                                                                                                                                                                                          
Fatima Nagossa is female and can be reached at 7895551212.                                                                                                                                                                                  
B Real is male and can be reached at 9995551212.

[bkarels@ahimsa simple-writing]$
If all has gone well you should see output like the above.  Please note, we are also TRUNCATING the goodhuman and personfromtuple tables just before the program exits, so if you look in Cassandra those tables will be empty.  Feel free to comment out those lines and validate directly in Cassandra.
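The wide-tuple output above boils down to pulling fields out of a TupleN by position and formatting them. A plain-Scala sketch of the idea (field order and the gender codes are assumptions based on the sample data; with the connector, a tuple RDD like this can be written via saveToCassandra with a SomeColumns(...) listing the target columns):

```scala
// Sketch: working with a tuple wider than two by pattern matching
// its fields out by position.
object WideTupleSketch {
  // Assumed field order: (firstname, lastname, gender, phone)
  type PersonTuple = (String, String, String, String)

  def describe(p: PersonTuple): String = p match {
    case (first, last, gender, phone) =>
      val g = gender match {
        case "m" => "male"
        case "f" => "female"
        case "t" => "transgender"
        case _   => "unspecified"
      }
      s"$first $last is $g and can be reached at $phone."
  }

  def main(args: Array[String]): Unit = {
    println(describe(("Alysia", "Yeoh", "t", "3215551212")))
  }
}
```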

Then play around with it.  Extend the case classes, add columns to the schema, add more transformations, etc.  These simple examples will serve you best if you extend them and make them your own.

What's next?

There is likely good cause to start playing with creating and altering table structures dynamically.
