Skip to content

Commit

Permalink
send bulk datagrams
Browse files Browse the repository at this point in the history
  • Loading branch information
orignal committed Jun 9, 2020
1 parent 221c14c commit 6d7847f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
14 changes: 12 additions & 2 deletions libi2pd/Datagram.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ namespace datagram
session->SendMsg(msg);
}

void DatagramDestination::FlushSendQueue (const i2p::data::IdentHash & ident)
{
ObtainSession(ident)->FlushSendQueue ();
}

void DatagramDestination::HandleDatagram (uint16_t fromPort, uint16_t toPort,uint8_t * const &buf, size_t len)
{
i2p::data::IdentityEx identity;
Expand Down Expand Up @@ -366,6 +371,7 @@ namespace datagram

void DatagramSession::FlushSendQueue ()
{
if (m_SendQueue.empty ()) return;
std::vector<i2p::tunnel::TunnelMessageBlock> send;
auto routingPath = GetSharedRoutingPath();
// if we don't have a routing path we will drop all queued messages
Expand All @@ -380,15 +386,19 @@ namespace datagram
routingPath->outboundTunnel->SendTunnelDataMsg(send);
}
m_SendQueue.clear();
ScheduleFlushSendQueue();
}

void DatagramSession::ScheduleFlushSendQueue()
{
boost::posix_time::milliseconds dlt(10);
m_SendQueueTimer.expires_from_now(dlt);
auto self = shared_from_this();
m_SendQueueTimer.async_wait([self](const boost::system::error_code & ec) { if(ec) return; self->FlushSendQueue(); });
m_SendQueueTimer.async_wait([self](const boost::system::error_code & ec)
{
if(ec) return;
self->FlushSendQueue();
self->ScheduleFlushSendQueue();
});
}
}
}
3 changes: 2 additions & 1 deletion libi2pd/Datagram.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ namespace datagram

/** send an i2np message to remote endpoint for this session */
void SendMsg(std::shared_ptr<I2NPMessage> msg);
void FlushSendQueue();
/** get the last time in milliseconds for when we used this datagram session */
uint64_t LastActivity() const { return m_LastUse; }

Expand All @@ -84,7 +85,6 @@ namespace datagram

private:

void FlushSendQueue();
void ScheduleFlushSendQueue();

void HandleSend(std::shared_ptr<I2NPMessage> msg);
Expand Down Expand Up @@ -122,6 +122,7 @@ namespace datagram

void SendDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash & ident, uint16_t fromPort = 0, uint16_t toPort = 0);
void SendRawDatagramTo (const uint8_t * payload, size_t len, const i2p::data::IdentHash & ident, uint16_t fromPort = 0, uint16_t toPort = 0);
void FlushSendQueue (const i2p::data::IdentHash & ident);
void HandleDataMessagePayload (uint16_t fromPort, uint16_t toPort, const uint8_t * buf, size_t len, bool isRaw = false);

void SetReceiver (const Receiver& receiver) { m_Receiver = receiver; };
Expand Down
18 changes: 18 additions & 0 deletions libi2pd_client/I2PTunnel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,24 @@ namespace client
// send off to remote i2p destination
LogPrint(eLogDebug, "UDP Client: send ", transferred, " to ", m_RemoteIdent->ToBase32(), ":", RemotePort);
m_LocalDest->GetDatagramDestination()->SendDatagramTo(m_RecvBuff, transferred, *m_RemoteIdent, remotePort, RemotePort);
size_t numPackets = 0;
while (numPackets < i2p::datagram::DATAGRAM_SEND_QUEUE_MAX_SIZE)
{
boost::system::error_code ec;
size_t moreBytes = m_LocalSocket.available(ec);
if (ec || !moreBytes) break;
transferred = m_LocalSocket.receive_from (boost::asio::buffer (m_RecvBuff, I2P_UDP_MAX_MTU), m_RecvEndpoint, 0, ec);
remotePort = m_RecvEndpoint.port();
// TODO: check remotePort
m_LocalDest->GetDatagramDestination()->SendDatagramTo(m_RecvBuff, transferred, *m_RemoteIdent, remotePort, RemotePort);
numPackets++;
}
if (numPackets)
{
LogPrint(eLogDebug, "UDP Client: sent ", numPackets, " more packets to ", m_RemoteIdent->ToBase32());
m_LocalDest->GetDatagramDestination()->FlushSendQueue (*m_RemoteIdent);
}

// mark convo as active
if (m_LastSession)
m_LastSession->second = i2p::util::GetMillisecondsSinceEpoch();
Expand Down

0 comments on commit 6d7847f

Please sign in to comment.