任务并行可扩展流水线
#include <kthread/algorithm/pipeline.h>
创建可扩展管道模块任务
与 kthread::Pipeline 类似,kthread::ScalablePipeline 是一个可组合的图形对象,用于在任务流中实现管道调度框架。
kthread::Pipeline 和 kthread::ScalablePipeline 之间的主要区别在于,可扩展管道可以接受管道的变量分配,而不是在
构建或编程时实例化所有管道。用户定义一个线性范围的管道,每个管道都属于相同的可调用类型,并应用该范围来构建可扩展管道。在连续
运行之间,用户可以将管道重置为不同的管道范围。以下代码创建一个可扩展管道,该管道使用四条并行线路通过给定向量中的三个串行管道调
度Token,然后将该管道重置为五个串行管道的新范围:
kthread::Taskflow taskflow("pipeline");
kthread::Executor executor;
const size_t num_lines = 4;
// create data storage
std::array<int, num_lines> buffer;
// define the pipe callable
auto pipe_callable = [&buffer] (kthread::Pipeflow& pf) mutable {
switch(pf.pipe()) {
// first stage generates only 5 scheduling tokens and saves the
// token number into the buffer.
case 0: {
if(pf.token() == 5) {
pf.stop();
}
else {
printf("stage 1: input token = %zu\n", pf.token());
buffer[pf.line()] = pf.token();
}
return;
}
break;
// other stages propagate the previous result to this pipe and
// increment it by one
default: {
printf(
"stage %zu: input buffer[%zu] = %d\n", pf.pipe(), pf.line(), buffer[pf.line()]
);
buffer[pf.line()] = buffer[pf.line()] + 1;
}
break;
}
};
// create a vector of three pipes
std::vector< kthread::Pipe<std::function<void(kthread::Pipeflow&)>> > pipes;
for(size_t i=0; i<3; i++) {
pipes.emplace_back(kthread::PipeType::SERIAL, pipe_callable);
}
// create a pipeline of four parallel lines based on the given vector of pipes
kthread::ScalablePipeline pl(num_lines, pipes.begin(), pipes.end());
// build the pipeline graph using composition
kthread::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
.name("starting pipeline");
kthread::Task task = taskflow.composed_of(pl)
.name("pipeline");
kthread::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
.name("pipeline stopped");
// create task dependency
init.precede(task);
task.precede(stop);
// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);
// run the pipeline
executor.run(taskflow).wait();
// reset the pipeline to a new range of five pipes and starts from
// the initial state (i.e., token counts from zero)
for(size_t i=0; i<2; i++) {
pipes.emplace_back(kthread::PipeType::SERIAL, pipe_callable);
}
pl.reset(pipes.begin(), pipes.end());
executor.run(taskflow).wait();
该程序定义了统一的管道类型 kthread::Pipe<std::function<void(kthread::Pipeflow&)>>,并将所有管道保存在一个可更改的
向量中。然后,它使用两个范围迭代器([first, last))构建一个可扩展的管道,这两个迭代器指向管道向量的开头和结尾,从而
形成一个由三个串行阶段组成的管道:
然后,程序将另外两个管道附加到向量中,并将管道重置为两个附加管道的新范围,从而形成五个串行阶段的管道:
当将可扩展管道重置到新的范围时,它将从初始状态开始,就像刚刚构建一样,即Token数从零开始计数。
注意
与将给定管道保存在 std::tuple 对象中的 kthread::Pipeline 不同,kthread::ScalablePipeline 并不拥有给定的管道,
而是维护给定范围内每个管道的迭代器向量。在执行管道任务期间,您有责任保持这些管道对象处于活动状态。
重置占位符可扩展管道
可以使用构造函数 kthread::ScalablePipeline(size_t num_lines) 创建可 扩展管道作为占位符,并在应用程序的
稍后部分将其重置为另一个范围。以下代码创建一个任务来放置一系列管道,并在运行管道任务之前将管道重置为该范围:
kthread::Executor executor;
kthread::Taskflow taskflow;
size_t num_pipes = 10;
size_t num_lines = 10;
std::vector<kthread::Pipe<std::function<void(kthread::Pipeflow&)>>> pipes;
kthread::ScalablePipeline<typename decltype(pipes)::iterator> spl(num_lines);
kthread::Task init = taskflow.emplace([&](){
for(size_t i=0; i<num_pipes; i++) {
pipes.emplace_back(kthread::PipeType::SERIAL, [&](kthread::Pipeflow& pf) {
if(pf.pipe() == 0 && pf.token() == 1024) {
pf.stop();
return;
}
});
}
spl.reset(pipes.begin(), pipes.end());
}).name("init");
kthread::Task pipeline = taskflow.composed_of(spl).name("pipeline");
pipeline.succeed(init);
executor.run(taskflow).wait();
注意 在运行可扩展管道之前,您有责任确保其结构有效。有效管道必须至少有一条并行线和一个管道,其中第一个管道是串行类型。
std::vector<kthread::Pipe<std::function<void(kthread::Pipeflow&)>>> pipes;
kthread::ScalablePipeline<typename decltype(pipes)::iterator> spl;
// create pipes ...
spl.reset(num_lines, pipes.begin(), pipes.end());
类似地,您可以使用默认构造函数 kthread::ScalablePipeline() 创建一个空的可扩展管道,并在稍后的程序中重置它。
使用其他迭代器类型
当将范围分配给可扩展管道时,管道会将该范围内的所有管道迭代器提取到内部向量中。这种组织允许将管道可调用函数调用为可随机访问的
操作,而不管管道容器类型如何。只要这些管道可以使用后缀增量运算符 ++ 按顺序进行迭代,Taskflow 对迭代器类型没有太多限制。
// use vector to store pipes
std::vector<kthread::Pipe<std::function<void(kthread::Pipeflow&)>>> vector;
kthread::ScalablePipeline spl1(num_lines, vector.begin(), vector.end());
// use list to store pipes
std::list<kthread::Pipe<std::function<void(kthread::Pipeflow&)>>> list;
kthread::ScalablePipeline spl2(num_lines, list.begin(), list.end());
更多信息请参见算法索引