Skip to content

Commit 0e499a2

Browse files
committed
UPD | mda
1 parent ffe9962 commit 0e499a2

File tree

4 files changed

+55
-33
lines changed

4 files changed

+55
-33
lines changed

src/http/HTTPv2.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include "components/ManapiURLDecodeStream.hpp"
77

88
#include "worker/default_http2.hpp"
9+
#include "worker/TCP.hpp"
910

1011
enum http_v2_priority {
1112
HTTP2_PRIORITY_0 = 0,
@@ -168,6 +169,7 @@ int http_v2_send_frame (manapi::net::http::http_v2_t *ctx, int frame_type, uint
168169

169170
if (!(ctx->flags & manapi::net::http::HTTP2_CTX_FLAG_BLOCK_WRITE)
170171
&& !ctx->worker->is_writable(ctx->conn)) {
172+
auto b = ctx->conn->as<manapi::net::worker::TCP::connection_interface>();
171173
ctx->flags |= manapi::net::http::HTTP2_CTX_FLAG_BLOCK_WRITE;
172174
ctx->worker->event_toggle(ctx->conn,
173175
true, manapi::ev::WRITE);
@@ -436,8 +438,10 @@ int manapi::net::http::http_v2_on_close (http_v2_t *ctx) {
436438
ctx->timeout = nullptr;
437439
}
438440

439-
for (const auto &s : *ctx->streams) {
440-
ctx->http_v2_worker->close_connection(s.second, false);
441+
if (ctx->streams) {
442+
for (const auto &s : *ctx->streams) {
443+
ctx->http_v2_worker->close_connection(s.second, false);
444+
}
441445
}
442446

443447
return 0;
@@ -582,9 +586,8 @@ int manapi::net::http::http_v2_on_close_stream(http_v2_t *ctx, int id) {
582586
}
583587

584588
int manapi::net::http::http_v2_on_write(http_v2_t *ctx) {
585-
if (ctx->flags & HTTP2_CTX_FLAG_BLOCK_WRITE) {
589+
if (ctx->flags & HTTP2_CTX_FLAG_BLOCK_WRITE)
586590
ctx->flags ^= HTTP2_CTX_FLAG_BLOCK_WRITE;
587-
}
588591

589592
bool no_one = true;
590593
for (const auto &s : *ctx->streams) {

src/worker/TCP.cpp

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,14 @@ std::shared_ptr<manapi::net::worker::TCP> manapi::net::worker::TCP::create(net::
223223
}
224224

225225
manapi::net::worker::shared_conn manapi::net::worker::TCP::accept (ev::shared_tcp &w, std::move_only_function<shared_conn()> init) {
226+
/**
227+
* Receving using 65k buffers,
228+
* but if we copy that buffer we
229+
* should copy to 4k buffers
230+
*
231+
* I mean only one 65k buffer
232+
* can be exists by thread
233+
*/
226234
auto connection = init();
227235

228236
if (!connection)
@@ -235,15 +243,16 @@ manapi::net::worker::shared_conn manapi::net::worker::TCP::accept (ev::shared_tc
235243
bytebuffer object;
236244
auto const connection = weak.lock();
237245

238-
if (buf->base) {
246+
if (buf->base)
239247
object = this->bufferpool().buffer(buf->base, buf->len);
240-
}
241248

242-
if (!nread) {
249+
if (!nread)
243250
return;
244-
}
245251

246252
if (nread < 0) {
253+
if (errno == EAGAIN || errno == EWOULDBLOCK)
254+
return;
255+
247256
/* maybe EOF */
248257
this->close_connection(connection, false);
249258
return;
@@ -560,9 +569,10 @@ int manapi::net::worker::TCP::event_flags(const shared_conn & conn) {
560569
int manapi::net::worker::TCP::flush_write_(const worker::shared_conn &connection, bool flush) {
561570
auto conn = connection->as<connection_interface>();
562571

563-
if ((conn->top->cur_send_size > this->config_->max_merge_buffer_stack)
564-
|| ((conn->top->cur_send_size == this->config_->max_merge_buffer_stack) && (conn->top->send.last_deque->buffer.size() == conn->top->send.deque_cursor))
572+
while ((conn->top->cur_send_size >= this->config_->max_merge_buffer_stack)
573+
//|| ((conn->top->cur_send_size == this->config_->max_merge_buffer_stack) && (conn->top->send.last_deque->buffer.size() == conn->top->send.deque_cursor))
565574
|| (flush && conn->top->cur_send_size)) {
575+
566576
std::unique_ptr<ev::buff_t, ev::buffer_deleter> s;
567577
s.reset(new ev::buff_t[conn->top->cur_send_size]);
568578

@@ -597,9 +607,9 @@ int manapi::net::worker::TCP::flush_write_(const worker::shared_conn &connection
597607
if (current && current->next)
598608
conn->top->send.deque = std::move(current->next);
599609

600-
auto rhs = conn->watcher->try_write(buffptr, conn->top->cur_send_size);
610+
ssize_t rhs = conn->watcher->try_write(buffptr, conn->top->cur_send_size);
601611
if (request == rhs) {
602-
conn->top->send_size = 0;
612+
conn->top->send_size -= conn->top->cur_send_size;
603613
conn->top->cur_send_size = 0;
604614
}
605615
else {
@@ -635,10 +645,12 @@ int manapi::net::worker::TCP::flush_write_(const worker::shared_conn &connection
635645
auto w = manapi::async::current()->eventloop()
636646
->create_watcher_write(conn->watcher.get(), [connection, b = std::move(sent), s = std::move(s)]
637647
(std::shared_ptr<ev::write> &w, int status)
638-
-> void {
648+
mutable -> void {
639649
auto conn = connection->as<connection_interface>();
640650

641651
conn->top->send_size -= w->custom()->nbufs;
652+
s.reset();
653+
b.reset();
642654

643655
if (status) {
644656
/* error */
@@ -759,7 +771,7 @@ bool manapi::net::worker::TCP::update_limit_rate_connection(const shared_conn &s
759771
if (conn_data->status & ev::READ)
760772
conn_data->watcher->read_start();
761773

762-
if (conn_data->status & ev::WRITE)
774+
if (conn_data->status & ev::WRITE && conn_data->ev_callback)
763775
conn_data->ev_callback->operator()(sconn, ev::WRITE, nullptr, 0, nullptr);
764776
}
765777
else {

src/worker/TLS.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,15 @@ ssize_t manapi::net::worker::TLS::sync_write_ex(const shared_conn &conn, ev::buf
182182

183183
return total;
184184
#else
185-
char buffer[65536];
185+
char buffer[32768];
186186
size_t cursor = 0;
187187
size_t lastcur = 0;
188188
ssize_t total = 0;
189189

190190
while (nbuff) {
191+
if (connection->top->send_size > maxcnt)
192+
break;
193+
191194
auto const copy = std::min<std::size_t>(buff->len - lastcur, sizeof (buffer) - cursor);
192195
memcpy (buffer + cursor, buff->base + lastcur, copy);
193196

src/worker/base_worker.cpp

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ manapi::future<ssize_t> manapi::net::worker::base::write(const shared_conn &conn
9898

9999
manapi::future<ssize_t> manapi::net::worker::base::write(const shared_conn &conn, ev::buff_t *buff, uint32_t nbuff, bool finish) {
100100
using promise = manapi::async::promise<ssize_t, std::false_type>;
101-
101+
assert(nbuff > 0);
102102
auto rhs = this->sync_write(conn, buff, nbuff, finish);
103103

104104
if (rhs)
@@ -162,9 +162,9 @@ manapi::future<ssize_t> manapi::net::worker::base::fwrite(const shared_conn &con
162162
ssize_t rhs = 0;
163163
while (total < size) {
164164
rhs = co_await this->write(conn, static_cast<const char *>(buff) + total, size - total, finish);
165-
if (rhs <= 0) {
165+
if (rhs <= 0)
166166
co_return rhs;
167-
}
167+
168168
total += rhs;
169169
}
170170
co_return total;
@@ -175,11 +175,9 @@ manapi::future<ssize_t> manapi::net::worker::base::fwrite(const shared_conn &con
175175
ssize_t rhs = 0;
176176
auto const size = slice.size();
177177

178-
auto buffs = slice.slices_buffs();
179-
auto buffptr = buffs.get();
180-
uint32_t nbuff = slice.slices_size();
181178
while (total < size) {
182-
rhs = co_await this->write(conn, buffptr, nbuff, finish);
179+
auto buffs = slice.slices_buffs();
180+
rhs = co_await this->write(conn, buffs.get(), slice.slices_size(), finish);
183181
if (rhs <= 0)
184182
co_return rhs;
185183

@@ -188,16 +186,18 @@ manapi::future<ssize_t> manapi::net::worker::base::fwrite(const shared_conn &con
188186
if (total == size)
189187
break;
190188

191-
while (nbuff && rhs >= buffptr->len) {
192-
rhs -= buffptr->len;
193-
buffptr++;
194-
nbuff--;
195-
}
196-
197-
if (nbuff) {
198-
buffptr->base += rhs;
199-
buffptr->len -= rhs;
200-
}
189+
slice = slice.subslice(rhs).value();
190+
//
191+
// while (nbuff && rhs >= buffptr->len) {
192+
// rhs -= buffptr->len;
193+
// buffptr++;
194+
// nbuff--;
195+
// }
196+
//
197+
// if (nbuff) {
198+
// buffptr->base += rhs;
199+
// buffptr->len -= rhs;
200+
// }
201201
}
202202
co_return total;
203203
}
@@ -291,7 +291,7 @@ ssize_t manapi::net::worker::base::connection_io_send(connection_io_part *top, c
291291
ssize_t rhs = 0;
292292
while (rhs != size) {
293293
if (!top->last_deque
294-
|| top->deque_cursor == top->deque->buffer.size()) {
294+
|| top->deque_cursor == top->last_deque->buffer.size()) {
295295
if (cnt && *cnt >= max_cnt)
296296
break;
297297

@@ -333,6 +333,10 @@ void manapi::net::worker::base::connection_io_send_start(connection_io_part *top
333333
return;
334334
}
335335

336+
if (buff && buff->size() > buffer_size)
337+
/* it may be a recv buffer */
338+
buff = nullptr;
339+
336340
ibuffpool_t tmp;
337341
if (buff) {
338342
buff->shift_add(static_cast<int>(buff->size() - size));

0 commit comments

Comments
 (0)