Support vectorized aggregation on Hypercore TAM #7655

Merged 2 commits on Feb 13, 2025
2 changes: 1 addition & 1 deletion .github/workflows/linux-32bit-build-and-test.yaml
@@ -54,7 +54,7 @@ jobs:
append-* transparent_decompression-*
transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler*
hypercore_vacuum vectorized_aggregation vector_agg_text
vector_agg_groupagg
vector_agg_groupagg hypercore_parallel hypercore_vectoragg
SKIPS: chunk_adaptive histogram_test-*
EXTENSIONS: "postgres_fdw test_decoding pageinspect pgstattuple"
strategy:
1 change: 1 addition & 0 deletions .unreleased/pr_7655
@@ -0,0 +1 @@
Implements: #7655 Support vectorized aggregation on Hypercore TAM
8 changes: 7 additions & 1 deletion tsl/src/hypercore/arrow_tts.h
@@ -24,6 +24,7 @@
#include "arrow_cache.h"
#include "compression/arrow_c_data_interface.h"
#include "debug_assert.h"
#include "nodes/decompress_chunk/compressed_batch.h"

#include <limits.h>

@@ -88,6 +89,10 @@ typedef struct ArrowTupleTableSlot
const uint64 *arrow_qual_result; /* Bitmap with result of qual
* filtering over arrow_array. NULL if
* no filtering has been applied. */

/* Struct to hold values for one column. Necessary for compatibility with
* vector aggs. */
struct CompressedColumnValues ccvalues;
} ArrowTupleTableSlot;

extern const TupleTableSlotOps TTSOpsArrowTuple;
@@ -413,8 +418,9 @@ arrow_slot_per_segment_memory_context(const TupleTableSlot *slot)
return aslot->per_segment_mcxt;
}

extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno);
extern const ArrowArray *arrow_slot_get_array(TupleTableSlot *slot, AttrNumber attno);

extern bool is_compressed_col(const TupleDesc tupdesc, AttrNumber attno);
extern void arrow_slot_set_referenced_attrs(TupleTableSlot *slot, Bitmapset *attrs);
extern void arrow_slot_set_index_attrs(TupleTableSlot *slot, Bitmapset *attrs);

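Not part of the diff: a minimal sketch of how a vectorized consumer such as VectorAgg can pull per-column arrow data out of an ArrowTupleTableSlot through the accessor declared above. The helper name fetch_arrow_column and the include set are assumptions for illustration only.

#include <postgres.h>
#include <access/attnum.h>
#include <executor/tuptable.h>
#include "hypercore/arrow_tts.h"

/* Sketch only: return the arrow array backing the given attribute of an
 * arrow tuple table slot, using arrow_slot_get_array() declared above. */
static const ArrowArray *
fetch_arrow_column(TupleTableSlot *slot, AttrNumber attno)
{
	Assert(TTS_IS_ARROWTUPLE(slot));
	return arrow_slot_get_array(slot, attno);
}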
5 changes: 3 additions & 2 deletions tsl/src/nodes/columnar_scan/columnar_scan.c
@@ -996,9 +996,10 @@ static CustomScanMethods columnar_scan_plan_methods = {
};

bool
is_columnar_scan(const CustomScan *scan)
is_columnar_scan(const Plan *plan)
{
return scan->methods == &columnar_scan_plan_methods;
return IsA(plan, CustomScan) &&
((const CustomScan *) plan)->methods == &columnar_scan_plan_methods;
}

typedef struct VectorQualInfoHypercore
2 changes: 1 addition & 1 deletion tsl/src/nodes/columnar_scan/columnar_scan.h
@@ -20,7 +20,7 @@ typedef struct ColumnarScanPath
extern ColumnarScanPath *columnar_scan_path_create(PlannerInfo *root, RelOptInfo *rel,
Relids required_outer, int parallel_workers);
extern void columnar_scan_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht);
extern bool is_columnar_scan(const CustomScan *scan);
extern bool is_columnar_scan(const Plan *plan);
extern void _columnar_scan_init(void);

#endif /* TIMESCALEDB_COLUMNAR_SCAN_H */
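For context (not part of the diff): a small sketch of what the signature change buys call sites. The wrapper name is hypothetical; the point is that the node-tag check now lives inside is_columnar_scan() itself.

#include <postgres.h>
#include <nodes/plannodes.h>
#include "nodes/columnar_scan/columnar_scan.h"

/* Sketch only: is_columnar_scan() now takes a generic Plan and performs the
 * IsA(plan, CustomScan) check internally, so a caller can probe any child
 * plan directly instead of verifying the node tag and casting to CustomScan
 * first. */
static bool
child_is_columnar_scan(const Plan *childplan)
{
	return childplan != NULL && is_columnar_scan(childplan);
}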
3 changes: 2 additions & 1 deletion tsl/src/nodes/vector_agg/CMakeLists.txt
@@ -5,5 +5,6 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_batch.c
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_hash.c
${CMAKE_CURRENT_SOURCE_DIR}/plan.c
${CMAKE_CURRENT_SOURCE_DIR}/plan_decompress_chunk.c)
${CMAKE_CURRENT_SOURCE_DIR}/plan_decompress_chunk.c
${CMAKE_CURRENT_SOURCE_DIR}/plan_tam.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
148 changes: 136 additions & 12 deletions tsl/src/nodes/vector_agg/exec.c
@@ -8,24 +8,28 @@

#include <commands/explain.h>
#include <executor/executor.h>
#include <executor/tuptable.h>
#include <nodes/extensible.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <nodes/pg_list.h>
#include <optimizer/optimizer.h>

#include "nodes/vector_agg/exec.h"

#include "compression/arrow_c_data_interface.h"
#include "hypercore/arrow_tts.h"
#include "hypercore/vector_quals.h"
#include "nodes/columnar_scan/columnar_scan.h"
#include "nodes/decompress_chunk/compressed_batch.h"
#include "nodes/decompress_chunk/exec.h"
#include "nodes/decompress_chunk/vector_quals.h"
#include "nodes/vector_agg.h"
#include "nodes/vector_agg/plan.h"

static int
get_input_offset(const CustomScanState *state, const Var *var)
get_input_offset_decompress_chunk(const DecompressChunkState *decompress_state, const Var *var)
{
const DecompressChunkState *decompress_state = (DecompressChunkState *) state;
const DecompressContext *dcontext = &decompress_state->decompress_context;

/*
@@ -57,13 +61,56 @@ get_input_offset(const CustomScanState *state, const Var *var)
return index;
}

static int
get_value_bytes(const CustomScanState *state, int input_offset)
static void
get_column_storage_properties_decompress_chunk(const DecompressChunkState *state, int input_offset,
GroupingColumn *result)
{
const DecompressChunkState *decompress_state = (DecompressChunkState *) state;
const DecompressContext *dcontext = &decompress_state->decompress_context;
const DecompressContext *dcontext = &state->decompress_context;
const CompressionColumnDescription *desc = &dcontext->compressed_chunk_columns[input_offset];
return desc->value_bytes;
result->value_bytes = desc->value_bytes;
result->by_value = desc->by_value;
}

/*
* Given a Var reference, get the offset of the corresponding attribute in the
* input tuple.
*
* For a node returning arrow slots, this is just the attribute number in the
* Var. But if the node is DecompressChunk, it is necessary to translate
* between the compressed and non-compressed columns.
*/
static int
get_input_offset(const CustomScanState *state, const Var *var)
{
if (TTS_IS_ARROWTUPLE(state->ss.ss_ScanTupleSlot))
return AttrNumberGetAttrOffset(var->varattno);

return get_input_offset_decompress_chunk((const DecompressChunkState *) state, var);
}

/*
* Get the type length and "byval" properties for the grouping column given by
* the input offset.
*
* For a node returning arrow slots, the properties can be read directly from
* the scanned relation's tuple descriptor. For DecompressChunk, the input
* offset references the compressed relation.
*/
static void
get_column_storage_properties(const CustomScanState *state, int input_offset,
GroupingColumn *result)
{
if (TTS_IS_ARROWTUPLE(state->ss.ss_ScanTupleSlot))
{
const TupleDesc tupdesc = RelationGetDescr(state->ss.ss_currentRelation);
result->by_value = TupleDescAttr(tupdesc, input_offset)->attbyval;
result->value_bytes = TupleDescAttr(tupdesc, input_offset)->attlen;
return;
}

get_column_storage_properties_decompress_chunk((const DecompressChunkState *) state,
input_offset,
result);
}

static void
@@ -187,7 +234,7 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)

Var *var = castNode(Var, tlentry->expr);
col->input_offset = get_input_offset(childstate, var);
col->value_bytes = get_value_bytes(childstate, col->input_offset);
get_column_storage_properties(childstate, col->input_offset, col);
}
}

@@ -310,6 +357,49 @@ compressed_batch_get_next_slot(VectorAggState *vector_agg_state)
return &batch_state->decompressed_scan_slot_data.base;
}

/*
* Get the next slot to aggregate for an arrow tuple table slot.
*
* Implements "get next slot" on top of ColumnarScan (or any node producing
* ArrowTupleTableSlots). It just reads the slot from the child node.
*/
static TupleTableSlot *
arrow_get_next_slot(VectorAggState *vector_agg_state)
{
TupleTableSlot *slot = vector_agg_state->custom.ss.ss_ScanTupleSlot;

if (!TTS_EMPTY(slot))
{
Assert(TTS_IS_ARROWTUPLE(slot));

/* If we read an arrow slot previously, the entire arrow array should
* have been aggregated, so we should mark it as consumed so that we
* get the next array (or end) when we read the next slot. */

arrow_slot_mark_consumed(slot);
}

slot = ExecProcNode(linitial(vector_agg_state->custom.custom_ps));

if (TupIsNull(slot))
{
/* The input has ended. */
vector_agg_state->input_ended = true;
return NULL;
}

Assert(TTS_IS_ARROWTUPLE(slot));

/* Filtering should have happened in the scan node below so the slot
* should not be consumed here. */
Assert(!arrow_slot_is_consumed(slot));

/* Remember the slot until we're called next time */
vector_agg_state->custom.ss.ss_ScanTupleSlot = slot;

return slot;
}

/*
* Initialize vector quals for a compressed batch.
*
@@ -339,6 +429,18 @@ compressed_batch_init_vector_quals(VectorAggState *agg_state, VectorAggDef *agg_
return &agg_state->vqual_state.vqstate;
}

/*
* Initialize FILTER vector quals for an arrow tuple slot.
*
* Used to implement vectorized aggregate function filter clause.
*/
static VectorQualState *
arrow_init_vector_quals(VectorAggState *agg_state, VectorAggDef *agg_def, TupleTableSlot *slot)
{
vector_qual_state_init(&agg_state->vqual_state.vqstate, agg_def->filter_clauses, slot);
return &agg_state->vqual_state.vqstate;
}

static TupleTableSlot *
vector_agg_exec(CustomScanState *node)
{
@@ -479,20 +581,42 @@ Node *
vector_agg_state_create(CustomScan *cscan)
{
VectorAggState *state = (VectorAggState *) newNode(sizeof(VectorAggState), T_CustomScanState);
CustomScan *childscan = castNode(CustomScan, linitial(cscan->custom_plans));

state->custom.methods = &exec_methods;

/*
* Initialize VectorAggState to process vector slots from different
* subnodes. Currently, only compressed batches are supported, but arrow
* slots will be supported as well.
* subnodes.
*
* VectorAgg supports two child nodes: ColumnarScan (producing arrow tuple
* table slots) and DecompressChunk (producing compressed batches).
*
* When the child is ColumnarScan, VectorAgg expects Arrow slots that
* carry arrow arrays. ColumnarScan performs standard qual filtering and
* vectorized qual filtering prior to handing the slot up to VectorAgg.
*
* When the child is DecompressChunk, VectorAgg doesn't read the slot from
* the child node. Instead, it bypasses DecompressChunk and reads
* compressed tuples directly from the grandchild. It therefore needs to
* handle batch decompression and vectorized qual filtering itself, in its
* own "get next slot" implementation.
*
* The vector qual init functions are needed to implement vectorized
* aggregate function FILTER clauses for arrow tuple table slots and
* compressed batches, respectively.
*/
state->get_next_slot = compressed_batch_get_next_slot;
state->init_vector_quals = compressed_batch_init_vector_quals;
if (is_columnar_scan(&childscan->scan.plan))
{
state->get_next_slot = arrow_get_next_slot;
state->init_vector_quals = arrow_init_vector_quals;
}
else
{
Assert(strcmp(childscan->methods->CustomName, "DecompressChunk") == 0);
state->get_next_slot = compressed_batch_get_next_slot;
state->init_vector_quals = compressed_batch_init_vector_quals;
}

return (Node *) state;
}
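To make the dispatch above concrete, here is a simplified, illustrative sketch of how the two callbacks selected in vector_agg_state_create() are meant to be driven. This is not the actual vector_agg_exec() implementation; the helper name and the control flow are assumptions, only the callback signatures come from the diff.

#include <postgres.h>
#include <executor/tuptable.h>
#include "nodes/vector_agg/exec.h"

/* Illustrative sketch only: the "get next slot" callback hides whether the
 * input is an arrow slot produced by ColumnarScan or a compressed batch
 * assembled from DecompressChunk's child, and the "init vector quals"
 * callback sets up vectorized evaluation of the aggregate's FILTER clause
 * for that slot type. */
static void
consume_vectorized_input(VectorAggState *state, VectorAggDef *agg_def)
{
	TupleTableSlot *slot;

	while ((slot = state->get_next_slot(state)) != NULL)
	{
		/* Vectorized FILTER clause for this aggregate, if any. */
		VectorQualState *vqstate = state->init_vector_quals(state, agg_def, slot);

		/* ... hand the slot and the filter result to the grouping policy
		 * and aggregate function implementations ... */
		(void) vqstate;
	}
}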
4 changes: 3 additions & 1 deletion tsl/src/nodes/vector_agg/exec.h
@@ -27,7 +27,9 @@ typedef struct GroupingColumn
{
int input_offset;
int output_offset;
int value_bytes;

int16 value_bytes;
bool by_value;
} GroupingColumn;

typedef struct VectorAggState
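Purely illustrative (not from the diff): what the new storage-property fields hold for a couple of common grouping column types. The offsets below are made up; the lengths and by-value flags follow PostgreSQL's attlen/attbyval conventions.

#include "nodes/vector_agg/exec.h"

/* Sketch only: the new fields mirror PostgreSQL's per-attribute storage
 * properties, which get_column_storage_properties() in exec.c copies into
 * the GroupingColumn. */
static GroupingColumn
example_bigint_grouping_column(void)
{
	GroupingColumn col = {
		.input_offset = 0,   /* hypothetical offset in the input tuple */
		.output_offset = 0,  /* hypothetical offset in the output tuple */
		.value_bytes = 8,    /* attlen of a bigint */
		.by_value = true,    /* bigint is pass-by-value on 64-bit builds */
	};

	/* A text grouping column would instead have value_bytes = -1 (varlena)
	 * and by_value = false. */
	return col;
}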
27 changes: 26 additions & 1 deletion tsl/src/nodes/vector_agg/plan.c
@@ -18,6 +18,7 @@

#include "exec.h"
#include "import/list.h"
#include "nodes/columnar_scan/columnar_scan.h"
#include "nodes/decompress_chunk/vector_quals.h"
#include "nodes/vector_agg.h"
#include "utils.h"
@@ -177,6 +178,27 @@ vector_agg_plan_create(Plan *childplan, Agg *agg, List *resolved_targetlist,
lfirst(list_nth_cell(vector_agg->custom_private, VASI_GroupingType)) =
makeInteger(grouping_type);

#if PG15_GE
if (is_columnar_scan(childplan))
{
CustomScan *custom = castNode(CustomScan, childplan);

/*
* ColumnarScan should not project when doing vectorized
* aggregation. If it projects, it will turn the arrow slot into a set
* of virtual slots and the vector data will not be passed up to
* VectorAgg.
*
* To make ColumnarScan avoid projection, unset the custom scan node's
* projection flag. Normally, it is too late to change this flag as
* PostgreSQL already planned projection based on it. However,
* ColumnarScan rechecks this flag before it begins execution and
* ignores any projection if the flag is not set.
*/
custom->flags &= ~CUSTOMPATH_SUPPORT_PROJECTION;
}
#endif

return (Plan *) vector_agg;
}

@@ -471,8 +493,11 @@ vectoragg_plan_possible(Plan *childplan, const List *rtable, VectorQualInfo *vqi

CustomScan *customscan = castNode(CustomScan, childplan);
bool vectoragg_possible = false;
RangeTblEntry *rte = rt_fetch(customscan->scan.scanrelid, rtable);

if (strcmp(customscan->methods->CustomName, "DecompressChunk") == 0)
if (ts_is_hypercore_am(ts_get_rel_am(rte->relid)))
vectoragg_possible = vectoragg_plan_tam(childplan, rtable, vqi);
else if (strcmp(customscan->methods->CustomName, "DecompressChunk") == 0)
vectoragg_possible = vectoragg_plan_decompress_chunk(childplan, vqi);

return vectoragg_possible;
2 changes: 1 addition & 1 deletion tsl/src/nodes/vector_agg/plan.h
@@ -25,6 +25,6 @@ typedef enum

extern void _vector_agg_init(void);
extern bool vectoragg_plan_decompress_chunk(Plan *childplan, VectorQualInfo *vqi);

extern bool vectoragg_plan_tam(Plan *childplan, const List *rtable, VectorQualInfo *vqi);
Plan *try_insert_vector_agg_node(Plan *plan, List *rtable);
bool has_vector_agg_node(Plan *plan, bool *has_normal_agg);