Skip to content

Commit

Permalink
[Mutable state cache] Fix initialization offset (#11024)
Browse files Browse the repository at this point in the history
* The mutable state cache index is initialized to the current ledger end

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
tudor-da authored Sep 28, 2021
1 parent 5458aa8 commit 3d779cf
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,18 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
): ResourceOwner[ReadOnlySqlLedgerWithMutableCache] =
for {
contractStore <- new MutableCacheBackedContractStore.OwnerWithSubscription(
subscribeToContractStateEvents = maybeOffsetSeqId =>
subscribeToContractStateEvents = cacheIndex =>
cacheUpdatesDispatcher
.startingAt(
maybeOffsetSeqId.getOrElse(startExclusive),
cacheIndex,
RangeSource(
ledgerDao.transactionsReader.getContractStateEvents(_, _)
),
)
.map(_._2),
contractsReader = ledgerDao.contractsReader,
signalNewLedgerHead = dispatcherLagMeter,
startIndexExclusive = startExclusive,
metrics = metrics,
maxContractsCacheSize = maxContractStateCacheSize,
maxKeyCacheSize = maxContractKeyStateCacheSize,
Expand Down Expand Up @@ -162,6 +163,7 @@ private[index] object ReadOnlySqlLedgerWithMutableCache {
val contractStore = MutableCacheBackedContractStore(
ledgerDao.contractsReader,
dispatcherLagMeter,
startExclusive,
metrics,
maxContractStateCacheSize,
maxContractKeyStateCacheSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@

package com.daml.platform.store.cache

import java.time.Instant
import java.util.concurrent.atomic.AtomicReference

import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import akka.stream.{KillSwitches, Materializer, RestartSettings, UniqueKillSwitch}
import akka.{Done, NotUsed}
Expand All @@ -15,7 +12,6 @@ import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
import com.daml.lf.transaction.GlobalKey
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.{Metrics, Timed}
import com.daml.platform.store.EventSequentialId
import com.daml.platform.store.cache.ContractKeyStateValue._
import com.daml.platform.store.cache.ContractStateValue._
import com.daml.platform.store.cache.MutableCacheBackedContractStore._
Expand All @@ -29,6 +25,8 @@ import com.daml.platform.store.interfaces.LedgerDaoContractsReader.{
KeyState,
}

import java.time.Instant
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NoStackTrace
Expand All @@ -38,14 +36,15 @@ private[platform] class MutableCacheBackedContractStore(
metrics: Metrics,
contractsReader: LedgerDaoContractsReader,
signalNewLedgerHead: SignalNewLedgerHead,
startIndexExclusive: (Offset, Long),
private[cache] val keyCache: StateCache[GlobalKey, ContractKeyStateValue],
private[cache] val contractsCache: StateCache[ContractId, ContractStateValue],
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext)
extends ContractStore {

private val logger = ContextualizedLogger.get(getClass)

private[cache] val cacheIndex = new CacheIndex()
private[cache] val cacheIndex = new CacheIndex(startIndexExclusive)

def push(event: ContractStateEvent): Unit = {
debugEvents(event)
Expand Down Expand Up @@ -114,7 +113,7 @@ private[platform] class MutableCacheBackedContractStore(
private def readThroughContractsCache(contractId: ContractId)(implicit
loggingContext: LoggingContext
) = {
val currentCacheSequentialId = cacheIndex.getSequentialId
val currentCacheSequentialId = cacheIndex.getEventSequentialId
val fetchStateRequest =
Timed.future(
metrics.daml.index.lookupContract,
Expand Down Expand Up @@ -211,7 +210,7 @@ private[platform] class MutableCacheBackedContractStore(
private def readThroughKeyCache(
key: GlobalKey
)(implicit loggingContext: LoggingContext) = {
val currentCacheSequentialId = cacheIndex.getSequentialId
val currentCacheSequentialId = cacheIndex.getEventSequentialId
val eventualResult = contractsReader.lookupKeyState(key, currentCacheSequentialId)
val eventualValue = eventualResult.map(toKeyCacheValue)

Expand Down Expand Up @@ -306,13 +305,13 @@ private[platform] object MutableCacheBackedContractStore {
// Signal externally that the cache has caught up until the provided ledger head offset
type SignalNewLedgerHead = Offset => Unit
// Subscribe to the contract state events stream starting at a specific event_offset and event_sequential_id
// or from the beginning, if not provided
type SubscribeToContractStateEvents =
Option[(Offset, Long)] => Source[ContractStateEvent, NotUsed]
((Offset, EventSequentialId)) => Source[ContractStateEvent, NotUsed]

def apply(
contractsReader: LedgerDaoContractsReader,
signalNewLedgerHead: SignalNewLedgerHead,
startIndexExclusive: (Offset, Long),
metrics: Metrics,
maxContractsCacheSize: Long,
maxKeyCacheSize: Long,
Expand All @@ -324,6 +323,7 @@ private[platform] object MutableCacheBackedContractStore {
metrics,
contractsReader,
signalNewLedgerHead,
startIndexExclusive,
ContractKeyStateCache(maxKeyCacheSize, metrics),
ContractsStateCache(maxContractsCacheSize, metrics),
)
Expand All @@ -332,6 +332,7 @@ private[platform] object MutableCacheBackedContractStore {
subscribeToContractStateEvents: SubscribeToContractStateEvents,
contractsReader: LedgerDaoContractsReader,
signalNewLedgerHead: SignalNewLedgerHead,
startIndexExclusive: (Offset, Long),
metrics: Metrics,
maxContractsCacheSize: Long,
maxKeyCacheSize: Long,
Expand All @@ -345,6 +346,7 @@ private[platform] object MutableCacheBackedContractStore {
private val contractStore = MutableCacheBackedContractStore(
contractsReader,
signalNewLedgerHead,
startIndexExclusive,
metrics,
maxContractsCacheSize,
maxKeyCacheSize,
Expand Down Expand Up @@ -391,15 +393,15 @@ private[platform] object MutableCacheBackedContractStore {
s"Contract not found for contract id $contractId. Hint: this could be due racing with a concurrent archival."
}

private[cache] class CacheIndex {
private val offsetRef: AtomicReference[Option[(Offset, EventSequentialId)]] =
new AtomicReference(Option.empty)
private[cache] class CacheIndex(initValue: (Offset, Long)) {
private val offsetRef: AtomicReference[(Offset, EventSequentialId)] =
new AtomicReference(initValue)

def set(offset: Offset, sequentialId: EventSequentialId): Unit =
offsetRef.set(Some(offset -> sequentialId))
offsetRef.set(offset -> sequentialId)

def get: Option[(Offset, EventSequentialId)] = offsetRef.get()
def get: (Offset, EventSequentialId) = offsetRef.get()

def getSequentialId: EventSequentialId = get.map(_._2).getOrElse(EventSequentialId.beforeBegin)
def getEventSequentialId: EventSequentialId = get._2
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package com.daml.platform.store.cache

import java.time.Instant
import java.util.concurrent.atomic.AtomicReference

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.Enqueued
Expand All @@ -20,12 +19,14 @@ import com.daml.lf.transaction.test.TransactionBuilder
import com.daml.lf.value.Value.{ContractInst, ValueRecord, ValueText}
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.store.EventSequentialId
import com.daml.platform.store.cache.ContractKeyStateValue.{Assigned, Unassigned}
import com.daml.platform.store.cache.ContractStateValue.{Active, Archived}
import com.daml.platform.store.cache.MutableCacheBackedContractStore.{
ContractNotFound,
EmptyContractIds,
EventSequentialId,
SubscribeToContractStateEvents,
}
import com.daml.platform.store.cache.MutableCacheBackedContractStoreSpec.{
ContractsReaderFixture,
Expand Down Expand Up @@ -64,6 +65,14 @@ class MutableCacheBackedContractStoreSpec
override implicit val patienceConfig: PatienceConfig =
PatienceConfig(timeout = scaled(Span(2000, Millis)), interval = scaled(Span(50, Millis)))

"cache initialization" should {
"set the cache index to the initialization index" in {
val cacheInitializationIndex = Offset.fromByteArray(1337.toByteArray) -> 1337L
contractStore(cachesSize = 0L, startIndexExclusive = cacheInitializationIndex).asFuture
.map(_.cacheIndex.get shouldBe cacheInitializationIndex)
}
}

"event stream consumption" should {
"populate the caches from the contract state event stream" in {

Expand All @@ -88,20 +97,20 @@ class MutableCacheBackedContractStoreSpec
_ <- eventually {
store.contractsCache.get(cId_1) shouldBe Some(Active(contract1, Set(charlie), t1))
store.keyCache.get(someKey) shouldBe Some(Assigned(cId_1, Set(charlie)))
store.cacheIndex.getSequentialId shouldBe 1L
store.cacheIndex.getEventSequentialId shouldBe 1L
}

_ <- archivedEvent(c1, eventSequentialId = 2L)
_ <- eventually {
store.contractsCache.get(cId_1) shouldBe Some(Archived(Set(charlie)))
store.keyCache.get(someKey) shouldBe Some(Unassigned)
store.cacheIndex.getSequentialId shouldBe 2L
store.cacheIndex.getEventSequentialId shouldBe 2L
}

someOffset = Offset.fromByteArray(1337.toByteArray)
_ <- ledgerEnd(someOffset, 3L)
_ <- eventually {
store.cacheIndex.getSequentialId shouldBe 3L
store.cacheIndex.getEventSequentialId shouldBe 3L
lastLedgerHead.get() shouldBe someOffset
}
} yield succeed
Expand Down Expand Up @@ -144,14 +153,13 @@ class MutableCacheBackedContractStoreSpec
eventSequentialId = 3L,
)

val sourceSubscriptionFixture
: Option[(Offset, EventSequentialId)] => Source[ContractStateEvent, NotUsed] = {
case None =>
val sourceSubscriptionFixture: SubscribeToContractStateEvents = {
case (Offset.beforeBegin, EventSequentialId.beforeBegin) =>
Source
.fromIterator(() => Iterator(created, archived, dummy))
// Simulate the source failure at the last event
.map(x => if (x == dummy) throw new RuntimeException("some transient failure") else x)
case Some((_, 2L)) =>
case (_, 2L) =>
Source.fromIterator(() => Iterator(anotherCreate))
case subscriptionOffset =>
fail(s"Unexpected subscription offsets $subscriptionOffset")
Expand All @@ -168,7 +176,7 @@ class MutableCacheBackedContractStoreSpec
store.contractsCache.get(cId_1) shouldBe Some(Archived(Set(charlie)))
store.contractsCache.get(cId_2) shouldBe Some(Active(contract2, Set(alice), t2))
store.keyCache.get(someKey) shouldBe Some(Assigned(cId_2, Set(alice)))
store.cacheIndex.get shouldBe Some(offset3 -> 3L)
store.cacheIndex.get shouldBe offset3 -> 3L
}
} yield succeed
}
Expand Down Expand Up @@ -432,11 +440,13 @@ object MutableCacheBackedContractStoreSpec {
cachesSize: Long,
readerFixture: LedgerDaoContractsReader = ContractsReaderFixture(),
signalNewLedgerHead: Offset => Unit = _ => (),
sourceSubscriber: Option[(Offset, EventSequentialId)] => Source[ContractStateEvent, NotUsed] =
_ => Source.empty,
sourceSubscriber: SubscribeToContractStateEvents = _ => Source.empty,
startIndexExclusive: (Offset, EventSequentialId) =
Offset.beforeBegin -> EventSequentialId.beforeBegin,
)(implicit loggingContext: LoggingContext, materializer: Materializer) =
new MutableCacheBackedContractStore.OwnerWithSubscription(
subscribeToContractStateEvents = sourceSubscriber,
startIndexExclusive = startIndexExclusive,
minBackoffStreamRestart = 10.millis,
contractsReader = readerFixture,
signalNewLedgerHead = signalNewLedgerHead,
Expand Down

0 comments on commit 3d779cf

Please sign in to comment.