Skip to content

Commit

Permalink
[#24033] xCluster: correctly take sequences_data streams into account…
Browse files Browse the repository at this point in the history
… for xCluster safe time

Summary:
In this diff, I expand the primary key for the safe time table to distinguish between sequence_data tablets belonging to different streams/replication namespaces.  This is necessary to prevent confusion between different streams with the same replication group and tablet ID.  (Recall that we are already using sequence aliases for table IDs to distinguish between the tables belonging to different streams; unfortunately, because there are no aliases for tablet IDs, more work is required.)

To preserve backward compatibility as much as possible, I encode the new information into the existing tablet_id column of the table.  In particular,  the new value for this column is:
  * (non-sequences_data tablet) tablet_id
  * (sequences_data tablet) tablet_id,replication_namespace
This means that we can correctly read rows from existing semi-automatic xCluster replication ongoing during an upgrade and that we do not produce data using semi automatic mode with the new code that cannot be read by the old code.

I assume that we will have an auto flag guarding the use of automatic mode replication that will prevent us writing rows containing a "," then rolling back to the previous code.

I have created an abstraction, ProducerTabletKey, to encode how this works;
it is currently in code/yugabyte-db/src/yb/common/xcluster_util.h
because it needs to be used by both
src/yb/master/xcluster/xcluster_safe_time_service.cc and
src/yb/master/xcluster/xcluster_safe_time_service.cc.  Likely we will
move it elsewhere after refactoring later.
Jira: DB-12923

Test Plan:
I have added several tests to make sure that xCluster time is being computed correctly; in particular, slowness in the sequences_data tablet for one namespace should not affect the xCluster safe time for a different namespace.  This make sure the xCluster safe time for the two different name spaces are being kept separate and managed correctly.

```
ybd --cxx-test xcluster_sequences-test --test-args '' >& /tmp/generic.mdl.log
```
Added several unit tests for the new SafeTimeTablePK type:
```
ybd --cxx-test xcluster_safe_time_service-test --test-args '' >& /tmp/generic.mdl.log
```

Reviewers: hsunder, xCluster

Reviewed By: hsunder

Subscribers: ycdcxcluster, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D39715
  • Loading branch information
mdbridge committed Nov 14, 2024
1 parent 00f9146 commit ad8384e
Show file tree
Hide file tree
Showing 11 changed files with 418 additions and 84 deletions.
2 changes: 2 additions & 0 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1700,6 +1700,8 @@ void CDCServiceImpl::GetChanges(
if (is_replication_paused_for_stream && VLOG_IS_ON(1)) {
YB_LOG_EVERY_N_SECS(INFO, 300)
<< "Replication is paused from the producer for stream: " << req->stream_id();
// Below log line used in tests to detect when streams are paused.
VLOG(3) << "Replication is paused from the producer for stream: " << req->stream_id();
}
// Returning success to slow down polling on the consumer side while replication is paused or
// early exit for testing purpose.
Expand Down
1 change: 1 addition & 0 deletions src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ using TableIdToStreamIdMap =
std::unordered_map<TableId, std::pair<TabletId, std::unordered_set<xrepl::StreamId>>>;
using RollBackTabletIdCheckpointMap =
std::unordered_map<const std::string*, std::pair<int64_t, OpId>>;

class CDCServiceImpl : public CDCServiceIf {
public:
CDCServiceImpl(
Expand Down
5 changes: 2 additions & 3 deletions src/yb/cdc/xcluster_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
namespace yb::xcluster {

std::string ProducerTabletInfo::ToString() const {
return Format(
"{ replication_group_id: $0 stream_id: $1 tablet_id: $2 }", replication_group_id, stream_id,
tablet_id);
return YB_STRUCT_TO_STRING(replication_group_id, stream_id, tablet_id, table_id);
}

std::size_t ProducerTabletInfo::Hash::operator()(const ProducerTabletInfo& p) const noexcept {
std::size_t hash = 0;
boost::hash_combine(hash, p.replication_group_id);
boost::hash_combine(hash, p.stream_id);
boost::hash_combine(hash, p.tablet_id);
boost::hash_combine(hash, p.table_id);

return hash;
}
Expand Down
3 changes: 2 additions & 1 deletion src/yb/cdc/xcluster_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ struct ProducerTabletInfo {
// Unique ID on Producer, but not on Consumer.
xrepl::StreamId stream_id;
TabletId tablet_id;
TableId table_id;

bool operator==(const ProducerTabletInfo& other) const {
return replication_group_id == other.replication_group_id && stream_id == other.stream_id &&
tablet_id == other.tablet_id;
tablet_id == other.tablet_id && table_id == other.table_id;
}

std::string ToString() const;
Expand Down
91 changes: 91 additions & 0 deletions src/yb/common/xcluster_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ constexpr char kSequencesDataAliasTableIdMid[] = ".sequences_data_for.";

// How many characters a normal TableId (e.g., no suffixes) takes up.
constexpr int kTableIdSize = 32;

constexpr std::string_view kTabletIdColumnSequencePrefix = "sequence.";
constexpr std::string_view kTabletIdColumnSeparator = ".";

} // namespace

ReplicationGroupId GetAlterReplicationGroupId(const ReplicationGroupId& replication_group_id) {
Expand Down Expand Up @@ -75,4 +79,91 @@ Result<NamespaceId> GetReplicationNamespaceBelongsTo(const TableId& table_id) {
return table_id.substr(kTableIdSize + strlen(kSequencesDataAliasTableIdMid));
}

Result<SafeTimeTablePK> SafeTimeTablePK::FromProducerTabletInfo(const ProducerTabletInfo& info) {
return SafeTimeTablePK::FromProducerTabletInfo(
info.replication_group_id, info.tablet_id, info.table_id);
}

Result<SafeTimeTablePK> SafeTimeTablePK::FromProducerTabletInfo(
const xcluster::ReplicationGroupId& replication_group_id, const std::string& producer_tablet_id,
const std::string& producer_table_id) {
SafeTimeTablePK result;
result.replication_group_id_ = replication_group_id;
result.tablet_id_ = producer_tablet_id;
if (IsSequencesDataAlias(producer_table_id)) {
result.sequences_data_namespace_id_ =
VERIFY_RESULT(GetReplicationNamespaceBelongsTo(producer_table_id));
}
return result;
}

Result<SafeTimeTablePK> SafeTimeTablePK::FromSafeTimeTableRow(
const xcluster::ReplicationGroupId& replication_group_id_column_value,
const std::string& tablet_id_column_value) {
SafeTimeTablePK result;
result.replication_group_id_ = replication_group_id_column_value;
RSTATUS_DCHECK(
!result.replication_group_id_.empty(), IllegalState,
"Safe time table replication group ID column is empty");
if (!tablet_id_column_value.starts_with(kTabletIdColumnSequencePrefix)) {
result.tablet_id_ = tablet_id_column_value;
RSTATUS_DCHECK(
!result.tablet_id_.empty(), IllegalState, "Safe time table tablet ID column is empty");
} else {
size_t namespace_id_start = kTabletIdColumnSequencePrefix.size();
size_t separator_position =
tablet_id_column_value.find(kTabletIdColumnSeparator, namespace_id_start);
if (separator_position != std::string::npos) {
result.sequences_data_namespace_id_ = tablet_id_column_value.substr(
namespace_id_start, separator_position - namespace_id_start);
result.tablet_id_ =
tablet_id_column_value.substr(separator_position + kTabletIdColumnSeparator.size());
}
RSTATUS_DCHECK(
!result.tablet_id_.empty() && !result.sequences_data_namespace_id_.empty(), IllegalState,
Format(
"Safe time table tablet ID column starts with '$0' but is not in the form $0$1$2$3: $4",
kTabletIdColumnSequencePrefix, "<namespace_id>", kTabletIdColumnSeparator,
"<tablet_id>", tablet_id_column_value));
}
return result;
}

xcluster::ReplicationGroupId SafeTimeTablePK::replication_group_id_column_value() const {
return replication_group_id_;
}

TabletId SafeTimeTablePK::tablet_id_column_value() const {
if (sequences_data_namespace_id_.empty()) {
return tablet_id_;
}
return Format(
"$0$1$2$3", kTabletIdColumnSequencePrefix, sequences_data_namespace_id_,
kTabletIdColumnSeparator, tablet_id_);
}

xcluster::ReplicationGroupId SafeTimeTablePK::replication_group_id() const {
return replication_group_id_;
}

TabletId SafeTimeTablePK::tablet_id() const { return tablet_id_; }

NamespaceId SafeTimeTablePK::TEST_sequences_data_namespace_id() const {
return sequences_data_namespace_id_;
}

bool SafeTimeTablePK::operator==(const SafeTimeTablePK& rhs) const {
return replication_group_id_ == rhs.replication_group_id_ && tablet_id_ == rhs.tablet_id_ &&
sequences_data_namespace_id_ == rhs.sequences_data_namespace_id_;
}

bool SafeTimeTablePK::operator<(const SafeTimeTablePK& rhs) const {
return std::tie(replication_group_id_, tablet_id_, sequences_data_namespace_id_) <
std::tie(rhs.replication_group_id_, rhs.tablet_id_, rhs.sequences_data_namespace_id_);
}

std::string SafeTimeTablePK::ToString() const {
return YB_CLASS_TO_STRING(replication_group_id, tablet_id, sequences_data_namespace_id);
}

} // namespace yb::xcluster
41 changes: 41 additions & 0 deletions src/yb/common/xcluster_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,45 @@ TableId StripSequencesDataAliasIfPresent(const TableId& table_id);
// instead returns the ID's namespace as usual.
Result<NamespaceId> GetReplicationNamespaceBelongsTo(const TableId& table_id);

// The primary key used to access safe time information in the safe time table.
class SafeTimeTablePK {
public:
static Result<SafeTimeTablePK> FromProducerTabletInfo(const ProducerTabletInfo& info);

static Result<SafeTimeTablePK> FromProducerTabletInfo(
const xcluster::ReplicationGroupId& replication_group_id,
const std::string& producer_tablet_id, const std::string& producer_table_id);

static Result<SafeTimeTablePK> FromSafeTimeTableRow(
const xcluster::ReplicationGroupId& replication_group_id_column_value,
const std::string& tablet_id_column_value);

// These are what we store in the safe time table's primary key columns.
xcluster::ReplicationGroupId replication_group_id_column_value() const;
std::string tablet_id_column_value() const;

// These are the actual underlying values.
xcluster::ReplicationGroupId replication_group_id() const;
TabletId tablet_id() const;
// Only tests should access this information directly.
NamespaceId TEST_sequences_data_namespace_id() const;

bool operator==(const SafeTimeTablePK& rhs) const;

bool operator<(const SafeTimeTablePK& rhs) const;

std::string ToString() const;

private:
xcluster::ReplicationGroupId replication_group_id_;
TabletId tablet_id_;
// If this is a sequences_data tablet, then this holds its producer tablet
// ID sequence alias's replication namespace. Otherwise holds the empty string.
//
// We add this to the primary key to distinguish sequence_data tablets belonging to different
// streams in a backward-compatible way: we want to still be able to decode old primary keys
// (which will not involve sequences_data tablets).
NamespaceId sequences_data_namespace_id_{""};
};

} // namespace yb::xcluster
93 changes: 93 additions & 0 deletions src/yb/integration-tests/xcluster/xcluster_sequences-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "yb/common/xcluster_util.h"
#include "yb/integration-tests/xcluster/xcluster_ddl_replication_test_base.h"
#include "yb/util/flags.h"
#include "yb/util/logging_test_util.h"

DECLARE_bool(ysql_enable_packed_row);
DECLARE_int32(TEST_xcluster_simulated_lag_ms);
Expand Down Expand Up @@ -255,4 +256,96 @@ TEST_F(XClusterAutomaticModeTest, SequenceReplicationWithTransform) {
ASSERT_OK(VerifySequencesSameOnBothSides(namespace1));
}

TEST_F(XClusterAutomaticModeTest, SequenceSafeTime) {
const std::string namespace1{"yugabyte"};
ASSERT_OK(SetUpClusters(/*use_different_database_oids=*/false, namespace1));

ASSERT_OK(SetUpSequences(&producer_cluster_, namespace1));
ASSERT_OK(SetUpSequences(&consumer_cluster_, namespace1));

std::vector<NamespaceName> namespaces_to_replicate = {namespace1};
ASSERT_OK(CheckpointReplicationGroupWithoutRequiringNoBootstrapNeeded(namespaces_to_replicate));
ASSERT_OK(CreateReplicationFromCheckpoint({}, kReplicationGroupId, namespaces_to_replicate));
ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate));
ASSERT_OK(VerifySequencesSameOnBothSides(namespace1));

ASSERT_OK(BumpSequences(&producer_cluster_, namespace1));
ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate));
ASSERT_OK(VerifySequencesSameOnBothSides(namespace1));

ASSERT_OK(BumpSequences(&producer_cluster_, namespace1));
ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate));
ASSERT_OK(VerifySequencesSameOnBothSides(namespace1));
}

TEST_F(XClusterAutomaticModeTest, SequencePausingAndSafeTime) {
const std::string namespace1{"yugabyte"};
ASSERT_OK(SetUpClusters(/*use_different_database_oids=*/false, namespace1));

ASSERT_OK(SetUpSequences(&producer_cluster_, namespace1));
ASSERT_OK(SetUpSequences(&consumer_cluster_, namespace1));

std::vector<NamespaceName> namespaces_to_replicate = {namespace1};
ASSERT_OK(CheckpointReplicationGroupWithoutRequiringNoBootstrapNeeded(namespaces_to_replicate));
ASSERT_OK(CreateReplicationFromCheckpoint({}, kReplicationGroupId, namespaces_to_replicate));
ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate));

auto sequences_stream_id =
ASSERT_RESULT(GetCDCStreamID(xcluster::GetSequencesDataAliasForNamespace(
ASSERT_RESULT(GetNamespaceId(producer_client(), namespace_name)))));
ASSERT_OK(PauseResumeXClusterProducerStreams({sequences_stream_id}, /*is_paused=*/true));
ASSERT_OK(
StringWaiterLogSink("Replication is paused from the producer for stream").WaitFor(300s));
ASSERT_NOK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate));

ASSERT_OK(PauseResumeXClusterProducerStreams({sequences_stream_id}, /*is_paused=*/false));
ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate));
}

TEST_F(XClusterAutomaticModeTest, SequencePausingIsolation) {
const std::string namespace1{"yugabyte"};
const std::string namespace2{"yugabyte2"};
ASSERT_OK(SetUpClusters(/*use_different_database_oids=*/false, namespace1, namespace2));

ASSERT_OK(RunOnBothClusters([&](Cluster* cluster) -> Status {
RETURN_NOT_OK(SetUpSequences(cluster, namespace1));
return SetUpSequences(cluster, namespace2);
}));

std::vector<NamespaceName> namespaces_to_replicate = {namespace1, namespace2};
ASSERT_OK(CheckpointReplicationGroupWithoutRequiringNoBootstrapNeeded(namespaces_to_replicate));
ASSERT_OK(CreateReplicationFromCheckpoint({}, kReplicationGroupId, namespaces_to_replicate));
ASSERT_OK(WaitForSafeTimeToAdvanceToNow(namespaces_to_replicate));

auto pause_one_namespace_temporarily = [&](NamespaceName namespace_to_pause,
NamespaceName other_namespace) {
auto namespace_to_pause_id =
ASSERT_RESULT(GetNamespaceId(producer_client(), namespace_to_pause));
LOG(INFO) << "***** Pausing namespace: " << namespace_to_pause
<< " ID: " << namespace_to_pause_id;
auto sequences_stream_id = ASSERT_RESULT(
GetCDCStreamID(xcluster::GetSequencesDataAliasForNamespace(namespace_to_pause_id)));
ASSERT_OK(PauseResumeXClusterProducerStreams({sequences_stream_id}, /*is_paused=*/true));
ASSERT_OK(
StringWaiterLogSink(
"Replication is paused from the producer for stream: "s + AsString(sequences_stream_id))
.WaitFor(300s));
ASSERT_OK(BumpSequences(&producer_cluster_, namespace_to_pause));

std::vector<NamespaceName> paused_namespaces = {namespace_to_pause};
std::vector<NamespaceName> unpaused_namespaces = {other_namespace};
ASSERT_NOK(WaitForSafeTimeToAdvanceToNow(paused_namespaces));
ASSERT_OK(WaitForSafeTimeToAdvanceToNow(unpaused_namespaces));

LOG(INFO) << "***** Unpausing namespace: " << namespace_to_pause
<< " ID: " << namespace_to_pause_id;
ASSERT_OK(PauseResumeXClusterProducerStreams({sequences_stream_id}, /*is_paused=*/false));
ASSERT_OK(WaitForSafeTimeToAdvanceToNow(paused_namespaces));
ASSERT_OK(WaitForSafeTimeToAdvanceToNow(unpaused_namespaces));
};

pause_one_namespace_temporarily(namespace1, namespace2);
pause_one_namespace_temporarily(namespace2, namespace1);
}

} // namespace yb
Loading

0 comments on commit ad8384e

Please sign in to comment.