diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java index 9cb825b9f8e1f..7b2c99ff9d510 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java @@ -88,8 +88,12 @@ public void getTransactionInBufferStats(@Suspended final AsyncResponse asyncResp validateTopicName(tenant, namespace, encodedTopic); internalGetTransactionInBufferStats(authoritative, Long.parseLong(mostSigBits), Long.parseLong(leastSigBits)) - .thenAccept(stat -> asyncResponse.resume(stat)) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get transaction state in transaction buffer {}", + clientAppId(), topicName, ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -122,8 +126,12 @@ public void getTransactionInPendingAckStats(@Suspended final AsyncResponse async validateTopicName(tenant, namespace, encodedTopic); internalGetTransactionInPendingAckStats(authoritative, Long.parseLong(mostSigBits), Long.parseLong(leastSigBits), subName) - .thenAccept(stat -> asyncResponse.resume(stat)) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get transaction state in pending ack {}", + clientAppId(), topicName, ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -152,8 +160,12 @@ public void getTransactionBufferStats(@Suspended final AsyncResponse asyncRespon checkTransactionCoordinatorEnabled(); validateTopicName(tenant, namespace, encodedTopic); internalGetTransactionBufferStats(authoritative) - .thenAccept(stat -> asyncResponse.resume(stat)) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get transaction buffer stats in topic {}", + clientAppId(), topicName, ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -183,8 +195,12 @@ public void getPendingAckStats(@Suspended final AsyncResponse asyncResponse, checkTransactionCoordinatorEnabled(); validateTopicName(tenant, namespace, encodedTopic); internalGetPendingAckStats(authoritative, subName) - .thenAccept(stats -> asyncResponse.resume(stats)) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get transaction pending ack stats in topic {}", + clientAppId(), topicName, ex); + } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); @@ -276,10 +292,13 @@ public void getPendingAckInternalStats(@Suspended final AsyncResponse asyncRespo checkTransactionCoordinatorEnabled(); validateTopicName(tenant, namespace, encodedTopic); internalGetPendingAckInternalStats(authoritative, subName, metadata) - .thenAccept(stats -> asyncResponse.resume(stats)) + .thenAccept(asyncResponse::resume) .exceptionally(ex -> { + if (!isRedirectException(ex)) { + log.error("[{}] Failed to get pending ack internal stats {}", + clientAppId(), topicName, ex); + } Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.error("[{}] Failed to get pending ack internal stats {}", clientAppId(), topicName, cause); if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) { asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, cause)); } else if (cause instanceof BrokerServiceException.NotAllowedException) {