Skip to content

Commit

Permalink
refactor: update NTP spike detector
Browse files Browse the repository at this point in the history
See #7080 (comment)

The NTP spike detector code was refactored out of the main NTP code so
that it can be unit-tested.

I dropped one check which I think is causing false-positives in the
spike detector (when NTP offset is higher than the RTT of the best
packet received so far).

The overall flow resembles the one in systemd-timesync, the current
implementation has this check:

https://github.com/systemd/systemd/blob/6639ac474eb7a5325a72a3d7370492792dd00bc0/src/timesync/timesyncd-manager.c#L357-L360

This check was introduced in the initial release, after some
refactoring:

systemd/systemd@3dbc762#diff-4aa9995f07bb31b9884d40a7634f5f6d30245dfd26ac27b89cd5fd3bd4eef56aR429-R431

There is no equivalent of it in the RFC:

https://datatracker.ietf.org/doc/html/rfc5905#appendix-A.5.2

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira committed Aug 29, 2023
1 parent af0cc70 commit 2d3ac92
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 67 deletions.
109 changes: 109 additions & 0 deletions internal/pkg/ntp/internal/spike/spike.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package spike provides a spike detector for NTP responses.
package spike

import (
"math"

"github.com/beevik/ntp"
)

const defaultCapacity = 8

// Sample is a single NTP response sample.
type Sample struct {
Offset, RTT float64 // in seconds
}

// SampleFromNTPResponse converts an NTP response to a Sample.
func SampleFromNTPResponse(resp *ntp.Response) Sample {
return Sample{
Offset: resp.ClockOffset.Seconds(),
RTT: resp.RTT.Seconds(),
}
}

// Detector detects spikes in NTP response samples.
//
// Zero value is ready to use.
type Detector struct {
packetCount int64
samples []Sample
samplesIdx int
samplesJitter float64
}

// IsSpike returns true if the given sample is a spike.
//
// The sample is added to the detector's internal state.
func (d *Detector) IsSpike(sample Sample) bool {
if d.samples == nil {
d.samples = make([]Sample, defaultCapacity)
}

d.packetCount++

if d.packetCount == 1 {
// ignore first packet
return false
}

var currentIndex int

currentIndex, d.samplesIdx = d.samplesIdx, (d.samplesIdx+1)%len(d.samples)

d.samples[d.samplesIdx] = sample

jitter := d.samplesJitter

indexMin := currentIndex

for i := range d.samples {
if d.samples[i].RTT == 0 {
continue
}

if d.samples[i].RTT < d.samples[indexMin].RTT {
indexMin = i
}
}

var j float64

for i := range d.samples {
j += math.Pow(d.samples[i].Offset-d.samples[indexMin].Offset, 2)
}

d.samplesJitter = math.Sqrt(j / (float64(len(d.samples)) - 1))

if math.Abs(sample.Offset) > sample.RTT {
// always accept clock offset if that is larger than rtt
return false
}

if d.packetCount < 4 {
// need more samples to make a decision
return false
}

// This check was specifically removed (while it exists in systemd-timesync),
// as I don't understand why it's needed (@smira).
// It seems to give false positives when the RTT and Offset are close to each other,
// e.g. when NTP server is on the same LAN.
//
// if math.Abs(sample.Offset) > d.samples[indexMin].RTT {
// // do not accept anything worse than the maximum possible error of the best sample
// return true
// }

// check that diff to the last offset is not more than 3*(observed jitter)
return math.Abs(sample.Offset-d.samples[currentIndex].Offset) > 3*jitter
}

// Jitter returns the current jitter.
func (d *Detector) Jitter() float64 {
return d.samplesJitter
}
135 changes: 135 additions & 0 deletions internal/pkg/ntp/internal/spike/spike_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package spike_test

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/siderolabs/talos/internal/pkg/ntp/internal/spike"
)

func TestSpikeDetector(t *testing.T) {
for _, test := range []struct {
name string
samples []spike.Sample

expectedSpikes []bool
}{
{
name: "no spikes",

samples: []spike.Sample{
{Offset: 0.01, RTT: 0.01},
{Offset: 0.05, RTT: 0.01},
{Offset: 0.03, RTT: 0.01},
{Offset: 0.01, RTT: 0.01},
{Offset: -0.01, RTT: 0.01},
{Offset: -0.02, RTT: 0.03}, // not a spike, just a large RTT
},

expectedSpikes: []bool{
false,
false,
false,
false,
false,
false,
},
},
{
name: "offset spike",

samples: []spike.Sample{
{Offset: 0.01, RTT: 0.01},
{Offset: 0.05, RTT: 0.01},
{Offset: 0.03, RTT: 0.01},
{Offset: 0.01, RTT: 0.01},
{Offset: 0.01, RTT: 0.01},
{Offset: 0.01, RTT: 0.01},
{Offset: -0.01, RTT: 0.01},
{Offset: -0.5, RTT: 0.7}, // spike
},

expectedSpikes: []bool{
false,
false,
false,
false,
false,
false,
false,
true,
},
},
{
name: "adjusting to higher RTT",

samples: []spike.Sample{
{Offset: 0.01, RTT: 0.01},
{Offset: 0.05, RTT: 0.01},
{Offset: 0.03, RTT: 0.01},
{Offset: 0.01, RTT: 0.01},
{Offset: -0.01, RTT: 0.01},
{Offset: 0.01, RTT: 0.01},
{Offset: -0.01, RTT: 0.01},
{Offset: -0.5, RTT: 0.7}, // spike
{Offset: 0.5, RTT: 0.7}, // spike
{Offset: -0.5, RTT: 0.7}, // spike
{Offset: 0.5, RTT: 0.7}, // not a spike anymore, filter adjusted itself
{Offset: -0.5, RTT: 0.7},
{Offset: 0.01, RTT: 0.01},
},

expectedSpikes: []bool{
false,
false,
false,
false,
false,
false,
false,
true,
true,
true,
false,
false,
false,
},
},
{
name: "initial ignore",

samples: []spike.Sample{
{Offset: 5, RTT: 0.01}, // initial packet is ignored completely
{Offset: 0.05, RTT: 0.05},
{Offset: 0.5, RTT: 0.5}, // spike detection kicks in after 4 packets
{Offset: 0.01, RTT: 0.01},
{Offset: -0.01, RTT: 0.01},
{Offset: 0.01, RTT: 0.01},
},

expectedSpikes: []bool{
false,
false,
false,
false,
false,
false,
},
},
} {
t.Run(test.name, func(t *testing.T) {
var detector spike.Detector

for i, sample := range test.samples {
isSpike := detector.IsSpike(sample)

assert.Equal(t, test.expectedSpikes[i], isSpike, "unexpected spike: %v (position %d)", test.expectedSpikes[i], i)
}
})
}
}
74 changes: 7 additions & 67 deletions internal/pkg/ntp/ntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"bytes"
"context"
"fmt"
"math"
"math/bits"
"net"
"reflect"
Expand All @@ -23,6 +22,7 @@ import (
"go.uber.org/zap/zapcore"
"golang.org/x/sys/unix"

"github.com/siderolabs/talos/internal/pkg/ntp/internal/spike"
"github.com/siderolabs/talos/internal/pkg/timex"
)

Expand All @@ -42,10 +42,7 @@ type Syncer struct {

firstSync bool

packetCount int64
samples []sample
samplesIdx int
samplesJitter float64
spikeDetector spike.Detector

MinPoll, MaxPoll, RetryPoll time.Duration

Expand All @@ -55,12 +52,6 @@ type Syncer struct {
AdjustTime AdjustTimeFunc
}

const sampleCount = 8

type sample struct {
offset, rtt float64 // in seconds
}

// NewSyncer creates new Syncer with default configuration.
func NewSyncer(logger *zap.Logger, timeServers []string) *Syncer {
syncer := &Syncer{
Expand All @@ -74,7 +65,7 @@ func NewSyncer(logger *zap.Logger, timeServers []string) *Syncer {

firstSync: true,

samples: make([]sample, sampleCount),
spikeDetector: spike.Detector{},

MinPoll: MinAllowablePoll,
MaxPoll: MaxAllowablePoll,
Expand Down Expand Up @@ -149,59 +140,8 @@ func absDuration(d time.Duration) time.Duration {
return d
}

func (syncer *Syncer) spikeDetector(resp *ntp.Response) bool {
syncer.packetCount++

if syncer.packetCount == 1 {
// ignore first packet
return false
}

var currentIndex int

currentIndex, syncer.samplesIdx = syncer.samplesIdx, (syncer.samplesIdx+1)%sampleCount

syncer.samples[syncer.samplesIdx].offset = resp.ClockOffset.Seconds()
syncer.samples[syncer.samplesIdx].rtt = resp.RTT.Seconds()

jitter := syncer.samplesJitter

indexMin := currentIndex

for i := range syncer.samples {
if syncer.samples[i].rtt == 0 {
continue
}

if syncer.samples[i].rtt < syncer.samples[indexMin].rtt {
indexMin = i
}
}

var j float64

for i := range syncer.samples {
j += math.Pow(syncer.samples[i].offset-syncer.samples[indexMin].offset, 2)
}

syncer.samplesJitter = math.Sqrt(j / (sampleCount - 1))

if absDuration(resp.ClockOffset) > resp.RTT {
// always accept clock offset if that is larger than rtt
return false
}

if syncer.packetCount < 4 {
// need more samples to make a decision
return false
}

if absDuration(resp.ClockOffset).Seconds() > syncer.samples[indexMin].rtt {
// do not accept anything worse than the maximum possible error of the best sample
return true
}

return math.Abs(resp.ClockOffset.Seconds()-syncer.samples[currentIndex].offset) > 3*jitter
func (syncer *Syncer) isSpike(resp *ntp.Response) bool {
return syncer.spikeDetector.IsSpike(spike.SampleFromNTPResponse(resp))
}

// Run runs the sync process.
Expand Down Expand Up @@ -231,7 +171,7 @@ func (syncer *Syncer) Run(ctx context.Context) {
spike := false

if resp != nil && resp.Validate() == nil {
spike = syncer.spikeDetector(resp)
spike = syncer.isSpike(resp)
}

switch {
Expand Down Expand Up @@ -264,7 +204,7 @@ func (syncer *Syncer) Run(ctx context.Context) {
}

syncer.logger.Debug("sample stats",
zap.Duration("jitter", time.Duration(syncer.samplesJitter*float64(time.Second))),
zap.Duration("jitter", time.Duration(syncer.spikeDetector.Jitter()*float64(time.Second))),
zap.Duration("poll_interval", pollInterval),
zap.Bool("spike", spike),
)
Expand Down

0 comments on commit 2d3ac92

Please sign in to comment.