diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
index 87c3b176e63be1..4ed3258d74da12 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.cpp
@@ -106,7 +106,7 @@ void DorisCompoundFileWriter::sort_files(std::vector<FileInfo>& file_infos) {
     });
 }
 
-void DorisCompoundFileWriter::writeCompoundFile() {
+size_t DorisCompoundFileWriter::writeCompoundFile() {
     // list files in current dir
     std::vector<std::string> files;
     directory->list(&files);
@@ -171,6 +171,7 @@ void DorisCompoundFileWriter::writeCompoundFile() {
         _CLTHROWA(CL_ERR_IO, "Create CompoundDirectory output error");
     }
     std::unique_ptr<lucene::store::IndexOutput> output(out);
+    size_t start = output->getFilePointer();
     output->writeVInt(file_count);
     // write file entries
     int64_t data_offset = header_len;
@@ -203,7 +204,9 @@ void DorisCompoundFileWriter::writeCompoundFile() {
     // NOTE: need to decrease ref count, but not to delete here,
    // because index cache may get the same directory from DIRECTORIES
     _CLDECDELETE(out_dir)
+    auto compound_file_size = output->getFilePointer() - start;
     output->close();
+    return compound_file_size;
 }
 
 void DorisCompoundFileWriter::copyFile(const char* fileName, lucene::store::IndexOutput* output,
                                        uint8_t* buffer, int64_t bufferLength)
@@ -641,7 +644,7 @@ void DorisCompoundDirectory::close() {
     if (useCompoundFileWriter) {
         auto* cfsWriter = _CLNEW DorisCompoundFileWriter(this);
         // write compound file
-        cfsWriter->writeCompoundFile();
+        compound_file_size = cfsWriter->writeCompoundFile();
         // delete index path, which contains separated inverted index files
         deleteDirectory();
         _CLDELETE(cfsWriter)
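The change above sizes the compound file by bracketing all writes between two stream-position reads, instead of stat()ing the file afterwards. A minimal standalone sketch of that bracketing technique, using plain iostreams rather than CLucene's IndexOutput (all names here are illustrative, not Doris or CLucene APIs):

    // Sketch only: measure bytes written by recording the stream position
    // before and after, as writeCompoundFile() now does with getFilePointer().
    #include <cstddef>
    #include <iostream>
    #include <sstream>

    size_t write_payload(std::ostream& out) {
        const auto start = out.tellp();   // analogous to output->getFilePointer()
        out << "file_count:" << 3;        // stand-in for the real header and entries
        return static_cast<size_t>(out.tellp() - start);
    }

    int main() {
        std::ostringstream buffer;
        std::cout << "bytes written: " << write_payload(buffer) << '\n';
        return 0;
    }

One advantage of measuring at the output stream is that no extra filesystem call is needed after the compound file has been written.
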
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
index 510fb183bd8d44..4c238c138005ce 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
+++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_directory.h
@@ -46,7 +46,7 @@ class DorisCompoundFileWriter : LUCENE_BASE {
     ~DorisCompoundFileWriter() override = default;
     /** Returns the directory of the compound file. */
     CL_NS(store)::Directory* getDirectory();
-    void writeCompoundFile();
+    size_t writeCompoundFile();
     void copyFile(const char* fileName, lucene::store::IndexOutput* output, uint8_t* buffer,
                   int64_t bufferLength);
 
@@ -77,6 +77,7 @@ class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory {
     std::string directory;
     std::string cfs_directory;
     bool useCompoundFileWriter {false};
+    size_t compound_file_size = 0;
 
     void priv_getFN(char* buffer, const char* name) const;
     /// Removes an existing file in the directory.
@@ -91,6 +92,7 @@ class CLUCENE_EXPORT DorisCompoundDirectory : public lucene::store::Directory {
 
     const io::FileSystemSPtr& getFileSystem() { return fs; }
     const io::FileSystemSPtr& getCompoundFileSystem() { return compound_fs; }
+    size_t getCompoundFileSize() const { return compound_file_size; }
     ~DorisCompoundDirectory() override;
 
     bool list(std::vector<std::string>* names) const override;
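close() stores the value returned by writeCompoundFile() in the new compound_file_size member, and getCompoundFileSize() exposes it afterwards. A rough sketch of this cache-on-close shape (simplified stand-ins, not the real classes):

    // Sketch only: the directory caches the compound file size when it is
    // closed, so later size queries need no filesystem access.
    #include <cstddef>
    #include <iostream>

    class CompoundDir {
    public:
        void close() { compound_file_size_ = write_compound_file(); }
        size_t compound_file_size() const { return compound_file_size_; }

    private:
        size_t write_compound_file() { return 4096; }  // stand-in for the real writer
        size_t compound_file_size_ = 0;                // 0 until close() has run
    };

    int main() {
        CompoundDir dir;
        dir.close();
        std::cout << "cached size: " << dir.compound_file_size() << '\n';
        return 0;
    }

Note the accessor is only meaningful after close() has run; the file_size() override below relies on finish() having closed the directory first.
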
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
index 1c26dcd154217d..6e811971dab859 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
@@ -54,17 +54,11 @@
 #include "util/slice.h"
 #include "util/string_util.h"
 
-#define FINALIZE_OUTPUT(x) \
-    if (x != nullptr) {    \
-        x->close();        \
-        _CLDELETE(x);      \
+#define FINALLY_CLOSE_OUTPUT(x)       \
+    try {                             \
+        if (x != nullptr) x->close(); \
+    } catch (...) {                   \
     }
-#define FINALLY_FINALIZE_OUTPUT(x) \
-    try {                          \
-        FINALIZE_OUTPUT(x)         \
-    } catch (...) {                \
-    }
-
 namespace doris::segment_v2 {
 const int32_t MAX_FIELD_LEN = 0x7FFFFFFFL;
 const int32_t MERGE_FACTOR = 100000000;
@@ -152,7 +146,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
         _bkd_writer = std::make_shared<lucene::util::bkd::bkd_writer>(
                 max_doc, DIMS, DIMS, value_length, MAX_LEAF_COUNT, MAXMBSortInHeap,
                 total_point_count, true, config::max_depth_in_bkd_tree);
-        return Status::OK();
+        return create_index_directory(_dir);
     }
 
     std::unique_ptr<lucene::analysis::Analyzer> create_chinese_analyzer() {
@@ -382,7 +376,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
             // no values to add inverted index
             return Status::OK();
         }
-        auto offsets = reinterpret_cast<const uint64_t*>(offsets_ptr);
+        const auto* offsets = reinterpret_cast<const uint64_t*>(offsets_ptr);
         if constexpr (field_is_slice_type(field_type)) {
             if (_field == nullptr || _index_writer == nullptr) {
                 LOG(ERROR) << "field or index writer is null in inverted index writer.";
@@ -404,7 +398,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
                     continue;
                 }
                 auto* v = (Slice*)((const uint8_t*)value_ptr + j * field_size);
-                strings.emplace_back(std::string(v->get_data(), v->get_size()));
+                strings.emplace_back(v->get_data(), v->get_size());
             }
 
             auto value = join(strings, " ");
@@ -456,7 +450,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
 
                 auto* v = (Slice*)item_data_ptr;
                 if (!values->is_null_at(j)) {
-                    strings.emplace_back(std::string(v->get_data(), v->get_size()));
+                    strings.emplace_back(v->get_data(), v->get_size());
                 }
                 item_data_ptr = (uint8_t*)item_data_ptr + field_size;
             }
@@ -471,7 +465,7 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
         auto* item_data_ptr = const_cast<CollectionValue*>(values)->mutable_data();
 
         for (size_t j = 0; j < values->length(); ++j) {
-            const CppType* p = reinterpret_cast<const CppType*>(item_data_ptr);
+            const auto* p = reinterpret_cast<const CppType*>(item_data_ptr);
             if (values->is_null_at(j)) {
                 // bkd do not index null values, so we do nothing here.
             } else {
@@ -515,88 +509,75 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
         return 0;
     }
 
-    int64_t file_size() const override {
-        std::filesystem::path dir(_directory);
-        dir /= _segment_file_name;
-        auto file_name = InvertedIndexDescriptor::get_index_file_name(
-                dir.string(), _index_meta->index_id(), _index_meta->get_index_suffix());
-        int64_t size = -1;
-        auto st = _fs->file_size(file_name.c_str(), &size);
-        if (!st.ok()) {
-            LOG(ERROR) << "try to get file:" << file_name << " size error:" << st;
-        }
-        return size;
-    }
+    int64_t file_size() const override { return _dir->getCompoundFileSize(); }
 
-    void write_null_bitmap(lucene::store::IndexOutput* null_bitmap_out,
-                           lucene::store::Directory* dir) {
+    void write_null_bitmap(lucene::store::IndexOutput* null_bitmap_out) {
         // write null_bitmap file
         _null_bitmap.runOptimize();
         size_t size = _null_bitmap.getSizeInBytes(false);
         if (size > 0) {
-            null_bitmap_out = dir->createOutput(
-                    InvertedIndexDescriptor::get_temporary_null_bitmap_file_name().c_str());
             faststring buf;
             buf.resize(size);
             _null_bitmap.write(reinterpret_cast<char*>(buf.data()), false);
             null_bitmap_out->writeBytes(reinterpret_cast<uint8_t*>(buf.data()), size);
-            FINALIZE_OUTPUT(null_bitmap_out)
+            null_bitmap_out->close();
         }
     }
 
     Status finish() override {
-        lucene::store::Directory* dir = nullptr;
-        lucene::store::IndexOutput* null_bitmap_out = nullptr;
-        lucene::store::IndexOutput* data_out = nullptr;
-        lucene::store::IndexOutput* index_out = nullptr;
-        lucene::store::IndexOutput* meta_out = nullptr;
+        std::unique_ptr<lucene::store::IndexOutput> null_bitmap_out = nullptr;
+        std::unique_ptr<lucene::store::IndexOutput> data_out = nullptr;
+        std::unique_ptr<lucene::store::IndexOutput> index_out = nullptr;
+        std::unique_ptr<lucene::store::IndexOutput> meta_out = nullptr;
         try {
             // write bkd file
             if constexpr (field_is_numeric_type(field_type)) {
-                auto index_path = InvertedIndexDescriptor::get_temporary_index_path(
-                        _directory + "/" + _segment_file_name, _index_meta->index_id(),
-                        _index_meta->get_index_suffix());
-                bool use_compound_file_writer = true;
-                bool can_use_ram_dir = true;
-                dir = DorisCompoundDirectoryFactory::getDirectory(
-                        _fs, index_path.c_str(), use_compound_file_writer, can_use_ram_dir);
-                write_null_bitmap(null_bitmap_out, dir);
                 _bkd_writer->max_doc_ = _rid;
                 _bkd_writer->docs_seen_ = _row_ids_seen_for_bkd;
-                data_out = dir->createOutput(
-                        InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name().c_str());
-                meta_out = dir->createOutput(
-                        InvertedIndexDescriptor::get_temporary_bkd_index_meta_file_name().c_str());
-                index_out = dir->createOutput(
-                        InvertedIndexDescriptor::get_temporary_bkd_index_file_name().c_str());
-                DBUG_EXECUTE_IF("InvertedIndexWriter._set_fulltext_data_out_nullptr",
+                null_bitmap_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput(
+                        InvertedIndexDescriptor::get_temporary_null_bitmap_file_name().c_str()));
+                data_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput(
+                        InvertedIndexDescriptor::get_temporary_bkd_index_data_file_name().c_str()));
+                meta_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput(
+                        InvertedIndexDescriptor::get_temporary_bkd_index_meta_file_name().c_str()));
+                index_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput(
+                        InvertedIndexDescriptor::get_temporary_bkd_index_file_name().c_str()));
+                write_null_bitmap(null_bitmap_out.get());
+
+                DBUG_EXECUTE_IF("InvertedIndexWriter._set_bkd_data_out_nullptr",
                                 { data_out = nullptr; });
                 if (data_out != nullptr && meta_out != nullptr && index_out != nullptr) {
-                    _bkd_writer->meta_finish(meta_out, _bkd_writer->finish(data_out, index_out),
+                    _bkd_writer->meta_finish(meta_out.get(),
+                                             _bkd_writer->finish(data_out.get(), index_out.get()),
                                              int(field_type));
                 } else {
                     LOG(WARNING) << "Inverted index writer create output error occurred: nullptr";
                     _CLTHROWA(CL_ERR_IO, "Create output error with nullptr");
                 }
-                FINALIZE_OUTPUT(meta_out)
-                FINALIZE_OUTPUT(data_out)
-                FINALIZE_OUTPUT(index_out)
-                FINALIZE_OUTPUT(dir)
+                meta_out->close();
+                data_out->close();
+                index_out->close();
+                _dir->close();
             } else if constexpr (field_is_slice_type(field_type)) {
-                dir = _index_writer->getDirectory();
-                write_null_bitmap(null_bitmap_out, dir);
+                null_bitmap_out = std::unique_ptr<lucene::store::IndexOutput>(_dir->createOutput(
+                        InvertedIndexDescriptor::get_temporary_null_bitmap_file_name().c_str()));
+                write_null_bitmap(null_bitmap_out.get());
                 close();
-                DBUG_EXECUTE_IF("InvertedIndexWriter._throw_clucene_error_in_bkd_writer_close", {
-                    _CLTHROWA(CL_ERR_IO, "debug point: test throw error in bkd index writer");
-                });
+                DBUG_EXECUTE_IF(
+                        "InvertedIndexWriter._throw_clucene_error_in_fulltext_writer_close", {
+                            _CLTHROWA(CL_ERR_IO,
+                                      "debug point: test throw error in fulltext index writer");
+                        });
             }
         } catch (CLuceneError& e) {
-            FINALLY_FINALIZE_OUTPUT(null_bitmap_out)
-            FINALLY_FINALIZE_OUTPUT(meta_out)
-            FINALLY_FINALIZE_OUTPUT(data_out)
-            FINALLY_FINALIZE_OUTPUT(index_out)
+            FINALLY_CLOSE_OUTPUT(null_bitmap_out)
+            FINALLY_CLOSE_OUTPUT(meta_out)
+            FINALLY_CLOSE_OUTPUT(data_out)
+            FINALLY_CLOSE_OUTPUT(index_out)
             if constexpr (field_is_numeric_type(field_type)) {
-                FINALLY_FINALIZE_OUTPUT(dir)
+                FINALLY_CLOSE_OUTPUT(_dir)
+            } else if constexpr (field_is_slice_type(field_type)) {
+                FINALLY_CLOSE_OUTPUT(_index_writer)
             }
             LOG(WARNING) << "Inverted index writer finish error occurred: " << e.what();
             return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
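finish() now holds its outputs in std::unique_ptr, and the merged FINALLY_CLOSE_OUTPUT macro only closes, never deletes, so ownership stays with the smart pointers. The catch-and-swallow shape of the macro matters during unwinding: a second exception thrown by close() must not escape while the primary CLuceneError is being handled. A self-contained sketch of that idiom (illustrative types, not the Doris macro itself):

    // Sketch only: best-effort close during error handling; a failure inside
    // close() is swallowed so the original exception remains the one reported.
    #include <iostream>
    #include <stdexcept>

    struct Output {
        void close() { throw std::runtime_error("close failed"); }
    };

    template <typename T>
    void finally_close(T* out) noexcept {
        try {
            if (out != nullptr) out->close();
        } catch (...) {
            // deliberately swallowed: the caller is already unwinding
        }
    }

    int main() {
        Output out;
        finally_close(&out);  // does not rethrow despite close() throwing
        std::cout << "primary error path preserved\n";
        return 0;
    }
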
diff --git a/regression-test/suites/fault_injection_p0/test_index_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_fault_injection.groovy
index 08c2bc1b56fff8..5fd47739b4860d 100644
--- a/regression-test/suites/fault_injection_p0/test_index_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_index_fault_injection.groovy
@@ -130,16 +130,16 @@ suite("test_index_failure_injection", "nonConcurrent") {
         create_httplogs_unique_table.call(testTable_unique)
 
         try {
-            GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexWriter._set_fulltext_data_out_nullptr")
+            GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexWriter._set_bkd_data_out_nullptr")
             load_httplogs_data.call(testTable_dup, 'test_httplogs_load_count_on_index', 'true', 'json', 'documents-1000.json')
         } finally {
-            GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexWriter._set_fulltext_data_out_nullptr")
+            GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexWriter._set_bkd_data_out_nullptr")
         }
         try {
-            GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexWriter._throw_clucene_error_in_bkd_writer_close")
+            GetDebugPoint().enableDebugPointForAllBEs("InvertedIndexWriter._throw_clucene_error_in_fulltext_writer_close")
             load_httplogs_data.call(testTable_unique, 'test_httplogs_load_count_on_index', 'true', 'json', 'documents-1000.json')
         } finally {
-            GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexWriter._throw_clucene_error_in_bkd_writer_close")
+            GetDebugPoint().disableDebugPointForAllBEs("InvertedIndexWriter._throw_clucene_error_in_fulltext_writer_close")
         }
         qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
     } finally {
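The fault-injection suite simply tracks the renamed debug points: the nulled data_out now targets the BKD branch, and the thrown CLuceneError targets the fulltext writer's close(). A debug point is a name that, while enabled, switches a fault branch on in the BE. A simplified stand-in for how such DBUG_EXECUTE_IF-style hooks behave (the registry here is hypothetical, not the Doris implementation):

    // Sketch only: a named fault hook that runs its action while the name is
    // registered, mimicking the DBUG_EXECUTE_IF points toggled by the test.
    #include <iostream>
    #include <set>
    #include <string>

    static std::set<std::string> g_enabled_points;

    #define EXECUTE_IF(name, action)                  \
        do {                                          \
            if (g_enabled_points.count(name)) action; \
        } while (0)

    int main() {
        g_enabled_points.insert("InvertedIndexWriter._set_bkd_data_out_nullptr");

        static int storage = 1;
        int* data_out = &storage;
        EXECUTE_IF("InvertedIndexWriter._set_bkd_data_out_nullptr", { data_out = nullptr; });
        std::cout << (data_out == nullptr ? "fault injected" : "normal path") << '\n';
        return 0;
    }
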
diff --git a/regression-test/suites/inverted_index_p0/test_show_data.groovy b/regression-test/suites/inverted_index_p0/test_show_data.groovy
index 339152d378ddf8..7f9b43498eec24 100644
--- a/regression-test/suites/inverted_index_p0/test_show_data.groovy
+++ b/regression-test/suites/inverted_index_p0/test_show_data.groovy
@@ -17,12 +17,13 @@
 
 suite("test_show_data", "p0") {
     // define a sql table
-    def testTable = "test_show_data_httplogs"
+    def testTableWithoutIndex = "test_show_data_httplogs_without_index"
+    def testTableWithIndex = "test_show_data_httplogs_with_index"
     def delta_time = 5000
     def timeout = 60000
     String database = context.config.getDbNameByFile(context.file)
 
-    def create_httplogs_dup_table = {testTablex ->
+    def create_httplogs_table_without_index = {testTablex ->
         // multi-line sql
         def result = sql """
                         CREATE TABLE IF NOT EXISTS ${testTablex} (
@@ -39,6 +40,24 @@ suite("test_show_data", "p0") {
                         );
                         """
     }
+    def create_httplogs_table_with_index = {testTablex ->
+        // multi-line sql
+        def result = sql """
+                        CREATE TABLE IF NOT EXISTS ${testTablex} (
+                          `@timestamp` int(11) NULL,
+                          `clientip` varchar(20) NULL,
+                          `request` text NULL,
+                          `status` int(11) NULL,
+                          `size` int(11) NULL,
+                          INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser"="english") COMMENT ''
+                        ) ENGINE=OLAP
+                        DUPLICATE KEY(`@timestamp`)
+                        DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+                        PROPERTIES (
+                          "replication_allocation" = "tag.location.default: 1"
+                        );
+                        """
+    }
 
     def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
                         expected_succ_rows = -1, load_to_single_tablet = 'true' ->
@@ -80,7 +99,7 @@ suite("test_show_data", "p0") {
     def wait_for_show_data_finish = { table_name, OpTimeout, origin_size ->
         def useTime = 0
         for(int t = delta_time; t <= OpTimeout; t += delta_time){
-            result = sql """show data from ${database}.${table_name};"""
+            def result = sql """show data from ${database}.${table_name};"""
             if (result.size() > 0) {
                 logger.info(table_name + " show data, detail: " + result[0].toString())
                 def size = result[0][2].replace(" KB", "").toDouble()
@@ -98,7 +117,7 @@ suite("test_show_data", "p0") {
     def wait_for_latest_op_on_table_finish = { table_name, OpTimeout ->
         def useTime = 0
         for(int t = delta_time; t <= OpTimeout; t += delta_time){
-            alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
+            def alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
             alter_res = alter_res.toString()
             if(alter_res.contains("FINISHED")) {
                 sleep(3000) // wait change table state to normal
@@ -114,7 +133,7 @@ suite("test_show_data", "p0") {
     def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
         def useTime = 0
         for(int t = delta_time; t <= OpTimeout; t += delta_time){
-            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """
+            def alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """
 
             if (alter_res.size() > 0) {
                 def last_job_state = alter_res[alter_res.size()-1][7];
@@ -131,24 +150,194 @@ suite("test_show_data", "p0") {
     }
 
     try {
-        sql "DROP TABLE IF EXISTS ${testTable}"
+        sql "DROP TABLE IF EXISTS ${testTableWithoutIndex}"
 
-        create_httplogs_dup_table.call(testTable)
+        create_httplogs_table_without_index.call(testTableWithoutIndex)
 
-        load_httplogs_data.call(testTable, 'test_httplogs_load', 'true', 'json', 'documents-1000.json')
+        load_httplogs_data.call(testTableWithoutIndex, 'test_httplogs_load_without_index', 'true', 'json', 'documents-1000.json')
 
         sql "sync"
-        def no_index_size = wait_for_show_data_finish(testTable, 300000, 0)
+        def no_index_size = wait_for_show_data_finish(testTableWithoutIndex, 300000, 0)
         assertTrue(no_index_size != "wait_timeout")
-        sql """ ALTER TABLE ${testTable} ADD INDEX idx_request (`request`) USING INVERTED PROPERTIES("parser" = "english") """
-        wait_for_latest_op_on_table_finish(testTable, timeout)
+        sql """ ALTER TABLE ${testTableWithoutIndex} ADD INDEX idx_request (`request`) USING INVERTED PROPERTIES("parser" = "english") """
+        wait_for_latest_op_on_table_finish(testTableWithoutIndex, timeout)
 
         // BUILD INDEX and expect state is RUNNING
-        sql """ BUILD INDEX idx_request ON ${testTable} """
-        def state = wait_for_last_build_index_on_table_finish(testTable, timeout)
+        sql """ BUILD INDEX idx_request ON ${testTableWithoutIndex} """
+        def state = wait_for_last_build_index_on_table_finish(testTableWithoutIndex, timeout)
         assertEquals(state, "FINISHED")
-        def with_index_size = wait_for_show_data_finish(testTable, 300000, no_index_size)
+        def with_index_size = wait_for_show_data_finish(testTableWithoutIndex, 300000, no_index_size)
         assertTrue(with_index_size != "wait_timeout")
+
+        sql "DROP TABLE IF EXISTS ${testTableWithIndex}"
+        create_httplogs_table_with_index.call(testTableWithIndex)
+        load_httplogs_data.call(testTableWithIndex, 'test_httplogs_load_with_index', 'true', 'json', 'documents-1000.json')
+        def another_with_index_size = wait_for_show_data_finish(testTableWithIndex, 300000, 0)
+        assertEquals(another_with_index_size, with_index_size)
     } finally {
         //try_sql("DROP TABLE IF EXISTS ${testTable}")
     }
 }
+
+suite("test_show_data_for_bkd", "p0") {
+    // define a sql table
+    def testTableWithoutBKDIndex = "test_show_data_httplogs_without_bkd_index"
+    def testTableWithBKDIndex = "test_show_data_httplogs_with_bkd_index"
+    def delta_time = 5000
+    def timeout = 60000
+    String database = context.config.getDbNameByFile(context.file)
+
+    def create_httplogs_table_without_bkd_index = {testTablex ->
+        // multi-line sql
+        def result = sql """
+                        CREATE TABLE IF NOT EXISTS ${testTablex} (
+                          `@timestamp` int(11) NULL,
+                          `clientip` varchar(20) NULL,
+                          `request` text NULL,
+                          `status` int(11) NULL,
+                          `size` int(11) NULL
+                        ) ENGINE=OLAP
+                        DUPLICATE KEY(`@timestamp`)
+                        DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+                        PROPERTIES (
+                          "replication_allocation" = "tag.location.default: 1"
+                        );
+                        """
+    }
+    def create_httplogs_table_with_bkd_index = {testTablex ->
+        // multi-line sql
+        def result = sql """
+                        CREATE TABLE IF NOT EXISTS ${testTablex} (
+                          `@timestamp` int(11) NULL,
+                          `clientip` varchar(20) NULL,
+                          `request` text NULL,
+                          `status` int(11) NULL,
+                          `size` int(11) NULL,
+                          INDEX status_idx (`status`) USING INVERTED
+                        ) ENGINE=OLAP
+                        DUPLICATE KEY(`@timestamp`)
+                        DISTRIBUTED BY HASH(`@timestamp`) BUCKETS 1
+                        PROPERTIES (
+                          "replication_allocation" = "tag.location.default: 1"
+                        );
+                        """
+    }
+
+    def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
+                        expected_succ_rows = -1, load_to_single_tablet = 'true' ->
+
+        // load the json data
+        streamLoad {
+            table "${table_name}"
+
+            // set http request header params
+            set 'label', label + "_" + UUID.randomUUID().toString()
+            set 'read_json_by_line', read_flag
+            set 'format', format_flag
+            file file_name // import json file
+            time 10000 // limit inflight 10s
+            if (expected_succ_rows >= 0) {
+                set 'max_filter_ratio', '1'
+            }
+
+            // if declared a check callback, the default check condition will ignore.
+            // So you must check all condition
+            check { result, exception, startTime, endTime ->
+                if (ignore_failure && expected_succ_rows < 0) { return }
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                if (expected_succ_rows >= 0) {
+                    assertEquals(json.NumberLoadedRows, expected_succ_rows)
+                } else {
+                    assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
+                    assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+                }
+            }
+        }
+    }
+
+    def wait_for_show_data_finish = { table_name, OpTimeout, origin_size ->
+        def useTime = 0
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            def result = sql """show data from ${database}.${table_name};"""
+            if (result.size() > 0) {
+                logger.info(table_name + " show data, detail: " + result[0].toString())
+                def size = result[0][2].replace(" KB", "").toDouble()
+                if (size > origin_size) {
+                    return size
+                }
+            }
+            useTime = t
+            Thread.sleep(delta_time)
+        }
+        assertTrue(useTime <= OpTimeout, "wait_for_show_data_finish timeout, useTime=${useTime}")
+        return "wait_timeout"
+    }
+
+    def wait_for_latest_op_on_table_finish = { table_name, OpTimeout ->
+        def useTime = 0
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            def alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
+            alter_res = alter_res.toString()
+            if(alter_res.contains("FINISHED")) {
+                sleep(3000) // wait change table state to normal
+                logger.info(table_name + " latest alter job finished, detail: " + alter_res)
+                break
+            }
+            useTime = t
+            Thread.sleep(delta_time)
+        }
+        assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout, useTime=${useTime}")
+    }
+
+    def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+        def useTime = 0
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            def alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """
+
+            if (alter_res.size() > 0) {
+                def last_job_state = alter_res[alter_res.size()-1][7];
+                if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") {
+                    logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res)
+                    return last_job_state;
+                }
+            }
+            useTime = t
+            Thread.sleep(delta_time)
+        }
+        assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout, useTime=${useTime}")
+        return "wait_timeout"
+    }
+
+    try {
+        sql "DROP TABLE IF EXISTS ${testTableWithoutBKDIndex}"
+
+        create_httplogs_table_without_bkd_index.call(testTableWithoutBKDIndex)
+
+        load_httplogs_data.call(testTableWithoutBKDIndex, 'test_httplogs_load_without_bkd_index', 'true', 'json', 'documents-1000.json')
+
+        sql "sync"
+        def no_index_size = wait_for_show_data_finish(testTableWithoutBKDIndex, 300000, 0)
+        assertTrue(no_index_size != "wait_timeout")
+        sql """ ALTER TABLE ${testTableWithoutBKDIndex} ADD INDEX idx_status (`status`) USING INVERTED; """
+        wait_for_latest_op_on_table_finish(testTableWithoutBKDIndex, timeout)
+
+        // BUILD INDEX and expect state is RUNNING
+        sql """ BUILD INDEX idx_status ON ${testTableWithoutBKDIndex} """
+        def state = wait_for_last_build_index_on_table_finish(testTableWithoutBKDIndex, timeout)
+        assertEquals(state, "FINISHED")
+        def with_index_size = wait_for_show_data_finish(testTableWithoutBKDIndex, 300000, no_index_size)
+        assertTrue(with_index_size != "wait_timeout")
+
sql "DROP TABLE IF EXISTS ${testTableWithBKDIndex}" + create_httplogs_table_with_bkd_index.call(testTableWithBKDIndex) + load_httplogs_data.call(testTableWithBKDIndex, 'test_httplogs_load_with_bkd_index', 'true', 'json', 'documents-1000.json') + def another_with_index_size = wait_for_show_data_finish(testTableWithBKDIndex, 300000, 0) + assertEquals(another_with_index_size, with_index_size) } finally { //try_sql("DROP TABLE IF EXISTS ${testTable}") }