“database is locked” in Hue

After launching a long-running HiveQL query in Hue’s SQL Editor, a small error tip appears under the editor: “database is locked”. The solution is to make Hue use MySQL instead of SQLite3. But I am using Hue built directly from GitHub, not the Cloudera release, so the correct steps are:

  1. Stop the Hue server
  2. Install MySQL and create a database named ‘hue’
  3. Edit desktop/conf/pseudo-distributed.ini and add the MySQL settings to the “[[database]]” section (a sketch of these settings is shown after this list)
  4. Run “make apps” (this is the most important step, as it installs the MySQL connector packages automatically and creates the metadata tables in the ‘hue’ database)
  5. Start the Hue server
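
The exact settings from the original post are not reproduced above, so the following is a minimal sketch of what the “[[database]]” section of pseudo-distributed.ini typically looks like for MySQL; the host, port, user, and password values are placeholders.

    [desktop]
      [[database]]
        # switch Hue's metadata store from SQLite3 to MySQL
        engine=mysql
        host=127.0.0.1
        port=3306
        user=hue
        password=secret
        name=hue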

Now we can run long-running queries and the error no longer appears.


“java.io.IOException: failed to uncompress the chunk” in Apache Spark

After I ran spark-submit on my YARN cluster with Spark-1.6.2:

The job failed, and the log reported:

Somebody on the internet suggested this may be caused by a compatibility problem between Spark-1.6.2 and Snappy. Therefore I added

to my spark-submit shell script to change the compression codec from Snappy to LZ4, and this time everything works fine.
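
The exact flag from the original script is not shown above; Spark’s standard setting for its internal compression codec, passed on the spark-submit command line, would look like this:

    # added to the spark-submit command line: switch the internal
    # compression codec from snappy to lz4
    --conf spark.io.compression.codec=lz4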

Finding the lost memory

We noticed a strange phenomenon on a production server. The “free” command shows there is no free memory on this server, but when we add up the memory allocated by all processes:

it shows that all processes together use only about 60GB of memory (the total physical memory of this server is 126GB).
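
The exact command is not shown above; one hedged way to produce such a total (assuming the RSS column of ps, reported in kilobytes) is:

    # sum the resident set size (RSS, in KB) of every process and print it in GB
    ps aux | awk 'NR > 1 {sum += $6} END {printf "%.1f GB\n", sum/1024/1024}'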

Where is the lost memory?

First we unmounted the tmpfs, but it made no difference. Then we used:

and soon noticed that “Slab” takes more than 10GB of memory. The slab allocator is the Linux kernel component that manages memory for kernel objects. If “Slab” in “/proc/meminfo” is too high, it means the kernel may have allocated too many resources on behalf of user-space programs. But what type of resource? Finally, by using the command:

it shows that the Recv-Q of all TCP connections adds up to more than 20GB of memory. Now we have uncovered the root cause: too many TCP connections (more than one hundred thousand) have been created (hedged versions of the diagnostic commands are sketched after the list below). The solution could be:

  • Reduce the size of the TCP Recv-Q buffers
  • Modify the user program to reduce the number of TCP connections
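
The original post’s exact commands are not shown; assuming the standard /proc/meminfo and netstat interfaces, the checks would look roughly like this:

    # kernel slab usage reported in /proc/meminfo
    grep -i slab /proc/meminfo

    # sum the Recv-Q column (bytes queued in socket receive buffers) over all TCP connections
    netstat -ant | awk 'NR > 2 {sum += $2} END {printf "%.1f GB\n", sum/1024/1024/1024}'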

Terasort for Spark (part2 / 2)

In the previous article, we used Spark to sort a large dataset generated by Teragen, but it took much more time than the Hadoop MapReduce framework, so we are going to optimize it.

By profiling with the Spark UI, we found that the “Shuffle” stage reads and writes too much data from/to the hard disk, which severely hurts performance.




In Hadoop’s “Terasort”, the TotalOrderPartitioner class maps all the data into a large number of partitions ordered by key range, so every “Reduce” task only needs to sort the data inside its own partition (almost no shuffling from other partitions is needed). This saves a lot of network bandwidth and CPU usage.

Therefore we could modify our Scala code to sort every partition locally:
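
The original code is not reproduced above; the following is a minimal sketch of the idea, not the author’s exact program. It assumes Teragen-style lines with a 10-byte key, takes input/output paths as arguments, and uses an assumed partition count: the keys are range-partitioned so the partitions are globally ordered, and each partition is then sorted locally.

    import org.apache.spark.{SparkConf, SparkContext, RangePartitioner}

    object TerasortLocalSort {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("TerasortLocalSort"))

        // Teragen lines start with a 10-byte key; keep the rest as the value.
        val pairs = sc.textFile(args(0))
          .map(line => (line.substring(0, 10), line.substring(10)))

        // Range-partition by key (1000 partitions is an assumed value) and sort
        // each partition during the shuffle, instead of a global sortByKey afterwards.
        val partitioner = new RangePartitioner(1000, pairs)
        val sorted = pairs.repartitionAndSortWithinPartitions(partitioner)

        sorted.map { case (k, v) => k + v }.saveAsTextFile(args(1))
        sc.stop()
      }
    }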

and the spark-submit command should also be changed:

This time, the job takes only 10 minutes to sort the data!

Screenshot from “Job Browser” of Hue:



Terasort for Spark (part1 / 2)

We can use Spark to sort the data generated by Hadoop’s Teragen.

TerasortApp.scala
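
The original file is not reproduced above; a minimal sketch of a naive version (names and paths are assumptions) that simply does a global sortByKey (exactly the part that gets optimized in part 2) could look like this:

    import org.apache.spark.{SparkConf, SparkContext}

    object TerasortApp {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("TerasortApp"))

        // Split each Teragen line into a 10-byte key and the remaining value,
        // then do a global sortByKey, which shuffles data between all servers.
        sc.textFile(args(0))
          .map(line => (line.substring(0, 10), line.substring(10)))
          .sortByKey()
          .map { case (k, v) => k + v }
          .saveAsTextFile(args(1))

        sc.stop()
      }
    }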

build.sbt
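
The original build file is not shown; a minimal sketch that matches the Scala 2.10 / Spark 1.6.x note at the end of this post would be:

    name := "TerasortApp"

    version := "1.0"

    // Spark 1.6.x is built against Scala 2.10 (see the note below)
    scalaVersion := "2.10.6"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2" % "provided"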

After building the jar file, we can submit it to Spark (I run Spark in yarn-cluster mode):
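
The exact command is not shown above; a hedged example for yarn-cluster mode (the jar name, class name, paths, and resource sizes are assumptions) is:

    spark-submit --master yarn --deploy-mode cluster \
      --class TerasortApp \
      --num-executors 16 --executor-memory 4g \
      terasortapp_2.10-1.0.jar /teragen/output /terasort/output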

It takes 17 minutes to complete the task, but the “terasort” tool from Hadoop takes only 8 minutes to sort all the data. The reason is that I haven’t used a TotalOrderPartitioner, so Spark has to sort all the data across different partitions (and across different servers), which consumes a lot of network resources and slows down the job.

Remember to use Scala 2.10 to build apps for Spark 1.6.x; otherwise Spark will report an error like:

Deploy Hive on Spark

The MapReduce framework is too slow for real-time analytic queries, so we need to change Hive’s execution engine from “mr” to “spark” (link):

1. Set the environment for Spark:
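
The original commands are not shown; a hedged sketch of what this step usually involves (the paths are examples, not the author’s) is:

    # point the shell (and Hive) at the Spark installation; paths are examples
    export SPARK_HOME=/opt/spark-1.6.2
    export PATH=$SPARK_HOME/bin:$PATH

    # Hive on Spark also needs the Spark assembly jar visible to Hive,
    # e.g. linked into Hive's lib directory
    ln -s $SPARK_HOME/lib/spark-assembly-*.jar $HIVE_HOME/lib/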

2. Copy the configuration XML file for Hive:

and change these configuration items:
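
The exact items are not listed above; a hedged sketch of the hive-site.xml properties commonly changed for Hive on Spark (the values are examples) is:

    <property>
      <name>hive.execution.engine</name>
      <value>spark</value>
    </property>
    <property>
      <name>spark.master</name>
      <value>yarn-cluster</value>
    </property>
    <property>
      <name>spark.executor.memory</name>
      <value>4g</value>
    </property>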

Notice: remember to replace every “${system:java.io.tmpdir}/${system:user.name}” in hive-site.xml with “/tmp/my/” (link)

Partitioning and Bucketing Hive table

In the previous article, we used sample datasets to join two tables in Hive. To improve the performance of the table join, we can also use partitioning and bucketing. Let’s first create a Parquet-format table with partitions and buckets:
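
The original DDL is not shown; a hedged sketch (the column names follow the employees dataset used in these posts, and the bucket count is an assumption) could be:

    CREATE TABLE employee_p (
      emp_no     INT,
      birth_date DATE,
      first_name STRING,
      last_name  STRING,
      hire_date  DATE)
    PARTITIONED BY (gender STRING)
    CLUSTERED BY (emp_no) INTO 8 BUCKETS
    STORED AS PARQUET;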

Then import data into it:

But it reports an error:

All the employees have only two genders: “M” and “F”. How could Hive report “too many dynamic partitions”?
To find the root cause, I put “explain” before my HQL and finally noticed this line:

Hive uses “_col4” as the partition column and its type is DATE! So the correct import HQL should put the partition column last:
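
The corrected statement is not reproduced above. The point is that in a dynamic-partition INSERT, Hive takes the last column of the SELECT as the partition value, so a hedged version (table and column names are assumptions) looks like this:

    -- allow dynamic partitioning on the gender column
    SET hive.exec.dynamic.partition=true;
    SET hive.exec.dynamic.partition.mode=nonstrict;

    -- gender is the last SELECT column, so Hive maps it to PARTITION (gender)
    INSERT OVERWRITE TABLE employee_p PARTITION (gender)
    SELECT emp_no, birth_date, first_name, last_name, hire_date, gender
    FROM employees;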

We have successfully imported the data using dynamic partitions.

Now we create a new Parquet-format table “salary” (using buckets) and join the two tables:
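
The original statements are not shown; a hedged sketch (column names, bucket count, and the join query are assumptions) is:

    CREATE TABLE salary (
      emp_no    INT,
      salary    INT,
      from_date DATE,
      to_date   DATE)
    CLUSTERED BY (emp_no) INTO 8 BUCKETS
    STORED AS PARQUET;

    -- join the two Parquet tables on the bucketed column
    SELECT e.emp_no, e.first_name, e.last_name, AVG(s.salary) AS avg_salary
    FROM employee_p e JOIN salary s ON e.emp_no = s.emp_no
    GROUP BY e.emp_no, e.first_name, e.last_name;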

The join operation takes only 90 seconds, much less than the previous 140 seconds without bucketing and partitioning.

Example datasets for learning Hive

I found two datasets, employees and salaries, for learning and practicing. After putting the two files into HDFS, we just need to create the tables:
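
The original DDL is not shown; a hedged sketch assuming the classic employees/salaries sample schema and comma-separated files already uploaded to HDFS (the paths are placeholders) is:

    CREATE EXTERNAL TABLE employees (
      emp_no     INT,
      birth_date DATE,
      first_name STRING,
      last_name  STRING,
      gender     STRING,
      hire_date  DATE)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/data/employees';

    CREATE EXTERNAL TABLE salaries (
      emp_no    INT,
      salary    INT,
      from_date DATE,
      to_date   DATE)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/data/salaries';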

Now we could analyze the data.

Find the oldest 10 employees.
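
A hedged example, using the employees table sketched above:

    SELECT emp_no, first_name, last_name, birth_date
    FROM employees
    ORDER BY birth_date ASC
    LIMIT 10;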

Find all the employees who joined the corporation in January 1990.
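
A hedged example, filtering on hire_date:

    SELECT emp_no, first_name, last_name, hire_date
    FROM employees
    WHERE year(hire_date) = 1990 AND month(hire_date) = 1;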

Find the top 10 employees who earned the highest average salary. Notice we use ‘order by’ here because ‘sort by’ only produces a local order within each reducer.
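
A hedged example; the global ‘order by’ is the important part:

    SELECT emp_no, AVG(salary) AS avg_salary
    FROM salaries
    GROUP BY emp_no
    ORDER BY avg_salary DESC   -- global order; 'sort by' would only order within each reducer
    LIMIT 10;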

Let’s find out whether this corporation has sex discrimination:
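
A hedged example, comparing average salary by gender:

    SELECT e.gender, AVG(s.salary) AS avg_salary
    FROM employees e JOIN salaries s ON e.emp_no = s.emp_no
    GROUP BY e.gender;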

The result is:

Looks good 🙂

Use hive to join two datasets

In the previous article, I wrote Java code with the MapReduce framework to join two datasets. Furthermore, I enhanced the code to sort the scores for every student. The complete join-and-sort code is here. It takes more than 170 lines of Java code to join two tables and sort them. But in production environments, we usually use Hive to do the same work.
By using the same sample datasets:

Now we could join them:
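
The original statement is not shown; a hedged sketch of what a three-line join-and-sort looks like (the table and column names for the student/score datasets are assumptions) is:

    SELECT s.name, c.course, c.score
    FROM student s JOIN score c ON s.id = c.student_id
    ORDER BY s.name, c.score DESC;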

Just three lines of HQL (Hive Query Language), not 170 lines of Java code.

These two tables are very small, so we can use local mode to run the Hive task:
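
The original setting is not shown; Hive’s standard switch for letting small jobs run locally is:

    -- let Hive automatically run small jobs in local mode instead of on the cluster
    SET hive.exec.mode.local.auto=true;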

Some problems with programming MapReduce

1. After submitting the job, the console reports:

The reason is that I forgot to call setJarByClass():
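
The original snippet is not shown; the fix is the standard Job.setJarByClass() call in the driver (the class names here are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class JoinDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "join");
            // Tell Hadoop which jar to ship to the cluster by locating the jar
            // containing this class; without it the mapper/reducer classes
            // cannot be found on the worker nodes.
            job.setJarByClass(JoinDriver.class);
            // ... setMapperClass / setReducerClass / input and output paths ...
        }
    }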

2. When the job finished, I found the reducer hadn’t run at all. The reason is that I hadn’t overridden the correct reduce() member function of Reducer, so the MapReduce framework ignored it and didn’t report any notification or warning. To make sure we override the correct member function of the parent class, we need to add the @Override annotation:
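
A hedged sketch (the key/value types are placeholders) showing the @Override annotation, which makes the compiler fail if the signature does not actually match Reducer.reduce():

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override   // compilation fails here if this is not really an override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }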