聚合(Aggregations)
本文探讨 Pollux 中与聚合相关的优化。我们将介绍不同的技术,并提供示例并定义每种技术的应用条件。
Pollux 支持使用零个、一个或多个分组键以及零个、一个或多个聚合函数进行部分聚合和最终聚合。
文档中的 :doc:聚合函数 <../functions/presto/aggregate> 部分列出了所有可用的聚合函数,而 :doc:如何添加聚合函数?<aggregate-functions> 指南则解释了如何添加更多聚合函数。
使用 :ref:AggregationNode<AggregationNode> 将聚合插入查询计划。指定聚合步骤(部分、中间、最终或单个)、
分组键和聚合函数。您还可以指定布尔列来
屏蔽部分或所有聚合的行,以及请求对已排序或不同的输入进行聚合计算。分组键必须引用输入列,并且不能包含表达式。要计算表达式的聚合,请在 AggregationNode 之前添加 ProjectNode。
以下是聚合查询计划的示例:
使用单个分组键和单个聚合函数进行分组:
SELECT a, sum(b) FROM t GROUP BY 1
- AggregationNode:groupingKeys =
{a},aggregates ={sum(b)}
使用单个分组键和应用于表达式的聚合函数进行分组:
SELECT a, sum(b * c) FROM t GROUP BY 1
- 聚合节点:groupingKeys =
{a},aggregates ={sum(d)} - 项目节点:a, d := b * c
使用多个分组键和多个聚合进行分组:
SELECT a, b, sum(c), avg(c) FROM t GROUP BY 1, 2
- AggregationNode:groupingKeys =
{a, b},aggregates ={sum(c), avg(c)}
对不同输入进行分组:
SELECT a, count(DISTINCT c) FROM t GROUP BY 1
- AggregationNode:
groupingKeys = {a}, aggregates = {count(distinct c)}
对已排序的输入进行分组:
SELECT a, array_agg(b ORDER BY b) FROM t GROUP BY 1
- AggregationNode:groupingKeys =
{a},aggregates ={array_agg(b ORDER BY b ASC NULLS LAST)}
Distinct 聚合:
SELECT DISTINCT a, b FROM t
- AggregationNode:groupingKeys =
{a, b},aggregates ={}
使用掩码进行聚合:
SELECT a, sum (b) FILTER (WHERE c > 10) FROM t GROUP BY 1
- 聚合节点:groupingKeys =
{a},aggregates ={sum(b, mask: d)} - 项目节点:a, b, d := c > 10
全局聚合:
SELECT sum(a), avg(b) FROM t
- AggregationNode:groupingKeys =
{},聚合 ={sum(a), avg(b)}
HashAggregation and StreamingAggregation Operators
AggregationNode 会被转换为 HashAggregation 算子来执行。 不同的聚合操作(例如没有聚合操作的聚合操作)以流式模式运行。对于每一批输入行,该算子会确定一组新的分组键值并将其作为结果返回。具有 一个或多个聚合函数的聚合操作需要先处理所有输入,然后才能生成结果。
AggregationNode 可能指示输入已根据分组键的子集进行了预分组。如果输入已根据所有分组键进行了预分组,则计划节点将使用 StreamingAggregation 算子执行。在这种情况下,无需在生成结果之前将所有输入累积到内存中。StreamingAggregation 算子每次只累积少量分 组,因此比 HashAggregation 算子占用的内存少得多。
对于输入按分组键的严格子集进行预分组的情况, HashAggregation 包含一项优化,即每当遇到预分组键中值不同的行时,它都会刷新分组。这有助于减少内存使用总量,并允许更快地解除下游运算符的阻塞。
Push-Down into Table Scan
HashAggregation 运算符支持将聚合下推到表扫描中。 当满足以下所有条件时,即可启用下推:
- 聚合函数接受单个参数;
- 该参数是直接从表中读取的列,无需任何转换;
- 该列未在查询中的其他任何地方使用。
例如,在以下查询中可以进行下推:
SELECT a, sum(b) FROM t GROUP BY 1
如果使用聚合函数输入列以外的列来过滤数据,也可以实现下推。例如,在以下查询中启用了下推:
SELECT a, sum(b)
FROM t
WHERE a > 100
GROUP BY 1
在这些查询中,TableScan 运算符将“b”列生成为 LazyVector,而“sum”聚合函数使用 ValueHook 加载此向量,例如,每个值 都从文件中读取,并直接传递给“sum”聚合函数,后者将其添加到累加器中。在这种情况下,不会生成任何中间向量。
以下聚合函数支持下推::func:sum、:func:min、
:func:max、:func:bitwise_and_agg、:func:bitwise_or_agg、:func:bool_and、
:func:bool_or。
Adaptive Array-Based Aggregation
HashAggregation 运算符将聚合数据存储在行中。每行对应一个唯一的分组键值组合。全局聚合将数据存储在一行中。有关详细信息,请参 阅 如何添加聚合函数指南中的“内存布局”部分。
数据行被组织成一个哈希表,该哈希表可以采用哈希、数组或 规范化键模式。
Hash mode
在哈希模式下,传入行的处理包含以下步骤:
- 计算分组键的哈希值;
- 使用该哈希值在哈希表中查找一个或多个可能匹配的条目;
- 比较分组键以确定单个匹配条目或确定不存在此类条目;
- 如果不存在匹配条目,则插入新条目;
- 更新现有条目或新创建条目的累加器。
Array mode
在数组模式下,有一个指向数据行的指针数组。传入行的分组键值会映射到一个整数,该整数用作数组的索引。没有匹配分组键的条目将存储 nullptr。
考虑对以下数据执行 SELECT a, sum(b) FROM t GROUP BY 1 查询:
== ==
a b
== ==
1 10
7 12
1 4
4 128
10 -29
7 3
== ==
有一个分组键 a,其值取自一个较小的整数范围: [1, 10]。在数组模式下,哈希表会分配一个大小为 10 的数组,并使用一个简单的公式将分组键值映射到数组中的索引:index = a - 1。
最初,数组填充为空值:[null, null, … null]。随着行的处理,条目会被填充。
============================================ ========================================================= After adding the first row 10: [10, null, null, null, null, null, null, null, null, null] After adding the second row 12: [10, null, null, null, null, null, 12, null, null, null] After adding the third row 4: [14, null, null, null, null, null, 12, null, null, null] After adding the 4th row 128 [14, null, null, 128, null, null, 12, null, null, null] After adding the 5th row -29: [14, null, null, null, null, null, 12, null, null, -29] After adding the last row 3: [14, null, null, null, null, null, 15, null, null, -29] ============================================ =========================================================
与哈希模式相比,数组模式非常高效,因为它不需要计算哈希值,也不需要将传入的分组键与哈希表条目进行比较。与可用于任何聚合的哈希 模式不同,数组模式仅适用于分组键的值可以映射到相对较小的整数范围的情况。例如,当只有一个整数类型的分组键,并且最小值和最 大值 之间的差异相对较小时,就属于这种情况。在这种情况下,映射公式很简单:“索引 = 值 - 最小值”。
当分组键有两个或多个,且它们的值范围的倍数仍然较小时,数组模式也适用。例如,GROUP BY a, b
其中“a”个值的范围为 [10, 50],而“b”个值的范围为 [1000, 1050]
允许使用数组模式,数组大小等于 40 * 50 = 200,映射公式为:索引 = (a - 10) + (b - 1000) * 40。
此外,当分组键的唯一值数量较少时,数组模式也适用。在这种情况下,可以为每个唯一值分配一个从 1 开始的序数(0 保留为空值),该序数可以用作数组的索引。
数组模式也适用于值范围较小且唯一值数量较少的分组键组合,只要值范围大小与唯一值数量的乘积不超过数组模式允许的最大值。
数组模式支持最多 2M 个条目的数组。
数组模式通常适用于对布尔类型的键进行分组,因为这些键只有 3 个可能的值:null、false 和 true。这些值分别映射到 0、1 和 2。
对于长度不超过 7 个字节的短字符串分组键,会通过填充前导零并在字符串字节前的第一位填充 1 来映射到 64 位整数,例
如 00...01<string bytes>。如果结果数字的取值范围较小,或者唯一值的数量较少,则使用数组模式。
否则,结果数字可以用于规范化键模式。
用于表示分组键值的整数值称为值 ID。
Normalized Key Mode
在规范化键模式下,多个分组键值会映射到单个 64 位整数,并且处理过程与哈希模式相同,使用单个 64 位整数分组键。此模式的 效率低于数组模式,但比哈希模式更 高,因为对单个 64 位整数值进行哈希处理和比较比对多个值进行哈希处理和比较更快。
Adaptivity
哈希表模式自适应地决定,首先从数组模式开始,然后根据分组键的新值需要切换到 规范化键或哈希模式。切换模式时,哈希表需要重新组织。一旦进入哈希模式,哈希表将保持该模式,直到查询处理的剩余时间。
对于每个分组键,HashAggregation 运算符都会创建一个 VectorHasher 实例来分析和累积有关该键的统计信息。VectorHasher 存储键的最小值和最大值。如果范围过大,VectorHasher 将切换到跟踪唯一值集合。如果唯一值的数量超过 10 万,VectorHasher 将停止跟踪这些唯一值,并且哈希表将切换到规范化键或哈希模式。
数组和规范化键模式仅支持以下类型的分组键:boolean、tinyint、smallint、integer、bigint 和 varchar。
Adaptive Disabling of Partial Aggregation
有时,部分聚合会遇到大部分唯一键,无法有效降低数据的基数。在这种情况下,更高效的方法是跳过部分聚合,直接 对数据进行混洗并计算最终聚合结果。主要的节省来自于无需对输入进行哈希处理、构建和探测哈希表。
HashAggregation 运算符包含自动检测并禁用无效部分聚合的逻辑。此逻辑由两个配置属性控制:
- abandon_partial_aggregation_min_pct - 继续进行部分聚合的唯一行的最大百分比 。默认值:80%。
- abandon_partial_aggregation_min_rows - 决定放弃部分聚合之前接收的最小行数。默认值:100,000。
在收到至少 abandon_partial_aggregation_min_rows 个输入行后,操作符会检查唯一行的百分比,例如,比较组数与输入行数。 如果唯一行的百分比超过了 abandon_partial_aggregation_min_pct,操作符将放弃部分聚合。
无法简单地停止聚合输入并将其原样传递给 shuffle 和最终聚合,因为最终聚合所需的数据类型与原始输入类型不同。例如,avg 的部分
聚合可能接收 INTEGER 类型的输入,但 avg 的最终聚合需要 ROW(sum DOUBLE, count BIGINT) 类型的输入。
哈希聚合操作符需要将原始输入的每一行转换为单行中间结果。例如,对于 avg 函数,它需要
将每个整数值 n 转换为 {n, 1} 结构体。它通过
创建“假”组(每个输入行一个)并使用聚合函数 API
将每行添加到其自己的累加器中,然后提取中间结果来实现。
这有助于避免对输入进行哈希处理和构建哈希表的 CPU 成本,
也有助于减少内存使用。然而,此过程仍然会产生分配累加器、向累加器添加值以及提取结果的成本。
各个聚合函数可以实现可选的 Aggregate::toIntermediate() API,该 API 允许 HashAggregation 运算符 高效地将原始输入转换为中间结果,而无需使用 累加器。
/// Returns true if toIntermediate() is supported.
virtual bool supportsToIntermediate() const {
return false;
}
/// Produces an accumulator initialized from a single value for each
/// row in 'rows'. The raw arguments of the aggregate are in 'args',
/// which have the same meaning as in addRawInput. The result is
/// placed in 'result'. 'result' is expected to be a writable flat
/// vector of the right type.
///
/// @param rows A set of rows to produce intermediate results for. The
/// 'result' is expected to have rows.size() rows. Invalid rows represent rows
/// that were masked out, these need to have correct intermediate results as
/// well. It is possible that all entries in 'rows' are invalid (masked out).
virtual void toIntermediate(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
VectorPtr& result) const {
POLLUX_NYI("toIntermediate not supported");
}
许多聚合函数都实现了 toIntermediate() 快速路径。例如:
min、max、array_agg、set_agg、map_agg 和 map_union。
可以使用运行时统计信息 abandonedPartialAggregation 来判断部分聚合是否被放弃。