Description
We are creating a new API and would like to push Order events to our partners via graphql subscriptions.
I am new to reactive streams but have come up with a basic concept to retrieve order updates from our database and publish them to a flux with a generator and send them out a graphql subscription like so:
@SubscriptionMapping
public Flux<Order> orders(@Argument Long id) {
Flux<Order> flux = Flux.concat(Flux.generate(() -> id, (orderId, synchronousSink) -> {
List<Order> orders = getNewOrdersFromDB(orderId);
Flux<Order> orderFlux = Flux.fromIterable(orders);
synchronousSink.next(orderFlux);
if(orders.isEmpty())
return orderId;
else
return orders.get(orders.size()-1).getId();
}));
return flux;
}
Testing this with graphiql I see two problems when I close the window and close the websocket:
- This exception is thrown when a new Order is sent to the Sink after the connection is closed:
2022-11-08 15:18:47.195 ERROR 33912 --- [-67e73587779f-1] o.s.g.s.webmvc.GraphQlWebSocketHandler : Closing session due to exception for StandardWebSocketSession[id=e57e5bc6-445e-bbd7-d058-67e73587779f, uri=ws://localhost:8080/graphql]
java.io.IOException: java.io.IOException: Broken pipe
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:327) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:254) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:227) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:215) ~[spring-websocket-5.3.23.jar:5.3.23]
at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:106) ~[spring-websocket-5.3.23.jar:5.3.23]
at org.springframework.graphql.server.webmvc.GraphQlWebSocketHandler$SendMessageSubscriber.hookOnNext(GraphQlWebSocketHandler.java:562) ~[spring-graphql-1.0.2.jar:1.0.2]
at org.springframework.graphql.server.webmvc.GraphQlWebSocketHandler$SendMessageSubscriber.hookOnNext(GraphQlWebSocketHandler.java:540) ~[spring-graphql-1.0.2.jar:1.0.2]
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.4.24.jar:3.4.24]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.4.24.jar:3.4.24]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: java.io.IOException: Broken pipe
at java.base/sun.nio.ch.FileDispatcherImpl.writev0(Native Method) ~[na:na]
at java.base/sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:66) ~[na:na]
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:227) ~[na:na]
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:158) ~[na:na]
at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:563) ~[na:na]
at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:147) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper$NioOperationState.run(NioEndpoint.java:1680) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
at org.apache.tomcat.util.net.SocketWrapperBase$OperationState.start(SocketWrapperBase.java:1070) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
at org.apache.tomcat.util.net.SocketWrapperBase.vectoredOperation(SocketWrapperBase.java:1489) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:1415) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
at org.apache.tomcat.util.net.SocketWrapperBase.write(SocketWrapperBase.java:1386) ~[tomcat-embed-core-9.0.68.jar:9.0.68]
at org.apache.tomcat.websocket.server.WsRemoteEndpointImplServer.doWrite(WsRemoteEndpointImplServer.java:93) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.writeMessagePart(WsRemoteEndpointImplBase.java:512) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock(WsRemoteEndpointImplBase.java:314) ~[tomcat-embed-websocket-9.0.68.jar:9.0.68]
... 17 common frames omitted
- Secondly if no orders are retrieved from the DB and published to the flux after graphiql is closed, no closed or complete signal seems to be sent to the generator and it just keeps running, until an order is sent to the Sink, in which case you get the above exceptions.
So my question: is there a way for the server to periodically ping the client through the websocket and shut down the Flux generator in a graceful manner if the connection is closed?
It seems that in the graphqlws spec provides a message type for 'ping' and 'pong' messages, see https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md#ping, maybe the time interval of these could be configurable, or if no messages have been received in certain time period a ping could be sent.
Thanks, for any and all advice or info you can provide.
The sample project that can reproduce this behavior can be found here: https://github.com/erikdrewdunning/subscription-demo
Also here is a simple graphql subscription request that works against the server:
subscription {
orders(id: "1") {
id
}
}