diff --git a/build.sbt b/build.sbt index 07cfd2977d..923527dcdb 100644 --- a/build.sbt +++ b/build.sbt @@ -150,7 +150,7 @@ val zio2Version = "2.1.11" val zio1InteropRsVersion = "1.3.12" val zio2InteropRsVersion = "2.0.2" -val oxVersion = "0.4.0" +val oxVersion = "0.5.1" val sttpModelVersion = "1.7.11" val sttpSharedVersion = "1.3.22" diff --git a/effects/ox/src/main/scala/sttp/client4/impl/ox/sse/OxServerSentEvents.scala b/effects/ox/src/main/scala/sttp/client4/impl/ox/sse/OxServerSentEvents.scala index 4d595d8d5c..ca5d275457 100644 --- a/effects/ox/src/main/scala/sttp/client4/impl/ox/sse/OxServerSentEvents.scala +++ b/effects/ox/src/main/scala/sttp/client4/impl/ox/sse/OxServerSentEvents.scala @@ -1,14 +1,13 @@ package sttp.client4.impl.ox.sse -import ox.* -import ox.channels.Source +import ox.flow.Flow import sttp.model.sse.ServerSentEvent import java.io.InputStream object OxServerSentEvents: - def parse(is: InputStream)(using Ox): Source[ServerSentEvent] = - Source + def parse(is: InputStream): Flow[ServerSentEvent] = + Flow .fromInputStream(is) .linesUtf8 .mapStatefulConcat(() => List.empty[String])( @@ -18,5 +17,5 @@ object OxServerSentEvents: else None } ) - .filterAsView(_.nonEmpty) + .filter(_.nonEmpty) .map(ServerSentEvent.parse) diff --git a/effects/ox/src/main/scala/sttp/client4/impl/ox/ws/OxWebSockets.scala b/effects/ox/src/main/scala/sttp/client4/impl/ox/ws/OxWebSockets.scala index 4dd9dca216..7d25ee686b 100644 --- a/effects/ox/src/main/scala/sttp/client4/impl/ox/ws/OxWebSockets.scala +++ b/effects/ox/src/main/scala/sttp/client4/impl/ox/ws/OxWebSockets.scala @@ -6,21 +6,23 @@ import sttp.client4.ws.SyncWebSocket import sttp.ws.WebSocketFrame import scala.util.control.NonFatal +import ox.flow.Flow -/** Converts a [[SyncWebSocket]] into a pair of `Source` of server responses and a `Sink` for client requests. The +/** Converts a [[SyncWebSocket]] into a pair of [[Source]] of server responses and a [[Sink]] for client requests. The * `Source` starts receiving frames immediately, its internal buffer size can be adjusted with an implicit - * [[ox.channels.StageCapacity]]. Make sure that the `Source` is contiunually read. This will guarantee that - * server-side Close signal is received and handled. If you don't want to process frames from the server, you can at + * [[ox.channels.BufferCapacity]]. Make sure that the `Source` is contiunually read. This will guarantee that + * server-side close signal is received and handled. If you don't want to process frames from the server, you can at * least handle it with a `fork { source.drain() }`. * * You don't need to manually call `ws.close()` when using this approach, this will be handled automatically * underneath, according to following rules: - * - If the request `Sink` is closed due to an upstream error, a Close frame is sent, and the `Source` with incoming - * responses gets completed as `Done`. - * - If the request `Sink` completes as `Done`, a `Close` frame is sent, and the response `Sink` keeps receiving + * - If the request sink is closed due to an upstream error, a close frame is sent. The response sink keeps receiving + * responses, until the enclosing [[Ox]] scope ends (that is controlled by the caller). When this happens, the fork + * which populates the response channel will be interrupted. + * - If the request sink completes as done, a close frame is sent. As above, the response sink keeps receiving * responses until the server closes communication. - * - If the response `Source` is closed by a Close frome from the server or due to an error, the request Sink is - * closed as `Done`, which will still send all outstanding buffered frames, and then finish. + * - If the response source is closed by a close frame from the server or due to an error, the request sink is closed + * as done. This will attempt to send all outstanding buffered frames, unless the enclosing scope ends beforehand). * * @param ws * a `SyncWebSocket` where the underlying `Sink` will send requests, and where the `Source` will pull responses from. @@ -29,38 +31,30 @@ import scala.util.control.NonFatal */ def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true)(using Ox, - StageCapacity + BufferCapacity ): (Source[WebSocketFrame], Sink[WebSocketFrame]) = - val requestsChannel = StageCapacity.newChannel[WebSocketFrame] - val responsesChannel = StageCapacity.newChannel[WebSocketFrame] - fork { - try + val requestsChannel = BufferCapacity.newChannel[WebSocketFrame] + + val responsesChannel = Flow + .usingEmit[WebSocketFrame] { emit => repeatWhile { ws.receive() match - case frame: WebSocketFrame.Data[_] => - responsesChannel.sendOrClosed(frame) match - case _: ChannelClosed => false - case _ => true - case WebSocketFrame.Close(status, msg) if status > 1001 => - responsesChannel.errorOrClosed(new WebSocketClosedWithError(status, msg)).discard - false - case _: WebSocketFrame.Close => - responsesChannel.doneOrClosed().discard - false + case frame: WebSocketFrame.Data[_] => emit(frame); true + case WebSocketFrame.Close(status, msg) if status > 1001 => throw new WebSocketClosedWithError(status, msg) + case _: WebSocketFrame.Close => false case ping: WebSocketFrame.Ping => requestsChannel.sendOrClosed(WebSocketFrame.Pong(ping.payload)).discard - // Keep receiving even if pong couldn't be send due to closed request channel. We want to process + // Keep receiving even if pong couldn't be sent due to closed request channel. We want to process // whatever responses there are still coming from the server until it signals the end with a Close frome. true case _: WebSocketFrame.Pong => // ignore pongs true } - catch - case NonFatal(err) => - responsesChannel.errorOrClosed(err).discard - finally requestsChannel.doneOrClosed().discard - }.discard + } + .pipe(optionallyConcatenateFrames(_, concatenateFragmented)) + .onComplete(requestsChannel.doneOrClosed().discard) + .runToChannel() fork { try @@ -78,7 +72,7 @@ def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true)(us case ChannelClosed.Error(err) => // There's no proper "client error" status. Statuses 4000+ are available for custom cases ws.send(WebSocketFrame.Close(4000, "Client error")) - responsesChannel.doneOrClosed().discard + // Assuming the responsesChannel fork will get interrupted because the enclosing scope will end false } catch @@ -87,17 +81,17 @@ def asSourceAndSink(ws: SyncWebSocket, concatenateFragmented: Boolean = true)(us if (!responsesChannel.isClosedForReceive) requestsChannel.errorOrClosed(err).discard }.discard - (optionallyConcatenateFrames(responsesChannel, concatenateFragmented), requestsChannel) + (responsesChannel, requestsChannel) final case class WebSocketClosedWithError(statusCode: Int, msg: String) extends Exception(s"WebSocket closed with status $statusCode: $msg") -private def optionallyConcatenateFrames(s: Source[WebSocketFrame], doConcatenate: Boolean)(using +private def optionallyConcatenateFrames(f: Flow[WebSocketFrame], doConcatenate: Boolean)(using Ox -): Source[WebSocketFrame] = +): Flow[WebSocketFrame] = if doConcatenate then type Accumulator = Option[Either[Array[Byte], String]] - s.mapStateful(() => None: Accumulator) { + f.mapStateful(() => None: Accumulator) { case (None, f: WebSocketFrame.Ping) => (None, Some(f)) case (None, f: WebSocketFrame.Pong) => (None, Some(f)) case (None, f: WebSocketFrame.Close) => (None, Some(f)) @@ -115,5 +109,5 @@ private def optionallyConcatenateFrames(s: Source[WebSocketFrame], doConcatenate s"Unexpected WebSocket frame received during concatenation. Frame received: ${f.getClass .getSimpleName()}, accumulator type: ${acc.map(_.getClass.getSimpleName)}" ) - }.collectAsView { case Some(f: WebSocketFrame) => f } - else s + }.collect { case Some(f: WebSocketFrame) => f } + else f diff --git a/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxSseTest.scala b/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxSseTest.scala index a8e6af2da8..9114413e49 100644 --- a/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxSseTest.scala +++ b/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxSseTest.scala @@ -3,7 +3,6 @@ package sttp.client4.impl.ox.sse import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import ox.* import sttp.client4.* import sttp.client4.testing.HttpTest.* import sttp.model.sse.ServerSentEvent @@ -14,7 +13,7 @@ class OxServerSentEventsTest extends AnyFlatSpec with Matchers with BeforeAndAft behavior of "OxServerSentEvents" - it should "parse SSEs" in supervised { + it should "parse SSEs" in { val sseData = "text1 in line1\ntext2 in line2" val expectedEvent = ServerSentEvent(data = Some(sseData), eventType = Some("test-event"), retry = Some(42000)) val expectedEvents = @@ -23,7 +22,7 @@ class OxServerSentEventsTest extends AnyFlatSpec with Matchers with BeforeAndAft .post(uri"$endpoint/sse/echo3") .body(sseData) .response(asInputStreamAlways { is => - OxServerSentEvents.parse(is).take(3).toList shouldBe expectedEvents + OxServerSentEvents.parse(is).take(3).runToList() shouldBe expectedEvents () }) .send(backend) diff --git a/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxWebSocketsTest.scala b/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxWebSocketsTest.scala index b739f217e5..60c71f55e1 100644 --- a/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxWebSocketsTest.scala +++ b/effects/ox/src/test/scala/sttp/client4/impl/ox/ws/OxWebSocketsTest.scala @@ -7,6 +7,7 @@ import org.scalatest.concurrent.Eventually import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import ox.* +import ox.flow.Flow import ox.channels.ChannelClosed import ox.channels.Sink import ox.channels.Source @@ -62,7 +63,7 @@ class OxWebSocketTest extends AnyFlatSpec with BeforeAndAfterAll with Matchers w val (wsSource, wsSink) = asSourceAndSink(ws) wsSink.send(WebSocketFrame.text("test1")) wsSink.error(new Exception("failed source")) - eventually(wsSource.isClosedForReceiveDetail shouldBe Some(ChannelClosed.Done)) + eventually(wsSource.isClosedForReceiveDetail should matchPattern { case Some(ChannelClosed.Error(_)) => }) }) .send(backend) } @@ -117,7 +118,7 @@ class OxWebSocketTest extends AnyFlatSpec with BeforeAndAfterAll with Matchers w .response(asWebSocket { ws => val (wsSource, wsSink) = asSourceAndSink(ws, concatenateFragmented = false) sendText(wsSink, 1) - wsSource.take(3).toList shouldBe List( + Flow.fromSource(wsSource).take(3).runToList() shouldBe List( WebSocketFrame.Text("echo: ", false, None), WebSocketFrame.Text("test1", false, None), WebSocketFrame.Text("", true, None)