Skip to content

Commit

Permalink
fix(prover): Handle starting blocks out of order in prover (#10350)
Browse files Browse the repository at this point in the history
Fixes issue introduced in #10263 where block states would be stored out
of order in the internal epoch proving state, which caused them to be
picked up out of order when computing block merges.

---------

Co-authored-by: Alex Gherghisan <[email protected]>
  • Loading branch information
spalladino and alexghr authored Dec 3, 2024
1 parent 0d1b722 commit 9106102
Show file tree
Hide file tree
Showing 15 changed files with 77 additions and 41 deletions.
3 changes: 2 additions & 1 deletion yarn-project/circuit-types/src/interfaces/epoch-prover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ export interface EpochProver extends Omit<BlockBuilder, 'setBlockCompleted'> {
/**
* Starts a new epoch. Must be the first method to be called.
* @param epochNumber - The epoch number.
* @param firstBlockNumber - The block number of the first block in the epoch.
* @param totalNumBlocks - The total number of blocks expected in the epoch (must be at least one).
**/
startNewEpoch(epochNumber: number, totalNumBlocks: number): void;
startNewEpoch(epochNumber: number, firstBlockNumber: number, totalNumBlocks: number): void;

/** Pads the block with empty txs if it hasn't reached the declared number of txs. */
setBlockCompleted(blockNumber: number, expectedBlockHeader?: Header): Promise<L2Block>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ export class EpochProvingState {
private mergeRollupInputs: BlockMergeRollupInputData[] = [];
public rootRollupPublicInputs: RootRollupPublicInputs | undefined;
public finalProof: Proof | undefined;
public blocks: BlockProvingState[] = [];
public blocks: (BlockProvingState | undefined)[] = [];

constructor(
public readonly epochNumber: number,
public readonly firstBlockNumber: number,
public readonly totalNumBlocks: number,
private completionCallback: (result: ProvingResult) => void,
private rejectionCallback: (reason: string) => void,
Expand Down Expand Up @@ -106,8 +107,9 @@ export class EpochProvingState {
archiveTreeRootSiblingPath: Tuple<Fr, typeof ARCHIVE_HEIGHT>,
previousBlockHash: Fr,
): BlockProvingState {
const index = globalVariables.blockNumber.toNumber() - this.firstBlockNumber;
const block = new BlockProvingState(
this.blocks.length,
index,
numTxs,
globalVariables,
padArrayEnd(l1ToL2Messages, Fr.ZERO, NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP),
Expand All @@ -119,8 +121,8 @@ export class EpochProvingState {
previousBlockHash,
this,
);
this.blocks.push(block);
if (this.blocks.length === this.totalNumBlocks) {
this.blocks[index] = block;
if (this.blocks.filter(b => !!b).length === this.totalNumBlocks) {
this.provingStateLifecycle = PROVING_STATE_LIFECYCLE.PROVING_STATE_FULL;
}
return block;
Expand Down Expand Up @@ -176,7 +178,7 @@ export class EpochProvingState {

// Returns a specific transaction proving state
public getBlockProvingStateByBlockNumber(blockNumber: number) {
return this.blocks.find(block => block.blockNumber === blockNumber);
return this.blocks.find(block => block?.blockNumber === blockNumber);
}

// Returns a set of merge rollup inputs
Expand Down
11 changes: 7 additions & 4 deletions yarn-project/prover-client/src/orchestrator/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ export class ProvingOrchestrator implements EpochProver {
this.paddingTxProof = undefined;
}

public startNewEpoch(epochNumber: number, totalNumBlocks: number) {
public startNewEpoch(epochNumber: number, firstBlockNumber: number, totalNumBlocks: number) {
const { promise: _promise, resolve, reject } = promiseWithResolvers<ProvingResult>();
const promise = _promise.catch((reason): ProvingResult => ({ status: 'failure', reason }));
if (totalNumBlocks <= 0 || !Number.isInteger(totalNumBlocks)) {
throw new Error(`Invalid number of blocks for epoch (got ${totalNumBlocks})`);
}
logger.info(`Starting epoch ${epochNumber} with ${totalNumBlocks} blocks`);
this.provingState = new EpochProvingState(epochNumber, totalNumBlocks, resolve, reject);
this.provingState = new EpochProvingState(epochNumber, firstBlockNumber, totalNumBlocks, resolve, reject);
this.provingPromise = promise;
}

Expand Down Expand Up @@ -336,7 +336,7 @@ export class ProvingOrchestrator implements EpochProver {

/** Returns the block as built for a given index. */
public getBlock(index: number): L2Block {
const block = this.provingState?.blocks[index].block;
const block = this.provingState?.blocks[index]?.block;
if (!block) {
throw new Error(`Block at index ${index} not available`);
}
Expand All @@ -354,7 +354,10 @@ export class ProvingOrchestrator implements EpochProver {
})
private padEpoch(): Promise<void> {
const provingState = this.provingState!;
const lastBlock = maxBy(provingState.blocks, b => b.blockNumber)?.block;
const lastBlock = maxBy(
provingState.blocks.filter(b => !!b),
b => b!.blockNumber,
)?.block;
if (!lastBlock) {
return Promise.reject(new Error(`Epoch needs at least one completed block in order to be padded`));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('prover/orchestrator/errors', () => {
it('throws if adding too many transactions', async () => {
const txs = times(4, i => context.makeProcessedTx(i + 1));

orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(txs.length, context.globalVariables, []);

for (const tx of txs) {
Expand All @@ -43,7 +43,7 @@ describe('prover/orchestrator/errors', () => {
});

it('throws if adding too many blocks', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(2, context.globalVariables, []);
await orchestrator.setBlockCompleted(context.blockNumber);

Expand All @@ -59,29 +59,29 @@ describe('prover/orchestrator/errors', () => {
});

it('throws if adding a transaction before starting block', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await expect(async () => await orchestrator.addNewTx(context.makeProcessedTx())).rejects.toThrow(
/Block proving state for 1 not found/,
);
});

it('throws if completing a block before start', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await expect(async () => await orchestrator.setBlockCompleted(context.blockNumber)).rejects.toThrow(
/Block proving state for 1 not found/,
);
});

it('throws if setting an incomplete block as completed', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(3, context.globalVariables, []);
await expect(async () => await orchestrator.setBlockCompleted(context.blockNumber)).rejects.toThrow(
`Block not ready for completion: expecting ${3} more transactions.`,
);
});

it('throws if adding to a cancelled block', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(2, context.globalVariables, []);
orchestrator.cancel();

Expand All @@ -93,23 +93,23 @@ describe('prover/orchestrator/errors', () => {
it.each([[-4], [0], [1], [8.1]] as const)(
'fails to start a block with %i transactions',
async (blockSize: number) => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await expect(
async () => await orchestrator.startNewBlock(blockSize, context.globalVariables, []),
).rejects.toThrow(`Invalid number of txs for block (got ${blockSize})`);
},
);

it.each([[-4], [0], [8.1]] as const)('fails to start an epoch with %i blocks', (epochSize: number) => {
orchestrator.startNewEpoch(1, 1);
expect(() => orchestrator.startNewEpoch(1, epochSize)).toThrow(
orchestrator.startNewEpoch(1, 1, 1);
expect(() => orchestrator.startNewEpoch(1, 1, epochSize)).toThrow(
`Invalid number of blocks for epoch (got ${epochSize})`,
);
});

it('rejects if too many l1 to l2 messages are provided', async () => {
const l1ToL2Messages = new Array(100).fill(new Fr(0n));
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await expect(
async () => await orchestrator.startNewBlock(2, context.globalVariables, l1ToL2Messages),
).rejects.toThrow('Too many L1 to L2 messages');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('prover/orchestrator/failures', () => {
// We generate them and add them as part of the pending chain
const blocks = await timesAsync(3, i => context.makePendingBlock(3, 1, i + 1, j => ({ privateOnly: j === 1 })));

orchestrator.startNewEpoch(1, 3);
orchestrator.startNewEpoch(1, 1, 3);

for (const { block, txs, msgs } of blocks) {
// these operations could fail if the target circuit fails before adding all blocks or txs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('prover/orchestrator/lifecycle', () => {
return deferred.promise;
});

orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(2, context.globalVariables, []);

await sleep(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('prover/orchestrator/mixed-blocks', () => {

const l1ToL2Messages = range(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP, 1 + 0x400).map(fr);

context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(3, context.globalVariables, l1ToL2Messages);
for (const tx of txs) {
await context.orchestrator.addNewTx(tx);
Expand All @@ -41,7 +41,7 @@ describe('prover/orchestrator/mixed-blocks', () => {

const l1ToL2Messages = range(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP, 1 + 0x400).map(fr);

context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(txs.length, context.globalVariables, l1ToL2Messages);

for (const tx of txs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('prover/orchestrator/public-functions', () => {
tx.data.constants.protocolContractTreeRoot = protocolContractTreeRoot;
}

context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(numTransactions, context.globalVariables, []);

const [processed, failed] = await context.processPublicFunctions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ describe('prover/orchestrator/multi-block', () => {
describe('multiple blocks', () => {
it.each([1, 4, 5])('builds an epoch with %s blocks in sequence', async (numBlocks: number) => {
logger.info(`Seeding world state with ${numBlocks} blocks`);
const txCount = 1;
const txCount = 2;
const blocks = await timesAsync(numBlocks, i => context.makePendingBlock(txCount, 0, i + 1));

logger.info(`Starting new epoch with ${numBlocks}`);
context.orchestrator.startNewEpoch(1, numBlocks);
context.orchestrator.startNewEpoch(1, 1, numBlocks);
for (const { block, txs } of blocks) {
await context.orchestrator.startNewBlock(Math.max(txCount, 2), block.header.globalVariables, []);
for (const tx of txs) {
Expand All @@ -41,15 +41,17 @@ describe('prover/orchestrator/multi-block', () => {

it.each([1, 4, 5])('builds an epoch with %s blocks in parallel', async (numBlocks: number) => {
logger.info(`Seeding world state with ${numBlocks} blocks`);
const txCount = 1;
const txCount = 2;
const blocks = await timesAsync(numBlocks, i => context.makePendingBlock(txCount, 0, i + 1));

logger.info(`Starting new epoch with ${numBlocks}`);
context.orchestrator.startNewEpoch(1, numBlocks);
context.orchestrator.startNewEpoch(1, 1, numBlocks);
await Promise.all(
blocks.map(async ({ block, txs }) => {
await context.orchestrator.startNewBlock(Math.max(txCount, 2), block.header.globalVariables, []);
await Promise.all(txs.map(tx => context.orchestrator.addNewTx(tx)));
for (const tx of txs) {
await context.orchestrator.addNewTx(tx);
}
await context.orchestrator.setBlockCompleted(block.number);
}),
);
Expand All @@ -59,5 +61,32 @@ describe('prover/orchestrator/multi-block', () => {
expect(epoch.publicInputs.endBlockNumber.toNumber()).toEqual(numBlocks);
expect(epoch.proof).toBeDefined();
});

it('builds two consecutive epochs', async () => {
const numEpochs = 2;
const numBlocks = 4;
const txCount = 2;
logger.info(`Seeding world state with ${numBlocks * numEpochs} blocks`);
const blocks = await timesAsync(numBlocks * numEpochs, i => context.makePendingBlock(txCount, 0, i + 1));

for (let epochIndex = 0; epochIndex < numEpochs; epochIndex++) {
logger.info(`Starting epoch ${epochIndex + 1} with ${numBlocks} blocks`);
context.orchestrator.startNewEpoch(epochIndex + 1, epochIndex * numBlocks + 1, numBlocks);
await Promise.all(
blocks.slice(epochIndex * numBlocks, (epochIndex + 1) * numBlocks).map(async ({ block, txs }) => {
await context.orchestrator.startNewBlock(Math.max(txCount, 2), block.header.globalVariables, []);
for (const tx of txs) {
await context.orchestrator.addNewTx(tx);
}
await context.orchestrator.setBlockCompleted(block.number);
}),
);

logger.info('Finalising epoch');
const epoch = await context.orchestrator.finaliseEpoch();
expect(epoch.publicInputs.endBlockNumber.toNumber()).toEqual(numBlocks + epochIndex * numBlocks);
expect(epoch.proof).toBeDefined();
}
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ describe('prover/orchestrator/public-functions', () => {
const [processed, _] = await context.processPublicFunctions([tx], 1, undefined);

// This will need to be a 2 tx block
context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(2, context.globalVariables, []);

for (const processedTx of processed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('prover/orchestrator/blocks', () => {

describe('blocks', () => {
it('builds an empty L2 block', async () => {
context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(2, context.globalVariables, []);

const block = await context.orchestrator.setBlockCompleted(context.blockNumber);
Expand All @@ -34,7 +34,7 @@ describe('prover/orchestrator/blocks', () => {
const txs = [context.makeProcessedTx(1)];

// This will need to be a 2 tx block
context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(2, context.globalVariables, []);

for (const tx of txs) {
Expand All @@ -51,7 +51,7 @@ describe('prover/orchestrator/blocks', () => {

const l1ToL2Messages = range(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP, 1 + 0x400).map(fr);

context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);
await context.orchestrator.startNewBlock(txs.length, context.globalVariables, l1ToL2Messages);

for (const tx of txs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ describe('prover/orchestrator', () => {
}
});

orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(2, globalVariables, [message]);

await sleep(10);
Expand Down Expand Up @@ -104,7 +104,7 @@ describe('prover/orchestrator', () => {
});

it('waits for block to be completed before enqueueing block root proof', async () => {
orchestrator.startNewEpoch(1, 1);
orchestrator.startNewEpoch(1, 1, 1);
await orchestrator.startNewBlock(2, globalVariables, []);
await orchestrator.addNewTx(context.makeProcessedTx(1));
await orchestrator.addNewTx(context.makeProcessedTx(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export class ProvingAgent {
) => {
if (err) {
const retry = err.name === ProvingError.NAME ? (err as ProvingError).retry : false;
this.log.info(`Job id=${jobId} type=${ProvingRequestType[type]} failed err=${err.message} retry=${retry}`);
this.log.error(`Job id=${jobId} type=${ProvingRequestType[type]} failed err=${err.message} retry=${retry}`, err);
return this.broker.reportProvingJobError(jobId, err.message, retry);
} else if (result) {
const outputUri = await this.proofStore.saveProofOutput(jobId, type, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('prover/bb_prover/full-rollup', () => {
log.info(`Proving epoch with ${blockCount}/${totalBlocks} blocks with ${nonEmptyTxs}/${totalTxs} non-empty txs`);

const initialHeader = context.getHeader(0);
context.orchestrator.startNewEpoch(1, totalBlocks);
context.orchestrator.startNewEpoch(1, 1, totalBlocks);

for (let blockNum = 1; blockNum <= blockCount; blockNum++) {
const globals = makeGlobals(blockNum);
Expand Down Expand Up @@ -102,7 +102,7 @@ describe('prover/bb_prover/full-rollup', () => {
Fr.random,
);

context.orchestrator.startNewEpoch(1, 1);
context.orchestrator.startNewEpoch(1, 1, 1);

await context.orchestrator.startNewBlock(numTransactions, context.globalVariables, l1ToL2Messages);

Expand Down
5 changes: 3 additions & 2 deletions yarn-project/prover-node/src/job/epoch-proving-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,16 @@ export class EpochProvingJob {
public async run() {
const epochNumber = Number(this.epochNumber);
const epochSize = this.blocks.length;
this.log.info(`Starting epoch proving job`, { epochSize, epochNumber, uuid: this.uuid });
const firstBlockNumber = this.blocks[0].number;
this.log.info(`Starting epoch proving job`, { firstBlockNumber, epochSize, epochNumber, uuid: this.uuid });
this.state = 'processing';
const timer = new Timer();

const { promise, resolve } = promiseWithResolvers<void>();
this.runPromise = promise;

try {
this.prover.startNewEpoch(epochNumber, epochSize);
this.prover.startNewEpoch(epochNumber, firstBlockNumber, epochSize);

await asyncPool(this.config.parallelBlockLimit, this.blocks, async block => {
const globalVariables = block.header.globalVariables;
Expand Down

0 comments on commit 9106102

Please sign in to comment.