Skip to content

Commit accec6d

Browse files
committed
quic: update the packet generation logic
1 parent cffe9b2 commit accec6d

File tree

8 files changed

+149
-84
lines changed

8 files changed

+149
-84
lines changed

src/quic/application.cc

Lines changed: 107 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ Packet* Session::Application::CreateStreamDataPacket() {
204204
return Packet::Create(env(),
205205
session_->endpoint_.get(),
206206
session_->remote_address_,
207-
ngtcp2_conn_get_max_tx_udp_payload_size(*session_),
207+
session_->max_packet_size(),
208208
"stream data");
209209
}
210210

@@ -238,37 +238,33 @@ void Session::Application::StreamReset(Stream* stream,
238238
void Session::Application::SendPendingData() {
239239
Debug(session_, "Application sending pending data");
240240
PathStorage path;
241+
PathStorage prev_path;
241242

242243
Packet* packet = nullptr;
243244
uint8_t* pos = nullptr;
245+
uint8_t* begin = nullptr;
244246
int err = 0;
247+
size_t last_nwrite = 0;
245248

246-
size_t maxPacketCount = std::min(static_cast<size_t>(64000),
247-
ngtcp2_conn_get_send_quantum(*session_));
248-
size_t packetSendCount = 0;
249+
// The maximum size of packet to create.
250+
const size_t max_packet_size = session_->max_packet_size();
249251

250-
const auto updateTimer = [&] {
251-
Debug(session_, "Application updating the session timer");
252-
ngtcp2_conn_update_pkt_tx_time(*session_, uv_hrtime());
253-
session_->UpdateTimer();
254-
};
255-
256-
const auto congestionLimited = [&](auto packet) {
257-
auto len = pos - ngtcp2_vec(*packet).base;
258-
// We are either congestion limited or done.
259-
if (len) {
260-
// Some data was serialized into the packet. We need to send it.
261-
packet->Truncate(len);
262-
session_->Send(std::move(packet), path);
263-
}
252+
// The maximum number of packets to send in this call to SendPendingData.
253+
const size_t max_packet_count = std::min(
254+
static_cast<size_t>(64000),
255+
ngtcp2_conn_get_send_quantum(*session_) / max_packet_size);
264256

265-
updateTimer();
266-
};
257+
// The number of packets that have been sent in this call to SendPendingData.
258+
size_t packet_send_count = 0;
267259

268260
for (;;) {
261+
// ndatalen is the amount of stream data that was accepted into the packet,
262+
// if any.
269263
ssize_t ndatalen = 0;
270-
StreamData stream_data;
271264

265+
// The stream_data is the next block of data from the application stream to
266+
// send.
267+
StreamData stream_data;
272268
err = GetStreamData(&stream_data);
273269

274270
if (err < 0) {
@@ -278,115 +274,148 @@ void Session::Application::SendPendingData() {
278274
return session_->Close(Session::CloseMethod::SILENT);
279275
}
280276

277+
// If we got here, we were at least successful in checking for stream data.
278+
// There might not be any stream data to send.
279+
Debug(session_, "Application using stream data: %s", stream_data);
280+
281+
// Now let's make sure we have a packet to write data into.
281282
if (packet == nullptr) {
282283
packet = CreateStreamDataPacket();
283284
if (packet == nullptr) {
284285
session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL);
285286
return session_->Close(Session::CloseMethod::SILENT);
286287
}
287-
pos = ngtcp2_vec(*packet).base;
288+
pos = begin = ngtcp2_vec(*packet).base;
288289
}
289290

290-
Debug(session_, "Application using stream data: %s", stream_data);
291291
DCHECK_NOT_NULL(pos);
292+
DCHECK_NOT_NULL(begin);
292293
DCHECK_NOT_NULL(stream_data.buf);
293-
ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data);
294-
Debug(session_, "Application accepted %zu bytes", ndatalen);
295294

296-
if (nwrite <= 0) {
295+
// Awesome, let's write our packet!
296+
ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, max_packet_size, stream_data);
297+
Debug(session_, "Application accepted %zu bytes into packet", ndatalen);
298+
299+
// A negative nwrite value indicates either an error or that there is more data
300+
// to write into the packet.
301+
if (nwrite < 0) {
297302
switch (nwrite) {
298-
case 0: {
299-
Debug(session_, "Congestion limited. Sending nothing.");
300-
if (stream_data.id >= 0) ResumeStream(stream_data.id);
301-
return congestionLimited(std::move(packet));
302-
}
303303
case NGTCP2_ERR_STREAM_DATA_BLOCKED: {
304-
session().StreamDataBlocked(stream_data.id);
305-
if (session().max_data_left() == 0) {
306-
if (stream_data.id >= 0) ResumeStream(stream_data.id);
307-
return congestionLimited(std::move(packet));
308-
}
309-
CHECK_LE(ndatalen, 0);
304+
// We could not write any data for this stream into the packet because
305+
// the flow control for the stream itself indicates that the stream
306+
// is blocked. We'll skip and move on to the next stream.
307+
DCHECK_EQ(ndatalen, -1);
308+
session_->StreamDataBlocked(stream_data.id);
310309
continue;
311310
}
312311
case NGTCP2_ERR_STREAM_SHUT_WR: {
313-
Debug(session_, "Stream %" PRIi64 " is closed for writing",
314-
stream_data.id);
315-
// Indicates that the writable side of the stream has been closed
312+
// Indicates that the writable side of the stream should be closed
316313
// locally or the stream is being reset. In either case, we can't send
317314
// any stream data!
318-
CHECK_GE(stream_data.id, 0);
319-
// We need to notify the stream that the writable side has been closed
320-
// and no more outbound data can be sent.
321-
CHECK_LE(ndatalen, 0);
322-
auto stream = session_->FindStream(stream_data.id);
323-
if (stream) stream->EndWritable();
315+
Debug(session_, "Stream %" PRIi64 " should be closed for writing",
316+
stream_data.id);
317+
DCHECK_EQ(ndatalen, -1);
318+
// If we got this response, then there should have been a stream associated
319+
// with the stream_data, otherwise the response wouldn't make any sense.
320+
DCHECK(stream_data.stream);
321+
stream_data.stream->EndWritable();
324322
continue;
325323
}
326324
case NGTCP2_ERR_WRITE_MORE: {
327325
Debug(session_, "Application should write more to packet");
328-
CHECK_GT(ndatalen, 0);
329-
if (!StreamCommit(&stream_data, ndatalen)) return session_->Close();
330-
pos += ndatalen;
326+
DCHECK_GE(ndatalen, 0);
327+
if (!StreamCommit(&stream_data, ndatalen)) {
328+
packet->Done(UV_ECANCELED);
329+
return session_->Close(CloseMethod::SILENT);
330+
}
331331
continue;
332332
}
333333
}
334334

335+
// Some other type of error happened.
336+
DCHECK_EQ(ndatalen, -1);
337+
Debug(session_, "Application encountered error while writing packet: %s",
338+
ngtcp2_strerror(nwrite));
335339
packet->Done(UV_ECANCELED);
336-
session_->last_error_ = QuicError::ForNgtcp2Error(nwrite);
337-
Debug(session_, "Application failed to write stream data with error %s",
338-
session_->last_error_);
340+
session_->SetLastError(QuicError::ForNgtcp2Error(nwrite));
339341
return session_->Close(Session::CloseMethod::SILENT);
342+
} else if (ndatalen >= 0) {
343+
// We wrote some data into the packet. We need to update the flow control
344+
// by committing the data.
345+
if (!StreamCommit(&stream_data, ndatalen)) {
346+
packet->Done(UV_ECANCELED);
347+
return session_->Close(CloseMethod::SILENT);
348+
}
340349
}
341350

342-
pos += nwrite;
343-
if (ndatalen > 0 && !StreamCommit(&stream_data, ndatalen)) {
344-
// Since we are closing the session here, we don't worry about updating
345-
// the pkt tx time. The failed StreamCommit should have updated the
346-
// last_error_ appropriately.
347-
packet->Done(UV_ECANCELED);
348-
return session_->Close(Session::CloseMethod::SILENT);
351+
// When nwrite is zero, it means we are congestion limited.
352+
if (nwrite == 0) {
353+
Debug(session_, "Congestion limited.");
354+
// There might be a partial packet already prepared. If so, send it.
355+
if (pos - begin) {
356+
Debug(session_, "Packet has at least some data to send");
357+
// At least some data had been written into the packet. We should send it.
358+
packet->Truncate(pos - begin);
359+
session_->Send(packet, path);
360+
} else {
361+
packet->Done(UV_ECANCELED);
362+
}
363+
364+
session_->UpdatePacketTxTime();
365+
return;
349366
}
350367

351-
if (stream_data.id >= 0 && ndatalen < 0) ResumeStream(stream_data.id);
368+
pos += nwrite;
352369

353-
Debug(session_, "Packet ready to send with %zu bytes", nwrite);
354-
packet->Truncate(nwrite);
355-
session_->Send(packet, path);
356-
packet = nullptr;
357-
pos = nullptr;
370+
if (packet_send_count == 0) {
371+
path.CopyTo(&prev_path);
372+
last_nwrite = nwrite;
373+
} else if (prev_path != path ||
374+
static_cast<size_t>(nwrite) > last_nwrite ||
375+
(last_nwrite > max_packet_size && static_cast<size_t>(nwrite) != last_nwrite)) {
376+
auto datalen = pos - begin - nwrite;
377+
packet->Truncate(datalen);
378+
session_->Send(packet->Clone(), prev_path);
379+
session_->Send(packet, path);
380+
session_->UpdatePacketTxTime();
381+
packet = nullptr;
382+
pos = nullptr;
383+
begin = nullptr;
384+
continue;
385+
}
358386

359-
if (++packetSendCount == maxPacketCount) {
360-
break;
387+
if (++packet_send_count == max_packet_count || static_cast<size_t>(nwrite) < last_nwrite) {
388+
auto datalen = pos - begin;
389+
packet->Truncate(datalen);
390+
session_->Send(packet, path);
391+
session_->UpdatePacketTxTime();
392+
return;
361393
}
362394
}
363-
364-
updateTimer();
365395
}
366396

367397
ssize_t Session::Application::WriteVStream(PathStorage* path,
368-
uint8_t* buf,
398+
uint8_t* dest,
369399
ssize_t* ndatalen,
400+
size_t max_packet_size,
370401
const StreamData& stream_data) {
371-
CHECK_LE(stream_data.count, kMaxVectorCount);
372-
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_NONE;
373-
if (stream_data.remaining > 0) flags |= NGTCP2_WRITE_STREAM_FLAG_MORE;
402+
DCHECK_LE(stream_data.count, kMaxVectorCount);
403+
uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE;
374404
if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN;
375405
ngtcp2_pkt_info pi;
376-
377-
ssize_t ret = ngtcp2_conn_writev_stream(
406+
Debug(session_, "Writing max %zu bytes to packet", max_packet_size);
407+
return ngtcp2_conn_writev_stream(
378408
*session_,
379409
&path->path,
380410
&pi,
381-
buf,
382-
ngtcp2_conn_get_max_tx_udp_payload_size(*session_),
411+
dest,
412+
max_packet_size,
383413
ndatalen,
384414
flags,
385415
stream_data.id,
386416
stream_data.buf,
387417
stream_data.count,
388418
uv_hrtime());
389-
return ret;
390419
}
391420

392421
// The DefaultApplication is the default implementation of Session::Application

src/quic/application.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ class Session::Application : public MemoryRetainer {
132132
ssize_t WriteVStream(PathStorage* path,
133133
uint8_t* buf,
134134
ssize_t* ndatalen,
135+
size_t max_packet_size,
135136
const StreamData& stream_data);
136137

137138
private:

src/quic/data.cc

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,27 @@ std::string Path::ToString() const {
4747
return res;
4848
}
4949

50-
PathStorage::PathStorage() {
51-
ngtcp2_path_storage_zero(this);
52-
}
50+
PathStorage::PathStorage() { Reset(); }
5351
PathStorage::operator ngtcp2_path() {
5452
return path;
5553
}
5654

55+
void PathStorage::Reset() {
56+
ngtcp2_path_storage_zero(this);
57+
}
58+
59+
void PathStorage::CopyTo(PathStorage* path) const {
60+
ngtcp2_path_copy(&path->path, &this->path);
61+
}
62+
63+
bool PathStorage::operator==(const PathStorage& other) const {
64+
return ngtcp2_path_eq(&path, &other.path) != 0;
65+
}
66+
67+
bool PathStorage::operator!=(const PathStorage& other) const {
68+
return ngtcp2_path_eq(&path, &other.path) == 0;
69+
}
70+
5771
// ============================================================================
5872

5973
Store::Store(std::shared_ptr<v8::BackingStore> store,

src/quic/data.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ struct Path final : public ngtcp2_path {
2424
struct PathStorage final : public ngtcp2_path_storage {
2525
PathStorage();
2626
operator ngtcp2_path();
27+
28+
void Reset();
29+
void CopyTo(PathStorage* path) const;
30+
31+
bool operator==(const PathStorage& other) const;
32+
bool operator!=(const PathStorage& other) const;
2733
};
2834

2935
class Store final : public MemoryRetainer {

src/quic/http3.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,9 @@ class Http3Application final : public Session::Application {
362362
return static_cast<int>(ret);
363363
} else {
364364
data->remaining = data->count = static_cast<size_t>(ret);
365+
if (data->id > 0) {
366+
data->stream = session().FindStream(data->id);
367+
}
365368
}
366369
}
367370
return 0;

src/quic/packet.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ Packet* Packet::FromFreeList(Environment* env,
161161
CHECK_NOT_NULL(packet);
162162
CHECK_EQ(env, packet->env());
163163
Debug(packet, "Reusing packet from freelist");
164-
packet->data_ = data;
164+
packet->data_ = std::move(data);
165165
packet->destination_ = destination;
166166
packet->listener_ = listener;
167167
return packet;

src/quic/session.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,10 @@ Session::~Session() {
572572
DCHECK(streams_.empty());
573573
}
574574

575+
size_t Session::max_packet_size() const {
576+
return ngtcp2_conn_get_max_tx_udp_payload_size(*this);
577+
}
578+
575579
Session::operator ngtcp2_conn*() const {
576580
return connection_.get();
577581
}
@@ -862,6 +866,10 @@ void Session::Send(Packet* packet, const PathStorage& path) {
862866
Send(packet);
863867
}
864868

869+
void Session::UpdatePacketTxTime() {
870+
ngtcp2_conn_update_pkt_tx_time(*this, uv_hrtime());
871+
}
872+
865873
uint64_t Session::SendDatagram(Store&& data) {
866874
auto tp = ngtcp2_conn_get_remote_transport_params(*this);
867875
uint64_t max_datagram_size = tp->max_datagram_frame_size;
@@ -1322,8 +1330,8 @@ void Session::OnTimeout() {
13221330
if (is_destroyed()) return;
13231331

13241332
int ret = ngtcp2_conn_handle_expiry(*this, uv_hrtime());
1325-
if (NGTCP2_OK(ret) && !is_in_closing_period() && !is_in_draining_period() &&
1326-
env()->can_call_into_js()) {
1333+
if (NGTCP2_OK(ret) && !is_in_closing_period() && !is_in_draining_period()) {
1334+
Debug(this, "Sending pending data after timr expiry");
13271335
SendPendingDataScope send_scope(this);
13281336
return;
13291337
}
@@ -1346,6 +1354,7 @@ void Session::UpdateTimer() {
13461354
}
13471355

13481356
auto timeout = (expiry - now) / NGTCP2_MILLISECONDS;
1357+
Debug(this, "Updating timeout to %zu milliseconds", timeout);
13491358

13501359
// If timeout is zero here, it means our timer is less than a millisecond
13511360
// off from expiry. Let's bump the timer to 1.

src/quic/session.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source {
235235
bool is_destroyed() const;
236236
bool is_server() const;
237237

238+
size_t max_packet_size() const;
239+
238240
void set_priority_supported(bool on = true);
239241

240242
std::string diagnostic_name() const override;
@@ -246,6 +248,7 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source {
246248

247249
TransportParams GetLocalTransportParams() const;
248250
TransportParams GetRemoteTransportParams() const;
251+
void UpdatePacketTxTime();
249252

250253
void MemoryInfo(MemoryTracker* tracker) const override;
251254
SET_MEMORY_INFO_NAME(Session)

0 commit comments

Comments
 (0)