From ce2f7195061901abcca772aa9f439dc5fa12ebf4 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 8 Jul 2022 11:57:44 -0700 Subject: [PATCH] Fix race condition in sync-delta code path --- .../sdk/metric/internal/syncstate/sync.go | 4 +-- .../metric/internal/viewstate/collectors.go | 34 +++++++++++-------- .../internal/viewstate/viewstate_test.go | 9 +++-- lightstep/sdk/metric/periodic_test.go | 2 ++ 4 files changed, 27 insertions(+), 22 deletions(-) diff --git a/lightstep/sdk/metric/internal/syncstate/sync.go b/lightstep/sdk/metric/internal/syncstate/sync.go index d022cb96..1137d6c8 100644 --- a/lightstep/sdk/metric/internal/syncstate/sync.go +++ b/lightstep/sdk/metric/internal/syncstate/sync.go @@ -193,8 +193,7 @@ func acquireRecord[N number.Any](inst *Instrument, attrs []attribute.KeyValue) ( } // Note: the accumulator set below is created speculatively; - // if it is never returned, it will not be updated and can be - // safely discarded. + // if it is never returned, it will be released below. newRec := &record{ refMapped: newRefcountMapped(), accumulator: inst.compiled.NewAccumulator(aset), @@ -204,6 +203,7 @@ func acquireRecord[N number.Any](inst *Instrument, attrs []attribute.KeyValue) ( if found, loaded := inst.current.LoadOrStore(aset, newRec); loaded { oldRec := found.(*record) if oldRec.refMapped.ref() { + newRec.accumulator.SnapshotAndProcess(true) return oldRec, oldRec.accumulator.(viewstate.Updater[N]) } // When this happens, we are waiting for the call to Delete() diff --git a/lightstep/sdk/metric/internal/viewstate/collectors.go b/lightstep/sdk/metric/internal/viewstate/collectors.go index 7ac29d05..ff261224 100644 --- a/lightstep/sdk/metric/internal/viewstate/collectors.go +++ b/lightstep/sdk/metric/internal/viewstate/collectors.go @@ -56,6 +56,12 @@ func (p *statelessSyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence ioutput := p.appendInstrument(output) for set, entry := range p.data { + // capture the number of references before the Move() call + // below. we're holding the lock that prevents new refs, so + // the value before Move() indicates when it's safe to remove + // this entry from the map. + numRefs := atomic.LoadInt64(&entry.auxiliary) + p.appendPoint(ioutput, set, &entry.storage, aggregation.DeltaTemporality, seq.Last, seq.Now, true) // By passing reset=true above, the aggregator data in @@ -65,25 +71,23 @@ func (p *statelessSyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence point := &ptsArr[len(ptsArr)-1] cpy, _ := methods.ToStorage(point.Aggregation) - if !methods.HasChange(cpy) { - // We allowed the array to grow before this - // test speculatively, since when it succeeds - // we are able to re-use the underlying - // aggregator. - ioutput.Points = ptsArr[0 : len(ptsArr)-1 : cap(ptsArr)] + + if methods.HasChange(cpy) { + continue } + + // We allowed the array to grow before this + // test speculatively, since when it succeeds + // we are able to re-use the underlying + // aggregator. + ioutput.Points = ptsArr[0 : len(ptsArr)-1 : cap(ptsArr)] + // If there are no more accumulator references to the - // entry, remove from the map. This happens when the - // syncstate the entry goes unused for the interval - // between collection, so if it happens here probably - // there was no change. This branch is outside the - // HasChange() block above in case of a race -- the - // entry can have a final change if it was updated - // after the (mods == coll) test in conditionalSnapshotAndProcess() - // but before the entry is unmapped. - if atomic.LoadInt64(&entry.auxiliary) == 0 { + // entry, remove from the map. + if numRefs == 0 { delete(p.data, set) } + } } diff --git a/lightstep/sdk/metric/internal/viewstate/viewstate_test.go b/lightstep/sdk/metric/internal/viewstate/viewstate_test.go index 3eff4fff..6eaa903b 100644 --- a/lightstep/sdk/metric/internal/viewstate/viewstate_test.go +++ b/lightstep/sdk/metric/internal/viewstate/viewstate_test.go @@ -1090,14 +1090,13 @@ func TestDeltaTemporalitySyncGauge(t *testing.T) { require.Equal(t, 1, len(compI.data)) require.Equal(t, 1, len(compF.data)) - // observation races w/ collection, release anyway + // observation races w/ collection, release w/ active ref observe(true, 12) expectValues(12, seq) tick() - // The release=true signal is honored, even when a final - // update races with the unmapping. - require.Equal(t, 0, len(compI.data)) - require.Equal(t, 0, len(compF.data)) + + require.Equal(t, 1, len(compI.data)) + require.Equal(t, 1, len(compF.data)) // repeat use makeAccums() diff --git a/lightstep/sdk/metric/periodic_test.go b/lightstep/sdk/metric/periodic_test.go index 93080c88..2fa44d0d 100644 --- a/lightstep/sdk/metric/periodic_test.go +++ b/lightstep/sdk/metric/periodic_test.go @@ -147,6 +147,8 @@ func TestPeriodicRepeats(t *testing.T) { periodic.Register(producer) periodic.Register(producer) + periodic.stop() + require.Equal(t, 1, len(*errs)) require.True(t, errors.Is((*errs)[0], ErrMultipleReaderRegistration)) })