并行迭代
Taskflow 提供模板函数来构建任务以对一系列项目执行并行迭代。
头文件
#include <kthread/algorithm/for_each.h>
创建基于索引的并行迭代任务
基于索引的并行 for 使用给定的步长在范围 [first, last) 上执行并行迭代。由 kthread::for_each_index(B first, E last, S step, C callable, P part) 创建的任务表示并行执行以下循环:
// positive step
for(auto i=first; i<last; i+=step) {
callable(i);
}
// negative step
for(auto i=first; i>last; i+=step) {
callable(i);
}
我们仅支持基于整数的范围。范围可以正向或负向。
taskflow.for_each_index(0, 100, 2, [](int i) { }); // 50 loops with a + step
taskflow.for_each_index(100, 0, -2, [](int i) { }); // 50 loops with a - step
请注意,正方向或负方向是根据范围 [first, last) 定义的,其中不包括 end。在正情况下,50 个项目为 0、2、4、6、8、...、96、98。在负 情况下,50 个项目为 100、98、96、04、... 4、2。下面是 12 名工人以下正情况下的任务流图示例:
重载 kthread::for_each_index(R range, C callable, P part) 无需明确指定每个索引调用的索引范围和可调用函数,而是为您提供了
一种更灵活的方式来迭代索引的子范围。此重载使用 kthread::IndexRange 将范围划分为子范围,从而可以更精细地控制每个子范围的处理方式。
例如,下面的代码使用两种不同的方法执行相同的操作:
std::vector<int> data1(100), data2(100);
// Approach 1: initialize data1 using explicit index range
taskflow.for_each_index(0, 100, 1, [&](int i){ data1[i] = 10; });
// Approach 2: initialize data2 using kthread::IndexRange
kthread::IndexRange<int> range(0, 100, 1);
taskflow.for_each_index(range, [&](kthread::IndexRange<int> subrange){
for(int i=subrange.begin(); i<subrange.end(); i+=subrange.step_size()) {
data2[i] = 10;
}
});
两种方法都会产生相同的结果,但第二种方法在每个分区子范围的迭代方式方面提供了更大的灵活性。这对于受益于 SIMD 优 化或其他基于范围的处理策略的应用程序特别有用。
通过引用捕获索引
您可以使用 std::ref 通过引用传递索引,以在依赖任务之间编组参数更新。当创建 for-each-index 任务时范围索引未知,但从另一个任务初始化时,这尤其有用 。
int* vec;
int first, last;
auto init = taskflow.emplace([&](){
first = 0;
last = 1000;
vec = new int[1000];
});
auto pf = taskflow.for_each_index(std::ref(first), std::ref(last), 1,
[&] (int i) {
std::cout << "parallel iteration on index " << vec[i] << '\n';
}
);
// wrong! must use std::ref, or first and last are captured by copy
// auto pf = taskflow.for_each_index(first, last, 1, [&](int i) {
// std::cout << "parallel iteration on index " << vec[i] << '\n';
// });
init.precede(pf);
当 init 完成时,parallel-for 任务 pf 将看到 first 为 0,last 为 1000,并对 1000 个项目执行并行迭代。
创建基于迭代器的并行迭代任务
基于迭代器的 parallel-for 在由两个 STL 样式迭代器 first 和 last 指定的范围内执行并行迭代。由 kthread::for_each(B first, E last, C callable, P part)
创建的任务表示以下循环的并行执行:
for(auto i=first; i<last; i++) {
callable(*i);
}
kthread::for_each(B first, E last, C callable, P&& part) 同时将可调用函数应用于通过取消引用范围 [first, last) 中的每个迭代器
获得的对象。用户有责任确保范围在并行 for 任务执行期间有效。迭代器必须定义后增量运算符 ++。
std::vector<int> vec = {1, 2, 3, 4, 5};
taskflow.for_each(vec.begin(), vec.end(), [](int i){
std::cout << "parallel for on item " << i << '\n';
});
std::list<std::string> list = {"hi", "from", "t", "a", "s", "k", "f", "low"};
taskflow.for_each(list.begin(), list.end(), [](const std::string& str){
std::cout << "parallel for on item " << str << '\n';
});
通过引用捕获迭代器
与 kthread::for_each_index 类似,kthread::for_each 的迭代器经过模板化,允许通过引用捕获范围参数,这样一项任务可以在另
一个任务执行并行 for 算法之前设置范围。例如:
std::vector<int> vec;
std::vector<int>::iterator first, last;;
kthread::Task init = taskflow.emplace([&](){
vec.resize(1000);
first = vec.begin();
last = vec.end();
});
kthread::Task pf = taskflow.for_each(std::ref(first), std::ref(last), [&](int i) {
std::cout << "parallel iteration on item " << i << '\n';
});
// wrong! must use std::ref, or first and last are captured by copy
// kthread::Task pf = taskflow.for_each(first, last, [&](int i) {
// std::cout << "parallel iteration on item " << i << '\n';
// });
init.precede(pf);
当 init 完成时,parallel-for 任务 pf 将看到 first 指向 vec 的开头,last 指向 vec 的结尾,并对 1000 个项目
执行并行迭代。这两个任务形成一个端到端任务图,其中 parallel-for 的参数是动态计算的。
配置分区器
您可以为并行迭代任务配置分区器,使其使用不同的调度方法运行,例如引导式分区、动态分区和静态分区。以下示例使用两个不同的分区器创建两个并 行迭代任务,一个使用静态分区算法,另一个使用引导式分区算法:
std::vector<int> vec(1024, 0);
// create two partitioners with a chunk size of 10
kthread::StaticPartitioner static_partitioner(10);
kthread::GuidedPartitioner guided_partitioner(10);
// create a parallel-iteration task with static partitioner
taskflow.for_each(
vec.begin(), vec.end(), [&](int i) {
std::cout << "parallel iteration on item " << i << '\n';
},
static_partitioner
);
// create a parallel-iteration task with guided partitioner
taskflow.for_each(
vec.begin(), vec.end(), [&](int i) {
std::cout << "parallel iteration on item " << i << '\n';
},
guided_partitioner
);
注意
默认情况下,如果未指定分区器,则并行迭代任务将使用 kthread::DefaultPartitioner。
更多信息请参见算法索引