-
-
Notifications
You must be signed in to change notification settings - Fork 78
/
strategy.rs
1238 lines (1157 loc) · 50.1 KB
/
strategy.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
use self::state::TracerState;
use crate::config::StrategyConfig;
use crate::error::{Error, Result};
use crate::net::Network;
use crate::probe::{
ProbeStatus, Response, ResponseData, ResponseSeq, ResponseSeqIcmp, ResponseSeqTcp,
ResponseSeqUdp,
};
use crate::types::{Sequence, TimeToLive, TraceId};
use crate::{MultipathStrategy, PortDirection, Protocol};
use std::net::IpAddr;
use std::time::{Duration, SystemTime};
use tracing::instrument;
/// The output from a round of tracing.
///
/// The probe data is borrowed for the lifetime `'a`; a `Round` does not own the probes it
/// reports.
#[derive(Debug, Clone)]
pub struct Round<'a> {
/// The `ProbeStatus` of every probe slot that was used in the round.
pub probes: &'a [ProbeStatus],
/// The largest time-to-live (ttl) for which we received a reply in the round.
pub largest_ttl: TimeToLive,
/// Indicates what triggered the completion of the tracing round.
pub reason: CompletionReason,
}
impl<'a> Round<'a> {
#[must_use]
pub const fn new(
probes: &'a [ProbeStatus],
largest_ttl: TimeToLive,
reason: CompletionReason,
) -> Self {
Self {
probes,
largest_ttl,
reason,
}
}
}
/// Indicates what triggered the completion of the tracing round.
///
/// A value of this type is attached to every published `Round`.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum CompletionReason {
/// The round ended because the target was found.
TargetFound,
/// The round ended because the time exceeded the configured maximum round time.
RoundTimeLimitExceeded,
}
/// Trace a path to a target.
///
/// `F` is the type of the callback used to publish the result of each completed round.
#[derive(Debug, Clone)]
pub struct Strategy<F> {
/// The configuration of the tracing strategy.
config: StrategyConfig,
/// The callback invoked with the `Round` data each time a round completes.
publish: F,
}
impl<F: Fn(&Round<'_>)> Strategy<F> {
/// Create a `Strategy` from a copy of `config` and the `publish` callback.
///
/// The configuration is emitted as a `tracing` debug event.
#[instrument(skip_all)]
pub fn new(config: &StrategyConfig, publish: F) -> Self {
    tracing::debug!(?config);
    let config = *config;
    Self { config, publish }
}
/// Run a continuous trace and publish results.
///
/// Repeats the send / receive / round-update cycle until the tracer state reports that the
/// configured maximum number of rounds has completed (forever if no maximum is set).
///
/// # Errors
///
/// Returns the first error produced while sending a probe or receiving a response.
#[instrument(skip(self, network))]
pub fn run<N: Network>(self, mut network: N) -> Result<()> {
    let mut state = TracerState::new(self.config);
    loop {
        if state.finished(self.config.max_rounds) {
            return Ok(());
        }
        self.send_request(&mut network, &mut state)?;
        self.recv_response(&mut network, &mut state)?;
        self.update_round(&mut state);
    }
}
/// Send the next probe if required.
///
/// Send a `ProbeState` for the next time-to-live (ttl) if all the following are true:
///
/// 1 - the target host has not been found
/// 2 - the next ttl is not greater than the maximum allowed ttl
/// 3 - if the target ttl of the target is known:
/// - the next ttl is not greater than the ttl of the target host observed from the prior
/// round
/// otherwise:
/// - the number of unknown-in-flight probes is lower than the maximum allowed
#[instrument(skip(self, network, st))]
fn send_request<N: Network>(&self, network: &mut N, st: &mut TracerState) -> Result<()> {
let can_send_ttl = if let Some(target_ttl) = st.target_ttl() {
st.ttl() <= target_ttl
} else {
st.ttl() - st.max_received_ttl().unwrap_or_default()
< TimeToLive(self.config.max_inflight.0)
};
if !st.target_found() && st.ttl() <= self.config.max_ttl && can_send_ttl {
let sent = SystemTime::now();
match self.config.protocol {
Protocol::Icmp => {
network.send_probe(st.next_probe(sent))?;
}
Protocol::Udp => network.send_probe(st.next_probe(sent))?,
Protocol::Tcp => {
let mut probe = if st.round_has_capacity() {
st.next_probe(sent)
} else {
return Err(Error::InsufficientCapacity);
};
while let Err(err) = network.send_probe(probe) {
match err {
Error::AddressNotAvailable(_) => {
if st.round_has_capacity() {
probe = st.reissue_probe(SystemTime::now());
} else {
return Err(Error::InsufficientCapacity);
}
}
other => return Err(other),
}
}
}
};
}
Ok(())
}
/// Read and process the next incoming `ICMP` packet.
///
/// We allow multiple probes to be in-flight at any time, and we cannot guarantee that responses
/// will be received in-order. We therefore maintain a buffer which holds details of each
/// `ProbeState` which is indexed by the offset of the sequence number from the sequence number
/// at the beginning of the round. The sequence number is set in the outgoing `ICMP`
/// `EchoRequest` (or `UDP` / `TCP`) packet and returned in both the `TimeExceeded` and
/// `EchoReply` responses.
///
/// Each incoming `ICMP` packet contains the original `ICMP` `EchoRequest` packet from which we
/// can read the `identifier` that we set which we can now validate to ensure we only
/// process responses which correspond to packets sent from this process. For The `UDP` and
/// `TCP` protocols, only packets destined for our src port will be delivered to us by the
/// OS and so no other `identifier` is needed, and so we allow the special case value of 0.
///
/// When we process an `EchoReply` from the target host we extract the time-to-live from the
/// corresponding original `EchoRequest`. Note that this may not be the greatest
/// time-to-live that was sent in the round as the algorithm will send `EchoRequest` with
/// larger time-to-live values before the `EchoReply` is received.
#[instrument(skip(self, network, st))]
fn recv_response<N: Network>(&self, network: &mut N, st: &mut TracerState) -> Result<()> {
let next = network.recv_probe()?;
match next {
Some(Response::TimeExceeded(data, icmp_code, extensions)) => {
let (trace_id, sequence, received, host) = self.extract(&data);
let is_target = host == self.config.target_addr;
if self.check_trace_id(trace_id) && st.in_round(sequence) && self.validate(&data) {
st.complete_probe_time_exceeded(
sequence, host, received, is_target, icmp_code, extensions,
);
}
}
Some(Response::DestinationUnreachable(data, icmp_code, extensions)) => {
let (trace_id, sequence, received, host) = self.extract(&data);
if self.check_trace_id(trace_id) && st.in_round(sequence) && self.validate(&data) {
st.complete_probe_unreachable(sequence, host, received, icmp_code, extensions);
}
}
Some(Response::EchoReply(data, icmp_code)) => {
let (trace_id, sequence, received, host) = self.extract(&data);
if self.check_trace_id(trace_id) && st.in_round(sequence) && self.validate(&data) {
st.complete_probe_echo_reply(sequence, host, received, icmp_code);
}
}
Some(Response::TcpReply(data) | Response::TcpRefused(data)) => {
let (trace_id, sequence, received, host) = self.extract(&data);
if self.check_trace_id(trace_id) && st.in_round(sequence) && self.validate(&data) {
st.complete_probe_other(sequence, host, received);
}
}
None => {}
}
Ok(())
}
/// Check if the round is complete and, if so, publish the results and start the next round.
///
/// A round is considered to be complete when either:
///
/// A - all of the following are true:
///       1 - the round has exceeded the minimum round duration
///       2 - the duration since the last packet was received exceeds the grace period
///       3 - the target has been found
/// B - the round has exceeded the maximum round duration
#[instrument(skip(self, st))]
fn update_round(&self, st: &mut TracerState) {
    let now = SystemTime::now();
    // A clock that went backwards is treated as a zero-length round.
    let elapsed = now.duration_since(st.round_start()).unwrap_or_default();
    let past_min = elapsed > self.config.min_round_duration;
    let past_max = elapsed > self.config.max_round_duration;
    let grace_exceeded = exceeds(st.received_time(), now, self.config.grace_duration);
    let target_settled = past_min && grace_exceeded && st.target_found();
    if target_settled || past_max {
        self.publish_trace(st);
        st.advance_round(self.config.first_ttl);
    }
}
/// Publish the probes of the completed round.
///
/// The largest ttl reported for the round is:
///
/// - the ttl of the target, when it is known; otherwise
/// - `0` when no response was received in the round; otherwise
/// - one more than the largest ttl received, capped at the largest ttl that was sent. The
///   extra hop is assumed to represent the target host which did not reply.
///
/// The completion reason is `TargetFound` if the target was found in the round and
/// `RoundTimeLimitExceeded` otherwise.
#[instrument(skip(self, state))]
fn publish_trace(&self, state: &TracerState) {
    let largest_ttl = state.target_ttl().unwrap_or_else(|| {
        match state.max_received_ttl() {
            Some(received) => {
                let last_sent = state.ttl() - TimeToLive(1);
                last_sent.min(received + TimeToLive(1))
            }
            None => TimeToLive(0),
        }
    });
    let reason = match state.target_found() {
        true => CompletionReason::TargetFound,
        false => CompletionReason::RoundTimeLimitExceeded,
    };
    let round = Round::new(state.probes(), largest_ttl, reason);
    (self.publish)(&round);
}
/// Check whether `trace_id` is acceptable for this tracer.
///
/// The id is accepted when it equals the configured trace identifier, or when it is the
/// special value `0` which is used for `udp` and `tcp` as they do not carry an identifier.
#[instrument(skip(self))]
fn check_trace_id(&self, trace_id: TraceId) -> bool {
    let expected = self.config.trace_identifier;
    let no_identifier = TraceId(0);
    expected == trace_id || trace_id == no_identifier
}
/// Validate the probe response data.
///
/// The network layer may deliver incoming ICMP `DestinationUnreachable` (and other types)
/// packets whose original datagram is a UDP/TCP packet that was not sent by this tracer.
/// Such responses must be ignored, so UDP and TCP responses get additional checks:
///
/// - the src/dest ports must match the ports fixed by the configured `PortDirection`
/// - the destination address of the original datagram must be the configured target
/// - for UDP with the Dublin strategy and an IPv6 target, the magic marker must be present
///
/// For ICMP probe responses no additional checks are required.
fn validate(&self, resp: &ResponseData) -> bool {
    /// Do the ports of the original datagram agree with the fixed port(s)?
    const fn ports_match(port_direction: PortDirection, src_port: u16, dest_port: u16) -> bool {
        match port_direction {
            PortDirection::FixedSrc(src) => src.0 == src_port,
            PortDirection::FixedDest(dest) => dest.0 == dest_port,
            PortDirection::FixedBoth(src, dest) => src.0 == src_port && dest.0 == dest_port,
            PortDirection::None => false,
        }
    }
    let target = self.config.target_addr;
    let direction = self.config.port_direction;
    match resp.resp_seq {
        ResponseSeq::Icmp(_) => true,
        ResponseSeq::Udp(ResponseSeqUdp {
            dest_addr,
            src_port,
            dest_port,
            has_magic,
            ..
        }) => {
            // Only Dublin over IPv6 carries the magic marker in the payload.
            let needs_magic = matches!(
                (self.config.multipath_strategy, target),
                (MultipathStrategy::Dublin, IpAddr::V6(_))
            );
            let magic_ok = !needs_magic || has_magic;
            let ports_ok = ports_match(direction, src_port, dest_port);
            target == dest_addr && ports_ok && magic_ok
        }
        ResponseSeq::Tcp(ResponseSeqTcp {
            dest_addr,
            src_port,
            dest_port,
        }) => {
            let ports_ok = ports_match(direction, src_port, dest_port);
            target == dest_addr && ports_ok
        }
    }
}
/// Extract the `TraceId`, `Sequence`, receive time and responder address from the
/// `ResponseData` in a protocol specific way.
///
/// - ICMP: the identifier and sequence are taken directly from the response.
/// - UDP: the trace id is always `0`; the sequence is recovered from the field that carries
///   it for the configured multipath strategy (a port for Classic, the checksum for Paris,
///   the identifier for Dublin/IPv4 and `initial_sequence + payload_len` for Dublin/IPv6).
/// - TCP: the trace id is always `0`; the sequence is the port that is not fixed.
///
/// This is called on responses which have not yet been validated, so every field must be
/// treated as untrusted. In particular the Dublin/IPv6 payload length of a foreign packet
/// may be arbitrarily large, and so the addition wraps rather than overflowing (which would
/// panic in builds with overflow checks enabled).
#[instrument(skip(self))]
fn extract(&self, resp: &ResponseData) -> (TraceId, Sequence, SystemTime, IpAddr) {
    match resp.resp_seq {
        ResponseSeq::Icmp(ResponseSeqIcmp {
            identifier,
            sequence,
        }) => (
            TraceId(identifier),
            Sequence(sequence),
            resp.recv,
            resp.addr,
        ),
        ResponseSeq::Udp(ResponseSeqUdp {
            identifier,
            src_port,
            dest_port,
            checksum,
            payload_len,
            ..
        }) => {
            let sequence = match (
                self.config.multipath_strategy,
                self.config.port_direction,
                self.config.target_addr,
            ) {
                (MultipathStrategy::Classic, PortDirection::FixedDest(_), _) => src_port,
                (MultipathStrategy::Classic, _, _) => dest_port,
                (MultipathStrategy::Paris, _, _) => checksum,
                (MultipathStrategy::Dublin, _, IpAddr::V4(_)) => identifier,
                (MultipathStrategy::Dublin, _, IpAddr::V6(_)) => {
                    // Untrusted length: wrap instead of panicking on overflow.
                    self.config.initial_sequence.0.wrapping_add(payload_len)
                }
            };
            (TraceId(0), Sequence(sequence), resp.recv, resp.addr)
        }
        ResponseSeq::Tcp(ResponseSeqTcp {
            src_port,
            dest_port,
            ..
        }) => {
            let sequence = match self.config.port_direction {
                PortDirection::FixedSrc(_) => dest_port,
                _ => src_port,
            };
            (TraceId(0), Sequence(sequence), resp.recv, resp.addr)
        }
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::net::MockNetwork;
use crate::probe::IcmpPacketCode;
use crate::{MaxRounds, Port};
use std::net::Ipv4Addr;
use std::num::NonZeroUsize;
// The network can return both `DestinationUnreachable` and `TcpRefused`
// for the same sequence number. This can occur for the target hop for
// TCP protocol as the network layer checks for ICMP responses such as
// `DestinationUnreachable` and also synthesizes a `TcpRefused` response.
//
// This test simulates sending 1 TCP probe (seq=33000) and receiving two
// responses for that probe, a `DestinationUnreachable` followed by a
// `TcpRefused`.
//
// The test passes if processing the second response neither errors nor
// panics.
#[test]
fn test_tcp_dest_unreachable_and_refused() -> anyhow::Result<()> {
let sequence = 33000;
let target_addr = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
let mut network = MockNetwork::new();
// Used to order the two `recv_probe` expectations below.
let mut seq = mockall::Sequence::new();
// Exactly one probe is sent, and sending succeeds.
network.expect_send_probe().times(1).returning(|_| Ok(()));
// First response: `DestinationUnreachable` from the target for the probe.
network
.expect_recv_probe()
.times(1)
.in_sequence(&mut seq)
.returning(move || {
Ok(Some(Response::DestinationUnreachable(
ResponseData::new(
SystemTime::now(),
target_addr,
ResponseSeq::Tcp(ResponseSeqTcp::new(target_addr, sequence, 80)),
),
IcmpPacketCode(1),
None,
)))
});
// Second response: `TcpRefused` for the very same probe.
network
.expect_recv_probe()
.times(1)
.in_sequence(&mut seq)
.returning(move || {
Ok(Some(Response::TcpRefused(ResponseData::new(
SystemTime::now(),
target_addr,
ResponseSeq::Tcp(ResponseSeqTcp::new(target_addr, sequence, 80)),
))))
});
// TCP with a fixed destination port, so the source port carries the sequence.
let config = StrategyConfig {
target_addr,
max_rounds: Some(MaxRounds(NonZeroUsize::MIN)),
initial_sequence: Sequence(sequence),
port_direction: PortDirection::FixedDest(Port(80)),
protocol: Protocol::Tcp,
..Default::default()
};
let tracer = Strategy::new(&config, |_| {});
let mut state = TracerState::new(config);
// Drive the tracer by hand: one send followed by the two receives.
tracer.send_request(&mut network, &mut state)?;
tracer.recv_response(&mut network, &mut state)?;
tracer.recv_response(&mut network, &mut state)?;
Ok(())
}
}
/// Mutable state needed for the tracing algorithm.
///
/// This is contained within a submodule to ensure that mutations are only performed via methods on
/// the `TracerState` struct.
mod state {
use crate::constants::MAX_SEQUENCE_PER_ROUND;
use crate::probe::{Extensions, IcmpPacketCode, IcmpPacketType, Probe, ProbeStatus};
use crate::strategy::StrategyConfig;
use crate::types::{MaxRounds, Port, RoundId, Sequence, TimeToLive, TraceId};
use crate::{Flags, MultipathStrategy, PortDirection, Protocol};
use std::array::from_fn;
use std::net::IpAddr;
use std::time::SystemTime;
use tracing::instrument;
/// The maximum number of `ProbeStatus` entries in the buffer.
///
/// This is larger than the maximum number of time-to-live (TTL) values we can support, to
/// allow for skipped sequences.
const BUFFER_SIZE: u16 = MAX_SEQUENCE_PER_ROUND;
/// The maximum sequence number.
///
/// The sequence number is only ever wrapped between rounds, and so we need to ensure that there
/// are enough sequence numbers for a complete round.
///
/// A sequence number can be skipped if, for example, the port for that sequence number cannot
/// be bound as it is already in use.
///
/// To ensure each probe is in the correct place in the buffer (i.e. the index into the
/// buffer is always `Probe.sequence - round_sequence`), when we skip a sequence we leave
/// the skipped entry in-place and use the next slot for the next sequence.
///
/// We cap the number of sequences that can potentially be skipped in a round to ensure that
/// the sequence number never needs to wrap around during a round.
///
/// We only ever send `ttl` in the range 1..255, and so we may use all buffer capacity, except
/// the minimum needed to send up to a max `ttl` of 255 (a `ttl` of 0 is never sent).
const MAX_SEQUENCE: Sequence = Sequence(u16::MAX - BUFFER_SIZE);
/// Mutable state needed for the tracing algorithm.
#[derive(Debug)]
pub struct TracerState {
/// Tracer configuration.
config: StrategyConfig,
/// The status of all probe requests and responses, indexed by the offset of the probe
/// sequence number from `round_sequence`.
buffer: [ProbeStatus; BUFFER_SIZE as usize],
/// An increasing sequence number for every probe; this is the sequence number that will be
/// used for the _next_ probe.
sequence: Sequence,
/// The starting sequence number of the current round.
round_sequence: Sequence,
/// The time-to-live for the _next_ probe to be sent.
ttl: TimeToLive,
/// The current round.
round: RoundId,
/// The timestamp of when the current round started.
round_start: SystemTime,
/// Did we receive a response from the target host in this round?
target_found: bool,
/// The maximum time-to-live of any probe for which we have received a response in this
/// round.
max_received_ttl: Option<TimeToLive>,
/// The observed time-to-live of the probe which was answered by the target host.
///
/// Note that this is _not_ reset each round and that it can also _change_ over time,
/// including going _down_ as responses can be received out-of-order.
target_ttl: Option<TimeToLive>,
/// The timestamp of the most recently processed response in this round.
received_time: Option<SystemTime>,
}
impl TracerState {
/// Create the state for the start of a trace.
///
/// The first round is round `0`; it starts now, at the configured initial sequence number
/// and first ttl, with every buffer slot set to the default `ProbeStatus`.
pub fn new(config: StrategyConfig) -> Self {
    let start_sequence = config.initial_sequence;
    let start_ttl = config.first_ttl;
    Self {
        buffer: from_fn(|_| ProbeStatus::default()),
        sequence: start_sequence,
        round_sequence: start_sequence,
        ttl: start_ttl,
        round: RoundId(0),
        round_start: SystemTime::now(),
        target_found: false,
        max_received_ttl: None,
        target_ttl: None,
        received_time: None,
        config,
    }
}
/// Get a slice of the `ProbeStatus` entries used so far in the current round.
///
/// The slice holds one entry per sequence number consumed since the start of the round.
pub fn probes(&self) -> &[ProbeStatus] {
    let used = usize::from((self.sequence - self.round_sequence).0);
    &self.buffer[..used]
}
/// Get a copy of the `ProbeStatus` for `sequence`.
///
/// The buffer is indexed by the offset of `sequence` from the start of the current round.
pub fn probe_at(&self, sequence: Sequence) -> ProbeStatus {
    let offset = usize::from(sequence - self.round_sequence);
    self.buffer[offset].clone()
}
/// The time-to-live that will be used for the _next_ probe to be sent.
pub const fn ttl(&self) -> TimeToLive {
self.ttl
}
/// The timestamp of when the current round started.
pub const fn round_start(&self) -> SystemTime {
self.round_start
}
/// Has the target been found in the current round?
pub const fn target_found(&self) -> bool {
self.target_found
}
/// The largest time-to-live for which a response has been received in the current round, if
/// any.
pub const fn max_received_ttl(&self) -> Option<TimeToLive> {
self.max_received_ttl
}
/// The observed time-to-live of the target host, if known.
pub const fn target_ttl(&self) -> Option<TimeToLive> {
self.target_ttl
}
/// The timestamp of the most recently processed response in the current round, if any.
pub const fn received_time(&self) -> Option<SystemTime> {
self.received_time
}
/// Is `sequence` in the current round?
///
/// A sequence belongs to the round when it is not below the first sequence of the round and
/// its offset from that first sequence fits within the probe buffer.
pub fn in_round(&self, sequence: Sequence) -> bool {
    if sequence >= self.round_sequence {
        let offset = sequence.0 - self.round_sequence.0;
        offset < BUFFER_SIZE
    } else {
        false
    }
}
/// Do we have capacity in the current round for another sequence?
///
/// True while the number of sequences consumed in the round is below the buffer size.
pub fn round_has_capacity(&self) -> bool {
    let consumed = (self.sequence - self.round_sequence).0;
    consumed < BUFFER_SIZE
}
/// Are all rounds complete?
///
/// Never true when `max_rounds` is `None`; otherwise true once the current round number
/// has reached the maximum number of rounds.
pub const fn finished(&self, max_rounds: Option<MaxRounds>) -> bool {
    if let Some(limit) = max_rounds {
        // `limit` is non-zero, so this is the same as `round > limit - 1`.
        self.round.0 >= limit.0.get()
    } else {
        false
    }
}
/// Create and return the next `Probe` at the current `sequence` and `ttl`.
///
/// The probe is recorded as `Awaited` in the buffer slot for the current sequence, after
/// which both `ttl` and `sequence` are advanced by one.
///
/// As `ttl` is post-incremented here, in practice only `ttl` values in the range `1..254`
/// are allowed so that it fits in a `u8`.
#[instrument(skip(self))]
pub fn next_probe(&mut self, sent: SystemTime) -> Probe {
    let slot = usize::from(self.sequence - self.round_sequence);
    let (src_port, dest_port, identifier, flags) = self.probe_data();
    let probe = Probe::new(
        self.sequence,
        identifier,
        src_port,
        dest_port,
        self.ttl,
        self.round,
        sent,
        flags,
    );
    self.buffer[slot] = ProbeStatus::Awaited(probe.clone());
    // Advance to the ttl and sequence of the following probe.
    debug_assert!(self.ttl < TimeToLive(u8::MAX));
    self.ttl += TimeToLive(1);
    debug_assert!(self.sequence < Sequence(u16::MAX));
    self.sequence += Sequence(1);
    probe
}
/// Re-issue the most recent `Probe` with the next sequence number.
///
/// The buffer slot of the previous sequence is marked as `Skipped` and a new probe is
/// created at the current `sequence`, reusing the time-to-live of the skipped probe (which
/// is one less than the current `ttl`, as `ttl` always holds the value for the _next_ new
/// probe). The `sequence` is then advanced by one; `ttl` is left unchanged.
///
/// For example, if the sequence is `4` and the `ttl` is `5` prior to calling this method
/// then afterward:
///   - The entry for sequence `3` will be set to `Skipped` state
///   - A new `Awaited` entry will be created for sequence `4` with a `ttl` of `4`
///   - The sequence will be `5` and the `ttl` will still be `5`
#[instrument(skip(self))]
pub fn reissue_probe(&mut self, sent: SystemTime) -> Probe {
    let current_slot = usize::from(self.sequence - self.round_sequence);
    let skipped_slot = current_slot - 1;
    self.buffer[skipped_slot] = ProbeStatus::Skipped;
    let (src_port, dest_port, identifier, flags) = self.probe_data();
    let skipped_ttl = self.ttl - TimeToLive(1);
    let probe = Probe::new(
        self.sequence,
        identifier,
        src_port,
        dest_port,
        skipped_ttl,
        self.round,
        sent,
        flags,
    );
    self.buffer[current_slot] = ProbeStatus::Awaited(probe.clone());
    debug_assert!(self.sequence < Sequence(u16::MAX));
    self.sequence += Sequence(1);
    probe
}
/// Determine the `src_port`, `dest_port`, `identifier` and `Flags` for the current probe.
///
/// Dispatches on the configured `Protocol`; the UDP and TCP values further depend on the
/// `MultipathStrategy` and `PortDirection`.
fn probe_data(&self) -> (Port, Port, TraceId, Flags) {
    let protocol = self.config.protocol;
    match protocol {
        Protocol::Tcp => self.probe_tcp_data(),
        Protocol::Udp => self.probe_udp_data(),
        Protocol::Icmp => self.probe_icmp_data(),
    }
}
/// Determine the `src_port`, `dest_port`, `identifier` and `Flags` for the current ICMP
/// probe.
///
/// Both ports are `0`, the identifier is the configured trace identifier and no flags are
/// set.
const fn probe_icmp_data(&self) -> (Port, Port, TraceId, Flags) {
    let src_port = Port(0);
    let dest_port = Port(0);
    let identifier = self.config.trace_identifier;
    (src_port, dest_port, identifier, Flags::empty())
}
/// Determine the `src_port`, `dest_port` and `identifier` for the current UDP probe.
fn probe_udp_data(&self) -> (Port, Port, TraceId, Flags) {
match self.config.multipath_strategy {
MultipathStrategy::Classic => match self.config.port_direction {
PortDirection::FixedSrc(src_port) => (
Port(src_port.0),
Port(self.sequence.0),
TraceId(0),
Flags::empty(),
),
PortDirection::FixedDest(dest_port) => (
Port(self.sequence.0),
Port(dest_port.0),
TraceId(0),
Flags::empty(),
),
PortDirection::FixedBoth(_, _) | PortDirection::None => {
unimplemented!()
}
},
MultipathStrategy::Paris => {
let round_port = ((self.config.initial_sequence.0 as usize + self.round.0)
% usize::from(u16::MAX)) as u16;
match self.config.port_direction {
PortDirection::FixedSrc(src_port) => (
Port(src_port.0),
Port(round_port),
TraceId(0),
Flags::PARIS_CHECKSUM,
),
PortDirection::FixedDest(dest_port) => (
Port(round_port),
Port(dest_port.0),
TraceId(0),
Flags::PARIS_CHECKSUM,
),
PortDirection::FixedBoth(src_port, dest_port) => (
Port(src_port.0),
Port(dest_port.0),
TraceId(0),
Flags::PARIS_CHECKSUM,
),
PortDirection::None => unimplemented!(),
}
}
MultipathStrategy::Dublin => {
let round_port = ((self.config.initial_sequence.0 as usize + self.round.0)
% usize::from(u16::MAX)) as u16;
match self.config.port_direction {
PortDirection::FixedSrc(src_port) => (
Port(src_port.0),
Port(round_port),
TraceId(self.sequence.0),
Flags::DUBLIN_IPV6_PAYLOAD_LENGTH,
),
PortDirection::FixedDest(dest_port) => (
Port(round_port),
Port(dest_port.0),
TraceId(self.sequence.0),
Flags::DUBLIN_IPV6_PAYLOAD_LENGTH,
),
PortDirection::FixedBoth(src_port, dest_port) => (
Port(src_port.0),
Port(dest_port.0),
TraceId(self.sequence.0),
Flags::DUBLIN_IPV6_PAYLOAD_LENGTH,
),
PortDirection::None => unimplemented!(),
}
}
}
}
/// Determine the `src_port`, `dest_port`, `identifier` and `Flags` for the current TCP
/// probe.
///
/// The port that is not fixed carries the sequence number; the identifier is always `0`
/// and no flags are set.
///
/// # Panics
///
/// Panics (`unimplemented!`) for `PortDirection::FixedBoth` and `PortDirection::None`.
fn probe_tcp_data(&self) -> (Port, Port, TraceId, Flags) {
    let sequence_port = self.sequence.0;
    match self.config.port_direction {
        PortDirection::FixedSrc(src) => (
            Port(src.0),
            Port(sequence_port),
            TraceId(0),
            Flags::empty(),
        ),
        PortDirection::FixedDest(dest) => (
            Port(sequence_port),
            Port(dest.0),
            TraceId(0),
            Flags::empty(),
        ),
        PortDirection::FixedBoth(_, _) | PortDirection::None => unimplemented!(),
    }
}
/// Complete the probe at `sequence` with a `TimeExceeded` response and update the round
/// state.
///
/// `is_target` indicates whether the response came from the target host.
#[instrument(skip(self))]
pub fn complete_probe_time_exceeded(
    &mut self,
    sequence: Sequence,
    host: IpAddr,
    received: SystemTime,
    is_target: bool,
    icmp_code: IcmpPacketCode,
    extensions: Option<Extensions>,
) {
    let packet_type = IcmpPacketType::TimeExceeded(icmp_code);
    self.complete_probe(sequence, packet_type, host, received, is_target, extensions);
}
/// Complete the probe at `sequence` with an `Unreachable` response and update the round
/// state.
///
/// The response is always treated as having come from the target.
#[instrument(skip(self))]
pub fn complete_probe_unreachable(
    &mut self,
    sequence: Sequence,
    host: IpAddr,
    received: SystemTime,
    icmp_code: IcmpPacketCode,
    extensions: Option<Extensions>,
) {
    let packet_type = IcmpPacketType::Unreachable(icmp_code);
    self.complete_probe(sequence, packet_type, host, received, true, extensions);
}
/// Complete the probe at `sequence` with an `EchoReply` response and update the round
/// state.
///
/// The response is always treated as having come from the target and carries no
/// extensions.
#[instrument(skip(self))]
pub fn complete_probe_echo_reply(
    &mut self,
    sequence: Sequence,
    host: IpAddr,
    received: SystemTime,
    icmp_code: IcmpPacketCode,
) {
    let packet_type = IcmpPacketType::EchoReply(icmp_code);
    self.complete_probe(sequence, packet_type, host, received, true, None);
}
/// Complete the probe at `sequence` with a `NotApplicable` ICMP packet type and update the
/// round state.
///
/// The response is always treated as having come from the target and carries no
/// extensions.
#[instrument(skip(self))]
pub fn complete_probe_other(
    &mut self,
    sequence: Sequence,
    host: IpAddr,
    received: SystemTime,
) {
    let packet_type = IcmpPacketType::NotApplicable;
    self.complete_probe(sequence, packet_type, host, received, true, None);
}
/// Complete the probe at `sequence` and update the state of the trace.
///
/// This updates:
///
/// - the buffer entry for the probe, which moves from `Awaited` to `Complete`
/// - the `target_ttl`, from the time-to-live of the probe answered by the target
/// - the `max_received_ttl` observed in this round
/// - the latest packet `received_time` in this round
/// - whether the target has been found in this round
///
/// Replies may arrive out-of-order, so care is taken not to overwrite the state with stale
/// values. Several replies with differing time-to-live values may also be received from
/// the target host, in which case the lowest time-to-live is kept.
///
/// A probe which is already `Complete` is left untouched; a probe in any other non-awaited
/// state is unexpected (debug assertion) and is also left untouched.
#[instrument(skip(self))]
fn complete_probe(
    &mut self,
    sequence: Sequence,
    icmp_packet_type: IcmpPacketType,
    host: IpAddr,
    received: SystemTime,
    is_target: bool,
    extensions: Option<Extensions>,
) {
    // Look up the probe for `sequence`; only an awaited probe can be completed.
    let slot = usize::from(sequence - self.round_sequence);
    let probe = self.buffer[slot].clone();
    let awaited = match probe {
        ProbeStatus::Awaited(awaited) => awaited,
        // there is a valid scenario for TCP where a probe is already
        // `Complete`, see `test_tcp_dest_unreachable_and_refused`.
        ProbeStatus::Complete(_) => return,
        _ => {
            debug_assert!(
                false,
                "completed probe was not in Awaited state (probe={probe:#?})"
            );
            return;
        }
    };
    let completed = awaited.complete(host, received, icmp_packet_type, extensions);
    let ttl = completed.ttl;
    self.buffer[slot] = ProbeStatus::Complete(completed);

    // A probe that found the target sets the `target_ttl`, keeping the lowest value seen as
    // probes may be received out-of-order.
    //
    // A probe that did not find the target, but which has a ttl greater than or equal to the
    // known target ttl, resets the target ttl to `None`. This supports Equal Cost Multi-path
    // Routing (ECMP) where the number of hops to the target varies over the lifetime of the
    // trace.
    self.target_ttl = match (is_target, self.target_ttl) {
        (true, None) => Some(ttl),
        (true, Some(known)) => Some(known.min(ttl)),
        (false, Some(known)) if ttl >= known => None,
        (false, unchanged) => unchanged,
    };
    self.max_received_ttl = Some(self.max_received_ttl.map_or(ttl, |seen| seen.max(ttl)));
    self.received_time = Some(received);
    self.target_found |= is_target;
}
/// Advance to the next round.
///
/// If the sequence number reached the maximum sequence number during the round which just
/// completed then it is reset to the initial sequence here. Doing so between rounds avoids
/// having to deal with the sequence number wrapping during a round, which is more
/// problematic.
///
/// The per-round state (target found, received time, maximum received ttl) is cleared, the
/// round counter is incremented and the next ttl is set to `first_ttl`. The `target_ttl`
/// is deliberately carried over.
#[instrument(skip(self))]
pub fn advance_round(&mut self, first_ttl: TimeToLive) {
    let wrap_sequence = self.sequence >= self.max_sequence();
    if wrap_sequence {
        self.sequence = self.config.initial_sequence;
    }
    // The new round begins at whatever sequence comes next.
    self.round_sequence = self.sequence;
    self.round += RoundId(1);
    self.round_start = SystemTime::now();
    self.ttl = first_ttl;
    self.target_found = false;
    self.max_received_ttl = None;
    self.received_time = None;
}
/// The maximum sequence number allowed.
///
/// The Dublin multipath strategy for IPv6/udp encodes the sequence number as the payload
/// length, and so the maximum sequence number must be no larger than the maximum IPv6/udp
/// payload size. For that combination the limit is `BUFFER_SIZE` above the initial
/// sequence; in every other case it is `MAX_SEQUENCE`.
///
/// The range of possible sequence numbers must also be _at least_ `BUFFER_SIZE`, to ensure
/// delayed responses from a prior round are not incorrectly associated with later rounds
/// (see the `in_round` function).
fn max_sequence(&self) -> Sequence {
    let dublin_over_ipv6 = matches!(
        (self.config.multipath_strategy, self.config.target_addr),
        (MultipathStrategy::Dublin, IpAddr::V6(_))
    );
    if dublin_over_ipv6 {
        self.config.initial_sequence + Sequence(BUFFER_SIZE)
    } else {
        MAX_SEQUENCE
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::probe::IcmpPacketType;
use crate::types::MaxInflight;
use rand::Rng;
use std::net::{IpAddr, Ipv4Addr};
use std::time::Duration;
#[allow(
clippy::cognitive_complexity,
clippy::too_many_lines,
clippy::bool_assert_comparison
)]
#[test]
fn test_state() {
let mut state = TracerState::new(cfg(Sequence(33000)));
// Validate the initial TracerState
assert_eq!(state.round, RoundId(0));
assert_eq!(state.sequence, Sequence(33000));
assert_eq!(state.round_sequence, Sequence(33000));
assert_eq!(state.ttl, TimeToLive(1));
assert_eq!(state.max_received_ttl, None);
assert_eq!(state.received_time, None);
assert_eq!(state.target_ttl, None);
assert_eq!(state.target_found, false);
// The initial state of the probe before sending
let prob_init = state.probe_at(Sequence(33000));
assert_eq!(ProbeStatus::NotSent, prob_init);
// Prepare probe 1 (round 0, sequence 33000, ttl 1) for sending
let sent_1 = SystemTime::now();
let probe_1 = state.next_probe(sent_1);
assert_eq!(probe_1.sequence, Sequence(33000));
assert_eq!(probe_1.ttl, TimeToLive(1));
assert_eq!(probe_1.round, RoundId(0));
assert_eq!(probe_1.sent, sent_1);
// Update the state of the probe 1 after receiving a TimeExceeded
let received_1 = SystemTime::now();
let host = IpAddr::V4(Ipv4Addr::LOCALHOST);
state.complete_probe_time_exceeded(
Sequence(33000),
host,
received_1,
false,
IcmpPacketCode(1),
None,
);
// Validate the state of the probe 1 after the update
let probe_1_fetch = state.probe_at(Sequence(33000)).try_into_complete().unwrap();
assert_eq!(probe_1_fetch.sequence, Sequence(33000));
assert_eq!(probe_1_fetch.ttl, TimeToLive(1));
assert_eq!(probe_1_fetch.round, RoundId(0));
assert_eq!(probe_1_fetch.received, received_1);
assert_eq!(probe_1_fetch.host, host);
assert_eq!(probe_1_fetch.sent, sent_1);
assert_eq!(
probe_1_fetch.icmp_packet_type,
IcmpPacketType::TimeExceeded(IcmpPacketCode(1))
);
// Validate the TracerState after the update
assert_eq!(state.round, RoundId(0));
assert_eq!(state.sequence, Sequence(33001));
assert_eq!(state.round_sequence, Sequence(33000));
assert_eq!(state.ttl, TimeToLive(2));
assert_eq!(state.max_received_ttl, Some(TimeToLive(1)));
assert_eq!(state.received_time, Some(received_1));
assert_eq!(state.target_ttl, None);
assert_eq!(state.target_found, false);
// Validate the probes() iterator returns only a single probe
{
let mut probe_iter = state.probes().iter();
let probe_next1 = probe_iter.next().unwrap();
assert_eq!(ProbeStatus::Complete(probe_1_fetch), probe_next1.clone());
assert_eq!(None, probe_iter.next());
}
// Advance to the next round
state.advance_round(TimeToLive(1));