Skip to content

Commit

Permalink
fix create fulltext index (infiniflow#1454)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?
* Fixed: when executing `fusion_rerank_maxsim.slt`, the fulltext index
created may be empty, resulting in failure

### Type of change
- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Co-authored-by: Jin Hai <haijin.chn@gmail.com>
  • Loading branch information
Ma-cat and JinHai-CN authored Jul 9, 2024
1 parent 6ebf6c1 commit 11c55c7
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 28 deletions.
21 changes: 12 additions & 9 deletions src/storage/invertedindex/common/external_sort_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,17 @@ void SortMerger<KeyType, LenType>::Run() {
}
}

template <typename KeyType, typename LenType>
requires std::same_as<KeyType, TermTuple>
SortMergerTermTuple<KeyType, LenType>::SortMergerTermTuple(const char *filenm, u32 group_size, u32 bs, u32 output_num)
: Super(filenm, group_size, bs, output_num) {
InitRunFile();
io_stream_ = MakeUnique<DirectIO>(run_file_);
this->FILE_LEN_ = io_stream_->Length();
io_stream_->Read((char *)(&this->count_), sizeof(u64));
Super::Init(*io_stream_);
}

template <typename KeyType, typename LenType>
requires std::same_as<KeyType, TermTuple>
void SortMergerTermTuple<KeyType, LenType>::MergeImpl() {
Expand Down Expand Up @@ -547,15 +558,7 @@ void SortMergerTermTuple<KeyType, LenType>::JoinThreads(Vector<UniquePtr<Thread>
template <typename KeyType, typename LenType>
requires std::same_as<KeyType, TermTuple>
void SortMergerTermTuple<KeyType, LenType>::Run(Vector<UniquePtr<Thread>>& threads) {
InitRunFile();
DirectIO io_stream(run_file_);
this->FILE_LEN_ = io_stream.Length();

io_stream.Read((char *)(&this->count_), sizeof(u64));

Super::Init(io_stream);

UniquePtr<Thread> predict_thread = MakeUnique<Thread>(std::bind(&self_t::PredictImpl, this, io_stream));
UniquePtr<Thread> predict_thread = MakeUnique<Thread>(std::bind(&self_t::PredictImpl, this, *io_stream_));
UniquePtr<Thread> merge_thread = MakeUnique<Thread>(std::bind(&self_t::MergeImpl, this));

threads.push_back(std::move(predict_thread));
Expand Down
4 changes: 2 additions & 2 deletions src/storage/invertedindex/common/external_sort_merger.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -427,13 +427,13 @@ protected:
using Super = SortMerger<KeyType, LenType>;
using typename Super::KeyAddr;
FILE *run_file_{nullptr};
UniquePtr<DirectIO> io_stream_;

void PredictImpl(DirectIO &io_stream);

void MergeImpl();
public:
SortMergerTermTuple(const char *filenm, u32 group_size = 4, u32 bs = 100000000, u32 output_num = 2)
: Super(filenm, group_size, bs, output_num) {}
SortMergerTermTuple(const char *filenm, u32 group_size = 4, u32 bs = 100000000, u32 output_num = 2);

void Run(Vector<UniquePtr<Thread>>& threads);

Expand Down
30 changes: 16 additions & 14 deletions src/storage/invertedindex/memory_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,24 +170,26 @@ SizeT MemoryIndexer::CommitOffline(SizeT wait_if_empty_ms) {
return 0;
}

if (nullptr == spill_file_handle_) {
PrepareSpillFile();
}
Vector<SharedPtr<ColumnInverter>> inverters;
this->ring_sorted_.GetBatch(inverters, wait_if_empty_ms);
SizeT num = inverters.size();
if (num > 0) {
for (auto &inverter : inverters) {
inverter->SpillSortResults(this->spill_file_handle_, this->tuple_count_, buf_writer_);
num_runs_++;
}
if (!num) {
return num;
}
if (num > 0) {
std::unique_lock<std::mutex> lock(mutex_);
inflight_tasks_ -= num;
if (inflight_tasks_ == 0) {
cv_.notify_all();
}

if (nullptr == spill_file_handle_) {
PrepareSpillFile();
}

for (auto &inverter : inverters) {
inverter->SpillSortResults(this->spill_file_handle_, this->tuple_count_, buf_writer_);
num_runs_++;
}

std::unique_lock<std::mutex> task_lock(mutex_);
inflight_tasks_ -= num;
if (inflight_tasks_ == 0) {
cv_.notify_all();
}
return num;
}
Expand Down
3 changes: 0 additions & 3 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,6 @@ TxnTimeStamp TxnManager::GetCleanupScanTS() {
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts);
for (auto *txn : finished_txns_) {
if (txn->CommittedTS() > res) {
break;
}
res = std::min(res, txn->BeginTS());
}
return res;
Expand Down

0 comments on commit 11c55c7

Please sign in to comment.