Skip to content

Commit

Permalink
Add ability to start only indexer or only ledger API server [KVL-584] (
Browse files Browse the repository at this point in the history
…#7574)

* Add ability to start only indexer or lapi server

CHANGELOG_BEGIN
CHANGELOG_END

* Change command line arguments

* Use Resource.unit

* Fix MultiParticipantFixture

* Add a new conformance test

* Improve retrying connecting to the database

* Improve naming

* Introduce shardName

and use it to create unique metric names

* Fix a merge error

* Remove unused comment

* Fix test

* Run conformance-test-split-participant in batch mode

Co-authored-by: tudor-da <tudor.voicu@digitalasset.com>
  • Loading branch information
rautenrieth-da and tudor-da authored Oct 12, 2020
1 parent 8e905b3 commit 8f57b9b
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.daml.bazeltools.BazelRunfiles._
import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, OwnedResource, SuiteResource}
import com.daml.ledger.api.tls.TlsConfiguration
import com.daml.ledger.on.memory.{ExtraConfig, Owner}
import com.daml.ledger.participant.state.kvutils.app.ParticipantConfig
import com.daml.ledger.participant.state.kvutils.app.{ParticipantConfig, ParticipantRunMode}
import com.daml.ledger.participant.state.kvutils.{app => kvutils}
import com.daml.ledger.participant.state.v1
import com.daml.lf.engine.script._
Expand Down Expand Up @@ -41,7 +41,9 @@ trait MultiParticipantFixture

private val participantId1 = v1.ParticipantId.assertFromString("participant1")
private val participant1 = ParticipantConfig(
mode = ParticipantRunMode.Combined,
participantId = participantId1,
shardName = None,
address = Some("localhost"),
port = Port.Dynamic,
portFile = Some(participant1Portfile),
Expand All @@ -52,7 +54,9 @@ trait MultiParticipantFixture
)
private val participantId2 = v1.ParticipantId.assertFromString("participant2")
private val participant2 = ParticipantConfig(
mode = ParticipantRunMode.Combined,
participantId = participantId2,
shardName = None,
address = Some("localhost"),
port = Port.Dynamic,
portFile = Some(participant2Portfile),
Expand Down
18 changes: 18 additions & 0 deletions ledger/ledger-on-memory/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,24 @@ conformance_test(
],
)

conformance_test(
name = "conformance-test-split-participant",
ports = [
6865,
],
server = ":app",
server_args = [
"--contract-id-seeding=testing-weak",
"--participant participant-id=split-example,port=6865,server-jdbc-url=jdbc:h2:mem:split-example;db_close_delay=-1;db_close_on_exit=false,shard-name=server1,run-mode=ledger-api-server",
"--participant participant-id=split-example,port=6865,server-jdbc-url=jdbc:h2:mem:split-example;db_close_delay=-1;db_close_on_exit=false,shard-name=indexer,run-mode=indexer",
"--batching enable=true,max-batch-size-bytes=262144",
],
test_tool_args = [
"--verbose",
"--exclude=ConfigManagementServiceIT",
],
)

conformance_test(
name = "benchmark-performance-envelope",
ports = [6865],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ private[platform] object ReadOnlySqlLedger {
loggingContext: LoggingContext,
): Future[LedgerId] = {
val predicate: PartialFunction[Throwable, Boolean] = {
// If the index database schema is not yet fully created, querying for the
// ledger ID will throw SQL errors.
case _: java.sql.SQLNonTransientException => true
case _: LedgerIdNotFoundException => true
case _: MismatchException.LedgerId => false
case _ => false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,33 @@ object Config {

opt[Map[String, String]]("participant")
.unbounded()
.text("The configuration of a participant. Comma-separated pairs in the form key=value, with mandatory keys: [participant-id, port] and optional keys [address, port-file, server-jdbc-url, max-commands-in-flight, management-service-timeout]")
.text("The configuration of a participant. Comma-separated pairs in the form key=value, with mandatory keys: [participant-id, port] and optional keys [address, port-file, server-jdbc-url, max-commands-in-flight, management-service-timeout, run-mode, shard-name]")
.action((kv, config) => {
val participantId = ParticipantId.assertFromString(kv("participant-id"))
val port = Port(kv("port").toInt)
val address = kv.get("address")
val portFile = kv.get("port-file").map(new File(_).toPath)
val runMode: ParticipantRunMode = kv.get("run-mode") match {
case None => ParticipantRunMode.Combined
case Some("combined") => ParticipantRunMode.Combined
case Some("indexer") => ParticipantRunMode.Indexer
case Some("ledger-api-server") => ParticipantRunMode.LedgerApiServer
case Some(unknownMode) =>
throw new RuntimeException(
s"$unknownMode is not a valid run mode. Valid modes are: combined, indexer, ledger-api-server. Default mode is combined.")
}
val jdbcUrl =
kv.getOrElse("server-jdbc-url", ParticipantConfig.defaultIndexJdbcUrl(participantId))
val maxCommandsInFlight = kv.get("max-commands-in-flight").map(_.toInt)
val managementServiceTimeout = kv
.get("management-service-timeout")
.map(Duration.parse)
.getOrElse(ParticipantConfig.defaultManagementServiceTimeout)
val shardName = kv.get("shard-name")
val partConfig = ParticipantConfig(
runMode,
participantId,
shardName,
address,
port,
portFile,
Expand All @@ -118,6 +130,7 @@ object Config {
)
config.copy(participants = config.participants :+ partConfig)
})

opt[String]("ledger-id")
.optional()
.text("The ID of the ledger. This must be the same each time the ledger is started. Defaults to a random UUID.")
Expand Down Expand Up @@ -219,6 +232,24 @@ object Config {
}))
}

checkConfig(c => {
val participantsIdsWithNonUniqueShardNames = c.participants
.map(pc => pc.participantId -> pc.shardName)
.groupBy(_._1)
.map { case (k, v) => (k, v.map(_._2)) }
.filter { case (_, v) => v.length != v.distinct.length }
.keys
if (participantsIdsWithNonUniqueShardNames.nonEmpty)
failure(
participantsIdsWithNonUniqueShardNames.mkString(
"The following participant IDs are duplicate, but the individual shards don't have unique names: ",
",",
". Use the optional 'shard-name' key when specifying horizontally scaled participants."
))
else
success
})

help("help").text(s"$name as a service.")
}
extraOptions(parser)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,12 @@ trait ConfigProvider[ExtraConfig] {
def createMetrics(
participantConfig: ParticipantConfig,
config: Config[ExtraConfig],
): Metrics =
new Metrics(SharedMetricRegistries.getOrCreate(participantConfig.participantId))
): Metrics = {
val registryName = participantConfig.participantId + participantConfig.shardName
.map("-" + _)
.getOrElse("")
new Metrics(SharedMetricRegistries.getOrCreate(registryName))
}
}

trait ReadServiceOwner[+RS <: ReadService, ExtraConfig] extends ConfigProvider[ExtraConfig] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ sealed abstract class Mode

object Mode {

/** Run the participant */
case object Run extends Mode

/** Dump index metadata and exit */
final case class DumpIndexMetadata(jdbcUrls: Vector[String]) extends Mode

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import com.daml.ports.Port
import java.time.Duration

final case class ParticipantConfig(
mode: ParticipantRunMode,
participantId: ParticipantId,
// A name of the participant shard in a horizontally scaled participant.
shardName: Option[String],
address: Option[String],
port: Port,
portFile: Option[Path],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.ledger.participant.state.kvutils.app

sealed abstract class ParticipantRunMode

object ParticipantRunMode {

/** Run the full participant, including both the indexer and ledger API server */
case object Combined extends ParticipantRunMode

/** Run only the indexer */
case object Indexer extends ParticipantRunMode

/** Run only the ledger API server */
case object LedgerApiServer extends ParticipantRunMode

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,27 +120,37 @@ final class Runner[T <: ReadWriteService, Extra](
)
_ <- Resource.fromFuture(
Future.sequence(config.archiveFiles.map(uploadDar(_, writeService))))
_ <- new StandaloneIndexerServer(
readService = readService,
config = factory.indexerConfig(participantConfig, config),
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
_ <- new StandaloneApiServer(
ledgerId = config.ledgerId,
config = factory.apiServerConfig(participantConfig, config),
commandConfig = factory.commandConfig(participantConfig, config),
partyConfig = factory.partyConfig(config),
ledgerConfig = factory.ledgerConfig(config),
optWriteService = Some(writeService),
authService = factory.authService(config),
healthChecks = healthChecks,
metrics = metrics,
timeServiceBackend = factory.timeServiceBackend(config),
otherInterceptors = factory.interceptors(config),
engine = sharedEngine,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
_ <- participantConfig.mode match {
case ParticipantRunMode.Combined | ParticipantRunMode.Indexer =>
new StandaloneIndexerServer(
readService = readService,
config = factory.indexerConfig(participantConfig, config),
metrics = metrics,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
case ParticipantRunMode.LedgerApiServer =>
Resource.unit
}
_ <- participantConfig.mode match {
case ParticipantRunMode.Combined | ParticipantRunMode.LedgerApiServer =>
new StandaloneApiServer(
ledgerId = config.ledgerId,
config = factory.apiServerConfig(participantConfig, config),
commandConfig = factory.commandConfig(participantConfig, config),
partyConfig = factory.partyConfig(config),
ledgerConfig = factory.ledgerConfig(config),
optWriteService = Some(writeService),
authService = factory.authService(config),
healthChecks = healthChecks,
metrics = metrics,
timeServiceBackend = factory.timeServiceBackend(config),
otherInterceptors = factory.interceptors(config),
engine = sharedEngine,
lfValueTranslationCache = lfValueTranslationCache,
).acquire()
case ParticipantRunMode.Indexer =>
Resource.unit
}
} yield ()
})
} yield ()
Expand Down

0 comments on commit 8f57b9b

Please sign in to comment.