Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent for Apache Spark on EMR #805

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,57 @@ domain_beanPropertyValue1_key1_key2_...keyN_attrName{beanpropertyName2="beanProp
```
If a given part isn't set, it'll be excluded.

## Running the MultiPort Java Agent

The reason for creation of this agent is monitoring and providing multiple webserver running on 1 machine. Origin of it
stems from running Apache Spark jobs on EMR clusters, where it's typical to have more than 1 executor running on 1 machine.

Differences compared to **Java Agent**
* different configuration string format
* it is capable of running more than 1 webserver on 1 machine

### Apache Spark example configuration

```bash
# version of the agent
AGENT_VERSION="0.18.1"

# path to javagent_multiport jar on S3
JMX_JAVA_AGENT_JAR_S3_PATH="s3://bucket/jmx_prometheus_javaagent_multiport-${AGENT_VERSION}.jar"

# path to configuration file for the agent on S3
JMX_JAVA_AGENT_CONFIG_S3_PATH="s3://bucket/jmx_exporter.yaml"

spark-submit --master yarn --deploy-mode cluster \
--jars "${JMX_JAVA_AGENT_JAR_S3_PATH}" \
--files "${JMX_JAVA_AGENT_CONFIG_S3_PATH}" \
--conf "spark.driver.extraJavaOptions=-javaagent:./jmx_prometheus_javaagent_multiport-${AGENT_VERSION}.jar=portStart=222,portEnd=333,configFile=./jmx_exporter.yaml" \
--conf "spark.executor.extraJavaOptions=-javaagent:./jmx_prometheus_javaagent_multiport-${AGENT_VERSION}.jar=portStart=222,portEnd=333,configFile=./jmx_exporter.yaml" \
```

### Configuration

With 6 parameters in total it has become rather impractical to extend the config line parsing with regex.

The configuration string has following shape `hostname=some_host.dc,portStart=222,portEnd=333,timeout=500,backoffMin=2000,backoffMax=4000,configFile=/some/path.yaml`.

| Parameter | value | optional | note |
|------------|-------------------------------------------------------------------------------|----------|---------------------------------------------------------------------------------------------------------------------|
| hostname | typically not used | true | defaults to 0.0.0.0 |
| portStart | port number - start of the range | false | |
| portEnd | port number - start of the range | false | the range must be continuous |
| timeout | timeout on how long will the agent try to connect to an supposedly empty port | true | default is 500ms |
| backoffMin | backoff before the agent will try to lookup and start on a given port | true | default is 2000ms |
| backoffMax | backoff before the agent will try to lookup and start on a given port | true | default is 4000ms |
| configFile | path to the config file | false | if used with `spark-submit --files`, it will be located on the same level as the jar. For example (`./config.yaml`) |



### Caveats

* You need to know how many JVMs will run on each node and play it safe by giving enough headroom for your application
* Some ports will be wasted and never used

## Integration Testing

The JMX exporter uses the [AntuBLUE Test Engine](https://github.com/antublue/test-engine) and [Testcontainers](https://www.testcontainers.org/) to run integration tests with different Java versions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package io.prometheus.jmx;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports;

import javax.management.MalformedObjectNameException;
import java.io.File;
import java.io.IOException;
import java.lang.instrument.Instrumentation;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Random;

public class JavaAgentMultiPort {
static HTTPServer server;
static Random RNG = new Random();
static HashMap<Integer, Integer> failedAttempts = new HashMap<Integer, Integer>();

public static void agentmain(String agentArgument, Instrumentation instrumentation) throws Exception {
premain(agentArgument, instrumentation);
}

public static void premain(String agentArgument, Instrumentation instrumentation) {
// Bind to all interfaces by default (this includes IPv6).
try {
Config config = parseConfig(agentArgument);
new BuildInfoCollector().register();
new JmxCollector(new File(config.configFile), JmxCollector.Mode.AGENT).register();
DefaultExports.initialize();
startMultipleServers(config, config.portEnd - config.portStart + 1, config.backoffMin, config.backoffMax);
} catch (Exception e) {
System.err.println("Example Usage: -javaagent:/path/to/JavaAgent.jar=hostname=some_host.dc,portStart=222,portEnd=333,[timeout=500],[backoffMin=2000],[backoffMax=4000],configFile=/some/path.yaml \n" + e.getMessage());
System.exit(1);
}
}

/**
* Checks if a connection can be made to host:port. If A connection is successful, it means there is already an agent
* running on that host:port. In this case we will look to the next port (port + 1) and repeat, until we run out of
* ports defined a range or fail to make a connection. Failure to make connection means there is no agent running yet,
* on that host:port
*
* @param host Host we are checking (typically 0.0.0.0)
* @param timeout Timeout on connection attempt
* @param port Port we are checking (automatically incremented)
* @param portLookupAttempts Limits how many look-ups we will make on this host
* @return A connection on which will an agent start
* @throws ConnectException
*/
public static InetSocketAddress findAvailableSocket(final String host, final int timeout, int port, int portLookupAttempts) throws ConnectException {
System.out.println("Looking up free port. Checking: " + port + ", remaining ports in range: " + portLookupAttempts);
if (portLookupAttempts == 0) {
System.err.println("Run out of ports to try. Agent won't start.");
return null;
}
try {
if (failedAttempts.containsKey(port) && failedAttempts.get(port) >= 3) {
/* On some occasions, the agent/webserver tried to start on one particular port and despite the port
* being free, it kept failing. This will make it step over the port.
*/
System.out.println("Skipping " + port + ", because off too many retries on it");
port += 1;
portLookupAttempts -= 1;
}
Socket socket = new Socket();
final InetAddress inetAddress = InetAddress.getByName(host);
final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
// if it will make the connection, it means something else is running on that port already
socket.connect(inetSocketAddress, timeout);
System.out.println("Port " + port + " is used. Trying next one.");
socket.close();
return findAvailableSocket(host, timeout, port + 1, portLookupAttempts - 1);
} catch (IOException e) {
// when it fails to make connection the socket, it is considered as free.
System.out.println("Found free port " + port);
return JavaAgentMultiPort.createInetSocket(host, port);
}
}

/**
* Starts number of agents/servers that provides metrics
*
* @param config config
* @param retries indicating how many times the agent will attempt to launch after a 'port collision' with another
* agent. Value is set to the same number as ports in the range.
* @throws IOException
* @throws InterruptedException
*/
public static void startMultipleServers(Config config, int retries, int minBackoff, int maxBackoff) throws IOException, InterruptedException {
int portLookupAttemnpts = config.portEnd - config.portStart + 1;
// Randomize start to prevent race conditions on startup
if (portLookupAttemnpts > 1) {
backoff(minBackoff, maxBackoff, "server start");
}
/*
* Retries indicate how many times the agent will try to start (including port search). Sometimes a port is
* deemed free, but more than 1 agent has picked that particular port and only 1 will get it. The ones who
* lose out, will be given the #<retries> to start again.
*
* portLookupAttempts is "just" for scanning the range of ports.
*/
InetSocketAddress socketUpdate = findAvailableSocket(config.host, config.timeout, config.portStart, portLookupAttemnpts);
config.setSocket(socketUpdate); //this does not update config.port
if (config.socket != null) {
try {
System.out.println("Trying to start JMX agent on " + config.socket.getPort());
server = new HTTPServer(config.socket, CollectorRegistry.defaultRegistry, true);
System.out.println("Started JMX on " + config.socket.getPort() + ". (retries left: " + retries + ")");
} catch (IOException e) {
if (retries <= 0) {
System.out.println("Ran out of retries.");
throw e;
}
System.out.println("Port has been taken before server started - retrying to start." + "(retries left: " + retries + ")");
/*
* This had to be introduced, because of competition for resources. There is 'n' number of executors/agents
* which will compete for first port in range. It can happen that port is determined to be free, but
* before server start, it will be taken by a different agent. Each agent acts independently.
*/
if (failedAttempts.containsKey(config.socket.getPort())) {
failedAttempts.put(config.socket.getPort(), failedAttempts.get(config.socket.getPort()) + 1);
} else {
failedAttempts.put(config.socket.getPort(), 1);
}
System.out.println("Start on port " + config.socket.getPort() + " failed " + failedAttempts.get(config.socket.getPort()) + "x");
startMultipleServers(config, retries - 1, minBackoff, maxBackoff);
}
} else {
throw new IOException("Cannot start server - all ports are used");
}
}

private static void backoff(int min, int max, String source) throws InterruptedException {
int backoff = RNG.nextInt(max - min + 1) + min;
System.out.println("Backing off at " + source + " for " + backoff + "ms");
Thread.sleep(backoff);
}

public static InetSocketAddress createInetSocket(String givenHost, int port) {

InetSocketAddress socket;
if (givenHost != null && !givenHost.isEmpty()) {
socket = new InetSocketAddress(givenHost, port);
} else {
socket = new InetSocketAddress("0.0.0.0", port);
}
return socket;
}

/**
* Parse the Java Agent configuration. The arguments are typically specified to the JVM as a javaagent as
* {@code -javaagent:/path/to/agent.jar=<CONFIG>}. This method parses the {@code <CONFIG>} portion.
*
* @param args provided agent args
* @return configuration to use for our application
*/
public static Config parseConfig(String args) {
String[] requiredParameters = "portStart,portEnd,configFile".split(",");

String[] argsSplits = args.split(",");
HashMap<String, String> configMap = new HashMap<String, String>();
for (String arg : argsSplits) {
String[] kv = arg.split("=");
configMap.put(kv[0], kv[1]);
}

for (String requiredParameter : requiredParameters) {
if (!configMap.containsKey(requiredParameter)) {
throw new IllegalArgumentException("Argument " + requiredParameter + " is missing");
}
}

// Defaults
if (!configMap.containsKey("timeout")) {
configMap.put("timeout", "500");
}

if (!configMap.containsKey("backoffMin")) {
configMap.put("backoffMin", "2000");
}

if (!configMap.containsKey("backoffMax")) {
configMap.put("backoffMax", "4000");
}

return new Config(configMap.get("hostname"),
Integer.parseInt(configMap.get("portStart")),
Integer.parseInt(configMap.get("portEnd")),
Integer.parseInt(configMap.get("timeout")),
Integer.parseInt(configMap.get("backoffMin")),
Integer.parseInt(configMap.get("backoffMax")),
configMap.get("configFile"),
createInetSocket(configMap.get("hostname"), Integer.parseInt(configMap.get("portStart"))));
}

static class Config {
String host;
int portStart;
int portEnd;
int timeout;
int backoffMin;
int backoffMax;
String configFile;
InetSocketAddress socket;

Config(String host, int portStart, int portEnd, int timeout, int backoffMin, int backoffMax, String configFile, InetSocketAddress socket) {
this.host = host;
this.portStart = portStart;
this.portEnd = portEnd;
this.timeout = timeout;
this.backoffMin = backoffMin;
this.backoffMax = backoffMax;
this.configFile = configFile;
this.socket = socket;
}

public void setSocket(InetSocketAddress socket) {
this.socket = socket;
}

@Override
public String toString() {
return MessageFormat.format("Address: {0}:{1}{2} configFile: {3}, timout: {4}, backoff (min: {5}, max: {6})", host, portStart, portEnd, configFile, timeout, backoffMin, backoffMax);
}
}
}
Loading