From 0f9a7b4535bddb53cd9b1b94173fb689a2f833c1 Mon Sep 17 00:00:00 2001 From: nikugogoi Date: Mon, 14 Nov 2022 14:23:58 +0530 Subject: [PATCH] Add flag to concurrently load relations (#392) --- .../uni-info-watcher/environments/local.toml | 3 + .../src/cli/checkpoint-cmds/create.ts | 2 +- .../uni-info-watcher/src/cli/export-state.ts | 2 +- .../uni-info-watcher/src/cli/import-state.ts | 2 +- .../uni-info-watcher/src/cli/inspect-cid.ts | 2 +- .../src/cli/reset-cmds/state.ts | 2 +- .../src/cli/reset-cmds/watcher.ts | 2 +- .../src/cli/watch-contract.ts | 2 +- packages/uni-info-watcher/src/common.ts | 1 + .../uni-info-watcher/src/custom-indexer.ts | 223 ++++++++++-------- packages/uni-info-watcher/src/database.ts | 7 +- packages/uni-info-watcher/src/fill.ts | 2 +- packages/uni-info-watcher/src/job-runner.ts | 2 +- packages/uni-info-watcher/src/resolvers.ts | 6 +- packages/uni-info-watcher/src/schema.ts | 5 + packages/uni-info-watcher/src/server.ts | 2 +- 16 files changed, 148 insertions(+), 117 deletions(-) diff --git a/packages/uni-info-watcher/environments/local.toml b/packages/uni-info-watcher/environments/local.toml index 4a6850e5..0b2f11d6 100644 --- a/packages/uni-info-watcher/environments/local.toml +++ b/packages/uni-info-watcher/environments/local.toml @@ -24,6 +24,9 @@ # Boolean to skip updating entity fields required in state creation and not required in the frontend. skipStateFieldsUpdate = false + # Boolean to load GQL query nested entity relations sequentially. + loadRelationsSequential = false + [metrics] host = "127.0.0.1" port = 9004 diff --git a/packages/uni-info-watcher/src/cli/checkpoint-cmds/create.ts b/packages/uni-info-watcher/src/cli/checkpoint-cmds/create.ts index b1bdccf3..b3cef253 100644 --- a/packages/uni-info-watcher/src/cli/checkpoint-cmds/create.ts +++ b/packages/uni-info-watcher/src/cli/checkpoint-cmds/create.ts @@ -35,7 +35,7 @@ export const handler = async (argv: any): Promise => { const config: Config = await getConfig(argv.configFile); const { ethClient, ethProvider } = await initClients(config); - const db = new Database(config.database); + const db = new Database(config.database, config.server); await db.init(); const jobQueueConfig = config.jobQueue; diff --git a/packages/uni-info-watcher/src/cli/export-state.ts b/packages/uni-info-watcher/src/cli/export-state.ts index 4452d179..12d4379a 100644 --- a/packages/uni-info-watcher/src/cli/export-state.ts +++ b/packages/uni-info-watcher/src/cli/export-state.ts @@ -45,7 +45,7 @@ const main = async (): Promise => { const config: Config = await getConfig(argv.configFile); const { ethClient, ethProvider } = await initClients(config); - const db = new Database(config.database); + const db = new Database(config.database, config.server); await db.init(); const jobQueueConfig = config.jobQueue; diff --git a/packages/uni-info-watcher/src/cli/import-state.ts b/packages/uni-info-watcher/src/cli/import-state.ts index 0e6e077c..ddd618a9 100644 --- a/packages/uni-info-watcher/src/cli/import-state.ts +++ b/packages/uni-info-watcher/src/cli/import-state.ts @@ -47,7 +47,7 @@ export const main = async (): Promise => { const config: Config = await getConfig(argv.configFile); const { ethClient, ethProvider } = await initClients(config); - const db = new Database(config.database); + const db = new Database(config.database, config.server); await db.init(); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. diff --git a/packages/uni-info-watcher/src/cli/inspect-cid.ts b/packages/uni-info-watcher/src/cli/inspect-cid.ts index 60c7eb1b..6b724629 100644 --- a/packages/uni-info-watcher/src/cli/inspect-cid.ts +++ b/packages/uni-info-watcher/src/cli/inspect-cid.ts @@ -41,7 +41,7 @@ const main = async (): Promise => { const config: Config = await getConfig(argv.configFile); const { ethClient, ethProvider } = await getResetConfig(config); - const db = new Database(config.database); + const db = new Database(config.database, config.server); await db.init(); const jobQueueConfig = config.jobQueue; diff --git a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts index 0ed58e20..fe25b294 100644 --- a/packages/uni-info-watcher/src/cli/reset-cmds/state.ts +++ b/packages/uni-info-watcher/src/cli/reset-cmds/state.ts @@ -25,7 +25,7 @@ export const handler = async (argv: any): Promise => { const config = await getConfig(argv.configFile); // Initialize database - const db = new Database(config.database); + const db = new Database(config.database, config.server); await db.init(); // Create a DB transaction diff --git a/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts b/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts index 2f16880b..581808a6 100644 --- a/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts +++ b/packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts @@ -31,7 +31,7 @@ export const handler = async (argv: any): Promise => { const { dbConfig, upstreamConfig, ethClient, ethProvider } = await getResetConfig(config); // Initialize database. - const db = new Database(dbConfig); + const db = new Database(dbConfig, config.server); await db.init(); const { diff --git a/packages/uni-info-watcher/src/cli/watch-contract.ts b/packages/uni-info-watcher/src/cli/watch-contract.ts index deba0efc..d6909084 100644 --- a/packages/uni-info-watcher/src/cli/watch-contract.ts +++ b/packages/uni-info-watcher/src/cli/watch-contract.ts @@ -57,7 +57,7 @@ import { Indexer } from '../indexer'; assert(dbConfig); - const db = new Database(dbConfig); + const db = new Database(dbConfig, config.server); await db.init(); assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/uni-info-watcher/src/common.ts b/packages/uni-info-watcher/src/common.ts index 05434a3a..b5f6db16 100644 --- a/packages/uni-info-watcher/src/common.ts +++ b/packages/uni-info-watcher/src/common.ts @@ -3,6 +3,7 @@ // import { Repository, DeepPartial } from 'typeorm'; +import _ from 'lodash'; export function getLatestEntityFromEntity (latestEntityRepo: Repository, entity: any): Entity { const latestEntityFields = latestEntityRepo.metadata.columns.map(column => column.propertyName); diff --git a/packages/uni-info-watcher/src/custom-indexer.ts b/packages/uni-info-watcher/src/custom-indexer.ts index a26c252b..829ee23d 100644 --- a/packages/uni-info-watcher/src/custom-indexer.ts +++ b/packages/uni-info-watcher/src/custom-indexer.ts @@ -330,118 +330,92 @@ export class CustomIndexer { const relationSelections = selections.filter((selection) => selection.kind === 'Field' && Boolean(relations[selection.name.value])); - for (const selection of relationSelections) { - assert(selection.kind === 'Field'); - const field = selection.name.value; - const { entity: relationEntity, isArray, isDerived, field: foreignKey } = relations[field]; - let childSelections = selection.selectionSet?.selections || []; - - // Filter out __typename field in GQL for loading relations. - childSelections = childSelections.filter(selection => !(selection.kind === 'Field' && selection.name.value === '__typename')); - - if (isDerived) { - const where: Where = { - [foreignKey]: [{ - value: entities.map((entity: any) => entity.id), - not: false, - operator: 'in' - }] - }; - - const relatedEntities = await this.getModelEntities( - queryRunner, - relationEntity, - relationsMap, - block, - where, - {}, - childSelections - ); - - const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any[]}, entity: any) => { - // Related entity might be loaded with data. - const parentEntityId = entity[foreignKey].id ?? entity[foreignKey]; - - if (!acc[parentEntityId]) { - acc[parentEntityId] = []; - } + if (this._config.server.loadRelationsSequential) { + for (const selection of relationSelections) { + await this.loadRelation(queryRunner, block, relationsMap, relations, entities, selection); + } + } else { + const loadRelationPromises = relationSelections.map(async selection => { + await this.loadRelation(queryRunner, block, relationsMap, relations, entities, selection); + }); - if (acc[parentEntityId].length < DEFAULT_LIMIT) { - acc[parentEntityId].push(entity); - } + await Promise.all(loadRelationPromises); + } - return acc; - }, {}); + return entities; + } - entities.forEach((entity: any) => { - if (relatedEntitiesMap[entity.id]) { - entity[field] = relatedEntitiesMap[entity.id]; - } else { - entity[field] = []; - } - }); + async loadRelation ( + queryRunner: QueryRunner, + block: BlockHeight, + relationsMap: Map, + relations: { [key: string]: any }, + entities: Entity[], + selection: SelectionNode + ): Promise { + assert(selection.kind === 'Field'); + const field = selection.name.value; + const { entity: relationEntity, isArray, isDerived, field: foreignKey } = relations[field]; + let childSelections = selection.selectionSet?.selections || []; - continue; - } + // Filter out __typename field in GQL for loading relations. + childSelections = childSelections.filter(selection => !(selection.kind === 'Field' && selection.name.value === '__typename')); - if (isArray) { - const relatedIds = entities.reduce((acc: Set, entity: any) => { - entity[field].forEach((relatedEntityId: string) => acc.add(relatedEntityId)); + if (isDerived) { + const where: Where = { + [foreignKey]: [{ + value: entities.map((entity: any) => entity.id), + not: false, + operator: 'in' + }] + }; - return acc; - }, new Set()); + const relatedEntities = await this.getModelEntities( + queryRunner, + relationEntity, + relationsMap, + block, + where, + {}, + childSelections + ); - const where: Where = { - id: [{ - value: Array.from(relatedIds), - not: false, - operator: 'in' - }] - }; + const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any[]}, entity: any) => { + // Related entity might be loaded with data. + const parentEntityId = entity[foreignKey].id ?? entity[foreignKey]; - const relatedEntities = await this.getModelEntities( - queryRunner, - relationEntity, - relationsMap, - block, - where, - {}, - childSelections - ); + if (!acc[parentEntityId]) { + acc[parentEntityId] = []; + } - entities.forEach((entity: any) => { - const relatedEntityIds: Set = entity[field].reduce((acc: Set, id: string) => { - acc.add(id); + if (acc[parentEntityId].length < DEFAULT_LIMIT) { + acc[parentEntityId].push(entity); + } - return acc; - }, new Set()); + return acc; + }, {}); + entities.forEach((entity: any) => { + if (relatedEntitiesMap[entity.id]) { + entity[field] = relatedEntitiesMap[entity.id]; + } else { entity[field] = []; + } + }); - relatedEntities.forEach((relatedEntity: any) => { - if (relatedEntityIds.has(relatedEntity.id) && entity[field].length < DEFAULT_LIMIT) { - entity[field].push(relatedEntity); - } - }); - }); - - continue; - } - - // field is neither an array nor derivedFrom + return; + } - // Avoid loading relation if selections only has id field. - if (childSelections.length === 1 && childSelections[0].kind === 'Field' && childSelections[0].name.value === 'id') { - entities.forEach((entity: any) => { - entity[field] = { id: entity[field] }; - }); + if (isArray) { + const relatedIds = entities.reduce((acc: Set, entity: any) => { + entity[field].forEach((relatedEntityId: string) => acc.add(relatedEntityId)); - continue; - } + return acc; + }, new Set()); const where: Where = { id: [{ - value: entities.map((entity: any) => entity[field]), + value: Array.from(relatedIds), not: false, operator: 'in' }] @@ -457,20 +431,65 @@ export class CustomIndexer { childSelections ); - const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => { - acc[entity.id] = entity; + entities.forEach((entity: any) => { + const relatedEntityIds: Set = entity[field].reduce((acc: Set, id: string) => { + acc.add(id); - return acc; - }, {}); + return acc; + }, new Set()); + + entity[field] = []; + + relatedEntities.forEach((relatedEntity: any) => { + if (relatedEntityIds.has(relatedEntity.id) && entity[field].length < DEFAULT_LIMIT) { + entity[field].push(relatedEntity); + } + }); + }); + + return; + } + // field is neither an array nor derivedFrom + + // Avoid loading relation if selections only has id field. + if (childSelections.length === 1 && childSelections[0].kind === 'Field' && childSelections[0].name.value === 'id') { entities.forEach((entity: any) => { - if (relatedEntitiesMap[entity[field]]) { - entity[field] = relatedEntitiesMap[entity[field]]; - } + entity[field] = { id: entity[field] }; }); + + return; } - return entities; + const where: Where = { + id: [{ + value: entities.map((entity: any) => entity[field]), + not: false, + operator: 'in' + }] + }; + + const relatedEntities = await this.getModelEntities( + queryRunner, + relationEntity, + relationsMap, + block, + where, + {}, + childSelections + ); + + const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => { + acc[entity.id] = entity; + + return acc; + }, {}); + + entities.forEach((entity: any) => { + if (relatedEntitiesMap[entity[field]]) { + entity[field] = relatedEntitiesMap[entity[field]]; + } + }); } buildQuery ( diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 52ae6536..b3b2738f 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -26,7 +26,8 @@ import { DatabaseInterface, BlockHeight, QueryOptions, - Where + Where, + ServerConfig } from '@cerc-io/util'; import { Database as GraphDatabase, ENTITY_QUERY_TYPE } from '@cerc-io/graph-node'; @@ -108,7 +109,7 @@ export class Database implements DatabaseInterface { _graphDatabase: GraphDatabase _relationsMap: Map - constructor (config: ConnectionOptions) { + constructor (config: ConnectionOptions, serverConfig: ServerConfig) { assert(config); const entitiesDir = path.join(__dirname, 'entity/*'); @@ -119,7 +120,7 @@ export class Database implements DatabaseInterface { }; this._baseDatabase = new BaseDatabase(this._config); - this._graphDatabase = new GraphDatabase(this.baseDatabase, ENTITY_QUERY_TYPE_MAP); + this._graphDatabase = new GraphDatabase(serverConfig, this.baseDatabase, ENTITY_QUERY_TYPE_MAP); this._relationsMap = new Map(); this._populateRelationsMap(); } diff --git a/packages/uni-info-watcher/src/fill.ts b/packages/uni-info-watcher/src/fill.ts index 84dd4653..fefd55d8 100644 --- a/packages/uni-info-watcher/src/fill.ts +++ b/packages/uni-info-watcher/src/fill.ts @@ -79,7 +79,7 @@ export const main = async (): Promise => { assert(dbConfig, 'Missing database config'); - const db = new Database(dbConfig); + const db = new Database(dbConfig, config.server); await db.init(); assert(upstream, 'Missing upstream config'); diff --git a/packages/uni-info-watcher/src/job-runner.ts b/packages/uni-info-watcher/src/job-runner.ts index f24f66c0..717fccdd 100644 --- a/packages/uni-info-watcher/src/job-runner.ts +++ b/packages/uni-info-watcher/src/job-runner.ts @@ -83,7 +83,7 @@ export const main = async (): Promise => { assert(dbConfig, 'Missing database config'); - const db = new Database(dbConfig); + const db = new Database(dbConfig, config.server); await db.init(); assert(upstream, 'Missing upstream config'); diff --git a/packages/uni-info-watcher/src/resolvers.ts b/packages/uni-info-watcher/src/resolvers.ts index 9e0ee75d..a0c94885 100644 --- a/packages/uni-info-watcher/src/resolvers.ts +++ b/packages/uni-info-watcher/src/resolvers.ts @@ -83,12 +83,14 @@ export const createResolvers = async (indexer: Indexer, customIndexer: CustomInd return indexer.getBundle(id, block); }, - bundles: async (_: any, { block = {}, first, skip }: { first: number, skip: number, block: BlockHeight }) => { + bundles: async ( + _: any, + { block = {}, first, skip, where = {} }: { first: number, skip: number, block: BlockHeight, where: { [key: string]: any } } + ) => { log('bundles', JSONbig.stringify({ block, first, skip })); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('bundles').inc(1); - let where = {}; if (!indexer._isDemo) { // Filter using address deployed on mainnet if not in demo mode where = { id: BUNDLE_ID }; diff --git a/packages/uni-info-watcher/src/schema.ts b/packages/uni-info-watcher/src/schema.ts index eb4a288c..e4e8d59f 100644 --- a/packages/uni-info-watcher/src/schema.ts +++ b/packages/uni-info-watcher/src/schema.ts @@ -401,6 +401,10 @@ enum OrderDirection { desc } +input Bundle_filter { + id: ID +} + input PoolDayData_filter { date_gt: Int pool: String @@ -581,6 +585,7 @@ type Query { bundles( first: Int = 100 skip: Int = 0 + where: Bundle_filter """ The block at which the query should be executed. Can either be an '{ number: diff --git a/packages/uni-info-watcher/src/server.ts b/packages/uni-info-watcher/src/server.ts index d1abf255..49bb29d1 100644 --- a/packages/uni-info-watcher/src/server.ts +++ b/packages/uni-info-watcher/src/server.ts @@ -53,7 +53,7 @@ export const main = async (): Promise => { assert(dbConfig, 'Missing database config'); - const db = new Database(dbConfig); + const db = new Database(dbConfig, config.server); await db.init(); assert(upstream, 'Missing upstream config');