Skip to content

Commit 4751763

Browse files
committed
quic: fixup application packet preparation
1 parent accec6d commit 4751763

File tree

3 files changed

+70
-71
lines changed

3 files changed

+70
-71
lines changed

src/quic/application.cc

Lines changed: 62 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -236,62 +236,64 @@ void Session::Application::StreamReset(Stream* stream,
236236
}
237237

238238
void Session::Application::SendPendingData() {
239+
static constexpr size_t kMaxPackets = 32;
239240
Debug(session_, "Application sending pending data");
240241
PathStorage path;
241-
PathStorage prev_path;
242-
243-
Packet* packet = nullptr;
244-
uint8_t* pos = nullptr;
245-
uint8_t* begin = nullptr;
246-
int err = 0;
247-
size_t last_nwrite = 0;
242+
StreamData stream_data;
248243

249244
// The maximum size of packet to create.
250245
const size_t max_packet_size = session_->max_packet_size();
251246

252247
// The maximum number of packets to send in this call to SendPendingData.
253248
const size_t max_packet_count = std::min(
254-
static_cast<size_t>(64000),
249+
kMaxPackets,
255250
ngtcp2_conn_get_send_quantum(*session_) / max_packet_size);
256251

257252
// The number of packets that have been sent in this call to SendPendingData.
258253
size_t packet_send_count = 0;
259254

255+
Packet* packet = nullptr;
256+
uint8_t* pos = nullptr;
257+
uint8_t* begin = nullptr;
258+
259+
auto ensure_packet = [&] {
260+
if (packet == nullptr) {
261+
packet = CreateStreamDataPacket();
262+
if (packet == nullptr) return false;
263+
pos = begin = ngtcp2_vec(*packet).base;
264+
}
265+
DCHECK_NOT_NULL(packet);
266+
DCHECK_NOT_NULL(pos);
267+
DCHECK_NOT_NULL(begin);
268+
return true;
269+
};
270+
271+
// We're going to enter a loop here to prepare and send no more than
272+
// max_packet_count packets.
260273
for (;;) {
261-
// ndatalen is the amount of stream data that was accepted into the packet,
262-
// if any.
274+
// ndatalen is the amount of stream data that was accepted into the packet.
263275
ssize_t ndatalen = 0;
264276

265-
// The stream_data is the next block of data from the application stream to
266-
// send.
267-
StreamData stream_data;
268-
err = GetStreamData(&stream_data);
277+
// Make sure we have a packet to write data into.
278+
if (!ensure_packet()) {
279+
Debug(session_, "Failed to create packet for stream data");
280+
// Doh! Could not create a packet. Time to bail.
281+
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
282+
return session_->Close(Session::CloseMethod::SILENT);
283+
}
269284

270-
if (err < 0) {
285+
// The stream_data is the next block of data from the application stream.
286+
if (GetStreamData(&stream_data) < 0) {
287+
Debug(session_, "Application failed to get stream data");
271288
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
272-
Debug(session_, "Application failed to get stream data with error %s",
273-
session_->last_error_);
289+
packet->Done(UV_ECANCELED);
274290
return session_->Close(Session::CloseMethod::SILENT);
275291
}
276292

277293
// If we got here, we were at least successful in checking for stream data.
278294
// There might not be any stream data to send.
279295
Debug(session_, "Application using stream data: %s", stream_data);
280296

281-
// Now let's make sure we have a packet to write data into.
282-
if (packet == nullptr) {
283-
packet = CreateStreamDataPacket();
284-
if (packet == nullptr) {
285-
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
286-
return session_->Close(Session::CloseMethod::SILENT);
287-
}
288-
pos = begin = ngtcp2_vec(*packet).base;
289-
}
290-
291-
DCHECK_NOT_NULL(pos);
292-
DCHECK_NOT_NULL(begin);
293-
DCHECK_NOT_NULL(stream_data.buf);
294-
295297
// Awesome, let's write our packet!
296298
ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, max_packet_size, stream_data);
297299
Debug(session_, "Application accepted %zu bytes into packet", ndatalen);
@@ -304,7 +306,10 @@ void Session::Application::SendPendingData() {
304306
// We could not write any data for this stream into the packet because
305307
// the flow control for the stream itself indicates that the stream
306308
// is blocked. We'll skip and move on to the next stream.
309+
// ndatalen = -1 means that no stream data was accepted into the
310+
// packet, which is what we want here.
307311
DCHECK_EQ(ndatalen, -1);
312+
DCHECK(stream_data.stream);
308313
session_->StreamDataBlocked(stream_data.id);
309314
continue;
310315
}
@@ -314,14 +319,16 @@ void Session::Application::SendPendingData() {
314319
// any stream data!
315320
Debug(session_, "Stream %" PRIi64 " should be closed for writing",
316321
stream_data.id);
322+
// ndatalen = -1 means that no stream data was accepted into the
323+
// packet, which is what we want here.
317324
DCHECK_EQ(ndatalen, -1);
318-
// If we got this response, then there should have been a stream associated
319-
// with the stream_data, otherwise the response wouldn't make any sense.
320325
DCHECK(stream_data.stream);
321326
stream_data.stream->EndWritable();
322327
continue;
323328
}
324329
case NGTCP2_ERR_WRITE_MORE: {
330+
// This return value indicates that we should call into WriteVStream
331+
// again to write more data into the same packet.
325332
Debug(session_, "Application should write more to packet");
326333
DCHECK_GE(ndatalen, 0);
327334
if (!StreamCommit(&stream_data, ndatalen)) {
@@ -336,8 +343,8 @@ void Session::Application::SendPendingData() {
336343
DCHECK_EQ(ndatalen, -1);
337344
Debug(session_, "Application encountered error while writing packet: %s",
338345
ngtcp2_strerror(nwrite));
339-
packet->Done(UV_ECANCELED);
340346
session_->SetLastError(QuicError::ForNgtcp2Error(nwrite));
347+
packet->Done(UV_ECANCELED);
341348
return session_->Close(Session::CloseMethod::SILENT);
342349
} else if (ndatalen >= 0) {
343350
// We wrote some data into the packet. We need to update the flow control
@@ -349,48 +356,41 @@ void Session::Application::SendPendingData() {
349356
}
350357

351358
// When nwrite is zero, it means we are congestion limited.
359+
// We should stop trying to send additional packets.
352360
if (nwrite == 0) {
353361
Debug(session_, "Congestion limited.");
354362
// There might be a partial packet already prepared. If so, send it.
355-
if (pos - begin) {
356-
Debug(session_, "Packet has at least some data to send");
363+
size_t datalen = pos - begin;
364+
if (datalen) {
365+
Debug(session_, "Packet has %zu bytes to send", datalen);
357366
// At least some data had been written into the packet. We should send it.
358-
packet->Truncate(pos - begin);
367+
packet->Truncate(datalen);
359368
session_->Send(packet, path);
360369
} else {
361370
packet->Done(UV_ECANCELED);
362371
}
363372

364-
session_->UpdatePacketTxTime();
365-
return;
373+
// If there was stream data selected, we should reschedule it to try sending again.
374+
if (stream_data.id >= 0) ResumeStream(stream_data.id);
375+
376+
return session_->UpdatePacketTxTime();
366377
}
367378

379+
// At this point we have a packet prepared to send.
368380
pos += nwrite;
369-
370-
if (packet_send_count == 0) {
371-
path.CopyTo(&prev_path);
372-
last_nwrite = nwrite;
373-
} else if (prev_path != path ||
374-
static_cast<size_t>(nwrite) > last_nwrite ||
375-
(last_nwrite > max_packet_size && static_cast<size_t>(nwrite) != last_nwrite)) {
376-
auto datalen = pos - begin - nwrite;
377-
packet->Truncate(datalen);
378-
session_->Send(packet->Clone(), prev_path);
379-
session_->Send(packet, path);
380-
session_->UpdatePacketTxTime();
381-
packet = nullptr;
382-
pos = nullptr;
383-
begin = nullptr;
384-
continue;
381+
size_t datalen = pos - begin;
382+
Debug(session_, "Sending packet with %zu bytes", datalen);
383+
packet->Truncate(datalen);
384+
session_->Send(packet, path);
385+
386+
// If we have sent the maximum number of packets, we're done.
387+
if (++packet_send_count == max_packet_count) {
388+
return session_->UpdatePacketTxTime();
385389
}
386390

387-
if (++packet_send_count == max_packet_count || static_cast<size_t>(nwrite) < last_nwrite) {
388-
auto datalen = pos - begin;
389-
packet->Truncate(datalen);
390-
session_->Send(packet, path);
391-
session_->UpdatePacketTxTime();
392-
return;
393-
}
391+
// Prepare to loop back around to prepare a new packet.
392+
packet = nullptr;
393+
pos = begin = nullptr;
394394
}
395395
}
396396

@@ -403,7 +403,6 @@ ssize_t Session::Application::WriteVStream(PathStorage* path,
403403
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
404404
if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
405405
ngtcp2_pkt_info pi;
406-
Debug(session_, "Writing max %zu bytes to packet", max_packet_size);
407406
return ngtcp2_conn_writev_stream(
408407
*session_,
409408
&path->path,

src/quic/application.h

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -115,27 +115,26 @@ class Session::Application : public MemoryRetainer {
115115
// the default stream priority.
116116
virtual StreamPriority GetStreamPriority(const Stream& stream);
117117

118-
protected:
119-
inline Environment* env() const { return session_->env(); }
120-
inline Session& session() { return *session_; }
121-
inline const Session& session() const { return *session_; }
122-
123-
Packet* CreateStreamDataPacket();
124-
125118
struct StreamData;
126119

127120
virtual int GetStreamData(StreamData* data) = 0;
128121
virtual bool StreamCommit(StreamData* data, size_t datalen) = 0;
129122
virtual bool ShouldSetFin(const StreamData& data) = 0;
130123

124+
inline Environment* env() const { return session_->env(); }
125+
inline Session& session() { return *session_; }
126+
inline const Session& session() const { return *session_; }
127+
128+
private:
129+
Packet* CreateStreamDataPacket();
130+
131131
// Write the given stream_data into the buffer.
132132
ssize_t WriteVStream(PathStorage* path,
133133
uint8_t* buf,
134134
ssize_t* ndatalen,
135135
size_t max_packet_size,
136136
const StreamData& stream_data);
137137

138-
private:
139138
Session* session_;
140139
};
141140

src/quic/http3.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ class Http3Application final : public Session::Application {
367367
}
368368
}
369369
}
370+
DCHECK_NOT_NULL(stream_data.buf);
370371
return 0;
371372
}
372373

0 commit comments

Comments
 (0)