Skip to content

Commit d5e1c7f

Browse files
Merge 04c59b3 into d478385
2 parents d478385 + 04c59b3 commit d5e1c7f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1348
-0
lines changed

ydb/core/base/events.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ struct TKikimrEvents : TEvents {
177177
ES_NEBIUS_ACCESS_SERVICE,
178178
ES_REPLICATION_SERVICE,
179179
ES_BACKUP_SERVICE,
180+
ES_TX_BACKGROUND,
180181
};
181182
};
182183

ydb/core/protos/counters_columnshard.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,8 @@ enum ETxTypes {
190190
TXTYPE_DATA_SHARING_APPLY_LINKS_MODIFICATION = 27 [(TxTypeOpts) = {Name: "TxDataSharingApplyLinksModification"}];
191191
TXTYPE_DATA_SHARING_WRITE_SOURCE_CURSOR = 28 [(TxTypeOpts) = {Name: "TxDataSharingWriteSourceCursor"}];
192192
TXTYPE_EXPORT_SAVE_CURSOR = 29 [(TxTypeOpts) = {Name: "TxExportSaveCursor"}];
193+
TXTYPE_REMOVE_BACKGROUND_SESSION = 30 [(TxTypeOpts) = {Name: "TxRemoveBackgroundSession"}];
194+
TXTYPE_ADD_BACKGROUND_SESSION = 31 [(TxTypeOpts) = {Name: "TxAddBackgroundSession"}];
195+
TXTYPE_SAVE_BACKGROUND_SESSION_PROGRESS = 32 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionProgress"}];
196+
TXTYPE_SAVE_BACKGROUND_SESSION_STATE = 33 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionState"}];
193197
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "adapter.h"
2+
3+
namespace NKikimr::NOlap::NBackground {
4+
5+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#pragma once
2+
#include "session.h"
3+
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
4+
#include <ydb/library/actors/core/actorid.h>
5+
#include <ydb/library/accessor/accessor.h>
6+
#include <ydb/library/conclusion/status.h>
7+
8+
namespace NKikimr::NOlap::NBackground {
9+
10+
class ITabletAdapter {
11+
private:
12+
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
13+
virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) = 0;
14+
virtual TConclusionStatus DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
15+
virtual TConclusionStatus DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
16+
virtual TConclusionStatus DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
17+
virtual TConclusionStatus DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) = 0;
18+
public:
19+
ITabletAdapter(const NActors::TActorId& actorId)
20+
: TabletActorId(actorId)
21+
{
22+
23+
}
24+
25+
[[nodiscard]] TConclusionStatus RemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) {
26+
return DoRemoveSessionFromLocalDatabase(txc, className, identifier);
27+
}
28+
29+
[[nodiscard]] bool LoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) {
30+
return DoLoadSessionsFromLocalDatabase(txc, records);
31+
}
32+
33+
[[nodiscard]] TConclusionStatus SaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
34+
return DoSaveSessionToLocalDatabase(txc, session);
35+
}
36+
37+
[[nodiscard]] TConclusionStatus SaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
38+
return DoSaveStateToLocalDatabase(txc, session);
39+
}
40+
41+
[[nodiscard]] TConclusionStatus SaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
42+
return DoSaveProgressToLocalDatabase(txc, session);
43+
}
44+
};
45+
46+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "control.h"
2+
3+
namespace NKikimr::NOlap::NBackground {
4+
5+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#pragma once
2+
#include "session.h"
3+
#include "status_channel.h"
4+
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>
5+
#include <ydb/services/bg_tasks/abstract/interface.h>
6+
#include <ydb/library/accessor/accessor.h>
7+
8+
namespace NKikimr::NOlap::NBackground {
9+
10+
class ISessionLogicControl {
11+
private:
12+
YDB_READONLY_DEF(TString, Identifier);
13+
virtual TConclusionStatus DoApply(const std::shared_ptr<ISessionLogic>& session) const = 0;
14+
virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0;
15+
virtual TString DoSerializeToString() const = 0;
16+
public:
17+
using TFactory = NObjectFactory::TObjectFactory<ISessionLogicControl, TString>;
18+
19+
virtual ~ISessionLogicControl() = default;
20+
21+
TConclusionStatus DeserializeFromString(const TString& data) {
22+
return DoDeserializeFromString(data);
23+
}
24+
25+
TString SerializeToString() const {
26+
return DoSerializeToString();
27+
}
28+
29+
TConclusionStatus Apply(const std::shared_ptr<ISessionLogic>& session) const {
30+
session->CheckStatusCorrect();
31+
auto result = DoApply(session);
32+
session->CheckStatusCorrect();
33+
return result;
34+
}
35+
36+
virtual TString GetClassName() const = 0;
37+
};
38+
39+
class TSessionLogicControlContainer: public NBackgroundTasks::TInterfaceStringContainer<ISessionLogicControl> {
40+
private:
41+
using TBase = NBackgroundTasks::TInterfaceStringContainer<ISessionLogicControl>;
42+
public:
43+
using TBase::TBase;
44+
};
45+
46+
class TSessionControlContainer {
47+
private:
48+
YDB_READONLY_DEF(TString, SessionClassName);
49+
YDB_READONLY_DEF(TString, SessionIdentifier);
50+
YDB_READONLY_DEF(TStatusChannelContainer, ChannelContainer);
51+
YDB_READONLY_DEF(TSessionLogicControlContainer, LogicControlContainer);
52+
public:
53+
NKikimrTxBackgroundProto::TSessionControlContainer SerializeToProto() const {
54+
NKikimrTxBackgroundProto::TSessionControlContainer result;
55+
result.SetSessionClassName(SessionClassName);
56+
result.SetSessionIdentifier(SessionIdentifier);
57+
result.SetStatusChannelContainer(ChannelContainer.SerializeToString());
58+
result.SetLogicControlContainer(LogicControlContainer.SerializeToString());
59+
return result;
60+
}
61+
TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto) {
62+
SessionClassName = proto.GetSessionClassName();
63+
SessionIdentifier = proto.GetSessionIdentifier();
64+
if (!SessionClassName) {
65+
return TConclusionStatus::Fail("incorrect session class name for bg_task");
66+
}
67+
if (!SessionIdentifier) {
68+
return TConclusionStatus::Fail("incorrect session id for bg_task");
69+
}
70+
if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) {
71+
return TConclusionStatus::Fail("cannot parse channel from proto");
72+
}
73+
if (!LogicControlContainer.DeserializeFromString(proto.GetLogicControlContainer())) {
74+
return TConclusionStatus::Fail("cannot parse logic from proto");
75+
}
76+
return TConclusionStatus::Success();
77+
}
78+
79+
TSessionControlContainer() = default;
80+
81+
TSessionControlContainer(const TStatusChannelContainer& channel, const TSessionLogicControlContainer& logic)
82+
: ChannelContainer(channel)
83+
, LogicControlContainer(logic) {
84+
AFL_VERIFY(!!ChannelContainer);
85+
AFL_VERIFY(!!LogicControlContainer);
86+
}
87+
};
88+
89+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "session.h"
2+
3+
namespace NKikimr::NOlap::NBackground {
4+
5+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
#pragma once
2+
#include "status_channel.h"
3+
4+
#include <ydb/core/tx/columnshard/common/tablet_id.h>
5+
#include <ydb/services/bg_tasks/abstract/interface.h>
6+
7+
#include <ydb/library/accessor/accessor.h>
8+
#include <ydb/library/actors/core/actorid.h>
9+
#include <ydb/library/conclusion/status.h>
10+
#include <ydb/library/conclusion/result.h>
11+
12+
namespace NKikimr::NOlap::NBackground {
13+
14+
class TStartContext {
15+
private:
16+
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
17+
YDB_READONLY_DEF(TStatusChannelContainer, Channel);
18+
public:
19+
TStartContext(const NActors::TActorId& tabletActorId, const TStatusChannelContainer channel)
20+
: TabletActorId(tabletActorId)
21+
, Channel(channel)
22+
{
23+
24+
}
25+
};
26+
27+
class ISessionLogic {
28+
private:
29+
mutable bool ActorConstructed = false;
30+
virtual TConclusionStatus DoDeserializeProgressFromString(const TString& data) = 0;
31+
virtual TConclusionStatus DoDeserializeStateFromString(const TString& data) = 0;
32+
virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0;
33+
virtual TString DoSerializeProgressToString() const = 0;
34+
virtual TString DoSerializeStateToString() const = 0;
35+
virtual TString DoSerializeToString() const = 0;
36+
virtual TConclusion<std::unique_ptr<NActors::IActor>> DoCreateActor(const TStartContext& context) const = 0;
37+
public:
38+
using TFactory = NObjectFactory::TObjectFactory<ISessionLogic, TString>;
39+
40+
virtual ~ISessionLogic() = default;
41+
42+
virtual TString GetClassName() const = 0;
43+
44+
void CheckStatusCorrect() const {
45+
}
46+
47+
TConclusionStatus DeserializeProgressFromString(const TString& data) {
48+
return DoDeserializeProgressFromString(data);
49+
}
50+
TString SerializeProgressToString() const {
51+
CheckStatusCorrect();
52+
return DoSerializeProgressToString();
53+
}
54+
TConclusionStatus DeserializeFromString(const TString& data) {
55+
return DoDeserializeFromString(data);
56+
}
57+
TString SerializeToString() const {
58+
CheckStatusCorrect();
59+
return DoSerializeToString();
60+
}
61+
TConclusionStatus DeserializeStateFromString(const TString& data) {
62+
return DoDeserializeStateFromString(data);
63+
}
64+
TString SerializeStateToString() const {
65+
CheckStatusCorrect();
66+
return DoSerializeStateToString();
67+
}
68+
69+
std::unique_ptr<NActors::IActor> CreateActor(const TStartContext& context) const {
70+
AFL_VERIFY(IsReadyForStart());
71+
AFL_VERIFY(!IsFinished());
72+
AFL_VERIFY(!ActorConstructed);
73+
ActorConstructed = true;
74+
std::unique_ptr<NActors::IActor> result = DoCreateActor(context).DetachResult();
75+
AFL_VERIFY(!!result);
76+
return result;
77+
}
78+
79+
virtual bool IsReadyForStart() const = 0;
80+
virtual bool IsFinished() const = 0;
81+
virtual bool IsReadyForRemove() const = 0;
82+
};
83+
84+
template <class TProtoLogicExt, class TProtoProgressExt, class TProtoStateExt>
85+
class TSessionProtoAdapter: public TInterfaceProtoAdapter<TProtoLogicExt, ISessionLogic> {
86+
protected:
87+
using TProtoProgress = TProtoProgressExt;
88+
using TProtoState = TProtoStateExt;
89+
using TProtoLogic = TProtoLogicExt;
90+
private:
91+
virtual TConclusionStatus DoDeserializeProgressFromProto(const TProtoProgress& proto) = 0;
92+
virtual TProtoProgress DoSerializeProgressToProto() const = 0;
93+
virtual TConclusionStatus DoDeserializeStateFromProto(const TProtoState& proto) = 0;
94+
virtual TProtoState DoSerializeStateToProto() const = 0;
95+
protected:
96+
virtual TConclusionStatus DoDeserializeProgressFromString(const TString& data) override final {
97+
TProtoProgress proto;
98+
if (!proto.ParseFromArray(data.data(), data.size())) {
99+
return TConclusionStatus::Fail("cannot parse proto string as " + TypeName<TProtoProgress>());
100+
}
101+
return DoDeserializeProgressFromProto(proto);
102+
}
103+
virtual TString DoSerializeProgressToString() const override final {
104+
TProtoProgress proto = DoSerializeToProto();
105+
return proto.SerializeAsString();
106+
}
107+
virtual TConclusionStatus DoDeserializeStateFromString(const TString& data) override final {
108+
TProtoState proto;
109+
if (!proto.ParseFromArray(data.data(), data.size())) {
110+
return TConclusionStatus::Fail("cannot parse proto string as " + TypeName<TProtoState>());
111+
}
112+
return DoDeserializeStateFromProto(proto);
113+
}
114+
virtual TString DoSerializeStateToString() const override final {
115+
TProtoState proto = DoSerializeStateToProto();
116+
return proto.SerializeAsString();
117+
}
118+
};
119+
120+
class TSessionLogicContainer: public NBackgroundTasks::TInterfaceStringContainer<ISessionLogic> {
121+
private:
122+
using TBase = NBackgroundTasks::TInterfaceStringContainer<ISessionLogic>;
123+
public:
124+
using TBase::TBase;
125+
};
126+
127+
class TSessionRecord {
128+
private:
129+
YDB_ACCESSOR_DEF(TString, Identifier);
130+
YDB_ACCESSOR_DEF(TString, ClassName);
131+
YDB_ACCESSOR_DEF(TString, LogicDescription);
132+
YDB_ACCESSOR_DEF(TString, StatusChannel);
133+
YDB_ACCESSOR_DEF(TString, Progress);
134+
YDB_ACCESSOR_DEF(TString, State);
135+
public:
136+
};
137+
138+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "status_channel.h"
2+
3+
namespace NKikimr::NOlap::NBackground {
4+
5+
}

0 commit comments

Comments
 (0)