Skip to content

Commit

Permalink
Major refactoring to push data up from the InterfaceController to the…
Browse files Browse the repository at this point in the history
… ReachabilityCollector for submitting lag, drops and passed bits to the snode
  • Loading branch information
cjdelisle committed Sep 4, 2018
1 parent 3130363 commit f187db6
Show file tree
Hide file tree
Showing 24 changed files with 682 additions and 155 deletions.
1 change: 1 addition & 0 deletions dht/Pathfinder.c
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ static Iface_DEFUN incomingFromEventIf(struct Message* msg, struct Iface* eventI
case PFChan_Core_PING: return handlePing(msg, pf);
case PFChan_Core_PONG: return handlePong(msg, pf);
case PFChan_Core_UNSETUP_SESSION:
case PFChan_Core_LINK_STATE:
case PFChan_Core_CTRL_MSG: return NULL;
default:;
}
Expand Down
6 changes: 5 additions & 1 deletion net/EventEmitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,17 @@ static bool PFChan_Core_sizeOk(enum PFChan_Core ev, int size)
case PFChan_Core_CTRL_MSG:
return (size >= 8 + PFChan_CtrlMsg_MIN_SIZE);

case PFChan_Core_LINK_STATE:
return (size >= 8 + PFChan_LinkState_Entry_SIZE) &&
!((size - 8) % PFChan_LinkState_Entry_SIZE);

default:;
}
Assert_failure("invalid event [%d]", ev);
}
// Remember to add the event to this function too!
Assert_compileTime(PFChan_Core__TOO_LOW == 1023);
Assert_compileTime(PFChan_Core__TOO_HIGH == 1039);
Assert_compileTime(PFChan_Core__TOO_HIGH == 1040);

static Iface_DEFUN incomingFromCore(struct Message* msg, struct Iface* trickIf)
{
Expand Down
74 changes: 44 additions & 30 deletions net/InterfaceController.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
/** Wait 32 seconds between sending beacon messages. */
#define BEACON_INTERVAL 32768

/** Every 10 seconds, check the number of dropped packets and update the moving average. */
#define CHECKDROPS_INTERVAL_MILLISECONDS 10000
/** Every 3 seconds inform the pathfinder of the current link states. */
#define LINKSTATE_UPDATE_INTERVAL 3000


// ---------------- Map ----------------
Expand Down Expand Up @@ -197,8 +197,8 @@ struct InterfaceController_pvt
/** The timeout event to use for pinging potentially unresponsive neighbors. */
struct Timeout* const pingInterval;

/** The timeout event for updating the moving average of number of dropped packets. */
struct Timeout* const dropCheckInterval;
/** The timeout event for updating the link state to the pathfinders. */
struct Timeout* const linkStateInterval;

/** For pinging lazy/unresponsive nodes. */
struct SwitchPinger* const switchPinger;
Expand Down Expand Up @@ -322,34 +322,48 @@ static void sendPing(struct Peer* ep)
}
}

static void iciCheckDrops(
struct InterfaceController_Iface_pvt* ici,
struct InterfaceController_pvt* ic)
static void linkState(void* vic)
{
for (uint32_t i = 0; i < ici->peerMap.count; i++) {
struct Peer* ep = ici->peerMap.values[i];

uint32_t drops = ep->caSession->replayProtector.lostPackets;
uint64_t newDrops = 0;
if (drops > ep->_lastDrops) { newDrops = drops - ep->_lastDrops; }
ep->_lastDrops = drops;
ep->lastDrops += newDrops;

uint32_t packets = ep->caSession->replayProtector.baseOffset;
uint64_t newPackets = 0;
if (packets > ep->_lastPackets) { newPackets = packets - ep->_lastPackets; }
ep->_lastPackets = packets;
ep->lastPackets += newPackets;
struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) vic);
uint32_t msgLen = 64;
for (int i = 0; i < ic->icis->length; i++) {
struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, i);
msgLen += PFChan_LinkState_Entry_SIZE * ici->peerMap.count;
}
}
struct Allocator* alloc = Allocator_child(ic->alloc);
struct Message* msg = Message_new(0, msgLen, alloc);

static void checkDrops(void* vic)
{
struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) vic);
for (int i = 0; i < ic->icis->length; i++) {
struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, i);
iciCheckDrops(ici, ic);
for (uint32_t i = 0; i < ici->peerMap.count; i++) {
struct Peer* ep = ici->peerMap.values[i];

uint32_t drops = ep->caSession->replayProtector.lostPackets;
uint64_t newDrops = 0;
if (drops > ep->_lastDrops) { newDrops = drops - ep->_lastDrops; }
ep->_lastDrops = drops;
ep->lastDrops += newDrops;

uint32_t packets = ep->caSession->replayProtector.baseOffset;
uint64_t newPackets = 0;
if (packets > ep->_lastPackets) { newPackets = packets - ep->_lastPackets; }
ep->_lastPackets = packets;
ep->lastPackets += newPackets;

struct PFChan_LinkState_Entry e = {
.peerLabel_be = Endian_hostToBigEndian32((uint32_t) ep->addr.path),
.sumOfPackets_be = Endian_hostToBigEndian32(ep->lastPackets),
.sumOfDrops_be = Endian_hostToBigEndian32(ep->lastDrops),
.sumOfKb_be = Endian_hostToBigEndian32((uint32_t) (ep->bytesIn >> 10))
};
Message_push(msg, &e, PFChan_LinkState_Entry_SIZE, NULL);
}
}

Message_push32(msg, 0xffffffff, NULL);
Message_push32(msg, PFChan_Core_LINK_STATE, NULL);
Iface_send(&ic->eventEmitterIf, msg);
Allocator_free(alloc);
}

static void iciPing(struct InterfaceController_Iface_pvt* ici, struct InterfaceController_pvt* ic)
Expand Down Expand Up @@ -491,7 +505,7 @@ static Iface_DEFUN receivedPostCryptoAuth(struct Message* msg,

if (caState == CryptoAuth_State_ESTABLISHED) {
moveEndpointIfNeeded(ep);
//sendPeer(0xffffffff, PFChan_Core_PEER, ep, 0xffff);// version is not known at this point.
//sendPeer(0xffffffff, PFChan_Core_PEER, ep, 0xffff);// version is not known.
} else {
// prevent some kinds of nasty things which could be done with packet replay.
// This is checking the message switch header and will drop it unless the label
Expand Down Expand Up @@ -1148,10 +1162,10 @@ struct InterfaceController* InterfaceController_new(struct CryptoAuth* ca,
.forgetAfterMilliseconds = FORGET_AFTER_MILLISECONDS,
.beaconInterval = BEACON_INTERVAL,

.dropCheckInterval = Timeout_setInterval(
checkDrops,
.linkStateInterval = Timeout_setInterval(
linkState,
out,
CHECKDROPS_INTERVAL_MILLISECONDS,
LINKSTATE_UPDATE_INTERVAL,
eventBase,
alloc),

Expand Down
2 changes: 1 addition & 1 deletion node_build/make.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Builder.configure({
crossCompiling: process.env['CROSS'] !== undefined,
gcc: GCC,
tempDir: process.env['CJDNS_BUILD_TMPDIR'] || '/tmp',
optimizeLevel: '-O3',
optimizeLevel: '-O0',
logLevel: process.env['Log_LEVEL'] || 'DEBUG'
}, function (builder, waitFor) {

Expand Down
65 changes: 65 additions & 0 deletions subnode/LinkState.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/* vim: set expandtab ts=4 sw=4: */
/*
* You may redistribute this program and/or modify it under the terms of
* the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#ifndef LinkState_H
#define LinkState_H

#include "util/VarInt.h"
#include "wire/Message.h"

#include <stdint.h>

#define LinkState_SLOTS 18

struct LinkState {
uint16_t lagSlots[LinkState_SLOTS];
uint16_t dropSlots[LinkState_SLOTS];
uint32_t kbRecvSlots[LinkState_SLOTS];
uint32_t samples;
};
/*
struct LinkState_AllPeers {
};
static int LinkState_encode(struct Message* msg, struct LinkState* ls, int lastSamples)
{
struct VarInt_Iter iter = {
.ptr = msg->bytes,
.end = msg->bytes,
.start = &msg->bytes[-msg->padding]
};
if (iter.end - iter.start > 255) { iter.start = &iter.end[-255]; }
int startingPoint = ls->samples - lastSamples;
int err = 0;
for (int i = 0; i < LinkState_SLOTS; i++) { err |= VarInt_push(&iter, ls->lagSlots[i]); }
for (int i = 0; i < LinkState_SLOTS; i++) { err |= VarInt_push(&iter, ls->dropSlots[i]); }
for (int i = 0; i < LinkState_SLOTS; i++) { err |= VarInt_push(&iter, ls->kbRecvSlots[i]); }
err |= VarInt_push(&iter, ls->samples);
err |= VarInt_push(&iter, 0xfc);
while ((((uintptr_t)iter.ptr) & 7) != 2) { err |= VarInt_push(&iter, 0); }
if (iter.end - iter.ptr) { }
VarInt_push(&iter, 0);
Announce_Type_LINK_STATE
err |= VarInt_push(&iter, 0xfc);
if (!err) {
Message_shift(msg, iter.end - iter.ptr )
}
}
*/

#endif
1 change: 1 addition & 0 deletions subnode/MsgCore.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ static void pingerOnResponse(String* data, uint32_t milliseconds, void* context)
if (mcp->currentReply) {
rc = Identity_check(mcp->currentReply);
}
pp->pub.lag = milliseconds;

if (pp->pub.cb) {
pp->pub.cb((rc) ? rc->content : NULL,
Expand Down
1 change: 1 addition & 0 deletions subnode/MsgCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ struct MsgCore_Promise
struct Allocator* alloc;
void* userData;
MsgCore_PromiseCb cb;
uint32_t lag;
};

struct MsgCore_Promise* MsgCore_createQuery(struct MsgCore* core,
Expand Down
23 changes: 7 additions & 16 deletions subnode/ReachabilityAnnouncer.c
Original file line number Diff line number Diff line change
Expand Up @@ -381,31 +381,22 @@ static void addServerStateMsg(struct ReachabilityAnnouncer_pvt* rap, struct Mess

void ReachabilityAnnouncer_updatePeer(struct ReachabilityAnnouncer* ra,
uint8_t ipv6[16],
uint64_t pathThemToUs,
uint64_t pathUsToThem,
uint32_t mtu,
uint16_t drops,
uint16_t latency,
uint16_t penalty)
uint32_t pathThemToUs,
uint32_t pathUsToThem)
{
struct ReachabilityAnnouncer_pvt* rap = Identity_check((struct ReachabilityAnnouncer_pvt*) ra);

uint8_t ipPrinted[40];
AddrTools_printIp(ipPrinted, ipv6);
Log_debug(rap->log, "Update peer [%s] [%08llx]", ipPrinted, (long long) pathThemToUs);

if (pathThemToUs > 0xffffffff) {
Log_warn(rap->log, "oversize path for [%08llx]", (long long) pathThemToUs);
return;
}

struct Announce_Peer refPeer;
Announce_Peer_init(&refPeer);
refPeer.label_be = Endian_hostToBigEndian32(pathThemToUs);
refPeer.mtu8_be = Endian_hostToBigEndian16((mtu / 8));
refPeer.drops_be = Endian_hostToBigEndian16(drops);
refPeer.latency_be = Endian_hostToBigEndian16(latency);
refPeer.penalty_be = Endian_hostToBigEndian16(penalty);
// TODO(cjd): This needs to carry the observed MTU
refPeer.mtu8_be = 0;
refPeer.unused = 0xffffffff;
refPeer.peerNum_be = EncodingScheme_parseDirector(rap->myScheme, pathUsToThem);
refPeer.encodingFormNum = EncodingScheme_getFormNum(rap->myScheme, pathUsToThem);
Bits_memcpy(refPeer.ipv6, ipv6, 16);

Expand Down Expand Up @@ -438,7 +429,7 @@ void ReachabilityAnnouncer_updatePeer(struct ReachabilityAnnouncer* ra,
return;
}
case updatePeer_UPDATE: {
if (drops == 0xffff) {
if (!pathThemToUs) {
Log_debug(rap->log, "update (peergone)");
stateUpdate(rap, ReachabilityAnnouncer_State_PEERGONE);
} else {
Expand Down
8 changes: 2 additions & 6 deletions subnode/ReachabilityAnnouncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ struct ReachabilityAnnouncer
// (pathThemToUs == 0) -> peer is gone.
void ReachabilityAnnouncer_updatePeer(struct ReachabilityAnnouncer* ra,
uint8_t ipv6[16],
uint64_t pathThemToUs,
uint64_t pathUsToThem,
uint32_t mtu,
uint16_t drops,
uint16_t latency,
uint16_t penalty);
uint32_t pathThemToUs,
uint32_t pathUsToThem);

struct ReachabilityAnnouncer* ReachabilityAnnouncer_new(struct Allocator* allocator,
struct Log* log,
Expand Down
Loading

0 comments on commit f187db6

Please sign in to comment.