CODE: rework threading on QueryStreamWriter #383

Merged · 6 commits · Oct 17, 2017
1 change: 0 additions & 1 deletion config/ksqlserver.properties
@@ -1,4 +1,3 @@
#bootstrap.servers=localhost:1119092
bootstrap.servers=localhost:9092
ksql.command.topic.suffix=commands

@@ -30,15 +30,13 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Windowed;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.SynchronousQueue;
import java.util.*;
Contributor: imports

Author: done

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueuedSchemaKStream extends SchemaKStream {

private final SynchronousQueue<KeyValue<String, GenericRow>> rowQueue = new SynchronousQueue<>();
private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue = new LinkedBlockingQueue<>(100);

private QueuedSchemaKStream(final Schema schema, final KStream kstream, final Field keyField,
final List<SchemaKStream> sourceSchemaKStreams,
@@ -62,7 +60,7 @@ private QueuedSchemaKStream(final Schema schema, final KStream kstream, final Field keyField,
);
}

public SynchronousQueue<KeyValue<String, GenericRow>> getQueue() {
public BlockingQueue<KeyValue<String, GenericRow>> getQueue() {
return rowQueue;
}

@@ -124,11 +122,11 @@ public List<SchemaKStream> getSourceSchemaKStreams() {
}

protected static class QueuePopulator<K> implements ForeachAction<K, GenericRow> {
private final SynchronousQueue<KeyValue<String, GenericRow>> queue;
private final BlockingQueue<KeyValue<String, GenericRow>> queue;
private final Optional<Integer> limit;
private int counter = 0;

QueuePopulator(SynchronousQueue<KeyValue<String, GenericRow>> queue,
QueuePopulator(BlockingQueue<KeyValue<String, GenericRow>> queue,
Optional<Integer> limit) {
this.queue = queue;
this.limit = limit;
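The queue swap in QueuedSchemaKStream is the heart of the PR: a SynchronousQueue is a zero-capacity rendezvous, so the Kafka Streams thread blocked on every single row until a consumer was already waiting in take(); the bounded LinkedBlockingQueue(100) lets the Streams thread buffer up to 100 rows ahead and only then applies backpressure. A minimal standalone sketch (not part of the PR) contrasting the two:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class QueueBackpressureDemo {
  public static void main(String[] args) throws InterruptedException {
    // Bounded queue: the producer can run ahead until the buffer is full.
    BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);
    for (int i = 0; i < 100; i++) {
      bounded.put("row-" + i);  // succeeds immediately up to capacity
    }
    // The 101st row is refused until a consumer drains something.
    boolean accepted = bounded.offer("row-100", 10, TimeUnit.MILLISECONDS);
    System.out.println("101st row accepted? " + accepted);  // false

    // Zero-capacity rendezvous: with no consumer blocked in take(),
    // even the first offer times out.
    BlockingQueue<String> rendezvous = new SynchronousQueue<>();
    System.out.println("rendezvous offer accepted? "
        + rendezvous.offer("row-0", 10, TimeUnit.MILLISECONDS));  // false
  }
}
```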
@@ -23,25 +23,25 @@
import org.apache.kafka.streams.KeyValue;

import java.util.Objects;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.BlockingQueue;

public class QueuedQueryMetadata extends QueryMetadata {

private final SynchronousQueue<KeyValue<String, GenericRow>> rowQueue;
private final BlockingQueue<KeyValue<String, GenericRow>> rowQueue;

public QueuedQueryMetadata(
String statementString,
KafkaStreams kafkaStreams,
OutputNode outputNode,
String executionPlan,
SynchronousQueue<KeyValue<String, GenericRow>> rowQueue,
BlockingQueue<KeyValue<String, GenericRow>> rowQueue,
DataSource.DataSourceType dataSourceType
) {
super(statementString, kafkaStreams, outputNode, executionPlan, dataSourceType);
this.rowQueue = rowQueue;
}

public SynchronousQueue<KeyValue<String, GenericRow>> getRowQueue() {
public BlockingQueue<KeyValue<String, GenericRow>> getRowQueue() {
return rowQueue;
}


This file was deleted.

@@ -16,12 +16,15 @@

package io.confluent.ksql.rest.server.resources.streaming;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -31,16 +34,16 @@
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class QueryStreamWriter implements StreamingOutput {

private static final Logger log = LoggerFactory.getLogger(QueryStreamWriter.class);

private final QueuedQueryMetadata queryMetadata;
private final long disconnectCheckInterval;
private final AtomicReference<Throwable> streamsException;
private final ObjectMapper objectMapper;
private Throwable streamsException;


QueryStreamWriter(
KsqlEngine ksqlEngine,
Expand All @@ -51,6 +54,7 @@ class QueryStreamWriter implements StreamingOutput {
throws Exception {
QueryMetadata queryMetadata =
ksqlEngine.buildMultipleQueries(true, queryString, overriddenProperties).get(0);
this.objectMapper = new ObjectMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
if (!(queryMetadata instanceof QueuedQueryMetadata)) {
throw new Exception(String.format(
"Unexpected metadata type: expected QueuedQueryMetadata, found %s instead",
@@ -61,82 +65,63 @@ class QueryStreamWriter implements StreamingOutput {
this.disconnectCheckInterval = disconnectCheckInterval;
this.queryMetadata = ((QueuedQueryMetadata) queryMetadata);

this.streamsException = new AtomicReference<>(null);
this.queryMetadata.getKafkaStreams().setUncaughtExceptionHandler(new StreamsExceptionHandler());

queryMetadata.getKafkaStreams().start();
}

@Override
public void write(OutputStream out) throws IOException {
try {
AtomicBoolean rowsWritten = new AtomicBoolean(false);
QueryRowWriter queryRowWriter = new QueryRowWriter(
out,
streamsException,
queryMetadata.getRowQueue(),
rowsWritten
);
Thread rowWriterThread = new Thread(queryRowWriter);
rowWriterThread.start();
try {
while (true) {
Thread.sleep(disconnectCheckInterval);
Throwable exception = streamsException.get();
if (exception != null) {
throw exception;
}
while (true) {

KeyValue<String, GenericRow> value = queryMetadata.getRowQueue().poll(disconnectCheckInterval, TimeUnit.MILLISECONDS);
if (value != null) {
write(out, value.value);
} else {
// If no new rows have been written, the user may have terminated the connection without
// us knowing. Check by trying to write a single newline.
if (!rowsWritten.getAndSet(false)) {
synchronized (out) {
out.write("\n".getBytes());
out.flush();
}
}
}
} catch (EOFException exception) {
// The user has terminated the connection; we can stop writing
} catch (InterruptedException exception) {
// The most likely cause of this is the server shutting down. Should just try to close
// gracefully, without writing any more to the connection stream.
log.warn("Interrupted while writing to connection stream");
} catch (Throwable exception) {
log.error("Exception occurred while writing to connection stream: ", exception);
synchronized (out) {
out.write("\n".getBytes());
if (exception.getCause() instanceof KsqlException) {
new ObjectMapper().writeValue(out, new StreamedRow(exception.getCause()));
} else {
new ObjectMapper().writeValue(out, new StreamedRow(exception));
}
out.write("\n".getBytes());
out.flush();
}
}

if (rowWriterThread.isAlive()) {
try {
rowWriterThread.interrupt();
rowWriterThread.join();
} catch (InterruptedException exception) {
log.warn(
"Failed to join row writer thread; setting to daemon to avoid hanging on shutdown"
);
rowWriterThread.setDaemon(true);
if (streamsException != null) {
throw streamsException;
}
}
} catch (EOFException exception) {
// The user has terminated the connection; we can stop writing
Contributor: Should we at least log this so we know why it has been terminated?

Author: done
log.warn("Query terminated due to exception:" + exception.toString());
} catch (InterruptedException exception) {
// The most likely cause of this is the server shutting down. Should just try to close
// gracefully, without writing any more to the connection stream.
log.warn("Interrupted while writing to connection stream");
} catch (Throwable exception) {
log.error("Exception occurred while writing to connection stream: ", exception);
out.write("\n".getBytes());
if (exception.getCause() instanceof KsqlException) {
objectMapper.writeValue(out, new StreamedRow(exception.getCause()));
} else {
objectMapper.writeValue(out, new StreamedRow(exception));
}
out.write("\n".getBytes());
out.flush();

} finally {
queryMetadata.getKafkaStreams().close(100L, TimeUnit.MILLISECONDS);
queryMetadata.getKafkaStreams().cleanUp();
}
}

void write(OutputStream output, GenericRow row) throws IOException {
objectMapper.writeValue(output, new StreamedRow(row));
output.write("\n".getBytes());
output.flush();
}
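One detail above worth calling out: the constructor builds the ObjectMapper with JsonGenerator.Feature.AUTO_CLOSE_TARGET disabled. By default Jackson closes the OutputStream it writes to when a writeValue() call finishes, which would kill the chunked HTTP response after the first row. A standalone sketch of the effect (OneShotStream is a hypothetical stand-in for a servlet response stream that cannot be used after close()):

```java
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class AutoCloseTargetDemo {
  // Hypothetical stand-in for the servlet response stream: writing after
  // close() fails, just as writes to a closed HTTP response would.
  static class OneShotStream extends FilterOutputStream {
    private boolean closed;
    OneShotStream(OutputStream out) { super(out); }
    @Override public void write(int b) throws IOException {
      if (closed) throw new IOException("stream already closed");
      super.write(b);
    }
    @Override public void close() { closed = true; }
  }

  public static void main(String[] args) throws IOException {
    OneShotStream out = new OneShotStream(System.out);
    ObjectMapper mapper = new ObjectMapper()
        .disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
    mapper.writeValue(out, "row1");  // no longer closes 'out' afterwards
    out.write('\n');
    mapper.writeValue(out, "row2");  // still open, so this succeeds; with
    out.write('\n');                 // the default setting it would throw
    out.flush();
  }
}
```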

private class StreamsExceptionHandler implements Thread.UncaughtExceptionHandler {

@Override
public void uncaughtException(Thread thread, Throwable exception) {
streamsException.compareAndSet(null, exception);
streamsException = exception;
Contributor: if we get a StreamsException the only sensible thing to do really is shut down

Author: it gets passed back to the stream loop above; that loop will rethrow and clean up

}
}
}
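Stepping back, the old design ran a dedicated QueryRowWriter thread to feed the response while the request thread only probed for disconnects; the new write() does both jobs in one loop. A condensed, non-self-contained sketch of the new control flow (writeRow() stands in for the private write(OutputStream, GenericRow) helper; queryMetadata, disconnectCheckInterval, and streamsException are the fields from the diff):

```java
while (true) {
  // Wait up to disconnectCheckInterval ms for the Streams thread to hand
  // over a row through the bounded queue.
  KeyValue<String, GenericRow> row = queryMetadata.getRowQueue()
      .poll(disconnectCheckInterval, TimeUnit.MILLISECONDS);
  if (row != null) {
    writeRow(out, row.value);      // JSON row + newline + flush
  } else {
    out.write("\n".getBytes());    // idle probe: if the client is gone,
    out.flush();                   // this surfaces as an EOFException
  }
  if (streamsException != null) {  // set by the uncaught-exception handler;
    throw streamsException;        // rethrown so catch/finally report the
  }                                // error and close the Streams app
}
```

This is also what resolves the exception-handler question above: the handler only records the Throwable, and this loop is what turns it into a shutdown.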
@@ -64,31 +64,15 @@ public Response streamQuery(KsqlRequest request) throws Exception {
Map<String, Object> clientLocalProperties =
Optional.ofNullable(request.getStreamsProperties()).orElse(Collections.emptyMap());
Statement statement = statementParser.parseSingleStatement(ksql);

if (statement instanceof Query) {
QueryStreamWriter queryStreamWriter =
new QueryStreamWriter(ksqlEngine, disconnectCheckInterval, ksql, clientLocalProperties);
log.info("Streaming query '{}'", ksql);
return Response.ok().entity(queryStreamWriter).build();

} else if (statement instanceof PrintTopic) {
PrintTopic printTopic = (PrintTopic) statement;
String topicName = printTopic.getTopic().toString();
Long interval =
Optional.ofNullable(printTopic.getIntervalValue()).map(LongLiteral::getValue).orElse(1L);
KsqlTopic ksqlTopic = ksqlEngine.getMetaStore().getTopic(printTopic.getTopic().toString());
Objects.requireNonNull(
ksqlTopic,
String.format("Could not find topic '%s' in the metastore", topicName)
);
Map<String, Object> properties = ksqlEngine.getKsqlConfigProperties();
properties.putAll(clientLocalProperties);
TopicStreamWriter topicStreamWriter = new TopicStreamWriter(
properties,
ksqlTopic,
interval,
disconnectCheckInterval,
printTopic.getFromBeginning()
);
log.info("Printing topic '{}'", topicName);
TopicStreamWriter topicStreamWriter = getTopicStreamWriter(clientLocalProperties, (PrintTopic) statement);
return Response.ok().entity(topicStreamWriter).build();
} else {
throw new Exception(String.format(
@@ -97,4 +81,26 @@
));
}
}

private TopicStreamWriter getTopicStreamWriter(final Map<String, Object> clientLocalProperties, final PrintTopic printTopic) {
String topicName = printTopic.getTopic().toString();
Long interval =
Optional.ofNullable(printTopic.getIntervalValue()).map(LongLiteral::getValue).orElse(1L);
KsqlTopic ksqlTopic = ksqlEngine.getMetaStore().getTopic(topicName);
Objects.requireNonNull(
ksqlTopic,
String.format("Could not find topic '%s' in the metastore", topicName)
);
Map<String, Object> properties = ksqlEngine.getKsqlConfigProperties();
properties.putAll(clientLocalProperties);
TopicStreamWriter topicStreamWriter = new TopicStreamWriter(
properties,
ksqlTopic,
interval,
disconnectCheckInterval,
printTopic.getFromBeginning()
);
log.info("Printing topic '{}'", topicName);
return topicStreamWriter;
}
}