Using Spark-SQL to transfer CSV file to Parquet

After downloading data from “Food and Agriculture Organization of United Nations”, I get many CSV files. One of the file is named “Trade_Crops_Livestock_E_All_Data_(Normalized).csv” and it looks like:

To load this CSV file into Spark and dump it to Parquet format, I wrote these codes:

The build.sbt is

Always remember to add dependency for “spark-sql” or else it will report “createDataFrame() if not a member of spark”.
And finally, the submit script is:

Importance of function’s return Type in Scala

The Scala code is below:

But the output of this section of code is:

Nothing, just a pair of parenthesis. What happen? Why doesn’t Scala use the ‘call’ function in subclass ‘Student’?
The answer is: we forget to define the return Type of function ‘call’, so its default return Type is ‘Unit’. The value of ‘Unit’ is only ‘()’. Hence no matter what value we give to ‘call’, it only return ‘()’.
We just to add return Type:

It print “robin” now.

Data Join in AWS Redshift

In “Amazon Redshift Database Developer Guide“, there is an explanation for data join:
“HASH JOIN and HASH are used when joining tables where the join columns are not both distribution keys and sort keys.
MERGE JOIN is used when joining tables where the join columns are both distribution keys and sort keys, and when less than 20 percent of the joining tables are unsorted.”

Let’s take ‘salary’ and ’employee’ for example.

Firstly, we EXPLAIN the join of ‘salary’ and ’employee’, and it shows “Hash Join”:



Then we create two new tables:

Currently, the join column is both distkey and sortkey. Hence EXPLAIN shows “Merge Join”:



Some tips about “Amazon Redshift Database Developer Guide”

Show diststyle of tables

Details about distribution styles: http://docs.aws.amazon.com/redshift/latest/dg/viewing-distribution-styles.html

How to COPY multiple files into Redshift from S3
http://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html

Could “Group” (or “Order”) by number, not column name

Change current environment in SQL Editor

Primary key and foreign key
Amazon Redshift does not enforce primary key and foreign key constraints, but the query optimizer uses them when it generates query plans. If you set primary keys and foreign keys, your application must maintain the validity of the keys. 


Distribution info in EXPLAIN
DS_DIST_NONE
No redistribution is required, because corresponding slices are collocated on the compute nodes. You will typically have only one DS_DIST_NONE step, the join between the fact table and one dimension table.
DS_DIST_ALL_NONE
No redistribution is required, because the inner join table used DISTSTYLE ALL. The entire table is located on every node.
DS_DIST_INNER
The inner table is redistributed.
DS_DIST_OUTER
The outer table is redistributed.
DS_BCAST_INNER
A copy of the entire inner table is broadcast to all the compute nodes.
DS_DIST_ALL_INNER
The entire inner table is redistributed to a single slice because the outer table uses DISTSTYLE ALL.
DS_DIST_BOTH
Both tables are redistributed.

Create Like

Interleaved skew

The value for interleaved_skew is a ratio that indicates the amount of skew. A value of 1 means there is no skew. If the skew is greater than 1.4, a VACUUM REINDEX will usually improve performance unless the skew is inherent in the underlying set.

About interleaved sort key: http://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data.html#t_Sorting_data-interleaved

Concurrent write
Concurrent write operations are supported in Amazon Redshift in a protective way, using write locks
on tables and the principle of serializable isolation. 


UNLOAD

Redshift UDF
In addition to the Python Standard Library, the following modules are part of the Amazon Redshift implementation:
* numpy 1.8.2 

* pandas 0.14.1 

* python-dateutil 2.2 

* pytz 2015.7 

* scipy 0.12.1 

* six 1.3.0 

* wsgiref 0.1.2

Data Join
* Nested Loop
: The least optimal join, a nested loop is used mainly for cross-joins (Cartesian products) and some 
inequality joins. 

* Hash Join and Hash 
Typically faster than a nested loop join, a hash join and hash are used for inner joins and left and
right outer joins. These operators are used when joining tables where the join columns are not both distribution keys and sort keys. The hash operator creates the hash table for the inner table in the join; the hash join operator reads the outer table, hashes the joining column, and finds matches in the inner hash table. 

* Merge Join 
Typically the fastest join, a merge join is used for inner joins and outer joins. The merge join is not used for full joins. This operator is used when joining tables where the join columns are both distribution keys and sort keys, and when less than 20 percent of the joining tables are unsorted. It reads two sorted tables in order and finds the matching rows. To view the percent of unsorted rows, query the SVV_TABLE_INFO (p. 786) system table. 


wlm_query_slot_count
You can temporarily override the amount of memory assigned to a query by setting the wlm_query_slot_count parameter to specify the number of slots allocated to the query. 
By default, WLM queues have a concurrency level of 5 


VARCHAR
A VARCHAR(12) column can contain 12 single-byte characters, 6 two-byte characters, 4 three- byte characters, or 3 four-byte characters. 


Tuple in Redshift SQL

SIMILAR TO

analyze_threshold_percent
To reduce processing time and improve overall system performance, Amazon Redshift skips analyzing a table if the percentage of rows that have changed since the last ANALYZE command run is lower
than the analyze threshold specified by the analyze_threshold_percent parameter. By default, analyze_threshold_percent is 10

COPY from DynamoDB
Setting READRATIO to 100 or higher will enable Amazon Redshift to consume the entirety of the DynamoDB table’s provisioned throughput, which will seriously degrade the performance of concurrent read operations against the same table during the COPY session. Write traffic will be unaffected.

Different databases in Redshift
After you have created the TICKIT database, you can connect to the new database from your SQL client. Use the same connection parameters as you used for your current connection, but change the database name to tickit.

Interleaved Sort Key
A maximum of eight columns can be specified for an interleaved sort key.

Concatenate in SQL

INSERT INTO from SELECT

Prepare and execute PLAN

Powerful ‘WITH’ for sub-query in SQL

UNLOAD with compression

VACCUM
VACUUM FULL salary TO 100 PERCENT;

‘OVER’ in SQL

Show and set current settings

Show blocks(1MB) allocated to each column in the ‘salary’ table

Slice and Col
slice: Node slice
col: Every table you create has three hidden columns appended to it: INSERT_XID, DELETE_XID, ROW_ID

In STV_SLICES , we can see relations between slice and node. Single node have two slices: 0 and 1

Common used tables for meta information
pg_table_def
svv_table_info