diff --git a/autoconfigure/README.md b/autoconfigure/README.md index e2a88493..03799a53 100644 --- a/autoconfigure/README.md +++ b/autoconfigure/README.md @@ -13,7 +13,32 @@ | Configuration | Description | Default | |---------------|-------------|---------| +| `KAFKA_STORAGE_HOST_NAME` | Host name used by storage instances to scatter-gather results | `localhost` | | `KAFKA_STORAGE_DIR` | Root path where Zipkin stores tracing data | `/tmp/zipkin-storage-kafka` | | `KAFKA_STORAGE_TRACE_TIMEOUT` | How long to wait until a trace window is closed (ms). If this config is to small, dependency links won't be caught and metrics may drift. | `600000` (1 minute) | | `KAFKA_STORAGE_TRACE_TTL` | How long to keep traces stored. | `259200000` (3 days) | | `KAFKA_STORAGE_DEPENDENCY_TTL` | How long to keep dependencies stored. | `604800000` (1 week) | + +### When Kubernetes/Openshift + +When running on Kubernetes/Openshift is recommended to use `statefulsets` in order to maintain +storage state directories. + +#### Configure hostname + +For instances to access other pods on the stateful set, we have to use valid DNS-names: + +```yaml + env: + - name: STORAGE_TYPE + value: kafka + # Gather hostname (name-${POD_ID}) from metadata + - name: HOSTNAME + valueFrom: + fieldRef: + fieldPath: metadata.name + # Mapping hostname to Kubernetes DNS defined service name (${NAME}-${POD_ID}.${SVC}.${NAMESPACE}.svc.cluster.local), + # then instance storage becomes accessible between them + - name: KAFKA_STORAGE_HOST_NAME + value: $(HOSTNAME).zipkin.default.svc.cluster.local +``` diff --git a/autoconfigure/src/main/java/zipkin2/autoconfigure/storage/kafka/ZipkinKafkaStorageProperties.java b/autoconfigure/src/main/java/zipkin2/autoconfigure/storage/kafka/ZipkinKafkaStorageProperties.java index 1b28f743..af5bce0c 100644 --- a/autoconfigure/src/main/java/zipkin2/autoconfigure/storage/kafka/ZipkinKafkaStorageProperties.java +++ b/autoconfigure/src/main/java/zipkin2/autoconfigure/storage/kafka/ZipkinKafkaStorageProperties.java @@ -27,6 +27,8 @@ public class ZipkinKafkaStorageProperties implements Serializable { private Boolean spanConsumerEnabled; + private String hostname; + private String bootstrapServers; private Long traceTtlCheckInterval; @@ -57,6 +59,7 @@ public class ZipkinKafkaStorageProperties implements Serializable { KafkaStorageBuilder toBuilder() { KafkaStorageBuilder builder = KafkaStorage.newBuilder(); if (spanConsumerEnabled != null) builder.spanConsumerEnabled(spanConsumerEnabled); + if (hostname != null) builder.hostname(hostname); if (bootstrapServers != null) builder.bootstrapServers(bootstrapServers); if (traceTimeout != null) { builder.traceTimeout(Duration.ofMillis(traceTimeout)); @@ -99,7 +102,11 @@ KafkaStorageBuilder toBuilder() { return builder; } - public void setSpanConsumerEnabled(boolean spanConsumerEnabled) { + public Boolean getSpanConsumerEnabled() { + return spanConsumerEnabled; + } + + public void setSpanConsumerEnabled(Boolean spanConsumerEnabled) { this.spanConsumerEnabled = spanConsumerEnabled; } @@ -111,6 +118,14 @@ public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } + public String getHostname() { + return hostname; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + public Long getTraceTtlCheckInterval() { return traceTtlCheckInterval; } @@ -143,14 +158,6 @@ public void setSpansTopic(String spansTopic) { this.spansTopic = spansTopic; } - public Boolean getSpanConsumerEnabled() { - return spanConsumerEnabled; - } - - public void setSpanConsumerEnabled(Boolean spanConsumerEnabled) { - this.spanConsumerEnabled = spanConsumerEnabled; - } - public String getTraceTopic() { return traceTopic; } diff --git a/autoconfigure/src/main/resources/zipkin-server-kafka.yml b/autoconfigure/src/main/resources/zipkin-server-kafka.yml index 49a64cb5..44a8e1d8 100644 --- a/autoconfigure/src/main/resources/zipkin-server-kafka.yml +++ b/autoconfigure/src/main/resources/zipkin-server-kafka.yml @@ -2,6 +2,7 @@ zipkin: storage: kafka: + hostname: ${KAFKA_STORAGE_HOST_NAME:${HOSTNAME:localhost}} # Connection to Kafka bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} # Kafka topic names @@ -19,4 +20,4 @@ zipkin: trace-timeout: ${KAFKA_STORAGE_TRACE_TIMEOUT:60000} trace-ttl: ${KAFKA_STORAGE_TRACE_TTL:259200000} trace-ttl-check-interval: ${KAFKA_STORAGE_TRACE_TTL_CHECK_INTERVAL:3600000} - dependency-ttl: ${KAFKA_STORAGE_DEPENDENCY_TTL:604800000} \ No newline at end of file + dependency-ttl: ${KAFKA_STORAGE_DEPENDENCY_TTL:604800000} diff --git a/autoconfigure/src/test/java/zipkin2/storage/kafka/ZipkinKafkaStorageAutoConfigurationTest.java b/autoconfigure/src/test/java/zipkin2/storage/kafka/ZipkinKafkaStorageAutoConfigurationTest.java index 5e464b67..ecb3dc4b 100644 --- a/autoconfigure/src/test/java/zipkin2/storage/kafka/ZipkinKafkaStorageAutoConfigurationTest.java +++ b/autoconfigure/src/test/java/zipkin2/storage/kafka/ZipkinKafkaStorageAutoConfigurationTest.java @@ -168,4 +168,16 @@ public class ZipkinKafkaStorageAutoConfigurationTest { assertThat(context.getBean(KafkaStorage.class).dependencyTopicName).isEqualTo( "zipkin-dependencies-1"); } + + @Test void canOverridesProperty_hostname() { + TestPropertyValues.of( + "zipkin.storage.type:kafka", + "zipkin.storage.kafka.hostname:other_host" + ).applyTo(context); + Access.registerKafka(context); + context.refresh(); + + assertThat(context.getBean(KafkaStorage.class).hostname).isEqualTo( + "other_host"); + } } diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java index 6e3ba6a1..7ad101f3 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java @@ -67,6 +67,7 @@ public static KafkaStorageBuilder newBuilder() { // Kafka Storage configs final String storageDir; final long minTracesStored; + final String hostname; final int httpPort; // Kafka Topics final String spansTopicName, traceTopicName, dependencyTopicName; @@ -97,6 +98,7 @@ public static KafkaStorageBuilder newBuilder() { // Storage directories this.storageDir = builder.storageDir; this.minTracesStored = builder.minTracesStored; + this.hostname = builder.hostname; this.httpPort = builder.httpPort; this.httpBaseUrl = builder.httpBaseUrl; // Kafka Configs diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java index f6ef31d6..1a1b3b08 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java @@ -43,6 +43,7 @@ public final class KafkaStorageBuilder extends StorageComponent.Builder { Duration dependencyWindowSize = Duration.ofMinutes(1); long minTracesStored = 10_000; + String hostname = "localhost"; int httpPort = 9412; BiFunction httpBaseUrl = (hostname, port) -> "http://" + hostname + ":" + port; @@ -100,13 +101,7 @@ public final class KafkaStorageBuilder extends StorageComponent.Builder { } String hostInfo() { - String hostInfo = "localhost"; - try { - hostInfo = InetAddress.getLocalHost().getHostName() + ":" + httpPort; - } catch (UnknownHostException e) { - e.printStackTrace(); - } - return hostInfo; + return hostname + ":" + httpPort; } @Override public KafkaStorageBuilder strictTraceId(boolean strictTraceId) { @@ -135,6 +130,14 @@ public KafkaStorageBuilder spanConsumerEnabled(boolean spanConsumerEnabled) { return this; } + public KafkaStorageBuilder hostname(String hostname) { + if (hostname == null) throw new NullPointerException("hostname == null"); + this.hostname = hostname; + traceStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo()); + dependencyStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo()); + return this; + } + public KafkaStorageBuilder httpPort(int httpPort) { this.httpPort = httpPort; traceStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo()); @@ -146,9 +149,7 @@ public KafkaStorageBuilder httpPort(int httpPort) { * How long to wait for a span in order to trigger a trace as completed. */ public KafkaStorageBuilder traceTimeout(Duration traceTimeout) { - if (traceTimeout == null) { - throw new NullPointerException("traceTimeout == null"); - } + if (traceTimeout == null) throw new NullPointerException("traceTimeout == null"); this.traceTimeout = traceTimeout; return this; } diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStreamsMetadata.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStreamsMetadata.java index a5bad486..53e6fdb3 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStreamsMetadata.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStreamsMetadata.java @@ -19,7 +19,7 @@ final class KafkaStreamsMetadata { static KafkaStreamsMetadata create( - Collection other) { + Collection other) { KafkaStreamsMetadata metadata = new KafkaStreamsMetadata(); metadata.metadata = other.stream().map(StreamsMetadata::create).collect(Collectors.toSet()); return metadata; @@ -27,7 +27,15 @@ static KafkaStreamsMetadata create( Set metadata; - Set getMetadata() { + KafkaStreamsMetadata() { + } + + public void setMetadata( + Set metadata) { + this.metadata = metadata; + } + + public Set getMetadata() { return metadata; } @@ -37,8 +45,8 @@ static StreamsMetadata create(org.apache.kafka.streams.state.StreamsMetadata oth metadata.hostInfo = HostInfo.create(other.hostInfo()); metadata.storeNames = other.stateStoreNames(); metadata.topicPartitions = other.topicPartitions().stream() - .map(TopicPartition::create) - .collect(Collectors.toSet()); + .map(TopicPartition::create) + .collect(Collectors.toSet()); return metadata; } @@ -46,6 +54,22 @@ static StreamsMetadata create(org.apache.kafka.streams.state.StreamsMetadata oth Set storeNames; Set topicPartitions; + StreamsMetadata() { + } + + public void setHostInfo(HostInfo hostInfo) { + this.hostInfo = hostInfo; + } + + public void setStoreNames(Set storeNames) { + this.storeNames = storeNames; + } + + public void setTopicPartitions( + Set topicPartitions) { + this.topicPartitions = topicPartitions; + } + public HostInfo getHostInfo() { return hostInfo; } @@ -69,6 +93,17 @@ static HostInfo create(org.apache.kafka.streams.state.HostInfo other) { String host; Integer port; + HostInfo() { + } + + public void setHost(String host) { + this.host = host; + } + + public void setPort(Integer port) { + this.port = port; + } + public String getHost() { return host; } @@ -86,9 +121,20 @@ static TopicPartition create(org.apache.kafka.common.TopicPartition other) { return topicPartition; } + TopicPartition() { + } + String topic; Integer partition; + public void setTopic(String topic) { + this.topic = topic; + } + + public void setPartition(Integer partition) { + this.partition = partition; + } + public String getTopic() { return topic; } diff --git a/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java b/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java index 0905925c..1e632dae 100644 --- a/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java +++ b/storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java @@ -197,7 +197,6 @@ class KafkaStorageIT { .limit(1) .build()) .execute(); - assertThat(filteredTraces).hasSize(1); assertThat(filteredTraces.get(0)).hasSize(1); // last trace is returned first List services = serviceAndSpanNames.getServiceNames().execute();