Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apache HTTP Client 5 implementation #243

Merged
merged 25 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
734b261
Add more logging messages
nck-mlcnv Dec 5, 2023
8a62c90
Fix log4j2 configuration
nck-mlcnv Dec 5, 2023
07b3052
Implement apache HTTP client
nck-mlcnv Jan 12, 2024
78408b9
Implement apache HTTP async client 5
nck-mlcnv Jan 12, 2024
476f528
Fix timeout
nck-mlcnv Jan 12, 2024
9587d60
Fixes
nck-mlcnv Feb 6, 2024
1831b60
Fix hashing bug
nck-mlcnv Feb 7, 2024
a9ca2a5
Fix conversion of byte stream to string
nck-mlcnv Feb 8, 2024
48be6c1
Implement POST request streaming
nck-mlcnv Feb 8, 2024
744b09e
Disable the storing and hashing of responses when the parseResults pa…
nck-mlcnv Feb 13, 2024
d454f6b
Move utility classes
nck-mlcnv Feb 14, 2024
d517e9c
StreamEntityProducer can send fixed-sized data and is reproducible now
nck-mlcnv Feb 16, 2024
c238a40
Make QueryHandler return stream supplier and info about query being c…
nck-mlcnv Feb 16, 2024
d9190cb
Change RequestFactory behavior
nck-mlcnv Feb 16, 2024
c25b560
Cleanup
nck-mlcnv Feb 16, 2024
94bcb3c
Preload requests
nck-mlcnv Feb 16, 2024
52ce0bb
Fix IDE warnings
nck-mlcnv Feb 16, 2024
e2f8199
Fix tests
nck-mlcnv Feb 16, 2024
fc523f0
Remove unneeded test class
nck-mlcnv Feb 16, 2024
bac8488
Add Javadocs
nck-mlcnv Feb 16, 2024
7d06c7d
Change requests
nck-mlcnv Feb 22, 2024
2701b27
Move the RequestFactory to a separate class and add comments
nck-mlcnv Feb 22, 2024
03a04b0
Add comments from overridden methods
nck-mlcnv Feb 22, 2024
71f2a0e
Merge branch 'develop' into feature/apache-http-async-client-5
nck-mlcnv Feb 22, 2024
c3fbeb7
Lower maximum capacity while reading response
nck-mlcnv Feb 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 6 additions & 16 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
Expand All @@ -123,12 +113,6 @@
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
Expand Down Expand Up @@ -176,6 +160,12 @@
<artifactId>spring-context</artifactId>
<version>6.0.11</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.3</version>
</dependency>

</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.beust.jcommander.*;
import org.aksw.iguana.cc.suite.IguanaSuiteParser;
import org.aksw.iguana.cc.suite.Suite;
import org.apache.logging.log4j.core.config.Configurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;


Expand Down Expand Up @@ -44,6 +46,10 @@ public Path convert(String value) {
* @param argc The command line arguments that are passed to the program.
*/
public static void main(String[] argc) {
// Apparently, there is something weird going on, where the apache jena library already configures log4j2 for
// some reason. That's why you have to call reconfigure here.
Configurator.reconfigure(URI.create("log4j2.yml"));

var args = new Args();
JCommander jc = JCommander.newBuilder()
.addObject(args)
Expand Down
21 changes: 18 additions & 3 deletions src/main/java/org/aksw/iguana/cc/query/handler/QueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import com.fasterxml.jackson.annotation.*;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.aksw.iguana.cc.query.selector.QuerySelector;
Expand All @@ -23,6 +22,7 @@
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Objects;
import java.util.function.Supplier;

/**
* The QueryHandler is used by every worker that extends the AbstractWorker.
Expand Down Expand Up @@ -124,7 +124,7 @@ public String value() {
}

public record QueryStringWrapper(int index, String query) {}
public record QueryStreamWrapper(int index, InputStream queryInputStream) {}
public record QueryStreamWrapper(int index, boolean cached, Supplier<InputStream> queryInputStreamSupplier) {}


protected final Logger LOGGER = LoggerFactory.getLogger(QueryHandler.class);
Expand Down Expand Up @@ -180,7 +180,13 @@ public QueryStringWrapper getNextQuery(QuerySelector querySelector) throws IOExc

public QueryStreamWrapper getNextQueryStream(QuerySelector querySelector) throws IOException {
final var queryIndex = querySelector.getNextIndex();
return new QueryStreamWrapper(queryIndex, this.queryList.getQueryStream(queryIndex));
return new QueryStreamWrapper(queryIndex, config.caching(), () -> {
try {
return this.queryList.getQueryStream(queryIndex);
} catch (IOException e) {
throw new RuntimeException(e);
}
nck-mlcnv marked this conversation as resolved.
Show resolved Hide resolved
});
}

@Override
Expand Down Expand Up @@ -209,4 +215,13 @@ public String[] getAllQueryIds() {
}
return out;
}

/**
* Returns the configuration of the QueryHandler.
*
* @return the configuration of the QueryHandler
*/
public Config getConfig() {
return config;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.aksw.iguana.cc.query.source;

import org.aksw.iguana.cc.utils.FileUtils;
import org.aksw.iguana.cc.utils.files.FileUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.aksw.iguana.cc.query.source.impl;

import org.aksw.iguana.cc.utils.FileUtils;
import org.aksw.iguana.cc.utils.files.FileUtils;

import java.io.IOException;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.aksw.iguana.cc.query.source.impl;

import org.aksw.iguana.cc.query.source.QuerySource;
import org.aksw.iguana.cc.utils.IndexedQueryReader;
import org.aksw.iguana.cc.utils.files.IndexedQueryReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.aksw.iguana.cc.query.source.impl;

import org.aksw.iguana.cc.query.source.QuerySource;
import org.aksw.iguana.cc.utils.FileUtils;
import org.aksw.iguana.cc.utils.files.FileUtils;
import org.apache.commons.io.input.AutoCloseInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/aksw/iguana/cc/suite/Suite.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ else if (storageConfig instanceof RDFFileStorage.Config) {

public void run() {
for (int i = 0; i < tasks.size(); i++) {
LOGGER.info("Task/{} {} starting.", tasks.get(i).getTaskName(), i);
tasks.get(i).run();
LOGGER.info("Task/{} {} finished.", tasks.get(i).getTaskName(), i);
}
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,15 @@ public Stresstest(String suiteID, long stresstestID, Config config, ResponseBody
}

public void run() {
var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed
if (!warmupWorkers.isEmpty()) {
SPARQLProtocolWorker.initHttpClient(warmupWorkers.size());
var warmupResults = executeWorkers(warmupWorkers); // warmup results will be dismissed
SPARQLProtocolWorker.closeHttpClient();
}

SPARQLProtocolWorker.initHttpClient(workers.size());
var results = executeWorkers(workers);
SPARQLProtocolWorker.closeHttpClient();

srp.process(results.workerResults);
srp.calculateAndSaveMetrics(results.startTime, results.endTime);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.aksw.iguana.cc.utils;
package org.aksw.iguana.cc.utils.files;

import java.io.*;
import java.nio.charset.StandardCharsets;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.aksw.iguana.cc.utils;
package org.aksw.iguana.cc.utils.files;

import org.apache.commons.io.input.AutoCloseInputStream;
import org.apache.commons.io.input.BoundedInputStream;
Expand Down
144 changes: 144 additions & 0 deletions src/main/java/org/aksw/iguana/cc/utils/http/RequestFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package org.aksw.iguana.cc.utils.http;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import org.aksw.iguana.cc.config.elements.ConnectionConfig;
import org.aksw.iguana.cc.query.handler.QueryHandler;
import org.aksw.iguana.cc.worker.HttpWorker;
import org.aksw.iguana.cc.worker.impl.SPARQLProtocolWorker;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
import org.apache.hc.core5.net.URIBuilder;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* A factory for creating HTTP requests.
* The factory can create requests for different types of HTTP methods and different types of SPARQL queries.
* The factory can also cache requests to avoid creating the same request multiple times.
*/
public final class RequestFactory {
public enum RequestType {
GET_QUERY("get query"),
POST_URL_ENC_QUERY("post url-enc query"),
POST_QUERY("post query"),
POST_URL_ENC_UPDATE("post url-enc update"),
POST_UPDATE("post update");

private final String value;

@JsonCreator
RequestType(String value) {
this.value = Objects.requireNonNullElse(value, "get query");
}

@JsonValue
public String value() {
return value;
}
}

private final RequestType requestType;
private final Map<Integer, AsyncRequestProducer> cache = new HashMap<>();

public RequestFactory(RequestType requestType) {
this.requestType = requestType;
}

private static String urlEncode(List<String[]> parameters) {
return parameters.stream()
.map(e -> e[0] + "=" + URLEncoder.encode(e[1], StandardCharsets.UTF_8))
.collect(Collectors.joining("&"));
}

private static String urlEncode(String name, String value) {
return name + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
}

/**
* Builds an HTTP request for a given query.
* If the query has been cached by the query handler, its content will be fully read by the entity producer into a
* byte buffer, which will then be reused on consecutive request executions.
* Cached requests will be sent non-chunked.
* If the query has not been cached by the query handler, the entity producer will use the query stream supplier to
* send the query in chunks.
*
* @param queryHandle the query handle containing the query and its index
* @param connection the connection to send the request to
* @param requestHeader the request header
* @return the request as an AsyncRequestProducer
* @throws URISyntaxException if the URI is invalid
* @throws IOException if the query stream cannot be read
*/
public AsyncRequestProducer buildHttpRequest(QueryHandler.QueryStreamWrapper queryHandle,
ConnectionConfig connection,
String requestHeader) throws URISyntaxException, IOException {
if (queryHandle.cached() && cache.containsKey(queryHandle.index()))
return cache.get(queryHandle.index());

AsyncRequestBuilder asyncRequestBuilder;
Supplier<InputStream> queryStreamSupplier;
InputStream queryStream;

try {
queryStreamSupplier = queryHandle.queryInputStreamSupplier();
queryStream = queryStreamSupplier.get();
} catch (RuntimeException e) {
throw new IOException(e);
}

switch (this.requestType) {
case GET_QUERY -> asyncRequestBuilder = AsyncRequestBuilder.get(new URIBuilder(connection.endpoint())
.addParameter("query", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8))
.build()
);
case POST_URL_ENC_QUERY -> asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
// manually set content type, because otherwise the
// entity producer would set it to "application/x-www-form-urlencoded; charset=ISO-8859-1"
.setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")
.setEntity(new BasicAsyncEntityProducer(urlEncode("query", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)), null, !queryHandle.cached()));
case POST_QUERY -> asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setEntity(new StreamEntityProducer(queryStreamSupplier, !queryHandle.cached(), "application/sparql-query"));
case POST_URL_ENC_UPDATE -> asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")
.setEntity(new BasicAsyncEntityProducer(urlEncode("update", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)), null, !queryHandle.cached()));
case POST_UPDATE -> asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setEntity(new StreamEntityProducer(queryStreamSupplier, !queryHandle.cached(), "application/sparql-update"));
default -> throw new IllegalStateException("Unexpected value: " + this.requestType);
}

if (requestHeader != null)
asyncRequestBuilder.addHeader("Accept", requestHeader);
if (connection.authentication() != null && connection.authentication().user() != null)
asyncRequestBuilder.addHeader("Authorization",
HttpWorker.basicAuth(connection.authentication().user(),
Optional.ofNullable(connection.authentication().password()).orElse("")));

if (queryHandle.cached())
cache.put(queryHandle.index(), asyncRequestBuilder.build());

return asyncRequestBuilder.build();
}

/**
* Get a cached request by the index of the query.
* If the request is not in the cache, an IllegalArgumentException is thrown.
*
* @param index the index of the query
* @return the request as an AsyncRequestProducer
*/
public AsyncRequestProducer getCachedRequest(int index) {
if (!cache.containsKey(index))
throw new IllegalArgumentException("No request with index " + index + " found in cache.");
return cache.get(index);
}
}
Loading
Loading