Skip to content

Commit af43ee2

Browse files
Merge 49f3da1 into 4efd471
2 parents 4efd471 + 49f3da1 commit af43ee2

File tree

13 files changed

+159
-8
lines changed

13 files changed

+159
-8
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
349349
meta->Attributes.erase("withinfer");
350350

351351
auto fileFormat = NObjectStorage::NInference::ConvertFileFormat(*format);
352-
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat));
352+
auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat, meta->Attributes));
353353
auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));
354354

355355
return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {

ydb/core/external_sources/object_storage/inference/arrow_fetcher.cpp

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,24 @@
1313
#include <ydb/core/external_sources/object_storage/events.h>
1414
#include <ydb/library/actors/core/actor_bootstrapped.h>
1515
#include <ydb/library/actors/core/hfunc.h>
16+
#include <ydb/library/yql/providers/s3/compressors/factory.h>
17+
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBufferFromString.h>
1618

1719
namespace NKikimr::NExternalSource::NObjectStorage::NInference {
1820

1921
class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher> {
2022
static constexpr uint64_t PrefixSize = 10_MB;
2123
public:
22-
TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format)
24+
TArrowFileFetcher(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params)
2325
: S3FetcherId_{s3FetcherId}
2426
, Format_{format}
2527
{
2628
Y_ABORT_UNLESS(IsArrowInferredFormat(Format_));
29+
30+
auto decompression = params.FindPtr("compression");
31+
if (decompression) {
32+
DecompressionFormat_ = *decompression;
33+
}
2734
}
2835

2936
void Bootstrap() {
@@ -67,6 +74,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
6774

6875
const auto& request = requestIt->second;
6976

77+
TString data = std::move(response.Data);
78+
if (DecompressionFormat_) {
79+
auto decompressedData = DecompressFile(data, request, ctx);
80+
if (!decompressedData) {
81+
return;
82+
}
83+
data = std::move(*decompressedData);
84+
}
85+
7086
std::shared_ptr<arrow::io::RandomAccessFile> file;
7187
switch (Format_) {
7288
case EFileFormat::CsvWithNames:
@@ -76,7 +92,7 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
7692
if (Format_ == EFileFormat::TsvWithNames) {
7793
options.delimiter = '\t';
7894
}
79-
file = CleanupCsvFile(response.Data, request, options, ctx);
95+
file = CleanupCsvFile(data, request, options, ctx);
8096
ctx.Send(request.Requester, new TEvArrowFile(std::move(file), request.Path));
8197
break;
8298
}
@@ -135,6 +151,43 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
135151

136152
// Cutting file
137153

154+
TMaybe<TString> DecompressFile(const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
155+
try {
156+
NDB::ReadBufferFromString dataBuffer(data);
157+
auto decompressorBuffer = NYql::MakeDecompressor(dataBuffer, *DecompressionFormat_);
158+
if (!decompressorBuffer) {
159+
auto error = MakeError(
160+
request.Path,
161+
NFq::TIssuesIds::INTERNAL_ERROR,
162+
TStringBuilder{} << "unknown compression: " << *DecompressionFormat_ << ". Use one of: gzip, zstd, lz4, brotli, bzip2, xz"
163+
);
164+
SendError(ctx, error);
165+
return {};
166+
}
167+
168+
TStringBuilder decompressedData;
169+
while (!decompressorBuffer->eof() && decompressedData.size() < 10_MB) {
170+
decompressorBuffer->nextIfAtEnd();
171+
size_t maxDecompressedChunkSize = std::min(
172+
decompressorBuffer->available(),
173+
10_MB - decompressedData.size()
174+
);
175+
TString decompressedChunk{maxDecompressedChunkSize, ' '};
176+
decompressorBuffer->read(&decompressedChunk.front(), maxDecompressedChunkSize);
177+
decompressedData << decompressedChunk;
178+
}
179+
return std::move(decompressedData);
180+
} catch (const yexception& error) {
181+
auto errorEv = MakeError(
182+
request.Path,
183+
NFq::TIssuesIds::INTERNAL_ERROR,
184+
TStringBuilder{} << "couldn't decompress file, check compression params: " << error.what()
185+
);
186+
SendError(ctx, errorEv);
187+
return {};
188+
}
189+
}
190+
138191
std::shared_ptr<arrow::io::RandomAccessFile> CleanupCsvFile(const TString& data, const TRequest& request, const arrow::csv::ParseOptions& options, const NActors::TActorContext& ctx) {
139192
auto chunker = arrow::csv::MakeChunker(options);
140193
std::shared_ptr<arrow::Buffer> whole, partial;
@@ -183,10 +236,11 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
183236
// Fields
184237
NActors::TActorId S3FetcherId_;
185238
EFileFormat Format_;
239+
TMaybe<TString> DecompressionFormat_;
186240
std::unordered_map<TString, TRequest> InflightRequests_; // Path -> Request
187241
};
188242

189-
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format) {
190-
return new TArrowFileFetcher{s3FetcherId, format};
243+
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params) {
244+
return new TArrowFileFetcher{s3FetcherId, format, params};
191245
}
192246
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference

ydb/core/external_sources/object_storage/inference/arrow_fetcher.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55

66
namespace NKikimr::NExternalSource::NObjectStorage::NInference {
77

8-
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format);
8+
NActors::IActor* CreateArrowFetchingActor(NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params);
99
} // namespace NKikimr::NExternalSource::NObjectStorage::NInference

ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class ArrowInferenceTest : public testing::Test {
5050

5151
NActors::TActorId RegisterInferencinator(TStringBuf formatStr) {
5252
auto format = NInference::ConvertFileFormat(formatStr);
53-
auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format), 1);
53+
auto arrowFetcher = ActorSystem.Register(NInference::CreateArrowFetchingActor(S3ActorId, format, {}), 1);
5454
return ActorSystem.Register(NInference::CreateArrowInferencinator(arrowFetcher, format, {}), 1);
5555
}
5656

ydb/core/external_sources/object_storage/inference/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
GTEST()
22

33
PEERDIR(
4+
ydb/library/yql/public/udf/service/stub
45
ydb/core/external_sources/object_storage/inference
56
ydb/core/external_sources/object_storage
67
ydb/core/tx/scheme_board

ydb/core/external_sources/object_storage/inference/ya.make

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
LIBRARY()
22

3+
ADDINCL(
4+
ydb/library/yql/udfs/common/clickhouse/client/base
5+
ydb/library/yql/udfs/common/clickhouse/client/base/pcg-random
6+
ydb/library/yql/udfs/common/clickhouse/client/src
7+
)
8+
39
SRCS(
410
arrow_fetcher.cpp
511
arrow_inferencinator.cpp
@@ -9,6 +15,8 @@ PEERDIR(
915
contrib/libs/apache/arrow
1016

1117
ydb/core/external_sources/object_storage
18+
19+
ydb/library/yql/providers/s3/compressors
1220
)
1321

1422
END()
45 Bytes
Binary file not shown.
74 Bytes
Binary file not shown.
66 Bytes
Binary file not shown.
59 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)