Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CODE: rework threading on QueryStreamWriter #383

Merged
merged 6 commits into from
Oct 17, 2017
Merged

CODE: rework threading on QueryStreamWriter #383

merged 6 commits into from
Oct 17, 2017

Conversation

bluemonk3y
Copy link

#382 drop threading in QueryStreamWriter. @dguy - can you LMK thoughts on the BlockingQueue

@bluemonk3y bluemonk3y requested a review from dguy October 17, 2017 11:12
@bluemonk3y
Copy link
Author

retest this please

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.SynchronousQueue;
import java.util.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imports

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@Override
public void uncaughtException(Thread thread, Throwable exception) {
streamsException.compareAndSet(null, exception);
streamsException = exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we get a StreamsException the only sensible thing to do really is shutdown

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it gets passed back to the stream loop above, that loop will rethrow and cleanup

}
}
} catch (EOFException exception) {
// The user has terminated the connection; we can stop writing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we at least log this so we know why it has been terminated?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

log.error("Exception occurred while writing to connection stream: ", exception);
out.write("\n".getBytes());
if (exception.getCause() instanceof KsqlException) {
new ObjectMapper().writeValue(out, new StreamedRow(exception.getCause()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why we don't use the objectMapper field here and below?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -97,4 +81,27 @@ public Response streamQuery(KsqlRequest request) throws Exception {
));
}
}

private TopicStreamWriter getTopicStreamWriter(Map<String, Object> clientLocalProperties, PrintTopic statement) {
PrintTopic printTopic = statement;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final params. Also, why assign the statement to the printTopic? looks redundant

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


private TopicStreamWriter getTopicStreamWriter(Map<String, Object> clientLocalProperties, PrintTopic statement) {
PrintTopic printTopic = statement;
String topicName = printTopic.getTopic().toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this right? the topic name would be "topic something". Maybe add a topicName method to PrintTopic?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

printTopic.toString() would give you ' topic stuff' - getTopic returns QualifiedTopic - which implements toString with correct formatting. I removed the redundant reference on the line below

@bluemonk3y bluemonk3y merged commit 863fa31 into confluentinc:4.0.x Oct 17, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants