Skip to content

Commit

Permalink
Avoid creating a separate job for prefetching blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
prathamesh0 committed Oct 12, 2022
1 parent 5c2f89d commit 2bb78e8
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 22 deletions.
40 changes: 19 additions & 21 deletions packages/util/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import debug from 'debug';
import assert from 'assert';
import { DeepPartial } from 'typeorm';

import { QUEUE_BLOCK_PROCESSING, JOB_KIND_PRUNE, JOB_KIND_INDEX, JOB_KIND_FETCH_BLOCKS, UNKNOWN_EVENT_NAME } from './constants';
import { QUEUE_BLOCK_PROCESSING, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, IndexerInterface, EventInterface } from './types';
import { wait } from './misc';
Expand Down Expand Up @@ -56,24 +56,26 @@ export const processBlockByNumber = async (
await jobQueue.pushJob(
QUEUE_BLOCK_PROCESSING,
{
kind: JOB_KIND_FETCH_BLOCKS,
kind: JOB_KIND_INDEX,
blockNumber
}
);
};

export const fetchBlocks = async (
blockNumber: number,
job: any,
indexer: IndexerInterface,
jobQueue: JobQueue,
jobQueueConfig: JobQueueConfig,
prefetchedBlocksMap: Map<string, PrefetchedBlock>
): Promise<void> => {
): Promise<DeepPartial<BlockProgressInterface>[]> => {
const { blockNumber } = job.data;
let blocks = [];

console.time('time:common#fetchBlocks-getSyncStatus');
const syncStatus = await indexer.getSyncStatus();
console.timeEnd('time:common#fetchBlocks-getSyncStatus');
// Skip blocks already pushed to job queue. They are already retried after fail.
if (syncStatus && syncStatus.chainHeadBlockNumber >= Number(blockNumber)) {
return [];
}

// Check for blocks in cache if prefetchBlocksInMem flag set.
if (jobQueueConfig.prefetchBlocksInMem) {
Expand Down Expand Up @@ -113,26 +115,22 @@ export const fetchBlocks = async (
}
}

const blocksToBeIndexed: DeepPartial<BlockProgressInterface>[] = [];
for (const block of blocks) {
const { cid, blockHash, blockNumber, parentHash, timestamp } = block;

// Stop blocks already pushed to job queue. They are already retried after fail.
if (!syncStatus || syncStatus.chainHeadBlockNumber < Number(blockNumber)) {
await jobQueue.pushJob(
QUEUE_BLOCK_PROCESSING,
{
kind: JOB_KIND_INDEX,
blockNumber: Number(blockNumber),
cid,
blockHash,
parentHash,
timestamp
}
);
}
blocksToBeIndexed.push({
blockNumber: Number(blockNumber),
cid,
blockHash,
parentHash,
blockTimestamp: timestamp
});
}

await indexer.updateSyncStatusChainHead(blocks[0].blockHash, blocks[0].blockNumber);

return blocksToBeIndexed;
};

export const _prefetchBlocks = async (
Expand Down
1 change: 0 additions & 1 deletion packages/util/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ export const QUEUE_IPFS = 'ipfs';

export const JOB_KIND_INDEX = 'index';
export const JOB_KIND_PRUNE = 'prune';
export const JOB_KIND_FETCH_BLOCKS = 'fetch-blocks';

export const JOB_KIND_EVENTS = 'events';
export const JOB_KIND_CONTRACT = 'contract';
Expand Down

0 comments on commit 2bb78e8

Please sign in to comment.