Skip to content

Commit e95eaec

Browse files
authored
Replace usage of sync/atomic with uber-go/atomic (cortexproject#2951)
* chunk/cache: Replace usage of sync/atomic with uber-go/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com> * util: Replace usage of sync/atomic with uber-go/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com> * util/services: Replace usage of sync/atomic with uber-go/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com> * ingester: Replace usage of sync/atomic with uber-go/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com> * querier/frontend: Replace usage of sync/atomic with uber-go/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com> * querier/queryrange: Replace usage of sync/atomic with uber-go/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com> * ring: Replace usage of sync/atomic with uber-go/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com> * ring/kv/memberlist: Replace usage of sync/atomic with uber-go/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com> * storage/tsdb: Replace usage of sync/atomic with uber-go/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com> * storagegateway: Replace usage of sync/atomic with uber-go/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com> * faillint: Ensure we are not using sync/atomic Signed-off-by: Javier Palomo <javier.palomo.almena@gmail.com>
1 parent 0cb6ff1 commit e95eaec

File tree

14 files changed

+78
-74
lines changed

14 files changed

+78
-74
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,9 @@ lint:
140140
golangci-lint run
141141

142142
# Ensure no blacklisted package is imported.
143-
faillint -paths "github.com/bmizerany/assert=github.com/stretchr/testify/assert,golang.org/x/net/context=context" ./pkg/... ./cmd/... ./tools/... ./integration/...
143+
faillint -paths "github.com/bmizerany/assert=github.com/stretchr/testify/assert,\
144+
golang.org/x/net/context=context,\
145+
sync/atomic=go.uber.org/atomic" ./pkg/... ./cmd/... ./tools/... ./integration/...
144146

145147
# Validate Kubernetes spec files. Requires:
146148
# https://kubeval.instrumenta.dev

pkg/chunk/cache/memcached_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package cache_test
33
import (
44
"context"
55
"errors"
6-
"sync/atomic"
76
"testing"
87

98
"github.com/bradfitz/gomemcache/memcache"
109
"github.com/go-kit/kit/log"
1110
"github.com/stretchr/testify/require"
11+
"go.uber.org/atomic"
1212

1313
"github.com/cortexproject/cortex/pkg/chunk/cache"
1414
)
@@ -71,7 +71,7 @@ func testMemcache(t *testing.T, memcache *cache.Memcached) {
7171
// mockMemcache whose calls fail 1/3rd of the time.
7272
type mockMemcacheFailing struct {
7373
*mockMemcache
74-
calls uint64
74+
calls atomic.Uint64
7575
}
7676

7777
func newMockMemcacheFailing() *mockMemcacheFailing {
@@ -81,7 +81,7 @@ func newMockMemcacheFailing() *mockMemcacheFailing {
8181
}
8282

8383
func (c *mockMemcacheFailing) GetMulti(keys []string) (map[string]*memcache.Item, error) {
84-
calls := atomic.AddUint64(&c.calls, 1)
84+
calls := c.calls.Inc()
8585
if calls%3 == 0 {
8686
return nil, errors.New("fail")
8787
}

pkg/ingester/mapper.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55
"sort"
66
"strings"
77
"sync"
8-
"sync/atomic"
98

109
"github.com/go-kit/kit/log/level"
1110
"github.com/prometheus/common/model"
11+
"go.uber.org/atomic"
1212

1313
"github.com/cortexproject/cortex/pkg/util"
1414
)
@@ -24,8 +24,7 @@ type fpMappings map[model.Fingerprint]map[string]model.Fingerprint
2424
// fpMapper is used to map fingerprints in order to work around fingerprint
2525
// collisions.
2626
type fpMapper struct {
27-
// highestMappedFP has to be aligned for atomic operations.
28-
highestMappedFP model.Fingerprint
27+
highestMappedFP atomic.Uint64
2928

3029
mtx sync.RWMutex // Protects mappings.
3130
mappings fpMappings
@@ -130,7 +129,7 @@ func (m *fpMapper) maybeAddMapping(
130129
}
131130

132131
func (m *fpMapper) nextMappedFP() model.Fingerprint {
133-
mappedFP := model.Fingerprint(atomic.AddUint64((*uint64)(&m.highestMappedFP), 1))
132+
mappedFP := model.Fingerprint(m.highestMappedFP.Inc())
134133
if mappedFP > maxMappedFP {
135134
panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
136135
}

pkg/ingester/rate.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ package ingester
22

33
import (
44
"sync"
5-
"sync/atomic"
65
"time"
6+
7+
"go.uber.org/atomic"
78
)
89

910
// ewmaRate tracks an exponentially weighted moving average of a per-second rate.
1011
type ewmaRate struct {
11-
newEvents int64
12+
newEvents atomic.Int64
1213
alpha float64
1314
interval time.Duration
1415
lastRate float64
@@ -32,8 +33,8 @@ func (r *ewmaRate) rate() float64 {
3233

3334
// tick assumes to be called every r.interval.
3435
func (r *ewmaRate) tick() {
35-
newEvents := atomic.LoadInt64(&r.newEvents)
36-
atomic.AddInt64(&r.newEvents, -newEvents)
36+
newEvents := r.newEvents.Load()
37+
r.newEvents.Sub(newEvents)
3738
instantRate := float64(newEvents) / r.interval.Seconds()
3839

3940
r.mutex.Lock()
@@ -49,9 +50,9 @@ func (r *ewmaRate) tick() {
4950

5051
// inc counts one event.
5152
func (r *ewmaRate) inc() {
52-
atomic.AddInt64(&r.newEvents, 1)
53+
r.newEvents.Inc()
5354
}
5455

5556
func (r *ewmaRate) add(delta int64) {
56-
atomic.AddInt64(&r.newEvents, delta)
57+
r.newEvents.Add(delta)
5758
}

pkg/ingester/series_map.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ package ingester
22

33
import (
44
"sync"
5-
"sync/atomic"
65
"unsafe"
76

87
"github.com/prometheus/common/model"
8+
"go.uber.org/atomic"
99

1010
"github.com/cortexproject/cortex/pkg/util"
1111
)
@@ -16,7 +16,7 @@ const seriesMapShards = 128
1616
// goroutine-safe. A seriesMap is effectively a goroutine-safe version of
1717
// map[model.Fingerprint]*memorySeries.
1818
type seriesMap struct {
19-
size int32
19+
size atomic.Int32
2020
shards []shard
2121
}
2222

@@ -65,7 +65,7 @@ func (sm *seriesMap) put(fp model.Fingerprint, s *memorySeries) {
6565
shard.mtx.Unlock()
6666

6767
if !ok {
68-
atomic.AddInt32(&sm.size, 1)
68+
sm.size.Inc()
6969
}
7070
}
7171

@@ -77,7 +77,7 @@ func (sm *seriesMap) del(fp model.Fingerprint) {
7777
delete(shard.m, fp)
7878
shard.mtx.Unlock()
7979
if ok {
80-
atomic.AddInt32(&sm.size, -1)
80+
sm.size.Dec()
8181
}
8282
}
8383

@@ -106,5 +106,5 @@ func (sm *seriesMap) iter() <-chan fingerprintSeriesPair {
106106
}
107107

108108
func (sm *seriesMap) length() int {
109-
return int(atomic.LoadInt32(&sm.size))
109+
return int(sm.size.Load())
110110
}

pkg/querier/frontend/frontend_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"net"
99
"net/http"
1010
"net/http/httptest"
11-
"sync/atomic"
1211
"testing"
1312
"time"
1413

@@ -25,7 +24,7 @@ import (
2524
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
2625
"github.com/weaveworks/common/middleware"
2726
"github.com/weaveworks/common/user"
28-
uber_atomic "go.uber.org/atomic"
27+
"go.uber.org/atomic"
2928
"google.golang.org/grpc"
3029

3130
"github.com/cortexproject/cortex/pkg/querier"
@@ -172,10 +171,10 @@ func TestFrontend_RequestHostHeaderWhenDownstreamURLIsConfigured(t *testing.T) {
172171
// TestFrontendCancel ensures that when client requests are cancelled,
173172
// the underlying query is correctly cancelled _and not retried_.
174173
func TestFrontendCancel(t *testing.T) {
175-
var tries int32
174+
var tries atomic.Int32
176175
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
177176
<-r.Context().Done()
178-
atomic.AddInt32(&tries, 1)
177+
tries.Inc()
179178
})
180179
test := func(addr string) {
181180
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil)
@@ -195,10 +194,10 @@ func TestFrontendCancel(t *testing.T) {
195194
require.Error(t, err)
196195

197196
time.Sleep(100 * time.Millisecond)
198-
assert.Equal(t, int32(1), atomic.LoadInt32(&tries))
197+
assert.Equal(t, int32(1), tries.Load())
199198
}
200199
testFrontend(t, defaultFrontendConfig(), handler, test, false)
201-
tries = 0
200+
tries.Store(0)
202201
testFrontend(t, defaultFrontendConfig(), handler, test, true)
203202
}
204203

@@ -234,7 +233,7 @@ func TestFrontendCheckReady(t *testing.T) {
234233
} {
235234
t.Run(tt.name, func(t *testing.T) {
236235
f := &Frontend{
237-
connectedClients: uber_atomic.NewInt32(tt.connectedClients),
236+
connectedClients: atomic.NewInt32(tt.connectedClients),
238237
log: log.NewNopLogger(),
239238
cfg: Config{
240239
DownstreamURL: tt.downstreamURL,

pkg/querier/queryrange/retry_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ import (
55
"errors"
66
fmt "fmt"
77
"net/http"
8-
"sync/atomic"
98
"testing"
109

1110
"github.com/go-kit/kit/log"
1211
"github.com/stretchr/testify/require"
1312
"github.com/weaveworks/common/httpgrpc"
13+
"go.uber.org/atomic"
1414
)
1515

1616
func TestRetry(t *testing.T) {
17-
var try int32
17+
var try atomic.Int32
1818

1919
for _, tc := range []struct {
2020
name string
@@ -25,7 +25,7 @@ func TestRetry(t *testing.T) {
2525
{
2626
name: "retry failures",
2727
handler: HandlerFunc(func(_ context.Context, req Request) (Response, error) {
28-
if atomic.AddInt32(&try, 1) == 5 {
28+
if try.Inc() == 5 {
2929
return &PrometheusResponse{Status: "Hello World"}, nil
3030
}
3131
return nil, fmt.Errorf("fail")
@@ -49,7 +49,7 @@ func TestRetry(t *testing.T) {
4949
{
5050
name: "last error",
5151
handler: HandlerFunc(func(_ context.Context, req Request) (Response, error) {
52-
if atomic.AddInt32(&try, 1) == 5 {
52+
if try.Inc() == 5 {
5353
return nil, httpgrpc.Errorf(http.StatusBadRequest, "Bad Request")
5454
}
5555
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Internal Server Error")
@@ -58,7 +58,7 @@ func TestRetry(t *testing.T) {
5858
},
5959
} {
6060
t.Run(tc.name, func(t *testing.T) {
61-
try = 0
61+
try.Store(0)
6262
h := NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap(tc.handler)
6363
resp, err := h.Do(context.Background(), nil)
6464
require.Equal(t, tc.err, err)
@@ -68,26 +68,26 @@ func TestRetry(t *testing.T) {
6868
}
6969

7070
func Test_RetryMiddlewareCancel(t *testing.T) {
71-
var try int32
71+
var try atomic.Int32
7272
ctx, cancel := context.WithCancel(context.Background())
7373
cancel()
7474
_, err := NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap(
7575
HandlerFunc(func(c context.Context, r Request) (Response, error) {
76-
atomic.AddInt32(&try, 1)
76+
try.Inc()
7777
return nil, ctx.Err()
7878
}),
7979
).Do(ctx, nil)
80-
require.Equal(t, int32(0), try)
80+
require.Equal(t, int32(0), try.Load())
8181
require.Equal(t, ctx.Err(), err)
8282

8383
ctx, cancel = context.WithCancel(context.Background())
8484
_, err = NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap(
8585
HandlerFunc(func(c context.Context, r Request) (Response, error) {
86-
atomic.AddInt32(&try, 1)
86+
try.Inc()
8787
cancel()
8888
return nil, errors.New("failed")
8989
}),
9090
).Do(ctx, nil)
91-
require.Equal(t, int32(1), try)
91+
require.Equal(t, int32(1), try.Load())
9292
require.Equal(t, ctx.Err(), err)
9393
}

pkg/querier/queryrange/split_by_interval_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ import (
77
"net/http/httptest"
88
"net/url"
99
"strconv"
10-
"sync/atomic"
1110
"testing"
1211
"time"
1312

1413
"github.com/stretchr/testify/require"
1514
"github.com/weaveworks/common/middleware"
1615
"github.com/weaveworks/common/user"
16+
"go.uber.org/atomic"
1717
)
1818

1919
const seconds = 1e3 // 1e3 milliseconds per second.
@@ -261,11 +261,11 @@ func TestSplitByDay(t *testing.T) {
261261
} {
262262
t.Run(strconv.Itoa(i), func(t *testing.T) {
263263

264-
actualCount := int32(0)
264+
var actualCount atomic.Int32
265265
s := httptest.NewServer(
266266
middleware.AuthenticateUser.Wrap(
267267
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
268-
atomic.AddInt32(&actualCount, 1)
268+
actualCount.Inc()
269269
_, _ = w.Write([]byte(responseBody))
270270
}),
271271
),
@@ -293,7 +293,7 @@ func TestSplitByDay(t *testing.T) {
293293
bs, err := ioutil.ReadAll(resp.Body)
294294
require.NoError(t, err)
295295
require.Equal(t, tc.expectedBody, string(bs))
296-
require.Equal(t, tc.expectedQueryCount, actualCount)
296+
require.Equal(t, tc.expectedQueryCount, actualCount.Load())
297297
})
298298
}
299299
}

pkg/ring/batch.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ import (
44
"context"
55
"fmt"
66
"sync"
7-
"sync/atomic"
7+
8+
"go.uber.org/atomic"
89
)
910

1011
type batchTracker struct {
11-
rpcsPending int32
12-
rpcsFailed int32
12+
rpcsPending atomic.Int32
13+
rpcsFailed atomic.Int32
1314
done chan struct{}
1415
err chan error
1516
}
@@ -23,8 +24,8 @@ type ingester struct {
2324
type itemTracker struct {
2425
minSuccess int
2526
maxFailures int
26-
succeeded int32
27-
failed int32
27+
succeeded atomic.Int32
28+
failed atomic.Int32
2829
}
2930

3031
// DoBatch request against a set of keys in the ring, handling replication and
@@ -70,10 +71,10 @@ func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(Inges
7071
}
7172

7273
tracker := batchTracker{
73-
rpcsPending: int32(len(itemTrackers)),
74-
done: make(chan struct{}, 1),
75-
err: make(chan error, 1),
74+
done: make(chan struct{}, 1),
75+
err: make(chan error, 1),
7676
}
77+
tracker.rpcsPending.Store(int32(len(itemTrackers)))
7778

7879
var wg sync.WaitGroup
7980

@@ -115,17 +116,17 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
115116
// goroutine will write to either channel.
116117
for i := range sampleTrackers {
117118
if err != nil {
118-
if atomic.AddInt32(&sampleTrackers[i].failed, 1) <= int32(sampleTrackers[i].maxFailures) {
119+
if sampleTrackers[i].failed.Inc() <= int32(sampleTrackers[i].maxFailures) {
119120
continue
120121
}
121-
if atomic.AddInt32(&b.rpcsFailed, 1) == 1 {
122+
if b.rpcsFailed.Inc() == 1 {
122123
b.err <- err
123124
}
124125
} else {
125-
if atomic.AddInt32(&sampleTrackers[i].succeeded, 1) != int32(sampleTrackers[i].minSuccess) {
126+
if sampleTrackers[i].succeeded.Inc() != int32(sampleTrackers[i].minSuccess) {
126127
continue
127128
}
128-
if atomic.AddInt32(&b.rpcsPending, -1) == 0 {
129+
if b.rpcsPending.Dec() == 0 {
129130
b.done <- struct{}{}
130131
}
131132
}

0 commit comments

Comments
 (0)