From f187db6b6657f46551de6d13af625220bb5db7a9 Mon Sep 17 00:00:00 2001 From: Caleb James DeLisle Date: Tue, 4 Sep 2018 11:51:11 +0200 Subject: [PATCH] Major refactoring to push data up from the InterfaceController to the ReachabilityCollector for submitting lag, drops and passed bits to the snode --- dht/Pathfinder.c | 1 + net/EventEmitter.c | 6 +- net/InterfaceController.c | 74 ++++++---- node_build/make.js | 2 +- subnode/LinkState.h | 65 +++++++++ subnode/MsgCore.c | 1 + subnode/MsgCore.h | 1 + subnode/ReachabilityAnnouncer.c | 23 +-- subnode/ReachabilityAnnouncer.h | 8 +- subnode/ReachabilityCollector.c | 193 +++++++++++++++++++++----- subnode/ReachabilityCollector.h | 23 ++- subnode/ReachabilityCollector_admin.c | 27 ++++ subnode/SubnodePathfinder.c | 63 +++++---- subnode/SupernodeHunter.c | 7 +- subnode/SupernodeHunter.h | 4 +- switch/EncodingScheme.c | 14 ++ switch/EncodingScheme.h | 3 + util/Set.c | 27 ++-- util/Set.h | 12 +- util/VarInt.h | 110 +++++++++++++++ util/test/Set_test.c | 4 +- util/test/VarInt_test.c | 100 +++++++++++++ wire/Announce.h | 50 +++++-- wire/PFChan.h | 19 ++- 24 files changed, 682 insertions(+), 155 deletions(-) create mode 100644 subnode/LinkState.h create mode 100644 util/VarInt.h create mode 100644 util/test/VarInt_test.c diff --git a/dht/Pathfinder.c b/dht/Pathfinder.c index 3815d1e10..9fdbe7512 100644 --- a/dht/Pathfinder.c +++ b/dht/Pathfinder.c @@ -453,6 +453,7 @@ static Iface_DEFUN incomingFromEventIf(struct Message* msg, struct Iface* eventI case PFChan_Core_PING: return handlePing(msg, pf); case PFChan_Core_PONG: return handlePong(msg, pf); case PFChan_Core_UNSETUP_SESSION: + case PFChan_Core_LINK_STATE: case PFChan_Core_CTRL_MSG: return NULL; default:; } diff --git a/net/EventEmitter.c b/net/EventEmitter.c index 9665b58bc..7bc498e96 100644 --- a/net/EventEmitter.c +++ b/net/EventEmitter.c @@ -160,13 +160,17 @@ static bool PFChan_Core_sizeOk(enum PFChan_Core ev, int size) case PFChan_Core_CTRL_MSG: return (size >= 8 + PFChan_CtrlMsg_MIN_SIZE); + case PFChan_Core_LINK_STATE: + return (size >= 8 + PFChan_LinkState_Entry_SIZE) && + !((size - 8) % PFChan_LinkState_Entry_SIZE); + default:; } Assert_failure("invalid event [%d]", ev); } // Remember to add the event to this function too! Assert_compileTime(PFChan_Core__TOO_LOW == 1023); -Assert_compileTime(PFChan_Core__TOO_HIGH == 1039); +Assert_compileTime(PFChan_Core__TOO_HIGH == 1040); static Iface_DEFUN incomingFromCore(struct Message* msg, struct Iface* trickIf) { diff --git a/net/InterfaceController.c b/net/InterfaceController.c index 523a7773a..6cf80bfe6 100644 --- a/net/InterfaceController.c +++ b/net/InterfaceController.c @@ -59,8 +59,8 @@ /** Wait 32 seconds between sending beacon messages. */ #define BEACON_INTERVAL 32768 -/** Every 10 seconds, check the number of dropped packets and update the moving average. */ -#define CHECKDROPS_INTERVAL_MILLISECONDS 10000 +/** Every 3 seconds inform the pathfinder of the current link states. */ +#define LINKSTATE_UPDATE_INTERVAL 3000 // ---------------- Map ---------------- @@ -197,8 +197,8 @@ struct InterfaceController_pvt /** The timeout event to use for pinging potentially unresponsive neighbors. */ struct Timeout* const pingInterval; - /** The timeout event for updating the moving average of number of dropped packets. */ - struct Timeout* const dropCheckInterval; + /** The timeout event for updating the link state to the pathfinders. */ + struct Timeout* const linkStateInterval; /** For pinging lazy/unresponsive nodes. */ struct SwitchPinger* const switchPinger; @@ -322,34 +322,48 @@ static void sendPing(struct Peer* ep) } } -static void iciCheckDrops( - struct InterfaceController_Iface_pvt* ici, - struct InterfaceController_pvt* ic) +static void linkState(void* vic) { - for (uint32_t i = 0; i < ici->peerMap.count; i++) { - struct Peer* ep = ici->peerMap.values[i]; - - uint32_t drops = ep->caSession->replayProtector.lostPackets; - uint64_t newDrops = 0; - if (drops > ep->_lastDrops) { newDrops = drops - ep->_lastDrops; } - ep->_lastDrops = drops; - ep->lastDrops += newDrops; - - uint32_t packets = ep->caSession->replayProtector.baseOffset; - uint64_t newPackets = 0; - if (packets > ep->_lastPackets) { newPackets = packets - ep->_lastPackets; } - ep->_lastPackets = packets; - ep->lastPackets += newPackets; + struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) vic); + uint32_t msgLen = 64; + for (int i = 0; i < ic->icis->length; i++) { + struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, i); + msgLen += PFChan_LinkState_Entry_SIZE * ici->peerMap.count; } -} + struct Allocator* alloc = Allocator_child(ic->alloc); + struct Message* msg = Message_new(0, msgLen, alloc); -static void checkDrops(void* vic) -{ - struct InterfaceController_pvt* ic = Identity_check((struct InterfaceController_pvt*) vic); for (int i = 0; i < ic->icis->length; i++) { struct InterfaceController_Iface_pvt* ici = ArrayList_OfIfaces_get(ic->icis, i); - iciCheckDrops(ici, ic); + for (uint32_t i = 0; i < ici->peerMap.count; i++) { + struct Peer* ep = ici->peerMap.values[i]; + + uint32_t drops = ep->caSession->replayProtector.lostPackets; + uint64_t newDrops = 0; + if (drops > ep->_lastDrops) { newDrops = drops - ep->_lastDrops; } + ep->_lastDrops = drops; + ep->lastDrops += newDrops; + + uint32_t packets = ep->caSession->replayProtector.baseOffset; + uint64_t newPackets = 0; + if (packets > ep->_lastPackets) { newPackets = packets - ep->_lastPackets; } + ep->_lastPackets = packets; + ep->lastPackets += newPackets; + + struct PFChan_LinkState_Entry e = { + .peerLabel_be = Endian_hostToBigEndian32((uint32_t) ep->addr.path), + .sumOfPackets_be = Endian_hostToBigEndian32(ep->lastPackets), + .sumOfDrops_be = Endian_hostToBigEndian32(ep->lastDrops), + .sumOfKb_be = Endian_hostToBigEndian32((uint32_t) (ep->bytesIn >> 10)) + }; + Message_push(msg, &e, PFChan_LinkState_Entry_SIZE, NULL); + } } + + Message_push32(msg, 0xffffffff, NULL); + Message_push32(msg, PFChan_Core_LINK_STATE, NULL); + Iface_send(&ic->eventEmitterIf, msg); + Allocator_free(alloc); } static void iciPing(struct InterfaceController_Iface_pvt* ici, struct InterfaceController_pvt* ic) @@ -491,7 +505,7 @@ static Iface_DEFUN receivedPostCryptoAuth(struct Message* msg, if (caState == CryptoAuth_State_ESTABLISHED) { moveEndpointIfNeeded(ep); - //sendPeer(0xffffffff, PFChan_Core_PEER, ep, 0xffff);// version is not known at this point. + //sendPeer(0xffffffff, PFChan_Core_PEER, ep, 0xffff);// version is not known. } else { // prevent some kinds of nasty things which could be done with packet replay. // This is checking the message switch header and will drop it unless the label @@ -1148,10 +1162,10 @@ struct InterfaceController* InterfaceController_new(struct CryptoAuth* ca, .forgetAfterMilliseconds = FORGET_AFTER_MILLISECONDS, .beaconInterval = BEACON_INTERVAL, - .dropCheckInterval = Timeout_setInterval( - checkDrops, + .linkStateInterval = Timeout_setInterval( + linkState, out, - CHECKDROPS_INTERVAL_MILLISECONDS, + LINKSTATE_UPDATE_INTERVAL, eventBase, alloc), diff --git a/node_build/make.js b/node_build/make.js index 3356e3e5f..467bdd85b 100644 --- a/node_build/make.js +++ b/node_build/make.js @@ -46,7 +46,7 @@ Builder.configure({ crossCompiling: process.env['CROSS'] !== undefined, gcc: GCC, tempDir: process.env['CJDNS_BUILD_TMPDIR'] || '/tmp', - optimizeLevel: '-O3', + optimizeLevel: '-O0', logLevel: process.env['Log_LEVEL'] || 'DEBUG' }, function (builder, waitFor) { diff --git a/subnode/LinkState.h b/subnode/LinkState.h new file mode 100644 index 000000000..9264eee3e --- /dev/null +++ b/subnode/LinkState.h @@ -0,0 +1,65 @@ +/* vim: set expandtab ts=4 sw=4: */ +/* + * You may redistribute this program and/or modify it under the terms of + * the GNU General Public License as published by the Free Software Foundation, + * either version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#ifndef LinkState_H +#define LinkState_H + +#include "util/VarInt.h" +#include "wire/Message.h" + +#include + +#define LinkState_SLOTS 18 + +struct LinkState { + uint16_t lagSlots[LinkState_SLOTS]; + uint16_t dropSlots[LinkState_SLOTS]; + uint32_t kbRecvSlots[LinkState_SLOTS]; + uint32_t samples; +}; +/* +struct LinkState_AllPeers { + +}; + +static int LinkState_encode(struct Message* msg, struct LinkState* ls, int lastSamples) +{ + struct VarInt_Iter iter = { + .ptr = msg->bytes, + .end = msg->bytes, + .start = &msg->bytes[-msg->padding] + }; + if (iter.end - iter.start > 255) { iter.start = &iter.end[-255]; } + + int startingPoint = ls->samples - lastSamples; + + int err = 0; + for (int i = 0; i < LinkState_SLOTS; i++) { err |= VarInt_push(&iter, ls->lagSlots[i]); } + for (int i = 0; i < LinkState_SLOTS; i++) { err |= VarInt_push(&iter, ls->dropSlots[i]); } + for (int i = 0; i < LinkState_SLOTS; i++) { err |= VarInt_push(&iter, ls->kbRecvSlots[i]); } + err |= VarInt_push(&iter, ls->samples); + err |= VarInt_push(&iter, 0xfc); + while ((((uintptr_t)iter.ptr) & 7) != 2) { err |= VarInt_push(&iter, 0); } + if (iter.end - iter.ptr) { } + VarInt_push(&iter, 0); + Announce_Type_LINK_STATE + err |= VarInt_push(&iter, 0xfc); + if (!err) { + Message_shift(msg, iter.end - iter.ptr ) + } + +} +*/ + +#endif \ No newline at end of file diff --git a/subnode/MsgCore.c b/subnode/MsgCore.c index b13e17473..59e4ed2a2 100644 --- a/subnode/MsgCore.c +++ b/subnode/MsgCore.c @@ -107,6 +107,7 @@ static void pingerOnResponse(String* data, uint32_t milliseconds, void* context) if (mcp->currentReply) { rc = Identity_check(mcp->currentReply); } + pp->pub.lag = milliseconds; if (pp->pub.cb) { pp->pub.cb((rc) ? rc->content : NULL, diff --git a/subnode/MsgCore.h b/subnode/MsgCore.h index 348bdd911..30facb6ed 100644 --- a/subnode/MsgCore.h +++ b/subnode/MsgCore.h @@ -56,6 +56,7 @@ struct MsgCore_Promise struct Allocator* alloc; void* userData; MsgCore_PromiseCb cb; + uint32_t lag; }; struct MsgCore_Promise* MsgCore_createQuery(struct MsgCore* core, diff --git a/subnode/ReachabilityAnnouncer.c b/subnode/ReachabilityAnnouncer.c index 9bbd71f73..16ff35de2 100644 --- a/subnode/ReachabilityAnnouncer.c +++ b/subnode/ReachabilityAnnouncer.c @@ -381,12 +381,8 @@ static void addServerStateMsg(struct ReachabilityAnnouncer_pvt* rap, struct Mess void ReachabilityAnnouncer_updatePeer(struct ReachabilityAnnouncer* ra, uint8_t ipv6[16], - uint64_t pathThemToUs, - uint64_t pathUsToThem, - uint32_t mtu, - uint16_t drops, - uint16_t latency, - uint16_t penalty) + uint32_t pathThemToUs, + uint32_t pathUsToThem) { struct ReachabilityAnnouncer_pvt* rap = Identity_check((struct ReachabilityAnnouncer_pvt*) ra); @@ -394,18 +390,13 @@ void ReachabilityAnnouncer_updatePeer(struct ReachabilityAnnouncer* ra, AddrTools_printIp(ipPrinted, ipv6); Log_debug(rap->log, "Update peer [%s] [%08llx]", ipPrinted, (long long) pathThemToUs); - if (pathThemToUs > 0xffffffff) { - Log_warn(rap->log, "oversize path for [%08llx]", (long long) pathThemToUs); - return; - } - struct Announce_Peer refPeer; Announce_Peer_init(&refPeer); refPeer.label_be = Endian_hostToBigEndian32(pathThemToUs); - refPeer.mtu8_be = Endian_hostToBigEndian16((mtu / 8)); - refPeer.drops_be = Endian_hostToBigEndian16(drops); - refPeer.latency_be = Endian_hostToBigEndian16(latency); - refPeer.penalty_be = Endian_hostToBigEndian16(penalty); + // TODO(cjd): This needs to carry the observed MTU + refPeer.mtu8_be = 0; + refPeer.unused = 0xffffffff; + refPeer.peerNum_be = EncodingScheme_parseDirector(rap->myScheme, pathUsToThem); refPeer.encodingFormNum = EncodingScheme_getFormNum(rap->myScheme, pathUsToThem); Bits_memcpy(refPeer.ipv6, ipv6, 16); @@ -438,7 +429,7 @@ void ReachabilityAnnouncer_updatePeer(struct ReachabilityAnnouncer* ra, return; } case updatePeer_UPDATE: { - if (drops == 0xffff) { + if (!pathThemToUs) { Log_debug(rap->log, "update (peergone)"); stateUpdate(rap, ReachabilityAnnouncer_State_PEERGONE); } else { diff --git a/subnode/ReachabilityAnnouncer.h b/subnode/ReachabilityAnnouncer.h index a2bbd07f2..bae13df4b 100644 --- a/subnode/ReachabilityAnnouncer.h +++ b/subnode/ReachabilityAnnouncer.h @@ -34,12 +34,8 @@ struct ReachabilityAnnouncer // (pathThemToUs == 0) -> peer is gone. void ReachabilityAnnouncer_updatePeer(struct ReachabilityAnnouncer* ra, uint8_t ipv6[16], - uint64_t pathThemToUs, - uint64_t pathUsToThem, - uint32_t mtu, - uint16_t drops, - uint16_t latency, - uint16_t penalty); + uint32_t pathThemToUs, + uint32_t pathUsToThem); struct ReachabilityAnnouncer* ReachabilityAnnouncer_new(struct Allocator* allocator, struct Log* log, diff --git a/subnode/ReachabilityCollector.c b/subnode/ReachabilityCollector.c index 229f5e941..935460c6e 100644 --- a/subnode/ReachabilityCollector.c +++ b/subnode/ReachabilityCollector.c @@ -19,6 +19,9 @@ #include "util/Identity.h" #include "util/events/Timeout.h" #include "util/AddrTools.h" +#include "util/events/Time.h" + +#include #define TIMEOUT_MILLISECONDS 10000 @@ -28,6 +31,20 @@ struct PeerInfo_pvt // Next path to check when sending getPeers requests to our peer looking for ourselves. uint64_t pathToCheck; + + // For this 10 second period + uint32_t sumOfLag; + uint32_t lagSamples; + uint32_t timeOfLastLagUpdate; + + uint32_t sumOfDropsLastSlot; + uint32_t sumOfPacketsLastSlot; + uint32_t sumOfKbLastSlot; + + uint32_t sumOfDrops; + uint32_t sumOfPackets; + uint32_t sumOfKb; + // This peer is waiting for response bool waitForResponse; @@ -48,12 +65,23 @@ struct ReachabilityCollector_pvt struct Log* log; struct BoilerplateResponder* br; struct Address* myAddr; + struct EventBase* base; + uint32_t resampleCycle; struct ArrayList_OfPeerInfo_pvt* piList; Identity }; +static struct PeerInfo_pvt* piForLabel(struct ReachabilityCollector_pvt* rcp, uint64_t label) +{ + for (int j = 0; j < rcp->piList->length; j++) { + struct PeerInfo_pvt* pi0 = ArrayList_OfPeerInfo_pvt_get(rcp->piList, j); + if (pi0->pub.addr.path == label) { return pi0; } + } + return NULL; +} + static void mkNextRequest(struct ReachabilityCollector_pvt* rcp); static void change0(struct ReachabilityCollector_pvt* rcp, @@ -62,25 +90,29 @@ static void change0(struct ReachabilityCollector_pvt* rcp, { for (int i = 0; i < rcp->piList->length; i++) { struct PeerInfo_pvt* pi = ArrayList_OfPeerInfo_pvt_get(rcp->piList, i); - if (Address_isSameIp(nodeAddr, &pi->pub.addr)) { - if (nodeAddr->path == 0) { - Log_debug(rcp->log, "Peer [%s] dropped", - Address_toString(&pi->pub.addr, tempAlloc)->bytes); - ArrayList_OfPeerInfo_pvt_remove(rcp->piList, i); - Allocator_free(pi->alloc); - } else if (nodeAddr->path != pi->pub.addr.path) { - Log_debug(rcp->log, "Peer [%s] changed path", - Address_toString(&pi->pub.addr, tempAlloc)->bytes); - pi->pub.pathThemToUs = -1; - pi->pathToCheck = 1; - pi->pub.querying = true; - pi->pub.addr.path = nodeAddr->path; - } - if (rcp->pub.onChange) { - rcp->pub.onChange(&rcp->pub, nodeAddr->ip6.bytes, 0, 0, 0, 0xffff, 0xffff, 0xffff); - } - return; + if (!Address_isSameIp(nodeAddr, &pi->pub.addr)) { continue; } + if (nodeAddr->path == 0) { + Log_debug(rcp->log, "Peer [%s] dropped", + Address_toString(&pi->pub.addr, tempAlloc)->bytes); + ArrayList_OfPeerInfo_pvt_remove(rcp->piList, i); + Allocator_free(pi->alloc); + rcp->pub.onChange(&rcp->pub, nodeAddr->ip6.bytes, 0, 0); + } else if (nodeAddr->path != pi->pub.addr.path) { + Log_debug(rcp->log, "Peer [%s] changed path", + Address_toString(&pi->pub.addr, tempAlloc)->bytes); + pi->pub.pathThemToUs = -1; + pi->pathToCheck = 1; + pi->pub.querying = true; + pi->pub.addr.path = nodeAddr->path; + //rcp->pub.onChange( + // &rcp->pub, nodeAddr->ip6.bytes, pi->pub.pathThemToUs, nodeAddr->path); + // Lets leave the peer in the list as working, our path to it changed + // but it's path to us didn't necessarily change. + } else { + Log_debug(rcp->log, "Peer [%s] message, peer already registered", + Address_toString(&pi->pub.addr, tempAlloc)->bytes); } + return; } if (nodeAddr->path == 0) { Log_debug(rcp->log, "Nonexistant peer [%s] dropped", @@ -130,6 +162,17 @@ static void onReplyTimeout(struct MsgCore_Promise* prom) } } +static void latencyUpdate( + struct ReachabilityCollector_pvt* rcp, + struct PeerInfo_pvt* pip, + uint32_t lag) +{ + Log_debug(rcp->log, "Latency update for [%016x] [%u]ms", pip->pub.addr.path, lag); + pip->sumOfLag += lag; + pip->lagSamples++; + pip->timeOfLastLagUpdate = Time_currentTimeMilliseconds(rcp->base); +} + static void onReply(Dict* msg, struct Address* src, struct MsgCore_Promise* prom) { struct Query* q = (struct Query*) prom->userData; @@ -153,6 +196,7 @@ static void onReply(Dict* msg, struct Address* src, struct MsgCore_Promise* prom Log_debug(rcp->log, "Got message from peer which is gone from list"); return; } + latencyUpdate(rcp, pi, prom->lag); pi->waitForResponse = false; @@ -173,10 +217,7 @@ static void onReply(Dict* msg, struct Address* src, struct MsgCore_Promise* prom Log_debug(rcp->log, "Found back-route for [%s]", Address_toString(src, prom->alloc)->bytes); pi->pub.pathThemToUs = path; - if (rcp->pub.onChange) { - rcp->pub.onChange( - &rcp->pub, src->ip6.bytes, path, src->path, 0, 0xffff, 0xffff, 0xffff); - } + rcp->pub.onChange(&rcp->pub, src->ip6.bytes, path, src->path); } pi->pub.querying = false; mkNextRequest(rcp); @@ -193,21 +234,8 @@ static void onReply(Dict* msg, struct Address* src, struct MsgCore_Promise* prom mkNextRequest(rcp); } -static void mkNextRequest(struct ReachabilityCollector_pvt* rcp) +static void queryPeer(struct ReachabilityCollector_pvt* rcp, struct PeerInfo_pvt* pi) { - struct PeerInfo_pvt* pi = NULL; - for (int i = 0; i < rcp->piList->length; i++) { - pi = ArrayList_OfPeerInfo_pvt_get(rcp->piList, i); - if (pi->pub.querying && !pi->waitForResponse) { break; } - } - if (!pi || !pi->pub.querying) { - Log_debug(rcp->log, "All [%u] peers have been queried", rcp->piList->length); - return; - } - if (pi->waitForResponse) { - Log_debug(rcp->log, "Peer is waiting for response."); - return; - } struct MsgCore_Promise* query = MsgCore_createQuery(rcp->msgCore, TIMEOUT_MILLISECONDS, rcp->alloc); struct Query* q = Allocator_calloc(query->alloc, sizeof(struct Query), 1); @@ -217,6 +245,7 @@ static void mkNextRequest(struct ReachabilityCollector_pvt* rcp) query->cb = onReply; Assert_true(AddressCalc_validAddress(pi->pub.addr.ip6.bytes)); query->target = Address_clone(&pi->pub.addr, query->alloc); + Assert_true(pi->pub.addr.path); Dict* d = query->msg = Dict_new(query->alloc); Dict_putStringCC(d, "q", "gp", query->alloc); uint64_t label_be = Endian_hostToBigEndian64(pi->pathToCheck); @@ -233,10 +262,66 @@ static void mkNextRequest(struct ReachabilityCollector_pvt* rcp) pi->waitForResponse = true; } +static void mkNextRequest(struct ReachabilityCollector_pvt* rcp) +{ + struct PeerInfo_pvt* pi = NULL; + for (int i = 0; i < rcp->piList->length; i++) { + pi = ArrayList_OfPeerInfo_pvt_get(rcp->piList, i); + if (pi->pub.querying && !pi->waitForResponse) { break; } + } + if (!pi || !pi->pub.querying) { + Log_debug(rcp->log, "All [%u] peers have been queried", rcp->piList->length); + return; + } + if (pi->waitForResponse) { + Log_debug(rcp->log, "Peer is waiting for response."); + return; + } + queryPeer(rcp, pi); +} + static void cycle(void* vrc) { struct ReachabilityCollector_pvt* rcp = Identity_check((struct ReachabilityCollector_pvt*) vrc); mkNextRequest(rcp); + + // 10 second window is cut into 5 intervals + // second 0, 2, 4, 6, 8 + // number 1, 2, 3, 4, 5 + // in number 4, we will ping any peer who has not received one yet to get latency + // in number 5, we will collect everything back + rcp->resampleCycle++; + if (rcp->resampleCycle < 4) { return; } + + for (int j = 0; j < rcp->piList->length; j++) { + struct PeerInfo_pvt* pi = ArrayList_OfPeerInfo_pvt_get(rcp->piList, j); + Log_debug(rcp->log, "Visiting peer [%016x] samples [%u]", + pi->pub.addr.path, pi->lagSamples); + if (pi->lagSamples == 0) { + Log_debug(rcp->log, "Triggering a ping to peer [%016x]", pi->pub.addr.path); + queryPeer(rcp, pi); + } + + if (rcp->resampleCycle < 5) { continue; } + + int sampleNum = pi->pub.linkState.samples % ReachabilityCollector_SLOTS; + + uint64_t drops = pi->sumOfDrops - pi->sumOfDropsLastSlot; + uint64_t packets = pi->sumOfPackets - pi->sumOfPacketsLastSlot; + uint64_t dropRateShl18 = packets ? (drops << 18) / packets : 0; + pi->pub.linkState.dropSlots[sampleNum] = dropRateShl18 > 0xfffe ? 0xfffe : dropRateShl18; + pi->sumOfDropsLastSlot = pi->sumOfDrops; + + pi->pub.linkState.kbRecvSlots[sampleNum] = pi->sumOfKb - pi->sumOfKbLastSlot; + pi->sumOfKbLastSlot = pi->sumOfKb; + + pi->pub.linkState.lagSlots[sampleNum] = pi->lagSamples ? pi->sumOfLag / pi->lagSamples : 0; + pi->sumOfLag = 0; + pi->lagSamples = 0; + + pi->pub.linkState.samples++; + } + if (rcp->resampleCycle >= 5) { rcp->resampleCycle = 0; } } struct ReachabilityCollector_PeerInfo* @@ -247,6 +332,40 @@ struct ReachabilityCollector_PeerInfo* return pi ? &pi->pub : NULL; } +void ReachabilityCollector_lagSample( + struct ReachabilityCollector* rc, uint64_t label, uint32_t milliseconds) +{ + struct ReachabilityCollector_pvt* rcp = Identity_check((struct ReachabilityCollector_pvt*) rc); + struct PeerInfo_pvt* pi = piForLabel(rcp, label); + if (!pi) { return; } + latencyUpdate(rcp, pi, milliseconds); +} + +void ReachabilityCollector_updateBandwidthAndDrops( + struct ReachabilityCollector* rc, + uint64_t label, + uint32_t sumOfPackets, + uint32_t sumOfDrops, + uint32_t sumOfKb) +{ + struct ReachabilityCollector_pvt* rcp = Identity_check((struct ReachabilityCollector_pvt*) rc); + struct PeerInfo_pvt* pi = piForLabel(rcp, label); + if (!pi) { return; } + pi->sumOfPackets = sumOfPackets; + pi->sumOfDrops = sumOfDrops; + pi->sumOfKb = sumOfKb; +} + +static void dummyOnChange( + struct ReachabilityCollector* rc, + uint8_t nodeIpv6[16], + uint32_t pathThemToUs, + uint32_t pathUsToThem) +{ + struct ReachabilityCollector_pvt* rcp = Identity_check((struct ReachabilityCollector_pvt*) rc); + Log_debug(rcp->log, "dummyOnChange called, onChange unassigned"); +} + struct ReachabilityCollector* ReachabilityCollector_new(struct Allocator* allocator, struct MsgCore* msgCore, struct Log* log, @@ -263,6 +382,8 @@ struct ReachabilityCollector* ReachabilityCollector_new(struct Allocator* alloca rcp->piList = ArrayList_OfPeerInfo_pvt_new(alloc); rcp->log = log; rcp->br = br; + rcp->base = base; + rcp->pub.onChange = dummyOnChange; Identity_set(rcp); Timeout_setInterval(cycle, rcp, 2000, base, alloc); return &rcp->pub; diff --git a/subnode/ReachabilityCollector.h b/subnode/ReachabilityCollector.h index 2576fdefc..3b6212e6c 100644 --- a/subnode/ReachabilityCollector.h +++ b/subnode/ReachabilityCollector.h @@ -18,9 +18,12 @@ #include "dht/Address.h" #include "memory/Allocator.h" #include "subnode/MsgCore.h" +#include "subnode/LinkState.h" #include "util/Linker.h" Linker_require("subnode/ReachabilityCollector.c"); +#define ReachabilityCollector_SLOTS 18 + struct ReachabilityCollector_PeerInfo { // Address of the peer from us @@ -30,18 +33,16 @@ struct ReachabilityCollector_PeerInfo uint64_t pathThemToUs; bool querying; + + struct LinkState linkState; }; struct ReachabilityCollector; typedef void (* ReachabilityCollector_OnChange)(struct ReachabilityCollector* rc, uint8_t nodeIpv6[16], - uint64_t pathThemToUs, - uint64_t pathUsToThem, - uint32_t mtu, // 0 = unknown - uint16_t drops, // 0xffff = unknown - uint16_t latency, // 0xffff = unknown - uint16_t penalty); // 0xffff = unknown + uint32_t pathThemToUs, + uint32_t pathUsToThem); struct ReachabilityCollector { @@ -55,6 +56,16 @@ struct ReachabilityCollector_PeerInfo* // NodeAddr->path should be 0 if the node is not reachable. void ReachabilityCollector_change(struct ReachabilityCollector* rc, struct Address* nodeAddr); +void ReachabilityCollector_lagSample( + struct ReachabilityCollector* rc, uint64_t label, uint32_t milliseconds); + +void ReachabilityCollector_updateBandwidthAndDrops( + struct ReachabilityCollector* rc, + uint64_t label, + uint32_t sumOfPackets, + uint32_t sumOfDrops, + uint32_t sumOfKb); + struct ReachabilityCollector* ReachabilityCollector_new(struct Allocator* allocator, struct MsgCore* mc, struct Log* log, diff --git a/subnode/ReachabilityCollector_admin.c b/subnode/ReachabilityCollector_admin.c index 3062e17b2..68b68f19e 100644 --- a/subnode/ReachabilityCollector_admin.c +++ b/subnode/ReachabilityCollector_admin.c @@ -27,6 +27,23 @@ struct Context { Identity }; +static List* numList(void* buf, uint32_t numberSz, struct Allocator* alloc) +{ + List* l = List_new(alloc); + for (int i = 0; i < ReachabilityCollector_SLOTS; i++) { + int64_t num; + if (numberSz == 2) { + num = ((uint16_t*)buf)[i]; + } else if (numberSz == 4) { + num = ((uint32_t*)buf)[i]; + } else { + Assert_failure("unexpected number size"); + } + List_addInt(l, num, alloc); + } + return l; +} + #define NODES_PER_PAGE 8 static void getPeerInfo(Dict* args, void* vcontext, String* txid, struct Allocator* requestAlloc) { @@ -46,6 +63,16 @@ static void getPeerInfo(Dict* args, void* vcontext, String* txid, struct Allocat Dict_putStringC( pid, "pathThemToUs", String_newBinary(rpath, 19, requestAlloc), requestAlloc); Dict_putIntC(pid, "querying", pi->querying, requestAlloc); + + Dict_putListC( + pid, "lagSlots", numList(pi->linkState.lagSlots, 2, requestAlloc), requestAlloc); + Dict_putListC( + pid, "dropSlots", numList(pi->linkState.dropSlots, 2, requestAlloc), requestAlloc); + Dict_putListC( + pid, "kbRecvSlots", numList(pi->linkState.kbRecvSlots, 4, requestAlloc), requestAlloc); + Dict_putIntC( + pid, "samples", pi->linkState.samples, requestAlloc); + List_addDict(peerList, pid, requestAlloc); } Dict* out = Dict_new(requestAlloc); diff --git a/subnode/SubnodePathfinder.c b/subnode/SubnodePathfinder.c index f3b4bc19c..8299ff171 100644 --- a/subnode/SubnodePathfinder.c +++ b/subnode/SubnodePathfinder.c @@ -121,7 +121,7 @@ static Iface_DEFUN connected(struct SubnodePathfinder_pvt* pf, struct Message* m return NULL; } -static void addressForNode(struct Address* addrOut, struct Message* msg) +static uint32_t addressForNode(struct Address* addrOut, struct Message* msg) { struct PFChan_Node node; Message_pop(msg, &node, PFChan_Node_SIZE, NULL); @@ -130,6 +130,7 @@ static void addressForNode(struct Address* addrOut, struct Message* msg) addrOut->path = Endian_bigEndianToHost64(node.path_be); Bits_memcpy(addrOut->key, node.publicKey, 32); Bits_memcpy(addrOut->ip6.bytes, node.ip6, 16); + return Endian_bigEndianToHost32(node.metric_be); } static Iface_DEFUN switchErr(struct Message* msg, struct SubnodePathfinder_pvt* pf) @@ -253,22 +254,17 @@ static Iface_DEFUN searchReq(struct Message* msg, struct SubnodePathfinder_pvt* static void rcChange(struct ReachabilityCollector* rc, uint8_t nodeIpv6[16], - uint64_t pathThemToUs, - uint64_t pathUsToThem, - uint32_t mtu, - uint16_t drops, - uint16_t latency, - uint16_t penalty) + uint32_t pathThemToUs, + uint32_t pathUsToThem) { struct SubnodePathfinder_pvt* pf = Identity_check((struct SubnodePathfinder_pvt*) rc->userData); - ReachabilityAnnouncer_updatePeer( - pf->ra, nodeIpv6, pathThemToUs, pathUsToThem, mtu, drops, latency, penalty); + ReachabilityAnnouncer_updatePeer(pf->ra, nodeIpv6, pathThemToUs, pathUsToThem); } static Iface_DEFUN peer(struct Message* msg, struct SubnodePathfinder_pvt* pf) { struct Address addr; - addressForNode(&addr, msg); + uint32_t metric = addressForNode(&addr, msg); String* str = Address_toString(&addr, msg->alloc); Log_debug(pf->log, "Peer [%s]", str->bytes); @@ -286,6 +282,9 @@ static Iface_DEFUN peer(struct Message* msg, struct SubnodePathfinder_pvt* pf) //NodeCache_discoverNode(pf->nc, &addr); ReachabilityCollector_change(pf->pub.rc, &addr); + if ((metric & 0xffff) < 0xffff) { + ReachabilityCollector_lagSample(pf->pub.rc, addr.path, (metric & 0xffff)); + } return sendNode(msg, &addr, 0xfff00000, PFChan_Pathfinder_NODE, pf); } @@ -445,6 +444,22 @@ static Iface_DEFUN incomingMsg(struct Message* msg, struct SubnodePathfinder_pvt return Iface_next(&pf->msgCoreIf, msg); } +static Iface_DEFUN linkState(struct Message* msg, struct SubnodePathfinder_pvt* pf) +{ + while (msg->length) { + struct PFChan_LinkState_Entry lse; + Message_pop(msg, &lse, PFChan_LinkState_Entry_SIZE, NULL); + ReachabilityCollector_updateBandwidthAndDrops( + pf->pub.rc, + Endian_bigEndianToHost32(lse.peerLabel_be), + Endian_bigEndianToHost32(lse.sumOfPackets_be), + Endian_bigEndianToHost32(lse.sumOfDrops_be), + Endian_bigEndianToHost32(lse.sumOfKb_be) + ); + } + return NULL; +} + static Iface_DEFUN incomingFromMsgCore(struct Message* msg, struct Iface* iface) { struct SubnodePathfinder_pvt* pf = @@ -482,6 +497,7 @@ static Iface_DEFUN incomingFromEventIf(struct Message* msg, struct Iface* eventI case PFChan_Core_PONG: return handlePong(msg, pf); case PFChan_Core_CTRL_MSG: return ctrlMsg(msg, pf); case PFChan_Core_UNSETUP_SESSION: return unsetupSession(msg, pf); + case PFChan_Core_LINK_STATE: return linkState(msg, pf); default:; } Assert_failure("unexpected event [%d]", ev); @@ -503,26 +519,25 @@ static void sendEvent(struct SubnodePathfinder_pvt* pf, void SubnodePathfinder_start(struct SubnodePathfinder* sp) { struct SubnodePathfinder_pvt* pf = Identity_check((struct SubnodePathfinder_pvt*) sp); - pf->msgCore = MsgCore_new(pf->base, pf->rand, pf->alloc, pf->log, pf->myScheme); - Iface_plumb(&pf->msgCoreIf, &pf->msgCore->interRouterIf); + struct MsgCore* msgCore = pf->msgCore = + MsgCore_new(pf->base, pf->rand, pf->alloc, pf->log, pf->myScheme); + Iface_plumb(&pf->msgCoreIf, &msgCore->interRouterIf); - PingResponder_new(pf->alloc, pf->log, pf->msgCore, pf->br); + PingResponder_new(pf->alloc, pf->log, msgCore, pf->br); GetPeersResponder_new( - pf->alloc, pf->log, pf->myPeers, pf->myAddress, pf->msgCore, pf->br, pf->myScheme); + pf->alloc, pf->log, pf->myPeers, pf->myAddress, msgCore, pf->br, pf->myScheme); - pf->pub.snh = SupernodeHunter_new( - pf->alloc, pf->log, pf->base, pf->sp, pf->myPeers, pf->msgCore, pf->myAddress); + struct ReachabilityCollector* rc = pf->pub.rc = ReachabilityCollector_new( + pf->alloc, msgCore, pf->log, pf->base, pf->br, pf->myAddress); + rc->userData = pf; + rc->onChange = rcChange; - pf->ra = ReachabilityAnnouncer_new( - pf->alloc, pf->log, pf->base, pf->rand, pf->msgCore, pf->pub.snh, pf->privateKey, - pf->myScheme); + struct SupernodeHunter* snh = pf->pub.snh = SupernodeHunter_new( + pf->alloc, pf->log, pf->base, pf->sp, pf->myPeers, msgCore, pf->myAddress, rc); - pf->pub.rc = ReachabilityCollector_new( - pf->alloc, pf->msgCore, pf->log, pf->base, pf->br, pf->myAddress); - - pf->pub.rc->userData = pf; - pf->pub.rc->onChange = rcChange; + pf->ra = ReachabilityAnnouncer_new( + pf->alloc, pf->log, pf->base, pf->rand, msgCore, snh, pf->privateKey, pf->myScheme); struct PFChan_Pathfinder_Connect conn = { .superiority_be = Endian_hostToBigEndian32(1), diff --git a/subnode/SupernodeHunter.c b/subnode/SupernodeHunter.c index 932fbc503..faa76fd37 100644 --- a/subnode/SupernodeHunter.c +++ b/subnode/SupernodeHunter.c @@ -57,6 +57,8 @@ struct SupernodeHunter_pvt struct Address* myAddress; String* selfAddrStr; + struct ReachabilityCollector* rc; + Identity }; @@ -247,6 +249,7 @@ static void queryForAuthorized(struct SupernodeHunter_pvt* snp, struct Address* static void peerResponseOK(struct SwitchPinger_Response* resp, struct SupernodeHunter_pvt* snp) { + ReachabilityCollector_lagSample(snp->rc, resp->label, resp->milliseconds); struct Address snode; Bits_memcpy(&snode, &resp->snode, sizeof(struct Address)); if (!snode.path) { @@ -372,7 +375,8 @@ struct SupernodeHunter* SupernodeHunter_new(struct Allocator* allocator, struct SwitchPinger* sp, struct AddrSet* peers, struct MsgCore* msgCore, - struct Address* myAddress) + struct Address* myAddress, + struct ReachabilityCollector* rc) { struct Allocator* alloc = Allocator_child(allocator); struct SupernodeHunter_pvt* out = @@ -389,6 +393,7 @@ struct SupernodeHunter* SupernodeHunter_new(struct Allocator* allocator, out->alloc = alloc; out->msgCore = msgCore; out->myAddress = myAddress; + out->rc = rc; out->selfAddrStr = String_newBinary(myAddress->ip6.bytes, 16, alloc); out->sp = sp; out->snodePathUpdated = false; diff --git a/subnode/SupernodeHunter.h b/subnode/SupernodeHunter.h index 0d69c2771..6bda7046e 100644 --- a/subnode/SupernodeHunter.h +++ b/subnode/SupernodeHunter.h @@ -20,6 +20,7 @@ #include "util/events/EventBase.h" #include "subnode/AddrSet.h" #include "subnode/MsgCore.h" +#include "subnode/ReachabilityCollector.h" #include "dht/Address.h" #include "net/SwitchPinger.h" #include "util/Linker.h" @@ -70,6 +71,7 @@ struct SupernodeHunter* SupernodeHunter_new(struct Allocator* allocator, struct SwitchPinger* sp, struct AddrSet* peers, struct MsgCore* msgCore, - struct Address* myAddress); + struct Address* myAddress, + struct ReachabilityCollector* rc); #endif diff --git a/switch/EncodingScheme.c b/switch/EncodingScheme.c index 622cd3faa..9f7338d7d 100644 --- a/switch/EncodingScheme.c +++ b/switch/EncodingScheme.c @@ -53,6 +53,20 @@ static bool is358(struct EncodingScheme* scheme) return true; } + +int EncodingScheme_parseDirector(struct EncodingScheme* scheme, uint64_t label) +{ + int formNum = EncodingScheme_getFormNum(scheme, label); + if (formNum == EncodingScheme_getFormNum_INVALID) { + return EncodingScheme_parseDirector_INVALID; + } + struct EncodingScheme_Form* currentForm = &scheme->forms[formNum]; + if ((label & Bits_maxBits64(currentForm->prefixLen + currentForm->bitCount)) == 1) { + return 0; + } + return ((label >> currentForm->prefixLen) & Bits_maxBits64(currentForm->bitCount)) + 1; +} + uint64_t EncodingScheme_convertLabel(struct EncodingScheme* scheme, uint64_t routeLabel, int convertTo) diff --git a/switch/EncodingScheme.h b/switch/EncodingScheme.h index ee971c4b7..178bc8773 100644 --- a/switch/EncodingScheme.h +++ b/switch/EncodingScheme.h @@ -109,4 +109,7 @@ int EncodingScheme_isSelfRoute(struct EncodingScheme* scheme, uint64_t routeLabe */ int EncodingScheme_isOneHop(struct EncodingScheme* scheme, uint64_t routeLabel); +#define EncodingScheme_parseDirector_INVALID -1 +int EncodingScheme_parseDirector(struct EncodingScheme* scheme, uint64_t label); + #endif diff --git a/util/Set.c b/util/Set.c index 58234db16..9a4551ded 100644 --- a/util/Set.c +++ b/util/Set.c @@ -25,6 +25,7 @@ struct Entry { void* data; uint32_t hashCode; struct Allocator* alloc; + struct Set_pvt* set; struct { struct Entry* rbe_left; struct Entry* rbe_right; @@ -72,10 +73,8 @@ struct Set_pvt static int compare(const struct Entry* a, const struct Entry* b) { if (a->hashCode != b->hashCode) { return a->hashCode - b->hashCode; } - const struct Entry* root = a; - while (root->tree.rbe_parent) { root = root->tree.rbe_parent; } - const struct Set_pvt* set = Identity_containerOf(root, struct Set_pvt, activeTree.rbh_root); - return set->compare(a, b); + struct Set_pvt* set = Identity_check((struct Set_pvt*) a->set); + return set->compare(a->data, b->data); } RB_GENERATE_STATIC(ActiveTree, Entry, tree, compare) @@ -105,6 +104,7 @@ static struct Entry* allocateBlock(struct Set_pvt* set) set->block = newBlock; uint32_t num = newBlock->number = (set->block ? set->block->number : -1) + 1; for (int i = 0; i < BLOCK_SZ; i++) { + newBlock->entries[i].set = set; newBlock->entries[i].hashCode = num; FreeTree_RB_INSERT(&set->freeTree, &newBlock->entries[i]); } @@ -155,11 +155,12 @@ static void freeEntry(struct Set_pvt* set, struct Entry* e) if (!b->usedCount) { freeBlock(set, b); } } -static struct Entry* get(struct Set_pvt* set, void* val) +static struct Entry* get(struct Set_pvt* set, void* val, uint32_t hashCode) { struct Entry e = { - .hashCode = set->hashCode(val), - .data = val + .hashCode = hashCode, + .data = val, + .set = set }; return ActiveTree_RB_FIND(&set->activeTree, &e); } @@ -168,10 +169,10 @@ int Set_addCopy(struct Set* _set, void* val, uint32_t size) { struct Set_pvt* set = Identity_check((struct Set_pvt*) _set); uint32_t hashCode = set->hashCode(val); - struct Entry* e = get(set, val); + struct Entry* e = get(set, val, hashCode); if (!e) { struct Entry* e = newEntry(set); - e->hashCode = set->hashCode(val); + e->hashCode = hashCode; ActiveTree_RB_INSERT(&set->activeTree, e); struct Block* b = blockForEntry(set, e); e->alloc = Allocator_child(b->alloc); @@ -185,11 +186,11 @@ int Set_add(struct Set* _set, void* val) { struct Set_pvt* set = Identity_check((struct Set_pvt*) _set); uint32_t hashCode = set->hashCode(val); - struct Entry* e = get(set, val); + struct Entry* e = get(set, val, hashCode); if (!e) { struct Entry* e = newEntry(set); e->data = val; - e->hashCode = set->hashCode(val); + e->hashCode = hashCode; ActiveTree_RB_INSERT(&set->activeTree, e); } return set->size; @@ -221,7 +222,7 @@ void Set_iterNext(struct Set_Iter* iter) void* Set_remove(struct Set* _set, void* val) { struct Set_pvt* set = Identity_check((struct Set_pvt*) _set); - struct Entry* e = get(set, val); + struct Entry* e = get(set, val, set->hashCode(val)); void* out = NULL; if (e) { out = e->data; @@ -233,7 +234,7 @@ void* Set_remove(struct Set* _set, void* val) void* Set_get(struct Set* _set, void* val) { struct Set_pvt* set = Identity_check((struct Set_pvt*) _set); - struct Entry* e = get(set, val); + struct Entry* e = get(set, val, set->hashCode(val)); void* out = NULL; if (e) { out = e->data; } return out; diff --git a/util/Set.h b/util/Set.h index f24ac75ad..6c752385c 100644 --- a/util/Set.h +++ b/util/Set.h @@ -16,7 +16,6 @@ #define Set_H #include "memory/Allocator.h" -#include "util/Hash.h" #include "util/Bits.h" #include "util/UniqueName.h" @@ -46,6 +45,7 @@ int Set_addCopy(struct Set* _set, void* val, uint32_t size); for (Set_ ## name ## _iter(set, &UniqueName_last()); \ ((out) = UniqueName_last().val); \ Set_ ## name ## _iterNext(&UniqueName_last())) +// CHECKFILES_IGNORE expecting a { #endif // Used multiple times... @@ -71,7 +71,8 @@ struct Set_FULLNAME { int size; }; -struct Set_FUNCTION(Iter) { +struct Set_FUNCTION(Iter) +{ Set_TYPE* val; void* internal; }; @@ -82,23 +83,24 @@ struct Set_FUNCTION(Iter) { #endif static inline uint32_t Set_FUNCTION(_hashCode)(const void* a) { - return Set_HASHCODE(((Set_TYPE**)a)[0]); + return Set_HASHCODE((Set_TYPE*)a); } static inline int Set_FUNCTION(_compare)(const void* a, const void* b) { - return Set_COMPARE(((Set_TYPE**)a)[0], ((Set_TYPE**)b)[0]); + return Set_COMPARE((Set_TYPE*)a, (Set_TYPE*)b); } #else #ifdef Set_HASHCODE #error cannot specify Set_HASHCODE without Set_COMPARE #endif +#include "util/Hash.h" static inline uint32_t Set_FUNCTION(_hashCode)(const void* a) { return Hash_compute((uint8_t*) a, sizeof(Set_TYPE)); } static inline int Set_FUNCTION(_compare)(const void* a, const void* b) { - return Bits_memcmp(((Set_TYPE**)a)[0], ((Set_TYPE**)b)[0], sizeof(Set_TYPE)); + return Bits_memcmp(a, b, sizeof(Set_TYPE)); } #endif diff --git a/util/VarInt.h b/util/VarInt.h new file mode 100644 index 000000000..5d1a384b5 --- /dev/null +++ b/util/VarInt.h @@ -0,0 +1,110 @@ +/* vim: set expandtab ts=4 sw=4: */ +/* + * You may redistribute this program and/or modify it under the terms of + * the GNU General Public License as published by the Free Software Foundation, + * either version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#ifndef VarInt_H +#define VarInt_H + +#include "util/Bits.h" + +#include + +// Thank you Satoshi + +struct VarInt_Iter { + uint8_t* ptr; + uint8_t* end; + uint8_t* start; +}; + +static inline void VarInt_mk(struct VarInt_Iter* out, uint8_t* ptr, int length) +{ + out->ptr = ptr; + out->end = ptr + length; + out->start = ptr; +} + +static inline void VarInt_toStart(struct VarInt_Iter* iter) +{ + iter->ptr = iter->start; +} + +static inline void VarInt_toEnd(struct VarInt_Iter* iter) +{ + iter->ptr = iter->end; +} + +static inline int VarInt_hasMore(struct VarInt_Iter* iter) +{ + return iter->end > iter->ptr; +} + +static inline int VarInt_pop(struct VarInt_Iter* iter, uint64_t* _out) +{ + uint64_t out = 0; + uint8_t* bytes = iter->ptr; + int len = iter->end - bytes; + if (len < 9) { + if (len < 5) { + if (len < 3) { + if (len < 1) { return -1; } + if (*bytes >= 0xfd) { return -1; } + } else if (*bytes >= 0xfe) { return -1; } + } else if (*bytes >= 0xff) { return -1; } + } + switch (*bytes) { + case 0xff: + out |= *++bytes; out <<= 8; + out |= *++bytes; out <<= 8; + out |= *++bytes; out <<= 8; + out |= *++bytes; out <<= 8; + case 0xfe: + out |= *++bytes; out <<= 8; + out |= *++bytes; out <<= 8; + case 0xfd: + out |= *++bytes; out <<= 8; + bytes++; + default: + out |= *bytes++; + } + iter->ptr = bytes; + if (_out) { *_out = out; } + return 0; +} + +static inline int VarInt_push(struct VarInt_Iter* iter, uint64_t val) +{ + uint8_t* ptr = iter->ptr; + int padding = ptr - iter->start; + if (padding < 9) { + if (padding < 5) { + if (padding < 3) { + if (padding < 1) { return -1; } + if (val > 0xfc) { return -1; } + } else if (val > 0xffff) { return -1; } + } else if (val > 0xffffffff) { return -1; } + } + // 1, 2, 4, 8 + int i = (!!(val >> 32)) * 4 + (!!(val >> 16)) * 2 + (!!((val + 3) >> 8)) + 1; + for (int j = 0; j < i; j++) { *--ptr = val & 0xff; val >>= 8; } + switch (i) { + case 8: *--ptr = 0xff; break; + case 4: *--ptr = 0xfe; break; + case 2: *--ptr = 0xfd; break; + } + iter->ptr = ptr; + return 0; +} + + +#endif \ No newline at end of file diff --git a/util/test/Set_test.c b/util/test/Set_test.c index d8a6c8177..4ef2360b1 100644 --- a/util/test/Set_test.c +++ b/util/test/Set_test.c @@ -38,7 +38,7 @@ static inline uint32_t hashCodeHighOnly(const uint32_t* a) #include #include -#define CYCLES 10000 +#define CYCLES 1 #define COUNT 100 @@ -106,7 +106,7 @@ int main() struct Random* rand = Random_new(mainAlloc, NULL, NULL); for (int cycles = 0; cycles < CYCLES; cycles++) { - struct Allocator* alloc = MallocAllocator_new(1<<18); + struct Allocator* alloc = MallocAllocator_new(1<<25); simpleTest(alloc, rand); Allocator_free(alloc); alloc = MallocAllocator_new(1<<18); diff --git a/util/test/VarInt_test.c b/util/test/VarInt_test.c new file mode 100644 index 000000000..82ead4056 --- /dev/null +++ b/util/test/VarInt_test.c @@ -0,0 +1,100 @@ +/* vim: set expandtab ts=4 sw=4: */ +/* + * You may redistribute this program and/or modify it under the terms of + * the GNU General Public License as published by the Free Software Foundation, + * either version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +#include "memory/MallocAllocator.h" +#include "crypto/random/Random.h" +#include "util/VarInt.h" +#include "util/Assert.h" +#include "util/Bits.h" + +#include + +#define BUF_SZ 1024 + +static void fidelityTest() +{ + struct Allocator* alloc = MallocAllocator_new(1<<20); + struct Random* rand = Random_new(alloc, NULL, NULL); + uint64_t* buf = Allocator_malloc(alloc, BUF_SZ*8); + Random_bytes(rand, (uint8_t*) buf, BUF_SZ*8); + + // make a mix of different size numbers + for (int i = 0; i < BUF_SZ; i++) { + uint64_t x = buf[i]; + switch (x & 3) { + case 3: break; + case 2: buf[i] = x >> 32; break; + case 1: buf[i] = x >> 48; break; + case 0: buf[i] = x >> 56; break; + } + } + + uint8_t* buf2 = Allocator_malloc(alloc, BUF_SZ*8); + struct VarInt_Iter iter; + VarInt_mk(&iter, buf2, BUF_SZ*8); + VarInt_toEnd(&iter); + + for (int i = 0; i < BUF_SZ; i++) { Assert_true(!VarInt_push(&iter, buf[i])); } + Assert_true(iter.ptr > buf2); + + for (int i = BUF_SZ - 1; i >= 0; i--) { + uint64_t x = ~0; + Assert_true(!VarInt_pop(&iter, &x)); + Assert_true(buf[i] == x); + } + + Assert_true(iter.ptr == iter.end); + + Allocator_free(alloc); +} + +static inline void test(uint8_t* bytes, int len, uint64_t num, uint8_t* buf) +{ + struct VarInt_Iter iter = { .ptr = bytes, .end = &bytes[len], .start = bytes }; + uint64_t out = ~0; + Assert_true(!VarInt_pop(&iter, &out)); + Assert_true(out == num); + Assert_true(iter.ptr == iter.end); + + buf[0] = buf[len+1] = 0xee; + struct VarInt_Iter iter2 = { .ptr = &buf[len+1], .end = &buf[len+1], .start = &buf[1] }; + if (len < 9) { Assert_true(VarInt_push(&iter2, 0xf000000000000000ull)); } + if (len < 5) { Assert_true(VarInt_push(&iter2, 0xf0000000)); } + if (len < 3) { Assert_true(VarInt_push(&iter2, 0xf000)); } + Assert_true(!VarInt_push(&iter2, num)); + Assert_true(iter2.ptr == iter2.start); + Assert_true(buf[0] == 0xee); + Assert_true(buf[len+1] == 0xee); +} +#define TEST(bytes, num) do { \ + uint8_t data[sizeof(bytes)+1] = {0}; \ + test(bytes, sizeof(bytes)-1, num, data); \ +} while (0) +// CHECKFILES_IGNORE expecting a ; + +static void simpleTest() +{ + TEST("\x00", 0x00); + TEST("\xAC", 0xAC); + TEST("\xFD\xAB\xCD", 0xABCD); + TEST("\xFE\xAB\xCD\xEF\x01", 0xABCDEF01); + TEST("\xFF\xAB\xCD\xEF\x01\x23\x45\x67\x89", 0xABCDEF0123456789ull); +} + +int main() +{ + simpleTest(); + fidelityTest(); + return 0; +} \ No newline at end of file diff --git a/wire/Announce.h b/wire/Announce.h index 22069e19d..aba1f4a30 100644 --- a/wire/Announce.h +++ b/wire/Announce.h @@ -80,9 +80,9 @@ static inline void Announce_EncodingScheme_push(struct Message* pushTo, String* * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * 0 | length | type | encodingForm | flags | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * 4 | MTU (8 byte units) | drops | + * 4 | MTU (8 byte units) | peer number | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - * 8 | latency | penalty | + * 8 | Unused | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * 12 | | * + + @@ -114,18 +114,12 @@ struct Announce_Peer // 0xffff = MTU of 542280 bytes uint16_t mtu8_be; - // Fraction of packets dropped in previous time-window (out of 65k) + // Number of the peer in the list, used for referencing in LinkState // 0xffff is unknown - uint16_t drops_be; + uint16_t peerNum_be; - // Average latency of packets in previous time-window (milliseconds) - // 0xffff is unknown - uint16_t latency_be; - - // Penalty which would be applied to a packet (with current penalty 0) - // if it passes through this link. - // 0xffff is unknown - uint16_t penalty_be; + // 0xffffffff + uint32_t unused; // Ipv6 of a node from which this node is reachable uint8_t ipv6[16]; @@ -142,6 +136,38 @@ static inline void Announce_Peer_init(struct Announce_Peer* peer) Bits_memset(peer, 0, Announce_Peer_SIZE); peer->length = Announce_Peer_SIZE; peer->type = Announce_Type_PEER; + peer->unused = 0xffffffff; + peer->peerNum_be = 0xffff; +} + +/** + * 1 2 3 + * 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + * 0 | length | type | padding | | + * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + * 4 | Compressed Link State..... | + * + + + */ + +struct Announce_LinkState +{ + // Length of linkState + 2 + uint8_t length; + + // Announce_Type_LINK_STATE + uint8_t type; + + // number of zero bytes before beginning of packed numbers + uint8_t padding; + + // linkState + uint8_t linkState[2]; +}; + +static inline void Announce_LinkState_applyHeader(struct Message* pushTo) +{ + Assert_failure("todo implement"); } struct Announce_ItemHeader diff --git a/wire/PFChan.h b/wire/PFChan.h index 6e74fd500..e43fea611 100644 --- a/wire/PFChan.h +++ b/wire/PFChan.h @@ -296,8 +296,24 @@ enum PFChan_Core */ PFChan_Core_UNSETUP_SESSION = 1038, - PFChan_Core__TOO_HIGH = 1039, + /** + * Will be emitted once every 3 seconds to inform pathfinders of the link state of + * the peering links, contains an array of PFChan_LinkState_Entry. + * (emitted by: InterfaceController.c) + */ + PFChan_Core_LINK_STATE = 1039, + + PFChan_Core__TOO_HIGH = 1040, +}; + +struct PFChan_LinkState_Entry { + uint32_t peerLabel_be; + uint32_t sumOfPackets_be; + uint32_t sumOfDrops_be; + uint32_t sumOfKb_be; }; +#define PFChan_LinkState_Entry_SIZE 16 +Assert_compileTime(sizeof(struct PFChan_LinkState_Entry) == PFChan_LinkState_Entry_SIZE); struct PFChan_Core_SearchReq { @@ -371,6 +387,7 @@ struct PFChan_FromCore struct PFChan_Msg msg; struct PFChan_Ping ping; struct PFChan_Ping pong; + struct PFChan_LinkState_Entry linkState; uint8_t bytes[4]; } content; };