Skip to content

(refactoring) Split TEvChangeExchange into two parts: common & DS KIKIMR-20673 #1152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ struct TKikimrEvents : TEvents {
ES_HEALTH_CHECK,
ES_DQ = NYql::NDq::TDqEvents::ES_DQ_COMPUTE, // 4212
ES_YQ, // 4213
ES_CHANGE_EXCHANGE,
ES_CHANGE_EXCHANGE_DATASHARD,
ES_DATABASE_SERVICE, //4215
ES_SEQUENCESHARD, // 4216
ES_SEQUENCEPROXY, // 4217
Expand Down Expand Up @@ -172,6 +172,7 @@ struct TKikimrEvents : TEvents {
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
ES_REPLICATION_SERVICE,
ES_CHANGE_EXCHANGE,
};
};

Expand Down
125 changes: 125 additions & 0 deletions ydb/core/change_exchange/change_exchange.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#include "change_exchange.h"

#include <util/string/builder.h>
#include <util/string/join.h>

namespace NKikimr::NChangeExchange {

/// TEvEnqueueRecords
TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records)
: Records(records)
{
}

TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvEnqueueRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize)
: Order(order)
, PathId(pathId)
, BodySize(bodySize)
{
}

void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const {
out << "{"
<< " Order: " << Order
<< " PathId: " << PathId
<< " BodySize: " << BodySize
<< " }";
}

/// TEvRequestRecords
TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRequestRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize)
: Order(order)
, BodySize(bodySize)
{
}

bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const {
return Order < rhs.Order;
}

void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const {
out << "{"
<< " Order: " << Order
<< " BodySize: " << BodySize
<< " }";
}

/// TEvRemoveRecords
TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRemoveRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

/// TEvRecords
TEvChangeExchange::TEvRecords::TEvRecords(const TVector<IChangeRecord::TPtr>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRecords::TEvRecords(TVector<IChangeRecord::TPtr>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

/// TEvForgetRecords
TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records)
: Records(records)
{
}

TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvForgetRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

}
101 changes: 101 additions & 0 deletions ydb/core/change_exchange/change_exchange.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#pragma once

#include "change_record.h"

#include <ydb/core/base/defs.h>
#include <ydb/core/base/events.h>
#include <ydb/core/scheme/scheme_pathid.h>

#include <util/generic/vector.h>

namespace NKikimr::NChangeExchange {

struct TEvChangeExchange {
enum EEv {
// Enqueue for sending
EvEnqueueRecords = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE),
// Request change record(s) by id
EvRequestRecords,
// Change record(s)
EvRecords,
// Remove change record(s) from local database
EvRemoveRecords,
// Already removed records that the sender should forget about
EvForgetRecods,

EvEnd,
};

static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE));

struct TEvEnqueueRecords: public TEventLocal<TEvEnqueueRecords, EvEnqueueRecords> {
struct TRecordInfo {
ui64 Order;
TPathId PathId;
ui64 BodySize;

TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize);

void Out(IOutputStream& out) const;
};

TVector<TRecordInfo> Records;

explicit TEvEnqueueRecords(const TVector<TRecordInfo>& records);
explicit TEvEnqueueRecords(TVector<TRecordInfo>&& records);
TString ToString() const override;
};

struct TEvRequestRecords: public TEventLocal<TEvRequestRecords, EvRequestRecords> {
struct TRecordInfo {
ui64 Order;
ui64 BodySize;

TRecordInfo(ui64 order, ui64 bodySize = 0);

bool operator<(const TRecordInfo& rhs) const;
void Out(IOutputStream& out) const;
};

TVector<TRecordInfo> Records;

explicit TEvRequestRecords(const TVector<TRecordInfo>& records);
explicit TEvRequestRecords(TVector<TRecordInfo>&& records);
TString ToString() const override;
};

struct TEvRemoveRecords: public TEventLocal<TEvRemoveRecords, EvRemoveRecords> {
TVector<ui64> Records;

explicit TEvRemoveRecords(const TVector<ui64>& records);
explicit TEvRemoveRecords(TVector<ui64>&& records);
TString ToString() const override;
};

struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> {
TVector<IChangeRecord::TPtr> Records;

explicit TEvRecords(const TVector<IChangeRecord::TPtr>& records);
explicit TEvRecords(TVector<IChangeRecord::TPtr>&& records);
TString ToString() const override;
};

struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> {
TVector<ui64> Records;

explicit TEvForgetRecords(const TVector<ui64>& records);
explicit TEvForgetRecords(TVector<ui64>&& records);
TString ToString() const override;
};

}; // TEvChangeExchange

}

Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) {
return x.Out(o);
}

Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) {
return x.Out(o);
}
6 changes: 6 additions & 0 deletions ydb/core/change_exchange/ya.make
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
LIBRARY()

SRCS(
change_exchange.cpp
change_record.cpp
)

GENERATE_ENUM_SERIALIZATION(change_record.h)

PEERDIR(
ydb/core/base
ydb/core/scheme
)

YQL_LAST_ABI_VERSION()

END()
124 changes: 2 additions & 122 deletions ydb/core/tx/datashard/change_exchange.cpp
Original file line number Diff line number Diff line change
@@ -1,127 +1,8 @@
#include "change_exchange.h"

#include <util/string/builder.h>
#include <util/string/join.h>

namespace NKikimr {
namespace NDataShard {

/// TEvEnqueueRecords
TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records)
: Records(records)
{
}

TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvEnqueueRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize)
: Order(order)
, PathId(pathId)
, BodySize(bodySize)
{
}

void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const {
out << "{"
<< " Order: " << Order
<< " PathId: " << PathId
<< " BodySize: " << BodySize
<< " }";
}

/// TEvRequestRecords
TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRequestRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize)
: Order(order)
, BodySize(bodySize)
{
}

bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const {
return Order < rhs.Order;
}

void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const {
out << "{"
<< " Order: " << Order
<< " BodySize: " << BodySize
<< " }";
}

/// TEvRemoveRecords
TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRemoveRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

/// TEvRecords
TEvChangeExchange::TEvRecords::TEvRecords(const TVector<NChangeExchange::IChangeRecord::TPtr>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRecords::TEvRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

/// TEvForgetRecords
TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records)
: Records(records)
{
}

TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvForgetRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}
namespace NKikimr::NDataShard {

/// TEvAddSender
TEvChangeExchange::TEvAddSender::TEvAddSender(const TTableId& userTableId, TEvChangeExchange::ESenderType type, const TPathId& pathId)
Expand Down Expand Up @@ -151,5 +32,4 @@ TString TEvChangeExchange::TEvRemoveSender::ToString() const {
<< " }";
}

} // NDataShard
} // NKikimr
}
Loading