Skip to content

Commit

Permalink
Add PartitionSkippabilityChecker to check if partition can be skipped
Browse files Browse the repository at this point in the history
  • Loading branch information
NikhilCollooru committed Sep 19, 2023
1 parent 92f51c0 commit 566ec0a
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.facebook.presto.hive.HivePageSourceProvider;
import com.facebook.presto.hive.HivePartitionManager;
import com.facebook.presto.hive.HivePartitionObjectBuilder;
import com.facebook.presto.hive.HivePartitionSkippabilityChecker;
import com.facebook.presto.hive.HivePartitionStats;
import com.facebook.presto.hive.HiveSplitManager;
import com.facebook.presto.hive.HiveStagingFileCommitter;
Expand Down Expand Up @@ -187,7 +188,8 @@ public S3SelectTestHelper(String host,
config.getSplitLoaderConcurrency(),
config.getRecursiveDirWalkerEnabled(),
new ConfigBasedCacheQuotaRequirementProvider(cacheConfig),
new HiveEncryptionInformationProvider(ImmutableSet.of()));
new HiveEncryptionInformationProvider(ImmutableSet.of()),
new HivePartitionSkippabilityChecker());
pageSourceProvider = new HivePageSourceProvider(
config,
hdfsEnvironment,
Expand Down Expand Up @@ -276,6 +278,7 @@ MaterializedResult getFilteredTableResult(SchemaTableName table, HiveColumnHandl

return null;
}

public static MaterializedResult expectedResult(ConnectorSession session, int start, int end)
{
MaterializedResult.Builder builder = MaterializedResult.resultBuilder(session, BIGINT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public void configure(Binder binder)
binder.bind(PartitionObjectBuilder.class).to(HivePartitionObjectBuilder.class).in(Scopes.SINGLETON);
binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON);
binder.bind(ConnectorSplitManager.class).to(HiveSplitManager.class).in(Scopes.SINGLETON);
binder.bind(PartitionSkippabilityChecker.class).to(HivePartitionSkippabilityChecker.class).in(Scopes.SINGLETON);
binder.bind(CacheQuotaRequirementProvider.class).to(ConfigBasedCacheQuotaRequirementProvider.class).in(Scopes.SINGLETON);
newExporter(binder).export(ConnectorSplitManager.class).as(generatedNameOf(HiveSplitManager.class, connectorId));
binder.bind(ConnectorPageSourceProvider.class).to(HivePageSourceProvider.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.spi.ConnectorSession;

import static com.facebook.presto.hive.HiveSessionProperties.shouldIgnoreUnreadablePartition;

public class HivePartitionSkippabilityChecker
implements PartitionSkippabilityChecker
{
@Override
public boolean isPartitionSkippable(Partition partition, ConnectorSession session)
{
return shouldIgnoreUnreadablePartition(session) && partition.isEligibleToIgnore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import static com.facebook.presto.hive.HiveSessionProperties.isOfflineDataDebugModeEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isPartitionStatisticsBasedOptimizationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isUseParquetColumnNames;
import static com.facebook.presto.hive.HiveSessionProperties.shouldIgnoreUnreadablePartition;
import static com.facebook.presto.hive.HiveStorageFormat.PARQUET;
import static com.facebook.presto.hive.HiveStorageFormat.getHiveStorageFormat;
import static com.facebook.presto.hive.HiveType.getPrimitiveType;
Expand Down Expand Up @@ -143,6 +142,7 @@ public class HiveSplitManager
private final CounterStat highMemorySplitSourceCounter;
private final CacheQuotaRequirementProvider cacheQuotaRequirementProvider;
private final HiveEncryptionInformationProvider encryptionInformationProvider;
private final PartitionSkippabilityChecker partitionSkippabilityChecker;

@Inject
public HiveSplitManager(
Expand All @@ -154,7 +154,8 @@ public HiveSplitManager(
DirectoryLister directoryLister,
@ForHiveClient ExecutorService executorService,
CoercionPolicy coercionPolicy,
HiveEncryptionInformationProvider encryptionInformationProvider)
HiveEncryptionInformationProvider encryptionInformationProvider,
PartitionSkippabilityChecker partitionSkippabilityChecker)
{
this(
hiveTransactionManager,
Expand All @@ -171,7 +172,8 @@ public HiveSplitManager(
hiveClientConfig.getSplitLoaderConcurrency(),
hiveClientConfig.getRecursiveDirWalkerEnabled(),
cacheQuotaRequirementProvider,
encryptionInformationProvider);
encryptionInformationProvider,
partitionSkippabilityChecker);
}

public HiveSplitManager(
Expand All @@ -189,7 +191,8 @@ public HiveSplitManager(
int splitLoaderConcurrency,
boolean recursiveDfsWalkerEnabled,
CacheQuotaRequirementProvider cacheQuotaRequirementProvider,
HiveEncryptionInformationProvider encryptionInformationProvider)
HiveEncryptionInformationProvider encryptionInformationProvider,
PartitionSkippabilityChecker partitionSkippabilityChecker)
{
this.hiveTransactionManager = requireNonNull(hiveTransactionManager, "hiveTransactionManager is null");
this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
Expand All @@ -207,6 +210,7 @@ public HiveSplitManager(
this.recursiveDfsWalkerEnabled = recursiveDfsWalkerEnabled;
this.cacheQuotaRequirementProvider = requireNonNull(cacheQuotaRequirementProvider, "cacheQuotaRequirementProvider is null");
this.encryptionInformationProvider = requireNonNull(encryptionInformationProvider, "encryptionInformationProvider is null");
this.partitionSkippabilityChecker = requireNonNull(partitionSkippabilityChecker, "partitionSkippabilityChecker is null");
}

@Override
Expand Down Expand Up @@ -465,7 +469,7 @@ private Iterable<List<HivePartitionMetadata>> computePartitionMetadata(Iterable<
// verify partition is not marked as non-readable
String reason = partition.getParameters().get(OBJECT_NOT_READABLE);
if (!isNullOrEmpty(reason)) {
if (!shouldIgnoreUnreadablePartition(session) || !partition.isEligibleToIgnore()) {
if (!partitionSkippabilityChecker.isPartitionSkippable(partition, session)) {
throw new HiveNotReadableException(tableName, Optional.of(partitionName), reason);
}
unreadablePartitionsSkipped++;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.spi.ConnectorSession;

public interface PartitionSkippabilityChecker
{
/**
* Check if the un-readable partition can be skipped and not create any splits for it
*/
boolean isPartitionSkippable(Partition partition, ConnectorSession session);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,8 @@ protected final void setup(String databaseName, HiveClientConfig hiveClientConfi
hiveClientConfig.getSplitLoaderConcurrency(),
false,
new ConfigBasedCacheQuotaRequirementProvider(cacheConfig),
encryptionInformationProvider);
encryptionInformationProvider,
new HivePartitionSkippabilityChecker());
pageSinkProvider = new HivePageSinkProvider(
getDefaultHiveFileWriterFactories(hiveClientConfig, metastoreClientConfig),
hdfsEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ protected void setup(String host, int port, String databaseName, BiFunction<Hive
config.getSplitLoaderConcurrency(),
config.getRecursiveDirWalkerEnabled(),
new ConfigBasedCacheQuotaRequirementProvider(cacheConfig),
new HiveEncryptionInformationProvider(ImmutableSet.of()));
new HiveEncryptionInformationProvider(ImmutableSet.of()),
new HivePartitionSkippabilityChecker());
pageSinkProvider = new HivePageSinkProvider(
getDefaultHiveFileWriterFactories(config, metastoreClientConfig),
hdfsEnvironment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ private void assertRedundantColumnDomains(Range predicateRange, PartitionStatist
hiveClientConfig.getSplitLoaderConcurrency(),
false,
new ConfigBasedCacheQuotaRequirementProvider(new CacheConfig()),
new HiveEncryptionInformationProvider(ImmutableList.of()));
new HiveEncryptionInformationProvider(ImmutableList.of()),
new HivePartitionSkippabilityChecker());

HiveColumnHandle partitionColumn = new HiveColumnHandle(
"ds",
Expand Down

0 comments on commit 566ec0a

Please sign in to comment.