Skip to content

Remove second read stage during uniq index update #1258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 80 additions & 59 deletions ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,7 @@ using namespace NYql::NNodes;

namespace {

struct TRowsAndKeysResult {
TDqPhyPrecompute RowsPrecompute;
TDqPhyPrecompute KeysPrecompute;
};

TRowsAndKeysResult PrecomputeRowsAndKeys(const TCondenseInputResult& condenseResult,
TDqStage ExtractRowsAndKeysStage(const TCondenseInputResult& condenseResult,
const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx)
{
TCoArgument rowsListArg(ctx.NewArgument(pos, "rows_list"));
Expand All @@ -40,7 +35,7 @@ TRowsAndKeysResult PrecomputeRowsAndKeys(const TCondenseInputResult& condenseRes
.Build()
.Done();

auto computeStage = Build<TDqStage>(ctx, pos)
return Build<TDqStage>(ctx, pos)
.Inputs()
.Add(condenseResult.StageInputs)
.Build()
Expand All @@ -67,29 +62,6 @@ TRowsAndKeysResult PrecomputeRowsAndKeys(const TCondenseInputResult& condenseRes
.Build()
.Settings().Build()
.Done();

auto rowsPrecompute = Build<TDqPhyPrecompute>(ctx, pos)
.Connection<TDqCnValue>()
.Output()
.Stage(computeStage)
.Index().Build("0")
.Build()
.Build()
.Done();

auto keysPrecompute = Build<TDqPhyPrecompute>(ctx, pos)
.Connection<TDqCnValue>()
.Output()
.Stage(computeStage)
.Index().Build("1")
.Build()
.Build()
.Done();

return TRowsAndKeysResult {
.RowsPrecompute = rowsPrecompute,
.KeysPrecompute = keysPrecompute
};
}

// Return set of data columns need to be save during index update
Expand Down Expand Up @@ -344,10 +316,10 @@ TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecomput
.Done();
}

TMaybe<TCondenseInputResult> RewriteInputForConstraint(const TExprBase& inputRows, const THashSet<TStringBuf> inputColumns,
const THashSet<TString>& checkDefaults,
const TKikimrTableDescription& table, const TSecondaryIndexes& indexes,
TPositionHandle pos, TExprContext& ctx)
TMaybe<std::pair<TCondenseInputResult, TMaybeNode<TDqPhyPrecompute>>>
RewriteInputForConstraint(const TExprBase& inputRows, const THashSet<TStringBuf> inputColumns,
const THashSet<TString>& checkDefaults, const TKikimrTableDescription& table,
const TSecondaryIndexes& indexes, TPositionHandle pos, TExprContext& ctx)
{
auto condenseResult = CondenseInput(inputRows, ctx);
if (!condenseResult) {
Expand Down Expand Up @@ -384,6 +356,8 @@ TMaybe<TCondenseInputResult> RewriteInputForConstraint(const TExprBase& inputRow
missedKeyInput.clear();
}

TMaybeNode<TDqPhyPrecompute> precomputeTableLookupDict;

if (!missedKeyInput.empty() || !checkDefaults.empty()) {
TVector<TExprBase> columns;

Expand Down Expand Up @@ -413,11 +387,6 @@ TMaybe<TCondenseInputResult> RewriteInputForConstraint(const TExprBase& inputRow
}

for (const auto& x : missedKeyInput) {
auto atom = Build<TCoAtom>(ctx, pos)
.Value(x)
.Done();
columns.emplace_back(atom);

auto columnType = table.GetColumnType(TString(x));
YQL_ENSURE(columnType);

Expand All @@ -431,19 +400,29 @@ TMaybe<TCondenseInputResult> RewriteInputForConstraint(const TExprBase& inputRow
.Done());
}

for (const auto& x : mainPk) {
auto atom = Build<TCoAtom>(ctx, pos)
.Value(x)
.Done();
columns.emplace_back(atom);
const THashSet<TString> indexKeyColumns = CreateKeyColumnSetToRead(indexes);
const THashSet<TString> indexDataColumns = CreateDataColumnSetToRead(indexes);

for (const auto& x : indexKeyColumns) {
columns.push_back(Build<TCoAtom>(ctx, pos).Value(x).Done());
}

for(const auto& x: checkDefaults) {
for (const auto& x : indexDataColumns) {
// Handle the case of multiple indexes
// one of them has 'foo' as data column but for another one foo is just indexed column
if (indexKeyColumns.contains(x))
continue;
columns.push_back(Build<TCoAtom>(ctx, pos).Value(x).Done());
}

for (const auto& x : mainPk) {
if (indexKeyColumns.contains(x))
continue;
columns.push_back(Build<TCoAtom>(ctx, pos).Value(x).Done());
}

auto inPrecompute = PrecomputeCondenseInputResult(*condenseResult, pos, ctx);
auto precomputeTableLookupDict = PrecomputeTableLookupDict(inPrecompute, table, columns, pos, ctx, true);
precomputeTableLookupDict = PrecomputeTableLookupDict(inPrecompute, table, columns, pos, ctx, true);

TVector<TExprBase> keyLookupTuples;
for (const auto& key : mainPk) {
Expand Down Expand Up @@ -512,7 +491,15 @@ TMaybe<TCondenseInputResult> RewriteInputForConstraint(const TExprBase& inputRow

auto helper = CreateUpsertUniqBuildHelper(table, inputColumns, usedIndexes, pos, ctx);
if (helper->GetChecksNum() == 0) {
return condenseResult;
// Return result of read stage only in case of uniq index
// We do not want to change plan for non uniq index for a while
if (hasUniqIndex) {
return std::make_pair<TCondenseInputResult, TMaybeNode<TDqPhyPrecompute>>
(std::move(*condenseResult), std::move(precomputeTableLookupDict));
} else {
return std::make_pair<TCondenseInputResult, TMaybeNode<TDqPhyPrecompute>>
(std::move(*condenseResult), {});
}
}

auto computeKeysStage = helper->CreateComputeKeysStage(condenseResult.GetRef(), pos, ctx);
Expand Down Expand Up @@ -590,11 +577,19 @@ TMaybe<TCondenseInputResult> RewriteInputForConstraint(const TExprBase& inputRow
stageInputs.insert(stageInputs.end(), uniquePrecomputes.begin(), uniquePrecomputes.end());
stageInputs.emplace_back(noExistingKeysPrecompute);

return TCondenseInputResult {
auto res = TCondenseInputResult {
.Stream = body,
.StageInputs = std::move(stageInputs),
.StageArgs = std::move(stageArgs)
};

if (hasUniqIndex) {
return std::make_pair<TCondenseInputResult, TMaybeNode<TDqPhyPrecompute>>(std::move(res),
std::move(precomputeTableLookupDict));
} else {
return std::make_pair<TCondenseInputResult, TMaybeNode<TDqPhyPrecompute>>(std::move(res),
{});
}
}

} // namespace
Expand Down Expand Up @@ -633,24 +628,50 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
return {};
}

auto condenseInputResult = DeduplicateInput(checkedInput.GetRef(), table, ctx);

auto inputRowsAndKeys = PrecomputeRowsAndKeys(condenseInputResult, table, pos, ctx);
auto condenseInputResult = DeduplicateInput(checkedInput->first, table, ctx);

// For UPSERT check that indexes is not empty
YQL_ENSURE(mode == TKqpPhyUpsertIndexMode::UpdateOn || indexes);

THashSet<TString> indexDataColumns = CreateDataColumnSetToRead(indexes);
THashSet<TString> indexKeyColumns = CreateKeyColumnSetToRead(indexes);

auto lookupDict = PrecomputeTableLookupDict(inputRowsAndKeys.KeysPrecompute, table, indexDataColumns, indexKeyColumns, pos, ctx);
if (!lookupDict) {
return {};
TMaybeNode<TDqPhyPrecompute> lookupDict;
TMaybeNode<TDqPhyPrecompute> rowsPrecompute;

if (checkedInput->second) {
rowsPrecompute = PrecomputeCondenseInputResult(condenseInputResult, pos, ctx);
// In case of uniq index use main table read stage from checking uniq constraint
lookupDict = checkedInput->second;
} else {
auto inputRowsAndKeysStage = ExtractRowsAndKeysStage(condenseInputResult, table, pos, ctx);
rowsPrecompute = Build<TDqPhyPrecompute>(ctx, pos)
.Connection<TDqCnValue>()
.Output()
.Stage(inputRowsAndKeysStage)
.Index().Build("0")
.Build()
.Build()
.Done();

auto keysPrecompute = Build<TDqPhyPrecompute>(ctx, pos)
.Connection<TDqCnValue>()
.Output()
.Stage(inputRowsAndKeysStage)
.Index().Build("1")
.Build()
.Build()
.Done();

lookupDict = PrecomputeTableLookupDict(keysPrecompute, table, indexDataColumns, indexKeyColumns, pos, ctx);
if (!lookupDict) {
return {};
}
}

TExprBase tableUpsertRows = (mode == TKqpPhyUpsertIndexMode::UpdateOn)
? MakeNonexistingRowsFilter(inputRowsAndKeys.RowsPrecompute, lookupDict.Cast(), pk, pos, ctx)
: inputRowsAndKeys.RowsPrecompute;
? MakeNonexistingRowsFilter(rowsPrecompute.Cast(), lookupDict.Cast(), pk, pos, ctx)
: rowsPrecompute.Cast();

auto tableUpsert = Build<TKqlUpsertRows>(ctx, pos)
.Table(BuildTableMeta(table, pos, ctx))
Expand Down Expand Up @@ -780,7 +801,7 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,
auto lookupDictArg = TCoArgument(ctx.NewArgument(pos, "recalc_dict_arg_" + indexDesc->Name));
auto reComputeDictStage = Build<TDqStage>(ctx, pos)
.Inputs()
.Add(inputRowsAndKeys.RowsPrecompute) // input rows
.Add(rowsPrecompute.Cast()) // input rows
.Add(lookupDict.Cast()) // dict contains loockuped from table rows
.Build()
.Program()
Expand Down Expand Up @@ -875,9 +896,9 @@ TMaybeNode<TExprList> KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode,

if (needIndexTableUpdate) {
auto upsertIndexRows = optUpsert
? MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDictRecomputed,
? MakeUpsertIndexRows(mode, rowsPrecompute.Cast(), lookupDictRecomputed,
inputColumnsSet, indexTableColumns, table, pos, ctx, true)
: MakeUpsertIndexRows(mode, inputRowsAndKeys.RowsPrecompute, lookupDict.Cast(),
: MakeUpsertIndexRows(mode, rowsPrecompute.Cast(), lookupDict.Cast(),
inputColumnsSet, indexTableColumns, table, pos, ctx, false);

auto indexUpsert = Build<TKqlUpsertRows>(ctx, pos)
Expand Down
16 changes: 9 additions & 7 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}

auto uniqExtraStages = uniq ? 6 : 0;
auto uniqExtraStages = uniq ? 5 : 0;
{
// Upsert - add new row
const TString query2 = Q1_(R"(
Expand All @@ -941,12 +941,14 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

Cerr << stats.DebugString() << Endl;

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), uniqExtraStages + 5);

// One read from main table
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access(0).name(), "/Root/TestTable");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access(0).reads().rows(), 0);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access(0).name(), "/Root/TestTable");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access(0).reads().rows(), 0);

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 2).table_access().size(), 0);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 3).table_access().size(), 0);
Expand Down Expand Up @@ -988,9 +990,9 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), uniqExtraStages + 5);

// One read from main table
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access(0).name(), "/Root/TestTable");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 1).table_access(0).reads().rows(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access(0).name(), "/Root/TestTable");
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniq + 1).table_access(0).reads().rows(), 1);

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 2).table_access().size(), 0);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(uniqExtraStages + 3).table_access().size(), 0);
Expand Down