Skip to content

Commit

Permalink
Denormalize is_pruned flag in eden-watcher (#230)
Browse files Browse the repository at this point in the history
* Add is_pruned flag to all entities

* Mark entities as pruned once the corresponding block gets pruned
  • Loading branch information
nikugogoi authored Nov 16, 2022
1 parent 62c57d8 commit 7e5974c
Show file tree
Hide file tree
Showing 22 changed files with 153 additions and 11 deletions.
19 changes: 19 additions & 0 deletions packages/eden-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@ import { SyncStatus } from './entity/SyncStatus';
import { StateSyncStatus } from './entity/StateSyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { State } from './entity/State';
import { Account } from './entity/Account';
import { Claim } from './entity/Claim';
import { Distribution } from './entity/Distribution';
import { Distributor } from './entity/Distributor';
import { Epoch } from './entity/Epoch';
import { Network } from './entity/Network';
import { Producer } from './entity/Producer';
import { ProducerEpoch } from './entity/ProducerEpoch';
import { ProducerRewardCollectorChange } from './entity/ProducerRewardCollectorChange';
import { ProducerSet } from './entity/ProducerSet';
import { ProducerSetChange } from './entity/ProducerSetChange';
import { RewardSchedule } from './entity/RewardSchedule';
import { RewardScheduleEntry } from './entity/RewardScheduleEntry';
import { Slash } from './entity/Slash';
import { Slot } from './entity/Slot';
import { SlotClaim } from './entity/SlotClaim';
import { Staker } from './entity/Staker';

export const ENTITIES = new Set([Account, Claim, Distribution, Distributor, Epoch, Network, Producer, ProducerEpoch, ProducerRewardCollectorChange, ProducerSet, ProducerSetChange, RewardSchedule, RewardScheduleEntry, Slash, Slot, SlotClaim, Staker]);

export class Database implements DatabaseInterface {
_config: ConnectionOptions;
Expand Down
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ export class Account {

@Column('numeric', { transformer: bigintTransformer })
totalSlashed!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,7 @@ export class Block {

@Column('numeric', { nullable: true, transformer: bigintTransformer })
size!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Claim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ export class Claim {

@Column('numeric', { transformer: bigintTransformer })
claimed!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Distribution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ export class Distribution {

@Column('varchar')
metadataURI!: string;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Distributor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ export class Distributor {

@Column('varchar', { nullable: true })
currentDistribution!: string;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Epoch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ export class Epoch {

@Column('numeric', { default: 0, transformer: decimalTransformer })
producerBlocksRatio!: Decimal;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ export class Network {
// https://github.com/brianc/node-postgres/issues/1943#issuecomment-520500053
@Column('varchar', { transformer: bigintArrayTransformer, array: true })
stakedPercentiles!: bigint[];

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ export class Producer {

@Column('numeric', { transformer: bigintTransformer })
pendingEpochBlocks!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/ProducerEpoch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ export class ProducerEpoch {

@Column('numeric', { default: 0, transformer: decimalTransformer })
blocksProducedRatio!: Decimal;

@Column('boolean', { default: false })
isPruned!: boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ export class ProducerRewardCollectorChange {

@Column('varchar')
rewardCollector!: string;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/ProducerSet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ export class ProducerSet {

@Column('varchar', { array: true })
producers!: string[];

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/ProducerSetChange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ export class ProducerSetChange {
enum: ProducerSetChangeType
})
changeType!: ProducerSetChangeType;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/RewardSchedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ export class RewardSchedule {

@Column('varchar', { nullable: true })
activeRewardScheduleEntry!: string;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/RewardScheduleEntry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ export class RewardScheduleEntry {

@Column('numeric', { transformer: bigintTransformer })
rewardsPerEpoch!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Slash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ export class Slash {

@Column('numeric', { transformer: bigintTransformer })
slashed!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Slot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ export class Slot {

@Column('numeric', { default: 0, transformer: decimalTransformer })
taxRatePerDay!: Decimal;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/SlotClaim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ export class SlotClaim {

@Column('numeric', { default: 0, transformer: decimalTransformer })
taxRatePerDay!: Decimal;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Staker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ export class Staker {

@Column('numeric', { nullable: true, transformer: bigintTransformer })
rank!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
6 changes: 4 additions & 2 deletions packages/eden-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
} from '@cerc-io/util';
import { GraphWatcher } from '@cerc-io/graph-node';

import { Database } from './database';
import { Database, ENTITIES } from './database';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
Expand Down Expand Up @@ -476,7 +476,9 @@ export class Indexer implements IndexerInterface {
}

async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
return this._baseIndexer.markBlocksAsPruned(blocks);
await this._baseIndexer.markBlocksAsPruned(blocks);

await this._graphWatcher.pruneEntities(blocks, ENTITIES);
}

async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
Expand Down
71 changes: 62 additions & 9 deletions packages/graph-node/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import {
Connection,
ConnectionOptions,
FindOneOptions,
In,
LessThanOrEqual,
QueryRunner,
Repository,
SelectQueryBuilder
SelectQueryBuilder,
UpdateResult
} from 'typeorm';
import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata';
import { RawSqlResultsToEntityTransformer } from 'typeorm/query-builder/transformer/RawSqlResultsToEntityTransformer';
Expand Down Expand Up @@ -1032,6 +1034,10 @@ export class Database {
const entityValuePromises = entityFields.map(async (field: any) => {
const { propertyName } = field;

if (propertyName === 'isPruned') {
return undefined;
}

// Get blockHash property for db entry from block instance.
if (propertyName === 'blockHash') {
return block.blockHash;
Expand Down Expand Up @@ -1191,14 +1197,6 @@ export class Database {
prunedBlockHashes.forEach(blockHash => this.cachedEntities.frothyBlocks.delete(blockHash));
}

_measureCachedPrunedEntities () {
const totalEntities = Array.from(this.cachedEntities.latestPrunedEntities.values())
.reduce((acc, idEntitiesMap) => acc + idEntitiesMap.size, 0);

log(`Total entities in cachedEntities.latestPrunedEntities map: ${totalEntities}`);
cachePrunedEntitiesCount.set(totalEntities);
}

async transformResults<Entity> (queryRunner: QueryRunner, qb: SelectQueryBuilder<Entity>, rawResults: any[]): Promise<any[]> {
const transformer = new RawSqlResultsToEntityTransformer(
qb.expressionMap,
Expand All @@ -1210,4 +1208,59 @@ export class Database {
assert(qb.expressionMap.mainAlias);
return transformer.transform(rawResults, qb.expressionMap.mainAlias);
}

async updateEntity<Entity> (queryRunner: QueryRunner, entity: new () => Entity, criteria: any, update: any): Promise<UpdateResult> {
const repo = queryRunner.manager.getRepository(entity);
return repo.createQueryBuilder()
.update()
.set(update)
.where(criteria)
.execute();
}

async pruneEntities (queryRunner: QueryRunner, blocks: BlockProgressInterface[], entityTypes: Set<new () => any>) {
// Assumption: all blocks are at same height
assert(blocks.length);
const blockNumber = blocks[0].blockNumber;
const blockHashes = blocks.map(block => block.blockHash);

// Get all entities at the block height
const entitiesAtBlock = await Promise.all(
[...entityTypes].map(entityType => {
return this._baseDatabase.getEntities(
queryRunner,
entityType as any,
{
select: ['id'] as any,
where: { blockNumber }
}
);
})
);

// Extract entity ids from result
const entityIds = entitiesAtBlock.map(entities => {
return entities.map((entity: any) => entity.id);
});

// Update isPruned flag using fetched entity ids and hashes of blocks to be pruned
const updatePromises = [...entityTypes].map((entity, index: number) => {
return this.updateEntity(
queryRunner,
entity as any,
{ id: In(entityIds[index]), blockHash: In(blockHashes) },
{ isPruned: true }
);
});

await Promise.all(updatePromises);
}

_measureCachedPrunedEntities () {
const totalEntities = Array.from(this.cachedEntities.latestPrunedEntities.values())
.reduce((acc, idEntitiesMap) => acc + idEntitiesMap.size, 0);

log(`Total entities in cachedEntities.latestPrunedEntities map: ${totalEntities}`);
cachePrunedEntitiesCount.set(totalEntities);
}
}
14 changes: 14 additions & 0 deletions packages/graph-node/src/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,20 @@ export class GraphWatcher {
this._database.updateEntityCacheFrothyBlocks(blockProgress, this._indexer.serverConfig.clearEntitiesCacheInterval);
}

async pruneEntities (prunedBlocks: BlockProgressInterface[], entityTypes: Set<new () => any>) {
const dbTx = await this._database.createTransactionRunner();

try {
await this._database.pruneEntities(dbTx, prunedBlocks, entityTypes);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}

pruneEntityCacheFrothyBlocks (canonicalBlockHash: string, canonicalBlockNumber: number) {
this._database.pruneEntityCacheFrothyBlocks(canonicalBlockHash, canonicalBlockNumber);
}
Expand Down

0 comments on commit 7e5974c

Please sign in to comment.