66#include < arrow/csv/chunker.h>
77#include < arrow/csv/options.h>
88#include < arrow/io/memory.h>
9+ #include < arrow/util/endian.h>
910
1011#include < util/generic/guid.h>
1112#include < util/generic/size_literals.h>
1213
1314#include < ydb/core/external_sources/object_storage/events.h>
1415#include < ydb/library/actors/core/actor_bootstrapped.h>
1516#include < ydb/library/actors/core/hfunc.h>
17+ #include < ydb/library/yql/providers/s3/compressors/factory.h>
18+ #include < ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBufferFromString.h>
1619
1720namespace NKikimr ::NExternalSource::NObjectStorage::NInference {
1821
1922class TArrowFileFetcher : public NActors ::TActorBootstrapped<TArrowFileFetcher> {
2023 static constexpr uint64_t PrefixSize = 10_MB;
2124public:
22- TArrowFileFetcher (NActors::TActorId s3FetcherId, EFileFormat format)
25+ TArrowFileFetcher (NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params )
2326 : S3FetcherId_{s3FetcherId}
2427 , Format_{format}
2528 {
2629 Y_ABORT_UNLESS (IsArrowInferredFormat (Format_));
30+
31+ auto decompression = params.FindPtr (" compression" );
32+ if (decompression) {
33+ DecompressionFormat_ = *decompression;
34+ }
2735 }
2836
2937 void Bootstrap () {
@@ -40,15 +48,20 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
4048 const auto & request = *ev->Get ();
4149 TRequest localRequest{
4250 .Path = request.Path ,
43- .RequestId = {} ,
51+ .RequestId = TGUID::Create () ,
4452 .Requester = ev->Sender ,
53+ .MetadataRequest = false ,
4554 };
46- CreateGuid (&localRequest.RequestId );
4755
4856 switch (Format_) {
4957 case EFileFormat::CsvWithNames:
5058 case EFileFormat::TsvWithNames: {
51- HandleAsPrefixFile (std::move (localRequest), ctx);
59+ RequestPartialFile (std::move (localRequest), ctx, 0 , 10_MB);
60+ break ;
61+ }
62+ case EFileFormat::Parquet: {
63+ localRequest.MetadataRequest = true ;
64+ RequestPartialFile (std::move (localRequest), ctx, request.Size - 8 , request.Size - 4 );
5265 break ;
5366 }
5467 default : {
@@ -67,6 +80,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
6780
6881 const auto & request = requestIt->second ;
6982
83+ TString data = std::move (response.Data );
84+ if (DecompressionFormat_) {
85+ auto decompressedData = DecompressFile (data, request, ctx);
86+ if (!decompressedData) {
87+ return ;
88+ }
89+ data = std::move (*decompressedData);
90+ }
91+
7092 std::shared_ptr<arrow::io::RandomAccessFile> file;
7193 switch (Format_) {
7294 case EFileFormat::CsvWithNames:
@@ -76,7 +98,16 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
7698 if (Format_ == EFileFormat::TsvWithNames) {
7799 options.delimiter = ' \t ' ;
78100 }
79- file = CleanupCsvFile (response.Data , request, options, ctx);
101+ file = CleanupCsvFile (data, request, options, ctx);
102+ ctx.Send (request.Requester , new TEvArrowFile (std::move (file), request.Path ));
103+ break ;
104+ }
105+ case EFileFormat::Parquet: {
106+ if (request.MetadataRequest ) {
107+ HandleMetadataSizeRequest (data, request, ctx);
108+ return ;
109+ }
110+ file = BuildParquetFileFromMetadata (data, request, ctx);
80111 ctx.Send (request.Requester , new TEvArrowFile (std::move (file), request.Path ));
81112 break ;
82113 }
@@ -104,14 +135,15 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
104135 uint64_t From = 0 ;
105136 uint64_t To = 0 ;
106137 NActors::TActorId Requester;
138+ bool MetadataRequest;
107139 };
108140
109141 // Reading file
110142
111- void HandleAsPrefixFile (TRequest&& insertedRequest, const NActors::TActorContext& ctx) {
143+ void RequestPartialFile (TRequest&& insertedRequest, const NActors::TActorContext& ctx, uint64_t from, uint64_t to ) {
112144 auto path = insertedRequest.Path ;
113- insertedRequest.From = 0 ;
114- insertedRequest.To = 10_MB ;
145+ insertedRequest.From = from ;
146+ insertedRequest.To = to ;
115147 auto it = InflightRequests_.try_emplace (path, std::move (insertedRequest));
116148 Y_ABORT_UNLESS (it.second , " couldn't insert request for path: %s" , insertedRequest.RequestId .AsGuidString ().c_str ());
117149
@@ -135,6 +167,43 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
135167
136168 // Cutting file
137169
170+ TMaybe<TString> DecompressFile (const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
171+ try {
172+ NDB::ReadBufferFromString dataBuffer (data);
173+ auto decompressorBuffer = NYql::MakeDecompressor (dataBuffer, *DecompressionFormat_);
174+ if (!decompressorBuffer) {
175+ auto error = MakeError (
176+ request.Path ,
177+ NFq::TIssuesIds::INTERNAL_ERROR,
178+ TStringBuilder{} << " unknown compression: " << *DecompressionFormat_ << " . Use one of: gzip, zstd, lz4, brotli, bzip2, xz"
179+ );
180+ SendError (ctx, error);
181+ return {};
182+ }
183+
184+ TStringBuilder decompressedData;
185+ while (!decompressorBuffer->eof () && decompressedData.size () < 10_MB) {
186+ decompressorBuffer->nextIfAtEnd ();
187+ size_t maxDecompressedChunkSize = std::min (
188+ decompressorBuffer->available (),
189+ 10_MB - decompressedData.size ()
190+ );
191+ TString decompressedChunk{maxDecompressedChunkSize, ' ' };
192+ decompressorBuffer->read (&decompressedChunk.front (), maxDecompressedChunkSize);
193+ decompressedData << decompressedChunk;
194+ }
195+ return std::move (decompressedData);
196+ } catch (const yexception& error) {
197+ auto errorEv = MakeError (
198+ request.Path ,
199+ NFq::TIssuesIds::INTERNAL_ERROR,
200+ TStringBuilder{} << " couldn't decompress file, check compression params: " << error.what ()
201+ );
202+ SendError (ctx, errorEv);
203+ return {};
204+ }
205+ }
206+
138207 std::shared_ptr<arrow::io::RandomAccessFile> CleanupCsvFile (const TString& data, const TRequest& request, const arrow::csv::ParseOptions& options, const NActors::TActorContext& ctx) {
139208 auto chunker = arrow::csv::MakeChunker (options);
140209 std::shared_ptr<arrow::Buffer> whole, partial;
@@ -170,6 +239,58 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
170239 return std::make_shared<arrow::io::BufferReader>(std::move (whole));
171240 }
172241
242+ void HandleMetadataSizeRequest (const TString& data, TRequest request, const NActors::TActorContext& ctx) {
243+ uint32_t metadataSize = arrow::BitUtil::FromLittleEndian<uint32_t >(ReadUnaligned<uint32_t >(data.data ()));
244+
245+ if (metadataSize > 10_MB) {
246+ auto error = MakeError (
247+ request.Path ,
248+ NFq::TIssuesIds::INTERNAL_ERROR,
249+ TStringBuilder{} << " couldn't load parquet metadata, size is bigger than 10MB : " << metadataSize
250+ );
251+ SendError (ctx, error);
252+ return ;
253+ }
254+
255+ InflightRequests_.erase (request.Path );
256+
257+ TRequest localRequest{
258+ .Path = request.Path ,
259+ .RequestId = TGUID::Create (),
260+ .Requester = request.Requester ,
261+ .MetadataRequest = false ,
262+ };
263+ RequestPartialFile (std::move (localRequest), ctx, request.From - metadataSize, request.To + 4 );
264+ }
265+
266+ std::shared_ptr<arrow::io::RandomAccessFile> BuildParquetFileFromMetadata (const TString& data, const TRequest& request, const NActors::TActorContext& ctx) {
267+ auto arrowData = std::make_shared<arrow::Buffer>(nullptr , 0 );
268+ arrow::BufferBuilder builder;
269+ auto buildRes = builder.Append (data.data (), data.size ());
270+ if (!buildRes.ok ()) {
271+ auto error = MakeError (
272+ request.Path ,
273+ NFq::TIssuesIds::INTERNAL_ERROR,
274+ TStringBuilder{} << " couldn't read data from S3Fetcher: " << buildRes.ToString ()
275+ );
276+ SendError (ctx, error);
277+ return nullptr ;
278+ }
279+
280+ buildRes = builder.Finish (&arrowData);
281+ if (!buildRes.ok ()) {
282+ auto error = MakeError (
283+ request.Path ,
284+ NFq::TIssuesIds::INTERNAL_ERROR,
285+ TStringBuilder{} << " couldn't copy data from S3Fetcher: " << buildRes.ToString ()
286+ );
287+ SendError (ctx, error);
288+ return nullptr ;
289+ }
290+
291+ return std::make_shared<arrow::io::BufferReader>(std::move (arrowData));
292+ }
293+
173294 // Utility
174295 void SendError (const NActors::TActorContext& ctx, TEvFileError* error) {
175296 auto requestIt = InflightRequests_.find (error->Path );
@@ -183,10 +304,11 @@ class TArrowFileFetcher : public NActors::TActorBootstrapped<TArrowFileFetcher>
183304 // Fields
184305 NActors::TActorId S3FetcherId_;
185306 EFileFormat Format_;
307+ TMaybe<TString> DecompressionFormat_;
186308 std::unordered_map<TString, TRequest> InflightRequests_; // Path -> Request
187309};
188310
189- NActors::IActor* CreateArrowFetchingActor (NActors::TActorId s3FetcherId, EFileFormat format) {
190- return new TArrowFileFetcher{s3FetcherId, format};
311+ NActors::IActor* CreateArrowFetchingActor (NActors::TActorId s3FetcherId, EFileFormat format, const THashMap<TString, TString>& params ) {
312+ return new TArrowFileFetcher{s3FetcherId, format, params };
191313}
192314} // namespace NKikimr::NExternalSource::NObjectStorage::NInference
0 commit comments