Skip to content

Commit ed69046

Browse files
authored
Merge 2ba2342 into d48cf4d
2 parents d48cf4d + 2ba2342 commit ed69046

File tree

9 files changed

+243
-11
lines changed

9 files changed

+243
-11
lines changed

ydb/library/yql/providers/s3/actors/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ IF (CLANG AND NOT WITH_VALGRIND)
5353

5454
SRCS(
5555
yql_arrow_column_converters.cpp
56+
yql_s3_decompressor_actor.cpp
5657
yql_s3_read_actor.cpp
5758
yql_s3_source_queue.cpp
5859
)
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#include <queue>
2+
3+
#include <ydb/library/actors/core/actor.h>
4+
#include <ydb/library/actors/core/actor_coroutine.h>
5+
#include <ydb/library/yql/providers/s3/compressors/factory.h>
6+
#include <ydb/library/yql/providers/s3/events/events.h>
7+
8+
#if defined(_linux_) || defined(_darwin_)
9+
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
10+
#endif
11+
12+
namespace NYql::NDq {
13+
14+
using namespace ::NActors;
15+
16+
namespace {
17+
18+
class TS3DecompressorCoroImpl : public TActorCoroImpl {
19+
public:
20+
TS3DecompressorCoroImpl(const TActorId& parent, const TString& compression)
21+
: TActorCoroImpl(256_KB)
22+
, Compression(compression)
23+
, Parent(parent)
24+
{}
25+
26+
private:
27+
class TCoroReadBuffer : public NDB::ReadBuffer {
28+
public:
29+
TCoroReadBuffer(TS3DecompressorCoroImpl* coro)
30+
: NDB::ReadBuffer(nullptr, 0ULL)
31+
, Coro(coro)
32+
{ }
33+
private:
34+
bool nextImpl() final {
35+
while (!Coro->InputFinished || !Coro->Requests.empty()) {
36+
Coro->ProcessOneEvent();
37+
if (Coro->InputBuffer) {
38+
RawDataBuffer.swap(Coro->InputBuffer);
39+
Coro->InputBuffer.clear();
40+
auto rawData = const_cast<char*>(RawDataBuffer.data());
41+
working_buffer = NDB::BufferBase::Buffer(rawData, rawData + RawDataBuffer.size());
42+
return true;
43+
}
44+
}
45+
return false;
46+
}
47+
TS3DecompressorCoroImpl *const Coro;
48+
TString RawDataBuffer;
49+
};
50+
51+
STRICT_STFUNC(StateFunc,
52+
hFunc(TEvS3Provider::TEvDecompressDataRequest, Handle);
53+
hFunc(NActors::TEvents::TEvPoison, Handle);
54+
)
55+
56+
void Handle(TEvS3Provider::TEvDecompressDataRequest::TPtr& ev) {
57+
Requests.push(std::move(ev->Release()));
58+
}
59+
60+
void Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
61+
if (ev->Cookie) {
62+
ythrow yexception() << "S3 decompressor actor abort";
63+
}
64+
InputFinished = true;
65+
}
66+
67+
void Run() final {
68+
try {
69+
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
70+
NDB::ReadBuffer* buffer = coroBuffer.get();
71+
auto decompressorBuffer = MakeDecompressor(*buffer, Compression);
72+
YQL_ENSURE(decompressorBuffer, "Unsupported " << Compression << " compression.");
73+
while (!decompressorBuffer->eof()) {
74+
decompressorBuffer->nextIfAtEnd();
75+
TString data{decompressorBuffer->available(), ' '};
76+
decompressorBuffer->read(&data.front(), decompressorBuffer->available());
77+
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data)));
78+
}
79+
} catch (const TDtorException&) {
80+
// Stop any activity instantly
81+
return;
82+
} catch (...) {
83+
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception()));
84+
}
85+
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish());
86+
}
87+
88+
void ProcessOneEvent() {
89+
if (!Requests.empty()) {
90+
ExtractDataPart(*Requests.front());
91+
Requests.pop();
92+
return;
93+
}
94+
TAutoPtr<::NActors::IEventHandle> ev(WaitForEvent().Release());
95+
StateFunc(ev);
96+
}
97+
98+
void ExtractDataPart(TEvS3Provider::TEvDecompressDataRequest& event) {
99+
InputBuffer = std::move(event.Data);
100+
}
101+
102+
private:
103+
TString InputBuffer;
104+
TString Compression;
105+
TActorId Parent;
106+
bool InputFinished = false;
107+
std::queue<THolder<TEvS3Provider::TEvDecompressDataRequest>> Requests;
108+
};
109+
110+
class TS3DecompressorCoroActor : public TActorCoro {
111+
public:
112+
TS3DecompressorCoroActor(THolder<TS3DecompressorCoroImpl> impl)
113+
: TActorCoro(THolder<TS3DecompressorCoroImpl>(impl.Release()))
114+
{}
115+
private:
116+
void Registered(TActorSystem* actorSystem, const TActorId& parent) override {
117+
TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself.
118+
}
119+
};
120+
121+
}
122+
123+
NActors::IActor* CreateS3DecompressorActor(const NActors::TActorId& parent, const TString& compression) {
124+
return new TS3DecompressorCoroActor(MakeHolder<TS3DecompressorCoroImpl>(parent, compression));
125+
}
126+
127+
} // namespace NYql::NDq
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#pragma once
2+
3+
#include <ydb/library/actors/core/actor.h>
4+
5+
namespace NYql::NDq {
6+
7+
NActors::IActor* CreateS3DecompressorActor(const NActors::TActorId& parent, const TString& compression);
8+
9+
} // namespace NYql::NDq

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 82 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
#include "yql_arrow_column_converters.h"
4040
#include "yql_arrow_push_down.h"
41+
#include "yql_s3_decompressor_actor.h"
4142
#include "yql_s3_actors_util.h"
4243
#include "yql_s3_raw_read_actor.h"
4344
#include "yql_s3_read_actor.h"
@@ -364,22 +365,49 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
364365
TString RawDataBuffer;
365366
};
366367
368+
class TCoroDecompressorBuffer : public NDB::ReadBuffer {
369+
public:
370+
TCoroDecompressorBuffer(TS3ReadCoroImpl* coro)
371+
: NDB::ReadBuffer(nullptr, 0ULL)
372+
, Coro(coro)
373+
{ }
374+
private:
375+
bool nextImpl() final {
376+
while (!Coro->DecompressedInputFinished || !Coro->DeferredDecompressedDataParts.empty()) {
377+
Coro->CpuTime += Coro->GetCpuTimeDelta();
378+
Coro->ProcessOneEvent();
379+
Coro->StartCycleCount = GetCycleCountFast();
380+
auto decompressed = Coro->ExtractDecompressedDataPart();
381+
if (decompressed) {
382+
RawDataBuffer.swap(decompressed);
383+
auto rawData = const_cast<char*>(RawDataBuffer.data());
384+
working_buffer = NDB::BufferBase::Buffer(rawData, rawData + RawDataBuffer.size());
385+
return true;
386+
} else if (Coro->InputBuffer) {
387+
Coro->Send(Coro->DecompressorActorId, new TEvS3Provider::TEvDecompressDataRequest(std::move(Coro->InputBuffer)));
388+
}
389+
}
390+
return false;
391+
}
392+
TS3ReadCoroImpl *const Coro;
393+
TString RawDataBuffer;
394+
};
395+
367396
void RunClickHouseParserOverHttp() {
368397
369398
LOG_CORO_D("RunClickHouseParserOverHttp");
370399
371-
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
400+
std::unique_ptr<NDB::ReadBuffer> coroBuffer = AsyncDecompressing ? std::unique_ptr<NDB::ReadBuffer>(std::make_unique<TCoroDecompressorBuffer>(this)) : std::unique_ptr<NDB::ReadBuffer>(std::make_unique<TCoroReadBuffer>(this));
372401
std::unique_ptr<NDB::ReadBuffer> decompressorBuffer;
373402
NDB::ReadBuffer* buffer = coroBuffer.get();
374403
375404
// lz4 decompressor reads signature in ctor, w/o actual data it will be deadlocked
376405
DownloadStart(RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize);
377406
378-
if (ReadSpec->Compression) {
407+
if (ReadSpec->Compression && !AsyncDecompressing) {
379408
decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression);
380409
YQL_ENSURE(decompressorBuffer, "Unsupported " << ReadSpec->Compression << " compression.");
381410
buffer = decompressorBuffer.get();
382-
383411
}
384412
385413
auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(
@@ -413,11 +441,11 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
413441
414442
TString fileName = Url.substr(7) + Path;
415443
416-
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<NDB::ReadBufferFromFile>(fileName);
444+
std::unique_ptr<NDB::ReadBuffer> coroBuffer = AsyncDecompressing ? std::unique_ptr<NDB::ReadBuffer>(std::make_unique<TCoroDecompressorBuffer>(this)) : std::unique_ptr<NDB::ReadBuffer>(std::make_unique<NDB::ReadBufferFromFile>(fileName));
417445
std::unique_ptr<NDB::ReadBuffer> decompressorBuffer;
418446
NDB::ReadBuffer* buffer = coroBuffer.get();
419447
420-
if (ReadSpec->Compression) {
448+
if (ReadSpec->Compression && !AsyncDecompressing) {
421449
decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression);
422450
YQL_ENSURE(decompressorBuffer, "Unsupported " << ReadSpec->Compression << " compression.");
423451
buffer = decompressorBuffer.get();
@@ -840,6 +868,8 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
840868
hFunc(TEvS3Provider::TEvDownloadStart, Handle);
841869
hFunc(TEvS3Provider::TEvDownloadData, Handle);
842870
hFunc(TEvS3Provider::TEvDownloadFinish, Handle);
871+
hFunc(TEvS3Provider::TEvDecompressDataResult, Handle);
872+
hFunc(TEvS3Provider::TEvDecompressDataFinish, Handle);
843873
hFunc(TEvS3Provider::TEvContinue, Handle);
844874
hFunc(TEvS3Provider::TEvReadResult2, Handle);
845875
hFunc(NActors::TEvents::TEvPoison, Handle);
@@ -869,6 +899,18 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
869899
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
870900
}
871901

902+
TString ExtractDecompressedDataPart() {
903+
if (!DeferredDecompressedDataParts.empty()) {
904+
auto result = std::move(DeferredDecompressedDataParts.front());
905+
DeferredDecompressedDataParts.pop();
906+
if (result->Exception) {
907+
throw result->Exception;
908+
}
909+
return result->Data;
910+
}
911+
return {};
912+
}
913+
872914
void Handle(TEvS3Provider::TEvDownloadStart::TPtr& ev) {
873915
HttpResponseCode = ev->Get()->HttpResponseCode;
874916
CurlResponseCode = ev->Get()->CurlResponseCode;
@@ -898,6 +940,14 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
898940
}
899941
}
900942

943+
void Handle(TEvS3Provider::TEvDecompressDataResult::TPtr& ev) {
944+
DeferredDecompressedDataParts.push(std::move(ev->Release()));
945+
}
946+
947+
void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr& ev) {
948+
DecompressedInputFinished = true;
949+
}
950+
901951
void Handle(TEvS3Provider::TEvDownloadFinish::TPtr& ev) {
902952

903953
if (CurlResponseCode == CURLE_OK) {
@@ -933,6 +983,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
933983
// can't retry here: fail download
934984
RetryStuff->RetryState = nullptr;
935985
InputFinished = true;
986+
FinishDecompressor();
936987
LOG_CORO_W("ReadError: " << Issues.ToOneLineString() << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
937988
throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
938989
}
@@ -951,6 +1002,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
9511002
} else {
9521003
LOG_CORO_D("TEvDownloadFinish, LastOffset: " << LastOffset << ", Error: " << ServerReturnedError);
9531004
InputFinished = true;
1005+
FinishDecompressor();
9541006
if (ServerReturnedError) {
9551007
throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
9561008
}
@@ -973,9 +1025,16 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
9731025
void Handle(NActors::TEvents::TEvPoison::TPtr&) {
9741026
LOG_CORO_D("TEvPoison");
9751027
RetryStuff->Cancel();
1028+
FinishDecompressor(true);
9761029
throw TS3ReadAbort();
9771030
}
9781031

1032+
void FinishDecompressor(bool force = false) {
1033+
if (AsyncDecompressing) {
1034+
Send(DecompressorActorId, new NActors::TEvents::TEvPoison(), 0, force);
1035+
}
1036+
}
1037+
9791038
private:
9801039
static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv;
9811040
public:
@@ -987,13 +1046,14 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
9871046
const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
9881047
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
9891048
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps,
990-
const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize)
1049+
const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize,
1050+
bool asyncDecompressing)
9911051
: TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
9921052
TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
9931053
PathIndex(pathIndex), Path(path), Url(url), RowsRemained(maxRows),
9941054
SourceContext(queueBufferCounter),
9951055
DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize),
996-
HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize) {
1056+
HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize), AsyncDecompressing(asyncDecompressing) {
9971057
}
9981058

9991059
~TS3ReadCoroImpl() override {
@@ -1048,6 +1108,9 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
10481108
}
10491109

10501110
void Run() final {
1111+
if (AsyncDecompressing) {
1112+
DecompressorActorId = Register(CreateS3DecompressorActor(SelfActorId, ReadSpec->Compression));
1113+
}
10511114

10521115
NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
10531116

@@ -1158,12 +1221,14 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
11581221
const TString Url;
11591222

11601223
bool InputFinished = false;
1224+
bool DecompressedInputFinished = false;
11611225
long HttpResponseCode = 0L;
11621226
CURLcode CurlResponseCode = CURLE_OK;
11631227
bool ServerReturnedError = false;
11641228
TString ErrorText;
11651229
TIssues Issues;
11661230

1231+
NActors::TActorId DecompressorActorId;
11671232
std::size_t LastOffset = 0;
11681233
TString LastData;
11691234
ui64 IngressBytes = 0;
@@ -1174,11 +1239,13 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
11741239
std::optional<ui64> RowsRemained;
11751240
bool Paused = false;
11761241
std::queue<THolder<TEvS3Provider::TEvDownloadData>> DeferredDataParts;
1242+
std::queue<THolder<TEvS3Provider::TEvDecompressDataResult>> DeferredDecompressedDataParts;
11771243
TSourceContext::TPtr SourceContext;
11781244
const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
11791245
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
11801246
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
11811247
const ::NMonitoring::TDynamicCounters::TCounterPtr RawInflightSize;
1248+
const bool AsyncDecompressing;
11821249
};
11831250

11841251
class TS3ReadCoroActor : public TActorCoro {
@@ -1221,7 +1288,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
12211288
ui64 fileQueueBatchSizeLimit,
12221289
ui64 fileQueueBatchObjectCountLimit,
12231290
ui64 fileQueueConsumersCountDelta,
1224-
bool asyncDecoding
1291+
bool asyncDecoding,
1292+
bool asyncDecompressing
12251293
) : ReadActorFactoryCfg(readActorFactoryCfg)
12261294
, Gateway(std::move(gateway))
12271295
, HolderFactory(holderFactory)
@@ -1247,7 +1315,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
12471315
, FileQueueBatchSizeLimit(fileQueueBatchSizeLimit)
12481316
, FileQueueBatchObjectCountLimit(fileQueueBatchObjectCountLimit)
12491317
, FileQueueConsumersCountDelta(fileQueueConsumersCountDelta)
1250-
, AsyncDecoding(asyncDecoding) {
1318+
, AsyncDecoding(asyncDecoding)
1319+
, AsyncDecompressing(asyncDecompressing) {
12511320
if (Counters) {
12521321
QueueDataSize = Counters->GetCounter("QueueDataSize");
12531322
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
@@ -1408,7 +1477,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
14081477
DeferredQueueSize,
14091478
HttpInflightSize,
14101479
HttpDataRps,
1411-
RawInflightSize
1480+
RawInflightSize,
1481+
AsyncDecompressing
14121482
);
14131483
if (AsyncDecoding) {
14141484
actorId = Register(new TS3ReadCoroActor(std::move(impl)));
@@ -1829,6 +1899,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
18291899
ui64 FileQueueBatchObjectCountLimit;
18301900
ui64 FileQueueConsumersCountDelta;
18311901
const bool AsyncDecoding;
1902+
const bool AsyncDecompressing;
18321903
bool IsCurrentBatchEmpty = false;
18331904
bool IsFileQueueEmpty = false;
18341905
bool IsWaitingFileQueueResponse = false;
@@ -2182,7 +2253,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
21822253
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
21832254
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
21842255
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta,
2185-
params.GetAsyncDecoding());
2256+
params.GetAsyncDecoding(), params.GetAsyncDecompressing());
21862257

21872258
return {actor, actor};
21882259
} else {

0 commit comments

Comments
 (0)