Skip to content

Commit

Permalink
fix: reject all sends when the connection is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed May 2, 2020
1 parent 9f65899 commit 61b0975
Showing 1 changed file with 29 additions and 3 deletions.
32 changes: 29 additions & 3 deletions packages/SwingSet/src/vats/network/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import makeStore from '@agoric/store';
import rawHarden from '@agoric/harden';
import { E as defaultE } from '@agoric/eventual-send';
import { producePromise } from '@agoric/produce-promise';
import { toBytes } from './bytes';

const harden = /** @type {<T>(x: T) => T} */ (rawHarden);
Expand All @@ -13,6 +14,11 @@ export const ENDPOINT_SEPARATOR = '/';
* @typedef {import('@agoric/store').Store<T,U>} Store
*/

/**
* @template T,U
* @typedef {import('@agoric/produce-promise').PromiseRecord<T, U>} PromiseRecord
*/

/**
* @typedef {import('./bytes').Bytes} Bytes
* @typedef {import('./bytes').Data} Data
Expand Down Expand Up @@ -108,6 +114,10 @@ export const makeConnection = (
E = defaultE,
) => {
let closed;
/**
* @type {Set<PromiseRecord<Bytes,any>>}
*/
const pendingAcks = new Set();
/**
* @type {Connection}
*/
Expand All @@ -124,6 +134,10 @@ export const makeConnection = (
}
current.delete(connection);
closed = Error('Connection closed');
for (const ackDeferred of [...pendingAcks.values()]) {
pendingAcks.delete(ackDeferred);
ackDeferred.reject(closed);
}
await E(handler)
.onClose(connection, undefined, handler)
.catch(rethrowUnlessMissing);
Expand All @@ -134,10 +148,22 @@ export const makeConnection = (
throw closed;
}
const bytes = toBytes(data);
const ack = await E(handler)
const ackDeferred = producePromise();
pendingAcks.add(ackDeferred);
E(handler)
.onReceive(connection, bytes, handler)
.catch(err => rethrowUnlessMissing(err) || '');
return toBytes(ack);
.catch(err => rethrowUnlessMissing(err) || '')
.then(
ack => {
pendingAcks.delete(ackDeferred);
ackDeferred.resolve(toBytes(ack));
},
err => {
pendingAcks.delete(ackDeferred);
ackDeferred.reject(err);
},
);
return ackDeferred.promise;
},
});

Expand Down

0 comments on commit 61b0975

Please sign in to comment.