Skip to content

Commit 7c81dc3

Browse files
authored
Merge dfdae95 into dca55a5
2 parents dca55a5 + dfdae95 commit 7c81dc3

File tree

16 files changed

+349
-31
lines changed

16 files changed

+349
-31
lines changed

ydb/library/yql/providers/s3/actors/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ IF (CLANG AND NOT WITH_VALGRIND)
5151

5252
SRCS(
5353
yql_arrow_column_converters.cpp
54+
yql_s3_decompressor_actor.cpp
5455
yql_s3_read_actor.cpp
5556
yql_s3_source_queue.cpp
5657
)
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#include <queue>
2+
3+
#include <ydb/library/actors/core/actor.h>
4+
#include <ydb/library/actors/core/actor_coroutine.h>
5+
#include <ydb/library/yql/providers/s3/compressors/factory.h>
6+
#include <ydb/library/yql/providers/s3/events/events.h>
7+
8+
#if defined(_linux_) || defined(_darwin_)
9+
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
10+
#endif
11+
12+
namespace NYql::NDq {
13+
14+
using namespace ::NActors;
15+
16+
namespace {
17+
18+
class TS3DecompressorCoroImpl : public TActorCoroImpl {
19+
public:
20+
TS3DecompressorCoroImpl(const TActorId& parent, const TString& compression)
21+
: TActorCoroImpl(256_KB)
22+
, Compression(compression)
23+
, Parent(parent)
24+
{}
25+
26+
private:
27+
class TCoroReadBuffer : public NDB::ReadBuffer {
28+
public:
29+
TCoroReadBuffer(TS3DecompressorCoroImpl* coro)
30+
: NDB::ReadBuffer(nullptr, 0ULL)
31+
, Coro(coro)
32+
{ }
33+
private:
34+
bool nextImpl() final {
35+
while (!Coro->InputFinished || !Coro->Requests.empty()) {
36+
Coro->ProcessOneEvent();
37+
if (Coro->InputBuffer) {
38+
RawDataBuffer.swap(Coro->InputBuffer);
39+
Coro->InputBuffer.clear();
40+
auto rawData = const_cast<char*>(RawDataBuffer.data());
41+
working_buffer = NDB::BufferBase::Buffer(rawData, rawData + RawDataBuffer.size());
42+
return true;
43+
}
44+
}
45+
return false;
46+
}
47+
TS3DecompressorCoroImpl *const Coro;
48+
TString RawDataBuffer;
49+
};
50+
51+
STRICT_STFUNC(StateFunc,
52+
hFunc(TEvS3Provider::TEvDecompressDataRequest, Handle);
53+
hFunc(NActors::TEvents::TEvPoison, Handle);
54+
)
55+
56+
void Handle(TEvS3Provider::TEvDecompressDataRequest::TPtr& ev) {
57+
Requests.push(std::move(ev->Release()));
58+
}
59+
60+
void Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
61+
if (ev->Cookie) {
62+
ythrow yexception() << "S3 decompressor actor abort";
63+
}
64+
InputFinished = true;
65+
}
66+
67+
void Run() final {
68+
try {
69+
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
70+
NDB::ReadBuffer* buffer = coroBuffer.get();
71+
auto decompressorBuffer = MakeDecompressor(*buffer, Compression);
72+
YQL_ENSURE(decompressorBuffer, "Unsupported " << Compression << " compression.");
73+
while (!decompressorBuffer->eof()) {
74+
decompressorBuffer->nextIfAtEnd();
75+
TString data{decompressorBuffer->available(), ' '};
76+
decompressorBuffer->read(&data.front(), decompressorBuffer->available());
77+
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data)));
78+
}
79+
} catch (const TDtorException&) {
80+
// Stop any activity instantly
81+
return;
82+
} catch (...) {
83+
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception()));
84+
}
85+
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish());
86+
}
87+
88+
void ProcessOneEvent() {
89+
if (!Requests.empty()) {
90+
ExtractDataPart(*Requests.front());
91+
Requests.pop();
92+
return;
93+
}
94+
TAutoPtr<::NActors::IEventHandle> ev(WaitForEvent().Release());
95+
StateFunc(ev);
96+
}
97+
98+
void ExtractDataPart(TEvS3Provider::TEvDecompressDataRequest& event) {
99+
InputBuffer = std::move(event.Data);
100+
}
101+
102+
private:
103+
TString InputBuffer;
104+
TString Compression;
105+
TActorId Parent;
106+
bool InputFinished = false;
107+
std::queue<THolder<TEvS3Provider::TEvDecompressDataRequest>> Requests;
108+
};
109+
110+
class TS3DecompressorCoroActor : public TActorCoro {
111+
public:
112+
TS3DecompressorCoroActor(THolder<TS3DecompressorCoroImpl> impl)
113+
: TActorCoro(THolder<TS3DecompressorCoroImpl>(impl.Release()))
114+
{}
115+
private:
116+
void Registered(TActorSystem* actorSystem, const TActorId& parent) override {
117+
TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself.
118+
}
119+
};
120+
121+
}
122+
123+
NActors::IActor* CreateS3DecompressorActor(const NActors::TActorId& parent, const TString& compression) {
124+
return new TS3DecompressorCoroActor(MakeHolder<TS3DecompressorCoroImpl>(parent, compression));
125+
}
126+
127+
} // namespace NYql::NDq
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#pragma once
2+
3+
#include <ydb/library/actors/core/actor.h>
4+
5+
namespace NYql::NDq {
6+
7+
NActors::IActor* CreateS3DecompressorActor(const NActors::TActorId& parent, const TString& compression);
8+
9+
} // namespace NYql::NDq

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 82 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#endif
3838

3939
#include "yql_arrow_column_converters.h"
40+
#include "yql_s3_decompressor_actor.h"
4041
#include "yql_s3_actors_util.h"
4142
#include "yql_s3_raw_read_actor.h"
4243
#include "yql_s3_read_actor.h"
@@ -362,22 +363,49 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
362363
TString RawDataBuffer;
363364
};
364365
366+
class TCoroDecompressorBuffer : public NDB::ReadBuffer {
367+
public:
368+
TCoroDecompressorBuffer(TS3ReadCoroImpl* coro)
369+
: NDB::ReadBuffer(nullptr, 0ULL)
370+
, Coro(coro)
371+
{ }
372+
private:
373+
bool nextImpl() final {
374+
while (!Coro->DecompressedInputFinished || !Coro->DeferredDecompressedDataParts.empty()) {
375+
Coro->CpuTime += Coro->GetCpuTimeDelta();
376+
Coro->ProcessOneEvent();
377+
Coro->StartCycleCount = GetCycleCountFast();
378+
auto decompressed = Coro->ExtractDecompressedDataPart();
379+
if (decompressed) {
380+
RawDataBuffer.swap(decompressed);
381+
auto rawData = const_cast<char*>(RawDataBuffer.data());
382+
working_buffer = NDB::BufferBase::Buffer(rawData, rawData + RawDataBuffer.size());
383+
return true;
384+
} else if (Coro->InputBuffer) {
385+
Coro->Send(Coro->DecompressorActorId, new TEvS3Provider::TEvDecompressDataRequest(std::move(Coro->InputBuffer)));
386+
}
387+
}
388+
return false;
389+
}
390+
TS3ReadCoroImpl *const Coro;
391+
TString RawDataBuffer;
392+
};
393+
365394
void RunClickHouseParserOverHttp() {
366395
367396
LOG_CORO_D("RunClickHouseParserOverHttp");
368397
369-
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
398+
std::unique_ptr<NDB::ReadBuffer> coroBuffer = AsyncDecompressing ? std::unique_ptr<NDB::ReadBuffer>(std::make_unique<TCoroDecompressorBuffer>(this)) : std::unique_ptr<NDB::ReadBuffer>(std::make_unique<TCoroReadBuffer>(this));
370399
std::unique_ptr<NDB::ReadBuffer> decompressorBuffer;
371400
NDB::ReadBuffer* buffer = coroBuffer.get();
372401
373402
// lz4 decompressor reads signature in ctor, w/o actual data it will be deadlocked
374403
DownloadStart(RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize);
375404
376-
if (ReadSpec->Compression) {
405+
if (ReadSpec->Compression && !AsyncDecompressing) {
377406
decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression);
378407
YQL_ENSURE(decompressorBuffer, "Unsupported " << ReadSpec->Compression << " compression.");
379408
buffer = decompressorBuffer.get();
380-
381409
}
382410
383411
auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(
@@ -411,11 +439,11 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
411439
412440
TString fileName = Url.substr(7) + Path;
413441
414-
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<NDB::ReadBufferFromFile>(fileName);
442+
std::unique_ptr<NDB::ReadBuffer> coroBuffer = AsyncDecompressing ? std::unique_ptr<NDB::ReadBuffer>(std::make_unique<TCoroDecompressorBuffer>(this)) : std::unique_ptr<NDB::ReadBuffer>(std::make_unique<NDB::ReadBufferFromFile>(fileName));
415443
std::unique_ptr<NDB::ReadBuffer> decompressorBuffer;
416444
NDB::ReadBuffer* buffer = coroBuffer.get();
417445
418-
if (ReadSpec->Compression) {
446+
if (ReadSpec->Compression && !AsyncDecompressing) {
419447
decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression);
420448
YQL_ENSURE(decompressorBuffer, "Unsupported " << ReadSpec->Compression << " compression.");
421449
buffer = decompressorBuffer.get();
@@ -836,6 +864,8 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
836864
hFunc(TEvS3Provider::TEvDownloadStart, Handle);
837865
hFunc(TEvS3Provider::TEvDownloadData, Handle);
838866
hFunc(TEvS3Provider::TEvDownloadFinish, Handle);
867+
hFunc(TEvS3Provider::TEvDecompressDataResult, Handle);
868+
hFunc(TEvS3Provider::TEvDecompressDataFinish, Handle);
839869
hFunc(TEvS3Provider::TEvContinue, Handle);
840870
hFunc(TEvS3Provider::TEvReadResult2, Handle);
841871
hFunc(NActors::TEvents::TEvPoison, Handle);
@@ -865,6 +895,18 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
865895
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
866896
}
867897

898+
TString ExtractDecompressedDataPart() {
899+
if (!DeferredDecompressedDataParts.empty()) {
900+
auto result = std::move(DeferredDecompressedDataParts.front());
901+
DeferredDecompressedDataParts.pop();
902+
if (result->Exception) {
903+
throw result->Exception;
904+
}
905+
return result->Data;
906+
}
907+
return {};
908+
}
909+
868910
void Handle(TEvS3Provider::TEvDownloadStart::TPtr& ev) {
869911
HttpResponseCode = ev->Get()->HttpResponseCode;
870912
CurlResponseCode = ev->Get()->CurlResponseCode;
@@ -894,6 +936,14 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
894936
}
895937
}
896938

939+
void Handle(TEvS3Provider::TEvDecompressDataResult::TPtr& ev) {
940+
DeferredDecompressedDataParts.push(std::move(ev->Release()));
941+
}
942+
943+
void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr&) {
944+
DecompressedInputFinished = true;
945+
}
946+
897947
void Handle(TEvS3Provider::TEvDownloadFinish::TPtr& ev) {
898948

899949
if (CurlResponseCode == CURLE_OK) {
@@ -929,6 +979,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
929979
// can't retry here: fail download
930980
RetryStuff->RetryState = nullptr;
931981
InputFinished = true;
982+
FinishDecompressor();
932983
LOG_CORO_W("ReadError: " << Issues.ToOneLineString() << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
933984
throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
934985
}
@@ -947,6 +998,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
947998
} else {
948999
LOG_CORO_D("TEvDownloadFinish, LastOffset: " << LastOffset << ", Error: " << ServerReturnedError);
9491000
InputFinished = true;
1001+
FinishDecompressor();
9501002
if (ServerReturnedError) {
9511003
throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
9521004
}
@@ -969,9 +1021,16 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
9691021
void Handle(NActors::TEvents::TEvPoison::TPtr&) {
9701022
LOG_CORO_D("TEvPoison");
9711023
RetryStuff->Cancel();
1024+
FinishDecompressor(true);
9721025
throw TS3ReadAbort();
9731026
}
9741027

1028+
void FinishDecompressor(bool force = false) {
1029+
if (AsyncDecompressing) {
1030+
Send(DecompressorActorId, new NActors::TEvents::TEvPoison(), 0, force);
1031+
}
1032+
}
1033+
9751034
private:
9761035
static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv;
9771036
public:
@@ -983,13 +1042,14 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
9831042
const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
9841043
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
9851044
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps,
986-
const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize)
1045+
const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize,
1046+
bool asyncDecompressing)
9871047
: TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
9881048
TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
9891049
PathIndex(pathIndex), Path(path), Url(url), RowsRemained(maxRows),
9901050
SourceContext(queueBufferCounter),
9911051
DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize),
992-
HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize) {
1052+
HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize), AsyncDecompressing(asyncDecompressing) {
9931053
}
9941054

9951055
~TS3ReadCoroImpl() override {
@@ -1044,6 +1104,9 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
10441104
}
10451105

10461106
void Run() final {
1107+
if (AsyncDecompressing) {
1108+
DecompressorActorId = Register(CreateS3DecompressorActor(SelfActorId, ReadSpec->Compression));
1109+
}
10471110

10481111
NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;
10491112

@@ -1154,12 +1217,14 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
11541217
const TString Url;
11551218

11561219
bool InputFinished = false;
1220+
bool DecompressedInputFinished = false;
11571221
long HttpResponseCode = 0L;
11581222
CURLcode CurlResponseCode = CURLE_OK;
11591223
bool ServerReturnedError = false;
11601224
TString ErrorText;
11611225
TIssues Issues;
11621226

1227+
NActors::TActorId DecompressorActorId;
11631228
std::size_t LastOffset = 0;
11641229
TString LastData;
11651230
ui64 IngressBytes = 0;
@@ -1170,11 +1235,13 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
11701235
std::optional<ui64> RowsRemained;
11711236
bool Paused = false;
11721237
std::queue<THolder<TEvS3Provider::TEvDownloadData>> DeferredDataParts;
1238+
std::queue<THolder<TEvS3Provider::TEvDecompressDataResult>> DeferredDecompressedDataParts;
11731239
TSourceContext::TPtr SourceContext;
11741240
const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
11751241
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
11761242
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
11771243
const ::NMonitoring::TDynamicCounters::TCounterPtr RawInflightSize;
1244+
const bool AsyncDecompressing;
11781245
};
11791246

11801247
class TS3ReadCoroActor : public TActorCoro {
@@ -1217,7 +1284,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
12171284
ui64 fileQueueBatchSizeLimit,
12181285
ui64 fileQueueBatchObjectCountLimit,
12191286
ui64 fileQueueConsumersCountDelta,
1220-
bool asyncDecoding
1287+
bool asyncDecoding,
1288+
bool asyncDecompressing
12211289
) : ReadActorFactoryCfg(readActorFactoryCfg)
12221290
, Gateway(std::move(gateway))
12231291
, HolderFactory(holderFactory)
@@ -1243,7 +1311,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
12431311
, FileQueueBatchSizeLimit(fileQueueBatchSizeLimit)
12441312
, FileQueueBatchObjectCountLimit(fileQueueBatchObjectCountLimit)
12451313
, FileQueueConsumersCountDelta(fileQueueConsumersCountDelta)
1246-
, AsyncDecoding(asyncDecoding) {
1314+
, AsyncDecoding(asyncDecoding)
1315+
, AsyncDecompressing(asyncDecompressing) {
12471316
if (Counters) {
12481317
QueueDataSize = Counters->GetCounter("QueueDataSize");
12491318
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
@@ -1404,7 +1473,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
14041473
DeferredQueueSize,
14051474
HttpInflightSize,
14061475
HttpDataRps,
1407-
RawInflightSize
1476+
RawInflightSize,
1477+
AsyncDecompressing
14081478
);
14091479
if (AsyncDecoding) {
14101480
actorId = Register(new TS3ReadCoroActor(std::move(impl)));
@@ -1825,6 +1895,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
18251895
ui64 FileQueueBatchObjectCountLimit;
18261896
ui64 FileQueueConsumersCountDelta;
18271897
const bool AsyncDecoding;
1898+
const bool AsyncDecompressing;
18281899
bool IsCurrentBatchEmpty = false;
18291900
bool IsFileQueueEmpty = false;
18301901
bool IsWaitingFileQueueResponse = false;
@@ -2177,7 +2248,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
21772248
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
21782249
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
21792250
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta,
2180-
params.GetAsyncDecoding());
2251+
params.GetAsyncDecoding(), params.GetAsyncDecompressing());
21812252

21822253
return {actor, actor};
21832254
} else {

0 commit comments

Comments
 (0)