-
Notifications
You must be signed in to change notification settings - Fork 501
Fix storage interface, table vector iterator not being freed on abort. #1482
Conversation
Major Decrease in PerformanceSTOP: this PR has a major negative performance impact
|
Codecov Report
@@ Coverage Diff @@
## master #1482 +/- ##
==========================================
- Coverage 81.80% 81.77% -0.04%
==========================================
Files 706 706
Lines 50433 50437 +4
==========================================
- Hits 41257 41243 -14
- Misses 9176 9194 +18
Continue to review full report at Codecov.
|
Dumping this here for myself to look at tomorrow, others are welcome too..
|
…of inside a pipeline. Unfortunately this still leaks in TrafficCopTest's DisconnectAbort.
…instead of inside a pipeline." This reverts commit 66fb4d2.
Major Decrease in PerformanceSTOP: this PR has a major negative performance impact
|
Major Decrease in PerformanceSTOP: this PR has a major negative performance impact
|
Major Decrease in PerformanceSTOP: this PR has a major negative performance impact
|
1 similar comment
Major Decrease in PerformanceSTOP: this PR has a major negative performance impact
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except that analyze_translator needs the same changes. I can make the updates to analyze_translator if you want.
ah sorry, we had a brief discussion at standup: my current goal is to try and make the non-hacky version work. |
…e level instead of inside a pipeline."" This reverts commit bc95c9d.
Ah, that's what I get for missing standup. |
Nah, I should've marked it in progress, sorry. Current update on the pipeline state version: It appears the table vector iterator is leaking actually, if the report is to be believed... |
…lled in pipeline state teardown. Nesting adds new complications. The problem is that for a SeqScan, it might actually be nested inside another node like another SeqScan. So PipelineState is actually not quite the right level for any variable that can appear nested, because those will need to be setup/teardown'd per nested iteration instead of once per pipeline. But we shove it into the pipeline state anyway and basically generate an if for whether it needs to be freed. This raises the question of whether the other translators have similar issues. If they do and our tests don't catch it, then we probably need better tests...
Major Decrease in PerformanceSTOP: this PR has a major negative performance impact
|
|
||
void AnalyzeTranslator::InitializePipelineState(const Pipeline &pipeline, FunctionBuilder *function) const { | ||
// @storageInterfaceInit(&pipelineState.storageInterface, execCtx, table_oid, col_oids, true) | ||
DeclareAndInitPgStatisticUpdater(function); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would we need to remove the call to this function and FreePgStatisticUpdater
on lines 123 and 153?
@@ -282,7 +282,10 @@ void IndexCreateTranslator::IndexInsert(WorkContext *ctx, FunctionBuilder *funct | |||
codegen_->ConstBool(index_schema.Unique())}); | |||
auto *cond = codegen_->UnaryOp(parsing::Token::Type::BANG, index_insert_call); | |||
If success(function, cond); | |||
{ function->Append(codegen_->AbortTxn(GetExecutionContext())); } | |||
{ | |||
TearDownStorageInterface(function, local_storage_interface_.GetPtr(codegen_)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like you may have forgotten to update this class to move this logic to the pipeline state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
weirdly enough, this already had it in the pipeline state. I removed my change, and it looks like things are fine:
struct QueryState {
execCtx : *ExecutionContext
global_col_oids: [1]uint32
}
struct P1_State {
local_storage_interface: StorageInterface
local_index_pr : *ProjectedRow
local_tuple_slot : TupleSlot
num_inserts : uint32
execFeatures : ExecOUFeatureVector
}
fun Query0_Init(queryState: *QueryState) -> nil {
queryState.global_col_oids[0] = 1
return
}
fun Query0_Pipeline1_PostHook(queryState: *QueryState, pipelineState: *P1_State, dummyArg: *nil) -> nil {
@execOUFeatureVectorInit(queryState.execCtx, &pipelineState.execFeatures, 1, true)
@registerThreadWithMetricsManager(queryState.execCtx)
@execCtxStartPipelineTracker(queryState.execCtx, 1)
var num_tuples = @indexGetSize(&pipelineState.local_storage_interface)
@execOUFeatureVectorRecordFeature(&pipelineState.execFeatures, 1, 10000, 0, 0, num_tuples)
@execOUFeatureVectorRecordFeature(&pipelineState.execFeatures, 1, 10000, 1, 0, num_tuples)
var heap_size = @storageInterfaceGetIndexHeapSize(&pipelineState.local_storage_interface)
@execCtxSetMemoryUseOverride(queryState.execCtx, heap_size)
@execCtxEndPipelineTracker(queryState.execCtx, 0, 1, &pipelineState.execFeatures)
@aggregateMetricsThread(queryState.execCtx)
@execOUFeatureVectorReset(&pipelineState.execFeatures)
return
}
fun Query0_Pipeline1_InitPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
@storageInterfaceInit(&pipelineState.local_storage_interface, queryState.execCtx, 1061, queryState.global_col_oids, false)
pipelineState.local_index_pr = @getIndexPR(&pipelineState.local_storage_interface, 1062)
return
}
fun Query0_Pipeline1_TearDownPipelineState(queryState: *QueryState, pipelineState: *P1_State) -> nil {
@storageInterfaceFree(&pipelineState.local_storage_interface)
@execOUFeatureVectorReset(&pipelineState.execFeatures)
return
}
fun Query0_Pipeline1_ParallelWork(queryState: *QueryState, pipelineState: *P1_State, tvi: *TableVectorIterator) -> nil {
pipelineState.num_inserts = 0
@execOUFeatureVectorInit(queryState.execCtx, &pipelineState.execFeatures, 1, false)
@registerThreadWithMetricsManager(queryState.execCtx)
@execCtxStartPipelineTracker(queryState.execCtx, 1)
for (@tableIterAdvance(tvi)) {
var vpi = @tableIterGetVPI(tvi)
for (; @vpiHasNext(vpi); @vpiAdvance(vpi)) {
pipelineState.local_tuple_slot = @vpiGetSlot(vpi)
@prSetIntNull(pipelineState.local_index_pr, 0, @vpiGetIntNull(vpi, 0))
if (!@indexInsertWithSlot(&pipelineState.local_storage_interface, &pipelineState.local_tuple_slot, false)) {
@abortTxn(queryState.execCtx)
}
pipelineState.num_inserts = pipelineState.num_inserts + IntegralCast(1)
}
}
@execCtxSetMemoryUseOverride(queryState.execCtx, 0)
@execOUFeatureVectorRecordFeature(&pipelineState.execFeatures, 1, 10000, 0, 0, pipelineState.num_inserts)
@execOUFeatureVectorRecordFeature(&pipelineState.execFeatures, 1, 10000, 1, 0, pipelineState.num_inserts)
@execCtxEndPipelineTracker(queryState.execCtx, 0, 1, &pipelineState.execFeatures)
@aggregateMetricsThread(queryState.execCtx)
@execOUFeatureVectorReset(&pipelineState.execFeatures)
return
}
fun Query0_Pipeline1_Init(queryState: *QueryState) -> nil {
var threadStateContainer = @execCtxGetTLS(queryState.execCtx)
@tlsReset(threadStateContainer, @sizeOf(P1_State), Query0_Pipeline1_InitPipelineState, Query0_Pipeline1_TearDownPipelineState, queryState)
return
}
fun Query0_Pipeline1_Run(queryState: *QueryState) -> nil {
var pipelineState = @ptrCast(*P1_State, @tlsGetCurrentThreadState(@execCtxGetTLS(queryState.execCtx)))
@execCtxInitHooks(queryState.execCtx, 1)
@execCtxRegisterHook(queryState.execCtx, 0, Query0_Pipeline1_PostHook)
@iterateTableParallel(1061, queryState.global_col_oids, queryState, queryState.execCtx, Query0_Pipeline1_ParallelWork)
@execCtxClearHooks(queryState.execCtx)
return
}
fun Query0_Pipeline1_TearDown(queryState: *QueryState) -> nil {
@tlsClear(@execCtxGetTLS(queryState.execCtx))
@ensureTrackersStopped(queryState.execCtx)
return
}
fun Query0_TearDown(queryState: *QueryState) -> nil {
return
}
// @storageInterfaceInit(pg_statistic_updater, execCtx, table_oid, pg_statistic_col_oids, true) | ||
auto *updater_setup = codegen->StorageInterfaceInit( | ||
pg_statistic_updater_, GetExecutionContext(), | ||
pg_statistic_updater_.GetPtr(codegen), GetExecutionContext(), | ||
catalog::postgres::PgStatistic::STATISTIC_TABLE_OID.UnderlyingValue(), pg_statistic_col_oids_, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You'll have to move pg_statistic_col_oids_
to the pipeline state too now, otherwise it will not be found when trying to init the updater.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oddly, it passed CI before -- do we have any analyze tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, I think you meant with the changes. Changes have been made.
@@ -35,7 +35,6 @@ AnalyzeTranslator::AnalyzeTranslator(const planner::AnalyzePlanNode &plan, Compi | |||
num_rows_(GetCodeGen()->MakeFreshIdentifier("num_rows")), | |||
pg_statistic_index_iterator_(GetCodeGen()->MakeFreshIdentifier("pg_statistic_index_iterator")), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also allocated and freed within the pipeline work. I think we probably have to move it to the pipeline state to avoid the same problem.
@@ -36,6 +36,11 @@ SeqScanTranslator::SeqScanTranslator(const planner::SeqScanPlanNode &plan, Compi | |||
local_filter_manager_ = pipeline->DeclarePipelineStateEntry("filterManager", fm_type); | |||
} | |||
|
|||
tvi_base_ = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to make these changes to the seq_scan_translator? As far as I can tell we don't call abort from anywhere within the translator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A seq scan translator can "nest" (ctx->Push()
, (line 227)) other translators inside it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a couple of comments/questions
…ngs in pipeline state.
Ready for a second pass @jkosh44, fyi analyze looks like this rn:
|
Major Decrease in PerformanceSTOP: this PR has a major negative performance impact
|
Minor Decrease in PerformanceBe warned: this PR may have decreased the throughput of the system slightly.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Major Decrease in PerformanceSTOP: this PR has a major negative performance impact
|
Heading
Fix storage interface and table vector iterator not being freed on abort.
Description
Fix #1481. Fix #1489.
This changes function-local variables like the
StorageInterface
andTableVectorIterator
to be stashed in the pipeline state instead.The motivation is that there are unfortunately multiple ways to exit out of the TPL function; in addition to reaching the end of the function, you might reach an
abortTxn
. The pipeline setup and teardown functions are guaranteed to be called, so it is better to have "must be cleaned up" stuff there.However, there's a slight complication with nested pipelines, e.g.,
SELECT * FROM table1, table2;
.Inside the nested loop, it needs to perform the setup and teardown per-iteration.
So it isn't always enough to teardown at
PipelineState
time, you still need to teardown per-iteration.Currently, this is resolved by stashing a boolean flag in the
PipelineState
indicating whether it still needs to teardown at the pipeline teardown time.Remaining tasks
N/A.
Future work
The nested pipeline complication might come up again with other translators, I only noticed it for seq scan.