Skip to content

Commit

Permalink
Prefetch blocks with events (#372)
Browse files Browse the repository at this point in the history
* Prefetch blocks with events in a separate job

* Use constants from watcher-ts

* Fetch blocks in the job to index blocks

* Update watcher config

* Handle all blocks at a height in index job complete handler

* Use renamed imported methods
  • Loading branch information
prathamesh0 authored Oct 13, 2022
1 parent 175c77b commit 4226de5
Show file tree
Hide file tree
Showing 32 changed files with 160 additions and 219 deletions.
2 changes: 1 addition & 1 deletion packages/erc20-watcher/environments/local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000

[upstream.cache]
name = "requests"
Expand All @@ -37,3 +36,4 @@
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100
eventsInBatch = 50
blockDelayInMilliSecs = 2000
3 changes: 2 additions & 1 deletion packages/erc20-watcher/src/cli/watch-contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';

import { Config, DEFAULT_CONFIG_PATH, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH } from '@cerc-io/util';
import { Config, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';

import { Database } from '../database';
import { CONTRACT_KIND, Indexer } from '../indexer';
Expand Down
8 changes: 5 additions & 3 deletions packages/erc20-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import assert from 'assert';
import debug from 'debug';
import { PubSub } from 'apollo-server-express';

import {
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME
} from '@cerc-io/util';
import { EthClient } from '@vulcanize/ipld-eth-client';
import {
JobQueue,
EventWatcher as BaseEventWatcher,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
UNKNOWN_EVENT_NAME,
UpstreamConfig
} from '@vulcanize/util';

Expand Down
5 changes: 3 additions & 2 deletions packages/erc20-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'apollo-server-express';

import { getCache } from '@vulcanize/cache';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, getCustomProvider } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH } from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { getCache } from '@vulcanize/cache';
import { getConfig, fillBlocks, JobQueue, getCustomProvider } from '@vulcanize/util';

import { Database } from './database';
import { Indexer } from './indexer';
Expand Down
4 changes: 2 additions & 2 deletions packages/erc20-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';

import { UNKNOWN_EVENT_NAME, JobQueue, IndexerInterface } from '@vulcanize/util';
import { JobQueue, IndexerInterface } from '@vulcanize/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { Indexer as BaseIndexer, IPFSClient, ServerConfig, IpldStatus as IpldStatusInterface, ValueResult, Where, QueryOptions } from '@cerc-io/util';
import { Indexer as BaseIndexer, IPFSClient, ServerConfig, IpldStatus as IpldStatusInterface, ValueResult, Where, QueryOptions, UNKNOWN_EVENT_NAME } from '@cerc-io/util';
import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper';

import { Database } from './database';
Expand Down
11 changes: 7 additions & 4 deletions packages/erc20-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import { hideBin } from 'yargs/helpers';
import debug from 'debug';

import { EthClient } from '@cerc-io/ipld-eth-client';
import { JobQueueConfig, startMetricsServer } from '@cerc-io/util';
import {
JobQueueConfig,
startMetricsServer,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
DEFAULT_CONFIG_PATH
} from '@cerc-io/util';
import { getCache } from '@vulcanize/cache';
import {
getConfig,
JobQueue,
JobRunner as BaseJobRunner,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
DEFAULT_CONFIG_PATH,
getCustomProvider
} from '@vulcanize/util';

Expand Down
6 changes: 5 additions & 1 deletion packages/erc20-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ import 'graphql-import-node';
import { createServer } from 'http';

import { getCache } from '@vulcanize/cache';
import { DEFAULT_CONFIG_PATH, getConfig, getCustomProvider, JobQueue, KIND_ACTIVE, startGQLMetricsServer } from '@vulcanize/util';
import {
KIND_ACTIVE,
DEFAULT_CONFIG_PATH
} from '@cerc-io/util';
import { getConfig, getCustomProvider, JobQueue, startGQLMetricsServer } from '@vulcanize/util';
import { EthClient } from '@cerc-io/ipld-eth-client';

import typeDefs from './schema';
Expand Down
4 changes: 3 additions & 1 deletion packages/uni-info-watcher/environments/local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000

[upstream.cache]
name = "requests"
Expand All @@ -60,3 +59,6 @@
jobDelayInMilliSecs = 1000
eventsInBatch = 50
subgraphEventsOrder = true
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10
3 changes: 2 additions & 1 deletion packages/uni-info-watcher/src/cli/inspect-cid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import util from 'util';

import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { DEFAULT_CONFIG_PATH, Config, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH } from '@cerc-io/util';
import { Config, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';

import { Database } from '../database';
import { Indexer } from '../indexer';
Expand Down
3 changes: 2 additions & 1 deletion packages/uni-info-watcher/src/cli/watch-contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';

import { Config, DEFAULT_CONFIG_PATH, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH } from '@cerc-io/util';
import { Config, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';

Expand Down
3 changes: 2 additions & 1 deletion packages/uni-info-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import debug from 'debug';
import { PubSub } from 'apollo-server-express';

import { EthClient } from '@vulcanize/ipld-eth-client';
import { EventWatcher as BaseEventWatcher, EventWatcherInterface, JobQueue, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, UpstreamConfig } from '@vulcanize/util';
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from '@cerc-io/util';
import { EventWatcher as BaseEventWatcher, EventWatcherInterface, JobQueue, UpstreamConfig } from '@vulcanize/util';

import { Indexer } from './indexer';

Expand Down
3 changes: 2 additions & 1 deletion packages/uni-info-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import { hideBin } from 'yargs/helpers';
import debug from 'debug';

import { getCache } from '@vulcanize/cache';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, getCustomProvider } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH } from '@cerc-io/util';
import { getConfig, fillBlocks, JobQueue, getCustomProvider } from '@vulcanize/util';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { EthClient } from '@cerc-io/ipld-eth-client';
Expand Down
2 changes: 0 additions & 2 deletions packages/uni-info-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -693,9 +693,7 @@ export class Indexer implements IndexerInterface {
async _fetchEvents (block: DeepPartial<BlockProgress>): Promise<DeepPartial<Event>[]> {
assert(block.blockHash);

console.time('time:indexer#_fetchEvents-uni-get-events');
const events = await this._uniClient.getEvents(block.blockHash);
console.timeEnd('time:indexer#_fetchEvents-uni-get-events');

const dbEvents: Array<DeepPartial<Event>> = [];
const transactionsMap = new Map();
Expand Down
11 changes: 7 additions & 4 deletions packages/uni-info-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ import { getCache } from '@vulcanize/cache';
import {
getConfig,
JobQueue,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
JobRunner as BaseJobRunner,
DEFAULT_CONFIG_PATH,
getCustomProvider
} from '@vulcanize/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { JobQueueConfig, startMetricsServer } from '@cerc-io/util';
import {
JobQueueConfig,
startMetricsServer,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
DEFAULT_CONFIG_PATH
} from '@cerc-io/util';

import { Indexer } from './indexer';
import { Database } from './database';
Expand Down
4 changes: 3 additions & 1 deletion packages/uni-info-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import { createServer } from 'http';

import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { DEFAULT_CONFIG_PATH, getConfig, getCustomProvider, JobQueue, startGQLMetricsServer } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH } from '@cerc-io/util';

import { getConfig, getCustomProvider, JobQueue, startGQLMetricsServer } from '@vulcanize/util';
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@cerc-io/ipld-eth-client';

Expand Down
4 changes: 3 additions & 1 deletion packages/uni-watcher/environments/local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000

[upstream.cache]
name = "requests"
Expand All @@ -36,3 +35,6 @@
jobDelayInMilliSecs = 0
eventsInBatch = 50
lazyUpdateBlockProgress = true
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10
3 changes: 2 additions & 1 deletion packages/uni-watcher/src/chain-pruning.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import { AssertionError } from 'assert';
import 'mocha';
import _ from 'lodash';

import { getConfig, getCustomProvider, JobQueue, JobRunner, JOB_KIND_PRUNE } from '@vulcanize/util';
import { JOB_KIND_PRUNE } from '@cerc-io/util';
import { getConfig, getCustomProvider, JobQueue, JobRunner } from '@vulcanize/util';
import { getCache } from '@vulcanize/cache';
import { insertNDummyBlocks, removeEntities } from '@vulcanize/util/test';
import { EthClient } from '@cerc-io/ipld-eth-client';
Expand Down
3 changes: 2 additions & 1 deletion packages/uni-watcher/src/cli/watch-contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import assert from 'assert';
import yargs from 'yargs';
import 'reflect-metadata';

import { Config, DEFAULT_CONFIG_PATH, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH } from '@cerc-io/util';
import { Config, getConfig, getResetConfig, JobQueue } from '@vulcanize/util';

import { Database } from '../database';
import { Indexer } from '../indexer';
Expand Down
2 changes: 0 additions & 2 deletions packages/uni-watcher/src/entity/Event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, Index } from 'typeorm';
import { BlockProgress } from './BlockProgress';

export const UNKNOWN_EVENT_NAME = '__unknown__';

@Entity()
// Index to query events by block and event index.
@Index(['block', 'index'], { unique: true })
Expand Down
5 changes: 2 additions & 3 deletions packages/uni-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ import debug from 'debug';
import { PubSub } from 'apollo-server-express';

import { EthClient } from '@vulcanize/ipld-eth-client';
import { QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, UNKNOWN_EVENT_NAME } from '@cerc-io/util';
import {
JobQueue,
EventWatcher as BaseEventWatcher,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
EventWatcherInterface,
UpstreamConfig
} from '@vulcanize/util';

import { Indexer } from './indexer';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
import { Event } from './entity/Event';

const log = debug('vulcanize:events');

Expand Down
3 changes: 2 additions & 1 deletion packages/uni-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import { PubSub } from 'apollo-server-express';

import { getCache } from '@vulcanize/cache';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, getCustomProvider } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH } from '@cerc-io/util';
import { getConfig, fillBlocks, JobQueue, getCustomProvider } from '@vulcanize/util';

import { Database } from './database';
import { Indexer } from './indexer';
Expand Down
6 changes: 2 additions & 4 deletions packages/uni-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import { ethers } from 'ethers';
import assert from 'assert';

import { JobQueue, IndexerInterface } from '@vulcanize/util';
import { Indexer as BaseIndexer, IPFSClient, IpldStatus as IpldStatusInterface, ServerConfig, Where, QueryOptions, ValueResult } from '@cerc-io/util';
import { Indexer as BaseIndexer, IPFSClient, IpldStatus as IpldStatusInterface, ServerConfig, Where, QueryOptions, ValueResult, UNKNOWN_EVENT_NAME } from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper';

import { Database } from './database';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
import { Event } from './entity/Event';
import { BlockProgress } from './entity/BlockProgress';
import { Contract, KIND_FACTORY, KIND_POOL, KIND_NFPM } from './entity/Contract';
import { SyncStatus } from './entity/SyncStatus';
Expand Down Expand Up @@ -553,7 +553,6 @@ export class Indexer implements IndexerInterface {
async _fetchEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<DeepPartial<Event>[]> {
assert(blockHash);

console.time('time:indexer#_fetchAndSaveEvents-get-logs-txs');
const logsPromise = this._ethClient.getLogs({ blockHash });
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });

Expand All @@ -571,7 +570,6 @@ export class Indexer implements IndexerInterface {
}
}
] = await Promise.all([logsPromise, transactionsPromise]);
console.timeEnd('time:indexer#_fetchAndSaveEvents-get-logs-txs');

const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
Expand Down
11 changes: 7 additions & 4 deletions packages/uni-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import { hideBin } from 'yargs/helpers';
import debug from 'debug';

import { EthClient } from '@cerc-io/ipld-eth-client';
import { JobQueueConfig, startMetricsServer } from '@cerc-io/util';
import {
JobQueueConfig,
startMetricsServer,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
DEFAULT_CONFIG_PATH
} from '@cerc-io/util';
import { getCache } from '@vulcanize/cache';
import {
getConfig,
JobQueue,
JobRunner as BaseJobRunner,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
DEFAULT_CONFIG_PATH,
getCustomProvider
} from '@vulcanize/util';

Expand Down
3 changes: 2 additions & 1 deletion packages/uni-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import { createServer } from 'http';

import { EthClient } from '@cerc-io/ipld-eth-client';
import { getCache } from '@vulcanize/cache';
import { DEFAULT_CONFIG_PATH, getConfig, getCustomProvider, JobQueue, startGQLMetricsServer } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH } from '@cerc-io/util';
import { getConfig, getCustomProvider, JobQueue, startGQLMetricsServer } from '@vulcanize/util';

import typeDefs from './schema';

Expand Down
1 change: 0 additions & 1 deletion packages/util/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

export * from './src/config';
export * from './src/job-queue';
export * from './src/constants';
export * from './src/misc';
export * from './src/fill';
export * from './src/events';
Expand Down
Loading

0 comments on commit 4226de5

Please sign in to comment.