Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build table snapshot on write node for disaggregated read from S3 #6919

Merged
merged 15 commits into from
Mar 6, 2023
6 changes: 6 additions & 0 deletions dbms/src/Common/Config/TOMLConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ bool TOMLConfiguration::getRaw(const std::string & key, std::string & value) con
{
try
{
#ifndef NDEBUG
// Avoid the exception thrown in `get_qualified`, which makes debugging hard.
// The existence check costs extra time, so in release mode we just throw and catch to handle non-existent keys.
if (!root->contains_qualified(key))
return false;
#endif
auto node = root->get_qualified(key);
if (auto str_node = std::dynamic_pointer_cast<cpptoml::value<std::string>>(node))
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ extern const int PAGE_SIZE_NOT_MATCH = 9006;
extern const int ILLFORMED_PAGE_NAME = 9007;
extern const int ILLFORMAT_RAFT_ROW = 9008;
extern const int REGION_DATA_SCHEMA_UPDATED = 9009;
extern const int REGION_EPOCH_NOT_MATCH = 9010;

extern const int LOCK_EXCEPTION = 10000;
extern const int VERSION_ERROR = 10001;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ namespace DB
M(tiflash_disaggregated_object_lock_request_duration_seconds, "Bucketed histogram of S3 object lock/delete request duration", Histogram, \
F(type_lock, {{"type", "cop"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delete, {{"type", "batch"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_disaggregated_read_tasks_count, "Total number of storage engine disaggregated read tasks", Counter) \
M(tiflash_raft_command_duration_seconds, "Bucketed histogram of some raft command: apply snapshot", \
Histogram, /* these commands usually cost several seconds, increase the start bucket to 50ms */ \
F(type_ingest_sst, {{"type", "ingest_sst"}}, ExpBuckets{0.05, 2, 10}), \
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <common/defines.h>
#include <common/types.h>

#define DBMS_NAME "ClickHouse"
#define DBMS_VERSION_MAJOR 1
Expand Down Expand Up @@ -48,6 +49,8 @@
#define DEFAULT_MPP_TASK_RUNNING_TIMEOUT (DEFAULT_MPP_TASK_TIMEOUT + 30)
#define DEFAULT_MPP_TASK_WAITING_TIMEOUT 36000

static constexpr UInt64 DEFAULT_DISAGG_TASK_TIMEOUT_SEC = 3600;

#define DEFAULT_DAG_RECORDS_PER_CHUNK 1024L
#define DEFAULT_BATCH_SEND_MIN_LIMIT (-1)

Expand Down
25 changes: 25 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,31 @@ DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMet
initOutputInfo();
}

// Constructor used when a write node (WN) serves a disaggregated read task:
// the compute node (RN) ships the DAG request plus the region info it wants
// to read, and this DAGContext drives building the table snapshot on the WN.
DAGContext::DAGContext(const tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_)
: dag_request(&dag_request_)
, dummy_query_string(dag_request->DebugString())
, dummy_ast(makeDummyQuery())
// For a disaggregated read, `tidb_host` holds the compute node's host
// rather than a TiDB host.
, tidb_host(compute_node_host_)
, collect_execution_summaries(dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries())
// A disaggregated task is neither an MPP task nor a batch-cop task.
, is_mpp_task(false)
, is_root_mpp_task(false)
, is_batch_cop(false)
, is_disaggregated_task(true)
, tables_regions_info(std::move(tables_regions_info_))
, log(std::move(log_))
, flags(dag_request->flags())
, sql_mode(dag_request->sql_mode())
// Copy the task id so the WN can identify/register the snapshot for this task.
, disaggregated_id(std::make_unique<DM::DisaggTaskId>(task_id_))
, max_recorded_error_count(getMaxErrorCount(*dag_request))
, warnings(max_recorded_error_count)
, warning_count(0)
{
// A disaggregated request must carry a root executor with an executor id.
RUNTIME_CHECK(dag_request->has_root_executor() && dag_request->root_executor().has_executor_id());
// NOTE(review): after the RUNTIME_CHECK above, the first operand of `||` is
// always true, so `return_executor_id` is always true here; the right-hand
// operand is kept for symmetry with the other constructors.
return_executor_id = dag_request->root_executor().has_executor_id() || dag_request->executors(0).has_executor_id();

initOutputInfo();
}

// for test
DAGContext::DAGContext(UInt64 max_error_count_)
: dag_request(nullptr)
Expand Down
18 changes: 16 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <Storages/DeltaMerge/ScanContext.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#ifdef __clang__
Expand All @@ -33,6 +32,8 @@
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/SubqueryForSet.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/DeltaMerge/Remote/DisaggTaskId.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
Expand Down Expand Up @@ -132,6 +133,9 @@ class DAGContext
// for mpp
DAGContext(const tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_);

// for disaggregated task on write node
DAGContext(const tipb::DAGRequest & dag_request_, const DM::DisaggTaskId & task_id_, TablesRegionsInfo && tables_regions_info_, const String & compute_node_host_, LoggerPtr log_);

// for test
explicit DAGContext(UInt64 max_error_count_);

Expand Down Expand Up @@ -188,6 +192,10 @@ class DAGContext
{
return mpp_task_id;
}
const std::unique_ptr<DM::DisaggTaskId> & getDisaggTaskId() const
{
return disaggregated_id;
}

std::pair<bool, double> getTableScanThroughput();

Expand Down Expand Up @@ -278,13 +286,17 @@ class DAGContext
Clock::time_point read_wait_index_start_timestamp{Clock::duration::zero()};
Clock::time_point read_wait_index_end_timestamp{Clock::duration::zero()};
String table_scan_executor_id;

// For mpp/cop/batchcop this is the host of tidb
// For disaggregated read, this is the host of compute node
String tidb_host = "Unknown";
bool collect_execution_summaries{};
bool return_executor_id{};
String root_executor_id = "";
String root_executor_id;
/* const */ bool is_mpp_task = false;
/* const */ bool is_root_mpp_task = false;
/* const */ bool is_batch_cop = false;
/* const */ bool is_disaggregated_task = false; // a disagg task handling by the write node
// `tunnel_set` is always set by `MPPTask` and is intended to be used for `DAGQueryBlockInterpreter`.
MPPTunnelSetPtr tunnel_set;
TablesRegionsInfo tables_regions_info;
Expand Down Expand Up @@ -334,6 +346,8 @@ class DAGContext
UInt64 sql_mode;
mpp::TaskMeta mpp_task_meta;
const MPPTaskId mpp_task_id = MPPTaskId::unknown_mpp_task_id;
// The task id for disaggregated read
const std::unique_ptr<DM::DisaggTaskId> disaggregated_id;
/// max_recorded_error_count is the max error/warning need to be recorded in warnings
UInt64 max_recorded_error_count;
ConcurrentBoundedQueue<tipb::Error> warnings;
Expand Down
93 changes: 82 additions & 11 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@
#include <Flash/Coprocessor/DAGStorageInterpreter.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <Interpreters/Context.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshotManager.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/IManageableStorage.h>
#include <Storages/MutableSupport.h>
Expand All @@ -41,6 +44,7 @@
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/TMTContext.h>
#include <TiDB/Schema/SchemaSyncer.h>
#include <common/logger_useful.h>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
Expand All @@ -51,6 +55,11 @@

namespace DB
{
namespace ErrorCodes
{
extern const int REGION_EPOCH_NOT_MATCH;
} // namespace ErrorCodes

namespace FailPoints
{
extern const char region_exception_after_read_from_storage_some_error[];
Expand Down Expand Up @@ -223,13 +232,15 @@ void injectFailPointForLocalRead([[maybe_unused]] const SelectQueryInfo & query_
if (random() % 100 > 50)
region_ids.insert(info.region_id);
}
LOG_WARNING(Logger::get(), "failpoint inject region_exception_after_read_from_storage_some_error, throw RegionException with region_ids={}", region_ids);
throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND);
});
fiu_do_on(FailPoints::region_exception_after_read_from_storage_all_error, {
const auto & regions_info = query_info.mvcc_query_info->regions_query_info;
RegionException::UnavailableRegions region_ids;
for (const auto & info : regions_info)
region_ids.insert(info.region_id);
LOG_WARNING(Logger::get(), "failpoint inject region_exception_after_read_from_storage_all_error, throw RegionException with region_ids={}", region_ids);
throw RegionException(std::move(region_ids), RegionException::RegionReadStatus::NOT_FOUND);
});
}
Expand Down Expand Up @@ -276,20 +287,19 @@ DAGStorageInterpreter::DAGStorageInterpreter(
}
}

// Apply learner read to ensure we can get strong consistent with TiKV Region
// leaders. If the local Regions do not match the requested Regions, then build
// request to retry fetching data from other nodes.
void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
{
prepare();
prepare(); // learner read

executeImpl(pipeline);
}

void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
{
auto & dag_context = dagContext();

auto scan_context = std::make_shared<DM::ScanContext>();
dagContext().scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
mvcc_query_info->scan_context = scan_context;

if (!mvcc_query_info->regions_query_info.empty())
Expand All @@ -300,7 +310,18 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
// Should build `remote_requests` and `null_stream` under protect of `table_structure_lock`.
auto null_stream_if_empty = std::make_shared<NullBlockInputStream>(storage_for_logical_table->getSampleBlockForColumns(required_columns));

// Note that `buildRemoteRequests` must be called after `buildLocalStreams` because
// `buildLocalStreams` will set up `region_retry_from_local_region`, and we must
// retry those regions or data will be lost.
Comment on lines +313 to +315
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

detect by test case "tests/fullstack-test/fault-inject/exception_after_read_from_storage.test"
/cc @breezewish

auto remote_requests = buildRemoteRequests(scan_context);
if (dag_context.is_disaggregated_task && !remote_requests.empty())
{
// This means the RN sent requests with stale region info; we simply reject the
// request and ask the RN to retry with up-to-date region info. After refreshing
// its region info, the RN may send the requests to other WNs instead.

throw Exception("Rejected disaggregated DAG execute because RN region info does not match", DB::ErrorCodes::REGION_EPOCH_NOT_MATCH);
}

// A failpoint to test pause before alter lock released
FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired);
Expand Down Expand Up @@ -348,6 +369,9 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
}
}

// Apply learner read to ensure we can get strong consistent with TiKV Region
// leaders. If the local Regions do not match the requested Regions, then build
// request to retry fetching data from other nodes.
void DAGStorageInterpreter::prepare()
{
// About why we do learner read before acquiring structure lock on Storage(s).
Expand Down Expand Up @@ -600,6 +624,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(mvcc_query_info->resolve_locks, mvcc_query_info->read_tso, mvcc_query_info->scan_context);
ret.emplace(physical_table_id, std::move(query_info));
}
// Dispatch the regions_query_info to different physical table's query_info
for (auto & r : mvcc_query_info->regions_query_info)
{
ret[r.physical_table_id].mvcc_query_info->regions_query_info.push_back(r);
Expand Down Expand Up @@ -675,15 +700,17 @@ bool DAGStorageInterpreter::checkRetriableForBatchCopOrMPP(
}
}

void DAGStorageInterpreter::buildLocalStreamsForPhysicalTable(
DM::Remote::DisaggPhysicalTableReadSnapshotPtr
DAGStorageInterpreter::buildLocalStreamsForPhysicalTable(
const TableID & table_id,
const SelectQueryInfo & query_info,
DAGPipeline & pipeline,
size_t max_block_size)
{
DM::Remote::DisaggPhysicalTableReadSnapshotPtr table_snap;
size_t region_num = query_info.mvcc_query_info->regions_query_info.size();
if (region_num == 0)
return;
return table_snap;

assert(storages_with_structure_lock.find(table_id) != storages_with_structure_lock.end());
auto & storage = storages_with_structure_lock[table_id].storage;
Expand All @@ -693,8 +720,25 @@ void DAGStorageInterpreter::buildLocalStreamsForPhysicalTable(
{
try
{
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
if (!dag_context.is_disaggregated_task)
{
// build local inputstreams
QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
pipeline.streams = storage->read(required_columns, query_info, context, from_stage, max_block_size, max_streams);
}
else
{
// build a snapshot on write node
StorageDeltaMergePtr delta_merge_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
RUNTIME_CHECK_MSG(delta_merge_storage != nullptr, "delta_merge_storage which cast from storage is null");
table_snap = delta_merge_storage->writeNodeBuildRemoteReadSnapshot(required_columns, query_info, context, max_streams);
// TODO: could be shared on the logical table level
table_snap->output_field_types = std::make_shared<std::vector<tipb::FieldType>>();
*table_snap->output_field_types = collectOutputFieldTypes(*dag_context.dag_request);
RUNTIME_CHECK(table_snap->output_field_types->size() == table_snap->column_defines->size(),
table_snap->output_field_types->size(),
table_snap->column_defines->size());
}

injectFailPointForLocalRead(query_info);

Expand Down Expand Up @@ -731,6 +775,7 @@ void DAGStorageInterpreter::buildLocalStreamsForPhysicalTable(
throw;
}
}
return table_snap;
}

void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max_block_size)
Expand All @@ -739,22 +784,48 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max
size_t total_local_region_num = mvcc_query_info->regions_query_info.size();
if (total_local_region_num == 0)
return;
mvcc_query_info->scan_context->total_local_region_num = total_local_region_num;
const auto table_query_infos = generateSelectQueryInfos();
bool has_multiple_partitions = table_query_infos.size() > 1;
// MultiPartitionStreamPool will be disabled in no partition mode or single-partition case
std::shared_ptr<MultiPartitionStreamPool> stream_pool = has_multiple_partitions ? std::make_shared<MultiPartitionStreamPool>() : nullptr;

auto disaggregated_snap = std::make_shared<DM::Remote::DisaggReadSnapshot>();
for (const auto & table_query_info : table_query_infos)
{
DAGPipeline current_pipeline;
const TableID table_id = table_query_info.first;
const SelectQueryInfo & query_info = table_query_info.second;
buildLocalStreamsForPhysicalTable(table_id, query_info, current_pipeline, max_block_size);
auto table_snap = buildLocalStreamsForPhysicalTable(table_id, query_info, current_pipeline, max_block_size);
if (table_snap)
{
disaggregated_snap->addTask(table_id, std::move(table_snap));
}

if (has_multiple_partitions)
stream_pool->addPartitionStreams(current_pipeline.streams);
else
pipeline.streams.insert(pipeline.streams.end(), current_pipeline.streams.begin(), current_pipeline.streams.end());
}

LOG_DEBUG(
log,
"local streams built, is_disaggregated_task={} snap_id={}",
dag_context.is_disaggregated_task,
dag_context.is_disaggregated_task ? *dag_context.getDisaggTaskId() : DM::DisaggTaskId::unknown_disaggregated_task_id);

if (dag_context.is_disaggregated_task)
{
// register the snapshot to manager
auto & tmt = context.getTMTContext();
auto * snap_manager = tmt.getDisaggSnapshotManager();
const auto & snap_id = *dag_context.getDisaggTaskId();
auto timeout_s = context.getSettingsRef().disagg_task_snapshot_timeout;
auto expired_at = Clock::now() + std::chrono::seconds(timeout_s);
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
bool register_snapshot_ok = snap_manager->registerSnapshot(snap_id, std::move(disaggregated_snap), expired_at);
RUNTIME_CHECK_MSG(register_snapshot_ok, "disaggregated task has been registered {}", snap_id);
LOG_INFO(log, "task snapshot registered, snapshot_id={}", snap_id);
}

if (has_multiple_partitions)
{
String req_info = dag_context.isMPPTask() ? dag_context.getMPPTaskId().toString() : "";
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/RemoteRequest.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot_fwd.h>
#include <Storages/RegionQueryInfo.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/TableLockHolder.h>
Expand Down Expand Up @@ -70,7 +71,8 @@ class DAGStorageInterpreter
const SelectQueryInfo & query_info,
const RegionException & e,
int num_allow_retry);
void buildLocalStreamsForPhysicalTable(
DM::Remote::DisaggPhysicalTableReadSnapshotPtr
buildLocalStreamsForPhysicalTable(
const TableID & table_id,
const SelectQueryInfo & query_info,
DAGPipeline & pipeline,
Expand Down
Loading