printPlanWithStats
Pollux 在查询执行期间收集大量有价值的统计信息。这些计数器通过 Task::taskStats() API 公开,以便通过编程访问,并可以以人性化的格式打印出来,方便手动检查。我们使用这些数据来推断查询执行的动态并排查性能问题。
如果您熟悉 Presto,那么下面描述的工具与通过 bunnylol presto <query-id> 提供的 PrestoQueryLookup 工具非常相似。
PlanNode::toString()
PlanNode::toString() 方法将查询计划打印为计划节点树。此 API 可在执行查询之前或之后使用。
PlanNode::toString() 方法接受两个可选标志:detailed 和 recursive。 当 detail 为 true 时,输出将包含每个计划节点的额外详细信息。 当 recursive 为 true 时,输出将包含整个计划树,否则仅显示单个计划节点。
在“detailed”模式下,Project 节点显示投影表达式,Filter 节点显示过滤表达式,Join 节点显示连接类型和连接键,Aggregation 节点显示分组键和聚合函数,OrderBy 节点显示排序键和顺序等。
Let’s use a simple join query as an example:
plan->toString(false /*detailed*/, true /*recursive*/) 使用计划节点名称打印计划树:
-> Project
-> HashJoin
-> TableScan
-> Project
-> Values
plan->toString(true /*detailed*/, true /*recursive*/) 向每个计划节点添加计划节点详细信息。
-> Project[expressions: (c0:INTEGER, ROW["c0"]), (p1:BIGINT, plus(ROW["c1"],1)), (p2:BIGINT, plus(ROW["c1"],ROW["u_c1"])), ]
-> HashJoin[INNER c0=u_c0]
-> TableScan[]
-> Project[expressions: (u_c0:INTEGER, ROW["c0"]), (u_c1:BIGINT, ROW["c1"]), ]
-> Values[100 rows in 1 vectors]
我们还来看一个聚合查询:
plan->toString(false /*detailed*/, true /*recursive*/)
-> Aggregation
-> TableScan
plan->toString(true /*detailed*/, true /*recursive*/)
-> Aggregation[PARTIAL [c5] a0 := max(ROW["c0"]), a1 := sum(ROW["c1"]), a2 := sum(ROW["c2"]), a3 := sum(ROW["c3"]), a4 := sum(ROW["c4"])]
-> TableScan[]
printPlanWithStats()
printPlanWithStats() 函数打印带有运行时统计信息注释的查询计划。此函数可在查询处理完成后使用。它接受一个根计划节点和一个 TaskStats 结构体。
默认情况下,printPlanWithStats 会显示每个计划节点的输出行数、CPU 时间、使用的线程数以及峰值内存使用情况。
printPlanWithStats(*plan, task->taskStats())
-> Project[expressions: (c0:INTEGER, ROW["c0"]), (p1:BIGINT, plus(ROW["c1"],1)), (p2:BIGINT, plus(ROW["c1"],ROW["u_c1"]))]
Output: 2000 rows (154.98KB), Cpu time: 907.80us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (27.24us/872.82us/7.74us)
-> HashJoin[INNER c0=u_c0]
Output: 2000 rows (136.88KB), Cpu time: 508.74us, Blocked wall time: 117.00us, Peak memory: 2.00MB, CPU breakdown: I/O/F (177.87us/329.20us/1.66us)
HashBuild: Input: 100 rows (1.31KB), Output: 0 rows (0B), Cpu time: 41.77us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (40.18us/1.59us/0ns)
HashProbe: Input: 2000 rows (118.12KB), Output: 2000 rows (136.88KB), Cpu time: 466.97us, Blocked wall time: 117.00us, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (137.69us/327.61us/1.66us)
-> TableScan[Table: Orders]
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 8.89ms, Blocked wall time: 5.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20, CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
-> Project[expressions: (u_c0:INTEGER, ROW["c0"]), (u_c1:BIGINT, ROW["c1"])]
Output: 100 rows (1.31KB), Cpu time: 43.22us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (691ns/5.54us/36.98us)
-> Values[100 rows in 1 vectors]
Input: 0 rows (0B), Output: 100 rows (1.31KB), Cpu time: 3.05us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (0ns/2.48us/568ns)
启用 includeCustomStats 标志后,printPlanWithStats 会为每个计划节点添加特定于运算符的统计信息,例如,连接键的不同值数量、 表扫描中跳过的行组数量、从缓存读取的数据量以及 表扫描中的存储空间、通过聚合下推到扫描中处理的行数等等。
以下是上述连接查询的输出。
printPlanWithStats(*plan, task->taskStats(), true) 显示自定义运算符的统计信息。
-> Project[expressions: (c0:INTEGER, ROW["c0"]), (p1:BIGINT, plus(ROW["c1"],1)), (p2:BIGINT, plus(ROW["c1"],ROW["u_c1"]))]
Output: 2000 rows (154.98KB), Cpu time: 907.80us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (27.24us/872.82us/7.74us)
dataSourceLazyWallNanos sum: 473.00us, count: 20, min: 11.00us, max: 96.00us
-> HashJoin[INNER c0=u_c0]
Output: 2000 rows (136.88KB), Cpu time: 508.74us, Blocked wall time: 223.00us, Peak memory: 2.00MB, CPU breakdown: I/O/F (177.87us/329.20us/1.66us)
HashBuild: Input: 100 rows (1.31KB), Output: 0 rows (0B), Cpu time: 41.77us, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (40.18us/1.59us/0ns)
distinctKey0 sum: 101, count: 1, min: 101, max: 101
queuedWallNanos sum: 125.00us, count: 1, min: 125.00us, max: 125.00us
rangeKey0 sum: 200, count: 1, min: 200, max: 200
HashProbe: Input: 2000 rows (118.12KB), Output: 2000 rows (136.88KB), Cpu time: 466.97us, Blocked wall time: 223.00us, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (137.69us/327.61us/1.66us)
dynamicFiltersProduced sum: 1, count: 1, min: 1, max: 1
queuedWallNanos sum: 24.00us, count: 1, min: 24.00us, max: 24.00us
-> TableScan[Table: Orders]
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 8.89ms, Blocked wall time: 10.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20, CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
dataSourceWallNanos sum: 2.52ms, count: 40, min: 12.00us, max: 250.00us
dynamicFiltersAccepted sum: 1, count: 1, min: 1, max: 1
localReadBytes sum: 0B, count: 1, min: 0B, max: 0B
numLocalRead sum: 0, count: 1, min: 0, max: 0
numPrefetch sum: 28, count: 1, min: 28, max: 28
numRamRead sum: 0, count: 1, min: 0, max: 0
numStorageRead sum: 140, count: 1, min: 140, max: 140
prefetchBytes sum: 29.51KB, count: 1, min: 29.51KB, max: 29.51KB
queuedWallNanos sum: 29.00us, count: 1, min: 29.00us, max: 29.00us
ramReadBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplitBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplits sum: 0, count: 1, min: 0, max: 0
skippedStrides sum: 0, count: 1, min: 0, max: 0
storageReadBytes sum: 150.25KB, count: 1, min: 150.25KB, max: 150.25KB
totalScanTime sum: 0ns, count: 1, min: 0ns, max: 0ns
totalRemainingFilterTime sum: 0ns, count: 1, min: 0ns, max: 0ns
queryThreadIoLatency sum: 0, count: 1, min: 0, max: 0
-> Project[expressions: (u_c0:INTEGER, ROW["c0"]), (u_c1:BIGINT, ROW["c1"])]
Output: 100 rows (1.31KB), Cpu time: 43.22us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (691ns/5.54us/36.98us)
-> Values[100 rows in 1 vectors]
Input: 0 rows (0B), Output: 100 rows (1.31KB), Cpu time: 3.05us, Blocked wall time: 0ns, Peak memory: 0B, Threads: 1, CPU breakdown: I/O/F (0ns/2.48us/568ns)
这是上面聚合查询的输出。
printPlanWithStats(*plan, task->taskStats()) 显示基本统计数据:
-> Aggregation[PARTIAL [c5] a0 := max(ROW["c0"]), a1 := sum(ROW["c1"]), a2 := sum(ROW["c2"]), a3 := sum(ROW["c3"]), a4 := sum(ROW["c4"])]
Output: 849 rows (84.38KB), Cpu time: 1.96ms, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (1.38ms/579.12us/6.82us)
-> TableScan[Table: hive_table]
Input: 10000 rows (0B), Output: 10000 rows (0B), Cpu time: 2.89ms, Blocked wall time: 25.00us, Peak memory: 1.00MB, Threads: 1, Splits: 1, CPU breakdown: I/O/F (0ns/2.89ms/3.35us)
printPlanWithStats(*plan, task->taskStats(), true) 包括自定义统计数据:
-> Aggregation[PARTIAL [c5] a0 := max(ROW["c0"]), a1 := sum(ROW["c1"]), a2 := sum(ROW["c2"]), a3 := sum(ROW["c3"]), a4 := sum(ROW["c4"])]
Output: 849 rows (84.38KB), Cpu time: 1.96ms, Blocked wall time: 0ns, Peak memory: 1.00MB, Threads: 1, CPU breakdown: I/O/F (1.38ms/579.12us/6.82us)
-> TableScan[Table: hive_table]
Input: 10000 rows (0B), Output: 10000 rows (0B), Cpu time: 2.89ms, Blocked wall time: 30.00us, Peak memory: 1.00MB, Threads: 1, Splits: 1, CPU breakdown: I/O/F (0ns/2.89ms/3.35us)
dataSourceLazyWallNanos sum: 1.07ms, count: 7, min: 92.00us, max: 232.00us
dataSourceWallNanos sum: 329.00us, count: 2, min: 48.00us, max: 281.00us
loadedToValueHook sum: 50000, count: 5, min: 10000, max: 10000
localReadBytes sum: 0B, count: 1, min: 0B, max: 0B
numLocalRead sum: 0, count: 1, min: 0, max: 0
numPrefetch sum: 2, count: 1, min: 2, max: 2
numRamRead sum: 0, count: 1, min: 0, max: 0
numStorageRead sum: 7, count: 1, min: 7, max: 7
prefetchBytes sum: 31.13KB, count: 1, min: 31.13KB, max: 31.13KB
queuedWallNanos sum: 101.00us, count: 1, min: 101.00us, max: 101.00us
ramReadBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplitBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplits sum: 0, count: 1, min: 0, max: 0
skippedStrides sum: 0, count: 1, min: 0, max: 0
storageReadBytes sum: 61.53KB, count: 1, min: 61.53KB, max: 61.53KB
totalScanTime sum: 0ns, count: 1, min: 0ns, max: 0ns
totalRemainingFilterTime sum: 0ns, count: 1, min: 0ns, max: 0ns
queryThreadIoLatency sum: 0, count: 1, min: 0, max: 0
Common operator statistics
让我们仔细看看所有运算符收集的统计数据。
对于每个运算符,Pollux 都会跟踪输入行总数、输出行总数、 它们的预估大小、CPU 时间、阻塞挂钟时间以及运行该运算符所使用的线程数。
-> TableScan[Table: Orders]
Input: 2000 rows (118.12KB), Raw Input: 20480 rows (72.31KB), Output: 2000 rows (118.12KB), Cpu time: 8.89ms, Blocked wall time: 10.00us, Peak memory: 1.00MB, Threads: 1, Splits: 20, CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
printPlanWithStats 显示每个计划节点的输出行数和大小,并显示叶节点和扩展到多个运算符的节点的输入行数和大小。显示其他节点的输入行是多余的,因为输入行数等于直接子计划节点的输出行数。
Input: 2000 rows (118.12KB), Output: 2000 rows (118.12KB)
当使用过滤器的 TableScan 修剪行时,Pollux 会报告原始输入行的数量及其总大小。这些行是在应用下推过滤器之前处理的。 TableScan 还会报告分配的分片数量。
Raw Input: 20480 rows (72.31KB), Splits: 20
Pollux 还会测量每个算子的 CPU 时间和峰值内存使用情况。所有计划节点都会显示这些信息。
Cpu time: 8.89ms, Peak memory: 1.00MB
您还可以将 CPU 时间细分为运算符的 addInput、getOutput 和 finish 阶段。 下面的 I/O/F 是 addInput/getOutput/finish 的快捷方式。
CPU breakdown: I/O/F (0ns/8.88ms/4.93us)
某些算子(例如 TableScan 和 HashProbe)可能会因等待拆分或哈希表而被 阻塞。Pollux 会记录算子被阻塞的总时间,并且 printPlanWithStats 会将此信息显示为“阻塞时间”。
Blocked wall time: 10.00us
Custom operator statistics
操作符还会收集并报告特定于操作符的统计信息。
TableScan 操作符会报告统计信息,显示从缓存和持久存储中读取了多少数据、预取了多少数据、以及通过基于统计信息的修剪跳过了多少文件和行组。
-> TableScan[Table = Orders]
localReadBytes sum: 0B, count: 1, min: 0B, max: 0B
numLocalRead sum: 0, count: 1, min: 0, max: 0
numPrefetch sum: 28, count: 1, min: 28, max: 28
numRamRead sum: 0, count: 1, min: 0, max: 0
numStorageRead sum: 140, count: 1, min: 140, max: 140
prefetchBytes sum: 29.51KB, count: 1, min: 29.51KB, max: 29.51KB
ramReadBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplitBytes sum: 0B, count: 1, min: 0B, max: 0B
skippedSplits sum: 0, count: 1, min: 0, max: 0
skippedStrides sum: 0, count: 1, min: 0, max: 0
storageReadBytes sum: 150.25KB, count: 1, min: 150.25KB, max: 150.25KB
totalScanTime sum: 0ns, count: 1, min: 0ns, max: 0ns
totalRemainingFilterTime sum: 0ns, count: 1, min: 0ns, max: 0ns
queryThreadIoLatency sum: 0, count: 1, min: 0, max: 0
HashBuild 运算符报告连接键的不同值的范围和数量。
-> HashJoin[INNER c0=u_c0]
HashBuild:
rangeKey0 sum: 200, count: 1, min: 200, max: 200
distinctKey0 sum: 101, count: 1, min: 101, max: 101
HashProbe 运算符报告它是否生成了动态过滤器,而 TableScan 运算符报告它是否收到了从连接中推送下来的动态过滤器。
-> HashJoin[INNER c0=u_c0]
HashProbe:
dynamicFiltersProduced sum: 1, count: 1, min: 1, max: 1
-> TableScan[]
dynamicFiltersAccepted sum: 1, count: 1, min: 1, max: 1
TableScan 运算符通过将聚合下推到 TableScan 来显示处理了多少行。
loadedToValueHook sum: 50000, count: 5, min: 10000, max: 10000