Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi-instance synchronized start-up #474

Merged
merged 10 commits into from
Feb 8, 2023
Prev Previous commit
Next Next commit
Start Prometheus server after instance sync
  • Loading branch information
acogoluegnes committed Feb 7, 2023
commit 28855690b638c0f773dd1f88f02024119c4c57c6
7 changes: 6 additions & 1 deletion src/main/java/com/rabbitmq/perf/CompositeMetrics.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2018-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -62,6 +62,11 @@ public void configure(ConfigurationContext context) throws Exception {
}
}

@Override
public void start() {
metrics.forEach(Metrics::start);
}

@Override
public boolean isEnabled(CommandLineProxy cmd) {
for (Metrics metric : metrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -68,6 +71,11 @@ static String processConfigurationFile(InputStream configuration, String namespa
return builder.toString().replace("${namespace}", namespace);
}

@Override
public void addPostSyncListener(Runnable listener) {
this.delegate.addPostSyncListener(listener);
}

@Override
public void synchronize() throws Exception {
this.delegate.synchronize();
Expand All @@ -88,6 +96,7 @@ private static class JGroupsInstanceSynchronization implements InstanceSynchroni
private final int expectedInstances;
private final Duration timeout;
private final JChannel channel;
private Set<Runnable> listeners = Collections.synchronizedSet(new LinkedHashSet<>());

private JGroupsInstanceSynchronization(String id, int expectedInstances, String namespace,
Duration timeout) {
Expand All @@ -112,6 +121,11 @@ private JGroupsInstanceSynchronization(String id, int expectedInstances, String
}
}

@Override
public void addPostSyncListener(Runnable listener) {
listeners.add(listener);
}

@Override
public void synchronize() throws Exception {
long start = System.nanoTime();
Expand Down Expand Up @@ -188,6 +202,14 @@ public void setState(InputStream input) throws Exception {
} catch (Exception e) {
LOGGER.info("Error while closing JGroups channel: {}", e.getMessage());
}

for (Runnable listener : listeners) {
try {
listener.run();
} catch (Exception e) {
LOGGER.info("Error while running instance synchronization listener: {}", e.getMessage());
}
}
}

}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/rabbitmq/perf/InstanceSynchronization.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ interface InstanceSynchronization {

void synchronize() throws Exception;

default void addPostSyncListener(Runnable listener) {
listener.run();
}

}
5 changes: 4 additions & 1 deletion src/main/java/com/rabbitmq/perf/Metrics.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2018-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand Down Expand Up @@ -39,6 +39,9 @@ default boolean isEnabled(CommandLineProxy cmd) {
return false;
}

default void start() {
}

default void close() throws Exception { }

class ConfigurationContext {
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/com/rabbitmq/perf/MulticastSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -550,13 +550,15 @@ private void shutdown(List<Connection> configurationConnections, Connection[] co
if (Thread.interrupted()) {
return;
}
boolean cancelled = producerState.task.cancel(true);
LOGGER.debug("Producer has been correctly cancelled: {}", cancelled);
if (producerState.task != null) {
boolean cancelled = producerState.task.cancel(true);
LOGGER.debug("Producer has been correctly cancelled: {}", cancelled);
}
}

// we do our best to stop producers before closing their connections
for (AgentState producerState : producerStates) {
if (!producerState.task.isDone()) {
if (producerState.task != null && !producerState.task.isDone()) {
try {
if (Thread.interrupted()) {
return;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/rabbitmq/perf/PerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ public static void main(String [] args, PerfTestOptions perfTestOptions) {
InstanceSynchronization instanceSynchronization = new DefaultInstanceSynchronization(
testID, expectedInstances, instanceSyncNamespace, Duration.ofSeconds(instanceSyncTimeout)
);
instanceSynchronization.addPostSyncListener(() -> metrics.start());

MulticastSet set = new MulticastSet(performanceMetrics, factory, p, testID, uris, completionHandler,
shutdownService, expectedMetrics, instanceSynchronization);
Expand Down
53 changes: 38 additions & 15 deletions src/main/java/com/rabbitmq/perf/PrometheusMetrics.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2018-2022 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2018-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand All @@ -19,9 +19,11 @@
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

Expand All @@ -34,6 +36,8 @@
public class PrometheusMetrics implements Metrics {

private volatile HttpServer server;
private volatile Runnable serverStart = () -> { };
private volatile Runnable serverStop = () -> { };

private volatile PrometheusMeterRegistry registry;

Expand All @@ -53,25 +57,44 @@ public void configure(ConfigurationContext context) throws Exception {
registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
meterRegistry.add(registry);
int prometheusHttpEndpointPort = intArg(cmd, "mpp", 8080);
String prometheusHttpEndpoint = strArg(cmd, "mpe", "metrics");
prometheusHttpEndpoint = prometheusHttpEndpoint.startsWith("/") ? prometheusHttpEndpoint : "/" + prometheusHttpEndpoint;
server = HttpServer.create(new InetSocketAddress(prometheusHttpEndpointPort), 0);
server.createContext(prometheusHttpEndpoint, exchange -> {
exchange.getResponseHeaders().set("Content-Type", "text/plain");
byte[] content = registry.scrape().getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(200, content.length);
try (OutputStream out = exchange.getResponseBody()) {
out.write(content);
String endpoint = strArg(cmd, "mpe", "metrics");
String prometheusHttpEndpoint = endpoint.startsWith("/") ? endpoint : "/" + endpoint;
AtomicBoolean serverStarted = new AtomicBoolean(false);
this.serverStart = () -> {
try {
server = HttpServer.create(new InetSocketAddress(prometheusHttpEndpointPort), 0);
} catch (IOException e) {
throw new PerfTestException("Error while starting Prometheus HTTP server", e);
}
});
server.start();
server.createContext(prometheusHttpEndpoint, exchange -> {
exchange.getResponseHeaders().set("Content-Type", "text/plain");
byte[] content = registry.scrape().getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(200, content.length);
try (OutputStream out = exchange.getResponseBody()) {
out.write(content);
}
});
server.start();
serverStarted.set(true);
};
this.serverStop = () -> {
if (serverStarted.compareAndSet(true, false)) {
server.stop(0);
}
};
} else {
this.serverStart = () -> { };
this.serverStop = () -> { };
}
}

@Override
public void start() {
this.serverStart.run();
}

public void close() throws Exception {
if (server != null) {
server.stop(0);
}
this.serverStop.run();
if (registry != null) {
registry.close();
}
Expand Down