[yugabyte#10935] CDCSDK: Provide tablet split support to CDCSDK Service
Summary:
**Adding entries to the cdc_state table for the children tablets**
Once the split has been registered, but before the split requests are sent, we start processing the CDCSDK metadata as part of the TabletSplitManager::ProcessSplitTabletResult call.
For each stream that has an entry for the parent tablet, we add entries for the child tablets with checkpoint 0.0.
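
The seeding step can be sketched as follows. This is only an illustration: `CdcStateTable` is a hypothetical in-memory stand-in for the 'cdc_state' table, and `AddChildEntries` and the plain `OpId` struct are illustrative names that do not appear in the patch.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

using TabletId = std::string;
using StreamId = std::string;

struct OpId {
  int64_t term;
  int64_t index;
};

// Hypothetical stand-in for cdc_state: (tablet, stream) -> checkpoint.
using CdcStateTable = std::map<std::pair<TabletId, StreamId>, OpId>;

// For every stream that has a row for `parent`, add a row for each child
// with checkpoint 0.0. Returns the number of rows added.
int AddChildEntries(CdcStateTable* table, const TabletId& parent,
                    const std::vector<TabletId>& children) {
  // Collect the streams first so the table is not modified while scanning it.
  std::vector<StreamId> streams;
  for (const auto& row : *table) {
    if (row.first.first == parent) streams.push_back(row.first.second);
  }
  int added = 0;
  for (const auto& stream : streams) {
    for (const auto& child : children) {
      // emplace leaves an existing row untouched.
      if (table->emplace(std::make_pair(child, stream), OpId{0, 0}).second) {
        ++added;
      }
    }
  }
  return added;
}
```

Streams that never had a row for the parent get no child rows, which matches the per-stream wording above.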

**Retain Parent tablet until GetChanges catches up with SplitOp**
When a tablet is about to be deleted after a tablet split, we first check whether any active CDCSDK stream is associated with the tablet's table, using the newly introduced map cdcsdk_tables_to_stream_map_. If there is one, we add the tablet to a new map, retained_by_cdcsdk_ (std::unordered_map<TabletId, HiddenReplicationParentTabletInfo>), which records that the hidden tablet is still needed for CDCSDK. Any tablet in retained_by_cdcsdk_ is marked as hidden, so it is not deleted until all the required changes have been reported to the CDC client. This is done while processing the split, in CatalogManager::DeleteTabletListAndSendRequests. The approach is similar to the one followed for xCluster (introduced by PR: yugabyte@288330d).

Both cdcsdk_tables_to_stream_map_ and retained_by_cdcsdk_ are in-memory data structures; they are re-initialized on master restarts.
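
A minimal sketch of the retention decision, assuming simplified types. `RetentionSketch`, `RetainIfStreamed` and `HiddenParentInfo` are hypothetical names; the fields of the real HiddenReplicationParentTabletInfo are not shown in this commit, so the struct below is a placeholder.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <unordered_set>

using TableId = std::string;
using TabletId = std::string;
using StreamId = std::string;

// Placeholder for HiddenReplicationParentTabletInfo (fields are assumed).
struct HiddenParentInfo {
  TableId table_id;
};

struct RetentionSketch {
  std::unordered_map<TableId, std::unordered_set<StreamId>>
      cdcsdk_tables_to_stream_map;
  std::unordered_map<TabletId, HiddenParentInfo> retained_by_cdcsdk;

  // Returns true when the split parent must be hidden instead of deleted,
  // and records it in retained_by_cdcsdk in that case.
  bool RetainIfStreamed(const TableId& table_id, const TabletId& tablet_id) {
    const auto it = cdcsdk_tables_to_stream_map.find(table_id);
    if (it == cdcsdk_tables_to_stream_map.end() || it->second.empty()) {
      return false;  // No active CDCSDK stream on this table.
    }
    retained_by_cdcsdk.emplace(tablet_id, HiddenParentInfo{table_id});
    return true;
  }
};
```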

**Detect Tablet SplitOp from GetChanges**
In GetChangesForCDCSDK, when we read the ops to process from ReadReplicatedMessagesForCDC, we know that a tablet split occurred when we see an op of type yb::consensus::OperationType::SPLIT_OP.
When we detect a split op, we do the following:
First, confirm that the tablet split has indeed occurred.
Validate that there are no more write/update ops on the tablet.
Do not advance the checkpoint sent in the response to the client to the SplitOp, so that the next GetChanges call arrives with the OpId of the last operation just before the SplitOp. Note that we do not tell the client about the tablet split in this call; we only send the data up to the SplitOp. The client learns about the split in its next ‘GetChanges’ call.

**Communicating Tablet Split to Client**
The client then calls GetChanges again with the fromOpId of the operation just before the SplitOp. So if the first message we read from ‘ReadReplicatedMessagesForCDC’ is the SplitOp message, we know that the client already has all the data preceding the split. In that case we:
Update the checkpoint for the tablet to the OpId of the SplitOp.
Verify that entries exist for the children tablets in the ‘cdc_state’ table; if not, add them with checkpoint equal to the SplitOpId.
Remove the entry for the tablet/stream pair from the ‘cdc_state’ table.
Return an error code: TABLET_SPLIT.
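
The two-call handshake can be sketched as a small simulation. This is a simplification under stated assumptions: ops carry only an index (no term), the split is taken as already verified, the 'cdc_state' updates are left out, and `Op`, `GetChangesResult` and `GetChangesSketch` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

enum class OpType { kWrite, kSplit };

struct Op {
  int64_t index;
  OpType type;
};

struct GetChangesResult {
  std::vector<int64_t> streamed;  // Indexes of the write ops returned.
  int64_t checkpoint;             // Index the client passes on its next call.
  bool tablet_split;              // True when TABLET_SPLIT would be returned.
};

// Processes every op with index > from_index.
GetChangesResult GetChangesSketch(const std::vector<Op>& log,
                                  int64_t from_index) {
  GetChangesResult res{{}, from_index, false};
  bool checkpoint_updated = false;
  for (const Op& op : log) {
    if (op.index <= from_index) continue;
    if (op.type == OpType::kWrite) {
      res.streamed.push_back(op.index);
      res.checkpoint = op.index;
      checkpoint_updated = true;
      continue;
    }
    // SplitOp: only report it when nothing else was streamed in this call.
    if (!checkpoint_updated) {
      res.checkpoint = op.index;
      res.tablet_split = true;
    }
    break;  // Nothing after the SplitOp is streamed from the parent.
  }
  return res;
}
```

With a log of two writes followed by a SplitOp, the first call returns both writes and leaves the checkpoint on the second write; the second call, started from that checkpoint, streams nothing and reports the split.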

**Dealing with GetChanges on the parent tablet after TabletSplit**
If the client calls “GetChanges” on a tablet which is not found, we call “GetTablets”:
If the call is on a parent tablet, we return a TABLET_SPLIT error.
If the tabletId is neither a parent nor an active tablet returned by “GetTablets”, we return a TABLET_NOT_FOUND error.
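
A sketch of this classification, assuming the “GetTablets” result has been flattened into a list of (tablet id, split parent id) pairs. `TabletLocation`, `LookupResult` and `ClassifyTablet` are hypothetical names; an empty `split_parent_tablet_id` stands for "not a split child".

```cpp
#include <cassert>
#include <string>
#include <vector>

using TabletId = std::string;

struct TabletLocation {
  TabletId tablet_id;
  TabletId split_parent_tablet_id;  // Empty when the tablet is not a child.
};

enum class LookupResult { kActive, kTabletSplit, kTabletNotFound };

LookupResult ClassifyTablet(const TabletId& id,
                            const std::vector<TabletLocation>& tablets) {
  bool active = false;
  for (const auto& t : tablets) {
    // Some tablet names `id` as its parent: the requested tablet has split.
    if (!t.split_parent_tablet_id.empty() && t.split_parent_tablet_id == id) {
      return LookupResult::kTabletSplit;
    }
    if (t.tablet_id == id) active = true;
  }
  return active ? LookupResult::kActive : LookupResult::kTabletNotFound;
}
```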

**Deletion of parent tablet**
There is a background thread that handles tablet deletion. We add a new function, DoProcessCDCSDKClusterTabletDeletion, which checks whether the ‘cdc_state’ table has at least one entry for the parent tablet whose checkpoint is not (-1.-1). If no such row is found:
We remove the tablet’s entry from ‘retained_by_cdcsdk_’.
We also remove the rows for this tablet from the ‘cdc_state’ table.
We update the child’s checkpoint to -1.-1 (if this is not done, any child tablet on which we have not started streaming will retain data unnecessarily).

The function ‘CleanupHiddenTablets’ will later delete the tablet, since it has been removed from ‘retained_by_cdcsdk_’.

We re-use the background thread which handles the deletion of tablets retained by xCluster (introduced in PR: yugabyte@288330d). This background task now runs two functions: the new DoProcessCDCSDKClusterTabletDeletion and the existing DoProcessXClusterParentTabletDeletion.
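
The release check can be sketched as below. Assumptions: `CdcStateTable` is a hypothetical in-memory stand-in for ‘cdc_state’, ‘retained_by_cdcsdk_’ is reduced to a set of tablet ids, and `TryReleaseParent` is an illustrative name. The child checkpoint update is left out of the sketch because the description does not pin down which child rows it touches.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <unordered_set>
#include <utility>

using TabletId = std::string;
using StreamId = std::string;

struct OpId {
  int64_t term;
  int64_t index;
};

// Hypothetical stand-in for cdc_state: (tablet, stream) -> checkpoint.
using CdcStateTable = std::map<std::pair<TabletId, StreamId>, OpId>;

bool IsInvalid(const OpId& cp) { return cp.term == -1 && cp.index == -1; }

// Returns true when the parent was released for deletion.
bool TryReleaseParent(CdcStateTable* cdc_state,
                      std::unordered_set<TabletId>* retained_by_cdcsdk,
                      const TabletId& parent) {
  // A row with a checkpoint other than -1.-1 means the parent is still needed.
  for (const auto& row : *cdc_state) {
    if (row.first.first == parent && !IsInvalid(row.second)) return false;
  }
  retained_by_cdcsdk->erase(parent);
  // Drop every cdc_state row that belongs to the parent.
  for (auto it = cdc_state->begin(); it != cdc_state->end();) {
    if (it->first.first == parent) {
      it = cdc_state->erase(it);
    } else {
      ++it;
    }
  }
  return true;
}
```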

**Dealing with client side restarts before TabletSplit is communicated**
If the CDCSDK client crashes or restarts before the tablet split is communicated to it, there can still be data which needs to be streamed from the parent tablet, but a call to “GetTablets” will not return the parent tablet ID.
To support this scenario we introduce a new API: “GetTabletListToPollForCDC”.
This API takes the table name and stream id as request parameters, and returns all the tablets and their checkpoints by scanning the ‘cdc_state’ table.
The new “GetTabletListToPollForCDC” API does the following:
Call “GetTablets” for the relevant tabletId, and derive the set of parent tablets and child tablets from the result (the API response includes the parent tablet ID of each child tablet).
Iterate over the rows belonging to the required stream in the ‘cdc_state’ table, and filter down to the relevant rows.
Decide whether to add each tablet’s info to the final result based on the conditions below:
  # Add every tablet which is neither a child nor a parent tablet to the result.
  # If the tablet is not a parent tablet, nor an active tablet returned by the ‘GetTablets’ call, we do not add the tablet to the result. This happens in scenarios where the tablet split has been initiated but not completed.
  # If the tablet is not a child tablet but a parent tablet, we add the parent tablet to the result if we have not started polling on any of the children, or if we have still not reported the tablet split to the client (when the tablet split is reported to the client, the child’s checkpoint is changed to the SplitOp record’s OpId).
  # If the tablet is a child and we have started polling on it, we add the child to the result.
  # If the tablet is a child and we have not started polling on it, we check whether the parent tablet has already been polled. If so, we do not add the child tablet to the result; if the parent tablet has not been polled, we add the child tablet to the result.
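
The filtering rules above can be sketched as follows. This is not the patch's GetTabletIdsToPoll: `TabletsToPollSketch` and `Polled` are hypothetical names, the stream's ‘cdc_state’ rows are modelled as a plain map, and "polling has started" is assumed to mean a checkpoint other than 0.0. A tablet that is both a child and a parent is handled by the child rules here, which the description does not specify.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using TabletId = std::string;

struct OpId {
  int64_t term;
  int64_t index;
};

// Assumption: a checkpoint other than 0.0 means polling has started.
bool Polled(const OpId& cp) { return cp.term != 0 || cp.index != 0; }

// rows: the stream's cdc_state rows (tablet -> checkpoint).
std::vector<std::pair<TabletId, OpId>> TabletsToPollSketch(
    const std::map<TabletId, OpId>& rows,
    const std::set<TabletId>& active_or_hidden_tablets,
    const std::set<TabletId>& parent_tablets,
    const std::map<TabletId, TabletId>& child_to_parent) {
  std::vector<std::pair<TabletId, OpId>> result;
  for (const auto& row : rows) {
    const TabletId& tablet = row.first;
    const bool is_parent = parent_tablets.count(tablet) > 0;
    const auto parent_it = child_to_parent.find(tablet);
    const bool is_child = parent_it != child_to_parent.end();

    // Rule 2: neither a parent nor a tablet returned by GetTablets.
    if (!is_parent && active_or_hidden_tablets.count(tablet) == 0) continue;

    bool add = true;  // Rule 1: neither a child nor a parent.
    if (is_child) {
      // Rule 4: a polled child is always added.
      if (!Polled(row.second)) {
        // Rule 5: skip an unpolled child whose parent has been polled.
        const auto parent_row = rows.find(parent_it->second);
        add = parent_row == rows.end() || !Polled(parent_row->second);
      }
    } else if (is_parent) {
      // Rule 3: keep the parent until polling has started on a child.
      for (const auto& entry : child_to_parent) {
        if (entry.second != tablet) continue;
        const auto child_row = rows.find(entry.first);
        if (child_row != rows.end() && Polled(child_row->second)) add = false;
      }
    }
    if (add) result.emplace_back(tablet, row.second);
  }
  return result;
}
```

Before the split is reported, a polled parent with two unpolled children yields only the parent; once the children carry the SplitOp's OpId and the parent row is gone, the children are returned instead.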

Test Plan: Added ctests

Reviewers: skumar, srangavajjula, aagrawal, rvenkatesh, jhe, sdash, vkushwaha

Reviewed By: jhe, sdash, vkushwaha

Subscribers: bogdan

Differential Revision: http://phabricator.dev.yugabyte.com/D18638
Adithya Bharadwaj committed Oct 7, 2022
1 parent 4344e7e commit fda511d
Showing 18 changed files with 1,717 additions and 145 deletions.
1 change: 1 addition & 0 deletions ent/src/yb/cdc/cdc_producer.h
@@ -71,6 +71,7 @@ Status GetChangesForCDCSDK(
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
const std::shared_ptr<MemTracker>& mem_tracker,
const EnumOidLabelMap& enum_oid_label_map,
client::YBClient* client,
consensus::ReplicateMsgsHolder* msgs_holder,
GetChangesResponsePB* resp,
std::string* commit_timestamp,
349 changes: 337 additions & 12 deletions ent/src/yb/cdc/cdc_service.cc

Large diffs are not rendered by default.

19 changes: 17 additions & 2 deletions ent/src/yb/cdc/cdc_service.h
@@ -141,6 +141,11 @@ class CDCServiceImpl : public CDCServiceIf {
Result<SetCDCCheckpointResponsePB> SetCDCCheckpoint(
const SetCDCCheckpointRequestPB& req, CoarseTimePoint deadline) override;

void GetTabletListToPollForCDC(
const GetTabletListToPollForCDCRequestPB* req,
GetTabletListToPollForCDCResponsePB* resp,
rpc::RpcContext context) override;

void IsBootstrapRequired(const IsBootstrapRequiredRequestPB* req,
IsBootstrapRequiredResponsePB* resp,
rpc::RpcContext rpc) override;
@@ -301,6 +306,13 @@ class CDCServiceImpl : public CDCServiceIf {
// tablet and then update the peers' log objects. Also used to update lag metrics.
void UpdatePeersAndMetrics();

Status GetTabletIdsToPoll(
const CDCStreamId stream_id,
const std::set<TabletId>& active_or_hidden_tablets,
const std::set<TabletId>& parent_tablets,
const std::map<TabletId, TabletId>& child_to_parent_mapping,
std::vector<std::pair<TabletId, OpId>>* result);

// This method deletes entries from the cdc_state table that are contained in the set.
Status DeleteCDCStateTableMetadata(const TabletIdStreamIdSet& cdc_state_entries_to_delete);

@@ -321,8 +333,8 @@ class CDCServiceImpl : public CDCServiceIf {
std::vector<ProducerTabletInfo>* producer_entries_modified,
std::vector<client::YBOperationPtr>* ops,
const CDCStreamId& stream_id,
-const TableId& table_id,
-const TabletId& tablet_id);
+const TabletId& tablet_id,
+const OpId& op_id = OpId::Invalid());

Status CreateCDCStreamForNamespace(
const CreateCDCStreamRequestPB* req,
@@ -342,6 +354,9 @@ class CDCServiceImpl : public CDCServiceIf {
std::shared_ptr<yb::consensus::ReplicateMsg> split_op_msg,
const client::YBSessionPtr& session);

Status UpdateChildrenTabletsOnSplitOpForCDCSDK(
const ProducerTabletInfo& info, const OpId& split_op_id);

// Get enum map from the cache.
Result<EnumOidLabelMap> GetEnumMapFromCache(const NamespaceName& ns_name);

89 changes: 89 additions & 0 deletions ent/src/yb/cdc/cdcsdk_producer.cc
@@ -14,12 +14,15 @@

#include "yb/cdc/cdc_common_util.h"

#include "yb/client/client.h"
#include "yb/client/yb_table_name.h"
#include "yb/common/wire_protocol.h"
#include "yb/common/ql_expr.h"

#include "yb/docdb/docdb_util.h"
#include "yb/docdb/doc_key.h"

#include "yb/master/master_client.pb.h"
#include "yb/util/flag_tags.h"
#include "yb/util/logging.h"

@@ -733,6 +736,35 @@ void FillDDLInfo(
}
}

bool VerifyTabletSplitOnParentTablet(
const TableId& table_id, const TabletId& tablet_id,
const std::shared_ptr<yb::consensus::ReplicateMsg>& msg, client::YBClient* client) {
if (!(msg->has_split_request() && msg->split_request().has_tablet_id() &&
msg->split_request().tablet_id() == tablet_id)) {
LOG(WARNING) << "The replicate message for split-op does not have the parent tablet_id set to: "
<< tablet_id << ". Could not verify tablet-split for tablet: " << tablet_id;
return false;
}

google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
client::YBTableName table_name;
table_name.set_table_id(table_id);
RETURN_NOT_OK_RET(
client->GetTablets(
table_name, 0, &tablets, /* partition_list_version =*/nullptr,
RequireTabletsRunning::kFalse, master::IncludeInactive::kTrue),
false);

uint children_tablet_count = 0;
for (const auto& tablet : tablets) {
if (tablet.has_split_parent_tablet_id() && tablet.split_parent_tablet_id() == tablet_id) {
children_tablet_count += 1;
}
}

return (children_tablet_count == 2);
}

// CDC get changes is different from 2DC as it doesn't need
// to read intents from WAL.

@@ -744,6 +776,7 @@ Status GetChangesForCDCSDK(
const std::shared_ptr<tablet::TabletPeer>& tablet_peer,
const MemTrackerPtr& mem_tracker,
const EnumOidLabelMap& enum_oid_label_map,
client::YBClient* client,
consensus::ReplicateMsgsHolder* msgs_holder,
GetChangesResponsePB* resp,
std::string* commit_timestamp,
@@ -756,6 +789,8 @@ Status GetChangesForCDCSDK(
ScopedTrackedConsumption consumption;
CDCSDKCheckpointPB checkpoint;
bool checkpoint_updated = false;
bool report_tablet_split = false;
OpId split_op_id = OpId::Invalid();

// It is snapshot call.
if (from_op_id.write_id() == -1) {
@@ -977,6 +1012,47 @@ Status GetChangesForCDCSDK(
}
break;

case yb::consensus::OperationType::SPLIT_OP: {
// It is possible that we found records corresponding to SPLIT_OP even when it failed.
// We first verify that a split has indeed occurred successfully on the tablet by checking:
// 1. There are two children tablets for the tablet
// 2. The split op is the last operation on the tablet
// If either of the conditions is false, we know the SplitOp was not successful.
const TableId& table_id = tablet_peer->tablet()->metadata()->table_id();

if (!(VerifyTabletSplitOnParentTablet(table_id, tablet_id, msg, client))) {
SetCheckpoint(
msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint, last_streamed_op_id);
checkpoint_updated = true;
LOG_WITH_FUNC(WARNING)
<< "Found SplitOp record with index: " << msg->id()
<< ", but did not find any children tablets for the parent tablet: " << tablet_id;
} else {
if (checkpoint_updated) {
// If we have records which are yet to be streamed which we discovered in the same
// 'GetChangesForCDCSDK' call, we will not update the checkpoint to the SplitOp
// record's OpId and return the records seen till now. Next time the client will
// call 'GetChangesForCDCSDK' with the OpId just before the SplitOp's record.
LOG(INFO) << "Found SPLIT_OP record with OpId: " << msg->id()
<< ", will stream all seen records until now.";
} else {
// If 'GetChangesForCDCSDK' was called with the OpId just before the SplitOp's
// record, and there is no more data to stream, we can notify the client
// about the split and update the checkpoint. At this point, we will store the
// split_op_id.
LOG(INFO) << "Found SPLIT_OP record with OpId: " << msg->id()
<< ", and if we did not see any other records we will report the tablet "
"split to the client";
SetCheckpoint(
msg->id().term(), msg->id().index(), 0, "", 0, &checkpoint,
last_streamed_op_id);
checkpoint_updated = true;
split_op_id = OpId::FromPB(msg->id());
}
}
}
break;

default:
// Nothing to do for other operation types.
break;
@@ -999,6 +1075,13 @@ Status GetChangesForCDCSDK(
last_seen_op_id.index < *last_readable_opid_index);
}

// If the split_op_id is equal to the checkpoint, i.e. the OpId of the last actionable message, we
// know that after the split there are no more actionable messages, and this confirms that the
// SPLIT_OP was successful.
if (split_op_id.term == checkpoint.term() && split_op_id.index == checkpoint.index()) {
report_tablet_split = true;
}

if (consumption) {
consumption.Add(resp->SpaceUsedLong());
}
@@ -1017,6 +1100,12 @@ Status GetChangesForCDCSDK(
<< resp->cdc_sdk_checkpoint().ShortDebugString();
VLOG(1) << "The checkpoint is not updated " << resp->checkpoint().ShortDebugString();
}

if (report_tablet_split) {
return STATUS_FORMAT(
TabletSplit, "Tablet Split on tablet: $0, no more records to stream", tablet_id);
}

return Status::OK();
}
