From 1fe0071b3f985300cfda642ad32ecdc73d166f6f Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 18 Feb 2025 18:28:48 +0100 Subject: [PATCH 1/7] Test: fix panic for non-existent RAFT node/group Signed-off-by: Maurice van Veen --- server/jetstream_cluster_1_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 34caaa4c7e..ca2eed3e50 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -7655,7 +7655,11 @@ func TestJetStreamClusterRecreateConsumerFromMetaSnapshot(t *testing.T) { return err } else if o := mset.lookupConsumer("CONSUMER"); o == nil { return errors.New("consumer doesn't exist") - } else if ccrg := o.raftNode().Group(); consumerRg == _EMPTY_ { + } else if rn := o.raftNode(); rn == nil { + return errors.New("consumer raft node doesn't exist") + } else if ccrg := rn.Group(); ccrg == _EMPTY_ { + return errors.New("consumer raft group doesn't exist") + } else if consumerRg == _EMPTY_ { consumerRg = ccrg } else if consumerRg != ccrg { return errors.New("consumer raft groups don't match") From 0875f6d024622f564b8c37f5f7cff3609016cf56 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 18 Feb 2025 13:50:18 -0700 Subject: [PATCH 2/7] [ADDED] Monitoring: Include subscriptions in `Gatewayz` When checking the `gatewayz` monitoring endpoint, if the user requests accounts `?accs=1` or a specific account `?acc_name=B` then we can now include the subscription list for each account with `&subs=1` or the subscription details list with `&subs=detail`. The subscription information will be contained in each account in the `accounts` list of the `outbound_gateways` block. Resolves #6473 Signed-off-by: Ivan Kozlovic --- server/monitor.go | 57 ++++++++++++---- server/monitor_test.go | 143 +++++++++++++++++++++++++++++++++++++++++ test/ocsp_peer_test.go | 1 - 3 files changed, 188 insertions(+), 13 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index 1ce349bac9..7de9703b03 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1869,6 +1869,14 @@ type GatewayzOptions struct { // AccountName will limit the list of accounts to that account name (makes Accounts implicit) AccountName string `json:"account_name"` + + // AccountSubscriptions indicates if subscriptions should be included in the results. + // Note: This is used only if `Accounts` or `AccountName` are specified. + AccountSubscriptions bool `json:"subscriptions"` + + // AccountSubscriptionsDetail indicates if subscription details should be included in the results. + // Note: This is used only if `Accounts` or `AccountName` are specified. + AccountSubscriptionsDetail bool `json:"subscriptions_detail"` } // Gatewayz represents detailed information on Gateways @@ -1891,12 +1899,14 @@ type RemoteGatewayz struct { // AccountGatewayz represents interest mode for this account type AccountGatewayz struct { - Name string `json:"name"` - InterestMode string `json:"interest_mode"` - NoInterestCount int `json:"no_interest_count,omitempty"` - InterestOnlyThreshold int `json:"interest_only_threshold,omitempty"` - TotalSubscriptions int `json:"num_subs,omitempty"` - NumQueueSubscriptions int `json:"num_queue_subs,omitempty"` + Name string `json:"name"` + InterestMode string `json:"interest_mode"` + NoInterestCount int `json:"no_interest_count,omitempty"` + InterestOnlyThreshold int `json:"interest_only_threshold,omitempty"` + TotalSubscriptions int `json:"num_subs,omitempty"` + NumQueueSubscriptions int `json:"num_queue_subs,omitempty"` + Subs []string `json:"subscriptions_list,omitempty"` + SubsDetail []SubDetail `json:"subscriptions_list_detail,omitempty"` } // Gatewayz returns a Gatewayz struct containing information about gateways. @@ -2022,14 +2032,14 @@ func createOutboundAccountsGatewayz(opts *GatewayzOptions, gw *gateway) []*Accou if !ok { return nil } - a := createAccountOutboundGatewayz(accName, ei) + a := createAccountOutboundGatewayz(opts, accName, ei) return []*AccountGatewayz{a} } accs := make([]*AccountGatewayz, 0, 4) gw.outsim.Range(func(k, v any) bool { name := k.(string) - a := createAccountOutboundGatewayz(name, v) + a := createAccountOutboundGatewayz(opts, name, v) accs = append(accs, a) return true }) @@ -2037,7 +2047,7 @@ func createOutboundAccountsGatewayz(opts *GatewayzOptions, gw *gateway) []*Accou } // Returns an AccountGatewayz for this gateway outbound connection -func createAccountOutboundGatewayz(name string, ei any) *AccountGatewayz { +func createAccountOutboundGatewayz(opts *GatewayzOptions, name string, ei any) *AccountGatewayz { a := &AccountGatewayz{ Name: name, InterestOnlyThreshold: gatewayMaxRUnsubBeforeSwitch, @@ -2049,6 +2059,23 @@ func createAccountOutboundGatewayz(name string, ei any) *AccountGatewayz { a.NoInterestCount = len(e.ni) a.NumQueueSubscriptions = e.qsubs a.TotalSubscriptions = int(e.sl.Count()) + if opts.AccountSubscriptions || opts.AccountSubscriptionsDetail { + var subsa [4096]*subscription + subs := subsa[:0] + e.sl.All(&subs) + if opts.AccountSubscriptions { + a.Subs = make([]string, 0, len(subs)) + } else { + a.SubsDetail = make([]SubDetail, 0, len(subs)) + } + for _, sub := range subs { + if opts.AccountSubscriptions { + a.Subs = append(a.Subs, string(sub.subject)) + } else { + a.SubsDetail = append(a.SubsDetail, newClientSubDetail(sub)) + } + } + } e.RUnlock() } else { a.InterestMode = Optimistic.String() @@ -2140,6 +2167,10 @@ func (s *Server) HandleGatewayz(w http.ResponseWriter, r *http.Request) { s.httpReqStats[GatewayzPath]++ s.mu.Unlock() + subs, subsDet, err := decodeSubs(w, r) + if err != nil { + return + } accs, err := decodeBool(w, r, "accs") if err != nil { return @@ -2151,9 +2182,11 @@ func (s *Server) HandleGatewayz(w http.ResponseWriter, r *http.Request) { } opts := &GatewayzOptions{ - Name: gwName, - Accounts: accs, - AccountName: accName, + Name: gwName, + Accounts: accs, + AccountName: accName, + AccountSubscriptions: subs, + AccountSubscriptionsDetail: subsDet, } gw, err := s.Gatewayz(opts) if err != nil { diff --git a/server/monitor_test.go b/server/monitor_test.go index 5a569e3c6a..97183e83bb 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -3689,6 +3689,149 @@ func TestMonitorGatewayzAccounts(t *testing.T) { }) } +func TestMonitorGatewayzWithSubs(t *testing.T) { + resetPreviousHTTPConnections() + + ob := testDefaultOptionsForGateway("B") + aA := NewAccount("A") + aB := NewAccount("B") + ob.Accounts = append(ob.Accounts, aA, aB) + ob.Users = append(ob.Users, + &User{Username: "a", Password: "a", Account: aA}, + &User{Username: "b", Password: "b", Account: aB}) + sb := runGatewayServer(ob) + defer sb.Shutdown() + + oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb) + oa.HTTPHost = "127.0.0.1" + oa.HTTPPort = MONITOR_PORT + aA = NewAccount("A") + aB = NewAccount("B") + oa.Accounts = append(oa.Accounts, aA, aB) + oa.Users = append(oa.Users, + &User{Username: "a", Password: "a", Account: aA}, + &User{Username: "b", Password: "b", Account: aB}) + sa := runGatewayServer(oa) + defer sa.Shutdown() + + waitForOutboundGateways(t, sa, 1, 2*time.Second) + waitForInboundGateways(t, sa, 1, 2*time.Second) + + waitForOutboundGateways(t, sb, 1, 2*time.Second) + waitForInboundGateways(t, sb, 1, 2*time.Second) + + ncA := natsConnect(t, sb.ClientURL(), nats.UserInfo("a", "a")) + defer ncA.Close() + natsSubSync(t, ncA, "foo") + natsFlush(t, ncA) + + ncB := natsConnect(t, sb.ClientURL(), nats.UserInfo("b", "b")) + defer ncB.Close() + natsSubSync(t, ncB, "foo") + natsQueueSubSync(t, ncB, "bar", "baz") + natsFlush(t, ncB) + + checkGWInterestOnlyModeInterestOn(t, sa, "B", "A", "foo") + checkGWInterestOnlyModeInterestOn(t, sa, "B", "B", "foo") + checkForRegisteredQSubInterest(t, sa, "B", "B", "bar", 1, time.Second) + + for _, test := range []struct { + url string + allAccs bool + opts *GatewayzOptions + }{ + {"accs=1&subs=1", true, &GatewayzOptions{Accounts: true, AccountSubscriptions: true}}, + {"accs=1&subs=detail", true, &GatewayzOptions{Accounts: true, AccountSubscriptionsDetail: true}}, + {"acc_name=B&subs=1", false, &GatewayzOptions{AccountName: "B", AccountSubscriptions: true}}, + {"acc_name=B&subs=detail", false, &GatewayzOptions{AccountName: "B", AccountSubscriptionsDetail: true}}, + } { + t.Run(test.url, func(t *testing.T) { + gatewayzURL := fmt.Sprintf("http://127.0.0.1:%d/gatewayz?%s", sa.MonitorAddr().Port, test.url) + for pollMode := 0; pollMode < 2; pollMode++ { + gw := pollGatewayz(t, sa, pollMode, gatewayzURL, test.opts) + require_Equal(t, len(gw.OutboundGateways), 1) + ogw, ok := gw.OutboundGateways["B"] + require_True(t, ok) + require_NotNil(t, ogw) + var expected int + if test.allAccs { + expected = 3 // A + B + $G + } else { + expected = 1 // B + } + require_Len(t, len(ogw.Accounts), expected) + accs := map[string]*AccountGatewayz{} + for _, a := range ogw.Accounts { + // Do not include the global account there. + if a.Name == globalAccountName { + continue + } + accs[a.Name] = a + } + // Update the expected number of accounts if we asked for all accounts. + if test.allAccs { + expected-- + } + // The account B should always be present. + _, ok = accs["B"] + require_True(t, ok) + if expected == 2 { + _, ok = accs["A"] + require_True(t, ok) + } + // Now that we know we have the proper account(s), check the content. + for n, a := range accs { + require_NotNil(t, a) + require_Equal(t, a.Name, n) + totalSubs := 1 + var numQueueSubs int + if n == "B" { + totalSubs++ + numQueueSubs = 1 + } + require_Equal(t, a.TotalSubscriptions, totalSubs) + require_Equal(t, a.NumQueueSubscriptions, numQueueSubs) + + m := map[string]*SubDetail{} + if test.opts.AccountSubscriptions { + require_Len(t, len(a.Subs), totalSubs) + require_Len(t, len(a.SubsDetail), 0) + for _, sub := range a.Subs { + m[sub] = nil + } + } else { + require_Len(t, len(a.Subs), 0) + require_Len(t, len(a.SubsDetail), totalSubs) + for _, sub := range a.SubsDetail { + m[sub.Subject] = &sub + } + } + sd, ok := m["foo"] + require_True(t, ok) + if test.opts.AccountSubscriptionsDetail { + require_NotNil(t, sd) + require_Equal(t, sd.Queue, _EMPTY_) + } else { + require_True(t, sd == nil) + } + sd, ok = m["bar"] + if numQueueSubs == 1 { + require_True(t, ok) + if test.opts.AccountSubscriptionsDetail { + require_NotNil(t, sd) + require_Equal(t, sd.Queue, "baz") + } else { + require_True(t, sd == nil) + } + } else { + require_False(t, ok) + } + } + } + }) + } +} + func TestMonitorRouteRTT(t *testing.T) { // Do not change default PingInterval and expect RTT to still be reported diff --git a/test/ocsp_peer_test.go b/test/ocsp_peer_test.go index 1efb3f4f35..5c79de706e 100644 --- a/test/ocsp_peer_test.go +++ b/test/ocsp_peer_test.go @@ -1475,7 +1475,6 @@ func TestOCSPResponseCacheMonitor(t *testing.T) { s, _ := RunServerWithConfig(conf) defer s.Shutdown() v := monitorGetVarzHelper(t, 8222) - fmt.Println("Expect:", test.expect) var ct string if v.OCSPResponseCache != nil { ct = v.OCSPResponseCache.Type From f26aaae544a75b8fc3a0cf3bc972fcc1fbaf0ed6 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 19 Feb 2025 01:12:37 +0100 Subject: [PATCH 3/7] [FIXED] Consumer skips some messages Signed-off-by: Maurice van Veen --- server/filestore.go | 8 ++++++++ server/memstore.go | 1 + server/store_test.go | 18 ++++++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/server/filestore.go b/server/filestore.go index 3952675234..60829447af 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -5308,6 +5308,7 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte if ss, ok := mb.fss.Find(stringToBytes(subj)); ok && ss != nil { ss.Msgs++ ss.Last = seq + ss.lastNeedsUpdate = false } else { mb.fss.Insert(stringToBytes(subj), SimpleState{Msgs: 1, First: seq, Last: seq}) } @@ -6030,6 +6031,7 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { if ss, ok := mb.fss.Find(bsubj); ok && ss != nil { ss.Msgs++ ss.Last = seq + ss.lastNeedsUpdate = false } else { mb.fss.Insert(bsubj, SimpleState{ Msgs: 1, @@ -8057,8 +8059,11 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { } if startSlot >= len(mb.cache.idx) { ss.First = ss.Last + ss.firstNeedsUpdate = false + ss.lastNeedsUpdate = false return } + endSlot := int(ss.Last - mb.cache.fseq) if endSlot < 0 { endSlot = 0 @@ -8085,6 +8090,8 @@ func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) { li := int(bi) - mb.cache.off if li >= len(mb.cache.buf) { ss.First = ss.Last + // Only need to reset ss.lastNeedsUpdate, ss.firstNeedsUpdate is already reset above. + ss.lastNeedsUpdate = false return } buf := mb.cache.buf[li:] @@ -8208,6 +8215,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error { if ss, ok := mb.fss.Find(stringToBytes(sm.subj)); ok && ss != nil { ss.Msgs++ ss.Last = seq + ss.lastNeedsUpdate = false } else { mb.fss.Insert(stringToBytes(sm.subj), SimpleState{Msgs: 1, First: seq, Last: seq}) } diff --git a/server/memstore.go b/server/memstore.go index 979bb55da2..a72e1a4249 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -196,6 +196,7 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int if ss != nil { ss.Msgs++ ss.Last = seq + ss.lastNeedsUpdate = false // Check per subject limits. if ms.maxp > 0 && ss.Msgs > uint64(ms.maxp) { ms.enforcePerSubjectLimit(subj, ss) diff --git a/server/store_test.go b/server/store_test.go index fa5be7befe..9933f098c4 100644 --- a/server/store_test.go +++ b/server/store_test.go @@ -255,6 +255,24 @@ func TestStoreSubjectStateConsistency(t *testing.T) { expectFirstSeq(6) require_Equal(t, ss.Last, 6) expectLastSeq(6) + + // We store a new message for ss.Last and remove it after, which marks it to be recalculated. + _, _, err = fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + removed, err = fs.RemoveMsg(8) + require_NoError(t, err) + require_True(t, removed) + // This will be the new ss.Last message, so reset ss.lastNeedsUpdate + _, _, err = fs.StoreMsg("foo", nil, nil) + require_NoError(t, err) + + // ss.First should remain the same, but ss.Last should equal the last message. + ss = getSubjectState() + require_Equal(t, ss.Msgs, 2) + require_Equal(t, ss.First, 6) + expectFirstSeq(6) + require_Equal(t, ss.Last, 9) + expectLastSeq(9) }, ) } From 9ca903da695946d2c30925bfc5141652b05b0e9b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 15 May 2024 13:58:35 -0600 Subject: [PATCH 4/7] [FIXED] LeafNode: connection may fail on slow link Added the leafnode remote configuration parameter `first_info_timeout` which is the amount of time that a server creating a leafnode connection will wait for the initial INFO from the remote server. Resolves #5417 Signed-off-by: Ivan Kozlovic --- server/config_check_test.go | 17 +++++ server/leafnode.go | 7 +- server/leafnode_test.go | 144 ++++++++++++++++++++++++++++++++++++ server/opts.go | 11 +++ 4 files changed, 177 insertions(+), 2 deletions(-) diff --git a/server/config_check_test.go b/server/config_check_test.go index a878caf02d..cc3086909e 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -1817,6 +1817,23 @@ func TestConfigCheck(t *testing.T) { errorLine: 9, errorPos: 9, }, + { + name: "invalid duration for remote leafnode first info timeout", + config: ` + leafnodes { + port: -1 + remotes [ + { + url: "nats://127.0.0.1:123" + first_info_timeout: abc + } + ] + } + `, + err: fmt.Errorf("error parsing first_info_timeout: time: invalid duration %q", "abc"), + errorLine: 7, + errorPos: 8, + }, { name: "show warnings on empty configs without values", config: ``, diff --git a/server/leafnode.go b/server/leafnode.go index 68db0c8c55..1ec4cc1849 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -997,6 +997,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name) var tlsFirst bool + var infoTimeout time.Duration if remote != nil { solicited = true remote.Lock() @@ -1006,6 +1007,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf c.leaf.isSpoke = true } tlsFirst = remote.TLSHandshakeFirst + infoTimeout = remote.FirstInfoTimeout remote.Unlock() c.acc = acc } else { @@ -1063,7 +1065,7 @@ func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCf } } // We need to wait for the info, but not for too long. - c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT)) + c.nc.SetReadDeadline(time.Now().Add(infoTimeout)) } // We will process the INFO from the readloop and finish by @@ -2897,6 +2899,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot compress := remote.Websocket.Compression // By default the server will mask outbound frames, but it can be disabled with this option. noMasking := remote.Websocket.NoMasking + infoTimeout := remote.FirstInfoTimeout remote.RUnlock() // Will do the client-side TLS handshake if needed. tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts) @@ -2949,6 +2952,7 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot if noMasking { req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue) } + c.nc.SetDeadline(time.Now().Add(infoTimeout)) if err := req.Write(c.nc); err != nil { return nil, WriteError, err } @@ -2956,7 +2960,6 @@ func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remot var resp *http.Response br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE) - c.nc.SetReadDeadline(time.Now().Add(DEFAULT_LEAFNODE_INFO_WAIT)) resp, err = http.ReadResponse(br, req) if err == nil && (resp.StatusCode != 101 || diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 74854e1fde..2f426cf858 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -9614,3 +9614,147 @@ func TestLeafNodePermissionWithLiteralSubjectAndQueueInterest(t *testing.T) { require_NoError(t, err) require_Equal(t, "OK", string(resp.Data)) } + +func TestLeafNodeConnectionSucceedsEvenWithDelayedFirstINFO(t *testing.T) { + for _, test := range []struct { + name string + websocket bool + }{ + {"regular", false}, + {"websocket", true}, + } { + t.Run(test.name, func(t *testing.T) { + ob := DefaultOptions() + ob.ServerName = "HUB" + ob.LeafNode.Host = "127.0.0.1" + ob.LeafNode.Port = -1 + ob.LeafNode.AuthTimeout = 10 + if test.websocket { + ob.Websocket.Host = "127.0.0.1" + ob.Websocket.Port = -1 + ob.Websocket.HandshakeTimeout = 10 * time.Second + ob.Websocket.AuthTimeout = 10 + ob.Websocket.NoTLS = true + } + sb := RunServer(ob) + defer sb.Shutdown() + + var port int + var scheme string + if test.websocket { + port = ob.Websocket.Port + scheme = wsSchemePrefix + } else { + port = ob.LeafNode.Port + scheme = "nats" + } + + urlStr := fmt.Sprintf("%s://127.0.0.1:%d", scheme, port) + proxy := createNetProxy(1100*time.Millisecond, 1024*1024*1024, 1024*1024*1024, urlStr, true) + defer proxy.stop() + proxyURL := proxy.clientURL() + _, proxyPort, err := net.SplitHostPort(proxyURL[len(scheme)+3:]) + require_NoError(t, err) + + lnBURL, err := url.Parse(fmt.Sprintf("%s://127.0.0.1:%s", scheme, proxyPort)) + require_NoError(t, err) + + oa := DefaultOptions() + oa.ServerName = "SPOKE" + oa.Cluster.Name = "xyz" + remote := &RemoteLeafOpts{ + URLs: []*url.URL{lnBURL}, + FirstInfoTimeout: 3 * time.Second, + } + oa.LeafNode.Remotes = []*RemoteLeafOpts{remote} + sa := RunServer(oa) + defer sa.Shutdown() + + checkLeafNodeConnected(t, sa) + }) + } +} + +type captureLeafConnClosed struct { + DummyLogger + ch chan struct{} +} + +func (l *captureLeafConnClosed) Noticef(format string, v ...any) { + msg := fmt.Sprintf(format, v...) + if strings.Contains(msg, "Leafnode connection closed: Read Error") { + select { + case l.ch <- struct{}{}: + default: + } + } +} + +func TestLeafNodeDetectsStaleConnectionIfNoInfo(t *testing.T) { + for _, test := range []struct { + name string + websocket bool + }{ + {"regular", false}, + {"websocket", true}, + } { + t.Run(test.name, func(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + require_NoError(t, err) + defer l.Close() + + ch := make(chan struct{}) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + c, err := l.Accept() + if err != nil { + return + } + defer c.Close() + <-ch + }() + + var scheme string + if test.websocket { + scheme = wsSchemePrefix + } else { + scheme = "nats" + } + urlStr := fmt.Sprintf("%s://%s", scheme, l.Addr()) + lnBURL, err := url.Parse(urlStr) + require_NoError(t, err) + + oa := DefaultOptions() + oa.ServerName = "SPOKE" + oa.Cluster.Name = "xyz" + remote := &RemoteLeafOpts{ + URLs: []*url.URL{lnBURL}, + FirstInfoTimeout: 250 * time.Millisecond, + } + oa.LeafNode.Remotes = []*RemoteLeafOpts{remote} + oa.DisableShortFirstPing = false + oa.NoLog = false + sa, err := NewServer(oa) + require_NoError(t, err) + defer sa.Shutdown() + + log := &captureLeafConnClosed{ch: make(chan struct{}, 1)} + sa.SetLogger(log, false, false) + sa.Start() + + select { + case <-log.ch: + // OK + case <-time.After(750 * time.Millisecond): + t.Fatalf("Connection was not closed") + } + + sa.Shutdown() + close(ch) + wg.Wait() + sa.WaitForShutdown() + }) + } +} diff --git a/server/opts.go b/server/opts.go index 3cbe5599a7..abd6d5774b 100644 --- a/server/opts.go +++ b/server/opts.go @@ -205,6 +205,11 @@ type RemoteLeafOpts struct { DenyImports []string `json:"-"` DenyExports []string `json:"-"` + // FirstInfoTimeout is the amount of time the server will wait for the + // initial INFO protocol from the remote server before closing the + // connection. + FirstInfoTimeout time.Duration `json:"-"` + // Compression options for this remote. Each remote could have a different // setting and also be different from the LeafNode options. Compression CompressionOpts `json:"-"` @@ -2612,6 +2617,8 @@ func parseRemoteLeafNodes(v any, errors *[]error, warnings *[]error) ([]*RemoteL *errors = append(*errors, err) continue } + case "first_info_timeout": + remote.FirstInfoTimeout = parseDuration(k, tk, v, errors, warnings) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ @@ -5198,6 +5205,10 @@ func setBaselineOptions(opts *Options) { c.Mode = CompressionS2Auto } } + // Set default first info timeout value if not set. + if r.FirstInfoTimeout <= 0 { + r.FirstInfoTimeout = DEFAULT_LEAFNODE_INFO_WAIT + } } } From 1a50fe83ebb8cbe6e71f9b0aefbab7655ab04cd7 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 21 Feb 2025 14:53:18 +0000 Subject: [PATCH 5/7] Update compress dependency, coveralls workflow Signed-off-by: Neil Twigg --- .github/workflows/cov.yaml | 4 ++-- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/cov.yaml b/.github/workflows/cov.yaml index 137e88e341..ec6e02370d 100644 --- a/.github/workflows/cov.yaml +++ b/.github/workflows/cov.yaml @@ -42,8 +42,8 @@ jobs: - name: Coveralls # Use commit hash here to avoid a re-tagging attack, as this is a third-party action - # Commit 3dfc5567390f6fa9267c0ee9c251e4c8c3f18949 = tag v2 - uses: coverallsapp/github-action@643bc377ffa44ace6394b2b5d0d3950076de9f63 + # Commit 648a8eb78e6d50909eff900e4ec85cab4524a45b = tag v2.3.6 + uses: coverallsapp/github-action@648a8eb78e6d50909eff900e4ec85cab4524a45b with: github-token: ${{ secrets.github_token }} file: src/github.com/nats-io/nats-server/coverage.lcov diff --git a/go.mod b/go.mod index 74ef8b21d3..d89f801365 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.22.0 toolchain go1.22.8 require ( - github.com/klauspost/compress v1.17.11 + github.com/klauspost/compress v1.18.0 github.com/minio/highwayhash v1.0.3 github.com/nats-io/jwt/v2 v2.7.3 github.com/nats-io/nats.go v1.39.0 diff --git a/go.sum b/go.sum index bcf80ebb5d..67bbd5ac39 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= From ec536aeffb7d1cdf47a11f7e4c5ade52b73da0f3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 19 Feb 2025 20:45:51 -0500 Subject: [PATCH 6/7] When we fail to deliver a message through a service import respond with no responders. Signed-off-by: Derek Collison --- server/accounts.go | 2 +- server/accounts_test.go | 30 +++++++++++++++++++++++++++++- server/client.go | 24 +++++++++++++++++++----- server/parser.go | 30 ++++++++++++++++-------------- 4 files changed, 65 insertions(+), 21 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 3152d3c55c..a821b9d8e5 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -2026,7 +2026,7 @@ func (a *Account) addServiceImportSub(si *serviceImport) error { a.mu.Unlock() cb := func(sub *subscription, c *client, acc *Account, subject, reply string, msg []byte) { - c.processServiceImport(si, acc, msg) + c.pa.delivered = c.processServiceImport(si, acc, msg) } sub, err := c.processSubEx([]byte(subject), nil, []byte(sid), cb, true, true, false) if err != nil { diff --git a/server/accounts_test.go b/server/accounts_test.go index 06c4de2124..ad39f871ba 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -1,4 +1,4 @@ -// Copyright 2018-2024 The NATS Authors +// Copyright 2018-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -3670,3 +3670,31 @@ func TestAccountServiceAndStreamExportDoubleDelivery(t *testing.T) { time.Sleep(200 * time.Millisecond) require_Equal(t, msgs.Load(), 1) } + +func TestAccountServiceImportNoResponders(t *testing.T) { + // Setup NATS server. + cf := createConfFile(t, []byte(` + port: -1 + accounts: { + accExp: { + users: [{user: accExp, password: accExp}] + exports: [{service: "foo"}] + } + accImp: { + users: [{user: accImp, password: accImp}] + imports: [{service: {account: accExp, subject: "foo"}}] + } + } + `)) + + s, _ := RunServerWithConfig(cf) + defer s.Shutdown() + + // Connect to the import account. We will not setup any responders, so a request should + // error out with ErrNoResponders. + nc := natsConnect(t, s.ClientURL(), nats.UserInfo("accImp", "accImp")) + defer nc.Close() + + _, err := nc.Request("foo", []byte("request"), 250*time.Millisecond) + require_Error(t, err, nats.ErrNoResponders) +} diff --git a/server/client.go b/server/client.go index 3304d53489..1556fcb49e 100644 --- a/server/client.go +++ b/server/client.go @@ -3472,17 +3472,25 @@ func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, su } client.mu.Unlock() + // For service imports, track if we delivered. + didDeliver := true + // Internal account clients are for service imports and need the '\r\n'. start := time.Now() if client.kind == ACCOUNT { sub.icb(sub, c, acc, string(subject), string(reply), msg) + // If we are a service import check to make sure we delivered the message somewhere. + if sub.si { + didDeliver = c.pa.delivered + } } else { sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize]) } if dur := time.Since(start); dur >= readLoopReportThreshold { srv.Warnf("Internal subscription on %q took too long: %v", subject, dur) } - return true + + return didDeliver } // If we are a client and we detect that the consumer we are @@ -4196,17 +4204,17 @@ var ( // processServiceImport is an internal callback when a subscription matches an imported service // from another account. This includes response mappings as well. -func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) { +func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byte) bool { // If we are a GW and this is not a direct serviceImport ignore. isResponse := si.isRespServiceImport() if (c.kind == GATEWAY || c.kind == ROUTER) && !isResponse { - return + return false } // Detect cycles and ignore (return) when we detect one. if len(c.pa.psi) > 0 { for i := len(c.pa.psi) - 1; i >= 0; i-- { if psi := c.pa.psi[i]; psi.se == si.se { - return + return false } } } @@ -4227,7 +4235,7 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // response service imports and rrMap entries which all will need to simply expire. // TODO(dlc) - Come up with something better. if shouldReturn || (checkJS && si.se != nil && si.se.acc == c.srv.SystemAccount()) { - return + return false } var nrr []byte @@ -4375,6 +4383,10 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt c.in.rts = orts c.pa = pacopy + // Before we undo didDeliver based on tracing and last mile, mark in the c.pa which informs us of no responders status. + // If we override due to tracing and traceOnly we do not want to send back a no responders. + c.pa.delivered = didDeliver + // Determine if we should remove this service import. This is for response service imports. // We will remove if we did not deliver, or if we are a response service import and we are // a singleton, or we have an EOF message. @@ -4404,6 +4416,8 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt siAcc.removeRespServiceImport(rsi, reason) } } + + return didDeliver } func (c *client) addSubToRouteTargets(sub *subscription) { diff --git a/server/parser.go b/server/parser.go index d9ce4a27a8..50b504b7f6 100644 --- a/server/parser.go +++ b/server/parser.go @@ -35,20 +35,21 @@ type parseState struct { } type pubArg struct { - arg []byte - pacache []byte - origin []byte - account []byte - subject []byte - deliver []byte - mapped []byte - reply []byte - szb []byte - hdb []byte - queues [][]byte - size int - hdr int - psi []*serviceImport + arg []byte + pacache []byte + origin []byte + account []byte + subject []byte + deliver []byte + mapped []byte + reply []byte + szb []byte + hdb []byte + queues [][]byte + size int + hdr int + psi []*serviceImport + delivered bool // Only used for service imports } // Parser constants @@ -500,6 +501,7 @@ func (c *client) parse(buf []byte) error { // Drop all pub args c.pa.arg, c.pa.pacache, c.pa.origin, c.pa.account, c.pa.subject, c.pa.mapped = nil, nil, nil, nil, nil, nil c.pa.reply, c.pa.hdr, c.pa.size, c.pa.szb, c.pa.hdb, c.pa.queues = nil, -1, 0, nil, nil, nil + c.pa.delivered = false lmsg = false case OP_A: switch b { From 18d9129ba404af675758c2b46e1e678b67f46fb0 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 19 Feb 2025 21:01:35 -0500 Subject: [PATCH 7/7] Fix tests that were looking for timeout errors vs no responders Signed-off-by: Derek Collison --- server/jetstream_cluster_2_test.go | 2 +- server/jwt_test.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 5d074699ee..0af0c7a66a 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -7051,7 +7051,7 @@ func TestJetStreamClusterStreamDirectGetNotTooSoon(t *testing.T) { defer nc.Close() _, err = nc.Request(getSubj, nil, time.Second) - require_Error(t, err, nats.ErrTimeout) + require_Error(t, err, nats.ErrNoResponders) // Now start all and make sure they all eventually have subs for direct access. c.restartAll() diff --git a/server/jwt_test.go b/server/jwt_test.go index b6e4dfd3e8..b2bfd82149 100644 --- a/server/jwt_test.go +++ b/server/jwt_test.go @@ -6189,8 +6189,7 @@ func TestJWTAccountProtectedImport(t *testing.T) { // ensure service fails _, err = ncImp.Request(srvcSub, []byte("hello"), time.Second) - require_Error(t, err) - require_Contains(t, err.Error(), "timeout") + require_Error(t, err, nats.ErrNoResponders) s.AccountResolver().Store(exportPub, exportJWTOn) // ensure stream fails err = ncExp.Publish(strmSub, []byte("hello"))