From b4d00317b402bee59a06f77a41ad8fad8a173e66 Mon Sep 17 00:00:00 2001 From: Stephen Compall Date: Tue, 28 Sep 2021 16:47:42 -0400 Subject: [PATCH] detect unsynchronized contract table and retry (#10617) * enumerating out-of-sync offsets at the DB level * cleanup in lastOffset * write the latest-requested-or-read offset when catching up - Writing only the latest-read, as before, would imply unsynced offsets that are actually well-synced. This puts the DB in a more uniform state, i.e. it should actually reflect the single value that the fetchAndPersist loop tries to catch everything up to. * detecting lagging offsets from the unsynced-offsets set - Treating every unsynced offset as a lag would make us needlessly retry perfectly synchronized query results. * add Foldable1 derived from Foldable for NonEmpty * nicer version of the unsynced function * ConnectionIO scalaz monad * rename Offset.ordering to `Offset ordering` so it can be imported verbatim * finish aggregating in the lag-detector function, compiles * port sjd * XTag, a scalaz 7.3-derived tag to allow stacked tags * make the complicated aggregation properly testable * extra semantic corner cases I didn't think of * tests for laggingOffsets * a way to rerun queries if the laggingOffsets check reveals inconsistency * if bookmark is ever different, we always have to rerun anyway * boolean blindness * incorporate laggingOffsets into fetchAndPersistBracket * split fetchAndPersist from getTermination and clean up its arguments * just compose functors * add looping to fetchAndPersistBracket * more mvo tests * test unsyncedOffsets, too * Lagginess collector * supply more likely actual data with mvo tests; don't trust Java equals * rework minimumViableOffsets to track sync states across template IDs * extra note * fix the tests to work against the stricter mvo * move surrogatesToDomains call * more tests for lagginess accumulator * add changelog CHANGELOG_BEGIN - [JSON API] Under rare conditions, a multi-template query backed by database could have an ACS portion that doesn't match its transaction stream, if updated concurrently. This conditions is now checked and accounted for. See `issue #10617 `__. CHANGELOG_END * port toSeq to Scala 2.12 * handle a corner case with offsets being too close to expected values * didn't need XTag --- .../digitalasset/http/dbbackend/Queries.scala | 42 +++- .../http/dbbackend/QueriesSpec.scala | 15 ++ .../digitalasset/http/ContractsFetch.scala | 212 ++++++++++++------ .../digitalasset/http/WebSocketService.scala | 27 +-- .../http/dbbackend/ContractDao.scala | 129 ++++++++++- .../scala/com/digitalasset/http/domain.scala | 7 +- .../http/dbbackend/ContractDaoTest.scala | 90 ++++++++ libs-scala/scala-utils/BUILD.bazel | 3 + .../daml/scalautil/nonempty/NonEmpty.scala | 45 +++- 9 files changed, 476 insertions(+), 94 deletions(-) create mode 100644 ledger-service/http-json/src/test/scala/com/digitalasset/http/dbbackend/ContractDaoTest.scala diff --git a/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala b/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala index 5ce65702dc59..61b99128cf72 100644 --- a/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala +++ b/ledger-service/db-backend/src/main/scala/com/digitalasset/http/dbbackend/Queries.scala @@ -203,16 +203,36 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im final def lastOffset(parties: OneAnd[Set, String], tpid: SurrogateTpId)(implicit log: LogHandler ): ConnectionIO[Map[String, String]] = { - val partyVector = - cats.data.OneAnd(parties.head, parties.tail.toList) + import Queries.CompatImplicits.catsReducibleFromFoldable1 val q = sql""" SELECT party, last_offset FROM $ledgerOffsetTableName WHERE tpid = $tpid AND - """ ++ Fragments.in(fr"party", partyVector) + """ ++ Fragments.in(fr"party", parties) q.query[(String, String)] .to[Vector] .map(_.toMap) } + /** Template IDs, parties, and offsets that don't match expected offset values for + * a particular `tpid`. + */ + private[http] final def unsyncedOffsets( + expectedOffset: String, + tpids: NonEmpty[Set[SurrogateTpId]], + ): ConnectionIO[Map[SurrogateTpId, Map[String, String]]] = { + val condition = { + import Queries.CompatImplicits.catsReducibleFromFoldable1, scalaz.std.iterable._ + Fragments.in(fr"tpid", tpids.toSeq.toOneAnd) + } + val q = sql""" + SELECT tpid, party, last_offset FROM $ledgerOffsetTableName + WHERE last_offset <> $expectedOffset AND $condition + """ + q.query[(SurrogateTpId, String, String)] + .map { case (tpid, party, offset) => (tpid, (party, offset)) } + .to[Vector] + .map(groupUnsyncedOffsets(tpids, _)) + } + /** Consistency of the whole database mostly pivots around the offset update * check, since an offset read and write bookend the update. * @@ -549,6 +569,16 @@ object Queries { ) } + private[dbbackend] def groupUnsyncedOffsets[TpId, Party, Off]( + allTpids: Set[TpId], + queryResult: Vector[(TpId, (Party, Off))], + ): Map[TpId, Map[Party, Off]] = { + val grouped = queryResult.groupBy1(_._1).transform((_, tpos) => tpos.view.map(_._2).toMap) + // lagging offsets still needs to consider the template IDs that weren't + // returned by the offset table query + (allTpids diff grouped.keySet).view.map((_, Map.empty[Party, Off])).toMap ++ grouped + } + import doobie.util.invariant.InvalidValue @throws[InvalidValue[_, _]] @@ -612,6 +642,12 @@ object Queries { override def reduceRightTo[A, B](fa: F[A])(z: A => B)(f: (A, Eval[B]) => Eval[B]) = F.foldMapRight1(fa)(a => Eval later z(a))((a, eb) => f(a, Eval defer eb)) } + + implicit def monadFromCatsMonad[F[_]](implicit F: cats.Monad[F]): scalaz.Monad[F] = + new scalaz.Monad[F] { + override def bind[A, B](fa: F[A])(f: A => F[B]): F[B] = F.flatMap(fa)(f) + override def point[A](a: => A): F[A] = F.point(a) + } } } diff --git a/ledger-service/db-backend/src/test/scala/com/digitalasset/http/dbbackend/QueriesSpec.scala b/ledger-service/db-backend/src/test/scala/com/digitalasset/http/dbbackend/QueriesSpec.scala index b21387612bb1..c14cadec014b 100644 --- a/ledger-service/db-backend/src/test/scala/com/digitalasset/http/dbbackend/QueriesSpec.scala +++ b/ledger-service/db-backend/src/test/scala/com/digitalasset/http/dbbackend/QueriesSpec.scala @@ -44,6 +44,21 @@ class QueriesSpec extends AnyWordSpec with Matchers with TableDrivenPropertyChec fragmentElems(frag) should ===(fragmentElems(projection)) } } + + "groupUnsyncedOffsets" should { + import Queries.groupUnsyncedOffsets + "not drop duplicate template IDs" in { + groupUnsyncedOffsets(Set.empty, Vector((0, (1, 2)), (0, (3, 4)))) should ===( + Map(0 -> Map(1 -> 2, 3 -> 4)) + ) + } + + "add empty maps for template IDs missing from the input" in { + groupUnsyncedOffsets(Set(0, 1, 2), Vector((0, (1, 2)), (0, (3, 4)))) should ===( + Map(0 -> Map(1 -> 2, 3 -> 4), 1 -> Map.empty, 2 -> Map.empty) + ) + } + } } object QueriesSpec { diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala index d498b238faff..843238d90df8 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/ContractsFetch.scala @@ -30,6 +30,7 @@ import com.daml.http.util.IdentifierConverters.apiIdentifier import com.daml.http.util.Logging.{InstanceUUID} import util.{AbsoluteBookmark, BeginBookmark, ContractStreamStep, InsertDeleteStep, LedgerBegin} import com.daml.scalautil.ExceptionOps._ +import com.daml.scalautil.nonempty.NonEmpty import com.daml.jwt.domain.Jwt import com.daml.ledger.api.v1.transaction.Transaction import com.daml.ledger.api.{v1 => lav1} @@ -41,6 +42,7 @@ import scalaz.OneAnd._ import scalaz.std.set._ import scalaz.std.vector._ import scalaz.std.list._ +import scalaz.std.option.none import scalaz.syntax.show._ import scalaz.syntax.tag._ import scalaz.syntax.functor._ @@ -65,6 +67,64 @@ private class ContractsFetch( private[this] val logger = ContextualizedLogger.get(getClass) + /** run `within` repeatedly after fetchAndPersist until the latter is + * consistent before and after `within` + */ + def fetchAndPersistBracket[A]( + jwt: Jwt, + ledgerId: LedgerApiDomain.LedgerId, + parties: OneAnd[Set, domain.Party], + templateIds: List[domain.TemplateId.RequiredPkg], + )(within: BeginBookmark[Terminates.AtAbsolute] => ConnectionIO[A])(implicit + ec: ExecutionContext, + mat: Materializer, + lc: LoggingContextOf[InstanceUUID], + ): ConnectionIO[A] = { + import ContractDao.laggingOffsets + val initTries = 10 + val fetchContext = FetchContext(jwt, ledgerId, parties) + def go( + maxAttempts: Int, + fetchTemplateIds: List[domain.TemplateId.RequiredPkg], + absEnd: Terminates.AtAbsolute, + ): ConnectionIO[A] = for { + bb <- fetchToAbsEnd(fetchContext, fetchTemplateIds, absEnd) + a <- within(bb) + // fetchTemplateIds can be a subset of templateIds (or even empty), + // but we only get away with that by checking _all_ of templateIds, + // which can then indicate that a larger set than fetchTemplateIds + // has desynchronized + lagging <- (templateIds.toSet, bb.map(_.toDomain)) match { + case (NonEmpty(tids), AbsoluteBookmark(expectedOff)) => + laggingOffsets(parties.toSet, expectedOff, tids) + case _ => fconn.pure(none[(domain.Offset, Set[domain.TemplateId.RequiredPkg])]) + } + retriedA <- lagging.cata( + { case (newOff, laggingTids) => + if (maxAttempts > 1) + go( + maxAttempts - 1, + laggingTids.toList, + domain.Offset.toTerminates(newOff), + ) + else + fconn.raiseError( + new IllegalStateException( + s"failed after $initTries attempts to synchronize database for $fetchContext, $templateIds" + ) + ) + }, + fconn.pure(a), + ) + } yield retriedA + + // we assume that if the ledger termination is LedgerBegin, then + // `within` will not yield concurrency-relevant results + connectionIOFuture(getTermination(jwt, ledgerId)) flatMap { + _.cata(go(initTries, templateIds, _), within(LedgerBegin)) + } + } + def fetchAndPersist( jwt: Jwt, ledgerId: LedgerApiDomain.LedgerId, @@ -74,66 +134,71 @@ private class ContractsFetch( ec: ExecutionContext, mat: Materializer, lc: LoggingContextOf[InstanceUUID], + ): ConnectionIO[BeginBookmark[Terminates.AtAbsolute]] = + connectionIOFuture(getTermination(jwt, ledgerId)) flatMap { + _.cata( + fetchToAbsEnd(FetchContext(jwt, ledgerId, parties), templateIds, _), + fconn.pure(LedgerBegin), + ) + } + + private[this] def fetchToAbsEnd( + fetchContext: FetchContext, + templateIds: List[domain.TemplateId.RequiredPkg], + absEnd: Terminates.AtAbsolute, + )(implicit + ec: ExecutionContext, + mat: Materializer, + lc: LoggingContextOf[InstanceUUID], ): ConnectionIO[BeginBookmark[Terminates.AtAbsolute]] = { import cats.instances.list._, cats.syntax.foldable.{toFoldableOps => ToFoldableOps}, cats.syntax.traverse.{toTraverseOps => ToTraverseOps}, cats.syntax.functor._, doobie.implicits._ // we can fetch for all templateIds on a single acsFollowingAndBoundary // by comparing begin offsets; however this is trickier so we don't do it // right now -- Stephen / Leo - connectionIOFuture(getTermination(jwt, ledgerId)) flatMap { - _.cata( - absEnd => - // traverse once, use the max _returned_ bookmark, - // re-traverse any that != the max returned bookmark (overriding lastOffset) - // fetch cannot go "too far" the second time - templateIds.traverse(fetchAndPersist(jwt, ledgerId, parties, false, absEnd, _)).flatMap { - actualAbsEnds => - val newAbsEndTarget = { - import scalaz.std.list._, scalaz.syntax.foldable._, - domain.Offset.{ordering => `Offset ordering`} - // it's fine if all yielded LedgerBegin, so we don't want to conflate the "fallback" - // with genuine results - actualAbsEnds.maximum getOrElse AbsoluteBookmark(absEnd.toDomain) - } - newAbsEndTarget match { - case LedgerBegin => - fconn.pure(AbsoluteBookmark(absEnd)) - case AbsoluteBookmark(feedback) => - val feedbackTerminator = - Terminates - .AtAbsolute(lav1.ledger_offset.LedgerOffset.Value.Absolute(feedback.unwrap)) - // contractsFromOffsetIo can go _past_ absEnd, because the ACS ignores - // this argument; see https://github.com/digital-asset/daml/pull/8226#issuecomment-756446537 - // for an example of this happening. We deal with this race condition - // by detecting that it has happened and rerunning any other template IDs - // to "catch them up" to the one that "raced" ahead - (actualAbsEnds zip templateIds) - .collect { case (`newAbsEndTarget`, templateId) => templateId } - .traverse_ { - // passing a priorBookmark prevents contractsIo_ from using the ACS, - // and it cannot go "too far" reading only the tx stream - fetchAndPersist( - jwt, - ledgerId, - parties, - true, - feedbackTerminator, - _, - ) - } - .as(AbsoluteBookmark(feedbackTerminator)) + // + // traverse once, use the max _returned_ bookmark, + // re-traverse any that != the max returned bookmark (overriding lastOffset) + // fetch cannot go "too far" the second time + templateIds + .traverse(fetchAndPersist(fetchContext, false, absEnd, _)) + .flatMap { actualAbsEnds => + val newAbsEndTarget = { + import scalaz.std.list._, scalaz.syntax.foldable._, domain.Offset.`Offset ordering` + // it's fine if all yielded LedgerBegin, so we don't want to conflate the "fallback" + // with genuine results + actualAbsEnds.maximum getOrElse AbsoluteBookmark(absEnd.toDomain) + } + newAbsEndTarget match { + case LedgerBegin => + fconn.pure(AbsoluteBookmark(absEnd)) + case AbsoluteBookmark(feedback) => + val feedbackTerminator = + domain.Offset.toTerminates(feedback) + // contractsFromOffsetIo can go _past_ absEnd, because the ACS ignores + // this argument; see https://github.com/digital-asset/daml/pull/8226#issuecomment-756446537 + // for an example of this happening. We deal with this race condition + // by detecting that it has happened and rerunning any other template IDs + // to "catch them up" to the one that "raced" ahead + (actualAbsEnds zip templateIds) + .collect { case (`newAbsEndTarget`, templateId) => templateId } + .traverse_ { + // passing a priorBookmark prevents contractsIo_ from using the ACS, + // and it cannot go "too far" reading only the tx stream + fetchAndPersist( + fetchContext, + true, + feedbackTerminator, + _, + ) } - }, - fconn.pure(LedgerBegin), - ) - } - + .as(AbsoluteBookmark(feedbackTerminator)) + } + } } private[this] def fetchAndPersist( - jwt: Jwt, - ledgerId: LedgerApiDomain.LedgerId, - parties: OneAnd[Set, domain.Party], + fetchContext: FetchContext, disableAcs: Boolean, absEnd: Terminates.AtAbsolute, templateId: domain.TemplateId.RequiredPkg, @@ -147,7 +212,7 @@ private class ContractsFetch( def loop(maxAttempts: Int): ConnectionIO[BeginBookmark[domain.Offset]] = { logger.debug(s"contractsIo, maxAttempts: $maxAttempts") - (contractsIo_(jwt, ledgerId, parties, disableAcs, absEnd, templateId) <* fconn.commit) + (contractsIo_(fetchContext, disableAcs, absEnd, templateId) <* fconn.commit) .exceptSql { case e if maxAttempts > 0 && retrySqlStates(e.getSQLState) => logger.debug(s"contractsIo, exception: ${e.description}, state: ${e.getSQLState}") @@ -162,9 +227,7 @@ private class ContractsFetch( } private def contractsIo_( - jwt: Jwt, - ledgerId: LedgerApiDomain.LedgerId, - parties: OneAnd[Set, domain.Party], + fetchContext: FetchContext, disableAcs: Boolean, absEnd: Terminates.AtAbsolute, templateId: domain.TemplateId.RequiredPkg, @@ -172,20 +235,22 @@ private class ContractsFetch( ec: ExecutionContext, mat: Materializer, lc: LoggingContextOf[InstanceUUID], - ): ConnectionIO[BeginBookmark[domain.Offset]] = + ): ConnectionIO[BeginBookmark[domain.Offset]] = { + import fetchContext.parties for { offsets <- ContractDao.lastOffset(parties, templateId) offset1 <- contractsFromOffsetIo( - jwt, - ledgerId, - parties, + fetchContext, templateId, offsets, disableAcs, absEnd, ) - _ = logger.debug(s"contractsFromOffsetIo($jwt, $parties, $templateId, $offsets): $offset1") + _ = logger.debug( + s"contractsFromOffsetIo($fetchContext, $templateId, $offsets, $disableAcs, $absEnd): $offset1" + ) } yield offset1 + } private def prepareCreatedEventStorage( ce: lav1.event.CreatedEvent @@ -222,9 +287,7 @@ private class ContractsFetch( .mapPreservingIds(prepareCreatedEventStorage(_) valueOr (e => throw e)) private def contractsFromOffsetIo( - jwt: Jwt, - ledgerId: LedgerApiDomain.LedgerId, - parties: OneAnd[Set, domain.Party], + fetchContext: FetchContext, templateId: domain.TemplateId.RequiredPkg, offsets: Map[domain.Party, domain.Offset], disableAcs: Boolean, @@ -235,8 +298,8 @@ private class ContractsFetch( lc: LoggingContextOf[InstanceUUID], ): ConnectionIO[BeginBookmark[domain.Offset]] = { - import domain.Offset._ - val offset = offsets.values.toList.minimum.cata(AbsoluteBookmark(_), LedgerBegin) + import domain.Offset._, fetchContext.{jwt, ledgerId, parties} + val startOffset = offsets.values.toList.minimum.cata(AbsoluteBookmark(_), LedgerBegin) val graph = RunnableGraph.fromGraph( GraphDSL.create( @@ -254,7 +317,7 @@ private class ContractsFetch( ) // include ACS iff starting at LedgerBegin - val (idses, lastOff) = (offset, disableAcs) match { + val (idses, lastOff) = (startOffset, disableAcs) match { case (LedgerBegin, false) => val stepsAndOffset = builder add acsFollowingAndBoundary(txnK) stepsAndOffset.in <~ getActiveContracts( @@ -267,7 +330,7 @@ private class ContractsFetch( case (AbsoluteBookmark(_), _) | (LedgerBegin, true) => val stepsAndOffset = builder add transactionsFollowingBoundary(txnK) - stepsAndOffset.in <~ Source.single(domain.Offset.tag.unsubst(offset)) + stepsAndOffset.in <~ Source.single(domain.Offset.tag.unsubst(startOffset)) ( (stepsAndOffset: FanOutShape2[_, ContractStreamStep.LAV1, _]).out0, stepsAndOffset.out1, @@ -291,12 +354,13 @@ private class ContractsFetch( for { _ <- sinkCioSequence_(acsQueue) offset0 <- connectionIOFuture(lastOffsetFuture) - offsetOrError <- offset0 match { - case AbsoluteBookmark(str) => - val newOffset = domain.Offset(str) + offsetOrError <- (domain.Offset.tag.subst(offset0) max AbsoluteBookmark( + absEnd.toDomain + )) match { + case ab @ AbsoluteBookmark(newOffset) => ContractDao .updateOffset(parties, templateId, newOffset, offsets) - .map(_ => AbsoluteBookmark(newOffset)) + .map(_ => ab) case LedgerBegin => fconn.pure(LedgerBegin) } @@ -434,7 +498,7 @@ private[http] object ContractsFetch { .flatMapConcat(off => transactionsSince(domain.Offset.tag.subst(off).toLedgerApi)) .map(transactionToInsertsAndDeletes) val txnSplit = b add project2[ContractStreamStep.Txn.LAV1, domain.Offset] - import domain.Offset.{ordering => `Offset ordering`} + import domain.Offset.`Offset ordering` val lastTxOff = b add last(LedgerBegin: Off) type EndoBookmarkFlow[A] = Flow[BeginBookmark[A], BeginBookmark[A], NotUsed] val maxOff = b add domain.Offset.tag.unsubst[EndoBookmarkFlow, String]( @@ -557,4 +621,10 @@ private[http] object ContractsFetch { TransactionFilter(domain.Party.unsubst(parties.toVector).map(_ -> filters).toMap) } + + private final case class FetchContext( + jwt: Jwt, + ledgerId: LedgerApiDomain.LedgerId, + parties: OneAnd[Set, domain.Party], + ) } diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala index 8e88d8afa522..366d4c33fca4 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/WebSocketService.scala @@ -257,7 +257,7 @@ object WebSocketService { request.queries.map(_.offset).toVector def matchesOffset(queryIndex: Int, maybeEventOffset: Option[domain.Offset]): Boolean = { - import domain.Offset.ordering + import domain.Offset.`Offset ordering` import scalaz.syntax.order._ val matches = for { @@ -365,7 +365,7 @@ object WebSocketService { ) import scalaz.syntax.foldable1._ - import domain.Offset.ordering + import domain.Offset.`Offset ordering` import scalaz.std.option.optionOrder // This is called after `adjustRequest` already filled in the blank offsets @@ -713,18 +713,19 @@ class WebSocketService( ): Future[Source[StepAndErrors[Positive, JsValue], NotUsed]] = contractsService.daoAndFetch.cata( { case (dao, fetch) => - val tx = for { - bookmark <- fetch.fetchAndPersist(jwt, ledgerId, parties, predicate.resolved.toList) - mdContracts <- predicate.dbQuery(parties, dao) - } yield { - val acs = - if (mdContracts.nonEmpty) { - Source.single(StepAndErrors(Seq.empty, ContractStreamStep.Acs(mdContracts))) - } else { - Source.empty + val tx = fetch.fetchAndPersistBracket(jwt, ledgerId, parties, predicate.resolved.toList) { + bookmark => + for { + mdContracts <- predicate.dbQuery(parties, dao) + } yield { + val acs = + if (mdContracts.nonEmpty) + Source.single(StepAndErrors(Seq.empty, ContractStreamStep.Acs(mdContracts))) + else + Source.empty + val liveMarker = liveBegin(bookmark.map(_.toDomain)) + acs ++ liveMarker } - val liveMarker = liveBegin(bookmark.map(_.toDomain)) - acs ++ liveMarker } dao.transact(tx).unsafeToFuture() }, diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/ContractDao.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/ContractDao.scala index 8068deaf6f47..3925809dcc47 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/ContractDao.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/dbbackend/ContractDao.scala @@ -7,21 +7,26 @@ import cats.effect._ import cats.syntax.apply._ import com.daml.dbutils.ConnectionPool import com.daml.doobie.logging.Slf4jLogHandler +import com.daml.http.dbbackend.Queries.SurrogateTpId import com.daml.http.domain import com.daml.http.json.JsonProtocol.LfValueDatabaseCodec import com.daml.http.util.Logging.InstanceUUID import com.daml.lf.crypto.Hash import com.daml.logging.LoggingContextOf import com.daml.metrics.Metrics -import com.daml.scalautil.nonempty.+-: +import com.daml.scalautil.nonempty.{+-:, NonEmpty, NonEmptyF} +import domain.Offset.`Offset ordering` import doobie.LogHandler import doobie.free.connection.ConnectionIO import doobie.free.{connection => fconn} import doobie.implicits._ import doobie.util.log import org.slf4j.LoggerFactory -import scalaz.{NonEmptyList, OneAnd} +import scalaz.{Equal, NonEmptyList, OneAnd, Order, Semigroup} +import scalaz.std.set._ +import scalaz.std.vector._ import scalaz.syntax.tag._ +import scalaz.syntax.order._ import spray.json.{JsNull, JsValue} import java.io.{Closeable, IOException} @@ -143,6 +148,126 @@ object ContractDao { } } + /** A "lagging offset" is a template-ID/party pair whose stored offset may not reflect + * the actual contract table state at query time. Examples of this are described in + * . + * + * It is perfectly fine for an offset to be returned but the set of lagging template IDs + * be empty; this means they appear to be consistent but not at `expectedOffset`; since + * that means the state at query time is indeterminate, the query must be rerun anyway, + * but no further DB updates are needed. + * + * @param parties The party-set that must not be lagging. We aren't concerned with + * whether ''other'' parties are lagging, in fact we can't correct if they are, + * but if another party got ahead, that means one of our parties lags. + * @param expectedOffset `fetchAndPersist` should have synchronized every template-ID/party + * pair to the same offset, this one. + * @param templateIds The template IDs we're checking. + * @return Any template IDs that are lagging, and the offset to catch them up to, if defined; + * otherwise everything is fine. + */ + def laggingOffsets( + parties: Set[domain.Party], + expectedOffset: domain.Offset, + templateIds: NonEmpty[Set[domain.TemplateId.RequiredPkg]], + )(implicit + log: LogHandler, + sjd: SupportedJdbcDriver.TC, + lc: LoggingContextOf[InstanceUUID], + ): ConnectionIO[Option[(domain.Offset, Set[domain.TemplateId.RequiredPkg])]] = { + type Unsynced[Party, Off] = Map[Queries.SurrogateTpId, Map[Party, Off]] + import scalaz.syntax.traverse._ + import sjd.q.queries.unsyncedOffsets + for { + tpids <- { + import Queries.CompatImplicits.monadFromCatsMonad + templateIds.toVector.toF.traverse { trp => surrogateTemplateId(trp) map ((_, trp)) } + }: ConnectionIO[NonEmptyF[Vector, (SurrogateTpId, domain.TemplateId.RequiredPkg)]] + surrogatesToDomains = tpids.toMap + unsyncedRaw <- unsyncedOffsets( + domain.Offset unwrap expectedOffset, + surrogatesToDomains.keySet, + ) + unsynced = domain.Party.subst[Unsynced[*, domain.Offset], String]( + domain.Offset.tag.subst[Unsynced[String, *], String](unsyncedRaw) + ): Unsynced[domain.Party, domain.Offset] + } yield minimumViableOffsets(parties, surrogatesToDomains, expectedOffset, unsynced) + } + + // postprocess the output of unsyncedOffsets + private[dbbackend] def minimumViableOffsets[ITpId, OTpId, Party, Off: Order]( + queriedParty: Set[Party], + surrogatesToDomains: ITpId => OTpId, + expectedOffset: Off, + unsynced: Map[ITpId, Map[Party, Off]], + ): Option[(Off, Set[OTpId])] = { + import scalaz.syntax.foldable1.{ToFoldableOps => _, _} + import scalaz.std.iterable._ + val lagging: Option[Lagginess[ITpId, Off]] = + (unsynced: Iterable[(ITpId, Map[Party, Off])]) foldMap1Opt { + case (surrogateTpId, partyOffs) => + val maxLocalOff = partyOffs + .collect { + case (unsyncedParty, unsyncedOff) + if queriedParty(unsyncedParty) || unsyncedOff > expectedOffset => + unsyncedOff + } + .maximum + .getOrElse(expectedOffset) + val singleton = Set(surrogateTpId) + // if a queried party is not in the map, we can safely assume that + // it is exactly at expectedOffset + val caughtUp = maxLocalOff <= expectedOffset || + queriedParty.forall { qp => partyOffs.get(qp).exists(_ === maxLocalOff) } + Lagginess( + if (caughtUp) singleton else Set.empty, + if (caughtUp) Set.empty else singleton, + // also an artifact of assuming that you actually ran update + // to expectedOffset before using this function + maxLocalOff max expectedOffset, + ) + } + lagging match { + case Some(lagging) if lagging.maxOff > expectedOffset => + Some((lagging.maxOff, lagging.notCaughtUp map surrogatesToDomains)) + case _ => None + } + } + + private[dbbackend] final case class Lagginess[TpId, +Off]( + caughtUp: Set[TpId], + notCaughtUp: Set[TpId], + maxOff: Off, + ) + private[dbbackend] object Lagginess { + implicit def semigroup[TpId, Off: Order]: Semigroup[Lagginess[TpId, Off]] = + Semigroup instance { case (Lagginess(cA, ncA, offA), Lagginess(cB, ncB, offB)) => + import scalaz.Ordering.{LT, GT, EQ} + val (c, nc) = offA cmp offB match { + case EQ => (cA union cB, Set.empty[TpId]) + case LT => (cB, cA) + case GT => (cA, cB) + } + Lagginess( + c, + nc union ncA union ncB, + offA max offB, + ) + } + + implicit def equal[TpId: Order, Off: Equal]: Equal[Lagginess[TpId, Off]] = + new Equal[Lagginess[TpId, Off]] { + override def equalIsNatural = Equal[TpId].equalIsNatural && Equal[Off].equalIsNatural + override def equal(a: Lagginess[TpId, Off], b: Lagginess[TpId, Off]) = + if (equalIsNatural) a == b + else + a match { + case Lagginess(caughtUp, notCaughtUp, maxOff) => + caughtUp === b.caughtUp && notCaughtUp === b.notCaughtUp && maxOff === b.maxOff + } + } + } + def updateOffset( parties: OneAnd[Set, domain.Party], templateId: domain.TemplateId.RequiredPkg, diff --git a/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala b/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala index 8b9d8a67e920..ed433d145121 100644 --- a/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala +++ b/ledger-service/http-json/src/main/scala/com/digitalasset/http/domain.scala @@ -229,8 +229,13 @@ object domain { def toLedgerApi(o: Offset): lav1.ledger_offset.LedgerOffset = lav1.ledger_offset.LedgerOffset(lav1.ledger_offset.LedgerOffset.Value.Absolute(unwrap(o))) + def toTerminates(o: Offset): LedgerClientJwt.Terminates.AtAbsolute = + LedgerClientJwt.Terminates.AtAbsolute( + lav1.ledger_offset.LedgerOffset.Value.Absolute(unwrap(o)) + ) + implicit val semigroup: Semigroup[Offset] = Tag.unsubst(Semigroup[Offset @@ Tags.LastVal]) - implicit val ordering: Order[Offset] = Order.orderBy[Offset, String](Offset.unwrap(_)) + implicit val `Offset ordering`: Order[Offset] = Order.orderBy[Offset, String](Offset.unwrap(_)) } diff --git a/ledger-service/http-json/src/test/scala/com/digitalasset/http/dbbackend/ContractDaoTest.scala b/ledger-service/http-json/src/test/scala/com/digitalasset/http/dbbackend/ContractDaoTest.scala new file mode 100644 index 000000000000..af606116553a --- /dev/null +++ b/ledger-service/http-json/src/test/scala/com/digitalasset/http/dbbackend/ContractDaoTest.scala @@ -0,0 +1,90 @@ +// Copyright (c) 2021 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.http +package dbbackend + +import com.daml.scalatest.WordSpecCheckLaws +import scalaz.{Order, Traverse} +import scalaz.scalacheck.{ScalazProperties => ZP} +import scalaz.std.anyVal._ +import scalaz.std.map._ +import scalaz.std.set._ +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks +import org.scalatest.OptionValues +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class ContractDaoTest + extends AnyWordSpec + with Matchers + with OptionValues + with ScalaCheckDrivenPropertyChecks + with WordSpecCheckLaws { + import ContractDaoTest._ + + "minimumViableOffsets" should { + import ContractDao.minimumViableOffsets + + type Unsynced[Off] = Map[Byte, Map[Byte, Off]] + val UF: Traverse[Unsynced] = { + val one = Traverse[Map[Byte, *]] + one compose one + } + val queriedParties = (0 to 5).view.map(_.toByte).toSet + def everyParty[Off](off: Off) = queriedParties.view.map((_, off)).toMap + def mvo[TpId, Off: Order](expectedOffset: Off, unsynced: Map[TpId, Map[Byte, Off]]) = + minimumViableOffsets(queriedParties, identity[TpId], expectedOffset, unsynced) + + "return all OK if all offsets match" in forAll { unsynced: Unsynced[Unit] => + val allSame = UF.map(unsynced)(_ => 0) + mvo(0, allSame) shouldBe None + } + + "use the larger offset" in forAll { (a: Byte, b: Byte, unsynced: Unsynced[Boolean]) => + whenever(UF.foldMap(unsynced)(Set(_)) == Set(true, false)) { + val (min, max) = if (a < b) (a, b) else if (a == b) (0: Byte, 1: Byte) else (b, a) + val eitherOr = UF.map(unsynced)(b => if (b) min else max) + mvo(min, eitherOr).value._1 should ===(max) + } + } + + "require update for a template ID that is caught up in a different party" in { + mvo(1, Map(0 -> Map((2: Byte) -> 1, (3: Byte) -> 2))) should ===(Some((2, Set(0)))) + } + + "not require update for lagging, but unqueried, party" in { + mvo(3, Map(0 -> Map((-4: Byte) -> 1))) should ===(None) + } + + "require update for ahead, albeit unqueried, party" in { + mvo(3, Map(0 -> Map((-4: Byte) -> 4))) should ===(Some((4, Set(0)))) + } + + "report desync, but no updates, if consistent" in { + mvo(3, Map(0 -> everyParty(5), 1 -> everyParty(5))) should ===( + Some((5, Set.empty)) + ) + } + + "check lag across template IDs" in { + mvo(3, Map(0 -> Map((2: Byte) -> 3), 1 -> everyParty(5))) should ===(Some((5, Set(0)))) + } + + "update an expected-offset template ID when another template ID is ahead" in { + mvo(3, Map(0 -> Map.empty[Byte, Int], 1 -> everyParty(5))) should ===(Some((5, Set(0)))) + } + } + + "Lagginess" should { + import ContractDao.Lagginess + checkLaws(ZP.semigroup.laws[Lagginess[Byte, Byte]]) + } +} + +object ContractDaoTest { + import org.scalacheck.Arbitrary, Arbitrary.arbitrary + import ContractDao.Lagginess + implicit def `arb Lagginess`[TpId: Arbitrary, Off: Arbitrary]: Arbitrary[Lagginess[TpId, Off]] = + Arbitrary(arbitrary[(Set[TpId], Set[TpId], Off)].map((Lagginess[TpId, Off] _).tupled)) +} diff --git a/libs-scala/scala-utils/BUILD.bazel b/libs-scala/scala-utils/BUILD.bazel index 16ad0321b716..07e4bc368d5e 100644 --- a/libs-scala/scala-utils/BUILD.bazel +++ b/libs-scala/scala-utils/BUILD.bazel @@ -6,6 +6,7 @@ load( "da_scala_library", "da_scala_test", "lf_scalacopts", + "silencer_plugin", ) load("@scala_version//:index.bzl", "scala_major_version", "scala_version_suffix") @@ -20,8 +21,10 @@ da_scala_library( ]), plugins = [ "@maven//:org_typelevel_kind_projector_{}".format(scala_version_suffix), + silencer_plugin, ], scala_deps = [ + "@maven//:org_scala_lang_modules_scala_collection_compat", "@maven//:org_scalaz_scalaz_core", ], scalacopts = scalacopts, diff --git a/libs-scala/scala-utils/src/main/scala/com/daml/scalautil/nonempty/NonEmpty.scala b/libs-scala/scala-utils/src/main/scala/com/daml/scalautil/nonempty/NonEmpty.scala index 567cef4a8a1c..47787c29202f 100644 --- a/libs-scala/scala-utils/src/main/scala/com/daml/scalautil/nonempty/NonEmpty.scala +++ b/libs-scala/scala-utils/src/main/scala/com/daml/scalautil/nonempty/NonEmpty.scala @@ -3,11 +3,13 @@ package com.daml.scalautil.nonempty +import scala.collection.compat._ import scala.collection.{immutable => imm}, imm.Map, imm.Set import scalaz.Id.Id -import scalaz.{Foldable, OneAnd, Traverse} +import scalaz.{Foldable, Foldable1, Monoid, OneAnd, Semigroup, Traverse} import scalaz.Leibniz, Leibniz.=== import scalaz.Liskov, Liskov.<~< +import scalaz.syntax.std.option._ import NonEmptyCollCompat._ /** The visible interface of [[NonEmpty]]; use that value to access @@ -23,7 +25,7 @@ sealed abstract class NonEmptyColl { private[nonempty] def substF[T[_[_]], F[_]](tf: T[F]): T[NonEmptyF[F, *]] private[nonempty] def subst[F[_[_]]](tf: F[Id]): F[NonEmpty] - private[nonempty] def unsafeNarrow[Self](self: Self with imm.Iterable[_]): NonEmpty[Self] + private[nonempty] def unsafeNarrow[Self <: imm.Iterable[Any]](self: Self): NonEmpty[Self] /** Usable proof that [[NonEmpty]] is a subtype of its argument. (We cannot put * this in an upper-bound, because that would prevent us from adding implicit @@ -68,7 +70,7 @@ object NonEmptyColl extends NonEmptyCollInstances { override def apply[Self](self: Self with imm.Iterable[_]) = if (self.nonEmpty) Some(self) else None - private[nonempty] override def unsafeNarrow[Self](self: Self with imm.Iterable[_]) = self + private[nonempty] override def unsafeNarrow[Self <: imm.Iterable[Any]](self: Self) = self } implicit final class ReshapeOps[F[_], A](private val nfa: NonEmpty[F[A]]) extends AnyVal { @@ -110,6 +112,12 @@ object NonEmptyColl extends NonEmptyCollInstances { private type ESelf = IterableOps[A, imm.Iterable, C with imm.Iterable[A]] def toList: NonEmpty[List[A]] = un((self: ESelf).toList) def toVector: NonEmpty[Vector[A]] = un((self: ESelf).toVector) + def toSeq: NonEmpty[imm.Seq[A]] = un((self: ESelf) match { + case is: imm.Seq[A] => is + case other => other.to(imm.Seq) + }) // can just use .toSeq in scala 2.13 + def toSet: NonEmpty[Set[A]] = un((self: ESelf).toSet) + def toMap[K, V](implicit isPair: A <:< (K, V)): NonEmpty[Map[K, V]] = un((self: ESelf).toMap) // ideas for extension: safe head/tail (not valuable unless also using // wartremover to disable partial Seq ops) } @@ -132,9 +140,38 @@ object NonEmptyColl extends NonEmptyCollInstances { NonEmpty.substF(F) } -sealed abstract class NonEmptyCollInstances { +sealed abstract class NonEmptyCollInstances extends NonEmptyCollInstances0 { implicit def foldable[F[_]](implicit F: Foldable[F]): Foldable[NonEmptyF[F, *]] = NonEmpty.substF(F) +} + +sealed abstract class NonEmptyCollInstances0 { + implicit def foldable1[F[_]](implicit F: Foldable[F]): Foldable1[NonEmptyF[F, *]] = + NonEmpty.substF(new Foldable1[F] { + private[this] def errEmpty(fa: F[_]) = + throw new IllegalArgumentException( + s"empty structure coerced to non-empty: $fa: ${fa.getClass.getSimpleName}" + ) + + private[this] def assertNE[Z](original: F[_], fa: Option[Z]) = + fa.cata(identity, errEmpty(original)) + + override def foldMap1[A, B: Semigroup](fa: F[A])(f: A => B) = + assertNE(fa, F.foldMap1Opt(fa)(f)) + + override def foldMapRight1[A, B](fa: F[A])(z: A => B)(f: (A, => B) => B) = + assertNE(fa, F.foldMapRight1Opt(fa)(z)(f)) + + override def foldMapLeft1[A, B](fa: F[A])(z: A => B)(f: (B, A) => B) = + assertNE(fa, F.foldMapLeft1Opt(fa)(z)(f)) + + override def foldMap[A, B: Monoid](fa: F[A])(f: A => B) = F.foldMap(fa)(f) + + override def foldRight[A, B](fa: F[A], z: => B)(f: (A, => B) => B) = F.foldRight(fa, z)(f) + + override def foldLeft[A, B](fa: F[A], z: B)(f: (B, A) => B) = + F.foldLeft(fa, z)(f) + }) import scala.language.implicitConversions