Skip to content

Commit

Permalink
[#19138] Docdb/ASH: Defining Macros and wait states for ASH
Browse files Browse the repository at this point in the history
Summary:
Implements the basic infrastructure for keeping track of the wait states.

WaitStateInfo is the class that stores the per-rpc wait-states.
Each incoming RPC owns one such object, and it is updated at various points
in the code using a framework that is similar to util::Trace.

We define various macros to adopt wait-state info objects -- to update the thread-local wait state
to be used in other parts of the code.

WaitStateInfo largely consists of

# an enum class WaitStateCode which is expected to be updated frequently;
# some metadata (AshMetadata) which should generally not change during the life of the Rpc; and
# AshAuxInfo contains additional/optional information which may be exported.

Upgrade/Rollback safety: Safe to upgrade/downgrade. Protos only affect the new functionality -- ASH, and do not interfere with existing functionality. ASH will be unavailable if downgraded.

Test Plan:
Jenkins: compile only
ybd --cxx-test wait_state-test

Reviewers: arybochkin, jason

Reviewed By: arybochkin

Subscribers: jason, hsunder, asaha, hbhanawat, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D28647
  • Loading branch information
amitanandaiyer committed Nov 17, 2023
1 parent e8b3b16 commit a77214b
Show file tree
Hide file tree
Showing 7 changed files with 631 additions and 2 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,7 @@ add_postgres_shared_library(pq "${LIBPQ_SHARED_LIB}")
add_postgres_shared_library(yb_pgbackend "${YB_PGBACKEND_SHARED_LIB}")

set(YB_SUBDIR_NAMES
ash
bfcommon
bfpg
bfql
Expand Down
38 changes: 38 additions & 0 deletions src/yb/ash/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright (c) YugaByteDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing permissions and limitations
# under the License.
#

#######################################
# yb_ash
#######################################


set(ASH_SRCS
wait_state.cc
)

set(ASH_LIBS
yb_common
yb_common_proto
yb_util
)

ADD_YB_LIBRARY(yb_ash
SRCS ${ASH_SRCS}
DEPS ${ASH_LIBS})

#######################################
# wait_state-test
#######################################

ADD_YB_TEST(wait_state-test)
YB_TEST_TARGET_LINK_LIBRARIES(wait_state-test yb_ash)
86 changes: 86 additions & 0 deletions src/yb/ash/wait_state-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/ash/wait_state.h"

#include "yb/common/common.pb.h"

#include "yb/util/random_util.h"
#include "yb/util/test_util.h"

namespace yb {

using yb::ash::AshAuxInfo;
using yb::ash::AshMetadata;

class WaitStateTest : public YBTest {};

HostPort RandomHostPort() {
return HostPort(
Format("host-$0", RandomUniformInt<uint16_t>(0, 10)), RandomUniformInt<uint16_t>());
}

AshMetadata GenerateRandomMetadata() {
return AshMetadata{
.root_request_id{Uuid::Generate()},
.yql_endpoint_tserver_uuid{Uuid::Generate()},
.query_id = RandomUniformInt<int64_t>(),
.rpc_request_id = RandomUniformInt<int64_t>(),
.client_host_port = RandomHostPort()};
}

TEST(WaitStateTest, TestToAndFromPB) {
AshMetadata meta1 = GenerateRandomMetadata();
AshMetadataPB pb;
meta1.ToPB(&pb);
ASSERT_EQ(pb.root_request_id().size(), kUuidSize);
ASSERT_EQ(pb.yql_endpoint_tserver_uuid().size(), kUuidSize);
AshMetadata meta2 = AshMetadata::FromPB(pb);
ASSERT_EQ(meta1.root_request_id, meta2.root_request_id);
ASSERT_EQ(meta1.yql_endpoint_tserver_uuid, meta2.yql_endpoint_tserver_uuid);
ASSERT_EQ(meta1.query_id, meta2.query_id);
ASSERT_EQ(meta1.rpc_request_id, meta2.rpc_request_id);
ASSERT_EQ(meta1.client_host_port, meta2.client_host_port);
}

TEST(WaitStateTest, TestUpdate) {
AshMetadata meta1 = GenerateRandomMetadata();
const AshMetadata meta1_copy = meta1;
// Update 3 fields, rest unset.
AshMetadataPB pb1;
auto pb1_root_request_id = Uuid::Generate();
pb1_root_request_id.ToBytes(pb1.mutable_root_request_id());
pb1.set_query_id(RandomUniformInt<int64_t>());
HostPortToPB(RandomHostPort(), pb1.mutable_client_host_port());
meta1.UpdateFrom(AshMetadata::FromPB(pb1));
ASSERT_EQ(meta1.root_request_id, pb1_root_request_id);
ASSERT_EQ(meta1.yql_endpoint_tserver_uuid, meta1_copy.yql_endpoint_tserver_uuid);
ASSERT_EQ(meta1.query_id, pb1.query_id());
ASSERT_EQ(meta1.rpc_request_id, meta1_copy.rpc_request_id);
ASSERT_EQ(meta1.client_host_port, HostPortFromPB(pb1.client_host_port()));

meta1 = meta1_copy;
// Update 2 other fields, rest unset.
AshMetadataPB pb2;
auto pb2_yql_endpoint_tserver_uuid = Uuid::Generate();
pb2_yql_endpoint_tserver_uuid.ToBytes(pb2.mutable_yql_endpoint_tserver_uuid());
pb2.set_rpc_request_id(RandomUniformInt<int64_t>());
meta1.UpdateFrom(AshMetadata::FromPB(pb2));
ASSERT_EQ(meta1.root_request_id, meta1_copy.root_request_id);
ASSERT_EQ(meta1.yql_endpoint_tserver_uuid, pb2_yql_endpoint_tserver_uuid);
ASSERT_EQ(meta1.query_id, meta1_copy.query_id);
ASSERT_EQ(meta1.rpc_request_id, pb2.rpc_request_id());
ASSERT_EQ(meta1.client_host_port, meta1_copy.client_host_port);
}

} // namespace yb
160 changes: 160 additions & 0 deletions src/yb/ash/wait_state.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/ash/wait_state.h"

#include <arpa/inet.h>

#include "yb/util/tostring.h"
#include "yb/util/trace.h"

namespace yb::ash {

namespace {
// The current wait_state_ for this thread.
thread_local WaitStateInfoPtr threadlocal_wait_state_;
}

void AshMetadata::set_client_host_port(const HostPort &host_port) {
client_host_port = host_port;
}

std::string AshMetadata::ToString() const {
return YB_STRUCT_TO_STRING(
yql_endpoint_tserver_uuid, root_request_id, query_id, rpc_request_id, client_host_port);
}

std::string AshAuxInfo::ToString() const {
return YB_STRUCT_TO_STRING(table_id, tablet_id, method);
}

void AshAuxInfo::UpdateFrom(const AshAuxInfo &other) {
if (!other.tablet_id.empty()) {
tablet_id = other.tablet_id;
}
if (!other.table_id.empty()) {
table_id = other.table_id;
}
if (!other.method.empty()) {
method = other.method;
}
}

WaitStateInfo::WaitStateInfo(AshMetadata &&meta)
: metadata_(std::move(meta)) {}

void WaitStateInfo::set_code(WaitStateCode c) {
TRACE(ash::ToString(c));
code_ = c;
}

WaitStateCode WaitStateInfo::code() const {
return code_;
}

std::atomic<WaitStateCode>& WaitStateInfo::mutable_code() {
return code_;
}

std::string WaitStateInfo::ToString() const {
std::lock_guard lock(mutex_);
return YB_CLASS_TO_STRING(metadata, code, aux_info);
}

void WaitStateInfo::set_rpc_request_id(int64_t rpc_request_id) {
std::lock_guard lock(mutex_);
metadata_.rpc_request_id = rpc_request_id;
}

void WaitStateInfo::set_root_request_id(const Uuid &root_request_id) {
std::lock_guard lock(mutex_);
metadata_.root_request_id = root_request_id;
}

void WaitStateInfo::set_query_id(int64_t query_id) {
std::lock_guard lock(mutex_);
metadata_.query_id = query_id;
}

int64_t WaitStateInfo::query_id() {
std::lock_guard lock(mutex_);
return metadata_.query_id;
}

void WaitStateInfo::set_client_host_port(const HostPort &host_port) {
std::lock_guard lock(mutex_);
metadata_.set_client_host_port(host_port);
}

void WaitStateInfo::set_yql_endpoint_tserver_uuid(const Uuid &yql_endpoint_tserver_uuid) {
std::lock_guard lock(mutex_);
metadata_.yql_endpoint_tserver_uuid = yql_endpoint_tserver_uuid;
}

void WaitStateInfo::UpdateMetadata(const AshMetadata &meta) {
std::lock_guard lock(mutex_);
metadata_.UpdateFrom(meta);
}

void WaitStateInfo::UpdateAuxInfo(const AshAuxInfo &aux) {
std::lock_guard lock(mutex_);
aux_info_.UpdateFrom(aux);
}

void WaitStateInfo::SetCurrentWaitState(WaitStateInfoPtr wait_state) {
threadlocal_wait_state_ = std::move(wait_state);
}

WaitStateInfoPtr WaitStateInfo::CurrentWaitState() {
if (!threadlocal_wait_state_) {
VLOG_WITH_FUNC(3) << " returning nullptr";
}
return threadlocal_wait_state_;
}

//
// ScopedWaitState
//
ScopedAdoptWaitState::ScopedAdoptWaitState(WaitStateInfoPtr wait_state)
: prev_state_(WaitStateInfo::CurrentWaitState()) {
WaitStateInfo::SetCurrentWaitState(std::move(wait_state));
}

ScopedAdoptWaitState::~ScopedAdoptWaitState() {
WaitStateInfo::SetCurrentWaitState(std::move(prev_state_));
}

//
// ScopedWaitStatus
//
ScopedWaitStatus::ScopedWaitStatus(WaitStateInfoPtr wait_state, WaitStateCode code)
: wait_state_(std::move(wait_state)), code_(code) {
if (wait_state_) {
prev_code_ = wait_state_->code();
wait_state_->set_code(code_);
}
}

ScopedWaitStatus::~ScopedWaitStatus() {
if (wait_state_) {
auto expected = code_;
if (!wait_state_->mutable_code().compare_exchange_strong(expected, prev_code_)) {
VLOG(3) << __func__ << " not reverting to prev_code_: " << prev_code_ << " since "
<< " current_code: " << expected << " is not " << code_;
}
}
wait_state_ = nullptr;
prev_code_ = WaitStateCode::Unused;
}

} // namespace yb::ash
Loading

0 comments on commit a77214b

Please sign in to comment.