Skip to content

Commit

Permalink
[BACKPORT 2.12][#11269] YSQL: Add session variable to enable upsert m…
Browse files Browse the repository at this point in the history
…ode for inserts

Summary:
Speed up YSQL bulk loads by introducing the `yb_enable_upsert_mode` session variable. When set to true, inserts skip the lookup of any keys being inserted, thereby, enabling faster inserts.
The default value for this flag is false.
To enable the flag:
```
SET yb_enable_upsert_mode=true
```
To disable the flag:
```
SET yb_enable_upsert_mode=false
```

Test Plan:
ybd --java-test 'org.yb.pgsql.TestBatchCopyFrom'

We expect the following behaviors when bulk loading via COPY command:
1) Indexes should be updated just like a normal INSERT
2) Foreign key integrity should be maintained just like a normal INSERT
3) Disable batching if the table contains non-FK trigger as per the following [change](7c428da).

Original commit: D16241 / c6b90bb

Reviewers: ena, lnguyen, smishra

Reviewed By: smishra

Subscribers: yql, mihnea, dmitry

Differential Revision: https://phabricator.dev.yugabyte.com/D16655
  • Loading branch information
paullee-yb committed Apr 22, 2022
1 parent d3900a4 commit 9ad58d8
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 1 deletion.
36 changes: 36 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1647,6 +1647,42 @@ protected void verifyStatementWarning(Statement statement,
assertEquals("Expected (at most) one warning", null, warning.getNextWarning());
}

/**
* Verify that a (write) query succeeds with multiple lines of warnings.
* @param statement The statement used to execute the query.
* @param query The query string.
* @param warningSubstring A (case-insensitive) list of substrings of expected warning messages.
*/
protected void verifyStatementWarnings(Statement statement,
String query,
List<String> warningSubstrings) throws SQLException {
int warningLineIndex = 0;
int expectedWarningCount = warningSubstrings.size();
statement.execute(query);
SQLWarning warning = statement.getWarnings();

// make sure number of warnings match expected number of warnings
int warningCount = 0;
while (warning != null) {
warningCount++;
warning = warning.getNextWarning();
}
assertEquals("Expected " + expectedWarningCount + " warnings.", expectedWarningCount,
warningCount);

// check each warning matches expected list of warnings
warning = statement.getWarnings();
while (warning != null) {
assertTrue(String.format("Unexpected Warning Message. Got: '%s', expected to contain : '%s",
warning.getMessage(), warningSubstrings.get(warningLineIndex)),
StringUtils.containsIgnoreCase(warning.getMessage(),
warningSubstrings.get(warningLineIndex)));
warning = warning.getNextWarning();
warningLineIndex++;
}

}

protected String getSimpleTableCreationStatement(
String tableName,
String valueColumnName,
Expand Down
200 changes: 199 additions & 1 deletion java/yb-pgsql/src/test/java/org/yb/pgsql/TestBatchCopyFrom.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package org.yb.pgsql;

import static org.yb.AssertionWrappers.fail;
import static org.yb.AssertionWrappers.*;

import java.io.BufferedReader;
import java.io.BufferedWriter;
Expand All @@ -23,6 +23,7 @@
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;

import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
Expand All @@ -44,11 +45,15 @@ public class TestBatchCopyFrom extends BasePgSQLTest {
"argument to option \"rows_per_transaction\" must be a positive integer";
private static final String INVALID_COPY_INPUT_ERROR_MSG =
"invalid input syntax for integer";
private static final String INVALID_FOREIGN_KEY_ERROR_MSG =
"violates foreign key constraint";
private static final String BATCH_TXN_SESSION_VARIABLE_NAME =
"yb_default_copy_from_rows_per_transaction";
private static final int BATCH_TXN_SESSION_VARIABLE_DEFAULT_ROWS = 1000;
private static final String DISABLE_TXN_WRITES_SESSION_VARIABLE_NAME =
"yb_disable_transactional_writes";
private static final String YB_ENABLE_UPSERT_MODE_VARIABLE_NAME =
"yb_enable_upsert_mode";

private String getAbsFilePath(String fileName) {
return TestUtils.getBaseTmpDir() + "/" + fileName;
Expand All @@ -71,6 +76,13 @@ private void createFileInTmpDir(String absFilePath, int totalLines) throws IOExc
writer.close();
}

private void writeToFileInTmpDir(String absFilePath, String line) throws IOException {
File myObj = new File(absFilePath);
BufferedWriter writer = new BufferedWriter(new FileWriter(myObj, true));
writer.write(line);
writer.close();
}

private void appendInvalidEntries(String absFilePath, int totalLines) throws IOException {
FileWriter fileWriter = new FileWriter(absFilePath,true);
BufferedWriter bufferedWriter = new BufferedWriter(fileWriter);
Expand Down Expand Up @@ -516,6 +528,192 @@ public void testNonTxnWriteSessionVariable() throws Exception {
}
}

@Test
public void testEnableUpsertModeSessionVariable() throws Exception {
String absFilePath = getAbsFilePath("batch-copyfrom-upsertmode-sessionvar.txt");
String tableName = "upsertModeSessionVarTable";
int totalValidLines = 5;
int expectedCopiedLines = totalValidLines;

createFileInTmpDir(absFilePath, totalValidLines);

try (Statement statement = connection.createStatement()) {
// check that yb_enable_upsert_mode session is off by default
assertOneRow(statement, "SHOW " + YB_ENABLE_UPSERT_MODE_VARIABLE_NAME, "off");

// set enable-upsert session variable
statement.execute("SET " + YB_ENABLE_UPSERT_MODE_VARIABLE_NAME + "=true");

// create and populate table with COPY command
statement.execute(String.format(
"CREATE TABLE %s (a INT, b INT, c TEXT, d TEXT)", tableName));
statement.execute(String.format(
"COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)", tableName, absFilePath));

// check every row was properly inserted into the table
assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, expectedCopiedLines);

// verify DDL and DML statements work
statement.execute("ALTER TABLE " + tableName + " ADD COLUMN e INT");
statement.execute("ALTER TABLE " + tableName + " DROP COLUMN c");
statement.execute("INSERT INTO " + tableName + " VALUES (0, 1, 2, 3)");
statement.execute("TRUNCATE TABLE " + tableName);
statement.execute("DROP TABLE " + tableName);
statement.execute("CREATE TABLE " + tableName + " (a INT PRIMARY KEY, b TEXT)");
}
}

@Test
public void testEnableUpsertModeSessionVariableIndex() throws Exception {
String absFilePath = getAbsFilePath("batch-copyfrom-upsertmode-index-sessionvar.txt");
String tableName = "upsertModeSessionVarTableIndex";
int totalValidLines = 5;
int expectedCopiedLines = totalValidLines;

createFileInTmpDir(absFilePath, totalValidLines);

// add CSV line that shares the same primary key value as
// another row but updates the indexed column b
writeToFileInTmpDir(absFilePath, "0,5,2,3\n");

try (Statement statement = connection.createStatement()) {

// set enable-upsert session variable
statement.execute("SET " + YB_ENABLE_UPSERT_MODE_VARIABLE_NAME + "=true");

// create and populate table with COPY command
// COPY command should not throw error even with the duplicate primary key
// since upsert mode is ON
statement.execute(String.format(
"CREATE TABLE %s (a INT PRIMARY KEY, b INT, c TEXT, d TEXT)", tableName));
statement.execute(String.format("CREATE INDEX ON %s (b)", tableName));
statement.execute(String.format(
"COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)", tableName, absFilePath));

// check every row was properly inserted into the table
assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, expectedCopiedLines);

// check that row with shared primary key was overwritten
assertOneRow(statement, String.format("SELECT COUNT(*) FROM %s WHERE b=1", tableName), 1);

// verify index exists, is being used, and is properly updated
String explainOutput = getExplainAnalyzeOutput(statement, String.format(
"SELECT * from %s WHERE b=5;", tableName));
assertTrue("SELECT query should be an index scan", explainOutput.contains(
"Index Scan using upsertmodesessionvartableindex_b_idx on upsertmodesessionvartableindex"));
explainOutput = getExplainAnalyzeOutput(statement, String.format(
"SELECT * from %s WHERE b=5;", tableName));
assertTrue("Expect to fetch 2 rows from index scan",
explainOutput.contains("rows=2 loops=1)"));
}
}

@Test
public void testEnableUpsertModeSessionVariableForeignKey() throws Exception {
String foreignKeyFilePath = getAbsFilePath("batch-copyfrom-upsertmode-fk-sessionvar.txt");
String primaryKeyTableName = "upsertModeSessionVarTablePk";
String foreignKeyTableName = "upsertModeSessionVarTableFk";
int expectedPrimaryKeyLines = 20;
int expectedForeignKeyLines = 5;

// create tmp CSV file with header
createFileInTmpDir(foreignKeyFilePath, expectedForeignKeyLines);

// add CSV line that shares the same primary key as
// another row but updates the foreign key column
writeToFileInTmpDir(foreignKeyFilePath, "0,2,3,4\n");

try (Statement statement = connection.createStatement()) {

// set enable-upsert session variable
statement.execute("SET " + YB_ENABLE_UPSERT_MODE_VARIABLE_NAME + "=true");

// create and populate primary key table with COPY command
statement.execute(String.format(
"CREATE TABLE %s (a INT PRIMARY KEY, b text)", primaryKeyTableName));
for (int i = 0; i < expectedPrimaryKeyLines; i++) {
statement.execute(String.format(
"INSERT INTO %s VALUES (%d, %s)", primaryKeyTableName, i, i+1));
}
// check every row was properly inserted into the table
assertOneRow(statement, "SELECT COUNT(*) FROM " + primaryKeyTableName,
expectedPrimaryKeyLines);

// create and bulk load the foreign key table
statement.execute(String.format(
"CREATE TABLE %s (a INT PRIMARY KEY, b INT REFERENCES %s, c TEXT, d TEXT)",
foreignKeyTableName, primaryKeyTableName));
statement.execute(String.format(
"COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)", foreignKeyTableName, foreignKeyFilePath));
// check every row was properly inserted into the table
assertOneRow(statement, "SELECT COUNT(*) FROM " + foreignKeyTableName,
expectedForeignKeyLines);

// verify foreign key was updated
assertOneRow(statement, String.format(
"SELECT COUNT(*) FROM %s WHERE b=2", foreignKeyTableName), 1);

// clear table to test scenario where FK constraint is violated
statement.execute(String.format("TRUNCATE TABLE %s", foreignKeyTableName));

// add CSV row with foreign key that points to a non-existent ID
writeToFileInTmpDir(foreignKeyFilePath, "0,-1,3,4\n");

// validate invalid FK is caught
runInvalidQuery(statement,
String.format(
"COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)", foreignKeyTableName, foreignKeyFilePath),
INVALID_FOREIGN_KEY_ERROR_MSG);

// no rows should have been copied
assertOneRow(statement, String.format("SELECT COUNT(*) FROM %s", foreignKeyTableName), 0);
}
}

@Test
public void testEnableUpsertModeSessionVariableTrigger() throws Exception {
String absFilePath = getAbsFilePath("batch-copyfrom-upsertmode-sessionvar.txt");
String tableName = "upsertModeSessionVarTable";
int totalValidLines = 5;
int expectedCopiedLines = totalValidLines;

createFileInTmpDir(absFilePath, totalValidLines);

try (Statement statement = connection.createStatement()) {
// create and populate table with COPY command
statement.execute(String.format(
"CREATE TABLE %s (a INT, b INT, c TEXT, d TEXT)", tableName));

// create before and after triggers on table
statement.execute(
"CREATE FUNCTION trigger_func() RETURNS trigger LANGUAGE plpgsql AS \'" +
"BEGIN " +
" RAISE NOTICE \'\'trigger_func(%) called: action = %, when = %, level = %\'\', " +
" TG_ARGV[0], TG_OP, TG_WHEN, TG_LEVEL; " +
"RETURN NULL; " +
"END;\';");
statement.execute(String.format(
"CREATE TRIGGER before_ins_stmt_trig BEFORE INSERT ON %s " +
"FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func(\'before_ins_stmt\');", tableName));
statement.execute(String.format(
"CREATE TRIGGER after_ins_stmt_trig AFTER INSERT ON %s " +
"FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func(\'after_ins_stmt\');", tableName));

// verify batching is disabled if the table contains a non-FK trigger
verifyStatementWarnings(
statement,
String.format("COPY %s FROM \'%s\' WITH (FORMAT CSV, HEADER)", tableName, absFilePath),
Arrays.asList(
"Batched COPY is not supported on table with non RI trigger. " +
"Defaulting to using one transaction for the entire copy.",
"trigger_func(before_ins_stmt) called: action = INSERT, when = BEFORE, level = STATEMENT",
"trigger_func(after_ins_stmt) called: action = INSERT, when = AFTER, level = STATEMENT"));

// check every row was properly inserted into the table
assertOneRow(statement, "SELECT COUNT(*) FROM " + tableName, expectedCopiedLines);
}
}

@Test
public void testSessionVariableWithRowsPerTransactionOption() throws Exception {
String absFilePath = getAbsFilePath("batch-copyfrom-sessionvar-with-query-option.txt");
Expand Down
6 changes: 6 additions & 0 deletions src/postgres/src/backend/executor/ybcModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "access/yb_scan.h"

bool yb_disable_transactional_writes = false;
bool yb_enable_upsert_mode = false;

/*
* Hack to ensure that the next CommandCounterIncrement() will call
Expand Down Expand Up @@ -289,6 +290,11 @@ static Oid YBCExecuteInsertInternal(Oid dboid,
CacheInvalidateHeapTuple(rel, tuple, NULL);
}

if (yb_enable_upsert_mode)
{
HandleYBStatus(YBCPgInsertStmtSetUpsertMode(insert_stmt));
}

/* Execute the insert */
YBCExecWriteStmt(insert_stmt, rel, NULL /* rows_affected_count */, true /* cleanup */);

Expand Down
10 changes: 10 additions & 0 deletions src/postgres/src/backend/utils/misc/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -2045,6 +2045,16 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},

{
{"yb_enable_upsert_mode", PGC_USERSET, CLIENT_CONN_STATEMENT,
gettext_noop("Sets the boolean flag to enable or disable upsert mode for writes."),
NULL
},
&yb_enable_upsert_mode,
false,
NULL, NULL, NULL
},

/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
Expand Down
7 changes: 7 additions & 0 deletions src/postgres/src/include/executor/ybcModifyTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@
*/
extern bool yb_disable_transactional_writes;

/**
* YSQL guc variables that can be used to enable upsert mode for writes.
* e.g. 'SET yb_enable_upsert_mode=true'
* See also the corresponding entries in guc.c.
*/
extern bool yb_enable_upsert_mode;

//------------------------------------------------------------------------------
// YugaByte modify table API.

Expand Down

0 comments on commit 9ad58d8

Please sign in to comment.