Skip to content

Commit

Permalink
Add flag to concurrently load relations (#392)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikugogoi authored Nov 14, 2022
1 parent 37b8a25 commit 0f9a7b4
Show file tree
Hide file tree
Showing 16 changed files with 148 additions and 117 deletions.
3 changes: 3 additions & 0 deletions packages/uni-info-watcher/environments/local.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
# Boolean to skip updating entity fields required in state creation and not required in the frontend.
skipStateFieldsUpdate = false

# Boolean to load GQL query nested entity relations sequentially.
loadRelationsSequential = false

[metrics]
host = "127.0.0.1"
port = 9004
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export const handler = async (argv: any): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);

const db = new Database(config.database);
const db = new Database(config.database, config.server);
await db.init();

const jobQueueConfig = config.jobQueue;
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/cli/export-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const main = async (): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);

const db = new Database(config.database);
const db = new Database(config.database, config.server);
await db.init();

const jobQueueConfig = config.jobQueue;
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/cli/import-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export const main = async (): Promise<any> => {
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);

const db = new Database(config.database);
const db = new Database(config.database, config.server);
await db.init();

// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/cli/inspect-cid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const main = async (): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await getResetConfig(config);

const db = new Database(config.database);
const db = new Database(config.database, config.server);
await db.init();

const jobQueueConfig = config.jobQueue;
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/cli/reset-cmds/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);

// Initialize database
const db = new Database(config.database);
const db = new Database(config.database, config.server);
await db.init();

// Create a DB transaction
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/cli/reset-cmds/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const handler = async (argv: any): Promise<void> => {
const { dbConfig, upstreamConfig, ethClient, ethProvider } = await getResetConfig(config);

// Initialize database.
const db = new Database(dbConfig);
const db = new Database(dbConfig, config.server);
await db.init();

const {
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/cli/watch-contract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import { Indexer } from '../indexer';

assert(dbConfig);

const db = new Database(dbConfig);
const db = new Database(dbConfig, config.server);
await db.init();

assert(jobQueueConfig, 'Missing job queue config');
Expand Down
1 change: 1 addition & 0 deletions packages/uni-info-watcher/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//

import { Repository, DeepPartial } from 'typeorm';
import _ from 'lodash';

export function getLatestEntityFromEntity<Entity> (latestEntityRepo: Repository<Entity>, entity: any): Entity {
const latestEntityFields = latestEntityRepo.metadata.columns.map(column => column.propertyName);
Expand Down
223 changes: 121 additions & 102 deletions packages/uni-info-watcher/src/custom-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,118 +330,92 @@ export class CustomIndexer {

const relationSelections = selections.filter((selection) => selection.kind === 'Field' && Boolean(relations[selection.name.value]));

for (const selection of relationSelections) {
assert(selection.kind === 'Field');
const field = selection.name.value;
const { entity: relationEntity, isArray, isDerived, field: foreignKey } = relations[field];
let childSelections = selection.selectionSet?.selections || [];

// Filter out __typename field in GQL for loading relations.
childSelections = childSelections.filter(selection => !(selection.kind === 'Field' && selection.name.value === '__typename'));

if (isDerived) {
const where: Where = {
[foreignKey]: [{
value: entities.map((entity: any) => entity.id),
not: false,
operator: 'in'
}]
};

const relatedEntities = await this.getModelEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
);

const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any[]}, entity: any) => {
// Related entity might be loaded with data.
const parentEntityId = entity[foreignKey].id ?? entity[foreignKey];

if (!acc[parentEntityId]) {
acc[parentEntityId] = [];
}
if (this._config.server.loadRelationsSequential) {
for (const selection of relationSelections) {
await this.loadRelation(queryRunner, block, relationsMap, relations, entities, selection);
}
} else {
const loadRelationPromises = relationSelections.map(async selection => {
await this.loadRelation(queryRunner, block, relationsMap, relations, entities, selection);
});

if (acc[parentEntityId].length < DEFAULT_LIMIT) {
acc[parentEntityId].push(entity);
}
await Promise.all(loadRelationPromises);
}

return acc;
}, {});
return entities;
}

entities.forEach((entity: any) => {
if (relatedEntitiesMap[entity.id]) {
entity[field] = relatedEntitiesMap[entity.id];
} else {
entity[field] = [];
}
});
async loadRelation<Entity> (
queryRunner: QueryRunner,
block: BlockHeight,
relationsMap: Map<any, { [key: string]: any }>,
relations: { [key: string]: any },
entities: Entity[],
selection: SelectionNode
): Promise<void> {
assert(selection.kind === 'Field');
const field = selection.name.value;
const { entity: relationEntity, isArray, isDerived, field: foreignKey } = relations[field];
let childSelections = selection.selectionSet?.selections || [];

continue;
}
// Filter out __typename field in GQL for loading relations.
childSelections = childSelections.filter(selection => !(selection.kind === 'Field' && selection.name.value === '__typename'));

if (isArray) {
const relatedIds = entities.reduce((acc: Set<string>, entity: any) => {
entity[field].forEach((relatedEntityId: string) => acc.add(relatedEntityId));
if (isDerived) {
const where: Where = {
[foreignKey]: [{
value: entities.map((entity: any) => entity.id),
not: false,
operator: 'in'
}]
};

return acc;
}, new Set());
const relatedEntities = await this.getModelEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
);

const where: Where = {
id: [{
value: Array.from(relatedIds),
not: false,
operator: 'in'
}]
};
const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any[]}, entity: any) => {
// Related entity might be loaded with data.
const parentEntityId = entity[foreignKey].id ?? entity[foreignKey];

const relatedEntities = await this.getModelEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
);
if (!acc[parentEntityId]) {
acc[parentEntityId] = [];
}

entities.forEach((entity: any) => {
const relatedEntityIds: Set<string> = entity[field].reduce((acc: Set<string>, id: string) => {
acc.add(id);
if (acc[parentEntityId].length < DEFAULT_LIMIT) {
acc[parentEntityId].push(entity);
}

return acc;
}, new Set());
return acc;
}, {});

entities.forEach((entity: any) => {
if (relatedEntitiesMap[entity.id]) {
entity[field] = relatedEntitiesMap[entity.id];
} else {
entity[field] = [];
}
});

relatedEntities.forEach((relatedEntity: any) => {
if (relatedEntityIds.has(relatedEntity.id) && entity[field].length < DEFAULT_LIMIT) {
entity[field].push(relatedEntity);
}
});
});

continue;
}

// field is neither an array nor derivedFrom
return;
}

// Avoid loading relation if selections only has id field.
if (childSelections.length === 1 && childSelections[0].kind === 'Field' && childSelections[0].name.value === 'id') {
entities.forEach((entity: any) => {
entity[field] = { id: entity[field] };
});
if (isArray) {
const relatedIds = entities.reduce((acc: Set<string>, entity: any) => {
entity[field].forEach((relatedEntityId: string) => acc.add(relatedEntityId));

continue;
}
return acc;
}, new Set());

const where: Where = {
id: [{
value: entities.map((entity: any) => entity[field]),
value: Array.from(relatedIds),
not: false,
operator: 'in'
}]
Expand All @@ -457,20 +431,65 @@ export class CustomIndexer {
childSelections
);

const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => {
acc[entity.id] = entity;
entities.forEach((entity: any) => {
const relatedEntityIds: Set<string> = entity[field].reduce((acc: Set<string>, id: string) => {
acc.add(id);

return acc;
}, {});
return acc;
}, new Set());

entity[field] = [];

relatedEntities.forEach((relatedEntity: any) => {
if (relatedEntityIds.has(relatedEntity.id) && entity[field].length < DEFAULT_LIMIT) {
entity[field].push(relatedEntity);
}
});
});

return;
}

// field is neither an array nor derivedFrom

// Avoid loading relation if selections only has id field.
if (childSelections.length === 1 && childSelections[0].kind === 'Field' && childSelections[0].name.value === 'id') {
entities.forEach((entity: any) => {
if (relatedEntitiesMap[entity[field]]) {
entity[field] = relatedEntitiesMap[entity[field]];
}
entity[field] = { id: entity[field] };
});

return;
}

return entities;
const where: Where = {
id: [{
value: entities.map((entity: any) => entity[field]),
not: false,
operator: 'in'
}]
};

const relatedEntities = await this.getModelEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
);

const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => {
acc[entity.id] = entity;

return acc;
}, {});

entities.forEach((entity: any) => {
if (relatedEntitiesMap[entity[field]]) {
entity[field] = relatedEntitiesMap[entity[field]];
}
});
}

buildQuery<Entity> (
Expand Down
7 changes: 4 additions & 3 deletions packages/uni-info-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import {
DatabaseInterface,
BlockHeight,
QueryOptions,
Where
Where,
ServerConfig
} from '@cerc-io/util';
import { Database as GraphDatabase, ENTITY_QUERY_TYPE } from '@cerc-io/graph-node';

Expand Down Expand Up @@ -108,7 +109,7 @@ export class Database implements DatabaseInterface {
_graphDatabase: GraphDatabase
_relationsMap: Map<any, { [key: string]: any }>

constructor (config: ConnectionOptions) {
constructor (config: ConnectionOptions, serverConfig: ServerConfig) {
assert(config);
const entitiesDir = path.join(__dirname, 'entity/*');

Expand All @@ -119,7 +120,7 @@ export class Database implements DatabaseInterface {
};

this._baseDatabase = new BaseDatabase(this._config);
this._graphDatabase = new GraphDatabase(this.baseDatabase, ENTITY_QUERY_TYPE_MAP);
this._graphDatabase = new GraphDatabase(serverConfig, this.baseDatabase, ENTITY_QUERY_TYPE_MAP);
this._relationsMap = new Map();
this._populateRelationsMap();
}
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/fill.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export const main = async (): Promise<any> => {

assert(dbConfig, 'Missing database config');

const db = new Database(dbConfig);
const db = new Database(dbConfig, config.server);
await db.init();

assert(upstream, 'Missing upstream config');
Expand Down
2 changes: 1 addition & 1 deletion packages/uni-info-watcher/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export const main = async (): Promise<any> => {

assert(dbConfig, 'Missing database config');

const db = new Database(dbConfig);
const db = new Database(dbConfig, config.server);
await db.init();

assert(upstream, 'Missing upstream config');
Expand Down
Loading

0 comments on commit 0f9a7b4

Please sign in to comment.