Skip to main content
Version: 1.1.1

并行归并

Taskflow 提供了模板函数,可以构建一个任务来对一系列项目执行并行缩减。

头文件

#include <kthread/algorithm/reduce.h>

创建并行归并任务

kthread::reduce(B first, E last, T& result, O bop, P part) 创建的约简任务使用二元运算符 bop[first, last) 指定的元素范围执行并行约简,并将约简结果存储在 result 中。它表示以下约简循环的并行执行:

for(auto itr=first; itr<last; itr++) {
result = bop(result, *itr);
}

在运行时,归约任务会生成一个子流程来执行并行归约。归约结果存储在结果中,该结果将在归约任务中通过引用捕获。您有责任确保结果在并行执行期间保持活动状态。

int sum = 100;
std::vector<int> vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

kthread::Task task = taskflow.reduce(vec.begin(), vec.end(), sum,
[] (int l, int r) { return l + r; } // binary reducer operator
);
executor.run(taskflow).wait();

assert(sum == 100 + 55);

二元运算符应用于元素对的顺序是未指定的。换句话说,范围的元素可以按任意顺序分组和重新排列。二元运算符的结果和参数类型必须与输入数据类型一致。

通过引用捕获迭代器

您可以使用 std::ref 通过引用传递迭代器,以在依赖任务之间编组参数更新。当创建并行缩减任务时范围未知但需要从另一个任务进行初始化时,这尤其有用。

int sum = 100;
std::vector<int> vec;
std::vector<int>::iterator first, last;

kthread::Task init = taskflow.emplace([&](){
vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
first = vec.begin();
last = vec.end();
});

kthread::Task task = taskflow.reduce(std::ref(first), std::ref(last), sum,
[] (int l, int r) { return l + r; } // binary reducer operator
);

// wrong! must use std::ref, or first and last are captured by copy
// kthread::Task task = taskflow.reduce(first, last, sum, [] (int l, int r) {
// return l + r; // binary reducer operator
// });

init.precede(task);

executor.run(taskflow).wait();

assert(sum == 100 + 55);

在上面的例子中,当 init 完成时,vec 已初始化为 10 个元素,其中 firstlast 指向 vec 的数据范围。然后, reduction 任务将通过引用传递迭代器来处理这个初始化范围。

创建并行变换归约任务

常见的做法是将每个元素转换为新的数据类型,然后对转换后的元素执行缩减。Taskflow 提供了一种方法 kthread::transform_reduce(B first, E last, T& result, BOP bop, UOP uop, P part),该方法 应用 uop 来转换指定范围内的每个元素,然后对结果和转换后的元素执行并行缩减。它表示以下缩减循环的并行执行:

for(auto itr=first; itr<last; itr++) {
result = bop(result, uop(*itr));
}

下面的示例将字符串中的每个数字转换为整数,然后并行求所有整数之和。

std::string str = "12345678";
int sum {0};
kthread::Task task = taskflow.transform_reduce(str.begin(), str.end(), sum,
[] (int a, int b) { // binary reduction operator
return a + b;
},
[] (char c) -> int { // unary transformation operator
return c - '0';
}
);
executor.run(taskflow).wait();
assert(sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8); // sum will be 36

我们对转换后的元素应用二元运算符的顺序是未指定的。由于转换后的临时变量,二元运算符可能会在两个参数中都采用 右值,例如 bop(uop(*itr1), uop(*itr2))。当数据传递成本高昂时,您可以将结果类型 T 定义为可移动构造。

创建按索引归约任务

kthread::reduce 不同,kthread::reduce_by_index 函数可让您在索引范围内执行并行缩减, 但可以更好地控制范围的每个部分的处理方式。当您需要为每个子范围自定义缩减过程或想要结合 SIMD 等优化时,这很有用。以下示 例使用 res 对数据中的所有元素执行求和缩减:

std::vector<double> data(100000);
double res = 1.0;
taskflow.reduce_by_index(
// index range
kthread::IndexRange<size_t>(0, N, 1),
// final result
res,
// local reducer
[&](kthread::IndexRange<size_t> subrange, std::optional<double> running_total) {
double residual = running_total ? *running_total : 0.0;
for(size_t i=subrange.begin(); i<subrange.end(); i+=subrange.step_size()) {
data[i] = 1.0; // we initialize the data here
residual += data[i];
}
printf("partial sum = %lf\n", residual);
return residual;
},
// global reducer
std::plus<double>()
);

executor.run(taskflow).wait();
assert(res == 100001);

本地缩减器 lop 为每个子范围计算部分和,全局缩减器 gop 将部分结果组合成最终结果并将其存储在 res 中,其初始 值(即此处的 1.0)也参与缩减过程。本地缩减器的第二个参数是 std::optional 类型,它表示到此子范围的当前部分和。显 然,第一个子范围没有任何部分和,因为没有来自先前子范围的运行总数(即 running_totalstd::nullopt)。

配置分区器

您可以为并行缩减任务配置分区器,使其使用不同的调度方法运行,例如引导式分区、动态分区和静态分区。以下示例使用两个不同的 分区器创建两个并行缩减任务,一个使用静态分区算法,另一个使用引导式分区算法:

kthread::StaticPartitioner static_partitioner;
kthread::GuidedPartitioner guided_partitioner;

int sum1 = 100, sum2 = 100;
std::vector<int> vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

// create a parallel-reduction task with static partitioner
taskflow.reduce(vec.begin(), vec.end(), sum1,
[] (int l, int r) { return l + r; },
static_partitioner
);

// create a parallel-reduction task with guided partitioner
taskflow.reduce(vec.begin(), vec.end(), sum2,
[] (int l, int r) { return l + r; },
guided_partitioner
);
warning

注意 默认情况下,如果未指定分区器,则并行缩减任务将使用 kthread::DefaultPartitioner。

info

更多信息请参见算法索引