Skip to content

Commit 4840174

Browse files
committed
format
Signed-off-by: Gregor Zeitlinger <gregor.zeitlinger@grafana.com>
1 parent ba6a137 commit 4840174

File tree

1 file changed

+90
-89
lines changed
  • prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics

1 file changed

+90
-89
lines changed
Lines changed: 90 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.prometheus.metrics.core.metrics;
22

33
import io.prometheus.metrics.model.snapshots.DataPointSnapshot;
4-
54
import java.util.Arrays;
65
import java.util.concurrent.atomic.AtomicLong;
76
import java.util.concurrent.locks.Condition;
@@ -12,102 +11,104 @@
1211

1312
/**
1413
* Metrics support concurrent write and scrape operations.
15-
* <p>
16-
* This is implemented by switching to a Buffer when the scrape starts,
17-
* and applying the values from the buffer after the scrape ends.
14+
*
15+
* <p>This is implemented by switching to a Buffer when the scrape starts, and applying the values
16+
* from the buffer after the scrape ends.
1817
*/
1918
class Buffer {
2019

21-
private static final long bufferActiveBit = 1L << 63;
22-
private final AtomicLong observationCount = new AtomicLong(0);
23-
private double[] observationBuffer = new double[0];
24-
private int bufferPos = 0;
25-
private boolean reset = false;
26-
27-
ReentrantLock appendLock = new ReentrantLock();
28-
ReentrantLock runLock = new ReentrantLock();
29-
Condition bufferFilled = appendLock.newCondition();
30-
31-
boolean append(double value) {
32-
long count = observationCount.incrementAndGet();
33-
if ((count & bufferActiveBit) == 0) {
34-
return false; // sign bit not set -> buffer not active.
20+
private static final long bufferActiveBit = 1L << 63;
21+
private final AtomicLong observationCount = new AtomicLong(0);
22+
private double[] observationBuffer = new double[0];
23+
private int bufferPos = 0;
24+
private boolean reset = false;
25+
26+
ReentrantLock appendLock = new ReentrantLock();
27+
ReentrantLock runLock = new ReentrantLock();
28+
Condition bufferFilled = appendLock.newCondition();
29+
30+
boolean append(double value) {
31+
long count = observationCount.incrementAndGet();
32+
if ((count & bufferActiveBit) == 0) {
33+
return false; // sign bit not set -> buffer not active.
34+
} else {
35+
doAppend(value);
36+
return true;
37+
}
38+
}
39+
40+
private void doAppend(double amount) {
41+
try {
42+
appendLock.lock();
43+
if (bufferPos >= observationBuffer.length) {
44+
observationBuffer = Arrays.copyOf(observationBuffer, observationBuffer.length + 128);
45+
}
46+
observationBuffer[bufferPos] = amount;
47+
bufferPos++;
48+
49+
bufferFilled.signalAll();
50+
} finally {
51+
appendLock.unlock();
52+
}
53+
}
54+
55+
/** Must be called by the runnable in the run() method. */
56+
void reset() {
57+
reset = true;
58+
}
59+
60+
<T extends DataPointSnapshot> T run(
61+
Function<Long, Boolean> complete,
62+
Supplier<T> createResult,
63+
Consumer<Double> observeFunction) {
64+
double[] buffer;
65+
int bufferSize;
66+
T result;
67+
try {
68+
runLock.lock();
69+
70+
// Signal that the buffer is active.
71+
Long expectedCount = observationCount.getAndAdd(bufferActiveBit);
72+
try {
73+
appendLock.lock();
74+
75+
while (!complete.apply(expectedCount)) {
76+
// Wait until all in-flight threads have added their observations to the buffer.
77+
bufferFilled.await();
78+
}
79+
result = createResult.get();
80+
81+
// Signal that the buffer is inactive.
82+
int expectedBufferSize;
83+
if (reset) {
84+
expectedBufferSize =
85+
(int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount);
86+
reset = false;
3587
} else {
36-
doAppend(value);
37-
return true;
88+
expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount);
3889
}
39-
}
4090

41-
private void doAppend(double amount) {
42-
try {
43-
appendLock.lock();
44-
if (bufferPos >= observationBuffer.length) {
45-
observationBuffer = Arrays.copyOf(observationBuffer, observationBuffer.length + 128);
46-
}
47-
observationBuffer[bufferPos] = amount;
48-
bufferPos++;
49-
50-
bufferFilled.signalAll();
51-
} finally {
52-
appendLock.unlock();
91+
while (bufferPos < expectedBufferSize) {
92+
// Wait until all in-flight threads have added their observations to the buffer.
93+
bufferFilled.await();
5394
}
95+
} finally {
96+
appendLock.unlock();
97+
}
98+
99+
buffer = observationBuffer;
100+
bufferSize = bufferPos;
101+
observationBuffer = new double[0];
102+
bufferPos = 0;
103+
} catch (InterruptedException e) {
104+
throw new RuntimeException(e);
105+
} finally {
106+
runLock.unlock();
54107
}
55108

56-
/**
57-
* Must be called by the runnable in the run() method.
58-
*/
59-
void reset() {
60-
reset = true;
61-
}
62-
63-
<T extends DataPointSnapshot> T run(Function<Long, Boolean> complete, Supplier<T> createResult, Consumer<Double> observeFunction) {
64-
double[] buffer;
65-
int bufferSize;
66-
T result;
67-
try {
68-
runLock.lock();
69-
70-
// Signal that the buffer is active.
71-
Long expectedCount = observationCount.getAndAdd(bufferActiveBit);
72-
try {
73-
appendLock.lock();
74-
75-
while (!complete.apply(expectedCount)) {
76-
// Wait until all in-flight threads have added their observations to the buffer.
77-
bufferFilled.await();
78-
}
79-
result = createResult.get();
80-
81-
// Signal that the buffer is inactive.
82-
int expectedBufferSize;
83-
if (reset) {
84-
expectedBufferSize = (int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount);
85-
reset = false;
86-
} else {
87-
expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount);
88-
}
89-
90-
while (bufferPos < expectedBufferSize) {
91-
// Wait until all in-flight threads have added their observations to the buffer.
92-
bufferFilled.await();
93-
}
94-
} finally {
95-
appendLock.unlock();
96-
}
97-
98-
buffer = observationBuffer;
99-
bufferSize = bufferPos;
100-
observationBuffer = new double[0];
101-
bufferPos = 0;
102-
} catch (InterruptedException e) {
103-
throw new RuntimeException(e);
104-
} finally {
105-
runLock.unlock();
106-
}
107-
108-
for (int i = 0; i < bufferSize; i++) {
109-
observeFunction.accept(buffer[i]);
110-
}
111-
return result;
109+
for (int i = 0; i < bufferSize; i++) {
110+
observeFunction.accept(buffer[i]);
112111
}
112+
return result;
113+
}
113114
}

0 commit comments

Comments
 (0)