Skip to main content
Version: 1.1.1

具有标记依赖关系的任务并行流水线

Taskflow 管道允许您将Token的执行推迟到未来的Token。此推迟引入了未来Token与当前Token之间的依赖关系,特别适用 于许多视频编码应用程序。我们建议在学习此接口之前先阅读 任务并行流水线

了解 Token 依赖关系

Token依赖关系确定了数据Token在任务并行管道中执行的顺序。当Token t1 在 t2 开始之前完成时,存在从 t1t2 的依赖关系。我们将Token依赖关系分为两种类型:

  • 前向Token依赖关系 (FTD):从较早Token到未来Token的依赖关系
  • 后向Token依赖关系 (BTD):从未来Token到较早Token的依赖关系下图说明了示例Token依赖关系图及其Token执行 顺序。从Token 2 指向 5 的边是 FTD,从Token 8 指向 2 和 5 以及从Token 9 指向 5 的边是 BTD。根据依赖关 系,Token按相应的执行顺序执行。

解决Token依赖关系

要解决 Token 依赖关系,基本思想是推迟执行具有未解决依赖关系的 Token,并将 Token 保存在数据结构中,直到其依赖 关系得到解决。为了实现这个想法,我们利用了三种数据结构,deferred_tokens (DT)token_dependencies (TD)ready_tokens (RT)DTTD 是关联容器,RT 是队列。DT 存储延迟 Token 及其依赖项,延迟 Token 就是通过这些依赖 项延迟的。TD 存储依赖项及其相关的延迟 TokenRT 存储曾经是延迟 Token 但现在由于依赖关系已解决而准备就绪的 token。下 图说明了如何使用这三种数据结构来解析 Token 依赖关系并获取理解 Token 依赖关系中举例说明的相应串行执行序列。

整个过程有以下步骤:

  1. 标记 1 不是延迟标记,因此 1 完成。现在执行顺序为 {1}

  2. 标记 2 延迟到 8。我们插入 DT[2]={8}TD[8]={2}。上图中的黑色圆圈 2 说明了这一步。

  3. 标记 3 不是延迟标记,因此 3 完成。现在执行顺序为 {1,3}。

  4. 标记 4 不是延迟标记,因此 4 完成。现在执行顺序为 {1,3,4}

  5. 标记 5 延迟到 27。我们插入 DT[5]={2,7}TD[2]={5}TD[7]={5}。上图中的黑色圆圈 5 说明了这一步。

  6. 标记 6 不是延迟标记,因此 6 已完成。现在执行顺序为 {1,3,4,6}

  7. 标记 7 不是延迟标记,因此 7 已完成。现在执行顺序为 {1,3,4,6,7}。由于 TD[7]={5},我们直接从 DT[5] 中删除 7。 上图中的黑色圆圈 7 说明了这一步骤。

  8. 标记 8 不是延迟标记,因此 8 已完成。现在执行顺序为 {1,3,4,6,7,8}。由于 TD[8]={2},我们直接 从 DT[2] 中删除 8,并发现 DT[2] 为空。现在标记 2 不再是延迟标记,我们将 2 移至 RT。上图中的黑色圆圈 8 说明了这一步骤。

  9. RT 不为空,且有一个 token 2,那么我们把 2 运行完,现在执行顺序是 {1,3,4,6,7,8,2}。由于 TD[2]={5},我们直接 从 DT[5] 中移除 2,发现 DT[5] 为空,此时 token 5 不再是延迟 token,我们将 5 移到 RT,上图中黑色圆圈 9 就是这个步骤的说明。

  10. RT不为空,且有token 5,然后我们运行5,发现token 5第二次延迟,延迟到9,我们插入DT[5]={9}TD[9]={5},上图中黑色圆圈20说明了这一步。

  11. token 9不是延迟token,所以9执行完毕,现在执行顺序为{1,3,4,6,7,8,2,9}。由于TD[9]={5},我们直接从DT[5]中移除9,发现DT[5]为空,现在token 5 不再是延迟token,我们将5移到RT,上图中黑色圆圈11说明了这一步。

  12. RT不为空,且有token 5,然后我们运行完毕5,现在执行顺序为{1,3,4,6,7,8,2,9,5},上图中黑色圆圈12说明了这一步。

  13. Token 10 不是延迟Token,因此 10 已完成。现在执行顺序为 {1,3,4,6,7,8,2,9,5,10}

创建延迟管道模块任务

要创建延迟管道应用程序,需要四个步骤,比创建任务并行管道 (kthread::Pipeline) 多一个步骤:

  1. 定义管道结构(例如,管道类型、管道可调用、停止规则、行数)

  2. 定义第一个管道上的Token依赖关系

  3. 定义数据存储和布局(如果应用程序需要)

  4. 使用组合定义管道任务流图

以下示例演示了如何创建延迟管道应用程序,该应用程序在“了解Token依赖关系”中的依赖关系图中进行了示例说明。该示例创建了一个延迟管道,该 管道总共生成 10 个数据Token。管道结构由四条线和三个阶段(所有串行类型)组成。

 1: kthread::Taskflow taskflow("deferred_pipeline");
2: kthread::Executor executor;
3:
4: const size_t num_lines = 4;
5:
6: // the pipeline consists of three pipes (serial-serial-serial)
7: // and up to four concurrent scheduling tokens
8: kthread::Pipeline pl(num_lines,
9: kthread::Pipe{kthread::PipeType::SERIAL, [](kthread::Pipeflow& pf) {
10: // stop at 11 scheduling tokens
11: if(pf.token() == 11) {
12: pf.stop();
13: }
14: else {
15: // Token 5 is deferred
16: if (pf.token() == 5) {
17: switch(pf.num_deferrals()) {
18: case 0:
19: pf.defer(2);
20: printf("1st-time: Token %zu is deferred by 2\n", pf.token());
21: pf.defer(7);
22: printf("1st-time: Token %zu is deferred by 7\n", pf.token());
23: return;
24: break;
25:
26: case 1:
27: pf.defer(9);
28: printf("2nd-time: Token %zu is deferred by 9\n", pf.token());
29: return;
30: break;
31:
32: case 2:
33: printf("3rd-time: Tokens 2, 7 and 9 resolved dependencies \
for token %zu\n", pf.token());
34: break;
35: }
36: }
37: // token 2 is deferred
38: else if (pf.token() == 2) {
39: switch(pf.num_deferrals()) {
40: case 0:
41: pf.defer(8);
42: printf("1st-time: Token %zu is deferred by 8\n", pf.token());
43: break;
44: case 1:
45: printf("2nd-time: Token 8 resolved dependencies for token %zu\n",
pf.token());
46: break;
47: }
48: }
49: else {
50: printf("stage 1: Non-deferred token %zu\n", pf.token());
51: }
52: }
53: }},
54:
55: kthread::Pipe{kthread::PipeType::SERIAL, [](kthread::Pipeflow& pf) {
56: printf("stage 2: input token %zu (deferrals=%zu)\n",
pf.token(), pf.num_deferrals());
57: }},
58:
59: kthread::Pipe{kthread::PipeType::SERIAL, [](kthread::Pipeflow& pf) {
60: printf("stage 3: input token %zu\n", pf.token());
61: }}
62: );
63:
64: kthread::Task task = taskflow.composed_of(pl);
65:
66: // run the pipeline
67: executor.run(taskflow).wait();

总结:

  • 第 8 行定义管道中的行数

  • 第 9-52 行定义第一个串行管道,它将在第 11 个Token处停止管道调度

  • 第 15-30 行定义Token 5 的Token依赖关系

  • 第 37-48 行定义Token 2 的Token依赖关系

  • 第 55-57 行定义第二个串行管道

  • 第 59-61 行定义第三个串行管道

  • 第 64 行使用组合定义管道任务流图

  • 第 67 行执行任务流

以下是示例的可能结果之一。

stage 1: Non-deferred token 0
stage 2: input token 0 (deferrals=0)
stage 3: input token 0
stage 1: Non-deferred token 1
stage 2: input token 1 (deferrals=0)
stage 3: input token 1
1st-time: Token 2 is deferred by 8
stage 1: Non-deferred token 3
stage 2: input token 3 (deferrals=0)
stage 3: input token 3
stage 1: Non-deferred token 4
stage 2: input token 4 (deferrals=0)
stage 3: input token 4
1st-time: Token 5 is deferred by 2
1st-time: Token 5 is deferred by 7
stage 1: Non-deferred token 6
stage 2: input token 6 (deferrals=0)
stage 1: Non-deferred token 7
stage 2: input token 7 (deferrals=0)
stage 1: Non-deferred token 8
stage 3: input token 6
stage 3: input token 7
stage 2: input token 8 (deferrals=0)
stage 3: input token 8
2nd-time: Token 8 resolved dependencies for token 2
stage 2: input token 2 (deferrals=1)
stage 3: input token 2
2nd-time: Token 5 is deferred by 9
stage 1: Non-deferred token 9
stage 2: input token 9 (deferrals=0)
stage 3: input token 9
3rd-time: Tokens 2, 7 and 9 resolved dependencies for token 5
stage 2: input token 5 (deferrals=2)
stage 3: input token 5
stage 1: Non-deferred token 10
stage 2: input token 10 (deferrals=0)
stage 3: input token 10
info

注意 您只能在第一个管道处指定Token依赖项才能获得Token的串行执行。

创建延迟可扩展管道模块任务

除了任务并行管道 (kthread::Pipeline) 之外,您还可以在任务并行可扩展管道 (kthread::ScalablePipeline) 之上指定 token 依赖项。我们建议在学习此接口之前先阅读任务并行可扩展管道。

要创建延迟可扩展管道应用程序,需要执行四个步骤,这些步骤与创建延迟管道模块任务中描述的步骤相同。它们是:

  1. 定义管道结构(例如,管道类型、管道可调用、停止规则、行数)
  2. 定义第一个管道的Token依赖关系
  3. 定义数据存储和布局(如果应用程序需要)
  4. 使用组合定义管道任务流图

以下代码创建一个延迟可扩展管道,该管道使用四条并行线路通过给定向量中的两个串行管道调度Token,然后将该管道重置为三 个串行管道。这三个管道可调用函数与创建延迟管道模块任务中的代码片段中演示的管道可调用函数相同。 Token依赖关系在了解Token依赖关系中进行了示例说明。

 1: // create a vector of three pipes
2: std::vector< kthread::Pipe<std::function<void(kthread::Pipeflow&)>> > pipes;
3:
4: // define pipe callables
5: // first_pipe_callable is same as lines 15-53 in the above code snippet
6: auto first_pipe_callable = [](kthread::Pipeflow& pf) {
7: // stop at 11 scheduling tokens
8: if(pf.token() == 11) {
9: pf.stop();
10: }
11: else {
12: // Token 5 is deferred
13: if (pf.token() == 5) {
14: switch(pf.num_deferrals()) {
15: case 0:
16: pf.defer(2);
17: printf("1st-time: Token %zu is deferred by 2\n", pf.token());
18: pf.defer(7);
19: printf("1st-time: Token %zu is deferred by 7\n", pf.token());
20: return;
21: break;
22:
23: case 1:
24: pf.defer(9);
25: printf("2nd-time: Token %zu is deferred by 9\n", pf.token());
26: return;
27: break;
28:
29: case 2:
30: printf("3rd-time: Tokens 2, 7 and 9 resolved dependencies for token %zu\n",
pf.token());
31: break;
32: }
33: }
34: // token 2 is deferred
35: else if (pf.token() == 2) {
36: switch(pf.num_deferrals()) {
37: case 0:
38: pf.defer(8);
39: printf("1st-time: Token %zu is deferred by 8\n", pf.token());
40: break;
41: case 1:
42: printf("2nd-time: Token 8 resolved dependencies for token %zu\n",
pf.token());
43: break;
44: }
45: }
46: else {
47: printf("stage 1: Non-deferred token %zu\n", pf.token());
48: }
49: };
50:
51: // second_pipe_callable is same as lines 55-57 in the above code snippet
52: auto second_pipe_callable = [](kthread::Pipeflow& pf){
53: printf("stage 2: input token %zu (deferrals=%zu)\n",
pf.token(), pf.num_deferrals());
54: };
55:
56: // third_pipe_callable is same as lines 59-61 in the above code snippet
57: auto third_pipe_callable = [](kthread::Pipeflow& pf){
58: printf("stage 3: input token %zu\n", pf.token());
59: };
60:
61: pipes.emplace_back(kthread::PipeType::SERIAL, first_pipe_callable);
62: pipes.emplace_back(kthread::PipeType::SERIAL, second_pipe_callable);
63:
64: // create a pipeline of four parallel lines based on the given vector of pipes
65: kthread::ScalablePipeline pl(4, pipes.begin(), pipes.end());
66:
67: kthread::Task task = taskflow.composed_of(pl);
68:
69: // run the pipeline
70: executor.run(taskflow).wait();
71:
72: // reset the pipeline to a new range of three pipes and starts from
73: // the initial state (i.e., token counts from zero)
74: pipes.emplace_back(kthread::PipeType::SERIAL, third_pipe_callable);
75: pl.reset(pipes.begin(), pipes.end());
76:
77: executor.run(taskflow).wait();

总结:

  • 第 2 行定义 kthread::Pipe 类型的向量
  • 第 6-49 行定义第一个管道可调用函数
  • 第 52-54 行定义第二个管道可调用函数
  • 第 57-59 行定义第三个管道可调用函数
  • 第 61-62 行定义两个管道可调用函数的向量
  • 第 65 行定义可扩展管道
  • 第 67 行使用组合定义管道任务流图
  • 第 70 行执行第一次运行的任务流
  • 第 74 行在向量中插入第三个管道可调用函数
  • 第 75 行将管道重置为三个管道可调用函数
  • 第 77 行执行第二次运行的任务流
info

更多信息请参见算法索引