Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ object Dependencies {
// needs to be inline with the aeron version, check
// https://github.com/real-logic/aeron/blob/1.x.y/build.gradle
val agronaVersion = "1.15.1"
val nettyVersion = "3.10.6.Final"
val netty3Version = "3.10.6.Final"
val netty4Version = "4.1.100.Final"
val protobufJavaVersion = "3.16.3"
val logbackVersion = "1.2.11"

Expand Down Expand Up @@ -57,7 +58,9 @@ object Dependencies {
// Compile

val config = "com.typesafe" % "config" % "1.4.3"
val netty = "io.netty" % "netty" % nettyVersion
val netty3 = "io.netty" % "netty" % netty3Version
val `netty4-transport` = "io.netty" % "netty-transport" % netty4Version
val `netty4-handler` = "io.netty" % "netty-handler" % netty4Version

val scalaReflect = ScalaVersionDependentModuleID.versioned("org.scala-lang" % "scala-reflect" % _)

Expand Down Expand Up @@ -272,7 +275,7 @@ object Dependencies {
Compile.slf4jApi,
TestDependencies.scalatest.value)

val remoteDependencies = Seq(netty, aeronDriver, aeronClient)
val remoteDependencies = Seq(netty3, `netty4-transport`, `netty4-handler`, aeronDriver, aeronClient)
val remoteOptionalDependencies = remoteDependencies.map(_ % "optional")

val remote = l ++= Seq(
Expand All @@ -284,7 +287,7 @@ object Dependencies {

val remoteTests = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value) ++ remoteDependencies

val multiNodeTestkit = l ++= Seq(netty)
val multiNodeTestkit = l ++= Seq(netty3)

val cluster = l ++= Seq(TestDependencies.junit, TestDependencies.scalatest.value)

Expand Down
2 changes: 1 addition & 1 deletion project/Paradox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ object Paradox {
"scalatest.version" -> Dependencies.scalaTestVersion,
"sigar_loader.version" -> "1.6.6-rev002",
"aeron_version" -> Dependencies.aeronVersion,
"netty_version" -> Dependencies.nettyVersion,
"netty_version" -> Dependencies.netty3Version,
"logback_version" -> Dependencies.logbackVersion))

val rootsSettings = Seq(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.remote.classic

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.testkit.SocketUtil
import org.jboss.netty.channel.ChannelException
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class RemotingNetty4FailedToBindSpec extends AnyWordSpec with Matchers {

"an ActorSystem" must {
"not start if port is taken (Netty 4)" in {
val port = SocketUtil.temporaryLocalPort()
val config = ConfigFactory.parseString(s"""
|pekko {
| actor {
| provider = remote
| }
| remote.artery.enabled = off
| remote.classic.netty {
| ssl {
| ssl-engine-provider = org.apache.pekko.remote.transport.netty4.ConfigSSLEngineProvider
| }
| tcp {
| hostname = "127.0.0.1"
| port = $port
| transport-class = "org.apache.pekko.remote.transport.netty4.NettyTransport"
| }
| }
|}
""".stripMargin)
val as = ActorSystem("RemotingFailedToBindSpec", config)
try {
val ex = intercept[ChannelException] {
ActorSystem("BindTest2", config)
}
ex.getMessage should startWith("Failed to bind")
} finally {
as.terminate()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.remote.transport.netty4

import java.nio.channels.ClosedChannelException

import scala.annotation.nowarn
import scala.util.control.NonFatal

import org.apache.pekko
import pekko.PekkoException
import pekko.util.unused

import io.netty.buffer.ByteBuf
import io.netty.channel.{ ChannelHandlerContext, SimpleChannelInboundHandler }

/**
* INTERNAL API
*/
private[netty4] trait NettyHelpers {

protected def onConnect(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onDisconnect(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onOpen(@unused ctx: ChannelHandlerContext): Unit = ()

protected def onMessage(@unused ctx: ChannelHandlerContext, @unused msg: ByteBuf): Unit = ()

protected def onException(@unused ctx: ChannelHandlerContext, @unused e: Throwable): Unit = ()

final protected def transformException(ctx: ChannelHandlerContext, ex: Throwable): Unit = {
val cause = if (ex ne null) ex else new PekkoException("Unknown cause")
cause match {
case _: ClosedChannelException => // Ignore
case null | NonFatal(_) => onException(ctx, ex)
case e: Throwable => throw e // Rethrow fatals
}
}
}

/**
* INTERNAL API
*/
private[netty4] abstract class NettyChannelHandlerAdapter extends SimpleChannelInboundHandler[ByteBuf]
with NettyHelpers {
final override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBuf): Unit = {
onMessage(ctx, msg)
}

@nowarn("msg=deprecated")
final override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
transformException(ctx, cause)
}

final override def channelActive(ctx: ChannelHandlerContext): Unit = {
onOpen(ctx)
onConnect(ctx)
}

final override def channelInactive(ctx: ChannelHandlerContext): Unit = {
onDisconnect(ctx)
}
}

/**
* INTERNAL API
*/
private[netty4] trait NettyServerHelpers extends NettyChannelHandlerAdapter

/**
* INTERNAL API
*/
private[netty4] trait NettyClientHelpers extends NettyChannelHandlerAdapter
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.remote.transport.netty4

import scala.annotation.nowarn

import com.typesafe.config.Config
import org.apache.pekko
import pekko.japi.Util._
import pekko.util.ccompat._

import io.netty.channel.Channel
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.Future

/**
* INTERNAL API
*/
private[pekko] class SSLSettings(config: Config) {

import config.getBoolean
import config.getString
import config.getStringList

val SSLKeyStore = getString("key-store")
val SSLTrustStore = getString("trust-store")
val SSLKeyStorePassword = getString("key-store-password")
val SSLKeyPassword = getString("key-password")

val SSLTrustStorePassword = getString("trust-store-password")

val SSLEnabledAlgorithms = immutableSeq(getStringList("enabled-algorithms")).to(Set)

val SSLProtocol = getString("protocol")

val SSLRandomNumberGenerator = getString("random-number-generator")

val SSLRequireMutualAuthentication = getBoolean("require-mutual-authentication")

}

/**
* INTERNAL API
*
* Used for adding SSL support to Netty pipeline.
* The `SSLEngine` is created via the configured [[SSLEngineProvider]].
*/
@ccompatUsedUntil213
@nowarn("msg=deprecated")
private[pekko] object NettySSLSupport {

/**
* Construct a SSLHandler which can be inserted into a Netty server/client pipeline
*/
def apply(sslEngineProvider: SSLEngineProvider, isClient: Boolean): SslHandler = {
val sslEngine =
if (isClient) sslEngineProvider.createClientSSLEngine()
else sslEngineProvider.createServerSSLEngine()
val handler = new SslHandler(sslEngine)
handler.handshakeFuture().addListener((future: Future[Channel]) => {
if (!future.isSuccess) {
handler.closeOutbound().channel().close()
}
})
handler
}
}
Loading