Implement Watch method in health check service. #16351
Reviewed 9 of 9 files at r1.
Reviewable status: all files reviewed, 6 unresolved discussions (waiting on @markdroth and @yang-g)
src/cpp/server/health/default_health_check_service.h, line 134 at r1 (raw file):
ByteBuffer request_;
GenericServerAsyncResponseWriter stream_;
Maybe rename this to avoid using "stream" in the unary case.
src/cpp/server/health/default_health_check_service.h, line 251 at r1 (raw file):
HealthCheckServiceImpl::CallHandler
It looks like services_map_ only makes sense in the streaming case, but we are generalizing the interface to take CallHandler and having SendHealth() in CallHandler. I think it's better to restrict the usage to WatchCallHandler. Then ServiceData only accepts WatchCallHandler, and SendHealth() is only in WatchCallHandler.
src/cpp/server/health/default_health_check_service.cc, line 39 at r1 (raw file):
DefaultHealthCheckService::DefaultHealthCheckService() {
  services_map_[""].SetServingStatus(SERVING);
Why is "" always serving?
src/cpp/server/health/default_health_check_service.cc, line 296 at r1 (raw file):
ServingStatus serving_status = database_->GetServingStatus(service_name);
if (serving_status == NOT_FOUND) {
  status = Status(StatusCode::NOT_FOUND, "service name unknown");
We should encode the serving status into the response in this case.
src/cpp/server/health/default_health_check_service.cc, line 419 at r1 (raw file):
"OnCallReceived"
SendHealthLocked
src/cpp/server/health/default_health_check_service.cc, line 443 at r1 (raw file):
"OnReadDone"
OnSendHealthDone
Reviewable status: all files reviewed, 6 unresolved discussions (waiting on @AspirinSJL and @yang-g)
src/cpp/server/health/default_health_check_service.h, line 134 at r1 (raw file):
Previously, AspirinSJL (Juanli Shen) wrote…
Maybe rename this to avoid using "stream" in the unary case.
Done.
src/cpp/server/health/default_health_check_service.h, line 251 at r1 (raw file):
Previously, AspirinSJL (Juanli Shen) wrote…
HealthCheckServiceImpl::CallHandler
It looks like services_map_ only makes sense in the streaming case, but we are generalizing the interface to take CallHandler and having SendHealth() in CallHandler. I think it's better to restrict the usage to WatchCallHandler. Then ServiceData only accepts WatchCallHandler, and SendHealth() is only in WatchCallHandler.
I thought about that, but it would require making WatchCallHandler public in HealthCheckServiceImpl so that DefaultHealthCheckService can access it. I think it's better to put this one unused method in the CallHandler API than to expose internal implementation details that don't otherwise need to be exposed.
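For readers skimming the thread, a minimal sketch of the design that was kept, with SendHealth() on the shared handler interface (class names are from the PR; the signatures and bodies are assumed):

```cpp
#include <memory>

// As in DefaultHealthCheckService's header.
enum ServingStatus { NOT_FOUND, SERVING, NOT_SERVING };

// Sketch only; the PR's actual declarations may differ.
class CallHandler {
 public:
  virtual ~CallHandler() = default;
  // Pushes a health status to the client. Only meaningful for the
  // streaming Watch handler; the unary Check handler leaves it a no-op.
  virtual void SendHealth(std::shared_ptr<CallHandler> self,
                          ServingStatus status) = 0;
};

class CheckCallHandler : public CallHandler {
 public:
  // Unused in the unary case; kept so that ServiceData can hold
  // CallHandler pointers without seeing WatchCallHandler.
  void SendHealth(std::shared_ptr<CallHandler> /*self*/,
                  ServingStatus /*status*/) override {}
};

class WatchCallHandler : public CallHandler {
 public:
  void SendHealth(std::shared_ptr<CallHandler> self,
                  ServingStatus status) override;  // defined elsewhere
};
```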
src/cpp/server/health/default_health_check_service.cc, line 39 at r1 (raw file):
Previously, AspirinSJL (Juanli Shen) wrote…
Why is "" always serving?
This was the existing behavior. I'm not changing it in this PR.
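For background, the empty service name is the health protocol's convention for the server's overall health, which is why it is marked SERVING at construction. A hypothetical client-side check (stub setup assumed, using grpc.health.v1 generated-API names):

```cpp
// Hypothetical: query overall server health by sending "" as the
// service name, per the grpc.health.v1 convention.
grpc::ClientContext context;
grpc::health::v1::HealthCheckRequest request;
request.set_service("");  // "" = the server as a whole
grpc::health::v1::HealthCheckResponse response;
grpc::Status status = health_stub->Check(&context, request, &response);
if (status.ok() &&
    response.status() == grpc::health::v1::HealthCheckResponse::SERVING) {
  // Server is serving overall.
}
```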
src/cpp/server/health/default_health_check_service.cc, line 296 at r1 (raw file):
Previously, AspirinSJL (Juanli Shen) wrote…
We should encode the serving status into the response in this case.
The existing behavior of the Check method is that it fails with status NOT_FOUND if the service is unknown. This PR adds a Watch method with slightly different semantics, but we did not want to break clients of the existing Check method, so we're not altering its behavior.
Note that for unary RPCs, there's generally no point in populating the response message if the RPC returns a non-OK status, because most clients will ignore the message in that case.
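By contrast, the streaming Watch method reports status in-band. A sketch of the mapping for an unknown service (SERVICE_UNKNOWN is in the proto imported by this PR; the actual service hand-encodes its messages, so the generated-API calls here are illustration only):

```cpp
// Sketch: Watch never fails the RPC for an unknown service; it maps
// the database lookup result into the response message instead.
grpc::health::v1::HealthCheckResponse response;
switch (serving_status) {  // ServingStatus from the database lookup
  case NOT_FOUND:
    response.set_status(
        grpc::health::v1::HealthCheckResponse::SERVICE_UNKNOWN);
    break;
  case SERVING:
    response.set_status(grpc::health::v1::HealthCheckResponse::SERVING);
    break;
  case NOT_SERVING:
    response.set_status(
        grpc::health::v1::HealthCheckResponse::NOT_SERVING);
    break;
}
// The response is then written on the stream; later status changes
// trigger further writes via SendHealth().
```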
src/cpp/server/health/default_health_check_service.cc, line 419 at r1 (raw file):
Previously, AspirinSJL (Juanli Shen) wrote…
"OnCallReceived"
SendHealthLocked
Done.
src/cpp/server/health/default_health_check_service.cc, line 443 at r1 (raw file):
Previously, AspirinSJL (Juanli Shen) wrote…
"OnReadDone"
OnSendHealthDone
Done.
Looks good to me!
Reviewed 1 of 1 files at r2, 2 of 2 files at r3.
Reviewable status: complete! all files reviewed, all discussions resolved (waiting on @yang-g)
I will not be able to get to it today ~
Before I dig into the implementation details, is there a design doc for the addition of the Watch API you can point me to? I'm a bit unsure what it is used for and whether we should introduce more structured request and response types for the API. Say there are several services at the server and a client wants to watch all of them: should the client start one watch stream, or one for each service?
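For reference, the request in the imported proto carries a single service name, so a client that wants to watch several services opens one Watch stream per service. A hypothetical client-side sketch (stub creation and include path assumed):

```cpp
#include <string>
#include <grpcpp/grpcpp.h>
#include "src/proto/grpc/health/v1/health.grpc.pb.h"  // assumed path

// Hypothetical: watch a single service's health over a server stream.
void WatchService(grpc::health::v1::Health::Stub* stub,
                  const std::string& service_name) {
  grpc::ClientContext context;
  grpc::health::v1::HealthCheckRequest request;
  request.set_service(service_name);  // one stream per watched service
  auto stream = stub->Watch(&context, request);
  grpc::health::v1::HealthCheckResponse response;
  while (stream->Read(&response)) {
    // The server sends the current status immediately, then a new
    // message every time that service's status changes.
  }
  grpc::Status status = stream->Finish();
}
```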
AddMethod(watch_method_);
// Create serving thread.
thread_ = std::unique_ptr<::grpc_core::Thread>(
    new ::grpc_core::Thread("health_check_service", Serve, this));
"grpc_health_check_service"
?
Done.
void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
  HealthCheckServiceImpl* service =
      reinterpret_cast<HealthCheckServiceImpl*>(arg);
  // TODO(juanlishen): This is a workaround to wait for the cq to be ready.
@AspirinSJL what does "cq not ready" mean here?
// instance will deallocate itself when it's done.
CreateAndStart(cq_, database_, service_);
// Process request.
gpr_log(GPR_INFO, "[HCS %p] Health check started for handler %p", service_,
downgrade to debug log?
Done.
    std::placeholders::_1, std::placeholders::_2),
    std::move(self));
if (status.ok()) {
  writer_.Finish(response, status, &next_);
Note that if the cq has been shut down in the meantime, we should not call Finish here anymore.
Done.
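A sketch of the guard this implies (the mutex and flag names are assumptions about the implementation, not necessarily the PR's exact members):

```cpp
// Sketch: only call Finish() if the service has not begun shutting
// down; otherwise the cq may already be shut down. cq_shutdown_mu_
// and shutdown_ are assumed member names.
{
  std::lock_guard<std::mutex> lock(service_->cq_shutdown_mu_);
  if (!service_->shutdown_) {
    writer_.Finish(response, status, &next_);
  }
  // If shutdown_ is set, drop the call; the self shared_ptr keeps the
  // handler alive until all outstanding tags drain from the cq.
}
```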
void DefaultHealthCheckService::HealthCheckServiceImpl::CheckCallHandler::
    OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
  if (ok) {
    gpr_log(GPR_INFO, "[HCS %p] Health check call finished for handler %p",
downgrade to debug log?
Done.
CallableTag(std::bind(&WatchCallHandler::OnFinishDone, this,
                      std::placeholders::_1, std::placeholders::_2),
            std::move(self));
// TODO(juanlishen): Maybe add a message proto for the client to
I am not sure the comment is relevant.
Good point. This is here because I copied Juanli's code for the load reporting service, but I agree that this comment is not needed in this case. Removed.
// TODO(juanlishen): Maybe add a message proto for the client to
// explicitly cancel the stream so that we can return OK status in such
// cases.
stream_.Finish(Status::CANCELLED, &on_finish_done_);
So this stream is never finished with OK status, right?
Right. Basically, the only way it ever gets terminated is if the client cancels it or the server is shutting down.
void DefaultHealthCheckService::HealthCheckServiceImpl::WatchCallHandler::
    OnFinishDone(std::shared_ptr<CallHandler> self, bool ok) {
  if (ok) {
    gpr_log(GPR_INFO,
Depending on how many streams/client we expect to see at a server, these logs could be spammy.
Good point. Changed them all to debug.
}
// OnCallReceived() may be called after OnDoneNotified(), so we need to
// try to Finish() every time we are in Shutdown().
if (call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED) {
Is it possible that two Shutdowns are called from two threads? Say a client cancel comes in and we enter Shutdown from DoneNotified, and then a SendDone sees shutdown_ is true and enters Shutdown again. We would call Finish twice.
I think the fact that we currently only have a single thread polling the cq means that this won't happen. But you're right that it's something we need to be careful of if that changes in the future. I've added a TODO about this.
Yes. I was thinking about a new Executor-based polling engine when I asked.
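A sketch of how the Finish path could be made safe for multiple pollers, per the TODO (mu_ is a hypothetical lock; the single-threaded cq makes this unnecessary today):

```cpp
// Hypothetical: serialize entries into Shutdown() so the state check
// and the transition to FINISH_CALLED are atomic, making Finish()
// run at most once even with concurrent pollers.
std::lock_guard<std::mutex> lock(mu_);  // mu_ is assumed
if (call_state_ >= CALL_RECEIVED && call_state_ < FINISH_CALLED) {
  stream_.Finish(Status::CANCELLED, &on_finish_done_);
  call_state_ = FINISH_CALLED;
}
```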
void* tag;
bool ok;
while (true) {
  if (!service->cq_->Next(&tag, &ok)) {
I am not a big fan of a single cq on a single thread, but I think it will work.
A bit more context on why the previous implementation was sync-only: when I first did the PR, it contained both sync and async versions (used automatically depending on whether the server was sync or async). The async version was done similarly to the Unimplemented version, but was later removed because "our sync server will be as cheap as async" :). But of course, now that we have a streaming version of the API, it should use async somehow.
Yeah. Given that we need the async implementation for the streaming method, it seemed like a good idea to use async for the unary method too.
I agree that a single polling thread might prove a bottleneck in some cases. I was thinking that if/when that happens, we could add some optional parameters to set the number of threads here. But I would welcome alternative suggestions if you have any.
I suspect that the number of threads here is the real issue, not the number of cqs. Is there some reason we might want to spread over multiple cqs too?
Right, most of the problem is the single thread. Adding a cq behind the scenes is not nice, but something we can tolerate, I guess.
In an earlier implementation, we did the async service handling by requesting on each async cq, in a similar way to Unimplemented. The benefits of that were: 1. we did not need to create a cq behind the scenes, and 2. the handling scaled out to all cqs/threads automatically. But it would be more complicated and not as self-contained as this implementation (especially for a streaming call). So I think this is good.
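For readers following the cq discussion, the polling loop under review has roughly this shape (reconstructed from the excerpt above; CallableTag's dispatch method is an assumption):

```cpp
// Sketch of the single polling thread: one cq, drained until shutdown.
void DefaultHealthCheckService::HealthCheckServiceImpl::Serve(void* arg) {
  HealthCheckServiceImpl* service =
      reinterpret_cast<HealthCheckServiceImpl*>(arg);
  void* tag;
  bool ok;
  while (true) {
    if (!service->cq_->Next(&tag, &ok)) {
      // false means the cq was shut down and fully drained: exit.
      break;
    }
    // Each tag is a CallableTag holding the callback and a shared_ptr
    // that keeps its handler alive; Run() is an assumed method name.
    auto* callable = static_cast<CallableTag*>(tag);
    callable->Run(ok);
  }
}
```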
I've sent out a gRFC for client-side health checking:
Please let me know if you have any other questions. Thanks!
LGTM
Force-pushed from e71abc2 to 99ce3e1.
This adds a streaming Watch method to the health check service, which is needed as part of the work to enable client-side health checking. The proto changes are imported from grpc/grpc-proto#33.
Note that this changes the health check service to use the async API for both the unary and streaming methods (code based on the work done by @AspirinSJL for server load reporting), so this eliminates the restriction that we don't add the default health checking service on async-only servers.
The one thing here that I'm not crazy about is the way I had to create the ServerCompletionQueue inside of Server::Start(), rather than doing it inside of ServerBuilder like we do for all of the other ones. Unfortunately, I didn't see a better way to do that without breaking the existing EnableDefaultHealthCheckService() API, but I'd welcome suggestions on how to improve this.