Skip to content

Commit

Permalink
use Rfile api to read
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Sep 18, 2024
1 parent c8c5f21 commit 9ef0bcf
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,18 +325,24 @@ public interface KeyExtentCache {
KeyExtent lookup(Text row);
}

/**
* Function that will find a row in a file being bulk imported that is >= the row passed to the
* function. If there is no row then it should return null.
*/
public interface NextRowFunction {
Text apply(Text row) throws IOException;
}

public static List<KeyExtent> findOverlappingTablets(Function<Text,KeyExtent> rowToExtentResolver,
FileSKVIterator reader) throws IOException {
NextRowFunction nextRowFunction) throws IOException {

List<KeyExtent> result = new ArrayList<>();
Collection<ByteSequence> columnFamilies = Collections.emptyList();
Text row = new Text();
while (true) {
reader.seek(new Range(row, null), columnFamilies, false);
if (!reader.hasTop()) {
row = nextRowFunction.apply(row);
if (row == null) {
break;
}
row = reader.getTopKey().getRow();
KeyExtent extent = rowToExtentResolver.apply(row);
result.add(extent);
row = extent.endRow();
Expand All @@ -357,13 +363,23 @@ private static Text nextRow(Text row) {
}

public static List<KeyExtent> findOverlappingTablets(ClientContext context,
Function<Text,KeyExtent> rowToExtentResolver, Path file, FileSystem fs,
Cache<String,Long> fileLenCache, CryptoService cs) throws IOException {
KeyExtentCache keyExtentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache,
CryptoService cs) throws IOException {
try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, fs.getConf(), cs)
.withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache)
.seekToBeginning().build()) {
return findOverlappingTablets(rowToExtentResolver, reader);

Collection<ByteSequence> columnFamilies = Collections.emptyList();
NextRowFunction nextRowFunction = row -> {
reader.seek(new Range(row, null), columnFamilies, false);
if (!reader.hasTop()) {
return null;
}
return reader.getTopKey().getRow();
};

return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction);
}
}

Expand Down Expand Up @@ -558,7 +574,7 @@ public SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs
try {
long t1 = System.currentTimeMillis();
List<KeyExtent> extents =
findOverlappingTablets(context, extentCache::lookup, filePath, fs, fileLensCache, cs);
findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs);
// make sure file isn't going to too many tablets
checkTabletCount(maxTablets, extents.size(), filePath.toString());
Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(), filePath,
Expand Down
45 changes: 25 additions & 20 deletions core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,15 @@
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.clientImpl.bulk.BulkImport;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

Expand Down Expand Up @@ -332,25 +327,35 @@ static SplitResolver from(SortedSet<Text> splits) {
}
}

public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOException {
return compute(file, Map.of(), splitResolver);
}

// TODO test w/ empty file
// TODO javadoc
public static LoadPlan compute(URI file, SplitResolver tabletResolver) throws IOException {
// TODO if the files needed a crypto service how could it be instantiated? Was trying to make
// this method independent of an ClientContext or ServerContext object.
CryptoService cs = NoCryptoServiceFactory.NONE;
Configuration conf = new Configuration();
Path path = new Path(file);
FileSystem fs = path.getFileSystem(conf);

try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, fs.getConf(), cs)
.withTableConfiguration(DefaultConfiguration.getInstance()).seekToBeginning().build()) {
public static LoadPlan compute(URI file, Map<String,String> properties,
SplitResolver splitResolver) throws IOException {
try (var scanner = RFile.newScanner().from(file.toString()).withoutSystemIterators()
.withTableProperties(properties).build()) {
BulkImport.NextRowFunction nextRowFunction = row -> {
scanner.setRange(new Range(row, null));
var iter = scanner.iterator();
if (iter.hasNext()) {
return iter.next().getKey().getRow();
} else {
return null;
}
};

Function<Text,KeyExtent> rowToExtentResolver = row -> {
var tabletRange = tabletResolver.apply(row);
var tabletRange = splitResolver.apply(row);
return new KeyExtent(FAKE_ID, tabletRange.endRow, tabletRange.prevRow);
};

List<KeyExtent> overlapping = BulkImport.findOverlappingTablets(rowToExtentResolver, reader);
List<KeyExtent> overlapping =
BulkImport.findOverlappingTablets(rowToExtentResolver, nextRowFunction);

Path path = new Path(file);

var builder = builder();
for (var extent : overlapping) {
Expand Down

0 comments on commit 9ef0bcf

Please sign in to comment.