Fix existing partitions overwrite in Hive
This implementation writes directly to the target partition directory. After a successful write, all files within the partition whose name prefix or suffix does not match the current query ID are removed.
Arkadiusz Czajkowski committed on Sep 27, 2021
1 parent d7e0cdd · commit 96e77e7
Showing 7 changed files with 421 additions and 6 deletions.
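
The cleanup rule from the commit description can be pictured with a short sketch. This is an illustration only, not code from this commit: the class and method names are hypothetical, and it assumes a Hadoop FileSystem handle for the partition location. It shows just the rule stated above: keep files whose name carries the current query ID as a prefix or suffix, and delete everything else left over from earlier writes.

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper, not the connector's actual implementation.
public final class PartitionOverwriteCleanupSketch
{
    private PartitionOverwriteCleanupSketch() {}

    public static void removeFilesNotCreatedByQuery(FileSystem fileSystem, Path partitionPath, String queryId)
            throws IOException
    {
        for (FileStatus status : fileSystem.listStatus(partitionPath)) {
            String fileName = status.getPath().getName();
            // Files written by the current query embed the query ID in their name,
            // so anything without it is a leftover from a previous write and is removed.
            if (!fileName.startsWith(queryId) && !fileName.endsWith(queryId)) {
                fileSystem.delete(status.getPath(), false);
            }
        }
    }
}

The real connector's file naming and deletion paths are more involved; the point of the sketch is only the prefix-or-suffix check against the query ID.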
plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveInsertOverwrite.java (212 additions, 0 deletions)
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.plugin.hive.s3.S3HiveQueryRunner;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.List;

import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

public abstract class BaseTestHiveInsertOverwrite
        extends AbstractTestQueryFramework
{
    private static final String HIVE_TEST_SCHEMA = "hive_insert_overwrite";

    private String bucketName;
    private HiveMinioDataLake dockerizedS3DataLake;

    private final String hiveHadoopImage;

    public BaseTestHiveInsertOverwrite(String hiveHadoopImage)
    {
        this.hiveHadoopImage = requireNonNull(hiveHadoopImage, "hiveHadoopImage is null");
    }

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        this.bucketName = "test-hive-insert-overwrite-" + randomTableSuffix();
        this.dockerizedS3DataLake = closeAfterClass(
                new HiveMinioDataLake(bucketName, ImmutableMap.of(), hiveHadoopImage));
        this.dockerizedS3DataLake.start();
        return S3HiveQueryRunner.create(
                this.dockerizedS3DataLake.getHiveHadoop().getHiveMetastoreEndpoint(),
                this.dockerizedS3DataLake.getMinio().getMinioApiEndpoint(),
                HiveMinioDataLake.ACCESS_KEY,
                HiveMinioDataLake.SECRET_KEY,
                ImmutableMap.<String, String>builder()
                        // This is required when using MinIO which requires path style access
                        .put("hive.s3.path-style-access", "true")
                        .put("hive.insert-existing-partitions-behavior", "OVERWRITE")
                        .put("hive.non-managed-table-writes-enabled", "true")
                        .build());
    }

    @BeforeClass
    public void setUp()
    {
        computeActual(format(
                "CREATE SCHEMA hive.%1$s WITH (location='s3a://%2$s/%1$s')",
                HIVE_TEST_SCHEMA,
                bucketName));
    }

    @Test
    public void testInsertOverwriteNonPartitionedTable()
    {
        String testTable = getTestTableName();
        computeActual(getCreateTableStatement(testTable));
        assertInsertFailure(
                testTable,
                "Overwriting unpartitioned table not supported when writing directly to target directory");
        computeActual(format("DROP TABLE %s", testTable));
    }

    @Test
    public void testInsertOverwriteNonPartitionedBucketedTable()
    {
        String testTable = getTestTableName();
        computeActual(getCreateTableStatement(
                testTable,
                "bucketed_by = ARRAY['nationkey']",
                "bucket_count = 3"));
        assertInsertFailure(
                testTable,
                "Overwriting unpartitioned table not supported when writing directly to target directory");
        computeActual(format("DROP TABLE %s", testTable));
    }

    @Test
    public void testInsertOverwritePartitionedTable()
    {
        String testTable = getTestTableName();
        computeActual(getCreateTableStatement(
                testTable,
                "partitioned_by=ARRAY['regionkey']"));
        copyTpchNationToTable(testTable);
        assertOverwritePartition(testTable);
    }

    @Test
    public void testInsertOverwritePartitionedAndBucketedTable()
    {
        String testTable = getTestTableName();
        computeActual(getCreateTableStatement(
                testTable,
                "partitioned_by=ARRAY['regionkey']",
                "bucketed_by = ARRAY['nationkey']",
                "bucket_count = 3"));
        copyTpchNationToTable(testTable);
        assertOverwritePartition(testTable);
    }

    @Test
    public void testInsertOverwritePartitionedAndBucketedExternalTable()
    {
        String testTable = getTestTableName();
        // Store table data in data lake bucket
        computeActual(getCreateTableStatement(
                testTable,
                "partitioned_by=ARRAY['regionkey']",
                "bucketed_by = ARRAY['nationkey']",
                "bucket_count = 3"));
        copyTpchNationToTable(testTable);

        // Map this table as external table
        String externalTableName = testTable + "_ext";
        computeActual(getCreateTableStatement(
                externalTableName,
                "partitioned_by=ARRAY['regionkey']",
                "bucketed_by = ARRAY['nationkey']",
                "bucket_count = 3",
                format("external_location = 's3a://%s/%s/%s/'", this.bucketName, HIVE_TEST_SCHEMA, testTable)));
        copyTpchNationToTable(testTable);
        assertOverwritePartition(externalTableName);
    }

    protected void assertInsertFailure(String testTable, String expectedMessageRegExp)
    {
        assertQueryFails(
                format("INSERT INTO %s " +
                        "SELECT name, comment, nationkey, regionkey " +
                        "FROM tpch.tiny.nation",
                        testTable),
                expectedMessageRegExp);
    }

    protected void assertOverwritePartition(String testTable)
    {
        computeActual(format(
                "INSERT INTO %s VALUES " +
                "('POLAND', 'Test Data', 25, 5), " +
                "('CZECH', 'Test Data', 26, 5)",
                testTable));
        query(format("SELECT count(*) FROM %s WHERE regionkey = 5", testTable))
                .assertThat()
                .skippingTypesCheck()
                .containsAll(resultBuilder(getSession())
                        .row(2L)
                        .build());

        query(format("INSERT INTO %s values('POLAND', 'Overwrite', 25, 5)", testTable))
                .assertThat()
                .skippingTypesCheck()
                .containsAll(resultBuilder(getSession())
                        .row(1L)
                        .build());
        computeActual(format("DROP TABLE %s", testTable));
    }

    protected String getTestTableName()
    {
        return format("hive.%s.%s", HIVE_TEST_SCHEMA, "nation_" + randomTableSuffix());
    }

    protected String getCreateTableStatement(String tableName, String... propertiesEntries)
    {
        return getCreateTableStatement(tableName, Arrays.asList(propertiesEntries));
    }

    protected String getCreateTableStatement(String tableName, List<String> propertiesEntries)
    {
        return format(
                "CREATE TABLE %s (" +
                " name varchar(25), " +
                " comment varchar(152), " +
                " nationkey bigint, " +
                " regionkey bigint) " +
                (propertiesEntries.size() < 1 ? "" : propertiesEntries
                        .stream()
                        .collect(joining(",", "WITH (", ")"))),
                tableName);
    }

    protected void copyTpchNationToTable(String testTable)
    {
        computeActual(format("INSERT INTO " + testTable + " SELECT name, comment, nationkey, regionkey FROM tpch.tiny.nation"));
    }
}
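
A note on the configuration above: the query runner turns overwrite on globally via the catalog property hive.insert-existing-partitions-behavior. The Hive connector also exposes this behavior as a connector session property, so it can be enabled per session instead. The helper below is a hypothetical sketch of that, assuming the session property is named insert_existing_partitions_behavior and the catalog is registered as "hive".

import io.trino.Session;

// Hypothetical test helper, not part of this commit.
public final class PartitionOverwriteSessionSketch
{
    private PartitionOverwriteSessionSketch() {}

    public static Session withPartitionOverwrite(Session session)
    {
        // Per-session equivalent of the catalog property "hive.insert-existing-partitions-behavior".
        return Session.builder(session)
                .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "OVERWRITE")
                .build();
    }
}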
plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHive2InsertOverwrite.java (25 additions, 0 deletions)
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.hive;

import io.trino.plugin.hive.containers.HiveHadoop;

public class TestHive2InsertOverwrite
        extends BaseTestHiveInsertOverwrite
{
    public TestHive2InsertOverwrite()
    {
        super(HiveHadoop.DEFAULT_IMAGE);
    }
}
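
Because the base class takes the Hadoop image as a constructor argument, the same suite can be bound to other images in one short subclass. The remaining changed files of this commit are not visible on this page, so the class below is only a hypothetical sketch of such a binding; both the class name and the HiveHadoop.HIVE3_IMAGE constant are assumptions rather than content taken from the diff.

package io.trino.plugin.hive;

import io.trino.plugin.hive.containers.HiveHadoop;

// Hypothetical sketch of a Hive 3 binding; not shown in the visible part of this diff.
public class TestHive3InsertOverwrite
        extends BaseTestHiveInsertOverwrite
{
    public TestHive3InsertOverwrite()
    {
        super(HiveHadoop.HIVE3_IMAGE);
    }
}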