Skip to content

Move TBaseChangeSender to ydb/core/change_exchange KIKIMR-20673 #1207

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#include "change_sender_common_ops.h"
#include "change_sender_monitoring.h"

#include <ydb/library/yverify_stream/yverify_stream.h>

#include <library/cpp/monlib/service/pages/mon_page.h>
#include <library/cpp/monlib/service/pages/templates.h>

#include <util/generic/algorithm.h>
#include <util/generic/size_literals.h>

namespace NKikimr::NDataShard {
namespace NKikimr::NChangeExchange {

void TBaseChangeSender::LazyCreateSender(THashMap<ui64, TSender>& senders, ui64 partitionId) {
auto res = senders.emplace(partitionId, TSender{});
Expand Down Expand Up @@ -82,7 +84,7 @@ void TBaseChangeSender::KillSenders() {
}
}

void TBaseChangeSender::EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
void TBaseChangeSender::EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) {
for (auto& record : records) {
Y_VERIFY_S(PathId == record.PathId, "Unexpected record's path id"
<< ": expected# " << PathId
Expand Down Expand Up @@ -117,11 +119,11 @@ bool TBaseChangeSender::RequestRecords() {
return false;
}

ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRequestRecords(std::move(records)));
ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRequestRecords(std::move(records)));
return true;
}

void TBaseChangeSender::ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) {
void TBaseChangeSender::ProcessRecords(TVector<IChangeRecord::TPtr>&& records) {
for (auto& record : records) {
auto it = PendingBody.find(record->GetOrder());
if (it == PendingBody.end()) {
Expand Down Expand Up @@ -290,7 +292,7 @@ void TBaseChangeSender::SendPreparedRecords(ui64 partitionId) {
}

Y_ABORT_UNLESS(sender.ActorId);
ActorOps->Send(sender.ActorId, new NChangeExchange::TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
ActorOps->Send(sender.ActorId, new TEvChangeExchange::TEvRecords(std::exchange(sender.Prepared, {})));
}

void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
Expand All @@ -306,7 +308,7 @@ void TBaseChangeSender::ReEnqueueRecords(const TSender& sender) {
}
}

TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(NChangeExchange::IChangeRecord::TPtr record) {
TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(IChangeRecord::TPtr record) {
Y_ABORT_UNLESS(record->IsBroadcast());

auto it = Broadcasting.find(record->GetOrder());
Expand Down Expand Up @@ -430,17 +432,17 @@ void TBaseChangeSender::RemoveRecords() {
}

TBaseChangeSender::TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
const TDataShardId& dataShard, const TPathId& pathId)
const TActorId& changeServer, const TPathId& pathId)
: ActorOps(actorOps)
, Resolver(resolver)
, DataShard(dataShard)
, ChangeServer(changeServer)
, PathId(pathId)
, MemLimit(192_KB)
, MemUsage(0)
{
}

void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev,
void TBaseChangeSender::RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev,
const TActorContext& ctx)
{
const auto& cgi = ev->Get()->Cgi();
Expand Down Expand Up @@ -468,7 +470,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
TStringStream html;

HTML(html) {
Header(html, TStringBuilder() << type << " change sender", DataShard.TabletId);
Header(html, "Change sender", tabletId);

SimplePanel(html, "Info", [this](IOutputStream& html) {
HTML(html) {
Expand All @@ -479,7 +481,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
}
});

SimplePanel(html, "Partition senders", [this](IOutputStream& html) {
SimplePanel(html, "Partition senders", [this, tabletId](IOutputStream& html) {
HTML(html) {
TABLE_CLASS("table table-hover") {
TABLEHEAD() {
Expand All @@ -503,7 +505,7 @@ void TBaseChangeSender::RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon
TABLED() { html << sender.Pending.size(); }
TABLED() { html << sender.Prepared.size(); }
TABLED() { html << sender.Broadcasting.size(); }
TABLED() { ActorLink(html, DataShard.TabletId, PathId, partitionId); }
TABLED() { ActorLink(html, tabletId, PathId, partitionId); }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
#pragma once

#include "change_exchange.h"
#include "change_exchange_helpers.h"

#include <ydb/core/base/appdata.h>
#include <ydb/core/change_exchange/change_exchange.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/mon.h>
Expand All @@ -14,8 +11,7 @@
#include <util/generic/set.h>
#include <util/string/builder.h>

namespace NKikimr {
namespace NDataShard {
namespace NKikimr::NChangeExchange {

struct TEvChangeExchangePrivate {
enum EEv {
Expand Down Expand Up @@ -61,8 +57,8 @@ class IChangeSender {
virtual IActor* CreateSender(ui64 partitionId) = 0;
virtual void RemoveRecords() = 0;

virtual void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0;
virtual void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) = 0;
virtual void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) = 0;
virtual void ProcessRecords(TVector<IChangeRecord::TPtr>&& records) = 0;
virtual void ForgetRecords(TVector<ui64>&& records) = 0;
virtual void OnReady(ui64 partitionId) = 0;
virtual void OnGone(ui64 partitionId) = 0;
Expand All @@ -75,18 +71,18 @@ class IChangeSenderResolver {
virtual void Resolve() = 0;
virtual bool IsResolving() const = 0;
virtual bool IsResolved() const = 0;
virtual ui64 GetPartitionId(NChangeExchange::IChangeRecord::TPtr record) const = 0;
virtual ui64 GetPartitionId(IChangeRecord::TPtr record) const = 0;
};

class TBaseChangeSender: public IChangeSender {
using TEnqueuedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;
using TRequestedRecord = NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo;
using TEnqueuedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;
using TRequestedRecord = TEvChangeExchange::TEvRequestRecords::TRecordInfo;

struct TSender {
TActorId ActorId;
bool Ready = false;
TVector<TEnqueuedRecord> Pending;
TVector<NChangeExchange::IChangeRecord::TPtr> Prepared;
TVector<IChangeRecord::TPtr> Prepared;
TVector<ui64> Broadcasting;
};

Expand All @@ -108,7 +104,7 @@ class TBaseChangeSender: public IChangeSender {
void SendPreparedRecords(ui64 partitionId);
void ReEnqueueRecords(const TSender& sender);

TBroadcast& EnsureBroadcast(NChangeExchange::IChangeRecord::TPtr record);
TBroadcast& EnsureBroadcast(IChangeRecord::TPtr record);
bool AddBroadcastPartition(ui64 order, ui64 partitionId);
bool RemoveBroadcastPartition(ui64 order, ui64 partitionId);
bool CompleteBroadcastPartition(ui64 order, ui64 partitionId);
Expand All @@ -124,35 +120,35 @@ class TBaseChangeSender: public IChangeSender {
remove.push_back(record.Order);
}

ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(remove)));
}

template <>
void RemoveRecords(TVector<ui64>&& records) {
ActorOps->Send(DataShard.ActorId, new NChangeExchange::TEvChangeExchange::TEvRemoveRecords(std::move(records)));
ActorOps->Send(ChangeServer, new TEvChangeExchange::TEvRemoveRecords(std::move(records)));
}

void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) override;
void KillSenders() override;
void RemoveRecords() override;

void EnqueueRecords(TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override;
void ProcessRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records) override;
void EnqueueRecords(TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo>&& records) override;
void ProcessRecords(TVector<IChangeRecord::TPtr>&& records) override;
void ForgetRecords(TVector<ui64>&& records) override;
void OnReady(ui64 partitionId) override;
void OnGone(ui64 partitionId) override;

explicit TBaseChangeSender(IActorOps* actorOps, IChangeSenderResolver* resolver,
const TDataShardId& dataShard, const TPathId& pathId);
const TActorId& changeServer, const TPathId& pathId);

void RenderHtmlPage(TEvChangeExchange::ESenderType type, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);
void RenderHtmlPage(ui64 tabletId, NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx);

private:
IActorOps* const ActorOps;
IChangeSenderResolver* const Resolver;

protected:
const TDataShardId DataShard;
const TActorId ChangeServer;
const TPathId PathId;

private:
Expand All @@ -162,12 +158,11 @@ class TBaseChangeSender: public IChangeSender {
THashMap<ui64, TSender> Senders; // ui64 is partition id
TSet<TEnqueuedRecord> Enqueued;
TSet<TRequestedRecord> PendingBody;
TMap<ui64, NChangeExchange::IChangeRecord::TPtr> PendingSent; // ui64 is order
TMap<ui64, IChangeRecord::TPtr> PendingSent; // ui64 is order
THashMap<ui64, TBroadcast> Broadcasting; // ui64 is order

TVector<ui64> GonePartitions;

}; // TBaseChangeSender

} // NDataShard
} // NKikimr
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <util/string/printf.h>
#include <util/string/split.h>

namespace NKikimr::NDataShard {
namespace NKikimr::NChangeExchange {

void Panel(IOutputStream& str, std::function<void(IOutputStream&)> title, std::function<void(IOutputStream&)> body) {
HTML(str) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

#include <util/generic/maybe.h>

namespace NKikimr::NDataShard {
namespace NKikimr::NChangeExchange {

template <typename T>
static void Link(IOutputStream& str, const TStringBuf path, const T& title) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/change_exchange/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ LIBRARY()
SRCS(
change_exchange.cpp
change_record.cpp
change_sender_common_ops.cpp
change_sender_monitoring.cpp
)

GENERATE_ENUM_SERIALIZATION(change_record.h)

PEERDIR(
ydb/core/base
ydb/core/scheme
ydb/library/actors/core
ydb/library/yverify_stream
library/cpp/monlib/service/pages
)

YQL_LAST_ABI_VERSION()
Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/datashard/change_exchange_split.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "change_exchange.h"
#include "change_exchange_helpers.h"
#include "change_sender_common_ops.h"
#include "datashard_impl.h"

#include <ydb/core/base/tablet_pipe.h>
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/datashard/change_sender.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#include "change_exchange.h"
#include "change_exchange_impl.h"
#include "change_sender_monitoring.h"
#include "datashard_impl.h"

#include <ydb/core/change_exchange/change_exchange.h>
#include <ydb/core/change_exchange/change_sender_monitoring.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
Expand Down Expand Up @@ -165,6 +165,8 @@ class TChangeSender: public TActor<TChangeSender> {
}

void Handle(NMon::TEvRemoteHttpInfo::TPtr& ev, const TActorContext& ctx) {
using namespace NChangeExchange;

const auto& cgi = ev->Get()->Cgi();
if (const auto& str = cgi.Get("pathId")) {
if (const auto& pathId = ParsePathId(str)) {
Expand Down
Loading