Monday, December 8, 2014

Spark 1.1.1 to read from Cassandra into a Scala case class - Simple Example

As we continue to advance our use cases for Apache Spark on Cassandra, it seems only right that our next case puts data into a case class.

Following previous examples, this will be the simplest use case - more elegant and advanced usages are coming...

Update! (2014-12-10): I have added fetching Cassandra data directly into a case class, as intended by the connector. The source for the example has been updated and pushed to the repository.

Prerequisites:

  • Java 1.7+ (Oracle JDK required)
  • Scala 2.10.4
  • SBT 0.13.x
  • A Spark cluster (how to do it here.)
  • A Cassandra cluster (how to do it here.)
  • git

Overview

For this example we will pull data for various humans out of Cassandra and put the relevant bits into our model.  Then we will see about working with only good persons (as determined by the creators of our data - our algorithm is not that advanced...yet).

Clone the Example

Begin by cloning the example project from GitHub (our project is spark-cassandra-to-scala-case-class), then cd into the project directory.
[bkarels@ahimsa work]$ git clone https://github.com/bradkarels/spark-cassandra-to-scala-case-class.git simple-case
Initialized empty Git repository in /home/bkarels/work/simple-case/.git/
remote: Counting objects: 21, done.
remote: Compressing objects: 100% (15/15), done.
remote: Total 21 (delta 0), reused 17 (delta 0)
Unpacking objects: 100% (21/21), done.
[bkarels@ahimsa work]$ cd simple-case/
[bkarels@ahimsa simple-case]$ l
total 32K
-rw-rw-r--. 1 bkarels bkarels 1.2K Dec  8 12:24 assembly.sbt
-rw-rw-r--. 1 bkarels bkarels  298 Dec  8 12:24 build.sbt
-rw-rw-r--. 1 bkarels bkarels 1.1K Dec  8 12:24 LICENSE
-rw-rw-r--. 1 bkarels bkarels 1.1K Dec  8 12:24 nicecase.cql
-rw-rw-r--. 1 bkarels bkarels 3.2K Dec  8 12:24 populateHumans.cql
drwxrwxr-x. 2 bkarels bkarels 4.0K Dec  8 12:24 project
-rw-rw-r--. 1 bkarels bkarels  228 Dec  8 12:24 README.md
drwxrwxr-x. 3 bkarels bkarels 4.0K Dec  8 12:24 src

Prepare the Data

At the root of the project you will see two CQL files: nicecase.cql and populateHumans.cql. You will need to execute these two files against your local Cassandra instance from DataStax DevCenter or cqlsh (or some other tool) to set things up.

Begin by executing nicecase.cql to create your keyspace and tables:
CREATE KEYSPACE nicecase WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE nicecase.human (
    id TIMEUUID,
    firstname TEXT,
    lastname TEXT,
    gender TEXT,
    address0 TEXT,
    address1 TEXT,
    city TEXT,
    stateprov TEXT,
    zippostal TEXT,
    country TEXT,
    phone TEXT,
    "isGoodPerson" BOOLEAN, // Case sensitive column name - generally not recommended, but possible.
    PRIMARY KEY(id)
);

CREATE INDEX good_person ON nicecase.human ( "isGoodPerson" ); // We want to be able to find good people quickly

CREATE INDEX person_state ON nicecase.human ( stateprov ); // Maybe we need good people by state?

CREATE INDEX ON nicecase.human ( firstname ); // Let Cassandra use the default name for the index.  Good people tend to be named "Brad" - let's find them fast too!

CREATE TABLE nicecase.goodhuman (
    human_id TIMEUUID,
    PRIMARY KEY(human_id)
);

CREATE TABLE nicecase.badhuman (
    human_id TIMEUUID,
    PRIMARY KEY(human_id)
);

CREATE TABLE nicecase.goodbrad (
    human_id TIMEUUID,
    firstname TEXT,
    PRIMARY KEY(human_id)
);
There are a few extra bits in here that we won't be using in this example (indexes, extra tiny tables), but they won't hurt anything and we will likely use them in our next example or two.

Next, execute populateHumans.cql to load up our human table with some data.
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Pete', 'Jones', 'm', '555 Astor Lane',null,'Minneapolis','MN','55401','USA','6125551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Neil', 'Harris', 'm', '123 Doogie Howser Way',null,'Los Angeles','CA','90211','USA','9045551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Hiro', 'Ryoshi', 'f', '42 Cemetary Road','Apt. 23','River Falls','WI','55301','USA','7155551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Biel', 'SaBubb', 'm', '666 Hellpass',null,'Demonville','KY','32054','USA','3125551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Edward', 'Snowden', 'm', null,null,null,null,null,'RU',null,True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Mother', 'Theresa', 'f', null,null,null,null,null,null,null,True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Fatima', 'Nagossa', 'f', '689 First Ave.','Apt 1b','Orlando','FL','32822','USA','7895551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Casey', 'Steals-a-lot', 'f', '71 Buster Lane',null,'Denver','CO','74811','USA','7895551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'B', 'Real', 'm', '420 High Way','I forgot','Palo Alto','CA','90255','USA','9995551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'JJ', 'Jones', 't', '123 Sycamore Way',null,'Las Cruces','CA','91553','USA','9995551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Diane', 'Feinstein', '?', 'Do not care',null,'Some City','CA','99999','USA','1235551212',False);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Brad', 'Karels', 'm', '123 Nice Guy Blvd.',null,'Minneapolis','MN','55402','USA','6125551212',True);
INSERT INTO nicecase.human (id, firstname, lastname, gender, address0, address1,city, stateprov, zippostal, country, phone, "isGoodPerson")
  VALUES (now(),'Alysia', 'Yeoh', 't', '1 Bat Girl Way',null,'Metropolis','YX','55666','USA','3215551212',True);
There is a lot more data here than we will use for this example, but again, the near future is coming... Most notable here is how we use the CQL function now() to set the value for our primary key of type TIMEUUID. We are also, quite intentionally, leaving many of the data values null. Real data has nulls, so we might as well get the hang of dealing with them in this context as well.
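
On the Scala side, one common way to cope with those nulls is to wrap nullable values in Option. The snippet below is only a minimal sketch of that idea and is not taken from the example project: the Address class, the fromRaw helper, and the fallback strings are all hypothetical names made up for illustration.

```scala
// Hypothetical model, used only to illustrate wrapping nullable columns in Option.
case class Address(city: Option[String], stateprov: Option[String])

object NullHandling {
  // Option(null) evaluates to None, so a raw nullable value can be wrapped safely.
  def fromRaw(city: String, stateprov: String): Address =
    Address(Option(city), Option(stateprov))

  // Fall back to a placeholder when a value is absent (placeholders are assumptions).
  def describe(a: Address): String =
    a.city.getOrElse("unknown city") + ", " + a.stateprov.getOrElse("unknown state")
}
```

With this shape, a row like the one for Mother Theresa (no city, no state) becomes an Address(None, None) instead of a pair of nulls waiting to cause a NullPointerException.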

Assuming you did not encounter any errors, your Cassandra instance is now ready for this example.

Prepare the Application

In a terminal, from the project root, fire up SBT and run the assembly task to create your application jar file.
[bkarels@ahimsa simple-case]$ sbt
[info] Loading project definition from /home/bkarels/work/simple-case/project
[info] Updating {file:/home/bkarels/work/simple-case/project/}simple-case-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                              
[info] Done updating.                                                            
[info] Set current project to Simple Case (in build file:/home/bkarels/work/simple-case/)
> assembly
[info] Updating {file:/home/bkarels/work/simple-case/}simple-case...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...                
[info] Done updating.                                              
[info] Compiling 1 Scala source to /home/bkarels/work/simple-case/target/scala-2.10/classes...                                                                 
[info] Including: minlog-1.2.jar                                                             
...                                               
[info] Checking every *.class/*.jar file's SHA-1.                                            
...
[info] SHA-1: 5b17b5b5ccb29df92a662ad3f404573e7470d576
[info] Packaging /home/bkarels/work/simple-case/target/scala-2.10/CaseStudy.jar ...
[info] Done packaging.
[success] Total time: 28 s, completed Dec 8, 2014 12:37:30 PM
>
As per normal, take note of where your jar file gets put (see the Packaging line above: target/scala-2.10/CaseStudy.jar). Also, note that we have set the resulting jar file name in assembly.sbt - here we have set it to CaseStudy.jar.

Spark it up!

If your local Spark cluster is not up and running, do that now.  If you need to review how to go about that, you can look here.

Make Sparks fly! (i.e. run it)

OK, time to find out who the good people are...
This very simple example will pull a set of CassandraRows out of our human table in Cassandra, iterate over the rows and create a list of Human case classes.  It will then rip through our list of Humans and nicely tell us who is a good person and who is not so much.
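
The row parsing flow described above can be sketched roughly as follows. This is not the code from SimpleApp.scala: to keep the sketch runnable without a Spark or Cassandra cluster, a plain Map stands in for a CassandraRow, and the Human fields, the toHuman and verdict helpers, and the sample rows are assumptions made for illustration.

```scala
// Hypothetical stand-in for the example's model; the real Human class lives in the project source.
case class Human(firstname: String, lastname: String, isGoodPerson: Boolean)

object GoodPeople {
  // A plain Map stands in for a CassandraRow here so the sketch runs without a cluster.
  def toHuman(row: Map[String, Any]): Human =
    Human(
      row("firstname").asInstanceOf[String],
      row("lastname").asInstanceOf[String],
      row("isGoodPerson").asInstanceOf[Boolean])

  // Build the one-line report for a single Human.
  def verdict(h: Human): String = {
    val judgement = if (h.isGoodPerson) "is a good person." else "is not a good person."
    s"${h.firstname} ${h.lastname} $judgement"
  }

  def main(args: Array[String]): Unit = {
    val rows = List(
      Map[String, Any]("firstname" -> "Pete", "lastname" -> "Jones", "isGoodPerson" -> true),
      Map[String, Any]("firstname" -> "Biel", "lastname" -> "SaBubb", "isGoodPerson" -> false))
    // Parse every row into a Human, then report on each one.
    rows.map(toHuman).map(verdict).foreach(println)
  }
}
```

In the real application the rows come from the connector rather than a hard-coded list, but the parse-then-report shape is the same.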

An interesting preview of things to come is commented out at or about line 29 of SimpleApp.scala:
row.columnNames
As Cassandra is NoSQL, a different set of columns could exist for each row. For our software to be reactive, we need mechanisms to detect these differences so that they can be handled. So play with that - it will likely be part of your future.
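
One simple way to detect such differences, sketched here under assumptions, is to compare the column names a row reports against the set the application expects. The expected set and the missing helper below are hypothetical; the columnNames argument merely stands in for whatever row.columnNames returns in the real application.

```scala
object ColumnCheck {
  // Hypothetical list of columns the application expects to find on every row.
  val expected: Set[String] = Set("firstname", "lastname", "isGoodPerson")

  // columnNames stands in for the names reported by a row (e.g. row.columnNames).
  // Returns the expected columns that the row does not carry.
  def missing(columnNames: Seq[String]): Set[String] = expected -- columnNames
}
```

A row reporting only firstname and lastname would yield Set("isGoodPerson"), which the application could then handle however it sees fit (skip the row, apply a default, log it).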

With that noted, let's run things. Using the location of our CaseStudy.jar from above, open a terminal and use spark-submit to submit the application to your Spark cluster:
[bkarels@ahimsa simple-case]$ $SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.CaseStudy --master spark://127.0.0.1:7077 /home/bkarels/work/simple-case/target/scala-2.10/CaseStudy.jar
...
Fatima Nagossa is a good person.
Neil Harris is a good person.
Pete Jones is a good person.
Joe Jones is not a good person.
Edward Snowden is a good person.
Brad Karels is a good person.
Biel SaBubb is not a good person.
Mother Theresa is a good person.
Alysia Yeoh is a good person.
Hiro Ryoshi is a good person.
B Real is a good person.
Casey Steals-a-lot is not a good person.
Diane Feinstein is not a good person.
If all has gone to plan, you should see output like the above. You will actually see two sets of this output, as the example does the same operation two ways: fetching directly into a case class, and a more verbose row parsing mechanism.

What's next?

Next we hope to look into writing data to Cassandra from Scala case classes. Or some other interesting thing...

FIN
