Skip to content

Commit

Permalink
Move job handlers for state creation to util
Browse files Browse the repository at this point in the history
  • Loading branch information
prathamesh0 committed Oct 17, 2022
1 parent a6e4954 commit c8b0539
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 456 deletions.
107 changes: 2 additions & 105 deletions packages/eden-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS,
JOB_KIND_PRUNE,
JobQueueConfig,
DEFAULT_CONFIG_PATH,
initClients,
Expand Down Expand Up @@ -55,15 +54,6 @@ export class JobRunner {
async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseJobRunner.processBlock(job);

const { data: { kind } } = job;

// If it's a pruning job: Create a hooks job.
if (kind === JOB_KIND_PRUNE) {
await this.createHooksJob();
}

await this._jobQueue.markComplete(job);
});
}

Expand All @@ -75,108 +65,15 @@ export class JobRunner {

async subscribeHooksQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
const { data: { blockHash, blockNumber } } = job;

// Get the current IPLD Status.
const ipldStatus = await this._indexer.getIPLDStatus();

if (ipldStatus) {
if (ipldStatus.latestHooksBlockNumber < (blockNumber - 1)) {
// Create hooks job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await this.createHooksJob(parentBlock.blockHash, parentBlock.blockNumber);

const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);

throw new Error(message);
}

if (ipldStatus.latestHooksBlockNumber > (blockNumber - 1)) {
log(`Hooks for blockNumber ${blockNumber} already processed`);

return;
}
}

// Process the hooks for the given block number.
await this._indexer.processCanonicalBlock(blockHash, blockNumber);

// Update the IPLD status.
await this._indexer.updateIPLDStatusHooksBlock(blockNumber);

// Create a checkpoint job after completion of a hook job.
await this.createCheckpointJob(blockHash, blockNumber);

await this._jobQueue.markComplete(job);
await this._baseJobRunner.processHooks(job);
});
}

async subscribeBlockCheckpointQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
const { data: { blockHash, blockNumber } } = job;

// Get the current IPLD Status.
const ipldStatus = await this._indexer.getIPLDStatus();
assert(ipldStatus);

if (ipldStatus.latestCheckpointBlockNumber >= 0) {
if (ipldStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
// Create a checkpoint job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await this.createCheckpointJob(parentBlock.blockHash, parentBlock.blockNumber);

const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);

throw new Error(message);
}

if (ipldStatus.latestCheckpointBlockNumber > (blockNumber - 1)) {
log(`Checkpoints for blockNumber ${blockNumber} already processed`);

return;
}
}

// Process checkpoints for the given block.
await this._indexer.processCheckpoint(blockHash);

// Update the IPLD status.
await this._indexer.updateIPLDStatusCheckpointBlock(blockNumber);

await this._jobQueue.markComplete(job);
await this._baseJobRunner.processCheckpoint(job);
});
}

async createHooksJob (blockHash?: string, blockNumber?: number): Promise<void> {
if (!blockNumber || !blockHash) {
// Get the latest canonical block
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();

// Create a hooks job for parent block of latestCanonicalBlock because pruning for first block is skipped as it is assumed to be a canonical block.
blockHash = latestCanonicalBlock.parentHash;
blockNumber = latestCanonicalBlock.blockNumber - 1;
}

await this._jobQueue.pushJob(
QUEUE_HOOKS,
{
blockHash,
blockNumber
}
);
}

async createCheckpointJob (blockHash: string, blockNumber: number): Promise<void> {
await this._jobQueue.pushJob(
QUEUE_BLOCK_CHECKPOINT,
{
blockHash,
blockNumber
}
);
}
}

export const main = async (): Promise<any> => {
Expand Down
33 changes: 33 additions & 0 deletions packages/erc20-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { Database } from './database';
import { Event } from './entity/Event';
import { fetchTokenDecimals, fetchTokenName, fetchTokenSymbol, fetchTokenTotalSupply } from './utils';
import { SyncStatus } from './entity/SyncStatus';
import { IpldStatus } from './entity/IpldStatus';
import artifacts from './artifacts/ERC20.json';
import { BlockProgress } from './entity/BlockProgress';
import { Contract } from './entity/Contract';
Expand Down Expand Up @@ -249,6 +250,14 @@ export class Indexer implements IndexerInterface {
);
}

async processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void> {
// TODO Implement
}

async processCheckpoint (blockHash: string): Promise<void> {
// TODO Implement
}

getIPLDData (ipldBlock: IPLDBlock): any {
return this._baseIndexer.getIPLDData(ipldBlock);
}
Expand Down Expand Up @@ -297,6 +306,30 @@ export class Indexer implements IndexerInterface {
return { eventName, eventInfo };
}

async getIPLDStatus (): Promise<IpldStatus | undefined> {
return this._db.getIPLDStatus();
}

async updateIPLDStatusHooksBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
// TODO Implement
return {} as IpldStatus;
}

async updateIPLDStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<IpldStatus> {
// TODO Implement
return {} as IpldStatus;
}

async getLatestCanonicalBlock (): Promise<BlockProgress> {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);

const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
assert(latestCanonicalBlock);

return latestCanonicalBlock;
}

async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
Expand Down
107 changes: 2 additions & 105 deletions packages/erc721-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
QUEUE_EVENT_PROCESSING,
QUEUE_BLOCK_CHECKPOINT,
QUEUE_HOOKS,
JOB_KIND_PRUNE,
JobQueueConfig,
DEFAULT_CONFIG_PATH,
initClients,
Expand Down Expand Up @@ -53,15 +52,6 @@ export class JobRunner {
async subscribeBlockProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseJobRunner.processBlock(job);

const { data: { kind } } = job;

// If it's a pruning job: Create a hooks job.
if (kind === JOB_KIND_PRUNE) {
await this.createHooksJob();
}

await this._jobQueue.markComplete(job);
});
}

Expand All @@ -73,108 +63,15 @@ export class JobRunner {

async subscribeHooksQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_HOOKS, async (job) => {
const { data: { blockHash, blockNumber } } = job;

// Get the current IPLD Status.
const ipldStatus = await this._indexer.getIPLDStatus();

if (ipldStatus) {
if (ipldStatus.latestHooksBlockNumber < (blockNumber - 1)) {
// Create hooks job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await this.createHooksJob(parentBlock.blockHash, parentBlock.blockNumber);

const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);

throw new Error(message);
}

if (ipldStatus.latestHooksBlockNumber > (blockNumber - 1)) {
log(`Hooks for blockNumber ${blockNumber} already processed`);

return;
}
}

// Process the hooks for the given block number.
await this._indexer.processCanonicalBlock(blockHash);

// Update the IPLD status.
await this._indexer.updateIPLDStatusHooksBlock(blockNumber);

// Create a checkpoint job after completion of a hook job.
await this.createCheckpointJob(blockHash, blockNumber);

await this._jobQueue.markComplete(job);
await this._baseJobRunner.processHooks(job);
});
}

async subscribeBlockCheckpointQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_BLOCK_CHECKPOINT, async (job) => {
const { data: { blockHash, blockNumber } } = job;

// Get the current IPLD Status.
const ipldStatus = await this._indexer.getIPLDStatus();
assert(ipldStatus);

if (ipldStatus.latestCheckpointBlockNumber >= 0) {
if (ipldStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
// Create a checkpoint job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await this.createCheckpointJob(parentBlock.blockHash, parentBlock.blockNumber);

const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);

throw new Error(message);
}

if (ipldStatus.latestCheckpointBlockNumber > (blockNumber - 1)) {
log(`Checkpoints for blockNumber ${blockNumber} already processed`);

return;
}
}

// Process checkpoints for the given block.
await this._indexer.processCheckpoint(blockHash);

// Update the IPLD status.
await this._indexer.updateIPLDStatusCheckpointBlock(blockNumber);

await this._jobQueue.markComplete(job);
await this._baseJobRunner.processCheckpoint(job);
});
}

async createHooksJob (blockHash?: string, blockNumber?: number): Promise<void> {
if (!blockNumber || !blockHash) {
// Get the latest canonical block
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();

// Create a hooks job for parent block of latestCanonicalBlock because pruning for first block is skipped as it is assumed to be a canonical block.
blockHash = latestCanonicalBlock.parentHash;
blockNumber = latestCanonicalBlock.blockNumber - 1;
}

await this._jobQueue.pushJob(
QUEUE_HOOKS,
{
blockHash,
blockNumber
}
);
}

async createCheckpointJob (blockHash: string, blockNumber: number): Promise<void> {
await this._jobQueue.pushJob(
QUEUE_BLOCK_CHECKPOINT,
{
blockHash,
blockNumber
}
);
}
}

export const main = async (): Promise<any> => {
Expand Down
Loading

0 comments on commit c8b0539

Please sign in to comment.