Skip to content

Commit

Permalink
Alerting: Add mutex to Redis HA subs (grafana#89870)
Browse files Browse the repository at this point in the history
  • Loading branch information
cinkster authored Aug 22, 2024
1 parent 81935a3 commit 922babb
Showing 1 changed file with 41 additions and 12 deletions.
53 changes: 41 additions & 12 deletions pkg/services/ngalert/notifier/redis_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type redisPeer struct {
states map[string]alertingCluster.State
subs map[string]*redis.PubSub
statesMtx sync.RWMutex
subsMtx sync.RWMutex

readyc chan struct{}
shutdownc chan struct{}
Expand Down Expand Up @@ -225,8 +226,10 @@ func newRedisPeer(cfg redisConfig, logger log.Logger, reg prometheus.Registerer,
p.nodePingDuration = nodePingDuration
p.nodePingFailures = nodePingFailures

p.subsMtx.Lock()
p.subs[fullStateChannel] = p.redis.Subscribe(context.Background(), p.withPrefix(fullStateChannel))
p.subs[fullStateChannelReq] = p.redis.Subscribe(context.Background(), p.withPrefix(fullStateChannelReq))
p.subsMtx.Unlock()

go p.heartbeatLoop()
go p.membersSyncLoop()
Expand Down Expand Up @@ -461,7 +464,9 @@ func (p *redisPeer) AddState(key string, state alertingCluster.State, _ promethe
// As we also want to get the state from other nodes, we subscribe to the key.
sub := p.redis.Subscribe(context.Background(), p.withPrefix(key))
go p.receiveLoop(sub)
p.subsMtx.Lock()
p.subs[key] = sub
p.subsMtx.Unlock()
return newRedisChannel(p, key, p.withPrefix(key), update)
}

Expand Down Expand Up @@ -507,17 +512,29 @@ func (p *redisPeer) fullStateReqReceiveLoop() {
select {
case <-p.shutdownc:
return
case data := <-p.subs[fullStateChannelReq].Channel():
// The payload of a full state request is the name of the peer that is
// requesting the full state. In case we received our own request, we
// can just ignore it. Redis pub/sub fanouts to all clients, regardless
// if a client was also the publisher.
if data.Payload == p.name {
default:
p.subsMtx.RLock()
sub, ok := p.subs[fullStateChannelReq]
p.subsMtx.RUnlock()

if !ok {
time.Sleep(waitForMsgIdle)
continue
}
p.fullStateSyncPublish()
default:
time.Sleep(waitForMsgIdle)

select {
case data := <-sub.Channel():
// The payload of a full state request is the name of the peer that is
// requesting the full state. In case we received our own request, we
// can just ignore it. Redis pub/sub fanouts to all clients, regardless
// if a client was also the publisher.
if data.Payload == p.name {
continue
}
p.fullStateSyncPublish()
default:
time.Sleep(waitForMsgIdle)
}
}
}
}
Expand All @@ -527,10 +544,22 @@ func (p *redisPeer) fullStateSyncReceiveLoop() {
select {
case <-p.shutdownc:
return
case data := <-p.subs[fullStateChannel].Channel():
p.mergeFullState([]byte(data.Payload))
default:
time.Sleep(waitForMsgIdle)
p.subsMtx.RLock()
sub, ok := p.subs[fullStateChannel]
p.subsMtx.RUnlock()

if !ok {
time.Sleep(waitForMsgIdle)
continue
}

select {
case data := <-sub.Channel():
p.mergeFullState([]byte(data.Payload))
default:
time.Sleep(waitForMsgIdle)
}
}
}
}
Expand Down

0 comments on commit 922babb

Please sign in to comment.