Understanding the Shuffle Component in Spark Core


Shuffle is one of the most expensive operations affecting the performance of a job. Even though Spark tries to avoid shuffling as much as possible, some operations need a shuffle step to produce their result (e.g. groupByKey, sortByKey, reduceByKey, distinct, …). These operations require one node to fetch data from many other nodes so that it has enough data to compute the result. Fetching data over the network involves data partitioning, sorting, serialization, and disk and network I/O, all of which are expensive for the application. The shuffle module in Spark has evolved over time and brought significant improvements with each version.

[Figure: Shuffle: write phase & read phase]

Just like Hadoop MapReduce, Spark's shuffle runs an aggregation step (a combiner) before writing map outputs (intermediate values) to buckets. Spark also writes to small in-memory buffers (the size is configurable via spark.shuffle.file.buffer.kb) before writing to the physical files, to increase disk I/O speed.
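
To see why the buffering matters, here is a minimal sketch using plain Java I/O (not Spark's actual DiskBlockObjectWriter): wrapping a FileOutputStream in a BufferedOutputStream batches many small record writes into far fewer large disk writes. The file name and buffer size are illustrative only.

```scala
import java.io.{BufferedOutputStream, FileOutputStream}

// Minimal sketch: batching small record writes through an in-memory buffer.
// 32 * 1024 stands in for the spark.shuffle.file.buffer.kb setting; this is
// plain Java I/O, not Spark's actual shuffle writer.
val out = new BufferedOutputStream(new FileOutputStream("shuffle_0_0_0.data"), 32 * 1024)
try {
  for (i <- 0 until 100000) {
    out.write(s"key$i\tvalue$i\n".getBytes("UTF-8")) // each write hits the buffer, not the disk
  }
} finally {
  out.close() // flushes the remaining buffered bytes to disk
}
```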

The input aggregation

The aggregation is done by the implementation in ShuffleMapTask, which combines the input using two special data structures: AppendOnlyMap (an in-memory hash-table combiner) and ExternalAppendOnlyMap (a hash-table combiner that spills from memory to disk).
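
A simplified sketch of what such a map-side combiner does (using a plain mutable HashMap rather than Spark's actual AppendOnlyMap, and ignoring the spill-to-disk path of ExternalAppendOnlyMap): each incoming (key, value) pair either creates a new combiner for its key or is merged into the existing one.

```scala
import scala.collection.mutable

// Simplified map-side combine, in the spirit of reduceByKey(_ + _).
// Spark's AppendOnlyMap is an open-addressing hash table optimized for this
// update pattern; ExternalAppendOnlyMap additionally spills to disk when
// memory runs low. This toy version keeps everything in memory.
def combine[K, V, C](records: Iterator[(K, V)],
                     createCombiner: V => C,
                     mergeValue: (C, V) => C): mutable.HashMap[K, C] = {
  val combiners = mutable.HashMap.empty[K, C]
  for ((k, v) <- records) {
    combiners(k) = combiners.get(k) match {
      case Some(c) => mergeValue(c, v)  // key seen before: merge into its combiner
      case None    => createCombiner(v) // first value for this key
    }
  }
  combiners
}

// Usage: word-count-style aggregation before the shuffle write.
val combined = combine[String, Int, Int](Iterator("a" -> 1, "b" -> 1, "a" -> 1), identity, _ + _)
// combined: Map(a -> 2, b -> 1)
```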

The available implementations of Spark shuffle are hash and sort (selected via spark.shuffle.manager). Sort is the default since Spark 1.2.
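
For reference, selecting the implementation and the write-buffer size through SparkConf might look like this (property names as in the Spark 1.x configuration docs; in later versions spark.shuffle.file.buffer.kb was renamed to spark.shuffle.file.buffer):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Shuffle-related settings as named in the Spark 1.x configuration docs.
val conf = new SparkConf()
  .setAppName("shuffle-config-demo")
  .set("spark.shuffle.manager", "sort")      // "hash" or "sort"; "sort" is the default since 1.2
  .set("spark.shuffle.file.buffer.kb", "32") // per-file write buffer, in KB
val sc = new SparkContext(conf)
```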

The basic hash shuffle

[Figure: basic hash shuffle]

In the basic hash shuffle, each map task writes its output into multiple files. Suppose we have #MapTasks = M and #ReduceTasks = R; then #ShuffleFiles = M * R and, correspondingly, #In-memoryBuffers = M * R. This leads to a problem: if we use 100 KB as the size of one buffer, then with 10,000 reducers and 10 mappers per executor, the total size of the in-memory buffers would be 100 KB * 10,000 * 10 = 10 GB per executor. 10 GB per executor just for buffers is unacceptable! This implementation therefore cannot support ~10,000 reducers, so something had to change. The easiest and fastest fix is to lower the buffer size (but not too much): SPARK-2503 (https://issues.apache.org/jira/browse/SPARK-2503) decreased the default spark.shuffle.file.buffer.kb to 32 KB.
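
A toy sketch of where the M * R cost comes from (this is not Spark's actual HashShuffleWriter; the file naming and helper are illustrative): each map task keeps one open file, and hence one in-memory buffer, per reducer partition, and routes every record by the hash of its key.

```scala
import java.io.{BufferedOutputStream, FileOutputStream}

// Toy hash-shuffle write path: one file (and one buffer) per
// (mapTask, reducer) pair, i.e. M * R files across all map tasks.
def hashShuffleWrite(mapId: Int, records: Iterator[(String, String)], numReducers: Int): Unit = {
  // One buffered writer per reducer partition: this is the M * R buffer cost.
  val writers = Array.tabulate(numReducers) { r =>
    new BufferedOutputStream(new FileOutputStream(s"shuffle_${mapId}_$r"), 32 * 1024)
  }
  for ((k, v) <- records) {
    val partition = math.abs(k.hashCode % numReducers) // hash partitioning by key
    writers(partition).write(s"$k\t$v\n".getBytes("UTF-8"))
  }
  writers.foreach(_.close())
}
```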

The consolidated hash shuffle

[Figure: consolidated shuffle]

Naively decreasing the buffer size, as above, clearly isn't satisfying. Can we do better? Yes: consolidate the shuffle files (SPARK-751, https://issues.apache.org/jira/browse/SPARK-751). Within an executor, each bucket is mapped to a segment of a file (as shown in the figure). As a result, each executor has #ShuffleFiles = R and #In-memoryBuffers = R, so a worker node running C executors on its C cores has #ShuffleFiles = C * R. You can enable this feature by setting spark.shuffle.consolidateFiles=true, as shown below.
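
Enabling it (on Spark versions that still include the hash shuffle manager) is a one-line configuration change:

```scala
import org.apache.spark.SparkConf

// Consolidation applies to the hash shuffle manager only.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "hash")
  .set("spark.shuffle.consolidateFiles", "true")
```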

The sort-based shuffle

[Figure: sort-based shuffle]

With the consolidated hash shuffle, we reduced the number of shuffle files from M * R to R per executor. Why can't we do what Hadoop MR does: have each MapTask spill only one shuffle file containing segments, plus one index file? This saves significant memory for compression and serialization buffers and results in more sequential disk I/O.

Sort-based shuffle implementation - https://issues.apache.org/jira/browse/SPARK-2045

Each MapTask generates one shuffle data file and one index file. The output is sorted using an ExternalSorter.

  • If a map-side combine is required, data is sorted by key and partition for aggregation; otherwise, it is sorted by partition only.
  • If #reducers <= 200 (spark.shuffle.sort.bypassMergeThreshold, 200 by default) and there is no aggregation or ordering, it behaves like the hash path (without sorting): it spills to per-partition files, then merges them into one output file; see the sketch below. "Make sort-based shuffle write files directly when there is no sorting/aggregation and the number of partitions is small" - https://issues.apache.org/jira/browse/SPARK-2787
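
A minimal sketch of the resulting write path (ignoring spilling, aggregation, and the bypass path; file names and the helper are illustrative, not Spark's actual ExternalSorter): sort records by partition id, stream them into a single data file, and record each partition's end offset in an index file so that reducers can seek directly to their segment.

```scala
import java.io.{BufferedOutputStream, DataOutputStream, FileOutputStream}

// Toy sort-based shuffle write: 1 data file + 1 index file per map task.
// Real Spark uses ExternalSorter (with spilling and optional aggregation);
// this sketch only shows the final file layout.
def sortShuffleWrite(mapId: Int, records: Seq[(Int, String)], numPartitions: Int): Unit = {
  val data  = new BufferedOutputStream(new FileOutputStream(s"shuffle_$mapId.data"), 32 * 1024)
  val index = new DataOutputStream(new FileOutputStream(s"shuffle_$mapId.index"))
  var offset = 0L
  index.writeLong(0L) // the index holds numPartitions + 1 offsets (segment boundaries)
  // Sort by partition id so each reducer's records form one contiguous segment.
  val sorted = records.sortBy(_._1)
  var current = 0
  for ((partition, value) <- sorted) {
    while (current < partition) { index.writeLong(offset); current += 1 } // close empty/finished segments
    val bytes = (value + "\n").getBytes("UTF-8")
    data.write(bytes)
    offset += bytes.length
  }
  while (current < numPartitions) { index.writeLong(offset); current += 1 } // remaining boundaries
  data.close(); index.close()
}
```

A reducer for partition i then only needs offsets i and i + 1 from the index file to read its byte range from the single data file.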

References:
[1] Spark shuffle introduction - http://www.slideshare.net/colorant/spark-shuffle-introduction
[2] Shuffle behavior configuration - https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior
[3] Sort-based shuffle in Spark - https://issues.apache.org/jira/secure/attachment/12655884/Sort-basedshuffledesign.pdf
