Connectors
==========
连接器允许从外部源读取和写入数据。
此概念类似于 Presto 连接器
TableScanNode<TableScanNode> 运算符通过连接器读取外部数据。
TableWriteNode<TableWriteNode> 运算符通过连接器向外部写入数据。
Pollux 中的各种连接器接口如下所述。
Connector Interface
| Interface Name | Description |
|---|---|
| ConnectorSplit | A chunk of data to process. For example, a single file. |
| DataSource | Provides methods to consume and process a split. A DataSource can optionally consume a dynamic filter during execution to prune some rows from the output vector. |
| DataSink | Provides methods to write a Pollux vector externally. |
| Connector | Allows creating instances of a DataSource or a DataSink. |
| Connector Factory | Enables creating instances of a particular connector. |
Pollux 提供开箱即用的 Hive 和 TPC-H 连接器。 下面,我们将详细了解上述连接器接口在 Hive 连接器中的实现方式。
Hive Connector
Hive 连接器用于读取和写入驻留在外部存储(S3、HDFS、GCS、Linux FS)上的数据文件(Parquet、DWRF)。
HiveConnectorSplit
HiveConnectorSplit 使用包括“文件路径”、“文件格式”、“起始位置”、“长度”、“存储格式”等参数来描述数据块。 无需指定与行边界对齐的起始位置和长度值。 例如,在 Parquet 文件中,偏移量在 [起始位置, 长度) 范围内的行组将作为拆分的一部分进行处理。 对于给定的一组文件,用户或应用程序负责定义拆分。
HiveDataSource
HiveDataSource 实现了 addSplit API,该 API 会使用 HiveConnectorSplit。
它会根据文件格式、偏移量和长度创建一个文件读取器。支持的文件格式包括 DWRF 和 Parquet。
next API 会处理拆分并返回一批行。用户可以继续调用 next,直到拆分中的所有行都被完全读取。
HiveDataSource 允许使用 addDynamicFilter API 添加动态过滤器。这支持 :ref:Dynamic Filter Pushdown<DynamicFilterPushdown>。
HiveDataSink
HiveDataSink 将向量写入磁盘上的文件。支持的文件格式包括 DWRF 和 Parquet。
HiveDataSink 的参数还包括列名、排序、分区和 bucketing 信息。
appendData API 根据上述参数实例化文件写入器,并将向量写入磁盘。
HiveConnector
HiveConnector 实现了 createDataSource 连接器 API,用于创建 HiveDataSource 实例。
它还实现了 createDataSink 连接器 API,用于创建 HiveDataSink 实例。
这些 API 的参数之一是 ConnectorQueryCtx,它提供了指定内存池和连接器配置的方法。
HiveConnectorFactory
HiveConnectorFactory 已启用创建 HiveConnector 实例的功能。注册 HiveConnectorFactory 时,需要一个“连接器名称”,
例如“hive”。然后,可以使用 newConnector API 创建多个 HiveConnector 实例,只需指定 connectorId 和此处列出
的连接器配置即可。如果您有多个外部源,并且每个源都需要不同的配置,则需要多个连接器实例。
Storage Adapters
Hive 连接器允许从各种分布式存储系统读取和写入文件。 支持的存储 API 包括 S3、HDFS、GCS 和 Linux FS。
如果读取时未找到文件,openFileForRead API 会抛出 PolluxRuntimeError 并返回 error_code::kFileNotFound。
此行为对于支持 ignore_missing_files 配置属性是必需的。
使用 AWS SDK for C++ <https://github.com/aws/aws-sdk-cpp> 库支持 S3。
S3 支持的方案包括 s3://(Amazon S3、Minio)、s3a://(Hadoop 3.x)、s3n://(Hadoop 3.x 中已弃用)、oss://(阿里云存储)以及 cos:// 和 cosn://(腾讯云存储)。
HDFS 支持使用
Apache Hawk libhdfs3 <https://github.com/apache/hawq/tree/master/depends/libhdfs3> 库。HDFS 支持的方案
为 hdfs://。
GCS 支持使用
Google Cloud Platform C++ 客户端库 <https://github.com/googleapis/google-cloud-cpp> 库。GCS 支持的方案
为 gs://。
ABS(Azure Blob 存储)支持使用
Azure SDK for C++ <https://github.com/Azure/azure-sdk-for-cpp> 库。ABS 支持的方案为 abfs(s)://。
S3 Storage adapter using a proxy
默认情况下,C++ AWS S3 客户端不支持环境变量 http_proxy、https_proxy 和 no_proxy 的配置。
Java AWS S3 客户端支持此功能。
环境变量可以指定为小写、大写或两者兼有。
为了启用代理,必须将 Hive 连接器配置变量 hive.s3.use-proxy-from-env 设置为 true。默认情况下,该值为 false。
启用代理设置后的行为如下:
- 读取 http_proxy/HTTP_PROXY、https_proxy/HTTPS_PROXY 和 no_proxy/NO_PROXY 环境变量。如果同时设置了大小写变量,则小写变量优先。
- 扫描 no_proxy/NO_PROXY 内容以查找精确匹配和后缀匹配。
- 可以在 no_proxy/NO_PROXY 中指定 IP 地址、域名、子域名或 IP 范围 (CIDR)。
- no_proxy/NO_PROXY 列表以逗号分隔。
- 使用 . 或 *. 指示域名后缀匹配,例如
.foobar.com将匹配test.foobar.com或foo.foobar.com。