Skip to content

Commit

Permalink
Retry upsert of command deduplication on oracle and h2 [DPP-609] (dig…
Browse files Browse the repository at this point in the history
…ital-asset#10976) (digital-asset#10989)

* Retry upsert of command deduplication on oracle and h2 [DPP-609]

CHANGELOG_BEGIN
CHANGELOG_END

* address review comments

* fix a typo

Co-authored-by: Moritz Kiefer <moritz.kiefer@purelyfunctional.org>
  • Loading branch information
mziolekda and cocreature authored Sep 22, 2021
1 parent 4dbf245 commit 49a7580
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ trait DeduplicationStorageBackend {
key: String,
submittedAt: Instant,
deduplicateUntil: Instant,
)(connection: Connection): Int
)(connection: Connection)(implicit loggingContext: LoggingContext): Int
def removeExpiredDeduplicationData(currentTime: Instant)(connection: Connection): Unit
def stopDeduplicatingCommand(deduplicationKey: String)(connection: Connection): Unit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ package com.daml.platform.store.backend.h2

import java.sql.Connection
import java.time.Instant

import anorm.{Row, SQL, SimpleSql}
import anorm.SqlParser.get
import com.daml.ledger.offset.Offset
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.appendonlydao.events.ContractId
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
Expand All @@ -32,9 +33,10 @@ import com.daml.platform.store.backend.{
StorageBackend,
common,
}

import javax.sql.DataSource

import scala.util.control.NonFatal

private[backend] object H2StorageBackend
extends StorageBackend[AppendOnlySchema.Batch]
with CommonStorageBackend[AppendOnlySchema.Batch]
Expand All @@ -43,6 +45,8 @@ private[backend] object H2StorageBackend
with CompletionStorageBackendTemplate
with PartyStorageBackendTemplate {

private val logger = ContextualizedLogger.get(this.getClass)

override def reset(connection: Connection): Unit = {
SQL("""set referential_integrity false;
|truncate table configuration_entries;
Expand Down Expand Up @@ -91,14 +95,30 @@ private[backend] object H2StorageBackend
key: String,
submittedAt: Instant,
deduplicateUntil: Instant,
)(connection: Connection): Int =
SQL(SQL_INSERT_COMMAND)
.on(
"deduplicationKey" -> key,
"submittedAt" -> Timestamp.instantToMicros(submittedAt),
"deduplicateUntil" -> Timestamp.instantToMicros(deduplicateUntil),
)
.executeUpdate()(connection)
)(connection: Connection)(implicit loggingContext: LoggingContext): Int = {

// Under the default READ_COMMITTED isolation level used for the indexdb, when a deduplication
// upsert is performed simultaneously from multiple threads, the query fails with
// JdbcSQLIntegrityConstraintViolationException: Unique index or primary key violation
// Simple retry helps
def retry[T](op: => T): T =
try {
op
} catch {
case NonFatal(e) =>
logger.debug(s"Caught exception while upserting a deduplication entry: $e")
op
}
retry(
SQL(SQL_INSERT_COMMAND)
.on(
"deduplicationKey" -> key,
"submittedAt" -> Timestamp.instantToMicros(submittedAt),
"deduplicateUntil" -> Timestamp.instantToMicros(deduplicateUntil),
)
.executeUpdate()(connection)
)
}

override def batch(dbDtos: Vector[DbDto]): AppendOnlySchema.Batch =
H2Schema.schema.prepareData(dbDtos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ import java.time.Instant

import com.daml.ledger.offset.Offset
import com.daml.platform.store.backend.EventStorageBackend.FilterParams
import com.daml.logging.LoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.store.backend.common.ComposableQuery.{CompositeSql, SqlStringInterpolation}
import javax.sql.DataSource

import scala.util.control.NonFatal

private[backend] object OracleStorageBackend
extends StorageBackend[AppendOnlySchema.Batch]
with CommonStorageBackend[AppendOnlySchema.Batch]
Expand All @@ -42,6 +44,8 @@ private[backend] object OracleStorageBackend
with CompletionStorageBackendTemplate
with PartyStorageBackendTemplate {

private val logger = ContextualizedLogger.get(this.getClass)

override def reset(connection: Connection): Unit =
List(
"truncate table configuration_entries cascade",
Expand Down Expand Up @@ -86,14 +90,30 @@ private[backend] object OracleStorageBackend
key: String,
submittedAt: Instant,
deduplicateUntil: Instant,
)(connection: Connection): Int =
SQL(SQL_INSERT_COMMAND)
.on(
"deduplicationKey" -> key,
"submittedAt" -> Timestamp.instantToMicros(submittedAt),
"deduplicateUntil" -> Timestamp.instantToMicros(deduplicateUntil),
)
.executeUpdate()(connection)
)(connection: Connection)(implicit loggingContext: LoggingContext): Int = {

// Under the default READ_COMMITTED isolation level used for the indexdb, when a deduplication
// upsert is performed simultaneously from multiple threads, the query fails with
// SQLIntegrityConstraintViolationException: ORA-00001: unique constraint (INDEXDB.SYS_C007590) violated
// Simple retry helps
def retry[T](op: => T): T =
try {
op
} catch {
case NonFatal(e) =>
logger.debug(s"Caught exception while upserting a deduplication entry: $e")
op
}
retry(
SQL(SQL_INSERT_COMMAND)
.on(
"deduplicationKey" -> key,
"submittedAt" -> Timestamp.instantToMicros(submittedAt),
"deduplicateUntil" -> Timestamp.instantToMicros(deduplicateUntil),
)
.executeUpdate()(connection)
)
}

override def batch(dbDtos: Vector[DbDto]): AppendOnlySchema.Batch =
OracleSchema.schema.prepareData(dbDtos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[backend] object PostgresStorageBackend
key: String,
submittedAt: Instant,
deduplicateUntil: Instant,
)(connection: Connection): Int =
)(connection: Connection)(implicit loggingContext: LoggingContext): Int =
SQL(SQL_INSERT_COMMAND)
.on(
"deduplicationKey" -> key,
Expand Down

0 comments on commit 49a7580

Please sign in to comment.