diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index d84ad43d03a..483fca52cad 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -325,18 +325,24 @@ public interface KeyExtentCache { KeyExtent lookup(Text row); } + /** + * Function that will find a row in a file being bulk imported that is >= the row passed to the + * function. If there is no row then it should return null. + */ + public interface NextRowFunction { + Text apply(Text row) throws IOException; + } + public static List findOverlappingTablets(Function rowToExtentResolver, - FileSKVIterator reader) throws IOException { + NextRowFunction nextRowFunction) throws IOException { List result = new ArrayList<>(); - Collection columnFamilies = Collections.emptyList(); Text row = new Text(); while (true) { - reader.seek(new Range(row, null), columnFamilies, false); - if (!reader.hasTop()) { + row = nextRowFunction.apply(row); + if (row == null) { break; } - row = reader.getTopKey().getRow(); KeyExtent extent = rowToExtentResolver.apply(row); result.add(extent); row = extent.endRow(); @@ -357,13 +363,23 @@ private static Text nextRow(Text row) { } public static List findOverlappingTablets(ClientContext context, - Function rowToExtentResolver, Path file, FileSystem fs, - Cache fileLenCache, CryptoService cs) throws IOException { + KeyExtentCache keyExtentCache, Path file, FileSystem fs, Cache fileLenCache, + CryptoService cs) throws IOException { try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() .forFile(file.toString(), fs, fs.getConf(), cs) .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache) .seekToBeginning().build()) { - return findOverlappingTablets(rowToExtentResolver, reader); + + Collection columnFamilies = Collections.emptyList(); + NextRowFunction nextRowFunction = row -> { + reader.seek(new Range(row, null), columnFamilies, false); + if (!reader.hasTop()) { + return null; + } + return reader.getTopKey().getRow(); + }; + + return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction); } } @@ -558,7 +574,7 @@ public SortedMap computeFileToTabletMappings(FileSystem fs try { long t1 = System.currentTimeMillis(); List extents = - findOverlappingTablets(context, extentCache::lookup, filePath, fs, fileLensCache, cs); + findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs); // make sure file isn't going to too many tablets checkTabletCount(maxTablets, extents.size(), filePath.toString()); Map estSizes = estimateSizes(context.getConfiguration(), filePath, diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index 8055be4519f..95858c88f68 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -27,20 +27,15 @@ import java.util.Base64; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.SortedSet; import java.util.function.Function; import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; +import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.clientImpl.bulk.BulkImport; -import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.file.FileOperations; -import org.apache.accumulo.core.file.FileSKVIterator; -import org.apache.accumulo.core.spi.crypto.CryptoService; -import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -332,25 +327,35 @@ static SplitResolver from(SortedSet splits) { } } + public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOException { + return compute(file, Map.of(), splitResolver); + } + + // TODO test w/ empty file // TODO javadoc - public static LoadPlan compute(URI file, SplitResolver tabletResolver) throws IOException { - // TODO if the files needed a crypto service how could it be instantiated? Was trying to make - // this method independent of an ClientContext or ServerContext object. - CryptoService cs = NoCryptoServiceFactory.NONE; - Configuration conf = new Configuration(); - Path path = new Path(file); - FileSystem fs = path.getFileSystem(conf); - - try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.toString(), fs, fs.getConf(), cs) - .withTableConfiguration(DefaultConfiguration.getInstance()).seekToBeginning().build()) { + public static LoadPlan compute(URI file, Map properties, + SplitResolver splitResolver) throws IOException { + try (var scanner = RFile.newScanner().from(file.toString()).withoutSystemIterators() + .withTableProperties(properties).build()) { + BulkImport.NextRowFunction nextRowFunction = row -> { + scanner.setRange(new Range(row, null)); + var iter = scanner.iterator(); + if (iter.hasNext()) { + return iter.next().getKey().getRow(); + } else { + return null; + } + }; Function rowToExtentResolver = row -> { - var tabletRange = tabletResolver.apply(row); + var tabletRange = splitResolver.apply(row); return new KeyExtent(FAKE_ID, tabletRange.endRow, tabletRange.prevRow); }; - List overlapping = BulkImport.findOverlappingTablets(rowToExtentResolver, reader); + List overlapping = + BulkImport.findOverlappingTablets(rowToExtentResolver, nextRowFunction); + + Path path = new Path(file); var builder = builder(); for (var extent : overlapping) {