Skip to content

Commit

Permalink
fix(swingset): change dispatch from object to function
Browse files Browse the repository at this point in the history
Previously, the lowest level of a vat was defined by an object named
`dispatch`, which had three methods: `{ deliver, notify, dropExports }`. Now
we define `dispatch()` to be a *function*, which takes a
`VatDeliveryObject` (of which there are three flavors). This simplifies the
supervisor code and removes a lot of redundant boilerplate, and prepares for
later phase where `dispatch()` becomes `async dispatch()`.

refs #2671
  • Loading branch information
warner committed Mar 21, 2021
1 parent ad6d4a6 commit 9214adf
Show file tree
Hide file tree
Showing 18 changed files with 509 additions and 528 deletions.
29 changes: 27 additions & 2 deletions packages/SwingSet/src/kernel/liveSlots.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { assert, details as X } from '@agoric/assert';
import { isPromise } from '@agoric/promise-kit';
import { insistVatType, makeVatSlot, parseVatSlot } from '../parseVatSlots';
import { insistCapData } from '../capdata';
import { insistMessage } from '../message';
import { makeVirtualObjectManager } from './virtualObjectManager';

const DEFAULT_VIRTUAL_OBJECT_CACHE_SIZE = 3; // XXX ridiculously small value to force churn for testing
Expand Down Expand Up @@ -764,13 +765,37 @@ function build(
const rootObject = buildRootObject(harden(vpow), harden(vatParameters));
assert.equal(passStyleOf(rootObject), REMOTE_STYLE);

const rootSlot = makeVatSlot('object', true, 0n);
const rootSlot = makeVatSlot('object', true, BigInt(0));
valToSlot.set(rootObject, rootSlot);
slotToVal.set(rootSlot, new WeakRef(rootObject));
exported.add(rootObject);
}

const dispatch = harden({ deliver, notify, dropExports });
function dispatch(vatDeliveryObject) {
const [type, ...args] = vatDeliveryObject;
switch (type) {
case 'message': {
const [targetSlot, msg] = args;
insistMessage(msg);
deliver(targetSlot, msg.method, msg.args, msg.result);
break;
}
case 'notify': {
const [resolutions] = args;
notify(resolutions);
break;
}
case 'dropExports': {
const [vrefs] = args;
dropExports(vrefs);
break;
}
default:
assert.fail(X`unknown delivery type ${type}`);
}
}
harden(dispatch);

return harden({ vatGlobals, setBuildRootObject, dispatch, m });
}

Expand Down
85 changes: 31 additions & 54 deletions packages/SwingSet/src/kernel/vatManager/deliver.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { assert, details as X } from '@agoric/assert';
import { insistMessage } from '../../message';
/** @type { (delivery: VatDeliveryObject, prefix: string) => string } */
function summarizeDelivery(vatDeliveryObject, prefix = 'vat') {
const [type, ...args] = vatDeliveryObject;
if (type === 'message') {
const [targetSlot, msg] = args[0];
return `${prefix}[${targetSlot}].${msg.method} dispatch failed`;
}
return `${prefix}.${type} failed`;
}
harden(summarizeDelivery);
export { summarizeDelivery };

export function makeDeliver(tools, dispatch) {
const {
Expand Down Expand Up @@ -44,11 +53,25 @@ export function makeDeliver(tools, dispatch) {
// TODO: accumulate used.allocate and used.compute into vatStats
}

async function doProcess(dispatchRecord, errmsg) {
const dispatchOp = dispatchRecord[0];
const dispatchArgs = dispatchRecord.slice(1);
transcriptManager.startDispatch(dispatchRecord);
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
// vatDeliveryObject is one of:
// ['message', target, msg]
// target is vref
// msg is: { method, args (capdata), result }
// ['notify', resolutions]
// resolutions is an array of triplets: [vpid, rejected, value]
// vpid is the id of the primary promise being resolved
// rejected is a boolean flag indicating if vpid is being fulfilled or rejected
// value is capdata describing the value the promise is being resolved to
// The first entry in the resolutions array is the primary promise being
// resolved, while the remainder (if any) are collateral promises it
// references whose resolution was newly discovered at the time the
// notification delivery was being generated
// ['dropExports', vrefs]

async function deliver(vatDeliveryObject) {
const errmsg = summarizeDelivery(vatDeliveryObject, `vat[${vatID}]`);
transcriptManager.startDispatch(vatDeliveryObject);
await runAndWait(() => dispatch(vatDeliveryObject), errmsg);
stopGlobalMeter();

let status = ['ok', null, null];
Expand All @@ -75,59 +98,13 @@ export function makeDeliver(tools, dispatch) {
return status;
}

async function deliverOneMessage(targetSlot, msg) {
insistMessage(msg);
const errmsg = `vat[${vatID}][${targetSlot}].${msg.method} dispatch failed`;
return doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
);
}

async function deliverOneNotification(resolutions) {
const errmsg = `vat[${vatID}].notify failed`;
return doProcess(['notify', resolutions], errmsg);
}

async function deliverOneDropExports(vrefs) {
const errmsg = `vat[${vatID}].dropExports failed`;
return doProcess(['dropExports', vrefs], errmsg);
}

// vatDeliverObject is:
// ['message', target, msg]
// target is vid
// msg is: { method, args (capdata), result }
// ['notify', resolutions]
// resolutions is an array of triplets: [vpid, rejected, value]
// vpid is the id of the primary promise being resolved
// rejected is a boolean flag indicating if vpid is being fulfilled or rejected
// value is capdata describing the value the promise is being resolved to
// The first entry in the resolutions array is the primary promise being
// resolved, while the remainder (if any) are collateral promises it
// references whose resolution was newly discovered at the time the
// notification delivery was being generated
async function deliver(vatDeliverObject) {
const [type, ...args] = vatDeliverObject;
switch (type) {
case 'message':
return deliverOneMessage(...args);
case 'notify':
return deliverOneNotification(...args);
case 'dropExports':
return deliverOneDropExports(...args);
default:
assert.fail(X`unknown delivery type ${type}`);
}
}

async function replayTranscript() {
transcriptManager.startReplay();
for (const t of vatKeeper.getTranscript()) {
transcriptManager.checkReplayError();
transcriptManager.startReplayDelivery(t.syscalls);
// eslint-disable-next-line no-await-in-loop
await doProcess(t.d, null);
await deliver(t.d);
}
transcriptManager.checkReplayError();
transcriptManager.finishReplay();
Expand Down
7 changes: 4 additions & 3 deletions packages/SwingSet/src/kernel/vatManager/manager-local.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ export function makeLocalVatManagerFactory(tools) {
const transcriptManager = makeTranscriptManager(vatKeeper, vatID);
const { syscall, setVatSyscallHandler } = createSyscall(transcriptManager);
function finish(dispatch, meterRecord) {
assert(
dispatch && dispatch.deliver,
`vat failed to return a 'dispatch' with .deliver: ${dispatch}`,
assert.equal(
typeof dispatch,
'function',
`vat failed to return a 'dispatch' function: ${dispatch}`,
);
const { deliver, replayTranscript } = makeDeliver(
{
Expand Down
46 changes: 8 additions & 38 deletions packages/SwingSet/src/kernel/vatManager/supervisor-nodeworker.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { makeMarshal } from '@agoric/marshal';
import { WeakRef, FinalizationRegistry } from '../../weakref';
import { waitUntilQuiescent } from '../../waitUntilQuiescent';
import { makeLiveSlots } from '../liveSlots';
import { summarizeDelivery } from './deliver';

// eslint-disable-next-line no-unused-vars
function workerLog(first, ...args) {
Expand Down Expand Up @@ -41,34 +42,15 @@ function sendUplink(msg) {

let dispatch;

async function doProcess(dispatchRecord, errmsg) {
const dispatchOp = dispatchRecord[0];
const dispatchArgs = dispatchRecord.slice(1);
async function deliver(vatDeliveryObject) {
workerLog(`runAndWait`);
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
const errmsg = summarizeDelivery(vatDeliveryObject);
await runAndWait(() => dispatch(vatDeliveryObject), errmsg);
workerLog(`doProcess done`);
const vatDeliveryResults = harden(['ok']);
return vatDeliveryResults;
}

function doMessage(targetSlot, msg) {
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
return doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
);
}

function doNotify(resolutions) {
const errmsg = `vat.notify failed`;
return doProcess(['notify', resolutions], errmsg);
}

function doDropExports(vrefs) {
const errmsg = `vat.doDropExport failed`;
return doProcess(['dropExports', vrefs], errmsg);
}

parentPort.on('message', ([type, ...margs]) => {
workerLog(`received`, type);
if (type === 'start') {
Expand Down Expand Up @@ -143,22 +125,10 @@ parentPort.on('message', ([type, ...margs]) => {
workerLog(`error: deliver before dispatchReady`);
return;
}
const [[dtype, ...dargs]] = margs;
if (dtype === 'message') {
doMessage(...dargs).then(vatDeliveryResults =>
sendUplink(['deliverDone', vatDeliveryResults]),
);
} else if (dtype === 'notify') {
doNotify(...dargs).then(vatDeliveryResults =>
sendUplink(['deliverDone', vatDeliveryResults]),
);
} else if (dtype === 'dropExports') {
doDropExports(...dargs).then(vatDeliveryResults =>
sendUplink(['deliverDone', vatDeliveryResults]),
);
} else {
assert.fail(X`bad delivery type ${dtype}`);
}
const [vatDeliveryObject] = margs;
deliver(vatDeliveryObject).then(vatDeliveryResults =>
sendUplink(['deliverDone', vatDeliveryResults]),
);
} else {
workerLog(`unrecognized downlink message ${type}`);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
} from '../../netstring';
import { waitUntilQuiescent } from '../../waitUntilQuiescent';
import { makeLiveSlots } from '../liveSlots';
import { summarizeDelivery } from './deliver';

// eslint-disable-next-line no-unused-vars
function workerLog(first, ...args) {
Expand All @@ -41,34 +42,15 @@ function runAndWait(f, errmsg) {

let dispatch;

async function doProcess(dispatchRecord, errmsg) {
const dispatchOp = dispatchRecord[0];
const dispatchArgs = dispatchRecord.slice(1);
async function deliver(vatDeliveryObject) {
workerLog(`runAndWait`);
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
const errmsg = summarizeDelivery(vatDeliveryObject);
await runAndWait(() => dispatch(vatDeliveryObject), errmsg);
workerLog(`doProcess done`);
const vatDeliveryResults = harden(['ok']);
return vatDeliveryResults;
}

function doMessage(targetSlot, msg) {
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
return doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
);
}

function doNotify(resolutions) {
const errmsg = `vat.notify failed`;
return doProcess(['notify', resolutions], errmsg);
}

function doDropExports(vrefs) {
const errmsg = `vat.doDropExport failed`;
return doProcess(['dropExports', vrefs], errmsg);
}

const toParent = arrayEncoderStream();
toParent
.pipe(netstringEncoderStream())
Expand Down Expand Up @@ -163,22 +145,10 @@ fromParent.on('data', ([type, ...margs]) => {
workerLog(`error: deliver before dispatchReady`);
return;
}
const [[dtype, ...dargs]] = margs;
if (dtype === 'message') {
doMessage(...dargs).then(vatDeliveryResults =>
sendUplink(['deliverDone', vatDeliveryResults]),
);
} else if (dtype === 'notify') {
doNotify(...dargs).then(vatDeliveryResults =>
sendUplink(['deliverDone', vatDeliveryResults]),
);
} else if (dtype === 'dropExports') {
doDropExports(...dargs).then(vatDeliveryResults =>
sendUplink(['deliverDone', vatDeliveryResults]),
);
} else {
assert.fail(X`bad delivery type ${dtype}`);
}
const [vatDeliveryObject] = margs;
deliver(vatDeliveryObject).then(vatDeliveryResults =>
sendUplink(['deliverDone', vatDeliveryResults]),
);
} else {
workerLog(`unrecognized downlink message ${type}`);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import { importBundle } from '@agoric/import-bundle';
import { makeMarshal } from '@agoric/marshal';
// grumble... waitUntilQuiescent is exported and closes over ambient authority
import { waitUntilQuiescent } from '../../waitUntilQuiescent';
import { insistVatDeliveryObject } from '../../message';

import { makeLiveSlots } from '../liveSlots';
import { summarizeDelivery } from './deliver';

const encoder = new TextEncoder();
const decoder = new TextDecoder();
Expand Down Expand Up @@ -119,44 +121,22 @@ function runAndWait(f, errmsg) {
* @param { ReturnType<typeof managerPort> } port
*/
function makeWorker(port) {
/** @type { Record<string, (...args: unknown[]) => void> | null } */
/** @type { ((delivery: VatDeliveryObject) => void) | null } */
let dispatch = null;

/** @type { (dr: Tagged, errmsg: string) => Promise<Tagged> } */
async function doProcess(dispatchRecord, errmsg) {
/** @type { (delivery: VatDeliveryObject) => Promise<Tagged> } */
async function deliver(delivery) {
assert(dispatch);
const theDispatch = dispatch;
const [dispatchOp, ...dispatchArgs] = dispatchRecord;
assert(typeof dispatchOp === 'string');
workerLog(`runAndWait`);
await runAndWait(() => theDispatch[dispatchOp](...dispatchArgs), errmsg);
const errmsg = summarizeDelivery(delivery);
await runAndWait(() => theDispatch(delivery), errmsg);
workerLog(`doProcess done`);
/** @type { Tagged } */
const vatDeliveryResults = harden(['ok']);
return vatDeliveryResults;
}

/** @type { (ts: unknown, msg: any) => Promise<Tagged> } */
function doMessage(targetSlot, msg) {
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
return doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
);
}

/** @type { (rs: unknown) => Promise<Tagged> } */
function doNotify(resolutions) {
const errmsg = `vat.notify failed`;
return doProcess(['notify', resolutions], errmsg);
}

/** @type { (rs: unknown) => Promise<Tagged> } */
function doDropExports(vrefs) {
const errmsg = `vat.dropExports failed`;
return doProcess(['dropExports', vrefs], errmsg);
}

/**
* TODO: consider other methods per SES VirtualConsole.
* See https://github.com/Agoric/agoric-sdk/issues/2146
Expand Down Expand Up @@ -277,18 +257,9 @@ function makeWorker(port) {
}
case 'deliver': {
assert(dispatch, 'cannot deliver before setBundle');
assert(Array.isArray(args[0]));
const [[dtype, ...dargs]] = args;
switch (dtype) {
case 'message':
return doMessage(dargs[0], dargs[1]);
case 'notify':
return doNotify(dargs[0]);
case 'dropExports':
return doDropExports(dargs[0]);
default:
assert.fail(X`bad delivery type ${dtype}`);
}
const [vatDeliveryObject] = args;
insistVatDeliveryObject(vatDeliveryObject);
return deliver(vatDeliveryObject);
}
default:
workerLog('handleItem: bad tag', tag, args.length);
Expand Down
Loading

0 comments on commit 9214adf

Please sign in to comment.