diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala index 170ffa1d02c..16a1d0e90cf 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala @@ -92,7 +92,7 @@ class LogMessagesSpec extends ScalaTestWithActorTestKit(""" logEvent.message should ===("received message Hello") logEvent.mdc should ===(Map("mdc" -> true)) true - case _ ⇒ + case _ => false }, diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala index 89897fd5a5a..d13d40dd6e0 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala @@ -27,7 +27,7 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with case class Probe(message: String, replyTo: ActorRef[String]) def withSystem[T](name: String, behavior: Behavior[T], doTerminate: Boolean = true)( - block: ActorSystem[T] ⇒ Unit): Unit = { + block: ActorSystem[T] => Unit): Unit = { val sys = system(behavior, s"$suite-$name") try { block(sys) @@ -47,7 +47,7 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with withSystem("a", Behaviors.receiveMessage[Probe] { p => p.replyTo ! p.message Behaviors.stopped - }, doTerminate = false) { sys ⇒ + }, doTerminate = false) { sys => val inbox = TestInbox[String]("a") sys ! Probe("hello", inbox.ref) eventually { diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala index d6151a96d16..9001ee94bdf 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala @@ -192,7 +192,7 @@ class IntroSpec extends ScalaTestWithActorTestKit with WordSpecLike { import ChatRoom._ val gabbler: Behavior[SessionEvent] = - Behaviors.setup { context ⇒ + Behaviors.setup { context => Behaviors.receiveMessage { //#chatroom-gabbler // We document that the compiler warns about the missing handler for `SessionDenied` diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/OOIntroSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/OOIntroSpec.scala index 066abe520cb..075ac6bd650 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/OOIntroSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/OOIntroSpec.scala @@ -93,12 +93,12 @@ class OOIntroSpec extends ScalaTestWithActorTestKit with WordSpecLike { import ChatRoom._ val gabbler = - Behaviors.setup[SessionEvent] { context ⇒ + Behaviors.setup[SessionEvent] { context => Behaviors.receiveMessage[SessionEvent] { case SessionDenied(reason) => context.log.info("cannot start chat room session: {}", reason) Behaviors.stopped - case SessionGranted(handle) ⇒ + case SessionGranted(handle) => handle ! PostMessage("Hello World!") Behaviors.same case MessagePosted(screenName, message) => @@ -117,7 +117,7 @@ class OOIntroSpec extends ScalaTestWithActorTestKit with WordSpecLike { Behaviors .receiveMessagePartial[String] { - case "go" ⇒ + case "go" => chatRoom ! GetSession("ol’ Gabbler", gabblerRef) Behaviors.same } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index 978ad8fcf5e..cc4fb2b1916 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -62,7 +62,7 @@ private[akka] trait DeathWatch { this: ActorCell => } protected def receivedTerminated(t: Terminated): Unit = - terminatedQueued.get(t.actor).foreach { optionalMessage ⇒ + terminatedQueued.get(t.actor).foreach { optionalMessage => terminatedQueued -= t.actor // here we know that it is the SAME ref which was put in receiveMessage(optionalMessage.getOrElse(t)) } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index f49b7f0394d..bcb179b8358 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -121,22 +121,22 @@ object ClusterShardingPersistenceSpec { stashing = false Effect.unstashAll() - case UnstashAllAndPassivate ⇒ + case UnstashAllAndPassivate => stashing = false shard ! Passivate(ctx.self) Effect.unstashAll() }, - eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt).receiveSignal { - case (state, RecoveryCompleted) ⇒ + eventHandler = (state, evt) => if (state.isEmpty) evt else state + "|" + evt).receiveSignal { + case (state, RecoveryCompleted) => ctx.log.debug("onRecoveryCompleted: [{}]", state) lifecycleProbes.get(entityId) match { - case null ⇒ ctx.log.debug("no lifecycleProbe (onRecoveryCompleted) for [{}]", entityId) - case p ⇒ p ! s"recoveryCompleted:$state" + case null => ctx.log.debug("no lifecycleProbe (onRecoveryCompleted) for [{}]", entityId) + case p => p ! s"recoveryCompleted:$state" } - case (_, PostStop) ⇒ + case (_, PostStop) => lifecycleProbes.get(entityId) match { - case null ⇒ ctx.log.debug("no lifecycleProbe (postStop) for [{}]", entityId) - case p ⇒ p ! "stopped" + case null => ctx.log.debug("no lifecycleProbe (postStop) for [{}]", entityId) + case p => p ! "stopped" } } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index f22b28b8cba..bea745c59a3 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -61,8 +61,8 @@ object ClusterShardingSettings { else config.getDuration("passivate-idle-entity-after", MILLISECONDS).millis val lease = config.getString("use-lease") match { - case s if s.isEmpty ⇒ None - case other ⇒ Some(new LeaseUsageSettings(other, config.getDuration("lease-retry-interval").asScala)) + case s if s.isEmpty => None + case other => Some(new LeaseUsageSettings(other, config.getDuration("lease-retry-interval").asScala)) } new ClusterShardingSettings( diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala index 6fe412090fd..fcda200c978 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala @@ -34,7 +34,7 @@ object ShardSpec { class EntityActor extends Actor with ActorLogging { override def receive: Receive = { - case msg ⇒ + case msg => log.info("Msg {}", msg) sender() ! s"ack ${msg}" } @@ -45,11 +45,11 @@ object ShardSpec { case class EntityEnvelope(entityId: Int, msg: Any) val extractEntityId: ShardRegion.ExtractEntityId = { - case EntityEnvelope(id, payload) ⇒ (id.toString, payload) + case EntityEnvelope(id, payload) => (id.toString, payload) } val extractShardId: ShardRegion.ExtractShardId = { - case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString + case EntityEnvelope(id, _) => (id % numberOfShards).toString } case class BadLease(msg: String) extends RuntimeException(msg) with NoStackTrace @@ -111,7 +111,7 @@ class ShardSpec extends AkkaSpec(ShardSpec.config) with ImplicitSender { Shard.props( typeName, shardId, - _ ⇒ Props(new EntityActor()), + _ => Props(new EntityActor()), settings, extractEntityId, extractShardId, diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index c7a0926df2f..e2ab797e22e 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -57,7 +57,7 @@ object ClusterSingletonManagerSettings { */ def apply(config: Config): ClusterSingletonManagerSettings = { val lease = config.getString("use-lease") match { - case s if s.isEmpty ⇒ None + case s if s.isEmpty => None case leaseConfigPath => Some(new LeaseUsageSettings(leaseConfigPath, config.getDuration("lease-retry-interval").asScala)) } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/TestLeaseActor.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/TestLeaseActor.scala index 6979a1b67ff..fac0362be0b 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/TestLeaseActor.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/TestLeaseActor.scala @@ -45,23 +45,23 @@ class TestLeaseActor extends Actor with ActorLogging { override def receive = { - case c: Create ⇒ + case c: Create => log.info("Lease created with name {} ownerName {}", c.leaseName, c.ownerName) - case request: LeaseRequest ⇒ + case request: LeaseRequest => log.info("Lease request {} from {}", request, sender()) requests = (sender(), request) :: requests - case GetRequests ⇒ + case GetRequests => sender() ! LeaseRequests(requests.map(_._2)) - case ActionRequest(request, result) ⇒ + case ActionRequest(request, result) => requests.find(_._2 == request) match { - case Some((snd, req)) ⇒ + case Some((snd, req)) => log.info("Actioning request {} to {}", req, result) snd ! result requests = requests.filterNot(_._2 == request) - case None ⇒ + case None => throw new RuntimeException(s"unknown request to action: ${request}. Requests: ${requests}") } @@ -111,6 +111,6 @@ class TestLeaseActorClient(settings: LeaseSettings, system: ExtendedActorSystem) override def checkLease(): Boolean = false - override def acquire(callback: Option[Throwable] ⇒ Unit): Future[Boolean] = + override def acquire(callback: Option[Throwable] => Unit): Future[Boolean] = (leaseActor ? Acquire(settings.ownerName)).mapTo[Boolean] } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala index 86fa8ef0b9b..4e1bd2311dd 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala @@ -57,7 +57,7 @@ object ClusterSingletonManagerLeaseSpec extends MultiNodeConfig { log.info("Singleton stopping") } override def receive: Receive = { - case msg ⇒ + case msg => sender() ! Response(msg, selfAddress) } } diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/TestLease.scala b/akka-cluster-tools/src/test/scala/akka/cluster/TestLease.scala index 1a26d289432..3920222fa28 100644 --- a/akka-cluster-tools/src/test/scala/akka/cluster/TestLease.scala +++ b/akka-cluster-tools/src/test/scala/akka/cluster/TestLease.scala @@ -65,7 +65,7 @@ class TestLease(settings: LeaseSettings, system: ExtendedActorSystem) extends Le private val nextAcquireResult = new AtomicReference[Future[Boolean]](initialPromise.future) private val nextCheckLeaseResult = new AtomicReference[Boolean](false) - private val currentCallBack = new AtomicReference[Option[Throwable] ⇒ Unit](_ ⇒ ()) + private val currentCallBack = new AtomicReference[Option[Throwable] => Unit](_ => ()) def setNextAcquireResult(next: Future[Boolean]): Unit = nextAcquireResult.set(next) @@ -73,7 +73,7 @@ class TestLease(settings: LeaseSettings, system: ExtendedActorSystem) extends Le def setNextCheckLeaseResult(value: Boolean): Unit = nextCheckLeaseResult.set(value) - def getCurrentCallback(): Option[Throwable] ⇒ Unit = currentCallBack.get() + def getCurrentCallback(): Option[Throwable] => Unit = currentCallBack.get() override def acquire(): Future[Boolean] = { log.info("acquire, current response " + nextAcquireResult) @@ -88,7 +88,7 @@ class TestLease(settings: LeaseSettings, system: ExtendedActorSystem) extends Le override def checkLease(): Boolean = nextCheckLeaseResult.get - override def acquire(callback: Option[Throwable] ⇒ Unit): Future[Boolean] = { + override def acquire(callback: Option[Throwable] => Unit): Future[Boolean] = { currentCallBack.set(callback) acquire() } diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeaseSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeaseSpec.scala index 1cb36f97549..4fe5e5975f3 100644 --- a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeaseSpec.scala +++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeaseSpec.scala @@ -29,7 +29,7 @@ class ImportantSingleton(lifeCycleProbe: ActorRef) extends Actor with ActorLoggi } override def receive: Receive = { - case msg ⇒ + case msg => sender() ! msg } } diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala index 8889af503a2..92bc5bb7ce6 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala @@ -57,7 +57,7 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with withSystem("a", Behaviors.receiveMessage[Probe] { p => p.replyTo ! p.message Behaviors.stopped - }, doTerminate = false) { sys ⇒ + }, doTerminate = false) { sys => val inbox = TestInbox[String]("a") sys ! Probe("hello", inbox.ref) eventually { diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/TimeoutSettings.scala b/akka-coordination/src/main/scala/akka/coordination/lease/TimeoutSettings.scala index 6c20a9e5a1e..89457d81441 100644 --- a/akka-coordination/src/main/scala/akka/coordination/lease/TimeoutSettings.scala +++ b/akka-coordination/src/main/scala/akka/coordination/lease/TimeoutSettings.scala @@ -14,9 +14,9 @@ object TimeoutSettings { def apply(config: Config): TimeoutSettings = { val heartBeatTimeout = config.getDuration("heartbeat-timeout").asScala val heartBeatInterval = config.getValue("heartbeat-interval").valueType() match { - case ConfigValueType.STRING if config.getString("heartbeat-interval").isEmpty ⇒ + case ConfigValueType.STRING if config.getString("heartbeat-interval").isEmpty => (heartBeatTimeout / 10).max(5.seconds) - case _ ⇒ config.getDuration("heartbeat-interval").asScala + case _ => config.getDuration("heartbeat-interval").asScala } require(heartBeatInterval < (heartBeatTimeout / 2), "heartbeat-interval must be less than half heartbeat-timeout") new TimeoutSettings(heartBeatInterval, heartBeatTimeout, config.getDuration("lease-operation-timeout").asScala) diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/internal/LeaseAdapter.scala b/akka-coordination/src/main/scala/akka/coordination/lease/internal/LeaseAdapter.scala index 5c545589393..6be3257bb78 100644 --- a/akka-coordination/src/main/scala/akka/coordination/lease/internal/LeaseAdapter.scala +++ b/akka-coordination/src/main/scala/akka/coordination/lease/internal/LeaseAdapter.scala @@ -26,7 +26,7 @@ final private[akka] class LeaseAdapter(delegate: ScalaLease)(implicit val ec: Ex override def acquire(): CompletionStage[java.lang.Boolean] = delegate.acquire().map(Boolean.box).toJava override def acquire(leaseLostCallback: Consumer[Optional[Throwable]]): CompletionStage[java.lang.Boolean] = { - delegate.acquire(o ⇒ leaseLostCallback.accept(o.asJava)).map(Boolean.box).toJava + delegate.acquire(o => leaseLostCallback.accept(o.asJava)).map(Boolean.box).toJava } override def release(): CompletionStage[java.lang.Boolean] = delegate.release().map(Boolean.box).toJava diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/javadsl/LeaseProvider.scala b/akka-coordination/src/main/scala/akka/coordination/lease/javadsl/LeaseProvider.scala index e2c78f8f677..923234e2164 100644 --- a/akka-coordination/src/main/scala/akka/coordination/lease/javadsl/LeaseProvider.scala +++ b/akka-coordination/src/main/scala/akka/coordination/lease/javadsl/LeaseProvider.scala @@ -7,7 +7,7 @@ package akka.coordination.lease.javadsl import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } import akka.annotation.ApiMayChange import akka.coordination.lease.internal.LeaseAdapter -import akka.coordination.lease.scaladsl.{ LeaseProvider ⇒ ScalaLeaseProvider } +import akka.coordination.lease.scaladsl.{ LeaseProvider => ScalaLeaseProvider } @ApiMayChange object LeaseProvider extends ExtensionId[LeaseProvider] with ExtensionIdProvider { diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/Lease.scala b/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/Lease.scala index 693bd3f1387..d7eca9af66a 100644 --- a/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/Lease.scala +++ b/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/Lease.scala @@ -37,7 +37,7 @@ abstract class Lease(val settings: LeaseSettings) { * Implementations should not call leaseLostCallback until after the returned future * has been completed */ - def acquire(leaseLostCallback: Option[Throwable] ⇒ Unit): Future[Boolean] + def acquire(leaseLostCallback: Option[Throwable] => Unit): Future[Boolean] /** * Release the lease so some other owner can acquire it. diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/LeaseProvider.scala b/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/LeaseProvider.scala index 555401145ae..0673e4dfdce 100644 --- a/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/LeaseProvider.scala +++ b/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/LeaseProvider.scala @@ -75,8 +75,8 @@ class LeaseProvider(system: ExtendedActorSystem) extends Extension { f } instance match { - case Success(value) ⇒ value - case Failure(e) ⇒ + case Success(value) => value + case Failure(e) => log.error( e, "Invalid lease configuration for leaseName [{}], configPath [{}] lease-class [{}]. " + diff --git a/akka-coordination/src/test/scala/akka/coordination/lease/scaladsl/LeaseProviderSpec.scala b/akka-coordination/src/test/scala/akka/coordination/lease/scaladsl/LeaseProviderSpec.scala index e5ba51e4478..fd04b1c9bd2 100644 --- a/akka-coordination/src/test/scala/akka/coordination/lease/scaladsl/LeaseProviderSpec.scala +++ b/akka-coordination/src/test/scala/akka/coordination/lease/scaladsl/LeaseProviderSpec.scala @@ -17,7 +17,7 @@ object LeaseProviderSpec { override def acquire(): Future[Boolean] = Future.successful(false) override def release(): Future[Boolean] = Future.successful(false) override def checkLease(): Boolean = false - override def acquire(callback: Option[Throwable] ⇒ Unit): Future[Boolean] = Future.successful(false) + override def acquire(callback: Option[Throwable] => Unit): Future[Boolean] = Future.successful(false) } class LeaseB(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) { @@ -25,7 +25,7 @@ object LeaseProviderSpec { override def acquire(): Future[Boolean] = Future.successful(false) override def release(): Future[Boolean] = Future.successful(false) override def checkLease(): Boolean = false - override def acquire(callback: Option[Throwable] ⇒ Unit): Future[Boolean] = Future.successful(false) + override def acquire(callback: Option[Throwable] => Unit): Future[Boolean] = Future.successful(false) } val config = ConfigFactory.parseString(s""" diff --git a/akka-discovery/src/main/scala/akka/discovery/Discovery.scala b/akka-discovery/src/main/scala/akka/discovery/Discovery.scala index 952f9c68d81..2b2b0a7ebf5 100644 --- a/akka-discovery/src/main/scala/akka/discovery/Discovery.scala +++ b/akka-discovery/src/main/scala/akka/discovery/Discovery.scala @@ -115,7 +115,7 @@ object Discovery extends ExtensionId[Discovery] with ExtensionIdProvider { throw new RuntimeException( "Old version of Akka Discovery from Akka Management found on the classpath. Remove `com.lightbend.akka.discovery:akka-discovery` from the classpath..") } catch { - case _: ClassCastException ⇒ + case _: ClassCastException => throw new RuntimeException( "Old version of Akka Discovery from Akka Management found on the classpath. Remove `com.lightbend.akka.discovery:akka-discovery` from the classpath..") case _: ClassNotFoundException => diff --git a/akka-discovery/src/test/scala/akka/discovery/LookupSpec.scala b/akka-discovery/src/test/scala/akka/discovery/LookupSpec.scala index 619999952e5..7316c0d6c37 100644 --- a/akka-discovery/src/test/scala/akka/discovery/LookupSpec.scala +++ b/akka-discovery/src/test/scala/akka/discovery/LookupSpec.scala @@ -56,7 +56,7 @@ class LookupSpec extends WordSpec with Matchers with OptionValues { } "generate a SRV Lookup from a valid SRV String" in { - srvWithValidDomainNames.foreach { str ⇒ + srvWithValidDomainNames.foreach { str => withClue(s"parsing '$str'") { val lookup = Lookup.parseSrv(str) lookup.portName.value shouldBe "portName" @@ -117,7 +117,7 @@ class LookupSpec extends WordSpec with Matchers with OptionValues { } "return true for any valid SRV String" in { - srvWithValidDomainNames.foreach { str ⇒ + srvWithValidDomainNames.foreach { str => withClue(s"parsing '$str'") { Lookup.isValidSrv(str) shouldBe true } diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala index 9876e464a35..531ce4b4eeb 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/DeltaPropagationSelectorSpec.scala @@ -56,7 +56,7 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck DeltaPropagation( selfUniqueAddress, false, - Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L))) + Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L))) selector.collectPropagations() should ===(Map(nodes(0) -> expected)) selector.collectPropagations() should ===(Map.empty[UniqueAddress, DeltaPropagation]) selector.cleanupDeltaEntries() @@ -72,8 +72,8 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck DeltaPropagation( selfUniqueAddress, false, - Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L))) - selector.collectPropagations() should ===(Map(nodes(0) -> expected, nodes(1) → expected)) + Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L))) + selector.collectPropagations() should ===(Map(nodes(0) -> expected, nodes(1) -> expected)) selector.cleanupDeltaEntries() selector.hasDeltaEntries("A") should ===(true) selector.hasDeltaEntries("B") should ===(true) @@ -92,8 +92,8 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck DeltaPropagation( selfUniqueAddress, false, - Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" → Delta(DataEnvelope(deltaB), 1L, 1L))) - selector.collectPropagations() should ===(Map(nodes(0) -> expected1, nodes(1) → expected1)) + Map("A" -> Delta(DataEnvelope(deltaA), 1L, 1L), "B" -> Delta(DataEnvelope(deltaB), 1L, 1L))) + selector.collectPropagations() should ===(Map(nodes(0) -> expected1, nodes(1) -> expected1)) // new update before previous was propagated to all nodes selector.update("C", deltaC) val expected2 = DeltaPropagation( @@ -105,7 +105,7 @@ class DeltaPropagationSelectorSpec extends WordSpec with Matchers with TypeCheck "C" -> Delta(DataEnvelope(deltaC), 1L, 1L))) val expected3 = DeltaPropagation(selfUniqueAddress, false, Map("C" -> Delta(DataEnvelope(deltaC), 1L, 1L))) - selector.collectPropagations() should ===(Map(nodes(2) -> expected2, nodes(0) → expected3)) + selector.collectPropagations() should ===(Map(nodes(2) -> expected2, nodes(0) -> expected3)) selector.cleanupDeltaEntries() selector.hasDeltaEntries("A") should ===(false) selector.hasDeltaEntries("B") should ===(false) diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala index b140d9ba49a..01c32a14af2 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala @@ -112,27 +112,27 @@ class ReplicatorMessageSerializerSpec checkSerialization(ReadResult(None)) checkSerialization( Status( - Map("A" -> ByteString.fromString("a"), "B" → ByteString.fromString("b")), + Map("A" -> ByteString.fromString("a"), "B" -> ByteString.fromString("b")), chunk = 3, totChunks = 10, Some(17), Some(19))) checkSerialization( Status( - Map("A" -> ByteString.fromString("a"), "B" → ByteString.fromString("b")), + Map("A" -> ByteString.fromString("a"), "B" -> ByteString.fromString("b")), chunk = 3, totChunks = 10, None, // can be None when sending back to a node of version 2.5.21 Some(19))) checkSerialization( Gossip( - Map("A" -> DataEnvelope(data1), "B" → DataEnvelope(GSet() + "b" + "c")), + Map("A" -> DataEnvelope(data1), "B" -> DataEnvelope(GSet() + "b" + "c")), sendBack = true, Some(17), Some(19))) checkSerialization( Gossip( - Map("A" -> DataEnvelope(data1), "B" → DataEnvelope(GSet() + "b" + "c")), + Map("A" -> DataEnvelope(data1), "B" -> DataEnvelope(GSet() + "b" + "c")), sendBack = true, None, // can be None when sending back to a node of version 2.5.21 Some(19))) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index f9243fb095f..51565625600 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -43,9 +43,9 @@ private[akka] final class BehaviorSetup[C, E, S]( val eventHandler: EventSourcedBehavior.EventHandler[S, E], val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity, private val signalHandler: PartialFunction[(S, Signal), Unit], - val tagger: E ⇒ Set[String], + val tagger: E => Set[String], val eventAdapter: EventAdapter[E, _], - val snapshotWhen: (S, E, Long) ⇒ Boolean, + val snapshotWhen: (S, E, Long) => Boolean, val recovery: Recovery, val retention: RetentionCriteria, var holdingRecoveryPermit: Boolean, diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 8d6faefdf6d..ff33a1f1dda 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -63,9 +63,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( loggerClass: Class[_], journalPluginId: Option[String] = None, snapshotPluginId: Option[String] = None, - tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], + tagger: Event => Set[String] = (_: Event) => Set.empty[String], eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], - snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, + snapshotWhen: (State, Event, Long) => Boolean = ConstantFun.scalaAnyThreeToFalse, recovery: Recovery = Recovery(), retention: RetentionCriteria = RetentionCriteria.disabled, supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, @@ -87,9 +87,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( val actualSignalHandler: PartialFunction[(State, Signal), Unit] = signalHandler.orElse { // default signal handler is always the fallback - case (_, SnapshotCompleted(meta)) ⇒ + case (_, SnapshotCompleted(meta)) => ctx.log.debug("Save snapshot successful, snapshot metadata [{}].", meta) - case (_, SnapshotFailed(meta, failure)) ⇒ + case (_, SnapshotFailed(meta, failure)) => ctx.log.error(failure, "Save snapshot failed, snapshot metadata [{}].", meta) case (_, DeleteSnapshotsCompleted(DeletionTarget.Individual(meta))) => ctx.log.debug("Persistent snapshot [{}] deleted successfully.", meta) @@ -107,7 +107,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( Behaviors .supervise { - Behaviors.setup[Command] { _ ⇒ + Behaviors.setup[Command] { _ => val eventSourcedSetup = new BehaviorSetup( ctx.asInstanceOf[ActorContext[InternalProtocol]], persistenceId, diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/SignalHandler.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/SignalHandler.scala index 9339866a29f..fe40cf9a5a2 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/SignalHandler.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/SignalHandler.scala @@ -49,7 +49,7 @@ final class SignalHandlerBuilder[State] { */ def onSignal[T <: Signal](signalType: Class[T], callback: BiConsumer[State, T]): SignalHandlerBuilder[State] = { val newPF: PartialFunction[(State, Signal), Unit] = { - case (state, t) if signalType.isInstance(t) ⇒ + case (state, t) if signalType.isInstance(t) => callback.accept(state, t.asInstanceOf[T]) } handler = newPF.orElse(handler) @@ -61,7 +61,7 @@ final class SignalHandlerBuilder[State] { */ def onSignal[T <: Signal](signal: T, callback: Consumer[State]): SignalHandlerBuilder[State] = { val newPF: PartialFunction[(State, Signal), Unit] = { - case (state, `signal`) ⇒ + case (state, `signal`) => callback.accept(state) } handler = newPF.orElse(handler) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala index ea315824740..e520cd6bc7c 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala @@ -102,7 +102,7 @@ class EventSourcedBehaviorFailureSpec EventSourcedBehavior[String, String, String]( pid, "", - (_, cmd) ⇒ { + (_, cmd) => { if (cmd == "wrong") throw TestException("wrong command") probe.tell("persisting") @@ -111,17 +111,17 @@ class EventSourcedBehaviorFailureSpec if (cmd == "wrong-callback") throw TestException("wrong command") } }, - (state, event) ⇒ { + (state, event) => { if (event == "wrong-event") throw TestException("wrong event") probe.tell(event) state + event }).receiveSignal(additionalSignalHandler.orElse { - case (_, RecoveryCompleted) ⇒ + case (_, RecoveryCompleted) => probe.tell("starting") - case (_, PostStop) ⇒ + case (_, PostStop) => probe.tell("stopped") - case (_, PreRestart) ⇒ + case (_, PreRestart) => probe.tell("restarting") }) @@ -132,7 +132,7 @@ class EventSourcedBehaviorFailureSpec val probe = TestProbe[String]() val excProbe = TestProbe[Throwable]() spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref, { - case (_, RecoveryFailed(t)) ⇒ + case (_, RecoveryFailed(t)) => excProbe.ref ! t })) @@ -144,7 +144,7 @@ class EventSourcedBehaviorFailureSpec "handle exceptions from RecoveryFailed signal handler" in { val probe = TestProbe[String]() val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref, { - case (_, RecoveryFailed(_)) ⇒ + case (_, RecoveryFailed(_)) => throw TestException("recovery call back failure") })) pa ! "one" @@ -169,7 +169,7 @@ class EventSourcedBehaviorFailureSpec EventFilter[JournalFailureException](occurrences = 1).intercept { // start again and then the event handler will throw spawn(failingPersistentActor(pid, probe.ref, { - case (_, RecoveryFailed(t)) ⇒ + case (_, RecoveryFailed(t)) => excProbe.ref ! t })) @@ -184,7 +184,7 @@ class EventSourcedBehaviorFailureSpec spawn( Behaviors .supervise(failingPersistentActor(PersistenceId("recovery-ok"), probe.ref, { - case (_, RecoveryCompleted) ⇒ + case (_, RecoveryCompleted) => probe.ref.tell("starting") throw TestException("recovery call back failure") })) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala index fb11e9742ee..9a2e0f6b73c 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorRetentionSpec.scala @@ -127,16 +127,16 @@ object EventSourcedBehaviorRetentionSpec { Effect.none.thenStop() }, - eventHandler = (state, evt) ⇒ + eventHandler = (state, evt) => evt match { - case Incremented(delta) ⇒ + case Incremented(delta) => probe ! ((state, evt)) State(state.value + delta, state.history :+ state.value) }).receiveSignal { case (_, RecoveryCompleted) => () - case (_, SnapshotCompleted(metadata)) ⇒ + case (_, SnapshotCompleted(metadata)) => snapshotProbe ! Success(metadata) - case (_, SnapshotFailed(_, failure)) ⇒ + case (_, SnapshotFailed(_, failure)) => snapshotProbe ! Failure(failure) case (_, e: EventSourcedSignal) => retentionProbe ! Success(e) @@ -303,7 +303,7 @@ class EventSourcedBehaviorRetentionSpec val replyProbe = TestProbe[State]() val persistentActor = spawn( - Behaviors.setup[Command](ctx ⇒ + Behaviors.setup[Command](ctx => counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2)))) @@ -336,7 +336,7 @@ class EventSourcedBehaviorRetentionSpec val persistentActor = spawn( Behaviors.setup[Command]( - ctx ⇒ + ctx => counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention( // tests the Java API as well RetentionCriteria.snapshotEvery(numberOfEvents = 3, keepNSnapshots = 2).withDeleteEventsOnSnapshot))) @@ -382,7 +382,7 @@ class EventSourcedBehaviorRetentionSpec val persistentActor = spawn( Behaviors.setup[Command]( - ctx ⇒ + ctx => counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 1)))) @@ -419,7 +419,7 @@ class EventSourcedBehaviorRetentionSpec val persistentActor = spawn( Behaviors.setup[Command]( - ctx ⇒ + ctx => counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) .snapshotWhen((_, _, seqNr) => seqNr == 3 || seqNr == 13) .withRetention( @@ -468,7 +468,7 @@ class EventSourcedBehaviorRetentionSpec val replyProbe = TestProbe[State]() val persistentActor = spawn( - Behaviors.setup[Command](ctx ⇒ + Behaviors.setup[Command](ctx => counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref) .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3)))) @@ -508,7 +508,7 @@ class EventSourcedBehaviorRetentionSpec val replyProbe = TestProbe[State]() val persistentActor = spawn( - Behaviors.setup[Command](ctx ⇒ + Behaviors.setup[Command](ctx => counterWithSnapshotAndRetentionProbe(ctx, pid, snapshotProbe.ref, retentionProbe.ref).withRetention( RetentionCriteria.snapshotEvery(numberOfEvents = 1, keepNSnapshots = 3).withDeleteEventsOnSnapshot))) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala index 42c390a30be..35c8c8af665 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala @@ -277,16 +277,16 @@ object EventSourcedBehaviorSpec { Effect.none.thenStop() }, - eventHandler = (state, evt) ⇒ + eventHandler = (state, evt) => evt match { - case Incremented(delta) ⇒ + case Incremented(delta) => probe ! ((state, evt)) State(state.value + delta, state.history :+ state.value) }).receiveSignal { - case (_, RecoveryCompleted) ⇒ () - case (_, SnapshotCompleted(metadata)) ⇒ + case (_, RecoveryCompleted) => () + case (_, SnapshotCompleted(metadata)) => snapshotProbe ! Success(metadata) - case (_, SnapshotFailed(_, failure)) ⇒ + case (_, SnapshotFailed(_, failure)) => snapshotProbe ! Failure(failure) } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala index 76ebc6d60be..6977d602f56 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedSequenceNumberSpec.scala @@ -31,15 +31,15 @@ class EventSourcedSequenceNumberSpec system.toUntyped.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1))) private def behavior(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] = - Behaviors.setup(ctx ⇒ + Behaviors.setup(ctx => EventSourcedBehavior[String, String, String](pid, "", { (_, command) => probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onCommand") - Effect.persist(command).thenRun(_ ⇒ probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " thenRun")) - }, { (state, evt) ⇒ + Effect.persist(command).thenRun(_ => probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " thenRun")) + }, { (state, evt) => probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " eventHandler") state + evt }).receiveSignal { - case (_, RecoveryCompleted) ⇒ + case (_, RecoveryCompleted) => probe ! (EventSourcedBehavior.lastSequenceNumber(ctx) + " onRecoveryComplete") }) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/LoggerSourceSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/LoggerSourceSpec.scala index ffa03e9b497..b5839608c90 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/LoggerSourceSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/LoggerSourceSpec.scala @@ -32,18 +32,18 @@ class LoggerSourceSpec private val pidCounter = new AtomicInteger(0) private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") - def behavior: Behavior[String] = Behaviors.setup { ctx ⇒ + def behavior: Behavior[String] = Behaviors.setup { ctx => ctx.log.info("setting-up-behavior") EventSourcedBehavior[String, String, String](nextPid(), emptyState = "", commandHandler = (_, _) => { ctx.log.info("command-received") Effect.persist("evt") - }, eventHandler = (state, _) ⇒ { + }, eventHandler = (state, _) => { ctx.log.info("event-received") state }).receiveSignal { - case (_, RecoveryCompleted) ⇒ ctx.log.info("recovery-completed") - case (_, SnapshotCompleted(_)) ⇒ - case (_, SnapshotFailed(_, _)) ⇒ + case (_, RecoveryCompleted) => ctx.log.info("recovery-completed") + case (_, SnapshotCompleted(_)) => + case (_, SnapshotFailed(_, _)) => } } @@ -91,7 +91,7 @@ class LoggerSourceSpec def eventFilterFor(logMsg: String) = EventFilter.custom( { - case l: LogEvent if l.message == logMsg ⇒ + case l: LogEvent if l.message == logMsg => if (l.logClass == classOf[LoggerSourceSpec]) true else fail(s"Unexpected log source: ${l.logClass} for message ${l.message}") case _ => false diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala index 5dd2a854783..d955a365e39 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala @@ -44,7 +44,7 @@ class NullEmptyStateSpec extends ScalaTestWithActorTestKit(NullEmptyStateSpec.co probe.tell("eventHandler:" + state + ":" + event) if (state == null) event else state + event }).receiveSignal { - case (state, RecoveryCompleted) ⇒ + case (state, RecoveryCompleted) => probe.tell("onRecoveryCompleted:" + state) } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala index a6263a400ab..3be827d8a72 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala @@ -78,11 +78,11 @@ object PerformanceSpec { persistenceId = PersistenceId(name), "", commandHandler = CommandHandler.command { - case StopMeasure ⇒ + case StopMeasure => Effect.none.thenRun(_ => probe.ref ! StopMeasure) - case FailAt(sequence) ⇒ + case FailAt(sequence) => Effect.none.thenRun(_ => parameters.failAt = sequence) - case command ⇒ other(command, parameters) + case command => other(command, parameters) }, eventHandler = { case (state, _) => state diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index 6ee818e2213..cb94b5710c6 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -90,7 +90,7 @@ object PersistentActorCompileOnlyTest { case IntentRecorded(correlationId, data) => EventsInFlight( nextCorrelationId = correlationId + 1, - dataByCorrelationId = state.dataByCorrelationId + (correlationId → data)) + dataByCorrelationId = state.dataByCorrelationId + (correlationId -> data)) case SideEffectAcknowledged(correlationId) => state.copy(dataByCorrelationId = state.dataByCorrelationId - correlationId) }).receiveSignal { diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala index 5ce7f799be0..3851808c7cf 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala @@ -87,7 +87,7 @@ object BasicPersistentBehaviorCompileOnly { commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state")) .receiveSignal { - case (state, RecoveryCompleted) ⇒ + case (state, RecoveryCompleted) => throw new RuntimeException("TODO: add some end-of-recovery side-effect here") } //#recovery @@ -109,7 +109,7 @@ object BasicPersistentBehaviorCompileOnly { commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"), eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state")) .receiveSignal { - case (state, RecoveryCompleted) ⇒ + case (state, RecoveryCompleted) => throw new RuntimeException("TODO: add some end-of-recovery side-effect here") } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/FanoutProcessorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/FanoutProcessorSpec.scala index fe479e398fc..6ba72c16920 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/FanoutProcessorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/FanoutProcessorSpec.scala @@ -96,7 +96,7 @@ class FanoutProcessorSpec extends StreamSpec { val (promise, publisher) = Source.repeat(1).toMat(Sink.asPublisher(true))(Keep.both).run() val publisherRef = publisher.asInstanceOf[ActorPublisher[Int]].impl probe.watch(publisherRef) - Source.fromPublisher(publisher).map(_ ⇒ throw TE("boom")).runWith(Sink.ignore) + Source.fromPublisher(publisher).map(_ => throw TE("boom")).runWith(Sink.ignore) probe.expectTerminated(publisherRef) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala index 17772f29682..a119124f7ab 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/LazySinkSpec.scala @@ -128,7 +128,7 @@ class LazySinkSpec extends StreamSpec { } } - val result = Source(List("whatever")).runWith(Sink.lazyInitAsync[String, NotUsed](() ⇒ { + val result = Source(List("whatever")).runWith(Sink.lazyInitAsync[String, NotUsed](() => { Future.successful(Sink.fromGraph(FailingInnerMat)) })) @@ -139,8 +139,8 @@ class LazySinkSpec extends StreamSpec { "lazily propagate failure" in { case object MyException extends Exception val lazyMatVal = Source(List(1)) - .concat(Source.lazily(() ⇒ Source.failed(MyException))) - .runWith(Sink.lazyInitAsync(() ⇒ Future.successful(Sink.seq[Int]))) + .concat(Source.lazily(() => Source.failed(MyException))) + .runWith(Sink.lazyInitAsync(() => Future.successful(Sink.seq[Int]))) // lazy init async materialized a sink, so we should have a some here val innerMatVal: Future[immutable.Seq[Int]] = lazyMatVal.futureValue.get diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala index fe5df1c46ea..f9507c33f95 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSource.scala @@ -52,12 +52,12 @@ private object ActorRefSource { inheritedAttributes.get[Attributes.Name].map(_.n).getOrElse(super.stageActorName) val ref: ActorRef = getEagerStageActor(eagerMaterializer, poisonPillCompatibility = true) { - case (_, PoisonPill) ⇒ + case (_, PoisonPill) => log.warning("for backwards compatibility: PoisonPill will not be supported in the future") completeStage() - case (_, m) if failureMatcher.isDefinedAt(m) ⇒ + case (_, m) if failureMatcher.isDefinedAt(m) => failStage(failureMatcher(m)) - case (_, m) if completionMatcher.isDefinedAt(m) ⇒ + case (_, m) if completionMatcher.isDefinedAt(m) => completionMatcher(m) match { case CompletionStrategy.Draining => isCompleting = true @@ -65,7 +65,7 @@ private object ActorRefSource { case CompletionStrategy.Immediately => completeStage() } - case (_, m: T @unchecked) ⇒ + case (_, m: T @unchecked) => buffer match { case OptionVal.None => if (isCompleting) { @@ -87,37 +87,37 @@ private object ActorRefSource { tryPush() } else overflowStrategy match { - case s: DropHead ⇒ + case s: DropHead => log.log( s.logLevel, "Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") buf.dropHead() buf.enqueue(m) tryPush() - case s: DropTail ⇒ + case s: DropTail => log.log( s.logLevel, "Dropping the tail element because buffer is full and overflowStrategy is: [DropTail]") buf.dropTail() buf.enqueue(m) tryPush() - case s: DropBuffer ⇒ + case s: DropBuffer => log.log( s.logLevel, "Dropping all the buffered elements because buffer is full and overflowStrategy is: [DropBuffer]") buf.clear() buf.enqueue(m) tryPush() - case s: DropNew ⇒ + case s: DropNew => log.log( s.logLevel, "Dropping the new element because buffer is full and overflowStrategy is: [DropNew]") - case s: Fail ⇒ + case s: Fail => log.log(s.logLevel, "Failing because buffer is full and overflowStrategy is: [Fail]") val bufferOverflowException = BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") failStage(bufferOverflowException) - case _: Backpressure ⇒ + case _: Backpressure => // there is a precondition check in Source.actorRefSource factory method to not allow backpressure as strategy failStage(new IllegalStateException("Backpressure is not supported")) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 726a44282fe..2a5cce7edcc 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -220,7 +220,7 @@ object GraphStageLogic { private val functionRef: FunctionRef = { val f: (ActorRef, Any) => Unit = { - case (r, PoisonPill) if poisonPillFallback ⇒ + case (r, PoisonPill) if poisonPillFallback => callback.invoke((r, PoisonPill)) case (_, m @ (PoisonPill | Kill)) => materializer.logger.warning( @@ -1234,9 +1234,9 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: protected[akka] def getEagerStageActor( eagerMaterializer: Materializer, poisonPillCompatibility: Boolean)( // fallback required for source actor backwards compatibility - receive: ((ActorRef, Any)) ⇒ Unit): StageActor = + receive: ((ActorRef, Any)) => Unit): StageActor = _stageActor match { - case null ⇒ + case null => val actorMaterializer = ActorMaterializerHelper.downcast(eagerMaterializer) _stageActor = new StageActor(actorMaterializer, getAsyncCallback, receive, stageActorName, poisonPillCompatibility)