Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

detect unsynchronized contract table and retry #10617

Merged
merged 42 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
e5d02fe
enumerating out-of-sync offsets at the DB level
S11001001 Aug 18, 2021
cd5d230
cleanup in lastOffset
S11001001 Aug 18, 2021
3bc02d4
write the latest-requested-or-read offset when catching up
S11001001 Aug 18, 2021
5f89e7b
detecting lagging offsets from the unsynced-offsets set
S11001001 Aug 18, 2021
71d1f38
Merge commit 'f77cd0a81c4c77e082a8de2b6f5fdfd316f4a66f' into 10334-de…
S11001001 Aug 20, 2021
b9725e7
add Foldable1 derived from Foldable for NonEmpty
S11001001 Aug 20, 2021
3e41b15
nicer version of the unsynced function
S11001001 Aug 20, 2021
6696b5d
ConnectionIO scalaz monad
S11001001 Aug 20, 2021
56b18be
rename Offset.ordering to `Offset ordering` so it can be imported ver…
S11001001 Aug 20, 2021
e4cf94b
finish aggregating in the lag-detector function, compiles
S11001001 Aug 20, 2021
fc79ee9
Merge commit '2555dbb30bdaa96d516b2455125d8b4e8c1d6fd6' into 10334-de…
S11001001 Aug 31, 2021
34e2a2b
Merge commit 'f576cdfd06cf33686385ad61df1583a8e0668d44' into 10334-de…
S11001001 Sep 1, 2021
4c94598
port sjd
S11001001 Sep 1, 2021
9ba9ebf
XTag, a scalaz 7.3-derived tag to allow stacked tags
S11001001 Sep 1, 2021
455e536
make the complicated aggregation properly testable
S11001001 Sep 3, 2021
582346e
extra semantic corner cases I didn't think of
S11001001 Sep 8, 2021
a009b9b
Merge commit 'ac02dbdeb925444579b1b8dd8738c3be9502283d' into 10334-de…
S11001001 Sep 20, 2021
ee75cc0
tests for laggingOffsets
S11001001 Sep 22, 2021
fd26984
a way to rerun queries if the laggingOffsets check reveals inconsistency
S11001001 Sep 22, 2021
f957c07
if bookmark is ever different, we always have to rerun anyway
S11001001 Sep 22, 2021
1482e6c
Merge commit 'ced4a272408cfc13b45da20da3607e2dd7e07389' into 10334-de…
S11001001 Sep 22, 2021
a332798
boolean blindness
S11001001 Sep 22, 2021
2839ac9
incorporate laggingOffsets into fetchAndPersistBracket
S11001001 Sep 22, 2021
c00b2ba
split fetchAndPersist from getTermination and clean up its arguments
S11001001 Sep 24, 2021
389a497
just compose functors
S11001001 Sep 24, 2021
a9c6080
add looping to fetchAndPersistBracket
S11001001 Sep 24, 2021
95153be
more mvo tests
S11001001 Sep 24, 2021
f966692
test unsyncedOffsets, too
S11001001 Sep 24, 2021
361802f
Lagginess collector
S11001001 Sep 27, 2021
de2a1d2
supply more likely actual data with mvo tests; don't trust Java equals
S11001001 Sep 27, 2021
5845a35
rework minimumViableOffsets to track sync states across template IDs
S11001001 Sep 27, 2021
4de5cb7
extra note
S11001001 Sep 27, 2021
4859b86
fix the tests to work against the stricter mvo
S11001001 Sep 27, 2021
72abc1b
move surrogatesToDomains call
S11001001 Sep 27, 2021
29f5964
Merge commit '6bf45a344a07568f1c2564ba894860dd9b276129' into 10334-de…
S11001001 Sep 27, 2021
5e4d279
more tests for lagginess accumulator
S11001001 Sep 27, 2021
4020ce1
add changelog
S11001001 Sep 27, 2021
1d5f723
port toSeq to Scala 2.12
S11001001 Sep 27, 2021
6f11c86
handle a corner case with offsets being too close to expected values
S11001001 Sep 28, 2021
5da2571
Merge commit '9641fd5f83f5954add9fc147c9bd43e5a1abd885' into 10334-de…
S11001001 Sep 28, 2021
7930ff2
didn't need XTag
S11001001 Sep 28, 2021
4a02764
Merge commit '5458aa890ccb1da23b5e44f09d512d338074d5a6' into 10334-de…
S11001001 Sep 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,36 @@ sealed abstract class Queries(tablePrefix: String, tpIdCacheMaxEntries: Long)(im
final def lastOffset(parties: OneAnd[Set, String], tpid: SurrogateTpId)(implicit
log: LogHandler
): ConnectionIO[Map[String, String]] = {
val partyVector =
cats.data.OneAnd(parties.head, parties.tail.toList)
import Queries.CompatImplicits.catsReducibleFromFoldable1
val q = sql"""
SELECT party, last_offset FROM $ledgerOffsetTableName WHERE tpid = $tpid AND
""" ++ Fragments.in(fr"party", partyVector)
""" ++ Fragments.in(fr"party", parties)
q.query[(String, String)]
.to[Vector]
.map(_.toMap)
}

/** Template IDs, parties, and offsets that don't match expected offset values for
* a particular `tpid`.
*/
private[http] final def unsyncedOffsets(
expectedOffset: String,
tpids: NonEmpty[Set[SurrogateTpId]],
): ConnectionIO[Map[SurrogateTpId, Map[String, String]]] = {
val condition = {
import Queries.CompatImplicits.catsReducibleFromFoldable1, scalaz.std.iterable._
Fragments.in(fr"tpid", tpids.toSeq.toOneAnd)
}
val q = sql"""
SELECT tpid, party, last_offset FROM $ledgerOffsetTableName
WHERE last_offset <> $expectedOffset AND $condition
"""
q.query[(SurrogateTpId, String, String)]
.map { case (tpid, party, offset) => (tpid, (party, offset)) }
.to[Vector]
.map(groupUnsyncedOffsets(tpids, _))
}

/** Consistency of the whole database mostly pivots around the offset update
* check, since an offset read and write bookend the update.
*
Expand Down Expand Up @@ -549,6 +569,16 @@ object Queries {
)
}

private[dbbackend] def groupUnsyncedOffsets[TpId, Party, Off](
allTpids: Set[TpId],
queryResult: Vector[(TpId, (Party, Off))],
): Map[TpId, Map[Party, Off]] = {
val grouped = queryResult.groupBy1(_._1).transform((_, tpos) => tpos.view.map(_._2).toMap)
// lagging offsets still needs to consider the template IDs that weren't
// returned by the offset table query
(allTpids diff grouped.keySet).view.map((_, Map.empty[Party, Off])).toMap ++ grouped
}

import doobie.util.invariant.InvalidValue

@throws[InvalidValue[_, _]]
Expand Down Expand Up @@ -612,6 +642,12 @@ object Queries {
override def reduceRightTo[A, B](fa: F[A])(z: A => B)(f: (A, Eval[B]) => Eval[B]) =
F.foldMapRight1(fa)(a => Eval later z(a))((a, eb) => f(a, Eval defer eb))
}

implicit def monadFromCatsMonad[F[_]](implicit F: cats.Monad[F]): scalaz.Monad[F] =
new scalaz.Monad[F] {
override def bind[A, B](fa: F[A])(f: A => F[B]): F[B] = F.flatMap(fa)(f)
override def point[A](a: => A): F[A] = F.point(a)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ class QueriesSpec extends AnyWordSpec with Matchers with TableDrivenPropertyChec
fragmentElems(frag) should ===(fragmentElems(projection))
}
}

"groupUnsyncedOffsets" should {
import Queries.groupUnsyncedOffsets
"not drop duplicate template IDs" in {
groupUnsyncedOffsets(Set.empty, Vector((0, (1, 2)), (0, (3, 4)))) should ===(
Map(0 -> Map(1 -> 2, 3 -> 4))
)
}

"add empty maps for template IDs missing from the input" in {
groupUnsyncedOffsets(Set(0, 1, 2), Vector((0, (1, 2)), (0, (3, 4)))) should ===(
Map(0 -> Map(1 -> 2, 3 -> 4), 1 -> Map.empty, 2 -> Map.empty)
)
}
}
}

object QueriesSpec {
Expand Down
Loading