Commit d24220a
[HUDI-7111] Fix performance regression of tag when written into simple bucket index table (apache#10130)

beyond1920 authored Nov 21, 2023
1 parent 84990ae commit d24220a
Showing 4 changed files with 50 additions and 103 deletions.
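In short: the old HoodieBucketIndex.tagLocation (removed from the abstract base class below) first ran records.map(HoodieRecord::getPartitionPath).distinct().collectAsList(), an extra pass over all incoming records just to enumerate their partitions, and then eagerly built a location mapper for every one of those partitions on the driver. The fix pushes tagLocation down into the concrete classes: the consistent-hashing index keeps the eager mapper, while HoodieSimpleBucketIndex now loads each partition's bucket-to-file mapping lazily, on first use inside mapPartitions (the deleted file is presumably the now-unused BucketIndexLocationMapper interface, which nothing implements after this change). A minimal, self-contained sketch of that lazy pattern; all names here (Record, Location, loadPartitionMapping) are hypothetical stand-ins, not Hudi API:

  // LazyTagSketch.java: hypothetical stand-in types, not Hudi's API.
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  public class LazyTagSketch {

    record Record(String partitionPath, String key) {}

    record Location(String fileId) {}

    // Stand-in for loadBucketIdToFileIdMappingForPartition: in Hudi this
    // lists a partition's file slices, which is the expensive step.
    static Map<Integer, Location> loadPartitionMapping(String partition) {
      System.out.println("listing files for partition " + partition);
      Map<Integer, Location> mapping = new HashMap<>();
      mapping.put(0, new Location(partition + "/bucket-0"));
      return mapping;
    }

    static int bucketId(String key, int numBuckets) {
      // Clear the sign bit so the modulo lands in [0, numBuckets)
      return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    public static void main(String[] args) {
      List<Record> records = List.of(
          new Record("2023/11/20", "key-a"),
          new Record("2023/11/20", "key-b"),
          new Record("2023/11/21", "key-c"));

      // The fixed approach: no upfront pass to collect distinct partitions;
      // each partition's mapping is loaded at most once, on first access.
      Map<String, Map<Integer, Location>> cache = new HashMap<>();
      for (Record r : records) {
        Map<Integer, Location> mapping =
            cache.computeIfAbsent(r.partitionPath(), LazyTagSketch::loadPartitionMapping);
        Location loc = mapping.get(bucketId(r.key(), 1));
        System.out.println(r.key() + " -> " + (loc == null ? "tag as new record" : loc.fileId()));
      }
    }
  }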

This file was deleted.

@@ -19,13 +19,9 @@
package org.apache.hudi.index.bucket;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
@@ -37,8 +33,6 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;

/**
 * Hash indexing mechanism.
 */
@@ -65,30 +59,6 @@ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatu
    return writeStatuses;
  }

  @Override
  public <R> HoodieData<HoodieRecord<R>> tagLocation(
      HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
      HoodieTable hoodieTable)
      throws HoodieIndexException {
    // Get bucket location mapper for the given partitions
    List<String> partitions = records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
    LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions);
    BucketIndexLocationMapper mapper = getLocationMapper(hoodieTable, partitions);

    return records.mapPartitions(iterator ->
        new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
          @Override
          protected HoodieRecord<R> computeNext() {
            // TODO maybe batch the operation to improve performance
            HoodieRecord record = inputItr.next();
            Option<HoodieRecordLocation> loc = mapper.getRecordLocation(record.getKey());
            return tagAsNewRecordIfNeeded(record, loc);
          }
        },
        false
    );
  }

  @Override
  public boolean requiresTagging(WriteOperationType operationType) {
    switch (operationType) {
@@ -127,9 +97,4 @@ public boolean isImplicitWithStorage() {
  public int getNumBuckets() {
    return numBuckets;
  }

  /**
   * Get a location mapper for the given table & partitionPath
   */
  protected abstract BucketIndexLocationMapper getLocationMapper(HoodieTable table, List<String> partitionPath);
}
@@ -19,12 +19,14 @@
package org.apache.hudi.index.bucket;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -35,10 +37,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;

/**
 * Consistent hashing bucket index implementation, with auto-adjust bucket number.
 * NOTE: bucket resizing is triggered by clustering.
@@ -71,11 +76,28 @@ public boolean rollbackCommit(String instantTime) {
  }

  @Override
  protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List<String> partitionPath) {
    return new ConsistentBucketIndexLocationMapper(table, partitionPath);
  public <R> HoodieData<HoodieRecord<R>> tagLocation(
      HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
      HoodieTable hoodieTable)
      throws HoodieIndexException {
    // Get bucket location mapper for the given partitions
    List<String> partitions = records.map(HoodieRecord::getPartitionPath).distinct().collectAsList();
    LOG.info("Get BucketIndexLocationMapper for partitions: " + partitions);
    ConsistentBucketIndexLocationMapper mapper = new ConsistentBucketIndexLocationMapper(hoodieTable, partitions);

    return records.mapPartitions(iterator ->
        new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
          @Override
          protected HoodieRecord<R> computeNext() {
            // TODO maybe batch the operation to improve performance
            HoodieRecord record = inputItr.next();
            Option<HoodieRecordLocation> loc = mapper.getRecordLocation(record.getKey());
            return tagAsNewRecordIfNeeded(record, loc);
          }
        }, false);
  }

  public class ConsistentBucketIndexLocationMapper implements BucketIndexLocationMapper {
  public class ConsistentBucketIndexLocationMapper implements Serializable {

    /**
     * Mapping from partitionPath -> bucket identifier
@@ -90,7 +112,6 @@ public ConsistentBucketIndexLocationMapper(HoodieTable table, List<String> parti
      }));
    }

    @Override
    public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
      String partitionPath = key.getPartitionPath();
      ConsistentHashingNode node = partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
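Note that the consistent-hashing index above keeps the eager, driver-side mapper: each partition's ring of ConsistentHashingNodes must be loaded from the hashing metadata before any record can be placed, so enumerating the partitions up front is unavoidable there. For background, a generic consistent-hashing ring lookup of the kind getBucket(key, indexKeyFields) performs; hypothetical names and hash function, not Hudi's implementation:

  import java.util.SortedMap;
  import java.util.TreeMap;

  // Generic consistent-hashing ring: nodes sit at hash positions; a key is
  // assigned to the first node clockwise from its own hash, wrapping around.
  public class HashRingSketch {

    private final TreeMap<Integer, String> ring = new TreeMap<>();

    void addNode(String nodeId) {
      ring.put(hash(nodeId), nodeId);
    }

    String getNode(String key) {
      // tailMap: nodes at or after the key's position; wrap to the first node
      SortedMap<Integer, String> tail = ring.tailMap(hash(key));
      return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }

    private static int hash(String s) {
      return s.hashCode() & Integer.MAX_VALUE;
    }
  }

The appeal of this layout for auto-resizing: splitting or adding a node only remaps the keys that fall in its arc of the ring, which fits the NOTE above that bucket resizing is deferred to clustering.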
@@ -18,29 +18,29 @@

package org.apache.hudi.index.bucket;

import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;

/**
 * Simple bucket index implementation, with fixed bucket number.
 */
public class HoodieSimpleBucketIndex extends HoodieBucketIndex {

  private static final Logger LOG = LoggerFactory.getLogger(HoodieSimpleBucketIndex.class);

  public HoodieSimpleBucketIndex(HoodieWriteConfig config) {
    super(config);
  }
@@ -79,27 +79,23 @@ public boolean canIndexLogFiles() {
  }

  @Override
  protected BucketIndexLocationMapper getLocationMapper(HoodieTable table, List<String> partitionPath) {
    return new SimpleBucketIndexLocationMapper(table, partitionPath);
  }

  public class SimpleBucketIndexLocationMapper implements BucketIndexLocationMapper {

    /**
     * Mapping from partitionPath -> bucketId -> fileInfo
     */
    private final Map<String, Map<Integer, HoodieRecordLocation>> partitionPathFileIDList;

    public SimpleBucketIndexLocationMapper(HoodieTable table, List<String> partitions) {
      partitionPathFileIDList = partitions.stream()
          .collect(Collectors.toMap(p -> p, p -> loadBucketIdToFileIdMappingForPartition(table, p)));
    }

    @Override
    public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
      int bucketId = getBucketID(key);
      Map<Integer, HoodieRecordLocation> bucketIdToFileIdMapping = partitionPathFileIDList.get(key.getPartitionPath());
      return Option.ofNullable(bucketIdToFileIdMapping.getOrDefault(bucketId, null));
    }
  public <R> HoodieData<HoodieRecord<R>> tagLocation(
      HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
      HoodieTable hoodieTable)
      throws HoodieIndexException {
    // Per-task cache: partitionPath -> (bucketId -> location), filled lazily below
    Map<String, Map<Integer, HoodieRecordLocation>> partitionPathFileIDList = new HashMap<>();
    return records.mapPartitions(iterator -> new LazyIterableIterator<HoodieRecord<R>, HoodieRecord<R>>(iterator) {
      @Override
      protected HoodieRecord<R> computeNext() {
        HoodieRecord record = inputItr.next();
        int bucketId = getBucketID(record.getKey());
        String partitionPath = record.getPartitionPath();
        // Load the bucket-to-file mapping for this partition on first use only
        if (!partitionPathFileIDList.containsKey(partitionPath)) {
          partitionPathFileIDList.put(partitionPath, loadBucketIdToFileIdMappingForPartition(hoodieTable, partitionPath));
        }
        HoodieRecordLocation loc = partitionPathFileIDList.get(partitionPath).getOrDefault(bucketId, null);
        return tagAsNewRecordIfNeeded(record, Option.ofNullable(loc));
      }
    }, false);
  }
}
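The lazy scheme above works for the simple bucket index because the bucket id is a pure function of the record key and the fixed bucket count: no shared state is needed to compute it, and only the bucket-to-file mapping of a partition requires a file listing, which the per-mapPartitions HashMap performs at most once per partition within a task. A hypothetical illustration of the fixed key-to-bucket function, not Hudi's actual BucketIdentifier logic:

  // Hypothetical fixed key-to-bucket mapping, in the spirit of getBucketID
  // above; Hudi's real logic lives in BucketIdentifier.
  public class SimpleBucketSketch {

    static int getBucketId(String recordKey, int numBuckets) {
      // Clear the sign bit so the result is always in [0, numBuckets)
      return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    public static void main(String[] args) {
      // The same key always lands in the same bucket, so tagging only needs
      // the current bucket -> file mapping of the key's partition.
      System.out.println(getBucketId("uuid-0001", 8));
      System.out.println(getBucketId("uuid-0001", 8)); // identical output
    }
  }

The trade-off versus the removed SimpleBucketIndexLocationMapper: the mapping cache is now per task rather than built once on the driver, so a partition touched by several tasks may be listed more than once, in exchange for eliminating the driver-side distinct().collectAsList() pass over all incoming records.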
