diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index 9d2684b7..506fb926 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -3,3 +3,6 @@ # Scala Steward: Reformat with scalafmt 3.8.2 1bd268889144380ec82c2ddf74b1243d3a05bb42 + +# Scala Steward: Reformat with scalafmt 3.8.5 +237bd566ca80d0cbbaa18e12ba890abac57ab09e diff --git a/.scalafmt.conf b/.scalafmt.conf index 83f26fd2..3bf70b2b 100644 --- a/.scalafmt.conf +++ b/.scalafmt.conf @@ -1,4 +1,4 @@ -version=3.8.3 +version=3.8.5 runner.dialect = scala213 maxColumn = 120 align.preset = most diff --git a/core/src/main/scala/nl/vroste/zio/kinesis/client/producer/ProducerLive.scala b/core/src/main/scala/nl/vroste/zio/kinesis/client/producer/ProducerLive.scala index 8f8a6007..a3615c59 100644 --- a/core/src/main/scala/nl/vroste/zio/kinesis/client/producer/ProducerLive.scala +++ b/core/src/main/scala/nl/vroste/zio/kinesis/client/producer/ProducerLive.scala @@ -123,11 +123,11 @@ private[client] final class ProducerLive[R, R1, T]( ): ZIO[Any, Nothing, (Option[PutRecordsResponse.ReadOnly], Chunk[ProduceRequest])] = { val totalPayload = batch.map(_.data.length).sum (for { - _ <- ZIO.logInfo( - s"PutRecords for batch of size ${batch.map(_.aggregateCount).sum} (${batch.size} aggregated). " + + _ <- ZIO.logInfo( + s"PutRecords for batch of size ${batch.map(_.aggregateCount).sum} (${batch.size} aggregated). " + // s"Payload sizes: ${batch.map(_.data.asByteArrayUnsafe().length).mkString(",")} " + - s"(total = ${totalPayload} = ${totalPayload * 100.0 / maxPayloadSizePerRequest}%)." - ) + s"(total = ${totalPayload} = ${totalPayload * 100.0 / maxPayloadSizePerRequest}%)." + ) // Avoid an allocation response <- @@ -228,9 +228,9 @@ private[client] final class ProducerLive[R, R1, T]( ZIO.succeed(requests.map(_.newAttempt)) // TODO backoff for shard limit stuff - _ <- failedQueue - .offerAll(updatedFailed) - .when(newFailed.nonEmpty) + _ <- failedQueue + .offerAll(updatedFailed) + .when(newFailed.nonEmpty) } yield metrics.addFailures(failedCount) } diff --git a/core/src/main/scala/nl/vroste/zio/kinesis/client/zionative/leasecoordinator/DefaultLeaseCoordinator.scala b/core/src/main/scala/nl/vroste/zio/kinesis/client/zionative/leasecoordinator/DefaultLeaseCoordinator.scala index 037094b0..96bb177f 100644 --- a/core/src/main/scala/nl/vroste/zio/kinesis/client/zionative/leasecoordinator/DefaultLeaseCoordinator.scala +++ b/core/src/main/scala/nl/vroste/zio/kinesis/client/zionative/leasecoordinator/DefaultLeaseCoordinator.scala @@ -75,12 +75,12 @@ private class DefaultLeaseCoordinator( // Initialization. If it fails, we will try in the loop // initialShards will have been executed in the background so for efficiency we use it here - _ <- (takeLeases(initialShards).retryN(1).ignore *> periodicRefreshAndTakeLeases) - .ensuring(ZIO.logDebug("Shutting down refresh & take lease loop")) - .forkScoped - _ <- repeatAndRetry(settings.renewInterval)(renewLeases) - .ensuring(ZIO.logDebug("Shutting down renew lease loop")) - .forkScoped + _ <- (takeLeases(initialShards).retryN(1).ignore *> periodicRefreshAndTakeLeases) + .ensuring(ZIO.logDebug("Shutting down refresh & take lease loop")) + .forkScoped + _ <- repeatAndRetry(settings.renewInterval)(renewLeases) + .ensuring(ZIO.logDebug("Shutting down renew lease loop")) + .forkScoped } yield ()) .tapErrorCause(c => ZIO.logErrorCause("Error in DefaultLeaseCoordinator initialize", c)) @@ -289,7 +289,7 @@ private class DefaultLeaseCoordinator( ) .when(shardsWithoutLease.nonEmpty) _ <- ZIO - .foreachParDiscard(shardsWithoutLease)({ shard => + .foreachParDiscard(shardsWithoutLease) { shard => val lease = Lease( key = shard.shardId, owner = Some(workerId), @@ -306,7 +306,7 @@ private class DefaultLeaseCoordinator( case Left(e) => ZIO.logError(s"Error creating lease: ${e}") *> ZIO.fail(e) } - }) + } .withParallelism(settings.maxParallelLeaseAcquisitions) } yield () diff --git a/core/src/test/scala/nl/vroste/zio/kinesis/client/zionative/NativeConsumerTest.scala b/core/src/test/scala/nl/vroste/zio/kinesis/client/zionative/NativeConsumerTest.scala index b46ae9f4..1745c804 100644 --- a/core/src/test/scala/nl/vroste/zio/kinesis/client/zionative/NativeConsumerTest.scala +++ b/core/src/test/scala/nl/vroste/zio/kinesis/client/zionative/NativeConsumerTest.scala @@ -113,20 +113,20 @@ object NativeConsumerTest extends ZIOSpecDefault { for { producedShardsAndSequence <- produceSampleRecords(streamName, nrRecords, chunkSize = 500) // Deterministic order - _ <- Consumer - .shardedStream( - streamName, - applicationName, - Serde.asciiString, - emitDiagnostic = onDiagnostic("worker1") - ) - .flatMapPar(Int.MaxValue) { case (shard @ _, shardStream, checkpointer) => - shardStream.tap(checkpointer.stage) - } - .take(nrRecords.toLong) - .runCollect - checkpoints <- getCheckpoints(applicationName) - expectedCheckpoints = + _ <- Consumer + .shardedStream( + streamName, + applicationName, + Serde.asciiString, + emitDiagnostic = onDiagnostic("worker1") + ) + .flatMapPar(Int.MaxValue) { case (shard @ _, shardStream, checkpointer) => + shardStream.tap(checkpointer.stage) + } + .take(nrRecords.toLong) + .runCollect + checkpoints <- getCheckpoints(applicationName) + expectedCheckpoints = producedShardsAndSequence.groupBy(_.shardId).view.mapValues(_.last.sequenceNumber).toMap } yield assert(checkpoints)(Assertion.hasSameElements(expectedCheckpoints)) @@ -236,14 +236,14 @@ object NativeConsumerTest extends ZIOSpecDefault { .tap(tp => ZIO.logInfo(s"Got tuple ${tp}")) .take(2) // 5 shards, so we expect 2 // .updateService[Logger[String]](_.named("worker2")) - worker1 <- consumer1.runDrain.tapError(e => ZIO.logError(s"Worker1 failed: ${e}")).fork - _ <- consumer1Started.await - _ <- ZIO.logInfo("Consumer 1 has started, starting consumer 2") - _ <- consumer2.runDrain - _ <- ZIO.logInfo("Shutting down worker 1") - _ <- worker1.interrupt - _ <- ZIO.logInfo("Shutting down producer") - _ <- producer.interrupt + worker1 <- consumer1.runDrain.tapError(e => ZIO.logError(s"Worker1 failed: ${e}")).fork + _ <- consumer1Started.await + _ <- ZIO.logInfo("Consumer 1 has started, starting consumer 2") + _ <- consumer2.runDrain + _ <- ZIO.logInfo("Shutting down worker 1") + _ <- worker1.interrupt + _ <- ZIO.logInfo("Shutting down producer") + _ <- producer.interrupt } yield assertCompletes } @@ -1017,9 +1017,8 @@ object NativeConsumerTest extends ZIOSpecDefault { case (streamName, applicationName) => withStream(streamName, shards = nrShards) { ZIO.scoped[R with LeaseRepository] { - ZIO.addFinalizer(deleteTable(applicationName).ignore) *> { // Table may not have been created + ZIO.addFinalizer(deleteTable(applicationName).ignore) *> // Table may not have been created f(streamName, applicationName) - } } } }