Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ set(SOURCES
cubic.c
bbr.c
datagram.c
deadline_engine.c
frame.c
partition.c
library.c
Expand Down
36 changes: 34 additions & 2 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -636,15 +636,24 @@ MsQuicConnectionSendResumptionTicket(
_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QUIC_API
MsQuicStreamOpen(
MsQuicStreamOpenWithDeadline(
_In_ _Pre_defensive_ HQUIC Handle,
_In_ QUIC_STREAM_OPEN_FLAGS Flags,
_In_ _Pre_defensive_ QUIC_STREAM_CALLBACK_HANDLER Handler,
_In_opt_ void* Context,
_Outptr_ _At_(*NewStream, __drv_allocatesMem(Mem)) _Pre_defensive_
HQUIC *NewStream
HQUIC *NewStream,
_In_ QUIC_TIME_DIFF MsToDeadline
)
{
QUIC_TIME_POINT Now = CxPlatTimeUs64();
QUIC_TIME_POINT Deadline;
if(MsToDeadline >= QUIC_TIME_POINT_INFINITE - Now) {
// Sets deadline to infinite if the requested deadline is too far in the future.
Deadline = QUIC_TIME_POINT_INFINITE;
} else {
Deadline = Now + MsToDeadline;
}
QUIC_STATUS Status;
QUIC_CONNECTION* Connection;

Expand Down Expand Up @@ -692,6 +701,7 @@ MsQuicStreamOpen(

(*(QUIC_STREAM**)NewStream)->ClientCallbackHandler = Handler;
(*(QUIC_STREAM**)NewStream)->ClientContext = Context;
(*(QUIC_STREAM**)NewStream)->Deadline = Deadline;

Error:

Expand All @@ -703,6 +713,28 @@ MsQuicStreamOpen(
return Status;
}

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QUIC_API
MsQuicStreamOpen(
_In_ _Pre_defensive_ HQUIC Handle,
_In_ QUIC_STREAM_OPEN_FLAGS Flags,
_In_ _Pre_defensive_ QUIC_STREAM_CALLBACK_HANDLER Handler,
_In_opt_ void* Context,
_Outptr_ _At_(*NewStream, __drv_allocatesMem(Mem)) _Pre_defensive_
HQUIC *NewStream
)
{
return
MsQuicStreamOpenWithDeadline(
Handle,
Flags,
Handler,
Context,
NewStream,
QUIC_TIME_POINT_INFINITE);
}

#pragma warning(push)
#pragma warning(disable:6014) // SAL doesn't understand the free happens on the worker
_IRQL_requires_max_(PASSIVE_LEVEL)
Expand Down
13 changes: 13 additions & 0 deletions src/core/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,19 @@ MsQuicStreamOpen(
HQUIC *Stream
);

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QUIC_API
MsQuicStreamOpenWithDeadline(
_In_ _Pre_defensive_ HQUIC Handle,
_In_ QUIC_STREAM_OPEN_FLAGS Flags,
_In_ _Pre_defensive_ QUIC_STREAM_CALLBACK_HANDLER Handler,
_In_opt_ void* Context,
_Outptr_ _At_(*NewStream, __drv_allocatesMem(Mem)) _Pre_defensive_
HQUIC *NewStream,
_In_ QUIC_TIME_DIFF MsToDeadline
);

_IRQL_requires_max_(PASSIVE_LEVEL)
void
QUIC_API
Expand Down
2 changes: 1 addition & 1 deletion src/core/bbr.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ typedef enum RECOVERY_STATE {
} RECOVERY_STATE;

//
// Bandwidth is measured as (bytes / BW_UNIT) per second
// Bandwidth is measured as (bytes * BW_UNIT) per second
//
#define BW_UNIT 8 // 1 << 3

Expand Down
120 changes: 120 additions & 0 deletions src/core/deadline_engine.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*++

Copyright (c) Microsoft Corporation.
Licensed under the MIT License.

Abstract:
Implements `DrainBeforeDeadlineEngine` which is used to check if the stream
can be drained before the deadline. This check requires working with congestion
control and has different behavior based on the congestion control algorithm used.

--*/

#include "precomp.h"

// Should match the enum in bbr.c
typedef enum BBR_STATE {

BBR_STATE_STARTUP,

BBR_STATE_DRAIN,

BBR_STATE_PROBE_BW,

BBR_STATE_PROBE_RTT

} BBR_STATE;

typedef enum TRILEAN {

TRILEAN_TRUE,

TRILEAN_FALSE,

//
// Indicates bad state, all operations on it result it TRILEAN_UNKNOWN
// Fall back to default behavior.
//
TRILEAN_UNKNOWN

} TRILEAN;


TRILEAN DrainBeforeDeadlineEngineBBR(
_In_ const QUIC_STREAM* Stream
)
{
QUIC_CONNECTION* Connection = Stream->Connection;
QUIC_CONGESTION_CONTROL* Cc = &Connection->CongestionControl;
QUIC_CONGESTION_CONTROL_BBR* Bbr = (QUIC_CONGESTION_CONTROL_BBR*)(&Cc->Bbr);

if(Bbr->BbrState == BBR_STATE_STARTUP)
{
// In the startup state, we do not have a good estimate of the bandwidth,
// so we assume that we can drain the stream.
return TRILEAN_UNKNOWN;
}

return TRUE;
}

TRILEAN DrainBeforeDeadlineCcSpecificEngine(
_In_ const QUIC_STREAM* Stream,
_In_ QUIC_CONGESTION_CONTROL* Cc
)
{
if(strcmp(Cc->Name, "BBR") == 0) {
// BBR congestion control algorithm.
return DrainBeforeDeadlineEngineBBR(Stream);
} else {
// Unknown or unsupported congestion control algorithm.
return TRUE;
}
}

BOOLEAN DrainBeforeDeadlineEngine(
_In_ const QUIC_STREAM* Stream
)
{
QUIC_TIME_POINT Now = CxPlatTimeUs64();
if(Stream->Deadline < Now) {
return FALSE;
}

QUIC_CONNECTION* Connection = Stream->Connection;
QUIC_CONGESTION_CONTROL* Cc = &Connection->CongestionControl;

// We do Congestion Control algorithm specific checks first
TRILEAN CcSpecficResult = DrainBeforeDeadlineCcSpecificEngine(Stream, Cc);

// TRILEAN_UNKNOWN && _ == TRILEAN_UNKNOWN (we use default value which is TRUE)
if(CcSpecficResult == TRILEAN_UNKNOWN)
// default behavior is true
return TRUE;
// TRILEAN_FALSE && _ == TRILEAN_FALSE
else if (CcSpecficResult == TRILEAN_FALSE)
return FALSE;

QUIC_NETWORK_STATISTICS NetworkStatistics;
CxPlatZeroMemory(&NetworkStatistics, sizeof(NetworkStatistics));
Cc->QuicCongestionControlGetNetworkStatistics(Connection, Cc, &NetworkStatistics);

if(NetworkStatistics.Bandwidth == 0)
{
// If we have no/invalid bandwidth estimate, we cannot determine if we can drain.
// Assume it can drain for now.
return TRUE;
}

uint32_t BytesInFlight = NetworkStatistics.BytesInFlight;
uint64_t SmoothedRTT = NetworkStatistics.SmoothedRTT;
uint64_t Bandwidth = NetworkStatistics.Bandwidth;

QUIC_TIME_DIFF TransmissionDelayOfBytesInFlight = (BytesInFlight / Bandwidth) * 1000000; // Convert to microseconds
uint64_t BytesToDrain = Stream->QueuedSendOffset - Stream->NextSendOffset;
QUIC_TIME_DIFF TransmissionDelayOfBytesToDrain = (BytesToDrain / Bandwidth) * 1000000; // Convert to microseconds

QUIC_TIME_DIFF TotalTransmissionDelay = TransmissionDelayOfBytesInFlight + TransmissionDelayOfBytesToDrain + (SmoothedRTT / 2);

return TotalTransmissionDelay < Stream->Deadline - Now;
}
1 change: 1 addition & 0 deletions src/core/library.c
Original file line number Diff line number Diff line change
Expand Up @@ -2016,6 +2016,7 @@ MsQuicOpenVersion(
Api->ConnectionCertificateValidationComplete = MsQuicConnectionCertificateValidationComplete;

Api->StreamOpen = MsQuicStreamOpen;
Api->StreamOpenWithDeadline = MsQuicStreamOpenWithDeadline;
Api->StreamClose = MsQuicStreamClose;
Api->StreamShutdown = MsQuicStreamShutdown;
Api->StreamStart = MsQuicStreamStart;
Expand Down
18 changes: 15 additions & 3 deletions src/core/send.c
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ QuicSendWriteFrames(

BOOLEAN
QuicSendCanSendStreamNow(
_In_ QUIC_STREAM* Stream
_In_ const QUIC_STREAM* Stream
)
{
CXPLAT_DBG_ASSERT(Stream->SendFlags != 0);
Expand Down Expand Up @@ -1016,6 +1016,19 @@ QuicSendGetNextStream(

QUIC_STREAM* Stream = CXPLAT_CONTAINING_RECORD(Entry, QUIC_STREAM, SendLink);

if(Stream->Deadline < CxPlatTimeUs64()) {
QuicSendSetStreamSendFlag(Send, Stream, QUIC_STREAM_SEND_FLAG_RECV_ABORT, TRUE);
QuicSendSetStreamSendFlag(Send, Stream, QUIC_STREAM_SEND_FLAG_SEND_ABORT, FALSE);

QUIC_STREAM_EVENT Event;
Event.Type = QUIC_STREAM_EVENT_CANCEL_ON_EXPIRED_DEADLINE;
Event.CANCEL_ON_EXPIRED_DEADLINE.ErrorCode = 0;
(void) QuicStreamIndicateEvent(Stream, &Event);

QuicStreamShutdown(Stream, QUIC_STREAM_SHUTDOWN_FLAG_ABORT,
Event.CANCEL_ON_EXPIRED_DEADLINE.ErrorCode);
}

//
// Make sure, given the current state of the connection and the stream,
// that we can use the stream to frame a packet.
Expand Down Expand Up @@ -1393,8 +1406,7 @@ QuicSendFlush(
} else {
WrotePacketFrames = FALSE;
}
} else if (Stream != NULL ||
(Stream = QuicSendGetNextStream(Send, &StreamPacketCount)) != NULL) {
} else if (Stream != NULL || (Stream = QuicSendGetNextStream(Send, &StreamPacketCount)) != NULL) {
if (!QuicPacketBuilderPrepareForStreamFrames(
&Builder,
Send->TailLossProbeNeeded)) {
Expand Down
2 changes: 1 addition & 1 deletion src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ QuicStreamTraceRundown(
_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_STATUS
QuicStreamIndicateEvent(
_In_ QUIC_STREAM* Stream,
_In_ const QUIC_STREAM* Stream,
_Inout_ QUIC_STREAM_EVENT* Event
)
{
Expand Down
7 changes: 6 additions & 1 deletion src/core/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,11 @@ typedef struct QUIC_STREAM {
uint64_t CachedConnCongestionControlUs;
uint64_t CachedConnFlowControlUs;
} BlockedTimings;

//
// The deadline (in microseconds) for the stream to be processed.
//
QUIC_TIME_POINT Deadline;
} QUIC_STREAM;

//
Expand Down Expand Up @@ -634,7 +639,7 @@ QuicStreamTraceRundown(
_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_STATUS
QuicStreamIndicateEvent(
_In_ QUIC_STREAM* Stream,
_In_ const QUIC_STREAM* Stream,
_Inout_ QUIC_STREAM_EVENT* Event
);

Expand Down
14 changes: 12 additions & 2 deletions src/core/stream_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
#include "stream_send.c.clog.h"
#endif

BOOLEAN DrainBeforeDeadlineEngine(
_In_ const QUIC_STREAM* Stream
);

_IRQL_requires_max_(PASSIVE_LEVEL)
void
QuicStreamCompleteSendRequest(
Expand Down Expand Up @@ -336,7 +340,7 @@ QuicStreamHasPending0RttData(

//
// Returns TRUE if the stream can send a STREAM frame immediately. This
// function does not include any congestion control state checks.
// function does not include any congestion control state and deadline checks.
//
BOOLEAN
QuicStreamSendCanWriteDataFrames(
Expand Down Expand Up @@ -372,9 +376,15 @@ QuicStreamSendCanWriteDataFrames(
// Some unsent data. Can send only if flow control will allow.
//
QUIC_SEND* Send = &Stream->Connection->Send;

BOOLEAN CanWeDrainBeforeDeadline = TRUE;
if(Stream->Deadline != QUIC_TIME_POINT_INFINITE) {
CanWeDrainBeforeDeadline = DrainBeforeDeadlineEngine(Stream);
}

return
Stream->NextSendOffset < Stream->MaxAllowedSendOffset &&
Send->OrderedStreamBytesSent < Send->PeerMaxData;
Send->OrderedStreamBytesSent < Send->PeerMaxData && CanWeDrainBeforeDeadline;
}

BOOLEAN
Expand Down
2 changes: 2 additions & 0 deletions src/core/stream_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,8 @@ QuicStreamSetGetStreamForPeer(
}

Stream->ID = NewStreamId;
// The peer which accepts the stream has no notion of deadline
Stream->Deadline = QUIC_TIME_POINT_INFINITE;
Status = QuicStreamStart(Stream, QUIC_STREAM_START_FLAG_NONE, TRUE);
if (QUIC_FAILED(Status)) {
*FatalError = TRUE;
Expand Down
Loading