File-based SplitIterator #866

Merged · 2 commits · Jun 8, 2024
@@ -17,9 +17,7 @@

package org.apache.baremaps.storage.geoparquet;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.stream.Stream;
@@ -49,11 +47,7 @@ private GeoParquetReader reader() {

@Override
public long size() {
try {
return reader().size();
} catch (URISyntaxException e) {
throw new GeoParquetException("Fail to access size from reader", e);
}
return reader().size();
}

@Override
@@ -73,13 +67,9 @@ public Stream<DataRow> stream() {

@Override
public Stream<DataRow> parallelStream() {
try {
return reader().read().map(group -> new DataRowImpl(
GeoParquetTypeConversion.asSchema(path.toString(), group.getSchema()),
GeoParquetTypeConversion.asRowValues(group)));
} catch (IOException | URISyntaxException e) {
throw new GeoParquetException("Fail to read() the reader", e);
}
return reader().readParallel().map(group -> new DataRowImpl(
GeoParquetTypeConversion.asSchema(path.toString(), group.getSchema()),
GeoParquetTypeConversion.asRowValues(group)));
}

@Override
@@ -96,13 +86,9 @@ public void clear() {
@Override
public DataSchema schema() {
if (schema == null) {
try {
Schema schema = reader().getGeoParquetSchema();
this.schema = GeoParquetTypeConversion.asSchema(path.toString(), schema);
return this.schema;
} catch (URISyntaxException e) {
throw new GeoParquetException("Failed to get the schema.", e);
}
Schema schema = reader().getGeoParquetSchema();
this.schema = GeoParquetTypeConversion.asSchema(path.toString(), schema);
return this.schema;
}
return schema;
}
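Since the reader now surfaces failures as the unchecked GeoParquetException instead of checked URISyntaxException and IOException, the table methods above simply delegate to it, and callers that want to handle failures catch a single exception type. A minimal caller-side sketch, assuming a table variable holding the data table edited above and a hypothetical process consumer (the GeoParquetException package is assumed):

import org.apache.baremaps.geoparquet.GeoParquetException; // assumed package

// table refers to an instance of the GeoParquet-backed data table edited above
try {
  long rowCount = table.size();                         // no longer declares URISyntaxException
  table.parallelStream().forEach(row -> process(row));  // process(...) is a hypothetical consumer
} catch (GeoParquetException e) {
  // The underlying IOException, when there is one, is preserved as the cause
  e.printStackTrace();
}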
@@ -18,60 +18,50 @@
package org.apache.baremaps.geoparquet;

import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.*;
import java.util.function.Consumer;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.baremaps.geoparquet.hadoop.GeoParquetGroupReadSupport;
import org.apache.hadoop.fs.FileStatus;
import org.apache.parquet.hadoop.ParquetReader;

public class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {
class GeoParquetGroupSpliterator implements Spliterator<GeoParquetGroup> {

private final GeoParquetReader geoParquetReader;
private final Queue<FileStatus> queue;
private final Map<FileStatus, GeoParquetReader.FileInfo> files;
private FileStatus fileStatus = null;
private final List<FileStatus> fileStatuses;
private ParquetReader<GeoParquetGroup> reader;

GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader,
Map<FileStatus, GeoParquetReader.FileInfo> files) {
GeoParquetGroupSpliterator(GeoParquetReader geoParquetReader, List<FileStatus> files) {
this.geoParquetReader = geoParquetReader;
this.files = files;
this.queue = new ArrayBlockingQueue<>(files.keySet().size(), false, files.keySet());
this.fileStatuses = Collections.synchronizedList(files);
setupReaderForNextFile();
}

private void setupReaderForNextFile() {
FileStatus fileStatus = fileStatuses.remove(0);
try {
reader = createParquetReader(fileStatus);
} catch (IOException e) {
throw new GeoParquetException("Failed to create reader for " + fileStatus, e);
}
}

@Override
public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {
try {
// Poll the next file
if (fileStatus == null) {
fileStatus = queue.poll();
}

// If there are no more files, return false
if (fileStatus == null) {
return false;
}

// Create a new reader if it does not exist
if (reader == null) {
reader = createParquetReader(fileStatus);
}

// Read the next group
GeoParquetGroup group = reader.read();

// If the group is null, close the resources and set the variables to null
if (group == null) {
reader.close();
reader = null;
fileStatus = null;

// Try to advance again
return tryAdvance(action);
// If the group is null, try to get the one from the next file.
while (group == null) {
synchronized (fileStatuses) {
if (fileStatuses.isEmpty()) {
reader.close();
return false;
}
setupReaderForNextFile();
}
group = reader.read();
}

// Accept the group and tell the caller that there are more groups to read
@@ -80,13 +70,10 @@ public boolean tryAdvance(Consumer<? super GeoParquetGroup> action) {

} catch (IOException e) {
// If an exception occurs, try to close the resources and throw a runtime exception
if (reader != null) {
try {
reader.close();
} catch (IOException e2) {
// Ignore the exception as the original exception is more important
}
reader = null;
try {
reader.close();
} catch (IOException e2) {
// Ignore the exception as the original exception is more important
}
throw new GeoParquetException("IOException caught while trying to read the next file.", e);
}
@@ -102,34 +89,30 @@ private ParquetReader<GeoParquetGroup> createParquetReader(FileStatus file)

@Override
public Spliterator<GeoParquetGroup> trySplit() {
if (queue.size() < 2) {
// There is nothing left to split
return null;
}

// Create a new spliterator by polling the next polledFileStatus
FileStatus polledFileStatus = queue.poll();
List<FileStatus> sublist;
synchronized (fileStatuses) {
if (fileStatuses.size() < 2) {
// There is nothing left to split
return null;
}

// If there are no more files, tell the caller that there is nothing to split anymore
if (polledFileStatus == null) {
return null;
sublist = fileStatuses.subList(0, fileStatuses.size() / 2);
}
List<FileStatus> secondList = new ArrayList<>(sublist);
sublist.clear();

// Return a new spliterator with the polledFileStatus
return new GeoParquetGroupSpliterator(geoParquetReader,
Map.of(polledFileStatus, files.get(polledFileStatus)));
// Return a new spliterator with the sublist
return new GeoParquetGroupSpliterator(geoParquetReader, secondList);
}

@Override
public long estimateSize() {
return files.values().stream()
.map(GeoParquetReader.FileInfo::recordCount)
.reduce(0L, Long::sum);
return geoParquetReader.size();
}

@Override
public int characteristics() {
// The spliterator is not ordered, or sorted
return NONNULL | IMMUTABLE | SIZED | DISTINCT;
return NONNULL | CONCURRENT | DISTINCT;
}
}
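GeoParquetGroupSpliterator is now package-private and is only constructed by GeoParquetReader, but the splitting strategy is worth spelling out: the spliterator consumes a synchronized list of Parquet files, and each trySplit() hands half of the remaining files to a new spliterator, so a parallel stream is parallelized at file granularity. A sketch of how the reader wires it into a stream, mirroring retrieveGeoParquetGroups below (variable names are illustrative):

import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.hadoop.fs.FileStatus;

// Inside GeoParquetReader: files() lists the Parquet files matched by the glob
List<FileStatus> files = files();
Spliterator<GeoParquetGroup> spliterator = new GeoParquetGroupSpliterator(this, files);

// true -> parallel stream: the framework calls trySplit(), which splits the
// remaining file list in half, so each worker reads whole files independently
Stream<GeoParquetGroup> groups = StreamSupport.stream(spliterator, true);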
@@ -21,8 +21,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
@@ -45,10 +45,9 @@
public class GeoParquetReader {

private final URI uri;

final Configuration configuration;

private Map<FileStatus, FileInfo> files;
private List<FileStatus> files;
private Long groupCount;

record FileInfo(FileStatus file, Long recordCount, Map<String, String> keyValueMetadata,
MessageType messageType, GeoParquetMetadata metadata,
@@ -64,51 +63,60 @@ public GeoParquetReader(URI uri, Configuration configuration) {
this.configuration = configuration;
}

public MessageType getParquetSchema() throws URISyntaxException {
return files().values().stream()
public MessageType getParquetSchema() {
return files().stream()
.findFirst()
.map(this::getFileInfo)
.orElseThrow()
.messageType();
}

public GeoParquetMetadata getGeoParquetMetadata() throws URISyntaxException {
return files().values().stream()
private FileInfo getFileInfo(FileStatus fileStatus) {
try {
return buildFileInfo(fileStatus);
} catch (IOException e) {
throw new GeoParquetException("Failed to build Info", e);
}
}

public GeoParquetMetadata getGeoParquetMetadata() {
return files().stream()
.findFirst()
.map(this::getFileInfo)
.orElseThrow()
.metadata();
}

public Schema getGeoParquetSchema() throws URISyntaxException {
return files().values().stream()
public Schema getGeoParquetSchema() {
return files().stream()
.findFirst()
.map(this::getFileInfo)
.orElseThrow()
.geoParquetSchema();
}

public Long size() throws URISyntaxException {
return files().values().stream().map(FileInfo::recordCount).reduce(0L, Long::sum);
public boolean validateSchemasAreIdentical() {
// Verify that the files all have the same schema
final int messageTypeCount = files().stream().parallel().map(this::getFileInfo)
.map(FileInfo::messageType).collect(Collectors.toSet()).size();
return messageTypeCount == 1;
}

private synchronized Map<FileStatus, FileInfo> files() {
public long size() {
if (groupCount == null) {
groupCount = files().stream().parallel().map(this::getFileInfo).map(FileInfo::recordCount)
.reduce(0L, Long::sum);
}
return groupCount;
}

private synchronized List<FileStatus> files() {
try {
if (files == null) {
files = new HashMap<>();
FileSystem fs = FileSystem.get(uri, configuration);
FileStatus[] fileStatuses = fs.globStatus(new Path(uri));

for (FileStatus file : fileStatuses) {
files.put(file, buildFileInfo(file));
}

// Verify that the files all have the same schema
MessageType commonMessageType = null;
for (FileInfo entry : files.values()) {
if (commonMessageType == null) {
commonMessageType = entry.messageType;
} else if (!commonMessageType.equals(entry.messageType)) {
throw new GeoParquetException("The files do not have the same schema");
}
}
Path globPath = new Path(uri.getPath());
FileSystem fileSystem = FileSystem.get(uri, configuration);

files = new ArrayList<>(Arrays.asList(fileSystem.globStatus(globPath)));
}
} catch (IOException e) {
throw new GeoParquetException("IOException while attempting to list files.", e);
@@ -139,12 +147,16 @@ private FileInfo buildFileInfo(FileStatus file) throws IOException {
geoParquetMetadata, geoParquetSchema);
}

public Stream<GeoParquetGroup> readParallel() throws URISyntaxException {
return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()), true);
public Stream<GeoParquetGroup> readParallel() {
return retrieveGeoParquetGroups(true);
}

private Stream<GeoParquetGroup> retrieveGeoParquetGroups(boolean inParallel) {
return StreamSupport.stream(new GeoParquetGroupSpliterator(this, files()), inParallel);
}

public Stream<GeoParquetGroup> read() throws IOException, URISyntaxException {
return readParallel().sequential();
public Stream<GeoParquetGroup> read() {
return retrieveGeoParquetGroups(false);
}

private static Configuration createConfiguration() {
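For reference, a minimal usage sketch of the reworked reader API; the glob URI is a placeholder and the plain Hadoop Configuration stands in for whatever configuration the caller already uses:

import java.net.URI;
import java.util.stream.Stream;
import org.apache.baremaps.geoparquet.GeoParquetReader;
import org.apache.baremaps.geoparquet.data.GeoParquetGroup;
import org.apache.hadoop.conf.Configuration;

URI uri = URI.create("file:///data/example/*.parquet");            // placeholder glob
GeoParquetReader reader = new GeoParquetReader(uri, new Configuration());

long rowCount = reader.size();                              // sums recordCount across files, cached in groupCount
boolean sameSchema = reader.validateSchemasAreIdentical();  // optional schema consistency check
Stream<GeoParquetGroup> sequential = reader.read();         // sequential stream of groups
Stream<GeoParquetGroup> parallel = reader.readParallel();   // parallel stream, split per file

None of the checked exceptions from the previous API remain on these signatures; failures surface as the unchecked GeoParquetException.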