Skip to content

Commit

Permalink
Implement watch address indexer methods for file mode
Browse files Browse the repository at this point in the history
  • Loading branch information
prathamesh0 committed Apr 1, 2022
1 parent 12b9f50 commit a7f9354
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 31 deletions.
3 changes: 2 additions & 1 deletion cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
switch dbType {
case shared.FILE:
indexerConfig = file.Config{
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name),
}
case shared.POSTGRES:
driverTypeStr := ctx.GlobalString(utils.StateDiffDBDriverTypeFlag.Name)
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ var (
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
utils.StateDiffWatchedAddressesFilePath,
configFileFlag,
}

Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
utils.StateDiffWatchedAddressesFilePath,
},
},
{
Expand Down
4 changes: 4 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,10 @@ var (
Usage: "Full path (including filename) to write knownGaps statements when the DB is unavailable.",
Value: "./known_gaps.sql",
}
StateDiffWatchedAddressesFilePath = cli.StringFlag{
Name: "statediff.file.wapath",
Usage: "Full path (including filename) to write statediff watched addresses out to when operating in file mode",
}
StateDiffDBClientNameFlag = cli.StringFlag{
Name: "statediff.db.clientname",
Usage: "Client name to use when writing state diffs to database",
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
github.com/olekukonko/tablewriter v0.0.5
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7
github.com/pganalyze/pg_query_go/v2 v2.1.0
github.com/prometheus/tsdb v0.7.1
github.com/rjeczalik/notify v0.9.1
github.com/rs/cors v1.7.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa h1:Q75Upo5UN4JbPFURXZ8nLKYUvF85dyFRop/vQ0Rv+64=
Expand Down Expand Up @@ -516,6 +517,8 @@ github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChl
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM=
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0=
github.com/pganalyze/pg_query_go/v2 v2.1.0 h1:donwPZ4G/X+kMs7j5eYtKjdziqyOLVp3pkUrzb9lDl8=
github.com/pganalyze/pg_query_go/v2 v2.1.0/go.mod h1:XAxmVqz1tEGqizcQ3YSdN90vCOHBWjJi8URL1er5+cA=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 2 additions & 0 deletions statediff/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ This service introduces a CLI flag namespace `statediff`

`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode

`--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode

The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`)

e.g.
Expand Down
15 changes: 5 additions & 10 deletions statediff/indexer/database/dump/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,32 +497,27 @@ func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close()
}

// LoadWatchedAddresses reads watched addresses from the database
// LoadWatchedAddresses satisfies the interfaces.StateDiffIndexer interface
func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
// TODO implement
return nil, nil
}

// InsertWatchedAddresses inserts the given addresses in the database
// InsertWatchedAddresses satisfies the interfaces.StateDiffIndexer interface
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// TODO implement
return nil
}

// RemoveWatchedAddresses removes the given watched addresses from the database
// RemoveWatchedAddresses satisfies the interfaces.StateDiffIndexer interface
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
// TODO implement
return nil
}

// SetWatchedAddresses clears and inserts the given addresses in the database
// SetWatchedAddresses satisfies the interfaces.StateDiffIndexer interface
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// TODO implement
return nil
}

// ClearWatchedAddresses clears all the watched addresses from the database
// ClearWatchedAddresses satisfies the interfaces.StateDiffIndexer interface
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
// TODO implement
return nil
}
5 changes: 3 additions & 2 deletions statediff/indexer/database/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (

// Config holds params for writing sql statements out to a file
type Config struct {
FilePath string
NodeInfo node.Info
FilePath string
NodeInfo node.Info
WatchedAddressesFilePath string
}

// Type satisfies interfaces.Config
Expand Down
158 changes: 140 additions & 18 deletions statediff/indexer/database/file/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package file

import (
"bufio"
"context"
"errors"
"fmt"
Expand All @@ -28,6 +29,8 @@ import (
"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash"
pg_query "github.com/pganalyze/pg_query_go/v2"
"github.com/thoas/go-funk"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -44,6 +47,9 @@ import (
)

const defaultFilePath = "./statediff.sql"
const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.sql"

const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('%s', '%d', '%d') ON CONFLICT (address) DO NOTHING;"

var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}

Expand All @@ -57,6 +63,8 @@ type StateDiffIndexer struct {
chainConfig *params.ChainConfig
nodeID string
wg *sync.WaitGroup

watchedAddressesFilePath string
}

// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
Expand All @@ -73,16 +81,24 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
}
log.Info("Writing statediff SQL statements to file", "file", filePath)

watchedAddressesFilePath := config.WatchedAddressesFilePath
if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesFilePath
}
log.Info("Writing watched addresses SQL statements to file", "file", filePath)

w := NewSQLWriter(file)
wg := new(sync.WaitGroup)
w.Loop()
w.upsertNode(config.NodeInfo)
w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{})
return &StateDiffIndexer{
fileWriter: w,
chainConfig: chainConfig,
nodeID: config.NodeInfo.ID,
wg: wg,
fileWriter: w,
chainConfig: chainConfig,
nodeID: config.NodeInfo.ID,
wg: wg,
watchedAddressesFilePath: watchedAddressesFilePath,
}, nil
}

Expand Down Expand Up @@ -479,32 +495,138 @@ func (sdi *StateDiffIndexer) Close() error {
return sdi.fileWriter.Close()
}

// LoadWatchedAddresses reads watched addresses from the database
// LoadWatchedAddresses loads watched addresses from a file
func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
// TODO implement
return nil, nil
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath)
if err != nil {
return nil, err
}

// extract addresses from the sql statements
watchedAddresses := []common.Address{}
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStmt(stmt)
if err != nil {
return nil, err
}
watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString))
}

return watchedAddresses, nil
}

// InsertWatchedAddresses inserts the given addresses in the database
// InsertWatchedAddresses inserts the given addresses in a file
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// TODO implement
return nil
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath)
if err != nil {
return err
}

// append statements for new addresses to existing statements
for _, arg := range args {
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}

return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, stmts)
}

// RemoveWatchedAddresses removes the given watched addresses from the database
// RemoveWatchedAddresses removes the given watched addresses from a file
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
// TODO implement
return nil
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath)
if err != nil {
return err
}

// get rid of statements having addresses to be removed
var updatedStmts []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStmt(stmt)
if err != nil {
return err
}

toRemove := funk.Contains(args, func(arg sdtypes.WatchAddressArg) bool {
return arg.Address == addressString
})

if !toRemove {
updatedStmts = append(updatedStmts, stmt)
}
}

return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, updatedStmts)
}

// SetWatchedAddresses clears and inserts the given addresses in the database
// SetWatchedAddresses clears and inserts the given addresses in a file
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// TODO implement
return nil
var stmts []string
for _, arg := range args {
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}

return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, stmts)
}

// ClearWatchedAddresses clears all the watched addresses from the database
// ClearWatchedAddresses clears all the watched addresses from a file
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
// TODO implement
return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, common.Big0)
}

// loadWatchedAddressesStmts loads sql statements from the given file in a string slice
func loadWatchedAddressesStmts(filePath string) ([]string, error) {
// return emtpy if file does not exist
if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) {
return []string{}, nil
}

file, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer file.Close()

var stmts []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
stmts = append(stmts, scanner.Text())
}

if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error loading watched addresses: %v", err)
}

return stmts, nil
}

// dumpWatchedAddressesStmts dumps sql statements to the given file
func dumpWatchedAddressesStmts(filePath string, stmts []string) error {
file, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("error creating watched addresses file (%s): %v", filePath, err)
}
defer file.Close()

for _, stmt := range stmts {
_, err := file.Write([]byte(stmt + "\n"))
if err != nil {
return fmt.Errorf("error inserting watched_addresses entry: %v", err)
}
}

return nil
}

// parseWatchedAddressStmt parses given sql insert statement to extract the address argument
func parseWatchedAddressStmt(stmt string) (string, error) {
parseResult, err := pg_query.Parse(stmt)
if err != nil {
return "", fmt.Errorf("error parsing sql stmt: %v", err)
}

return parseResult.Stmts[0].Stmt.GetInsertStmt().SelectStmt.GetSelectStmt().ValuesLists[0].GetList().Items[0].GetAConst().GetVal().GetString_().Str, nil
}

0 comments on commit a7f9354

Please sign in to comment.