Skip to content

Commit

Permalink
Discovery tests (#1894)
Browse files Browse the repository at this point in the history
* nlog discovery manager loggs

* console write line test

* WIP

* wip

* merged master

* fix

* wip

* NettyTest are workting correctly on mac

* console.write removal

* another console

* ignoring snappy test to see if build pass on mac

* wip

* remove ignore on snappy tests

* CR fixes

* putting commented code into region

* wip

* more consoles\try catch for better view on hive

* more consoles

* console logs in ping adress validation

* removing source destination null checkin ping adress validator

* refactor of console log to write less useless data

* more specific console in ping adress validation

* comment out some code in discovery manager

* wip

* wip

* added IPResolver to DiscoveryManager as well as more consoles

* wip

* wip on ignoring spoof atacks

* removal of consoles in discovery module

* wip

* wip

* wip

* remade IsBonded in NodeLifecycleManager

* remove consoles in discovery manager

* update submodule

* etherum/tests updated

* go back to default eth/tests commit

* revert Dirichlet update
  • Loading branch information
SebastianDremo authored May 19, 2020
1 parent 0f4f5a0 commit e60398f
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class DiscoveryManagerTests
private string _host = "192.168.1.17";
private Node[] _nodes;
private PublicKey _publicKey;
private IIPResolver _ipResolver;

[SetUp]
public void Initialize()
Expand All @@ -75,13 +76,15 @@ public void Initialize()

_timestamper = Timestamper.Default;

_ipResolver = new IPResolver(_networkConfig, logManager);

var evictionManager = new EvictionManager(_nodeTable, logManager);
var lifecycleFactory = new NodeLifecycleManagerFactory(_nodeTable, new DiscoveryMessageFactory(_timestamper), evictionManager, new NodeStatsManager(statsConfig, logManager), discoveryConfig, logManager);

_nodes = new[] {new Node("192.168.1.18", 1), new Node("192.168.1.19", 2)};

IFullDb nodeDb = new SimpleFilePublicKeyDb("Test", "test_db", logManager);
_discoveryManager = new DiscoveryManager(lifecycleFactory, _nodeTable, new NetworkStorage(nodeDb, logManager), discoveryConfig, logManager);
_discoveryManager = new DiscoveryManager(lifecycleFactory, _nodeTable, new NetworkStorage(nodeDb, logManager), discoveryConfig, logManager, _ipResolver);
_discoveryManager.MessageSender = _messageSender;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class NettyDiscoveryHandlerTests
private readonly PrivateKey _privateKey2 = new PrivateKey("3a1076bf45ab87712ad64ccb3b10217737f7faacbf2872e88fdd9a537d8fe266");
private List<IChannel> _channels;
private List<NettyDiscoveryHandler> _discoveryHandlers;
private List<IDiscoveryManager> _discoveryManagers;
private List<IDiscoveryManager> _discoveryManagersMocks;
private readonly IPEndPoint _address = new IPEndPoint(IPAddress.Loopback, 10001);
private readonly IPEndPoint _address2 = new IPEndPoint(IPAddress.Loopback, 10002);
private int _channelActivatedCounter;
Expand All @@ -57,21 +57,19 @@ public async Task Initialize()
{
_channels = new List<IChannel>();
_discoveryHandlers = new List<NettyDiscoveryHandler>();
_discoveryManagers = new List<IDiscoveryManager>();
_discoveryManagersMocks = new List<IDiscoveryManager>();
_channelActivatedCounter = 0;
var discoveryManager = Substitute.For<IDiscoveryManager>();
var discoveryManagerMock = Substitute.For<IDiscoveryManager>();
var messageSerializationService = Build.A.SerializationService().WithDiscovery(_privateKey).TestObject;

var discoveryManager2 = Substitute.For<IDiscoveryManager>();
var discoveryManagerMock2 = Substitute.For<IDiscoveryManager>();
var messageSerializationService2 = Build.A.SerializationService().WithDiscovery(_privateKey).TestObject;

await StartUdpChannel("127.0.0.1", 10001, discoveryManager, messageSerializationService);
await StartUdpChannel("127.0.0.1", 10002, discoveryManager2, messageSerializationService2);
await StartUdpChannel("127.0.0.1", 10001, discoveryManagerMock, messageSerializationService);
await StartUdpChannel("127.0.0.1", 10002, discoveryManagerMock2, messageSerializationService2);

_discoveryManagers.Add(discoveryManager);
_discoveryManagers.Add(discoveryManager2);

Thread.Sleep(50);
_discoveryManagersMocks.Add(discoveryManagerMock);
_discoveryManagersMocks.Add(discoveryManagerMock2);

Assert.AreEqual(2, _channelActivatedCounter);
}
Expand All @@ -97,7 +95,7 @@ public void PingSentReceivedTest()
};
_discoveryHandlers[0].SendMessage(msg);
SleepWhileWaiting();
_discoveryManagers[1].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Ping));
_discoveryManagersMocks[1].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Ping));

var msg2 = new PingMessage
{
Expand All @@ -109,7 +107,7 @@ public void PingSentReceivedTest()
};
_discoveryHandlers[1].SendMessage(msg2);
SleepWhileWaiting();
_discoveryManagers[0].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Ping));
_discoveryManagersMocks[0].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Ping));
}

[Test]
Expand All @@ -125,7 +123,7 @@ public void PongSentReceivedTest()
};
_discoveryHandlers[0].SendMessage(msg);
SleepWhileWaiting();
_discoveryManagers[1].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Pong));
_discoveryManagersMocks[1].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Pong));

var msg2 = new PongMessage
{
Expand All @@ -136,14 +134,9 @@ public void PongSentReceivedTest()
};
_discoveryHandlers[1].SendMessage(msg2);
SleepWhileWaiting();
_discoveryManagers[0].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Pong));
}

private static void SleepWhileWaiting()
{
Thread.Sleep((TestContext.CurrentContext.CurrentRepeatCount + 1) * 300);
_discoveryManagersMocks[0].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Pong));
}

[Test]
[Retry(5)]
public void FindNodeSentReceivedTest()
Expand All @@ -157,7 +150,7 @@ public void FindNodeSentReceivedTest()
};
_discoveryHandlers[0].SendMessage(msg);
SleepWhileWaiting();
_discoveryManagers[1].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.FindNode));
_discoveryManagersMocks[1].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.FindNode));

var msg2 = new FindNodeMessage
{
Expand All @@ -168,7 +161,7 @@ public void FindNodeSentReceivedTest()
};
_discoveryHandlers[1].SendMessage(msg2);
SleepWhileWaiting();
_discoveryManagers[0].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.FindNode));
_discoveryManagersMocks[0].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.FindNode));
}

[Test]
Expand All @@ -184,7 +177,7 @@ public void NeighborsSentReceivedTest()
};
_discoveryHandlers[0].SendMessage(msg);
SleepWhileWaiting();
_discoveryManagers[1].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Neighbors));
_discoveryManagersMocks[1].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Neighbors));

var msg2 = new NeighborsMessage
{
Expand All @@ -195,7 +188,7 @@ public void NeighborsSentReceivedTest()
};
_discoveryHandlers[1].SendMessage(msg2);
SleepWhileWaiting();
_discoveryManagers[0].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Neighbors));
_discoveryManagersMocks[0].Received(1).OnIncomingMessage(Arg.Is<DiscoveryMessage>(x => x.MessageType == MessageType.Neighbors));
}

private async Task StartUdpChannel(string address, int port, IDiscoveryManager discoveryManager, IMessageSerializationService service)
Expand Down Expand Up @@ -224,5 +217,10 @@ private void InitializeChannel(IDatagramChannel channel, IDiscoveryManager disco
.AddLast(new LoggingHandler(DotNetty.Handlers.Logging.LogLevel.TRACE))
.AddLast(handler);
}

private static void SleepWhileWaiting()
{
Thread.Sleep((TestContext.CurrentContext.CurrentRepeatCount + 1) * 300);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class NodeLifecycleManagerTests
private INodeTable _nodeTable;
private IConfigProvider _configurationProvider;
private ITimestamper _timestamper;
private IIPResolver _ipResolverMock;
private int _port = 1;
private string _host = "192.168.1.27";

Expand All @@ -69,6 +70,8 @@ public void Initialize()
discoveryConfig.BucketSize = 3;
discoveryConfig.BucketsCount = 1;

_ipResolverMock = Substitute.For<IIPResolver>();

IStatsConfig statsConfig = _configurationProvider.GetConfig<IStatsConfig>();

var calculator = new NodeDistanceCalculator(discoveryConfig);
Expand All @@ -84,7 +87,7 @@ public void Initialize()
_udpClient = Substitute.For<IMessageSender>();

var discoveryDb = new SimpleFilePublicKeyDb("Test","test", logManager);
_discoveryManager = new DiscoveryManager(lifecycleFactory, _nodeTable, new NetworkStorage(discoveryDb, logManager), discoveryConfig, logManager);
_discoveryManager = new DiscoveryManager(lifecycleFactory, _nodeTable, new NetworkStorage(discoveryDb, logManager), discoveryConfig, logManager, _ipResolverMock);
_discoveryManager.MessageSender = _udpClient;
}

Expand Down
15 changes: 5 additions & 10 deletions src/Nethermind/Nethermind.Network/Discovery/DiscoveryManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class DiscoveryManager : IDiscoveryManager
private readonly ConcurrentDictionary<Keccak, INodeLifecycleManager> _nodeLifecycleManagers = new ConcurrentDictionary<Keccak, INodeLifecycleManager>();
private readonly INodeTable _nodeTable;
private readonly INetworkStorage _discoveryStorage;
private readonly IIPResolver _ipResolver;

private readonly ConcurrentDictionary<MessageTypeKey, TaskCompletionSource<DiscoveryMessage>> _waitingEvents = new ConcurrentDictionary<MessageTypeKey, TaskCompletionSource<DiscoveryMessage>>();
private IMessageSender _messageSender;
Expand All @@ -52,14 +53,16 @@ public DiscoveryManager(
INodeTable nodeTable,
INetworkStorage discoveryStorage,
IDiscoveryConfig discoveryConfig,
ILogManager logManager)
ILogManager logManager,
IIPResolver ipResolver)
{
_logger = logManager.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));
_discoveryConfig = discoveryConfig ?? throw new ArgumentNullException(nameof(discoveryConfig));
_nodeLifecycleManagerFactory = nodeLifecycleManagerFactory ?? throw new ArgumentNullException(nameof(nodeLifecycleManagerFactory));
_nodeTable = nodeTable ?? throw new ArgumentNullException(nameof(nodeTable));
_discoveryStorage = discoveryStorage ?? throw new ArgumentNullException(nameof(discoveryStorage));
_nodeLifecycleManagerFactory.DiscoveryManager = this;
_ipResolver = ipResolver;
}

public IMessageSender MessageSender
Expand All @@ -72,7 +75,6 @@ public void OnIncomingMessage(DiscoveryMessage message)
try
{
if (_logger.IsTrace) _logger.Trace($"Received msg: {message}");

MessageType msgType = message.MessageType;

Node node = new Node(message.FarPublicKey, message.FarAddress);
Expand Down Expand Up @@ -155,7 +157,6 @@ public INodeLifecycleManager GetNodeLifecycleManager(Node node, bool isPersisted
public void SendMessage(DiscoveryMessage discoveryMessage)
{
if (_logger.IsTrace) _logger.Trace($"Sending msg: {discoveryMessage}");

try
{
if (discoveryMessage is PingMessage pingMessage)
Expand Down Expand Up @@ -208,17 +209,11 @@ public IReadOnlyCollection<INodeLifecycleManager> GetOrAddNodeLifecycleManagers(

private bool ValidatePingAddress(PingMessage message)
{
if (message.DestinationAddress == null || message.SourceAddress == null || message.FarAddress == null)
if (message.DestinationAddress == null || message.FarAddress == null)
{
if (_logger.IsDebug) _logger.Debug($"Received a ping message with empty address, message: {message}");
return false;
}

if (!Bytes.AreEqual(_nodeTable.MasterNode.Address.Address.MapToIPv6().GetAddressBytes(), message.DestinationAddress?.Address.MapToIPv6().GetAddressBytes()))
{
if (_logger.IsDebug) _logger.Debug($"Received a message with incorrect destination address, message: {message}");
return false;
}

#region
// port will be different as we dynamically open ports for each socket connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public NodeLifecycleManager(Node node, IDiscoveryManager discoveryManager, INode
public Node ManagedNode { get; }
public NodeLifecycleState State { get; private set; }
public INodeStats NodeStats { get; }
public bool IsBonded => (_sentPing && _receivedPong) || (_receivedPing && _sentPong);
public bool IsBonded => (_sentPing && _receivedPong) && (_receivedPing && _sentPong);

public event EventHandler<NodeLifecycleState> OnStateChanged;

Expand Down Expand Up @@ -95,6 +95,8 @@ public void ProcessPongMessage(PongMessage discoveryMessage)
else
{
// ignore spoofed message
_receivedPong = false;
return;
}
}

Expand Down Expand Up @@ -179,6 +181,7 @@ public void SendPong(PingMessage discoveryMessage)
{
PongMessage msg = _discoveryMessageFactory.CreateOutgoingMessage<PongMessage>(ManagedNode);
msg.PingMdc = discoveryMessage.Mdc;

_discoveryManager.SendMessage(msg);
NodeStats.AddNodeStatsEvent(NodeStatsEventType.DiscoveryPongOut);
_sentPong = true;
Expand All @@ -192,7 +195,8 @@ public void SendNeighbors(Node[] nodes)
{
if (!IsBonded)
{
if (_logger.IsWarn) _logger.Warn("Sending NEIGHBOURS before bonding");
if (_logger.IsWarn) _logger.Warn("Attempt to send NEIGHBOURS before bonding");
return;
}

NeighborsMessage msg = _discoveryMessageFactory.CreateOutgoingMessage<NeighborsMessage>(ManagedNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ await _channel.WriteAndFlushAsync(packet).ContinueWith(t =>
}
});
}

protected override void ChannelRead0(IChannelHandlerContext ctx, DatagramPacket packet)
{
IByteBuffer content = packet.Content;
Expand Down Expand Up @@ -197,19 +196,19 @@ private bool ValidateMessage(DiscoveryMessage message, MessageType type, EndPoin

return true;
}

private static void ReportMessageByType(DiscoveryMessage message)
private void ReportMessageByType(DiscoveryMessage message)
{
if (message is PingMessage pingMessage)
{
if (NetworkDiagTracer.IsEnabled) NetworkDiagTracer.ReportIncomingMessage(pingMessage.FarAddress.Address.ToString(), "HANDLER disc v4", $"Ping {pingMessage.SourceAddress.Address} -> {pingMessage.DestinationAddress.Address}");
}
else
{
if (NetworkDiagTracer.IsEnabled) NetworkDiagTracer.ReportIncomingMessage(message.FarAddress.Address.ToString(), "HANDLER disc v4", message.MessageType.ToString());
}
if (message is PingMessage pingMessage)
{
if(NetworkDiagTracer.IsEnabled) NetworkDiagTracer.ReportIncomingMessage(pingMessage.FarAddress.Address.ToString(), "HANDLER disc v4", $"PING {pingMessage.SourceAddress.Address} -> {pingMessage.DestinationAddress.Address}");
}
else
{
if(NetworkDiagTracer.IsEnabled) NetworkDiagTracer.ReportIncomingMessage(message.FarAddress.Address.ToString(), "HANDLER disc v4", message.MessageType.ToString());
}
}

public event EventHandler OnChannelActivated;
}
}
3 changes: 0 additions & 3 deletions src/Nethermind/Nethermind.Network/IPResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
// along with the Nethermind. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using Nethermind.Core;
using Nethermind.Logging;
using Nethermind.Network.Config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ private void InitDiscovery()
SameKeyGenerator privateKeyProvider = new SameKeyGenerator(_ctx.NodeKey);
DiscoveryMessageFactory discoveryMessageFactory = new DiscoveryMessageFactory(_ctx.Timestamper);
NodeIdResolver nodeIdResolver = new NodeIdResolver(_ctx.EthereumEcdsa);
IPResolver ipResolver = new IPResolver(_networkConfig, _ctx.LogManager);

IDiscoveryMsgSerializersProvider msgSerializersProvider = new DiscoveryMsgSerializersProvider(
_ctx._messageSerializationService,
Expand Down Expand Up @@ -261,7 +262,9 @@ private void InitDiscovery()
nodeTable,
discoveryStorage,
discoveryConfig,
_ctx.LogManager);
_ctx.LogManager,
ipResolver
);

NodesLocator nodesLocator = new NodesLocator(
nodeTable,
Expand Down

0 comments on commit e60398f

Please sign in to comment.