Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,39 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class LanceFragmentScanner implements AutoCloseable {
private static LoadingCache<LanceConfig, List<Fragment>> LOADING_CACHE =
private static LoadingCache<CacheKey, Map<Integer, Fragment>> LOADING_CACHE =
CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterAccess(1, TimeUnit.HOURS)
.build(
new CacheLoader<LanceConfig, List<Fragment>>() {
new CacheLoader<CacheKey, Map<Integer, Fragment>>() {
@Override
public List<Fragment> load(LanceConfig config) throws Exception {
public Map<Integer, Fragment> load(CacheKey key) throws Exception {
BufferAllocator allocator = LanceDatasetAdapter.allocator;
LanceConfig config = key.getConfig();
ReadOptions options = SparkOptions.genReadOptionFromConfig(config);
Dataset dataset = Dataset.open(allocator, config.getDatasetUri(), options);
return dataset.getFragments();
return dataset.getFragments().stream()
.collect(Collectors.toMap(Fragment::getId, f -> f));
}
});
private LanceScanner scanner;
private final LanceScanner scanner;

/**
 * Wraps an already-created scanner. The constructor is private; instances are obtained through
 * {@link #create(int, LanceInputPartition)}.
 *
 * @param scanner the scanner this instance holds
 */
private LanceFragmentScanner(LanceScanner scanner) {
this.scanner = scanner;
}

public static LanceFragmentScanner create(int fragmentId, LanceInputPartition inputPartition) {
LanceScanner scanner = null;
try {
LanceConfig config = inputPartition.getConfig();
List<Fragment> cachedFragments = LOADING_CACHE.get(config);
CacheKey key = new CacheKey(config, inputPartition.getScanId());
Map<Integer, Fragment> cachedFragments = LOADING_CACHE.get(key);
Fragment fragment = cachedFragments.get(fragmentId);
ScanOptions.Builder scanOptions = new ScanOptions.Builder();
scanOptions.columns(getColumnNames(inputPartition.getSchema()));
Expand All @@ -81,18 +85,10 @@ public static LanceFragmentScanner create(int fragmentId, LanceInputPartition in
if (inputPartition.getTopNSortOrders().isPresent()) {
scanOptions.setColumnOrderings(inputPartition.getTopNSortOrders().get());
}
scanner = fragment.newScan(scanOptions.build());
} catch (Throwable t) {
if (scanner != null) {
try {
scanner.close();
} catch (Throwable it) {
t.addSuppressed(it);
}
}
throw new RuntimeException(t);
return new LanceFragmentScanner(fragment.newScan(scanOptions.build()));
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
return new LanceFragmentScanner(scanner);
}

/** @return the arrow reader. The caller is responsible for closing the reader */
Expand Down Expand Up @@ -130,4 +126,30 @@ private static boolean getWithRowAddress(StructType schema) {
.map(StructField::name)
.anyMatch(name -> name.equals(LanceConstant.ROW_ADDRESS));
}

/**
 * Key of the fragment cache: a Lance config paired with the id of the scan that asked for it.
 * Two keys are equal only when they are of this exact class and both the config and the scan id
 * are equal.
 */
private static class CacheKey {
  private final LanceConfig config;
  private final String scanId;

  /**
   * @param config the config used to open the dataset for this key
   * @param scanId the id of the scan this key belongs to
   */
  CacheKey(LanceConfig config, String scanId) {
    this.config = config;
    this.scanId = scanId;
  }

  /** @return the config held by this key */
  public LanceConfig getConfig() {
    return this.config;
  }

  @Override
  public boolean equals(Object o) {
    // null never matches.
    if (o == null) {
      return false;
    }
    // Exact-class comparison, not instanceof.
    if (getClass() != o.getClass()) {
      return false;
    }
    CacheKey that = (CacheKey) o;
    if (!Objects.equals(this.config, that.config)) {
      return false;
    }
    return Objects.equals(this.scanId, that.scanId);
  }

  @Override
  public int hashCode() {
    // Same value as Objects.hash(config, scanId): seed 1, multiplier 31, fields in order.
    int hash = 1;
    hash = 31 * hash + Objects.hashCode(this.config);
    hash = 31 * hash + Objects.hashCode(this.scanId);
    return hash;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,39 +33,25 @@ public class LanceInputPartition implements InputPartition {
private final Optional<Integer> limit;
private final Optional<Integer> offset;
private final Optional<List<ColumnOrdering>> topNSortOrders;

public LanceInputPartition(
StructType schema,
int partitionId,
LanceSplit lanceSplit,
LanceConfig config,
Optional<String> whereCondition) {
this.schema = schema;
this.partitionId = partitionId;
this.lanceSplit = lanceSplit;
this.config = config;
this.whereCondition = whereCondition;
this.limit = Optional.empty();
this.offset = Optional.empty();
this.topNSortOrders = Optional.empty();
}
private final String scanId;

public LanceInputPartition(
StructType schema,
int partitionId,
LanceSplit lanceSplit,
LanceConfig config,
Optional<String> whereCondition,
Optional<Integer> limit,
Optional<Integer> offset) {
this.schema = schema;
this.partitionId = partitionId;
this.lanceSplit = lanceSplit;
this.config = config;
this.whereCondition = whereCondition;
this.limit = limit;
this.offset = offset;
this.topNSortOrders = Optional.empty();
String scanId) {
this(
schema,
partitionId,
lanceSplit,
config,
whereCondition,
Optional.empty(),
Optional.empty(),
Optional.empty(),
scanId);
}

public LanceInputPartition(
Expand All @@ -76,7 +62,8 @@ public LanceInputPartition(
Optional<String> whereCondition,
Optional<Integer> limit,
Optional<Integer> offset,
Optional<List<ColumnOrdering>> topNSortOrders) {
Optional<List<ColumnOrdering>> topNSortOrders,
String scanId) {
this.schema = schema;
this.partitionId = partitionId;
this.lanceSplit = lanceSplit;
Expand All @@ -85,6 +72,7 @@ public LanceInputPartition(
this.limit = limit;
this.offset = offset;
this.topNSortOrders = topNSortOrders;
this.scanId = scanId;
}

public StructType getSchema() {
Expand Down Expand Up @@ -118,4 +106,8 @@ public Optional<Integer> getOffset() {
public Optional<List<ColumnOrdering>> getTopNSortOrders() {
return topNSortOrders;
}

/** @return the scan id that was passed to this partition's constructor */
public String getScanId() {
return scanId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.Serializable;
import java.util.List;
import java.util.UUID;
import java.util.stream.IntStream;

public class LanceScan
Expand All @@ -46,6 +47,7 @@ public class LanceScan
private final Optional<Integer> limit;
private final Optional<Integer> offset;
private final Optional<List<ColumnOrdering>> topNSortOrders;
private final String scanId = UUID.randomUUID().toString();

public LanceScan(
StructType schema,
Expand Down Expand Up @@ -81,7 +83,8 @@ public InputPartition[] planInputPartitions() {
whereConditions,
limit,
offset,
topNSortOrders))
topNSortOrders,
scanId))
.toArray(InputPartition[]::new);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ public static class TestTable1Config {
lanceConfig = LanceConfig.from(datasetUri);
inputPartition =
new LanceInputPartition(
schema, 0, new LanceSplit(Arrays.asList(0, 1)), lanceConfig, Optional.empty());
schema,
0,
new LanceSplit(Arrays.asList(0, 1)),
lanceConfig,
Optional.empty(),
"test");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public void test() throws Exception {
0,
split,
TestUtils.TestTable1Config.lanceConfig,
Optional.empty());
Optional.empty(),
"test");
try (LanceColumnarPartitionReader reader = new LanceColumnarPartitionReader(partition)) {
List<List<Long>> expectedValues = TestUtils.TestTable1Config.expectedValues;
int rowIndex = 0;
Expand Down Expand Up @@ -73,7 +74,9 @@ public void testOffsetAndLimit() throws Exception {
TestUtils.TestTable1Config.lanceConfig,
Optional.empty(),
Optional.of(1),
Optional.of(1));
Optional.of(1),
Optional.empty(),
"testOffsetAndLimit");
try (LanceColumnarPartitionReader reader = new LanceColumnarPartitionReader(partition)) {
List<List<Long>> expectedValues = TestUtils.TestTable1Config.expectedValues;
int rowIndex = 1;
Expand Down Expand Up @@ -112,7 +115,8 @@ public void testTopN() throws Exception {
Optional.empty(),
Optional.of(1),
Optional.empty(),
Optional.of(Collections.singletonList(builder.build())));
Optional.of(Collections.singletonList(builder.build())),
"testTopN");
try (LanceColumnarPartitionReader reader = new LanceColumnarPartitionReader(partition)) {
List<List<Long>> expectedValues = TestUtils.TestTable1Config.expectedValues;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public void validateFragment(List<List<Object>> expectedValues, int fragment, St
0,
new LanceSplit(Arrays.asList(fragment)),
TestUtils.TestTable1Config.lanceConfig,
Optional.empty()))) {
Optional.empty(),
"validateFragment"))) {
try (ArrowReader reader = scanner.getArrowReader()) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
assertNotNull(root);
Expand Down