Skip to content

Commit 018b441

Browse files
authored
Cli parquet (#12372)
1 parent e54c9ee commit 018b441

File tree

18 files changed

+407
-178
lines changed

18 files changed

+407
-178
lines changed

ydb/apps/ydb/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Use parquet format instead of CSV to fill tables in `ydb workload` benchmarks
12
* Made `--consumer` flag in `ydb topic read` command optional. Now if this flag is not specified, reading is performed in no-consumer mode. In this mode partition IDs should be specified with `--partition-ids` option.
23
* Fixed a bug in `ydb import file csv` where multiple columns with escaped quotes in the same row were parsed incorrectly
34
* Truncate query results output in benchmarks

ydb/core/grpc_services/rpc_load_rows.cpp

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -460,46 +460,13 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices
460460
case EUploadSource::CSV:
461461
{
462462
auto& data = GetSourceData();
463-
auto& cvsSettings = GetCsvSettings();
464-
ui32 skipRows = cvsSettings.skip_rows();
465-
auto& delimiter = cvsSettings.delimiter();
466-
auto& nullValue = cvsSettings.null_value();
467-
bool withHeader = cvsSettings.header();
468-
469-
auto reader = NFormats::TArrowCSV::Create(SrcColumns, withHeader, NotNullColumns);
463+
auto& csvSettings = GetCsvSettings();
464+
auto reader = NFormats::TArrowCSVScheme::Create(SrcColumns, csvSettings.header(), NotNullColumns);
470465
if (!reader.ok()) {
471466
errorMessage = reader.status().ToString();
472467
return false;
473468
}
474-
const auto& quoting = cvsSettings.quoting();
475-
if (quoting.quote_char().length() > 1) {
476-
errorMessage = TStringBuilder() << "Wrong quote char '" << quoting.quote_char() << "'";
477-
return false;
478-
}
479-
const char qchar = quoting.quote_char().empty() ? '"' : quoting.quote_char().front();
480-
reader->SetQuoting(!quoting.disabled(), qchar, !quoting.double_quote_disabled());
481-
reader->SetSkipRows(skipRows);
482-
483-
if (!delimiter.empty()) {
484-
if (delimiter.size() != 1) {
485-
errorMessage = TStringBuilder() << "Wrong delimiter '" << delimiter << "'";
486-
return false;
487-
}
488-
489-
reader->SetDelimiter(delimiter[0]);
490-
}
491-
492-
if (!nullValue.empty()) {
493-
reader->SetNullValue(nullValue);
494-
}
495-
496-
if (data.size() > NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE) {
497-
ui32 blockSize = NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE;
498-
blockSize *= data.size() / blockSize + 1;
499-
reader->SetBlockSize(blockSize);
500-
}
501-
502-
Batch = reader->ReadSingleBatch(data, errorMessage);
469+
Batch = reader->ReadSingleBatch(data, csvSettings, errorMessage);
503470
if (!Batch) {
504471
return false;
505472
}

ydb/core/io_formats/arrow/csv_arrow.cpp renamed to ydb/core/io_formats/arrow/csv_arrow/csv_arrow.cpp

Lines changed: 68 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
#include "csv_arrow.h"
22

3-
#include <ydb/core/formats/arrow/arrow_helpers.h>
4-
#include <ydb/core/formats/arrow/serializer/stream.h>
5-
3+
#include <contrib/libs/apache/arrow/cpp/src/arrow/array.h>
4+
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
65
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
76
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/value_parsing.h>
87
#include <util/string/join.h>
@@ -43,29 +42,6 @@ class TimestampIntParser: public arrow::TimestampParser {
4342

4443
}
4544

46-
arrow::Result<TArrowCSV> TArrowCSV::Create(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header, const std::set<std::string>& notNullColumns) {
47-
TVector<TString> errors;
48-
TColummns convertedColumns;
49-
convertedColumns.reserve(columns.size());
50-
for (auto& [name, type] : columns) {
51-
const auto arrowType = NArrow::GetArrowType(type);
52-
if (!arrowType.ok()) {
53-
errors.emplace_back("column " + name + ": " + arrowType.status().ToString());
54-
continue;
55-
}
56-
const auto csvArrowType = NArrow::GetCSVArrowType(type);
57-
if (!csvArrowType.ok()) {
58-
errors.emplace_back("column " + name + ": " + csvArrowType.status().ToString());
59-
continue;
60-
}
61-
convertedColumns.emplace_back(TColumnInfo{name, *arrowType, *csvArrowType});
62-
}
63-
if (!errors.empty()) {
64-
return arrow::Status::TypeError(ErrorPrefix() + "columns errors: " + JoinSeq("; ", errors));
65-
}
66-
return TArrowCSV(convertedColumns, header, notNullColumns);
67-
}
68-
6945
TArrowCSV::TArrowCSV(const TColummns& columns, bool header, const std::set<std::string>& notNullColumns)
7046
: ReadOptions(arrow::csv::ReadOptions::Defaults())
7147
, ParseOptions(arrow::csv::ParseOptions::Defaults())
@@ -107,6 +83,27 @@ TArrowCSV::TArrowCSV(const TColummns& columns, bool header, const std::set<std::
10783
SetNullValue(); // set default null value
10884
}
10985

86+
namespace {
87+
88+
template<class TBuilder, class TOriginalArray>
89+
std::shared_ptr<arrow::Array> ConvertArray(std::shared_ptr<arrow::ArrayData> data, ui64 dev) {
90+
auto originalArr = std::make_shared<TOriginalArray>(data);
91+
TBuilder aBuilder;
92+
Y_ABORT_UNLESS(aBuilder.Reserve(originalArr->length()).ok());
93+
for (long i = 0; i < originalArr->length(); ++i) {
94+
if (originalArr->IsNull(i)) {
95+
Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
96+
} else {
97+
aBuilder.UnsafeAppend(originalArr->Value(i) / dev);
98+
}
99+
}
100+
auto res = aBuilder.Finish();
101+
Y_ABORT_UNLESS(res.ok());
102+
return *res;
103+
}
104+
105+
}
106+
110107
std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_ptr<arrow::RecordBatch> parsedBatch) const {
111108
if (!parsedBatch) {
112109
return nullptr;
@@ -134,59 +131,20 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ConvertColumnTypes(std::shared_pt
134131
if (fArr->type()->Equals(originalType)) {
135132
resultColumns.emplace_back(fArr);
136133
} else if (fArr->type()->id() == arrow::TimestampType::type_id) {
137-
arrow::Result<std::shared_ptr<arrow::Array>> arrResult;
138-
{
139-
std::shared_ptr<arrow::TimestampArray> i64Arr = std::make_shared<arrow::TimestampArray>(fArr->data());
140-
if (originalType->id() == arrow::UInt16Type::type_id) {
141-
arrow::UInt16Builder aBuilder;
142-
Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok());
143-
for (long i = 0; i < parsedBatch->num_rows(); ++i) {
144-
if (i64Arr->IsNull(i)) {
145-
Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
146-
} else {
147-
aBuilder.UnsafeAppend(i64Arr->Value(i) / 86400ull);
148-
}
149-
}
150-
arrResult = aBuilder.Finish();
151-
} else if (originalType->id() == arrow::UInt32Type::type_id) {
152-
arrow::UInt32Builder aBuilder;
153-
Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok());
154-
for (long i = 0; i < parsedBatch->num_rows(); ++i) {
155-
if (i64Arr->IsNull(i)) {
156-
Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
157-
} else {
158-
aBuilder.UnsafeAppend(i64Arr->Value(i));
159-
}
160-
}
161-
arrResult = aBuilder.Finish();
162-
} else if (originalType->id() == arrow::Int32Type::type_id) {
163-
arrow::Int32Builder aBuilder;
164-
Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok());
165-
for (long i = 0; i < parsedBatch->num_rows(); ++i) {
166-
if (i64Arr->IsNull(i)) {
167-
Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
168-
} else {
169-
aBuilder.UnsafeAppend(i64Arr->Value(i) / 86400);
170-
}
171-
}
172-
arrResult = aBuilder.Finish();
173-
} else if (originalType->id() == arrow::Int64Type::type_id) {
174-
arrow::Int64Builder aBuilder;
175-
Y_ABORT_UNLESS(aBuilder.Reserve(parsedBatch->num_rows()).ok());
176-
for (long i = 0; i < parsedBatch->num_rows(); ++i) {
177-
if (i64Arr->IsNull(i)) {
178-
Y_ABORT_UNLESS(aBuilder.AppendNull().ok());
179-
} else {
180-
aBuilder.UnsafeAppend(i64Arr->Value(i));
181-
}
182-
}
183-
arrResult = aBuilder.Finish();
184-
} else {
134+
resultColumns.emplace_back([originalType, fArr]() {
135+
switch (originalType->id()) {
136+
case arrow::UInt16Type::type_id: // Date
137+
return ConvertArray<arrow::UInt16Builder, arrow::TimestampArray>(fArr->data(), 86400);
138+
case arrow::UInt32Type::type_id: // Datetime
139+
return ConvertArray<arrow::UInt32Builder, arrow::TimestampArray>(fArr->data(), 1);
140+
case arrow::Int32Type::type_id: // Date32
141+
return ConvertArray<arrow::Int32Builder, arrow::TimestampArray>(fArr->data(), 86400);
142+
case arrow::Int64Type::type_id:// Datetime64, Timestamp64
143+
return ConvertArray<arrow::Int64Builder, arrow::TimestampArray>(fArr->data(), 1);
144+
default:
185145
Y_ABORT_UNLESS(false);
186146
}
187-
}
188-
Y_ABORT_UNLESS(arrResult.ok());
189-
resultColumns.emplace_back(*arrResult);
147+
}());
190148
} else {
191149
Y_ABORT_UNLESS(false);
192150
}
@@ -204,7 +162,7 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
204162
return {};
205163
}
206164

207-
auto buffer = std::make_shared<NArrow::NSerialization::TBufferOverString>(csv);
165+
auto buffer = std::make_shared<arrow::Buffer>(arrow::util::string_view(csv.c_str(), csv.length()));
208166
auto input = std::make_shared<arrow::io::BufferReader>(buffer);
209167
auto res = arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input,
210168
ReadOptions, ParseOptions, ConvertOptions);
@@ -249,11 +207,9 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadNext(const TString& csv, TStr
249207
return {};
250208
}
251209

252-
if (batch && ResultColumns.size()) {
253-
batch = NArrow::TColumnOperator().ErrorIfAbsent().Extract(batch, ResultColumns);
254-
if (!batch) {
255-
errString = ErrorPrefix() + "not all result columns present";
256-
}
210+
if (batch && ResultColumns.size() && batch->schema()->fields().size() != ResultColumns.size()) {
211+
errString = ErrorPrefix() + "not all result columns present";
212+
batch.reset();
257213
}
258214
return batch;
259215
}
@@ -279,5 +235,34 @@ std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadSingleBatch(const TString& cs
279235
}
280236
return batch;
281237
}
238+
std::shared_ptr<arrow::RecordBatch> TArrowCSV::ReadSingleBatch(const TString& csv, const Ydb::Formats::CsvSettings& csvSettings, TString& errString) {
239+
const auto& quoting = csvSettings.quoting();
240+
if (quoting.quote_char().length() > 1) {
241+
errString = ErrorPrefix() + "Wrong quote char '" + quoting.quote_char() + "'";
242+
return {};
243+
}
244+
245+
const char qchar = quoting.quote_char().empty() ? '"' : quoting.quote_char().front();
246+
SetQuoting(!quoting.disabled(), qchar, !quoting.double_quote_disabled());
247+
if (csvSettings.delimiter()) {
248+
if (csvSettings.delimiter().size() != 1) {
249+
errString = ErrorPrefix() + "Invalid delimitr in csv: " + csvSettings.delimiter();
250+
return {};
251+
}
252+
SetDelimiter(csvSettings.delimiter().front());
253+
}
254+
SetSkipRows(csvSettings.skip_rows());
255+
256+
if (csvSettings.null_value()) {
257+
SetNullValue(csvSettings.null_value());
258+
}
259+
260+
if (csv.size() > NKikimr::NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE) {
261+
ui32 blockSize = NKikimr::NFormats::TArrowCSV::DEFAULT_BLOCK_SIZE;
262+
blockSize *= csv.size() / blockSize + 1;
263+
SetBlockSize(blockSize);
264+
}
265+
return ReadSingleBatch(csv, errString);
266+
}
282267

283268
}

ydb/core/io_formats/arrow/csv_arrow.h renamed to ydb/core/io_formats/arrow/csv_arrow/csv_arrow.h

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
#pragma once
22

3-
#include <ydb/core/scheme_types/scheme_type_info.h>
4-
3+
#include <ydb/public/api/protos/ydb_formats.pb.h>
54
#include <contrib/libs/apache/arrow/cpp/src/arrow/csv/api.h>
65
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
6+
#include <util/generic/string.h>
7+
#include <util/generic/vector.h>
8+
#include <set>
9+
#include <vector>
10+
#include <unordered_map>
711

812
namespace NKikimr::NFormats {
913

1014
class TArrowCSV {
1115
public:
1216
static constexpr ui32 DEFAULT_BLOCK_SIZE = 1024 * 1024;
1317

14-
/// If header is true read column names from first line after skipRows. Parse columns as strings in this case.
15-
/// @note It's possible to skip header with skipRows and use typed columns instead.
16-
static arrow::Result<TArrowCSV> Create(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, bool header = false, const std::set<std::string>& notNullColumns = {});
17-
1818
std::shared_ptr<arrow::RecordBatch> ReadNext(const TString& csv, TString& errString);
1919
std::shared_ptr<arrow::RecordBatch> ReadSingleBatch(const TString& csv, TString& errString);
20+
std::shared_ptr<arrow::RecordBatch> ReadSingleBatch(const TString& csv, const Ydb::Formats::CsvSettings& csvSettings, TString& errString);
2021

2122
void Reset() {
2223
Reader = {};
@@ -49,14 +50,20 @@ class TArrowCSV {
4950

5051
void SetNullValue(const TString& null = "");
5152

52-
private:
53+
protected:
5354
struct TColumnInfo {
5455
TString Name;
5556
std::shared_ptr<arrow::DataType> ArrowType;
5657
std::shared_ptr<arrow::DataType>CsvArrowType;
5758
};
5859
using TColummns = TVector<TColumnInfo>;
5960
TArrowCSV(const TColummns& columns, bool header, const std::set<std::string>& notNullColumns);
61+
62+
static TString ErrorPrefix() {
63+
return "Cannot read CSV: ";
64+
}
65+
66+
private:
6067
arrow::csv::ReadOptions ReadOptions;
6168
arrow::csv::ParseOptions ParseOptions;
6269
arrow::csv::ConvertOptions ConvertOptions;
@@ -66,10 +73,6 @@ class TArrowCSV {
6673
std::set<std::string> NotNullColumns;
6774

6875
std::shared_ptr<arrow::RecordBatch> ConvertColumnTypes(std::shared_ptr<arrow::RecordBatch> parsedBatch) const;
69-
70-
static TString ErrorPrefix() {
71-
return "Cannot read CSV: ";
72-
}
7376
};
7477

7578
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
csv_arrow.cpp
5+
)
6+
7+
PEERDIR(
8+
contrib/libs/apache/arrow
9+
ydb/public/api/protos
10+
)
11+
12+
END()

ydb/core/io_formats/arrow/csv_arrow_ut.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#include "csv_arrow.h"
1+
#include <ydb/core/io_formats/arrow/scheme/scheme.h>
22

33
#include <ydb/core/formats/arrow/arrow_helpers.h>
44
#include <library/cpp/testing/unittest/registar.h>
@@ -64,7 +64,7 @@ TestReadSingleBatch(TArrowCSV& reader,
6464
std::shared_ptr<arrow::RecordBatch>
6565
TestReadSingleBatch(const TVector<std::pair<TString, NScheme::TTypeInfo>>& columns, const TString& data,
6666
char delimiter, bool header, ui32 numRows, ui32 skipRows = 0, std::optional<char> escape = {}) {
67-
auto reader = TArrowCSV::Create(columns, header);
67+
auto reader = TArrowCSVScheme::Create(columns, header);
6868
UNIT_ASSERT_C(reader.ok(), reader.status().ToString());
6969
reader->SetDelimiter(delimiter);
7070
if (skipRows) {
@@ -98,7 +98,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) {
9898
};
9999
TInstant dtInstant;
100100
Y_ABORT_UNLESS(TInstant::TryParseIso8601(dateTimeString, dtInstant));
101-
auto reader = TArrowCSV::Create(columns, false);
101+
auto reader = TArrowCSVScheme::Create(columns, false);
102102
UNIT_ASSERT_C(reader.ok(), reader.status().ToString());
103103

104104
TString errorMessage;
@@ -159,7 +159,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) {
159159
TVector<std::pair<TString, NScheme::TTypeInfo>> columns;
160160

161161
{
162-
auto reader = TArrowCSV::Create(columns, false);
162+
auto reader = TArrowCSVScheme::Create(columns, false);
163163
UNIT_ASSERT_C(reader.ok(), reader.status().ToString());
164164

165165
TString errorMessage;
@@ -175,7 +175,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) {
175175
{"i64", NScheme::TTypeInfo(NScheme::NTypeIds::Int64)}
176176
};
177177

178-
auto reader = TArrowCSV::Create(columns, false);
178+
auto reader = TArrowCSVScheme::Create(columns, false);
179179
UNIT_ASSERT_C(reader.ok(), reader.status().ToString());
180180

181181
TString errorMessage;
@@ -297,7 +297,7 @@ Y_UNIT_TEST_SUITE(FormatCSV) {
297297
csv += TString() + null + delimiter + q + null + q + delimiter + q + null + q + endLine;
298298
csv += TString() + null + delimiter + null + delimiter + null + endLine;
299299

300-
auto reader = TArrowCSV::Create(columns, false);
300+
auto reader = TArrowCSVScheme::Create(columns, false);
301301
UNIT_ASSERT_C(reader.ok(), reader.status().ToString());
302302
if (!nulls.empty() || !defaultNull) {
303303
reader->SetNullValue(null);

0 commit comments

Comments
 (0)