-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CODE: rework threading on QueryStreamWriter #383
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,3 @@ | ||
#bootstrap.servers=localhost:1119092 | ||
bootstrap.servers=localhost:9092 | ||
ksql.command.topic.suffix=commands | ||
|
||
|
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,12 +16,15 @@ | |
|
||
package io.confluent.ksql.rest.server.resources.streaming; | ||
|
||
import com.fasterxml.jackson.core.JsonGenerator; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import io.confluent.ksql.GenericRow; | ||
import io.confluent.ksql.KsqlEngine; | ||
import io.confluent.ksql.rest.entity.StreamedRow; | ||
import io.confluent.ksql.util.KsqlException; | ||
import io.confluent.ksql.util.QueryMetadata; | ||
import io.confluent.ksql.util.QueuedQueryMetadata; | ||
import org.apache.kafka.streams.KeyValue; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -31,16 +34,16 @@ | |
import java.io.OutputStream; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
class QueryStreamWriter implements StreamingOutput { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(QueryStreamWriter.class); | ||
|
||
private final QueuedQueryMetadata queryMetadata; | ||
private final long disconnectCheckInterval; | ||
private final AtomicReference<Throwable> streamsException; | ||
private final ObjectMapper objectMapper; | ||
private Throwable streamsException; | ||
|
||
|
||
QueryStreamWriter( | ||
KsqlEngine ksqlEngine, | ||
|
@@ -51,6 +54,7 @@ class QueryStreamWriter implements StreamingOutput { | |
throws Exception { | ||
QueryMetadata queryMetadata = | ||
ksqlEngine.buildMultipleQueries(true, queryString, overriddenProperties).get(0); | ||
this.objectMapper = new ObjectMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); | ||
if (!(queryMetadata instanceof QueuedQueryMetadata)) { | ||
throw new Exception(String.format( | ||
"Unexpected metadata type: expected QueuedQueryMetadata, found %s instead", | ||
|
@@ -61,82 +65,63 @@ class QueryStreamWriter implements StreamingOutput { | |
this.disconnectCheckInterval = disconnectCheckInterval; | ||
this.queryMetadata = ((QueuedQueryMetadata) queryMetadata); | ||
|
||
this.streamsException = new AtomicReference<>(null); | ||
this.queryMetadata.getKafkaStreams().setUncaughtExceptionHandler(new StreamsExceptionHandler()); | ||
|
||
queryMetadata.getKafkaStreams().start(); | ||
} | ||
|
||
@Override | ||
public void write(OutputStream out) throws IOException { | ||
try { | ||
AtomicBoolean rowsWritten = new AtomicBoolean(false); | ||
QueryRowWriter queryRowWriter = new QueryRowWriter( | ||
out, | ||
streamsException, | ||
queryMetadata.getRowQueue(), | ||
rowsWritten | ||
); | ||
Thread rowWriterThread = new Thread(queryRowWriter); | ||
rowWriterThread.start(); | ||
try { | ||
while (true) { | ||
Thread.sleep(disconnectCheckInterval); | ||
Throwable exception = streamsException.get(); | ||
if (exception != null) { | ||
throw exception; | ||
} | ||
while (true) { | ||
|
||
KeyValue<String, GenericRow> value = queryMetadata.getRowQueue().poll(disconnectCheckInterval, TimeUnit.MILLISECONDS); | ||
if (value != null) { | ||
write(out, value.value); | ||
} else { | ||
// If no new rows have been written, the user may have terminated the connection without | ||
// us knowing. Check by trying to write a single newline. | ||
if (!rowsWritten.getAndSet(false)) { | ||
synchronized (out) { | ||
out.write("\n".getBytes()); | ||
out.flush(); | ||
} | ||
} | ||
} | ||
} catch (EOFException exception) { | ||
// The user has terminated the connection; we can stop writing | ||
} catch (InterruptedException exception) { | ||
// The most likely cause of this is the server shutting down. Should just try to close | ||
// gracefully, without writing any more to the connection stream. | ||
log.warn("Interrupted while writing to connection stream"); | ||
} catch (Throwable exception) { | ||
log.error("Exception occurred while writing to connection stream: ", exception); | ||
synchronized (out) { | ||
out.write("\n".getBytes()); | ||
if (exception.getCause() instanceof KsqlException) { | ||
new ObjectMapper().writeValue(out, new StreamedRow(exception.getCause())); | ||
} else { | ||
new ObjectMapper().writeValue(out, new StreamedRow(exception)); | ||
} | ||
out.write("\n".getBytes()); | ||
out.flush(); | ||
} | ||
} | ||
|
||
if (rowWriterThread.isAlive()) { | ||
try { | ||
rowWriterThread.interrupt(); | ||
rowWriterThread.join(); | ||
} catch (InterruptedException exception) { | ||
log.warn( | ||
"Failed to join row writer thread; setting to daemon to avoid hanging on shutdown" | ||
); | ||
rowWriterThread.setDaemon(true); | ||
if (streamsException != null) { | ||
throw streamsException; | ||
} | ||
} | ||
} catch (EOFException exception) { | ||
// The user has terminated the connection; we can stop writing | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we at least log this so we know why it has been terminated? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
log.warn("Query terminated due to exception:" + exception.toString()); | ||
} catch (InterruptedException exception) { | ||
// The most likely cause of this is the server shutting down. Should just try to close | ||
// gracefully, without writing any more to the connection stream. | ||
log.warn("Interrupted while writing to connection stream"); | ||
} catch (Throwable exception) { | ||
log.error("Exception occurred while writing to connection stream: ", exception); | ||
out.write("\n".getBytes()); | ||
if (exception.getCause() instanceof KsqlException) { | ||
objectMapper.writeValue(out, new StreamedRow(exception.getCause())); | ||
} else { | ||
objectMapper.writeValue(out, new StreamedRow(exception)); | ||
} | ||
out.write("\n".getBytes()); | ||
out.flush(); | ||
|
||
} finally { | ||
queryMetadata.getKafkaStreams().close(100L, TimeUnit.MILLISECONDS); | ||
queryMetadata.getKafkaStreams().cleanUp(); | ||
} | ||
} | ||
|
||
void write(OutputStream output, GenericRow row) throws IOException { | ||
objectMapper.writeValue(output, new StreamedRow(row)); | ||
output.write("\n".getBytes()); | ||
output.flush(); | ||
} | ||
|
||
private class StreamsExceptionHandler implements Thread.UncaughtExceptionHandler { | ||
|
||
@Override | ||
public void uncaughtException(Thread thread, Throwable exception) { | ||
streamsException.compareAndSet(null, exception); | ||
streamsException = exception; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we get a StreamsException the only sensible thing to do really is shutdown There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it gets passed back to the stream loop above, that loop will rethrow and cleanup |
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done