Skip to content

Commit

Permalink
Merge pull request #400 from deep-stack/ng-refactor-event-watcher
Browse files Browse the repository at this point in the history
Refactor event-watcher and use common code from watcher-ts
  • Loading branch information
prathamesh0 authored Nov 18, 2022
2 parents 6ffc248 + 3fc4d1c commit e32de56
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 129 deletions.
60 changes: 2 additions & 58 deletions packages/erc20-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
//

import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';

import {
EventWatcher as BaseEventWatcher,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME,
JobQueue
} from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
Expand All @@ -19,11 +17,6 @@ import {
} from '@vulcanize/util';

import { Indexer } from './indexer';
import { Event } from './entity/Event';

const EVENT = 'event';

const log = debug('vulcanize:events');

export class EventWatcher {
_ethClient: EthClient
Expand All @@ -45,7 +38,7 @@ export class EventWatcher {
}

getEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([EVENT]);
return this._baseEventWatcher.getEventIterator();
}

getBlockProgressEventIterator (): AsyncIterator<any> {
Expand All @@ -66,62 +59,13 @@ export class EventWatcher {

async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
return;
}

await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}

async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const { id, data: { request, failed, state, createdOn } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
return;
}

const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;

// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);

if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}

async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
const { block: { blockHash }, contract: token } = dbEvent;
const resultEvent = this._indexer.getResultEvent(dbEvent);

log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);

// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish(EVENT, {
onTokenEvent: {
blockHash,
token,
event: resultEvent
}
});
}
}
}
16 changes: 1 addition & 15 deletions packages/uni-info-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,27 +167,13 @@ export class EventWatcher implements EventWatcherInterface {
}

async initBlockProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
return;
}

this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}

async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const { id, data: { failed } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
return;
}

await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}
Expand Down
8 changes: 5 additions & 3 deletions packages/uni-info-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1856,6 +1856,11 @@ export class Indexer implements IndexerInterface {
try {
const transaction = await loadTransaction(this._db, dbTx, { block, tx }, this._serverConfig.skipStateFieldsUpdate);
position.transaction = transaction.id;
position.feeGrowthInside0LastX128 = BigInt(positionResult.feeGrowthInside0LastX128.toString());
position.feeGrowthInside1LastX128 = BigInt(positionResult.feeGrowthInside1LastX128.toString());

// Save position to DB to load entity fields with default values.
position = await this._db.savePosition(dbTx, position, block);

await dbTx.commitTransaction();
} catch (error) {
Expand All @@ -1864,9 +1869,6 @@ export class Indexer implements IndexerInterface {
} finally {
await dbTx.release();
}

position.feeGrowthInside0LastX128 = BigInt(positionResult.feeGrowthInside0LastX128.toString());
position.feeGrowthInside1LastX128 = BigInt(positionResult.feeGrowthInside1LastX128.toString());
} catch (error: any) {
// The contract call reverts in situations where the position is minted and deleted in the same block.
// From my investigation this happens in calls from BancorSwap.
Expand Down
55 changes: 2 additions & 53 deletions packages/uni-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,19 @@
//

import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';

import { EthClient } from '@cerc-io/ipld-eth-client';
import {
EventWatcher as BaseEventWatcher,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME,
JobQueue,
EventWatcherInterface
} from '@cerc-io/util';
import { UpstreamConfig } from '@vulcanize/util';

import { Indexer } from './indexer';
import { Event } from './entity/Event';

const log = debug('vulcanize:events');

export const UniswapEvent = 'uniswap-event';

export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
Expand All @@ -41,7 +34,7 @@ export class EventWatcher implements EventWatcherInterface {
}

getEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([UniswapEvent]);
return this._baseEventWatcher.getEventIterator();
}

getBlockProgressEventIterator (): AsyncIterator<any> {
Expand All @@ -62,57 +55,13 @@ export class EventWatcher implements EventWatcherInterface {

async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
return;
}

await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}

async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const { id, data: { request, failed, state, createdOn } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
return;
}

const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;

// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);

if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishUniswapEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}

async publishUniswapEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
const resultEvent = this._indexer.getResultEvent(dbEvent);

log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);

// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish(UniswapEvent, {
onEvent: resultEvent
});
}
}
}

0 comments on commit e32de56

Please sign in to comment.