Tracing
Background
查询跟踪工具有助于分析和调试查询性能和正确性问题。它允许在隔离环境(例如本地机器)中重放部分查询计划和数据集,从 而避免生产环境中外部噪声(例如存储、网络等)的干扰。这对于查询性能分析和问题调试来说更加高效,因为它无需在生产环境中重放整个查询。
How Tracing Tool Works
追踪过程包含两个不同的阶段:追踪阶段和重放阶段。追踪阶段在生产环境中执行,而重放阶段在本地开发环境中进行。
追踪阶段
- 在查询任务启动期间,记录追踪重放所需的元数据,包括查询计划片段、查询配置和连接器属性。
- 在整个查询处理过程中,每个被追踪的算子都会记录输入向量或分片,并将其存储在指定的存储位置。
- 元数据和分片以 JSON 格式序 列化,算子数据输入使用 Presto 序列化器 进行序列化。
重放阶段
- 读取并反序列化已记录的查询计划,提取跟踪的计划节点,并使用自定义的源节点和接收器节点组装计划片段。
- 源节点从存储中序列化的操作符输入读取输入,接收器操作符打印或记录执行状态。
- 使用步骤 1 中组装好的计划片段构建任务。应用已记录的查询配置和连接器属性,使用与生产环境中相同的输入和配置设置重放该任务。
注意:Presto 序列化可能会丢失输入向量编码,例如惰性向量和嵌套字典编码,这会影响操作符的执行。因此,它可能并不总是与生产环境中的相同。
Tracing Framework
追踪框架由三个组件组成:
- 查询追踪写入器:记录元数据、输入数据和扫描分片。
- 查询追踪读取器:加载元数据、读取输入数据和扫描分片。
- 查询追踪重放器:显示查询摘要或重放目标算子的执行情况。
Trace Writers
有三种类型的写入器:TaskTraceMetadataWriter、OperatorTraceInputWriter 和 OperatorTraceSplitWriter。它们用于在生产环境或影子环境中记录真实的 执行数据。
TaskTraceMetadataWriter
TaskTraceMetadataWriter 会在任务创建期间记录查询元数据,并将其序列化,
然后以 JSON 格式保存到文件中。元数据有两种类型:
- 查询配置和连接器属性:这些元数据由用户根据查询指定,可以序列化为 JSON 映射对象(键值对)。
- 任务计划片段(又名计划节点树):可以序列化为 JSON 对象,此功能已在 Pollux 中支持。
元数据在元数据文件中保存为单个 JSON 对象字符串。它看起来类似于以下简化的、打印精美的 JSON 字符串(为简洁起见,删除了一些内容):
{
"planNode":{
"nullAware": false,
"outputType":{...},
"leftKeys":[...],
"rightKeys":[...],
"joinType":"INNER",
"sources":[
{
"outputType":{...},
"tableHandle":{...},
"assignments":[...],
"id":"0",
"name":"TableScanNode"
},
{
"outputType":{...},
"tableHandle":{...},
"assignments":[...],
"id":"1",
"name":"TableScanNode"
}
],
"id":"2",
"name":"HashJoinNode"
},
"connectorProperties":{...},
"queryConfig":{"query_trace_node_ids":"2", ...}
}
OperatorTraceInputWriter
OperatorTraceInputWriter 记录来自目标算子的输入向量,它使用 Presto 序列化器对每个向量批次进行序列化并立即刷新,以确保即使在执行过程中发生崩溃也能重放。
它在目标算子初始化期间创建,并在执行期间将数据写入 Operator::addInput 方法中。它在目标算子关闭时完成。但是,如果记录的数据大小超过用户指定的限制,它可能会提前完成。
OperatorTraceSplitWriter
OperatorTraceSplitWriter 捕获来自目标 TableScan 算子的输入分片。它会序列化每个分片并立即刷新,以确保即使在执行过程中发生崩溃也能重放。
每个分片的序列化方式如下:
| length : uint32_t | split : JSON string | crc32 : uint32_t |
Storage Location
建议将追踪数据存储在远程存储系统中,以确保即使计算集群重新配置或遇到问题,数据也能保存并可访问。这也有助于防止集群中的节点因本地磁盘耗尽而发生故障。
用户应首先创建一个根目录。然后,写入器将在此根目录中创建子目录来组织追踪数据。精心设计的目录结 构将使数据井然有序,并易于重放和分析。
元数据位置
“TaskTraceMetadataWriter”在任务创建期间设置,因此它会创建一个名为“queryId/$taskId”的追踪目录。
输入数据和拆分位置
该任务会生成驱动程序和运算符,每个驱动程序和运算符都由一组 ID 标识。每个驱动程序都会分配一个管道 ID 和一个驱动程序 ID。管道 ID 是从零开始的连续数字, 驱动程序 ID 也是从零开始的连续数字,但作用域限定于特定管道, 以确保该管道内的唯一性。此外,驱动程序中的每个运算符都会分配一个从零开始的连续运算符 ID,并且在驱动程序内是唯一的。
节点 ID 会合并同一跟踪计划节点的跟踪数据。管道 ID 会隔离从同一计划节点创建的不同运算符(例如,HashProbe 和 HashBuild 与 HashJoinNode)之间的 跟踪数据。驱动程序 ID 会隔离同一管道中来自不同驱动程序的对等运算符的跟踪数据。
相应地,为了确保追踪数据的有序且隔离的存储,在算子初始化时会设置 OperatorTraceInputWriter 和 OpeartorTraceSplitWriter,并
在 $rootDir/$queryId/$taskId/$nodeId/$pipelineId/$driverId 中创建数据或拆分追踪目录。
以下是一个典型的 HashJoinNode 追踪元数据和数据存储目录结构:
trace ---------------------------------------------------> rootDir
└── query-1 -------------------------------------------> query ID
└── task-1 ----------------------------------------> task ID
├── 2 -----------------------------------------> node ID
│ ├── 0 -------------------------> pipeline ID (probe)
│ │ ├── 0 -------------------------> driver ID (0)
│ │ │ ├── op_input_trace.data
│ │ │ └── op_trace_summary.json
│ │ └── 1 -------------------------> driver ID (1)
│ │ ├── op_input_trace.data
│ │ └── op_trace_summary.json
│ └── 1 -------------------------> pipeline ID (build)
│ ├── 0 ---------------------------> driver ID (0)
│ │ ├── op_input_trace.data
│ │ └── op_trace_summary.json
│ └── 1 -------------------------> driver ID (1)
│ ├── op_input_trace.data
│ └── op_trace_summary.json
└── task_trace_meta.json ----------------> query metadata
Memory Management
添加一个名为 tracePool 的新叶系统池,用于跟踪内存使用情况,并将其暴露出来,类似 memory::MemoryManager::getInstance()->tracePool()。
Query Trace Readers
三种类型的读取器与查询跟踪写入器相对应:TaskTraceMetadataReader、OperatorTraceInputReader 和
OperatorTraceSplitReader。重放器通常在本地环境中使用它们,这将在查询跟踪重放器部分中详细描述。
TaskTraceMetadataReader
TaskTraceMetadataReader 可以加载查询元数据 JSON 文件,并提取查询配置、连接器属性和计划片段。重放器使用这些信息构建重放任务。
OperatorTraceInputReader
OperatorTraceInputReader 读取并反序列化跟踪数据文件中的输入向量。
它由 QueryTraceScan 算子创建和使用,该算子将在查询跟踪扫描部分中详细介绍。
OperatorTraceSplitReader
OperatorTraceSplitReader 读取并反序列化跟踪拆分信息文件中的输入拆分,
并为查询重放生成一个 exec::Split 列表。
Trace Scan
如追踪的工作原理部分所述,重放非叶子算子需要一个专门的源算子。该算子负责在追踪阶段读取数据记录,并通过自定义的计划节点和算子转换器与 Pollux 的“LocalPlanner”集成。
TraceScanNode
我们引入了一个自定义的“TraceScanNode”来重放非叶子算子。该节点充当源节点,并在重放期间为每个驱动程序创建一个专门的扫描算子,称 为“OperatorTraceScan”。“TraceScanNode”包含指定追踪节点的追踪目录、与其关联的管道 ID 以及用户在重放期间传递的驱动程序 ID 列表,以便 OperatorTraceScan 能够找到正确的追踪输入数据或拆分目录。
OperatorTraceScan
如存储位置部分所述,一个计划节点可以拆分为多个管道, 每个管道可以拆分为多个算子。每个算子对应一个驱动程序,即一个执行线程。单个计划节点可能有多个跟踪数据文件,每个驱动程序一个文件。
为了 识别与特定 OperatorTraceScan 算子关联的正确输入数据文件,它利用 TraceScanNode 提供的跟踪节点目录、流水线 ID 和驱动程序 ID 列表。
在重放过程中,它使用自身的驱动程序 ID 作为索引,从 TraceScanNode 中的驱动程序 ID 列表中提取重放驱动程序 ID。结合 TraceScanNode 中的跟踪
节点目录和流水线 ID,它找到其对应的输入数据文件。
相应地,OperatorTraceScan 算子使用 $rootDir/$queryId/$taskId/$nodeId/$pipelineId/$dirverId 中的跟踪数据文件
来创建 OperatorTraceReader。OperatorTraceScan::getOutput 方法返回其 OperatorTraceInputReader 读取的向量,该向
量的返回顺序与生产执行中最初处理的顺序相同。这确保了重放保持与原始生产执行相同的数据流。
Query Trace Replayer
查询跟踪重放器通常在本地环境中使用,其工作原理如下:
-
使用
TaskTraceMetadataReader加载跟踪的查询配置、连接器属性和计划片段。 -
使用指定的计划节点 ID 从计划片段中提取目标计划节点。
-
使用步骤 2 中的目标计划节点创建重放计划节点。使用
exec::test::PlanBuilder创建重放计划。 -
如果目标计划节点是
TableScanNode,
- 将重放计划节点作为源节点添加到重放计划中。
- 使用
OperatorInputSplitReader获取所有跟踪的分片。 - 将分片用作任务重放的输入。
- 对于非叶子运算符,将
QueryTraceScanNode作为源节点添加到重放计划中, 然后添加重放计划节点。 - 使用
exec::test::AssertQueryBuilder添加接收器节点,应用查询配置(禁用跟踪)和连接器属性,并执行重放计划。
OperatorReplayBase 提供重放算子所需的核心功能。
它负责检索元数据、创建重放计划以及执行计划。
具体的算子重放器(例如 HashJoinReplayer 和 AggregationReplayer)扩展了此基类,并重写了 createPlanNode 方法来创建特定的计划节点。
Query Trace Tool Usage
Enable tracing using configurations in https://facebookincubator.github.io/pollux/configs.html#tracing.
After the traced query finishes, its metadata and the input data for the target tasks and operators
are all saved in the directory specified by query_trace_dir.
To get a glance at the traced task, we can execute the following command:
pollux_query_replayer --root_dir /trace_root --task_id task-4 --summary
它将显示如下内容:
++++++Query trace summary++++++
Number of tasks: 1
++++++Query configs++++++
query_trace_task_reg_exp: .*
query_trace_node_ids: 2
query_trace_max_bytes: 107374182400
query_trace_dir: /tmp/pollux_test_aJqeFd/basic/traceRoot/
query_trace_enabled: 1
++++++Connector configs++++++
++++++Task query plan++++++
-- HashJoin[2][INNER t0=u0] -> t0:BIGINT, t1:VARCHAR, t2:SMALLINT, t3:REAL, u0:BIGINT, u1:INTEGER, u2:SMALLINT, u3:VARCHAR
-- TableScan[0][table: hive_table] -> t0:BIGINT, t1:VARCHAR, t2:SMALLINT, t3:REAL
-- TableScan[1][table: hive_table] -> u0:BIGINT, u1:INTEGER, u2:SMALLINT, u3:VARCHAR
++++++Task Summaries++++++
++++++Task task-1++++++
++++++Pipeline 2++++++
driver 0: opType HashProbe, inputRows 70720, peakMemory 108.00KB
driver 1: opType HashProbe, inputRows 70720, peakMemory 108.00KB
++++++Pipeline 3++++++
driver 0: opType HashBuild, inputRows 48000, peakMemory 4.51MB
driver 1: opType HashBuild, inputRows 48000, peakMemory 2.25MB
然后我们可以使用终端中的以下命令重新执行查询,或者在 IDE 中使用相同的标志进行调试。
pollux_query_replayer --root_dir /Users/bytedance/work/native/trace --query_id query-1 --task_id task-1 --node_id 2
Stats of replaying operator HashBuild : Output: 0 rows (0B, 0 batches), Cpu time: 48.63us, Wall time: 65.22us, Blocked wall time: 24.08ms, Peak memory: 4.51MB, Memory allocations: 16, Threads: 2, CPU breakdown: B/I/O/F (23.79us/0ns/14.46us/10.38us)
Stats of replaying operator HashProbe : Output: 13578240 rows (1.17GB, 13280 batches), Cpu time: 3.99s, Wall time: 4.01s, Blocked wall time: 98.58ms, Peak memory: 108.00KB, Memory allocations: 12534, Threads: 2, CPU breakdown: B/I/O/F (8.52ms/1.59s/2.39s/20.29us)
Memory usage: TaskCursorQuery_0 usage 0B reserved 0B peak 10.00MB
task.test_cursor 1 usage 0B reserved 0B peak 10.00MB
node.2 usage 0B reserved 0B peak 0B
op.2.1.1.OperatorTraceScan usage 0B reserved 0B peak 0B
op.2.1.0.OperatorTraceScan usage 0B reserved 0B peak 0B
node.N/A usage 0B reserved 0B peak 0B
op.N/A.0.1.CallbackSink usage 0B reserved 0B peak 0B
op.N/A.0.0.CallbackSink usage 0B reserved 0B peak 0B
node.1 usage 0B reserved 0B peak 10.00MB
op.1.1.0.HashBuild usage 0B reserved 0B peak 4.51MB
op.1.0.1.HashProbe usage 0B reserved 0B peak 108.00KB
op.1.1.1.HashBuild usage 0B reserved 0B peak 2.25MB
op.1.0.0.HashProbe usage 0B reserved 0B peak 108.00KB
node.0 usage 0B reserved 0B peak 0B
op.0.0.1.OperatorTraceScan usage 0B reserved 0B peak 0B
op.0.0.0.OperatorTraceScan usage 0B reserved 0B peak 0B
Here is a full list of supported command line arguments.
* ``--root_dir``: The root directory where the replayer is reading the traced data, must be set.
* ``--summary``: Show the summary of the tracing including number of tasks and task ids.
It also print the query metadata including query configs, connectors properties, and query plan in JSON format.
* ``--query_id``: Specify the target query ID, it must be set.
* ``--task_id``: Specify the target task ID, it must be set.
* ``--node_id``: Specify the target node ID, it must be set.
* ``--driver_ids``: Specify the target driver IDs to replay.
* ``--shuffle_serialization_format``: Specify the shuffle serialization format.
* ``--table_writer_output_dir``: Specify the output directory of TableWriter.
* ``--hiveConnectorExecutorHwMultiplier``: Hardware multiplier for hive connector.
* ``--driver_cpu_executor_hw_multiplier``: Hardware multipler for driver cpu executor.
* ``--memory_arbitrator_type``: Specify the memory arbitrator type.
* ``--query_memory_capacity_mb``: Specify the query memory capacity limit in MB. If it is zero, then there is no limit.
* ``--copy_results``: If true, copy the replaying result.