Skip to content

Unify TCP and UnixSocket APIs #3563

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 48 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
1cd3d23
Initial wip using builder pattern
mpilquist Apr 8, 2025
5e5b39e
Generalize NeedAddress
mpilquist Apr 8, 2025
5e27b71
Dump builder pattern
mpilquist Apr 8, 2025
a86b18d
wip
mpilquist Apr 8, 2025
c06e969
wip
mpilquist Apr 9, 2025
eff4b58
wip
mpilquist Apr 9, 2025
89362b0
wip
mpilquist Apr 10, 2025
fcd9233
wip
mpilquist Apr 10, 2025
5e912ab
Drop JVM socket group implementations
mpilquist Apr 10, 2025
7081b7a
Unix socket tests
mpilquist Apr 10, 2025
dedda55
Unix socket tests
mpilquist Apr 10, 2025
17ade5c
Cleanup Bind
mpilquist Apr 10, 2025
e2fafad
s/Bind/ServerSocket/
mpilquist Apr 10, 2025
fad0e04
Push old localAddress back down to Socket
mpilquist Apr 10, 2025
598d3ed
Native
mpilquist Apr 11, 2025
b649d00
JS tests passing
mpilquist Apr 16, 2025
4a00566
JS duplication reduction
mpilquist Apr 16, 2025
e22041b
Scalafmt
mpilquist Apr 16, 2025
906d8d5
Fix compilation errors
mpilquist Apr 17, 2025
058e616
Implement getLocalAddressGen on JVM
mpilquist Apr 17, 2025
7648b71
Cleanup
mpilquist Apr 18, 2025
01322d5
Progress on fixing addresses
mpilquist Apr 18, 2025
42773ce
Fixed addresses
mpilquist Apr 18, 2025
6de7fbe
Scalafmt
mpilquist Apr 18, 2025
4399a1a
Test unix socket addresses
mpilquist Apr 18, 2025
18eb5d2
IP address tests
mpilquist Apr 18, 2025
1e9703e
Socket option cleanup
mpilquist Apr 18, 2025
f27f191
Socket option cleanup
mpilquist Apr 18, 2025
5285acb
Socket option cleanup
mpilquist Apr 18, 2025
78bc8cc
Deprecate old socket group methods
mpilquist Apr 18, 2025
a0ab96b
Address cleanup
mpilquist Apr 19, 2025
8c28c69
Add address and peerAddress, deprecate localAddress and remoteAddress
mpilquist Apr 20, 2025
6533318
Deprecate Socket#isOpen
mpilquist Apr 21, 2025
e7ca920
Remove some unnecesssary changes from net facade
mpilquist Apr 21, 2025
f4bc9c8
Cleanup in selecting ip sockets provider
mpilquist Apr 21, 2025
896738a
Unify Network implementations
mpilquist Apr 21, 2025
e47817c
Fix 2.12 compilation
mpilquist Apr 21, 2025
f3c5f70
Update to ip4s 3.7.0
mpilquist Apr 21, 2025
0ee5c57
Fix JS 2.12 compilation
mpilquist Apr 21, 2025
0807afb
Mima fixes
mpilquist Apr 21, 2025
91c25f6
Fix native 2.12 warnings
mpilquist Apr 21, 2025
861f7ea
Fix selecting socket address NPE
mpilquist Apr 21, 2025
7395a77
Fix JVM unix sockets test
mpilquist Apr 21, 2025
8a0df13
Fix site docs
mpilquist Apr 21, 2025
47815b2
Deprecate old UnixSockets
mpilquist Apr 21, 2025
5af859e
Set client socket options on JVM Unix
mpilquist Apr 22, 2025
e09f425
Remove explicit DNS lookups from JS IP socket connect & bind
mpilquist Apr 23, 2025
c063029
Scalafmt
mpilquist Apr 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,55 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[MissingTypesProblem](
"fs2.interop.flow.StreamSubscriber$State$WaitingOnUpstream$"
)
),
// Network refactor: #3563
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.connect"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.bind"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Network.bindAndAccept"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Socket.address"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.io.net.Socket.peerAddress"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.address"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.supportedOptions"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.getOption"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.Socket.setOption"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.io.net.SocketCompanionPlatform#AsyncSocket.this"
),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SocketGroup$AbstractAsyncSocketGroup"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SocketGroupCompanionPlatform"),
ProblemFilters.exclude[MissingClassProblem](
"fs2.io.net.SocketGroupCompanionPlatform$AsyncSocketGroup"
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.tls.TLSSocket.address"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.net.tls.TLSSocket.supportedOptions"
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.tls.TLSSocket.getOption"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.io.net.tls.TLSSocket.setOption"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JdkUnixSockets"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JdkUnixSockets$"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JdkUnixSocketsImpl"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JnrUnixSockets"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JnrUnixSockets$"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.JnrUnixSocketsImpl"),
ProblemFilters.exclude[MissingClassProblem](
"fs2.io.net.unixsocket.UnixSocketsCompanionPlatform$AsyncSocket"
),
ProblemFilters.exclude[MissingClassProblem](
"fs2.io.net.unixsocket.UnixSocketsCompanionPlatform$AsyncUnixSockets"
),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.net.SelectingSocket.apply"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.SelectingSocketGroup"),
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.io.net.Socket.forAsync"),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.io.net.SocketOptionCompanionPlatform#Key.get"
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.net.Network.openDatagramSocket"
),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("fs2.io.net.FdPollingSocket.apply"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.FdPollingSocketGroup"),
ProblemFilters.exclude[MissingClassProblem]("fs2.io.net.unixsocket.FdPollingUnixSockets")
)

lazy val root = tlCrossRootProject
Expand Down Expand Up @@ -353,7 +401,7 @@ lazy val io = crossProject(JVMPlatform, JSPlatform, NativePlatform)
.settings(
name := "fs2-io",
tlVersionIntroduced ~= { _.updated("3", "3.1.0") },
libraryDependencies += "com.comcast" %%% "ip4s-core" % "3.6.0",
libraryDependencies += "com.comcast" %%% "ip4s-core" % "3.7.0",
tlJdkRelease := None
)
.jvmSettings(
Expand Down
65 changes: 0 additions & 65 deletions io/js-jvm/src/main/scala/fs2/io/net/Network.scala

This file was deleted.

3 changes: 2 additions & 1 deletion io/js/src/main/scala/fs2/io/internal/facade/net.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private[io] object net {
@js.native
trait Server extends EventEmitter {

def address(): ServerAddress = js.native
def address(): js.UndefOr[ServerAddress] = js.native

def listening: Boolean = js.native

Expand Down Expand Up @@ -110,6 +110,7 @@ private[io] object net {

def setTimeout(timeout: Double): Socket = js.native

def timeout: Double = js.native
}

}
181 changes: 181 additions & 0 deletions io/js/src/main/scala/fs2/io/net/AsyncSocketsProvider.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package io
package net

import cats.effect.{Async, Resource}
import cats.effect.std.Dispatcher
import cats.effect.syntax.all._
import cats.syntax.all._
import com.comcast.ip4s.{Host, IpAddress, Port, SocketAddress, UnixSocketAddress}
import fs2.concurrent.Channel
import fs2.io.internal.facade

import scala.scalajs.js

private[net] abstract class AsyncSocketsProvider[F[_]](implicit F: Async[F]) {

private def setSocketOptions(options: List[SocketOption])(socket: facade.net.Socket): F[Unit] =
options.traverse_(option => option.key.set(socket, option.value))

protected def connectIpOrUnix(
to: Either[SocketAddress[Host], UnixSocketAddress],
options: List[SocketOption]
): Resource[F, Socket[F]] =
(for {
sock <- Resource
.make(
F.delay(
new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true })
)
)(sock =>
F.delay {
if (!sock.destroyed)
sock.destroy()
}
)
.evalTap(setSocketOptions(options))

_ <- F
.async[Unit] { cb =>
sock
.registerOneTimeListener[F, js.Error]("error") { error =>
cb(Left(js.JavaScriptException(error)))
} <* F.delay {
to match {
case Left(addr) =>
sock.connect(addr.port.value, addr.host.toString, () => cb(Right(())))
case Right(addr) =>
sock.connect(addr.path, () => cb(Right(())))
}
}
}
.toResource

address = to match {
case Left(_) =>
SocketAddress(
IpAddress.fromString(sock.localAddress.get).get,
Port.fromInt(sock.localPort.get).get
)
case Right(_) => UnixSocketAddress("")
}
peerAddress = to match {
case Left(_) =>
SocketAddress(
IpAddress.fromString(sock.remoteAddress.get).get,
Port.fromInt(sock.remotePort.get).get
)
case Right(addr) => addr
}
socket <- Socket.forAsync(sock, address, peerAddress)
} yield socket).adaptError { case IOException(ex) => ex }

protected def bindIpOrUnix(
address: Either[SocketAddress[Host], UnixSocketAddress],
options: List[SocketOption]
): Resource[F, ServerSocket[F]] =
(for {
dispatcher <- Dispatcher.sequential[F]
channel <- Channel.unbounded[F, facade.net.Socket].toResource
server <- Resource.make(
F
.delay(
facade.net.createServer(
new facade.net.ServerOptions {
pauseOnConnect = true
allowHalfOpen = true
},
sock => dispatcher.unsafeRunAndForget(channel.send(sock))
)
)
)(server =>
F.async[Unit] { cb =>
if (server.listening)
F.delay(server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))) *>
channel.close.as(None)
else
F.delay(cb(Right(()))).as(None)
}
)
_ <- F
.async[Unit] { cb =>
server.registerOneTimeListener[F, js.Error]("error") { e =>
cb(Left(js.JavaScriptException(e)))
} <* F.delay {
address match {
case Left(addr) =>
if (
addr.host.isInstanceOf[IpAddress] && addr.host.asInstanceOf[IpAddress].isWildcard
)
server.listen(addr.port.value, () => cb(Right(())))
else
server.listen(addr.port.value, addr.host.toString, () => cb(Right(())))
case Right(addr) =>
server.listen(addr.path, () => cb(Right(())))
}
}
}
.toResource
serverSocketAddress = address match {
case Left(_) =>
val addr = server.address().get
SocketAddress(IpAddress.fromString(addr.address).get, Port.fromInt(addr.port).get)
case Right(addr) => addr
}
info = new SocketInfo[F] {
val address = serverSocketAddress
private def raiseOptionError[A]: F[A] =
F.raiseError(
new UnsupportedOperationException(
"Node.js server sockets do not support socket options"
)
)
def getOption[A](key: SocketOption.Key[A]) = raiseOptionError
def setOption[A](key: SocketOption.Key[A], value: A) = raiseOptionError
def supportedOptions = F.pure(Set.empty)
}
address0 = address
sockets = channel.stream
.evalTap(setSocketOptions(options))
.flatMap { sock =>
val address = address0 match {
case Left(_) =>
SocketAddress(
IpAddress.fromString(sock.localAddress.get).get,
Port.fromInt(sock.localPort.get).get
)
case Right(addr) => addr
}
val peerAddress = address0 match {
case Left(_) =>
SocketAddress(
IpAddress.fromString(sock.remoteAddress.get).get,
Port.fromInt(sock.remotePort.get).get
)
case Right(_) => UnixSocketAddress("")
}
Stream.resource(Socket.forAsync(sock, address, peerAddress))
}
} yield ServerSocket(info, sockets)).adaptError { case IOException(ex) => ex }
}
46 changes: 46 additions & 0 deletions io/js/src/main/scala/fs2/io/net/IpSocketsProviderPlatform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2
package io
package net

import cats.effect.{Async, Resource}
import com.comcast.ip4s.{Host, SocketAddress}

private[net] trait IpSocketsProviderCompanionPlatform { self: IpSocketsProvider.type =>

private[net] def forAsync[F[_]: Async]: IpSocketsProvider[F] =
new AsyncSocketsProvider[F] with IpSocketsProvider[F] {

override def connect(
address: SocketAddress[Host],
options: List[SocketOption]
): Resource[F, Socket[F]] =
connectIpOrUnix(Left(address), options)

override def bind(
address: SocketAddress[Host],
options: List[SocketOption]
): Resource[F, ServerSocket[F]] =
bindIpOrUnix(Left(address), options)
}
}
Loading