Skip to content

Commit

Permalink
[fix](group_commit)Fix bound checking problem when reading wal block (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
hust-hhb authored Feb 22, 2024
1 parent 8949307 commit a6f0a46
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 70 deletions.
21 changes: 14 additions & 7 deletions be/src/vec/exec/format/wal/wal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "wal_reader.h"

#include "common/logging.h"
#include "common/sync_point.h"
#include "gutil/strings/split.h"
#include "olap/wal/wal_manager.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -61,11 +62,17 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
//convert to dst block
vectorized::Block dst_block;
int index = 0;
auto columns = block->get_columns_with_type_and_name();
if (_column_id_count != columns.size() || columns.size() != _tuple_descriptor->slots().size()) {
auto output_block_columns = block->get_columns_with_type_and_name();
size_t output_block_column_size = output_block_columns.size();
TEST_SYNC_POINT_CALLBACK("WalReader::set_column_id_count", &_column_id_count);
TEST_SYNC_POINT_CALLBACK("WalReader::set_out_block_column_size", &output_block_column_size);
if (_column_id_count != src_block.columns() ||
output_block_column_size != _tuple_descriptor->slots().size()) {
return Status::InternalError(
"not equal _column_id_count={} vs columns size={} vs tuple_descriptor size={}",
std::to_string(_column_id_count), std::to_string(columns.size()),
"not equal wal _column_id_count={} vs wal block columns size={}, "
"output block columns size={} vs tuple_descriptor size={}",
std::to_string(_column_id_count), std::to_string(src_block.columns()),
std::to_string(output_block_column_size),
std::to_string(_tuple_descriptor->slots().size()));
}
for (auto slot_desc : _tuple_descriptor->slots()) {
Expand All @@ -78,9 +85,9 @@ Status WalReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (column_ptr != nullptr && slot_desc->is_nullable()) {
column_ptr = make_nullable(column_ptr);
}
dst_block.insert(
index, vectorized::ColumnWithTypeAndName(std::move(column_ptr), columns[index].type,
columns[index].name));
dst_block.insert(index, vectorized::ColumnWithTypeAndName(
std::move(column_ptr), output_block_columns[index].type,
output_block_columns[index].name));
index++;
}
block->swap(dst_block);
Expand Down
155 changes: 92 additions & 63 deletions be/test/vec/exec/vwal_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <vector>

#include "common/object_pool.h"
#include "common/sync_point.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "io/fs/local_file_system.h"
Expand All @@ -40,29 +41,33 @@ class VWalScannerTest : public testing::Test {
init();
_profile = _runtime_state.runtime_profile();
_runtime_state.init_mem_trackers();
static_cast<void>(_runtime_state.init(unique_id, query_options, query_globals, _env));
WARN_IF_ERROR(_runtime_state.init(_unique_id, _query_options, _query_globals, _env),
"fail to init _runtime_state");
}
void init();
void generate_scanner(std::shared_ptr<VFileScanner>& scanner);

void TearDown() override {
static_cast<void>(io::global_local_filesystem()->delete_directory(wal_dir));
WARN_IF_ERROR(_scan_node->close(&_runtime_state), "fail to close scan_node")
WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir),
fmt::format("fail to delete dir={}", _wal_dir));
SAFE_STOP(_env->_wal_manager);
}

protected:
virtual void SetUp() override {}

private:
void init_desc_table();
void _init_desc_table();

ExecEnv* _env = nullptr;
std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
int64_t db_id = 1;
int64_t tb_id = 2;
int64_t txn_id = 789;
int64_t version = 0;
int64_t backend_id = 1001;
std::string label = "test";
std::string _wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
int64_t _db_id = 1;
int64_t _tb_id = 2;
int64_t _txn_id = 789;
int64_t _version = 0;
int64_t _backend_id = 1001;
std::string _label = "test";

TupleId _dst_tuple_id = 0;
RuntimeState _runtime_state;
Expand All @@ -73,12 +78,18 @@ class VWalScannerTest : public testing::Test {
ScannerCounter _counter;
std::vector<TExpr> _pre_filter;
TPlanNode _tnode;
TUniqueId unique_id;
TQueryOptions query_options;
TQueryGlobals query_globals;
TUniqueId _unique_id;
TQueryOptions _query_options;
TQueryGlobals _query_globals;
std::shared_ptr<NewFileScanNode> _scan_node = nullptr;
std::vector<TFileRangeDesc> _ranges;
TFileRangeDesc _range_desc;
TFileScanRange _scan_range;
std::unique_ptr<ShardedKVCache> _kv_cache = nullptr;
std::unique_ptr<TMasterInfo> _master_info = nullptr;
};

void VWalScannerTest::init_desc_table() {
void VWalScannerTest::_init_desc_table() {
TDescriptorTable t_desc_table;

// table descriptors
Expand Down Expand Up @@ -118,6 +129,7 @@ void VWalScannerTest::init_desc_table() {
slot_desc.nullIndicatorBit = -1;
slot_desc.colName = "c1";
slot_desc.slotIdx = 1;
slot_desc.col_unique_id = 0;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
Expand Down Expand Up @@ -145,6 +157,7 @@ void VWalScannerTest::init_desc_table() {
slot_desc.nullIndicatorBit = -1;
slot_desc.colName = "c2";
slot_desc.slotIdx = 2;
slot_desc.col_unique_id = 1;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
Expand Down Expand Up @@ -172,6 +185,7 @@ void VWalScannerTest::init_desc_table() {
slot_desc.nullIndicatorBit = -1;
slot_desc.colName = "c3";
slot_desc.slotIdx = 3;
slot_desc.col_unique_id = 2;
slot_desc.isMaterialized = true;

t_desc_table.slotDescriptors.push_back(slot_desc);
Expand All @@ -196,9 +210,10 @@ void VWalScannerTest::init_desc_table() {

void VWalScannerTest::init() {
config::group_commit_wal_max_disk_limit = "100M";
init_desc_table();
static_cast<void>(io::global_local_filesystem()->create_directory(
wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id)));
_init_desc_table();
WARN_IF_ERROR(io::global_local_filesystem()->create_directory(
_wal_dir + "/" + std::to_string(_db_id) + "/" + std::to_string(_tb_id)),
"fail to creat directory");

// Node Id
_tnode.node_id = 0;
Expand All @@ -210,73 +225,87 @@ void VWalScannerTest::init() {
_tnode.file_scan_node.tuple_id = 0;
_tnode.__isset.file_scan_node = true;

_scan_node = std::make_shared<NewFileScanNode>(&_obj_pool, _tnode, *_desc_tbl);
_scan_node->_output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init scan_node");
WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare scan_node");

_range_desc.start_offset = 0;
_range_desc.size = 1000;
_ranges.push_back(_range_desc);
_scan_range.ranges = _ranges;
_scan_range.__isset.params = true;
_scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
_kv_cache.reset(new ShardedKVCache(48));

_runtime_state._wal_id = _txn_id;

_master_info.reset(new TMasterInfo());
_env = ExecEnv::GetInstance();
_env->_master_info = new TMasterInfo();
_env->_master_info = _master_info.get();
_env->_master_info->network_address.hostname = "host name";
_env->_master_info->network_address.port = backend_id;
_env->_master_info->network_address.port = _backend_id;
_env->_master_info->backend_id = 1001;
_env->_wal_manager = WalManager::create_shared(_env, wal_dir);
_env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
std::string base_path;
auto st = _env->_wal_manager->_init_wal_dirs_info();
st = _env->_wal_manager->create_wal_path(db_id, tb_id, txn_id, label, base_path);
st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id, _label, base_path);
std::string src = "./be/test/exec/test_data/wal_scanner/wal";
std::string dst = wal_dir + "/" + std::to_string(db_id) + "/" + std::to_string(tb_id) + "/" +
std::to_string(version) + "_" + std::to_string(backend_id) + "_" +
std::to_string(txn_id) + "_" + label;
std::string dst = _wal_dir + "/" + std::to_string(_db_id) + "/" + std::to_string(_tb_id) + "/" +
std::to_string(_version) + "_" + std::to_string(_backend_id) + "_" +
std::to_string(_txn_id) + "_" + _label;
std::filesystem::copy(src, dst);
}

TEST_F(VWalScannerTest, normal) {
std::vector<size_t> index_vector;
index_vector.emplace_back(0);
index_vector.emplace_back(1);
index_vector.emplace_back(2);
// config::group_commit_replay_wal_dir = wal_dir;
NewFileScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
scan_node._output_tuple_desc = _runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
static_cast<void>(scan_node.init(_tnode, &_runtime_state));
auto status = scan_node.prepare(&_runtime_state);
EXPECT_TRUE(status.ok());

std::vector<TFileRangeDesc> ranges;
TFileRangeDesc range_desc;
{
range_desc.start_offset = 0;
range_desc.size = 1000;
}
ranges.push_back(range_desc);
TFileScanRange scan_range;
scan_range.ranges = ranges;
scan_range.__isset.params = true;
scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
std::unique_ptr<ShardedKVCache> _kv_cache;
_kv_cache.reset(new ShardedKVCache(48));
_runtime_state._wal_id = txn_id;
VFileScanner scanner(&_runtime_state, &scan_node, -1, scan_range, _profile, _kv_cache.get());
scanner._is_load = false;
// Test helper: builds a VFileScanner from the fixture's runtime state, scan node,
// scan range, profile and KV cache, stores it in `scanner`, clears its `_is_load`
// flag, and calls prepare() on it with an empty conjunct list and empty
// column-name maps.
//
// @param scanner  Output; overwritten with the newly created scanner.
//
// The function returns nothing: the status of prepare() is handed to
// WARN_IF_ERROR together with the message "fail to prepare scanner".
void VWalScannerTest::generate_scanner(std::shared_ptr<VFileScanner>& scanner) {
scanner = std::make_shared<VFileScanner>(&_runtime_state, _scan_node.get(), -1, _scan_range,
_profile, _kv_cache.get());
scanner->_is_load = false;
// Empty inputs for prepare(); these are locals despite the leading underscore.
vectorized::VExprContextSPtrs _conjuncts;
std::unordered_map<std::string, ColumnValueRangeType> _colname_to_value_range;
std::unordered_map<std::string, int> _colname_to_slot_id;
// NOTE(review): the next line calls `scanner.prepare` with `.` although `scanner`
// is a shared_ptr and every other line uses `->`; it looks like the pre-change
// line kept by the diff rendering rather than live code — confirm against the file.
static_cast<void>(scanner.prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id));
WARN_IF_ERROR(scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id),
"fail to prepare scanner");
}

// Happy-path test: creates a scanner via generate_scanner(), then expects the
// first get_block() to succeed and yield 3 rows, and the second get_block() to
// succeed with 0 rows and `eof` set. The scanner is closed at the end.
//
// NOTE(review): this span reads like a rendered diff. The statements that use
// `scanner.` (value syntax) and `scan_node` (not declared in this test) appear
// to be the removed versions of the `scanner->` lines next to them, which is
// also why `auto st` and the 3-row check each show up twice — confirm against
// the actual file before relying on this text.
TEST_F(VWalScannerTest, normal) {
std::shared_ptr<VFileScanner> scanner = nullptr;
generate_scanner(scanner);
std::unique_ptr<vectorized::Block> block(new vectorized::Block());
bool eof = false;
auto st = scanner.get_block(&_runtime_state, block.get(), &eof);
EXPECT_EQ(3, block->rows());
// First read: must succeed and return the 3 rows.
auto st = scanner->get_block(&_runtime_state, block.get(), &eof);
ASSERT_TRUE(st.ok());
EXPECT_EQ(3, block->rows());
// Empty the block before the second read so the row count reflects only that read.
block->clear();
st = scanner.get_block(&_runtime_state, block.get(), &eof);
// Second read: must succeed, return no rows, and report end of file.
st = scanner->get_block(&_runtime_state, block.get(), &eof);
ASSERT_TRUE(st.ok());
EXPECT_EQ(0, block->rows());
ASSERT_TRUE(eof);
static_cast<void>(scanner.close(&_runtime_state));
static_cast<void>(scan_node.close(&_runtime_state));
WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
}

{
std::stringstream ss;
scan_node.runtime_profile()->pretty_print(&ss);
LOG(INFO) << ss.str();
}
// Failure-path test: installs callbacks on the two WalReader sync points that
// overwrite the value each one is handed with 2, then expects get_block() to
// return a non-OK status whose text contains "not equal".
TEST_F(VWalScannerTest, fail_with_not_equal) {
    auto sync_point = SyncPoint::get_instance();

    // Declared before the callbacks are installed so that both are removed
    // again however the test body exits (including a failed ASSERT_*).
    Defer cleanup {[sync_point] {
        sync_point->clear_call_back("WalReader::set_column_id_count");
        sync_point->clear_call_back("WalReader::set_out_block_column_size");
    }};

    // Each callback writes 2 through the pointer passed as its first argument.
    sync_point->set_call_back("WalReader::set_column_id_count",
                              [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 2; });
    sync_point->set_call_back("WalReader::set_out_block_column_size",
                              [](auto&& args) { *try_any_cast<size_t*>(args[0]) = 2; });
    sync_point->enable_processing();

    std::shared_ptr<VFileScanner> scanner;
    generate_scanner(scanner);

    auto block = std::make_unique<vectorized::Block>();
    bool eof = false;
    auto st = scanner->get_block(&_runtime_state, block.get(), &eof);

    // The read must fail, and the error text must mention the mismatch.
    ASSERT_FALSE(st.ok());
    auto message = st.to_string();
    ASSERT_TRUE(message.find("not equal") != message.npos);

    WARN_IF_ERROR(scanner->close(&_runtime_state), "fail to close scanner");
}

} // namespace vectorized
Expand Down

0 comments on commit a6f0a46

Please sign in to comment.