diff --git a/.gitignore b/.gitignore index e62ff4c0..ca638482 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules/ dist/ out/ +.vscode diff --git a/README.md b/README.md index 84c7a210..0013d22e 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,6 @@ The default config files used by the watchers assume the following services are * `vulcanize/go-ethereum` on port 8545 * `vulcanize/ipld-eth-server` with native GQL API enabled on port 8082 and RPC API on port 8081 -* `postgraphile` on the `vulcanize/ipld-eth-server` database, on port 5000 To check whether the endpoints in watcher config are working, run: @@ -31,8 +30,6 @@ yarn check-config --config-file ../erc20-watcher/environments/local.toml yarn check-config --config-file ../uni-watcher/environments/local.toml # vulcanize:check-config Checking ipld-eth-server GQL endpoint http://127.0.0.1:8082/graphql +0ms # vulcanize:check-config ipld-eth-server GQL endpoint working +33ms -# vulcanize:check-config Checking postgraphile GQL endpoint http://127.0.0.1:5000/graphql +1ms -# vulcanize:check-config postgraphile GQL endpoint working +12ms # vulcanize:check-config Checking RPC endpoint http://127.0.0.1:8081 +1ms # vulcanize:check-config RPC endpoint working +25ms ``` diff --git a/packages/address-watcher/README.md b/packages/address-watcher/README.md index b30efc56..ee2e4203 100644 --- a/packages/address-watcher/README.md +++ b/packages/address-watcher/README.md @@ -34,7 +34,7 @@ createdb address-watcher Update `environments/local.toml` with database connection settings for both the databases. -Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API, the `indexer-db` postgraphile and the tracing API (`debug_traceTransaction` RPC provider) endpoints. +Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API and the tracing API (`debug_traceTransaction` RPC provider) endpoints. ## Run diff --git a/packages/address-watcher/environments/local.toml b/packages/address-watcher/environments/local.toml index 19f9bd99..363b83ff 100644 --- a/packages/address-watcher/environments/local.toml +++ b/packages/address-watcher/environments/local.toml @@ -17,7 +17,6 @@ [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" - gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" [upstream.cache] name = "requests" diff --git a/packages/address-watcher/src/fill.ts b/packages/address-watcher/src/fill.ts index fb235dd4..c1af472d 100644 --- a/packages/address-watcher/src/fill.ts +++ b/packages/address-watcher/src/fill.ts @@ -55,14 +55,13 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlPostgraphileEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream; - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + const { ethServer: { gqlApiEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream; + assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, + gqlEndpoint: gqlApiEndpoint, cache }); diff --git a/packages/address-watcher/src/job-runner.ts b/packages/address-watcher/src/job-runner.ts index 010cd3fa..5506d6d5 100644 --- a/packages/address-watcher/src/job-runner.ts +++ b/packages/address-watcher/src/job-runner.ts @@ -42,15 +42,13 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); diff --git a/packages/address-watcher/src/server.ts b/packages/address-watcher/src/server.ts index d2e7da54..3d4245d6 100644 --- a/packages/address-watcher/src/server.ts +++ b/packages/address-watcher/src/server.ts @@ -50,15 +50,13 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint }, traceProviderEndpoint, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); assert(traceProviderEndpoint, 'Missing upstream traceProviderEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); diff --git a/packages/address-watcher/src/tx-watcher.ts b/packages/address-watcher/src/tx-watcher.ts index 9294d60d..65fa8eca 100644 --- a/packages/address-watcher/src/tx-watcher.ts +++ b/packages/address-watcher/src/tx-watcher.ts @@ -67,11 +67,12 @@ export class TxWatcher { } }); - this._watchTxSubscription = await this._ethClient.watchTransactions(async (value) => { - const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode'); - log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2)); - await this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, blockHash, publish: true }); - }); + // TODO: Update to pull based watcher. + // this._watchTxSubscription = await this._ethClient.watchTransactions(async (value) => { + // const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode'); + // log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2)); + // await this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, blockHash, publish: true }); + // }); } async publishAddressEventToSubscribers (txHash: string, timeElapsedInSeconds: number): Promise { diff --git a/packages/codegen/src/templates/config-template.handlebars b/packages/codegen/src/templates/config-template.handlebars index b6a92a5f..2f2a2826 100644 --- a/packages/codegen/src/templates/config-template.handlebars +++ b/packages/codegen/src/templates/config-template.handlebars @@ -16,7 +16,6 @@ [upstream] [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" - gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" blockDelayInMilliSecs = 2000 diff --git a/packages/codegen/src/templates/events-template.handlebars b/packages/codegen/src/templates/events-template.handlebars index ffdc62e2..7bf44c52 100644 --- a/packages/codegen/src/templates/events-template.handlebars +++ b/packages/codegen/src/templates/events-template.handlebars @@ -31,7 +31,7 @@ export class EventWatcher { _pubsub: PubSub _jobQueue: JobQueue - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { assert(ethClient); assert(indexer); @@ -39,7 +39,7 @@ export class EventWatcher { this._indexer = indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue); + this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getEventIterator (): AsyncIterator { diff --git a/packages/codegen/src/templates/fill-template.handlebars b/packages/codegen/src/templates/fill-template.handlebars index cf77817c..c5b3db9d 100644 --- a/packages/codegen/src/templates/fill-template.handlebars +++ b/packages/codegen/src/templates/fill-template.handlebars @@ -54,19 +54,12 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream; - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream; const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -75,7 +68,7 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(db, ethClient, ethProvider); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; assert(dbConnectionString, 'Missing job queue db connection string'); @@ -83,11 +76,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue); assert(jobQueueConfig, 'Missing job queue config'); - await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, blockDelayInMilliSecs, argv); + await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv); }; main().catch(err => { diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index 854b8e91..3d32abda 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -54,22 +54,20 @@ export class Indexer { _db: Database _ethClient: EthClient _ethProvider: BaseProvider - _postgraphileClient: EthClient; _baseIndexer: BaseIndexer _abi: JsonFragment[] _storageLayout: StorageLayout _contract: ethers.utils.Interface - constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider) { + constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider) { assert(db); assert(ethClient); this._db = db; this._ethClient = ethClient; - this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider); const { abi, storageLayout } = artifacts; @@ -306,7 +304,7 @@ export class Indexer { } ] } - } = await this._postgraphileClient.getBlockWithTransactions({ blockHash }); + } = await this._ethClient.getBlockWithTransactions({ blockHash }); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index 1549db23..361ddbba 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -90,25 +90,18 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); const ethProvider = getCustomProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(db, ethClient, ethProvider); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/codegen/src/templates/readme-template.handlebars b/packages/codegen/src/templates/readme-template.handlebars index 6cec7402..5c55024b 100644 --- a/packages/codegen/src/templates/readme-template.handlebars +++ b/packages/codegen/src/templates/readme-template.handlebars @@ -37,14 +37,14 @@ * Update the [config](./environments/local.toml) with database connection settings. -* Update the `upstream` config in the [config file](./environments/local.toml) and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints. +* Update the `upstream` config in the [config file](./environments/local.toml) and provide the `ipld-eth-server` GQL API endpoint. ## Customize * Indexing on an event: * Edit the custom hook function `handleEvent` (triggered on an event) in [hooks.ts](./src/hooks.ts) to perform corresponding indexing using the `Indexer` object. - + * Refer to [hooks.example.ts](./src/hooks.example.ts) for an example hook function for events in an ERC20 contract. ## Run diff --git a/packages/codegen/src/templates/server-template.handlebars b/packages/codegen/src/templates/server-template.handlebars index 7c95ec4e..d7d6e1ad 100644 --- a/packages/codegen/src/templates/server-template.handlebars +++ b/packages/codegen/src/templates/server-template.handlebars @@ -50,26 +50,19 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); const ethProvider = getCustomProvider(rpcProviderEndpoint); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider); + const indexer = new Indexer(db, ethClient, ethProvider); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries @@ -81,7 +74,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); diff --git a/packages/erc20-watcher/README.md b/packages/erc20-watcher/README.md index fa0ca998..571f5406 100644 --- a/packages/erc20-watcher/README.md +++ b/packages/erc20-watcher/README.md @@ -34,7 +34,7 @@ createdb erc20-watcher Update `environments/local.toml` with database connection settings for both the databases. -Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints. +Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API endpoint. ## Run diff --git a/packages/erc20-watcher/environments/local.toml b/packages/erc20-watcher/environments/local.toml index f2d84ae1..b2335aad 100644 --- a/packages/erc20-watcher/environments/local.toml +++ b/packages/erc20-watcher/environments/local.toml @@ -18,7 +18,6 @@ [upstream] [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" - gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" blockDelayInMilliSecs = 2000 diff --git a/packages/erc20-watcher/src/cli/reset-cmds/state.ts b/packages/erc20-watcher/src/cli/reset-cmds/state.ts index 902cec68..2dad30a7 100644 --- a/packages/erc20-watcher/src/cli/reset-cmds/state.ts +++ b/packages/erc20-watcher/src/cli/reset-cmds/state.ts @@ -31,7 +31,7 @@ export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); await resetJobs(config); const { jobQueue: jobQueueConfig } = config; - const { dbConfig, serverConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { dbConfig, serverConfig, ethClient, ethProvider } = await getResetConfig(config); // Initialize database. const db = new Database(dbConfig); @@ -44,7 +44,7 @@ export const handler = async (argv: any): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, serverConfig.mode); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/erc20-watcher/src/cli/watch-contract.ts b/packages/erc20-watcher/src/cli/watch-contract.ts index 60e0d662..445fdfc2 100644 --- a/packages/erc20-watcher/src/cli/watch-contract.ts +++ b/packages/erc20-watcher/src/cli/watch-contract.ts @@ -38,7 +38,7 @@ import { Indexer } from '../indexer'; const config: Config = await getConfig(argv.configFile); const { database: dbConfig, server: { mode }, jobQueue: jobQueueConfig } = config; - const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { ethClient, ethProvider } = await getResetConfig(config); assert(dbConfig); @@ -53,7 +53,7 @@ import { Indexer } from '../indexer'; const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode); await indexer.watchContract(argv.address, argv.startingBlock); diff --git a/packages/erc20-watcher/src/events.ts b/packages/erc20-watcher/src/events.ts index ecd0aa6a..aaf4fc67 100644 --- a/packages/erc20-watcher/src/events.ts +++ b/packages/erc20-watcher/src/events.ts @@ -31,7 +31,7 @@ export class EventWatcher { _pubsub: PubSub _jobQueue: JobQueue - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { assert(ethClient); assert(indexer); @@ -39,7 +39,7 @@ export class EventWatcher { this._indexer = indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue); + this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getEventIterator (): AsyncIterator { diff --git a/packages/erc20-watcher/src/fill.ts b/packages/erc20-watcher/src/fill.ts index 6a521a27..fc76540f 100644 --- a/packages/erc20-watcher/src/fill.ts +++ b/packages/erc20-watcher/src/fill.ts @@ -67,18 +67,11 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream; - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream; const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -94,9 +87,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index b4e97639..35efb1b4 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -46,7 +46,6 @@ interface EventResult { export class Indexer { _db: Database _ethClient: EthClient - _postgraphileClient: EthClient _ethProvider: BaseProvider _baseIndexer: BaseIndexer @@ -55,16 +54,15 @@ export class Indexer { _contract: ethers.utils.Interface _serverMode: string - constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) { + constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, serverMode: string) { assert(db); assert(ethClient); this._db = db; this._ethClient = ethClient; - this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; this._serverMode = serverMode; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider, jobQueue); const { abi, storageLayout } = artifacts; diff --git a/packages/erc20-watcher/src/job-runner.ts b/packages/erc20-watcher/src/job-runner.ts index f8ff54ed..8c5f9adf 100644 --- a/packages/erc20-watcher/src/job-runner.ts +++ b/packages/erc20-watcher/src/job-runner.ts @@ -80,19 +80,12 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -106,7 +99,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode); const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); diff --git a/packages/erc20-watcher/src/server.ts b/packages/erc20-watcher/src/server.ts index b2502eaf..eb959f70 100644 --- a/packages/erc20-watcher/src/server.ts +++ b/packages/erc20-watcher/src/server.ts @@ -51,19 +51,12 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -80,9 +73,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue, mode); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { await jobQueue.start(); diff --git a/packages/ipld-eth-client/src/eth-client.ts b/packages/ipld-eth-client/src/eth-client.ts index ffbdce8c..b574bf96 100644 --- a/packages/ipld-eth-client/src/eth-client.ts +++ b/packages/ipld-eth-client/src/eth-client.ts @@ -76,7 +76,7 @@ export class EthClient { return this._graphqlClient.query( ethQueries.getBlocks, { - blockNumber, + blockNumber: blockNumber?.toString(), blockHash } ); @@ -113,14 +113,6 @@ export class EthClient { return { logs, block }; } - async watchBlocks (onNext: (value: any) => void): Promise { - return this._graphqlClient.subscribe(ethQueries.subscribeBlocks, onNext); - } - - async watchTransactions (onNext: (value: any) => void): Promise { - return this._graphqlClient.subscribe(ethQueries.subscribeTransactions, onNext); - } - async _getCachedOrFetch (queryName: keyof typeof ethQueries, vars: Vars): Promise { const keyObj = { queryName, diff --git a/packages/ipld-eth-client/src/eth-queries.ts b/packages/ipld-eth-client/src/eth-queries.ts index 91ff40df..af4e99e6 100644 --- a/packages/ipld-eth-client/src/eth-queries.ts +++ b/packages/ipld-eth-client/src/eth-queries.ts @@ -95,44 +95,10 @@ query block($blockHash: Bytes32) { } `; -export const subscribeBlocks = gql` -subscription { - listen(topic: "header_cids") { - relatedNode { - ... on EthHeaderCid { - blockHash - blockNumber - parentHash - timestamp - } - } - } -} -`; - -export const subscribeTransactions = gql` -subscription SubscriptionHeader { - listen(topic: "transaction_cids") { - relatedNode { - ... on EthTransactionCid { - txHash - ethHeaderCidByHeaderId { - blockHash - blockNumber - parentHash - } - } - } - } -} -`; - export default { getStorageAt, getLogs, getBlockWithTransactions, getBlocks, - getBlockByHash, - subscribeBlocks, - subscribeTransactions + getBlockByHash }; diff --git a/packages/lighthouse-watcher/environments/local.toml b/packages/lighthouse-watcher/environments/local.toml index 5e889b98..807c9581 100644 --- a/packages/lighthouse-watcher/environments/local.toml +++ b/packages/lighthouse-watcher/environments/local.toml @@ -8,7 +8,6 @@ [upstream] [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" - gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" [upstream.cache] name = "requests" diff --git a/packages/lighthouse-watcher/src/events.ts b/packages/lighthouse-watcher/src/events.ts index 1c9c51f5..c18bbf02 100644 --- a/packages/lighthouse-watcher/src/events.ts +++ b/packages/lighthouse-watcher/src/events.ts @@ -38,16 +38,18 @@ export class EventWatcher { async watchBlocksAtChainHead (): Promise { log('Started watching upstream blocks...'); - this._subscription = await this._ethClient.watchBlocks(async (value) => { - const { blockHash, blockNumber } = _.get(value, 'data.listen.relatedNode'); - log('watchBlock', blockHash, blockNumber); - const events = await this._indexer.getOrFetchBlockEvents(blockHash); + // TODO: Update to pull based watcher. + // this._subscription = await this._ethClient.watchBlocks(async (value) => { + // const { blockHash, blockNumber } = _.get(value, 'data.listen.relatedNode'); + // log('watchBlock', blockHash, blockNumber); - for (let ei = 0; ei < events.length; ei++) { - await this.publishLighthouseEventToSubscribers(events[ei]); - } - }); + // const events = await this._indexer.getOrFetchBlockEvents(blockHash); + + // for (let ei = 0; ei < events.length; ei++) { + // await this.publishLighthouseEventToSubscribers(events[ei]); + // } + // }); } async publishLighthouseEventToSubscribers (resultEvent: ResultEvent): Promise { diff --git a/packages/lighthouse-watcher/src/indexer.ts b/packages/lighthouse-watcher/src/indexer.ts index f90ffe27..4875a295 100644 --- a/packages/lighthouse-watcher/src/indexer.ts +++ b/packages/lighthouse-watcher/src/indexer.ts @@ -37,15 +37,13 @@ export interface Config extends BaseConfig { export class Indexer { _config: Config _ethClient: EthClient - _postgraphileClient: EthClient _lighthouseContract: ethers.utils.Interface - constructor (config: Config, ethClient: EthClient, postgraphileClient: EthClient) { + constructor (config: Config, ethClient: EthClient) { assert(config.watch); this._config = config; this._ethClient = ethClient; - this._postgraphileClient = postgraphileClient; this._lighthouseContract = new ethers.utils.Interface(lighthouseABI); } @@ -96,7 +94,7 @@ export class Indexer { } ] } - } = await this._postgraphileClient.getBlockWithTransactions({ blockHash }); + } = await this._ethClient.getBlockWithTransactions({ blockHash }); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; diff --git a/packages/lighthouse-watcher/src/server.ts b/packages/lighthouse-watcher/src/server.ts index 71a663a9..02761e18 100644 --- a/packages/lighthouse-watcher/src/server.ts +++ b/packages/lighthouse-watcher/src/server.ts @@ -43,23 +43,16 @@ export const main = async (): Promise => { const { upstream } = config; assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint }, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, cache }); - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const indexer = new Indexer(config, ethClient, postgraphileClient); + const indexer = new Indexer(config, ethClient); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries diff --git a/packages/uni-info-watcher/environments/local.toml b/packages/uni-info-watcher/environments/local.toml index 09224cd9..7090818a 100644 --- a/packages/uni-info-watcher/environments/local.toml +++ b/packages/uni-info-watcher/environments/local.toml @@ -23,7 +23,6 @@ [upstream] [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" - gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" blockDelayInMilliSecs = 2000 diff --git a/packages/uni-info-watcher/environments/test.toml b/packages/uni-info-watcher/environments/test.toml index c1fed6e8..bc5e233c 100644 --- a/packages/uni-info-watcher/environments/test.toml +++ b/packages/uni-info-watcher/environments/test.toml @@ -18,7 +18,6 @@ [upstream] [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" - gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" rpcProviderEndpoint = "http://127.0.0.1:8545" blockDelayInMilliSecs = 2000 diff --git a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts index 9da639c9..335225a3 100644 --- a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts @@ -48,7 +48,7 @@ export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); await resetJobs(config); const { jobQueue: jobQueueConfig } = config; - const { dbConfig, serverConfig, upstreamConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { dbConfig, serverConfig, upstreamConfig, ethClient, ethProvider } = await getResetConfig(config); // Initialize database. const db = new Database(dbConfig); @@ -70,7 +70,7 @@ export const handler = async (argv: any): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, serverConfig.mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, serverConfig.mode); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/uni-info-watcher/src/events.ts b/packages/uni-info-watcher/src/events.ts index 2c207389..c1e32a75 100644 --- a/packages/uni-info-watcher/src/events.ts +++ b/packages/uni-info-watcher/src/events.ts @@ -124,12 +124,12 @@ export class EventWatcher implements EventWatcherInterface { _jobQueue: JobQueue _baseEventWatcher: BaseEventWatcher - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { this._ethClient = ethClient; this._indexer = indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue); + this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getBlockProgressEventIterator (): AsyncIterator { diff --git a/packages/uni-info-watcher/src/fill.ts b/packages/uni-info-watcher/src/fill.ts index 0f46f700..c18f0f21 100644 --- a/packages/uni-info-watcher/src/fill.ts +++ b/packages/uni-info-watcher/src/fill.ts @@ -69,19 +69,12 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream; - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig, uniWatcher, tokenWatcher } = upstream; const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -100,9 +93,9 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, mode); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, eventWatcher, blockDelayInMilliSecs, argv); }; diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index d2f4eab9..2126e2ee 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -47,22 +47,19 @@ export class Indexer implements IndexerInterface { _uniClient: UniClient _erc20Client: ERC20Client _ethClient: EthClient - _postgraphileClient: EthClient _baseIndexer: BaseIndexer _isDemo: boolean - constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, jobQueue: JobQueue, mode: string) { + constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, ethProvider: providers.BaseProvider, jobQueue: JobQueue, mode: string) { assert(db); assert(uniClient); assert(erc20Client); - assert(postgraphileClient); this._db = db; this._uniClient = uniClient; this._erc20Client = erc20Client; this._ethClient = ethClient; - this._postgraphileClient = postgraphileClient; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, ethProvider, jobQueue); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, ethProvider, jobQueue); this._isDemo = mode === 'demo'; } diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index 08dbaf45..ce806f29 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -93,7 +93,6 @@ export const main = async (): Promise => { cache: cacheConfig, ethServer: { gqlApiEndpoint, - gqlPostgraphileEndpoint, rpcProviderEndpoint } } = upstream; @@ -106,12 +105,6 @@ export const main = async (): Promise => { const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -131,7 +124,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, mode); await indexer.init(); if (mode !== 'demo') { diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index 01a1d625..37b513f7 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -56,7 +56,6 @@ export const main = async (): Promise => { const { ethServer: { gqlApiEndpoint, - gqlPostgraphileEndpoint, rpcProviderEndpoint }, uniWatcher, @@ -65,17 +64,10 @@ export const main = async (): Promise => { } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -91,10 +83,10 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, mode); const pubSub = new PubSub(); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubSub, jobQueue); + const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubSub, jobQueue); await eventWatcher.start(); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); diff --git a/packages/uni-info-watcher/test/init.ts b/packages/uni-info-watcher/test/init.ts index 72d77ce6..1d8a3cd7 100644 --- a/packages/uni-info-watcher/test/init.ts +++ b/packages/uni-info-watcher/test/init.ts @@ -46,7 +46,7 @@ const main = async () => { const erc20Client = new ERC20Client(tokenWatcher); - const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { ethClient, ethProvider } = await getResetConfig(config); assert(jobQueueConfig, 'Missing job queue config'); const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; @@ -55,7 +55,7 @@ const main = async () => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, uniClient, erc20Client, ethClient, postgraphileClient, ethProvider, jobQueue, mode); + const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, jobQueue, mode); await indexer.init(); // Get the factory contract address. diff --git a/packages/uni-watcher/environments/local.toml b/packages/uni-watcher/environments/local.toml index ffdf88d7..4a7ea97d 100644 --- a/packages/uni-watcher/environments/local.toml +++ b/packages/uni-watcher/environments/local.toml @@ -20,7 +20,6 @@ [upstream] [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" - gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" rpcProviderEndpoint = "http://127.0.0.1:8081" blockDelayInMilliSecs = 2000 diff --git a/packages/uni-watcher/environments/test.toml b/packages/uni-watcher/environments/test.toml index 6bd041f3..8a0d0e0d 100644 --- a/packages/uni-watcher/environments/test.toml +++ b/packages/uni-watcher/environments/test.toml @@ -15,7 +15,6 @@ [upstream] [upstream.ethServer] gqlApiEndpoint = "http://127.0.0.1:8082/graphql" - gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql" rpcProviderEndpoint = "http://127.0.0.1:8545" blockDelayInMilliSecs = 2000 diff --git a/packages/uni-watcher/src/chain-pruning.test.ts b/packages/uni-watcher/src/chain-pruning.test.ts index 26869864..2c600069 100644 --- a/packages/uni-watcher/src/chain-pruning.test.ts +++ b/packages/uni-watcher/src/chain-pruning.test.ts @@ -45,19 +45,12 @@ describe('chain pruning', () => { // Create an Indexer object. assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -68,7 +61,7 @@ describe('chain pruning', () => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + indexer = new Indexer(db, ethClient, ethProvider, jobQueue); assert(indexer, 'Could not create indexer object.'); jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); diff --git a/packages/uni-watcher/src/cli/reset-cmds/state.ts b/packages/uni-watcher/src/cli/reset-cmds/state.ts index 316b2155..e43b09b6 100644 --- a/packages/uni-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-watcher/src/cli/reset-cmds/state.ts @@ -29,7 +29,7 @@ export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); await resetJobs(config); const { jobQueue: jobQueueConfig } = config; - const { dbConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { dbConfig, ethClient, ethProvider } = await getResetConfig(config); // Initialize database. const db = new Database(dbConfig); @@ -42,7 +42,7 @@ export const handler = async (argv: any): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); const syncStatus = await indexer.getSyncStatus(); assert(syncStatus, 'Missing syncStatus'); diff --git a/packages/uni-watcher/src/cli/watch-contract.ts b/packages/uni-watcher/src/cli/watch-contract.ts index c289080a..26733743 100644 --- a/packages/uni-watcher/src/cli/watch-contract.ts +++ b/packages/uni-watcher/src/cli/watch-contract.ts @@ -44,7 +44,7 @@ import { Indexer } from '../indexer'; const config: Config = await getConfig(argv.configFile); const { database: dbConfig, jobQueue: jobQueueConfig } = config; - const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { ethClient, ethProvider } = await getResetConfig(config); assert(dbConfig); @@ -59,7 +59,7 @@ import { Indexer } from '../indexer'; const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); await indexer.init(); await indexer.watchContract(argv.address, argv.kind, argv.startingBlock); diff --git a/packages/uni-watcher/src/events.ts b/packages/uni-watcher/src/events.ts index 5d4ddfa1..4e7b2104 100644 --- a/packages/uni-watcher/src/events.ts +++ b/packages/uni-watcher/src/events.ts @@ -31,12 +31,12 @@ export class EventWatcher implements EventWatcherInterface { _jobQueue: JobQueue _baseEventWatcher: BaseEventWatcher - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { + constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { this._ethClient = ethClient; this._indexer = indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; - this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, postgraphileClient, this._indexer, this._pubsub, this._jobQueue); + this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); } getEventIterator (): AsyncIterator { diff --git a/packages/uni-watcher/src/fill.ts b/packages/uni-watcher/src/fill.ts index 2141fe27..65a7bf48 100644 --- a/packages/uni-watcher/src/fill.ts +++ b/packages/uni-watcher/src/fill.ts @@ -67,18 +67,11 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream; - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint, blockDelayInMilliSecs }, cache: cacheConfig } = upstream; const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -94,10 +87,10 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); await indexer.init(); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 699780c4..9c7e0669 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -38,7 +38,6 @@ type ResultEvent = { export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient - _postgraphileClient: EthClient _baseIndexer: BaseIndexer _ethProvider: ethers.providers.BaseProvider @@ -46,12 +45,11 @@ export class Indexer implements IndexerInterface { _poolContract: ethers.utils.Interface _nfpmContract: ethers.utils.Interface - constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) { + constructor (db: Database, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) { this._db = db; this._ethClient = ethClient; - this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider, jobQueue); this._factoryContract = new ethers.utils.Interface(factoryABI); this._poolContract = new ethers.utils.Interface(poolABI); @@ -444,7 +442,7 @@ export class Indexer implements IndexerInterface { console.time('time:indexer#_fetchAndSaveEvents-get-logs-txs'); const logsPromise = this._ethClient.getLogs({ blockHash }); - const transactionsPromise = this._postgraphileClient.getBlockWithTransactions({ blockHash }); + const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); const [ { logs }, diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index 7004362d..1bacdad0 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -81,19 +81,12 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -107,7 +100,7 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); await indexer.init(); const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); diff --git a/packages/uni-watcher/src/server.ts b/packages/uni-watcher/src/server.ts index 85a0fb63..e4813b63 100644 --- a/packages/uni-watcher/src/server.ts +++ b/packages/uni-watcher/src/server.ts @@ -51,19 +51,12 @@ export const main = async (): Promise => { await db.init(); assert(upstream, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -81,10 +74,10 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); await indexer.init(); - const eventWatcher = new EventWatcher(upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); + const eventWatcher = new EventWatcher(upstream, ethClient, indexer, pubsub, jobQueue); await eventWatcher.start(); const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher); diff --git a/packages/uni-watcher/src/smoke.test.ts b/packages/uni-watcher/src/smoke.test.ts index 3690616e..cc70a55f 100644 --- a/packages/uni-watcher/src/smoke.test.ts +++ b/packages/uni-watcher/src/smoke.test.ts @@ -64,7 +64,6 @@ describe('uni-watcher', () => { let db: Database; let uniClient: UniClient; let ethClient: EthClient; - let postgraphileClient: EthClient; let ethProvider: ethers.providers.JsonRpcProvider; let jobQueue: JobQueue; let signer: Signer; @@ -80,9 +79,8 @@ describe('uni-watcher', () => { assert(host, 'Missing host.'); assert(port, 'Missing port.'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint.'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint.'); assert(rpcProviderEndpoint, 'Missing upstream ethServer.rpcProviderEndpoint.'); assert(cacheConfig, 'Missing dbConfig.'); @@ -92,12 +90,6 @@ describe('uni-watcher', () => { const cache = await getCache(cacheConfig); ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -135,7 +127,7 @@ describe('uni-watcher', () => { factory = new Contract(factoryContract.address, FACTORY_ABI, signer); // Verifying with the db. - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); await indexer.init(); assert(await indexer.isWatchedContract(factory.address), 'Factory contract not added to the database.'); }); @@ -271,7 +263,7 @@ describe('uni-watcher', () => { nfpm = new Contract(nfpmContract.address, NFPM_ABI, signer); // Verifying with the db. - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); await indexer.init(); assert(await indexer.isWatchedContract(nfpm.address), 'NFPM contract not added to the database.'); }); diff --git a/packages/uni-watcher/test/init.ts b/packages/uni-watcher/test/init.ts index b3f9a352..d278a872 100644 --- a/packages/uni-watcher/test/init.ts +++ b/packages/uni-watcher/test/init.ts @@ -57,7 +57,7 @@ const main = async () => { assert(host, 'Missing host.'); assert(port, 'Missing port.'); - const { ethClient, postgraphileClient, ethProvider } = await getResetConfig(config); + const { ethClient, ethProvider } = await getResetConfig(config); // Initialize uniClient. const endpoint = `http://${host}:${port}/graphql`; @@ -81,7 +81,7 @@ const main = async () => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); - const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue); + const indexer = new Indexer(db, ethClient, ethProvider, jobQueue); let factory: Contract; // Checking whether factory is deployed. diff --git a/packages/util/src/cli/check-config.ts b/packages/util/src/cli/check-config.ts index c32fccb3..ffadf713 100644 --- a/packages/util/src/cli/check-config.ts +++ b/packages/util/src/cli/check-config.ts @@ -23,7 +23,7 @@ const main = async () => { } }).argv; - const { upstream: { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint } } } = await getConfig(argv.configFile); + const { upstream: { ethServer: { gqlApiEndpoint, rpcProviderEndpoint } } } = await getConfig(argv.configFile); // Get latest block in chain using ipld-eth-server GQL. log(`Checking ipld-eth-server GQL endpoint ${gqlApiEndpoint}`); @@ -32,19 +32,10 @@ const main = async () => { assert(currentBlock && currentBlock.number); log('ipld-eth-server GQL endpoint working'); - // Get block by number using postgraphile. - log(`Checking postgraphile GQL endpoint ${gqlPostgraphileEndpoint}`); - const postgraphileClient = new EthClient({ gqlEndpoint: gqlPostgraphileEndpoint, cache: undefined }); - const { allEthHeaderCids: { nodes } } = await postgraphileClient.getBlocks({ blockNumber: currentBlock.number }); - assert(nodes.length); - const [{ blockHash }] = nodes; - assert(blockHash === currentBlock.hash); - log('postgraphile GQL endpoint working'); - // Get block by hash using RPC endpoint. log(`Checking RPC endpoint ${rpcProviderEndpoint}`); const ethProvider = getCustomProvider(rpcProviderEndpoint); - const ethBlock = await ethProvider.getBlock(blockHash); + const ethBlock = await ethProvider.getBlock(currentBlock.hash); assert(ethBlock.number === currentBlock.number); log('RPC endpoint working'); }; diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index 58275c49..1d2d2d50 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -63,9 +63,9 @@ export const processBlockByNumber = async ( }); if (!blocks.length) { - console.time('time:common#processBlockByNumber-postgraphile'); + console.time('time:common#processBlockByNumber-ipld-eth-server'); blocks = await indexer.getBlocks({ blockNumber }); - console.timeEnd('time:common#processBlockByNumber-postgraphile'); + console.timeEnd('time:common#processBlockByNumber-ipld-eth-server'); } if (blocks.length) { diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 07cde8ff..dc436c31 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -42,7 +42,6 @@ export interface UpstreamConfig { cache: CacheConfig, ethServer: { gqlApiEndpoint: string; - gqlPostgraphileEndpoint: string; rpcProviderEndpoint: string; blockDelayInMilliSecs: number; } @@ -83,7 +82,6 @@ export const getResetConfig = async (config: Config): Promise<{ serverConfig: ServerConfig, upstreamConfig: UpstreamConfig, ethClient: EthClient, - postgraphileClient: EthClient, ethProvider: BaseProvider }> => { const { database: dbConfig, upstream: upstreamConfig, server: serverConfig } = config; @@ -92,21 +90,14 @@ export const getResetConfig = async (config: Config): Promise<{ assert(dbConfig, 'Missing database config'); assert(upstreamConfig, 'Missing upstream config'); - const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstreamConfig; + const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstreamConfig; assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint'); - assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint'); assert(rpcProviderEndpoint, 'Missing upstream ethServer.rpcProviderEndpoint'); const cache = await getCache(cacheConfig); const ethClient = new EthClient({ gqlEndpoint: gqlApiEndpoint, - gqlSubscriptionEndpoint: gqlPostgraphileEndpoint, - cache - }); - - const postgraphileClient = new EthClient({ - gqlEndpoint: gqlPostgraphileEndpoint, cache }); @@ -117,7 +108,6 @@ export const getResetConfig = async (config: Config): Promise<{ serverConfig, upstreamConfig, ethClient, - postgraphileClient, ethProvider }; }; diff --git a/packages/util/src/events.ts b/packages/util/src/events.ts index 9c14af6e..8a65cd5d 100644 --- a/packages/util/src/events.ts +++ b/packages/util/src/events.ts @@ -21,17 +21,15 @@ export const BlockProgressEvent = 'block-progress-event'; export class EventWatcher { _ethClient: EthClient - _postgraphileClient: EthClient _indexer: IndexerInterface _subscription?: ZenObservable.Subscription _pubsub: PubSub _jobQueue: JobQueue _upstreamConfig: UpstreamConfig - constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, postgraphileClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { + constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { this._upstreamConfig = upstreamConfig; this._ethClient = ethClient; - this._postgraphileClient = postgraphileClient; this._indexer = indexer; this._pubsub = pubsub; this._jobQueue = jobQueue; diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index c8bbe755..c427b633 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -29,17 +29,15 @@ export interface ValueResult { export class Indexer { _db: DatabaseInterface; _ethClient: EthClient; - _postgraphileClient: EthClient; _getStorageAt: GetStorageAt; _ethProvider: ethers.providers.BaseProvider; _jobQueue: JobQueue; _watchedContracts: { [key: string]: ContractInterface } = {}; - constructor (db: DatabaseInterface, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) { + constructor (db: DatabaseInterface, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) { this._db = db; this._ethClient = ethClient; - this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; this._jobQueue = jobQueue; this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); @@ -127,7 +125,7 @@ export class Indexer { async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise { assert(blockFilter.blockHash || blockFilter.blockNumber); - const result = await this._postgraphileClient.getBlocks(blockFilter); + const result = await this._ethClient.getBlocks(blockFilter); const { allEthHeaderCids: { nodes: blocks } } = result; if (!blocks.length) {