
Tuesday, 23 December 2014

Top Sea Life observed!

One of my favorite hobbies is scuba diving. Diving frees my mind and makes me feel completely at ease: it's like being in another world.
All divers are used to keeping a logbook in which they write down their dives, in order to remember them later or even to certify that they did all of them. You can also do this on Diveboard, a social network where divers share their dives. You can get this dataset (and many others) from the Global Biodiversity Information Facility.
So I decided to use it as a working dataset for my next Hadoop example.

In this post, we'll see how to program a MapReduce job that ranks the species reported in Diveboard, using Apache Avro. Avro is a data serialization framework offering rich data structures, a compact binary format and integration with many languages. It needs a schema to serialize and deserialize data, and it provides splittable files for HDFS as well as schema evolution.
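Just to illustrate the serialization side before getting to MapReduce, here is a minimal stand-alone sketch (it is not part of the job below; the "Sighting" schema and the file name are invented for the example) that writes and reads a couple of generic records:

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroQuickLook {
 public static void main(String[] args) throws Exception {
  // A tiny inline schema: a record with two string fields
  Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Sighting\",\"fields\":["
    + "{\"name\":\"scientificName\",\"type\":\"string\"},"
    + "{\"name\":\"country\",\"type\":\"string\"}]}");

  GenericRecord sighting = new GenericData.Record(schema);
  sighting.put("scientificName", "Pterois volitans");
  sighting.put("country", "Egypt");

  // Serialize to an Avro container file (compact binary, splittable on HDFS)
  File file = new File("sightings.avro");
  DataFileWriter<GenericRecord> writer =
    new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
  writer.create(schema, file);
  writer.append(sighting);
  writer.close();

  // Deserialize: the schema travels with the file, so no extra metadata is needed
  DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
    file, new GenericDatumReader<GenericRecord>());
  for (GenericRecord record : reader) {
   System.out.println(record);
  }
  reader.close();
 }
}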

As usual, here are the Maven dependencies (only the Avro-related ones are shown):
<dependency>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro</artifactId>
 <version>${avro.version}</version>
</dependency>
<dependency>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro-mapred</artifactId>
 <version>${avro.version}</version>
 <classifier>hadoop2</classifier>
</dependency>
<dependency>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro-maven-plugin</artifactId>
 <version>${avro.version}</version>
</dependency>
<dependency>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro-compiler</artifactId>
 <version>${avro.version}</version>
</dependency>
<dependency>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro-ipc</artifactId>
 <version>${avro.version}</version>
</dependency>

Also, to perform code generation you can include the Avro Maven plugin in your pom.xml:
<plugin>
 <groupId>org.apache.avro</groupId>
 <artifactId>avro-maven-plugin</artifactId>
 <version>${avro.version}</version>
 <executions>
  <execution>
   <phase>generate-sources</phase>
   <goals>
    <goal>schema</goal>
   </goals>
   <configuration>
    <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
    <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
    <testSourceDirectory>${project.basedir}/src/test/avro/</testSourceDirectory>
    <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
    <fieldVisibility>PRIVATE</fieldVisibility>
    <includes>
     <include>**/*.avsc</include>
    </includes>
    <testIncludes>
     <testInclude>**/*.test</testInclude>
    </testIncludes>
   </configuration>
  </execution>
 </executions>
</plugin>
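With this configuration, running mvn generate-sources (or any later phase, such as mvn compile) generates Java classes from every .avsc schema under src/main/avro and places them under src/main/java, with private field visibility as configured above.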

I want to get the most viewed species for each country. For this, the Avro schema groups the Diveboard records by country and scientific name (ResultRecord), and all the related info about each sighting is kept in a list of DiveBoardRecord elements.

{
  "type": "record",
  "name": "ResultRecord",
  "fields": [
    { "name": "counter", "type": "int", "order": "descending" },
    { "name": "scientificName", "type": "string" },
    { "name": "country", "type": "string" },
    {
      "name": "dbRecordList",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "DiveBoardRecord",
          "fields": [
            { "name": "id", "type": "string", "order": "ignore" },
            { "name": "diverProfileUrl", "type": "string", "order": "ignore" },
            { "name": "diverName", "type": "string", "order": "ignore" },
            { "name": "eventDate", "type": "string", "order": "ignore" },
            { "name": "waterBody", "type": "string", "order": "ignore" },
            { "name": "scientificName", "type": "string" },
            { "name": "country", "type": "string" },
            { "name": "locality", "type": "string", "order": "ignore" },
            { "name": "verbatimLocality", "type": "string", "order": "ignore" },
            { "name": "latitude", "type": "string", "order": "ignore" },
            { "name": "longitude", "type": "string", "order": "ignore" },
            { "name": "url", "type": "string", "order": "ignore" }
          ]
        }
      }
    }
  ]
}
Note the "order" attribute of the "counter" field: it's set to "descending", so records will be sorted by this field, with the most viewed species at the beginning of the final output. Also, the DiveBoardRecord fields marked with "order": "ignore" are skipped when Avro compares records, which is why grouping in the first job effectively happens on scientificName and country only.
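If you want to check that ordering outside MapReduce, here is a quick hypothetical snippet (it assumes the ResultRecord and DiveBoardRecord classes generated from the schema above are on the classpath) that compares two ResultRecord instances through Avro's SpecificData:

import java.util.Collections;

import org.apache.avro.specific.SpecificData;

public class OrderCheck {
 public static void main(String[] args) {
  ResultRecord a = ResultRecord.newBuilder().setCounter(3)
    .setScientificName("Pterois volitans").setCountry("Egypt")
    .setDbRecordList(Collections.<DiveBoardRecord>emptyList()).build();
  ResultRecord b = ResultRecord.newBuilder().setCounter(10)
    .setScientificName("Pterois volitans").setCountry("Egypt")
    .setDbRecordList(Collections.<DiveBoardRecord>emptyList()).build();

  // Because "counter" is declared "descending", the record with the higher
  // counter compares as "smaller" and therefore sorts first: this prints a
  // positive value (a comes after b)
  System.out.println(SpecificData.get().compare(a, b, ResultRecord.getClassSchema()));
 }
}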

Then, it's time to write the driver! (Before that, I generated the Java sources from the schema.) The driver consists of two chained jobs:
  • The first one ("Sea Life Aggregator") does the parsing in the map phase (the downloaded dataset is a tab-delimited text file, so each line is easy to split) and then the reduce phase generates an intermediate output with the ResultRecord (including the list of all records and their counter).
  • The second one ("Sea Life Sorter") just sorts the previous intermediate result. It uses the identity Mapper and Reducer classes, and the results get sorted during the shuffle-and-sort phase (remember the "counter" order in the schema).
Once these jobs are configured, a ControlledJob encapsulates each of them with a dependency between the two and... we execute the whole thing!
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

public class DiveBoardDriver extends Configured implements Tool {

 private static final Logger LOG = Logger.getLogger(DiveBoardDriver.class);

 public static class DiveBoardMapper
   extends
   Mapper<LongWritable, Text, AvroKey<DiveBoardRecord>, AvroValue<DiveBoardRecord>> {

  // Each input line is a tab-separated row of the GBIF/Diveboard dump;
  // rows that don't have the expected 49 columns are discarded
  private DiveBoardRecord parse(String line) {
   String[] columns = line.split("\t");
   if (columns == null || columns.length != 49) {
    return null;
   }
   try {
    DiveBoardRecord record = new DiveBoardRecord();

    record.setId(columns[0]);
    record.setDiverProfileUrl(columns[6]);
    record.setDiverName(columns[14]);
    record.setEventDate(columns[18]);
    record.setWaterBody(columns[22]);
    record.setCountry(columns[23]);
    record.setLocality(columns[25]);
    record.setVerbatimLocality(columns[26]);
    record.setLatitude(columns[32]);
    record.setLongitude(columns[33]);
    record.setUrl(columns[39]);
    record.setScientificName(columns[40]);

    return record;
   } catch (Exception e) {
    return null;
   }
  }

  @Override
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   DiveBoardRecord record = parse(value.toString());
   if (record != null) {
    context.write(new AvroKey<DiveBoardRecord>(record),
      new AvroValue<DiveBoardRecord>(record));
   }
  }
 }

 public static class DiveBoardReducer
   extends
   Reducer<AvroKey<DiveBoardRecord>, AvroValue<DiveBoardRecord>, AvroKey<ResultRecord>, NullWritable> {

  @Override
  public void reduce(AvroKey<DiveBoardRecord> key,
    Iterable<AvroValue<DiveBoardRecord>> records, Context context)
    throws IOException, InterruptedException {
   List<DiveBoardRecord> dbRecordList = new ArrayList<DiveBoardRecord>();
   for (AvroValue<DiveBoardRecord> avroValue : records) {
    dbRecordList.add(avroValue.datum());
   }

   ResultRecord result = ResultRecord.newBuilder()
     .setCounter(dbRecordList.size())
     .setCountry(dbRecordList.get(0).getCountry())
     .setScientificName(dbRecordList.get(0).getScientificName())
     .setDbRecordList(dbRecordList).build();

   context.write(new AvroKey<ResultRecord>(result), NullWritable.get());
  }
 }

 public int run(String[] args) throws Exception {
  Configuration conf = this.getConf();
  // First job: parse the dataset and aggregate sightings by scientific name and country
  Job job1 = Job.getInstance(conf, "Sea Life Aggregator");
  job1.setJarByClass(getClass());

  FileInputFormat.addInputPath(job1, new Path(args[0]));
  FileOutputFormat.setOutputPath(job1, new Path(args[1]));

  job1.setMapperClass(DiveBoardMapper.class);
  job1.setReducerClass(DiveBoardReducer.class);

  AvroJob.setMapOutputKeySchema(job1, DiveBoardRecord.SCHEMA$);
  AvroJob.setMapOutputValueSchema(job1, DiveBoardRecord.SCHEMA$);

  job1.setOutputFormatClass(AvroKeyOutputFormat.class);
  AvroJob.setOutputKeySchema(job1, ResultRecord.SCHEMA$);

  // Second job: identity map/reduce over the intermediate Avro output; the
  // shuffle-and-sort phase orders the ResultRecords by "counter" (descending)
  Configuration conf2 = new Configuration(conf);
  Job job2 = Job.getInstance(conf2, "Sea Life Sorter");
  job2.setJarByClass(getClass());

  FileInputFormat.addInputPath(job2, new Path(args[1]));
  FileOutputFormat.setOutputPath(job2, new Path(args[2]));

  job2.setInputFormatClass(AvroKeyInputFormat.class);
  job2.setMapperClass(Mapper.class);
  AvroJob.setInputKeySchema(job2, ResultRecord.getClassSchema());

  AvroJob.setMapOutputKeySchema(job2, ResultRecord.SCHEMA$);
  job2.setMapOutputValueClass(NullWritable.class);

  job2.setOutputFormatClass(AvroKeyOutputFormat.class);
  job2.setReducerClass(Reducer.class);
  AvroJob.setOutputKeySchema(job2, ResultRecord.getClassSchema());

  // Chain the jobs: job2 will only start once job1 has completed successfully
  ControlledJob cJob1 = new ControlledJob(conf);
  cJob1.setJob(job1);

  ControlledJob cJob2 = new ControlledJob(job2, Arrays.asList(cJob1));

  JobControl jobCtrl = new JobControl("Top Sea Life Observed");
  jobCtrl.addJob(cJob1);
  jobCtrl.addJob(cJob2);

  LOG.info("Starting chained jobs");
  Thread thread = new Thread(jobCtrl);
  thread.setDaemon(true);
  thread.start();
  while (!jobCtrl.allFinished()) {
   Thread.sleep(500);
  }
  if (jobCtrl.getFailedJobList().size() > 0) {
   for (ControlledJob cj : jobCtrl.getFailedJobList()) {
    LOG.error("Error executing job: " + cj.getJobName());
   }
   return 1;
  } else {
   LOG.info("Jobs successfully completed");
   return 0;
  }
 }

 public static void main(String[] args) throws Exception {
  if (args.length != 3) {
   System.err.printf("Usage: %s [generic options] <input> <intermediate result> <output>\n",
       DiveBoardDriver.class.getSimpleName());
   ToolRunner.printGenericCommandUsage(System.err);
   System.exit(-1);
  }
  LOG.info("Running DiveBoardDriver program...");
  int status = ToolRunner.run(new DiveBoardDriver(), args);

  // Print the final ranking only if both jobs completed successfully
  if (status == 0) {
   DatumReader<ResultRecord> datumReader = new SpecificDatumReader<ResultRecord>(
     ResultRecord.SCHEMA$);
   File[] outputFiles = new File(args[2])
     .listFiles(new FilenameFilter() {
      @Override
      public boolean accept(File dir, String name) {
       return name.toLowerCase().endsWith(".avro");
      }
     });

   for (File file : outputFiles) {
    DataFileReader<ResultRecord> fileReader = new DataFileReader<ResultRecord>(
      file, datumReader);
    for (ResultRecord resultRecord : fileReader) {
     System.out.println(resultRecord.getCounter() + "\t"
       + resultRecord.getScientificName() + "\t"
       + resultRecord.getCountry());
    }
    fileReader.close();
   }
  }
  System.exit(status);
 }
}
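By the way, to try this locally you can run something along these lines (the jar name and the paths are just placeholders):

hadoop jar diveboard-avro-example.jar DiveBoardDriver input/diveboard.txt output/intermediate output/result

Note that main() reads the final output with java.io.File, so it expects the output directory on the local filesystem (for example, when using the local job runner).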


And the winner is... the "Pterois volitans" (aka Red Lionfish) in Egypt!!!
