Skip to content

Commit 8a076ec

Browse files
authored
YQL-17087: Move DqChannelStorage to separate file and use interface (#591)
1 parent bf58f60 commit 8a076ec

File tree

4 files changed

+256
-206
lines changed

4 files changed

+256
-206
lines changed

ydb/library/yql/dq/actors/spilling/channel_storage.cpp

Lines changed: 5 additions & 206 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#include "channel_storage.h"
2-
#include "spilling.h"
3-
#include "spilling_file.h"
2+
3+
#include "channel_storage_actor.h"
44

55
#include <ydb/library/yql/utils/yql_panic.h>
66
#include <ydb/library/services/services.pb.h>
@@ -12,220 +12,19 @@
1212
#include <util/generic/buffer.h>
1313
#include <util/generic/map.h>
1414
#include <util/generic/set.h>
15-
#include <util/generic/size_literals.h>
1615

1716

1817
namespace NYql::NDq {
1918

2019
using namespace NActors;
2120

22-
#define LOG_D(s) \
23-
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
24-
#define LOG_I(s) \
25-
LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
26-
#define LOG_E(s) \
27-
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
28-
#define LOG_C(s) \
29-
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
30-
#define LOG_W(s) \
31-
LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
32-
#define LOG_T(s) \
33-
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
34-
3521
namespace {
3622

37-
constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10;
38-
constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB;
39-
40-
class TDqChannelStorageActor : public TActorBootstrapped<TDqChannelStorageActor> {
41-
using TBase = TActorBootstrapped<TDqChannelStorageActor>;
42-
43-
public:
44-
TDqChannelStorageActor(TTxId txId, ui64 channelId, IDqChannelStorage::TWakeUpCallback&& wakeUp, TActorSystem* actorSystem)
45-
: TxId_(txId)
46-
, ChannelId_(channelId)
47-
, WakeUp_(std::move(wakeUp))
48-
, ActorSystem_(actorSystem)
49-
{}
50-
51-
void Bootstrap() {
52-
auto spillingActor = CreateDqLocalFileSpillingActor(TxId_, TStringBuilder() << "ChannelId: " << ChannelId_,
53-
SelfId(), true);
54-
SpillingActorId_ = Register(spillingActor);
55-
56-
Become(&TDqChannelStorageActor::WorkState);
57-
}
58-
59-
static constexpr char ActorName[] = "DQ_CHANNEL_STORAGE";
60-
61-
protected:
62-
void PassAway() override {
63-
Send(SpillingActorId_, new TEvents::TEvPoison);
64-
TBase::PassAway();
65-
}
66-
67-
private:
68-
STATEFN(WorkState) {
69-
switch (ev->GetTypeRewrite()) {
70-
hFunc(TEvDqSpilling::TEvWriteResult, HandleWork);
71-
hFunc(TEvDqSpilling::TEvReadResult, HandleWork);
72-
hFunc(TEvDqSpilling::TEvError, HandleWork);
73-
default:
74-
Y_ABORT("TDqChannelStorageActor::WorkState unexpected event type: %" PRIx32 " event: %s",
75-
ev->GetTypeRewrite(),
76-
ev->ToString().data());
77-
}
78-
}
79-
80-
void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
81-
auto& msg = *ev->Get();
82-
LOG_T("[TEvWriteResult] blobId: " << msg.BlobId);
83-
84-
auto it = WritingBlobs_.find(msg.BlobId);
85-
if (it == WritingBlobs_.end()) {
86-
LOG_E("Got unexpected TEvWriteResult, blobId: " << msg.BlobId);
87-
88-
Error_ = "Internal error";
89-
90-
Send(SpillingActorId_, new TEvents::TEvPoison);
91-
return;
92-
}
93-
94-
ui64 size = it->second;
95-
WritingBlobsSize_ -= size;
96-
WritingBlobs_.erase(it);
97-
98-
StoredBlobsCount_++;
99-
StoredBlobsSize_ += size;
100-
}
101-
102-
void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) {
103-
auto& msg = *ev->Get();
104-
LOG_T("[TEvReadResult] blobId: " << msg.BlobId << ", size: " << msg.Blob.size());
105-
106-
if (LoadingBlobs_.erase(msg.BlobId) != 1) {
107-
LOG_E("[TEvReadResult] unexpected, blobId: " << msg.BlobId << ", size: " << msg.Blob.size());
108-
return;
109-
}
110-
111-
LoadedBlobs_[msg.BlobId].Swap(msg.Blob);
112-
YQL_ENSURE(LoadedBlobs_[msg.BlobId].size() != 0);
113-
114-
if (LoadedBlobs_.size() == 1) {
115-
WakeUp_();
116-
}
117-
}
118-
119-
void HandleWork(TEvDqSpilling::TEvError::TPtr& ev) {
120-
auto& msg = *ev->Get();
121-
LOG_D("[TEvError] " << msg.Message);
122-
123-
Error_.ConstructInPlace(msg.Message);
124-
}
125-
126-
public:
127-
[[nodiscard]]
128-
const TMaybe<TString>& GetError() const {
129-
return Error_;
130-
}
131-
132-
bool IsEmpty() const {
133-
return WritingBlobs_.empty() && StoredBlobsCount_ == 0 && LoadedBlobs_.empty();
134-
}
135-
136-
bool IsFull() const {
137-
return WritingBlobs_.size() > MAX_INFLIGHT_BLOBS_COUNT || WritingBlobsSize_ > MAX_INFLIGHT_BLOBS_SIZE;
138-
}
139-
140-
void Put(ui64 blobId, TRope&& blob, ui64 cookie) {
141-
FailOnError();
142-
143-
// TODO: timeout
144-
// TODO: limit inflight events
145-
146-
ui64 size = blob.size();
147-
148-
SendEvent(new TEvDqSpilling::TEvWrite(blobId, std::move(blob)), cookie);
149-
150-
WritingBlobs_.emplace(blobId, size);
151-
WritingBlobsSize_ += size;
152-
}
153-
154-
bool Get(ui64 blobId, TBuffer& blob, ui64 cookie) {
155-
FailOnError();
156-
157-
auto loadedIt = LoadedBlobs_.find(blobId);
158-
if (loadedIt != LoadedBlobs_.end()) {
159-
YQL_ENSURE(loadedIt->second.size() != 0);
160-
blob.Swap(loadedIt->second);
161-
LoadedBlobs_.erase(loadedIt);
162-
return true;
163-
}
164-
165-
auto result = LoadingBlobs_.emplace(blobId);
166-
if (result.second) {
167-
SendEvent(new TEvDqSpilling::TEvRead(blobId, true), cookie);
168-
}
169-
170-
return false;
171-
}
172-
173-
void Terminate() {
174-
PassAway();
175-
}
176-
177-
private:
178-
void FailOnError() {
179-
if (Error_) {
180-
LOG_E("Error: " << *Error_);
181-
ythrow TDqChannelStorageException() << "TxId: " << TxId_ << ", channelId: " << ChannelId_
182-
<< ", error: " << *Error_;
183-
}
184-
}
185-
186-
template<typename T>
187-
void SendEvent(T* event, ui64 cookie) {
188-
if (ActorSystem_) {
189-
ActorSystem_->Send(
190-
new IEventHandle(
191-
SpillingActorId_,
192-
SelfId(),
193-
event,
194-
/*flags=*/0,
195-
cookie
196-
)
197-
);
198-
} else {
199-
Send(SpillingActorId_, event);
200-
}
201-
}
202-
203-
private:
204-
const TTxId TxId_;
205-
const ui64 ChannelId_;
206-
IDqChannelStorage::TWakeUpCallback WakeUp_;
207-
TActorId SpillingActorId_;
208-
209-
TMap<ui64, ui64> WritingBlobs_; // blobId -> blobSize
210-
ui64 WritingBlobsSize_ = 0;
211-
212-
ui32 StoredBlobsCount_ = 0;
213-
ui64 StoredBlobsSize_ = 0;
214-
215-
TSet<ui64> LoadingBlobs_;
216-
TMap<ui64, TBuffer> LoadedBlobs_;
217-
218-
TMaybe<TString> Error_;
219-
220-
TActorSystem* ActorSystem_;
221-
};
222-
223-
22423
class TDqChannelStorage : public IDqChannelStorage {
22524
public:
22625
TDqChannelStorage(TTxId txId, ui64 channelId, TWakeUpCallback&& wakeUp, TActorSystem* actorSystem) {
227-
SelfActor_ = new TDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
228-
TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_);
26+
SelfActor_ = CreateDqChannelStorageActor(txId, channelId, std::move(wakeUp), actorSystem);
27+
TlsActivationContext->AsActorContext().RegisterWithSameMailbox(SelfActor_->GetActor());
22928
}
23029

23130
~TDqChannelStorage() {
@@ -249,7 +48,7 @@ class TDqChannelStorage : public IDqChannelStorage {
24948
}
25049

25150
private:
252-
TDqChannelStorageActor* SelfActor_;
51+
IDqChannelStorageActor* SelfActor_;
25352
};
25453

25554
} // anonymous namespace

0 commit comments

Comments
 (0)