This repository has been archived by the owner on Feb 20, 2023. It is now read-only.

Fix storage interface, table vector iterator not being freed on abort. #1482

Merged
merged 19 commits into cmu-db:master from the freeeeeeeeeee branch
Mar 15, 2021

Conversation

@lmwnshn (Contributor) commented Feb 20, 2021

Heading

Fix storage interface and table vector iterator not being freed on abort.

Description

Fix #1481. Fix #1489.

This changes function-local variables like the StorageInterface and TableVectorIterator to be stashed in the pipeline state instead.

The motivation is that there are unfortunately multiple ways to exit a TPL function: in addition to reaching the end of the function, you might hit an abortTxn. The pipeline setup and teardown functions are guaranteed to be called, so "must be cleaned up" state is better kept there.
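
To make the failure mode concrete, here is a minimal TPL sketch mirroring the Query1 dump later in this thread (the table OID and column OIDs are illustrative):

fun Pipeline1_SerialWork(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var tviBase: TableVectorIterator
    var tvi = &tviBase
    var col_oids: [1]uint32
    col_oids[0] = 1
    @tableIterInit(tvi, queryState.execCtx, 1061, col_oids)
    for (@tableIterAdvance(tvi)) {
        var vpi = @tableIterGetVPI(tvi)
        for (; @vpiHasNext(vpi); @vpiAdvance(vpi)) {
            var slot = @vpiGetSlot(vpi)
            // ... set up the update via @getTablePR / @prSet... (omitted) ...
            if (!@tableUpdate(&pipelineState.storageInterface, &slot)) {
                @abortTxn(queryState.execCtx)  // exits the function here; the close below never runs
            }
        }
    }
    @tableIterClose(tvi)  // skipped on abort, so the function-local iterator leaks
    return
}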

However, there's a slight complication with nested pipelines, e.g., SELECT * FROM table1, table2;.
Inside the nested loop, setup and teardown need to happen per iteration.
So it isn't always enough to tear down at PipelineState teardown time; you still need to tear down per iteration.
Currently, this is resolved by stashing a boolean flag in the PipelineState indicating whether the resource still needs to be torn down at pipeline teardown time, as sketched below.
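
A minimal sketch of that flag pattern in TPL, modeled on the generated code that appears later in this thread (struct and function names are illustrative):

struct P1_State {
    tviBase     : TableVectorIterator
    tviNeedsFree: bool
}

fun Pipeline_TearDownPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    // Guaranteed to run even if the pipeline work exited early via @abortTxn.
    if (pipelineState.tviNeedsFree) {
        @tableIterClose(&pipelineState.tviBase)
        pipelineState.tviNeedsFree = false
    }
    return
}

fun Pipeline_SerialWork(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var col_oids: [1]uint32
    col_oids[0] = 1
    @tableIterInit(&pipelineState.tviBase, queryState.execCtx, 1061, col_oids)
    pipelineState.tviNeedsFree = true
    // ... scan work that may @abortTxn ...
    @tableIterClose(&pipelineState.tviBase)  // normal exit frees eagerly and clears the flag
    pipelineState.tviNeedsFree = false
    return
}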

Remaining tasks

N/A.

Future work

The nested pipeline complication might come up again with other translators; I only noticed it for seq scan.

@lmwnshn added the bug, in-progress, and ready-for-ci labels Feb 20, 2021
@lmwnshn self-assigned this Feb 20, 2021
@noisepage-checks

Major Decrease in Performance

STOP: this PR has a major negative performance impact

tps (%change) | benchmark_type | wal_device | details
-6.57% | tpcc | RAM disk | master tps=8925.73, commit tps=8339.5, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=RAM disk, max_connection_threads=32
-5.82% | tpcc | None | master tps=10029.72, commit tps=9445.8, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=None, max_connection_threads=32
-0.94% | tpcc | HDD | master tps=524.85, commit tps=519.94, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=HDD, max_connection_threads=32
-9.8% | tatp | RAM disk | master tps=3742.44, commit tps=3375.66, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=RAM disk, max_connection_threads=32
-13.66% | tatp | None | master tps=4059.08, commit tps=3504.7, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=None, max_connection_threads=32
-2.45% | tatp | HDD | master tps=372.5, commit tps=363.36, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=HDD, max_connection_threads=32

@codecov bot commented Feb 21, 2021

Codecov Report

Merging #1482 (c78dec7) into master (86d87d1) will decrease coverage by 0.03%.
The diff coverage is 71.42%.


@@            Coverage Diff             @@
##           master    #1482      +/-   ##
==========================================
- Coverage   81.80%   81.77%   -0.04%     
==========================================
  Files         706      706              
  Lines       50433    50437       +4     
==========================================
- Hits        41257    41243      -14     
- Misses       9176     9194      +18     
Impacted Files Coverage Δ
...execution/compiler/operator/analyze_translator.cpp 0.00% <0.00%> (ø)
src/include/execution/compiler/codegen.h 100.00% <ø> (ø)
...de/execution/compiler/operator/delete_translator.h 20.00% <ø> (ø)
...xecution/compiler/operator/index_scan_translator.h 25.00% <ø> (-15.00%) ⬇️
...de/execution/compiler/operator/insert_translator.h 25.00% <ø> (ø)
...de/execution/compiler/operator/update_translator.h 25.00% <ø> (ø)
.../execution/compiler/operator/delete_translator.cpp 67.56% <77.77%> (ø)
src/execution/compiler/codegen.cpp 83.07% <100.00%> (+0.02%) ⬆️
...cution/compiler/operator/index_scan_translator.cpp 98.11% <100.00%> (ø)
.../execution/compiler/operator/insert_translator.cpp 97.22% <100.00%> (ø)
... and 13 more

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 86d87d1...c78dec7.

@lmwnshn (Contributor Author) commented Feb 21, 2021

Dumping this here for myself to look at tomorrow; others are welcome too.

struct QueryState {
    execCtx: *ExecutionContext
}
struct P1_State {
    storageInterface: StorageInterface
}
fun Query0_Init(queryState: *QueryState) -> nil {
    return
}

fun Query0_Pipeline1_InitPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var col_oids: [1]uint32
    col_oids[0] = 1
    @storageInterfaceInit(&pipelineState.storageInterface, queryState.execCtx, 1061, col_oids, true)
    return
}

fun Query0_Pipeline1_TearDownPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    @storageInterfaceFree(&pipelineState.storageInterface)
    return
}

fun Query0_Pipeline1_SerialWork(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var insert_pr: *ProjectedRow
    insert_pr = @getTablePR(&pipelineState.storageInterface)
    @prSetIntNull(insert_pr, 0, @intToSql(1))
    var insert_slot = @tableInsert(&pipelineState.storageInterface)
    @execCtxAddRowsAffected(queryState.execCtx, 1)
    insert_pr = @getTablePR(&pipelineState.storageInterface)
    @prSetIntNull(insert_pr, 0, @intToSql(2))
    var insert_slot1 = @tableInsert(&pipelineState.storageInterface)
    @execCtxAddRowsAffected(queryState.execCtx, 1)
    insert_pr = @getTablePR(&pipelineState.storageInterface)
    @prSetIntNull(insert_pr, 0, @intToSql(3))
    var insert_slot2 = @tableInsert(&pipelineState.storageInterface)
    @execCtxAddRowsAffected(queryState.execCtx, 1)
    return
}

fun Query0_Pipeline1_Init(queryState: *QueryState) -> nil {
    var threadStateContainer = @execCtxGetTLS(queryState.execCtx)
    @tlsReset(threadStateContainer, @sizeOf(P1_State), Query0_Pipeline1_InitPipelineState, Query0_Pipeline1_TearDownPipelineState, queryState)
    return
}

fun Query0_Pipeline1_Run(queryState: *QueryState) -> nil {
    var pipelineState = @ptrCast(*P1_State, @tlsGetCurrentThreadState(@execCtxGetTLS(queryState.execCtx)))
    Query0_Pipeline1_SerialWork(queryState, pipelineState)
    return
}

fun Query0_Pipeline1_TearDown(queryState: *QueryState) -> nil {
    @tlsClear(@execCtxGetTLS(queryState.execCtx))
    @ensureTrackersStopped(queryState.execCtx)
    return
}

fun Query0_TearDown(queryState: *QueryState) -> nil {
    return
}


struct QueryState {
    execCtx: *ExecutionContext
}
struct P1_State {
    filterManager   : FilterManager
    storageInterface: StorageInterface
}
fun Query1_Pipeline1_FilterClause(execCtx: *ExecutionContext, vp: *VectorProjection, tids: *TupleIdList, context: *uint8) -> nil {
    @filterEq(execCtx, vp, 0, @intToSql(3), tids)
    return
}

fun Query1_Init(queryState: *QueryState) -> nil {
    return
}

fun Query1_Pipeline1_InitPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var col_oids: [1]uint32
    col_oids[0] = 1
    @storageInterfaceInit(&pipelineState.storageInterface, queryState.execCtx, 1061, col_oids, true)
    @filterManagerInit(&pipelineState.filterManager, queryState.execCtx)
    @filterManagerInsertFilter(&pipelineState.filterManager, Query1_Pipeline1_FilterClause)
    return
}

fun Query1_Pipeline1_TearDownPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    @storageInterfaceFree(&pipelineState.storageInterface)
    @filterManagerFree(&pipelineState.filterManager)
    return
}

fun Query1_Pipeline1_SerialWork(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var tviBase: TableVectorIterator
    var tvi = &tviBase
    var col_oids1: [1]uint32
    col_oids1[0] = 1
    @tableIterInit(tvi, queryState.execCtx, 1061, col_oids1)
    var slot: TupleSlot
    for (@tableIterAdvance(tvi)) {
        var vpi = @tableIterGetVPI(tvi)
        @filterManagerRunFilters(&pipelineState.filterManager, vpi, queryState.execCtx)
        for (; @vpiHasNextFiltered(vpi); @vpiAdvanceFiltered(vpi)) {
            slot = @vpiGetSlot(vpi)
            var update_pr: *ProjectedRow
            update_pr = @getTablePR(&pipelineState.storageInterface)
            @prSetIntNull(update_pr, 0, @intToSql(4))
            if (!@tableUpdate(&pipelineState.storageInterface, &slot)) {
                @abortTxn(queryState.execCtx)
            }
            @execCtxAddRowsAffected(queryState.execCtx, 1)
        }
        var vpi_num_tuples = @tableIterGetVPINumTuples(tvi)
    }
    @tableIterClose(tvi)
    return
}

fun Query1_Pipeline1_Init(queryState: *QueryState) -> nil {
    var threadStateContainer = @execCtxGetTLS(queryState.execCtx)
    @tlsReset(threadStateContainer, @sizeOf(P1_State), Query1_Pipeline1_InitPipelineState, Query1_Pipeline1_TearDownPipelineState, queryState)
    return
}

fun Query1_Pipeline1_Run(queryState: *QueryState) -> nil {
    var pipelineState = @ptrCast(*P1_State, @tlsGetCurrentThreadState(@execCtxGetTLS(queryState.execCtx)))
    Query1_Pipeline1_SerialWork(queryState, pipelineState)
    return
}

fun Query1_Pipeline1_TearDown(queryState: *QueryState) -> nil {
    @tlsClear(@execCtxGetTLS(queryState.execCtx))
    @ensureTrackersStopped(queryState.execCtx)
    return
}

fun Query1_TearDown(queryState: *QueryState) -> nil {
    return
}


struct QueryState {
    execCtx: *ExecutionContext
}
struct P1_State {
    filterManager   : FilterManager
    storageInterface: StorageInterface
}
fun Query2_Pipeline1_FilterClause(execCtx: *ExecutionContext, vp: *VectorProjection, tids: *TupleIdList, context: *uint8) -> nil {
    @filterEq(execCtx, vp, 0, @intToSql(3), tids)
    return
}

fun Query2_Init(queryState: *QueryState) -> nil {
    return
}

fun Query2_Pipeline1_InitPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var col_oids: [1]uint32
    col_oids[0] = 1
    @storageInterfaceInit(&pipelineState.storageInterface, queryState.execCtx, 1061, col_oids, true)
    @filterManagerInit(&pipelineState.filterManager, queryState.execCtx)
    @filterManagerInsertFilter(&pipelineState.filterManager, Query2_Pipeline1_FilterClause)
    return
}

fun Query2_Pipeline1_TearDownPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    @storageInterfaceFree(&pipelineState.storageInterface)
    @filterManagerFree(&pipelineState.filterManager)
    return
}

fun Query2_Pipeline1_SerialWork(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var tviBase: TableVectorIterator
    var tvi = &tviBase
    var col_oids1: [1]uint32
    col_oids1[0] = 1
    @tableIterInit(tvi, queryState.execCtx, 1061, col_oids1)
    var slot: TupleSlot
    for (@tableIterAdvance(tvi)) {
        var vpi = @tableIterGetVPI(tvi)
        @filterManagerRunFilters(&pipelineState.filterManager, vpi, queryState.execCtx)
        for (; @vpiHasNextFiltered(vpi); @vpiAdvanceFiltered(vpi)) {
            slot = @vpiGetSlot(vpi)
            var update_pr: *ProjectedRow
            update_pr = @getTablePR(&pipelineState.storageInterface)
            @prSetIntNull(update_pr, 0, @intToSql(5))
            if (!@tableUpdate(&pipelineState.storageInterface, &slot)) {
                @abortTxn(queryState.execCtx)
            }
            @execCtxAddRowsAffected(queryState.execCtx, 1)
        }
        var vpi_num_tuples = @tableIterGetVPINumTuples(tvi)
    }
    @tableIterClose(tvi)
    return
}

fun Query2_Pipeline1_Init(queryState: *QueryState) -> nil {
    var threadStateContainer = @execCtxGetTLS(queryState.execCtx)
    @tlsReset(threadStateContainer, @sizeOf(P1_State), Query2_Pipeline1_InitPipelineState, Query2_Pipeline1_TearDownPipelineState, queryState)
    return
}

fun Query2_Pipeline1_Run(queryState: *QueryState) -> nil {
    var pipelineState = @ptrCast(*P1_State, @tlsGetCurrentThreadState(@execCtxGetTLS(queryState.execCtx)))
    Query2_Pipeline1_SerialWork(queryState, pipelineState)
    return
}

fun Query2_Pipeline1_TearDown(queryState: *QueryState) -> nil {
    @tlsClear(@execCtxGetTLS(queryState.execCtx))
    @ensureTrackersStopped(queryState.execCtx)
    return
}

fun Query2_TearDown(queryState: *QueryState) -> nil {
    return
}


struct QueryState {
    execCtx: *ExecutionContext
}
struct P1_State {
    filterManager   : FilterManager
    storageInterface: StorageInterface
}
fun Query3_Pipeline1_FilterClause(execCtx: *ExecutionContext, vp: *VectorProjection, tids: *TupleIdList, context: *uint8) -> nil {
    @filterEq(execCtx, vp, 0, @intToSql(3), tids)
    return
}

fun Query3_Init(queryState: *QueryState) -> nil {
    return
}

fun Query3_Pipeline1_InitPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var col_oids: [1]uint32
    col_oids[0] = 1
    @storageInterfaceInit(&pipelineState.storageInterface, queryState.execCtx, 1061, col_oids, true)
    @filterManagerInit(&pipelineState.filterManager, queryState.execCtx)
    @filterManagerInsertFilter(&pipelineState.filterManager, Query3_Pipeline1_FilterClause)
    return
}

fun Query3_Pipeline1_TearDownPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    @storageInterfaceFree(&pipelineState.storageInterface)
    @filterManagerFree(&pipelineState.filterManager)
    return
}

fun Query3_Pipeline1_SerialWork(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var tviBase: TableVectorIterator
    var tvi = &tviBase
    var col_oids1: [1]uint32
    col_oids1[0] = 1
    @tableIterInit(tvi, queryState.execCtx, 1061, col_oids1)
    var slot: TupleSlot
    for (@tableIterAdvance(tvi)) {
        var vpi = @tableIterGetVPI(tvi)
        @filterManagerRunFilters(&pipelineState.filterManager, vpi, queryState.execCtx)
        for (; @vpiHasNextFiltered(vpi); @vpiAdvanceFiltered(vpi)) {
            slot = @vpiGetSlot(vpi)
            var update_pr: *ProjectedRow
            update_pr = @getTablePR(&pipelineState.storageInterface)
            @prSetIntNull(update_pr, 0, @intToSql(5))
            if (!@tableUpdate(&pipelineState.storageInterface, &slot)) {
                @abortTxn(queryState.execCtx)
            }
            @execCtxAddRowsAffected(queryState.execCtx, 1)
        }
        var vpi_num_tuples = @tableIterGetVPINumTuples(tvi)
    }
    @tableIterClose(tvi)
    return
}

fun Query3_Pipeline1_Init(queryState: *QueryState) -> nil {
    var threadStateContainer = @execCtxGetTLS(queryState.execCtx)
    @tlsReset(threadStateContainer, @sizeOf(P1_State), Query3_Pipeline1_InitPipelineState, Query3_Pipeline1_TearDownPipelineState, queryState)
    return
}

fun Query3_Pipeline1_Run(queryState: *QueryState) -> nil {
    var pipelineState = @ptrCast(*P1_State, @tlsGetCurrentThreadState(@execCtxGetTLS(queryState.execCtx)))
    Query3_Pipeline1_SerialWork(queryState, pipelineState)
    return
}

fun Query3_Pipeline1_TearDown(queryState: *QueryState) -> nil {
    @tlsClear(@execCtxGetTLS(queryState.execCtx))
    @ensureTrackersStopped(queryState.execCtx)
    return
}

fun Query3_TearDown(queryState: *QueryState) -> nil {
    return
}

…of inside a pipeline.

Unfortunately this still leaks in TrafficCopTest's DisconnectAbort.
@lmwnshn removed the in-progress label Feb 25, 2021
@noisepage-checks

Major Decrease in Performance

STOP: this PR has a major negative performance impact

tps (%change) | benchmark_type | wal_device | details
2.11% | tpcc | RAM disk | master tps=8869.26, commit tps=9056.54, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=RAM disk, max_connection_threads=32
2.05% | tpcc | None | master tps=10096.65, commit tps=10303.44, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=None, max_connection_threads=32
-12.55% | tpcc | HDD | master tps=555.77, commit tps=486.03, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=HDD, max_connection_threads=32
1.2% | tatp | RAM disk | master tps=3754.97, commit tps=3800.1, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=RAM disk, max_connection_threads=32
-1.65% | tatp | None | master tps=4122.08, commit tps=4054.18, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=None, max_connection_threads=32
-15.7% | tatp | HDD | master tps=396.31, commit tps=334.1, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=HDD, max_connection_threads=32

@noisepage-checks

Major Decrease in Performance

STOP: this PR has a major negative performance impact

tps (%change) | benchmark_type | wal_device | details
1.86% | tpcc | RAM disk | master tps=8891.29, commit tps=9056.54, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=RAM disk, max_connection_threads=32
2.13% | tpcc | None | master tps=10088.23, commit tps=10303.44, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=None, max_connection_threads=32
-2.77% | tpcc | HDD | master tps=499.88, commit tps=486.03, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=HDD, max_connection_threads=32
0.98% | tatp | RAM disk | master tps=3763.14, commit tps=3800.1, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=RAM disk, max_connection_threads=32
-1.94% | tatp | None | master tps=4134.34, commit tps=4054.18, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=None, max_connection_threads=32
-6.59% | tatp | HDD | master tps=357.66, commit tps=334.1, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=HDD, max_connection_threads=32

@noisepage-checks

Major Decrease in Performance

STOP: this PR has a major negative performance impact

tps (%change) | benchmark_type | wal_device | details
-0.43% | tpcc | RAM disk | master tps=9095.96, commit tps=9056.54, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=RAM disk, max_connection_threads=32
0.82% | tpcc | None | master tps=10219.58, commit tps=10303.44, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=None, max_connection_threads=32
-3.53% | tpcc | HDD | master tps=503.84, commit tps=486.03, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=HDD, max_connection_threads=32
3.57% | tatp | RAM disk | master tps=3669.26, commit tps=3800.1, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=RAM disk, max_connection_threads=32
1.16% | tatp | None | master tps=4007.62, commit tps=4054.18, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=None, max_connection_threads=32
-5.75% | tatp | HDD | master tps=354.47, commit tps=334.1, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=HDD, max_connection_threads=32

1 similar comment

@lmwnshn added the ready-for-review label Mar 5, 2021
@jkosh44 (Contributor) left a comment

LGTM except that analyze_translator needs the same changes. I can make the updates to analyze_translator if you want.

@lmwnshn (Contributor Author) commented Mar 7, 2021

ah sorry, we had a brief discussion at standup: my current goal is to try and make the non-hacky version work.

…e level instead of inside a pipeline.""

This reverts commit bc95c9d.
@jkosh44 (Contributor) commented Mar 7, 2021

ah sorry, we had a brief discussion at standup: my current goal is to try and make the non-hacky version work.

Ah, that's what I get for missing standup.

@lmwnshn added the in-progress label and removed the ready-for-review label Mar 8, 2021
@lmwnshn (Contributor Author) commented Mar 8, 2021

Nah, I should've marked it in progress, sorry.

Current update on the pipeline state version:
So the storageInterfaceFree IS getting called.
The tlsClear IS getting called.

It appears the table vector iterator is actually leaking, if the report is to be believed...

…lled in pipeline state teardown.

Nesting adds new complications.
The problem is that for a SeqScan, it might actually be nested inside another node like another SeqScan.
So PipelineState is actually not quite the right level for any variable that can appear nested,
because those need to be set up and torn down per nested iteration instead of once per pipeline.

But we shove it into the pipeline state anyway and basically generate an if for whether it needs to be freed.

This raises the question of whether the other translators have similar issues.
If they do and our tests don't catch it, then we probably need better tests...
@noisepage-checks

Major Decrease in Performance

STOP: this PR has a major negative performance impact

tps (%change) | benchmark_type | wal_device | details
-0.89% | tpcc | RAM disk | master tps=8940.36, commit tps=8860.42, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=RAM disk, max_connection_threads=32
-1.27% | tpcc | None | master tps=10142.5, commit tps=10013.45, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=None, max_connection_threads=32
-14.85% | tpcc | HDD | master tps=539.43, commit tps=459.32, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=HDD, max_connection_threads=32
2.53% | tatp | RAM disk | master tps=3789.09, commit tps=3884.86, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=RAM disk, max_connection_threads=32
-0.33% | tatp | None | master tps=4136.99, commit tps=4123.35, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=None, max_connection_threads=32
-18.34% | tatp | HDD | master tps=385.23, commit tps=314.56, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=HDD, max_connection_threads=32

@lmwnshn added the ready-for-review label and removed the in-progress label Mar 9, 2021
@lmwnshn changed the title from "Fix storage interface not being freed on abort." to "Fix storage interface, table vector iterator not being freed on abort." Mar 9, 2021
@apavlo requested a review from jkosh44 March 9, 2021 20:45

void AnalyzeTranslator::InitializePipelineState(const Pipeline &pipeline, FunctionBuilder *function) const {
// @storageInterfaceInit(&pipelineState.storageInterface, execCtx, table_oid, col_oids, true)
DeclareAndInitPgStatisticUpdater(function);
Contributor commented:

Would we need to remove the call to this function and FreePgStatisticUpdater on lines 123 and 153?

@@ -282,7 +282,10 @@ void IndexCreateTranslator::IndexInsert(WorkContext *ctx, FunctionBuilder *funct
codegen_->ConstBool(index_schema.Unique())});
auto *cond = codegen_->UnaryOp(parsing::Token::Type::BANG, index_insert_call);
If success(function, cond);
{ function->Append(codegen_->AbortTxn(GetExecutionContext())); }
{
TearDownStorageInterface(function, local_storage_interface_.GetPtr(codegen_));
Contributor commented:

Seems like you may have forgotten to update this class to move this logic to the pipeline state.

Contributor Author commented:

weirdly enough, this already had it in the pipeline state. I removed my change, and it looks like things are fine:

struct QueryState {
    execCtx        : *ExecutionContext
    global_col_oids: [1]uint32
}
struct P1_State {
    local_storage_interface: StorageInterface
    local_index_pr         : *ProjectedRow
    local_tuple_slot       : TupleSlot
    num_inserts            : uint32
    execFeatures           : ExecOUFeatureVector
}
fun Query0_Init(queryState: *QueryState) -> nil {
    queryState.global_col_oids[0] = 1
    return
}

fun Query0_Pipeline1_PostHook(queryState: *QueryState, pipelineState: *P1_State, dummyArg: *nil) -> nil {
    @execOUFeatureVectorInit(queryState.execCtx, &pipelineState.execFeatures, 1, true)
    @registerThreadWithMetricsManager(queryState.execCtx)
    @execCtxStartPipelineTracker(queryState.execCtx, 1)
    var num_tuples = @indexGetSize(&pipelineState.local_storage_interface)
    @execOUFeatureVectorRecordFeature(&pipelineState.execFeatures, 1, 10000, 0, 0, num_tuples)
    @execOUFeatureVectorRecordFeature(&pipelineState.execFeatures, 1, 10000, 1, 0, num_tuples)
    var heap_size = @storageInterfaceGetIndexHeapSize(&pipelineState.local_storage_interface)
    @execCtxSetMemoryUseOverride(queryState.execCtx, heap_size)
    @execCtxEndPipelineTracker(queryState.execCtx, 0, 1, &pipelineState.execFeatures)
    @aggregateMetricsThread(queryState.execCtx)
    @execOUFeatureVectorReset(&pipelineState.execFeatures)
    return
}

fun Query0_Pipeline1_InitPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    @storageInterfaceInit(&pipelineState.local_storage_interface, queryState.execCtx, 1061, queryState.global_col_oids, false)
    pipelineState.local_index_pr = @getIndexPR(&pipelineState.local_storage_interface, 1062)
    return
}

fun Query0_Pipeline1_TearDownPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    @storageInterfaceFree(&pipelineState.local_storage_interface)
    @execOUFeatureVectorReset(&pipelineState.execFeatures)
    return
}

fun Query0_Pipeline1_ParallelWork(queryState: *QueryState, pipelineState: *P1_State, tvi: *TableVectorIterator) -> nil {
    pipelineState.num_inserts = 0
    @execOUFeatureVectorInit(queryState.execCtx, &pipelineState.execFeatures, 1, false)
    @registerThreadWithMetricsManager(queryState.execCtx)
    @execCtxStartPipelineTracker(queryState.execCtx, 1)
    for (@tableIterAdvance(tvi)) {
        var vpi = @tableIterGetVPI(tvi)
        for (; @vpiHasNext(vpi); @vpiAdvance(vpi)) {
            pipelineState.local_tuple_slot = @vpiGetSlot(vpi)
            @prSetIntNull(pipelineState.local_index_pr, 0, @vpiGetIntNull(vpi, 0))
            if (!@indexInsertWithSlot(&pipelineState.local_storage_interface, &pipelineState.local_tuple_slot, false)) {
                @abortTxn(queryState.execCtx)
            }
            pipelineState.num_inserts = pipelineState.num_inserts + IntegralCast(1)
        }
    }
    @execCtxSetMemoryUseOverride(queryState.execCtx, 0)
    @execOUFeatureVectorRecordFeature(&pipelineState.execFeatures, 1, 10000, 0, 0, pipelineState.num_inserts)
    @execOUFeatureVectorRecordFeature(&pipelineState.execFeatures, 1, 10000, 1, 0, pipelineState.num_inserts)
    @execCtxEndPipelineTracker(queryState.execCtx, 0, 1, &pipelineState.execFeatures)
    @aggregateMetricsThread(queryState.execCtx)
    @execOUFeatureVectorReset(&pipelineState.execFeatures)
    return
}

fun Query0_Pipeline1_Init(queryState: *QueryState) -> nil {
    var threadStateContainer = @execCtxGetTLS(queryState.execCtx)
    @tlsReset(threadStateContainer, @sizeOf(P1_State), Query0_Pipeline1_InitPipelineState, Query0_Pipeline1_TearDownPipelineState, queryState)
    return
}

fun Query0_Pipeline1_Run(queryState: *QueryState) -> nil {
    var pipelineState = @ptrCast(*P1_State, @tlsGetCurrentThreadState(@execCtxGetTLS(queryState.execCtx)))
    @execCtxInitHooks(queryState.execCtx, 1)
    @execCtxRegisterHook(queryState.execCtx, 0, Query0_Pipeline1_PostHook)
    @iterateTableParallel(1061, queryState.global_col_oids, queryState, queryState.execCtx, Query0_Pipeline1_ParallelWork)
    @execCtxClearHooks(queryState.execCtx)
    return
}

fun Query0_Pipeline1_TearDown(queryState: *QueryState) -> nil {
    @tlsClear(@execCtxGetTLS(queryState.execCtx))
    @ensureTrackersStopped(queryState.execCtx)
    return
}

fun Query0_TearDown(queryState: *QueryState) -> nil {
    return
}

  // @storageInterfaceInit(pg_statistic_updater, execCtx, table_oid, pg_statistic_col_oids, true)
  auto *updater_setup = codegen->StorageInterfaceInit(
-     pg_statistic_updater_, GetExecutionContext(),
+     pg_statistic_updater_.GetPtr(codegen), GetExecutionContext(),
      catalog::postgres::PgStatistic::STATISTIC_TABLE_OID.UnderlyingValue(), pg_statistic_col_oids_, true);
@jkosh44 (Contributor) commented Mar 10, 2021

You'll have to move pg_statistic_col_oids_ to the pipeline state too now, otherwise it will not be found when trying to init the updater.

Contributor Author commented:

oddly, it passed CI before -- do we have any analyze tests?

Contributor Author commented:

ah, I think you meant with the changes. Changes have been made.

@@ -35,7 +35,6 @@ AnalyzeTranslator::AnalyzeTranslator(const planner::AnalyzePlanNode &plan, Compi
num_rows_(GetCodeGen()->MakeFreshIdentifier("num_rows")),
pg_statistic_index_iterator_(GetCodeGen()->MakeFreshIdentifier("pg_statistic_index_iterator")),
Contributor commented:

This is also allocated and freed within the pipeline work. I think we probably have to move it to the pipeline state to avoid the same problem.

@@ -36,6 +36,11 @@ SeqScanTranslator::SeqScanTranslator(const planner::SeqScanPlanNode &plan, Compi
local_filter_manager_ = pipeline->DeclarePipelineStateEntry("filterManager", fm_type);
}

tvi_base_ =
Contributor commented:

Why do we need to make these changes to the seq_scan_translator? As far as I can tell we don't call abort from anywhere within the translator.

Contributor Author commented:

A seq scan translator can "nest" other translators inside it via ctx->Push() (line 227); see the sketch below.
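
For context, a hedged TPL sketch of the resulting shape for something like SELECT * FROM table1, table2 (the names, OIDs, and structure are illustrative, simplified from the generated code earlier in this thread): the inner scan is initialized and closed once per outer iteration inside the same pipeline function, which is why a single pipeline-level teardown cannot unconditionally own it:

fun Pipeline_SerialWork(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    for (@tableIterAdvance(&pipelineState.outerTvi)) {
        // Inner scan: set up and torn down once per outer iteration.
        var inner_col_oids: [1]uint32
        inner_col_oids[0] = 1
        @tableIterInit(&pipelineState.innerTvi, queryState.execCtx, 1062, inner_col_oids)
        pipelineState.innerTviNeedsFree = true
        for (@tableIterAdvance(&pipelineState.innerTvi)) {
            // ... join work; @abortTxn may exit the whole function from here ...
        }
        @tableIterClose(&pipelineState.innerTvi)
        pipelineState.innerTviNeedsFree = false
    }
    return
}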

@jkosh44 (Contributor) left a comment

I had a couple of comments/questions

@lmwnshn (Contributor Author) commented Mar 14, 2021

Ready for a second pass @jkosh44; FYI, analyze looks like this right now:

create table foo (a int);
analyze foo;

---

struct AggPayload {
    agg_term_attr0: CountAggregate
    agg_term_attr1: CountAggregate
    agg_term_attr2: CountAggregate
    agg_term_attr3: IntegerTopKAggregate
    agg_term_attr4: IntegerHistogramAggregate
}
struct AggValues {
    agg_term_attr0: Integer
    agg_term_attr1: Integer
    agg_term_attr2: Integer
    agg_term_attr3: Integer
    agg_term_attr4: Integer
}
struct KeyType {
    agg_distinct: Integer
}
struct QueryState {
    execCtx   : *ExecutionContext
    hashTable2: AggregationHashTable
    aggs      : AggPayload
}
struct P2_State {
    tviBase     : TableVectorIterator
    tviNeedsFree: bool
}
struct P1_State {
    storageInterface: StorageInterface
    indexIterator   : IndexIterator
}
fun Query0_Pipeline1_DistinctKeyFn(payload: *KeyType, key: *KeyType) -> bool {
    if (SqlBoolToBool(payload.agg_distinct != key.agg_distinct)) {
        return false
    }
    return true
}

fun Query0_Init(queryState: *QueryState) -> nil {
    @aggHTInit(&queryState.hashTable2, queryState.execCtx, @sizeOf(KeyType))
    return
}

fun Query0_Pipeline2_InitPipelineState(queryState: *QueryState, pipelineState: *P2_State) -> nil {
    return
}

fun Query0_Pipeline2_TearDownPipelineState(queryState: *QueryState, pipelineState: *P2_State) -> nil {
    if (pipelineState.tviNeedsFree) {
        @tableIterClose(&pipelineState.tviBase)
        pipelineState.tviNeedsFree = false
    }
    return
}

fun Query0_Pipeline2_SerialWork(queryState: *QueryState, pipelineState: *P2_State) -> nil {
    var tvi = &pipelineState.tviBase
    var col_oids: [1]uint32
    col_oids[0] = 1
    @tableIterInit(tvi, queryState.execCtx, 1061, col_oids)
    pipelineState.tviNeedsFree = true
    var slot: TupleSlot
    for (@tableIterAdvance(tvi)) {
        var vpi = @tableIterGetVPI(tvi)
        for (; @vpiHasNext(vpi); @vpiAdvance(vpi)) {
            slot = @vpiGetSlot(vpi)
            var aggValues: AggValues
            aggValues.agg_term_attr0 = @intToSql(0)
            aggValues.agg_term_attr1 = @vpiGetIntNull(vpi, 0)
            aggValues.agg_term_attr2 = @vpiGetIntNull(vpi, 0)
            aggValues.agg_term_attr3 = @vpiGetIntNull(vpi, 0)
            aggValues.agg_term_attr4 = @vpiGetIntNull(vpi, 0)
            @aggAdvance(&queryState.aggs.agg_term_attr0, &aggValues.agg_term_attr0)
            @aggAdvance(&queryState.aggs.agg_term_attr1, &aggValues.agg_term_attr1)
            var lookupKey: KeyType
            lookupKey.agg_distinct = @vpiGetIntNull(vpi, 0)
            var hashVal = @hash(lookupKey.agg_distinct)
            var lookupPayload = @ptrCast(*KeyType, @aggHTLookup(&queryState.hashTable2, hashVal, Query0_Pipeline1_DistinctKeyFn, &lookupKey))
            if (lookupPayload == nil) {
                lookupPayload = @ptrCast(*KeyType, @aggHTInsert(&queryState.hashTable2, hashVal, false))
                lookupPayload.agg_distinct = lookupKey.agg_distinct
                @aggAdvance(&queryState.aggs.agg_term_attr2, &aggValues.agg_term_attr2)
            }
            @aggAdvance(&queryState.aggs.agg_term_attr3, &aggValues.agg_term_attr3)
            @aggAdvance(&queryState.aggs.agg_term_attr4, &aggValues.agg_term_attr4)
        }
        var vpi_num_tuples = @tableIterGetVPINumTuples(tvi)
    }
    @tableIterClose(&pipelineState.tviBase)
    pipelineState.tviNeedsFree = false
    return
}

fun Query0_Pipeline2_Init(queryState: *QueryState) -> nil {
    var threadStateContainer = @execCtxGetTLS(queryState.execCtx)
    @tlsReset(threadStateContainer, @sizeOf(P2_State), Query0_Pipeline2_InitPipelineState, Query0_Pipeline2_TearDownPipelineState, queryState)
    return
}

fun Query0_Pipeline2_Run(queryState: *QueryState) -> nil {
    @aggInit(&queryState.aggs.agg_term_attr0)
    @aggInit(&queryState.aggs.agg_term_attr1)
    @aggInit(&queryState.aggs.agg_term_attr2)
    @aggInit(&queryState.aggs.agg_term_attr3)
    @aggInit(&queryState.aggs.agg_term_attr4)
    var pipelineState = @ptrCast(*P2_State, @tlsGetCurrentThreadState(@execCtxGetTLS(queryState.execCtx)))
    Query0_Pipeline2_SerialWork(queryState, pipelineState)
    return
}

fun Query0_Pipeline2_TearDown(queryState: *QueryState) -> nil {
    @tlsClear(@execCtxGetTLS(queryState.execCtx))
    @ensureTrackersStopped(queryState.execCtx)
    return
}

fun Query0_Pipeline1_InitPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var pg_statistic_col_oids: [7]uint32
    pg_statistic_col_oids[0] = 1
    pg_statistic_col_oids[1] = 2
    pg_statistic_col_oids[2] = 3
    pg_statistic_col_oids[3] = 4
    pg_statistic_col_oids[4] = 5
    pg_statistic_col_oids[5] = 6
    pg_statistic_col_oids[6] = 7
    @storageInterfaceInit(&pipelineState.storageInterface, queryState.execCtx, 91, pg_statistic_col_oids, true)
    @indexIteratorInit(&pipelineState.indexIterator, queryState.execCtx, 2, 91, 92, pg_statistic_col_oids)
    return
}

fun Query0_Pipeline1_TearDownPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    @storageInterfaceFree(&pipelineState.storageInterface)
    @indexIteratorFree(&pipelineState.indexIterator)
    @aggFree(&queryState.aggs.agg_term_attr3)
    @aggFree(&queryState.aggs.agg_term_attr4)
    return
}

fun Query0_Pipeline1_SerialWork(queryState: *QueryState, pipelineState: *P1_State) -> nil {
    var aggRow = &queryState.aggs
    var table_oid = @intToSql(1061)
    var col_oid: Integer
    var num_rows = @aggResult(&aggRow.agg_term_attr0)
    var stat_col_4: Integer
    var stat_col_5: Integer
    var stat_col_6: StringVal
    var stat_col_7: StringVal
    var pg_statistic_index_pr: *ProjectedRow
    col_oid = @intToSql(1)
    stat_col_4 = @aggResult(&aggRow.agg_term_attr1)
    stat_col_5 = @aggResult(&aggRow.agg_term_attr2)
    stat_col_6 = @aggResult(queryState.execCtx, &aggRow.agg_term_attr3)
    stat_col_7 = @aggResult(queryState.execCtx, &aggRow.agg_term_attr4)
    pg_statistic_index_pr = @indexIteratorGetPR(&pipelineState.indexIterator)
    @prSetInt(pg_statistic_index_pr, 0, table_oid)
    @prSetInt(pg_statistic_index_pr, 1, col_oid)
    for (@indexIteratorScanKey(&pipelineState.indexIterator); @indexIteratorAdvance(&pipelineState.indexIterator); ) {
        var pg_statistic_slot = @indexIteratorGetSlot(&pipelineState.indexIterator)
        var pg_statistic_update_pr: *ProjectedRow
        if (!@tableDelete(&pipelineState.storageInterface, &pg_statistic_slot)) {
            @abortTxn(queryState.execCtx)
        }
        pg_statistic_update_pr = @getTablePR(&pipelineState.storageInterface)
        @prSetInt(pg_statistic_update_pr, 2, table_oid)
        @prSetInt(pg_statistic_update_pr, 3, col_oid)
        @prSetInt(pg_statistic_update_pr, 4, num_rows)
        @prSetInt(pg_statistic_update_pr, 5, stat_col_4)
        @prSetInt(pg_statistic_update_pr, 6, stat_col_5)
        @prSetVarlenNull(pg_statistic_update_pr, 0, stat_col_6, true)
        @prSetVarlenNull(pg_statistic_update_pr, 1, stat_col_7, true)
        var insert_slot = @tableInsert(&pipelineState.storageInterface)
        var delete_index_pr = @getIndexPR(&pipelineState.storageInterface, 92)
        @prSetInt(delete_index_pr, 0, table_oid)
        @prSetInt(delete_index_pr, 1, col_oid)
        @indexDelete(&pipelineState.storageInterface, &pg_statistic_slot)
        var insert_index_pr = @getIndexPR(&pipelineState.storageInterface, 92)
        @prSetInt(insert_index_pr, 0, table_oid)
        @prSetInt(insert_index_pr, 1, col_oid)
        if (!@indexInsertUnique(&pipelineState.storageInterface)) {
            @abortTxn(queryState.execCtx)
        }
    }
    return
}

fun Query0_Pipeline1_Init(queryState: *QueryState) -> nil {
    var threadStateContainer = @execCtxGetTLS(queryState.execCtx)
    @tlsReset(threadStateContainer, @sizeOf(P1_State), Query0_Pipeline1_InitPipelineState, Query0_Pipeline1_TearDownPipelineState, queryState)
    return
}

fun Query0_Pipeline1_Run(queryState: *QueryState) -> nil {
    var pipelineState = @ptrCast(*P1_State, @tlsGetCurrentThreadState(@execCtxGetTLS(queryState.execCtx)))
    Query0_Pipeline1_SerialWork(queryState, pipelineState)
    return
}

fun Query0_Pipeline1_TearDown(queryState: *QueryState) -> nil {
    @tlsClear(@execCtxGetTLS(queryState.execCtx))
    @ensureTrackersStopped(queryState.execCtx)
    return
}

fun Query0_TearDown(queryState: *QueryState) -> nil {
    @aggHTFree(&queryState.hashTable2)
    return
}

@noisepage-checks

Major Decrease in Performance

STOP: this PR has a major negative performance impact

tps (%change) | benchmark_type | wal_device | details
0.36% | tpcc | RAM disk | master tps=8999.14, commit tps=9031.87, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=RAM disk, max_connection_threads=32
-0.3% | tpcc | None | master tps=9961.35, commit tps=9931.19, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=None, max_connection_threads=32
-5.98% | tpcc | HDD | master tps=482.02, commit tps=453.18, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=HDD, max_connection_threads=32
3.34% | tatp | RAM disk | master tps=3439.88, commit tps=3554.78, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=RAM disk, max_connection_threads=32
-1.92% | tatp | None | master tps=3796.27, commit tps=3723.39, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=None, max_connection_threads=32
-7.62% | tatp | HDD | master tps=345.48, commit tps=319.17, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=HDD, max_connection_threads=32

@noisepage-checks

Minor Decrease in Performance

Be warned: this PR may have decreased the throughput of the system slightly.

tps (%change) | benchmark_type | wal_device | details
-0.27% | tpcc | RAM disk | master tps=8999.14, commit tps=8974.71, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=RAM disk, max_connection_threads=32
-3.48% | tpcc | None | master tps=9961.35, commit tps=9615.06, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=None, max_connection_threads=32
-0.28% | tpcc | HDD | master tps=482.02, commit tps=480.66, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=HDD, max_connection_threads=32
3.91% | tatp | RAM disk | master tps=3439.88, commit tps=3574.34, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=RAM disk, max_connection_threads=32
0.1% | tatp | None | master tps=3796.27, commit tps=3799.95, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=None, max_connection_threads=32
-1.06% | tatp | HDD | master tps=345.48, commit tps=341.83, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=HDD, max_connection_threads=32

@jkosh44 (Contributor) left a comment

LGTM

@noisepage-checks

Major Decrease in Performance

STOP: this PR has a major negative performance impact

tps (%change) | benchmark_type | wal_device | details
-0.34% | tpcc | RAM disk | master tps=8999.14, commit tps=8968.8, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=RAM disk, max_connection_threads=32
0.92% | tpcc | None | master tps=9961.35, commit tps=10052.8, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=None, max_connection_threads=32
-5.46% | tpcc | HDD | master tps=482.02, commit tps=455.69, query_mode=extended, benchmark_type=tpcc, scale_factor=32.0000, terminals=32, client_time=60, weights={'Payment': 43, 'Delivery': 4, 'NewOrder': 45, 'StockLevel': 4, 'OrderStatus': 4}, wal_device=HDD, max_connection_threads=32
4.68% | tatp | RAM disk | master tps=3439.88, commit tps=3601.02, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=RAM disk, max_connection_threads=32
-1.49% | tatp | None | master tps=3796.27, commit tps=3739.67, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=None, max_connection_threads=32
-5.92% | tatp | HDD | master tps=345.48, commit tps=325.02, query_mode=extended, benchmark_type=tatp, scale_factor=1.0000, terminals=16, client_time=60, weights={'GetAccessData': 35, 'UpdateLocation': 14, 'GetNewDestination': 10, 'GetSubscriberData': 35, 'DeleteCallForwarding': 2, 'InsertCallForwarding': 2, 'UpdateSubscriberData': 2}, wal_device=HDD, max_connection_threads=32

@lmwnshn merged commit bbf730f into cmu-db:master Mar 15, 2021
@lmwnshn deleted the freeeeeeeeeee branch March 15, 2021 00:54