Skip to content

Commit

Permalink
Use reset CLIs from cli package (#403)
Browse files Browse the repository at this point in the history
* Use reset CLIs from cli package

* Wait on latest entities being reset
  • Loading branch information
prathamesh0 authored Nov 21, 2022
1 parent ee389ab commit 471bb05
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 99 deletions.
47 changes: 4 additions & 43 deletions packages/uni-info-watcher/src/cli/reset-cmds/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@
// Copyright 2022 Vulcanize, Inc.
//

import debug from 'debug';

import { getConfig } from '@cerc-io/util';
import { Config } from '@vulcanize/util';
import { ResetStateCmd } from '@cerc-io/cli';

import { Database } from '../../database';

const log = debug('vulcanize:reset-state');

export const command = 'state';

export const desc = 'Reset State to a given block number';
Expand All @@ -22,42 +17,8 @@ export const builder = {
};

export const handler = async (argv: any): Promise<void> => {
const { blockNumber } = argv;
const config: Config = await getConfig(argv.configFile);

// Initialize database
const db = new Database(config.database, config.server);
await db.init();

// Create a DB transaction
const dbTx = await db.createTransactionRunner();

console.time('time:reset-state');
try {
// Delete all State entries after the given block
await db.removeStatesAfterBlock(dbTx, blockNumber);

// Reset the stateSyncStatus.
const stateSyncStatus = await db.getStateSyncStatus();

if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockNumber) {
await db.updateStateSyncStatusIndexedBlock(dbTx, blockNumber, true);
}

if (stateSyncStatus.latestCheckpointBlockNumber > blockNumber) {
await db.updateStateSyncStatusCheckpointBlock(dbTx, blockNumber, true);
}
}

dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
console.timeEnd('time:reset-state');
const resetStateCmd = new ResetStateCmd();
await resetStateCmd.init(argv, Database);

log(`Reset state successfully to block ${blockNumber}`);
await resetStateCmd.exec();
};
35 changes: 6 additions & 29 deletions packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,14 @@
// Copyright 2021 Vulcanize, Inc.
//

import debug from 'debug';
import assert from 'assert';

import { JobQueue, resetJobs, getConfig, initClients } from '@cerc-io/util';
import { Config } from '@vulcanize/util';
import { ResetWatcherCmd } from '@cerc-io/cli';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Config } from '@vulcanize/util';

import { Database } from '../../database';
import { Indexer } from '../../indexer';

const log = debug('vulcanize:reset-watcher');

export const command = 'watcher';

export const desc = 'Reset watcher to a block number';
Expand All @@ -26,15 +21,9 @@ export const builder = {
};

export const handler = async (argv: any): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { jobQueue: jobQueueConfig } = config;
const { ethClient, ethProvider } = await initClients(config);

// Initialize database.
const db = new Database(config.database, config.server);
await db.init();
const resetWatcherCmd = new ResetWatcherCmd();

const config: Config = await resetWatcherCmd.initConfig(argv.configFile);
const {
uniWatcher,
tokenWatcher
Expand All @@ -43,18 +32,6 @@ export const handler = async (argv: any): Promise<void> => {
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);

assert(jobQueueConfig, 'Missing job queue config');

const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(config.server, db, { uniClient, erc20Client, ethClient }, ethProvider, jobQueue);

await indexer.resetWatcherToBlock(argv.blockNumber);
await indexer.resetLatestEntities(argv.blockNumber);

log('Reset watcher successfully');
await resetWatcherCmd.init(argv, Database, Indexer, { uniClient, erc20Client });
await resetWatcherCmd.exec();
};
4 changes: 3 additions & 1 deletion packages/uni-info-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ export class Indexer implements IndexerInterface {
async resetLatestEntities (blockNumber: number): Promise<void> {
const dbTx = await this._db.createTransactionRunner();
try {
this._db.graphDatabase.resetLatestEntities(dbTx, blockNumber);
await this._db.graphDatabase.resetLatestEntities(dbTx, blockNumber);

dbTx.commitTransaction();
} catch (error) {
Expand Down Expand Up @@ -635,6 +635,8 @@ export class Indexer implements IndexerInterface {
async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [...ENTITIES, FrothyEntity];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);

await this.resetLatestEntities(blockNumber);
}

async _saveBlockAndFetchEvents ({
Expand Down
30 changes: 4 additions & 26 deletions packages/uni-watcher/src/cli/reset-cmds/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,11 @@
// Copyright 2021 Vulcanize, Inc.
//

import debug from 'debug';
import assert from 'assert';

import { JobQueue, resetJobs, getConfig, initClients } from '@cerc-io/util';
import { Config } from '@vulcanize/util';
import { ResetWatcherCmd } from '@cerc-io/cli';

import { Database } from '../../database';
import { Indexer } from '../../indexer';

const log = debug('vulcanize:reset-watcher');

export const command = 'watcher';

export const desc = 'Reset watcher to a block number';
Expand All @@ -24,24 +18,8 @@ export const builder = {
};

export const handler = async (argv: any): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
await resetJobs(config);
const { jobQueue: jobQueueConfig } = config;
const { ethClient, ethProvider } = await initClients(config);

// Initialize database.
const db = new Database(config.database);
await db.init();

assert(jobQueueConfig, 'Missing job queue config');

const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });

const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
const resetWatcherCmd = new ResetWatcherCmd();
await resetWatcherCmd.init(argv, Database, Indexer);

await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
await resetWatcherCmd.exec();
};

0 comments on commit 471bb05

Please sign in to comment.