Read paper “A Column Store Engine for Real-Time Streaming Analytics” (MemSQL)

Paper reference: A Column Store Engine for Real-Time Streaming Analytics

Background:
According to the official website, MemSQL is “a high performance data warehouse designed for the cloud that delivers ultra fast insights of your live and historical data”. It use row-storage and lock-free engine for data in memory and column-storage for data in disk.
MemSQL could also store all data into disk. In its most durable state, MemSQL will not lose any transactions which have been acknowledged. And it implements “Read Commited” isolation level for transactions.
I heard about MemSQL in early 2012, but don’t know it has became a OLTP-and-OLAP system until several days ago.

What is the problem?
To fulfill OLAP jobs, MemSQL has to store data into disk with columnar-storage-format. But MemSQL still need to process OLTP requests such as random INSERT or UPDATE. Therefore it store data into a data structure named “Segment”. And by connecting different segments into a ordered segments list (which named “Sorted Runs”), MemSQL could balance the requirements between frequent INSERT/UPDATE operations and SCAN/GROUP BY operations.
For example:
1. Users INSERT three keys: 1, 92, 107. MemSQL will create a segment that contains the three keys:
        [1, 92, 107]
2. Users continue to INSERT two keys: 63, 84. The segments list are:
        [1, 92, 107]
        [63, 84]
3. After many INSERT operations, the segments become:
        [1, 92, 107]
        [2, 17, 42]
        [63, 84]
        [110, 118, 172]

Now,MemSQL makes these segments into “Sorted Runs”, which have a basic order for keys:

When the SELECT comes, MemSQL could find the row quickly by just looking up two ordered segment-lists. Uses could also SCAN two segment-lists effectively to do OLAP tasks, since all the data are stored in columnar-format.
What happen if users INSERT more rows? MemSQL will merge the old big Sorted-Runs and create new segment for freshly-insert-data, which could keep the number of Sorted-Runs acceptable.
In practice, MemSQL column store engine uses a constant of 8, so that the biggest sorted run has at least of all the segments, the second biggest sorted run has at least of the remaining segments, and so forth. This strategy seems just like LSM tree in LevelDB of Google. The difference between LevelDB and MemSQL is LevelDB store every key-value pair respectively but MemSQL store a batch of rows into one segment.

If INSERT operations come when MemSQL is merging, the small merging actions will be aborted and relaunch. For big merging actions, it will barely skip any missing or updated segments, for skipping some segments will not ruin the new merged Sorted-Runs.
As we can see, MemSQL endeavors to avoid locking for in-memory data operations, which makes its performance significantly superior.

There are also some practical considerations for MemSQL.

  1. If only one merger are working at big Sort-Runs, it will cost too much time and the small Sorted-Runs will become tremendous for intensive INSERT operations. So MemSQL launch two mergers: Fast Merger and Slow Merger.
  2. MemSQL could accumulates some rows before batching them into a segment, which will decrease the fragment of data.
  3. MemSQL also create special Commands for users to sort all the segments, or just decrease the level of Sorted-Runs.
  4. Columnar-Storage format in memory makes using SIMD instruments of CPU possible. This shed some light on me: may be one day we can run machine learning jobs on MemSQL directly 🙂

Read paper “iShuffle: Improving Hadoop Performance with Shuffle-on-Write”

Paper reference: iShuffle: Improving Hadoop Performance with Shuffle-on-Write

Background:
A job in Hadoop consists of three main stages: map, shuffle, reduce (Actually shuffle stage has been contained into reduce stage).


What is the problem?
Shuffle phase need to migrate large mount of data from nodes which running map job to those nodes which intend to run reduce job. This cause shuffle-latency which is usually significant. And the reason is:

  • Partitioning skew: Hadoop use hash algorithm to organize output data of map task, if too many keys have the same hash, it may cause unevenly size of partitions
  • Coupling of shuffle and reduce: data shuffling can’t be overlapped with map tasks


Solution: iShuffle

    • “Shuffler” collect intermediate data generated by every map task and predict size of respective partition
    • “Shuffler Manager” collect informations from “Shuffler” and decide the position of partitions


    • Shuffle-on-Write:While a map task writing a spill to local filesystem, it will (by modification of Hadoop code) also write spill to correspondent node where reduce task will be launched
    • Automated map out placement: iShuffle will decide the position for every partition by “map selectivity”, which is the ratio of map input size and map output size. After predicting the “map selectivity” and knowing the total input size of data, iShuffle could finally choose optimist node for every partition data


  • Flexible reduce scheduling: when a node request a reduce task, the Task Manager(after modification of Hadoop’s FIFO scheduler) will find the list of partitions reside on this node, and launch reduce task only for these partitions (Make sure reduce task will only read shuffled data from local filesystem which will reduce network-bandwidth at reduce stage)

In my opinion
Using prediction technology to proactively move map output to apt nodes, which avoid partition skew, is the most intelligent part of this paper. This tech could also be used to other intermediate data moving scenario like OLAP in Data-warehouse.
But, I also suspect that in real production, not too many organizations will use this “iShuffle” as they usually run multi-user applications in Hadoop system. When a lot of jobs running in one Hadoop cluster simultaneously, a low peak of CPU-usage made by long reduce latency of one job will be compensated by other computing-intensive jobs. Therefore to all users, none of hardware resources are wasted.