Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvutils: Add the logging context for ledger state operations. #11030

Merged
1 commit merged into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@ import com.daml.ledger.validator.{
LedgerStateOperations,
TimedLedgerStateOperations,
}
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics

import scala.concurrent.{ExecutionContext, Future}

final class InMemoryLedgerStateAccess(state: InMemoryState, metrics: Metrics)
extends LedgerStateAccess[Index] {
override def inTransaction[T](body: LedgerStateOperations[Index] => Future[T])(implicit
executionContext: ExecutionContext
override def inTransaction[T](
body: LedgerStateOperations[Index] => Future[T]
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[T] =
state.write { (log, state) =>
body(new TimedLedgerStateOperations(new InMemoryLedgerStateOperations(log, state), metrics))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.daml.ledger.on.memory.InMemoryState.MutableLog
import com.daml.ledger.participant.state.kvutils.api.LedgerRecord
import com.daml.ledger.participant.state.kvutils.{OffsetBuilder, Raw}
import com.daml.ledger.validator.BatchingLedgerStateOperations
import com.daml.logging.LoggingContext

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -19,20 +20,23 @@ final class InMemoryLedgerStateOperations(

override def readState(
keys: Iterable[Raw.StateKey]
)(implicit executionContext: ExecutionContext): Future[Seq[Option[Raw.Envelope]]] =
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Seq[Option[Raw.Envelope]]] =
Future.successful(keys.view.map(state.get).toSeq)

override def writeState(
keyValuePairs: Iterable[Raw.StateEntry]
)(implicit executionContext: ExecutionContext): Future[Unit] = {
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit] = {
state ++= keyValuePairs
Future.unit
}

override def appendToLog(
key: Raw.LogEntryId,
value: Raw.Envelope,
)(implicit executionContext: ExecutionContext): Future[Index] =
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Index] =
Future.successful(appendEntry(log, LedgerRecord(_, key, value)))

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,11 @@ object SqlLedgerReaderWriter {

private final class SqlLedgerStateAccess(database: Database, metrics: Metrics)
extends LedgerStateAccess[Index] {
override def inTransaction[T](body: LedgerStateOperations[Index] => sc.Future[T])(implicit
executionContext: sc.ExecutionContext
override def inTransaction[T](
body: LedgerStateOperations[Index] => sc.Future[T]
)(implicit
executionContext: sc.ExecutionContext,
loggingContext: LoggingContext,
): sc.Future[T] =
database
.inWriteTransaction("commit") { queries =>
Expand All @@ -200,18 +203,27 @@ object SqlLedgerReaderWriter {
extends BatchingLedgerStateOperations[Index] {
override def readState(
keys: Iterable[Raw.StateKey]
)(implicit executionContext: sc.ExecutionContext): sc.Future[Seq[Option[Raw.Envelope]]] =
)(implicit
executionContext: sc.ExecutionContext,
loggingContext: LoggingContext,
): sc.Future[Seq[Option[Raw.Envelope]]] =
Future.fromTry(queries.selectStateValuesByKeys(keys)).removeExecutionContext

override def writeState(
keyValuePairs: Iterable[Raw.StateEntry]
)(implicit executionContext: sc.ExecutionContext): sc.Future[Unit] =
)(implicit
executionContext: sc.ExecutionContext,
loggingContext: LoggingContext,
): sc.Future[Unit] =
Future.fromTry(queries.updateState(keyValuePairs)).removeExecutionContext

override def appendToLog(
key: Raw.LogEntryId,
value: Raw.Envelope,
)(implicit executionContext: sc.ExecutionContext): sc.Future[Index] =
)(implicit
executionContext: sc.ExecutionContext,
loggingContext: LoggingContext,
): sc.Future[Index] =
Future.fromTry(queries.insertRecordIntoLog(key, value)).removeExecutionContext
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.daml.ledger.participant.state.kvutils.export

import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.ledger.validator.LedgerStateWriteOperations
import com.daml.logging.LoggingContext

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -14,22 +15,22 @@ final class SubmissionAggregatorWriteOperations(builder: SubmissionAggregator.Wr
override def writeState(
key: Raw.StateKey,
value: Raw.Envelope,
)(implicit executionContext: ExecutionContext): Future[Unit] =
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit] =
Future {
builder += key -> value
}

override def writeState(
keyValuePairs: Iterable[Raw.StateEntry]
)(implicit executionContext: ExecutionContext): Future[Unit] =
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit] =
Future {
builder ++= keyValuePairs
}

override def appendToLog(
key: Raw.LogEntryId,
value: Raw.Envelope,
)(implicit executionContext: ExecutionContext): Future[Unit] =
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit] =
Future {
builder += key -> value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package com.daml.ledger.validator

import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.logging.LoggingContext

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -16,18 +17,21 @@ final class CombinedLedgerStateWriteOperations[ALogResult, BLogResult, LogResult
override def writeState(
key: Raw.StateKey,
value: Raw.Envelope,
)(implicit executionContext: ExecutionContext): Future[Unit] =
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit] =
inParallel(a.writeState(key, value), b.writeState(key, value)).map(_ => ())

override def writeState(
keyValuePairs: Iterable[Raw.StateEntry]
)(implicit executionContext: ExecutionContext): Future[Unit] =
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit] =
inParallel(a.writeState(keyValuePairs), b.writeState(keyValuePairs)).map(_ => ())

override def appendToLog(
key: Raw.LogEntryId,
value: Raw.Envelope,
)(implicit executionContext: ExecutionContext): Future[LogResult] =
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[LogResult] =
inParallel(a.appendToLog(key, value), b.appendToLog(key, value)).map {
case (aResult, bResult) => combineLogResults(aResult, bResult)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
}
import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext

import scala.concurrent.Future

Expand All @@ -29,5 +30,5 @@ trait CommitStrategy[Result] {
inputState: Map[DamlStateKey, Option[DamlStateValue]],
outputState: Map[DamlStateKey, DamlStateValue],
exporterWriteSet: Option[SubmissionAggregator.WriteSetBuilder] = None,
): Future[Result]
)(implicit loggingContext: LoggingContext): Future[Result]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.daml.ledger.validator

import com.daml.dec.DirectExecutionContext
import com.daml.ledger.participant.state.kvutils.Raw
import com.daml.logging.LoggingContext
import com.daml.metrics.{Metrics, Timed}

import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -23,7 +24,7 @@ trait LedgerStateAccess[+LogResult] {
*/
def inTransaction[T](
body: LedgerStateOperations[LogResult] => Future[T]
)(implicit executionContext: ExecutionContext): Future[T]
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[T]
}

/** Defines how the validator/committer can read from the backing store of the ledger.
Expand All @@ -37,7 +38,10 @@ trait LedgerStateReadOperations {
*/
def readState(
key: Raw.StateKey
)(implicit executionContext: ExecutionContext): Future[Option[Raw.Envelope]]
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Option[Raw.Envelope]]

/** Reads values of a set of keys from the backing store.
*
Expand All @@ -46,7 +50,10 @@ trait LedgerStateReadOperations {
*/
def readState(
keys: Iterable[Raw.StateKey]
)(implicit executionContext: ExecutionContext): Future[Seq[Option[Raw.Envelope]]]
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Seq[Option[Raw.Envelope]]]
}

/** Defines how the validator/committer can write to the backing store of the ledger.
Expand All @@ -60,13 +67,13 @@ trait LedgerStateWriteOperations[+LogResult] {
def writeState(
key: Raw.StateKey,
value: Raw.Envelope,
)(implicit executionContext: ExecutionContext): Future[Unit]
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit]

/** Writes a list of key-value pairs to the backing store. In case a key already exists its value is overwritten.
*/
def writeState(
keyValuePairs: Iterable[Raw.StateEntry]
)(implicit executionContext: ExecutionContext): Future[Unit]
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit]

/** Writes a single log entry to the backing store. The implementation may return Future.failed in case the key
* (i.e., the log entry ID) already exists.
Expand All @@ -76,7 +83,7 @@ trait LedgerStateWriteOperations[+LogResult] {
def appendToLog(
key: Raw.LogEntryId,
value: Raw.Envelope,
)(implicit executionContext: ExecutionContext): Future[LogResult]
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[LogResult]
}

/** Defines how the validator/committer can access the backing store of the ledger.
Expand All @@ -92,13 +99,16 @@ trait LedgerStateOperations[+LogResult]
abstract class BatchingLedgerStateOperations[LogResult] extends LedgerStateOperations[LogResult] {
override final def readState(
key: Raw.StateKey
)(implicit executionContext: ExecutionContext): Future[Option[Raw.Envelope]] =
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Option[Raw.Envelope]] =
readState(Seq(key)).map(_.head)(DirectExecutionContext)

override final def writeState(
key: Raw.StateKey,
value: Raw.Envelope,
)(implicit executionContext: ExecutionContext): Future[Unit] =
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit] =
writeState(Seq(key -> value))
}

Expand All @@ -109,12 +119,15 @@ abstract class NonBatchingLedgerStateOperations[LogResult]
extends LedgerStateOperations[LogResult] {
override final def readState(
keys: Iterable[Raw.StateKey]
)(implicit executionContext: ExecutionContext): Future[Seq[Option[Raw.Envelope]]] =
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Seq[Option[Raw.Envelope]]] =
Future.sequence(keys.map(readState)).map(_.toSeq)

override final def writeState(
keyValuePairs: Iterable[Raw.StateEntry]
)(implicit executionContext: ExecutionContext): Future[Unit] =
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit] =
Future
.sequence(keyValuePairs.map { case (key, value) =>
writeState(key, value)
Expand All @@ -129,28 +142,37 @@ final class TimedLedgerStateOperations[LogResult](

override def readState(
key: Raw.StateKey
)(implicit executionContext: ExecutionContext): Future[Option[Raw.Envelope]] =
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Option[Raw.Envelope]] =
Timed.future(metrics.daml.ledger.state.read, delegate.readState(key))

override def readState(
keys: Iterable[Raw.StateKey]
)(implicit executionContext: ExecutionContext): Future[Seq[Option[Raw.Envelope]]] =
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Seq[Option[Raw.Envelope]]] =
Timed.future(metrics.daml.ledger.state.read, delegate.readState(keys))

override def writeState(
key: Raw.StateKey,
value: Raw.Envelope,
)(implicit executionContext: ExecutionContext): Future[Unit] =
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit] =
Timed.future(metrics.daml.ledger.state.write, delegate.writeState(key, value))

override def writeState(
keyValuePairs: Iterable[Raw.StateEntry]
)(implicit executionContext: ExecutionContext): Future[Unit] =
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[Unit] =
Timed.future(metrics.daml.ledger.state.write, delegate.writeState(keyValuePairs))

override def appendToLog(
key: Raw.LogEntryId,
value: Raw.Envelope,
)(implicit executionContext: ExecutionContext): Future[LogResult] =
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[LogResult] =
Timed.future(metrics.daml.ledger.log.append, delegate.appendToLog(key, value))
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.daml.ledger.participant.state.kvutils.DamlKvutils.{
import com.daml.ledger.participant.state.kvutils.export.SubmissionAggregator
import com.daml.ledger.participant.state.kvutils.{Envelope, Raw}
import com.daml.lf.data.Ref
import com.daml.logging.LoggingContext

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -30,7 +31,7 @@ class LogAppendingCommitStrategy[Index](
inputState: Map[DamlStateKey, Option[DamlStateValue]],
outputState: Map[DamlStateKey, DamlStateValue],
writeSetBuilder: Option[SubmissionAggregator.WriteSetBuilder] = None,
): Future[Index] = {
)(implicit loggingContext: LoggingContext): Future[Index] = {
val rawLogEntryId = Raw.LogEntryId(entryId)
for {
(serializedKeyValuePairs, envelopedLogEntry) <- inParallel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package com.daml.ledger.validator
import com.daml.ledger.participant.state.kvutils.DamlKvutils.{DamlStateKey, DamlStateValue}
import com.daml.ledger.participant.state.kvutils.{Envelope, Raw}
import com.daml.ledger.validator.reading.{DamlLedgerStateReader, LedgerStateReader}
import com.daml.logging.LoggingContext

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -18,7 +19,10 @@ final class RawToDamlLedgerStateReaderAdapter(

override def read(
keys: Iterable[DamlStateKey]
)(implicit executionContext: ExecutionContext): Future[Seq[Option[DamlStateValue]]] =
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[Seq[Option[DamlStateValue]]] =
ledgerStateReader
.read(keys.map(keySerializationStrategy.serializeStateKey))
.map(_.map(_.map(deserializeDamlStateValue)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ class SubmissionValidator[LogResult] private[validator] (
ignored: Any,
logEntryAndState: LogEntryAndState,
stateOperations: LedgerStateOperations[LogResult],
)(implicit executionContext: ExecutionContext): Future[LogResult] = {
)(implicit
executionContext: ExecutionContext,
loggingContext: LoggingContext,
): Future[LogResult] = {
val (rawLogEntry, rawStateUpdates) = serializeProcessedSubmission(logEntryAndState)
val eventualLogResult = stateOperations.appendToLog(Raw.LogEntryId(logEntryId), rawLogEntry)
val eventualStateResult =
Expand Down Expand Up @@ -266,7 +269,7 @@ class SubmissionValidator[LogResult] private[validator] (
extends LedgerStateAccess[LogResult] {
override def inTransaction[T](
body: LedgerStateOperations[LogResult] => Future[T]
)(implicit executionContext: ExecutionContext): Future[T] = {
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext): Future[T] = {
// This is necessary to ensure we capture successful and failed acquisitions separately.
// These need to be measured separately as they may have very different characteristics.
val acquisitionWasRecorded = new AtomicBoolean(false)
Expand Down
Loading