rework threading on QueryStreamWriter
bluemonk3y committed Oct 17, 2017
1 parent 87671d0 commit 587b233
Showing 6 changed files with 77 additions and 160 deletions.
1 change: 0 additions & 1 deletion config/ksqlserver.properties
@@ -1,4 +1,3 @@
#bootstrap.servers=localhost:1119092
bootstrap.servers=localhost:9092
ksql.command.topic.suffix=commands

QueuedSchemaKStream.java
@@ -32,15 +32,15 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Windowed;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueuedSchemaKStream extends SchemaKStream {

private final SynchronousQueue<KeyValue<String, GenericRow>> rowQueue = new SynchronousQueue<>();
private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue = new LinkedBlockingQueue<>(100);

private QueuedSchemaKStream(final Schema schema, final KStream kstream, final Field keyField,
final List<SchemaKStream> sourceSchemaKStreams,
@@ -64,7 +64,7 @@ private QueuedSchemaKStream(final Schema schema, final KStream kstream, final Field keyField,
);
}

public SynchronousQueue<KeyValue<String, GenericRow>> getQueue() {
public BlockingQueue<KeyValue<String, GenericRow>> getQueue() {
return rowQueue;
}

@@ -126,11 +126,11 @@ public List<SchemaKStream> getSourceSchemaKStreams() {
}

protected static class QueuePopulator<K> implements ForeachAction<K, GenericRow> {
private final SynchronousQueue<KeyValue<String, GenericRow>> queue;
private final BlockingQueue<KeyValue<String, GenericRow>> queue;
private final Optional<Integer> limit;
private int counter = 0;

QueuePopulator(SynchronousQueue<KeyValue<String, GenericRow>> queue,
QueuePopulator(BlockingQueue<KeyValue<String, GenericRow>> queue,
Optional<Integer> limit) {
this.queue = queue;
this.limit = limit;
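For context (not part of the commit): the two queue types behave very differently. A SynchronousQueue is a zero-capacity handoff, so a producer can only hand a row to a consumer that is already waiting, while the bounded LinkedBlockingQueue lets the Kafka Streams thread buffer up to 100 rows ahead of the HTTP writer. A minimal sketch, using plain strings in place of KeyValue rows:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class QueueSemanticsDemo {
    public static void main(String[] args) throws InterruptedException {
        // SynchronousQueue has no capacity: offer() fails unless a consumer
        // is already blocked in take()/poll().
        SynchronousQueue<String> handoff = new SynchronousQueue<>();
        System.out.println(handoff.offer("row-1")); // false: nobody is waiting

        // A bounded LinkedBlockingQueue accepts rows up to its capacity, so
        // the producing thread can run ahead of the consumer.
        BlockingQueue<String> buffered = new LinkedBlockingQueue<>(100);
        System.out.println(buffered.offer("row-1")); // true: buffered

        // Both support a timed poll, which is what the reworked
        // QueryStreamWriter uses to wake up even when no rows arrive.
        System.out.println(buffered.poll(100, TimeUnit.MILLISECONDS)); // row-1
    }
}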
QueuedQueryMetadata.java
@@ -23,25 +23,26 @@
import org.apache.kafka.streams.KeyValue;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueuedQueryMetadata extends QueryMetadata {

private final SynchronousQueue<KeyValue<String, GenericRow>> rowQueue;
private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue;

public QueuedQueryMetadata(
String statementString,
KafkaStreams kafkaStreams,
OutputNode outputNode,
String executionPlan,
SynchronousQueue<KeyValue<String, GenericRow>> rowQueue,
BlockingQueue<KeyValue<String, GenericRow>> rowQueue,
DataSource.DataSourceType dataSourceType
) {
super(statementString, kafkaStreams, outputNode, executionPlan, dataSourceType);
this.rowQueue = rowQueue;
}

public SynchronousQueue<KeyValue<String, GenericRow>> getRowQueue() {
public BlockingQueue<KeyValue<String, GenericRow>> getRowQueue() {
return rowQueue;
}


This file was deleted. (Judging by the references removed from QueryStreamWriter below, it held the QueryRowWriter class that previously drained the row queue on a dedicated thread.)

QueryStreamWriter.java
@@ -16,12 +16,15 @@

package io.confluent.ksql.rest.server.resources.streaming;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -40,7 +43,9 @@ class QueryStreamWriter implements StreamingOutput {

private final QueuedQueryMetadata queryMetadata;
private final long disconnectCheckInterval;
private final AtomicReference<Throwable> streamsException;
private final ObjectMapper objectMapper;
private Throwable streamsException;


QueryStreamWriter(
KsqlEngine ksqlEngine,
@@ -51,6 +56,7 @@ class QueryStreamWriter implements StreamingOutput {
throws Exception {
QueryMetadata queryMetadata =
ksqlEngine.buildMultipleQueries(true, queryString, overriddenProperties).get(0);
this.objectMapper = new ObjectMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
if (!(queryMetadata instanceof QueuedQueryMetadata)) {
throw new Exception(String.format(
"Unexpected metadata type: expected QueuedQueryMetadata, found %s instead",
@@ -61,82 +67,62 @@
this.disconnectCheckInterval = disconnectCheckInterval;
this.queryMetadata = ((QueuedQueryMetadata) queryMetadata);

this.streamsException = new AtomicReference<>(null);
this.queryMetadata.getKafkaStreams().setUncaughtExceptionHandler(new StreamsExceptionHandler());

queryMetadata.getKafkaStreams().start();
}

@Override
public void write(OutputStream out) throws IOException {
try {
AtomicBoolean rowsWritten = new AtomicBoolean(false);
QueryRowWriter queryRowWriter = new QueryRowWriter(
out,
streamsException,
queryMetadata.getRowQueue(),
rowsWritten
);
Thread rowWriterThread = new Thread(queryRowWriter);
rowWriterThread.start();
try {
while (true) {
Thread.sleep(disconnectCheckInterval);
Throwable exception = streamsException.get();
if (exception != null) {
throw exception;
}
while (true) {

KeyValue<String, GenericRow> value = queryMetadata.getRowQueue().poll(disconnectCheckInterval, TimeUnit.MILLISECONDS);
if (value != null) {
write(out, value.value);
} else {
// If no new rows have been written, the user may have terminated the connection without
// us knowing. Check by trying to write a single newline.
if (!rowsWritten.getAndSet(false)) {
synchronized (out) {
out.write("\n".getBytes());
out.flush();
}
}
}
} catch (EOFException exception) {
// The user has terminated the connection; we can stop writing
} catch (InterruptedException exception) {
// The most likely cause of this is the server shutting down. Should just try to close
// gracefully, without writing any more to the connection stream.
log.warn("Interrupted while writing to connection stream");
} catch (Throwable exception) {
log.error("Exception occurred while writing to connection stream: ", exception);
synchronized (out) {
out.write("\n".getBytes());
if (exception.getCause() instanceof KsqlException) {
new ObjectMapper().writeValue(out, new StreamedRow(exception.getCause()));
} else {
new ObjectMapper().writeValue(out, new StreamedRow(exception));
}
out.write("\n".getBytes());
out.flush();
}
}

if (rowWriterThread.isAlive()) {
try {
rowWriterThread.interrupt();
rowWriterThread.join();
} catch (InterruptedException exception) {
log.warn(
"Failed to join row writer thread; setting to daemon to avoid hanging on shutdown"
);
rowWriterThread.setDaemon(true);
if (streamsException != null) {
throw streamsException;
}
}
} catch (EOFException exception) {
// The user has terminated the connection; we can stop writing
} catch (InterruptedException exception) {
// The most likely cause of this is the server shutting down. Should just try to close
// gracefully, without writing any more to the connection stream.
log.warn("Interrupted while writing to connection stream");
} catch (Throwable exception) {
log.error("Exception occurred while writing to connection stream: ", exception);
out.write("\n".getBytes());
if (exception.getCause() instanceof KsqlException) {
new ObjectMapper().writeValue(out, new StreamedRow(exception.getCause()));
} else {
new ObjectMapper().writeValue(out, new StreamedRow(exception));
}
out.write("\n".getBytes());
out.flush();

} finally {
queryMetadata.getKafkaStreams().close(100L, TimeUnit.MILLISECONDS);
queryMetadata.getKafkaStreams().cleanUp();
}
}

void write(OutputStream output, GenericRow row) throws IOException {
objectMapper.writeValue(output, new StreamedRow(row));
output.write("\n".getBytes());
output.flush();
}

private class StreamsExceptionHandler implements Thread.UncaughtExceptionHandler {

@Override
public void uncaughtException(Thread thread, Throwable exception) {
streamsException.compareAndSet(null, exception);
streamsException = exception;
}
}
}
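Net effect of the rework: the background QueryRowWriter thread and its SynchronousQueue handoff are gone, and the response thread itself drains the bounded queue with a timed poll, surfacing any failure captured from the streams thread. A minimal sketch of the pattern with illustrative names and a String row type (the real code polls KeyValue<String, GenericRow> and serializes StreamedRow objects):

import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollLoopSketch {

    private final BlockingQueue<String> rowQueue = new LinkedBlockingQueue<>(100);

    // Set by the streams thread, read by the response thread. The commit uses
    // a plain field; marking it volatile (or keeping the AtomicReference it
    // replaced) makes the cross-thread visibility explicit.
    private volatile Throwable streamsException;

    // Mirrors the commit's StreamsExceptionHandler: capture the failure so
    // the polling loop can report it to the HTTP client.
    Thread.UncaughtExceptionHandler exceptionHandler() {
        return (thread, exception) -> streamsException = exception;
    }

    void write(OutputStream out, long disconnectCheckIntervalMs) throws Throwable {
        while (true) {
            // The timed poll replaces the dedicated row-writer thread: one
            // thread alternates between draining rows and liveness checks.
            String row = rowQueue.poll(disconnectCheckIntervalMs, TimeUnit.MILLISECONDS);
            if (row != null) {
                out.write((row + "\n").getBytes());
                out.flush();
            } else {
                if (streamsException != null) {
                    throw streamsException; // propagate the streams failure
                }
                // Probe the connection with a bare newline so a vanished
                // client surfaces as an EOFException instead of a hang.
                out.write("\n".getBytes());
                out.flush();
            }
        }
    }
}

The single ObjectMapper configured with AUTO_CLOSE_TARGET disabled is also load-bearing here: by default Jackson closes the target stream when writeValue() finishes, which would end the HTTP response after the first row.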
StreamedQueryResource.java
@@ -64,31 +64,15 @@ public Response streamQuery(KsqlRequest request) throws Exception {
Map<String, Object> clientLocalProperties =
Optional.ofNullable(request.getStreamsProperties()).orElse(Collections.emptyMap());
Statement statement = statementParser.parseSingleStatement(ksql);

if (statement instanceof Query) {
QueryStreamWriter queryStreamWriter =
new QueryStreamWriter(ksqlEngine, disconnectCheckInterval, ksql, clientLocalProperties);
log.info("Streaming query '{}'", ksql);
return Response.ok().entity(queryStreamWriter).build();

} else if (statement instanceof PrintTopic) {
PrintTopic printTopic = (PrintTopic) statement;
String topicName = printTopic.getTopic().toString();
Long interval =
Optional.ofNullable(printTopic.getIntervalValue()).map(LongLiteral::getValue).orElse(1L);
KsqlTopic ksqlTopic = ksqlEngine.getMetaStore().getTopic(printTopic.getTopic().toString());
Objects.requireNonNull(
ksqlTopic,
String.format("Could not find topic '%s' in the metastore", topicName)
);
Map<String, Object> properties = ksqlEngine.getKsqlConfigProperties();
properties.putAll(clientLocalProperties);
TopicStreamWriter topicStreamWriter = new TopicStreamWriter(
properties,
ksqlTopic,
interval,
disconnectCheckInterval,
printTopic.getFromBeginning()
);
log.info("Printing topic '{}'", topicName);
TopicStreamWriter topicStreamWriter = getTopicStreamWriter(clientLocalProperties, (PrintTopic) statement);
return Response.ok().entity(topicStreamWriter).build();
} else {
throw new Exception(String.format(
@@ -97,4 +81,27 @@
));
}
}

private TopicStreamWriter getTopicStreamWriter(Map<String, Object> clientLocalProperties, PrintTopic statement) {
PrintTopic printTopic = statement;
String topicName = printTopic.getTopic().toString();
Long interval =
Optional.ofNullable(printTopic.getIntervalValue()).map(LongLiteral::getValue).orElse(1L);
KsqlTopic ksqlTopic = ksqlEngine.getMetaStore().getTopic(printTopic.getTopic().toString());
Objects.requireNonNull(
ksqlTopic,
String.format("Could not find topic '%s' in the metastore", topicName)
);
Map<String, Object> properties = ksqlEngine.getKsqlConfigProperties();
properties.putAll(clientLocalProperties);
TopicStreamWriter topicStreamWriter = new TopicStreamWriter(
properties,
ksqlTopic,
interval,
disconnectCheckInterval,
printTopic.getFromBeginning()
);
log.info("Printing topic '{}'", topicName);
return topicStreamWriter;
}
}
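The refactor extracts the PrintTopic branch into a getTopicStreamWriter helper, leaving streamQuery as a thin dispatch over the statement type. Either way, the writer is handed to JAX-RS as the response entity; a minimal sketch of that pattern, assuming the javax.ws.rs API of the period and illustrative names:

import java.nio.charset.StandardCharsets;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;

public class StreamingResponseSketch {

    // StreamingOutput defers writing the body until the container invokes
    // write(OutputStream); that is what lets QueryStreamWriter keep pushing
    // rows for as long as the client stays connected.
    public Response streamRows() {
        StreamingOutput body = out -> {
            out.write("row-1\n".getBytes(StandardCharsets.UTF_8));
            out.flush();
        };
        return Response.ok().entity(body).build();
    }
}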
