Skip to content

Commit f0d8213

Browse files
authored
Ability for ydb-cli to write request results as arrow parquet (#673)
Возможность записи parquet через ydb-cli
1 parent 49000bc commit f0d8213

File tree

12 files changed

+291
-4
lines changed

12 files changed

+291
-4
lines changed
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
#include "result_set_parquet_printer.h"
2+
3+
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
4+
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
5+
6+
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/file.h>
7+
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/stdio.h>
8+
#include <contrib/libs/apache/arrow/cpp/src/parquet/stream_writer.h>
9+
#include <contrib/libs/apache/arrow/cpp/src/parquet/schema.h>
10+
11+
#include <util/folder/path.h>
12+
13+
namespace NYdb {
14+
15+
// Private implementation of TResultSetParquetPrinter. Holds the parquet stream
// writer together with the settings used to create it.
class TResultSetParquetPrinter::TImpl {
public:
    // Only remembers the settings; the stream is created by the first Print().
    explicit TImpl(const std::string& outputPath, ui64 rowGroupSize);

    // Destroys the current stream writer so that the next Print() creates a new one.
    void Reset();

    // Writes all rows of resultSet to the stream, creating the stream on demand.
    void Print(const TResultSet& resultSet);

private:
    // Builds the parquet schema from the columns of resultSet and opens the stream.
    void InitStream(const TResultSet& resultSet);

    // Maps a primitive YDB type to a parquet schema node; throws for anything else.
    static parquet::schema::NodePtr ToParquetType(const char* name, const TTypeParser& type, bool nullable);

    std::unique_ptr<parquet::StreamWriter> Stream;
    const std::string OutputPath; // an empty path selects stdout
    const ui64 RowGroupSize;      // handed to the stream writer as the max row group size
};
30+
31+
void TResultSetParquetPrinter::TImpl::InitStream(const TResultSet& resultSet) {
32+
parquet::schema::NodeVector fields;
33+
for (const auto& field : resultSet.GetColumnsMeta()) {
34+
TTypeParser type(field.Type);
35+
bool nullable = false;
36+
if (type.GetKind() == TTypeParser::ETypeKind::Optional) {
37+
nullable = true;
38+
type.OpenOptional();
39+
}
40+
fields.emplace_back(ToParquetType(field.Name.c_str(), type, nullable));
41+
}
42+
auto schema = std::static_pointer_cast<parquet::schema::GroupNode>(
43+
parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));
44+
parquet::WriterProperties::Builder builder;
45+
builder.compression(parquet::Compression::ZSTD);
46+
builder.disable_dictionary();
47+
std::shared_ptr<arrow::io::OutputStream> outstream;
48+
if (OutputPath.empty()) {
49+
outstream = std::make_shared<arrow::io::StdoutStream>();
50+
} else {
51+
if (auto parent = TFsPath(OutputPath.c_str()).Parent()) {
52+
parent.MkDirs();
53+
}
54+
outstream = *arrow::io::FileOutputStream::Open(OutputPath);
55+
}
56+
Stream = std::make_unique<parquet::StreamWriter>(parquet::ParquetFileWriter::Open(outstream, schema, builder.build()));
57+
Stream->SetMaxRowGroupSize(RowGroupSize);
58+
}
59+
60+
// Maps a primitive YDB type to a parquet primitive schema node called `name`.
// `nullable` selects OPTIONAL instead of REQUIRED repetition.
// Throws yexception for non-primitive types and for primitive types without a
// mapping in the switch below.
parquet::schema::NodePtr TResultSetParquetPrinter::TImpl::ToParquetType(const char* name, const TTypeParser& type, bool nullable) {
    if (type.GetKind() != TTypeParser::ETypeKind::Primitive) {
        ythrow yexception() << "Cannot save not primitive type to parquet: " << type.GetKind();
    }

    const auto repetition = nullable ? parquet::Repetition::OPTIONAL : parquet::Repetition::REQUIRED;

    // Shorthand for nodes that carry a converted (logical) type annotation.
    const auto annotated = [&](parquet::Type::type physical, parquet::ConvertedType::type converted) {
        return parquet::schema::PrimitiveNode::Make(name, repetition, physical, converted);
    };

    switch (type.GetPrimitive()) {
        case EPrimitiveType::Bool:
            return parquet::schema::PrimitiveNode::Make(name, repetition, parquet::Type::BOOLEAN);
        case EPrimitiveType::Float:
            return parquet::schema::PrimitiveNode::Make(name, repetition, parquet::Type::FLOAT);
        case EPrimitiveType::Double:
            return parquet::schema::PrimitiveNode::Make(name, repetition, parquet::Type::DOUBLE);

        case EPrimitiveType::Int8:
            return annotated(parquet::Type::INT32, parquet::ConvertedType::INT_8);
        case EPrimitiveType::Uint8:
            return annotated(parquet::Type::INT32, parquet::ConvertedType::UINT_8);
        case EPrimitiveType::Int16:
            return annotated(parquet::Type::INT32, parquet::ConvertedType::INT_16);
        case EPrimitiveType::Uint16:
            return annotated(parquet::Type::INT32, parquet::ConvertedType::UINT_16);
        case EPrimitiveType::Int32:
            return annotated(parquet::Type::INT32, parquet::ConvertedType::INT_32);

        // Date shares the unsigned 32-bit representation with Uint32.
        case EPrimitiveType::Uint32:
        case EPrimitiveType::Date:
            return annotated(parquet::Type::INT32, parquet::ConvertedType::UINT_32);

        // Timestamp and Interval share the signed 64-bit representation with Int64.
        case EPrimitiveType::Int64:
        case EPrimitiveType::Timestamp:
        case EPrimitiveType::Interval:
            return annotated(parquet::Type::INT64, parquet::ConvertedType::INT_64);

        case EPrimitiveType::Uint64:
            return annotated(parquet::Type::INT64, parquet::ConvertedType::UINT_64);

        // Every string-like type is stored as a UTF8-annotated byte array.
        case EPrimitiveType::String:
        case EPrimitiveType::Utf8:
        case EPrimitiveType::Yson:
        case EPrimitiveType::Json:
        case EPrimitiveType::JsonDocument:
        case EPrimitiveType::DyNumber:
            return annotated(parquet::Type::BYTE_ARRAY, parquet::ConvertedType::UTF8);

        default:
            ythrow yexception() << "Cannot save type to parquet: " << type.GetPrimitive();
    }
}
110+
111+
TResultSetParquetPrinter::TImpl::TImpl(const std::string& outputPath, ui64 rowGroupSize)
112+
: OutputPath(outputPath)
113+
, RowGroupSize(rowGroupSize)
114+
{}
115+
116+
// Destroys the current stream writer, if any. Print() creates a new one when
// it finds Stream empty.
void TResultSetParquetPrinter::TImpl::Reset() {
    Stream = nullptr;
}
119+
120+
void TResultSetParquetPrinter::TImpl::Print(const TResultSet& resultSet) {
121+
if (!Stream) {
122+
InitStream(resultSet);
123+
}
124+
auto& os = *Stream;
125+
TResultSetParser parser(resultSet);
126+
while (parser.TryNextRow()) {
127+
for (ui32 i = 0; i < resultSet.GetColumnsMeta().size(); ++i) {
128+
TValueParser value(parser.GetValue(i));
129+
bool nullable = value.GetKind() == TTypeParser::ETypeKind::Optional;
130+
if (nullable) {
131+
value.OpenOptional();
132+
if (value.IsNull()) {
133+
os.SkipColumns(1);
134+
continue;
135+
}
136+
}
137+
if (value.GetKind() != TTypeParser::ETypeKind::Primitive) {
138+
ythrow yexception() << "Cannot save not primitive type to parquet: " << value.GetKind();
139+
}
140+
switch (value.GetPrimitiveType()) {
141+
case EPrimitiveType::Bool:
142+
os << value.GetBool();
143+
break;
144+
case EPrimitiveType::Int8:
145+
os << value.GetInt8();
146+
break;
147+
case EPrimitiveType::Uint8:
148+
os << value.GetUint8();
149+
break;
150+
case EPrimitiveType::Int16:
151+
os << value.GetInt16();
152+
break;
153+
case EPrimitiveType::Uint16:
154+
os << value.GetUint16();
155+
break;
156+
case EPrimitiveType::Int32:
157+
os << value.GetInt32();
158+
break;
159+
case EPrimitiveType::Uint32:
160+
os << value.GetUint32();
161+
break;
162+
case EPrimitiveType::Int64:
163+
os << (std::int64_t)value.GetInt64();
164+
break;
165+
case EPrimitiveType::Uint64:
166+
os << (std::uint64_t)value.GetUint64();
167+
break;
168+
case EPrimitiveType::Float:
169+
os << value.GetFloat();
170+
break;
171+
case EPrimitiveType::Double:
172+
os << value.GetDouble();
173+
break;
174+
case EPrimitiveType::Date:
175+
os << (ui32)value.GetDate().Seconds();
176+
break;
177+
case EPrimitiveType::Timestamp:
178+
os << (std::int64_t)value.GetTimestamp().MicroSeconds();
179+
break;
180+
case EPrimitiveType::Interval:
181+
os << (std::int64_t)value.GetInterval();
182+
break;
183+
case EPrimitiveType::String:
184+
os << arrow::util::string_view(value.GetString().c_str(), value.GetString().length());
185+
break;
186+
case EPrimitiveType::Utf8:
187+
os << arrow::util::string_view(value.GetUtf8().c_str(), value.GetUtf8().length());
188+
break;
189+
case EPrimitiveType::Yson:
190+
os << arrow::util::string_view(value.GetYson().c_str(), value.GetYson().length());
191+
break;
192+
case EPrimitiveType::Json:
193+
os << arrow::util::string_view(value.GetJson().c_str(), value.GetJson().length());
194+
break;
195+
case EPrimitiveType::JsonDocument:
196+
os << arrow::util::string_view(value.GetJsonDocument().c_str(), value.GetJsonDocument().length());
197+
break;
198+
case EPrimitiveType::DyNumber:
199+
os << arrow::util::string_view(value.GetDyNumber().c_str(), value.GetDyNumber().length());
200+
break;
201+
default:
202+
ythrow yexception() << "Cannot save type to parquet: " << value.GetPrimitiveType();
203+
}
204+
}
205+
os.EndRow();
206+
}
207+
}
208+
209+
TResultSetParquetPrinter::TResultSetParquetPrinter(const std::string& outputPath, ui64 rowGroupSize /*= 100000*/)
210+
: Impl(std::make_unique<TImpl>(outputPath, rowGroupSize))
211+
{}
212+
213+
// Defined out of line, in the translation unit where TImpl is a complete type,
// so that std::unique_ptr<TImpl> can destroy it.
TResultSetParquetPrinter::~TResultSetParquetPrinter() = default;
215+
216+
// Forwards to the implementation object.
void TResultSetParquetPrinter::Reset() {
    Impl->Reset();
}
219+
220+
// Forwards resultSet to the implementation object.
void TResultSetParquetPrinter::Print(const TResultSet& resultSet) {
    Impl->Print(resultSet);
}
223+
224+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#pragma once

#include <util/system/types.h>

#include <memory>
#include <string>

namespace NYdb {

class TResultSet;

// Prints YDB result sets through an implementation object (TImpl) that is
// defined in the .cpp file, keeping its dependencies out of this header.
class TResultSetParquetPrinter {
public:
    // outputPath   - passed to the implementation as the output location.
    // rowGroupSize - passed to the implementation as the row group size.
    explicit TResultSetParquetPrinter(const std::string& outputPath, ui64 rowGroupSize = 100000);

    // Declared here and defined in the .cpp file, because std::unique_ptr<TImpl>
    // needs the complete TImpl type to destroy it.
    ~TResultSetParquetPrinter();

    // Forwards to TImpl::Reset().
    void Reset();

    // Forwards resultSet to TImpl::Print().
    void Print(const TResultSet& resultSet);

private:
    class TImpl;
    std::unique_ptr<TImpl> Impl;
};

}

ydb/library/arrow_parquet/ya.make

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
LIBRARY()
2+
3+
OWNER(g:kikimr)
4+
5+
SRCS(
6+
result_set_parquet_printer.cpp
7+
)
8+
9+
PEERDIR(
10+
ydb/public/sdk/cpp/client/ydb_value
11+
contrib/libs/apache/arrow
12+
)
13+
14+
END()

ydb/library/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ RECURSE(
44
aclib
55
arrow_clickhouse
66
arrow_kernels
7+
arrow_parquet
78
backup
89
binary_json
910
chunks_limiter

ydb/public/lib/ydb_cli/commands/ydb_service_scripting.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ void TCommandExecuteYqlScript::Config(TConfig& config) {
3838
EOutputFormat::JsonUnicode,
3939
EOutputFormat::JsonUnicodeArray,
4040
EOutputFormat::JsonBase64,
41-
EOutputFormat::JsonBase64Array
41+
EOutputFormat::JsonBase64Array,
42+
EOutputFormat::Parquet,
4243
});
4344

4445
AddParametersOption(config);

ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,8 @@ void TCommandExecuteQuery::Config(TConfig& config) {
368368
EOutputFormat::JsonBase64,
369369
EOutputFormat::JsonBase64Array,
370370
EOutputFormat::Csv,
371-
EOutputFormat::Tsv
371+
EOutputFormat::Tsv,
372+
EOutputFormat::Parquet,
372373
});
373374

374375
AddParametersOption(config, "(for data & scan queries)");
@@ -1023,7 +1024,8 @@ void TCommandReadTable::Config(TConfig& config) {
10231024
EOutputFormat::JsonBase64,
10241025
EOutputFormat::JsonBase64Array,
10251026
EOutputFormat::Csv,
1026-
EOutputFormat::Tsv
1027+
EOutputFormat::Tsv,
1028+
EOutputFormat::Parquet,
10271029
});
10281030

10291031
config.SetFreeArgsNum(1);

ydb/public/lib/ydb_cli/commands/ydb_yql.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ void TCommandYql::Config(TConfig& config) {
3939
EOutputFormat::JsonBase64,
4040
EOutputFormat::JsonBase64Array,
4141
EOutputFormat::Csv,
42-
EOutputFormat::Tsv
42+
EOutputFormat::Tsv,
43+
EOutputFormat::Parquet,
4344
});
4445

4546
AddParametersOption(config);

ydb/public/lib/ydb_cli/common/format.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <library/cpp/json/json_prettifier.h>
55

66
#include <ydb/public/lib/json_value/ydb_json_value.h>
7+
#include <ydb/library/arrow_parquet/result_set_parquet_printer.h>
78

89
namespace NYdb {
910
namespace NConsoleClient {
@@ -47,6 +48,7 @@ namespace {
4748
{ EOutputFormat::ProtoJsonBase64, "Output result protobuf in json format, binary strings are encoded with base64" },
4849
{ EOutputFormat::Csv, "Output in csv format" },
4950
{ EOutputFormat::Tsv, "Output in tsv format" },
51+
{ EOutputFormat::Parquet, "Output in parquet format" },
5052
};
5153

5254
THashMap<EMessagingFormat, TString> MessagingFormatDescriptions = {
@@ -713,6 +715,7 @@ TString TQueryPlanPrinter::JsonToString(const NJson::TJsonValue& jsonValue) {
713715
TResultSetPrinter::TResultSetPrinter(EOutputFormat format, std::function<bool()> isInterrupted)
714716
: Format(format)
715717
, IsInterrupted(isInterrupted)
718+
, ParquetPrinter(std::make_unique<TResultSetParquetPrinter>(""))
716719
{}
717720

718721
TResultSetPrinter::~TResultSetPrinter() {
@@ -750,6 +753,9 @@ void TResultSetPrinter::Print(const TResultSet& resultSet) {
750753
case EOutputFormat::Tsv:
751754
PrintCsv(resultSet, "\t");
752755
break;
756+
case EOutputFormat::Parquet:
757+
ParquetPrinter->Print(resultSet);
758+
break;
753759
default:
754760
throw TMisuseException() << "This command doesn't support " << Format << " output format";
755761
}
@@ -783,6 +789,9 @@ void TResultSetPrinter::EndResultSet() {
783789
case EOutputFormat::JsonBase64Array:
784790
Cout << ']' << Endl;
785791
break;
792+
case EOutputFormat::Parquet:
793+
ParquetPrinter->Reset();
794+
break;
786795
default:
787796
break;
788797
}

ydb/public/lib/ydb_cli/common/format.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@
88
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
99
#include <ydb/public/sdk/cpp/client/ydb_types/status/status.h>
1010

11+
namespace NYdb {
12+
13+
class TResultSetParquetPrinter;
14+
15+
}
16+
1117
namespace NYdb {
1218
namespace NConsoleClient {
1319

@@ -81,6 +87,7 @@ class TResultSetPrinter {
8187
bool PrintedSomething = false;
8288
EOutputFormat Format;
8389
std::function<bool()> IsInterrupted;
90+
std::unique_ptr<TResultSetParquetPrinter> ParquetPrinter;
8491
};
8592

8693
class TQueryPlanPrinter {

ydb/public/lib/ydb_cli/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ PEERDIR(
4646
ydb/public/sdk/cpp/client/ydb_topic
4747
ydb/public/sdk/cpp/client/ydb_types
4848
ydb/public/sdk/cpp/client/ydb_types/credentials
49+
ydb/library/arrow_parquet
4950
)
5051

5152
GENERATE_ENUM_SERIALIZATION(formats.h)

0 commit comments

Comments
 (0)