This advantage comes from actionable information that helps decision makers make the right calls. For that, the data has to be accessible; in other words, it has to be turned into something consumable. This is where data visualization systems are key: they let you see analytical results, find relevant information, develop scenarios, and even perform predictive analysis.
There are a lot of visualization systems offering different kinds of tools and features, but in this post I'm going to talk about Kibana (I'll cover others in future posts). Kibana is an open source analytics tool with a responsive web interface that connects to ElasticSearch (a highly scalable open source search engine). As of this writing, the stable release is 3.1.2, but I'll work with a new beta version (Kibana 4 beta 3) to test it. :-)
The data to store in ElasticSearch will be tweets with keywords related to big data, collected by means of Apache Flume as I did in a previous post.
NOTE: there is a bug (FLUME-2476) in the class "org.apache.flume.sink.elasticsearch.ContentBuilderUtil" that stores in the field "body" the string "org.elasticsearch.common.xcontent.XContentBuilder@31a5fdb9". It's fixed in Flume 1.6.0, but that version hasn't been released yet (I have 1.5.2 installed). To fix it, you have to replace this line (number 64) in the source code and rebuild the jar:
builder.field(fieldName, tmp.string());
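For context: as far as I can tell from the 1.5.x source, the buggy line passes the temporary XContentBuilder object itself instead of its content, so what ends up stored is the object's toString() representation (which is exactly the "XContentBuilder@..." string mentioned above):

builder.field(fieldName, tmp); // buggy: serializes the builder object, not its JSON content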
However, after fixing this issue and running the Flume agent I hit another one: the field "body" is stored as a single JSON string containing all the information about the tweet, which is absolutely useless for analysis. So, I decided to write a Flume Interceptor that adds the JSON fields as headers. Here is the code:
package org.apache.flume.interceptor;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import static org.apache.flume.interceptor.JsonInterceptor.Constants.*;
public class JsonInterceptor implements Interceptor {
private static final Logger logger = LoggerFactory
.getLogger(JsonInterceptor.class);
private final String prefix;
private final boolean preserveExisting;
private final Set<String> ignoreFields;
/**
* Only {@link JsonInterceptor.Builder} can build me
*/
private JsonInterceptor(String prefix, boolean preserveExisting,
Set<String> ignoreFields) {
this.prefix = prefix;
this.preserveExisting = preserveExisting;
this.ignoreFields = ignoreFields;
}
@Override
public void initialize() {
// no-op
}
/**
* Modifies events in-place.
*/
@Override
public Event intercept(Event event) {
String body = new String(event.getBody());
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode rootNode = objectMapper.readTree(body);
Iterator<Entry<String, JsonNode>> it = rootNode.fields();
Map<String, String> headers = event.getHeaders();
while (it.hasNext()) {
Entry<String, JsonNode> field = it.next();
if (!ignoreFields.contains(field.getKey())) {
if (!preserveExisting
|| !headers.containsKey(field.getKey())) {
headers.put(prefix + field.getKey(), field.getValue()
.asText());
}
}
}
} catch (Exception e) {
logger.info("Cannot parse body as json: " + body);
}
return event;
}
/**
* Delegates to {@link #intercept(Event)} in a loop.
*
* @param events
* @return
*/
@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() {
// no-op
}
/**
* Builder which builds new instances of the JsonInterceptor.
*/
public static class Builder implements Interceptor.Builder {
private String prefix = PREFIX_DFLT;
private boolean preserveExisting = PRESERVE_DFLT;
private Set<String> ignoreFields = Sets.newHashSet(IGNOREFIELDS_DFLT
.split("\\s+"));
@Override
public Interceptor build() {
return new JsonInterceptor(prefix, preserveExisting, ignoreFields);
}
@Override
public void configure(Context context) {
prefix = context.getString(PREFIX, PREFIX_DFLT);
preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
ignoreFields = Sets.newHashSet(context.getString(IGNOREFIELDS,
IGNOREFIELDS_DFLT).split("\\s+"));
}
}
public static class Constants {
public static String PREFIX = "prefix";
public static String PREFIX_DFLT = "";
public static String PRESERVE = "preserveExisting";
public static boolean PRESERVE_DFLT = false;
public static String IGNOREFIELDS = "ignoreFields";
public static String IGNOREFIELDS_DFLT = "";
}
}
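To sanity-check the interceptor outside a running agent, here is a minimal sketch (not part of the original setup; it only assumes flume-ng-core and the class above on the classpath) that configures it the same way the agent would and feeds it a toy tweet:

package org.apache.flume.interceptor;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class JsonInterceptorDemo {
    public static void main(String[] args) {
        // Configure the interceptor exactly as the Flume agent would.
        Context context = new Context();
        context.put(JsonInterceptor.Constants.PREFIX, "demo-");
        context.put(JsonInterceptor.Constants.IGNOREFIELDS, "id_str");

        Interceptor.Builder builder = new JsonInterceptor.Builder();
        builder.configure(context);
        Interceptor interceptor = builder.build();

        // A toy "tweet": id_str is ignored, text becomes the header demo-text.
        Event event = EventBuilder.withBody(
                "{\"text\":\"hello #bigdata\",\"id_str\":\"42\"}",
                StandardCharsets.UTF_8);
        interceptor.intercept(event);

        System.out.println(event.getHeaders()); // {demo-text=hello #bigdata}
    }
}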
This interceptor works well and is a valid way to add JSON fields as headers whatever the source is. But what if a field value is not a string and I want to use those other kinds of values (integer, long, boolean, date, etc.) in Kibana? Flume headers are always strings, so this approach cannot solve that. Thus, it's time to think about another option: writing a custom serializer. Here it is:
package org.apache.flume.sink.elasticsearch;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.commons.lang.time.DateUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.BinaryNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.FloatNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ShortNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.Sets;
import static org.apache.flume.sink.elasticsearch.ElasticSearchJsonSerializer.Constants.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class ElasticSearchJsonSerializer implements
ElasticSearchEventSerializer {
private static final Logger logger = LoggerFactory
.getLogger(ElasticSearchJsonSerializer.class);
public static final String[] datePatterns = new String[] {
"yyyy-MM-dd",
"yyyy/MM/dd",
"yyyy MMM dd",
"yyyy dd MMM",
"dd/MM/yyyy",
"dd-MM-yyyy",
"dd MMM yyyy",
"dd MMM yyyy",
"dd-MM-yyyy HH:mm:ss",
"MM dd, yyyy",
"EEE MMM dd HH:mm:ss ZZZZZ yyyy",
"EEE, dd MMM yyyy HH:mm:ss zzz",
"EEEE, dd-MMM-yy HH:mm:ss zzz",
"EEE MMM d HH:mm:ss yyyy" };
private String prefix;
private boolean jsonHeaders;
private boolean jsonBody;
private Set<String> ignoreFields;
@Override
public void configure(Context context) {
prefix = context.getString(PREFIX, PREFIX_DFLT);
jsonHeaders = context.getBoolean(JSONHEADERS, JSONHEADERS_DFLT);
jsonBody = context.getBoolean(JSONBODY, JSONBODY_DFLT);
ignoreFields = Sets.newHashSet(context.getString(IGNOREFIELDS,
IGNOREFIELDS_DFLT).split("\\s+"));
}
@Override
public void configure(ComponentConfiguration conf) {
// NO-OP...
}
@Override
public XContentBuilder getContentBuilder(Event event) throws IOException {
Map<String, JsonNode> fields = new HashMap<String, JsonNode>();
// fields from the body
if (jsonBody) {
fields.putAll(jsonToMap(event.getBody()));
} else {
fields.put(prefix + "body", new TextNode(new String(event.getBody())));
}
// fields from the headers
Map<String, String> headers = event.getHeaders();
for (String key : headers.keySet()) {
if (jsonHeaders) {
fields.putAll(jsonToMap(headers.get(key).getBytes(charset)));
} else {
fields.put(prefix + key, new TextNode(headers.get(key)));
}
}
XContentBuilder builder = jsonBuilder().startObject();
appendFields(builder, fields);
return builder;
}
private Map<String, JsonNode> jsonToMap(byte[] content) {
Map<String, JsonNode> fields = new HashMap<String, JsonNode>();
ObjectMapper objectMapper = new ObjectMapper();
try {
JsonNode rootNode = objectMapper.readTree(new String(content));
Iterator<Entry<String, JsonNode>> it = rootNode.fields();
while (it.hasNext()) {
Entry<String, JsonNode> field = it.next();
if (!ignoreFields.contains(field.getKey())) {
fields.put(prefix + field.getKey(), field.getValue());
}
}
} catch (Exception e) {
logger.info("Cannot parse content as json: " + new String(content));
}
return fields;
}
private void appendFields(XContentBuilder builder,
Map<String, JsonNode> fields) throws IOException {
for (Map.Entry<String, JsonNode> entry : fields.entrySet()) {
if (entry.getValue() instanceof BooleanNode) {
    builder.field(entry.getKey(), entry.getValue().asBoolean());
} else if (entry.getValue() instanceof FloatNode || entry.getValue() instanceof DoubleNode) {
    builder.field(entry.getKey(), entry.getValue().asDouble());
} else if (entry.getValue() instanceof LongNode) {
    builder.field(entry.getKey(), entry.getValue().asLong());
} else if (entry.getValue() instanceof IntNode || entry.getValue() instanceof ShortNode) {
    builder.field(entry.getKey(), entry.getValue().asInt());
} else if (entry.getValue() instanceof BinaryNode) {
    builder.field(entry.getKey(), entry.getValue().asText().getBytes(charset));
} else {
    // not a known primitive type: check whether the text parses as a date
    try {
        builder.field(entry.getKey(), DateUtils.parseDate(entry.getValue().asText(), datePatterns));
    } catch (Exception e) {
        // not a date either, so we assume it is a String
        builder.field(entry.getKey(), entry.getValue().asText());
    }
}
}
}
public static class Constants {
public static String PREFIX = "prefix";
public static String PREFIX_DFLT = "";
public static String JSONHEADERS = "jsonHeaders";
public static boolean JSONHEADERS_DFLT = true;
public static String JSONBODY = "jsonBody";
public static boolean JSONBODY_DFLT = true;
public static String IGNOREFIELDS = "ignoreFields";
public static String IGNOREFIELDS_DFLT = "";
}
}
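Again, a quick sketch (mine, not required by anything) to see the JSON the serializer builds for a fake tweet; it assumes the same classpath as the sink (flume-ng-elasticsearch-sink plus the ElasticSearch 1.x client). Note how numbers and booleans keep their JSON types instead of becoming strings:

package org.apache.flume.sink.elasticsearch;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;

public class ElasticSearchJsonSerializerDemo {
    public static void main(String[] args) throws Exception {
        // Configure the serializer as the sink would.
        ElasticSearchJsonSerializer serializer = new ElasticSearchJsonSerializer();
        Context context = new Context();
        context.put("prefix", "demo-");
        serializer.configure(context);

        Event event = EventBuilder.withBody(
                "{\"favorite_count\":3,\"retweeted\":false,\"text\":\"hi\"}",
                StandardCharsets.UTF_8);

        // getContentBuilder() leaves the object open; close it to print the JSON.
        XContentBuilder builder = serializer.getContentBuilder(event).endObject();
        System.out.println(builder.string());
        // e.g. {"demo-favorite_count":3,"demo-retweeted":false,"demo-text":"hi"}
    }
}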
Got it! Now it works fine! This is the Flume agent configuration I used:
BigDataTwitterAgent.sources = Twitter
BigDataTwitterAgent.channels = MemChannel
BigDataTwitterAgent.sinks = ES
BigDataTwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
BigDataTwitterAgent.sources.Twitter.channels = MemChannel
BigDataTwitterAgent.sources.Twitter.consumerKey = <your_consumer_key>
BigDataTwitterAgent.sources.Twitter.consumerSecret = <your_consumer_secret>
BigDataTwitterAgent.sources.Twitter.accessToken = <your_access_token>
BigDataTwitterAgent.sources.Twitter.accessTokenSecret = <your_access_token_secret>
BigDataTwitterAgent.sources.Twitter.keywords=hadoop, bigdata, datascience, mapreduce, cloudera, hortonworks, mapr
#BigDataTwitterAgent.sources.Twitter.interceptors = JsonInterceptor
#BigDataTwitterAgent.sources.Twitter.interceptors.JsonInterceptor.type = org.apache.flume.interceptor.JsonInterceptor$Builder
#BigDataTwitterAgent.sources.Twitter.interceptors.JsonInterceptor.prefix = demo-
#BigDataTwitterAgent.sources.Twitter.interceptors.JsonInterceptor.ignoreFields = in_reply_to_screen_name in_reply_to_status_id in_reply_to_status_id_str in_reply_to_user_id in_reply_to_user_id_str
BigDataTwitterAgent.channels.MemChannel.type = memory
BigDataTwitterAgent.channels.MemChannel.capacity = 1000
BigDataTwitterAgent.channels.MemChannel.transactionCapacity = 1000
BigDataTwitterAgent.sinks.ES.type = elasticsearch
BigDataTwitterAgent.sinks.ES.hostNames = localhost:9300
BigDataTwitterAgent.sinks.ES.indexName = twitter
BigDataTwitterAgent.sinks.ES.indexType = tweet
BigDataTwitterAgent.sinks.ES.clusterName = elasticsearch
BigDataTwitterAgent.sinks.ES.batchSize = 100
BigDataTwitterAgent.sinks.ES.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchJsonSerializer
BigDataTwitterAgent.sinks.ES.serializer.prefix = demo-
BigDataTwitterAgent.sinks.ES.serializer.ignoreFields = entities retweeted_status user in_reply_to_screen_name in_reply_to_status_id in_reply_to_status_id_str in_reply_to_user_id in_reply_to_user_id_str
BigDataTwitterAgent.sinks.ES.channel = MemChannel
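One practical note from my setup (the jar name below is just an example): for the agent to find the custom interceptor and serializer classes, the jar you built has to be on Flume's classpath, for instance using the standard plugins.d layout:

$FLUME_HOME/plugins.d/
    json-serializer/
        lib/
            flume-json-serializer.jar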
Automatically, an index is created in ElasticSearch with this structure:
{"twitter":
{"mappings":
{"tweet":
{"properties":
{"demo-contributors":
{"type":"string"},
"demo-coordinates":
{"type":"string"},
"demo-created_at":
{"type":"date",
"format":"dateOptionalTime"},
"demo-favorite_count":
{"type":"long"},
"demo-favorited":
{"type":"boolean"},
"demo-filter_level":
{"type":"string"},
"demo-geo":
{"type":"string"},
"demo-id":
{"type":"long"},
"demo-id_str":
{"type":"string"},
"demo-lang":
{"type":"string"},
"demo-place":
{"type":"string"},
"demo-possibly_sensitive":
{"type":"boolean"},
"demo-retweet_count":
{"type":"long"},
"demo-retweeted":
{"type":"boolean"},
"demo-source":
{"type":"string"},
"demo-text":
{"type":"string"},
"demo-timestamp_ms":
{"type":"string"},
"demo-truncated":
{"type":"boolean"}}}},
"settings":
{"index":
{"creation_date":"1422646633782",
"uuid":"eX0diovjR7Gg1KGGbnCw_w",
"number_of_replicas":"1",
"number_of_shards":"5",
"version":
{"created":"1040299"}}}}}
Now it's "Kibana time"!!!