Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use server CLI from cli package #408

Merged
merged 2 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions packages/erc20-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import {
EventWatcher as BaseEventWatcher,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
JobQueue
JobQueue,
IndexerInterface
} from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client';

Expand All @@ -23,12 +24,12 @@ export class EventWatcher {
_pubsub: PubSub
_jobQueue: JobQueue

constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient);
assert(indexer);

this._ethClient = ethClient;
this._indexer = indexer;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
Expand Down
7 changes: 4 additions & 3 deletions packages/erc20-watcher/src/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import assert from 'assert';
import BigInt from 'apollo-type-bigint';
import debug from 'debug';

import { gqlTotalQueryCount, gqlQueryCount, ValueResult } from '@cerc-io/util';
import { gqlTotalQueryCount, gqlQueryCount, ValueResult, IndexerInterface, EventWatcherInterface } from '@cerc-io/util';

import { CONTRACT_KIND, Indexer } from './indexer';
import { EventWatcher } from './events';

const log = debug('vulcanize:resolver');

export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise<any> => {
assert(indexer);
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;

return {
BigInt: new BigInt('bigInt'),
Expand Down
89 changes: 4 additions & 85 deletions packages/erc20-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,12 @@
// Copyright 2021 Vulcanize, Inc.
//

import assert from 'assert';
import 'reflect-metadata';
import express, { Application } from 'express';
import { PubSub } from 'graphql-subscriptions';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import 'graphql-import-node';

import { getCache } from '@cerc-io/cache';
import {
KIND_ACTIVE,
DEFAULT_CONFIG_PATH,
createAndStartServer,
JobQueue,
getCustomProvider,
startGQLMetricsServer,
getConfig
} from '@cerc-io/util';
import { Config } from '@vulcanize/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { ServerCmd } from '@cerc-io/cli';

import typeDefs from './schema';

import { createResolvers as createMockResolvers } from './mock/resolvers';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
Expand All @@ -35,73 +17,10 @@ import { EventWatcher } from './events';
const log = debug('vulcanize:server');

export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv))
.option('f', {
alias: 'config-file',
demandOption: true,
describe: 'configuration file path (toml)',
type: 'string',
default: DEFAULT_CONFIG_PATH
})
.argv;

const config: Config = await getConfig(argv.f);

assert(config.server, 'Missing server config');

const { kind: watcherKind } = config.server;

const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;

assert(dbConfig, 'Missing database config');

const db = new Database(dbConfig);
await db.init();

assert(upstream, 'Missing upstream config');
const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');

const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
cache
});

const ethProvider = getCustomProvider(rpcProviderEndpoint);

// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();

assert(jobQueueConfig, 'Missing job queue config');

const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });

const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init();

const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

if (watcherKind === KIND_ACTIVE) {
await jobQueue.start();
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();
}

const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);

// Create an Express app and server
const app: Application = express();
const server = createAndStartServer(app, typeDefs, resolvers, config.server);

await startGQLMetricsServer(config);
const serverCmd = new ServerCmd();
await serverCmd.init(Database, Indexer, EventWatcher);

return { app, server };
return process.env.MOCK ? serverCmd.exec(createMockResolvers, typeDefs) : serverCmd.exec(createResolvers, typeDefs);
};

main().then(() => {
Expand Down
1 change: 1 addition & 0 deletions packages/uni-info-watcher/environments/local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Use mode demo when running watcher locally.
# Mode demo whitelists all tokens so that entity values get updated.
mode = "prod"
kind = "active"

# Checkpointing state.
checkpointing = true
Expand Down
17 changes: 14 additions & 3 deletions packages/uni-info-watcher/src/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,17 @@ import debug from 'debug';
import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql';
import JSONbig from 'json-bigint';

import { BlockHeight, OrderDirection, getResultState, setGQLCacheHints, GraphDecimal, gqlQueryCount, gqlTotalQueryCount } from '@cerc-io/util';
import {
BlockHeight,
OrderDirection,
getResultState,
setGQLCacheHints,
GraphDecimal,
gqlQueryCount,
gqlTotalQueryCount,
IndexerInterface,
EventWatcherInterface
} from '@cerc-io/util';

import { Indexer } from './indexer';
import { Burn } from './entity/Burn';
Expand Down Expand Up @@ -38,8 +48,9 @@ const log = debug('vulcanize:resolver');

export { BlockHeight };

export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise<any> => {
assert(indexer);
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;

const gqlCacheConfig = indexer.serverConfig.gqlCache;

Expand Down
79 changes: 8 additions & 71 deletions packages/uni-info-watcher/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,16 @@
// Copyright 2021 Vulcanize, Inc.
//

import assert from 'assert';
import 'reflect-metadata';
import express, { Application } from 'express';
import { PubSub } from 'graphql-subscriptions';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import 'graphql-import-node';

import { ServerCmd } from '@cerc-io/cli';
import { Config } from '@vulcanize/util';
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
import { Client as UniClient } from '@vulcanize/uni-watcher';
import { Config } from '@vulcanize/util';
import { createAndStartServer, DEFAULT_CONFIG_PATH, JobQueue, getCustomProvider, startGQLMetricsServer, getConfig } from '@cerc-io/util';
import { getCache } from '@cerc-io/cache';
import { EthClient } from '@cerc-io/ipld-eth-client';

import typeDefs from './schema';

import { createResolvers as createMockResolvers } from './mock/resolvers';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
Expand All @@ -29,75 +21,20 @@ import { EventWatcher } from './events';
const log = debug('vulcanize:server');

export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv))
.option('f', {
alias: 'config-file',
demandOption: true,
describe: 'configuration file path (toml)',
type: 'string',
default: DEFAULT_CONFIG_PATH
})
.argv;

const config: Config = await getConfig(argv.f);

assert(config.server, 'Missing server config');
const serverCmd = new ServerCmd();

const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;

assert(dbConfig, 'Missing database config');

const db = new Database(dbConfig, config.server);
await db.init();

assert(upstream, 'Missing upstream config');
const config: Config = await serverCmd.initConfig();
const {
ethServer: {
gqlApiEndpoint,
rpcProviderEndpoint
},
uniWatcher,
tokenWatcher,
cache: cacheConfig
} = upstream;

assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');

const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
cache
});
tokenWatcher
} = config.upstream;

const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const ethProvider = getCustomProvider(rpcProviderEndpoint);

assert(jobQueueConfig, 'Missing job queue config');

const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');

const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();

const indexer = new Indexer(config.server, db, { uniClient, erc20Client, ethClient }, ethProvider, jobQueue);

const pubSub = new PubSub();
const eventWatcher = new EventWatcher(ethClient, indexer, pubSub, jobQueue);
// Delete jobs to prevent creating jobs after completion of processing previous block.
await jobQueue.deleteAllJobs();
await eventWatcher.start();

const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);

// Create an Express app and server
const app: Application = express();
const server = createAndStartServer(app, typeDefs, resolvers, config.server);

await startGQLMetricsServer(config);
await serverCmd.init(Database, Indexer, EventWatcher, { uniClient, erc20Client });

return { app, server };
return process.env.MOCK ? serverCmd.exec(createMockResolvers, typeDefs) : serverCmd.exec(createResolvers, typeDefs);
};

main().then(() => {
Expand Down
1 change: 1 addition & 0 deletions packages/uni-watcher/environments/local.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[server]
host = "127.0.0.1"
port = 3003
kind = "active"

[metrics]
host = "127.0.0.1"
Expand Down
7 changes: 4 additions & 3 deletions packages/uni-watcher/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import {
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
JobQueue,
EventWatcherInterface
EventWatcherInterface,
IndexerInterface
} from '@cerc-io/util';

import { Indexer } from './indexer';
Expand All @@ -24,9 +25,9 @@ export class EventWatcher implements EventWatcherInterface {
_jobQueue: JobQueue
_baseEventWatcher: BaseEventWatcher

constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._ethClient = ethClient;
this._indexer = indexer;
this._indexer = indexer as Indexer;
this._pubsub = pubsub;
this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
Expand Down
13 changes: 11 additions & 2 deletions packages/uni-watcher/src/resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,23 @@ import assert from 'assert';
import BigInt from 'apollo-type-bigint';
import debug from 'debug';

import { gqlTotalQueryCount, gqlQueryCount, ValueResult } from '@cerc-io/util';
import {
gqlTotalQueryCount,
gqlQueryCount,
ValueResult,
IndexerInterface,
EventWatcherInterface
} from '@cerc-io/util';

import { Indexer } from './indexer';
import { EventWatcher } from './events';

const log = debug('vulcanize:resolver');

export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise<any> => {
export const createResolvers = async (indexerArg: IndexerInterface, eventWatcherArg: EventWatcherInterface): Promise<any> => {
const indexer = indexerArg as Indexer;
const eventWatcher = eventWatcherArg as EventWatcher;

return {
BigInt: new BigInt('bigInt'),

Expand Down
Loading