Skip to main content
Version: 1.1.1

CSV format

读取和写入 CSV 文件

Alkaid 提供快速 CSV 读取器,允许提取外部数据以 创建 Alkaid 表或 Alkaid RecordBatches 流。

CSV 读取器/写入器 API 参考 <cpp-api-csv>

读取 CSV 文件

CSV 文件中的数据既可以使用~alkaid::csv::TableReader 读取为单个 Alkaid 表,也可以使用 ~alkaid::csv::StreamingReader 流式传输为 RecordBatches。请参阅 Tradeoffs <cpp-csv-tradeoffs> 以了解这两种方法之间的权衡。

这两种读取器都需要一个表示输入文件的 alkaid::io::InputStream 实例。可以使用 ~alkaid::csv::ReadOptions~alkaid::csv::ParseOptions~alkaid::csv::ConvertOptions 的组合来定制它们的行为。

TableReader

#include "arrow/csv/api.h"

{
// ...
alkaid::io::IOContext io_context = alkaid::io::default_io_context();
std::shared_ptr<alkaid::io::InputStream> input = ...;

auto read_options = alkaid::csv::ReadOptions::Defaults();
auto parse_options = alkaid::csv::ParseOptions::Defaults();
auto convert_options = alkaid::csv::ConvertOptions::Defaults();

// Instantiate TableReader from input stream and options
auto maybe_reader =
alkaid::csv::TableReader::Make(io_context,
input,
read_options,
parse_options,
convert_options);
if (!maybe_reader.ok()) {
// Handle TableReader instantiation error...
}
std::shared_ptr<alkaid::csv::TableReader> reader = *maybe_reader;

// Read table from CSV file
auto maybe_table = reader->Read();
if (!maybe_table.ok()) {
// Handle CSV read error
// (for example a CSV syntax error or failed type conversion)
}
std::shared_ptr<alkaid::Table> table = *maybe_table;
}

StreamingReader

#include "arrow/csv/api.h"

{
// ...
alkaid::io::IOContext io_context = alkaid::io::default_io_context();
std::shared_ptr<alkaid::io::InputStream> input = ...;

auto read_options = alkaid::csv::ReadOptions::Defaults();
auto parse_options = alkaid::csv::ParseOptions::Defaults();
auto convert_options = alkaid::csv::ConvertOptions::Defaults();

// Instantiate StreamingReader from input stream and options
auto maybe_reader =
alkaid::csv::StreamingReader::Make(io_context,
input,
read_options,
parse_options,
convert_options);
if (!maybe_reader.ok()) {
// Handle StreamingReader instantiation error...
}
std::shared_ptr<alkaid::csv::StreamingReader> reader = *maybe_reader;

// Set aside a RecordBatch pointer for re-use while streaming
std::shared_ptr<RecordBatch> batch;

while (true) {
// Attempt to read the first RecordBatch
alkaid::Status status = reader->ReadNext(&batch);

if (!status.ok()) {
// Handle read error
}

if (batch == NULL) {
// Handle end of file
break;
}

// Do something with the batch
}
}

Tradeoffs

选择使用 ~alkaid::csv::TableReader 还是~alkaid::csv::StreamingReader 最终取决于用例 但需要注意一些权衡:

  1. 内存使用~alkaid::csv::TableReader 会一次性将所有数据加载到内存中,并且根据数据量,可能需要比 ~alkaid::csv::StreamingReader 多得多的内存,后者每次只能加载一个 ~alkaid::RecordBatch。这可能是用户最重要的权衡。

  2. 速度:读取 CSV 的全部内容时,~alkaid::csv::TableReader 往往比 ~alkaid::csv::StreamingReader 更快,因为它可以更好地利用可用内核。有关更多详细信息,请参阅性能 <cpp-csv-performance>

  3. 灵活性~alkaid::csv::StreamingReader 可能被认为不如 ~alkaid::csv::TableReader 灵活, 因为它仅对读入的第一个块执行类型推断,在此之后,类型将被冻结,并且后续块中任何无法转换为这些类型的数据都会导致 错误。请注意,可以通过将 ReadOptions::block_size 设置为足够大的值或使用 ConvertOptions::column_types 明确设置所需的数据类型来解决此问题。

写入 CSV 文件

CSV 文件被写入~alkaid::io::OutputStream

#include <arrow/csv/api.h>
{
// Oneshot write
// ...
std::shared_ptr<alkaid::io::OutputStream> output = ...;
auto write_options = alkaid::csv::WriteOptions::Defaults();
if (WriteCSV(table, write_options, output.get()).ok()) {
// Handle writer error...
}
}
{
// Write incrementally
// ...
std::shared_ptr<alkaid::io::OutputStream> output = ...;
auto write_options = alkaid::csv::WriteOptions::Defaults();
auto maybe_writer = alkaid::csv::MakeCSVWriter(output, schema, write_options);
if (!maybe_writer.ok()) {
// Handle writer instantiation error...
}
std::shared_ptr<alkaid::ipc::RecordBatchWriter> writer = *maybe_writer;

// Write batches...
if (!writer->WriteRecordBatch(*batch).ok()) {
// Handle write error...
}

if (!writer->Close().ok()) {
// Handle close error...
}
if (!output->Close().ok()) {
// Handle file close error...
}
}
info

该编写器尚不支持所有 Alkaid 类型。

列名称

有三种可能的方法可以从 CSV 文件中推断出列名:

  • 默认情况下,列名从 CSV 文件的第一行读取
  • 如果设置了 ReadOptions::column_names,它会强制将表中的列名设置为这些值(CSV文件的第一行被读取为数据)
  • 如果 ReadOptions::autogenerate_column_names 为真,列名将使用模式“f0”、“f1”... 自动生成(CSV 文件的第一行被读取为数据)

列选择

默认情况下,Alkaid 会读取 CSV 文件中的所有列。您可以使用 ConvertOptions::include_columns 选项缩小列的选择范围。 如果 CSV 文件中缺少 ConvertOptions::include_columns 中的某些列,则会发出错误,除非 ConvertOptions::include_missing_columns 为真,在这种情况下,缺失的列被假定为包含全空值。

与列名的交互

如果同时指定了 ReadOptions::column_namesConvertOptions::include_columns,则 ReadOptions::column_names 被假定映射到 CSV 列,而 ConvertOptions::include_columns 是这些列名的子集, 这些列名将成为 Alkaid 表的一部分。

数据类型

默认情况下,CSV 读取器会推断出每列最合适的数据类型。类型推断会按以下顺序考虑以下数据类型:

  • Null
  • Int64
  • Boolean
  • Date32
  • Time32(以秒为单位)
  • Timestamp(以秒为单位)
  • Timestamp(以纳秒为单位)
  • Float64
  • Dictionary<String>(如果 ConvertOptions::auto_dict_encode 为 true)
  • Dictionary<Binary>(如果 ConvertOptions::auto_dict_encode 为 true)
  • String
  • Binary

可以通过设置 ConvertOptions::column_types 选项来覆盖所选列的类型推断。显式数据类型可从以下列表中选择:

  • Null
  • 所有整数类型
  • Float32 和 Float64
  • Decimal128
  • Boolean
  • Date32 和 Date64
  • Time32 和 Time64
  • Timestamp
  • 二进制和大型二进制
  • 字符串和大型字符串(带有可选的 UTF8 输入验证)
  • 固定大小二进制
  • 索引类型为 Int32 且值类型为以下之一的字典:二进制、字符串、LargeBinary、LargeString、Int32、UInt32、 Int64、UInt64、Float32、Float64、Decimal128

其他数据类型不支持从 CSV 值转换,并且会出错。

字典推理

如果启用了类型推断并且 ConvertOptions::auto_dict_encode 为 真,则 CSV 读取器首先尝试将类似字符串的列转换为 字典编码的类似字符串的数组。当达到 ConvertOptions::auto_dict_max_cardinality 中的阈值时,它会切换到普通的类似字符串的数组。

时间戳推断/解析

如果启用了类型推断,CSV 读取器首先尝试将 字符串类列解释为时间戳。如果所有行都有一些时区偏移 (例如 Z+0100),即使偏移不一致,推断的类型也将是 UTC 时间戳。如果没有行具有时区偏移,则推断的类型将为无 时区的时间戳。有/无偏移的行混合将产生字符串列。

如果将类型明确指定为有/无时区的时间戳,则读取器将在该列中没有/有时区偏移的值上出错。请注意,这意 味着目前无法让读取器将没有时区偏移的时间戳列解析为特定时区的本地时间;相反,将该列解析为无时区的时间戳,然后使用assume_timezone 计算函数转换值。

+-------------------+------------------------------+-------------------+
| Specified Type | Input CSV | Result Type |
+===================+==============================+===================+
| (inferred) | ``2021-01-01T00:00:00`` | timestamp[s] |
| +------------------------------+-------------------+
| | ``2021-01-01T00:00:00Z`` | timestamp[s, UTC] |
| +------------------------------+ |
| | ``2021-01-01T00:00:00+0100`` | |
| +------------------------------+-------------------+
| | :: | string |
| | | |
| | 2021-01-01T00:00:00 | |
| | 2021-01-01T00:00:00Z | |
+-------------------+------------------------------+-------------------+
| timestamp[s] | ``2021-01-01T00:00:00`` | timestamp[s] |
| +------------------------------+-------------------+
| | ``2021-01-01T00:00:00Z`` | (error) |
| +------------------------------+ |
| | ``2021-01-01T00:00:00+0100`` | |
| +------------------------------+ |
| | :: | |
| | | |
| | 2021-01-01T00:00:00 | |
| | 2021-01-01T00:00:00Z | |
+-------------------+------------------------------+-------------------+
| timestamp[s, UTC] | ``2021-01-01T00:00:00`` | (error) |
| +------------------------------+-------------------+
| | ``2021-01-01T00:00:00Z`` | timestamp[s, UTC] |
| +------------------------------+ |
| | ``2021-01-01T00:00:00+0100`` | |
| +------------------------------+-------------------+
| | :: | (error) |
| | | |
| | 2021-01-01T00:00:00 | |
| | 2021-01-01T00:00:00Z | |
+-------------------+------------------------------+-------------------+
| timestamp[s, | ``2021-01-01T00:00:00`` | (error) |
| America/New_York] +------------------------------+-------------------+
| | ``2021-01-01T00:00:00Z`` | timestamp[s, |
| +------------------------------+ America/New_York] |
| | ``2021-01-01T00:00:00+0100`` | |
| +------------------------------+-------------------+
| | :: | (error) |
| | | |
| | 2021-01-01T00:00:00 | |
| | 2021-01-01T00:00:00Z | |
+-------------------+------------------------------+-------------------+

空值

空值是根据存储在ConvertOptions::null_values 中的拼写来识别的。ConvertOptions::Defaults 工厂 方法将初始化许多常规空值拼写,例如N/A

字符编码

CSV 文件应采用 UTF8 编码。但是,二进制列可接受非 UTF8 数据。

写入选项

可以通过~alkaid::csv::WriteOptions 自定义写入的 CSV 文件的格式。目前可用的选项很少;未来版本中将添加更多选项。

性能

默认情况下,~alkaid::csv::TableReader 将并行读取,以利用机器上的所有 CPU 内核。您 可以在ReadOptions::use_threads 中更改此设置。合理的预期是高性能台式机或笔记本电脑上每核至少 100MB/s(以源 CSV 字节为单位,而不是目标 Alkaid 数据字节)。