Skip to content

Raise an error on a column name mismatch (when match_by_column_name is enabled) #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Raise an error if there is a mismatch between a column name in a Snowflake table and a name in the Embulk input schema.
  • Loading branch information
chikamura committed May 1, 2024
commit bcc84b29d2baff424a4b15227382476c43b2b65c
30 changes: 28 additions & 2 deletions src/main/java/org/embulk/output/SnowflakeOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,16 +247,42 @@ protected void doBegin(
matchByColumnName == SnowflakePluginTask.MatchByColumnName.CASE_SENSITIVE
? String::equals
: String::equalsIgnoreCase;

Optional<JdbcSchema> initialTargetTableSchema =
pluginTask.getMode().ignoreTargetTableSchema()
? Optional.empty()
: newJdbcSchemaFromTableIfExists(con, task.getActualTable());

if (initialTargetTableSchema.isPresent()) {
JdbcSchema skipColumnsFilteredTargetTableSchema =
JdbcSchema.filterSkipColumns(targetTableSchema);
for (JdbcColumn column : initialTargetTableSchema.get().getColumns()) {
if (skipColumnsFilteredTargetTableSchema.findColumn(column.getName()).isPresent()) {
continue;
}
throw new UnsupportedOperationException(
String.format("table column %s is not found in input schema.", column.getName()));
}
}

int columnNumber = 1;
for (int i = 0; i < targetTableSchema.getCount(); i++) {
JdbcColumn targetColumn = targetTableSchema.getColumn(i);
Column schemaColumn = schema.getColumn(i);
if (targetColumn.isSkipColumn()) {
continue;
throw new UnsupportedOperationException(
String.format(
"input schema column %s is not found in %s table.",
schemaColumn.getName(), task.getActualTable().getTableName()));
}
Column schemaColumn = schema.getColumn(i);
if (compare.apply(schemaColumn.getName(), targetColumn.getName())) {
copyIntoTableColumnNames.add(targetColumn.getName());
copyIntoCSVColumnNumbers.add(columnNumber);
} else {
throw new UnsupportedOperationException(
String.format(
"input schema column %s is not found in %s table. (probably a case-sensitive problems)",
schemaColumn.getName(), task.getActualTable().getTableName()));
}
columnNumber += 1;
}
Expand Down
178 changes: 63 additions & 115 deletions src/test/java/org/embulk/output/snowflake/TestSnowflakeOutputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.embulk.output.snowflake;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

import java.io.File;
import java.io.IOException;
Expand All @@ -25,6 +23,7 @@
import org.embulk.EmbulkSystemProperties;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.exec.PartialExecutionException;
import org.embulk.input.file.LocalFileInputPlugin;
import org.embulk.output.SnowflakeOutputPlugin;
import org.embulk.output.SnowflakeOutputPlugin.SnowflakePluginTask;
Expand All @@ -41,10 +40,7 @@
import org.embulk.util.config.modules.ZoneIdModule;
import org.embulk.util.config.units.ColumnConfig;
import org.embulk.util.config.units.SchemaConfig;
import org.junit.After;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.*;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -869,34 +865,18 @@ public void testRuntimeWithMatchByColumnNameCaseSensitiveInNoTable() throws IOEx
.set("mode", "insert")
.set("match_by_column_name", "case_sensitive")
.set("table", targetTableName);
embulk.runOutput(config, in.toPath());

runQuery(
String.format("select count(*) from %s;", targetTableFullName),
foreachResult(
rs -> {
assertEquals(3, rs.getInt(1));
}));
List<String> results = new ArrayList();
runQuery(
"select \"c0\", \"C1\", \"c2\", \"C3\" from " + targetTableFullName + " order by 1",
foreachResult(
rs -> {
results.add(
rs.getString(1)
+ ","
+ rs.getString(2)
+ ","
+ rs.getString(3)
+ ","
+ rs.getString(4));
}));
List<String> expected =
Stream.of("1.0,null,10000.0,null", "2.0,null,20000.0,null", "3.0,null,30000.0,null")
.collect(Collectors.toList());
for (int i = 0; i < results.size(); i++) {
assertEquals(expected.get(i), results.get(i));
}
PartialExecutionException exception =
assertThrows(
PartialExecutionException.class,
() -> {
embulk.runOutput(config, in.toPath());
});
assertTrue(
exception.getCause().getMessage(),
exception.getCause().getMessage().contains("input schema column c1"));
assertTrue(
exception.getCause().getMessage(),
exception.getCause().getMessage().contains("case-sensitive"));
}

@Test
Expand Down Expand Up @@ -933,26 +913,18 @@ public void testRuntimeWithMatchByColumnNameCaseSensitiveWhenOnlyPresentColumnIn
.set("mode", "insert")
.set("match_by_column_name", "case_sensitive")
.set("table", targetTableName);
embulk.runOutput(config, in.toPath());

runQuery(
String.format("select count(*) from %s;", targetTableFullName),
foreachResult(
rs -> {
assertEquals(3, rs.getInt(1));
}));
List<String> results = new ArrayList();
runQuery(
"select \"c0\", \"c2\" from " + targetTableFullName + " order by 1",
foreachResult(
rs -> {
results.add(rs.getString(1) + "," + rs.getString(2));
}));
List<String> expected =
Stream.of("1.0,10000.0", "2.0,20000.0", "3.0,30000.0").collect(Collectors.toList());
for (int i = 0; i < results.size(); i++) {
assertEquals(expected.get(i), results.get(i));
}
PartialExecutionException exception =
assertThrows(
PartialExecutionException.class,
() -> {
embulk.runOutput(config, in.toPath());
});
assertTrue(
exception.getCause().getMessage(),
exception.getCause().getMessage().contains("input schema column c1"));
assertFalse(
exception.getCause().getMessage(),
exception.getCause().getMessage().contains("case-sensitive"));
}

@Test
Expand Down Expand Up @@ -983,26 +955,18 @@ public void testRuntimeWithMatchByColumnNameCaseSensitiveWhenOnlyPresentColumnIn
.set("mode", "insert")
.set("match_by_column_name", "case_sensitive")
.set("table", targetTableName);
embulk.runOutput(config, in.toPath());

runQuery(
String.format("select count(*) from %s;", targetTableFullName),
foreachResult(
rs -> {
assertEquals(3, rs.getInt(1));
}));
List<String> results = new ArrayList();
runQuery(
"select \"c0\", \"c1\" from " + targetTableFullName + " order by 1",
foreachResult(
rs -> {
results.add(rs.getString(1) + "," + rs.getString(2));
}));
List<String> expected =
Stream.of("1.0,null", "2.0,null", "3.0,null").collect(Collectors.toList());
for (int i = 0; i < results.size(); i++) {
assertEquals(expected.get(i), results.get(i));
}
PartialExecutionException exception =
assertThrows(
PartialExecutionException.class,
() -> {
embulk.runOutput(config, in.toPath());
});
assertTrue(
exception.getCause().getMessage(),
exception.getCause().getMessage().contains("table column c1"));
assertFalse(
exception.getCause().getMessage(),
exception.getCause().getMessage().contains("case-sensitive"));
}

@Test
Expand Down Expand Up @@ -1136,26 +1100,18 @@ public void testRuntimeWithMatchByColumnNameCaseInsensitiveWhenOnlyPresentColumn
.set("mode", "insert")
.set("match_by_column_name", "case_insensitive")
.set("table", targetTableName);
embulk.runOutput(config, in.toPath());

runQuery(
String.format("select count(*) from %s;", targetTableFullName),
foreachResult(
rs -> {
assertEquals(3, rs.getInt(1));
}));
List<String> results = new ArrayList();
runQuery(
"select \"C0\", \"c2\" from " + targetTableFullName + " order by 1",
foreachResult(
rs -> {
results.add(rs.getString(1) + "," + rs.getString(2));
}));
List<String> expected =
Stream.of("1.0,10000.0", "2.0,20000.0", "3.0,30000.0").collect(Collectors.toList());
for (int i = 0; i < results.size(); i++) {
assertEquals(expected.get(i), results.get(i));
}
PartialExecutionException exception =
assertThrows(
PartialExecutionException.class,
() -> {
embulk.runOutput(config, in.toPath());
});
assertTrue(
exception.getCause().getMessage(),
exception.getCause().getMessage().contains("input schema column c1"));
assertFalse(
exception.getCause().getMessage(),
exception.getCause().getMessage().contains("case-sensitive"));
}

@Test
Expand Down Expand Up @@ -1186,26 +1142,18 @@ public void testRuntimeWithMatchByColumnNameCaseInsensitiveWhenOnlyPresentColumn
.set("mode", "insert")
.set("match_by_column_name", "case_insensitive")
.set("table", targetTableName);
embulk.runOutput(config, in.toPath());

runQuery(
String.format("select count(*) from %s;", targetTableFullName),
foreachResult(
rs -> {
assertEquals(3, rs.getInt(1));
}));
List<String> results = new ArrayList();
runQuery(
"select \"C0\", \"c1\" from " + targetTableFullName + " order by 1",
foreachResult(
rs -> {
results.add(rs.getString(1) + "," + rs.getString(2));
}));
List<String> expected =
Stream.of("1.0,null", "2.0,null", "3.0,null").collect(Collectors.toList());
for (int i = 0; i < results.size(); i++) {
assertEquals(expected.get(i), results.get(i));
}
PartialExecutionException exception =
assertThrows(
PartialExecutionException.class,
() -> {
embulk.runOutput(config, in.toPath());
});
assertTrue(
exception.getCause().getMessage(),
exception.getCause().getMessage().contains("table column c1"));
assertFalse(
exception.getCause().getMessage(),
exception.getCause().getMessage().contains("case-sensitive"));
}

@Ignore(
Expand Down