Skip to content

Commit

Permalink
add transcript pseudo-events: init, snapshot save/load, shutdown
Browse files Browse the repository at this point in the history
This introduces four new pseudo-delivery events to the transcript:

* 'initialize-worker': a new empty worker is created
* 'load-snapshot': a worker is loaded from heap snapshot
* 'save-snapshot': we tell the worker to write a heap snapshot
* 'shutdown-worker': we stop the worker (e.g. during upgrade)

These events are not actually delivered to the worker: they are not
VatDeliveryObjects. However many of them are implemented with commands
to the worker (but not `deliver()` commands). The vat-warehouse
records these events in the transcript to help subsequent
manual/external replay tools know what happened. Without them, we'd
need to deduce e.g. the heap-snapshot writing schedule by counting
deliveries and comparing them against snapshot initial/interval.

The 'save-snapshot'/'load-snapshot' pair indicates what a replay would
do. It does not mean that the vat-warehouse actually tore down the old
worker and immediately replaced it with a new one (from snapshot). It
might choose to do that, or the worker itself might choose to replace
its XS engine instance with a fresh one, or it might keep using the
old engine. The 'save-snapshot' command has side-effects (it does a
forced GC), so it is important to keep track of when it happened.

As before, the transcript is broken up into "spans", delimited by heap
snapshots or upgrade-related shutdowns. To bring a worker up to date,
we want to start a worker (either a blank one, or from a snapshot),
and then replay the "current span".

With this change, the current span always starts either with
'initialize-worker' or with 'load-snapshot', telling us exactly what
needs to be done. The span then contains all the deliveries that must
be replayed. Old spans will end with `save-snapshot` or
`shutdown-worker`, but the current span will never include one of
those: the span is closed immediately after those events are
added. When the kernel replays a transcript to bring a worker up to
date, that replay will never see 'save-snapshot' or
'shutdown-worker'. But an external tool which replays a historical
span will see them at the end.

The `initialize-worker` event contains `workerOptions` (which includes
which type of worker is being used, as well as helper bundle IDs like
lockdown and supervisor), as well as the `source.bundleID` for the vat
bundle.

The `save-snapshot` event results contain the `snapshotID` hash that
was generated. The `load-snapshot` event includes the `snapshotID` in
a record that could be extended with additional details in the
future (like an xsnap version).

The types were improved to make `TranscriptDelivery` be a superset of
`VatDeliveryObject`. We also record TranscriptDeliveryResult, which is
currently a stripped down subset of VatDeliveryResult (just the "ok"
status), plus the save-snapshot hash. In the future, we'll probably
record the deterministic subset of metering results (computrons, maybe
something about memory allocation).

In the slog, the `heap-snapshot-save` event details now contain
`snapshotID` instead of `hash`, to be consistent.

Previously vat-warehouse used `lastVatID` to track which vat received
a delivery most recently, and `saveSnapshot()` used that to decide
which vat requires a snapshot. This commit changes that path to be
more explicit, and removes `lastVatID`.

refs #7199
refs #6770
  • Loading branch information
warner committed Apr 24, 2023
1 parent 90de7cd commit f11ff2d
Show file tree
Hide file tree
Showing 10 changed files with 581 additions and 95 deletions.
13 changes: 8 additions & 5 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ export default function buildKernel(
* @typedef { {
* abort?: boolean, // changes should be discarded, not committed
* consumeMessage?: boolean, // discard the aborted delivery
* didDelivery?: boolean, // we made a delivery to a vat, for run policy
* didDelivery?: VatID, // we made a delivery to a vat, for run policy and save-snapshot
* computrons?: BigInt, // computron count for run policy
* meterID?: string, // deduct those computrons from a meter
* decrementReapCount?: { vatID: VatID }, // the reap counter should decrement
Expand Down Expand Up @@ -461,7 +461,7 @@ export default function buildKernel(
// TODO metering.allocate, some day

/** @type {CrankResults} */
const results = { didDelivery: true, computrons };
const results = { didDelivery: vatID, computrons };

if (meterID && computrons) {
results.meterID = meterID; // decrement meter when we're done
Expand Down Expand Up @@ -701,7 +701,7 @@ export default function buildKernel(
console.log('error during createDynamicVat', err);
const info = makeError(`${err}`);
const results = {
didDelivery: true, // ok, it failed, but we did spend the time
didDelivery: vatID, // ok, it failed, but we did spend the time
abort: true, // delete partial vat state
consumeMessage: true, // don't repeat createVat
terminate: { vatID, reject: true, info },
Expand Down Expand Up @@ -1267,8 +1267,11 @@ export default function buildKernel(
crankResults.consumeMessage ? 'deliver' : 'start',
);
} else {
// eslint-disable-next-line @jessie.js/no-nested-await
await vatWarehouse.maybeSaveSnapshot();
const vatID = crankResults.didDelivery;
if (vatID) {
// eslint-disable-next-line @jessie.js/no-nested-await
await vatWarehouse.maybeSaveSnapshot(vatID);
}
}
const { computrons, meterID } = crankResults;
if (computrons) {
Expand Down
63 changes: 50 additions & 13 deletions packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import { enumeratePrefixedKeys } from './storageHelper.js';
* @typedef { import('../../types-internal.js').VatManager } VatManager
* @typedef { import('../../types-internal.js').RecordedVatOptions } RecordedVatOptions
* @typedef { import('../../types-internal.js').TranscriptEntry } TranscriptEntry
* @typedef {import('../../types-internal.js').TranscriptDeliverySaveSnapshot} TDSaveSnapshot
* @typedef {import('../../types-internal.js').TranscriptDeliverySaveSnapshotResults} TDSaveSnapshotResults
* @typedef {import('../../types-internal.js').TranscriptDeliveryLoadSnapshot} TDLoadSnapshot
*/

// makeVatKeeper is a pure function: all state is kept in the argument object
Expand Down Expand Up @@ -469,16 +472,28 @@ export function makeVatKeeper(
}
}

function transcriptSize() {
const bounds = transcriptStore.getCurrentSpanBounds(vatID);
const { startPos, endPos } = bounds;
return endPos - startPos;
}

/**
* Generator function to return the vat's transcript, one entry at a time.
* Generator function to return the vat's current-span transcript,
* one entry at a time.
*
* @param {number} [startPos] Optional position to begin reading from
*
* @yields { TranscriptEntry } a stream of transcript entries
* @yields { [number, TranscriptEntry] } a stream of deliveryNum and transcript entries
*/
function* getTranscript(startPos) {
for (const entry of transcriptStore.readSpan(vatID, startPos)) {
yield /** @type { TranscriptEntry } */ (JSON.parse(entry));
function* getTranscript() {
const bounds = transcriptStore.getCurrentSpanBounds(vatID);
let deliveryNum = bounds.startPos;
// readSpan() starts at startPos and ends just before endPos
for (const entry of transcriptStore.readSpan(vatID)) {
const te = /** @type { TranscriptEntry } */ (JSON.parse(entry));
/** @type { [number, TranscriptEntry]} */
const retval = [deliveryNum, te];
yield retval;
deliveryNum += 1;
}
}

Expand Down Expand Up @@ -512,34 +527,55 @@ export function makeVatKeeper(
* Store a snapshot, if given a snapStore.
*
* @param {VatManager} manager
* @returns {Promise<boolean>}
* @returns {Promise<void>}
*/
async function saveSnapshot(manager) {
if (!snapStore || !manager.makeSnapshot) {
return false;
return undefined;
}

// tell the manager to save a heap snapshot to the snapStore
const endPosition = getTranscriptEndPosition();
const info = await manager.makeSnapshot(endPosition, snapStore);
transcriptStore.rolloverSpan(vatID);

const {
hash,
hash: snapshotID,
uncompressedSize,
rawSaveSeconds,
compressedSize,
compressSeconds,
} = info;

// push a save-snapshot transcript entry
addToTranscript({
d: /** @type {TDSaveSnapshot} */ ['save-snapshot'],
sc: [],
r: /** @type {TDSaveSnapshotResults} */ { status: 'ok', snapshotID },
});

// then start a new transcript span
transcriptStore.rolloverSpan(vatID);

// then push a load-snapshot entry, so that the current span
// always starts with an initialize-worker or load-snapshot
// pseudo-delivery
const loadConfig = { snapshotID };
addToTranscript({
d: /** @type {TDLoadSnapshot} */ ['load-snapshot', loadConfig],
sc: [],
r: { status: 'ok' },
});

kernelSlog.write({
type: 'heap-snapshot-save',
vatID,
hash,
snapshotID,
uncompressedSize,
rawSaveSeconds,
compressedSize,
compressSeconds,
endPosition,
});
return true;
}

function deleteSnapshotsAndTranscript() {
Expand Down Expand Up @@ -618,6 +654,7 @@ export function makeVatKeeper(
hasCListEntry,
deleteCListEntry,
deleteCListEntriesForKernelSlots,
transcriptSize,
getTranscript,
transcriptSnapshotStats,
addToTranscript,
Expand Down
122 changes: 85 additions & 37 deletions packages/SwingSet/src/kernel/vat-warehouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import djson from '../lib/djson.js';
* @typedef {import('@agoric/swingset-liveslots').VatSyscallResult} VatSyscallResult
* @typedef {import('@agoric/swingset-liveslots').VatSyscallHandler} VatSyscallHandler
* @typedef {import('../types-internal.js').VatManager} VatManager
* @typedef {import('../types-internal.js').VatID} VatID
* @typedef {import('../types-internal.js').TranscriptDeliveryInitializeWorkerOptions} TDInitializeWorkerOptions
* @typedef {import('../types-internal.js').TranscriptDeliveryInitializeWorker} TDInitializeWorker
* @typedef {import('../types-internal.js').TranscriptDeliveryShutdownWorker} TDShutdownWorker
* @typedef {import('../types-internal.js').TranscriptDeliveryResults} TranscriptDeliveryResults
* @typedef {import('../types-internal.js').TranscriptEntry} TranscriptEntry
* @typedef {{ body: string, slots: unknown[] }} Capdata
Expand Down Expand Up @@ -38,8 +42,31 @@ function recordSyscalls(origHandler) {
syscalls.push({ s: vso, r: vres });
return vres;
};
const getTranscriptSyscalls = () => syscalls;
return { syscallHandler, getTranscriptSyscalls };
const getTranscriptEntry = (vd, deliveryResult) => {
// TODO add metering computrons to results
/** @type {TranscriptDeliveryResults} */
const tdr = { status: deliveryResult[0] };
const transcriptEntry = { d: vd, sc: syscalls, r: tdr };
return transcriptEntry;
};
return { syscallHandler, getTranscriptEntry };
}

/**
* @param {TranscriptEntry} transcriptEntry
* @returns {VatDeliveryObject}
*/
function onlyRealDelivery(transcriptEntry) {
const dtype = transcriptEntry.d[0];
if (
dtype === 'save-snapshot' ||
dtype === 'load-snapshot' ||
dtype === 'initialize-worker' ||
dtype === 'shutdown-worker'
) {
throw Fail`replay should not see ${dtype}`;
}
return transcriptEntry.d;
}

/**
Expand Down Expand Up @@ -253,28 +280,33 @@ export function makeVatWarehouse({
* @returns {Promise<void>}
*/
async function replayTranscript(vatID, vatKeeper, manager) {
const snapshotInfo = vatKeeper.getSnapshotInfo();
const startPos = snapshotInfo ? snapshotInfo.endPos : undefined;
// console.log('replay from', { vatID, startPos });

const total = vatKeeper.vatStats().transcriptCount;
const total = vatKeeper.transcriptSize();
kernelSlog.write({ type: 'start-replay', vatID, deliveries: total });
// TODO glean deliveryNum better, make sure we get the post-snapshot
// transcript starting point right. getTranscript() should probably
// return [deliveryNum, t] pairs.
let deliveryNum = startPos || 0;
for await (const te of vatKeeper.getTranscript(startPos)) {
let first = true;
for await (const [deliveryNum, te] of vatKeeper.getTranscript()) {
// if (deliveryNum % 100 === 0) {
// console.debug(`replay vatID:${vatID} deliveryNum:${deliveryNum} / ${total}`);
// }
//
if (first) {
// the first entry should always be initialize-worker or
// load-snapshot
first = false;
const dtype = te.d[0];
if (dtype === 'initialize-worker' || dtype === 'load-snapshot') {
continue; // TODO: use this to launch the worker
} else {
console.log(`transcript for ${vatID} starts with ${te.d[0]}`);
throw Fail`transcript for ${vatID} doesn't start with init/load`;
}
}
// we slog the replay just like the original, but some fields are missing
const finishSlog = slogReplay(kernelSlog, vatID, deliveryNum, te);
const delivery = onlyRealDelivery(te);
const sim = makeSyscallSimulator(kernelSlog, vatID, deliveryNum, te);
const status = await manager.deliver(te.d, sim.syscallHandler);
const status = await manager.deliver(delivery, sim.syscallHandler);
finishSlog(status);
sim.finishSimulation(); // will throw if syscalls did not match
deliveryNum += 1;
}
kernelSlog.write({ type: 'finish-replay', vatID });
}
Expand All @@ -298,6 +330,22 @@ export function makeVatWarehouse({
const translators = provideTranslators(vatID);
const syscallHandler = buildVatSyscallHandler(vatID, translators);

// if we use transcripts, but don't have one, create one with an
// initialize-worker event, to represent the vatLoader.create()
// we're about to do
if (options.useTranscript && vatKeeper.transcriptSize() === 0) {
/** @type { TDInitializeWorkerOptions } */
const initOpts = { source: {}, workerOptions: options.workerOptions };
// if the vat is somehow using a full bundle, we don't want that
// in the transcript: we only record bundleIDs
initOpts.source.bundleID = source.bundleID;
vatKeeper.addToTranscript({
d: /** @type {TDInitializeWorker} */ ['initialize-worker', initOpts],
sc: [],
r: { status: 'ok' },
});
}

const isDynamic = kernelKeeper.getDynamicVats().includes(vatID);
const managerP = vatLoader.create(vatID, {
isDynamic,
Expand Down Expand Up @@ -439,7 +487,7 @@ export function makeVatWarehouse({
* options: pay $/block to keep in RAM - advisory; not consensus
* creation arg: # of vats to keep in RAM (LRU 10~50~100)
*
* @param {string} currentVatID
* @param {VatID} currentVatID
*/
async function applyAvailabilityPolicy(currentVatID) {
const lru = recent.add(currentVatID);
Expand All @@ -453,13 +501,9 @@ export function makeVatWarehouse({
await evict(lru);
}

/** @type { string | undefined } */
let lastVatID;

/** @type {(vatID: string, kd: KernelDeliveryObject, d: VatDeliveryObject, vs: VatSlog) => Promise<VatDeliveryResult> } */
async function deliverToVat(vatID, kd, vd, vs) {
await applyAvailabilityPolicy(vatID);
lastVatID = vatID;

const recreate = true; // PANIC in the failure case
// create the worker and replay the transcript, if necessary
Expand All @@ -479,7 +523,7 @@ export function makeVatWarehouse({

// wrap the syscallHandler with a syscall recorder
const recorder = recordSyscalls(origHandler);
const { syscallHandler, getTranscriptSyscalls } = recorder;
const { syscallHandler, getTranscriptEntry } = recorder;
assert(syscallHandler);

// make the delivery
Expand All @@ -490,11 +534,7 @@ export function makeVatWarehouse({
// TODO: if the dispatch failed for whatever reason, and we choose to
// destroy the vat, change what we do with the transcript here.
if (options.useTranscript) {
// record transcript entry
/** @type {TranscriptDeliveryResults} */
const tdr = { status: deliveryResult[0] };
const transcriptEntry = { d: vd, sc: getTranscriptSyscalls(), r: tdr };
vatKeeper.addToTranscript(transcriptEntry);
vatKeeper.addToTranscript(getTranscriptEntry(vd, deliveryResult));
}

// TODO: if per-vat policy decides it wants a BOYD or heap snapshot,
Expand All @@ -504,21 +544,19 @@ export function makeVatWarehouse({
}

/**
* Save a snapshot of most recently used vat,
* depending on snapshotInterval.
* Save a heap snapshot for the given vatID, if the snapshotInterval
* is satisified
*
* @param {VatID} vatID
*/
async function maybeSaveSnapshot() {
if (!lastVatID || !lookup(lastVatID)) {
return false;
}

async function maybeSaveSnapshot(vatID) {
const recreate = true; // PANIC in the failure case
const { manager } = await ensureVatOnline(lastVatID, recreate);
const { manager } = await ensureVatOnline(vatID, recreate);
if (!manager.makeSnapshot) {
return false;
return false; // worker cannot make snapshots
}

const vatKeeper = kernelKeeper.provideVatKeeper(lastVatID);
const vatKeeper = kernelKeeper.provideVatKeeper(vatID);
let reason;
const { totalEntries, snapshottedEntries } =
vatKeeper.transcriptSnapshotStats();
Expand All @@ -532,10 +570,15 @@ export function makeVatWarehouse({
}
// console.log('maybeSaveSnapshot: reason:', reason);
if (!reason) {
return false;
return false; // not time to make a snapshot
}

// in addition to saving the actual snapshot,
// vatKeeper.saveSnapshot() pushes a save-snapshot transcript
// entry, then starts a new transcript span, then pushes a
// load-snapshot entry, so that the current span always starts
// with an initialize-snapshot or load-snapshot pseudo-delivery
await vatKeeper.saveSnapshot(manager);
lastVatID = undefined;
return true;
}

Expand Down Expand Up @@ -584,6 +627,11 @@ export function makeVatWarehouse({
async function resetWorker(vatID) {
await evict(vatID);
const vatKeeper = kernelKeeper.provideVatKeeper(vatID);
vatKeeper.addToTranscript({
d: /** @type {TDShutdownWorker} */ ['shutdown-worker'],
sc: [],
r: { status: 'ok' },
});
vatKeeper.dropSnapshotAndResetTranscript();
}

Expand Down
Loading

0 comments on commit f11ff2d

Please sign in to comment.