Skip to content

Commit

Permalink
fix reactor#1968 Make FluxPublish propagate 1st subscriber context
Browse files Browse the repository at this point in the history
FluxPublish operator should propagate the first subscriber context,
like FluxReplay does.

Relates-to: reactor#1114
  • Loading branch information
andreisilviudragnea authored and simonbasle committed Nov 25, 2019
1 parent 80d78bc commit 38b8669
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,7 @@
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* A connectable publisher which shares an underlying source and dispatches source values
Expand Down Expand Up @@ -536,6 +537,11 @@ public Stream<? extends Scannable> inners() {
return Stream.of(subscribers);
}

@Override
public Context currentContext() {
return Operators.multiSubscribersContext(subscribers);
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
Expand Down Expand Up @@ -643,4 +649,4 @@ public Object scanUnsafe(Attr key) {
return super.scanUnsafe(key);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
* Copyright (c) 2011-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

import org.junit.Assert;
Expand All @@ -34,6 +35,7 @@
import reactor.test.publisher.TestPublisher;
import reactor.test.subscriber.AssertSubscriber;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -720,4 +722,30 @@ public void removeUnknownInnerIgnored() {
subscriber.remove(inner);
assertThat(subscriber.subscribers).as("post remove inner").isEmpty();
}

@Test
public void subscriberContextPropagation() {
String key = "key";
int expectedValue = 1;

AtomicReference<Context> reference = new AtomicReference<>();

Flux<Integer> integerFlux =
Flux.just(1, 2, 3)
.flatMap(value ->
Mono.subscriberContext()
.doOnNext(reference::set)
.thenReturn(value)
)
.publish()
.autoConnect(2);

integerFlux.subscriberContext(Context.of(key, expectedValue))
.subscribe();

integerFlux.subscriberContext(Context.of(key, 2))
.subscribe();

assertThat((int) reference.get().get(key)).isEqualTo(expectedValue);
}
}

0 comments on commit 38b8669

Please sign in to comment.