Skip to content

Commit da13271

Browse files
authored
Merge 3516a42 into b5dae67
2 parents b5dae67 + 3516a42 commit da13271

File tree

11 files changed

+133
-146
lines changed

11 files changed

+133
-146
lines changed

ydb/core/change_exchange/change_sender_common_ops.h

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -413,20 +413,30 @@ class TBaseChangeSender {
413413
}
414414

415415
TActorId GetChangeServer() const { return ChangeServer; }
416-
void CreateSenders(const TVector<ui64>& partitionIds, bool partitioningChanged = true) {
417-
if (partitioningChanged) {
416+
417+
private:
418+
void CreateSendersImpl(const TVector<ui64>& partitionIds) {
419+
if (partitionIds) {
418420
CreateMissingSenders(partitionIds);
419421
} else {
420-
RecreateSenders(GonePartitions);
422+
RecreateSenders(std::exchange(GonePartitions, {}));
421423
}
422424

423-
GonePartitions.clear();
424-
425425
if (!Enqueued || !RequestRecords()) {
426426
SendRecords();
427427
}
428428
}
429429

430+
protected:
431+
void CreateSenders(const TVector<ui64>& partitionIds) {
432+
Y_ABORT_UNLESS(partitionIds);
433+
CreateSendersImpl(partitionIds);
434+
}
435+
436+
void CreateSenders() {
437+
CreateSendersImpl({});
438+
}
439+
430440
void KillSenders() {
431441
for (const auto& [_, sender] : std::exchange(Senders, {})) {
432442
if (sender.ActorId) {

ydb/core/change_exchange/util.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#include "util.h"
2+
3+
namespace NKikimr::NChangeExchange {
4+
5+
TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
6+
TVector<ui64> result(::Reserve(partitions.size()));
7+
8+
for (const auto& partition : partitions) {
9+
result.push_back(partition.ShardId);
10+
}
11+
12+
return result;
13+
}
14+
15+
}

ydb/core/change_exchange/util.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#pragma once
2+
3+
#include <ydb/core/scheme/scheme_tabledefs.h>
4+
5+
namespace NKikimr::NChangeExchange {
6+
7+
TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions);
8+
9+
}

ydb/core/change_exchange/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ SRCS(
44
change_exchange.cpp
55
change_record.cpp
66
change_sender_monitoring.cpp
7+
util.cpp
78
)
89

910
GENERATE_ENUM_SERIALIZATION(change_record.h)

ydb/core/tx/datashard/change_sender_async_index.cpp

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/core/base/tablet_pipecache.h>
77
#include <ydb/core/change_exchange/change_sender_common_ops.h>
88
#include <ydb/core/change_exchange/change_sender_monitoring.h>
9+
#include <ydb/core/change_exchange/util.h>
910
#include <ydb/core/tablet_flat/flat_row_eggs.h>
1011
#include <ydb/core/tx/scheme_cache/helpers.h>
1112
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
@@ -435,16 +436,6 @@ class TAsyncIndexChangeSenderMain
435436
return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogWarnAndRetry, entry, expected);
436437
}
437438

438-
static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
439-
TVector<ui64> result(Reserve(partitions.size()));
440-
441-
for (const auto& partition : partitions) {
442-
result.push_back(partition.ShardId); // partition = shard
443-
}
444-
445-
return result;
446-
}
447-
448439
/// ResolveUserTable
449440

450441
void ResolveUserTable() {
@@ -611,6 +602,11 @@ class TAsyncIndexChangeSenderMain
611602
return;
612603
}
613604

605+
if (IndexTableVersion && IndexTableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
606+
CreateSenders();
607+
return Become(&TThis::StateMain);
608+
}
609+
614610
TagMap.clear();
615611
TVector<NScheme::TTypeInfo> keyColumnTypes;
616612

@@ -692,11 +688,9 @@ class TAsyncIndexChangeSenderMain
692688
return Retry();
693689
}
694690

695-
const bool versionChanged = !IndexTableVersion || IndexTableVersion != entry.GeneralVersion;
696691
IndexTableVersion = entry.GeneralVersion;
697-
698692
KeyDesc = std::move(entry.KeyDescription);
699-
CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
693+
CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions()));
700694

701695
Become(&TThis::StateMain);
702696
}

ydb/core/tx/datashard/change_sender_cdc_stream.cpp

Lines changed: 15 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
#include <ydb/core/change_exchange/change_sender_common_ops.h>
88
#include <ydb/core/change_exchange/change_sender_monitoring.h>
9-
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
9+
#include <ydb/core/change_exchange/util.h>
1010
#include <ydb/core/persqueue/writer/source_id_encoding.h>
1111
#include <ydb/core/persqueue/writer/writer.h>
1212
#include <ydb/core/tx/scheme_cache/helpers.h>
@@ -300,45 +300,6 @@ class TCdcChangeSenderMain
300300
, public NChangeExchange::ISenderFactory
301301
, private NSchemeCache::TSchemeCacheHelpers
302302
{
303-
struct TPQPartitionInfo {
304-
ui32 PartitionId;
305-
ui64 ShardId;
306-
TPartitionKeyRange KeyRange;
307-
308-
struct TLess {
309-
TConstArrayRef<NScheme::TTypeInfo> Schema;
310-
311-
TLess(const TVector<NScheme::TTypeInfo>& schema)
312-
: Schema(schema)
313-
{
314-
}
315-
316-
bool operator()(const TPQPartitionInfo& lhs, const TPQPartitionInfo& rhs) const {
317-
Y_ABORT_UNLESS(lhs.KeyRange.ToBound || rhs.KeyRange.ToBound);
318-
319-
if (!lhs.KeyRange.ToBound) {
320-
return false;
321-
}
322-
323-
if (!rhs.KeyRange.ToBound) {
324-
return true;
325-
}
326-
327-
Y_ABORT_UNLESS(lhs.KeyRange.ToBound && rhs.KeyRange.ToBound);
328-
329-
const int compares = CompareTypedCellVectors(
330-
lhs.KeyRange.ToBound->GetCells().data(),
331-
rhs.KeyRange.ToBound->GetCells().data(),
332-
Schema.data(), Schema.size()
333-
);
334-
335-
return (compares < 0);
336-
}
337-
338-
}; // TLess
339-
340-
}; // TPQPartitionInfo
341-
342303
TStringBuf GetLogPrefix() const {
343304
if (!LogPrefix) {
344305
LogPrefix = TStringBuilder()
@@ -430,16 +391,6 @@ class TCdcChangeSenderMain
430391
return false;
431392
}
432393

433-
static TVector<ui64> MakePartitionIds(const TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitions) {
434-
TVector<ui64> result(Reserve(partitions.size()));
435-
436-
for (const auto& partition : partitions) {
437-
result.push_back(partition.ShardId);
438-
}
439-
440-
return result;
441-
}
442-
443394
/// ResolveCdcStream
444395

445396
void ResolveCdcStream() {
@@ -561,77 +512,28 @@ class TCdcChangeSenderMain
561512
return;
562513
}
563514

515+
const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
516+
if (TopicVersion && TopicVersion == topicVersion) {
517+
CreateSenders();
518+
return Become(&TThis::StateMain);
519+
}
520+
521+
TopicVersion = topicVersion;
522+
564523
const auto& pqDesc = entry.PQGroupInfo->Description;
565524
const auto& pqConfig = pqDesc.GetPQTabletConfig();
566525

567-
TVector<NScheme::TTypeInfo> schema;
568526
PartitionToShard.clear();
569-
570-
schema.reserve(pqConfig.PartitionKeySchemaSize());
571-
for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) {
572-
// TODO: support pg types
573-
schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId()));
574-
}
575-
576-
TSet<TPQPartitionInfo, TPQPartitionInfo::TLess> partitions(schema);
577-
THashSet<ui64> shards;
578-
579527
for (const auto& partition : pqDesc.GetPartitions()) {
580-
const auto partitionId = partition.GetPartitionId();
581-
const auto shardId = partition.GetTabletId();
582-
583-
PartitionToShard.emplace(partitionId, shardId);
584-
585-
auto keyRange = TPartitionKeyRange::Parse(partition.GetKeyRange());
586-
Y_ABORT_UNLESS(!keyRange.FromBound || keyRange.FromBound->GetCells().size() == schema.size());
587-
Y_ABORT_UNLESS(!keyRange.ToBound || keyRange.ToBound->GetCells().size() == schema.size());
588-
589-
partitions.insert({partitionId, shardId, std::move(keyRange)});
590-
shards.insert(shardId);
591-
}
592-
593-
// used to validate
594-
bool isFirst = true;
595-
const TPQPartitionInfo* prev = nullptr;
596-
597-
TVector<NKikimr::TKeyDesc::TPartitionInfo> partitioning;
598-
partitioning.reserve(partitions.size());
599-
for (const auto& cur : partitions) {
600-
if (isFirst) {
601-
isFirst = false;
602-
Y_ABORT_UNLESS(!cur.KeyRange.FromBound.Defined());
603-
} else {
604-
Y_ABORT_UNLESS(cur.KeyRange.FromBound.Defined());
605-
Y_ABORT_UNLESS(prev);
606-
Y_ABORT_UNLESS(prev->KeyRange.ToBound.Defined());
607-
// TODO: compare cells
608-
}
609-
610-
auto& part = partitioning.emplace_back(cur.PartitionId); // TODO: double-check that it is right partitioning
611-
612-
if (cur.KeyRange.ToBound) {
613-
part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{
614-
.EndKeyPrefix = *cur.KeyRange.ToBound,
615-
};
616-
} else {
617-
part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{};
618-
}
619-
620-
prev = &cur;
528+
PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId());
621529
}
622530

623-
if (prev) {
624-
Y_ABORT_UNLESS(!prev->KeyRange.ToBound.Defined());
625-
}
626-
627-
const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion();
628-
const bool versionChanged = !TopicVersion || TopicVersion != topicVersion;
629-
TopicVersion = topicVersion;
630-
631-
KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(schema);
632-
KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(std::move(partitioning));
531+
Y_ABORT_UNLESS(entry.PQGroupInfo->Schema);
532+
KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema);
533+
Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning);
534+
KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(entry.PQGroupInfo->Partitioning);
633535

634-
CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged);
536+
CreateSenders(NChangeExchange::MakePartitionIds(*KeyDesc->Partitioning));
635537
Become(&TThis::StateMain);
636538
}
637539

ydb/core/tx/datashard/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,6 @@ PEERDIR(
244244
ydb/core/formats
245245
ydb/core/io_formats/ydb_dump
246246
ydb/core/kqp/runtime
247-
ydb/core/persqueue/partition_key_range
248247
ydb/core/persqueue/writer
249248
ydb/core/protos
250249
ydb/core/tablet

ydb/core/tx/replication/service/table_writer_impl.h

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <ydb/core/base/tablet_pipecache.h>
88
#include <ydb/core/change_exchange/change_sender_common_ops.h>
9+
#include <ydb/core/change_exchange/util.h>
910
#include <ydb/core/scheme/scheme_tabledefs.h>
1011
#include <ydb/core/tablet_flat/flat_row_eggs.h>
1112
#include <ydb/core/tx/datashard/datashard.h>
@@ -278,16 +279,6 @@ class TLocalTableWriter
278279
return Check(&TSchemeCacheHelpers::CheckEntryKind<T>, &TThis::LogCritAndLeave, entry, expected);
279280
}
280281

281-
static TVector<ui64> MakePartitionIds(const TVector<TKeyDesc::TPartitionInfo>& partitions) {
282-
TVector<ui64> result(::Reserve(partitions.size()));
283-
284-
for (const auto& partition : partitions) {
285-
result.push_back(partition.ShardId);
286-
}
287-
288-
return result;
289-
}
290-
291282
void Registered(TActorSystem*, const TActorId&) override {
292283
this->ChangeServer = this->SelfId();
293284
}
@@ -348,6 +339,12 @@ class TLocalTableWriter
348339
return;
349340
}
350341

342+
if (TableVersion && TableVersion == entry.Self->Info.GetVersion().GetGeneralVersion()) {
343+
Y_ABORT_UNLESS(Initialized);
344+
Resolving = false;
345+
return this->CreateSenders();
346+
}
347+
351348
auto schema = MakeIntrusive<TLightweightSchema>();
352349
if (entry.Self && entry.Self->Info.HasVersion()) {
353350
schema->Version = entry.Self->Info.GetVersion().GetTableSchemaVersion();
@@ -415,11 +412,9 @@ class TLocalTableWriter
415412
return LogWarnAndRetry("Empty partitions");
416413
}
417414

418-
const bool versionChanged = !TableVersion || TableVersion != entry.GeneralVersion;
419415
TableVersion = entry.GeneralVersion;
420-
421416
KeyDesc = std::move(entry.KeyDescription);
422-
this->CreateSenders(MakePartitionIds(KeyDesc->GetPartitions()), versionChanged);
417+
this->CreateSenders(NChangeExchange::MakePartitionIds(KeyDesc->GetPartitions()));
423418

424419
if (!Initialized) {
425420
this->Send(Worker, new TEvWorker::TEvHandshake());

0 commit comments

Comments
 (0)