Add new getPartitionNamesWithVersionByFilter metastore api
Nikhil Collooru authored and shixuan-fan committed Sep 18, 2020
1 parent cdbd177 commit 584fd8c
Showing 12 changed files with 221 additions and 22 deletions.
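The commit adds a PartitionNameWithVersion value class and an ExtendedHiveMetastore method that returns each matching partition name together with the version the metastore reports for it. Below is a minimal caller sketch, not taken from the commit: the metastore instance, table names, and predicate map are placeholders, and the import paths are inferred from the packages visible in the diff.

// Hypothetical caller (not part of the commit). Any ExtendedHiveMetastore whose
// backend implements the new call works here; several implementations touched
// in this commit simply throw UnsupportedOperationException.
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.PartitionNameWithVersion;

import java.util.List;
import java.util.Map;

final class PartitionVersionLister
{
    static void printPartitionVersions(ExtendedHiveMetastore metastore, Map<Column, Domain> partitionPredicates)
    {
        // Database and table names here are example values only.
        List<PartitionNameWithVersion> namesWithVersions =
                metastore.getPartitionNamesWithVersionByFilter("web", "events", partitionPredicates);
        for (PartitionNameWithVersion nameWithVersion : namesWithVersions) {
            System.out.printf("%s -> version %d%n",
                    nameWithVersion.getPartitionName(),
                    nameWithVersion.getPartitionVersion());
        }
    }
}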
@@ -48,6 +48,7 @@
import java.util.function.Function;

import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY;
import static com.facebook.presto.hive.metastore.CachingHiveMetastore.MetastoreCacheScope.ALL;
import static com.facebook.presto.hive.metastore.HivePartitionName.hivePartitionName;
import static com.facebook.presto.hive.metastore.HiveTableName.hiveTableName;
import static com.facebook.presto.hive.metastore.PartitionFilter.partitionFilter;
@@ -92,25 +93,41 @@ public enum MetastoreCacheScope
private final LoadingCache<String, Set<String>> rolesCache;
private final LoadingCache<PrestoPrincipal, Set<RoleGrant>> roleGrantsCache;

private final boolean partitionVersioningEnabled;

@Inject
public CachingHiveMetastore(@ForCachingHiveMetastore ExtendedHiveMetastore delegate, @ForCachingHiveMetastore ExecutorService executor, MetastoreClientConfig metastoreClientConfig)
public CachingHiveMetastore(
@ForCachingHiveMetastore ExtendedHiveMetastore delegate,
@ForCachingHiveMetastore ExecutorService executor,
MetastoreClientConfig metastoreClientConfig)
{
this(
delegate,
executor,
metastoreClientConfig.getMetastoreCacheTtl(),
metastoreClientConfig.getMetastoreRefreshInterval(),
metastoreClientConfig.getMetastoreCacheMaximumSize());
metastoreClientConfig.getMetastoreCacheMaximumSize(),
metastoreClientConfig.isPartitionVersioningEnabled(),
metastoreClientConfig.getMetastoreCacheScope());
}

public CachingHiveMetastore(ExtendedHiveMetastore delegate, ExecutorService executor, Duration cacheTtl, Duration refreshInterval, long maximumSize)
public CachingHiveMetastore(
ExtendedHiveMetastore delegate,
ExecutorService executor,
Duration cacheTtl,
Duration refreshInterval,
long maximumSize,
boolean partitionVersioningEnabled,
MetastoreCacheScope metastoreCacheScope)
{
this(
delegate,
executor,
OptionalLong.of(cacheTtl.toMillis()),
refreshInterval.toMillis() >= cacheTtl.toMillis() ? OptionalLong.empty() : OptionalLong.of(refreshInterval.toMillis()),
maximumSize);
maximumSize,
partitionVersioningEnabled,
metastoreCacheScope);
}

public static CachingHiveMetastore memoizeMetastore(ExtendedHiveMetastore delegate, long maximumSize)
@@ -120,24 +137,65 @@ public static CachingHiveMetastore memoizeMetastore(ExtendedHiveMetastore delega
newDirectExecutorService(),
OptionalLong.empty(),
OptionalLong.empty(),
maximumSize);
maximumSize,
false,
ALL);
}

private CachingHiveMetastore(ExtendedHiveMetastore delegate, ExecutorService executor, OptionalLong expiresAfterWriteMillis, OptionalLong refreshMills, long maximumSize)
private CachingHiveMetastore(
ExtendedHiveMetastore delegate,
ExecutorService executor,
OptionalLong expiresAfterWriteMillis,
OptionalLong refreshMills,
long maximumSize,
boolean partitionVersioningEnabled,
MetastoreCacheScope metastoreCacheScope)
{
this.delegate = requireNonNull(delegate, "delegate is null");
requireNonNull(executor, "executor is null");

databaseNamesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
this.partitionVersioningEnabled = partitionVersioningEnabled;

OptionalLong cacheExpiresAfterWriteMillis;
OptionalLong cacheRefreshMills;
long cacheMaxSize;

OptionalLong partitionCacheExpiresAfterWriteMillis;
OptionalLong partitionCacheRefreshMills;
long partitionCacheMaxSize;

switch (metastoreCacheScope) {
case PARTITION:
partitionCacheExpiresAfterWriteMillis = expiresAfterWriteMillis;
partitionCacheRefreshMills = refreshMills;
partitionCacheMaxSize = maximumSize;
cacheExpiresAfterWriteMillis = OptionalLong.of(0);
cacheRefreshMills = OptionalLong.of(0);
cacheMaxSize = 0;
break;

case ALL:
partitionCacheExpiresAfterWriteMillis = expiresAfterWriteMillis;
partitionCacheRefreshMills = refreshMills;
partitionCacheMaxSize = maximumSize;
cacheExpiresAfterWriteMillis = expiresAfterWriteMillis;
cacheRefreshMills = refreshMills;
cacheMaxSize = maximumSize;
break;

default:
throw new IllegalArgumentException("Unknown metastore-cache-scope: " + metastoreCacheScope);
}

databaseNamesCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(CacheLoader.from(this::loadAllDatabases), executor));

databaseCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
databaseCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(CacheLoader.from(this::loadDatabase), executor));

tableNamesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
tableNamesCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(CacheLoader.from(this::loadAllTables), executor));

tableStatisticsCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
tableStatisticsCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(new CacheLoader<HiveTableName, PartitionStatistics>()
{
@Override
@@ -147,7 +205,7 @@ public PartitionStatistics load(HiveTableName key)
}
}, executor));

partitionStatisticsCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
partitionStatisticsCache = newCacheBuilder(partitionCacheExpiresAfterWriteMillis, partitionCacheRefreshMills, partitionCacheMaxSize)
.build(asyncReloading(new CacheLoader<HivePartitionName, PartitionStatistics>()
{
@Override
@@ -163,19 +221,19 @@ public Map<HivePartitionName, PartitionStatistics> loadAll(Iterable<? extends Hi
}
}, executor));

tableCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
tableCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(CacheLoader.from(this::loadTable), executor));

viewNamesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
viewNamesCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(CacheLoader.from(this::loadAllViews), executor));

partitionNamesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
partitionNamesCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(CacheLoader.from(this::loadPartitionNames), executor));

partitionFilterCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
partitionFilterCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(CacheLoader.from(this::loadPartitionNamesByFilter), executor));

partitionCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
partitionCache = newCacheBuilder(partitionCacheExpiresAfterWriteMillis, partitionCacheRefreshMills, partitionCacheMaxSize)
.build(asyncReloading(new CacheLoader<HivePartitionName, Optional<Partition>>()
{
@Override
@@ -191,13 +249,13 @@ public Map<HivePartitionName, Optional<Partition>> loadAll(Iterable<? extends Hi
}
}, executor));

tablePrivilegesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
tablePrivilegesCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(CacheLoader.from(key -> loadTablePrivileges(key.getDatabase(), key.getTable(), key.getPrincipal())), executor));

rolesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
rolesCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(CacheLoader.from(() -> loadRoles()), executor));

roleGrantsCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize)
roleGrantsCache = newCacheBuilder(cacheExpiresAfterWriteMillis, cacheRefreshMills, cacheMaxSize)
.build(asyncReloading(CacheLoader.from(this::loadRoleGrants), executor));
}
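
For orientation, the following is a hedged wiring sketch, not part of the commit, that uses the public constructor added above; the delegate, TTL, refresh interval, and size are placeholder values, and the io.airlift.units.Duration import is an assumption. With PARTITION scope the constructor zeroes the TTL and maximum size of every non-partition cache, so only partition-level data stays cached; ALL preserves the previous behavior for all caches.

// Hypothetical wiring with illustrative values; only the constructor signature
// comes from the diff above.
import com.facebook.presto.hive.metastore.CachingHiveMetastore;
import com.facebook.presto.hive.metastore.CachingHiveMetastore.MetastoreCacheScope;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import io.airlift.units.Duration;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PartitionScopedCacheFactory
{
    static CachingHiveMetastore createPartitionScopedCache(ExtendedHiveMetastore delegate)
    {
        ExecutorService refreshExecutor = Executors.newCachedThreadPool();
        return new CachingHiveMetastore(
                delegate,
                refreshExecutor,
                new Duration(20, TimeUnit.MINUTES),  // cacheTtl
                new Duration(1, TimeUnit.MINUTES),   // refreshInterval
                10_000,                              // maximumSize
                true,                                // partitionVersioningEnabled
                MetastoreCacheScope.PARTITION);      // cache partition data only
    }
}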

@@ -535,11 +593,39 @@ public List<String> getPartitionNamesByFilter(
String tableName,
Map<Column, Domain> partitionPredicates)
{
if (partitionVersioningEnabled) {
List<PartitionNameWithVersion> partitionNamesWithVersion = getPartitionNamesWithVersionByFilter(databaseName, tableName, partitionPredicates);
List<String> result = partitionNamesWithVersion.stream().map(PartitionNameWithVersion::getPartitionName).collect(toImmutableList());
partitionNamesWithVersion.forEach(partitionNameWithVersion -> invalidateStalePartition(partitionNameWithVersion, databaseName, tableName));
return result;
}
return get(
partitionFilterCache,
partitionFilter(databaseName, tableName, partitionPredicates));
}

@Override
public List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
String databaseName,
String tableName,
Map<Column, Domain> partitionPredicates)
{
return delegate.getPartitionNamesWithVersionByFilter(databaseName, tableName, partitionPredicates);
}

private void invalidateStalePartition(PartitionNameWithVersion partitionNameWithVersion, String databaseName, String tableName)
{
HivePartitionName hivePartitionName = hivePartitionName(databaseName, tableName, partitionNameWithVersion.getPartitionName());
Optional<Partition> partition = partitionCache.getIfPresent(hivePartitionName);
if (partition != null && partition.isPresent()) {
Optional<Integer> partitionVersion = partition.get().getPartitionVersion();
if (!partitionVersion.isPresent() || partitionVersion.get() < partitionNameWithVersion.getPartitionVersion()) {
partitionCache.invalidate(hivePartitionName);
partitionStatisticsCache.invalidate(hivePartitionName);
}
}
}
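
To make the interaction of the two methods above concrete, here is a hypothetical caller; the metastore instance, table, and predicates are assumptions, the import paths follow the packages seen elsewhere in the diff, and only the method signatures come from this file. When partitionVersioningEnabled is set, the filtered name lookup also evicts any cached partition whose stored version lags the version reported by the delegate, so the follow-up getPartitionsByNames reloads those entries rather than serving stale data.

// Hypothetical caller (not part of the commit); "metastore" is assumed to be a
// CachingHiveMetastore constructed with partitionVersioningEnabled = true.
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.hive.metastore.CachingHiveMetastore;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.Partition;

import java.util.List;
import java.util.Map;
import java.util.Optional;

final class VersionAwarePartitionLookup
{
    static Map<String, Optional<Partition>> loadFreshPartitions(CachingHiveMetastore metastore, Map<Column, Domain> partitionPredicates)
    {
        // Step 1: list matching names; as a side effect, stale cached partitions are invalidated.
        List<String> partitionNames = metastore.getPartitionNamesByFilter("web", "events", partitionPredicates);
        // Step 2: fetch the partitions; invalidated entries are reloaded from the delegate.
        return metastore.getPartitionsByNames("web", "events", partitionNames);
    }
}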

private List<String> loadPartitionNamesByFilter(PartitionFilter partitionFilter)
{
return delegate.getPartitionNamesByFilter(
@@ -82,6 +82,11 @@ List<String> getPartitionNamesByFilter(
String tableName,
Map<Column, Domain> partitionPredicates);

List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
String databaseName,
String tableName,
Map<Column, Domain> partitionPredicates);

Map<String, Optional<Partition>> getPartitionsByNames(String databaseName, String tableName, List<String> partitionNames);

void addPartitions(String databaseName, String tableName, List<PartitionWithStatistics> partitions);
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.metastore;

import static java.util.Objects.requireNonNull;

public class PartitionNameWithVersion
{
private final String partitionName;
private final int partitionVersion;

public PartitionNameWithVersion(String partitionName, int partitionVersion)
{
this.partitionName = requireNonNull(partitionName, "partitionName is null");
this.partitionVersion = partitionVersion;
}

public String getPartitionName()
{
return partitionName;
}

public int getPartitionVersion()
{
return partitionVersion;
}
}
@@ -355,6 +355,15 @@ public List<String> getPartitionNamesByFilter(
() -> delegate.getPartitionNamesByFilter(databaseName, tableName, partitionPredicates));
}

@Override
public List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
String databaseName,
String tableName,
Map<Column, Domain> partitionPredicates)
{
throw new UnsupportedOperationException();
}

@Override
public Map<String, Optional<Partition>> getPartitionsByNames(String databaseName, String tableName, List<String> partitionNames)
{
@@ -29,6 +29,7 @@
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.MetastoreUtil;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionNameWithVersion;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PartitionWithStatistics;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
@@ -298,6 +299,15 @@ public List<String> getPartitionNamesByFilter(String databaseName, String tableN
return getPartitionNamesByParts(databaseName, tableName, parts).orElse(ImmutableList.of());
}

@Override
public List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
String databaseName,
String tableName,
Map<Column, Domain> partitionPredicates)
{
throw new UnsupportedOperationException();
}

/**
* return a list of partition names whose partition values contain at least
* the values given in the {@code parts} argument
@@ -30,6 +30,7 @@
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.MetastoreUtil;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionNameWithVersion;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PartitionWithStatistics;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
@@ -909,6 +910,15 @@ public synchronized List<String> getPartitionNamesByFilter(
.orElse(ImmutableList.of());
}

@Override
public List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
String databaseName,
String tableName,
Map<Column, Domain> partitionPredicates)
{
throw new UnsupportedOperationException();
}

private static boolean partitionMatches(String partitionName, List<String> parts)
{
List<String> values = toPartitionValues(partitionName);
@@ -67,6 +67,7 @@
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.MetastoreUtil;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionNameWithVersion;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PartitionWithStatistics;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
@@ -674,6 +675,15 @@ public List<String> getPartitionNamesByFilter(
return buildPartitionNames(table.getPartitionColumns(), partitions);
}

@Override
public List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
String databaseName,
String tableName,
Map<Column, Domain> partitionPredicates)
{
throw new UnsupportedOperationException();
}

private List<Partition> getPartitions(String databaseName, String tableName, String expression)
{
if (partitionSegments == 1) {
@@ -23,6 +23,7 @@
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.MetastoreUtil;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionNameWithVersion;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PartitionWithStatistics;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
@@ -266,6 +267,15 @@ public List<String> getPartitionNamesByFilter(
return delegate.getPartitionNamesByFilter(databaseName, tableName, partitionPredicates);
}

@Override
public List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
String databaseName,
String tableName,
Map<Column, Domain> partitionPredicates)
{
return delegate.getPartitionNamesWithVersionByFilter(databaseName, tableName, partitionPredicates);
}

@Override
public Map<String, Optional<Partition>> getPartitionsByNames(String databaseName, String tableName, List<String> partitionNames)
{