Skip to content

Commit

Permalink
Remove the use of deprecated type DirectProcessor in tests and samples (
Browse files Browse the repository at this point in the history
  • Loading branch information
anuchandy authored Sep 17, 2024
1 parent d0f3d76 commit f777bba
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

Expand Down Expand Up @@ -298,7 +297,7 @@ class MachineInformation implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(MachineInformation.class);
private final AtomicReference<List<Integer>> temperatures = new AtomicReference<>(new ArrayList<>());
private final ConnectableFlux<AverageTemperature> averageTemperatures;
private final DirectProcessor<Boolean> onDispose = DirectProcessor.create();
private final Sinks.One<Boolean> onDispose = Sinks.one();
private final AtomicBoolean isDisposed = new AtomicBoolean();

private volatile Instant lastReported = Instant.EPOCH;
Expand All @@ -312,7 +311,7 @@ class MachineInformation implements AutoCloseable {
MachineInformation(String identifier, Duration reportingInterval) {
this.identifier = identifier;
this.averageTemperatures = Flux.interval(reportingInterval)
.takeUntilOther(onDispose)
.takeUntilOther(onDispose.asMono())
.map(unused -> {
final Instant timeCalculated = Instant.now();
final List<Integer> temperaturesInInterval = temperatures.getAndSet(new ArrayList<>());
Expand Down Expand Up @@ -381,9 +380,7 @@ public void close() {
return;
}

final FluxSink<Boolean> sink = onDispose.sink();
sink.next(true);
sink.complete();
onDispose.emitValue(true, Sinks.EmitFailureHandler.FAIL_FAST);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.Objects;
Expand Down Expand Up @@ -51,14 +50,14 @@ public static void main(String[] args) {
Duration.ofSeconds(1), batchOptions);

// This represents a stream of events that we want to publish.
final DirectProcessor<EventData> events = DirectProcessor.create();
final Sinks.Many<EventData> events = Sinks.many().multicast().onBackpressureBuffer();

System.out.println("Publishing events...");
publisher.publish(events).subscribe(unused -> System.out.println("Completed."),
publisher.publish(events.asFlux()).subscribe(unused -> System.out.println("Completed."),
error -> System.err.println("Error sending events: " + error),
() -> System.out.println("Completed sending events."));

emitEvents(events.sink());
emitEvents(events);

// The .subscribe() creation and assignment is not a blocking call. For the purpose of this example, we sleep
// the thread so the program does not end before the send operation is complete.
Expand All @@ -75,9 +74,9 @@ public static void main(String[] args) {
* Helper function that emits 50 events. The interval between each event is randomly selected between 0 - 250ms and
* is a random substring of the lorem ipsum text.
*
* @param sink Sink for generated events.
* @param events Sink for generated events.
*/
private static void emitEvents(FluxSink<EventData> sink) {
private static void emitEvents(Sinks.Many<EventData> events) {
final String contents = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do "
+ "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis "
+ "nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure "
Expand All @@ -99,10 +98,10 @@ private static void emitEvents(FluxSink<EventData> sink) {
final EventData event = new EventData(contents.substring(0, endIndex));
event.getProperties().put(EVENT_NUMBER, String.valueOf(i));

sink.next(event);
events.emitNext(event, Sinks.EmitFailureHandler.FAIL_FAST);
}

sink.complete();
events.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.test.StepVerifier;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -82,11 +82,8 @@ class EventHubPartitionAsyncConsumerTest {

private final EventPosition originalPosition = EventPosition.latest();
private final AtomicReference<Supplier<EventPosition>> currentPosition = new AtomicReference<>(() -> originalPosition);
private final DirectProcessor<AmqpEndpointState> endpointProcessor = DirectProcessor.create();
private final FluxSink<AmqpEndpointState> endpointProcessorSink = endpointProcessor.sink();

private final DirectProcessor<Message> messageProcessor = DirectProcessor.create();
private final FluxSink<Message> messageProcessorSink = messageProcessor.sink();
final Sinks.Many<AmqpEndpointState> endpointStatesSink = Sinks.many().multicast().onBackpressureBuffer();
final Sinks.Many<Message> messagesSink = Sinks.many().multicast().onBackpressureBuffer();

private MessageFluxWrapper linkProcessor;
private EventHubPartitionAsyncConsumer consumer;
Expand All @@ -97,8 +94,8 @@ void setup() {

when(retryPolicy.getRetryOptions()).thenReturn(new AmqpRetryOptions());

when(link1.getEndpointStates()).thenReturn(endpointProcessor);
when(link1.receive()).thenReturn(messageProcessor);
when(link1.getEndpointStates()).thenReturn(endpointStatesSink.asFlux());
when(link1.receive()).thenReturn(messagesSink.asFlux());
when(link1.addCredits(anyInt())).thenReturn(Mono.empty());

when(link2.addCredits(anyInt())).thenReturn(Mono.empty());
Expand Down Expand Up @@ -139,9 +136,9 @@ void receivesMessages(boolean trackLastEnqueuedProperties) {
// Act & Assert
StepVerifier.create(consumer.receive())
.then(() -> {
endpointProcessorSink.next(AmqpEndpointState.ACTIVE);
messageProcessorSink.next(message1);
messageProcessorSink.next(message2);
endpointStatesSink.emitNext(AmqpEndpointState.ACTIVE, Sinks.EmitFailureHandler.FAIL_FAST);
messagesSink.emitNext(message1, Sinks.EmitFailureHandler.FAIL_FAST);
messagesSink.emitNext(message2, Sinks.EmitFailureHandler.FAIL_FAST);
})
.assertNext(partitionEvent -> {
verifyPartitionContext(partitionEvent.getPartitionContext());
Expand Down Expand Up @@ -195,8 +192,8 @@ void receiveMultipleTimes() {
// Act & Assert
StepVerifier.create(consumer.receive())
.then(() -> {
messageProcessorSink.next(message1);
messageProcessorSink.next(message2);
messagesSink.emitNext(message1, Sinks.EmitFailureHandler.FAIL_FAST);
messagesSink.emitNext(message2, Sinks.EmitFailureHandler.FAIL_FAST);
})
.assertNext(partitionEvent -> {
verifyPartitionContext(partitionEvent.getPartitionContext());
Expand Down Expand Up @@ -269,9 +266,9 @@ void listensToShutdownSignals() throws InterruptedException {
});

// Act
messageProcessorSink.next(message1);
messageProcessorSink.next(message2);
messageProcessorSink.next(message3);
messagesSink.emitNext(message1, Sinks.EmitFailureHandler.FAIL_FAST);
messagesSink.emitNext(message2, Sinks.EmitFailureHandler.FAIL_FAST);
messagesSink.emitNext(message3, Sinks.EmitFailureHandler.FAIL_FAST);

linkProcessor.cancel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -146,8 +145,7 @@ class EventHubProducerAsyncClientTest {
.setDelay(Duration.ofMillis(500))
.setMode(AmqpRetryMode.FIXED)
.setTryTimeout(Duration.ofSeconds(10));
private final DirectProcessor<AmqpEndpointState> endpointProcessor = DirectProcessor.create();
private final FluxSink<AmqpEndpointState> endpointSink = endpointProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
private final Sinks.Many<AmqpEndpointState> endpointStates = Sinks.many().multicast().onBackpressureBuffer();
private EventHubProducerAsyncClient producer;
private ConnectionCacheWrapper connectionProcessor;
private ConnectionOptions connectionOptions;
Expand All @@ -163,8 +161,8 @@ void setup(TestInfo testInfo) {
CLIENT_OPTIONS, SslDomain.VerifyMode.VERIFY_PEER_NAME,
"client-product", "client-version");

when(connection.getEndpointStates()).thenReturn(endpointProcessor);
endpointSink.next(AmqpEndpointState.ACTIVE);
when(connection.getEndpointStates()).thenReturn(endpointStates.asFlux());
endpointStates.emitNext(AmqpEndpointState.ACTIVE, Sinks.EmitFailureHandler.FAIL_FAST);

when(connection.closeAsync()).thenReturn(Mono.empty());

Expand Down Expand Up @@ -1319,8 +1317,8 @@ void closesDedicatedConnectionOnlyOnce() {
@Test
void reopensOnFailure() {
// Arrange
when(connection.getEndpointStates()).thenReturn(endpointProcessor);
endpointSink.next(AmqpEndpointState.ACTIVE);
when(connection.getEndpointStates()).thenReturn(endpointStates.asFlux());
endpointStates.emitNext(AmqpEndpointState.ACTIVE, Sinks.EmitFailureHandler.FAIL_FAST);

EventHubReactorAmqpConnection[] connections = new EventHubReactorAmqpConnection[]{
connection, connection2, connection3
Expand All @@ -1342,14 +1340,14 @@ void reopensOnFailure() {
.thenReturn(Mono.just(sendLink));
when(sendLink.send(anyList())).thenReturn(Mono.empty());

final DirectProcessor<AmqpEndpointState> connectionState2 = DirectProcessor.create();
when(connection2.getEndpointStates()).thenReturn(connectionState2);
final Sinks.Many<AmqpEndpointState> connectionState2 = Sinks.many().multicast().onBackpressureBuffer();
when(connection2.getEndpointStates()).thenReturn(connectionState2.asFlux());
when(connection2.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER)))
.thenReturn(Mono.just(sendLink2));
when(sendLink2.send(any(Message.class))).thenReturn(Mono.empty());

final DirectProcessor<AmqpEndpointState> connectionState3 = DirectProcessor.create();
when(connection3.getEndpointStates()).thenReturn(connectionState3);
final Sinks.Many<AmqpEndpointState> connectionState3 = Sinks.many().multicast().onBackpressureBuffer();
when(connection3.getEndpointStates()).thenReturn(connectionState3.asFlux());
when(connection3.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER)))
.thenReturn(Mono.just(sendLink3));
when(sendLink3.send(anyList())).thenReturn(Mono.empty());
Expand All @@ -1360,8 +1358,8 @@ void reopensOnFailure() {
.verify(DEFAULT_TIMEOUT);

// Send in an error signal like a server busy condition.
endpointSink.error(new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-message",
new AmqpErrorContext("test-namespace")));
endpointStates.emitError(new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-message",
new AmqpErrorContext("test-namespace")), Sinks.EmitFailureHandler.FAIL_FAST);

StepVerifier.create(producer.send(testData2))
.expectComplete()
Expand All @@ -1385,8 +1383,8 @@ void reopensOnFailure() {
@Test
void closesOnNonTransientFailure() {
// Arrange
when(connection.getEndpointStates()).thenReturn(endpointProcessor);
endpointSink.next(AmqpEndpointState.ACTIVE);
when(connection.getEndpointStates()).thenReturn(endpointStates.asFlux());
endpointStates.emitNext(AmqpEndpointState.ACTIVE, Sinks.EmitFailureHandler.FAIL_FAST);

EventHubReactorAmqpConnection[] connections = new EventHubReactorAmqpConnection[]{
connection, connection2, connection3
Expand All @@ -1408,8 +1406,8 @@ void closesOnNonTransientFailure() {
.thenReturn(Mono.just(sendLink));
when(sendLink.send(anyList())).thenReturn(Mono.empty());

final DirectProcessor<AmqpEndpointState> connectionState2 = DirectProcessor.create();
when(connection2.getEndpointStates()).thenReturn(connectionState2);
final Sinks.Many<AmqpEndpointState> connectionState2 = Sinks.many().multicast().onBackpressureBuffer();
when(connection2.getEndpointStates()).thenReturn(connectionState2.asFlux());
when(connection2.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER)))
.thenReturn(Mono.just(sendLink2));
when(sendLink2.send(any(Message.class))).thenReturn(Mono.empty());
Expand All @@ -1423,7 +1421,7 @@ void closesOnNonTransientFailure() {
.verify(DEFAULT_TIMEOUT);

// Send in an error signal like authorization failure.
endpointSink.error(nonTransientError);
endpointStates.emitError(nonTransientError, Sinks.EmitFailureHandler.FAIL_FAST);

StepVerifier.create(producer.send(testData2))
.expectErrorSatisfies(error -> {
Expand Down Expand Up @@ -1453,8 +1451,8 @@ void closesOnNonTransientFailure() {
@Test
void resendMessageOnTransientLinkFailure() {
// Arrange
when(connection.getEndpointStates()).thenReturn(endpointProcessor);
endpointSink.next(AmqpEndpointState.ACTIVE);
when(connection.getEndpointStates()).thenReturn(endpointStates.asFlux());
endpointStates.emitNext(AmqpEndpointState.ACTIVE, Sinks.EmitFailureHandler.FAIL_FAST);

EventHubReactorAmqpConnection[] connections = new EventHubReactorAmqpConnection[]{connection, connection2};
connectionProcessor = createConnectionProcessor(connections, connectionOptions.getRetry(), false);
Expand Down Expand Up @@ -1484,12 +1482,12 @@ void resendMessageOnTransientLinkFailure() {
final Throwable error = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "Test-message",
new AmqpErrorContext("test-namespace"));

endpointSink.error(error);
endpointStates.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST);
return Mono.error(error);
});

final DirectProcessor<AmqpEndpointState> connectionState2 = DirectProcessor.create();
when(connection2.getEndpointStates()).thenReturn(connectionState2);
final Sinks.Many<AmqpEndpointState> connectionState2 = Sinks.many().multicast().onBackpressureBuffer();
when(connection2.getEndpointStates()).thenReturn(connectionState2.asFlux());
when(connection2.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER)))
.thenReturn(Mono.just(sendLink2));
when(sendLink2.send(any(Message.class))).thenReturn(Mono.empty());
Expand Down
Loading

0 comments on commit f777bba

Please sign in to comment.