Skip to content

Commit

Permalink
Merge branch 'main' of github.com:lightstep/otel-launcher-go into jma…
Browse files Browse the repository at this point in the history
…cd/histminmax
  • Loading branch information
jmacd committed Jul 8, 2022
2 parents c54cb7a + e1ddd9c commit e3a09ed
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 22 deletions.
4 changes: 2 additions & 2 deletions lightstep/sdk/metric/internal/syncstate/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ func acquireRecord[N number.Any](inst *Instrument, attrs []attribute.KeyValue) (
}

// Note: the accumulator set below is created speculatively;
// if it is never returned, it will not be updated and can be
// safely discarded.
// if it is never returned, it will be released below.
newRec := &record{
refMapped: newRefcountMapped(),
accumulator: inst.compiled.NewAccumulator(aset),
Expand All @@ -204,6 +203,7 @@ func acquireRecord[N number.Any](inst *Instrument, attrs []attribute.KeyValue) (
if found, loaded := inst.current.LoadOrStore(aset, newRec); loaded {
oldRec := found.(*record)
if oldRec.refMapped.ref() {
newRec.accumulator.SnapshotAndProcess(true)
return oldRec, oldRec.accumulator.(viewstate.Updater[N])
}
// When this happens, we are waiting for the call to Delete()
Expand Down
34 changes: 19 additions & 15 deletions lightstep/sdk/metric/internal/viewstate/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func (p *statelessSyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence
ioutput := p.appendInstrument(output)

for set, entry := range p.data {
// capture the number of references before the Move() call
// below. we're holding the lock that prevents new refs, so
// the value before Move() indicates when it's safe to remove
// this entry from the map.
numRefs := atomic.LoadInt64(&entry.auxiliary)

p.appendPoint(ioutput, set, &entry.storage, aggregation.DeltaTemporality, seq.Last, seq.Now, true)

// By passing reset=true above, the aggregator data in
Expand All @@ -65,25 +71,23 @@ func (p *statelessSyncInstrument[N, Storage, Methods]) Collect(seq data.Sequence
point := &ptsArr[len(ptsArr)-1]

cpy, _ := methods.ToStorage(point.Aggregation)
if !methods.HasChange(cpy) {
// We allowed the array to grow before this
// test speculatively, since when it succeeds
// we are able to re-use the underlying
// aggregator.
ioutput.Points = ptsArr[0 : len(ptsArr)-1 : cap(ptsArr)]

if methods.HasChange(cpy) {
continue
}

// We allowed the array to grow before this
// test speculatively, since when it succeeds
// we are able to re-use the underlying
// aggregator.
ioutput.Points = ptsArr[0 : len(ptsArr)-1 : cap(ptsArr)]

// If there are no more accumulator references to the
// entry, remove from the map. This happens when the
// syncstate the entry goes unused for the interval
// between collection, so if it happens here probably
// there was no change. This branch is outside the
// HasChange() block above in case of a race -- the
// entry can have a final change if it was updated
// after the (mods == coll) test in conditionalSnapshotAndProcess()
// but before the entry is unmapped.
if atomic.LoadInt64(&entry.auxiliary) == 0 {
// entry, remove from the map.
if numRefs == 0 {
delete(p.data, set)
}

}
}

Expand Down
9 changes: 4 additions & 5 deletions lightstep/sdk/metric/internal/viewstate/viewstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,14 +1090,13 @@ func TestDeltaTemporalitySyncGauge(t *testing.T) {
require.Equal(t, 1, len(compI.data))
require.Equal(t, 1, len(compF.data))

// observation races w/ collection, release anyway
// observation races w/ collection, release w/ active ref
observe(true, 12)
expectValues(12, seq)
tick()
// The release=true signal is honored, even when a final
// update races with the unmapping.
require.Equal(t, 0, len(compI.data))
require.Equal(t, 0, len(compF.data))

require.Equal(t, 1, len(compI.data))
require.Equal(t, 1, len(compF.data))

// repeat use
makeAccums()
Expand Down
2 changes: 2 additions & 0 deletions lightstep/sdk/metric/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func TestPeriodicRepeats(t *testing.T) {
periodic.Register(producer)
periodic.Register(producer)

periodic.stop()

require.Equal(t, 1, len(*errs))
require.True(t, errors.Is((*errs)[0], ErrMultipleReaderRegistration))
})
Expand Down

0 comments on commit e3a09ed

Please sign in to comment.