Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/pax_storage/src/cpp/access/pax_scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns(
&& !(flags & SO_TYPE_VECTOR)
#endif
) {
filter->InitRowFilter(rel, ps, filter->GetColumnProjection());
filter->InitRowFilter(rel, ps, filter->GetColumnProjection(), key, nkeys);
}
}
return BeginScan(rel, snapshot, nkeys, key, parallel_scan, flags,
Expand Down
1 change: 1 addition & 0 deletions contrib/pax_storage/src/cpp/comm/cbdb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ extern "C" {
#include "commands/progress.h"
#include "commands/tablecmds.h"
#include "funcapi.h"
#include "lib/bloomfilter.h"
#include "miscadmin.h"
#include "nodes/bitmapset.h"
#include "nodes/execnodes.h"
Expand Down
10 changes: 5 additions & 5 deletions contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ namespace pax {

PaxFilter::PaxFilter() : sparse_filter_(nullptr), row_filter_(nullptr) {}

void PaxFilter::InitSparseFilter(Relation relation, List *quals,
ScanKey key, int nkeys,
bool allow_fallback_to_pg) {
void PaxFilter::InitSparseFilter(Relation relation, List *quals, ScanKey key,
int nkeys, bool allow_fallback_to_pg) {
Assert(!sparse_filter_);
sparse_filter_ =
std::make_shared<PaxSparseFilter>(relation, allow_fallback_to_pg);
Expand Down Expand Up @@ -123,10 +122,11 @@ void PaxFilter::SetColumnProjection(const std::vector<int> &cols, int natts) {
}

void PaxFilter::InitRowFilter(Relation relation, PlanState *ps,
const std::vector<bool> &projection) {
const std::vector<bool> &projection, ScanKey key,
int nkeys) {
Assert(!row_filter_);
row_filter_ = std::make_shared<PaxRowFilter>();
if (!row_filter_->Initialize(relation, ps, projection)) {
if (!row_filter_->Initialize(relation, ps, projection, key, nkeys)) {
row_filter_ = nullptr;
}
}
Expand Down
3 changes: 2 additions & 1 deletion contrib/pax_storage/src/cpp/storage/filter/pax_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class PaxFilter final {

// The row filter
void InitRowFilter(Relation relation, PlanState *ps,
const std::vector<bool> &projection);
const std::vector<bool> &projection, ScanKey key,
int nkeys);
std::shared_ptr<PaxRowFilter> GetRowFilter();

void LogStatistics() const;
Expand Down
70 changes: 57 additions & 13 deletions contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/

#include "storage/filter/pax_row_filter.h"

#include "comm/cbdb_wrappers.h"

namespace paxc {
Expand All @@ -45,19 +46,54 @@ static inline void FindAttrsInQual(Node *qual, bool *proj, int ncol,
}

static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
pax::ExecutionFilterContext *ctx) {
pax::ExecutionFilterContext *ctx,
ScanKey key, int nkeys) {
List *qual = ps->plan->qual;
List **qual_list;
ListCell *lc;
bool *proj;
int *qual_atts;
int natts = RelationGetNumberOfAttributes(rel);

if (!qual || !IsA(qual, List)) return false;
int ret = false;

if (key && nkeys > 0) {
if (nodeTag(ps) != T_SeqScanState) {
elog(ERROR, "runtime filter only support seqscan state, but got %d",
nodeTag(ps));
}

for (int i = 0; i < nkeys; i++) {
if (key[i].sk_flags & SK_BLOOM_FILTER) {
ctx->runtime_bloom_keys.emplace_back(key[i]);
ret = true;
}
}

// register bloom filters
for (int i = 0; i < (int)ctx->runtime_bloom_keys.size(); ++i) {
pax::ExecutionFilterContext::FilterNode node;
node.kind = pax::ExecutionFilterContext::FilterKind::kBloom;
node.index = i;
ctx->filter_nodes.emplace_back(node);
}

if (ps->instrument) {
ps->instrument->prf_work = true;
}
ctx->ps = ps;

// set filter_in_seqscan to false, so that the filter will not be executed
// in SeqNext(), but will be executed in pax_row_filter
auto seqscan = (SeqScanState *)ps;
seqscan->filter_in_seqscan = false;
}

if (!qual || !IsA(qual, List)) return ret;

if (list_length(qual) == 1 && IsA(linitial(qual), BoolExpr)) {
auto boolexpr = (BoolExpr *)linitial(qual);
if (boolexpr->boolop != AND_EXPR) return false;
if (boolexpr->boolop != AND_EXPR) return ret;
qual = boolexpr->args;
}
Assert(IsA(qual, List));
Expand Down Expand Up @@ -98,6 +134,11 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
if (!qual_list[i]) continue;
ctx->estates[k] = ExecInitQual(qual_list[i], ps);
ctx->attnos[k] = i;
// register expr filter node (by index k)
pax::ExecutionFilterContext::FilterNode node;
node.kind = pax::ExecutionFilterContext::FilterKind::kExpr;
node.index = k;
ctx->filter_nodes.emplace_back(node);
list_free(qual_list[i]);
k++;
}
Expand All @@ -108,7 +149,11 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
list_free(qual_list[0]);
}

Assert(ctx->size > 0 || ctx->estate_final);
Assert(ctx->size > 0 || ctx->estate_final ||
ctx->runtime_bloom_keys.size() > 0);

// remove qual from plan state, so that the qual will not be executed in
// executor, but will be executed in pax_row_filter
ps->qual = nullptr;

pfree(proj);
Expand All @@ -117,20 +162,19 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
return true;
}

} // namespace paxc

} // namespace paxc

namespace pax {

PaxRowFilter::PaxRowFilter() {}

bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, const std::vector<bool> &projection) {
bool PaxRowFilter::Initialize(Relation rel, PlanState *ps,
const std::vector<bool> &projection, ScanKey key,
int nkeys) {
bool ok = false;

CBDB_WRAP_START;
{
ok = paxc::BuildExecutionFilterForColumns(rel, ps, &efctx_);
}
{ ok = paxc::BuildExecutionFilterForColumns(rel, ps, &efctx_, key, nkeys); }
CBDB_WRAP_END;

if (ok) {
Expand All @@ -140,7 +184,8 @@ bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, const std::vector<boo
return ok;
}

void PaxRowFilter::FillRemainingColumns(Relation rel, const std::vector<bool> &projection) {
void PaxRowFilter::FillRemainingColumns(Relation rel,
const std::vector<bool> &projection) {
int natts = RelationGetNumberOfAttributes(rel);
auto proj_len = projection.size();
std::vector<bool> atts(natts);
Expand All @@ -162,5 +207,4 @@ void PaxRowFilter::FillRemainingColumns(Relation rel, const std::vector<bool> &p
}
}


} // namespace pax
35 changes: 28 additions & 7 deletions contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,38 @@ struct ExecutionFilterContext {
ExprState *estate_final = nullptr;
ExprState **estates;
AttrNumber *attnos;
PlanState *ps;
int size = 0;
inline bool HasExecutionFilter() const { return size > 0 || estate_final; }

// runtime bloom filters pushed down via SeqScanState->filters
// (SK_BLOOM_FILTER)
std::vector<ScanKeyData> runtime_bloom_keys;

// unified filter nodes (expr + bloom) for execution ordering
enum class FilterKind { kExpr, kBloom };
struct FilterNode {
FilterKind kind;
int index; // index in estates (for kExpr) or in runtime_bloom_keys (for
// kBloom)
uint64 tested = 0; // number of rows tested during sampling
uint64 passed = 0; // number of rows passed during sampling
double score = 1.0; // pass rate used for ordering (lower is better)
};
std::vector<FilterNode> filter_nodes;

// sampling control to determine filter order
bool sampling = true;
uint64 sample_target = 65536; // number of rows for sampling phase
uint64 sample_rows = 0; // rows seen in sampling
};

class PaxRowFilter final {
public:
public:
PaxRowFilter();

bool Initialize(Relation rel, PlanState *ps,
const std::vector<bool> &projection);
const std::vector<bool> &projection, ScanKey key, int nkeys);

inline const ExecutionFilterContext *GetExecutionFilterContext() const {
return &efctx_;
Expand All @@ -60,17 +82,16 @@ class PaxRowFilter final {
inline const std::vector<AttrNumber> &GetRemainingColumns() const {
return remaining_attnos_;
}
private:

private:
void FillRemainingColumns(Relation rel, const std::vector<bool> &projection);

private:
private:
ExecutionFilterContext efctx_;
// all selected columns - single row filtering columns
// before running final cross columns expression filtering, the remaining
// columns should be filled.
std::vector<AttrNumber> remaining_attnos_;
};


};
}; // namespace pax
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@

#include "storage/micro_partition_row_filter_reader.h"

#include <algorithm>

#include "comm/guc.h"
#include "comm/log.h"
#include "comm/pax_memory.h"
#include "storage/filter/pax_filter.h"
#include "storage/filter/pax_sparse_filter.h"
#include "storage/filter/pax_row_filter.h"
#include "storage/filter/pax_sparse_filter.h"
#include "storage/pax_defined.h"
#include "storage/pax_itemptr.h"

Expand Down Expand Up @@ -73,6 +75,95 @@ MicroPartitionRowFilterReader::GetNextGroup(TupleDesc desc) {
return group_;
}

// Copies, for the row at `row_index` of `group`, every column referenced by a
// single-column expression filter (ctx->attnos[0 .. ctx->size)) into the
// value/null arrays of `slot`, so the expression filters can be evaluated
// against the slot afterwards.
void MicroPartitionRowFilterReader::LoadExprFilterColumns(
    MicroPartitionReader::Group *group, TupleDesc desc,
    const ExecutionFilterContext *ctx, size_t row_index, TupleTableSlot *slot) {
  const int filter_count = ctx->size;
  int pos = 0;
  while (pos < filter_count) {
    const auto attno = ctx->attnos[pos];
    Assert(attno > 0);
    // Attribute numbers are positive; the slot arrays and GetColumnValue are
    // addressed with (attno - 1).
    const auto column = attno - 1;
    std::tie(slot->tts_values[column], slot->tts_isnull[column]) =
        group->GetColumnValue(desc, column, row_index);
    ++pos;
  }
}

bool MicroPartitionRowFilterReader::EvalBloomNode(
const ExecutionFilterContext *ctx, MicroPartitionReader::Group *group,
TupleDesc desc, size_t row_index, int bloom_index) {
Assert(bloom_index >= 0 &&
(size_t)bloom_index < ctx->runtime_bloom_keys.size());
const auto &skd = ctx->runtime_bloom_keys[bloom_index];
const ScanKey sk = const_cast<ScanKeyData *>(&skd);
bool isnull = false;
Datum val;
std::tie(val, isnull) =
group->GetColumnValue(desc, sk->sk_attno - 1, row_index);
if (isnull) return true;
bloom_filter *bf = (bloom_filter *)DatumGetPointer(sk->sk_argument);
return !bloom_lacks_element(bf, (unsigned char *)&val, sizeof(Datum));
}

bool MicroPartitionRowFilterReader::EvalExprNode(
const ExecutionFilterContext *ctx, TupleTableSlot *slot, int expr_index) {
return TestRowScanInternal(slot, ctx->estates[expr_index],
ctx->attnos[expr_index]);
}

// Runs a single filter node (bloom or expression) against the current row.
//
// For bloom nodes a rejected row is also counted in the plan state's
// instrumentation (nfilteredPRF) when instrumentation is attached. When
// `update_stats` is set, the node's tested/passed counters are updated.
// Returns true if the row passes this node.
bool MicroPartitionRowFilterReader::EvalFilterNode(
    ExecutionFilterContext *ctx, MicroPartitionReader::Group *group,
    TupleDesc desc, size_t row_index, TupleTableSlot *slot,
    ExecutionFilterContext::FilterNode &node, bool update_stats) {
  const bool is_bloom =
      node.kind == ExecutionFilterContext::FilterKind::kBloom;

  bool row_passes = true;
  if (!is_bloom) {
    row_passes = EvalExprNode(ctx, slot, node.index);
  } else {
    row_passes = EvalBloomNode(ctx, group, desc, row_index, node.index);
    if (ctx->ps->instrument && !row_passes) {
      ctx->ps->instrument->nfilteredPRF += 1;
    }
  }

  if (update_stats) {
    node.tested++;
    if (row_passes) {
      node.passed += 1;
    }
  }
  return row_passes;
}

// Applies every registered filter node (expression and bloom) to the current
// row, short-circuiting on the first node that rejects it.
//
// While ctx->sampling is set, per-node tested/passed counters are collected.
// Once ctx->sample_rows reaches ctx->sample_target, the nodes are reordered
// by ascending observed pass rate, so the filters that reject the most rows
// run first, and sampling is switched off.
//
// The end-of-sampling check runs for every sampled row, including rejected
// ones. Otherwise a workload where an early filter passes most rows and a
// later one rejects nearly all of them would never leave the sampling phase,
// and the selective filter would never be moved to the front.
//
// Because evaluation stops at the first rejecting node, a node's counters
// only cover rows that passed all nodes ordered before it.
//
// Returns true if the row passes all filter nodes.
bool MicroPartitionRowFilterReader::ApplyFiltersWithSampling(
    ExecutionFilterContext *ctx, MicroPartitionReader::Group *group,
    TupleDesc desc, size_t row_index, TupleTableSlot *slot) {
  const bool sampling = ctx->sampling;

  bool all_pass = true;
  for (auto &node : ctx->filter_nodes) {
    // Statistics are only gathered during the sampling phase.
    if (!EvalFilterNode(ctx, group, desc, row_index, slot, node, sampling)) {
      all_pass = false;
      break;
    }
  }
  if (!sampling) return all_pass;

  ctx->sample_rows++;
  if (ctx->sample_rows >= ctx->sample_target) {
    for (auto &node : ctx->filter_nodes) {
      // A node that was never reached gets the neutral score 1.0.
      node.score =
          (node.tested == 0) ? 1.0 : (double)node.passed / (double)node.tested;
    }
    std::stable_sort(ctx->filter_nodes.begin(), ctx->filter_nodes.end(),
                     [](const auto &a, const auto &b) {
                       // Lower pass rate first (better selectivity)
                       if (a.score != b.score) return a.score < b.score;
                       return (int)a.kind < (int)b.kind;
                     });
    ctx->sampling = false;
  }
  return all_pass;
}

bool MicroPartitionRowFilterReader::ReadTuple(TupleTableSlot *slot) {
auto g = group_;
Assert(filter_->GetRowFilter());
Expand Down Expand Up @@ -108,16 +199,14 @@ bool MicroPartitionRowFilterReader::ReadTuple(TupleTableSlot *slot) {
}
}

for (int i = 0; i < ctx->size; i++) {
auto attno = ctx->attnos[i];
Assert(attno > 0);
std::tie(slot->tts_values[attno - 1], slot->tts_isnull[attno - 1]) =
g->GetColumnValue(desc, attno - 1, current_group_row_index_);
if (!TestRowScanInternal(slot, ctx->estates[i], attno)) {
current_group_row_index_++;
goto retry_next;
}
LoadExprFilterColumns(g.get(), desc, ctx, current_group_row_index_, slot);
if (!ApplyFiltersWithSampling(const_cast<ExecutionFilterContext *>(ctx),
g.get(), desc, current_group_row_index_,
slot)) {
current_group_row_index_++;
goto retry_next;
}

for (auto attno : remaining_columns) {
std::tie(slot->tts_values[attno - 1], slot->tts_isnull[attno - 1]) =
g->GetColumnValue(desc, attno - 1, current_group_row_index_);
Expand Down
Loading
Loading