From da397dd8e1127cddeaa9ec359f4d84d4130496fe Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Thu, 3 Nov 2022 03:32:18 -0500 Subject: [PATCH] Reset watcher to previous indexed block on start (#377) * Get block from cache and save to db if not found * Reset watcher to previous indexed block before start * Add missing uniswap entities for reset watcher * Config changes to run smoke tests * Add step for linking watcher-ts/graph-node * Implement changes at graft block height (#6) * Implement changes at graft block * Implement workarounds to fix mismatches * Add unique query for transaction table * Apply where condition for id in inner subquery Co-authored-by: nabarun Co-authored-by: nikugogoi <95nikass@gmail.com> --- README.md | 2 + .../src/cli/reset-cmds/watcher.ts | 34 +----- packages/erc20-watcher/src/database.ts | 16 +++ packages/erc20-watcher/src/indexer.ts | 7 ++ packages/erc20-watcher/src/job-runner.ts | 2 + packages/erc20-watcher/src/server.ts | 2 + .../uni-info-watcher/environments/test.toml | 2 + .../src/cli/reset-cmds/watcher.ts | 68 +----------- packages/uni-info-watcher/src/database.ts | 6 +- packages/uni-info-watcher/src/indexer.ts | 58 +++++++--- packages/uni-info-watcher/src/job-runner.ts | 2 + packages/uni-info-watcher/src/server.ts | 2 + .../uni-info-watcher/src/utils/constants.ts | 2 + .../uni-info-watcher/src/utils/pricing.ts | 101 ++++++++++++------ packages/uni-info-watcher/src/utils/tick.ts | 6 +- packages/uni-watcher/environments/test.toml | 2 + .../uni-watcher/src/cli/reset-cmds/watcher.ts | 33 +----- packages/uni-watcher/src/database.ts | 12 +++ packages/uni-watcher/src/indexer.ts | 4 + packages/uni-watcher/src/job-runner.ts | 2 + packages/uni-watcher/src/server.ts | 2 + packages/util/src/events.ts | 8 +- packages/util/src/job-runner.ts | 30 ++++-- packages/util/test/actions.ts | 3 +- 24 files changed, 211 insertions(+), 195 deletions(-) diff --git a/README.md b/README.md index c6fc65c0..7107667a 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ This project uses [yarn workspaces](https://classic.yarnpkg.com/en/docs/workspac cd packages/util && yarn link && cd ../.. cd packages/ipld-eth-client && yarn link && cd ../.. cd packages/solidity-mapper && yarn link && cd ../.. + cd packages/graph-node && yarn link && cd ../.. # Workaround for typeorm dependency issue when using yarn link cd node_modules/typeorm && yarn link && cd ../.. ``` @@ -32,6 +33,7 @@ This project uses [yarn workspaces](https://classic.yarnpkg.com/en/docs/workspac yarn link "@cerc-io/util" yarn link "@cerc-io/ipld-eth-client" yarn link "@cerc-io/solidity-mapper" + yarn link "@cerc-io/graph-node" yarn link "typeorm" ``` diff --git a/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts index 7f66d066..f394c994 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts @@ -49,38 +49,6 @@ export const handler = async (argv: any): Promise => { const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); - assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); - assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); - const [blockProgress] = blockProgresses; - - const dbTx = await db.createTransactionRunner(); - - try { - const removeEntitiesPromise = [BlockProgress, Allowance, Balance].map(async entityClass => { - return db.removeEntities(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) }); - }); - - await Promise.all(removeEntitiesPromise); - await db.removeEntities(dbTx, Contract, { startingBlock: MoreThan(argv.blockNumber) }); - - if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - + await indexer.resetWatcherToBlock(argv.blockNumber); log('Reset watcher successfully'); }; diff --git a/packages/erc20-watcher/src/database.ts b/packages/erc20-watcher/src/database.ts index 5625f668..34507864 100644 --- a/packages/erc20-watcher/src/database.ts +++ b/packages/erc20-watcher/src/database.ts @@ -85,6 +85,18 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getStateSyncStatus(repo); } + async updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { + const repo = queryRunner.manager.getRepository(StateSyncStatus); + + return this._baseDatabase.updateStateSyncStatusIndexedBlock(repo, blockNumber, force); + } + + async updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { + const repo = queryRunner.manager.getRepository(StateSyncStatus); + + return this._baseDatabase.updateStateSyncStatusCheckpointBlock(repo, blockNumber, force); + } + async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise { return this._conn.getRepository(Balance) .createQueryBuilder('balance') @@ -285,6 +297,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.removeEntities(queryRunner, entity, findConditions); } + async deleteEntitiesByConditions (queryRunner: QueryRunner, entity: new () => Entity, findConditions: FindConditions): Promise { + await this._baseDatabase.deleteEntitiesByConditions(queryRunner, entity, findConditions); + } + async getAncestorAtDepth (blockHash: string, depth: number): Promise { return this._baseDatabase.getAncestorAtDepth(blockHash, depth); } diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 5a1df0e7..aecc4728 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -31,6 +31,8 @@ import { BlockProgress } from './entity/BlockProgress'; import { Contract } from './entity/Contract'; import { State } from './entity/State'; import { StateSyncStatus } from './entity/StateSyncStatus'; +import { Allowance } from './entity/Allowance'; +import { Balance } from './entity/Balance'; const log = debug('vulcanize:indexer'); @@ -499,6 +501,11 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } + async resetWatcherToBlock (blockNumber: number): Promise { + const entities = [Allowance, Balance]; + await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); + } + async _saveBlockAndFetchEvents ({ id, cid: blockCid, diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index 7400de3f..5454df17 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -43,6 +43,8 @@ export class JobRunner { } async start (): Promise { + await this._jobQueue.deleteAllJobs(); + await this._baseJobRunner.resetToPrevIndexedBlock(); await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); } diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index c871e638..b1fefd81 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -83,6 +83,8 @@ export const main = async (): Promise => { if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); + // Delete jobs to prevent creating jobs after completion of processing previous block. + await jobQueue.deleteAllJobs(); await eventWatcher.start(); } diff --git a/packages/uni-info-watcher/environments/test.toml b/packages/uni-info-watcher/environments/test.toml index 6c0d2fa6..0f9a870a 100644 --- a/packages/uni-info-watcher/environments/test.toml +++ b/packages/uni-info-watcher/environments/test.toml @@ -39,3 +39,5 @@ jobDelayInMilliSecs = 1000 eventsInBatch = 50 blockDelayInMilliSecs = 2000 + prefetchBlocksInMem = true + prefetchBlockCount = 10 diff --git a/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts b/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts index 39f706b7..3082934c 100644 --- a/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts @@ -72,72 +72,6 @@ export const handler = async (argv: any): Promise => { const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue); - const syncStatus = await indexer.getSyncStatus(); - assert(syncStatus, 'Missing syncStatus'); - - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); - assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); - assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); - const [blockProgress] = blockProgresses; - - const dbTx = await db.createTransactionRunner(); - - try { - const removeEntitiesPromise = [ - BlockProgress, - Factory, - Bundle, - Pool, - Mint, - Burn, - Swap, - PositionSnapshot, - Position, - Token, - PoolDayData, - PoolHourData, - Tick, - TickDayData, - TokenDayData, - TokenHourData, - Transaction, - UniswapDayData - ].map(async entityClass => { - return db.deleteEntitiesByConditions(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) }); - }); - - await Promise.all(removeEntitiesPromise); - await db.deleteEntitiesByConditions(dbTx, Contract, { startingBlock: MoreThan(argv.blockNumber) }); - - if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - const stateSyncStatus = await indexer.getStateSyncStatus(); - - if (stateSyncStatus) { - if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true); - } - - if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) { - await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true); - } - } - - await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - + await indexer.resetWatcherToBlock(argv.blockNumber); log('Reset watcher successfully'); }; diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 53ebe068..3cd9497b 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -79,12 +79,12 @@ export interface CachedEntities { const ENTITY_QUERY_TYPE_MAP = new Map any, ENTITY_QUERY_TYPE>([ [Bundle, ENTITY_QUERY_TYPE.SINGULAR], [Factory, ENTITY_QUERY_TYPE.SINGULAR], - [Pool, ENTITY_QUERY_TYPE.DISTINCT_ON], - [Token, ENTITY_QUERY_TYPE.DISTINCT_ON], + [Pool, ENTITY_QUERY_TYPE.GROUP_BY], + [Token, ENTITY_QUERY_TYPE.GROUP_BY], [Burn, ENTITY_QUERY_TYPE.DISTINCT_ON], [Mint, ENTITY_QUERY_TYPE.DISTINCT_ON], [Swap, ENTITY_QUERY_TYPE.DISTINCT_ON], - [Transaction, ENTITY_QUERY_TYPE.DISTINCT_ON], + [Transaction, ENTITY_QUERY_TYPE.UNIQUE], [TokenDayData, ENTITY_QUERY_TYPE.DISTINCT_ON], [TokenHourData, ENTITY_QUERY_TYPE.DISTINCT_ON], [PoolDayData, ENTITY_QUERY_TYPE.DISTINCT_ON], diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index c31257db..bc154397 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -34,24 +34,34 @@ import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper'; import { findEthPerToken, getEthPriceInUSD, getTrackedAmountUSD, sqrtPriceX96ToTokenPrices, WHITELIST_TOKENS } from './utils/pricing'; import { updatePoolDayData, updatePoolHourData, updateTickDayData, updateTokenDayData, updateTokenHourData, updateUniswapDayData } from './utils/interval-updates'; -import { Token } from './entity/Token'; import { convertTokenToDecimal, loadFactory, loadTransaction, safeDiv, Block } from './utils'; import { createTick, feeTierToTickSpacing } from './utils/tick'; -import { ADDRESS_ZERO, FACTORY_ADDRESS, WATCHED_CONTRACTS } from './utils/constants'; -import { Position } from './entity/Position'; +import { ADDRESS_ZERO, FACTORY_ADDRESS, FIRST_GRAFT_BLOCK, WATCHED_CONTRACTS } from './utils/constants'; import { Database, DEFAULT_LIMIT } from './database'; import { Event } from './entity/Event'; import { ResultEvent, Transaction, PoolCreatedEvent, InitializeEvent, MintEvent, BurnEvent, SwapEvent, IncreaseLiquidityEvent, DecreaseLiquidityEvent, CollectEvent, TransferEvent, FlashEvent } from './events'; import { Factory } from './entity/Factory'; +import { Token } from './entity/Token'; import { Bundle } from './entity/Bundle'; import { Pool } from './entity/Pool'; import { Mint } from './entity/Mint'; import { Burn } from './entity/Burn'; import { Swap } from './entity/Swap'; +import { Position } from './entity/Position'; import { PositionSnapshot } from './entity/PositionSnapshot'; +import { Tick } from './entity/Tick'; +import { PoolDayData } from './entity/PoolDayData'; +import { PoolHourData } from './entity/PoolHourData'; +import { UniswapDayData } from './entity/UniswapDayData'; +import { TokenDayData } from './entity/TokenDayData'; +import { TokenHourData } from './entity/TokenHourData'; +import { TickDayData } from './entity/TickDayData'; +import { Collect } from './entity/Collect'; +import { Flash } from './entity/Flash'; +import { TickHourData } from './entity/TickHourData'; +import { Transaction as TransactionEntity } from './entity/Transaction'; import { SyncStatus } from './entity/SyncStatus'; import { BlockProgress } from './entity/BlockProgress'; -import { Tick } from './entity/Tick'; import { Contract, KIND_POOL } from './entity/Contract'; import { State } from './entity/State'; import { StateSyncStatus } from './entity/StateSyncStatus'; @@ -587,6 +597,11 @@ export class Indexer implements IndexerInterface { this._subgraphStateMap.set(contractAddress, updatedData); } + async resetWatcherToBlock (blockNumber: number): Promise { + const entities = [Factory, Token, Bundle, Pool, Mint, Burn, Swap, Position, PositionSnapshot, Tick, PoolDayData, PoolHourData, UniswapDayData, TokenDayData, TokenHourData, TickDayData, Collect, Flash, TickHourData, TransactionEntity]; + await this._baseIndexer.resetWatcherToBlock(blockNumber, entities); + } + async _saveBlockAndFetchEvents ({ id, cid: blockCid, @@ -813,6 +828,13 @@ export class Indexer implements IndexerInterface { pool.createdAtBlockNumber = BigInt(block.number); pool = await this._db.savePool(dbTx, pool, block); + if (block.number >= 13450924) { + // Temp workaround to fix mismatch of Factory totalVolumeUSD and totalFeesUSD values with hosted subgraph endpoint + if (!WHITELIST_TOKENS.includes('0x4dd28568d05f09b02220b09c2cb307bfd837cb95')) { + WHITELIST_TOKENS.push('0x4dd28568d05f09b02220b09c2cb307bfd837cb95'); + } + } + // Update white listed pools. if (WHITELIST_TOKENS.includes(token0.id) || this._isDemo) { token1.whitelistPools.push(pool.id); @@ -1462,12 +1484,16 @@ export class Indexer implements IndexerInterface { poolDayData.pool = pool.id; await this._db.savePoolDayData(dbTx, poolDayData, block); + if (block.number > FIRST_GRAFT_BLOCK) { + await this._db.savePoolHourData(dbTx, poolHourData, block); + } + // Update inner vars of current or crossed ticks. const newTick = pool.tick; // Check that the tick value is not null (can be zero). assert(newTick !== null); - const tickSpacing = feeTierToTickSpacing(pool.feeTier); + const tickSpacing = feeTierToTickSpacing(pool.feeTier, block); const modulo = newTick % tickSpacing; if (modulo === BigInt(0)) { @@ -1625,7 +1651,6 @@ export class Indexer implements IndexerInterface { position.withdrawnToken1 = position.withdrawnToken1.plus(amount1); await this._db.savePosition(dbTx, position, block); - await this._savePositionSnapshot(dbTx, position, block, tx); await dbTx.commitTransaction(); } catch (error) { @@ -1653,21 +1678,22 @@ export class Indexer implements IndexerInterface { const dbTx = await this._db.createTransactionRunner(); try { - const [token0, token1] = await Promise.all([ - this._db.getToken(dbTx, { id: position.token0, blockHash: block.hash }), - this._db.getToken(dbTx, { id: position.token1, blockHash: block.hash }) - ]); + if (block.number <= FIRST_GRAFT_BLOCK) { + const [token0, token1] = await Promise.all([ + this._db.getToken(dbTx, { id: position.token0, blockHash: block.hash }), + this._db.getToken(dbTx, { id: position.token1, blockHash: block.hash }) + ]); - assert(token0 && token1); + assert(token0 && token1); - const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); - const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); + const amount0 = convertTokenToDecimal(BigInt(event.amount0), BigInt(token0.decimals)); + const amount1 = convertTokenToDecimal(BigInt(event.amount1), BigInt(token1.decimals)); - position.collectedFeesToken0 = position.collectedFeesToken0.plus(amount0); - position.collectedFeesToken1 = position.collectedFeesToken1.plus(amount1); + position.collectedFeesToken0 = position.collectedFeesToken0.plus(amount0); + position.collectedFeesToken1 = position.collectedFeesToken1.plus(amount1); + } await this._db.savePosition(dbTx, position, block); - await this._savePositionSnapshot(dbTx, position, block, tx); await dbTx.commitTransaction(); } catch (error) { diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index 5f09795f..aaa93016 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -45,6 +45,8 @@ export class JobRunner { } async start (): Promise { + await this._jobQueue.deleteAllJobs(); + await this._baseJobRunner.resetToPrevIndexedBlock(); await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); } diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index 5fa6a95d..4849a5ac 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -89,6 +89,8 @@ export const main = async (): Promise => { const pubSub = new PubSub(); const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubSub, jobQueue); + // Delete jobs to prevent creating jobs after completion of processing previous block. + await jobQueue.deleteAllJobs(); await eventWatcher.start(); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); diff --git a/packages/uni-info-watcher/src/utils/constants.ts b/packages/uni-info-watcher/src/utils/constants.ts index e7b5729b..00b49cc1 100644 --- a/packages/uni-info-watcher/src/utils/constants.ts +++ b/packages/uni-info-watcher/src/utils/constants.ts @@ -12,6 +12,8 @@ export const FACTORY_ADDRESS = '0x1F98431c8aD98523631AE4a59f267346ea31F984'; export const NFPM_ADDRESS = '0xC36442b4a4522E871399CD717aBDD847Ab11FE88'; export const BUNDLE_ID = '1'; +export const FIRST_GRAFT_BLOCK = 13591197; + export const WATCHED_CONTRACTS = [ { kind: KIND_FACTORY, diff --git a/packages/uni-info-watcher/src/utils/pricing.ts b/packages/uni-info-watcher/src/utils/pricing.ts index 334b506e..36412164 100644 --- a/packages/uni-info-watcher/src/utils/pricing.ts +++ b/packages/uni-info-watcher/src/utils/pricing.ts @@ -12,6 +12,7 @@ import { exponentToBigDecimal, safeDiv } from '.'; import { Database } from '../database'; import { Token } from '../entity/Token'; import { Block } from '../events'; +import { FIRST_GRAFT_BLOCK } from './constants'; // TODO: Move constants to config. const WETH_ADDRESS = '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'; @@ -44,7 +45,17 @@ export const WHITELIST_TOKENS: string[] = [ '0x7fc66500c84a76ad7e9c93437bfc5ac33e2ddae9' // AAVE ]; +const STABLE_COINS: string[] = [ + '0x6b175474e89094c44da98b954eedeac495271d0f', + '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48', + '0xdac17f958d2ee523a2206206994597c13d831ec7', + '0x0000000000085d4780b73119b644ae5ecd22b376', + '0x956f47f50a910163d8bf957cf5846d573e7f87ca', + '0x4dd28568d05f09b02220b09c2cb307bfd837cb95' +]; + const MINIMUM_ETH_LOCKED = new GraphDecimal(52); +const HOT_FIX_MIN_ETH_LOCKED = new GraphDecimal(30); const Q192 = 2 ** 192; // Constants used in demo. @@ -96,37 +107,60 @@ export const findEthPerToken = async (db: Database, dbTx: QueryRunner, token: To let largestLiquidityETH = new GraphDecimal(0); let priceSoFar = new GraphDecimal(0); - for (let i = 0; i < whiteList.length; ++i) { - const poolAddress = whiteList[i]; - const pool = await db.getPool(dbTx, { id: poolAddress, blockHash: block.hash }); - assert(pool); - - if (BigNumber.from(pool.liquidity).gt(0)) { - if (pool.token0 === token.id) { - // whitelist token is token1 - const token1 = await db.getToken(dbTx, { id: pool.token1, blockHash: block.hash }); - assert(token1); - - // get the derived ETH in pool - const ethLocked = pool.totalValueLockedToken1.times(token1.derivedETH); - - if (ethLocked.gt(largestLiquidityETH) && ethLocked.gt(MINIMUM_ETH_LOCKED)) { - largestLiquidityETH = ethLocked; - // token1 per our token * Eth per token1 - priceSoFar = pool.token1Price.times(token1.derivedETH); + // hardcoded fix for incorrect rates + // if whitelist includes token - get the safe price + if (block.number > FIRST_GRAFT_BLOCK && STABLE_COINS.includes(token.id)) { + const bundle = await db.getBundle(dbTx, { id: '1', blockHash: block.hash }); + assert(bundle); + priceSoFar = safeDiv(new GraphDecimal(1), bundle.ethPriceUSD); + } else { + for (let i = 0; i < whiteList.length; ++i) { + const poolAddress = whiteList[i]; + const pool = await db.getPool(dbTx, { id: poolAddress, blockHash: block.hash }); + assert(pool); + + if (BigNumber.from(pool.liquidity).gt(0)) { + if (pool.token0 === token.id) { + // whitelist token is token1 + const token1 = await db.getToken(dbTx, { id: pool.token1, blockHash: block.hash }); + assert(token1); + + // get the derived ETH in pool + const ethLocked = pool.totalValueLockedToken1.times(token1.derivedETH); + + if ( + ethLocked.gt(largestLiquidityETH) && + // Temp workaround to fix mismatch of Token derivedEth value with hosted subgraph endpoint + ( + (block.number >= 13450893 && ethLocked.gt(HOT_FIX_MIN_ETH_LOCKED)) || + (block.number >= 13450924 && token.id === '0x4dd28568d05f09b02220b09c2cb307bfd837cb95') || + ethLocked.gt(MINIMUM_ETH_LOCKED) + ) + ) { + largestLiquidityETH = ethLocked; + // token1 per our token * Eth per token1 + priceSoFar = pool.token1Price.times(token1.derivedETH); + } } - } - if (pool.token1 === token.id) { - const token0 = await db.getToken(dbTx, { id: pool.token0, blockHash: block.hash }); - assert(token0); - - // get the derived ETH in pool - const ethLocked = pool.totalValueLockedToken0.times(token0.derivedETH); - - if (ethLocked.gt(largestLiquidityETH) && ethLocked.gt(MINIMUM_ETH_LOCKED)) { - largestLiquidityETH = ethLocked; - // token0 per our token * ETH per token0 - priceSoFar = pool.token0Price.times(token0.derivedETH); + if (pool.token1 === token.id) { + const token0 = await db.getToken(dbTx, { id: pool.token0, blockHash: block.hash }); + assert(token0); + + // get the derived ETH in pool + const ethLocked = pool.totalValueLockedToken0.times(token0.derivedETH); + + if ( + ethLocked.gt(largestLiquidityETH) && + // Temp workaround to fix mismatch of Token derivedEth value with hosted subgraph endpoint + ( + (block.number >= 13450893 && ethLocked.gt(HOT_FIX_MIN_ETH_LOCKED)) || + ethLocked.gt(MINIMUM_ETH_LOCKED) + ) + ) { + largestLiquidityETH = ethLocked; + // token0 per our token * ETH per token0 + priceSoFar = pool.token0Price.times(token0.derivedETH); + } } } } @@ -156,6 +190,13 @@ export const getTrackedAmountUSD = async ( const price0USD = token0.derivedETH.times(bundle.ethPriceUSD); const price1USD = token1.derivedETH.times(bundle.ethPriceUSD); + if (block.number >= 13450924) { + // Temp workaround to fix mismatch of Factory totalVolumeUSD and totalFeesUSD values with hosted subgraph endpoint + if (!WHITELIST_TOKENS.includes('0x4dd28568d05f09b02220b09c2cb307bfd837cb95')) { + WHITELIST_TOKENS.push('0x4dd28568d05f09b02220b09c2cb307bfd837cb95'); + } + } + // Both are whitelist tokens, return sum of both amounts. // Use demo mode if ((WHITELIST_TOKENS.includes(token0.id) && WHITELIST_TOKENS.includes(token1.id)) || isDemo) { diff --git a/packages/uni-info-watcher/src/utils/tick.ts b/packages/uni-info-watcher/src/utils/tick.ts index 9186f385..029e7a44 100644 --- a/packages/uni-info-watcher/src/utils/tick.ts +++ b/packages/uni-info-watcher/src/utils/tick.ts @@ -11,6 +11,7 @@ import { Database } from '../database'; import { bigDecimalExponated, safeDiv } from '.'; import { Tick } from '../entity/Tick'; import { Block } from '../events'; +import { FIRST_GRAFT_BLOCK } from './constants'; export const createTick = async (db: Database, dbTx: QueryRunner, tickId: string, tickIdx: bigint, pool: Pool, block: Block): Promise => { const tick = new Tick(); @@ -30,7 +31,7 @@ export const createTick = async (db: Database, dbTx: QueryRunner, tickId: string return db.saveTick(dbTx, tick, block); }; -export const feeTierToTickSpacing = (feeTier: bigint): bigint => { +export const feeTierToTickSpacing = (feeTier: bigint, block: Block): bigint => { if (feeTier === BigInt(10000)) { return BigInt(200); } @@ -40,6 +41,9 @@ export const feeTierToTickSpacing = (feeTier: bigint): bigint => { if (feeTier === BigInt(500)) { return BigInt(10); } + if (block.number > FIRST_GRAFT_BLOCK && feeTier === BigInt(100)) { + return BigInt(1); + } throw Error('Unexpected fee tier'); }; diff --git a/packages/uni-watcher/environments/test.toml b/packages/uni-watcher/environments/test.toml index 7a0a7f78..42b97416 100644 --- a/packages/uni-watcher/environments/test.toml +++ b/packages/uni-watcher/environments/test.toml @@ -28,3 +28,5 @@ jobDelayInMilliSecs = 100 eventsInBatch = 50 blockDelayInMilliSecs = 2000 + prefetchBlocksInMem = true + prefetchBlockCount = 10 diff --git a/packages/uni-watcher/src/cli/reset-cmds/watcher.ts b/packages/uni-watcher/src/cli/reset-cmds/watcher.ts index 05bc501b..ec034bed 100644 --- a/packages/uni-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/uni-watcher/src/cli/reset-cmds/watcher.ts @@ -44,37 +44,6 @@ export const handler = async (argv: any): Promise => { const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue); - const syncStatus = await indexer.getSyncStatus(); - assert(syncStatus, 'Missing syncStatus'); - - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); - assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); - assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); - const [blockProgress] = blockProgresses; - - const dbTx = await db.createTransactionRunner(); - - try { - await db.deleteEntitiesByConditions(dbTx, BlockProgress, { blockNumber: MoreThan(blockProgress.blockNumber) }); - await db.deleteEntitiesByConditions(dbTx, Contract, { startingBlock: MoreThan(argv.blockNumber) }); - - if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } - - await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - + await indexer.resetWatcherToBlock(argv.blockNumber); log('Reset watcher successfully'); }; diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index e7352825..7364df33 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -80,6 +80,18 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getStateSyncStatus(repo); } + async updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { + const repo = queryRunner.manager.getRepository(StateSyncStatus); + + return this._baseDatabase.updateStateSyncStatusIndexedBlock(repo, blockNumber, force); + } + + async updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { + const repo = queryRunner.manager.getRepository(StateSyncStatus); + + return this._baseDatabase.updateStateSyncStatusCheckpointBlock(repo, blockNumber, force); + } + async getLatestContract (kind: string): Promise { return this._conn.getRepository(Contract) .createQueryBuilder('contract') diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 67e43ebe..81f5ad05 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -546,6 +546,10 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } + async resetWatcherToBlock (blockNumber: number): Promise { + await this._baseIndexer.resetWatcherToBlock(blockNumber, []); + } + async _saveBlockAndFetchEvents ({ id, cid: blockCid, diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index 69d9c65f..691e7f7e 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -43,6 +43,8 @@ export class JobRunner { } async start (): Promise { + await this._jobQueue.deleteAllJobs(); + await this._baseJobRunner.resetToPrevIndexedBlock(); await this.subscribeBlockProcessingQueue(); await this.subscribeEventProcessingQueue(); } diff --git a/packages/uni-watcher/src/server.ts b/packages/uni-watcher/src/server.ts index 2e07f60f..ff4afe2f 100644 --- a/packages/uni-watcher/src/server.ts +++ b/packages/uni-watcher/src/server.ts @@ -79,6 +79,8 @@ export const main = async (): Promise => { await indexer.init(); const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue); + // Delete jobs to prevent creating jobs after completion of processing previous block. + await jobQueue.deleteAllJobs(); await eventWatcher.start(); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index c8919bbf..1f6e2556 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -64,7 +64,7 @@ export class EventWatcher { startBlockNumber = syncStatus.chainHeadBlockNumber + 1; } - processBlockByNumberWithCache(this._jobQueue, startBlockNumber); + await processBlockByNumberWithCache(this._jobQueue, startBlockNumber); // Creating an AsyncIterable from AsyncIterator to iterate over the values. // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of @@ -145,6 +145,12 @@ export class EventWatcher { log(`Job onComplete indexing blocks at height ${blockNumber}`); const blockProgressEntities = await this._indexer.getBlocksAtHeight(Number(blockNumber), false); + + if (blockProgressEntities.length === 0) { + log(`block not indexed at height ${blockNumber}`); + return; + } + const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockProgressEntities[0].blockHash, Number(blockNumber)); // Create pruning job if required. diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 21be4546..ccb5b4e2 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -4,7 +4,7 @@ import assert from 'assert'; import debug from 'debug'; -import { DeepPartial, In } from 'typeorm'; +import { In } from 'typeorm'; import { JobQueueConfig, @@ -26,7 +26,6 @@ import { import { JobQueue } from './job-queue'; import { EventInterface, IndexerInterface } from './types'; -import { wait } from './misc'; const log = debug('vulcanize:job-runner'); @@ -93,6 +92,17 @@ export class JobRunner { await this._jobQueue.markComplete(job); } + async resetToPrevIndexedBlock (): Promise { + const syncStatus = await this._indexer.getSyncStatus(); + + if (syncStatus) { + // Resetting to block before latest indexed as all events might not be processed in latest indexed block. + // Reprocessing of events in subgraph watchers is not possible as DB transaction is not implemented. + // TODO: Check updating latestIndexedBlock after blockProgress.isComplete is set to true. + await this._indexer.resetWatcherToBlock(syncStatus.latestIndexedBlockNumber - 1); + } + } + async _pruneChain (job: any): Promise { console.time('time:job-runner#_pruneChain'); @@ -147,7 +157,7 @@ export class JobRunner { assert(syncStatus); const { data: { priority } } = job; - const { cid, blockHash, blockNumber, parentHash, blockTimestamp } = blockToBeIndexed; + const { blockHash, blockNumber, parentHash } = blockToBeIndexed; const indexBlockStartTime = new Date(); @@ -249,19 +259,17 @@ export class JobRunner { if (!blockProgress) { const prefetchedBlock = this._prefetchedBlocksMap.get(blockHash); - const block = { cid, blockHash, blockNumber, parentHash, blockTimestamp }; if (!prefetchedBlock) { - // Delay required to process block. - const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig; - await wait(jobDelayInMilliSecs); - - let events: DeepPartial[]; - [blockProgress, events] = await this._indexer.saveBlockAndFetchEvents(block); - this._prefetchedBlocksMap.set(block.blockHash, { block: blockProgress, events }); + const message = `BlockProgress entity not found found in db and prefetchedBlocksMap for ${blockNumber}`; + throw new Error(message); + } else { + ({ block: blockProgress } = prefetchedBlock); + blockProgress = await this._indexer.saveBlockProgress(blockProgress); } } + assert(blockProgress); await this._indexer.processBlock(blockProgress); this._blockNumEvents = blockProgress.numEvents; diff --git a/packages/util/test/actions.ts b/packages/util/test/actions.ts index dde60726..6c163575 100644 --- a/packages/util/test/actions.ts +++ b/packages/util/test/actions.ts @@ -165,7 +165,8 @@ export const insertDummyBlock = async (db: DatabaseInterface, parentBlock: any): blockTimestamp, parentHash, numEvents: 0, - isComplete: true + isComplete: true, + cid: '' }; await db.updateSyncStatusChainHead(dbTx, blockHash, blockNumber);