Overview
Welcome to part three of the series 'Spark + Kafka + Cassandra'. Building on top of part one and part two, it is now time to consume a bunch of messages from Kafka using Spark Streaming and dump them into Cassandra. There really was no nice way to illustrate consumption without putting the messages somewhere - so why not go straight to C*? Don't care about C*? Feel free to write to stdout, HDFS, a text file, whatever.
This piece is effectively designed to work with the message generator from part two, but you can put messages into your Kafka topic however you choose.
This instalment will have you:
- Run ZooKeeper locally (as part of the Kafka distribution)
- Run Kafka locally
- Run a local Spark Cluster
- Create a Kafka Producer wrapped up in a Spark application (Part two)
- Submit the application to generate messages sent to a given topic for some number of seconds.
- Have a running Spark Streaming application ready and waiting to consume your topic and dump the results into a Cassandra table.
Prerequisites:
- Java 1.7+ (Oracle JDK required)
- Scala 2.10.4
- SBT 0.13.x
- A Spark cluster (how to do it here.)
- ZooKeeper and Kafka running locally
- git
Set it up...
It is not mandatory that you have gone through parts one and two of this series, but doing so will make this part more seamless. I recommend doing that and then returning here.
Clone the Example
Begin by cloning the example project from GitHub - spark-streaming-kafka-consumer - and cd into the project directory.
[bkarels@rev27 work]$ git clone https://github.com/bradkarels/spark-streaming-kafka-consumer.git
Cloning into 'spark-streaming-kafka-consumer'...
remote: Counting objects: 29, done.
remote: Total 29 (delta 0), reused 0 (delta 0)
Unpacking objects: 100% (29/29), done.
[bkarels@rev27 work]$ cd spark-streaming-kafka-consumer/
[bkarels@rev27 spark-streaming-kafka-consumer]$
Prepare the Application
In a terminal, from the project root, fire up SBT and run assembly to create your application jar file.
[bkarels@rev27 spark-streaming-kafka-consumer]$ sbt
Picked up _JAVA_OPTIONS: -Xms1G -Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G
[info] Loading project definition from /home/bkarels/work/spark-streaming-kafka-consumer/project
[info] Updating {file:/home/bkarels/work/spark-streaming-kafka-consumer/project/}spark-streaming-kafka-consumer-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to Spark Fu Streaming Kafka Consumer (in build file:/home/bkarels/work/spark-streaming-kafka-consumer/)
> assembly
...
[info] SHA-1: 4597553bcdecc6db6a67cad0883cc56cadb0be03
[info] Packaging /home/bkarels/work/spark-streaming-kafka-consumer/target/scala-2.10/sparkFuStreamingConsumer.jar ...
[info] Done packaging.
[success] Total time: 29 s, completed Jan 29, 2015 9:24:32 AM
>
Like before, take note of your jar file location (shown in the packaging output above). Also note that the resulting jar file name is set in assembly.sbt - here we have set it to sparkFuStreamingConsumer.jar.
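If you're curious what that setting looks like, here is a minimal sketch. The plugin version and setting key are illustrative and depend on which sbt-assembly release the project pins (older releases use jarName in assembly rather than assemblyJarName in assembly):

// project/assembly.sbt - pulls in the sbt-assembly plugin (version illustrative)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")

// assembly.sbt at the project root - names the fat jar produced by the assembly task
assemblyJarName in assembly := "sparkFuStreamingConsumer.jar"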
Spark it up!
If your local Spark cluster is not up and running, do that now. Go here to see about getting 'r' done.
Make Sparks fly! (i.e. run it)
Having reviewed the code, you have seen that, out of the box, this streaming application expects ZooKeeper local on port 2181 and Kafka local with a topic of sparkfu. Additionally, a local Cassandra instance with keyspace sparkfu and table messages should also exist. The CQL for this table is at the root of the project. See my post on local Cassandra if you do not yet have that bit in place. Once Cassandra is up and running, execute the CQL to create the messages table. (A sketch of the consumer's core logic appears after the cqlsh session below.) You should be able to see the following when done:
cqlsh> use sparkfu;
...and so we're sure we're starting from zero:
cqlsh:sparkfu> DESCRIBE TABLE messages;
CREATE TABLE messages (
key text,
msg text,
PRIMARY KEY ((key))
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
...
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};
cqlsh:sparkfu> TRUNCATE messages;
cqlsh:sparkfu> SELECT key, msg FROM messages;
(0 rows)
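As promised above, here is a rough sketch of what the core of such a consumer looks like. It is a minimal, hedged example - the object name, group id, and batch interval are illustrative, not necessarily the repo's exact code - using the receiver-based KafkaUtils.createStream API and the DataStax spark-cassandra-connector:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._

object StreamingConsumerSketch {
  def main(args: Array[String]): Unit = {
    // Assumes ZooKeeper on localhost:2181, a Kafka topic named "sparkfu",
    // and Cassandra on 127.0.0.1 with keyspace "sparkfu" and table "messages".
    val conf = new SparkConf()
      .setAppName("SparkFuStreamingConsumer")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Receiver-based stream of (key, message) pairs from the "sparkfu" topic.
    val messages = KafkaUtils.createStream(
      ssc,
      "localhost:2181",     // ZooKeeper quorum
      "sparkfu-consumer",   // consumer group id (illustrative)
      Map("sparkfu" -> 1))  // topic -> receiver thread count

    // Each (key, msg) pair becomes a row in sparkfu.messages.
    messages.saveToCassandra("sparkfu", "messages", SomeColumns("key", "msg"))

    ssc.start()
    ssc.awaitTermination()
  }
}

The only hard requirements baked into this sketch are the ones called out above: ZooKeeper on port 2181, Kafka with the sparkfu topic, and the sparkfu.messages table in Cassandra.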
OK, spark it up:
[bkarels@rev27 spark-streaming-kafka-consumer]$ $SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.Consumer --master spark://127.0.0.1:7077 /home/bkarels/work/spark-streaming-kafka-consumer/target/scala-2.10/sparkFuStreamingConsumer.jar
If all has gone well, your streaming application is now running on your local Spark cluster, waiting for messages to hit your Kafka topic.
...
Output will flow about here...
You can verify at http://localhost:4040.
If you have just recently completed part two, your application will likely be busy pulling messages from the sparkfu topic. However, let's assume that's not the case. So, fire up the message generator from part two and run it for a few seconds to publish some messages to the sparkfu topic.
So, we should have 9282 messages in our topic (in this example). We should also, by the time we look, have that same number of messages in our messages table in Cassandra.
[bkarels@rev27 spark-kafka-msg-generator]$ $SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.RandomMessages --master local[*] /home/bkarels/dev/spark-kafka-msg-generator/target/scala-2.10/sparkFuProducer.jar sparkfu 5 true
...
15/01/29 09:45:40 INFO SyncProducer: Connected to rev27:9092 for producing
5000 * 3 seconds left...
Produced 9282 msgs in 5s -> 1856.0m/s.
cqlsh:sparkfu> SELECT COUNT(*) FROM messages;
count
-------
9282
(1 rows)
Due to the nature of this example, the data itself will be uninteresting:
cqlsh:sparkfu> SELECT key,msg FROM messages LIMIT 5;
key | msg
---------------------------+------------------------------------------------------
kUs3nkk0mv5fJ6eGvcLDrkQTd | 1422546343283,AMpOMjZeJozSy3t519QcUHRwl,...
P6cUTChERoqZ7bOyDa3XjnHNs | 1422546344238,VDPCAhV3k3m5wfaUY0jAB8qB0,...
KlRKLYnnlZY6NCpbKyQEIKLrF | 1422546343576,hdzvBKR7z2raTsxNYFoTFmeS2,...
YGrBt2ZI7PPXrpopLsSTAwYrD | 1422546341519,cv0b7MEPdnrK1HuRL0GPDzMMP,...
YsDWO67wKMuWBzyRpOiRSNpq2 | 1422546344491,RthQLkxPc5es7f2fYjTXRJnNu...
(5 rows)
So there you have it - about the simplest Spark Streaming Kafka consumer you can do. Shoulders of giants, people. So tweak it, tune it, expand it, use it as your springboard.
What's next?
Still in flight for the next part of this series:
- Not sure - I may just work custom Kafka encode/decode into this example.
- May also model the data in more of a time-series fashion to make reading from Kafka a more practical exercise.
- Your thoughts?