diff --git a/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp b/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp index 68ef961c471..a5aa5ded6bc 100644 --- a/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp +++ b/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp @@ -70,6 +70,8 @@ void TiDBTableScan::constructTableScanForRemoteRead(tipb::TableScan * tipb_table tipb_table_scan->set_table_id(table_id); for (const auto & column : partition_table_scan.columns()) *tipb_table_scan->add_columns() = column; + for (const auto & filter : partition_table_scan.pushed_down_filter_conditions()) + *tipb_table_scan->add_pushed_down_filter_conditions() = filter; tipb_table_scan->set_desc(partition_table_scan.desc()); for (auto id : partition_table_scan.primary_column_ids()) tipb_table_scan->add_primary_column_ids(id); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 5b4ae4539a8..0bd8787bf72 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -295,6 +295,7 @@ size_t DMFileReader::skipNextBlock() scan_context->total_dmfile_skipped_rows += read_rows; next_row_offset += read_rows; + last_read_skipped = true; return read_rows; } @@ -396,6 +397,8 @@ Block DMFileReader::readWithFilter(const IColumn::Filter & filter) // merge blocks Block res = vstackBlocks(std::move(blocks)); res.setStartOffset(start_row_offset); + + last_read_skipped = false; return res; } @@ -639,6 +642,8 @@ Block DMFileReader::read() e.rethrow(); } } + + last_read_skipped = false; return res; } @@ -654,7 +659,7 @@ void DMFileReader::readFromDisk( if (auto iter = column_streams.find(stream_name); iter != column_streams.end()) { auto & top_stream = iter->second; - bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0; + bool should_seek = force_seek || shouldSeek(start_pack_id) || skip_packs > 0 || last_read_skipped; auto data_type = dmfile->getColumnStat(column_define.id).type; data_type->deserializeBinaryBulkWithMultipleStreams( // diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 55924510964..09a31790cb1 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -180,6 +180,9 @@ class DMFileReader std::unique_ptr col_data_cache{}; std::unordered_map last_read_from_cache{}; + + /// call skipNextBlock() before read() + bool last_read_skipped{false}; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/LateMaterializationBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/LateMaterializationBlockInputStream.cpp index 270b92ca15f..f9f9ae8dc1a 100644 --- a/dbms/src/Storages/DeltaMerge/LateMaterializationBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/LateMaterializationBlockInputStream.cpp @@ -55,7 +55,26 @@ Block LateMaterializationBlockInputStream::readImpl() // If filter is nullptr, it means that these push down filters are always true. if (!filter) { - Block rest_column_block = rest_column_stream->read(); + IColumn::Filter col_filter; + col_filter.resize(filter_column_block.rows()); + Block rest_column_block; + if (bitmap_filter->get(col_filter, filter_column_block.startOffset(), filter_column_block.rows())) + { + rest_column_block = rest_column_stream->read(); + } + else + { + rest_column_block = rest_column_stream->read(); + size_t passed_count = countBytesInFilter(col_filter); + for (auto & col : rest_column_block) + { + col.column = col.column->filter(col_filter, passed_count); + } + for (auto & col : filter_column_block) + { + col.column = col.column->filter(col_filter, passed_count); + } + } return hstackBlocks({std::move(filter_column_block), std::move(rest_column_block)}, header); } @@ -66,7 +85,7 @@ Block LateMaterializationBlockInputStream::readImpl() if (size_t passed_count = countBytesInFilter(*filter); passed_count == 0) { // if all rows are filtered, skip the next block of rest_column_stream - if (rest_column_stream->skipNextBlock() == 0) + if (size_t skipped_rows = rest_column_stream->skipNextBlock(); skipped_rows == 0) { // if we fail to skip, we need to call read() of rest_column_stream, but ignore the result // NOTE: skipNextBlock() return 0 only if failed to skip or meets the end of stream, @@ -77,6 +96,7 @@ Block LateMaterializationBlockInputStream::readImpl() } else { + RUNTIME_CHECK(skipped_rows == rows); LOG_DEBUG(log, "Late materialization skip read block at start_offset: {}, rows: {}", filter_column_block.startOffset(), filter_column_block.rows()); } }