From d24220a4804ee6e04346a03a4ddbf2d2711ae301 Mon Sep 17 00:00:00 2001 From: Jing Zhang Date: Tue, 21 Nov 2023 09:56:07 +0800 Subject: [PATCH] [HUDI-7111] Fix performance regression of tag when written into simple bucket index table (#10130) --- .../bucket/BucketIndexLocationMapper.java | 35 ------------ .../hudi/index/bucket/HoodieBucketIndex.java | 35 ------------ .../bucket/HoodieConsistentBucketIndex.java | 29 ++++++++-- .../index/bucket/HoodieSimpleBucketIndex.java | 54 +++++++++---------- 4 files changed, 50 insertions(+), 103 deletions(-) delete mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java deleted file mode 100644 index 1ce68ef97bf2..000000000000 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIndexLocationMapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi.index.bucket; - -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecordLocation; -import org.apache.hudi.common.util.Option; - -import java.io.Serializable; - -public interface BucketIndexLocationMapper extends Serializable { - - /** - * Get record location given hoodie key - */ - Option getRecordLocation(HoodieKey key); - -} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java index a41aa82a3e8c..3ca75d3e2649 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieBucketIndex.java @@ -19,13 +19,9 @@ package org.apache.hudi.index.bucket; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.utils.LazyIterableIterator; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndex; @@ -37,8 +33,6 @@ import java.util.Arrays; import java.util.List; -import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded; - /** * Hash indexing mechanism. */ @@ -65,30 +59,6 @@ public HoodieData updateLocation(HoodieData writeStatu return writeStatuses; } - @Override - public HoodieData> tagLocation( - HoodieData> records, HoodieEngineContext context, - HoodieTable hoodieTable) - throws HoodieIndexException { - // Get bucket location mapper for the given partitions - List partitions = records.map(HoodieRecord::getPartitionPath).distinct().collectAsList(); - LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions); - BucketIndexLocationMapper mapper = getLocationMapper(hoodieTable, partitions); - - return records.mapPartitions(iterator -> - new LazyIterableIterator, HoodieRecord>(iterator) { - @Override - protected HoodieRecord computeNext() { - // TODO maybe batch the operation to improve performance - HoodieRecord record = inputItr.next(); - Option loc = mapper.getRecordLocation(record.getKey()); - return tagAsNewRecordIfNeeded(record, loc); - } - }, - false - ); - } - @Override public boolean requiresTagging(WriteOperationType operationType) { switch (operationType) { @@ -127,9 +97,4 @@ public boolean isImplicitWithStorage() { public int getNumBuckets() { return numBuckets; } - - /** - * Get a location mapper for the given table & partitionPath - */ - protected abstract BucketIndexLocationMapper getLocationMapper(HoodieTable table, List partitionPath); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java index 156d14b7cf5c..125bc970d65f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java @@ -19,12 +19,14 @@ package org.apache.hudi.index.bucket; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.utils.LazyIterableIterator; import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.ConsistentHashingNode; import org.apache.hudi.common.model.HoodieConsistentHashingMetadata; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -35,10 +37,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded; + /** * Consistent hashing bucket index implementation, with auto-adjust bucket number. * NOTE: bucket resizing is triggered by clustering. @@ -71,11 +76,28 @@ public boolean rollbackCommit(String instantTime) { } @Override - protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List partitionPath) { - return new ConsistentBucketIndexLocationMapper(table, partitionPath); + public HoodieData> tagLocation( + HoodieData> records, HoodieEngineContext context, + HoodieTable hoodieTable) + throws HoodieIndexException { + // Get bucket location mapper for the given partitions + List partitions = records.map(HoodieRecord::getPartitionPath).distinct().collectAsList(); + LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions); + ConsistentBucketIndexLocationMapper mapper = new ConsistentBucketIndexLocationMapper(hoodieTable, partitions); + + return records.mapPartitions(iterator -> + new LazyIterableIterator, HoodieRecord>(iterator) { + @Override + protected HoodieRecord computeNext() { + // TODO maybe batch the operation to improve performance + HoodieRecord record = inputItr.next(); + Option loc = mapper.getRecordLocation(record.getKey()); + return tagAsNewRecordIfNeeded(record, loc); + } + }, false); } - public class ConsistentBucketIndexLocationMapper implements BucketIndexLocationMapper { + public class ConsistentBucketIndexLocationMapper implements Serializable { /** * Mapping from partitionPath -> bucket identifier @@ -90,7 +112,6 @@ public ConsistentBucketIndexLocationMapper(HoodieTable table, List parti })); } - @Override public Option getRecordLocation(HoodieKey key) { String partitionPath = key.getPartitionPath(); ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java index fa2289ed87e7..a38fa489a2a4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieSimpleBucketIndex.java @@ -18,29 +18,29 @@ package org.apache.hudi.index.bucket; +import org.apache.hudi.client.utils.LazyIterableIterator; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.index.HoodieIndexUtils; import org.apache.hudi.table.HoodieTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; + +import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded; /** * Simple bucket index implementation, with fixed bucket number. */ public class HoodieSimpleBucketIndex extends HoodieBucketIndex { - private static final Logger LOG = LoggerFactory.getLogger(HoodieSimpleBucketIndex.class); - public HoodieSimpleBucketIndex(HoodieWriteConfig config) { super(config); } @@ -79,27 +79,23 @@ public boolean canIndexLogFiles() { } @Override - protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List partitionPath) { - return new SimpleBucketIndexLocationMapper(table, partitionPath); - } - - public class SimpleBucketIndexLocationMapper implements BucketIndexLocationMapper { - - /** - * Mapping from partitionPath -> bucketId -> fileInfo - */ - private final Map> partitionPathFileIDList; - - public SimpleBucketIndexLocationMapper(HoodieTable table, List partitions) { - partitionPathFileIDList = partitions.stream() - .collect(Collectors.toMap(p -> p, p -> loadBucketIdToFileIdMappingForPartition(table, p))); - } - - @Override - public Option getRecordLocation(HoodieKey key) { - int bucketId = getBucketID(key); - Map bucketIdToFileIdMapping = partitionPathFileIDList.get(key.getPartitionPath()); - return Option.ofNullable(bucketIdToFileIdMapping.getOrDefault(bucketId, null)); - } + public HoodieData> tagLocation( + HoodieData> records, HoodieEngineContext context, + HoodieTable hoodieTable) + throws HoodieIndexException { + Map> partitionPathFileIDList = new HashMap<>(); + return records.mapPartitions(iterator -> new LazyIterableIterator, HoodieRecord>(iterator) { + @Override + protected HoodieRecord computeNext() { + HoodieRecord record = inputItr.next(); + int bucketId = getBucketID(record.getKey()); + String partitionPath = record.getPartitionPath(); + if (!partitionPathFileIDList.containsKey(partitionPath)) { + partitionPathFileIDList.put(partitionPath, loadBucketIdToFileIdMappingForPartition(hoodieTable, partitionPath)); + } + HoodieRecordLocation loc = partitionPathFileIDList.get(partitionPath).getOrDefault(bucketId, null); + return tagAsNewRecordIfNeeded(record, Option.ofNullable(loc)); + } + }, false); } }