Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor state creation code and satisfy updated interfaces #375

Merged
merged 3 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import { Allowance } from '../../entity/Allowance';
import { Balance } from '../../entity/Balance';
import { Contract } from '../../entity/Contract';

const log = debug('vulcanize:reset-state');
const log = debug('vulcanize:reset-watcher');

export const command = 'state';
export const command = 'watcher';

export const desc = 'Reset state to block number';
export const desc = 'Reset watcher to a block number';

export const builder = {
blockNumber: {
Expand Down Expand Up @@ -82,5 +82,5 @@ export const handler = async (argv: any): Promise<void> => {
await dbTx.release();
}

log('Reset state successfully');
log('Reset watcher successfully');
};
46 changes: 23 additions & 23 deletions packages/erc20-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
import { IpldStatus } from './entity/IpldStatus';
import { State } from './entity/State';
import { StateSyncStatus } from './entity/StateSyncStatus';

export class Database implements DatabaseInterface {
_config: ConnectionOptions
Expand All @@ -44,45 +44,45 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.close();
}

getNewIPLDBlock (): IPLDBlock {
return new IPLDBlock();
getNewState (): State {
return new State();
}

async getIPLDBlocks (where: FindConditions<IPLDBlock>): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
async getStates (where: FindConditions<State>): Promise<State[]> {
const repo = this._conn.getRepository(State);

return this._baseDatabase.getIPLDBlocks(repo, where);
return this._baseDatabase.getStates(repo, where);
}

async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
const repo = this._conn.getRepository(IPLDBlock);
async getLatestState (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<State | undefined> {
const repo = this._conn.getRepository(State);

return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber);
return this._baseDatabase.getLatestState(repo, contractAddress, kind, blockNumber);
}

// Fetch all diff IPLDBlocks after the specified block number.
async getDiffIPLDBlocksInRange (contractAddress: string, startblock: number, endBlock: number): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
// Fetch all diff States after the specified block number.
async getDiffStatesInRange (contractAddress: string, startblock: number, endBlock: number): Promise<State[]> {
const repo = this._conn.getRepository(State);

return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startblock, endBlock);
return this._baseDatabase.getDiffStatesInRange(repo, contractAddress, startblock, endBlock);
}

async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {
const repo = dbTx.manager.getRepository(IPLDBlock);
async saveOrUpdateState (dbTx: QueryRunner, state: State): Promise<State> {
const repo = dbTx.manager.getRepository(State);

return this._baseDatabase.saveOrUpdateIPLDBlock(repo, ipldBlock);
return this._baseDatabase.saveOrUpdateState(repo, state);
}

async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
const repo = dbTx.manager.getRepository(IPLDBlock);
async removeStates (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
const repo = dbTx.manager.getRepository(State);

await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
await this._baseDatabase.removeStates(repo, blockNumber, kind);
}

async getIPLDStatus (): Promise<IpldStatus | undefined> {
const repo = this._conn.getRepository(IpldStatus);
async getStateSyncStatus (): Promise<StateSyncStatus | undefined> {
const repo = this._conn.getRepository(StateSyncStatus);

return this._baseDatabase.getIPLDStatus(repo);
return this._baseDatabase.getStateSyncStatus(repo);
}

async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise<Balance | undefined> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { BlockProgress } from './BlockProgress';
@Index(['cid'], { unique: true })
@Index(['block', 'contractAddress'])
@Index(['block', 'contractAddress', 'kind'], { unique: true })
export class IPLDBlock {
export class State {
@PrimaryGeneratedColumn()
id!: number;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';

@Entity()
export class IpldStatus {
export class StateSyncStatus {
@PrimaryGeneratedColumn()
id!: number;

@Column('integer')
latestHooksBlockNumber!: number;
latestIndexedBlockNumber!: number;

@Column('integer', { nullable: true })
latestCheckpointBlockNumber!: number;

@Column('integer', { nullable: true })
latestIPFSBlockNumber!: number;
}
58 changes: 49 additions & 9 deletions packages/erc20-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@ import { ethers } from 'ethers';

import { JobQueue, IndexerInterface } from '@vulcanize/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { Indexer as BaseIndexer, IPFSClient, ServerConfig, IpldStatus as IpldStatusInterface, ValueResult, Where, QueryOptions, UNKNOWN_EVENT_NAME } from '@cerc-io/util';
import {
Indexer as BaseIndexer,
ServerConfig,
StateStatus,
ValueResult,
Where,
QueryOptions,
UNKNOWN_EVENT_NAME
} from '@cerc-io/util';
import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper';

import { Database } from './database';
Expand All @@ -21,7 +29,8 @@ import { SyncStatus } from './entity/SyncStatus';
import artifacts from './artifacts/ERC20.json';
import { BlockProgress } from './entity/BlockProgress';
import { Contract } from './entity/Contract';
import { IPLDBlock } from './entity/IPLDBlock';
import { State } from './entity/State';
import { StateSyncStatus } from './entity/StateSyncStatus';

const log = debug('vulcanize:indexer');

Expand Down Expand Up @@ -66,8 +75,7 @@ export class Indexer implements IndexerInterface {
this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._serverMode = this._serverConfig.mode;
const ipfsClient = new IPFSClient(serverConfig.ipfsApiAddr);
this._baseIndexer = new BaseIndexer(serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, ipfsClient);
this._baseIndexer = new BaseIndexer(serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue);

const { abi, storageLayout } = artifacts;

Expand Down Expand Up @@ -111,8 +119,8 @@ export class Indexer implements IndexerInterface {
);
}

getIPLDData (ipldBlock: IPLDBlock): any {
return this._baseIndexer.getIPLDData(ipldBlock);
getStateData (state: State): any {
return this._baseIndexer.getStateData(state);
}

async totalSupply (blockHash: string, token: string): Promise<ValueResult> {
Expand Down Expand Up @@ -327,6 +335,14 @@ export class Indexer implements IndexerInterface {
// Method for processing on indexing new block.
}

async processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void> {
// TODO Implement
}

async processCheckpoint (blockHash: string): Promise<void> {
// TODO Implement
}

parseEventNameAndArgs (kind: string, logObj: any): any {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
Expand Down Expand Up @@ -362,6 +378,30 @@ export class Indexer implements IndexerInterface {
return { eventName, eventInfo };
}

async getStateSyncStatus (): Promise<StateSyncStatus | undefined> {
return this._db.getStateSyncStatus();
}

async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
// TODO Implement
return {} as StateSyncStatus;
}

async updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
// TODO Implement
return {} as StateSyncStatus;
}

async getLatestCanonicalBlock (): Promise<BlockProgress> {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);

const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
assert(latestCanonicalBlock);

return latestCanonicalBlock;
}

async getEventsByFilter (blockHash: string, contract: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
Expand All @@ -374,8 +414,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.watchContract(address, CONTRACT_KIND, checkpoint, startingBlock);
}

async updateIPLDStatusMap (address: string, ipldStatus: IpldStatusInterface): Promise<void> {
await this._baseIndexer.updateIPLDStatusMap(address, ipldStatus);
updateStateStatusMap (address: string, stateStatus: StateStatus): void {
this._baseIndexer.updateStateStatusMap(address, stateStatus);
}

async saveEventEntity (dbEvent: Event): Promise<Event> {
Expand Down Expand Up @@ -438,7 +478,7 @@ export class Indexer implements IndexerInterface {
}

async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
// Method not used in uni-info-watcher but required for indexer interface.
// Method not used in erc20-watcher but required for indexer interface.
return new BlockProgress();
}

Expand Down
3 changes: 0 additions & 3 deletions packages/uni-info-watcher/environments/local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
# Checkpoint interval in number of blocks.
checkpointInterval = 2000

# IPFS API address (can be taken from the output on running the IPFS daemon).
# ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001"

# Max block range for which to return events in eventsInRange GQL query.
# Use -1 for skipping check on block range.
maxEventsBlockRange = 1000
Expand Down
21 changes: 3 additions & 18 deletions packages/uni-info-watcher/src/cli/fill-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const fillState = async (
log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`);

// Check that there are no existing diffs in this range
const existingStates = await indexer.getIPLDBlocks({ block: { blockNumber: Between(startBlock, endBlock) } });
const existingStates = await indexer.getStates({ block: { blockNumber: Between(startBlock, endBlock) } });
if (existingStates.length > 0) {
log('found existing state(s) in the given range');
process.exit(1);
Expand Down Expand Up @@ -97,26 +97,11 @@ export const fillState = async (

// Persist subgraph state to the DB
await indexer.dumpSubgraphState(blockHash, true);
await indexer.updateIPLDStatusHooksBlock(blockNumber);
await indexer.updateStateSyncStatusIndexedBlock(blockNumber);

// Create checkpoints
await indexer.processCheckpoint(blockHash);
await indexer.updateIPLDStatusCheckpointBlock(blockNumber);

// TODO: Push state to IPFS in separate process.
if (indexer.isIPFSConfigured()) {
// Get IPLDBlocks for the given blocHash.
const ipldBlocks = await indexer.getIPLDBlocksByHash(blockHash);

// Push all the IPLDBlocks to IPFS.
for (const ipldBlock of ipldBlocks) {
const data = indexer.getIPLDData(ipldBlock);
await indexer.pushToIPFS(data);
}

// Update the IPLD status.
await indexer.updateIPLDStatusIPFSBlock(blockNumber);
}
await indexer.updateStateSyncStatusCheckpointBlock(blockNumber);

console.timeEnd(`time:fill-state-${blockNumber}`);
}
Expand Down
8 changes: 4 additions & 4 deletions packages/uni-info-watcher/src/cli/inspect-cid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ const main = async (): Promise<void> => {
const indexer = new Indexer(config.server, db, uniClient, erc20Client, ethClient, ethProvider, jobQueue);
await indexer.init();

const ipldBlock = await indexer.getIPLDBlockByCid(argv.cid);
assert(ipldBlock, 'IPLDBlock for the provided CID doesn\'t exist.');
const state = await indexer.getStateByCID(argv.cid);
assert(state, 'State for the provided CID doesn\'t exist.');

const ipldData = await indexer.getIPLDData(ipldBlock);
const stateData = await indexer.getStateData(state);

log(util.inspect(ipldData, false, null));
log(util.inspect(stateData, false, null));
};

main().catch(err => {
Expand Down
66 changes: 0 additions & 66 deletions packages/uni-info-watcher/src/cli/reset-cmds/ipld-state.ts

This file was deleted.

Loading