diff --git a/src/yb/cdc/cdc_service.cc b/src/yb/cdc/cdc_service.cc index 88e94433c963..28309bb9c216 100644 --- a/src/yb/cdc/cdc_service.cc +++ b/src/yb/cdc/cdc_service.cc @@ -1700,6 +1700,8 @@ void CDCServiceImpl::GetChanges( if (is_replication_paused_for_stream && VLOG_IS_ON(1)) { YB_LOG_EVERY_N_SECS(INFO, 300) << "Replication is paused from the producer for stream: " << req->stream_id(); + // Below log line used in tests to detect when streams are paused. + VLOG(3) << "Replication is paused from the producer for stream: " << req->stream_id(); } // Returning success to slow down polling on the consumer side while replication is paused or // early exit for testing purpose. diff --git a/src/yb/cdc/cdc_service.h b/src/yb/cdc/cdc_service.h index e8f259bb129a..01f823e4cbec 100644 --- a/src/yb/cdc/cdc_service.h +++ b/src/yb/cdc/cdc_service.h @@ -104,6 +104,7 @@ using TableIdToStreamIdMap = std::unordered_map>>; using RollBackTabletIdCheckpointMap = std::unordered_map>; + class CDCServiceImpl : public CDCServiceIf { public: CDCServiceImpl( diff --git a/src/yb/cdc/xcluster_types.cc b/src/yb/cdc/xcluster_types.cc index 03cbaa7c0b45..1346d2c356f1 100644 --- a/src/yb/cdc/xcluster_types.cc +++ b/src/yb/cdc/xcluster_types.cc @@ -20,9 +20,7 @@ namespace yb::xcluster { std::string ProducerTabletInfo::ToString() const { - return Format( - "{ replication_group_id: $0 stream_id: $1 tablet_id: $2 }", replication_group_id, stream_id, - tablet_id); + return YB_STRUCT_TO_STRING(replication_group_id, stream_id, tablet_id, table_id); } std::size_t ProducerTabletInfo::Hash::operator()(const ProducerTabletInfo& p) const noexcept { @@ -30,6 +28,7 @@ std::size_t ProducerTabletInfo::Hash::operator()(const ProducerTabletInfo& p) co boost::hash_combine(hash, p.replication_group_id); boost::hash_combine(hash, p.stream_id); boost::hash_combine(hash, p.tablet_id); + boost::hash_combine(hash, p.table_id); return hash; } diff --git a/src/yb/cdc/xcluster_types.h b/src/yb/cdc/xcluster_types.h index 
edd3d6b29e26..cbc9c8f7fc59 100644 --- a/src/yb/cdc/xcluster_types.h +++ b/src/yb/cdc/xcluster_types.h @@ -42,10 +42,11 @@ struct ProducerTabletInfo { // Unique ID on Producer, but not on Consumer. xrepl::StreamId stream_id; TabletId tablet_id; + TableId table_id; bool operator==(const ProducerTabletInfo& other) const { return replication_group_id == other.replication_group_id && stream_id == other.stream_id && - tablet_id == other.tablet_id; + tablet_id == other.tablet_id && table_id == other.table_id; } std::string ToString() const; diff --git a/src/yb/common/xcluster_util.cc b/src/yb/common/xcluster_util.cc index a66b0c66d4be..384132e2b6b6 100644 --- a/src/yb/common/xcluster_util.cc +++ b/src/yb/common/xcluster_util.cc @@ -27,6 +27,10 @@ constexpr char kSequencesDataAliasTableIdMid[] = ".sequences_data_for."; // How many characters a normal TableId (e.g., no suffixes) takes up. constexpr int kTableIdSize = 32; + +constexpr std::string_view kTabletIdColumnSequencePrefix = "sequence."; +constexpr std::string_view kTabletIdColumnSeparator = "."; + } // namespace ReplicationGroupId GetAlterReplicationGroupId(const ReplicationGroupId& replication_group_id) { @@ -75,4 +79,91 @@ Result GetReplicationNamespaceBelongsTo(const TableId& table_id) { return table_id.substr(kTableIdSize + strlen(kSequencesDataAliasTableIdMid)); } +Result SafeTimeTablePK::FromProducerTabletInfo(const ProducerTabletInfo& info) { + return SafeTimeTablePK::FromProducerTabletInfo( + info.replication_group_id, info.tablet_id, info.table_id); +} + +Result SafeTimeTablePK::FromProducerTabletInfo( + const xcluster::ReplicationGroupId& replication_group_id, const std::string& producer_tablet_id, + const std::string& producer_table_id) { + SafeTimeTablePK result; + result.replication_group_id_ = replication_group_id; + result.tablet_id_ = producer_tablet_id; + if (IsSequencesDataAlias(producer_table_id)) { + result.sequences_data_namespace_id_ = + 
VERIFY_RESULT(GetReplicationNamespaceBelongsTo(producer_table_id)); + } + return result; +} + +Result SafeTimeTablePK::FromSafeTimeTableRow( + const xcluster::ReplicationGroupId& replication_group_id_column_value, + const std::string& tablet_id_column_value) { + SafeTimeTablePK result; + result.replication_group_id_ = replication_group_id_column_value; + RSTATUS_DCHECK( + !result.replication_group_id_.empty(), IllegalState, + "Safe time table replication group ID column is empty"); + if (!tablet_id_column_value.starts_with(kTabletIdColumnSequencePrefix)) { + result.tablet_id_ = tablet_id_column_value; + RSTATUS_DCHECK( + !result.tablet_id_.empty(), IllegalState, "Safe time table tablet ID column is empty"); + } else { + size_t namespace_id_start = kTabletIdColumnSequencePrefix.size(); + size_t separator_position = + tablet_id_column_value.find(kTabletIdColumnSeparator, namespace_id_start); + if (separator_position != std::string::npos) { + result.sequences_data_namespace_id_ = tablet_id_column_value.substr( + namespace_id_start, separator_position - namespace_id_start); + result.tablet_id_ = + tablet_id_column_value.substr(separator_position + kTabletIdColumnSeparator.size()); + } + RSTATUS_DCHECK( + !result.tablet_id_.empty() && !result.sequences_data_namespace_id_.empty(), IllegalState, + Format( + "Safe time table tablet ID column starts with '$0' but is not in the form $0$1$2$3: $4", + kTabletIdColumnSequencePrefix, "", kTabletIdColumnSeparator, + "", tablet_id_column_value)); + } + return result; +} + +xcluster::ReplicationGroupId SafeTimeTablePK::replication_group_id_column_value() const { + return replication_group_id_; +} + +TabletId SafeTimeTablePK::tablet_id_column_value() const { + if (sequences_data_namespace_id_.empty()) { + return tablet_id_; + } + return Format( + "$0$1$2$3", kTabletIdColumnSequencePrefix, sequences_data_namespace_id_, + kTabletIdColumnSeparator, tablet_id_); +} + +xcluster::ReplicationGroupId SafeTimeTablePK::replication_group_id() 
const { + return replication_group_id_; +} + +TabletId SafeTimeTablePK::tablet_id() const { return tablet_id_; } + +NamespaceId SafeTimeTablePK::TEST_sequences_data_namespace_id() const { + return sequences_data_namespace_id_; +} + +bool SafeTimeTablePK::operator==(const SafeTimeTablePK& rhs) const { + return replication_group_id_ == rhs.replication_group_id_ && tablet_id_ == rhs.tablet_id_ && + sequences_data_namespace_id_ == rhs.sequences_data_namespace_id_; +} + +bool SafeTimeTablePK::operator<(const SafeTimeTablePK& rhs) const { + return std::tie(replication_group_id_, tablet_id_, sequences_data_namespace_id_) < + std::tie(rhs.replication_group_id_, rhs.tablet_id_, rhs.sequences_data_namespace_id_); +} + +std::string SafeTimeTablePK::ToString() const { + return YB_CLASS_TO_STRING(replication_group_id, tablet_id, sequences_data_namespace_id); +} + } // namespace yb::xcluster diff --git a/src/yb/common/xcluster_util.h b/src/yb/common/xcluster_util.h index 80e27f3f43a0..58d0a249a230 100644 --- a/src/yb/common/xcluster_util.h +++ b/src/yb/common/xcluster_util.h @@ -36,4 +36,45 @@ TableId StripSequencesDataAliasIfPresent(const TableId& table_id); // instead returns the ID's namespace as usual. Result GetReplicationNamespaceBelongsTo(const TableId& table_id); +// The primary key used to access safe time information in the safe time table. +class SafeTimeTablePK { + public: + static Result FromProducerTabletInfo(const ProducerTabletInfo& info); + + static Result FromProducerTabletInfo( + const xcluster::ReplicationGroupId& replication_group_id, + const std::string& producer_tablet_id, const std::string& producer_table_id); + + static Result FromSafeTimeTableRow( + const xcluster::ReplicationGroupId& replication_group_id_column_value, + const std::string& tablet_id_column_value); + + // These are what we store in the safe time table's primary key columns. 
+ xcluster::ReplicationGroupId replication_group_id_column_value() const; + std::string tablet_id_column_value() const; + + // These are the actual underlying values. + xcluster::ReplicationGroupId replication_group_id() const; + TabletId tablet_id() const; + // Only tests should access this information directly. + NamespaceId TEST_sequences_data_namespace_id() const; + + bool operator==(const SafeTimeTablePK& rhs) const; + + bool operator<(const SafeTimeTablePK& rhs) const; + + std::string ToString() const; + + private: + xcluster::ReplicationGroupId replication_group_id_; + TabletId tablet_id_; + // If this is a sequences_data tablet, then this holds its producer tablet + // ID sequence alias's replication namespace. Otherwise holds the empty string. + // + // We add this to the primary key to distinguish sequences_data tablets belonging to different + // streams in a backward-compatible way: we want to still be able to decode old primary keys + // (which will not involve sequences_data tablets). 
+ NamespaceId sequences_data_namespace_id_{""}; +}; + } // namespace yb::xcluster diff --git a/src/yb/integration-tests/xcluster/xcluster_sequences-test.cc b/src/yb/integration-tests/xcluster/xcluster_sequences-test.cc index c4e1383eac32..94f2ff2ed479 100644 --- a/src/yb/integration-tests/xcluster/xcluster_sequences-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster_sequences-test.cc @@ -18,6 +18,7 @@ #include "yb/common/xcluster_util.h" #include "yb/integration-tests/xcluster/xcluster_ddl_replication_test_base.h" #include "yb/util/flags.h" +#include "yb/util/logging_test_util.h" DECLARE_bool(ysql_enable_packed_row); DECLARE_int32(TEST_xcluster_simulated_lag_ms); @@ -255,4 +256,96 @@ TEST_F(XClusterAutomaticModeTest, SequenceReplicationWithTransform) { ASSERT_OK(VerifySequencesSameOnBothSides(namespace1)); } +TEST_F(XClusterAutomaticModeTest, SequenceSafeTime) { + const std::string namespace1{"yugabyte"}; + ASSERT_OK(SetUpClusters(/*use_different_database_oids=*/false, namespace1)); + + ASSERT_OK(SetUpSequences(&producer_cluster_, namespace1)); + ASSERT_OK(SetUpSequences(&consumer_cluster_, namespace1)); + + std::vector namespaces_to_replicate = {namespace1}; + ASSERT_OK(CheckpointReplicationGroupWithoutRequiringNoBootstrapNeeded(namespaces_to_replicate)); + ASSERT_OK(CreateReplicationFromCheckpoint({}, kReplicationGroupId, namespaces_to_replicate)); + ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate)); + ASSERT_OK(VerifySequencesSameOnBothSides(namespace1)); + + ASSERT_OK(BumpSequences(&producer_cluster_, namespace1)); + ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate)); + ASSERT_OK(VerifySequencesSameOnBothSides(namespace1)); + + ASSERT_OK(BumpSequences(&producer_cluster_, namespace1)); + ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate)); + ASSERT_OK(VerifySequencesSameOnBothSides(namespace1)); +} + +TEST_F(XClusterAutomaticModeTest, SequencePausingAndSafeTime) { + const std::string namespace1{"yugabyte"}; + 
ASSERT_OK(SetUpClusters(/*use_different_database_oids=*/false, namespace1)); + + ASSERT_OK(SetUpSequences(&producer_cluster_, namespace1)); + ASSERT_OK(SetUpSequences(&consumer_cluster_, namespace1)); + + std::vector namespaces_to_replicate = {namespace1}; + ASSERT_OK(CheckpointReplicationGroupWithoutRequiringNoBootstrapNeeded(namespaces_to_replicate)); + ASSERT_OK(CreateReplicationFromCheckpoint({}, kReplicationGroupId, namespaces_to_replicate)); + ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate)); + + auto sequences_stream_id = + ASSERT_RESULT(GetCDCStreamID(xcluster::GetSequencesDataAliasForNamespace( + ASSERT_RESULT(GetNamespaceId(producer_client(), namespace_name))))); + ASSERT_OK(PauseResumeXClusterProducerStreams({sequences_stream_id}, /*is_paused=*/true)); + ASSERT_OK( + StringWaiterLogSink("Replication is paused from the producer for stream").WaitFor(300s)); + ASSERT_NOK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate)); + + ASSERT_OK(PauseResumeXClusterProducerStreams({sequences_stream_id}, /*is_paused=*/false)); + ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate)); +} + +TEST_F(XClusterAutomaticModeTest, SequencePausingIsolation) { + const std::string namespace1{"yugabyte"}; + const std::string namespace2{"yugabyte2"}; + ASSERT_OK(SetUpClusters(/*use_different_database_oids=*/false, namespace1, namespace2)); + + ASSERT_OK(RunOnBothClusters([&](Cluster* cluster) -> Status { + RETURN_NOT_OK(SetUpSequences(cluster, namespace1)); + return SetUpSequences(cluster, namespace2); + })); + + std::vector namespaces_to_replicate = {namespace1, namespace2}; + ASSERT_OK(CheckpointReplicationGroupWithoutRequiringNoBootstrapNeeded(namespaces_to_replicate)); + ASSERT_OK(CreateReplicationFromCheckpoint({}, kReplicationGroupId, namespaces_to_replicate)); + ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate)); + + auto pause_one_namespace_temporarily = [&](NamespaceName namespace_to_pause, + NamespaceName other_namespace) { 
+ auto namespace_to_pause_id = + ASSERT_RESULT(GetNamespaceId(producer_client(), namespace_to_pause)); + LOG(INFO) << "***** Pausing namespace: " << namespace_to_pause + << " ID: " << namespace_to_pause_id; + auto sequences_stream_id = ASSERT_RESULT( + GetCDCStreamID(xcluster::GetSequencesDataAliasForNamespace(namespace_to_pause_id))); + ASSERT_OK(PauseResumeXClusterProducerStreams({sequences_stream_id}, /*is_paused=*/true)); + ASSERT_OK( + StringWaiterLogSink( + "Replication is paused from the producer for stream: "s + AsString(sequences_stream_id)) + .WaitFor(300s)); + ASSERT_OK(BumpSequences(&producer_cluster_, namespace_to_pause)); + + std::vector paused_namespaces = {namespace_to_pause}; + std::vector unpaused_namespaces = {other_namespace}; + ASSERT_NOK(WaitForSafeTimeToAdvanceToNow(paused_namespaces)); + ASSERT_OK(WaitForSafeTimeToAdvanceToNow(unpaused_namespaces)); + + LOG(INFO) << "***** Unpausing namespace: " << namespace_to_pause + << " ID: " << namespace_to_pause_id; + ASSERT_OK(PauseResumeXClusterProducerStreams({sequences_stream_id}, /*is_paused=*/false)); + ASSERT_OK(WaitForSafeTimeToAdvanceToNow(paused_namespaces)); + ASSERT_OK(WaitForSafeTimeToAdvanceToNow(unpaused_namespaces)); + }; + + pause_one_namespace_temporarily(namespace1, namespace2); + pause_one_namespace_temporarily(namespace2, namespace1); +} + } // namespace yb diff --git a/src/yb/master/xcluster/xcluster_safe_time_service-test.cc b/src/yb/master/xcluster/xcluster_safe_time_service-test.cc index bfec9a449a50..0fab1d317407 100644 --- a/src/yb/master/xcluster/xcluster_safe_time_service-test.cc +++ b/src/yb/master/xcluster/xcluster_safe_time_service-test.cc @@ -12,16 +12,25 @@ // #include + #include "yb/common/hybrid_time.h" -#include "yb/util/test_util.h" #include "yb/master/xcluster/xcluster_safe_time_service.h" +#include "yb/util/test_util.h" using std::string; +namespace yb::xcluster { +inline std::ostream& operator<<(std::ostream& os, const SafeTimeTablePK& object) { + return os << 
object.ToString(); +} +} // namespace yb::xcluster + namespace yb { namespace master { using OK = Status::OK; +using SafeTimeTablePK = xcluster::SafeTimeTablePK; + class XClusterSafeTimeServiceMocked : public XClusterSafeTimeService { public: @@ -33,7 +42,7 @@ class XClusterSafeTimeServiceMocked : public XClusterSafeTimeService { ~XClusterSafeTimeServiceMocked() {} - Result> GetSafeTimeFromTable() override + Result> GetSafeTimeFromTable() override REQUIRES(mutex_) { create_table_if_not_found_ = VERIFY_RESULT(CreateTableRequired()); @@ -59,7 +68,7 @@ class XClusterSafeTimeServiceMocked : public XClusterSafeTimeService { return OK(); } - Status CleanupEntriesFromTable(const std::vector& entries_to_delete) override + Status CleanupEntriesFromTable(const std::vector& entries_to_delete) override REQUIRES(mutex_) { entries_to_delete_ = entries_to_delete; for (auto& entry : entries_to_delete_) { @@ -70,40 +79,38 @@ class XClusterSafeTimeServiceMocked : public XClusterSafeTimeService { Result GetLeaderSafeTimeFromCatalogManager() override { return leader_safe_time_; } - void SetDdlQueueTablets(const std::vector& tablet_infos) { + void SetDdlQueueTablets(const std::vector& tablet_infos) { std::lock_guard lock(mutex_); ddl_queue_tablet_ids_.clear(); for (const auto& tablet_info : tablet_infos) { - ddl_queue_tablet_ids_.insert(tablet_info.tablet_id); + ddl_queue_tablet_ids_.insert(tablet_info.tablet_id()); } } - std::map table_entries_; - std::map consumer_registry_; + std::map table_entries_; + std::map consumer_registry_; XClusterNamespaceToSafeTimeMap safe_time_map_; - std::vector entries_to_delete_; + std::vector entries_to_delete_; HybridTime leader_safe_time_; bool create_table_if_not_found_; }; class XClusterSafeTimeServiceTest : public YBTest { public: - using ProducerTabletInfo = XClusterSafeTimeService::ProducerTabletInfo; - const xcluster::ReplicationGroupId replication_group_id = xcluster::ReplicationGroupId("c1"); const NamespaceId db1 = "db1"; const NamespaceId 
db2 = "db2"; - const ProducerTabletInfo t1 = {replication_group_id, "t1"}; - const ProducerTabletInfo t2 = {replication_group_id, "t2"}; - const ProducerTabletInfo t3 = {replication_group_id, "t3"}; - const ProducerTabletInfo t4 = {replication_group_id, "t4"}; + const SafeTimeTablePK t1 = ConstructSafeTimeTablePK(replication_group_id, "t1"); + const SafeTimeTablePK t2 = ConstructSafeTimeTablePK(replication_group_id, "t2"); + const SafeTimeTablePK t3 = ConstructSafeTimeTablePK(replication_group_id, "t3"); + const SafeTimeTablePK t4 = ConstructSafeTimeTablePK(replication_group_id, "t4"); const HybridTime ht1 = HybridTime(HybridTime::kInitial.ToUint64() + 1); const HybridTime ht2 = HybridTime(ht1.ToUint64() + 1); const HybridTime ht3 = HybridTime(ht2.ToUint64() + 1); const HybridTime ht_invalid = HybridTime::kInvalid; int64_t dummy_leader_term = 1; - std::map default_consumer_registry; + std::map default_consumer_registry; XClusterSafeTimeServiceTest() { default_consumer_registry[t1] = db1; @@ -111,6 +118,13 @@ class XClusterSafeTimeServiceTest : public YBTest { default_consumer_registry[t3] = db2; } + SafeTimeTablePK ConstructSafeTimeTablePK( + const xcluster::ReplicationGroupId& replication_group_id, + const std::string& producer_tablet_id) { + return CHECK_RESULT(SafeTimeTablePK::FromProducerTabletInfo( + replication_group_id, producer_tablet_id, "not_a_sequences_data_alias")); + } + Result ComputeSafeTime(XClusterSafeTimeServiceMocked& safe_time_service) { return safe_time_service.ComputeSafeTime(dummy_leader_term); } @@ -256,7 +270,7 @@ TEST_F(XClusterSafeTimeServiceTest, ComputeSafeTime) { XClusterSafeTimeServiceMocked safe_time_service; safe_time_service.consumer_registry_ = default_consumer_registry; - const ProducerTabletInfo t5 = {replication_group_id, "t5"}; + const SafeTimeTablePK t5 = ConstructSafeTimeTablePK(replication_group_id, "t5"); safe_time_service.table_entries_[t1] = ht2; safe_time_service.table_entries_[t2] = ht2; 
safe_time_service.table_entries_[t3] = ht2; @@ -274,7 +288,7 @@ TEST_F(XClusterSafeTimeServiceTest, ComputeSafeTimeWithFilters) { safe_time_service.consumer_registry_ = default_consumer_registry; safe_time_service.leader_safe_time_ = ht2; - const ProducerTabletInfo t5 = {replication_group_id, "t5"}; + const SafeTimeTablePK t5 = ConstructSafeTimeTablePK(replication_group_id, "t5"); safe_time_service.consumer_registry_[t4] = db1; safe_time_service.consumer_registry_[t5] = db1; @@ -392,5 +406,80 @@ TEST_F(XClusterSafeTimeServiceTest, ComputeSafeTimeWithFiltersSingleTablet) { ASSERT_EQ(db1_none, ht1); ASSERT_EQ(db1_ddlqueue, ht3); } + +using SafeTimeTablePKTest = YBTest; + +TEST_F(SafeTimeTablePKTest, Creation) { + xcluster::ReplicationGroupId group_id{"replication_group_id"}; + TabletId tablet_id{"tablet_id"}; + + // When the producer table ID is not a sequence alias + { + TableId table_id{"producer_table_id"}; + SafeTimeTablePK input = ASSERT_RESULT( + xcluster::SafeTimeTablePK::FromProducerTabletInfo(group_id, tablet_id, table_id)); + EXPECT_EQ(input.replication_group_id(), group_id); + EXPECT_EQ(input.tablet_id(), tablet_id); + EXPECT_EQ(input.TEST_sequences_data_namespace_id(), ""); + } + + // When the producer table ID is a sequence alias + { + NamespaceId namespace_id = "00004000000030008000000000000000"; + TableId table_id = xcluster::GetSequencesDataAliasForNamespace(namespace_id); + SafeTimeTablePK input = ASSERT_RESULT( + xcluster::SafeTimeTablePK::FromProducerTabletInfo(group_id, tablet_id, table_id)); + EXPECT_EQ(input.replication_group_id(), group_id); + EXPECT_EQ(input.tablet_id(), tablet_id); + EXPECT_EQ(input.TEST_sequences_data_namespace_id(), namespace_id); + } +} + +TEST_F(SafeTimeTablePKTest, Encoding) { + xcluster::ReplicationGroupId group_id{"replication_group_id"}; + TabletId tablet_id{"tablet_id"}; + TableId normal_table_id{"producer_table_id"}; + NamespaceId namespace_id{"00004000000030008000000000000000"}; + TableId sequence_table_id = 
xcluster::GetSequencesDataAliasForNamespace(namespace_id); + + // When the producer table ID is not a sequence alias + SafeTimeTablePK input1 = ASSERT_RESULT( + xcluster::SafeTimeTablePK::FromProducerTabletInfo(group_id, tablet_id, normal_table_id)); + SafeTimeTablePK output1 = ASSERT_RESULT(xcluster::SafeTimeTablePK::FromSafeTimeTableRow( + input1.replication_group_id_column_value(), input1.tablet_id_column_value())); + EXPECT_EQ(input1, output1); + // Check backwards compatibility. + EXPECT_EQ(input1.tablet_id_column_value(), tablet_id); + + // When the producer table ID is a sequence alias + SafeTimeTablePK input2 = ASSERT_RESULT( + xcluster::SafeTimeTablePK::FromProducerTabletInfo(group_id, tablet_id, sequence_table_id)); + SafeTimeTablePK output2 = ASSERT_RESULT(xcluster::SafeTimeTablePK::FromSafeTimeTableRow( + input2.replication_group_id_column_value(), input2.tablet_id_column_value())); + EXPECT_EQ(input2, output2); + // Check encoded tablet_id column value is pleasing to humans. + EXPECT_EQ(input2.tablet_id_column_value(), Format("sequence.$0.$1", namespace_id, tablet_id)); + + // Verify we can distinguish the two cases. 
+ EXPECT_NE(output1, output2); +} + +TEST_F(SafeTimeTablePKTest, BadEncoding) { + xcluster::ReplicationGroupId group_id{"replication_group_id"}; + + std::vector bad_tablet_id_column_values{ + "", "sequence.bad", "sequence.", "sequence..", "sequence.foo.", "sequence..foo"}; + for (const auto& bad_value : bad_tablet_id_column_values) { + SCOPED_TRACE(std::string("using bad value ") + bad_value); +#ifndef NDEBUG + EXPECT_DEATH( + { EXPECT_NOK(xcluster::SafeTimeTablePK::FromSafeTimeTableRow(group_id, bad_value)); }, + "Safe time table tablet ID column"); +#else + EXPECT_NOK(xcluster::SafeTimeTablePK::FromSafeTimeTableRow(group_id, bad_value)); +#endif + } +} + } // namespace master } // namespace yb diff --git a/src/yb/master/xcluster/xcluster_safe_time_service.cc b/src/yb/master/xcluster/xcluster_safe_time_service.cc index 20fa7d619417..e56c93e8e1fc 100644 --- a/src/yb/master/xcluster/xcluster_safe_time_service.cc +++ b/src/yb/master/xcluster/xcluster_safe_time_service.cc @@ -381,7 +381,7 @@ Result XClusterSafeTimeService::ComputeSafeTime( std::unordered_map namespace_safe_time_map; std::unordered_map namespace_safe_time_map_without_ddl_queue; - std::vector table_entries_to_delete; + std::vector table_entries_to_delete; // Track tablets that are missing from the safe time, or slow. This is for reporting only. std::unordered_map> tablets_missing_safe_time_map; @@ -399,7 +399,7 @@ Result XClusterSafeTimeService::ComputeSafeTime( if (should_log_outlier_tablets) { const auto& tablet_safe_time = tablet_to_safe_time_map[tablet_info]; if (tablet_safe_time.is_special()) { - tablets_missing_safe_time_map[namespace_id].emplace_back(tablet_info.tablet_id); + tablets_missing_safe_time_map[namespace_id].emplace_back(tablet_info.tablet_id()); } else { namespace_max_safe_time[namespace_id].MakeAtLeast(tablet_safe_time); } @@ -425,7 +425,7 @@ Result XClusterSafeTimeService::ComputeSafeTime( // Ignore values like Invalid, Min, Max and only consider a valid clock time. 
if (tablet_safe_time.is_special()) { namespace_safe_time_map[*namespace_id] = HybridTime::kInvalid; - if (!ddl_queue_tablet_ids_.contains(tablet_info.tablet_id)) { + if (!ddl_queue_tablet_ids_.contains(tablet_info.tablet_id())) { namespace_safe_time_map_without_ddl_queue[*namespace_id] = HybridTime::kInvalid; } continue; @@ -435,7 +435,7 @@ Result XClusterSafeTimeService::ComputeSafeTime( if (tablet_safe_time.AddDelta(1s * FLAGS_xcluster_safe_time_slow_tablet_delta_secs) < namespace_max_safe_time[*namespace_id]) { namespace_min_safe_time[*namespace_id].MakeAtMost(tablet_safe_time); - slow_tablets_map[*namespace_id].emplace_back(tablet_info.tablet_id); + slow_tablets_map[*namespace_id].emplace_back(tablet_info.tablet_id()); } } @@ -446,7 +446,7 @@ Result XClusterSafeTimeService::ComputeSafeTime( namespace_safe_time.MakeAtMost(tablet_safe_time); // Also update namespace_safe_time_map_without_ddl_queue at same time. - if (!ddl_queue_tablet_ids_.contains(tablet_info.tablet_id)) { + if (!ddl_queue_tablet_ids_.contains(tablet_info.tablet_id())) { namespace_safe_time_map_without_ddl_queue[*namespace_id].MakeAtMost(tablet_safe_time); } } @@ -536,18 +536,20 @@ XClusterSafeTimeService::GetSafeTimeFromTable() { }; for (const auto& row : client::TableRange(*safe_time_table_, options)) { - auto replication_group_id = + auto replication_group_id_column_value = xcluster::ReplicationGroupId(row.column(kXCReplicationGroupIdIdx).string_value()); - auto tablet_id = row.column(kXCProducerTabletIdIdx).string_value(); + auto tablet_id_column_value = row.column(kXCProducerTabletIdIdx).string_value(); + auto key = VERIFY_RESULT(xcluster::SafeTimeTablePK::FromSafeTimeTableRow( + replication_group_id_column_value, tablet_id_column_value)); auto safe_time = row.column(kXCSafeTimeIdx).int64_value(); HybridTime safe_ht; RETURN_NOT_OK_PREPEND( safe_ht.FromUint64(static_cast(safe_time)), Format( - "Invalid safe time set in table $0 replication_group_id:$1, tablet_id:$2", - 
kSafeTimeTableName.table_name(), replication_group_id, tablet_id)); + "Invalid safe time set in table $0 tablet key $1", kSafeTimeTableName.table_name(), + key)); - tablet_safe_time[{replication_group_id, tablet_id}] = safe_ht; + tablet_safe_time[key] = safe_ht; } RETURN_NOT_OK_PREPEND( @@ -603,14 +605,20 @@ Status XClusterSafeTimeService::RefreshProducerTabletToNamespaceMap() { for (const auto& [_, stream_entry] : producer_entry.stream_map()) { const auto& consumer_table_id = stream_entry.consumer_table_id(); - auto stripped_consumer_table_id = - xcluster::StripSequencesDataAliasIfPresent(consumer_table_id); - auto consumer_namespace_id = - VERIFY_RESULT(catalog_manager_->GetTableNamespaceId(stripped_consumer_table_id)); + const auto& producer_table_id = stream_entry.producer_table_id(); + NamespaceId consumer_namespace_id; + if (xcluster::IsSequencesDataAlias(consumer_table_id)) { + consumer_namespace_id = + VERIFY_RESULT(xcluster::GetReplicationNamespaceBelongsTo(consumer_table_id)); + } else { + consumer_namespace_id = + VERIFY_RESULT(catalog_manager_->GetTableNamespaceId(consumer_table_id)); + } for (const auto& [_, producer_tablets] : stream_entry.consumer_producer_tablet_map()) { for (const auto& tablet_id : producer_tablets.tablets()) { - producer_tablet_namespace_map_[{replication_group_id, tablet_id}] = - consumer_namespace_id; + const auto key = VERIFY_RESULT(xcluster::SafeTimeTablePK::FromProducerTabletInfo( + replication_group_id, tablet_id, producer_table_id)); + producer_tablet_namespace_map_[key] = consumer_namespace_id; if (stream_entry.is_ddl_queue_table()) { ddl_queue_tablet_ids_.insert(tablet_id); } @@ -654,7 +662,7 @@ Status XClusterSafeTimeService::SetXClusterSafeTime( } Status XClusterSafeTimeService::CleanupEntriesFromTable( - const std::vector& entries_to_delete) { + const std::vector& entries_to_delete) { if (entries_to_delete.empty()) { return OK(); } @@ -673,8 +681,8 @@ Status XClusterSafeTimeService::CleanupEntriesFromTable( for (auto& 
tablet_info : entries_to_delete) { const auto op = safe_time_table_->NewWriteOp(QLWriteRequestPB::QL_STMT_DELETE); auto* const req = op->mutable_request(); - QLAddStringHashValue(req, tablet_info.replication_group_id.ToString()); - QLAddStringHashValue(req, tablet_info.tablet_id); + QLAddStringHashValue(req, tablet_info.replication_group_id_column_value().ToString()); + QLAddStringHashValue(req, tablet_info.tablet_id_column_value()); VLOG_WITH_FUNC(1) << "Cleaning up tablet from " << kSafeTimeTableName.table_name() << ". " << tablet_info.ToString(); diff --git a/src/yb/master/xcluster/xcluster_safe_time_service.h b/src/yb/master/xcluster/xcluster_safe_time_service.h index 068f5eb47918..aabd62f5709c 100644 --- a/src/yb/master/xcluster/xcluster_safe_time_service.h +++ b/src/yb/master/xcluster/xcluster_safe_time_service.h @@ -35,12 +35,13 @@ #include "yb/client/client_fwd.h" #include "yb/client/yb_table_name.h" #include "yb/common/hybrid_time.h" +#include "yb/common/xcluster_util.h" +#include "yb/gutil/thread_annotations.h" #include "yb/master/catalog_manager.h" #include "yb/master/xcluster/xcluster_consumer_metrics.h" #include "yb/master/xcluster/xcluster_manager_if.h" #include "yb/rpc/scheduler.h" #include "yb/util/threadpool.h" -#include "yb/gutil/thread_annotations.h" namespace yb { namespace master { @@ -83,27 +84,9 @@ class XClusterSafeTimeService { friend class XClusterSafeTimeServiceMocked; friend class XClusterSafeTimeServiceTest; - struct ProducerTabletInfo { - xcluster::ReplicationGroupId replication_group_id; - TabletId tablet_id; - - bool operator==(const ProducerTabletInfo& rhs) const { - return replication_group_id == rhs.replication_group_id && tablet_id == rhs.tablet_id; - } - - bool operator<(const ProducerTabletInfo& rhs) const { - if (replication_group_id == rhs.replication_group_id) { - return tablet_id < rhs.tablet_id; - } - return replication_group_id < rhs.replication_group_id; - } - - std::string ToString() const { return 
YB_STRUCT_TO_STRING(replication_group_id, tablet_id); } - }; - void ProcessTaskPeriodically() EXCLUDES(task_enqueue_lock_); - typedef std::map ProducerTabletToSafeTimeMap; + typedef std::map ProducerTabletToSafeTimeMap; virtual Result GetSafeTimeFromTable() REQUIRES(mutex_); @@ -121,8 +104,8 @@ class XClusterSafeTimeService { virtual Status SetXClusterSafeTime( const int64_t leader_term, const XClusterNamespaceToSafeTimeMap& new_safe_time_map); - virtual Status CleanupEntriesFromTable(const std::vector& entries_to_delete) - REQUIRES(mutex_); + virtual Status CleanupEntriesFromTable( + const std::vector& entries_to_delete) REQUIRES(mutex_); Result GetLeaderTermFromCatalogManager(); @@ -156,7 +139,8 @@ class XClusterSafeTimeService { int64_t leader_term_ GUARDED_BY(mutex_); int32_t cluster_config_version_ GUARDED_BY(mutex_); - std::map producer_tablet_namespace_map_ GUARDED_BY(mutex_); + std::map producer_tablet_namespace_map_ + GUARDED_BY(mutex_); // List of tablet ids for ddl_queue tables, used to find safe times without this stream. std::unordered_set ddl_queue_tablet_ids_ GUARDED_BY(mutex_); diff --git a/src/yb/tserver/xcluster_consumer.cc b/src/yb/tserver/xcluster_consumer.cc index 76d6cabcd4e0..561258c1e2c6 100644 --- a/src/yb/tserver/xcluster_consumer.cc +++ b/src/yb/tserver/xcluster_consumer.cc @@ -11,7 +11,12 @@ // under the License. 
// +#include "yb/tserver/xcluster_consumer.h" + +#include "yb/cdc/cdc_consumer.pb.h" #include "yb/cdc/xcluster_types.h" + +#include "yb/client/client.h" #include "yb/client/error.h" #include "yb/client/session.h" #include "yb/client/table_handle.h" @@ -21,25 +26,21 @@ #include "yb/common/pg_types.h" #include "yb/common/wire_protocol.h" +#include "yb/common/xcluster_util.h" + +#include "yb/gutil/map-util.h" #include "yb/master/master_defaults.h" #include "yb/master/master_heartbeat.pb.h" +#include "yb/rocksdb/rate_limiter.h" #include "yb/rpc/rpc.h" -#include "yb/tserver/xcluster_consumer.h" + #include "yb/tserver/tserver_xcluster_context_if.h" #include "yb/tserver/xcluster_consumer_auto_flags_info.h" #include "yb/tserver/xcluster_output_client.h" #include "yb/tserver/xcluster_poller.h" -#include "yb/cdc/cdc_consumer.pb.h" - -#include "yb/client/client.h" - -#include "yb/rocksdb/rate_limiter.h" - -#include "yb/gutil/map-util.h" - #include "yb/util/callsite_profiling.h" #include "yb/util/flags.h" #include "yb/util/flag_validators.h" @@ -400,7 +401,9 @@ void XClusterConsumer::UpdateReplicationGroupInMemState( stream_entry_pb.consumer_producer_tablet_map()) { for (const auto& producer_tablet_id : producer_tablet_list.tablets()) { auto xCluster_tablet_info = xcluster::XClusterTabletInfo{ - .producer_tablet_info = {replication_group_id, stream_id, producer_tablet_id}, + .producer_tablet_info = + {replication_group_id, stream_id, producer_tablet_id, + stream_entry_pb.producer_table_id()}, .consumer_tablet_info = {consumer_tablet_id, stream_entry_pb.consumer_table_id()}, .disable_stream = producer_entry_pb.disable_stream(), .automatic_ddl_mode = producer_entry_pb.automatic_ddl_mode()}; @@ -493,17 +496,36 @@ void XClusterConsumer::TriggerPollForNewTablets() { bool use_local_tserver = streams_with_local_tserver_optimization_.contains(producer_tablet_info.stream_id); - auto namespace_info_res = get_namespace_info_func_(consumer_tablet_info.tablet_id); - if 
(!namespace_info_res.ok()) { - LOG(WARNING) << "Could not get namespace info for table " - << consumer_tablet_info.tablet_id << ": " - << namespace_info_res.status().ToString(); - continue; // Don't finish creation. Try again on the next RunThread(). + NamespaceId consumer_namespace_id; + NamespaceName consumer_namespace_name; + auto consumer_table_id = consumer_tablet_info.table_id; + if (xcluster::IsSequencesDataAlias(consumer_table_id)) { + auto namespace_id_result = xcluster::GetReplicationNamespaceBelongsTo(consumer_table_id); + if (namespace_id_result) { + consumer_namespace_id = *namespace_id_result; + // We don't need consumer_namespace_name for sequence streams so don't bother computing + // it. + consumer_namespace_name = ""; + } else { + LOG(ERROR) << "Malformed sequences_data alias table ID: " << consumer_table_id + << "; skipping creation of a poller for a tablet belonging to that table: " + << consumer_tablet_info.tablet_id; + continue; + } + } else { + auto namespace_info_result = get_namespace_info_func_(consumer_tablet_info.tablet_id); + if (!namespace_info_result.ok()) { + LOG(WARNING) << "Could not get namespace info for tablet " + << consumer_tablet_info.tablet_id << ": " + << namespace_info_result.status().ToString(); + continue; // Don't finish creation. Try again on the next RunThread(). 
+ } + consumer_namespace_id = namespace_info_result->first; + consumer_namespace_name = namespace_info_result->second; } - const auto& [namespace_id, namespace_name] = *namespace_info_res; auto xcluster_poller = std::make_shared( - producer_tablet_info, consumer_tablet_info, namespace_id, + producer_tablet_info, consumer_tablet_info, consumer_namespace_id, auto_flags_version_handler_->GetAutoFlagsCompatibleVersion( producer_tablet_info.replication_group_id), thread_pool_.get(), rpcs_.get(), local_client_, remote_clients_[replication_group_id], @@ -511,7 +533,7 @@ void XClusterConsumer::TriggerPollForNewTablets() { if (ddl_queue_streams_.contains(producer_tablet_info.stream_id)) { xcluster_poller->InitDDLQueuePoller( - use_local_tserver, rate_limiter_.get(), namespace_name, xcluster_context_, + use_local_tserver, rate_limiter_.get(), consumer_namespace_name, xcluster_context_, connect_to_pg_func_); } else { xcluster_poller->Init(use_local_tserver, rate_limiter_.get()); @@ -705,14 +727,17 @@ Status XClusterConsumer::PublishXClusterSafeTime() { auto session = local_client_.NewSession(local_client_.default_rpc_timeout()); for (auto& [producer_info, safe_time] : safe_time_map) { + const auto key = + VERIFY_RESULT(xcluster::SafeTimeTablePK::FromProducerTabletInfo(producer_info)); const auto op = safe_time_table_->NewWriteOp(QLWriteRequestPB::QL_STMT_UPDATE); auto* const req = op->mutable_request(); - QLAddStringHashValue(req, producer_info.replication_group_id.ToString()); - QLAddStringHashValue(req, producer_info.tablet_id); + QLAddStringHashValue(req, key.replication_group_id_column_value().ToString()); + QLAddStringHashValue(req, key.tablet_id_column_value()); safe_time_table_->AddInt64ColumnValue(req, master::kXCSafeTime, safe_time.ToUint64()); VLOG_WITH_FUNC(2) << "UniverseID: " << producer_info.replication_group_id << ", TabletId: " << producer_info.tablet_id + << ", TableId: " << producer_info.table_id << ", SafeTime: " << safe_time.ToDebugString(); 
session->Apply(std::move(op)); }