Skip to content

Commit

Permalink
For the client binding propagate the full original completion [KVL-11…
Browse files Browse the repository at this point in the history
…12] (#10879)

* For the client binding propagate the full original completion.

Because more fields are added to the completion it gets more difficult to deconstruct it into our own models while keeping the same external API. Because of this, we are propagating the full completion as well

CHANGELOG_BEGIN
java-client-bindings - the original full completion is included in the `CompletionResponse` when available
CHANGELOG_END
  • Loading branch information
nicu-da authored Sep 24, 2021
1 parent 59ad995 commit e79a30a
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,32 @@ object CommandRetryFlow {
{
case Ctx(
RetryInfo(value, nrOfRetries, firstSubmissionTime, _),
Left(CompletionResponse.NotOkResponse(_, status)),
Left(notOk: CompletionResponse.NotOkResponse),
_,
) if RETRYABLE_ERROR_CODES.contains(status.code) =>
) if RETRYABLE_ERROR_CODES.contains(notOk.grpcStatus.code) =>
if ((firstSubmissionTime plus maxRetryTime) isBefore timeProvider.getCurrentTime) {
RetryLogger.logStopRetrying(
value,
status,
notOk.grpcStatus,
nrOfRetries,
firstSubmissionTime,
)
PROPAGATE_PORT
} else {
RetryLogger.logNonFatal(value, status, nrOfRetries)
RetryLogger.logNonFatal(value, notOk.grpcStatus, nrOfRetries)
RETRY_PORT
}
case Ctx(
RetryInfo(value, nrOfRetries, _, _),
Left(CompletionResponse.NotOkResponse(_, status)),
Left(notOk: CompletionResponse.NotOkResponse),
_,
) =>
RetryLogger.logFatal(value, status, nrOfRetries)
RetryLogger.logFatal(value, notOk.grpcStatus, nrOfRetries)
PROPAGATE_PORT
case Ctx(_, Left(CompletionResponse.TimeoutResponse(_)), _) =>
PROPAGATE_PORT
case Ctx(_, Left(CompletionResponse.NoStatusInResponse(commandId)), _) =>
statusNotFoundError(commandId)
case Ctx(_, Left(statusNotFound: CompletionResponse.NoStatusInResponse), _) =>
statusNotFoundError(statusNotFound.commandId)
case Ctx(_, Right(_), _) =>
PROPAGATE_PORT
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with
} else {
Ctx(
context,
Right(CompletionResponse.CompletionSuccess(commands.commandId, "", status)),
Right(
CompletionResponse.CompletionSuccess(Completion(commands.commandId, Some(status)))
),
)
}
case x =>
Expand Down Expand Up @@ -102,9 +104,9 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with
)
val failedSubmissions = codesToFail.map { code =>
submitRequest(code.getNumber, Instant.ofEpochSecond(45)) map { result =>
inside(result) { case Seq(Ctx(context, Left(NotOkResponse(_, grpcStatus)), _)) =>
inside(result) { case Seq(Ctx(context, Left(notOk: NotOkResponse), _)) =>
context.nrOfRetries shouldBe 0
grpcStatus.code shouldBe code.getNumber
notOk.grpcStatus.code shouldBe code.getNumber
}
}
}
Expand All @@ -129,9 +131,9 @@ class CommandRetryFlowUT extends AsyncWordSpec with Matchers with AkkaTest with

"stop retrying after maxRetryTime" in {
submitRequest(Code.RESOURCE_EXHAUSTED_VALUE, Instant.ofEpochSecond(15)) map { result =>
inside(result) { case Seq(Ctx(context, Left(NotOkResponse(_, grpcStatus)), _)) =>
inside(result) { case Seq(Ctx(context, Left(notOk: NotOkResponse), _)) =>
context.nrOfRetries shouldBe 0
grpcStatus.code shouldBe Code.RESOURCE_EXHAUSTED_VALUE.intValue
notOk.grpcStatus.code shouldBe Code.RESOURCE_EXHAUSTED_VALUE.intValue
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ final class CommandClientIT
expectedMessageSubString: String,
): Future[Assertion] =
submitCommand(submitRequest).map { result =>
inside(result) { case Left(NotOkResponse(_, grpcStatus)) =>
grpcStatus.code should be(expectedErrorCode.value)
grpcStatus.message should include(expectedMessageSubString)
inside(result) { case Left(notOk: NotOkResponse) =>
notOk.grpcStatus.code should be(expectedErrorCode.value)
notOk.grpcStatus.message should include(expectedMessageSubString)
}
}(DirectExecutionContext)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,16 @@ private[commands] class CommandTracker[Context](
trackingData.context,
Left(
NotOkResponse(
commandId,
StatusProto.of(
Status.Code.INTERNAL.value(),
s"There are multiple pending commands with ID: $commandId for submission ID: $maybeSubmissionId. This can only happen for the mutating schema that shouldn't be used anymore, as it doesn't fully support command deduplication.",
Seq.empty,
),
Completion(
commandId,
Some(
StatusProto.of(
Status.Code.INTERNAL.value(),
s"There are multiple pending commands with ID: $commandId for submission ID: $maybeSubmissionId. This can only happen for the mutating schema that shouldn't be used anymore, as it doesn't fully support command deduplication.",
Seq.empty,
)
),
)
)
),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ object CompletionResponse {
/** Represents failures from executing submissions through gRPC.
*/
sealed trait CompletionFailure
final case class NotOkResponse(commandId: String, grpcStatus: StatusProto)
extends CompletionFailure {
final case class NotOkResponse(completion: Completion) extends CompletionFailure {
val commandId: String = completion.commandId
val grpcStatus: StatusProto = completion.getStatus
def metadata: Map[String, String] = Map(
GrpcStatuses.DefiniteAnswerKey -> GrpcStatuses.isDefiniteAnswer(grpcStatus).toString
)
}
final case class TimeoutResponse(commandId: String) extends CompletionFailure
final case class NoStatusInResponse(commandId: String) extends CompletionFailure

final case class NoStatusInResponse(completion: Completion) extends CompletionFailure {
val commandId: String = completion.commandId
}

/** Represents failures of submissions throughout the execution queue.
*/
Expand All @@ -43,25 +47,21 @@ object CompletionResponse {
private[daml] final case class QueueSubmitFailure(status: Status) extends TrackedCompletionFailure

final case class CompletionSuccess(
commandId: String,
transactionId: String,
originalStatus: StatusProto,
)
completion: Completion
) {
val commandId: String = completion.commandId
val transactionId: String = completion.transactionId
val originalStatus: StatusProto = completion.getStatus
}

def apply(completion: Completion): Either[CompletionFailure, CompletionSuccess] =
completion.status match {
case Some(grpcStatus) if Code.OK.value() == grpcStatus.code =>
Right(
CompletionSuccess(
commandId = completion.commandId,
transactionId = completion.transactionId,
grpcStatus,
)
)
case Some(grpcStatus) =>
Left(NotOkResponse(completion.commandId, grpcStatus))
Right(CompletionSuccess(completion))
case Some(_) =>
Left(NotOkResponse(completion))
case None =>
Left(NoStatusInResponse(completion.commandId))
Left(NoStatusInResponse(completion))
}

/** For backwards compatibility, clients that are too coupled to [[Completion]] as a type can convert back from [[Either[CompletionFailure, CompletionSuccess]]]
Expand All @@ -70,22 +70,18 @@ object CompletionResponse {
response match {
case Left(failure) =>
failure match {
case NotOkResponse(commandId, grpcStatus) =>
Completion(commandId = commandId, status = Some(grpcStatus))
case NotOkResponse(completion) =>
completion
case TimeoutResponse(commandId) =>
Completion(
commandId = commandId,
status = Some(StatusProto(Code.ABORTED.value(), "Timeout")),
)
case NoStatusInResponse(commandId) =>
Completion(commandId = commandId)
case NoStatusInResponse(completion) =>
completion
}
case Right(success) =>
Completion(
commandId = success.commandId,
transactionId = success.transactionId,
status = Some(success.originalStatus),
)
success.completion
}

private[daml] def toException(response: TrackedCompletionFailure): StatusException =
Expand All @@ -105,7 +101,8 @@ object CompletionResponse {
}

private def extractStatus(response: CompletionFailure): StatusJavaProto.Builder = response match {
case CompletionResponse.NotOkResponse(_, grpcStatus) => GrpcStatus.toJavaBuilder(grpcStatus)
case notOkResponse: CompletionResponse.NotOkResponse =>
GrpcStatus.toJavaBuilder(notOkResponse.grpcStatus)
case CompletionResponse.TimeoutResponse(_) =>
GrpcStatus.toJavaBuilder(Code.ABORTED.value(), Some("Timeout"), Iterable.empty)
case CompletionResponse.NoStatusInResponse(_) =>
Expand Down
Loading

0 comments on commit e79a30a

Please sign in to comment.