Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
pbatko-da committed Dec 2, 2022
1 parent 9decad6 commit 9c2fef5
Show file tree
Hide file tree
Showing 45 changed files with 2,559 additions and 1,155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import com.daml.platform.configuration.{
CommandConfiguration,
IndexServiceConfig,
InitialLedgerConfiguration,
TransactionsFlatStreamReaderConfig,
TransactionsTreeStreamReaderConfig,
}
import com.daml.platform.indexer.ha.HaConfig
import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode, PackageMetadataViewConfig}
Expand Down Expand Up @@ -341,6 +343,14 @@ class PureConfigReaderWriter(secure: Boolean = true) {
implicit val indexServiceConfigHint =
ProductHint[IndexServiceConfig](allowUnknownKeys = false)

  // PureConfig codec for the tree-transactions stream reader settings, derived
  // from the case class shape so HOCON keys follow the field names.
  // NOTE(review): double "ConfigConfig" in the name looks like a typo — confirm
  // before renaming, since implicit names may be referenced in tests.
  implicit val transactionsTreeStreamReaderConfigConfigConvert
      : ConfigConvert[TransactionsTreeStreamReaderConfig] =
    deriveConvert[TransactionsTreeStreamReaderConfig]

  // PureConfig codec for the flat-transactions stream reader settings, derived
  // from the case class shape so HOCON keys follow the field names.
  // Must be in scope before indexServiceConfigConvert is derived below, since
  // IndexServiceConfig embeds this type.
  implicit val transactionsFlatStreamReaderConfigConfigConvert
      : ConfigConvert[TransactionsFlatStreamReaderConfig] =
    deriveConvert[TransactionsFlatStreamReaderConfig]

implicit val indexServiceConfigConvert: ConfigConvert[IndexServiceConfig] =
deriveConvert[IndexServiceConfig]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,45 @@ class IndexDBMetrics(override val prefix: MetricName, override val registry: Met
  // Timing/metering for point lookups of a contract by its key.
  val lookupContractByKeyDbMetrics: DatabaseMetrics = createDbMetrics(
    "lookup_contract_by_key"
  )

  // TODO etq: Tune metric names
  //
  // Per-query DB metrics for the paged transaction-stream readers.
  // Each stream is served in two phases: first event *ids* are fetched
  // (one metric per event kind / filter dimension), then event *payloads*
  // are fetched for those ids. "flat" metrics cover the flat-transaction
  // stream; "tree" metrics cover the transaction-tree stream, which
  // additionally distinguishes stakeholder vs. non-stakeholder informees
  // and includes non-consuming exercises.

  // Flat stream: id-fetching queries.
  val flatTxIdsCreate: DatabaseMetrics = createDbMetrics(
    "get_create_event_ids_for_flat_transactions"
  )
  val flatTxIdsConsuming: DatabaseMetrics = createDbMetrics(
    "get_consuming_event_ids_for_flat_transactions"
  )
  // Flat stream: payload-fetching queries.
  val flatTxPayloadCreate: DatabaseMetrics = createDbMetrics(
    "get_create_event_payloads_for_flat_transactions"
  )
  val flatTxPayloadConsuming: DatabaseMetrics = createDbMetrics(
    "get_consuming_event_payloads_for_flat_transactions"
  )
  // Tree stream: id-fetching queries, split by event kind and informee role.
  val treeTxIdsCreateStakeholder: DatabaseMetrics = createDbMetrics(
    "get_create_event_ids_for_stakeholder_for_tree_transactions"
  )
  val treeTxIdsCreateNonStakeholderInformee: DatabaseMetrics = createDbMetrics(
    "get_create_event_ids_for_nonstakeholderinformee_for_tree_transactions"
  )
  val treeTxIdsConsumingStakeholder: DatabaseMetrics = createDbMetrics(
    "get_consuming_event_ids_for_stakeholder_for_tree_transactions"
  )
  val treeTxIdsConsumingNonStakeholderInformee: DatabaseMetrics = createDbMetrics(
    "get_consuming_event_ids_for_nonstakeholderinformee_for_tree_transactions"
  )
  val treeTxIdsNonConsumingInformee: DatabaseMetrics = createDbMetrics(
    "get_nonconsuming_event_ids_for_informee_for_tree_transactions"
  )
  // Tree stream: payload-fetching queries.
  val treeTxPayloadCreate: DatabaseMetrics = createDbMetrics(
    "get_create_event_payloads_for_tree_transactions"
  )
  val treeTxPayloadConsuming: DatabaseMetrics = createDbMetrics(
    "get_consuming_event_payloads_for_tree_transactions"
  )
  val treeTxPayloadNonConsuming: DatabaseMetrics = createDbMetrics(
    "get_nonconsuming_event_payloads_for_tree_transactions"
  )

  // Aggregate metric for serving the flat-transaction stream.
  val getFlatTransactions: DatabaseMetrics = createDbMetrics("get_flat_transactions")
val lookupFlatTransactionById: DatabaseMetrics = createDbMetrics(
"lookup_flat_transaction_by_id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,6 @@ CREATE INDEX participant_events_create_event_offset ON participant_events_create
-- sequential_id index for paging
CREATE INDEX participant_events_create_event_sequential_id ON participant_events_create (event_sequential_id);

-- lookup by transaction id
CREATE INDEX participant_events_create_transaction_id_idx ON participant_events_create (transaction_id);

-- lookup by contract id
CREATE INDEX participant_events_create_contract_id_idx ON participant_events_create (contract_id);

Expand Down Expand Up @@ -281,9 +278,6 @@ CREATE INDEX participant_events_consuming_exercise_event_offset ON participant_e
-- sequential_id index for paging
CREATE INDEX participant_events_consuming_exercise_event_sequential_id ON participant_events_consuming_exercise (event_sequential_id);

-- lookup by transaction id
CREATE INDEX participant_events_consuming_exercise_transaction_id_idx ON participant_events_consuming_exercise (transaction_id);

-- lookup by contract id
CREATE INDEX participant_events_consuming_exercise_contract_id_idx ON participant_events_consuming_exercise (contract_id);

Expand Down Expand Up @@ -339,9 +333,6 @@ CREATE INDEX participant_events_non_consuming_exercise_event_offset ON participa
-- sequential_id index for paging
CREATE INDEX participant_events_non_consuming_exercise_event_sequential_id ON participant_events_non_consuming_exercise (event_sequential_id);

-- lookup by transaction id
CREATE INDEX participant_events_non_consuming_exercise_transaction_id_idx ON participant_events_non_consuming_exercise (transaction_id);

CREATE TABLE string_interning (
internal_id integer PRIMARY KEY NOT NULL,
external_string text
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
68900c9d2b68e7b27c7717fdf4c23d314d358fce7f4b7d8906489f1533a6a5ed
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Drop the per-event-table transaction_id indexes.
-- NOTE(review): presumably transaction-id lookups are now served via the
-- participant_transaction_meta table (indexed on transaction_id elsewhere in
-- this changeset), making these indexes pure write/storage overhead — confirm.
DROP INDEX participant_events_create_transaction_id_idx;
DROP INDEX participant_events_consuming_exercise_transaction_id_idx;
DROP INDEX participant_events_non_consuming_exercise_transaction_id_idx;
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1be130f7bb6da74f3d2180e3314a96961d8ba781a25a37f436f03eac5cf0763c
722aea175c2b4ec1e8471e88eb277cfa5cb5e13f1d62b4d6dc0c89507e5c62e5
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@ CREATE TABLE participant_transaction_meta(
CREATE INDEX participant_transaction_meta_tid_idx ON participant_transaction_meta(transaction_id);
CREATE INDEX participant_transaction_meta_event_offset_idx ON participant_transaction_meta(event_offset);

DROP INDEX participant_events_create_transaction_id_idx;
DROP INDEX participant_events_consuming_exercise_transaction_id_idx;
DROP INDEX participant_events_non_consuming_exercise_transaction_id_idx;
--DROP INDEX participant_events_create_transaction_id_idx;
--DROP INDEX participant_events_consuming_exercise_transaction_id_idx;
--DROP INDEX participant_events_non_consuming_exercise_transaction_id_idx;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
68900c9d2b68e7b27c7717fdf4c23d314d358fce7f4b7d8906489f1533a6a5ed
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Drop the per-event-table transaction_id indexes (same migration as the
-- sibling database flavor's version of this changeset).
-- NOTE(review): presumably transaction-id lookups are now served via the
-- participant_transaction_meta table (indexed on transaction_id) — confirm.
DROP INDEX participant_events_create_transaction_id_idx;
DROP INDEX participant_events_consuming_exercise_transaction_id_idx;
DROP INDEX participant_events_non_consuming_exercise_transaction_id_idx;
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ final case class IndexServiceConfig(
acsIdPageBufferSize: Int = IndexServiceConfig.DefaultAcsIdPageBufferSize,
acsIdPageWorkingMemoryBytes: Int = IndexServiceConfig.DefaultAcsIdPageWorkingMemoryBytes,
acsIdFetchingParallelism: Int = IndexServiceConfig.DefaultAcsIdFetchingParallelism,
// TODO etq: Document that it must be a power of two because Akka buffer sizing requires it
// java.lang.IllegalArgumentException: buffer size must be a power of two
// at akka.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.<init>(ActorGraphInterpreter.scala:128)
// TODO etq: Same for other config values used to size an akka streaming buffer
acsContractFetchingParallelism: Int = IndexServiceConfig.DefaultAcsContractFetchingParallelism,
acsGlobalParallelism: Int = IndexServiceConfig.DefaultAcsGlobalParallelism,
maxContractStateCacheSize: Long = IndexServiceConfig.DefaultMaxContractStateCacheSize,
Expand All @@ -26,6 +30,13 @@ final case class IndexServiceConfig(
preparePackageMetadataTimeOutWarning: FiniteDuration =
IndexServiceConfig.PreparePackageMetadataTimeOutWarning,
completionsMaxPayloadsPerPayloadsPage: Int = 1000,
transactionsFlatStreamReaderConfig: TransactionsFlatStreamReaderConfig =
TransactionsFlatStreamReaderConfig.default,
transactionsTreeStreamReaderConfig: TransactionsTreeStreamReaderConfig =
TransactionsTreeStreamReaderConfig.default,
// TODO etq: Take care what config key names will get exposed in the user-facing participant config
globalMaxIdQueries: Int = 20,
globalMaxPayloadQueries: Int = 10,
)

object IndexServiceConfig {
Expand All @@ -36,6 +47,7 @@ object IndexServiceConfig {
val DefaultAcsIdPageBufferSize: Int = 1
val DefaultAcsIdPageWorkingMemoryBytes: Int = 100 * 1024 * 1024
val DefaultAcsIdFetchingParallelism: Int = 2
// TODO etq: This must be a power of 2
val DefaultAcsContractFetchingParallelism: Int = 2
val DefaultAcsGlobalParallelism: Int = 10
val DefaultMaxContractStateCacheSize: Long = 100000L
Expand All @@ -46,3 +58,41 @@ object IndexServiceConfig {
val DefaultInMemoryFanOutThreadPoolSize: Int = 16
val PreparePackageMetadataTimeOutWarning: FiniteDuration = FiniteDuration(1, "second")
}

// TODO etq: Take care what config key names will get exposed in the user-facing participant config
/** Tuning parameters for the paged reader that serves the flat-transactions stream.
  *
  * The stream is produced in two phases: event-id pages are fetched first, then
  * event payloads are fetched for those ids.
  *
  * @param maxIdsPerIdPage maximum number of event ids fetched by a single id-page query
  * @param maxPagesPerIdPagesBuffer maximum number of id pages buffered ahead of payload fetching
  * @param maxWorkingMemoryInBytesForIdPages upper bound on memory used to buffer id pages
  * @param maxPayloadsPerPayloadsPage maximum number of payloads fetched per payload query
  *                                   (the former `eventsPageSize`)
  * @param maxParallelIdCreateQueries parallelism of id queries for create events
  * @param maxParallelIdConsumingQueries parallelism of id queries for consuming-exercise events
  * @param maxParallelPayloadCreateQueries parallelism of payload queries for create events;
  *                                        must be a power of two (sizes an Akka stream buffer)
  * @param maxParallelPayloadConsumingQueries parallelism of payload queries for
  *                                           consuming-exercise events; must be a power of two
  * @param maxParallelPayloadQueries overall cap on concurrent payload queries
  * @param payloadProcessingParallelism parallelism of in-memory payload deserialization
  */
case class TransactionsFlatStreamReaderConfig(
    maxIdsPerIdPage: Int = 20000,
    maxPagesPerIdPagesBuffer: Int = 1,
    maxWorkingMemoryInBytesForIdPages: Int = 100 * 1024 * 1024,
    maxPayloadsPerPayloadsPage: Int = 1000, // eventsPageSize
    maxParallelIdCreateQueries: Int = 4,
    maxParallelIdConsumingQueries: Int = 4,
    maxParallelPayloadCreateQueries: Int = 2,
    maxParallelPayloadConsumingQueries: Int = 2,
    maxParallelPayloadQueries: Int = 2,
    payloadProcessingParallelism: Int = 8,
) {
  // Akka stream buffers require power-of-two sizes
  // ("java.lang.IllegalArgumentException: buffer size must be a power of two"
  // thrown in BatchingActorInputBoundary). Fail fast at configuration time
  // instead of deep inside stream materialization.
  require(
    Integer.bitCount(maxParallelPayloadCreateQueries) == 1,
    s"maxParallelPayloadCreateQueries must be a positive power of two, but was $maxParallelPayloadCreateQueries",
  )
  require(
    Integer.bitCount(maxParallelPayloadConsumingQueries) == 1,
    s"maxParallelPayloadConsumingQueries must be a positive power of two, but was $maxParallelPayloadConsumingQueries",
  )
}
object TransactionsFlatStreamReaderConfig {
  /** The default configuration; all defaults satisfy the power-of-two constraints. */
  def default: TransactionsFlatStreamReaderConfig = TransactionsFlatStreamReaderConfig()
}

// TODO etq: Take care what config key names will get exposed in the user-facing participant config
/** Tuning parameters for the paged reader that serves the transaction-trees stream.
  *
  * The stream is produced in two phases: event-id pages are fetched first, then
  * event payloads are fetched for those ids. Compared to the flat stream, the
  * tree stream additionally distinguishes stakeholder vs. non-stakeholder
  * informees and includes non-consuming exercise events.
  *
  * NOTE(review): the flat-stream config flags its payload-parallelism values as
  * "must be a power of two" (Akka buffer sizing); the same probably applies to
  * the payload-parallelism values here — confirm before tuning.
  *
  * @param maxIdsPerIdPage maximum number of event ids fetched by a single id-page query
  * @param maxPagesPerIdPagesBuffer maximum number of id pages buffered ahead of payload fetching
  * @param maxWorkingMemoryInBytesForIdPages upper bound on memory used to buffer id pages
  * @param maxPayloadsPerPayloadsPage maximum number of payloads fetched per payload query
  * @param maxParallelIdCreateQueries parallelism of id queries for create events
  * @param maxParallelIdConsumingQueries parallelism of id queries for consuming-exercise events
  * @param maxParallelIdNonConsumingQueries parallelism of id queries for non-consuming exercises
  * @param maxParallelPayloadCreateQueries parallelism of payload queries for create events
  * @param maxParallelPayloadConsumingQueries parallelism of payload queries for consuming exercises
  * @param maxParallelPayloadNonConsumingQueries parallelism of payload queries for
  *                                              non-consuming exercises
  * @param maxParallelPayloadQueries overall cap on concurrent payload queries
  * @param payloadProcessingParallelism parallelism of in-memory payload deserialization
  */
case class TransactionsTreeStreamReaderConfig(
    maxIdsPerIdPage: Int = 20000,
    maxPagesPerIdPagesBuffer: Int = 1,
    maxWorkingMemoryInBytesForIdPages: Int = 100 * 1024 * 1024,
    maxPayloadsPerPayloadsPage: Int = 1000,
    maxParallelIdCreateQueries: Int = 8,
    maxParallelIdConsumingQueries: Int = 8,
    maxParallelIdNonConsumingQueries: Int = 4,
    maxParallelPayloadCreateQueries: Int = 2,
    maxParallelPayloadConsumingQueries: Int = 2,
    maxParallelPayloadNonConsumingQueries: Int = 2,
    maxParallelPayloadQueries: Int = 2,
    payloadProcessingParallelism: Int = 8,
)
object TransactionsTreeStreamReaderConfig {
  /** The default configuration. */
  def default: TransactionsTreeStreamReaderConfig = TransactionsTreeStreamReaderConfig()
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ final class IndexServiceOwner(
ledgerEndCache = ledgerEndCache,
stringInterning = stringInterning,
completionsMaxPayloadsPerPayloadsPage = config.completionsMaxPayloadsPerPayloadsPage,
transactionsFlatStreamReaderConfig = config.transactionsFlatStreamReaderConfig,
transactionsTreeStreamReaderConfig = config.transactionsTreeStreamReaderConfig,
globalMaxIdQueries = config.globalMaxIdQueries,
globalMaxPayloadQueries = config.globalMaxPayloadQueries,
)

private def buildInMemoryFanOutExecutionContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ object ParallelIndexerSubscription {
private val logger = ContextualizedLogger.get(this.getClass)

/** Batch wraps around a T-typed batch, enriching it with processing relevant information.
* Contains events from one or more transactions.
* If it contains an event from a transaction then it contains all the events from that transaction.
*
* @param lastOffset The latest offset available in the batch. Needed for tail ingestion.
* @param lastSeqEventId The latest sequential-event-id in the batch, or if none present there, then the latest from before. Needed for tail ingestion.
Expand Down Expand Up @@ -177,6 +179,8 @@ object ParallelIndexerSubscription {
offsetsUpdates = Vector.empty,
)

/** Assigns sequential ids to events.
*/
def seqMapper(
internize: Iterable[DbDto] => Iterable[(Int, String)],
metrics: Metrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.ApiOffset
import com.daml.platform.configuration.ServerRole
import com.daml.platform.configuration.{
ServerRole,
TransactionsFlatStreamReaderConfig,
TransactionsTreeStreamReaderConfig,
}
import com.daml.platform.store.DbSupport.{ConnectionPoolConfig, DbConfig}
import com.daml.platform.store.cache.MutableLedgerEndCache
import com.daml.platform.store.dao.{JdbcLedgerDao, LedgerReadDao}
Expand Down Expand Up @@ -88,6 +92,10 @@ object IndexMetadata {
participantId = Ref.ParticipantId.assertFromString("1"),
ledgerEndCache = MutableLedgerEndCache(), // not used
stringInterning = new StringInterningView(), // not used
transactionsFlatStreamReaderConfig = TransactionsFlatStreamReaderConfig.default,
transactionsTreeStreamReaderConfig = TransactionsTreeStreamReaderConfig.default,
globalMaxIdQueries = 20,
globalMaxPayloadQueries = 10,
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,9 @@ import com.daml.lf.crypto.Hash
import com.daml.lf.data.Time.Timestamp
import com.daml.lf.ledger.EventId
import com.daml.logging.LoggingContext
import com.daml.platform.{
ApplicationId,
ContractId,
Identifier,
Key,
PackageId,
Party,
TransactionId,
}
import com.daml.platform.{ApplicationId, ContractId, Identifier, Key, PackageId, Party}
import com.daml.platform.store.EventSequentialId
import com.daml.platform.store.dao.events.Raw
import com.daml.platform.store.backend.EventStorageBackend.{FilterParams, RangeParams}
import com.daml.platform.store.backend.MeteringParameterStorageBackend.LedgerMeteringEnd
import com.daml.platform.store.backend.postgresql.PostgresDataSourceConfig
import com.daml.platform.store.entries.{ConfigurationEntry, PackageLedgerEntry, PartyLedgerEntry}
Expand All @@ -34,6 +25,11 @@ import com.daml.scalautil.NeverEqualsOverride
import java.sql.Connection
import javax.sql.DataSource

import com.daml.platform.store.backend.common.{
TransactionPointwiseQueries,
TransactionStreamingQueries,
}

import scala.annotation.unused

/** Encapsulates the interface which hides database technology specific implementations.
Expand Down Expand Up @@ -262,6 +258,9 @@ object ContractStorageBackend {

trait EventStorageBackend {

def transactionPointwiseQueries: TransactionPointwiseQueries
def transactionStreamingQueries: TransactionStreamingQueries

/** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations
*/
def pruneEvents(pruneUpToInclusive: Offset, pruneAllDivulgedContracts: Boolean)(
Expand All @@ -273,37 +272,17 @@ trait EventStorageBackend {
pruneAllDivulgedContracts: Boolean,
connection: Connection,
): Boolean
def transactionEvents(
rangeParams: RangeParams,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.FlatEvent]]
def activeContractEventIds(
partyFilter: Party,
templateIdFilter: Option[Identifier],
startExclusive: Long,
endInclusive: Long,
limit: Int,
)(connection: Connection): Vector[Long]

def activeContractEventBatch(
eventSequentialIds: Iterable[Long],
allFilterParties: Set[Party],
endInclusive: Long,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.FlatEvent]]
def flatTransaction(
transactionId: TransactionId,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.FlatEvent]]
def transactionTreeEvents(
rangeParams: RangeParams,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.TreeEvent]]
def transactionTree(
transactionId: TransactionId,
filterParams: FilterParams,
)(connection: Connection): Vector[EventStorageBackend.Entry[Raw.TreeEvent]]

/** Max event sequential id of observable (create, consuming and nonconsuming exercise) events. */
def maxEventSequentialIdOfAnObservableEvent(offset: Offset)(connection: Connection): Option[Long]

// Used only in testing
def rawEvents(startExclusive: Long, endInclusive: Long)(
connection: Connection
): Vector[EventStorageBackend.RawTransactionEvent]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import com.daml.platform.index.index.StatusDetails
import com.daml.platform.store.dao.JdbcLedgerDao
import com.daml.platform.store.dao.events._
import com.daml.platform._

import java.util.UUID

object UpdateToDbDto {
Expand Down
Loading

0 comments on commit 9c2fef5

Please sign in to comment.