Skip to content

Commit

Permalink
Merge pull request #169 from DataDog/landerson/named-pipes
Browse files Browse the repository at this point in the history
Add named pipe support for windows
  • Loading branch information
randomanderson authored Dec 16, 2021
2 parents 56420ad + 7ba4731 commit 455f4a5
Show file tree
Hide file tree
Showing 21 changed files with 535 additions and 254 deletions.
12 changes: 12 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
version: 2.1

orbs:
win: circleci/windows@2.4.1

commands:
create_custom_cache_lock:
description: "Create custom cache lock for java version."
Expand Down Expand Up @@ -45,6 +48,14 @@ jobs:
docker:
- image: circleci/openjdk:14.0.2-buster
<<: *default_steps
windows-openjdk12:
executor: win/default
steps:
- checkout
- run: |
choco install maven
- run: |
mvn clean install
workflows:
version: 2
Expand All @@ -55,3 +66,4 @@ workflows:
- openjdk9
- openjdk11
- openjdk13
- windows-openjdk12
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@
<scope>test</scope>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
<scope>test</scope>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-unixsocket</artifactId>
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/timgroup/statsd/ClientChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.timgroup.statsd;

import java.nio.channels.WritableByteChannel;

public interface ClientChannel extends WritableByteChannel {
String getTransportType();
}
55 changes: 55 additions & 0 deletions src/main/java/com/timgroup/statsd/DatagramClientChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.timgroup.statsd;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

public class DatagramClientChannel implements ClientChannel {
protected final DatagramChannel delegate;
private final SocketAddress address;

/**
* Creates a new DatagramClientChannel using the default DatagramChannel.
* @param address Address to connect the channel to
* @throws IOException if an I/O error occurs
*/
public DatagramClientChannel(SocketAddress address) throws IOException {
this(DatagramChannel.open(), address);
}

/**
* Creates a new DatagramClientChannel that wraps the delegate.
* @param delegate Implementation this instance wraps
* @param address Address to connect the channel to
*/
public DatagramClientChannel(DatagramChannel delegate, SocketAddress address) {
this.delegate = delegate;
this.address = address;
}

@Override
public boolean isOpen() {
return delegate.isOpen();
}

@Override
public int write(ByteBuffer src) throws IOException {
return delegate.send(src, address);
}

@Override
public void close() throws IOException {
delegate.close();
}

@Override
public String getTransportType() {
return "udp";
}

@Override
public String toString() {
return "[" + getTransportType() + "] " + address;
}
}
51 changes: 51 additions & 0 deletions src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.timgroup.statsd;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NamedPipeClientChannel implements ClientChannel {
private final RandomAccessFile randomAccessFile;
private final FileChannel fileChannel;
private final String pipe;

/**
* Creates a new NamedPipeClientChannel with the given address.
*
* @param address Location of named pipe
* @throws FileNotFoundException if pipe does not exist
*/
public NamedPipeClientChannel(NamedPipeSocketAddress address) throws FileNotFoundException {
pipe = address.getPipe();
randomAccessFile = new RandomAccessFile(pipe, "rw");
fileChannel = randomAccessFile.getChannel();
}

@Override
public boolean isOpen() {
return fileChannel.isOpen();
}

@Override
public int write(ByteBuffer src) throws IOException {
return fileChannel.write(src);
}

@Override
public void close() throws IOException {
// closing the file also closes the channel
randomAccessFile.close();
}

@Override
public String getTransportType() {
return "namedpipe";
}

@Override
public String toString() {
return pipe;
}
}
27 changes: 27 additions & 0 deletions src/main/java/com/timgroup/statsd/NamedPipeSocketAddress.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.timgroup.statsd;

import java.net.SocketAddress;

public class NamedPipeSocketAddress extends SocketAddress {
private static final String NAMED_PIPE_PREFIX = "\\\\.\\pipe\\";
private final String pipe;

public NamedPipeSocketAddress(String pipeName) {
this.pipe = normalizePipeName(pipeName);
}

public String getPipe() {
return pipe;
}

/**
* A normalized version of the pipe name that includes the `\\.\pipe\` prefix
*/
static String normalizePipeName(String pipeName) {
if (pipeName.startsWith(NAMED_PIPE_PREFIX)) {
return pipeName;
} else {
return NAMED_PIPE_PREFIX + pipeName;
}
}
}
88 changes: 37 additions & 51 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package com.timgroup.statsd;

import jnr.unixsocket.UnixDatagramChannel;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketOptions;

import java.io.IOException;
import java.lang.Double;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
Expand All @@ -23,7 +21,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;


/**
* A simple StatsD client implementation facilitating metrics recording.
*
Expand Down Expand Up @@ -56,6 +53,7 @@ public class NonBlockingStatsDClient implements StatsDClient {

public static final String DD_DOGSTATSD_PORT_ENV_VAR = "DD_DOGSTATSD_PORT";
public static final String DD_AGENT_HOST_ENV_VAR = "DD_AGENT_HOST";
public static final String DD_NAMED_PIPE_ENV_VAR = "DD_DOGSTATSD_PIPE_NAME";
public static final String DD_ENTITY_ID_ENV_VAR = "DD_ENTITY_ID";
private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ;

Expand Down Expand Up @@ -100,7 +98,7 @@ String tag() {
/**
* UTF-8 is the expected encoding for data sent to the agent.
*/
public static final Charset UTF_8 = Charset.forName("UTF-8");
public static final Charset UTF_8 = StandardCharsets.UTF_8;

private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() {
@Override public void handle(final Exception ex) { /* No-op */ }
Expand Down Expand Up @@ -154,7 +152,8 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
}

private final String prefix;
private final DatagramChannel clientChannel;
private final ClientChannel clientChannel;
private final ClientChannel telemetryClientChannel;
private final StatsDClientErrorHandler handler;
private final String constantTagsRendered;

Expand Down Expand Up @@ -261,58 +260,27 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
costantPreTags = null;
}

String transportType = "";
try {
final SocketAddress address = addressLookup.call();
if (address instanceof UnixSocketAddress) {
clientChannel = UnixDatagramChannel.open();
// Set send timeout, to handle the case where the transmission buffer is full
// If no timeout is set, the send becomes blocking
if (timeout > 0) {
clientChannel.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout);
}
if (bufferSize > 0) {
clientChannel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize);
}
transportType = "uds";
} else {
clientChannel = DatagramChannel.open();
transportType = "udp";
}
clientChannel = createByteChannel(addressLookup, timeout, bufferSize);

ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory();

statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory);
telemetryStatsDProcessor = statsDProcessor;

Properties properties = new Properties();
properties.load(getClass().getClassLoader().getResourceAsStream(
"dogstatsd/version.properties"));

String telemetryTags = tagString(new String[]{CLIENT_TRANSPORT_TAG + transportType,
String telemetryTags = tagString(new String[]{CLIENT_TRANSPORT_TAG + clientChannel.getTransportType(),
CLIENT_VERSION_TAG + properties.getProperty("dogstatsd_client_version"),
CLIENT_TAG}, new StringBuilder()).toString();

DatagramChannel telemetryClientChannel = clientChannel;
if (addressLookup != telemetryAddressLookup) {

final SocketAddress telemetryAddress = telemetryAddressLookup.call();
if (telemetryAddress instanceof UnixSocketAddress) {
telemetryClientChannel = UnixDatagramChannel.open();
// Set send timeout, to handle the case where the transmission buffer is full
// If no timeout is set, the send becomes blocking
if (timeout > 0) {
telemetryClientChannel.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout);
}
if (bufferSize > 0) {
telemetryClientChannel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize);
}
} else if (transportType == "uds") {
// UDP clientChannel can submit to multiple addresses, we only need
// a new channel if transport type is UDS for main traffic.
telemetryClientChannel = DatagramChannel.open();
}
if (addressLookup == telemetryAddressLookup) {
telemetryClientChannel = clientChannel;
telemetryStatsDProcessor = statsDProcessor;
} else {
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, bufferSize);

// similar settings, but a single worker and non-blocking.
telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
Expand All @@ -324,16 +292,15 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
.processor(telemetryStatsDProcessor)
.build();

statsDSender = createSender(addressLookup, handler, clientChannel, statsDProcessor.getBufferPool(),
statsDSender = createSender(handler, clientChannel, statsDProcessor.getBufferPool(),
statsDProcessor.getOutboundQueue(), senderWorkers, threadFactory);

telemetryStatsDSender = statsDSender;
if (telemetryStatsDProcessor != statsDProcessor) {
// TODO: figure out why the hell telemetryClientChannel does not work here!
telemetryStatsDSender = createSender(telemetryAddressLookup, handler, telemetryClientChannel,
telemetryStatsDSender = createSender(handler, telemetryClientChannel,
telemetryStatsDProcessor.getBufferPool(), telemetryStatsDProcessor.getOutboundQueue(),
1, threadFactory);

}

// set telemetry
Expand Down Expand Up @@ -389,10 +356,10 @@ protected StatsDProcessor createProcessor(final int queueSize, final StatsDClien
}
}

protected StatsDSender createSender(final Callable<SocketAddress> addressLookup, final StatsDClientErrorHandler handler,
final DatagramChannel clientChannel, BufferPool pool, BlockingQueue<ByteBuffer> buffers, final int senderWorkers,
protected StatsDSender createSender(final StatsDClientErrorHandler handler,
final WritableByteChannel clientChannel, BufferPool pool, BlockingQueue<ByteBuffer> buffers, final int senderWorkers,
final ThreadFactory threadFactory) throws Exception {
return new StatsDSender(addressLookup, clientChannel, handler, pool, buffers, senderWorkers, threadFactory);
return new StatsDSender(clientChannel, handler, pool, buffers, senderWorkers, threadFactory);
}

/**
Expand Down Expand Up @@ -427,6 +394,14 @@ public void stop() {
handler.handle(e);
}
}

if (telemetryClientChannel != null && telemetryClientChannel != clientChannel) {
try {
telemetryClientChannel.close();
} catch (final IOException e) {
handler.handle(e);
}
}
}
}

Expand Down Expand Up @@ -469,6 +444,17 @@ StringBuilder tagString(final String[] tags, StringBuilder builder) {
return tagString(tags, constantTagsRendered, builder);
}

ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeout, int bufferSize) throws Exception {
final SocketAddress address = addressLookup.call();
if (address instanceof NamedPipeSocketAddress) {
return new NamedPipeClientChannel((NamedPipeSocketAddress) address);
} else if (address instanceof UnixSocketAddress) {
return new UnixDatagramClientChannel(address, timeout, bufferSize);
} else {
return new DatagramClientChannel(address);
}
}

abstract class StatsDMessage<T extends Number> extends NumericMessage<T> {
final double sampleRate; // NaN for none

Expand Down
Loading

0 comments on commit 455f4a5

Please sign in to comment.