Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pbatko-da committed Dec 2, 2022
1 parent 9decad6 commit b9ffc4f
Show file tree
Hide file tree
Showing 24 changed files with 1,961 additions and 722 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import com.daml.platform.configuration.{
CommandConfiguration,
IndexServiceConfig,
InitialLedgerConfiguration,
TransactionsFlatStreamReaderConfig,
TransactionsTreeStreamReaderConfig,
}
import com.daml.platform.indexer.ha.HaConfig
import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode, PackageMetadataViewConfig}
Expand Down Expand Up @@ -341,6 +343,14 @@ class PureConfigReaderWriter(secure: Boolean = true) {
// PureConfig hints/codecs for the index-service configuration tree.
// `allowUnknownKeys = false` rejects unrecognized keys in user-supplied config
// files instead of silently ignoring them (typos fail fast).
implicit val indexServiceConfigHint =
  ProductHint[IndexServiceConfig](allowUnknownKeys = false)

// Derived codec for the tree-transactions stream reader settings; derivation
// picks up new case-class fields automatically.
implicit val transactionsTreeStreamReaderConfigConfigConvert
    : ConfigConvert[TransactionsTreeStreamReaderConfig] =
  deriveConvert[TransactionsTreeStreamReaderConfig]

// Derived codec for the flat-transactions stream reader settings.
implicit val transactionsFlatStreamReaderConfigConfigConvert
    : ConfigConvert[TransactionsFlatStreamReaderConfig] =
  deriveConvert[TransactionsFlatStreamReaderConfig]

// NOTE: must be declared after the two stream-reader codecs above —
// IndexServiceConfig embeds both, so its derivation needs them in implicit scope.
implicit val indexServiceConfigConvert: ConfigConvert[IndexServiceConfig] =
  deriveConvert[IndexServiceConfig]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,45 @@ class IndexDBMetrics(override val prefix: MetricName, override val registry: Met
val lookupContractByKeyDbMetrics: DatabaseMetrics = createDbMetrics(
"lookup_contract_by_key"
)

// TODO etq: Tune metric names (review before they become part of the stable
// metrics surface exposed to operators).
//
// Per-query timers for the paginated queries that back the flat- and
// tree-transaction streams. Naming scheme of the vals:
//   {flat|tree}Tx{Ids|Payload}{Create|Consuming|NonConsuming}[{Stakeholder|NonStakeholderInformee|Informee}]
// "Ids" metrics time the event-id pagination queries; "Payload" metrics time
// the batched payload-fetch queries for those ids.

// Flat-transaction stream: id pagination queries.
val flatTxIdsCreate: DatabaseMetrics = createDbMetrics(
  "get_create_event_ids_for_flat_transactions"
)
val flatTxIdsConsuming: DatabaseMetrics = createDbMetrics(
  "get_consuming_event_ids_for_flat_transactions"
)
// Flat-transaction stream: payload fetch queries.
val flatTxPayloadCreate: DatabaseMetrics = createDbMetrics(
  "get_create_event_payloads_for_flat_transactions"
)
val flatTxPayloadConsuming: DatabaseMetrics = createDbMetrics(
  "get_consuming_event_payloads_for_flat_transactions"
)
// Tree-transaction stream: id pagination queries, split per audience
// (stakeholder vs. non-stakeholder informee) and event kind.
val treeTxIdsCreateStakeholder: DatabaseMetrics = createDbMetrics(
  "get_create_event_ids_for_stakeholder_for_tree_transactions"
)
val treeTxIdsCreateNonStakeholderInformee: DatabaseMetrics = createDbMetrics(
  "get_create_event_ids_for_nonstakeholderinformee_for_tree_transactions"
)
val treeTxIdsConsumingStakeholder: DatabaseMetrics = createDbMetrics(
  "get_consuming_event_ids_for_stakeholder_for_tree_transactions"
)
val treeTxIdsConsumingNonStakeholderInformee: DatabaseMetrics = createDbMetrics(
  "get_consuming_event_ids_for_nonstakeholderinformee_for_tree_transactions"
)
val treeTxIdsNonConsumingInformee: DatabaseMetrics = createDbMetrics(
  "get_nonconsuming_event_ids_for_informee_for_tree_transactions"
)
// Tree-transaction stream: payload fetch queries.
val treeTxPayloadCreate: DatabaseMetrics = createDbMetrics(
  "get_create_event_payloads_for_tree_transactions"
)
val treeTxPayloadConsuming: DatabaseMetrics = createDbMetrics(
  "get_consuming_event_payloads_for_tree_transactions"
)
val treeTxPayloadNonConsuming: DatabaseMetrics = createDbMetrics(
  "get_nonconsuming_event_payloads_for_tree_transactions"
)

val getFlatTransactions: DatabaseMetrics = createDbMetrics("get_flat_transactions")
val lookupFlatTransactionById: DatabaseMetrics = createDbMetrics(
"lookup_flat_transaction_by_id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ final case class IndexServiceConfig(
acsIdPageBufferSize: Int = IndexServiceConfig.DefaultAcsIdPageBufferSize,
acsIdPageWorkingMemoryBytes: Int = IndexServiceConfig.DefaultAcsIdPageWorkingMemoryBytes,
acsIdFetchingParallelism: Int = IndexServiceConfig.DefaultAcsIdFetchingParallelism,
// TODO etq: Document that it must be a power of two because Akka buffer sizing requires it
// java.lang.IllegalArgumentException: buffer size must be a power of two
// at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.<init>(ActorGraphInterpreter.scala:128)
// TODO etq: Same for other config values used to size an akka streaming buffer
acsContractFetchingParallelism: Int = IndexServiceConfig.DefaultAcsContractFetchingParallelism,
acsGlobalParallelism: Int = IndexServiceConfig.DefaultAcsGlobalParallelism,
maxContractStateCacheSize: Long = IndexServiceConfig.DefaultMaxContractStateCacheSize,
Expand All @@ -26,6 +30,13 @@ final case class IndexServiceConfig(
preparePackageMetadataTimeOutWarning: FiniteDuration =
IndexServiceConfig.PreparePackageMetadataTimeOutWarning,
completionsMaxPayloadsPerPayloadsPage: Int = 1000,
transactionsFlatStreamReaderConfig: TransactionsFlatStreamReaderConfig =
TransactionsFlatStreamReaderConfig.default,
transactionsTreeStreamReaderConfig: TransactionsTreeStreamReaderConfig =
TransactionsTreeStreamReaderConfig.default,
// TODO etq: Take care what config key names will get exposed in the user-facing participant config
globalMaxIdQueries: Int = 20,
globalMaxPayloadQueries: Int = 10,
)

object IndexServiceConfig {
Expand All @@ -36,6 +47,7 @@ object IndexServiceConfig {
val DefaultAcsIdPageBufferSize: Int = 1
val DefaultAcsIdPageWorkingMemoryBytes: Int = 100 * 1024 * 1024
val DefaultAcsIdFetchingParallelism: Int = 2
// TODO etq: This must be a power of 2
val DefaultAcsContractFetchingParallelism: Int = 2
val DefaultAcsGlobalParallelism: Int = 10
val DefaultMaxContractStateCacheSize: Long = 100000L
Expand All @@ -46,3 +58,41 @@ object IndexServiceConfig {
val DefaultInMemoryFanOutThreadPoolSize: Int = 16
val PreparePackageMetadataTimeOutWarning: FiniteDuration = FiniteDuration(1, "second")
}

// TODO etq: Take care what config key names will get exposed in the user-facing participant config
/** Tuning knobs for the flat-transactions stream reader.
  *
  * @param maxIdsPerIdPage maximum number of event ids fetched per id page
  * @param maxPagesPerIdPagesBuffer maximum number of id pages buffered ahead of consumption
  * @param maxWorkingMemoryInBytesForIdPages memory budget for buffered id pages
  * @param maxPayloadsPerPayloadsPage maximum payloads fetched per payload page (eventsPageSize)
  * @param maxParallelIdCreateQueries parallelism for create-event id queries
  * @param maxParallelIdConsumingQueries parallelism for consuming-event id queries
  * @param maxParallelPayloadCreateQueries parallelism for create-event payload queries; must be a power of 2
  * @param maxParallelPayloadConsumingQueries parallelism for consuming-event payload queries; must be a power of 2
  * @param maxParallelPayloadQueries overall cap on parallel payload queries
  * @param payloadProcessingParallelism parallelism for decoding/processing fetched payloads
  */
final case class TransactionsFlatStreamReaderConfig(
    maxIdsPerIdPage: Int = 20000,
    maxPagesPerIdPagesBuffer: Int = 1,
    maxWorkingMemoryInBytesForIdPages: Int = 100 * 1024 * 1024,
    maxPayloadsPerPayloadsPage: Int = 1000, // eventsPageSize
    maxParallelIdCreateQueries: Int = 4,
    maxParallelIdConsumingQueries: Int = 4,
    maxParallelPayloadCreateQueries: Int = 2,
    maxParallelPayloadConsumingQueries: Int = 2,
    maxParallelPayloadQueries: Int = 2,
    payloadProcessingParallelism: Int = 8,
) {
  // These two values size Akka stream buffers, and Akka's
  // BatchingActorInputBoundary throws
  // `IllegalArgumentException: buffer size must be a power of two`
  // for other values — fail fast at construction with a clear message
  // instead of deep inside stream materialization.
  require(
    maxParallelPayloadCreateQueries > 0 && Integer.bitCount(maxParallelPayloadCreateQueries) == 1,
    s"maxParallelPayloadCreateQueries ($maxParallelPayloadCreateQueries) must be a positive power of 2",
  )
  require(
    maxParallelPayloadConsumingQueries > 0 && Integer.bitCount(
      maxParallelPayloadConsumingQueries
    ) == 1,
    s"maxParallelPayloadConsumingQueries ($maxParallelPayloadConsumingQueries) must be a positive power of 2",
  )
}
object TransactionsFlatStreamReaderConfig {
  /** Configuration with all knobs at their default values. */
  def default: TransactionsFlatStreamReaderConfig = TransactionsFlatStreamReaderConfig()
}

// TODO etq: Take care what config key names will get exposed in the user-facing participant config
/** Tuning knobs for the tree-transactions stream reader.
  *
  * Compared to the flat stream, the tree stream additionally queries
  * non-consuming events and splits id queries per audience, hence the extra
  * `NonConsuming` parallelism knobs.
  *
  * @param maxIdsPerIdPage maximum number of event ids fetched per id page
  * @param maxPagesPerIdPagesBuffer maximum number of id pages buffered ahead of consumption
  * @param maxWorkingMemoryInBytesForIdPages memory budget for buffered id pages
  * @param maxPayloadsPerPayloadsPage maximum payloads fetched per payload page
  * @param maxParallelIdCreateQueries parallelism for create-event id queries
  * @param maxParallelIdConsumingQueries parallelism for consuming-event id queries
  * @param maxParallelIdNonConsumingQueries parallelism for non-consuming-event id queries
  * @param maxParallelPayloadCreateQueries parallelism for create-event payload queries
  * @param maxParallelPayloadConsumingQueries parallelism for consuming-event payload queries
  * @param maxParallelPayloadNonConsumingQueries parallelism for non-consuming-event payload queries
  * @param maxParallelPayloadQueries overall cap on parallel payload queries
  * @param payloadProcessingParallelism parallelism for decoding/processing fetched payloads
  */
final case class TransactionsTreeStreamReaderConfig(
    maxIdsPerIdPage: Int = 20000,
    maxPagesPerIdPagesBuffer: Int = 1,
    maxWorkingMemoryInBytesForIdPages: Int = 100 * 1024 * 1024,
    maxPayloadsPerPayloadsPage: Int = 1000,
    maxParallelIdCreateQueries: Int = 8,
    maxParallelIdConsumingQueries: Int = 8,
    maxParallelIdNonConsumingQueries: Int = 4,
    maxParallelPayloadCreateQueries: Int = 2,
    maxParallelPayloadConsumingQueries: Int = 2,
    maxParallelPayloadNonConsumingQueries: Int = 2,
    maxParallelPayloadQueries: Int = 2,
    payloadProcessingParallelism: Int = 8,
)
object TransactionsTreeStreamReaderConfig {
  /** Configuration with all knobs at their default values. */
  def default: TransactionsTreeStreamReaderConfig = TransactionsTreeStreamReaderConfig()
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ final class IndexServiceOwner(
ledgerEndCache = ledgerEndCache,
stringInterning = stringInterning,
completionsMaxPayloadsPerPayloadsPage = config.completionsMaxPayloadsPerPayloadsPage,
transactionsFlatStreamReaderConfig = config.transactionsFlatStreamReaderConfig,
transactionsTreeStreamReaderConfig = config.transactionsTreeStreamReaderConfig,
globalMaxIdQueries = config.globalMaxIdQueries,
globalMaxPayloadQueries = config.globalMaxPayloadQueries,
)

private def buildInMemoryFanOutExecutionContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.ApiOffset
import com.daml.platform.configuration.ServerRole
import com.daml.platform.configuration.{
ServerRole,
TransactionsFlatStreamReaderConfig,
TransactionsTreeStreamReaderConfig,
}
import com.daml.platform.store.DbSupport.{ConnectionPoolConfig, DbConfig}
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerReadDao}
Expand Down Expand Up @@ -88,6 +92,10 @@ object IndexMetadata {
participantId = Ref.ParticipantId.assertFromString("1"),
ledgerEndCache = MutableLedgerEndCache(), // not used
stringInterning = new StringInterningView(), // not used
transactionsFlatStreamReaderConfig = TransactionsFlatStreamReaderConfig.default,
transactionsTreeStreamReaderConfig = TransactionsTreeStreamReaderConfig.default,
globalMaxIdQueries = 20,
globalMaxPayloadQueries = 10,
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.daml.platform.{
}
import com.daml.platform.store.EventSequentialId
import com.daml.platform.store.dao.events.Raw
import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams}
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
Expand All @@ -34,6 +34,8 @@ import com.daml.scalautil.NeverEqualsOverride
import java.sql.Connection
import javax.sql.DataSource

import com.daml.platform.store.backend.common.TransactionStreamingQueries

import scala.annotation.unused

/** Encapsulates the interface which hides database technology specific implementations.
Expand Down Expand Up @@ -262,6 +264,8 @@ object ContractStorageBackend {

trait EventStorageBackend {

def transactionStreamingQueries: TransactionStreamingQueries

/** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations
*/
def pruneEvents(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(
Expand All @@ -273,17 +277,7 @@ trait EventStorageBackend {
pruneAllDivulgedContracts: Boolean,
connection: Connection,
): Boolean
def transactionEvents(
rangeParams: RangeParams,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.FlatEvent]]
def activeContractEventIds(
partyFilter: Party,
templateIdFilter: Option[Identifier],
startExclusive: Long,
endInclusive: Long,
limit: Int,
)(connection: Connection): Vector[Long]

def activeContractEventBatch(
eventSequentialIds: Iterable[Long],
allFilterParties: Set[Party],
Expand All @@ -293,10 +287,6 @@ trait EventStorageBackend {
transactionId: TransactionId,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.FlatEvent]]
def transactionTreeEvents(
rangeParams: RangeParams,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.TreeEvent]]
def transactionTree(
transactionId: TransactionId,
filterParams: FilterParams,
Expand Down
Loading

0 comments on commit b9ffc4f

Please sign in to comment.