Skip to main content
Version: 1.1.1

如何添加聚合函数?

聚合函数由 HashAggregation 运算符计算。单个运算符中可以有一个或多个聚合函数。 以下是一些示例。

全局聚合(无分组键),单个聚合计数

    SELECT count(*) FROM t

全局聚合,两个聚合:countsum


SELECT count(*), sum(b) FROM t

具有三个聚合的聚合:计数和两个总和


SELECT a, count(*), sum(b), sum(c) FROM t GROUP BY 1

仅使用一个聚合min和两个分组键进行聚合。


SELECT a, b, min(c) FROM t GROUP BY 1, 2

通常,聚合计算分为两个步骤:部分聚合和最终聚合。

部分聚合获取原始数据并生成中间结果。最终聚合获取中间结果并生成最终结果。在某些情况下,还会使用单次聚合和中间聚合。 当数据已根据分组键进行分区,因此无需进行混洗时,使用单次聚合。中间聚合用于合并在多个线程中并行计算的部分聚合结果,以减少发送到最终聚合阶段的数据量。

聚合的四种类型和步骤仅通过输入和输出的类型来区分。

Step TypeInputOutput
PartialRaw DataIntermediate Results
FinalIntermediate ResultsFinal Results
SingleRaw DataFinal Results
IntermediateIntermediate ResultsIntermediate Results

在某些情况下,部分聚合和最终聚合执行的计算是相同的。例如,summinmax 聚合。大多数情况下,它们是不同的。例如,部分 count 聚合对输入值进行计数,而最终 count 聚合将部分计数相加以得出总计。

聚合函数的签名由原始输入数据的类型、 中间结果的类型和最终结果的类型组成。

聚合函数也可以在窗口运算符中使用。以下是在窗口运算符中使用 sum 聚合函数计算运行总计的示例。


SELECT a, sum(b) OVER (PARTITION by c ORDER BY d DESC) FROM t

内存布局

HashAggregation 运算符将数据存储在行中。每行对应一个唯一的分组键值组合。全局聚合将数据存储在一行中。

聚合函数可以根据其累加器的类型分为三类:

  • 固定宽度累加器:
  • countsumavg
  • minmaxarbitrary(适用于固定宽度类型)
  • 具有仅追加语义的可变宽度累加器:
  • array_agg
  • map_agg
  • 可变宽度累加器,可以以任何方式修改,而不仅仅是追加。
  • minmax(适用于字符串)
  • arbitrary(适用于可变宽度类型)
  • approx_percentile
  • approx_distinct

累加器的固定宽度部分存储在行中。可变宽度部分(如果存在)使用 :doc:HashStringAllocator <arena> 分配,并将一个指针存储在固定宽度部分中。

行是一个连续的字节缓冲区。如果任何累加器有对齐要求,则行首地址和累加器地址将相应地对齐。数据按以下顺序存储:

  1. 空标志(每项 1 位)
  2. 键(仅当可空时)
  3. 累加器
  4. 空闲标志(1 位)
  5. 用于对齐的填充
  6. 累加器,仅限固定宽度部分
  7. 可变大小(32 位)
  8. 用于对齐的填充

要添加聚合函数,有两种选择:将其实现为 简单函数或向量函数。简单函数接口允许作者编写每次处理一行输入数据的方法,而无需自行处理输入向量编码。然而,简单函 数接口目前存在一些限制,例如不支持对常量输入进行高级性能优化。需要此类功能的聚合函数可以通过向量函数接口实现。 使用向量函数接口,作者可以编写每次处理一个向量的方法,并自行处理输入向量编码。

  • 准备:
    • 确定输入、中间和最终类型。
    • 确定部分计算和最终计算。
    • 设计累加器。确保同一个累加器可以同时接受原始输入和中间结果。
    • 如果实现的是简单函数,请按照以下说明为该函数创建一个类;如果实现的是向量函数, 请创建一个扩展 pollux::exec::Aggregate 基类的新类(参见 pollux/exec/aggregate.h 文件)并实现虚方法。
  • 使用 exec::registerAggregateFunction(...) 注册新函数。
  • 添加测试。
  • 编写文档。

简单函数接口

本节介绍聚合函数的主要概念和简单接口。通过简单函数接口实现的聚合函数示例可在 tests/exec/simple_average_aggregate.cpptests/exec/simple_array_agg_aggregate.cpp 中找到。

一个简单的聚合函数以类的形式实现,如下所示。


// array_agg(T) -> array(T) -> array(T)
class ArrayAggAggregate {
public:
// Type(s) of input vector(s) wrapped in Row.
using InputType = Row<Generic<T1>>;
using IntermediateType = Array<Generic<T1>>;
using OutputType = Array<Generic<T1>>;

// Optional. Default is true.
static constexpr bool default_null_behavior_ = false;

// Optional.
static bool toIntermediate(
exec::out_type<Array<Generic<T1>>>& out,
exec::optional_arg_type<Generic<T1>> in);

// Optional. Define some function-level variables.
TypePtr inputType_;
TypePtr resultType_;

// Optional. Defined only when the aggregation function needs to use function-level variables.
// This method is called once when the aggregation function is created.
void initialize(
core::AggregationNode::Step step,
const std::vector<TypePtr>& argTypes,
const TypePtr& resultType) {
POLLUX_CHECK_EQ(argTypes.size(), 1);
inputType_ = argTypes[0];
resultType_ = resultType;
}

struct AccumulatorType { ... };
};

作者在简单聚合函数类中声明函数的输入类型、中间类型和输出类型。即使函数只接受一个参数, 输入类型也必须是函数的参数类型,并封装在 Row<> 中。这是 SimpleAggregateAdapter 正确解析任意聚合函数的输入类型所必需的。

作者可以选择在简单聚合函数类中定义函数级变量。这些变量在创建聚合函数时初始化一次,并在向累加器添 加输入或从累加器提取值时用于每一行。例如,如果聚合函数需要获取结果类型或聚合函数的原始输入类型,则可以将这 些类型定义为聚合类中的成员变量,并在 initialize() 方法中初始化。

作者可以定义一个可选标志 default_null_behavior_,指示聚合函数是否具有默认空行为。此标志默认为 true。接下来,该类 可以包含一个可选方法 toIntermediate(),用于将聚合函数的原始输入直接转换为其中间状态。最后,作者 必须在聚合函数类中定义一个名为 AccumulatorType 的结构体。我们将在下面详细解释每个部分。

默认空值行为

当向累加器添加原始输入或中间状态时,默认为空行为的聚合函数会忽略为空 的输入值。对于包含多列的原始输入,如果该行至少有一列为空,则整行都会被忽略。以下是示例。


SELECT sum(c0) FROM (values (cast(NULL as bigint), 10), (NULL, 20), (NULL, 30)) AS t(c0, c1); -- NULL

当从累加器生成中间或最终输出结果时,具有默认空值行为的聚合函数会针对无输入行或仅有空 值的行生成空值。下面给出了另一个示例。


SELECT sum(c0) FROM (values (1, 10), (2, 20), (3, 30)) AS t(c0, c1) WHERE c1 > 40; -- NULL

大多数聚合函数都具有默认为空的行为。SimpleAverageAggregate.cpp 中有一个示例。另一方面,simple_array_agg_aggregate.cpp 中 有一个非默认为空的行为示例。此标志会影响 toIntermediate() 的 C++ 函数签名以及 AccumulatorType 结构体中的方法。

升级

作者可以选择定义一个静态方法 toIntermediate(),用于将原始输入转换为中间状态。如果定义了此函数, 则在放弃部分聚合步骤的查询计划中使用此函数。如果聚合函数具有默认为空的行为,则 toIntermediate() 函数会 有一个 exec::out_type<IntermediateType>& 类型的输出参数,后跟 exec::arg_type<T> 类型的输 入参数,用于包装在 InputType 中的每个 T。如果聚合函数具有非默认为空的行为,则 toIntermediate() 的 输入参数将改为 exec::optional_arg_type<T> 类型。

T 是除 Varchar 和 Varbinary 之外的原始类型时,exec::arg_type<T> 就是 T 本身,而 exec::out_type<T> 则是 T&exec::optional_arg_type<T> 则是 std::optional<T>

T 是 Varchar、Varbinary 或复杂类型时,exec::arg_type<T>exec::optional_arg_type<T>exec::out_type<T> 分别是 T 对应的视图类型和写入类型。详细解释请参阅 :doc:view-and-writer-types

Default-Null BehaviorNon-Default-Null Behavior
static bool SimpleAverageAggregate::toIntermediate(<br>exec::out_type<Row<double, int64_t>>& out,<br>exec::arg_type<T> in)static bool SimpleArrayAggAggregate::toIntermediate(<br>exec::out_type<Array<Generic<T1>>>& out,<br>exec::optional_arg_type<Generic<T1>> in)

默认空行为的累加器类型

对于具有默认空行为的聚合函数,作者定义了一个 AccumulatorType 结构体,如下所示。


struct AccumulatorType {
// Author defines data members
...

// Optional. Define a pointer to the UDAF class if the aggregation
// function uses function-level variables.
ArrayAggAggregate* fn_;

// Optional. Default is true.
static constexpr bool is_fixed_size_ = false;

// Optional. Default is false.
static constexpr bool use_external_memory_ = true;

// Optional. Default is false.
static constexpr bool is_aligned_ = true;

explicit AccumulatorType(
HashStringAllocator* allocator, ArrayAggAggregate* fn)
: fn_(fn);

void addInput(HashStringAllocator* allocator, exec::arg_type<T1> value1, ...);

void combine(
HashStringAllocator* allocator,
exec::arg_type<IntermediateType> other);

bool writeIntermediateResult(exec::out_type<IntermediateType>& out);

bool writeFinalResult(exec::out_type<OutputType>& out);

// Optional. Called during destruction.
void destroy(HashStringAllocator* allocator);
};

作者定义了一个可选标志is_fixed_size_,指示每个累加器是否占用固定大小的内存。此标志默认为 true。 接下来,作者定义了另一个可选标志use_external_memory_,指示累加器是否使用 Pollux 未跟踪的内存。 此标志默认为 false。然后,作者可以定义一个可选标志is_aligned_,指示累加器是否需要对齐访问。此标志默认为 false。

作者定义了一个构造函数,该构造函数接受一个参数: HashStringAllocator*。此构造函数在聚合开始之前调用,用于初始化所有累加器。

作者还可以选择定义一个 destroy 函数,该函数在 累加器对象被析构时调用。

请注意,writeIntermediateResultwriteFinalResult 不应修改累加器中的内容。

添加输入

此方法将原始输入值添加到 this 累加器。它接收一个 HashStringAllocator* 后跟 exec::arg_type<T1> 类型的值,每个参数类型 Ti 都包装在 InputType 中。

在默认为空的情况下,在调用 addInput 之前,至少有一列为空的原始输入行将被忽略。调用 addInput 后,this 累加器将被假定为非空。

合并(combine)

此方法将一个输入中间状态添加到 this 累加器。它接收一个 HashStringAllocator* 和一个 exec::arg_type<IntermediateType> 值。在 default-null 行为下,在调用 combine 之前,输入中间状态中的空值将被忽略。调用 combine 之后,this 累加器将被假定为非空。

写入中间结果(writeIntermediateResult)

此方法将累加器写入中间状态向量。它有一个类型为 exec::out_type<IntermediateType>& 的输出参数。如果将 非空值写入 out,则此方法返回 true;否则返回 false,表示应将空值写入中间状态向量。如果累加器为空(即未向其添 加任何值),则在中间状态向量中将自动变为空值,而无需调用 writeIntermediateResult

写入最终结果(writeFinalResult)

此方法将累加器写入最终结果向量。它 具有一个 exec::out_type<OutputType>& 类型的输出参数。如果将非空值写入 out,则此方 法返回 true;否则返回 false,表示应将空值写入最终结果向量。为空(即未向其添加任何值)的累加器在最终结果 向量中将自动变为空值,而无需调用 writeFinalResult

非默认空行为的 AccumulatorType(AccumulatorType of Non-Default-Null Behavior)

对于非默认空行为的聚合函数,作者定义了一个 AccumulatorType 结构体,如下所示。


struct AccumulatorType {
// Author defines data members
...

// Optional. Default is true.
static constexpr bool is_fixed_size_ = false;

// Optional. Default is false.
static constexpr bool use_external_memory_ = true;

// Optional. Default is false.
static constexpr bool is_aligned_ = true;

explicit AccumulatorType(HashStringAllocator* allocator, ArrayAggAggregate* fn);

bool addInput(HashStringAllocator* allocator, exec::optional_arg_type<T1> value1, ...);

bool combine(
HashStringAllocator* allocator,
exec::optional_arg_type<IntermediateType> other);

bool writeIntermediateResult(bool nonNullGroup, exec::out_type<IntermediateType>& out);

bool writeFinalResult(bool nonNullGroup, exec::out_type<OutputType>& out);

// Optional.
void destroy(HashStringAllocator* allocator);
};

is_fixed_size_use_external_memory_is_aligned_、构造函数和 destroy 方法的定义与 default-null 行为完全相同。

另一方面,addInputcombinewriteIntermediateResultwriteFinalResult 的 C++ 函数签名不同。

与 default-null 行为的情况相同,writeIntermediateResultwriteFinalResult 不应修改累加器中的内容。

addInput

此方法接收一个 HashStringAllocator* 值,后跟 exec::optional_arg_type<T1> 值,每个值对应一个封装在 InputType 中的参数类型 Ti

即使某些列可能为空,也会对所有原始输入行调用此方法。 它返回一个布尔值,表示调用后 累加器是否为非空。所有累加器在聚合开始前都会初始化为 null。 原本为空的累加器可以转换为非空。但 原本为非空的累加器无论 addInput 的返回值如何,都将保持非空状态。

combine

此方法接收一个 HashStringAllocator* 和一个 exec::optional_arg_type<IntermediateType> 值。所有中间状态都会 调用此方法,即使某些中间状态为空。与 addInput 相同,此方法返回一个布尔值,表示调用后 累加器是否为非空。

writeIntermediateResult

此方法具有一个类型为 exec::out_type<IntermediateType>& 的输出参数,以及一个布尔标志 nonNullGroup,用于指示累加器是否为非空。如果将非空值写入 out,则此方法返回 true;否则,返回 false,表示应将空值写入中间状态向量。

writeFinalResult

此方法将累加器写入最终结果向量。它有一个类型为 exec::out_type<OutputType>& 的输出参数,以 及一个布尔值标志 nonNullGroup,用于指示累加器是否为非空。如果将非空值写入 out,则此方法返回 true;否则,返回 false,表示应将 空值写入最终结果向量。

Limitations

简单聚合函数接口目前有三个限制。

  1. 聚合函数读取或写入的所有值都必须是累加器的一部分。这意味着函数级状态不能保存在累加器之外。

  2. 不支持对常量输入进行优化。也就是说,常量输入参数每行处理一次,处理方式与非常量输入相同。

  3. 目前尚不支持聚合下推到表扫描。我们计划添加此支持。

Vector Function Interface

不能使用简单函数接口的聚合函数可以写成向量函数。

Accumulator size

pollux::exec::Aggregate 接口的实现可以从 accumulatorFixedWidthSize() 方法开始。


// Returns the fixed number of bytes the accumulator takes on a group
// row. Variable width accumulators will reference the variable
// width part of the state from the fixed part.
virtual int32_t accumulatorFixedWidthSize() const = 0;

如果累加器需要特定的对齐,则需要实现 *accumulatorAlignmentSize()* 方法。


/// Returns the alignment size of the accumulator. Some types such as
/// int128_t require aligned access. This value must be a power of 2.
virtual int32_t accumulatorAlignmentSize() const {
return 1;
}

HashAggregation 运算符在初始化期间使用这些方法来计算行的总大小,并确定不同聚合将存储其数据的偏移量。然后,该运算符为每个聚合调用 pollux::exec::Aggregate::setOffsets 方法来指定累加器的位置。


// Sets the offset and null indicator position of 'this'.
// @param offset Offset in bytes from the start of the row of the accumulator
// @param nullByte Offset in bytes from the start of the row of the null flag
// @param nullMask The specific bit in the nullByte that stores the null flag
// @param initializedByte Offset in bytes from the start of the row of the
// initialized flag
// @param initializedMask The specific bit in the initializedByte that stores
// the initialized flag
// @param rowSizeOffset The offset of a uint32_t row size from the start of
// the row. Only applies to accumulators that store variable size data out of
// line. Fixed length accumulators do not use this. 0 if the row does not have
// a size field.
void setOffsets(
int32_t offset,
int32_t nullByte,
uint8_t nullMask,
int32_t initializedByte,
int8_t initializedMask,
int32_t rowSizeOffset)

基类通过将偏移量存储在成员变量中来实现 setOffsets 方法。


// Byte position of null flag in group row.
int32_t nullByte_;
uint8_t nullMask_;
// Byte position of the initialized flag in group row.
int32_t initializedByte_;
uint8_t initializedMask_;
// Offset of fixed length accumulator state in group row.
int32_t offset_;

// Offset of uint32_t row byte size of row. 0 if there are no
// variable width fields or accumulators on the row. The size is
// capped at 4G and will stay at 4G and not wrap around if growing
// past this. This serves to track the batch size when extracting
// rows. A size in excess of 4G would finish the batch in any case,
// so larger values need not be represented.
int32_t rowSizeOffset_ = 0;

通常,聚合函数不会直接使用偏移量,而是使用基类中的辅助方法。

要访问累加器,请执行以下操作:


template <typename T>
T* value(char* group) const {
return reinterpret_cast<T*>(group + offset_);
}

要操作空标志:


bool isNull(char* group) const;

// Sets null flag for all specified groups to true.
// For any given group, this method can be called at most once.
void setAllNulls(char** groups, melon::Range<const vector_size_t*> indices);

inline bool clearNull(char* group);

Initialization

一旦有了 accumulatorFixedWidthSize(),下一个要实现的方法就是 initializeNewGroupsInternal()。


// Initializes null flags and accumulators for newly encountered groups.
// @param groups Pointers to the start of the new group rows.
// @param indices Indices into 'groups' of the new entries.
virtual void initializeNewGroupsInternal(
char** groups,
melon::Range<const vector_size_t*> indices) = 0;

每次遇到新的分组键组合时,HashAggregation 运算符都会调用此方法。此方法应该初始化新组的累加器。 例如,部分“count”和“sum”聚合会将累加器设置为零。许多聚合函数会通过调用 exec::Aggregate::setAllNulls(groups, indices) 辅助方法将空值标志设置为 true。

GroupBy aggregation

此时,您已实现 accumulatorFixedWidthSize() 和 initialiseNewGroupsInternal() 方法。现在, 我们可以继续实现端到端的分组聚合。我们需要以下部分:

  • 将原始输入添加到累加器的逻辑:
  • addRawInput() 方法。
  • 从累加器生成中间结果的逻辑:
  • extractAccumulators() 方法。
  • 将中间结果添加到累加器的逻辑:
  • addIntermediateResults() 方法。
  • 从累加器生成最终结果的逻辑:
  • extractValues() 方法。
  • 将先前溢出的数据添加回累加器的逻辑:
  • addSingleGroupIntermediateResults() 方法。
  • 将原始输入转换为中间结果的可选逻辑:
  • supportToIntermediate() 和 toIntermediate() 方法。

有些方法仅用于部分聚合工作流。下表显示了哪些方法用于哪些工作流。

MethodPartialFinalSingleIntermediateStreaming
addRawInputYNYNY
extractAccumulatorsYY (used for spilling)Y (used for spilling)YY
addIntermediateResultsNYNYY
extractValuesNYYNY
addSingleGroupIntermediateResultsNYYNN
toIntermediateYNNNN

我们从 addRawInput() 方法开始,该方法接收原始输入向量并将数据添加到累加器。


// Updates the accumulator in 'groups' with the values in 'args'.
// @param groups Pointers to the start of the group rows. These are aligned
// with the 'args', e.g. data in the i-th row of the 'args' goes to the i-th group.
// The groups may repeat if different rows go into the same group.
// @param rows Rows of the 'args' to add to the accumulators. These may not be
// contiguous if the aggregation is configured to drop null grouping keys.
// This would be the case when aggregation is followed by the join on the
// grouping keys.
// @param args Data to add to the accumulators.
// @param mayPushdown True if aggregation can be pushdown down via LazyVector.
// The pushdown can happen only if this flag is true and 'args' is a single
// LazyVector.
virtual void addRawInput(
char** groups,
const SelectivityVector& rows,
const std::vector<VectorPtr>& args,
bool mayPushdown = false) = 0;

addRawInput() 方法会使用 DecodedVector 来解码输入数据。然后,循环遍历行来更新累加器。为每个输入向量定义一个 DecodedVector 类型的成员变量是一种很好的做法。这样可以在输入批次之间重用解码输入所需的内存。

实现 addRawInput() 方法后,我们继续添加用于提取中间结果的逻辑。


// Extracts partial results (used for partial and intermediate aggregations).
// This method is expected to not modify contents in accumulators.
// @param groups Pointers to the start of the group rows.
// @param numGroups Number of groups to extract results from.
// @param result The result vector to store the results in.
virtual void
extractAccumulators(char** groups, int32_t numGroups, VectorPtr* result) = 0;

接下来,我们实现接收中间结果并更新累加器的 addIntermediateResults() 方法。


virtual void addIntermediateResults(
char** groups,
const SelectivityVector& rows,
const std::vector<VectorPtr>& args,
bool mayPushdown = false) = 0;

接下来,我们实现从累加器中提取最终结果的 extractValues() 方法。


// Extracts final results (used for final and single aggregations). This method
// is expected to not modify contents in accumulators.
// @param groups Pointers to the start of the group rows.
// @param numGroups Number of groups to extract results from.
// @param result The result vector to store the results in.
virtual void
extractValues(char** groups, int32_t numGroups, VectorPtr* result) = 0;

最后,我们实现 addSingleGroupIntermediateResults() 方法,用于将先前溢出的数据添加回累加器。


// Updates the single final accumulator from intermediate results for global
// aggregation.
// @param group Pointer to the start of the group row.
// @param rows Rows of the 'args' to add to the accumulators. These may not
// be contiguous if the aggregation has mask. 'rows' is guaranteed to have at
// least one active row.
// @param args Intermediate results produced by extractAccumulators().
// @param mayPushdown True if aggregation can be pushdown down via LazyVector.
// The pushdown can happen only if this flag is true and 'args' is a single
// LazyVector.
virtual void addSingleGroupIntermediateResults(
char* group,
const SelectivityVector& rows,
const std::vector<VectorPtr>& args,
bool mayPushdown) = 0;

最后,我们可以实现可选方法,将原始输入转换为中间结果。如果部分聚合遇到的键大多是唯一的,并且无法有效地降低基数,则运算 符可以决定放弃部分聚合。在这种情况下,运算符首先发出已累积的数据(例如由于内存压力而刷新的情况),然后将每一批新的输入 转换为中间结果并立即发出。默认情况下,为了将原始输入转换为中间结果,运算符会为每个输入行创建一个虚拟组,通过调用 initializeNewGroups初始化这些组,使用addRawInput将每行添加到其自己的组中,然后调用extractAccumulators。 这种方法有效,但效率不高。各个聚合函数可以通过实现toIntermediate()方法来提供更高效的实现。如果它们决定这样做, 它们也应该重写supportsToIntermediate()方法。例如,min和max聚合函数实现了toIntermediate()方法,该方法仅返回未修改的输入。


/// Returns true if toIntermediate() is supported.
virtual bool supportsToIntermediate() const {
return false;
}

/// Produces an accumulator initialized from a single value for each
/// row in 'rows'. The raw arguments of the aggregate are in 'args',
/// which have the same meaning as in addRawInput. The result is
/// placed in 'result'. 'result' is expected to be a writable flat vector of
/// the right type.
///
/// @param rows A set of rows to produce intermediate results for. The
/// 'result' is expected to have rows.size() rows. Invalid rows represent rows
/// that were masked out, these need to have correct intermediate results as
/// well. It is possible that all entries in 'rows' are invalid (masked out).
virtual void toIntermediate(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
VectorPtr& result) const {
POLLUX_NYI("toIntermediate not supported");
}

GroupBy 聚合代码路径已完成。我们继续进行全局聚合。

Global aggregation

全局聚合与分组聚合类似,但只有一个组和一个累加器。实现分组聚合后,只需实现 addSingleGroupRawInput() 方 法即可启用全局聚合(addSingleGroupIntermediateResults() 方法已实现,因为它用于溢出分组)。


// Updates the single accumulator used for global aggregation.
// @param group Pointer to the start of the group row.
// @param allRows A contiguous range of row numbers starting from 0.
// @param args Data to add to the accumulators.
// @param mayPushdown True if aggregation can be pushdown down via LazyVector.
// The pushdown can happen only if this flag is true and 'args' is a single
// LazyVector.
virtual void addSingleGroupRawInput(
char* group,
const SelectivityVector& allRows,
const std::vector<VectorPtr>& args,
bool mayPushdown) = 0;

溢出对全局聚合无益,因此不支持。下表显示了不同全局聚合工作流中使用的方法。

MethodPartialFinalSingleIntermediateStreaming
addSingleGroupRawInputYNYNY
extractAccumulatorsYNNYY
addSingleGroupIntermediateResultsNYNYY
extractValuesNYYNY

窗口运算符也使用全局聚合。对于每一行,都会清除全局累加器 (clear + initializeNewGroups),然后添加窗口框架中的所有行 (addSingleGroupRawInput),最后提取结果 (extractValues)。

计算累计总数时,即当窗口框架位于 UNBOUNDED PRECEDING AND CURRENT ROW 之间时,窗口运算符会重复使用累加器进行多行计算,而无需重置 。对于每一行,窗口运算符都会将该行添加到累加器,然后提取结果。聚合函数会发现 addSingleGroupRawInput + extractValues 调用的重复序列,因此需要正确处理这些调用。

Factory function

现在,我们可以编写一个工厂函数来创建新聚合函数的实例,并通过调用 exec::registerAggregateFunction (...) 并指定函数名称和签名来注册它。

HashAggregation 运算符使用此函数创建聚合函数的实例。每个执行线程都会创建一个新实例。当 部分聚合在 5 个线程上运行时,它将使用每个聚合函数的 5 个实例。

工厂函数接受 core::AggregationNode::Step (partial/final/intermediate/single),它指定预期的输入类型、输入类型和结果类型。


exec::AggregateRegistrationResult registerApproxPercentile(const std::string& name) {
std::vector<std::shared_ptr<exec::AggregateFunctionSignature>> signatures;
...

return exec::registerAggregateFunction(
name,
std::move(signatures),
[name](
core::AggregationNode::Step step,
const std::vector<TypePtr>& argTypes,
const TypePtr& resultType) -> std::unique_ptr<exec::Aggregate> {
if (step == core::AggregationNode::Step::kIntermediate) {
return std::make_unique<ApproxPercentileAggregate<double>>(
false, false, VARBINARY());
}

auto hasWeight = argTypes.size() == 3;
TypePtr type = exec::isRawInput(step) ? argTypes[0] : resultType;

switch (type->kind()) {
case TypeKind::BIGINT:
return std::make_unique<ApproxPercentileAggregate<int64_t>>(
hasWeight, resultType);
case TypeKind::DOUBLE:
return std::make_unique<ApproxPercentileAggregate<double>>(
hasWeight, resultType);
...
}
});
}

static bool FB_ANONYMOUS_VARIABLE(g_AggregateFunction) =
registerApproxPercentile(kApproxPercentile);

如果聚合函数是通过简单函数接口实现的,则在创建唯一指针时使用 SimpleAggregateAdapter<FunctionClassName>。以下是示例。


exec::AggregateRegistrationResult registerSimpleArrayAggAggregate(
const std::string& name) {
...

return exec::registerAggregateFunction(
name,
std::move(signatures),
[name](
core::AggregationNode::Step step,
const std::vector<TypePtr>& argTypes,
const TypePtr& resultType,
const core::QueryConfig& /*config*/)
-> std::unique_ptr<exec::Aggregate> {
POLLUX_CHECK_EQ(
argTypes.size(), 1, "{} takes at most one argument", name);
return std::make_unique<SimpleAggregateAdapter<SimpleArrayAggAggregate>>(
step, argTypes, resultType);
});
}

使用 FunctionSignatureBuilder 创建 FunctionSignature 实例,该实例描述支持的签名。每个签名包含零个或多个输入类型、一个中间结果类型和最终结果类型。

FunctionSignatureBuilder 和 FunctionSignature 支持类似 Java 的泛型、可变数量的参数和 lambda 表达式。更多信息请参阅 scalar-functions 指南中的 :ref:function-signature 部分。

以下是 approx_percentile 函数的签名示例。该函数接受一个数值类型的值参数、一个可选的 INTEGER 类型的权重参数和一个 DOUBLE 类型的百分 比参数。中间类型与输入类型无关,始终为 VARBINARY。最终结果类型与输入值类型相同。


for (const auto& inputType :
{"tinyint", "smallint", "integer", "bigint", "real", "double"}) {
// (x, double percentage) -> varbinary -> x
signatures.push_back(exec::AggregateFunctionSignatureBuilder()
.returnType(inputType)
.intermediateType("varbinary")
.argumentType(inputType)
.argumentType("double")
.build());

// (x, integer weight, double percentage) -> varbinary -> x
signatures.push_back(exec::AggregateFunctionSignatureBuilder()
.returnType(inputType)
.intermediateType("varbinary")
.argumentType(inputType)
.argumentType("bigint")
.argumentType("double")
.build());
}

Testing

现在是时候将所有部分组合在一起,并测试新函数的运行效果了。

使用 pollux/aggregates/tests/AggregationTestBase.h 中的 AggregationTestBase 作为测试的基类。

如果 DuckDB 支持新的聚合函数,您可以使用 DuckDB 检查结果。在这种情况下, 您需要指定输入数据、分组键、聚合列表以及要在 DuckDB 上运行的 SQL 查询以计算预期结果,并调用 AggregationTestBase 类中定义的辅助函数 testAggregates。 对于全局聚合,分组键可以为空。


// Global aggregation.
testAggregations(vectors, {}, {"sum(c1)"}, "SELECT sum(c1) FROM tmp");

// Group by aggregation.
testAggregations(
vectors, {"c0"}, {"sum(c1)"}, "SELECT c0, sum(c1) FROM tmp GROUP BY 1");

如果 DuckDB 不支持该新功能,您需要手动指定预期结果。


// Global aggregation.
testAggregations(vectors, {}, {"map_union(c1)"}, expectedResult);

// Group by aggregation.
testAggregations(vectors, {"c0"}, {"map_union(c1)"}, expectedResult);

testAggregations 方法在后台生成多个不同但逻辑上等效的计划,执行这些计划,验证是否成功完成,并将结果与​​ DuckDB 或指定的预期结果进行比较。

正在测试以下查询计划。

  • 部分聚合,然后进行最终聚合。查询以单线程运行。
  • 单次聚合。查询以单线程运行。
  • 部分聚合,然后进行中间聚合,然后进行最终聚合。查询以单线程运行。
  • 部分聚合,然后进行分组键的本地交换,然后进行最终聚合。查询使用 4 个线程运行。
  • 使用循环重新分区的本地交换,然后进行部分聚合,然后进行分组键的本地交换,然后进行强制溢出的最终聚合。查询使用 4 个线程运行。

强制溢出查询仅适用于分组聚合,并且 且仅当聚合函数对顺序不敏感时才启用。溢出测试需要多批次输入。 为了将输入数据拆分为多个批次,我们在部分聚合之前添加了本地交换和 循环重新分区。这会改变聚合输入的处理顺序,因此,只有当查询中使用的聚合函数对输入顺序不敏感时,溢出查询的结果才预期与不溢出查询的结果相同。许多 函数无论输入顺序如何都会产生相同的结果,但某些函数可能会在输入顺序不同的情况下返回不同的结果。例如,arbitraryarray_aggmap_aggmap_union 函数对输入顺序敏感, 而 min_bymax_by 函数在存在关联的情况下对输入顺序敏感。

Function names

与标量函数一样,聚合函数名称不区分大小写。函数注册时以及解析给定表达式时,名称会自动转换为小写。

Documentation

最后,通过在 pollux/docs/functions/presto/aggregate.rst 文件中添加一个条目来记录新函数。

您可以在 ../functions/presto/aggregate 查看所有函数的文档。

Accumulator

在 Pollux 中,高效利用内存是首要任务。这包括优化 内存使用总量以及内存分配次数。 请注意,Pollux 报告的运行时统计信息包括每个运算符的峰值内存使用量(以字节为单位)和内存分配次数。

聚合函数使用内存将中间结果存储在累加器中。它们从 arena(:doc:HashStringAllocator <arena> 类)分配内存。

array_agg and ValueList

StlAllocator 是一个兼容 STL 的分配器,由 HashStringAllocator 支持, 可以与 STL 容器一起使用。例如,可以定义一个 std::vector, 从 arena 中分配内存,如下所示:


std::vector<int64_t, StlAllocator<int64_t>>

例如,这在带有固定宽度类型输入(例如整数)的 3 参数版本的 min_bymax_by 中得到使用。

还有一个 AlignedStlAllocator,它提供从 arena 进行对齐分配的功能, 并且可以与需要 16 字节对齐的 F14 <https://engineering.fb.com/2019/04/25/developer-tools/f14/> 容器一起使用。可以定义一个 F14FastMap, 从 arena 分配内存,如下所示:


melon::F14FastMap<
int64_t,
double,
std::hash<int64_t>,
std::equal_to<int64_t>,
AlignedStlAllocator<std::pair<const int64_t, double>, 16>>

您可以在 histogram 聚合函数中找到一个示例用法。

可以使用 std::vector<T> 实现针对原始类型的 array_agg 函数,但效率不高。为什么呢?如果不使用 reserve 方法向 std::vector 提供关于将添加多少个条目的提示,则默认行为是按 2 的幂分配内存,例如,首先分配 1 个条目,然后分配 2 个,然后是 4 个、8 个、16 个等 等。每次进行新的分配时,数据都会复制到新的内存缓冲区中,并释放旧缓冲区。您可以通过检测 StlAllocator::allocate 和 deallocate 方法添加日志记录并运行一个简单的循环来向向量添加元素来查看这一点:


std::vector<double, StlAllocator<double>> data(
0, StlAllocator<double>(allocator_.get()));


for (auto i = 0; i < 100; ++i) {
data.push_back(i);
}


E20230714 14:57:33.717708 975289 HashStringAllocator.h:497] allocate 1
E20230714 14:57:33.734280 975289 HashStringAllocator.h:497] allocate 2
E20230714 14:57:33.734321 975289 HashStringAllocator.h:506] free 1
E20230714 14:57:33.734352 975289 HashStringAllocator.h:497] allocate 4
E20230714 14:57:33.734381 975289 HashStringAllocator.h:506] free 2
E20230714 14:57:33.734416 975289 HashStringAllocator.h:497] allocate 8
E20230714 14:57:33.734445 975289 HashStringAllocator.h:506] free 4
E20230714 14:57:33.734481 975289 HashStringAllocator.h:497] allocate 16
E20230714 14:57:33.734513 975289 HashStringAllocator.h:506] free 8
E20230714 14:57:33.734544 975289 HashStringAllocator.h:497] allocate 32
E20230714 14:57:33.734575 975289 HashStringAllocator.h:506] free 16
E20230714 14:57:33.734606 975289 HashStringAllocator.h:497] allocate 64
E20230714 14:57:33.734637 975289 HashStringAllocator.h:506] free 32
E20230714 14:57:33.734668 975289 HashStringAllocator.h:497] allocate 128
E20230714 14:57:33.734699 975289 HashStringAllocator.h:506] free 64
E20230714 14:57:33.734731 975289 HashStringAllocator.h:506] free 128

重新分配内存和复制数据并不便宜。为了避免这种开销,我们 引入了 ValueList 原语,并用它来实现 array_agg。

ValueList 是一种仅追加的数据结构,允许从任何 Pollux Vector 追加值,并将值读回 Pollux Vector。ValueList 不需 要连续的内存块,因此在空间不足时无需重新分配和复制。它只需分配另一个内存块并开始填充即可。

ValueList 旨在处理来自 Pollux Vector 的数据,因此, 它的 API 与 std::vector 不同。您可以从 DecodedVector 追加值, 并将值读回平面 Vector。以下是使用示例:


DecodedVector decoded(*data);

// Store data.
ValueList values;
for (auto i = 0; i < 100; ++i) {
values.appendValue(decoded, i, allocator());
}


// Read data back.
auto copy = BaseVector::create(DOUBLE(), 100, pool());
aggregate::ValueListReader reader(values);
for (auto i = 0; i < 100; ++i) {
reader.next(*copy, i);
}

ValueList 支持所有类型,因此您可以使用它来附加固定宽度的值以及字符串、数组、映射和结构体。

存储复杂类型时,ValueList 使用 ContainerRowSerde 对值进行序列化。

ValueList 还保留了空值标志,因此您可以在其中存储可空值的列表。

array_agg 是使用 ValueList 实现的累加器。

ValueList 需要一个指向存储区的指针来附加数据。它在构造函数中不接受存储区,也不会存储它,因为那样在聚合运算符中每个组需 要 8 个字节的内存。相反,ValueList::appendValue 方法接受一个指向存储区的指针作为参数。 因此,ValueList 的析构函数无法将内存释放回存储区,需要用户显式调用 free(HashStringAllocator*) 方法。

min, max, and SingleValueAccumulator

minmax 函数将单个值存储在累加器中(当前最小值或最大值)。它们使用 SingleValueAccumulator 存储字符串、数组、 Map 和结构体。处理新值时,我们会将其与存储的值进行比较,并在必要时替换存储的值。

与 ValueList 类似,SingleValueAccumulator 使用 ContainerRowSerde 序列化值。SingleValueAccumulator 提供了一个 compare 方法,用于将存储的值与 DecodedVector 的一行进行比较。

此累加器也用于实现 arbitrary 聚合函数,该函数将第一个值存储在累加器中。

set_agg, set_union, Strings and AddressableNonNullValueList

set_agg 函数将一组唯一值累积到一个 melon::F14FastSet 中,该集合配置为通过 AlignedStlAllocator 从 arena 分配内存。 固定宽度的值直接存储在 melon::F14FastSet 中。F14 数据结构的内存分配模式与 std::vector 类似。F14 以幂 2 的方式分配内存, 复制数据并释放先前分配的内存。因此,我们不会将字符串直接存储在 F14 集合中。相反,Pollux 会将字符串写入 arena,并在集合中存储一个指向该 arena 的 StringView。

通常,写入 arena 时,无法保证写入的连续性。 但是,为了使 StringView 正常工作,我们必须确保写入 arena 的字符串是连续的。Strings 辅助类提供了此功能。它的 append 方法接受一个 StringView 和一个指向 arena 的指针,将字符串复制到 arena 中,并返回一个指向副本的 StringView。


/// Copies the string into contiguous memory allocated via
/// HashStringAllocator. Returns StringView over the copy.
StringView append(StringView value, HashStringAllocator& allocator);

Strings 类提供了一种释放内存并返回给竞技场的免费方法。


/// Frees memory used by the strings. StringViews returned from 'append'
/// become invalid after this call.
void free(HashStringAllocator& allocator);

当聚合复杂类型(数组、映射或结构体)时,我们使用 AddressableNonNullValueList ,它将值写入 arena,并 返回指向写入值的指针,该指针存储在 F14 集合中。AddressableNonNullValueList 提供了计算值哈 希值和比较两个值的方法。AddressableNonNullValueList 使用 ContainerRowSerde 来序列化数据并比较序列化的值。


/// A set of pointers to values stored in AddressableNonNullValueList.
SetAccumulator<
HashStringAllocator::Position,
AddressableNonNullValueList::Hash,
AddressableNonNullValueList::EqualTo>
base;

AddressableNonNullValueList 允许附加一个值并删除最后一个值。 此功能对于 set_agg 和 set_union 来说已经足够了。处理新值时,我们会将其附加到列表中,获取一个指针,并将该指针插入 F14 集 合,如果该“指针”指向重复值,则将其从列表中移除。

与所有其他基于 arena 的累加器一样,AddressableNonNullValueList 提供了一个自由方法将内存返回给 arena。

注意:AddressableNonNullValueList 与 ValueList 的不同之处在于,它提供对单个值的访问(因此名称中带有Addressable前缀) ,而 ValueList 则不提供。使用 ValueList,可以附加值,然后将所有值复制到 Vector 中。ValueList 不支持对单个元素的临时访问。

SetAccumulator<T> 模板实现了一个简单的接口来累积唯一值。它使用 melon::F14FastSetStringsAddressableNonNullValueList 实现。T 可以是固定宽度类型,例如 int32_tint64_tStringViewComplexType

addValue 和 addValues 方法允许从向量中添加一个或多个值。


/// Adds value if new. No-op if the value was added before.
void addValue(
const DecodedVector& decoded,
vector_size_t index,
HashStringAllocator* allocator)/// Adds new values from an array.

void addValues(
const ArrayVector& arrayVector,
vector_size_t index,
const DecodedVector& values,
HashStringAllocator* allocator)

size() 方法返回唯一值的数量。


/// Returns number of unique values including null.
size_t size() const

extractValues 方法允许将唯一值提取到向量中。


/// Copies the unique values and null into the specified vector starting at
/// the specified offset.
vector_size_t extractValues(FlatVector<T>& values, vector_size_t offset)

/// For complex types.
vector_size_t extractValues(BaseVector& values, vector_size_t offset)

set_aggset_union 函数均使用 SetAccumulator 实现。

map_agg, map_union, and MapAccumulator

map_agg 函数将键和值累积到一个 Map 中。它会丢弃重复的键,并为每个唯一键保留一个值。Map_agg 使用 MapAccumulator<T> 模板来累积值。与 SetAccumulator 类似, MapAccumulator 使用 F14FastMapAlignedStlAllocator、Strings 和 AddressableNonNullValueList 构建。

insert() 方法将一对 (key, value) 添加到 Map 中,如果匹配的键已经存在,则丢弃该值。


/// Adds key-value pair if entry with that key doesn't exist yet.
void insert(
const DecodedVector& decodedKeys,
const DecodedVector& decodedValues,
vector_size_t index,
HashStringAllocator& allocator)

size() 方法返回唯一值的数量。

extract() 方法将键和值复制到向量中,这些向量可以组合形成 MapVector。


void extract(
const VectorPtr& mapKeys,
const VectorPtr& mapValues,
vector_size_t offset)

map_aggmap_union 函数均使用 MapAccumulator 实现。

在实现新的聚合函数时,请考虑使用 ValueList、SingleValueAccumulator、Strings、AddressableNonNullValueList 和 F14 容器来构建一个高效利用内存的累加器。

Tracking Memory Usage

聚合运算符需要知道每个组的内存使用量。 例如,当内存紧张时,此信息用于决定溢出多少行以及哪些行。

聚合函数应该使用 RowSizeTracker 来帮助跟踪每个组的内存使用情况。聚合基类提供了辅助方法 trackRowSize(group), 其使用方式如下:


rows.applyToSelected([&](vector_size_t row) {
auto group = groups[row];
auto tracker = trackRowSize(group);
accumulator->append(...);
});

'trackRowSize' 方法返回一个 RowSizeTracker 实例,该实例使用指向竞技场的引用和一个计数器进行 初始化,计数器会在销毁时递增。当 trackRowSize 返回的对象超出范围时,计数器会进行更新,添加自对象创建以来分配的内存。

End-to-End Testing

为了确认聚合函数作为查询的一部分端到端工作,请更新 presto_cpp repo 中 TestHiveAggregationQueries.java 中的 testAggregations() 测试以添加使用新函数的查询。


assertQuery("SELECT orderkey, array_agg(linenumber) FROM lineitem GROUP BY 1");

Overwrite Intermediate Type in Presto

有时,由于实现方式或工作节点接收的类型信息存在差异,我们需要更改 Presto 中聚合函数的中间类型。这在 Presto 类 com.facebook.presto.metadata.BuiltInTypeAndFunctionNamespaceManager 中完 成。启用 FeaturesConfig.isUseAlternativeFunctionSignatures() 后,我们可以注册一组 Pollux 专用的不同函数签名。在 com.facebook.presto.operator.aggregation.AlternativeApproxPercentile 中可以找到如何从头创建此类替代函数签名的示例。示例拉取请求位于 presto