From 2bb78e8a450e41979e2b8cafcff1aa4917db91c3 Mon Sep 17 00:00:00 2001 From: prathamesh0 Date: Wed, 12 Oct 2022 18:19:02 +0530 Subject: [PATCH] Avoid creating a separate job for prefetching blocks --- packages/util/src/common.ts | 40 ++++++++++++++++------------------ packages/util/src/constants.ts | 1 - 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 684b332bb..f0ff6792c 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -2,7 +2,7 @@ import debug from 'debug'; import assert from 'assert'; import { DeepPartial } from 'typeorm'; -import { QUEUE_BLOCK_PROCESSING, JOB_KIND_PRUNE, JOB_KIND_INDEX, JOB_KIND_FETCH_BLOCKS, UNKNOWN_EVENT_NAME } from './constants'; +import { QUEUE_BLOCK_PROCESSING, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants'; import { JobQueue } from './job-queue'; import { BlockProgressInterface, IndexerInterface, EventInterface } from './types'; import { wait } from './misc'; @@ -56,24 +56,26 @@ export const processBlockByNumber = async ( await jobQueue.pushJob( QUEUE_BLOCK_PROCESSING, { - kind: JOB_KIND_FETCH_BLOCKS, + kind: JOB_KIND_INDEX, blockNumber } ); }; export const fetchBlocks = async ( - blockNumber: number, + job: any, indexer: IndexerInterface, - jobQueue: JobQueue, jobQueueConfig: JobQueueConfig, prefetchedBlocksMap: Map -): Promise => { +): Promise[]> => { + const { blockNumber } = job.data; let blocks = []; - console.time('time:common#fetchBlocks-getSyncStatus'); const syncStatus = await indexer.getSyncStatus(); - console.timeEnd('time:common#fetchBlocks-getSyncStatus'); + // Skip blocks already pushed to job queue. They are already retried after fail. + if (syncStatus && syncStatus.chainHeadBlockNumber >= Number(blockNumber)) { + return []; + } // Check for blocks in cache if prefetchBlocksInMem flag set. if (jobQueueConfig.prefetchBlocksInMem) { @@ -113,26 +115,22 @@ export const fetchBlocks = async ( } } + const blocksToBeIndexed: DeepPartial[] = []; for (const block of blocks) { const { cid, blockHash, blockNumber, parentHash, timestamp } = block; - // Stop blocks already pushed to job queue. They are already retried after fail. - if (!syncStatus || syncStatus.chainHeadBlockNumber < Number(blockNumber)) { - await jobQueue.pushJob( - QUEUE_BLOCK_PROCESSING, - { - kind: JOB_KIND_INDEX, - blockNumber: Number(blockNumber), - cid, - blockHash, - parentHash, - timestamp - } - ); - } + blocksToBeIndexed.push({ + blockNumber: Number(blockNumber), + cid, + blockHash, + parentHash, + blockTimestamp: timestamp + }); } await indexer.updateSyncStatusChainHead(blocks[0].blockHash, blocks[0].blockNumber); + + return blocksToBeIndexed; }; export const _prefetchBlocks = async ( diff --git a/packages/util/src/constants.ts b/packages/util/src/constants.ts index c8c2126c1..f4479d7fa 100644 --- a/packages/util/src/constants.ts +++ b/packages/util/src/constants.ts @@ -13,7 +13,6 @@ export const QUEUE_IPFS = 'ipfs'; export const JOB_KIND_INDEX = 'index'; export const JOB_KIND_PRUNE = 'prune'; -export const JOB_KIND_FETCH_BLOCKS = 'fetch-blocks'; export const JOB_KIND_EVENTS = 'events'; export const JOB_KIND_CONTRACT = 'contract';