Using Spark-SQL to convert a CSV file to Parquet

After downloading data from the “Food and Agriculture Organization of the United Nations”, I got many CSV files. One of them is named “Trade_Crops_Livestock_E_All_Data_(Normalized).csv” and looks like this:

To load this CSV file into Spark and dump it in Parquet format, I wrote this code:
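A minimal sketch of this kind of conversion (assuming Spark 2.x with a SparkSession named “spark”; the hand-rolled CSV split does not handle quoted commas, and the column layout, class name, and output path are only guesses):

    import org.apache.spark.sql.SparkSession

    // Hypothetical record type: keeps only a few of the (assumed) columns
    case class TradeRecord(area: String, item: String, element: String,
                           year: String, unit: String, value: String)

    object CsvToParquet {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("CsvToParquet").getOrCreate()

        val lines = spark.sparkContext
          .textFile("Trade_Crops_Livestock_E_All_Data_(Normalized).csv")

        val records = lines
          .filter(!_.startsWith("Area Code"))   // skip the header line
          .map(_.split(","))
          .filter(_.length >= 10)
          .map(f => TradeRecord(f(1), f(3), f(5), f(7), f(8), f(9)))

        // this is the call that needs the "spark-sql" dependency mentioned below
        val df = spark.createDataFrame(records)
        df.write.parquet("trade_crops_livestock.parquet")

        spark.stop()
      }
    }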

The build.sbt is
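A sketch of what it might contain (the project name, Scala version, and Spark version are assumptions; note the spark-sql entry, which the next note refers to):

    name := "csv2parquet"

    version := "1.0"

    scalaVersion := "2.11.8"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-sql"  % "2.0.0" % "provided"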

Always remember to add the dependency for “spark-sql”, or else it will report “createDataFrame() is not a member of spark”.
And finally, the submit script is:
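A sketch of such a script, assuming YARN cluster mode and the hypothetical class and jar names from the snippets above:

    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --class CsvToParquet \
      target/scala-2.11/csv2parquet_2.11-1.0.jar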

Books I read in year 2016

Here comes the last day of 2016, and it is also time for me to review my harvest of knowledge, or rather, of books.

Frankly speaking, the book “The Hard Thing About Hard Things” literally frightened me and caused me to give up any idea of joining a startup company in China. Maybe this is the best outcome, for many startup companies failed at the end of this year and I fortunately avoided the tempest.

Diving deeper into the ocean of the “Hadoop Ecosystem”, or “Big Data”, I found that Spark is a really convenient and powerful framework (compared to MapReduce) which can implement complicated algorithms or data flows in a few lines of code. Surely, Scala is also a key element of Spark’s efficiency and concision.

Today, even an ordinary person could imagine a sci-fi story about how modern people would fight alien invaders. But what would happen if aliens had attacked the Earth in ancient times? What about the Medieval age? That is the premise of the funny and bold sci-fi novel “The High Crusade”. A group of medieval soldiers defeats the alien invaders and does even more: it occupies a frontier planet of a gigantic alien empire. It is really beyond my imagination 🙂

Problems running Hive-2.0.1 on Spark-1.6.2

When I launched Hive-2.0.1 on Spark-1.6.2, it reported errors:

After changing “spark.master” from “yarn-cluster” to “local” and adding “--hiveconf hive.root.logger=DEBUG,console” to the hive command, it printed out details like:

This article suggests replacing the fasterxml.jackson package with a newer version, but the problem remained the same even after I completed the replacement.
Then I found [HIVE-13301] in JIRA:


This explains everything clearly: Hive was actually using the jackson-databind-2.1.1 classes bundled in the calcite package instead of lib/jackson-databind-2.4.2.jar, so updating the latter had no effect.
Thus, we should remove the shaded jackson-databind-2.1.1 classes from calcite-avatica-1.5.0.jar:
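One way to do that (assuming the shaded classes sit under their original com/fasterxml/jackson path inside the jar) is to delete them with zip:

    # remove the bundled jackson-databind classes from the avatica jar
    zip -d calcite-avatica-1.5.0.jar 'com/fasterxml/jackson/*'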

Hive now uses lib/jackson-databind-2.4.2.jar and runs correctly.

Using Linear Regression to filter SMS spam messages on Spark

Using the sample from the “SMS Spam Collection v.1”, I wrote a simple program on Spark to classify normal and spam messages.
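A minimal sketch of the idea, using Spark MLlib’s RDD API with hashed term frequencies as features and a LinearRegressionWithSGD model (the HDFS path, feature count, label values — 0 for ham, 20 for spam, so that 10 is a natural midpoint — and iteration count are all assumptions):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.feature.HashingTF
    import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

    object SpamFilterApp {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("SpamFilter"))
        val hashingTF = new HashingTF(10000)

        // Each line of the collection is "<label>\t<message>", label is "ham" or "spam"
        val data = sc.textFile("hdfs:///tmp/SMSSpamCollection")
          .filter(_.contains("\t"))
          .map { line =>
            val Array(label, text) = line.split("\t", 2)
            val features = hashingTF.transform(text.split(" ").toSeq)
            LabeledPoint(if (label == "spam") 20.0 else 0.0, features)
          }
          .cache()

        val model = LinearRegressionWithSGD.train(data, 100)

        // print a few scores: larger values mean "more spam-like"
        data.take(10).foreach { p =>
          println(s"label=${p.label} score=${model.predict(p.features)}")
        }

        sc.stop()
      }
    }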

and the “build.sbt” file contains:
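A sketch of such a build file, assuming Spark 1.6.2 built against Scala 2.10, plus spark-mllib for the regression model:

    name := "spamfilter"

    version := "1.0"

    scalaVersion := "2.10.6"

    libraryDependencies += "org.apache.spark" %% "spark-core"  % "1.6.2" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.6.2" % "provided"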

After submitting the job to YARN:

We can retrieve the job’s log with:
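The usual way is “yarn logs” with the application id reported by spark-submit (the id below is a placeholder):

    yarn logs -applicationId <application_id> | less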

And the result is:

From now on, we can consider messages with a negative value as normal and those with a positive value as spam (or use 10 instead of 0 as the boundary).
This is just an example: the sample dataset is too small, so it can only filter obvious spam messages. To identify more spam, we need to add more features like ‘the topic of each message’, ‘the total number of words’, ‘the frequency of special words’, etc.

Why does my Spark job hang?

After running my small Spark machine-learning application, the job hangs and its Spark UI displays nothing for more than 5 minutes.
That is weird, and I see some logs in the YARN UI:

I don’t have any IP that looks like “110.75.x.x”. Why is the Spark job trying to connect to it?
After reviewing the code carefully, I found the problem:
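A hypothetical reconstruction of the kind of mistake described below (the path is a placeholder):

    // the NameNode address is missing, so "tmp" is parsed as the HDFS host name,
    // and trying to resolve/connect to that bogus host is presumably what produced
    // the connections to 110.75.x.x
    val data = sc.textFile("hdfs://tmp/SMSSpamCollection")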

It was me who forgot to add the IP to the HDFS URI. Thus, the correct code should be:
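Again only a sketch; the address below stands in for the real NameNode:

    // with the NameNode host and port in the URI, "/tmp/..." is treated as a path again
    val data = sc.textFile("hdfs://192.168.1.100:8020/tmp/SMSSpamCollection")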

Now the application runs correctly.

“java.io.IOException: failed to uncompress the chunk” in Apache Spark

After I ran spark-submit on my YARN cluster with Spark-1.6.2:

The job failed, and the log reported:

Somebody on the internet said this might be caused by a compatibility problem between Spark-1.6.2 and Snappy. Therefore I changed the compression algorithm from Snappy to lz4 in my spark-submit shell script, and this time everything went OK.
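For reference, the setting that switches Spark’s internal compression codec is spark.io.compression.codec, which can be passed to spark-submit as a --conf flag:

    --conf spark.io.compression.codec=lz4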

Terasort for Spark (part2 / 2)

In the previous article, we used Spark to sort a large dataset generated by Teragen. But it took much more time than the Hadoop MapReduce framework, so we are going to optimize it.

By looking at the Spark UI for profiling, we find that the “Shuffle” stage reads/writes too much data from/to the hard disk, which severely hurts performance.




In Hadoop’s “Terasort”, the TotalOrderPartitioner is used to map all the data into a large number of partitions by key range, so every “Reduce” task only needs to sort the data of one partition (almost no shuffle from other partitions is needed). This saves a lot of network bandwidth and CPU usage.

Therefore we could modify our Scala code to sort every partition locally:
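A minimal sketch of one way to express this in Spark — range-partition by key, then sort each partition locally with repartitionAndSortWithinPartitions (reading the records as text lines whose first 10 characters are the key, the paths, and the partition count are all assumptions):

    import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

    object TerasortApp {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("Terasort"))

        // read the teragen output; paths are placeholders
        val pairs = sc.textFile("hdfs:///tmp/terasort_in")
                      .map(line => (line.take(10), line.drop(10)))

        // RangePartitioner samples the keys so that each partition holds one
        // contiguous key range; repartitionAndSortWithinPartitions then sorts
        // each partition locally -- no global sort across partitions is needed
        val partitioner = new RangePartitioner(1000, pairs)
        val sorted = pairs.repartitionAndSortWithinPartitions(partitioner)

        sorted.map { case (k, v) => k + v }.saveAsTextFile("hdfs:///tmp/terasort_out")
        sc.stop()
      }
    }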

and the spark-submit should also be changed:

This time, the job took only 10 minutes to sort the data!

Screenshot from “Job Browser” of Hue:



Terasort for Spark (part1 / 2)

We can use Spark to sort all the data generated by Hadoop’s Teragen.

TerasortApp.scala
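A minimal sketch of a straightforward version that relies on a global sortByKey (paths and record parsing are assumptions):

    import org.apache.spark.{SparkConf, SparkContext}

    object TerasortApp {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("Terasort"))

        // read the teragen output; paths are placeholders
        val pairs = sc.textFile("hdfs:///tmp/terasort_in")
                      .map(line => (line.take(10), line.drop(10)))

        // a plain global sortByKey: all the data is shuffled between partitions
        pairs.sortByKey()
             .map { case (k, v) => k + v }
             .saveAsTextFile("hdfs:///tmp/terasort_out")

        sc.stop()
      }
    }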

build.sbt

After building the jar file, we can submit it to Spark (I run Spark in yarn-cluster mode):

It took 17 minutes to complete the task, while the “terasort” tool from Hadoop took only 8 minutes to sort all the data. The reason is that I didn’t use a TotalOrderPartitioner, so Spark has to shuffle all the data between different partitions (and therefore between different servers), which costs a lot of network resources and delays the progress.

Remember to use Scala 2.10 to build apps for Spark-1.6.x, otherwise Spark will report an error like:

Deploy Hive on Spark

The MapReduce framework is too slow for real-time analytic queries, so we need to change the execution engine of Hive from “mr” to “spark” (link):

1. Set the environment for Spark:
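A sketch of this step, assuming Spark is installed under /opt/spark and that (as the Hive-on-Spark guide suggests for these versions) the spark-assembly jar is made visible to Hive:

    export SPARK_HOME=/opt/spark
    export PATH=$SPARK_HOME/bin:$PATH
    # make the Spark classes visible to Hive
    ln -s $SPARK_HOME/lib/spark-assembly-*.jar $HIVE_HOME/lib/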

2. Copy the configuration XML file for Hive:
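Presumably something along these lines, creating hive-site.xml from the bundled template:

    cp $HIVE_HOME/conf/hive-default.xml.template $HIVE_HOME/conf/hive-site.xml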

and change these configuration items:
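At a minimum the engine switch itself belongs in hive-site.xml, and the Spark master is commonly set there too (the yarn-cluster value is an assumption):

    <property>
      <name>hive.execution.engine</name>
      <value>spark</value>
    </property>
    <property>
      <name>spark.master</name>
      <value>yarn-cluster</value>
    </property>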

Notice: remember to replace every “${system:java.io.tmpdir}/${system:user.name}” in hive-site.xml with “/tmp/my/” (link)