Skip to content

Commit

Permalink
Apply compat changes from latest Pekko
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed May 11, 2023
1 parent a0c6a19 commit 42fc9ef
Show file tree
Hide file tree
Showing 95 changed files with 645 additions and 670 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import pekko.Done
import pekko.japi.Pair
import pekko.stream.connectors.amqp._
import pekko.stream.scaladsl.Keep

import scala.compat.java8.FutureConverters._
import pekko.util.FutureConverters._

object AmqpFlow {

Expand All @@ -38,7 +37,7 @@ object AmqpFlow {
*/
def create(
settings: AmqpWriteSettings): pekko.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpFlow(settings).mapMaterializedValue(f => f.toJava).asJava
pekko.stream.connectors.amqp.scaladsl.AmqpFlow(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Creates an `AmqpFlow` that accepts `WriteMessage` elements and emits `WriteResult`.
Expand All @@ -62,7 +61,7 @@ object AmqpFlow {
settings: AmqpWriteSettings): pekko.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpFlow
.withConfirm(settings = settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -87,7 +86,7 @@ object AmqpFlow {
settings: AmqpWriteSettings): pekko.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpFlow
.withConfirmUnordered(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -109,6 +108,6 @@ object AmqpFlow {
pekko.stream.connectors.amqp.scaladsl.AmqpFlow
.withConfirmAndPassThroughUnordered[T](settings = settings))(Keep.right)
.map { case (writeResult, passThrough) => Pair(writeResult, passThrough) }
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import java.util.concurrent.CompletionStage
import org.apache.pekko
import pekko.Done
import pekko.stream.connectors.amqp._

import scala.compat.java8.FutureConverters._
import pekko.util.FutureConverters._

object AmqpFlowWithContext {

Expand All @@ -33,7 +32,7 @@ object AmqpFlowWithContext {
: pekko.stream.javadsl.FlowWithContext[WriteMessage, T, WriteResult, T, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpFlowWithContext
.apply(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -50,6 +49,6 @@ object AmqpFlowWithContext {
: pekko.stream.javadsl.FlowWithContext[WriteMessage, T, WriteResult, T, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpFlowWithContext
.withConfirm(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import org.apache.pekko
import pekko.stream.connectors.amqp._
import pekko.stream.javadsl.Flow
import pekko.util.ByteString

import scala.compat.java8.FutureConverters._
import pekko.util.FutureConverters._

object AmqpRpcFlow {

Expand All @@ -37,7 +36,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int): Flow[ByteString, ByteString, CompletionStage[String]] =
pekko.stream.connectors.amqp.scaladsl.AmqpRpcFlow
.simple(settings, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -49,7 +48,7 @@ object AmqpRpcFlow {
bufferSize: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
pekko.stream.connectors.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -62,7 +61,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
pekko.stream.connectors.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -82,7 +81,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int = 1): Flow[WriteMessage, CommittableReadResult, CompletionStage[String]] =
pekko.stream.connectors.amqp.scaladsl.AmqpRpcFlow
.committableFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.map(cm => new CommittableReadResult(cm))
.asJava

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import org.apache.pekko
import pekko.Done
import pekko.stream.connectors.amqp._
import pekko.util.ByteString

import scala.compat.java8.FutureConverters._
import pekko.util.FutureConverters._

object AmqpSink {

Expand All @@ -31,7 +30,7 @@ object AmqpSink {
* either normally or because of an amqp failure.
*/
def create(settings: AmqpWriteSettings): pekko.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.toJava).asJava
pekko.stream.connectors.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Creates an `AmqpSink` that accepts `ByteString` elements.
Expand All @@ -42,7 +41,7 @@ object AmqpSink {
def createSimple(
settings: AmqpWriteSettings): pekko.stream.javadsl.Sink[ByteString, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f =>
f.toJava).asJava
f.asJava).asJava

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
Expand All @@ -55,6 +54,6 @@ object AmqpSink {
def createReplyTo(
settings: AmqpReplyToSinkSettings): pekko.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpSink.replyTo(settings).mapMaterializedValue(f =>
f.toJava).asJava
f.asJava).asJava

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ import org.apache.pekko
import pekko.Done
import pekko.stream.connectors.amqp.ReadResult
import pekko.stream.connectors.amqp.scaladsl

import scala.compat.java8.FutureConverters._
import pekko.util.FutureConverters._

final class CommittableReadResult(cm: scaladsl.CommittableReadResult) {
val message: ReadResult = cm.message

def ack(): CompletionStage[Done] = ack(false)
def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).toJava
def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).asJava

def nack(): CompletionStage[Done] = nack(false, true)
def nack(multiple: Boolean, requeue: Boolean): CompletionStage[Done] =
cm.nack(multiple, requeue).toJava
cm.nack(multiple, requeue).asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import org.apache.pekko
import pekko.stream.connectors.aws.eventbridge.EventBridgePublishSettings
import pekko.stream.scaladsl.{ Flow, Keep, Sink }
import pekko.{ Done, NotUsed }
import pekko.util.FutureConverters._
import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient
import software.amazon.awssdk.services.eventbridge.model._

import scala.concurrent.Future
import scala.compat.java8.FutureConverters._

/**
* Scala API
Expand Down Expand Up @@ -64,7 +64,7 @@ object EventBridgePublisher {
settings: EventBridgePublishSettings)(
implicit eventBridgeClient: EventBridgeAsyncClient): Flow[PutEventsRequest, PutEventsResponse, NotUsed] =
Flow[PutEventsRequest]
.mapAsync(settings.concurrency)(eventBridgeClient.putEvents(_).toScala)
.mapAsync(settings.concurrency)(eventBridgeClient.putEvents(_).asScala)

/**
* Creates a [[pekko.stream.scaladsl.Flow Flow]] to publish messages to an EventBridge.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package org.apache.pekko.stream.connectors.awslambda.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Flow
import pekko.util.FutureConverters._
import software.amazon.awssdk.services.lambda.model.{ InvokeRequest, InvokeResponse }
import software.amazon.awssdk.services.lambda.LambdaAsyncClient
import scala.compat.java8.FutureConverters._

object AwsLambdaFlow {

Expand All @@ -27,6 +27,6 @@ object AwsLambdaFlow {
*/
def apply(
parallelism: Int)(implicit awsLambdaClient: LambdaAsyncClient): Flow[InvokeRequest, InvokeResponse, NotUsed] =
Flow[InvokeRequest].mapAsyncUnordered(parallelism)(awsLambdaClient.invoke(_).toScala)
Flow[InvokeRequest].mapAsyncUnordered(parallelism)(awsLambdaClient.invoke(_).asScala)

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ object AzureQueueSink {
*/
private[javadsl] def fromFunction[T](f: T => Unit): Sink[T, CompletionStage[Done]] = {
import pekko.stream.connectors.azure.storagequeue.scaladsl.{ AzureQueueSink => AzureQueueSinkScalaDSL }
import scala.compat.java8.FutureConverters._
AzureQueueSinkScalaDSL.fromFunction(f).mapMaterializedValue(_.toJava).asJava
import pekko.util.FutureConverters._
AzureQueueSinkScalaDSL.fromFunction(f).mapMaterializedValue(_.asJava).asJava
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@

package org.apache.pekko.stream.connectors.azure.storagequeue

import org.apache.pekko.util.OptionConverters._

import java.time.{ Duration => JavaDuration }
import java.util.Optional

import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.{ Duration, FiniteDuration }

/**
Expand Down Expand Up @@ -51,7 +52,7 @@ final class AzureQueueSourceSettings private (
* Java API
*/
def getRetrieveRetryTimeout(): Optional[JavaDuration] =
retrieveRetryTimeout.map(d => JavaDuration.ofNanos(d.toNanos)).asJava
retrieveRetryTimeout.map(d => JavaDuration.ofNanos(d.toNanos)).toJava

private def copy(batchSize: Int = batchSize, retrieveRetryTimeout: Option[FiniteDuration] = retrieveRetryTimeout) =
new AzureQueueSourceSettings(initialVisibilityTimeout, batchSize, retrieveRetryTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ package org.apache.pekko.stream.connectors.cassandra

import java.util.concurrent.CompletionStage

import org.apache.pekko.Done
import org.apache.pekko
import pekko.Done
import pekko.util.FunctionConverters._
import pekko.util.FutureConverters._
import com.datastax.oss.driver.api.core.CqlSession
import scala.compat.java8.FunctionConverters._
import scala.compat.java8.FutureConverters._

import scala.concurrent.Future

Expand All @@ -39,7 +40,7 @@ class CassandraSessionSettings private (val configPath: String,
* only execute the first.
*/
def withInit(value: java.util.function.Function[CqlSession, CompletionStage[Done]]): CassandraSessionSettings =
copy(init = Some(value.asScala.andThen(_.toScala)))
copy(init = Some(value.asScala.andThen(_.asScala)))

/**
* The `init` function will be performed once when the session is created, i.e.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@

package org.apache.pekko.stream.connectors.cassandra

import org.apache.pekko.actor.{ ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem }
import org.apache.pekko
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem }
import pekko.util.FutureConverters._
import com.datastax.oss.driver.api.core.CqlSession
import com.typesafe.config.{ Config, ConfigFactory }

import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Failure

Expand Down Expand Up @@ -59,7 +60,7 @@ class DefaultSessionProvider(system: ActorSystem, config: Config) extends CqlSes
} else {
val driverConfig = CqlSessionProvider.driverConfig(system, config)
val driverConfigLoader = DriverConfigLoaderFromConfig.fromConfig(driverConfig)
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().toScala
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().asScala
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import pekko.ConfigurationException
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
import pekko.discovery.Discovery
import pekko.util.JavaDurationConverters._
import pekko.util.FutureConverters._
import com.datastax.oss.driver.api.core.CqlSession
import com.typesafe.config.{ Config, ConfigFactory }

import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }

Expand Down Expand Up @@ -72,7 +72,7 @@ private[cassandra] object PekkoDiscoverySessionProvider {
basic.contact-points = [${contactPoints.mkString("\"", "\", \"", "\"")}]
""").withFallback(CqlSessionProvider.driverConfig(system, config))
val driverConfigLoader = DriverConfigLoaderFromConfig.fromConfig(driverConfigWithContactPoints)
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().toScala
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().asScala
}
}

Expand Down
Loading

0 comments on commit 42fc9ef

Please sign in to comment.