prototype of bulk import v2 distributed file examination #4898

Draft
wants to merge 21 commits into base: 2.1
Changes from 3 commits
core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -48,6 +48,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.accumulo.core.Constants;
@@ -324,7 +325,7 @@ public interface KeyExtentCache {
KeyExtent lookup(Text row);
}

-public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache,
+public static List<KeyExtent> findOverlappingTablets(Function<Text,KeyExtent> rowToExtentResolver,
FileSKVIterator reader) throws IOException {

List<KeyExtent> result = new ArrayList<>();
@@ -336,7 +337,7 @@ public static List<KeyExtent> findOverlappingTablets(KeyExtentCache extentCache,
break;
}
row = reader.getTopKey().getRow();
-KeyExtent extent = extentCache.lookup(row);
+KeyExtent extent = rowToExtentResolver.apply(row);
result.add(extent);
row = extent.endRow();
if (row != null) {
@@ -356,13 +357,13 @@ private static Text nextRow(Text row) {
}

public static List<KeyExtent> findOverlappingTablets(ClientContext context,
-KeyExtentCache extentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache,
-CryptoService cs) throws IOException {
+Function<Text,KeyExtent> rowToExtentResolver, Path file, FileSystem fs,
+Cache<String,Long> fileLenCache, CryptoService cs) throws IOException {
try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, fs.getConf(), cs)
.withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache)
.seekToBeginning().build()) {
-return findOverlappingTablets(extentCache, reader);
+return findOverlappingTablets(rowToExtentResolver, reader);
}
}

@@ -557,7 +558,7 @@ public SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs
try {
long t1 = System.currentTimeMillis();
List<KeyExtent> extents =
-findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs);
+findOverlappingTablets(context, extentCache::lookup, filePath, fs, fileLensCache, cs);
// make sure file isn't going to too many tablets
checkTabletCount(maxTablets, extents.size(), filePath.toString());
Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(), filePath,
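The change above swaps the metadata-backed KeyExtentCache parameter for a plain Function<Text,KeyExtent>, so any row-to-extent mapping can drive the overlap scan. A minimal sketch of both call styles, assuming reader, extentCache, and splits are in scope; lookupExtentFromSplits is a hypothetical helper, not part of the PR:

   // existing metadata-backed callers adapt with a method reference
   List<KeyExtent> fromCache = BulkImport.findOverlappingTablets(extentCache::lookup, reader);

   // cache-free callers can hand in any mapping, e.g. one derived from known split points
   Function<Text,KeyExtent> resolver = row -> lookupExtentFromSplits(splits, row); // hypothetical
   List<KeyExtent> fromSplits = BulkImport.findOverlappingTablets(resolver, reader);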
102 changes: 102 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java
@@ -20,17 +20,35 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.SortedSet;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions;
import org.apache.accumulo.core.clientImpl.bulk.BulkImport;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.UnsignedBytes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

@@ -228,4 +246,88 @@ public LoadPlan build() {
}
};
}

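// placeholder id for extents built by compute(); only the row boundaries are used when building the plan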
private static final TableId FAKE_ID = TableId.of("999");

private static class JsonDestination {
String fileName;
String startRow;
String endRow;
RangeType rangeType;

JsonDestination() {}

JsonDestination(Destination destination) {
fileName = destination.getFileName();
startRow = destination.getStartRow() == null ? null
: Base64.getUrlEncoder().encodeToString(destination.getStartRow());
endRow = destination.getEndRow() == null ? null
: Base64.getUrlEncoder().encodeToString(destination.getEndRow());
rangeType = destination.getRangeType();
}

Destination toDestination() {
return new Destination(fileName, rangeType,
startRow == null ? null : Base64.getUrlDecoder().decode(startRow),
endRow == null ? null : Base64.getUrlDecoder().decode(endRow));
}
}

private static final class JsonAll {
List<JsonDestination> destinations;

JsonAll() {}

JsonAll(List<Destination> destinations) {
this.destinations =
destinations.stream().map(JsonDestination::new).collect(Collectors.toList());
}

}

private static final Gson gson = new GsonBuilder().disableJdkUnsafe().create();

// TODO javadoc
public String toJson() {
return gson.toJson(new JsonAll(destinations));
}

// TODO javadoc
public static LoadPlan fromJson(String json) {
var dests = gson.fromJson(json, JsonAll.class).destinations.stream()
.map(JsonDestination::toDestination).collect(Collectors.toUnmodifiableList());
return new LoadPlan(dests);
}
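
// Usage sketch (illustrative, not part of the PR): round-tripping a plan through the new pair
//   LoadPlan plan = LoadPlan.builder()
//       .loadFileTo("f1.rf", RangeType.TABLE, new Text("0333"), new Text("0666")).build();
//   String json = plan.toJson();  // rows travel Base64-url encoded, so arbitrary row bytes
//                                 // survive JSON, e.g.
//                                 // {"destinations":[{"fileName":"f1.rf","startRow":"MDMzMw==",...}]}
//   LoadPlan copy = LoadPlan.fromJson(json);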

// TODO javadoc
public static LoadPlan compute(URI file, SortedSet<Text> splits) throws IOException {

// TODO if the files needed a crypto service how could it be instantiated? Was trying to make

Contributor Author: Not sure of the best way forward here. As a design goal, I was attempting to make this compute method independent of something like an Accumulo client and a client context; however, I ran into a problem with the crypto service.

Member: You can look at how the rfile PrintInfo command does this. It calls:

       CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE,
           siteConfig.getAllCryptoProperties());

As a server-side utility, it could assume the ability to read the accumulo properties file on the server side, like that utility does. However, as a purely client-side API, you may need to pass in the CryptoService directly, or pass in other options, so it can set up the right config (crypto, compression, etc.) to be able to read the files.

Contributor Author: Made an update in 9ef0bcf to pass in a map of props, which is passed to the RFile API, which internally calls CryptoFactoryLoader.getServiceForClient using that map of properties.

Member: That could work, since it's a static entry point to building a load plan. If it were dangling off AccumuloClient, users might expect client properties to be passed. But for the static entry point, I think it's reasonable to require them to be provided explicitly.

I like how you were able to use the existing RFile.newScanner() code.

// this method independent of a ClientContext or ServerContext object.
CryptoService cs = NoCryptoServiceFactory.NONE;
Configuration conf = new Configuration();
Path path = new Path(file);
FileSystem fs = path.getFileSystem(conf);

try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, fs.getConf(), cs)
.withTableConfiguration(DefaultConfiguration.getInstance()).seekToBeginning().build()) {

Function<Text,KeyExtent> rowToExtentResolver = row -> {
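// greatest split strictly below the row becomes the fake tablet's prevEndRow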
var headSet = splits.headSet(row);
Text prevRow = headSet.isEmpty() ? null : headSet.last();
var tailSet = splits.tailSet(row);
Text endRow = tailSet.isEmpty() ? null : tailSet.first();
return new KeyExtent(FAKE_ID, endRow, prevRow);
};

List<KeyExtent> overlapping = BulkImport.findOverlappingTablets(rowToExtentResolver, reader);

var builder = builder();
for (var extent : overlapping) {
builder.loadFileTo(path.getName(), RangeType.TABLE, extent.prevEndRow(), extent.endRow());
}
return builder.build();
}
}
}
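To make the resolver's behavior concrete, here is a self-contained sketch, not part of the PR, that replays the headSet/tailSet logic from compute() against a small split set and prints the fake extent (prevEndRow, endRow] each row lands in:

   import java.util.TreeSet;

   import org.apache.hadoop.io.Text;

   public class ResolverDemo {
     public static void main(String[] args) {
       TreeSet<Text> splits = new TreeSet<>();
       splits.add(new Text("0333"));
       splits.add(new Text("0666"));

       for (String r : new String[] {"0100", "0400", "0700"}) {
         Text row = new Text(r);
         var head = splits.headSet(row); // splits strictly before the row
         var tail = splits.tailSet(row); // splits at or after the row
         Text prevEndRow = head.isEmpty() ? null : head.last();
         Text endRow = tail.isEmpty() ? null : tail.first();
         System.out.println(r + " -> (" + prevEndRow + ", " + endRow + "]");
       }
     }
   }

This prints 0100 -> (null, 0333], 0400 -> (0333, 0666], and 0700 -> (0666, null], matching how LoadPlan.compute assigns a file's rows to tablets defined purely by the split points.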
test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -88,6 +88,7 @@
import org.apache.accumulo.server.constraints.MetadataConstraints;
import org.apache.accumulo.server.constraints.SystemEnvironment;
import org.apache.accumulo.test.util.Wait;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -476,6 +477,62 @@ public void testBadLoadPlans() throws Exception {
}
}

@Test
public void testComputeLoadPlan() throws Exception {

try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
addSplits(c, tableName, "0333 0666 0999 1333 1666");

String dir = getDir("/testBulkFile-");

Map<String,Set<String>> hashes = new HashMap<>();
String h1 = writeData(dir + "/f1.", aconf, 0, 333);
hashes.put("0333", new HashSet<>(List.of(h1)));
String h2 = writeData(dir + "/f2.", aconf, 0, 666);
hashes.get("0333").add(h2);
hashes.put("0666", new HashSet<>(List.of(h2)));
String h3 = writeData(dir + "/f3.", aconf, 334, 700);
hashes.get("0666").add(h3);
hashes.put("0999", new HashSet<>(List.of(h3)));
hashes.put("1333", Set.of());
hashes.put("1666", Set.of());
hashes.put("null", Set.of());

SortedSet<Text> splits = new TreeSet<>(c.tableOperations().listSplits(tableName));

for (String filename : List.of("f1.rf", "f2.rf", "f3.rf")) {
// The body of this loop simulates what each reducer would do
Path path = new Path(dir + "/" + filename);

// compute the load plan for the rfile
String lpJson = LoadPlan.compute(path.toUri(), splits).toJson();

// save the load plan to a file
Path lpPath = new Path(path.getParent(), path.getName().replace(".rf", ".lp"));
try (var output = getCluster().getFileSystem().create(lpPath, false)) {
IOUtils.write(lpJson, output, UTF_8);
}
}

// This simulates the code that would run after the map reduce job and bulk import the files
var builder = LoadPlan.builder();
for (var status : getCluster().getFileSystem().listStatus(new Path(dir),
p -> p.getName().endsWith(".lp"))) {
try (var input = getCluster().getFileSystem().open(status.getPath())) {
String lpJson = IOUtils.toString(input, UTF_8);
builder.addPlan(LoadPlan.fromJson(lpJson));
}
}

LoadPlan lpAll = builder.build();

c.tableOperations().importDirectory(dir).to(tableName).plan(lpAll).load();

verifyData(c, tableName, 0, 700, false);
verifyMetadata(c, tableName, hashes);
}
}

@Test
public void testEmptyDir() throws Exception {
try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
@@ -621,7 +678,7 @@ private void verifyMetadata(AccumuloClient client, String tableName,

String endRow = tablet.getEndRow() == null ? "null" : tablet.getEndRow().toString();

-assertEquals(expectedHashes.get(endRow), fileHashes);
+assertEquals(expectedHashes.get(endRow), fileHashes, "endRow " + endRow);

endRowsSeen.add(endRow);
}