diff --git a/packages/erc20-watcher/src/cli/reset-cmds/state.ts b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts similarity index 94% rename from packages/erc20-watcher/src/cli/reset-cmds/state.ts rename to packages/erc20-watcher/src/cli/reset-cmds/watcher.ts index 60c35b8a..7f66d066 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/state.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/watcher.ts @@ -15,11 +15,11 @@ import { Allowance } from '../../entity/Allowance'; import { Balance } from '../../entity/Balance'; import { Contract } from '../../entity/Contract'; -const log = debug('vulcanize:reset-state'); +const log = debug('vulcanize:reset-watcher'); -export const command = 'state'; +export const command = 'watcher'; -export const desc = 'Reset state to block number'; +export const desc = 'Reset watcher to a block number'; export const builder = { blockNumber: { @@ -82,5 +82,5 @@ export const handler = async (argv: any): Promise => { await dbTx.release(); } - log('Reset state successfully'); + log('Reset watcher successfully'); }; diff --git a/packages/erc20-watcher/src/database.ts b/packages/erc20-watcher/src/database.ts index f5945954..5625f668 100644 --- a/packages/erc20-watcher/src/database.ts +++ b/packages/erc20-watcher/src/database.ts @@ -17,8 +17,8 @@ import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; import { SyncStatus } from './entity/SyncStatus'; import { BlockProgress } from './entity/BlockProgress'; -import { IPLDBlock } from './entity/IPLDBlock'; -import { IpldStatus } from './entity/IpldStatus'; +import { State } from './entity/State'; +import { StateSyncStatus } from './entity/StateSyncStatus'; export class Database implements DatabaseInterface { _config: ConnectionOptions @@ -44,45 +44,45 @@ export class Database implements DatabaseInterface { return this._baseDatabase.close(); } - getNewIPLDBlock (): IPLDBlock { - return new IPLDBlock(); + getNewState (): State { + return new State(); } - async getIPLDBlocks (where: FindConditions): Promise { - const repo = this._conn.getRepository(IPLDBlock); + async getStates (where: FindConditions): Promise { + const repo = this._conn.getRepository(State); - return this._baseDatabase.getIPLDBlocks(repo, where); + return this._baseDatabase.getStates(repo, where); } - async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise { - const repo = this._conn.getRepository(IPLDBlock); + async getLatestState (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise { + const repo = this._conn.getRepository(State); - return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber); + return this._baseDatabase.getLatestState(repo, contractAddress, kind, blockNumber); } - // Fetch all diff IPLDBlocks after the specified block number. - async getDiffIPLDBlocksInRange (contractAddress: string, startblock: number, endBlock: number): Promise { - const repo = this._conn.getRepository(IPLDBlock); + // Fetch all diff States after the specified block number. + async getDiffStatesInRange (contractAddress: string, startblock: number, endBlock: number): Promise { + const repo = this._conn.getRepository(State); - return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startblock, endBlock); + return this._baseDatabase.getDiffStatesInRange(repo, contractAddress, startblock, endBlock); } - async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise { - const repo = dbTx.manager.getRepository(IPLDBlock); + async saveOrUpdateState (dbTx: QueryRunner, state: State): Promise { + const repo = dbTx.manager.getRepository(State); - return this._baseDatabase.saveOrUpdateIPLDBlock(repo, ipldBlock); + return this._baseDatabase.saveOrUpdateState(repo, state); } - async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise { - const repo = dbTx.manager.getRepository(IPLDBlock); + async removeStates (dbTx: QueryRunner, blockNumber: number, kind: string): Promise { + const repo = dbTx.manager.getRepository(State); - await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind); + await this._baseDatabase.removeStates(repo, blockNumber, kind); } - async getIPLDStatus (): Promise { - const repo = this._conn.getRepository(IpldStatus); + async getStateSyncStatus (): Promise { + const repo = this._conn.getRepository(StateSyncStatus); - return this._baseDatabase.getIPLDStatus(repo); + return this._baseDatabase.getStateSyncStatus(repo); } async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise { diff --git a/packages/uni-info-watcher/src/entity/IPLDBlock.ts b/packages/erc20-watcher/src/entity/State.ts similarity index 96% rename from packages/uni-info-watcher/src/entity/IPLDBlock.ts rename to packages/erc20-watcher/src/entity/State.ts index bff1118c..7cf3e49b 100644 --- a/packages/uni-info-watcher/src/entity/IPLDBlock.ts +++ b/packages/erc20-watcher/src/entity/State.ts @@ -12,7 +12,7 @@ import { BlockProgress } from './BlockProgress'; @Index(['cid'], { unique: true }) @Index(['block', 'contractAddress']) @Index(['block', 'contractAddress', 'kind'], { unique: true }) -export class IPLDBlock { +export class State { @PrimaryGeneratedColumn() id!: number; diff --git a/packages/uni-info-watcher/src/entity/IpldStatus.ts b/packages/erc20-watcher/src/entity/StateSyncStatus.ts similarity index 66% rename from packages/uni-info-watcher/src/entity/IpldStatus.ts rename to packages/erc20-watcher/src/entity/StateSyncStatus.ts index fb81069e..ef3e2fdc 100644 --- a/packages/uni-info-watcher/src/entity/IpldStatus.ts +++ b/packages/erc20-watcher/src/entity/StateSyncStatus.ts @@ -5,16 +5,13 @@ import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm'; @Entity() -export class IpldStatus { +export class StateSyncStatus { @PrimaryGeneratedColumn() id!: number; @Column('integer') - latestHooksBlockNumber!: number; + latestIndexedBlockNumber!: number; @Column('integer', { nullable: true }) latestCheckpointBlockNumber!: number; - - @Column('integer', { nullable: true }) - latestIPFSBlockNumber!: number; } diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 1c97ade7..8703c28a 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -11,7 +11,15 @@ import { ethers } from 'ethers'; import { JobQueue, IndexerInterface } from '@vulcanize/util'; import { EthClient } from '@cerc-io/ipld-eth-client'; -import { Indexer as BaseIndexer, IPFSClient, ServerConfig, IpldStatus as IpldStatusInterface, ValueResult, Where, QueryOptions, UNKNOWN_EVENT_NAME } from '@cerc-io/util'; +import { + Indexer as BaseIndexer, + ServerConfig, + StateStatus, + ValueResult, + Where, + QueryOptions, + UNKNOWN_EVENT_NAME +} from '@cerc-io/util'; import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper'; import { Database } from './database'; @@ -21,7 +29,8 @@ import { SyncStatus } from './entity/SyncStatus'; import artifacts from './artifacts/ERC20.json'; import { BlockProgress } from './entity/BlockProgress'; import { Contract } from './entity/Contract'; -import { IPLDBlock } from './entity/IPLDBlock'; +import { State } from './entity/State'; +import { StateSyncStatus } from './entity/StateSyncStatus'; const log = debug('vulcanize:indexer'); @@ -66,8 +75,7 @@ export class Indexer implements IndexerInterface { this._ethProvider = ethProvider; this._serverConfig = serverConfig; this._serverMode = this._serverConfig.mode; - const ipfsClient = new IPFSClient(serverConfig.ipfsApiAddr); - this._baseIndexer = new BaseIndexer(serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, ipfsClient); + this._baseIndexer = new BaseIndexer(serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue); const { abi, storageLayout } = artifacts; @@ -111,8 +119,8 @@ export class Indexer implements IndexerInterface { ); } - getIPLDData (ipldBlock: IPLDBlock): any { - return this._baseIndexer.getIPLDData(ipldBlock); + getStateData (state: State): any { + return this._baseIndexer.getStateData(state); } async totalSupply (blockHash: string, token: string): Promise { @@ -327,6 +335,14 @@ export class Indexer implements IndexerInterface { // Method for processing on indexing new block. } + async processCanonicalBlock (blockHash: string, blockNumber: number): Promise { + // TODO Implement + } + + async processCheckpoint (blockHash: string): Promise { + // TODO Implement + } + parseEventNameAndArgs (kind: string, logObj: any): any { let eventName = UNKNOWN_EVENT_NAME; let eventInfo = {}; @@ -362,6 +378,30 @@ export class Indexer implements IndexerInterface { return { eventName, eventInfo }; } + async getStateSyncStatus (): Promise { + return this._db.getStateSyncStatus(); + } + + async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise { + // TODO Implement + return {} as StateSyncStatus; + } + + async updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise { + // TODO Implement + return {} as StateSyncStatus; + } + + async getLatestCanonicalBlock (): Promise { + const syncStatus = await this.getSyncStatus(); + assert(syncStatus); + + const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash); + assert(latestCanonicalBlock); + + return latestCanonicalBlock; + } + async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } @@ -374,8 +414,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.watchContract(address, CONTRACT_KIND, checkpoint, startingBlock); } - async updateIPLDStatusMap (address: string, ipldStatus: IpldStatusInterface): Promise { - await this._baseIndexer.updateIPLDStatusMap(address, ipldStatus); + updateStateStatusMap (address: string, stateStatus: StateStatus): void { + this._baseIndexer.updateStateStatusMap(address, stateStatus); } async saveEventEntity (dbEvent: Event): Promise { @@ -438,7 +478,7 @@ export class Indexer implements IndexerInterface { } async fetchBlockWithEvents (block: DeepPartial): Promise { - // Method not used in uni-info-watcher but required for indexer interface. + // Method not used in erc20-watcher but required for indexer interface. return new BlockProgress(); } diff --git a/packages/uni-info-watcher/environments/local.toml b/packages/uni-info-watcher/environments/local.toml index 0c4a7b22..aa9f611c 100644 --- a/packages/uni-info-watcher/environments/local.toml +++ b/packages/uni-info-watcher/environments/local.toml @@ -11,9 +11,6 @@ # Checkpoint interval in number of blocks. checkpointInterval = 2000 - # IPFS API address (can be taken from the output on running the IPFS daemon). - # ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" - # Max block range for which to return events in eventsInRange GQL query. # Use -1 for skipping check on block range. maxEventsBlockRange = 1000 diff --git a/packages/uni-info-watcher/src/cli/fill-state.ts b/packages/uni-info-watcher/src/cli/fill-state.ts index 8147ae57..3822a52d 100644 --- a/packages/uni-info-watcher/src/cli/fill-state.ts +++ b/packages/uni-info-watcher/src/cli/fill-state.ts @@ -33,7 +33,7 @@ export const fillState = async ( log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`); // Check that there are no existing diffs in this range - const existingStates = await indexer.getIPLDBlocks({ block: { blockNumber: Between(startBlock, endBlock) } }); + const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } }); if (existingStates.length > 0) { log('found existing state(s) in the given range'); process.exit(1); @@ -97,26 +97,11 @@ export const fillState = async ( // Persist subgraph state to the DB await indexer.dumpSubgraphState(blockHash, true); - await indexer.updateIPLDStatusHooksBlock(blockNumber); + await indexer.updateStateSyncStatusIndexedBlock(blockNumber); // Create checkpoints await indexer.processCheckpoint(blockHash); - await indexer.updateIPLDStatusCheckpointBlock(blockNumber); - - // TODO: Push state to IPFS in separate process. - if (indexer.isIPFSConfigured()) { - // Get IPLDBlocks for the given blocHash. - const ipldBlocks = await indexer.getIPLDBlocksByHash(blockHash); - - // Push all the IPLDBlocks to IPFS. - for (const ipldBlock of ipldBlocks) { - const data = indexer.getIPLDData(ipldBlock); - await indexer.pushToIPFS(data); - } - - // Update the IPLD status. - await indexer.updateIPLDStatusIPFSBlock(blockNumber); - } + await indexer.updateStateSyncStatusCheckpointBlock(blockNumber); console.timeEnd(`time:fill-state-${blockNumber}`); } diff --git a/packages/uni-info-watcher/src/cli/inspect-cid.ts b/packages/uni-info-watcher/src/cli/inspect-cid.ts index f11eafb7..60c7eb1b 100644 --- a/packages/uni-info-watcher/src/cli/inspect-cid.ts +++ b/packages/uni-info-watcher/src/cli/inspect-cid.ts @@ -64,12 +64,12 @@ const main = async (): Promise => { const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue); await indexer.init(); - const ipldBlock = await indexer.getIPLDBlockByCid(argv.cid); - assert(ipldBlock, 'IPLDBlock for the provided CID doesn\'t exist.'); + const state = await indexer.getStateByCID(argv.cid); + assert(state, 'State for the provided CID doesn\'t exist.'); - const ipldData = await indexer.getIPLDData(ipldBlock); + const stateData = await indexer.getStateData(state); - log(util.inspect(ipldData, false, null)); + log(util.inspect(stateData, false, null)); }; main().catch(err => { diff --git a/packages/uni-info-watcher/src/cli/reset-cmds/ipld-state.ts b/packages/uni-info-watcher/src/cli/reset-cmds/ipld-state.ts deleted file mode 100644 index 78f2a86e..00000000 --- a/packages/uni-info-watcher/src/cli/reset-cmds/ipld-state.ts +++ /dev/null @@ -1,66 +0,0 @@ -// -// Copyright 2022 Vulcanize, Inc. -// - -import debug from 'debug'; - -import { getConfig } from '@cerc-io/util'; - -import { Database } from '../../database'; - -const log = debug('vulcanize:reset-ipld-state'); - -export const command = 'ipld-state'; - -export const desc = 'Reset IPLD state in the given range'; - -export const builder = { - blockNumber: { - type: 'number' - } -}; - -export const handler = async (argv: any): Promise => { - const { blockNumber } = argv; - const config = await getConfig(argv.configFile); - - // Initialize database - const db = new Database(config.database); - await db.init(); - - // Create a DB transaction - const dbTx = await db.createTransactionRunner(); - - console.time('time:reset-ipld-state'); - try { - // Delete all IPLDBlock entries in the given range - await db.removeStateAfterBlock(dbTx, blockNumber); - - // Reset the IPLD status. - const ipldStatus = await db.getIPLDStatus(); - - if (ipldStatus) { - if (ipldStatus.latestHooksBlockNumber > blockNumber) { - await db.updateIPLDStatusHooksBlock(dbTx, blockNumber, true); - } - - if (ipldStatus.latestCheckpointBlockNumber > blockNumber) { - await db.updateIPLDStatusCheckpointBlock(dbTx, blockNumber, true); - } - - if (ipldStatus.latestIPFSBlockNumber > blockNumber) { - await db.updateIPLDStatusIPFSBlock(dbTx, blockNumber, true); - } - } - - dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - console.timeEnd('time:reset-ipld-state'); - - log(`Reset ipld-state successfully to block ${blockNumber}`); -}; diff --git a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts index 73aa8d02..0ed58e20 100644 --- a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts @@ -1,42 +1,18 @@ // -// Copyright 2021 Vulcanize, Inc. +// Copyright 2022 Vulcanize, Inc. // import debug from 'debug'; -import { MoreThan } from 'typeorm'; -import assert from 'assert'; -import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util'; -import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; -import { Client as UniClient } from '@vulcanize/uni-watcher'; +import { getConfig } from '@cerc-io/util'; import { Database } from '../../database'; -import { Indexer } from '../../indexer'; -import { BlockProgress } from '../../entity/BlockProgress'; -import { Factory } from '../../entity/Factory'; -import { Bundle } from '../../entity/Bundle'; -import { Pool } from '../../entity/Pool'; -import { Mint } from '../../entity/Mint'; -import { Burn } from '../../entity/Burn'; -import { Swap } from '../../entity/Swap'; -import { PositionSnapshot } from '../../entity/PositionSnapshot'; -import { Position } from '../../entity/Position'; -import { Token } from '../../entity/Token'; -import { PoolDayData } from '../../entity/PoolDayData'; -import { PoolHourData } from '../../entity/PoolHourData'; -import { Tick } from '../../entity/Tick'; -import { TickDayData } from '../../entity/TickDayData'; -import { TokenDayData } from '../../entity/TokenDayData'; -import { TokenHourData } from '../../entity/TokenHourData'; -import { Transaction } from '../../entity/Transaction'; -import { UniswapDayData } from '../../entity/UniswapDayData'; -import { Contract } from '../../entity/Contract'; const log = debug('vulcanize:reset-state'); export const command = 'state'; -export const desc = 'Reset state to block number'; +export const desc = 'Reset State to a given block number'; export const builder = { blockNumber: { @@ -45,79 +21,33 @@ export const builder = { }; export const handler = async (argv: any): Promise => { + const { blockNumber } = argv; const config = await getConfig(argv.configFile); - await resetJobs(config); - const { jobQueue: jobQueueConfig } = config; - const { dbConfig, serverConfig, upstreamConfig, ethClient, ethProvider } = await getResetConfig(config); - // Initialize database. - const db = new Database(dbConfig); + // Initialize database + const db = new Database(config.database); await db.init(); - const { - uniWatcher, - tokenWatcher - } = upstreamConfig; - - const uniClient = new UniClient(uniWatcher); - const erc20Client = new ERC20Client(tokenWatcher); - - assert(jobQueueConfig, 'Missing job queue config'); - - const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; - assert(dbConnectionString, 'Missing job queue db connection string'); - - const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - await jobQueue.start(); - - const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue); - - const syncStatus = await indexer.getSyncStatus(); - assert(syncStatus, 'Missing syncStatus'); - - const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); - assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); - assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); - const [blockProgress] = blockProgresses; - + // Create a DB transaction const dbTx = await db.createTransactionRunner(); + console.time('time:reset-state'); try { - const removeEntitiesPromise = [ - BlockProgress, - Factory, - Bundle, - Pool, - Mint, - Burn, - Swap, - PositionSnapshot, - Position, - Token, - PoolDayData, - PoolHourData, - Tick, - TickDayData, - TokenDayData, - TokenHourData, - Transaction, - UniswapDayData - ].map(async entityClass => { - return db.deleteEntitiesByConditions(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) }); - }); - - await Promise.all(removeEntitiesPromise); - await db.deleteEntitiesByConditions(dbTx, Contract, { startingBlock: MoreThan(argv.blockNumber) }); - - if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } + // Delete all State entries after the given block + await db.removeStatesAfterBlock(dbTx, blockNumber); - if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { - await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); - } + // Reset the stateSyncStatus. + const stateSyncStatus = await db.getStateSyncStatus(); + + if (stateSyncStatus) { + if (stateSyncStatus.latestIndexedBlockNumber > blockNumber) { + await db.updateStateSyncStatusIndexedBlock(dbTx, blockNumber, true); + } - await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); + if (stateSyncStatus.latestCheckpointBlockNumber > blockNumber) { + await db.updateStateSyncStatusCheckpointBlock(dbTx, blockNumber, true); + } + } dbTx.commitTransaction(); } catch (error) { @@ -126,6 +56,7 @@ export const handler = async (argv: any): Promise => { } finally { await dbTx.release(); } + console.timeEnd('time:reset-state'); - log('Reset state successfully'); + log(`Reset state successfully to block ${blockNumber}`); }; diff --git a/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts b/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts new file mode 100644 index 00000000..39f706b7 --- /dev/null +++ b/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts @@ -0,0 +1,143 @@ +// +// Copyright 2021 Vulcanize, Inc. +// + +import debug from 'debug'; +import { MoreThan } from 'typeorm'; +import assert from 'assert'; + +import { getConfig, getResetConfig, JobQueue, resetJobs } from '@vulcanize/util'; +import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; +import { Client as UniClient } from '@vulcanize/uni-watcher'; + +import { Database } from '../../database'; +import { Indexer } from '../../indexer'; +import { BlockProgress } from '../../entity/BlockProgress'; +import { Factory } from '../../entity/Factory'; +import { Bundle } from '../../entity/Bundle'; +import { Pool } from '../../entity/Pool'; +import { Mint } from '../../entity/Mint'; +import { Burn } from '../../entity/Burn'; +import { Swap } from '../../entity/Swap'; +import { PositionSnapshot } from '../../entity/PositionSnapshot'; +import { Position } from '../../entity/Position'; +import { Token } from '../../entity/Token'; +import { PoolDayData } from '../../entity/PoolDayData'; +import { PoolHourData } from '../../entity/PoolHourData'; +import { Tick } from '../../entity/Tick'; +import { TickDayData } from '../../entity/TickDayData'; +import { TokenDayData } from '../../entity/TokenDayData'; +import { TokenHourData } from '../../entity/TokenHourData'; +import { Transaction } from '../../entity/Transaction'; +import { UniswapDayData } from '../../entity/UniswapDayData'; +import { Contract } from '../../entity/Contract'; + +const log = debug('vulcanize:reset-watcher'); + +export const command = 'watcher'; + +export const desc = 'Reset watcher to a block number'; + +export const builder = { + blockNumber: { + type: 'number' + } +}; + +export const handler = async (argv: any): Promise => { + const config = await getConfig(argv.configFile); + await resetJobs(config); + const { jobQueue: jobQueueConfig } = config; + const { dbConfig, serverConfig, upstreamConfig, ethClient, ethProvider } = await getResetConfig(config); + + // Initialize database. + const db = new Database(dbConfig); + await db.init(); + + const { + uniWatcher, + tokenWatcher + } = upstreamConfig; + + const uniClient = new UniClient(uniWatcher); + const erc20Client = new ERC20Client(tokenWatcher); + + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue); + + const syncStatus = await indexer.getSyncStatus(); + assert(syncStatus, 'Missing syncStatus'); + + const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false); + assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`); + assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`); + const [blockProgress] = blockProgresses; + + const dbTx = await db.createTransactionRunner(); + + try { + const removeEntitiesPromise = [ + BlockProgress, + Factory, + Bundle, + Pool, + Mint, + Burn, + Swap, + PositionSnapshot, + Position, + Token, + PoolDayData, + PoolHourData, + Tick, + TickDayData, + TokenDayData, + TokenHourData, + Transaction, + UniswapDayData + ].map(async entityClass => { + return db.deleteEntitiesByConditions(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) }); + }); + + await Promise.all(removeEntitiesPromise); + await db.deleteEntitiesByConditions(dbTx, Contract, { startingBlock: MoreThan(argv.blockNumber) }); + + if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { + await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true); + } + + if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) { + await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true); + } + + const stateSyncStatus = await indexer.getStateSyncStatus(); + + if (stateSyncStatus) { + if (stateSyncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) { + await indexer.updateStateSyncStatusIndexedBlock(blockProgress.blockNumber, true); + } + + if (stateSyncStatus.latestCheckpointBlockNumber > blockProgress.blockNumber) { + await indexer.updateStateSyncStatusCheckpointBlock(blockProgress.blockNumber, true); + } + } + + await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); + + dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + log('Reset watcher successfully'); +}; diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 588124ef..44f9392e 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -55,8 +55,8 @@ import { Block } from './events'; import { SyncStatus } from './entity/SyncStatus'; import { TickDayData } from './entity/TickDayData'; import { Contract } from './entity/Contract'; -import { IPLDBlock } from './entity/IPLDBlock'; -import { IpldStatus } from './entity/IpldStatus'; +import { State } from './entity/State'; +import { StateSyncStatus } from './entity/StateSyncStatus'; import { Collect } from './entity/Collect'; import { Flash } from './entity/Flash'; import { TickHourData } from './entity/TickHourData'; @@ -130,11 +130,11 @@ export class Database implements DatabaseInterface { this._populateRelationsMap(); } - get relationsMap () { + get relationsMap (): Map { return this._relationsMap; } - get cachedEntities () { + get cachedEntities (): CachedEntities { return this._cachedEntities; } @@ -146,75 +146,69 @@ export class Database implements DatabaseInterface { return this._baseDatabase.close(); } - getNewIPLDBlock (): IPLDBlock { - return new IPLDBlock(); + getNewState (): State { + return new State(); } - async getIPLDBlocks (where: FindConditions): Promise { - const repo = this._conn.getRepository(IPLDBlock); + async getStates (where: FindConditions): Promise { + const repo = this._conn.getRepository(State); - return this._baseDatabase.getIPLDBlocks(repo, where); + return this._baseDatabase.getStates(repo, where); } - async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise { - const repo = this._conn.getRepository(IPLDBlock); + async getLatestState (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise { + const repo = this._conn.getRepository(State); - return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber); + return this._baseDatabase.getLatestState(repo, contractAddress, kind, blockNumber); } - async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise { - const repo = this._conn.getRepository(IPLDBlock); + async getPrevState (blockHash: string, contractAddress: string, kind?: string): Promise { + const repo = this._conn.getRepository(State); - return this._baseDatabase.getPrevIPLDBlock(repo, blockHash, contractAddress, kind); + return this._baseDatabase.getPrevState(repo, blockHash, contractAddress, kind); } - // Fetch all diff IPLDBlocks after the specified block number. - async getDiffIPLDBlocksInRange (contractAddress: string, startblock: number, endBlock: number): Promise { - const repo = this._conn.getRepository(IPLDBlock); + // Fetch all diff States after the specified block number. + async getDiffStatesInRange (contractAddress: string, startblock: number, endBlock: number): Promise { + const repo = this._conn.getRepository(State); - return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startblock, endBlock); + return this._baseDatabase.getDiffStatesInRange(repo, contractAddress, startblock, endBlock); } - async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise { - const repo = dbTx.manager.getRepository(IPLDBlock); + async saveOrUpdateState (dbTx: QueryRunner, state: State): Promise { + const repo = dbTx.manager.getRepository(State); - return this._baseDatabase.saveOrUpdateIPLDBlock(repo, ipldBlock); + return this._baseDatabase.saveOrUpdateState(repo, state); } - async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise { - const repo = dbTx.manager.getRepository(IPLDBlock); + async removeStates (dbTx: QueryRunner, blockNumber: number, kind: string): Promise { + const repo = dbTx.manager.getRepository(State); - await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind); + await this._baseDatabase.removeStates(repo, blockNumber, kind); } - async removeStateAfterBlock (dbTx: QueryRunner, blockNumber: number): Promise { - const repo = dbTx.manager.getRepository(IPLDBlock); + async removeStatesAfterBlock (dbTx: QueryRunner, blockNumber: number): Promise { + const repo = dbTx.manager.getRepository(State); - await this._baseDatabase.removeIPLDBlocksAfterBlock(repo, blockNumber); + await this._baseDatabase.removeStatesAfterBlock(repo, blockNumber); } - async getIPLDStatus (): Promise { - const repo = this._conn.getRepository(IpldStatus); + async getStateSyncStatus (): Promise { + const repo = this._conn.getRepository(StateSyncStatus); - return this._baseDatabase.getIPLDStatus(repo); + return this._baseDatabase.getStateSyncStatus(repo); } - async updateIPLDStatusHooksBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { - const repo = queryRunner.manager.getRepository(IpldStatus); + async updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { + const repo = queryRunner.manager.getRepository(StateSyncStatus); - return this._baseDatabase.updateIPLDStatusHooksBlock(repo, blockNumber, force); + return this._baseDatabase.updateStateSyncStatusIndexedBlock(repo, blockNumber, force); } - async updateIPLDStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { - const repo = queryRunner.manager.getRepository(IpldStatus); + async updateStateSyncStatusCheckpointBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { + const repo = queryRunner.manager.getRepository(StateSyncStatus); - return this._baseDatabase.updateIPLDStatusCheckpointBlock(repo, blockNumber, force); - } - - async updateIPLDStatusIPFSBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise { - const repo = queryRunner.manager.getRepository(IpldStatus); - - return this._baseDatabase.updateIPLDStatusIPFSBlock(repo, blockNumber, force); + return this._baseDatabase.updateStateSyncStatusCheckpointBlock(repo, blockNumber, force); } async getFactory (queryRunner: QueryRunner, { id, blockHash }: DeepPartial): Promise { diff --git a/packages/erc20-watcher/src/entity/IPLDBlock.ts b/packages/uni-info-watcher/src/entity/State.ts similarity index 96% rename from packages/erc20-watcher/src/entity/IPLDBlock.ts rename to packages/uni-info-watcher/src/entity/State.ts index bff1118c..7cf3e49b 100644 --- a/packages/erc20-watcher/src/entity/IPLDBlock.ts +++ b/packages/uni-info-watcher/src/entity/State.ts @@ -12,7 +12,7 @@ import { BlockProgress } from './BlockProgress'; @Index(['cid'], { unique: true }) @Index(['block', 'contractAddress']) @Index(['block', 'contractAddress', 'kind'], { unique: true }) -export class IPLDBlock { +export class State { @PrimaryGeneratedColumn() id!: number; diff --git a/packages/uni-watcher/src/entity/IpldStatus.ts b/packages/uni-info-watcher/src/entity/StateSyncStatus.ts similarity index 66% rename from packages/uni-watcher/src/entity/IpldStatus.ts rename to packages/uni-info-watcher/src/entity/StateSyncStatus.ts index fb81069e..ef3e2fdc 100644 --- a/packages/uni-watcher/src/entity/IpldStatus.ts +++ b/packages/uni-info-watcher/src/entity/StateSyncStatus.ts @@ -5,16 +5,13 @@ import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm'; @Entity() -export class IpldStatus { +export class StateSyncStatus { @PrimaryGeneratedColumn() id!: number; @Column('integer') - latestHooksBlockNumber!: number; + latestIndexedBlockNumber!: number; @Column('integer', { nullable: true }) latestCheckpointBlockNumber!: number; - - @Column('integer', { nullable: true }) - latestIPFSBlockNumber!: number; } diff --git a/packages/uni-info-watcher/src/hooks.ts b/packages/uni-info-watcher/src/hooks.ts index 60d05482..dd4943b3 100644 --- a/packages/uni-info-watcher/src/hooks.ts +++ b/packages/uni-info-watcher/src/hooks.ts @@ -20,13 +20,13 @@ export async function createInitialState (indexer: Indexer, contractAddress: str assert(blockHash); assert(contractAddress); - // Store an empty state in an IPLDBlock. - const ipldBlockData: any = { + // Store an empty State. + const stateData: any = { state: {} }; // Return initial state data to be saved. - return ipldBlockData; + return stateData; } /** diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 2515fe96..70285fdf 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -10,11 +10,23 @@ import { providers, utils, BigNumber } from 'ethers'; import { SelectionNode } from 'graphql'; import _ from 'lodash'; -import * as codec from '@ipld/dag-cbor'; import { Client as UniClient } from '@vulcanize/uni-watcher'; import { Client as ERC20Client } from '@vulcanize/erc20-watcher'; import { GraphDecimal, JobQueue } from '@vulcanize/util'; -import { ServerConfig, IPFSClient, IpldStatus as IpldStatusInterface, ValueResult, Indexer as BaseIndexer, IndexerInterface, QueryOptions, OrderDirection, BlockHeight, Where, ResultIPLDBlock, eventProcessingEthCallDuration, getFullTransaction, getFullBlock } from '@cerc-io/util'; +import { + ServerConfig, + StateStatus, + ValueResult, + Indexer as BaseIndexer, + IndexerInterface, + QueryOptions, + OrderDirection, + BlockHeight, + Where, + eventProcessingEthCallDuration, + getFullTransaction, + getFullBlock +} from '@cerc-io/util'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper'; @@ -39,8 +51,8 @@ import { SyncStatus } from './entity/SyncStatus'; import { BlockProgress } from './entity/BlockProgress'; import { Tick } from './entity/Tick'; import { Contract, KIND_POOL } from './entity/Contract'; -import { IPLDBlock } from './entity/IPLDBlock'; -import { IpldStatus } from './entity/IpldStatus'; +import { State } from './entity/State'; +import { StateSyncStatus } from './entity/StateSyncStatus'; import { createInitialState, createStateCheckpoint } from './hooks'; const SYNC_DELTA = 5; @@ -73,8 +85,7 @@ export class Indexer implements IndexerInterface { this._ethClient = ethClient; this._ethProvider = ethProvider; this._serverConfig = serverConfig; - const ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr); - this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, ipfsClient); + this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue); this._isDemo = this._serverConfig.mode === 'demo'; } @@ -88,7 +99,7 @@ export class Indexer implements IndexerInterface { async init (): Promise { await this._baseIndexer.fetchContracts(); - await this._baseIndexer.fetchIPLDStatus(); + await this._baseIndexer.fetchStateStatus(); } getResultEvent (event: Event): ResultEvent { @@ -117,26 +128,6 @@ export class Indexer implements IndexerInterface { }; } - getResultIPLDBlock (ipldBlock: IPLDBlock): ResultIPLDBlock { - const block = ipldBlock.block; - - const data = codec.decode(Buffer.from(ipldBlock.data)) as any; - - return { - block: { - cid: block.cid, - hash: block.blockHash, - number: block.blockNumber, - timestamp: block.blockTimestamp, - parentHash: block.parentHash - }, - contractAddress: ipldBlock.contractAddress, - cid: ipldBlock.cid, - kind: ipldBlock.kind, - data: JSON.stringify(data) - }; - } - async getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise { return this._baseIndexer.getStorageValue( storageLayout, @@ -147,10 +138,6 @@ export class Indexer implements IndexerInterface { ); } - async pushToIPFS (data: any): Promise { - await this._baseIndexer.pushToIPFS(data); - } - async processInitialState (contractAddress: string, blockHash: string): Promise { // Call initial state hook. return createInitialState(this, contractAddress, blockHash); @@ -161,6 +148,10 @@ export class Indexer implements IndexerInterface { return createStateCheckpoint(this, contractAddress, blockHash); } + async processCanonicalBlock (blockHash: string, blockNumber: number): Promise { + // TODO Implement + } + async processCheckpoint (blockHash: string): Promise { // Return if checkpointInterval is <= 0. const checkpointInterval = this._serverConfig.checkpointInterval; @@ -173,28 +164,24 @@ export class Indexer implements IndexerInterface { console.timeEnd('time:indexer#processCheckpoint-checkpoint'); } - async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise { - return this._db.getPrevIPLDBlock(blockHash, contractAddress, kind); + async getPrevState (blockHash: string, contractAddress: string, kind?: string): Promise { + return this._db.getPrevState(blockHash, contractAddress, kind); } - async getIPLDBlocksByHash (blockHash: string): Promise { - return this._baseIndexer.getIPLDBlocksByHash(blockHash); + async getStatesByHash (blockHash: string): Promise { + return this._baseIndexer.getStatesByHash(blockHash); } - async getIPLDBlocks (where: FindConditions): Promise { - return this._db.getIPLDBlocks(where); + async getStateByCID (cid: string): Promise { + return this._baseIndexer.getStateByCID(cid); } - async getIPLDBlockByCid (cid: string): Promise { - return this._baseIndexer.getIPLDBlockByCid(cid); + async getStates (where: FindConditions): Promise { + return this._db.getStates(where); } - getIPLDData (ipldBlock: IPLDBlock): any { - return this._baseIndexer.getIPLDData(ipldBlock); - } - - isIPFSConfigured (): boolean { - return this._baseIndexer.isIPFSConfigured(); + getStateData (state: State): any { + return this._baseIndexer.getStateData(state); } // Method used to create auto diffs (diff_staged). @@ -317,12 +304,16 @@ export class Indexer implements IndexerInterface { } } - async updateIPLDStatusHooksBlock (blockNumber: number, force?: boolean): Promise { + async getStateSyncStatus (): Promise { + return this._db.getStateSyncStatus(); + } + + async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise { const dbTx = await this._db.createTransactionRunner(); let res; try { - res = await this._db.updateIPLDStatusHooksBlock(dbTx, blockNumber, force); + res = await this._db.updateStateSyncStatusIndexedBlock(dbTx, blockNumber, force); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); @@ -334,12 +325,12 @@ export class Indexer implements IndexerInterface { return res; } - async updateIPLDStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise { + async updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise { const dbTx = await this._db.createTransactionRunner(); let res; try { - res = await this._db.updateIPLDStatusCheckpointBlock(dbTx, blockNumber, force); + res = await this._db.updateStateSyncStatusCheckpointBlock(dbTx, blockNumber, force); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); @@ -351,21 +342,14 @@ export class Indexer implements IndexerInterface { return res; } - async updateIPLDStatusIPFSBlock (blockNumber: number, force?: boolean): Promise { - const dbTx = await this._db.createTransactionRunner(); - let res; + async getLatestCanonicalBlock (): Promise { + const syncStatus = await this.getSyncStatus(); + assert(syncStatus); - try { - res = await this._db.updateIPLDStatusIPFSBlock(dbTx, blockNumber, force); - await dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } + const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash); + assert(latestCanonicalBlock); - return res; + return latestCanonicalBlock; } async getBlockEntities (where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise { @@ -548,8 +532,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); } - async updateIPLDStatusMap (address: string, ipldStatus: IpldStatusInterface): Promise { - await this._baseIndexer.updateIPLDStatusMap(address, ipldStatus); + updateStateStatusMap (address: string, stateStatus: StateStatus): void { + this._baseIndexer.updateStateStatusMap(address, stateStatus); } cacheContract (contract: Contract): void { @@ -1511,7 +1495,7 @@ export class Indexer implements IndexerInterface { } } - async _handleFlash (block: Block, contractAddress: string, tx: Transaction, burnEvent: FlashEvent): Promise { + async _handleFlash (block: Block, contractAddress: string, tx: Transaction, flashEvent: FlashEvent): Promise { const dbTx = await this._db.createTransactionRunner(); try { diff --git a/packages/uni-info-watcher/src/resolvers.ts b/packages/uni-info-watcher/src/resolvers.ts index 6837b636..7729d261 100644 --- a/packages/uni-info-watcher/src/resolvers.ts +++ b/packages/uni-info-watcher/src/resolvers.ts @@ -8,7 +8,7 @@ import debug from 'debug'; import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql'; import { gqlQueryCount, gqlTotalQueryCount, GraphDecimal } from '@vulcanize/util'; -import { BlockHeight, OrderDirection } from '@cerc-io/util'; +import { BlockHeight, OrderDirection, getResultState } from '@cerc-io/util'; import { Indexer } from './indexer'; import { Burn } from './entity/Burn'; @@ -503,9 +503,9 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch gqlTotalQueryCount.inc(1); gqlQueryCount.labels('getState').inc(1); - const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress, kind); + const state = await indexer.getPrevState(blockHash, contractAddress, kind); - return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined; + return state && state.block.isComplete ? getResultState(state) : undefined; } } }; diff --git a/packages/uni-info-watcher/src/schema.ts b/packages/uni-info-watcher/src/schema.ts index 73f4f0cb..eeef417b 100644 --- a/packages/uni-info-watcher/src/schema.ts +++ b/packages/uni-info-watcher/src/schema.ts @@ -55,7 +55,7 @@ type Pool { totalValueLockedETH: BigDecimal! totalValueLockedUSDUntracked: BigDecimal! liquidityProviderCount: BigInt! - + # Skipping fee growth as they are not queried. # feeGrowthGlobal0X128: BigInt! # feeGrowthGlobal1X128: BigInt! @@ -151,7 +151,7 @@ type TickDayData { } type TickHourData { - id: ID! + id: ID! pool: Pool! tick: Tick! liquidityGross: BigInt! @@ -553,7 +553,7 @@ type _Block_ { parentHash: String! } -type ResultIPLDBlock { +type ResultState { block: _Block_! contractAddress: String! cid: String @@ -793,7 +793,7 @@ type Query { subgraphName: String! ): SubgraphIndexingStatus - getState(blockHash: String!, contractAddress: String!, kind: String): ResultIPLDBlock + getState(blockHash: String!, contractAddress: String!, kind: String): ResultState } # diff --git a/packages/uni-watcher/src/cli/reset-cmds/state.ts b/packages/uni-watcher/src/cli/reset-cmds/watcher.ts similarity index 93% rename from packages/uni-watcher/src/cli/reset-cmds/state.ts rename to packages/uni-watcher/src/cli/reset-cmds/watcher.ts index a5745caf..05bc501b 100644 --- a/packages/uni-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-watcher/src/cli/reset-cmds/watcher.ts @@ -13,11 +13,11 @@ import { Indexer } from '../../indexer'; import { BlockProgress } from '../../entity/BlockProgress'; import { Contract } from '../../entity/Contract'; -const log = debug('vulcanize:reset-state'); +const log = debug('vulcanize:reset-watcher'); -export const command = 'state'; +export const command = 'watcher'; -export const desc = 'Reset state to block number'; +export const desc = 'Reset watcher to a block number'; export const builder = { blockNumber: { @@ -76,5 +76,5 @@ export const handler = async (argv: any): Promise => { await dbTx.release(); } - log('Reset state successfully'); + log('Reset watcher successfully'); }; diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index 5a6a6cd0..e7352825 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -12,8 +12,8 @@ import { Event } from './entity/Event'; import { Contract } from './entity/Contract'; import { BlockProgress } from './entity/BlockProgress'; import { SyncStatus } from './entity/SyncStatus'; -import { IPLDBlock } from './entity/IPLDBlock'; -import { IpldStatus } from './entity/IpldStatus'; +import { State } from './entity/State'; +import { StateSyncStatus } from './entity/StateSyncStatus'; export class Database implements DatabaseInterface { _config: ConnectionOptions @@ -39,45 +39,45 @@ export class Database implements DatabaseInterface { return this._baseDatabase.close(); } - getNewIPLDBlock (): IPLDBlock { - return new IPLDBlock(); + getNewState (): State { + return new State(); } - async getIPLDBlocks (where: FindConditions): Promise { - const repo = this._conn.getRepository(IPLDBlock); + async getStates (where: FindConditions): Promise { + const repo = this._conn.getRepository(State); - return this._baseDatabase.getIPLDBlocks(repo, where); + return this._baseDatabase.getStates(repo, where); } - async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise { - const repo = this._conn.getRepository(IPLDBlock); + async getLatestState (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise { + const repo = this._conn.getRepository(State); - return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber); + return this._baseDatabase.getLatestState(repo, contractAddress, kind, blockNumber); } - // Fetch all diff IPLDBlocks after the specified block number. - async getDiffIPLDBlocksInRange (contractAddress: string, startblock: number, endBlock: number): Promise { - const repo = this._conn.getRepository(IPLDBlock); + // Fetch all diff States after the specified block number. + async getDiffStatesInRange (contractAddress: string, startblock: number, endBlock: number): Promise { + const repo = this._conn.getRepository(State); - return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startblock, endBlock); + return this._baseDatabase.getDiffStatesInRange(repo, contractAddress, startblock, endBlock); } - async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise { - const repo = dbTx.manager.getRepository(IPLDBlock); + async saveOrUpdateState (dbTx: QueryRunner, state: State): Promise { + const repo = dbTx.manager.getRepository(State); - return this._baseDatabase.saveOrUpdateIPLDBlock(repo, ipldBlock); + return this._baseDatabase.saveOrUpdateState(repo, state); } - async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise { - const repo = dbTx.manager.getRepository(IPLDBlock); + async removeStates (dbTx: QueryRunner, blockNumber: number, kind: string): Promise { + const repo = dbTx.manager.getRepository(State); - await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind); + await this._baseDatabase.removeStates(repo, blockNumber, kind); } - async getIPLDStatus (): Promise { - const repo = this._conn.getRepository(IpldStatus); + async getStateSyncStatus (): Promise { + const repo = this._conn.getRepository(StateSyncStatus); - return this._baseDatabase.getIPLDStatus(repo); + return this._baseDatabase.getStateSyncStatus(repo); } async getLatestContract (kind: string): Promise { diff --git a/packages/uni-watcher/src/entity/IPLDBlock.ts b/packages/uni-watcher/src/entity/State.ts similarity index 96% rename from packages/uni-watcher/src/entity/IPLDBlock.ts rename to packages/uni-watcher/src/entity/State.ts index bff1118c..7cf3e49b 100644 --- a/packages/uni-watcher/src/entity/IPLDBlock.ts +++ b/packages/uni-watcher/src/entity/State.ts @@ -12,7 +12,7 @@ import { BlockProgress } from './BlockProgress'; @Index(['cid'], { unique: true }) @Index(['block', 'contractAddress']) @Index(['block', 'contractAddress', 'kind'], { unique: true }) -export class IPLDBlock { +export class State { @PrimaryGeneratedColumn() id!: number; diff --git a/packages/erc20-watcher/src/entity/IpldStatus.ts b/packages/uni-watcher/src/entity/StateSyncStatus.ts similarity index 66% rename from packages/erc20-watcher/src/entity/IpldStatus.ts rename to packages/uni-watcher/src/entity/StateSyncStatus.ts index fb81069e..ef3e2fdc 100644 --- a/packages/erc20-watcher/src/entity/IpldStatus.ts +++ b/packages/uni-watcher/src/entity/StateSyncStatus.ts @@ -5,16 +5,13 @@ import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm'; @Entity() -export class IpldStatus { +export class StateSyncStatus { @PrimaryGeneratedColumn() id!: number; @Column('integer') - latestHooksBlockNumber!: number; + latestIndexedBlockNumber!: number; @Column('integer', { nullable: true }) latestCheckpointBlockNumber!: number; - - @Column('integer', { nullable: true }) - latestIPFSBlockNumber!: number; } diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index fc92a502..f75f22b7 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -9,7 +9,7 @@ import { ethers } from 'ethers'; import assert from 'assert'; import { JobQueue, IndexerInterface } from '@vulcanize/util'; -import { Indexer as BaseIndexer, IPFSClient, IpldStatus as IpldStatusInterface, ServerConfig, Where, QueryOptions, ValueResult, UNKNOWN_EVENT_NAME } from '@cerc-io/util'; +import { Indexer as BaseIndexer, StateStatus, ServerConfig, Where, QueryOptions, ValueResult, UNKNOWN_EVENT_NAME } from '@cerc-io/util'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper'; @@ -18,11 +18,12 @@ import { Event } from './entity/Event'; import { BlockProgress } from './entity/BlockProgress'; import { Contract, KIND_FACTORY, KIND_POOL, KIND_NFPM } from './entity/Contract'; import { SyncStatus } from './entity/SyncStatus'; +import { State } from './entity/State'; +import { StateSyncStatus } from './entity/StateSyncStatus'; import { abi as factoryABI, storageLayout as factoryStorageLayout } from './artifacts/factory.json'; import { abi as nfpmABI, storageLayout as nfpmStorageLayout } from './artifacts/NonfungiblePositionManager.json'; import poolABI from './artifacts/pool.json'; -import { IPLDBlock } from './entity/IPLDBlock'; const log = debug('vulcanize:indexer'); @@ -55,8 +56,7 @@ export class Indexer implements IndexerInterface { this._ethClient = ethClient; this._ethProvider = ethProvider; this._serverConfig = serverConfig; - const ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr); - this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, ipfsClient); + this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue); this._factoryContract = new ethers.utils.Interface(factoryABI); this._poolContract = new ethers.utils.Interface(poolABI); @@ -118,8 +118,8 @@ export class Indexer implements IndexerInterface { ); } - getIPLDData (ipldBlock: IPLDBlock): any { - return this._baseIndexer.getIPLDData(ipldBlock); + getStateData (state: State): any { + return this._baseIndexer.getStateData(state); } async triggerIndexingOnEvent (dbTx: QueryRunner, dbEvent: Event): Promise { @@ -152,6 +152,14 @@ export class Indexer implements IndexerInterface { // Method for processing on indexing new block. } + async processCanonicalBlock (blockHash: string, blockNumber: number): Promise { + // TODO Implement + } + + async processCheckpoint (blockHash: string): Promise { + // TODO Implement + } + parseEventNameAndArgs (kind: string, logObj: any): any { let eventName = UNKNOWN_EVENT_NAME; let eventInfo = {}; @@ -436,6 +444,30 @@ export class Indexer implements IndexerInterface { } } + async getStateSyncStatus (): Promise { + return this._db.getStateSyncStatus(); + } + + async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise { + // TODO Implement + return {} as StateSyncStatus; + } + + async updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise { + // TODO Implement + return {} as StateSyncStatus; + } + + async getLatestCanonicalBlock (): Promise { + const syncStatus = await this.getSyncStatus(); + assert(syncStatus); + + const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash); + assert(latestCanonicalBlock); + + return latestCanonicalBlock; + } + async getContract (type: string): Promise { const contract = await this._db.getLatestContract(type); return contract; @@ -453,8 +485,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); } - async updateIPLDStatusMap (address: string, ipldStatus: IpldStatusInterface): Promise { - await this._baseIndexer.updateIPLDStatusMap(address, ipldStatus); + updateStateStatusMap (address: string, stateStatus: StateStatus): void { + this._baseIndexer.updateStateStatusMap(address, stateStatus); } cacheContract (contract: Contract): void { @@ -486,7 +518,7 @@ export class Indexer implements IndexerInterface { } async fetchBlockWithEvents (block: DeepPartial): Promise { - // Method not used in uni-info-watcher but required for indexer interface. + // Method not used in uni-watcher but required for indexer interface. return new BlockProgress(); } diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index cb4666f8..1aab8145 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -82,7 +82,7 @@ export class JobRunner { break; case JOB_KIND_CONTRACT: - await this._updateWatchedContracts(job); + this._updateWatchedContracts(job); break; default: @@ -426,10 +426,12 @@ export class JobRunner { console.timeEnd('time:job-runner#_processEvents-events'); } - async _updateWatchedContracts (job: any): Promise { + _updateWatchedContracts (job: any): void { const { data: { contract } } = job; assert(this._indexer.cacheContract); this._indexer.cacheContract(contract); + + this._indexer.updateStateStatusMap(contract.address, {}); } }