-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CODE: rework threading on QueryStreamWriter #383
CODE: rework threading on QueryStreamWriter #383
Conversation
retest this please |
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.SynchronousQueue; | ||
import java.util.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@Override | ||
public void uncaughtException(Thread thread, Throwable exception) { | ||
streamsException.compareAndSet(null, exception); | ||
streamsException = exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we get a StreamsException the only sensible thing to do really is shutdown
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it gets passed back to the stream loop above, that loop will rethrow and cleanup
} | ||
} | ||
} catch (EOFException exception) { | ||
// The user has terminated the connection; we can stop writing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we at least log this so we know why it has been terminated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
log.error("Exception occurred while writing to connection stream: ", exception); | ||
out.write("\n".getBytes()); | ||
if (exception.getCause() instanceof KsqlException) { | ||
new ObjectMapper().writeValue(out, new StreamedRow(exception.getCause())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason why we don't use the objectMapper
field here and below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -97,4 +81,27 @@ public Response streamQuery(KsqlRequest request) throws Exception { | |||
)); | |||
} | |||
} | |||
|
|||
private TopicStreamWriter getTopicStreamWriter(Map<String, Object> clientLocalProperties, PrintTopic statement) { | |||
PrintTopic printTopic = statement; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
params. Also, why assign the statement to the printTopic
? looks redundant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
private TopicStreamWriter getTopicStreamWriter(Map<String, Object> clientLocalProperties, PrintTopic statement) { | ||
PrintTopic printTopic = statement; | ||
String topicName = printTopic.getTopic().toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this right? the topic name would be "topic something". Maybe add a topicName
method to PrintTopic
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
printTopic.toString() would give you ' topic stuff' - getTopic returns QualifiedTopic - which implements toString with correct formatting. I removed the redundant reference on the line below
….x-CODE-QueryStreamWriterThreading
#382 drop threading in QueryStreamWriter. @dguy - can you LMK thoughts on the BlockingQueue