Skip to content

Commit 7944425

Browse files
Merge 6e17260 into c0b4856
2 parents c0b4856 + 6e17260 commit 7944425

File tree

12 files changed

+158
-8
lines changed

12 files changed

+158
-8
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
349349
meta->Attributes.erase("withinfer");
350350

351351
auto fileFormat = NObjectStorage::NInference::ConvertFileFormat(*format);
352-
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat));
352+
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, meta->Attributes));
353353
auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));
354354

355355
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {

ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,24 @@
1313
#include <ydb/core/external_sources/object_storage/events.h>
1414
#include <ydb/library/actors/core/actor_bootstrapped.h>
1515
#include <ydb/library/actors/core/hfunc.h>
16+
#include <ydb/library/yql/providers/s3/compressors/factory.h>
17+
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBufferFromString.h>
1618

1719
namespace NKikimr::NExternalSource::NObjectStorage::NInference {
1820

1921
class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher> {
2022
static constexpr uint64_t PrefixSize = 10_MB;
2123
public:
22-
TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format)
24+
TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params)
2325
: S3FetcherId_{s3FetcherId}
2426
, Format_{format}
2527
{
2628
Y_ABORT_UNLESS(IsArrowInferredFormat(Format_));
29+
30+
auto decompression = params.FindPtr("compression");
31+
if (decompression) {
32+
DecompressionFormat_ = *decompression;
33+
}
2734
}
2835

2936
void Bootstrap() {
@@ -67,6 +74,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
6774

6875
const auto& request = requestIt->second;
6976

77+
TString data = std::move(response.Data);
78+
if (DecompressionFormat_) {
79+
auto decompressedData = DecompressFile(data, request, ctx);
80+
if (!decompressedData) {
81+
return;
82+
}
83+
data = std::move(*decompressedData);
84+
}
85+
7086
std::shared_ptr<arrow::io::RandomAccessFile> file;
7187
switch (Format_) {
7288
case EFileFormat::CsvWithNames:
@@ -76,7 +92,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
7692
if (Format_ == EFileFormat::TsvWithNames) {
7793
options.delimiter = '\t';
7894
}
79-
file = CleanupCsvFile(response.Data, request, options, ctx);
95+
file = CleanupCsvFile(data, request, options, ctx);
8096
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
8197
break;
8298
}
@@ -135,6 +151,43 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
135151

136152
// Cutting file
137153

154+
TMaybe<TString> DecompressFile(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
155+
try {
156+
auto dataBuffer = NDB::ReadBufferFromString(data);
157+
auto decompressorBuffer = NYql::MakeDecompressor(dataBuffer, *DecompressionFormat_);
158+
if (!decompressorBuffer) {
159+
auto error = MakeError(
160+
request.Path,
161+
NFq::TIssuesIds::INTERNAL_ERROR,
162+
TStringBuilder{} << "unknown compression: " << *DecompressionFormat_ << ". Use one of: gzip, zstd, lz4, brotli, bzip2, xz"
163+
);
164+
SendError(ctx, error);
165+
return {};
166+
}
167+
168+
TStringBuilder decompressedData;
169+
while (!decompressorBuffer->eof() && decompressedData.size() < 10_MB) {
170+
decompressorBuffer->nextIfAtEnd();
171+
size_t maxDecompressedChunkSize = std::min(
172+
decompressorBuffer->available(),
173+
10_MB - decompressedData.size()
174+
);
175+
TString decompressedChunk{maxDecompressedChunkSize, ' '};
176+
decompressorBuffer->read(&decompressedChunk.front(), maxDecompressedChunkSize);
177+
decompressedData << decompressedChunk;
178+
}
179+
return std::move(decompressedData);
180+
} catch (const yexception& error) {
181+
auto errorEv = MakeError(
182+
request.Path,
183+
NFq::TIssuesIds::INTERNAL_ERROR,
184+
TStringBuilder{} << "couldn't decompress file, check format and compression params: " << error.AsStrBuf()
185+
);
186+
SendError(ctx, errorEv);
187+
return {};
188+
}
189+
}
190+
138191
std::shared_ptr<arrow::io::RandomAccessFile> CleanupCsvFile(const TString& data, const TRequest& request, const arrow::csv::ParseOptions& options, const NActors::TActorContext& ctx) {
139192
auto chunker = arrow::csv::MakeChunker(options);
140193
std::shared_ptr<arrow::Buffer> whole, partial;
@@ -183,10 +236,11 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
183236
// Fields
184237
NActors::TActorId S3FetcherId_;
185238
EFileFormat Format_;
239+
TMaybe<TString> DecompressionFormat_;
186240
std::unordered_map<TString, TRequest> InflightRequests_; // Path -> Request
187241
};
188242

189-
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format) {
190-
return new TArrowFileFetcher{s3FetcherId, format};
243+
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params) {
244+
return new TArrowFileFetcher{s3FetcherId, format, params};
191245
}
192246
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference

ydb/core/external_sources/object_storage/inference/arrow_fetcher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55

66
namespace NKikimr::NExternalSource::NObjectStorage::NInference {
77

8-
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format);
8+
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params);
99
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference

ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class ArrowInferenceTest : public testing::Test {
5050

5151
NActors::TActorId RegisterInferencinator(TStringBuf formatStr) {
5252
auto format = NInference::ConvertFileFormat(formatStr);
53-
auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format), 1);
53+
auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format, {}), 1);
5454
return ActorSystem.Register(NInference::CreateArrowInferencinator(arrowFetcher, format, {}), 1);
5555
}
5656

ydb/core/external_sources/object_storage/inference/ya.make

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
LIBRARY()
22

3+
ADDINCL(
4+
ydb/library/yql/udfs/common/clickhouse/client/base
5+
ydb/library/yql/udfs/common/clickhouse/client/src
6+
)
7+
38
SRCS(
49
arrow_fetcher.cpp
510
arrow_inferencinator.cpp
@@ -9,6 +14,9 @@ PEERDIR(
914
contrib/libs/apache/arrow
1015

1116
ydb/core/external_sources/object_storage
17+
18+
ydb/library/yql/providers/s3/compressors
19+
ydb/library/yql/udfs/common/clickhouse/client
1220
)
1321

1422
END()
45 Bytes
Binary file not shown.
74 Bytes
Binary file not shown.
66 Bytes
Binary file not shown.
59 Bytes
Binary file not shown.
96 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)