Skip to content

Commit

Permalink
Refactor code to use methods from watcher-ts
Browse files Browse the repository at this point in the history
  • Loading branch information
nikugogoi committed Nov 17, 2022
1 parent 0521a0a commit 36ef405
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 165 deletions.
11 changes: 0 additions & 11 deletions packages/uni-info-watcher/src/common.ts

This file was deleted.

95 changes: 2 additions & 93 deletions packages/uni-info-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ import { Collect } from './entity/Collect';
import { Flash } from './entity/Flash';
import { TickHourData } from './entity/TickHourData';
import { FrothyEntity } from './entity/FrothyEntity';
import { getLatestEntityFromEntity } from './common';
import { LatestPool } from './entity/LatestPool';
import { LatestToken } from './entity/LatestToken';
import { LatestUniswapDayData } from './entity/LatestUniswapDayData';
Expand Down Expand Up @@ -881,85 +880,9 @@ export class Database implements DatabaseInterface {

async markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgress[]): Promise<void> {
const repo = queryRunner.manager.getRepository(BlockProgress);
await this._baseDatabase.markBlocksAsPruned(repo, blocks);

const updatePromises: Promise<void>[] = [];
updatePromises.push(this._baseDatabase.markBlocksAsPruned(repo, blocks));

// Assumption: all blocks are at same height
assert(blocks.length);
const blockNumber = blocks[0].blockNumber;
const blockHashes = blocks.map(block => block.blockHash);

// Get all entities at the block height
const entitiesAtHeight = await this.getEntities(queryRunner, FrothyEntity, { where: { blockNumber } });

// Extract entity ids from result
const entityIdsMap: Map<string, string[]> = new Map();
entitiesAtHeight.forEach(entity =>
entityIdsMap.set(
entity.name,
[...entityIdsMap.get(entity.name) || [], entity.id]
)
);

// Update isPruned flag using fetched entity ids and hashes of blocks to be pruned
updatePromises.push(
[...ENTITIES].map((entityType) => {
return this.updateEntity(
queryRunner,
entityType as any,
{ id: In(entityIdsMap.get(entityType.name) || []), blockHash: In(blockHashes) },
{ isPruned: true }
);
}) as any
);

// Simultaneously update isPruned flag for all entities
await Promise.all(updatePromises);

// Update latest entity tables with canonical entries
await this.updateNonCanonicalLatestEntities(queryRunner, blockNumber, blockHashes);
}

async updateNonCanonicalLatestEntities (queryRunner: QueryRunner, blockNumber: number, nonCanonicalBlockHashes: string[]): Promise<void> {
// Update latest entity tables with canonical entries
await Promise.all(
Array.from(ENTITY_TO_LATEST_ENTITY_MAP.entries()).map(async ([entityType, latestEntityType]) => {
// Get entries for non canonical blocks
const nonCanonicalLatestEntities = await this.getEntities(queryRunner, latestEntityType, { where: { blockHash: In(nonCanonicalBlockHashes) } });

// Canonicalize latest entity table at given block height
await this.canonicalizeLatestEntity(queryRunner, entityType, latestEntityType, nonCanonicalLatestEntities, blockNumber);
})
);
}

async canonicalizeLatestEntity (queryRunner: QueryRunner, entityType: any, latestEntityType: any, entities: any[], blockNumber: number): Promise<void> {
await Promise.all(entities.map(async (entity: any) => {
// Get latest pruned (canonical) version for the given entity
const prunedVersion = await this.getLatestPrunedEntity(queryRunner, entityType, entity.id, blockNumber);

// If found, update the latestEntity entry for the id
// Else, delete the latestEntity entry for the id
if (prunedVersion) {
// Create a latest entity instance and insert in the db
const latestEntityRepo = queryRunner.manager.getRepository(latestEntityType);
const latestEntity = getLatestEntityFromEntity(latestEntityRepo, prunedVersion);

await this.updateEntity(
queryRunner,
latestEntityType,
{ id: entity.id },
latestEntity
);
} else {
await this.removeEntities(
queryRunner,
latestEntityType,
{ where: { id: entity.id } }
);
}
}));
await this._graphDatabase.pruneEntities(FrothyEntity, queryRunner, blocks, ENTITIES);
}

async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
Expand Down Expand Up @@ -989,20 +912,6 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getEntities(queryRunner, entity, findConditions);
}

async getLatestPrunedEntity<Entity> (queryRunner: QueryRunner, entity: new () => Entity, id: string, canonicalBlockNumber: number): Promise<Entity | undefined> {
// Fetch the latest canonical entity for given id
const repo = queryRunner.manager.getRepository(entity);
const entityInPrunedRegion = await repo.createQueryBuilder('entity')
.where('entity.id = :id', { id })
.andWhere('entity.is_pruned = false')
.andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
.orderBy('entity.block_number', 'DESC')
.limit(1)
.getOne();

return entityInPrunedRegion;
}

async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity>): Promise<void> {
return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
}
Expand Down
55 changes: 4 additions & 51 deletions packages/uni-info-watcher/src/entity/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,65 +5,18 @@
import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent } from 'typeorm';
import _ from 'lodash';

import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node';

import { FrothyEntity } from './FrothyEntity';
import { ENTITIES, ENTITY_TO_LATEST_ENTITY_MAP } from '../database';
import { getLatestEntityFromEntity } from '../common';

@EventSubscriber()
export class EntitySubscriber implements EntitySubscriberInterface {
async afterInsert (event: InsertEvent<any>): Promise<void> {
await afterInsertOrUpdate(event);
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP);
}

async afterUpdate (event: UpdateEvent<any>): Promise<void> {
await afterInsertOrUpdate(event);
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP);
}
}

const afterInsertOrUpdate = async (event: InsertEvent<any> | UpdateEvent<any>): Promise<void> => {
const entity = event.entity;

// Return if the entity is being pruned
if (entity.isPruned) {
return;
}

// Insert the entity details in FrothyEntity table
if (ENTITIES.has(entity.constructor)) {
const frothyEntity = event.manager.create(
FrothyEntity,
{
..._.pick(entity, ['id', 'blockHash', 'blockNumber']),
...{ name: entity.constructor.name }
}
);

await event.manager.createQueryBuilder()
.insert()
.into(FrothyEntity)
.values(frothyEntity)
.orIgnore()
.execute();
}

// Get latest entity's type
const entityTarget = ENTITY_TO_LATEST_ENTITY_MAP.get(entity.constructor);
if (!entityTarget) {
return;
}

// Get latest entity's fields to be updated
const latestEntityRepo = event.manager.getRepository(entityTarget);
const fieldsToUpdate = latestEntityRepo.metadata.columns.map(column => column.databaseName).filter(val => val !== 'id');

// Create a latest entity instance and upsert in the db
const latestEntity = getLatestEntityFromEntity(latestEntityRepo, entity);
await event.manager.createQueryBuilder()
.insert()
.into(entityTarget)
.values(latestEntity as any)
.orUpdate(
{ conflict_target: ['id'], overwrite: fieldsToUpdate }
)
.execute();
};
12 changes: 2 additions & 10 deletions packages/uni-info-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { updatePoolDayData, updatePoolHourData, updateTickDayData, updateTokenDa
import { convertTokenToDecimal, loadFactory, loadTransaction, safeDiv, Block } from './utils';
import { createTick, feeTierToTickSpacing } from './utils/tick';
import { ADDRESS_ZERO, FACTORY_ADDRESS, FIRST_GRAFT_BLOCK, WATCHED_CONTRACTS } from './utils/constants';
import { Database, DEFAULT_LIMIT, ENTITIES, ENTITY_TO_LATEST_ENTITY_MAP } from './database';
import { Database, DEFAULT_LIMIT, ENTITIES } from './database';
import { Event } from './entity/Event';
import { ResultEvent, Transaction, PoolCreatedEvent, InitializeEvent, MintEvent, BurnEvent, SwapEvent, IncreaseLiquidityEvent, DecreaseLiquidityEvent, CollectEvent, TransferEvent, FlashEvent } from './events';
import { Factory } from './entity/Factory';
Expand Down Expand Up @@ -527,15 +527,7 @@ export class Indexer implements IndexerInterface {
async resetLatestEntities (blockNumber: number): Promise<void> {
const dbTx = await this._db.createTransactionRunner();
try {
await Promise.all(
Array.from(ENTITY_TO_LATEST_ENTITY_MAP.entries()).map(async ([entityType, latestEntityType]) => {
// Get entries above the reset block
const entitiesToReset = await this._db.getEntities(dbTx, latestEntityType, { where: { blockNumber: MoreThan(blockNumber) } });

// Canonicalize latest entity table at the reset block height
await this._db.canonicalizeLatestEntity(dbTx, entityType, latestEntityType, entitiesToReset, blockNumber);
})
);
this._db.graphDatabase.resetLatestEntities(dbTx, blockNumber);

dbTx.commitTransaction();
} catch (error) {
Expand Down

0 comments on commit 36ef405

Please sign in to comment.