Things to look for in the coming weeks:
- Cluster install using Ambari
- Spark on Yarn introduction
- Updated examples with Scala 2.11.x and Kafka 0.8.2.1
- Refreshing earlier examples with the current Spark version
- Moving to the current SBT release as well
Developing with Apache Spark on Cassandra - tips, tricks, & examples. This is where I document the applications I am building with Apache Spark and Cassandra. Feedback, comments, questions, & corrections are always welcome.
[bkarels@rev27 work]$ git clone https://github.com/bradkarels/spark-streaming-kafka-consumer.git
Cloning into 'spark-streaming-kafka-consumer'...
remote: Counting objects: 29, done.
remote: Total 29 (delta 0), reused 0 (delta 0)
Unpacking objects: 100% (29/29), done.
[bkarels@rev27 work]$ cd spark-streaming-kafka-consumer/
[bkarels@rev27 spark-streaming-kafka-consumer]$
[bkarels@rev27 spark-streaming-kafka-consumer]$ sbt
Picked up _JAVA_OPTIONS: -Xms1G -Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G
[info] Loading project definition from /home/bkarels/work/spark-streaming-kafka-consumer/project
[info] Updating {file:/home/bkarels/work/spark-streaming-kafka-consumer/project/}spark-streaming-kafka-consumer-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to Spark Fu Streaming Kafka Consumer (in build file:/home/bkarels/work/spark-streaming-kafka-consumer/)
> assembly
...
[info] SHA-1: 4597553bcdecc6db6a67cad0883cc56cadb0be03
[info] Packaging /home/bkarels/work/spark-streaming-kafka-consumer/target/scala-2.10/sparkFuStreamingConsumer.jar ...
[info] Done packaging.
[success] Total time: 29 s, completed Jan 29, 2015 9:24:32 AM
>
cqlsh> use sparkfu;
...and so we're sure we're starting from zero:
cqlsh:sparkfu> DESCRIBE TABLE messages;
CREATE TABLE messages (
key text,
msg text,
PRIMARY KEY ((key))
) WITH
bloom_filter_fp_chance=0.010000 AND
caching='KEYS_ONLY' AND
...
compaction={'class': 'SizeTieredCompactionStrategy'} AND
compression={'sstable_compression': 'LZ4Compressor'};
cqlsh:sparkfu> TRUNCATE messages;
cqlsh:sparkfu> SELECT key, msg FROM messages;
(0 rows)
[bkarels@rev27 spark-streaming-kafka-consumer]$ $SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.Consumer --master spark://127.0.0.1:7077 /home/bkarels/work/spark-streaming-kafka-consumer/target/scala-2.10/sparkFuStreamingConsumer.jar
If all has gone well, your streaming application is now running on your local Spark cluster, waiting for messages to hit your Kafka topic.
...
Output will flow about here...
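For orientation, the guts of such a streaming consumer fit in a few lines. Here is a minimal sketch, not the actual Consumer source: it assumes the receiver-based spark-streaming-kafka API, the DataStax spark-cassandra-connector, a local ZooKeeper, and a 5 second batch interval.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._

object ConsumerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SparkFuStreamingConsumer")
      .set("spark.cassandra.connection.host", "127.0.0.1") // assumed Cassandra host
    val ssc = new StreamingContext(conf, Seconds(5))       // assumed batch interval
    // (key, message) pairs from the sparkfu topic, one receiver thread.
    val messages = KafkaUtils.createStream(ssc, "localhost:2181", "sparkfu-group", Map("sparkfu" -> 1))
    // Each pair lands as a row in the messages table we truncated above.
    messages.saveToCassandra("sparkfu", "messages", SomeColumns("key", "msg"))
    ssc.start()
    ssc.awaitTermination()
  }
}

With the consumer running, throw some messages at the topic: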
[bkarels@rev27 spark-kafka-msg-generator]$ $SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.RandomMessages --master local[*] /home/bkarels/dev/spark-kafka-msg-generator/target/scala-2.10/sparkFuProducer.jar sparkfu 5 true
...
15/01/29 09:45:40 INFO SyncProducer: Connected to rev27:9092 for producing
5000 * 3 seconds left...
Produced 9282 msgs in 5s -> 1856.0m/s.
So we should have 9282 messages in our topic (in this example). By the time we look, we should also have that same number of messages in our messages table in Cassandra:
cqlsh:sparkfu> SELECT COUNT(*) FROM messages;
 count
-------
  9282
(1 rows)
cqlsh:sparkfu> SELECT key,msg FROM messages LIMIT 5;
key | msg
---------------------------+------------------------------------------------------
kUs3nkk0mv5fJ6eGvcLDrkQTd | 1422546343283,AMpOMjZeJozSy3t519QcUHRwl,...
P6cUTChERoqZ7bOyDa3XjnHNs | 1422546344238,VDPCAhV3k3m5wfaUY0jAB8qB0,...
KlRKLYnnlZY6NCpbKyQEIKLrF | 1422546343576,hdzvBKR7z2raTsxNYFoTFmeS2,...
YGrBt2ZI7PPXrpopLsSTAwYrD | 1422546341519,cv0b7MEPdnrK1HuRL0GPDzMMP,...
YsDWO67wKMuWBzyRpOiRSNpq2 | 1422546344491,RthQLkxPc5es7f2fYjTXRJnNu...
(5 rows)
[bkarels@rev27 work]$ git clone https://github.com/bradkarels/spark-kafka-msg-generator.git
Cloning into 'spark-kafka-msg-generator'...
remote: Counting objects: 19, done.
remote: Compressing objects: 100% (13/13), done.
remote: Total 19 (delta 0), reused 15 (delta 0)
Unpacking objects: 100% (19/19), done.
[bkarels@rev27 work]$ cd spark-kafka-msg-generator/
[bkarels@rev27 spark-kafka-msg-generator]$
[bkarels@rev27 spark-kafka-msg-generator]$ sbt
Picked up _JAVA_OPTIONS: -Xms1G -Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G
[info] Loading project definition from /home/bkarels/work/spark-kafka-msg-generator/project
[info] Updating {file:/home/bkarels/work/spark-kafka-msg-generator/project/}spark-kafka-msg-generator-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to MsgSpewer (in build file:/home/bkarels/work/spark-kafka-msg-generator/)
> assembly
...
[info] SHA-1: cddec14059d6a435847f8dd4b4b6f15f6899c0c3
[info] Packaging /home/bkarels/work/spark-kafka-msg-generator/target/scala-2.10/sparkFuProducer.jar ...
[info] Done packaging.
[success] Total time: 53 s, completed Jan 26, 2015 9:10:16 AM
The assembled jar takes up to three arguments: [topic [duration] [verbose]]
Argument Examples:
Publish to topic sparkfu for 20 seconds and do print additional output:
sparkfu 20 true
Publish to topic filthpig for 3600 seconds and do not print additional output:
filthpig 3600
Publish to topic schweinehund for 10 seconds and do not print additional output:
schweinehund
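To make those arguments concrete, here is a minimal parsing sketch (hypothetical code, not the actual source; the defaults of 10 seconds and quiet output are inferred from the examples above):

object ProducerArgs {
  // Parse [topic [duration] [verbose]]; topic is required, the rest default.
  def parse(args: Array[String]): (String, Int, Boolean) = {
    require(args.nonEmpty, "usage: topic [duration] [verbose]")
    val topic    = args(0)
    val duration = if (args.length > 1) args(1).toInt else 10        // assumed default
    val verbose  = if (args.length > 2) args(2).toBoolean else false // quiet by default
    (topic, duration, verbose)
  }
}

Start a console consumer to watch the topic, then kick off the producer: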
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sparkfu --from-beginning
$SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.RandomMessages --master local[*] ~/dev/spark-kafka-msg-generator/target/scala-2.10/sparkFuProducer.jar sparkfu 20 true
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Picked up _JAVA_OPTIONS: -Xms1G -Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G
This will run for: 20s
...
15/01/26 09:24:42 INFO SyncProducer: Connected to rev27:9092 for producing
5000 * 18 seconds left...
10000 * 17 seconds left...
...
115000 * 3 seconds left...
120000 * 2 seconds left...
130000 * 1 seconds left...
Produced 134137 msgs in 20s -> 6706.0m/s.
If you were running your local consumer you should see a big ol' pile of alphanumeric randomness stream by. Just like that, you can spew a pretty good chunk of messages to a topic on command.
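For the curious, the core send loop of such a generator can be quite small. A sketch under stated assumptions (hypothetical code, not the actual RandomMessages source; assumes the Kafka 0.8 Scala producer API and a broker at localhost:9092, mimicking the timestamp-plus-random-payload rows seen in the messages table earlier):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import scala.util.Random

object SpewSketch {
  // Hypothetical core loop: spew random messages at a topic for N seconds.
  def spew(topic: String, seconds: Int, verbose: Boolean): Int = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092") // assumed broker address
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    val stop = System.currentTimeMillis + seconds * 1000L
    var count = 0
    while (System.currentTimeMillis < stop) {
      // Random 25-char key; message is a timestamp plus a random payload.
      val key = Random.alphanumeric.take(25).mkString
      val msg = s"${System.currentTimeMillis},${Random.alphanumeric.take(25).mkString}"
      producer.send(new KeyedMessage[String, String](topic, key, msg))
      count += 1
      if (verbose && count % 5000 == 0)
        println(s"$count * ${(stop - System.currentTimeMillis) / 1000} seconds left...")
    }
    producer.close()
    count
  }
}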
[bkarels@rev27 kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties
Yes, there are countless ways to get ZooKeeper up and running, but the above is as far as I'm going here: simple and functional. Next, start the Kafka server:
[bkarels@rev27 kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties &
Create topic `sparkfu`:
[bkarels@rev27 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sparkfu
List our topic(s) as an exercise:
[bkarels@rev27 kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
All of the above commands spit out a goodly amount of "stuff" to your console, but a bit of careful examination should leave you confident that everything is up and running correctly. Most importantly, when you list your topics you should see sparkfu somewhere in that output (it will be on its own line). Now, fire up a console consumer in one terminal:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sparkfu --from-beginning
In your other terminal, we'll fire up a producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sparkfu
Once your producer is started and staring at you, you can enter any string and press [Enter] to produce the message. Within a moment you should see your message parroted back to you in the terminal where you started the consumer. If you crave greater detail, review the Kafka documentation.
[bkarels@rev27 work]$ git clone https://github.com/bradkarels/spark-kafka-producer.git
Cloning into 'spark-kafka-producer'...
remote: Counting objects: 19, done.
remote: Compressing objects: 100% (13/13), done.
remote: Total 19 (delta 0), reused 15 (delta 0)
Unpacking objects: 100% (19/19), done.
[bkarels@rev27 work]$ cd spark-kafka-producer/
[bkarels@rev27 spark-kafka-producer]$
[bkarels@rev27 spark-kafka-producer]$ sbt
Picked up _JAVA_OPTIONS: -Xms1G -Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G
[info] Loading project definition from /home/bkarels/work/spark-kafka-producer/project
[info] Updating {file:/home/bkarels/work/spark-kafka-producer/project/}spark-kafka-producer-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to Spark Fu Kafka Producer (in build file:/home/bkarels/work/spark-kafka-producer/)
> assembly
...
[info] SHA-1: 69db758e5dd205ae60875f00388cd5c935955773
[info] Packaging /home/bkarels/work/spark-kafka-producer/target/scala-2.10/sparkFuProducer.jar ...
[info] Done packaging.
[success] Total time: 11 s, completed Jan 20, 2015 12:08:44 PM
>
As per normal, take note of where your jar file gets put (highlighted above). Also note that we have set the resulting jar file name in assembly.sbt; here we have set it to sparkFuProducer.jar.
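If you are curious, pinning the jar name with the sbt-assembly plugin of this era looks something like the following assembly.sbt sketch (exact keys vary by plugin version):

import AssemblyKeys._

assemblySettings

jarName in assembly := "sparkFuProducer.jar"

Before submitting the job, get a console consumer watching the topic: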
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic sparkfu --from-beginning
Truth be told, you could start it up after the fact since we are telling it to read all messages from the start of time. But isn't it more fun to see messages come in as your job is sending them? Yes, it is.
[bkarels@rev27 kafka_2.10-0.8.1.1]$ $SPARK_HOME/bin/spark-submit --class com.bradkarels.simple.Messenger --master spark://127.0.0.1:7077 /home/bkarels/dev/spark-kafka-producer/target/scala-2.10/sparkFuProducer.jar
If all has gone well, you should see output like the following in your consumer as the application runs:
[2015-01-20 12:28:09,547] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
What the foo?
What the bar?
What the baz?
No! No! NO! What the FU!
[2015-01-20 12:28:10,071] INFO Closing socket connection to /192.168.254.153. (kafka.network.Processor)
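To round things out, the producer job itself need not be fancy. A minimal sketch of a Spark job that pushes a handful of strings to Kafka (hypothetical code, not the actual Messenger source; assumes the Kafka 0.8 Scala producer API and a broker at localhost:9092):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.{SparkConf, SparkContext}

object MessengerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Messenger"))
    val lines = Seq("What the foo?", "What the bar?", "What the baz?", "No! No! NO! What the FU!")
    // Build one producer per partition; the producer itself is not serializable.
    sc.parallelize(lines).foreachPartition { part =>
      val props = new Properties()
      props.put("metadata.broker.list", "localhost:9092") // assumed broker address
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      val producer = new Producer[String, String](new ProducerConfig(props))
      part.foreach(msg => producer.send(new KeyedMessage[String, String]("sparkfu", msg)))
      producer.close()
    }
    sc.stop()
  }
}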