任务流
Taskflow 帮助您用现代 C++ 快速编写并行和异构任务程序。
任务流是构建在pthread或者kthread基础上的任务调度框架。底层线程基于pthread或者kthread。
为什么使用任务流?
在处理复杂的并行工作负载方面,Taskflow 比许多现有的任务编程框架更快、更具表现力,而且更容易实现嵌入式集成。
Taskflow 让您可以快速实施任务分解策略,该策略结合了常规和不规则的计算模式,并结合高效的 工作窃取 调度程序来优化您的多线程性能。
| Static Tasking |
|---|
| Subflow Tasking |
Taskflow 支持条件任务,让您能够跨相关任务快速做出控制流决策,从而实现使用现有工具难以实现的循环和条件。
| Conditional Tasking |
|---|
Taskflow 是可组合的。您可以通过组合模块化和可重复使用的块来创建大型并行图,这些块更容易在单个范围内进行优化。
| Taskflow Composition |
|---|
启动您的第一个 Taskflow 程序
以下程序(simple.cpp)创建四个任务
A、B、C 和 D,其中 A 在 B 和 C 之前运行,而 D 在 B 和 C 之后运行。
当 A 完成时,B 和 C 可以并行运行。
在现场试用 Compiler Explorer (godbolt)!
#include <kthread/kthread.h> // Taskflow is header-only
int main(){
kthread::TaskExecutor executor;
kthread::Taskflow taskflow;
auto [A, B, C, D] = taskflow.emplace( // create four tasks
[] () { std::cout << "TaskA\n"; },
[] () { std::cout << "TaskB\n"; },
[] () { std::cout << "TaskC\n"; },
[] () { std::cout << "TaskD\n"; }
);
A.precede(B, C); // A runs before B and C
D.succeed(B, C); // D runs after B and C
executor.run(taskflow).wait();
return 0;
}
Visualize Your First Taskflow Program
Taskflow comes with a built-in profiler, for you to profile and visualize taskflow programs in an easy-to-use web-based interface.
# run the program with the environment variable TF_ENABLE_PROFILER enabled
~$ KTHREAD_ENABLE_PROFILER=simple.json ./simple
~$ cat simple.json
[
{"executor":"0","data":[{"worker":0,"level":0,"data":[{"span":[172,186],"name":"0_0","type":"static"},{"span":[187,189],"name":"0_1","type":"static"}]},{"worker":2,"level":0,"data":[{"span":[93,164],"name":"2_0","type":"static"},{"span":[170,179],"name":"2_1","type":"static"}]}]}
]
In addition to execution diagram, you can dump the graph to a DOT format and visualize it using a number of free [GraphViz][GraphViz] tools.
// dump the taskflow graph to a DOT format through std::cout
taskflow.dump(std::cout);
Express 任务图并行性
Taskflow 为用户提供了静态和动态任务图构造,以在嵌入图内控制流的任务图中表达端到端并行性。
创建子流图
Taskflow 支持动态任务,让您从任务执行中创建子流程图 以执行动态并行。 以下程序生成以任务“B”为父级的任务依赖关系图。
kthread::Task A = taskflow.emplace([](){}).name("A");
kthread::Task C = taskflow.emplace([](){}).name("C");
kthread::Task D = taskflow.emplace([](){}).name("D");
kthread::Task B = taskflow.emplace([] (kthread::Subflow& subflow) {
kthread::Task B1 = subflow.emplace([](){}).name("B1");
kthread::Task B2 = subflow.emplace([](){}).name("B2");
kthread::Task B3 = subflow.emplace([](){}).name("B3");
B3.succeed(B1, B2); // B3 runs after B1 and B2
}).name("B");
A.precede(B, C); // A runs before B and C
D.succeed(B, C); // D runs after B and C
将控制流集成到任务图
Taskflow 支持条件任务,让您能够 跨相关任务快速做出控制流决策,从而在端到端任务图中实现循环和条件。
kthread::Task init = taskflow.emplace([](){}).name("init");
kthread::Task stop = taskflow.emplace([](){}).name("stop");
// creates a condition task that returns a random binary
kthread::Task cond = taskflow.emplace(
[](){ return std::rand() % 2; }
).name("cond");
init.precede(cond);
// creates a feedback loop {0: cond, 1: stop}
cond.precede(cond, stop);
编写任务图
Taskflow 是可组合的。 您可以通过组合模块化和可重复使用的块来创建大型并行图,这些块更容易在单个范围内进行优化。
kthread::Taskflow f1, f2;
// create taskflow f1 of two tasks
kthread::Task f1A = f1.emplace([]() { std::cout << "Task f1A\n"; })
.name("f1A");
kthread::Task f1B = f1.emplace([]() { std::cout << "Task f1B\n"; })
.name("f1B");
// create taskflow f2 with one module task composed of f1
kthread::Task f2A = f2.emplace([]() { std::cout << "Task f2A\n"; })
.name("f2A");
kthread::Task f2B = f2.emplace([]() { std::cout << "Task f2B\n"; })
.name("f2B");
kthread::Task f2C = f2.emplace([]() { std::cout << "Task f2C\n"; })
.name("f2C");
kthread::Task f1_module_task = f2.composed_of(f1)
.name("module");
f1_module_task.succeed(f2A, f2B)
.precede(f2C);
启动异步任务
Taskflow 支持异步任务。 您可以异步启动任务以动态探索任务图并行性。
kthread::TaskExecutor executor;
// create asynchronous tasks directly from an executor
std::future<int> future = executor.async([](){
std::cout << "async task returns 1\n";
return 1;
});
executor.silent_async([](){ std::cout << "async task does not return\n"; });
// create asynchronous tasks with dynamic dependencies
kthread::AsyncTask A = executor.silent_dependent_async([](){ printf("A\n"); });
kthread::AsyncTask B = executor.silent_dependent_async([](){ printf("B\n"); }, A);
kthread::AsyncTask C = executor.silent_dependent_async([](){ printf("C\n"); }, A);
kthread::AsyncTask D = executor.silent_dependent_async([](){ printf("D\n"); }, B, C);
executor.wait_for_all();
执行任务流
执行器提供了几种线程安全方法来运行任务流。
您可以运行任务流一次、多次或直到满足停止条件。
这些方法是非阻塞的,并返回kthread::TaskFuture<void>,
以便您查询执行状态。
// runs the taskflow once
kthread::TaskFuture<void> run_once = executor.run(taskflow);
// wait on this run to finish
run_once.get();
// run the taskflow four times
executor.run_n(taskflow, 4);
// runs the taskflow five times
executor.run_until(taskflow, [counter=5](){ return --counter == 0; });
// block the executor until all submitted taskflows complete
executor.wait_for_all();
利用标准并行算法
Taskflow 为您定义算法,以便您使用标准 C++ 语法快速表达常见的并行 模式, 例如并行迭代、并行缩减和并行排序。
// standard parallel CPU algorithms
kthread::Task task1 = taskflow.for_each( // assign each element to 100 in parallel
first, last, [] (auto& i) { i = 100; }
);
kthread::Task task2 = taskflow.reduce( // reduce a range of items in parallel
first, last, init, [] (auto a, auto b) { return a + b; }
);
kthread::Task task3 = taskflow.sort( // sort a range of items in parallel
first, last, [] (auto a, auto b) { return a < b; }
);
// standard parallel GPU algorithms
kthread::cudaTask cuda1 = cudaflow.for_each( // assign each element to 100 on GPU
dfirst, dlast, [] __device__ (auto i) { i = 100; }
);
kthread::cudaTask cuda2 = cudaflow.reduce( // reduce a range of items on GPU
dfirst, dlast, init, [] __device__ (auto a, auto b) { return a + b; }
);
kthread::cudaTask cuda3 = cudaflow.sort( // sort a range of items on GPU
dfirst, dlast, [] __device__ (auto a, auto b) { return a < b; }
);
此外,Taskflow 还提供可组合的图形构建块,以便您高效地实现常见的并行算法,例如并行管道。
// create a pipeline to propagate five tokens through three serial stages
kthread::Pipeline pl(num_parallel_lines,
kthread::Pipe{kthread::PipeType::SERIAL, [](kthread::Pipeflow& pf) {
if(pf.token() == 5) {
pf.stop();
}
}},
kthread::Pipe{kthread::PipeType::SERIAL, [](kthread::Pipeflow& pf) {
printf("stage 2: input buffer[%zu] = %d\n", pf.line(), buffer[pf.line()]);
}},
kthread::Pipe{kthread::PipeType::SERIAL, [](kthread::Pipeflow& pf) {
printf("stage 3: input buffer[%zu] = %d\n", pf.line(), buffer[pf.line()]);
}}
);
taskflow.composed_of(pl)
executor.run(taskflow).wait();
支持的编译器
要使用 Taskflow,您只需要一个支持 C++17 的编译器:
- GNU C++ 编译器至少 v8.4 且带有 -std=c++17
- Clang C++ 编译器至少 v6.0 且带有 -std=c++17
- Microsoft Visual Studio 至少 v19.27 且带有 /std:c++17
- AppleClang Xcode 版本至少 v12.0 且带有 -std=c++17
- Nvidia CUDA 工具包和编译器 (nvcc) 至少 v11.1 且带有 -std=c++17
- Intel C++ 编译器至少 v19.0.1 且带有 -std=c++17
- Intel DPC++ Clang 编译器至少 v13.0.0 且带有 -std=c++17 和 SYCL20
Taskflow 适用于 Linux、Windows 和 Mac OS X。
尽管 %Taskflow 主要支持 C++17,但您可以启用通过 -std=c++20 进行 C++20 编译,以利用新的 C++20 特性实现更好的性能。