Plan Nodes and Operators
Pollux 查询计划是由 PlanNode 组成的树状结构。每个 PlanNode 都有零个或多个子 PlanNode。为了执行查询计划,Pollux 会将其转换为一组管道。 每个管道由一系列线性运算符组成,这些运算符对应于计划的一个 线性子树。通过将每个具有两个或多个子节点的节点与所有子节点断开连接,将计划树分解为一组线性子树。
计划节点到运算符的转换大多是一对一的。以下情况除外:
- 过滤节点后跟项目节点时,会转换为单个运算符 FilterProject
- 具 有两个或多个子节点的节点会转换为多个运算符,例如 HashJoin 节点会转换为一对运算符:HashProbe 和 HashBuild。
与叶节点对应的运算符称为源运算符。只有一部分计划节点可以位于计划树的叶子节点上。这些节点包括:
- TableScanNode
- ValuesNode
- ExchangeNode
- MergeExchangeNode
- ArrowStreamNode
以下是受支持的计划节点和相应操作符的列表。
| Plan Node | Operator(s) | Leaf Node / Source Operator |
|---|---|---|
| TableScanNode | TableScan | Y |
| ArrowStreamNode | ArrowStream | Y |
| FilterNode | FilterProject | |
| ProjectNode | FilterProject | |
| AggregationNode | HashAggregation or StreamingAggregation | |
| GroupIdNode | GroupId | |
| MarkDistinctNode | MarkDistinct | |
| HashJoinNode | HashProbe and HashBuild | |
| MergeJoinNode | MergeJoin | |
| NestedLoopJoinNode | NestedLoopJoinProbe and NestedLoopJoinBuild | |
| OrderByNode | OrderBy | |
| TopNNode | TopN | |
| LimitNode | Limit | |
| UnnestNode | Unnest | |
| TableWriteNode | TableWrite | |
| TableWriteMergeNode | TableWriteMerge | |
| PartitionedOutputNode | PartitionedOutput | |
| ExchangeNode | Exchange | Y |
| ExpandNode | Expand | |
| MergeExchangeNode | MergeExchange | Y |
| ValuesNode | Values | Y |
| LocalMergeNode | LocalMerge | |
| LocalPartitionNode | LocalPartition and LocalExchange | |
| EnforceSingleRowNode | EnforceSingleRow | |
| AssignUniqueIdNode | AssignUniqueId | |
| WindowNode | Window | |
| RowNumberNode | RowNumber | |
| TopNRowNumberNode | TopNRowNumber |
Plan Nodes
TableScanNode
表扫描操作从 connector </develop/connectors> 读取数据。例如,当与 HiveConnector 一起使用时,表扫描会从 ORC 或 Parquet 文件读取数据。
| Property | Description |
|---|---|
| outputType | 输出列的列表。这是基础表中可用列的子集。列的顺序可能与表的架构不一致。 |
| tableHandle | 特定于连接器的表格描述。可能包含下推式过滤器。 |
| assignments | 从表模式到输出列的连接器特定映射。 |
ArrowStreamNode
Arrow 流操作从 Arrow 数组流中读取数据。ArrowArrayStream 结构在 Arrow ABI 中定义,并提供与 Arrow 数组流源交互所需的回调。
| Property | Description |
|---|---|
| arrowStream | 构造的 Arrow 数组流。这是数据块的流式源,每个数据块具有相同的架构。 |
FilterNode
过滤操作根据布尔过滤表达式从输入数据中剔除一条或多条记录。
| Property | Description |
|---|---|
| filter | 布尔过滤表达式。 |
ProjectNode
投影操作会根据数据集的输入生成一个或多个附加表达式。投影操作还可能删除一个或多个输入列。
| Property | Description |
|---|---|
| names | 输出表达式的列名。 |
| expressions | 输出列的表达式。 |
AggregationNode
聚合操作根据一组分组键对输入数据进行分组,并计算每个分组键组合对应的度量值。此外,还可以对各个度量值的输入进行排序和去重。
| Property | Description |
|---|---|
| step | 聚合步骤:部分、最终、中间、单一。 |
| groupingKeys | 零个或多个分组键。 |
| preGroupedKeys | 已知输入已按分组键预分组的分组键子集,即所有具有给定预分组键值组合的行将依次出现。输入不按预分组键排序。如果输入已按所有分组键预分组,则执行将使用 StreamingAggregation 运算符。 |
| aggregateNames | 度量的输出列的名称。 |
| aggregates | 一个或多个待 计算的度量。每个度量指定一个表达式,例如 count(1)、sum(a)、avg(b);可选的布尔值输入列(用于屏蔽此特定度量的行);可选的输入列列表(用于在计算度量之前进行排序);可选的标志(用于指示在计算度量之前必须对输入进行重复数据删除)。表达式必须采用直接对输入列进行聚合函数调用的形式,例如 sum(c) 可以,但 sum(c + d) 不可以。 |
| ignoreNullKeys | 一个布尔标志,指示聚合是否应删除任何分组键中为空的行。用于避免对分组键上紧接着内连接的聚合进行不必要的处理。 |
| globalGroupingSets | 如果 AggregationNode 位于 GroupIdNode 之上,则某些组可能是全局组,这些组仅具有 GroupId 分组键值。这些值表示全局聚合值。 |
| groupId | GroupId 是 AggregationNode 中由底层 GroupIdNode 生成的 groupId 列的分组键。它必须是 BIGINT 类型。 |
各个度量的属性。
| Property | Description |
|---|---|
| call | 用于计算度量的表达式,例如 count(1)、sum(a)、avg(b)。表达式必须采用直接对输入列调用聚合函数的形式,例如 sum(c) 可以,但 sum(c + d) 不可以。 |
| rawInputTypes | 聚合函数的原始输入类型列表。用于正确识别聚合函数,例如,在中间聚合的情况下,决定是 min(x) 还是 min(x, n)。当聚合步骤为中间或最终时,这些输入类型可能与“call”中指定的输入类型不同。 |
| mask | 可选的布尔值输入列,用于屏蔽此特定度量的行。多个度量可以指定相同的输入列作为掩码。 |
| sortingKeys | 计算度量值之前要排序的输入列的可选列表。如果指定,则必须使用 sortingOrders 来指定每个排序键的排序顺序。 |
| sortingOrders | 每个排序键 的排序顺序列表。 |
| distinct | 布尔标志,指示在计算测量值之前必须对输入进行重复数据删除。 |
请注意,如果度量指定了排序键,HashAggregation 运算符会将所有输入行累加到内存中,然后再对它们进行排序并添加到累加器中。与输入无需排序的情况相比,这需要更多的内存。
同样,如果度量要求对输入进行去重,HashAggregation 运算符会将所有不同的输入行累加到内存中,然后再将它们添加到累加器中。与输入无需去重的情况相比,这需要更多的内存。
此外,许多聚合函数对已排序和未排序的输入都会产生相同的结果,例如::func:min、:func:max、:func:count 和 :func:sum。
查询规划器应避免为此类聚合函数生成需要已排序输入的计划。对输入顺序敏感的聚合函数的一些示例包括:func:array_agg 和 :func:min_by(存在关系时)。
类似地,某些聚合函数对唯一输入和重复输入都会产生相同的结果,例如::func:min 和 :func:max。查询规划器应避免为此类聚合函数生成要求去重输入的计划。
最后,请注意,只有当聚合步骤为“单步”时,才可以对已排序的输入进行度量计算。此类计算不能拆分为部分计算和最终计算。
为了说明 globalGroupingSets 和 groupIdColumn 的必要性,我们分析以下 SQL
SELECT orderkey, sum(total_quantity) FROM orders GROUP BY CUBE (orderkey);
这相当于以下带有 GROUPING SETS 的 SQL
SELECT orderkey, sum(total_quantity) FROM orders GROUP BY GROUPING SETS ((orderkey), ());
该 SQL 给出了每个 orderkey 的 total_quantity 小计以及全局总和(来自空分组集)。
优化器将上述查询规划为基于 GroupId 节点的聚合。
假设订单表有 5 行:
orderkey total_quantity
1 5
2 6
2 7
3 8
4 9
分组集 ((orderkey), ()) 的 GroupId 之后,表有以下 10 行
orderkey total_quantity group_id
1 5 0
2 6 0
2 7 0
3 8 0
4 9 0
null 5 1
null 6 1
null 7 1
null 8 1
null 9 1
随后使用分组键(orderkey、group_id)进行聚合,给出查询的小计
orderkey total_quantity group_id
1 5 0
2 13 0
3 8 0
4 9 0
null 35 1
如果此 GROUP BY CUBE 语句没有输入行,则预期结果为一行,其中包含全局聚合的默认值。对于上述查询,结果将是:
orderkey total_quantity group_id
null null 1
要生成此特殊行,AggregationNode 需要全局分组集的 groupId(在本例中为 1),并返回一行包含聚合默认值的记录。
注意:Presto 允许在单个 SQL 查询中使用多个全局分组集。
SELECT orderkey, sum(total_quantity) FROM orders GROUP BY GROUPING SETS ((), ());
因此,globalGroupingSets 是 groupIds 的向量。
ExpandNode
对于每个输入行,根据指定的“projections”生成 N 行 M 列。 “projections”是一个 N x M 的表达式矩阵:一个 N 行 M 列的向量。 每个表达式可以是列引用或常量。常量可以为空,也可以为非空。 “names”是一个包含 M 个新列名的列表。此运算符的语义与 Spark 匹配。可以使用 project 和 unnest 来实 现扩展功能。但是,在 Project 操作中创建数组构造函数时,性能并不理想。
| Property | Description |
|---|---|
| projections | 一个包含 N 行、每行 M 列的向量。每个表达式要么是列引用,要么是常量。 |
| names | 新列名的列表。 |
ExpandNode 通常用于计算 GROUPING SETS、CUBE、ROLLUP 和 COUNT DISTINCT。
为了说明 ExpandNode 的工作原理,让我们检查以下 SQL 查询:
SELECT l_orderkey, l_partkey, count(l_suppkey) FROM lineitem GROUP BY ROLLUP(l_orderkey, l_partkey);
在规划阶段,Spark 会生成一个 Expand 运算符,其投影列表如下:
[l_suppkey, l_orderkey, l_partkey, 0],
[l_suppkey, l_orderkey, null, 1],
[l_suppkey, null, null, 3]
注意:最后一列用作特殊的组 ID,指示每行所属的分组集。在 Spark 中,此 ID 使用位掩码计算。如果选择了某一列,则该位 值赋值为 0;否则,赋值为 1。因此,第一行的二进制表示为 (000),结果为 0。第二行的二进制表示为 (001),结果为 1。第三行的二进制表示为 (011),结果为 3。
例如,假设输入行如下:
l_suppkey l_orderkey l_partkey
93 1 673
75 2 674
38 3 22
经过 ExpandNode 计算后,每一行都会生成 3 行数据,所以一共有 9 行:
l_suppkey l_orderkey l_partkey grouping_id_0
93 1 673 0
93 1 null 1
93 null null 3
75 2 674 0
75 2 null 1
75 null null 3
38 3 22 0
38 3 null 1
38 null null 3
接下来的聚合运算符将这 9 行按 (l_orderkey、l_partkey、grouping_id_0) 分组,并计算 count(l_suppkey):
l_orderkey l_partkey count(l_suppkey)
1 673 1
null null 3
1 null 1
2 null 1
2 674 1
3 null 1
3 22 1
另一个例子是 COUNT DISTINCT 查询。
SELECT COUNT(DISTINCT l_suppkey), COUNT(DISTINCT l_partkey) FROM lineitem;
在规划阶段,Spark 会生成一个 Expand 运算符,其投影列表如下:
[l_suppkey, null, 1],
[null, l_partkey, 2]
例如,如果输入行是:
l_suppkey l_partkey
93 673
75 674
38 22
经过 ExpandNode 计算后,每一行都会生成 2 行数据,所以一共有 6 行:
l_suppkey l_partkey grouping_id_0
93 null 1
null 673 2
75 null 1
null 674 2
38 null 1
null 22 2
接下来的聚合运算符根据 (l_suppkey, l_partkey, grouping_id_0) 对这些行进行分组并生成:
l_suppkey l_partkey grouping_id_0
93 null 1
75 null 1
38 null 1
null 673 2
null 674 2
null 22 2
接下来的另一个聚合运算符计算全局计数(l_suppkey)和计数(l_partkey),产生最终结果:
COUNT(DISTINCT l_suppkey) COUNT(DISTINCT l_partkey)
3 3
GroupIdNode
为每个指定的分组键集复制输入。用于 实现分组集的聚合。
输出由分组键、聚合输入和组 ID 列组成。组 ID 列的类型为 BIGINT。
| Property | Description |
|---|---|
| groupingSets | 分组键集列表。每个集合中的键必须是唯一的,但键可以在集合之间重复。分组键通过其输出名称指定。 |
| groupingKeyInfos | 输出中分组键列的名称和顺序。 |
| aggregationInputs | 输入要复制的列。 |
| groupIdName | 标识分组集的 group-id 列的名称。该列以零为基数,对应于分组集在“groupingSets”列表中的位置。 |
GroupIdNode 通常用于计算 GROUPING SETS、CUBE 和 ROLLUP。
虽然 GroupingSet 通常不会重复使用相同的分组键列,但在某些情况下可能会出现这种情况。为了说明 GroupingSet 为何会重复使用相同的分组键列,让我们检查以下 SQL 查询:
SELECT count(orderkey), count(DISTINCT orderkey) FROM orders;
在此查询中,用户希望使用同一列计算全局聚合,无论是否使用 DISTINCT 子句。Presto 采用了一种特殊的优化策略:optimize.mixed-distinct-aggregations <https://www.qubole.com/blog/presto-optimizes-aggregations-over-distinct-values>,并使用 GroupIdNode 来计算这些聚合。
首先,优化器创建一个 GroupIdNode 来复制每一行,并将一份副本分配给组 0,另一份副本分配给组 1。这是通过使用 GroupIdNode 和两个分组集来实现的,每个分组集都使用 orderkey 作为分组键。为了消除组间的歧义,orderkey 列被设置为其中一个分组集的分组键。
假设 orders 表有 5 行:
orderkey
1
2
2
3
4
GroupIdNode 会将其转换为:
orderkey orderkey1 group_id
1 null 0
2 null 0
2 null 0
3 null 0
4 null 0
null 1 1
null 2 1
null 2 1
null 3 1
null 4 1
然后,Presto 使用 (orderkey, group_id) 和 count(orderkey1) 进行聚合。
结果如下:
orderkey group_id count(orderkey1) as c
1 0 null
2 0 null
3 0 null
4 0 null
null 1 5
然后,Presto 计划进行第二次聚合,其中没有键,并且 count(orderkey),arbitrarily(c)。 由于这两个聚合都忽略空值,因此可以正确计算不同 orderkey 的数量以及所有 orderkey 的数量。
count(orderkey) arbitrary(c)
4 5
HashJoinNode and MergeJoinNode
连接操作基于连接表达式,将两个独立的输入合并为一个输出。连接的一种常见子类型是等式连接,其中连接表达式被约束为连接的两个输入之间的等式(或等式 + 零等式)条件列表。
HashJoinNode 表示一种实现,它首先将连接右侧的所有行加载到哈希表中,然后流式传输连接左侧,探测哈希表中的匹配行并输出结果。
MergeJoinNode 表示一种实现,它假设两个输入都已按连接键排序,并流式传输连接两侧以查 找匹配的行并输出结果。
表格转换
以下是将给定内容转换为Markdown格式的表格:
| Property | Description |
|---|---|
| joinType | Join type: inner, left, right, full, left semi filter, left semi project, right semi filter, right semi project, anti. You can read about different join types in this blog post. |
| nullAware | 仅适用于反项目和半项目连接。指示连接语义是 IN(nullAware = true)还是 EXISTS(nullAware = false)。 |
| leftKeys | 左侧输入中属于相等条件的列。必须至少指定一个。 |
| rightKeys | 右侧输入中属于相等条件的列。必须至少指定一列。rightKeys 的数量和顺序必须与 leftKeys 的数量和顺序匹配。 |
| filter | 可选的非相等过滤表达式,可以引用来自两个输入的列。 |
| outputType | 输出列的列表。这是连接操作的左输入和右输入中可用列的子集。这些列的显示顺序可能与输入中的顺序不同。 |
NestedLoopJoinNode
NestedLoopJoinNode 表示一种实现,它从连接左侧迭代每一行,并针对每一行从连接右侧迭代所有行,根据连接条件进行比较以查找匹配的行并发出结果。嵌套循环连接支持非等式连接,并按照与探测输入相同的顺序为每个执行线程发出输出行(对于内连接和左外连接)。
| Property | Description |
|---|---|
| joinType | Join type: inner, left, right, full. |
| joinCondition | 用作连接条件的表达式可以引用来自两个输入的列。 |
| outputType | 输出列的列表。这是连接操作的左输入和右输入中可用列的子集。这些列的显示顺序可能与输入中的顺序不同。 |
OrderByNode
排序或排序依据操作会根据一个或多个已识别的排序字段以及排序顺序对数据集进行重新排序。
| Property | Description |
|---|---|
| sortingKeys | 用于排序的一个或多个输入列的列表。排序键必须是唯一的。 |
| sortingOrders | 每个排序键的排序顺序。支持的顺序为:升序空值优先,升序空值最后,降序空值优先,降序空值最后。 |
| isPartial | 布尔值,指示排序操作是否仅处理数据集的一部分。 |
TopNNode
Top-n 操作会根据一个或多个已确定的排序字段以及排序顺序对数据集进行重新排序。Top-n 操作不会对整个数据集进行排序,而是仅维护确保有限输出所需的记录总数。Top-n 操作是逻辑排序和逻辑限制操作的组合。
| Property | Description |
|---|---|
| sortingKeys | 用于排序的一个或多个输入列的列表。不得为空,且不得包含重复项。 |
| sortingOrders | 每个排序键的排序顺序。请参阅 OrderBy 获取支持的排序顺序列表。 |
| count | 返回的最大行数。 |
| isPartial | 布尔值指示操作是否仅处理数据集的一部分。 |
LimitNode
限制操作会跳过指定数量的输入行,然后保留指定数量的行并删除其余行。
| Property | Description |
|---|---|
| offset | 要跳过的输入行数。 |
| count | 返回的最大行数。 |
| isPartial | 布尔值指示操作是否仅处理数据集的一部分。 |
UnnestNode
unnest 操作将数组和映射扩展为单独的列。数组会扩展为单列,映射会扩展为两列(键和值)。可用于扩展多列。在这种情 况下,会生成与基数最高的数组或映射一样多的行(其他列将用 null 填充)。也可以选择生成一个序数列,指定从 1 开始的行号。
| Property | Description |
|---|---|
| replicateVariables | 未修改地返回的输入列。 |
| unnestVariables | 输入要扩展的数组或映射类型的列。 |
| unnestNames | 用于扩展列的名称。每个数组列一个名称。每个映射列两个名称。 |
| ordinalityName | 序数列的可选名称。 |
TableWriteNode
表写入操作会消耗一个输出,并通过
:doc:connector </develop/connectors> 将其写入存储。例如,写入 ORC 或 Parquet 文件。表写入操作会返回一个包含已写入数据元数据的列列表:写入存储的行数、写入器上下文信息、
已写入存储的文件路径以及收集的列统计信息。
| Property | Description |
|---|---|
| columns | 要写入存储的输入列列表。这可能是不同顺序的输入列的子集。 |
| columnNames | 写入存储时使用的列名。这些列名可以与输入列名不同。 |
| aggregationNode | 可选聚合计划节点,用于收集写入存储的数据的列统计信息。 |
| insertTableHandle | 目标表的连接器特定描述。 |
| outputType | 包含已写入存储的数据的元数据的输出列的列表。 |
TableWriteMergeNode
表写入合并操作聚合多个表写入操作的元数据输出并返回聚合结果。
| Property | Description |
|---|---|
| outputType | 包含从多个表写入操作聚合的写入数据的元数据的输出列列表。 |
PartitionedOutputNode
分区输出操作根据零个或多个分布字段重新分配数据。
| Property | Description |
|---|---|
| kind | 指定输出缓冲区类型:kPartitioned、kBroadcast 和 kArbitrary。对于 kPartitioned 类型,行会被分区,每行会被发送到相应的目标分区。对于 kBroadcast 类型,行不会被分区,而是会被发送到所有目标分区。对于 kArbitrary 类型,行不会被分区,每行会被发送到任意一个目标分区。 |
| keys | 用于计算每行分区的零个或多个输入字段。 |
| numPartitions | 将数据拆分成的分区数。 |
| replicateNullsAndAny | 布尔值标志,指示是否应将键为空的行发送到所有分区,以及如果不存在这样的行,是否应将任意选择的一行发送到所有分区。用于提供在单个节点上实现反连接语义所需的全局范围信息。 |
| partitionFunctionFactory | 工厂在计算输入行的分区时使用分区函数。 |
| outputType | 输出列的列表。这是输入列的子集,可能顺序不同。 |
ValuesNode
值操作返回指定的数据。
| Property | Description |
|---|---|
| values | 要返回的行集。 |
| parallelizable | 如果每个线程(每个驱动程序一个)应产生相同的输入。 |
| repeatTimes | 每个向量应作为输入产生多少次。 |
ExchangeNode
以任意顺序合并多个流的接收操作。输入流来自远程交换或 shuffle。
| Property | Description |
|---|---|
| type | 输入流中的列的列表。 |
MergeExchangeNode
合并多个有序流以保持有序性的接收操作。输入流来自远程交换或混洗。
| Property | Description |
|---|---|
| type | 输入流中的列的列表。 |
| sortingKeys | 要按其排序的一个或多个输入列的列表。 |
| sortingOrders | 每个排序键的排序顺序。请参阅 OrderBy 获取支持的排序顺序列表。 |
LocalMergeNode
合并多个有序流以保持其有序性的操作。输入流来自本地交换器。
| Property | Description |
|---|---|
| sortingKeys | 要按其排序的一个或多个输入列的列表。 |
| sortingOrders | 每个排序键的排序顺序。请参阅 OrderBy 获取支持的排序顺序列表。 |
LocalPartitionNode
本地交换操作将输入数据划分为多个流,或将来自多个流的数据合并为一个流。
| Property | Description |
|---|---|
| Type | 交换类型:收集或重新分配。 |
| partitionFunctionFactory | 工厂在计算输入行的分区时使用分区函数。 |
| outputType | 输出列的列表。这是输入列的子集,可能顺序不同。 |
EnforceSingleRowNode
强制单行操作会检查输入是否最多包含一行,并 返回未修改的该行。如果输入为空,则返回一行,所有值都设置为空。如果输入包含多行,则会引发异常。
用于包含不相关子查询的查询。
AssignUniqueIdNode
分配唯一 ID 操作会在输入列的末尾添加一列,每行都有一个唯一值。此唯一值标记每个输出行在该运算符的所有输出行中都是唯一的。
64 位唯一 ID 的构建方式如下:
- 前 24 位 - 任务唯一 ID
- 接下来的 40 位 - 运算符计数器值
添加任务唯一 ID 是为了确保生成的 ID 在分布式查询执行中执行同一查询阶段的所有节点上都是唯一的。
| Property | Description |
|---|---|
| idName | 生成的唯一 ID 列的列名。 |
| taskUniqueId | 一个 24 位整数,用于唯一标识所有节点上的任务 ID。 |
WindowNode
窗口运算符用于评估窗口函数。该运算符在输入列的末尾添加用于窗口函数输出的列。
窗口运算符根据分区列的值将输入数据分组到各个分区中。如果未指定分区列,则所有输入行都被视为位于同一分区中。 在每个分区中,行都按排序列的值排序。 窗口函数会按照此顺序逐行计算。 如果未指定排序列,则 结果的顺序未指定。
| Property | Description |
|---|---|
| partitionKeys | 按窗口函数的列进行分区。 |
| sortingKeys | 按窗口函数的列排序。 |
| sortingOrders | 上述每个排序键的排序顺序。支持的排序顺序为:asc nulls first、asc nulls last、desc nulls first 和 desc nulls last。 |
| windowColumnNames | 输出下面 windowFunctions 列表中每个窗口函数调用的列名。 |
| windowFunctions | 带有 frame 子句的窗口函数调用。例如,row_number()、first_value(name) 介于范围 10 的上一行和当前行之间。默认 frame 介于范围无界的上一行和当前行之间。 |
| inputsSorted | 如果为 true,Window 算子会假定输入按分区键进行聚类,并按排序键按排序顺序进行排序。在这种情况下,算子会在收到数据后立即拆分窗口分区并开始处理。如果为 false,Window 算子会先累积所有输入,然后对数据进行排序,并根据定义的条件拆分窗口分区,最后按顺序处理每个窗口分区。 |
RowNumberNode
WindowNode 的优化版本,包含单个 row_number 函数、 可选的限制,且不包含排序。
使用指定的分区键对输入进行分区,并为每个分区分配从 1 开始的行号。该运算符以流式模式运行。对于 每一批输入,它都会计算并返回结果,然后再接受下一批输入。
该运算符会累积状态:一个哈希表,将分区键映射到此分区中迄今为止已查看的总行数。将行号作为输出中的一列返回是可选的。该运算符支持溢出。
该运算符相当于一个 WindowNode,后跟 FilterNode(row_number <= limit),但它占用的内存和 CPU 更少,并且在查看所有输入之前即可获得结果。
| Property | Description |
|---|---|
| partitionKeys | 按列分区。 |
| rowNumberColumnName | 可选的行号输出列名。如果指定,则生成的行号将作为输出列返回,该输出列位于所有输入列之后。 |
| limit | 可选的每个分区限制。如果指定,则此节点生成的任何给定分区的行数都不会超过此值。多余的行将被丢弃。 |
TopNRowNumberNode
WindowNode 的优化版本,带有单个 row_number 函数,并对已排序的分区设置了限制。
使用指定的分区键对输入进行分区,并为每个分区维护最多“限制”行数。接收所有输入后, 从 1 开始为每个分区分配行号。
此运算符累积状态:一个哈希表,将分区键映射到该分区内“限制”行数的列表。将行号作为输出中的一列返回是可选的。此运算符也支持溢出。
此运算符逻辑上等同于 WindowNode 后跟 FilterNode(row_number <= limit),但它占用的内存和 CPU 更少。
| Property | Description |
|---|---|
| partitionKeys | 按列分区。 |
| rowNumberColumnName | 可选的行号输出列名。如果指定,则生成的行号将作为输出列返回,该输出列位于所有输入列之后。 |
| limit | 可选的每个分区限制。如果指定,则此节点生成的任何给定分区的行数都不会超过此值。多余的行将被丢弃。 |
MarkDistinctNode
MarkDistinct 运算符用于生成针对不同值进行聚合的聚合掩码列,例如 agg(DISTINCT a)。 掩码是一个布尔值列,对于一组输入行子集,设置为 true,这些子集共同表示一组“distinctKeys”的唯一值。
| Property | Description |
|---|---|
| markerName | 输出掩码列的名称。 |
| distinctKeys | 分组键的名称。 |
Examples
Join
带有连接的查询计划包含一个 HashJoinNode。此类计划会被转换为两个管道:构建管道和探测管道。构建管道处理来自连接构建端 的输入,并使用 HashBuild 运算符构建哈希表。探测管道处理来自连接探测端的输入,探测哈希表并生成符合连接条件的行。构建管 道通过一种名为 JoinBridge 的特殊机制将哈希表提供给探测管道。 JoinBridge 就像一个 Future,其中 HashBuild 运算符以 HashTable 作为结果完成 Future,而 HashProbe 运算符在 Future 完成时接收该 HashTable。
每个管道可以以不同的并行度运行。在下面的示例中,探测管道在 2 个线程上运行,而构建管道在 3 个线程上运行。当构建管道以多 线程运行时,每个管道都会处理一部分构建端输入。最后完成处理的管道负责合并来自其他管道的哈希表,并将最终表发布到 JoinBridge。 当右外连接的探测管道以多线程运 行时,最后完成处理的管道负责从构建端发出不符合连接条件的行。
Local Exchange
局部交换操作有多种用途。它可用于将数据处理的并行性从多线程转换为单线程,或反之。例如,局部交换可用于排序操作,其中部分 排序在多线程中运行,然后在单线程上合并结果。局部交换操作也可用于合并多个流水线的结果。例如,合并 UNION 或 UNION ALL 的多个输入。
以下是一些示例。
N 对 1 局部交换可用于合并部分排序的结果,以便最终进行合并排序。
在必须单线程运行的操作之后,进行一对多本地交换以增加并行度。
本地交换用于合并来自多个管道的数据,例如 UNION ALL。