@@ -94,14 +94,14 @@ manapi::future<std::string> manapi::net::http::request::text() {
94
94
size_t j = 0 ;
95
95
// size_t socket_block_size = http_server->get_socket_block_size();
96
96
97
- co_await this ->read_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data ,[&body, &j] (const char *data, ssize_t size) -> ssize_t {
97
+ co_await this ->read_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data ,[&body, &j] (const char *data, ssize_t size, bool fin ) -> ssize_t {
98
98
memcpy (body.data () + j, data, size);
99
99
j += size;
100
100
return size;
101
101
});
102
102
}
103
103
else {
104
- co_await this ->read_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data ,[&body] (const char *data, ssize_t size)
104
+ co_await this ->read_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data ,[&body] (const char *data, ssize_t size, bool fin )
105
105
-> ssize_t { body.append (data, size); return size; });
106
106
}
107
107
@@ -115,14 +115,14 @@ manapi::future<manapi::json> manapi::net::http::request::json()
115
115
116
116
if (post_mask) {
117
117
json_builder builder = json_builder (*post_mask);
118
- co_await read_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data ,[&builder] (const char *data, ssize_t size)
118
+ co_await read_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data ,[&builder] (const char *data, ssize_t size, bool fin )
119
119
-> ssize_t { builder << std::string_view (data, size); return size; });
120
120
121
121
co_return std::move (builder.get ());
122
122
}
123
123
else {
124
124
json_builder builder = json_builder ();
125
- co_await read_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data ,[&builder] (const char *data, ssize_t size)
125
+ co_await read_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data ,[&builder] (const char *data, ssize_t size, bool fin )
126
126
-> ssize_t { builder << std::string_view (data, size); return size; });
127
127
128
128
co_return std::move (builder.get ());
@@ -134,11 +134,11 @@ manapi::future<> manapi::net::http::request::form (formdata_recv::onparam_cb_t c
134
134
co_await fdata.get (std::move (cb));
135
135
}
136
136
137
- manapi::future<> manapi::net::http::request::callback_sync ( std::move_only_function< ssize_t ( const char *buffer, ssize_t size)> callback) {
137
+ manapi::future<> manapi::net::http::request::callback_sync (onrecv_sync_cb callback) {
138
138
co_return co_await this ->read_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data ,std::move (callback));
139
139
}
140
140
141
- manapi::future<> manapi::net::http::request::callback_async (std::move_only_function<manapi::future< ssize_t >( const char *buffer, ssize_t size)> callback) {
141
+ manapi::future<> manapi::net::http::request::callback_async (onrecv_async_cb callback) {
142
142
co_return co_await this ->read_async_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data ,std::move (callback));
143
143
}
144
144
@@ -153,7 +153,7 @@ manapi::future<> manapi::net::http::request::file(std::string filepath) {
153
153
std::exception_ptr err{nullptr };
154
154
155
155
try {
156
- co_await this ->read_async_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data , [&] (const char *data, ssize_t size)
156
+ co_await this ->read_async_body_ (this ->worker_ .get (), this ->conn_ , this ->request_data , [&] (const char *data, ssize_t size, bool fin )
157
157
-> manapi::future<ssize_t > { return f.write (data, size); });
158
158
}
159
159
catch (...) {
@@ -240,7 +240,7 @@ bool manapi::net::http::request::propagation() const {
240
240
return this ->flags & internal::REQUEST_FLAG_IS_PROPAGATION;
241
241
}
242
242
243
- manapi::future<void > manapi::net::http::request::read_body_ (worker::base *worker, worker::shared_conn *conn, request_data_t *req, std::move_only_function< ssize_t ( const char *, ssize_t )> handler) {
243
+ manapi::future<void > manapi::net::http::request::read_body_ (worker::base *worker, worker::shared_conn *conn, request_data_t *req, onrecv_sync_cb handler) {
244
244
using promise = manapi::async::promise<void , std::false_type>;
245
245
std::unique_ptr<worker::worker_watcher_cb> prev{nullptr };
246
246
int pflags;
@@ -262,16 +262,36 @@ manapi::future<void> manapi::net::http::request::read_body_(worker::base *worker
262
262
if (flags & ev::READ) {
263
263
try {
264
264
ssize_t size;
265
- if (req->body_size >= 0 )
265
+ bool flg = false ;
266
+
267
+ if (req->body_size >= 0 ) {
266
268
size = std::min (req->body_size , static_cast <ssize_t > (nsize));
269
+ if (req->body_size == size)
270
+ flg = true ;
271
+ }
267
272
else
268
273
size = static_cast <ssize_t > (nsize);
269
274
275
+ if (flags & worker::base::CONN_RECV_END) {
276
+ flags ^= worker::base::CONN_RECV_END;
277
+ flg = true ;
278
+
279
+ if (!nsize) {
280
+ if (handler (nullptr , 0 , flg) < 0 ) {
281
+ reject (std::make_exception_ptr (RETHROW_MANAPIHTTP_EXCEPTION (
282
+ manapi::ERR_INVALID_ARGUMENT, manapi::error::default_msgs[manapi::error::ERRMSG_CUSTOM_CALLBACK_ERR1], " invalid result" )));
283
+ goto finish;
284
+ }
285
+ resolve ();
286
+ goto finish;
287
+ }
288
+ }
289
+
270
290
ssize_t rhs = 0 ;
271
291
while (rhs < size) {
272
292
auto const copy = size - rhs;
273
293
274
- auto const res = handler (buffer + rhs, copy);
294
+ auto const res = handler (buffer + rhs, copy, flg );
275
295
if (res >= 0 ) {
276
296
if (copy > res) {
277
297
reject (std::make_exception_ptr (RETHROW_MANAPIHTTP_EXCEPTION (
@@ -343,7 +363,7 @@ manapi::future<void> manapi::net::http::request::read_body_(worker::base *worker
343
363
}
344
364
}
345
365
346
- manapi::future<> manapi::net::http::request::read_async_body_ (worker::base *worker, worker::shared_conn *conn, request_data_t *req, std::move_only_function<manapi::future< ssize_t >( const char *, ssize_t )> handler) {
366
+ manapi::future<> manapi::net::http::request::read_async_body_ (worker::base *worker, worker::shared_conn *conn, request_data_t *req, onrecv_async_cb handler) {
347
367
using promise = manapi::async::promise<void , std::false_type>;
348
368
using handler_t = decltype (handler);
349
369
@@ -402,21 +422,41 @@ manapi::future<> manapi::net::http::request::read_async_body_(worker::base *work
402
422
ctx_cb.worker ->event_flags (conn, 0 );
403
423
ctx_cb.cnt ++;
404
424
manapi::async::run (manapi::async::invoke (
405
- [] (const worker::shared_conn & conn, worker::ibuffpool_t p, const char * buffer, ssize_t nsize, ctx_cb_t_ *ctx_cb) -> manapi::future<> {
425
+ [] (const worker::shared_conn & conn, worker::ibuffpool_t p, const char * buffer, ssize_t nsize, ctx_cb_t_ *ctx_cb, int flags ) -> manapi::future<> {
406
426
try {
407
427
408
428
ssize_t size;
409
-
410
- if (ctx_cb->req ->body_size >= 0 )
429
+ bool flg = false ;
430
+ if (ctx_cb->req ->body_size >= 0 ) {
411
431
size = std::min (ctx_cb->req ->body_size , static_cast <ssize_t > (nsize));
412
- else
432
+
433
+ if (ctx_cb->req ->body_size == size)
434
+ flg = true ;
435
+ }
436
+ else {
413
437
size = static_cast <ssize_t > (nsize);
438
+ }
439
+
440
+ if (flags & worker::base::CONN_RECV_END) {
441
+ flags ^= worker::base::CONN_RECV_END;
442
+ flg = true ;
443
+
444
+ if (!nsize) {
445
+ if (co_await ctx_cb->handler (nullptr , 0 , flg) < 0 ) {
446
+ ctx_cb->reject (std::make_exception_ptr (RETHROW_MANAPIHTTP_EXCEPTION (
447
+ manapi::ERR_INVALID_ARGUMENT, manapi::error::default_msgs[manapi::error::ERRMSG_CUSTOM_CALLBACK_ERR1], " invalid result" )));
448
+ goto finish;
449
+ }
450
+ ctx_cb->resolve ();
451
+ goto finish;
452
+ }
453
+ }
414
454
415
455
ssize_t rhs = 0 ;
416
456
while (rhs < size) {
417
457
auto const copy = size - rhs;
418
458
419
- auto const res = co_await ctx_cb->handler (buffer + rhs, copy);
459
+ auto const res = co_await ctx_cb->handler (buffer + rhs, copy, flg );
420
460
if (res >= 0 ) {
421
461
if (copy > res) {
422
462
ctx_cb->reject (std::make_exception_ptr (RETHROW_MANAPIHTTP_EXCEPTION (
@@ -448,26 +488,27 @@ manapi::future<> manapi::net::http::request::read_async_body_(worker::base *work
448
488
ctx_cb->resolve ();
449
489
goto finish;
450
490
}
491
+
451
492
}
452
493
catch (...) {
453
494
ctx_cb->reject (std::current_exception ());
454
495
goto finish;
455
496
}
456
497
457
- ctx_cb->worker ->waiting (conn, true );
458
498
ctx_cb->worker ->event_flags (conn, ev::READ);
459
499
ctx_cb->cnt --;
460
500
ctx_cb->mx .unlock ();
501
+
461
502
co_return ;
462
503
463
504
finish: {
464
- auto & ctx_cb_ = ctx_cb;
505
+ auto const ctx_cb_ = ctx_cb;
465
506
ctx_cb_->worker ->event_flags (conn, 0 );
466
507
ctx_cb_->worker ->event_on (conn, nullptr );
467
508
ctx_cb_->cnt --;
468
509
ctx_cb_->mx .unlock ();
469
510
}
470
- }, ctx_cb.conn , std::move (*p), buffer, nsize, &ctx_cb));
511
+ }, ctx_cb.conn , std::move (*p), buffer, nsize, &ctx_cb, flags ));
471
512
}
472
513
else if (flags & worker::base::CONN_RECV_END) {
473
514
ctx_cb.resolve ();
0 commit comments