Skip to content

Commit

Permalink
feat: inbound network connection metadata negotiation
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed May 26, 2020
1 parent 6cca466 commit a7ecd9d
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 60 deletions.
1 change: 0 additions & 1 deletion packages/SwingSet/src/vats/network/bytes.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
export function toBytes(data) {
// TODO: We really need marshallable TypedArrays.
if (typeof data === 'string') {
// eslint-disable-next-line no-bitwise
data = data.split('').map(c => c.charCodeAt(0));
}

Expand Down
129 changes: 95 additions & 34 deletions packages/SwingSet/src/vats/network/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ export const ENDPOINT_SEPARATOR = '/';
* See multiaddr.js for an opinionated router implementation
*/

/**
* @typedef {Object} Closable A closable object
* @property {() => Promise<void>} close Terminate the object
*/

/**
* @typedef {Object} Protocol The network Protocol
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if ending in ENDPOINT_SEPARATOR, a fresh name
Expand All @@ -50,6 +55,7 @@ export const ENDPOINT_SEPARATOR = '/';
/**
* @typedef {Object} ListenHandler A handler for incoming connections
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onListen] The listener has been registered
* @property {(port: Port, listenAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<Endpoint>} [onInbound] Return metadata for inbound connection attempt
* @property {(port: Port, localAddr: Endpoint, remoteAddr: Endpoint, l: ListenHandler) => Promise<ConnectionHandler>} onAccept A new connection is incoming
* @property {(port: Port, rej: any, l: ListenHandler) => Promise<void>} [onError] There was an error while listening
* @property {(port: Port, l: ListenHandler) => Promise<void>} [onRemove] The listener has been removed
Expand All @@ -58,7 +64,7 @@ export const ENDPOINT_SEPARATOR = '/';
/**
* @typedef {Object} Connection
* @property {(packetBytes: Data) => Promise<Bytes>} send Send a packet on the connection
* @property {() => void} close Close both ends of the connection
* @property {() => Promise<void>} close Close both ends of the connection
* @property {() => Endpoint} getLocalAddress Get the locally bound name of this connection
* @property {() => Endpoint} getRemoteAddress Get the name of the counterparty
*/
Expand All @@ -82,9 +88,14 @@ export const ENDPOINT_SEPARATOR = '/';
* @property {(port: Port, localAddr: Endpoint, remote: Endpoint, c: ConnectionHandler, p: ProtocolHandler) => Promise<ConnectionHandler|undefined>} onConnect A port initiates an outbound connection
* @property {(port: Port, localAddr: Endpoint, p: ProtocolHandler) => Promise<void>} onRevoke The port is being completely destroyed
*
* @typedef {Object} InboundAttempt An inbound connection attempt
* @property {(connectionHandler: ConnectionHandler) => Promise<Connection>} accept Establish the connection
* @property {() => Endpoint} getLocalAddress Return the local address for this attempt
* @property {() => Endpoint} getRemoteAddress Return the remote address for this attempt
* @property {() => Promise<void>} close Abort the attempt
*
* @typedef {Object} ProtocolImpl Things the protocol can do for us
* @property {(listenSearch: Endpoint[]) => Promise<boolean>} isListening Tell whether anything in listenSearch is listening
* @property {(listenSearch: Endpoint[], localAddr: Endpoint, remoteAddr: Endpoint, connectionHandler: ConnectionHandler) => Promise<Connection>} inbound Establish a connection into this protocol
* @property {(listenAddr: Endpoint, remoteAddr: Endpoint) => Promise<InboundAttempt>} inbound Make an attempt to connect into this protocol
* @property {(port: Port, remoteAddr: Endpoint, connectionHandler: ConnectionHandler) => Promise<Connection>} outbound Create an outbound connection
*/

Expand All @@ -97,7 +108,7 @@ export const rethrowUnlessMissing = err => {
) {
throw err;
}
return true;
return false;
};

/**
Expand All @@ -106,15 +117,15 @@ export const rethrowUnlessMissing = err => {
* @param {ConnectionHandler} handler
* @param {Endpoint} localAddr
* @param {Endpoint} remoteAddr
* @param {WeakSet<Connection>} [current=new WeakSet()]
* @param {Set<Closable>} [current=new Set()]
* @param {typeof defaultE} [E=defaultE] Eventual send function
* @returns {Connection}
*/
export const makeConnection = (
handler,
localAddr,
remoteAddr,
current = new WeakSet(),
current = new Set(),
E = defaultE,
) => {
let closed;
Expand Down Expand Up @@ -286,7 +297,7 @@ export function getPrefixes(addr) {
* @returns {Protocol} the local capability for connecting and listening
*/
export function makeNetworkProtocol(protocolHandler, E = defaultE) {
/** @type {Store<Port, Set<Connection>>} */
/** @type {Store<Port, Set<Closable>>} */
const currentConnections = makeStore('port');

/**
Expand All @@ -300,39 +311,89 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
* @type {ProtocolImpl}
*/
const protocolImpl = harden({
async isListening(listenSearch) {
const listener = listenSearch.find(addr => listening.has(addr));
return !!listener;
},
async inbound(listenSearch, localAddr, remoteAddr, rchandler) {
const listenAddr = listenSearch.find(addr => listening.has(addr));
if (!listenAddr) {
throw Error(`Connection refused to ${localAddr}`);
async inbound(listenAddr, remoteAddr) {
let lastFailure = Error(`No listeners for ${listenAddr}`);
for (const listenPrefix of getPrefixes(listenAddr)) {
if (!listening.has(listenPrefix)) {
// eslint-disable-next-line no-continue
continue;
}
const [port, listener] = listening.get(listenPrefix);
let localAddr;
try {
// See if we have a listener that's willing to receive this connection.
// eslint-disable-next-line no-await-in-loop
const localSuffix = await E(listener)
.onInbound(port, listenPrefix, remoteAddr, listener)
.catch(rethrowUnlessMissing);
localAddr = localSuffix
? `${listenPrefix}/${localSuffix}`
: listenAddr;
} catch (e) {
lastFailure = e;
// eslint-disable-next-line no-continue
continue;
}
// We have a legitimate inbound attempt.
let consummated;
const current = currentConnections.get(port);
const inboundAttempt = harden({
getLocalAddress() {
// Return address metadata.
return localAddr;
},
getRemoteAddress() {
return remoteAddr;
},
async close() {
if (consummated) {
throw consummated;
}
consummated = Error(`Already closed`);
current.delete(inboundAttempt);
await E(listener)
.onReject(port, localAddr, remoteAddr, listener)
.catch(rethrowUnlessMissing);
},
async accept(rchandler) {
if (consummated) {
throw consummated;
}
consummated = Error(`Already accepted`);
current.delete(inboundAttempt);

const lchandler =
/** @type {ConnectionHandler} */
// eslint-disable-next-line prettier/prettier
(await E(listener).onAccept(port, localAddr, remoteAddr, listener));

return crossoverConnection(
lchandler,
localAddr,
rchandler,
remoteAddr,
current,
E,
)[1];
},
});
current.add(inboundAttempt);
return inboundAttempt;
}
const [port, listener] = listening.get(listenAddr);
const current = currentConnections.get(port);

const lchandler =
/** @type {ConnectionHandler} */
(await E(listener).onAccept(port, localAddr, remoteAddr, listener));

return crossoverConnection(
lchandler,
localAddr,
rchandler,
remoteAddr,
current,
E,
)[1];
throw lastFailure;
},
async outbound(port, remoteAddr, lchandler) {
const localAddr =
/** @type {string} */
(await E(port).getLocalAddress());

const ret = getPrefixes(remoteAddr);
if (await protocolImpl.isListening(ret)) {
return protocolImpl.inbound(ret, remoteAddr, localAddr, lchandler);
let lastFailure;
try {
// Attempt the loopback connection.
const attempt = await protocolImpl.inbound(remoteAddr, localAddr);
return attempt.accept(lchandler);
} catch (e) {
lastFailure = e;
}

const rchandler =
Expand All @@ -346,7 +407,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
));

if (!rchandler) {
throw Error(`Cannot connect to ${remoteAddr}`);
throw lastFailure;
}

const current = currentConnections.get(port);
Expand Down
117 changes: 98 additions & 19 deletions packages/cosmic-swingset/lib/ag-solo/vats/ibc.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
rethrowUnlessMissing,
dataToBase64,
base64ToBytes,
getPrefixes,
} from '@agoric/swingset-vat/src/vats/network';
import makeStore from '@agoric/store';
import { producePromise } from '@agoric/produce-promise';
Expand All @@ -26,6 +25,7 @@ const FIXME_ALLOW_NAIVE_RELAYS = true;
* @typedef {import('@agoric/swingset-vat/src/vats/network').ProtocolImpl} ProtocolImpl
* @typedef {import('@agoric/swingset-vat/src/vats/network').ConnectionHandler} ConnectionHandler
* @typedef {import('@agoric/swingset-vat/src/vats/network').Connection} Connection
* @typedef {import('@agoric/swingset-vat/src/vats/network').InboundAttempt} InboundAttempt
* @typedef {import('@agoric/swingset-vat/src/vats/network').Port} Port
* @typedef {import('@agoric/swingset-vat/src/vats/network').Endpoint} Endpoint
* @typedef {import('@agoric/swingset-vat/src/vats/network').Bytes} Bytes
Expand Down Expand Up @@ -113,6 +113,11 @@ export function makeIBCProtocolHandler(
*/
const channelKeyToInfo = makeStore('CHANNEL:PORT');

/**
* @type {Store<string, Promise<InboundAttempt>>}
*/
const channelKeyToAttemptP = makeStore('CHANNEL:PORT');

/**
* @type {Set<string>}
*/
Expand Down Expand Up @@ -396,7 +401,6 @@ export function makeIBCProtocolHandler(

// TODO: Will need to change to dispatch (without sending)
// a ChanOpenInit to get a passive relayer flowing.
// eslint-disable-next-line no-constant-condition
if (false) {
const packet = {
source_channel: channelID,
Expand Down Expand Up @@ -494,6 +498,7 @@ EOF
rPortID,
removeMatching = false,
) {
// /ibc-port/portID/ibc-channel/channelID(/ibc-hop/hop1(/ibc-hop/hop2))/ibc-port/rPortID/ibc-channel/rChannelID
// FIXME: Leaves garbage behind in the less specific outboundWaiters.
for (let i = 0; i <= hops.length; i += 1) {
// Try most specific to least specific outbound connections.
Expand Down Expand Up @@ -546,8 +551,9 @@ EOF
async fromBridge(srcID, obj) {
console.warn('IBC fromBridge', srcID, obj);
switch (obj.event) {
case 'channelOpenTry':
case 'channelOpenInit': {
// This event is sent by a naive relayer that wants to initiate
// a connection.
const {
channelID,
portID,
Expand All @@ -556,20 +562,96 @@ EOF
} = obj;

const channelKey = `${channelID}:${portID}`;
const waiter = getWaiter(
hops,

if (FIXME_ALLOW_NAIVE_RELAYS) {
// Continue the handshake if we are waiting for it.
const waiter = getWaiter(
hops,
channelID,
portID,
rChannelID,
rPortID,
false,
);
if (waiter) {
// We have more specific information for the outbound connection.
channelKeyToInfo.set(channelKey, obj);
break;
}
}

// We're not waiting for an init, so throw.
throw Error(`No waiting outbound connection for ${channelKey}`);
}

case 'attemptChannelOpenTry':
case 'channelOpenTry': {
// They're (more or less politely) asking if we are listening, so make an attempt.
const {
channelID,
portID,
rChannelID,
rPortID,
false,
counterparty: { port_id: rPortID, channel_id: rChannelID },
connectionHops: hops,
order,
version,
counterpartyVersion: rVersion,
} = obj;

const channelKey = `${channelID}:${portID}`;
if (channelKeyToAttemptP.has(channelKey)) {
// We have a pending attempt, so continue the handshake.
break;
}

const versionSuffix = version ? `/${version}` : '';
const localAddr = `/ibc-port/${portID}/${order.toLowerCase()}${versionSuffix}`;
const ibcHops = hops.map(hop => `/ibc-hop/${hop}`).join('/');
const remoteAddr = `${ibcHops}/ibc-port/${rPortID}/${order.toLowerCase()}/${rVersion}`;

// See if we allow an inbound attempt for this address pair (without rejecting).
const attemptP =
/** @type {Promise<InboundAttempt>} */
(E(protocolImpl).inbound(localAddr, remoteAddr));

// Tell what version string we negotiated.
const attemptedLocal =
/** @type {string} */
(await E(attemptP).getLocalAddress());
const match = attemptedLocal.match(
// Match: /ibc-port/PORT /ORDER/VERSION...
new RegExp('^/ibc-port/[^/]+/[^/]+/([^/]+)(/|$)'),
);
if (!waiter) {
await E(protocolImpl).isListening([`/ibc-port/${portID}`]);
channelKeyToInfo.init(channelKey, obj);
} else {
// We have more specific information.
channelKeyToInfo.set(channelKey, obj);
if (!match) {
throw Error(
`Cannot determine version from attempted local address ${attemptedLocal}`,
);
}

channelKeyToAttemptP.init(channelKey, attemptP);
channelKeyToInfo.init(channelKey, obj);

const negotiatedVersion = match[1];
if (obj.type === 'attemptChannelOpenTry') {
// We can try to open with the version we wanted.
const packet = {
source_channel: channelID,
source_port: portID,
destination_channel: rChannelID,
destination_port: rPortID,
};

await callIBCDevice('channelOpenTry', {
packet,
order,
hops,
version: negotiatedVersion,
counterpartyVersion: rVersion,
});
} else if (negotiatedVersion !== version) {
// Too late to change the version.
throw Error(
`Rejecting version ${version}; we negotiated ${negotiatedVersion}`,
);
}
break;
}
Expand Down Expand Up @@ -615,8 +697,6 @@ EOF
break;
}

// Check for a listener for this subprotocol.
const listenSearch = getPrefixes(localAddr);
const rchandler = makeIBCConnectionHandler(
channelID,
portID,
Expand All @@ -625,9 +705,8 @@ EOF
order === 'ORDERED',
);

// Actually connect.
// eslint-disable-next-line prettier/prettier
E(protocolImpl).inbound(listenSearch, localAddr, remoteAddr, rchandler);
// Attempt a connection and accept our handler.
E(E(protocolImpl).inbound(localAddr, remoteAddr)).accept(rchandler);
break;
}

Expand Down
Loading

0 comments on commit a7ecd9d

Please sign in to comment.