Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use event watcher from watcher-ts util #415

Merged
merged 2 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions packages/erc20-watcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@
"debug": "^4.3.1",
"ethers": "^5.2.0",
"express": "^4.18.2",
"graphql-import-node": "^0.0.4",
"graphql-request": "^3.4.0",
"graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0",
"lodash": "^4.17.21",
"reflect-metadata": "^0.1.13",
Expand Down
69 changes: 0 additions & 69 deletions packages/erc20-watcher/src/events.ts

This file was deleted.

3 changes: 1 addition & 2 deletions packages/erc20-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import { FillCmd } from '@cerc-io/cli';

import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';

const log = debug('vulcanize:server');

export const main = async (): Promise<any> => {
const fillCmd = new FillCmd();
await fillCmd.init(Database);
await fillCmd.initIndexer(Indexer, EventWatcher);
await fillCmd.initIndexer(Indexer);

await fillCmd.exec();
};
Expand Down
6 changes: 2 additions & 4 deletions packages/erc20-watcher/src/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ import assert from 'assert';
import BigInt from 'apollo-type-bigint';
import debug from 'debug';

import { gqlTotalQueryCount, gqlQueryCount, ValueResult, IndexerInterface, EventWatcherInterface } from '@cerc-io/util';
import { gqlTotalQueryCount, gqlQueryCount, ValueResult, IndexerInterface, EventWatcher } from '@cerc-io/util';

import { CONTRACT_KIND, Indexer } from './indexer';
import { EventWatcher } from './events';

const log = debug('vulcanize:resolver');

export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;

return {
BigInt: new BigInt('bigInt'),
Expand Down
4 changes: 1 addition & 3 deletions packages/erc20-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,20 @@
//

import debug from 'debug';
import 'graphql-import-node';

import { ServerCmd } from '@cerc-io/cli';

import typeDefs from './schema';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { EventWatcher } from './events';

const log = debug('vulcanize:server');

export const main = async (): Promise<any> => {
const serverCmd = new ServerCmd();
await serverCmd.init(Database);
await serverCmd.initIndexer(Indexer, EventWatcher);
await serverCmd.initIndexer(Indexer);

return serverCmd.exec(createResolvers, typeDefs);
};
Expand Down
2 changes: 0 additions & 2 deletions packages/uni-info-watcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
"debug": "^4.3.1",
"express": "^4.18.2",
"graphql": "^15.5.0",
"graphql-import-node": "^0.0.4",
"graphql-request": "^3.4.0",
"graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0",
"lodash": "^4.17.21",
"reflect-metadata": "^0.1.13",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ export const handler = async (argv: any): Promise<void> => {
await createCheckpointCmd.init(argv, Database, { uniClient, erc20Client });

await createCheckpointCmd.initIndexer(Indexer);

await createCheckpointCmd.exec();
};
1 change: 1 addition & 0 deletions packages/uni-info-watcher/src/cli/export-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const main = async (): Promise<void> => {

await exportStateCmd.init(Database, { uniClient, erc20Client });
await exportStateCmd.initIndexer(Indexer);

await exportStateCmd.exec();
};

Expand Down
4 changes: 2 additions & 2 deletions packages/uni-info-watcher/src/cli/import-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { Config } from '@vulcanize/util';

import { Database } from '../database';
import { Indexer } from '../indexer';
import { EventWatcher } from '../events';
import { State } from '../entity/State';

const log = debug('vulcanize:import-state');
Expand All @@ -30,7 +29,8 @@ export const main = async (): Promise<any> => {
const erc20Client = new ERC20Client(tokenWatcher);

await importStateCmd.init(Database, { uniClient, erc20Client });
await importStateCmd.initIndexer(Indexer, EventWatcher);
await importStateCmd.initIndexer(Indexer);

await importStateCmd.exec(State);
};

Expand Down
1 change: 1 addition & 0 deletions packages/uni-info-watcher/src/cli/inspect-cid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const main = async (): Promise<void> => {

await inspectCIDCmd.init(Database, { uniClient, erc20Client });
await inspectCIDCmd.initIndexer(Indexer);

await inspectCIDCmd.exec();
};

Expand Down
1 change: 1 addition & 0 deletions packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ export const handler = async (argv: any): Promise<void> => {

await resetWatcherCmd.init(argv, Database, { uniClient, erc20Client });
await resetWatcherCmd.initIndexer(Indexer);

await resetWatcherCmd.exec();
};
1 change: 1 addition & 0 deletions packages/uni-info-watcher/src/cli/watch-contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const main = async (): Promise<void> => {

await watchContractCmd.init(Database, { uniClient, erc20Client });
await watchContractCmd.initIndexer(Indexer);

await watchContractCmd.exec();
};

Expand Down
57 changes: 0 additions & 57 deletions packages/uni-info-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,6 @@
// Copyright 2021 Vulcanize, Inc.
//

import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';

import { EthClient } from '@cerc-io/ipld-eth-client';
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, EventWatcher as BaseEventWatcher, JobQueue, EventWatcherInterface, IndexerInterface } from '@cerc-io/util';

import { Indexer } from './indexer';

const log = debug('vulcanize:events');

export interface PoolCreatedEvent {
__typename: 'PoolCreatedEvent';
token0: string;
Expand Down Expand Up @@ -131,49 +120,3 @@ export interface ResultEvent {
data: string;
}
}

export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
_indexer: Indexer
_subscription?: ZenObservable.Subscription
_pubsub: PubSub
_jobQueue: JobQueue
_baseEventWatcher: BaseEventWatcher

constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}

getBlockProgressEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getBlockProgressEventIterator();
}

async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
log('Started watching upstream events...');

await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
}

async stop (): Promise<void> {
this._baseEventWatcher.stop();
}

async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}

async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}
}
3 changes: 1 addition & 2 deletions packages/uni-info-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { Client as ERC20Client } from '@vulcanize/erc20-watcher';

import { Database, ENTITIES } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { FACTORY_ADDRESS } from './utils/constants';

const log = debug('vulcanize:fill');
Expand All @@ -30,7 +29,7 @@ export const main = async (): Promise<any> => {
const erc20Client = new ERC20Client(tokenWatcher);

await fillCmd.init(Database, { uniClient, erc20Client });
await fillCmd.initIndexer(Indexer, EventWatcher);
await fillCmd.initIndexer(Indexer);

await fillCmd.exec(getContractEntitiesMap());
};
Expand Down
6 changes: 2 additions & 4 deletions packages/uni-info-watcher/src/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
gqlQueryCount,
gqlTotalQueryCount,
IndexerInterface,
EventWatcherInterface
EventWatcher
} from '@cerc-io/util';

import { Indexer } from './indexer';
Expand All @@ -41,16 +41,14 @@ import { TickHourData } from './entity/TickHourData';
import { Flash } from './entity/Flash';
import { Collect } from './entity/Collect';
import { PoolHourData } from './entity/PoolHourData';
import { EventWatcher } from './events';
import { FACTORY_ADDRESS, BUNDLE_ID } from './utils/constants';

const log = debug('vulcanize:resolver');

export { BlockHeight };

export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;

const gqlCacheConfig = indexer.serverConfig.gqlCache;

Expand Down
3 changes: 1 addition & 2 deletions packages/uni-info-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import typeDefs from './schema';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { EventWatcher } from './events';

const log = debug('vulcanize:server');

Expand All @@ -31,7 +30,7 @@ export const main = async (): Promise<any> => {
const erc20Client = new ERC20Client(tokenWatcher);

await serverCmd.init(Database, { uniClient, erc20Client });
await serverCmd.initIndexer(Indexer, EventWatcher);
await serverCmd.initIndexer(Indexer);

return serverCmd.exec(createResolvers, typeDefs);
};
Expand Down
2 changes: 0 additions & 2 deletions packages/uni-watcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@
"debug": "^4.3.1",
"ethers": "^5.2.0",
"express": "^4.18.2",
"graphql-import-node": "^0.0.4",
"graphql-request": "^3.4.0",
"graphql-subscriptions": "^2.0.0",
"lodash": "^4.17.21",
"reflect-metadata": "^0.1.13",
"typeorm": "^0.2.32",
Expand Down
Loading