Skip to content

Commit

Permalink
Use lateral query for time travel queries using latest entity tables (#…
Browse files Browse the repository at this point in the history
…386)

* Use lateral query for time travel queries using latest entity tables

* Fix where condition in lateral query
  • Loading branch information
nikugogoi authored Nov 10, 2022
1 parent e913d26 commit 311b2a8
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 148 deletions.
139 changes: 111 additions & 28 deletions packages/uni-info-watcher/src/custom-indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

import { SelectionNode } from 'graphql';
import assert from 'assert';
import { QueryRunner, Repository, SelectQueryBuilder } from 'typeorm';
import { Brackets, QueryRunner, Repository, SelectQueryBuilder } from 'typeorm';

import { OPERATOR_MAP, Config, QueryOptions, Where } from '@cerc-io/util';
import { OPERATOR_MAP, Config, QueryOptions, Where, BlockHeight } from '@cerc-io/util';
import { ENTITY_QUERY_TYPE, resolveEntityFieldConflicts } from '@cerc-io/graph-node';

import { Indexer } from './indexer';
Expand Down Expand Up @@ -43,8 +43,9 @@ export class CustomIndexer {
this._indexer = indexer;
}

async getLatestEntities<Entity> (
async getEntities<Entity> (
entity: new () => Entity,
block: BlockHeight,
where: { [key: string]: any } = {},
queryOptions: QueryOptions,
selections: ReadonlyArray<SelectionNode> = []
Expand All @@ -59,7 +60,7 @@ export class CustomIndexer {
queryOptions.limit = DEFAULT_LIMIT;
}

res = await this.getEntities(dbTx, entity, this._db.relationsMap, where, queryOptions, selections);
res = await this.getModelEntities(dbTx, entity, this._db.relationsMap, block, where, queryOptions, selections);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
Expand All @@ -71,10 +72,11 @@ export class CustomIndexer {
return res;
}

async getEntities<Entity> (
async getModelEntities<Entity> (
queryRunner: QueryRunner,
entity: new () => Entity,
relationsMap: Map<any, { [key: string]: any }>,
block: BlockHeight,
where: Where = {},
queryOptions: QueryOptions = {},
selections: ReadonlyArray<SelectionNode> = []
Expand All @@ -83,47 +85,59 @@ export class CustomIndexer {
const latestEntity = entityToLatestEntityMap.get(entity);

if (latestEntity) {
// Use latest entity tables for Pool and Token.
entities = await this.getDBLatestEntities(
queryRunner,
entity,
latestEntity,
where,
queryOptions,
selections
);
if (Object.keys(block).length) {
// Use lateral query for entities with latest entity table.
entities = await this.getEntitiesLateral(
queryRunner,
entity,
latestEntity,
block,
where,
queryOptions
);
} else {
// Use latest entity tables if block height not passed.
entities = await this.getDBLatestEntities(
queryRunner,
entity,
latestEntity,
where,
queryOptions,
selections
);
}
} else {
// Use different suitable query patterns based on entities.
switch (ENTITY_QUERY_TYPE_MAP.get(entity)) {
case ENTITY_QUERY_TYPE.SINGULAR:
entities = await this._db.graphDatabase.getEntitiesSingular(queryRunner, entity, {}, where);
entities = await this._db.graphDatabase.getEntitiesSingular(queryRunner, entity, block, where);
break;

case ENTITY_QUERY_TYPE.UNIQUE:
entities = await this._db.graphDatabase.getEntitiesUnique(queryRunner, entity, {}, where, queryOptions);
entities = await this._db.graphDatabase.getEntitiesUnique(queryRunner, entity, block, where, queryOptions);
break;

case ENTITY_QUERY_TYPE.UNIQUE_WITHOUT_PRUNED:
entities = await this._db.graphDatabase.getEntitiesUniqueWithoutPruned(queryRunner, entity, {}, where, queryOptions);
entities = await this._db.graphDatabase.getEntitiesUniqueWithoutPruned(queryRunner, entity, block, where, queryOptions);
break;

case ENTITY_QUERY_TYPE.DISTINCT_ON:
entities = await this._db.graphDatabase.getEntitiesDistinctOn(queryRunner, entity, {}, where, queryOptions);
entities = await this._db.graphDatabase.getEntitiesDistinctOn(queryRunner, entity, block, where, queryOptions);
break;

case ENTITY_QUERY_TYPE.DISTINCT_ON_WITHOUT_PRUNED:
entities = await this._db.graphDatabase.getEntitiesDistinctOnWithoutPruned(queryRunner, entity, {}, where, queryOptions);
entities = await this._db.graphDatabase.getEntitiesDistinctOnWithoutPruned(queryRunner, entity, block, where, queryOptions);
break;

case ENTITY_QUERY_TYPE.GROUP_BY_WITHOUT_PRUNED:
// Use group by query if entity query type is not specified in map.
entities = await this._db.graphDatabase.getEntitiesGroupByWithoutPruned(queryRunner, entity, {}, where, queryOptions);
entities = await this._db.graphDatabase.getEntitiesGroupByWithoutPruned(queryRunner, entity, block, where, queryOptions);
break;

case ENTITY_QUERY_TYPE.GROUP_BY:
default:
// Use group by query if entity query type is not specified in map.
entities = await this._db.graphDatabase.getEntitiesGroupBy(queryRunner, entity, {}, where, queryOptions);
entities = await this._db.graphDatabase.getEntitiesGroupBy(queryRunner, entity, block, where, queryOptions);
break;
}
}
Expand All @@ -132,13 +146,77 @@ export class CustomIndexer {
return [];
}

entities = await this.loadLatestEntitiesRelations(queryRunner, relationsMap, entity, entities, selections);
entities = await this.loadLatestEntitiesRelations(queryRunner, block, relationsMap, entity, entities, selections);
// Resolve any field name conflicts in the entity result.
entities = entities.map(entity => resolveEntityFieldConflicts(entity));

return entities;
}

async getEntitiesLateral<Entity> (
queryRunner: QueryRunner,
entity: new () => Entity,
latestEntity: new () => any,
block: BlockHeight,
where: Where = {},
queryOptions: QueryOptions = {}
): Promise<Entity[]> {
const entityRepo = queryRunner.manager.getRepository(entity);
const latestEntityRepo = queryRunner.manager.getRepository(latestEntity);

let subQuery = entityRepo.createQueryBuilder('subTable')
.where('latest.id = subTable.id')
.orderBy('subTable.block_number', 'DESC')
.limit(1);

if (block.hash) {
const { canonicalBlockNumber, blockHashes } = await this._db.baseDatabase.getFrothyRegion(queryRunner, block.hash);

subQuery = subQuery
.andWhere(new Brackets(qb => {
qb.where('subTable.block_hash IN (:...blockHashes)', { blockHashes })
.orWhere('subTable.block_number <= :canonicalBlockNumber', { canonicalBlockNumber });
}));
}

if (block.number) {
subQuery = subQuery.andWhere('subTable.block_number <= :blockNumber', { blockNumber: block.number });
}

let selectQueryBuilder = latestEntityRepo.createQueryBuilder('latest')
.select('*')
.from(
qb => {
// https://stackoverflow.com/a/72026555/10026807
qb.getQuery = () => `LATERAL (${subQuery.getQuery()})`;
qb.setParameters(subQuery.getParameters());
return qb;
},
'result'
);

selectQueryBuilder = this.buildQuery(latestEntityRepo, selectQueryBuilder, where, 'latest');

if (queryOptions.orderBy) {
selectQueryBuilder = this.orderQuery(latestEntityRepo, selectQueryBuilder, queryOptions, 'latest');
}

selectQueryBuilder = this.orderQuery(latestEntityRepo, selectQueryBuilder, { ...queryOptions, orderBy: 'id' }, 'latest');

if (queryOptions.skip) {
selectQueryBuilder = selectQueryBuilder.offset(queryOptions.skip);
}

if (queryOptions.limit) {
selectQueryBuilder = selectQueryBuilder.limit(queryOptions.limit);
}

let entities = await selectQueryBuilder.getRawMany();
entities = await this._db.graphDatabase.transformResults(queryRunner, entityRepo.createQueryBuilder('subTable'), entities);

return entities as Entity[];
}

async getDBLatestEntities<Entity> (
queryRunner: QueryRunner,
entity: new () => Entity,
Expand Down Expand Up @@ -195,6 +273,7 @@ export class CustomIndexer {

async loadLatestEntitiesRelations<Entity> (
queryRunner: QueryRunner,
block: BlockHeight,
relationsMap: Map<any, { [key: string]: any }>,
entity: new () => Entity,
entities: Entity[],
Expand Down Expand Up @@ -225,10 +304,11 @@ export class CustomIndexer {
}]
};

const relatedEntities = await this.getEntities(
const relatedEntities = await this.getModelEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
Expand Down Expand Up @@ -275,10 +355,11 @@ export class CustomIndexer {
}]
};

const relatedEntities = await this.getEntities(
const relatedEntities = await this.getModelEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
Expand Down Expand Up @@ -322,10 +403,11 @@ export class CustomIndexer {
}]
};

const relatedEntities = await this.getEntities(
const relatedEntities = await this.getModelEntities(
queryRunner,
relationEntity,
relationsMap,
block,
where,
{},
childSelections
Expand All @@ -351,21 +433,22 @@ export class CustomIndexer {
repo: Repository<Entity>,
selectQueryBuilder: SelectQueryBuilder<Entity>,
where: Where = {},
alias: string
alias: string,
columnPrefix = ''
): SelectQueryBuilder<Entity> {
Object.entries(where).forEach(([field, filters]) => {
filters.forEach((filter, index) => {
// Form the where clause.
let { not, operator, value } = filter;
const columnMetadata = repo.metadata.findColumnWithPropertyName(field);
assert(columnMetadata);
let whereClause = `"${alias}"."${columnMetadata.databaseName}" `;
let whereClause = `"${alias}"."${columnPrefix}${columnMetadata.databaseName}" `;

if (columnMetadata.relationMetadata) {
// For relation fields, use the id column.
const idColumn = columnMetadata.relationMetadata.joinColumns.find(column => column.referencedColumn?.propertyName === 'id');
assert(idColumn);
whereClause = `"${alias}"."${idColumn.databaseName}" `;
whereClause = `"${alias}"."${columnPrefix}${idColumn.databaseName}" `;
}

if (not) {
Expand Down
4 changes: 4 additions & 0 deletions packages/uni-info-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ export class Database implements DatabaseInterface {
return this._relationsMap;
}

get baseDatabase (): BaseDatabase {
return this._baseDatabase;
}

get graphDatabase (): GraphDatabase {
return this._graphDatabase;
}
Expand Down
Loading

0 comments on commit 311b2a8

Please sign in to comment.