Skip to content

Commit

Permalink
Refactor event-watcher and move code to util (#242)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikugogoi authored Nov 18, 2022
1 parent f3c65cb commit 662c79a
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 330 deletions.
55 changes: 2 additions & 53 deletions packages/codegen/src/templates/events-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//

import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';

import { EthClient } from '@cerc-io/ipld-eth-client';
Expand All @@ -13,16 +12,10 @@ import {
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME,
UpstreamConfig
} from '@cerc-io/util';

import { Indexer } from './indexer';
import { Event } from './entity/Event';

const EVENT = 'event';

const log = debug('vulcanize:events');

export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
Expand All @@ -44,7 +37,7 @@ export class EventWatcher implements EventWatcherInterface {
}

getEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([EVENT]);
return this._baseEventWatcher.getEventIterator();
}

getBlockProgressEventIterator (): AsyncIterator<any> {
Expand All @@ -65,57 +58,13 @@ export class EventWatcher implements EventWatcherInterface {

async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
return;
}

await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}

async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const { id, data: { request, failed, state, createdOn } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
return;
}

const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;

// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);

if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}

async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
const resultEvent = this._indexer.getResultEvent(dbEvent);

log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);

// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish(EVENT, {
onEvent: resultEvent
});
}
}
}
55 changes: 2 additions & 53 deletions packages/eden-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//

import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';

import { EthClient } from '@cerc-io/ipld-eth-client';
Expand All @@ -13,16 +12,10 @@ import {
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME,
UpstreamConfig
} from '@cerc-io/util';

import { Indexer } from './indexer';
import { Event } from './entity/Event';

const EVENT = 'event';

const log = debug('vulcanize:events');

export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
Expand All @@ -44,7 +37,7 @@ export class EventWatcher implements EventWatcherInterface {
}

getEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([EVENT]);
return this._baseEventWatcher.getEventIterator();
}

getBlockProgressEventIterator (): AsyncIterator<any> {
Expand All @@ -65,57 +58,13 @@ export class EventWatcher implements EventWatcherInterface {

async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
return;
}

await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}

async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const { id, data: { request, failed, state, createdOn } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
return;
}

const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;

// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);

if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}

async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
const resultEvent = this._indexer.getResultEvent(dbEvent);

log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);

// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish(EVENT, {
onEvent: resultEvent
});
}
}
}
63 changes: 4 additions & 59 deletions packages/erc20-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,21 @@
//

import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';

import { EthClient } from '@cerc-io/ipld-eth-client';
import {
JobQueue,
EventWatcher as BaseEventWatcher,
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME,
UpstreamConfig
} from '@cerc-io/util';

import { Indexer } from './indexer';
import { Event } from './entity/Event';

const EVENT = 'event';

const log = debug('vulcanize:events');

export class EventWatcher {
export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
_indexer: Indexer
_subscription: ZenObservable.Subscription | undefined
Expand All @@ -43,7 +37,7 @@ export class EventWatcher {
}

getEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([EVENT]);
return this._baseEventWatcher.getEventIterator();
}

getBlockProgressEventIterator (): AsyncIterator<any> {
Expand All @@ -64,62 +58,13 @@ export class EventWatcher {

async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
return;
}

await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}

async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const { id, data: { request, failed, state, createdOn } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
return;
}

const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;

// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);

if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}

async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
const { block: { blockHash }, contract: token } = dbEvent;
const resultEvent = this._indexer.getResultEvent(dbEvent);

log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);

// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish(EVENT, {
onTokenEvent: {
blockHash,
token,
event: resultEvent
}
});
}
}
}
55 changes: 2 additions & 53 deletions packages/erc721-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//

import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';

import { EthClient } from '@cerc-io/ipld-eth-client';
Expand All @@ -13,16 +12,10 @@ import {
EventWatcherInterface,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME,
UpstreamConfig
} from '@cerc-io/util';

import { Indexer } from './indexer';
import { Event } from './entity/Event';

const EVENT = 'event';

const log = debug('vulcanize:events');

export class EventWatcher implements EventWatcherInterface {
_ethClient: EthClient
Expand All @@ -44,7 +37,7 @@ export class EventWatcher implements EventWatcherInterface {
}

getEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([EVENT]);
return this._baseEventWatcher.getEventIterator();
}

getBlockProgressEventIterator (): AsyncIterator<any> {
Expand All @@ -65,57 +58,13 @@ export class EventWatcher implements EventWatcherInterface {

async initBlockProcessingOnCompleteHandler (): Promise<void> {
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
const { id, data: { failed } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
return;
}

await this._baseEventWatcher.blockProcessingCompleteHandler(job);
});
}

async initEventProcessingOnCompleteHandler (): Promise<void> {
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
const { id, data: { request, failed, state, createdOn } } = job;

if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
return;
}

const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;

// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);

if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
});
}

async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
const resultEvent = this._indexer.getResultEvent(dbEvent);

log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);

// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish(EVENT, {
onEvent: resultEvent
});
}
}
}
Loading

0 comments on commit 662c79a

Please sign in to comment.