Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply compat changes from latest Pekko #101

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import pekko.Done
import pekko.japi.Pair
import pekko.stream.connectors.amqp._
import pekko.stream.scaladsl.Keep

import scala.compat.java8.FutureConverters._
import pekko.util.FutureConverters._

object AmqpFlow {

Expand All @@ -38,7 +37,7 @@ object AmqpFlow {
*/
def create(
settings: AmqpWriteSettings): pekko.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpFlow(settings).mapMaterializedValue(f => f.toJava).asJava
pekko.stream.connectors.amqp.scaladsl.AmqpFlow(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Creates an `AmqpFlow` that accepts `WriteMessage` elements and emits `WriteResult`.
Expand All @@ -62,7 +61,7 @@ object AmqpFlow {
settings: AmqpWriteSettings): pekko.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpFlow
.withConfirm(settings = settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -87,7 +86,7 @@ object AmqpFlow {
settings: AmqpWriteSettings): pekko.stream.javadsl.Flow[WriteMessage, WriteResult, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpFlow
.withConfirmUnordered(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -109,6 +108,6 @@ object AmqpFlow {
pekko.stream.connectors.amqp.scaladsl.AmqpFlow
.withConfirmAndPassThroughUnordered[T](settings = settings))(Keep.right)
.map { case (writeResult, passThrough) => Pair(writeResult, passThrough) }
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import java.util.concurrent.CompletionStage
import org.apache.pekko
import pekko.Done
import pekko.stream.connectors.amqp._

import scala.compat.java8.FutureConverters._
import pekko.util.FutureConverters._

object AmqpFlowWithContext {

Expand All @@ -33,7 +32,7 @@ object AmqpFlowWithContext {
: pekko.stream.javadsl.FlowWithContext[WriteMessage, T, WriteResult, T, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpFlowWithContext
.apply(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava

/**
Expand All @@ -50,6 +49,6 @@ object AmqpFlowWithContext {
: pekko.stream.javadsl.FlowWithContext[WriteMessage, T, WriteResult, T, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpFlowWithContext
.withConfirm(settings)
.mapMaterializedValue(_.toJava)
.mapMaterializedValue(_.asJava)
.asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import org.apache.pekko
import pekko.stream.connectors.amqp._
import pekko.stream.javadsl.Flow
import pekko.util.ByteString

import scala.compat.java8.FutureConverters._
import pekko.util.FutureConverters._

object AmqpRpcFlow {

Expand All @@ -37,7 +36,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int): Flow[ByteString, ByteString, CompletionStage[String]] =
pekko.stream.connectors.amqp.scaladsl.AmqpRpcFlow
.simple(settings, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -49,7 +48,7 @@ object AmqpRpcFlow {
bufferSize: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
pekko.stream.connectors.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -62,7 +61,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
pekko.stream.connectors.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.asJava

/**
Expand All @@ -82,7 +81,7 @@ object AmqpRpcFlow {
repliesPerMessage: Int = 1): Flow[WriteMessage, CommittableReadResult, CompletionStage[String]] =
pekko.stream.connectors.amqp.scaladsl.AmqpRpcFlow
.committableFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.mapMaterializedValue(f => f.asJava)
.map(cm => new CommittableReadResult(cm))
.asJava

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ import org.apache.pekko
import pekko.Done
import pekko.stream.connectors.amqp._
import pekko.util.ByteString

import scala.compat.java8.FutureConverters._
import pekko.util.FutureConverters._

object AmqpSink {

Expand All @@ -31,7 +30,7 @@ object AmqpSink {
* either normally or because of an amqp failure.
*/
def create(settings: AmqpWriteSettings): pekko.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.toJava).asJava
pekko.stream.connectors.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.asJava).asJava

/**
* Creates an `AmqpSink` that accepts `ByteString` elements.
Expand All @@ -42,7 +41,7 @@ object AmqpSink {
def createSimple(
settings: AmqpWriteSettings): pekko.stream.javadsl.Sink[ByteString, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f =>
f.toJava).asJava
f.asJava).asJava

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
Expand All @@ -55,6 +54,6 @@ object AmqpSink {
def createReplyTo(
settings: AmqpReplyToSinkSettings): pekko.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] =
pekko.stream.connectors.amqp.scaladsl.AmqpSink.replyTo(settings).mapMaterializedValue(f =>
f.toJava).asJava
f.asJava).asJava

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ import org.apache.pekko
import pekko.Done
import pekko.stream.connectors.amqp.ReadResult
import pekko.stream.connectors.amqp.scaladsl

import scala.compat.java8.FutureConverters._
import pekko.util.FutureConverters._

final class CommittableReadResult(cm: scaladsl.CommittableReadResult) {
val message: ReadResult = cm.message

def ack(): CompletionStage[Done] = ack(false)
def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).toJava
def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).asJava

def nack(): CompletionStage[Done] = nack(false, true)
def nack(multiple: Boolean, requeue: Boolean): CompletionStage[Done] =
cm.nack(multiple, requeue).toJava
cm.nack(multiple, requeue).asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import org.apache.pekko
import pekko.stream.connectors.aws.eventbridge.EventBridgePublishSettings
import pekko.stream.scaladsl.{ Flow, Keep, Sink }
import pekko.{ Done, NotUsed }
import pekko.util.FutureConverters._
import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient
import software.amazon.awssdk.services.eventbridge.model._

import scala.concurrent.Future
import scala.compat.java8.FutureConverters._

/**
* Scala API
Expand Down Expand Up @@ -64,7 +64,7 @@ object EventBridgePublisher {
settings: EventBridgePublishSettings)(
implicit eventBridgeClient: EventBridgeAsyncClient): Flow[PutEventsRequest, PutEventsResponse, NotUsed] =
Flow[PutEventsRequest]
.mapAsync(settings.concurrency)(eventBridgeClient.putEvents(_).toScala)
.mapAsync(settings.concurrency)(eventBridgeClient.putEvents(_).asScala)

/**
* Creates a [[pekko.stream.scaladsl.Flow Flow]] to publish messages to an EventBridge.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package org.apache.pekko.stream.connectors.awslambda.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Flow
import pekko.util.FutureConverters._
import software.amazon.awssdk.services.lambda.model.{ InvokeRequest, InvokeResponse }
import software.amazon.awssdk.services.lambda.LambdaAsyncClient
import scala.compat.java8.FutureConverters._

object AwsLambdaFlow {

Expand All @@ -27,6 +27,6 @@ object AwsLambdaFlow {
*/
def apply(
parallelism: Int)(implicit awsLambdaClient: LambdaAsyncClient): Flow[InvokeRequest, InvokeResponse, NotUsed] =
Flow[InvokeRequest].mapAsyncUnordered(parallelism)(awsLambdaClient.invoke(_).toScala)
Flow[InvokeRequest].mapAsyncUnordered(parallelism)(awsLambdaClient.invoke(_).asScala)

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ object AzureQueueSink {
*/
private[javadsl] def fromFunction[T](f: T => Unit): Sink[T, CompletionStage[Done]] = {
import pekko.stream.connectors.azure.storagequeue.scaladsl.{ AzureQueueSink => AzureQueueSinkScalaDSL }
import scala.compat.java8.FutureConverters._
AzureQueueSinkScalaDSL.fromFunction(f).mapMaterializedValue(_.toJava).asJava
import pekko.util.FutureConverters._
AzureQueueSinkScalaDSL.fromFunction(f).mapMaterializedValue(_.asJava).asJava
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@

package org.apache.pekko.stream.connectors.azure.storagequeue

import org.apache.pekko.util.OptionConverters._

import java.time.{ Duration => JavaDuration }
import java.util.Optional

import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.{ Duration, FiniteDuration }

/**
Expand Down Expand Up @@ -51,7 +52,7 @@ final class AzureQueueSourceSettings private (
* Java API
*/
def getRetrieveRetryTimeout(): Optional[JavaDuration] =
retrieveRetryTimeout.map(d => JavaDuration.ofNanos(d.toNanos)).asJava
retrieveRetryTimeout.map(d => JavaDuration.ofNanos(d.toNanos)).toJava

private def copy(batchSize: Int = batchSize, retrieveRetryTimeout: Option[FiniteDuration] = retrieveRetryTimeout) =
new AzureQueueSourceSettings(initialVisibilityTimeout, batchSize, retrieveRetryTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ package org.apache.pekko.stream.connectors.cassandra

import java.util.concurrent.CompletionStage

import org.apache.pekko.Done
import org.apache.pekko
import pekko.Done
import pekko.util.FunctionConverters._
import pekko.util.FutureConverters._
import com.datastax.oss.driver.api.core.CqlSession
import scala.compat.java8.FunctionConverters._
import scala.compat.java8.FutureConverters._

import scala.concurrent.Future

Expand All @@ -39,7 +40,7 @@ class CassandraSessionSettings private (val configPath: String,
* only execute the first.
*/
def withInit(value: java.util.function.Function[CqlSession, CompletionStage[Done]]): CassandraSessionSettings =
copy(init = Some(value.asScala.andThen(_.toScala)))
copy(init = Some(value.asScala.andThen(_.asScala)))

/**
* The `init` function will be performed once when the session is created, i.e.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@

package org.apache.pekko.stream.connectors.cassandra

import org.apache.pekko.actor.{ ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem }
import org.apache.pekko
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem }
import pekko.util.FutureConverters._
import com.datastax.oss.driver.api.core.CqlSession
import com.typesafe.config.{ Config, ConfigFactory }

import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Failure

Expand Down Expand Up @@ -59,7 +60,7 @@ class DefaultSessionProvider(system: ActorSystem, config: Config) extends CqlSes
} else {
val driverConfig = CqlSessionProvider.driverConfig(system, config)
val driverConfigLoader = DriverConfigLoaderFromConfig.fromConfig(driverConfig)
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().toScala
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().asScala
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import pekko.ConfigurationException
import pekko.actor.{ ActorSystem, ClassicActorSystemProvider }
import pekko.discovery.Discovery
import pekko.util.JavaDurationConverters._
import pekko.util.FutureConverters._
import com.datastax.oss.driver.api.core.CqlSession
import com.typesafe.config.{ Config, ConfigFactory }

import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }

Expand Down Expand Up @@ -72,7 +72,7 @@ private[cassandra] object PekkoDiscoverySessionProvider {
basic.contact-points = [${contactPoints.mkString("\"", "\", \"", "\"")}]
""").withFallback(CqlSessionProvider.driverConfig(system, config))
val driverConfigLoader = DriverConfigLoaderFromConfig.fromConfig(driverConfigWithContactPoints)
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().toScala
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().asScala
}
}

Expand Down
Loading