From 69df67d4498cd2ca424569f25a03b450f13affef Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 6 Feb 2020 14:45:56 -0800 Subject: [PATCH] Refactor metric records logic (#468) * Refactor metric records logic. Signed-off-by: Bogdan Cristian Drutu * Fix lint errors Signed-off-by: Bogdan Cristian Drutu * Fix a bug that we try to readd the old entry instead of a new one. Signed-off-by: Bogdan Cristian Drutu * Update comments in refcount_mapped. Signed-off-by: Bogdan Cristian Drutu * Remove the need to use a records list, iterate over the map. Signed-off-by: Bogdan Cristian Drutu * Fix comments and typos Signed-off-by: Bogdan Cristian Drutu * Fix more comments Signed-off-by: Bogdan Cristian Drutu * Clarify tryUnmap comment Signed-off-by: Bogdan Cristian Drutu * Fix one more typo. Signed-off-by: Bogdan Cristian Drutu --- sdk/metric/alignment_test.go | 16 +-- sdk/metric/list.go | 53 --------- sdk/metric/refcount_mapped.go | 65 +++++++++++ sdk/metric/sdk.go | 203 ++++++++++------------------------ 4 files changed, 130 insertions(+), 207 deletions(-) create mode 100644 sdk/metric/refcount_mapped.go diff --git a/sdk/metric/alignment_test.go b/sdk/metric/alignment_test.go index 9e00ae9e827..7330f057a00 100644 --- a/sdk/metric/alignment_test.go +++ b/sdk/metric/alignment_test.go @@ -12,20 +12,12 @@ import ( func TestMain(m *testing.M) { fields := []ottest.FieldOffset{ { - Name: "record.refcount", - Offset: unsafe.Offsetof(record{}.refcount), + Name: "record.refMapped.value", + Offset: unsafe.Offsetof(record{}.refMapped.value), }, { - Name: "record.collectedEpoch", - Offset: unsafe.Offsetof(record{}.collectedEpoch), - }, - { - Name: "record.modifiedEpoch", - Offset: unsafe.Offsetof(record{}.modifiedEpoch), - }, - { - Name: "record.reclaim", - Offset: unsafe.Offsetof(record{}.reclaim), + Name: "record.modified", + Offset: unsafe.Offsetof(record{}.modified), }, } if !ottest.Aligned8Byte(fields, os.Stderr) { diff --git a/sdk/metric/list.go b/sdk/metric/list.go index ddff72c96c3..078781c9924 100644 --- a/sdk/metric/list.go +++ b/sdk/metric/list.go @@ -14,11 +14,6 @@ package metric -import ( - "sync/atomic" - "unsafe" -) - func (l *sortedLabels) Len() int { return len(*l) } @@ -30,51 +25,3 @@ func (l *sortedLabels) Swap(i, j int) { func (l *sortedLabels) Less(i, j int) bool { return (*l)[i].Key < (*l)[j].Key } - -func (m *SDK) addPrimary(rec *record) { - for { - rec.next.primary.store(m.records.primary.load()) - if atomic.CompareAndSwapPointer( - &m.records.primary.ptr, - rec.next.primary.ptr, - unsafe.Pointer(rec), - ) { - return - } - } -} - -func (m *SDK) addReclaim(rec *record) { - for { - rec.next.reclaim.store(m.records.reclaim.load()) - if atomic.CompareAndSwapPointer( - &m.records.reclaim.ptr, - rec.next.reclaim.ptr, - unsafe.Pointer(rec), - ) { - return - } - } -} - -func (s *singlePtr) swapNil() *record { - for { - newValue := unsafe.Pointer(nil) - swapped := atomic.LoadPointer(&s.ptr) - if atomic.CompareAndSwapPointer(&s.ptr, swapped, newValue) { - return (*record)(swapped) - } - } -} - -func (s *singlePtr) load() *record { - return (*record)(atomic.LoadPointer(&s.ptr)) -} - -func (s *singlePtr) store(r *record) { - atomic.StorePointer(&s.ptr, unsafe.Pointer(r)) -} - -func (s *singlePtr) clear() { - atomic.StorePointer(&s.ptr, unsafe.Pointer(nil)) -} diff --git a/sdk/metric/refcount_mapped.go b/sdk/metric/refcount_mapped.go new file mode 100644 index 00000000000..b180dd1a77b --- /dev/null +++ b/sdk/metric/refcount_mapped.go @@ -0,0 +1,65 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "sync/atomic" +) + +// refcountMapped atomically counts the number of references (usages) of an entry +// while also keeping a state of mapped/unmapped into a different data structure +// (an external map or list for example). +// +// refcountMapped uses an atomic value where the least significant bit is used to +// keep the state of mapping ('1' is used for unmapped and '0' is for mapped) and +// the rest of the bits are used for refcounting. +type refcountMapped struct { + // refcount has to be aligned for 64-bit atomic operations. + value int64 +} + +// ref returns true if the entry is still mapped and increases the +// reference usages, if unmapped returns false. +func (rm *refcountMapped) ref() bool { + // Check if this entry was marked as unmapped between the moment + // we got a reference to it (or will be removed very soon) and here. + return atomic.AddInt64(&rm.value, 2)&1 == 0 +} + +func (rm *refcountMapped) unref() { + atomic.AddInt64(&rm.value, -2) +} + +// inUse returns true if there is a reference to the entry and it is mapped. +func (rm *refcountMapped) inUse() bool { + val := atomic.LoadInt64(&rm.value) + return val >= 2 && val&1 == 0 +} + +// tryUnmap flips the mapped bit to "unmapped" state and returns true if both of the +// following conditions are true upon entry to this function: +// * There are no active references; +// * The mapped bit is in "mapped" state. +// Otherwise no changes are done to mapped bit and false is returned. +func (rm *refcountMapped) tryUnmap() bool { + if atomic.LoadInt64(&rm.value) != 0 { + return false + } + return atomic.CompareAndSwapInt64( + &rm.value, + 0, + 1, + ) +} diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 29355b499a5..69ad1695552 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -21,7 +21,6 @@ import ( "sort" "sync" "sync/atomic" - "unsafe" "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/metric" @@ -47,10 +46,6 @@ type ( // w/ zero arguments. empty labels - // records is the head of both the primary and the - // reclaim records lists. - records doublePtr - // currentEpoch is the current epoch number. It is // incremented in `Collect()`. currentEpoch int64 @@ -97,32 +92,15 @@ type ( // `record` in existence at a time, although at most one can // be referenced from the `SDK.current` map. record struct { - // refcount counts the number of active handles on - // referring to this record. active handles prevent - // removing the record from the current map. - // - // refcount has to be aligned for 64-bit atomic operations. - refcount int64 - - // collectedEpoch is the epoch number for which this - // record has been exported. This is modified by the - // `Collect()` method. - // - // collectedEpoch has to be aligned for 64-bit atomic operations. - collectedEpoch int64 + // refMapped keeps track of refcounts and the mapping state to the + // SDK.current map. + refMapped refcountMapped - // modifiedEpoch is the latest epoch number for which - // this record was updated. Generally, if - // modifiedEpoch is less than collectedEpoch, this - // record is due for reclaimation. + // modified is an atomic boolean that tracks if the current record + // was modified since the last Collect(). // - // modifiedEpoch has to be aligned for 64-bit atomic operations. - modifiedEpoch int64 - - // reclaim is an atomic to control the start of reclaiming. - // - // reclaim has to be aligned for 64-bit atomic operations. - reclaim int64 + // modified has to be aligned for 64-bit atomic operations. + modified int64 // labels is the LabelSet passed by the user. labels *labels @@ -134,25 +112,9 @@ type ( // depending on the type of aggregation. If nil, the // metric was disabled by the exporter. recorder export.Aggregator - - // next contains the next pointer for both the primary - // and the reclaim lists. - next doublePtr } ErrorHandler func(error) - - // singlePointer wraps an unsafe.Pointer and supports basic - // load(), store(), clear(), and swapNil() operations. - singlePtr struct { - ptr unsafe.Pointer - } - - // doublePtr is used for the head and next links of two lists. - doublePtr struct { - primary singlePtr - reclaim singlePtr - } ) var ( @@ -160,12 +122,6 @@ var ( _ api.LabelSet = &labels{} _ api.InstrumentImpl = &instrument{} _ api.BoundInstrumentImpl = &record{} - - // hazardRecord is used as a pointer value that indicates the - // value is not included in any list. (`nil` would be - // ambiguous, since the final element in a list has `nil` as - // the next pointer). - hazardRecord = &record{} ) func (i *instrument) Meter() api.Meter { @@ -186,31 +142,48 @@ func (i *instrument) acquireHandle(ls *labels) *record { if actual, ok := i.meter.current.Load(mk); ok { // Existing record case, only one allocation so far. rec := actual.(*record) - atomic.AddInt64(&rec.refcount, 1) - return rec + if rec.refMapped.ref() { + // At this moment it is guaranteed that the entry is in + // the map and will not be removed. + return rec + } + // This entry is no longer mapped, try to add a new entry. } // There's a memory allocation here. rec := &record{ - labels: ls, - descriptor: i.descriptor, - refcount: 1, - collectedEpoch: -1, - modifiedEpoch: 0, - recorder: i.meter.batcher.AggregatorFor(i.descriptor), + labels: ls, + descriptor: i.descriptor, + refMapped: refcountMapped{value: 2}, + modified: 0, + recorder: i.meter.batcher.AggregatorFor(i.descriptor), } - // Load/Store: there's a memory allocation to place `mk` into - // an interface here. - if actual, loaded := i.meter.current.LoadOrStore(mk, rec); loaded { - // Existing record case. - rec = actual.(*record) - atomic.AddInt64(&rec.refcount, 1) + for { + // Load/Store: there's a memory allocation to place `mk` into + // an interface here. + if actual, loaded := i.meter.current.LoadOrStore(mk, rec); loaded { + // Existing record case. Cannot change rec here because if fail + // will try to add rec again to avoid new allocations. + oldRec := actual.(*record) + if oldRec.refMapped.ref() { + // At this moment it is guaranteed that the entry is in + // the map and will not be removed. + return oldRec + } + // This loaded entry is marked as unmapped (so Collect will remove + // it from the map immediately), try again - this is a busy waiting + // strategy to wait until Collect() removes this entry from the map. + // + // This can be improved by having a list of "Unmapped" entries for + // one time only usages, OR we can make this a blocking path and use + // a Mutex that protects the delete operation (delete only if the old + // record is associated with the key). + continue + } + // The new entry was added to the map, good to go. return rec } - - i.meter.addPrimary(rec) - return rec } func (i *instrument) Bind(ls api.LabelSet) api.BoundInstrumentImpl { @@ -355,22 +328,6 @@ func (m *SDK) NewFloat64Measure(name string, mos ...api.MeasureOptionApplier) ap return api.WrapFloat64MeasureInstrument(m.newMeasureInstrument(name, core.Float64NumberKind, mos...)) } -// saveFromReclaim puts a record onto the "reclaim" list when it -// detects an attempt to delete the record while it is still in use. -func (m *SDK) saveFromReclaim(rec *record) { - for { - reclaimed := atomic.LoadInt64(&rec.reclaim) - if reclaimed != 0 { - return - } - if atomic.CompareAndSwapInt64(&rec.reclaim, 0, 1) { - break - } - } - - m.addReclaim(rec) -} - // Collect traverses the list of active records and exports data for // each active instrument. Collect() may not be called concurrently. // @@ -384,45 +341,25 @@ func (m *SDK) Collect(ctx context.Context) int { checkpointed := 0 - var next *record - for inuse := m.records.primary.swapNil(); inuse != nil; inuse = next { - next = inuse.next.primary.load() - - refcount := atomic.LoadInt64(&inuse.refcount) - - if refcount > 0 { - checkpointed += m.checkpoint(ctx, inuse) - m.addPrimary(inuse) - continue + m.current.Range(func(key interface{}, value interface{}) bool { + inuse := value.(*record) + unmapped := inuse.refMapped.tryUnmap() + // If able to unmap then remove the record from the current Map. + if unmapped { + m.current.Delete(inuse.mapkey()) } - modified := atomic.LoadInt64(&inuse.modifiedEpoch) - collected := atomic.LoadInt64(&inuse.collectedEpoch) - checkpointed += m.checkpoint(ctx, inuse) - - if modified >= collected { - atomic.StoreInt64(&inuse.collectedEpoch, m.currentEpoch) - m.addPrimary(inuse) - continue + // Always report the values if a reference to the Record is active, + // this is to keep the previous behavior. + // TODO: Reconsider this logic. + if inuse.refMapped.inUse() || atomic.LoadInt64(&inuse.modified) != 0 { + atomic.StoreInt64(&inuse.modified, 0) + checkpointed += m.checkpoint(ctx, inuse) } - // Remove this entry. - m.current.Delete(inuse.mapkey()) - inuse.next.primary.store(hazardRecord) - } - - for chances := m.records.reclaim.swapNil(); chances != nil; chances = next { - atomic.StoreInt64(&chances.collectedEpoch, m.currentEpoch) - - next = chances.next.reclaim.load() - chances.next.reclaim.clear() - atomic.StoreInt64(&chances.reclaim, 0) - - if chances.next.primary.load() == hazardRecord { - checkpointed += m.checkpoint(ctx, chances) - m.addPrimary(chances) - } - } + // Always continue to iterate over the entire map. + return true + }) m.currentEpoch++ return checkpointed @@ -474,29 +411,11 @@ func (r *record) RecordOne(ctx context.Context, number core.Number) { } func (r *record) Unbind() { - for { - collected := atomic.LoadInt64(&r.collectedEpoch) - modified := atomic.LoadInt64(&r.modifiedEpoch) - - updated := collected + 1 - - if modified == updated { - // No change - break - } - if !atomic.CompareAndSwapInt64(&r.modifiedEpoch, modified, updated) { - continue - } - - if modified < collected { - // This record could have been reclaimed. - r.labels.meter.saveFromReclaim(r) - } - - break - } - - _ = atomic.AddInt64(&r.refcount, -1) + // Record was modified, inform the Collect() that things need to be collected. + // TODO: Reconsider if we should marked as modified when an Update happens and + // collect only when updates happened even for Bounds. + atomic.StoreInt64(&r.modified, 1) + r.refMapped.unref() } func (r *record) mapkey() mapkey {