From a92d2392b1db8b32d47b76d1349112dff0dde921 Mon Sep 17 00:00:00 2001 From: Nick Molcanov <32801560+nck-mlcnv@users.noreply.github.com> Date: Fri, 23 Feb 2024 12:22:18 +0100 Subject: [PATCH] Apache HTTP Client 5 implementation (#243) * Add more logging messages * Fix log4j2 configuration * Implement apache HTTP client * Implement apache HTTP async client 5 * Fix timeout * Fixes * Fix hashing bug * Fix conversion of byte stream to string * Implement POST request streaming * Disable the storing and hashing of responses when the parseResults parameter in the config is false * Move utility classes * StreamEntityProducer can send fixed-sized data and is reproducible now * Make QueryHandler return stream supplier and info about query being cached * Change RequestFactory behavior * cached queries will be sent with fixed-sizes request * requests of cached queries will be cached as well (addresses #223) * Cleanup * Preload requests * Fix IDE warnings * Fix tests * Remove unneeded test class * Add Javadocs * Change requests * Move the RequestFactory to a separate class and add comments * Add comments from overridden methods * Lower maximum capacity while reading response --- pom.xml | 22 +- .../iguana/cc/controller/MainController.java | 6 + .../iguana/cc/query/handler/QueryHandler.java | 21 +- .../iguana/cc/query/source/QuerySource.java | 3 +- .../source/impl/FileLineQuerySource.java | 2 +- .../source/impl/FileSeparatorQuerySource.java | 2 +- .../query/source/impl/FolderQuerySource.java | 2 +- .../java/org/aksw/iguana/cc/suite/Suite.java | 1 + .../aksw/iguana/cc/tasks/impl/Stresstest.java | 9 +- .../cc/utils/{ => files}/FileUtils.java | 2 +- .../utils/{ => files}/IndexedQueryReader.java | 2 +- .../iguana/cc/utils/http/RequestFactory.java | 144 +++++ .../cc/utils/http/StreamEntityProducer.java | 157 ++++++ .../cc/worker/impl/SPARQLProtocolWorker.java | 527 ++++++++++-------- src/main/resources/log4j2.yml | 4 +- .../cc/query/handler/QueryHandlerTest.java | 10 +- .../aksw/iguana/cc/utils/FileUtilsTest.java | 1 + .../cc/utils/IndexedQueryReaderTest.java | 1 + .../cc/worker/impl/RequestFactoryTest.java | 110 ---- .../worker/impl/SPARQLProtocolWorkerTest.java | 125 +++-- 20 files changed, 730 insertions(+), 421 deletions(-) rename src/main/java/org/aksw/iguana/cc/utils/{ => files}/FileUtils.java (98%) rename src/main/java/org/aksw/iguana/cc/utils/{ => files}/IndexedQueryReader.java (99%) create mode 100644 src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java create mode 100644 src/main/java/org/aksw/iguana/cc/utils/http/StreamEntityProducer.java delete mode 100644 src/test/java/org/aksw/iguana/cc/worker/impl/RequestFactoryTest.java diff --git a/pom.xml b/pom.xml index 755e70146..6347fe757 100644 --- a/pom.xml +++ b/pom.xml @@ -88,21 +88,11 @@ log4j-slf4j-impl ${log4j.version} - - org.apache.logging.log4j - log4j-api - ${log4j.version} - org.apache.logging.log4j log4j-core ${log4j.version} - - org.apache.logging.log4j - log4j-1.2-api - ${log4j.version} - com.fasterxml.jackson.dataformat jackson-dataformat-yaml @@ -123,12 +113,6 @@ json-simple 1.1.1 - - org.slf4j - slf4j-api - 1.7.32 - compile - org.junit.jupiter junit-jupiter @@ -176,6 +160,12 @@ spring-context 6.0.11 + + org.apache.httpcomponents.client5 + httpclient5 + 5.3 + + diff --git a/src/main/java/org/aksw/iguana/cc/controller/MainController.java b/src/main/java/org/aksw/iguana/cc/controller/MainController.java index e9a03e70e..b9291fc38 100644 --- a/src/main/java/org/aksw/iguana/cc/controller/MainController.java +++ 
b/src/main/java/org/aksw/iguana/cc/controller/MainController.java @@ -3,10 +3,12 @@ import com.beust.jcommander.*; import org.aksw.iguana.cc.suite.IguanaSuiteParser; import org.aksw.iguana.cc.suite.Suite; +import org.apache.logging.log4j.core.config.Configurator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.URI; import java.nio.file.Path; @@ -44,6 +46,10 @@ public Path convert(String value) { * @param argc The command line arguments that are passed to the program. */ public static void main(String[] argc) { + // Apparently, there is something weird going on, where the apache jena library already configures log4j2 for + // some reason. That's why you have to call reconfigure here. + Configurator.reconfigure(URI.create("log4j2.yml")); + var args = new Args(); JCommander jc = JCommander.newBuilder() .addObject(args) diff --git a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java index 1c9ac2eee..ceea25660 100644 --- a/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java +++ b/src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java @@ -3,7 +3,6 @@ import com.fasterxml.jackson.annotation.*; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; import org.aksw.iguana.cc.query.selector.QuerySelector; @@ -23,6 +22,7 @@ import java.nio.file.Path; import java.util.HashMap; import java.util.Objects; +import java.util.function.Supplier; /** * The QueryHandler is used by every worker that extends the AbstractWorker. @@ -124,7 +124,7 @@ public String value() { } public record QueryStringWrapper(int index, String query) {} - public record QueryStreamWrapper(int index, InputStream queryInputStream) {} + public record QueryStreamWrapper(int index, boolean cached, Supplier queryInputStreamSupplier) {} protected final Logger LOGGER = LoggerFactory.getLogger(QueryHandler.class); @@ -180,7 +180,13 @@ public QueryStringWrapper getNextQuery(QuerySelector querySelector) throws IOExc public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) throws IOException { final var queryIndex = querySelector.getNextIndex(); - return new QueryStreamWrapper(queryIndex, this.queryList.getQueryStream(queryIndex)); + return new QueryStreamWrapper(queryIndex, config.caching(), () -> { + try { + return this.queryList.getQueryStream(queryIndex); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); } @Override @@ -209,4 +215,13 @@ public String[] getAllQueryIds() { } return out; } + + /** + * Returns the configuration of the QueryHandler. 
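The supplier handed out by getNextQueryStream above defers reading the query stream and rethrows the checked IOException unchecked, because Supplier.get() cannot declare checked exceptions. A minimal standalone sketch of that pattern, with a hypothetical readQuery helper standing in for queryList.getQueryStream:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

class DeferredQueryStream {
    // stands in for QueryList#getQueryStream(int); may throw a checked IOException
    static InputStream readQuery(int index) throws IOException {
        return new ByteArrayInputStream(("SELECT * WHERE { ?s ?p ?o } # " + index).getBytes());
    }

    // Supplier.get() cannot throw checked exceptions, so the IOException is wrapped
    static Supplier<InputStream> deferred(int index) {
        return () -> {
            try {
                return readQuery(index);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    public static void main(String[] args) throws IOException {
        Supplier<InputStream> queryInputStreamSupplier = deferred(0);
        System.out.println(new String(queryInputStreamSupplier.get().readAllBytes()));
    }
}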
+ * + * @return the configuration of the QueryHandler + */ + public Config getConfig() { + return config; + } } diff --git a/src/main/java/org/aksw/iguana/cc/query/source/QuerySource.java b/src/main/java/org/aksw/iguana/cc/query/source/QuerySource.java index 9800b858d..59285cfee 100644 --- a/src/main/java/org/aksw/iguana/cc/query/source/QuerySource.java +++ b/src/main/java/org/aksw/iguana/cc/query/source/QuerySource.java @@ -1,10 +1,9 @@ package org.aksw.iguana.cc.query.source; -import org.aksw.iguana.cc.utils.FileUtils; +import org.aksw.iguana.cc.utils.files.FileUtils; import java.io.IOException; import java.io.InputStream; -import java.nio.file.Files; import java.nio.file.Path; import java.util.List; diff --git a/src/main/java/org/aksw/iguana/cc/query/source/impl/FileLineQuerySource.java b/src/main/java/org/aksw/iguana/cc/query/source/impl/FileLineQuerySource.java index 992df4384..69789aa6b 100644 --- a/src/main/java/org/aksw/iguana/cc/query/source/impl/FileLineQuerySource.java +++ b/src/main/java/org/aksw/iguana/cc/query/source/impl/FileLineQuerySource.java @@ -1,6 +1,6 @@ package org.aksw.iguana.cc.query.source.impl; -import org.aksw.iguana.cc.utils.FileUtils; +import org.aksw.iguana.cc.utils.files.FileUtils; import java.io.IOException; import java.nio.file.Path; diff --git a/src/main/java/org/aksw/iguana/cc/query/source/impl/FileSeparatorQuerySource.java b/src/main/java/org/aksw/iguana/cc/query/source/impl/FileSeparatorQuerySource.java index caaacbfa3..b1e82c9c3 100644 --- a/src/main/java/org/aksw/iguana/cc/query/source/impl/FileSeparatorQuerySource.java +++ b/src/main/java/org/aksw/iguana/cc/query/source/impl/FileSeparatorQuerySource.java @@ -1,7 +1,7 @@ package org.aksw.iguana.cc.query.source.impl; import org.aksw.iguana.cc.query.source.QuerySource; -import org.aksw.iguana.cc.utils.IndexedQueryReader; +import org.aksw.iguana.cc.utils.files.IndexedQueryReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/aksw/iguana/cc/query/source/impl/FolderQuerySource.java b/src/main/java/org/aksw/iguana/cc/query/source/impl/FolderQuerySource.java index 04ae5fd12..be71ccec8 100644 --- a/src/main/java/org/aksw/iguana/cc/query/source/impl/FolderQuerySource.java +++ b/src/main/java/org/aksw/iguana/cc/query/source/impl/FolderQuerySource.java @@ -1,7 +1,7 @@ package org.aksw.iguana.cc.query.source.impl; import org.aksw.iguana.cc.query.source.QuerySource; -import org.aksw.iguana.cc.utils.FileUtils; +import org.aksw.iguana.cc.utils.files.FileUtils; import org.apache.commons.io.input.AutoCloseInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/aksw/iguana/cc/suite/Suite.java b/src/main/java/org/aksw/iguana/cc/suite/Suite.java index 1cb38acb5..7e2e50025 100644 --- a/src/main/java/org/aksw/iguana/cc/suite/Suite.java +++ b/src/main/java/org/aksw/iguana/cc/suite/Suite.java @@ -94,6 +94,7 @@ else if (storageConfig instanceof RDFFileStorage.Config) { public void run() { for (int i = 0; i < tasks.size(); i++) { + LOGGER.info("Task/{} {} starting.", tasks.get(i).getTaskName(), i); tasks.get(i).run(); LOGGER.info("Task/{} {} finished.", tasks.get(i).getTaskName(), i); } diff --git a/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java index e76aa78ef..923a1683e 100644 --- a/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java +++ b/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java @@ -80,8 +80,15 @@ public Stresstest(String suiteID, long 
stresstestID, Config config, ResponseBody } public void run() { - var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed + if (!warmupWorkers.isEmpty()) { + SPARQLProtocolWorker.initHttpClient(warmupWorkers.size()); + var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed + SPARQLProtocolWorker.closeHttpClient(); + } + + SPARQLProtocolWorker.initHttpClient(workers.size()); var results = executeWorkers(workers); + SPARQLProtocolWorker.closeHttpClient(); srp.process(results.workerResults); srp.calculateAndSaveMetrics(results.startTime, results.endTime); diff --git a/src/main/java/org/aksw/iguana/cc/utils/FileUtils.java b/src/main/java/org/aksw/iguana/cc/utils/files/FileUtils.java similarity index 98% rename from src/main/java/org/aksw/iguana/cc/utils/FileUtils.java rename to src/main/java/org/aksw/iguana/cc/utils/files/FileUtils.java index 58334906d..cea3b542f 100644 --- a/src/main/java/org/aksw/iguana/cc/utils/FileUtils.java +++ b/src/main/java/org/aksw/iguana/cc/utils/files/FileUtils.java @@ -1,4 +1,4 @@ -package org.aksw.iguana.cc.utils; +package org.aksw.iguana.cc.utils.files; import java.io.*; import java.nio.charset.StandardCharsets; diff --git a/src/main/java/org/aksw/iguana/cc/utils/IndexedQueryReader.java b/src/main/java/org/aksw/iguana/cc/utils/files/IndexedQueryReader.java similarity index 99% rename from src/main/java/org/aksw/iguana/cc/utils/IndexedQueryReader.java rename to src/main/java/org/aksw/iguana/cc/utils/files/IndexedQueryReader.java index b89ee7ae6..2e9c84f46 100644 --- a/src/main/java/org/aksw/iguana/cc/utils/IndexedQueryReader.java +++ b/src/main/java/org/aksw/iguana/cc/utils/files/IndexedQueryReader.java @@ -1,4 +1,4 @@ -package org.aksw.iguana.cc.utils; +package org.aksw.iguana.cc.utils.files; import org.apache.commons.io.input.AutoCloseInputStream; import org.apache.commons.io.input.BoundedInputStream; diff --git a/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java b/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java new file mode 100644 index 000000000..6966f87f4 --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java @@ -0,0 +1,144 @@ +package org.aksw.iguana.cc.utils.http; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.aksw.iguana.cc.config.elements.ConnectionConfig; +import org.aksw.iguana.cc.query.handler.QueryHandler; +import org.aksw.iguana.cc.worker.HttpWorker; +import org.aksw.iguana.cc.worker.impl.SPARQLProtocolWorker; +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer; +import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; +import org.apache.hc.core5.net.URIBuilder; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * A factory for creating HTTP requests. + * The factory can create requests for different types of HTTP methods and different types of SPARQL queries. + * The factory can also cache requests to avoid creating the same request multiple times. 
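The request caching described in this Javadoc boils down to building a request (including URL-encoding the query) once per query index and reusing the cached instance afterwards. A standalone JDK-only sketch of that memoisation idea, with made-up class and method names rather than the actual RequestFactory API:

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class UrlEncodedBodyCache {
    private final Map<Integer, String> cache = new HashMap<>();

    // builds the url-encoded form body once per query index and reuses it on later calls
    String bodyFor(int queryIndex, String query) {
        return cache.computeIfAbsent(queryIndex,
                i -> "query=" + URLEncoder.encode(query, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        var bodies = new UrlEncodedBodyCache();
        String first = bodies.bodyFor(0, "SELECT * WHERE { ?s ?p ?o }");
        String second = bodies.bodyFor(0, "SELECT * WHERE { ?s ?p ?o }");
        System.out.println(first);           // query=SELECT+*+WHERE+%7B+%3Fs+%3Fp+%3Fo+%7D
        System.out.println(first == second); // true, the cached instance is reused
    }
}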
+ */ +public final class RequestFactory { + public enum RequestType { + GET_QUERY("get query"), + POST_URL_ENC_QUERY("post url-enc query"), + POST_QUERY("post query"), + POST_URL_ENC_UPDATE("post url-enc update"), + POST_UPDATE("post update"); + + private final String value; + + @JsonCreator + RequestType(String value) { + this.value = Objects.requireNonNullElse(value, "get query"); + } + + @JsonValue + public String value() { + return value; + } + } + + private final RequestType requestType; + private final Map cache = new HashMap<>(); + + public RequestFactory(RequestType requestType) { + this.requestType = requestType; + } + + private static String urlEncode(List parameters) { + return parameters.stream() + .map(e -> e[0] + "=" + URLEncoder.encode(e[1], StandardCharsets.UTF_8)) + .collect(Collectors.joining("&")); + } + + private static String urlEncode(String name, String value) { + return name + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8); + } + + /** + * Builds an HTTP request for a given query. + * If the query has been cached by the query handler, its content will be fully read by the entity producer into a + * byte buffer, which will then be reused on consecutive request executions. + * Cached requests will be sent non-chunked. + * If the query has not been cached by the query handler, the entity producer will use the query stream supplier to + * send the query in chunks. + * + * @param queryHandle the query handle containing the query and its index + * @param connection the connection to send the request to + * @param requestHeader the request header + * @return the request as an AsyncRequestProducer + * @throws URISyntaxException if the URI is invalid + * @throws IOException if the query stream cannot be read + */ + public AsyncRequestProducer buildHttpRequest(QueryHandler.QueryStreamWrapper queryHandle, + ConnectionConfig connection, + String requestHeader) throws URISyntaxException, IOException { + if (queryHandle.cached() && cache.containsKey(queryHandle.index())) + return cache.get(queryHandle.index()); + + AsyncRequestBuilder asyncRequestBuilder; + Supplier queryStreamSupplier; + InputStream queryStream; + + try { + queryStreamSupplier = queryHandle.queryInputStreamSupplier(); + queryStream = queryStreamSupplier.get(); + } catch (RuntimeException e) { + throw new IOException(e); + } + + switch (this.requestType) { + case GET_QUERY -> asyncRequestBuilder = AsyncRequestBuilder.get(new URIBuilder(connection.endpoint()) + .addParameter("query", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)) + .build() + ); + case POST_URL_ENC_QUERY -> asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint()) + // manually set content type, because otherwise the + // entity producer would set it to "application/x-www-form-urlencoded; charset=ISO-8859-1" + .setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded") + .setEntity(new BasicAsyncEntityProducer(urlEncode("query", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)), null, !queryHandle.cached())); + case POST_QUERY -> asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint()) + .setEntity(new StreamEntityProducer(queryStreamSupplier, !queryHandle.cached(), "application/sparql-query")); + case POST_URL_ENC_UPDATE -> asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint()) + .setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded") + .setEntity(new BasicAsyncEntityProducer(urlEncode("update", new String(queryStream.readAllBytes(), 
StandardCharsets.UTF_8)), null, !queryHandle.cached())); + case POST_UPDATE -> asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint()) + .setEntity(new StreamEntityProducer(queryStreamSupplier, !queryHandle.cached(), "application/sparql-update")); + default -> throw new IllegalStateException("Unexpected value: " + this.requestType); + } + + if (requestHeader != null) + asyncRequestBuilder.addHeader("Accept", requestHeader); + if (connection.authentication() != null && connection.authentication().user() != null) + asyncRequestBuilder.addHeader("Authorization", + HttpWorker.basicAuth(connection.authentication().user(), + Optional.ofNullable(connection.authentication().password()).orElse(""))); + + if (queryHandle.cached()) + cache.put(queryHandle.index(), asyncRequestBuilder.build()); + + return asyncRequestBuilder.build(); + } + + /** + * Get a cached request by the index of the query. + * If the request is not in the cache, an IllegalArgumentException is thrown. + * + * @param index the index of the query + * @return the request as an AsyncRequestProducer + */ + public AsyncRequestProducer getCachedRequest(int index) { + if (!cache.containsKey(index)) + throw new IllegalArgumentException("No request with index " + index + " found in cache."); + return cache.get(index); + } +} diff --git a/src/main/java/org/aksw/iguana/cc/utils/http/StreamEntityProducer.java b/src/main/java/org/aksw/iguana/cc/utils/http/StreamEntityProducer.java new file mode 100644 index 000000000..53320c98f --- /dev/null +++ b/src/main/java/org/aksw/iguana/cc/utils/http/StreamEntityProducer.java @@ -0,0 +1,157 @@ +package org.aksw.iguana.cc.utils.http; + +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Set; +import java.util.function.Supplier; + +/** + * An entity producer that produces the entity data from an input stream supplier. + * The entity data can optionally be sent in chunks. + * If the entity data is supposed to be sent non-chunked, the whole stream will be read into a byte buffer. + * The stream supplier should be repeatable, as this producer might be reused multiple times to create the entity data. + */ +public class StreamEntityProducer implements AsyncEntityProducer { + + private static final Logger logger = LoggerFactory.getLogger(StreamEntityProducer.class); + + private final Supplier streamSupplier; + private final boolean chunked; + private final String contentType; + + private ByteBuffer content; // used for non-chunked request, stores the whole content in reusable buffer + + private final static int BUFFER_SIZE = 8192; + private final byte[] buffer = new byte[BUFFER_SIZE]; + + private InputStream currentStream; // used for chunked request, stores the current stream to read from + + /** + * Creates a new entity producer that produces the entity data from the given input stream supplier. 
+ * + * @param streamSupplier the input stream supplier, should be repeatable + * @param chunked whether the entity data should be sent in chunks + */ + public StreamEntityProducer(Supplier streamSupplier, boolean chunked, String contentType) throws IOException { + this.streamSupplier = streamSupplier; + this.chunked = chunked; + this.contentType = contentType; + + if (!chunked) { + content = ByteBuffer.wrap(streamSupplier.get().readAllBytes()); + } + } + + @Override + public boolean isRepeatable() { + return true; + } + + @Override + public void failed(Exception cause) { + logger.error("Failed to produce entity data", cause); + if (currentStream != null) { + try { + currentStream.close(); + } catch (IOException e) { + logger.error("Failed to close input stream", e); + } + } + } + + @Override + public boolean isChunked() { + return chunked; + } + + @Override + public Set getTrailerNames() { + return null; + } + + @Override + public long getContentLength() { + // if the content length is known (non-chunked request), return it + if (content != null) { + return content.limit(); + } + + // if the content length is unknown (chunked request), return -1 + return -1; + } + + @Override + public String getContentType() { + return contentType; + } + + @Override + public String getContentEncoding() { + return null; + } + + @Override + public void releaseResources() { + if (currentStream != null) { + try { + currentStream.close(); + } catch (IOException e) { + logger.error("Failed to close input stream", e); + } + } + } + + @Override + public int available() { + // If content is not null, it means the whole entity data has been read into the buffer from a stream that was + // taken from the stream supplier and that the content will be sent non-chunked. + // In this case, the remaining bytes in the buffer are returned. + if (content != null) { + return content.remaining(); + } + + // Otherwise, the data is sent in chunks. If there is currently a stream open, from which the data is being read + // from, the available bytes from that stream are returned. 
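When the data is sent chunked, produce(DataStreamChannel) below drains the current stream through a fixed-size buffer and hands each slice to the channel as a ByteBuffer, ending the stream once read() returns -1. A standalone sketch of that read loop, with a plain Consumer standing in for the DataStreamChannel (names here are illustrative, not from this patch):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.function.Consumer;

class ChunkedCopy {
    // drains the stream through a fixed-size buffer, emitting one ByteBuffer per read
    static void copyInChunks(InputStream in, Consumer<ByteBuffer> channel, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        int bytesRead;
        while ((bytesRead = in.read(buffer)) > 0) {
            channel.accept(ByteBuffer.wrap(buffer, 0, bytesRead));
        }
        // bytesRead == -1 signals end of stream; the real producer then calls channel.endStream()
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("SELECT * WHERE { ?s ?p ?o }".getBytes());
        copyInChunks(in, chunk -> System.out.println("chunk of " + chunk.remaining() + " bytes"), 8);
    }
}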
+ if (currentStream != null) { + try { + return currentStream.available(); + } catch (IOException e) { + logger.error("Failed to get available bytes from input stream", e); + } + } + return 0; + } + + @Override + public void produce(DataStreamChannel channel) throws IOException { + // handling of non-chunked request + if (content != null) { + channel.write(content); + if (!content.hasRemaining()) { + channel.endStream(); + } + return; + } + + // handling of chunked request + if (chunked && currentStream == null) { + currentStream = streamSupplier.get(); + } + + int bytesRead; + while ((bytesRead = currentStream.read(buffer)) > 0) { + ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead); + channel.write(byteBuffer); + } + + if (bytesRead == -1) { + channel.endStream(); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java b/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java index 44f68bfa4..be90138b4 100644 --- a/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java +++ b/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java @@ -1,135 +1,43 @@ package org.aksw.iguana.cc.worker.impl; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonValue; +import net.jpountz.xxhash.StreamingXXHash64; import net.jpountz.xxhash.XXHashFactory; import org.aksw.iguana.cc.config.elements.ConnectionConfig; import org.aksw.iguana.cc.query.handler.QueryHandler; +import org.aksw.iguana.cc.query.selector.impl.LinearQuerySelector; +import org.aksw.iguana.cc.utils.http.RequestFactory; import org.aksw.iguana.cc.worker.ResponseBodyProcessor; import org.aksw.iguana.cc.worker.HttpWorker; import org.aksw.iguana.commons.io.BigByteArrayOutputStream; -import org.apache.http.client.utils.URIBuilder; +import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.DefaultConnectionKeepAliveStrategy; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.nio.AsyncClientConnectionManager; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.reactor.IOReactorConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.helpers.MessageFormatter; import java.io.IOException; -import java.io.InputStream; -import java.net.*; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.nio.charset.StandardCharsets; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.*; -import java.util.function.BiFunction; -import java.util.function.Supplier; -import java.util.stream.Collectors; public class SPARQLProtocolWorker extends HttpWorker { - public final static class RequestFactory { - public enum RequestType { - GET_QUERY("get query"), - POST_URL_ENC_QUERY("post url-enc 
query"), - POST_QUERY("post query"), - POST_URL_ENC_UPDATE("post url-enc update"), - POST_UPDATE("post update"); - - private final String value; - - @JsonCreator - RequestType(String value) { - this.value = Objects.requireNonNullElse(value, "get query"); - } - - @JsonValue - public String value() { - return value; - } - } - - private final RequestType requestType; - - public RequestFactory(RequestType requestType) { - this.requestType = requestType; - } - - private static String urlEncode(List parameters) { - return parameters.stream() - .map(e -> e[0] + "=" + URLEncoder.encode(e[1], StandardCharsets.UTF_8)) - .collect(Collectors.joining("&")); - } - - public HttpRequest buildHttpRequest(InputStream queryStream, - Duration timeout, - ConnectionConfig connection, - String requestHeader) throws URISyntaxException, IOException { - HttpRequest.Builder request = HttpRequest.newBuilder().timeout(timeout); - - class CustomStreamSupplier { - boolean used = false; // assume, that the stream will only be used again, if the first request failed, because of the client - public Supplier getStreamSupplier() { - if (!used) { - used = true; - return () -> queryStream; - } - else - return () -> null; - } - } - - if (requestHeader != null) - request.header("Accept", requestHeader); - if (connection.authentication() != null && connection.authentication().user() != null) - request.header("Authorization", - HttpWorker.basicAuth(connection.authentication().user(), - Optional.ofNullable(connection.authentication().password()).orElse(""))); - switch (this.requestType) { - case GET_QUERY -> { - request.uri(new URIBuilder(connection.endpoint()) - .setParameter("query", - new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)) - .build()) - .GET(); - } - case POST_URL_ENC_QUERY -> { - request.uri(connection.endpoint()) - .header("Content-Type", "application/x-www-form-urlencoded") - .POST(HttpRequest.BodyPublishers.ofString( - urlEncode(Collections.singletonList( - new String[]{"query" /* query is already URL encoded */, - new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)})))); - } - case POST_QUERY -> { - request.uri(connection.endpoint()) - .header("Content-Type", "application/sparql-query") - .POST(HttpRequest.BodyPublishers.ofInputStream(new CustomStreamSupplier().getStreamSupplier())); - } - case POST_URL_ENC_UPDATE -> { - request.uri(connection.endpoint()) - .header("Content-Type", "application/x-www-form-urlencoded") - .POST(HttpRequest.BodyPublishers.ofString( - urlEncode(Collections.singletonList( - new String[]{"update" /* query is already URL encoded */, - new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)})))); - } - case POST_UPDATE -> { - request.uri(connection.endpoint()) - .header("Content-Type", "application/sparql-update") - .POST(HttpRequest.BodyPublishers.ofInputStream(new CustomStreamSupplier().getStreamSupplier())); - } - } - - return request.build(); - } - } - - public record Config( Integer number, QueryHandler queries, @@ -161,7 +69,7 @@ public Config(Integer number, record HttpExecutionResult( int queryID, - Optional> response, + Optional response, Instant requestStart, Duration duration, Optional outputStream, @@ -175,13 +83,14 @@ public boolean completed() { public boolean successful() { if (response.isPresent() && exception.isEmpty()) - return (response.get().statusCode() / 100) == 2; + return (response.get().getCode() / 100) == 2; return false; } } - private HttpClient httpClient; + private static CloseableHttpAsyncClient httpClient; + private static 
AsyncClientConnectionManager connectionManager; private final ThreadPoolExecutor executor; private final XXHashFactory hasherFactory = XXHashFactory.fastestJavaInstance(); @@ -193,7 +102,8 @@ public boolean successful() { private BigByteArrayOutputStream responseBodybbaos = new BigByteArrayOutputStream(); // used to read the http response body - private final byte[] buffer = new byte[4096]; + private final byte[] buffer = new byte[BUFFER_SIZE]; + private static final int BUFFER_SIZE = 4096; private final static Logger LOGGER = LoggerFactory.getLogger(SPARQLProtocolWorker.class); @@ -202,13 +112,67 @@ public Config config() { return (SPARQLProtocolWorker.Config) config; } - public SPARQLProtocolWorker(long workerId, ResponseBodyProcessor responseBodyProcessor, Config config) { super(workerId, responseBodyProcessor, config); this.responseBodyProcessor = responseBodyProcessor; this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); this.requestFactory = new RequestFactory(config().requestType()); - this.httpClient = buildHttpClient(); + } + + /** + * Initializes the http client with the given thread count. + * All workers will use the same http client instance. + * + * @param threadCount the number of threads to be used by the http client + */ + public static void initHttpClient(int threadCount) { + connectionManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setMaxConnTotal(threadCount) + .setMaxConnPerRoute(threadCount) + .build(); + final var ioReactorConfig = IOReactorConfig.custom() + .setTcpNoDelay(true) + .setIoThreadCount(threadCount) + .build(); + httpClient = HttpAsyncClients.custom() + .setConnectionManager(connectionManager) + .setIOReactorConfig(ioReactorConfig) + .setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy()) + .setDefaultRequestConfig(RequestConfig.custom() + .setContentCompressionEnabled(false) + .setHardCancellationEnabled(true) + .build()) + .build(); + httpClient.start(); + } + + /** + * Closes the http client and the connection manager. + */ + public static void closeHttpClient() { + try { + httpClient.close(); + connectionManager.close(); + } catch (IOException e) { + LOGGER.error("Failed to close http client.", e); + } + } + + /** + * Builds every request once, so that the requests can be loaded into the cache, if the queries themselves are + * cached. + * This is done to avoid the overhead of building (url-encoding) the requests during the benchmark. + */ + private void preloadRequests() { + final var selector = new LinearQuerySelector(config().queries().getQueryCount()); + for (int i = 0; i < config().queries().getQueryCount(); i++) { + try { + // build request and discard it + requestFactory.buildHttpRequest(config().queries().getNextQueryStream(selector), config().connection(), config().acceptHeader()); + } catch (IOException | URISyntaxException e) { + LOGGER.error("Failed to preload request.", e); + } + } } /** @@ -222,6 +186,7 @@ public SPARQLProtocolWorker(long workerId, ResponseBodyProcessor responseBodyPro * @return the CompletableFuture the contains the results of the worker. 
*/ public CompletableFuture start() { + preloadRequests(); return CompletableFuture.supplyAsync(() -> { ZonedDateTime startTime = ZonedDateTime.now(); List executionStats = new ArrayList<>(); @@ -233,20 +198,30 @@ public CompletableFuture start() { logExecution(execution); executionStats.add(execution); } - LOGGER.info("{}\t:: Completed {} out of {} querymixes", this, i + 1, queryMixes.number()); + LOGGER.info("{}\t:: Completed {} out of {} querymixes.", this, i + 1, queryMixes.number()); } } else if (config().completionTarget() instanceof TimeLimit timeLimit) { - final Instant endTime = Instant.now().plus(timeLimit.duration()); - Instant now; - while ((now = Instant.now()).isBefore(endTime)) { - final Duration timeToEnd = Duration.between(now, endTime); - final boolean reducedTimeout = config().timeout().compareTo(timeToEnd) > 0; - final Duration thisQueryTimeOut = (reducedTimeout) ? timeToEnd : config().timeout(); - ExecutionStats execution = executeQuery(thisQueryTimeOut, reducedTimeout); + final var startNanos = System.nanoTime(); + long queryExecutionCount = 0; + int queryMixExecutionCount = 0; + int queryMixSize = config().queries().getQueryCount(); + long now; + while ((now = System.nanoTime()) - startNanos < ((TimeLimit) config.completionTarget()).duration().toNanos()) { + final var timeLeft = ((TimeLimit) config.completionTarget()).duration().toNanos() - (now - startNanos); + final var reducedTimeout = timeLeft < config.timeout().toNanos(); + final Duration actualQueryTimeOut = reducedTimeout ? Duration.of(timeLeft, ChronoUnit.NANOS) : config.timeout(); + ExecutionStats execution = executeQuery(actualQueryTimeOut, reducedTimeout); if (execution != null){ // If timeout is reduced, the execution result might be discarded if it failed and executeQuery returns null. logExecution(execution); executionStats.add(execution); } + + // + if ((++queryExecutionCount) >= queryMixSize) { + queryExecutionCount = 0; + queryMixExecutionCount++; + LOGGER.info("{}\t:: Completed {} querymixes.", this, queryMixExecutionCount); + } } LOGGER.info("{}\t:: Reached time limit of {}.", this, timeLimit.duration()); } @@ -265,14 +240,17 @@ public CompletableFuture start() { * @return the execution statistic of the execution */ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure) { + // execute the request HttpExecutionResult result = executeHttpRequest(timeout); + + // process result Optional statuscode = Optional.empty(); if (result.response().isPresent()) - statuscode = Optional.of(result.response().get().statusCode()); + statuscode = Optional.of(result.response().get().getCode()); if (result.successful() && this.config.parseResults()) { // 2xx if (result.actualContentLength.isEmpty() || result.hash.isEmpty() || result.outputStream.isEmpty()) { - throw new RuntimeException("Response body is null, but execution was successful."); // This should never happen + throw new RuntimeException("Response body is null, but execution was successful."); // This should never happen, just here for fixing the warning. } // process result @@ -289,8 +267,6 @@ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure) this.responseBodybbaos = new BigByteArrayOutputStream(); } - // This is not explicitly checking for a timeout, instead it just checks if the execution was successful or not. - // TODO: This might cause problems if the query actually fails before the timeout and discardOnFailure is true. 
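In the TimeLimit branch above, the timeout of the final executions is capped to whatever remains of the time limit, and such executions are flagged so that a failing result can be discarded. A standalone sketch of that timeout calculation (class and method names are made up for illustration):

import java.time.Duration;

class ReducedTimeout {
    // near the end of the run the per-query timeout is capped to the time that is still left
    static Duration queryTimeout(long startNanos, long nowNanos, Duration timeLimit, Duration configuredTimeout) {
        long timeLeft = timeLimit.toNanos() - (nowNanos - startNanos);
        return timeLeft < configuredTimeout.toNanos() ? Duration.ofNanos(timeLeft) : configuredTimeout;
    }

    public static void main(String[] args) {
        long start = 0;
        long now = Duration.ofSeconds(58).toNanos(); // 2 s left of a 60 s time limit
        System.out.println(queryTimeout(start, now, Duration.ofSeconds(60), Duration.ofSeconds(5))); // PT2S
    }
}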
if (!result.successful() && discardOnFailure) { LOGGER.debug("{}\t:: Discarded execution, because the time limit has been reached: [queryID={}]", this, result.queryID); return null; @@ -307,128 +283,211 @@ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure) ); } - + /** + * Executes the next query given by the query selector from the query handler. + * It uses the http client to execute the request and returns the result of the execution. + * + * @param timeout the timeout for the execution + * @return the execution result of the execution + */ private HttpExecutionResult executeHttpRequest(Duration timeout) { - final QueryHandler.QueryStreamWrapper queryHandle; - try { - queryHandle = config().queries().getNextQueryStream(this.querySelector); - } catch (IOException e) { - return new HttpExecutionResult( - this.querySelector.getCurrentIndex(), - Optional.empty(), - Instant.now(), - Duration.ZERO, - Optional.empty(), - OptionalLong.empty(), - OptionalLong.empty(), - Optional.of(e) - ); - } + // get the next query and request + final AsyncRequestProducer request; + final int queryIndex; + if (config().queries().getConfig().caching()) { + queryIndex = querySelector.getNextIndex(); + request = requestFactory.getCachedRequest(queryIndex); + } else { + final QueryHandler.QueryStreamWrapper queryHandle; + try { + queryHandle = config().queries().getNextQueryStream(this.querySelector); + } catch (IOException e) { + return createFailedResultBeforeRequest(this.querySelector.getCurrentIndex(), e); + } - final HttpRequest request; + try { + request = requestFactory.buildHttpRequest( + queryHandle, + config().connection(), + config().acceptHeader() + ); + } catch (IOException | URISyntaxException e) { + return createFailedResultBeforeRequest(queryHandle.index(), e); + } - try { - request = requestFactory.buildHttpRequest( - queryHandle.queryInputStream(), - timeout, - config().connection(), - config().acceptHeader() - ); - } catch (IOException | URISyntaxException e) { - return new HttpExecutionResult( - queryHandle.index(), - Optional.empty(), - Instant.now(), - Duration.ZERO, - Optional.empty(), - OptionalLong.empty(), - OptionalLong.empty(), - Optional.of(e) - ); + // set queryIndex to the index of the queryHandle, so that the result can be associated with the query + queryIndex = queryHandle.index(); } - // check if the last execution task is stuck - if (this.httpClient.executor().isPresent() && ((ThreadPoolExecutor) this.httpClient.executor().get()).getActiveCount() != 0) { - // This might never cancel the task if the client that's connected to is broken. There also seems to be a - // bug where the httpClient never properly handles the interrupt from the shutdownNow method. - // See: https://bugs.openjdk.org/browse/JDK-8294047 - ((ThreadPoolExecutor) this.httpClient.executor().get()).shutdownNow(); - final var waitStart = Instant.now(); - try { - while (!((ThreadPoolExecutor) this.httpClient.executor().get()).awaitTermination(1, TimeUnit.SECONDS)) { - LOGGER.warn("{}\t:: [Thread-ID: {}]\t:: Waiting for the http client to shutdown. 
Elapsed time: {}", this, Thread.currentThread().getId(), Duration.between(waitStart, Instant.now())); + // execute the request + final Instant timeStamp = Instant.now(); + final var requestStart = System.nanoTime(); + final var future = httpClient.execute(request, new AbstractBinResponseConsumer() { + + private HttpResponse response; + private final StreamingXXHash64 hasher = hasherFactory.newStreamingHash64(0); + private long responseSize = 0; // will be used if parseResults is false + private long responseEnd = 0; // time in nanos + + @Override + public void releaseResources() {} // nothing to release + + @Override + protected int capacityIncrement() { + return Integer.MAX_VALUE - 8; // get as much data in as possible + } + + /** + * Triggered to pass incoming data packet to the data consumer. + * + * @param src the data packet. + * @param endOfStream flag indicating whether this data packet is the last in the data stream. + */ + @Override + protected void data(ByteBuffer src, boolean endOfStream) throws IOException { + if (endOfStream) { + responseEnd = System.nanoTime(); + return; + } + + if (config.parseResults()) { + // if the buffer uses an array, use the array directly + if (src.hasArray()) { + hasher.update(src.array(), src.position() + src.arrayOffset(), src.remaining()); + responseBodybbaos.write(src.array(), src.position() + src.arrayOffset(), src.remaining()); + } else { // otherwise, copy the buffer to an array + int readCount; + while (src.hasRemaining()) { + readCount = Math.min(BUFFER_SIZE, src.remaining()); + src.get(buffer, 0, readCount); + hasher.update(buffer, 0, readCount); + responseBodybbaos.write(buffer, 0, readCount); + } + } + } else { + responseSize += src.remaining(); + } + } + + /** + * Triggered to signal the beginning of response processing. + * + * @param response the response message head + * @param contentType the content type of the response body, + * or {@code null} if the response does not enclose a response entity. + */ + @Override + protected void start(HttpResponse response, ContentType contentType) { + this.response = response; + } + + /** + * Triggered to generate an object that represents a result of response message processing. + * + * @return the result of response message processing + */ + @Override + protected HttpExecutionResult buildResult() { + // if the responseEnd hasn't been set yet, set it to the current time + if (responseEnd == 0) + responseEnd = System.nanoTime(); + + // duration of the execution + final var duration = Duration.ofNanos(responseEnd - requestStart); + + // check for http error + if (response.getCode() / 100 != 2) { + return createFailedResultDuringResponse(queryIndex, response, timeStamp, duration, null); + } + + // check content length + final var contentLengthHeader = response.getFirstHeader("Content-Length"); + Long contentLength = contentLengthHeader != null ? Long.parseLong(contentLengthHeader.getValue()) : null; + if (contentLength != null) { + if ((!config.parseResults() && responseSize != contentLength) // if parseResults is false, the responseSize will be used + || (config.parseResults() && responseBodybbaos.size() != contentLength)) { // if parseResults is true, the size of the bbaos will be used + return createFailedResultDuringResponse(queryIndex, response, timeStamp, duration, new HttpException("Content-Length header value doesn't match actual content length.")); + } } - } catch (InterruptedException ignored) { - LOGGER.warn("{}\t:: Http client never shutdown. 
Continuing with the creation of a new http client.", this); + + // check timeout + if (duration.compareTo(timeout) > 0) { + return createFailedResultDuringResponse(queryIndex, response, timeStamp, duration, new TimeoutException()); + } + + // return successful result + return new HttpExecutionResult( + queryIndex, + Optional.of(response), + timeStamp, + Duration.ofNanos(responseEnd - requestStart), + Optional.of(responseBodybbaos), + OptionalLong.of(config.parseResults() ? responseBodybbaos.size() : responseSize), + OptionalLong.of(config.parseResults() ? hasher.getValue() : 0), + Optional.empty() + ); } - this.httpClient = buildHttpClient(); + }, null); // the callback is used to handle the end state of the request, but it's not needed here + + try { + // Wait for the request to finish, but don't wait longer than the timeout. + // The timeout from the configuration is used instead of the timeout from the parameter. + // The timeout from the parameter might be reduced if the end of the time limit is near + // and it might be so small that it causes issues. + return future.get(config.timeout().toNanos(), TimeUnit.NANOSECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + // This will close the connection and cancel the request if it's still running. + future.cancel(true); + return createFailedResultBeforeRequest(queryIndex, e); } + } - final Instant timeStamp = Instant.now(); - final var requestStart = System.nanoTime(); - BiFunction, Exception, HttpExecutionResult> createFailedResult = (response, e) -> new HttpExecutionResult( - queryHandle.index(), - Optional.ofNullable(response), - timeStamp, - Duration.ofNanos(System.nanoTime() - requestStart), + /** + * Creates a failed result for a query execution that failed before the request. 
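The request execution above waits on the returned future for at most the configured timeout and cancels the request if the wait is interrupted, fails, or times out. A standalone sketch of that bounded-wait-and-cancel pattern using a plain ExecutorService (not the HttpClient 5 API):

import java.util.concurrent.*;

class BoundedWait {
    // wait at most timeoutNanos for the result, cancel the task if the wait does not succeed
    static String awaitOrCancel(Future<String> future, long timeoutNanos) {
        try {
            return future.get(timeoutNanos, TimeUnit.NANOSECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            future.cancel(true); // stops the task if it is still running
            return "failed: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> slow = pool.submit(() -> { Thread.sleep(1_000); return "done"; });
        System.out.println(awaitOrCancel(slow, TimeUnit.MILLISECONDS.toNanos(100))); // failed: TimeoutException
        pool.shutdownNow();
    }
}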
+ * + * @param queryIndex the index of the query + * @param e the exception that caused the error + * @return the failed result + */ + private static HttpExecutionResult createFailedResultBeforeRequest(int queryIndex, Exception e) { + return new HttpExecutionResult( + queryIndex, + Optional.empty(), + Instant.now(), + Duration.ZERO, Optional.empty(), OptionalLong.empty(), OptionalLong.empty(), Optional.ofNullable(e) ); - - try { - return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream()) - .thenApply(httpResponse -> { - try (final var bodyStream = httpResponse.body()) { - if (httpResponse.statusCode() / 100 == 2) { // Request was successful - OptionalLong contentLength = httpResponse.headers().firstValueAsLong("Content-Length"); - try (var hasher = hasherFactory.newStreamingHash64(0)) { - int readBytes; - while ((readBytes = bodyStream.readNBytes(this.buffer, 0, this.buffer.length)) != 0) { - if (Duration.between(Instant.now(), timeStamp.plus(timeout)).isNegative()) { - return createFailedResult.apply(httpResponse, new TimeoutException()); - } - hasher.update(this.buffer, 0, readBytes); - this.responseBodybbaos.write(this.buffer, 0, readBytes); - } - - if (contentLength.isPresent() && - (this.responseBodybbaos.size() < contentLength.getAsLong() || - this.responseBodybbaos.size() > contentLength.getAsLong())) { - return createFailedResult.apply(httpResponse, new ProtocolException("Content-Length header value doesn't match actual content length.")); - } - - return new HttpExecutionResult( - queryHandle.index(), - Optional.of(httpResponse), - timeStamp, - Duration.ofNanos(System.nanoTime() - requestStart), - Optional.of(this.responseBodybbaos), - OptionalLong.of(this.responseBodybbaos.size()), - OptionalLong.of(hasher.getValue()), - Optional.empty() - ); - } - } else { - return createFailedResult.apply(httpResponse, null); - } - } catch (IOException ex) { - return createFailedResult.apply(httpResponse, ex); - } - }).get(timeout.toNanos(), TimeUnit.NANOSECONDS); - } catch (CompletionException | InterruptedException | ExecutionException | TimeoutException e) { - return createFailedResult.apply(null, e); - } } - private HttpClient buildHttpClient() { - return HttpClient.newBuilder() - .version(HttpClient.Version.HTTP_1_1) - .executor(Executors.newFixedThreadPool(1)) - .followRedirects(HttpClient.Redirect.ALWAYS) - .connectTimeout(config().timeout()) - .build(); + /** + * Creates a failed result for a query execution that failed during the response. 
+ * + * @param queryIndex the index of the query + * @param response the response of the query + * @param timestamp the start time of the query + * @param duration the duration of the query until error + * @param e the exception that caused the error, can be null + * @return the failed result + */ + private static HttpExecutionResult createFailedResultDuringResponse( + int queryIndex, + HttpResponse response, + Instant timestamp, + Duration duration, + Exception e) { + return new HttpExecutionResult( + queryIndex, + Optional.ofNullable(response), + timestamp, + duration, + Optional.empty(), + OptionalLong.empty(), + OptionalLong.empty(), + Optional.ofNullable(e) + ); } private void logExecution(ExecutionStats execution) { diff --git a/src/main/resources/log4j2.yml b/src/main/resources/log4j2.yml index f7d5b1ffc..0b5f391be 100644 --- a/src/main/resources/log4j2.yml +++ b/src/main/resources/log4j2.yml @@ -12,7 +12,7 @@ Configuration: name: STDOUT target: SYSTEM_OUT PatternLayout: - Pattern: "%highlight{%d [%t] %p [%c] - <%m>%n}{FATAL=red blink, ERROR=red, WARN=yellow bold, INFO=green, DEBUG=green bold, TRACE=blue}" + Pattern: "%highlight{%d [%t] \t %-5p [%c{1}] - <%m>%n}{FATAL=red blink, ERROR=red, WARN=yellow bold, INFO=green, DEBUG=green bold, TRACE=blue}" disableAnsi: false File: name: File @@ -32,7 +32,7 @@ Configuration: - ref: STDOUT - ref: File - name: org.reflections.Reflections - level: info + level: error additivity: true AppenderRef: - ref: STDOUT diff --git a/src/test/java/org/aksw/iguana/cc/query/handler/QueryHandlerTest.java b/src/test/java/org/aksw/iguana/cc/query/handler/QueryHandlerTest.java index 4235c1df2..152fe2c7b 100644 --- a/src/test/java/org/aksw/iguana/cc/query/handler/QueryHandlerTest.java +++ b/src/test/java/org/aksw/iguana/cc/query/handler/QueryHandlerTest.java @@ -88,7 +88,7 @@ public void testDeserialization(String json, Class sourceType) thro final var mapper = new ObjectMapper(); QueryHandler queryHandler = assertDoesNotThrow(() -> mapper.readValue(json, QueryHandler.class)); final var selector = queryHandler.getQuerySelectorInstance(); - assertTrue(selector instanceof LinearQuerySelector); + assertInstanceOf(LinearQuerySelector.class, selector); assertEquals(queries.size(), queryHandler.getQueryCount()); assertNotEquals(0, queryHandler.hashCode()); for (int i = 0; i < queryHandler.getQueryCount(); i++) { @@ -114,7 +114,7 @@ public void testQueryStreamWrapper(String json, Class sourceType) t for (int i = 0; i < queryHandler.getQueryCount(); i++) { final var wrapper = queryHandler.getNextQueryStream(selector); assertEquals(i, selector.getCurrentIndex()); - final var acutalQuery = new String(wrapper.queryInputStream().readAllBytes(), StandardCharsets.UTF_8); + final var acutalQuery = new String(wrapper.queryInputStreamSupplier().get().readAllBytes(), StandardCharsets.UTF_8); if (FolderQuerySource.class.isAssignableFrom(sourceType)) assertEquals(folderQueries.get(i).content(), acutalQuery); else @@ -129,7 +129,7 @@ public void testQueryStringWrapper(String json, Class sourceType) t final var mapper = new ObjectMapper(); QueryHandler queryHandler = assertDoesNotThrow(() -> mapper.readValue(json, QueryHandler.class)); final var selector = queryHandler.getQuerySelectorInstance(); - assertTrue(selector instanceof LinearQuerySelector); + assertInstanceOf(LinearQuerySelector.class, selector); assertEquals(queries.size(), queryHandler.getQueryCount()); assertNotEquals(0, queryHandler.hashCode()); for (int i = 0; i < queryHandler.getQueryCount(); i++) { @@ -149,7 +149,7 
@@ public void testQueryIDs(String json, Class sourceType) {
         final var mapper = new ObjectMapper();
         QueryHandler queryHandler = assertDoesNotThrow(() -> mapper.readValue(json, QueryHandler.class));
         final var selector = queryHandler.getQuerySelectorInstance();
-        assertTrue(selector instanceof LinearQuerySelector);
+        assertInstanceOf(LinearQuerySelector.class, selector);
         assertEquals(queries.size(), queryHandler.getQueryCount());
         assertNotEquals(0, queryHandler.hashCode());
         final var allQueryIDs = queryHandler.getAllQueryIds();
@@ -174,7 +174,7 @@ public void testRandomQuerySelectorSeedConsistency() throws IOException {
         for (int i = 0; i < 2; i++) {
             QueryHandler queryHandler = mapper.readValue(json[i], QueryHandler.class);
             final var selector = queryHandler.getQuerySelectorInstance();
-            assertTrue(selector instanceof RandomQuerySelector);
+            assertInstanceOf(RandomQuerySelector.class, selector);
             indices[i] = new ArrayList<>();
             for (int j = 0; j < 100000; j++) {
                 indices[i].add(selector.getNextIndex());
diff --git a/src/test/java/org/aksw/iguana/cc/utils/FileUtilsTest.java b/src/test/java/org/aksw/iguana/cc/utils/FileUtilsTest.java
index f213d73e0..b18d52704 100644
--- a/src/test/java/org/aksw/iguana/cc/utils/FileUtilsTest.java
+++ b/src/test/java/org/aksw/iguana/cc/utils/FileUtilsTest.java
@@ -1,5 +1,6 @@
 package org.aksw.iguana.cc.utils;

+import org.aksw.iguana.cc.utils.files.FileUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
diff --git a/src/test/java/org/aksw/iguana/cc/utils/IndexedQueryReaderTest.java b/src/test/java/org/aksw/iguana/cc/utils/IndexedQueryReaderTest.java
index b279c8153..2bafe0ddf 100644
--- a/src/test/java/org/aksw/iguana/cc/utils/IndexedQueryReaderTest.java
+++ b/src/test/java/org/aksw/iguana/cc/utils/IndexedQueryReaderTest.java
@@ -1,5 +1,6 @@
 package org.aksw.iguana.cc.utils;

+import org.aksw.iguana.cc.utils.files.IndexedQueryReader;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.params.ParameterizedTest;
diff --git a/src/test/java/org/aksw/iguana/cc/worker/impl/RequestFactoryTest.java b/src/test/java/org/aksw/iguana/cc/worker/impl/RequestFactoryTest.java
deleted file mode 100644
index ef1c09d9f..000000000
--- a/src/test/java/org/aksw/iguana/cc/worker/impl/RequestFactoryTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package org.aksw.iguana.cc.worker.impl;
-
-import org.aksw.iguana.cc.mockup.MockupConnection;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.net.URLEncoder;
-import java.net.http.HttpResponse;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.List;
-import java.util.concurrent.Flow;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class RequestFactoryTest {
-    static final class StringSubscriber implements Flow.Subscriber<ByteBuffer> {
-        final HttpResponse.BodySubscriber<String> wrapped;
-        StringSubscriber(HttpResponse.BodySubscriber<String> wrapped) {
-            this.wrapped = wrapped;
-        }
-        @Override
-        public void onSubscribe(Flow.Subscription subscription) {
-            wrapped.onSubscribe(subscription);
-        }
-        @Override
-        public void onNext(ByteBuffer item) { wrapped.onNext(List.of(item)); }
-        @Override
-        public void onError(Throwable throwable) { wrapped.onError(throwable); }
-        @Override
-        public void onComplete() { wrapped.onComplete(); }
-    }
-
-
-    @ParameterizedTest
-    @EnumSource(SPARQLProtocolWorker.RequestFactory.RequestType.class)
-    public void test(SPARQLProtocolWorker.RequestFactory.RequestType type) throws URISyntaxException, IOException {
-        final var content = "SELECT * WHERE { ?s ?p ?o }";
-        final var connection = MockupConnection.createConnectionConfig("test-conn", "", "http://localhost:8080/sparql");
-        final var duration = Duration.of(2, ChronoUnit.SECONDS);
-        final var stream = new ByteArrayInputStream(content.getBytes());
-        final var requestHeader = "application/sparql-results+json";
-
-        final var requestFactory = new SPARQLProtocolWorker.RequestFactory(type);
-        final var request = requestFactory.buildHttpRequest(
-                stream,
-                duration,
-                connection,
-                requestHeader
-        );
-
-        switch (type) {
-            case GET_QUERY -> assertEquals(connection.endpoint() + "?query=" + URLEncoder.encode(content, StandardCharsets.UTF_8), request.uri().toString());
-            case POST_QUERY -> {
-                assertEquals("application/sparql-query", request.headers().firstValue("Content-Type").get());
-                assertEquals("http://localhost:8080/sparql", request.uri().toString());
-                assertTrue(request.bodyPublisher().isPresent());
-                String body = request.bodyPublisher().map(p -> {
-                    var bodySubscriber = HttpResponse.BodySubscribers.ofString(StandardCharsets.UTF_8);
-                    var flowSubscriber = new StringSubscriber(bodySubscriber);
-                    p.subscribe(flowSubscriber);
-                    return bodySubscriber.getBody().toCompletableFuture().join();
-                }).get();
-                assertEquals(content, body);
-            }
-            case POST_UPDATE -> {
-                assertEquals("application/sparql-update", request.headers().firstValue("Content-Type").get());
-                assertEquals("http://localhost:8080/sparql", request.uri().toString());
-                assertTrue(request.bodyPublisher().isPresent());
-                String body = request.bodyPublisher().map(p -> {
-                    var bodySubscriber = HttpResponse.BodySubscribers.ofString(StandardCharsets.UTF_8);
-                    var flowSubscriber = new StringSubscriber(bodySubscriber);
-                    p.subscribe(flowSubscriber);
-                    return bodySubscriber.getBody().toCompletableFuture().join();
-                }).get();
-                assertEquals(content, body);
-            }
-            case POST_URL_ENC_QUERY -> {
-                assertEquals("application/x-www-form-urlencoded", request.headers().firstValue("Content-Type").get());
-                assertEquals("http://localhost:8080/sparql", request.uri().toString());
-                assertTrue(request.bodyPublisher().isPresent());
-                String body = request.bodyPublisher().map(p -> {
-                    var bodySubscriber = HttpResponse.BodySubscribers.ofString(StandardCharsets.UTF_8);
-                    var flowSubscriber = new StringSubscriber(bodySubscriber);
-                    p.subscribe(flowSubscriber);
-                    return bodySubscriber.getBody().toCompletableFuture().join();
-                }).get();
-                assertEquals("query=" + URLEncoder.encode(content, StandardCharsets.UTF_8), body);
-            }
-            case POST_URL_ENC_UPDATE -> {
-                assertEquals("application/x-www-form-urlencoded", request.headers().firstValue("Content-Type").get());
-                assertEquals("http://localhost:8080/sparql", request.uri().toString());
-                assertTrue(request.bodyPublisher().isPresent());
-                String body = request.bodyPublisher().map(p -> {
-                    var bodySubscriber = HttpResponse.BodySubscribers.ofString(StandardCharsets.UTF_8);
-                    var flowSubscriber = new StringSubscriber(bodySubscriber);
-                    p.subscribe(flowSubscriber);
-                    return bodySubscriber.getBody().toCompletableFuture().join();
-                }).get();
-                assertEquals("update=" + URLEncoder.encode(content, StandardCharsets.UTF_8), body);
-            }
-        }
-    }
-}
diff --git a/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java b/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java
index 6b8e29e92..4df92a929 100644
--- a/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java
+++ b/src/test/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorkerTest.java
@@ -1,6 +1,7 @@
 package org.aksw.iguana.cc.worker.impl;

-import com.github.tomakehurst.wiremock.common.Slf4jNotifier;
+import com.github.tomakehurst.wiremock.client.MappingBuilder;
+import com.github.tomakehurst.wiremock.common.ConsoleNotifier;
 import com.github.tomakehurst.wiremock.core.Options;
 import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
 import com.github.tomakehurst.wiremock.http.Fault;
@@ -8,6 +9,7 @@
 import org.aksw.iguana.cc.config.elements.ConnectionConfig;
 import org.aksw.iguana.cc.config.elements.DatasetConfig;
 import org.aksw.iguana.cc.query.handler.QueryHandler;
+import org.aksw.iguana.cc.utils.http.RequestFactory;
 import org.aksw.iguana.cc.worker.HttpWorker;
 import org.aksw.iguana.cc.worker.ResponseBodyProcessor;
 import org.junit.jupiter.api.*;
@@ -16,6 +18,8 @@
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.net.URI;
@@ -29,6 +33,8 @@
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Stream;

 import static com.github.tomakehurst.wiremock.client.WireMock.*;
@@ -38,7 +44,7 @@ public class SPARQLProtocolWorkerTest {

     @RegisterExtension
     public static WireMockExtension wm = WireMockExtension.newInstance()
-            .options(new WireMockConfiguration().useChunkedTransferEncoding(Options.ChunkedEncodingPolicy.NEVER).dynamicPort().notifier(new Slf4jNotifier(true)))
+            .options(new WireMockConfiguration().useChunkedTransferEncoding(Options.ChunkedEncodingPolicy.NEVER).dynamicPort().notifier(new ConsoleNotifier(false)))
             .failOnUnmatchedRequests(true)
             .build();

@@ -46,10 +52,13 @@ public class SPARQLProtocolWorkerTest {
     private final static int QUERY_MIXES = 1;
     private static Path queryFile;

+    private static final Logger LOGGER = LoggerFactory.getLogger(SPARQLProtocolWorker.class);
+
     @BeforeAll
     public static void setup() throws IOException {
         queryFile = Files.createTempFile("iguana-test-queries", ".tmp");
         Files.writeString(queryFile, QUERY, StandardCharsets.UTF_8);
+        SPARQLProtocolWorker.initHttpClient(1);
     }

     @BeforeEach
@@ -60,30 +69,39 @@ public void reset() {
     @AfterAll
     public static void cleanup() throws IOException {
         Files.deleteIfExists(queryFile);
+        SPARQLProtocolWorker.closeHttpClient();
     }

-    public static Stream<Named<SPARQLProtocolWorker>> requestFactoryData() throws IOException, URISyntaxException {
+    public static Stream<Arguments> requestFactoryData() throws URISyntaxException {
         final var uri = new URI("http://localhost:" + wm.getPort() + "/ds/query");

         final var processor = new ResponseBodyProcessor("application/sparql-results+json");
         final var format = QueryHandler.Config.Format.SEPARATOR;
-        final var queryHandlder = new QueryHandler(new QueryHandler.Config(queryFile.toAbsolutePath().toString(), format, null, true, QueryHandler.Config.Order.LINEAR, 0L, QueryHandler.Config.Language.SPARQL));
+        Function<Boolean, QueryHandler> queryHandlderSupplier = (cached) -> {
+            try {
+                return new QueryHandler(new QueryHandler.Config(queryFile.toAbsolutePath().toString(), format, null, cached, QueryHandler.Config.Order.LINEAR, 0L, QueryHandler.Config.Language.SPARQL));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
         final var datasetConfig = new DatasetConfig("TestDS", null);
         final var connection = new ConnectionConfig("TestConn", "1", datasetConfig, uri, new ConnectionConfig.Authentication("testUser", "password"), null, null);
-        final var workers = new ArrayDeque<Named<SPARQLProtocolWorker>>();
+        final var workers = new ArrayDeque<Arguments>();
         int i = 0;
-        for (var requestType : SPARQLProtocolWorker.RequestFactory.RequestType.values()) {
-            final var config = new SPARQLProtocolWorker.Config(
-                    1,
-                    queryHandlder,
-                    new HttpWorker.QueryMixes(QUERY_MIXES),
-                    connection,
-                    Duration.parse("PT100S"),
-                    "application/sparql-results+json",
-                    requestType,
-                    false
-            );
-            workers.add(Named.of(config.requestType().name(), new SPARQLProtocolWorker(i++, processor, config)));
+        for (var requestType : RequestFactory.RequestType.values()) {
+            for (var cached : List.of(true, false)) {
+                final var config = new SPARQLProtocolWorker.Config(
+                        1,
+                        queryHandlderSupplier.apply(cached),
+                        new HttpWorker.QueryMixes(QUERY_MIXES),
+                        connection,
+                        Duration.parse("PT100S"),
+                        "application/sparql-results+json",
+                        requestType,
+                        true
+                );
+                workers.add(Arguments.of(Named.of(requestType.name(), new SPARQLProtocolWorker(i++, processor, config)), Named.of(String.valueOf(cached), cached)));
+            }
         }
         return workers.stream();
     }
@@ -104,42 +122,62 @@ public static List completionTargets() {
         return out;
     }

-    @ParameterizedTest(name = "[{index}] requestType = {0}")
+    @ParameterizedTest(name = "[{index}] requestType = {0}, cached = {1}")
     @MethodSource("requestFactoryData")
     @DisplayName("Test Request Factory")
-    public void testRequestFactory(SPARQLProtocolWorker worker) {
+    public void testRequestFactory(SPARQLProtocolWorker worker, boolean cached) {
+        BiFunction<MappingBuilder, Integer, MappingBuilder> encoding = (builder, size) -> {
+            if (!cached) {
+                return builder.withHeader("Transfer-Encoding", equalTo("chunked"));
+            } else {
+                return builder.withHeader("Content-Length", equalTo(String.valueOf(size)));
+            }
+        };
+
+        MappingBuilder temp;

         switch (worker.config().requestType()) {
-            case GET_QUERY -> wm.stubFor(get(urlPathEqualTo("/ds/query"))
+            case GET_QUERY ->
+                wm.stubFor(get(urlPathEqualTo("/ds/query"))
                         .withQueryParam("query", equalTo(QUERY))
                         .withBasicAuth("testUser", "password")
                         .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body")));
             case POST_QUERY -> {
-                wm.stubFor(post(urlPathEqualTo("/ds/query"))
+                temp = post(urlPathEqualTo("/ds/query"))
                         .withHeader("Content-Type", equalTo("application/sparql-query"))
-                        .withHeader("Transfer-Encoding", equalTo("chunked"))
                         .withBasicAuth("testUser", "password")
                         .withRequestBody(equalTo(QUERY))
-                        .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body")));
+                        .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body"));
+                encoding.apply(temp, QUERY.length());
+                wm.stubFor(temp);
             }
             case POST_UPDATE -> {
-                wm.stubFor(post(urlPathEqualTo("/ds/query"))
+                temp = post(urlPathEqualTo("/ds/query"))
                         .withHeader("Content-Type", equalTo("application/sparql-update"))
-                        .withHeader("Transfer-Encoding", equalTo("chunked"))
                         .withBasicAuth("testUser", "password")
                         .withRequestBody(equalTo(QUERY))
-                        .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body")));
+                        .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body"));
+                encoding.apply(temp, QUERY.length());
+                wm.stubFor(temp);
             }
-            case POST_URL_ENC_QUERY -> wm.stubFor(post(urlPathEqualTo("/ds/query"))
-                    .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
-                    .withBasicAuth("testUser", "password")
-                    .withRequestBody(equalTo("query=" + URLEncoder.encode(QUERY, StandardCharsets.UTF_8)))
-                    .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body")));
-            case POST_URL_ENC_UPDATE -> wm.stubFor(post(urlPathEqualTo("/ds/query"))
-                    .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
-                    .withBasicAuth("testUser", "password")
-                    .withRequestBody(equalTo("update=" + URLEncoder.encode(QUERY, StandardCharsets.UTF_8)))
-                    .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body")));
+            case POST_URL_ENC_QUERY -> {
+                temp = post(urlPathEqualTo("/ds/query"))
+                        .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
+                        .withBasicAuth("testUser", "password")
+                        .withRequestBody(equalTo("query=" + URLEncoder.encode(QUERY, StandardCharsets.UTF_8)))
+                        .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body"));
+                encoding.apply(temp, 43);
+                wm.stubFor(temp);
+            }
+            case POST_URL_ENC_UPDATE -> {
+                temp = post(urlPathEqualTo("/ds/query"))
+                        .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
+                        .withBasicAuth("testUser", "password")
+                        .withRequestBody(equalTo("update=" + URLEncoder.encode(QUERY, StandardCharsets.UTF_8)))
+                        .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body"));
+                encoding.apply(temp, 44);
+                wm.stubFor(temp);
+            }
         }

         final HttpWorker.Result result = worker.start().join();
@@ -156,7 +194,7 @@ public void testRequestFactory(SPARQLProtocolWorker worker) {
     @ParameterizedTest(name = "[{index}] fault = {0}")
     @EnumSource(Fault.class)
     public void testMalformedResponseProcessing(Fault fault) throws IOException, URISyntaxException {
-        SPARQLProtocolWorker worker = (SPARQLProtocolWorker) requestFactoryData().toList().get(0).getPayload();
+        SPARQLProtocolWorker worker = (SPARQLProtocolWorker) ((Named)requestFactoryData().toList().get(0).get()[0]).getPayload();
         wm.stubFor(get(urlPathEqualTo("/ds/query"))
                 .willReturn(aResponse().withFault(fault)));
         final HttpWorker.Result result = worker.start().join();
@@ -166,7 +204,7 @@

     @Test
     public void testBadHttpCodeResponse() throws IOException, URISyntaxException {
-        SPARQLProtocolWorker worker = (SPARQLProtocolWorker) requestFactoryData().toList().get(0).getPayload();
+        SPARQLProtocolWorker worker = (SPARQLProtocolWorker) ((Named)requestFactoryData().toList().get(0).get()[0]).getPayload();
         wm.stubFor(get(urlPathEqualTo("/ds/query"))
                 .willReturn(aResponse().withStatus(404)));
         final HttpWorker.Result result = worker.start().join();
@@ -179,31 +217,32 @@ public void testCompletionTargets(HttpWorker.CompletionTarget target) throws URISyntaxException, IOException {
         final var uri = new URI("http://localhost:" + wm.getPort() + "/ds/query");

         final var processor = new ResponseBodyProcessor("application/sparql-results+json");
-        final var queryHandlder = new QueryHandler(new QueryHandler.Config(queryFile.toAbsolutePath().toString(), QueryHandler.Config.Format.SEPARATOR, null, true, QueryHandler.Config.Order.LINEAR, 0L, QueryHandler.Config.Language.SPARQL));
+        final var queryHandler = new QueryHandler(new QueryHandler.Config(queryFile.toAbsolutePath().toString(), QueryHandler.Config.Format.SEPARATOR, null, true, QueryHandler.Config.Order.LINEAR, 0L, QueryHandler.Config.Language.SPARQL));
         final var datasetConfig = new DatasetConfig("TestDS", null);
         final var connection = new ConnectionConfig("TestConn", "1", datasetConfig, uri, new ConnectionConfig.Authentication("testUser", "password"), null, null);
         final var config = new SPARQLProtocolWorker.Config(
                 1,
-                queryHandlder,
+                queryHandler,
                 target,
                 connection,
                 Duration.parse("PT20S"),
                 "application/sparql-results+json",
-                SPARQLProtocolWorker.RequestFactory.RequestType.POST_URL_ENC_QUERY,
+                RequestFactory.RequestType.POST_URL_ENC_QUERY,
                 false
         );

         SPARQLProtocolWorker worker = new SPARQLProtocolWorker(0, processor, config);

         wm.stubFor(post(urlPathEqualTo("/ds/query"))
                 .withHeader("Content-Type", equalTo("application/x-www-form-urlencoded"))
-                .withBasicAuth("testUser", "password")
+                // .withBasicAuth("testUser", "password")
                 .withRequestBody(equalTo("query=" + URLEncoder.encode(QUERY, StandardCharsets.UTF_8)))
                 .willReturn(aResponse().withStatus(200).withBody("Non-Empty-Body")));

         final HttpWorker.Result result = worker.start().join();

         for (var stat : result.executionStats()) {
+            stat.error().ifPresent(ex -> LOGGER.error(ex.getMessage(), ex));
             assertTrue(stat.successful());
             assertTrue(stat.error().isEmpty());
             assertEquals(200, stat.httpStatusCode().orElseThrow());
@@ -239,7 +278,7 @@ public void testTimeLimitExecutionCutoff() throws URISyntaxException, IOExceptio
                 connection,
                 Duration.parse("PT20S"),
                 "application/sparql-results+json",
-                SPARQLProtocolWorker.RequestFactory.RequestType.POST_URL_ENC_QUERY,
+                RequestFactory.RequestType.POST_URL_ENC_QUERY,
                 false
         );
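For readers unfamiliar with the JUnit 5 pattern used in the reworked requestFactoryData() above — each value wrapped in Named.of(label, payload) inside Arguments.of(...) so that the {0} and {1} placeholders in the @ParameterizedTest name render as readable labels while the test method receives the raw payloads — here is a minimal, self-contained sketch. It is illustrative only and not part of the patch; class, method, and sample values are made up.

import org.junit.jupiter.api.Named;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertNotNull;

class NamedArgumentsExampleTest {

    // One Arguments entry per (requestType, cached) combination; each value is wrapped
    // in Named.of(label, payload) so the label shows up in the test's display name.
    static Stream<Arguments> workerData() {
        return Stream.of("GET_QUERY", "POST_QUERY")
                .flatMap(type -> List.of(true, false).stream()
                        .map(cached -> Arguments.of(
                                Named.of(type, type),
                                Named.of(String.valueOf(cached), cached))));
    }

    @ParameterizedTest(name = "[{index}] requestType = {0}, cached = {1}")
    @MethodSource("workerData")
    void testWorker(String requestType, boolean cached) {
        // JUnit unwraps each Named argument and injects only its payload here.
        assertNotNull(requestType);
    }
}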