Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use import-state CLI from cli package #406

Merged
merged 2 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions packages/erc20-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ import {
JobQueue
} from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
UpstreamConfig
} from '@vulcanize/util';

import { Indexer } from './indexer';

Expand All @@ -26,15 +23,15 @@ export class EventWatcher {
_pubsub: PubSub
_jobQueue: JobQueue

constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);

this._ethClient = ethClient;
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}

getEventIterator (): AsyncIterator<any> {
Expand Down
2 changes: 1 addition & 1 deletion packages/erc20-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export const main = async (): Promise<any> => {

const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);

const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
};
Expand Down
11 changes: 10 additions & 1 deletion packages/erc20-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import {
UNKNOWN_EVENT_NAME,
JobQueue,
DatabaseInterface,
Clients
Clients,
StateKind
} from '@cerc-io/util';
import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper';

Expand Down Expand Up @@ -362,6 +363,14 @@ export class Indexer implements IndexerInterface {
return undefined;
}

async saveOrUpdateState (state: State): Promise<State> {
return {} as State;
}

async removeStates (blockNumber: number, kind: StateKind): Promise<void> {
// TODO Implement
}

parseEventNameAndArgs (kind: string, logObj: any): any {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
Expand Down
2 changes: 1 addition & 1 deletion packages/erc20-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init();

const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();
Expand Down
110 changes: 6 additions & 104 deletions packages/uni-info-watcher/src/cli/import-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,13 @@
// Copyright 2022 Vulcanize, Inc.
//

import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import fs from 'fs';
import path from 'path';

import { getConfig, JobQueue, DEFAULT_CONFIG_PATH, initClients, StateKind, fillBlocks } from '@cerc-io/util';
import { updateEntitiesFromState } from '@cerc-io/graph-node';
import { Config } from '@vulcanize/util';
import { ImportStateCmd } from '@cerc-io/cli';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import * as codec from '@ipld/dag-cbor';
import { Config } from '@vulcanize/util';

import { Database } from '../database';
import { Indexer } from '../indexer';
Expand All @@ -26,43 +18,9 @@ import { State } from '../entity/State';
const log = debug('vulcanize:import-state');

export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({
'parse-numbers': false
}).options({
configFile: {
alias: 'f',
type: 'string',
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
importFile: {
alias: 'i',
type: 'string',
demandOption: true,
describe: 'Import file path (JSON)'
}
}).argv;

const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);

const db = new Database(config.database, config.server);
await db.init();

// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();

const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');

const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const importStateCmd = new ImportStateCmd();

const config: Config = await importStateCmd.initConfig();
const {
uniWatcher,
tokenWatcher
Expand All @@ -71,64 +29,8 @@ export const main = async (): Promise<any> => {
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);

const indexer = new Indexer(config.server, db, { uniClient, erc20Client, ethClient }, ethProvider, jobQueue);
await indexer.init();

const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);

// Import data.
const importFilePath = path.resolve(argv.importFile);
const encodedImportData = fs.readFileSync(importFilePath);
const importData = codec.decode(Buffer.from(encodedImportData)) as any;

// Fill the snapshot block.
await fillBlocks(
jobQueue,
indexer,
eventWatcher,
jobQueueConfig.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
);

// Fill the Contracts.
for (const contract of importData.contracts) {
await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
}

// Get the snapshot block.
const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);
assert(block);

// Fill the States.
for (const checkpoint of importData.stateCheckpoints) {
let state = new State();

state = Object.assign(state, checkpoint);
state.block = block;

state.data = Buffer.from(codec.encode(state.data));

state = await indexer.saveOrUpdateState(state);
await updateEntitiesFromState(db.graphDatabase, indexer, state);
}

// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber);
await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber);

// The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeStates(block.blockNumber, StateKind.Init);
await indexer.removeStates(block.blockNumber, StateKind.DiffStaged);

log(`Import completed for snapshot block at height ${block.blockNumber}`);
await importStateCmd.init(Database, Indexer, EventWatcher, { uniClient, erc20Client });
await importStateCmd.exec(State);
};

main().catch(err => {
Expand Down
9 changes: 4 additions & 5 deletions packages/uni-info-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';

import { EthClient } from '@cerc-io/ipld-eth-client';
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, EventWatcher as BaseEventWatcher, JobQueue, EventWatcherInterface } from '@cerc-io/util';
import { UpstreamConfig } from '@vulcanize/util';
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, EventWatcher as BaseEventWatcher, JobQueue, EventWatcherInterface, IndexerInterface } from '@cerc-io/util';

import { Indexer } from './indexer';

Expand Down Expand Up @@ -141,12 +140,12 @@ export class EventWatcher implements EventWatcherInterface {
_jobQueue: JobQueue
_baseEventWatcher: BaseEventWatcher

constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
this._indexer = indexer;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}

getBlockProgressEventIterator (): AsyncIterator<any> {
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export const main = async (): Promise<any> => {
return;
}

const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
};
Expand Down
4 changes: 4 additions & 0 deletions packages/uni-info-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}

getRelationsMap (): Map<any, { [key: string]: any }> {
return this._db.relationsMap;
}

async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
return this._saveBlockAndFetchEvents(block);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(config.server, db, { uniClient, erc20Client, ethClient }, ethProvider, jobQueue);

const pubSub = new PubSub();
const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubSub, jobQueue);
const eventWatcher = new EventWatcher(ethClient, indexer, pubSub, jobQueue);
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
Expand Down
5 changes: 2 additions & 3 deletions packages/uni-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
JobQueue,
EventWatcherInterface
} from '@cerc-io/util';
import { UpstreamConfig } from '@vulcanize/util';

import { Indexer } from './indexer';

Expand All @@ -25,12 +24,12 @@ export class EventWatcher implements EventWatcherInterface {
_jobQueue: JobQueue
_baseEventWatcher: BaseEventWatcher

constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
this._indexer = indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue);
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}

getEventIterator (): AsyncIterator<any> {
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init();

const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
};
Expand Down
11 changes: 10 additions & 1 deletion packages/uni-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import {
getResultEvent,
JobQueue,
DatabaseInterface,
Clients
Clients,
StateKind
} from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper';
Expand Down Expand Up @@ -143,6 +144,14 @@ export class Indexer implements IndexerInterface {
return undefined;
}

async saveOrUpdateState (state: State): Promise<State> {
return {} as State;
}

async removeStates (blockNumber: number, kind: StateKind): Promise<void> {
// TODO Implement
}

parseEventNameAndArgs (kind: string, logObj: any): any {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init();

const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue);
const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
Expand Down