diff --git a/iguana.commons/src/main/java/org/aksw/iguana/commons/streams/Streams.java b/iguana.commons/src/main/java/org/aksw/iguana/commons/streams/Streams.java index a8438fbe..ddff712e 100644 --- a/iguana.commons/src/main/java/org/aksw/iguana/commons/streams/Streams.java +++ b/iguana.commons/src/main/java/org/aksw/iguana/commons/streams/Streams.java @@ -1,7 +1,5 @@ package org.aksw.iguana.commons.streams; -import org.aksw.iguana.commons.io.BigByteArrayOutputStream; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -22,8 +20,8 @@ public class Streams { * @return the content of inputStream as a string. * @throws IOException from inputStream.read */ - static public BigByteArrayOutputStream inputStream2String(InputStream inputStream) throws IOException { - BigByteArrayOutputStream result = new BigByteArrayOutputStream(); + static public ByteArrayOutputStream inputStream2String(InputStream inputStream) throws IOException { + ByteArrayOutputStream result = new ByteArrayOutputStream(); try { inputStream2ByteArrayOutputStream(inputStream, null, -1.0, result); } catch (TimeoutException e) { @@ -43,8 +41,8 @@ static public BigByteArrayOutputStream inputStream2String(InputStream inputStrea * @throws IOException from inputStream.read * @throws TimeoutException Maybe thrown any time after if startTime + timeout is exceed */ - static public BigByteArrayOutputStream inputStream2String(InputStream inputStream, Instant startTime, double timeout) throws IOException, TimeoutException { - BigByteArrayOutputStream result = new BigByteArrayOutputStream(); + static public ByteArrayOutputStream inputStream2String(InputStream inputStream, Instant startTime, double timeout) throws IOException, TimeoutException { + ByteArrayOutputStream result = new ByteArrayOutputStream(); inputStream2ByteArrayOutputStream(inputStream, startTime, timeout, result); return result; } @@ -60,7 +58,7 @@ static public BigByteArrayOutputStream inputStream2String(InputStream inputStrea * @throws IOException from inputStream.read * @throws TimeoutException Maybe thrown any time after if startTime + timeout is exceed */ - public static long inputStream2ByteArrayOutputStream(InputStream inputStream, Instant startTime, double timeout, BigByteArrayOutputStream result) throws IOException, TimeoutException { + public static long inputStream2ByteArrayOutputStream(InputStream inputStream, Instant startTime, double timeout, ByteArrayOutputStream result) throws IOException, TimeoutException { assert (result != null); boolean enable_timeout = timeout > 0; byte[] buffer = new byte[10 * 1024 * 1024]; // 10 MB buffer @@ -81,7 +79,7 @@ public static long inputStream2ByteArrayOutputStream(InputStream inputStream, In * @return size of the output stream * @throws IOException from inputStream.read */ - public static long inputStream2ByteArrayOutputStream(InputStream inputStream, BigByteArrayOutputStream result) throws IOException { + public static long inputStream2ByteArrayOutputStream(InputStream inputStream, ByteArrayOutputStream result) throws IOException { try { return inputStream2ByteArrayOutputStream(inputStream, Instant.now(), -1, result); }catch(TimeoutException e){ @@ -104,7 +102,7 @@ static public long inputStream2Length(InputStream inputStream, Instant startTime long length; long ret = 0; while ((length = inputStream.read(buffer)) != -1) { - if (durationInMilliseconds(startTime, Instant.now()) > timeout) + if (durationInMilliseconds(startTime, Instant.now()) > timeout && timeout >0) throw new TimeoutException("reading the answer timed out"); ret += length; } diff --git a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/AbstractLanguageProcessor.java b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/AbstractLanguageProcessor.java index f9e5dcd5..088b670b 100644 --- a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/AbstractLanguageProcessor.java +++ b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/AbstractLanguageProcessor.java @@ -1,7 +1,6 @@ package org.aksw.iguana.cc.lang; import org.aksw.iguana.commons.constants.COMMON; -import org.aksw.iguana.commons.io.BigByteArrayOutputStream; import org.aksw.iguana.commons.streams.Streams; import org.aksw.iguana.rp.vocab.Vocab; import org.apache.http.Header; @@ -48,17 +47,17 @@ public Long getResultSize(CloseableHttpResponse response) throws ParserConfigura } @Override - public Long getResultSize(Header contentTypeHeader, BigByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException { + public Long getResultSize(Header contentTypeHeader, ByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException { return Long.valueOf(content.size()); } @Override - public long readResponse(InputStream inputStream, BigByteArrayOutputStream responseBody) throws IOException { + public long readResponse(InputStream inputStream, ByteArrayOutputStream responseBody) throws IOException, TimeoutException { return Streams.inputStream2ByteArrayOutputStream(inputStream, responseBody); } //@Override - public long readResponse(InputStream inputStream, Instant startTime, Double timeOut, BigByteArrayOutputStream responseBody) throws IOException, TimeoutException { + public long readResponse(InputStream inputStream, Instant startTime, Double timeOut, ByteArrayOutputStream responseBody) throws IOException, TimeoutException { return Streams.inputStream2ByteArrayOutputStream(inputStream, startTime, timeOut, responseBody); } } diff --git a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/LanguageProcessor.java b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/LanguageProcessor.java index f4cd7181..c18f2d72 100644 --- a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/LanguageProcessor.java +++ b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/LanguageProcessor.java @@ -1,6 +1,5 @@ package org.aksw.iguana.cc.lang; -import org.aksw.iguana.commons.io.BigByteArrayOutputStream; import org.apache.http.Header; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.jena.rdf.model.Model; @@ -48,9 +47,9 @@ public interface LanguageProcessor { */ Long getResultSize(CloseableHttpResponse response) throws ParserConfigurationException, SAXException, ParseException, IOException; - Long getResultSize(Header contentTypeHeader, BigByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException; + Long getResultSize(Header contentTypeHeader, ByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException; - long readResponse(InputStream inputStream, BigByteArrayOutputStream responseBody) throws IOException; + long readResponse(InputStream inputStream, ByteArrayOutputStream responseBody) throws IOException, TimeoutException; } diff --git a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/RDFLanguageProcessor.java b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/RDFLanguageProcessor.java index 4b87a27a..084f4ba2 100644 --- a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/RDFLanguageProcessor.java +++ b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/RDFLanguageProcessor.java @@ -5,8 +5,6 @@ import org.aksw.iguana.cc.lang.QueryWrapper; import org.aksw.iguana.commons.annotation.Shorthand; import org.aksw.iguana.commons.constants.COMMON; -import org.aksw.iguana.commons.io.BigByteArrayInputStream; -import org.aksw.iguana.commons.io.BigByteArrayOutputStream; import org.aksw.iguana.rp.vocab.Vocab; import org.apache.http.Header; import org.apache.http.client.methods.CloseableHttpResponse; @@ -73,11 +71,11 @@ public Long getResultSize(CloseableHttpResponse response) throws ParserConfigura } @Override - public Long getResultSize(Header contentTypeHeader, BigByteArrayOutputStream content) throws IOException { + public Long getResultSize(Header contentTypeHeader, ByteArrayOutputStream content) throws IOException { Model m; try { //TODO BBAIS - InputStream inputStream = new BigByteArrayInputStream(content); + InputStream inputStream = new ByteArrayInputStream(content.toByteArray()); m = getModel(contentTypeHeader, inputStream); } catch (IllegalAccessException e) { LOGGER.error("Could not read response as model", e); diff --git a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/SPARQLLanguageProcessor.java b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/SPARQLLanguageProcessor.java index 2336a6f7..13f4ebad 100644 --- a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/SPARQLLanguageProcessor.java +++ b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/SPARQLLanguageProcessor.java @@ -6,8 +6,6 @@ import org.aksw.iguana.cc.utils.SPARQLQueryStatistics; import org.aksw.iguana.commons.annotation.Shorthand; import org.aksw.iguana.commons.constants.COMMON; -import org.aksw.iguana.commons.io.BigByteArrayInputStream; -import org.aksw.iguana.commons.io.BigByteArrayOutputStream; import org.aksw.iguana.rp.vocab.Vocab; import org.apache.commons.lang.StringUtils; import org.apache.http.Header; @@ -123,18 +121,18 @@ public static String getContentTypeVal(Header header) { return "application/sparql-results+json"; } - public static long getJsonResultSize(BigByteArrayOutputStream res) throws ParseException, UnsupportedEncodingException { + public static long getJsonResultSize(ByteArrayOutputStream res) throws ParseException, UnsupportedEncodingException { JSONParser parser = new JSONParser(); SaxSparqlJsonResultCountingParser handler = new SaxSparqlJsonResultCountingParser(); parser.parse(res.toString(StandardCharsets.UTF_8), handler, true); return handler.getNoBindings(); } - public static long getXmlResultSize(BigByteArrayOutputStream res) throws ParserConfigurationException, IOException, SAXException { + public static long getXmlResultSize(ByteArrayOutputStream res) throws ParserConfigurationException, IOException, SAXException { DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder dBuilder = dbFactory.newDocumentBuilder(); - BigByteArrayInputStream bbais = new BigByteArrayInputStream(res); + ByteArrayInputStream bbais = new ByteArrayInputStream(res.toByteArray()); Document doc = dBuilder.parse(bbais); NodeList childNodes = doc.getDocumentElement().getElementsByTagName(XML_RESULT_ROOT_ELEMENT_NAME).item(0).getChildNodes(); @@ -153,7 +151,7 @@ public Long getResultSize(CloseableHttpResponse response) throws ParserConfigura HttpEntity httpResponse = response.getEntity(); Header contentTypeHeader = response.getEntity().getContentType(); - BigByteArrayOutputStream entity; + ByteArrayOutputStream entity; try (InputStream inputStream = httpResponse.getContent()) { entity = inputStream2String(inputStream); @@ -165,7 +163,7 @@ public Long getResultSize(CloseableHttpResponse response) throws ParserConfigura } //@Override - public Long getResultSize(Header contentTypeHeader, BigByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException { + public Long getResultSize(Header contentTypeHeader, ByteArrayOutputStream content) throws ParserConfigurationException, SAXException, ParseException, IOException { try { switch (getContentTypeVal(contentTypeHeader)) { case QUERY_RESULT_TYPE_JSON: @@ -174,8 +172,14 @@ public Long getResultSize(Header contentTypeHeader, BigByteArrayOutputStream con case QUERY_RESULT_TYPE_XML: return getXmlResultSize(content); default: - return content.countMatches('\n')+1; - //return (long) StringUtils.countMatches(content, "\n") + 1; + //return content.countMatches('\n')+1; + long matches=0; + for(byte b: content.toByteArray()){ + if(b=='\n'){ + matches++; + } + } + return matches+1; } } catch (ParseException | ParserConfigurationException | IOException | SAXException e) { LOGGER.error("Query results could not be parsed: ", e); diff --git a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/ThrowawayLanguageProcessor.java b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/ThrowawayLanguageProcessor.java index 6cff9d27..5c4840da 100644 --- a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/ThrowawayLanguageProcessor.java +++ b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/lang/impl/ThrowawayLanguageProcessor.java @@ -2,7 +2,6 @@ import org.aksw.iguana.cc.lang.AbstractLanguageProcessor; import org.aksw.iguana.commons.annotation.Shorthand; -import org.aksw.iguana.commons.io.BigByteArrayOutputStream; import org.aksw.iguana.commons.streams.Streams; import java.io.ByteArrayOutputStream; @@ -15,7 +14,12 @@ public class ThrowawayLanguageProcessor extends AbstractLanguageProcessor { @Override - public long readResponse(InputStream inputStream, Instant startTime, Double timeOut, BigByteArrayOutputStream responseBody) throws IOException, TimeoutException { + public long readResponse(InputStream inputStream, ByteArrayOutputStream responseBody) throws IOException, TimeoutException { + return Streams.inputStream2Length(inputStream, Instant.now(), 0); + } + + @Override + public long readResponse(InputStream inputStream, Instant startTime, Double timeOut, ByteArrayOutputStream responseBody) throws IOException, TimeoutException { return Streams.inputStream2Length(inputStream, startTime, timeOut); } diff --git a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java index 8be67049..1baae0a5 100644 --- a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java +++ b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/tasks/impl/Stresstest.java @@ -249,9 +249,9 @@ public void execute() { private void loopSleep(int timeout) { try { TimeUnit.MILLISECONDS.sleep(timeout); - }catch(Exception e) { - LOGGER.error("Could not warmup "); + //shouldn't be thrown except something else really went wrong + LOGGER.error("Loop sleep did not work.", e); } } diff --git a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/worker/impl/HttpWorker.java b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/worker/impl/HttpWorker.java index 640b3495..653269a1 100644 --- a/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/worker/impl/HttpWorker.java +++ b/iguana.corecontroller/src/main/java/org/aksw/iguana/cc/worker/impl/HttpWorker.java @@ -8,7 +8,6 @@ import org.aksw.iguana.cc.worker.AbstractRandomQueryChooserWorker; import org.aksw.iguana.commons.annotation.Nullable; import org.aksw.iguana.commons.constants.COMMON; -import org.aksw.iguana.commons.io.BigByteArrayOutputStream; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.client.ClientProtocolException; @@ -160,7 +159,7 @@ public void executeQuery(String query, String queryID) { handleException(query, COMMON.QUERY_HTTP_FAILURE, e); } catch (IOException e) { if (requestTimedOut) { - LOGGER.warn("Worker[{} : {}]: Reached timout on query (ID {})\n{}", + LOGGER.warn("Worker[{} : {}]: Reached timeout on query (ID {})\n{}", this.workerType, this.workerID, queryId, query); addResultsOnce(new QueryExecutionStats(queryId, COMMON.QUERY_SOCKET_TIMEOUT, timeOut)); } else { @@ -194,7 +193,7 @@ protected void processHttpResponse() { try (InputStream inputStream = httpResponse.getContent()) { // read content stream //Stream in resultProcessor, return length, set string in StringBuilder. - BigByteArrayOutputStream responseBody = new BigByteArrayOutputStream(); + ByteArrayOutputStream responseBody = new ByteArrayOutputStream(); long length = resultProcessor.readResponse(inputStream, responseBody); tmpExecutedQueries++; // check if such a result was already parsed and is cached @@ -214,7 +213,7 @@ protected void processHttpResponse() { } } - } catch (IOException e) { + } catch (IOException | TimeoutException e) { double duration = durationInMilliseconds(requestStartTime, Instant.now()); addResultsOnce(new QueryExecutionStats(queryId, COMMON.QUERY_HTTP_FAILURE, duration)); } @@ -260,10 +259,10 @@ static class HttpResultProcessor implements Runnable { private final String queryId; private final double duration; private final Header contentTypeHeader; - private BigByteArrayOutputStream contentStream; + private ByteArrayOutputStream contentStream; private final long contentLength; - public HttpResultProcessor(HttpWorker httpWorker, String queryId, double duration, Header contentTypeHeader, BigByteArrayOutputStream contentStream, long contentLength) { + public HttpResultProcessor(HttpWorker httpWorker, String queryId, double duration, Header contentTypeHeader, ByteArrayOutputStream contentStream, long contentLength) { this.httpWorker = httpWorker; this.queryId = queryId; this.duration = duration; diff --git a/iguana.corecontroller/src/test/java/org/aksw/iguana/cc/lang/SPARQLLanguageProcessorTest.java b/iguana.corecontroller/src/test/java/org/aksw/iguana/cc/lang/SPARQLLanguageProcessorTest.java index 7d58958c..93b1aff5 100644 --- a/iguana.corecontroller/src/test/java/org/aksw/iguana/cc/lang/SPARQLLanguageProcessorTest.java +++ b/iguana.corecontroller/src/test/java/org/aksw/iguana/cc/lang/SPARQLLanguageProcessorTest.java @@ -1,7 +1,6 @@ package org.aksw.iguana.cc.lang; import org.aksw.iguana.cc.lang.impl.SPARQLLanguageProcessor; -import org.aksw.iguana.commons.io.BigByteArrayOutputStream; import org.apache.jena.ext.com.google.common.collect.Lists; import org.apache.jena.query.Query; import org.apache.jena.query.QueryFactory; @@ -70,13 +69,13 @@ public class SPARQLLanguageProcessorTest { @Test public void checkJSON() throws ParseException, IOException { - BigByteArrayOutputStream bbaos = new BigByteArrayOutputStream(); + ByteArrayOutputStream bbaos = new ByteArrayOutputStream(); bbaos.write(jsonResult.getBytes()); assertEquals(3, SPARQLLanguageProcessor.getJsonResultSize(bbaos)); //test if valid json response provide 0 bindings try { //check if invalid json throws exception - bbaos = new BigByteArrayOutputStream(); + bbaos = new ByteArrayOutputStream(); bbaos.write("{ \"a\": \"b\"}".getBytes()); SPARQLLanguageProcessor.getJsonResultSize(bbaos); assertTrue("Should have thrown an error", false); @@ -85,7 +84,7 @@ public void checkJSON() throws ParseException, IOException { } try { //check if invalid json throws exception - bbaos = new BigByteArrayOutputStream(); + bbaos = new ByteArrayOutputStream(); bbaos.write("{ \"a\": \"b\"".getBytes()); SPARQLLanguageProcessor.getJsonResultSize(bbaos); assertTrue("Should have thrown an error", false); @@ -96,13 +95,13 @@ public void checkJSON() throws ParseException, IOException { @Test public void checkXML() throws IOException, SAXException, ParserConfigurationException { - BigByteArrayOutputStream bbaos = new BigByteArrayOutputStream(); + ByteArrayOutputStream bbaos = new ByteArrayOutputStream(); bbaos.write(xmlResult.getBytes(StandardCharsets.UTF_8)); assertEquals(2, SPARQLLanguageProcessor.getXmlResultSize(bbaos)); //test if valid xml response provide 0 bindings try { //check if invalid xml throws exception - bbaos = new BigByteArrayOutputStream(); + bbaos = new ByteArrayOutputStream(); bbaos.write("b".getBytes()); SPARQLLanguageProcessor.getJsonResultSize(bbaos); assertTrue("Should have thrown an error", false); @@ -111,7 +110,7 @@ public void checkXML() throws IOException, SAXException, ParserConfigurationExce } try { //check if invalid xml throws exception - bbaos = new BigByteArrayOutputStream(); + bbaos = new ByteArrayOutputStream(); bbaos.write("{ \"a\": \"b\"".getBytes()); SPARQLLanguageProcessor.getJsonResultSize(bbaos); assertTrue("Should have thrown an error", false);