
Monday, 12 January 2015

#JeSuisCharlie

A few days ago a horrible event happened: jihadists attacked the satirical weekly Charlie Hebdo, claiming to avenge the honor of the Prophet Muhammad over cartoons the paper had published about him.
I don't understand how people can carry out terrorist acts, killing innocent people over a religious belief (or anything else). I don't know where we're heading, but it doesn't look good.

I don't want to start a discussion about this complex problem either, but it's a fact that when something like this happens, social networks start boiling, and it's time to analyze what people are saying...

In this post I'll talk about collecting tweets via Apache Flume, storing them in HDFS, and analyzing them by integrating Apache Hive with R. The process has three steps:
  1. Collecting tweets using Flume: I have to configure a Flume agent with the right source, channel and sink. The Flume sink will be HDFS, and the data will be stored in a specific HDFS directory.
  2. Exposing tweets through Hive: I'll create a Hive table representing the stored tweets (in JSON format).
  3. Accessing tweets from R: I'll execute queries from R using RHive and process the results with the help of some R packages.
Starting with the first step, I define the Flume agent:

CharlieHebdoAgent.sources = Twitter
CharlieHebdoAgent.channels = MemChannel
CharlieHebdoAgent.sinks = HDFS

CharlieHebdoAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
CharlieHebdoAgent.sources.Twitter.channels = MemChannel
CharlieHebdoAgent.sources.Twitter.consumerKey = <your_consumer_key>
CharlieHebdoAgent.sources.Twitter.consumerSecret = <your_consumer_secret>
CharlieHebdoAgent.sources.Twitter.accessToken = <your_access_token>
CharlieHebdoAgent.sources.Twitter.accessTokenSecret = <your_access_token_secret>
CharlieHebdoAgent.sources.Twitter.keywords = charliehebdo

CharlieHebdoAgent.channels.MemChannel.type = memory
CharlieHebdoAgent.channels.MemChannel.capacity = 1000
CharlieHebdoAgent.channels.MemChannel.transactionCapacity = 1000

CharlieHebdoAgent.sinks.HDFS.channel = MemChannel
CharlieHebdoAgent.sinks.HDFS.type = hdfs
CharlieHebdoAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/flume/tweets/
CharlieHebdoAgent.sinks.HDFS.hdfs.fileType = DataStream
CharlieHebdoAgent.sinks.HDFS.hdfs.filePrefix = tweets-
CharlieHebdoAgent.sinks.HDFS.hdfs.fileSuffix = .json
CharlieHebdoAgent.sinks.HDFS.hdfs.batchSize = 1000
CharlieHebdoAgent.sinks.HDFS.hdfs.rollCount = 10000
CharlieHebdoAgent.sinks.HDFS.hdfs.writeFormat = Text
CharlieHebdoAgent.sinks.HDFS.hdfs.rollInterval = 3000
As you can see, the source type is "com.cloudera.flume.source.TwitterSource", a custom source developed by Cloudera (you can download it here). After starting the agent with the following command, you'll get plain text files in HDFS, each containing one JSON tweet per line:

flume-ng agent --conf conf --conf-file ./Agents.conf --name CharlieHebdoAgent -Dflume.root.logger=INFO,console
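
Once the agent is running, you can check that files are actually landing in the sink directory (the path is the one configured in the hdfs.path property above):

hdfs dfs -ls /user/flume/tweets/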

Second step: creating an external Hive table on top of the tweet data:

CREATE EXTERNAL TABLE tweets (
 id BIGINT,
 created_at STRING,
 source STRING,
 favorited BOOLEAN,
 retweet_count INT,
 retweeted_status STRUCT<
  text:STRING,
  user:STRUCT<
   screen_name:STRING,
   name:STRING
   >>,
 entities STRUCT<
  urls:ARRAY<STRUCT<expanded_url:STRING>>,
  user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
  hashtags:ARRAY<STRUCT<text:STRING>>>,
 text STRING,
 user STRUCT<
  screen_name:STRING,
  name:STRING,
  friends_count:INT,
  followers_count:INT,
  statuses_count:INT,
  verified:BOOLEAN,
  utc_offset:INT,
  time_zone:STRING>,
 in_reply_to_screen_name STRING
)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/flume/tweets';
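
Before moving on to R, it's worth a quick sanity check from the Hive CLI. A minimal query against the fields defined above (remember to add the JSONSerDe jar to the session first, the same jar referenced in the R code below):

ADD JAR <$HIVE_SERDE_LIB_DIRECTORY>/hive-serde-cdh-twitter-1.0.jar;
SELECT t.id, t.user.screen_name, t.text FROM tweets t LIMIT 5;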

And the last step: by means of RHive, execute a query from R to retrieve the tweets, then clean and prepare the text:

library(RHive)
Sys.setenv(RHIVE_FS_HOME = "<$RHIVE_HOME>")
Sys.setenv(HIVE_HOME = "<$HIVE_HOME>")
Sys.setenv(HADOOP_HOME = "<$HADOOP_HOME>")
rhive.init()
rhive.connect(host = "localhost", hiveServer2 = FALSE)
rhive.query("add jar <$HIVE_SERDE_LIB_DIRECTORY>/hive-serde-cdh-twitter-1.0.jar")

resultQuery <- rhive.query("select text from tweets")

cleanText <- function(txt) {
 # Strip retweet markers ("RT"/"via") together with the quoted user
 txt = gsub("(RT|via)((?:\\b\\W*@\\w+)+)", " ", txt)
 # Strip remaining @mentions
 txt = gsub("@\\w+", " ", txt)
 # Strip URLs before removing punctuation, so the whole link goes away
 txt = gsub("http\\S+", " ", txt)
 # Strip punctuation and digits
 txt = gsub("[[:punct:]]", " ", txt)
 txt = gsub("[[:digit:]]", " ", txt)
 # Drop the "amp" leftover from HTML-escaped ampersands (&amp;)
 txt = gsub("\\bamp\\b", " ", txt)
 # Replace non-printable characters with spaces
 txt = gsub("[^[:graph:]]", " ", txt)
 # Collapse repeated whitespace and trim the ends
 txt = gsub("\\s{2,}", " ", txt)
 txt = gsub("^\\s+|\\s+$", "", txt)
 # Lowercase, returning NA for strings tolower() cannot handle
 try.tolower = function(x) {
  tryCatch(tolower(x), error = function(e) NA)
 }
 txt = sapply(txt, try.tolower)
 # Drop empty results and the names added by sapply()
 txt = txt[!is.na(txt) & txt != ""]
 names(txt) = NULL
 return (txt)
}
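
A quick test of the function on a made-up tweet (the input string is purely illustrative):

cleanText("RT @someone: Je suis Charlie!!! http://t.co/abc123 #JeSuisCharlie")
# should return something like: "je suis charlie jesuischarlie"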

library(tm)
corpus <- Corpus(VectorSource(cleanText(resultQuery[,1])))
stopWords <- c(stopwords("english"), stopwords("spanish"), stopwords("french"), c("charliehebdo"))
corpus <- tm_map(corpus, removeWords, stopWords)
tdm <- TermDocumentMatrix(corpus, control = list(wordLengths = c(1,Inf)))
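
Before plotting, it can help to inspect the most frequent terms; a small sketch using the term-document matrix built above (converting to a dense matrix is fine at this scale; for a much bigger corpus, use slam::row_sums instead):

# Total frequency of each term across all tweets, top 20
termFreqs <- sort(rowSums(as.matrix(tdm)), decreasing = TRUE)
head(termFreqs, 20)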
Moreover, if you want to represent these data graphically, here are two examples (a wordcloud and a plot of correlated terms):
#WordCloud
library(wordcloud)
wordcloud(corpus, min.freq = 100, scale = c(6, 0.2), colors = brewer.pal(9, "Set1"), random.color = TRUE, random.order = FALSE)

#Associations
require(ggplot2)
# findAssocs() here returns a named matrix (older tm); with tm >= 0.6 it
# returns a list, so you would use assocs[["hebdo"]] instead of assocs[,1]
assocs <- findAssocs(tdm, "hebdo", 0.4)
dfAssocs <- data.frame(corr = assocs[,1], terms = row.names(assocs))
ggplot(dfAssocs, aes(y = terms)) + geom_point(aes(x = corr)) + xlab(paste0("Correlation with the term ", "\"", "hebdo", "\""))

And the results!!! #JeSuisCharlie

2 comments:

  1. Hi - Nice post. BTW the step rhive.query("add jar /path/xyz.jar") does not get executed at all, it somehow does not recognize the path, any suggestions/ideas? I am able to execute a sample query but not add a jar.

  2. Hi Ravi,
    Nice to hear from you.
It's possible that the problem is related to some misconfiguration. Have you checked the Hive classpath? What kind of error are you getting?
