Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/library/yql/providers/s3/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ IF (CLANG AND NOT WITH_VALGRIND)

SRCS(
yql_arrow_column_converters.cpp
yql_s3_decompressor_actor.cpp
yql_s3_read_actor.cpp
yql_s3_source_queue.cpp
)
Expand Down
127 changes: 127 additions & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#include <queue>

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/actor_coroutine.h>
#include <ydb/library/yql/providers/s3/compressors/factory.h>
#include <ydb/library/yql/providers/s3/events/events.h>

#if defined(_linux_) || defined(_darwin_)
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
#endif

namespace NYql::NDq {

using namespace ::NActors;

namespace {

// Coroutine body of the S3 decompressor actor.
//
// Compressed chunks arrive as TEvDecompressDataRequest events. They are fed, one at a time,
// into a decompressor created by MakeDecompressor() for the configured compression name.
// Every decompressed portion is sent to Parent as TEvDecompressDataResult; when the stream
// ends (or fails) a TEvDecompressDataFinish is sent. A failure is reported to Parent as a
// TEvDecompressDataResult carrying the captured exception, followed by the finish event.
//
// TEvPoison with a zero cookie marks the end of input; a non-zero cookie aborts processing
// by throwing from the event handler.
class TS3DecompressorCoroImpl : public TActorCoroImpl {
public:
    // parent      - actor that receives the decompressed results.
    // compression - compression name handed to MakeDecompressor().
    // 256_KB is forwarded to TActorCoroImpl (presumably the coroutine stack size - TODO confirm).
    TS3DecompressorCoroImpl(const TActorId& parent, const TString& compression)
        : TActorCoroImpl(256_KB)
        , Compression(compression)
        , Parent(parent)
    {}

private:
    // ReadBuffer adapter: exposes the chunks delivered to the coroutine as a byte source
    // for the decompressor. It owns the chunk currently being read.
    class TCoroReadBuffer : public NDB::ReadBuffer {
    public:
        TCoroReadBuffer(TS3DecompressorCoroImpl* coro)
            : NDB::ReadBuffer(nullptr, 0ULL)
            , Coro(coro)
        { }

    private:
        // Pumps coroutine events until a non-empty chunk is available.
        // Returns false once input is finished and no queued requests remain.
        bool nextImpl() final {
            for (;;) {
                const bool drained = Coro->InputFinished && Coro->Requests.empty();
                if (drained) {
                    return false;
                }

                Coro->ProcessOneEvent();
                if (Coro->InputBuffer.empty()) {
                    // The processed event produced no data (or an empty chunk); keep waiting.
                    continue;
                }

                // Take ownership of the chunk and leave the coroutine's slot empty.
                RawDataBuffer.swap(Coro->InputBuffer);
                Coro->InputBuffer.clear();

                char* const begin = const_cast<char*>(RawDataBuffer.data());
                working_buffer = NDB::BufferBase::Buffer(begin, begin + RawDataBuffer.size());
                return true;
            }
        }

        TS3DecompressorCoroImpl *const Coro;
        TString RawDataBuffer;
    };

    STRICT_STFUNC(StateFunc,
        hFunc(TEvS3Provider::TEvDecompressDataRequest, Handle);
        hFunc(NActors::TEvents::TEvPoison, Handle);
    )

    // Queues an incoming compressed chunk; it is consumed later by ProcessOneEvent().
    void Handle(TEvS3Provider::TEvDecompressDataRequest::TPtr& ev) {
        Requests.push(ev->Release());
    }

    // Zero cookie: no more input will arrive. Non-zero cookie: abort by throwing.
    void Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
        if (ev->Cookie) {
            ythrow yexception() << "S3 decompressor actor abort";
        }
        InputFinished = true;
    }

    // Coroutine entry point: drains the decompressor and streams its output to Parent.
    void Run() final {
        try {
            std::unique_ptr<NDB::ReadBuffer> source = std::make_unique<TCoroReadBuffer>(this);
            auto decompressor = MakeDecompressor(*source, Compression);
            YQL_ENSURE(decompressor, "Unsupported " << Compression << " compression.");

            while (!decompressor->eof()) {
                decompressor->nextIfAtEnd();
                const auto pending = decompressor->available();
                TString chunk{pending, ' '};
                decompressor->read(&chunk.front(), pending);
                Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(chunk)));
            }
        } catch (const TDtorException&) {
            // Stop any activity instantly
            return;
        } catch (...) {
            // Hand the failure over to the parent instead of losing it inside the coroutine.
            Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception()));
        }
        Send(Parent, new TEvS3Provider::TEvDecompressDataFinish());
    }

    // Consumes one queued request if there is any; otherwise waits for the next event
    // and dispatches it through StateFunc.
    void ProcessOneEvent() {
        if (Requests.empty()) {
            TAutoPtr<::NActors::IEventHandle> ev(WaitForEvent().Release());
            StateFunc(ev);
            return;
        }
        ExtractDataPart(*Requests.front());
        Requests.pop();
    }

    // Moves the request payload into InputBuffer, where TCoroReadBuffer picks it up.
    void ExtractDataPart(TEvS3Provider::TEvDecompressDataRequest& event) {
        InputBuffer = std::move(event.Data);
    }

private:
    TString InputBuffer;        // chunk waiting to be picked up by TCoroReadBuffer
    TString Compression;
    TActorId Parent;
    bool InputFinished = false; // set by a TEvPoison with zero cookie
    std::queue<THolder<TEvS3Provider::TEvDecompressDataRequest>> Requests;
};

// Actor shell that hosts a TS3DecompressorCoroImpl coroutine.
class TS3DecompressorCoroActor : public TActorCoro {
public:
    // Takes over ownership of the coroutine implementation.
    TS3DecompressorCoroActor(THolder<TS3DecompressorCoroImpl> impl)
        : TActorCoro(THolder<TS3DecompressorCoroImpl>(impl.Release()))
    {}

private:
    // Forwards to the base implementation, which calls TActorCoro::OnRegister
    // and sends the bootstrap event to ourself.
    void Registered(TActorSystem* actorSystem, const TActorId& parent) override {
        TActorCoro::Registered(actorSystem, parent);
    }
};

}

// Builds a not-yet-registered decompressor actor that reports its results to `parent`
// and decompresses with the codec named by `compression`.
NActors::IActor* CreateS3DecompressorActor(const NActors::TActorId& parent, const TString& compression) {
    auto coroImpl = MakeHolder<TS3DecompressorCoroImpl>(parent, compression);
    return new TS3DecompressorCoroActor(std::move(coroImpl));
}

} // namespace NYql::NDq
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#pragma once

#include <ydb/library/actors/core/actor.h>

namespace NYql::NDq {

// Creates (but does not register) an actor that decompresses data sent to it in
// TEvS3Provider::TEvDecompressDataRequest events.
//   parent      - actor id that receives TEvDecompressDataResult / TEvDecompressDataFinish.
//   compression - name of the compression passed to the decompressor factory.
// Returns a raw pointer allocated with `new`; the caller is expected to hand it to Register().
NActors::IActor* CreateS3DecompressorActor(const NActors::TActorId& parent, const TString& compression);

} // namespace NYql::NDq
93 changes: 82 additions & 11 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#endif

#include <exception>

#include "yql_arrow_column_converters.h"
#include "yql_s3_decompressor_actor.h"
#include "yql_s3_actors_util.h"
#include "yql_s3_raw_read_actor.h"
#include "yql_s3_read_actor.h"
Expand Down Expand Up @@ -362,22 +363,49 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
TString RawDataBuffer;
};

// ReadBuffer adapter used when decompression is delegated to a separate actor.
// Raw downloaded chunks found in Coro->InputBuffer are forwarded to the decompressor actor,
// and the decompressed chunks it sends back are exposed here as the readable data.
class TCoroDecompressorBuffer : public NDB::ReadBuffer {
public:
    TCoroDecompressorBuffer(TS3ReadCoroImpl* coro)
        : NDB::ReadBuffer(nullptr, 0ULL)
        , Coro(coro)
    { }

private:
    // Processes coroutine events until a decompressed chunk is available.
    // Returns false once the decompressor reported finish and nothing is left in the queue.
    bool nextImpl() final {
        for (;;) {
            if (Coro->DecompressedInputFinished && Coro->DeferredDecompressedDataParts.empty()) {
                return false;
            }

            // CPU time is accounted up to the wait and the cycle counter restarted after it.
            Coro->CpuTime += Coro->GetCpuTimeDelta();
            Coro->ProcessOneEvent();
            Coro->StartCycleCount = GetCycleCountFast();

            TString chunk = Coro->ExtractDecompressedDataPart();
            if (chunk) {
                RawDataBuffer.swap(chunk);
                char* const begin = const_cast<char*>(RawDataBuffer.data());
                working_buffer = NDB::BufferBase::Buffer(begin, begin + RawDataBuffer.size());
                return true;
            }

            if (Coro->InputBuffer) {
                // No decompressed data yet: ship the raw chunk to the decompressor actor.
                Coro->Send(Coro->DecompressorActorId, new TEvS3Provider::TEvDecompressDataRequest(std::move(Coro->InputBuffer)));
            }
        }
    }

    TS3ReadCoroImpl *const Coro;
    TString RawDataBuffer;
};

void RunClickHouseParserOverHttp() {

LOG_CORO_D("RunClickHouseParserOverHttp");

std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
std::unique_ptr<NDB::ReadBuffer> coroBuffer = AsyncDecompressing ? std::unique_ptr<NDB::ReadBuffer>(std::make_unique<TCoroDecompressorBuffer>(this)) : std::unique_ptr<NDB::ReadBuffer>(std::make_unique<TCoroReadBuffer>(this));
std::unique_ptr<NDB::ReadBuffer> decompressorBuffer;
NDB::ReadBuffer* buffer = coroBuffer.get();

// lz4 decompressor reads signature in ctor, w/o actual data it will be deadlocked
DownloadStart(RetryStuff, GetActorSystem(), SelfActorId, ParentActorId, PathIndex, HttpInflightSize);

if (ReadSpec->Compression) {
if (ReadSpec->Compression && !AsyncDecompressing) {
decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression);
YQL_ENSURE(decompressorBuffer, "Unsupported " << ReadSpec->Compression << " compression.");
buffer = decompressorBuffer.get();

}

auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(
Expand Down Expand Up @@ -411,11 +439,11 @@ class TS3ReadCoroImpl : public TActorCoroImpl {

TString fileName = Url.substr(7) + Path;

std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<NDB::ReadBufferFromFile>(fileName);
std::unique_ptr<NDB::ReadBuffer> coroBuffer = AsyncDecompressing ? std::unique_ptr<NDB::ReadBuffer>(std::make_unique<TCoroDecompressorBuffer>(this)) : std::unique_ptr<NDB::ReadBuffer>(std::make_unique<NDB::ReadBufferFromFile>(fileName));
std::unique_ptr<NDB::ReadBuffer> decompressorBuffer;
NDB::ReadBuffer* buffer = coroBuffer.get();

if (ReadSpec->Compression) {
if (ReadSpec->Compression && !AsyncDecompressing) {
decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression);
YQL_ENSURE(decompressorBuffer, "Unsupported " << ReadSpec->Compression << " compression.");
buffer = decompressorBuffer.get();
Expand Down Expand Up @@ -836,6 +864,8 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
hFunc(TEvS3Provider::TEvDownloadStart, Handle);
hFunc(TEvS3Provider::TEvDownloadData, Handle);
hFunc(TEvS3Provider::TEvDownloadFinish, Handle);
hFunc(TEvS3Provider::TEvDecompressDataResult, Handle);
hFunc(TEvS3Provider::TEvDecompressDataFinish, Handle);
hFunc(TEvS3Provider::TEvContinue, Handle);
hFunc(TEvS3Provider::TEvReadResult2, Handle);
hFunc(NActors::TEvents::TEvPoison, Handle);
Expand Down Expand Up @@ -865,6 +895,18 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
}

// Pops the oldest deferred decompression result and returns its payload.
//
// Returns an empty string when no result is queued.
// If the popped result carries an exception captured on the decompressor side,
// that original exception is rethrown with its original type.
TString ExtractDecompressedDataPart() {
    if (DeferredDecompressedDataParts.empty()) {
        return {};
    }

    auto result = std::move(DeferredDecompressedDataParts.front());
    DeferredDecompressedDataParts.pop();

    if (result->Exception) {
        // `throw result->Exception;` would throw the std::exception_ptr object itself,
        // which handlers written for concrete exception types cannot match.
        // Rethrow the stored exception instead.
        std::rethrow_exception(result->Exception);
    }

    // The event is destroyed on return, so its payload can be moved out instead of copied.
    return std::move(result->Data);
}

void Handle(TEvS3Provider::TEvDownloadStart::TPtr& ev) {
HttpResponseCode = ev->Get()->HttpResponseCode;
CurlResponseCode = ev->Get()->CurlResponseCode;
Expand Down Expand Up @@ -894,6 +936,14 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}
}

// Stores a result received from the decompressor actor; it is consumed later
// through ExtractDecompressedDataPart().
void Handle(TEvS3Provider::TEvDecompressDataResult::TPtr& ev) {
    DeferredDecompressedDataParts.push(ev->Release());
}

// The decompressor actor reported that it will send no more results.
// TCoroDecompressorBuffer::nextImpl stops once this flag is set and the deferred queue is empty.
void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr&) {
DecompressedInputFinished = true;
}

void Handle(TEvS3Provider::TEvDownloadFinish::TPtr& ev) {

if (CurlResponseCode == CURLE_OK) {
Expand Down Expand Up @@ -929,6 +979,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
// can't retry here: fail download
RetryStuff->RetryState = nullptr;
InputFinished = true;
FinishDecompressor();
LOG_CORO_W("ReadError: " << Issues.ToOneLineString() << ", LastOffset: " << LastOffset << ", LastData: " << GetLastDataAsText());
throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
}
Expand All @@ -947,6 +998,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
} else {
LOG_CORO_D("TEvDownloadFinish, LastOffset: " << LastOffset << ", Error: " << ServerReturnedError);
InputFinished = true;
FinishDecompressor();
if (ServerReturnedError) {
throw TS3ReadError(); // Don't pass control to data parsing, because it may validate eof and show wrong issues about incorrect data format
}
Expand All @@ -969,9 +1021,16 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
// Poison handler: cancels the download, force-stops the decompressor actor (if one is used)
// and unwinds the coroutine by throwing TS3ReadAbort.
void Handle(NActors::TEvents::TEvPoison::TPtr&) {
LOG_CORO_D("TEvPoison");
RetryStuff->Cancel();
// force == true: the poison is sent to the decompressor with a non-zero cookie.
FinishDecompressor(true);
throw TS3ReadAbort();
}

// Sends TEvPoison to the decompressor actor; does nothing unless async decompression is enabled.
// `force` is passed as the event cookie (non-zero cookie = forced stop, zero = end of input).
void FinishDecompressor(bool force = false) {
    if (!AsyncDecompressing) {
        return;
    }
    Send(DecompressorActorId, new NActors::TEvents::TEvPoison(), 0, force);
}

private:
static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv;
public:
Expand All @@ -983,13 +1042,14 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
const ::NMonitoring::TDynamicCounters::TCounterPtr& deferredQueueSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpInflightSize,
const ::NMonitoring::TDynamicCounters::TCounterPtr& httpDataRps,
const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize)
const ::NMonitoring::TDynamicCounters::TCounterPtr& rawInflightSize,
bool asyncDecompressing)
: TActorCoroImpl(256_KB), ReadActorFactoryCfg(readActorFactoryCfg), InputIndex(inputIndex),
TxId(txId), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId),
PathIndex(pathIndex), Path(path), Url(url), RowsRemained(maxRows),
SourceContext(queueBufferCounter),
DeferredQueueSize(deferredQueueSize), HttpInflightSize(httpInflightSize),
HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize) {
HttpDataRps(httpDataRps), RawInflightSize(rawInflightSize), AsyncDecompressing(asyncDecompressing) {
}

~TS3ReadCoroImpl() override {
Expand Down Expand Up @@ -1044,6 +1104,9 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}

void Run() final {
if (AsyncDecompressing) {
DecompressorActorId = Register(CreateS3DecompressorActor(SelfActorId, ReadSpec->Compression));
}

NYql::NDqProto::StatusIds::StatusCode fatalCode = NYql::NDqProto::StatusIds::EXTERNAL_ERROR;

Expand Down Expand Up @@ -1154,12 +1217,14 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
const TString Url;

bool InputFinished = false;
bool DecompressedInputFinished = false;
long HttpResponseCode = 0L;
CURLcode CurlResponseCode = CURLE_OK;
bool ServerReturnedError = false;
TString ErrorText;
TIssues Issues;

NActors::TActorId DecompressorActorId;
std::size_t LastOffset = 0;
TString LastData;
ui64 IngressBytes = 0;
Expand All @@ -1170,11 +1235,13 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
std::optional<ui64> RowsRemained;
bool Paused = false;
std::queue<THolder<TEvS3Provider::TEvDownloadData>> DeferredDataParts;
std::queue<THolder<TEvS3Provider::TEvDecompressDataResult>> DeferredDecompressedDataParts;
TSourceContext::TPtr SourceContext;
const ::NMonitoring::TDynamicCounters::TCounterPtr DeferredQueueSize;
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpInflightSize;
const ::NMonitoring::TDynamicCounters::TCounterPtr HttpDataRps;
const ::NMonitoring::TDynamicCounters::TCounterPtr RawInflightSize;
const bool AsyncDecompressing;
};

class TS3ReadCoroActor : public TActorCoro {
Expand Down Expand Up @@ -1217,7 +1284,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
ui64 fileQueueBatchSizeLimit,
ui64 fileQueueBatchObjectCountLimit,
ui64 fileQueueConsumersCountDelta,
bool asyncDecoding
bool asyncDecoding,
bool asyncDecompressing
) : ReadActorFactoryCfg(readActorFactoryCfg)
, Gateway(std::move(gateway))
, HolderFactory(holderFactory)
Expand All @@ -1243,7 +1311,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
, FileQueueBatchSizeLimit(fileQueueBatchSizeLimit)
, FileQueueBatchObjectCountLimit(fileQueueBatchObjectCountLimit)
, FileQueueConsumersCountDelta(fileQueueConsumersCountDelta)
, AsyncDecoding(asyncDecoding) {
, AsyncDecoding(asyncDecoding)
, AsyncDecompressing(asyncDecompressing) {
if (Counters) {
QueueDataSize = Counters->GetCounter("QueueDataSize");
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
Expand Down Expand Up @@ -1404,7 +1473,8 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
DeferredQueueSize,
HttpInflightSize,
HttpDataRps,
RawInflightSize
RawInflightSize,
AsyncDecompressing
);
if (AsyncDecoding) {
actorId = Register(new TS3ReadCoroActor(std::move(impl)));
Expand Down Expand Up @@ -1825,6 +1895,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
ui64 FileQueueBatchObjectCountLimit;
ui64 FileQueueConsumersCountDelta;
const bool AsyncDecoding;
const bool AsyncDecompressing;
bool IsCurrentBatchEmpty = false;
bool IsFileQueueEmpty = false;
bool IsWaitingFileQueueResponse = false;
Expand Down Expand Up @@ -2177,7 +2248,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta,
params.GetAsyncDecoding());
params.GetAsyncDecoding(), params.GetAsyncDecompressing());

return {actor, actor};
} else {
Expand Down
Loading