
Wednesday 13 September 2017

One File System to ingest them all (and in Kafka bind them)

I've recently written a post on my company's blog. It's about a Kafka connector I've implemented to ingest files into Kafka from "any" file system.

You can read the post here, and you can also find the source code on my GitHub account.

By the way, this connector has been registered on the Confluent connectors page, and Read the Docs hosts its documentation.

Enjoy!

Monday 16 March 2015

Custom Trident functions in Apache Storm

Assume we have a greenhouse (or several of them) in which we carry out different kinds of activities. One of them is to take care of the greenhouse(s) and control how external agents such as temperature and humidity affect our crops. Values like these change frequently with the weather, so we bought a large number of sensors to monitor and measure them all over the greenhouse(s), in order to keep them within the range in which we know our crops grow best.
These sensors are integrated in a platform and continuously send signals with the information they capture. Due to the number of sensors and the volume of data received, our current system is unable to process them. We'd like a platform that could process these signals in (near) real time, bearing in mind that our greenhouse will grow in the next few months and we'll consequently have to buy more sensors. That's what this post is about: an architecture for processing a large number of messages in real time with good performance, reliability, scalability, etc.

In this sense, I'll talk about a distributed system for processing streams of data in real time: Apache Storm. Storm graduated to Apache top-level project last September and is becoming a solid reference among stream processing systems. It's horizontally scalable, fault-tolerant, fail-fast and guarantees data processing (with different configurable guarantees). It follows a master-slave architecture in which Nimbus (the master node) distributes the application across the worker nodes (supervisor nodes) and monitors them, with the nodes coordinating with each other through ZooKeeper.

For this post, I'll use Kafka (as I did in a previous post) as a feeder for Storm, and Apache Thrift for data serialization between Kafka and Storm. Storm is a stateless processing framework, so to guarantee that messages are processed exactly once I'll use Trident (a high-level abstraction built on top of Storm), which provides stateful stream processing, along with the transactional Trident Kafka spout from the storm-kafka package. I'll also define two Trident functions: one that reasons about the temperature variation with fuzzy logic through FuzzyLite (a fuzzy logic control library), and a simple tuple printer to stdout.

The Kafka producer is a class that produces dummy data emulating signals from the sensors, with values such as sensorId, humidity and temperature. This data is encoded with Thrift (you can select the encoding protocol you prefer):

import java.util.Properties;
import java.util.Random;

import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TJSONProtocol;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import schema.Signal;

public class KafkaProducer {

 public static class SignalEncoder implements Encoder<Signal> {
  private TSerializer serializer;
  public static enum Serializer{
   BINARY,
   COMPACT,
   JSON
  }

  public SignalEncoder(VerifiableProperties vp) {
   switch (Serializer.valueOf(vp.getString("serializer.protocol"))) {
   case BINARY:
    this.serializer = new TSerializer(new TBinaryProtocol.Factory());
    break;
   case COMPACT:
    this.serializer = new TSerializer(new TCompactProtocol.Factory());
    break;
   case JSON:
    this.serializer = new TSerializer(new TJSONProtocol.Factory());
    break;
   default:
    this.serializer = new TSerializer();
    break;
   }
  }

  @Override
  public byte[] toBytes(Signal obj) {
   try {
    return serializer.serialize(obj);
   } catch (TException e) {
    return null;
   }
  }
 }

 public static void main(String[] args) {
  Properties props = new Properties();
  props.put("metadata.broker.list", "localhost:9092");
  props.put("request.required.acks", "1");
  props.put("serializer.class", "KafkaProducer$SignalEncoder");
  props.put("serializer.protocol", KafkaProducer.SignalEncoder.Serializer.BINARY.name());

  ProducerConfig config = new ProducerConfig(props);
  Producer<String, Signal> producer = new Producer<>(config);
  Random random = new Random();
  final double maxHumidity = 100.0, maxTemperature = 40.0;
  for (int i = 0; i < 1000; i++) {
   Signal signal = new Signal();
   signal.setSensorId(random.nextInt(1000));
   signal.setHumidity(random.nextDouble() * maxHumidity);
   signal.setTemperature(random.nextDouble() * maxTemperature);
   KeyedMessage<String, Signal> data = new KeyedMessage<>("signals", signal);
   producer.send(data);
  }
  producer.close();
 }
}

The Thrift schema:

namespace java schema 

struct Signal {
    1: required i32 sensorId,
    2: required double humidity,
    3: required double temperature,
}
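The maven-thrift-plugin below generates the schema.Signal class from this definition into src/main/java-gen. The real generated class also implements Thrift's TBase interface and carries field metadata and read/write methods, but its public surface is roughly this hand-written sketch (for illustration only):

```java
// Simplified, hand-written sketch of the public surface of the
// Thrift-generated schema.Signal class. The actual generated code
// also implements TBase and includes (de)serialization machinery.
class Signal {
    private int sensorId;
    private double humidity;
    private double temperature;

    public int getSensorId() { return sensorId; }
    public void setSensorId(int sensorId) { this.sensorId = sensorId; }

    public double getHumidity() { return humidity; }
    public void setHumidity(double humidity) { this.humidity = humidity; }

    public double getTemperature() { return temperature; }
    public void setTemperature(double temperature) { this.temperature = temperature; }
}
```

These are the setters the producer's main loop calls when it builds each dummy signal.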

And the pom.xml

 <dependencies>
  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.9.2</artifactId>
   <version>0.8.2.0</version>
   <exclusions>
    <exclusion>
     <groupId>com.sun.jmx</groupId>
     <artifactId>jmxri</artifactId>
    </exclusion>
    <exclusion>
     <groupId>com.sun.jdmk</groupId>
     <artifactId>jmxtools</artifactId>
    </exclusion>
   </exclusions>
  </dependency>
  <dependency>
   <groupId>org.apache.thrift</groupId>
   <artifactId>libthrift</artifactId>
   <version>0.9.2</version>
  </dependency>
 </dependencies>

 <build>
  <resources>
   <resource>
    <directory>src/main/java-gen</directory>
   </resource>
  </resources>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.1</version>
    <configuration>
     <source>1.7</source>
     <target>1.7</target>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.3.2</version>
    <executions>
     <execution>
      <goals>
       <goal>exec</goal>
      </goals>
     </execution>
    </executions>
    <configuration>
     <executable>java</executable>
     <includeProjectDependencies>true
     </includeProjectDependencies>
     <includePluginDependencies>false
     </includePluginDependencies>
     <classpathScope>compile</classpathScope>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.apache.thrift.tools</groupId>
    <artifactId>maven-thrift-plugin</artifactId>
    <version>0.1.11</version>
    <configuration>
     <outputDirectory>src/main/java-gen</outputDirectory>
    </configuration>
    <executions>
     <execution>
      <id>thrift-sources</id>
      <phase>generate-sources</phase>
      <goals>
       <goal>compile</goal>
      </goals>
      <configuration>
       <thriftSourceRoot>src/main/resources/</thriftSourceRoot>
      </configuration>
     </execution>
     <execution>
      <id>thrift-test-sources</id>
      <phase>generate-test-sources</phase>
      <goals>
       <goal>testCompile</goal>
      </goals>
     </execution>
    </executions>
   </plugin>
  </plugins>
  <pluginManagement>
   <plugins>
    <!--Just to store Eclipse m2e settings only. It has no influence on the Maven build itself. -->
    <plugin>
     <groupId>org.eclipse.m2e</groupId>
     <artifactId>lifecycle-mapping</artifactId>
     <version>1.0.0</version>
     <configuration>
      <lifecycleMappingMetadata>
       <pluginExecutions>
        <pluginExecution>
         <pluginExecutionFilter>
          <groupId>
           org.apache.thrift.tools
          </groupId>
          <artifactId>
           maven-thrift-plugin
          </artifactId>
          <versionRange>
           [0.1.11,)
          </versionRange>
          <goals>
           <goal>compile</goal>
           <goal>testCompile</goal>
          </goals>
         </pluginExecutionFilter>
         <action>
          <ignore></ignore>
         </action>
        </pluginExecution>
       </pluginExecutions>
      </lifecycleMappingMetadata>
     </configuration>
    </plugin>
   </plugins>
  </pluginManagement>
 </build>

On the Storm side, the main class with the topology to test it in local mode or, if you like, to submit to your Storm cluster:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Arrays;

import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TJSONProtocol;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;
import schema.Signal;

public class SensorTopology implements Serializable {
 private static final long serialVersionUID = 1L;
 
 public static final String DEFAULT_TOPOLOGY_NAME = "SensorTopology";

 public static class SignalScheme implements Scheme {
  private static final long serialVersionUID = 1L;
  
  private Deserializer protocol;
  private Fields fields;
  public static enum Deserializer{
   BINARY,
   COMPACT,
   JSON
  }

  public SignalScheme(Deserializer protocol) {
   this.protocol = protocol;
   List<String> _fields = new ArrayList<>();
   for (Signal._Fields elm : Signal._Fields.values()) {
    _fields.add(elm.getFieldName());
   }
   this.fields = new Fields(_fields);
  }

  @Override
  public List<Object> deserialize(byte[] ser) {
   Signal signal = new Signal();
   try {
    TDeserializer deserializer;
    switch (this.protocol) {
    case BINARY:
     deserializer = new TDeserializer(new TBinaryProtocol.Factory());
     break;
    case COMPACT:
     deserializer = new TDeserializer(new TCompactProtocol.Factory());
     break;
    case JSON:
     deserializer = new TDeserializer(new TJSONProtocol.Factory());
     break;
    default:
     deserializer = new TDeserializer();
     break;
    }
    deserializer.deserialize(signal, ser);
   } catch (TException e) {
    return null;
   }
   Values result = new Values();
   for (String field : this.fields) {
    result.add(signal.getFieldValue(Signal._Fields.findByName(field)));
   }
   return result;
  }

  @Override
  public Fields getOutputFields() {
   return fields;
  }
 }

 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
  if (args.length != 2 && args.length != 0) {
   System.err.println("USAGE (local mode): " + SensorTopology.class.getName());
   System.err.println("USAGE (submit topology): " + SensorTopology.class.getName() + " topologyName nimbusHost");
   System.exit(1);
  }
  
  ZkHosts zkHosts = new ZkHosts("localhost:2181");
  Scheme customScheme = new SignalScheme(SignalScheme.Deserializer.BINARY);
  
  TridentKafkaConfig tkConfig = new TridentKafkaConfig(zkHosts, "signals");
  tkConfig.scheme = new SchemeAsMultiScheme(customScheme);
  tkConfig.forceFromStart = false;
  
  List<String> outputFields = new ArrayList<>(customScheme.getOutputFields().toList());
  outputFields.add("temperatureVariation");
  
  TridentTopology topology = new TridentTopology();
  topology.newStream("tridentKafkaSpout", new TransactionalTridentKafkaSpout(tkConfig))
   .shuffle().each(customScheme.getOutputFields(), 
     new TridentFunctions.FuzzyLogicBolt(), new Fields("temperatureVariation"))
   .each(new Fields(outputFields), new TridentFunctions.PrinterBolt())
   .parallelismHint(2);
  
  Config config = new Config();
  config.setNumWorkers(2);
  config.setMaxTaskParallelism(2);
  if (args.length == 0) {
   LocalCluster cluster = new LocalCluster();
   cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, topology.build());
  } else {
         String topologyName = args[0];
            String nimbusHost = args[1];
            config.put(Config.NIMBUS_HOST, nimbusHost);
            config.put(Config.NIMBUS_THRIFT_PORT, 6627);
            config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
            config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(nimbusHost));
         StormSubmitter.submitTopology(topologyName, config, topology.build());
  }
 }
}

As for the Trident functions, in one of them I've used FuzzyLite to reason, with fuzzy logic, about the temperature variation to set in the greenhouse; it takes the values from the tuples (sensorId, humidity and temperature) and emits an output with the resulting temperature variation to apply.
Fuzzy logic is a good option when we don't deal with precise values but with ranges (I mean: hot, very hot, cold, very cold, etc.). There are a lot of use cases implementing this sort of reasoning in scenarios like this one.
FuzzyLite is an open source project that provides an API for fuzzy logic control in C++ and Java. It also has a functional Qt-based graphical interface for designing your variables and rules; I used it and it's nice.
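To give an idea of what the Triangle terms used below actually compute, this is the math behind a triangular membership function (a hand-written sketch, not FuzzyLite's API): for a term Triangle(a, b, c), the membership degree of a crisp value x rises linearly from a to the peak b, and falls back to zero at c.

```java
// Membership degree of x for a triangular fuzzy term with vertices
// a <= b <= c: 0 outside (a, c), 1 at the peak b, linear in between.
// Hand-written sketch of the underlying math, not FuzzyLite's API.
class TriangleTerm {
    static double membership(double x, double a, double b, double c) {
        if (x <= a || x >= c) return 0.0;
        if (x == b) return 1.0;
        return x < b ? (x - a) / (b - a) : (c - x) / (c - b);
    }
}
```

For example, with the NORMAL temperature term Triangle(18, 20, 22) defined below, a reading of 19 °C is NORMAL with degree 0.5, while 20 °C is fully NORMAL (degree 1.0).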

The source code for these functions:

import backtype.storm.tuple.Values;

import com.fuzzylite.Engine;
import com.fuzzylite.defuzzifier.Centroid;
import com.fuzzylite.norm.s.Maximum;
import com.fuzzylite.norm.t.Minimum;
import com.fuzzylite.rule.Rule;
import com.fuzzylite.rule.RuleBlock;
import com.fuzzylite.term.Discrete;
import com.fuzzylite.term.Ramp;
import com.fuzzylite.term.Triangle;
import com.fuzzylite.variable.InputVariable;
import com.fuzzylite.variable.OutputVariable;

import storm.trident.operation.BaseFilter;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class TridentFunctions {

 public static class FuzzyLogicBolt extends BaseFunction {
  private static final long serialVersionUID = 1L;

  private Engine engine;

  public FuzzyLogicBolt() {
   this.engine = new Engine();
   engine.setName("FuzzyLogicBolt");

   InputVariable temperatureVar = new InputVariable();
   temperatureVar.setEnabled(true);
   temperatureVar.setName("temperature");
   temperatureVar.setRange(0.000, 40.000);
   temperatureVar.addTerm(Discrete.create("VERYLOW", 10.000, 1.000, 15.000, 0.000));
   temperatureVar.addTerm(new Triangle("LOW", 10.000, 15.000, 20.000));
   temperatureVar.addTerm(new Triangle("NORMAL", 18.000, 20.000, 22.000));
   temperatureVar.addTerm(new Triangle("HIGH", 20.000, 25.000, 30.000));
   temperatureVar.addTerm(new Ramp("VERYHIGH", 25.000, 30.000));
   engine.addInputVariable(temperatureVar);

   InputVariable humidityVar = new InputVariable();
   humidityVar.setEnabled(true);
   humidityVar.setName("humidity");
   humidityVar.setRange(0.000, 100.000);
   humidityVar.addTerm(Discrete.create("VERYLOW", 10.000, 1.000, 20.000, 0.000));
   humidityVar.addTerm(new Triangle("LOW", 10.000, 25.000, 40.000));
   humidityVar.addTerm(new Triangle("NORMAL", 30.000, 40.000, 50.000));
   humidityVar.addTerm(new Triangle("HIGH", 40.000, 55.000, 70.000));
   humidityVar.addTerm(new Ramp("VERYHIGH", 60.000, 70.000));
   engine.addInputVariable(humidityVar);

   OutputVariable temperatureVariationVar = new OutputVariable();
   temperatureVariationVar.setEnabled(true);
   temperatureVariationVar.setName("temperatureVariation");
   temperatureVariationVar.setRange(-15.000, 15.000);
   temperatureVariationVar.fuzzyOutput().setAccumulation(new Maximum());
   temperatureVariationVar.setDefuzzifier(new Centroid(200));
   temperatureVariationVar.setDefaultValue(Double.NaN);
   temperatureVariationVar.setLockOutputValueInRange(false);
   temperatureVariationVar.addTerm(new Triangle("STRONGDECREASE", -15.000, -10.000, -7.500));
   temperatureVariationVar.addTerm(new Triangle("NORMALDECREASE", -10.000, -5.000, -2.500));
   temperatureVariationVar.addTerm(new Triangle("SMALLDECREASE", -7.500, -2.500, 0.000));
   temperatureVariationVar.addTerm(new Triangle("MAINTAIN", -1.000, 0.000, 1.000));
   temperatureVariationVar.addTerm(new Triangle("SMALLINCREASE", 0.000, 2.500, 7.500));
   temperatureVariationVar.addTerm(new Triangle("NORMALINCREASE", 2.500, 5.000, 10.000));
   temperatureVariationVar.addTerm(new Triangle("STRONGINCREASE", 7.500, 10.000, 15.000));
   engine.addOutputVariable(temperatureVariationVar);

   RuleBlock ruleBlock = new RuleBlock();
   ruleBlock.setEnabled(true);
   ruleBlock.setName("Fuzzy Logic rules");
   ruleBlock.setConjunction(new Minimum());
   ruleBlock.setDisjunction(new Maximum());
   ruleBlock.setActivation(new Minimum());
   ruleBlock.addRule(Rule.parse("if temperature is VERYLOW and humidity is VERYLOW then temperatureVariation is NORMALINCREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is VERYLOW and humidity is LOW then temperatureVariation is NORMALINCREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is VERYLOW and humidity is NORMAL then temperatureVariation is STRONGINCREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is VERYLOW and humidity is HIGH then temperatureVariation is STRONGINCREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is VERYLOW and humidity is VERYHIGH then temperatureVariation is STRONGINCREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is LOW and humidity is VERYLOW then temperatureVariation is MAINTAIN", engine));
   ruleBlock.addRule(Rule.parse("if temperature is LOW and humidity is LOW then temperatureVariation is MAINTAIN", engine));
   ruleBlock.addRule(Rule.parse("if temperature is LOW and humidity is NORMAL then temperatureVariation is SMALLINCREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is LOW and humidity is HIGH then temperatureVariation is SMALLINCREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is LOW and humidity is VERYHIGH then temperatureVariation is NORMALINCREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is NORMAL and humidity is VERYLOW then temperatureVariation is MAINTAIN", engine));
   ruleBlock.addRule(Rule.parse("if temperature is NORMAL and humidity is LOW then temperatureVariation is MAINTAIN", engine));
   ruleBlock.addRule(Rule.parse("if temperature is NORMAL and humidity is NORMAL then temperatureVariation is MAINTAIN", engine));
   ruleBlock.addRule(Rule.parse("if temperature is NORMAL and humidity is HIGH then temperatureVariation is MAINTAIN", engine));
   ruleBlock.addRule(Rule.parse("if temperature is NORMAL and humidity is VERYHIGH then temperatureVariation is SMALLDECREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is HIGH and humidity is VERYLOW then temperatureVariation is MAINTAIN", engine));
   ruleBlock.addRule(Rule.parse("if temperature is HIGH and humidity is LOW then temperatureVariation is MAINTAIN", engine));
   ruleBlock.addRule(Rule.parse("if temperature is HIGH and humidity is NORMAL then temperatureVariation is SMALLDECREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is HIGH and humidity is HIGH then temperatureVariation is SMALLDECREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is HIGH and humidity is VERYHIGH then temperatureVariation is NORMALDECREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is VERYHIGH and humidity is VERYLOW then temperatureVariation is SMALLDECREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is VERYHIGH and humidity is LOW then temperatureVariation is NORMALDECREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is VERYHIGH and humidity is NORMAL then temperatureVariation is NORMALDECREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is VERYHIGH and humidity is HIGH then temperatureVariation is STRONGDECREASE", engine));
   ruleBlock.addRule(Rule.parse("if temperature is VERYHIGH and humidity is VERYHIGH then temperatureVariation is STRONGDECREASE", engine));
   engine.addRuleBlock(ruleBlock);
  }

  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
   engine.restart();
   engine.setInputValue("humidity", tuple.getDoubleByField("humidity"));
   engine.setInputValue("temperature", tuple.getDoubleByField("temperature"));
   engine.process();

   collector.emit(new Values(engine.getOutputValue("temperatureVariation")));
  }
 }
 
 public static class PrinterBolt extends BaseFilter {
  private static final long serialVersionUID = 1L;

  @Override
  public boolean isKeep(TridentTuple tuple) {
   System.out.println(tuple);
   return true;
  }
 }
}
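The Centroid(200) defuzzifier used above turns the aggregated output fuzzy set back into a single crisp number by computing its center of gravity, sampled at 200 points. The underlying computation can be sketched like this (a hand-written illustration, not FuzzyLite's implementation):

```java
import java.util.function.DoubleUnaryOperator;

// Discretized centroid defuzzification: sample the aggregated
// membership function over [min, max] and return the membership-
// weighted average of the sample points. Illustrative sketch only.
class CentroidSketch {
    static double defuzzify(DoubleUnaryOperator membership,
                            double min, double max, int resolution) {
        double dx = (max - min) / resolution;
        double weightedSum = 0.0, area = 0.0;
        for (int i = 0; i < resolution; i++) {
            double x = min + (i + 0.5) * dx;  // midpoint of each slice
            double mu = membership.applyAsDouble(x);
            weightedSum += x * mu;
            area += mu;
        }
        return area == 0.0 ? Double.NaN : weightedSum / area;
    }
}
```

For a symmetric output set like the MAINTAIN triangle (-1, 0, 1), the centroid is 0, i.e. no temperature change is applied.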
NOTE: You'll have to make the FuzzyLite classes serializable (i.e. have them implement the Serializable interface), since Storm serializes the function instances when distributing the topology to the workers.
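An alternative to modifying the FuzzyLite sources is a common Storm pattern: mark the non-serializable field as transient and rebuild it on the worker after deserialization. Here's a generic, hand-written sketch of that pattern with plain Java serialization; HeavyEngine is a hypothetical stand-in for the FuzzyLite Engine:

```java
import java.io.*;

// Generic sketch of the "transient + lazy init" pattern used to ship
// functions whose fields aren't Serializable. HeavyEngine stands in
// for a non-serializable dependency such as the FuzzyLite Engine.
class HeavyEngine {
    double process(double in) { return in * 2; }
}

class FuzzyFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    // Not serialized; rebuilt on first use after deserialization.
    private transient HeavyEngine engine;

    private HeavyEngine engine() {
        if (engine == null) {
            engine = new HeavyEngine();  // would rebuild variables/rules here
        }
        return engine;
    }

    double execute(double input) {
        return engine().process(input);
    }

    // Round-trips the function through Java serialization, as Storm
    // does when it distributes the topology to the workers.
    static FuzzyFunction roundTrip(FuzzyFunction f) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(f);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return (FuzzyFunction) ois.readObject();
        }
    }
}
```

In a real Trident function you'd typically do this initialization in prepare() rather than lazily, but the effect is the same: the engine is built on the worker instead of being serialized with the topology.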

The pom configuration:

 <dependencies>
  <dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.9.2</artifactId>
   <version>0.8.2.0</version>
   <exclusions>
    <exclusion>
     <groupId>com.sun.jmx</groupId>
     <artifactId>jmxri</artifactId>
    </exclusion>
    <exclusion>
     <groupId>com.sun.jdmk</groupId>
     <artifactId>jmxtools</artifactId>
    </exclusion>
   </exclusions>
  </dependency>
  <dependency>
   <groupId>org.apache.storm</groupId>
   <artifactId>storm-kafka</artifactId>
   <version>0.9.3</version>
  </dependency>
  <dependency>
   <groupId>org.apache.storm</groupId>
   <artifactId>storm-core</artifactId>
   <version>0.9.3</version>
  </dependency>
  <dependency>
   <groupId>org.apache.thrift</groupId>
   <artifactId>libthrift</artifactId>
   <version>0.9.2</version>
  </dependency>
  <dependency>
   <groupId>com.fuzzylite</groupId>
   <artifactId>jfuzzylite</artifactId>
   <version>1.0</version>
  </dependency>
 </dependencies>
 <build>
  <resources>
   <resource>
    <directory>src/main/java-gen</directory>
   </resource>
  </resources>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.1</version>
    <configuration>
     <source>1.7</source>
     <target>1.7</target>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.3</version>
    <executions>
     <execution>
      <phase>package</phase>
      <goals>
       <goal>shade</goal>
      </goals>
      <configuration>
       <artifactSet>
        <excludes>
         <exclude>org.apache.storm:storm-core</exclude>
         <exclude>org.slf4j:slf4j-api</exclude>
         <exclude>org.slf4j:log4j-over-slf4j</exclude>
         <exclude>org.slf4j:slf4j-log4j12</exclude>
         <exclude>log4j:log4j</exclude>
        </excludes>
       </artifactSet>
      </configuration>
     </execution>
    </executions>
   </plugin>
   <plugin>
    <groupId>org.apache.thrift.tools</groupId>
    <artifactId>maven-thrift-plugin</artifactId>
    <version>0.1.11</version>
    <configuration>
     <outputDirectory>src/main/java-gen</outputDirectory>
    </configuration>
    <executions>
     <execution>
      <id>thrift-sources</id>
      <phase>generate-sources</phase>
      <goals>
       <goal>compile</goal>
      </goals>
      <configuration>
       <thriftSourceRoot>src/main/resources/</thriftSourceRoot>
      </configuration>
     </execution>
     <execution>
      <id>thrift-test-sources</id>
      <phase>generate-test-sources</phase>
      <goals>
       <goal>testCompile</goal>
      </goals>
     </execution>
    </executions>
   </plugin>
   <plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.3.2</version>
   </plugin>
  </plugins>
  <pluginManagement>
   <plugins>
    <!--Just to store Eclipse m2e settings only. It has no influence on the Maven build itself. -->
    <plugin>
     <groupId>org.eclipse.m2e</groupId>
     <artifactId>lifecycle-mapping</artifactId>
     <version>1.0.0</version>
     <configuration>
      <lifecycleMappingMetadata>
       <pluginExecutions>
        <pluginExecution>
         <pluginExecutionFilter>
          <groupId>
           org.apache.thrift.tools
          </groupId>
          <artifactId>
           maven-thrift-plugin
          </artifactId>
          <versionRange>
           [0.1.11,)
          </versionRange>
          <goals>
           <goal>compile</goal>
           <goal>testCompile</goal>
          </goals>
         </pluginExecutionFilter>
         <action>
          <ignore></ignore>
         </action>
        </pluginExecution>
       </pluginExecutions>
      </lifecycleMappingMetadata>
     </configuration>
    </plugin>
   </plugins>
  </pluginManagement>
 </build>
NOTE: Remember to exclude the highlighted jars when packaging, in order to avoid classpath conflicts when submitting the topology.

Now let's start our system and submit the topology...

#Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

#Kafka
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic signals

#Storm
bin/storm nimbus
bin/storm supervisor
bin/storm ui

#Submit jar to Storm (main class, then topologyName and nimbusHost)
bin/storm jar /path/to/my/jar.jar SensorTopology SensorTopology localhost

Now we're good to go, so... let's harvest!!! ;-)