@@ -868,31 +868,51 @@ ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
868
868
// various callback functions. Each of these will typically result in a call
869
869
// out to JavaScript so this particular function is rather hot and can be
870
870
// quite expensive. This is a potential performance optimization target later.
871
- ssize_t Http2Session::Write (const uv_buf_t * bufs, size_t nbufs) {
872
- size_t total = 0 ;
873
- // Note that nghttp2_session_mem_recv is a synchronous operation that
874
- // will trigger a number of other callbacks. Those will, in turn have
871
+ ssize_t Http2Session::ConsumeHTTP2Data () {
872
+ CHECK_NOT_NULL (stream_buf_.base );
873
+ CHECK_LT (stream_buf_offset_, stream_buf_.len );
874
+ size_t read_len = stream_buf_.len - stream_buf_offset_;
875
+
875
876
// multiple side effects.
876
- for (size_t n = 0 ; n < nbufs; n++) {
877
- Debug (this , " receiving %d bytes [wants data? %d]" ,
878
- bufs[n].len ,
879
- nghttp2_session_want_read (session_));
880
- ssize_t ret =
881
- nghttp2_session_mem_recv (session_,
882
- reinterpret_cast <uint8_t *>(bufs[n].base ),
883
- bufs[n].len );
884
- CHECK_NE (ret, NGHTTP2_ERR_NOMEM);
885
-
886
- if (ret < 0 )
887
- return ret;
877
+ Debug (this , " receiving %d bytes [wants data? %d]" ,
878
+ read_len,
879
+ nghttp2_session_want_read (session_));
880
+ flags_ &= ~SESSION_STATE_NGHTTP2_RECV_PAUSED;
881
+ ssize_t ret =
882
+ nghttp2_session_mem_recv (session_,
883
+ reinterpret_cast <uint8_t *>(stream_buf_.base ) +
884
+ stream_buf_offset_,
885
+ read_len);
886
+ CHECK_NE (ret, NGHTTP2_ERR_NOMEM);
888
887
889
- total += ret;
888
+ if (flags_ & SESSION_STATE_NGHTTP2_RECV_PAUSED) {
889
+ CHECK_NE (flags_ & SESSION_STATE_READING_STOPPED, 0 );
890
+
891
+ CHECK_GT (ret, 0 );
892
+ CHECK_LE (static_cast <size_t >(ret), read_len);
893
+
894
+ if (static_cast <size_t >(ret) < read_len) {
895
+ // Mark the remainder of the data as available for later consumption.
896
+ stream_buf_offset_ += ret;
897
+ return ret;
898
+ }
890
899
}
900
+
901
+ // We are done processing the current input chunk.
902
+ DecrementCurrentSessionMemory (stream_buf_.len );
903
+ stream_buf_offset_ = 0 ;
904
+ stream_buf_ab_.Reset ();
905
+ stream_buf_allocation_.clear ();
906
+ stream_buf_ = uv_buf_init (nullptr , 0 );
907
+
908
+ if (ret < 0 )
909
+ return ret;
910
+
891
911
// Send any data that was queued up while processing the received data.
892
912
if (!IsDestroyed ()) {
893
913
SendPendingData ();
894
914
}
895
- return total ;
915
+ return ret ;
896
916
}
897
917
898
918
@@ -1194,8 +1214,18 @@ int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
1194
1214
nghttp2_session_consume_stream (handle, id, avail);
1195
1215
else
1196
1216
stream->inbound_consumed_data_while_paused_ += avail;
1217
+
1218
+ // If we have a gathered a lot of data for output, try sending it now.
1219
+ if (session->outgoing_length_ > 4096 ) session->SendPendingData ();
1197
1220
} while (len != 0 );
1198
1221
1222
+ // If we are currently waiting for a write operation to finish, we should
1223
+ // tell nghttp2 that we want to wait before we process more input data.
1224
+ if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) {
1225
+ session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED;
1226
+ return NGHTTP2_ERR_PAUSE;
1227
+ }
1228
+
1199
1229
return 0 ;
1200
1230
}
1201
1231
@@ -1283,6 +1313,7 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
1283
1313
size_t offset = buf.base - session->stream_buf_ .base ;
1284
1314
1285
1315
// Verify that the data offset is inside the current read buffer.
1316
+ CHECK_GE (offset, session->stream_buf_offset_ );
1286
1317
CHECK_LE (offset, session->stream_buf_ .len );
1287
1318
CHECK_LE (offset + buf.len , session->stream_buf_ .len );
1288
1319
@@ -1554,6 +1585,11 @@ void Http2Session::OnStreamAfterWrite(WriteWrap* w, int status) {
1554
1585
stream_->ReadStart ();
1555
1586
}
1556
1587
1588
+ // If there is more incoming data queued up, consume it.
1589
+ if (stream_buf_offset_ > 0 ) {
1590
+ ConsumeHTTP2Data ();
1591
+ }
1592
+
1557
1593
if (!(flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
1558
1594
// Schedule a new write if nghttp2 wants to send data.
1559
1595
MaybeScheduleWrite ();
@@ -1609,6 +1645,7 @@ void Http2Session::ClearOutgoing(int status) {
1609
1645
1610
1646
if (outgoing_buffers_.size () > 0 ) {
1611
1647
outgoing_storage_.clear ();
1648
+ outgoing_length_ = 0 ;
1612
1649
1613
1650
std::vector<nghttp2_stream_write> current_outgoing_buffers_;
1614
1651
current_outgoing_buffers_.swap (outgoing_buffers_);
@@ -1639,6 +1676,11 @@ void Http2Session::ClearOutgoing(int status) {
1639
1676
}
1640
1677
}
1641
1678
1679
+ void Http2Session::PushOutgoingBuffer (nghttp2_stream_write&& write) {
1680
+ outgoing_length_ += write .buf .len ;
1681
+ outgoing_buffers_.emplace_back (std::move (write ));
1682
+ }
1683
+
1642
1684
// Queue a given block of data for sending. This always creates a copy,
1643
1685
// so it is used for the cases in which nghttp2 requests sending of a
1644
1686
// small chunk of data.
@@ -1651,7 +1693,7 @@ void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) {
1651
1693
// of the outgoing_buffers_ vector may invalidate the pointer.
1652
1694
// The correct base pointers will be set later, before writing to the
1653
1695
// underlying socket.
1654
- outgoing_buffers_. emplace_back (nghttp2_stream_write {
1696
+ PushOutgoingBuffer (nghttp2_stream_write {
1655
1697
uv_buf_init (nullptr , src_length)
1656
1698
});
1657
1699
}
@@ -1774,13 +1816,13 @@ int Http2Session::OnSendData(
1774
1816
if (write .buf .len <= length) {
1775
1817
// This write does not suffice by itself, so we can consume it completely.
1776
1818
length -= write .buf .len ;
1777
- session->outgoing_buffers_ . emplace_back (std::move (write ));
1819
+ session->PushOutgoingBuffer (std::move (write ));
1778
1820
stream->queue_ .pop ();
1779
1821
continue ;
1780
1822
}
1781
1823
1782
1824
// Slice off `length` bytes of the first write in the queue.
1783
- session->outgoing_buffers_ . emplace_back (nghttp2_stream_write {
1825
+ session->PushOutgoingBuffer (nghttp2_stream_write {
1784
1826
uv_buf_init (write .buf .base , length)
1785
1827
});
1786
1828
write .buf .base += length;
@@ -1790,7 +1832,7 @@ int Http2Session::OnSendData(
1790
1832
1791
1833
if (frame->data .padlen > 0 ) {
1792
1834
// Send padding if that was requested.
1793
- session->outgoing_buffers_ . emplace_back (nghttp2_stream_write {
1835
+ session->PushOutgoingBuffer (nghttp2_stream_write {
1794
1836
uv_buf_init (const_cast <char *>(zero_bytes_256), frame->data .padlen - 1 )
1795
1837
});
1796
1838
}
@@ -1827,8 +1869,6 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
1827
1869
Http2Scope h2scope (this );
1828
1870
CHECK_NOT_NULL (stream_);
1829
1871
Debug (this , " receiving %d bytes" , nread);
1830
- CHECK_EQ (stream_buf_allocation_.size (), 0 );
1831
- CHECK (stream_buf_ab_.IsEmpty ());
1832
1872
AllocatedBuffer buf (env (), buf_);
1833
1873
1834
1874
// Only pass data on if nread > 0
@@ -1839,24 +1879,31 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
1839
1879
return ;
1840
1880
}
1841
1881
1842
- // Shrink to the actual amount of used data.
1843
- buf.Resize (nread);
1882
+ statistics_.data_received += nread;
1844
1883
1845
- IncrementCurrentSessionMemory (nread);
1846
- OnScopeLeave on_scope_leave ([&]() {
1847
- // Once finished handling this write, reset the stream buffer.
1848
- // The memory has either been free()d or was handed over to V8.
1849
- // We use `nread` instead of `buf.size()` here, because the buffer is
1850
- // cleared as part of the `.ToArrayBuffer()` call below.
1851
- DecrementCurrentSessionMemory (nread);
1884
+ if (UNLIKELY (stream_buf_offset_ > 0 )) {
1885
+ // This is a very unlikely case, and should only happen if the ReadStart()
1886
+ // call in OnStreamAfterWrite() immediately provides data. If that does
1887
+ // happen, we concatenate the data we received with the already-stored
1888
+ // pending input data, slicing off the already processed part.
1889
+ AllocatedBuffer new_buf = env ()->AllocateManaged (
1890
+ stream_buf_.len - stream_buf_offset_ + nread);
1891
+ memcpy (new_buf.data (),
1892
+ stream_buf_.base + stream_buf_offset_,
1893
+ stream_buf_.len - stream_buf_offset_);
1894
+ memcpy (new_buf.data () + stream_buf_.len - stream_buf_offset_,
1895
+ buf.data (),
1896
+ nread);
1897
+ buf = std::move (new_buf);
1898
+ nread = buf.size ();
1899
+ stream_buf_offset_ = 0 ;
1852
1900
stream_buf_ab_.Reset ();
1853
- stream_buf_allocation_.clear ();
1854
- stream_buf_ = uv_buf_init (nullptr , 0 );
1855
- });
1901
+ DecrementCurrentSessionMemory (stream_buf_offset_);
1902
+ }
1856
1903
1857
- // Make sure that there was no read previously active .
1858
- CHECK_NULL (stream_buf_. base );
1859
- CHECK_EQ (stream_buf_. len , 0 );
1904
+ // Shrink to the actual amount of used data .
1905
+ buf. Resize (nread );
1906
+ IncrementCurrentSessionMemory (nread );
1860
1907
1861
1908
// Remember the current buffer, so that OnDataChunkReceived knows the
1862
1909
// offset of a DATA frame's data into the socket read buffer.
@@ -1869,8 +1916,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
1869
1916
// to copy memory.
1870
1917
stream_buf_allocation_ = std::move (buf);
1871
1918
1872
- statistics_.data_received += nread;
1873
- ssize_t ret = Write (&stream_buf_, 1 );
1919
+ ssize_t ret = ConsumeHTTP2Data ();
1874
1920
1875
1921
if (UNLIKELY (ret < 0 )) {
1876
1922
Debug (this , " fatal error receiving data: %d" , ret);
0 commit comments