Skip to content

Commit

Permalink
update canton to 20250212.15197.v49c4ce7c (#20770)
Browse files Browse the repository at this point in the history
* update canton to 20250212.15197.v49c4ce7c

tell-slack: canton

* Add io_grpc_grpc_inprocess as a dependency

---------

Co-authored-by: Azure Pipelines Daml Build <support@digitalasset.com>
Co-authored-by: Dylan Thinnes <dylan.thinnes@digitalasset.com>
  • Loading branch information
3 people authored Feb 13, 2025
1 parent 7868cab commit c031573
Show file tree
Hide file tree
Showing 47 changed files with 616 additions and 591 deletions.
2 changes: 2 additions & 0 deletions sdk/canton/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ scala_library(
"@maven//:com_zaxxer_HikariCP",
"@maven//:io_circe_circe_core_2_13",
"@maven//:io_grpc_grpc_api",
"@maven//:io_grpc_grpc_inprocess",
"@maven//:io_grpc_grpc_netty",
"@maven//:io_grpc_grpc_protobuf",
"@maven//:io_grpc_grpc_services",
Expand Down Expand Up @@ -1156,6 +1157,7 @@ scala_library(
"@maven//:dev_optics_monocle_macro_2_13",
"@maven//:io_circe_circe_core_2_13",
"@maven//:io_grpc_grpc_api",
"@maven//:io_grpc_grpc_inprocess",
"@maven//:io_grpc_grpc_netty",
"@maven//:io_grpc_grpc_services",
"@maven//:io_grpc_grpc_stub",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ enum SequencerConnectionValidation {
SEQUENCER_CONNECTION_VALIDATION_ACTIVE = 2;
// Validate all the connections
SEQUENCER_CONNECTION_VALIDATION_ALL = 3;
// Validate only the ones we could reach, but at least threshold many
SEQUENCER_CONNECTION_VALIDATION_THRESHOLD_ACTIVE = 4;
}

message SequencerConnections {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.digitalasset.canton.logging.{ErrorLoggingContext, NamedLoggerFactory}
import com.digitalasset.canton.metrics.DeclarativeApiMetrics
import com.digitalasset.canton.participant.config.LocalParticipantConfig
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.EitherTUtil

import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -83,7 +84,7 @@ object DeclarativeApiManager {
)
}
}
.getOrElse(EitherT.rightT(()))
.getOrElse(EitherTUtil.unit)
}

override def verifyConfig(name: String, config: C)(implicit
Expand All @@ -92,7 +93,6 @@ object DeclarativeApiManager {
config.init.state
.map(c => DeclarativeParticipantApi.readConfig(c.file).map(_ => ()))
.getOrElse(Right(()))

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.digitalasset.canton.admin.api.client.commands.{
}
import com.digitalasset.canton.admin.api.client.data.LedgerApiUser
import com.digitalasset.canton.auth.CantonAdminToken
import com.digitalasset.canton.config.CantonRequireTypes.String255
import com.digitalasset.canton.config.ClientConfig
import com.digitalasset.canton.config.RequireTypes.PositiveInt
import com.digitalasset.canton.console.GrpcAdminCommandRunner
Expand Down Expand Up @@ -195,45 +196,51 @@ final case class DeclarativeSequencerConnectionConfig(
* observed
*/
final case class DeclarativeConnectionConfig(
synchronizerAlias: String,
synchronizerAlias: SynchronizerAlias,
connections: NonEmpty[Map[String, DeclarativeSequencerConnectionConfig]],
manualConnect: Boolean = false,
priority: Int = 0,
initializeFromTrustedSynchronizer: Boolean = false,
trustThreshold: PositiveInt = PositiveInt.one,
) {

def isEquivalent(other: DeclarativeConnectionConfig): Boolean =
manualConnect == other.manualConnect &&
priority == other.priority &&
initializeFromTrustedSynchronizer == other.initializeFromTrustedSynchronizer &&
trustThreshold == other.trustThreshold &&
connections.keySet == other.connections.keySet &&
def isEquivalent(other: DeclarativeConnectionConfig): Boolean = {
val areConnectionsEquivalent = connections.keySet == other.connections.keySet &&
connections.forall { case (name, conn) =>
other.connections.get(name).exists(_.isEquivalent(conn))
}

def toSynchronizerConnectionConfig: SynchronizerConnectionConfig =
SynchronizerConnectionConfig(
synchronizerAlias = SynchronizerAlias.tryCreate(synchronizerAlias),
sequencerConnections = SequencerConnections
.many(
connections = connections.map { case (alias, conn) =>
GrpcSequencerConnection(
endpoints = conn.endpoints,
transportSecurity = conn.transportSecurity,
sequencerAlias = SequencerAlias.tryCreate(alias),
customTrustCertificates = conn.customTrustCertificatesAsByteString.toOption.flatten,
)
}.toSeq,
sequencerTrustThreshold = trustThreshold,
submissionRequestAmplification = SubmissionRequestAmplification.NoAmplification,
)
.getOrElse(throw new IllegalArgumentException("Cannot create sequencer connections")),
manualConnect = manualConnect,
priority = priority,
initializeFromTrustedSynchronizer = initializeFromTrustedSynchronizer,
)
if (areConnectionsEquivalent)
this.copy(connections = other.connections) == other
else false
}

def toSynchronizerConnectionConfig: Either[String, SynchronizerConnectionConfig] = {
val sequencerConnectionsE = SequencerConnections
.many(
connections = connections.map { case (alias, conn) =>
GrpcSequencerConnection(
endpoints = conn.endpoints,
transportSecurity = conn.transportSecurity,
sequencerAlias = SequencerAlias.tryCreate(alias),
customTrustCertificates = conn.customTrustCertificatesAsByteString.toOption.flatten,
)
}.toSeq,
sequencerTrustThreshold = trustThreshold,
submissionRequestAmplification = SubmissionRequestAmplification.NoAmplification,
)

sequencerConnectionsE.map { sequencerConnections =>
SynchronizerConnectionConfig(
synchronizerAlias = synchronizerAlias,
sequencerConnections = sequencerConnections,
manualConnect = manualConnect,
priority = priority,
initializeFromTrustedSynchronizer = initializeFromTrustedSynchronizer,
)
}

}

}

Expand Down Expand Up @@ -643,11 +650,13 @@ class DeclarativeParticipantApi(
checkSelfConsistent: Boolean,
)(implicit traceContext: TraceContext): Either[String, UpdateResult] = {

def toDeclarative(config: SynchronizerConnectionConfig): (String, DeclarativeConnectionConfig) =
def toDeclarative(
config: SynchronizerConnectionConfig
): (SynchronizerAlias, DeclarativeConnectionConfig) =
(
config.synchronizerAlias.unwrap,
config.synchronizerAlias,
DeclarativeConnectionConfig(
synchronizerAlias = config.synchronizerAlias.unwrap,
synchronizerAlias = config.synchronizerAlias,
connections = config.sequencerConnections.aliasToConnection.map {
case (alias, connection: GrpcSequencerConnection) =>
(
Expand All @@ -666,20 +675,21 @@ class DeclarativeParticipantApi(
),
)

def fetchConnections(): Either[String, (Seq[(String, DeclarativeConnectionConfig)])] =
def fetchConnections(): Either[String, Seq[(SynchronizerAlias, DeclarativeConnectionConfig)]] =
queryAdminApi(ParticipantAdminCommands.SynchronizerConnectivity.ListRegisteredSynchronizers)
.map(_.map(_._1).map(toDeclarative))
.map(_.map { case (synchronizerConnectionConfig, _) => synchronizerConnectionConfig }
.map(toDeclarative))

def removeSynchronizerConnection(alias: String): Either[String, Unit] = {
val synchronizerAlias = SynchronizerAlias.tryCreate(alias)
def removeSynchronizerConnection(synchronizerAlias: SynchronizerAlias): Either[String, Unit] =
// cannot really remove connections for now, just disconnect and disable
for {
currentO <- queryAdminApi(
ParticipantAdminCommands.SynchronizerConnectivity.ListRegisteredSynchronizers
).map(_.find(_._1.synchronizerAlias == synchronizerAlias))
).map(_.collectFirst {
case (config, _) if config.synchronizerAlias == synchronizerAlias => config
})
current <- currentO
.toRight(s"Unable to find configuration for synchronizer $alias")
.map(_._1)
.toRight(s"Unable to find configuration for synchronizer $synchronizerAlias")
_ <- queryAdminApi(
ParticipantAdminCommands.SynchronizerConnectivity.DisconnectSynchronizer(
synchronizerAlias
Expand All @@ -693,31 +703,39 @@ class DeclarativeParticipantApi(
)
} yield ()

}
def add(config: DeclarativeConnectionConfig): Either[String, Unit] =
for {
synchronizerConnectionConfig <- config.toSynchronizerConnectionConfig
_ <- queryAdminApi(
ParticipantAdminCommands.SynchronizerConnectivity.ConnectSynchronizer(
synchronizerConnectionConfig,
sequencerConnectionValidation = SequencerConnectionValidation.Active,
)
)
} yield ()

def update(config: DeclarativeConnectionConfig) =
for {
synchronizerConnectionConfig <- config.toSynchronizerConnectionConfig
_ <- queryAdminApi(
ParticipantAdminCommands.SynchronizerConnectivity.ModifySynchronizerConnection(
synchronizerConnectionConfig,
sequencerConnectionValidation = SequencerConnectionValidation.Active,
)
)
} yield ()

run[String, DeclarativeConnectionConfig](
run[SynchronizerAlias, DeclarativeConnectionConfig](
"connections",
removeExcess = removeConnections,
checkSelfConsistent = checkSelfConsistent,
want = connections.map(c => (c.synchronizerAlias, c)),
fetch = _ => fetchConnections(),
add = { case (_, config) =>
queryAdminApi(
ParticipantAdminCommands.SynchronizerConnectivity.ConnectSynchronizer(
config.toSynchronizerConnectionConfig,
sequencerConnectionValidation = SequencerConnectionValidation.Active,
)
)
},
add = { case (_, config) => add(config) },
upd = { case (_, config, existing) =>
if (config.isEquivalent(existing)) Either.unit
else
queryAdminApi(
ParticipantAdminCommands.SynchronizerConnectivity.ModifySynchronizerConnection(
config.toSynchronizerConnectionConfig,
sequencerConnectionValidation = SequencerConnectionValidation.Active,
)
)
update(config)
},
rm = removeSynchronizerConnection,
compare = Some { case (x, y) => x.isEquivalent(y) },
Expand Down Expand Up @@ -827,6 +845,9 @@ object DeclarativeParticipantApi {
import pureconfig.generic.semiauto.*
// import canton config to include the implicit that prevents unknown keys

implicit val synchronizerAliasReader: ConfigReader[SynchronizerAlias] =
ConfigReader[String255].map(SynchronizerAlias(_))

implicit val declarativeParticipantConfigReader: ConfigReader[DeclarativeParticipantConfig] = {
implicit val darConfigReader: ConfigReader[DeclarativeDarConfig] =
deriveReader[DeclarativeDarConfig]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ object SequencerConnectionValidation {
override val toProtoV30: v30.SequencerConnectionValidation =
v30.SequencerConnectionValidation.SEQUENCER_CONNECTION_VALIDATION_ACTIVE
}
object ThresholdActive extends SequencerConnectionValidation {
override val toProtoV30: v30.SequencerConnectionValidation =
v30.SequencerConnectionValidation.SEQUENCER_CONNECTION_VALIDATION_THRESHOLD_ACTIVE
}

def fromProtoV30(
proto: v30.SequencerConnectionValidation
Expand All @@ -241,6 +245,8 @@ object SequencerConnectionValidation {
Right(Disabled)
case v30.SequencerConnectionValidation.SEQUENCER_CONNECTION_VALIDATION_ALL => Right(All)
case v30.SequencerConnectionValidation.SEQUENCER_CONNECTION_VALIDATION_ACTIVE => Right(Active)
case v30.SequencerConnectionValidation.SEQUENCER_CONNECTION_VALIDATION_THRESHOLD_ACTIVE =>
Right(ThresholdActive)
case _ =>
Left(
ProtoDeserializationError.ValueConversionError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ class SequencerInfoLoader(
): EitherT[FutureUnlessShutdown, Seq[LoadSequencerEndpointInformationResult.NotValid], Unit] =
sequencerConnectionValidation match {
case SequencerConnectionValidation.Disabled => EitherT.rightT(())
case SequencerConnectionValidation.All | SequencerConnectionValidation.Active =>
case SequencerConnectionValidation.All | SequencerConnectionValidation.Active |
SequencerConnectionValidation.ThresholdActive =>
EitherT(
loadSequencerEndpoints(
alias,
Expand All @@ -233,6 +234,7 @@ class SequencerInfoLoader(
SequencerInfoLoader.validateNewSequencerConnectionResults(
expectedSynchronizerId,
sequencerConnectionValidation,
sequencerConnections.sequencerTrustThreshold,
logger,
)(_)
)
Expand Down Expand Up @@ -456,6 +458,7 @@ object SequencerInfoLoader {
def validateNewSequencerConnectionResults(
expectedSynchronizerId: Option[SynchronizerId],
sequencerConnectionValidation: SequencerConnectionValidation,
sequencerTrustThreshold: PositiveInt,
logger: TracedLogger,
)(
results: Seq[LoadSequencerEndpointInformationResult]
Expand All @@ -473,13 +476,15 @@ object SequencerInfoLoader {
case Nil =>
accumulated
case (notValid: LoadSequencerEndpointInformationResult.NotValid) :: rest =>
if (sequencerConnectionValidation != SequencerConnectionValidation.All) {
logger.info(
s"Skipping validation, as I am unable to obtain synchronizer id and sequencer-id: ${notValid.error} for ${notValid.sequencerConnection}"
)
go(reference, sequencerIds, rest, accumulated)
} else
go(reference, sequencerIds, rest, notValid +: accumulated)
sequencerConnectionValidation match {
case SequencerConnectionValidation.All | SequencerConnectionValidation.ThresholdActive =>
go(reference, sequencerIds, rest, notValid +: accumulated)
case SequencerConnectionValidation.Active | SequencerConnectionValidation.Disabled =>
logger.info(
s"Skipping validation, as I am unable to obtain synchronizer id and sequencer-id: ${notValid.error} for ${notValid.sequencerConnection}"
)
go(reference, sequencerIds, rest, accumulated)
}
case (valid: LoadSequencerEndpointInformationResult.Valid) :: rest =>
val result = for {
// check that synchronizer id matches the reference
Expand Down Expand Up @@ -560,7 +565,10 @@ object SequencerInfoLoader {

}
val collected = go(None, Map.empty, results.toList, Seq.empty)
Either.cond(collected.isEmpty, (), collected)
if (collected.isEmpty) Either.unit
else if (sequencerConnectionValidation == SequencerConnectionValidation.ThresholdActive)
Either.cond(results.size - collected.size >= sequencerTrustThreshold.unwrap, (), collected)
else Left(collected)
}

/** Aggregates the endpoint information into the actual connection
Expand Down Expand Up @@ -588,10 +596,9 @@ object SequencerInfoLoader {
validateNewSequencerConnectionResults(
expectedSynchronizerId,
sequencerConnectionValidation,
sequencerTrustThreshold,
logger,
)(
fullResult.toList
) match {
)(fullResult) match {
case Right(()) =>
val validSequencerConnections = fullResult
.collect { case valid: LoadSequencerEndpointInformationResult.Valid =>
Expand Down
Loading

0 comments on commit c031573

Please sign in to comment.