What's in the Task?
一些运算符需要与其他运算符协调工作。
- 并行运行的表扫描运算符需要从共享队列中提取分片。
- HashProbe 运算符需要等待 HashBuild 运算符创建哈希表。
- 并行构建哈希表不同部分的多个 HashBuild 运算符需要协调以将这些部分组合成最终的哈希表。
- 并行运行的交换运算符需要从共享的远程数据源中提取数据。
- 本地交换运算符需要将数据写入和读取到共享数据分区。
任务通过维护共享状态来实现这些运算符之间的协调。
Pipelines and Drivers
任务负责将查询计划转换为一组由多个堆叠在一起的操作符组成的管道。
有关计划节点和操作符之间的映射,请参阅:doc:计划节点和操作符 <operators>。每
个具有多个源的计划节点都会引入一个或多个新的管道。这些计划节点包括 HashJoinNode、NestedLoopJoinNode、
MergeJoinNode、LocalMergeNode 和 LocalPartitionNode。连接节点有两个源,并会引入一条终止于右侧源的新管道。
LocalMergeNode 和 LocalPartitionNode 为每个源引入一条管道。
LocalPartitionNode 是个例外,因为它可能只有一个源,在这种情况下,它仍然会引入一条新的管道,以允许前后不同的并行度。每个管道可以多线程运
行,并且每个管道使用的线程数可以不同。
驱动程序是指单个管道的单个执行线程。
让我们看几个例子。
包含 HashJoinNode 的查询计划会被转换为两个管道。一个管道包含一个 HashProbe 运算符。另一个管道包含一个 HashBuild 运算符 。HashBuild 运算符是管道中的最后一个运算符。处理 HashBuild 管道的线程数可以与处理 HashProbe 管道的线程数不同。
LocalPartitionNode 用于 (1) 对数据进行重新分区;(2) 将多个线程的数据集中到一个线程中;(3) 拆分单个线程生成的数 据,以实现多线程处理。带有 LocalPartitionNode 的查询计划会被转换为两个管道:一个以 LocalPartition 运算符结尾 的生产者管道,以及一个以 LocalExchange 开头的消费者管道。这些管道可以相同或不同的并行度运行,从而实现 N:1、1:N 和 N:M 本地交换。
LocalMergeNode 用于合并多个已排序的输入,同时保持排序性。一个包含两个源的 LocalMergeNode 查询计划将被转换 为三个管道:每个源节点一个生产者管道和一个消费者管道。生产者管道以 CallbackSink 运算符结束。消费者管道以 LocalMerge 运算符开始。带有 LocalMerge 运算符的管道始终以单线程运行。消费者管道生成已排序的输出,并且通常也以单线程运行。
总而言之,一个查询计划会被转换为多个管道,每个管道使用多个驱动程序执行。给定的计划节点会被转换为一个或多个 运算符。某些计划节点会被转换为多个不同类型的运算符。例如,TableScanNode 会被转换为多个 TableScan 运算符, 每个执行线程(驱动程序)对应一个;而 HashJoinNode 会被转换为多个 HashProbe 运算符和多个 HashBuild 运算符。 HashProbe 和 HashBuild 运算符的数量可能匹配,也可能不匹配。
任务会创建驱动程序和运算符,然后将运算符的所有权转移给相应的驱动程序。每个驱动程序都会被分配一个管道 ID 和一个驱动 程序 ID。管道 ID 是从零开始的序列号。驱动程序 ID 也是从零开始的序列号,作用域为管道。驱动程序 ID 在管道内是唯一的,但 在管道之间重复。每个运算符都会获得相应计划节点的计划节点 ID,以及一个指定管道 ID 和驱动程序 ID 的 DriverCtx。具有相同流水线 ID 的同类型算子被称为对等算子。这些算子在不同的驱动程序中并发运行。每个算子都会被分配一个从零开始的顺序算子 ID,该 ID 在驱动 程序中是唯一的。
Splits
Task 负责从 应用程序接收分片、存储这些分片,并提供对操作符的访问。
只能为叶子计划节点添加分片。与叶子计划节点对应的操作符会获取并处理分片。这些操作符被称为源操作符,包括 TableScan、Exchange 和 MergeExchange。
Task::addSplit(planNodeId, split) API 允许应用程序为特定计划节点添加分片。应用程序多次调用 addSplit() API 即可为多个计 划节点添加多个分片。Task::noMoreSplits() API 允许应用程序发出已添加所有分片的信号。一旦处理完所有分片并收到“不再添加分片”的信号,Task 便会完成。
Task 将分片存储在队列中:每个计划节点 ID 对应一个队列。
Task::getSplitOrFuture(planNodeId) API 允许操作符从相应的队列中获取分片进行处理。如果有可用的分片,此 API 将返回该分片;如果队列 为空,但尚未收到“不再分片”信号,则返回 Future。当收到分片或“不再分片”消息时,Future 完成。
Join Bridges and Barriers
HashProbe 算子需要等待相应的 HashBuild 算子创建哈希表。对于每个 HashJoinNode 和 NestedLoopJoinNode,Task 会创 建一个桥接对象,可以是 HashJoinBridge 或 NestedLoopJoinBridge。HashProbe 和 HashBuild 算子使用 Task::getHashJoinBridge() API 来访问共享桥接。HashBuild 算子将哈希表添加到桥接中。HashProbe 算子从桥接中获取哈希表。同样,NestedLoopJoinProbe 和 NestedLoopJoinBuild 算子使用 Task::getNestedLoopJoinBridge() API 来访问共享桥接,以便将构建端的数据传递到探测端。
并行运行的 HashBuilder 算子需要相互协调,等待所有算子处理完输入后,将结果合并到单个哈希表中。当一个算子完成处 理后,它会调用 Task::allPeersFinished() API,该 API 对除最后一个算子之外的所有算子都返回 false。最后一 个算子收到 true,并负责组装哈希表,并将其通过桥发送到探测端。
Task::allPeersFinished() API 使用 BarrierState 结构来维护状态,以了解哪个算子是最后一个算子。
同样,并行运行的 NestedLoopJoinBuild 算子使用 Task::allPeersFinished() API 选择一个算子来合并结果,并通过桥发送到探测端。
此外,对于 RIGHT 和 FULL OUTER 连接,HashProbe 算子使用 Task::allPeersFinished() API 选择一个算子,该算子 会在所有探测行处理完毕后,发出构建端没有匹配的行。
Exchange Clients
Exchange 操作符旨在从多个远程工作器获取单个分区的数据。获取数据的具体机制并未包含在 Pollux 库中,预计由应 用程序提供。应用程序实现了一个 ExchangeSource 和一个 ExchangeSource::Factory。工厂函数接收一个用于标识 远程工作器的字符串、一个分区号和一个 ExchangeQueue 实例,并返回一个 ExchangeSource 实例,该实例可以从远程工作 器获取分区数据并将其放入提供的队列中。
ExchangeClient 负责创建 ExchangeSource 并维护传入数据的队列。多个 Exchange 操作符从一个共享的 ExchangeClient 中 提取数据,每个操作符接收一部分数据。
一个 Exchange 操作符(driverId == 0)负责从任务中获取分片,并使用分片中找到的任务 ID 信息初始化共享的 ExchangeClient。
Task 负责创建和存储共享的 ExchangeClient 实例,每 个 ExchangeNode 对应一个。Task 在创 建 Exchange 操作符时,会将引用传递给相应 ExchangeNode 的共享 ExchangeClient。
下图显示了第一个 Exchange 操作符 (driverId == 0) 从 Task 中获取分片并初始化 ExchangeClient,然 后所有 Exchange 操作符都从共享的 ExchangeClient 中获取数据。对于每个分片,ExchangeClient 都会创建 一个 ExchangeSource,该 ExchangeSource 从单个上游任务中拉取单个分区(图中的 #15)的数据。所有 ExchangeSource 都从 不同的上游任务中拉取同一分区的数据。ExchangeClient 中的队列用于累积来自所有上游任务的 #15 分区的数据,并将其分配给 多个 Exchange 操作符,从而实现并行处理。
MergeExchange 运算符与 Exchange 运算符类似,但它从多个工作器接收已排序的数据,并且必须合并数据 以保持排序状态。此运算符必须单线程运行,因此不需要在任务中维护任何共享状态。
Local Exchange Queues
本地交换器用于更改管道之间的并行度或重新分区数据。交换器由安装在生产管道末端的 LocalPartition 算子、安装在消 费管道源头的 LocalExchange 以及一个或多个用于累积数据的 LocalExchangeQueue 组成。 LocalExchangeQueue 由多个 LocalPartition 和 LocalExchangeQueue 算子共享,因此必须存储在 Task 中。
对于每个 LocalPartitionNode,Task 会创建多个 LocalExchangeQueue,其数量与运行消费管道的线程数 相同。LocalPartition 和 LocalExchangeQueue 算子使用 Task::getLocalExchangeQueue() API 来访问共享队列以读取和写 入数据。LocalPartition 算子对数据进行分区并将其放入 LocalExchangeQueue 中。LocalExchange 从单个 LocalExchangeQueue 中读取数据。
Local Merge Sources
LocalMerge 运算符与 LocalExchange 类似,但它从多个线程接收已排序的数据,并且必须合并数据以保持排序状态。此运算符必须单线程运行。它使用 Task::getLocalMergeSources() API 来访问生产管道正在写入的 LocalMergeSource 实例。对于每个 LocalMergeNode,Task 会创建 多个 LocalMergeSource 实例,以匹配生产管道的并行度。
Merge Join Sources
MergeJoin 操作符需要一种接收右侧数据的方法。它依赖于 Task 来创建一个 MergeJoinSource,并在右侧管道的末端安装 一个 CallbackSink 操作符,以便将数据写入 MergeJoinSource。MergeJoin 操作符使用 Task::getMergeJoinSource() API 来访问 MergeJoinSource() 并读取右侧数据。
Extensibility
Pollux 允许应用程序定义自定义计划节点和运算符,以及 自定义连接节点及其自身的连接桥和运算符。这些运算符可以 访问分片并使用屏障,因为这些结构是通用的,并且不依赖于 任何特定的计划节点或运算符。Exchange 客户端、本地交换队列 以及本地合并源和合并连接源并非通用的, 因此不适用于自定义运算符。
Summary
任务负责维护共享状态并实现算子之间的协调。共享状态包括:
- 分片
- 连接桥和屏障
- 交换客户端
- 本地交换队列
- 本地合并源
- 合并连接源
每个共享状态都与特定的计划节点相关联。算子在访问共享状态时使用计划节点 ID。自定义算子可以访问分片和屏障。其他共享状态对自定义算子不可用。