Use MapReduce to join two datasets

The two datasets are:

To join the two tables above by “student id”, we need to use MultipleInputs. The code is:
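
As a rough illustration of what such a job does, here is a minimal plain-Java sketch of the reduce-side join logic, with no Hadoop dependency. In a real job, MultipleInputs.addInputPath binds each input path to its own mapper; here the two mappers are simulated by two methods that tag records with their source. The record layouts (`id,name` and `id,score`), the class name, and the sample rows are assumptions for illustration, not the original datasets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class JoinSketch {
    // "Map" side for the first table: tag the value with S so the reducer knows its source.
    static void mapStudents(String line, Map<String, List<String>> shuffle) {
        String[] f = line.split(",");
        shuffle.computeIfAbsent(f[0], k -> new ArrayList<>()).add("S:" + f[1]);
    }

    // "Map" side for the second table: tag the value with C.
    static void mapScores(String line, Map<String, List<String>> shuffle) {
        String[] f = line.split(",");
        shuffle.computeIfAbsent(f[0], k -> new ArrayList<>()).add("C:" + f[1]);
    }

    // "Reduce" side: for one student id, pair every name with every score.
    static List<String> reduce(String id, List<String> values) {
        List<String> names = new ArrayList<>();
        List<String> scores = new ArrayList<>();
        for (String v : values) {
            if (v.startsWith("S:")) names.add(v.substring(2));
            else scores.add(v.substring(2));
        }
        List<String> out = new ArrayList<>();
        for (String n : names)
            for (String s : scores)
                out.add(id + "\t" + n + "\t" + s);
        return out;
    }

    static List<String> join(List<String> students, List<String> scores) {
        // TreeMap groups values by key in sorted order, standing in for the shuffle.
        Map<String, List<String>> shuffle = new TreeMap<>();
        for (String l : students) mapStudents(l, shuffle);
        for (String l : scores) mapScores(l, shuffle);
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : shuffle.entrySet())
            out.addAll(reduce(e.getKey(), e.getValue()));
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows.
        List<String> students = Arrays.asList("1,Alice", "2,Bob");
        List<String> scores = Arrays.asList("1,90", "2,75", "1,85");
        join(students, scores).forEach(System.out::println);
    }
}
```

Tagging the values is what lets a single reducer tell the two inputs apart once they have been grouped under the same student id.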

Compile and run it:

And the result in /my is:

Use MapReduce to find prime numbers

I wanted to write a small Hadoop MapReduce example that finds prime numbers. The first question is: how can I generate the numbers from 1 to 1000000 in my own application instead of reading them from a file on HDFS? The answer is: implement your own InputSplit, RecordReader, and InputFormat, just like the teragen program does.
Then comes the second question: can I use only a mapper, without a reducer stage? The answer is yes: simply call job.setNumReduceTasks(0) to disable the reducer stage.
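
The idea behind a generated input can be sketched in plain Java: a custom InputFormat would cut the range 1..N into contiguous sub-ranges in getSplits, and each RecordReader would simply count through its own sub-range. The sketch below shows only that range arithmetic. The class and method names are hypothetical, and no Hadoop classes are used.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplits {
    // Cut [1, total] into at most numSplits contiguous {start, end} ranges
    // (both ends inclusive), spreading the remainder over the first ranges.
    static List<long[]> split(long total, int numSplits) {
        List<long[]> splits = new ArrayList<>();
        long base = total / numSplits;
        long extra = total % numSplits;
        long start = 1;
        for (int i = 0; i < numSplits; i++) {
            long len = base + (i < extra ? 1 : 0);
            if (len == 0) continue; // more splits requested than numbers available
            splits.add(new long[] {start, start + len - 1});
            start += len;
        }
        return splits;
    }

    public static void main(String[] args) {
        // Each printed range would become the input of one mapper.
        for (long[] s : split(1_000_000L, 4)) {
            System.out.println(s[0] + ".." + s[1]);
        }
    }
}
```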

The complete code is here (I know the algorithm for checking whether a number is prime is naive, but it works):
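
For reference, a naive check of this kind usually looks like the trial-division sketch below. It is an assumed stand-in, not the original CalcPrime code, and it is plain Java so it can be tried without a cluster. In a map-only job the same test would run inside map() and the number would be written to the context when it passes.

```java
public class NaivePrime {
    // Trial division: try every divisor d with d * d <= n.
    static boolean isPrime(long n) {
        if (n < 2) return false;
        for (long d = 2; d * d <= n; d++) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Count the primes in [from, to], both ends inclusive.
    static int countPrimes(long from, long to) {
        int count = 0;
        for (long n = from; n <= to; n++) {
            if (isPrime(n)) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countPrimes(1, 100)); // prints 25
    }
}
```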

Copy the code to a file named CalcPrime.java, then compile and run it:

Some problems and solutions when deploying and running Hadoop-2.7.2

1. If we see this error report:

The solution is here: the Java heap size should not be larger than the map/reduce container memory. Cloudera recommends setting the heap size to about 0.8 of the map/reduce memory, such as:
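
To make the 0.8 rule concrete, the sketch below derives a `-Xmx` value from a container size. In Hadoop 2.x the container size is set by mapreduce.map.memory.mb / mapreduce.reduce.memory.mb and the JVM options by mapreduce.map.java.opts / mapreduce.reduce.java.opts. The 2048 MB and 4096 MB figures are only example inputs, and the helper name is hypothetical.

```java
public class HeapSize {
    // Heap = 80% of the container memory, using integer arithmetic (rounds down).
    static String javaOpts(int containerMb) {
        int heapMb = containerMb * 8 / 10;
        return "-Xmx" + heapMb + "m";
    }

    public static void main(String[] args) {
        // Example: a 2048 MB map container and a 4096 MB reduce container.
        System.out.println("map:    " + javaOpts(2048)); // -Xmx1638m
        System.out.println("reduce: " + javaOpts(4096)); // -Xmx3276m
    }
}
```

The remaining 20% leaves room for the JVM's non-heap memory, so the container is less likely to be killed for exceeding its limit.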

2. The “/tmp/” directory became full.

This is usually caused by spilled data from map output. This article gives an overview of the Map/Reduce algorithm in Hadoop with a detailed and clear picture.
As a result, my solution is to add this configuration:

into core-site.xml, so the inevitable spill data will be written to different disks for load balancing.

3. Don’t assign more than 0.8 of the physical memory to “yarn.nodemanager.resource.memory-mb”, or jobs may fail unexpectedly.

4. If we launch more map or reduce tasks than the servers have physical cores, many of these tasks may time out. Therefore, adjust “mapreduce.map.memory.mb” and “mapreduce.reduce.memory.mb” carefully to limit the number of concurrent map/reduce tasks.
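
As a rough rule of thumb, and assuming the scheduler allocates by memory only, the number of map tasks a node can run at once is about yarn.nodemanager.resource.memory-mb divided by mapreduce.map.memory.mb. The sketch below picks a task size that keeps that number at or below the core count. It ignores the scheduler's rounding of container requests, all figures are example values, and the helper names are hypothetical.

```java
public class ContainerMath {
    // Approximate number of concurrent tasks per node when memory is the only limit.
    static int concurrentTasks(int nodeManagerMb, int taskMb) {
        return nodeManagerMb / taskMb;
    }

    // A task memory (in MB) that guarantees at most `cores` concurrent tasks:
    // the ceiling of nodeManagerMb / cores.
    static int taskMbForCores(int nodeManagerMb, int cores) {
        return (nodeManagerMb + cores - 1) / cores;
    }

    public static void main(String[] args) {
        int nodeManagerMb = 51200; // example: 50 GB given to the NodeManager
        int cores = 16;            // example: 16 physical cores
        System.out.println(concurrentTasks(nodeManagerMb, 1024)); // 50 tasks on 16 cores
        int taskMb = taskMbForCores(nodeManagerMb, cores);
        System.out.println(taskMb + " MB -> " + concurrentTasks(nodeManagerMb, taskMb) + " tasks");
    }
}
```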

5. If you notice that all the CPU cores in the Hadoop cluster are fully utilized, that does not mean there is nothing left to optimize. Using perf, I found that the system wasted too much time launching and stopping Java tasks (containers):

[Figure: perf output on a Hadoop node]

So I changed the value of “mapreduce.input.fileinputformat.split.minsize” to 8GB to reduce the number of mappers. After decreasing the number of mappers from thousands to hundreds, the running time of the Terasort program dropped by more than 50% (the system's context switches also fell from tens of thousands per second to thousands). Therefore, keeping the number of Java tasks close to the number of physical CPU cores is a better solution.
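
The effect of the minimum split size can be estimated with the formula FileInputFormat uses for the split size, max(minSize, min(maxSize, blockSize)). The sketch below applies it to an assumed 1 TB input with 128 MB blocks. Those two figures and the class name are assumptions for illustration, and the split count is approximate because the real code allows the last split to be slightly larger.

```java
public class SplitMath {
    // Same formula as FileInputFormat: max(minSize, min(maxSize, blockSize)).
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Approximate number of splits (and so of mappers) for one file: ceiling division.
    static long approxSplits(long fileLen, long splitSize) {
        return (fileLen + splitSize - 1) / splitSize;
    }

    public static void main(String[] args) {
        long fileLen = 1L << 40;   // assumed input size: 1 TB
        long blockSize = 1L << 27; // assumed HDFS block size: 128 MB
        long before = splitSize(blockSize, 1L, Long.MAX_VALUE);
        long after = splitSize(blockSize, 8L << 30, Long.MAX_VALUE); // minsize = 8 GB
        System.out.println(approxSplits(fileLen, before)); // 8192 mappers
        System.out.println(approxSplits(fileLen, after));  // 128 mappers
    }
}
```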

From Scala Array[String] / Seq[String] to Java varargs

While testing the performance of Redis these days, I needed to use the mset() interface of jedis (a Java Redis client). But the prototype of mset() in jedis is:

At first I wrote my Scala code like this:

But it reported compile errors:

After searching through many Scala/Java documents on Google, I finally found the answer: http://docs.scala-lang.org/style/types.html. So, let’s write the code this way:

Now the Scala Array[String] is passed to Java as varargs. This also works for Seq[String].
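
The reason an explicit conversion is needed shows up on the Java side: a `String...` parameter is compiled as a `String[]`, and Java itself accepts either an array or a list of separate arguments, while Scala only expands a collection into varargs when asked to with `: _*`. The Java sketch below uses a hypothetical stand-in method with the same parameter shape as a varargs mset; it is not jedis code.

```java
public class VarargsDemo {
    // Hypothetical stand-in for a varargs API; inside the method the
    // parameter is an ordinary String[].
    static int countArgs(String... keysvalues) {
        return keysvalues.length;
    }

    public static void main(String[] args) {
        String[] kv = {"k1", "v1", "k2", "v2"};
        System.out.println(countArgs(kv));         // 4: the array is passed as the varargs
        System.out.println(countArgs("k1", "v1")); // 2: the compiler builds the array
    }
}
```

From Scala, the equivalent call with an Array[String] or Seq[String] named `kv` would be `countArgs(kv: _*)`.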