Skip to content

Commit db36d94

Browse files
hbhasker authored and gvisor-bot committed
TCP Receive window advertisement fixes.
The fix in commit 028e045 was incorrect: it can cause the right edge of the window to shrink when we announce a zero window because the receive buffer is full, since that announcement is made before the check for whether the selected window would shrink the previously advertised window. Further, the window was calculated purely from the available space, but in cases where we are receiving full-sized segments it makes more sense to use the actual bytes being held. This CL changes the calculation to use the lower of the total available space and the space remaining in the maximal window we could advertise after subtracting the actual payload bytes being held. This change also cleans up the code so that the window selection logic is not duplicated between getSendParams() and windowCrossedACKThresholdLocked(). PiperOrigin-RevId: 336404827
1 parent d75fe76 commit db36d94

File tree

5 files changed

+146
-27
lines changed

5 files changed

+146
-27
lines changed

pkg/tcpip/transport/tcp/connect.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1219,12 +1219,6 @@ func (e *endpoint) handleSegment(s *segment) (cont bool, err *tcpip.Error) {
12191219
return true, nil
12201220
}
12211221

1222-
// Increase counter if after processing the segment we would potentially
1223-
// advertise a zero window.
1224-
if crossed, above := e.windowCrossedACKThresholdLocked(-s.segMemSize()); crossed && !above {
1225-
e.stats.ReceiveErrors.ZeroRcvWindowState.Increment()
1226-
}
1227-
12281222
// Now check if the received segment has caused us to transition
12291223
// to a CLOSED state, if yes then terminate processing and do
12301224
// not invoke the sender.

pkg/tcpip/transport/tcp/endpoint.go

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,11 @@ type ReceiveErrors struct {
248248
// ZeroRcvWindowState is the number of times we advertised
249249
// a zero receive window when rcvList is full.
250250
ZeroRcvWindowState tcpip.StatCounter
251+
252+
// WantZeroRcvWindow is the number of times we wanted to advertise a
253+
// zero receive window but couldn't because it would have caused
254+
// the receive window's right edge to shrink.
255+
WantZeroRcvWindow tcpip.StatCounter
251256
}
252257

253258
// SendErrors collect segment send errors within the transport layer.
@@ -1162,7 +1167,7 @@ func (e *endpoint) cleanupLocked() {
11621167
// wndFromSpace returns the window that we can advertise based on the available
11631168
// receive buffer space.
11641169
func wndFromSpace(space int) int {
1165-
return space / (1 << rcvAdvWndScale)
1170+
return space >> rcvAdvWndScale
11661171
}
11671172

11681173
// initialReceiveWindow returns the initial receive window to advertise in the
@@ -1518,6 +1523,38 @@ func (e *endpoint) Peek(vec [][]byte) (int64, tcpip.ControlMessages, *tcpip.Erro
15181523
return num, tcpip.ControlMessages{}, nil
15191524
}
15201525

1526+
// selectWindowLocked returns the new window without checking for shrinking or
1527+
// applying scaling.
1528+
// Precondition: e.mu and e.rcvListMu must be held.
1529+
func (e *endpoint) selectWindowLocked() (wnd seqnum.Size) {
1530+
wndFromAvailable := wndFromSpace(e.receiveBufferAvailableLocked())
1531+
maxWindow := wndFromSpace(e.rcvBufSize)
1532+
wndFromUsedBytes := maxWindow - e.rcvBufUsed
1533+
1534+
// We take the lesser of the wndFromAvailable and wndFromUsedBytes because in
1535+
// cases where we receive a lot of small segments the segment overhead is a
1536+
// lot higher and we can run out of socket buffer space before we can fill the
1537+
// previous window we advertised. In cases where we receive MSS sized or close to
1538+
// MSS sized segments we will probably run out of window space before we
1539+
// exhaust receive buffer.
1540+
newWnd := wndFromAvailable
1541+
if newWnd > wndFromUsedBytes {
1542+
newWnd = wndFromUsedBytes
1543+
}
1544+
if newWnd < 0 {
1545+
newWnd = 0
1546+
}
1547+
return seqnum.Size(newWnd)
1548+
}
1549+
1550+
// selectWindow invokes selectWindowLocked after acquiring e.rcvListMu.
1551+
func (e *endpoint) selectWindow() (wnd seqnum.Size) {
1552+
e.rcvListMu.Lock()
1553+
wnd = e.selectWindowLocked()
1554+
e.rcvListMu.Unlock()
1555+
return wnd
1556+
}
1557+
15211558
// windowCrossedACKThresholdLocked checks if the receive window to be announced
15221559
// would be under aMSS or under the window derived from half receive buffer,
15231560
// whichever smaller. This is useful as a receive side silly window syndrome
@@ -1534,7 +1571,7 @@ func (e *endpoint) Peek(vec [][]byte) (int64, tcpip.ControlMessages, *tcpip.Erro
15341571
//
15351572
// Precondition: e.mu and e.rcvListMu must be held.
15361573
func (e *endpoint) windowCrossedACKThresholdLocked(deltaBefore int) (crossed bool, above bool) {
1537-
newAvail := wndFromSpace(e.receiveBufferAvailableLocked())
1574+
newAvail := int(e.selectWindowLocked())
15381575
oldAvail := newAvail - deltaBefore
15391576
if oldAvail < 0 {
15401577
oldAvail = 0

pkg/tcpip/transport/tcp/rcv.go

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ type receiver struct {
4343
// rcvWnd is the non-scaled receive window last advertised to the peer.
4444
rcvWnd seqnum.Size
4545

46+
// rcvWUP is the rcvNxt value at the last window update sent.
47+
rcvWUP seqnum.Value
48+
4649
rcvWndScale uint8
4750

4851
closed bool
@@ -64,6 +67,7 @@ func newReceiver(ep *endpoint, irs seqnum.Value, rcvWnd seqnum.Size, rcvWndScale
6467
rcvNxt: irs + 1,
6568
rcvAcc: irs.Add(rcvWnd + 1),
6669
rcvWnd: rcvWnd,
70+
rcvWUP: irs + 1,
6771
rcvWndScale: rcvWndScale,
6872
lastRcvdAckTime: time.Now(),
6973
}
@@ -84,34 +88,54 @@ func (r *receiver) acceptable(segSeq seqnum.Value, segLen seqnum.Size) bool {
8488
return header.Acceptable(segSeq, segLen, r.rcvNxt, r.rcvNxt.Add(advertisedWindowSize))
8589
}
8690

91+
// currentWindow returns the available space in the window that was advertised
92+
// last to our peer.
93+
func (r *receiver) currentWindow() (curWnd seqnum.Size) {
94+
endOfWnd := r.rcvWUP.Add(r.rcvWnd)
95+
if endOfWnd.LessThan(r.rcvNxt) {
96+
// return 0 if r.rcvNxt is past the end of the previously advertised window.
97+
// This can happen because we accept a large segment completely even if
98+
// accepting it causes it to partially exceed the advertised window.
99+
return 0
100+
}
101+
return r.rcvNxt.Size(endOfWnd)
102+
}
103+
87104
// getSendParams returns the parameters needed by the sender when building
88105
// segments to send.
89106
func (r *receiver) getSendParams() (rcvNxt seqnum.Value, rcvWnd seqnum.Size) {
90-
avail := wndFromSpace(r.ep.receiveBufferAvailable())
91-
if avail == 0 {
92-
// We have no space available to accept any data, move to zero window
93-
// state.
94-
r.rcvWnd = 0
95-
return r.rcvNxt, 0
96-
}
97-
98-
acc := r.rcvNxt.Add(seqnum.Size(avail))
99-
newWnd := r.rcvNxt.Size(acc)
100-
curWnd := r.rcvNxt.Size(r.rcvAcc)
101-
107+
newWnd := r.ep.selectWindow()
108+
curWnd := r.currentWindow()
102109
// Update rcvAcc only if new window is > previously advertised window. We
103110
// should never shrink the acceptable sequence space once it has been
104111
// advertised to the peer. If we shrink the acceptable sequence space then we
105112
// would end up dropping bytes that might already be in flight.
106-
if newWnd > curWnd {
107-
r.rcvAcc = r.rcvNxt.Add(newWnd)
113+
// ==================================================== sequence space.
114+
// ^ ^ ^ ^
115+
// rcvWUP rcvNxt rcvAcc new rcvAcc
116+
// <=====curWnd ===>
117+
// <========= newWnd > curWnd ========= >
118+
if r.rcvNxt.Add(seqnum.Size(curWnd)).LessThan(r.rcvNxt.Add(seqnum.Size(newWnd))) {
119+
// If the new window moves the right edge, then update rcvAcc.
120+
r.rcvAcc = r.rcvNxt.Add(seqnum.Size(newWnd))
108121
} else {
122+
if newWnd == 0 {
123+
// newWnd is zero but we can't advertise a zero as it would cause window
124+
// to shrink so just increment a metric to record this event.
125+
r.ep.stats.ReceiveErrors.WantZeroRcvWindow.Increment()
126+
}
109127
newWnd = curWnd
110128
}
111129
// Stash away the non-scaled receive window as we use it for measuring
112130
// receiver's estimated RTT.
113131
r.rcvWnd = newWnd
114-
return r.rcvNxt, r.rcvWnd >> r.rcvWndScale
132+
r.rcvWUP = r.rcvNxt
133+
scaledWnd := r.rcvWnd >> r.rcvWndScale
134+
if scaledWnd == 0 {
135+
// Increment a metric if we are advertising an actual zero window.
136+
r.ep.stats.ReceiveErrors.ZeroRcvWindowState.Increment()
137+
}
138+
return r.rcvNxt, scaledWnd
115139
}
116140

117141
// nonZeroWindow is called when the receive window grows from zero to nonzero;

pkg/tcpip/transport/tcp/tcp_test.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6264,14 +6264,27 @@ func TestReceiveBufferAutoTuning(t *testing.T) {
62646264
rawEP.NextSeqNum--
62656265
rawEP.SendPacketWithTS(nil, tsVal)
62666266
rawEP.NextSeqNum++
6267+
62676268
if i == 0 {
62686269
// In the first iteration the receiver based RTT is not
62696270
// yet known as a result the moderation code should not
62706271
// increase the advertised window.
62716272
rawEP.VerifyACKRcvWnd(scaleRcvWnd(curRcvWnd))
62726273
} else {
6273-
pkt := c.GetPacket()
6274-
curRcvWnd = int(header.TCP(header.IPv4(pkt).Payload()).WindowSize()) << c.WindowScale
6274+
// Read loop above could generate an ACK if the window had dropped to
6275+
// zero and then read had opened it up.
6276+
lastACK := c.GetPacket()
6277+
// Discard any intermediate ACKs and only check the last ACK we get in a
6278+
// short time period of few ms.
6279+
for {
6280+
time.Sleep(1 * time.Millisecond)
6281+
pkt := c.GetPacketNonBlocking()
6282+
if pkt == nil {
6283+
break
6284+
}
6285+
lastACK = pkt
6286+
}
6287+
curRcvWnd = int(header.TCP(header.IPv4(lastACK).Payload()).WindowSize()) << c.WindowScale
62756288
// If the new current window is close to maxReceiveBufferSize then terminate
62766289
// the loop. This can happen before all iterations are done due to timing
62776290
// differences when running the test.
@@ -7328,7 +7341,7 @@ func TestIncreaseWindowOnBufferResize(t *testing.T) {
73287341

73297342
// Write chunks of ~30000 bytes. It's important that two
73307343
// payloads make it equal or longer than MSS.
7331-
remain := rcvBuf * 2
7344+
remain := rcvBuf
73327345
sent := 0
73337346
data := make([]byte, defaultMTU/2)
73347347

@@ -7343,7 +7356,6 @@ func TestIncreaseWindowOnBufferResize(t *testing.T) {
73437356
})
73447357
sent += len(data)
73457358
remain -= len(data)
7346-
73477359
checker.IPv4(t, c.GetPacket(),
73487360
checker.PayloadLen(header.TCPMinimumSize),
73497361
checker.TCP(

test/syscalls/linux/tcp_socket.cc

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -903,6 +903,58 @@ TEST_P(SimpleTcpSocketTest, NonBlockingConnectNoListener) {
903903
EXPECT_EQ(err, ECONNREFUSED);
904904
}
905905

906+
TEST_P(SimpleTcpSocketTest, SelfConnectSendRecv_NoRandomSave) {
907+
// Initialize address to the loopback one.
908+
sockaddr_storage addr =
909+
ASSERT_NO_ERRNO_AND_VALUE(InetLoopbackAddr(GetParam()));
910+
socklen_t addrlen = sizeof(addr);
911+
912+
const FileDescriptor s =
913+
ASSERT_NO_ERRNO_AND_VALUE(Socket(GetParam(), SOCK_STREAM, IPPROTO_TCP));
914+
915+
ASSERT_THAT(
916+
(bind)(s.get(), reinterpret_cast<struct sockaddr*>(&addr), addrlen),
917+
SyscallSucceeds());
918+
// Get the bound port.
919+
ASSERT_THAT(
920+
getsockname(s.get(), reinterpret_cast<struct sockaddr*>(&addr), &addrlen),
921+
SyscallSucceeds());
922+
ASSERT_THAT(RetryEINTR(connect)(
923+
s.get(), reinterpret_cast<struct sockaddr*>(&addr), addrlen),
924+
SyscallSucceeds());
925+
926+
constexpr int kBufSz = 1 << 20; // 1 MiB
927+
std::vector<char> writebuf(kBufSz);
928+
929+
// Start reading the response in a loop.
930+
int read_bytes = 0;
931+
ScopedThread t([&s, &read_bytes]() {
932+
// Too many syscalls.
933+
const DisableSave ds;
934+
935+
char readbuf[2500] = {};
936+
int n = -1;
937+
while (n != 0) {
938+
ASSERT_THAT(n = RetryEINTR(read)(s.get(), &readbuf, sizeof(readbuf)),
939+
SyscallSucceeds());
940+
read_bytes += n;
941+
}
942+
});
943+
944+
// Try to send the whole thing.
945+
int n;
946+
ASSERT_THAT(n = SendFd(s.get(), writebuf.data(), kBufSz, 0),
947+
SyscallSucceeds());
948+
949+
// We should have written the whole thing.
950+
EXPECT_EQ(n, kBufSz);
951+
EXPECT_THAT(shutdown(s.get(), SHUT_WR), SyscallSucceedsWithValue(0));
952+
t.Join();
953+
954+
// We should have read the whole thing.
955+
EXPECT_EQ(read_bytes, kBufSz);
956+
}
957+
906958
TEST_P(SimpleTcpSocketTest, NonBlockingConnect) {
907959
const FileDescriptor listener =
908960
ASSERT_NO_ERRNO_AND_VALUE(Socket(GetParam(), SOCK_STREAM, IPPROTO_TCP));

0 commit comments

Comments
 (0)