Support insert_all and upsert_all using MERGE #1312

Merged · 8 commits · Mar 20, 2025
Changes from 1 commit
Support supports_insert_on_duplicate_update?
andyundso committed Mar 16, 2025
commit c0be62cbb71cfd8c1130703f4574807e5e4ea404
117 changes: 79 additions & 38 deletions lib/active_record/connection_adapters/sqlserver/database_statements.rb
@@ -154,14 +154,12 @@ def default_insert_value(column)
private :default_insert_value

def build_insert_sql(insert) # :nodoc:
if insert.skip_duplicates?
if insert.skip_duplicates? || insert.update_duplicates?
insert_all = insert.send(:insert_all)
conflict_columns = get_conflicted_columns(insert_all:, insert:)

# if we do not have any columns that might have conflicting values, just execute a regular insert
return build_sql_for_regular_insert(insert) if conflict_columns.empty?

make_inserts_unique(insert_all:, conflict_columns:)
return build_sql_for_regular_insert(insert) if conflict_columns.flatten.empty?

primary_keys_for_insert = insert_all.primary_keys.to_set

@@ -172,39 +170,92 @@ def build_insert_sql(insert) # :nodoc:
enable_identity_insert = primary_keys_for_insert.length == 1 &&
(insert_all.primary_keys.to_set & insert.keys).present?

sql = +""
sql << "SET IDENTITY_INSERT #{insert.model.quoted_table_name} ON;" if enable_identity_insert
sql << "MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target"
sql << " USING (SELECT DISTINCT * FROM (#{insert.values_list}) AS t1 (#{insert.send(:columns_list)})) AS source"
sql << " ON (#{conflict_columns.map do |columns|
columns.map do |column|
"target.#{quote_column_name(column)} = source.#{quote_column_name(column)}"
end.join(" AND ")
end.join(") OR (")})"
sql << " WHEN NOT MATCHED BY TARGET THEN"
sql << " INSERT (#{insert.send(:columns_list)}) #{insert.values_list}"
if (returning = insert_all.returning)
sql << " OUTPUT " << returning.map { |column| "INSERTED.#{quote_column_name(column)}" }.join(", ")
# why is the "PARTITION BY" clause needed?
# in every DBMS system, insert_all / upsert_all is usually implemented with INSERT, that allows to define what happens
# when duplicates are found (SKIP OR UPDATE)
# by default rows are considered to be unique by every unique index on the table
# but since we have to use MERGE in MSSQL, which in return is a JOIN, we have to perform the "de-duplication" ourselves
# otherwise the "JOIN" clause would complain about non-unique values and being unable to JOIN the two tables
# this works easiest by using PARTITION and make sure that any record
# we are trying to insert is "the first one seen across all the potential conflicted columns"
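# a rough illustration (editor's sketch, not SQL generated verbatim by this adapter): de-duplicating
# a two-row batch on a single conflict column [id] yields a source along these lines
#   SELECT * FROM (
#     SELECT [id], [name],
#            ROW_NUMBER() OVER (PARTITION BY [id] ORDER BY [id] DESC) AS rn_0
#     FROM (VALUES (1, N'a'), (1, N'b')) AS t1 ([id], [name])
#   ) AS ranked_source
#   WHERE rn_0 = 1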
sql = <<~SQL
#{"SET IDENTITY_INSERT #{insert.model.quoted_table_name} ON;" if enable_identity_insert}
MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target
USING (
SELECT *
FROM (
SELECT #{insert.send(:columns_list)}, #{conflict_columns.map.with_index do |group_of_conflicted_columns, index|
<<~PARTITION_BY
ROW_NUMBER() OVER (
PARTITION BY #{group_of_conflicted_columns.map { |column| quote_column_name(column) }.join(",")}
ORDER BY #{group_of_conflicted_columns.map { |column| "#{quote_column_name(column)} DESC" }.join(",")}
) AS rn_#{index}
PARTITION_BY
end.join(", ")
}
FROM (#{insert.values_list})
AS t1 (#{insert.send(:columns_list)})
) AS ranked_source
WHERE #{conflict_columns.map.with_index do |group_of_conflicted_columns, index|
"rn_#{index} = 1"
end.join(" AND ")
}
) AS source
ON (#{conflict_columns.map do |columns|
columns.map do |column|
"target.#{quote_column_name(column)} = source.#{quote_column_name(column)}"
end.join(" AND ")
end.join(") OR (")})
SQL

if insert.update_duplicates?
sql << " WHEN MATCHED THEN UPDATE SET "

if insert.raw_update_sql?
sql << insert.raw_update_sql
else
if insert.record_timestamps?
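# (editor's note) the CASE WHEN built below keeps the existing timestamp when every updatable column
# of the target row already matches the incoming row, and only bumps it to the current time when
# something actually changes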
sql << insert.model.timestamp_attributes_for_update_in_model.filter_map do |column_name|
if insert.send(:touch_timestamp_attribute?, column_name)
"target.#{quote_column_name(column_name)}=CASE WHEN (#{insert.updatable_columns.map { |column| "(COALESCE(target.#{quote_column_name(column)}, 'NULL') = COALESCE(source.#{quote_column_name(column)}, 'NULL'))" }.join(" AND ")}) THEN target.#{quote_column_name(column_name)} ELSE #{high_precision_current_timestamp} END,"
end
end.join
end

sql << insert.updatable_columns.map { |column| "target.#{quote_column_name(column)}=source.#{quote_column_name(column)}" }.join(",")
end
end
sql << " WHEN NOT MATCHED BY TARGET THEN"
sql << " INSERT (#{insert.send(:columns_list)}) VALUES (#{insert_all.keys_including_timestamps.map { |column| "source.#{quote_column_name(column)}" }.join(", ")})"
sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all))

sql << ";"
sql << "SET IDENTITY_INSERT #{insert.model.quoted_table_name} OFF;" if enable_identity_insert

return sql
end

build_sql_for_regular_insert(insert)
end

# MERGE executes a JOIN between the data we would like to insert and the existing data in the table
# but since it is a JOIN, it requires the source data (i.e. our values to insert) to be unique as well
# here we modify the @inserts of the "insert_all" object in place so they are unique,
# keeping the last occurrence
# note that for other DBMSs, this job is usually handed off to the database by specifying something like
# "ON DUPLICATE SKIP" or "ON DUPLICATE UPDATE"
def make_inserts_unique(insert_all:, conflict_columns:)
unique_inserts = insert_all.inserts.reverse.uniq { |insert| conflict_columns.map { |column| insert[column] } }.reverse
insert_all.instance_variable_set(:@inserts, unique_inserts)
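# (editor's note) builds the OUTPUT clause used to return values from the generated statement;
# SQL Server's OUTPUT INSERTED.<column> is the counterpart of RETURNING in other databases,
# and attribute aliases are mapped back to their underlying column names here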
def build_sql_for_returning(insert:, insert_all:)
return "" unless insert_all.returning

returning_values_sql = if insert_all.returning.is_a?(String)
insert_all.returning
else
Array(insert_all.returning).map do |attribute|
if insert.model.attribute_alias?(attribute)
"INSERTED.#{quote_column_name(insert.model.attribute_alias(attribute))} AS #{quote_column_name(attribute)}"
else
"INSERTED.#{quote_column_name(attribute)}"
end
end.join(",")
end

" OUTPUT #{returning_values_sql}"
end
private :make_inserts_unique
private :build_sql_for_returning

def get_conflicted_columns(insert_all:, insert:)
if (unique_by = insert_all.unique_by)
@@ -223,17 +274,7 @@ def get_conflicted_columns(insert_all:, insert:)
def build_sql_for_regular_insert(insert)
sql = "INSERT #{insert.into}"

returning = insert.send(:insert_all).returning

if returning
returning_sql = if returning.is_a?(String)
returning
else
Array(returning).map { |column| "INSERTED.#{quote_column_name(column)}" }.join(", ")
end
sql << " OUTPUT #{returning_sql}"
end

sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all))
sql << " #{insert.values_list}"
sql
end
2 changes: 1 addition & 1 deletion lib/active_record/connection_adapters/sqlserver_adapter.rb
@@ -220,7 +220,7 @@ def supports_insert_on_duplicate_skip?
end

def supports_insert_on_duplicate_update?
false
true
end

def supports_insert_conflict_target?
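With supports_insert_on_duplicate_update? now returning true, ActiveRecord routes upsert_all through the MERGE statement built above. A minimal usage sketch (the model, columns, and unique index are hypothetical, not part of this diff):

    # assumes a books table with a unique index on isbn
    Book.upsert_all(
      [
        {isbn: "9780001", name: "Agile Web Development"},
        {isbn: "9780001", name: "Agile Web Development, 2nd ed."} # duplicates within the batch are collapsed before the MERGE
      ],
      unique_by: :isbn
    )

Rows that match an existing record on the unique index are updated; the rest are inserted.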
12 changes: 12 additions & 0 deletions test/cases/coerced_tests.rb
@@ -2552,6 +2552,18 @@ def test_insert_with_type_casting_and_serialize_is_consistent_coerced
Book.where(author_id: nil, name: '["Array"]').delete_all
Book.lease_connection.add_index(:books, [:author_id, :name], unique: true)
end

# Same as the original test, but uses target.status for the assignment and GREATEST as the operator
coerce_tests! :test_upsert_all_updates_using_provided_sql
def test_upsert_all_updates_using_provided_sql_coerced
Book.upsert_all(
[{id: 1, status: 1}, {id: 2, status: 1}],
on_duplicate: Arel.sql("target.status = GREATEST(target.status, 1)")
)

assert_equal "published", Book.find(1).status
assert_equal "written", Book.find(2).status
end
end

module ActiveRecord