From 8c12f51819f67e2483089726d7cde89430dda3c6 Mon Sep 17 00:00:00 2001
From: awskii
Date: Tue, 17 Dec 2024 23:38:10 +0000
Subject: [PATCH] Close cursor if it's opened by different tx (#12546)

So we keep the cursor interface open inside `DomainRoTx`. If we then create
a new tx and try to read from `DomainRoTx` with it, the value will be fetched
via the previously opened cursor, which is not the expected behaviour. I've
put in a crude fix for that, but I assume there could be better solutions.
A minimal sketch of the invariant this enforces is appended after the diff.

---------

Co-authored-by: alex.sharov
---
 .../rawtemporaldb/accessors_receipt_test.go   |   6 +-
 core/vm/gas_table_test.go                     |  11 +-
 erigon-lib/kv/mdbx/kv_mdbx.go                 |   4 +-
 erigon-lib/state/aggregator_test.go           | 207 ++++++++++++++++++
 erigon-lib/state/domain.go                    |  40 +++-
 erigon-lib/state/domain_shared.go             |   3 +-
 erigon-lib/state/domain_shared_test.go        |  25 ++-
 erigon-lib/state/domain_test.go               |  55 +++++
 turbo/rpchelper/helper.go                     |  15 ++
 9 files changed, 346 insertions(+), 20 deletions(-)

diff --git a/core/rawdb/rawtemporaldb/accessors_receipt_test.go b/core/rawdb/rawtemporaldb/accessors_receipt_test.go
index 03f5e95155b..43dcca7249a 100644
--- a/core/rawdb/rawtemporaldb/accessors_receipt_test.go
+++ b/core/rawdb/rawtemporaldb/accessors_receipt_test.go
@@ -20,10 +20,11 @@ func TestAppendReceipt(t *testing.T) {
 	require.NoError(err)
 	defer tx.Rollback()
 
-	doms, err := state.NewSharedDomains(tx, log.New())
+	ttx := tx.(kv.TemporalTx)
+	doms, err := state.NewSharedDomains(ttx, log.New())
 	require.NoError(err)
 	defer doms.Close()
-	doms.SetTx(tx)
+	doms.SetTx(ttx)
 
 	doms.SetTxNum(0) // block1
 	err = AppendReceipt(doms, &types.Receipt{CumulativeGasUsed: 10, FirstLogIndexWithinBlock: 0}, 0)
@@ -48,7 +49,6 @@ func TestAppendReceipt(t *testing.T) {
 	err = doms.Flush(context.Background(), tx)
 	require.NoError(err)
 
-	ttx := tx.(kv.TemporalTx)
 	v, ok, err := ttx.HistorySeek(kv.ReceiptDomain, FirstLogIndexKey, 0)
 	require.NoError(err)
 	require.True(ok)
diff --git a/core/vm/gas_table_test.go b/core/vm/gas_table_test.go
index fc33308a2eb..95677431ac5 100644
--- a/core/vm/gas_table_test.go
+++ b/core/vm/gas_table_test.go
@@ -22,9 +22,11 @@
 import (
 	"context"
 	"errors"
+	"fmt"
 	"math"
 	"strconv"
 	"testing"
+	"unsafe"
 
 	"github.com/holiman/uint256"
 	"github.com/stretchr/testify/require"
@@ -195,12 +197,16 @@ func TestCreateGas(t *testing.T) {
 
 			var txc wrap.TxContainer
 			txc.Tx = tx
-			domains, err := state3.NewSharedDomains(tx, log.New())
+			eface := *(*[2]uintptr)(unsafe.Pointer(&tx))
+			fmt.Printf("init tx %x\n", eface[1])
+
+			domains, err := state3.NewSharedDomains(txc.Tx, log.New())
 			require.NoError(t, err)
 			defer domains.Close()
 			txc.Doms = domains
 
-			stateReader = rpchelper.NewLatestStateReader(tx)
+			//stateReader = rpchelper.NewLatestStateReader(domains)
+			stateReader = rpchelper.NewLatestDomainStateReader(domains)
 			stateWriter = rpchelper.NewLatestStateWriter(txc, nil, 0)
 
 			s := state.New(stateReader)
@@ -230,5 +236,6 @@ func TestCreateGas(t *testing.T) {
 			t.Errorf("test %d: gas used mismatch: have %v, want %v", i, gasUsed, tt.gasUsed)
 		}
 		tx.Rollback()
+		domains.Close()
 	}
 }
diff --git a/erigon-lib/kv/mdbx/kv_mdbx.go b/erigon-lib/kv/mdbx/kv_mdbx.go
index beb06688d96..c70c671f0e9 100644
--- a/erigon-lib/kv/mdbx/kv_mdbx.go
+++ b/erigon-lib/kv/mdbx/kv_mdbx.go
@@ -1119,7 +1119,7 @@ func (tx *MdbxTx) stdCursor(bucket string) (kv.RwCursor, error) {
 	if tx.toCloseMap == nil {
 		tx.toCloseMap = make(map[uint64]kv.Closer)
 	}
-	tx.toCloseMap[c.id] = c.c
+	tx.toCloseMap[c.id] = c
 	return c, nil
 }
 
@@ -1268,6 +1268,8 @@ func (c *MdbxCursor) Close() {
 	}
 }
 
+func (c *MdbxCursor) 
IsClosed() bool { return c.c == nil } + type MdbxDupSortCursor struct { *MdbxCursor } diff --git a/erigon-lib/state/aggregator_test.go b/erigon-lib/state/aggregator_test.go index b0c7dba7c98..a5d89ad3d57 100644 --- a/erigon-lib/state/aggregator_test.go +++ b/erigon-lib/state/aggregator_test.go @@ -53,6 +53,213 @@ import ( "github.com/stretchr/testify/require" ) +func TestAggregatorV3_Merge(t *testing.T) { + t.Parallel() + db, agg := testDbAndAggregatorv3(t, 10) + rwTx, err := db.BeginRwNosync(context.Background()) + require.NoError(t, err) + defer func() { + if rwTx != nil { + rwTx.Rollback() + } + }() + + ac := agg.BeginFilesRo() + defer ac.Close() + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + require.NoError(t, err) + defer domains.Close() + + txs := uint64(1000) + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + var ( + commKey1 = []byte("someCommKey") + commKey2 = []byte("otherCommKey") + ) + + // keys are encodings of numbers 1..31 + // each key changes value on every txNum which is multiple of the key + var maxWrite, otherMaxWrite uint64 + for txNum := uint64(1); txNum <= txs; txNum++ { + domains.SetTxNum(txNum) + + addr, loc := make([]byte, length.Addr), make([]byte, length.Hash) + + n, err := rnd.Read(addr) + require.NoError(t, err) + require.EqualValues(t, length.Addr, n) + + n, err = rnd.Read(loc) + require.NoError(t, err) + require.EqualValues(t, length.Hash, n) + + buf := types.EncodeAccountBytesV3(1, uint256.NewInt(0), nil, 0) + err = domains.DomainPut(kv.AccountsDomain, addr, nil, buf, nil, 0) + require.NoError(t, err) + + err = domains.DomainPut(kv.StorageDomain, addr, loc, []byte{addr[0], loc[0]}, nil, 0) + require.NoError(t, err) + + var v [8]byte + binary.BigEndian.PutUint64(v[:], txNum) + if txNum%135 == 0 { + pv, step, err := domains.GetLatest(kv.CommitmentDomain, commKey2) + require.NoError(t, err) + + err = domains.DomainPut(kv.CommitmentDomain, commKey2, nil, v[:], pv, step) + require.NoError(t, err) + otherMaxWrite = txNum + } else { + pv, step, err := domains.GetLatest(kv.CommitmentDomain, commKey1) + require.NoError(t, err) + + err = domains.DomainPut(kv.CommitmentDomain, commKey1, nil, v[:], pv, step) + require.NoError(t, err) + maxWrite = txNum + } + require.NoError(t, err) + + } + + err = domains.Flush(context.Background(), rwTx) + require.NoError(t, err) + + require.NoError(t, err) + err = rwTx.Commit() + require.NoError(t, err) + rwTx = nil + + err = agg.BuildFiles(txs) + require.NoError(t, err) + + rwTx, err = db.BeginRw(context.Background()) + require.NoError(t, err) + defer rwTx.Rollback() + + logEvery := time.NewTicker(30 * time.Second) + defer logEvery.Stop() + stat, err := ac.Prune(context.Background(), rwTx, 0, logEvery) + require.NoError(t, err) + t.Logf("Prune: %s", stat) + + err = rwTx.Commit() + require.NoError(t, err) + + err = agg.MergeLoop(context.Background()) + require.NoError(t, err) + + // Check the history + roTx, err := db.BeginRo(context.Background()) + require.NoError(t, err) + defer roTx.Rollback() + + dc := agg.BeginFilesRo() + + v, _, ex, err := dc.GetLatest(kv.CommitmentDomain, commKey1, roTx) + require.NoError(t, err) + require.Truef(t, ex, "key %x not found", commKey1) + + require.EqualValues(t, maxWrite, binary.BigEndian.Uint64(v[:])) + + v, _, ex, err = dc.GetLatest(kv.CommitmentDomain, commKey2, roTx) + require.NoError(t, err) + require.Truef(t, ex, "key %x not found", commKey2) + dc.Close() + + require.EqualValues(t, otherMaxWrite, binary.BigEndian.Uint64(v[:])) +} + +func 
TestAggregatorV3_MergeValTransform(t *testing.T) { + t.Parallel() + db, agg := testDbAndAggregatorv3(t, 10) + rwTx, err := db.BeginRwNosync(context.Background()) + require.NoError(t, err) + defer func() { + if rwTx != nil { + rwTx.Rollback() + } + }() + ac := agg.BeginFilesRo() + defer ac.Close() + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + require.NoError(t, err) + defer domains.Close() + + txs := uint64(1000) + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + agg.commitmentValuesTransform = true + + state := make(map[string][]byte) + + // keys are encodings of numbers 1..31 + // each key changes value on every txNum which is multiple of the key + //var maxWrite, otherMaxWrite uint64 + for txNum := uint64(1); txNum <= txs; txNum++ { + domains.SetTxNum(txNum) + + addr, loc := make([]byte, length.Addr), make([]byte, length.Hash) + + n, err := rnd.Read(addr) + require.NoError(t, err) + require.EqualValues(t, length.Addr, n) + + n, err = rnd.Read(loc) + require.NoError(t, err) + require.EqualValues(t, length.Hash, n) + + buf := types.EncodeAccountBytesV3(1, uint256.NewInt(txNum*1e6), nil, 0) + err = domains.DomainPut(kv.AccountsDomain, addr, nil, buf, nil, 0) + require.NoError(t, err) + + err = domains.DomainPut(kv.StorageDomain, addr, loc, []byte{addr[0], loc[0]}, nil, 0) + require.NoError(t, err) + + if (txNum+1)%agg.StepSize() == 0 { + _, err := domains.ComputeCommitment(context.Background(), true, txNum/10, "") + require.NoError(t, err) + } + + state[string(addr)] = buf + state[string(addr)+string(loc)] = []byte{addr[0], loc[0]} + } + + err = domains.Flush(context.Background(), rwTx) + require.NoError(t, err) + + err = rwTx.Commit() + require.NoError(t, err) + rwTx = nil + + err = agg.BuildFiles(txs) + require.NoError(t, err) + + ac.Close() + ac = agg.BeginFilesRo() + defer ac.Close() + + rwTx, err = db.BeginRwNosync(context.Background()) + require.NoError(t, err) + defer func() { + if rwTx != nil { + rwTx.Rollback() + } + }() + + logEvery := time.NewTicker(30 * time.Second) + defer logEvery.Stop() + stat, err := ac.Prune(context.Background(), rwTx, 0, logEvery) + require.NoError(t, err) + t.Logf("Prune: %s", stat) + + err = rwTx.Commit() + require.NoError(t, err) + + err = agg.MergeLoop(context.Background()) + require.NoError(t, err) +} + func TestAggregatorV3_RestartOnDatadir(t *testing.T) { t.Parallel() //t.Skip() diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go index b084d4d5794..4cc9ec20e77 100644 --- a/erigon-lib/state/domain.go +++ b/erigon-lib/state/domain.go @@ -654,7 +654,8 @@ type DomainRoTx struct { keyBuf [60]byte // 52b key and 8b for inverted step comBuf []byte - valsC kv.Cursor + valsC kv.Cursor + valCViewID uint64 // to make sure that valsC reading from the same view with given kv.Tx getFromFileCache *DomainGetFromFileCache } @@ -1638,6 +1639,7 @@ func (dt *DomainRoTx) Close() { if dt.files == nil { // invariant: it's safe to call Close multiple times return } + dt.closeValsCursor() files := dt.files dt.files = nil for i := range files { @@ -1705,12 +1707,46 @@ func (dt *DomainRoTx) statelessBtree(i int) *BtIndex { return r } -func (dt *DomainRoTx) valsCursor(tx kv.Tx) (c kv.Cursor, err error) { +var sdTxImmutabilityInvariant = errors.New("tx passed into ShredDomains is immutable") + +func (dt *DomainRoTx) closeValsCursor() { if dt.valsC != nil { dt.valsC.Close() + dt.valCViewID = 0 dt.valsC = nil + // dt.vcParentPtr.Store(0) + } +} + +type canCheckClosed interface { + IsClosed() bool +} + +func (dt *DomainRoTx) 
valsCursor(tx kv.Tx) (c kv.Cursor, err error) { + if dt.valsC != nil { // run in assert mode only + if asserts { + if tx.ViewID() != dt.valCViewID { + panic(fmt.Errorf("%w: DomainRoTx=%s cursor ViewID=%d; given tx.ViewID=%d", sdTxImmutabilityInvariant, dt.d.filenameBase, dt.valCViewID, tx.ViewID())) // cursor opened by different tx, invariant broken + } + if mc, ok := dt.valsC.(canCheckClosed); !ok && mc.IsClosed() { + panic(fmt.Sprintf("domainRoTx=%s cursor lives longer than Cursor (=> than tx opened that cursor)", dt.d.filenameBase)) + } + // if dt.d.largeValues { + // if mc, ok := dt.valsC.(*mdbx.MdbxCursor); ok && mc.IsClosed() { + // panic(fmt.Sprintf("domainRoTx=%s cursor lives longer than Cursor (=> than tx opened that cursor)", dt.d.filenameBase)) + // } + // } else { + // if mc, ok := dt.valsC.(*mdbx.MdbxDupSortCursor); ok && mc.IsClosed() { + // panic(fmt.Sprintf("domainRoTx=%s cursor lives longer than DupCursor (=> than tx opened that cursor)", dt.d.filenameBase)) + // } + // } + } + return dt.valsC, nil } + if asserts { + dt.valCViewID = tx.ViewID() + } if dt.d.largeValues { dt.valsC, err = tx.Cursor(dt.d.valuesTable) return dt.valsC, err diff --git a/erigon-lib/state/domain_shared.go b/erigon-lib/state/domain_shared.go index abd5ee0177a..6d79a7491f0 100644 --- a/erigon-lib/state/domain_shared.go +++ b/erigon-lib/state/domain_shared.go @@ -915,13 +915,14 @@ func (sd *SharedDomains) Flush(ctx context.Context, tx kv.RwTx) error { _, f, l, _ := runtime.Caller(1) fmt.Printf("[SD aggTx=%d] FLUSHING at tx %d [%x], caller %s:%d\n", sd.aggTx.id, sd.TxNum(), fh, filepath.Base(f), l) } - for _, w := range sd.domainWriters { + for di, w := range sd.domainWriters { if w == nil { continue } if err := w.Flush(ctx, tx); err != nil { return err } + sd.aggTx.d[di].closeValsCursor() } for _, w := range sd.iiWriters { if w == nil { diff --git a/erigon-lib/state/domain_shared_test.go b/erigon-lib/state/domain_shared_test.go index d1c9c49c73f..008454bc819 100644 --- a/erigon-lib/state/domain_shared_test.go +++ b/erigon-lib/state/domain_shared_test.go @@ -236,7 +236,8 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { ac = agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + wrwTx := WrapTxWithCtx(rwTx, ac) + domains, err := NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() @@ -266,7 +267,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize), iterCount(domains)) @@ -274,7 +275,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { { // delete marker is in RAM require.NoError(domains.Flush(ctx, rwTx)) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize), iterCount(domains)) @@ -304,7 +305,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize*2+2-2), iterCount(domains)) @@ -325,7 +326,9 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { _, err := ac.Prune(ctx, rwTx, 0, nil) require.NoError(err) - 
domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + + wrwTx = WrapTxWithCtx(rwTx, ac) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize*2+2-2), iterCount(domains)) @@ -334,7 +337,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { { // delete/update more keys in RAM require.NoError(domains.Flush(ctx, rwTx)) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() @@ -354,7 +357,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize*2+2-3), iterCount(domains)) @@ -364,7 +367,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() domains.SetTxNum(domains.TxNum() + 1) @@ -390,14 +393,15 @@ func TestSharedDomain_StorageIter(t *testing.T) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + wtxRw := WrapTxWithCtx(rwTx, ac) + domains, err := NewSharedDomains(wtxRw, log.New()) require.NoError(t, err) defer domains.Close() maxTx := 3*stepSize + 10 hashes := make([][]byte, maxTx) - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wtxRw, log.New()) require.NoError(t, err) defer domains.Close() @@ -511,7 +515,6 @@ func TestSharedDomain_StorageIter(t *testing.T) { require.Zero(t, missed) require.Zero(t, notRemoved) } - fmt.Printf("deleted\n") err = domains.Flush(ctx, rwTx) require.NoError(t, err) diff --git a/erigon-lib/state/domain_test.go b/erigon-lib/state/domain_test.go index 6840979f08b..ae5c32c7bc3 100644 --- a/erigon-lib/state/domain_test.go +++ b/erigon-lib/state/domain_test.go @@ -619,6 +619,61 @@ func TestDomain_ScanFiles(t *testing.T) { checkHistory(t, db, d, txs) } +func TestDomainRoTx_CursorParentCheck(t *testing.T) { + asserts = true + defer func() { asserts = false }() + + logger := log.New() + db, d := testDbAndDomain(t, logger) + ctx, require := context.Background(), require.New(t) + tx, err := db.BeginRw(ctx) + require.NoError(err) + defer tx.Rollback() + + dc := d.BeginFilesRo() + defer dc.Close() + writer := dc.NewWriter() + defer writer.close() + + val := []byte("value1") + writer.SetTxNum(1) + writer.addValue([]byte("key1"), nil, val) + + err = writer.Flush(ctx, tx) + require.NoError(err) + err = tx.Commit() + require.NoError(err) + + tx, err = db.BeginRw(ctx) + require.NoError(err) + defer tx.Rollback() + + _, _, _, err = dc.GetLatest([]byte("key1"), tx) + require.NoError(err) + + cursor, err := dc.valsCursor(tx) + require.NoError(err) + require.NotNil(cursor) + tx.Rollback() + + otherTx, err := db.BeginRw(ctx) + require.NoError(err) + defer otherTx.Rollback() + //dc.valsC.Close() + //dc.valsC = nil + + defer func() { + r := recover() + require.NotNil(r) + //re := r.(error) + //fmt.Println(re) + //require.ErrorIs(re, sdTxImmutabilityInvariant) + }() + + _, _, _, err = dc.GetLatest([]byte("key1"), otherTx) + require.NoError(err) +} + func TestDomain_Delete(t *testing.T) { t.Parallel() diff --git a/turbo/rpchelper/helper.go 
b/turbo/rpchelper/helper.go index 539255492a7..a544749b5aa 100644 --- a/turbo/rpchelper/helper.go +++ b/turbo/rpchelper/helper.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + state2 "github.com/erigontech/erigon-lib/state" libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/kv" @@ -172,6 +173,20 @@ func CreateHistoryStateReader(tx kv.TemporalTx, txNumsReader rawdbv3.TxNumsReade return r, nil } +func NewLatestDomainStateReader(sd *state2.SharedDomains) state.StateReader { + return state.NewReaderV3(sd) +} + +func NewLatestDomainStateWriter(domains *state2.SharedDomains, blockReader services.FullBlockReader, blockNum uint64) state.StateWriter { + minTxNum, err := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(context.Background(), blockReader)).Min(domains.Tx(), blockNum) + if err != nil { + panic(err) + } + domains.SetTxNum(uint64(int(minTxNum) + /* 1 system txNum in beginning of block */ 1)) + return state.NewWriterV4(domains) + +} + func NewLatestStateReader(tx kv.Tx) state.StateReader { return state.NewReaderV3(tx.(kv.TemporalGetter)) }
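
Below is a minimal, self-contained sketch of the invariant this patch enforces. `Tx`, `Cursor`, and `domainRoTx` are simplified stand-ins for the erigon-lib `kv` types, not the real implementation; the real code also distinguishes large/dup-sorted values tables and only runs the ViewID check when asserts are enabled. The idea, taken from the diff above: `DomainRoTx` caches a cursor together with the ViewID of the tx that opened it, panics (in assert builds) if a read arrives via a tx with a different ViewID, and `SharedDomains.Flush` now closes the cached cursors so the next tx opens fresh ones.

package main

import "fmt"

// Tx and Cursor are hypothetical stand-ins for kv.Tx / kv.Cursor; only
// ViewID and Close matter for this sketch.
type Tx struct{ viewID uint64 }

func (tx *Tx) ViewID() uint64 { return tx.viewID }

type Cursor struct{ closed bool }

func (c *Cursor) Close() { c.closed = true }

const asserts = true // the real check only runs when asserts are enabled

// domainRoTx mirrors the patched DomainRoTx fields: a cached cursor plus the
// ViewID of the tx that opened it.
type domainRoTx struct {
	valsC      *Cursor
	valCViewID uint64
}

// valsCursor reuses the cached cursor but, like the patch, refuses to serve a
// read coming from a tx with a different ViewID, because that cursor belongs
// to an older (possibly already finished) transaction.
func (dt *domainRoTx) valsCursor(tx *Tx) *Cursor {
	if dt.valsC != nil {
		if asserts && tx.ViewID() != dt.valCViewID {
			panic(fmt.Sprintf("cursor opened by ViewID=%d, read attempted with ViewID=%d",
				dt.valCViewID, tx.ViewID()))
		}
		return dt.valsC
	}
	dt.valCViewID = tx.ViewID()
	dt.valsC = &Cursor{} // the real code opens tx.Cursor(valuesTable) here
	return dt.valsC
}

// closeValsCursor is what SharedDomains.Flush (and DomainRoTx.Close) now call,
// so the next tx opens a fresh cursor instead of reading through a stale one.
func (dt *domainRoTx) closeValsCursor() {
	if dt.valsC != nil {
		dt.valsC.Close()
		dt.valsC, dt.valCViewID = nil, 0
	}
}

func main() {
	dt := &domainRoTx{}
	tx1 := &Tx{viewID: 1}
	_ = dt.valsCursor(tx1) // opens and caches the cursor for view 1

	dt.closeValsCursor() // e.g. after Flush; safe to switch tx now

	tx2 := &Tx{viewID: 2}
	_ = dt.valsCursor(tx2) // fresh cursor for the new view, no panic
	fmt.Println("ok")
}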