Skip to content

Commit

Permalink
Update latest entity tables on chain pruning (#387)
Browse files Browse the repository at this point in the history
* Add a table for entites in frothy region

* Update latest entity tables on chain pruning
  • Loading branch information
prathamesh0 authored Nov 10, 2022
1 parent 9d31def commit e913d26
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 22 deletions.
90 changes: 70 additions & 20 deletions packages/uni-info-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ import { StateSyncStatus } from './entity/StateSyncStatus';
import { Collect } from './entity/Collect';
import { Flash } from './entity/Flash';
import { TickHourData } from './entity/TickHourData';
import { FrothyEntity } from './entity/FrothyEntity';
import { entityToLatestEntityMap } from './custom-indexer';

const log = debug('vulcanize:database');

Expand Down Expand Up @@ -95,8 +97,8 @@ export const ENTITY_QUERY_TYPE_MAP = new Map<new() => any, ENTITY_QUERY_TYPE>([
[UniswapDayData, ENTITY_QUERY_TYPE.GROUP_BY_WITHOUT_PRUNED]
]);

const ENTITIES = [Bundle, Burn, Collect, Factory, Flash, Mint, Pool, PoolDayData, PoolHourData, Position, PositionSnapshot,
Swap, Tick, TickDayData, TickHourData, Transaction, UniswapDayData];
export const ENTITIES = new Set([Bundle, Burn, Collect, Factory, Flash, Mint, Pool, PoolDayData, PoolHourData, Position, PositionSnapshot,
Swap, Tick, TickDayData, TickHourData, Transaction, UniswapDayData]);

export class Database implements DatabaseInterface {
_config: ConnectionOptions
Expand Down Expand Up @@ -869,37 +871,66 @@ export class Database implements DatabaseInterface {
const blockHashes = blocks.map(block => block.blockHash);

// Get all entities at the block height
const entitiesAtBlock = await Promise.all(
ENTITIES.map(entity => {
return this.getEntities(
queryRunner,
entity as any,
{
select: ['id'] as any,
where: { blockNumber }
}
);
})
);
const entitiesAtHeight = await this.getEntities(queryRunner, FrothyEntity, { where: { blockNumber } });

// Extract entity ids from result
const entityIds = entitiesAtBlock.map(entities => {
return entities.map((entity: any) => entity.id);
});
const entityIdsMap: Map<string, string[]> = new Map();
entitiesAtHeight.forEach(entity =>
entityIdsMap.set(
entity.name,
[...entityIdsMap.get(entity.name) || [], entity.id]
)
);

// Update isPruned flag using fetched entity ids and hashes of blocks to be pruned
updatePromises.push(
...ENTITIES.map((entity, index: number) => {
[...ENTITIES].map((entity) => {
return this.updateEntity(
queryRunner,
entity as any,
{ id: In(entityIds[index]), blockHash: In(blockHashes) },
{ id: In(entityIdsMap.get(entity.name) || []), blockHash: In(blockHashes) },
{ isPruned: true }
);
}) as any
);

// Simultaneously update isPruned flag for all entities
await Promise.all(updatePromises);

// Update latest entity tables with canonical entries
await this.updateNonCanonicalLatestEntities(queryRunner, blockNumber, blockHashes);
}

async updateNonCanonicalLatestEntities (queryRunner: QueryRunner, blockNumber: number, nonCanonicalBlockHashes: string[]): Promise<void> {
// Update latest entity tables with canonical entries
await Promise.all(
Array.from(entityToLatestEntityMap.entries()).map(async ([entity, latestEntity]) => {
// Get entries for non canonical blocks
const nonCanonicalLatestEntities = await this.getEntities(queryRunner, latestEntity, { where: { blockHash: In(nonCanonicalBlockHashes) } });

await Promise.all(nonCanonicalLatestEntities.map(async (nonCanonicalLatestEntity: any) => {
// Get pruned version for the non canonical entity
const prunedVersion = await this.getLatestPrunedEntity(queryRunner, entity, nonCanonicalLatestEntity.id, blockNumber);

// If found, update the latestEntity entry for the id
// Else, delete the latestEntity entry for the id
if (prunedVersion) {
return this.updateEntity(
queryRunner,
latestEntity,
{ id: nonCanonicalLatestEntity.id },
prunedVersion
);
} else {
return this.removeEntities(
queryRunner,
latestEntity,
{ where: { id: nonCanonicalLatestEntity.id } }
);
}
}));
})
);
}

async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
Expand Down Expand Up @@ -929,7 +960,26 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getEntities(queryRunner, entity, findConditions);
}

async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> {
async getLatestPrunedEntity<Entity> (queryRunner: QueryRunner, entity: new () => Entity, id: string, canonicalBlockNumber: number): Promise<Entity | undefined> {
// Fetch the latest canonical entity for given id
const repo = queryRunner.manager.getRepository(entity);
const entityInPrunedRegion = await repo.createQueryBuilder('entity')
.where('entity.id = :id', { id })
.andWhere('entity.is_pruned = false')
.andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
.orderBy('entity.block_number', 'DESC')
.limit(1)
.getOne();

return entityInPrunedRegion;
}

async pruneFrothyEntities (queryRunner: QueryRunner, blockNumber: number): Promise<void> {
// Remove frothy entity entries at the prune block height
return this.removeEntities(queryRunner, FrothyEntity, { where: { blockNumber: LessThanOrEqual(blockNumber) } });
}

async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity>): Promise<void> {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}

Expand Down
21 changes: 21 additions & 0 deletions packages/uni-info-watcher/src/entity/FrothyEntity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//
// Copyright 2022 Vulcanize, Inc.
//

import { Entity, PrimaryColumn, Column, Index } from 'typeorm';

@Entity()
@Index(['blockNumber'])
export class FrothyEntity {
@PrimaryColumn('varchar')
id!: string;

@PrimaryColumn('varchar')
name!: string;

@PrimaryColumn('varchar', { length: 66 })
blockHash!: string;

@Column('integer')
blockNumber!: number;
}
29 changes: 27 additions & 2 deletions packages/uni-info-watcher/src/entity/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent }
import _ from 'lodash';

import { entityToLatestEntityMap } from '../custom-indexer';
import { ENTITIES } from '../database';
import { FrothyEntity } from './FrothyEntity';

@EventSubscriber()
export class EntitySubscriber implements EntitySubscriberInterface {
Expand All @@ -19,10 +21,33 @@ export class EntitySubscriber implements EntitySubscriberInterface {
}

const afterInsertOrUpdate = async (event: InsertEvent<any> | UpdateEvent<any>): Promise<void> => {
// Get latest entity's type
const entity = event.entity;
const entityTarget = entityToLatestEntityMap.get(entity.constructor);

// Return if the entity is being pruned
if (entity.isPruned) {
return;
}

// Insert the entity details in FrothyEntity table
if (ENTITIES.has(entity.constructor)) {
const frothyEntity = event.manager.create(
FrothyEntity,
{
..._.pick(entity, ['id', 'blockHash', 'blockNumber']),
...{ name: entity.constructor.name }
}
);

await event.manager.createQueryBuilder()
.insert()
.into(FrothyEntity)
.values(frothyEntity)
.orIgnore()
.execute();
}

// Get latest entity's type
const entityTarget = entityToLatestEntityMap.get(entity.constructor);
if (!entityTarget) {
return;
}
Expand Down
17 changes: 17 additions & 0 deletions packages/uni-info-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,20 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.markBlocksAsPruned(blocks);
}

async pruneFrothyEntities (blockNumber: number): Promise<void> {
const dbTx = await this._db.createTransactionRunner();
try {
await this._db.pruneFrothyEntities(dbTx, blockNumber);

dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}

async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
}
Expand Down Expand Up @@ -549,8 +563,11 @@ export class Indexer implements IndexerInterface {

async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
const syncStatus = await this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force);

this._db.pruneEntityCacheFrothyBlocks(syncStatus.latestCanonicalBlockHash, syncStatus.latestCanonicalBlockNumber);

await this.pruneFrothyEntities(blockNumber);

return syncStatus;
}

Expand Down

0 comments on commit e913d26

Please sign in to comment.