@@ -5,11 +5,13 @@ import (
55 "sync"
66)
77
8- var SHARD_COUNT = 32
8+ var DEFAULT_SHARD_COUNT = 32
99
1010// A "thread" safe map of type string:Anything.
11- // To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
12- type ConcurrentMap []* ConcurrentMapShared
11+ // To avoid lock bottlenecks this map is dived to several (m.numShards()) map shards.
12+ type ConcurrentMap struct {
13+ maps []* ConcurrentMapShared
14+ }
1315
1416// A "thread" safe string to anything map.
1517type ConcurrentMapShared struct {
@@ -19,16 +21,33 @@ type ConcurrentMapShared struct {
1921
2022// Creates a new concurrent map.
2123func New () ConcurrentMap {
22- m := make (ConcurrentMap , SHARD_COUNT )
23- for i := 0 ; i < SHARD_COUNT ; i ++ {
24- m [i ] = & ConcurrentMapShared {items : make (map [string ]interface {})}
24+ m := ConcurrentMap {
25+ maps : make ([]* ConcurrentMapShared , DEFAULT_SHARD_COUNT ),
26+ }
27+ for i := 0 ; i < DEFAULT_SHARD_COUNT ; i ++ {
28+ m .maps [i ] = & ConcurrentMapShared {items : make (map [string ]interface {})}
2529 }
2630 return m
2731}
2832
33+ // Creates a new concurrent map with size shards.
34+ func NewWithSize (size int ) ConcurrentMap {
35+ m := ConcurrentMap {
36+ maps : make ([]* ConcurrentMapShared , size ),
37+ }
38+ for i := 0 ; i < size ; i ++ {
39+ m .maps [i ] = & ConcurrentMapShared {items : make (map [string ]interface {})}
40+ }
41+ return m
42+ }
43+
44+ func (m ConcurrentMap ) numShards () int {
45+ return len (m .maps )
46+ }
47+
2948// Returns shard under given key
3049func (m ConcurrentMap ) GetShard (key string ) * ConcurrentMapShared {
31- return m [uint (fnv32 (key ))% uint (SHARD_COUNT )]
50+ return m . maps [uint (fnv32 (key ))% uint (m . numShards () )]
3251}
3352
3453func (m ConcurrentMap ) MSet (data map [string ]interface {}) {
@@ -93,8 +112,8 @@ func (m ConcurrentMap) Get(key string) (interface{}, bool) {
93112// Returns the number of elements within the map.
94113func (m ConcurrentMap ) Count () int {
95114 count := 0
96- for i := 0 ; i < SHARD_COUNT ; i ++ {
97- shard := m [i ]
115+ for i := 0 ; i < m . numShards () ; i ++ {
116+ shard := m . maps [i ]
98117 shard .RLock ()
99118 count += len (shard .items )
100119 shard .RUnlock ()
@@ -191,11 +210,11 @@ func (m ConcurrentMap) IterBuffered() <-chan Tuple {
191210// It returns once the size of each buffered channel is determined,
192211// before all the channels are populated using goroutines.
193212func snapshot (m ConcurrentMap ) (chans []chan Tuple ) {
194- chans = make ([]chan Tuple , SHARD_COUNT )
213+ chans = make ([]chan Tuple , m . numShards () )
195214 wg := sync.WaitGroup {}
196- wg .Add (SHARD_COUNT )
215+ wg .Add (m . numShards () )
197216 // Foreach shard.
198- for index , shard := range m {
217+ for index , shard := range m . maps {
199218 go func (index int , shard * ConcurrentMapShared ) {
200219 // Foreach key, value pair.
201220 shard .RLock ()
@@ -249,8 +268,8 @@ type IterCb func(key string, v interface{})
249268// Callback based iterator, cheapest way to read
250269// all elements in a map.
251270func (m ConcurrentMap ) IterCb (fn IterCb ) {
252- for idx := range m {
253- shard := (m )[idx ]
271+ for idx := range m . maps {
272+ shard := (m ). maps [idx ]
254273 shard .RLock ()
255274 for key , value := range shard .items {
256275 fn (key , value )
@@ -266,8 +285,8 @@ func (m ConcurrentMap) Keys() []string {
266285 go func () {
267286 // Foreach shard.
268287 wg := sync.WaitGroup {}
269- wg .Add (SHARD_COUNT )
270- for _ , shard := range m {
288+ wg .Add (m . numShards () )
289+ for _ , shard := range m . maps {
271290 go func (shard * ConcurrentMapShared ) {
272291 // Foreach key, value pair.
273292 shard .RLock ()
0 commit comments