detect unsynchronized contract table and retry (digital-asset#10617)
* enumerating out-of-sync offsets at the DB level

* cleanup in lastOffset

* write the latest-requested-or-read offset when catching up

- Writing only the latest-read offset, as before, would report as unsynced
  offsets that are actually well-synced.  Writing the latest-requested-or-read
  value puts the DB in a more uniform state: it should reflect the single
  value that the fetchAndPersist loop tries to catch everything up to.

* detecting lagging offsets from the unsynced-offsets set

- Treating every unsynced offset as a lag would make us needlessly retry
  perfectly synchronized query results.

* add Foldable1 derived from Foldable for NonEmpty

* nicer version of the unsynced function

* ConnectionIO scalaz monad

* rename Offset.ordering to `Offset ordering` so it can be imported verbatim

* finish aggregating in the lag-detector function, compiles

* port sjd

* XTag, a scalaz 7.3-derived tag to allow stacked tags

* make the complicated aggregation properly testable

* extra semantic corner cases I didn't think of

* tests for laggingOffsets

* a way to rerun queries if the laggingOffsets check reveals inconsistency

* if the bookmark is ever different, we always have to rerun anyway

* boolean blindness

* incorporate laggingOffsets into fetchAndPersistBracket

* split fetchAndPersist from getTermination and clean up its arguments

* just compose functors

* add looping to fetchAndPersistBracket (see the sketch after this list)

* more mvo tests

* test unsyncedOffsets, too

* Lagginess collector

* supply more likely actual data with mvo tests; don't trust Java equals

* rework minimumViableOffsets to track sync states across template IDs

* extra note

* fix the tests to work against the stricter mvo

* move surrogatesToDomains call

* more tests for lagginess accumulator

* add changelog

CHANGELOG_BEGIN
- [JSON API] Under rare conditions, a multi-template query backed by a
  database could have an ACS portion that doesn't match its transaction
  stream, if updated concurrently.  This condition is now checked and
  accounted for.
  See `issue digital-asset#10617 <https://github.com/digital-asset/daml/pull/10617>`__.
CHANGELOG_END

* port toSeq to Scala 2.12

* handle a corner case with offsets being too close to expected values

* didn't need XTag
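
Taken together, the looping commits describe a bounded retry: fetch and
persist the ACS up to a bookmark offset, check whether any per-party offsets
still lag behind it, and rerun if the check reveals inconsistency. A minimal
Scala sketch of that shape, with hypothetical simplified stand-ins for
fetchAndPersist and laggingOffsets (the real functions in this PR have richer
types):

import scala.annotation.tailrec

// Hypothetical, simplified stand-ins: `fetchAndPersist` returns the offset it
// caught everything up to; `laggingOffsets` returns the parties still behind
// that bookmark, or None when the offset table is consistent.
@tailrec
def fetchLoop(
    fetchAndPersist: () => String,
    laggingOffsets: String => Option[Set[String]],
    retriesLeft: Int = 3,
): Either[Set[String], String] = {
  val bookmark = fetchAndPersist()
  laggingOffsets(bookmark) match {
    case None => Right(bookmark) // consistent: results at `bookmark` are safe to serve
    case Some(_) if retriesLeft > 0 =>
      fetchLoop(fetchAndPersist, laggingOffsets, retriesLeft - 1)
    case Some(lagging) => Left(lagging) // give up and surface the inconsistency
  }
}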
S11001001 authored Sep 28, 2021
1 parent 3d779cf commit b4d0031
Showing 9 changed files with 476 additions and 94 deletions.
@@ -203,16 +203,36 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
  final def lastOffset(parties: OneAnd[Set, String], tpid: SurrogateTpId)(implicit
      log: LogHandler
  ): ConnectionIO[Map[String, String]] = {
-    val partyVector =
-      cats.data.OneAnd(parties.head, parties.tail.toList)
+    import Queries.CompatImplicits.catsReducibleFromFoldable1
    val q = sql"""
      SELECT party, last_offset FROM $ledgerOffsetTableName WHERE tpid = $tpid AND
-    """ ++ Fragments.in(fr"party", partyVector)
+    """ ++ Fragments.in(fr"party", parties)
    q.query[(String, String)]
      .to[Vector]
      .map(_.toMap)
  }

  /** Template IDs, parties, and offsets that don't match expected offset values for
    * a particular `tpid`.
    */
  private[http] final def unsyncedOffsets(
      expectedOffset: String,
      tpids: NonEmpty[Set[SurrogateTpId]],
  ): ConnectionIO[Map[SurrogateTpId, Map[String, String]]] = {
    val condition = {
      import Queries.CompatImplicits.catsReducibleFromFoldable1, scalaz.std.iterable._
      Fragments.in(fr"tpid", tpids.toSeq.toOneAnd)
    }
    val q = sql"""
      SELECT tpid, party, last_offset FROM $ledgerOffsetTableName
      WHERE last_offset <> $expectedOffset AND $condition
    """
    q.query[(SurrogateTpId, String, String)]
      .map { case (tpid, party, offset) => (tpid, (party, offset)) }
      .to[Vector]
      .map(groupUnsyncedOffsets(tpids, _))
  }

  /** Consistency of the whole database mostly pivots around the offset update
    * check, since an offset read and write bookend the update.
    *
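
Like lastOffset above, unsyncedOffsets leans on the catsReducibleFromFoldable1
bridge: doobie's Fragments.in requires a cats.Reducible for its collection
argument, and the bridge derives one from scalaz's Foldable1, which is why the
scalaz-typed parties and tpids can be passed directly instead of being
converted to cats.data.OneAnd by hand.

Downstream, not every row unsyncedOffsets returns forces a retry; per the
commit notes, only offsets lagging behind the expected bookmark do. A
hypothetical sketch of that split (the real check uses the PR's
`Offset ordering`; plain lexicographic String comparison below is a stand-in):

// Hypothetical illustration, not part of this commit: partition unsynced
// entries into those that ran ahead of `expected` (another writer advanced
// them; the matching query results are still usable) and those genuinely
// lagging behind (which require a rerun).
def splitUnsynced[TpId](
    expected: String,
    unsynced: Map[TpId, Map[String, String]],
): (Map[TpId, Map[String, String]], Map[TpId, Map[String, String]]) = {
  def keep(p: (String, String) => Boolean) =
    unsynced
      .transform((_, parties) => parties.filter { case (_, off) => p(off, expected) })
      .filter { case (_, parties) => parties.nonEmpty }
  (keep(_ > _), keep(_ < _)) // (ahead, lagging)
}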
@@ -549,6 +569,16 @@ object Queries {
)
}

  private[dbbackend] def groupUnsyncedOffsets[TpId, Party, Off](
      allTpids: Set[TpId],
      queryResult: Vector[(TpId, (Party, Off))],
  ): Map[TpId, Map[Party, Off]] = {
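    // groupBy1 is a NonEmpty-returning groupBy (assumed here to be daml's
    // scalautil extension), so each `tpos` group below is necessarily non-empty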
    val grouped = queryResult.groupBy1(_._1).transform((_, tpos) => tpos.view.map(_._2).toMap)
    // the lagging-offsets check still needs to consider the template IDs that
    // weren't returned by the offset table query
    (allTpids diff grouped.keySet).view.map((_, Map.empty[Party, Off])).toMap ++ grouped
  }

import doobie.util.invariant.InvalidValue

@throws[InvalidValue[_, _]]
@@ -612,6 +642,12 @@ object Queries {
      override def reduceRightTo[A, B](fa: F[A])(z: A => B)(f: (A, Eval[B]) => Eval[B]) =
        F.foldMapRight1(fa)(a => Eval later z(a))((a, eb) => f(a, Eval defer eb))
    }

    implicit def monadFromCatsMonad[F[_]](implicit F: cats.Monad[F]): scalaz.Monad[F] =
      new scalaz.Monad[F] {
        override def bind[A, B](fa: F[A])(f: A => F[B]): F[B] = F.flatMap(fa)(f)
        override def point[A](a: => A): F[A] = F.point(a)
      }
  }
}
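
The bridge exists because doobie's ConnectionIO natively carries only a
cats.Monad; with a scalaz.Monad derived from it, scalaz combinators can drive
database programs too. A hypothetical usage sketch (the table name and query
are invented for illustration):

import doobie.ConnectionIO
import doobie.implicits._
import scalaz.std.list._
import scalaz.syntax.traverse._
// assumes Queries.CompatImplicits.monadFromCatsMonad is in implicit scope,
// supplying the scalaz.Monad[ConnectionIO] (hence Applicative) that
// scalaz's traverse needs

// traverse sequences many lookups into a single database program
def offsetsFor(parties: List[String]): ConnectionIO[List[Option[String]]] =
  parties.traverse { party =>
    sql"SELECT last_offset FROM some_offset_table WHERE party = $party"
      .query[String]
      .option
  }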

@@ -44,6 +44,21 @@ class QueriesSpec extends AnyWordSpec with Matchers with TableDrivenPropertyChec
      fragmentElems(frag) should ===(fragmentElems(projection))
    }
  }

"groupUnsyncedOffsets" should {
import Queries.groupUnsyncedOffsets
"not drop duplicate template IDs" in {
groupUnsyncedOffsets(Set.empty, Vector((0, (1, 2)), (0, (3, 4)))) should ===(
Map(0 -> Map(1 -> 2, 3 -> 4))
)
}

"add empty maps for template IDs missing from the input" in {
groupUnsyncedOffsets(Set(0, 1, 2), Vector((0, (1, 2)), (0, (3, 4)))) should ===(
Map(0 -> Map(1 -> 2, 3 -> 4), 1 -> Map.empty, 2 -> Map.empty)
)
}
}
}
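
Both examples instantiate one invariant: the result's key set is exactly the
given template IDs plus whatever IDs occur in the rows, with the absent ones
defaulting to empty maps. A property-test sketch of that invariant
(hypothetical: this suite is table-driven, and a ScalaCheck dependency is
assumed):

import org.scalacheck.Prop.forAll

// missing IDs map to empty; present IDs keep their party -> offset entries,
// later duplicates winning via toMap
val coversAllTpids = forAll { (all: Set[Int], rows: Vector[(Int, (Int, Int))]) =>
  Queries.groupUnsyncedOffsets(all, rows).keySet == all ++ rows.map(_._1)
}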

object QueriesSpec {
