Skip to content

Commit

Permalink
force GC, no cache during migration, auto heap profile
Browse files Browse the repository at this point in the history
  • Loading branch information
p0mvn committed Feb 18, 2022
1 parent 32f0265 commit 680cede
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 6 deletions.
69 changes: 68 additions & 1 deletion mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import (
"bytes"
"crypto/sha256"
"fmt"
"os"
"runtime"
"runtime/pprof"
"sort"
"sync"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -534,6 +538,9 @@ func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error)
fastItr.Close()
}

// Force garbage collection before we proceed to enabling fast storage.
runtime.GC()

if err := tree.enableFastStorageAndCommit(); err != nil {
tree.ndb.storageVersion = defaultStorageVersionValue
return false, err
Expand All @@ -558,23 +565,83 @@ func (tree *MutableTree) enableFastStorageAndCommit() error {
}
}()

done := make(chan struct{})
defer func() {
fmt.Println("signaling done")
done <- struct{}{}
close(done)
fmt.Println("closed")
}()

go func () {
timer := time.NewTimer(time.Second)
defer func () {
if !timer.Stop() {
fmt.Println("stop")
<-timer.C
}
}()

var m runtime.MemStats

hasTakenHeapProfile := false

for {
// Sample the current memory usage
runtime.ReadMemStats(&m)

if m.Alloc > 4 * 1024 * 1024 * 1024 {
// If we are using more than 4GB of memory, we should trigger garbage collection
// to free up some memory.
fmt.Println("gc")
runtime.GC()
}

if !hasTakenHeapProfile && m.Alloc > 8 * 1024 * 1024 * 1024 {
// If we are using more than 8GB of memory, we should write a pprof sample
fmt.Println("pprof heap")
time := time.Now()
heapProfilePath := "/tmp/heap_profile" + time.String() + ".pprof"
heapFile, _ := os.Create(heapProfilePath)
pprof.WriteHeapProfile(heapFile)
heapFile.Close()
hasTakenHeapProfile = true
}

select {
case <-timer.C:
fmt.Println("reset")
timer.Reset(time.Second)
case <-done:
fmt.Println("done")
return
}
}
}()

itr := NewIterator(nil, nil, true, tree.ImmutableTree)
defer itr.Close()
for ; itr.Valid(); itr.Next() {
if err = tree.ndb.SaveFastNode(NewFastNode(itr.Key(), itr.Value(), tree.version)); err != nil {
if err = tree.ndb.SaveFastNodeNoCache(NewFastNode(itr.Key(), itr.Value(), tree.version)); err != nil {
fmt.Println("save error")
return err
}
}

fmt.Println("finished")

if err = itr.Error(); err != nil {
fmt.Println("itr error")
return err
}

if err = tree.ndb.setFastStorageVersionToBatch(); err != nil {
fmt.Println("save version error")
return err
}

if err = tree.ndb.Commit(); err != nil {
fmt.Println("commit error")
return err
}
return nil
Expand Down
18 changes: 13 additions & 5 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,18 @@ func (ndb *nodeDB) SaveNode(node *Node) {
ndb.cacheNode(node)
}

// SaveNode saves a FastNode to disk.
// SaveNode saves a FastNode to disk and add to cache.
func (ndb *nodeDB) SaveFastNode(node *FastNode) error {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
return ndb.saveFastNodeUnlocked(node)
return ndb.saveFastNodeUnlocked(node, true)
}

// SaveNode saves a FastNode to disk without adding to cache.
func (ndb *nodeDB) SaveFastNodeNoCache(node *FastNode) error {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
return ndb.saveFastNodeUnlocked(node, false)
}

// setFastStorageVersionToBatch sets storage version to fast where the version is
Expand Down Expand Up @@ -274,7 +281,7 @@ func (ndb *nodeDB) shouldForceFastStorageUpgrade() bool {
}

// SaveNode saves a FastNode to disk.
func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode) error {
func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode, shouldAddToCache bool) error {
if node.key == nil {
return fmt.Errorf("FastNode cannot have a nil value for key")
}
Expand All @@ -290,8 +297,9 @@ func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode) error {
if err := ndb.batch.Set(ndb.fastNodeKey(node.key), buf.Bytes()); err != nil {
return fmt.Errorf("error while writing key/val to nodedb batch. Err: %w", err)
}
debug("BATCH SAVE %X %p\n", node.key, node)
ndb.cacheFastNode(node)
if shouldAddToCache {
ndb.cacheFastNode(node)
}
return nil
}

Expand Down

0 comments on commit 680cede

Please sign in to comment.