Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPP-496 HA indexer integration test #11033

Merged
merged 19 commits into from
Sep 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ledger/participant-integration-api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ da_scala_test_suite(
"//ledger/metrics:metrics-test-lib",
"//ledger/participant-state",
"//ledger/participant-state-index",
"//ledger/test-common",
"//libs-scala/contextualized-logging",
"//libs-scala/logging-entries",
"//libs-scala/oracle-testing",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ case class HaConfig(
workerLockAcquireRetryMillis: Long = 500,
workerLockAcquireMaxRetry: Long = 1000,
mainLockCheckerPeriodMillis: Long = 1000,
indexerLockId: Int = 0x646d6c00, // note 0x646d6c equals ASCII encoded "dml"
indexerWorkerLockId: Int = 0x646d6c01,
indexerLockId: Int = 0x646d6c0, // note 0x646d6c equals ASCII encoded "dml"
meiersi-da marked this conversation as resolved.
Show resolved Hide resolved
indexerWorkerLockId: Int = 0x646d6c1,
enable: Boolean = false, // TODO ha: remove as stable
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ trait StorageBackend[DB_BATCH]
with ContractStorageBackend
with EventStorageBackend
with DataSourceStorageBackend
with DBLockStorageBackend {
with DBLockStorageBackend
with IntegrityStorageBackend {

/** Truncates all storage backend tables, EXCEPT the packages table.
* Does not touch other tables, like the Flyway history table.
Expand Down Expand Up @@ -340,6 +341,15 @@ object DBLockStorageBackend {
}
}

trait IntegrityStorageBackend {

/** Verifies the integrity of the index database, throwing an exception if any issue is found.
* This operation is allowed to take some time to finish.
* It is not expected that it is used during regular index/indexer operation.
*/
def verifyIntegrity()(connection: Connection): Unit
}

object StorageBackend {
case class RawContractState(
templateId: Option[String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import javax.sql.DataSource

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
import scala.util.Using
import scala.util.{Failure, Using}

/** Returns a DataSource that is guaranteed to be connected to a responsive, compatible database. */
object VerifiedDataSource {
Expand Down Expand Up @@ -38,6 +38,8 @@ object VerifiedDataSource {
storageBackend.checkDatabaseAvailable
)
createdDatasource
}.andThen { case Failure(exception) =>
logger.warn(exception.getMessage)
}
}
_ <- Future {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.platform.store.backend.common

import anorm.{RowParser, SQL}

import java.sql.Connection
import anorm.SqlParser.long
import anorm.~
import com.daml.platform.store.backend.IntegrityStorageBackend

private[backend] trait IntegrityStorageBackendTemplate extends IntegrityStorageBackend {

private val allSequentialIds: String =
s"""
|SELECT event_sequential_id FROM participant_events_divulgence
|UNION ALL
|SELECT event_sequential_id FROM participant_events_create
|UNION ALL
|SELECT event_sequential_id FROM participant_events_consuming_exercise
|UNION ALL
|SELECT event_sequential_id FROM participant_events_non_consuming_exercise
|""".stripMargin

private val SQL_EVENT_SEQUENTIAL_IDS_SUMMARY = SQL(s"""
|WITH sequential_ids AS ($allSequentialIds)
|SELECT min(event_sequential_id) as min, max(event_sequential_id) as max, count(event_sequential_id) as count
|FROM sequential_ids, parameters
|WHERE event_sequential_id <= parameters.ledger_end_sequential_id
|""".stripMargin)

// Don't fetch an unbounded number of rows
private val maxReportedDuplicates = 100

private val SQL_DUPLICATE_EVENT_SEQUENTIAL_IDS = SQL(s"""
|WITH sequential_ids AS ($allSequentialIds)
|SELECT event_sequential_id as id, count(*) as count
|FROM sequential_ids, parameters
|WHERE event_sequential_id <= parameters.ledger_end_sequential_id
|GROUP BY event_sequential_id
|HAVING count(*) > 1
|FETCH NEXT $maxReportedDuplicates ROWS ONLY
|""".stripMargin)

case class EventSequentialIdsRow(min: Long, max: Long, count: Long)

private val eventSequantialIdsParser: RowParser[EventSequentialIdsRow] =
long("min") ~
long("max") ~
long("count") map { case min ~ max ~ count =>
EventSequentialIdsRow(min, max, count)
}

override def verifyIntegrity()(connection: Connection): Unit = {
val duplicates = SQL_DUPLICATE_EVENT_SEQUENTIAL_IDS
.as(long("id").*)(connection)
val summary = SQL_EVENT_SEQUENTIAL_IDS_SUMMARY
.as(eventSequantialIdsParser.single)(connection)

// Verify that there are no duplicate ids.
if (duplicates.nonEmpty) {
throw new RuntimeException(
s"Found ${duplicates.length} duplicate event sequential ids. Examples: ${duplicates.mkString(", ")}"
)
}

// Verify that all ids are sequential (i.e., there are no "holes" in the ids).
// Since we already know that there are not duplicates, it is enough to check that the count is consistent with the range.
if (summary.count != summary.max - summary.min + 1) {
throw new RuntimeException(
s"Event sequential ids are not consecutive. Min=${summary.min}, max=${summary.max}, count=${summary.count}."
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package com.daml.platform.store.backend.h2

import java.sql.Connection
import java.time.Instant

import anorm.{Row, SQL, SimpleSql}
import anorm.SqlParser.get
import com.daml.ledger.offset.Offset
Expand All @@ -20,6 +19,7 @@ import com.daml.platform.store.backend.common.{
ConfigurationStorageBackendTemplate,
ContractStorageBackendTemplate,
DataSourceStorageBackendTemplate,
IntegrityStorageBackendTemplate,
DeduplicationStorageBackendTemplate,
EventStorageBackendTemplate,
EventStrategy,
Expand All @@ -38,8 +38,8 @@ import com.daml.platform.store.backend.{
StorageBackend,
common,
}
import javax.sql.DataSource

import javax.sql.DataSource
import scala.util.control.NonFatal

private[backend] object H2StorageBackend
Expand All @@ -53,7 +53,8 @@ private[backend] object H2StorageBackend
with EventStorageBackendTemplate
with ContractStorageBackendTemplate
with CompletionStorageBackendTemplate
with PartyStorageBackendTemplate {
with PartyStorageBackendTemplate
with IntegrityStorageBackendTemplate {

private val logger = ContextualizedLogger.get(this.getClass)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.daml.platform.store.backend.common.{
ConfigurationStorageBackendTemplate,
ContractStorageBackendTemplate,
DataSourceStorageBackendTemplate,
IntegrityStorageBackendTemplate,
DeduplicationStorageBackendTemplate,
EventStorageBackendTemplate,
EventStrategy,
Expand All @@ -30,15 +31,15 @@ import com.daml.platform.store.backend.{
StorageBackend,
common,
}

import java.sql.Connection
import java.time.Instant

import com.daml.ledger.offset.Offset
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import javax.sql.DataSource

import javax.sql.DataSource
import scala.util.control.NonFatal

private[backend] object OracleStorageBackend
Expand All @@ -52,7 +53,8 @@ private[backend] object OracleStorageBackend
with EventStorageBackendTemplate
with ContractStorageBackendTemplate
with CompletionStorageBackendTemplate
with PartyStorageBackendTemplate {
with PartyStorageBackendTemplate
with IntegrityStorageBackendTemplate {

private val logger = ContextualizedLogger.get(this.getClass)

Expand Down Expand Up @@ -269,8 +271,8 @@ private[backend] object OracleStorageBackend

case class OracleLockId(id: Int) extends DBLockStorageBackend.LockId {
// respecting Oracle limitations: https://docs.oracle.com/cd/B19306_01/appdev.102/b14258/d_lock.htm#ARPLS021
assert(id >= 0)
assert(id <= 1073741823)
assert(id >= 0, s"Lock id $id is too small for Oracle")
assert(id <= 1073741823, s"Lock id $id is too large for Oracle")
}

private def oracleIntLockId(lockId: DBLockStorageBackend.LockId): Int =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package com.daml.platform.store.backend.postgresql

import java.sql.Connection
import java.time.Instant

import anorm.SQL
import anorm.SqlParser.{get, int}
import com.daml.ledger.offset.Offset
Expand All @@ -21,6 +20,7 @@ import com.daml.platform.store.backend.common.{
ConfigurationStorageBackendTemplate,
ContractStorageBackendTemplate,
DataSourceStorageBackendTemplate,
IntegrityStorageBackendTemplate,
DeduplicationStorageBackendTemplate,
EventStorageBackendTemplate,
EventStrategy,
Expand All @@ -39,6 +39,7 @@ import com.daml.platform.store.backend.{
StorageBackend,
common,
}

import javax.sql.DataSource
import org.postgresql.ds.PGSimpleDataSource

Expand All @@ -53,7 +54,8 @@ private[backend] object PostgresStorageBackend
with EventStorageBackendTemplate
with ContractStorageBackendTemplate
with CompletionStorageBackendTemplate
with PartyStorageBackendTemplate {
with PartyStorageBackendTemplate
with IntegrityStorageBackendTemplate {

private val logger: ContextualizedLogger = ContextualizedLogger.get(this.getClass)

Expand Down
Loading