Redo "Fix issues with queries that are bigger than 2GB" #274

Merged · 29 commits · Sep 13, 2024

Commits
510d834
Update QuerySource hashing functions
nck-mlcnv Aug 7, 2024
02e8c89
Update hash tests
nck-mlcnv Aug 7, 2024
4a32551
Add query size limitation for reading queries as string
nck-mlcnv Aug 14, 2024
8a0474d
Put query indices in record class instead of long array
nck-mlcnv Aug 14, 2024
5399f70
Fix test
nck-mlcnv Aug 14, 2024
cdaeeb6
Delegate QueryIndex to its own class
nck-mlcnv Aug 14, 2024
d33e0db
Make InMemQueryList store queries in byte arrays instead of strings
nck-mlcnv Aug 15, 2024
af555fa
Add shallow copy for BigByteArrayOutputStream
nck-mlcnv Aug 15, 2024
87b0840
Add available methods for BigByteArrayInputStream
nck-mlcnv Aug 15, 2024
4a1278e
Fix shallow copy
nck-mlcnv Aug 15, 2024
33be64d
Fix available method
nck-mlcnv Aug 15, 2024
f8a8c16
Make BigByteArrayInputStream use a ByteBuffer
nck-mlcnv Aug 15, 2024
9302156
Change RequestFactory
nck-mlcnv Aug 15, 2024
777c1e0
Make StreamEntityProducer to not fully read in stream
nck-mlcnv Aug 15, 2024
04e5e17
Fix test
nck-mlcnv Aug 15, 2024
4d15f2a
Try to speedup indexing
nck-mlcnv Aug 16, 2024
f6bdab8
Remove hash code file
nck-mlcnv Aug 16, 2024
b117f6e
Remove unnecessary exception
nck-mlcnv Aug 16, 2024
9b164e6
Change ByteArrayListInputStream
nck-mlcnv Aug 16, 2024
dc48789
Use ByteArrayListInputStreams for storing queries instead of ByteArra…
nck-mlcnv Aug 16, 2024
0b3cd1b
Fix tests
nck-mlcnv Aug 16, 2024
3058b02
Speed up FileUtils again
nck-mlcnv Aug 16, 2024
705a56a
Add doc mentioning limitations
nck-mlcnv Aug 16, 2024
42d98b6
Fix function
nck-mlcnv Aug 19, 2024
18dc1a5
Remove unused methods and add comments
nck-mlcnv Aug 19, 2024
5fbfa3d
Use constants
nck-mlcnv Sep 6, 2024
0c04503
Change title and move paragraph
nck-mlcnv Sep 9, 2024
3f52876
Complete last commit
nck-mlcnv Sep 9, 2024
e359004
Fix test
nck-mlcnv Sep 11, 2024
5 changes: 5 additions & 0 deletions docs/configuration/queries.md
@@ -77,6 +77,11 @@ WHERE {
```
The `separator` property should be set to `"\n###\n"`. (be aware of different line endings on different operating systems)

## Huge Query Strings
When working with large queries (queries larger than 2³¹ bytes, i.e. roughly 2 GB),
keep in mind that only the request types `post query` and `update query`
support them.

## Example
```yaml
tasks:
@@ -178,7 +178,7 @@ public QueryStringWrapper getNextQuery(QuerySelector querySelector) throws IOExc
return new QueryStringWrapper(queryIndex, queryList.getQuery(queryIndex));
}

public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) throws IOException {
public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) {
final var queryIndex = querySelector.getNextIndex();
return new QueryStreamWrapper(queryIndex, config.caching(), () -> {
try {
@@ -2,13 +2,15 @@

import org.aksw.iguana.cc.query.list.QueryList;
import org.aksw.iguana.cc.query.source.QuerySource;
import org.aksw.iguana.commons.io.ByteArrayListInputStream;
import org.aksw.iguana.commons.io.ByteArrayListOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
@@ -21,21 +23,45 @@ public class InMemQueryList extends QueryList {

private static final Logger LOGGER = LoggerFactory.getLogger(InMemQueryList.class);

private final List<byte[]> queries;
private final List<ByteArrayListOutputStream> queries = new ArrayList<>();

public InMemQueryList(QuerySource querySource) throws IOException {
super(querySource);
queries = this.querySource.getAllQueries().stream().map(s -> s.getBytes(StandardCharsets.UTF_8)).toList();
LOGGER.info("Reading queries from the source with the hash code {} into memory.", querySource.hashCode());
for (int i = 0; i < querySource.size(); i++) {
try (InputStream queryStream = querySource.getQueryStream(i)) {
ByteArrayListOutputStream balos = new ByteArrayListOutputStream();
byte[] currentBuffer;
do {
currentBuffer = queryStream.readNBytes(Integer.MAX_VALUE / 2);
balos.write(currentBuffer);
} while (currentBuffer.length == Integer.MAX_VALUE / 2);
balos.close();
queries.add(balos);
}
}
}

@Override
public String getQuery(int index) {
return new String(this.queries.get(index), StandardCharsets.UTF_8);
final var queryStream = queries.get(index);
if (queryStream.size() > Integer.MAX_VALUE - 8) {
throw new OutOfMemoryError("Query is too large to be read into a string object.");
}

byte[] buffer;
try {
buffer = queryStream.toInputStream().readNBytes(Integer.MAX_VALUE - 8);
} catch (IOException ignored) {
LOGGER.error("Could not read query into string.");
return "";
}
return new String(buffer, StandardCharsets.UTF_8);
}

@Override
public InputStream getQueryStream(int index) {
return new ByteArrayInputStream(this.queries.get(index));
return new ByteArrayListInputStream(queries.get(index).getBuffers());
}

@Override
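The constructor above sidesteps Java's 2³¹-1 byte limit for a single array by draining each query stream in fixed-size chunks and appending them to a `ByteArrayListOutputStream`. The snippet below is a minimal sketch of that pattern using only standard JDK types; the `readChunked` helper and its chunk size are illustrative and not part of the PR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class ChunkedReadSketch {

    // Mirrors the chunk size used in the InMemQueryList constructor.
    private static final int CHUNK_SIZE = Integer.MAX_VALUE / 2;

    /**
     * Drains the stream into a list of byte arrays, so the total content may
     * exceed the 2^31-1 bytes a single Java array can hold.
     */
    static List<byte[]> readChunked(InputStream in) throws IOException {
        final List<byte[]> chunks = new ArrayList<>();
        byte[] chunk;
        do {
            // readNBytes blocks until CHUNK_SIZE bytes were read or EOF was hit.
            chunk = in.readNBytes(CHUNK_SIZE);
            if (chunk.length > 0) {
                chunks.add(chunk);
            }
            // A short read means the stream is exhausted, so the loop ends.
        } while (chunk.length == CHUNK_SIZE);
        return chunks;
    }
}
```

`getQueryStream` can then replay those buffers through a `ByteArrayListInputStream` instead of materialising one giant array, which is why only `getQuery` has to guard against queries above roughly 2 GB.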
@@ -74,6 +74,6 @@ public List<String> getAllQueries() throws IOException {

@Override
public int hashCode() {
return FileUtils.getHashcodeFromFileContent(this.files[0]);
return FileUtils.getHashcodeFromDirectory(this.path);
}
}
146 changes: 106 additions & 40 deletions src/main/java/org/aksw/iguana/cc/utils/files/FileUtils.java
@@ -1,6 +1,11 @@
package org.aksw.iguana.cc.utils.files;

import net.jpountz.xxhash.StreamingXXHash64;
import net.jpountz.xxhash.XXHashFactory;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -14,20 +19,62 @@
*
*/
public class FileUtils {
private static final XXHashFactory hasherFactory = XXHashFactory.fastestJavaInstance();
private static final int BUFFER_SIZE = 8192;

/**
* This method calculates the hashcode of the content of a file. <br/>
* The hashcode is calculated using the XXHash64 algorithm.
*
* @param filepath the path of the file
* @return the hashcode of the file content
*/
public static int getHashcodeFromFileContent(Path filepath) {
int hashcode;
try {
String fileContents = readFile(filepath);
hashcode = Math.abs(fileContents.hashCode());
try (StreamingXXHash64 hasher = hasherFactory.newStreamingHash64(0);
InputStream is = new BufferedInputStream(Files.newInputStream(filepath), BUFFER_SIZE)) {
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead;
while ((bytesRead = (is.read(buffer))) != -1) {
hasher.update(buffer, 0, bytesRead);
}
hashcode = (int) hasher.getValue();
} catch (IOException e) {
hashcode = 0;
return 0;
}

return hashcode;
}

public static String readFile(Path path) throws IOException {
return Files.readString(path, StandardCharsets.UTF_8);
/**
* This method calculates the hashcode of a directory by hashing the content of all files in the directory. <br/>
* Only top-level files are considered, subdirectories are ignored. <br/>
* The hashcode is calculated using the XXHash64 algorithm.
*
* @param directory the path of the directory
* @return the hashcode of the directory content
*/
public static int getHashcodeFromDirectory(Path directory) {

int hashcode;
try (StreamingXXHash64 hasher = hasherFactory.newStreamingHash64(0)) {
for (Path file : Files.list(directory).sorted().toArray(Path[]::new)) {
if (Files.isRegularFile(file)) {
try (InputStream is = new BufferedInputStream(Files.newInputStream(file), BUFFER_SIZE)) {
byte[] buffer = new byte[BUFFER_SIZE];
int bytesRead;
while ((bytesRead = (is.read(buffer))) != -1) {
hasher.update(buffer, 0, bytesRead);
}
}
}
}
hashcode = (int) hasher.getValue();
} catch (IOException e) {
return 0;
}

return hashcode;
}
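Both hash helpers feed their buffers into lz4-java's streaming XXHash64 implementation. The following standalone sketch (not code from this PR) exercises the `StreamingXXHash64` API in isolation: splitting the input across several `update` calls yields the same 64-bit value as hashing it in one piece, which is what makes the buffered file loops above work.

```java
import net.jpountz.xxhash.StreamingXXHash64;
import net.jpountz.xxhash.XXHashFactory;

import java.nio.charset.StandardCharsets;

public class XXHashSketch {
    public static void main(String[] args) {
        byte[] data = "SELECT * WHERE { ?s ?p ?o }".getBytes(StandardCharsets.UTF_8);

        XXHashFactory factory = XXHashFactory.fastestJavaInstance();
        // Seed 0, matching the calls in FileUtils.
        try (StreamingXXHash64 hasher = factory.newStreamingHash64(0)) {
            // Feeding the data in several update() calls produces the same
            // value as hashing it in a single call.
            hasher.update(data, 0, 10);
            hasher.update(data, 10, data.length - 10);
            long hash64 = hasher.getValue();
            System.out.printf("xxhash64 = %016x%n", hash64);
        }
    }
}
```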

/**
@@ -48,16 +95,24 @@ public static String readFile(Path path) throws IOException {
public static String getLineEnding(Path filepath) throws IOException {
if (filepath == null)
throw new IllegalArgumentException("Filepath must not be null.");
try(BufferedReader br = Files.newBufferedReader(filepath)) {
try (BufferedReader br = Files.newBufferedReader(filepath)) {
CharBuffer buffer = CharBuffer.allocate(8192);
char c;
while ((c = (char) br.read()) != (char) -1) {
if (c == '\n')
return "\n";
else if (c == '\r') {
if ((char) br.read() == '\n')
return "\r\n";
return "\r";
while (br.read(buffer) != -1) {
buffer.flip();
while (buffer.hasRemaining()) {
c = buffer.get();
if (c == '\n')
return "\n";
else if (c == '\r') {
if (!buffer.hasRemaining() && br.read(buffer) == -1)
return "\r";
if (buffer.hasRemaining() && buffer.get() == '\n')
return "\r\n";
return "\r";
}
}
buffer.clear();
}
}
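A quick, hypothetical check of the buffered line-ending detection (not part of the PR) writes a file with a known terminator and asks `FileUtils` which one it finds first:

```java
import org.aksw.iguana.cc.utils.files.FileUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class LineEndingCheck {
    public static void main(String[] args) throws IOException {
        final Path tmp = Files.createTempFile("queries", ".txt");
        Files.writeString(tmp, "SELECT 1\r\nSELECT 2\r\n", StandardCharsets.UTF_8);

        // The first terminator encountered decides the result, so this prints "\r\n".
        final String ending = FileUtils.getLineEnding(tmp);
        System.out.println(ending.replace("\r", "\\r").replace("\n", "\\n"));

        Files.delete(tmp);
    }
}
```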

@@ -84,45 +139,56 @@ private static int[] computePrefixTable(byte[] pattern) {
return prefixTable;
}

public static List<long[]> indexStream(String separator, InputStream is) throws IOException {
public static List<QueryIndex> indexStream(String separator, InputStream is) throws IOException {
// basically Knuth-Morris-Pratt
List<long[]> indices = new ArrayList<>();
List<QueryIndex> indices = new ArrayList<>();


final byte[] sepArray = separator.getBytes(StandardCharsets.UTF_8);
final int[] prefixTable = computePrefixTable(sepArray);

long itemStart = 0;

long byteOffset = 0;
int patternIndex = 0;
byte[] currentByte = new byte[1];
while (is.read(currentByte) == 1) {
// skipping fast-forward with the prefixTable
while (patternIndex > 0 && currentByte[0] != sepArray[patternIndex]) {
patternIndex = prefixTable[patternIndex - 1];
}


if (currentByte[0] == sepArray[patternIndex]) {
patternIndex++;

if (patternIndex == sepArray.length) { // match found
patternIndex = 0;
final long itemEnd = byteOffset - sepArray.length + 1;
final long len = itemEnd - itemStart;
indices.add(new long[]{itemStart, len});

itemStart = byteOffset + 1;
}
}

byteOffset++;
}
// read from the stream in chunks, because the BufferedInputStream is synchronized, so single byte reads are
// slow, which is especially bad for large files
byte[] buffer = new byte[8192];
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
byte currentByte;
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
byteBuffer.limit(bytesRead);
while (byteBuffer.hasRemaining()) {
currentByte = byteBuffer.get();
// skipping fast-forward with the prefixTable
while (patternIndex > 0 && currentByte != sepArray[patternIndex]) {
patternIndex = prefixTable[patternIndex - 1];
}


if (currentByte == sepArray[patternIndex]) {
patternIndex++;

if (patternIndex == sepArray.length) { // match found
patternIndex = 0;
final long itemEnd = byteOffset - sepArray.length + 1;
final long len = itemEnd - itemStart;
indices.add(new QueryIndex(itemStart, len));

itemStart = byteOffset + 1;
}
}

byteOffset++;
}
byteBuffer.clear();

}

final long itemEnd = byteOffset;
final long len = itemEnd - itemStart;
indices.add(new long[]{itemStart, len});
indices.add(new QueryIndex(itemStart, len));

return indices;
}
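Since the indexer now returns `QueryIndex` records instead of bare `long[]` pairs, a small driver (illustrative only, not part of the PR) makes its output easy to inspect; here the separator is the `"\n###\n"` pattern from the documentation above:

```java
import org.aksw.iguana.cc.utils.files.FileUtils;
import org.aksw.iguana.cc.utils.files.QueryIndex;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class IndexStreamSketch {
    public static void main(String[] args) throws IOException {
        String content = "SELECT 1\n###\nSELECT 2\n###\nSELECT 3";
        var in = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));

        // Each QueryIndex holds the byte offset and length of one entry;
        // the separator bytes themselves are not part of any entry.
        List<QueryIndex> indices = FileUtils.indexStream("\n###\n", in);
        for (QueryIndex idx : indices) {
            // Byte offsets equal char offsets here because the sample is plain ASCII.
            String query = content.substring(
                    (int) idx.filePosition(),
                    (int) (idx.filePosition() + idx.queryLength()));
            System.out.printf("offset=%d length=%d query=%s%n",
                    idx.filePosition(), idx.queryLength(), query);
        }
    }
}
```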
@@ -27,7 +27,7 @@ public class IndexedQueryReader {
/**
* This list stores the start position and the length of each indexed content.
*/
private final List<long[]> indices;
private final List<QueryIndex> indices;

/**
* The file whose content should be indexed.
@@ -96,12 +96,19 @@ private IndexedQueryReader(Path filepath, String separator) throws IOException {
* @throws IOException
*/
public String readQuery(int index) throws IOException {
final int size;
try {
size = Math.toIntExact(indices.get(index).queryLength());
} catch (Exception e) {
throw new OutOfMemoryError("Can't read a Query to a string, that's bigger than 2^31 Bytes (~2GB)");
}
// Indexed queries can't be larger than ~2GB
try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
final ByteBuffer buffer = ByteBuffer.allocate((int) indices.get(index)[1]);
final var read = channel.read(buffer, indices.get(index)[0]);
assert read == indices.get(index)[1];
return new String(buffer.array(), StandardCharsets.UTF_8);
final byte[] buffer = new byte[size]; // it's supposedly faster to manually create a byte array than a ByteBuffer
final ByteBuffer bufferWrapper = ByteBuffer.wrap(buffer);
final var read = channel.read(bufferWrapper, indices.get(index).filePosition());
assert read == indices.get(index).queryLength();
return new String(buffer, StandardCharsets.UTF_8);
}
}

@@ -111,8 +118,8 @@ public InputStream streamQuery(int index) throws IOException {
new BoundedInputStream(
Channels.newInputStream(
FileChannel.open(path, StandardOpenOption.READ)
.position(this.indices.get(index)[0] /* offset */)),
this.indices.get(index)[1] /* length */)));
.position(this.indices.get(index).filePosition() /* offset */)),
this.indices.get(index).queryLength() /* length */)));
}

/**
@@ -146,11 +153,11 @@ public int size() {
* @return the Indexes
* @throws IOException
*/
private static List<long[]> indexFile(Path filepath, String separator) throws IOException {
private static List<QueryIndex> indexFile(Path filepath, String separator) throws IOException {
try (InputStream fi = Files.newInputStream(filepath, StandardOpenOption.READ);
BufferedInputStream bis = new BufferedInputStream(fi)) {
return FileUtils.indexStream(separator, bis)
.stream().filter((long[] e) -> e[1] > 0 /* Only elements with length > 0 */).collect(Collectors.toList());
.stream().filter(e -> e.queryLength() > 0 /* Only elements with length > 0 */).collect(Collectors.toList());
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/org/aksw/iguana/cc/utils/files/QueryIndex.java
@@ -0,0 +1,4 @@
package org.aksw.iguana.cc.utils.files;

public record QueryIndex(long filePosition, long queryLength) {
}
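Putting the pieces together, a `QueryIndex` is all that is needed to pull one entry out of an arbitrarily large file with a single positioned read. The helper below is a hypothetical condensation of what `IndexedQueryReader` does (its actual construction API is not shown in this diff), combining `FileUtils.indexStream` with a `FileChannel` read:

```java
import org.aksw.iguana.cc.utils.files.FileUtils;
import org.aksw.iguana.cc.utils.files.QueryIndex;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

public class PositionedReadSketch {

    /** Indexes the file once and returns the i-th entry as a string. */
    static String readEntry(Path file, String separator, int i) throws IOException {
        final List<QueryIndex> indices;
        try (InputStream in = new BufferedInputStream(Files.newInputStream(file))) {
            indices = FileUtils.indexStream(separator, in);
        }
        final QueryIndex idx = indices.get(i);
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            // toIntExact fails for entries above ~2 GB, mirroring the
            // string-size limitation documented above.
            ByteBuffer buffer = ByteBuffer.allocate(Math.toIntExact(idx.queryLength()));
            // One positioned read; no other part of the file is touched.
            channel.read(buffer, idx.filePosition());
            return new String(buffer.array(), StandardCharsets.UTF_8);
        }
    }
}
```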