Skip to content

Commit ba6a137

Browse files
committed
fix buffer sync logic using modern concurrency primitives
Signed-off-by: Gregor Zeitlinger <gregor.zeitlinger@grafana.com>
1 parent 1316484 commit ba6a137

File tree

1 file changed

+94
-63
lines changed
  • prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics

1 file changed

+94
-63
lines changed
Lines changed: 94 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,113 @@
11
package io.prometheus.metrics.core.metrics;
22

33
import io.prometheus.metrics.model.snapshots.DataPointSnapshot;
4+
45
import java.util.Arrays;
56
import java.util.concurrent.atomic.AtomicLong;
7+
import java.util.concurrent.locks.Condition;
8+
import java.util.concurrent.locks.ReentrantLock;
69
import java.util.function.Consumer;
710
import java.util.function.Function;
811
import java.util.function.Supplier;
912

1013
/**
1114
* Metrics support concurrent write and scrape operations.
12-
*
13-
* <p>This is implemented by switching to a Buffer when the scrape starts, and applying the values
14-
* from the buffer after the scrape ends.
15+
* <p>
16+
* This is implemented by switching to a Buffer when the scrape starts,
17+
* and applying the values from the buffer after the scrape ends.
1518
*/
1619
class Buffer {
1720

18-
private static final long signBit = 1L << 63;
19-
private final AtomicLong observationCount = new AtomicLong(0);
20-
private double[] observationBuffer = new double[0];
21-
private int bufferPos = 0;
22-
private boolean reset = false;
23-
private final Object appendLock = new Object();
24-
private final Object runLock = new Object();
25-
26-
boolean append(double value) {
27-
long count = observationCount.incrementAndGet();
28-
if ((count & signBit) == 0) {
29-
return false; // sign bit not set -> buffer not active.
30-
} else {
31-
doAppend(value);
32-
return true;
21+
private static final long bufferActiveBit = 1L << 63;
22+
private final AtomicLong observationCount = new AtomicLong(0);
23+
private double[] observationBuffer = new double[0];
24+
private int bufferPos = 0;
25+
private boolean reset = false;
26+
27+
ReentrantLock appendLock = new ReentrantLock();
28+
ReentrantLock runLock = new ReentrantLock();
29+
Condition bufferFilled = appendLock.newCondition();
30+
31+
boolean append(double value) {
32+
long count = observationCount.incrementAndGet();
33+
if ((count & bufferActiveBit) == 0) {
34+
return false; // sign bit not set -> buffer not active.
35+
} else {
36+
doAppend(value);
37+
return true;
38+
}
3339
}
34-
}
35-
36-
private void doAppend(double amount) {
37-
synchronized (appendLock) {
38-
if (bufferPos >= observationBuffer.length) {
39-
observationBuffer = Arrays.copyOf(observationBuffer, observationBuffer.length + 128);
40-
}
41-
observationBuffer[bufferPos] = amount;
42-
bufferPos++;
40+
41+
private void doAppend(double amount) {
42+
try {
43+
appendLock.lock();
44+
if (bufferPos >= observationBuffer.length) {
45+
observationBuffer = Arrays.copyOf(observationBuffer, observationBuffer.length + 128);
46+
}
47+
observationBuffer[bufferPos] = amount;
48+
bufferPos++;
49+
50+
bufferFilled.signalAll();
51+
} finally {
52+
appendLock.unlock();
53+
}
4354
}
44-
}
45-
46-
/** Must be called by the runnable in the run() method. */
47-
void reset() {
48-
reset = true;
49-
}
50-
51-
<T extends DataPointSnapshot> T run(
52-
Function<Long, Boolean> complete, Supplier<T> runnable, Consumer<Double> observeFunction) {
53-
double[] buffer;
54-
int bufferSize;
55-
T result;
56-
synchronized (runLock) {
57-
Long count = observationCount.getAndAdd(signBit);
58-
while (!complete.apply(count)) {
59-
Thread.yield();
60-
}
61-
result = runnable.get();
62-
int expectedBufferSize;
63-
if (reset) {
64-
expectedBufferSize = (int) ((observationCount.getAndSet(0) & ~signBit) - count);
65-
reset = false;
66-
} else {
67-
expectedBufferSize = (int) (observationCount.addAndGet(signBit) - count);
68-
}
69-
while (bufferPos != expectedBufferSize) {
70-
Thread.yield();
71-
}
72-
buffer = observationBuffer;
73-
bufferSize = bufferPos;
74-
observationBuffer = new double[0];
75-
bufferPos = 0;
55+
56+
/**
57+
* Must be called by the runnable in the run() method.
58+
*/
59+
void reset() {
60+
reset = true;
7661
}
77-
for (int i = 0; i < bufferSize; i++) {
78-
observeFunction.accept(buffer[i]);
62+
63+
<T extends DataPointSnapshot> T run(Function<Long, Boolean> complete, Supplier<T> createResult, Consumer<Double> observeFunction) {
64+
double[] buffer;
65+
int bufferSize;
66+
T result;
67+
try {
68+
runLock.lock();
69+
70+
// Signal that the buffer is active.
71+
Long expectedCount = observationCount.getAndAdd(bufferActiveBit);
72+
try {
73+
appendLock.lock();
74+
75+
while (!complete.apply(expectedCount)) {
76+
// Wait until all in-flight threads have added their observations to the buffer.
77+
bufferFilled.await();
78+
}
79+
result = createResult.get();
80+
81+
// Signal that the buffer is inactive.
82+
int expectedBufferSize;
83+
if (reset) {
84+
expectedBufferSize = (int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount);
85+
reset = false;
86+
} else {
87+
expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount);
88+
}
89+
90+
while (bufferPos < expectedBufferSize) {
91+
// Wait until all in-flight threads have added their observations to the buffer.
92+
bufferFilled.await();
93+
}
94+
} finally {
95+
appendLock.unlock();
96+
}
97+
98+
buffer = observationBuffer;
99+
bufferSize = bufferPos;
100+
observationBuffer = new double[0];
101+
bufferPos = 0;
102+
} catch (InterruptedException e) {
103+
throw new RuntimeException(e);
104+
} finally {
105+
runLock.unlock();
106+
}
107+
108+
for (int i = 0; i < bufferSize; i++) {
109+
observeFunction.accept(buffer[i]);
110+
}
111+
return result;
79112
}
80-
return result;
81-
}
82113
}

0 commit comments

Comments
 (0)