Skip to content

Commit

Permalink
Add optional version parameter to metastore Partition
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikhil Collooru authored and shixuan-fan committed Sep 18, 2020
1 parent 9ae10f0 commit 9dcc558
Show file tree
Hide file tree
Showing 17 changed files with 125 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import org.apache.hadoop.hive.metastore.api.Partition;

import java.util.Optional;

public interface PartitionVersionFetcher
{
Optional<Integer> getPartitionVersion(Partition partition);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.metastore;

import com.facebook.presto.hive.PartitionVersionFetcher;
import org.apache.hadoop.hive.metastore.api.Partition;

import javax.inject.Inject;

import java.util.Optional;

public class HivePartitionVersionFetcher
implements PartitionVersionFetcher
{
@Inject
public HivePartitionVersionFetcher() {}

@Override
public Optional<Integer> getPartitionVersion(Partition partition)
{
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -37,6 +38,7 @@ public class Partition
private final Storage storage;
private final List<Column> columns;
private final Map<String, String> parameters;
private final Optional<Integer> partitionVersion;

@JsonCreator
public Partition(
Expand All @@ -45,14 +47,16 @@ public Partition(
@JsonProperty("values") List<String> values,
@JsonProperty("storage") Storage storage,
@JsonProperty("columns") List<Column> columns,
@JsonProperty("parameters") Map<String, String> parameters)
@JsonProperty("parameters") Map<String, String> parameters,
@JsonProperty("partitionVersion") Optional<Integer> partitionVersion)
{
this.databaseName = requireNonNull(databaseName, "databaseName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.values = ImmutableList.copyOf(requireNonNull(values, "values is null"));
this.storage = requireNonNull(storage, "storage is null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
this.parameters = ImmutableMap.copyOf(requireNonNull(parameters, "parameters is null"));
this.partitionVersion = requireNonNull(partitionVersion, "partitionVersion is null");
}

@JsonProperty
Expand Down Expand Up @@ -91,6 +95,12 @@ public Map<String, String> getParameters()
return parameters;
}

@JsonProperty
public Optional<Integer> getPartitionVersion()
{
return partitionVersion;
}

@Override
public String toString()
{
Expand All @@ -117,13 +127,14 @@ public boolean equals(Object o)
Objects.equals(values, partition.values) &&
Objects.equals(storage, partition.storage) &&
Objects.equals(columns, partition.columns) &&
Objects.equals(parameters, partition.parameters);
Objects.equals(parameters, partition.parameters) &&
Objects.equals(partitionVersion, partition.partitionVersion);
}

@Override
public int hashCode()
{
return Objects.hash(databaseName, tableName, values, storage, columns, parameters);
return Objects.hash(databaseName, tableName, values, storage, columns, parameters, partitionVersion);
}

public static Builder builder()
Expand All @@ -144,6 +155,7 @@ public static class Builder
private List<String> values;
private List<Column> columns;
private Map<String, String> parameters = ImmutableMap.of();
private Optional<Integer> partitionVersion = Optional.empty();

private Builder()
{
Expand All @@ -158,6 +170,7 @@ private Builder(Partition partition)
this.values = partition.getValues();
this.columns = partition.getColumns();
this.parameters = partition.getParameters();
this.partitionVersion = partition.getPartitionVersion();
}

public Builder setDatabaseName(String databaseName)
Expand Down Expand Up @@ -201,9 +214,15 @@ public Builder setParameters(Map<String, String> parameters)
return this;
}

public Builder setPartitionVersion(int partitionVersion)
{
this.partitionVersion = Optional.of(partitionVersion);
return this;
}

public Partition build()
{
return new Partition(databaseName, tableName, values, storageBuilder.build(), columns, parameters);
return new Partition(databaseName, tableName, values, storageBuilder.build(), columns, parameters, partitionVersion);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public Partition toPartition(String databaseName, String tableName, List<String>
.setParameters(parameters)
.build(),
columns,
parameters);
parameters,
Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.PartitionVersionFetcher;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Database;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
Expand Down Expand Up @@ -47,6 +48,7 @@

import static com.facebook.presto.hive.metastore.MetastoreUtil.verifyCanDropColumn;
import static com.facebook.presto.hive.metastore.PrestoTableType.TEMPORARY_TABLE;
import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiPartition;
import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.fromMetastoreApiTable;
import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.isAvroTableWithSchemaSet;
import static com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil.toMetastoreApiDatabase;
Expand All @@ -60,11 +62,13 @@ public class BridgingHiveMetastore
implements ExtendedHiveMetastore
{
private final HiveMetastore delegate;
private final PartitionVersionFetcher partitionVersionFetcher;

@Inject
public BridgingHiveMetastore(HiveMetastore delegate)
public BridgingHiveMetastore(HiveMetastore delegate, PartitionVersionFetcher partitionVersionFetcher)
{
this.delegate = delegate;
this.partitionVersionFetcher = partitionVersionFetcher;
}

@Override
Expand Down Expand Up @@ -244,7 +248,7 @@ private void alterTable(String databaseName, String tableName, org.apache.hadoop
@Override
public Optional<Partition> getPartition(String databaseName, String tableName, List<String> partitionValues)
{
return delegate.getPartition(databaseName, tableName, partitionValues).map(ThriftMetastoreUtil::fromMetastoreApiPartition);
return delegate.getPartition(databaseName, tableName, partitionValues).map(partition -> fromMetastoreApiPartition(partition, partitionVersionFetcher));
}

@Override
Expand Down Expand Up @@ -272,7 +276,7 @@ public Map<String, Optional<Partition>> getPartitionsByNames(String databaseName
Map<String, List<String>> partitionNameToPartitionValuesMap = partitionNames.stream()
.collect(Collectors.toMap(identity(), MetastoreUtil::toPartitionValues));
Map<List<String>, Partition> partitionValuesToPartitionMap = delegate.getPartitionsByNames(databaseName, tableName, partitionNames).stream()
.map(ThriftMetastoreUtil::fromMetastoreApiPartition)
.map(partition -> fromMetastoreApiPartition(partition, partitionVersionFetcher))
.collect(Collectors.toMap(Partition::getValues, identity()));
ImmutableMap.Builder<String, Optional<Partition>> resultBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<String>> entry : partitionNameToPartitionValuesMap.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.hive.HiveBucketProperty;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.PartitionVersionFetcher;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Database;
import com.facebook.presto.hive.metastore.HiveColumnStatistics;
Expand Down Expand Up @@ -436,7 +437,7 @@ public static boolean isAvroTableWithSchemaSet(org.apache.hadoop.hive.metastore.
serdeInfo.getSerializationLib().equals(AVRO.getSerDe());
}

public static Partition fromMetastoreApiPartition(org.apache.hadoop.hive.metastore.api.Partition partition)
public static Partition fromMetastoreApiPartition(org.apache.hadoop.hive.metastore.api.Partition partition, PartitionVersionFetcher partitionVersionFetcher)
{
StorageDescriptor storageDescriptor = partition.getSd();
if (storageDescriptor == null) {
Expand All @@ -452,6 +453,9 @@ public static Partition fromMetastoreApiPartition(org.apache.hadoop.hive.metasto
.collect(toList()))
.setParameters(partition.getParameters());

// set the partition version if available
partitionVersionFetcher.getPartitionVersion(partition).ifPresent(partitionBuilder::setPartitionVersion);

fromMetastoreApiStorageDescriptor(storageDescriptor, partitionBuilder.getStorageBuilder(), format("%s.%s", partition.getTableName(), partition.getValues()));

return partitionBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive.metastore;

import com.facebook.presto.hive.PartitionVersionFetcher;
import com.facebook.presto.hive.metastore.thrift.BridgingHiveMetastore;
import com.facebook.presto.hive.metastore.thrift.HiveCluster;
import com.facebook.presto.hive.metastore.thrift.HiveMetastoreClient;
Expand Down Expand Up @@ -59,8 +60,9 @@ public void setUp()
MockHiveCluster mockHiveCluster = new MockHiveCluster(mockClient);
ListeningExecutorService executor = listeningDecorator(newCachedThreadPool(daemonThreadsNamed("test-%s")));
ThriftHiveMetastore thriftHiveMetastore = new ThriftHiveMetastore(mockHiveCluster);
PartitionVersionFetcher hivePartitionVersionFetcher = new HivePartitionVersionFetcher();
metastore = new CachingHiveMetastore(
new BridgingHiveMetastore(thriftHiveMetastore),
new BridgingHiveMetastore(thriftHiveMetastore, hivePartitionVersionFetcher),
executor,
new Duration(5, TimeUnit.MINUTES),
new Duration(1, TimeUnit.MINUTES),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive.metastore;

import com.facebook.presto.hive.PartitionVersionFetcher;
import com.facebook.presto.hive.metastore.thrift.ThriftMetastoreUtil;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -118,6 +119,7 @@ public class TestHiveMetastoreUtil
1234567893,
TEST_STORAGE_DESCRIPTOR_WITH_UNSUPPORTED_FIELDS,
ImmutableMap.of("k1", "v1", "k2", "v2", "k3", "v3"));
private static final PartitionVersionFetcher TEST_PARTITION_VERSION_FETCHER = new HivePartitionVersionFetcher();

static {
TEST_STORAGE_DESCRIPTOR_WITH_UNSUPPORTED_FIELDS.setSkewedInfo(new SkewedInfo(
Expand All @@ -138,7 +140,7 @@ public void testTableRoundTrip()
@Test
public void testPartitionRoundTrip()
{
Partition partition = ThriftMetastoreUtil.fromMetastoreApiPartition(TEST_PARTITION);
Partition partition = ThriftMetastoreUtil.fromMetastoreApiPartition(TEST_PARTITION, TEST_PARTITION_VERSION_FETCHER);
org.apache.hadoop.hive.metastore.api.Partition metastoreApiPartition = ThriftMetastoreUtil.toMetastoreApiPartition(partition);
assertEquals(metastoreApiPartition, TEST_PARTITION);
}
Expand All @@ -155,7 +157,7 @@ public void testHiveSchemaTable()
public void testHiveSchemaPartition()
{
Properties expected = MetaStoreUtils.getPartitionMetadata(TEST_PARTITION_WITH_UNSUPPORTED_FIELDS, TEST_TABLE_WITH_UNSUPPORTED_FIELDS);
Properties actual = getHiveSchema(ThriftMetastoreUtil.fromMetastoreApiPartition(TEST_PARTITION_WITH_UNSUPPORTED_FIELDS), ThriftMetastoreUtil.fromMetastoreApiTable(TEST_TABLE_WITH_UNSUPPORTED_FIELDS, TEST_SCHEMA));
Properties actual = getHiveSchema(ThriftMetastoreUtil.fromMetastoreApiPartition(TEST_PARTITION_WITH_UNSUPPORTED_FIELDS, TEST_PARTITION_VERSION_FETCHER), ThriftMetastoreUtil.fromMetastoreApiTable(TEST_TABLE_WITH_UNSUPPORTED_FIELDS, TEST_SCHEMA));
assertEquals(actual, expected);
}

Expand All @@ -169,7 +171,7 @@ public void testTableRoundTripUnsupported()
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Writing to skewed table/partition is not supported")
public void testPartitionRoundTripUnsupported()
{
Partition partition = ThriftMetastoreUtil.fromMetastoreApiPartition(TEST_PARTITION_WITH_UNSUPPORTED_FIELDS);
Partition partition = ThriftMetastoreUtil.fromMetastoreApiPartition(TEST_PARTITION_WITH_UNSUPPORTED_FIELDS, TEST_PARTITION_VERSION_FETCHER);
ThriftMetastoreUtil.toMetastoreApiPartition(partition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public class TestRecordingHiveMetastore
ImmutableList.of("value"),
TABLE_STORAGE,
ImmutableList.of(TABLE_COLUMN),
ImmutableMap.of("param", "value4"));
ImmutableMap.of("param", "value4"),
Optional.empty());
private static final PartitionStatistics PARTITION_STATISTICS = new PartitionStatistics(
new HiveBasicStatistics(10, 11, 10000, 10001),
ImmutableMap.of("column", new HiveColumnStatistics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.hive.cache.HiveCachingHdfsConfiguration;
import com.facebook.presto.hive.datasink.DataSinkFactory;
import com.facebook.presto.hive.datasink.OutputStreamDataSinkFactory;
import com.facebook.presto.hive.metastore.HivePartitionVersionFetcher;
import com.facebook.presto.hive.orc.DwrfBatchPageSourceFactory;
import com.facebook.presto.hive.orc.DwrfSelectivePageSourceFactory;
import com.facebook.presto.hive.orc.OrcBatchPageSourceFactory;
Expand Down Expand Up @@ -190,6 +191,8 @@ public void configure(Binder binder)

binder.bind(HiveEncryptionInformationProvider.class).in(Scopes.SINGLETON);
newSetBinder(binder, EncryptionInformationSource.class);

binder.bind(PartitionVersionFetcher.class).to(HivePartitionVersionFetcher.class).in(Scopes.SINGLETON);
}

@ForHiveClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HiveColumnStatistics;
import com.facebook.presto.hive.metastore.HivePartitionVersionFetcher;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege;
import com.facebook.presto.hive.metastore.Partition;
Expand Down Expand Up @@ -907,7 +908,7 @@ protected final void setup(String host, int port, String databaseName, String ti

HiveCluster hiveCluster = new TestingHiveCluster(metastoreClientConfig, host, port);
ExtendedHiveMetastore metastore = new CachingHiveMetastore(
new BridgingHiveMetastore(new ThriftHiveMetastore(hiveCluster)),
new BridgingHiveMetastore(new ThriftHiveMetastore(hiveCluster), new HivePartitionVersionFetcher()),
executor,
Duration.valueOf("1m"),
Duration.valueOf("15s"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.facebook.presto.hive.metastore.CachingHiveMetastore;
import com.facebook.presto.hive.metastore.Database;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HivePartitionVersionFetcher;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.metastore.thrift.BridgingHiveMetastore;
Expand Down Expand Up @@ -181,7 +182,7 @@ protected void setup(String host, int port, String databaseName, BiFunction<Hive

hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
metastoreClient = new TestingHiveMetastore(
new BridgingHiveMetastore(new ThriftHiveMetastore(hiveCluster)),
new BridgingHiveMetastore(new ThriftHiveMetastore(hiveCluster), new HivePartitionVersionFetcher()),
executor,
metastoreClientConfig,
getBasePath(),
Expand Down
Loading

0 comments on commit 9dcc558

Please sign in to comment.