Skip to content

Commit

Permalink
[Chore](log) adjust error code on too many filtered rows (apache#26168)
Browse files Browse the repository at this point in the history
  • Loading branch information
BiteTheDDDDt authored Oct 31, 2023
1 parent 9d83948 commit 696ecc8
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 117 deletions.
4 changes: 2 additions & 2 deletions be/src/common/exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "util/stack_util.h"
namespace doris {

Exception::Exception(int code, const std::string_view msg) {
Exception::Exception(int code, const std::string_view& msg) {
_code = code;
_err_msg = std::make_unique<ErrMsg>();
_err_msg->_msg = msg;
Expand All @@ -31,7 +31,7 @@ Exception::Exception(int code, const std::string_view msg) {
}
}

Exception::Exception(const Exception& nested, int code, const std::string_view msg) {
Exception::Exception(const Exception& nested, int code, const std::string_view& msg) {
_code = code;
_err_msg = std::make_unique<ErrMsg>();
_err_msg->_msg = msg;
Expand Down
12 changes: 5 additions & 7 deletions be/src/common/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ inline thread_local int enable_thread_catch_bad_alloc = 0;
class Exception : public std::exception {
public:
Exception() : _code(ErrorCode::OK) {}
Exception(int code, const std::string_view msg);
Exception(int code, const std::string_view& msg);
// add nested exception as first param, or the template may could not find
// the correct method for ...args
Exception(const Exception& nested, int code, const std::string_view msg);
Exception(const Exception& nested, int code, const std::string_view& msg);

// Format message with fmt::format, like the logging functions.
template <typename... Args>
Exception(int code, const std::string_view fmt, Args&&... args)
Exception(int code, const std::string_view& fmt, Args&&... args)
: Exception(code, fmt::format(fmt, std::forward<Args>(args)...)) {}

int code() const { return _code; }
Expand Down Expand Up @@ -96,9 +96,8 @@ inline const std::string& Exception::to_string() const {
return Status::MemoryLimitExceeded(fmt::format( \
"PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \
e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); \
} else { \
return Status::Error<false>(e.code(), e.to_string()); \
} \
return Status::Error<false>(e.code(), e.to_string()); \
} \
} while (0)

Expand All @@ -118,8 +117,7 @@ inline const std::string& Exception::to_string() const {
return Status::MemoryLimitExceeded(fmt::format( \
"PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \
e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__)); \
} else { \
return Status::Error<false>(e.code(), e.to_string()); \
} \
return Status::Error<false>(e.code(), e.to_string()); \
} \
} while (0)
12 changes: 2 additions & 10 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ constexpr bool capture_stacktrace(int code) {
&& code != ErrorCode::UNINITIALIZED
&& code != ErrorCode::PIP_WAIT_FOR_RF
&& code != ErrorCode::PIP_WAIT_FOR_SC
&& code != ErrorCode::INVALID_ARGUMENT;
&& code != ErrorCode::INVALID_ARGUMENT
&& code != ErrorCode::DATA_QUALITY_ERR;
}
// clang-format on

Expand Down Expand Up @@ -575,15 +576,6 @@ inline std::string Status::to_string() const {
} \
} while (false);

#define RETURN_WITH_WARN_IF_ERROR(stmt, ret_code, warning_prefix) \
do { \
Status _s = (stmt); \
if (UNLIKELY(!_s.ok())) { \
LOG(WARNING) << (warning_prefix) << ", error: " << _s; \
return ret_code; \
} \
} while (false);

#define RETURN_NOT_OK_STATUS_WITH_WARN(stmt, warning_prefix) \
do { \
Status _s = (stmt); \
Expand Down
17 changes: 10 additions & 7 deletions be/src/http/http_handler_with_auth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,16 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) {

#ifndef BE_TEST
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
RETURN_WITH_WARN_IF_ERROR(
ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&auth_result, &auth_request](FrontendServiceConnection& client) {
client->checkAuth(auth_result, auth_request);
}),
-1, "checkAuth failed");
{
auto status = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&auth_result, &auth_request](FrontendServiceConnection& client) {
client->checkAuth(auth_result, auth_request);
});
if (!status) {
return -1;
}
}
#else
CHECK(_exec_env == nullptr);
#endif
Expand Down
2 changes: 1 addition & 1 deletion be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
} else {
pipe_id = runtime_state->fragment_instance_id();
}
*file_reader = multi_table_pipe->getPipe(pipe_id);
*file_reader = multi_table_pipe->get_pipe(pipe_id);
LOG(INFO) << "create pipe reader for fragment instance: " << pipe_id
<< " pipe: " << (*file_reader).get();

Expand Down
15 changes: 8 additions & 7 deletions be/src/io/fs/multi_table_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para

if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
RETURN_IF_ERROR(
putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]));
put_pipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]));
LOG(INFO) << "fragment_instance_id=" << print_id(plan.params.fragment_instance_id)
<< " table=" << plan.table_name;
} else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) {
auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
RETURN_IF_ERROR(putPipe(pipe_id, _planned_pipes[plan.table_name]));
RETURN_IF_ERROR(put_pipe(pipe_id, _planned_pipes[plan.table_name]));
LOG(INFO) << "pipe_id=" << pipe_id << "table=" << plan.table_name;
} else {
LOG(WARNING) << "illegal exec param type, need `TExecPlanFragmentParams` or "
Expand All @@ -247,7 +247,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
if (num_selected_rows > 0 &&
(double)state->num_rows_load_filtered() / num_selected_rows >
_ctx->max_filter_ratio) {
*status = Status::InternalError("too many filtered rows");
*status = Status::DataQualityError("too many filtered rows");
}
if (_number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
_ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
Expand Down Expand Up @@ -300,7 +300,8 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para

#endif

Status MultiTablePipe::putPipe(const TUniqueId& pipe_id, std::shared_ptr<io::StreamLoadPipe> pipe) {
Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id,
std::shared_ptr<io::StreamLoadPipe> pipe) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(pipe_id);
if (it != std::end(_pipe_map)) {
Expand All @@ -310,16 +311,16 @@ Status MultiTablePipe::putPipe(const TUniqueId& pipe_id, std::shared_ptr<io::Str
return Status::OK();
}

std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::getPipe(const TUniqueId& pipe_id) {
std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::get_pipe(const TUniqueId& pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(pipe_id);
if (it == std::end(_pipe_map)) {
return std::shared_ptr<io::StreamLoadPipe>(nullptr);
return {};
}
return it->second;
}

void MultiTablePipe::removePipe(const TUniqueId& pipe_id) {
void MultiTablePipe::remove_pipe(const TUniqueId& pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(pipe_id);
if (it != std::end(_pipe_map)) {
Expand Down
6 changes: 3 additions & 3 deletions be/src/io/fs/multi_table_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ class MultiTablePipe : public KafkaConsumerPipe {
void cancel(const std::string& reason) override;

// register <instance id, pipe> pair
Status putPipe(const TUniqueId& fragment_instance_id, std::shared_ptr<io::StreamLoadPipe> pipe);
Status put_pipe(const TUniqueId& pipe_id, std::shared_ptr<io::StreamLoadPipe> pipe);

std::shared_ptr<io::StreamLoadPipe> getPipe(const TUniqueId& fragment_instance_id);
std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId& pipe_id);

void removePipe(const TUniqueId& fragment_instance_id);
void remove_pipe(const TUniqueId& pipe_id);

private:
// parse table name from data
Expand Down
6 changes: 1 addition & 5 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,7 @@ class SchemaChange {
TabletSharedPtr new_tablet, TabletSharedPtr base_tablet,
TabletSchemaSPtr base_tablet_schema) {
if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
RETURN_WITH_WARN_IF_ERROR(
rowset_writer->flush(), Status::Error<ErrorCode::INVALID_ARGUMENT>(""),
fmt::format("create empty version for schema change failed. version= {}-{}",
rowset_writer->version().first, rowset_writer->version().second));

RETURN_IF_ERROR(rowset_writer->flush());
return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
(double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
// NOTE: Do not modify the error message here, for historical reasons,
// some users may rely on this error message.
*status = Status::InternalError("too many filtered rows");
*status = Status::DataQualityError("too many filtered rows");
}
if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
Expand Down Expand Up @@ -254,7 +254,7 @@ Status StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
request.__set_operation(ctx->txn_operation);
request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms);
request.__set_label(ctx->label);
if (ctx->txn_id != ctx->default_txn_id) {
if (ctx->txn_id != doris::StreamLoadContext::default_txn_id) {
request.__set_txnId(ctx->txn_id);
}

Expand Down
Loading

0 comments on commit 696ecc8

Please sign in to comment.