Version: 1.1.1

Taskflow

Taskflow helps you quickly write parallel and heterogeneous task programs in modern C++.

Taskflow is a task-scheduling framework built on top of pthread or kthread: its underlying worker threads are either pthreads or kthreads.

Why Taskflow?

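Taskflow is faster, more expressive, and easier for drop-in integration than many existing task-programming frameworks when handling complex parallel workloads.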

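Taskflow lets you quickly implement task decomposition strategies that incorporate both regular and irregular compute patterns, together with an efficient work-stealing scheduler to optimize your multithreaded performance.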

[Figure: Static Tasking]
[Figure: Subflow Tasking]
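To make the work-stealing idea mentioned above concrete, here is a minimal sketch in standard C++17. It is not kthread's scheduler. `WorkQueue` and `run_work_stealing` are hypothetical names, each queue is protected by a plain mutex instead of a lock-free deque, and the batch of tasks is fixed up front, so tasks never spawn new tasks. Each worker pops from the back of its own queue. When that queue is empty, the worker steals from the front of another worker's queue.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical mutex-guarded deque; production schedulers use lock-free deques.
class WorkQueue {
 public:
  void push(std::function<void()> task) {
    std::lock_guard<std::mutex> lock(mutex_);
    tasks_.push_back(std::move(task));
  }
  // The owner takes the newest task from the back (LIFO).
  std::optional<std::function<void()>> pop() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (tasks_.empty()) return std::nullopt;
    std::function<void()> task = std::move(tasks_.back());
    tasks_.pop_back();
    return task;
  }
  // A thief takes the oldest task from the front (FIFO).
  std::optional<std::function<void()>> steal() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (tasks_.empty()) return std::nullopt;
    std::function<void()> task = std::move(tasks_.front());
    tasks_.pop_front();
    return task;
  }

 private:
  std::mutex mutex_;
  std::deque<std::function<void()>> tasks_;
};

// Runs a fixed batch of independent tasks and returns how many were stolen.
// Every task starts on worker 0, so the other workers can only help by stealing.
std::size_t run_work_stealing(std::vector<std::function<void()>> tasks,
                              unsigned num_workers) {
  assert(num_workers > 0);
  std::vector<WorkQueue> queues(num_workers);
  for (auto& task : tasks) queues[0].push(std::move(task));

  std::atomic<std::size_t> stolen{0};
  std::vector<std::thread> workers;
  for (unsigned id = 0; id < num_workers; ++id) {
    workers.emplace_back([&queues, &stolen, id, num_workers] {
      while (true) {
        auto task = queues[id].pop();
        // Own queue is empty: try each other worker's queue once.
        for (unsigned k = 1; !task && k < num_workers; ++k) {
          task = queues[(id + k) % num_workers].steal();
          if (task) ++stolen;
        }
        if (!task) return;  // queues only shrink after start, so no work is left
        (*task)();
      }
    });
  }
  for (auto& worker : workers) worker.join();
  return stolen.load();
}
```

Keeping the owner on one end and thieves on the other is the usual rationale for this design. The owner keeps working on what it touched most recently, and contention only arises when a thief and the owner reach for the last remaining item.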

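Taskflow supports conditional tasking, which lets you make rapid control-flow decisions across dependent tasks to implement cycles and conditions that are otherwise difficult to express with existing tools.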

[Figure: Conditional Tasking]

Taskflow is composable. You can create large parallel graphs by composing modular and reusable blocks that are easier to optimize at an individual scope.

[Figure: Taskflow Composition]

Start Your First Taskflow Program

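The following program (simple.cpp) creates four tasks: A, B, C, and D. A runs before B and C, and D runs after B and C. When A finishes, B and C can run in parallel. Try it live on Compiler Explorer (godbolt)!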

#include <iostream>
#include <kthread/kthread.h>  // Taskflow is header-only

int main() {
  kthread::TaskExecutor executor;
  kthread::Taskflow taskflow;

  auto [A, B, C, D] = taskflow.emplace(  // create four tasks
    [] () { std::cout << "TaskA\n"; },
    [] () { std::cout << "TaskB\n"; },
    [] () { std::cout << "TaskC\n"; },
    [] () { std::cout << "TaskD\n"; }
  );

  A.precede(B, C);  // A runs before B and C
  D.succeed(B, C);  // D runs after B and C

  executor.run(taskflow).wait();  // run once and block until all four tasks finish

  return 0;
}
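The `precede`/`succeed` calls above only record edges. An executor then starts each task once all of its predecessors have finished. The following sketch shows that dependency-counting idea in standard C++17 with a small worker pool. `MiniNode` and `MiniGraph` are hypothetical names chosen for illustration, not kthread's API. The sketch also assumes the graph is acyclic, because a cycle would leave the workers waiting forever.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical node type; not the kthread::Task interface.
struct MiniNode {
  std::function<void()> work;
  std::vector<std::size_t> successors;
  int num_predecessors = 0;
};

struct MiniGraph {
  std::vector<MiniNode> nodes;

  std::size_t emplace(std::function<void()> work) {
    nodes.push_back(MiniNode{std::move(work), {}, 0});
    return nodes.size() - 1;
  }

  // Records that `from` must finish before `to` starts.
  void precede(std::size_t from, std::size_t to) {
    nodes[from].successors.push_back(to);
    ++nodes[to].num_predecessors;
  }

  // Runs every node exactly once on num_workers threads, respecting the edges.
  void run(unsigned num_workers) {
    std::mutex m;
    std::condition_variable cv;
    std::queue<std::size_t> ready;
    std::vector<int> pending(nodes.size());
    std::size_t remaining = nodes.size();
    for (std::size_t i = 0; i < nodes.size(); ++i) {
      pending[i] = nodes[i].num_predecessors;
      if (pending[i] == 0) ready.push(i);  // source tasks can start immediately
    }
    auto worker = [&] {
      std::unique_lock<std::mutex> lock(m);
      while (true) {
        cv.wait(lock, [&] { return !ready.empty() || remaining == 0; });
        if (remaining == 0) return;
        std::size_t id = ready.front();
        ready.pop();
        lock.unlock();
        nodes[id].work();  // user code runs outside the lock
        lock.lock();
        --remaining;
        for (std::size_t s : nodes[id].successors) {
          if (--pending[s] == 0) ready.push(s);  // its last predecessor just finished
        }
        cv.notify_all();
      }
    };
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < num_workers; ++i) threads.emplace_back(worker);
    for (auto& t : threads) t.join();
  }
};
```

Creating four nodes, calling `precede(A, B)`, `precede(A, C)`, `precede(B, D)`, `precede(C, D)`, and then `run(4)` reproduces the ordering of simple.cpp. A always runs first and D always runs last, while B and C may run in either order.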

Visualize Your First Taskflow Program

Taskflow comes with a built-in profiler that lets you profile and visualize taskflow programs in an easy-to-use, web-based interface.

# run the program with the environment variable KTHREAD_ENABLE_PROFILER set to the output file
~$ KTHREAD_ENABLE_PROFILER=simple.json ./simple
~$ cat simple.json
[
{"executor":"0","data":[{"worker":0,"level":0,"data":[{"span":[172,186],"name":"0_0","type":"static"},{"span":[187,189],"name":"0_1","type":"static"}]},{"worker":2,"level":0,"data":[{"span":[93,164],"name":"2_0","type":"static"},{"span":[170,179],"name":"2_1","type":"static"}]}]}
]

In addition to the execution diagram, you can dump the graph to DOT format and visualize it with any of a number of free GraphViz tools.

// dump the taskflow graph to a DOT format through std::cout
taskflow.dump(std::cout);
[Figure: DOT rendering of the taskflow: A -> B, A -> C, B -> D, C -> D]
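The output of `dump` is plain GraphViz input. As a rough illustration of the DOT format, the standard C++ sketch below writes a list of named edges as a directed graph. `to_dot` is a hypothetical helper, not the library's `dump`. It does not reproduce the exact text `dump` emits, such as node identifiers, labels, or clusters.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: writes a directed graph, given as named edges, in DOT syntax.
std::string to_dot(const std::string& graph_name,
                   const std::vector<std::pair<std::string, std::string>>& edges) {
  std::ostringstream os;
  os << "digraph " << graph_name << " {\n";
  for (const auto& [from, to] : edges) {
    // Quote node names so names with spaces stay valid DOT IDs
    // (embedded double quotes are not escaped in this sketch).
    os << "  \"" << from << "\" -> \"" << to << "\";\n";
  }
  os << "}\n";
  return os.str();
}
```

For the four-task graph, you could save the output of `to_dot("Taskflow", {{"A", "B"}, {"A", "C"}, {"B", "D"}, {"C", "D"}})` to `simple.dot` and render it with the GraphViz command `dot -Tpng simple.dot -o simple.png`.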

Express Task Graph Parallelism

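Taskflow provides both static and dynamic task graph construction, so you can express end-to-end parallelism in a task graph that embeds in-graph control flow.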

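  1. Create a Subflow Graph
  2. Integrate Control Flow into a Task Graph
  3. Compose Task Graphs
  4. Launch Asynchronous Tasks
  5. Execute a Taskflow
  6. Leverage Standard Parallel Algorithms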

Create a Subflow Graph

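Taskflow supports dynamic tasking, which lets you create a subflow graph during the execution of a task to perform dynamic parallelism. The following program spawns a task dependency graph parented at task B.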

kthread::Task A = taskflow.emplace([](){}).name("A");
kthread::Task C = taskflow.emplace([](){}).name("C");
kthread::Task D = taskflow.emplace([](){}).name("D");

kthread::Task B = taskflow.emplace([] (kthread::Subflow& subflow) {
  kthread::Task B1 = subflow.emplace([](){}).name("B1");
  kthread::Task B2 = subflow.emplace([](){}).name("B2");
  kthread::Task B3 = subflow.emplace([](){}).name("B3");
  B3.succeed(B1, B2);  // B3 runs after B1 and B2
}).name("B");

A.precede(B, C);  // A runs before B and C
D.succeed(B, C);  // D runs after B and C
[Figure: Task graph with subflow B: B1 -> B3, B2 -> B3, B3 -> B; A -> B, A -> C, B -> D, C -> D]
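In the graph above, B3 feeds back into B. This means the subflow joins its parent, so B's successor D waits for every subflow task. The following plain C++17 sketch imitates that behaviour with `std::async`. Task B decides at run time how many children to spawn, runs them in parallel, and joins them before it finishes. `run_subflow_example` is a hypothetical helper, not `kthread::Subflow`. Unlike a real executor, it spends one thread per task.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical sketch: returns the order in which tasks completed.
std::vector<std::string> run_subflow_example(int num_children) {
  std::mutex mu;
  std::vector<std::string> entries;
  auto record = [&](std::string entry) {
    std::lock_guard<std::mutex> lock(mu);
    entries.push_back(std::move(entry));
  };

  record("A");  // A precedes B and C
  auto b = std::async(std::launch::async, [&] {
    // The number of children is only known once B is running.
    std::vector<std::future<void>> children;
    for (int i = 1; i <= num_children; ++i) {
      children.push_back(std::async(std::launch::async,
                                    [&record, i] { record("B" + std::to_string(i)); }));
    }
    for (auto& child : children) child.get();  // join: B ends after all children
    record("B");
  });
  auto c = std::async(std::launch::async, [&] { record("C"); });
  b.get();
  c.get();
  record("D");  // D succeeds B and C
  return entries;
}
```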

Integrate Control Flow into a Task Graph

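Taskflow supports conditional tasking, which lets you make rapid control-flow decisions across dependent tasks to implement cycles and conditions in an end-to-end task graph.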

kthread::Task init = taskflow.emplace([](){}).name("init");
kthread::Task stop = taskflow.emplace([](){}).name("stop");

// creates a condition task that returns a random binary value (0 or 1)
kthread::Task cond = taskflow.emplace(
  [](){ return std::rand() % 2; }
).name("cond");

init.precede(cond);

// creates a feedback loop {0: cond, 1: stop}
cond.precede(cond, stop);
[Figure: init -> cond; cond -> cond (0); cond -> stop (1)]
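As the `{0: cond, 1: stop}` comment indicates, the integer returned by a condition task selects which successor runs next. Index 0 is the first task passed to `precede` and index 1 is the second. The sketch below models only that rule for the init/cond/stop graph, in plain C++17. `run_condition_loop` is a hypothetical helper. The condition body is passed in so the example can be deterministic instead of relying on `std::rand()`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical model of init -> cond, cond -> {0: cond, 1: stop}.
// Returns the task names in the order they would run.
std::vector<std::string> run_condition_loop(const std::function<int()>& cond_body) {
  // Successor order matches cond.precede(cond, stop).
  const std::vector<std::string> cond_successors{"cond", "stop"};
  std::vector<std::string> trace{"init"};
  std::string next = "cond";  // init.precede(cond)
  while (next == "cond") {
    trace.push_back(next);
    // The returned index picks exactly one successor; .at() throws
    // std::out_of_range for an invalid index.
    next = cond_successors.at(static_cast<std::size_t>(cond_body()));
  }
  trace.push_back(next);  // "stop"
  return trace;
}
```

With a body that returns 0 twice and then 1, the trace is init, cond, cond, cond, stop. With `std::rand() % 2` the number of loop iterations is random instead.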

Compose Task Graphs

Taskflow is composable. You can create large parallel graphs by composing modular and reusable blocks that are easier to optimize at an individual scope.

kthread::Taskflow f1, f2;

// create taskflow f1 of two tasks
kthread::Task f1A = f1.emplace([]() { std::cout << "Task f1A\n"; })
    .name("f1A");
kthread::Task f1B = f1.emplace([]() { std::cout << "Task f1B\n"; })
    .name("f1B");

// create taskflow f2 with one module task composed of f1
kthread::Task f2A = f2.emplace([]() { std::cout << "Task f2A\n"; })
    .name("f2A");
kthread::Task f2B = f2.emplace([]() { std::cout << "Task f2B\n"; })
    .name("f2B");
kthread::Task f2C = f2.emplace([]() { std::cout << "Task f2C\n"; })
    .name("f2C");

kthread::Task f1_module_task = f2.composed_of(f1)
    .name("module");

f1_module_task.succeed(f2A, f2B)
    .precede(f2C);
[Figure: Taskflow f2: f2A -> module, f2B -> module, module -> f2C, where module [Taskflow: f1] contains f1A and f1B]
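A module task stands for an entire inner graph: when the module task runs, the whole inner graph runs to completion before the module's successors start. The sketch below shows that idea in plain C++17. To keep it short, each hypothetical `Flow` stores its tasks in an order that already satisfies their dependencies and runs them sequentially. kthread would instead run f2A and f2B in parallel. `make_module_task` is also a hypothetical name.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical flow: tasks listed in a valid topological order, run one by one.
struct Flow {
  std::vector<std::function<void()>> tasks;
  void run() const {
    for (const auto& task : tasks) task();
  }
};

// Wraps an entire flow as a single task of another flow. The inner flow is
// captured by reference, so it must outlive every flow containing the module.
std::function<void()> make_module_task(const Flow& inner) {
  return [&inner] { inner.run(); };
}
```

Because the module only references f1, running f2 twice runs f1 twice, and any change to f1 is visible the next time f2 runs.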

Launch Asynchronous Tasks

Taskflow supports asynchronous tasking. You can launch tasks asynchronously to dynamically explore task graph parallelism.

kthread::TaskExecutor executor;

// create asynchronous tasks directly from an executor
std::future<int> future = executor.async([](){
  std::cout << "async task returns 1\n";
  return 1;
});
executor.silent_async([](){ std::cout << "async task does not return\n"; });

// create asynchronous tasks with dynamic dependencies
kthread::AsyncTask A = executor.silent_dependent_async([](){ printf("A\n"); });
kthread::AsyncTask B = executor.silent_dependent_async([](){ printf("B\n"); }, A);
kthread::AsyncTask C = executor.silent_dependent_async([](){ printf("C\n"); }, A);
kthread::AsyncTask D = executor.silent_dependent_async([](){ printf("D\n"); }, B, C);

executor.wait_for_all();
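The dependent async calls above express the same A -> {B, C} -> D shape without building a taskflow first. In the rough standard-library analogue below, each new task waits on the `std::shared_future` of every dependency and then runs its body. `dependent_async` is a hypothetical helper, not the kthread function. Unlike a real executor, every waiting task occupies a whole thread while it blocks.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for dependent async tasks: the new task blocks until
// all of its dependencies have finished, then runs its body.
std::shared_future<void> dependent_async(std::function<void()> body,
                                         std::vector<std::shared_future<void>> deps) {
  return std::async(std::launch::async,
                    [body = std::move(body), deps = std::move(deps)] {
                      for (const auto& dep : deps) dep.wait();
                      body();
                    })
      .share();
}
```

Calling `dependent_async(body_b, {A})` corresponds to `silent_dependent_async(body_b, A)`. Waiting on D's future plays the role of `wait_for_all()` here, because D cannot finish before A, B, and C.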

Execute a Taskflow

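The executor provides several thread-safe methods to run a taskflow. You can run a taskflow once, multiple times, or until a stopping criterion is met. These methods are non-blocking and return a kthread::TaskFuture<void> that lets you query the execution status.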

// runs the taskflow once
kthread::TaskFuture<void> run_once = executor.run(taskflow);

// wait on this run to finish
run_once.get();

// run the taskflow four times
executor.run_n(taskflow, 4);

// runs the taskflow five times (the predicate is checked before each run)
executor.run_until(taskflow, [counter=5]() mutable { return counter-- == 0; });

// block the calling thread until all submitted taskflows complete
executor.wait_for_all();
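How many times a taskflow runs depends on when the stopping predicate is evaluated. Assume kthread follows upstream Taskflow, where `run_until` checks the predicate before every run (including the first) and `run_n(f, n)` is built on `run_until` with a counting predicate. The synchronous model below shows how the counts work out under that assumption. `run_until_model` and `run_n_model` are hypothetical helpers, and the model includes no executor, threads, or futures.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model: the predicate is checked before each run, including the first.
template <typename Graph, typename Pred>
void run_until_model(Graph&& graph, Pred&& pred) {
  while (!pred()) graph();
}

// run_n expressed through run_until with a counting predicate.
template <typename Graph>
void run_n_model(Graph&& graph, std::size_t n) {
  run_until_model(graph, [n]() mutable { return n-- == 0; });
}
```

Under this model, `[counter=5]() mutable { return counter-- == 0; }` yields five runs. The pre-decrement form `--counter == 0` yields only four. Without `mutable`, neither lambda would compile, because a by-copy capture is const inside the lambda body.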

Leverage Standard Parallel Algorithms

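Taskflow defines algorithms that let you quickly express common parallel patterns, such as parallel iteration, parallel reduction, and parallel sort, using standard C++ syntax.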

// standard parallel CPU algorithms
kthread::Task task1 = taskflow.for_each(  // assign each element to 100 in parallel
  first, last, [] (auto& i) { i = 100; }
);
kthread::Task task2 = taskflow.reduce(  // reduce a range of items in parallel
  first, last, init, [] (auto a, auto b) { return a + b; }
);
kthread::Task task3 = taskflow.sort(  // sort a range of items in parallel
  first, last, [] (auto a, auto b) { return a < b; }
);

// standard parallel GPU algorithms
kthread::cudaTask cuda1 = cudaflow.for_each(  // assign each element to 100 on GPU
  dfirst, dlast, [] __device__ (auto& i) { i = 100; }  // by reference, so the write sticks
);
kthread::cudaTask cuda2 = cudaflow.reduce(  // reduce a range of items on GPU
  dfirst, dlast, init, [] __device__ (auto a, auto b) { return a + b; }
);
kthread::cudaTask cuda3 = cudaflow.sort(  // sort a range of items on GPU
  dfirst, dlast, [] __device__ (auto a, auto b) { return a < b; }
);
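The algorithm calls above hide how the range is partitioned. As a rough illustration of what a parallel reduction involves, the sketch below splits a random-access range into one contiguous chunk per thread. It reduces each chunk on its own `std::thread` and then folds the partial results into `init`. This is not kthread's implementation: `parallel_reduce` is a hypothetical name, the operator is assumed to be associative, and the GPU (`cudaflow`) variants are not modeled.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <thread>
#include <vector>

// Hypothetical chunked reduction. Partial results are combined left to right,
// so op must be associative but need not be commutative.
template <typename It, typename T, typename BinOp>
T parallel_reduce(It first, It last, T init, BinOp op, unsigned num_threads) {
  assert(num_threads > 0);
  const auto n = static_cast<std::size_t>(std::distance(first, last));
  if (n == 0) return init;
  const std::size_t chunks = std::min<std::size_t>(num_threads, n);
  // Slot c is overwritten by chunk c. (T = bool would race here, because
  // std::vector<bool> packs elements into shared bits.)
  std::vector<T> partial(chunks, init);
  std::vector<std::thread> threads;
  for (std::size_t c = 0; c < chunks; ++c) {
    threads.emplace_back([=, &partial] {
      // Balanced split: chunk c covers [c*n/chunks, (c+1)*n/chunks), never empty.
      auto it = std::next(first, static_cast<std::ptrdiff_t>(c * n / chunks));
      auto end = std::next(first, static_cast<std::ptrdiff_t>((c + 1) * n / chunks));
      T acc = *it;
      for (++it; it != end; ++it) acc = op(acc, *it);
      partial[c] = acc;
    });
  }
  for (auto& t : threads) t.join();
  T result = init;
  for (const T& p : partial) result = op(result, p);
  return result;
}
```

Summing 1..1000 with four threads gives 500500, the same result as a sequential `std::accumulate`. A parallel `for_each` follows the same chunking scheme without the final combine step.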

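In addition, Taskflow provides composable graph building blocks that let you efficiently implement common parallel algorithms, such as a parallel pipeline.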

// create a pipeline to propagate five tokens through three serial stages
constexpr size_t num_parallel_lines = 4;  // example value
std::array<int, num_parallel_lines> buffer{};

kthread::Pipeline pl(num_parallel_lines,
  kthread::Pipe{kthread::PipeType::SERIAL, [&buffer](kthread::Pipeflow& pf) {
    // stage 1: generate tokens 0..4, then stop the pipeline
    if(pf.token() == 5) {
      pf.stop();
    }
    else {
      buffer[pf.line()] = static_cast<int>(pf.token());  // hand the token to later stages
    }
  }},
  kthread::Pipe{kthread::PipeType::SERIAL, [&buffer](kthread::Pipeflow& pf) {
    printf("stage 2: input buffer[%zu] = %d\n", pf.line(), buffer[pf.line()]);
  }},
  kthread::Pipe{kthread::PipeType::SERIAL, [&buffer](kthread::Pipeflow& pf) {
    printf("stage 3: input buffer[%zu] = %d\n", pf.line(), buffer[pf.line()]);
  }}
);
taskflow.composed_of(pl);
executor.run(taskflow).wait();
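A pipeline lets different stages work on different tokens at the same time, while each serial stage still sees tokens in order. The sketch below shows that idea with one thread per stage, connected by blocking queues. This is a different scheduling strategy from kthread's line-based `Pipeline`: it has no `num_parallel_lines` and no `Pipeflow`. `Channel` and `run_three_stage_pipeline` are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical unbounded blocking queue; std::nullopt marks the end of the stream.
template <typename T>
class Channel {
 public:
  void send(std::optional<T> value) {
    {
      std::lock_guard<std::mutex> lock(m_);
      q_.push(std::move(value));
    }
    cv_.notify_one();
  }
  std::optional<T> receive() {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [&] { return !q_.empty(); });
    std::optional<T> value = std::move(q_.front());
    q_.pop();
    return value;
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::queue<std::optional<T>> q_;
};

// Stage 1 generates tokens, stage 2 transforms them, stage 3 collects them.
// While stage 2 works on token k, stage 1 can already produce token k + 1.
std::vector<int> run_three_stage_pipeline(int num_tokens) {
  Channel<int> s1_to_s2, s2_to_s3;
  std::vector<int> results;  // touched only by stage 3 until the joins below
  std::thread stage1([&] {
    for (int token = 0; token < num_tokens; ++token) s1_to_s2.send(token);
    s1_to_s2.send(std::nullopt);  // similar in spirit to pf.stop()
  });
  std::thread stage2([&] {
    while (auto v = s1_to_s2.receive()) s2_to_s3.send(*v * 10);
    s2_to_s3.send(std::nullopt);
  });
  std::thread stage3([&] {
    while (auto v = s2_to_s3.receive()) results.push_back(*v);
  });
  stage1.join();
  stage2.join();
  stage3.join();
  return results;
}
```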

Supported Compilers

To use Taskflow, you only need a compiler that supports C++17:

  • GNU C++ Compiler at least v8.4 with -std=c++17
  • Clang C++ Compiler at least v6.0 with -std=c++17
  • Microsoft Visual Studio at least v19.27 with /std:c++17
  • AppleClang Xcode Version at least v12.0 with -std=c++17
  • Nvidia CUDA Toolkit and Compiler (nvcc) at least v11.1 with -std=c++17
  • Intel C++ Compiler at least v19.0.1 with -std=c++17
  • Intel DPC++ Clang Compiler at least v13.0.0 with -std=c++17 and SYCL20

Taskflow works on Linux, Windows, and Mac OS X.

Although Taskflow primarily supports C++17, you can enable C++20 compilation with -std=c++20 to take advantage of new C++20 features for better performance.
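You can place a guard like the following in a project header so that a build fails early when a translation unit is not compiled as C++17. It is a hypothetical snippet, not part of kthread. It reads `_MSVC_LANG` on MSVC because MSVC reports `__cplusplus` as 199711L unless `/Zc:__cplusplus` is passed.

```cpp
// Hypothetical macro name; selects the language-version macro this compiler
// reports reliably.
#if defined(_MSVC_LANG)
#define KT_EXAMPLE_CPLUSPLUS _MSVC_LANG
#else
#define KT_EXAMPLE_CPLUSPLUS __cplusplus
#endif

static_assert(KT_EXAMPLE_CPLUSPLUS >= 201703L,
              "a C++17 (or newer) compiler mode is required");

// True only when the compiler reports the final C++20 value (202002L).
constexpr bool kCompiledAsCpp20 = KT_EXAMPLE_CPLUSPLUS >= 202002L;
```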