We can use Spark to sort all the data generated by Hadoop's TeraGen.
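The input directory read by the job below is the output of TeraGen. A run like the following would produce it; the row count is only an example (each row is 100 bytes, so 10,000,000,000 rows is roughly 1 TB), and the jar path is the same examples jar referenced later in build.sbt:
hadoop jar /home/sanbai/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar \
    teragen 10000000000 hdfs://127.0.0.1/tera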
TerasortApp.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.Partitioner
import org.apache.spark.rdd._
import org.apache.hadoop.examples.terasort.TeraInputFormat
import org.apache.hadoop.examples.terasort.TeraOutputFormat
import org.apache.hadoop.io.Text
import com.google.common.primitives.Longs
import com.google.common.primitives.UnsignedBytes

// Range-partitions records by the first seven bytes of the 10-byte TeraSort key,
// so each partition owns a contiguous slice of the key space.
case class TeraSortPartitioner(numPartitions: Int) extends Partitioner {
  import TeraSortPartitioner._

  val rangePerPart = (max - min) / numPartitions

  override def getPartition(key: Any): Int = {
    val b = key.asInstanceOf[Text].getBytes()
    // Pack the first 7 key bytes into a non-negative Long prefix.
    val prefix = Longs.fromBytes(0, b(0), b(1), b(2), b(3), b(4), b(5), b(6))
    // Clamp so keys at the very top of the range cannot map to partition numPartitions.
    math.min((prefix / rangePerPart).toInt, numPartitions - 1)
  }
}

object TeraSortPartitioner {
  val min = Longs.fromBytes(0, 0, 0, 0, 0, 0, 0, 0)
  val max = Longs.fromBytes(0, -1, -1, -1, -1, -1, -1, -1) // 0xff = -1
}

object TerasortApp {
  // Unsigned lexicographic ordering for the byte-array sort keys used by sortBy.
  implicit val byteArrayOrdering = UnsignedBytes.lexicographicalComparator

  def main(args: Array[String]) {
    val conf = new SparkConf()
      .registerKryoClasses(Array(classOf[Text]))
      .setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.newAPIHadoopFile("hdfs://127.0.0.1/tera",
      classOf[TeraInputFormat], classOf[Text], classOf[Text])
    logData.partitionBy(new TeraSortPartitioner(logData.partitions.size))
      .sortBy(kv => kv._1.getBytes)
      .saveAsNewAPIHadoopFile[TeraOutputFormat]("hdfs://127.0.0.1/output")
  }
}
build.sbt
lazy val root = (project in file("."))
  .settings(
    name := "Terasort",
    version := "1.0",
    scalaVersion := "2.10.6",
    unmanagedJars in Compile += file("/home/sanbai/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar"),
    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-core_2.10" % "1.6.2",
      "org.apache.hadoop" % "hadoop-client" % "2.7.2"
    )
  )
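To produce the jar, I assume a plain "sbt package" is enough here (the Spark and Hadoop dependencies are provided by the cluster at runtime); the artifact ends up under target/scala-2.10/:
sbt package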
After building the jar file, we can submit it to Spark (I run my Spark job in yarn-cluster mode):
./bin/spark-submit --class TerasortApp \
--master yarn \
--deploy-mode cluster \
--driver-memory 2000M \
--executor-memory 2000M \
--executor-cores 1 \
--num-executors 128 \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.shuffle.memoryFraction=0.9 \
--conf spark.storage.memoryFraction=0.9 \
--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=85" \
--queue spark \
/home/sanbai/myspark/target/scala-2.10/Terasort_2.10-1.0.jar
It takes 17 minutes to complete the job, while the "terasort" tool shipped with Hadoop sorts the same data in only 8 minutes. The reason is that I haven't used a TotalOrderPartitioner, so Spark has to sort all the data across different partitions (and across different servers), which costs a lot of network bandwidth and slows the job down.
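One way to cut out the extra shuffle, sketched below but not benchmarked here, is to rely on TeraSortPartitioner for the global order and only sort records within each partition via repartitionAndSortWithinPartitions. The explicit Ordering[Text] is an assumption I add because Hadoop's Text is not covered by Scala's default implicit orderings:
// Sketch: one shuffle instead of two. The range partitioner already puts
// partition i entirely before partition i+1, so sorting within each partition
// yields a totally ordered result.
implicit val textOrdering: Ordering[Text] = new Ordering[Text] {
  override def compare(a: Text, b: Text): Int = a.compareTo(b)
}

logData
  .repartitionAndSortWithinPartitions(TeraSortPartitioner(logData.partitions.size))
  .saveAsNewAPIHadoopFile[TeraOutputFormat]("hdfs://127.0.0.1/output")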
Remember to use Scala 2.10 to build the application for Spark 1.6.x, otherwise Spark will report an error like:
scala.runtime.VolatileObjectRef.zero()Lscala/runtime/VolatileObjectRef