Skip to content

Commit

Permalink
Merge pull request #398 from deep-stack/ng-fix-prefetchBlocksInMem
Browse files Browse the repository at this point in the history
Fix running watcher without prefetchBlocksInMem flag set to true
  • Loading branch information
prathamesh0 authored Nov 18, 2022
2 parents e7ca210 + bdd3a76 commit 84c3854
Showing 1 changed file with 29 additions and 17 deletions.
46 changes: 29 additions & 17 deletions packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import assert from 'assert';
import debug from 'debug';
import { In } from 'typeorm';
import { DeepPartial, In } from 'typeorm';

import {
JobQueueConfig,
Expand All @@ -21,7 +21,9 @@ import {
MAX_REORG_DEPTH,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME
UNKNOWN_EVENT_NAME,
wait,
BlockProgressInterface
} from '@cerc-io/util';

import { JobQueue } from './job-queue';
Expand All @@ -36,7 +38,7 @@ export class JobRunner {
_blockProcessStartTime?: Date
_blockNumEvents = 0

_prefetchedBlocksMap: Map<string, PrefetchedBlock> = new Map()
_blockAndEventsMap: Map<string, PrefetchedBlock> = new Map()

constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._jobQueueConfig = jobQueueConfig;
Expand All @@ -53,7 +55,7 @@ export class JobRunner {
job,
this._indexer,
this._jobQueueConfig,
this._prefetchedBlocksMap
this._blockAndEventsMap
);
const indexBlockPromises = blocksToBeIndexed.map(blockToBeIndexed => this._indexBlock(job, blockToBeIndexed));
await Promise.all(indexBlockPromises);
Expand Down Expand Up @@ -152,12 +154,15 @@ export class JobRunner {
console.timeEnd('time:job-runner#_pruneChain');
}

async _indexBlock (job: any, blockToBeIndexed: any): Promise<void> {
async _indexBlock (job: any, blockToBeIndexed: DeepPartial<BlockProgressInterface>): Promise<void> {
const syncStatus = await this._indexer.getSyncStatus();
assert(syncStatus);

const { data: { priority } } = job;
const { blockHash, blockNumber, parentHash } = blockToBeIndexed;
const { cid, blockHash, blockNumber, parentHash, blockTimestamp } = blockToBeIndexed;
assert(blockNumber);
assert(blockHash);
assert(parentHash);

const indexBlockStartTime = new Date();

Expand Down Expand Up @@ -258,14 +263,21 @@ export class JobRunner {
}

if (!blockProgress) {
const prefetchedBlock = this._prefetchedBlocksMap.get(blockHash);
const prefetchedBlock = this._blockAndEventsMap.get(blockHash);

if (!prefetchedBlock) {
const message = `BlockProgress entity not found found in db and prefetchedBlocksMap for ${blockNumber}`;
throw new Error(message);
} else {
if (prefetchedBlock) {
({ block: blockProgress } = prefetchedBlock);
blockProgress = await this._indexer.saveBlockProgress(blockProgress);
} else {
// Delay required to process block.
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
await wait(jobDelayInMilliSecs);
let events = [];

console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
[blockProgress, events] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp });
console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents');

this._blockAndEventsMap.set(blockHash, { block: blockProgress, events });
}
}

Expand All @@ -292,12 +304,12 @@ export class JobRunner {

console.time('time:job-runner#_processEvents-events');

if (!this._prefetchedBlocksMap.has(block.blockHash)) {
if (!this._blockAndEventsMap.has(block.blockHash)) {
const [, events] = await this._indexer.saveBlockAndFetchEvents(block);
this._prefetchedBlocksMap.set(block.blockHash, { block, events });
this._blockAndEventsMap.set(block.blockHash, { block, events });
}

const prefetchedBlock = this._prefetchedBlocksMap.get(block.blockHash);
const prefetchedBlock = this._blockAndEventsMap.get(block.blockHash);
assert(prefetchedBlock);

const { events } = prefetchedBlock;
Expand Down Expand Up @@ -415,8 +427,8 @@ export class JobRunner {
this._indexer.saveEvents(dbEvents.filter(event => event.eventName !== UNKNOWN_EVENT_NAME))
]);
console.timeEnd('time:job-runner#_processEvents-updateBlockProgress-saveEvents');
this._prefetchedBlocksMap.delete(block.blockHash);
log('size:job-runner#_processEvents-_prefetchedBlocksMap:', this._prefetchedBlocksMap.size);
this._blockAndEventsMap.delete(block.blockHash);
log('size:job-runner#_processEvents-_blockAndEventsMap:', this._blockAndEventsMap.size);

console.timeEnd('time:job-runner#_processEvents-events');
}
Expand Down

0 comments on commit 84c3854

Please sign in to comment.