Read paper “iShuffle: Improving Hadoop Performance with Shuffle-on-Write”

Paper reference: iShuffle: Improving Hadoop Performance with Shuffle-on-Write

Background:
A job in Hadoop consists of three main stages: map, shuffle, and reduce (in the actual implementation, the shuffle stage is folded into the reduce stage).


What is the problem?
The shuffle phase needs to migrate a large amount of data from the nodes running map tasks to the nodes that will run reduce tasks. This causes shuffle latency, which is usually significant. The reasons are:

  • Partitioning skew: Hadoop uses a hash function to assign map output records to partitions; if too many keys hash to the same partition, the partition sizes become very uneven (see the sketch below)
  • Coupling of shuffle and reduce: because shuffling is driven by the reduce tasks, data shuffling cannot be overlapped with map tasks
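To make the partitioning skew concrete, here is a small self-contained sketch. The modulo rule mirrors Hadoop’s default HashPartitioner; the key distribution and partition count are made up purely for illustration:

    // Sketch: how hash partitioning turns a skewed key distribution into
    // unevenly sized partitions. One "hot" key sends most records to a
    // single reducer while the other reducers stay almost idle.
    object HashSkewSketch {
      // Same rule as Hadoop's default HashPartitioner.
      def partition(key: String, numReduces: Int): Int =
        (key.hashCode & Int.MaxValue) % numReduces

      def main(args: Array[String]): Unit = {
        val numReduces = 4
        // Made-up skewed input: one key dominates the dataset.
        val keys = Seq.fill(1000)("hot-key") ++ (1 to 100).map(i => s"key-$i")
        val sizes = keys.groupBy(partition(_, numReduces)).mapValues(_.size)
        (0 until numReduces).foreach { p =>
          println(s"partition $p -> ${sizes.getOrElse(p, 0)} records")
        }
      }
    }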


Solution: iShuffle

    • A “Shuffler” collects the intermediate data generated by every map task and predicts the size of each partition
    • The “Shuffler Manager” collects this information from the “Shufflers” and decides the placement of partitions


    • Shuffle-on-write: while a map task writes a spill to the local filesystem, it also (via a modification of the Hadoop code) writes the spill to the corresponding node where the reduce task will be launched
    • Automated map output placement: iShuffle decides the placement of every partition using “map selectivity”, the ratio of a map task’s output size to its input size. After predicting the map selectivity and knowing the total input size, iShuffle can choose the best node for each partition’s data (see the sketch below)


  • Flexible reduce scheduling: when a node requests a reduce task, the Task Manager (a modified version of Hadoop’s FIFO scheduler) finds the list of partitions residing on that node and launches reduce tasks only for those partitions. This ensures a reduce task reads shuffled data only from the local filesystem, which saves network bandwidth during the reduce stage
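A back-of-the-envelope sketch of the automated placement idea: predict each partition’s intermediate size from the map selectivity and the total input, then greedily place the largest partitions on the least-loaded nodes. The numbers and the greedy rule are my own illustration, not the exact algorithm from the paper:

    // Sketch: predict partition sizes via map selectivity (output/input ratio),
    // then spread them across nodes so no node gets a disproportionate share.
    object PlacementSketch {
      def main(args: Array[String]): Unit = {
        val totalInputBytes = 10L * 1024 * 1024 * 1024     // 10 GB of map input (made up)
        val predictedSelectivity = 0.6                     // predicted output size / input size
        val partitionFractions = Map(0 -> 0.40, 1 -> 0.25, 2 -> 0.20, 3 -> 0.15)

        // Predicted intermediate-data size for each partition.
        val predictedSizes = partitionFractions.map { case (p, f) =>
          p -> (totalInputBytes * predictedSelectivity * f).toLong
        }

        // Greedy placement: largest predicted partition goes to the least-loaded node.
        val nodeLoad = scala.collection.mutable.Map("node1" -> 0L, "node2" -> 0L)
        for ((partition, size) <- predictedSizes.toSeq.sortBy(-_._2)) {
          val (target, _) = nodeLoad.minBy(_._2)
          nodeLoad(target) += size
          println(s"partition $partition (${size / (1024 * 1024)} MB) -> $target")
        }
      }
    }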

In my opinion
Using prediction to proactively move map output to suitable nodes, which avoids partition skew, is the most clever part of this paper. The technique could also be applied to other scenarios that move intermediate data, such as OLAP in a data warehouse.
However, I also suspect that in real production not many organizations will use iShuffle, because they usually run multi-user workloads on their Hadoop systems. When many jobs run in one Hadoop cluster simultaneously, a dip in CPU usage caused by one job’s long reduce latency will be filled by other compute-intensive jobs, so from the viewpoint of all users no hardware resources are wasted.

Install CDH (Cloudera’s Distribution including Apache Hadoop) with Cloudera Manager

Recently I have been installing Cloudera 5.8.3 on my CentOS 7 machines. Here are the steps I followed and some troubleshooting tips:

0. If you are not in the USA, network access to the Cloudera repository of RPMs (or Parcels) is desperately slow, so we need to mirror the CM (Cloudera Manager) repo and the CDH repo locally.

Create local CM Repo

Create local CDH Repo

1. Install Cloudera Manager (steps)

2. Start Cloudera Manager

But it reports:

On CentOS 7, the solution is:

You also need to run “sudo ./cloudera-manager-installer.bin --skip_repo_package=1” to create “db.properties”.

3. Log in to Cloudera Manager (port 7180) and follow the wizard to create a new cluster. (Choosing the local repository for installation makes it pleasantly fast 🙂)

Make sure the hostname of every node is correct. By using the “Host Inspector”, we can reveal many potential problems on these machines.

After trying many times to set up the cluster, I found this error in the logs on some nodes:

The solution is simple:

and restart the Cloudera Manager Agent on these nodes.

I also ran into a problem where the installation progress hung on this message:

There wasn’t any “yum” process running on the node, so why did it still hold the installation lock? The answer is:

4. After many failures and retries, I eventually set up the Hadoop ecosystem of CDH:



When upgrading or downgrading a Cloudera cluster, you may see this problem:




The solution (if in ‘single user mode’) is:

and try it again.

When starting the ResourceManager, it failed and reported:

The reason for this error is that a non-Cloudera version of ZooKeeper was installed on the host. Remove it and reinstall ZooKeeper from CDH, and the YARN ResourceManager will launch successfully.

If you meet “Deploy Client Configuration failed” when creating a new service, just grant passwordless sudo to the cloudera-scm user.

Using Pig to join two tables and sort the result

Given two tables, salary and employee, we can use Pig to find the highest-paid employees:

The result is:

Terasort for Spark (part2 / 2)

In the previous article, we used Spark to sort a large dataset generated by Teragen, but it took much more time than the Hadoop MapReduce framework, so we are going to optimize it.

Looking at the Spark UI for profiling, we found that the shuffle reads and writes too much data from and to the hard disk, which severely hurts performance.




In Hadoop’s Terasort, the class TotalOrderPartitioner maps all the data into a large number of partitions by key order, so every reduce task only needs to sort the data within its own partition (almost no shuffling from other partitions is needed). This saves a lot of network bandwidth and CPU usage.

Therefore we could modify our Scala code to sort every partition locally:
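The exact modification isn’t shown here; as a rough sketch of the idea, we shuffle once into range partitions (a crude first-character partitioner stands in for the range partitioning that TotalOrderPartitioner does in Hadoop) and then sort inside each partition. It assumes the Teragen records are read as text lines whose first 10 characters are the key; the paths and partition count are placeholders:

    import org.apache.spark.{Partitioner, SparkConf, SparkContext}

    // Range-partition by the first character of the key so that partition i
    // only holds keys smaller than those in partition i+1, then sort each
    // partition locally; no global sort across partitions is needed.
    class FirstCharPartitioner(val numPartitions: Int) extends Partitioner {
      override def getPartition(key: Any): Int = {
        val c = key.asInstanceOf[String].charAt(0).toInt & 0xff
        c * numPartitions / 256
      }
    }

    object TerasortLocalSort {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("TerasortLocalSort"))
        sc.textFile("/teragen-input")                                  // placeholder input path
          .map(line => (line.substring(0, 10), line.substring(10)))    // 10-character key prefix
          .repartitionAndSortWithinPartitions(new FirstCharPartitioner(128))
          .map { case (k, v) => k + v }
          .saveAsTextFile("/terasort-output")                          // placeholder output path
        sc.stop()
      }
    }

Spark’s built-in RangePartitioner, which samples the data to pick split points, would be a more robust choice than the hard-coded first-character split; the sketch just keeps the partitioner obvious.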

and the spark-submit command should also be changed:

This time, the job took only 10 minutes to sort the data!

Screenshot from Hue’s “Job Browser”:



Terasort for Spark (part1 / 2)

We can use Spark to sort all the data generated by Hadoop’s Teragen.

TerasortApp.scala

build.sbt

After building the jar file, we can submit it to Spark (I run Spark in yarn-cluster mode):

It took 17 minutes to complete the task, while Hadoop’s “terasort” tool sorted all the data in only 8 minutes. The reason is that I did not use a TotalOrderPartitioner, so Spark has to sort all the data across partitions (and therefore across servers), which consumes a lot of network bandwidth and slows the job down.
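For reference, the naive version boils down to a single global sortByKey. A minimal sketch of such an app (not the original TerasortApp.scala; the paths and the text-based record parsing are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    // Naive approach: one global sortByKey, which forces Spark to establish a
    // total order across all partitions and servers, hence the heavy shuffle.
    object NaiveTerasortSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("NaiveTerasortSketch"))
        sc.textFile("/teragen-input")                                  // placeholder input path
          .map(line => (line.substring(0, 10), line.substring(10)))    // 10-character key prefix
          .sortByKey()
          .map { case (k, v) => k + v }
          .saveAsTextFile("/terasort-output")                          // placeholder output path
        sc.stop()
      }
    }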

Remember to use Scala 2.10 to build applications for Spark 1.6.x, otherwise Spark will report an error like:
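For the sbt side, a build.sbt pinned to Scala 2.10 would look roughly like this (the project name and exact version numbers are illustrative):

    // The Spark 1.6.x cluster here expects Scala 2.10, so the application
    // must be compiled with a matching scalaVersion.
    name := "TerasortApp"

    version := "1.0"

    scalaVersion := "2.10.6"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"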

Some problems when programming MapReduce

1. After submitting the job, the console reported:

The reason is that I forgot to call setJarByClass():

2. When the job finished, I found that the reducer had not run at all. The reason is that I had not overridden the correct reduce() member function of Reducer, so the MapReduce framework silently ignored it without any notification or warning. To make sure we override the correct member function of the parent class, we should add the @Override annotation:

Use MapReduce to join two datasets

The two datasets are:

To join the two tables above by “student id”, we need to use MultipleInputs. The code is:

Compile and run it:

And the result in /my is:

Use MapReduce to find prime numbers

I just wanted to write a small Hadoop MapReduce example that finds prime numbers. The first question was: how can I generate the numbers from 1 to 1000000 in my own application instead of reading them from a file on HDFS? The answer: implement your own InputSplit, RecordReader, and InputFormat, just as the teragen program does.
Then comes the second question: can I use only mappers, without a reducer stage? The answer is yes; simply call job.setNumReduceTasks(0) to disable the reduce stage.

The complete code is here (I know the primality-checking algorithm is naive, but it works):

Copy the code into the file CalcPrime.java, then compile and run it:

Some tips about Hive

Here are some tips about Hive from my learning process:

1. When I started “bin/hive” for the first time, these errors were reported:

The solution is simple:

Actually, we’d better use MySQL instead of Derby in a multi-user environment.

2. Control the number of mappers for SQL jobs. If a SQL job uses too many mappers, the process context switching (including frequently launching and stopping JVMs) wastes CPU resources. We can use

to change the number of mappers for all the SQL jobs.

3. After I imported 1 TB of data into an ORC-format table, the size of the table was just 250 GB. But after I imported the same 1 TB of data into a Parquet-format table, the size was 900 GB. It looks like Apache ORC has a more effective compression algorithm for this kind of data.

4. Use partitions carefully.

Suppose we have a table named “users” that is partitioned by the field “ca”.

Now there is a record in the HDFS directory “/user/hive/warehouse/users/ca=China/”.
In the book <>, it is said that we could copy the data of a partition directory to AWS S3 and then point the partition at it. But what if I point the partition at a new, empty HDFS directory? Let’s try:

Because the partition now points to an empty directory, the SELECT cannot find any records. That is what “schema on read” means.

5. Debug.

This will print a lot of debug information for finding the cause, such as:

C++/Java developers needed

I have worked at Alibaba Group for more than 9 years. Currently I work at Alimama, a subsidiary of Alibaba Group and the biggest advertisement publishing company in China. At present, we need C++/Java developers to build new back-end infrastructure services for our new business.

[Job Description]

Role: C++/Java Developer for storage systems or high-performance computing

Location: Beijing

Your responsibilities:

1. Building and optimizing the distributed key-value storage system
2. Building and optimizing the distributed computing engine for the Linear Regression algorithm
3. Building and maintaining the backend service for Advertisement Publishing System

Skills & experience required:

1. Familiar with storage systems or high-performance computing systems
2. Strong background in Redis/RocksDB/Hadoop/GlusterFS
3. Very familiar with at least one of C/C++/Java/Scala
4. More than 3 years of experience as a developer of storage systems or HPC
5. Passionate about new technologies and willing to continuously push the boundaries

Anyone who is interested in the job above can send an email to me at haodong@alibaba-inc.com