Skip to content

Commit

Permalink
Change RequestFactory behavior
Browse files Browse the repository at this point in the history
* cached queries will be sent with fixed-sizes request
* requests of cached queries will be cached as well (addresses #223)
  • Loading branch information
nck-mlcnv committed Feb 16, 2024
1 parent c238a40 commit d9190cb
Showing 1 changed file with 40 additions and 85 deletions.
125 changes: 40 additions & 85 deletions src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import net.jpountz.xxhash.XXHashFactory;
import org.aksw.iguana.cc.config.elements.ConnectionConfig;
import org.aksw.iguana.cc.query.handler.QueryHandler;
import org.aksw.iguana.cc.utils.http.StreamEntityProducer;
import org.aksw.iguana.cc.worker.ResponseBodyProcessor;
import org.aksw.iguana.cc.worker.HttpWorker;
import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
Expand All @@ -24,9 +25,7 @@
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.StreamChannel;
import org.apache.hc.core5.http.nio.entity.AbstractBinAsyncEntityProducer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.hc.core5.reactor.IOReactorConfig;
Expand Down Expand Up @@ -74,6 +73,7 @@ public String value() {
}

private final RequestType requestType;
private final Map<Integer, AsyncRequestProducer> cache = new HashMap<>();

public RequestFactory(RequestType requestType) {
this.requestType = requestType;
Expand All @@ -89,93 +89,45 @@ private static String urlEncode(String name, String value) {
return name + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
}

private static class StreamEntityProducer extends AbstractBinAsyncEntityProducer {

private final byte[] buffer = new byte[8192];
private final InputStream stream;

public StreamEntityProducer(InputStream stream, ContentType contentType) {
super(8192, contentType);
this.stream = stream;
}

@Override
protected int availableData() {
try {
return stream.available();
} catch (IOException ignore) {} // TODO: log
return 0;
}

@Override
protected void produceData(StreamChannel<ByteBuffer> channel) throws IOException {
int read;
if ((read = stream.read(buffer)) != -1) {
channel.write(ByteBuffer.wrap(buffer, 0, read));
} else {
channel.endStream();
}
}

@Override
public boolean isRepeatable() {
return false;
}

@Override
public void failed(Exception cause) {
// TODO: log
}

@Override
public boolean isChunked() {
return true;
}

@Override
public long getContentLength() {
return super.getContentLength();
}

@Override
public void releaseResources() {
super.releaseResources();
}
}

public AsyncRequestProducer buildHttpRequest(InputStream queryStream,
public AsyncRequestProducer buildHttpRequest(QueryHandler.QueryStreamWrapper queryHandle,
ConnectionConfig connection,
String requestHeader) throws URISyntaxException, IOException {
if (queryHandle.cached() && cache.containsKey(queryHandle.index()))
return cache.get(queryHandle.index());

AsyncRequestBuilder asyncRequestBuilder;
Supplier<InputStream> queryStreamSupplier;
InputStream queryStream;

try {
queryStreamSupplier = queryHandle.queryInputStreamSupplier();
queryStream = queryStreamSupplier.get();
} catch (RuntimeException e) {
throw new IOException(e);
}

switch (this.requestType) {
case GET_QUERY -> {
asyncRequestBuilder = AsyncRequestBuilder.get(new URIBuilder(connection.endpoint())
.addParameter("query", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8))
.build()
);
}
case POST_URL_ENC_QUERY -> {
// entity will be automatically set to the url encoded parameters
asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")
.setEntity(new StringAsyncEntityProducer(urlEncode("query", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8))));
}
case POST_QUERY -> {
asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setHeader(HttpHeaders.CONTENT_TYPE, "application/sparql-query")
.setEntity(new StreamEntityProducer(queryStream, ContentType.create("application/sparql-query")));
}
case POST_URL_ENC_UPDATE -> {
asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")
.setEntity(new StringAsyncEntityProducer(urlEncode("update", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8))));
}
case POST_UPDATE -> {
asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setHeader(HttpHeaders.CONTENT_TYPE, "application/sparql-update")
.setEntity(new StreamEntityProducer(queryStream, ContentType.create("application/sparql-update")));
}
case GET_QUERY ->
asyncRequestBuilder = AsyncRequestBuilder.get(new URIBuilder(connection.endpoint())
.addParameter("query", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8))
.build()
);
case POST_URL_ENC_QUERY ->
asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")
.setEntity(new BasicAsyncEntityProducer(urlEncode("query", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)), ContentType.APPLICATION_FORM_URLENCODED, !queryHandle.cached()));
case POST_QUERY ->
asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setHeader(HttpHeaders.CONTENT_TYPE, "application/sparql-query")
.setEntity(new StreamEntityProducer(queryStreamSupplier, !queryHandle.cached()));
case POST_URL_ENC_UPDATE ->
asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setHeader(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")
.setEntity(new BasicAsyncEntityProducer(urlEncode("update", new String(queryStream.readAllBytes(), StandardCharsets.UTF_8)), ContentType.APPLICATION_FORM_URLENCODED, !queryHandle.cached()));
case POST_UPDATE ->
asyncRequestBuilder = AsyncRequestBuilder.post(connection.endpoint())
.setHeader(HttpHeaders.CONTENT_TYPE, "application/sparql-update")
.setEntity(new StreamEntityProducer(queryStreamSupplier, !queryHandle.cached()));
default -> throw new IllegalStateException("Unexpected value: " + this.requestType);
}

Expand All @@ -186,6 +138,9 @@ public AsyncRequestProducer buildHttpRequest(InputStream queryStream,
HttpWorker.basicAuth(connection.authentication().user(),
Optional.ofNullable(connection.authentication().password()).orElse("")));

if (queryHandle.cached())
cache.put(queryHandle.index(), asyncRequestBuilder.build());

return asyncRequestBuilder.build();
}
}
Expand Down

0 comments on commit d9190cb

Please sign in to comment.