Skip to content

Commit

Permalink
force GC, no cache during migration, auto heap profile
Browse files Browse the repository at this point in the history
  • Loading branch information
p0mvn committed Feb 18, 2022
1 parent 32f0265 commit f3fe49e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 6 deletions.
57 changes: 56 additions & 1 deletion mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import (
"bytes"
"crypto/sha256"
"fmt"
"os"
"runtime"
"runtime/pprof"
"sort"
"sync"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -534,6 +538,9 @@ func (tree *MutableTree) enableFastStorageAndCommitIfNotEnabled() (bool, error)
fastItr.Close()
}

// Force garbage collection before we proceed to enabling fast storage.
runtime.GC()

if err := tree.enableFastStorageAndCommit(); err != nil {
tree.ndb.storageVersion = defaultStorageVersionValue
return false, err
Expand All @@ -558,10 +565,58 @@ func (tree *MutableTree) enableFastStorageAndCommit() error {
}
}()

done := make(chan struct{})
defer func() {
done <- struct{}{}
close(done)
}()

go func () {
timer := time.NewTimer(time.Second)
defer func () {
if !timer.Stop() {
<-timer.C
}
}()

var m runtime.MemStats

hasTakenHeapProfile := false

for {
// Sample the current memory usage
runtime.ReadMemStats(&m)

if m.Alloc > 4 * 1024 * 1024 * 1024 {
// If we are using more than 4GB of memory, we should trigger garbage collection
// to free up some memory.
runtime.GC()
return
}

if !hasTakenHeapProfile && m.Alloc > 8 * 1024 * 1024 * 1024 {
// If we are using more than 8GB of memory, we should write a pprof sample
time := time.Now()
heapProfilePath := "/tmp/heap_profile" + time.String() + ".pprof"
heapFile, _ := os.Create(heapProfilePath)
pprof.WriteHeapProfile(heapFile)
heapFile.Close()
hasTakenHeapProfile = true
}

select {
case <-timer.C:
timer.Reset(time.Second)
case <-done:
return
}
}
}()

itr := NewIterator(nil, nil, true, tree.ImmutableTree)
defer itr.Close()
for ; itr.Valid(); itr.Next() {
if err = tree.ndb.SaveFastNode(NewFastNode(itr.Key(), itr.Value(), tree.version)); err != nil {
if err = tree.ndb.SaveFastNodeNoCache(NewFastNode(itr.Key(), itr.Value(), tree.version)); err != nil {
return err
}
}
Expand Down
18 changes: 13 additions & 5 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,18 @@ func (ndb *nodeDB) SaveNode(node *Node) {
ndb.cacheNode(node)
}

// SaveNode saves a FastNode to disk.
// SaveNode saves a FastNode to disk and add to cache.
func (ndb *nodeDB) SaveFastNode(node *FastNode) error {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
return ndb.saveFastNodeUnlocked(node)
return ndb.saveFastNodeUnlocked(node, true)
}

// SaveNode saves a FastNode to disk without adding to cache.
func (ndb *nodeDB) SaveFastNodeNoCache(node *FastNode) error {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
return ndb.saveFastNodeUnlocked(node, false)
}

// setFastStorageVersionToBatch sets storage version to fast where the version is
Expand Down Expand Up @@ -274,7 +281,7 @@ func (ndb *nodeDB) shouldForceFastStorageUpgrade() bool {
}

// SaveNode saves a FastNode to disk.
func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode) error {
func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode, shouldAddToCache bool) error {
if node.key == nil {
return fmt.Errorf("FastNode cannot have a nil value for key")
}
Expand All @@ -290,8 +297,9 @@ func (ndb *nodeDB) saveFastNodeUnlocked(node *FastNode) error {
if err := ndb.batch.Set(ndb.fastNodeKey(node.key), buf.Bytes()); err != nil {
return fmt.Errorf("error while writing key/val to nodedb batch. Err: %w", err)
}
debug("BATCH SAVE %X %p\n", node.key, node)
ndb.cacheFastNode(node)
if shouldAddToCache {
ndb.cacheFastNode(node)
}
return nil
}

Expand Down

0 comments on commit f3fe49e

Please sign in to comment.