|
6 | 6 |
|
7 | 7 | #include <ydb/core/change_exchange/change_sender_common_ops.h>
|
8 | 8 | #include <ydb/core/change_exchange/change_sender_monitoring.h>
|
9 |
| -#include <ydb/core/persqueue/partition_key_range/partition_key_range.h> |
| 9 | +#include <ydb/core/change_exchange/util.h> |
10 | 10 | #include <ydb/core/persqueue/writer/source_id_encoding.h>
|
11 | 11 | #include <ydb/core/persqueue/writer/writer.h>
|
12 | 12 | #include <ydb/core/tx/scheme_cache/helpers.h>
|
@@ -300,45 +300,6 @@ class TCdcChangeSenderMain
|
300 | 300 | , public NChangeExchange::ISenderFactory
|
301 | 301 | , private NSchemeCache::TSchemeCacheHelpers
|
302 | 302 | {
|
303 |
| - struct TPQPartitionInfo { |
304 |
| - ui32 PartitionId; |
305 |
| - ui64 ShardId; |
306 |
| - TPartitionKeyRange KeyRange; |
307 |
| - |
308 |
| - struct TLess { |
309 |
| - TConstArrayRef<NScheme::TTypeInfo> Schema; |
310 |
| - |
311 |
| - TLess(const TVector<NScheme::TTypeInfo>& schema) |
312 |
| - : Schema(schema) |
313 |
| - { |
314 |
| - } |
315 |
| - |
316 |
| - bool operator()(const TPQPartitionInfo& lhs, const TPQPartitionInfo& rhs) const { |
317 |
| - Y_ABORT_UNLESS(lhs.KeyRange.ToBound || rhs.KeyRange.ToBound); |
318 |
| - |
319 |
| - if (!lhs.KeyRange.ToBound) { |
320 |
| - return false; |
321 |
| - } |
322 |
| - |
323 |
| - if (!rhs.KeyRange.ToBound) { |
324 |
| - return true; |
325 |
| - } |
326 |
| - |
327 |
| - Y_ABORT_UNLESS(lhs.KeyRange.ToBound && rhs.KeyRange.ToBound); |
328 |
| - |
329 |
| - const int compares = CompareTypedCellVectors( |
330 |
| - lhs.KeyRange.ToBound->GetCells().data(), |
331 |
| - rhs.KeyRange.ToBound->GetCells().data(), |
332 |
| - Schema.data(), Schema.size() |
333 |
| - ); |
334 |
| - |
335 |
| - return (compares < 0); |
336 |
| - } |
337 |
| - |
338 |
| - }; // TLess |
339 |
| - |
340 |
| - }; // TPQPartitionInfo |
341 |
| - |
342 | 303 | TStringBuf GetLogPrefix() const {
|
343 | 304 | if (!LogPrefix) {
|
344 | 305 | LogPrefix = TStringBuilder()
|
@@ -430,16 +391,6 @@ class TCdcChangeSenderMain
|
430 | 391 | return false;
|
431 | 392 | }
|
432 | 393 |
|
433 |
| - static TVector<ui64> MakePartitionIds(const TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitions) { |
434 |
| - TVector<ui64> result(Reserve(partitions.size())); |
435 |
| - |
436 |
| - for (const auto& partition : partitions) { |
437 |
| - result.push_back(partition.ShardId); |
438 |
| - } |
439 |
| - |
440 |
| - return result; |
441 |
| - } |
442 |
| - |
443 | 394 | /// ResolveCdcStream
|
444 | 395 |
|
445 | 396 | void ResolveCdcStream() {
|
@@ -561,77 +512,28 @@ class TCdcChangeSenderMain
|
561 | 512 | return;
|
562 | 513 | }
|
563 | 514 |
|
| 515 | + const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion(); |
| 516 | + if (TopicVersion && TopicVersion == topicVersion) { |
| 517 | + CreateSenders(); |
| 518 | + return Become(&TThis::StateMain); |
| 519 | + } |
| 520 | + |
| 521 | + TopicVersion = topicVersion; |
| 522 | + |
564 | 523 | const auto& pqDesc = entry.PQGroupInfo->Description;
|
565 | 524 | const auto& pqConfig = pqDesc.GetPQTabletConfig();
|
566 | 525 |
|
567 |
| - TVector<NScheme::TTypeInfo> schema; |
568 | 526 | PartitionToShard.clear();
|
569 |
| - |
570 |
| - schema.reserve(pqConfig.PartitionKeySchemaSize()); |
571 |
| - for (const auto& keySchema : pqConfig.GetPartitionKeySchema()) { |
572 |
| - // TODO: support pg types |
573 |
| - schema.push_back(NScheme::TTypeInfo(keySchema.GetTypeId())); |
574 |
| - } |
575 |
| - |
576 |
| - TSet<TPQPartitionInfo, TPQPartitionInfo::TLess> partitions(schema); |
577 |
| - THashSet<ui64> shards; |
578 |
| - |
579 | 527 | for (const auto& partition : pqDesc.GetPartitions()) {
|
580 |
| - const auto partitionId = partition.GetPartitionId(); |
581 |
| - const auto shardId = partition.GetTabletId(); |
582 |
| - |
583 |
| - PartitionToShard.emplace(partitionId, shardId); |
584 |
| - |
585 |
| - auto keyRange = TPartitionKeyRange::Parse(partition.GetKeyRange()); |
586 |
| - Y_ABORT_UNLESS(!keyRange.FromBound || keyRange.FromBound->GetCells().size() == schema.size()); |
587 |
| - Y_ABORT_UNLESS(!keyRange.ToBound || keyRange.ToBound->GetCells().size() == schema.size()); |
588 |
| - |
589 |
| - partitions.insert({partitionId, shardId, std::move(keyRange)}); |
590 |
| - shards.insert(shardId); |
591 |
| - } |
592 |
| - |
593 |
| - // used to validate |
594 |
| - bool isFirst = true; |
595 |
| - const TPQPartitionInfo* prev = nullptr; |
596 |
| - |
597 |
| - TVector<NKikimr::TKeyDesc::TPartitionInfo> partitioning; |
598 |
| - partitioning.reserve(partitions.size()); |
599 |
| - for (const auto& cur : partitions) { |
600 |
| - if (isFirst) { |
601 |
| - isFirst = false; |
602 |
| - Y_ABORT_UNLESS(!cur.KeyRange.FromBound.Defined()); |
603 |
| - } else { |
604 |
| - Y_ABORT_UNLESS(cur.KeyRange.FromBound.Defined()); |
605 |
| - Y_ABORT_UNLESS(prev); |
606 |
| - Y_ABORT_UNLESS(prev->KeyRange.ToBound.Defined()); |
607 |
| - // TODO: compare cells |
608 |
| - } |
609 |
| - |
610 |
| - auto& part = partitioning.emplace_back(cur.PartitionId); // TODO: double-check that it is right partitioning |
611 |
| - |
612 |
| - if (cur.KeyRange.ToBound) { |
613 |
| - part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{ |
614 |
| - .EndKeyPrefix = *cur.KeyRange.ToBound, |
615 |
| - }; |
616 |
| - } else { |
617 |
| - part.Range = NKikimr::TKeyDesc::TPartitionRangeInfo{}; |
618 |
| - } |
619 |
| - |
620 |
| - prev = &cur; |
| 528 | + PartitionToShard.emplace(partition.GetPartitionId(), partition.GetTabletId()); |
621 | 529 | }
|
622 | 530 |
|
623 |
| - if (prev) { |
624 |
| - Y_ABORT_UNLESS(!prev->KeyRange.ToBound.Defined()); |
625 |
| - } |
626 |
| - |
627 |
| - const auto topicVersion = entry.Self->Info.GetVersion().GetGeneralVersion(); |
628 |
| - const bool versionChanged = !TopicVersion || TopicVersion != topicVersion; |
629 |
| - TopicVersion = topicVersion; |
630 |
| - |
631 |
| - KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(schema); |
632 |
| - KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(std::move(partitioning)); |
| 531 | + Y_ABORT_UNLESS(entry.PQGroupInfo->Schema); |
| 532 | + KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc(entry.PQGroupInfo->Schema); |
| 533 | + Y_ABORT_UNLESS(entry.PQGroupInfo->Partitioning); |
| 534 | + KeyDesc->Partitioning = std::make_shared<TVector<NKikimr::TKeyDesc::TPartitionInfo>>(entry.PQGroupInfo->Partitioning); |
633 | 535 |
|
634 |
| - CreateSenders(MakePartitionIds(*KeyDesc->Partitioning), versionChanged); |
| 536 | + CreateSenders(NChangeExchange::MakePartitionIds(*KeyDesc->Partitioning)); |
635 | 537 | Become(&TThis::StateMain);
|
636 | 538 | }
|
637 | 539 |
|
|
0 commit comments