Skip to content

Commit ab3080f

Browse files
committed
crsync: add sharded counters and general support for sharding
Add support for sharding with CPU locality and add sharded counters. See counters_test.go for benchmark results. Sharding has two implementations; one is to be used with the CRDB Go runtime (specifically cockroachdb/go#6) and the `cockroach_go` build tag. We also add a script that can be used to easily run tests against the CRDB Go runtime (along with a CI job).
1 parent 68d0491 commit ab3080f

File tree

7 files changed

+592
-6
lines changed

7 files changed

+592
-6
lines changed

.github/workflows/ci.yaml

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ on:
1212
jobs:
1313

1414
linux:
15-
name: go-linux
1615
strategy:
1716
matrix:
1817
go: ["1.23", "1.24", "1.25"]
@@ -30,7 +29,6 @@ jobs:
3029
- run: go vet ./...
3130

3231
linux-32bit:
33-
name: go-linux-32bit
3432
strategy:
3533
matrix:
3634
go: ["1.23"]
@@ -47,7 +45,6 @@ jobs:
4745
- run: go test ./...
4846

4947
darwin:
50-
name: go-macos
5148
strategy:
5249
matrix:
5350
go: ["1.23"]
@@ -64,7 +61,6 @@ jobs:
6461
- run: go test ./...
6562

6663
linux-stress:
67-
name: go-linux-stress
6864
strategy:
6965
matrix:
7066
go: ["1.23"]
@@ -82,7 +78,6 @@ jobs:
8278
- run: go test ./... -exec 'stress -p 2 -maxruns 1000' -v
8379

8480
linux-stress-race:
85-
name: go-linux-stress-race
8681
strategy:
8782
matrix:
8883
go: ["1.23"]
@@ -98,3 +93,37 @@ jobs:
9893
- run: go install github.com/cockroachdb/stress@latest
9994
- run: go test -tags crlib_invariants ./... -race -exec 'stress -p 1 -maxruns 100' -v
10095
- run: go test ./... -race -exec 'stress -p 1 -maxruns 100' -v
96+
97+
linux-cockroach-go:
98+
runs-on: ubuntu-latest
99+
env:
100+
GO_BRANCH: cockroach-go1.23.12
101+
102+
steps:
103+
- uses: actions/checkout@v4
104+
105+
# Step 1: Fetch the branch tip SHA for cache key
106+
- name: Get cockroachdb/go commit hash
107+
id: go-sha
108+
run: |
109+
SHA=$(git ls-remote https://github.com/cockroachdb/go.git refs/heads/$GO_BRANCH | cut -f1)
110+
echo "GO_SHA=$SHA" >> $GITHUB_ENV
111+
112+
# Step 2: Restore cache (keyed by the branch-tip commit SHA)
113+
- name: Cache custom Go toolchain
114+
uses: actions/cache@v4
115+
with:
116+
path: ~/.cache/cockroachdb-go/${{ env.GO_SHA }}
117+
key: cockroachdb-${{ env.GO_SHA }}
118+
119+
# Step 3: Install bootstrap Go (needed to build fork)
120+
- name: Install bootstrap Go
121+
uses: actions/setup-go@v5
122+
with:
123+
go-version: "1.23.x"
124+
125+
# Step 4: Run tests with custom Go
126+
- run: ./scripts/run-tests-with-custom-go.sh ./...
127+
- run: ./scripts/run-tests-with-custom-go.sh -tags crlib_invariants ./...
128+
- run: ./scripts/run-tests-with-custom-go.sh -race ./...
129+

crsync/counters.go

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied. See the License for the specific language governing
13+
// permissions and limitations under the License.
14+
15+
package crsync
16+
17+
import (
18+
"iter"
19+
"sync/atomic"
20+
"unsafe"
21+
)
22+
23+
// Counter is a single logical counter backed by a sharded implementation
24+
// (Counters) under the hood.
25+
//
26+
// Properties:
27+
// - Thread-safe increments: Add() can be called concurrently from many
28+
// goroutines.
29+
// - Low write contention: Writes are sharded to minimize cache-line
30+
// ping-pong.
31+
// - Simple reads: Get() aggregates across shards to return the current value.
32+
// - Construction: Use MakeCounter(). The zero value is NOT ready to use.
33+
// - Performance: Add is O(1) with low contention; Get is O(NumShards()).
34+
// - Consistency: Reads are best-effort snapshots without global locking. Each
35+
// shard is read atomically, but the aggregation is not linearizable with
36+
// respect to concurrent Add calls. This is typically acceptable for metrics
37+
// and counters.
38+
//
39+
// Example:
40+
//
41+
// c := MakeCounter()
42+
// c.Add(1)
43+
// c.Add(41)
44+
// fmt.Println(c.Get()) // 42
45+
type Counter struct {
46+
c Counters // sharded storage holding exactly one logical counter (index 0)
47+
}
48+
49+
// MakeCounter returns a ready-to-use Counter backed by a one-counter Counters.
50+
func MakeCounter() Counter {
51+
return Counter{
52+
c: MakeCounters(1), // a single logical counter, addressed as index 0
53+
}
54+
}
55+
56+
// Add atomically adds delta to the counter. It is safe for concurrent use by
57+
// multiple goroutines; delta may be negative (decrement).
58+
//
59+
// Add is very efficient: a single atomic increment on a mostly uncontended
60+
// cache line.
61+
func (c *Counter) Add(delta int64) {
62+
c.c.Add(0, delta) // index 0 is the only counter in the backing Counters
63+
}
64+
65+
// Get returns the current value of the counter.
66+
//
67+
// It is safe to call Get() while there are concurrent Add() calls (but there is
68+
// no guarantee with respect to which of those are reflected).
69+
//
70+
// Get is O(NumShards()) so it is more expensive than Add().
71+
func (c *Counter) Get() int64 {
72+
return c.c.Get(0) // sums slot 0 across all shards
73+
}
74+
75+
// Counters is a sharded set of logical counters that can be incremented
76+
// concurrently with low contention.
77+
//
78+
// Use when you need N independent counters that are updated from many
79+
// goroutines (e.g., metrics like hits/misses/errors, per-state tallies).
80+
//
81+
// Properties:
82+
// - Thread-safe increments: Add() can be called concurrently from many
83+
// goroutines.
84+
// - Low write contention: Writes are sharded to minimize cache-line
85+
// ping-pong.
86+
// - Simple reads: Get() aggregates across shards to return the current value.
87+
// - Construction: Use MakeCounters(). The zero value is NOT ready to use.
88+
// - Performance: Add is O(1) with low contention; Get is O(NumShards()).
89+
// - Consistency: Reads are best-effort snapshots without global locking. Each
90+
// shard is read atomically, but the aggregation is not linearizable with
91+
// respect to concurrent Add calls. This is typically acceptable for metrics
92+
// and counters.
93+
type Counters struct {
94+
numShards uint32 // number of shards; Add selects one per call
95+
// shardSize is the number of counter slots per shard (numCounters rounded up to a multiple of countersPerCacheLine).
96+
shardSize uint32
97+
// counters contains numShards * shardSize counters; shardSize is a multiple
98+
// of countersPerCacheLine to avoid false sharing at shard boundaries. Note
99+
// that there is a high correlation between the current CPU and the chosen
100+
// shard, so different counters inside a shard can share cache lines.
101+
//
102+
// We linearize the array instead of using [][]atomic.Int64 to avoid an extra
103+
// pointer load in the fast path.
104+
counters []atomic.Int64
105+
numCounters int // number of logical counters requested at construction
106+
}
107+
108+
// countersPerCacheLine is the number of counters per cache line, assuming the
109+
// typical 64-byte cache line and 8-byte atomic.Int64 values. Must be a power of 2.
110+
const countersPerCacheLine = 8
111+
112+
// MakeCounters creates a new Counters with numCounters counters and NumShards() shards.
113+
func MakeCounters(numCounters int) Counters {
114+
return makeCounters(NumShards(), numCounters)
115+
}
116+
117+
// makeCounters builds a Counters with an explicit shard count; MakeCounters
// calls it with NumShards().
func makeCounters(numShards, numCounters int) Counters {
118+
// shardSize is the number of counters, rounded up to fill the last cache line
119+
// (to avoid false sharing).
120+
shardSize := (numCounters + countersPerCacheLine - 1) / countersPerCacheLine * countersPerCacheLine
121+
// Allocate all the counters and align the slice to start at a cache line. We
122+
// allocate countersPerCacheLine-1 extra values to allow realignment.
123+
counters := make([]atomic.Int64, shardSize*numShards+countersPerCacheLine-1)
124+
if r := (uintptr(unsafe.Pointer(&counters[0])) / unsafe.Sizeof(atomic.Int64{})) % countersPerCacheLine; r != 0 {
125+
counters = counters[countersPerCacheLine-r:] // counters[0] sits r slots into a cache line: skip ahead to the next boundary
126+
}
127+
return Counters{
128+
numShards: uint32(numShards),
129+
shardSize: uint32(shardSize),
130+
counters: counters,
131+
numCounters: numCounters,
132+
}
133+
}
134+
135+
// Add atomically adds delta to the specified counter. It is safe for concurrent
136+
// use by multiple goroutines; delta may be negative (decrement).
137+
//
138+
// Add is very efficient: a single atomic increment on a mostly uncontended
139+
// cache line. The counter index is not checked against numCounters.
140+
func (c *Counters) Add(counter int, delta int64) {
141+
shard := uint32(CPUBiasedInt()) % c.numShards // CPUBiasedInt is defined elsewhere; presumably correlated with the current CPU
142+
c.counters[shard*c.shardSize+uint32(counter)].Add(delta)
143+
}
144+
145+
// Get returns the current value of the specified counter.
146+
//
147+
// It is safe to call Get() while there are concurrent Add() calls (but there is
148+
// no guarantee with respect to which of those are reflected).
149+
//
150+
// Get is O(NumShards()) so it is more expensive than Add().
151+
func (c *Counters) Get(counter int) int64 {
152+
var res int64
153+
for shard := range c.numShards {
154+
res += c.counters[shard*c.shardSize+uint32(counter)].Load() // this counter's slot in each shard
155+
}
156+
return res
157+
}
158+
159+
// All returns an iterator over the current values of all counters (in order).
160+
//
161+
// Complexity is O(NumShards() * numCounters). All is safe for concurrent use,
162+
// but there are no ordering guarantees w.r.t. concurrent updates.
163+
//
164+
// All is designed to minimize disruption to concurrent Add() calls and is
165+
// preferable to multiple Get() calls when all counter values are needed.
166+
func (c *Counters) All() iter.Seq[int64] {
167+
return func(yield func(int64) bool) {
168+
// To access each cache line only once, we sum a group of up to
169+
// countersPerCacheLine counters at a time across all shards.
170+
var vals [countersPerCacheLine]int64
171+
for i := 0; i < c.numCounters; i += countersPerCacheLine {
172+
vals = [countersPerCacheLine]int64{} // reset the accumulators for this group
173+
n := min(c.numCounters-i, countersPerCacheLine) // the last group may be partial
174+
for s := range c.numShards {
175+
start := int(s*c.shardSize) + i
176+
counters := c.counters[start : start+n]
177+
// Avoid bound checks inside the loop.
178+
_ = vals[len(counters)-1]
179+
for j := range counters {
180+
vals[j] += counters[j].Load()
181+
}
182+
}
183+
for j := range n {
184+
if !yield(vals[j]) { // stop as soon as the consumer declines further values
185+
return
186+
}
187+
}
188+
}
189+
}
190+
}

0 commit comments

Comments
 (0)