Understand the Storage Module in Spark Core


The storage module in Spark provides data access services to the application, including:

  • Reading and storing data from various sources: HDFS, local disk, RAM, or even blocks fetched from other nodes over the network.
  • Caching data (RDDs) at different levels and in different formats: in memory, on disk, …

The most striking difference between Spark and Hadoop is Spark's use of cluster RAM to produce fast results, so it is worth going a bit deeper into the caching API.

How the RDD caching system works

(Figure: overview of the RDD caching system)

  • The application first calls one of two methods on the RDD object: cache() or persist().
  • The SparkContext object keeps persistentRDDs, the map of cached RDDs. SparkContext holds references to the RDDs being cached, and can remove a reference and then trigger garbage collection. Because the map is a TimeStampedWeakValueHashMap[A, B], it can even periodically drop entries whose RDDs have been garbage-collected or have grown stale (see the sketch below).
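A minimal sketch of that flow from the application side (the file path and app name here are illustrative, not from the original post):

    import org.apache.spark.{SparkConf, SparkContext}

    object CacheExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("cache-example").setMaster("local[*]"))

        // Hypothetical input path, for illustration only.
        val words = sc.textFile("hdfs:///data/input.txt").flatMap(_.split("\\s+"))

        // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY); both
        // register this RDD in SparkContext's persistentRDDs map.
        words.cache()

        println(words.count())            // first action computes and caches the blocks
        println(words.distinct().count()) // later actions reuse the cached blocks

        // unpersist() drops the cached blocks and removes the entry
        // from persistentRDDs.
        words.unpersist()
        sc.stop()
      }
    }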

(Figure: cache at nodes)

Call stack

(Figure: call stack of the caching path)

The CacheManager

The CacheManager stores computed RDD partitions in the BlockManager, so that a later request for the same partition can either return the cached copy or recompute it, as sketched below.
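The following is a simplified sketch, not Spark's actual source: SimpleBlockManager is an assumed stand-in for the real BlockManager interface, but the get-or-compute decision made for each partition is the point:

    import org.apache.spark.{Partition, TaskContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Assumed, stripped-down stand-in for the real BlockManager interface.
    trait SimpleBlockManager {
      def get(key: String): Option[Seq[Any]]
      def put(key: String, values: Seq[Any], level: StorageLevel): Unit
    }

    class SimpleCacheManager(blockManager: SimpleBlockManager) {
      def getOrCompute[T](rdd: RDD[T], split: Partition,
                          context: TaskContext, level: StorageLevel): Iterator[T] = {
        val key = s"rdd_${rdd.id}_${split.index}"
        blockManager.get(key) match {
          case Some(cached) =>
            // Cache hit: the partition was already computed and stored.
            cached.iterator.asInstanceOf[Iterator[T]]
          case None =>
            // Cache miss: compute the partition, store it at the requested
            // level, and hand the fresh values back to the caller.
            val values = rdd.compute(split, context).toVector
            blockManager.put(key, values, level)
            values.iterator
        }
      }
    }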

BlockManager

  • “write-once” key-value store on each worker
  • Serves shuffle data (fetched locally or remotely) as well as cached RDDs
  • Tracks the StorageLevel (RAM, disk) of each block
  • Spills data to disk if memory is insufficient (see the toy sketch after this list)
  • Can replicate data across nodes
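The spill rule can be illustrated with a toy store; this is an assumption for illustration only, since real blocks live in files and a managed memory pool rather than in maps:

    import scala.collection.mutable

    // Toy block store: memory first, spill to disk if the level allows it,
    // otherwise drop the block so it gets recomputed later.
    class ToyBlockStore(maxMemoryBytes: Long) {
      private val memoryStore = mutable.Map.empty[String, Array[Byte]]
      private val diskStore   = mutable.Map.empty[String, Array[Byte]] // stands in for files
      private var usedMemory  = 0L

      def put(blockId: String, bytes: Array[Byte], useDisk: Boolean): Unit =
        if (usedMemory + bytes.length <= maxMemoryBytes) {
          memoryStore(blockId) = bytes // fits in RAM
          usedMemory += bytes.length
        } else if (useDisk) {
          diskStore(blockId) = bytes   // memory is full and the level allows disk: spill
        }                              // else: drop; a MEMORY_ONLY partition is recomputed

      def get(blockId: String): Option[Array[Byte]] =
        memoryStore.get(blockId).orElse(diskStore.get(blockId))
    }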

CommunicationManager

  • An asynchronous-I/O-based networking library
  • Allows fetching blocks from BlockManagers (sketched after this list)
  • Allows prioritization/chunking across connections
  • Fetch logic tries to optimize for block sizes
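A rough sketch of what asynchronous fetching buys, using plain Scala futures as a stand-in for the real connection handling (fetchOneBlock and its placeholder payload are hypothetical):

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    // Hypothetical network call; a real implementation would stream
    // chunks over a connection instead of returning a placeholder.
    def fetchOneBlock(host: String, blockId: String): Future[Array[Byte]] =
      Future { Array.empty[Byte] }

    // Issue all requests without blocking; results complete independently,
    // so one slow block does not stall the others.
    def fetchBlocks(locations: Seq[(String, String)]): Future[Seq[Array[Byte]]] =
      Future.sequence(locations.map { case (host, blockId) =>
        fetchOneBlock(host, blockId)
      })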

MapOutputTracker

  • Tracks where each map task in a shuffle ran
  • Tells reduce tasks the map locations
  • Each worker caches the locations to avoid re-fetching
  • A “generation ID” passed with each task allows invalidating the cache when map outputs are lost (see the sketch after this list)
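The generation-ID mechanism can be sketched as follows; this is a simplified assumption of the worker-side logic, and names such as ToyMapOutputTracker and fetchFromMaster are illustrative:

    import scala.collection.mutable

    class ToyMapOutputTracker {
      private var generation = 0L
      private val cachedLocations = mutable.Map.empty[Int, Seq[String]] // shuffleId -> hosts

      // Called with the generation ID shipped along with each task: a newer
      // ID means some map outputs were lost, so the cached locations are stale.
      def updateGeneration(taskGeneration: Long): Unit = synchronized {
        if (taskGeneration > generation) {
          cachedLocations.clear()
          generation = taskGeneration
        }
      }

      // Return cached locations, asking the master only on a miss.
      def getLocations(shuffleId: Int)(fetchFromMaster: Int => Seq[String]): Seq[String] =
        synchronized {
          cachedLocations.getOrElseUpdate(shuffleId, fetchFromMaster(shuffleId))
        }
    }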

Different levels of caching

The class and companion object org.apache.spark.storage.StorageLevel act as flags controlling the storage level of an RDD: where to keep the data:

  • RAM (the default)
  • Tachyon (off-heap)
  • Hard disk (when the data does not fit in memory)

and in which form:

  • Serialized format
  • Replicated partitions on multiple nodes
Level                 | Use Disk | Use Memory | Use Off-Heap | Deserialized | # Replication
----------------------|----------|------------|--------------|--------------|--------------
NONE                  |          |            |              |              | 1
DISK_ONLY             |    x     |            |              |              | 1
DISK_ONLY_2           |    x     |            |              |              | 2
MEMORY_ONLY           |          |     x      |              |      x       | 1
MEMORY_ONLY_2         |          |     x      |              |      x       | 2
MEMORY_ONLY_SER       |          |     x      |              |              | 1
MEMORY_ONLY_SER_2     |          |     x      |              |              | 2
MEMORY_AND_DISK       |    x     |     x      |              |      x       | 1
MEMORY_AND_DISK_2     |    x     |     x      |              |      x       | 2
MEMORY_AND_DISK_SER   |    x     |     x      |              |              | 1
MEMORY_AND_DISK_SER_2 |    x     |     x      |              |              | 2
OFF_HEAP              |          |            |      x       |              | 1

At the moment, the OFF_HEAP storage level cannot be combined with the disk or memory tiers, deserialized objects, or multiple replicas.

  • MEMORY_ONLY: if the RDD does not fit in memory, some partitions will not be cached and will be recomputed each time they are needed.
  • MEMORY_AND_DISK: if the RDD does not fit in memory, store the partitions that don't fit on disk and read them from there when needed.
  • OFF_HEAP: stores the RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage-collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Because the cached data lives in Tachyon rather than in the executor's heap, the crash of an executor no longer loses the in-memory cache. Refer to the Tachyon project for more information.
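To make the flags concrete, here is how the levels above are chosen at call time; the input path and the derived RDD are illustrative, and a SparkContext sc as in the earlier sketch is assumed:

    import org.apache.spark.storage.StorageLevel

    val events = sc.textFile("hdfs:///logs/events") // hypothetical path

    // Deserialized objects in RAM only; partitions that don't fit are recomputed.
    events.persist(StorageLevel.MEMORY_ONLY)

    // A derived RDD, serialized in RAM, spilling to disk, replicated on 2 nodes.
    // (A level can be assigned only once per RDD, hence the separate RDD here.)
    val upper = events.map(_.toUpperCase)
    upper.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

    // The same level, built directly from the flags in the table above:
    val sameLevel = StorageLevel(useDisk = true, useMemory = true,
                                 deserialized = false, replication = 2)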
