Using Spark SQL to convert a CSV file to Parquet
After downloading data from the “Food and Agriculture Organization of the United Nations”, I got many CSV files. One of the files is named “Trade_Crops_Livestock_E_All_Data_(Normalized).csv” and it looks like this:
Area Code,Area,Item Code,Item,Element Code,Element,Year Code,Year,Unit,Value,Flag
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1961","1961","tonnes","0.000000",""
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1962","1962","tonnes","0.000000",""
"2","Afghanistan","231","Almonds shelled","5910","Export Quantity","1963","1963","tonnes","0.000000",""
......
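Every field in this dump is wrapped in double quotes, so instead of pulling in a real CSV parser I simply split each record on a double quote followed by a comma and strip the leftover quotes afterwards; that is what the scrub() and toTrade() helpers in the code below do. Here is a standalone sketch of the idea on one sample row (no Spark needed, the object name is just for illustration):

object SplitDemo {
  def main(args: Array[String]): Unit = {
    // Rebuild the first data row from the sample above
    val line = Seq("2", "Afghanistan", "231", "Almonds shelled", "5910",
                   "Export Quantity", "1961", "1961", "tonnes", "0.000000", "")
                 .map(f => "\"" + f + "\"").mkString(",")

    // Splitting on the quote-comma sequence yields 11 tokens, each still carrying a leading quote
    val fields = line.split("\",")
    println(fields.length)                                   // 11
    println(fields.map(_.replace("\"", "")).mkString("|"))   // the unquoted field values
  }
}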
To load this CSV file into Spark and dump it to Parquet format, I wrote the following code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd._

/* Area Code,Area,Item Code,Item,Element Code,Element,Year Code,Year,Unit,Value,Flag */
case class Trade(area_code: Int, area: String,
                 item_code: Int, item: String,
                 element_code: Int, element: String,
                 year: Int, unit: String,
                 value: Double, flag: String)

object TradeCrops {
  // Strip the double quotes that wrap every field
  def scrub(str: String): String = {
    return str.replace("\"", "")
  }

  def toInt(str: String): Int = {
    try {
      return scrub(str).toInt
    } catch {
      case e: Throwable => return 0
    }
  }

  def toDouble(str: String): Double = {
    try {
      return scrub(str).toDouble
    } catch {
      case e: Throwable => return 0
    }
  }

  // Turn one raw CSV line into a Trade record; the "Year Code" column (index 6) is skipped
  def toTrade(line: String): Trade = {
    val fields = line.split("\",")
    Trade(toInt(fields(0)), scrub(fields(1)),
          toInt(fields(2)), scrub(fields(3)),
          toInt(fields(4)), scrub(fields(5)),
          toInt(fields(7)), scrub(fields(8)),
          toDouble(fields(9)), scrub(fields(10)))
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Trade Crops Application")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder()
      .appName("Spark SQL Trade Crops")
      .getOrCreate()

    val file = sc.textFile("hdfs:///FAO/Trade_Crops.csv")
    // Keep only well-formed records (exactly 11 fields); this also drops the header line
    val tradeRDD = file.filter(_.split("\",").length == 11).map(toTrade(_))
    // createDataFrame() infers the schema from the Trade case class by reflection
    val tradeDF = spark.createDataFrame(tradeRDD)
    tradeDF.write.parquet("hdfs:///FAO/Trade_Crops.parquet")
  }
}
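As a side note, Spark 2.x also ships a built-in CSV data source, so the same conversion could be done without the hand-rolled splitting. The sketch below is untested and only illustrates the idea; the output path is a placeholder, and the renames and casts mirror the Trade case class (the columns with spaces in their names have to be renamed anyway, since the Parquet writer does not accept them):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object TradeCropsCsv {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Trade Crops CSV Reader")
      .getOrCreate()

    // The built-in reader handles the quoting and the header row for us
    val raw = spark.read
      .option("header", "true")
      .csv("hdfs:///FAO/Trade_Crops.csv")

    // Rename/cast the columns to match the Trade case class ("Year Code" is dropped, as before)
    raw.select(
        col("Area Code").cast("int").as("area_code"),
        col("Area").as("area"),
        col("Item Code").cast("int").as("item_code"),
        col("Item").as("item"),
        col("Element Code").cast("int").as("element_code"),
        col("Element").as("element"),
        col("Year").cast("int").as("year"),
        col("Unit").as("unit"),
        col("Value").cast("double").as("value"),
        col("Flag").as("flag"))
      .write.parquet("hdfs:///FAO/Trade_Crops_from_csv.parquet")
  }
}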
The build.sbt is:
lazy val root = (project in file("."))
  .settings(
    name := "FAO",
    version := "1.0",
    scalaVersion := "2.11.7",
    unmanagedJars in Compile += file("/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.6.jar"),
    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-core_2.11" % "2.1.1",
      "org.apache.spark" % "spark-sql_2.11" % "2.1.1",
      "org.apache.hadoop" % "hadoop-client" % "2.6.0"
    )
  )
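After packaging the project with sbt and submitting the jar with spark-submit, the Parquet output can be sanity-checked from spark-shell with something like this (the filter values just reuse the sample rows above):

// spark-shell provides the `spark` session and the $-syntax for columns out of the box
val trade = spark.read.parquet("hdfs:///FAO/Trade_Crops.parquet")

trade.printSchema()   // area_code, area, item_code, ... as declared in the Trade case class
trade.filter($"area" === "Afghanistan" && $"item" === "Almonds shelled").show(3)
trade.count()         // number of rows that survived the 11-field filter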