Memory Management
Background
Pollux 内存系统构建于 std::mmap 库之上,以避免 std::malloc 带来的
内存碎片问题。它提供了用于查询执行的
基本内存分配函数,以及高级内存管理函数,例如公平内存共享、透 明文件缓存和服务器内存不足 (OOM) 预防。
Pollux 提供了大型连续和非连续缓冲区分配函数,以优化查询内存分配模式。例如,查询可以使用 std::mmap 直接从操作系 统分配物理内存,为哈希表 (HashTable::allocateTables) 分配大型连续内存。对于小型缓冲区分配,查询可以分配一大块非连续内 存,然后使用内存竞技场技术 (例如 StreamArena 或 HashStringAllocator) 在其上进行小型分配,以减少昂贵的实际内存分配次数。
Pollux 通过在运行时根据内存使用情况的变化调整内存容量,在运行的查询之间提供公平的内存共享。此过程称为内存仲裁。它确保所有 查询的总分配内存容量在系统配置的查询内存限制范围内。它还可以防止每个查询超出用户配置的单个查询内存限制。当某个查询尝试分配超过其当前 容量的内存时,内存仲裁会通过从其他具有更大容量的查询回收已使用的内存来增加该查询的容量;或者,如果查询超出了单个查询的内存限制,则从该查 询本身回收内存,以释放其当前容量范围内的空间。内存回收是通过诸如磁盘溢出 之类的技术实现的。
Pollux 提供透明文件缓存,通过热数据重用和预取来加速表扫描。文件缓存与内存系统集成,实现文件缓存和查询内存之间的动态内存共享。当查询分配内存失 败时,我们会通过缩小文件缓存来重试分配。因此,文件缓存大小会根据查询内存使用情况的变化自动调整。
Pollux 通过 std::mmap 库自行管理物理内存分配,从而预防服务器内存不足 (OOM)。这使我们能够对 Pollux 内存使用 的驻留集大小 (RSS) 进行显式控制。Pollux 中的内存分配器负责处理文件缓存和查询内存的所有内存分配。它确保分配的总内存不会超过 Pollux 使用的系统内存限制。为了进一步处理非 Pollux 组件的内存使用高峰, Pollux 提供了一种通用的服务器内存回退机制,当检测到服务器内存不足时,会自动缩小文件缓存,将未使用的 Pollux 内存返回给操作系统。
Overall Design
Pollux 内存系统由以下主要内存组件组成:内存管理器 (MemoryManager)、内存分配器 (MemoryAllocator)、文件缓存 (AsyncDataCache)、内存仲裁 器 (MemoryArbitrator) 以及多个内存池 (MemoryPool)。内存管理器创建所有其他组件。它在初始化内存系统时创建内存分配器和内存仲裁器,并在查询执行时按需创建内存池。
当查询开始执行时,它首先从内存管理器创建一个根内存池(查询池),然后根据查询计划从根开始创建子内存池树:每个查询任务一个子内存池(任务池),每 个计划节点一个孙内存池(节点池),以及每个运算符实例一个曾孙内存池(运算符池)。在查询执行期间,它从叶运算符池分配内存,并将内存使用情况向上传播 到根查询池。如果根处的聚合内存使用量超过当前查询内存容量,则查询池会向内存仲裁器发送增加其容量的请求。内存仲裁器可以通过从系统中容量最大的其他查 询中回收已使用的内存来增加请求者查询池的容量,或者如果请求者查询池超过了每个查询的内存限制或它拥有系统中最大的容量,则从请求者查询池本身回收以释放其 当前容量范围内的空间。已用内存的回收是通过磁盘溢出等技术实现的。 如果内存仲裁成功,则叶子操作符池可以继续执行内存分配器的实际内存分配。但是,如果内存仲裁失败,则系统中容量最大的查询将被选择失败,并出现查询内存容量 超出错误(本地内存溢出)。失败的查询可能是也可能不是请求者查询池本身。
内存分配器在其自身管理的内存空间中以机器页面(4KB)为单位进行实际内存分配。它会跟踪已分配的内存量,并在分配请求超出系统内存限制时返回错误。这可 以显式控制 Pollux 内存使用的 RSS,从而有助于防止服务器内存耗尽 (OOM)。
当用户查询访问远程存储时,文件缓存提供内存中的热数据缓存和预取功能。它直接从内存分配器分配内存,这部分内存不计入查询内存使用量。为了防止因文件 缓存内存使用量过大而导致内存分配失败,文件缓存会通过缩小文件缓存来重试分配失败的情况。这实现了文件缓存和查询内存之间的动态内存共享,以响应用户查询工作负载的变化。
总而言之,内存管理器管理内存池并协调不同内存组件之间的访问。内存池跟踪查询的内存使用情况,并与内存仲裁器交互,调整正在运行的查询之间的内存容量 分配,以实现公平的内存共享。内存分配器管理物理内存分配以防止服务器内存溢出 (OOM),并与文件缓存交互,实现查询内存和文件缓存之间的动态内存 共享,从而最大限度地提高内存 效率。本文档的其余部分将详细描述每个内存组件。
Memory Manager
内存管理器在服务器启动时使用提供的 MemoryManagerOption 创建。它会创建一个内存分配器实例,用于管理 通过内存池分配的查询内存和通过文件缓存分配的缓存内存的物理内存分配。它确保 分配的总内存不超过系统内存限制(由 MemoryManagerOptions::allocatorCapacity 指定)。内存管理器还会创建一个 内存仲裁器实例,用于仲裁正在运行的查询之间的内存容量。它确保 分配的查询内存总容量不超过查询内存限制(由 MemoryManagerOptions::arbitratorCapacity 指定)。内存 仲裁器还可以通过磁盘溢出回收过度使用的内存, 从而防止每个查询超出其每个查询的内存限制(由QueryConfig::query_max_memory_per_node指定)(有关详细信息, 请参阅内存仲裁器)。
设置 Pollux 内存系统后,内存管理器将管理用于查询执行的内存池。查询启动时,它会从内存管理器创建一个根查询池, 然后根据查询计划(详情请参阅内存池部分)从该查询池创建子池树,用于内存分配和使用情况跟踪。
内存管理器会跟踪所有活动的查询池,以便进行内存仲裁。当查询池向内存管理器发送增加容量的请求 (MemoryManager::growPool) 时,内 存管理器会将该请求转发给内存仲裁器,并将活动的查询池列表作为仲裁候选池。内存仲裁器首先从容 量最大的候选池中回收已使用的内存,然后相应地 使用释放的内存空间增加请求池的容量。如果请求者池已拥有所有候选池中最大的容量,则内存仲裁器将从请求者自身回收内存,以释放其当前容量范围 内的空间。有关内存仲裁过程的详细说明,请参阅内存仲裁过程。
内存管理器并不拥有用户创建的查询池的所有权,而仅通过 MemoryManager::dropPool 方法跟踪它们的活跃度,该方法由 查询池的析构函数调用,用于将自身从跟踪列表 (MemoryManager::pools_) 中删除。QueryCtx 对象拥有查 询池,该池将一直保持活跃状态,直到查询完成。
内存管理器创建并拥有一个系统根池,用于 Pollux 内部操作,例如磁盘溢出 。 系统根池和用户创建的查询根池之间的区别在于,系统根池没有每个查询的内存限制,因此它不参与内存仲裁。这是因为系统操作不是代表特定用户查询执行的。 以磁盘溢出 为例,它由内存仲裁触发,用于释放查询使用的 内存。我们预计系统运行期间不会出现大量的内存占用,最终,内存分配器会保证实际分配的内存在系统内存限制范围内,无论是用于系统运行还是用于用户查询 执行。在实际应用中,我们会从内存分配器中预留一些空间来补偿此类系统内存占用。我们可以通过将查询内存限制 (MemoryManagerOptions::arbitratorCapacity) 配置为小于系统内存限制 (MemoryManagerOptions::allocatorCapacity) 来实现(详情请参阅OOM 预防)。
Memory System Setup
以下是 Prestissimo 中初始化 Pollux 内存系统的代码块:
:linenos:
void PrestoServer::initializePolluxMemory() {
auto* systemConfig = SystemConfig::instance();
const uint64_t memoryGb = systemConfig->systemMemoryGb();
MemoryManagerOptions options;
options.allocatorCapacity = memoryGb << 30;
options.useMmapAllocator = systemConfig->useMmapAllocator();
if (!systemConfig->memoryArbitratorKind().empty()) {
options.arbitratorKind = systemConfig->memoryArbitratorKind();
const uint64_t queryMemoryGb = systemConfig->queryMemoryGb();
options.queryMemoryCapacity = queryMemoryGb << 30;
...
}
memory::initializeMemoryManager(options);
if (systemConfig->asyncDataCacheEnabled()) {
...
cache_ = std::make_shared<cache::AsyncDataCache>(
memoryManager()->allocator(), memoryBytes, std::move(ssd));
}
...
}
- L5:从 Prestissimo 系统配置中设置内存分配器容量(系统内存限制)。
- L6:从 Prestissimo 系统配置中设置内存分配器类型。如果 useMmapAllocator 为 true,则使用 MmapAllocator,否则使用 MallocAllocator。内存分配器 描述了这两种类型的分配器。
- L8:从 Prestissimo 系统配置中设置内存仲裁器类型。
目前,我们仅支持 “SHARED” 仲裁器类型(请参阅内存仲裁器)。
NOOP仲裁器类型即将弃用。 - L10:从 Prestissimo 系统配置中设置内存仲裁器容量(查询内存限制)
- L13:创建进程范围的内存管理器,该管理器根据前面步骤初始化的 MemoryManagerOptions 在内部创建内存分配器和仲裁器
- L15-19:如果在 Prestissimo 系统配置中启用了文件缓存,则创建文件缓存
Memory Pool
内存池为查询执行提供内存分配功能。它还会跟踪查询的内存使用情况,以便强制执行每个查询的内存限制。如查询内存池层次结构图所示,查询 会创建一个内存池树,该树镜像查询计划,以便对内存使用情况进行细粒度的跟踪,从而确定哪些任务或操作符使用了最多的内存。在树的根 节点,QueryCtx 从内存管理器创建一个根查询池。每个查询任务都会从查询池创建一个子任务池。 查询任务执行查询计划的一个片段(例如,Prestissimo 中分布式查询执行计划中的一个执行阶段)。任务计划片段中的每个计划节点都会从任务池 创建一个子节点池 (Task::getOrAddNodePool)。每个计划节点都属于一个或多个任务执行管道。每个管道可能有多个并行运行的驱动程序 实例。每个驱动程序实例都包含一个查询运算符管道,而运算符是驱动程序中查询计划节点的实例。因此,每个运算符都会从节点池中创建一个子运算符池 (Task::addOperatorPool)。
查询从位于树叶子节点的运算符池中分配内存,并将内存使用情况一直传播到位于树根节点的查询池,以检查内存使用情况是否超出每个查询的内存限 制。内存分配始终在叶子运算符池中进行,中间池(节点池和任务池)仅聚合内存使用情况,而根查询池则强制执行每个查询的内存限制。鉴于此,我们引入了 两种内存池类型(由 MemoryPool::Kind 定义)以简化内存池管理:一种是 LEAF 类型,它仅允许内存分配;另一种是 AGGREGATE 类型,它聚合 其所有子节点的内存使用情况,但不允许直接分配内存。因此,运算符池为 LEAF 类型,而所有其他运算符池均为 AGGREGATE 类型。我们仅在根查询 池中强制执行内存限制检查。
Memory Usage Tracking
为了跟踪查询内存使用情况,叶运算符池需要将内存使用情况一直传播到根查询池,并检查每次分配的内存限制,但这会很慢。因此,内存池使用内存预 留机制来跟踪查询内存使用情况。内存预留以 1MB 或更大的块进行,以避免每次分配都过度锁定、传播和检查内存使用情况(请参阅下 面的 MemoryPool::quantizedSize 说明)。叶运算符池维护两个内存预留计数器:一个是实际使用的内存 (MemoryPoolImpl::usedReservationBytes_),另一 个是从根查询池预留的内存 (MemoryPoolImpl::reservationBytes_)。这两个计数器之间的差值就是叶运算符池可用的内存。
中间池仅使用 reservationBytes_ 来计算其所有子池持有的内存预留总量。根查询池有两个额外的计数器用于检查内存限制:一个是其当前内存容量 (MemoryPoolImpl::capacity_),即 查询可用的内存量。内存仲裁器会根据正在运行的查询数量、查询内存总限制以及每个查询所需的内存量来设置此值。另一个是最大容量 (MemoryPool::maxCapacity_),即查询可以达到的最 大容量。它由用户设置,并在查询的生命周期内固定不变 (QueryConfig::kQueryMaxMemoryPerNode)。内存仲裁器不能将查询的 capacity_ 设置为超出其 maxCapacity_ 限制的值。
当根查询池收到新的内存预留请求时,它会增加 reservationBytes_ 并检查其是否在其当前的 capacity_ 限制范围内。如果在限制范围内,则根查询池接受该请求。如果不在限制范围内,则根查询池 会请求内存仲裁器(通过内存管理器)通过内存仲裁增加其容量(详情请参阅内存仲裁器)。 如果内存仲裁失败,则根查询池将拒绝该请求,并抛出查询内存容量超出错误(本地 OOM 错误)。
MemoryPool::reserve 和 MemoryPool::release 是内存池用于内存预留的两个方法。内存预留是线程安全的,而 MemoryPool::reserveThreadSafe 是实现内存预留逻辑的主要函数:
#. 叶内存池调用 MemoryPool::reservationSizeLocked 来计算新的所需预留 (incrementBytes)。该计算基 于内存分配大小和可用内存预留 (reservationBytes_ - usedReservationBytes_)。
#. 如果 incrementBytes 为零,则表示叶内存池有足够的可用预留空间,因此无需进行新的预留,只需更新 usedReservationBytes_ 即可反映新的内存使用情况。
#. 如果 incrementBytes 不为零,则叶内存池需要调用 MemoryPool::incrementReservationThreadSafe(见下文)将增量一直传播到根内存池,以检查新的预留请求是否超出查询 的当前容量。如果没有超出,则通过相应地增加 reservationBytes_ 来接受预留。
请注意,如果 MemoryPool::incrementReservationThreadSafe 失败,它会抛出一个异常,导致内存分配请求失败,并出现本地 OOM 错误。
#. 预留成功后,叶内存池将返回步骤 1,检查是否有足够的可用预留空间用于分配请求。
请注意,对同一叶内存池的并发分配请求可能会窃取步骤 3 中所做的预留,因 此我们必须再次检查。 从根内存池进行预留时,我们不会持有叶内存池的锁,如果涉及内存仲裁,这可能会造成阻塞操作。因此,如果有两个来自同一叶内存池的并发内 存预留请求,则可能存在竞争条件。但我们预计这种情况在实践中不会经常发生。
如上所述,为了避免频繁并发地向根内存池预留内存,从而降低 CPU 成本,叶内存池会进行量化内存预留。它会将实际预留的字节数向上舍 入到下一个较大的量化预留值 (MemoryPool::quantizedSize):
- 如果大小
<16MB,则向上舍入到下一个 MB - 如果大小
<64MB,则向上舍入到下一个 4MB - 如果大小
>=64MB,则向上舍入到下一个 8MB
使用量化预留,我们预留的内存永远不会少于 1 MB。即使我们只需要 1KB,也必须预留 1MB,如果可用内存不足,查询就会失败。这也意味 着,如果我们以 15 的并发数运行,每个驱动程序线程将至少预留 1MB,因此,即使查询只使用了几 KB,也至少需要 15 MB 的内存。
MemoryPool::incrementReservationThreadSafe 的实现:
#. 非根内存池会递归调用其父池的 incrementReservationThreadSafe 方法,将预留请求一直传递到根内存池
#. 检查父池的 MemoryPool::incrementReservationThreadSafe 结果:
a. 如果函数返回 true,则表示从根内存池预留成功,并继续接受预留(步骤 3)。 b. 如果函数返回 false,则表示预留成功,但与根内存池检测到的其他并发预留请求冲突。我们需要通过向 MemoryPoolImpl::reserveThreadSafe 返回 false 来再次从叶内存池重试。 c. 如果根内存池的内存预留失败,则函数将抛出“查询内存容量超出”异常,并且内存分配失败。
#. 调用 MemoryPool::maybeIncrementReservation 尝试增加预留空间并检查结果:
a. 对于非根内存池,此操作应该始终成功,因为我们只检查根内存池的容量。 b. 对于根内存池,如果预留请求超出其当前容量,该函数可能会返回 false,并进入步骤 4 以请求内存仲裁。
#. 根内存池调用 MemoryManager::growPool 来增加其容量。 这会触发内存仲裁器内部的内存仲裁过程。
#. 如果 MemoryManager::growPool 返回 true,则表示我们成功增加了内存容量(或将内存使用量减少到当 前容量范围内)。该函数再次调用 MemoryPool::maybeIncrementReservation 来检查内存预留是否能够满足。如果 不能满足,则应该有一个并发的内存预留请求来移除已增加的内存容量。在这种情况下,返回 false 以再次从叶内存池重试(步骤 2-b)。否则,返回 true(步骤 2-a)。
#. 如果 MemoryManager::growPool 返回 false,则表示我们无法从内存仲裁器增加容量,并抛出查询内存容量超出错误(步骤 2-c)
Memory Pool APIs
内存池有三组 API,分别用于内存池管理、内存分配和内存仲裁。以下列出了每组 API 中需要使用的主要 API。
Memory Pool Management
/// Creates a root memory pool with specified 'name' and 'maxCapacity'.
/// 'reclaimer' is provided for memory arbitration process.
std::shared_ptr<MemoryPool> MemoryManager::addRootPool(
const std::string& name = "",
int64_t maxCapacity = kMaxMemory,
std::unique_ptr<MemoryReclaimer> reclaimer = nullptr);
/// Create an aggregate child memory pool which allows to create child memory
/// pools from it, and it used to aggregate memory usage from its child pools.
/// Aggregate memory pool is not allowed to allocate memory directly.
virtual std::shared_ptr<MemoryPool> MemoryPool::addAggregateChild(
const std::string& name);
/// Create a leaf child memory pool which allows to allocate memory but are not
/// allowed to create child pools.
virtual std::shared_ptr<MemoryPool> MemoryPool::addLeafChild(
const std::string& name);
/// Creates new instance of MemoryPool for an operator, stores it in the task
/// to ensure lifetime and returns a raw pointer.
pollux::memory::MemoryPool* Task::addOperatorPool(
const core::PlanNodeId& planNodeId,
int pipelineId,
uint32_t driverId,
const std::string& operatorType);
/// Creates new instance of MemoryPool for a plan node, stores it in the task
/// to ensure lifetime and returns a raw pointer.
memory::MemoryPool* Task::getOrAddNodePool(
const core::PlanNodeId& planNodeId);
Memory Allocation
内存池提供三种类型的内存分配。如果用户需要分配一大块缓冲区,且分配的缓冲区不需要连续,则可以使用 MemoryPool::allocateNonContiguous 分配多个大小可变的缓冲区(详情请参阅非连续分配)。Pollux 将这种分配方式用于 RowContainer、 StreamArena/HashStringAllocator 和 AsyncDataCache 等。如果用户需要分配一大块连续的缓冲区,且大小 > 1MB,则可以使用 MemoryPool::allocateContiguous 通过 std::mmap 直接从操作系统分配一大块物理内存(详情请参阅连续分配)。 Pollux 将这种分配方式用于 HashTable。对于任何其他临时分配,我们可以使用 MemoryPool::allocate。内存分配器会根据实际分配大小决定如何分配 内存(详情请参阅小型分配)。
/// Allocates a buffer with specified 'size'. If the memory allocation is
/// smaller than a predefined threshold, then we delegate the allocation to
/// std::malloc (MmapAllocator::Options::maxMallocBytes).
virtual void* MemoryPool::allocate(int64_t size) = 0;
/// Frees an allocated buffer.
virtual void MemoryPool::free(void* p, int64_t size) = 0;
/// Allocates one or more runs that add up to at least 'numPages', with the
/// smallest run being at least 'minSizeClass' pages. 'minSizeClass' must
/// be <= the size of the largest size class (see non-contiguous allocation
/// section for size class definition). The new memory is returned in 'out' on
/// success and any memory formerly referenced by 'out' is freed. The function
/// throws if allocation fails and 'out' references no memory and any partially
/// allocated memory is freed.
virtual void MemoryPool::allocateNonContiguous(
MachinePageCount numPages,
Allocation& out,
MachinePageCount minSizeClass = 0) = 0;
/// Frees non-contiguous 'allocation'. 'allocation' is empty on return.
virtual void MemoryPool::freeNonContiguous(Allocation& allocation) = 0;
/// Makes a large contiguous mmap of 'numPages'. The new mapped pages are
/// returned in 'out' on success. Any formly mapped pages referenced by 'out'
/// is unmapped in all the cases even if the allocation fails.
virtual void MemoryPool::allocateContiguous(
MachinePageCount numPages,
ContiguousAllocation& out) = 0;
/// Frees contiguous 'allocation'. 'allocation' is empty on return.
virtual void MemoryPool::freeContiguous(ContiguousAllocation& allocation) = 0;
Memory Arbitration
The memory arbitrator section below discusses how these memory arbitration related methods are used in the memory arbitration and reclaim process.
/// Returns the number of bytes that haven't been reserved for use, and can be
/// freed by reducing this memory pool's limit.
virtual uint64_t MemoryPool::freeBytes() const = 0;
/// Invoked to bump up the memory pool's capacity by 'bytes'. The function
/// returns the memory pool's new capacity after the grow.
virtual uint64_t MemoryPool::grow(uint64_t bytes) = 0;
/// Invoked to free up to the specified amount of unused memory reservations by
/// reducing this memory pool's capacity without actually freeing up any
/// used memory. The function returns the actually freed memory bytes. If
/// 'targetBytes' is zero, the function frees all the unused memory reservation
/// bytes.
virtual uint64_t MemoryPool::shrink(uint64_t targetBytes = 0) = 0;
/// Invoked by the memory arbitrator to enter memory arbitration processing. It
/// is a noop if 'reclaimer_' is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual void MemoryPool::enterArbitration();
/// Invoked by the memory arbitrator to leave memory arbitration processing. It
/// is a noop if 'reclaimer_' is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual void MemoryPool::leaveArbitration();
/// Function estimates the number of reclaimable bytes and returns in
/// 'reclaimableBytes'. If the 'reclaimer' is not set, the function returns
/// std::nullopt. Otherwise, it will invoke the corresponding method of the
/// reclaimer.
virtual std::optional<uint64_t> reclaimableBytes() const = 0;
/// Invoked by the memory arbitrator to reclaim memory from this memory pool
/// with specified reclaim target bytes. If 'targetBytes' is zero, then it
/// tries to reclaim all the reclaimable memory from the memory pool. It is
/// noop if the reclaimer is not set, otherwise invoke the reclaimer's
/// corresponding method.
virtual uint64_t MemoryPool::reclaim(uint64_t targetBytes);
Memory Arbitrator
内存仲裁器用于在正在运行的查询之间仲裁内存容量,以实现公平的内存共享,并防止查询超出其内存限制。为了在正在运行的查询之间仲裁内存容量,内存 仲裁器需要能够通过诸如磁盘溢出之类的技术从查询中回收已 使用的内存,然后通过相应地调整内存池的容量在查询之间转移释放的内存(详情请参阅内存仲裁过程部分)。
MemoryArbitrator 的定义旨在支持不同查询系统的不同实现。目前,我们为 Prestissimo 和 Prestissimo-on-Spark 实现了 SharedArbitrator。 Gulten 实现了自己的内存仲裁器,以便与 Spark 内存系统 集成。SharedArbitrator确保分配的总内存容量在查询内存限制 (MemoryManagerOptions::arbitratorCapacity) 范围内,并确保每个查询的容量都在单个查询的内存限制 (MemoryPool::maxCapacity) 范围内。 当查询需要增加容量时,SharedArbitrator 会从查询本身回收已使用的内存(如果已超过其最大内存容量),或者通过从系统中具有最大内存容量的其他查询回收已使用的内存来增加其容量。
Memory Arbitration Process
SharedArbitrator 中的端到端内存仲裁过程如下:
#. 查询运算符 A 从其叶运算符池(运算符池 A)分配内存 #. 运算符池 A 将内存预留请求发送到根查询池(查询池 A) #. 查询池 A 是根内存池,它会检查内存预留请求是否在当前容量范围内 (MemoryPoolImpl::capacity_)。假设请求已超出当前容量,则触发内存仲裁 #. 查询池 A 向内存管理器发送请求,以增加其新预留的容量 (MemoryManager::growPool) #. 内存管理器将请求转发给内存仲裁器 (MemoryArbitrator::growCapacity),并将请求者内存池和根查询池列表作为内存仲裁候选。内存管理器在内存仲裁过程中保持候选查询池处于活动状态 #.内存仲裁器将内存仲裁处理序列化,每次处理一个请求,以确保查询之间分配的内存容量视图一致。内存仲裁器可能会收到来自 不同查询甚至同一查询的不同驱动程序线程的并发仲裁请求。对于每个内存仲裁请求:
a. 内存仲裁器在启动内存仲裁之前,会调用请求者内存池的 MemoryPool::enterArbitration 方法。此处的请求 内存池是发起内存预留请求的算子池 A。它调用关联的算子回收器 (Operator::MemoryReclaimer) 的 MemoryReclaimer::enterArbitration 方法。算子回收器会将驱动线程置于挂起状态 (Task::enterSuspended)。要从 查询任务中回收内存,我们需要首先暂停该任务以停止其所有驱动线程,以避免在内存回收期间对其算子状态进行任何并发更新。 如果选择请求内存池的查询任 务来回收内存, 那么我们必须将其驱动线程置于挂起状态, 否则,由于请求驱动线程正在接受内存仲裁,因此查询任务将永远不会暂停。注意,在任务暂停处理中,挂起的驱动线程不计入正在运行的状态。
b. 内存仲裁器调用 SharedArbitrator::ensureCapacity 来检查请求者查询池的新预留是否超出其最大内存容量 限制 (MemoryPool::maxCapacity_)。如果没有超出,则继续执行步骤 6-c。否则,内存仲裁器尝试从请求者池本身回收已 使用的内存。如果内存回收机制已从请求者池中释放出足够的内存用于新预留,且内存仲裁在其当前容量范围内,则内存仲裁成功。如果请求 者池仍然超出最大内存容量限制,则内存仲裁失败。否则,继续执行步骤 6-c。
c. 内存仲裁器运行快速路径 (SharedArbitrator::reclaimFreeMemoryFromCandidates),从候选查询池中回收未使用 的内存预留,而不会实际释放已使用的内存。它首先尝试从自身回收,然后从可用容量最大的候选池中回收, (MemoryPool::freeBytes),直到达到内存回收目标。
d. 如果内存仲裁器在快速路径上未回收足够的可用内存,则运行慢速路径 (SharedArbitrator::reclaimUsedMemoryFromCandidates), 从可回收内存最多的候选池中回收已使用的内存(有关查询中内存回收过程的详细信息,请参阅“内存回收过程”部分 )。
e. 如果内存仲裁器已回收足够的内存,它会通过增加内存容量 (MemoryPool::grow) 将回收的内存分配给请求者池。如果没有,内存仲裁器必须调用 SharedArbitrator::handleOOM,向容量最大的候选内存池(作为牺牲者)发送内存池中止 (MemoryPool::abort) 请求,以释放内存,让其他拥有足够内存的正在运行的查询继续执行。内存池中止请求会导致查询执行失败, 并等待查询完成以 释放所有占用的内存资源。
f. 如果牺牲查询池就是请求者池本身,则内存仲裁失败。否则,返回步骤 6-c,在放弃之前再次尝试内存仲裁。
g.内存仲裁器在内存仲裁结束时调用请求者内存池的 MemoryPool::leaveArbitration 方法。回收器运算符将其驱动线程移出挂起状态 (Task::leaveSuspended)。
Memory Reclaim Process
以下是查询中的内存回收过程:
-
内存仲裁器调用候选查询池的 MemoryPool::reclaim 方法,并以字节为单位指定回收目标,该方法会调用关联内存回收器对象 (MemoryReclaimer::reclaim) 的相应方法。 查询池使用默认实现,根据可回收字节数 (MemoryPool::reclaimableBytes) 对其子任务池进行排序,并从可回收字节数最多的任务中回收内存,直至达到回收目标。
-
查询池调用任务池的 reclaim 方法,该方法又调用关联的任务回收器 (Task::MemoryReclaimer)。后者首先暂停任务执行 (Task::requestPause),然后根据可回收字节数对其子节点池进行排序,并从可回收字节数最多的节点池中回收内存。达到回收目标或从所有节 点池回收资源后,任务回收器将恢复任务执行 (Task::resume)
-
任务池调用节点池的 reclaim 方法,该方法从其可回收字节数最多的子操作符池中回收内存。
-
节点池最终调用操作符池执行实际的内存回收 (Operator::MemoryReclaimer)。目前,我们支持通过磁盘溢出和表写入刷新进行内 存回收。Operator::reclaim 的添加是为了支持内存回收,其默认实现不执行任何操作。只有可溢出的操作符会重写该方法:OrderBy、HashBuild、 HashAggregation、RowNumber、TopNRowNumber、Window 和 TableWriter。 目前,我们只是从可溢出操作符的行容器中溢出所有内容来释放内存。在为行容器添加内存压缩支持后,我们可以利用 Pollux 中的细粒度磁盘溢出功能, 仅溢出和释放所需的内存量。
请注意,如果可溢出运算符在数据处理过程中触发了内存仲裁,即使它已经停止了查询任务的执行,也无法从该运算符处回 收内存。为了防止这种情况发生,我们添加了 Operator::nonReclaimableSection_ 来指示运算符是否位于不 可回收区段内,并且内存仲裁器无法从位于不可回收区段内的运算符处回收内存。驱动程序执行框架默认在不可回收区段内 设置一个正在运行的运算符。可溢出运算符会在实际数据处理之前,在特定调用点(例如内存预留 (MemoryPool::maybeReserve))选择 清除不可回收区段,以便内存仲裁器回收内存。
Memory Allocator
内存分配器管理通过内存池分配的查询内存和直接从文件缓存分配的缓存内存的物理内存分配。内存分配器确保分配的总 内存始终在系统内存限制之内。MemoryAllocator 定义了内存分配器接口。我们有两个分配器实现:MallocAllocator 将 所有内存分配委托给 std::malloc,它简单可靠。我们将其作为默认选项提供,但我们认为它存在由内存碎片引起的 RSS 变化问题。因 此,我们构建了 MMapAllocator 来使用 std::mmap 管理物理内存分配,以便对 RSS 进行显式控制。我们尚未确认 MmapAllocator 是否 比 MallocAllocator 效果更好,但我们能够使用它运行相当大的 Prestissimo 工作负载。我们将在未来使用这两个分配器比较该工作负载,以确定 哪一个更好。用户可以通过设置 MemoryManagerOptions::useMmapAllocator 来为其应用程序选择分配器(例如,参见内存系统设置)。
Non-Contiguous Allocation
非连续分配定义为一个由多个 PageRun 组成的 Allocation 对象。每个 PageRun 包含一个连续的缓冲区,不
同 PageRun 的缓冲区不必连续。MMapAllocator 定义了 MmapAllocator::SizeClass 数据结构(类似于
Umbra <https://db.in.tum.de/~freitag/papers/p29-neumann-cidr20.pdf>_ 中使用的数据结构)来管
理非连续分配。SizeClass 对象提供固定大小的缓冲区(类页面)的分配,其大小是机器页面大小的 2 的幂。MMapAllocator 创
建了 9 个不同的 SizeClass 对象,类页面大小从 1 个机器页面 (4KB) 到 256 个机器页面 (1MB) 不等。
为了分配大量的机器页面,MmapAllocator 会调用MemoryAllocator::allocationSize 来构建分配计划
(MemoryAllocator::SizeMix),该分配计划包含一个选定的 SizeClass 对象列表,以及每个对象中要分配的类页面数量。
MemoryAllocator::allocationSize 通过从用户指定的最大值 SizeClass 到最小值 SizeClass 进行搜索 来生成分配计划。如果最小值 SizeClass 不为 1,则最后一个分配的类页面可能会浪费内存。如图 所示,对于一个 150 个页面的分配请 求,最小值 SizeClass 为 4,我们选择从 SizeClass/64 中分配 2 个类页面,从 SizeClass/16 中分配 1 个类页面, 从 SizeClass/4 中分配 2 个类页面。总共分配的机器页面数量为 152。最后一个从 SizeClass/4 中分配的类页面有两个机器 页面被浪费。内存分配器根据分配计划,从每个选定的 SizeClass 对象中分配内存。分配结果将返回一个 Allocation 对象,该对 象包含 4 个页面运行:两个运行来自SizeClass/64(两个分配的类页面在内存中不连续),一个运行来自 SizeClass/16,一个运行来自 SizeClass/4(两个分配的类页面在内存中连续)。
每个 SizeClass 对象使用 std::mmap 设置自己的内存空间,其大小与系统内存限制相同。在用户写入已分配 的内存空间之前,设置的内存空间不会引起操作系统的任何内存分配(或拥有后备内存)。SizeClass 对象将其自身的内存 空间划分为多个类页面,并使用 SizeClass::pageAllocated_ 位图来跟踪类页面是否已分配。它使用另一个位图 SizeClass::pageMapped_ 来跟踪类页面是否有后备内存(映射的类页面)。为了确保 Pollux 内存使用率的 RSS 在系 统内存限制范围内,我们假设已分配的类页面始终拥有后备内存,而已释放的类页面也拥有后备内存,直到我们调用 std::madvise 将其释放回操 作系统。要释放类页面,我们只需清除 pageAllocated_ 位图中的分配位,但不会立即调用 std::madvise 来释放后备内存,因为 std::madvise 是一个开 销很大的操作系统调用。我们还预计已释放的类页面很可能会被再次使用。因此,只有当映射的类页面总数达到系统内存限制时,我们才会移除已释放类页面 的后备内存以进行新的分配。numMappedFreePages_ 用于跟踪每个 SizeClass 对象中仍具有后备内存的已释放类页面的数 量。SizeClass::adviseAway 实现了延迟释放后备内存的控制逻辑。
我们采用了两种优化措施来加速空闲类页面的查找。一种是使用聚合位图 (mappedFreeLookup_) 来跟踪组中 的空闲类页面。mappedFreeLookup_ 中的每个位对应于 pageAllocated_ 中的 512 位(8 个字)。如 果 mappedFreeLookup_ 中的某个位被置位,则 pageAllocated_ 中的 512 位中至少有一个位未置位。另一 种是使用 simd 指令操作位图,以进一步加速 CPU 执行。
简化的 MmapAllocator::allocateNonContiguous 实现:
bool MmapAllocator::allocateNonContiguous(
MachinePageCount numPages,
Allocation& out,
ReservationCallback reservationCB,
MachinePageCount minSizeClass) override;
-
使用 numPages 和 minSizeClass 参数调用 MemoryAllocator::allocationSize。 numPages 指定要分配的机器页面数量。minSizeClass 指定要分配的最小类页面大小。该函数返回 从 MemoryAllocator::SizeMix 中每个选定的 SizeClass 分配的类页面数量。从所有 SizeClass 对象分配的机器页面总数应不小于请求的 numPages。
-
增加内存分配器的内存使用量,并检查是否超出系统内存限制 (MemoryAllocator::capacity_)。如果超出,则分配失败并撤销内存使用量更新。否则,继续执行步骤 3 中的内存池预留操作。
- MMapAllocator* 使用 MallocAllocator::numAllocated_ 来计算已分配的内存,单位为机器页。
- MMapAllocator* 的分配操作由 AsyncDataCache::makeSpace 包装,它会在分配失败后多次缩小文件缓存,直至放弃。每次重试都需要一定的退避延迟,这使得从缓存中移除内存变得更加困难。
- AsyncDataCache::makeSpace* 不仅会从内存池中重试分配,还会从文件缓存本身重试分配。在后一种情况下,旧的缓存条目将被移除,以便为新的缓存数据腾出空间。
-
调用 reservationCB 来增加内存池的预留空间,以检查新的分配是否超出查询内存限制。如果超出,我们 将还原步骤 2 中进行的内存使用量更新,并重新抛出从 reservationCB 捕获的查询内存容量超出异常。如果分配来自文件缓存,则 reservationCB 为 null。
-
从每个选定的 SizeClass 对象分配类页面。如果任何一个 SizeClass 分配失败,则整个分配失败。我们释放 成功的 SizeClass 分配,并还原内存池预留(步骤 3)和内存使用量(步骤 2)更新。
-
类页面分配返回设置后备内存所需的机器页面数量。这指的是已分配的类页面,这些页面没有后备内存,并且 SizeClass::pageMapped_ 中的相应位未设置。我们调用 MmapAllocator::ensureEnoughMappedPages 来确 保此新分配中具有后备内存的映射类页面总数不超过系统内存限制。如果超出,我们将调用 MmapAllocator::adviseAway 来移 除已释放类页面的后备内存。如果 MmapAllocator::adviseAway 调用失败,则分配失败,并恢复先前步骤中对此分配所做的所有更改。
-
调用 MmapAllocator::markAllMapped 将所有已分配的类页面设置 为 SizeClass::pageMapped_ 中的映射页面,分配成功。
Contiguous Allocation
virtual bool MemoryAllocator::allocateContiguous(
MachinePageCount numPages,
Allocation* collateral,
ContiguousAllocation& allocation,
ReservationCallback reservationCB = nullptr) = 0;
连续分配定义为一个 ContiguousAllocation 对象,其中包含一个较大的连续缓冲区。它用于非常大的连续 缓冲区分配(>1MB),例如分配哈希表。它的实现非常简单。它调用 std::mmap 直接从操作系统分配一块连续的 物理内存。与非连续分配类似,它需要调用 MmapAllocator::ensureEnoughMappedPages 来确保映射内存空 间的大小在系统内存限制之内。为了释放连续的分配,内存分配器会调用 std::munmap 将物理内存立即归还给操作系统。
Small Allocation
void* MmapAllocator::allocateBytes(
uint64_t bytes,
uint16_t alignment = kMinAlignment) override;
MmapAllocator::allocateBytes 根据实际分配大小(字节)以三种不同的方式分配内存。如果分配大小小于配置
的阈值 (MmapAllocator::Options::maxMallocBytes),MmapAllocator 会将分配委托给 std::malloc。如
果分配大小在类页面大小range (<= 1MB)内,它会将缓冲区分配为来自某个 SizeClass 对象的类页面。否则,它会将缓
冲区分配为一个较大的连续分配。
我们预计使用 MmapAllocator 的查询系统不会有大量的小内存分配。在 Prestissimo 中,只有极少数的小内存分配会 委托给 std::malloc。大型内存状态(例如 RowContainer 和 HashTable)会分配连续或非连续的分配。目前, 我们不限制在 MmapAllocator 中委托给 std::malloc 的内存分配。我们提供了一个选项 (MmapAllocator::Options::smallAllocationReservePct),供查询系统在 MmapAllocator 中预留少量 内存容量,以弥补实际应用中这些临时的小额分配。
Server OOM Prevention
内存分配器确保 Pollux 的所有内存使用量不超过系统内存限制。这对于防止服务器内存耗尽至关重要, 因为我们预计 Pollux 在运行过程中会占用相当一部分服务器内存。例如,Meta 中的 Prestissimo 会为 Pollux 配置 80% 的服务器内存,其余 20% 则用于非 Pollux 组件,例如程序二进制文件、 HTTP 流式 shuffle 和远程存储客户端等。
然而,Pollux 本身的内存容量强制机制不足以防止服务器在非 Pollux 组件内存使用量激增的情况下耗尽 内存。例如,我们发现在 Prestissimo 中,大型 Prestissimo 设置(超过 400 个工作进程)中的 http 流 式 shuffle 会导致内存使用量激增,这很容易导致 Prestissimo 工作进程内存耗尽 (OOM)。在大型集群中,每个工作 进程 (PrestoExchangeSource) 可能会同时从大量源接收流数据。在接近 OOM 时收集的内存配置文件显示,超过 50% 的 非 Pollux 内存是由 http proxygen 分配的。为了防止 http 流式 shuffle 导致服务器内存耗尽,我们在 Prestissimo 流 式 shuffle 中添加了节流控制,以限制一次读取的源数量,从而限制流式 shuffle 的内存使用量。
除了为每个非 Pollux 组件构建特定的节流机制外,我们还在 Meta Prestissimo 中提供了一个通用的服务器内 存回退机制,以便与 Pollux 协作处理来自非 Pollux 组件的内存使用高峰。PeriodicMemoryChecker 在后台运 行,定期检查系统内存使用情况。每当系统内存使用量超过某个阈值时,它会尝试通过缩小文件缓存 (AsyncDataCache::shrink) 来释放 Pollux 的内存,并将释放的缓存内存返回给操作系统。通过这种方式,我们可以自动缩小文件缓存,以应对查询系统中非 Pollux 组 件的瞬时内存使用高峰。