Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add named pipe support for windows #169

Merged
merged 20 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
version: 2.1

orbs:
win: circleci/windows@2.4.1

commands:
create_custom_cache_lock:
description: "Create custom cache lock for java version."
Expand Down Expand Up @@ -45,6 +48,14 @@ jobs:
docker:
- image: circleci/openjdk:14.0.2-buster
<<: *default_steps
windows-openjdk12:
executor: win/default
steps:
- checkout
- run: |
choco install maven
- run: |
mvn clean install

workflows:
version: 2
Expand All @@ -55,3 +66,4 @@ workflows:
- openjdk9
- openjdk11
- openjdk13
- windows-openjdk12
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@
<scope>test</scope>
<version>1.18.0</version>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId>
<scope>test</scope>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-unixsocket</artifactId>
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/timgroup/statsd/ClientChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.timgroup.statsd;

import java.nio.channels.WritableByteChannel;

public interface ClientChannel extends WritableByteChannel {
String getTransportType();
}
55 changes: 55 additions & 0 deletions src/main/java/com/timgroup/statsd/DatagramClientChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.timgroup.statsd;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;

public class DatagramClientChannel implements ClientChannel {
protected final DatagramChannel delegate;
private final SocketAddress address;

/**
* Creates a new DatagramClientChannel using the default DatagramChannel.
* @param address Address to connect the channel to
* @throws IOException if an I/O error occurs
*/
public DatagramClientChannel(SocketAddress address) throws IOException {
this(DatagramChannel.open(), address);
}

/**
* Creates a new DatagramClientChannel that wraps the delegate.
* @param delegate Implementation this instance wraps
* @param address Address to connect the channel to
*/
public DatagramClientChannel(DatagramChannel delegate, SocketAddress address) {
this.delegate = delegate;
this.address = address;
}

@Override
public boolean isOpen() {
return delegate.isOpen();
}

@Override
public int write(ByteBuffer src) throws IOException {
return delegate.send(src, address);
}

@Override
public void close() throws IOException {
delegate.close();
}

@Override
public String getTransportType() {
return "udp";
}

@Override
public String toString() {
return "[" + getTransportType() + "] " + address;
}
}
51 changes: 51 additions & 0 deletions src/main/java/com/timgroup/statsd/NamedPipeClientChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.timgroup.statsd;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class NamedPipeClientChannel implements ClientChannel {
private final RandomAccessFile randomAccessFile;
private final FileChannel fileChannel;
private final String pipe;

/**
* Creates a new NamedPipeClientChannel with the given address.
*
* @param address Location of named pipe
* @throws FileNotFoundException if pipe does not exist
*/
public NamedPipeClientChannel(NamedPipeSocketAddress address) throws FileNotFoundException {
pipe = address.getPipe();
randomAccessFile = new RandomAccessFile(pipe, "rw");
fileChannel = randomAccessFile.getChannel();
}

@Override
public boolean isOpen() {
return fileChannel.isOpen();
}

@Override
public int write(ByteBuffer src) throws IOException {
return fileChannel.write(src);
}

@Override
public void close() throws IOException {
// closing the file also closes the channel
randomAccessFile.close();
}

@Override
public String getTransportType() {
return "namedpipe";
}

@Override
public String toString() {
return pipe;
}
}
27 changes: 27 additions & 0 deletions src/main/java/com/timgroup/statsd/NamedPipeSocketAddress.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.timgroup.statsd;

import java.net.SocketAddress;

public class NamedPipeSocketAddress extends SocketAddress {
private static final String NAMED_PIPE_PREFIX = "\\\\.\\pipe\\";
private final String pipe;

public NamedPipeSocketAddress(String pipeName) {
this.pipe = normalizePipeName(pipeName);
}

public String getPipe() {
return pipe;
}

/**
* A normalized version of the pipe name that includes the `\\.\pipe\` prefix
*/
static String normalizePipeName(String pipeName) {
if (pipeName.startsWith(NAMED_PIPE_PREFIX)) {
return pipeName;
} else {
return NAMED_PIPE_PREFIX + pipeName;
}
}
}
87 changes: 37 additions & 50 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package com.timgroup.statsd;

import jnr.unixsocket.UnixDatagramChannel;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketOptions;

import java.io.IOException;
import java.lang.Double;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
Expand All @@ -23,7 +22,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;


/**
* A simple StatsD client implementation facilitating metrics recording.
*
Expand Down Expand Up @@ -56,6 +54,7 @@ public class NonBlockingStatsDClient implements StatsDClient {

public static final String DD_DOGSTATSD_PORT_ENV_VAR = "DD_DOGSTATSD_PORT";
public static final String DD_AGENT_HOST_ENV_VAR = "DD_AGENT_HOST";
public static final String DD_NAMED_PIPE_ENV_VAR = "DD_DOGSTATSD_PIPE_NAME";
public static final String DD_ENTITY_ID_ENV_VAR = "DD_ENTITY_ID";
private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ;

Expand Down Expand Up @@ -100,7 +99,7 @@ String tag() {
/**
* UTF-8 is the expected encoding for data sent to the agent.
*/
public static final Charset UTF_8 = Charset.forName("UTF-8");
public static final Charset UTF_8 = StandardCharsets.UTF_8;

private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() {
@Override public void handle(final Exception ex) { /* No-op */ }
Expand Down Expand Up @@ -154,7 +153,8 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
}

private final String prefix;
private final DatagramChannel clientChannel;
private final ClientChannel clientChannel;
private final ClientChannel telemetryClientChannel;
private final StatsDClientErrorHandler handler;
private final String constantTagsRendered;

Expand Down Expand Up @@ -261,58 +261,27 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
costantPreTags = null;
}

String transportType = "";
try {
final SocketAddress address = addressLookup.call();
if (address instanceof UnixSocketAddress) {
clientChannel = UnixDatagramChannel.open();
// Set send timeout, to handle the case where the transmission buffer is full
// If no timeout is set, the send becomes blocking
if (timeout > 0) {
clientChannel.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout);
}
if (bufferSize > 0) {
clientChannel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize);
}
transportType = "uds";
} else {
clientChannel = DatagramChannel.open();
transportType = "udp";
}
clientChannel = createByteChannel(addressLookup, timeout, bufferSize);

ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory();

statsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes, poolSize,
processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory);
telemetryStatsDProcessor = statsDProcessor;

Properties properties = new Properties();
properties.load(getClass().getClassLoader().getResourceAsStream(
"dogstatsd/version.properties"));

String telemetryTags = tagString(new String[]{CLIENT_TRANSPORT_TAG + transportType,
String telemetryTags = tagString(new String[]{CLIENT_TRANSPORT_TAG + clientChannel.getTransportType(),
CLIENT_VERSION_TAG + properties.getProperty("dogstatsd_client_version"),
CLIENT_TAG}, new StringBuilder()).toString();

DatagramChannel telemetryClientChannel = clientChannel;
if (addressLookup != telemetryAddressLookup) {

final SocketAddress telemetryAddress = telemetryAddressLookup.call();
if (telemetryAddress instanceof UnixSocketAddress) {
telemetryClientChannel = UnixDatagramChannel.open();
// Set send timeout, to handle the case where the transmission buffer is full
// If no timeout is set, the send becomes blocking
if (timeout > 0) {
telemetryClientChannel.setOption(UnixSocketOptions.SO_SNDTIMEO, timeout);
}
if (bufferSize > 0) {
telemetryClientChannel.setOption(UnixSocketOptions.SO_SNDBUF, bufferSize);
}
} else if (transportType == "uds") {
// UDP clientChannel can submit to multiple addresses, we only need
// a new channel if transport type is UDS for main traffic.
telemetryClientChannel = DatagramChannel.open();
}
if (addressLookup == telemetryAddressLookup) {
telemetryClientChannel = clientChannel;
telemetryStatsDProcessor = statsDProcessor;
} else {
telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, bufferSize);

// similar settings, but a single worker and non-blocking.
telemetryStatsDProcessor = createProcessor(queueSize, handler, maxPacketSizeBytes,
Expand All @@ -324,16 +293,15 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
.processor(telemetryStatsDProcessor)
.build();

statsDSender = createSender(addressLookup, handler, clientChannel, statsDProcessor.getBufferPool(),
statsDSender = createSender(handler, clientChannel, statsDProcessor.getBufferPool(),
statsDProcessor.getOutboundQueue(), senderWorkers, threadFactory);

telemetryStatsDSender = statsDSender;
if (telemetryStatsDProcessor != statsDProcessor) {
// TODO: figure out why the hell telemetryClientChannel does not work here!
telemetryStatsDSender = createSender(telemetryAddressLookup, handler, telemetryClientChannel,
telemetryStatsDSender = createSender(handler, telemetryClientChannel,
telemetryStatsDProcessor.getBufferPool(), telemetryStatsDProcessor.getOutboundQueue(),
1, threadFactory);

}

// set telemetry
Expand Down Expand Up @@ -389,10 +357,10 @@ protected StatsDProcessor createProcessor(final int queueSize, final StatsDClien
}
}

protected StatsDSender createSender(final Callable<SocketAddress> addressLookup, final StatsDClientErrorHandler handler,
final DatagramChannel clientChannel, BufferPool pool, BlockingQueue<ByteBuffer> buffers, final int senderWorkers,
protected StatsDSender createSender(final StatsDClientErrorHandler handler,
final WritableByteChannel clientChannel, BufferPool pool, BlockingQueue<ByteBuffer> buffers, final int senderWorkers,
final ThreadFactory threadFactory) throws Exception {
return new StatsDSender(addressLookup, clientChannel, handler, pool, buffers, senderWorkers, threadFactory);
return new StatsDSender(clientChannel, handler, pool, buffers, senderWorkers, threadFactory);
}

/**
Expand Down Expand Up @@ -427,6 +395,14 @@ public void stop() {
handler.handle(e);
}
}

if (telemetryClientChannel != null && telemetryClientChannel != clientChannel) {
try {
telemetryClientChannel.close();
} catch (final IOException e) {
handler.handle(e);
}
}
}
}

Expand Down Expand Up @@ -469,6 +445,17 @@ StringBuilder tagString(final String[] tags, StringBuilder builder) {
return tagString(tags, constantTagsRendered, builder);
}

ClientChannel createByteChannel(Callable<SocketAddress> addressLookup, int timeout, int bufferSize) throws Exception {
final SocketAddress address = addressLookup.call();
if (address instanceof NamedPipeSocketAddress) {
return new NamedPipeClientChannel((NamedPipeSocketAddress) address);
} else if (address instanceof UnixSocketAddress) {
return new UnixDatagramClientChannel(address, timeout, bufferSize);
} else {
return new DatagramClientChannel(DatagramChannel.open(), address);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this work?

Suggested change
return new DatagramClientChannel(DatagramChannel.open(), address);
return new DatagramClientChannel(address);

And then DatagramChannel import can be removed, and maybe 2-argument constructor too (other client channel impls don't have one, so I guess we don't need one there too).

}
}

abstract class StatsDMessage<T extends Number> extends NumericMessage<T> {
final double sampleRate; // NaN for none

Expand Down
Loading