Skip to content

Commit

Permalink
Use event watcher from watcher-ts util
Browse files Browse the repository at this point in the history
  • Loading branch information
prathamesh0 committed Nov 25, 2022
1 parent bd99328 commit 941fb10
Show file tree
Hide file tree
Showing 18 changed files with 15 additions and 231 deletions.
1 change: 0 additions & 1 deletion packages/erc20-watcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
"ethers": "^5.2.0",
"express": "^4.18.2",
"graphql-request": "^3.4.0",
"graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0",
"lodash": "^4.17.21",
"reflect-metadata": "^0.1.13",
Expand Down
69 changes: 0 additions & 69 deletions packages/erc20-watcher/src/events.ts

This file was deleted.

3 changes: 1 addition & 2 deletions packages/erc20-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import { FillCmd } from '@cerc-io/cli';

import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';

const log = debug('vulcanize:server');

export const main = async (): Promise<any> => {
const fillCmd = new FillCmd();
await fillCmd.init(Database);
await fillCmd.initIndexer(Indexer, EventWatcher);
await fillCmd.initIndexer(Indexer);

await fillCmd.exec();
};
Expand Down
6 changes: 2 additions & 4 deletions packages/erc20-watcher/src/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@ import assert from 'assert';
import BigInt from 'apollo-type-bigint';
import debug from 'debug';

import { gqlTotalQueryCount, gqlQueryCount, ValueResult, IndexerInterface, EventWatcherInterface } from '@cerc-io/util';
import { gqlTotalQueryCount, gqlQueryCount, ValueResult, IndexerInterface, EventWatcher } from '@cerc-io/util';

import { CONTRACT_KIND, Indexer } from './indexer';
import { EventWatcher } from './events';

const log = debug('vulcanize:resolver');

export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;

return {
BigInt: new BigInt('bigInt'),
Expand Down
3 changes: 1 addition & 2 deletions packages/erc20-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ import typeDefs from './schema';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { EventWatcher } from './events';

const log = debug('vulcanize:server');

export const main = async (): Promise<any> => {
const serverCmd = new ServerCmd();
await serverCmd.init(Database);
await serverCmd.initIndexer(Indexer, EventWatcher);
await serverCmd.initIndexer(Indexer);

return serverCmd.exec(createResolvers, typeDefs);
};
Expand Down
1 change: 0 additions & 1 deletion packages/uni-info-watcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"express": "^4.18.2",
"graphql": "^15.5.0",
"graphql-request": "^3.4.0",
"graphql-subscriptions": "^2.0.0",
"json-bigint": "^1.0.0",
"lodash": "^4.17.21",
"reflect-metadata": "^0.1.13",
Expand Down
4 changes: 2 additions & 2 deletions packages/uni-info-watcher/src/cli/import-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { Config } from '@vulcanize/util';

import { Database } from '../database';
import { Indexer } from '../indexer';
import { EventWatcher } from '../events';
import { State } from '../entity/State';

const log = debug('vulcanize:import-state');
Expand All @@ -30,7 +29,8 @@ export const main = async (): Promise<any> => {
const erc20Client = new ERC20Client(tokenWatcher);

await importStateCmd.init(Database, { uniClient, erc20Client });
await importStateCmd.initIndexer(Indexer, EventWatcher);
await importStateCmd.initIndexer(Indexer);

await importStateCmd.exec(State);
};

Expand Down
57 changes: 0 additions & 57 deletions packages/uni-info-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,6 @@
// Copyright 2021 Vulcanize, Inc.
//

import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';

import { EthClient } from '@cerc-io/ipld-eth-client';
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, EventWatcher as BaseEventWatcher, JobQueue, EventWatcherInterface, IndexerInterface } from '@cerc-io/util';

import { Indexer } from './indexer';

const log = debug('vulcanize:events');

export interface PoolCreatedEvent {
__typename: 'PoolCreatedEvent';
token0: string;
Expand Down Expand Up @@ -131,49 +120,3 @@ export interface ResultEvent {
data: string;
}
}

export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
_indexer: Indexer
_subscription?: ZenObservable.Subscription
_pubsub: PubSub
_jobQueue: JobQueue
_baseEventWatcher: BaseEventWatcher

constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
}

getBlockProgressEventIterator (): AsyncIterator<any> {
return this._baseEventWatcher.getBlockProgressEventIterator();
}

async start (): Promise<void> {
assert(!this._subscription, 'subscription already started');
log('Started watching upstream events...');

await this.initBlockProcessingOnCompleteHandler();
await this.initEventProcessingOnCompleteHandler();
this._baseEventWatcher.startBlockProcessing();
}

async stop (): Promise<void> {
this._baseEventWatcher.stop();
}

async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}

async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}
}
3 changes: 1 addition & 2 deletions packages/uni-info-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { Client as ERC20Client } from '@vulcanize/erc20-watcher';

import { Database, ENTITIES } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { FACTORY_ADDRESS } from './utils/constants';

const log = debug('vulcanize:fill');
Expand All @@ -30,7 +29,7 @@ export const main = async (): Promise<any> => {
const erc20Client = new ERC20Client(tokenWatcher);

await fillCmd.init(Database, { uniClient, erc20Client });
await fillCmd.initIndexer(Indexer, EventWatcher);
await fillCmd.initIndexer(Indexer);

await fillCmd.exec(getContractEntitiesMap());
};
Expand Down
6 changes: 2 additions & 4 deletions packages/uni-info-watcher/src/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
gqlQueryCount,
gqlTotalQueryCount,
IndexerInterface,
EventWatcherInterface
EventWatcher
} from '@cerc-io/util';

import { Indexer } from './indexer';
Expand All @@ -41,16 +41,14 @@ import { TickHourData } from './entity/TickHourData';
import { Flash } from './entity/Flash';
import { Collect } from './entity/Collect';
import { PoolHourData } from './entity/PoolHourData';
import { EventWatcher } from './events';
import { FACTORY_ADDRESS, BUNDLE_ID } from './utils/constants';

const log = debug('vulcanize:resolver');

export { BlockHeight };

export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;

const gqlCacheConfig = indexer.serverConfig.gqlCache;

Expand Down
3 changes: 1 addition & 2 deletions packages/uni-info-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import typeDefs from './schema';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { EventWatcher } from './events';

const log = debug('vulcanize:server');

Expand All @@ -31,7 +30,7 @@ export const main = async (): Promise<any> => {
const erc20Client = new ERC20Client(tokenWatcher);

await serverCmd.init(Database, { uniClient, erc20Client });
await serverCmd.initIndexer(Indexer, EventWatcher);
await serverCmd.initIndexer(Indexer);

return serverCmd.exec(createResolvers, typeDefs);
};
Expand Down
1 change: 0 additions & 1 deletion packages/uni-watcher/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
"ethers": "^5.2.0",
"express": "^4.18.2",
"graphql-request": "^3.4.0",
"graphql-subscriptions": "^2.0.0",
"lodash": "^4.17.21",
"reflect-metadata": "^0.1.13",
"typeorm": "^0.2.32",
Expand Down
67 changes: 0 additions & 67 deletions packages/uni-watcher/src/events.ts

This file was deleted.

3 changes: 1 addition & 2 deletions packages/uni-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import { FillCmd } from '@cerc-io/cli';

import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';

const log = debug('vulcanize:server');

export const main = async (): Promise<any> => {
const fillCmd = new FillCmd();
await fillCmd.init(Database);
await fillCmd.initIndexer(Indexer, EventWatcher);
await fillCmd.initIndexer(Indexer);

await fillCmd.exec();
};
Expand Down
6 changes: 2 additions & 4 deletions packages/uni-watcher/src/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@ import {
gqlQueryCount,
ValueResult,
IndexerInterface,
EventWatcherInterface
EventWatcher
} from '@cerc-io/util';

import { Indexer } from './indexer';
import { EventWatcher } from './events';

const log = debug('vulcanize:resolver');

export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher: EventWatcher): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;

return {
BigInt: new BigInt('bigInt'),
Expand Down
Loading

0 comments on commit 941fb10

Please sign in to comment.