Skip to content

Commit

Permalink
#134 - fixed, rc2
Browse files Browse the repository at this point in the history
  • Loading branch information
TortugaAttack committed Feb 1, 2021
1 parent 1d7c3d4 commit 0f77c6c
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.aksw.iguana.commons.streams;

import org.aksw.iguana.commons.io.BigByteArrayOutputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -22,8 +20,8 @@ public class Streams {
* @return the content of inputStream as a string.
* @throws IOException from inputStream.read
*/
static public BigByteArrayOutputStream inputStream2String(InputStream inputStream) throws IOException {
BigByteArrayOutputStream result = new BigByteArrayOutputStream();
static public ByteArrayOutputStream inputStream2String(InputStream inputStream) throws IOException {
ByteArrayOutputStream result = new ByteArrayOutputStream();
try {
inputStream2ByteArrayOutputStream(inputStream, null, -1.0, result);
} catch (TimeoutException e) {
Expand All @@ -43,8 +41,8 @@ static public BigByteArrayOutputStream inputStream2String(InputStream inputStrea
* @throws IOException from inputStream.read
* @throws TimeoutException Maybe thrown any time after if startTime + timeout is exceed
*/
static public BigByteArrayOutputStream inputStream2String(InputStream inputStream, Instant startTime, double timeout) throws IOException, TimeoutException {
BigByteArrayOutputStream result = new BigByteArrayOutputStream();
static public ByteArrayOutputStream inputStream2String(InputStream inputStream, Instant startTime, double timeout) throws IOException, TimeoutException {
ByteArrayOutputStream result = new ByteArrayOutputStream();
inputStream2ByteArrayOutputStream(inputStream, startTime, timeout, result);
return result;
}
Expand All @@ -60,7 +58,7 @@ static public BigByteArrayOutputStream inputStream2String(InputStream inputStrea
* @throws IOException from inputStream.read
* @throws TimeoutException Maybe thrown any time after if startTime + timeout is exceed
*/
public static long inputStream2ByteArrayOutputStream(InputStream inputStream, Instant startTime, double timeout, BigByteArrayOutputStream result) throws IOException, TimeoutException {
public static long inputStream2ByteArrayOutputStream(InputStream inputStream, Instant startTime, double timeout, ByteArrayOutputStream result) throws IOException, TimeoutException {
assert (result != null);
boolean enable_timeout = timeout > 0;
byte[] buffer = new byte[10 * 1024 * 1024]; // 10 MB buffer
Expand All @@ -81,7 +79,7 @@ public static long inputStream2ByteArrayOutputStream(InputStream inputStream, In
* @return size of the output stream
* @throws IOException from inputStream.read
*/
public static long inputStream2ByteArrayOutputStream(InputStream inputStream, BigByteArrayOutputStream result) throws IOException {
public static long inputStream2ByteArrayOutputStream(InputStream inputStream, ByteArrayOutputStream result) throws IOException {
try {
return inputStream2ByteArrayOutputStream(inputStream, Instant.now(), -1, result);
}catch(TimeoutException e){
Expand All @@ -104,7 +102,7 @@ static public long inputStream2Length(InputStream inputStream, Instant startTime
long length;
long ret = 0;
while ((length = inputStream.read(buffer)) != -1) {
if (durationInMilliseconds(startTime, Instant.now()) > timeout)
if (durationInMilliseconds(startTime, Instant.now()) > timeout && timeout >0)
throw new TimeoutException("reading the answer timed out");
ret += length;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.aksw.iguana.cc.lang;

import org.aksw.iguana.commons.constants.COMMON;
import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
import org.aksw.iguana.commons.streams.Streams;
import org.aksw.iguana.rp.vocab.Vocab;
import org.apache.http.Header;
Expand Down Expand Up @@ -48,17 +47,17 @@ public Long getResultSize(CloseableHttpResponse response) throws ParserConfigura
}

@Override
public Long getResultSize(Header contentTypeHeader, BigByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException {
public Long getResultSize(Header contentTypeHeader, ByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException {
return Long.valueOf(content.size());
}

@Override
public long readResponse(InputStream inputStream, BigByteArrayOutputStream responseBody) throws IOException {
public long readResponse(InputStream inputStream, ByteArrayOutputStream responseBody) throws IOException, TimeoutException {
return Streams.inputStream2ByteArrayOutputStream(inputStream, responseBody);
}

//@Override
public long readResponse(InputStream inputStream, Instant startTime, Double timeOut, BigByteArrayOutputStream responseBody) throws IOException, TimeoutException {
public long readResponse(InputStream inputStream, Instant startTime, Double timeOut, ByteArrayOutputStream responseBody) throws IOException, TimeoutException {
return Streams.inputStream2ByteArrayOutputStream(inputStream, startTime, timeOut, responseBody);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.aksw.iguana.cc.lang;

import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.jena.rdf.model.Model;
Expand Down Expand Up @@ -48,9 +47,9 @@ public interface LanguageProcessor {
*/
Long getResultSize(CloseableHttpResponse response) throws ParserConfigurationException, SAXException, ParseException, IOException;

Long getResultSize(Header contentTypeHeader, BigByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException;
Long getResultSize(Header contentTypeHeader, ByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException;


long readResponse(InputStream inputStream, BigByteArrayOutputStream responseBody) throws IOException;
long readResponse(InputStream inputStream, ByteArrayOutputStream responseBody) throws IOException, TimeoutException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import org.aksw.iguana.cc.lang.QueryWrapper;
import org.aksw.iguana.commons.annotation.Shorthand;
import org.aksw.iguana.commons.constants.COMMON;
import org.aksw.iguana.commons.io.BigByteArrayInputStream;
import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
import org.aksw.iguana.rp.vocab.Vocab;
import org.apache.http.Header;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand Down Expand Up @@ -73,11 +71,11 @@ public Long getResultSize(CloseableHttpResponse response) throws ParserConfigura
}

@Override
public Long getResultSize(Header contentTypeHeader, BigByteArrayOutputStream content) throws IOException {
public Long getResultSize(Header contentTypeHeader, ByteArrayOutputStream content) throws IOException {
Model m;
try {
//TODO BBAIS
InputStream inputStream = new BigByteArrayInputStream(content);
InputStream inputStream = new ByteArrayInputStream(content.toByteArray());
m = getModel(contentTypeHeader, inputStream);
} catch (IllegalAccessException e) {
LOGGER.error("Could not read response as model", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import org.aksw.iguana.cc.utils.SPARQLQueryStatistics;
import org.aksw.iguana.commons.annotation.Shorthand;
import org.aksw.iguana.commons.constants.COMMON;
import org.aksw.iguana.commons.io.BigByteArrayInputStream;
import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
import org.aksw.iguana.rp.vocab.Vocab;
import org.apache.commons.lang.StringUtils;
import org.apache.http.Header;
Expand Down Expand Up @@ -123,18 +121,18 @@ public static String getContentTypeVal(Header header) {
return "application/sparql-results+json";
}

public static long getJsonResultSize(BigByteArrayOutputStream res) throws ParseException, UnsupportedEncodingException {
public static long getJsonResultSize(ByteArrayOutputStream res) throws ParseException, UnsupportedEncodingException {
JSONParser parser = new JSONParser();
SaxSparqlJsonResultCountingParser handler = new SaxSparqlJsonResultCountingParser();
parser.parse(res.toString(StandardCharsets.UTF_8), handler, true);
return handler.getNoBindings();
}

public static long getXmlResultSize(BigByteArrayOutputStream res) throws ParserConfigurationException, IOException, SAXException {
public static long getXmlResultSize(ByteArrayOutputStream res) throws ParserConfigurationException, IOException, SAXException {
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();

BigByteArrayInputStream bbais = new BigByteArrayInputStream(res);
ByteArrayInputStream bbais = new ByteArrayInputStream(res.toByteArray());
Document doc = dBuilder.parse(bbais);
NodeList childNodes = doc.getDocumentElement().getElementsByTagName(XML_RESULT_ROOT_ELEMENT_NAME).item(0).getChildNodes();

Expand All @@ -153,7 +151,7 @@ public Long getResultSize(CloseableHttpResponse response) throws ParserConfigura
HttpEntity httpResponse = response.getEntity();
Header contentTypeHeader = response.getEntity().getContentType();

BigByteArrayOutputStream entity;
ByteArrayOutputStream entity;
try (InputStream inputStream = httpResponse.getContent()) {

entity = inputStream2String(inputStream);
Expand All @@ -165,7 +163,7 @@ public Long getResultSize(CloseableHttpResponse response) throws ParserConfigura
}

//@Override
public Long getResultSize(Header contentTypeHeader, BigByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException {
public Long getResultSize(Header contentTypeHeader, ByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException {
try {
switch (getContentTypeVal(contentTypeHeader)) {
case QUERY_RESULT_TYPE_JSON:
Expand All @@ -174,8 +172,14 @@ public Long getResultSize(Header contentTypeHeader, BigByteArrayOutputStream con
case QUERY_RESULT_TYPE_XML:
return getXmlResultSize(content);
default:
return content.countMatches('\n')+1;
//return (long) StringUtils.countMatches(content, "\n") + 1;
//return content.countMatches('\n')+1;
long matches=0;
for(byte b: content.toByteArray()){
if(b=='\n'){
matches++;
}
}
return matches+1;
}
} catch (ParseException | ParserConfigurationException | IOException | SAXException e) {
LOGGER.error("Query results could not be parsed: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import org.aksw.iguana.cc.lang.AbstractLanguageProcessor;
import org.aksw.iguana.commons.annotation.Shorthand;
import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
import org.aksw.iguana.commons.streams.Streams;

import java.io.ByteArrayOutputStream;
Expand All @@ -15,7 +14,12 @@
public class ThrowawayLanguageProcessor extends AbstractLanguageProcessor {

@Override
public long readResponse(InputStream inputStream, Instant startTime, Double timeOut, BigByteArrayOutputStream responseBody) throws IOException, TimeoutException {
public long readResponse(InputStream inputStream, ByteArrayOutputStream responseBody) throws IOException, TimeoutException {
return Streams.inputStream2Length(inputStream, Instant.now(), 0);
}

@Override
public long readResponse(InputStream inputStream, Instant startTime, Double timeOut, ByteArrayOutputStream responseBody) throws IOException, TimeoutException {
return Streams.inputStream2Length(inputStream, startTime, timeOut);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,9 @@ public void execute() {
private void loopSleep(int timeout) {
try {
TimeUnit.MILLISECONDS.sleep(timeout);

}catch(Exception e) {
LOGGER.error("Could not warmup ");
//shouldn't be thrown except something else really went wrong
LOGGER.error("Loop sleep did not work.", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.aksw.iguana.cc.worker.AbstractRandomQueryChooserWorker;
import org.aksw.iguana.commons.annotation.Nullable;
import org.aksw.iguana.commons.constants.COMMON;
import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
Expand Down Expand Up @@ -160,7 +159,7 @@ public void executeQuery(String query, String queryID) {
handleException(query, COMMON.QUERY_HTTP_FAILURE, e);
} catch (IOException e) {
if (requestTimedOut) {
LOGGER.warn("Worker[{} : {}]: Reached timout on query (ID {})\n{}",
LOGGER.warn("Worker[{} : {}]: Reached timeout on query (ID {})\n{}",
this.workerType, this.workerID, queryId, query);
addResultsOnce(new QueryExecutionStats(queryId, COMMON.QUERY_SOCKET_TIMEOUT, timeOut));
} else {
Expand Down Expand Up @@ -194,7 +193,7 @@ protected void processHttpResponse() {
try (InputStream inputStream = httpResponse.getContent()) {
// read content stream
//Stream in resultProcessor, return length, set string in StringBuilder.
BigByteArrayOutputStream responseBody = new BigByteArrayOutputStream();
ByteArrayOutputStream responseBody = new ByteArrayOutputStream();
long length = resultProcessor.readResponse(inputStream, responseBody);
tmpExecutedQueries++;
// check if such a result was already parsed and is cached
Expand All @@ -214,7 +213,7 @@ protected void processHttpResponse() {
}
}

} catch (IOException e) {
} catch (IOException | TimeoutException e) {
double duration = durationInMilliseconds(requestStartTime, Instant.now());
addResultsOnce(new QueryExecutionStats(queryId, COMMON.QUERY_HTTP_FAILURE, duration));
}
Expand Down Expand Up @@ -260,10 +259,10 @@ static class HttpResultProcessor implements Runnable {
private final String queryId;
private final double duration;
private final Header contentTypeHeader;
private BigByteArrayOutputStream contentStream;
private ByteArrayOutputStream contentStream;
private final long contentLength;

public HttpResultProcessor(HttpWorker httpWorker, String queryId, double duration, Header contentTypeHeader, BigByteArrayOutputStream contentStream, long contentLength) {
public HttpResultProcessor(HttpWorker httpWorker, String queryId, double duration, Header contentTypeHeader, ByteArrayOutputStream contentStream, long contentLength) {
this.httpWorker = httpWorker;
this.queryId = queryId;
this.duration = duration;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.aksw.iguana.cc.lang;

import org.aksw.iguana.cc.lang.impl.SPARQLLanguageProcessor;
import org.aksw.iguana.commons.io.BigByteArrayOutputStream;
import org.apache.jena.ext.com.google.common.collect.Lists;
import org.apache.jena.query.Query;
import org.apache.jena.query.QueryFactory;
Expand Down Expand Up @@ -70,13 +69,13 @@ public class SPARQLLanguageProcessorTest {

@Test
public void checkJSON() throws ParseException, IOException {
BigByteArrayOutputStream bbaos = new BigByteArrayOutputStream();
ByteArrayOutputStream bbaos = new ByteArrayOutputStream();
bbaos.write(jsonResult.getBytes());
assertEquals(3, SPARQLLanguageProcessor.getJsonResultSize(bbaos));
//test if valid json response provide 0 bindings
try {
//check if invalid json throws exception
bbaos = new BigByteArrayOutputStream();
bbaos = new ByteArrayOutputStream();
bbaos.write("{ \"a\": \"b\"}".getBytes());
SPARQLLanguageProcessor.getJsonResultSize(bbaos);
assertTrue("Should have thrown an error", false);
Expand All @@ -85,7 +84,7 @@ public void checkJSON() throws ParseException, IOException {
}
try {
//check if invalid json throws exception
bbaos = new BigByteArrayOutputStream();
bbaos = new ByteArrayOutputStream();
bbaos.write("{ \"a\": \"b\"".getBytes());
SPARQLLanguageProcessor.getJsonResultSize(bbaos);
assertTrue("Should have thrown an error", false);
Expand All @@ -96,13 +95,13 @@ public void checkJSON() throws ParseException, IOException {

@Test
public void checkXML() throws IOException, SAXException, ParserConfigurationException {
BigByteArrayOutputStream bbaos = new BigByteArrayOutputStream();
ByteArrayOutputStream bbaos = new ByteArrayOutputStream();
bbaos.write(xmlResult.getBytes(StandardCharsets.UTF_8));
assertEquals(2, SPARQLLanguageProcessor.getXmlResultSize(bbaos));
//test if valid xml response provide 0 bindings
try {
//check if invalid xml throws exception
bbaos = new BigByteArrayOutputStream();
bbaos = new ByteArrayOutputStream();
bbaos.write("<a>b</a>".getBytes());
SPARQLLanguageProcessor.getJsonResultSize(bbaos);
assertTrue("Should have thrown an error", false);
Expand All @@ -111,7 +110,7 @@ public void checkXML() throws IOException, SAXException, ParserConfigurationExce
}
try {
//check if invalid xml throws exception
bbaos = new BigByteArrayOutputStream();
bbaos = new ByteArrayOutputStream();
bbaos.write("{ \"a\": \"b\"".getBytes());
SPARQLLanguageProcessor.getJsonResultSize(bbaos);
assertTrue("Should have thrown an error", false);
Expand Down

0 comments on commit 0f77c6c

Please sign in to comment.