Saturday, 31 January 2015

Playing with Spark Streaming & Kafka

Last week I was at a meetup where people from Stratio talked about Spark Streaming and also about its core, Apache Spark (including the basics: RDDs, transformations, actions, etc.). I had already read some articles about this cutting-edge technology, but I hadn't had the chance to dig into it, so new knowledge is always welcome. :-) You can find the presentation here.

While it's true that Hadoop MapReduce is a more mature technology than Spark, Spark is growing fast and vendors are already including it in their distributions. A key selling point of Spark is its performance: as they said, up to 100 times faster in memory, or 10 times faster on disk, for certain applications, since keeping data in memory speeds up iterative processing and interactive analysis. Spark is oriented towards that kind of workload, whereas Hadoop MR is batch-oriented by nature (although there are other technologies around it that are not). If you want your jobs to run much faster and you have enough memory, Spark probably fits better than Hadoop MR; as always, it depends on what you want to do.
Spark is written in Scala but provides Java and Python APIs as well.

In this demo I'll show a usage example of Spark Streaming with Apache Kafka (a fast, scalable, durable, and fault-tolerant publish-subscribe messaging system), integrating both the producer and the consumer. Since this post is a consequence of that meetup, the data to be processed will be Meetup RSVPs, taken from its streaming API. :-P

The idea is to rank the most commonly used group topics found in affirmative RSVPs from the Meetup streaming API every 20 seconds. The steps are as follows:
  1. Develop a producer that gets the RSVP records (JSON values) from Meetup and sends them to Kafka under a topic named "meetup-rsvps".
  2. Set up Kafka and ZooKeeper and create the "meetup-rsvps" topic.
  3. Implement the driver: get the Kafka stream from the StreamingContext subscribed to that topic and process each record, first filtering RSVPs by response (only "yes" values are accepted) and then aggregating keys with the reduce function over a sliding window (20 seconds).
I developed this example in Scala; it works fine with Scala 2.10.
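If you build it with sbt, dependencies along these lines should be enough. This is just a minimal sketch, assuming Spark 1.2.x, Kafka 0.8.x and json4s 3.2.x, with a made-up project name; the Jackson classes used by the producer should come in transitively through json4s-jackson, and you should adjust the versions to whatever you have installed:

name := "meetup-rsvps-demo"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.2.0",
  "org.apache.spark" %% "spark-streaming"       % "1.2.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.2.0",
  "org.apache.kafka" %% "kafka"                 % "0.8.1.1",
  "org.json4s"       %% "json4s-jackson"        % "3.2.10"
)

This is the producer: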

import java.net.URL
import java.net.URLConnection
import java.io.InputStream
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.JsonToken._
import com.fasterxml.jackson.databind.ObjectMapper
import java.util.{Properties, UUID}
import kafka.producer.Producer
import kafka.producer.async._
import kafka.producer.ProducerConfig
import kafka.producer.KeyedMessage
import org.json4s._
import org.json4s.jackson.JsonMethods._

object JsonStreamingExample {
  def main(args: Array[String]) {
  
    val kafkaProducer = new KafkaProducer("meetup-rsvps")
    val url = new URL("http://stream.meetup.com/2/rsvps")
    val conn = url.openConnection()
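    // Without a browser-like User-Agent header, the Meetup streaming endpoint responds with HTTP 403.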
    conn.addRequestProperty("User-Agent",
      "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)")

    val jsonFactory = new JsonFactory(new ObjectMapper)
    val parser = jsonFactory.createParser(conn.getInputStream)

    // Each top-level JSON value on the stream is one RSVP record; forward it to Kafka as a string.
    while (parser.nextToken() != null) {
      kafkaProducer.send(parser.readValueAsTree().toString())
    }
  }
}

case class KafkaProducer(
  topic: String, 
  clientId: String = UUID.randomUUID().toString
  ){ 
  
  val props = new Properties()
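  // Asynchronous producer pointed at a local broker; up to 3 send retries, acks required from all in-sync replicas (-1).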
  props.put("producer.type", "async")
  props.put("metadata.broker.list", "localhost:9092")
  props.put("message.send.max.retries","3")
  props.put("request.required.acks", "-1")
  props.put("client.id", clientId)

  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
  
  def send(message: String, partition: String = null): Unit = {
    val kmessage: KeyedMessage[AnyRef, AnyRef] =
      if (partition == null)
        new KeyedMessage(topic, message.getBytes("UTF-8"))
      else
        new KeyedMessage(topic, partition.getBytes("UTF-8"), message.getBytes("UTF-8"))
    
    try {
      producer.send(kmessage)
    } catch {
      case e: Exception =>
        e.printStackTrace
        sys.exit(1)
    }
  }
}
Take into account the lines that set the User-Agent request property; without them you'll get a 403 HTTP error code from Meetup.

Besides that, you'll need to run these commands from the Kafka installation directory to start ZooKeeper and the Kafka broker (each one keeps running in the foreground, so use a separate terminal for each), create the "meetup-rsvps" topic and check that it exists:

bin/zookeeper-server-start.sh config/zookeeper.properties 
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic meetup-rsvps
bin/kafka-topics.sh --list --zookeeper localhost:2181
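
Before getting to the driver, it may help to see how json4s extracts the relevant fields from a single RSVP. The following is a minimal, standalone sketch over a hypothetical, abridged record (real records carry many more fields; the field names are the ones the driver uses). Note that selecting "urlkey" returns a JArray when the group has several topics but a single JString when it has only one, which is why the driver needs a fallback:

import org.json4s._
import org.json4s.jackson.JsonMethods._

object RsvpParsingSketch extends App {
  // Hypothetical, abridged RSVP record.
  val sample = """{"response":"yes","group":{"group_topics":[{"urlkey":"big-data"},{"urlkey":"scala"}]}}"""
  val rsvp = parse(sample)

  // "response" is a plain string field.
  val JString(response) = rsvp \ "response"
  println(response)  // yes

  // With several group topics the "urlkey" values are collected into a JArray;
  // with exactly one topic the same expression returns a single JString instead.
  val JArray(urlkeys) = rsvp \ "group" \ "group_topics" \ "urlkey"
  println(urlkeys.collect { case JString(s) => s })  // List(big-data, scala)
}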

And finally, the driver code, which works as the Kafka consumer. Every 20 seconds you'll get a ranking of the most used group topics within that window.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.Logging

import org.json4s._
import org.json4s.jackson.JsonMethods._

import org.apache.log4j.{Level, Logger}

object MeetupStreamingTopics {
  def main(args: Array[String]) {
    Logger.getRootLogger.setLevel(Level.WARN)

    // local[2]: one thread for the Kafka receiver and one for processing; 20-second batch interval.
    val sparkConf = new SparkConf().setAppName("Meetup Streaming Group Topics").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(20))
    // One receiver thread for the meetup-rsvps topic, connected through ZooKeeper; keep only the message payload.
    val topics = Map("meetup-rsvps" -> 1)
    val stream = KafkaUtils.createStream(ssc, "localhost:2181", "rsvpsGroup", topics).map(_._2)
    
    val rsvpsOk = stream.filter(f => {
      // Keep only affirmative RSVPs.
      val rsvp = parse(f)
      val JString(response) = rsvp \ "response"
      response.equals("yes")
    }).flatMap(m => {
      val rsvp = parse(m)
      try {
        // Several group topics: json4s collects the matching "urlkey" values into a JArray.
        val JArray(fields) = rsvp \ "group" \ "group_topics" \ "urlkey"
        fields.map { case JString(urlkey) => (urlkey, 1) }
      } catch {
        case me: MatchError =>
          // Just one group topic: json4s returns a single JString instead of a JArray.
          val JString(urlkey) = rsvp \ "group" \ "group_topics" \ "urlkey"
          List((urlkey, 1))
      }
    })
    
    // Count RSVPs per topic over the 20-second window and sort by count, descending.
    val topCounts10 = rsvpsOk.reduceByKeyAndWindow(_ + _, Seconds(20))
                     .map{case (topic, count) => (count, topic)}
                     .transform(_.sortByKey(false))

    // Print the 10 most popular topics of each window.
    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular group topics in last 20 seconds (%s total):".format(rdd.count()))
      topList.foreach{case (count, tag) => println("%s (%s rsvps)".format(tag, count))}
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
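
Since the master is hard-coded to local[2], you can run the driver straight from sbt or your IDE while ZooKeeper, Kafka and the producer are up, and a new ranking should be printed every 20 seconds.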
Interesting, isn't it? I'll try to write more posts about Spark asap!

