While it's true that Hadoop MapReduce is a more mature technology than Spark, Spark is growing fast and vendors are already including it in their distributions. A key selling point of Spark is performance: according to its documentation, up to 100 times faster in memory or 10 times faster on disk for certain applications, which improves iterative processing and interactive analysis by keeping data in memory. While Spark is oriented toward that kind of workload, Hadoop MapReduce is batch-oriented by nature (although other technologies around it are not). If you want to execute jobs much faster and have enough memory, Spark probably fits better than Hadoop MapReduce. As always, it'll depend on what you want to do.
Spark is written in Scala but provides Java and Python APIs as well.
In this demo I'll show a usage example of Spark Streaming with Apache Kafka (a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system), integrating both the producer and the consumer. Since this post is a consequence of this meetup, the data source to be processed will be Meetup RSVPs, consumed through its streaming API. :-P
The idea is to rank the most commonly used group topics found in affirmative RSVPs from the Meetup streaming API every 20 seconds. The steps are as follows:
- Develop a producer that reads the RSVP records (JSON values) from Meetup and sends them to Kafka under a topic named "meetup-rsvps" (an abridged example record is shown right after this list).
- Set up Kafka and ZooKeeper and create the "meetup-rsvps" topic.
- Implement the driver: get the Kafka stream from the StreamingContext subscribed to that topic and process each record, first filtering RSVPs by response (only "yes" values are accepted) and then aggregating the topic keys with a reduce function over a sliding window (20 seconds).
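For reference, this is roughly what an RSVP record looks like; only the two fields the driver uses later ("response" and the group topics' "urlkey") are shown here, and the values are made up for illustration:
{
  "response": "yes",
  "group": {
    "group_topics": [
      { "urlkey": "big-data" },
      { "urlkey": "apache-spark" }
    ]
  }
}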
import java.net.URL
import java.util.{Properties, UUID}
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object JsonStreamingExample {
  def main(args: Array[String]) {
    val kafkaProducer = new KafkaProducer("meetup-rsvps")
    // Open the Meetup RSVPs streaming endpoint.
    val url = new URL("http://stream.meetup.com/2/rsvps")
    val conn = url.openConnection()
    // Without a User-Agent header Meetup rejects the request with a 403.
    conn.addRequestProperty("User-Agent",
      "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)")
    // Jackson streaming parser; the ObjectMapper is needed so that
    // readValueAsTree() can materialize each JSON record.
    val jsonFactory = new JsonFactory(new ObjectMapper)
    val parser = jsonFactory.createParser(conn.getInputStream)
    // Read one JSON record at a time from the stream and forward it to Kafka.
    while (parser.nextToken() != null) {
      kafkaProducer.send(parser.readValueAsTree().toString())
    }
  }
}

// Thin wrapper around the (old) Scala Kafka producer API.
case class KafkaProducer(
  topic: String,
  clientId: String = UUID.randomUUID().toString
) {
  val props = new Properties()
  props.put("producer.type", "async")
  props.put("metadata.broker.list", "localhost:9092")
  props.put("message.send.max.retries", "3")
  props.put("request.required.acks", "-1")
  props.put("client.id", clientId)
  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

  def send(message: String, partition: String = null): Unit = {
    // Build the message, with an optional partition key.
    val kmessage: KeyedMessage[AnyRef, AnyRef] =
      if (partition == null)
        new KeyedMessage(topic, message.getBytes("UTF-8"))
      else
        new KeyedMessage(topic, partition.getBytes("UTF-8"), message.getBytes("UTF-8"))
    try {
      producer.send(kmessage)
    } catch {
      case e: Exception =>
        e.printStackTrace()
        sys.exit(1)
    }
  }
}
Take into account the User-Agent request property set on the connection in the producer code; without it, you'll get a 403 HTTP error code from Meetup. On the other hand, you'll need to execute these commands to start ZooKeeper and Kafka (and create the topic):
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic meetup-rsvps
bin/kafka-topics.sh --list --zookeeper localhost:2181
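Optionally, once the producer is running you can check that records are actually reaching the topic with the console consumer that ships with Kafka (the exact flags may vary slightly between Kafka versions):
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic meetup-rsvps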
And finally, the driver code, working as a Kafka consumer. You'll get a ranking of the most used group topics within each 20-second window.
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.log4j.{Level, Logger}

object MeetupStreamingTopics {
  def main(args: Array[String]) {
    // Keep the console output readable.
    Logger.getRootLogger.setLevel(Level.WARN)

    // Local driver with 2 threads and 20-second batches.
    val sparkConf = new SparkConf().setAppName("Meetup Streaming Group Topics").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(20))

    // Subscribe to the "meetup-rsvps" topic (1 receiver thread) and keep only the message payload.
    val topics = Map("meetup-rsvps" -> 1)
    val stream = KafkaUtils.createStream(ssc, "localhost:2181", "rsvpsGroup", topics).map(_._2)

    // Keep only affirmative RSVPs, then emit one (urlkey, 1) pair per group topic.
    val rsvpsOk = stream.filter(f => {
      val rsvp = parse(f)
      val JString(response) = rsvp \ "response"
      response.equals("yes")
    }).flatMap(m => {
      val rsvp = parse(m)
      try {
        // Usual case: the "urlkey" extraction yields a JArray of strings.
        val JArray(fields) = rsvp \ "group" \ "group_topics" \ "urlkey"
        fields.collect { case JString(urlkey) => (urlkey, 1) }
      } catch {
        case me: MatchError =>
          // Single-topic case: the extraction yields a lone JString instead of a JArray.
          val JString(field) = rsvp \ "group" \ "group_topics" \ "urlkey"
          List((field, 1))
      }
    })

    // Count topics over a 20-second window and sort by descending count.
    val topCounts = rsvpsOk.reduceByKeyAndWindow(_ + _, Seconds(20))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print the top 10 topics of each window.
    topCounts.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular group topics in last 20 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s rsvps)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
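To launch the driver you can package it into a jar and run it with spark-submit. The build setup isn't covered in this post and the jar name below is just a placeholder, assuming the spark-streaming-kafka dependency is bundled in it; note that the master is already set to local[2] in the code:
bin/spark-submit --class MeetupStreamingTopics meetup-streaming-assembly.jar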
Interesting, isn't it? I'll try to write more posts about Spark asap!