Skip to content

Commit

Permalink
[yugabyte#16879] DocDB: Extract encoding related part from docdb to d…
Browse files Browse the repository at this point in the history
…ockv

Summary:
DocDB library consists of two parts.
The first part is responsible to key-value encoding and decoding.
The second part is responsible for fetching data rocksdb and other high level communications.
They are partially separated to docdb and docdb_encoding libraries libraries.
But files related to both parts still placed in docdb source directory.

This diff places files related to encoding to newly introduced dockv library.
So all modules that depended on docdb_encoding now depends on dockv.
Also moved other parts that are encoding related to this new library.

Test Plan: Jenkins

Reviewers: mbautin, skumar, timur, bogdan

Reviewed By: timur, bogdan

Subscribers: bogdan, asrivastava, ybase, xCluster

Differential Revision: https://phabricator.dev.yugabyte.com/D24491
  • Loading branch information
spolitov committed Apr 17, 2023
1 parent 87d2426 commit d2a4a64
Show file tree
Hide file tree
Showing 280 changed files with 2,952 additions and 2,762 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ set(YB_SUBDIR_NAMES
common
consensus
docdb
dockv
encryption
fs
gen_yrpc
Expand Down
6 changes: 3 additions & 3 deletions src/yb/cdc/cdc_common_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
#include "yb/consensus/replicate_msgs_holder.h"

#include "yb/docdb/docdb.h"
#include "yb/docdb/primitive_value.h"
#include "yb/dockv/primitive_value.h"
#include "yb/docdb/ql_rowwise_iterator_interface.h"
#include "yb/docdb/value_type.h"
#include "yb/dockv/value_type.h"

#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_metadata.h"
Expand Down Expand Up @@ -62,7 +62,7 @@ void AddColumnToMap(

void AddProtoRecordColumnToMap(
const ColumnSchema &col_schema,
const docdb::PrimitiveValue &col,
const dockv::PrimitiveValue &col,
cdc::KeyValuePairPB *kv_pair,
bool is_proto_record,
DatumMessagePB *cdc_datum_message = nullptr);
Expand Down
36 changes: 18 additions & 18 deletions src/yb/cdc/cdc_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
#include "yb/consensus/raft_consensus.h"
#include "yb/consensus/replicate_msgs_holder.h"

#include "yb/docdb/doc_key.h"
#include "yb/dockv/doc_key.h"
#include "yb/docdb/docdb.pb.h"
#include "yb/docdb/primitive_value.h"
#include "yb/docdb/value.h"
#include "yb/docdb/value_type.h"
#include "yb/dockv/primitive_value.h"
#include "yb/dockv/value.h"
#include "yb/dockv/value_type.h"

#include "yb/master/master_defaults.h"

Expand Down Expand Up @@ -72,7 +72,7 @@ namespace cdc {
using namespace std::chrono_literals;
using consensus::ReplicateMsgPtr;
using consensus::ReplicateMsgs;
using docdb::PrimitiveValue;
using dockv::PrimitiveValue;
using tablet::TransactionParticipant;

std::shared_ptr<StreamMetadata::StreamTabletMetadata> StreamMetadata::GetTabletMetadata(
Expand All @@ -96,13 +96,13 @@ std::shared_ptr<StreamMetadata::StreamTabletMetadata> StreamMetadata::GetTabletM
}

void AddColumnToMap(const ColumnSchema& col_schema,
const docdb::KeyEntryValue& col,
const dockv::KeyEntryValue& col,
cdc::KeyValuePairPB* kv_pair) {
kv_pair->set_key(col_schema.name());
col.ToQLValuePB(col_schema.type(), kv_pair->mutable_value());
}

void AddPrimaryKey(const docdb::SubDocKey& decoded_key,
void AddPrimaryKey(const dockv::SubDocKey& decoded_key,
const Schema& tablet_schema,
CDCRecordPB* record) {
size_t i = 0;
Expand Down Expand Up @@ -356,11 +356,11 @@ Status PopulateWriteRecord(const ReplicateMsgPtr& msg,
for (const auto& write_pair : batch.write_pairs()) {
Slice key = write_pair.key();
const auto key_size = VERIFY_RESULT(
docdb::DocKey::EncodedSize(key, docdb::DocKeyPart::kWholeDocKey));
dockv::DocKey::EncodedSize(key, dockv::DocKeyPart::kWholeDocKey));

Slice value_slice = write_pair.value();
RETURN_NOT_OK(docdb::ValueControlFields::Decode(&value_slice));
auto value_type = docdb::DecodeValueEntryType(value_slice);
RETURN_NOT_OK(dockv::ValueControlFields::Decode(&value_slice));
auto value_type = dockv::DecodeValueEntryType(value_slice);

// Compare key hash with previously seen key hash to determine whether the write pair
// is part of the same row or not.
Expand All @@ -369,8 +369,8 @@ Status PopulateWriteRecord(const ReplicateMsgPtr& msg,
// Write pair contains record for different row. Create a new CDCRecord in this case.
record = resp->add_records();
Slice sub_doc_key = key;
docdb::SubDocKey decoded_key;
RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, docdb::HybridTimeRequired::kFalse));
dockv::SubDocKey decoded_key;
RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, dockv::HybridTimeRequired::kFalse));

if (metadata.record_format == CDCRecordFormat::WAL) {
// For xCluster, populate serialized data from WAL, to avoid unnecessary deserializing on
Expand All @@ -379,7 +379,7 @@ Status PopulateWriteRecord(const ReplicateMsgPtr& msg,
if (decoded_key.doc_key().has_hash()) {
// TODO: is there another way of getting this? Perhaps using kUpToHashOrFirstRange?
kv_pair->set_key(
PartitionSchema::EncodeMultiColumnHashValue(decoded_key.doc_key().hash()));
dockv::PartitionSchema::EncodeMultiColumnHashValue(decoded_key.doc_key().hash()));
} else {
kv_pair->set_key(decoded_key.doc_key().Encode().ToStringBuffer());
}
Expand All @@ -389,7 +389,7 @@ Status PopulateWriteRecord(const ReplicateMsgPtr& msg,
}

// Check whether operation is WRITE or DELETE.
if (value_type == docdb::ValueEntryType::kTombstone && decoded_key.num_subkeys() == 0) {
if (value_type == dockv::ValueEntryType::kTombstone && decoded_key.num_subkeys() == 0) {
record->set_operation(CDCRecordPB::DELETE);
} else {
record->set_operation(CDCRecordPB::WRITE);
Expand Down Expand Up @@ -420,16 +420,16 @@ Status PopulateWriteRecord(const ReplicateMsgPtr& msg,
kv_pair->set_key(write_pair.key().ToBuffer());
kv_pair->mutable_value()->set_binary_value(write_pair.value().ToBuffer());
} else if (record->operation() == CDCRecordPB_OperationType_WRITE) {
docdb::KeyEntryValue column_id;
dockv::KeyEntryValue column_id;
Slice key_column = write_pair.key().WithoutPrefix(key_size);
RETURN_NOT_OK(column_id.DecodeFromKey(&key_column));
if (column_id.type() == docdb::KeyEntryType::kColumnId) {
docdb::Value decoded_value;
if (column_id.type() == dockv::KeyEntryType::kColumnId) {
dockv::Value decoded_value;
RETURN_NOT_OK(decoded_value.Decode(write_pair.value()));

const ColumnSchema& col = VERIFY_RESULT(schema.column_by_id(column_id.GetColumnId()));
AddColumnToMap(col, decoded_value.primitive_value(), record->add_changes());
} else if (column_id.type() != docdb::KeyEntryType::kSystemColumnId) {
} else if (column_id.type() != dockv::KeyEntryType::kSystemColumnId) {
LOG(DFATAL) << "Unexpected value type in key: " << column_id.type();
}
}
Expand Down
Loading

0 comments on commit d2a4a64

Please sign in to comment.