Skip to content

Commit

Permalink
eth/filters: add global block logs cache (ethereum#25459)
Browse files Browse the repository at this point in the history
This adds a cache for block logs which is shared by all filters. The cache
size of is configurable using the `--cache.blocklogs` flag.

Co-authored-by: Felix Lange <[email protected]>
  • Loading branch information
2 people authored and blakehhuynh committed Oct 7, 2022
1 parent 133f9dd commit d9c73d6
Show file tree
Hide file tree
Showing 21 changed files with 310 additions and 200 deletions.
31 changes: 13 additions & 18 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type SimulatedBackend struct {
pendingState *state.StateDB // Currently pending state that will be the active on request
pendingReceipts types.Receipts // Currently receipts for the pending block

events *filters.EventSystem // Event system for filtering log events live
events *filters.EventSystem // for filtering log events live
filterSystem *filters.FilterSystem // for filtering database logs

config *params.ChainConfig

Expand All @@ -89,7 +90,11 @@ func NewSimulatedBackendWithDatabase(database ethdb.Database, alloc core.Genesis
blockchain: blockchain,
config: genesis.Config,
}
backend.events = filters.NewEventSystem(&filterBackend{database, blockchain, backend}, false)

filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

backend.rollback(blockchain.CurrentBlock())
return backend
}
Expand Down Expand Up @@ -729,7 +734,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
var filter *filters.Filter
if query.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain, b}, *query.BlockHash, query.Addresses, query.Topics)
filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics)
} else {
// Initialize unset filter boundaries to run from genesis to chain head
from := int64(0)
Expand All @@ -741,7 +746,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain, b}, from, to, query.Addresses, query.Topics)
filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -867,7 +872,8 @@ type filterBackend struct {
backend *SimulatedBackend
}

func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }

func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") }

func (fb *filterBackend) HeaderByNumber(ctx context.Context, block rpc.BlockNumber) (*types.Header, error) {
Expand All @@ -893,19 +899,8 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ
return rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config()), nil
}

func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
number := rawdb.ReadHeaderNumber(fb.db, hash)
if number == nil {
return nil, nil
}
receipts := rawdb.ReadReceipts(fb.db, hash, *number, fb.bc.Config())
if receipts == nil {
return nil, nil
}
logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
}
func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
logs := rawdb.ReadLogs(fb.db, hash, number, fb.bc.Config())
return logs, nil
}

Expand Down
11 changes: 9 additions & 2 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
override := ctx.Bool(utils.OverrideTerminalTotalDifficultyPassed.Name)
cfg.Eth.OverrideTerminalTotalDifficultyPassed = &override
}

backend, eth := utils.RegisterEthService(stack, &cfg.Eth)

// Warn users to migrate if they have a legacy freezer format.
if eth != nil && !ctx.IsSet(utils.IgnoreLegacyReceiptsFlag.Name) {
firstIdx := uint64(0)
Expand All @@ -181,10 +183,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
utils.Fatalf("Database has receipts with a legacy format. Please run `geth db freezer-migrate`.")
}
}
// Configure GraphQL if requested

// Configure log filter RPC API.
filterSystem := utils.RegisterFilterAPI(stack, backend, &cfg.Eth)

// Configure GraphQL if requested.
if ctx.IsSet(utils.GraphQLEnabledFlag.Name) {
utils.RegisterGraphQLService(stack, backend, cfg.Node)
utils.RegisterGraphQLService(stack, backend, filterSystem, &cfg.Node)
}

// Add the Ethereum Stats daemon if requested.
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL)
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ var (
utils.CacheSnapshotFlag,
utils.CacheNoPrefetchFlag,
utils.CachePreimagesFlag,
utils.CacheLogSizeFlag,
utils.FDLimitFlag,
utils.ListenPortFlag,
utils.DiscoveryPortFlag,
Expand Down
34 changes: 29 additions & 5 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
ethcatalyst "github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
Expand All @@ -64,6 +65,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
pcsclite "github.com/gballet/go-libpcsclite"
gopsutil "github.com/shirou/gopsutil/mem"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -491,6 +493,12 @@ var (
Usage: "Enable recording the SHA3/keccak preimages of trie keys",
Category: flags.PerfCategory,
}
CacheLogSizeFlag = &cli.IntFlag{
Name: "cache.blocklogs",
Usage: "Size (in number of blocks) of the log cache for filtering",
Category: flags.PerfCategory,
Value: ethconfig.Defaults.FilterLogCacheSize,
}
FDLimitFlag = &cli.IntFlag{
Name: "fdlimit",
Usage: "Raise the open file descriptor resource limit (default = system fd limit)",
Expand Down Expand Up @@ -1808,6 +1816,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheSnapshotFlag.Name) {
cfg.SnapshotCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheSnapshotFlag.Name) / 100
}
if ctx.IsSet(CacheLogSizeFlag.Name) {
cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name)
}
if !ctx.Bool(SnapshotFlag.Name) {
// If snap-sync is requested, this flag is also required
if cfg.SyncMode == downloader.SnapSync {
Expand Down Expand Up @@ -2005,21 +2016,34 @@ func RegisterEthService(stack *node.Node, cfg *ethconfig.Config) (ethapi.Backend
return backend.APIBackend, backend
}

// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to
// the given node.
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to the node.
func RegisterEthStatsService(stack *node.Node, backend ethapi.Backend, url string) {
if err := ethstats.New(stack, backend, backend.Engine(), url); err != nil {
Fatalf("Failed to register the Ethereum Stats service: %v", err)
}
}

// RegisterGraphQLService is a utility function to construct a new service and register it against a node.
func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.Config) {
if err := graphql.New(stack, backend, cfg.GraphQLCors, cfg.GraphQLVirtualHosts); err != nil {
// RegisterGraphQLService adds the GraphQL API to the node.
func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cfg *node.Config) {
err := graphql.New(stack, backend, filterSystem, cfg.GraphQLCors, cfg.GraphQLVirtualHosts)
if err != nil {
Fatalf("Failed to register the GraphQL service: %v", err)
}
}

// RegisterFilterAPI adds the eth log filtering RPC API to the node.
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
isLightClient := ethcfg.SyncMode == downloader.LightSync
filterSystem := filters.NewFilterSystem(backend, filters.Config{
LogCacheSize: ethcfg.FilterLogCacheSize,
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, isLightClient),
}})
return filterSystem
}

func SetupMetrics(ctx *cli.Context) {
if metrics.Enabled {
log.Info("Enabling metrics collection")
Expand Down
14 changes: 2 additions & 12 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package eth
import (
"context"
"errors"
"fmt"
"math/big"
"time"

Expand Down Expand Up @@ -202,17 +201,8 @@ func (b *EthAPIBackend) GetReceipts(ctx context.Context, hash common.Hash) (type
return b.eth.blockchain.GetReceiptsByHash(hash), nil
}

func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
db := b.eth.ChainDb()
number := rawdb.ReadHeaderNumber(db, hash)
if number == nil {
return nil, fmt.Errorf("failed to get block number for hash %#x", hash)
}
logs := rawdb.ReadLogs(db, hash, *number, b.eth.blockchain.Config())
if logs == nil {
return nil, fmt.Errorf("failed to get logs for block #%d (0x%s)", *number, hash.TerminalString())
}
return logs, nil
func (b *EthAPIBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
return rawdb.ReadLogs(b.eth.chainDb, hash, number, b.ChainConfig()), nil
}

func (b *EthAPIBackend) GetTd(ctx context.Context, hash common.Hash) *big.Int {
Expand Down
5 changes: 0 additions & 5 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -41,7 +40,6 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
Expand Down Expand Up @@ -315,9 +313,6 @@ func (s *Ethereum) APIs() []rpc.API {
}, {
Namespace: "eth",
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.eventMux),
}, {
Namespace: "eth",
Service: filters.NewFilterAPI(s.APIBackend, false, 5*time.Minute),
}, {
Namespace: "admin",
Service: NewAdminAPI(s),
Expand Down
4 changes: 4 additions & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ var Defaults = Config{
TrieDirtyCache: 256,
TrieTimeout: 60 * time.Minute,
SnapshotCache: 102,
FilterLogCacheSize: 32,
Miner: miner.Config{
GasCeil: 30000000,
GasPrice: big.NewInt(params.GWei),
Expand Down Expand Up @@ -171,6 +172,9 @@ type Config struct {
SnapshotCache int
Preimages bool

// This is the number of blocks for which logs will be cached in the filter system.
FilterLogCacheSize int

// Mining options
Miner miner.Config

Expand Down
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,22 @@ type filter struct {
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type FilterAPI struct {
backend Backend
sys *FilterSystem
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
timeout time.Duration
}

// NewFilterAPI returns a new FilterAPI instance.
func NewFilterAPI(backend Backend, lightMode bool, timeout time.Duration) *FilterAPI {
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
api := &FilterAPI{
backend: backend,
events: NewEventSystem(backend, lightMode),
sys: system,
events: NewEventSystem(system, lightMode),
filters: make(map[rpc.ID]*filter),
timeout: timeout,
timeout: system.cfg.Timeout,
}
go api.timeoutLoop(timeout)
go api.timeoutLoop(system.cfg.Timeout)

return api
}
Expand Down Expand Up @@ -320,7 +320,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
var filter *Filter
if crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *crit.BlockHash, crit.Addresses, crit.Topics)
filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
Expand All @@ -332,7 +332,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
end = crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, crit.Addresses, crit.Topics)
filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -371,7 +371,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
var filter *Filter
if f.crit.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = NewBlockFilter(api.backend, *f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
} else {
// Convert the RPC block numbers into internal representations
begin := rpc.LatestBlockNumber.Int64()
Expand All @@ -383,7 +383,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
end = f.crit.ToBlock.Int64()
}
// Construct the range filter
filter = NewRangeFilter(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down
Loading

0 comments on commit d9c73d6

Please sign in to comment.