Skip to content

Commit b2836e1

Browse files
committed
UPD | http1: send chunked data
1 parent 632d4b9 commit b2836e1

File tree

5 files changed

+129
-33
lines changed

5 files changed

+129
-33
lines changed

include/components/ManapiBuffer.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace manapi {
1818

1919
public:
2020
enum flags {
21-
BYTEBUFFER_FLAG_OBJECT_POOL
21+
BYTEBUFFER_FLAG_OBJECT_POOL = 1
2222
};
2323

2424
bytebuffer ();

src/http/HTTPv1_1.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,7 @@ int manapi::net::http::http_v1_1_chunked_read(http_v1_1_chunked_t *ctx, worker::
534534
case HTTP_V1_1_CHUNK_BODY: {
535535
auto const copy = std::min(size - pos, static_cast<ssize_t>(ctx->left));
536536
if (copy) {
537-
if (worker->event_flags(conn) & ev::READ) {
537+
if (!ctx->top.last_deque && worker->event_flags(conn) & ev::READ) {
538538
worker->feed_event(conn, ev::READ, buffer + pos, copy, nullptr /* no way :( */);
539539
}
540540
else {

src/http/base_http.cpp

Lines changed: 120 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "ManapiHttpRequest.hpp"
1616
#include "ManapiHttpResponse.hpp"
1717
#include "../include/ManapiDefaultErrors.hpp"
18+
#include "crypto/ManapiCryptoUtils.hpp"
1819

1920
static const std::set<std::string> methods = {"POST", "GET", "HEAD", "OPTIONS", "TRACE", "PUT", "DELETE", "PATCH", "CONNECT"};
2021

@@ -432,6 +433,57 @@ manapi::future<> manapi::net::http::internal::send_response_formdata(uq_handle_d
432433
co_return;
433434
}
434435

436+
bool http_v1_1_is_chunked_data (manapi::net::http::internal::uq_handle_data_t &cdata, manapi::net::http::response *resp) {
437+
if (cdata->conn->version == manapi::net::http::versions::HTTP_v1_1
438+
&& !resp->headers().contains(manapi::net::http::HEADER.CONTENT_LENGTH)) {
439+
return true;
440+
}
441+
return false;
442+
}
443+
444+
manapi::future<> send_http_v1_1_chunked_data (manapi::net::http::internal::uq_handle_data_t &cdata, ssize_t rhs, const void *buffer, std::size_t size, bool &finish) {
445+
if (rhs < 0)
446+
THROW_MANAPIHTTP_EXCEPTION2(manapi::ERR_INVALID_ARGUMENT, "The callback returned an invalid length");
447+
448+
std::string_view const msg = {"0\r\n\r\n"};
449+
450+
if (rhs) {
451+
char header[20];
452+
453+
auto res = std::to_chars(header, header + sizeof (header), rhs, 16);
454+
455+
if (res.ec != std::errc())
456+
THROW_MANAPIHTTP_EXCEPTION(manapi::ERR_INTERNAL, "to_chars failed: {}", std::make_error_code(res.ec).message());
457+
458+
auto len = static_cast<ssize_t>(res.ptr - header);
459+
460+
if (len + 2 > sizeof (header) - 1)
461+
/* TODO: think */
462+
goto err;
463+
464+
memcpy (header + len, static_cast<const char *>("\r\n"), 2);
465+
len += 2;
466+
467+
if (co_await cdata->worker->fwrite(cdata->conn, header, len, false) <= 0)
468+
goto err;
469+
470+
if (co_await cdata->worker->fwrite(cdata->conn, buffer, rhs, false) <= 0)
471+
goto err;
472+
473+
if (co_await cdata->worker->fwrite(cdata->conn, msg.data() + 1, 2, false) <= 0)
474+
goto err;
475+
}
476+
477+
if (finish) {
478+
if (co_await cdata->worker->fwrite(cdata->conn, msg.data(),
479+
static_cast<ssize_t>(msg.size()), rhs == 0) <= 0)
480+
goto err;
481+
}
482+
co_return;
483+
err:
484+
THROW_MANAPIHTTP_EXCEPTION2(manapi::ERR_ABORTED, "write(...) failed");
485+
}
486+
435487
void manapi::net::http::internal::send_response_sync_cb(uq_handle_data_t cdata, std::unique_ptr<response> res, response_features_t features) {
436488
if (features.compressor_for_file || features.compressor_for_string) {
437489
THROW_MANAPIHTTP_EXCEPTION2 (ERR_FAILED_PRECONDITION, "Compression isn't supported");
@@ -441,6 +493,10 @@ void manapi::net::http::internal::send_response_sync_cb(uq_handle_data_t cdata,
441493
THROW_MANAPIHTTP_EXCEPTION2 (ERR_FAILED_PRECONDITION, "Replacers isn't supported");
442494
}
443495

496+
497+
if (http_v1_1_is_chunked_data(cdata, res.get()))
498+
res->header(http::HEADER.TRANSFER_ENCODING, "chunked");
499+
444500
auto task = mask_response(cdata.get(), res.get(), false);
445501
manapi::async::run<ssize_t> ( std::move(task),
446502
[res = std::move(res), cdata = std::move(cdata)] (std::exception_ptr err, ssize_t *result) mutable
@@ -454,25 +510,35 @@ void manapi::net::http::internal::send_response_sync_cb(uq_handle_data_t cdata,
454510
auto cb_sync = std::make_unique<http::response::resp_callback_sync>(std::move(res->callback_sync()));
455511
manapi::async::run ([res = std::move(res), cb_sync = std::move(cb_sync), cdata = std::move(cdata)] () mutable
456512
-> manapi::future<> {
457-
auto buffer = manapi::async::current()->memory_fabric().buffer (res->config()->buffer_size);
513+
auto buffer = manapi::async::current()->memory_fabric().buffer (
514+
std::max(res->config()->buffer_size, 64L));
458515

459516
bool finish = false;
460517
std::size_t cursor = 0;
461518

462-
while (!finish) {
463-
auto rhs = cb_sync->operator()(buffer.data() + cursor, buffer.size() - cursor, finish);
464-
if (rhs < 0) {
465-
THROW_MANAPIHTTP_EXCEPTION2(ERR_INVALID_ARGUMENT, "The callback returned an invalid length");
519+
520+
if (http_v1_1_is_chunked_data(cdata, res.get())) {
521+
while (!finish) {
522+
auto rhs = cb_sync->operator()(buffer.data(), buffer.size(), finish);
523+
co_await send_http_v1_1_chunked_data(cdata, rhs, buffer.data(), buffer.size(), finish);
466524
}
467-
rhs = co_await cdata->worker->write(cdata->conn, buffer.data() + cursor, rhs, finish);
525+
}
526+
else {
527+
while (!finish) {
528+
auto rhs = cb_sync->operator()(buffer.data() + cursor, buffer.size() - cursor, finish);
529+
if (rhs < 0) {
530+
THROW_MANAPIHTTP_EXCEPTION2(ERR_INVALID_ARGUMENT, "The callback returned an invalid length");
531+
}
532+
rhs = co_await cdata->worker->write(cdata->conn, buffer.data() + cursor, rhs, finish);
468533

469-
if (rhs <= 0)
470-
co_return;
534+
if (rhs <= 0)
535+
co_return;
471536

472-
cursor += rhs;
537+
cursor += rhs;
473538

474-
if (buffer.size() == cursor)
475-
cursor = 0;
539+
if (buffer.size() == cursor)
540+
cursor = 0;
541+
}
476542
}
477543
});
478544
}
@@ -488,6 +554,9 @@ void manapi::net::http::internal::send_response_stream_cb(uq_handle_data_t cdata
488554
THROW_MANAPIHTTP_EXCEPTION2 (ERR_FAILED_PRECONDITION, "Replacers isn't supported");
489555
}
490556

557+
if (http_v1_1_is_chunked_data(cdata, res.get()))
558+
res->header(http::HEADER.TRANSFER_ENCODING, "chunked");
559+
491560
auto task = mask_response(cdata.get(), res.get(), false);
492561
manapi::async::run<ssize_t> (std::move(task),
493562
[res = std::move(res), cdata = std::move(cdata)] (std::exception_ptr err, ssize_t *result) mutable
@@ -498,13 +567,24 @@ void manapi::net::http::internal::send_response_stream_cb(uq_handle_data_t cdata
498567
}
499568

500569
if (result && *result >= 0) {
501-
auto reserved = res->config()->buffer_size;
502570
auto cb_async = std::make_unique<http::response::resp_stream>(std::move(res->callback_stream()));
503-
manapi::async::run ([res = std::move(res), reserved, cb_async = std::move(cb_async), cdata = std::move(cdata)] () mutable
571+
manapi::async::run ([res = std::move(res), cb_async = std::move(cb_async), cdata = std::move(cdata)] () mutable
504572
-> manapi::future<> {
505-
co_await cb_async->operator()([&cdata] (const void *buffer, ssize_t size, bool fin) -> manapi::future<ssize_t> {
506-
co_return co_await cdata->worker->fwrite(cdata->conn, buffer, size, fin);
507-
});
573+
if (http_v1_1_is_chunked_data(cdata, res.get())) {
574+
co_await cb_async->operator()([&cdata]
575+
(const void *buffer, ssize_t size, bool fin)
576+
-> manapi::future<ssize_t> {
577+
co_await send_http_v1_1_chunked_data(cdata, size, buffer, size, fin);
578+
co_return size;
579+
});
580+
}
581+
else {
582+
co_await cb_async->operator()([&cdata]
583+
(const void *buffer, ssize_t size, bool fin)
584+
-> manapi::future<ssize_t> {
585+
co_return co_await cdata->worker->fwrite(cdata->conn, buffer, size, fin);
586+
});
587+
}
508588
cdata->cb->call(true);
509589
});
510590
}
@@ -521,6 +601,9 @@ void manapi::net::http::internal::send_response_async_cb(uq_handle_data_t cdata,
521601
THROW_MANAPIHTTP_EXCEPTION2 (ERR_FAILED_PRECONDITION, "Replacers isn't supported");
522602
}
523603

604+
if (http_v1_1_is_chunked_data(cdata, res.get()))
605+
res->header(http::HEADER.TRANSFER_ENCODING, "chunked");
606+
524607
auto task = mask_response(cdata.get(), res.get(), false);
525608
manapi::async::run<ssize_t> (std::move(task),
526609
[res = std::move(res), cdata = std::move(cdata)] (std::exception_ptr err, ssize_t *result) mutable
@@ -534,24 +617,33 @@ void manapi::net::http::internal::send_response_async_cb(uq_handle_data_t cdata,
534617
auto cb_async = std::make_unique<http::response::resp_callback_async>(std::move(res->callback_async()));
535618
manapi::async::run ([res = std::move(res), cb_async = std::move(cb_async), cdata = std::move(cdata)] () mutable
536619
-> manapi::future<> {
537-
auto buffer = manapi::async::current()->memory_fabric().buffer(res->config()->buffer_size);
620+
auto buffer = manapi::async::current()->memory_fabric().buffer(
621+
std::max(res->config()->buffer_size, 64L));
538622

539-
std::size_t cursor = 0;
540623
bool finish = false;
541624

542-
while (!finish) {
543-
auto rhs = co_await cb_async->operator()(buffer.data() + cursor, buffer.size() - cursor, finish);
544-
if (rhs < 0)
545-
THROW_MANAPIHTTP_EXCEPTION2(ERR_INVALID_ARGUMENT, "The callback returned an invalid length");
625+
if (http_v1_1_is_chunked_data(cdata, res.get())) {
626+
while (!finish) {
627+
auto rhs = co_await cb_async->operator()(buffer.data(), buffer.size(), finish);
628+
co_await send_http_v1_1_chunked_data(cdata, rhs, buffer.data(), buffer.size(), finish);
629+
}
630+
}
631+
else {
632+
std::size_t cursor = 0;
633+
while (!finish) {
634+
auto rhs = co_await cb_async->operator()(buffer.data() + cursor, buffer.size() - cursor, finish);
635+
if (rhs < 0)
636+
THROW_MANAPIHTTP_EXCEPTION2(ERR_INVALID_ARGUMENT, "The callback returned an invalid length");
546637

547-
rhs = co_await cdata->worker->write(cdata->conn, buffer.data() + cursor, rhs, finish);
548-
if (rhs <= 0)
549-
co_return;
638+
rhs = co_await cdata->worker->write(cdata->conn, buffer.data() + cursor, rhs, finish);
639+
if (rhs <= 0)
640+
co_return;
550641

551-
cursor += rhs;
642+
cursor += rhs;
552643

553-
if (cursor == buffer.size())
554-
cursor = 0;
644+
if (cursor == buffer.size())
645+
cursor = 0;
646+
}
555647
}
556648
});
557649
}

src/worker/TLS.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ int manapi::net::worker::TLS::event_flags(const shared_conn & conn, int flags) n
181181
}
182182
}
183183

184+
if ((status & (CONN_READ|CONN_CLOSED|CONN_REMOVED)) == 0
185+
&& data->watcher->is_active()) {
186+
data->watcher->read_stop();
187+
}
188+
184189
if ((status & CONN_RECV_END) && (status & CONN_READ & flags) && data->ev_callback) {
185190
data->ev_callback->operator()(conn, CONN_RECV_END, nullptr, 0, nullptr);
186191
}
@@ -604,8 +609,7 @@ int manapi::net::worker::TLS::ssl_flush_recv(const shared_conn &conn, connection
604609
}
605610
}
606611

607-
if (data->status & ev::READ
608-
&& !(data->status & (CONN_CLOSED|CONN_REMOVED))
612+
if (data->status & ((CONN_READ|CONN_CLOSED|CONN_REMOVED)) == CONN_READ
609613
&& !data->watcher->is_active()) {
610614
data->watcher->read_start();
611615
}

src/worker/default_http2.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ manapi::future<ssize_t> manapi::net::worker::http_v2::response(const shared_conn
7575
data, resp->status_code(), std::move(resp->headers()), finish);
7676
}
7777

78-
int manapi::net::worker::http_v2::event_flags(const shared_conn & conn) {
78+
int manapi::net::worker::http_v2::event_flags(const shared_conn & conn) {
7979
auto const data = conn->as<http::http_v2_stream_t>();
8080
data->speed_min_delay = static_cast<int>(this->w->config()->speed_check_delay);
8181
return data->flags & CONN_MASK_GETTING;

0 commit comments

Comments
 (0)