forked from google/cadvisor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchainsample.go
171 lines (146 loc) · 4.96 KB
/
chainsample.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Copyright 2014 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sampling
import (
"log"
"math/rand"
"sync"
"github.com/kr/pretty"
)
type empty struct{}
// Randomly generate number [start,end) except @except.
func randInt64Except(start, end int64, except map[int64]empty) int64 {
n := end - start
ret := rand.Int63n(n) + start
for _, ok := except[ret]; ok; _, ok = except[ret] {
ret = rand.Int63n(n) + start
}
return ret
}
// Basic idea:
// Every observation will have a sequence number as its id.
// Suppose we want to sample k observations within latest n observations
// At first, we generated k random numbers in [0,n). These random numbers
// will be used as ids of observations that will be sampled.
type chainSampler struct {
sampleSize int
windowSize int64
// Every observation will have a sequence number starting from 1.
// The sequence number must increase by one for each observation.
numObservations int64
// All samples stored as id -> value.
samples map[int64]interface{}
// The set of id of future observations.
futureSamples map[int64]empty
// The chain of samples: old observation id -> future observation id.
// When the old observation expires, the future observation will be
// stored as a sample.
sampleChain map[int64]int64
// Replacements are: observations whose previous sample is not expired
// id->value.
replacements map[int64]interface{}
lock sync.RWMutex
}
func (self *chainSampler) initFutureSamples() {
for i := 0; i < self.sampleSize; i++ {
n := randInt64Except(1, self.windowSize+1, self.futureSamples)
self.futureSamples[n] = empty{}
}
}
func (self *chainSampler) arrive(seqNum int64, obv interface{}) {
if _, ok := self.futureSamples[seqNum]; !ok {
// If this observation is not selected, ignore it.
return
}
delete(self.futureSamples, seqNum)
if len(self.samples) < self.sampleSize {
self.samples[seqNum] = obv
}
self.replacements[seqNum] = obv
// Select a future observation which will replace current observation
// when it expires.
futureSeqNum := randInt64Except(seqNum+1, seqNum+self.windowSize+1, self.futureSamples)
self.futureSamples[futureSeqNum] = empty{}
self.sampleChain[seqNum] = futureSeqNum
}
func (self *chainSampler) expireAndReplace() {
expSeqNum := self.numObservations - self.windowSize
if _, ok := self.samples[expSeqNum]; !ok {
// No sample expires
return
}
delete(self.samples, expSeqNum)
// There must be a replacement, otherwise panic.
replacementSeqNum := self.sampleChain[expSeqNum]
// The sequence number must increase by one for each observation.
replacement, ok := self.replacements[replacementSeqNum]
if !ok {
log.Printf("cannot find %v. which is the replacement of %v\n", replacementSeqNum, expSeqNum)
pretty.Printf("chain: %# v\n", self)
panic("Should never occur!")
}
// This observation must have arrived before.
self.samples[replacementSeqNum] = replacement
}
func (self *chainSampler) Update(obv interface{}) {
self.lock.Lock()
defer self.lock.Unlock()
self.numObservations++
self.arrive(self.numObservations, obv)
self.expireAndReplace()
}
func (self *chainSampler) Len() int {
self.lock.RLock()
defer self.lock.RUnlock()
return len(self.samples)
}
func (self *chainSampler) Reset() {
self.lock.Lock()
defer self.lock.Unlock()
self.numObservations = 0
self.samples = make(map[int64]interface{}, self.sampleSize)
self.futureSamples = make(map[int64]empty, self.sampleSize*2)
self.sampleChain = make(map[int64]int64, self.sampleSize*2)
self.replacements = make(map[int64]interface{}, self.sampleSize*2)
self.initFutureSamples()
}
func (self *chainSampler) Map(f func(d interface{})) {
self.lock.RLock()
defer self.lock.RUnlock()
for seqNum, obv := range self.samples {
if _, ok := obv.(int); !ok {
pretty.Printf("Seq %v. WAT: %# v\n", seqNum, obv)
}
f(obv)
}
}
// NOT SUPPORTED
func (self *chainSampler) Filter(filter func(d interface{}) bool) {
return
}
// Chain sampler described in
// Brian Babcok, Mayur Datar and Rajeev Motwani,
// Sampling From a Moving Window Over Streaming Data
func NewChainSampler(sampleSize, windowSize int) Sampler {
sampler := &chainSampler{
sampleSize: sampleSize,
windowSize: int64(windowSize),
samples: make(map[int64]interface{}, sampleSize),
futureSamples: make(map[int64]empty, sampleSize*2),
sampleChain: make(map[int64]int64, sampleSize*2),
replacements: make(map[int64]interface{}, sampleSize*2),
}
sampler.initFutureSamples()
return sampler
}