[#12476] xCluster,TabletSplitting: Handle parent tablet deletion for xCluster related tablets

Summary:
*This is a fixed version of D17313 / commit 4b0f239*

Previously, parent tablets that xCluster still needed were retained for their full WAL retention time, and their entries were never cleaned up from the cdc_state table. This could leave metrics stuck at high values and stale entries lingering in cdc_state.

Adding a new map `retained_by_xcluster_` to keep track of parent tablets that are hidden but still being replicated by xCluster. Tablets in this map won't be deleted by the hidden-tablet deletion background task. Tablets are added to this map when they are hidden for the first time.
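
A minimal sketch of what this bookkeeping could look like; the struct name, its fields, and the plain map (without the real code's locking annotations) are illustrative assumptions rather than the commit's exact definitions:

```cpp
#include <string>
#include <unordered_map>
#include <vector>

using TableId = std::string;
using TabletId = std::string;

// Hypothetical per-parent bookkeeping for a hidden tablet that xCluster
// still needs to serve changes from.
struct HiddenReplicationParentTabletInfo {
  TableId table_id;
  TabletId parent_tablet_id;            // Empty if this tablet was never itself a split child.
  std::vector<TabletId> split_tablets;  // Children created when this tablet was split.
};

// Parent tablets that are hidden but still retained for xCluster replication,
// keyed by tablet id. The hidden-tablet deletion background task skips any
// tablet found here; entries are inserted the first time a tablet is hidden.
std::unordered_map<TabletId, HiddenReplicationParentTabletInfo> retained_by_xcluster_;
```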

Also adding a background task to check the status of tablets in this map and remove them once they are no longer required. The task checks the cdc_state entries for the child tablets and verifies that they are present and being polled, at which point the parent is no longer needed. It removes such parents from `retained_by_xcluster_`, which then lets the hidden-tablet deletion task clean up and actually delete the tablet.
To determine when the child tablets are being polled, changing the initial rows we insert into cdc_state for new child tablets: the initial timestamp is now set to null, so we can tell that this is a new tablet that we have not yet started replicating.
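
A rough sketch of the per-parent check the new background task performs, relying on the null-initial-timestamp convention above (the row model and helper name are illustrative assumptions, not the commit's exact code):

```cpp
#include <optional>
#include <vector>

// Minimal model of a child tablet's cdc_state row for this sketch.
struct CdcStateRow {
  bool exists = false;  // Is there a cdc_state entry for this child at all?
  std::optional<int64_t> last_replication_time_micros;  // Null until first polled.
};

// The parent tablet can be released (and then deleted by the hidden-tablet
// deletion task) once every child has a cdc_state row whose replication
// timestamp has been set, i.e. the consumer has started polling the child.
bool ParentNoLongerNeeded(const std::vector<CdcStateRow>& child_rows) {
  for (const auto& row : child_rows) {
    if (!row.exists || !row.last_replication_time_micros.has_value()) {
      return false;  // Child missing or not yet polled: keep retaining the parent.
    }
  }
  return true;  // Safe to erase the parent from retained_by_xcluster_.
}
```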

Also discovered an issue with bootstrap + splitting: if tablet splits happen after BootstrapCdcProducer has run, then during SetupUniverseReplication we use the currently active tablets to start polling from. This complicates processing of the bootstrap checkpoint and cleanup of the original tablets that existed at bootstrap time. For now, disabling tablet splits for tablets that are part of a bootstrapping stream (state = INITIATED). Note that this means that if users bootstrap a stream but don't set up replication for it, tablet splits won't happen until that stream is properly deleted (via DeleteCdcStream).
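
The guard can be expressed against the new `xcluster_producer_tables_to_stream_map_` from the catalog_manager.h diff below; the stream-state lookup in this sketch is a simplified assumption, not the commit's exact implementation:

```cpp
#include <string>
#include <unordered_map>
#include <unordered_set>

using TableId = std::string;
using CDCStreamId = std::string;

// Subset of CDC stream states relevant here; INITIATED means a bootstrap was
// requested but replication setup has not completed yet.
enum class CdcStreamState { INITIATED, ACTIVE };

// Simplified stand-ins for the catalog manager's bookkeeping.
std::unordered_map<TableId, std::unordered_set<CDCStreamId>>
    xcluster_producer_tables_to_stream_map_;
std::unordered_map<CDCStreamId, CdcStreamState> stream_states_;

// Tablet splits are rejected for any table with a stream still bootstrapping.
bool IsTablePartOfBootstrappingCdcStream(const TableId& table_id) {
  auto it = xcluster_producer_tables_to_stream_map_.find(table_id);
  if (it == xcluster_producer_tables_to_stream_map_.end()) {
    return false;  // Not an xCluster producer table.
  }
  for (const auto& stream_id : it->second) {
    auto state_it = stream_states_.find(stream_id);
    if (state_it != stream_states_.end() &&
        state_it->second == CdcStreamState::INITIATED) {
      return true;  // Bootstrap in progress: disallow splitting this tablet.
    }
  }
  return false;
}
```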

init atomic bool

Test Plan:
```
ybd --cxx-test integration-tests_xcluster-tablet-split-itest --gtest_filter XClusterExternalTabletSplitITest.MasterFailoverDuringConsumerPostSplitOps
ybd --cxx-test integration-tests_xcluster-tablet-split-itest --gtest_filter XClusterExternalTabletSplitITest.MasterFailoverDuringProducerPostSplitOps
ybd --cxx-test integration-tests_xcluster-tablet-split-itest
```

Reviewers: nicolas, rahuldesirazu

Reviewed By: rahuldesirazu

Subscribers: zyu, mbautin, bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D17795
hulien22 committed Jun 21, 2022
1 parent 0838061 commit 288330d
Showing 13 changed files with 513 additions and 128 deletions.
2 changes: 1 addition & 1 deletion ent/src/yb/cdc/cdc_producer.h

@@ -79,7 +79,7 @@ Status GetChangesForCDCSDK(
     int64_t* last_readable_opid_index = nullptr,
     const CoarseTimePoint deadline = CoarseTimePoint::max());
 
-typedef std::function<Status(std::shared_ptr<yb::consensus::ReplicateMsg>)> UpdateOnSplitOpFunc;
+using UpdateOnSplitOpFunc = std::function<Status(std::shared_ptr<yb::consensus::ReplicateMsg>)>;
 
 Status GetChangesForXCluster(const std::string& stream_id,
                              const std::string& tablet_id,
37 changes: 24 additions & 13 deletions ent/src/yb/cdc/cdc_service.cc

@@ -578,6 +578,15 @@ class CDCServiceImpl::Impl {
     }
   }
 
+  void ForceCdcStateUpdate(const ProducerTabletInfo& producer_tablet) {
+    std::lock_guard<rw_spinlock> l(mutex_);
+    auto it = tablet_checkpoints_.find(producer_tablet);
+    if (it != tablet_checkpoints_.end()) {
+      // Setting the timestamp to min will result in ExpiredAt saying it is expired.
+      it->cdc_state_checkpoint.last_update_time = CoarseTimePoint::min();
+    }
+  }
+
   boost::optional<client::AsyncClientInitialiser> async_client_init_;
 
   // this will be used for the std::call_once call while caching the client
@@ -1284,8 +1293,8 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
   if (record.source_type == XCLUSTER) {
     s = cdc::GetChangesForXCluster(
         stream_id, req->tablet_id(), op_id, record, tablet_peer, session,
-        std::bind(&CDCServiceImpl::UpdateChildrenTabletsOnSplitOp, this, stream_id,
-                  req->tablet_id(), std::placeholders::_1, session), mem_tracker,
+        std::bind(&CDCServiceImpl::UpdateChildrenTabletsOnSplitOp, this, producer_tablet,
+                  std::placeholders::_1, session), mem_tracker,
         &msgs_holder, resp, &last_readable_index, get_changes_deadline);
   } else {
     std::string commit_timestamp;
@@ -2822,7 +2831,6 @@ Status CDCServiceImpl::UpdateCheckpoint(
     uint64_t last_record_hybrid_time,
     const bool force_update) {
   bool update_cdc_state = impl_->UpdateCheckpoint(producer_tablet, sent_op_id, commit_op_id);
-
   if (update_cdc_state || force_update) {
     auto cdc_state = VERIFY_RESULT(GetCdcStateTable());
     const auto op = cdc_state->NewUpdateOp();
@@ -3087,8 +3095,7 @@ void CDCServiceImpl::IsBootstrapRequired(const IsBootstrapRequiredRequestPB* req
 }
 
 Status CDCServiceImpl::UpdateChildrenTabletsOnSplitOp(
-    const std::string& stream_id,
-    const std::string& tablet_id,
+    const ProducerTabletInfo& producer_tablet,
     std::shared_ptr<yb::consensus::ReplicateMsg> split_op_msg,
     const client::YBSessionPtr& session) {
   const auto split_req = split_op_msg->split_request();
@@ -3105,7 +3112,7 @@ Status CDCServiceImpl::UpdateChildrenTabletsOnSplitOp(
     auto cond = req->mutable_where_expr()->mutable_condition();
     cond->set_op(QLOperator::QL_OP_AND);
     QLAddStringCondition(cond, Schema::first_column_id() + master::kCdcStreamIdIdx,
-        QL_OP_EQUAL, stream_id);
+        QL_OP_EQUAL, producer_tablet.stream_id);
     req->mutable_column_refs()->add_ids(Schema::first_column_id() + master::kCdcTabletIdIdx);
     req->mutable_column_refs()->add_ids(Schema::first_column_id() + master::kCdcStreamIdIdx);
     cdc_state_table->AddColumns({master::kCdcCheckpoint}, req);
@@ -3116,21 +3123,25 @@ Status CDCServiceImpl::UpdateChildrenTabletsOnSplitOp(
     auto row_block = ql::RowsResult(op.get()).GetRowBlock();
     SCHECK(row_block->row_count() == 1, NotFound,
            Format("Error finding entry in cdc_state table for tablet: $0, stream $1.",
-                  child_tablet, stream_id));
+                  child_tablet, producer_tablet.stream_id));
   }
 
-  // If we found both entries then lets update their checkpoints to this split_op's op id.
+  // Force an update of parent tablet checkpoint/timestamp to ensure that it gets updated at
+  // least once (otherwise, we may have a situation where consecutive splits occur within the
+  // cdc_state table update window, and we wouldn't update the tablet's row with non-null values).
+  impl_->ForceCdcStateUpdate(producer_tablet);
+
+  // If we found both entries then lets update their checkpoints to this split_op's op id, to
+  // ensure that we continue replicating from where we left off.
   for (const auto& child_tablet : children_tablets) {
     const auto op = cdc_state_table->NewUpdateOp();
     auto* const req = op->mutable_request();
     QLAddStringHashValue(req, child_tablet);
-    QLAddStringRangeValue(req, stream_id);
-
+    QLAddStringRangeValue(req, producer_tablet.stream_id);
+    // No need to update the timestamp here as we haven't started replicating the child yet.
     cdc_state_table->AddStringColumnValue(
         req, master::kCdcCheckpoint, consensus::OpIdToString(split_op_msg->id()));
-    cdc_state_table->AddTimestampColumnValue(
-        req, master::kCdcLastReplicationTime, GetCurrentTimeMicros());
-    // Only perform upserts on tservers for cdc_state.
+    // Only perform updates from tservers for cdc_state, so check if row exists or not.
     auto* condition = req->mutable_if_expr()->mutable_condition();
     condition->set_op(QL_OP_EXISTS);
     // TODO(async_flush): https://github.com/yugabyte/yugabyte-db/issues/12173
5 changes: 2 additions & 3 deletions ent/src/yb/cdc/cdc_service.h

@@ -75,7 +75,7 @@ struct TabletCheckpoint {
   CoarseTimePoint last_active_time;
 
   bool ExpiredAt(std::chrono::milliseconds duration, std::chrono::time_point<CoarseMonoClock> now) {
-    return (now - last_update_time) >= duration;
+    return !IsInitialized(last_update_time) || (now - last_update_time) >= duration;
   }
 };
@@ -332,8 +332,7 @@
       const std::shared_ptr<tablet::TabletPeer>& tablet_peer);
 
   Status UpdateChildrenTabletsOnSplitOp(
-      const std::string& stream_id,
-      const std::string& tablet_id,
+      const ProducerTabletInfo& producer_tablet,
       std::shared_ptr<yb::consensus::ReplicateMsg> split_op_msg,
       const client::YBSessionPtr& session);
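The `ExpiredAt` change above works together with the new `ForceCdcStateUpdate` in cdc_service.cc: resetting `last_update_time` to the clock's minimum value makes the next expiry check fire immediately, forcing a cdc_state row update. A self-contained illustration of that behavior, using std::chrono directly instead of YB's CoarseMonoClock types:

```cpp
#include <cassert>
#include <chrono>

using Clock = std::chrono::steady_clock;

// Mirrors the patched ExpiredAt: an uninitialized (minimum) timestamp is
// always treated as expired, so a forced reset is picked up right away.
bool ExpiredAt(Clock::time_point last_update_time,
               std::chrono::milliseconds duration,
               Clock::time_point now) {
  return last_update_time == Clock::time_point::min() ||
         (now - last_update_time) >= duration;
}

int main() {
  const auto now = Clock::now();
  // A checkpoint refreshed just now is not expired within a 1s window...
  assert(!ExpiredAt(now, std::chrono::milliseconds(1000), now));
  // ...but after ForceCdcStateUpdate resets the timestamp to min(), it is.
  assert(ExpiredAt(Clock::time_point::min(), std::chrono::milliseconds(1000), now));
  return 0;
}
```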
20 changes: 18 additions & 2 deletions ent/src/yb/master/catalog_manager.h

@@ -223,6 +223,8 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
 
   bool IsCdcEnabled(const TableInfo& table_info) const override;
 
+  bool IsTablePartOfBootstrappingCdcStream(const TableInfo& table_info) const override;
+
   tablet::SnapshotCoordinator& snapshot_coordinator() override {
     return snapshot_coordinator_;
   }
@@ -237,6 +239,10 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
 
   void EnableTabletSplitting(const std::string& feature) override;
 
+  void StartXClusterParentTabletDeletionTaskIfStopped();
+
+  void ScheduleXClusterParentTabletDeletionTask();
+
  private:
   friend class SnapshotLoader;
   friend class yb::master::ClusterLoadBalancer;
@@ -475,6 +481,9 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
       const std::unordered_map<TableId, std::string>&
           table_bootstrap_ids);
 
+  // Get the set of CDC streams for a given table, or an empty set if this is not a producer.
+  std::unordered_set<CDCStreamId> GetCdcStreamsForProducerTable(const TableId& table_id) const;
+
   // Gets the set of CDC stream info for an xCluster consumer table.
   XClusterConsumerTableStreamInfoMap GetXClusterStreamInfoForConsumerTable(const TableId& table_id)
       const;
@@ -503,6 +512,12 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
       const google::protobuf::RepeatedPtrField<HostPortPB>& master_addresses,
       const google::protobuf::RepeatedPtrField<std::string>& table_ids);
 
+  void ProcessXClusterParentTabletDeletionPeriodically();
+
+  Status DoProcessXClusterParentTabletDeletion();
+
+  void LoadXClusterRetainedParentTabletsSet() REQUIRES(mutex_);
+
   // Snapshot map: snapshot-id -> SnapshotInfo.
   typedef std::unordered_map<SnapshotId, scoped_refptr<SnapshotInfo>> SnapshotInfoMap;
   SnapshotInfoMap non_txn_snapshot_ids_map_;
@@ -518,8 +533,9 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon
   typedef std::unordered_map<CDCStreamId, scoped_refptr<CDCStreamInfo>> CDCStreamInfoMap;
   CDCStreamInfoMap cdc_stream_map_ GUARDED_BY(mutex_);
 
-  // Map of tables -> number of cdc streams they are producers for.
-  std::unordered_map<TableId, int> cdc_stream_tables_count_map_ GUARDED_BY(mutex_);
+  // Map of tables -> set of cdc streams they are producers for.
+  std::unordered_map<TableId, std::unordered_set<CDCStreamId>>
+      xcluster_producer_tables_to_stream_map_ GUARDED_BY(mutex_);
 
   // Map of all consumer tables that are part of xcluster replication, to a map of the stream infos.
   std::unordered_map<TableId, XClusterConsumerTableStreamInfoMap>