Skip to content

Commit

Permalink
Storage: should seek before read (#7161)
Browse files Browse the repository at this point in the history
  • Loading branch information
Lloyd-Pottiger authored Mar 25, 2023
1 parent 348d424 commit 16a60b0
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 3 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/TiDBTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ void TiDBTableScan::constructTableScanForRemoteRead(tipb::TableScan * tipb_table
tipb_table_scan->set_table_id(table_id);
for (const auto & column : partition_table_scan.columns())
*tipb_table_scan->add_columns() = column;
for (const auto & filter : partition_table_scan.pushed_down_filter_conditions())
*tipb_table_scan->add_pushed_down_filter_conditions() = filter;
tipb_table_scan->set_desc(partition_table_scan.desc());
for (auto id : partition_table_scan.primary_column_ids())
tipb_table_scan->add_primary_column_ids(id);
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ size_t DMFileReader::skipNextBlock()

scan_context->total_dmfile_skipped_rows += read_rows;
next_row_offset += read_rows;
last_read_skipped = true;
return read_rows;
}

Expand Down Expand Up @@ -396,6 +397,8 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter)
// merge blocks
Block res = vstackBlocks(std::move(blocks));
res.setStartOffset(start_row_offset);

last_read_skipped = false;
return res;
}

Expand Down Expand Up @@ -639,6 +642,8 @@ Block DMFileReader::read()
e.rethrow();
}
}

last_read_skipped = false;
return res;
}

Expand All @@ -654,7 +659,7 @@ void DMFileReader::readFromDisk(
if (auto iter = column_streams.find(stream_name); iter != column_streams.end())
{
auto & top_stream = iter->second;
bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0;
bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0 || last_read_skipped;

auto data_type = dmfile->getColumnStat(column_define.id).type;
data_type->deserializeBinaryBulkWithMultipleStreams( //
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ class DMFileReader

std::unique_ptr<ColumnSharingCacheMap> col_data_cache{};
std::unordered_map<ColId, bool> last_read_from_cache{};

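/// True when the most recent operation was skipNextBlock() rather than read()/readWithFilter().
/// When set, the next readFromDisk() must seek before reading.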
bool last_read_skipped{false};
};

} // namespace DM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,26 @@ Block LateMaterializationBlockInputStream::readImpl()
// If filter is nullptr, it means that these push down filters are always true.
if (!filter)
{
Block rest_column_block = rest_column_stream->read();
IColumn::Filter col_filter;
col_filter.resize(filter_column_block.rows());
Block rest_column_block;
if (bitmap_filter->get(col_filter, filter_column_block.startOffset(), filter_column_block.rows()))
{
rest_column_block = rest_column_stream->read();
}
else
{
rest_column_block = rest_column_stream->read();
size_t passed_count = countBytesInFilter(col_filter);
for (auto & col : rest_column_block)
{
col.column = col.column->filter(col_filter, passed_count);
}
for (auto & col : filter_column_block)
{
col.column = col.column->filter(col_filter, passed_count);
}
}
return hstackBlocks({std::move(filter_column_block), std::move(rest_column_block)}, header);
}

Expand All @@ -66,7 +85,7 @@ Block LateMaterializationBlockInputStream::readImpl()
if (size_t passed_count = countBytesInFilter(*filter); passed_count == 0)
{
// if all rows are filtered, skip the next block of rest_column_stream
if (rest_column_stream->skipNextBlock() == 0)
if (size_t skipped_rows = rest_column_stream->skipNextBlock(); skipped_rows == 0)
{
// if we fail to skip, we need to call read() of rest_column_stream, but ignore the result
// NOTE: skipNextBlock() returns 0 only if it fails to skip or meets the end of the stream,
Expand All @@ -77,6 +96,7 @@ Block LateMaterializationBlockInputStream::readImpl()
}
else
{
RUNTIME_CHECK(skipped_rows == rows);
LOG_DEBUG(log, "Late materialization skip read block at start_offset: {}, rows: {}", filter_column_block.startOffset(), filter_column_block.rows());
}
}
Expand Down

0 comments on commit 16a60b0

Please sign in to comment.