From c1f7647ceb81f4bc7aba29f0a7c651963d8a44a9 Mon Sep 17 00:00:00 2001 From: Anish Mukherjee Date: Thu, 15 Sep 2022 10:43:25 +0530 Subject: [PATCH] refactor --- .github/workflows/go.yml | 2 +- README.md | 2 +- benchmarks/map_test.go | 2 +- tests/e2e_test.go => e2e_test.go | 49 +++++--- hash.go | 2 +- list.go | 5 +- map.go | 194 +++++++++++++++---------------- 7 files changed, 133 insertions(+), 123 deletions(-) rename tests/e2e_test.go => e2e_test.go (84%) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 98f1c5f..153fc47 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,4 +19,4 @@ jobs: - name: Test run: | - go test tests/e2e_test.go + go test *.go diff --git a/README.md b/README.md index 6122b92..d62567e 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![Main Actions Status](https://github.com/alphadose/haxmap/workflows/Go/badge.svg)](https://github.com/alphadose/haxmap/actions) [![Go Report Card](https://goreportcard.com/badge/github.com/alphadose/haxmap)](https://goreportcard.com/report/github.com/alphadose/haxmap) [![License](https://img.shields.io/badge/license-MIT-blue.svg)](./LICENSE.md) -> A blazing fast concurrent hashmap +> A lightning fast concurrent hashmap The hashing algorithm used was [xxHash](https://github.com/Cyan4973/xxHash) and the hashmap's buckets were implemented using [Harris lock-free list](https://www.cl.cam.ac.uk/research/srg/netos/papers/2001-caslists.pdf) diff --git a/benchmarks/map_test.go b/benchmarks/map_test.go index c686505..9f63a58 100644 --- a/benchmarks/map_test.go +++ b/benchmarks/map_test.go @@ -14,7 +14,7 @@ const ( mapSize = 256 ) -func setupHaxMap() *haxmap.HashMap[uintptr, uintptr] { +func setupHaxMap() *haxmap.Map[uintptr, uintptr] { m := haxmap.New[uintptr, uintptr](mapSize) for i := uintptr(0); i < epochs; i++ { m.Set(i, i) diff --git a/tests/e2e_test.go b/e2e_test.go similarity index 84% rename from tests/e2e_test.go rename to e2e_test.go index 40ad160..f1ff9f2 100644 --- a/tests/e2e_test.go +++ b/e2e_test.go @@ -1,4 +1,4 @@ -package test +package haxmap import ( "fmt" @@ -8,8 +8,6 @@ import ( "sync/atomic" "testing" "time" - - "github.com/alphadose/haxmap" ) type Animal struct { @@ -17,14 +15,14 @@ type Animal struct { } func TestMapCreation(t *testing.T) { - m := haxmap.New[int, int]() + m := New[int, int]() if m.Len() != 0 { t.Errorf("new map should be empty but has %d items.", m.Len()) } } func TestOverwrite(t *testing.T) { - m := haxmap.New[uint, string]() + m := New[uint, string]() key := uint(1) cat := "cat" tiger := "tiger" @@ -46,7 +44,7 @@ func TestOverwrite(t *testing.T) { } func TestSet(t *testing.T) { - m := haxmap.New[int, string](4) + m := New[int, string](4) m.Set(4, "cat") m.Set(3, "cat") @@ -59,7 +57,7 @@ func TestSet(t *testing.T) { } func TestGet(t *testing.T) { - m := haxmap.New[string, string]() + m := New[string, string]() cat := "cat" key := "animal" @@ -86,18 +84,18 @@ func TestGet(t *testing.T) { } func TestGrow(t *testing.T) { - m := haxmap.New[uint, uint]() + m := New[uint, uint]() m.Grow(63) - d := m.Datamap.Load() + d := m.metadata.Load() log := int(math.Log2(64)) expectedSize := uintptr(strconv.IntSize - log) - if d.Keyshifts != expectedSize { + if d.keyshifts != expectedSize { t.Errorf("Grow operation did not result in correct internal map data structure, Dump -> %#v", d) } } func TestDelete(t *testing.T) { - m := haxmap.New[int, *Animal]() + m := New[int, *Animal]() cat := &Animal{"cat"} tiger := &Animal{"tiger"} @@ -132,7 +130,7 @@ func TestDelete(t *testing.T) { } func TestIterator(t *testing.T) { - m := haxmap.New[int, *Animal]() + m := New[int, *Animal]() m.ForEach(func(i int, a *Animal) bool { t.Errorf("map should be empty but got key -> %d and value -> %#v.", i, a) @@ -161,7 +159,7 @@ func TestIterator(t *testing.T) { func TestMapParallel(t *testing.T) { max := 10 dur := 2 * time.Second - m := haxmap.New[int, int]() + m := New[int, int]() do := func(t *testing.T, max int, d time.Duration, fn func(*testing.T, int)) <-chan error { t.Helper() done := make(chan error) @@ -229,19 +227,19 @@ func TestMapParallel(t *testing.T) { } func TestMapConcurrentWrites(t *testing.T) { - blocks := haxmap.New[string, struct{}]() + blocks := New[string, struct{}]() var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) - go func(blocks *haxmap.HashMap[string, struct{}], i int) { + go func(blocks *Map[string, struct{}], i int) { defer wg.Done() blocks.Set(strconv.Itoa(i), struct{}{}) wg.Add(1) - go func(blocks *haxmap.HashMap[string, struct{}], i int) { + go func(blocks *Map[string, struct{}], i int) { defer wg.Done() blocks.Get(strconv.Itoa(i)) @@ -254,7 +252,7 @@ func TestMapConcurrentWrites(t *testing.T) { // Collision test case when hash key is 0 in value for all entries func TestHash0Collision(t *testing.T) { - m := haxmap.New[string, int]() + m := New[string, int]() staticHasher := func(key string) uintptr { return 0 } @@ -270,3 +268,20 @@ func TestHash0Collision(t *testing.T) { t.Error("2 not found") } } + +// test map freezing issue +// https://github.com/alphadose/haxmap/issues/7 +// https://github.com/alphadose/haxmap/issues/8 +// Update:- Solved now +func TestInfiniteLoop(t *testing.T) { + t.Run("infinite loop", func(b *testing.T) { + m := New[int, int](512) + for i := 0; i < 112050; i++ { + if i > 112024 { + m.Set(i, i) // set debug point here and step into until .inject + } else { + m.Set(i, i) + } + } + }) +} diff --git a/hash.go b/hash.go index ba9b8d6..bd95887 100644 --- a/hash.go +++ b/hash.go @@ -183,7 +183,7 @@ var ( } ) -func (m *HashMap[K, V]) setDefaultHasher() { +func (m *Map[K, V]) setDefaultHasher() { // default hash functions switch any(*new(K)).(type) { case string: diff --git a/list.go b/list.go index b4c4af5..2aa7278 100644 --- a/list.go +++ b/list.go @@ -106,10 +106,7 @@ func (self *element[K, V]) search(c uintptr, key K) (*element[K, V], *element[K, // remove removes the current node func (self *element[K, V]) remove() { - for { - if self.add(marked) { - return - } + for !self.add(marked) { } } diff --git a/map.go b/map.go index 03c5ba9..cc3b603 100644 --- a/map.go +++ b/map.go @@ -30,26 +30,27 @@ type ( int | int8 | int16 | int32 | int64 | uint | uint8 | uint16 | uint32 | uint64 | uintptr | float32 | float64 | string | complex64 | complex128 } - hashMapData[K hashable, V any] struct { - Keyshifts uintptr // array_size - log2(array_size) + // metadata of the hashmap + metadata[K hashable, V any] struct { + keyshifts uintptr // array_size - log2(array_size) count atomic.Uintptr // number of filled items - data unsafe.Pointer // pointer to array + data unsafe.Pointer // pointer to array of map indexes index []*element[K, V] } - // HashMap implements the concurrent hashmap - HashMap[K hashable, V any] struct { + // Map implements the concurrent hashmap + Map[K hashable, V any] struct { listHead *element[K, V] // Harris lock-free list of elements in ascending order of hash hasher func(K) uintptr - Datamap atomic.Pointer[hashMapData[K, V]] // atomic.Pointer for safe access even during resizing + metadata atomic.Pointer[metadata[K, V]] // atomic.Pointer for safe access even during resizing resizing atomic.Uint32 numItems atomic.Uintptr } ) // New returns a new HashMap instance with an optional specific initialization size -func New[K hashable, V any](size ...uintptr) *HashMap[K, V] { - m := &HashMap[K, V]{listHead: newListHead[K, V]()} +func New[K hashable, V any](size ...uintptr) *Map[K, V] { + m := &Map[K, V]{listHead: newListHead[K, V]()} m.numItems.Store(0) if len(size) > 0 { m.allocate(size[0]) @@ -60,58 +61,45 @@ func New[K hashable, V any](size ...uintptr) *HashMap[K, V] { return m } -// indexElement returns the index of a hash key, returns `nil` if absent -func (mapData *hashMapData[K, V]) indexElement(hashedKey uintptr) *element[K, V] { - index := hashedKey >> mapData.Keyshifts - ptr := (*unsafe.Pointer)(unsafe.Pointer(uintptr(mapData.data) + index*intSizeBytes)) - item := (*element[K, V])(atomic.LoadPointer(ptr)) - for (item == nil || hashedKey < item.keyHash) && index > 0 { - index-- - ptr = (*unsafe.Pointer)(unsafe.Pointer(uintptr(mapData.data) + index*intSizeBytes)) - item = (*element[K, V])(atomic.LoadPointer(ptr)) - } - return item -} - // Del deletes the key from the map // does nothing if key is absemt -func (m *HashMap[K, V]) Del(key K) { +func (m *Map[K, V]) Del(key K) { h := m.hasher(key) - element := m.Datamap.Load().indexElement(h) - iter := element + elem := m.metadata.Load().indexElement(h) + iter := elem loop: - for ; element != nil; element = element.next() { - if element.keyHash == h && element.key == key { + for ; elem != nil; elem = elem.next() { + if elem.keyHash == h && elem.key == key { break loop } - if element.keyHash > h { + if elem.keyHash > h { return } } - if element == nil { + if elem == nil { return } - element.remove() + elem.remove() // if index element is the same as the element to be deleted then start from list head - if element.key == iter.key { + if elem.key == iter.key { iter = m.listHead } // ensure complete deletion by iterating the list from the nearest index whenever possible for ; iter != nil; iter = iter.next() { } for { - data := m.Datamap.Load() - index := element.keyHash >> data.Keyshifts + data := m.metadata.Load() + index := elem.keyHash >> data.keyshifts ptr := (*unsafe.Pointer)(unsafe.Pointer(uintptr(data.data) + index*intSizeBytes)) - next := element.next() - if next != nil && element.keyHash>>data.Keyshifts != index { + next := elem.next() + if next != nil && elem.keyHash>>data.keyshifts != index { next = nil // do not set index to next item if it's not the same slice index } - atomic.CompareAndSwapPointer(ptr, unsafe.Pointer(element), unsafe.Pointer(next)) + atomic.CompareAndSwapPointer(ptr, unsafe.Pointer(elem), unsafe.Pointer(next)) - if data == m.Datamap.Load() { // check that no resize happened + if data == m.metadata.Load() { // check that no resize happened m.numItems.Add(marked) return } @@ -120,15 +108,15 @@ loop: // Get retrieves an element from the map // returns `falseā€œ if element is absent -func (m *HashMap[K, V]) Get(key K) (value V, ok bool) { +func (m *Map[K, V]) Get(key K) (value V, ok bool) { h := m.hasher(key) // inline search - for elem := m.Datamap.Load().indexElement(h); elem != nil; elem = elem.nextPtr.Load() { + for elem := m.metadata.Load().indexElement(h); elem != nil; elem = elem.nextPtr.Load() { if elem.keyHash == h && elem.key == key { value, ok = *elem.value.Load(), true return } - if elem.keyHash == marked || elem.keyHash <= h { + if elem.keyHash <= h || elem.keyHash == marked { continue } else { break @@ -141,7 +129,7 @@ func (m *HashMap[K, V]) Get(key K) (value V, ok bool) { // Set tries to update an element if key is present else it inserts a new element // If a resizing operation is happening concurrently while calling Set() // then the item might show up in the map only after the resize operation is finished -func (m *HashMap[K, V]) Set(key K, value V) { +func (m *Map[K, V]) Set(key K, value V) { h, valPtr := m.hasher(key), &value var ( alloc *element[K, V] @@ -149,7 +137,7 @@ func (m *HashMap[K, V]) Set(key K, value V) { ) start: - data := m.Datamap.Load() + data := m.metadata.Load() if data == nil { m.Grow(defaultSize) goto start // read mapdata and slice item again @@ -164,50 +152,13 @@ start: count := data.addItemToIndex(alloc) if resizeNeeded(uintptr(len(data.index)), count) && m.resizing.CompareAndSwap(notResizing, resizingInProgress) { - m.grow(0, true) - } -} - -// addItemToIndex adds an item to the index if needed and returns the new item counter if it changed, otherwise 0 -func (mapData *hashMapData[K, V]) addItemToIndex(item *element[K, V]) uintptr { - index := item.keyHash >> mapData.Keyshifts - ptr := (*unsafe.Pointer)(unsafe.Pointer(uintptr(mapData.data) + index*intSizeBytes)) - for { - element := (*element[K, V])(atomic.LoadPointer(ptr)) - if element == nil { - if atomic.CompareAndSwapPointer(ptr, nil, unsafe.Pointer(item)) { - return mapData.count.Add(1) - } - continue - } - - if item.keyHash < element.keyHash { - if !atomic.CompareAndSwapPointer(ptr, unsafe.Pointer(element), unsafe.Pointer(item)) { - continue - } - } - return 0 - } -} - -// fillIndexItems re-indexes the map given the latest state of the linked list -func (m *HashMap[K, V]) fillIndexItems(mapData *hashMapData[K, V]) { - first := m.listHead - item := first - lastIndex := uintptr(0) - for item != nil { - index := item.keyHash >> mapData.Keyshifts - if item == first || index != lastIndex { - mapData.addItemToIndex(item) - lastIndex = index - } - item = item.next() + m.grow(0) // double in size } } // ForEach iterates over key-value pairs and executes the lambda provided for each such pair // lambda must return `true` to continue iteration and `false` to break iteration -func (m *HashMap[K, V]) ForEach(lambda func(K, V) bool) { +func (m *Map[K, V]) ForEach(lambda func(K, V) bool) { for item := m.listHead.nextPtr.Load(); item != nil; item = item.nextPtr.Load() { if item.keyHash == marked { continue @@ -223,63 +174,74 @@ func (m *HashMap[K, V]) ForEach(lambda func(K, V) bool) { // This function returns immediately, the resize operation is done in a goroutine // No resizing is done in case of another resize operation already being in progress // Growth and map bucket policy is inspired from https://github.com/cornelk/hashmap -func (m *HashMap[K, V]) Grow(newSize uintptr) { +func (m *Map[K, V]) Grow(newSize uintptr) { if m.resizing.CompareAndSwap(notResizing, resizingInProgress) { - m.grow(newSize, true) + m.grow(newSize) } } // SetHasher sets the hash function to the one provided by the user -func (m *HashMap[K, V]) SetHasher(hs func(K) uintptr) { +func (m *Map[K, V]) SetHasher(hs func(K) uintptr) { m.hasher = hs } // Len returns the number of key-value pairs within the map -func (m *HashMap[K, V]) Len() uintptr { +func (m *Map[K, V]) Len() uintptr { return m.numItems.Load() } // Fillrate returns the fill rate of the map as an percentage integer -func (m *HashMap[K, V]) Fillrate() uintptr { - data := m.Datamap.Load() +func (m *Map[K, V]) Fillrate() uintptr { + data := m.metadata.Load() return (data.count.Load() * 100) / uintptr(len(data.index)) } // allocate map with the given size -func (m *HashMap[K, V]) allocate(newSize uintptr) { +func (m *Map[K, V]) allocate(newSize uintptr) { if m.resizing.CompareAndSwap(notResizing, resizingInProgress) { - m.grow(newSize, false) + m.grow(newSize) + } +} + +// fillIndexItems re-indexes the map given the latest state of the linked list +func (m *Map[K, V]) fillIndexItems(mapData *metadata[K, V]) { + first := m.listHead + item := first + lastIndex := uintptr(0) + for item != nil { + index := item.keyHash >> mapData.keyshifts + if item == first || index != lastIndex { + mapData.addItemToIndex(item) + lastIndex = index + } + item = item.next() } } // grow to the new size -func (m *HashMap[K, V]) grow(newSize uintptr, loop bool) { +func (m *Map[K, V]) grow(newSize uintptr) { defer m.resizing.CompareAndSwap(resizingInProgress, notResizing) for { - currentStore := m.Datamap.Load() + currentStore := m.metadata.Load() if newSize == 0 { newSize = uintptr(len(currentStore.index)) << 1 } else { newSize = roundUpPower2(newSize) } - index := make([]*element[K, V], newSize, newSize) + index := make([]*element[K, V], newSize) header := (*reflect.SliceHeader)(unsafe.Pointer(&index)) - newdata := &hashMapData[K, V]{ - Keyshifts: strconv.IntSize - log2(newSize), + newdata := &metadata[K, V]{ + keyshifts: strconv.IntSize - log2(newSize), data: unsafe.Pointer(header.Data), index: index, } m.fillIndexItems(newdata) // re-index with longer and more widespread keys - m.Datamap.Store(newdata) - - if !loop { - return - } + m.metadata.Store(newdata) if !resizeNeeded(newSize, uintptr(m.Len())) { return @@ -288,6 +250,42 @@ func (m *HashMap[K, V]) grow(newSize uintptr, loop bool) { } } +// indexElement returns the index of a hash key, returns `nil` if absent +func (md *metadata[K, V]) indexElement(hashedKey uintptr) *element[K, V] { + index := hashedKey >> md.keyshifts + ptr := (*unsafe.Pointer)(unsafe.Pointer(uintptr(md.data) + index*intSizeBytes)) + item := (*element[K, V])(atomic.LoadPointer(ptr)) + for (item == nil || hashedKey < item.keyHash) && index > 0 { + index-- + ptr = (*unsafe.Pointer)(unsafe.Pointer(uintptr(md.data) + index*intSizeBytes)) + item = (*element[K, V])(atomic.LoadPointer(ptr)) + } + return item +} + +// addItemToIndex adds an item to the index if needed and returns the new item counter if it changed, otherwise 0 +func (md *metadata[K, V]) addItemToIndex(item *element[K, V]) uintptr { + index := item.keyHash >> md.keyshifts + ptr := (*unsafe.Pointer)(unsafe.Pointer(uintptr(md.data) + index*intSizeBytes)) + for { + elem := (*element[K, V])(atomic.LoadPointer(ptr)) + if elem == nil { + if atomic.CompareAndSwapPointer(ptr, nil, unsafe.Pointer(item)) { + return md.count.Add(1) + } + continue + } + + if item.keyHash < elem.keyHash { + if !atomic.CompareAndSwapPointer(ptr, unsafe.Pointer(elem), unsafe.Pointer(item)) { + continue + } + } + return 0 + } +} + +// check if resize is needed func resizeNeeded(length, count uintptr) bool { return (count*100)/length > maxFillRate }