Skip to main content
Version: 1.1.1

任务并行流水线

Taskflow 为您提供了任务并行管道编程框架,以便您实现管道算法。管道并行是指通过线性管道或阶段链并行执行多个 数据Token。每个阶段处理从上一个阶段发送的数据Token,将给定的可调用函数应用于该数据Token,然后将结果发送到下一个 阶段。可以跨不同阶段同时处理多个数据Token

头文件

#include <kthread/algorithm/pipeline.h>

理解管道调度框架

kthread::Pipeline 对象是一个可组合图,用于通过任务流中的模块任务创建流水线调度框架(请参阅可组合任务)。与传统的流水线编程 框架(例如 Intel TBB 并行流水线)不同,Taskflow 的流水线算法不提供任何数据抽象,这通常会限制用户优化其应用程序中的数据布 局,但它提供了一个灵活的框架,让用户能够在高效的流水线调度框架之上自定义其应用程序数据。

上图给出了我们的流水线调度框架的示例。该框架由三个管道(串行-并行-串行阶段)和四条线路(最大并行性)组成,其中每条线路最多处理一个数 据Token。三管道四条线路的流水线将通过三个管道的顺序链传播每个数据Token,并且可以同时在四条线路上处理最多四个数据Token。每条边代表一个任务依 赖关系。例如,从线路 0 中的管道 0 到管道 1 的边表示第一条线路中第一和第二个管道之间的任务依赖关系;从线路 0 中的管道 0 到线路 1 中的 管道 0 的边表示在同一管道上处理两个数据Token时两个相邻线路之间的任务依赖关系。每个管道可以是串行类型或并行类型,其中串行管道按顺序处理 数据Token,而并行管道同时处理不同的数据Token

info

注意 由于管道的性质,Taskflow 要求第一个管道为串行类型。管道调度算法以循环方式运行,其系数为行数。

创建任务并行管道模块任务

Taskflow 利用现代 C++ 和模板技术在设计管道编程模型时在表现力和通用性之间取得平衡。一般来说,创建任务并行管道应用程序有三个步骤:

  • 定义管道结构(例如,管道类型、管道可调用、停止规则、行数)
  • 定义数据存储和布局(如果应用程序需要)
  • 使用组合定义管道任务流图

以下代码使用上一节中的示例创建管道调度框架。该框架总共调度了五个调度Token,标记为 04。第一个管道将Token标识符存储在自定义数据存储缓冲区中,其 余每个管道将前一个管道的结果中的输入数据加一,并将结果存储到缓冲区中的相应行条目中。

 1: kthread::Taskflow taskflow;
2: kthread::Executor executor;
3:
4: // maximum parallelism - each line processes one token at a time
5: const size_t num_lines = 4;
6:
7: // custom data storage
8: std::array<int, num_lines> buffer;
9:
10: // the pipeline consists of three pipes (serial-parallel-serial)
11: // and up to four concurrent scheduling tokens
12: kthread::Pipeline pl(num_lines,
13: kthread::Pipe{kthread::PipeType::SERIAL, [&buffer](kthread::Pipeflow& pf) {
14: // generate only 5 scheduling tokens
15: if(pf.token() == 5) {
16: pf.stop();
17: }
18: // save the result of this pipe into the buffer
19: else {
20: printf("pipe 0: input token = %zu\n", pf.token());
21: buffer[pf.line()] = pf.token();
22: }
23: }},
24:
25: kthread::Pipe{kthread::PipeType::PARALLEL, [&buffer](kthread::Pipeflow& pf) {
26: printf(
27: "pipe 1: input buffer[%zu] = %d\n",
28: pf.line(), buffer[pf.line()]
29: );
30: // propagate the previous result to this pipe by adding one
31: buffer[pf.line()] = buffer[pf.line()] + 1;
32: }},
33:
34: kthread::Pipe{kthread::PipeType::SERIAL, [&buffer](kthread::Pipeflow& pf) {
35: printf(
36: "pipe 2: input buffer[%zu] = %d\n",
37: pf.line(), buffer[pf.line()]
38: );
39: // propagate the previous result to this pipe by adding one
40: buffer[pf.line()] = buffer[pf.line()] + 1;
41: }}
42: );
43:
44: // build the pipeline graph using composition
45: kthread::Task pipeline = taskflow.composed_of(pl).name("pipeline");
46:
47: // execute the taskflow
48: executor.run(taskflow).wait();

总结:

  • 第 4-5 行定义流水线调度框架的结构

  • 第 8 行将数据存储定义为 num_lines 个整数的一维数组

  • 第 12 行定义流水线中的行数

  • 第 13-23 行定义第一个串行管道,它将在第五个标记处停止流水线调度

  • 第 25-32 行定义第二个并行管道

  • 第 34-41 行定义第三个串行管道

  • 第 45 行使用组合定义流水线任务流图

  • 第 48 行执行任务流

Taskflow 利用 Interact with the RuntimeComposable Tasking 来实现流水线调度框架。此流水线示例的任务 流图如下所示,其中

  1. 一个条件任务用于决定运行哪个运行时任务;
  2. 四个运行时任务分别用于在四条并行线上调度Token

在这个例子中,我们将数据存储 buffer 自定义为一个 4 个整数的一维数组,因为管道结构只定义了 4 条并行的流水线。buffer store 的每个条 目存储的是对应流水线中正在处理的数据。例如 buffer[1] 存储的是流水线 1 中处理好的数据。下图是 buffer 的数据布局。

info

注意 实际应用中,可能需要为缓冲区的数据类型添加填充或使其与缓存行大小对齐,以避免错误共享。如果不同管道的数据类型不同,可以使用 std::variant 将数据类型存储在统一的存储中。

对于每个调度Token,您可以使用 kthread::Pipeflow::line() 获取其线路标识符,并使用 kthread::Pipeflow::pipe() 获取其管道标识符。 例如,如果调度Token位于第四条线路的第三个管道,则 kthread::Pipeflow::line() 将返回 3,而 kthread::Pipeflow::pipe() 将返 回 2(索引从 0 开始)。要停止管道的执行,您需要在第一个管道处调用 kthread::Pipeflow::stop()。一旦触发停止信号,管道将停止调度可调用 函数之后的任何新Token。从这个例子中我们可以看出,kthread::Pipeline 让您完全控制在管道调度框架之上自定义应用程序数据。

info

注意 在第一个管道之外调用 kthread::Pipeflow::stop() 对管道调度没有影响。 在大多数情况下,std::thread::hardware_concurrency 是行数的一个好数字。

我们的流水线算法以循环方式调度Token,因子为 num_lines。也就是说,Token t 将在第 t % num_lines 行进行处理。以下 代码片段显示了此流水线程序可能的输出之一:

pipe 0: input token = 0
pipe 1: input buffer[0] = 0
pipe 2: input buffer[0] = 1
pipe 0: input token = 1
pipe 1: input buffer[1] = 1
pipe 2: input buffer[1] = 2
pipe 0: input token = 2
pipe 1: input buffer[2] = 2
pipe 2: input buffer[2] = 3
pipe 0: input token = 3
pipe 1: input buffer[3] = 3
pipe 2: input buffer[3] = 4
pipe 0: input token = 4
pipe 1: input buffer[0] = 4
pipe 2: input buffer[0] = 5

总共有五个Token穿过三个管道。每个管道都打印其输入数据值,除了第一个管道打印其Token标识符。由于第二个管道是并行管道,因此输出可以交错。

将管道与其他任务连接起来

您可以将管道模块任务与其他任务连接起来,以创建嵌入一个或多个管道算法的任务流应用程序。我们在下面描述了三个常见示例:

示例 1:迭代管道

此示例模拟了一个数据流应用程序,该应用程序使用条件任务通过管道迭代运行数据流。任务流图由一个管道模块任务和一个条件任务组成。 管道模块任务处理数据流。条件任务决定数据的可用性,并在下一个数据流可用时重新运行管道。

 1: kthread::Taskflow taskflow;
2: kthread::Executor executor;
3:
4: const size_t num_lines = 4; // maximum parallelism of the pipeline
5:
6: int i = 0, N = 0;
7: // custom data storage
8: std::array<int, num_lines> buffer;
9:
10: // the pipeline consists of three pipes (serial-parallel-serial)
11: // and up to four concurrent scheduling tokens
12: kthread::Pipeline pl(num_lines,
13: kthread::Pipe{kthread::PipeType::SERIAL, [&i, &buffer](kthread::Pipeflow& pf) {
14: // only 5 scheduling tokens are processed
15: if(i++ == 5) {
16: pf.stop();
17: }
18: // save the result of this pipe into the buffer
19: else {
20: printf("stage 0: input token = %zu\n", pf.token());
21: buffer[pf.line()] = pf.token();
22: }
23: }},
24:
25: kthread::Pipe{kthread::PipeType::PARALLEL, [&buffer](kthread::Pipeflow& pf) {
26: printf(
27: "stage 1: input buffer[%zu] = %d\n",
28: pf.line(), buffer[pf.line()]
29: );
30: // propagate the previous result to this pipe by adding one
31: buffer[pf.line()] = buffer[pf.line()] + 1;
32: }},
33:
34: kthread::Pipe{kthread::PipeType::SERIAL, [&buffer](kthread::Pipeflow& pf) {
35: printf(
36: "stage 2: input buffer[%zu] = %d\n",
37: pf.line(), buffer[pf.line()]
38: );
39: // propagate the previous result to this pipe by adding one
40: buffer[pf.line()] = buffer[pf.line()] + 1;
41: }}
42: );
43:
44: kthread::Task conditional = taskflow.emplace([&N, &i](){
45: i = 0;
46: if (++N < 2) {
47: std::cout << "Rerun the pipeline\n";
48: return 0;
49: }
50: else {
51: return 1;
52: }
53: }).name("conditional");
54:
55: // build the pipeline graph using composition
56: kthread::Task pipeline = taskflow.composed_of(pl)
57: .name("pipeline");
58: kthread::Task initial = taskflow.emplace([](){ std::cout << "initial\n"; })
59: .name("initial");
60: kthread::Task stop = taskflow.emplace([](){ std::cout << "stop\n"; })
61: .name("stop");
62:
63: // specify the graph dependency
64: initial.precede(pipeline);
65: pipeline.precede(conditional);
66: conditional.precede(pipeline, stop);
67:
68: // execute the taskflow
69: executor.run(taskflow).wait();

总结:

  • 4-5 行定义流水线调度框架的结构

  • 8 行将数据存储为一维数组(num_lines 整数)

  • 12 行定义流水线中的行数

  • 13-23 行定义第一个串行管道,当 i5 时将停止流水线调度

  • 25-32 行定义第二个并行管道

  • 34-41 行定义第三个串行管道

  • 44-53 行定义条件任务,当 N 小于 2 时返回 0,否则返回 1

  • 45 行重置变量 i

  • 56-57 行使用组合定义流水线图

  • 58-61 行定义两个静态任务

  • 64-66 行定义任务依赖关系

  • 69 行执行任务流

此流水线示例的任务流图如下所示:

以下代码片段显示了一种可能的输出:

initial
stage 0: input token = 0
stage 1: input buffer[0] = 0
stage 2: input buffer[0] = 1
stage 0: input token = 1
stage 1: input buffer[1] = 1
stage 2: input buffer[1] = 2
stage 0: input token = 2
stage 1: input buffer[2] = 2
stage 2: input buffer[2] = 3
stage 0: input token = 3
stage 1: input buffer[3] = 3
stage 2: input buffer[3] = 4
stage 0: input token = 4
stage 1: input buffer[0] = 4
stage 2: input buffer[0] = 5
Rerun the pipeline
stage 0: input token = 5
stage 1: input buffer[1] = 5
stage 2: input buffer[1] = 6
stage 0: input token = 6
stage 1: input buffer[2] = 6
stage 2: input buffer[2] = 7
stage 0: input token = 7
stage 1: input buffer[3] = 7
stage 2: input buffer[3] = 8
stage 0: input token = 8
stage 1: input buffer[0] = 8
stage 2: input buffer[0] = 9
stage 0: input token = 9
stage 1: input buffer[1] = 9
stage 2: input buffer[1] = 10
stop

管道在条件任务的控制下运行两次。管道第二次运行的起始Token5,而不是 0,因为管道保持有状态 的Token数量。最后一个Token9,这意味着管道总共处理 10 个调度Token。前五个TokenToken 04)在第 一次运行中处理,其余五个TokenToken 59)在第二次运行中处理。在条件任务中,我们使用 N 作为决策计数器来处理下一个数据流。

示例 2:连接两个管道

此示例演示了两个串联管道,其中一系列数据Token从一个管道同步运行到另一个管道。第一个管道任务先于第二个管道任务。

 1: kthread::Taskflow taskflow("pipeline");
2: kthread::Executor executor;
3:
4: // define the maximum parallelism of the pipeline
5: const size_t num_lines = 4;
6:
7: // custom data storage
8: std::array<int, num_lines> buffer_1;
9: std::array<int, num_lines> buffer_2;
10:
11: // the pipeline_1 consists of three pipes (serial-parallel-serial)
12: // and up to four concurrent scheduling tokens
13: kthread::Pipeline pl_1(num_lines,
14: kthread::Pipe{kthread::PipeType::SERIAL, [&buffer_1](kthread::Pipeflow& pf) mutable{
15: // generate only 4 scheduling tokens
16: if(pf.token() == 4) {
17: pf.stop();
18: }
19: // save the result of this pipe into the buffer
20: else {
21: printf("pipeline 1, pipe 0: input token = %zu\n", pf.token());
22: buffer_1[pf.line()] = pf.token();
23: }
24: }},
25:
26: kthread::Pipe{kthread::PipeType::PARALLEL, [&buffer_1](kthread::Pipeflow& pf) {
27: printf(
28: "pipeline 1, pipe 1: input buffer_1[%zu] = %d\n",
29: pf.line(), buffer_1[pf.line()]
30: );
31: // propagate the previous result to this pipe by adding one
32: buffer_1[pf.line()] = buffer_1[pf.line()] + 1;
33: }},
34:
35: kthread::Pipe{kthread::PipeType::SERIAL, [&buffer_1](kthread::Pipeflow& pf) {
36: printf(
37: "pipeline 1, pipe 2: input buffer_1[%zu] = %d\n",
38: pf.line(), buffer_1[pf.line()]
39: );
40: // propagate the previous result to this pipe by adding one
41: buffer_1[pf.line()] = buffer_1[pf.line()] + 1;
42: }}
43: );
44:
45: // the pipeline_2 consists of three pipes (serial-parallel-serial)
46: // and up to four concurrent scheduling tokens
47: kthread::Pipeline pl_2(num_lines,
48: kthread::Pipe{kthread::PipeType::SERIAL,
49: [&buffer_2, &buffer_1](kthread::Pipeflow& pf) mutable{
50: // generate only 4 scheduling tokens
51: if(pf.token() == 4) {
52: pf.stop();
53: }
54: // save the result of this pipe into the buffer
55: else {
56: printf("pipeline 2, pipe 0: input value = %d\n", buffer_1[pf.line()]);
57: buffer_2[pf.line()] = buffer_1[pf.line()];
58: }
59: }},
60:
61: kthread::Pipe{kthread::PipeType::PARALLEL, [&buffer_2](kthread::Pipeflow& pf) {
62: printf(
63: "pipeline 2, pipe 1: input buffer_2[%zu] = %d\n",
64: pf.line(), buffer_2[pf.line()]
65: );
66: // propagate the previous result to this pipe by adding 1
67: buffer_2[pf.line()] = buffer_2[pf.line()] + 1;
68: }},
69:
70: kthread::Pipe{kthread::PipeType::SERIAL, [&buffer_2](kthread::Pipeflow& pf) {
71: printf(
72: "pipeline 2, pipe 2: input buffer_2[%zu] = %d\n",
73: pf.line(), buffer_2[pf.line()]
74: );
75: // propagate the previous result to this pipe by adding 1
76: buffer_2[pf.line()] = buffer_2[pf.line()] + 1;
77: }}
78: );
79:
80: // build the pipeline graph using composition
81: kthread::Task pipeline_1 = taskflow.composed_of(pl_1).name("pipeline_1");
82: kthread::Task pipeline_2 = taskflow.composed_of(pl_2).name("pipeline_2");
83:
84: // specify the graph dependency
85: pipeline_1.precede(pipeline_2);
86:
87: // execute the taskflow
88: executor.run(taskflow).wait();

总结:

  • 第 8 行定义管道 pl_1 的数据存储(num_lines 个整数)

  • 第 9 行定义管道 pl_2 的数据存储(num_lines 个整数)

  • 第 14-24 行定义 pl_1 中的第一个串行管道

  • 第 26-33 行定义 pl_1 中的第二个并行管道

  • 第 35-42 行定义 pl_1 中的第三个串行管道

  • 第 48-59 行定义 pl_2 中的第一个串行管道,该管道将 pl_1 的结果作为输入

  • 第 61-68 行定义 pl_2 中的第二个并行管道

  • 第 70-77 行定义 pl_2 中的第三个串行管道

  • 第 81-82 行使用组合定义管道图

  • 第 85 行定义任务依赖关系

  • 第 88 行运行任务流

此管道示例的任务流图如下所示:

以下代码片段显示了一种可能的输出:

pipeline 1, pipe 0: input token = 0
pipeline 1, pipe 1: input buffer_1[0] = 0
pipeline 1, pipe 2: input buffer_1[0] = 1
pipeline 1, pipe 0: input token = 1
pipeline 1, pipe 1: input buffer_1[1] = 1
pipeline 1, pipe 2: input buffer_1[1] = 2
pipeline 1, pipe 0: input token = 2
pipeline 1, pipe 1: input buffer_1[2] = 2
pipeline 1, pipe 2: input buffer_1[2] = 3
pipeline 1, pipe 0: input token = 3
pipeline 1, pipe 1: input buffer_1[3] = 3
pipeline 1, pipe 2: input buffer_1[3] = 4
pipeline 2, pipe 1: input value = 2
pipeline 2, pipe 2: input buffer_2[0] = 2
pipeline 2, pipe 3: input buffer_2[0] = 3
pipeline 2, pipe 1: input value = 3
pipeline 2, pipe 2: input buffer_2[1] = 3
pipeline 2, pipe 3: input buffer_2[1] = 4
pipeline 2, pipe 1: input value = 4
pipeline 2, pipe 2: input buffer_2[2] = 4
pipeline 2, pipe 3: input buffer_2[2] = 5
pipeline 2, pipe 1: input value = 5
pipeline 2, pipe 2: input buffer_2[3] = 5
pipeline 2, pipe 3: input buffer_2[3] = 6

管道 pl_1pl_2 的输出在每次运行中可能有所不同,因为它们的第二个管道都是并行类型。 由于管道 pl_1 和管道 pl_2 之间的任务依赖性,pl_1 的输出先于 pl_2 的输出。

示例 3:定义多个并行管道

此示例创建了两个在不同数据集上并行运行的独立管道。

 1: kthread::Taskflow taskflow("pipeline");
2: kthread::Executor executor;
3:
4: // define the maximum parallelism of the pipeline
5: const size_t num_lines = 4;
6:
7: // custom data storage
8: std::array<int, num_lines> buffer_1;
9: std::array<int, num_lines> buffer_2;
10:
11: // the pipeline_1 consists of three pipes (serial-parallel-serial)
12: // and up to four concurrent scheduling tokens
13: kthread::Pipeline pl_1(num_lines,
14: kthread::Pipe{kthread::PipeType::SERIAL, [&buffer_1](kthread::Pipeflow& pf) mutable{
15: // generate only 5 scheduling tokens
16: if(pf.token() == 5) {
17: pf.stop();
18: }
19: // save the result of this pipe into the buffer
20: else {
21: printf("pipeline 1, pipe 0: input token = %zu\n", pf.token());
22: buffer_1[pf.line()] = pf.token();
23: }
24: }},
25:
26: kthread::Pipe{kthread::PipeType::PARALLEL, [&buffer_1](kthread::Pipeflow& pf) {
27: printf(
28: "pipeline 1, pipe 1: input buffer_1[%zu] = %d\n",
29: pf.line(), buffer_1[pf.line()]
30: );
31: // propagate the previous result to this pipe by adding one
32: buffer_1[pf.line()] = buffer_1[pf.line()] + 1;
33: }},
34:
35: kthread::Pipe{kthread::PipeType::SERIAL, [&buffer_1](kthread::Pipeflow& pf) {
36: printf(
37: "pipeline 1, pipe 2: input buffer_1[%zu] = %d\n",
38: pf.line(), buffer_1[pf.line()]
39: );
40: // propagate the previous result to this pipe by adding one
41: buffer_1[pf.line()] = buffer_1[pf.line()] + 1;
42: }}
43: );
44:
45: // the pipeline_2 consists of three pipes (serial-parallel-serial)
46: // and up to four concurrent scheduling tokens
47: kthread::Pipeline pl_2(num_lines,
48: kthread::Pipe{kthread::PipeType::SERIAL, [&buffer_2](kthread::Pipeflow& pf) mutable{
49: // generate only 2 scheduling tokens
50: if(pf.token() == 5) {
51: pf.stop();
52: }
53: // save the result of this pipe into the buffer
54: else {
55: printf("pipeline 2, pipe 0: input token = %zu\n", pf.token());
56: buffer_2[pf.line()] = "pipeline";
57: }
58: }},
59:
60: kthread::Pipe{kthread::PipeType::PARALLEL, [&buffer_2](kthread::Pipeflow& pf) {
61: printf(
62: "pipeline 2, pipe 1: input buffer_2[%zu] = %d\n",
63: pf.line(), buffer_2[pf.line()]
64: );
65: // propagate the previous result to this pipe by concatenating "_"
66: buffer_2[pf.line()] = buffer_2[pf.line()];
67: }},
68:
69: kthread::Pipe{kthread::PipeType::SERIAL, [&buffer_2](kthread::Pipeflow& pf) {
70: printf(
71: "pipeline 2, pipe 2: input buffer_2[%zu] = %d\n",
72: pf.line(), buffer_2[pf.line()]
73: );
74: // propagate the previous result to this pipe by concatenating "2"
75: buffer_2[pf.line()] = buffer_2[pf.line()];
76: }}
77: );
78:
79: kthread::Task pipeline_1 = taskflow.composed_of(pl_1)
80: .name("pipeline_1");
81: kthread::Task pipeline_2 = taskflow.composed_of(pl_2)
82: .name("pipeline_2");
83: kthread::Task initial = taskflow.emplace([](){ std::cout << "initial"; })
84: .name("initial");
85:
86: initial.precede(pipeline_1, pipeline_2);
87:
88: executor.run(taskflow).wait();

总结:

  • 第 8 行定义管道 pl_1 的数据存储(num_lines 个整数)

  • 第 9 行定义管道 pl_2 的数据存储(num_lines 个整数)

  • 第 14-24 行定义 pl_1 中的第一个串行管道

  • 第 26-33 行定义 pl_1 中的第二个并行管道

  • 第 35-42 行定义 pl_1 中的第三个串行管道

  • 第 48-58 行定义 pl_2 中的第一个串行管道

  • 第 60-67 行定义 pl_2 中的第二个并行管道

  • 第 69-76 行定义 pl_2 中的第三个串行管道

  • 第 79-82 行使用组合定义管道图

  • 第 83-84 行定义静态任务。

  • 第 86 行定义任务依赖关系

  • 第 88 行运行任务流

此管道示例的任务流图如下所示:

以下代码片段显示了一种可能的输出:

initial
pipeline 2, pipe 0: input token = 0
pipeline 2, pipe 1: input buffer_2[0] = 0
pipeline 2, pipe 2: input buffer_2[0] = 1
pipeline 1, pipe 0: input token = 0
pipeline 1, pipe 1: input buffer_1[0] = 0
pipeline 1, pipe 2: input buffer_1[0] = 1
pipeline 1, pipe 0: input token = 1
pipeline 1, pipe 1: input buffer_1[1] = 1
pipeline 1, pipe 0: input token = 2
pipeline 1, pipe 1: input buffer_1[2] = 2
pipeline 1, pipe 0: input token = 3
pipeline 1, pipe 1: input buffer_1[3] = 3
pipeline 1, pipe 0: input token = 4
pipeline 1, pipe 1: input buffer_1[0] = 4
pipeline 2, pipe 0: input token = 1
pipeline 2, pipe 1: input buffer_2[1] = 1
pipeline 2, pipe 0: input buffer_2[1] = 2
pipeline 2, pipe 0: input token = 2
pipeline 2, pipe 1: input buffer_2[2] = 2
pipeline 2, pipe 2: input buffer_2[2] = 3
pipeline 2, pipe 0: input token = 3
pipeline 2, pipe 1: input buffer_2[3] = 3
pipeline 2, pipe 2: input buffer_2[3] = 4
pipeline 2, pipe 0: input token = 4
pipeline 2, pipe 1: input buffer_2[0] = 4
pipeline 2, pipe 2: input buffer_2[0] = 5
pipeline 1, pipe 2: input buffer_1[1] = 2
pipeline 1, pipe 2: input buffer_1[2] = 3
pipeline 1, pipe 2: input buffer_1[3] = 4
pipeline 1, pipe 2: input buffer_1[0] = 5

由于管道 pl_1 和管道 pl_2 并行运行,它们的输出可能会交叉。

重置管道

我们的管道调度框架在每次提交运行时都会保留一定数量的调度Token。您可以使用 kthread::Pipeline::reset() 将管道重置为初始状态, 其中调度Token的数量将在下一次运行中从零开始。借鉴示例 1:迭代管道,下面的程序在第二次迭代(条件任务内部) 时重置管道,因此调度Token将在下一次运行中从零开始。

kthread::Taskflow taskflow("pipeline");
kthread::Executor executor;

// define the maximum parallelism of the pipeline
const size_t num_lines = 4;

// custom data storage
std::array<int, num_lines> buffer;

// the pipeline consists of three pipes (serial-parallel-serial)
// and up to four concurrent scheduling tokens
kthread::Pipeline pl(num_lines,
kthread::Pipe{kthread::PipeType::SERIAL, [&buffer](kthread::Pipeflow& pf) mutable{
// generate only 5 scheduling tokens
if(pf.token() == 5) {
pf.stop();
}
// save the result of this pipe into the buffer
else {
printf("pipe 0: input token = %zu\n", pf.token());
buffer[pf.line()] = pf.token();
}
}},

kthread::Pipe{kthread::PipeType::PARALLEL, [&buffer](kthread::Pipeflow& pf) {
printf(
"pipe 1: input buffer_1[%zu] = %d\n", pf.line(), buffer[pf.line()]
);
// propagate the previous result to this pipe by adding one
buffer[pf.line()] = buffer[pf.line()] + 1;
}},

kthread::Pipe{kthread::PipeType::SERIAL, [&buffer](kthread::Pipeflow& pf) {
printf(
"pipe 2: input buffer[%zu][%zu] = %d\n", pf.line(), buffer[pf.line()]
);
// propagate the previous result to this pipe by adding one
buffer[pf.line()] = buffer[pf.line()] + 1;
}}
);

kthread::Task conditional = taskflow.emplace([&](){
if (++N < 2) {
pl.reset();
std::cout << "Rerun the pipeline\n";
return 0;
}
else {
return 1;
}
}).name("conditional");

kthread::Task pipeline = taskflow.composed_of(pl)
.name("pipeline");
kthread::Task initial = taskflow.emplace([](){ std::cout << "initial"; })
.name("initial");
kthread::Task stop = taskflow.emplace([](){ std::cout << "stop"; })
.name("stop");

initial.precede(pipeline);
pipeline.precede(conditional);
conditional.precede(pipeline, stop);

executor.run(taskflow).wait();

以下代码片段显示了一种可能的输出:

initial
pipe 0: input token = 0
pipe 1: input buffer_1[0] = 0
pipe 2: input buffer_1[0] = 1
pipe 0: input token = 1
pipe 1: input buffer_1[1] = 1
pipe 2: input buffer_1[1] = 2
pipe 0: input token = 2
pipe 1: input buffer_1[2] = 2
pipe 2: input buffer_1[2] = 3
pipe 0: input token = 3
pipe 1: input buffer_1[3] = 3
pipe 2: input buffer_1[3] = 4
pipe 0: input token = 4
pipe 1: input buffer_1[0] = 4
pipe 2: input buffer_1[0] = 5
Rerun the pipeline
pipe 0: input token = 0
pipe 1: input buffer_1[0] = 0
pipe 2: input buffer_1[0] = 1
pipe 0: input token = 1
pipe 1: input buffer_1[1] = 1
pipe 2: input buffer_1[1] = 2
pipe 0: input token = 2
pipe 1: input buffer_1[2] = 2
pipe 2: input buffer_1[2] = 3
pipe 0: input token = 3
pipe 1: input buffer_1[3] = 3
pipe 2: input buffer_1[3] = 4
pipe 0: input token = 4
pipe 1: input buffer_1[0] = 4
pipe 2: input buffer_1[0] = 5
stop

由于第二根管道是并行类型,因此每次运行的输出可能不同。在条件任务的第二次迭代中,我们重置了管道,以便Token标识符从 0 开始,而不是从 5 开始。

info

更多信息请参见算法索引