Skip to content

Commit

Permalink
Fix existing partitions overwrite in Hive
Browse files Browse the repository at this point in the history
This implementation writes to target partition directory directly.
After successful write, all files within partition whose name prefix
or suffix don't match current query ID are removed.

Cherry-pick of trinodb/trino@96e77e7

Co-authored-by: Arkadiusz Czajkowski <arek@starburstdata.com>
  • Loading branch information
2 people authored and highker committed Mar 31, 2022
1 parent 0b67486 commit 3fbbb67
Show file tree
Hide file tree
Showing 18 changed files with 633 additions and 53 deletions.
26 changes: 26 additions & 0 deletions .github/workflows/hive-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,29 @@ jobs:
if [ "${HIVE_AWS_ACCESS_KEY_ID}" != "" ]; then
./mvnw test ${MAVEN_TEST} -pl :presto-hive -P test-hive-glue
fi
hive-dockerized-tests:
runs-on: ubuntu-latest
timeout-minutes: 20
steps:
- uses: actions/checkout@v2
- uses: actions/setup-java@v1
with:
java-version: 8
- name: Cache local Maven repository
id: cache-maven
uses: actions/cache@v2
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-2-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-2-
- name: Populate maven cache
if: steps.cache-maven.outputs.cache-hit != 'true'
run: ./mvnw de.qaware.maven:go-offline-maven-plugin:resolve-dependencies
- name: Install Hive Module
run: |
export MAVEN_OPTS="${MAVEN_INSTALL_OPTS}"
./mvnw install ${MAVEN_FAST_INSTALL} -am -pl :presto-hive
- name: Run Hive Dockerized Tests
run: ./mvnw test ${MAVEN_TEST} -pl :presto-hive -P test-hive-insert-overwrite
19 changes: 19 additions & 0 deletions presto-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@
<exclude>**/TestHivePushdownIntegrationSmokeTest.java</exclude>
<exclude>**/TestHivePushdownDistributedQueries.java</exclude>
<exclude>**/TestParquetDistributedQueries.java</exclude>
<exclude>**/TestHive2InsertOverwrite.java</exclude>
<exclude>**/TestHive3InsertOverwrite.java</exclude>
</excludes>
</configuration>
</plugin>
Expand Down Expand Up @@ -607,5 +609,22 @@
</plugins>
</build>
</profile>
<profile>
<id>test-hive-insert-overwrite</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/TestHive2InsertOverwrite.java</include>
<include>**/TestHive3InsertOverwrite.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,16 @@
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.hive.BucketFunctionType.HIVE_COMPATIBLE;
import static com.facebook.presto.hive.HiveClientConfig.InsertExistingPartitionsBehavior.APPEND;
import static com.facebook.presto.hive.HiveClientConfig.InsertExistingPartitionsBehavior.ERROR;
import static com.facebook.presto.hive.HiveClientConfig.InsertExistingPartitionsBehavior.OVERWRITE;
import static com.facebook.presto.hive.HiveSessionProperties.INSERT_EXISTING_PARTITIONS_BEHAVIOR;
import static com.facebook.presto.hive.HiveStorageFormat.ORC;
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;

@DefunctConfig({
"hive.file-system-cache-ttl",
Expand Down Expand Up @@ -93,6 +100,7 @@ public class HiveClientConfig
private boolean createEmptyBucketFiles = true;
private boolean insertOverwriteImmutablePartitions;
private boolean failFastOnInsertIntoImmutablePartitionsEnabled = true;
private InsertExistingPartitionsBehavior insertExistingPartitionsBehavior;
private int maxPartitionsPerWriter = 100;
private int maxOpenSortFiles = 50;
private int writeValidationThreads = 16;
Expand Down Expand Up @@ -591,11 +599,13 @@ public HiveClientConfig setRespectTableFormat(boolean respectTableFormat)
return this;
}

@Deprecated
public boolean isInsertOverwriteImmutablePartitionEnabled()
{
return insertOverwriteImmutablePartitions;
}

@Deprecated
@Config("hive.insert-overwrite-immutable-partitions-enabled")
@ConfigDescription("When enabled, insertion query will overwrite existing partitions when partitions are immutable. This config only takes effect with hive.immutable-partitions set to true")
public HiveClientConfig setInsertOverwriteImmutablePartitionEnabled(boolean insertOverwriteImmutablePartitions)
Expand All @@ -604,6 +614,41 @@ public HiveClientConfig setInsertOverwriteImmutablePartitionEnabled(boolean inse
return this;
}

public enum InsertExistingPartitionsBehavior
{
ERROR,
APPEND,
OVERWRITE,
/**/;

public static InsertExistingPartitionsBehavior valueOf(String value, boolean immutablePartition)
{
InsertExistingPartitionsBehavior enumValue = valueOf(value.toUpperCase(ENGLISH));
if (immutablePartition) {
checkArgument(enumValue != APPEND, format("Presto is configured to treat Hive partitions as immutable. %s is not allowed to be set to %s", INSERT_EXISTING_PARTITIONS_BEHAVIOR, APPEND));
}

return enumValue;
}
}

public InsertExistingPartitionsBehavior getInsertExistingPartitionsBehavior()
{
if (insertExistingPartitionsBehavior != null) {
return insertExistingPartitionsBehavior;
}

return immutablePartitions ? (isInsertOverwriteImmutablePartitionEnabled() ? OVERWRITE : ERROR) : APPEND;
}

@Config("hive.insert-existing-partitions-behavior")
@ConfigDescription("Default value for insert existing partitions behavior")
public HiveClientConfig setInsertExistingPartitionsBehavior(InsertExistingPartitionsBehavior insertExistingPartitionsBehavior)
{
this.insertExistingPartitionsBehavior = insertExistingPartitionsBehavior;
return this;
}

public boolean isImmutablePartitions()
{
return immutablePartitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.airlift.slice.Slice;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.serde2.OpenCSVSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
Expand Down Expand Up @@ -174,6 +177,7 @@
import static com.facebook.presto.hive.HiveBucketHandle.createVirtualBucketHandle;
import static com.facebook.presto.hive.HiveBucketing.HiveBucketFilter;
import static com.facebook.presto.hive.HiveBucketing.getHiveBucketHandle;
import static com.facebook.presto.hive.HiveClientConfig.InsertExistingPartitionsBehavior;
import static com.facebook.presto.hive.HiveColumnHandle.BUCKET_COLUMN_NAME;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.HiveColumnHandle.ColumnType.REGULAR;
Expand All @@ -184,6 +188,7 @@
import static com.facebook.presto.hive.HiveColumnHandle.updateRowIdHandle;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_COLUMN_ORDER_MISMATCH;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CONCURRENT_MODIFICATION_DETECTED;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_READ_ONLY;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_TIMEZONE_MISMATCH;
Expand All @@ -205,6 +210,7 @@
import static com.facebook.presto.hive.HiveSessionProperties.getBucketFunctionTypeForExchange;
import static com.facebook.presto.hive.HiveSessionProperties.getCompressionCodec;
import static com.facebook.presto.hive.HiveSessionProperties.getHiveStorageFormat;
import static com.facebook.presto.hive.HiveSessionProperties.getInsertExistingPartitionsBehavior;
import static com.facebook.presto.hive.HiveSessionProperties.getOrcCompressionCodec;
import static com.facebook.presto.hive.HiveSessionProperties.getTemporaryTableCompressionCodec;
import static com.facebook.presto.hive.HiveSessionProperties.getTemporaryTableSchema;
Expand Down Expand Up @@ -279,9 +285,11 @@
import static com.facebook.presto.hive.HiveUtil.translateHiveUnsupportedTypesForTemporaryTable;
import static com.facebook.presto.hive.HiveUtil.verifyPartitionTypeSupported;
import static com.facebook.presto.hive.HiveWriteUtils.checkTableIsWritable;
import static com.facebook.presto.hive.HiveWriteUtils.isFileCreatedByQuery;
import static com.facebook.presto.hive.HiveWriteUtils.isWritableType;
import static com.facebook.presto.hive.HiveWriterFactory.computeBucketedFileName;
import static com.facebook.presto.hive.HiveWriterFactory.getFileExtension;
import static com.facebook.presto.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY;
import static com.facebook.presto.hive.PartitionUpdate.UpdateMode.APPEND;
import static com.facebook.presto.hive.PartitionUpdate.UpdateMode.NEW;
import static com.facebook.presto.hive.PartitionUpdate.UpdateMode.OVERWRITE;
Expand Down Expand Up @@ -356,6 +364,7 @@
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable;

public class HiveMetadata
implements TransactionalMetadata
Expand Down Expand Up @@ -1908,6 +1917,13 @@ private HiveInsertTableHandle beginInsertInternal(ConnectorSession session, Conn
encryptionInformationProvider.getWriteEncryptionInformation(session, tableEncryptionProperties.map(identity()), tableName.getSchemaName(), tableName.getTableName()));

WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);

if (getInsertExistingPartitionsBehavior(session) == InsertExistingPartitionsBehavior.OVERWRITE
&& isTransactionalTable(table.getParameters())
&& writeInfo.getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
throw new PrestoException(NOT_SUPPORTED, "Overwriting existing partition in transactional tables doesn't support DIRECT_TO_TARGET_EXISTING_DIRECTORY write mode");
}

metastore.declareIntentionToWrite(
new HdfsContext(session, tableName.getSchemaName(), tableName.getTableName(), table.getStorage().getLocation(), false),
metastoreContext,
Expand Down Expand Up @@ -2056,9 +2072,18 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
if (!partition.getStorage().getStorageFormat().getInputFormat().equals(handle.getPartitionStorageFormat().getInputFormat()) && isRespectTableFormat(session)) {
throw new PrestoException(HIVE_CONCURRENT_MODIFICATION_DETECTED, "Partition format changed during insert");
}
if (existingPartitions.contains(partitionUpdate.getName())) {
boolean existingPartition = existingPartitions.contains(partitionUpdate.getName());
if (existingPartition) {
// Overwriting an existing partition
if (partitionUpdate.getUpdateMode() == OVERWRITE) {
metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), handle.getLocationHandle().getTargetPath().toString(), partition.getValues());
if (handle.getLocationHandle().getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
// In this writeMode, the new files will be written to the same directory. Since this is
// an overwrite operation, we must remove all the old files not written by current query.
removeNonCurrentQueryFiles(session, partitionUpdate.getTargetPath());
}
else {
metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), handle.getLocationHandle().getTargetPath().toString(), partition.getValues());
}
}
else {
throw new PrestoException(HIVE_PARTITION_READ_ONLY, "Cannot insert into an existing partition of Hive table: " + partitionUpdate.getName());
Expand All @@ -2069,15 +2094,19 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
partitionUpdate.getStatistics(),
columnTypes,
getColumnStatistics(partitionComputedStatistics, partition.getValues()));
metastore.addPartition(
session,
handle.getSchemaName(),
handle.getTableName(),
table.getStorage().getLocation(),
false,
partition,
partitionUpdate.getWritePath(),
partitionStatistics);

// New partition or overwriting existing partition by staging and moving the new partition
if (!existingPartition || handle.getLocationHandle().getWriteMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
metastore.addPartition(
session,
handle.getSchemaName(),
handle.getTableName(),
table.getStorage().getLocation(),
false,
partition,
partitionUpdate.getWritePath(),
partitionStatistics);
}
}
else {
throw new IllegalArgumentException(format("Unsupported update mode: %s", partitionUpdate.getUpdateMode()));
Expand All @@ -2091,6 +2120,35 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
.collect(toList())));
}

/**
* Deletes all the files not written by the current query from the given partition path.
* This is required when we are overwriting the partitions by directly writing the new
* files to the existing directory, where files written by older queries may be present too.
*
* @param session the ConnectorSession object
* @param partitionPath the path of the partition from where the older files are to be deleted
*/
private void removeNonCurrentQueryFiles(ConnectorSession session, Path partitionPath)
{
String queryId = session.getQueryId();
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsContext(session), partitionPath);
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(partitionPath, false);
while (iterator.hasNext()) {
Path file = iterator.next().getPath();
if (!isFileCreatedByQuery(file.getName(), queryId)) {
fileSystem.delete(file, false);
}
}
}
catch (Exception ex) {
throw new PrestoException(
HIVE_FILESYSTEM_ERROR,
format("Failed to delete partition %s files during overwrite", partitionPath),
ex);
}
}

private static boolean isTempPathRequired(ConnectorSession session, Optional<HiveBucketProperty> bucketProperty, List<SortingColumn> preferredOrderingColumns)
{
boolean hasSortedWrite = bucketProperty.map(property -> !property.getSortedBy().isEmpty()).orElse(false) || !preferredOrderingColumns.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.APPEND;
import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.ERROR;
import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.OVERWRITE;
import static com.facebook.presto.hive.HiveClientConfig.InsertExistingPartitionsBehavior;
import static com.facebook.presto.hive.OrcFileWriterConfig.DEFAULT_COMPRESSION_LEVEL;
import static com.facebook.presto.hive.metastore.MetastoreUtil.METASTORE_HEADERS;
import static com.facebook.presto.hive.metastore.MetastoreUtil.USER_DEFINED_TYPE_ENCODING_ENABLED;
Expand All @@ -53,7 +51,7 @@ public final class HiveSessionProperties
private static final String MIN_BUCKET_COUNT_TO_NOT_IGNORE_TABLE_BUCKETING = "min_bucket_count_to_not_ignore_table_bucketing";
private static final String BUCKET_EXECUTION_ENABLED = "bucket_execution_enabled";
private static final String NODE_SELECTION_STRATEGY = "node_selection_strategy";
private static final String INSERT_EXISTING_PARTITIONS_BEHAVIOR = "insert_existing_partitions_behavior";
public static final String INSERT_EXISTING_PARTITIONS_BEHAVIOR = "insert_existing_partitions_behavior";
private static final String ORC_BLOOM_FILTERS_ENABLED = "orc_bloom_filters_enabled";
private static final String ORC_MAX_MERGE_DISTANCE = "orc_max_merge_distance";
private static final String ORC_MAX_BUFFER_SIZE = "orc_max_buffer_size";
Expand Down Expand Up @@ -144,24 +142,6 @@ public final class HiveSessionProperties

private final List<PropertyMetadata<?>> sessionProperties;

public enum InsertExistingPartitionsBehavior
{
ERROR,
APPEND,
OVERWRITE,
/**/;

public static InsertExistingPartitionsBehavior valueOf(String value, boolean immutablePartition)
{
InsertExistingPartitionsBehavior enumValue = valueOf(value.toUpperCase(ENGLISH));
if (immutablePartition) {
checkArgument(enumValue != APPEND, format("Presto is configured to treat Hive partitions as immutable. %s is not allowed to be set to %s", INSERT_EXISTING_PARTITIONS_BEHAVIOR, APPEND));
}

return enumValue;
}
}

@Inject
public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterConfig orcFileWriterConfig, ParquetFileWriterConfig parquetFileWriterConfig, CacheConfig cacheConfig)
{
Expand Down Expand Up @@ -195,7 +175,7 @@ public HiveSessionProperties(HiveClientConfig hiveClientConfig, OrcFileWriterCon
"Behavior on insert existing partitions; this session property doesn't control behavior on insert existing unpartitioned table",
VARCHAR,
InsertExistingPartitionsBehavior.class,
getDefaultInsertExistingPartitionsBehavior(hiveClientConfig),
hiveClientConfig.getInsertExistingPartitionsBehavior(),
false,
value -> InsertExistingPartitionsBehavior.valueOf((String) value, hiveClientConfig.isImmutablePartitions()),
InsertExistingPartitionsBehavior::toString),
Expand Down Expand Up @@ -1061,15 +1041,6 @@ public static PropertyMetadata<DataSize> dataSizeSessionProperty(String name, St
DataSize::toString);
}

private static InsertExistingPartitionsBehavior getDefaultInsertExistingPartitionsBehavior(HiveClientConfig hiveClientConfig)
{
if (!hiveClientConfig.isImmutablePartitions()) {
return APPEND;
}

return hiveClientConfig.isInsertOverwriteImmutablePartitionEnabled() ? OVERWRITE : ERROR;
}

public static boolean isFailFastOnInsertIntoImmutablePartitionsEnabled(ConnectorSession session)
{
return session.getProperty(FAIL_FAST_ON_INSERT_INTO_IMMUTABLE_PARTITIONS_ENABLED, Boolean.class);
Expand Down
Loading

0 comments on commit 3fbbb67

Please sign in to comment.