Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Denormalize is_pruned flag in eden-watcher #230

Merged
merged 2 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/eden-watcher/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@ import { SyncStatus } from './entity/SyncStatus';
import { StateSyncStatus } from './entity/StateSyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { State } from './entity/State';
import { Account } from './entity/Account';
import { Claim } from './entity/Claim';
import { Distribution } from './entity/Distribution';
import { Distributor } from './entity/Distributor';
import { Epoch } from './entity/Epoch';
import { Network } from './entity/Network';
import { Producer } from './entity/Producer';
import { ProducerEpoch } from './entity/ProducerEpoch';
import { ProducerRewardCollectorChange } from './entity/ProducerRewardCollectorChange';
import { ProducerSet } from './entity/ProducerSet';
import { ProducerSetChange } from './entity/ProducerSetChange';
import { RewardSchedule } from './entity/RewardSchedule';
import { RewardScheduleEntry } from './entity/RewardScheduleEntry';
import { Slash } from './entity/Slash';
import { Slot } from './entity/Slot';
import { SlotClaim } from './entity/SlotClaim';
import { Staker } from './entity/Staker';

export const ENTITIES = new Set([Account, Claim, Distribution, Distributor, Epoch, Network, Producer, ProducerEpoch, ProducerRewardCollectorChange, ProducerSet, ProducerSetChange, RewardSchedule, RewardScheduleEntry, Slash, Slot, SlotClaim, Staker]);

export class Database implements DatabaseInterface {
_config: ConnectionOptions;
Expand Down
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ export class Account {

@Column('numeric', { transformer: bigintTransformer })
totalSlashed!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,7 @@ export class Block {

@Column('numeric', { nullable: true, transformer: bigintTransformer })
size!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Claim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ export class Claim {

@Column('numeric', { transformer: bigintTransformer })
claimed!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Distribution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ export class Distribution {

@Column('varchar')
metadataURI!: string;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Distributor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ export class Distributor {

@Column('varchar', { nullable: true })
currentDistribution!: string;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Epoch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ export class Epoch {

@Column('numeric', { default: 0, transformer: decimalTransformer })
producerBlocksRatio!: Decimal;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ export class Network {
// https://github.com/brianc/node-postgres/issues/1943#issuecomment-520500053
@Column('varchar', { transformer: bigintArrayTransformer, array: true })
stakedPercentiles!: bigint[];

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ export class Producer {

@Column('numeric', { transformer: bigintTransformer })
pendingEpochBlocks!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/ProducerEpoch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ export class ProducerEpoch {

@Column('numeric', { default: 0, transformer: decimalTransformer })
blocksProducedRatio!: Decimal;

@Column('boolean', { default: false })
isPruned!: boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ export class ProducerRewardCollectorChange {

@Column('varchar')
rewardCollector!: string;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/ProducerSet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ export class ProducerSet {

@Column('varchar', { array: true })
producers!: string[];

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/ProducerSetChange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ export class ProducerSetChange {
enum: ProducerSetChangeType
})
changeType!: ProducerSetChangeType;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/RewardSchedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ export class RewardSchedule {

@Column('varchar', { nullable: true })
activeRewardScheduleEntry!: string;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/RewardScheduleEntry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ export class RewardScheduleEntry {

@Column('numeric', { transformer: bigintTransformer })
rewardsPerEpoch!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Slash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ export class Slash {

@Column('numeric', { transformer: bigintTransformer })
slashed!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Slot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ export class Slot {

@Column('numeric', { default: 0, transformer: decimalTransformer })
taxRatePerDay!: Decimal;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/SlotClaim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ export class SlotClaim {

@Column('numeric', { default: 0, transformer: decimalTransformer })
taxRatePerDay!: Decimal;

@Column('boolean', { default: false })
isPruned!: boolean
}
3 changes: 3 additions & 0 deletions packages/eden-watcher/src/entity/Staker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ export class Staker {

@Column('numeric', { nullable: true, transformer: bigintTransformer })
rank!: bigint;

@Column('boolean', { default: false })
isPruned!: boolean
}
6 changes: 4 additions & 2 deletions packages/eden-watcher/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
} from '@cerc-io/util';
import { GraphWatcher } from '@cerc-io/graph-node';

import { Database } from './database';
import { Database, ENTITIES } from './database';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
Expand Down Expand Up @@ -476,7 +476,9 @@ export class Indexer implements IndexerInterface {
}

async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
return this._baseIndexer.markBlocksAsPruned(blocks);
await this._baseIndexer.markBlocksAsPruned(blocks);

await this._graphWatcher.pruneEntities(blocks, ENTITIES);
}

async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
Expand Down
71 changes: 62 additions & 9 deletions packages/graph-node/src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import {
Connection,
ConnectionOptions,
FindOneOptions,
In,
LessThanOrEqual,
QueryRunner,
Repository,
SelectQueryBuilder
SelectQueryBuilder,
UpdateResult
} from 'typeorm';
import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata';
import { RawSqlResultsToEntityTransformer } from 'typeorm/query-builder/transformer/RawSqlResultsToEntityTransformer';
Expand Down Expand Up @@ -1032,6 +1034,10 @@ export class Database {
const entityValuePromises = entityFields.map(async (field: any) => {
const { propertyName } = field;

if (propertyName === 'isPruned') {
return undefined;
}

// Get blockHash property for db entry from block instance.
if (propertyName === 'blockHash') {
return block.blockHash;
Expand Down Expand Up @@ -1191,14 +1197,6 @@ export class Database {
prunedBlockHashes.forEach(blockHash => this.cachedEntities.frothyBlocks.delete(blockHash));
}

_measureCachedPrunedEntities () {
const totalEntities = Array.from(this.cachedEntities.latestPrunedEntities.values())
.reduce((acc, idEntitiesMap) => acc + idEntitiesMap.size, 0);

log(`Total entities in cachedEntities.latestPrunedEntities map: ${totalEntities}`);
cachePrunedEntitiesCount.set(totalEntities);
}

async transformResults<Entity> (queryRunner: QueryRunner, qb: SelectQueryBuilder<Entity>, rawResults: any[]): Promise<any[]> {
const transformer = new RawSqlResultsToEntityTransformer(
qb.expressionMap,
Expand All @@ -1210,4 +1208,59 @@ export class Database {
assert(qb.expressionMap.mainAlias);
return transformer.transform(rawResults, qb.expressionMap.mainAlias);
}

async updateEntity<Entity> (queryRunner: QueryRunner, entity: new () => Entity, criteria: any, update: any): Promise<UpdateResult> {
const repo = queryRunner.manager.getRepository(entity);
return repo.createQueryBuilder()
.update()
.set(update)
.where(criteria)
.execute();
}

async pruneEntities (queryRunner: QueryRunner, blocks: BlockProgressInterface[], entityTypes: Set<new () => any>) {
// Assumption: all blocks are at same height
assert(blocks.length);
const blockNumber = blocks[0].blockNumber;
const blockHashes = blocks.map(block => block.blockHash);

// Get all entities at the block height
const entitiesAtBlock = await Promise.all(
[...entityTypes].map(entityType => {
return this._baseDatabase.getEntities(
queryRunner,
entityType as any,
{
select: ['id'] as any,
where: { blockNumber }
}
);
})
);

// Extract entity ids from result
const entityIds = entitiesAtBlock.map(entities => {
return entities.map((entity: any) => entity.id);
});

// Update isPruned flag using fetched entity ids and hashes of blocks to be pruned
const updatePromises = [...entityTypes].map((entity, index: number) => {
return this.updateEntity(
queryRunner,
entity as any,
{ id: In(entityIds[index]), blockHash: In(blockHashes) },
{ isPruned: true }
);
});

await Promise.all(updatePromises);
}

_measureCachedPrunedEntities () {
const totalEntities = Array.from(this.cachedEntities.latestPrunedEntities.values())
.reduce((acc, idEntitiesMap) => acc + idEntitiesMap.size, 0);

log(`Total entities in cachedEntities.latestPrunedEntities map: ${totalEntities}`);
cachePrunedEntitiesCount.set(totalEntities);
}
}
14 changes: 14 additions & 0 deletions packages/graph-node/src/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,20 @@ export class GraphWatcher {
this._database.updateEntityCacheFrothyBlocks(blockProgress, this._indexer.serverConfig.clearEntitiesCacheInterval);
}

async pruneEntities (prunedBlocks: BlockProgressInterface[], entityTypes: Set<new () => any>) {
const dbTx = await this._database.createTransactionRunner();

try {
await this._database.pruneEntities(dbTx, prunedBlocks, entityTypes);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}

pruneEntityCacheFrothyBlocks (canonicalBlockHash: string, canonicalBlockNumber: number) {
this._database.pruneEntityCacheFrothyBlocks(canonicalBlockHash, canonicalBlockNumber);
}
Expand Down