Joins
Pollux 支持使用分区或广播分布策略的内部连接、左连接、右连接、完全外部连接、左半过滤连接、左半投影连接、右半过滤连接、右半 投影连接和反哈希连接。半投影连接和反哈希连接支持额外的空值感知标志,以区分 IN(空值感知)和 EXISTS(常规)语义。Pollux 还支持交叉连接。
对于连接输入按连接键排序的情况,Pollux 还支持内部合并连接和左合并连接。目前尚不支持右连接、完全连接、左半连接、右半连接和反哈希连接。
Hash Join Implementation
使用 HashJoinNode 计划节点将连接插入查询计划。指定连接类型、等值子句(例如,左侧和右侧的值需要匹配的列对),以 及应用于连接结果的可选过滤器。
连接类型可以是 kInner、kLeft、kRight、kFull、kLeftSemiFilter、kLeftSemiProject、kRightSemiFilter、kRightSemiProject 或 kAnti 之一。
kLeftSemiProject、kRightSemiProject 和 kAnti 连接支持额外的 nullAware 标志,用于区分 IN(空感知)和 EXISTS(常规)语义。
过滤器是可选的。如果指定,它可以是连接结果的任何表达式。此表达式将使用与 FilterProject 运算符和 HiveConnector 相同的表达式求值引擎进行求值。
带过滤器的内连接相当于连接运算符后跟 FilterProject 运算符,但带过滤器的左连接则不同。左连接返回左侧的所有行以及构建端通过 过滤器的所有匹配项。结果还包括构建端没有匹配项或两个匹配项均未通过过滤器的行。
为了说明带过滤器的左连接和带过滤器的左连接之间的区别,请考虑以下数据。
Left side:
== =====
id value
== =====
1 10
2 20
3 30
4 40
== =====
Right side:
== ====
id name
== ====
2 'a'
2 'b'
3 'c'
3 'd'
3 'e'
4 'f'
== ====
left.id = right.id 左连接的结果是:
==== ======= ======
l.id l.value r.name
==== ======= ======
1 10 null
2 20 'a'
2 20 'b'
3 30 'c'
3 30 'd'
3 30 'e'
4 40 'f'
==== ======= ======
左连接 left.id = right.id 并使用过滤器 right.name IN ('a', 'f') 的结果为:
==== ======= ======
l.id l.value r.name
==== ======= ======
1 10 null
2 20 'a'
3 30 null
4 40 'f'
==== ======= ======
将此结果与左连接(left.id = right.id)的结果进行比较,然后过滤 right.name IN ('a', 'f'):
==== ======= ======
l.id l.value r.name
==== ======= ======
2 20 'a'
4 40 'f'
==== ======= ======
为了执行包含连接操作的计划,Pollux 创建了两个独立的管道。一个管道处理构建端数据并创建哈希表。另一个管道处理探测端数据,
通过探测哈希表将其与构建端数据连接起来,并按照下游计划节点的指定继续执行。HashJoinNode 被转换为两个独立的运算
符:HashProbe 和 HashBuild。HashProbe 运算符成为探测端管道的一部分。HashBuild 运算符作为构建端管道的最后一个
运算符安装。HashBuild 运算符的输出是一个哈希表 <hash-table.html>,HashProbe 运算符可以通过一种特殊的机制
访问该哈希表:JoinBridge。
HashBuild 和 HashAggregation 运算符都使用相同的哈希表数据结构:pollux::exec::HashTable <hash-table.html>。有效负载,即非连接键列,
被称为依赖列,按行存储在 RowContainer 中。
在连接和聚合中使用哈希表可以进行未来的优化, 如果聚合后进行连接,并且聚合和连接的键相同,则可以重用哈希表。我们预计在不久的将来实现此优化,但目前尚未实现。
在处理输入和构建哈希表时,HashBuild 运算符会分析连接键的值,以确定这些键是否可以进行规范化(例如,可以将多个键合并为 一个 64 位整数键)或映射到一个小的整数域以允许基于数组的查找。此机制在 pollux::exec::VectorHasher 中实现,并在聚合和连接之间共享。
构建和探测端流水线可以多线程运行,并且可以独立指定它们的并行度。HashBuild 运算符具有额外的逻辑来支持哈希表的并行构建,其中最后完成构建的运 算符负责将其与所有其他哈希表合并,然后再将哈希表通过 JoinBridge 提供。
Dynamic Filter Pushdown
在某些查询中,连接操作与探测端表扫描在同一阶段运行。 如果连接构建端(当其足够小时)被广播到所有连接节点,或者查询使用逐桶执行模式,就会发生这种情况。 在这些情况下,通常只有一小部分连接键与探测端匹配,因此在表扫描期间过滤掉探测行是有益的。这种优化称为动态过滤或动态过滤器下推。
Pollux 通过利用 VectorHashers 来实现此优化,这些 VectorHashers 在构建端包含关于连接键值的完整信息。对于每个不同值 较少的连接键,系统会使用存储在相应 VectorHashers 中的不同值集合构建一个列表内过滤 器。这些过滤器随后被推送到 TableScan 运 算符,并进入 HiveConnector,HiveConnector 使用它们来 (1) 根据统计数据修剪文件和行组,以及 (2) 在读取数据时过滤掉行。
值得注意的是,最大的优势来自于使用动态过滤器在表扫描期间修剪整个文件和行组。
只有当连接键列来自 并置 TableScan 运算符且未经修改时,动态过滤器下推才可用。HashProbe 会询问驱动程序哪些列支持过滤器下推(例 如,哪些列来自接受动态过滤器的运算符且未经修改),并仅为这些列生成过滤器。
如果连接只有一个连接键且没有依赖列,并且构建端的所有连接键值都是唯一的,则可以用下推的过滤器完全替换连接。Pollux 会检测到这种情况,并在下推过滤器后将连接转换为无操作。
动态过滤器下推优化已针对内部连接、左半连接、右半连接和右连接启用。
Broadcast Join
广播连接是指一种特定的分布式执行策略,其中构建端足够小,可以将其复制(广播)到所有连接节点,从而避免重新调整探测端 和构建端,从而将所有具有相同键的行安排在同一台机器上。连接是使用广播策略还是分区策略执行对连接执行本身没有影响。唯一 的区别是广播执行允许动态过滤器下推,而分区执行则不允许。
PartitionedOutput 运算符和 OutputBufferManager 支持广播计划评估的结果。此功能可通过将 PartitionedOutputNode 中 的布尔标志“broadcast”设置为 true 来启用。
Anti Joins
空感知反连接用于带有 NOT IN <subquery> 子句的查询,而常规反连接用于带有 NOT EXISTS <subquery> 子句的查询。
-- null-aware anti join
SELECT * FROM t WHERE t.key NOT IN (SELECT key FROM u)
-- regular anti join
SELECT * FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE u.key = t.key)
广义上讲,反连接会返回在探测端没有匹配项的行。
然而,确切的语义有点棘手。这些在 :doc:反连接 <../develop/anti-join> 中有详细描述。
从高层次来看,没有额外过滤器的空感知反连接的行为如下:
-
如果构建端包含任何连接键为空的条目,则返回空数据集;
-
当构建端为空时,返回探测端的所有行,包括连接键为空的行;
-
当构建端不为空时,返回探测端连接键非空且构建端无匹配项的行。
当查询在多台机器上运行时,情况 (1) 和 (2) 很难在本地识别(除非连接以广播模式运行),因为它们需要了解整 个构建端。需要知道所有节点的组合构建端是否为空,如果不是,则是否包含空键。为了在本地提供此信息,PartitionedOutput 运算符支持 一种模式,该模式将分区键中所有包含空值的行复制到所有目标,如果没有包含空键的行,则将任意选择的一行复制到所有目标。此模式可通过在 PartitionedOutputNode 计划节点中将“replicateNullsAndAny”标志设置为 true 来启用。
PartitionedOutput 运算符的 Replicate-nulls-and-any 函数确保所有节点都接收连接键中包含空值的行,因此可以实现 (1) 中描述的语 义。它还确保只有当整个构建端为空时,本地构建端才为空,从而实现语义(2)。将一行包含非空键的数据发送到多个“错 误”的目的地是安全的,因为该行 不可能与这些目的地上的任何内容匹配。
Semi Joins
半过滤连接适用于包含 IN <subquery> 和 EXISTS <subquery> 子句的查询。当外部查询的基数大于子查询的基数
时,应使用左半过滤连接。当子查询的基数更大时,可以使用右半连接。
SELECT * FROM t WHERE t.key IN (SELECT key FROM u)
SELECT * FROM t WHERE EXISTS (SELECT * FROM u WHERE u.key = t.key)
左半过滤连接返回探测端至少有一个匹配项的行。右半过滤连接返回构建端至少有一个匹配项的行。
半项目连接用于 IN <subquery> 或 EXISTS <subquery> 表达式与其他表达式组合的查询。
SELECT * FROM t WHERE t.key IN (SELECT key FROM u) OR t.col > 10
SELECT * FROM t WHERE t.key NOT IN (...) OR t.key2 NOT IN (...)
左半项目连接返回所有探测侧行,并带有一个额外的布尔值列(匹配),用于指示构建侧是否存在匹配项。右半项目连接返回所有构建侧行,并带有一个“匹配” 列,用于指示探测侧是否存在匹配项。
半项目连接支持空值感知标志,以区分 IN 和 EXISTS 语义。如果无法明确判断是否存在匹配项,则支持空值感知的半项目连接可能会返回 NULL 值。例如,只要构建侧不为空,左半项目连接在没有额外过滤器的情况下,对于连接键为 NULL 的探测侧行, 就会返回 NULL。
反连接的结果等同于半项目连接后跟“不匹配”过滤器的结果。
Empty Build Side
对于内连接、左半 连接和右半连接,当构建端为空时, Pollux 会实施优化,提前完成连接并返回一个空的结果集,而无需等待接收所有探测端的输入。在这种情况下, 所有上游运算符都会被取消,以避免不必要的计算。
Skipping Duplicate Keys
在为左半连接或反连接构建哈希表时,HashBuild 运算符会跳过 包含重复键的条目,因为这些条目是不需要的。这是通过 配置 exec::HashTable 并将“allowDuplicates”标志设置为 false 来实现 的。如果构建端包含重复的连接键,此优化可以减少哈希表的内存占用。
Execution Statistics
如果每个连接键的值不太大,并且允许基于数组的连接或使用规范化键,则 HashBuild 运算符会报告每个连接键的不同值的范围和数量。
rangeKey<N>- 连接键的值范围 #NdistinctKey<N>- 连接键的不同值的数量 #N
HashProbe 运算符会报告它是否已将自身完全替换为下推的过滤器,并变为无操作。
- switchedWithDynamicFilterRows - 过滤器下推后未经任何处理而直接传递的行数。
HashProbe 还会报告其为下推生成的动态过滤器数量。
-
dynamicFiltersProduced - 生成的动态过滤器数量(每个连接键最多一个)
-
maxSpillLevel - 初始溢出触发的最大溢出级别(0)。
TableScan 运算符会报告其接收并传递给 HiveConnector 的动态过滤器数量。
- dynamicFiltersAccepted - 已接收的动态过滤器数量
Memory Layout
在哈希表中,我们将行值保存在“RowContainer”中。这是一个按行存储的存储结构,每行包含以下组件:
- 空值标志(每项 1 位)
- 键(仅当可空时)
- 依赖项
- 已探测标志(1 位)
- 空闲标志(1 位)
- 键
- 依赖项
- 变量大小(32 位)
- 下一个偏移量(64 位指针)
Merge Join Implementation
使用 MergeJoinNode 计划节点将合并连接插入查询计划。确保 连接的左侧和右侧都生成按连接键排序的结果。 指定连接类型、等值子句(例如,左侧和右侧的值需要匹配的列对)以及用于连接结果的可选过滤器。
为了执行包含合并连接的计划,Pollux 创建了两个独立的管道。一个管道处理右侧数据并将其放入 MergeJoinSource。另一 个管道处理左侧数据,将其与右侧数据连接,并按照下游计划节点的指定继续执行。 MergeJoinNode 被转换为 MergeJoin 运算符和由 MergeJoinSource 支持的 CallbackSink。MergeJoin 运算符成为 左侧管道的一部分。CallbackSink 安装在右侧管道的末尾。
Usage Examples
查看 tests/exec/HashJoinTest.cpp 和 MergeJoinTest.cpp 以获取有关如何使用哈希连接或合并连接构建和执行计划的示例。