Skip to content

Commit

Permalink
[Divulgence pruning] Fixes divulgence pruning offset update query (#1…
Browse files Browse the repository at this point in the history
…1046)

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
tudor-da authored Sep 29, 2021
1 parent f13c6d6 commit ec2d26f
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ case class QueryNonPrunedImpl(storageBackend: ParameterStorageBackend) extends Q
): T = {
val result = query

storageBackend.prunedUptoInclusive(conn) match {
storageBackend.prunedUpToInclusive(conn) match {
case None =>
result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,13 @@ trait ParameterStorageBackend {
/** Part of pruning process, this needs to be in the same transaction as the other pruning related database operations
*/
def updatePrunedUptoInclusive(prunedUpToInclusive: Offset)(connection: Connection): Unit
def prunedUptoInclusive(connection: Connection): Option[Offset]
def prunedUpToInclusive(connection: Connection): Option[Offset]
def updatePrunedAllDivulgedContractsUpToInclusive(
prunedUpToInclusive: Offset
)(connection: Connection): Unit
def participantAllDivulgedContractsPrunedUpToInclusive(
connection: Connection
): Option[Offset]

/** Initializes the parameters table and verifies or updates ledger identity parameters.
* This method is idempotent:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package com.daml.platform.store.backend.common
import java.io.InputStream
import java.sql.Connection
import java.time.Instant

import anorm.SqlParser.{array, binaryStream, bool, int, long, str}
import anorm.{Row, RowParser, SimpleSql, ~}
import com.daml.ledger.offset.Offset
Expand Down Expand Up @@ -35,6 +34,11 @@ trait EventStorageBackendTemplate extends EventStorageBackend {

def eventStrategy: EventStrategy
def queryStrategy: QueryStrategy
// TODO Refactoring: This method is needed in pruneEvents, but belongs to [[ParameterStorageBackend]].
// Remove with the break-out of pruneEvents.
def participantAllDivulgedContractsPrunedUpToInclusive(
connection: Connection
): Option[Offset]

private val selectColumnsForFlatTransactions =
Seq(
Expand Down Expand Up @@ -402,6 +406,8 @@ trait EventStorageBackendTemplate extends EventStorageBackend {
)(connection)
}

// TODO Refactoring: This method is too complex for StorageBackend.
// Break the method into its constituents and trigger them from the caller of this method.
override def pruneEvents(
pruneUpToInclusive: Offset,
pruneAllDivulgedContracts: Boolean,
Expand Down Expand Up @@ -493,14 +499,6 @@ trait EventStorageBackendTemplate extends EventStorageBackend {
}(connection, loggingContext)
}

private def participantAllDivulgedContractsPrunedUpToInclusive(
connection: Connection
): Option[Offset] =
SQL"select participant_all_divulged_contracts_pruned_up_to_inclusive from parameters"
.as(offset("participant_all_divulged_contracts_pruned_up_to_inclusive").?.single)(
connection
)

private def pruneWithLogging(queryDescription: String)(query: SimpleSql[Row])(
connection: Connection,
loggingContext: LoggingContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private[backend] trait ParameterStorageBackendTemplate extends ParameterStorageB
private val SQL_UPDATE_MOST_RECENT_PRUNING_INCLUDING_ALL_DIVULGED_CONTRACTS =
SQL("""
|update parameters set participant_all_divulged_contracts_pruned_up_to_inclusive={prune_all_divulged_contracts_up_to_inclusive}
|where participant_pruned_up_to_inclusive < {prune_all_divulged_contracts_up_to_inclusive} or participant_all_divulged_contracts_pruned_up_to_inclusive is null
|where participant_all_divulged_contracts_pruned_up_to_inclusive < {prune_all_divulged_contracts_up_to_inclusive} or participant_all_divulged_contracts_pruned_up_to_inclusive is null
|""".stripMargin)

def updatePrunedUptoInclusive(prunedUpToInclusive: Offset)(connection: Connection): Unit = {
Expand All @@ -171,7 +171,19 @@ private[backend] trait ParameterStorageBackendTemplate extends ParameterStorageB
"select participant_pruned_up_to_inclusive from parameters"
)

def prunedUptoInclusive(connection: Connection): Option[Offset] =
def prunedUpToInclusive(connection: Connection): Option[Offset] =
SQL_SELECT_MOST_RECENT_PRUNING
.as(offset("participant_pruned_up_to_inclusive").?.single)(connection)

private val SQL_SELECT_MOST_RECENT_PRUNING_ALL_DIVULGED_CONTRACTS =
SQL("select participant_all_divulged_contracts_pruned_up_to_inclusive from parameters")

def participantAllDivulgedContractsPrunedUpToInclusive(
connection: Connection
): Option[Offset] = {
SQL_SELECT_MOST_RECENT_PRUNING_ALL_DIVULGED_CONTRACTS
.as(offset("participant_all_divulged_contracts_pruned_up_to_inclusive").?.single)(
connection
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,58 @@ private[backend] trait StorageBackendTestsPruning extends Matchers with StorageB
import StorageBackendTestValues._

it should "correctly update the pruning offset" in {
val someOffset = offset(3)
val offset_1 = offset(3)
val offset_2 = offset(2)
val offset_3 = offset(4)
for {
_ <- executeSql(backend.initializeParameters(someIdentityParams))
initialPruningOffset <- executeSql(backend.prunedUptoInclusive)
_ <- executeSql(backend.updatePrunedUptoInclusive(someOffset))
updatedPruningOffset <- executeSql(backend.prunedUptoInclusive)
initialPruningOffset <- executeSql(backend.prunedUpToInclusive)

_ <- executeSql(backend.updatePrunedUptoInclusive(offset_1))
updatedPruningOffset_1 <- executeSql(backend.prunedUpToInclusive)

_ <- executeSql(backend.updatePrunedUptoInclusive(offset_2))
updatedPruningOffset_2 <- executeSql(backend.prunedUpToInclusive)

_ <- executeSql(backend.updatePrunedUptoInclusive(offset_3))
updatedPruningOffset_3 <- executeSql(backend.prunedUpToInclusive)
} yield {
initialPruningOffset shouldBe empty
updatedPruningOffset_1 shouldBe Some(offset_1)
// The pruning offset is not updated if lower than the existing offset
updatedPruningOffset_2 shouldBe Some(offset_1)
updatedPruningOffset_3 shouldBe Some(offset_3)
}
}

it should "correctly update the pruning offset of all divulged contracts" in {
val offset_1 = offset(3)
val offset_2 = offset(2)
val offset_3 = offset(4)
for {
_ <- executeSql(backend.initializeParameters(someIdentityParams))
initialPruningOffset <- executeSql(backend.participantAllDivulgedContractsPrunedUpToInclusive)

_ <- executeSql(backend.updatePrunedAllDivulgedContractsUpToInclusive(offset_1))
updatedPruningOffset_1 <- executeSql(
backend.participantAllDivulgedContractsPrunedUpToInclusive
)

_ <- executeSql(backend.updatePrunedAllDivulgedContractsUpToInclusive(offset_2))
updatedPruningOffset_2 <- executeSql(
backend.participantAllDivulgedContractsPrunedUpToInclusive
)

_ <- executeSql(backend.updatePrunedAllDivulgedContractsUpToInclusive(offset_3))
updatedPruningOffset_3 <- executeSql(
backend.participantAllDivulgedContractsPrunedUpToInclusive
)
} yield {
initialPruningOffset shouldBe empty
updatedPruningOffset shouldBe Some(someOffset)
updatedPruningOffset_1 shouldBe Some(offset_1)
// The pruning offset is not updated if lower than the existing offset
updatedPruningOffset_2 shouldBe Some(offset_1)
updatedPruningOffset_3 shouldBe Some(offset_3)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ class SequentialWriteDaoSpec extends AnyFlatSpec with Matchers {
): Unit =
throw new UnsupportedOperationException

override def prunedUptoInclusive(connection: Connection): Option[Offset] =
override def prunedUpToInclusive(connection: Connection): Option[Offset] =
throw new UnsupportedOperationException

override def participantAllDivulgedContractsPrunedUpToInclusive(
connection: Connection
): Option[Offset] =
throw new UnsupportedOperationException
}
}
Expand Down

0 comments on commit ec2d26f

Please sign in to comment.