Skip to content

Commit 9210980

Browse files
authored
AsyncDecompressing + lz4 fix (#6889)
1 parent 0af35b6 commit 9210980

File tree

18 files changed

+356
-36
lines changed

18 files changed

+356
-36
lines changed

ydb/core/external_sources/hive_metastore/ut/common.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ TString Exec(const TString& cmd) {
2525

2626
TString GetExternalPort(const TString& service, const TString& port) {
2727
auto dockerComposeBin = BinaryPath("library/recipes/docker_compose/bin/docker-compose");
28-
auto composeFileYml = ArcadiaSourceRoot() + "/ydb/core/external_sources/hive_metastore/ut/docker-compose.yml";
28+
auto composeFileYml = ArcadiaFromCurrentLocation(__SOURCE_FILE__, "docker-compose.yml");
2929
auto result = StringSplitter(Exec(dockerComposeBin + " -f " + composeFileYml + " port " + service + " " + port)).Split(':').ToList<TString>();
3030
return result ? Strip(result.back()) : TString{};
3131
}

ydb/core/external_sources/s3/ut/s3_aws_credentials_ut.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1+
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
12
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
23
#include <ydb/core/kqp/ut/federated_query/common/common.h>
3-
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
4+
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
45
#include <ydb/library/yql/utils/log/log.h>
56
#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
67
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
78
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
8-
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
99
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
10+
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
1011

1112
#include <library/cpp/testing/unittest/registar.h>
1213

@@ -38,15 +39,16 @@ TString Exec(const TString& cmd) {
3839

3940
TString GetExternalPort(const TString& service, const TString& port) {
4041
auto dockerComposeBin = BinaryPath("library/recipes/docker_compose/bin/docker-compose");
41-
auto composeFileYml = ArcadiaSourceRoot() + "/ydb/core/external_sources/s3/ut/docker-compose.yml";
42+
auto composeFileYml = ArcadiaFromCurrentLocation(__SOURCE_FILE__, "docker-compose.yml");
4243
auto result = StringSplitter(Exec(dockerComposeBin + " -f " + composeFileYml + " port " + service + " " + port)).Split(':').ToList<TString>();
4344
return result ? Strip(result.back()) : TString{};
4445
}
4546

4647
Y_UNIT_TEST_SUITE(S3AwsCredentials) {
4748
Y_UNIT_TEST(ExecuteScriptWithEqSymbol) {
4849
const TString externalDataSourceName = "/Root/external_data_source";
49-
auto kikimr = MakeKikimrRunner(true);
50+
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
51+
auto kikimr = MakeKikimrRunner(true, nullptr, nullptr, std::nullopt, s3ActorsFactory);
5052
auto tc = kikimr->GetTableClient();
5153
auto session = tc.CreateSession().GetValueSync().GetSession();
5254
const TString query = fmt::format(R"(

ydb/library/yql/providers/s3/actors/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ IF (CLANG AND NOT WITH_VALGRIND)
5353

5454
SRCS(
5555
yql_arrow_column_converters.cpp
56+
yql_s3_decompressor_actor.cpp
5657
yql_s3_read_actor.cpp
5758
yql_s3_source_queue.cpp
5859
)
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
#include <queue>
2+
3+
#include <ydb/library/actors/core/actor.h>
4+
#include <ydb/library/actors/core/actor_coroutine.h>
5+
#include <ydb/library/yql/providers/s3/compressors/factory.h>
6+
#include <ydb/library/yql/providers/s3/events/events.h>
7+
8+
#if defined(_linux_) || defined(_darwin_)
9+
#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
10+
#endif
11+
12+
namespace NYql::NDq {
13+
14+
using namespace ::NActors;
15+
16+
namespace {
17+
18+
class TS3DecompressorCoroImpl : public TActorCoroImpl {
19+
public:
20+
TS3DecompressorCoroImpl(const TActorId& parent, const TString& compression)
21+
: TActorCoroImpl(256_KB)
22+
, Compression(compression)
23+
, Parent(parent)
24+
{}
25+
26+
private:
27+
class TCoroReadBuffer : public NDB::ReadBuffer {
28+
public:
29+
TCoroReadBuffer(TS3DecompressorCoroImpl* coro)
30+
: NDB::ReadBuffer(nullptr, 0ULL)
31+
, Coro(coro)
32+
{ }
33+
private:
34+
bool nextImpl() final {
35+
while (!Coro->InputFinished || !Coro->Requests.empty()) {
36+
Coro->ProcessOneEvent();
37+
if (Coro->InputBuffer) {
38+
RawDataBuffer.swap(Coro->InputBuffer);
39+
Coro->InputBuffer.clear();
40+
auto rawData = const_cast<char*>(RawDataBuffer.data());
41+
working_buffer = NDB::BufferBase::Buffer(rawData, rawData + RawDataBuffer.size());
42+
return true;
43+
}
44+
}
45+
return false;
46+
}
47+
TS3DecompressorCoroImpl *const Coro;
48+
TString RawDataBuffer;
49+
};
50+
51+
STRICT_STFUNC(StateFunc,
52+
hFunc(TEvS3Provider::TEvDecompressDataRequest, Handle);
53+
hFunc(NActors::TEvents::TEvPoison, Handle);
54+
)
55+
56+
void Handle(TEvS3Provider::TEvDecompressDataRequest::TPtr& ev) {
57+
Requests.push(std::move(ev->Release()));
58+
}
59+
60+
void Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
61+
if (ev->Cookie) {
62+
ythrow yexception() << "S3 decompressor actor abort";
63+
}
64+
InputFinished = true;
65+
}
66+
67+
void Run() final {
68+
try {
69+
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
70+
NDB::ReadBuffer* buffer = coroBuffer.get();
71+
auto decompressorBuffer = MakeDecompressor(*buffer, Compression);
72+
YQL_ENSURE(decompressorBuffer, "Unsupported " << Compression << " compression.");
73+
while (!decompressorBuffer->eof()) {
74+
decompressorBuffer->nextIfAtEnd();
75+
TString data{decompressorBuffer->available(), ' '};
76+
decompressorBuffer->read(&data.front(), decompressorBuffer->available());
77+
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data)));
78+
}
79+
} catch (const TDtorException&) {
80+
// Stop any activity instantly
81+
return;
82+
} catch (...) {
83+
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception()));
84+
}
85+
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish());
86+
}
87+
88+
void ProcessOneEvent() {
89+
if (!Requests.empty()) {
90+
ExtractDataPart(*Requests.front());
91+
Requests.pop();
92+
return;
93+
}
94+
TAutoPtr<::NActors::IEventHandle> ev(WaitForEvent().Release());
95+
StateFunc(ev);
96+
}
97+
98+
void ExtractDataPart(TEvS3Provider::TEvDecompressDataRequest& event) {
99+
InputBuffer = std::move(event.Data);
100+
}
101+
102+
private:
103+
TString InputBuffer;
104+
TString Compression;
105+
TActorId Parent;
106+
bool InputFinished = false;
107+
std::queue<THolder<TEvS3Provider::TEvDecompressDataRequest>> Requests;
108+
};
109+
110+
class TS3DecompressorCoroActor : public TActorCoro {
111+
public:
112+
TS3DecompressorCoroActor(THolder<TS3DecompressorCoroImpl> impl)
113+
: TActorCoro(THolder<TS3DecompressorCoroImpl>(impl.Release()))
114+
{}
115+
private:
116+
void Registered(TActorSystem* actorSystem, const TActorId& parent) override {
117+
TActorCoro::Registered(actorSystem, parent); // Calls TActorCoro::OnRegister and sends bootstrap event to ourself.
118+
}
119+
};
120+
121+
}
122+
123+
NActors::IActor* CreateS3DecompressorActor(const NActors::TActorId& parent, const TString& compression) {
124+
return new TS3DecompressorCoroActor(MakeHolder<TS3DecompressorCoroImpl>(parent, compression));
125+
}
126+
127+
} // namespace NYql::NDq
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#pragma once
2+
3+
#include <ydb/library/actors/core/actor.h>
4+
5+
namespace NYql::NDq {
6+
7+
NActors::IActor* CreateS3DecompressorActor(const NActors::TActorId& parent, const TString& compression);
8+
9+
} // namespace NYql::NDq

0 commit comments

Comments
 (0)