Fix existing partitions overwrite in Hive
This implementation writes directly to the target partition directory.
After a successful write, all files within the partition whose name
prefix or suffix does not match the current query ID are removed.
Arkadiusz Czajkowski committed Sep 27, 2021
1 parent d7e0cdd commit 96e77e7
Showing 7 changed files with 421 additions and 6 deletions.
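
For context: Trino's writers embed the query ID in the names of the files they
produce (typically a prefix for non-bucketed files and a suffix for bucketed
ones, cf. the computeBucketedFileName import kept below), which is what makes
the prefix-or-suffix test safe. A minimal sketch of the matching rule,
assuming the HiveWriteUtils.isFileCreatedByQuery helper used by this commit
keeps exactly the semantics the message describes:

    // Sketch only - not the commit's code; mirrors the assumed semantics of
    // HiveWriteUtils.isFileCreatedByQuery(fileName, queryId).
    final class QueryFileMatcher
    {
        static boolean isFileCreatedByQuery(String fileName, String queryId)
        {
            // A file in the partition directory survives the overwrite
            // cleanup only if this returns true.
            return fileName.startsWith(queryId) || fileName.endsWith(queryId);
        }
    }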
@@ -29,6 +29,7 @@
 import io.trino.plugin.base.CatalogName;
 import io.trino.plugin.hive.HdfsEnvironment.HdfsContext;
 import io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation;
+import io.trino.plugin.hive.HiveSessionProperties.InsertExistingPartitionsBehavior;
 import io.trino.plugin.hive.LocationService.WriteInfo;
 import io.trino.plugin.hive.acid.AcidOperation;
 import io.trino.plugin.hive.acid.AcidSchema;
@@ -105,7 +106,9 @@
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeManager;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -170,6 +173,7 @@
 import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
 import static io.trino.plugin.hive.HiveSessionProperties.getCompressionCodec;
 import static io.trino.plugin.hive.HiveSessionProperties.getHiveStorageFormat;
+import static io.trino.plugin.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior;
 import static io.trino.plugin.hive.HiveSessionProperties.getQueryPartitionFilterRequiredSchemas;
 import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
 import static io.trino.plugin.hive.HiveSessionProperties.isBucketExecutionEnabled;
@@ -218,6 +222,7 @@
 import static io.trino.plugin.hive.HiveType.HIVE_STRING;
 import static io.trino.plugin.hive.HiveType.toHiveType;
 import static io.trino.plugin.hive.HiveWriterFactory.computeBucketedFileName;
+import static io.trino.plugin.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY;
 import static io.trino.plugin.hive.PartitionUpdate.UpdateMode.APPEND;
 import static io.trino.plugin.hive.PartitionUpdate.UpdateMode.NEW;
 import static io.trino.plugin.hive.PartitionUpdate.UpdateMode.OVERWRITE;
@@ -249,6 +254,7 @@
 import static io.trino.plugin.hive.util.HiveUtil.verifyPartitionTypeSupported;
 import static io.trino.plugin.hive.util.HiveWriteUtils.checkTableIsWritable;
 import static io.trino.plugin.hive.util.HiveWriteUtils.initializeSerializer;
+import static io.trino.plugin.hive.util.HiveWriteUtils.isFileCreatedByQuery;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isS3FileSystem;
 import static io.trino.plugin.hive.util.HiveWriteUtils.isWritableType;
 import static io.trino.plugin.hive.util.Statistics.ReduceOperator.ADD;
@@ -1657,6 +1663,11 @@ public HiveInsertTableHandle beginInsert(ConnectorSession session, ConnectorTabl
                 transaction);
 
         WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
+        if (getInsertExistingPartitionsBehavior(session) == InsertExistingPartitionsBehavior.OVERWRITE
+                && isTransactionalTable(table.getParameters())
+                && writeInfo.getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
+            throw new TrinoException(NOT_SUPPORTED, "Overwriting existing partition in transactional tables doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode");
+        }
         metastore.declareIntentionToWrite(session, writeInfo.getWriteMode(), writeInfo.getWritePath(), tableName);
         return result;
     }
@@ -1759,14 +1770,22 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
                 if (!partition.getStorage().getStorageFormat().getInputFormat().equals(handle.getPartitionStorageFormat().getInputFormat()) && isRespectTableFormat(session)) {
                     throw new TrinoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Partition format changed during insert");
                 }
-                if (partitionUpdate.getUpdateMode() == OVERWRITE) {
-                    metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), partition.getValues(), true);
-                }
                 PartitionStatistics partitionStatistics = createPartitionStatistics(
                         partitionUpdate.getStatistics(),
                         columnTypes,
                         getColumnStatistics(partitionComputedStatistics, partition.getValues()));
-                metastore.addPartition(session, handle.getSchemaName(), handle.getTableName(), partition, partitionUpdate.getWritePath(), partitionStatistics);
+                if (partitionUpdate.getUpdateMode() == OVERWRITE) {
+                    if (handle.getLocationHandle().getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
+                        removeNonCurrentQueryFiles(session, partitionUpdate.getTargetPath());
+                    }
+                    else {
+                        metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), partition.getValues(), true);
+                        metastore.addPartition(session, handle.getSchemaName(), handle.getTableName(), partition, partitionUpdate.getWritePath(), partitionStatistics);
+                    }
+                }
+                else {
+                    metastore.addPartition(session, handle.getSchemaName(), handle.getTableName(), partition, partitionUpdate.getWritePath(), partitionStatistics);
+                }
             }
             else {
                 throw new IllegalArgumentException(format("Unsupported update mode: %s", partitionUpdate.getUpdateMode()));
@@ -1788,6 +1807,27 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
                 .collect(toImmutableList())));
     }
 
+    private void removeNonCurrentQueryFiles(ConnectorSession session, Path partitionPath)
+    {
+        String queryId = session.getQueryId();
+        try {
+            FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session), partitionPath);
+            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(partitionPath, false);
+            while (iterator.hasNext()) {
+                Path file = iterator.next().getPath();
+                if (!isFileCreatedByQuery(file.getName(), queryId)) {
+                    fileSystem.delete(file, false);
+                }
+            }
+        }
+        catch (Exception ex) {
+            throw new TrinoException(
+                    HIVE_FILESYSTEM_ERROR,
+                    format("Failed to delete partition %s files during overwrite", partitionPath),
+                    ex);
+        }
+    }
+
     private void createOrcAcidVersionFile(HdfsContext context, Path deltaDirectory)
     {
         try {
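Two details of removeNonCurrentQueryFiles above are worth noting: it runs in
finishInsert, i.e. only after the query's own files are fully written, and it
lists the partition directory non-recursively, so only files directly under
the partition path are candidates for deletion. A self-contained sketch of
the same pruning rule under those assumptions (hypothetical class name, same
org.apache.hadoop.fs API as the diff, Trino error wrapping elided):

    // Sketch only - a standalone rendering of the cleanup step, not the
    // commit's code.
    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    final class PartitionPruner
    {
        static void removeStaleFiles(FileSystem fileSystem, Path partitionPath, String queryId)
                throws IOException
        {
            // "false" = non-recursive: files in subdirectories are left untouched
            RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(partitionPath, false);
            while (files.hasNext()) {
                Path file = files.next().getPath();
                if (!(file.getName().startsWith(queryId) || file.getName().endsWith(queryId))) {
                    // Not written by the current query, so it belongs to the
                    // partition's previous contents
                    fileSystem.delete(file, false);
                }
            }
        }
    }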
@@ -70,7 +70,6 @@
 import java.util.function.Consumer;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
@@ -426,7 +425,6 @@ public HiveWriter createWriter(Page partitionColumns, int position, OptionalInt
                 schema = getHiveSchema(table);
 
                 writeInfo = locationService.getPartitionWriteInfo(locationHandle, Optional.empty(), partitionName.get());
-                checkState(writeInfo.getWriteMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY, "Overwriting existing partition doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode");
                 break;
             case ERROR:
                 throw new TrinoException(HIVE_PARTITION_READ_ONLY, "Cannot insert into an existing partition of Hive table: " + partitionName.get());
@@ -0,0 +1,212 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.hive;

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.plugin.hive.s3.S3HiveQueryRunner;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.List;

import static io.trino.testing.MaterializedResult.resultBuilder;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

public abstract class BaseTestHiveInsertOverwrite
        extends AbstractTestQueryFramework
{
    private static final String HIVE_TEST_SCHEMA = "hive_insert_overwrite";

    private String bucketName;
    private HiveMinioDataLake dockerizedS3DataLake;

    private final String hiveHadoopImage;

    public BaseTestHiveInsertOverwrite(String hiveHadoopImage)
    {
        this.hiveHadoopImage = requireNonNull(hiveHadoopImage, "hiveHadoopImage is null");
    }

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        this.bucketName = "test-hive-insert-overwrite-" + randomTableSuffix();
        this.dockerizedS3DataLake = closeAfterClass(
                new HiveMinioDataLake(bucketName, ImmutableMap.of(), hiveHadoopImage));
        this.dockerizedS3DataLake.start();
        return S3HiveQueryRunner.create(
                this.dockerizedS3DataLake.getHiveHadoop().getHiveMetastoreEndpoint(),
                this.dockerizedS3DataLake.getMinio().getMinioApiEndpoint(),
                HiveMinioDataLake.ACCESS_KEY,
                HiveMinioDataLake.SECRET_KEY,
                ImmutableMap.<String, String>builder()
                        // This is required when using MinIO which requires path style access
                        .put("hive.s3.path-style-access", "true")
                        .put("hive.insert-existing-partitions-behavior", "OVERWRITE")
                        .put("hive.non-managed-table-writes-enabled", "true")
                        .build());
    }

    @BeforeClass
    public void setUp()
    {
        computeActual(format(
                "CREATE SCHEMA hive.%1$s WITH (location='s3a://%2$s/%1$s')",
                HIVE_TEST_SCHEMA,
                bucketName));
    }

    @Test
    public void testInsertOverwriteNonPartitionedTable()
    {
        String testTable = getTestTableName();
        computeActual(getCreateTableStatement(testTable));
        assertInsertFailure(
                testTable,
                "Overwriting unpartitioned table not supported when writing directly to target directory");
        computeActual(format("DROP TABLE %s", testTable));
    }

    @Test
    public void testInsertOverwriteNonPartitionedBucketedTable()
    {
        String testTable = getTestTableName();
        computeActual(getCreateTableStatement(
                testTable,
                "bucketed_by = ARRAY['nationkey']",
                "bucket_count = 3"));
        assertInsertFailure(
                testTable,
                "Overwriting unpartitioned table not supported when writing directly to target directory");
        computeActual(format("DROP TABLE %s", testTable));
    }

    @Test
    public void testInsertOverwritePartitionedTable()
    {
        String testTable = getTestTableName();
        computeActual(getCreateTableStatement(
                testTable,
                "partitioned_by=ARRAY['regionkey']"));
        copyTpchNationToTable(testTable);
        assertOverwritePartition(testTable);
    }

    @Test
    public void testInsertOverwritePartitionedAndBucketedTable()
    {
        String testTable = getTestTableName();
        computeActual(getCreateTableStatement(
                testTable,
                "partitioned_by=ARRAY['regionkey']",
                "bucketed_by = ARRAY['nationkey']",
                "bucket_count = 3"));
        copyTpchNationToTable(testTable);
        assertOverwritePartition(testTable);
    }

    @Test
    public void testInsertOverwritePartitionedAndBucketedExternalTable()
    {
        String testTable = getTestTableName();
        // Store table data in data lake bucket
        computeActual(getCreateTableStatement(
                testTable,
                "partitioned_by=ARRAY['regionkey']",
                "bucketed_by = ARRAY['nationkey']",
                "bucket_count = 3"));
        copyTpchNationToTable(testTable);

        // Map this table as external table
        String externalTableName = testTable + "_ext";
        computeActual(getCreateTableStatement(
                externalTableName,
                "partitioned_by=ARRAY['regionkey']",
                "bucketed_by = ARRAY['nationkey']",
                "bucket_count = 3",
                format("external_location = 's3a://%s/%s/%s/'", this.bucketName, HIVE_TEST_SCHEMA, testTable)));
        copyTpchNationToTable(testTable);
        assertOverwritePartition(externalTableName);
    }

    protected void assertInsertFailure(String testTable, String expectedMessageRegExp)
    {
        assertQueryFails(
                format("INSERT INTO %s " +
                        "SELECT name, comment, nationkey, regionkey " +
                        "FROM tpch.tiny.nation",
                        testTable),
                expectedMessageRegExp);
    }

    protected void assertOverwritePartition(String testTable)
    {
        computeActual(format(
                "INSERT INTO %s VALUES " +
                        "('POLAND', 'Test Data', 25, 5), " +
                        "('CZECH', 'Test Data', 26, 5)",
                testTable));
        query(format("SELECT count(*) FROM %s WHERE regionkey = 5", testTable))
                .assertThat()
                .skippingTypesCheck()
                .containsAll(resultBuilder(getSession())
                        .row(2L)
                        .build());

        query(format("INSERT INTO %s VALUES ('POLAND', 'Overwrite', 25, 5)", testTable))
                .assertThat()
                .skippingTypesCheck()
                .containsAll(resultBuilder(getSession())
                        .row(1L)
                        .build());
        computeActual(format("DROP TABLE %s", testTable));
    }

    protected String getTestTableName()
    {
        return format("hive.%s.%s", HIVE_TEST_SCHEMA, "nation_" + randomTableSuffix());
    }

    protected String getCreateTableStatement(String tableName, String... propertiesEntries)
    {
        return getCreateTableStatement(tableName, Arrays.asList(propertiesEntries));
    }

    protected String getCreateTableStatement(String tableName, List<String> propertiesEntries)
    {
        return format(
                "CREATE TABLE %s (" +
                        " name varchar(25), " +
                        " comment varchar(152), " +
                        " nationkey bigint, " +
                        " regionkey bigint) " +
                        (propertiesEntries.isEmpty() ? "" : propertiesEntries
                                .stream()
                                .collect(joining(",", "WITH (", ")"))),
                tableName);
    }

    protected void copyTpchNationToTable(String testTable)
    {
        computeActual(format("INSERT INTO %s SELECT name, comment, nationkey, regionkey FROM tpch.tiny.nation", testTable));
    }
}
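
The query runner above opts into overwrite semantics for the whole catalog via
the hive.insert-existing-partitions-behavior configuration property. For
reference, a sketch of the per-session equivalent (the session property is
named insert_existing_partitions_behavior; testTable stands for any table name,
as elsewhere in this class):

    // Sketch only - assumes io.trino.Session and the AbstractTestQueryFramework
    // helpers used throughout this file.
    Session overwriteSession = Session.builder(getSession())
            .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "OVERWRITE")
            .build();
    computeActual(overwriteSession, format(
            "INSERT INTO %s SELECT name, comment, nationkey, regionkey FROM tpch.tiny.nation",
            testTable));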
@@ -0,0 +1,25 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.trino.plugin.hive;

import io.trino.plugin.hive.containers.HiveHadoop;

public class TestHive2InsertOverwrite
        extends BaseTestHiveInsertOverwrite
{
    public TestHive2InsertOverwrite()
    {
        super(HiveHadoop.DEFAULT_IMAGE);
    }
}
