From 193bce17c1c3e47099dc8a8605ecb3f1a1983078 Mon Sep 17 00:00:00 2001 From: tidwall Date: Wed, 26 Jun 2024 08:30:50 -0700 Subject: [PATCH] Fix followers not receiving channel messages This commit fixes a bug where the leader was not propagating published channel messages to the followers. See #468 --- internal/server/follow.go | 10 +++-- internal/server/pubqueue.go | 81 +++++++++++++++++++++++++++++++++++++ internal/server/pubsub.go | 4 ++ internal/server/server.go | 4 ++ 4 files changed, 96 insertions(+), 3 deletions(-) create mode 100644 internal/server/pubqueue.go diff --git a/internal/server/follow.go b/internal/server/follow.go index af785eef..0534a30b 100644 --- a/internal/server/follow.go +++ b/internal/server/follow.go @@ -169,15 +169,19 @@ func (s *Server) followHandleCommand(args []string, followc int, w io.Writer) (i return s.aofsz, errNoLongerFollowing } msg := &Message{Args: args} - _, d, err := s.command(msg, nil) if err != nil { if commandErrIsFatal(err) { return s.aofsz, err } } - if err := s.writeAOF(args, &d); err != nil { - return s.aofsz, err + switch msg.Command() { + case "publish": + // Avoid writing these commands to the AOF + default: + if err := s.writeAOF(args, &d); err != nil { + return s.aofsz, err + } } if len(s.aofbuf) > 10240 { s.flushAOF(false) diff --git a/internal/server/pubqueue.go b/internal/server/pubqueue.go new file mode 100644 index 00000000..d859de2b --- /dev/null +++ b/internal/server/pubqueue.go @@ -0,0 +1,81 @@ +package server + +import ( + "net" + "sync" + + "github.com/tidwall/redcon" +) + +type pubQueue struct { + cond *sync.Cond + entries []pubQueueEntry // follower publish queue + closed bool +} + +type pubQueueEntry struct { + channel string + messages []string +} + +func (s *Server) startPublishQueue(wg *sync.WaitGroup) { + defer wg.Done() + var buf []byte + var conns []net.Conn + s.pubq.cond = sync.NewCond(&sync.Mutex{}) + s.pubq.cond.L.Lock() + for { + for len(s.pubq.entries) > 0 { + entries := s.pubq.entries + s.pubq.entries = nil +
s.pubq.cond.L.Unlock() + // Get follower connections + s.mu.RLock() + for conn := range s.aofconnM { + conns = append(conns, conn) + } + s.mu.RUnlock() + // Buffer the PUBLISH command pipeline + buf = buf[:0] + for _, entry := range entries { + for _, message := range entry.messages { + buf = redcon.AppendArray(buf, 3) + buf = redcon.AppendBulkString(buf, "PUBLISH") + buf = redcon.AppendBulkString(buf, entry.channel) + buf = redcon.AppendBulkString(buf, message) + } + } + // Publish to followers + for i, conn := range conns { + conn.Write(buf) + conns[i] = nil + } + conns = conns[:0] + s.pubq.cond.L.Lock() + } + if s.pubq.closed { + break + } + s.pubq.cond.Wait() + } + s.pubq.cond.L.Unlock() +} + +func (s *Server) stopPublishQueue() { + s.pubq.cond.L.Lock() + s.pubq.closed = true + s.pubq.cond.Broadcast() + s.pubq.cond.L.Unlock() +} + +func (s *Server) sendPublishQueue(channel string, message ...string) { + s.pubq.cond.L.Lock() + if !s.pubq.closed { + s.pubq.entries = append(s.pubq.entries, pubQueueEntry{ + channel: channel, + messages: message, + }) + } + s.pubq.cond.Broadcast() + s.pubq.cond.L.Unlock() +} diff --git a/internal/server/pubsub.go b/internal/server/pubsub.go index f6faa9a5..911ab3f2 100644 --- a/internal/server/pubsub.go +++ b/internal/server/pubsub.go @@ -66,6 +66,7 @@ func (s *Server) Publish(channel string, message ...string) int { } s.pubsub.mu.RUnlock() + // broadcast to clients for _, msg := range msgs { msg.target.cond.L.Lock() msg.target.msgs = append(msg.target.msgs, msg) @@ -73,6 +74,9 @@ func (s *Server) Publish(channel string, message ...string) int { msg.target.cond.L.Unlock() } + // Broadcast to followers + s.sendPublishQueue(channel, message...) 
+ return len(msgs) } diff --git a/internal/server/server.go b/internal/server/server.go index c224fd7d..0ab37be9 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -140,6 +140,7 @@ type Server struct { fcup bool // follow caught up fcuponce bool // follow caught up once aofconnM map[net.Conn]io.Closer + pubq pubQueue // lua scripts luascripts *lScriptMap @@ -422,9 +423,12 @@ func Serve(opts Options) error { go s.backgroundExpiring(&bgwg) bgwg.Add(1) go s.backgroundSyncAOF(&bgwg) + bgwg.Add(1) + go s.startPublishQueue(&bgwg) defer func() { log.Debug("Stopping background routines") // Stop background routines + s.stopPublishQueue() s.followc.Add(1) // this will force any follow communication to die s.stopServer.Store(true) if mln != nil {