Using Spark SQL to convert a CSV file to Parquet

After downloading data from the “Food and Agriculture Organization of the United Nations”, I got many CSV files. One of the files is named “Trade_Crops_Livestock_E_All_Data_(Normalized).csv” and it looks like this:

To load this CSV file into Spark and dump it out in Parquet format, I wrote the code below:
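It was roughly the following sketch (the file paths are placeholders and the CSV parsing is deliberately simplified):

// CsvToParquet.scala - a minimal sketch; paths and schema handling are simplified
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CsvToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CsvToParquet").getOrCreate()

    // Read the raw CSV as text; a naive comma split ignores quoted fields
    val lines = spark.sparkContext.textFile("Trade_Crops_Livestock_E_All_Data_(Normalized).csv")
    val header = lines.first()
    val rows = lines.filter(_ != header).map(_.split(",")).map(Row.fromSeq(_))

    // Treat every column as a string for simplicity
    val schema = StructType(header.split(",").map(name => StructField(name, StringType, nullable = true)))

    // createDataFrame() is the call that needs the spark-sql dependency
    val df = spark.createDataFrame(rows, schema)
    df.write.parquet("trade_crops_livestock.parquet")

    spark.stop()
  }
}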

The build.sbt is
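A minimal sketch (the Spark and Scala versions here are only examples and may differ from what you use):

name := "CsvToParquet"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.1.0" % "provided"
)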

Always remember to add the dependency for “spark-sql”, or else it will report “createDataFrame() is not a member of spark”.
And finally, the submit script is:
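Something along these lines (the master, deploy mode and jar path are placeholders for my environment):

spark-submit \
  --class CsvToParquet \
  --master yarn \
  --deploy-mode cluster \
  target/scala-2.11/csvtoparquet_2.11-1.0.jar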

Data Join in AWS Redshift

In “Amazon Redshift Database Developer Guide“, there is an explanation for data join:
“HASH JOIN and HASH are used when joining tables where the join columns are not both distribution keys and sort keys.
MERGE JOIN is used when joining tables where the join columns are both distribution keys and sort keys, and when less than 20 percent of the joining tables are unsorted.”

Let’s take the ‘salary’ and ’employee’ tables as an example.

Firstly, we EXPLAIN the join of ‘salary’ and ’employee’, and it shows “Hash Join”:
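The query was a plain equality join on employee_id, roughly like this (the selected columns are just for illustration):

EXPLAIN
SELECT e.employee_id, s.salary
FROM employee e
JOIN salary s ON e.employee_id = s.employee_id;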



Then we create two new tables:
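The new tables have the join column as both DISTKEY and SORTKEY; a sketch (the table names and column types are my own choices):

CREATE TABLE employee_dist (
  employee_id INTEGER,
  name        VARCHAR(64)
)
DISTKEY(employee_id)
SORTKEY(employee_id);

CREATE TABLE salary_dist (
  employee_id INTEGER,
  salary      INTEGER,
  start_date  DATE
)
DISTKEY(employee_id)
SORTKEY(employee_id);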

Now the join column is both the distkey and the sortkey, so EXPLAIN shows “Merge Join”:
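The same join against the new tables from the sketch above:

EXPLAIN
SELECT e.employee_id, s.salary
FROM employee_dist e
JOIN salary_dist s ON e.employee_id = s.employee_id;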



Some tips about “Amazon Redshift Database Developer Guide”

Show diststyle of tables

Details about distribution styles: http://docs.aws.amazon.com/redshift/latest/dg/viewing-distribution-styles.html
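For example, SVV_TABLE_INFO has a diststyle column:

SELECT "schema", "table", diststyle
FROM svv_table_info
ORDER BY "schema", "table";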

How to COPY multiple files into Redshift from S3
http://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html
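COPY loads every object that matches the S3 prefix, so one command can load many files. A sketch (the bucket name and IAM role are placeholders):

COPY sales
FROM 's3://my-bucket/sales/part-'
IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
DELIMITER ',';
-- Alternatively, list the objects in a manifest file and add the MANIFEST option.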

GROUP BY (or ORDER BY) can use column position numbers instead of column names
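For example, these two statements are equivalent:

SELECT employee_id, SUM(salary) FROM salary GROUP BY employee_id ORDER BY employee_id;
SELECT employee_id, SUM(salary) FROM salary GROUP BY 1 ORDER BY 1;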

Change current environment in SQL Editor

Primary key and foreign key
Amazon Redshift does not enforce primary key and foreign key constraints, but the query optimizer uses them when it generates query plans. If you set primary keys and foreign keys, your application must maintain the validity of the keys. 


Distribution info in EXPLAIN
DS_DIST_NONE
No redistribution is required, because corresponding slices are collocated on the compute nodes. You will typically have only one DS_DIST_NONE step, the join between the fact table and one dimension table.
DS_DIST_ALL_NONE
No redistribution is required, because the inner join table used DISTSTYLE ALL. The entire table is located on every node.
DS_DIST_INNER
The inner table is redistributed.
DS_DIST_OUTER
The outer table is redistributed.
DS_BCAST_INNER
A copy of the entire inner table is broadcast to all the compute nodes.
DS_DIST_ALL_INNER
The entire inner table is redistributed to a single slice because the outer table uses DISTSTYLE ALL.
DS_DIST_BOTH
Both tables are redistributed.

Create Like
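For example (the new table name is arbitrary):

CREATE TABLE salary_backup (LIKE salary);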

Interleaved skew

The value for interleaved_skew is a ratio that indicates the amount of skew. A value of 1 means there is no skew. If the skew is greater than 1.4, a VACUUM REINDEX will usually improve performance unless the skew is inherent in the underlying set.

About interleaved sort key: http://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html#t_Sorting_data-interleaved
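The skew can be checked in SVV_INTERLEAVED_COLUMNS, for example:

SELECT tbl, col, interleaved_skew, last_reindex
FROM svv_interleaved_columns
ORDER BY interleaved_skew DESC;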

Concurrent write
Concurrent write operations are supported in Amazon Redshift in a protective way, using write locks
on tables and the principle of serializable isolation. 


UNLOAD
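A basic example (bucket and role are placeholders):

UNLOAD ('SELECT * FROM salary')
TO 's3://my-bucket/unload/salary_'
IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
DELIMITER ',';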

Redshift UDF
In addition to the Python Standard Library, the following modules are part of the Amazon Redshift implementation:
* numpy 1.8.2 

* pandas 0.14.1 

* python-dateutil 2.2 

* pytz 2015.7 

* scipy 0.12.1 

* six 1.3.0 

* wsgiref 0.1.2
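A small scalar Python UDF that uses one of these modules, as a sketch (the function itself is only an illustration):

CREATE OR REPLACE FUNCTION f_workdays_between (d1 DATE, d2 DATE)
RETURNS INTEGER
STABLE
AS $$
    import numpy as np
    # Count working days between the two dates; str() turns the dates into 'YYYY-MM-DD'
    return int(np.busday_count(str(d1), str(d2)))
$$ LANGUAGE plpythonu;

SELECT employee_id, f_workdays_between(start_date, CURRENT_DATE) FROM salary LIMIT 10;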

Data Join
* Nested Loop
The least optimal join, a nested loop is used mainly for cross-joins (Cartesian products) and some inequality joins.

* Hash Join and Hash
Typically faster than a nested loop join, a hash join and hash are used for inner joins and left and right outer joins. These operators are used when joining tables where the join columns are not both distribution keys and sort keys. The hash operator creates the hash table for the inner table in the join; the hash join operator reads the outer table, hashes the joining column, and finds matches in the inner hash table.

* Merge Join
Typically the fastest join, a merge join is used for inner joins and outer joins. The merge join is not used for full joins. This operator is used when joining tables where the join columns are both distribution keys and sort keys, and when less than 20 percent of the joining tables are unsorted. It reads two sorted tables in order and finds the matching rows. To view the percent of unsorted rows, query the SVV_TABLE_INFO system table.


wlm_query_slot_count
You can temporarily override the amount of memory assigned to a query by setting the wlm_query_slot_count parameter to specify the number of slots allocated to the query. 
By default, WLM queues have a concurrency level of 5 
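For example, to let one session take 3 of the 5 slots for a heavy statement and then give them back:

SET wlm_query_slot_count TO 3;
VACUUM FULL salary TO 100 PERCENT;
SET wlm_query_slot_count TO 1;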


VARCHAR
A VARCHAR(12) column can contain 12 single-byte characters, 6 two-byte characters, 4 three-byte characters, or 3 four-byte characters. 


Tuple in Redshift SQL

SIMILAR TO
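SIMILAR TO mixes LIKE-style wildcards with regular-expression alternation, for example (the name column is just an illustration):

SELECT DISTINCT employee_id
FROM employee
WHERE name SIMILAR TO '(Rob|Bob)%';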

analyze_threshold_percent
To reduce processing time and improve overall system performance, Amazon Redshift skips analyzing a table if the percentage of rows that have changed since the last ANALYZE command run is lower
than the analyze threshold specified by the analyze_threshold_percent parameter. By default, analyze_threshold_percent is 10.
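The threshold can be changed per session, for example:

SET analyze_threshold_percent TO 20;
ANALYZE salary;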

COPY from DynamoDB
Setting READRATIO to 100 or higher will enable Amazon Redshift to consume the entirety of the DynamoDB table’s provisioned throughput, which will seriously degrade the performance of concurrent read operations against the same table during the COPY session. Write traffic will be unaffected.
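A sketch (the DynamoDB table name and role are placeholders); READRATIO caps how much of the provisioned read throughput the COPY may consume:

COPY salary
FROM 'dynamodb://my-dynamodb-table'
IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
READRATIO 50;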

Different databases in Redshift
After you have created the TICKIT database, you can connect to the new database from your SQL client. Use the same connection parameters as you used for your current connection, but change the database name to tickit.
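For example:

CREATE DATABASE tickit;
-- Then reconnect with the same host, port and user, but database name "tickit".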

Interleaved Sort Key
A maximum of eight columns can be specified for an interleaved sort key.
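For example (the column choice is only an illustration):

CREATE TABLE salary_interleaved (
  employee_id INTEGER,
  start_date  DATE,
  salary      INTEGER
)
INTERLEAVED SORTKEY (employee_id, start_date);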

Concatenate in SQL
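Both the || operator and the CONCAT function work, for example:

SELECT 'employee-' || CAST(employee_id AS VARCHAR) AS tag FROM employee LIMIT 5;
SELECT CONCAT('employee-', CAST(employee_id AS VARCHAR)) AS tag FROM employee LIMIT 5;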

INSERT INTO from SELECT
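For example, filling the distkey table from the earlier sketch out of the original one:

INSERT INTO salary_dist
SELECT employee_id, salary, start_date FROM salary;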

Prepare and execute PLAN
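For example (the id value is just one from my dataset):

PREPARE get_salary (INTEGER) AS
  SELECT salary FROM salary WHERE employee_id = $1;

EXECUTE get_salary (10001);

DEALLOCATE get_salary;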

Powerful ‘WITH’ for sub-query in SQL
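For example, computing each employee’s first working day once and reusing it:

WITH first_day AS (
  SELECT employee_id, MIN(start_date) AS min_start_date
  FROM salary
  GROUP BY employee_id
)
SELECT f.min_start_date, AVG(s.salary) AS avg_salary
FROM first_day f
JOIN salary s ON s.employee_id = f.employee_id
GROUP BY f.min_start_date
ORDER BY f.min_start_date;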

UNLOAD with compression
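Same as a plain UNLOAD, with the GZIP option added (bucket and role are placeholders):

UNLOAD ('SELECT * FROM salary')
TO 's3://my-bucket/unload/salary_'
IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
DELIMITER ','
GZIP;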

VACUUM
VACUUM FULL salary TO 100 PERCENT;

‘OVER’ in SQL
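For example, ranking salaries inside each starting year with a window function:

SELECT employee_id, salary,
       RANK() OVER (PARTITION BY EXTRACT(YEAR FROM start_date) ORDER BY salary DESC) AS rank_in_year
FROM salary;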

Show and set current settings
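For example:

SHOW search_path;
SET search_path TO '$user', public;
SHOW datestyle;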

Show blocks (1 MB each) allocated to each column in the ‘salary’ table
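This can be read from STV_BLOCKLIST joined with STV_TBL_PERM:

SELECT col, COUNT(*) AS blocks
FROM stv_blocklist b, stv_tbl_perm p
WHERE b.tbl = p.id
  AND b.slice = p.slice
  AND p.name = 'salary'
GROUP BY col
ORDER BY col;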

Slice and Col
slice: Node slice
col: Every table you create has three hidden columns appended to it: INSERT_XID, DELETE_XID, ROW_ID

In STV_SLICES, we can see the mapping between slices and nodes. A single node has two slices: 0 and 1.
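For example:

SELECT node, slice FROM stv_slices ORDER BY node, slice;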

Commonly used tables for meta information
pg_table_def
svv_table_info
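For example (pg_table_def only shows tables whose schema is in the search_path):

SELECT * FROM pg_table_def WHERE tablename = 'salary';
SELECT "table", diststyle, sortkey1, size, tbl_rows FROM svv_table_info;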

Data Preprocessing in Tableau

In a previous article, I created two tables in my Redshift cluster. Now I want to find out the relation between the salary of every employee and their working age. Tableau is the best choice for visualizing this data analysis (SAS is too expensive and has no trial version for learning).
First, we connect to Redshift in Tableau and double-click “New Custom SQL”. In the popup window, type in our SQL to query the first-year salary of every employee:
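The custom SQL is essentially the first start_date per employee, roughly:

SELECT employee_id, MIN(start_date) AS min_start_date
FROM salary
GROUP BY employee_id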




Now we have the table “Custom SQL Query”. Drag in the table “salary”, and choose “inner join” on employee_id and start_date:



Click into “Sheet 1”. Drag “salary” to “Rows”, “min_start_date” to “Columns”, and “employee_id” to “Color” in the “Marks” panel.



Now we can see the “expensive employees” (those with the highest salary among employees who started in the same first year) at the top of the graph:



Instead of adding custom SQL in the Tableau data source panel, we can also create a view in Redshift and let Tableau show views under “Tables”.
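A sketch of such a view (the view name is my own choice):

CREATE VIEW first_year_salary AS
SELECT s.employee_id, f.min_start_date, s.salary
FROM salary s
JOIN (
  SELECT employee_id, MIN(start_date) AS min_start_date
  FROM salary
  GROUP BY employee_id
) f ON f.employee_id = s.employee_id AND f.min_start_date = s.start_date;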

Or use a “WITH” clause:
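which keeps the whole query in one place, for example:

WITH first_day AS (
  SELECT employee_id, MIN(start_date) AS min_start_date
  FROM salary
  GROUP BY employee_id
)
SELECT s.employee_id, f.min_start_date, s.salary
FROM salary s
JOIN first_day f
  ON f.employee_id = s.employee_id AND f.min_start_date = s.start_date;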



Example datasets for Amazon RedShift

Last year, I imported two datasets into Hive. Now I will load these two datasets into Amazon Redshift instead.
After creating a Redshift cluster in my VPC, I couldn’t connect to it even with an Elastic IP. Then I compared the parameters of my VPC with those of AWS’s default VPC and eventually saw the vital differences. First, set the “Network ACL” in the “VPC” service of AWS:




Then, add a rule to the “Route Table” that lets nodes access anywhere (0.0.0.0/0) through the “Internet Gateway” (also created in the “VPC” service):



Now I could connect to my RedShift cluster.

Create an S3 bucket with the AWS CLI:
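(the bucket name is a placeholder)

aws s3 mb s3://my-redshift-example-data --region us-west-2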

Upload the two CSV files into the bucket:
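(the file names are placeholders for the employee and salary datasets)

aws s3 cp employee.csv s3://my-redshift-example-data/
aws s3 cp salary.csv s3://my-redshift-example-data/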

Create tables in Redshift by using SQL-Bench:
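Roughly like this (the column types, and the name column, are my guesses for these datasets):

CREATE TABLE employee (
  employee_id INTEGER,
  name        VARCHAR(64)
);

CREATE TABLE salary (
  employee_id INTEGER,
  salary      INTEGER,
  start_date  DATE
);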

Don’t put a blank space or tab (‘\t’) before column names when creating the table, or else Redshift will treat the column names as
”     employee_id”
”     salary”

Load data from S3 into Redshift with COPY, the powerful ETL tool in AWS.
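A sketch (bucket, file names and role are placeholders):

COPY employee
FROM 's3://my-redshift-example-data/employee.csv'
IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
DELIMITER ',';

COPY salary
FROM 's3://my-redshift-example-data/salary.csv'
IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
DELIMITER ',';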

We could see the success report like this:

There are “Warnings” but also “successfully”, which is a little weird. But don’t worry, it’s OK for SQL-Bench.

Now we can run the script that was written last year (but we need to change ‘==’ to ‘=’ because of a compatibility problem):

The result is



Enable audit log for AWS Redshift

When I was trying to enable the audit log for AWS Redshift, I chose to use an existing bucket in S3. But it reported an error:

"Cannot read ACLs of bucket redshift-robin. Please ensure that your IAM permissions are set up correctly."
"Service: AmazonRedshift; Status Code: 400; Error Code: InsufficientS3BucketPolicyFault ...."




According to this document, I needed to change the permissions of the bucket "redshift-robin". So I entered the AWS Console of S3, clicked the bucket name "redshift-robin" in the left panel, and saw the description of its permissions:



Press "Add Bucket Policy", and in the pop-up window, press "AWS Policy Generator". Here comes the generator, which is easy to use for creating policies.
Add two policies for "redshift-robin":


The "902366379725" is the Amazon Redshift system account ID for the us-west-2 region (Oregon), which needs permission to write the audit logs.
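The two statements allow that account to read the bucket ACL and to put log objects. The generated JSON looked roughly like this (the principal ARN follows the AWS documentation for us-west-2 and may differ in other regions):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::902366379725:user/logs" },
      "Action": "s3:GetBucketAcl",
      "Resource": "arn:aws:s3:::redshift-robin"
    },
    {
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::902366379725:user/logs" },
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::redshift-robin/*"
    }
  ]
}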

Click "Generate Policy", and copy the generated JSON to "Bucket Policy Editor":



Press "Save". Now we can enable the audit log of Redshift for the bucket "redshift-robin":


Read paper “iShuffle: Improving Hadoop Performance with Shuffle-on-Write”

Paper reference: iShuffle: Improving Hadoop Performance with Shuffle-on-Write

Background:
A job in Hadoop consists of three main stages: map, shuffle, and reduce (actually the shuffle stage is contained within the reduce stage).


What is the problem?
The shuffle phase needs to migrate a large amount of data from the nodes running map tasks to the nodes that will run reduce tasks. This causes shuffle latency, which is usually significant. The reasons are:

  • Partitioning skew: Hadoop uses a hash algorithm to organize the output data of map tasks; if too many keys fall into the same hash bucket, the partitions end up with uneven sizes
  • Coupling of shuffle and reduce: data shuffling can’t be overlapped with map tasks


Solution: iShuffle

    • “Shuffler”: collects the intermediate data generated by every map task and predicts the size of each partition
    • “Shuffler Manager”: collects information from the “Shufflers” and decides the placement of partitions


    • Shuffle-on-Write: while a map task writes a spill to the local filesystem, it will (through a modification of the Hadoop code) also write the spill to the corresponding node where the reduce task will be launched
    • Automated map output placement: iShuffle decides the placement of every partition by “map selectivity”, which is the ratio between map input size and map output size. After predicting the “map selectivity” and knowing the total input size, iShuffle can choose the best node for every partition’s data


  • Flexible reduce scheduling: when a node requests a reduce task, the Task Manager (a modification of Hadoop’s FIFO scheduler) finds the list of partitions residing on that node and launches reduce tasks only for those partitions (this makes sure a reduce task only reads shuffled data from the local filesystem, which reduces network bandwidth in the reduce stage)

In my opinion
Using prediction to proactively move map output to suitable nodes, which avoids partition skew, is the most intelligent part of this paper. This technique could also be applied to other intermediate-data-movement scenarios, such as OLAP in a data warehouse.
But I also suspect that in real production not many organizations will use iShuffle, as they usually run multi-user applications on their Hadoop clusters. When a lot of jobs run in one Hadoop cluster simultaneously, a dip in CPU usage caused by the long reduce latency of one job will be compensated by other compute-intensive jobs. Therefore, from the view of all users, no hardware resources are wasted.

Build dataflow to get monthly top price of Land Trading in UK

The dataset is downloaded from the UK government data website (the total data size is more than 3 GB). I am using Apache Oozie to run the Hive and Sqoop jobs periodically.

The Hive script “land_price.hql”:
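A rough sketch of it (the real dataset has more columns; the column names, table names and quote handling here are my own simplification):

-- land_price.hql (sketch)
SET mapred.job.queue.name=root.default;

-- Raw CSV loaded as strings; every field in the file is wrapped in double quotes
CREATE TABLE IF NOT EXISTS land_price_raw (
  trans_id      STRING,
  price         STRING,
  transfer_date STRING,
  postcode      STRING,
  property_type STRING,
  town          STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

CREATE TABLE IF NOT EXISTS land_price_month (
  trade_month STRING,
  town        STRING,
  top_price   BIGINT
);

-- Monthly top price per town; SUBSTR() strips the surrounding quote characters
INSERT OVERWRITE TABLE land_price_month
SELECT SUBSTR(transfer_date, 2, 7)                              AS trade_month,
       SUBSTR(town, 2, LENGTH(town) - 2)                        AS town,
       MAX(CAST(SUBSTR(price, 2, LENGTH(price) - 2) AS BIGINT)) AS top_price
FROM land_price_raw
GROUP BY SUBSTR(transfer_date, 2, 7), SUBSTR(town, 2, LENGTH(town) - 2);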

We want the Hive job to run on the queue “root.default” in YARN (and other jobs on “root.mr”), so we set “mapred.job.queue.name” to “root.default”.

Remember to use SUBSTR() in Hive to strip the quote character ‘"’ when importing data from the raw CSV file.

The “coordinator.xml” for Apache Oozie:
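A minimal sketch (frequency, dates and the application path are placeholders):

<coordinator-app name="land-price-coord" frequency="${coord:days(1)}"
                 start="2017-01-01T00:00Z" end="2018-01-01T00:00Z" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">
  <action>
    <workflow>
      <app-path>${nameNode}/user/oozie/land_price/workflow.xml</app-path>
    </workflow>
  </action>
</coordinator-app>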

The “workflow.xml” for Apache Oozie:
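A sketch of its shape: a fork running the Hive job and TeraSort in parallel, then a Sqoop export to MySQL (paths, hosts and table names are placeholders):

<workflow-app name="land-price-wf" xmlns="uri:oozie:workflow:0.4">
  <start to="fork-jobs"/>

  <fork name="fork-jobs">
    <path start="hive-land-price"/>
    <path start="terasort"/>
  </fork>

  <action name="hive-land-price">
    <hive xmlns="uri:oozie:hive-action:0.2">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapred.job.queue.name</name>
          <value>root.default</value>
        </property>
      </configuration>
      <script>land_price.hql</script>
    </hive>
    <ok to="join-jobs"/>
    <error to="fail"/>
  </action>

  <action name="terasort">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <main-class>org.apache.hadoop.examples.terasort.TeraSort</main-class>
      <arg>/user/test/terasort-input</arg>
      <arg>/user/test/terasort-output</arg>
    </java>
    <ok to="join-jobs"/>
    <error to="fail"/>
  </action>

  <join name="join-jobs" to="sqoop-export"/>

  <action name="sqoop-export">
    <sqoop xmlns="uri:oozie:sqoop-action:0.2">
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <command>export --connect jdbc:mysql://mysql-host/land --username land --password-file /user/oozie/mysql.password --table land_price_month --export-dir /user/hive/warehouse/land_price_month</command>
    </sqoop>
    <ok to="end"/>
    <error to="fail"/>
  </action>

  <kill name="fail">
    <message>Workflow failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
  </kill>
  <end name="end"/>
</workflow-app>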

We run two jobs parallelly here: Hive and TeraSort (TeraSort is not useful in real productive environment, but it could be a good substitute for real private job in my company).

The sqoop once report error “javax.xml.parsers.ParserConfigurationException: Feature ‘http://apache.org/xml/features/xinclude’ is not recognized”.
The solution is change file “/usr/lib/hadoop/bin/hadoop” like:

“job.properties” for Oozie:
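Roughly like this (hosts and paths are placeholders):

nameNode=hdfs://namenode-host:8020
jobTracker=resourcemanager-host:8032
queueName=root.mr
oozie.use.system.libpath=true
oozie.coord.application.path=${nameNode}/user/oozie/land_price/coordinator.xml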

Remember to set “oozie.use.system.libpath=true” so that Oozie can run the Hive and Sqoop jobs correctly.

The script to create the MySQL table:
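A sketch matching the columns of the Hive result table above:

CREATE TABLE land_price_month (
  trade_month VARCHAR(7),
  town        VARCHAR(64),
  top_price   BIGINT
);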

After launching the Oozie coordinator, it will eventually put the resulting data into the MySQL table:



Looks like the land price in “WOKINGHAM” in October 2015 was extremely high.

Some tips about using Apache Flume

Question 1: The Flume process reports “Expected timestamp in the Flume event headers, but it was null”
Solution 1: The Flume process expects to receive events with a timestamp, but these events don’t have one. To send plain text events to Flume, we need to tell it to generate a timestamp for every event by itself. Put the lines below into the configuration:
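(assuming the agent is “a1” and the source is “r1”, as in my configuration)

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp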

Question 2: The HDFS sink generates a tremendous number of small files at high frequency even though we have set “a1.sinks.k2.hdfs.rollInterval=600”
Solution 2: We still need to set “rollCount” and “rollSize”, as Flume will roll the file as soon as any of the “rollInterval”, “rollCount”, or “rollSize” conditions is fulfilled.
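For example, disabling the other two triggers so that only the time-based roll applies:

a1.sinks.k2.hdfs.rollInterval = 600
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollSize = 0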

Question 3: The Flume process exits and reports “Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: GC overhead limit exceeded”
Solution 3: Simply add JAVA_OPTS="-Xms12g -Xmx12g" (my server has more than 16 GB of physical memory) into “/usr/lib/flume-ng/bin/flume-ng”

—— My configuration file for Flume ——
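I no longer have the exact file, so here is a sketch along the same lines (the source type, port and HDFS path are placeholders; the agent, sink and interceptor names match the snippets above):

a1.sources = r1
a1.channels = c1
a1.sinks = k2

# Source: plain text lines over a TCP port (placeholder choice)
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000

# HDFS sink, rolling a new file every 10 minutes
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /flume/events/%Y%m%d
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.rollInterval = 600
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollSize = 0

# Wiring
a1.sources.r1.channels = c1
a1.sinks.k2.channel = c1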

The startup command for Cloudera Environment:
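(the config directory and file path are the CDH defaults on my machines and may differ for you)

flume-ng agent --name a1 --conf /etc/flume-ng/conf --conf-file /etc/flume-ng/conf/flume.conf -Dflume.root.logger=INFO,console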

Use Oozie to run terasort

The better choice of “Action” for running the TeraSort test case in Oozie is a “Java Action” instead of a “Mapreduce Action”, because TeraSort needs to sample its input and write a partition file first, and then load that ‘partitionFile’ with “TotalOrderPartitioner”. It’s not a simple MapReduce job that needs merely a few properties.

The directory of this “TerasortApp”, which uses the “Java Action” of Oozie, looks like:
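(the jar name is a placeholder for whichever jar contains the TeraSort classes)

TerasortApp/
  workflow.xml
  lib/
    hadoop-mapreduce-examples.jar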

The core of this App is “workflow.xml”:
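A sketch of it (input/output paths and the split size are placeholders); the Java Action calls the TeraSort driver class directly:

<workflow-app name="terasort-app" xmlns="uri:oozie:workflow:0.4">
  <start to="terasort"/>

  <action name="terasort">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapreduce.input.fileinputformat.split.minsize</name>
          <value>1073741824</value>
        </property>
      </configuration>
      <main-class>org.apache.hadoop.examples.terasort.TeraSort</main-class>
      <arg>/user/test/terasort-input</arg>
      <arg>/user/test/terasort-output</arg>
    </java>
    <ok to="end"/>
    <error to="fail"/>
  </action>

  <kill name="fail">
    <message>TeraSort failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
  </kill>
  <end name="end"/>
</workflow-app>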

Note 1. In a Cloudera environment, the Web UI will fail at the last step of creating the sharelib for the Oozie service. To fix this problem:

Note 2. We can’t use the ‘mapred.map.tasks’ property to change the number of mappers in TeraSort, because the number is actually decided by the ‘TotalOrderPartitioner’ class. Therefore I use the ‘mapreduce.input.fileinputformat.split.minsize’ property to limit the number of mappers.