Pages

Wednesday 21 January 2015

Analyzing tweets from Flume in Kibana

Hadoop opens a world of possibilities enabling us to store and process large amounts of any type of data. More and more organizations are adopting and investing more in this technology because of the benefits that represent to have a competitive advantage for businesses across industries.
This advantage comes from the help of the actionable information gives to decision makers to make the right ones. For that, the data has to be accessible, I mean, turn the data into something that's consumable. Here is where visualization systems of this data are the key which enable to see analytical results, find relevance information, develop scenarios or even perform predictive analysis.

There are a lot of visualization systems offering different kind of tools and features but in this post, I'm going to talk about Kibana -in future posts I'll show others-. Kibana is an open source analytical software for doing some analysis with a responsive web design connected to ElasticSearch (a highly scalable open source search engine).  As of this writing, the stable release version is 3.1.2 but I'll work with a new beta version (Kibana 4 beta 3) to test it. :-)

The data to store in ElasticSearch will be tweets with keywords related with big data and they will be collected by means of Apache Flume as I did in a previous post.

NOTE: there is a bug (FLUME-2476) in the class "org.apache.flume.sink.elasticsearch.ContentBuilderUtil" that stores in the field "body" the string "org.elasticsearch.common.xcontent.XContentBuilder@31a5fdb9". It's solved in 1.6.0 Flume version but it's already unreleased (I have 1.5.2 version installed). To fix this, you have to replace this line (number 64) in the source code and build the jar:
builder.field(fieldName, tmp.string());
However, fixing this issue and executing the Flume agent I got another one: the field "body" is stored  as JSON containing all information about the tweet; absolutely useless. So, I decided to write a Flume Interceptor to add JSON fields as headers. Here is the code:

package org.apache.flume.interceptor;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;

import static org.apache.flume.interceptor.JsonInterceptor.Constants.*;

public class JsonInterceptor implements Interceptor {

 private static final Logger logger = LoggerFactory
   .getLogger(JsonInterceptor.class);

 private final String prefix;
 private final boolean preserveExisting;
 private final Set<String> ignoreFields;

 /**
  * Only {@link JsonInterceptor.Builder} can build me
  */
 private JsonInterceptor(String prefix, boolean preserveExisting,
   Set<String> ignoreFields) {
  this.prefix = prefix;
  this.preserveExisting = preserveExisting;
  this.ignoreFields = ignoreFields;

 }

 @Override
 public void initialize() {
  // no-op
 }

 /**
  * Modifies events in-place.
  */
 @Override
 public Event intercept(Event event) {
  String body = new String(event.getBody());
  ObjectMapper objectMapper = new ObjectMapper();
  try {
   JsonNode rootNode = objectMapper.readTree(body);
   Iterator<Entry<String, JsonNode>> it = rootNode.fields();

   Map<String, String> headers = event.getHeaders();

   while (it.hasNext()) {
    Entry<String, JsonNode> field = it.next();
    if (!ignoreFields.contains(field.getKey())) {
     if (!preserveExisting
       || !headers.containsKey(field.getKey())) {
      headers.put(prefix + field.getKey(), field.getValue()
        .asText());
     }
    }
   }
  } catch (Exception e) {
   logger.info("Cannot parse body as json: " + body);
  }

  return event;
 }

 /**
  * Delegates to {@link #intercept(Event)} in a loop.
  * 
  * @param events
  * @return
  */
 @Override
 public List<Event> intercept(List<Event> events) {
  for (Event event : events) {
   intercept(event);
  }
  return events;
 }

 @Override
 public void close() {
  // no-op
 }

 /**
  * Builder which builds new instances of the JsonInterceptor.
  */
 public static class Builder implements Interceptor.Builder {

  private String prefix = PREFIX_DFLT;
  private boolean preserveExisting = PRESERVE_DFLT;
  private Set<String> ignoreFields = Sets.newHashSet(IGNOREFIELDS_DFLT
    .split("\\s+"));

  @Override
  public Interceptor build() {
   return new JsonInterceptor(prefix, preserveExisting, ignoreFields);
  }

  @Override
  public void configure(Context context) {
   prefix = context.getString(PREFIX, PREFIX_DFLT);
   preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
   ignoreFields = Sets.newHashSet(context.getString(IGNOREFIELDS,
     IGNOREFIELDS_DFLT).split("\\s+"));
  }

 }

 public static class Constants {
  public static String PREFIX = "prefix";
  public static String PREFIX_DFLT = "";

  public static String PRESERVE = "preserveExisting";
  public static boolean PRESERVE_DFLT = false;

  public static String IGNOREFIELDS = "ignoreFields";
  public static String IGNOREFIELDS_DFLT = "";
 }

}
This interceptor works well and is valid adding JSON fields as headers whatever the source is but, what if the field value is not a string and I want to use these other kind of values such as integer, long, boolean, date, etc. values in Kibana? With this I cannot solve the problem. Thus, it's time to think in another option: writing a custom serializer. Here it is:

package org.apache.flume.sink.elasticsearch;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import org.apache.commons.lang.time.DateUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.BinaryNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.FloatNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ShortNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.Sets;

import static org.apache.flume.sink.elasticsearch.ElasticSearchJsonSerializer.Constants.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

public class ElasticSearchJsonSerializer implements
  ElasticSearchEventSerializer {
 private static final Logger logger = LoggerFactory
   .getLogger(ElasticSearchJsonSerializer.class);
 
 public static final String[] datePatterns = new String[] { 
   "yyyy-MM-dd",
   "yyyy/MM/dd", 
            "yyyy MMM dd",
            "yyyy dd MMM",
            "dd/MM/yyyy",
            "dd-MM-yyyy",
            "dd MMM yyyy",
            "dd MMM yyyy",
            "dd-MM-yyyy HH:mm:ss",
            "MM dd, yyyy",
            "EEE MMM dd HH:mm:ss ZZZZZ yyyy",
   "EEE, dd MMM yyyy HH:mm:ss zzz",
   "EEEE, dd-MMM-yy HH:mm:ss zzz",
   "EEE MMM d HH:mm:ss yyyy" };

 private String prefix;
 private boolean jsonHeaders;
 private boolean jsonBody;
 private Set<String> ignoreFields;

 @Override
 public void configure(Context context) {
  prefix = context.getString(PREFIX, PREFIX_DFLT);
  jsonHeaders = context.getBoolean(JSONHEADERS, JSONHEADERS_DFLT);
  jsonBody = context.getBoolean(JSONBODY, JSONBODY_DFLT);
  ignoreFields = Sets.newHashSet(context.getString(IGNOREFIELDS,
    IGNOREFIELDS_DFLT).split("\\s+"));
 }

 @Override
 public void configure(ComponentConfiguration conf) {
  // NO-OP...
 }

 @Override
 public XContentBuilder getContentBuilder(Event event) throws IOException {
  Map<String, JsonNode> fields = new HashMap<String, JsonNode>();

  // fields from the body
  if (jsonBody) {
   fields.putAll(jsonToMap(event.getBody()));
  } else {
   fields.put(prefix + "body", new TextNode(new String(event.getBody())));
  }

  // fields from the headers
  Map<String, String> headers = event.getHeaders();
  for (String key : headers.keySet()) {
   if (jsonHeaders) {
    fields.putAll(jsonToMap(headers.get(key).getBytes(charset)));
   } else {
    fields.put(prefix + key, new TextNode(headers.get(key)));
   }
  }

  XContentBuilder builder = jsonBuilder().startObject();
  appendFields(builder, fields);

  return builder;
 }

 private Map<String, JsonNode> jsonToMap(byte[] content) {
  Map<String, JsonNode> fields = new HashMap<String, JsonNode>();
  ObjectMapper objectMapper = new ObjectMapper();
  try {
   JsonNode rootNode = objectMapper.readTree(new String(content));
   Iterator<Entry<String, JsonNode>> it = rootNode.fields();

   while (it.hasNext()) {
    Entry<String, JsonNode> field = it.next();
    if (!ignoreFields.contains(field.getKey())) {
     fields.put(prefix + field.getKey(), field.getValue());
    }
   }
  } catch (Exception e) {
   logger.info("Cannot parse content as json: " + new String(content));
  }
  return fields;
 }

 private void appendFields(XContentBuilder builder,
   Map<String, JsonNode> fields) throws IOException {
  for (Map.Entry<String, JsonNode> entry : fields.entrySet()) {
   if (entry.getValue() instanceof BooleanNode) {
    builder.field(entry.getKey(), entry.getValue().asBoolean());
   }else if (entry.getValue() instanceof FloatNode || entry.getValue() instanceof DoubleNode) {
    builder.field(entry.getKey(), entry.getValue().asDouble());
   }else if (entry.getValue() instanceof LongNode) {
    builder.field(entry.getKey(), entry.getValue().asLong());
   }else if (entry.getValue() instanceof IntNode || entry.getValue() instanceof ShortNode) {
    builder.field(entry.getKey(), entry.getValue().asInt());
   }else if (entry.getValue() instanceof BinaryNode) {
    builder.field(entry.getKey(), entry.getValue().asText().getBytes(charset));
   } else {
    //checking if it is a date
    try {
     builder.field(entry.getKey(), DateUtils.parseDate(entry.getValue().asText(), datePatterns));
    } catch (Exception e) {
     //we assume is String...
     builder.field(entry.getKey(), entry.getValue().asText());
    }
   }
  }
 }

 public static class Constants {
  public static String PREFIX = "prefix";
  public static String PREFIX_DFLT = "";

  public static String JSONHEADERS = "jsonHeaders";
  public static boolean JSONHEADERS_DFLT = true;

  public static String JSONBODY = "jsonBody";
  public static boolean JSONBODY_DFLT = true;

  public static String IGNOREFIELDS = "ignoreFields";
  public static String IGNOREFIELDS_DFLT = "";
 }
}
Got it! Now it works fine!

This is the Flume agent configuration I used:

BigDataTwitterAgent.sources = Twitter
BigDataTwitterAgent.channels = MemChannel
BigDataTwitterAgent.sinks = ES

BigDataTwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
BigDataTwitterAgent.sources.Twitter.channels = MemChannel
BigDataTwitterAgent.sources.Twitter.consumerKey = <your_consumer_key>
BigDataTwitterAgent.sources.Twitter.consumerSecret = <your_consumer_secret>
BigDataTwitterAgent.sources.Twitter.accessToken = <your_access_token>
BigDataTwitterAgent.sources.Twitter.accessTokenSecret = <your_access_token_secret>
BigDataTwitterAgent.sources.Twitter.keywords=hadoop, bigdata, datascience, mapreduce, cloudera, hortonworks, mapr

#BigDataTwitterAgent.sources.Twitter.interceptors = org.apache.flume.sink.elasticsearch.JsonInterceptor
#BigDataTwitterAgent.sources.Twitter.interceptors.JsonInterceptor.type = org.apache.flume.interceptor.JsonInterceptor$Builder
#BigDataTwitterAgent.sources.Twitter.interceptors.JsonInterceptor.prefix = demo-
#BigDataTwitterAgent.sources.Twitter.interceptors.JsonInterceptor.ignoreFields = in_reply_to_screen_name in_reply_to_status_id in_reply_to_status_id_str in_reply_to_user_id in_reply_to_user_id_str

BigDataTwitterAgent.channels.MemChannel.type = memory
BigDataTwitterAgent.channels.MemChannel.capacity = 1000
BigDataTwitterAgent.channels.MemChannel.transactionCapacity = 1000

BigDataTwitterAgent.sinks.ES.type = elasticsearch
BigDataTwitterAgent.sinks.ES.hostNames = localhost:9300
BigDataTwitterAgent.sinks.ES.indexName = twitter
BigDataTwitterAgent.sinks.ES.indexType = tweet
BigDataTwitterAgent.sinks.ES.clusterName = elasticsearch
BigDataTwitterAgent.sinks.ES.batchSize = 100
BigDataTwitterAgent.sinks.ES.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchJsonSerializer
BigDataTwitterAgent.sinks.ES.serializer.prefix = demo-
BigDataTwitterAgent.sinks.ES.serializer.ignoreFields = entities retweeted_status user in_reply_to_screen_name in_reply_to_status_id in_reply_to_status_id_str in_reply_to_user_id in_reply_to_user_id_str
BigDataTwitterAgent.sinks.ES.channel = MemChannel

Automatically, in ElasticSearch and index is created with this structure:

{"twitter":
    {"mappings":
        {"tweet":
            {"properties":
                {"demo-contributors":
                    {"type":"string"},
                 "demo-coordinates":
                    {"type":"string"},
                 "demo-created_at":
                    {"type":"date",
                     "format":"dateOptionalTime"},
                 "demo-favorite_count":
                    {"type":"long"},
                 "demo-favorited":
                    {"type":"boolean"},
                 "demo-filter_level":
                    {"type":"string"},
                 "demo-geo":
                    {"type":"string"},
                 "demo-id":
                    {"type":"long"},
                 "demo-id_str":
                    {"type":"string"},
                 "demo-lang":
                    {"type":"string"},
                 "demo-place":
                    {"type":"string"},
                 "demo-possibly_sensitive":
                    {"type":"boolean"},
                 "demo-retweet_count":
                    {"type":"long"},
                 "demo-retweeted":
                    {"type":"boolean"},
                 "demo-source":
                    {"type":"string"},
                 "demo-text":
                    {"type":"string"},
                 "demo-timestamp_ms":
                    {"type":"string"},
                 "demo-truncated":
                    {"type":"boolean"}}}},
     "settings":
        {"index":
            {"creation_date":"1422646633782",
             "uuid":"eX0diovjR7Gg1KGGbnCw_w",
             "number_of_replicas":"1",
             "number_of_shards":"5",
             "version":
                {"created":"1040299"}}}}}

Now it's "Kibana time"!!!






2 comments: