Skip to content

Commit 34fb4f1

Browse files
authored
add incoming streaming for http proxy (#18211)
1 parent f91acf5 commit 34fb4f1

File tree

9 files changed

+421
-48
lines changed

9 files changed

+421
-48
lines changed

ydb/library/actors/http/http.cpp

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ bool THttpParser<THttpRequest>::HasBody() const {
8989
}
9090

9191
template <>
92-
void THttpParser<THttpRequest>::Advance(size_t len) {
92+
size_t THttpParser<THttpRequest>::AdvancePartial(size_t len) {
9393
TStringBuf data(Pos(), len);
9494
while (!data.empty()) {
9595
if (Stage != EParseStage::Error) {
@@ -140,7 +140,7 @@ void THttpParser<THttpRequest>::Advance(size_t len) {
140140
if (Stage != EParseStage::Body) {
141141
break;
142142
}
143-
[[fallthrough]];
143+
return TSocketBuffer::Advance(len - data.size());
144144
}
145145
case EParseStage::Body: {
146146
if (TEqNoCase()(TransferEncoding, "chunked")) {
@@ -204,6 +204,7 @@ void THttpParser<THttpRequest>::Advance(size_t len) {
204204
// Invalid body encoding
205205
Stage = EParseStage::Error;
206206
}
207+
return TSocketBuffer::Advance(len - data.size());
207208
}
208209
}
209210
}
@@ -220,7 +221,7 @@ void THttpParser<THttpRequest>::Advance(size_t len) {
220221
break;
221222
}
222223
}
223-
TSocketBuffer::Advance(len);
224+
return TSocketBuffer::Advance(len - data.size());
224225
}
225226

226227
template <>
@@ -253,7 +254,7 @@ void THttpResponse::Clear() {
253254
}
254255

255256
template <>
256-
void THttpParser<THttpResponse>::Advance(size_t len) {
257+
size_t THttpParser<THttpResponse>::AdvancePartial(size_t len) {
257258
TStringBuf data(Pos(), len);
258259
while (!data.empty()) {
259260
if (Stage != EParseStage::Error) {
@@ -304,7 +305,7 @@ void THttpParser<THttpResponse>::Advance(size_t len) {
304305
if (Stage != EParseStage::Body) {
305306
break;
306307
}
307-
[[fallthrough]];
308+
return TSocketBuffer::Advance(len - data.size());
308309
}
309310
case EParseStage::Body: {
310311
if (TEqNoCase()(TransferEncoding, "chunked")) {
@@ -379,6 +380,7 @@ void THttpParser<THttpResponse>::Advance(size_t len) {
379380
// Invalid body encoding
380381
Stage = EParseStage::Error;
381382
}
383+
return TSocketBuffer::Advance(len - data.size());
382384
}
383385
}
384386
}
@@ -394,7 +396,7 @@ void THttpParser<THttpResponse>::Advance(size_t len) {
394396
break;
395397
}
396398
}
397-
TSocketBuffer::Advance(len);
399+
return TSocketBuffer::Advance(len - data.size());
398400
}
399401

400402
template <>
@@ -654,9 +656,14 @@ void THttpOutgoingResponse::AddDataChunk(THttpOutgoingDataChunkPtr dataChunk) {
654656
}
655657

656658
THttpOutgoingDataChunk::THttpOutgoingDataChunk(THttpOutgoingResponsePtr response, TStringBuf data)
657-
: THttpDataChunk(data)
658-
, Response(std::move(response))
659-
{}
659+
: Response(std::move(response))
660+
{
661+
if (data) {
662+
SetData(data);
663+
} else {
664+
SetEndOfData();
665+
}
666+
}
660667

661668
THttpOutgoingDataChunk::THttpOutgoingDataChunk(THttpOutgoingResponsePtr response)
662669
: Response(std::move(response))

ydb/library/actors/http/http.h

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,11 @@ class TSocketBuffer : public TBuffer, public THttpConfig {
154154
TString AsString() const {
155155
return TString(Data(), Size());
156156
}
157+
158+
size_t Advance(size_t size) {
159+
TBuffer::Advance(size);
160+
return size;
161+
}
157162
};
158163

159164
class THttpRequest {
@@ -342,7 +347,25 @@ class THttpParser : public HeaderType, public TSocketBuffer {
342347
return result;
343348
}
344349

345-
void Advance(size_t len);
350+
[[nodiscard]] size_t AdvancePartial(size_t len);
351+
352+
void Advance(size_t len) {
353+
while (len > 0) {
354+
len -= AdvancePartial(len);
355+
}
356+
}
357+
358+
void TruncateToHeaders() {
359+
if (HasHeaders()) {
360+
auto begin = Data();
361+
auto end = Data() + Size();
362+
auto desiredEnd = HeaderType::Headers.data() + HeaderType::Headers.size();
363+
if (begin < desiredEnd && desiredEnd < end) {
364+
Resize(desiredEnd - begin);
365+
}
366+
}
367+
}
368+
346369
void ConnectionClosed();
347370

348371
size_t GetBodySizeFromTotalSize() const {
@@ -365,6 +388,14 @@ class THttpParser : public HeaderType, public TSocketBuffer {
365388
return Stage == EParseStage::Error;
366389
}
367390

391+
bool IsStartOfChunk() const {
392+
return Stage == EParseStage::ChunkLength;
393+
}
394+
395+
bool HasNewDataChunk() const {
396+
return IsStartOfChunk() && !Content.empty();
397+
}
398+
368399
TStringBuf GetErrorText() const {
369400
switch (LastSuccessStage) {
370401
case EParseStage::Method:
@@ -427,6 +458,10 @@ class THttpParser : public HeaderType, public TSocketBuffer {
427458

428459
bool HaveBody() const { return HasBody(); } // deprecated, use HasBody() instead
429460

461+
bool IsChunkedEncoding() const {
462+
return TEqNoCase()(HeaderType::TransferEncoding, "chunked");
463+
}
464+
430465
bool EnsureEnoughSpaceAvailable(size_t need = TSocketBuffer::BUFFER_MIN_STEP) {
431466
bool result = TSocketBuffer::EnsureEnoughSpaceAvailable(need);
432467
if (!result && !TSocketBuffer::Empty()) {
@@ -714,13 +749,10 @@ struct THttpEndpointInfo {
714749
class THttpDataChunk : public TSocketBuffer {
715750
public:
716751
bool EndOfData = false;
752+
size_t DataSize = 0;
717753

718754
THttpDataChunk() = default;
719755

720-
THttpDataChunk(TStringBuf data) {
721-
SetData(data);
722-
}
723-
724756
bool EnsureEnoughSpaceAvailable(size_t need = TSocketBuffer::BUFFER_MIN_STEP) {
725757
return TSocketBuffer::EnsureEnoughSpaceAvailable(need);
726758
}
@@ -730,9 +762,16 @@ class THttpDataChunk : public TSocketBuffer {
730762
TSocketBuffer::Append(text.data(), text.size());
731763
}
732764

765+
bool IsEndOfData() const {
766+
return EndOfData;
767+
}
768+
733769
void SetData(TStringBuf data) {
734-
EnsureEnoughSpaceAvailable(data.size() + 4/*crlfcrlf*/ + 16);
735-
Append(ToHex(data.size()) + "\r\n");
770+
TSocketBuffer::Clear();
771+
EndOfData = false;
772+
DataSize = data.size();
773+
EnsureEnoughSpaceAvailable(DataSize + 4/*crlfcrlf*/ + 16);
774+
Append(ToHex(DataSize) + "\r\n");
736775
Append(TStringBuf(data));
737776
Append("\r\n");
738777
}
@@ -743,10 +782,6 @@ class THttpDataChunk : public TSocketBuffer {
743782
EndOfData = true;
744783
}
745784
}
746-
747-
bool IsEndOfData() const {
748-
return EndOfData;
749-
}
750785
};
751786

752787
class THttpOutgoingDataChunk;

ydb/library/actors/http/http_config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ struct THttpConfig {
1414
static constexpr int LISTEN_QUEUE = 10;
1515
static constexpr TDuration SOCKET_TIMEOUT = TDuration::MilliSeconds(60000);
1616
static constexpr TDuration CONNECTION_TIMEOUT = TDuration::MilliSeconds(60000);
17+
static constexpr size_t MAX_REUSABLE_CONNECTIONS = 100;
1718
using SocketType = TInet64StreamSocket;
1819
using SocketAddressType = std::shared_ptr<ISockAddr>;
1920
};

ydb/library/actors/http/http_proxy.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,13 @@ class THttpProxy : public NActors::TActorBootstrapped<THttpProxy>, public THttpC
133133
}
134134

135135
void Handle(TEvHttpProxy::TEvHttpOutgoingConnectionAvailable::TPtr& event) {
136-
ALOG_DEBUG(HttpLog, "Connection " << event->Get()->ConnectionID << " available for destination " << event->Get()->Destination);
137-
AvailableConnections.emplace(event->Get()->Destination, event->Get()->ConnectionID);
136+
if (AvailableConnections.size() < MAX_REUSABLE_CONNECTIONS) {
137+
ALOG_DEBUG(HttpLog, "Connection " << event->Get()->ConnectionID << " available for destination " << event->Get()->Destination);
138+
AvailableConnections.emplace(event->Get()->Destination, event->Get()->ConnectionID);
139+
} else {
140+
ALOG_DEBUG(HttpLog, "Connection " << event->Get()->ConnectionID << " not added to available connections, limit reached");
141+
Send(event->Get()->ConnectionID, new NActors::TEvents::TEvPoisonPill());
142+
}
138143
}
139144

140145
void Handle(TEvHttpProxy::TEvHttpOutgoingConnectionClosed::TPtr& event) {

ydb/library/actors/http/http_proxy.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ struct TEvHttpProxy {
4545
EvHttpIncomingRequest,
4646
EvHttpOutgoingRequest,
4747
EvHttpIncomingResponse,
48+
EvHttpIncompleteIncomingResponse,
4849
EvHttpOutgoingResponse,
4950
EvHttpIncomingConnectionClosed,
5051
EvHttpOutgoingConnectionClosed,
@@ -53,6 +54,7 @@ struct TEvHttpProxy {
5354
EvResolveHostRequest,
5455
EvResolveHostResponse,
5556
EvReportSensors,
57+
EvHttpIncomingDataChunk,
5658
EvHttpOutgoingDataChunk,
5759
EvSubscribeForCancel,
5860
EvRequestCancelled,
@@ -119,6 +121,7 @@ struct TEvHttpProxy {
119121
THttpOutgoingRequestPtr Request;
120122
TDuration Timeout;
121123
bool AllowConnectionReuse = false;
124+
std::vector<TString> StreamContentTypes;
122125

123126
TEvHttpOutgoingRequest(THttpOutgoingRequestPtr request)
124127
: Request(std::move(request))
@@ -166,6 +169,16 @@ struct TEvHttpProxy {
166169
}
167170
};
168171

172+
struct TEvHttpIncompleteIncomingResponse : NActors::TEventLocal<TEvHttpIncompleteIncomingResponse, EvHttpIncompleteIncomingResponse> {
173+
THttpOutgoingRequestPtr Request;
174+
THttpIncomingResponsePtr Response;
175+
176+
TEvHttpIncompleteIncomingResponse(THttpOutgoingRequestPtr request, THttpIncomingResponsePtr response)
177+
: Request(std::move(request))
178+
, Response(std::move(response))
179+
{}
180+
};
181+
169182
struct TEvHttpOutgoingResponse : NActors::TEventLocal<TEvHttpOutgoingResponse, EvHttpOutgoingResponse> {
170183
THttpOutgoingResponsePtr Response;
171184

@@ -187,6 +200,29 @@ struct TEvHttpProxy {
187200
{}
188201
};
189202

203+
struct TEvHttpIncomingDataChunk : NActors::TEventLocal<TEvHttpIncomingDataChunk, EvHttpIncomingDataChunk> {
204+
THttpIncomingResponsePtr Response;
205+
TString Data;
206+
TString Error;
207+
bool EndOfData = false;
208+
209+
TEvHttpIncomingDataChunk(THttpIncomingResponsePtr response)
210+
: Response(std::move(response))
211+
{}
212+
213+
void SetData(TString&& data) {
214+
Data = data;
215+
}
216+
217+
void SetEndOfData() {
218+
EndOfData = true;
219+
}
220+
221+
bool IsEndOfData() const {
222+
return EndOfData;
223+
}
224+
};
225+
190226
struct TEvHttpIncomingConnectionClosed : NActors::TEventLocal<TEvHttpIncomingConnectionClosed, EvHttpIncomingConnectionClosed> {
191227
TActorId ConnectionID;
192228
TDeque<THttpIncomingRequestPtr> RecycledRequests;

ydb/library/actors/http/http_proxy_acceptor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class TAcceptorActor : public NActors::TActor<TAcceptorActor>, public THttpConfi
3232
hFunc(NActors::TEvPollerReady, Handle);
3333
hFunc(TEvHttpProxy::TEvHttpIncomingConnectionClosed, Handle);
3434
hFunc(TEvHttpProxy::TEvReportSensors, Handle);
35+
cFunc(NActors::TEvents::TEvPoison::EventType, PassAway);
3536
}
3637
}
3738

ydb/library/actors/http/http_proxy_incoming.cpp

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,17 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
114114
OnAccept();
115115
}
116116

117+
TString GetRequestDebugText() {
118+
TStringBuilder text;
119+
if (CurrentRequest) {
120+
text << CurrentRequest->Method << " " << CurrentRequest->URL;
121+
if (CurrentRequest->Body) {
122+
text << ", " << CurrentRequest->Body.Size() << " bytes";
123+
}
124+
}
125+
return text;
126+
}
127+
117128
void HandleConnected(TEvPollerReady::TPtr& event) {
118129
if (event->Get()->Read) {
119130
for (;;) {
@@ -142,7 +153,7 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
142153
CurrentRequest->Timer.Reset();
143154
if (CurrentRequest->IsReady()) {
144155
if (Endpoint->RateLimiter.Check(TActivationContext::Now())) {
145-
ALOG_DEBUG(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")");
156+
ALOG_DEBUG(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -> (" << GetRequestDebugText() << ")");
146157
Send(Endpoint->Proxy, new TEvHttpProxy::TEvHttpIncomingRequest(CurrentRequest));
147158
CurrentRequest = nullptr;
148159
} else {
@@ -153,7 +164,7 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
153164
CleanupRequest(CurrentRequest);
154165
}
155166
} else if (CurrentRequest->IsError()) {
156-
ALOG_DEBUG(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -! (" << CurrentRequest->Method << " " << CurrentRequest->URL << ")");
167+
ALOG_DEBUG(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") -! (" << GetRequestDebugText() << ")");
157168
bool success = Respond(CurrentRequest->CreateResponseBadRequest());
158169
if (!success) {
159170
return;
@@ -209,6 +220,31 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
209220
Respond(event->Get()->Response);
210221
}
211222

223+
static TString GetChunkDebugText(THttpOutgoingDataChunkPtr chunk) {
224+
TStringBuilder text;
225+
if (chunk->DataSize) {
226+
text << "data chunk " << chunk->DataSize << " bytes";
227+
}
228+
if (chunk->DataSize && chunk->IsEndOfData()) {
229+
text << ", ";
230+
}
231+
if (chunk->IsEndOfData()) {
232+
text << "end of stream";
233+
}
234+
return text;
235+
}
236+
237+
static TString GetResponseDebugText(THttpOutgoingResponsePtr response) {
238+
TStringBuilder text;
239+
if (response) {
240+
text << response->Status << " " << response->Message;
241+
if (response->Body) {
242+
text << ", " << response->Body.Size() << " bytes";
243+
}
244+
}
245+
return text;
246+
}
247+
212248
void HandleConnected(TEvHttpProxy::TEvHttpOutgoingDataChunk::TPtr& event) {
213249
if (event->Get()->Error) {
214250
ALOG_ERROR(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - DataChunk error: " << event->Get()->Error);
@@ -225,8 +261,7 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
225261
return PassAway();
226262
}
227263
}
228-
ALOG_DEBUG(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (data chunk "
229-
<< event->Get()->DataChunk->Size() << (event->Get()->DataChunk->IsEndOfData() ? " bytes, final)" : " bytes)"));
264+
ALOG_DEBUG(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- (" << GetChunkDebugText(event->Get()->DataChunk) << ")");
230265
if (event->Get()->DataChunk->IsEndOfData()) {
231266
CancelSubscriber = nullptr;
232267
}
@@ -240,7 +275,7 @@ class TIncomingConnectionActor : public TActor<TIncomingConnectionActor<TSocketI
240275
bool Respond(THttpOutgoingResponsePtr response) {
241276
THttpIncomingRequestPtr request = response->GetRequest();
242277
ALOG_DEBUG(HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") <- ("
243-
<< response->Status << " " << response->Message << (response->IsDone() ? ")" : ") (incomplete)"));
278+
<< GetResponseDebugText(response) << (response->IsDone() ? ")" : ") (incomplete)"));
244279
if (!response->Status.StartsWith('2') && !response->Status.StartsWith('3') && response->Status != "404") {
245280
static constexpr size_t MAX_LOGGED_SIZE = 1024;
246281
ALOG_DEBUG(HttpLog,

0 commit comments

Comments
 (0)