Skip to content

Commit

Permalink
hot fixes and CLI improvment
Browse files Browse the repository at this point in the history
  • Loading branch information
TortugaAttack committed May 23, 2019
1 parent c9efaca commit fa52e3a
Show file tree
Hide file tree
Showing 13 changed files with 550 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,7 @@ public class COMMON {
public static final Long SOCKET_TIMEOUT_VALUE = -1L;

public static final Long WRONG_RESPONSE_CODE_VALUE = -2L;

public static final String QUERY_HASH = "queryHash";

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void receiveData(Properties p) {
size = Long.parseLong(p.get(COMMON.RECEIVE_DATA_SIZE).toString());
}
String queryID = p.getProperty(COMMON.QUERY_ID_KEY);

int queryHash = Integer.parseInt(p.get(COMMON.QUERY_HASH).toString());
Properties extra = getExtraMeta(p);

Properties tmp = getDataFromContainer(extra);
Expand All @@ -61,14 +61,15 @@ public void receiveData(Properties p) {
oldArr[4]+=timeout;
oldArr[5]+=unknown;
oldArr[6]+=wrongCode;
oldArr[7]+=queryHash;
}
else if(tmp!=null){
long[] resArr = {time, success, failure, size, timeout, unknown, wrongCode};
long[] resArr = {time, success, failure, size, timeout, unknown, wrongCode,queryHash};
tmp.put(queryID, resArr);
}
else{
tmp = new Properties();
long[] resArr = new long[]{time, success, failure, size, timeout, unknown, wrongCode};
long[] resArr = new long[]{time, success, failure, size, timeout, unknown, wrongCode,queryHash};
tmp.put(queryID, resArr);
}
addDataToContainer(extra, tmp);
Expand All @@ -89,16 +90,17 @@ public void close() {
Set<String> isRes = new HashSet<String>();
isRes.add(subject);

Triple[] triples = new Triple[9];
Triple[] triples = new Triple[10];
//qps
triples[0] = new Triple(subject, "queriesPerSecond", qps);
//failed
triples[1] = new Triple(subject, "failed", resArr[2]);
//succeded
triples[2] = new Triple(subject, "succeded", resArr[1]);
//queryID
triples[3] = new Triple(subject, "queryID", queryID.toString());
triples[3].setObjectResource(true);

triples[3] = new Triple(subject, "id", queryID.toString());
triples[3].setObjectResource(false);
//totaltime
triples[4] = new Triple(subject, "totalTime", resArr[0]);
triples[5] = new Triple(subject, "resultSize", "?");
Expand All @@ -109,6 +111,9 @@ public void close() {
triples[7] = new Triple(subject, "unknownExceptions", resArr[5]);
triples[8] = new Triple(subject, "wrongCodes", resArr[6]);

triples[9] = new Triple(subject, "queryID", resArr[7]+"/"+queryID.toString());
triples[9].setObjectResource(true);

Properties results = new Properties();
results.put("qps#query", subject);
sendTriples(results, isRes, key, triples);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,16 @@ public abstract class AbstractWorkerQueryHandler implements QueryHandler{
public AbstractWorkerQueryHandler(Collection<Worker> workers) {
this.workers = workers;
for(Worker worker : workers) {
if(worker instanceof SPARQLWorker || worker instanceof CLIWorker || worker instanceof CLIInputWorker) {
if(worker instanceof SPARQLWorker) {
sparqlKeys.add(((SPARQLWorker)worker).getQueriesFileName());
}
else if(worker instanceof CLIWorker) {
sparqlKeys.add(((CLIWorker)worker).getQueriesFileName());
}
else if(worker instanceof CLIInputWorker){
sparqlKeys.add(((CLIInputWorker)worker).getQueriesFileName());

}
else if(worker instanceof UPDATEWorker) {
updateKeys.add(((UPDATEWorker)worker).getQueriesFileName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,27 +148,27 @@ public String generateTripleStats(String taskID, String resource, String propert
QueryStatistics qs2 = new QueryStatistics();
qs2.getStatistics(q);
//builder.append("<").append(taskID).append("> ").append(property).append("querySet> <").append(resource + queryFile.getName()).append("> . \n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(property)
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(property)
.append("aggregations> \"").append(qs2.aggr).append("\"^^<").append(xsdUri).append("int> . \n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(property)
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(property)
.append("filter> \"").append(qs2.filter).append("\"^^<").append(xsdUri).append("int> . \n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(property)
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(property)
.append("groupBy> \"").append(qs2.groupBy).append("\"^^<").append(xsdUri).append("int> . \n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(property)
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(property)
.append("having> \"").append(qs2.having).append("\"^^<").append(xsdUri).append("int> . \n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(property)
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(property)
.append("triples> \"").append(qs2.triples).append("\"^^<").append(xsdUri).append("int> . \n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(property)
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(property)
.append("offset> \"").append(qs2.offset).append("\"^^<").append(xsdUri).append("int> . \n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(property)
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(property)
.append("optional> \"").append(qs2.optional).append("\"^^<").append(xsdUri).append("int> . \n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(property)
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(property)
.append("orderBy> \"").append(qs2.orderBy).append("\"^^<").append(xsdUri).append("int> . \n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(property)
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(property)
.append("union> \"").append(qs2.union).append("\"^^<").append(xsdUri).append("int> . \n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(rdfs)
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(rdfs)
.append("label> \"").append(query).append("\" .\n");
builder.append("<").append(resource + queryFile.getName()).append("> <").append(rdfs).append("ID> \"")
builder.append("<").append(resource +query.hashCode() +"/" + queryFile.getName()).append("> <").append(rdfs).append("ID> \"")
.append(queryFile.getName().replace("sparql", "")).append("\" .\n");
//TODO query complexity
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public abstract class AbstractWorker implements Worker {
*/
protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractWorker.class);

private boolean endSignal = false;
protected boolean endSignal = false;
protected long executedQueries;

private Collection<Properties> results = new LinkedList<Properties>();
Expand Down Expand Up @@ -221,6 +221,7 @@ public void startWorker() {
System.out.println("size: "+resultTime[2]);
result.put(COMMON.RECEIVE_DATA_SIZE, resultTime[2]);
}
result.put(COMMON.QUERY_HASH, query.toString().hashCode());
result.setProperty(COMMON.QUERY_ID_KEY, queryID.toString());
// Add extra Meta Key, worker ID and worker Type
result.put(COMMON.EXTRA_META_KEY, this.extra);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.aksw.iguana.tp.tasks.impl.stresstest.worker.impl;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;

public class CLIInputFileWorker extends MultipleCLIInputWorker {


private String dir;

@Override
public void init(String args[]) {
super.init(args);
int i=13;
if(args.length>13) {
processList = new Process[Integer.parseInt(args[13])];
i++;
}
this.dir = args[i];

}

@Override
protected String writableQuery(String query) {
File f;

try {
f = new File(dir+File.separator+"tmpquery.sparql");
f.deleteOnExit();
try(PrintWriter pw = new PrintWriter(f)){
pw.print(query);
}
return f.getName();
} catch (IOException e) {
e.printStackTrace();
}

return query;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.aksw.iguana.tp.tasks.impl.stresstest.worker.impl;


public class CLIInputPrefixWorker extends MultipleCLIInputWorker {

private String prefix;
private String suffix;


@Override
public void init(String args[]) {
super.init(args);
int i=13;
if(args.length>15) {
processList = new Process[Integer.parseInt(args[13])];
i++;
}
this.prefix = args[i];
this.suffix = args[i+1];

}


@Override
protected String writableQuery(String query) {
return prefix+" "+query+" "+suffix;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.aksw.iguana.tp.tasks.impl.stresstest.worker.AbstractWorker;
Expand All @@ -23,54 +25,75 @@ public class CLIInputWorker extends AbstractWorker {
private Process process;
private String queryFinished;
private BufferedReader reader;
private int currentProcess = 0;
private BufferedWriter output;
private Boolean useFileToQuery = false;
private String error;
private ProcessBuilder processBuilder;
private String initFinished;

public CLIInputWorker() {
super("CLIWorker");
super("CLIInputWorker");
}

public CLIInputWorker(String[] args) {
super(args, "CLIWorker");
super(args, "CLIInputWorker");
queryPatternChooser = new Random(this.workerID);

}

@Override
public void init(String args[]) {
super.init(args);
String initFinished = args[10];
this.initFinished = args[10];
this.queryFinished = args[11];
this.error = args[12];
queryPatternChooser = new Random(this.workerID);
// start cli input
ProcessBuilder processBuilder = new ProcessBuilder();
System.out.println("Init CLIInputWorker " + args[11]);

processBuilder = new ProcessBuilder();
processBuilder.redirectErrorStream(true);
try {
if (SystemUtils.IS_OS_LINUX) {

processBuilder.command(new String[] { "bash", "-c", this.service });

} else if (SystemUtils.IS_OS_WINDOWS) {
processBuilder.command(new String[] { "cmd.exe", "-c", this.service });
}
process = processBuilder.start();

output = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
readUntilStringOccurs(reader, initFinished);

} catch (IOException e) {
e.printStackTrace();
}
}

private long readUntilStringOccurs(BufferedReader reader2, String initFinished) throws IOException {
private long readUntilStringOccurs(BufferedReader reader, String initFinished) throws IOException {
String line;
long size=-1;
while ((line = reader.readLine()) != null) {
if(line.contains(initFinished)) {
break;
}
size++;
System.out.println("Will look for: " + initFinished + " or as error: " + error);
StringBuilder output = new StringBuilder();
long size = -1;
while ((line = reader.readLine()) != null) {
if (line.contains(error)) {
System.out.println("Found error");
System.out.println("Query finished with " + initFinished);

throw new IOException(line);
} else if (line.contains(initFinished)) {
System.out.println("Query finished with " + initFinished);
break;
}

if (output.length() < 1000) {
output.append(line).append("\n");
}
size++;
}
System.out.println(output.substring(0, Math.min(1000, output.length())));
return size;
}

Expand All @@ -79,35 +102,57 @@ public Long[] getTimeForQueryMs(String query, String queryID) {
long start = System.currentTimeMillis();
// execute queryCLI and read response
try {
AtomicLong size =new AtomicLong(-1);
AtomicLong size = new AtomicLong(-1);
AtomicBoolean failed = new AtomicBoolean(false);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() {

@Override
public void run() {
try {
System.out.println("Process Alive: " + process.isAlive());
System.out.println("Reader ready: " + reader.ready());
size.set(readUntilStringOccurs(reader, queryFinished));
} catch (IOException e) {
e.printStackTrace();
failed.set(true);
}
}});
output.write(query+"\n");
output.flush();
executor.shutdown();
executor.awaitTermination(this.timeOut, TimeUnit.MILLISECONDS);
}
});
BufferedWriter output = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
try {
if (process.isAlive()) {
output.write(writableQuery(query) + "\n");
output.flush();
} else if (this.endSignal) {
return new Long[] { 0L, System.currentTimeMillis() - start };
} else {
return new Long[] { 0L, System.currentTimeMillis() - start };
}
} finally {
executor.shutdown();
executor.awaitTermination(this.timeOut, TimeUnit.MILLISECONDS);
}
long end = System.currentTimeMillis();
if(end-start>timeOut) {

if (end - start >= timeOut) {
return new Long[] { 0L, end - start };
} else if (failed.get()) {
return new Long[] { 0L, end - start };
}
System.out.println("[DEBUG] Query successfully executed size: "+size.get());
return new Long[] { 1L, end - start , size.get()};
System.out.println("[DEBUG] Query successfully executed size: " + size.get());
return new Long[] { 1L, end - start, size.get() };
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
// ERROR
return new Long[] { 0L, System.currentTimeMillis() - start };
}


protected String writableQuery(String query) {
return query;
}

@Override
public void getNextQuery(StringBuilder queryStr, StringBuilder queryID) throws IOException {
// get next Query File and next random Query out of it.
Expand All @@ -131,4 +176,9 @@ public void setQueriesList(File[] queries) {
this.currentQueryID = queryPatternChooser.nextInt(this.queryFileList.length);
}

@Override
public void stopSending() {
super.stopSending();
process.destroyForcibly();
}
}
Loading

0 comments on commit fa52e3a

Please sign in to comment.