Skip to content

Commit

Permalink
Reset watcher to previous indexed block on start (#377)
Browse files Browse the repository at this point in the history
* Get block from cache and save to db if not found

* Reset watcher to previous indexed block before start

* Add missing uniswap entities for reset watcher

* Config changes to run smoke tests

* Add step for linking watcher-ts/graph-node

* Implement changes at graft block height (#6)

* Implement changes at graft block

* Implement workarounds to fix mismatches

* Add unique query for transaction table

* Apply where condition for id in inner subquery

Co-authored-by: nabarun <[email protected]>
Co-authored-by: nikugogoi <[email protected]>
  • Loading branch information
3 people authored Nov 3, 2022
1 parent ae2f497 commit da397dd
Show file tree
Hide file tree
Showing 24 changed files with 211 additions and 195 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ This project uses [yarn workspaces](https://classic.yarnpkg.com/en/docs/workspac
cd packages/util && yarn link && cd ../..
cd packages/ipld-eth-client && yarn link && cd ../..
cd packages/solidity-mapper && yarn link && cd ../..
cd packages/graph-node && yarn link && cd ../..
# Workaround for typeorm dependency issue when using yarn link
cd node_modules/typeorm && yarn link && cd ../..
```
Expand All @@ -32,6 +33,7 @@ This project uses [yarn workspaces](https://classic.yarnpkg.com/en/docs/workspac
yarn link "@cerc-io/util"
yarn link "@cerc-io/ipld-eth-client"
yarn link "@cerc-io/solidity-mapper"
yarn link "@cerc-io/graph-node"
yarn link "typeorm"
```

Expand Down
34 changes: 1 addition & 33 deletions packages/erc20-watcher/src/cli/reset-cmds/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,6 @@ export const handler = async (argv: any): Promise<void> => {
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;

const dbTx = await db.createTransactionRunner();

try {
const removeEntitiesPromise = [BlockProgress, Allowance, Balance].map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});

await Promise.all(removeEntitiesPromise);
await db.removeEntities(dbTx, Contract, { startingBlock: MoreThan(argv.blockNumber) });

if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}

if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}

await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);

dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}

await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
};
16 changes: 16 additions & 0 deletions packages/erc20-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getStateSyncStatus(repo);
}

async updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
const repo = queryRunner.manager.getRepository(StateSyncStatus);

return this._baseDatabase.updateStateSyncStatusIndexedBlock(repo, blockNumber, force);
}

async updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
const repo = queryRunner.manager.getRepository(StateSyncStatus);

return this._baseDatabase.updateStateSyncStatusCheckpointBlock(repo, blockNumber, force);
}

async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise<Balance | undefined> {
return this._conn.getRepository(Balance)
.createQueryBuilder('balance')
Expand Down Expand Up @@ -285,6 +297,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}

async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> {
await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions);
}

async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
}
Expand Down
7 changes: 7 additions & 0 deletions packages/erc20-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import { BlockProgress } from './entity/BlockProgress';
import { Contract } from './entity/Contract';
import { State } from './entity/State';
import { StateSyncStatus } from './entity/StateSyncStatus';
import { Allowance } from './entity/Allowance';
import { Balance } from './entity/Balance';

const log = debug('vulcanize:indexer');

Expand Down Expand Up @@ -499,6 +501,11 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}

async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [Allowance, Balance];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}

async _saveBlockAndFetchEvents ({
id,
cid: blockCid,
Expand Down
2 changes: 2 additions & 0 deletions packages/erc20-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ export class JobRunner {
}

async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
}
Expand Down
2 changes: 2 additions & 0 deletions packages/erc20-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ export const main = async (): Promise<any> => {

if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

Expand Down
2 changes: 2 additions & 0 deletions packages/uni-info-watcher/environments/test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@
jobDelayInMilliSecs = 1000
eventsInBatch = 50
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10
68 changes: 1 addition & 67 deletions packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,72 +72,6 @@ export const handler = async (argv: any): Promise<void> => {

const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue);

const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;

const dbTx = await db.createTransactionRunner();

try {
const removeEntitiesPromise = [
BlockProgress,
Factory,
Bundle,
Pool,
Mint,
Burn,
Swap,
PositionSnapshot,
Position,
Token,
PoolDayData,
PoolHourData,
Tick,
TickDayData,
TokenDayData,
TokenHourData,
Transaction,
UniswapDayData
].map(async entityClass => {
return db.deleteEntitiesByConditions<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});

await Promise.all(removeEntitiesPromise);
await db.deleteEntitiesByConditions(dbTx, Contract, { startingBlock: MoreThan(argv.blockNumber) });

if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}

if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}

const stateSyncStatus = await indexer.getStateSyncStatus();

if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true);
}

if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true);
}
}

await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);

dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}

await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
};
6 changes: 3 additions & 3 deletions packages/uni-info-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ export interface CachedEntities {
const ENTITY_QUERY_TYPE_MAP = new Map<new() => any, ENTITY_QUERY_TYPE>([
[Bundle, ENTITY_QUERY_TYPE.SINGULAR],
[Factory, ENTITY_QUERY_TYPE.SINGULAR],
[Pool, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Token, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Pool, ENTITY_QUERY_TYPE.GROUP_BY],
[Token, ENTITY_QUERY_TYPE.GROUP_BY],
[Burn, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Mint, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Swap, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Transaction, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Transaction, ENTITY_QUERY_TYPE.UNIQUE],
[TokenDayData, ENTITY_QUERY_TYPE.DISTINCT_ON],
[TokenHourData, ENTITY_QUERY_TYPE.DISTINCT_ON],
[PoolDayData, ENTITY_QUERY_TYPE.DISTINCT_ON],
Expand Down
58 changes: 42 additions & 16 deletions packages/uni-info-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,34 @@ import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper';

import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing';
import { updatePoolDayData, updatePoolHourData, updateTickDayData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates';
import { Token } from './entity/Token';
import { convertTokenToDecimal, loadFactory, loadTransaction, safeDiv, Block } from './utils';
import { createTick, feeTierToTickSpacing } from './utils/tick';
import { ADDRESS_ZERO, FACTORY_ADDRESS, WATCHED_CONTRACTS } from './utils/constants';
import { Position } from './entity/Position';
import { ADDRESS_ZERO, FACTORY_ADDRESS, FIRST_GRAFT_BLOCK, WATCHED_CONTRACTS } from './utils/constants';
import { Database, DEFAULT_LIMIT } from './database';
import { Event } from './entity/Event';
import { ResultEvent, Transaction, PoolCreatedEvent, InitializeEvent, MintEvent, BurnEvent, SwapEvent, IncreaseLiquidityEvent, DecreaseLiquidityEvent, CollectEvent, TransferEvent, FlashEvent } from './events';
import { Factory } from './entity/Factory';
import { Token } from './entity/Token';
import { Bundle } from './entity/Bundle';
import { Pool } from './entity/Pool';
import { Mint } from './entity/Mint';
import { Burn } from './entity/Burn';
import { Swap } from './entity/Swap';
import { Position } from './entity/Position';
import { PositionSnapshot } from './entity/PositionSnapshot';
import { Tick } from './entity/Tick';
import { PoolDayData } from './entity/PoolDayData';
import { PoolHourData } from './entity/PoolHourData';
import { UniswapDayData } from './entity/UniswapDayData';
import { TokenDayData } from './entity/TokenDayData';
import { TokenHourData } from './entity/TokenHourData';
import { TickDayData } from './entity/TickDayData';
import { Collect } from './entity/Collect';
import { Flash } from './entity/Flash';
import { TickHourData } from './entity/TickHourData';
import { Transaction as TransactionEntity } from './entity/Transaction';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { Tick } from './entity/Tick';
import { Contract, KIND_POOL } from './entity/Contract';
import { State } from './entity/State';
import { StateSyncStatus } from './entity/StateSyncStatus';
Expand Down Expand Up @@ -587,6 +597,11 @@ export class Indexer implements IndexerInterface {
this._subgraphStateMap.set(contractAddress, updatedData);
}

async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [Factory, Token, Bundle, Pool, Mint, Burn, Swap, Position, PositionSnapshot, Tick, PoolDayData, PoolHourData, UniswapDayData, TokenDayData, TokenHourData, TickDayData, Collect, Flash, TickHourData, TransactionEntity];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}

async _saveBlockAndFetchEvents ({
id,
cid: blockCid,
Expand Down Expand Up @@ -813,6 +828,13 @@ export class Indexer implements IndexerInterface {
pool.createdAtBlockNumber = BigInt(block.number);
pool = await this._db.savePool(dbTx, pool, block);

if (block.number >= 13450924) {
// Temp workaround to fix mismatch of Factory totalVolumeUSD and totalFeesUSD values with hosted subgraph endpoint
if (!WHITELIST_TOKENS.includes('0x4dd28568d05f09b02220b09c2cb307bfd837cb95')) {
WHITELIST_TOKENS.push('0x4dd28568d05f09b02220b09c2cb307bfd837cb95');
}
}

// Update white listed pools.
if (WHITELIST_TOKENS.includes(token0.id) || this._isDemo) {
token1.whitelistPools.push(pool.id);
Expand Down Expand Up @@ -1462,12 +1484,16 @@ export class Indexer implements IndexerInterface {
poolDayData.pool = pool.id;
await this._db.savePoolDayData(dbTx, poolDayData, block);

if (block.number > FIRST_GRAFT_BLOCK) {
await this._db.savePoolHourData(dbTx, poolHourData, block);
}

// Update inner vars of current or crossed ticks.
const newTick = pool.tick;
// Check that the tick value is not null (can be zero).
assert(newTick !== null);

const tickSpacing = feeTierToTickSpacing(pool.feeTier);
const tickSpacing = feeTierToTickSpacing(pool.feeTier, block);
const modulo = newTick % tickSpacing;

if (modulo === BigInt(0)) {
Expand Down Expand Up @@ -1625,7 +1651,6 @@ export class Indexer implements IndexerInterface {
position.withdrawnToken1 = position.withdrawnToken1.plus(amount1);

await this._db.savePosition(dbTx, position, block);

await this._savePositionSnapshot(dbTx, position, block, tx);
await dbTx.commitTransaction();
} catch (error) {
Expand Down Expand Up @@ -1653,21 +1678,22 @@ export class Indexer implements IndexerInterface {
const dbTx = await this._db.createTransactionRunner();

try {
const [token0, token1] = await Promise.all([
this._db.getToken(dbTx, { id: position.token0, blockHash: block.hash }),
this._db.getToken(dbTx, { id: position.token1, blockHash: block.hash })
]);
if (block.number <= FIRST_GRAFT_BLOCK) {
const [token0, token1] = await Promise.all([
this._db.getToken(dbTx, { id: position.token0, blockHash: block.hash }),
this._db.getToken(dbTx, { id: position.token1, blockHash: block.hash })
]);

assert(token0 && token1);
assert(token0 && token1);

const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals));
const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals));
const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals));
const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals));

position.collectedFeesToken0 = position.collectedFeesToken0.plus(amount0);
position.collectedFeesToken1 = position.collectedFeesToken1.plus(amount1);
position.collectedFeesToken0 = position.collectedFeesToken0.plus(amount0);
position.collectedFeesToken1 = position.collectedFeesToken1.plus(amount1);
}

await this._db.savePosition(dbTx, position, block);

await this._savePositionSnapshot(dbTx, position, block, tx);
await dbTx.commitTransaction();
} catch (error) {
Expand Down
2 changes: 2 additions & 0 deletions packages/uni-info-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ export class JobRunner {
}

async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
}
Expand Down
2 changes: 2 additions & 0 deletions packages/uni-info-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ export const main = async (): Promise<any> => {

const pubSub = new PubSub();
const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubSub, jobQueue);
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();

const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);
Expand Down
2 changes: 2 additions & 0 deletions packages/uni-info-watcher/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export const FACTORY_ADDRESS = '0x1F98431c8aD98523631AE4a59f267346ea31F984';
export const NFPM_ADDRESS = '0xC36442b4a4522E871399CD717aBDD847Ab11FE88';
export const BUNDLE_ID = '1';

export const FIRST_GRAFT_BLOCK = 13591197;

export const WATCHED_CONTRACTS = [
{
kind: KIND_FACTORY,
Expand Down
Loading

0 comments on commit da397dd

Please sign in to comment.