Skip to content

Adds the option to use a pre configured RSocketRequester #498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package org.springframework.graphql.client;

import java.net.URI;
import java.util.List;
import java.util.function.Consumer;

import io.rsocket.loadbalance.LoadbalanceStrategy;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import org.springframework.lang.Nullable;
Expand All @@ -45,6 +49,10 @@ final class DefaultRSocketGraphQlClientBuilder

private final RSocketRequester.Builder requesterBuilder;

private Publisher<List<LoadbalanceTarget>> targetPublisher;

private LoadbalanceStrategy loadbalanceStrategy;

@Nullable
private ClientTransport clientTransport;

Expand Down Expand Up @@ -111,6 +119,13 @@ public DefaultRSocketGraphQlClientBuilder rsocketRequester(Consumer<RSocketReque
return this;
}

@Override
public DefaultRSocketGraphQlClientBuilder transports(Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) {
this.targetPublisher = targetPublisher;
this.loadbalanceStrategy = loadbalanceStrategy;
return this;
}

@Override
public RSocketGraphQlClient build() {

Expand All @@ -120,13 +135,20 @@ public RSocketGraphQlClient build() {
builder.encoders(encoders -> setJsonEncoder(CodecDelegate.findJsonEncoder(encoders)));
});

Assert.state(this.clientTransport != null, "Neither WebSocket nor TCP networking configured");
RSocketRequester requester = this.requesterBuilder.transport(this.clientTransport);
RSocketRequester requester;

if (this.targetPublisher != null && this.loadbalanceStrategy != null) {
requester = this.requesterBuilder.transports(this.targetPublisher, this.loadbalanceStrategy);
} else {
Assert.state(this.clientTransport != null, "Neither WebSocket nor TCP networking configured");
requester = this.requesterBuilder.transport(this.clientTransport);
}
RSocketGraphQlTransport graphQlTransport = new RSocketGraphQlTransport(this.route, requester, getJsonDecoder());

return new DefaultRSocketGraphQlClient(
super.buildGraphQlClient(graphQlTransport), requester,
this.requesterBuilder, this.clientTransport, this.route, getBuilderInitializer());
this.requesterBuilder, this.clientTransport, this.targetPublisher, this.loadbalanceStrategy,
this.route, getBuilderInitializer());
}


Expand All @@ -141,19 +163,26 @@ private static class DefaultRSocketGraphQlClient extends AbstractDelegatingGraph

private final ClientTransport clientTransport;

private final Publisher<List<LoadbalanceTarget>> targetPublisher;

private final LoadbalanceStrategy loadbalanceStrategy;

private final String route;

private final Consumer<AbstractGraphQlClientBuilder<?>> builderInitializer;

DefaultRSocketGraphQlClient(
GraphQlClient graphQlClient, RSocketRequester requester, RSocketRequester.Builder requesterBuilder,
ClientTransport clientTransport, String route, Consumer<AbstractGraphQlClientBuilder<?>> builderInitializer) {
ClientTransport clientTransport, Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy,
String route, Consumer<AbstractGraphQlClientBuilder<?>> builderInitializer) {

super(graphQlClient);

this.requester = requester;
this.requesterBuilder = requesterBuilder;
this.clientTransport = clientTransport;
this.targetPublisher = targetPublisher;
this.loadbalanceStrategy = loadbalanceStrategy;
this.route = route;
this.builderInitializer = builderInitializer;
}
Expand All @@ -174,6 +203,7 @@ public Mono<Void> stop() {
public RSocketGraphQlClient.Builder<?> mutate() {
DefaultRSocketGraphQlClientBuilder builder = new DefaultRSocketGraphQlClientBuilder(this.requesterBuilder);
builder.clientTransport(this.clientTransport);
builder.transports(this.targetPublisher, this.loadbalanceStrategy);
builder.route(this.route);
this.builderInitializer.accept(builder);
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
package org.springframework.graphql.client;

import java.net.URI;
import java.util.List;
import java.util.function.Consumer;

import io.rsocket.core.RSocketClient;
import io.rsocket.loadbalance.LoadbalanceStrategy;
import io.rsocket.loadbalance.LoadbalanceTarget;
import io.rsocket.transport.ClientTransport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

import org.springframework.messaging.rsocket.RSocketRequester;
Expand Down Expand Up @@ -129,6 +133,20 @@ interface Builder<B extends Builder<B>> extends GraphQlClient.Builder<B> {
*/
B rsocketRequester(Consumer<RSocketRequester.Builder> requester);

/**
* Build an {@link RSocketRequester} with an
* {@link io.rsocket.loadbalance.LoadbalanceRSocketClient} that will
* connect to one of the given targets selected through the given
* {@link io.rsocket.loadbalance.LoadbalanceRSocketClient}.
* @param targetPublisher a {@code Publisher} that supplies a list of
* target transports to loadbalance against; the given list may be
* periodically updated by the {@code Publisher}.
* @param loadbalanceStrategy the strategy to use for selecting from
* the list of loadbalance targets.
* @return the same builder instance
*/
B transports(Publisher<List<LoadbalanceTarget>> targetPublisher, LoadbalanceStrategy loadbalanceStrategy);

/**
* Build the {@code RSocketGraphQlClient} instance.
*/
Expand Down