Skip to content

Commit

Permalink
Add Partition version support for getPartitionNamesByFilter and getPa…
Browse files Browse the repository at this point in the history
…rtitionsByNames
  • Loading branch information
ajaygeorge committed Apr 9, 2024
1 parent 5594e84 commit 3d28d35
Show file tree
Hide file tree
Showing 41 changed files with 352 additions and 214 deletions.
6 changes: 6 additions & 0 deletions presto-hive-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@

import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class HivePartition
{
public static final String UNPARTITIONED_ID = "<UNPARTITIONED>";
public static final PartitionNameWithVersion UNPARTITIONED_ID = new PartitionNameWithVersion("<UNPARTITIONED>", Optional.empty());

private final SchemaTableName tableName;
private final String partitionId;
private final PartitionNameWithVersion partitionId;
private final Map<ColumnHandle, NullableValue> keys;

public HivePartition(SchemaTableName tableName)
Expand All @@ -38,7 +39,7 @@ public HivePartition(SchemaTableName tableName)

public HivePartition(
SchemaTableName tableName,
String partitionId,
PartitionNameWithVersion partitionId,
Map<ColumnHandle, NullableValue> keys)
{
this.tableName = requireNonNull(tableName, "tableName is null");
Expand All @@ -51,7 +52,7 @@ public SchemaTableName getTableName()
return tableName;
}

public String getPartitionId()
public PartitionNameWithVersion getPartitionId()
{
return partitionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,78 @@
*/
package com.facebook.presto.hive;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain;

import javax.annotation.concurrent.Immutable;

import java.util.Objects;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

@Immutable
public class PartitionNameWithVersion
implements Comparable<PartitionNameWithVersion>
{
private final String partitionName;
private final Optional<Long> partitionVersion;

public PartitionNameWithVersion(String partitionName, Optional<Long> partitionVersion)
@JsonCreator
public PartitionNameWithVersion(@JsonProperty("partitionName") String partitionName, @JsonProperty("partitionVersion") Optional<Long> partitionVersion)
{
this.partitionName = requireNonNull(partitionName, "partitionName is null");
this.partitionVersion = requireNonNull(partitionVersion, "partitionVersion is null");
}

@JsonProperty
public String getPartitionName()
{
return partitionName;
}

@JsonProperty
public Optional<Long> getPartitionVersion()
{
return partitionVersion;
}

@Override
public String toString()
{
final StringBuilder sb = new StringBuilder("PartitionNameWithVersion{");
sb.append("partitionName='").append(partitionName).append('\'');
sb.append(", partitionVersion=").append(partitionVersion);
sb.append('}');
return sb.toString();
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PartitionNameWithVersion that = (PartitionNameWithVersion) o;
return Objects.equals(partitionName, that.partitionName) && Objects.equals(partitionVersion, that.partitionVersion);
}

@Override
public int hashCode()
{
return Objects.hash(partitionName, partitionVersion);
}

@Override
public int compareTo(PartitionNameWithVersion other)
{
return ComparisonChain.start()
.compare(this.partitionName, other.partitionName)
.compare(this.partitionVersion.orElse(Long.MIN_VALUE), other.partitionVersion.orElse(Long.MIN_VALUE))
.result();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ default void dropTableFromMetastore(MetastoreContext metastoreContext, String da

Optional<List<String>> getPartitionNames(MetastoreContext metastoreContext, String databaseName, String tableName);

List<String> getPartitionNamesByFilter(
List<PartitionNameWithVersion> getPartitionNamesByFilter(
MetastoreContext metastoreContext,
String databaseName,
String tableName,
Expand All @@ -113,7 +113,7 @@ List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
String tableName,
Map<Column, Domain> partitionPredicates);

Map<String, Optional<Partition>> getPartitionsByNames(MetastoreContext metastoreContext, String databaseName, String tableName, List<String> partitionNames);
Map<String, Optional<Partition>> getPartitionsByNames(MetastoreContext metastoreContext, String databaseName, String tableName, List<PartitionNameWithVersion> partitionNames);

MetastoreOperationResult addPartitions(MetastoreContext metastoreContext, String databaseName, String tableName, List<PartitionWithStatistics> partitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive.metastore;

import com.facebook.presto.hive.PartitionNameWithVersion;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
Expand All @@ -33,22 +34,32 @@ public class HivePartitionName
{
private final HiveTableName hiveTableName;
private final List<String> partitionValues;
private final Optional<String> partitionName; // does not participate in hashCode/equals
private final Optional<PartitionNameWithVersion> partitionNameWithVersion;

@JsonCreator
public HivePartitionName(
@JsonProperty("hiveTableName") HiveTableName hiveTableName,
@JsonProperty("partitionValues") List<String> partitionValues,
@JsonProperty("partitionName") Optional<String> partitionName)
@JsonProperty("partitionNameWithVersion") Optional<PartitionNameWithVersion> partitionNameWithVersion)
{
this.hiveTableName = requireNonNull(hiveTableName, "hiveTableName is null");
this.partitionValues = ImmutableList.copyOf(requireNonNull(partitionValues, "partitionValues is null"));
this.partitionName = requireNonNull(partitionName, "partitionName is null");
this.partitionNameWithVersion = requireNonNull(partitionNameWithVersion, "partitionNameWithVersion is null");
}

public static HivePartitionName hivePartitionName(HiveTableName hiveTableName, PartitionNameWithVersion partitionName)
{
return new HivePartitionName(hiveTableName, toPartitionValues(partitionName.getPartitionName()), Optional.of(partitionName));
}

public static HivePartitionName hivePartitionName(HiveTableName hiveTableName, String partitionName)
{
return new HivePartitionName(hiveTableName, toPartitionValues(partitionName), Optional.of(partitionName));
return new HivePartitionName(hiveTableName, toPartitionValues(partitionName), Optional.of(new PartitionNameWithVersion(partitionName, Optional.empty())));
}

public static HivePartitionName hivePartitionName(String databaseName, String tableName, PartitionNameWithVersion partitionName)
{
return hivePartitionName(hiveTableName(databaseName, tableName), partitionName);
}

public static HivePartitionName hivePartitionName(String databaseName, String tableName, String partitionName)
Expand All @@ -74,9 +85,9 @@ public List<String> getPartitionValues()
}

@JsonProperty
public Optional<String> getPartitionName()
public Optional<PartitionNameWithVersion> getPartitionNameWithVersion()
{
return partitionName;
return partitionNameWithVersion;
}

@Override
Expand All @@ -85,7 +96,7 @@ public String toString()
return toStringHelper(this)
.add("hiveTableName", hiveTableName)
.add("partitionValues", partitionValues)
.add("partitionName", partitionName)
.add("partitionNameWithVersion", partitionNameWithVersion)
.toString();
}

Expand All @@ -101,12 +112,13 @@ public boolean equals(Object o)

HivePartitionName other = (HivePartitionName) o;
return Objects.equals(hiveTableName, other.hiveTableName) &&
Objects.equals(partitionValues, other.partitionValues);
Objects.equals(partitionValues, other.partitionValues) &&
Objects.equals(partitionNameWithVersion, other.partitionNameWithVersion);
}

@Override
public int hashCode()
{
return Objects.hash(hiveTableName, partitionValues);
return Objects.hash(hiveTableName, partitionValues, partitionNameWithVersion);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class InMemoryCachingHiveMetastore
private final LoadingCache<KeyAndContext<HivePartitionName>, PartitionStatistics> partitionStatisticsCache;
private final LoadingCache<KeyAndContext<String>, Optional<List<String>>> viewNamesCache;
private final LoadingCache<KeyAndContext<HivePartitionName>, Optional<Partition>> partitionCache;
private final LoadingCache<KeyAndContext<PartitionFilter>, List<String>> partitionFilterCache;
private final LoadingCache<KeyAndContext<PartitionFilter>, List<PartitionNameWithVersion>> partitionFilterCache;
private final LoadingCache<KeyAndContext<HiveTableName>, Optional<List<String>>> partitionNamesCache;
private final LoadingCache<KeyAndContext<UserTableKey>, Set<HivePrivilegeInfo>> tablePrivilegesCache;
private final LoadingCache<KeyAndContext<String>, Set<String>> rolesCache;
Expand Down Expand Up @@ -411,19 +411,19 @@ public Map<String, PartitionStatistics> getPartitionStatistics(MetastoreContext
Map<KeyAndContext<HivePartitionName>, PartitionStatistics> statistics = getAll(partitionStatisticsCache, partitions);
return statistics.entrySet()
.stream()
.collect(toImmutableMap(entry -> entry.getKey().getKey().getPartitionName().get(), Entry::getValue));
.collect(toImmutableMap(entry -> entry.getKey().getKey().getPartitionNameWithVersion().get().getPartitionName(), Entry::getValue));
}

private PartitionStatistics loadPartitionColumnStatistics(KeyAndContext<HivePartitionName> partition)
{
String partitionName = partition.getKey().getPartitionName().get();
String partitionName = partition.getKey().getPartitionNameWithVersion().get().getPartitionName();
Map<String, PartitionStatistics> partitionStatistics = delegate.getPartitionStatistics(
partition.getContext(),
partition.getKey().getHiveTableName().getDatabaseName(),
partition.getKey().getHiveTableName().getTableName(),
ImmutableSet.of(partitionName));
if (!partitionStatistics.containsKey(partitionName)) {
throw new PrestoException(HIVE_PARTITION_DROPPED_DURING_QUERY, "Statistics result does not contain entry for partition: " + partition.getKey().getPartitionName());
throw new PrestoException(HIVE_PARTITION_DROPPED_DURING_QUERY, "Statistics result does not contain entry for partition: " + partition.getKey().getPartitionNameWithVersion());
}
return partitionStatistics.get(partitionName);
}
Expand All @@ -435,7 +435,7 @@ private Map<KeyAndContext<HivePartitionName>, PartitionStatistics> loadPartition
ImmutableMap.Builder<KeyAndContext<HivePartitionName>, PartitionStatistics> result = ImmutableMap.builder();
tablePartitions.keySet().forEach(table -> {
Set<String> partitionNames = tablePartitions.get(table).stream()
.map(partitionName -> partitionName.getKey().getPartitionName().get())
.map(partitionName -> partitionName.getKey().getPartitionNameWithVersion().get().getPartitionName())
.collect(toImmutableSet());
Map<String, PartitionStatistics> partitionStatistics = delegate.getPartitionStatistics(table.getContext(), table.getKey().getDatabaseName(), table.getKey().getTableName(), partitionNames);
for (String partitionName : partitionNames) {
Expand Down Expand Up @@ -584,17 +584,16 @@ private Optional<List<String>> loadPartitionNames(KeyAndContext<HiveTableName> h
}

@Override
public List<String> getPartitionNamesByFilter(
public List<PartitionNameWithVersion> getPartitionNamesByFilter(
MetastoreContext metastoreContext,
String databaseName,
String tableName,
Map<Column, Domain> partitionPredicates)
{
if (partitionVersioningEnabled) {
List<PartitionNameWithVersion> partitionNamesWithVersion = getPartitionNamesWithVersionByFilter(metastoreContext, databaseName, tableName, partitionPredicates);
List<String> result = partitionNamesWithVersion.stream().map(PartitionNameWithVersion::getPartitionName).collect(toImmutableList());
invalidateStalePartitions(partitionNamesWithVersion, databaseName, tableName, metastoreContext);
return result;
return partitionNamesWithVersion;
}
return get(partitionFilterCache, getCachingKey(metastoreContext, partitionFilter(databaseName, tableName, partitionPredicates)));
}
Expand Down Expand Up @@ -669,7 +668,7 @@ private void validatePartitionCache(Map<KeyAndContext<HivePartitionName>, Option
}
}

private List<String> loadPartitionNamesByFilter(KeyAndContext<PartitionFilter> partitionFilterKey)
private List<PartitionNameWithVersion> loadPartitionNamesByFilter(KeyAndContext<PartitionFilter> partitionFilterKey)
{
return delegate.getPartitionNamesByFilter(
partitionFilterKey.getContext(),
Expand All @@ -679,7 +678,7 @@ private List<String> loadPartitionNamesByFilter(KeyAndContext<PartitionFilter> p
}

@Override
public Map<String, Optional<Partition>> getPartitionsByNames(MetastoreContext metastoreContext, String databaseName, String tableName, List<String> partitionNames)
public Map<String, Optional<Partition>> getPartitionsByNames(MetastoreContext metastoreContext, String databaseName, String tableName, List<PartitionNameWithVersion> partitionNames)
{
Iterable<KeyAndContext<HivePartitionName>> names = transform(partitionNames, name -> getCachingKey(metastoreContext, HivePartitionName.hivePartitionName(databaseName, tableName, name)));

Expand All @@ -691,7 +690,7 @@ public Map<String, Optional<Partition>> getPartitionsByNames(MetastoreContext me
for (Entry<KeyAndContext<HivePartitionName>, Optional<Partition>> entry : all.entrySet()) {
Optional<Partition> value = entry.getValue();
invalidatePartitionsWithHighColumnCount(value, entry.getKey());
partitionsByName.put(entry.getKey().getKey().getPartitionName().get(), value);
partitionsByName.put(entry.getKey().getKey().getPartitionNameWithVersion().get().getPartitionName(), value);
}
return partitionsByName.build();
}
Expand All @@ -716,17 +715,19 @@ private Map<KeyAndContext<HivePartitionName>, Optional<Partition>> loadPartition
String databaseName = hiveTableName.getDatabaseName();
String tableName = hiveTableName.getTableName();

List<String> partitionsToFetch = new ArrayList<>();
List<PartitionNameWithVersion> partitionsToFetch = new ArrayList<>();
for (KeyAndContext<HivePartitionName> partitionNameKey : partitionNamesKey) {
checkArgument(partitionNameKey.getKey().getHiveTableName().equals(hiveTableName), "Expected table name %s but got %s", hiveTableName, partitionNameKey.getKey().getHiveTableName());
checkArgument(partitionNameKey.getContext().equals(firstPartitionKey.getContext()), "Expected context %s but got %s", firstPartitionKey.getContext(), partitionNameKey.getContext());
partitionsToFetch.add(partitionNameKey.getKey().getPartitionName().get());
partitionsToFetch.add(partitionNameKey.getKey().getPartitionNameWithVersion().get());
}

ImmutableMap.Builder<KeyAndContext<HivePartitionName>, Optional<Partition>> partitions = ImmutableMap.builder();
ImmutableMap<String, PartitionNameWithVersion> partitionNameToVersionMap = partitionsToFetch.stream()
.collect(toImmutableMap(PartitionNameWithVersion::getPartitionName, Function.identity()));
Map<String, Optional<Partition>> partitionsByNames = delegate.getPartitionsByNames(firstPartitionKey.getContext(), databaseName, tableName, partitionsToFetch);
for (Entry<String, Optional<Partition>> entry : partitionsByNames.entrySet()) {
partitions.put(getCachingKey(firstPartitionKey.getContext(), HivePartitionName.hivePartitionName(hiveTableName, entry.getKey())), entry.getValue());
partitions.put(getCachingKey(firstPartitionKey.getContext(), HivePartitionName.hivePartitionName(hiveTableName, partitionNameToVersionMap.get(entry.getKey()))), entry.getValue());
}
return partitions.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveBasicStatistics;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.PartitionNameWithVersion;
import com.facebook.presto.hive.PartitionOfflineException;
import com.facebook.presto.hive.TableOfflineException;
import com.facebook.presto.hive.TypeTranslator;
Expand Down Expand Up @@ -86,6 +87,7 @@
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -131,6 +133,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Strings.padEnd;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.io.BaseEncoding.base16;
import static java.lang.Float.intBitsToFloat;
Expand Down Expand Up @@ -1106,4 +1109,18 @@ public static void checkIfNullView(ConnectorViewDefinition view, SchemaTableName
throw new ViewNotFoundException(viewName);
}
}

public static List<PartitionNameWithVersion> getPartitionsWithEmptyVersion(Collection<String> partitions)
{
return partitions.stream()
.map(partition -> new PartitionNameWithVersion(partition, Optional.empty()))
.collect(toImmutableList());
}

public static List<String> getPartitionNames(Collection<PartitionNameWithVersion> partitionNameWithVersions)
{
return partitionNameWithVersions.stream()
.map(PartitionNameWithVersion::getPartitionName)
.collect(toImmutableList());
}
}
Loading

0 comments on commit 3d28d35

Please sign in to comment.