Skip to content

Commit

Permalink
Add file mode indexer unit tests for watched address methods
Browse files Browse the repository at this point in the history
  • Loading branch information
prathamesh0 committed Apr 1, 2022
1 parent a7f9354 commit e752646
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 31 deletions.
1 change: 1 addition & 0 deletions statediff/indexer/database/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ var TestConfig = Config{
ID: "mockNodeID",
ClientName: "go-ethereum",
},
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
}
36 changes: 27 additions & 9 deletions statediff/indexer/database/file/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,14 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesFilePath
}
log.Info("Writing watched addresses SQL statements to file", "file", filePath)
if _, err := os.Stat(watchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("cannot create watched addresses file, file (%s) already exists", watchedAddressesFilePath)
}
_, err = os.Create(watchedAddressesFilePath)
if err != nil {
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
}
log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath)

w := NewSQLWriter(file)
wg := new(sync.WaitGroup)
Expand Down Expand Up @@ -524,8 +531,24 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA
return err
}

// get already watched addresses
var watchedAddresses []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStmt(stmt)
if err != nil {
return err
}

watchedAddresses = append(watchedAddresses, addressString)
}

// append statements for new addresses to existing statements
for _, arg := range args {
// ignore if already watched
if funk.Contains(watchedAddresses, arg.Address) {
continue
}

stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
Expand Down Expand Up @@ -579,18 +602,13 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {

// loadWatchedAddressesStmts loads sql statements from the given file in a string slice
func loadWatchedAddressesStmts(filePath string) ([]string, error) {
// return emtpy if file does not exist
if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) {
return []string{}, nil
}

file, err := os.Open(filePath)
if err != nil {
return nil, err
return nil, fmt.Errorf("error opening watched addresses file: %v", err)
}
defer file.Close()

var stmts []string
stmts := []string{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
stmts = append(stmts, scanner.Text())
Expand All @@ -607,7 +625,7 @@ func loadWatchedAddressesStmts(filePath string) ([]string, error) {
func dumpWatchedAddressesStmts(filePath string, stmts []string) error {
file, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("error creating watched addresses file (%s): %v", filePath, err)
return fmt.Errorf("error creating watched addresses file: %v", err)
}
defer file.Close()

Expand Down
29 changes: 27 additions & 2 deletions statediff/indexer/database/file/indexer_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,43 @@ func setupLegacy(t *testing.T) {
}
}

func dumpData(t *testing.T) {
func dumpFileData(t *testing.T) {
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
require.NoError(t, err)

_, err = sqlxdb.Exec(string(sqlFileBytes))
require.NoError(t, err)
}

func dumpWatchedAddressesFileData(t *testing.T) {
resetDB(t)

sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath)
require.NoError(t, err)

_, err = sqlxdb.Exec(string(sqlFileBytes))
require.NoError(t, err)
}

func resetDB(t *testing.T) {
file.TearDownDB(t, sqlxdb)

connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
}

func tearDown(t *testing.T) {
file.TearDownDB(t, sqlxdb)

err := os.Remove(file.TestConfig.FilePath)
require.NoError(t, err)

err = os.Remove(file.TestConfig.WatchedAddressesFilePath)
require.NoError(t, err)

err = sqlxdb.Close()
require.NoError(t, err)
}
Expand All @@ -106,7 +131,7 @@ func expectTrue(t *testing.T, value bool) {
func TestFileIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacy(t)
dumpData(t)
dumpFileData(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
Expand Down
Loading

0 comments on commit e752646

Please sign in to comment.