In a previous article, we used sample datasets to join two tables in Hive. To improve join performance, we can also use partitions and buckets. Let's first create a Parquet-format table that is both partitioned and bucketed:

CREATE TABLE employee_p (
    employee_id INT,
    birthday DATE,
    first_name STRING,
    family_name STRING,
    work_day DATE)
PARTITIONED BY (gender CHAR(1))
CLUSTERED BY (employee_id) INTO 8 BUCKETS
STORED AS PARQUET;

Then import data into it:

SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE employee_p PARTITION(gender)
                SELECT employee_id, birthday, first_name, family_name, gender, work_day
                  FROM employee;

But it reports an error:

Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":"499795","_col1":"1961-07-05","_col2":"Idoia","_col3":"Riefers","_col4":"M","_col5":"1986-01-16"}}
        at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:257)
        at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":"499795","_col1":"1961-07-05","_col2":"Idoia","_col3":"Riefers","_col4":"M","_col5":"1986-01-16"}}
        at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:245)
        ... 7 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveFatalException: [Error 20004]: Fatal error occurred when node tried to create too many dynamic partitions. The maximum number of dynamic partitions is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. Maximum was set to: 100
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.getDynOutPaths(FileSinkOperator.java:922)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:699)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:837)
        at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:97)
        at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:236)
        ... 7 more
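
As the message says, the cap on dynamic partitions is governed by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. If a load genuinely needs many partitions, those limits can simply be raised; the values below are only illustrative assumptions, not something this job should need:

SET hive.exec.max.dynamic.partitions=1000;
SET hive.exec.max.dynamic.partitions.pernode=500;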

But all the employees have only two gender values, "M" and "F". How could Hive report "too many dynamic partitions"?
To find the root cause, I prepended "EXPLAIN" to my HQL and finally noticed this line:

expressions: UDFToInteger(VALUE._col0) (type: int), CAST( VALUE._col1 AS DATE) (type: date), VALUE._col2 (type: string), VALUE._col3 (type: string), CAST( VALUE._col4 AS DATE) (type: date), VALUE._col5 (type: string)
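
For reference, this excerpt is from the plan produced by prefixing the same INSERT with EXPLAIN:

EXPLAIN
INSERT OVERWRITE TABLE employee_p PARTITION(gender)
                SELECT employee_id, birthday, first_name, family_name, gender, work_day
                  FROM employee;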

Hive uses "_col4" as the partition column, and its type is DATE! The dynamic partition column must be the last column of the SELECT list, so the correct import HQL is:

INSERT OVERWRITE TABLE employee_p PARTITION(gender)
                SELECT employee_id, birthday, first_name, family_name, work_day, gender
                  FROM employee;

This time the data is imported successfully with dynamic partitions.
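
As a quick sanity check, we can list the partitions that dynamic partitioning created (only the table name from above is assumed; the expected output follows from the two gender values):

SHOW PARTITIONS employee_p;
-- should list exactly two partitions: gender=F and gender=M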
Next we create a new bucketed Parquet table "salary_p" and join the two tables:

CREATE TABLE salary_p (
    employee_id INT,
    salary INT,
    start_date DATE,
    end_date DATE)
CLUSTERED BY (employee_id) INTO 8 BUCKETS
STORED AS PARQUET;
INSERT OVERWRITE TABLE salary_p SELECT * FROM salary;
-- Join the two tables
SELECT  e.gender, AVG(s.salary) AS avg_salary
  FROM  employee_p AS e
        JOIN  salary_p AS s
        ON (e.employee_id = s.employee_id)
  GROUP BY e.gender;

The join now takes only about 90 seconds, much faster than the previous 140 seconds without bucketing and partitioning.
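
Whether the buckets are actually exploited at join time depends on the Hive version and configuration; on older releases, bucketing must also be enforced when writing bucketed tables. The settings below are a hedged sketch of what such environments may need, not something this particular run is confirmed to require:

-- Older Hive (before 2.x) only honors CLUSTERED BY on insert when this is set
SET hive.enforce.bucketing=true;
-- Allow the optimizer to consider a bucket map join for tables bucketed on the join key
SET hive.optimize.bucketmapjoin=true;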