Skip to content

fix(graphql-transport-ws): ensure result message is processed before … #466

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions links/gql_websocket_link/lib/src/graphql_transport_ws.dart
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,8 @@ class _ConnectionState {
// TODO: WebSocketChannel should have a `state` getter and `onStateChange` stream
bool isOpen = false;

Map<String, Completer<void>> nextOrErrorMsgWaitMap = {};

/// Checks the `connect` problem and evaluates if the client should retry.
bool shouldRetryConnectOrThrow(Object errOrCloseEvent) {
options.log?.call("shouldRetryConnectOrThrow $errOrCloseEvent");
Expand Down Expand Up @@ -813,6 +815,29 @@ class _ConnectionState {
}
// parseMessage(msg!, reviver: options.jsonMessageReviver);
if (!isOpen) return;

// wait for next or error message (result) to be processed before process complete message
if (message is CompleteMessage &&
nextOrErrorMsgWaitMap.containsKey(message.id)) {
final completer = nextOrErrorMsgWaitMap[message.id];

if (completer != null) {
if (completer.isCompleted) {
nextOrErrorMsgWaitMap.remove(message.id);
} else {
final timer = Timer(const Duration(seconds: 60), () {
// Timeout => let's return an error
if (!completer.isCompleted) {
completer.complete();
}
});

await completer.future;
timer.cancel();
}
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this potential endless loop be replaces using a completed? should there be a timeout to avoid hanging forever?


emitter.emit(TransportWsEvent.message(message));
if (message is PingMessage || message is PongMessage) {
final msgPayload = message is PingMessage
Expand Down Expand Up @@ -911,16 +936,16 @@ class _ConnectionState {
// if (socket.readyState == WebSocketImpl.CLOSING) await throwOnClose;

final _releaseComp = Completer<void>();
final void Function() release = _releaseComp.complete;
final released = _releaseComp.future;

return _Connection(
socket: socket,
release: release,
release: _releaseComp,
waitForReleaseOrThrowOnClose: Future.any([
// wait for
released.then((_) {
if (locks == 0) {
// if released, no other operations, and not keep alive, wait for the socket to close
if (locks == 0 && options.keepAlive == Duration.zero) {
// and if no more locks are present, complete the connection
final complete = () {
isOpen = false;
Expand Down Expand Up @@ -988,20 +1013,37 @@ class _Client extends TransportWsClient {
final socket = _c.socket;
final release = _c.release;
final waitForReleaseOrThrowOnClose = _c.waitForReleaseOrThrowOnClose;

// print("isolate debug name: ${Isolate.current.debugName}");
// print(payload.operation.toString());
// print(payload.variables.toString());
// print(payload.context.toString());
// print("graphQLSocketMessageEncoder: ${Isolate.current.debugName}");
// if done while waiting for connect, release the connection lock right away
final _subscribeMsg = await options.graphQLSocketMessageEncoder(
SubscribeMessage(id, options.serializer.serializeRequest(payload)),
);
if (done) return release();
// print("after graphQLSocketMessageEncoder: ${Isolate.current.debugName}");
if (done) {
if (!release.isCompleted) release.complete();
}

final unlisten = emitter.onMessage(id, (message) {
if (message is NextMessage) {
sink.add(message.payload);
final completer = state.nextOrErrorMsgWaitMap[id];
if (completer != null && !completer.isCompleted) {
completer.complete();
}
state.nextOrErrorMsgWaitMap.remove(id);
} else if (message is ErrorMessage) {
errored = true;
done = true;
sink.addError(message.payload);
final completer = state.nextOrErrorMsgWaitMap[id];
if (completer != null && !completer.isCompleted) {
completer.complete();
}
state.nextOrErrorMsgWaitMap.remove(id);
releaser();
} else if (message is CompleteMessage) {
done = true;
Expand All @@ -1011,6 +1053,8 @@ class _Client extends TransportWsClient {

socket.sink.add(_subscribeMsg);

state.nextOrErrorMsgWaitMap[id] = Completer();

releaser = () async {
final _completeMsg =
await options.graphQLSocketMessageEncoder(CompleteMessage(id));
Expand All @@ -1020,7 +1064,7 @@ class _Client extends TransportWsClient {
}
state.locks--;
done = true;
release();
if (!release.isCompleted) release.complete();
};

// either the releaser will be called, connection completed and
Expand Down Expand Up @@ -1077,7 +1121,7 @@ class _Client extends TransportWsClient {

class _Connection {
final WebSocketChannel socket;
final void Function() release;
final Completer<void> release;
final Future<void> waitForReleaseOrThrowOnClose;

_Connection({
Expand Down
Loading
Loading