Skip to content

Commit

Permalink
Use util indexer and database from watcher-ts
Browse files Browse the repository at this point in the history
  • Loading branch information
nikugogoi committed Oct 5, 2022
1 parent dd861d2 commit 346c1a5
Show file tree
Hide file tree
Showing 47 changed files with 989 additions and 1,701 deletions.
2 changes: 1 addition & 1 deletion packages/erc20-watcher/src/cli/reset-cmds/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export const handler = async (argv: any): Promise<void> => {

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });

const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, serverConfig.mode);
const indexer = new Indexer(serverConfig, db, ethClient, ethProvider, jobQueue);

const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');
Expand Down
14 changes: 10 additions & 4 deletions packages/erc20-watcher/src/cli/watch-contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import 'reflect-metadata';
import { Config, DEFAULT_CONFIG_PATH, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';

import { Database } from '../database';
import { Indexer } from '../indexer';
import { CONTRACT_KIND, Indexer } from '../indexer';

(async () => {
const argv = await yargs.parserConfiguration({
Expand All @@ -29,6 +29,12 @@ import { Indexer } from '../indexer';
demandOption: true,
describe: 'Address of the deployed contract'
},
checkpoint: {
type: 'boolean',
require: true,
demandOption: true,
describe: 'Turn checkpointing on'
},
startingBlock: {
type: 'number',
default: 1,
Expand All @@ -37,7 +43,7 @@ import { Indexer } from '../indexer';
}).argv;

const config: Config = await getConfig(argv.configFile);
const { database: dbConfig, server: { mode }, jobQueue: jobQueueConfig } = config;
const { database: dbConfig, jobQueue: jobQueueConfig } = config;
const { ethClient, ethProvider } = await getResetConfig(config);

assert(dbConfig);
Expand All @@ -53,9 +59,9 @@ import { Indexer } from '../indexer';
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);

await indexer.watchContract(argv.address, argv.startingBlock);
await indexer.watchContract(argv.address, CONTRACT_KIND, argv.checkpoint, argv.startingBlock);

await db.close();
await jobQueue.stop();
Expand Down
58 changes: 54 additions & 4 deletions packages/erc20-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
import path from 'path';

import { Database as BaseDatabase, QueryOptions, Where } from '@vulcanize/util';
import { Database as BaseDatabase, DatabaseInterface, StateKind, QueryOptions, Where } from '@cerc-io/util';

import { Allowance } from './entity/Allowance';
import { Balance } from './entity/Balance';
Expand All @@ -17,8 +17,10 @@ import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
import { IpldStatus } from './entity/IpldStatus';

export class Database {
export class Database implements DatabaseInterface {
_config: ConnectionOptions
_conn!: Connection
_baseDatabase: BaseDatabase;
Expand All @@ -42,6 +44,47 @@ export class Database {
return this._baseDatabase.close();
}

getNewIPLDBlock (): IPLDBlock {
return new IPLDBlock();
}

async getIPLDBlocks (where: FindConditions<IPLDBlock>): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);

return this._baseDatabase.getIPLDBlocks(repo, where);
}

async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
const repo = this._conn.getRepository(IPLDBlock);

return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber);
}

// Fetch all diff IPLDBlocks after the specified block number.
async getDiffIPLDBlocksInRange (contractAddress: string, startblock: number, endBlock: number): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);

return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startblock, endBlock);
}

async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {
const repo = dbTx.manager.getRepository(IPLDBlock);

return this._baseDatabase.saveOrUpdateIPLDBlock(repo, ipldBlock);
}

async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
const repo = dbTx.manager.getRepository(IPLDBlock);

await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
}

async getIPLDStatus (): Promise<IpldStatus | undefined> {
const repo = this._conn.getRepository(IpldStatus);

return this._baseDatabase.getIPLDStatus(repo);
}

async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise<Balance | undefined> {
return this._conn.getRepository(Balance)
.createQueryBuilder('balance')
Expand Down Expand Up @@ -154,16 +197,23 @@ export class Database {
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}

async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);

return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events);
}

async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise<void> {
const eventRepo = queryRunner.manager.getRepository(Event);

return this._baseDatabase.saveEvents(eventRepo, events);
}

async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise<Contract> {
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {
const repo = queryRunner.manager.getRepository(Contract);

return this._baseDatabase.saveContract(repo, address, startingBlock, kind);
return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock);
}

async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
Expand Down
4 changes: 4 additions & 0 deletions packages/erc20-watcher/src/entity/BlockProgress.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ export class BlockProgress implements BlockProgressInterface {
@PrimaryGeneratedColumn()
id!: number;

// TODO: Remove nullable after cid for all blocks updated.
@Column('varchar', { nullable: true })
cid!: string;

@Column('varchar', { length: 66 })
blockHash!: string;

Expand Down
3 changes: 3 additions & 0 deletions packages/erc20-watcher/src/entity/Contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ export class Contract {
@Column('varchar', { length: 8 })
kind!: string;

@Column('boolean', { default: false })
checkpoint!: boolean;

@Column('integer')
startingBlock!: number;
}
36 changes: 36 additions & 0 deletions packages/erc20-watcher/src/entity/IPLDBlock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//
// Copyright 2022 Vulcanize, Inc.
//

import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm';

import { StateKind } from '@cerc-io/util';

import { BlockProgress } from './BlockProgress';

@Entity()
@Index(['cid'], { unique: true })
@Index(['block', 'contractAddress'])
@Index(['block', 'contractAddress', 'kind'], { unique: true })
export class IPLDBlock {
@PrimaryGeneratedColumn()
id!: number;

@ManyToOne(() => BlockProgress, { onDelete: 'CASCADE' })
block!: BlockProgress;

@Column('varchar', { length: 42 })
contractAddress!: string;

@Column('varchar')
cid!: string;

@Column({
type: 'enum',
enum: StateKind
})
kind!: StateKind;

@Column('bytea')
data!: Buffer;
}
20 changes: 20 additions & 0 deletions packages/erc20-watcher/src/entity/IpldStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//
// Copyright 2022 Vulcanize, Inc.
//

import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';

@Entity()
export class IpldStatus {
@PrimaryGeneratedColumn()
id!: number;

@Column('integer')
latestHooksBlockNumber!: number;

@Column('integer', { nullable: true })
latestCheckpointBlockNumber!: number;

@Column('integer', { nullable: true })
latestIPFSBlockNumber!: number;
}
6 changes: 6 additions & 0 deletions packages/erc20-watcher/src/entity/SyncStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ export class SyncStatus implements SyncStatusInterface {

@Column('integer')
latestCanonicalBlockNumber!: number;

@Column('varchar', { length: 66 })
initialIndexedBlockHash!: string;

@Column('integer')
initialIndexedBlockNumber!: number;
}
6 changes: 3 additions & 3 deletions packages/erc20-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import debug from 'debug';
import { PubSub } from 'apollo-server-express';

import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, getCustomProvider } from '@vulcanize/util';
import { EthClient } from '@cerc-io/ipld-eth-client';

import { Database } from './database';
import { Indexer } from './indexer';
Expand Down Expand Up @@ -61,7 +61,7 @@ export const main = async (): Promise<any> => {

assert(config.server, 'Missing server config');

const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config;
const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;

assert(dbConfig, 'Missing database config');

Expand Down Expand Up @@ -89,7 +89,7 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode);
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue);

const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue);

Expand Down
Loading

0 comments on commit 346c1a5

Please sign in to comment.