Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ struct TKikimrEvents : TEvents {
ES_NEBIUS_ACCESS_SERVICE,
ES_REPLICATION_SERVICE,
ES_BACKUP_SERVICE,
ES_TX_BACKGROUND,
ES_SS_BG_TASKS
};
};

Expand Down
21 changes: 21 additions & 0 deletions ydb/core/grpc_services/rpc_list_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
#include <ydb/core/tx/schemeshard/olap/bg_tasks/events/global.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/public/lib/operation_id/operation_id.h>

Expand Down Expand Up @@ -50,6 +51,8 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
const auto& request = *GetProtoRequest();

switch (ParseKind(GetProtoRequest()->kind())) {
case TOperationId::SS_BG_TASKS:
return new NSchemeShard::NBackground::TEvListRequest(DatabaseName, request.page_size(), request.page_token());
case TOperationId::EXPORT:
return new TEvExport::TEvListExportsRequest(DatabaseName, request.page_size(), request.page_token(), request.kind());
case TOperationId::IMPORT:
Expand Down Expand Up @@ -117,6 +120,24 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
Reply(response);
}

void Handle(NSchemeShard::NBackground::TEvListResponse::TPtr& ev) {
const auto& record = ev->Get()->Record;

LOG_D("Handle TEvSchemeShard::TEvBGTasksListResponse: record# " << record.ShortDebugString());

TResponse response;

response.set_status(record.GetStatus());
if (record.GetIssues().size()) {
response.mutable_issues()->CopyFrom(record.GetIssues());
}
for (const auto& entry : record.GetEntries()) {
*response.add_operations() = entry;
}
response.set_next_page_token(record.GetNextPageToken());
Reply(response);
}

void SendListScriptExecutions() {
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvListScriptExecutionOperations(DatabaseName, GetProtoRequest()->page_size(), GetProtoRequest()->page_token()));
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ PEERDIR(
ydb/core/tx/datashard
ydb/core/tx/sharding
ydb/core/tx/data_events
ydb/core/tx/schemeshard/olap/bg_tasks/events
ydb/core/util
ydb/core/ydb_convert
ydb/core/security
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/counters_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,8 @@ enum ETxTypes {
TXTYPE_DATA_SHARING_APPLY_LINKS_MODIFICATION = 27 [(TxTypeOpts) = {Name: "TxDataSharingApplyLinksModification"}];
TXTYPE_DATA_SHARING_WRITE_SOURCE_CURSOR = 28 [(TxTypeOpts) = {Name: "TxDataSharingWriteSourceCursor"}];
TXTYPE_EXPORT_SAVE_CURSOR = 29 [(TxTypeOpts) = {Name: "TxExportSaveCursor"}];
TXTYPE_REMOVE_BACKGROUND_SESSION = 30 [(TxTypeOpts) = {Name: "TxRemoveBackgroundSession"}];
TXTYPE_ADD_BACKGROUND_SESSION = 31 [(TxTypeOpts) = {Name: "TxAddBackgroundSession"}];
TXTYPE_SAVE_BACKGROUND_SESSION_PROGRESS = 32 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionProgress"}];
TXTYPE_SAVE_BACKGROUND_SESSION_STATE = 33 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionState"}];
}
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/adapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "adapter.h"

namespace NKikimr::NOlap::NBackground {

}
63 changes: 63 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#pragma once
#include "session.h"
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/status.h>

namespace NKikimr::NTabletFlatExecutor {
class TTabletExecutedFlat;
}

namespace NKikimr::NOlap::NBackground {

class ITabletAdapter {
private:
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
YDB_READONLY(TTabletId, TabletId, TTabletId(0));
NTabletFlatExecutor::TTabletExecutedFlat& TabletExecutor;
virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) = 0;
virtual void DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) = 0;
public:
ITabletAdapter(const NActors::TActorId& tabletActorId, const TTabletId tabletId, NTabletFlatExecutor::TTabletExecutedFlat& tabletExecutor)
: TabletActorId(tabletActorId)
, TabletId(tabletId)
, TabletExecutor(tabletExecutor)
{
AFL_VERIFY(!!TabletActorId);
AFL_VERIFY(!!(ui64)TabletId);
}
virtual ~ITabletAdapter() = default;

template <class T>
T& GetTabletExecutorVerifiedAs() {
T* result = dynamic_cast<T*>(&TabletExecutor);
AFL_VERIFY(result);
return *result;
}

void RemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) {
return DoRemoveSessionFromLocalDatabase(txc, className, identifier);
}

[[nodiscard]] bool LoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) {
return DoLoadSessionsFromLocalDatabase(txc, records);
}

void SaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveSessionToLocalDatabase(txc, session);
}

void SaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveStateToLocalDatabase(txc, session);
}

void SaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveProgressToLocalDatabase(txc, session);
}
};

}
35 changes: 35 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include "control.h"
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>

namespace NKikimr::NOlap::NBackground {

NKikimrTxBackgroundProto::TSessionControlContainer TSessionControlContainer::SerializeToProto() const {
NKikimrTxBackgroundProto::TSessionControlContainer result;
*result.MutableStatusChannelContainer() = ChannelContainer.SerializeToString();
*result.MutableLogicControlContainer() = LogicControlContainer.SerializeToProto();
return result;
}

NKikimr::TConclusionStatus TSessionControlContainer::DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto) {
if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) {
return TConclusionStatus::Fail("cannot parse channel from proto");
}
if (!LogicControlContainer.DeserializeFromProto(proto.GetLogicControlContainer())) {
return TConclusionStatus::Fail("cannot parse logic from proto");
}
return TConclusionStatus::Success();
}

NKikimr::TConclusionStatus ISessionLogicControl::DeserializeFromProto(const TProto& data) {
SessionClassName = data.GetSessionClassName();
SessionIdentifier = data.GetSessionIdentifier();
return DeserializeFromString(data.GetSessionControlDescription());
}

void ISessionLogicControl::SerializeToProto(TProto& proto) const {
proto.SetSessionClassName(SessionClassName);
proto.SetSessionIdentifier(SessionIdentifier);
proto.SetSessionControlDescription(SerializeToString());
}

}
80 changes: 80 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/control.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#pragma once
#include "session.h"
#include "status_channel.h"
#include <ydb/services/bg_tasks/abstract/interface.h>
#include <ydb/library/accessor/accessor.h>

namespace NKikimrTxBackgroundProto {
class TSessionControlContainer;
class TSessionLogicControlContainer;
}

namespace NKikimr::NOlap::NBackground {

class ISessionLogicControl {
private:
YDB_READONLY_DEF(TString, SessionClassName);
YDB_READONLY_DEF(TString, SessionIdentifier);
virtual TConclusionStatus DoApply(const std::shared_ptr<ISessionLogic>& session) const = 0;

virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0;
virtual TString DoSerializeToString() const = 0;
protected:
TConclusionStatus DeserializeFromString(const TString& data) {
return DoDeserializeFromString(data);
}
TString SerializeToString() const {
return DoSerializeToString();
}
public:
using TProto = NKikimrTxBackgroundProto::TSessionLogicControlContainer;
using TFactory = NObjectFactory::TObjectFactory<ISessionLogicControl, TString>;

virtual ~ISessionLogicControl() = default;
ISessionLogicControl() = default;
ISessionLogicControl(const TString& sessionClassName, const TString& sessionIdentifier)
: SessionClassName(sessionClassName)
, SessionIdentifier(sessionIdentifier)
{

}

TConclusionStatus DeserializeFromProto(const TProto& data);
void SerializeToProto(TProto& proto) const;

TConclusionStatus Apply(const std::shared_ptr<ISessionLogic>& session) const {
session->CheckStatusCorrect();
auto result = DoApply(session);
session->CheckStatusCorrect();
return result;
}

virtual TString GetClassName() const = 0;
};

class TSessionLogicControlContainer: public NBackgroundTasks::TInterfaceProtoContainer<ISessionLogicControl> {
private:
using TBase = NBackgroundTasks::TInterfaceProtoContainer<ISessionLogicControl>;
public:
using TBase::TBase;
};

class TSessionControlContainer {
private:
YDB_READONLY_DEF(TStatusChannelContainer, ChannelContainer);
YDB_READONLY_DEF(TSessionLogicControlContainer, LogicControlContainer);
public:
NKikimrTxBackgroundProto::TSessionControlContainer SerializeToProto() const;
TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto);

TSessionControlContainer() = default;

TSessionControlContainer(const TStatusChannelContainer& channel, const TSessionLogicControlContainer& logic)
: ChannelContainer(channel)
, LogicControlContainer(logic) {
AFL_VERIFY(!!ChannelContainer);
AFL_VERIFY(!!LogicControlContainer);
}
};

}
14 changes: 14 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/session.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "session.h"
#include "adapter.h"
#include <ydb/public/api/protos/ydb_operation.pb.h>

namespace NKikimr::NOlap::NBackground {

Ydb::Operations::Operation TSessionInfoReport::SerializeToProto() const {
Ydb::Operations::Operation result;
result.set_id(ClassName + "::" + Identifier);
result.set_ready(IsFinished);
return result;
}

}
Loading