Skip to content

Commit

Permalink
Refactor state creation code and satisfy updated interfaces (#375)
Browse files Browse the repository at this point in the history
* Remove IPFS related code for state creation

* Rename state creation related methods and satisfy updated interfaces

* Make method to update state status map synchronous
  • Loading branch information
prathamesh0 authored Oct 19, 2022
1 parent 4226de5 commit 51028b7
Show file tree
Hide file tree
Showing 24 changed files with 426 additions and 393 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import { Allowance } from '../../entity/Allowance';
import { Balance } from '../../entity/Balance';
import { Contract } from '../../entity/Contract';

const log = debug('vulcanize:reset-state');
const log = debug('vulcanize:reset-watcher');

export const command = 'state';
export const command = 'watcher';

export const desc = 'Reset state to block number';
export const desc = 'Reset watcher to a block number';

export const builder = {
blockNumber: {
Expand Down Expand Up @@ -82,5 +82,5 @@ export const handler = async (argv: any): Promise<void> => {
await dbTx.release();
}

log('Reset state successfully');
log('Reset watcher successfully');
};
46 changes: 23 additions & 23 deletions packages/erc20-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
import { IpldStatus } from './entity/IpldStatus';
import { State } from './entity/State';
import { StateSyncStatus } from './entity/StateSyncStatus';

export class Database implements DatabaseInterface {
_config: ConnectionOptions
Expand All @@ -44,45 +44,45 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.close();
}

getNewIPLDBlock (): IPLDBlock {
return new IPLDBlock();
getNewState (): State {
return new State();
}

async getIPLDBlocks (where: FindConditions<IPLDBlock>): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
async getStates (where: FindConditions<State>): Promise<State[]> {
const repo = this._conn.getRepository(State);

return this._baseDatabase.getIPLDBlocks(repo, where);
return this._baseDatabase.getStates(repo, where);
}

async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
const repo = this._conn.getRepository(IPLDBlock);
async getLatestState (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<State | undefined> {
const repo = this._conn.getRepository(State);

return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber);
return this._baseDatabase.getLatestState(repo, contractAddress, kind, blockNumber);
}

// Fetch all diff IPLDBlocks after the specified block number.
async getDiffIPLDBlocksInRange (contractAddress: string, startblock: number, endBlock: number): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
// Fetch all diff States after the specified block number.
async getDiffStatesInRange (contractAddress: string, startblock: number, endBlock: number): Promise<State[]> {
const repo = this._conn.getRepository(State);

return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startblock, endBlock);
return this._baseDatabase.getDiffStatesInRange(repo, contractAddress, startblock, endBlock);
}

async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {
const repo = dbTx.manager.getRepository(IPLDBlock);
async saveOrUpdateState (dbTx: QueryRunner, state: State): Promise<State> {
const repo = dbTx.manager.getRepository(State);

return this._baseDatabase.saveOrUpdateIPLDBlock(repo, ipldBlock);
return this._baseDatabase.saveOrUpdateState(repo, state);
}

async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
const repo = dbTx.manager.getRepository(IPLDBlock);
async removeStates (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
const repo = dbTx.manager.getRepository(State);

await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
await this._baseDatabase.removeStates(repo, blockNumber, kind);
}

async getIPLDStatus (): Promise<IpldStatus | undefined> {
const repo = this._conn.getRepository(IpldStatus);
async getStateSyncStatus (): Promise<StateSyncStatus | undefined> {
const repo = this._conn.getRepository(StateSyncStatus);

return this._baseDatabase.getIPLDStatus(repo);
return this._baseDatabase.getStateSyncStatus(repo);
}

async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise<Balance | undefined> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { BlockProgress } from './BlockProgress';
@Index(['cid'], { unique: true })
@Index(['block', 'contractAddress'])
@Index(['block', 'contractAddress', 'kind'], { unique: true })
export class IPLDBlock {
export class State {
@PrimaryGeneratedColumn()
id!: number;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';

@Entity()
export class IpldStatus {
export class StateSyncStatus {
@PrimaryGeneratedColumn()
id!: number;

@Column('integer')
latestHooksBlockNumber!: number;
latestIndexedBlockNumber!: number;

@Column('integer', { nullable: true })
latestCheckpointBlockNumber!: number;

@Column('integer', { nullable: true })
latestIPFSBlockNumber!: number;
}
58 changes: 49 additions & 9 deletions packages/erc20-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@ import { ethers } from 'ethers';

import { JobQueue, IndexerInterface } from '@vulcanize/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { Indexer as BaseIndexer, IPFSClient, ServerConfig, IpldStatus as IpldStatusInterface, ValueResult, Where, QueryOptions, UNKNOWN_EVENT_NAME } from '@cerc-io/util';
import {
Indexer as BaseIndexer,
ServerConfig,
StateStatus,
ValueResult,
Where,
QueryOptions,
UNKNOWN_EVENT_NAME
} from '@cerc-io/util';
import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper';

import { Database } from './database';
Expand All @@ -21,7 +29,8 @@ import { SyncStatus } from './entity/SyncStatus';
import artifacts from './artifacts/ERC20.json';
import { BlockProgress } from './entity/BlockProgress';
import { Contract } from './entity/Contract';
import { IPLDBlock } from './entity/IPLDBlock';
import { State } from './entity/State';
import { StateSyncStatus } from './entity/StateSyncStatus';

const log = debug('vulcanize:indexer');

Expand Down Expand Up @@ -66,8 +75,7 @@ export class Indexer implements IndexerInterface {
this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._serverMode = this._serverConfig.mode;
const ipfsClient = new IPFSClient(serverConfig.ipfsApiAddr);
this._baseIndexer = new BaseIndexer(serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, ipfsClient);
this._baseIndexer = new BaseIndexer(serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue);

const { abi, storageLayout } = artifacts;

Expand Down Expand Up @@ -111,8 +119,8 @@ export class Indexer implements IndexerInterface {
);
}

getIPLDData (ipldBlock: IPLDBlock): any {
return this._baseIndexer.getIPLDData(ipldBlock);
getStateData (state: State): any {
return this._baseIndexer.getStateData(state);
}

async totalSupply (blockHash: string, token: string): Promise<ValueResult> {
Expand Down Expand Up @@ -327,6 +335,14 @@ export class Indexer implements IndexerInterface {
// Method for processing on indexing new block.
}

async processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void> {
// TODO Implement
}

async processCheckpoint (blockHash: string): Promise<void> {
// TODO Implement
}

parseEventNameAndArgs (kind: string, logObj: any): any {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
Expand Down Expand Up @@ -362,6 +378,30 @@ export class Indexer implements IndexerInterface {
return { eventName, eventInfo };
}

async getStateSyncStatus (): Promise<StateSyncStatus | undefined> {
return this._db.getStateSyncStatus();
}

async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
// TODO Implement
return {} as StateSyncStatus;
}

async updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
// TODO Implement
return {} as StateSyncStatus;
}

async getLatestCanonicalBlock (): Promise<BlockProgress> {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);

const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
assert(latestCanonicalBlock);

return latestCanonicalBlock;
}

async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
Expand All @@ -374,8 +414,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.watchContract(address, CONTRACT_KIND, checkpoint, startingBlock);
}

async updateIPLDStatusMap (address: string, ipldStatus: IpldStatusInterface): Promise<void> {
await this._baseIndexer.updateIPLDStatusMap(address, ipldStatus);
updateStateStatusMap (address: string, stateStatus: StateStatus): void {
this._baseIndexer.updateStateStatusMap(address, stateStatus);
}

async saveEventEntity (dbEvent: Event): Promise<Event> {
Expand Down Expand Up @@ -438,7 +478,7 @@ export class Indexer implements IndexerInterface {
}

async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
// Method not used in uni-info-watcher but required for indexer interface.
// Method not used in erc20-watcher but required for indexer interface.
return new BlockProgress();
}

Expand Down
3 changes: 0 additions & 3 deletions packages/uni-info-watcher/environments/local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
# Checkpoint interval in number of blocks.
checkpointInterval = 2000

# IPFS API address (can be taken from the output on running the IPFS daemon).
# ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001"

# Max block range for which to return events in eventsInRange GQL query.
# Use -1 for skipping check on block range.
maxEventsBlockRange = 1000
Expand Down
21 changes: 3 additions & 18 deletions packages/uni-info-watcher/src/cli/fill-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const fillState = async (
log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`);

// Check that there are no existing diffs in this range
const existingStates = await indexer.getIPLDBlocks({ block: { blockNumber: Between(startBlock, endBlock) } });
const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } });
if (existingStates.length > 0) {
log('found existing state(s) in the given range');
process.exit(1);
Expand Down Expand Up @@ -97,26 +97,11 @@ export const fillState = async (

// Persist subgraph state to the DB
await indexer.dumpSubgraphState(blockHash, true);
await indexer.updateIPLDStatusHooksBlock(blockNumber);
await indexer.updateStateSyncStatusIndexedBlock(blockNumber);

// Create checkpoints
await indexer.processCheckpoint(blockHash);
await indexer.updateIPLDStatusCheckpointBlock(blockNumber);

// TODO: Push state to IPFS in separate process.
if (indexer.isIPFSConfigured()) {
// Get IPLDBlocks for the given blocHash.
const ipldBlocks = await indexer.getIPLDBlocksByHash(blockHash);

// Push all the IPLDBlocks to IPFS.
for (const ipldBlock of ipldBlocks) {
const data = indexer.getIPLDData(ipldBlock);
await indexer.pushToIPFS(data);
}

// Update the IPLD status.
await indexer.updateIPLDStatusIPFSBlock(blockNumber);
}
await indexer.updateStateSyncStatusCheckpointBlock(blockNumber);

console.timeEnd(`time:fill-state-${blockNumber}`);
}
Expand Down
8 changes: 4 additions & 4 deletions packages/uni-info-watcher/src/cli/inspect-cid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ const main = async (): Promise<void> => {
const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue);
await indexer.init();

const ipldBlock = await indexer.getIPLDBlockByCid(argv.cid);
assert(ipldBlock, 'IPLDBlock for the provided CID doesn\'t exist.');
const state = await indexer.getStateByCID(argv.cid);
assert(state, 'State for the provided CID doesn\'t exist.');

const ipldData = await indexer.getIPLDData(ipldBlock);
const stateData = await indexer.getStateData(state);

log(util.inspect(ipldData, false, null));
log(util.inspect(stateData, false, null));
};

main().catch(err => {
Expand Down
66 changes: 0 additions & 66 deletions packages/uni-info-watcher/src/cli/reset-cmds/ipld-state.ts

This file was deleted.

Loading

0 comments on commit 51028b7

Please sign in to comment.