Skip to content

Commit

Permalink
feat: outbound connection metadata negotiation
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed May 26, 2020
1 parent a7ecd9d commit 5dd2e63
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 129 deletions.
265 changes: 138 additions & 127 deletions packages/SwingSet/src/vats/network/network.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export const ENDPOINT_SEPARATOR = '/';
* @property {(port: Port, localAddr: Endpoint, p: ProtocolHandler) => Promise<void>} onBind A port will be bound
* @property {(port: Port, localAddr: Endpoint, listenHandler: ListenHandler, p: ProtocolHandler) => Promise<void>} onListen A port was listening
* @property {(port: Port, localAddr: Endpoint, listenHandler: ListenHandler, p: ProtocolHandler) => Promise<void>} onListenRemove A port listener has been reset
* @property {(port: Port, localAddr: Endpoint, remote: Endpoint, c: ConnectionHandler, p: ProtocolHandler) => Promise<ConnectionHandler|undefined>} onConnect A port initiates an outbound connection
* @property {(port: Port, localAddr: Endpoint, remote: Endpoint, c: ConnectionHandler, p: ProtocolHandler) => Promise<[Endpoint, ConnectionHandler]>} onConnect A port initiates an outbound connection
* @property {(port: Port, localAddr: Endpoint, p: ProtocolHandler) => Promise<void>} onRevoke The port is being completely destroyed
*
* @typedef {Object} InboundAttempt An inbound connection attempt
Expand All @@ -95,6 +95,7 @@ export const ENDPOINT_SEPARATOR = '/';
* @property {() => Promise<void>} close Abort the attempt
*
* @typedef {Object} ProtocolImpl Things the protocol can do for us
* @property {(prefix: Endpoint) => Promise<Port>} bind Claim a port, or if ending in ENDPOINT_SEPARATOR, a fresh name
* @property {(listenAddr: Endpoint, remoteAddr: Endpoint) => Promise<InboundAttempt>} inbound Make an attempt to connect into this protocol
* @property {(port: Port, remoteAddr: Endpoint, connectionHandler: ConnectionHandler) => Promise<Connection>} outbound Create an outbound connection
*/
Expand Down Expand Up @@ -307,129 +308,11 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
*/
const listening = makeStore('localAddr');

/**
* @type {ProtocolImpl}
*/
const protocolImpl = harden({
async inbound(listenAddr, remoteAddr) {
let lastFailure = Error(`No listeners for ${listenAddr}`);
for (const listenPrefix of getPrefixes(listenAddr)) {
if (!listening.has(listenPrefix)) {
// eslint-disable-next-line no-continue
continue;
}
const [port, listener] = listening.get(listenPrefix);
let localAddr;
try {
// See if we have a listener that's willing to receive this connection.
// eslint-disable-next-line no-await-in-loop
const localSuffix = await E(listener)
.onInbound(port, listenPrefix, remoteAddr, listener)
.catch(rethrowUnlessMissing);
localAddr = localSuffix
? `${listenPrefix}/${localSuffix}`
: listenAddr;
} catch (e) {
lastFailure = e;
// eslint-disable-next-line no-continue
continue;
}
// We have a legitimate inbound attempt.
let consummated;
const current = currentConnections.get(port);
const inboundAttempt = harden({
getLocalAddress() {
// Return address metadata.
return localAddr;
},
getRemoteAddress() {
return remoteAddr;
},
async close() {
if (consummated) {
throw consummated;
}
consummated = Error(`Already closed`);
current.delete(inboundAttempt);
await E(listener)
.onReject(port, localAddr, remoteAddr, listener)
.catch(rethrowUnlessMissing);
},
async accept(rchandler) {
if (consummated) {
throw consummated;
}
consummated = Error(`Already accepted`);
current.delete(inboundAttempt);

const lchandler =
/** @type {ConnectionHandler} */
// eslint-disable-next-line prettier/prettier
(await E(listener).onAccept(port, localAddr, remoteAddr, listener));

return crossoverConnection(
lchandler,
localAddr,
rchandler,
remoteAddr,
current,
E,
)[1];
},
});
current.add(inboundAttempt);
return inboundAttempt;
}
throw lastFailure;
},
async outbound(port, remoteAddr, lchandler) {
const localAddr =
/** @type {string} */
(await E(port).getLocalAddress());

let lastFailure;
try {
// Attempt the loopback connection.
const attempt = await protocolImpl.inbound(remoteAddr, localAddr);
return attempt.accept(lchandler);
} catch (e) {
lastFailure = e;
}

const rchandler =
/** @type {ConnectionHandler} */
(await E(protocolHandler).onConnect(
port,
localAddr,
remoteAddr,
lchandler,
protocolHandler,
));

if (!rchandler) {
throw lastFailure;
}

const current = currentConnections.get(port);
return crossoverConnection(
lchandler,
localAddr,
rchandler,
remoteAddr,
current,
E,
)[0];
},
});

/**
* @type {Store<string, Port>}
*/
const boundPorts = makeStore('localAddr');

// Wire up the local protocol to the handler.
E(protocolHandler).onCreate(protocolImpl, protocolHandler);

/**
* @param {Endpoint} localAddr
*/
Expand Down Expand Up @@ -529,6 +412,7 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
* @type {Endpoint}
*/
const dst = harden(remotePort);
// eslint-disable-next-line no-use-before-define
const conn = await protocolImpl.outbound(port, dst, connectionHandler);
if (revoked) {
E(conn).close();
Expand Down Expand Up @@ -568,6 +452,126 @@ export function makeNetworkProtocol(protocolHandler, E = defaultE) {
return port;
};

/**
* @type {ProtocolImpl}
*/
const protocolImpl = harden({
bind,
async inbound(listenAddr, remoteAddr) {
let lastFailure = Error(`No listeners for ${listenAddr}`);
for (const listenPrefix of getPrefixes(listenAddr)) {
if (!listening.has(listenPrefix)) {
// eslint-disable-next-line no-continue
continue;
}
const [port, listener] = listening.get(listenPrefix);
let localAddr;
try {
// See if we have a listener that's willing to receive this connection.
// eslint-disable-next-line no-await-in-loop
const localSuffix = await E(listener)
.onInbound(port, listenPrefix, remoteAddr, listener)
.catch(rethrowUnlessMissing);
localAddr = localSuffix
? `${listenPrefix}/${localSuffix}`
: listenAddr;
} catch (e) {
lastFailure = e;
// eslint-disable-next-line no-continue
continue;
}
// We have a legitimate inbound attempt.
let consummated;
const current = currentConnections.get(port);
const inboundAttempt = harden({
getLocalAddress() {
// Return address metadata.
return localAddr;
},
getRemoteAddress() {
return remoteAddr;
},
async close() {
if (consummated) {
throw consummated;
}
consummated = Error(`Already closed`);
current.delete(inboundAttempt);
await E(listener)
.onReject(port, localAddr, remoteAddr, listener)
.catch(rethrowUnlessMissing);
},
async accept(rchandler) {
if (consummated) {
throw consummated;
}
consummated = Error(`Already accepted`);
current.delete(inboundAttempt);

const lchandler =
/** @type {ConnectionHandler} */
// eslint-disable-next-line prettier/prettier
(await E(listener).onAccept(port, localAddr, remoteAddr, listener));

return crossoverConnection(
lchandler,
localAddr,
rchandler,
remoteAddr,
current,
E,
)[1];
},
});
current.add(inboundAttempt);
return inboundAttempt;
}
throw lastFailure;
},
async outbound(port, remoteAddr, lchandler) {
const localAddr =
/** @type {string} */
(await E(port).getLocalAddress());

let lastFailure;
try {
// Attempt the loopback connection.
const attempt = await protocolImpl.inbound(remoteAddr, localAddr);
return attempt.accept(lchandler);
} catch (e) {
lastFailure = e;
}

const [connectedAddress, rchandler] =
/** @type {[Endpoint, ConnectionHandler]} */
(await E(protocolHandler).onConnect(
port,
localAddr,
remoteAddr,
lchandler,
protocolHandler,
));

if (!rchandler) {
throw lastFailure;
}

const current = currentConnections.get(port);
return crossoverConnection(
lchandler,
localAddr,
rchandler,
connectedAddress,
current,
E,
)[0];
},
});

// Wire up the local protocol to the handler.
E(protocolHandler).onCreate(protocolImpl, protocolHandler);

// Return the user-facing protocol.
return harden({ bind });
}

Expand Down Expand Up @@ -629,14 +633,21 @@ export function makeLoopbackProtocolHandler(E = defaultE) {
}
const [lport, lhandler] = listeners.get(remoteAddr);
// console.log(`looking up onAccept in`, lhandler);
const rport = await E(lhandler).onAccept(
lport,
remoteAddr,
localAddr,
lhandler,
);
// console.log(`rport is`, rport);
return rport;
const remoteSuffix =
/** @type {Endpoint} */
(await E(lhandler)
.onInbound(lport, remoteAddr, localAddr, lhandler)
.catch(e => rethrowUnlessMissing(e)));

if (remoteSuffix) {
remoteAddr = `${remoteAddr}/${remoteSuffix}`;
}

const rchandler =
/** @type {ConnectionHandler} */
(await E(lhandler).onAccept(lport, remoteAddr, localAddr, lhandler));
// console.log(`rchandler is`, rchandler);
return [remoteAddr, rchandler];
},
async onListen(port, localAddr, listenHandler, _protocolHandler) {
// TODO: Implement other listener replacement policies.
Expand Down
6 changes: 4 additions & 2 deletions packages/SwingSet/test/test-network.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ const makeProtocolHandler = t => {
t.assert(localAddr, `local address is supplied to onConnect`);
t.assert(remoteAddr, `remote address is supplied to onConnect`);
if (lp) {
return l.onAccept(lp, localAddr, remoteAddr, l);
return l
.onAccept(lp, localAddr, remoteAddr, l)
.then(ch => [localAddr, ch]);
}
return makeEchoConnectionHandler();
return [remoteAddr, makeEchoConnectionHandler()];
},
async onListen(port, localAddr, listenHandler) {
t.assert(port, `port is tracked in onListen`);
Expand Down

0 comments on commit 5dd2e63

Please sign in to comment.