Skip to content

Commit

Permalink
fix: more dIBC inbound work
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed May 2, 2020
1 parent 3988235 commit 6653937
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 30 deletions.
3 changes: 3 additions & 0 deletions packages/SwingSet/src/vats/network/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
throw TypeError(`listenHandler is not defined`);
}
if (listening.has(localAddr)) {
// Last one wins.
const [lport, lhandler] = listening.get(localAddr);
if (lhandler === listenHandler) {
return;
Expand All @@ -399,6 +400,8 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
listening.init(localAddr, [port, listenHandler]);
}

// TODO: Check that the listener defines onAccept.

await E(protocolHandler).onListen(
port,
localAddr,
Expand Down
65 changes: 35 additions & 30 deletions packages/cosmic-swingset/lib/ag-solo/vats/ibc.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ export function makeIBCProtocolHandler(
{ timerService },
) {
/**
* @type {Store<string, [ConnectionHandler, Promise<Connection>]>}
* @type {Store<string, Promise<Connection>>}
*/
const channelKeyToHandler = makeStore('CHANNEL:PORT');
const channelKeyToConnP = makeStore('CHANNEL:PORT');

/**
* @typedef {Object} Counterparty
Expand Down Expand Up @@ -152,9 +152,9 @@ export function makeIBCProtocolHandler(
}

/**
* @type {Store<string, PromiseRecord<Bytes, any>>}
* @type {Store<string, Store<number, PromiseRecord<Bytes, any>>>}
*/
const seqChannelKeyToAck = makeStore('SEQ:CHANNEL:PORT');
const channelKeyToSeqAck = makeStore('CHANNEL:PORT');

/**
* @param {string} channelID
Expand All @@ -171,6 +171,9 @@ export function makeIBCProtocolHandler(
rPortID,
ordered,
) {
const channelKey = `${channelID}:${portID}`;
const seqToAck = makeStore('SEQUENCE');
channelKeyToSeqAck.init(channelKey, seqToAck);
/**
* @param {Connection} _conn
* @param {Bytes} packetBytes
Expand All @@ -194,8 +197,7 @@ export function makeIBCProtocolHandler(
* @type {PromiseRecord<Bytes, any>}
*/
const ackDeferred = producePromise();
const sck = `${sequence}:${channelID}:${portID}`;
seqChannelKeyToAck.init(sck, ackDeferred);
seqToAck.init(sequence, ackDeferred);
return ackDeferred.promise;
};

Expand Down Expand Up @@ -224,7 +226,6 @@ export function makeIBCProtocolHandler(
}
const boundSender = sender;
sender = data => {
console.error(`Would send data`, data);
return boundSender(data);
};
},
Expand All @@ -235,6 +236,12 @@ export function makeIBCProtocolHandler(
source_channel: channelID,
};
await callIBCDevice('channelCloseInit', { packet });
const rejectReason = Error('Connection closed');
for (const ackDeferred of seqToAck.values()) {
ackDeferred.reject(rejectReason);
}
channelKeyToSeqAck.delete(channelKey);

// TODO: Let's look carefully at this
// There's a danger of the two sides disagreeing about whether
// the channel is closed or not, and reusing channelIDs could
Expand Down Expand Up @@ -621,16 +628,11 @@ paths:

// Actually connect.
// eslint-disable-next-line prettier/prettier
const connP = /** @type {Promise<Connection>} */
(E(protocolImpl).inbound(listenSearch, localAddr, remoteAddr, rchandler))
.then(conn => {
console.info(`FIGME: got connection`, conn);
return conn;
});
const pr = E(protocolImpl).inbound(listenSearch, localAddr, remoteAddr, rchandler);
const connP = /** @type {Promise<Connection>} */ (pr);

/* Stash it for later use. */
console.info(`FIGME: Stashing ${channelKey}`, rchandler);
channelKeyToHandler.init(channelKey, [rchandler, connP]);
channelKeyToConnP.init(channelKey, connP);
break;
}

Expand All @@ -643,11 +645,11 @@ paths:
} = packet;
const channelKey = `${channelID}:${portID}`;

const [chandler, connP] = channelKeyToHandler.get(channelKey);
const connP = channelKeyToConnP.get(channelKey);
const data = base64ToBytes(data64);

connP
.then(conn => E(chandler).onReceive(conn, data, chandler))
E(connP)
.send(data)
.then(ack => {
const ack64 = dataToBase64(/** @type {Bytes} */ (ack));
return callIBCDevice('packetExecuted', { packet, ack: ack64 });
Expand All @@ -663,10 +665,11 @@ paths:
source_channel: channelID,
source_port: portID,
} = packet;
const sck = `${sequence}:${channelID}:${portID}`;
const ackDeferred = seqChannelKeyToAck.get(sck);
const channelKey = `${channelID}:${portID}`;
const seqToAck = channelKeyToSeqAck.get(channelKey);
const ackDeferred = seqToAck.get(sequence);
ackDeferred.resolve(base64ToBytes(acknowledgement));
seqChannelKeyToAck.delete(sck);
seqToAck.delete(sequence);
break;
}

Expand All @@ -677,21 +680,22 @@ paths:
source_channel: channelID,
source_port: portID,
} = packet;
const sck = `${sequence}:${channelID}:${portID}`;
const ackDeferred = seqChannelKeyToAck.get(sck);
const channelKey = `${channelID}:${portID}`;
const seqToAck = channelKeyToSeqAck.get(channelKey);
const ackDeferred = seqToAck.get(sequence);
ackDeferred.reject(Error(`Packet timed out`));
seqChannelKeyToAck.delete(sck);
seqToAck.delete(sequence);
break;
}

case 'channelCloseInit':
case 'channelCloseConfirm': {
const { portID, channelID } = obj;
const channelKey = `${channelID}:${portID}`;
if (channelKeyToHandler.has(channelKey)) {
const [chandler, connP] = channelKeyToHandler.get(channelKey);
channelKeyToHandler.delete(channelKey);
connP.then(conn => E(chandler).onClose(conn, undefined, chandler));
if (channelKeyToConnP.has(channelKey)) {
const connP = channelKeyToConnP.get(channelKey);
channelKeyToConnP.delete(channelKey);
E(connP).close();
}
break;
}
Expand All @@ -718,8 +722,9 @@ paths:
* @type {PromiseRecord<Bytes, any>}
*/
const ackDeferred = producePromise();
const sck = `${sequence}:${channelID}:${portID}`;
seqChannelKeyToAck.init(sck, ackDeferred);
const channelKey = `${channelID}:${portID}`;
const seqToAck = channelKeyToSeqAck.get(channelKey);
seqToAck.init(sequence, ackDeferred);
ackDeferred.promise.then(
ack => console.info('Manual packet', fullPacket, 'acked:', ack),
e => console.warn('Manual packet', fullPacket, 'timed out:', e),
Expand Down

0 comments on commit 6653937

Please sign in to comment.