Skip to content

Commit

Permalink
Update scalafmt-core to 3.8.5 (#1100)
Browse files — browse the repository at this point in the history
* Update scalafmt-core to 3.8.5

* Reformat with scalafmt 3.8.5

Executed command: scalafmt --non-interactive

* Add 'Reformat with scalafmt 3.8.5' to .git-blame-ignore-revs
  • Loading branch information
scala-steward authored Jan 19, 2025
1 parent ea3667b commit bceba65
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 40 deletions.
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@

# Scala Steward: Reformat with scalafmt 3.8.2
1bd268889144380ec82c2ddf74b1243d3a05bb42

# Scala Steward: Reformat with scalafmt 3.8.5
237bd566ca80d0cbbaa18e12ba890abac57ab09e
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=3.8.3
version=3.8.5
runner.dialect = scala213
maxColumn = 120
align.preset = most
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ private[client] final class ProducerLive[R, R1, T](
): ZIO[Any, Nothing, (Option[PutRecordsResponse.ReadOnly], Chunk[ProduceRequest])] = {
val totalPayload = batch.map(_.data.length).sum
(for {
_ <- ZIO.logInfo(
s"PutRecords for batch of size ${batch.map(_.aggregateCount).sum} (${batch.size} aggregated). " +
_ <- ZIO.logInfo(
s"PutRecords for batch of size ${batch.map(_.aggregateCount).sum} (${batch.size} aggregated). " +
// s"Payload sizes: ${batch.map(_.data.asByteArrayUnsafe().length).mkString(",")} " +
s"(total = ${totalPayload} = ${totalPayload * 100.0 / maxPayloadSizePerRequest}%)."
)
s"(total = ${totalPayload} = ${totalPayload * 100.0 / maxPayloadSizePerRequest}%)."
)

// Avoid an allocation
response <-
Expand Down Expand Up @@ -228,9 +228,9 @@ private[client] final class ProducerLive[R, R1, T](
ZIO.succeed(requests.map(_.newAttempt))

// TODO backoff for shard limit stuff
_ <- failedQueue
.offerAll(updatedFailed)
.when(newFailed.nonEmpty)
_ <- failedQueue
.offerAll(updatedFailed)
.when(newFailed.nonEmpty)
} yield metrics.addFailures(failedCount)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ private class DefaultLeaseCoordinator(

// Initialization. If it fails, we will try in the loop
// initialShards will have been executed in the background so for efficiency we use it here
_ <- (takeLeases(initialShards).retryN(1).ignore *> periodicRefreshAndTakeLeases)
.ensuring(ZIO.logDebug("Shutting down refresh & take lease loop"))
.forkScoped
_ <- repeatAndRetry(settings.renewInterval)(renewLeases)
.ensuring(ZIO.logDebug("Shutting down renew lease loop"))
.forkScoped
_ <- (takeLeases(initialShards).retryN(1).ignore *> periodicRefreshAndTakeLeases)
.ensuring(ZIO.logDebug("Shutting down refresh & take lease loop"))
.forkScoped
_ <- repeatAndRetry(settings.renewInterval)(renewLeases)
.ensuring(ZIO.logDebug("Shutting down renew lease loop"))
.forkScoped
} yield ())
.tapErrorCause(c => ZIO.logErrorCause("Error in DefaultLeaseCoordinator initialize", c))

Expand Down Expand Up @@ -289,7 +289,7 @@ private class DefaultLeaseCoordinator(
)
.when(shardsWithoutLease.nonEmpty)
_ <- ZIO
.foreachParDiscard(shardsWithoutLease)({ shard =>
.foreachParDiscard(shardsWithoutLease) { shard =>
val lease = Lease(
key = shard.shardId,
owner = Some(workerId),
Expand All @@ -306,7 +306,7 @@ private class DefaultLeaseCoordinator(
case Left(e) =>
ZIO.logError(s"Error creating lease: ${e}") *> ZIO.fail(e)
}
})
}
.withParallelism(settings.maxParallelLeaseAcquisitions)
} yield ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,20 @@ object NativeConsumerTest extends ZIOSpecDefault {
for {
producedShardsAndSequence <-
produceSampleRecords(streamName, nrRecords, chunkSize = 500) // Deterministic order
_ <- Consumer
.shardedStream(
streamName,
applicationName,
Serde.asciiString,
emitDiagnostic = onDiagnostic("worker1")
)
.flatMapPar(Int.MaxValue) { case (shard @ _, shardStream, checkpointer) =>
shardStream.tap(checkpointer.stage)
}
.take(nrRecords.toLong)
.runCollect
checkpoints <- getCheckpoints(applicationName)
expectedCheckpoints =
_ <- Consumer
.shardedStream(
streamName,
applicationName,
Serde.asciiString,
emitDiagnostic = onDiagnostic("worker1")
)
.flatMapPar(Int.MaxValue) { case (shard @ _, shardStream, checkpointer) =>
shardStream.tap(checkpointer.stage)
}
.take(nrRecords.toLong)
.runCollect
checkpoints <- getCheckpoints(applicationName)
expectedCheckpoints =
producedShardsAndSequence.groupBy(_.shardId).view.mapValues(_.last.sequenceNumber).toMap

} yield assert(checkpoints)(Assertion.hasSameElements(expectedCheckpoints))
Expand Down Expand Up @@ -236,14 +236,14 @@ object NativeConsumerTest extends ZIOSpecDefault {
.tap(tp => ZIO.logInfo(s"Got tuple ${tp}"))
.take(2) // 5 shards, so we expect 2
// .updateService[Logger[String]](_.named("worker2"))
worker1 <- consumer1.runDrain.tapError(e => ZIO.logError(s"Worker1 failed: ${e}")).fork
_ <- consumer1Started.await
_ <- ZIO.logInfo("Consumer 1 has started, starting consumer 2")
_ <- consumer2.runDrain
_ <- ZIO.logInfo("Shutting down worker 1")
_ <- worker1.interrupt
_ <- ZIO.logInfo("Shutting down producer")
_ <- producer.interrupt
worker1 <- consumer1.runDrain.tapError(e => ZIO.logError(s"Worker1 failed: ${e}")).fork
_ <- consumer1Started.await
_ <- ZIO.logInfo("Consumer 1 has started, starting consumer 2")
_ <- consumer2.runDrain
_ <- ZIO.logInfo("Shutting down worker 1")
_ <- worker1.interrupt
_ <- ZIO.logInfo("Shutting down producer")
_ <- producer.interrupt

} yield assertCompletes
}
Expand Down Expand Up @@ -1017,9 +1017,8 @@ object NativeConsumerTest extends ZIOSpecDefault {
case (streamName, applicationName) =>
withStream(streamName, shards = nrShards) {
ZIO.scoped[R with LeaseRepository] {
ZIO.addFinalizer(deleteTable(applicationName).ignore) *> { // Table may not have been created
ZIO.addFinalizer(deleteTable(applicationName).ignore) *> // Table may not have been created
f(streamName, applicationName)
}
}
}
}
Expand Down

0 comments on commit bceba65

Please sign in to comment.