并行归并
Taskflow 提供了模板函数,可以构建一个任务来对一系列项目执行并行缩减。
头文件
#include <kthread/algorithm/reduce.h>
创建并行归并任务
kthread::reduce(B first, E last, T& result, O bop, P part) 创建的约简任务使用二元运算符 bop 对
[first, last) 指定的元素范围执行并行约简,并将约简结果存储在 result 中。它表示以下约简循环的并行执行:
for(auto itr=first; itr<last; itr++) {
result = bop(result, *itr);
}
在运行时,归约任务会生成一个子流程来执行并行归约。归约结果存储在结果中,该结果将在归约任务中通过引用捕获。您有责任确保结果在并行执行期间保持活动状态。
int sum = 100;
std::vector<int> vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
kthread::Task task = taskflow.reduce(vec.begin(), vec.end(), sum,
[] (int l, int r) { return l + r; } // binary reducer operator
);
executor.run(taskflow).wait();
assert(sum == 100 + 55);
二元运算符应用于元素对的顺序是未指定的。换句话说,范围的元素可以按任意顺序分组和重新排列。二元运算符的结果和参数类型必须与输入数据类型一致。
通过引用捕获迭代器
您可以使用 std::ref 通过引用传递迭代器,以在依赖任务之间编组参数更新。当创建并行缩减任务时范围未知但需要从另一个任务进行初始化时,这尤其有用。
int sum = 100;
std::vector<int> vec;
std::vector<int>::iterator first, last;
kthread::Task init = taskflow.emplace([&](){
vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
first = vec.begin();
last = vec.end();
});
kthread::Task task = taskflow.reduce(std::ref(first), std::ref(last), sum,
[] (int l, int r) { return l + r; } // binary reducer operator
);
// wrong! must use std::ref, or first and last are captured by copy
// kthread::Task task = taskflow.reduce(first, last, sum, [] (int l, int r) {
// return l + r; // binary reducer operator
// });
init.precede(task);
executor.run(taskflow).wait();
assert(sum == 100 + 55);
在上面的例子中,当 init 完成时,vec 已初始化为 10 个元素,其中 first 和 last 指向 vec 的数据范围。然后,
reduction 任务将通过引用传递迭代器来处理这个初始化范围。
创建并行变换归约任务
常见的做法是将每个元素转换为新的数据类型,然后对转换后的元素执行缩减。Taskflow 提供了一种方法
kthread::transform_reduce(B first, E last, T& result, BOP bop, UOP uop, P part),该方法
应用 uop 来转换指定范围内的每个元素,然后对结果和转换后的元素执行并行缩减。它表示以下缩减循环的并行执行:
for(auto itr=first; itr<last; itr++) {
result = bop(result, uop(*itr));
}
下面的示例将字符串中的每个数字转换为整数,然后并行求所有整数之和。
std::string str = "12345678";
int sum {0};
kthread::Task task = taskflow.transform_reduce(str.begin(), str.end(), sum,
[] (int a, int b) { // binary reduction operator
return a + b;
},
[] (char c) -> int { // unary transformation operator
return c - '0';
}
);
executor.run(taskflow).wait();
assert(sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8); // sum will be 36
我们对转换后的元素应用二元运算符的顺序是未指定的。由于转换后的临时变量,二元运算符可能会在两个参数中都采用
右值,例如 bop(uop(*itr1), uop(*itr2))。当数据传递成本高昂时,您可以将结果类型 T 定义为可移动构造。
创建按索引归约任务
与 kthread::reduce 不同,kthread::reduce_by_index 函数可让您在索引范围内执行并行缩减,
但可以更好地控制范围的每个部 分的处理方式。当您需要为每个子范围自定义缩减过程或想要结合 SIMD 等优化时,这很有用。以下示
例使用 res 对数据中的所有元素执行求和缩减:
std::vector<double> data(100000);
double res = 1.0;
taskflow.reduce_by_index(
// index range
kthread::IndexRange<size_t>(0, N, 1),
// final result
res,
// local reducer
[&](kthread::IndexRange<size_t> subrange, std::optional<double> running_total) {
double residual = running_total ? *running_total : 0.0;
for(size_t i=subrange.begin(); i<subrange.end(); i+=subrange.step_size()) {
data[i] = 1.0; // we initialize the data here
residual += data[i];
}
printf("partial sum = %lf\n", residual);
return residual;
},
// global reducer
std::plus<double>()
);
executor.run(taskflow).wait();
assert(res == 100001);
本地缩减器 lop 为每个子范围计算部分和,全局缩减器 gop 将部分结果组合成最终结果并将其存储在 res 中,其初始
值(即此处的 1.0)也参与缩减过程。本地缩减器的第二个参数是 std::optional 类型,它表示到此子范围的当前部分和。显
然,第一个子范围没有任何部分和,因为没有来自先前子范围的运行总数(即 running_total 是 std::nullopt)。
配置分区器
您可以为并行缩减任务配置分区器,使其使用不同的调度方法运行,例如引导式分区、动态分区和静态分区 。以下示例使用两个不同的 分区器创建两个并行缩减任务,一个使用静态分区算法,另一个使用引导式分区算法:
kthread::StaticPartitioner static_partitioner;
kthread::GuidedPartitioner guided_partitioner;
int sum1 = 100, sum2 = 100;
std::vector<int> vec = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
// create a parallel-reduction task with static partitioner
taskflow.reduce(vec.begin(), vec.end(), sum1,
[] (int l, int r) { return l + r; },
static_partitioner
);
// create a parallel-reduction task with guided partitioner
taskflow.reduce(vec.begin(), vec.end(), sum2,
[] (int l, int r) { return l + r; },
guided_partitioner
);
注意 默认情况下,如果未指定分区器,则并行缩减任务将使用 kthread::DefaultPartitioner。
更多信息请参见算法索引