Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset watcher to previous indexed block on start #377

Merged
merged 6 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ This project uses [yarn workspaces](https://classic.yarnpkg.com/en/docs/workspac
cd packages/util && yarn link && cd ../..
cd packages/ipld-eth-client && yarn link && cd ../..
cd packages/solidity-mapper && yarn link && cd ../..
cd packages/graph-node && yarn link && cd ../..
# Workaround for typeorm dependency issue when using yarn link
cd node_modules/typeorm && yarn link && cd ../..
```
Expand All @@ -32,6 +33,7 @@ This project uses [yarn workspaces](https://classic.yarnpkg.com/en/docs/workspac
yarn link "@cerc-io/util"
yarn link "@cerc-io/ipld-eth-client"
yarn link "@cerc-io/solidity-mapper"
yarn link "@cerc-io/graph-node"
yarn link "typeorm"
```

Expand Down
34 changes: 1 addition & 33 deletions packages/erc20-watcher/src/cli/reset-cmds/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,6 @@ export const handler = async (argv: any): Promise<void> => {
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;

const dbTx = await db.createTransactionRunner();

try {
const removeEntitiesPromise = [BlockProgress, Allowance, Balance].map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});

await Promise.all(removeEntitiesPromise);
await db.removeEntities(dbTx, Contract, { startingBlock: MoreThan(argv.blockNumber) });

if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}

if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}

await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);

dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}

await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
};
16 changes: 16 additions & 0 deletions packages/erc20-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getStateSyncStatus(repo);
}

async updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
const repo = queryRunner.manager.getRepository(StateSyncStatus);

return this._baseDatabase.updateStateSyncStatusIndexedBlock(repo, blockNumber, force);
}

async updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
const repo = queryRunner.manager.getRepository(StateSyncStatus);

return this._baseDatabase.updateStateSyncStatusCheckpointBlock(repo, blockNumber, force);
}

async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise<Balance | undefined> {
return this._conn.getRepository(Balance)
.createQueryBuilder('balance')
Expand Down Expand Up @@ -285,6 +297,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}

async deleteEntitiesByConditions<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions<Entity>): Promise<void> {
await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions);
}

async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
}
Expand Down
7 changes: 7 additions & 0 deletions packages/erc20-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import { BlockProgress } from './entity/BlockProgress';
import { Contract } from './entity/Contract';
import { State } from './entity/State';
import { StateSyncStatus } from './entity/StateSyncStatus';
import { Allowance } from './entity/Allowance';
import { Balance } from './entity/Balance';

const log = debug('vulcanize:indexer');

Expand Down Expand Up @@ -499,6 +501,11 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}

async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [Allowance, Balance];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}

async _saveBlockAndFetchEvents ({
id,
cid: blockCid,
Expand Down
2 changes: 2 additions & 0 deletions packages/erc20-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ export class JobRunner {
}

async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
}
Expand Down
2 changes: 2 additions & 0 deletions packages/erc20-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ export const main = async (): Promise<any> => {

if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

Expand Down
2 changes: 2 additions & 0 deletions packages/uni-info-watcher/environments/test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@
jobDelayInMilliSecs = 1000
eventsInBatch = 50
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10
68 changes: 1 addition & 67 deletions packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,72 +72,6 @@ export const handler = async (argv: any): Promise<void> => {

const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue);

const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
const [blockProgress] = blockProgresses;

const dbTx = await db.createTransactionRunner();

try {
const removeEntitiesPromise = [
BlockProgress,
Factory,
Bundle,
Pool,
Mint,
Burn,
Swap,
PositionSnapshot,
Position,
Token,
PoolDayData,
PoolHourData,
Tick,
TickDayData,
TokenDayData,
TokenHourData,
Transaction,
UniswapDayData
].map(async entityClass => {
return db.deleteEntitiesByConditions<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});

await Promise.all(removeEntitiesPromise);
await db.deleteEntitiesByConditions(dbTx, Contract, { startingBlock: MoreThan(argv.blockNumber) });

if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}

if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
}

const stateSyncStatus = await indexer.getStateSyncStatus();

if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true);
}

if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) {
await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true);
}
}

await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true);

dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}

await indexer.resetWatcherToBlock(argv.blockNumber);
log('Reset watcher successfully');
};
6 changes: 3 additions & 3 deletions packages/uni-info-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ export interface CachedEntities {
const ENTITY_QUERY_TYPE_MAP = new Map<new() => any, ENTITY_QUERY_TYPE>([
[Bundle, ENTITY_QUERY_TYPE.SINGULAR],
[Factory, ENTITY_QUERY_TYPE.SINGULAR],
[Pool, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Token, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Pool, ENTITY_QUERY_TYPE.GROUP_BY],
[Token, ENTITY_QUERY_TYPE.GROUP_BY],
[Burn, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Mint, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Swap, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Transaction, ENTITY_QUERY_TYPE.DISTINCT_ON],
[Transaction, ENTITY_QUERY_TYPE.UNIQUE],
[TokenDayData, ENTITY_QUERY_TYPE.DISTINCT_ON],
[TokenHourData, ENTITY_QUERY_TYPE.DISTINCT_ON],
[PoolDayData, ENTITY_QUERY_TYPE.DISTINCT_ON],
Expand Down
58 changes: 42 additions & 16 deletions packages/uni-info-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,34 @@ import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper';

import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing';
import { updatePoolDayData, updatePoolHourData, updateTickDayData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates';
import { Token } from './entity/Token';
import { convertTokenToDecimal, loadFactory, loadTransaction, safeDiv, Block } from './utils';
import { createTick, feeTierToTickSpacing } from './utils/tick';
import { ADDRESS_ZERO, FACTORY_ADDRESS, WATCHED_CONTRACTS } from './utils/constants';
import { Position } from './entity/Position';
import { ADDRESS_ZERO, FACTORY_ADDRESS, FIRST_GRAFT_BLOCK, WATCHED_CONTRACTS } from './utils/constants';
import { Database, DEFAULT_LIMIT } from './database';
import { Event } from './entity/Event';
import { ResultEvent, Transaction, PoolCreatedEvent, InitializeEvent, MintEvent, BurnEvent, SwapEvent, IncreaseLiquidityEvent, DecreaseLiquidityEvent, CollectEvent, TransferEvent, FlashEvent } from './events';
import { Factory } from './entity/Factory';
import { Token } from './entity/Token';
import { Bundle } from './entity/Bundle';
import { Pool } from './entity/Pool';
import { Mint } from './entity/Mint';
import { Burn } from './entity/Burn';
import { Swap } from './entity/Swap';
import { Position } from './entity/Position';
import { PositionSnapshot } from './entity/PositionSnapshot';
import { Tick } from './entity/Tick';
import { PoolDayData } from './entity/PoolDayData';
import { PoolHourData } from './entity/PoolHourData';
import { UniswapDayData } from './entity/UniswapDayData';
import { TokenDayData } from './entity/TokenDayData';
import { TokenHourData } from './entity/TokenHourData';
import { TickDayData } from './entity/TickDayData';
import { Collect } from './entity/Collect';
import { Flash } from './entity/Flash';
import { TickHourData } from './entity/TickHourData';
import { Transaction as TransactionEntity } from './entity/Transaction';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { Tick } from './entity/Tick';
import { Contract, KIND_POOL } from './entity/Contract';
import { State } from './entity/State';
import { StateSyncStatus } from './entity/StateSyncStatus';
Expand Down Expand Up @@ -587,6 +597,11 @@ export class Indexer implements IndexerInterface {
this._subgraphStateMap.set(contractAddress, updatedData);
}

async resetWatcherToBlock (blockNumber: number): Promise<void> {
const entities = [Factory, Token, Bundle, Pool, Mint, Burn, Swap, Position, PositionSnapshot, Tick, PoolDayData, PoolHourData, UniswapDayData, TokenDayData, TokenHourData, TickDayData, Collect, Flash, TickHourData, TransactionEntity];
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}

async _saveBlockAndFetchEvents ({
id,
cid: blockCid,
Expand Down Expand Up @@ -813,6 +828,13 @@ export class Indexer implements IndexerInterface {
pool.createdAtBlockNumber = BigInt(block.number);
pool = await this._db.savePool(dbTx, pool, block);

if (block.number >= 13450924) {
// Temp workaround to fix mismatch of Factory totalVolumeUSD and totalFeesUSD values with hosted subgraph endpoint
if (!WHITELIST_TOKENS.includes('0x4dd28568d05f09b02220b09c2cb307bfd837cb95')) {
WHITELIST_TOKENS.push('0x4dd28568d05f09b02220b09c2cb307bfd837cb95');
}
}

// Update white listed pools.
if (WHITELIST_TOKENS.includes(token0.id) || this._isDemo) {
token1.whitelistPools.push(pool.id);
Expand Down Expand Up @@ -1462,12 +1484,16 @@ export class Indexer implements IndexerInterface {
poolDayData.pool = pool.id;
await this._db.savePoolDayData(dbTx, poolDayData, block);

if (block.number > FIRST_GRAFT_BLOCK) {
await this._db.savePoolHourData(dbTx, poolHourData, block);
}

// Update inner vars of current or crossed ticks.
const newTick = pool.tick;
// Check that the tick value is not null (can be zero).
assert(newTick !== null);

const tickSpacing = feeTierToTickSpacing(pool.feeTier);
const tickSpacing = feeTierToTickSpacing(pool.feeTier, block);
const modulo = newTick % tickSpacing;

if (modulo === BigInt(0)) {
Expand Down Expand Up @@ -1625,7 +1651,6 @@ export class Indexer implements IndexerInterface {
position.withdrawnToken1 = position.withdrawnToken1.plus(amount1);

await this._db.savePosition(dbTx, position, block);

await this._savePositionSnapshot(dbTx, position, block, tx);
await dbTx.commitTransaction();
} catch (error) {
Expand Down Expand Up @@ -1653,21 +1678,22 @@ export class Indexer implements IndexerInterface {
const dbTx = await this._db.createTransactionRunner();

try {
const [token0, token1] = await Promise.all([
this._db.getToken(dbTx, { id: position.token0, blockHash: block.hash }),
this._db.getToken(dbTx, { id: position.token1, blockHash: block.hash })
]);
if (block.number <= FIRST_GRAFT_BLOCK) {
const [token0, token1] = await Promise.all([
this._db.getToken(dbTx, { id: position.token0, blockHash: block.hash }),
this._db.getToken(dbTx, { id: position.token1, blockHash: block.hash })
]);

assert(token0 && token1);
assert(token0 && token1);

const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals));
const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals));
const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals));
const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals));

position.collectedFeesToken0 = position.collectedFeesToken0.plus(amount0);
position.collectedFeesToken1 = position.collectedFeesToken1.plus(amount1);
position.collectedFeesToken0 = position.collectedFeesToken0.plus(amount0);
position.collectedFeesToken1 = position.collectedFeesToken1.plus(amount1);
}

await this._db.savePosition(dbTx, position, block);

await this._savePositionSnapshot(dbTx, position, block, tx);
await dbTx.commitTransaction();
} catch (error) {
Expand Down
2 changes: 2 additions & 0 deletions packages/uni-info-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ export class JobRunner {
}

async start (): Promise<void> {
await this._jobQueue.deleteAllJobs();
await this._baseJobRunner.resetToPrevIndexedBlock();
await this.subscribeBlockProcessingQueue();
await this.subscribeEventProcessingQueue();
}
Expand Down
2 changes: 2 additions & 0 deletions packages/uni-info-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ export const main = async (): Promise<any> => {

const pubSub = new PubSub();
const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubSub, jobQueue);
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();

const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);
Expand Down
2 changes: 2 additions & 0 deletions packages/uni-info-watcher/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export const FACTORY_ADDRESS = '0x1F98431c8aD98523631AE4a59f267346ea31F984';
export const NFPM_ADDRESS = '0xC36442b4a4522E871399CD717aBDD847Ab11FE88';
export const BUNDLE_ID = '1';

export const FIRST_GRAFT_BLOCK = 13591197;

export const WATCHED_CONTRACTS = [
{
kind: KIND_FACTORY,
Expand Down
Loading