Skip to content

Commit

Permalink
[yugabyte#11409] DocDB: Skip creating write batch while applying and …
Browse files Browse the repository at this point in the history
…removing intents

Summary:
Recently we added DirectWriter to write to RocksDB w/o creating WriteBatch.
I.e. write directly to MemTable.

Here we use DirectWriter to apply and remove intents.

Test Plan: Jenkins

Reviewers: mbautin, timur

Reviewed By: timur

Subscribers: kannan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D15582
  • Loading branch information
spolitov authored and jayant anand committed Mar 8, 2022
1 parent b862cfb commit 93fc86a
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 314 deletions.
273 changes: 0 additions & 273 deletions src/yb/docdb/docdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include "yb/docdb/pgsql_operation.h"
#include "yb/docdb/rocksdb_writer.h"
#include "yb/docdb/subdocument.h"
#include "yb/docdb/transaction_dump.h"
#include "yb/docdb/value.h"
#include "yb/docdb/value_type.h"

Expand Down Expand Up @@ -79,13 +78,6 @@ using strings::Substitute;

using namespace std::placeholders;

DEFINE_test_flag(bool, fail_on_replicated_batch_idx_set_in_txn_record, false,
"Fail when a set of replicated batch indexes is found in txn record.");
DEFINE_int32(txn_max_apply_batch_records, 100000,
"Max number of apply records allowed in single RocksDB batch. "
"When a transaction's data in one tablet does not fit into specified number of "
"records, it will be applied using multiple RocksDB write batches.");

namespace yb {
namespace docdb {

Expand Down Expand Up @@ -753,271 +745,6 @@ void AppendTransactionKeyPrefix(const TransactionId& transaction_id, KeyBytes* o
out->AppendRawBytes(transaction_id.AsSlice());
}

CHECKED_STATUS IntentToWriteRequest(
const Slice& transaction_id_slice,
const AbortedSubTransactionSet& aborted,
HybridTime commit_ht,
const Slice& reverse_index_key,
const Slice& reverse_index_value,
rocksdb::Iterator* intent_iter,
rocksdb::WriteBatch* regular_batch,
IntraTxnWriteId* write_id) {
DocHybridTimeBuffer doc_ht_buffer;
intent_iter->Seek(reverse_index_value);
if (!intent_iter->Valid() || intent_iter->key() != reverse_index_value) {
Slice temp_slice = reverse_index_value;
auto value_doc_ht = DocHybridTime::DecodeFromEnd(&temp_slice);
temp_slice = reverse_index_key;
auto key_doc_ht = DocHybridTime::DecodeFromEnd(&temp_slice);
LOG(ERROR) << "Unable to find intent: " << reverse_index_value.ToDebugHexString()
<< " (" << value_doc_ht << ") for "
<< reverse_index_key.ToDebugHexString() << "(" << key_doc_ht << ")";
std::this_thread::sleep_for(std::chrono::seconds(1000));

LOG(DFATAL) << "Unable to find intent: " << reverse_index_value.ToDebugHexString()
<< " (" << value_doc_ht << ") for "
<< reverse_index_key.ToDebugHexString() << "(" << key_doc_ht << ")";
return Status::OK();
}
auto intent = VERIFY_RESULT(ParseIntentKey(intent_iter->key(), transaction_id_slice));

if (intent.types.Test(IntentType::kStrongWrite)) {
auto decoded_value = VERIFY_RESULT(
DecodeIntentValue(intent_iter->value(), &transaction_id_slice));

// Write id should match to one that were calculated during append of intents.
// Doing it just for sanity check.
RSTATUS_DCHECK_GE(
decoded_value.write_id, *write_id,
Corruption,
Format("Unexpected write id. Expected: $0, found: $1, raw value: $2",
*write_id,
decoded_value.write_id,
intent_iter->value().ToDebugHexString()));
*write_id = decoded_value.write_id;

// Intents for row locks should be ignored (i.e. should not be written as regular records).
if (decoded_value.body.starts_with(ValueTypeAsChar::kRowLock)) {
return Status::OK();
}

// Intents from aborted subtransactions should not be written as regular records.
if (aborted.Test(decoded_value.subtransaction_id)) {
return Status::OK();
}

// After strip of prefix and suffix intent_key contains just SubDocKey w/o a hybrid time.
// Time will be added when writing batch to RocksDB.
std::array<Slice, 2> key_parts = {{
intent.doc_path,
doc_ht_buffer.EncodeWithValueType(commit_ht, *write_id),
}};
std::array<Slice, 2> value_parts = {{
intent.doc_ht,
decoded_value.body,
}};

// Useful when debugging transaction failure.
#if defined(DUMP_APPLY)
SubDocKey sub_doc_key;
CHECK_OK(sub_doc_key.FullyDecodeFrom(intent.doc_path, HybridTimeRequired::kFalse));
if (!sub_doc_key.subkeys().empty()) {
auto txn_id = FullyDecodeTransactionId(transaction_id_slice);
LOG(INFO) << "Apply: " << sub_doc_key.ToString()
<< ", time: " << commit_ht << ", write id: " << *write_id << ", txn: " << txn_id
<< ", value: " << intent_value.ToDebugString();
}
#endif

regular_batch->Put(key_parts, value_parts);
++*write_id;
}

return Status::OK();
}

template <size_t N>
void PutApplyState(
const Slice& transaction_id_slice, HybridTime commit_ht, IntraTxnWriteId write_id,
const std::array<Slice, N>& value_parts, rocksdb::WriteBatch* regular_batch) {
char transaction_apply_state_value_type = ValueTypeAsChar::kTransactionApplyState;
char group_end_value_type = ValueTypeAsChar::kGroupEnd;
char hybrid_time_value_type = ValueTypeAsChar::kHybridTime;
DocHybridTime doc_hybrid_time(commit_ht, write_id);
char doc_hybrid_time_buffer[kMaxBytesPerEncodedHybridTime];
char* doc_hybrid_time_buffer_end = doc_hybrid_time.EncodedInDocDbFormat(
doc_hybrid_time_buffer);
std::array<Slice, 5> key_parts = {{
Slice(&transaction_apply_state_value_type, 1),
transaction_id_slice,
Slice(&group_end_value_type, 1),
Slice(&hybrid_time_value_type, 1),
Slice(doc_hybrid_time_buffer, doc_hybrid_time_buffer_end),
}};
regular_batch->Put(key_parts, value_parts);
}

ApplyTransactionState StoreApplyState(
const Slice& transaction_id_slice, const Slice& key, IntraTxnWriteId write_id,
const AbortedSubTransactionSet& aborted, HybridTime commit_ht,
rocksdb::WriteBatch* regular_batch) {
auto result = ApplyTransactionState {
.key = key.ToBuffer(),
.write_id = write_id,
.aborted = aborted,
};
ApplyTransactionStatePB pb;
result.ToPB(&pb);
aborted.ToPB(pb.mutable_aborted()->mutable_set());
pb.set_commit_ht(commit_ht.ToUint64());
faststring encoded_pb;
pb_util::SerializeToString(pb, &encoded_pb);
char string_value_type = ValueTypeAsChar::kString;
std::array<Slice, 2> value_parts = {{
Slice(&string_value_type, 1),
Slice(encoded_pb.data(), encoded_pb.size())
}};
PutApplyState(transaction_id_slice, commit_ht, write_id, value_parts, regular_batch);
return result;
}

Result<ApplyTransactionState> PrepareApplyIntentsBatch(
const TabletId& tablet_id,
const TransactionId& transaction_id,
const AbortedSubTransactionSet& aborted,
HybridTime commit_ht,
const KeyBounds* key_bounds,
const ApplyTransactionState* apply_state,
HybridTime log_ht,
rocksdb::WriteBatch* regular_batch,
rocksdb::DB* intents_db,
rocksdb::WriteBatch* intents_batch) {
SCHECK_EQ((regular_batch != nullptr) + (intents_batch != nullptr), 1, InvalidArgument,
"Exactly one write batch should be non-null, either regular or intents");

// In case we have passed in a non-null apply_state, it's aborted set will have been loaded from
// persisted apply state, and the passed in aborted set will correspond to the aborted set at
// commit time. Rather then copy that set upstream so it is passed in as aborted, we simply grab
// a reference to it here, if it is defined, to use in this method.
const auto& latest_aborted_set = apply_state ? apply_state->aborted : aborted;

// regular_batch or intents_batch could be null. In this case we don't fill apply batch for
// appropriate DB.

KeyBytes txn_reverse_index_prefix;
Slice transaction_id_slice = transaction_id.AsSlice();
AppendTransactionKeyPrefix(transaction_id, &txn_reverse_index_prefix);
txn_reverse_index_prefix.AppendValueType(ValueType::kMaxByte);
Slice key_prefix = txn_reverse_index_prefix.AsSlice();
key_prefix.remove_suffix(1);
const Slice reverse_index_upperbound = txn_reverse_index_prefix.AsSlice();

auto reverse_index_iter = CreateRocksDBIterator(
intents_db, &KeyBounds::kNoBounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
rocksdb::kDefaultQueryId, nullptr /* read_filter */, &reverse_index_upperbound);

BoundedRocksDbIterator intent_iter;

// If we don't have regular_batch, it means that we are just removing intents, i.e. when a
// transaction has been aborted. We don't need the intent iterator in that case, because the
// reverse index iterator is sufficient.
if (regular_batch) {
intent_iter = CreateRocksDBIterator(
intents_db, key_bounds, BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
rocksdb::kDefaultQueryId);
}

reverse_index_iter.Seek(key_prefix);

DocHybridTimeBuffer doc_ht_buffer;

const auto& log_prefix = intents_db->GetOptions().log_prefix;

IntraTxnWriteId write_id = 0;
if (apply_state) {
reverse_index_iter.Seek(apply_state->key);
write_id = apply_state->write_id;
if (regular_batch) {
// This sanity check is invalid for remove case, because .SST file could be deleted.
LOG_IF(DFATAL, !reverse_index_iter.Valid() || reverse_index_iter.key() != apply_state->key)
<< "Continue from wrong key: " << Slice(apply_state->key).ToDebugString() << ", txn: "
<< transaction_id << ", position: "
<< (reverse_index_iter.Valid() ? reverse_index_iter.key().ToDebugString() : "<INVALID>")
<< ", write id: " << write_id;
}
}

const uint64_t max_records = FLAGS_txn_max_apply_batch_records;
const uint64_t write_id_limit = write_id + max_records;
while (reverse_index_iter.Valid()) {
const Slice key_slice(reverse_index_iter.key());

if (!key_slice.starts_with(key_prefix)) {
break;
}

VLOG(4) << log_prefix << "Apply reverse index record to ["
<< (regular_batch ? "R" : "") << (intents_batch ? "I" : "")
<< "]: " << EntryToString(reverse_index_iter, StorageDbType::kIntents);

// At this point, txn_reverse_index_prefix is a prefix of key_slice. If key_slice is equal to
// txn_reverse_index_prefix in size, then they are identical, and we are seeked to transaction
// metadata. Otherwise, we're seeked to an intent entry in the index which we may process.
if (key_slice.size() > txn_reverse_index_prefix.size()) {
auto reverse_index_value = reverse_index_iter.value();
if (!reverse_index_value.empty() && reverse_index_value[0] == ValueTypeAsChar::kBitSet) {
CHECK(!FLAGS_TEST_fail_on_replicated_batch_idx_set_in_txn_record);
reverse_index_value.remove_prefix(1);
RETURN_NOT_OK(OneWayBitmap::Skip(&reverse_index_value));
}

// Value of reverse index is a key of original intent record, so seek it and check match.
if (regular_batch && IsWithinBounds(key_bounds, reverse_index_value)) {
// We store apply state only if there are some more intents left.
// So doing this check here, instead of right after write_id was incremented.
if (write_id >= write_id_limit) {
return StoreApplyState(
transaction_id_slice, key_slice, write_id, latest_aborted_set, commit_ht,
regular_batch);
}
RETURN_NOT_OK(IntentToWriteRequest(
transaction_id_slice, latest_aborted_set, commit_ht, key_slice, reverse_index_value,
&intent_iter, regular_batch, &write_id));
}

if (intents_batch) {
intents_batch->SingleDelete(reverse_index_value);
}
}

if (intents_batch) {
if (intents_batch->Count() >= max_records) {
// No need to return with .aborted, since this branch is only hit during intent clean-up.
return ApplyTransactionState {
.key = key_slice.ToBuffer(),
.write_id = write_id,
};
}

intents_batch->SingleDelete(key_slice);
}

reverse_index_iter.Next();
}

if (apply_state && regular_batch) {
char tombstone_value_type = ValueTypeAsChar::kTombstone;
std::array<Slice, 1> value_parts = {{Slice(&tombstone_value_type, 1)}};
PutApplyState(transaction_id_slice, commit_ht, write_id, value_parts, regular_batch);
}

if (regular_batch) {
YB_TRANSACTION_DUMP(
ApplyIntents, tablet_id, transaction_id_slice, log_ht, regular_batch->Data());
}
return ApplyTransactionState {};
}

std::string ApplyTransactionState::ToString() const {
return Format(
"{ key: $0 write_id: $1 aborted: $2 }", Slice(key).ToDebugString(), write_id, aborted);
Expand Down
12 changes: 0 additions & 12 deletions src/yb/docdb/docdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,18 +231,6 @@ struct ApplyTransactionState {
}
};

Result<ApplyTransactionState> PrepareApplyIntentsBatch(
const TabletId& tablet_id,
const TransactionId& transaction_id,
const AbortedSubTransactionSet& aborted,
HybridTime commit_ht,
const KeyBounds* key_bounds,
const ApplyTransactionState* apply_state,
HybridTime log_ht,
rocksdb::WriteBatch* regular_batch,
rocksdb::DB* intents_db,
rocksdb::WriteBatch* intents_batch);

void AppendTransactionKeyPrefix(const TransactionId& transaction_id, docdb::KeyBytes* out);

// Class that is used while combining external intents into single key value pair.
Expand Down
4 changes: 4 additions & 0 deletions src/yb/docdb/docdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class DirectWriteToWriteBatchHandler : public rocksdb::DirectWriteHandler {
write_batch_->Put(key, value);
}

void SingleDelete(const Slice& key) override {
write_batch_->SingleDelete(key);
}

private:
rocksdb::WriteBatch *write_batch_;
};
Expand Down
Loading

0 comments on commit 93fc86a

Please sign in to comment.