Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions iocore/net/quic/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ libquic_a_SOURCES = \
QUICNewRenoCongestionController.cc \
QUICFlowController.cc \
QUICStreamState.cc \
QUICStreamAdapter.cc \
QUICStreamVCAdapter.cc \
QUICStream.cc \
QUICHandshake.cc \
QUICPacketHeaderProtector.cc \
Expand Down Expand Up @@ -92,6 +94,7 @@ libquic_a_SOURCES = \
QUICFrameGenerator.cc \
QUICFrameRetransmitter.cc \
QUICAddrVerifyState.cc \
QUICTransferProgressProvider.cc \
QUICBidirectionalStream.cc \
QUICCryptoStream.cc \
QUICUnidirectionalStream.cc \
Expand Down
90 changes: 85 additions & 5 deletions iocore/net/quic/Mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "QUICPinger.h"
#include "QUICPadder.h"
#include "QUICHandshakeProtocol.h"
#include "QUICStreamAdapter.h"

class MockQUICContext;

Expand Down Expand Up @@ -717,6 +718,76 @@ class MockQUICLossDetector : public QUICLossDetector
MockQUICCongestionController _cc;
};

class MockQUICStreamAdapter : public QUICStreamAdapter
{
public:
MockQUICStreamAdapter(QUICStream &stream) : QUICStreamAdapter(stream) {}

void
write_to_stream(const uint8_t *buf, size_t len)
{
this->_total_sending_data_len += len;
this->_sending_data_len += len;
}

int64_t
write(QUICOffset offset, const uint8_t *data, uint64_t data_length, bool fin) override
{
this->_total_receiving_data_len += data_length;
this->_receiving_data_len += data_length;
return data_length;
}
bool
is_eos() override
{
return false;
}
uint64_t
unread_len() override
{
return this->_sending_data_len;
}
uint64_t
read_len() override
{
return 0;
}
uint64_t
total_len() override
{
return this->_total_sending_data_len;
}
void
encourge_read() override
{
}
void
encourge_write() override
{
}
void
notify_eos() override
{
}

protected:
Ptr<IOBufferBlock>
_read(size_t len) override
{
this->_sending_data_len -= len;
Ptr<IOBufferBlock> block = make_ptr<IOBufferBlock>(new_IOBufferBlock());
block->alloc(iobuffer_size_to_index(len, BUFFER_SIZE_INDEX_32K));
block->fill(len);
return block;
}

private:
size_t _sending_data_len = 0;
size_t _total_sending_data_len = 0;
size_t _receiving_data_len = 0;
size_t _total_receiving_data_len = 0;
};

class MockQUICApplication : public QUICApplication
{
public:
Expand All @@ -726,20 +797,29 @@ class MockQUICApplication : public QUICApplication
main_event_handler(int event, Event *data)
{
if (event == 12345) {
QUICStreamIO *stream_io = static_cast<QUICStreamIO *>(data->cookie);
stream_io->write_reenable();
}
return EVENT_CONT;
}

void
on_new_stream(QUICStream &stream) override
{
auto ite = this->_streams.emplace(stream.id(), stream);
QUICStreamAdapter &adapter = ite.first->second;
stream.set_io_adapter(&adapter);
}

void
send(const uint8_t *data, size_t size, QUICStreamId stream_id)
{
QUICStreamIO *stream_io = this->_find_stream_io(stream_id);
stream_io->write(data, size);
auto ite = this->_streams.find(stream_id);
auto &adapter = ite->second;
adapter.write_to_stream(data, size);

eventProcessor.schedule_imm(this, ET_CALL, 12345, stream_io);
// eventProcessor.schedule_imm(this, ET_CALL, 12345, adapter);
}

std::unordered_map<QUICStreamId, MockQUICStreamAdapter> _streams;
};

class MockQUICPacketR : public QUICPacketR
Expand Down
248 changes: 1 addition & 247 deletions iocore/net/quic/QUICApplication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,172 +24,6 @@
#include "QUICApplication.h"
#include "QUICStream.h"

static constexpr char tag_stream_io[] = "quic_stream_io";
static constexpr char tag_app[] = "quic_app";

#define QUICStreamIODebug(fmt, ...) \
Debug(tag_stream_io, "[%s] [%" PRIu64 "] " fmt, this->_stream_vc->connection_info()->cids().data(), this->_stream_vc->id(), \
##__VA_ARGS__)

//
// QUICStreamIO
//
QUICStreamIO::QUICStreamIO(QUICApplication *app, QUICStreamVConnection *stream_vc) : _stream_vc(stream_vc)
{
this->_read_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_8K);
this->_write_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_8K);

this->_read_buffer_reader = this->_read_buffer->alloc_reader();
this->_write_buffer_reader = this->_write_buffer->alloc_reader();

switch (stream_vc->direction()) {
case QUICStreamDirection::BIDIRECTIONAL:
this->_read_vio = stream_vc->do_io_read(app, INT64_MAX, this->_read_buffer);
this->_write_vio = stream_vc->do_io_write(app, INT64_MAX, this->_write_buffer_reader);
break;
case QUICStreamDirection::SEND:
this->_write_vio = stream_vc->do_io_write(app, INT64_MAX, this->_write_buffer_reader);
break;
case QUICStreamDirection::RECEIVE:
this->_read_vio = stream_vc->do_io_read(app, INT64_MAX, this->_read_buffer);
break;
default:
ink_assert(false);
break;
}
}

QUICStreamIO::~QUICStreamIO()
{
// All readers will be deallocated
free_MIOBuffer(this->_read_buffer);
free_MIOBuffer(this->_write_buffer);
};

uint32_t
QUICStreamIO::stream_id() const
{
return this->_stream_vc->id();
}

bool
QUICStreamIO::is_bidirectional() const
{
return this->_stream_vc->is_bidirectional();
}

int64_t
QUICStreamIO::read(uint8_t *buf, int64_t len)
{
if (is_debug_tag_set(tag_stream_io)) {
if (this->_read_vio->nbytes == INT64_MAX) {
QUICStreamIODebug("nbytes=- ndone=%" PRId64 " read_avail=%" PRId64 " read_len=%" PRId64, this->_read_vio->ndone,
this->_read_buffer_reader->read_avail(), len);
} else {
QUICStreamIODebug("nbytes=%" PRId64 " ndone=%" PRId64 " read_avail=%" PRId64 " read_len=%" PRId64, this->_read_vio->nbytes,
this->_read_vio->ndone, this->_read_buffer_reader->read_avail(), len);
}
}

int64_t nread = this->_read_buffer_reader->read(buf, len);
if (nread > 0) {
this->_read_vio->ndone += nread;
}

this->_stream_vc->on_read();

return nread;
}

int64_t
QUICStreamIO::peek(uint8_t *buf, int64_t len)
{
return this->_read_buffer_reader->memcpy(buf, len) - reinterpret_cast<char *>(buf);
}

void
QUICStreamIO::consume(int64_t len)
{
this->_read_buffer_reader->consume(len);
this->_stream_vc->on_read();
}

bool
QUICStreamIO::is_read_done() const
{
return this->_read_vio->ntodo() == 0;
}

int64_t
QUICStreamIO::write(const uint8_t *buf, int64_t len)
{
SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread());

int64_t nwritten = this->_write_buffer->write(buf, len);
if (nwritten > 0) {
this->_nwritten += nwritten;
}

return len;
}

int64_t
QUICStreamIO::write(IOBufferReader *r, int64_t len)
{
SCOPED_MUTEX_LOCK(lock, this->_write_vio->mutex, this_ethread());

int64_t bytes_avail = this->_write_buffer->write_avail();

if (bytes_avail > 0) {
if (is_debug_tag_set(tag_stream_io)) {
if (this->_write_vio->nbytes == INT64_MAX) {
QUICStreamIODebug("nbytes=- ndone=%" PRId64 " write_avail=%" PRId64 " write_len=%" PRId64, this->_write_vio->ndone,
bytes_avail, len);
} else {
QUICStreamIODebug("nbytes=%" PRId64 " ndone=%" PRId64 " write_avail=%" PRId64 " write_len=%" PRId64,
this->_write_vio->nbytes, this->_write_vio->ndone, bytes_avail, len);
}
}

int64_t bytes_len = std::min(bytes_avail, len);
int64_t nwritten = this->_write_buffer->write(r, bytes_len);

if (nwritten > 0) {
this->_nwritten += nwritten;
}

return nwritten;
} else {
return 0;
}
}

// TODO: Similar to other "write" apis, but do not copy.
int64_t
QUICStreamIO::write(IOBufferBlock *b)
{
ink_assert(!"not implemented yet");
return 0;
}

void
QUICStreamIO::write_done()
{
this->_write_vio->nbytes = this->_nwritten;
}

void
QUICStreamIO::read_reenable()
{
return this->_read_vio->reenable();
}

void
QUICStreamIO::write_reenable()
{
return this->_write_vio->reenable();
}

//
// QUICApplication
//
Expand All @@ -198,84 +32,4 @@ QUICApplication::QUICApplication(QUICConnection *qc) : Continuation(new_ProxyMut
this->_qc = qc;
}

QUICApplication::~QUICApplication()
{
for (auto const &kv : this->_stream_map) {
delete kv.second;
}
}

// @brief Bind stream and application
void
QUICApplication::set_stream(QUICStreamVConnection *stream_vc, QUICStreamIO *stream_io)
{
if (stream_io == nullptr) {
stream_io = new QUICStreamIO(this, stream_vc);
}
this->_stream_map.insert(std::make_pair(stream_vc->id(), stream_io));
}

// @brief Bind stream and application
void
QUICApplication::set_stream(QUICStreamIO *stream_io)
{
this->_stream_map.insert(std::make_pair(stream_io->stream_id(), stream_io));
}

bool
QUICApplication::is_stream_set(QUICStreamVConnection *stream)
{
auto result = this->_stream_map.find(stream->id());

return result != this->_stream_map.end();
}

void
QUICApplication::reenable(QUICStreamVConnection *stream)
{
QUICStreamIO *stream_io = this->_find_stream_io(stream->id());
if (stream_io) {
stream_io->read_reenable();
stream_io->write_reenable();
} else {
Debug(tag_app, "[%s] Unknown Stream id=%" PRIx64, this->_qc->cids().data(), stream->id());
}

return;
}

void
QUICApplication::unset_stream(QUICStreamVConnection *stream)
{
QUICStreamIO *stream_io = this->_find_stream_io(stream->id());
if (stream_io) {
this->_stream_map.erase(stream->id());
}
}

QUICStreamIO *
QUICApplication::_find_stream_io(QUICStreamId id)
{
auto result = this->_stream_map.find(id);

if (result == this->_stream_map.end()) {
return nullptr;
} else {
return result->second;
}
}

QUICStreamIO *
QUICApplication::_find_stream_io(VIO *vio)
{
if (vio == nullptr) {
return nullptr;
}

QUICStream *stream = dynamic_cast<QUICStream *>(vio->vc_server);
if (stream == nullptr) {
return nullptr;
}

return this->_find_stream_io(stream->id());
}
QUICApplication::~QUICApplication() {}
Loading