Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FLASH-562: Fix race condition of batch command handling #277

Merged
merged 2 commits into from
Oct 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 34 additions & 32 deletions dbms/src/Flash/BatchCommandsHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <Flash/BatchCommandsHandler.h>
#include <Flash/CoprocessorHandler.h>
#include <common/ThreadPool.h>

namespace DB
{
Expand All @@ -10,38 +9,40 @@ BatchCommandsHandler::BatchCommandsHandler(BatchCommandsContext & batch_commands
: batch_commands_context(batch_commands_context_), request(request_), response(response_), log(&Logger::get("BatchCommandsHandler"))
{}

ThreadPool::Job BatchCommandsHandler::handleCommandJob(
const tikvpb::BatchCommandsRequest::Request & req, tikvpb::BatchCommandsResponse::Response & resp, grpc::Status & ret) const
{
return [&]() {
if (!req.has_coprocessor())
{
ret = grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
return;
}

const auto & cop_req = req.coprocessor();
auto cop_resp = resp.mutable_coprocessor();

auto [context, status] = batch_commands_context.db_context_creation_func(&batch_commands_context.grpc_server_context);
if (!status.ok())
{
ret = status;
return;
}

CoprocessorContext cop_context(context, cop_req.context(), batch_commands_context.grpc_server_context);
CoprocessorHandler cop_handler(cop_context, &cop_req, cop_resp);

ret = cop_handler.execute();
};
}

grpc::Status BatchCommandsHandler::execute()
{
if (request.requests_size() == 0)
return grpc::Status::OK;

// TODO: Fill transport_layer_load into BatchCommandsResponse.

auto command_handler_func
= [](BatchCommandsContext::DBContextCreationFunc db_context_creation_func, grpc::ServerContext * grpc_server_context,
const tikvpb::BatchCommandsRequest::Request & req, tikvpb::BatchCommandsResponse::Response & resp, grpc::Status & ret) {
if (!req.has_coprocessor())
{
ret = grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
return;
}

const auto & cop_req = req.coprocessor();
auto cop_resp = resp.mutable_coprocessor();

auto [context, status] = db_context_creation_func(grpc_server_context);
if (!status.ok())
{
ret = status;
return;
}

CoprocessorContext cop_context(context, cop_req.context(), *grpc_server_context);
CoprocessorHandler cop_handler(cop_context, &cop_req, cop_resp);

ret = cop_handler.execute();
};

/// Shortcut for only one request by not going to thread pool.
if (request.requests_size() == 1)
{
Expand All @@ -51,7 +52,7 @@ grpc::Status BatchCommandsHandler::execute()
auto resp = response.add_responses();
response.add_request_ids(request.request_ids(0));
auto ret = grpc::Status::OK;
command_handler_func(batch_commands_context.db_context_creation_func, &batch_commands_context.grpc_server_context, req, *resp, ret);
handleCommandJob(req, *resp, ret)();
return ret;
}

Expand All @@ -65,18 +66,16 @@ grpc::Status BatchCommandsHandler::execute()

ThreadPool thread_pool(max_threads);

std::vector<grpc::Status> rets;
std::vector<grpc::Status> rets(request.requests_size());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pre-allocation is a must, otherwise the following rets.back() reference will become dangling after buffer re-allocation when inserting more elements.

size_t i = 0;

for (const auto & req : request.requests())
{
auto resp = response.add_responses();
response.add_request_ids(request.request_ids(i++));
rets.emplace_back(grpc::Status::OK);
thread_pool.schedule([&]() {
command_handler_func(
batch_commands_context.db_context_creation_func, &batch_commands_context.grpc_server_context, req, *resp, rets.back());
Copy link
Contributor Author

@zanmato1984 zanmato1984 Oct 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pitfall here is that, lambda will capture resp itself by reference (i.e. the address of variable resp, which is actually a local variable within the scope of for loop and become invalid when jumping out of for loop), rather than the reference to *resp which is the truly reference we want to capture.

The same rule applies to rets.back() as well, lambda captures rets rather than rets.back(), resulting in all threads calling rets.back() when running, i.e. writing results all to the last element.

Fix by evaluating *resp and rets.back() instantly as arguments of an extra function (handleCommandJob), and capture them within the function.

});

thread_pool.schedule(handleCommandJob(req, *resp, rets.back()));
}

thread_pool.wait();
Expand All @@ -85,7 +84,10 @@ grpc::Status BatchCommandsHandler::execute()
for (const auto & ret : rets)
{
if (!ret.ok())
{
response.Clear();
return ret;
}
}

return grpc::Status::OK;
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Flash/BatchCommandsHandler.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <Interpreters/Context.h>
#include <common/ThreadPool.h>
#include <common/logger_useful.h>
#include <grpcpp/server_context.h>
#pragma GCC diagnostic push
Expand All @@ -18,10 +19,10 @@ struct BatchCommandsContext

/// Context creation function for each individual command - they should be handled isolated,
/// given that context is being used to pass arguments regarding queries.
using DBContextCreationFunc = std::function<std::tuple<Context, grpc::Status>(grpc::ServerContext *)>;
using DBContextCreationFunc = std::function<std::tuple<Context, grpc::Status>(const grpc::ServerContext *)>;
DBContextCreationFunc db_context_creation_func;

grpc::ServerContext & grpc_server_context;
const grpc::ServerContext & grpc_server_context;

BatchCommandsContext(
Context & db_context_, DBContextCreationFunc && db_context_creation_func_, grpc::ServerContext & grpc_server_context_)
Expand All @@ -40,7 +41,11 @@ class BatchCommandsHandler
grpc::Status execute();

protected:
BatchCommandsContext & batch_commands_context;
ThreadPool::Job handleCommandJob(
const tikvpb::BatchCommandsRequest::Request & req, tikvpb::BatchCommandsResponse::Response & resp, grpc::Status & ret) const;

protected:
const BatchCommandsContext & batch_commands_context;
const tikvpb::BatchCommandsRequest & request;
tikvpb::BatchCommandsResponse & response;

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/CoprocessorHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ try
cop_context.kv_context.region_epoch().version(), cop_context.kv_context.region_epoch().conf_ver(), std::move(key_ranges),
dag_response);
driver.execute();
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
cop_response->set_data(dag_response.SerializeAsString());
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done");
break;
}
case COP_REQ_TYPE_ANALYZE:
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/CoprocessorHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ struct CoprocessorContext
{
Context & db_context;
const kvrpcpb::Context & kv_context;
grpc::ServerContext & grpc_server_context;
const grpc::ServerContext & grpc_server_context;

CoprocessorContext(Context & db_context_, const kvrpcpb::Context & kv_context_, grpc::ServerContext & grpc_server_context_)
CoprocessorContext(Context & db_context_, const kvrpcpb::Context & kv_context_, const grpc::ServerContext & grpc_server_context_)
: db_context(db_context_), kv_context(kv_context_), grpc_server_context(grpc_server_context_)
{}
};
Expand Down
15 changes: 7 additions & 8 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ grpc::Status FlashService::BatchCommands(

tikvpb::BatchCommandsResponse response;
BatchCommandsContext batch_commands_context(
context, [this](grpc::ServerContext * grpc_server_context) { return createDBContext(grpc_server_context); }, *grpc_context);
context, [this](const grpc::ServerContext * grpc_server_context) { return createDBContext(grpc_server_context); },
*grpc_context);
BatchCommandsHandler batch_commands_handler(batch_commands_context, request, response);
auto ret = batch_commands_handler.execute();
if (!ret.ok())
Expand All @@ -75,22 +76,20 @@ grpc::Status FlashService::BatchCommands(
return grpc::Status::OK;
}

String getClientMetaVarWithDefault(grpc::ServerContext * grpc_context, const String & name, const String & default_val)
String getClientMetaVarWithDefault(const grpc::ServerContext * grpc_context, const String & name, const String & default_val)
{
if (grpc_context->client_metadata().count(name) != 1)
return default_val;
else
return String(grpc_context->client_metadata().find(name)->second.data());
if (auto it = grpc_context->client_metadata().find(name); it != grpc_context->client_metadata().end())
return it->second.data();
return default_val;
}

std::tuple<Context, grpc::Status> FlashService::createDBContext(grpc::ServerContext * grpc_context)
std::tuple<Context, grpc::Status> FlashService::createDBContext(const grpc::ServerContext * grpc_context) const
{
/// Create DB context.
Context context = server.context();
context.setGlobalContext(server.context());

/// Set a bunch of client information.
auto client_meta = grpc_context->client_metadata();
String query_id = getClientMetaVarWithDefault(grpc_context, "query_id", "");
context.setCurrentQueryId(query_id);
ClientInfo & client_info = context.getClientInfo();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/FlashService.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class FlashService final : public tikvpb::Tikv::Service, public std::enable_shar
grpc::ServerReaderWriter<tikvpb::BatchCommandsResponse, tikvpb::BatchCommandsRequest> * stream) override;

private:
std::tuple<Context, ::grpc::Status> createDBContext(grpc::ServerContext * grpc_contex);
std::tuple<Context, ::grpc::Status> createDBContext(const grpc::ServerContext * grpc_contex) const;

private:
IServer & server;
Expand Down