Skip to content

Commit

Permalink
Port Pekko support to sttp3
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski committed Aug 9, 2023
1 parent 531a2f9 commit b8d002d
Show file tree
Hide file tree
Showing 18 changed files with 100 additions and 98 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ val zio1InteropRsVersion = "1.3.12"
val zio2InteropRsVersion = "2.0.1"

val sttpModelVersion = "1.5.5"
val sttpSharedVersion = "1.3.15"
val sttpSharedVersion = "1.3.16"

val logback = "ch.qos.logback" % "logback-classic" % "1.4.5"

Expand Down Expand Up @@ -562,7 +562,7 @@ lazy val pekkoHttpBackend = (projectMatrix in file("pekko-http-backend"))
)
.dependsOn(core % compileAndTest)
.jvmPlatform(
scalaVersions = scala2 ++ scala3
scalaVersions = scala2alive ++ scala3
)

//-- async http client
Expand Down
20 changes: 10 additions & 10 deletions docs/backends/pekko.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This backend is based on [pekko-http](https://pekko.apache.org/docs/pekko-http/current/). To use, add the following dependency to your project:

```
"com.softwaremill.sttp.client4" %% "pekko-http-backend" % "@VERSION@"
"com.softwaremill.sttp.client3" %% "pekko-http-backend" % "@VERSION@"
```

A fully **asynchronous** backend. Uses the `Future` effect to return responses. There are also [other `Future`-based backends](future.md), which don't depend on Pekko.
Expand All @@ -17,27 +17,27 @@ Note that you'll also need an explicit dependency on pekko-streams, as pekko-htt
Next you'll need to add create the backend instance:

```scala mdoc:compile-only
import sttp.client4.pekkohttp._
import sttp.client3.pekkohttp._
val backend = PekkoHttpBackend()
```

or, if you'd like to use an existing actor system:

```scala mdoc:compile-only
import sttp.client4.pekkohttp._
import sttp.client3.pekkohttp._
import org.apache.pekko.actor.ActorSystem

val actorSystem: ActorSystem = ???
val backend = PekkoHttpBackend.usingActorSystem(actorSystem)
```

This backend supports sending and receiving [pekko-streams](https://pekko.apache.org/docs/pekko/current/stream/index.html) streams. The streams capability is represented as `sttp.client4.pekkohttp.PekkoStreams`.
This backend supports sending and receiving [pekko-streams](https://pekko.apache.org/docs/pekko/current/stream/index.html) streams. The streams capability is represented as `sttp.client3.pekkohttp.PekkoStreams`.

To set the request body as a stream:

```scala mdoc:compile-only
import sttp.capabilities.pekko.PekkoStreams
import sttp.client4._
import sttp.client3._

import org.apache.pekko
import pekko.stream.scaladsl.Source
Expand All @@ -55,8 +55,8 @@ To receive the response body as a stream:
```scala mdoc:compile-only
import scala.concurrent.Future
import sttp.capabilities.pekko.PekkoStreams
import sttp.client4._
import sttp.client4.pekkohttp.PekkoHttpBackend
import sttp.client3._
import sttp.client3.pekkohttp.PekkoHttpBackend

import org.apache.pekko
import pekko.stream.scaladsl.Source
Expand All @@ -82,7 +82,7 @@ That way, you can "mock" a server that the backend will talk to, without startin
If your application provides a client library for its dependants to use, this is a great way to ensure that the client actually matches the routes exposed by your application:

```scala mdoc:compile-only
import sttp.client4.pekkohttp._
import sttp.client3.pekkohttp._
import org.apache.pekko
import pekko.http.scaladsl.server.Route
import pekko.actor.ActorSystem
Expand Down Expand Up @@ -110,9 +110,9 @@ import scala.concurrent.Future
import org.apache.pekko.stream.scaladsl.Source

import sttp.capabilities.pekko.PekkoStreams
import sttp.client4.pekkohttp.PekkoHttpServerSentEvents
import sttp.client3.pekkohttp.PekkoHttpServerSentEvents
import sttp.model.sse.ServerSentEvent
import sttp.client4._
import sttp.client3._

def processEvents(source: Source[ServerSentEvent, Any]): Future[Unit] = ???

Expand Down
4 changes: 2 additions & 2 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,13 @@ Example code:
Required dependencies:

```scala
libraryDependencies ++= List("com.softwaremill.sttp.client4" %% "pekko-http-backend" % "@VERSION@")
libraryDependencies ++= List("com.softwaremill.sttp.client3" %% "pekko-http-backend" % "@VERSION@")
```

Example code:

```eval_rst
.. literalinclude:: ../../examples/src/main/scala/sttp/client4/examples/WebSocketPekko.scala
.. literalinclude:: ../../examples/src/main/scala/sttp/client3/examples/WebSocketPekko.scala
:language: scala
```

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package sttp.client4.examples
package sttp.client3.examples

import sttp.client4._
import sttp.client4.pekkohttp.PekkoHttpBackend
import sttp.client3._
import sttp.client3.pekkohttp.PekkoHttpBackend
import sttp.ws.WebSocket

import scala.concurrent.ExecutionContext.Implicits.global
Expand Down
4 changes: 2 additions & 2 deletions generated-docs/out/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,13 @@ Example code:
Required dependencies:

```scala
libraryDependencies ++= List("com.softwaremill.sttp.client4" %% "pekko-http-backend" % "@VERSION@")
libraryDependencies ++= List("com.softwaremill.sttp.client3" %% "pekko-http-backend" % "@VERSION@")
```

Example code:

```eval_rst
.. literalinclude:: ../../examples/src/main/scala/sttp/client4/examples/WebSocketPekko.scala
.. literalinclude:: ../../examples/src/main/scala/sttp/client3/examples/WebSocketPekko.scala
:language: scala
```

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sttp.client4.pekkohttp
package sttp.client3.pekkohttp

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.pekko
Expand All @@ -9,9 +9,9 @@ import pekko.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import pekko.util.ByteString
import pekko.{Done, NotUsed}
import sttp.capabilities.pekko.PekkoStreams
import sttp.client4._
import sttp.client4.internal._
import sttp.client4.ws.{GotAWebSocketException, NotAWebSocketException}
import sttp.client3._
import sttp.client3.internal._
import sttp.client3.ws.{GotAWebSocketException, NotAWebSocketException}
import sttp.model.{Headers, ResponseMetadata}
import sttp.monad.{FutureMonad, MonadError}
import sttp.ws.{WebSocket, WebSocketBufferFull, WebSocketClosed, WebSocketFrame}
Expand All @@ -21,7 +21,7 @@ import scala.util.Failure

private[pekkohttp] class BodyFromPekko()(implicit ec: ExecutionContext, mat: Materializer, m: MonadError[Future]) {
def apply[T, R](
responseAs: ResponseAsDelegate[T, R],
responseAs: ResponseAs[T, R],
meta: ResponseMetadata,
response: Either[HttpResponse, Promise[Flow[Message, Message, NotUsed]]]
): Future[T] =
Expand Down Expand Up @@ -72,7 +72,7 @@ private[pekkohttp] class BodyFromPekko()(implicit ec: ExecutionContext, mat: Mat
)

override protected def handleWS[T](
responseAs: GenericWebSocketResponseAs[T, _],
responseAs: WebSocketResponseAs[T, _],
meta: ResponseMetadata,
ws: Promise[Flow[Message, Message, NotUsed]]
): Future[T] = wsFromPekko(responseAs, ws, meta)
Expand All @@ -87,7 +87,7 @@ private[pekkohttp] class BodyFromPekko()(implicit ec: ExecutionContext, mat: Mat
}

private def wsFromPekko[T, R](
rr: GenericWebSocketResponseAs[T, R],
rr: WebSocketResponseAs[T, R],
wsFlow: Promise[Flow[Message, Message, NotUsed]],
meta: ResponseMetadata
)(implicit ec: ExecutionContext, mat: Materializer): Future[T] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sttp.client4.pekkohttp
package sttp.client3.pekkohttp

import org.apache.pekko
import pekko.http.scaladsl.model.{
Expand All @@ -13,17 +13,17 @@ import pekko.http.scaladsl.model.{
import pekko.stream.scaladsl.{Source, StreamConverters}
import pekko.util.ByteString
import sttp.capabilities.pekko.PekkoStreams
import sttp.client4.internal.throwNestedMultipartNotAllowed
import sttp.client4._
import sttp.client3.internal.throwNestedMultipartNotAllowed
import sttp.client3._
import sttp.model.{HeaderNames, Part}

import scala.collection.immutable.Seq
import scala.util.{Failure, Success, Try}

private[pekkohttp] object BodyToPekko {
def apply[R](
r: GenericRequest[_, R],
body: GenericRequestBody[R],
r: Request[_, R],
body: RequestBody[R],
ar: HttpRequest
): Try[HttpRequest] = {
def ctWithCharset(ct: ContentType, charset: String) =
Expand All @@ -34,7 +34,7 @@ private[pekkohttp] object BodyToPekko {

def contentLength = r.headers.find(_.is(HeaderNames.ContentLength)).flatMap(h => Try(h.value.toLong).toOption)

def toBodyPart(mp: Part[BodyPart[_]]): Try[PekkoMultipart.FormData.BodyPart] = {
def toBodyPart(mp: Part[RequestBody[_]]): Try[PekkoMultipart.FormData.BodyPart] = {
def streamPartEntity(contentType: ContentType, s: PekkoStreams.BinaryStream) =
mp.contentLength match {
case None => HttpEntity.IndefiniteLength(contentType, s)
Expand All @@ -49,6 +49,8 @@ private[pekkohttp] object BodyToPekko {
case isb: InputStreamBody => streamPartEntity(ct, StreamConverters.fromInputStream(() => isb.b))
case FileBody(b, _) => HttpEntity.fromPath(ct, b.toPath)
case StreamBody(b) => streamPartEntity(ct, b.asInstanceOf[PekkoStreams.BinaryStream])
case MultipartBody(_) => throwNestedMultipartNotAllowed
case NoBody => HttpEntity.Empty
}

for {
Expand Down Expand Up @@ -82,7 +84,7 @@ private[pekkohttp] object BodyToPekko {
}

private def multipartEntity(
r: GenericRequest[_, _],
r: Request[_, _],
bodyParts: Seq[PekkoMultipart.FormData.BodyPart]
): Try[RequestEntity] =
r.headers.find(Util.isContentType) match {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package sttp.client4.pekkohttp
package sttp.client3.pekkohttp

import org.apache.pekko
import pekko.http.scaladsl.model.HttpResponse
import sttp.client4.{GenericRequest, SttpClientException}
import sttp.client3.{Request, SttpClientException}
import sttp.model.{Header, HeaderNames}

import scala.collection.immutable.Seq
Expand All @@ -16,7 +16,7 @@ private[pekkohttp] object FromPekko {
ch :: (cl.toList ++ other)
}

def exception(request: GenericRequest[_, _], e: Exception): Option[Exception] =
def exception(request: Request[_, _], e: Exception): Option[Exception] =
e match {
case e: pekko.stream.ConnectionException => Some(new SttpClientException.ConnectException(request, e))
case e: pekko.stream.StreamTcpException =>
Expand Down
Loading

0 comments on commit b8d002d

Please sign in to comment.