We can use Spark to sort the data generated by Hadoop's TeraGen.

TerasortApp.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.Partitioner
import org.apache.spark.rdd._

import org.apache.hadoop.examples.terasort.TeraInputFormat
import org.apache.hadoop.examples.terasort.TeraOutputFormat
import org.apache.hadoop.io.Text

import com.google.common.primitives.Longs
import com.google.common.primitives.UnsignedBytes

/**
 * Range partitioner for TeraSort records.
 *
 * The first seven bytes of each key are read as a non-negative `Long` prefix and the
 * prefix space `[min, max]` is cut into `numPartitions` equally sized, ascending ranges,
 * so every key in partition `i` is smaller than every key in partition `i + 1`.
 *
 * @param numPartitions number of output partitions; must be positive
 */
case class TeraSortPartitioner(numPartitions: Int) extends Partitioner {

  import TeraSortPartitioner._

  require(numPartitions > 0, s"numPartitions must be positive, got $numPartitions")

  /** Width of the key-prefix range assigned to one partition (rounded down). */
  val rangePerPart: Long = (max - min) / numPartitions

  /**
   * Maps a key to its partition index in `[0, numPartitions - 1]`.
   *
   * @param key a Hadoop `Text` whose backing array holds at least seven bytes
   * @return the index of the range that contains the key's seven-byte prefix
   */
  override def getPartition(key: Any): Int = {
    val b = key.asInstanceOf[Text].getBytes()
    // The leading zero byte keeps the 56-bit prefix non-negative.
    val prefix = Longs.fromBytes(0, b(0), b(1), b(2), b(3), b(4), b(5), b(6))
    // rangePerPart is floored, so the largest prefixes can produce a quotient equal to
    // (or above) numPartitions; clamp while still a Long so the index is always valid.
    val index = (prefix - min) / rangePerPart
    math.min(index, numPartitions - 1L).toInt
  }
}

/** Bounds of the seven-byte key prefix, expressed as non-negative `Long` values. */
object TeraSortPartitioner {
  /** Smallest prefix: seven zero bytes. */
  val min: Long = 0L

  /** Largest prefix: seven 0xFF bytes below a zero top byte, i.e. 2^56 - 1. */
  val max: Long = 0x00FFFFFFFFFFFFFFL
}

/**
 * Spark driver that reads TeraGen records, range-partitions them by key prefix,
 * sorts them by key bytes and writes the result with `TeraOutputFormat`.
 *
 * Usage: `TerasortApp [inputPath [outputPath]]`. Both arguments are optional and
 * default to the HDFS locations below.
 */
object TerasortApp {
  // Despite its name, this is Guava's unsigned byte-wise lexicographical comparator;
  // nothing here is case-insensitive. The name is kept so existing references still work.
  // `sortBy` obtains its Ordering[Array[Byte]] from this implicit Comparator.
  implicit val caseInsensitiveOrdering: java.util.Comparator[Array[Byte]] =
    UnsignedBytes.lexicographicalComparator()

  private val DefaultInputPath = "hdfs://127.0.0.1/tera"
  private val DefaultOutputPath = "hdfs://127.0.0.1/output"

  /**
   * Entry point.
   *
   * @param args optional input path (first) and output path (second)
   */
  def main(args: Array[String]): Unit = {
    val inputPath = if (args.length > 0) args(0) else DefaultInputPath
    val outputPath = if (args.length > 1) args(1) else DefaultOutputPath

    val conf = new SparkConf()
      .registerKryoClasses(Array(classOf[Text]))
      .setAppName("Simple Application")
    val sc = new SparkContext(conf)

    try {
      val records = sc.newAPIHadoopFile(
        inputPath, classOf[TeraInputFormat], classOf[Text], classOf[Text])

      // NOTE(review): partitionBy and sortBy each appear to trigger a full shuffle, so the
      // data crosses the network twice. A single repartitionAndSortWithinPartitions with an
      // Ordering[Text] would likely be enough; confirm before changing the pipeline.
      records
        .partitionBy(new TeraSortPartitioner(records.partitions.length))
        .sortBy(kv => kv._1.getBytes)
        .saveAsNewAPIHadoopFile[TeraOutputFormat](outputPath)
    } finally {
      // Always release the context, even when the job fails.
      sc.stop()
    }
  }
}

build.sbt

// Single-project build for the Terasort application.
lazy val root = (project in file("."))
    .settings(
        name := "Terasort",
        version := "1.0",
        // Must stay on the 2.10 line to match the Scala version of the Spark 1.6.x artifacts.
        scalaVersion := "2.10.6",
        // TeraInputFormat / TeraOutputFormat come from the Hadoop examples jar on local disk.
        unmanagedJars in Compile += file("/home/sanbai/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar"),
        // `%%` appends the Scala binary version, so this resolves to spark-core_2.10.
        libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2",
        libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.2"
    )

After building the jar file, we can submit it to Spark (I run Spark in yarn-cluster mode):

# Submit the Terasort jar to YARN in cluster deploy mode: 128 single-core executors with
# a 2000M heap each plus 2048 MB of YARN memory overhead, G1 GC, on the "spark" queue.
# NOTE(review): spark.shuffle.memoryFraction and spark.storage.memoryFraction look like
# legacy settings that Spark 1.6 reads only when spark.memory.useLegacyMode is enabled,
# and together they add up to more than 1.0; confirm whether they have any effect here.
./bin/spark-submit --class TerasortApp \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2000M \
  --executor-memory 2000M \
  --executor-cores 1 \
  --num-executors 128 \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --conf spark.shuffle.memoryFraction=0.9 \
  --conf spark.storage.memoryFraction=0.9 \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=85" \
  --queue spark \
  /home/sanbai/myspark/target/scala-2.10/Terasort_2.10-1.0.jar

It takes 17 minutes to complete the task, whereas Hadoop's own “terasort” tool takes only 8 minutes to sort the same data. The reason is that I haven't used TotalOrderPartitioner, so Spark has to re-sort all the data across partitions (and therefore across servers), which consumes a lot of network bandwidth and slows the job down.

Remember to build the app with Scala 2.10 for Spark 1.6.x; otherwise Spark will report an error like:

scala.runtime.VolatileObjectRef.zero()Lscala/runtime/VolatileObjectRef