[SPARK][CORE] 面试问题之UnsafeShuffleWriter流程解析(上)

2022-05-29 10:45:33 浏览数 (1)

在说UnsafeShuffleWriter 前,需要先细谈下Tungsten对内存管理的优化。当然这里就不展开讲了以防内容过于冗长。

Tungsten其实是一揽子优化项目的总代称,这里我们主要了解Tungsten对于内存管理的优化。对内存管理优化的原因主要有两方面: 1. Java对象占用内存的空间大。2. Jvm垃圾回收的开销大。

下面我们举例分析下:

我们拿类型是 String 的 name 来举例,如果一个用户的名字叫做“abcd”,它本应该只占用 4 个字节,但在 JVM 的对象存储中,“abcd”会消耗总共 48 个字节,其中包括 12 个字节的对象头信息、8 字节的哈希编码、8 字节的字段值存储和另外 20 个字节的其他开销。

另外,如果存在一个User表,其中存在username String, age Int, sex Char三个字段。那么一行数据需要创建三个包装类,同时需要将其装入到Array数组中,最后封装为GenericMutableRow。那么总共需要5个类。我们知道大量的类的创建会加剧JVM的GC情况,如果可以将其封装为一个类中,那么就减少了大量的类的创建。

为此Tungsten 设计了一种紧凑的二进制格式 Unsafe Row数据结构。

Unsafe Row : 紧凑的数据结构

Unsafe Row 是一种字节数组,它可以用来存储下图所示 Schema 为(userID,name,age,gender)的用户数据条目。总的来说,所有字段都会按照 Schema 中的顺序安放在数组中。其中,定长字段的值会直接安插到字节中,而变长字段会先在 Schema 的相应位置插入偏移地址,再把字段长度和字段值存储到靠后的元素中。

51.png

字节数组的存储方式在消除存储开销的同时,仅用一个数组对象就能轻松完成一条数据的封装,显著降低 GC 压力。

代码语言:javascript复制
public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable {
  public static int calculateBitSetWidthInBytes(int numFields) {
    return ((numFields   63)/ 64) * 8;
  }
  private Object baseObject;
  private long baseOffset;

  /** The number of fields in this row, used for calculating the bitset width (and in assertions) */
  private int numFields;

  /** The size of this row's backing data, in bytes) */
  private int sizeInBytes;

  /** The width of the null tracking bit set, in bytes */
  private int bitSetWidthInBytes;

  public void setNotNullAt(int i) {
    assertIndexIsValid(i);
    BitSetMethods.unset(baseObject, baseOffset, i);
  }

@Override
  public void setLong(int ordinal, long value) {
    assertIndexIsValid(ordinal);
    setNotNullAt(ordinal);
    Platform.putLong(baseObject, getFieldOffset(ordinal), value);
  }
...

从上面可以看出每个数据元组,有三部分组成[null bit set] [values] [variable length portion]null的追踪和word边界的补齐是由bitSetWidthInBytes字段负责。从上面的例子可以看出在赋值Long时调用Platform.putLong直接进行赋值。但是如果插入的数据是可变的数据类型,会先插入offset偏移量,指定在定长插入完成的靠后的位置,然后再插入其长度,最后再插入其数据值。

进一步提升数据存储效率与 GC 效率,Tungsten 还推出了基于内存页的内存管理模式。

基于内存页的内存管理

为了统一管理 Off Heap 和 On Heap 内存空间,Tungsten 定义了统一的 128 位内存地址,简称 Tungsten 地址。Tungsten 地址分为两部分:前 64 位预留给 Java Object,后 64 位是偏移地址 Offset。具体的定义在MemoryLocation类中

代码语言:javascript复制
public class MemoryLocation {

  @Nullable
  Object obj;

  long offset;

  public MemoryLocation(@Nullable Object obj, long offset) {
    this.obj = obj;
    this.offset = offset;
  }
...
}

对于 On Heap 空间的 Tungsten 地址来说,前 64 位存储的是 JVM 堆内对象的引用或者说指针,后 64 位 Offset 存储的是数据在该对象内的偏移地址。而 Off Heap 空间则完全不同,在堆外的空间中,由于 Spark 是通过 Java Unsafe API 直接管理操作系统内存,不存在内存对象的概念,因此前 64 位存储的是 null 值,后 64 位则用于在堆外空间中直接寻址操作系统的内存空间。

代码语言:javascript复制
public class TaskMemoryManager {

// [1] 页号13位表示
/** The number of bits used to address the page table. */
private static final int PAGE_NUMBER_BITS= 13;

// [2] 偏移量 64 - 13 = 51 位表示
/** The number of bits used to encode offsets in data pages. */
@VisibleForTesting
  static final int OFFSET_BITS= 64 -PAGE_NUMBER_BITS;  // 51

/** The number of entries in the page table. */
private static final int PAGE_TABLE_SIZE= 1 <<PAGE_NUMBER_BITS;

public static final long MAXIMUM_PAGE_SIZE_BYTES= ((1L << 31) - 1) * 8L;

// [3] page页用MemoryBlock表示,定位一个页如果是堆内前64 位存储的是 JVM 堆内对象的引用,堆外则64 位存储的是 null 值。
/**
   * Similar to an operating system's page table, this array maps page numbers into base object
   * pointers, allowing us to translate between the hashtable's internal 64-bit address
   * representation and the baseObject offset representation which we use to support both on- and
   * off-heap addresses. When using an off-heap allocator, every entry in this map will be `null`.
   * When using an on-heap allocator, the entries in this map will point to pages' base objects.
   * Entries are added to this map as new data pages are allocated.
   */
private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];

/**
   * Bitmap for tracking free pages.
   */
private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);

final MemoryMode tungstenMemoryMode;

public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
    this.memoryManager = memoryManager;
    this.taskAttemptId = taskAttemptId;
    this.consumers = new HashSet<>();
  }
...
}
// [4] 每个page 由Object,offset 确定, length 表示页大小
public MemoryBlock(@Nullable Object obj, long offset, long length) {
    super(obj, offset);
    this.length = length;
}

// [5] 在创建TaskMemoryManager会指定MemoryMode,优先使用堆外内存
final val tungstenMemoryMode: MemoryMode = {
    if (conf.get(MEMORY_OFFHEAP_ENABLED)) {
      require(conf.get(MEMORY_OFFHEAP_SIZE) > 0,
        "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
      require(Platform.unaligned(),
        "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
      MemoryMode.OFF_HEAP
    } else {
      MemoryMode.ON_HEAP
    }
  }

由上代码分析可知:每个Task的内存空间被分为多个内存页Page, 每个内存页本质上都是一个内存块(MemoryBlock)。TaskMemoryManager统一了堆内堆外内存的访问方式,引入了虚拟内存逻辑地址的概念,并将逻辑地址转换为实际的物理地址。逻辑地址是一个64bits的长整型,高13bits用来表示页号pageNumber,低51bits用来表示该内存内部的偏移offset。

Untitled.png

内存优化的应用

首先我们来看下HashMap的弊端,Java 标准库中采用数组加链表的方式来实现 HashMap,数组元素存储 Hash code 和链表头。链表节点存储 3 个元素,分别是 Key 引用、Value 引用和下一个元素的地址。

Utle.png

但是,这种实现方式会带来两个弊端。

首先是存储开销和 GC 负担比较大。结合上面的示意图我们不难发现,存储数据的对象值只占整个 HashMap 一半的存储空间,另外一半的存储空间用来存储引用和指针,这 50% 的存储开销还是蛮大的。而且我们发现,图中每一个 Key、Value 和链表元素都是 JVM 对象。假设,我们用 HashMap 来存储一百万条数据条目,那么 JVM 对象的数量至少是三百万。由于 JVM 的 GC 效率与对象数量成反比,因此 java.util.HashMap 的实现方式对于 GC 并不友好。

其次,在数据访问的过程中,标准库实现的 HashMap 容易降低 CPU 缓存命中率,进而降低 CPU 利用率。链表这种数据结构的特点是,对写入友好,但访问低效。用链表存储数据的方式确实很灵活,这让 JVM 可以充分利用零散的内存区域,提升内存利用率。但是,在对链表进行全量扫描的时候,这种零散的存储方式会引入大量的随机内存访问(Random Memory Access)。相比顺序访问,随机内存访问会大幅降低 CPU cache 命中率。

那么,针对以上几个弊端,Tungsten 又是怎么解决的呢?BytesToBytesMap可以看做是Spark实现的HashMap, 我们从存储开销、GC 效率和 CPU cache 命中率分别来分析下。

Uled.png

首先,Tungsten 放弃了链表的实现方式,使用数组加内存页的方式来实现 HashMap。数组中存储的元素是 Hash code 和 Tungsten 内存地址,也就是 Object 引用外加 Offset 的 128 位地址。Tungsten HashMap 使用 128 位地址来寻址数据元素,相比 java.util.HashMap 大量的链表指针,在存储开销上更低。另外BytesToBytesMap在出现Hash冲突时采用的是开放定址法,通过探测下一个(idx 1)位置进行解决。

其次,Tungsten HashMap 的存储单元是内存页,内存页本质上是 Java Object,一个内存页可以存储多个数据条目。因此,相比标准库中的 HashMap,使用内存页大幅缩减了存储所需的对象数量。比如说,我们需要存储一百万条数据记录,标准库的 HashMap 至少需要三百万的 JVM 对象才能存下,而 Tungsten HashMap 可能只需要几个或是十几个内存页就能存下。对比下来,它们所需的 JVM 对象数量可以说是天壤之别,显然,Tungsten 的实现方式对于 GC 更加友好。再者,内存页本质上是 JVM 对象,其内部使用连续空间来存储数据,内存页加偏移量可以精准地定位到每一个数据元素。因此,在需要扫描 HashMap 全量数据的时候,得益于内存页中连续存储的方式,内存的访问方式从原来的随机访问变成了顺序读取(Sequential Access)。顺序内存访问会大幅提升 CPU cache 利用率,减少 CPU 中断,显著提升 CPU 利用率。

Tungsten除此以外还定义了基本的数据类型与数据结构:ByteArray、LongArray和UTF8String类型等。

接下来,我们回归Shuffle, 其实UnsafeShuffleWriter其实现的思路和SortShuffleWriter一致,相当于Tungsten版本的SortShuffleWriter。在Tungsten Shuffle的UnsafeShuffleWriter与SortShuffleWriter不同之处在于UnsafeShuffleWriter中不涉及数据的反序列化的操作。除此以外,在UnsafeShuffleWriter中,ExternalSorter是采用ShuffleExternalSorter替换实现,ShuffleExternalSorter的在功能上和ExternalSorter是一致的。

下一讲,我们将从源码的角度解读UnsafeShuffleWriter的实现过程,理解其中原理和适用场景。

0 人点赞