Skip to content

Commit

Permalink
log transaction rest api error on broker side (apache#15081)
Browse files Browse the repository at this point in the history
Co-authored-by: gavingaozhangmin <gavingaozhangmin@didiglobal.com>
  • Loading branch information
gaozhangmin and gavingaozhangmin authored Apr 13, 2022
1 parent a4c4aea commit 19120e6
Showing 1 changed file with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ public void getTransactionInBufferStats(@Suspended final AsyncResponse asyncResp
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionInBufferStats(authoritative, Long.parseLong(mostSigBits),
Long.parseLong(leastSigBits))
.thenAccept(stat -> asyncResponse.resume(stat))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get transaction state in transaction buffer {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
Expand Down Expand Up @@ -122,8 +126,12 @@ public void getTransactionInPendingAckStats(@Suspended final AsyncResponse async
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionInPendingAckStats(authoritative, Long.parseLong(mostSigBits),
Long.parseLong(leastSigBits), subName)
.thenAccept(stat -> asyncResponse.resume(stat))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get transaction state in pending ack {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
Expand Down Expand Up @@ -152,8 +160,12 @@ public void getTransactionBufferStats(@Suspended final AsyncResponse asyncRespon
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionBufferStats(authoritative)
.thenAccept(stat -> asyncResponse.resume(stat))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get transaction buffer stats in topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
Expand Down Expand Up @@ -183,8 +195,12 @@ public void getPendingAckStats(@Suspended final AsyncResponse asyncResponse,
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetPendingAckStats(authoritative, subName)
.thenAccept(stats -> asyncResponse.resume(stats))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get transaction pending ack stats in topic {}",
clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
Expand Down Expand Up @@ -276,10 +292,13 @@ public void getPendingAckInternalStats(@Suspended final AsyncResponse asyncRespo
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetPendingAckInternalStats(authoritative, subName, metadata)
.thenAccept(stats -> asyncResponse.resume(stats))
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get pending ack internal stats {}",
clientAppId(), topicName, ex);
}
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.error("[{}] Failed to get pending ack internal stats {}", clientAppId(), topicName, cause);
if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, cause));
} else if (cause instanceof BrokerServiceException.NotAllowedException) {
Expand Down

0 comments on commit 19120e6

Please sign in to comment.