forked from cmu-db/noisepage
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathagg-vec.tpl
111 lines (90 loc) · 2.96 KB
/
agg-vec.tpl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// TODO(WAN): This sample does not work on Prashanth's branch either; it probably needs updating.
// Query-wide state shared by both pipelines.
struct State {
// Aggregation hash table: initialized in setUpState(), filled in pipeline_1(),
// iterated in pipeline_2(), and freed in tearDownState().
table: AggregationHashTable
// Number of hash table entries visited by pipeline_2(); main() returns this value.
count: int32
}
// Payload stored per entry in the aggregation hash table; its size is passed
// to @aggHTInit via @sizeOf(Agg).
struct Agg {
// Grouping key, read from column index 1 of the input vector (see constructAgg).
key: Integer
// Running COUNT(*) aggregate for this key (initialized in constructAgg,
// advanced in updateAgg).
count: CountStarAggregate
}
// Prepares the query state: initializes the aggregation hash table so that
// each entry holds one Agg payload, and resets the result counter to zero.
fun setUpState(execCtx: *ExecutionContext, state: *State) -> nil {
  var table: *AggregationHashTable = &state.table
  // Memory for the hash table is taken from the execution context.
  var mem = @execCtxGetMem(execCtx)
  @aggHTInit(table, execCtx, mem, @sizeOf(Agg))
  state.count = 0
}
// Releases the aggregation hash table owned by the query state.
fun tearDownState(state: *State) -> nil {
  var table: *AggregationHashTable = &state.table
  @aggHTFree(table)
}
// Key-equality callback for the aggregation hash table: returns true when the
// key stored in `agg` equals the value in column index 1 of the tuple the
// iterator is currently positioned on.
fun keyCheck(agg: *Agg, vpi: *VectorProjectionIterator) -> bool {
  return @sqlToBool(@vpiGetInt(vpi, 1) == agg.key)
}
// Per-tuple hash callback: hashes the value in column index 1 of the tuple the
// iterator is currently positioned on.
fun hashFn(vpi: *VectorProjectionIterator) -> uint64 {
  var key = @vpiGetInt(vpi, 1)
  return @hash(key)
}
// Hashes column index 1 of every tuple reached through the filtered iteration
// calls (@vpiHasNextFiltered / @vpiAdvanceFiltered). Hashes are written to
// consecutive slots of `hashes`, starting at slot 0.
fun vecHashFnFiltered(hashes: [*]uint64, vec: *VectorProjectionIterator) -> nil {
  for (var slot = 0; @vpiHasNextFiltered(vec); @vpiAdvanceFiltered(vec)) {
    var key = @vpiGetInt(vec, 1)
    hashes[slot] = @hash(key)
    slot = slot + 1
  }
}
// Hashes column index 1 of every tuple reached through the unfiltered
// iteration calls (@vpiHasNext / @vpiAdvance). Hashes are written to
// consecutive slots of `hashes`, starting at slot 0.
fun vecHashFnUnfiltered(hashes: [*]uint64, vec: *VectorProjectionIterator) -> nil {
  for (var slot = 0; @vpiHasNext(vec); @vpiAdvance(vec)) {
    var key = @vpiGetInt(vec, 1)
    hashes[slot] = @hash(key)
    slot = slot + 1
  }
}
// Vectorized hash entry point: dispatches to the filtered or unfiltered
// hashing loop depending on what @vpiIsFiltered reports for `vec`.
fun vecHashFn(hashes: [*]uint64, vec: *VectorProjectionIterator) -> nil {
  var filtered = @vpiIsFiltered(vec)
  if (filtered) {
    vecHashFnFiltered(hashes, vec)
  } else {
    vecHashFnUnfiltered(hashes, vec)
  }
}
// Callback that fills a newly created hash table entry: stores the grouping
// key taken from column index 1 of the current tuple, then initializes the
// entry's count aggregate.
fun constructAgg(agg: *Agg, vpi: *VectorProjectionIterator) -> nil {
  // Grouping key for this entry
  var groupKey = @vpiGetInt(vpi, 1)
  agg.key = groupKey
  // Start the aggregate from its initial state
  @aggInit(&agg.count)
}
// Callback that folds the current tuple into an existing entry: reads column
// index 0 of the tuple and advances the entry's count aggregate with it.
fun updateAgg(agg: *Agg, vpi: *VectorProjectionIterator) -> nil {
  var value = @vpiGetInt(vpi, 0)
  var counter: *CountStarAggregate = &agg.count
  @aggAdvance(counter, &value)
}
// Build pipeline: scans table "test_1" vector-at-a-time and feeds every
// vector into the aggregation hash table held in `state`.
//
// execCtx: execution context used for catalog lookups and the table scan.
// state:   query state whose hash table receives the aggregated rows.
fun pipeline_1(execCtx: *ExecutionContext, state: *State) -> nil {
  // The table
  var ht: *AggregationHashTable = &state.table

  // Resolve the table and the columns to scan.
  var tvi: TableVectorIterator
  var table_oid = @testCatalogLookup(execCtx, "test_1", "")
  // Three column OIDs are stored below, so the array needs three slots; it was
  // previously declared with two, making the write to col_oids[2] out of bounds.
  var col_oids: [3]uint32
  col_oids[0] = @testCatalogLookup(execCtx, "test_1", "colA")
  col_oids[1] = @testCatalogLookup(execCtx, "test_1", "colB")
  col_oids[2] = @testCatalogLookup(execCtx, "test_1", "colC")

  // Set up the iterator and process the table one vector projection at a time.
  for (@tableIterInit(&tvi, execCtx, table_oid, col_oids); @tableIterAdvance(&tvi); ) {
    var vec = @tableIterGetVPI(&tvi)
    // NOTE(review): the trailing `false` is presumably the "partitioned" flag
    // of @aggHTProcessBatch -- confirm against the builtin's definition.
    @aggHTProcessBatch(ht, vec, hashFn, keyCheck, constructAgg, updateAgg, false)
  }
  @tableIterClose(&tvi)
}
// Output pipeline: walks every entry of the aggregation hash table and
// increments state.count once per entry visited.
fun pipeline_2(state: *State) -> nil {
  var iterStorage: AHTIterator
  var cursor = &iterStorage
  for (@aggHTIterInit(cursor, &state.table); @aggHTIterHasNext(cursor); @aggHTIterNext(cursor)) {
    // The row is fetched and cast to its payload type, but only the number of
    // rows is recorded; the payload itself is not inspected.
    var row = @ptrCast(*Agg, @aggHTIterGetRow(cursor))
    state.count = state.count + 1
  }
  @aggHTIterClose(cursor)
}
// Entry point: sets up the query state, runs the build pipeline followed by
// the output pipeline, and returns the count accumulated by pipeline_2().
fun main(execCtx: *ExecutionContext) -> int32 {
  var state: State
  var statePtr: *State = &state

  // Initialize state
  setUpState(execCtx, statePtr)

  // Pipeline 1: build the aggregation hash table
  pipeline_1(execCtx, statePtr)

  // Pipeline 2: count the hash table entries
  pipeline_2(statePtr)

  // Capture the result before the state is torn down
  var groupCount = state.count

  // Cleanup
  tearDownState(statePtr)
  return groupCount
}