
User Guide

This page describes how to use Acero. It is recommended that you read the overview first and familiarize yourself with the basic concepts.

Using Acero

The basic workflow for Acero is as follows:

  1. First, create a graph of Declaration objects describing the plan

  2. Call one of the DeclarationToXyz methods to execute the Declaration.

  3. A new ExecPlan is created from the graph of Declarations. Each Declaration will correspond to one ExecNode in the plan. In addition, a sink node will be added, depending on which DeclarationToXyz method was used.

  4. The ExecPlan is executed. Usually this happens as part of the DeclarationToXyz call, but in DeclarationToReader the reader is returned before the plan has finished executing.

  5. Once the plan is finished it is destroyed.
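
To make the workflow concrete, here is a minimal sketch of the pattern (assuming the ac/cp namespace aliases used throughout this guide and a hypothetical in-memory input_table):

// Step 1: describe the plan as a graph of Declarations
ac::Declaration source{"table_source", ac::TableSourceNodeOptions{input_table}};
ac::Declaration filter{
    "filter", {std::move(source)},
    ac::FilterNodeOptions(cp::greater(cp::field_ref("a"), cp::literal(3)))};

// Step 2: execute it with one of the DeclarationToXyz methods; steps 3-5
// (building the ExecPlan, running it, destroying it) happen inside the call.
TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::Table> output,
                    ac::DeclarationToTable(std::move(filter)));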

Creating a Plan

Using Substrait

Substrait is the preferred mechanism for creating a plan (a graph of Declarations). The reasons are:

  • Substrait producers spend a lot of time and energy creating user-friendly APIs for producing complex execution plans in a simple way. For example, a pivot_wider operation may be achieved using a complex series of aggregate nodes. Rather than create all of those aggregate nodes by hand, a producer will give you a much simpler API.
  • If you are using Substrait then you can easily switch to any other engine that consumes Substrait should you find, at some point, that it serves your needs better than Acero.
  • We hope that tools based on Substrait will eventually emerge for optimizers and planners. By using Substrait you will make it much easier to use those tools in the future.

You could create the Substrait plan yourself, but you will probably have a much easier time finding an existing Substrait producer. For example, you can use ibis-substrait to easily create Substrait plans from Python expressions. There are several different tools that are able to create Substrait plans from SQL. Eventually, we hope that C++ based Substrait producers will emerge; however, we are not aware of any at this time. Detailed instructions on creating an execution plan from Substrait can be found in the Substrait page <acero-substrait>.

Programmatic Plan Creation

Creating an execution plan programmatically is simpler than creating a plan from Substrait, though you lose some flexibility and future-proofing guarantees. The simplest way to create a Declaration is to simply instantiate one. You will need the name of the declaration, a vector of inputs, and an options object. For example:

/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
turbo::Status ScanProjectSinkExample() {
  TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<alkaid::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = alkaid::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}

The above code creates a scan declaration (which has no inputs) and a project declaration (which uses the scan as input). This is simple enough, but we can make it slightly easier: if you are creating a linear sequence of declarations, as in the above example, you can use the Declaration::Sequence function instead.

  // Inputs do not have to be passed to the project node when using Sequence
  ac::Declaration plan =
      ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
                                 {"project", ac::ProjectNodeOptions({a_times_2})}});

There are many more examples of programmatic plan creation later in this document.

Executing a Plan

There are a number of different methods that can be used to execute a declaration. Each one provides the data in a slightly different form. Since all of these methods begin with DeclarationTo..., this guide will often refer to them as the DeclarationToXyz methods.

DeclarationToTable

The DeclarationToTable method accumulates all of the results into a single alkaid::Table. This is perhaps the simplest way to collect results from Acero. The main disadvantage of this approach is that it requires accumulating all results in memory.
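
For example (a sketch, assuming plan is a Declaration built as shown above):

TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::Table> table,
                    ac::DeclarationToTable(std::move(plan)));
std::cout << "Results : " << table->ToString() << std::endl;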

info

Acero processes large datasets in small chunks. This is described in more detail in the developer's guide. As a result, you may be surprised to find that a table collected with DeclarationToTable is chunked differently than your input. For example, your input might be one large table with a single chunk of 2 million rows, while your output table might have 64 chunks, each with 32Ki rows.

DeclarationToReader

The DeclarationToReader method allows you to consume the results iteratively. It will create an alkaid::RecordBatchReader which you can read from at your leisure. If you do not read from the reader fast enough then backpressure will be applied and the execution plan will pause. Closing the reader cancels the running execution plan, and the reader's destructor will wait for the execution plan to finish whatever it is doing, so it may block.
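
A minimal sketch of the read loop (assuming the alkaid port keeps Acero's DeclarationToReader signature, which returns a std::unique_ptr<alkaid::RecordBatchReader>):

TURBO_MOVE_OR_RAISE(std::unique_ptr<alkaid::RecordBatchReader> reader,
                    ac::DeclarationToReader(std::move(plan)));
std::shared_ptr<alkaid::RecordBatch> batch;
while (true) {
  TURBO_RETURN_NOT_OK(reader->ReadNext(&batch));
  if (!batch) break;  // stream exhausted, the plan is finished
  // ... process `batch` incrementally ...
}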

DeclarationToStatus

The DeclarationToStatus method is useful if you want to run the plan but do not actually want to consume the results. For example, this is useful when benchmarking, or when the plan has side effects such as a dataset write node. If the plan generates any results then they are immediately discarded.
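
Usage is a single call; the consuming_sink and write examples later in this guide use the same pattern:

// Run the plan to completion; any batches it produces are discarded.
TURBO_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(plan)));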

Running a Plan Directly

If, for some reason, one of the DeclarationToXyz methods is not sufficient, then it is possible to run a plan directly. This should only be needed if you are doing something unique, for example if you have created a custom sink node, or if you need a plan with multiple outputs.

info

In academic literature and many existing systems there is a general assumption that an execution plan has at most one output. Some things in Acero, such as the DeclarationToXyz methods, expect this. However, nothing in the design strictly prevents having multiple sink nodes.

Detailed instructions on how to do this are out of scope for this guide, but the rough steps are as follows (a sketch follows the list):

  1. Create a new ExecPlan object.

  2. Add sink nodes to your graph of Declaration objects (this is the only time you will need to create declarations for sink nodes)

  3. Use Declaration::AddToPlan to add your declaration to the plan (if you have more than one output then you will not be able to use this method and will need to add nodes one at a time)

  4. Validate the plan with ExecPlan::Validate

  5. Start the plan with ExecPlan::StartProducing

  6. Wait for the future returned by ExecPlan::finished to complete.
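
The sketch below puts these steps together for the simple single-output case (the names follow the Acero API used elsewhere in this guide; treat it as an outline rather than a complete program):

turbo::Status RunPlanDirectly(ac::Declaration root) {
  // Step 1: create a new ExecPlan
  TURBO_MOVE_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                      ac::ExecPlan::Make(*cp::threaded_exec_context()));

  // Step 2: add a sink node declaration to the graph
  alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  ac::Declaration sink{"sink", {std::move(root)}, ac::SinkNodeOptions{&sink_gen}};

  // Step 3: add the declarations to the plan
  TURBO_RETURN_NOT_OK(sink.AddToPlan(plan.get()).status());

  // Steps 4-6: validate, start, consume, and wait for completion
  TURBO_RETURN_NOT_OK(plan->Validate());
  plan->StartProducing();
  // ... drain sink_gen here, e.g. via ac::MakeGeneratorReader ...
  return plan->finished().status();
}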

Providing Input

Input data for an execution plan can come from a variety of sources. It is often read from files stored on some kind of filesystem. It is also common for input to come from in-memory data, which is typical, for example, in a pandas-like frontend. Input can also come from network streams, such as a Flight request. Acero can support all of these cases and can even support unique and custom situations not mentioned here.

There are pre-defined source nodes that cover the most common input scenarios; these are listed below. However, if your source data is unique then you will need to use the generic source node. This node expects you to provide an asynchronous stream of batches and is covered in more detail here <stream_execution_source_docs>.

Available ExecNode Implementations


The following tables give a quick summary of the available operators.

Source Nodes

These nodes can be used as data sources

| Factory Name | Options | Brief Description |
| --- | --- | --- |
| source | SourceNodeOptions | A generic source node that wraps an asynchronous stream of data (example <stream_execution_source_docs>) |
| table_source | TableSourceNodeOptions | Generates data from an alkaid::Table (example <stream_execution_table_source_docs>) |
| record_batch_source | RecordBatchSourceNodeOptions | Generates data from an iterator of alkaid::RecordBatch |
| record_batch_reader_source | RecordBatchReaderSourceNodeOptions | Generates data from an alkaid::RecordBatchReader |
| exec_batch_source | ExecBatchSourceNodeOptions | Generates data from an iterator of alkaid::compute::ExecBatch |
| array_vector_source | ArrayVectorSourceNodeOptions | Generates data from an iterator of vectors of alkaid::Array |
| scan | alkaid::dataset::ScanNodeOptions | Generates data from an alkaid::dataset::Dataset (requires the datasets module) (example <stream_execution_scan_docs>) |

Compute Nodes

These nodes apply computations to the data and may transform or reshape it

| Factory Name | Options | Brief Description |
| --- | --- | --- |
| filter | FilterNodeOptions | Removes rows that do not match a given filter expression (example <stream_execution_filter_docs>) |
| project | ProjectNodeOptions | Creates new columns by evaluating compute expressions. Can also drop and reorder columns (example <stream_execution_project_docs>) |
| aggregate | AggregateNodeOptions | Calculates summary statistics across the entire input stream or on groups of data (example <stream_execution_aggregate_docs>) |
| pivot_longer | PivotLongerNodeOptions | Reshapes data by converting some columns into additional rows |

Arrangement Nodes

These nodes reorder, combine, or slice streams of data

| Factory Name | Options | Brief Description |
| --- | --- | --- |
| hash_join | HashJoinNodeOptions | Joins two inputs based on common columns (example <stream_execution_hashjoin_docs>) |
| asofjoin | AsofJoinNodeOptions | Joins multiple inputs to the first input based on a common ordered column (often time) |
| union | N/A | Merges two inputs with identical schemas (example <stream_execution_union_docs>) |
| order_by | OrderByNodeOptions | Reorders a stream |
| fetch | FetchNodeOptions | Slices a range of rows from a stream |

Sink Nodes

These nodes terminate a plan. Users do not typically create sink nodes, since they are selected automatically based on the DeclarationToXyz method used to consume the plan. However, this list may be useful for those developing new sink nodes or using Acero in advanced ways.

| Factory Name | Options | Brief Description |
| --- | --- | --- |
| sink | SinkNodeOptions | Collects batches into a FIFO queue with optional backpressure |
| write | alkaid::dataset::WriteNodeOptions | Writes batches to a filesystem (example <stream_execution_write_docs>) |
| consuming_sink | ConsumingSinkNodeOptions | Consumes batches using a user provided callback function |
| table_sink | TableSinkNodeOptions | Collects batches into an alkaid::Table |
| order_by_sink | OrderBySinkNodeOptions | Deprecated |
| select_k_sink | SelectKSinkNodeOptions | Deprecated |

Examples

The remainder of this document contains example execution plans. Each example highlights the behavior of a specific execution node.

source

The source operation can be considered the entry point for creating a streaming execution plan. SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available, but it can be quite tricky to configure. First you should review the other source node types to make sure there isn't a simpler choice.

The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an alkaid::Future<std::optional<cp::ExecBatch>>. The function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The alkaid library refers to these functions as alkaid::AsyncGenerator, and there are a number of utilities for working with them. For this example we use a vector of record batches that have already been stored in memory. In addition, the schema of the data must be known up front. Acero must know the schema of the data at each stage of the execution graph before any processing begins, which means we must supply the schema for a source node separately from the data itself. Here we define a struct to hold the data generator definition. This includes the in-memory batches, the schema, and a function that serves as a data generator:

struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<alkaid::Schema> schema;
  // This method uses internal alkaid utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::alkaid::internal::MapVector(
        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
        batches);
    alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
    gen = alkaid::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};

Generating sample batches for computation:

alkaid::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {alkaid::field("a", alkaid::int32()),
                       alkaid::field("b", alkaid::boolean())};
  TURBO_MOVE_OR_RAISE(auto b1_int, GetArrayDataSample<alkaid::Int32Type>({0, 4}));
  TURBO_MOVE_OR_RAISE(auto b2_int, GetArrayDataSample<alkaid::Int32Type>({5, 6, 7}));
  TURBO_MOVE_OR_RAISE(auto b3_int, GetArrayDataSample<alkaid::Int32Type>({8, 9, 10}));

  TURBO_MOVE_OR_RAISE(auto b1_bool,
                      GetArrayDataSample<alkaid::BooleanType>({false, true}));
  TURBO_MOVE_OR_RAISE(auto b2_bool,
                      GetArrayDataSample<alkaid::BooleanType>({true, false, true}));
  TURBO_MOVE_OR_RAISE(auto b3_bool,
                      GetArrayDataSample<alkaid::BooleanType>({false, true, false}));

  TURBO_MOVE_OR_RAISE(auto b1,
                      GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  TURBO_MOVE_OR_RAISE(auto b2,
                      GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  TURBO_MOVE_OR_RAISE(auto b3,
                      GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = alkaid::schema(field_vector);
  return out;
}

An example of using source (the usage of sink is explained in detail in sink <stream_execution_sink_docs>):

/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed. In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
turbo::Status SourceSinkExample() {
  TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}

table_source

In the previous example, source node <stream_execution_source_docs>, a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier, and more performant, to use TableSourceNodeOptions. Here the input data can be passed as a std::shared_ptr<alkaid::Table> along with a max_batch_size. The max_batch_size is used to break up large record batches so that they can be processed in parallel. It is important to note that the table batches will not get merged to form larger batches when the source table has smaller batches.

An example of using table_source:

/// \brief An example showing a table source node
///
/// TableSource-Table Example
/// This example shows how a table_source can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table. This plan simply collects the
/// data back into a table but nodes could be added that modify
/// or transform the data as well (as is shown in later examples)
turbo::Status TableSourceSinkExample() {
  TURBO_MOVE_OR_RAISE(auto table, GetTable());

  alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};

  ac::Declaration source{"table_source", std::move(table_source_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}

filter

The filter operation, as the name suggests, provides an option for defining data filtering criteria. It selects rows where the given expression evaluates to true. Filters can be written using alkaid::compute::Expression, and the expression should have a return type of boolean. For example, if we wish to keep rows where the value of column a is greater than 3, then we could use the following expression.

Filter example:

/// \brief An example showing a filter node
///
/// Source-Filter-Table
/// This example shows how a filter can be used in an execution plan,
/// to filter data from a source. The output from the execution plan
/// is collected into a table.
turbo::Status ScanFilterSinkExample() {
  TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<alkaid::dataset::ScanOptions>();
  // specify the filter. This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_expr;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  auto scan_node_options = alkaid::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  ac::Declaration filter{
      "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};

  return ExecutePlanAndCollectAsTable(std::move(filter));
}

project

The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. These must be scalar expressions (expressions consisting of scalar literals, field references, and scalar functions, i.e. elementwise functions that return one value for each input row independent of the values of all other rows). This is exposed via ProjectNodeOptions, which requires an alkaid::compute::Expression and a name for each of the output columns (if no names are provided, the string representations of the exprs will be used).
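
For instance, a minimal sketch (with a hypothetical output column name):

cp::Expression a_times_2 =
    cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
// Without the second argument the output column would be labeled with the
// expression's string representation; here we name it explicitly.
ac::ProjectNodeOptions project_options({a_times_2}, {"a_times_2"});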

Project example:

/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
turbo::Status ScanProjectSinkExample() {
  TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<alkaid::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = alkaid::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}

aggregate

The aggregate node computes various types of aggregates over data.

Alkaid supports two types of aggregates: "scalar" aggregates and "hash" aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like a GROUP BY in SQL: they first partition the data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation, and can compute any number of aggregations at once.

AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from this list of aggregation functions <aggregation-option-list>.

info

This node is a pipeline breaker and will fully materialize the dataset in memory. In the future, a spillover mechanism will be added which should alleviate this constraint.

An aggregation can provide its results as a group or as a scalar. For instance, an operation like hash_count provides the count of each unique record as a grouped result, while an operation like sum provides a single record.
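
Following the compute function naming convention, the grouped variant of a function carries a hash_ prefix. A small sketch contrasting the two forms of AggregateNodeOptions (assuming "hash_sum" is registered like its scalar counterpart):

// Scalar aggregate: one output row for the entire stream
ac::AggregateNodeOptions scalar_agg{
    /*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};

// Hash aggregate: the "hash_" prefixed variant plus one or more key columns,
// producing one output row per unique combination of the keys
ac::AggregateNodeOptions grouped_agg{
    /*aggregates=*/{{"hash_sum", nullptr, "a", "sum(a)"}},
    /*keys=*/{"b"}};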

Scalar aggregation example:

/// \brief An example showing an aggregation node to aggregate an entire table
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on a
/// execution plan resulting in a scalar output. The source node loads the
/// data and the aggregation (counting unique types in column 'a')
/// is applied on this data. The output is collected into a table (that will
/// have exactly one row)
turbo::Status SourceScalarAggregateSinkExample() {
  TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}

Group aggregation example:

/// \brief An example showing an aggregation node to perform a group-by operation
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on a
/// execution plan resulting in grouped output. The source node loads the
/// data and the aggregation (counting unique types in column 'a') is
/// applied on this data. The output is collected into a table that will contain
/// one row for each unique combination of group keys.
turbo::Status SourceGroupAggregateSinkExample() {
  TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

  alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
                               /*keys=*/{"b"}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}

sink

The sink operation provides output and is the final node of a streaming execution plan. The SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. It is expected that the caller will repeatedly call this function until the generator function is exhausted (returns std::optional::nullopt). If this function is not called often enough then record batches will accumulate in memory. An execution plan should have only one "terminal" node (one sink node). An ExecPlan can terminate early due to cancellation or an error, before the output is fully consumed. However, the plan can be safely destroyed independently of the sink, which will hold the unconsumed batches by exec_plan->finished().
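
A minimal sketch of attaching a sink by hand (assuming source is a Declaration and schema describes its output; MakeGeneratorReader is used the same way in the order_by_sink example below):

alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
ac::Declaration sink{"sink", {std::move(source)}, ac::SinkNodeOptions{&sink_gen}};
// ... add the declarations to an ExecPlan and start it ...
// Translate the asynchronous generator into a synchronous reader:
std::shared_ptr<alkaid::RecordBatchReader> sink_reader =
    ac::MakeGeneratorReader(schema, std::move(sink_gen), alkaid::default_memory_pool());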

A sink operation is also included as part of the source example:

/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed. In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
turbo::Status SourceSinkExample() {
  TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}

consuming_sink

The consuming_sink operator is a sink operation containing a consuming operation within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes a callback function that is expected to consume the batch. Once this callback has finished, the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough then many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have completed. Once all batches have been delivered, the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consuming function converts batches into async tasks (this is currently done internally for the dataset write node).

Example:

// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
alkaid::Future<> finish = alkaid::Future<>::Make();
struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, alkaid::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}
  // Consumption logic can be written here
  turbo::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return turbo::Status::OK();
  }

  alkaid::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  alkaid::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

ac::ExecNode* consuming_sink;

TURBO_MOVE_OR_RAISE(consuming_sink,
                    ac::MakeExecNode("consuming_sink", plan.get(), {source},
                                     ac::ConsumingSinkNodeOptions(consumer)));

Consuming-Sink example:

/// \brief An example showing a consuming sink node
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
turbo::Status SourceConsumingSinkExample() {
  TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  std::atomic<uint32_t> batches_seen{0};
  alkaid::Future<> finish = alkaid::Future<>::Make();
  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, alkaid::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    turbo::Status Init(const std::shared_ptr<alkaid::Schema>& schema,
                       ac::BackpressureControl* backpressure_control,
                       ac::ExecPlan* plan) override {
      // This will be called as the plan is started (before the first call to Consume)
      // and provides the schema of the data coming into the node, controls for pausing /
      // resuming input, and a pointer to the plan itself which can be used to access
      // other utilities such as the thread indexer or async task scheduler.
      return turbo::Status::OK();
    }

    turbo::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return turbo::Status::OK();
    }

    alkaid::Future<> Finish() override {
      // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
      // output file handles and flushing remaining work
      return alkaid::Future<>::MakeFinished();
    }

    std::atomic<uint32_t>* batches_seen;
    alkaid::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  ac::Declaration consuming_sink{"consuming_sink",
                                 {std::move(source)},
                                 ac::ConsumingSinkNodeOptions(std::move(consumer))};

  // Since we are consuming the data within the plan there is no output and we simply
  // run the plan to completion instead of collecting into a table.
  TURBO_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));

  std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
            << std::endl;
  return turbo::Status::OK();
}

order_by_sink

The order_by_sink operation is an extension of the sink operation. This operation provides the ability to guarantee the ordering of the stream by providing OrderBySinkNodeOptions. Here, alkaid::compute::SortOptions are provided to define which columns are used for sorting and whether to sort by ascending or descending values.

info

This node is a "pipeline breaker" and will fully materialize the dataset in memory. In the future, a spillover mechanism will be added which should alleviate this constraint.

Order-By-Sink example:

turbo::Status ExecutePlanAndCollectAsTableWithCustomSink(
    std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<alkaid::Schema> schema,
    alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<alkaid::RecordBatchReader> sink_reader =
      ac::MakeGeneratorReader(schema, std::move(sink_gen), alkaid::default_memory_pool());

  // validate the ExecPlan
  TURBO_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // collect sink_reader into a Table
  std::shared_ptr<alkaid::Table> response_table;

  TURBO_MOVE_OR_RAISE(response_table,
                      alkaid::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // plan mark finished
  auto future = plan->finished();
  return future.status();
}

/// \brief An example showing an order-by node
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
turbo::Status SourceOrderBySinkExample() {
  TURBO_MOVE_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                      ac::ExecPlan::Make(*cp::threaded_exec_context()));

  TURBO_MOVE_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  TURBO_MOVE_OR_RAISE(ac::ExecNode* source,
                      ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  TURBO_RETURN_NOT_OK(ac::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      ac::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
}

select_k_sink

The select_k_sink option enables selecting the top/bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. SelectKOptions are defined using the OrderBySinkNode definition. This option returns a sink node that receives inputs and then computes the top_k/bottom_k.

warning

This node is a "pipeline breaker" and will fully materialize the input in memory. In the future, a spillover mechanism will be added which should alleviate this constraint.

SelectK example:

/// \brief An example showing a select-k node
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
turbo::Status SourceKSelectExample() {
  TURBO_MOVE_OR_RAISE(auto input, MakeGroupableBatches());
  TURBO_MOVE_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                      ac::ExecPlan::Make(*cp::threaded_exec_context()));
  alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  TURBO_MOVE_OR_RAISE(
      ac::ExecNode* source,
      ac::MakeExecNode("source", plan.get(), {},
                       ac::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  TURBO_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = alkaid::schema(
      {alkaid::field("i32", alkaid::int32()), alkaid::field("str", alkaid::utf8())});

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
}

table_sink

The table_sink node provides the ability to receive the output as an in-memory table. This is simpler to use than the other sink nodes provided by the streaming execution engine, but it only makes sense when the output fits comfortably in memory. The node is created using TableSinkNodeOptions.

An example of using table_sink:

/// \brief An example showing a table sink node
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
turbo::Status TableSinkExample() {
  TURBO_MOVE_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                      ac::ExecPlan::Make(*cp::threaded_exec_context()));

  TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  TURBO_MOVE_OR_RAISE(ac::ExecNode* source,
                      ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<alkaid::Table> output_table;
  auto table_sink_options = ac::TableSinkNodeOptions{&output_table};

  TURBO_RETURN_NOT_OK(
      ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  TURBO_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // Wait for the plan to finish
  auto finished = plan->finished();
  TURBO_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return turbo::Status::OK();
}

scan

scan is an operation used for loading and processing datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using alkaid::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in ../dataset.

This node is capable of applying pushdown filters to the file readers, which reduces the amount of data that needs to be read. This means you may supply the same filter expression to the scan node that you supply to the FilterNode, because the filtering is done in two different places.

Scan example:

/// \brief An example demonstrating a scan and sink node
///
/// Scan-Table
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
turbo::Status ScanSinkExample() {
  TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<alkaid::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  auto scan_node_options = alkaid::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(scan));
}

write

The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc., using the ../dataset functionality in Alkaid. The write options are provided via alkaid::dataset::WriteNodeOptions, which in turn contains alkaid::dataset::FileSystemDatasetWriteOptions. alkaid::dataset::FileSystemDatasetWriteOptions provides control over the written dataset, including options such as the output directory, the file naming scheme, and so on.

Write example:

/// \brief An example showing a write node
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how scan node can be used to load the data
/// and after processing how it can be written to disk.
turbo::Status ScanFilterWriteExample(const std::string& file_path) {
  TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<alkaid::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  auto scan_node_options = alkaid::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::fs::FileSystem> filesystem,
                      alkaid::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // TURBO_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  TURBO_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = alkaid::schema({alkaid::field("a", alkaid::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<alkaid::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<alkaid::dataset::ParquetFileFormat>();

  alkaid::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  alkaid::dataset::WriteNodeOptions write_node_options{write_options};

  ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};

  // Since the write node has no output we simply run the plan to completion and the
  // data should be written
  TURBO_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));

  std::cout << "Dataset written to " << base_path << std::endl;
  return turbo::Status::OK();
}

union

union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.

The following example demonstrates how this can be achieved using two data sources.

Union example:

/// \brief An example showing a union node
///
/// Source-Union-Table
/// This example shows how a union operation can be applied on two
/// data sources. The output is collected into a table.
turbo::Status SourceUnionSinkExample() {
  TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

  ac::Declaration lhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  ac::Declaration rhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  ac::Declaration union_plan{
      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};

  return ExecutePlanAndCollectAsTable(std::move(union_plan));
}

hash_join

The hash_join operation provides the relational-algebra join operation using a hash-based algorithm. HashJoinNodeOptions contains the options required to define a join. hash_join supports left/right/full semi/anti/outer joins. The join keys (i.e., the columns to join on) and suffixes (i.e., suffix terms such as "_x", which can be appended to column names duplicated in both the left and right relations) can also be set via the join options. Read more about hash joins.

Hash-Join example:

/// \brief An example showing a hash join node
///
/// Source-HashJoin-Table
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is collected into a table.
turbo::Status SourceHashJoinSinkExample() {
  TURBO_MOVE_OR_RAISE(auto input, MakeGroupableBatches());

  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};

  ac::HashJoinNodeOptions join_opts{
      ac::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ac::Declaration hashjoin{
      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
}

Summary

Examples of these nodes can be found in cpp/examples/alkaid/execution_plan_documentation_examples.cc in the Alkaid source.

Complete example:

#include <alkaid/array.h>
#include <alkaid/builder.h>

#include <alkaid/acero/exec_plan.h>
#include <alkaid/compute/api.h>
#include <alkaid/compute/api_vector.h>
#include <alkaid/compute/cast.h>

#include <alkaid/csv/api.h>

#include <alkaid/dataset/dataset.h>
#include <alkaid/dataset/file_base.h>
#include <alkaid/dataset/file_parquet.h>
#include <alkaid/dataset/plan.h>
#include <alkaid/dataset/scanner.h>

#include <alkaid/io/interfaces.h>
#include <alkaid/io/memory.h>

#include <alkaid/result.h>
#include <alkaid/status.h>
#include <alkaid/table.h>

#include <alkaid/ipc/api.h>

#include <alkaid/util/future.h>
#include <alkaid/util/range.h>
#include <alkaid/util/thread_pool.h>
#include <alkaid/util/vector.h>

#include <atomic>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>

// Demonstrate various operators in Alkaid Streaming Execution Engine

namespace cp = ::alkaid::compute;
namespace ac = ::alkaid::acero;

constexpr char kSep[] = "******";

void PrintBlock(const std::string& msg) {
std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
}

template <typename TYPE,
typename = typename std::enable_if<alkaid::is_number_type<TYPE>::value |
alkaid::is_boolean_type<TYPE>::value |
alkaid::is_temporal_type<TYPE>::value>::type>
alkaid::Result<std::shared_ptr<alkaid::Array>> GetArrayDataSample(
const std::vector<typename TYPE::c_type>& values) {
using AlkaidBuilderType = typename alkaid::TypeTraits<TYPE>::BuilderType;
AlkaidBuilderType builder;
TURBO_RETURN_NOT_OK(builder.Reserve(values.size()));
TURBO_RETURN_NOT_OK(builder.AppendValues(values));
return builder.Finish();
}

template <class TYPE>
alkaid::Result<std::shared_ptr<alkaid::Array>> GetBinaryArrayDataSample(
const std::vector<std::string>& values) {
using AlkaidBuilderType = typename alkaid::TypeTraits<TYPE>::BuilderType;
AlkaidBuilderType builder;
TURBO_RETURN_NOT_OK(builder.Reserve(values.size()));
TURBO_RETURN_NOT_OK(builder.AppendValues(values));
return builder.Finish();
}

alkaid::Result<std::shared_ptr<alkaid::RecordBatch>> GetSampleRecordBatch(
    const alkaid::ArrayVector array_vector, const alkaid::FieldVector& field_vector) {
  TURBO_MOVE_OR_RAISE(auto struct_result,
                      alkaid::StructArray::Make(array_vector, field_vector));
  // FromStructArray is a static factory; call it directly rather than through
  // an uninitialized RecordBatch pointer.
  return alkaid::RecordBatch::FromStructArray(struct_result);
}

/// \brief Create a sample table
/// The table's contents will be:
/// a,b
/// 1,null
/// 2,true
/// null,true
/// 3,false
/// null,true
/// 4,false
/// 5,null
/// 6,false
/// 7,false
/// 8,true
/// \return The created table

alkaid::Result<std::shared_ptr<alkaid::Table>> GetTable() {
auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
TURBO_MOVE_OR_RAISE(auto int64_array,
GetArrayDataSample<alkaid::Int64Type>(
{1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));

alkaid::BooleanBuilder boolean_builder;
std::shared_ptr<alkaid::BooleanArray> bool_array;

std::vector<uint8_t> bool_values = {false, true, true, false, true,
false, false, false, false, true};
std::vector<bool> is_valid = {false, true, true, true, true,
true, false, true, true, true};

TURBO_RETURN_NOT_OK(boolean_builder.Reserve(10));

TURBO_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));

TURBO_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));

auto record_batch =
alkaid::RecordBatch::Make(alkaid::schema({alkaid::field("a", alkaid::int64()),
alkaid::field("b", alkaid::boolean())}),
10, {int64_array, bool_array});
TURBO_MOVE_OR_RAISE(auto table, alkaid::Table::FromRecordBatches({record_batch}));
return table;
}

/// \brief Create a sample dataset
/// \return An in-memory dataset based on GetTable()
alkaid::Result<std::shared_ptr<alkaid::dataset::Dataset>> GetDataset() {
TURBO_MOVE_OR_RAISE(auto table, GetTable());
auto ds = std::make_shared<alkaid::dataset::InMemoryDataset>(table);
return ds;
}

alkaid::Result<cp::ExecBatch> GetExecBatchFromVectors(
const alkaid::FieldVector& field_vector, const alkaid::ArrayVector& array_vector) {
std::shared_ptr<alkaid::RecordBatch> record_batch;
TURBO_MOVE_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
cp::ExecBatch batch{*res_batch};
return batch;
}

// (Doc section: BatchesWithSchema Definition)
struct BatchesWithSchema {
std::vector<cp::ExecBatch> batches;
std::shared_ptr<alkaid::Schema> schema;
// This method uses internal alkaid utilities to
// convert a vector of record batches to an AsyncGenerator of optional batches
alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
auto opt_batches = ::alkaid::internal::MapVector(
[](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
batches);
alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
gen = alkaid::MakeVectorGenerator(std::move(opt_batches));
return gen;
}
};
// (Doc section: BatchesWithSchema Definition)

// (Doc section: MakeBasicBatches Definition)
alkaid::Result<BatchesWithSchema> MakeBasicBatches() {
BatchesWithSchema out;
auto field_vector = {alkaid::field("a", alkaid::int32()),
alkaid::field("b", alkaid::boolean())};
TURBO_MOVE_OR_RAISE(auto b1_int, GetArrayDataSample<alkaid::Int32Type>({0, 4}));
TURBO_MOVE_OR_RAISE(auto b2_int, GetArrayDataSample<alkaid::Int32Type>({5, 6, 7}));
TURBO_MOVE_OR_RAISE(auto b3_int, GetArrayDataSample<alkaid::Int32Type>({8, 9, 10}));

TURBO_MOVE_OR_RAISE(auto b1_bool,
GetArrayDataSample<alkaid::BooleanType>({false, true}));
TURBO_MOVE_OR_RAISE(auto b2_bool,
GetArrayDataSample<alkaid::BooleanType>({true, false, true}));
TURBO_MOVE_OR_RAISE(auto b3_bool,
GetArrayDataSample<alkaid::BooleanType>({false, true, false}));

TURBO_MOVE_OR_RAISE(auto b1,
GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
TURBO_MOVE_OR_RAISE(auto b2,
GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
TURBO_MOVE_OR_RAISE(auto b3,
GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

out.batches = {b1, b2, b3};
out.schema = alkaid::schema(field_vector);
return out;
}
// (Doc section: MakeBasicBatches Definition)

alkaid::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
BatchesWithSchema out;
auto field = alkaid::field("a", alkaid::int32());
TURBO_MOVE_OR_RAISE(auto b1_int, GetArrayDataSample<alkaid::Int32Type>({1, 3, 0, 2}));
TURBO_MOVE_OR_RAISE(auto b2_int,
GetArrayDataSample<alkaid::Int32Type>({121, 101, 120, 12}));
TURBO_MOVE_OR_RAISE(auto b3_int,
GetArrayDataSample<alkaid::Int32Type>({10, 110, 210, 121}));
TURBO_MOVE_OR_RAISE(auto b4_int,
GetArrayDataSample<alkaid::Int32Type>({51, 101, 2, 34}));
TURBO_MOVE_OR_RAISE(auto b5_int,
GetArrayDataSample<alkaid::Int32Type>({11, 31, 1, 12}));
TURBO_MOVE_OR_RAISE(auto b6_int,
GetArrayDataSample<alkaid::Int32Type>({12, 101, 120, 12}));
TURBO_MOVE_OR_RAISE(auto b7_int,
GetArrayDataSample<alkaid::Int32Type>({0, 110, 210, 11}));
TURBO_MOVE_OR_RAISE(auto b8_int,
GetArrayDataSample<alkaid::Int32Type>({51, 10, 2, 3}));

TURBO_MOVE_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
TURBO_MOVE_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
TURBO_MOVE_OR_RAISE(auto b3,
GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
TURBO_MOVE_OR_RAISE(auto b4,
GetExecBatchFromVectors({field, field, field, field},
{b4_int, b5_int, b6_int, b7_int}));
out.batches = {b1, b2, b3, b4};
out.schema = alkaid::schema({field});
return out;
}

alkaid::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
BatchesWithSchema out;
auto fields = {alkaid::field("i32", alkaid::int32()), alkaid::field("str", alkaid::utf8())};
TURBO_MOVE_OR_RAISE(auto b1_int, GetArrayDataSample<alkaid::Int32Type>({12, 7, 3}));
TURBO_MOVE_OR_RAISE(auto b2_int, GetArrayDataSample<alkaid::Int32Type>({-2, -1, 3}));
TURBO_MOVE_OR_RAISE(auto b3_int, GetArrayDataSample<alkaid::Int32Type>({5, 3, -8}));
TURBO_MOVE_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<alkaid::StringType>(
{"alpha", "beta", "alpha"}));
TURBO_MOVE_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<alkaid::StringType>(
{"alpha", "gamma", "alpha"}));
TURBO_MOVE_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<alkaid::StringType>(
{"gamma", "beta", "alpha"}));
TURBO_MOVE_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
TURBO_MOVE_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
TURBO_MOVE_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
out.batches = {b1, b2, b3};

size_t batch_count = out.batches.size();
for (int repeat = 1; repeat < multiplicity; ++repeat) {
for (size_t i = 0; i < batch_count; ++i) {
out.batches.push_back(out.batches[i]);
}
}

out.schema = alkaid::schema(fields);
return out;
}

turbo::Status ExecutePlanAndCollectAsTable(ac::Declaration plan) {
// collect sink_reader into a Table
std::shared_ptr<alkaid::Table> response_table;
TURBO_MOVE_OR_RAISE(response_table, ac::DeclarationToTable(std::move(plan)));

std::cout << "Results : " << response_table->ToString() << std::endl;

return turbo::Status::OK();
}

// (Doc section: Scan Example)

/// \brief An example demonstrating a scan and sink node
///
/// Scan-Table
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
turbo::Status ScanSinkExample() {
TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::dataset::Dataset> dataset, GetDataset());

auto options = std::make_shared<alkaid::dataset::ScanOptions>();
options->projection = cp::project({}, {}); // create empty projection

// construct the scan node
auto scan_node_options = alkaid::dataset::ScanNodeOptions{dataset, options};

ac::Declaration scan{"scan", std::move(scan_node_options)};

return ExecutePlanAndCollectAsTable(std::move(scan));
}
// (Doc section: Scan Example)

// (Doc section: Source Example)

/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed. In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
turbo::Status SourceSinkExample() {
TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

ac::Declaration source{"source", std::move(source_node_options)};

return ExecutePlanAndCollectAsTable(std::move(source));
}
// (Doc section: Source Example)

// (Doc section: Table Source Example)

/// \brief An example showing a table source node
///
/// TableSource-Table Example
/// This example shows how a table_source can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table. This plan simply collects the
/// data back into a table but nodes could be added that modify
/// or transform the data as well (as is shown in later examples)
turbo::Status TableSourceSinkExample() {
TURBO_MOVE_OR_RAISE(auto table, GetTable());

alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
int max_batch_size = 2;
auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};

ac::Declaration source{"table_source", std::move(table_source_options)};

return ExecutePlanAndCollectAsTable(std::move(source));
}
// (Doc section: Table Source Example)

// (Doc section: Filter Example)

/// \brief An example showing a filter node
///
/// Source-Filter-Table
/// This example shows how a filter can be used in an execution plan,
/// to filter data from a source. The output from the execution plan
/// is collected into a table.
turbo::Status ScanFilterSinkExample() {
TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::dataset::Dataset> dataset, GetDataset());

auto options = std::make_shared<alkaid::dataset::ScanOptions>();
// specify the filter. This filter keeps only the rows where the
// value of the "a" column is greater than 3.
cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
// set filter for scanner : on-disk / push-down filtering.
// This step can be skipped if you are not reading from disk.
options->filter = filter_expr;
// empty projection
options->projection = cp::project({}, {});

// construct the scan node
std::cout << "Initialized Scanning Options" << std::endl;

auto scan_node_options = alkaid::dataset::ScanNodeOptions{dataset, options};
std::cout << "Scan node options created" << std::endl;

ac::Declaration scan{"scan", std::move(scan_node_options)};

// pipe the scan node into the filter node
// Need to set the filter in scan node options and filter node options.
// At scan node it is used for on-disk / push-down filtering.
// At filter node it is used for in-memory filtering.
ac::Declaration filter{
"filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};

return ExecutePlanAndCollectAsTable(std::move(filter));
}

// (Doc section: Filter Example)

// (Doc section: Project Example)

/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
turbo::Status ScanProjectSinkExample() {
TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::dataset::Dataset> dataset, GetDataset());

auto options = std::make_shared<alkaid::dataset::ScanOptions>();
// projection
cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
options->projection = cp::project({}, {});

auto scan_node_options = alkaid::dataset::ScanNodeOptions{dataset, options};

ac::Declaration scan{"scan", std::move(scan_node_options)};
ac::Declaration project{
"project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

return ExecutePlanAndCollectAsTable(std::move(project));
}

// (Doc section: Project Example)

// This is a variation of ScanProjectSinkExample introducing how to use the
// Declaration::Sequence function
turbo::Status ScanProjectSequenceSinkExample() {
TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::dataset::Dataset> dataset, GetDataset());

auto options = std::make_shared<alkaid::dataset::ScanOptions>();
// projection
cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
options->projection = cp::project({}, {});

auto scan_node_options = alkaid::dataset::ScanNodeOptions{dataset, options};

// (Doc section: Project Sequence Example)
// Inputs do not have to be passed to the project node when using Sequence
ac::Declaration plan =
ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
{"project", ac::ProjectNodeOptions({a_times_2})}});
// (Doc section: Project Sequence Example)

return ExecutePlanAndCollectAsTable(std::move(plan));
}

// (Doc section: Scalar Aggregate Example)

/// \brief An example showing an aggregation node to aggregate an entire table
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on a
/// execution plan resulting in a scalar output. The source node loads the
/// data and the aggregation (counting unique types in column 'a')
/// is applied on this data. The output is collected into a table (that will
/// have exactly one row)
turbo::Status SourceScalarAggregateSinkExample() {
TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

ac::Declaration source{"source", std::move(source_node_options)};
auto aggregate_options =
ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
ac::Declaration aggregate{
"aggregate", {std::move(source)}, std::move(aggregate_options)};

return ExecutePlanAndCollectAsTable(std::move(aggregate));
}
// (Doc section: Scalar Aggregate Example)

// (Doc section: Group Aggregate Example)

/// \brief An example showing an aggregation node to perform a group-by operation
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on a
/// execution plan resulting in grouped output. The source node loads the
/// data and the aggregation (counting unique types in column 'a') is
/// applied on this data. The output is collected into a table that will contain
/// one row for each unique combination of group keys.
turbo::Status SourceGroupAggregateSinkExample() {
TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

ac::Declaration source{"source", std::move(source_node_options)};
auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
auto aggregate_options =
ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
/*keys=*/{"b"}};
ac::Declaration aggregate{
"aggregate", {std::move(source)}, std::move(aggregate_options)};

return ExecutePlanAndCollectAsTable(std::move(aggregate));
}
// (Doc section: Group Aggregate Example)

// (Doc section: ConsumingSink Example)

/// \brief An example showing a consuming sink node
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
turbo::Status SourceConsumingSinkExample() {
TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

ac::Declaration source{"source", std::move(source_node_options)};

std::atomic<uint32_t> batches_seen{0};
alkaid::Future<> finish = alkaid::Future<>::Make();
struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, alkaid::Future<> finish)
: batches_seen(batches_seen), finish(std::move(finish)) {}

turbo::Status Init(const std::shared_ptr<alkaid::Schema>& schema,
ac::BackpressureControl* backpressure_control,
ac::ExecPlan* plan) override {
// This will be called as the plan is started (before the first call to Consume)
// and provides the schema of the data coming into the node, controls for pausing /
// resuming input, and a pointer to the plan itself which can be used to access
// other utilities such as the thread indexer or async task scheduler.
return turbo::Status::OK();
}

turbo::Status Consume(cp::ExecBatch batch) override {
(*batches_seen)++;
return turbo::Status::OK();
}

alkaid::Future<> Finish() override {
// Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
// output file handles and flushing remaining work
return alkaid::Future<>::MakeFinished();
}

std::atomic<uint32_t>* batches_seen;
alkaid::Future<> finish;
};
std::shared_ptr<CustomSinkNodeConsumer> consumer =
std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

ac::Declaration consuming_sink{"consuming_sink",
{std::move(source)},
ac::ConsumingSinkNodeOptions(std::move(consumer))};

// Since we are consuming the data within the plan there is no output and we simply
// run the plan to completion instead of collecting into a table.
TURBO_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));

std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
<< std::endl;
return turbo::Status::OK();
}
// (Doc section: ConsumingSink Example)

// (Doc section: OrderBySink Example)

turbo::Status ExecutePlanAndCollectAsTableWithCustomSink(
std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<alkaid::Schema> schema,
alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
// translate sink_gen (async) to sink_reader (sync)
std::shared_ptr<alkaid::RecordBatchReader> sink_reader =
ac::MakeGeneratorReader(schema, std::move(sink_gen), alkaid::default_memory_pool());

// validate the ExecPlan
TURBO_RETURN_NOT_OK(plan->Validate());
std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
// start the ExecPlan
plan->StartProducing();

// collect sink_reader into a Table
std::shared_ptr<alkaid::Table> response_table;

TURBO_MOVE_OR_RAISE(response_table,
alkaid::Table::FromRecordBatchReader(sink_reader.get()));

std::cout << "Results : " << response_table->ToString() << std::endl;

// stop producing
plan->StopProducing();
// plan mark finished
auto future = plan->finished();
return future.status();
}

/// \brief An example showing an order-by node
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
turbo::Status SourceOrderBySinkExample() {
TURBO_MOVE_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
ac::ExecPlan::Make(*cp::threaded_exec_context()));

TURBO_MOVE_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
TURBO_MOVE_OR_RAISE(ac::ExecNode * source,
ac::MakeExecNode("source", plan.get(), {}, source_node_options));

TURBO_RETURN_NOT_OK(ac::MakeExecNode(
"order_by_sink", plan.get(), {source},
ac::OrderBySinkNodeOptions{
cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
}

// (Doc section: OrderBySink Example)

// (Doc section: HashJoin Example)

/// \brief An example showing a hash join node
///
/// Source-HashJoin-Table
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is collected into a table.
turbo::Status SourceHashJoinSinkExample() {
TURBO_MOVE_OR_RAISE(auto input, MakeGroupableBatches());

ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};

ac::HashJoinNodeOptions join_opts{
ac::JoinType::INNER,
/*left_keys=*/{"str"},
/*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

ac::Declaration hashjoin{
"hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

return ExecutePlanAndCollectAsTable(std::move(hashjoin));
}

// (Doc section: HashJoin Example)

// (Doc section: KSelect Example)

/// \brief An example showing a select-k node
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
turbo::Status SourceKSelectExample() {
TURBO_MOVE_OR_RAISE(auto input, MakeGroupableBatches());
TURBO_MOVE_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
ac::ExecPlan::Make(*cp::threaded_exec_context()));
alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

TURBO_MOVE_OR_RAISE(
ac::ExecNode * source,
ac::MakeExecNode("source", plan.get(), {},
ac::SourceNodeOptions{input.schema, input.gen()}));

cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

TURBO_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
ac::SelectKSinkNodeOptions{options, &sink_gen}));

auto schema = alkaid::schema(
{alkaid::field("i32", alkaid::int32()), alkaid::field("str", alkaid::utf8())});

return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
}

// (Doc section: KSelect Example)

// (Doc section: Write Example)

/// \brief An example showing a write node
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how scan node can be used to load the data
/// and after processing how it can be written to disk.
turbo::Status ScanFilterWriteExample(const std::string& file_path) {
TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::dataset::Dataset> dataset, GetDataset());

auto options = std::make_shared<alkaid::dataset::ScanOptions>();
// empty projection
options->projection = cp::project({}, {});

auto scan_node_options = alkaid::dataset::ScanNodeOptions{dataset, options};

ac::Declaration scan{"scan", std::move(scan_node_options)};

alkaid::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

std::string root_path = "";
std::string uri = "file://" + file_path;
TURBO_MOVE_OR_RAISE(std::shared_ptr<alkaid::fs::FileSystem> filesystem,
alkaid::fs::FileSystemFromUri(uri, &root_path));

auto base_path = root_path + "/parquet_dataset";
// Uncomment the following line, if run repeatedly
// TURBO_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
TURBO_RETURN_NOT_OK(filesystem->CreateDir(base_path));

// The partition schema determines which fields are part of the partitioning.
auto partition_schema = alkaid::schema({alkaid::field("a", alkaid::int32())});
// We'll use Hive-style partitioning,
// which creates directories with "key=value" pairs.

auto partitioning =
std::make_shared<alkaid::dataset::HivePartitioning>(partition_schema);
// We'll write Parquet files.
auto format = std::make_shared<alkaid::dataset::ParquetFileFormat>();

alkaid::dataset::FileSystemDatasetWriteOptions write_options;
write_options.file_write_options = format->DefaultWriteOptions();
write_options.filesystem = filesystem;
write_options.base_dir = base_path;
write_options.partitioning = partitioning;
write_options.basename_template = "part{i}.parquet";

alkaid::dataset::WriteNodeOptions write_node_options{write_options};

ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};

// Since the write node has no output we simply run the plan to completion and the
// data should be written
TURBO_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));

std::cout << "Dataset written to " << base_path << std::endl;
return turbo::Status::OK();
}

// (Doc section: Write Example)

// (Doc section: Union Example)

/// \brief An example showing a union node
///
/// Source-Union-Table
/// This example shows how a union operation can be applied on two
/// data sources. The output is collected into a table.
turbo::Status SourceUnionSinkExample() {
TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

ac::Declaration lhs{"source",
ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
lhs.label = "lhs";
ac::Declaration rhs{"source",
ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
rhs.label = "rhs";
ac::Declaration union_plan{
"union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};

return ExecutePlanAndCollectAsTable(std::move(union_plan));
}

// (Doc section: Union Example)

// (Doc section: Table Sink Example)

/// \brief An example showing a table sink node
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
turbo::Status TableSinkExample() {
TURBO_MOVE_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
ac::ExecPlan::Make(*cp::threaded_exec_context()));

TURBO_MOVE_OR_RAISE(auto basic_data, MakeBasicBatches());

auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

TURBO_MOVE_OR_RAISE(ac::ExecNode * source,
ac::MakeExecNode("source", plan.get(), {}, source_node_options));

std::shared_ptr<alkaid::Table> output_table;
auto table_sink_options = ac::TableSinkNodeOptions{&output_table};

TURBO_RETURN_NOT_OK(
ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
// validate the ExecPlan
TURBO_RETURN_NOT_OK(plan->Validate());
std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
// start the ExecPlan
plan->StartProducing();

// Wait for the plan to finish
auto finished = plan->finished();
TURBO_RETURN_NOT_OK(finished.status());
std::cout << "Results : " << output_table->ToString() << std::endl;
return turbo::Status::OK();
}

// (Doc section: Table Sink Example)

// (Doc section: RecordBatchReaderSource Example)

/// \brief An example showing the usage of a RecordBatchReader as the data source.
///
/// RecordBatchReaderSourceSink Example
/// This example shows how a record_batch_reader_source can be used
/// in an execution plan. This includes the source node
/// receiving data from a TableRecordBatchReader.

turbo::Status RecordBatchReaderSourceSinkExample() {
TURBO_MOVE_OR_RAISE(auto table, GetTable());
std::shared_ptr<alkaid::RecordBatchReader> reader =
std::make_shared<alkaid::TableBatchReader>(table);
ac::Declaration reader_source{"record_batch_reader_source",
ac::RecordBatchReaderSourceNodeOptions{reader}};
return ExecutePlanAndCollectAsTable(std::move(reader_source));
}

// (Doc section: RecordBatchReaderSource Example)

enum ExampleMode {
SOURCE_SINK = 0,
TABLE_SOURCE_SINK = 1,
SCAN = 2,
FILTER = 3,
PROJECT = 4,
SCALAR_AGGREGATION = 5,
GROUP_AGGREGATION = 6,
CONSUMING_SINK = 7,
ORDER_BY_SINK = 8,
HASHJOIN = 9,
KSELECT = 10,
WRITE = 11,
UNION = 12,
TABLE_SOURCE_TABLE_SINK = 13,
RECORD_BATCH_READER_SOURCE = 14,
PROJECT_SEQUENCE = 15
};

int main(int argc, char** argv) {
if (argc < 3) {
// Fake success for CI purposes.
return EXIT_SUCCESS;
}

std::string base_save_path = argv[1];
int mode = std::atoi(argv[2]);
turbo::Status status;
// ensure alkaid::dataset node factories are in the registry
alkaid::dataset::internal::Initialize();
switch (mode) {
case SOURCE_SINK:
PrintBlock("Source Sink Example");
status = SourceSinkExample();
break;
case TABLE_SOURCE_SINK:
PrintBlock("Table Source Sink Example");
status = TableSourceSinkExample();
break;
case SCAN:
PrintBlock("Scan Example");
status = ScanSinkExample();
break;
case FILTER:
PrintBlock("Filter Example");
status = ScanFilterSinkExample();
break;
case PROJECT:
PrintBlock("Project Example");
status = ScanProjectSinkExample();
break;
case PROJECT_SEQUENCE:
PrintBlock("Project Example (using Declaration::Sequence)");
status = ScanProjectSequenceSinkExample();
break;
case GROUP_AGGREGATION:
PrintBlock("Aggregate Example");
status = SourceGroupAggregateSinkExample();
break;
case SCALAR_AGGREGATION:
PrintBlock("Aggregate Example");
status = SourceScalarAggregateSinkExample();
break;
case CONSUMING_SINK:
PrintBlock("Consuming-Sink Example");
status = SourceConsumingSinkExample();
break;
case ORDER_BY_SINK:
PrintBlock("OrderBy Example");
status = SourceOrderBySinkExample();
break;
case HASHJOIN:
PrintBlock("HashJoin Example");
status = SourceHashJoinSinkExample();
break;
case KSELECT:
PrintBlock("KSelect Example");
status = SourceKSelectExample();
break;
case WRITE:
PrintBlock("Write Example");
status = ScanFilterWriteExample(base_save_path);
break;
case UNION:
PrintBlock("Union Example");
status = SourceUnionSinkExample();
break;
case TABLE_SOURCE_TABLE_SINK:
PrintBlock("TableSink Example");
status = TableSinkExample();
break;
case RECORD_BATCH_READER_SOURCE:
PrintBlock("RecordBatchReaderSource Example");
status = RecordBatchReaderSourceSinkExample();
break;
default:
break;
}

if (status.ok()) {
return EXIT_SUCCESS;
} else {
std::cout << "Error occurred: " << status.message() << std::endl;
return EXIT_FAILURE;
}
}