Thursday, January 29, 2015

Spark + Kafka + Cassandra (Part 3 - Spark Streaming Kafka Consumer)

Overview

Welcome to part three of the series 'Spark + Kafka + Cassandra'.

Building on parts one and two, it is now time to consume a bunch of stuff from Kafka using Spark Streaming and dump it into Cassandra.  There really was no nice way to illustrate consumption without putting the messages somewhere - so why not go straight to C*?  Don't care about C*?  Feel free to write to stdout, HDFS, a text file, whatever.


This piece is effectively designed to work with the message generator from part two, but you can put messages into your Kafka topic however you choose.

This installment will have you:
  • Run ZooKeeper locally (as part of the Kafka distribution)
  • Run Kafka locally
  • Run a local Spark cluster
  • Create a Kafka producer wrapped up in a Spark application (part two)
  • Submit that application to send generated messages to a given topic for some number of seconds.
  • Have a running Spark Streaming application ready and waiting to consume your topic and dump the results into a Cassandra table.

Prerequisites:

  • Java 1.7+ (Oracle JDK required)
  • Scala 2.10.4
  • SBT 0.13.x
  • A Spark cluster (see how to set one up here)
  • ZooKeeper and Kafka running locally
  • git


Set it up...

It is not mandatory that you have gone through parts one and two of this series, but doing so will make this part more seamless.  I recommend working through those and then returning here.



Clone the Example

Begin by cloning the example project from GitHub - spark-streaming-kafka-consumer - and cd into the project directory.

[bkarels@rev27 work]$ git clone https://github.com/bradkarels/spark-streaming-kafka-consumer.git
Cloning into 'spark-streaming-kafka-consumer'...
remote: Counting objects: 29, done.
remote: Total 29 (delta 0), reused 0 (delta 0)
Unpacking objects: 100% (29/29), done.
[bkarels@rev27 work]$ cd spark-streaming-kafka-consumer/
[bkarels@rev27 spark-streaming-kafka-consumer]$


Prepare the Application

In a terminal, from the project root, fire up SBT and run assembly to create your application jar file.
[bkarels@rev27 spark-streaming-kafka-consumer]$ sbt
Picked up _JAVA_OPTIONS: -Xms1G -Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G
[info] Loading project definition from /home/bkarels/work/spark-streaming-kafka-consumer/project
[info] Updating {file:/home/bkarels/work/spark-streaming-kafka-consumer/project/}spark-streaming-kafka-consumer-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to Spark Fu Streaming Kafka Consumer (in build file:/home/bkarels/work/spark-streaming-kafka-consumer/)
> assembly
...
[info] SHA-1: 4597553bcdecc6db6a67cad0883cc56cadb0be03
[info] Packaging /home/bkarels/work/spark-streaming-kafka-consumer/target/scala-2.10/sparkFuStreamingConsumer.jar ...
[info] Done packaging.
[success] Total time: 29 s, completed Jan 29, 2015 9:24:32 AM
>

As before, take note of your jar file location (the Packaging line above).  Also note that the resulting jar file name is set in assembly.sbt - here it has been set to sparkFuStreamingConsumer.jar.
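For reference, the jar name comes from a one-line setting in the build. The sketch below is illustrative rather than a copy of the repo's file, and the exact key depends on your sbt-assembly version:

// assembly.sbt (sketch) - only the output jar name matters for this walkthrough.
// sbt-assembly 0.12+ uses assemblyJarName; older releases use jarName instead.
assemblyJarName in assembly := "sparkFuStreamingConsumer.jar"
// Older sbt-assembly equivalent:
// jarName in assembly := "sparkFuStreamingConsumer.jar"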

Spark it up!

If your local Spark cluster is not up and running, do that now.  Go here to see about getting 'r' done.

Make Sparks fly! (i.e. run it)

Having reviewed the code, you have seen that, out of the box, this streaming application expects ZooKeeper running locally on port 2181 and Kafka running locally with a topic named sparkfu.  Additionally, a local Cassandra instance with keyspace sparkfu and table messages should also exist.  The CQL for this table is at the root of the project.  See my post on local Cassandra if you do not yet have that bit in place.  Once Cassandra is up and running, execute the CQL to create the messages table.

You should be able to see the following when done:
cqlsh> use sparkfu;
cqlsh:sparkfu> DESCRIBE TABLE messages;

CREATE TABLE messages (
  key text,
  msg text,
  PRIMARY KEY ((key))
) WITH
  bloom_filter_fp_chance=0.010000 AND
  caching='KEYS_ONLY' AND
...
  compaction={'class': 'SizeTieredCompactionStrategy'} AND
  compression={'sstable_compression': 'LZ4Compressor'};
...and so we're sure we're starting from zero:
cqlsh:sparkfu> TRUNCATE messages;
cqlsh:sparkfu> SELECT key, msg FROM messages;

(0 rows)
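Before submitting, it helps to have a picture of what the consumer is doing. The sketch below is not the repo's consumer class verbatim - it is a rough outline of the same idea using the Spark 1.x receiver-based Kafka API and the DataStax Spark Cassandra Connector; the class name, batch interval, and exact key/message mapping are assumptions and the project's code may differ:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._

object StreamingConsumerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFuStreamingConsumer")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    // One-second batches - an assumption; tune to your cluster and data rate.
    val ssc = new StreamingContext(conf, Seconds(1))

    // Receiver-based consumer: ZooKeeper on localhost:2181, consumer group
    // 'sparkfu-consumer', one receiver thread on the 'sparkfu' topic.
    val messages = KafkaUtils.createStream(
      ssc, "localhost:2181", "sparkfu-consumer", Map("sparkfu" -> 1))

    // Each element is a (key, message) pair from Kafka; write every
    // micro-batch straight into the sparkfu.messages table created above.
    messages.saveToCassandra("sparkfu", "messages", SomeColumns("key", "msg"))

    ssc.start()
    ssc.awaitTermination()
  }
}

The important bits are the ZooKeeper address, the topic map, and the fact that each micro-batch is written to Cassandra as it arrives.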

OK, spark it up:
[bkarels@rev27 spark-streaming-kafka-consumer]$ $SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.Consumer --master spark://127.0.0.1:7077 /home/bkarels/work/spark-streaming-kafka-consumer/target/scala-2.10/sparkFuStreamingConsumer.jar
...
Output will flow about here...
If all has gone well, your streaming application is now running on your local Spark cluster waiting for messages to hit your Kafka topic.

You can verify at http://localhost:4040.



If you have just recently completed part two, your application will likely be busy pulling messages from the sparkfu topic.  However, let's assume that's not the case.  So, fire up the message generator from part two and run it for a few seconds to publish some messages to the sparkfu topic.

[bkarels@rev27 spark-kafka-msg-generator]$ $SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.RandomMessages --master local[*] /home/bkarels/dev/spark-kafka-msg-generator/target/scala-2.10/sparkFuProducer.jar sparkfu 5 true
...
15/01/29 09:45:40 INFO SyncProducer: Connected to rev27:9092 for producing
5000 * 3 seconds left...
Produced 9282 msgs in 5s -> 1856.0m/s.
So, we should have 9282 messages in our topic (in this example).  We should also, by the time we look, have that same number of messages in our messages table in Cassandra.

cqlsh:sparkfu> SELECT COUNT(*) FROM messages;

 count
-------
  9282

(1 rows)
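As an aside, if you want a second opinion from the Spark side while the job runs, a per-batch count printed to the driver log is an easy sanity check. This is a hypothetical addition (not in the repo) that would sit in the consumer before ssc.start(), against a DStream like the messages stream sketched earlier:

// Print the number of messages in each micro-batch to the driver's stdout.
messages.count().print()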

Due to the nature of this example, the data itself will be uninteresting:

 cqlsh:sparkfu> SELECT key,msg FROM messages LIMIT 5;

 key                       | msg
---------------------------+------------------------------------------------------
 kUs3nkk0mv5fJ6eGvcLDrkQTd | 1422546343283,AMpOMjZeJozSy3t519QcUHRwl,...
 P6cUTChERoqZ7bOyDa3XjnHNs | 1422546344238,VDPCAhV3k3m5wfaUY0jAB8qB0,...
 KlRKLYnnlZY6NCpbKyQEIKLrF | 1422546343576,hdzvBKR7z2raTsxNYFoTFmeS2,...
 YGrBt2ZI7PPXrpopLsSTAwYrD | 1422546341519,cv0b7MEPdnrK1HuRL0GPDzMMP,...
 YsDWO67wKMuWBzyRpOiRSNpq2 | 1422546344491,RthQLkxPc5es7f2fYjTXRJnNu...

(5 rows)


So there you have it - about the simplest Spark Streaming Kafka consumer you can write.  Shoulders of giants, people.  So tweak it, tune it, expand it, and use it as your springboard.


What's next?

Still in flight for the next part of this series:
  • Not sure - I may just work custom Kafka encode/decode into this example (a rough sketch of the decoder side follows this list).
  • I may also model the data in more of a time-series fashion to make reading from Kafka a more practical exercise.
  • Your thoughts?
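To give a flavor of that first bullet, a custom decoder against the Kafka 0.8 serializer API might look something like the sketch below. The Message case class and the "timestamp,payload" wire format are made up for illustration:

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

// Hypothetical typed message: the wire format here is "timestamp,payload".
case class Message(ts: Long, payload: String)

class MessageDecoder(props: VerifiableProperties = null) extends Decoder[Message] {
  // Split once on the first comma; everything after it is the payload.
  override def fromBytes(bytes: Array[Byte]): Message = {
    val Array(ts, payload) = new String(bytes, "UTF-8").split(",", 2)
    Message(ts.toLong, payload)
  }
}

A decoder like this would be supplied to KafkaUtils.createStream via its type parameters in place of the default StringDecoder.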
 Thanks for checking this out - more to come!

Comments:

  1. Thanks for this great post! I got almost everything working except the one below:


    foreachRDD at SparkFuConsumer.scala:49
    Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 128, 172.31.42.82): java.lang.Exception: Could not compute split, block input-0-1422993397600 not found
    at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

    Driver stacktrace:

  2. Thanks for your interesting tutorial mixing all the technologies I have been researching these last weeks.
    Do you think this architecture would suit a system for saving industrial sensor data?

    Later on I want to query it using Spark and apply machine learning to get more in-depth knowledge, but the first step is to store the data.

    Thanks again

    Replies
    1. I will not say that gathering sensor data is a "classic" use case as Spark has not been around long enough for anything Spark to be "classic" - but yes, the Kafka -> Spark Streaming -> Cassandra stack is very well suited to sensor data. That said, it is not your only option. Consider your use cases carefully and don't be afraid to look at things that can also work well in your enterprise. Spark is sexy and shiny, but Flume, Storm, and HDFS could get the job done just as well in many cases. 0MQ, Rabbit, Kestrel and others also play where Kafka plays - each has pros and cons and should be at least reviewed.

      So, now that I've posted my "not a fan-boy" bits - Kafka -> Spark Streaming -> Cassandra is a wicked awesome stack that I think will become more prevalent in the coming year. As Spark matures I suspect we will see projects migrate from Storm. Similarly, as Cassandra gains acceptance we'll see hot data move out of other stores and into C*. A lot of time and money has been spent on traditional map reduce with HDFS, and until stakeholders can be shown demonstrated ROI on the new stack, there will be little movement. While that movement has started, I believe we'll see it accelerate as we move through 2015.

  3. I forgot to reply to thank you for the info. Greetings from Germany (but I am Spanish) - you have a free beer waiting :P

  4. I enjoyed all your tutorials and I really thank you for easing the learning curve of Spark, r3volutionary!

    I want to ask you a question regarding a particular use case. I just want to query data from Cassandra, process it, and produce an output (for example into Kafka). The processing is expected to be non-trivial, maybe merging the Cassandra data with other systems, consuming caches, etc. The process must be almost real time (pseudo-synchronous):
    - Is this stack needed or is it too much? Are there simpler ways to consume Cassandra data and aggregate/process it that also offer scalability and good performance?
    - What would be the proper workflow? Keep a Spark job alive which consumes user requests, or start a new Spark job for each request?

    Thank you! Looking forward to seeing new tutorials!

    Replies
    1. Albert - sorry it has taken me so long to see this - my paying job has kept me too busy to dedicate more time to Spark-Fu of late. More stuff is coming, just as soon as free time becomes free...

      I'm afraid I do not fully grok your use case - indeed it does sound non-trivial. I would just advise caution not to create a circular consumption of streams - that could get very ugly, very fast. That is, make sure all of your pipelines have a destination.

      I will say that Kafka and Spark Streaming are wonderful and sexy, but they are not always the final solution. Don't be afraid to look at how some immutable portion of your intermediate results might be kept in HDFS to be reprocessed periodically. Also, if you're executing C* queries, depending on how you're going about it, a standard Spark job (non-streaming) on a schedule (e.g. cron) might fit your requirements. If you have some serious near-real-time requirements, perhaps there are elements of Solr that will work for you?

      If you did want to stick to a stricter Spark/Kafka/C* stack, recall that you can tune the batch interval on your streaming jobs. So, if your primary job is outputting intermediate results to various Kafka topics, the streaming jobs that consume those streams may process at slightly longer intervals. Of course, that all depends on the size and speed of your cluster, the volume and rate of your data...so many variables.

      Best of luck to you - let me know how things go!

      Oh, also, remember you can look beyond Kafka to feed Spark Streaming. If it fits your use case HDFS, Kinesis, 0MQ, or even Flume might get you past a technical hurdle.
