Using Spark SQL to convert a CSV file to Parquet

After downloading data from the “Food and Agriculture Organization of the United Nations”, I got many CSV files. One of the files is named “Trade_Crops_Livestock_E_All_Data_(Normalized).csv”, and it looks like this:

To load this CSV file into Spark and dump it to Parquet format, I wrote the following code:
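Roughly, the job looks like this (the column list below is only a placeholder for the real header, and the naive comma split skips proper CSV quoting; spark.createDataFrame is the call that needs the “spark-sql” dependency mentioned below):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CsvToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CsvToParquet").getOrCreate()

    // Placeholder schema: the real file has more columns than these.
    val schema = StructType(Seq(
      StructField("area", StringType),
      StructField("item", StringType),
      StructField("year", StringType),
      StructField("value", StringType)))

    val rows = spark.sparkContext
      .textFile("Trade_Crops_Livestock_E_All_Data_(Normalized).csv")
      .map(_.split(","))                      // naive split, no quote handling
      .map(f => Row(f(0), f(1), f(2), f(3)))  // header row is not filtered here

    // createDataFrame comes from the spark-sql module.
    val df = spark.createDataFrame(rows, schema)
    df.write.parquet("trade_crops_livestock.parquet")

    spark.stop()
  }
}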

The build.sbt is
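Something along these lines (the Scala and Spark versions here are only examples):

name := "csv-to-parquet"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.1.0" % "provided"
)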

Always remember to add the dependency for “spark-sql”, or else it will report “createDataFrame() is not a member of spark”.
And finally, the submit script is:
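Something like the following, assuming the class and jar names from the sketch above (the jar path and the master URL depend on how the job is built and deployed):

spark-submit \
  --master yarn \
  --class CsvToParquet \
  target/scala-2.11/csv-to-parquet_2.11-0.1.jar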

Importance of function’s return Type in Scala

The Scala code is below:
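The original listing is not shown here, but a minimal version that reproduces the problem looks like this (the base class name ‘Person’ is an assumption; ‘Student’, ‘call’, and “robin” come from the text):

class Person {
  // No '=' and no return type: Scala's procedure syntax, so the result type is Unit.
  def call(name: String) { name }
}

class Student extends Person {
  override def call(name: String) { name }
}

object Main extends App {
  println(new Student().call("robin"))
}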

But the output of this piece of code is:

Nothing but a pair of parentheses: “()”. What happened? Why doesn’t Scala use the ‘call’ function in the subclass ‘Student’?
The answer is: we forgot to declare the return type of the function ‘call’, so its return type defaults to ‘Unit’. The only value of ‘Unit’ is ‘()’. Hence, no matter what value we give to ‘call’, it only returns ‘()’.
We just need to add the return type:
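The same sketch as above, with the return type declared:

class Person {
  def call(name: String): String = name
}

class Student extends Person {
  override def call(name: String): String = name
}

object Main extends App {
  println(new Student().call("robin"))
}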

It prints “robin” now.

Data Join in AWS Redshift

In the “Amazon Redshift Database Developer Guide”, there is an explanation of data joins:
“HASH JOIN and HASH are used when joining tables where the join columns are not both distribution keys and sort keys.
MERGE JOIN is used when joining tables where the join columns are both distribution keys and sort keys, and when less than 20 percent of the joining tables are unsorted.”

Let’s take ‘salary’ and ‘employee’ for example.

Firstly, we EXPLAIN the join of ‘salary’ and ‘employee’, and it shows “Hash Join”:
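The query is not preserved here, but it had roughly this shape (assuming employee_id is the join column):

EXPLAIN
SELECT e.employee_id, s.salary
FROM employee e
JOIN salary s ON s.employee_id = e.employee_id;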



Then we create two new tables:
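Hypothetical definitions; the important part is that the join column, employee_id, is declared as both DISTKEY and SORTKEY in both tables:

CREATE TABLE employee_dist (
  employee_id INTEGER,
  name        VARCHAR(64)
)
DISTKEY(employee_id)
SORTKEY(employee_id);

CREATE TABLE salary_dist (
  employee_id INTEGER,
  start_date  DATE,
  salary      INTEGER
)
DISTKEY(employee_id)
SORTKEY(employee_id);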

Now the join column is both the distkey and the sortkey, hence EXPLAIN shows “Merge Join”:
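Re-running the same kind of query against the new (hypothetical) tables:

EXPLAIN
SELECT e.employee_id, s.salary
FROM employee_dist e
JOIN salary_dist s ON s.employee_id = e.employee_id;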



Some tips from the “Amazon Redshift Database Developer Guide”

Show diststyle of tables

Details about distribution styles: http://docs.aws.amazon.com/redshift/latest/dg/viewing-distribution-styles.html
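For a quick look, SVV_TABLE_INFO also reports the distribution style of each table:

SELECT "schema", "table", diststyle
FROM svv_table_info;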

How to COPY multiple files into Redshift from S3
http://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html
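A COPY with a key prefix loads every object under that prefix in one command (bucket, prefix, and role below are placeholders; a manifest file is the other option):

COPY salary
FROM 's3://mybucket/data/salary_part_'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftCopyRole'
CSV;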

You can GROUP BY (or ORDER BY) column number instead of column name
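For example (1 and 2 refer to positions in the SELECT list):

SELECT employee_id, SUM(salary)
FROM salary
GROUP BY 1
ORDER BY 2 DESC;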

Change current environment in SQL Editor

Primary key and foreign key
Amazon Redshift does not enforce primary key and foreign key constraints, but the query optimizer uses them when it generates query plans. If you set primary keys and foreign keys, your application must maintain the validity of the keys. 


Distribution info in EXPLAIN
DS_DIST_NONE
No redistribution is required, because corresponding slices are collocated on the compute nodes. You will typically have only one DS_DIST_NONE step, the join between the fact table and one dimension table.
DS_DIST_ALL_NONE
No redistribution is required, because the inner join table used DISTSTYLE ALL. The entire table is located on every node.
DS_DIST_INNER
The inner table is redistributed.
DS_DIST_OUTER
The outer table is redistributed.
DS_BCAST_INNER
A copy of the entire inner table is broadcast to all the compute nodes.
DS_DIST_ALL_INNER
The entire inner table is redistributed to a single slice because the outer table uses DISTSTYLE ALL.
DS_DIST_BOTH
Both tables are redistributed.

Create Like
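For example, cloning the column definitions of an existing table:

CREATE TABLE salary_backup (LIKE salary);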

Interleaved skew

The value for interleaved_skew is a ratio that indicates the amount of skew. A value of 1 means there is no skew. If the skew is greater than 1.4, a VACUUM REINDEX will usually improve performance unless the skew is inherent in the underlying set.

About interleaved sort key: http://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html#t_Sorting_data-interleaved
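The skew itself can be read from SVV_INTERLEAVED_COLUMNS, e.g.:

SELECT tbl, col, interleaved_skew, last_reindex
FROM svv_interleaved_columns;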

Concurrent write
Concurrent write operations are supported in Amazon Redshift in a protective way, using write locks on tables and the principle of serializable isolation.


UNLOAD
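A basic UNLOAD to S3 looks like this (bucket, prefix, and role are placeholders):

UNLOAD ('SELECT * FROM salary')
TO 's3://mybucket/unload/salary_'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftUnloadRole';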

Redshift UDF
In addition to the Python Standard Library, the following modules are part of the Amazon Redshift implementation:
* numpy 1.8.2 

* pandas 0.14.1 

* python-dateutil 2.2 

* pytz 2015.7 

* scipy 0.12.1 

* six 1.3.0 

* wsgiref 0.1.2
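A small usage sketch of a Python UDF (the function name and formula are just an illustration):

CREATE OR REPLACE FUNCTION f_hypotenuse (a float, b float)
RETURNS float
STABLE
AS $$
    import math
    return math.sqrt(a * a + b * b)
$$ LANGUAGE plpythonu;

SELECT f_hypotenuse(3, 4);  -- 5.0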

Data Join
* Nested Loop: The least optimal join, a nested loop is used mainly for cross-joins (Cartesian products) and some inequality joins.

* Hash Join and Hash: Typically faster than a nested loop join, a hash join and hash are used for inner joins and left and right outer joins. These operators are used when joining tables where the join columns are not both distribution keys and sort keys. The hash operator creates the hash table for the inner table in the join; the hash join operator reads the outer table, hashes the joining column, and finds matches in the inner hash table.

* Merge Join: Typically the fastest join, a merge join is used for inner joins and outer joins. The merge join is not used for full joins. This operator is used when joining tables where the join columns are both distribution keys and sort keys, and when less than 20 percent of the joining tables are unsorted. It reads two sorted tables in order and finds the matching rows. To view the percent of unsorted rows, query the SVV_TABLE_INFO system table.


wlm_query_slot_count
You can temporarily override the amount of memory assigned to a query by setting the wlm_query_slot_count parameter to specify the number of slots allocated to the query. 
By default, WLM queues have a concurrency level of 5.
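For example, giving a heavy statement more memory for the current session:

SET wlm_query_slot_count TO 3;  -- use 3 of the queue's slots
VACUUM salary;                  -- example of a memory-hungry statement
RESET wlm_query_slot_count;     -- back to the default of 1 slot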


VARCHAR
A VARCHAR(12) column can contain 12 single-byte characters, 6 two-byte characters, 4 three-byte characters, or 3 four-byte characters.


Tuple in Redshift SQL

SIMILAR TO
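SIMILAR TO mixes LIKE-style wildcards with regex-like alternation, e.g. (the ‘name’ column is hypothetical):

SELECT name
FROM employee
WHERE name SIMILAR TO '%(robin|alice)%';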

analyze_threshold_percent
To reduce processing time and improve overall system performance, Amazon Redshift skips analyzing a table if the percentage of rows that have changed since the last ANALYZE command run is lower than the analyze threshold specified by the analyze_threshold_percent parameter. By default, analyze_threshold_percent is 10.
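To force an ANALYZE regardless of how little has changed:

SET analyze_threshold_percent TO 0;
ANALYZE salary;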

COPY from DynamoDB
Setting READRATIO to 100 or higher will enable Amazon Redshift to consume the entirety of the DynamoDB table’s provisioned throughput, which will seriously degrade the performance of concurrent read operations against the same table during the COPY session. Write traffic will be unaffected.
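A sketch of such a COPY (table name, role, and ratio are placeholders):

COPY salary
FROM 'dynamodb://SalaryTable'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftDynamoRole'
READRATIO 50;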

Different databases in Redshift
After you have created the TICKIT database, you can connect to the new database from your SQL client. Use the same connection parameters as you used for your current connection, but change the database name to tickit.

Interleaved Sort Key
A maximum of eight columns can be specified for an interleaved sort key.
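For example:

CREATE TABLE salary_interleaved (
  employee_id INTEGER,
  start_date  DATE,
  salary      INTEGER
)
INTERLEAVED SORTKEY (employee_id, start_date);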

Concatenate in SQL
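The '||' operator concatenates strings (CONCAT() also exists but takes exactly two arguments):

SELECT 'employee-' || CAST(employee_id AS VARCHAR) AS tag
FROM employee;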

INSERT INTO from SELECT
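For example, filling the table created with LIKE above:

INSERT INTO salary_backup
SELECT * FROM salary
WHERE start_date >= '2016-01-01';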

Prepare and execute PLAN
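A sketch of a prepared statement (the plan name and parameter are arbitrary):

PREPARE salary_by_id (INT) AS
  SELECT * FROM salary WHERE employee_id = $1;

EXECUTE salary_by_id (100);

DEALLOCATE salary_by_id;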

Powerful ‘WITH’ for sub-query in SQL
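For example, computing each employee's first-year salary (column names as assumed elsewhere in this blog):

WITH first_year AS (
  SELECT employee_id, MIN(start_date) AS min_start_date
  FROM salary
  GROUP BY employee_id
)
SELECT s.employee_id, s.salary
FROM salary s
JOIN first_year f
  ON s.employee_id = f.employee_id
 AND s.start_date = f.min_start_date;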

UNLOAD with compression
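Same as a plain UNLOAD, with GZIP added (placeholders as before):

UNLOAD ('SELECT * FROM salary')
TO 's3://mybucket/unload/salary_'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftUnloadRole'
GZIP;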

VACUUM
VACUUM FULL salary TO 100 PERCENT;

‘OVER’ in SQL
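For example, ranking each employee's salary records with a window function:

SELECT employee_id, start_date, salary,
       RANK() OVER (PARTITION BY employee_id ORDER BY salary DESC) AS salary_rank
FROM salary;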

Show and set current settings
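For example (myschema is a placeholder):

SHOW search_path;                     -- show one setting
SET search_path TO myschema, public;  -- change it for the current session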

Show the blocks (1 MB each) allocated to each column of the ‘salary’ table
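Each row in STV_BLOCKLIST represents one 1 MB block:

SELECT col, COUNT(*) AS blocks
FROM stv_blocklist b, stv_tbl_perm p
WHERE b.tbl = p.id
  AND b.slice = p.slice
  AND p.name = 'salary'
GROUP BY col
ORDER BY col;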

Slice and Col
slice: Node slice
col: Every table you create has three hidden columns appended to it: INSERT_XID, DELETE_XID, ROW_ID

In STV_SLICES, we can see the relation between slices and nodes. A single node has two slices: 0 and 1.
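For example:

SELECT node, slice FROM stv_slices ORDER BY node, slice;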

Commonly used tables for meta information
pg_table_def
svv_table_info

Read paper “Large-Scale Machine Learning with Stochastic Gradient Descent”

Paper reference: Large-Scale Machine Learning with Stochastic Gradient Descent


GD

GD (Gradient Descent) is used for computing the weights of a NN (and for other machine learning algorithms as well). z_i represents example ‘i’, i.e., (x_i, y_i). After going through all the examples, we compute the average of the gradients with respect to the weights. Going through all the examples is a slow process, so we can imagine that GD is not efficient enough.
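The update rule, in the paper's notation (with learning rate \gamma and per-example loss Q(z, w)), is:

w_{t+1} = w_t - \gamma \, \frac{1}{n} \sum_{i=1}^{n} \nabla_w Q(z_i, w_t)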


SGD

Here comes SGD, which uses only one example per step to compute the gradient. It is simpler and more efficient.
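Its update rule uses a single randomly drawn example z_t per step:

w_{t+1} = w_t - \gamma_t \, \nabla_w Q(z_t, w_t)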


k-means

Using SGD in the k-means clustering algorithm seemed counterintuitive to me at first glance. But after thinking of it as “sample z_i belongs to the cluster of w_k, so don’t wait for all the samples, just update w_k with z_i”, it becomes conceivable.
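In the paper's k-means example, the centroid w_k nearest to the new sample is nudged toward it (n_k being the count of samples assigned to cluster k so far):

w_k \leftarrow w_k + \frac{1}{n_k} \, (z_i - w_k)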


ASGD

ASGD is suitable for a distributed machine learning environment, since it can get an averaged result from any example of the data at any time (no ordering restraint).
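Concretely, ASGD in the paper runs the ordinary SGD updates and additionally reports the average of the iterates:

\bar{w}_t = \frac{1}{t - t_0} \sum_{i = t_0 + 1}^{t} w_i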

Data Preprocessing in Tableau

In a previous article, I created two tables in my Redshift cluster. Now I want to find out the relation between every employee’s salary and their working age. Tableau is the best choice for visual data analysis (SAS is too expensive and has no trial version for learning).
First, we connect to Redshift in Tableau and double-click “New Custom SQL”. In the popup window, type in our SQL to query the first-year salary of every employee:
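The query had roughly this shape (the alias matches the min_start_date field used below):

SELECT employee_id, MIN(start_date) AS min_start_date
FROM salary
GROUP BY employee_id;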




Now we have the table “Custom SQL Query”. Drag in the table “salary” and choose “inner join” on employee_id and start_date:



Click into “Sheet 1”. Drag “salary” to “Rows”, “min_start_date” to “Columns”, and “employee_id” to “Color” in the “Marks” panel.



Now we can see the “expensive employees” (those with the highest salary among employees sharing the same first year) at the top of the graph:



Instead of adding custom SQL in the Tableau data source panel, we can also create a view in Redshift and let Tableau show the view under “Tables”.
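For example, a hypothetical view with the same logic as the custom SQL plus the join:

CREATE VIEW first_year_salary AS
SELECT s.employee_id, f.min_start_date, s.salary
FROM salary s
JOIN (
  SELECT employee_id, MIN(start_date) AS min_start_date
  FROM salary
  GROUP BY employee_id
) f
  ON s.employee_id = f.employee_id
 AND s.start_date = f.min_start_date;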

Or we can use a “WITH” clause:
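A sketch with the same assumed columns as above:

WITH first_year AS (
  SELECT employee_id, MIN(start_date) AS min_start_date
  FROM salary
  GROUP BY employee_id
)
SELECT s.employee_id, f.min_start_date, s.salary
FROM salary s
JOIN first_year f
  ON s.employee_id = f.employee_id
 AND s.start_date = f.min_start_date;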



Example datasets for Amazon RedShift

Last year, I imported two datasets into Hive. This time, I will load these two datasets into Amazon Redshift instead.
After creating a Redshift cluster in my VPC, I couldn’t connect to it even with an Elastic IP. Then I compared the parameters of my VPC with AWS’s default VPC and eventually saw the vital differences. First, set the “Network ACL” in the AWS “VPC” service:




Then, add a rule in the “Route Table” that lets the node access Anywhere (0.0.0.0/0) through an “Internet Gateway” (also created in the “VPC” service):



Now I could connect to my RedShift cluster.

Create an S3 bucket with the AWS CLI:
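For example (the bucket name and region are placeholders):

aws s3 mb s3://robin-redshift-data --region us-west-2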

Upload the two CSV files into the bucket:
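Assuming the two files are named employee.csv and salary.csv:

aws s3 cp employee.csv s3://robin-redshift-data/
aws s3 cp salary.csv s3://robin-redshift-data/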

Create tables in Redshift using SQL-Bench:
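The real column lists come from the two CSV files; hypothetically:

CREATE TABLE employee (
  employee_id INTEGER,
  name        VARCHAR(64)
);

CREATE TABLE salary (
  employee_id INTEGER,
  start_date  DATE,
  salary      INTEGER
);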

Don’t put a blank space or tab (‘\t’) before a column name when creating a table, or else Redshift will treat the column name as
”     employee_id”
”     salary”

Load data from S3 to Redshift with COPY, the powerful ETL tool in AWS.
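A sketch of the COPY commands (the bucket and role are placeholders; CREDENTIALS with access keys works as well):

COPY employee
FROM 's3://robin-redshift-data/employee.csv'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftCopyRole'
CSV;

COPY salary
FROM 's3://robin-redshift-data/salary.csv'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftCopyRole'
CSV;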

We can see a success report like this:

It reports “Warnings” but also “successfully”, which is a little weird. But don’t worry, that’s normal for SQL-Bench.

Now we can run the script that was written last year (but we need to change ‘==’ to ‘=’ for compatibility):

The result is



Enable audit log for AWS Redshift

When I was trying to enable the audit log for AWS Redshift, I chose to use an existing bucket in S3. But it reported an error:

"Cannot read ACLs of bucket redshift-robin. Please ensure that your IAM permissions are set up correctly."
"Service: AmazonRedshift; Status Code: 400; Error Code: InsufficientS3BucketPolicyFault ...."




According to this document, I needed to change the permissions of the bucket "redshift-robin". So I entered the S3 console, clicked the bucket name "redshift-robin" in the left panel, and saw the description of its permissions:



Press "Add Bucket Policy", and in the pop-up window, press "AWS Policy Generator". Here comes the generator, which makes it easy to create a policy.
Add two policies for "redshift-robin":


"902366379725" is the account ID that Redshift uses for logging in the us-west-2 region (Oregon).

Click "Generate Policy", and copy the generated JSON to "Bucket Policy Editor":
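The generated policy has roughly this shape (the exact principal path the generator produces for the logging account may differ):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::902366379725:user/logs" },
      "Action": "s3:GetBucketAcl",
      "Resource": "arn:aws:s3:::redshift-robin"
    },
    {
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::902366379725:user/logs" },
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::redshift-robin/*"
    }
  ]
}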



Press "Save". Now we can enable the Redshift audit log for the bucket "redshift-robin":


Read paper “In-Datacenter Performance Analysis of a Tensor Processing Unit”

Paper reference: In-Datacenter Performance Analysis of a Tensor Processing Unit

Application
Floating point (16-bit or 32-bit) is used for NN (Neural Network) training; then a step called quantization transforms the floating-point numbers into narrow integers (often just 8 bits), which are usually good enough for inference.
MLPs (Multi-Layer Perceptrons), CNNs (Convolutional Neural Networks), and RNNs (Recurrent Neural Networks): these three types of NN represent 95% of the NN inference workload in Google’s datacenters. Therefore, the TPU mainly focuses on them.



As we can see, CNNs are usually compute-dense NNs, which suit the TPU better.

TPU has 25 times as many MACs (Multiply and Accumulate) and 3.5 times as much on-chip memory as the K80 GPU.

Architecture
The TPU was designed to be a coprocessor on the PCIe I/O bus; it is closer to an FPU (floating-point unit) than to a GPU.



The parameters of the NN model (the weights) come from off-chip memory (8 GB DDR3 DRAM) into the Weight FIFO and then flow into the MMU (Matrix Multiply Unit). The request (the sample to run inference on) comes from PCIe into the Unified Buffer and finally also flows into the MMU.
Even the “Activation” and “Pooling” algorithms of CNNs have been fixed into hardware.

The MMU contains 256×256 MACs that can perform 8-bit multiply-and-adds on signed or unsigned integers.


Looking at this floor plan, we can imagine that the UB and the MMU might cost most of the TPU’s energy.

TPU instructions follow the CISC tradition; there are only about a dozen of them, including “Read_Host_Memory”, “Read_Weights”, “MatrixMultiply”, “Activate”, etc. Recalling how much code we need to write to implement an efficient activation function, we can imagine the speed of having a single “Activate” instruction in the TPU.
The paper says the TPU is a type of systolic array. But what is a systolic array? Here is an explanation: a systolic array is a network of processors that rhythmically compute and pass data through the system.

Performance
There are a lot of tables and diagrams showing the top-rate performance of the TPU. Although the TPU is fast, performance also depends on the compute density of the application. CNNs are the most compute-dense NNs, so they gain the most speed (in TeraOps per second) from the TPU:



The paper doesn’t really explain why the GPU is slower than the TPU at inference. The only sentence about this topic is in “8. Discussion”: “GPUs have traditionally been seen as high-throughput architectures that rely on high-bandwidth DRAM and thousands of threads to achieve their goals”. Actually, I don’t think this is a serious explanation.
The interesting thing is that after Google published this paper, the CEO of Nvidia, Jensen Huang, wrote a blog post to gently point out a fact: the state-of-the-art GPU (Tesla P40) can run inference faster than the TPU. The war between the giants of deep learning is just beginning.


Using antlr3 to generate C++ code

I need to parse SQL queries into C++ code in a project, so I have been learning ANTLR these days.
Let’s write a small sample file “Calc.g” for antlr3:

Then add “antlr-3.5.2-complete.jar” (running “mvn package” in the antlr3 source tree generates this jar) to the CLASSPATH and run:
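With the jar on the CLASSPATH, the invocation is roughly:

java -cp antlr-3.5.2-complete.jar org.antlr.Tool Calc.g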

It generates a number of code files: CalcLexer.[hpp/cpp], CalcParser.[hpp/cpp], and Calc.tokens. Now we can compile all the generated C++ code:
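Roughly like this, assuming a hand-written test driver main.cpp that instantiates CalcLexer and CalcParser:

g++ -I"$ANTLR3_SRC_PATH/runtime/Cpp/include" main.cpp CalcLexer.cpp CalcParser.cpp -o Test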

(“ANTLR3_SRC_PATH” is where the antlr3 source code is.)
But this step led to g++ compiler errors:

The reason is that ‘ID’ and ‘INT’ should be declared as “const CommonTokenType*” rather than “CommonTokenType*”. The fix has already been committed to antlr3 and is contained in the master branch of the git tree. Therefore, I checked out the master branch of antlr3 instead of the “3.5.2” tag, re-packaged the antlr3 jar, re-generated the code for “Calc.g”, and the compiler errors disappeared.
The target executable is “Test”; now we can use it to parse our “code”:

The result ’15’ is correct.