Skip to content

Commit 305577d

Browse files
authored
Merge pull request #35 from renvins/fix/database
fix: Update database service to handle better multi-threading access
2 parents 57379ef + f6b0c4a commit 305577d

File tree

1 file changed

+70
-56
lines changed

1 file changed

+70
-56
lines changed

common/src/main/java/it/renvins/serverpulse/common/DatabaseService.java

Lines changed: 70 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package it.renvins.serverpulse.common;
22

33
import java.net.URI;
4+
import java.net.URLEncoder;
45
import java.net.http.HttpClient;
56
import java.net.http.HttpRequest;
67
import java.net.http.HttpResponse;
78
import java.nio.charset.StandardCharsets;
89
import java.time.Duration;
910
import java.util.concurrent.CompletableFuture;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
import java.util.concurrent.atomic.AtomicReference;
1013

1114
import it.renvins.serverpulse.api.service.IDatabaseService;
1215
import it.renvins.serverpulse.common.config.DatabaseConfiguration;
@@ -26,19 +29,22 @@ public class DatabaseService implements IDatabaseService {
2629

2730
private HttpClient httpClient; // Keep for ping
2831

29-
private volatile Task retryTask; // volatile for visibility across threads
30-
3132
private final int MAX_RETRIES = 5;
3233
private final long RETRY_DELAY_MS = 30000L; // 30 seconds
3334

3435
// Use volatile as this is read/written by different threads
3536
private volatile boolean isConnected = false;
36-
private volatile int retryCount = 0;
37+
38+
private final AtomicInteger retryCount = new AtomicInteger(0);
39+
private final AtomicReference<Task> retryTask = new AtomicReference<>(null);
3740

3841
//HTTP API endpoints
3942
private String pingUrl;
4043
private String writeUrl;
4144

45+
// Optimization: Cache the ping request object
46+
private HttpRequest pingRequest;
47+
4248
public DatabaseService(PulseLogger logger, Platform platform, GeneralConfiguration generalConfig, TaskScheduler scheduler) {
4349
this.logger = logger;
4450
this.platform = platform;
@@ -51,6 +57,10 @@ public DatabaseService(PulseLogger logger, Platform platform, GeneralConfigurati
5157
.build();
5258
}
5359

60+
private String encode(String value) {
61+
return URLEncoder.encode(value, StandardCharsets.UTF_8);
62+
}
63+
5464
@Override
5565
public void load() {
5666
if (!checkConnectionData()) {
@@ -67,8 +77,16 @@ public void load() {
6777
}
6878

6979
this.pingUrl = baseUrl + "ping";
70-
this.writeUrl = baseUrl + "api/v2/write?org=" + configuration.getOrg() +
71-
"&bucket=" + configuration.getBucket() + "&precision=ns";
80+
81+
this.pingRequest = HttpRequest.newBuilder()
82+
.uri(URI.create(pingUrl))
83+
.GET()
84+
.timeout(Duration.ofSeconds(5))
85+
.build();
86+
87+
88+
this.writeUrl = baseUrl + "api/v2/write?org=" + encode(configuration.getOrg()) +
89+
"&bucket=" + encode(configuration.getBucket()) + "&precision=ns";
7290

7391
scheduler.runAsync(this::connect);
7492
}
@@ -125,13 +143,7 @@ public CompletableFuture<Boolean> writeLineProtocol(String lineProtocol) {
125143
@Override
126144
public boolean ping() {
127145
try {
128-
HttpRequest request = HttpRequest.newBuilder()
129-
.uri(URI.create(pingUrl))
130-
.GET()
131-
.timeout(Duration.ofSeconds(5))
132-
.build();
133-
134-
HttpResponse<Void> response = httpClient.send(request, HttpResponse.BodyHandlers.discarding());
146+
HttpResponse<Void> response = httpClient.send(this.pingRequest, HttpResponse.BodyHandlers.discarding());
135147
return response.statusCode() == 204;
136148
} catch (Exception e) {
137149
logger.error("InfluxDB ping failed", e);
@@ -152,50 +164,54 @@ public boolean isConnected() {
152164

153165

154166
@Override
155-
public synchronized void startRetryTaskIfNeeded() {
156-
if (retryTask != null && !retryTask.isCancelled()) {
167+
public void startRetryTaskIfNeeded() {
168+
// FAST EXIT: Because a task is already running
169+
if (retryTask.get() != null) {
157170
return;
158171
}
159172
if (!platform.isEnabled()) {
160-
logger.warning("Plugin disabling, not starting retry task...");
161173
return;
162174
}
163175

164-
// Reset retry count ONLY when starting the task sequence
165-
this.retryCount = 0;
166-
logger.warning("Connection failed. Starting connection retry task (Max " + MAX_RETRIES + " attempts)...");
176+
// We lock on 'retryTask' itself to prevent other threads from
177+
// starting a new task simulataneously
178+
synchronized (retryTask) {
179+
if (retryTask.get() != null) return;
167180

168-
retryTask = scheduler.runTaskTimerAsync(() -> {
169-
// Check connection status *first* using the flag
170-
if (this.isConnected) {
171-
logger.info("Connection successful, stopping retry task...");
172-
stopRetryTask();
173-
return;
174-
}
181+
retryCount.set(0);
182+
logger.warning("Connection failed. Starting connection retry task (Max " + MAX_RETRIES + " attempts)...");
175183

176-
// Check if plugin got disabled externally
177-
if (!platform.isEnabled()) {
178-
logger.warning("Plugin disabled during retry task execution...");
179-
stopRetryTask();
180-
return;
181-
}
184+
Task task = scheduler.runTaskTimerAsync(this::retryLogic, RETRY_DELAY_MS, RETRY_DELAY_MS);
185+
retryTask.set(task);
186+
}
187+
}
182188

183-
// Check retries *before* attempting connection
184-
if (retryCount >= MAX_RETRIES) {
185-
logger.error("Max connection retries (" + MAX_RETRIES + ") reached. Disabling ServerPulse metrics...");
186-
stopRetryTask();
187-
disconnect(); // Clean up any partial connection
188-
// Schedule plugin disable on main thread
189-
scheduler.runSync(platform::disable);
190-
return;
191-
}
192-
retryCount++;
189+
private void retryLogic() {
190+
// This logic runs on an async thread, so no blocking
191+
if (isConnected) {
192+
logger.info("Connection successful, stoppin retry task...");
193+
stopRetryTask();
194+
return;
195+
}
193196

194-
logger.info("Retrying InfluxDB connection... Attempt " + retryCount + "/" + MAX_RETRIES);
195-
connect(); // Note: connect() will handle setting isConnected flag and potentially stopping the task if successful.
196-
}, RETRY_DELAY_MS, RETRY_DELAY_MS); // Start after delay, repeat at delay
197-
}
197+
if (!platform.isEnabled()) {
198+
logger.warning("Plugin disabled during retry task execution...");
199+
stopRetryTask();
200+
return;
201+
}
198202

203+
if (retryCount.get() >= MAX_RETRIES) {
204+
logger.error("Max connection retries (" + MAX_RETRIES + ") reached. Disabling ServerPulse metrics...");
205+
stopRetryTask();
206+
disconnect();
207+
scheduler.runSync(platform::disable);
208+
return;
209+
}
210+
211+
retryCount.incrementAndGet();
212+
logger.info("Retrying InfluxDB connection... Attempt " + retryCount.get() + "/" + MAX_RETRIES);
213+
connect();
214+
}
199215

200216
/**
201217
* Attempts to connect to InfluxDB. Updates the internal connection status
@@ -211,7 +227,7 @@ private void connect() {
211227

212228
if (isPingSuccessful) {
213229
this.isConnected = true;
214-
this.retryCount = 0; // Reset retry count on successful connection
230+
this.retryCount.set(0);; // Reset retry count on successful connection
215231

216232
stopRetryTask(); // Stop retrying if we just connected
217233
logger.info("Successfully connected to InfluxDB and ping successful...");
@@ -227,17 +243,15 @@ private void connect() {
227243
}
228244
}
229245

230-
/** Stops and nullifies the retry task if it's running. */
231-
private synchronized void stopRetryTask() {
232-
if (retryTask != null) {
233-
if (!retryTask.isCancelled()) {
234-
try {
235-
retryTask.cancel();
236-
} catch (Exception e) {
237-
// Ignore potential errors during cancellation
238-
}
246+
/* Stops and nullifies the retry task if it's running. */
247+
private void stopRetryTask() {
248+
Task task = retryTask.getAndSet(null);
249+
if (task != null) {
250+
try {
251+
task.cancel();
252+
} catch (Exception e) {
253+
// Ignore potential errors during cancellation
239254
}
240-
retryTask = null;
241255
}
242256
}
243257

0 commit comments

Comments
 (0)