Skip to content

Commit

Permalink
Move binding of constraints out of execution and into the binding phase of insert/update/delete
Browse files Browse the repository at this point in the history
  • Loading branch information
Mytherin committed Apr 19, 2024
1 parent 8022a03 commit 87f3fe3
Show file tree
Hide file tree
Showing 32 changed files with 164 additions and 111 deletions.
2 changes: 1 addition & 1 deletion src/catalog/catalog_entry/table_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ const ColumnDefinition &TableCatalogEntry::GetColumn(LogicalIndex idx) {
return columns.GetColumn(idx);
}

const vector<unique_ptr<Constraint>> &TableCatalogEntry::GetConstraints() {
const vector<unique_ptr<Constraint>> &TableCatalogEntry::GetConstraints() const {
return constraints;
}

Expand Down
20 changes: 11 additions & 9 deletions src/execution/operator/persistent/physical_batch_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@

namespace duckdb {

PhysicalBatchInsert::PhysicalBatchInsert(vector<LogicalType> types, TableCatalogEntry &table,
physical_index_vector_t<idx_t> column_index_map,
vector<unique_ptr<Expression>> bound_defaults, idx_t estimated_cardinality)
: PhysicalOperator(PhysicalOperatorType::BATCH_INSERT, std::move(types), estimated_cardinality),
column_index_map(std::move(column_index_map)), insert_table(&table), insert_types(table.GetTypes()),
bound_defaults(std::move(bound_defaults)) {
// Component-wise constructor for the batch insert operator.
// The result types and the cardinality estimate go to the PhysicalOperator base (tagged BATCH_INSERT);
// every by-value argument is moved into the member of the same name (minus the "_p" suffix).
PhysicalBatchInsert::PhysicalBatchInsert(vector<LogicalType> types_p, TableCatalogEntry &table,
                                         physical_index_vector_t<idx_t> column_index_map_p,
                                         vector<unique_ptr<Expression>> bound_defaults_p,
                                         vector<unique_ptr<BoundConstraint>> bound_constraints_p,
                                         idx_t estimated_cardinality)
    : PhysicalOperator(PhysicalOperatorType::BATCH_INSERT, std::move(types_p), estimated_cardinality),
      // column mapping supplied by the caller
      column_index_map(std::move(column_index_map_p)),
      // target table is kept by address; its column types are read once via GetTypes()
      insert_table(&table), insert_types(table.GetTypes()),
      // pre-bound default expressions and pre-bound constraints, both taken over from the caller
      bound_defaults(std::move(bound_defaults_p)), bound_constraints(std::move(bound_constraints_p)) {
}

PhysicalBatchInsert::PhysicalBatchInsert(LogicalOperator &op, SchemaCatalogEntry &schema,
Expand Down Expand Up @@ -171,7 +173,7 @@ class BatchInsertLocalState : public LocalSinkState {
TableAppendState current_append_state;
unique_ptr<RowGroupCollection> current_collection;
optional_ptr<OptimisticDataWriter> writer;
unique_ptr<ConstraintVerificationState> constraint_state;
unique_ptr<ConstraintState> constraint_state;

void CreateNewCollection(DuckTableEntry &table, const vector<LogicalType> &insert_types) {
auto &table_info = table.GetStorage().info;
Expand Down Expand Up @@ -496,7 +498,7 @@ SinkResultType PhysicalBatchInsert::Sink(ExecutionContext &context, DataChunk &c
}

if (!lstate.constraint_state) {
lstate.constraint_state = table.GetStorage().InitializeConstraintVerification(table, context.client);
lstate.constraint_state = table.GetStorage().InitializeConstraintState(table, bound_constraints);
}
table.GetStorage().VerifyAppendConstraints(*lstate.constraint_state, context.client, lstate.insert_chunk);

Expand Down Expand Up @@ -599,7 +601,7 @@ SinkFinalizeType PhysicalBatchInsert::Finalize(Pipeline &pipeline, Event &event,
auto &table = gstate.table;
auto &storage = table.GetStorage();
LocalAppendState append_state;
storage.InitializeLocalAppend(append_state, table, context);
storage.InitializeLocalAppend(append_state, table, context, bound_constraints);
auto &transaction = DuckTransaction::Get(context, table.catalog);
for (auto &entry : gstate.collections) {
if (entry.type != RowGroupBatchType::NOT_FLUSHED) {
Expand Down
14 changes: 11 additions & 3 deletions src/execution/operator/persistent/physical_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@

namespace duckdb {

// Constructs the physical DELETE operator.
// The result types and the cardinality estimate are forwarded to the PhysicalOperator base (tagged
// DELETE_OPERATOR). The catalog entry and the storage table are bound as passed in, the bound constraints
// are moved into the operator, and row_id_index / return_chunk are stored unchanged.
// Parameters carry a "_p" suffix so they do not shadow the members they initialise.
PhysicalDelete::PhysicalDelete(vector<LogicalType> types_p, TableCatalogEntry &tableref_p, DataTable &table_p,
                               vector<unique_ptr<BoundConstraint>> bound_constraints_p, idx_t row_id_index_p,
                               idx_t estimated_cardinality, bool return_chunk_p)
    : PhysicalOperator(PhysicalOperatorType::DELETE_OPERATOR, std::move(types_p), estimated_cardinality),
      tableref(tableref_p), table(table_p), bound_constraints(std::move(bound_constraints_p)),
      row_id_index(row_id_index_p), return_chunk(return_chunk_p) {
}
//===--------------------------------------------------------------------===//
// Sink
//===--------------------------------------------------------------------===//
Expand All @@ -26,9 +33,10 @@ class DeleteGlobalState : public GlobalSinkState {

class DeleteLocalState : public LocalSinkState {
public:
DeleteLocalState(ClientContext &context, TableCatalogEntry &table) {
DeleteLocalState(ClientContext &context, TableCatalogEntry &table,
const vector<unique_ptr<BoundConstraint>> &bound_constraints) {
delete_chunk.Initialize(Allocator::Get(context), table.GetTypes());
delete_state = table.GetStorage().InitializeDelete(table, context);
delete_state = table.GetStorage().InitializeDelete(table, context, bound_constraints);
}
DataChunk delete_chunk;
unique_ptr<TableDeleteState> delete_state;
Expand Down Expand Up @@ -64,7 +72,7 @@ unique_ptr<GlobalSinkState> PhysicalDelete::GetGlobalSinkState(ClientContext &co
}

unique_ptr<LocalSinkState> PhysicalDelete::GetLocalSinkState(ExecutionContext &context) const {
return make_uniq<DeleteLocalState>(context.client, tableref);
return make_uniq<DeleteLocalState>(context.client, tableref, bound_constraints);
}

//===--------------------------------------------------------------------===//
Expand Down
53 changes: 26 additions & 27 deletions src/execution/operator/persistent/physical_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,20 @@

namespace duckdb {

PhysicalInsert::PhysicalInsert(vector<LogicalType> types_p, TableCatalogEntry &table,
physical_index_vector_t<idx_t> column_index_map,
vector<unique_ptr<Expression>> bound_defaults,
vector<unique_ptr<Expression>> set_expressions, vector<PhysicalIndex> set_columns,
vector<LogicalType> set_types, idx_t estimated_cardinality, bool return_chunk,
bool parallel, OnConflictAction action_type,
unique_ptr<Expression> on_conflict_condition_p,
unique_ptr<Expression> do_update_condition_p, unordered_set<column_t> conflict_target_p,
vector<column_t> columns_to_fetch_p)
PhysicalInsert::PhysicalInsert(
vector<LogicalType> types_p, TableCatalogEntry &table, physical_index_vector_t<idx_t> column_index_map,
vector<unique_ptr<Expression>> bound_defaults, vector<unique_ptr<BoundConstraint>> bound_constraints_p,
vector<unique_ptr<Expression>> set_expressions, vector<PhysicalIndex> set_columns, vector<LogicalType> set_types,
idx_t estimated_cardinality, bool return_chunk, bool parallel, OnConflictAction action_type,
unique_ptr<Expression> on_conflict_condition_p, unique_ptr<Expression> do_update_condition_p,
unordered_set<column_t> conflict_target_p, vector<column_t> columns_to_fetch_p)
: PhysicalOperator(PhysicalOperatorType::INSERT, std::move(types_p), estimated_cardinality),
column_index_map(std::move(column_index_map)), insert_table(&table), insert_types(table.GetTypes()),
bound_defaults(std::move(bound_defaults)), return_chunk(return_chunk), parallel(parallel),
action_type(action_type), set_expressions(std::move(set_expressions)), set_columns(std::move(set_columns)),
set_types(std::move(set_types)), on_conflict_condition(std::move(on_conflict_condition_p)),
do_update_condition(std::move(do_update_condition_p)), conflict_target(std::move(conflict_target_p)),
columns_to_fetch(std::move(columns_to_fetch_p)) {
bound_defaults(std::move(bound_defaults)), bound_constraints(std::move(bound_constraints_p)),
return_chunk(return_chunk), parallel(parallel), action_type(action_type),
set_expressions(std::move(set_expressions)), set_columns(std::move(set_columns)), set_types(std::move(set_types)),
on_conflict_condition(std::move(on_conflict_condition_p)), do_update_condition(std::move(do_update_condition_p)),
conflict_target(std::move(conflict_target_p)), columns_to_fetch(std::move(columns_to_fetch_p)) {

if (action_type == OnConflictAction::THROW) {
return;
Expand Down Expand Up @@ -91,8 +89,9 @@ class InsertGlobalState : public GlobalSinkState {
class InsertLocalState : public LocalSinkState {
public:
InsertLocalState(ClientContext &context, const vector<LogicalType> &types,
const vector<unique_ptr<Expression>> &bound_defaults)
: default_executor(context, bound_defaults) {
const vector<unique_ptr<Expression>> &bound_defaults,
const vector<unique_ptr<BoundConstraint>> &bound_constraints)
: default_executor(context, bound_defaults), bound_constraints(bound_constraints) {
insert_chunk.Initialize(Allocator::Get(context), types);
}

Expand All @@ -106,12 +105,12 @@ class InsertLocalState : public LocalSinkState {
// Rows in the transaction-local storage that have been updated by a DO UPDATE conflict
unordered_set<row_t> updated_local_rows;
idx_t update_count = 0;
unique_ptr<ConstraintVerificationState> constraint_state;
unique_ptr<ConstraintState> constraint_state;
const vector<unique_ptr<BoundConstraint>> &bound_constraints;

ConstraintVerificationState &GetConstraintState(DataTable &table, TableCatalogEntry &tableref,
ClientContext &context) {
ConstraintState &GetConstraintState(DataTable &table, TableCatalogEntry &tableref) {
if (!constraint_state) {
constraint_state = table.InitializeConstraintVerification(tableref, context);
constraint_state = table.InitializeConstraintState(tableref, bound_constraints);
}
return *constraint_state;
}
Expand All @@ -135,7 +134,7 @@ unique_ptr<GlobalSinkState> PhysicalInsert::GetGlobalSinkState(ClientContext &co
}

unique_ptr<LocalSinkState> PhysicalInsert::GetLocalSinkState(ExecutionContext &context) const {
return make_uniq<InsertLocalState>(context.client, insert_types, bound_defaults);
return make_uniq<InsertLocalState>(context.client, insert_types, bound_defaults, bound_constraints);
}

void PhysicalInsert::ResolveDefaults(const TableCatalogEntry &table, DataChunk &chunk,
Expand Down Expand Up @@ -288,7 +287,7 @@ static idx_t PerformOnConflictAction(ExecutionContext &context, DataChunk &chunk
auto &data_table = table.GetStorage();
// Perform the update, using the results of the SET expressions
if (GLOBAL) {
auto update_state = data_table.InitializeUpdate(table, context.client);
auto update_state = data_table.InitializeUpdate(table, context.client, op.bound_constraints);
data_table.Update(*update_state, context.client, row_ids, set_columns, update_chunk);
} else {
auto &local_storage = LocalStorage::Get(context.client, data_table.db);
Expand Down Expand Up @@ -331,7 +330,7 @@ static idx_t HandleInsertConflicts(TableCatalogEntry &table, ExecutionContext &c
ConflictInfo conflict_info(conflict_target);
ConflictManager conflict_manager(VerifyExistenceType::APPEND, lstate.insert_chunk.size(), &conflict_info);
if (GLOBAL) {
auto &constraint_state = lstate.GetConstraintState(data_table, table, context.client);
auto &constraint_state = lstate.GetConstraintState(data_table, table);
data_table.VerifyAppendConstraints(constraint_state, context.client, lstate.insert_chunk, &conflict_manager);
} else {
DataTable::VerifyUniqueIndexes(local_storage.GetIndexes(data_table), context.client, lstate.insert_chunk,
Expand Down Expand Up @@ -392,7 +391,7 @@ static idx_t HandleInsertConflicts(TableCatalogEntry &table, ExecutionContext &c
combined_chunk.Slice(sel.Selection(), sel.Count());
row_ids.Slice(sel.Selection(), sel.Count());
if (GLOBAL) {
auto &constraint_state = lstate.GetConstraintState(data_table, table, context.client);
auto &constraint_state = lstate.GetConstraintState(data_table, table);
data_table.VerifyAppendConstraints(constraint_state, context.client, combined_chunk, nullptr);
} else {
DataTable::VerifyUniqueIndexes(local_storage.GetIndexes(data_table), context.client,
Expand All @@ -419,7 +418,7 @@ idx_t PhysicalInsert::OnConflictHandling(TableCatalogEntry &table, ExecutionCont
InsertLocalState &lstate) const {
auto &data_table = table.GetStorage();
if (action_type == OnConflictAction::THROW) {
auto &constraint_state = lstate.GetConstraintState(data_table, table, context.client);
auto &constraint_state = lstate.GetConstraintState(data_table, table);
data_table.VerifyAppendConstraints(constraint_state, context.client, lstate.insert_chunk, nullptr);
return 0;
}
Expand All @@ -443,7 +442,7 @@ SinkResultType PhysicalInsert::Sink(ExecutionContext &context, DataChunk &chunk,

if (!parallel) {
if (!gstate.initialized) {
storage.InitializeLocalAppend(gstate.append_state, table, context.client);
storage.InitializeLocalAppend(gstate.append_state, table, context.client, bound_constraints);
gstate.initialized = true;
}

Expand Down Expand Up @@ -501,7 +500,7 @@ SinkCombineResultType PhysicalInsert::Combine(ExecutionContext &context, Operato
// we have few rows - append to the local storage directly
auto &table = gstate.table;
auto &storage = table.GetStorage();
storage.InitializeLocalAppend(gstate.append_state, table, context.client);
storage.InitializeLocalAppend(gstate.append_state, table, context.client, bound_constraints);
auto &transaction = DuckTransaction::Get(context.client, table.catalog);
lstate.local_collection->Scan(transaction, [&](DataChunk &insert_chunk) {
storage.LocalAppend(gstate.append_state, table, context.client, insert_chunk);
Expand Down
21 changes: 13 additions & 8 deletions src/execution/operator/persistent/physical_update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ namespace duckdb {

PhysicalUpdate::PhysicalUpdate(vector<LogicalType> types, TableCatalogEntry &tableref, DataTable &table,
vector<PhysicalIndex> columns, vector<unique_ptr<Expression>> expressions,
vector<unique_ptr<Expression>> bound_defaults, idx_t estimated_cardinality,
vector<unique_ptr<Expression>> bound_defaults,
vector<unique_ptr<BoundConstraint>> bound_constraints, idx_t estimated_cardinality,
bool return_chunk)
: PhysicalOperator(PhysicalOperatorType::UPDATE, std::move(types), estimated_cardinality), tableref(tableref),
table(table), columns(std::move(columns)), expressions(std::move(expressions)),
bound_defaults(std::move(bound_defaults)), return_chunk(return_chunk) {
bound_defaults(std::move(bound_defaults)), bound_constraints(std::move(bound_constraints)),
return_chunk(return_chunk) {
}

//===--------------------------------------------------------------------===//
Expand All @@ -40,8 +42,9 @@ class UpdateGlobalState : public GlobalSinkState {
class UpdateLocalState : public LocalSinkState {
public:
UpdateLocalState(ClientContext &context, const vector<unique_ptr<Expression>> &expressions,
const vector<LogicalType> &table_types, const vector<unique_ptr<Expression>> &bound_defaults)
: default_executor(context, bound_defaults) {
const vector<LogicalType> &table_types, const vector<unique_ptr<Expression>> &bound_defaults,
const vector<unique_ptr<BoundConstraint>> &bound_constraints)
: default_executor(context, bound_defaults), bound_constraints(bound_constraints) {
// initialize the update chunk
auto &allocator = Allocator::Get(context);
vector<LogicalType> update_types;
Expand All @@ -59,17 +62,18 @@ class UpdateLocalState : public LocalSinkState {
ExpressionExecutor default_executor;
unique_ptr<TableDeleteState> delete_state;
unique_ptr<TableUpdateState> update_state;
const vector<unique_ptr<BoundConstraint>> &bound_constraints;

TableDeleteState &GetDeleteState(DataTable &table, TableCatalogEntry &tableref, ClientContext &context) {
if (!delete_state) {
delete_state = table.InitializeDelete(tableref, context);
delete_state = table.InitializeDelete(tableref, context, bound_constraints);
}
return *delete_state;
}

TableUpdateState &GetUpdateState(DataTable &table, TableCatalogEntry &tableref, ClientContext &context) {
if (!update_state) {
update_state = table.InitializeUpdate(tableref, context);
update_state = table.InitializeUpdate(tableref, context, bound_constraints);
}
return *update_state;
}
Expand Down Expand Up @@ -131,7 +135,7 @@ SinkResultType PhysicalUpdate::Sink(ExecutionContext &context, DataChunk &chunk,
for (idx_t i = 0; i < columns.size(); i++) {
mock_chunk.data[columns[i].index].Reference(update_chunk.data[i]);
}
table.LocalAppend(tableref, context.client, mock_chunk);
table.LocalAppend(tableref, context.client, mock_chunk, bound_constraints);
} else {
if (return_chunk) {
mock_chunk.SetCardinality(update_chunk);
Expand All @@ -157,7 +161,8 @@ unique_ptr<GlobalSinkState> PhysicalUpdate::GetGlobalSinkState(ClientContext &co
}

unique_ptr<LocalSinkState> PhysicalUpdate::GetLocalSinkState(ExecutionContext &context) const {
return make_uniq<UpdateLocalState>(context.client, expressions, table.GetTypes(), bound_defaults);
return make_uniq<UpdateLocalState>(context.client, expressions, table.GetTypes(), bound_defaults,
bound_constraints);
}

SinkCombineResultType PhysicalUpdate::Combine(ExecutionContext &context, OperatorSinkCombineInput &input) const {
Expand Down
4 changes: 2 additions & 2 deletions src/execution/physical_plan/plan_delete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ unique_ptr<PhysicalOperator> DuckCatalog::PlanDelete(ClientContext &context, Log
// get the index of the row_id column
auto &bound_ref = op.expressions[0]->Cast<BoundReferenceExpression>();

auto del = make_uniq<PhysicalDelete>(op.types, op.table, op.table.GetStorage(), bound_ref.index,
op.estimated_cardinality, op.return_chunk);
auto del = make_uniq<PhysicalDelete>(op.types, op.table, op.table.GetStorage(), std::move(op.bound_constraints),
bound_ref.index, op.estimated_cardinality, op.return_chunk);
del->children.push_back(std::move(plan));
return std::move(del);
}
Expand Down
11 changes: 6 additions & 5 deletions src/execution/physical_plan/plan_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ unique_ptr<PhysicalOperator> DuckCatalog::PlanInsert(ClientContext &context, Log
unique_ptr<PhysicalOperator> insert;
if (use_batch_index && !parallel_streaming_insert) {
insert = make_uniq<PhysicalBatchInsert>(op.types, op.table, op.column_index_map, std::move(op.bound_defaults),
op.estimated_cardinality);
std::move(op.bound_constraints), op.estimated_cardinality);
} else {
insert = make_uniq<PhysicalInsert>(
op.types, op.table, op.column_index_map, std::move(op.bound_defaults), std::move(op.expressions),
std::move(op.set_columns), std::move(op.set_types), op.estimated_cardinality, op.return_chunk,
parallel_streaming_insert && num_threads > 1, op.action_type, std::move(op.on_conflict_condition),
std::move(op.do_update_condition), std::move(op.on_conflict_filter), std::move(op.columns_to_fetch));
op.types, op.table, op.column_index_map, std::move(op.bound_defaults), std::move(op.bound_constraints),
std::move(op.expressions), std::move(op.set_columns), std::move(op.set_types), op.estimated_cardinality,
op.return_chunk, parallel_streaming_insert && num_threads > 1, op.action_type,
std::move(op.on_conflict_condition), std::move(op.do_update_condition), std::move(op.on_conflict_filter),
std::move(op.columns_to_fetch));
}
D_ASSERT(plan);
insert->children.push_back(std::move(plan));
Expand Down
6 changes: 3 additions & 3 deletions src/execution/physical_plan/plan_update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace duckdb {

unique_ptr<PhysicalOperator> DuckCatalog::PlanUpdate(ClientContext &context, LogicalUpdate &op,
unique_ptr<PhysicalOperator> plan) {
auto update =
make_uniq<PhysicalUpdate>(op.types, op.table, op.table.GetStorage(), op.columns, std::move(op.expressions),
std::move(op.bound_defaults), op.estimated_cardinality, op.return_chunk);
auto update = make_uniq<PhysicalUpdate>(op.types, op.table, op.table.GetStorage(), op.columns,
std::move(op.expressions), std::move(op.bound_defaults),
std::move(op.bound_constraints), op.estimated_cardinality, op.return_chunk);

update->update_is_del_and_insert = op.update_is_del_and_insert;
update->children.push_back(std::move(plan));
Expand Down
Loading

0 comments on commit 87f3fe3

Please sign in to comment.