Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(wip)raft: introduce term cache #141524

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/client_manual_proposal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
Expand Down Expand Up @@ -228,6 +229,7 @@ LIMIT
SyncWaiter: swl,
EntryCache: raftentry.NewCache(1024),
Settings: st,
TermCache: raft.NewTermCache(kvserver.TermCacheSize),
Metrics: logstore.Metrics{
RaftLogCommitLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Expand Down
29 changes: 28 additions & 1 deletion pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ type AppendStats struct {

// Metrics contains metrics specific to the log storage.
type Metrics struct {
RaftLogCommitLatency metric.IHistogram
RaftLogCommitLatency metric.IHistogram
LoadTermFromStorageLatency metric.IHistogram
TermCacheAccesses *metric.Counter
TermCacheHits *metric.Counter
}

// LogStore is a stub of a separated Raft log storage.
Expand All @@ -186,6 +189,9 @@ type LogStore struct {
Settings *cluster.Settings
Metrics Metrics

// TermCache is the raft term cache for this log store. It is updated with
// the entries of every appended batch.
TermCache *raft.TermCache

DisableSyncLogWriteToss bool // for testing only
}

Expand Down Expand Up @@ -392,6 +398,7 @@ func (s *LogStore) storeEntriesAndCommitBatch(
// to be durable and a stable portion for entries that are known to be
// durable.
s.EntryCache.Add(s.RangeID, m.Entries, true /* truncate */)
_ = s.TermCache.ScanAppend(m.Entries, true /* truncate */)

return state, nil
}
Expand Down Expand Up @@ -608,15 +615,32 @@ func LoadTerm(
rangeID roachpb.RangeID,
eCache *raftentry.Cache,
index kvpb.RaftIndex,
tc *raft.TermCache,
metrics Metrics,
) (kvpb.RaftTerm, error) {
metrics.TermCacheAccesses.Inc(1)
term, err := tc.Term(uint64(index))
if err == nil {
// found
metrics.TermCacheHits.Inc(1)
return kvpb.RaftTerm(term), nil
}

start := crtime.NowMono()
eCache.Metric.LoadTermAccesses.Inc(1)
entry, found := eCache.Get(rangeID, index)
if found {
eCache.Metric.LoadTermHits.Inc(1)
return kvpb.RaftTerm(entry.Term), nil
}

reader := eng.NewReader(storage.StandardDurability)
defer reader.Close()

defer func() {
metrics.LoadTermFromStorageLatency.RecordValue(start.Elapsed().Nanoseconds())
}()

if err := raftlog.Visit(ctx, reader, rangeID, index, index+1, func(ent raftpb.Entry) error {
if found {
return errors.Errorf("found more than one entry in [%d,%d)", index, index+1)
Expand All @@ -643,6 +667,7 @@ func LoadTerm(
}
if !typ.IsSideloaded() {
eCache.Add(rangeID, []raftpb.Entry{entry}, false /* truncate */)
_ = tc.ScanAppend([]raftpb.Entry{entry}, false /* truncate */)
}
return kvpb.RaftTerm(entry.Term), nil
}
Expand Down Expand Up @@ -686,6 +711,7 @@ func LoadEntries(
lo, hi kvpb.RaftIndex,
maxBytes uint64,
account *BytesAccount,
tc *raft.TermCache,
) (_ []raftpb.Entry, _cachedSize uint64, _loadedSize uint64, _ error) {
if lo > hi {
return nil, 0, 0, errors.Errorf("lo:%d is greater than hi:%d", lo, hi)
Expand Down Expand Up @@ -761,6 +787,7 @@ func LoadEntries(
return nil, 0, 0, err
}
eCache.Add(rangeID, ents, false /* truncate */)
_ = tc.ScanAppend(ents, false /* truncate */)

// Did the correct number of results come back? If so, we're all good.
// Did we hit the size limits? If so, return what we have.
Expand Down
37 changes: 36 additions & 1 deletion pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ var (
Unit: metric.Unit_COUNT,
}

//Ingest metrics
// Ingest metrics
metaIngestCount = metric.Metadata{
Name: "storage.ingest.count",
Help: "Number of successful ingestions performed",
Expand Down Expand Up @@ -1452,6 +1452,12 @@ being received, or at performance issues at the storage layer.
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
// metaLoadTermFromStorageLatency describes the latency histogram recorded when
// the term of a Raft log entry is loaded from storage.
metaLoadTermFromStorageLatency = metric.Metadata{
Name: "raft.process.loadTerm.latency",
Help: `Latency histogram for loading the term of a Raft log entry`,
Measurement: "Latency",
Unit: metric.Unit_NANOSECONDS,
}
metaRaftCommandCommitLatency = metric.Metadata{
Name: "raft.process.commandcommit.latency",
Help: `Latency histogram for applying a batch of Raft commands to the state machine.
Expand Down Expand Up @@ -1588,6 +1594,20 @@ cache will already have moved on to newer entries.
Unit: metric.Unit_COUNT,
}

// Raft term cache metrics.
// metaTermCacheAccesses describes the counter of raft term cache accesses.
metaTermCacheAccesses = metric.Metadata{
Name: "raft.process.termCache.accesses",
Help: "Number of raft term cache accesses",
Measurement: "Access count",
Unit: metric.Unit_COUNT,
}
// metaTermCacheHits describes the counter of raft term cache hits.
metaTermCacheHits = metric.Metadata{
Name: "raft.process.termCache.hits",
Help: "Number of raft term cache hits",
Measurement: "Cache Hit Count",
Unit: metric.Unit_COUNT,
}

// Raft message metrics.
metaRaftRcvdProp = metric.Metadata{
Name: "raft.rcvd.prop",
Expand Down Expand Up @@ -2840,6 +2860,7 @@ type StoreMetrics struct {
RaftCommandsPending *metric.Gauge
RaftCommandsApplied *metric.Counter
RaftLogCommitLatency metric.IHistogram
LoadTermFromStorageLatency metric.IHistogram
RaftCommandCommitLatency metric.IHistogram
RaftHandleReadyLatency metric.IHistogram
RaftApplyCommittedLatency metric.IHistogram
Expand All @@ -2849,6 +2870,10 @@ type StoreMetrics struct {
RaftStorageReadBytes *metric.Counter
RaftStorageError *metric.Counter

// Raft term cache metrics.
TermCacheAccesses *metric.Counter
TermCacheHits *metric.Counter

// Raft message metrics.
//
// An array for conveniently finding the appropriate metric.
Expand Down Expand Up @@ -3569,6 +3594,12 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
Duration: histogramWindow,
BucketConfig: metric.IOLatencyBuckets,
}),
LoadTermFromStorageLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaLoadTermFromStorageLatency,
Duration: histogramWindow,
BucketConfig: metric.IOLatencyBuckets,
}),
RaftCommandCommitLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaRaftCommandCommitLatency,
Expand Down Expand Up @@ -3603,6 +3634,10 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RaftStorageReadBytes: metric.NewCounter(metaRaftStorageReadBytes),
RaftStorageError: metric.NewCounter(metaRaftStorageError),

// Raft term cache metrics.
TermCacheAccesses: metric.NewCounter(metaTermCacheAccesses),
TermCacheHits: metric.NewCounter(metaTermCacheHits),

// Raft message metrics.
RaftRcvdMessages: [maxRaftMsgType + 1]*metric.Counter{
raftpb.MsgProp: metric.NewCounter(metaRaftRcvdProp),
Expand Down
24 changes: 12 additions & 12 deletions pkg/kv/kvserver/raftentry/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// values tailored to the access patterns of the storage package.
// Cache is safe for concurrent access.
type Cache struct {
metrics Metrics
Metric Metrics
maxBytes int32

// accessed with atomics
Expand Down Expand Up @@ -113,14 +113,14 @@ func NewCache(maxBytes uint64) *Cache {
}
return &Cache{
maxBytes: int32(maxBytes),
metrics: makeMetrics(),
Metric: makeMetrics(),
parts: map[roachpb.RangeID]*partition{},
}
}

// Metrics returns a struct which contains metrics for the raft entry cache.
// Metrics returns a struct which contains metrics for the raft entry cache.
func (c *Cache) Metrics() Metrics {
return c.metrics
return c.Metric
}

// Drop drops all cached entries associated with the specified range.
Expand Down Expand Up @@ -211,7 +211,7 @@ func (c *Cache) Clear(id roachpb.RangeID, hi kvpb.RaftIndex) {
// Get returns the entry for the specified index and true for the second return
// value. If the index is not present in the cache, false is returned.
func (c *Cache) Get(id roachpb.RangeID, idx kvpb.RaftIndex) (e raftpb.Entry, ok bool) {
c.metrics.Accesses.Inc(1)
c.Metric.Accesses.Inc(1)
c.mu.Lock()
p := c.getPartLocked(id, false /* create */, true /* recordUse */)
c.mu.Unlock()
Expand All @@ -222,8 +222,8 @@ func (c *Cache) Get(id roachpb.RangeID, idx kvpb.RaftIndex) (e raftpb.Entry, ok
defer p.mu.RUnlock()
e, ok = p.get(idx)
if ok {
c.metrics.Hits.Inc(1)
c.metrics.ReadBytes.Inc(int64(e.Size()))
c.Metric.Hits.Inc(1)
c.Metric.ReadBytes.Inc(int64(e.Size()))
}
return e, ok
}
Expand All @@ -237,7 +237,7 @@ func (c *Cache) Get(id roachpb.RangeID, idx kvpb.RaftIndex) (e raftpb.Entry, ok
func (c *Cache) Scan(
ents []raftpb.Entry, id roachpb.RangeID, lo, hi kvpb.RaftIndex, maxBytes uint64,
) (_ []raftpb.Entry, bytes uint64, nextIdx kvpb.RaftIndex, exceededMaxBytes bool) {
c.metrics.Accesses.Inc(1)
c.Metric.Accesses.Inc(1)
c.mu.Lock()
p := c.getPartLocked(id, false /* create */, true /* recordUse */)
c.mu.Unlock()
Expand All @@ -251,9 +251,9 @@ func (c *Cache) Scan(
// Track all bytes that are returned to caller, but only consider an access a
// "hit" if it returns all requested entries or stops short because of a
// maximum bytes limit.
c.metrics.ReadBytes.Inc(int64(bytes))
c.Metric.ReadBytes.Inc(int64(bytes))
if nextIdx == hi || exceededMaxBytes {
c.metrics.Hits.Inc(1)
c.Metric.Hits.Inc(1)
}
return ents, bytes, nextIdx, exceededMaxBytes
}
Expand Down Expand Up @@ -332,8 +332,8 @@ func (c *Cache) addEntries(toAdd int32) int32 {
}

func (c *Cache) updateGauges(bytes, entries int32) {
c.metrics.Bytes.Update(int64(bytes))
c.metrics.Size.Update(int64(entries))
c.Metric.Bytes.Update(int64(bytes))
c.Metric.Size.Update(int64(entries))
}

var initialSize = newCacheSize(partitionSize, 0)
Expand Down
36 changes: 26 additions & 10 deletions pkg/kv/kvserver/raftentry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ var (
Measurement: "Hits",
Unit: metric.Unit_COUNT,
}
// metaLoadTermAccesses describes the counter of Raft entry cache lookups made
// to find an entry's term.
metaLoadTermAccesses = metric.Metadata{
Name: "raft.loadterm.accesses",
Help: "Number of cache lookups in the Raft entry cache for entry term",
Measurement: "LoadTermAccesses",
Unit: metric.Unit_COUNT,
}
// metaLoadTermHits describes the counter of Raft entry cache lookups for an
// entry's term that succeeded.
metaLoadTermHits = metric.Metadata{
Name: "raft.loadterm.hits",
Help: "Number of successful cache lookups in the Raft entry cache for entry term",
Measurement: "LoadTermHits",
Unit: metric.Unit_COUNT,
}
metaEntryCacheReadBytes = metric.Metadata{
Name: "raft.entrycache.read_bytes",
Help: "Counter of bytes in entries returned from the Raft entry cache",
Expand All @@ -44,19 +56,23 @@ var (
type Metrics struct {
// NB: the values in the gauges are updated asynchronously and may hold stale
// values in the face of concurrent updates.
Size *metric.Gauge
Bytes *metric.Gauge
Accesses *metric.Counter
Hits *metric.Counter
ReadBytes *metric.Counter
Size *metric.Gauge
Bytes *metric.Gauge
Accesses *metric.Counter
Hits *metric.Counter
LoadTermAccesses *metric.Counter
LoadTermHits *metric.Counter
ReadBytes *metric.Counter
}

func makeMetrics() Metrics {
return Metrics{
Size: metric.NewGauge(metaEntryCacheSize),
Bytes: metric.NewGauge(metaEntryCacheBytes),
Accesses: metric.NewCounter(metaEntryCacheAccesses),
Hits: metric.NewCounter(metaEntryCacheHits),
ReadBytes: metric.NewCounter(metaEntryCacheReadBytes),
Size: metric.NewGauge(metaEntryCacheSize),
Bytes: metric.NewGauge(metaEntryCacheBytes),
Accesses: metric.NewCounter(metaEntryCacheAccesses),
Hits: metric.NewCounter(metaEntryCacheHits),
LoadTermAccesses: metric.NewCounter(metaLoadTermAccesses),
LoadTermHits: metric.NewCounter(metaLoadTermHits),
ReadBytes: metric.NewCounter(metaEntryCacheReadBytes),
}
}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ const (
replicaChangeTxnUpdateDescOpName = "change-replica-update-desc"

defaultReplicaRaftMuWarnThreshold = 500 * time.Millisecond

// TermCacheSize is the size argument passed to raft.NewTermCache when a
// log store's TermCache is constructed.
TermCacheSize = 6
)

// StrictGCEnforcement controls whether requests are rejected based on the GC
Expand Down Expand Up @@ -495,6 +497,10 @@ type Replica struct {
// log was checked for truncation or at the time of the last Raft log
// truncation.
raftLogLastCheckSize int64

// termCache should be in shMu, because it is accessed in raft operations
// and mutated in replica operations.
termCache *raft.TermCache
}

mu struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ func (r *Replica) handleTruncatedStateResult(
// Clear any entries in the Raft log entry cache for this range up
// to and including the most recently truncated index.
r.store.raftEntryCache.Clear(r.RangeID, t.Index+1)
_ = r.raftMu.logStorage.TermCache.ClearTo(uint64(t.Index + 1))

// Truncate the sideloaded storage. This is safe only if the new truncated
// state is durably stored on disk, i.e. synced.
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,12 @@ func newUninitializedReplicaWithoutRaftGroup(
SyncWaiter: store.syncWaiters[int(rangeID)%len(store.syncWaiters)],
EntryCache: store.raftEntryCache,
Settings: store.cfg.Settings,
TermCache: raft.NewTermCache(TermCacheSize),
Metrics: logstore.Metrics{
RaftLogCommitLatency: store.metrics.RaftLogCommitLatency,
RaftLogCommitLatency: store.metrics.RaftLogCommitLatency,
LoadTermFromStorageLatency: store.metrics.LoadTermFromStorageLatency,
TermCacheAccesses: store.metrics.TermCacheAccesses,
TermCacheHits: store.metrics.TermCacheHits,
},
DisableSyncLogWriteToss: buildutil.CrdbTestBuild &&
store.TestingKnobs().DisableSyncLogWriteToss,
Expand Down
Loading
Loading