Add Hostname variable (#49)
* feat: add hostname to storage builder

* fix: metadata getter/setters for jackson to work

* chore: test set hostname

* docs: how to config hostname
jeqo committed Oct 8, 2019
1 parent d78f3cf commit 92ae81b
Showing 8 changed files with 118 additions and 25 deletions.
25 changes: 25 additions & 0 deletions autoconfigure/README.md
@@ -13,7 +13,32 @@

| Configuration | Description | Default |
|---------------|-------------|---------|
| `KAFKA_STORAGE_HOST_NAME` | Host name used by storage instances to scatter-gather results | `localhost` |
| `KAFKA_STORAGE_DIR` | Root path where Zipkin stores tracing data | `/tmp/zipkin-storage-kafka` |
| `KAFKA_STORAGE_TRACE_TIMEOUT` | How long to wait until a trace window is closed, in milliseconds. If this value is too small, dependency links won't be caught and metrics may drift. | `60000` (1 minute) |
| `KAFKA_STORAGE_TRACE_TTL` | How long to keep traces stored. | `259200000` (3 days) |
| `KAFKA_STORAGE_DEPENDENCY_TTL` | How long to keep dependencies stored. | `604800000` (1 week) |

### When running on Kubernetes/OpenShift

When running on Kubernetes/OpenShift, it is recommended to use a `StatefulSet` in order to
maintain the storage state directories across restarts.

#### Configure hostname

For instances to reach the other pods in the StatefulSet, valid DNS names have to be used:

```yaml
env:
  - name: STORAGE_TYPE
    value: kafka
  # Get the pod hostname (${NAME}-${POD_ID}) from metadata
  - name: HOSTNAME
    valueFrom:
      fieldRef:
        fieldPath: metadata.name
  # Map the hostname to the Kubernetes DNS-defined service name
  # (${NAME}-${POD_ID}.${SVC}.${NAMESPACE}.svc.cluster.local),
  # so the storage instances become reachable from one another
  - name: KAFKA_STORAGE_HOST_NAME
    value: $(HOSTNAME).zipkin.default.svc.cluster.local
```
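
When embedding the storage outside the provided server, the same value can be set programmatically. A minimal sketch, assuming the builder methods chain as they do elsewhere in `KafkaStorageBuilder` and that `build()` comes from `StorageComponent.Builder`; the DNS name and bootstrap servers below are only placeholders:

```java
import zipkin2.storage.StorageComponent;
import zipkin2.storage.kafka.KafkaStorage;

class KafkaStorageFactory {
  static StorageComponent create() {
    // The hostname must be resolvable by the other storage instances,
    // e.g. the pod's headless-service DNS name from the example above.
    return KafkaStorage.newBuilder()
        .hostname("zipkin-0.zipkin.default.svc.cluster.local") // placeholder
        .bootstrapServers("kafka:9092")                        // placeholder
        .build();
  }
}
```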
@@ -27,6 +27,8 @@ public class ZipkinKafkaStorageProperties implements Serializable {

private Boolean spanConsumerEnabled;

private String hostname;

private String bootstrapServers;

private Long traceTtlCheckInterval;
@@ -57,6 +59,7 @@ public class ZipkinKafkaStorageProperties implements Serializable {
KafkaStorageBuilder toBuilder() {
KafkaStorageBuilder builder = KafkaStorage.newBuilder();
if (spanConsumerEnabled != null) builder.spanConsumerEnabled(spanConsumerEnabled);
if (hostname != null) builder.hostname(hostname);
if (bootstrapServers != null) builder.bootstrapServers(bootstrapServers);
if (traceTimeout != null) {
builder.traceTimeout(Duration.ofMillis(traceTimeout));
@@ -99,7 +102,11 @@ KafkaStorageBuilder toBuilder() {
return builder;
}

public void setSpanConsumerEnabled(boolean spanConsumerEnabled) {
public Boolean getSpanConsumerEnabled() {
return spanConsumerEnabled;
}

public void setSpanConsumerEnabled(Boolean spanConsumerEnabled) {
this.spanConsumerEnabled = spanConsumerEnabled;
}

@@ -111,6 +118,14 @@ public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

public String getHostname() {
return hostname;
}

public void setHostname(String hostname) {
this.hostname = hostname;
}

public Long getTraceTtlCheckInterval() {
return traceTtlCheckInterval;
}
@@ -143,14 +158,6 @@ public void setSpansTopic(String spansTopic) {
this.spansTopic = spansTopic;
}

public Boolean getSpanConsumerEnabled() {
return spanConsumerEnabled;
}

public void setSpanConsumerEnabled(Boolean spanConsumerEnabled) {
this.spanConsumerEnabled = spanConsumerEnabled;
}

public String getTraceTopic() {
return traceTopic;
}
3 changes: 2 additions & 1 deletion autoconfigure/src/main/resources/zipkin-server-kafka.yml
@@ -2,6 +2,7 @@
zipkin:
storage:
kafka:
hostname: ${KAFKA_STORAGE_HOST_NAME:${HOSTNAME:localhost}}
# Connection to Kafka
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
# Kafka topic names
@@ -19,4 +20,4 @@ zipkin:
trace-timeout: ${KAFKA_STORAGE_TRACE_TIMEOUT:60000}
trace-ttl: ${KAFKA_STORAGE_TRACE_TTL:259200000}
trace-ttl-check-interval: ${KAFKA_STORAGE_TRACE_TTL_CHECK_INTERVAL:3600000}
dependency-ttl: ${KAFKA_STORAGE_DEPENDENCY_TTL:604800000}
dependency-ttl: ${KAFKA_STORAGE_DEPENDENCY_TTL:604800000}
@@ -168,4 +168,16 @@ public class ZipkinKafkaStorageAutoConfigurationTest {
assertThat(context.getBean(KafkaStorage.class).dependencyTopicName).isEqualTo(
"zipkin-dependencies-1");
}

@Test void canOverridesProperty_hostname() {
TestPropertyValues.of(
"zipkin.storage.type:kafka",
"zipkin.storage.kafka.hostname:other_host"
).applyTo(context);
Access.registerKafka(context);
context.refresh();

assertThat(context.getBean(KafkaStorage.class).hostname).isEqualTo(
"other_host");
}
}
2 changes: 2 additions & 0 deletions storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java
@@ -67,6 +67,7 @@ public static KafkaStorageBuilder newBuilder() {
// Kafka Storage configs
final String storageDir;
final long minTracesStored;
final String hostname;
final int httpPort;
// Kafka Topics
final String spansTopicName, traceTopicName, dependencyTopicName;
@@ -97,6 +98,7 @@ public static KafkaStorageBuilder newBuilder() {
// Storage directories
this.storageDir = builder.storageDir;
this.minTracesStored = builder.minTracesStored;
this.hostname = builder.hostname;
this.httpPort = builder.httpPort;
this.httpBaseUrl = builder.httpBaseUrl;
// Kafka Configs
@@ -43,6 +43,7 @@ public final class KafkaStorageBuilder extends StorageComponent.Builder {
Duration dependencyWindowSize = Duration.ofMinutes(1);

long minTracesStored = 10_000;
String hostname = "localhost";
int httpPort = 9412;
BiFunction<String, Integer, String> httpBaseUrl =
(hostname, port) -> "http://" + hostname + ":" + port;
@@ -100,13 +101,7 @@ public final class KafkaStorageBuilder extends StorageComponent.Builder {
}

String hostInfo() {
String hostInfo = "localhost";
try {
hostInfo = InetAddress.getLocalHost().getHostName() + ":" + httpPort;
} catch (UnknownHostException e) {
e.printStackTrace();
}
return hostInfo;
return hostname + ":" + httpPort;
}

@Override public KafkaStorageBuilder strictTraceId(boolean strictTraceId) {
@@ -135,6 +130,14 @@ public KafkaStorageBuilder spanConsumerEnabled(boolean spanConsumerEnabled) {
return this;
}

public KafkaStorageBuilder hostname(String hostname) {
if (hostname == null) throw new NullPointerException("hostname == null");
this.hostname = hostname;
traceStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo());
dependencyStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo());
return this;
}

public KafkaStorageBuilder httpPort(int httpPort) {
this.httpPort = httpPort;
traceStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo());
@@ -146,9 +149,7 @@ public KafkaStorageBuilder httpPort(int httpPort) {
* How long to wait for a span in order to trigger a trace as completed.
*/
public KafkaStorageBuilder traceTimeout(Duration traceTimeout) {
if (traceTimeout == null) {
throw new NullPointerException("traceTimeout == null");
}
if (traceTimeout == null) throw new NullPointerException("traceTimeout == null");
this.traceTimeout = traceTimeout;
return this;
}
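
The new `hostname(...)` setter also refreshes Kafka Streams' `application.server` (`StreamsConfig.APPLICATION_SERVER_CONFIG`) with the value of `hostInfo()`, i.e. `hostname:httpPort`, so other instances can locate this one's state stores. A rough usage sketch, assuming `KafkaStorageBuilder` lives in `zipkin2.storage.kafka` alongside `KafkaStorage`; the DNS name is a placeholder:

```java
import zipkin2.storage.kafka.KafkaStorage;
import zipkin2.storage.kafka.KafkaStorageBuilder;

class BuilderHostnameExample {
  static KafkaStorageBuilder configure() {
    // After these calls, hostInfo() evaluates to
    // "zipkin-0.zipkin.default.svc.cluster.local:9412" and is published to
    // both the trace-store and dependency-store stream configs.
    return KafkaStorage.newBuilder()
        .hostname("zipkin-0.zipkin.default.svc.cluster.local") // placeholder DNS name
        .httpPort(9412); // matches the builder's default port
  }
}
```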
@@ -19,15 +19,23 @@

final class KafkaStreamsMetadata {
static KafkaStreamsMetadata create(
Collection<org.apache.kafka.streams.state.StreamsMetadata> other) {
Collection<org.apache.kafka.streams.state.StreamsMetadata> other) {
KafkaStreamsMetadata metadata = new KafkaStreamsMetadata();
metadata.metadata = other.stream().map(StreamsMetadata::create).collect(Collectors.toSet());
return metadata;
}

Set<StreamsMetadata> metadata;

Set<StreamsMetadata> getMetadata() {
KafkaStreamsMetadata() {
}

public void setMetadata(
Set<StreamsMetadata> metadata) {
this.metadata = metadata;
}

public Set<StreamsMetadata> getMetadata() {
return metadata;
}

@@ -37,15 +45,31 @@ static StreamsMetadata create(org.apache.kafka.streams.state.StreamsMetadata oth
metadata.hostInfo = HostInfo.create(other.hostInfo());
metadata.storeNames = other.stateStoreNames();
metadata.topicPartitions = other.topicPartitions().stream()
.map(TopicPartition::create)
.collect(Collectors.toSet());
.map(TopicPartition::create)
.collect(Collectors.toSet());
return metadata;
}

HostInfo hostInfo;
Set<String> storeNames;
Set<TopicPartition> topicPartitions;

StreamsMetadata() {
}

public void setHostInfo(HostInfo hostInfo) {
this.hostInfo = hostInfo;
}

public void setStoreNames(Set<String> storeNames) {
this.storeNames = storeNames;
}

public void setTopicPartitions(
Set<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
}

public HostInfo getHostInfo() {
return hostInfo;
}
@@ -69,6 +93,17 @@ static HostInfo create(org.apache.kafka.streams.state.HostInfo other) {
String host;
Integer port;

HostInfo() {
}

public void setHost(String host) {
this.host = host;
}

public void setPort(Integer port) {
this.port = port;
}

public String getHost() {
return host;
}
@@ -86,9 +121,20 @@ static TopicPartition create(org.apache.kafka.common.TopicPartition other) {
return topicPartition;
}

TopicPartition() {
}

String topic;
Integer partition;

public void setTopic(String topic) {
this.topic = topic;
}

public void setPartition(Integer partition) {
this.partition = partition;
}

public String getTopic() {
return topic;
}
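
Per the commit message, the no-arg constructors and public getters/setters above were added so Jackson data binding works for these package-private metadata classes. A hedged sketch of that round trip, assuming a caller placed in the same package as `KafkaStreamsMetadata` and the standard jackson-databind `ObjectMapper`:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

// Must live in the same package as KafkaStreamsMetadata, since the type is package-private.
class KafkaStreamsMetadataJson {
  static final ObjectMapper MAPPER = new ObjectMapper();

  // The public getters added in this commit let Jackson serialize the metadata...
  static String toJson(KafkaStreamsMetadata metadata) throws Exception {
    return MAPPER.writeValueAsString(metadata);
  }

  // ...and the no-arg constructors plus public setters let it deserialize it again.
  static KafkaStreamsMetadata fromJson(String json) throws Exception {
    return MAPPER.readValue(json, KafkaStreamsMetadata.class);
  }
}
```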
@@ -197,7 +197,6 @@ class KafkaStorageIT {
.limit(1)
.build())
.execute();

assertThat(filteredTraces).hasSize(1);
assertThat(filteredTraces.get(0)).hasSize(1); // last trace is returned first
List<String> services = serviceAndSpanNames.getServiceNames().execute();
