Skip to content

Commit cdf4f5d

Browse files
Merge 9d4b485 into 6e333e8
2 parents 6e333e8 + 9d4b485 commit cdf4f5d

File tree

107 files changed

+2652
-509
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

107 files changed

+2652
-509
lines changed

ydb/core/base/events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ struct TKikimrEvents : TEvents {
177177
ES_NEBIUS_ACCESS_SERVICE,
178178
ES_REPLICATION_SERVICE,
179179
ES_BACKUP_SERVICE,
180+
ES_TX_BACKGROUND,
180181
};
181182
};
182183

ydb/core/protos/counters_columnshard.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,8 @@ enum ETxTypes {
190190
TXTYPE_DATA_SHARING_APPLY_LINKS_MODIFICATION = 27 [(TxTypeOpts) = {Name: "TxDataSharingApplyLinksModification"}];
191191
TXTYPE_DATA_SHARING_WRITE_SOURCE_CURSOR = 28 [(TxTypeOpts) = {Name: "TxDataSharingWriteSourceCursor"}];
192192
TXTYPE_EXPORT_SAVE_CURSOR = 29 [(TxTypeOpts) = {Name: "TxExportSaveCursor"}];
193+
TXTYPE_REMOVE_BACKGROUND_SESSION = 30 [(TxTypeOpts) = {Name: "TxRemoveBackgroundSession"}];
194+
TXTYPE_ADD_BACKGROUND_SESSION = 31 [(TxTypeOpts) = {Name: "TxAddBackgroundSession"}];
195+
TXTYPE_SAVE_BACKGROUND_SESSION_PROGRESS = 32 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionProgress"}];
196+
TXTYPE_SAVE_BACKGROUND_SESSION_STATE = 33 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionState"}];
193197
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "adapter.h"
2+
3+
namespace NKikimr::NOlap::NBackground {
4+
5+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#pragma once
2+
#include "session.h"
3+
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
4+
#include <ydb/library/actors/core/actorid.h>
5+
#include <ydb/library/accessor/accessor.h>
6+
#include <ydb/library/conclusion/status.h>
7+
8+
namespace NKikimr::NTabletFlatExecutor {
9+
class TTabletExecutedFlat;
10+
}
11+
12+
namespace NKikimr::NOlap::NBackground {
13+
14+
class ITabletAdapter {
15+
private:
16+
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
17+
YDB_READONLY(TTabletId, TabletId, TTabletId(0));
18+
NTabletFlatExecutor::TTabletExecutedFlat& TabletExecutor;
19+
virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) = 0;
20+
virtual void DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
21+
virtual void DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
22+
virtual void DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
23+
virtual void DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) = 0;
24+
public:
25+
ITabletAdapter(const NActors::TActorId& tabletActorId, const TTabletId tabletId, NTabletFlatExecutor::TTabletExecutedFlat& tabletExecutor)
26+
: TabletActorId(tabletActorId)
27+
, TabletId(tabletId)
28+
, TabletExecutor(tabletExecutor)
29+
{
30+
31+
}
32+
virtual ~ITabletAdapter() = default;
33+
34+
template <class T>
35+
T& GetTabletExecutorVerifiedAs() {
36+
T* result = dynamic_cast<T*>(&TabletExecutor);
37+
AFL_VERIFY(result);
38+
return *result;
39+
}
40+
41+
void RemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) {
42+
return DoRemoveSessionFromLocalDatabase(txc, className, identifier);
43+
}
44+
45+
[[nodiscard]] bool LoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) {
46+
return DoLoadSessionsFromLocalDatabase(txc, records);
47+
}
48+
49+
void SaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
50+
return DoSaveSessionToLocalDatabase(txc, session);
51+
}
52+
53+
void SaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
54+
return DoSaveStateToLocalDatabase(txc, session);
55+
}
56+
57+
void SaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
58+
return DoSaveProgressToLocalDatabase(txc, session);
59+
}
60+
};
61+
62+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#include "control.h"
2+
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>
3+
4+
namespace NKikimr::NOlap::NBackground {
5+
6+
NKikimrTxBackgroundProto::TSessionControlContainer TSessionControlContainer::SerializeToProto() const {
7+
NKikimrTxBackgroundProto::TSessionControlContainer result;
8+
*result.MutableStatusChannelContainer() = ChannelContainer.SerializeToString();
9+
*result.MutableLogicControlContainer() = LogicControlContainer.SerializeToProto();
10+
return result;
11+
}
12+
13+
NKikimr::TConclusionStatus TSessionControlContainer::DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto) {
14+
if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) {
15+
return TConclusionStatus::Fail("cannot parse channel from proto");
16+
}
17+
if (!LogicControlContainer.DeserializeFromProto(proto.GetLogicControlContainer())) {
18+
return TConclusionStatus::Fail("cannot parse logic from proto");
19+
}
20+
return TConclusionStatus::Success();
21+
}
22+
23+
NKikimr::TConclusionStatus ISessionLogicControl::DeserializeFromProto(const TProto& data) {
24+
SessionClassName = data.GetSessionClassName();
25+
SessionIdentifier = data.GetSessionIdentifier();
26+
return DeserializeFromString(data.GetSessionControlDescription());
27+
}
28+
29+
void ISessionLogicControl::SerializeToProto(TProto& proto) const {
30+
proto.SetSessionClassName(SessionClassName);
31+
proto.SetSessionIdentifier(SessionIdentifier);
32+
proto.SetSessionControlDescription(SerializeToString());
33+
}
34+
35+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#pragma once
2+
#include "session.h"
3+
#include "status_channel.h"
4+
#include <ydb/services/bg_tasks/abstract/interface.h>
5+
#include <ydb/library/accessor/accessor.h>
6+
7+
namespace NKikimrTxBackgroundProto {
8+
class TSessionControlContainer;
9+
class TSessionLogicControlContainer;
10+
}
11+
12+
namespace NKikimr::NOlap::NBackground {
13+
14+
class ISessionLogicControl {
15+
private:
16+
YDB_READONLY_DEF(TString, SessionClassName);
17+
YDB_READONLY_DEF(TString, SessionIdentifier);
18+
virtual TConclusionStatus DoApply(const std::shared_ptr<ISessionLogic>& session) const = 0;
19+
20+
virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0;
21+
virtual TString DoSerializeToString() const = 0;
22+
protected:
23+
TConclusionStatus DeserializeFromString(const TString& data) {
24+
return DoDeserializeFromString(data);
25+
}
26+
TString SerializeToString() const {
27+
return DoSerializeToString();
28+
}
29+
public:
30+
using TProto = NKikimrTxBackgroundProto::TSessionLogicControlContainer;
31+
using TFactory = NObjectFactory::TObjectFactory<ISessionLogicControl, TString>;
32+
33+
virtual ~ISessionLogicControl() = default;
34+
ISessionLogicControl() = default;
35+
ISessionLogicControl(const TString& sessionClassName, const TString& sessionIdentifier)
36+
: SessionClassName(sessionClassName)
37+
, SessionIdentifier(sessionIdentifier)
38+
{
39+
40+
}
41+
42+
TConclusionStatus DeserializeFromProto(const TProto& data);
43+
void SerializeToProto(TProto& proto) const;
44+
45+
TConclusionStatus Apply(const std::shared_ptr<ISessionLogic>& session) const {
46+
session->CheckStatusCorrect();
47+
auto result = DoApply(session);
48+
session->CheckStatusCorrect();
49+
return result;
50+
}
51+
52+
virtual TString GetClassName() const = 0;
53+
};
54+
55+
class TSessionLogicControlContainer: public NBackgroundTasks::TInterfaceProtoContainer<ISessionLogicControl> {
56+
private:
57+
using TBase = NBackgroundTasks::TInterfaceProtoContainer<ISessionLogicControl>;
58+
public:
59+
using TBase::TBase;
60+
};
61+
62+
class TSessionControlContainer {
63+
private:
64+
YDB_READONLY_DEF(TStatusChannelContainer, ChannelContainer);
65+
YDB_READONLY_DEF(TSessionLogicControlContainer, LogicControlContainer);
66+
public:
67+
NKikimrTxBackgroundProto::TSessionControlContainer SerializeToProto() const;
68+
TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto);
69+
70+
TSessionControlContainer() = default;
71+
72+
TSessionControlContainer(const TStatusChannelContainer& channel, const TSessionLogicControlContainer& logic)
73+
: ChannelContainer(channel)
74+
, LogicControlContainer(logic) {
75+
AFL_VERIFY(!!ChannelContainer);
76+
AFL_VERIFY(!!LogicControlContainer);
77+
}
78+
};
79+
80+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#include "session.h"
2+
#include "adapter.h"
3+
4+
namespace NKikimr::NOlap::NBackground {
5+
6+
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
#pragma once
2+
#include "status_channel.h"
3+
4+
#include <ydb/core/tx/columnshard/common/tablet_id.h>
5+
#include <ydb/services/bg_tasks/abstract/interface.h>
6+
7+
#include <ydb/library/accessor/accessor.h>
8+
#include <ydb/library/actors/core/actorid.h>
9+
#include <ydb/library/conclusion/status.h>
10+
#include <ydb/library/conclusion/result.h>
11+
12+
namespace NKikimr::NOlap::NBackground {
13+
14+
class TSession;
15+
class ITabletAdapter;
16+
17+
class TStartContext {
18+
private:
19+
YDB_READONLY_DEF(std::shared_ptr<TSession>, SessionSelfPtr);
20+
YDB_READONLY_DEF(std::shared_ptr<ITabletAdapter>, Adapter);
21+
public:
22+
TStartContext(const std::shared_ptr<TSession>& sessionSelfPtr,
23+
const std::shared_ptr<ITabletAdapter>& adapter)
24+
: SessionSelfPtr(sessionSelfPtr)
25+
, Adapter(adapter)
26+
{
27+
AFL_VERIFY(!!SessionSelfPtr);
28+
AFL_VERIFY(!!Adapter);
29+
}
30+
};
31+
32+
class ISessionLogic {
33+
private:
34+
mutable bool ActorConstructed = false;
35+
virtual TConclusionStatus DoDeserializeProgressFromString(const TString& data) = 0;
36+
virtual TConclusionStatus DoDeserializeStateFromString(const TString& data) = 0;
37+
virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0;
38+
virtual TString DoSerializeProgressToString() const = 0;
39+
virtual TString DoSerializeStateToString() const = 0;
40+
virtual TString DoSerializeToString() const = 0;
41+
virtual TConclusion<std::unique_ptr<NActors::IActor>> DoCreateActor(const TStartContext& context) const = 0;
42+
public:
43+
using TFactory = NObjectFactory::TObjectFactory<ISessionLogic, TString>;
44+
45+
virtual ~ISessionLogic() = default;
46+
47+
virtual TString GetClassName() const = 0;
48+
49+
void CheckStatusCorrect() const {
50+
}
51+
52+
TConclusionStatus DeserializeProgressFromString(const TString& data) {
53+
return DoDeserializeProgressFromString(data);
54+
}
55+
TString SerializeProgressToString() const {
56+
CheckStatusCorrect();
57+
return DoSerializeProgressToString();
58+
}
59+
TConclusionStatus DeserializeFromString(const TString& data) {
60+
return DoDeserializeFromString(data);
61+
}
62+
TString SerializeToString() const {
63+
CheckStatusCorrect();
64+
return DoSerializeToString();
65+
}
66+
TConclusionStatus DeserializeStateFromString(const TString& data) {
67+
return DoDeserializeStateFromString(data);
68+
}
69+
TString SerializeStateToString() const {
70+
CheckStatusCorrect();
71+
return DoSerializeStateToString();
72+
}
73+
74+
std::unique_ptr<NActors::IActor> CreateActor(const TStartContext& context) const {
75+
AFL_VERIFY(IsReadyForStart());
76+
AFL_VERIFY(!IsFinished());
77+
AFL_VERIFY(!ActorConstructed);
78+
ActorConstructed = true;
79+
std::unique_ptr<NActors::IActor> result = DoCreateActor(context).DetachResult();
80+
AFL_VERIFY(!!result);
81+
return result;
82+
}
83+
84+
virtual bool IsReadyForStart() const = 0;
85+
virtual bool IsFinished() const = 0;
86+
virtual bool IsReadyForRemove() const = 0;
87+
};
88+
89+
template <class TProtoLogicExt, class TProtoProgressExt, class TProtoStateExt>
90+
class TSessionProtoAdapter: public NBackgroundTasks::TInterfaceProtoAdapter<TProtoLogicExt, ISessionLogic> {
91+
protected:
92+
using TProtoProgress = TProtoProgressExt;
93+
using TProtoState = TProtoStateExt;
94+
using TProtoLogic = TProtoLogicExt;
95+
private:
96+
virtual TConclusionStatus DoDeserializeProgressFromProto(const TProtoProgress& proto) = 0;
97+
virtual TProtoProgress DoSerializeProgressToProto() const = 0;
98+
virtual TConclusionStatus DoDeserializeStateFromProto(const TProtoState& proto) = 0;
99+
virtual TProtoState DoSerializeStateToProto() const = 0;
100+
protected:
101+
virtual TConclusionStatus DoDeserializeProgressFromString(const TString& data) override final {
102+
TProtoProgress proto;
103+
if (!proto.ParseFromArray(data.data(), data.size())) {
104+
return TConclusionStatus::Fail("cannot parse proto string as " + TypeName<TProtoProgress>());
105+
}
106+
return DoDeserializeProgressFromProto(proto);
107+
}
108+
virtual TString DoSerializeProgressToString() const override final {
109+
TProtoProgress proto = DoSerializeProgressToProto();
110+
return proto.SerializeAsString();
111+
}
112+
virtual TConclusionStatus DoDeserializeStateFromString(const TString& data) override final {
113+
TProtoState proto;
114+
if (!proto.ParseFromArray(data.data(), data.size())) {
115+
return TConclusionStatus::Fail("cannot parse proto string as " + TypeName<TProtoState>());
116+
}
117+
return DoDeserializeStateFromProto(proto);
118+
}
119+
virtual TString DoSerializeStateToString() const override final {
120+
TProtoState proto = DoSerializeStateToProto();
121+
return proto.SerializeAsString();
122+
}
123+
};
124+
125+
class TSessionLogicContainer: public NBackgroundTasks::TInterfaceStringContainer<ISessionLogic> {
126+
private:
127+
using TBase = NBackgroundTasks::TInterfaceStringContainer<ISessionLogic>;
128+
public:
129+
using TBase::TBase;
130+
};
131+
132+
class TSessionRecord {
133+
private:
134+
YDB_ACCESSOR_DEF(TString, Identifier);
135+
YDB_ACCESSOR_DEF(TString, ClassName);
136+
YDB_ACCESSOR_DEF(TString, LogicDescription);
137+
YDB_ACCESSOR_DEF(TString, StatusChannel);
138+
YDB_ACCESSOR_DEF(TString, Progress);
139+
YDB_ACCESSOR_DEF(TString, State);
140+
public:
141+
};
142+
143+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "status_channel.h"
2+
3+
namespace NKikimr::NOlap::NBackground {
4+
5+
}

0 commit comments

Comments
 (0)