From c03157327e6519cfbe32c3a3cb375dc82e6fa6e8 Mon Sep 17 00:00:00 2001 From: "azure-pipelines[bot]" <36771401+azure-pipelines[bot]@users.noreply.github.com> Date: Thu, 13 Feb 2025 16:08:16 +0000 Subject: [PATCH] update canton to 20250212.15197.v49c4ce7c (#20770) * update canton to 20250212.15197.v49c4ce7c tell-slack: canton * Add io_grpc_grpc_inprocess as a dependency --------- Co-authored-by: Azure Pipelines Daml Build Co-authored-by: Dylan Thinnes --- sdk/canton/BUILD.bazel | 2 + .../sequencer/v30/sequencer_connection.proto | 2 + .../declarative/DeclarativeApiManager.scala | 4 +- .../DeclarativeParticipantApi.scala | 129 ++++++----- .../sequencing/SequencerConnections.scala | 6 + .../sequencer/grpc/SequencerInfoLoader.scala | 31 ++- .../grpc/SequencerInfoLoaderTest.scala | 74 ++++++- .../platform/apiserver/GrpcServer.scala | 62 +++--- .../canton/http/CommandService.scala | 4 +- .../canton/http/PackageService.scala | 7 +- .../http/endpoints/UserManagement.scala | 2 +- .../digitalasset/canton/http/package.scala | 2 +- .../StartableStoppableLedgerApiServer.scala | 14 +- .../block/BlockSequencerStateManager.scala | 5 +- .../block/update/BlockChunkProcessor.scala | 38 ++-- .../block/update/BlockUpdate.scala | 4 +- .../block/update/BlockUpdateGenerator.scala | 2 - .../SequencedSubmissionsValidator.scala | 32 +-- .../update/SubmissionRequestValidator.scala | 208 ++++++------------ .../update/TrafficControlValidator.scala | 63 ++---- .../sequencer/BaseSequencer.scala | 16 -- .../synchronizer/sequencer/Sequencer.scala | 4 - .../sequencer/SubmissionOutcome.scala | 60 ++++- .../sequencer/SubmissionRequestOutcome.scala | 79 ------- .../sequencer/block/BlockSequencer.scala | 1 - .../availability/AvailabilityModule.scala | 3 +- .../consensus/iss/IssConsensusModule.scala | 6 +- .../IssConsensusSignatureVerifier.scala | 79 ++++--- .../validation/PbftMessageValidatorImpl.scala | 1 - .../core/modules/mempool/MempoolModule.scala | 2 +- .../core/networking/BftP2PNetworkIn.scala | 6 +- .../core/networking/BftP2PNetworkOut.scala | 3 +- .../update/BlockChunkProcessorTest.scala | 34 ++- .../update/BlockUpdateGeneratorImplTest.scala | 2 - .../sequencer/BaseSequencerTest.scala | 30 +-- .../DatabaseSequencerSnapshottingTest.scala | 26 ++- .../sequencer/SequencerTest.scala | 43 ++-- .../sequencer/SequencerWriterSourceTest.scala | 2 + .../simulation/SimulationSettings.scala | 5 + .../BftOrderingSimulationTest.scala | 38 +++- .../simulation/bftordering/IssClient.scala | 29 ++- .../unit/modules/MempoolModuleTest.scala | 23 +- .../availability/AvailabilityModuleTest.scala | 8 +- .../GrpcSequencerIntegrationTest.scala | 2 - .../service/GrpcSequencerServiceTest.scala | 2 - .../canton/config/InProcessGrpcName.scala | 10 + sdk/canton/ref | 2 +- 47 files changed, 616 insertions(+), 591 deletions(-) delete mode 100644 sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/SubmissionRequestOutcome.scala create mode 100644 sdk/canton/daml-common-staging/util-external/src/main/scala/com/digitalasset/canton/config/InProcessGrpcName.scala diff --git a/sdk/canton/BUILD.bazel b/sdk/canton/BUILD.bazel index 75405c8d0064..201c17cbad62 100644 --- a/sdk/canton/BUILD.bazel +++ b/sdk/canton/BUILD.bazel @@ -794,6 +794,7 @@ scala_library( "@maven//:com_zaxxer_HikariCP", "@maven//:io_circe_circe_core_2_13", "@maven//:io_grpc_grpc_api", + "@maven//:io_grpc_grpc_inprocess", "@maven//:io_grpc_grpc_netty", "@maven//:io_grpc_grpc_protobuf", "@maven//:io_grpc_grpc_services", @@ -1156,6 +1157,7 @@ 
scala_library( "@maven//:dev_optics_monocle_macro_2_13", "@maven//:io_circe_circe_core_2_13", "@maven//:io_grpc_grpc_api", + "@maven//:io_grpc_grpc_inprocess", "@maven//:io_grpc_grpc_netty", "@maven//:io_grpc_grpc_services", "@maven//:io_grpc_grpc_stub", diff --git a/sdk/canton/community/admin-api/src/main/protobuf/com/digitalasset/canton/admin/sequencer/v30/sequencer_connection.proto b/sdk/canton/community/admin-api/src/main/protobuf/com/digitalasset/canton/admin/sequencer/v30/sequencer_connection.proto index 6a3ceb5eaea8..4156ae4219b5 100644 --- a/sdk/canton/community/admin-api/src/main/protobuf/com/digitalasset/canton/admin/sequencer/v30/sequencer_connection.proto +++ b/sdk/canton/community/admin-api/src/main/protobuf/com/digitalasset/canton/admin/sequencer/v30/sequencer_connection.proto @@ -32,6 +32,8 @@ enum SequencerConnectionValidation { SEQUENCER_CONNECTION_VALIDATION_ACTIVE = 2; // Validate all the connections SEQUENCER_CONNECTION_VALIDATION_ALL = 3; + // Validate only the connections that can be reached, but require that at least trust-threshold many of them are valid + SEQUENCER_CONNECTION_VALIDATION_THRESHOLD_ACTIVE = 4; } message SequencerConnections { diff --git a/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/console/declarative/DeclarativeApiManager.scala b/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/console/declarative/DeclarativeApiManager.scala index 38f128595e7a..aafa80566853 100644 --- a/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/console/declarative/DeclarativeApiManager.scala +++ b/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/console/declarative/DeclarativeApiManager.scala @@ -12,6 +12,7 @@ import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory} import com.digitalasset.canton.metrics.DeclarativeApiMetrics import com.digitalasset.canton.participant.config.LocalParticipantConfig import com.digitalasset.canton.tracing.TraceContext +import com.digitalasset.canton.util.EitherTUtil import java.util.concurrent.ScheduledExecutorService import scala.concurrent.{ExecutionContext, Future} @@ -83,7 +84,7 @@ object DeclarativeApiManager { ) } } - .getOrElse(EitherT.rightT(())) + .getOrElse(EitherTUtil.unit) } override def verifyConfig(name: String, config: C)(implicit @@ -92,7 +93,6 @@ object DeclarativeApiManager { config.init.state .map(c => DeclarativeParticipantApi.readConfig(c.file).map(_ => ())) .getOrElse(Right(())) - } } diff --git a/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/console/declarative/DeclarativeParticipantApi.scala b/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/console/declarative/DeclarativeParticipantApi.scala index 025deccb753f..eeccd73055db 100644 --- a/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/console/declarative/DeclarativeParticipantApi.scala +++ b/sdk/canton/community/app-base/src/main/scala/com/digitalasset/canton/console/declarative/DeclarativeParticipantApi.scala @@ -15,6 +15,7 @@ import com.digitalasset.canton.admin.api.client.commands.{ } import com.digitalasset.canton.admin.api.client.data.LedgerApiUser import com.digitalasset.canton.auth.CantonAdminToken +import com.digitalasset.canton.config.CantonRequireTypes.String255 import com.digitalasset.canton.config.ClientConfig import com.digitalasset.canton.config.RequireTypes.PositiveInt import com.digitalasset.canton.console.GrpcAdminCommandRunner @@ -195,7 +196,7 @@ final case class DeclarativeSequencerConnectionConfig( * observed */ final case class
DeclarativeConnectionConfig( - synchronizerAlias: String, + synchronizerAlias: SynchronizerAlias, connections: NonEmpty[Map[String, DeclarativeSequencerConnectionConfig]], manualConnect: Boolean = false, priority: Int = 0, @@ -203,37 +204,43 @@ final case class DeclarativeConnectionConfig( trustThreshold: PositiveInt = PositiveInt.one, ) { - def isEquivalent(other: DeclarativeConnectionConfig): Boolean = - manualConnect == other.manualConnect && - priority == other.priority && - initializeFromTrustedSynchronizer == other.initializeFromTrustedSynchronizer && - trustThreshold == other.trustThreshold && - connections.keySet == other.connections.keySet && + def isEquivalent(other: DeclarativeConnectionConfig): Boolean = { + val areConnectionsEquivalent = connections.keySet == other.connections.keySet && connections.forall { case (name, conn) => other.connections.get(name).exists(_.isEquivalent(conn)) } - def toSynchronizerConnectionConfig: SynchronizerConnectionConfig = - SynchronizerConnectionConfig( - synchronizerAlias = SynchronizerAlias.tryCreate(synchronizerAlias), - sequencerConnections = SequencerConnections - .many( - connections = connections.map { case (alias, conn) => - GrpcSequencerConnection( - endpoints = conn.endpoints, - transportSecurity = conn.transportSecurity, - sequencerAlias = SequencerAlias.tryCreate(alias), - customTrustCertificates = conn.customTrustCertificatesAsByteString.toOption.flatten, - ) - }.toSeq, - sequencerTrustThreshold = trustThreshold, - submissionRequestAmplification = SubmissionRequestAmplification.NoAmplification, - ) - .getOrElse(throw new IllegalArgumentException("Cannot create sequencer connections")), - manualConnect = manualConnect, - priority = priority, - initializeFromTrustedSynchronizer = initializeFromTrustedSynchronizer, - ) + if (areConnectionsEquivalent) + this.copy(connections = other.connections) == other + else false + } + + def toSynchronizerConnectionConfig: Either[String, SynchronizerConnectionConfig] = { + val sequencerConnectionsE = SequencerConnections + .many( + connections = connections.map { case (alias, conn) => + GrpcSequencerConnection( + endpoints = conn.endpoints, + transportSecurity = conn.transportSecurity, + sequencerAlias = SequencerAlias.tryCreate(alias), + customTrustCertificates = conn.customTrustCertificatesAsByteString.toOption.flatten, + ) + }.toSeq, + sequencerTrustThreshold = trustThreshold, + submissionRequestAmplification = SubmissionRequestAmplification.NoAmplification, + ) + + sequencerConnectionsE.map { sequencerConnections => + SynchronizerConnectionConfig( + synchronizerAlias = synchronizerAlias, + sequencerConnections = sequencerConnections, + manualConnect = manualConnect, + priority = priority, + initializeFromTrustedSynchronizer = initializeFromTrustedSynchronizer, + ) + } + + } } @@ -643,11 +650,13 @@ class DeclarativeParticipantApi( checkSelfConsistent: Boolean, )(implicit traceContext: TraceContext): Either[String, UpdateResult] = { - def toDeclarative(config: SynchronizerConnectionConfig): (String, DeclarativeConnectionConfig) = + def toDeclarative( + config: SynchronizerConnectionConfig + ): (SynchronizerAlias, DeclarativeConnectionConfig) = ( - config.synchronizerAlias.unwrap, + config.synchronizerAlias, DeclarativeConnectionConfig( - synchronizerAlias = config.synchronizerAlias.unwrap, + synchronizerAlias = config.synchronizerAlias, connections = config.sequencerConnections.aliasToConnection.map { case (alias, connection: GrpcSequencerConnection) => ( @@ -666,20 +675,21 @@ class 
DeclarativeParticipantApi( ), ) - def fetchConnections(): Either[String, (Seq[(String, DeclarativeConnectionConfig)])] = + def fetchConnections(): Either[String, Seq[(SynchronizerAlias, DeclarativeConnectionConfig)]] = queryAdminApi(ParticipantAdminCommands.SynchronizerConnectivity.ListRegisteredSynchronizers) - .map(_.map(_._1).map(toDeclarative)) + .map(_.map { case (synchronizerConnectionConfig, _) => synchronizerConnectionConfig } + .map(toDeclarative)) - def removeSynchronizerConnection(alias: String): Either[String, Unit] = { - val synchronizerAlias = SynchronizerAlias.tryCreate(alias) + def removeSynchronizerConnection(synchronizerAlias: SynchronizerAlias): Either[String, Unit] = // cannot really remove connections for now, just disconnect and disable for { currentO <- queryAdminApi( ParticipantAdminCommands.SynchronizerConnectivity.ListRegisteredSynchronizers - ).map(_.find(_._1.synchronizerAlias == synchronizerAlias)) + ).map(_.collectFirst { + case (config, _) if config.synchronizerAlias == synchronizerAlias => config + }) current <- currentO - .toRight(s"Unable to find configuration for synchronizer $alias") - .map(_._1) + .toRight(s"Unable to find configuration for synchronizer $synchronizerAlias") _ <- queryAdminApi( ParticipantAdminCommands.SynchronizerConnectivity.DisconnectSynchronizer( synchronizerAlias @@ -693,31 +703,39 @@ class DeclarativeParticipantApi( ) } yield () - } + def add(config: DeclarativeConnectionConfig): Either[String, Unit] = + for { + synchronizerConnectionConfig <- config.toSynchronizerConnectionConfig + _ <- queryAdminApi( + ParticipantAdminCommands.SynchronizerConnectivity.ConnectSynchronizer( + synchronizerConnectionConfig, + sequencerConnectionValidation = SequencerConnectionValidation.Active, + ) + ) + } yield () + + def update(config: DeclarativeConnectionConfig) = + for { + synchronizerConnectionConfig <- config.toSynchronizerConnectionConfig + _ <- queryAdminApi( + ParticipantAdminCommands.SynchronizerConnectivity.ModifySynchronizerConnection( + synchronizerConnectionConfig, + sequencerConnectionValidation = SequencerConnectionValidation.Active, + ) + ) + } yield () - run[String, DeclarativeConnectionConfig]( + run[SynchronizerAlias, DeclarativeConnectionConfig]( "connections", removeExcess = removeConnections, checkSelfConsistent = checkSelfConsistent, want = connections.map(c => (c.synchronizerAlias, c)), fetch = _ => fetchConnections(), - add = { case (_, config) => - queryAdminApi( - ParticipantAdminCommands.SynchronizerConnectivity.ConnectSynchronizer( - config.toSynchronizerConnectionConfig, - sequencerConnectionValidation = SequencerConnectionValidation.Active, - ) - ) - }, + add = { case (_, config) => add(config) }, upd = { case (_, config, existing) => if (config.isEquivalent(existing)) Either.unit else - queryAdminApi( - ParticipantAdminCommands.SynchronizerConnectivity.ModifySynchronizerConnection( - config.toSynchronizerConnectionConfig, - sequencerConnectionValidation = SequencerConnectionValidation.Active, - ) - ) + update(config) }, rm = removeSynchronizerConnection, compare = Some { case (x, y) => x.isEquivalent(y) }, @@ -827,6 +845,9 @@ object DeclarativeParticipantApi { import pureconfig.generic.semiauto.* // import canton config to include the implicit that prevents unknown keys + implicit val synchronizerAliasReader: ConfigReader[SynchronizerAlias] = + ConfigReader[String255].map(SynchronizerAlias(_)) + implicit val declarativeParticipantConfigReader: ConfigReader[DeclarativeParticipantConfig] = { implicit val 
darConfigReader: ConfigReader[DeclarativeDarConfig] = deriveReader[DeclarativeDarConfig] diff --git a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/sequencing/SequencerConnections.scala b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/sequencing/SequencerConnections.scala index 501824468ca0..4b1c6251386b 100644 --- a/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/sequencing/SequencerConnections.scala +++ b/sdk/canton/community/base/src/main/scala/com/digitalasset/canton/sequencing/SequencerConnections.scala @@ -232,6 +232,10 @@ object SequencerConnectionValidation { override val toProtoV30: v30.SequencerConnectionValidation = v30.SequencerConnectionValidation.SEQUENCER_CONNECTION_VALIDATION_ACTIVE } + object ThresholdActive extends SequencerConnectionValidation { + override val toProtoV30: v30.SequencerConnectionValidation = + v30.SequencerConnectionValidation.SEQUENCER_CONNECTION_VALIDATION_THRESHOLD_ACTIVE + } def fromProtoV30( proto: v30.SequencerConnectionValidation @@ -241,6 +245,8 @@ object SequencerConnectionValidation { Right(Disabled) case v30.SequencerConnectionValidation.SEQUENCER_CONNECTION_VALIDATION_ALL => Right(All) case v30.SequencerConnectionValidation.SEQUENCER_CONNECTION_VALIDATION_ACTIVE => Right(Active) + case v30.SequencerConnectionValidation.SEQUENCER_CONNECTION_VALIDATION_THRESHOLD_ACTIVE => + Right(ThresholdActive) case _ => Left( ProtoDeserializationError.ValueConversionError( diff --git a/sdk/canton/community/common/src/main/scala/com/digitalasset/canton/common/sequencer/grpc/SequencerInfoLoader.scala b/sdk/canton/community/common/src/main/scala/com/digitalasset/canton/common/sequencer/grpc/SequencerInfoLoader.scala index 10012e7e3055..6b58dca034aa 100644 --- a/sdk/canton/community/common/src/main/scala/com/digitalasset/canton/common/sequencer/grpc/SequencerInfoLoader.scala +++ b/sdk/canton/community/common/src/main/scala/com/digitalasset/canton/common/sequencer/grpc/SequencerInfoLoader.scala @@ -222,7 +222,8 @@ class SequencerInfoLoader( ): EitherT[FutureUnlessShutdown, Seq[LoadSequencerEndpointInformationResult.NotValid], Unit] = sequencerConnectionValidation match { case SequencerConnectionValidation.Disabled => EitherT.rightT(()) - case SequencerConnectionValidation.All | SequencerConnectionValidation.Active => + case SequencerConnectionValidation.All | SequencerConnectionValidation.Active | + SequencerConnectionValidation.ThresholdActive => EitherT( loadSequencerEndpoints( alias, @@ -233,6 +234,7 @@ class SequencerInfoLoader( SequencerInfoLoader.validateNewSequencerConnectionResults( expectedSynchronizerId, sequencerConnectionValidation, + sequencerConnections.sequencerTrustThreshold, logger, )(_) ) @@ -456,6 +458,7 @@ object SequencerInfoLoader { def validateNewSequencerConnectionResults( expectedSynchronizerId: Option[SynchronizerId], sequencerConnectionValidation: SequencerConnectionValidation, + sequencerTrustThreshold: PositiveInt, logger: TracedLogger, )( results: Seq[LoadSequencerEndpointInformationResult] @@ -473,13 +476,15 @@ object SequencerInfoLoader { case Nil => accumulated case (notValid: LoadSequencerEndpointInformationResult.NotValid) :: rest => - if (sequencerConnectionValidation != SequencerConnectionValidation.All) { - logger.info( - s"Skipping validation, as I am unable to obtain synchronizer id and sequencer-id: ${notValid.error} for ${notValid.sequencerConnection}" - ) - go(reference, sequencerIds, rest, accumulated) - } else - go(reference, sequencerIds, rest, notValid +: 
accumulated) + sequencerConnectionValidation match { + case SequencerConnectionValidation.All | SequencerConnectionValidation.ThresholdActive => + go(reference, sequencerIds, rest, notValid +: accumulated) + case SequencerConnectionValidation.Active | SequencerConnectionValidation.Disabled => + logger.info( + s"Skipping validation, as I am unable to obtain synchronizer id and sequencer-id: ${notValid.error} for ${notValid.sequencerConnection}" + ) + go(reference, sequencerIds, rest, accumulated) + } case (valid: LoadSequencerEndpointInformationResult.Valid) :: rest => val result = for { // check that synchronizer id matches the reference @@ -560,7 +565,10 @@ object SequencerInfoLoader { } val collected = go(None, Map.empty, results.toList, Seq.empty) - Either.cond(collected.isEmpty, (), collected) + if (collected.isEmpty) Either.unit + else if (sequencerConnectionValidation == SequencerConnectionValidation.ThresholdActive) + Either.cond(results.size - collected.size >= sequencerTrustThreshold.unwrap, (), collected) + else Left(collected) } /** Aggregates the endpoint information into the actual connection @@ -588,10 +596,9 @@ object SequencerInfoLoader { validateNewSequencerConnectionResults( expectedSynchronizerId, sequencerConnectionValidation, + sequencerTrustThreshold, logger, - )( - fullResult.toList - ) match { + )(fullResult) match { case Right(()) => val validSequencerConnections = fullResult .collect { case valid: LoadSequencerEndpointInformationResult.Valid => diff --git a/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/common/sequencer/grpc/SequencerInfoLoaderTest.scala b/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/common/sequencer/grpc/SequencerInfoLoaderTest.scala index 09d2878b57ea..f8523c0b621e 100644 --- a/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/common/sequencer/grpc/SequencerInfoLoaderTest.scala +++ b/sdk/canton/community/common/src/test/scala/com/digitalasset/canton/common/sequencer/grpc/SequencerInfoLoaderTest.scala @@ -46,10 +46,12 @@ class SequencerInfoLoaderTest extends BaseTestWordSpec with HasExecutionContext ) private lazy val sequencerAlias1 = SequencerAlias.tryCreate("sequencer1") private lazy val sequencerAlias2 = SequencerAlias.tryCreate("sequencer2") + private lazy val sequencerAlias3 = SequencerAlias.tryCreate("sequencer3") private lazy val synchronizerId1 = SynchronizerId.tryFromString("first::namespace") private lazy val synchronizerId2 = SynchronizerId.tryFromString("second::namespace") private lazy val endpoint1 = Endpoint("localhost", Port.tryCreate(1001)) private lazy val endpoint2 = Endpoint("localhost", Port.tryCreate(1002)) + private lazy val endpoint3 = Endpoint("localhost", Port.tryCreate(1003)) private lazy val staticSynchronizerParameters = BaseTest.defaultStaticSynchronizerParametersWith() private lazy val synchronizerAlias = SynchronizerAlias.tryCreate("synchronizer1") @@ -94,11 +96,13 @@ class SequencerInfoLoaderTest extends BaseTestWordSpec with HasExecutionContext Either[SequencerInfoLoaderError, SynchronizerClientBootstrapInfo], ) ], - activeOnly: Boolean, - ) = SequencerInfoLoader + validation: SequencerConnectionValidation = SequencerConnectionValidation.All, + threshold: PositiveInt = PositiveInt.one, + ): Either[Seq[LoadSequencerEndpointInformationResult.NotValid], Unit] = SequencerInfoLoader .validateNewSequencerConnectionResults( expectSynchronizerId, - if (activeOnly) SequencerConnectionValidation.Active else SequencerConnectionValidation.All, + validation, + 
threshold, logger, )(mapArgs(args)) @@ -111,9 +115,10 @@ class SequencerInfoLoaderTest extends BaseTestWordSpec with HasExecutionContext Either[SequencerInfoLoaderError, SynchronizerClientBootstrapInfo], ) ], - activeOnly: Boolean = false, + validation: SequencerConnectionValidation = SequencerConnectionValidation.All, + threshold: PositiveInt = PositiveInt.one, )(check: String => Assertion): Assertion = { - val result = run(expectSynchronizerId, args, activeOnly) + val result = run(expectSynchronizerId, args, validation, threshold) result.left.value should have length (1) result.left.value.foreach(x => check(x.error.cause)) succeed @@ -215,9 +220,66 @@ class SequencerInfoLoaderTest extends BaseTestWordSpec with HasExecutionContext Right(SynchronizerClientBootstrapInfo(synchronizerId1, sequencer2)), ), ), - activeOnly = false, ).value shouldBe (()) } + "tolerate errors if threshold can be reached" in { + forAll( + Seq(SequencerConnectionValidation.Active, SequencerConnectionValidation.ThresholdActive) + ) { validation => + run( + None, + List( + ( + sequencerAlias1, + endpoint1, + Right(SynchronizerClientBootstrapInfo(synchronizerId1, sequencer1)), + ), + (sequencerAlias2, endpoint2, Left(SequencerInfoLoaderError.InvalidState("booh"))), + ( + sequencerAlias3, + endpoint3, + Right(SynchronizerClientBootstrapInfo(synchronizerId1, sequencer2)), + ), + ), + validation, + threshold = PositiveInt.tryCreate(2), + ).value shouldBe (()) + } + } + "tolerate errors for Active if threshold cannot be reached" in { + run( + None, + List( + ( + sequencerAlias1, + endpoint1, + Right(SynchronizerClientBootstrapInfo(synchronizerId1, sequencer1)), + ), + (sequencerAlias2, endpoint2, Left(SequencerInfoLoaderError.InvalidState("booh2"))), + (sequencerAlias3, endpoint3, Left(SequencerInfoLoaderError.InvalidState("booh3"))), + ), + SequencerConnectionValidation.Active, + threshold = PositiveInt.tryCreate(2), + ).value shouldBe (()) + } + "complain about errors for ThresholdActive if threshold cannot be reached" in { + val result = run( + None, + List( + ( + sequencerAlias1, + endpoint1, + Right(SynchronizerClientBootstrapInfo(synchronizerId1, sequencer1)), + ), + (sequencerAlias2, endpoint2, Left(SequencerInfoLoaderError.InvalidState("booh2"))), + (sequencerAlias3, endpoint3, Left(SequencerInfoLoaderError.InvalidState("booh3"))), + ), + SequencerConnectionValidation.ThresholdActive, + threshold = PositiveInt.tryCreate(2), + ) + result.left.value should have length 2 + forAll(result.left.value)(_.error.cause should (include("booh2") or include("booh3"))) + } } "aggregation" should { diff --git a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/GrpcServer.scala b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/GrpcServer.scala index ff12de044cb4..1f9ddcf93df6 100644 --- a/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/GrpcServer.scala +++ b/sdk/canton/community/ledger/ledger-api-core/src/main/scala/com/digitalasset/canton/platform/apiserver/GrpcServer.scala @@ -5,13 +5,14 @@ package com.digitalasset.canton.platform.apiserver import com.daml.ledger.resources.ResourceOwner import com.daml.metrics.grpc.GrpcMetricsServerInterceptor -import com.digitalasset.canton.config.KeepAliveServerConfig import com.digitalasset.canton.config.RequireTypes.Port +import com.digitalasset.canton.config.{InProcessGrpcName, KeepAliveServerConfig} import
com.digitalasset.canton.logging.NamedLoggerFactory import com.digitalasset.canton.metrics.LedgerApiServerMetrics import com.digitalasset.canton.platform.apiserver.error.ErrorInterceptor import com.google.protobuf.Message import io.grpc.* +import io.grpc.inprocess.InProcessServerBuilder import io.grpc.netty.NettyServerBuilder import io.netty.handler.ssl.SslContext @@ -45,6 +46,33 @@ object GrpcServer { keepAlive: Option[KeepAliveServerConfig], ): ResourceOwner[Server] = { val host = address.map(InetAddress.getByName).getOrElse(InetAddress.getLoopbackAddress) + + def addServicesAndInterceptors[T <: ForwardingServerBuilder[T]](builder: T) = { + val builderWithInterceptors = + interceptors + .foldLeft(builder) { case (builder, interceptor) => + builder.intercept(interceptor) + } + .intercept(new ActiveStreamMetricsInterceptor(metrics)) + .intercept(new GrpcMetricsServerInterceptor(metrics.grpc)) + .intercept(new TruncatedStatusInterceptor(MaximumStatusDescriptionLength)) + .intercept(new ErrorInterceptor(loggerFactory)) + + val builderWithServices = services.foldLeft(builderWithInterceptors) { + case (builder, service) => + toLegacyService(service).fold(builder.addService(service))(legacyService => + builder + .addService(service) + .addService(legacyService) + ) + } + ResourceOwner + .forServer(builderWithServices, shutdownTimeout = 1.second) + .transform(_.recoverWith { + case e: IOException if e.getCause != null && e.getCause.isInstanceOf[BindException] => + Failure(new UnableToBind(desiredPort, e.getCause)) + }) + } val builder = NettyServerBuilder .forAddress(new InetSocketAddress(host, desiredPort.unwrap)) @@ -55,30 +83,15 @@ object GrpcServer { val builderWithKeepAlive = configureKeepAlive(keepAlive, builder) // NOTE: Interceptors run in the reverse order in which they were added. 
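// Hedged reviewer sketch, not part of the patch: the change in this file registers
// the same services and interceptors on two servers, the Netty server and an
// in-process server named after the port (via the new InProcessGrpcName.forPort).
// A minimal, self-contained illustration of that in-process pairing follows;
// `illustrativeService` is a placeholder for the real Ledger API services, and
// directExecutor() is a sketch simplification (the patch passes servicesExecutor).
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.{BindableService, ManagedChannel, Server}

def startInProcessPair(
    name: String, // server and channel must agree on this name
    illustrativeService: BindableService,
): (Server, ManagedChannel) = {
  val server: Server = InProcessServerBuilder
    .forName(name)
    .addService(illustrativeService)
    .directExecutor()
    .build()
    .start() // binds in-memory; no TCP port is opened
  val channel: ManagedChannel = InProcessChannelBuilder
    .forName(name) // dials the server registered under the same name
    .directExecutor()
    .build()
  (server, channel)
}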
- val builderWithInterceptors = - interceptors - .foldLeft(builderWithKeepAlive) { case (builder, interceptor) => - builder.intercept(interceptor) - } - .intercept(new ActiveStreamMetricsInterceptor(metrics)) - .intercept(new GrpcMetricsServerInterceptor(metrics.grpc)) - .intercept(new TruncatedStatusInterceptor(MaximumStatusDescriptionLength)) - .intercept(new ErrorInterceptor(loggerFactory)) - val builderWithServices = services.foldLeft(builderWithInterceptors) { - case (builder, service) => - toLegacyService(service).fold(builder.addService(service))(legacyService => - builder - .addService(service) - .addService(legacyService) - ) - } - ResourceOwner - .forServer(builderWithServices, shutdownTimeout = 1.second) - .transform(_.recoverWith { - case e: IOException if e.getCause != null && e.getCause.isInstanceOf[BindException] => - Failure(new UnableToBind(desiredPort, e.getCause)) - }) + val inProcessBuilder: InProcessServerBuilder = + InProcessServerBuilder + .forName(InProcessGrpcName.forPort(desiredPort)) + .executor(servicesExecutor) + for { + _ <- addServicesAndInterceptors(inProcessBuilder) + httpServer <- addServicesAndInterceptors(builderWithKeepAlive) + } yield httpServer } def configureKeepAlive( @@ -138,5 +151,4 @@ object GrpcServer { Option(digitalassetDef.build()) } else None } - } diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/CommandService.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/CommandService.scala index 104120300f5a..2e559809aef8 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/CommandService.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/CommandService.scala @@ -208,7 +208,7 @@ class CommandService( case -\/((templateId, contractKey)) => Commands.exerciseByKey( templateId = refApiIdentifier(templateId), - // TODO daml-14549 somehow pass choiceSource + // TODO(#13303) Re-adapted from Daml repo: daml-14549 somehow pass choiceSource contractKey = contractKey, choice = input.choice, argument = input.argument, @@ -223,7 +223,7 @@ class CommandService( } } - // TODO daml-14549 somehow use the choiceInterfaceId + // TODO(#13303) Re-adapted from Daml repo: daml-14549 somehow use the choiceInterfaceId private def createAndExerciseCommand( input: CreateAndExerciseCommand.LAVResolved ): lav2.commands.Command.Command.CreateAndExercise = diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/PackageService.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/PackageService.scala index d055fb2ab50d..bdebf742b6dd 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/PackageService.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/PackageService.scala @@ -317,7 +317,7 @@ object PackageService { import com.digitalasset.canton.http.ContractTypeId as C object Overload extends LowPriority { - /* TODO #15293 see below note about Top + /* TODO(#13303) Re-adapted from Daml repo #15293: see below note about Top implicit case object Unknown extends Overload[C.Unknown.RequiredPkg, C.ResolvedId[C.Definite[String]]] */ @@ -325,7 +325,8 @@ object PackageService { case object Top extends Overload[C.RequiredPkg, C.Definite] } - // TODO #15293 if the request model has .Unknown included, then LowPriority and 
Top are + // TODO(#13303) Re-adapted from Daml repo #15293: + // if the request model has .Unknown included, then LowPriority and Top are // no longer needed and can be replaced with Overload.Unknown above sealed abstract class LowPriority { this: Overload.type => // needs to be low priority so it doesn't win against Template @@ -496,7 +497,7 @@ object PackageService { ctId: ContractTypeId.ResolvedPkgId, choice: Choice, ): Error \/ (Option[ContractTypeId.Interface.ResolvedPkgId], typesig.Type) = { - // TODO #14727 skip indirect resolution if ctId is an interface ID + // TODO(#13303) Re-adapted from Daml repo #14727: skip indirect resolution if ctId is an interface ID val resolution = for { choices <- choiceIdMap get ctId overloads <- choices get choice diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/endpoints/UserManagement.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/endpoints/UserManagement.scala index 1aca492b2042..a2a403e09bd3 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/endpoints/UserManagement.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/endpoints/UserManagement.scala @@ -179,7 +179,7 @@ final class UserManagement( private def aggregateListUserPages( token: Option[String], - pageSize: Int = 1000, // TODO could be made configurable in the future + pageSize: Int = 1000, )(implicit traceContext: TraceContext): Source[Error \/ User, NotUsed] = { import scalaz.std.option.* Source.unfoldAsync(some("")) { diff --git a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/package.scala b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/package.scala index f3cf1ae24cf3..9bd6e32c95be 100644 --- a/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/package.scala +++ b/sdk/canton/community/ledger/ledger-json-api/src/main/scala/com/digitalasset/canton/http/package.scala @@ -1055,7 +1055,7 @@ package http { object Interface extends Like[Interface] - // TODO #14727 make an opaque subtype, produced by PackageService on + // TODO(#13303) Re-adapted from Daml repo #14727: make an opaque subtype, produced by PackageService on // confirmed-present IDs only. Can probably start by adding // `with Definite[Any]` here and seeing what happens /** A resolved [[ContractTypeId]], typed `CtTyId`. 
*/ diff --git a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/StartableStoppableLedgerApiServer.scala b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/StartableStoppableLedgerApiServer.scala index d6805ffbe8bb..33312605cd73 100644 --- a/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/StartableStoppableLedgerApiServer.scala +++ b/sdk/canton/community/participant/src/main/scala/com/digitalasset/canton/participant/ledger/api/StartableStoppableLedgerApiServer.scala @@ -19,7 +19,7 @@ import com.digitalasset.canton.concurrent.{ ExecutionContextIdlenessExecutorService, FutureSupervisor, } -import com.digitalasset.canton.config.ProcessingTimeout +import com.digitalasset.canton.config.{InProcessGrpcName, ProcessingTimeout} import com.digitalasset.canton.connection.GrpcApiInfoService import com.digitalasset.canton.connection.v30.ApiInfoServiceGrpc import com.digitalasset.canton.data.Offset @@ -41,11 +41,7 @@ import com.digitalasset.canton.ledger.participant.state.{InternalStateService, P import com.digitalasset.canton.lifecycle.* import com.digitalasset.canton.lifecycle.LifeCycle.FastCloseableChannel import com.digitalasset.canton.logging.{LoggingContextWithTrace, NamedLoggerFactory, NamedLogging} -import com.digitalasset.canton.networking.grpc.{ - ApiRequestLogger, - CantonGrpcUtil, - ClientChannelBuilder, -} +import com.digitalasset.canton.networking.grpc.{ApiRequestLogger, CantonGrpcUtil} import com.digitalasset.canton.participant.ParticipantNodeParameters import com.digitalasset.canton.participant.protocol.ContractAuthenticator import com.digitalasset.canton.platform.ResourceOwnerOps @@ -67,6 +63,7 @@ import com.digitalasset.canton.tracing.TraceContext import com.digitalasset.canton.util.{FutureUtil, SimpleExecutionQueue} import com.digitalasset.daml.lf.data.Ref import com.digitalasset.daml.lf.engine.Engine +import io.grpc.inprocess.InProcessChannelBuilder import io.grpc.{BindableService, ServerInterceptor, ServerServiceDefinition} import io.opentelemetry.api.trace.Tracer import io.opentelemetry.instrumentation.grpc.v1_6.GrpcTelemetry @@ -477,8 +474,9 @@ class StartableStoppableLedgerApiServer( for { channel <- ResourceOwner .forReleasable(() => - ClientChannelBuilder - .createChannelBuilderToTrustedServer(config.serverConfig.clientConfig) + InProcessChannelBuilder + .forName(InProcessGrpcName.forPort(config.serverConfig.clientConfig.port)) + .executor(executionContext.execute(_)) .build() )(channel => Future( diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/BlockSequencerStateManager.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/BlockSequencerStateManager.scala index 7c9ea3b503ff..9ac408bdda31 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/BlockSequencerStateManager.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/BlockSequencerStateManager.scala @@ -24,7 +24,6 @@ import com.digitalasset.canton.synchronizer.sequencer.{ DeliverableSubmissionOutcome, InFlightAggregations, SequencerIntegration, - SubmissionRequestOutcome, } import com.digitalasset.canton.synchronizer.sequencing.traffic.store.TrafficConsumedStore import com.digitalasset.canton.topology.{Member, SynchronizerId} @@ -244,7 +243,7 @@ class BlockSequencerStateManager( ) val 
trafficConsumedUpdates = update.submissionsOutcomes.flatMap { - case SubmissionRequestOutcome(_, _, outcome: DeliverableSubmissionOutcome) => + case outcome: DeliverableSubmissionOutcome => outcome.trafficReceiptO match { case Some(trafficReceipt) => Some( @@ -261,7 +260,7 @@ class BlockSequencerStateManager( trafficConsumedStore.store(trafficConsumedUpdates) ) ) - _ <- dbSequencerIntegration.blockSequencerWrites(update.submissionsOutcomes.map(_.outcome)) + _ <- dbSequencerIntegration.blockSequencerWrites(update.submissionsOutcomes) _ <- EitherT.right[String]( dbSequencerIntegration.blockSequencerAcknowledge(update.acknowledgements) ) diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockChunkProcessor.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockChunkProcessor.scala index 1cd227989443..1126c056b47f 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockChunkProcessor.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockChunkProcessor.scala @@ -45,7 +45,6 @@ import SequencedSubmissionsValidator.SequencedSubmissionsValidationResult /** Processes a chunk of events in a block, yielding a [[ChunkUpdate]]. */ final class BlockChunkProcessor( - synchronizerId: SynchronizerId, protocolVersion: ProtocolVersion, synchronizerSyncCryptoApi: SynchronizerCryptoClient, sequencerId: SequencerId, @@ -59,8 +58,6 @@ final class BlockChunkProcessor( private val sequencedSubmissionsValidator = new SequencedSubmissionsValidator( - synchronizerId, - protocolVersion, synchronizerSyncCryptoApi, sequencerId, rateLimitManager, @@ -144,7 +141,7 @@ final class BlockChunkProcessor( // using the lastTs computed initially pre-validation. 
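// Hedged sketch (simplified stand-in types, not the Canton ones) of the pattern
// this refactor enables: with the SubmissionRequestOutcome wrapper removed,
// outcomes form a sealed hierarchy and the deliverable cases are selected with a
// single collect, as in trafficConsumedUpdates earlier in this patch and in
// lastChunkTsOfSuccessfulEvents just below.
sealed trait OutcomeSketch
final case class DeliverableSketch(
    sequencingTime: Long, // stands in for CantonTimestamp
    trafficReceiptO: Option[String], // stands in for TrafficReceipt
) extends OutcomeSketch
case object DiscardSketch extends OutcomeSketch

def trafficReceipts(outcomes: Seq[OutcomeSketch]): Seq[String] =
  outcomes.collect { case DeliverableSketch(_, Some(receipt)) => receipt }

def lastSuccessfulTs(outcomes: Seq[OutcomeSketch]): Option[Long] =
  outcomes.collect { case d: DeliverableSketch => d.sequencingTime }.maxOption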
lastChunkTsOfSuccessfulEvents = reversedOutcomes - .collect { case SubmissionRequestOutcome(_, _, o: DeliverableSubmissionOutcome) => + .collect { case o: DeliverableSubmissionOutcome => o.sequencingTime } .maxOption @@ -220,26 +217,23 @@ final class BlockChunkProcessor( latestSequencerEventTimestamp = Some(tickSequencingTimestamp), ) val tickSubmissionOutcome = - SubmissionRequestOutcome( - Map.empty, // Sequenced events are legacy and will be removed, so no need to generate them - None, - outcome = SubmissionOutcome.Deliver( - SubmissionRequest.tryCreate( - sender = sequencerId, - messageId = MessageId.tryCreate(s"topology-tick-$height"), - batch = Batch.empty(protocolVersion), - maxSequencingTime = tickSequencingTimestamp, - topologyTimestamp = None, - aggregationRule = None, - submissionCost = None, - protocolVersion = protocolVersion, - ), - sequencingTime = tickSequencingTimestamp, - deliverToMembers = sequencerRecipients(SequencersOfSynchronizer), + SubmissionOutcome.Deliver( + SubmissionRequest.tryCreate( + sender = sequencerId, + messageId = MessageId.tryCreate(s"topology-tick-$height"), batch = Batch.empty(protocolVersion), - submissionTraceContext = TraceContext.createNew(), - trafficReceiptO = None, + maxSequencingTime = tickSequencingTimestamp, + topologyTimestamp = None, + aggregationRule = None, + submissionCost = None, + protocolVersion = protocolVersion, ), + sequencingTime = tickSequencingTimestamp, + deliverToMembers = sequencerRecipients(SequencersOfSynchronizer), + batch = Batch.empty(protocolVersion), + submissionTraceContext = TraceContext.createNew(), + trafficReceiptO = None, + inFlightAggregation = None, ) val chunkUpdate = ChunkUpdate( acknowledgements = Map.empty, diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdate.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdate.scala index f335b72ef982..303805c26531 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdate.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdate.scala @@ -9,7 +9,7 @@ import com.digitalasset.canton.synchronizer.block.data.BlockInfo import com.digitalasset.canton.synchronizer.sequencer.{ InFlightAggregationUpdates, InFlightAggregations, - SubmissionRequestOutcome, + SubmissionOutcome, } import com.digitalasset.canton.topology.Member @@ -55,5 +55,5 @@ final case class ChunkUpdate( inFlightAggregationUpdates: InFlightAggregationUpdates = Map.empty, lastSequencerEventTimestamp: Option[CantonTimestamp], inFlightAggregations: InFlightAggregations, - submissionsOutcomes: Seq[SubmissionRequestOutcome] = Seq.empty, + submissionsOutcomes: Seq[SubmissionOutcome] = Seq.empty, ) extends OrderedBlockUpdate diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdateGenerator.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdateGenerator.scala index ec2155b92d33..bb48d75ba34a 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdateGenerator.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdateGenerator.scala @@ -90,7 +90,6 @@ object BlockUpdateGenerator { } class 
BlockUpdateGeneratorImpl( - synchronizerId: SynchronizerId, protocolVersion: ProtocolVersion, synchronizerSyncCryptoApi: SynchronizerCryptoClient, sequencerId: SequencerId, @@ -107,7 +106,6 @@ class BlockUpdateGeneratorImpl( private val blockChunkProcessor = new BlockChunkProcessor( - synchronizerId, protocolVersion, synchronizerSyncCryptoApi, sequencerId, diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/SequencedSubmissionsValidator.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/SequencedSubmissionsValidator.scala index 2702c85e92ce..a0b8d5da02d8 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/SequencedSubmissionsValidator.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/SequencedSubmissionsValidator.scala @@ -16,7 +16,6 @@ import com.digitalasset.canton.synchronizer.sequencer.traffic.SequencerRateLimit import com.digitalasset.canton.topology.* import com.digitalasset.canton.tracing.TraceContext import com.digitalasset.canton.util.{ErrorUtil, MapsUtil, MonadUtil} -import com.digitalasset.canton.version.ProtocolVersion import scala.concurrent.ExecutionContext @@ -27,8 +26,6 @@ import SubmissionRequestValidator.SubmissionRequestValidationResult /** Validates a list of [[SequencedSubmission]]s corresponding to a chunk. */ private[update] final class SequencedSubmissionsValidator( - synchronizerId: SynchronizerId, - protocolVersion: ProtocolVersion, synchronizerSyncCryptoApi: SynchronizerCryptoClient, sequencerId: SequencerId, rateLimitManager: SequencerRateLimitManager, @@ -40,8 +37,6 @@ private[update] final class SequencedSubmissionsValidator( private val submissionRequestValidator = new SubmissionRequestValidator( - synchronizerId, - protocolVersion, synchronizerSyncCryptoApi, sequencerId, rateLimitManager, @@ -117,35 +112,25 @@ private[update] final class SequencedSubmissionsValidator( ) _ = logger.debug( s"At block $height, the submission request ${signedOrderingRequest.submissionRequest.messageId} " + - s"at $sequencingTimestamp created the following counters: \n" ++ outcome.eventsByMember - .map { case (member, sequencedEvent) => - s"\t counter ${sequencedEvent.counter} for $member" - } - .mkString("\n") + s"at $sequencingTimestamp validated to: ${SubmissionOutcome.prettyString(outcome)}" ) } yield result } private def processSubmissionOutcome( inFlightAggregations: InFlightAggregations, - outcome: SubmissionRequestOutcome, + outcome: SubmissionOutcome, resultIfNoDeliverEvents: SequencedSubmissionsValidationResult, inFlightAggregationUpdates: InFlightAggregationUpdates, sequencerEventTimestamp: Option[CantonTimestamp], - remainingReversedOutcomes: Seq[SubmissionRequestOutcome], + remainingReversedOutcomes: Seq[SubmissionOutcome], )(implicit traceContext: TraceContext - ): FutureUnlessShutdown[SequencedSubmissionsValidationResult] = { - val SubmissionRequestOutcome( - _, - newAggregationO, - unifiedOutcome, - ) = outcome - - unifiedOutcome match { - case _: DeliverableSubmissionOutcome => + ): FutureUnlessShutdown[SequencedSubmissionsValidationResult] = + outcome match { + case deliverable: DeliverableSubmissionOutcome => val (newInFlightAggregations, newInFlightAggregationUpdates) = - newAggregationO.fold(inFlightAggregations -> inFlightAggregationUpdates) { + deliverable.inFlightAggregation.fold(inFlightAggregations -> 
inFlightAggregationUpdates) { case (aggregationId, inFlightAggregationUpdate) => InFlightAggregations.tryApplyUpdates( inFlightAggregations, @@ -168,7 +153,6 @@ private[update] final class SequencedSubmissionsValidator( case _ => // Discarded submission FutureUnlessShutdown.pure(resultIfNoDeliverEvents) } - } } private[update] object SequencedSubmissionsValidator { @@ -177,6 +161,6 @@ private[update] object SequencedSubmissionsValidator { inFlightAggregations: InFlightAggregations, inFlightAggregationUpdates: InFlightAggregationUpdates = Map.empty, lastSequencerEventTimestamp: Option[CantonTimestamp] = None, - reversedOutcomes: Seq[SubmissionRequestOutcome] = Seq.empty, + reversedOutcomes: Seq[SubmissionOutcome] = Seq.empty, ) } diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/SubmissionRequestValidator.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/SubmissionRequestValidator.scala index 1e072f398725..870ba72cc34f 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/SubmissionRequestValidator.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/SubmissionRequestValidator.scala @@ -10,13 +10,12 @@ import cats.syntax.either.* import cats.syntax.foldable.* import cats.syntax.parallel.* import cats.syntax.traverse.* -import com.digitalasset.canton.SequencerCounter import com.digitalasset.canton.crypto.{HashPurpose, SyncCryptoApi, SynchronizerCryptoClient} import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.discard.Implicits.DiscardOps import com.digitalasset.canton.error.{BaseAlarm, BaseCantonError} import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown} -import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging, TracedLogger} +import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} import com.digitalasset.canton.sequencing.GroupAddressResolver import com.digitalasset.canton.sequencing.protocol.* import com.digitalasset.canton.sequencing.traffic.TrafficReceipt @@ -32,9 +31,7 @@ import com.digitalasset.canton.synchronizer.sequencer.store.SequencerMemberValid import com.digitalasset.canton.synchronizer.sequencer.traffic.SequencerRateLimitManager import com.digitalasset.canton.topology.* import com.digitalasset.canton.tracing.TraceContext -import com.digitalasset.canton.util.ShowUtil.* import com.digitalasset.canton.util.{EitherTUtil, ErrorUtil} -import com.digitalasset.canton.version.ProtocolVersion import monocle.Monocle.toAppliedFocusOps import scala.concurrent.ExecutionContext @@ -44,8 +41,6 @@ import SubmissionRequestValidator.* /** Validates a single [[SubmissionRequest]] within a chunk. 
*/ private[update] final class SubmissionRequestValidator( - synchronizerId: SynchronizerId, - protocolVersion: ProtocolVersion, synchronizerSyncCryptoApi: SynchronizerCryptoClient, sequencerId: SequencerId, rateLimitManager: SequencerRateLimitManager, @@ -56,8 +51,6 @@ private[update] final class SubmissionRequestValidator( extends NamedLogging { private val trafficControlValidator = new TrafficControlValidator( - synchronizerId, - protocolVersion, rateLimitManager, loggerFactory, metrics, @@ -118,7 +111,6 @@ private[update] final class SubmissionRequestValidator( signedOrderingRequest, sequencingTimestamp, latestSequencerEventTimestamp, - signedOrderingRequest.submissionRequest.sender, ) } @@ -156,7 +148,7 @@ private[update] final class SubmissionRequestValidator( s"Sender [${submissionRequest.sender}] of send request [${submissionRequest.messageId}] " + "is not registered so cannot send or receive events. Dropping send request." ) - SubmissionRequestOutcome.discardSubmissionRequest + SubmissionOutcome.Discard }, ) .mapK(validationFUSK) @@ -167,7 +159,7 @@ private[update] final class SubmissionRequestValidator( ).mapK(validationFUSK) // At this point we know the sender has indeed properly signed the submission request // so we'll want to run the traffic control logic - _ <- EitherT.liftF[SequencedEventValidationF, SubmissionRequestOutcome, Unit]( + _ <- EitherT.liftF[SequencedEventValidationF, SubmissionOutcome, Unit]( WriterT.tell(TrafficConsumption(true)) ) _ <- EitherT.cond[SequencedEventValidationF]( @@ -187,7 +179,7 @@ private[update] final class SubmissionRequestValidator( submissionRequest.messageId.unwrap, ) .discard - SubmissionRequestOutcome.discardSubmissionRequest + SubmissionOutcome.Discard }, ) _ <- checkRecipientsAreKnown( @@ -231,7 +223,7 @@ private[update] final class SubmissionRequestValidator( )(implicit executionContext: ExecutionContext, traceContext: TraceContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Map[GroupRecipient, Set[Member]]] = { + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Map[GroupRecipient, Set[Member]]] = { val groupRecipients = submissionRequest.batch.allRecipients.collect { case group: GroupRecipient => group @@ -271,7 +263,7 @@ private[update] final class SubmissionRequestValidator( )(implicit executionContext: ExecutionContext, traceContext: TraceContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Map[GroupRecipient, Set[Member]]] = { + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Map[GroupRecipient, Set[Member]]] = { val useSequencersOfSynchronizer = groupRecipients.contains(SequencersOfSynchronizer) if (useSequencersOfSynchronizer) { for { @@ -279,10 +271,10 @@ private[update] final class SubmissionRequestValidator( topologyOrSequencingSnapshot.ipsSnapshot .sequencerGroup() .map( - _.fold[Either[SubmissionRequestOutcome, Set[Member]]]( + _.fold[Either[SubmissionOutcome, Set[Member]]]( // TODO(#14322): review if still applicable and consider an error code (SequencerDeliverError) Left( - invalidSubmissionRequest( + SubmissionOutcome.Reject.logAndCreate( submissionRequest, sequencingTimestamp, SequencerErrors.SubmissionRequestRefused("No sequencer group found"), @@ -293,7 +285,7 @@ private[update] final class SubmissionRequestValidator( ) } yield Map((SequencersOfSynchronizer: GroupRecipient) -> sequencers) } else - EitherT.rightT[FutureUnlessShutdown, SubmissionRequestOutcome]( + EitherT.rightT[FutureUnlessShutdown, SubmissionOutcome]( Map.empty[GroupRecipient, Set[Member]] ) } @@ -304,14 
+296,14 @@ private[update] final class SubmissionRequestValidator( )(implicit executionContext: ExecutionContext, traceContext: TraceContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Map[GroupRecipient, Set[Member]]] = + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Map[GroupRecipient, Set[Member]]] = if (!groupRecipients.contains(AllMembersOfSynchronizer)) { - EitherT.rightT[FutureUnlessShutdown, SubmissionRequestOutcome]( + EitherT.rightT[FutureUnlessShutdown, SubmissionOutcome]( Map.empty[GroupRecipient, Set[Member]] ) } else { for { - allMembers <- EitherT.right[SubmissionRequestOutcome]( + allMembers <- EitherT.right[SubmissionOutcome]( topologyOrSequencingSnapshot.ipsSnapshot.allMembers() ) } yield Map((AllMembersOfSynchronizer: GroupRecipient, allMembers)) @@ -325,13 +317,13 @@ private[update] final class SubmissionRequestValidator( )(implicit traceContext: TraceContext, executionContext: ExecutionContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Map[GroupRecipient, Set[Member]]] = { + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Map[GroupRecipient, Set[Member]]] = { val mediatorGroups = groupRecipients.collect { case MediatorGroupRecipient(group) => group }.toSeq if (mediatorGroups.isEmpty) - EitherT.rightT[FutureUnlessShutdown, SubmissionRequestOutcome]( + EitherT.rightT[FutureUnlessShutdown, SubmissionOutcome]( Map.empty[GroupRecipient, Set[Member]] ) else @@ -340,13 +332,13 @@ private[update] final class SubmissionRequestValidator( .mediatorGroupsOfAll(mediatorGroups) .leftMap(nonExistingGroups => // TODO(#14322): review if still applicable and consider an error code (SequencerDeliverError) - invalidSubmissionRequest( + SubmissionOutcome.Reject.logAndCreate( submissionRequest, sequencingTimestamp, SequencerErrors.SubmissionRequestRefused( s"The following mediator groups do not exist $nonExistingGroups" ), - ) + ): SubmissionOutcome ) _ <- groups.parTraverse { group => val nonRegisteredF = @@ -364,13 +356,13 @@ private[update] final class SubmissionRequestValidator( nonRegistered.isEmpty, (), // TODO(#14322): review if still applicable and consider an error code (SequencerDeliverError) - invalidSubmissionRequest( + SubmissionOutcome.Reject.logAndCreate( submissionRequest, sequencingTimestamp, SequencerErrors.SubmissionRequestRefused( s"The mediator group ${group.index} contains non registered mediators $nonRegistered" ), - ), + ): SubmissionOutcome, ) } ) @@ -385,7 +377,7 @@ private[update] final class SubmissionRequestValidator( )(implicit traceContext: TraceContext, executionContext: ExecutionContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Unit] = + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Unit] = submissionRequest.batch.envelopes .parTraverse_ { closedEnvelope => closedEnvelope.verifySignatures( @@ -402,7 +394,7 @@ private[update] final class SubmissionRequestValidator( topologyOrSequencingSnapshot.ipsSnapshot.timestamp, ) .report() - SubmissionRequestOutcome.discardSubmissionRequest + SubmissionOutcome.Discard } private def validateMaxSequencingTimeForAggregationRule( @@ -412,14 +404,14 @@ private[update] final class SubmissionRequestValidator( )(implicit traceContext: TraceContext, ec: ExecutionContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Unit] = + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Unit] = submissionRequest.aggregationRule.traverse_ { _ => for { synchronizerParameters <- EitherT( 
topologyOrSequencingSnapshot.ipsSnapshot.findDynamicSynchronizerParameters() ) .leftMap(error => - invalidSubmissionRequest( + SubmissionOutcome.Reject.logAndCreate( submissionRequest, sequencingTimestamp, SequencerErrors.SubmissionRequestRefused( @@ -432,7 +424,7 @@ private[update] final class SubmissionRequestValidator( ) _ <- EitherTUtil.condUnitET[FutureUnlessShutdown]( submissionRequest.maxSequencingTime.toInstant.isBefore(maxSequencingTimeUpperBound), - invalidSubmissionRequest( + SubmissionOutcome.Reject.logAndCreate( submissionRequest, sequencingTimestamp, SequencerErrors.MaxSequencingTimeTooFar( @@ -440,7 +432,7 @@ private[update] final class SubmissionRequestValidator( submissionRequest.maxSequencingTime, maxSequencingTimeUpperBound, ), - ), + ): SubmissionOutcome, ) } yield () } @@ -451,7 +443,7 @@ private[update] final class SubmissionRequestValidator( )(implicit executionContext: ExecutionContext, traceContext: TraceContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Unit] = + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Unit] = // group addresses checks are covered separately later on for { unknownRecipients <- @@ -467,11 +459,11 @@ private[update] final class SubmissionRequestValidator( res <- EitherT.cond[FutureUnlessShutdown]( unknownRecipients.isEmpty, (), - invalidSubmissionRequest( + SubmissionOutcome.Reject.logAndCreate( submissionRequest, sequencingTimestamp, SequencerErrors.UnknownRecipients(unknownRecipients), - ), + ): SubmissionOutcome, ) } yield res @@ -481,7 +473,7 @@ private[update] final class SubmissionRequestValidator( )(implicit traceContext: TraceContext, executionContext: ExecutionContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Unit] = { + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Unit] = { val submissionRequest = signedSubmissionRequest.content val alarm = for { @@ -503,7 +495,7 @@ private[update] final class SubmissionRequestValidator( alarm.leftMap { a => a.report() - SubmissionRequestOutcome.discardSubmissionRequest + SubmissionOutcome.Discard } } @@ -513,10 +505,10 @@ private[update] final class SubmissionRequestValidator( topologyTimestampError: Option[SequencerDeliverError], )(implicit traceContext: TraceContext - ): Either[SubmissionRequestOutcome, Unit] = + ): Either[SubmissionOutcome, Unit] = topologyTimestampError .map( - invalidSubmissionRequest( + SubmissionOutcome.Reject.logAndCreate( submissionRequest, sequencingTimestamp, _, @@ -525,7 +517,8 @@ private[update] final class SubmissionRequestValidator( .toLeft(()) // Performs additional checks and runs the aggregation logic - // If this succeeds, it will produce a SubmissionRequestOutcome containing DeliverEvents + // If this succeeds, it will produce a SubmissionOutcome containing + // the complete validation result for a single submission private def finalizeProcessing( groupToMembers: Map[GroupRecipient, Set[Member]], inFlightAggregations: InFlightAggregations, @@ -536,7 +529,7 @@ private[update] final class SubmissionRequestValidator( executionContext: ExecutionContext, ): EitherT[ FutureUnlessShutdown, - SubmissionRequestOutcome, + SubmissionOutcome, SubmissionRequestValidationResult, ] = for { @@ -546,7 +539,7 @@ private[update] final class SubmissionRequestValidator( SequencerError.MultipleMediatorRecipients .Error(submissionRequest, sequencingTimestamp) .report() - SubmissionRequestOutcome.discardSubmissionRequest + SubmissionOutcome.Discard }, ) aggregationIdO <- EitherT.fromEither[FutureUnlessShutdown]( @@ -557,7 +550,7 @@ 
private[update] final class SubmissionRequestValidator( s"Internal error occurred when processing request $sequencingTimestamp: computation of aggregation id: ${err.message}" ) - SubmissionRequestOutcome.discardSubmissionRequest + SubmissionOutcome.Discard } ) aggregationOutcome <- @@ -623,17 +616,14 @@ private[update] final class SubmissionRequestValidator( } yield SubmissionRequestValidationResult( inFlightAggregations, - SubmissionRequestOutcome( - Map.empty, - aggregationUpdate, - outcome = SubmissionOutcome.Deliver( - submissionRequest, - sequencingTimestamp, - members, - aggregatedBatch, - traceContext, - trafficReceiptO = None, // traffic receipt is updated at the end of the processing - ), + SubmissionOutcome.Deliver( + submissionRequest, + sequencingTimestamp, + members, + aggregatedBatch, + traceContext, + trafficReceiptO = None, // traffic receipt is updated at the end of the processing + inFlightAggregation = aggregationUpdate, ), sequencerEventTimestamp, ) @@ -646,7 +636,7 @@ private[update] final class SubmissionRequestValidator( )(implicit executionContext: ExecutionContext, traceContext: TraceContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, InFlightAggregationUpdate] = { + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, InFlightAggregationUpdate] = { val rule = submissionRequest.aggregationRule.getOrElse( ErrorUtil.internalError( new IllegalStateException( @@ -690,7 +680,7 @@ private[update] final class SubmissionRequestValidator( case InFlightAggregation.AlreadyDelivered(deliveredAt) => val message = s"The aggregatable request with aggregation ID $aggregationId was previously delivered at $deliveredAt" - invalidSubmissionRequest( + SubmissionOutcome.Reject.logAndCreate( submissionRequest, sequencingTimestamp, SequencerErrors.AggregateSubmissionAlreadySent(message), @@ -698,7 +688,7 @@ private[update] final class SubmissionRequestValidator( case InFlightAggregation.AggregationStuffing(_, at) => val message = s"The sender ${submissionRequest.sender} previously contributed to the aggregatable submission with ID $aggregationId at $at" - invalidSubmissionRequest( + SubmissionOutcome.Reject.logAndCreate( submissionRequest, sequencingTimestamp, SequencerErrors.AggregateSubmissionStuffing(message), @@ -719,18 +709,13 @@ private[update] final class SubmissionRequestValidator( logger.debug( s"Aggregation ID $aggregationId has now ${newAggregation.aggregatedSenders.size} senders aggregated. Threshold is ${newAggregation.rule.threshold.value}." 
) - val deliverReceiptEvent = - deliverReceipt(submissionRequest, sequencingTimestamp) - SubmissionRequestOutcome( - Map(submissionRequest.sender -> deliverReceiptEvent), - Some(aggregationId -> fullInFlightAggregationUpdate), - outcome = SubmissionOutcome.DeliverReceipt( - submissionRequest, - sequencingTimestamp, - traceContext, - trafficReceiptO = None, // traffic receipt is updated at the end of the processing - ), - ) + SubmissionOutcome.DeliverReceipt( + submissionRequest, + sequencingTimestamp, + traceContext, + trafficReceiptO = None, // traffic receipt is updated at the end of the processing + inFlightAggregation = Some(aggregationId -> fullInFlightAggregationUpdate), + ): SubmissionOutcome }, ) .mapK(FutureUnlessShutdown.outcomeK) @@ -744,7 +729,7 @@ private[update] final class SubmissionRequestValidator( )(implicit executionContext: ExecutionContext, traceContext: TraceContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Unit] = + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Unit] = for { _ <- wellFormedAggregationRule(submissionRequest, rule) @@ -762,13 +747,13 @@ private[update] final class SubmissionRequestValidator( .condUnitET( unregisteredEligibleMembers.isEmpty, // TODO(#14322): review if still applicable and consider an error code (SequencerDeliverError) - invalidSubmissionRequest( + SubmissionOutcome.Reject.logAndCreate( submissionRequest, sequencingTimestamp, SequencerErrors.SubmissionRequestRefused( s"Aggregation rule contains unregistered eligible members: $unregisteredEligibleMembers" ), - ), + ): SubmissionOutcome, ) .mapK(FutureUnlessShutdown.outcomeK) } yield () @@ -779,7 +764,7 @@ private[update] final class SubmissionRequestValidator( )(implicit executionContext: ExecutionContext, traceContext: TraceContext, - ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Unit] = + ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Unit] = EitherT.fromEither( SequencerValidations .wellformedAggregationRule(submissionRequest.sender, rule) @@ -788,42 +773,10 @@ private[update] final class SubmissionRequestValidator( .Error(submissionRequest.messageId, message) alarm.report() - SubmissionRequestOutcome.discardSubmissionRequest + SubmissionOutcome.Discard } ) - private def deliverReceipt( - submissionRequest: SubmissionRequest, - sequencingTimestamp: CantonTimestamp, - ): SequencedEvent[ClosedEnvelope] = - Deliver.create( - SequencerCounter.Genesis, - sequencingTimestamp, - synchronizerId, - Some(submissionRequest.messageId), - Batch.empty(protocolVersion), - // Since the receipt does not contain any envelopes and does not authenticate the envelopes - // in any way, there is no point in including a topology timestamp in the receipt, - // as it cannot be used to prove anything about the submission anyway. 
- None, - protocolVersion, - Option.empty[TrafficReceipt], - ) - - private def invalidSubmissionRequest( - submissionRequest: SubmissionRequest, - sequencingTimestamp: CantonTimestamp, - sequencerError: SequencerDeliverError, - )(implicit traceContext: TraceContext): SubmissionRequestOutcome = - SubmissionRequestValidator.invalidSubmissionRequest( - submissionRequest, - sequencingTimestamp, - sequencerError, - logger, - synchronizerId, - protocolVersion, - ) - // Off-boarded sequencers may still receive blocks (e.g., BFT sequencers still contribute to ordering for a while // after being deactivated in the Canton topology, specifically until the underlying consensus algorithm // allows them to be also removed from the BFT ordering topology), but they should not be considered addressed, @@ -843,7 +796,7 @@ private[update] object SubmissionRequestValidator { type SequencedEventValidationF[A] = WriterT[FutureUnlessShutdown, TrafficConsumption, A] // Type of validation methods, uses SequencedEventValidationF as the F of an EitherT // This gives us short circuiting semantics while having access to the traffic consumption state at the end - type SequencedEventValidation[A] = EitherT[SequencedEventValidationF, SubmissionRequestOutcome, A] + type SequencedEventValidation[A] = EitherT[SequencedEventValidationF, SubmissionOutcome, A] def validationFUSK(implicit executionContext: ExecutionContext) = WriterT.liftK[FutureUnlessShutdown, TrafficConsumption] def validationK(implicit executionContext: ExecutionContext) = @@ -866,13 +819,13 @@ private[update] object SubmissionRequestValidator { final case class SubmissionRequestValidationResult( inFlightAggregations: InFlightAggregations, - outcome: SubmissionRequestOutcome, + outcome: SubmissionOutcome, latestSequencerEventTimestamp: Option[CantonTimestamp], ) { // When we reach the end of the validation, we decide based on the outcome so far // if we should try to consume traffic for the event. - def shouldTryToConsumeTraffic: Boolean = outcome.outcome match { + def shouldTryToConsumeTraffic: Boolean = outcome match { // The happy case where the request will be delivered - traffic should be consumed case _: SubmissionOutcome.Deliver => true // This is a deliver receipt from an aggregated submission - traffic should be consumed @@ -889,7 +842,7 @@ private[update] object SubmissionRequestValidator { // Wasted traffic is defined as events that have been sequenced but will not be delivered to their // recipients. 
This method returns a Some with the reason if the traffic was wasted, None otherwise
-    def wastedTrafficReason: Option[String] = outcome.outcome match {
+    def wastedTrafficReason: Option[String] = outcome match {
       // Only events that are delivered are not wasted
       case _: SubmissionOutcome.Deliver => None
       case _: SubmissionOutcome.DeliverReceipt => None
@@ -899,39 +852,8 @@ private[update] object SubmissionRequestValidator {
     }

    def updateTrafficReceipt(
-        sender: Member,
-        trafficReceipt: Option[TrafficReceipt],
+        trafficReceipt: Option[TrafficReceipt]
    ): SubmissionRequestValidationResult =
-      copy(outcome = outcome.updateTrafficReceipt(sender, trafficReceipt))
-  }
-
-  private[update] def invalidSubmissionRequest(
-      submissionRequest: SubmissionRequest,
-      sequencingTimestamp: CantonTimestamp,
-      sequencerError: SequencerDeliverError,
-      logger: TracedLogger,
-      synchronizerId: SynchronizerId,
-      protocolVersion: ProtocolVersion,
-  )(implicit traceContext: TraceContext): SubmissionRequestOutcome = {
-    val SubmissionRequest(sender, messageId, _, _, _, _, _) = submissionRequest
-    logger.debug(
-      show"Rejecting submission request $messageId from $sender with error ${sequencerError.code
-          .toMsg(sequencerError.cause, correlationId = None, limit = None)}"
-    )
-    SubmissionRequestOutcome.reject(
-      submissionRequest,
-      sender,
-      DeliverError
-        .create(
-          SequencerCounter.Genesis,
-          sequencingTimestamp,
-          synchronizerId,
-          messageId,
-          sequencerError,
-          protocolVersion,
-          Option.empty[TrafficReceipt], // Traffic receipt is updated at the end of processing
-        ),
-      traceContext,
-    )
+      copy(outcome = outcome.updateTrafficReceipt(trafficReceipt))
  }
}
diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/TrafficControlValidator.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/TrafficControlValidator.scala
index 1fcdd10c2716..b51f943f2f5e 100644
--- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/TrafficControlValidator.scala
+++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/block/update/TrafficControlValidator.scala
@@ -9,11 +9,7 @@ import com.digitalasset.canton.config.RequireTypes.NonNegativeLong
 import com.digitalasset.canton.data.CantonTimestamp
 import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown}
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
-import com.digitalasset.canton.sequencing.protocol.{
-  SequencerDeliverError,
-  SequencerErrors,
-  SubmissionRequest,
-}
+import com.digitalasset.canton.sequencing.protocol.SequencerErrors
 import com.digitalasset.canton.sequencing.traffic.TrafficReceipt
 import com.digitalasset.canton.synchronizer.metrics.SequencerMetrics
 import com.digitalasset.canton.synchronizer.sequencer.Sequencer.{
@@ -27,12 +23,9 @@ import com.digitalasset.canton.synchronizer.sequencer.traffic.{
 import com.digitalasset.canton.synchronizer.sequencer.{
   DeliverableSubmissionOutcome,
   SubmissionOutcome,
-  SubmissionRequestOutcome,
 }
-import com.digitalasset.canton.topology.{Member, SynchronizerId}
 import com.digitalasset.canton.tracing.TraceContext
 import com.digitalasset.canton.util.ErrorUtil
-import com.digitalasset.canton.version.ProtocolVersion

 import scala.concurrent.ExecutionContext

@@ -47,34 +40,17 @@ import SubmissionRequestValidator.{
  * Largely it applies traffic control rules, and inserts a traffic receipt in the deliver receipts to the sender with the result.
  */
 private[update] class TrafficControlValidator(
-    synchronizerId: SynchronizerId,
-    protocolVersion: ProtocolVersion,
     rateLimitManager: SequencerRateLimitManager,
     override val loggerFactory: NamedLoggerFactory,
     metrics: SequencerMetrics,
 )(implicit closeContext: CloseContext)
     extends NamedLogging {

-  private def invalidSubmissionRequest(
-      submissionRequest: SubmissionRequest,
-      sequencingTimestamp: CantonTimestamp,
-      sequencerError: SequencerDeliverError,
-  )(implicit traceContext: TraceContext): SubmissionRequestOutcome =
-    SubmissionRequestValidator.invalidSubmissionRequest(
-      submissionRequest,
-      sequencingTimestamp,
-      sequencerError,
-      logger,
-      synchronizerId,
-      protocolVersion,
-    )
-
   def applyTrafficControl(
       submissionValidation: SequencedEventValidationF[SubmissionRequestValidationResult],
       signedOrderingRequest: SignedOrderingRequest,
       sequencingTimestamp: CantonTimestamp,
       latestSequencerEventTimestamp: Option[CantonTimestamp],
-      sender: Member,
   )(implicit
       traceContext: TraceContext,
       executionContext: ExecutionContext,
@@ -107,7 +83,7 @@ private[update] class TrafficControlValidator(
             )
             .map { receipt =>
               // On successful consumption, update the result with the receipt
-              val updated = result.updateTrafficReceipt(sender, receipt)
+              val updated = result.updateTrafficReceipt(receipt)
               // It's still possible that the result itself is not a `Deliver` (because of earlier failed validation)
               // In that case we record it as wasted traffic
               updated.wastedTrafficReason match {
@@ -126,7 +102,7 @@ private[update] class TrafficControlValidator(
             .leftMap { trafficConsumptionErrorOutcome =>
               // If the traffic consumption failed and the processing so far had produced a valid outcome
               // we replace it with the failed outcome from traffic validation
-              val updated = result.outcome.outcome match {
+              val updated = result.outcome match {
                 case _: DeliverableSubmissionOutcome =>
                   result.copy(
                     outcome = trafficConsumptionErrorOutcome,
@@ -137,7 +113,7 @@ private[update] class TrafficControlValidator(
               }
               if (result.latestSequencerEventTimestamp.isDefined) {
                 logger.debug(
-                  s"An event addressed to the sequencer (likely a topology event) was rejected due to a traffic control error. For that reason the lastSequencerEventTimestamp was not updated, as the event will not be delivered to the sequencer. ${trafficConsumptionErrorOutcome.outcome}"
+                  s"An event addressed to the sequencer (likely a topology event) was rejected due to a traffic control error. For that reason the lastSequencerEventTimestamp was not updated, as the event will not be delivered to the sequencer. $trafficConsumptionErrorOutcome"
                 )
               }
               recordSequencingWasted(
@@ -161,9 +137,8 @@
   )(implicit
       ec: ExecutionContext,
       tc: TraceContext,
-  ): EitherT[FutureUnlessShutdown, SubmissionRequestOutcome, Option[TrafficReceipt]] = {
+  ): EitherT[FutureUnlessShutdown, SubmissionOutcome, Option[TrafficReceipt]] = {
     val request = orderingRequest.signedSubmissionRequest
-    val sender = request.content.sender
     rateLimitManager
       .validateRequestAndConsumeTraffic(
         request.content,
@@ -184,17 +159,17 @@ private[update] class TrafficControlValidator(
           logger.debug(
             s"Sender does not have enough traffic at $sequencingTimestamp for event with cost ${error.trafficCost} processed by sequencer ${orderingRequest.signature.signedBy}"
           )
-          invalidSubmissionRequest(
-            request.content,
-            sequencingTimestamp,
-            SequencerErrors.TrafficCredit(error.toString),
-          )
+          SubmissionOutcome.Reject
+            .logAndCreate(
+              request.content,
+              sequencingTimestamp,
+              SequencerErrors.TrafficCredit(error.toString),
+            )
             .updateTrafficReceipt(
-              sender,
               // When above traffic limit we don't consume traffic, hence cost = 0
               Some(
                 error.trafficState.copy(lastConsumedCost = NonNegativeLong.zero).toTrafficReceipt
-              ),
+              )
             )
         // Outdated event costs are possible if the sender is too far behind and out of the tolerance window.
        // This could be the case if the processing sequencer itself is so far behind that it lets the request
@@ -206,11 +181,13 @@ private[update] class TrafficControlValidator(
             s"Event cost for event at $sequencingTimestamp from sender ${request.content.sender} sent" +
               s" to sequencer ${orderingRequest.content.sequencerId} was outdated: $error."
           )
-          invalidSubmissionRequest(
-            request.content,
-            sequencingTimestamp,
-            SequencerErrors.OutdatedTrafficCost(error.toString),
-          ).updateTrafficReceipt(sender, error.trafficReceipt)
+          SubmissionOutcome.Reject
+            .logAndCreate(
+              request.content,
+              sequencingTimestamp,
+              SequencerErrors.OutdatedTrafficCost(error.toString),
+            )
+            .updateTrafficReceipt(error.trafficReceipt)
         // An incorrect event cost means the sender calculated an incorrect submission cost even
         // according to its own supplied submission topology timestamp.
         // Additionally, the sequencer that received the submission request did not stop it.
         // So we raise an alarm and discard the request.
         case error: SequencerRateLimitError.IncorrectEventCost.Error =>
           error.report()
-          SubmissionRequestOutcome.discardSubmissionRequest
+          SubmissionOutcome.Discard
        // This indicates either that we're doing crash recovery but the traffic consumed data has been pruned already
        // Or that we somehow consumed traffic out of order. We can't really distinguish between the two but either way
        // this needs to be visible because it requires investigation and is very likely a sign of a bug.
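The hunks above fold the old two-layer result (a SubmissionRequestOutcome wrapper holding an events-by-member map, the optional aggregation update, and the actual SubmissionOutcome) into the SubmissionOutcome ADT itself: updateTrafficReceipt moves onto the trait (dropping the sender parameter), and the in-flight aggregation update now travels with the deliverable cases. The following self-contained Scala sketch mirrors that shape; Receipt, AggregationUpdate, and Request are simplified stand-ins invented for illustration, not the Canton types.

    object SubmissionOutcomeSketch {
      // Simplified stand-ins for TrafficReceipt, (AggregationId, InFlightAggregationUpdate)
      // and SubmissionRequest; illustrative only, not the Canton definitions.
      final case class Receipt(consumedCost: Long)
      final case class AggregationUpdate(aggregationId: String)
      final case class Request(sender: String, messageId: String)

      sealed trait Outcome {
        // Hoisted onto the trait so callers no longer need the sender to patch in a receipt.
        def updateTrafficReceipt(receiptO: Option[Receipt]): Outcome
      }

      sealed trait Deliverable extends Outcome {
        def submission: Request
        def trafficReceiptO: Option[Receipt]
        // The aggregation update is carried by the outcome instead of a surrounding wrapper.
        def inFlightAggregation: Option[AggregationUpdate]
        override def updateTrafficReceipt(receiptO: Option[Receipt]): Deliverable
      }

      final case class Deliver(
          submission: Request,
          trafficReceiptO: Option[Receipt],
          inFlightAggregation: Option[AggregationUpdate],
      ) extends Deliverable {
        override def updateTrafficReceipt(receiptO: Option[Receipt]): Deliverable =
          copy(trafficReceiptO = receiptO)
      }

      final case class Reject(
          submission: Request,
          error: String,
          trafficReceiptO: Option[Receipt],
      ) extends Deliverable {
        // A refused submission never contributes to an aggregation.
        override val inFlightAggregation: Option[AggregationUpdate] = None
        override def updateTrafficReceipt(receiptO: Option[Receipt]): Deliverable =
          copy(trafficReceiptO = receiptO)
      }

      case object Discard extends Outcome {
        // Nothing is delivered, so there is no receipt to update.
        override def updateTrafficReceipt(receiptO: Option[Receipt]): Outcome = this
      }

      // Rejections are built through a single logging factory (cf. Reject.logAndCreate above)
      // and can still absorb a receipt at the end of processing.
      def logAndReject(submission: Request, error: String): Reject = {
        println(s"Rejecting submission request ${submission.messageId} from ${submission.sender}: $error")
        Reject(submission, error, trafficReceiptO = None)
      }

      val example: Outcome =
        logAndReject(Request("participant1", "msg-1"), "unknown recipients")
          .updateTrafficReceipt(Some(Receipt(consumedCost = 0)))
    }

Flattening the wrapper removes the duplicated events-by-member bookkeeping visible in the deleted SubmissionRequestOutcome below, and lets pattern matches such as shouldTryToConsumeTraffic and wastedTrafficReason inspect the outcome directly.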
diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/BaseSequencer.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/BaseSequencer.scala index 0d748e004d0a..e6c4c1bd3b6a 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/BaseSequencer.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/BaseSequencer.scala @@ -21,7 +21,6 @@ import com.digitalasset.canton.synchronizer.sequencer.errors.{ } import com.digitalasset.canton.time.{Clock, PeriodicAction} import com.digitalasset.canton.topology.Member -import com.digitalasset.canton.tracing.Spanning.SpanWrapper import com.digitalasset.canton.tracing.{Spanning, TraceContext} import com.digitalasset.canton.util.ShowUtil.* import io.opentelemetry.api.trace.Tracer @@ -100,21 +99,6 @@ abstract class BaseSequencer( traceContext: TraceContext ): FutureUnlessShutdown[Unit] - override def sendAsync( - submission: SubmissionRequest - )(implicit traceContext: TraceContext): EitherT[FutureUnlessShutdown, SendAsyncError, Unit] = - withSpan("Sequencer.sendAsync") { implicit traceContext => span => - setSpanAttributes(span, submission) - for { - _ <- sendAsyncInternal(submission) - } yield () - } - - private def setSpanAttributes(span: SpanWrapper, submission: SubmissionRequest): Unit = { - span.setAttribute("sender", submission.sender.toString) - span.setAttribute("message_id", submission.messageId.unwrap) - } - protected def localSequencerMember: Member protected def disableMemberInternal(member: Member)(implicit traceContext: TraceContext diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/Sequencer.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/Sequencer.scala index bd2565caff6d..c35479842838 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/Sequencer.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/Sequencer.scala @@ -100,10 +100,6 @@ trait Sequencer traceContext: TraceContext ): EitherT[FutureUnlessShutdown, SendAsyncError, Unit] - def sendAsync(submission: SubmissionRequest)(implicit - traceContext: TraceContext - ): EitherT[FutureUnlessShutdown, SendAsyncError, Unit] - def read(member: Member, offset: SequencerCounter)(implicit traceContext: TraceContext ): EitherT[FutureUnlessShutdown, CreateSubscriptionError, Sequencer.EventSource] diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/SubmissionOutcome.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/SubmissionOutcome.scala index ef5af0fba746..e7fb62e1a39b 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/SubmissionOutcome.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/SubmissionOutcome.scala @@ -3,14 +3,24 @@ package com.digitalasset.canton.synchronizer.sequencer +import cats.implicits.showInterpolator import com.digitalasset.canton.data.CantonTimestamp -import com.digitalasset.canton.sequencing.protocol.{Batch, ClosedEnvelope, SubmissionRequest} +import com.digitalasset.canton.logging.ErrorLoggingContext +import 
com.digitalasset.canton.sequencing.protocol.{ + AggregationId, + Batch, + ClosedEnvelope, + SequencerDeliverError, + SubmissionRequest, +} import com.digitalasset.canton.sequencing.traffic.TrafficReceipt import com.digitalasset.canton.topology.Member import com.digitalasset.canton.tracing.TraceContext import com.google.rpc.status.Status -sealed trait SubmissionOutcome +sealed trait SubmissionOutcome { + def updateTrafficReceipt(trafficReceiptO: Option[TrafficReceipt]): SubmissionOutcome +} sealed trait DeliverableSubmissionOutcome extends SubmissionOutcome { def submission: SubmissionRequest @@ -22,7 +32,11 @@ sealed trait DeliverableSubmissionOutcome extends SubmissionOutcome { def trafficReceiptO: Option[TrafficReceipt] - def updateTrafficReceipt(trafficReceiptO: Option[TrafficReceipt]): DeliverableSubmissionOutcome + override def updateTrafficReceipt( + trafficReceiptO: Option[TrafficReceipt] + ): DeliverableSubmissionOutcome + + def inFlightAggregation: Option[(AggregationId, InFlightAggregationUpdate)] } object SubmissionOutcome { @@ -42,6 +56,7 @@ object SubmissionOutcome { batch: Batch[ClosedEnvelope], override val submissionTraceContext: TraceContext, override val trafficReceiptO: Option[TrafficReceipt], + override val inFlightAggregation: Option[(AggregationId, InFlightAggregationUpdate)], ) extends DeliverableSubmissionOutcome { override def updateTrafficReceipt( trafficReceiptO: Option[TrafficReceipt] @@ -61,6 +76,7 @@ object SubmissionOutcome { override val sequencingTime: CantonTimestamp, override val submissionTraceContext: TraceContext, override val trafficReceiptO: Option[TrafficReceipt], + override val inFlightAggregation: Option[(AggregationId, InFlightAggregationUpdate)], ) extends DeliverableSubmissionOutcome { override def deliverToMembers: Set[Member] = Set(submission.sender) @@ -71,7 +87,10 @@ object SubmissionOutcome { /** The submission was fully discarded, no error is delivered to sender, no messages are sent to the members. */ - case object Discard extends SubmissionOutcome + case object Discard extends SubmissionOutcome { + override def updateTrafficReceipt(trafficReceiptO: Option[TrafficReceipt]): SubmissionOutcome = + this + } /** The submission was rejected and an error should be delivered to the sender. 
* @@ -94,6 +113,39 @@ object SubmissionOutcome { override def updateTrafficReceipt( trafficReceiptO: Option[TrafficReceipt] ): DeliverableSubmissionOutcome = copy(trafficReceiptO = trafficReceiptO) + + override def inFlightAggregation: Option[(AggregationId, InFlightAggregationUpdate)] = None + } + + object Reject { + def logAndCreate( + submission: SubmissionRequest, + sequencingTime: CantonTimestamp, + sequencerError: SequencerDeliverError, + )(implicit + traceContext: TraceContext, + loggingContext: ErrorLoggingContext, + ): Reject = { + loggingContext.debug( + show"Rejecting submission request ${submission.messageId} from ${submission.sender} with error ${sequencerError.code + .toMsg(sequencerError.cause, correlationId = None, limit = None)}" + ) + + new Reject( + submission, + sequencingTime, + sequencerError.rpcStatusWithoutLoggingContext(), + traceContext, + trafficReceiptO = None, + ) + } + } + + def prettyString(outcome: SubmissionOutcome): String = outcome match { + case _: Deliver => "Deliver" + case _: DeliverReceipt => "DeliverReceipt" + case Discard => "Discard" + case reject: Reject => s"Reject(${reject.error.message})" } } diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/SubmissionRequestOutcome.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/SubmissionRequestOutcome.scala deleted file mode 100644 index e938a4efc33f..000000000000 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/SubmissionRequestOutcome.scala +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright (c) 2025 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 - -package com.digitalasset.canton.synchronizer.sequencer - -import com.digitalasset.canton.sequencing.protocol.* -import com.digitalasset.canton.sequencing.traffic.TrafficReceipt -import com.digitalasset.canton.synchronizer.sequencer.SubmissionOutcome.Discard -import com.digitalasset.canton.topology.Member -import com.digitalasset.canton.tracing.TraceContext - -/** Describes the outcome of processing a submission request: - * - * @param eventsByMember The sequenced events for each member that is recipient of the submission request. - * @param inFlightAggregation If [[scala.Some$]], the [[com.digitalasset.canton.sequencing.protocol.AggregationId]] - * and the [[InFlightAggregationUpdate]] - * of the current in-flight aggregation state. - * If [[scala.None$]], then the submission request either is not aggregatable or was refused. - * @param outcome The outcome of the submission request for the interface with Database sequencer. 
- */ -final case class SubmissionRequestOutcome( - eventsByMember: Map[Member, SequencedEvent[ClosedEnvelope]], - inFlightAggregation: Option[(AggregationId, InFlightAggregationUpdate)], - outcome: SubmissionOutcome, -) { - def updateTrafficReceipt( - sender: Member, - trafficReceipt: Option[TrafficReceipt], - ): SubmissionRequestOutcome = { - // Find the event addressed to the sender in the map, that's the receipt - val receipt = eventsByMember.get(sender) - // Update it with the traffic consumed - val updated: Option[SequencedEvent[ClosedEnvelope]] = receipt.map { - case deliverError: DeliverError => deliverError.updateTrafficReceipt(trafficReceipt) - case deliver: Deliver[ClosedEnvelope] => - deliver.updateTrafficReceipt(trafficReceipt = trafficReceipt) - } - // Put it back to the map - val updatedMap = updated - .map(sender -> _) - .map(updatedReceipt => eventsByMember + updatedReceipt) - .getOrElse(eventsByMember) - - val outcomeWithTrafficReceipt = outcome match { - case deliverableOutcome: DeliverableSubmissionOutcome => - deliverableOutcome.updateTrafficReceipt(trafficReceipt) - case Discard => outcome - } - - this.copy(eventsByMember = updatedMap, outcome = outcomeWithTrafficReceipt) - } -} - -object SubmissionRequestOutcome { - val discardSubmissionRequest: SubmissionRequestOutcome = - SubmissionRequestOutcome( - Map.empty, - None, - outcome = SubmissionOutcome.Discard, - ) - - def reject( - submission: SubmissionRequest, - sender: Member, - rejection: DeliverError, - submissionTraceContext: TraceContext, - ): SubmissionRequestOutcome = - SubmissionRequestOutcome( - Map(sender -> rejection), - None, - outcome = SubmissionOutcome.Reject( - submission, - rejection.timestamp, - rejection.reason, - submissionTraceContext, - trafficReceiptO = None, // traffic receipt is updated at the end of processing - ), - ) -} diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/BlockSequencer.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/BlockSequencer.scala index ee08c3355322..297bd51172f2 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/BlockSequencer.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/BlockSequencer.scala @@ -139,7 +139,6 @@ class BlockSequencer( noTracingLogger.info(s"Subscribing to block source from ${headState.block.height + 1}") val updateGenerator = new BlockUpdateGeneratorImpl( - synchronizerId, protocolVersion, cryptoApi, sequencerId, diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/availability/AvailabilityModule.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/availability/AvailabilityModule.scala index 02b0643da377..c4f0b412ad97 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/availability/AvailabilityModule.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/availability/AvailabilityModule.scala @@ -127,7 +127,8 @@ final class AvailabilityModule[E <: Env[E]]( exception, ) case Success(Left(exception)) => - logger.warn( + // Info because it can also happen at epoch 
boundaries + logger.info( s"Skipping message since we can't verify signature for ${signedMessage.message} (signature ${signedMessage.signature}) reason=$exception" ) emitInvalidMessage(metrics, signedMessage.from) diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/IssConsensusModule.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/IssConsensusModule.scala index 16ba4a534dbd..2947955376fd 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/IssConsensusModule.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/IssConsensusModule.scala @@ -406,8 +406,7 @@ final class IssConsensusModule[E <: Env[E]]( case Consensus.ConsensusMessage.PbftUnverifiedNetworkMessage(underlyingNetworkMessage) => context.pipeToSelf( - signatureVerifier - .verify(underlyingNetworkMessage, activeTopologyInfo.currentCryptoProvider) + signatureVerifier.verify(underlyingNetworkMessage, activeTopologyInfo) ) { case Failure(error) => logger.warn( @@ -423,7 +422,8 @@ final class IssConsensusModule[E <: Env[E]]( ) None case Success(Left(errors)) => - logger.warn( + // Info because it can also happen at epoch boundaries + logger.info( s"Message $underlyingNetworkMessage from ${underlyingNetworkMessage.from} failed validation, dropping: $errors" ) emitNonCompliance(metrics)( diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/validation/IssConsensusSignatureVerifier.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/validation/IssConsensusSignatureVerifier.scala index 5a237b831fe9..ff92d7092282 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/validation/IssConsensusSignatureVerifier.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/validation/IssConsensusSignatureVerifier.scala @@ -13,7 +13,12 @@ import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framewor ProofOfAvailability, } import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.data.ordering.ConsensusCertificate -import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.modules.ConsensusSegment.ConsensusMessage.PbftNetworkMessage +import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.data.topology.OrderingTopologyInfo +import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.modules.ConsensusSegment.ConsensusMessage.{ + PbftNetworkMessage, + PrePrepare, + Prepare, +} import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.modules.{ Consensus, ConsensusSegment, @@ -31,32 +36,32 @@ final class IssConsensusSignatureVerifier[E <: Env[E]] { def verify( message: SignedMessage[ConsensusSegment.ConsensusMessage.PbftNetworkMessage], - cryptoProvider: CryptoProvider[E], + topologyInfo: OrderingTopologyInfo[E], )(implicit context: 
E#ActorContextT[Consensus.Message[E]], traceContext: TraceContext, ): VerificationResult = validateSignedMessage[ConsensusSegment.ConsensusMessage.PbftNetworkMessage]( - validateMessage(_, cryptoProvider) + validateMessage(_, topologyInfo) )(message)( context, - cryptoProvider, + topologyInfo.currentCryptoProvider, traceContext, ) private def validateMessage( message: ConsensusSegment.ConsensusMessage.PbftNetworkMessage, - cryptoProvider: CryptoProvider[E], + topologyInfo: OrderingTopologyInfo[E], )(implicit context: E#ActorContextT[Consensus.Message[E]], traceContext: TraceContext, ): VerificationResult = { - implicit val implicitCryptoProvider: CryptoProvider[E] = cryptoProvider + implicit val implicitCryptoProvider: CryptoProvider[E] = topologyInfo.currentCryptoProvider message match { - case p: ConsensusSegment.ConsensusMessage.PrePrepare => - validatePrePrepare(p) - case ConsensusSegment.ConsensusMessage.Prepare( + case p: PrePrepare => + validatePrePrepare(p, topologyInfo) + case Prepare( blockMetadata, viewNumber, hash, @@ -64,10 +69,10 @@ final class IssConsensusSignatureVerifier[E <: Env[E]] { from, ) => context.pureFuture(Either.unit[Seq[SignatureCheckError]]) - case msg: ConsensusSegment.ConsensusMessage.Commit => + case _: ConsensusSegment.ConsensusMessage.Commit => context.pureFuture(Either.unit[Seq[SignatureCheckError]]) case msg: ConsensusSegment.ConsensusMessage.ViewChange => - validateViewChange(msg) + validateViewChange(msg, topologyInfo) case ConsensusSegment.ConsensusMessage.NewView( blockMetadata, segmentIndex, @@ -78,8 +83,8 @@ final class IssConsensusSignatureVerifier[E <: Env[E]] { from, ) => collectFuturesAndFlatten( - viewChanges.map(validateSignedMessage(validateViewChange)) ++ - prePrepares.map(validateSignedMessage(validatePrePrepare)) + viewChanges.map(validateSignedMessage(validateViewChange(_, topologyInfo))) ++ + prePrepares.map(validateSignedMessage(validatePrePrepare(_, topologyInfo))) ) } } @@ -107,13 +112,13 @@ final class IssConsensusSignatureVerifier[E <: Env[E]] { collectFuturesAndFlatten(block.proofs.map(validateProofOfAvailability(_))) private def validatePrePrepare( - message: ConsensusSegment.ConsensusMessage.PrePrepare + message: PrePrepare, + topologyInfo: OrderingTopologyInfo[E], )(implicit context: E#ActorContextT[Consensus.Message[E]], - cryptoProvider: CryptoProvider[E], traceContext: TraceContext, ): VerificationResult = message match { - case ConsensusSegment.ConsensusMessage.PrePrepare( + case PrePrepare( blockMetadata, viewNumber, localTimestamp, @@ -121,27 +126,28 @@ final class IssConsensusSignatureVerifier[E <: Env[E]] { canonicalCommitSet, from, ) => + // Canonical commit sets are validated in more detail later in the process + val maybeCanonicalCommitSetEpochNumber = + canonicalCommitSet.sortedCommits.map(_.message.blockMetadata.epochNumber).headOption + val prePrepareEpochNumber = blockMetadata.epochNumber + implicit val cryptoProvider: CryptoProvider[E] = + if (maybeCanonicalCommitSetEpochNumber.contains(prePrepareEpochNumber)) { + topologyInfo.currentCryptoProvider + } else { + topologyInfo.previousCryptoProvider + } collectFuturesAndFlatten( - // TODO(#22184) validate commit signatures - // canonicalCommitSet.sortedCommits.map( - // validateSignedMessage(validateCommit) - // ) :+ validateOrderingBlock(block) - Seq(validateOrderingBlock(block)) + canonicalCommitSet.sortedCommits.map( + validateSignedMessage(_ => context.pureFuture(Either.unit[Seq[SignatureCheckError]])) + ) :+ validateOrderingBlock(block) ) } - private def 
validateConsensusCertificate(certificate: ConsensusCertificate)(implicit - context: E#ActorContextT[Consensus.Message[E]], - cryptoProvider: CryptoProvider[E], - traceContext: TraceContext, - ): VerificationResult = - validateSignedMessage(validatePrePrepare)(certificate.prePrepare) - private def validateViewChange( - message: ConsensusSegment.ConsensusMessage.ViewChange + message: ConsensusSegment.ConsensusMessage.ViewChange, + topologyInfo: OrderingTopologyInfo[E], )(implicit context: E#ActorContextT[Consensus.Message[E]], - cryptoProvider: CryptoProvider[E], traceContext: TraceContext, ): VerificationResult = message match { case ConsensusSegment.ConsensusMessage.ViewChange( @@ -152,7 +158,18 @@ final class IssConsensusSignatureVerifier[E <: Env[E]] { certs, from, ) => - collectFuturesAndFlatten(certs.map(validateConsensusCertificate(_))) + collectFuturesAndFlatten(certs.map(validateConsensusCertificate(_, topologyInfo))) + } + + private def validateConsensusCertificate( + certificate: ConsensusCertificate, + topologyInfo: OrderingTopologyInfo[E], + )(implicit + context: E#ActorContextT[Consensus.Message[E]], + traceContext: TraceContext, + ): VerificationResult = { + implicit val implicitCryptoProvider: CryptoProvider[E] = topologyInfo.currentCryptoProvider + validateSignedMessage[PrePrepare](validatePrePrepare(_, topologyInfo))(certificate.prePrepare) } private def validateSignedMessage[A <: PbftNetworkMessage]( diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/validation/PbftMessageValidatorImpl.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/validation/PbftMessageValidatorImpl.scala index 33ab5fb4bf34..7a868914e898 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/validation/PbftMessageValidatorImpl.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/consensus/iss/validation/PbftMessageValidatorImpl.scala @@ -20,7 +20,6 @@ final class PbftMessageValidatorImpl(epoch: Epoch, metrics: BftOrderingMetrics)( mc: MetricsContext ) extends PbftMessageValidator { - // TODO(#17108): verify PrePrepare is sound in terms of ProofsOfAvailability def validatePrePrepare(prePrepare: PrePrepare, firstInSegment: Boolean): Either[String, Unit] = { val block = prePrepare.block val prePrepareEpochNumber = prePrepare.blockMetadata.epochNumber diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/mempool/MempoolModule.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/mempool/MempoolModule.scala index 7086d792be38..fb44585445f0 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/mempool/MempoolModule.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/modules/mempool/MempoolModule.scala @@ -64,7 +64,7 @@ class MempoolModule[E <: Env[E]]( val rejectionMessage = s"mempool received client request of size $payloadSize " + s"but it exceeds the maximum (${config.maxRequestPayloadBytes}), 
dropping it" - logger.info(rejectionMessage) + logger.warn(rejectionMessage) from.foreach(_.asyncSend(SequencerNode.RequestRejected(rejectionMessage))) metrics.ingress.labels.outcome.values.RequestTooBig } else { diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/networking/BftP2PNetworkIn.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/networking/BftP2PNetworkIn.scala index 77f5e02794bf..b9cda89de8ee 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/networking/BftP2PNetworkIn.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/networking/BftP2PNetworkIn.scala @@ -47,10 +47,12 @@ class BftP2PNetworkIn[E <: Env[E]]( context: E#ActorContextT[BftOrderingServiceReceiveRequest], traceContext: TraceContext, ): Unit = { - logger.debug(s"Received network message $message") + val sentBySequencerUid = message.sentBySequencerUid + logger.debug(s"Received network message from $sentBySequencerUid") + logger.trace(s"Message from $sentBySequencerUid is: $message") val start = Instant.now val sequencerIdOrError = UniqueIdentifier - .fromProtoPrimitive(message.sentBySequencerUid, "sent_by_sequencer_uid") + .fromProtoPrimitive(sentBySequencerUid, "sent_by_sequencer_uid") .map(SequencerId(_)) .leftMap(_.toString) parseAndForwardBody( diff --git a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/networking/BftP2PNetworkOut.scala b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/networking/BftP2PNetworkOut.scala index c0509a5c3bf3..0e69271348a4 100644 --- a/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/networking/BftP2PNetworkOut.scala +++ b/sdk/canton/community/synchronizer/src/main/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/core/networking/BftP2PNetworkOut.scala @@ -204,7 +204,8 @@ final class BftP2PNetworkOut[E <: Env[E]]( val mc1: MetricsContext = sendMetricsContext(metrics, serializedMessage, to, droppedAsUnauthenticated = false) locally { - logger.debug(s"Sending network message to $to: $message") + logger.debug(s"Sending network message to $to") + logger.trace(s"Message to $to is: $message") implicit val mc: MetricsContext = mc1 networkSend(ref, serializedMessage) emitSendStats(metrics, serializedMessage) diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/block/update/BlockChunkProcessorTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/block/update/BlockChunkProcessorTest.scala index 36a8fdef4490..f5e7f7aa92c4 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/block/update/BlockChunkProcessorTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/block/update/BlockChunkProcessorTest.scala @@ -13,10 +13,10 @@ import com.digitalasset.canton.synchronizer.block.update.{ BlockUpdateGeneratorImpl, } import com.digitalasset.canton.synchronizer.metrics.SequencerTestMetrics +import com.digitalasset.canton.synchronizer.sequencer.SubmissionOutcome import 
com.digitalasset.canton.synchronizer.sequencer.block.BlockSequencerFactory.OrderingTimeFixMode import com.digitalasset.canton.synchronizer.sequencer.store.SequencerMemberValidator import com.digitalasset.canton.synchronizer.sequencer.traffic.SequencerRateLimitManager -import com.digitalasset.canton.synchronizer.sequencer.{SubmissionOutcome, SubmissionRequestOutcome} import com.digitalasset.canton.topology.DefaultTestIdentities.{sequencerId, synchronizerId} import com.digitalasset.canton.topology.TestingIdentityFactory import org.scalatest.wordspec.AsyncWordSpec @@ -50,7 +50,6 @@ class BlockChunkProcessorTest extends AsyncWordSpec with BaseTest { val blockChunkProcessor = new BlockChunkProcessor( - synchronizerId, testedProtocolVersion, syncCryptoApiFake, sequencerId, @@ -77,25 +76,22 @@ class BlockChunkProcessorTest extends AsyncWordSpec with BaseTest { state.latestSequencerEventTimestamp shouldBe Some(tickSequencingTimestamp) update.submissionsOutcomes should matchPattern { case Seq( - SubmissionRequestOutcome( - _, - None, - SubmissionOutcome.Deliver( - SubmissionRequest( - `sequencerId`, - `aMessageId`, - _, - `tickSequencingTimestamp`, - None, - None, - None, - ), - `tickSequencingTimestamp`, - deliverToMembers, - batch, - _, + SubmissionOutcome.Deliver( + SubmissionRequest( + `sequencerId`, + `aMessageId`, _, + `tickSequencingTimestamp`, + None, + None, + None, ), + `tickSequencingTimestamp`, + deliverToMembers, + batch, + _, + _, + None, ) ) if deliverToMembers == Set(sequencerId) && diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdateGeneratorImplTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdateGeneratorImplTest.scala index 5a5e435707e7..6b0575a4a510 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdateGeneratorImplTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/block/update/BlockUpdateGeneratorImplTest.scala @@ -55,7 +55,6 @@ class BlockUpdateGeneratorImplTest val blockUpdateGenerator = new BlockUpdateGeneratorImpl( - synchronizerId, testedProtocolVersion, syncCryptoApiFake, sequencerId, @@ -97,7 +96,6 @@ class BlockUpdateGeneratorImplTest val blockUpdateGenerator = new BlockUpdateGeneratorImpl( - synchronizerId, testedProtocolVersion, syncCryptoApiFake, sequencerId, diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/BaseSequencerTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/BaseSequencerTest.scala index a4dc6df15ca7..a04af19ea13b 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/BaseSequencerTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/BaseSequencerTest.scala @@ -190,24 +190,18 @@ class BaseSequencerTest extends AsyncWordSpec with BaseTest with FailOnShutdown FutureUnlessShutdown.pure(existingMembers.contains(member)) } - Seq(("sendAsync", false), ("sendAsyncSigned", true)).foreach { case (name, useSignedSend) => - def send(sequencer: Sequencer)(submission: SubmissionRequest) = - if (useSignedSend) - sequencer.sendAsyncSigned( - SignedContent(submission, Signature.noSignature, None, testedProtocolVersion) - ) - else sequencer.sendAsync(submission) - - name should { - - 
"sends should not auto register" in { - val sequencer = new StubSequencer(existingMembers = Set(participant1)) - val request = submission(from = participant1, to = Set(participant1, participant2)) - - for { - _ <- send(sequencer)(request).value.failOnShutdown - } yield sequencer.newlyRegisteredMembers shouldBe empty - } + "sendAsyncSigned" should { + "sends should not auto register" in { + val sequencer = new StubSequencer(existingMembers = Set(participant1)) + val request = submission(from = participant1, to = Set(participant1, participant2)) + for { + _ <- sequencer + .sendAsyncSigned( + SignedContent(request, Signature.noSignature, None, testedProtocolVersion) + ) + .value + .failOnShutdown + } yield sequencer.newlyRegisteredMembers shouldBe empty } } diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/DatabaseSequencerSnapshottingTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/DatabaseSequencerSnapshottingTest.scala index 317e0f5936aa..61efac92f391 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/DatabaseSequencerSnapshottingTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/DatabaseSequencerSnapshottingTest.scala @@ -5,10 +5,11 @@ package com.digitalasset.canton.synchronizer.sequencer import com.digitalasset.canton.SequencerCounter import com.digitalasset.canton.config.{CachingConfigs, DefaultProcessingTimeouts} -import com.digitalasset.canton.crypto.SynchronizerCryptoClient +import com.digitalasset.canton.crypto.{HashPurpose, SynchronizerCryptoClient} import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.protocol.DynamicSynchronizerParameters import com.digitalasset.canton.resource.MemoryStorage +import com.digitalasset.canton.sequencing.client.RequestSigner import com.digitalasset.canton.sequencing.protocol.{Recipients, SubmissionRequest} import com.digitalasset.canton.sequencing.traffic.TrafficReceipt import com.digitalasset.canton.synchronizer.metrics.SequencerMetrics @@ -27,16 +28,19 @@ final class DatabaseSequencerSnapshottingTest extends SequencerApiTest { )(implicit materializer: Materializer): CantonSequencer = createSequencerWithSnapshot(None) + val crypto = TestingIdentityFactory( + TestingTopology(), + loggerFactory, + DynamicSynchronizerParameters.initialValues(clock, testedProtocolVersion), + ).forOwnerAndSynchronizer(owner = mediatorId, synchronizerId) + + val requestSigner = RequestSigner(crypto, testedProtocolVersion, loggerFactory) + def createSequencerWithSnapshot( initialState: Option[SequencerInitialState] )(implicit materializer: Materializer): DatabaseSequencer = { if (clock == null) clock = createClock() - val crypto = TestingIdentityFactory( - TestingTopology(), - loggerFactory, - DynamicSynchronizerParameters.initialValues(clock, testedProtocolVersion), - ).forOwnerAndSynchronizer(owner = mediatorId, synchronizerId) val metrics = SequencerMetrics.noop("database-sequencer-test") val dbConfig = TestDatabaseSequencerConfig() @@ -90,6 +94,9 @@ final class DatabaseSequencerSnapshottingTest extends SequencerApiTest { TestDatabaseSequencerWrapper(sequencer.asInstanceOf[DatabaseSequencer]) for { + signedRequest <- valueOrFail( + requestSigner.signRequest(request, HashPurpose.SubmissionRequestSignature).failOnShutdown + )(s"Sign request") _ <- valueOrFail( 
testSequencerWrapper.registerMemberInternal(sender, CantonTimestamp.Epoch).failOnShutdown )( @@ -103,7 +110,7 @@ final class DatabaseSequencerSnapshottingTest extends SequencerApiTest { "Register sequencer" ) - _ <- sequencer.sendAsync(request).valueOrFailShutdown("Sent async") + _ <- sequencer.sendAsyncSigned(signedRequest).valueOrFailShutdown("Sent async") messages <- readForMembers(List(sender), sequencer).failOnShutdown("readForMembers") _ = { val details = EventDetails( @@ -155,11 +162,14 @@ final class DatabaseSequencerSnapshottingTest extends SequencerApiTest { )(snapshot.representativeProtocolVersion)) } + signedRequest2 <- valueOrFail( + requestSigner.signRequest(request2, HashPurpose.SubmissionRequestSignature) + )(s"Sign request") _ <- { // need to advance clock so that the new event doesn't get the same timestamp as the previous one, // which would then cause it to be ignored on the read path simClockOrFail(clock).advance(Duration.ofSeconds(1)) - secondSequencer.sendAsync(request2).value + secondSequencer.sendAsyncSigned(signedRequest2).value } messages2 <- readForMembers( diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/SequencerTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/SequencerTest.scala index 99c48c48bca3..da23fcaa8eb7 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/SequencerTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/SequencerTest.scala @@ -6,7 +6,7 @@ package com.digitalasset.canton.synchronizer.sequencer import cats.syntax.parallel.* import com.digitalasset.canton.config.RequireTypes.PositiveInt import com.digitalasset.canton.config.{CachingConfigs, DefaultProcessingTimeouts, ProcessingTimeout} -import com.digitalasset.canton.crypto.SynchronizerCryptoClient +import com.digitalasset.canton.crypto.{HashPurpose, SynchronizerCryptoClient} import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.lifecycle.* import com.digitalasset.canton.logging.TracedLogger @@ -18,6 +18,7 @@ import com.digitalasset.canton.protocol.messages.{ import com.digitalasset.canton.protocol.v30 import com.digitalasset.canton.resource.MemoryStorage import com.digitalasset.canton.sequencing.OrdinarySerializedEvent +import com.digitalasset.canton.sequencing.client.RequestSigner import com.digitalasset.canton.sequencing.protocol.* import com.digitalasset.canton.synchronizer.metrics.SequencerMetrics import com.digitalasset.canton.synchronizer.sequencer.store.{InMemorySequencerStore, SequencerStore} @@ -82,24 +83,33 @@ class SequencerTest loggerFactory = loggerFactory, ) val clock = new WallClock(timeouts, loggerFactory = loggerFactory) + private val testingTopology = TestingTopology( + sequencerGroup = SequencerGroup( + active = Seq(SequencerId(synchronizerId.uid)), + passive = Seq.empty, + threshold = PositiveInt.one, + ), + participants = Seq( + alice, + bob, + carole, + ).map((_, ParticipantAttributes(ParticipantPermission.Confirmation))).toMap, + ) + .build(loggerFactory) + val crypto: SynchronizerCryptoClient = valueOrFail( - TestingTopology( - sequencerGroup = SequencerGroup( - active = Seq(SequencerId(synchronizerId.uid)), - passive = Seq.empty, - threshold = PositiveInt.one, - ), - participants = Seq( - alice, - bob, - carole, - ).map((_, ParticipantAttributes(ParticipantPermission.Confirmation))).toMap, - ) - 
.build(loggerFactory) + testingTopology .forOwner(SequencerId(synchronizerId.uid)) .forSynchronizer(synchronizerId, defaultStaticSynchronizerParameters) .toRight("crypto error") )("building crypto") + val aliceCrypto: SynchronizerCryptoClient = valueOrFail( + testingTopology + .forOwner(alice) + .forSynchronizer(synchronizerId, defaultStaticSynchronizerParameters) + .toRight("crypto error") + )("building alice crypto") + val metrics: SequencerMetrics = SequencerMetrics.noop("sequencer-test") val dbConfig = CommunitySequencerConfig.Database() @@ -231,7 +241,10 @@ class SequencerTest )( "member registration" ) - _ <- sequencer.sendAsync(submission).valueOrFail("send") + signedSubmission <- RequestSigner(aliceCrypto, testedProtocolVersion, loggerFactory) + .signRequest(submission, HashPurpose.SubmissionRequestSignature) + .valueOrFail("sign request") + _ <- sequencer.sendAsyncSigned(signedSubmission).valueOrFail("send") aliceDeliverEvent <- readAsSeq(alice, 1) .map(_.loneElement.signedEvent.content) .map(asDeliverEvent) diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/SequencerWriterSourceTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/SequencerWriterSourceTest.scala index 5a58a3bb6157..1d5e0db3eb18 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/SequencerWriterSourceTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/SequencerWriterSourceTest.scala @@ -424,6 +424,7 @@ class SequencerWriterSourceTest batch = batch, submissionTraceContext = TraceContext.empty, trafficReceiptO = None, + inFlightAggregation = None, ) ) )("send to unknown recipient") @@ -611,6 +612,7 @@ class SequencerWriterSourceTest batch = batch, submissionTraceContext = TraceContext.empty, trafficReceiptO = None, + inFlightAggregation = None, ) ) )("send").failOnShutdown diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/framework/simulation/SimulationSettings.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/framework/simulation/SimulationSettings.scala index 780b80102183..32f93a5b44c9 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/framework/simulation/SimulationSettings.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/framework/simulation/SimulationSettings.scala @@ -3,6 +3,8 @@ package com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.simulation +import com.digitalasset.canton.config.RequireTypes.PositiveInt + import java.util.concurrent.TimeUnit import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.util.Random @@ -118,6 +120,9 @@ final case class SimulationSettings( durationOfFirstPhaseWithFaults: FiniteDuration, durationOfSecondPhaseWithoutFaults: FiniteDuration = 30.seconds, clientRequestInterval: Option[FiniteDuration] = Some(1.second), + clientRequestApproximateByteSize: Option[PositiveInt] = Some( + PositiveInt.three // fully arbitrary + ), livenessCheckInterval: FiniteDuration = 20.seconds, peerOnboardingDelays: Iterable[FiniteDuration] = Iterable.empty, becomingOnlineAfterOnboardingDelay: FiniteDuration = diff --git 
a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/simulation/BftOrderingSimulationTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/simulation/BftOrderingSimulationTest.scala index 0fe2b8c4590e..920dc08ce136 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/simulation/BftOrderingSimulationTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/simulation/BftOrderingSimulationTest.scala @@ -5,7 +5,7 @@ package com.digitalasset.canton.synchronizer.sequencer.block.bftordering.simulat import com.daml.metrics.api.MetricsContext import com.digitalasset.canton.BaseTest -import com.digitalasset.canton.config.RequireTypes.Port +import com.digitalasset.canton.config.RequireTypes.{Port, PositiveInt} import com.digitalasset.canton.data.CantonTimestamp import com.digitalasset.canton.logging.TracedLogger import com.digitalasset.canton.networking.Endpoint @@ -411,7 +411,7 @@ trait BftOrderingSimulationTest extends AnyFlatSpec with BaseTest { requestInspector, ) }, - IssClient.initializer(simSettings.clientRequestInterval, thisPeer, peerLogger, timeouts), + IssClient.initializer(simSettings, thisPeer, peerLogger, timeouts), initializeImmediately, ) } @@ -607,6 +607,7 @@ class BftOrderingEmptyBlocksSimulationTest extends BftOrderingSimulationTest { durationOfSecondPhaseWithoutFaults, // This will result in empty blocks only. clientRequestInterval = None, + clientRequestApproximateByteSize = None, // This value is lower than the default to prevent view changes from ensuring liveness (as we want empty blocks to ensure it). // When the simulation becomes "healthy", we don't know when the last crash (resetting the view change timeout) // or view change happened. Similarly, we don't know how "advanced" the empty block creation at that moment is. @@ -618,6 +619,39 @@ class BftOrderingEmptyBlocksSimulationTest extends BftOrderingSimulationTest { ) } +// Note that simulation tests don't use a real network, so this test doesn't cover gRPC messages. +class BftOrderingSimulationTest2NodesLargeRequests extends BftOrderingSimulationTest { + override val numberOfRuns: Int = 1 + override val numberOfInitialPeers: Int = 2 + + private val durationOfFirstPhaseWithFaults = 1.minute + private val durationOfSecondPhaseWithoutFaults = 1.minute + + private val randomSourceToCreateSettings: Random = + new Random(4) // Manually remove the seed for fully randomized local runs. 
+
+  override def generateStages(): Seq[SimulationTestStage] = Seq(
+    SimulationTestStage(
+      simulationSettings = SimulationSettings(
+        LocalSettings(
+          randomSeed = randomSourceToCreateSettings.nextLong()
+        ),
+        NetworkSettings(
+          randomSeed = randomSourceToCreateSettings.nextLong()
+        ),
+        durationOfFirstPhaseWithFaults,
+        durationOfSecondPhaseWithoutFaults,
+        // The test is a bit slow with the default interval, so a longer one is used
+        clientRequestInterval = Some(10.seconds),
+        clientRequestApproximateByteSize =
+          // -100 to account for tags and payloads' prefixes
+          // Exceeding the default size results in warning logs and messages being dropped in the Mempool
+          Some(PositiveInt.tryCreate(BftBlockOrderer.DefaultMaxRequestPayloadBytes - 100)),
+      )
+    )
+  )
+}
+
 /*
 // TODO(#17284) Activate when we can handle the crash restart fault
 class BftOrderingSimulationTest2NodesCrashFaults extends BftOrderingSimulationTest {
diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/simulation/bftordering/IssClient.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/simulation/bftordering/IssClient.scala
index 387a54323a16..521c9c729fec 100644
--- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/simulation/bftordering/IssClient.scala
+++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/simulation/bftordering/IssClient.scala
@@ -7,7 +7,10 @@ import com.digitalasset.canton.config.ProcessingTimeout
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
 import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.data.OrderingRequest
 import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.modules.Mempool
-import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.simulation.SimulationClient
+import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.simulation.{
+  SimulationClient,
+  SimulationSettings,
+}
 import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.{
   Env,
   Module,
@@ -17,26 +20,36 @@ import com.digitalasset.canton.topology.SequencerId
 import com.digitalasset.canton.tracing.{TraceContext, Traced}
 import com.google.protobuf.ByteString
 
-import scala.concurrent.duration.FiniteDuration
+import scala.util.Random
 
 class IssClient[E <: Env[E]](
-    requestInterval: Option[FiniteDuration],
+    simSettings: SimulationSettings,
     mempool: ModuleRef[Mempool.OrderRequest],
     name: String,
     override val loggerFactory: NamedLoggerFactory,
     override val timeouts: ProcessingTimeout,
 ) extends Module[E, Unit]
     with NamedLogging {
+
+  private val random = new Random(simSettings.localSettings.randomSeed)
+  private var submissionNumber = 0
+
   override def receiveInternal(msg: Unit)(implicit
       context: E#ActorContextT[Unit],
       traceContext: TraceContext,
   ): Unit = {
+    val additionalPayload =
+      simSettings.clientRequestApproximateByteSize
+        .map(bytes =>
+          ByteString.copyFromUtf8("-").concat(ByteString.copyFrom(random.nextBytes(bytes.value)))
+        )
+        .getOrElse(ByteString.empty)
     val request = Mempool.OrderRequest(
       Traced(
         OrderingRequest(
           "tag",
-          ByteString.copyFromUtf8(f"$name-submission-$submissionNumber"),
+          ByteString.copyFromUtf8(s"$name-submission-$submissionNumber").concat(additionalPayload),
        )
      )
    )
@@ -44,13 +57,13 @@ class IssClient[E <: Env[E]](
mempool.asyncSend(request) - requestInterval.foreach(interval => context.delayedEvent(interval, ())) + simSettings.clientRequestInterval.foreach(interval => context.delayedEvent(interval, ())) } } object IssClient { def initializer[E <: Env[E]]( - requestInterval: Option[FiniteDuration], + simSettings: SimulationSettings, name: SequencerId, loggerFactory: NamedLoggerFactory, timeouts: ProcessingTimeout, @@ -58,11 +71,11 @@ object IssClient { new SimulationClient.Initializer[E, Unit, Mempool.Message] { override def createClient(systemRef: ModuleRef[Mempool.Message]): Module[E, Unit] = - new IssClient[E](requestInterval, systemRef, name.filterString, loggerFactory, timeouts) + new IssClient[E](simSettings, systemRef, name.filterString, loggerFactory, timeouts) override def init(context: E#ActorContextT[Unit]): Unit = // If the interval is None, the progress of the simulation time will solely depend on other delayed events // across the BFT Ordering Service (e.g., clock tick events from the Availability module). - requestInterval.foreach(interval => context.delayedEvent(interval, ())) + simSettings.clientRequestInterval.foreach(interval => context.delayedEvent(interval, ())) } } diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/unit/modules/MempoolModuleTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/unit/modules/MempoolModuleTest.scala index 186e8b485115..86ecab28a15c 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/unit/modules/MempoolModuleTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/unit/modules/MempoolModuleTest.scala @@ -4,8 +4,8 @@ package com.digitalasset.canton.synchronizer.sequencer.block.bftordering.unit.modules import com.daml.metrics.api.MetricsContext -import com.digitalasset.canton.BaseTest import com.digitalasset.canton.synchronizer.metrics.SequencerMetrics +import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.core.BftSequencerBaseTest import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.core.driver.BftBlockOrderer import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.core.modules.mempool.{ MempoolModule, @@ -19,6 +19,7 @@ import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framewor SequencerNode, } import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.framework.{Env, ModuleRef} +import com.digitalasset.canton.synchronizer.sequencer.block.bftordering.unit.modules.UnitTestContext.DelayCount import com.digitalasset.canton.tracing.Traced import com.google.protobuf.ByteString import org.scalatest.wordspec.AnyWordSpec @@ -26,9 +27,7 @@ import org.scalatest.wordspec.AnyWordSpec import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration.FiniteDuration -import UnitTestContext.DelayCount - -class MempoolModuleTest extends AnyWordSpec with BaseTest { +class MempoolModuleTest extends AnyWordSpec with BftSequencerBaseTest { private val AnOrderRequest = Mempool.OrderRequest( Traced(OrderingRequest("tag", ByteString.copyFromUtf8("b"))) @@ -66,15 +65,13 @@ class MempoolModuleTest extends AnyWordSpec with BaseTest { "refuse the request" in { val mempool = createMempool[UnitTestEnv](fakeModuleExpectingSilence, maxRequestPayloadBytes = 0) - 
mempool.receiveInternal( - Mempool.CreateLocalBatches(1) - ) - mempool.receiveInternal( - Mempool.OrderRequest( - Traced( - OrderingRequest("tag", ByteString.copyFromUtf8("c")) - ), - requestRefusedHandler, + mempool.receiveInternal(Mempool.CreateLocalBatches(1)) + suppressProblemLogs( + mempool.receiveInternal( + Mempool.OrderRequest( + Traced(OrderingRequest("tag", ByteString.copyFromUtf8("c"))), + requestRefusedHandler, + ) ) ) succeed diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/unit/modules/availability/AvailabilityModuleTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/unit/modules/availability/AvailabilityModuleTest.scala index 191a0290f33b..0ba4b656449c 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/unit/modules/availability/AvailabilityModuleTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencer/block/bftordering/unit/modules/availability/AvailabilityModuleTest.scala @@ -65,7 +65,7 @@ import com.digitalasset.canton.topology.SequencerId import com.digitalasset.canton.tracing.Traced import com.google.protobuf.ByteString import org.scalatest.wordspec.AnyWordSpec -import org.slf4j.event.Level.WARN +import org.slf4j.event.Level import java.util.concurrent.atomic.AtomicReference import scala.collection.mutable @@ -580,7 +580,7 @@ class AvailabilityModuleTest extends AnyWordSpec with BftSequencerBaseTest { RemoteDissemination.RemoteBatch.create(WrongBatchId, ABatch, from = Node1Peer) ), log => { - log.level shouldBe WARN + log.level shouldBe Level.WARN log.message should include("BatchId doesn't match digest") }, ) @@ -1233,7 +1233,7 @@ class AvailabilityModuleTest extends AnyWordSpec with BftSequencerBaseTest { RemoteOutputFetch.RemoteBatchDataFetched.create(Node1Peer, otherBatchId, ABatch) ), log => { - log.level shouldBe WARN + log.level shouldBe Level.WARN log.message should include("BatchId doesn't match digest") }, ) @@ -2387,7 +2387,7 @@ class AvailabilityModuleTest extends AnyWordSpec with BftSequencerBaseTest { assertLogs( context.runPipedMessages() shouldBe Seq(Availability.NoOp), (logEntry: LogEntry) => { - logEntry.level shouldBe WARN + logEntry.level shouldBe Level.INFO logEntry.message should include("Skipping message since we can't verify signature") }, ) diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencing/service/GrpcSequencerIntegrationTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencing/service/GrpcSequencerIntegrationTest.scala index 8ad1f9787af0..75d46851bf19 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencing/service/GrpcSequencerIntegrationTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencing/service/GrpcSequencerIntegrationTest.scala @@ -382,8 +382,6 @@ class GrpcSequencerIntegrationTest "send from the client gets a message to the sequencer" in { env => val anotherParticipant = ParticipantId("another") - when(env.sequencer.sendAsync(any[SubmissionRequest])(anyTraceContext)) - .thenReturn(EitherT.pure[FutureUnlessShutdown, SendAsyncError](())) when(env.sequencer.sendAsyncSigned(any[SignedContent[SubmissionRequest]])(anyTraceContext)) 
.thenReturn(EitherT.pure[FutureUnlessShutdown, SendAsyncError](())) implicit val metricsContext: MetricsContext = MetricsContext.Empty diff --git a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencing/service/GrpcSequencerServiceTest.scala b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencing/service/GrpcSequencerServiceTest.scala index a5f76d6d7ea0..3b8d753c4358 100644 --- a/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencing/service/GrpcSequencerServiceTest.scala +++ b/sdk/canton/community/synchronizer/src/test/scala/com/digitalasset/canton/synchronizer/sequencing/service/GrpcSequencerServiceTest.scala @@ -79,8 +79,6 @@ class GrpcSequencerServiceTest class Environment(member: Member) extends Matchers { val sequencer: Sequencer = mock[Sequencer] - when(sequencer.sendAsync(any[SubmissionRequest])(anyTraceContext)) - .thenReturn(EitherT.rightT[FutureUnlessShutdown, SendAsyncError](())) when(sequencer.sendAsyncSigned(any[SignedContent[SubmissionRequest]])(anyTraceContext)) .thenReturn(EitherT.rightT[FutureUnlessShutdown, SendAsyncError](())) when(sequencer.acknowledgeSigned(any[SignedContent[AcknowledgeRequest]])(anyTraceContext)) diff --git a/sdk/canton/daml-common-staging/util-external/src/main/scala/com/digitalasset/canton/config/InProcessGrpcName.scala b/sdk/canton/daml-common-staging/util-external/src/main/scala/com/digitalasset/canton/config/InProcessGrpcName.scala new file mode 100644 index 000000000000..8ff30e81ed93 --- /dev/null +++ b/sdk/canton/daml-common-staging/util-external/src/main/scala/com/digitalasset/canton/config/InProcessGrpcName.scala @@ -0,0 +1,10 @@ +// Copyright (c) 2025 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.digitalasset.canton.config + +import com.digitalasset.canton.config.RequireTypes.Port + +object InProcessGrpcName { + def forPort(port: Port): String = s"inprocess-grpc-${port.unwrap}" +} diff --git a/sdk/canton/ref b/sdk/canton/ref index 33dcf0414aca..c4c840994baa 100644 --- a/sdk/canton/ref +++ b/sdk/canton/ref @@ -1 +1 @@ -20250211.15190.vb6aa7695 +20250212.15197.v49c4ce7c
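
Note on the request-padding change in IssClient above: each simulated submission now appends a "-" separator plus clientRequestApproximateByteSize random bytes, seeded from the simulation's randomSeed so runs stay reproducible. The following is a minimal, self-contained sketch of that padding logic (the object name and example payload are illustrative, not part of the patch); it needs only the Scala 2.13 standard library and protobuf's ByteString.

import com.google.protobuf.ByteString
import scala.util.Random

object PayloadPaddingSketch {
  def main(args: Array[String]): Unit = {
    val random = new Random(42) // fixed seed keeps simulation runs reproducible
    val approximateByteSize = 3  // mirrors the PositiveInt.three default above
    // Mirrors IssClient: a "-" separator plus `approximateByteSize` random bytes.
    val padding = ByteString
      .copyFromUtf8("-")
      .concat(ByteString.copyFrom(random.nextBytes(approximateByteSize)))
    val payload = ByteString.copyFromUtf8("peer1-submission-0").concat(padding)
    println(payload.size()) // 18 base bytes + 1 separator + 3 random = 22
  }
}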
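For context on the new InProcessGrpcName helper added above: gRPC's in-process transport matches servers and channels by a shared string rather than a network socket, so deriving that string deterministically from a configured port gives each node a collision-free name. The sketch below shows one way the helper could pair with the io.grpc in-process builders; it is illustrative only (this patch does not perform this wiring), and Port.tryCreate is assumed here to be the validating factory for Port.

import com.digitalasset.canton.config.InProcessGrpcName
import com.digitalasset.canton.config.RequireTypes.Port
import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}

object InProcessGrpcNameSketch {
  def main(args: Array[String]): Unit = {
    val port = Port.tryCreate(5012) // assumed factory; the value is arbitrary
    val name = InProcessGrpcName.forPort(port) // yields "inprocess-grpc-5012"

    // The in-process server registers under `name`; no socket is opened.
    val server = InProcessServerBuilder.forName(name).directExecutor().build().start()
    // A client channel finds the server by the same shared name.
    val channel = InProcessChannelBuilder.forName(name).directExecutor().build()

    channel.shutdownNow()
    server.shutdownNow()
    ()
  }
}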