Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](cloud) dump error file when footer parse failure #40943

Merged
merged 4 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ void register_suites() {
arg0->second = true;
});
});
// curl 'be_ip:http_port/api/injection_point/apply_suite?name=Segment::parse_footer:magic_number_corruption'
suite_map.emplace("Segment::parse_footer:magic_number_corruption", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("Segment::parse_footer:magic_number_corruption_inj", [](auto&& args) {
if (auto p = std::any_cast<uint8_t*>(args[0])) {
memset(p, 0, 12);
} else {
std::cerr << "Failed to cast std::any to uint8_t*" << std::endl;
}
});
});
}

void set_sleep(const std::string& point, HttpRequest* req) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,9 @@ DEFINE_mInt64(file_cache_ttl_valid_check_interval_second, "0"); // zero for not
DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
// rename ttl filename to new format during read, with some performance cost
DEFINE_mBool(translate_to_new_ttl_format_during_read, "false");
// When true (and the BE runs in cloud mode), Segment::_write_error_file dumps the bytes that
// failed footer validation, plus a copy of the whole remote segment file, to local disk.
// NOTE(review): "enbale" is a misspelling of "enable"; it is kept because it is the config key.
DEFINE_mBool(enbale_dump_error_file, "true");
// limit the max size of error log on disk: once the error_file directory is larger than this,
// no further error files are dumped
DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB

DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
Expand Down
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,9 @@ DECLARE_mInt64(file_cache_ttl_valid_check_interval_second);
DECLARE_Bool(enable_ttl_cache_evict_using_lru);
// rename ttl filename to new format during read, with some performance cost
DECLARE_Bool(translate_to_new_ttl_format_during_read);
// When true (and the BE runs in cloud mode), Segment::_write_error_file dumps the bytes that
// failed footer validation, plus a copy of the whole remote segment file, to local disk.
// NOTE(review): "enbale" is a misspelling of "enable"; it is kept because it is the config key.
DECLARE_mBool(enbale_dump_error_file);
// limit the max size of error log on disk: once the error_file directory is larger than this,
// no further error files are dumped
DECLARE_mInt64(file_cache_error_log_limit_bytes);

// inverted index searcher cache
// cache entry stay time after lookup
Expand Down
86 changes: 85 additions & 1 deletion be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
#include <memory>
#include <utility>

#include "cloud/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "cpp/sync_point.h"
#include "io/cache/block_file_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/cached_remote_file_reader.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_system.h"
#include "io/io_common.h"
Expand Down Expand Up @@ -294,6 +296,67 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o
return iter->get()->init(read_options);
}

// Best-effort diagnostic dump used when footer validation fails.
//
// Writes two files under "<first file cache base path>/error_file/":
//   * "<rowset_id>_<segment_id>.dat.part_offset_<offset>": the `bytes_read` bytes at `data`,
//     i.e. the bytes the caller read (normally through the file cache) and found to be bad;
//   * "<rowset_id>_<segment_id>.dat": the whole segment, re-read through the remote reader
//     that sits underneath the CachedRemoteFileReader.
//
// Params:
//   file_size  - size in bytes of the segment file; this many bytes are requested remotely.
//   offset     - offset of `data` inside the segment; only used to name the part file.
//   bytes_read - number of bytes available at `data`.
//   data       - the bytes that failed validation.
//   io_ctx     - IO context forwarded to the remote read.
//
// Returns OK when dumping is disabled (config off or not cloud mode), when the error_file
// directory already exceeds config::file_cache_error_log_limit_bytes, or when the dump
// succeeds / already exists. Returns the underlying error if a filesystem or remote read
// operation fails, and InternalError if there is no file cache base path or the segment's
// reader is not a CachedRemoteFileReader.
Status Segment::_write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data,
                                  io::IOContext& io_ctx) {
    if (!config::enbale_dump_error_file || !doris::config::is_cloud_mode()) {
        return Status::OK();
    }

    const auto& local_fs = io::global_local_filesystem();

    // Guard the [0] access: indexing an empty path list would be undefined behaviour.
    const auto base_paths = io::FileCacheFactory::instance()->get_base_paths();
    if (base_paths.empty()) {
        return Status::InternalError("no file cache base path available for error file");
    }

    const std::string file_name =
            _rowset_id.to_string() + "_" + std::to_string(_segment_id) + ".dat";
    const std::string dir_path = base_paths[0] + "/error_file/";
    Status create_st = local_fs->create_directory(dir_path, true);
    if (!create_st.ok() && !create_st.is<ErrorCode::ALREADY_EXIST>()) {
        LOG(WARNING) << "failed to create error file dir: " << create_st.to_string();
        return create_st;
    }

    // Stop dumping once the directory has grown past the configured budget.
    size_t dir_size = 0;
    RETURN_IF_ERROR(local_fs->directory_size(dir_path, &dir_size));
    // Compare as signed so that a negative limit is not silently converted to a huge
    // unsigned value.
    if (static_cast<int64_t>(dir_size) > config::file_cache_error_log_limit_bytes) {
        LOG(WARNING) << "error file dir size is too large: " << dir_size;
        return Status::OK();
    }

    // 1. Dump the bytes that failed validation.
    const std::string part_path =
            dir_path + file_name + ".part_offset_" + std::to_string(offset);
    LOG(WARNING) << "writer error part to " << part_path;
    bool is_part_exist = false;
    RETURN_IF_ERROR(local_fs->exists(part_path, &is_part_exist));
    if (is_part_exist) {
        LOG(WARNING) << "error part already exists: " << part_path;
    } else {
        std::unique_ptr<io::FileWriter> part_writer;
        RETURN_IF_ERROR(local_fs->create_file(part_path, &part_writer));
        RETURN_IF_ERROR(part_writer->append(Slice(data, bytes_read)));
        RETURN_IF_ERROR(part_writer->close());
    }

    // 2. Dump the whole segment as stored remotely.
    auto* cached_reader = dynamic_cast<io::CachedRemoteFileReader*>(_file_reader.get());
    if (cached_reader == nullptr) {
        return Status::InternalError("file reader is not CachedRemoteFileReader");
    }

    const std::string file_path = dir_path + file_name;
    LOG(WARNING) << "writer error file to " << file_path;
    bool is_file_exist = false;
    RETURN_IF_ERROR(local_fs->exists(file_path, &is_file_exist));
    if (is_file_exist) {
        // Check before reading: fetching the whole remote file only to discard it is wasteful.
        LOG(WARNING) << "error file already exists: " << file_path;
        return Status::OK();
    }

    // NOTE: the whole segment is buffered in memory before being written out.
    std::string error_file;
    error_file.resize(file_size);
    size_t error_file_bytes_read = 0;
    RETURN_IF_ERROR(cached_reader->get_remote_reader()->read_at(
            0, Slice(error_file.data(), file_size), &error_file_bytes_read, &io_ctx));
    DCHECK(error_file_bytes_read == file_size);
    if (error_file_bytes_read != file_size) {
        // In release builds only persist what was really read, not the zero-filled tail.
        LOG(WARNING) << "short read of error file " << file_path << ": expect " << file_size
                     << " bytes, got " << error_file_bytes_read;
    }

    std::unique_ptr<io::FileWriter> writer;
    RETURN_IF_ERROR(local_fs->create_file(file_path, &writer));
    RETURN_IF_ERROR(writer->append(Slice(error_file.data(), error_file_bytes_read)));
    RETURN_IF_ERROR(writer->close());
    return Status::OK();
}

Status Segment::_parse_footer(SegmentFooterPB* footer) {
// Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
auto file_size = _file_reader->size();
Expand All @@ -310,8 +373,14 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
RETURN_IF_ERROR(
_file_reader->read_at(file_size - 12, Slice(fixed_buf, 12), &bytes_read, &io_ctx));
DCHECK_EQ(bytes_read, 12);

TEST_SYNC_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption", fixed_buf);
TEST_INJECTION_POINT_CALLBACK("Segment::parse_footer:magic_number_corruption_inj", fixed_buf);
if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) {
Status st =
_write_error_file(file_size, file_size - 12, bytes_read, (char*)fixed_buf, io_ctx);
if (!st.ok()) {
LOG(WARNING) << "failed to write error file: " << st.to_string();
}
return Status::Corruption(
"Bad segment file {}: file_size: {}, magic number not match, cache_key: {}",
_file_reader->path().native(), file_size,
Expand All @@ -321,6 +390,11 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
// read footer PB
uint32_t footer_length = decode_fixed32_le(fixed_buf);
if (file_size < 12 + footer_length) {
Status st =
_write_error_file(file_size, file_size - 12, bytes_read, (char*)fixed_buf, io_ctx);
if (!st.ok()) {
LOG(WARNING) << "failed to write error file: " << st.to_string();
}
return Status::Corruption("Bad segment file {}: file size {} < {}, cache_key: {}",
_file_reader->path().native(), file_size, 12 + footer_length,
file_cache_key_str(_file_reader->path().native()));
Expand All @@ -336,6 +410,11 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
uint32_t expect_checksum = decode_fixed32_le(fixed_buf + 4);
uint32_t actual_checksum = crc32c::Value(footer_buf.data(), footer_buf.size());
if (actual_checksum != expect_checksum) {
Status st = _write_error_file(file_size, file_size - 12 - footer_length, bytes_read,
footer_buf.data(), io_ctx);
if (!st.ok()) {
LOG(WARNING) << "failed to write error file: " << st.to_string();
}
return Status::Corruption(
"Bad segment file {}: file_size = {}, footer checksum not match, actual={} "
"vs expect={}, cache_key: {}",
Expand All @@ -345,6 +424,11 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {

// deserialize footer PB
if (!footer->ParseFromString(footer_buf)) {
Status st = _write_error_file(file_size, file_size - 12 - footer_length, bytes_read,
footer_buf.data(), io_ctx);
if (!st.ok()) {
LOG(WARNING) << "failed to write error file: " << st.to_string();
}
return Status::Corruption(
"Bad segment file {}: file_size = {}, failed to parse SegmentFooterPB, cache_key: ",
_file_reader->path().native(), file_size,
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ class Segment : public std::enable_shared_from_this<Segment> {
std::unique_ptr<ColumnIterator>* iter,
const SubcolumnColumnReaders::Node* root,
vectorized::DataTypePtr target_type_hint);
// Dumps diagnostics when footer parsing fails: writes `bytes_read` bytes from `data` to a part
// file named after `offset`, and a copy of the whole `file_size`-byte remote segment file, into
// the "error_file" directory under the first file cache base path.
// Does nothing unless config::enbale_dump_error_file is set and the BE is in cloud mode.
Status _write_error_file(size_t file_size, size_t offset, size_t bytes_read, char* data,
io::IOContext& io_ctx);

Status _load_index_impl();
Status _open_inverted_index();
Expand Down
Loading