diff --git a/docker/Dockerfile b/docker/Dockerfile index 37a9746d..51dcbca5 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -24,7 +24,7 @@ RUN cd /code && mkdir -p /zipkin-storage-kafka && \ jar xf *.jar && \ rm *.jar -FROM openzipkin/zipkin:2.18.3 +FROM openzipkin/zipkin:2.19.1 MAINTAINER Zipkin "https://zipkin.io/" COPY --from=0 /zipkin-storage-kafka/ /zipkin/lib/ diff --git a/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageModule.java b/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageModule.java index 16bfd8af..3ab4a7a5 100644 --- a/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageModule.java +++ b/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageModule.java @@ -44,8 +44,8 @@ class ZipkinKafkaStorageModule { .build(); } - // TODO: to be changed when >zipkin 2.18.4 #61 - // @Bean public Consumer storageHttpService(StorageComponent storage) { + //TODO replace when Armeria supports Consumer #61 + //@Bean public Consumer storageHttpService(StorageComponent storage) { @Bean public ArmeriaServerConfigurator storageHttpService(StorageComponent storage) { return sb -> sb.annotatedService(HTTP_PATH_PREFIX, ((KafkaStorage) storage).httpService()); } diff --git a/pom.xml b/pom.xml index a0c9be2b..d95a4534 100644 --- a/pom.xml +++ b/pom.xml @@ -50,11 +50,10 @@ ${project.basedir} - io.zipkin.zipkin2 - 2.18.3 + 2.19.1 com.linecorp.armeria - 0.94.0 + 0.95.0 2.2.0.RELEASE 2.3.1 @@ -101,7 +100,8 @@ https://github.com/openzipkin-contrib/zipkin-storage-kafka scm:git:https://github.com/openzipkin-contrib/zipkin-storage-kafka.git - scm:git:https://github.com/openzipkin-contrib/zipkin-storage-kafka.git + + scm:git:https://github.com/openzipkin-contrib/zipkin-storage-kafka.git HEAD diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanConsumer.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanConsumer.java index cb849534..8dfccafb 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanConsumer.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaSpanConsumer.java @@ -35,9 +35,7 @@ * processing of spans as part of a trace. */ final class KafkaSpanConsumer implements SpanConsumer { - // Topic names final String spansTopicName; - // Kafka producers final Producer producer; KafkaSpanConsumer(KafkaStorage storage) { @@ -69,7 +67,8 @@ static class KafkaProducerCall extends Call.Base { Producer kafkaProducer, String topic, String key, - byte[] value) { + byte[] value + ) { this.kafkaProducer = kafkaProducer; this.topic = topic; this.key = key; @@ -80,7 +79,8 @@ static Call create( Producer producer, String topic, String key, - byte[] value) { + byte[] value + ) { return new KafkaProducerCall(producer, topic, key, value); } diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java index f6b588e6..02c3ca4e 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java @@ -60,7 +60,7 @@ public static KafkaStorageBuilder newBuilder() { return new KafkaStorageBuilder(); } - // Kafka Storage modes + // Kafka Storage flags final boolean partitioningEnabled; final boolean aggregationEnabled; final boolean traceByIdQueryEnabled; @@ -107,7 +107,6 @@ public static KafkaStorageBuilder newBuilder() { this.storageSpansTopic = builder.traceStorage.spansTopic; this.storageDependencyTopic = builder.dependencyStorage.dependencyTopic; // Storage directories - //this.storageDir = builder.storageStateDir; this.minTracesStored = builder.traceStorage.minTracesStored; this.httpBaseUrl = builder.httpBaseUrl; this.hostname = builder.hostname; @@ -213,18 +212,10 @@ void checkResources() { void doClose() { try { if (adminClient != null) adminClient.close(Duration.ofSeconds(1)); - if (producer != null) { - producer.close(Duration.ofSeconds(1)); - } - if (traceStoreStream != null) { - traceStoreStream.close(Duration.ofSeconds(1)); - } - if (dependencyStoreStream != null) { - dependencyStoreStream.close(Duration.ofSeconds(1)); - } - if (aggregationStream != null) { - aggregationStream.close(Duration.ofSeconds(1)); - } + if (producer != null) producer.close(Duration.ofSeconds(1)); + if (traceStoreStream != null) traceStoreStream.close(Duration.ofSeconds(1)); + if (dependencyStoreStream != null) dependencyStoreStream.close(Duration.ofSeconds(1)); + if (aggregationStream != null) aggregationStream.close(Duration.ofSeconds(1)); if (server != null) server.close(); } catch (Exception | Error e) { LOG.debug("error closing client {}", e.getMessage(), e); @@ -234,9 +225,7 @@ void doClose() { Producer getProducer() { if (producer == null) { synchronized (this) { - if (producer == null) { - producer = new KafkaProducer<>(producerConfig); - } + if (producer == null) producer = new KafkaProducer<>(producerConfig); } } return producer; @@ -245,9 +234,7 @@ Producer getProducer() { AdminClient getAdminClient() { if (adminClient == null) { synchronized (this) { - if (adminClient == null) { - adminClient = AdminClient.create(adminConfig); - } + if (adminClient == null) adminClient = AdminClient.create(adminConfig); } } return adminClient; @@ -260,7 +247,7 @@ KafkaStreams getTraceStorageStream() { try { traceStoreStream = new KafkaStreams(traceStoreTopology, traceStoreStreamConfig); traceStoreStream.start(); - LOG.info("Trace storage topology: {}", traceStoreTopology.describe()); + LOG.info("Trace storage topology:\n{}", traceStoreTopology.describe()); } catch (Exception e) { LOG.debug("Error starting trace storage process", e); traceStoreStream = null; @@ -279,7 +266,7 @@ KafkaStreams getDependencyStorageStream() { dependencyStoreStream = new KafkaStreams(dependencyStoreTopology, dependencyStoreStreamConfig); dependencyStoreStream.start(); - LOG.info("Dependency storage topology: {}", dependencyStoreTopology.describe()); + LOG.info("Dependency storage topology:\n{}", dependencyStoreTopology.describe()); } catch (Exception e) { LOG.debug("Error starting dependency storage", e); dependencyStoreStream = null; @@ -297,7 +284,7 @@ KafkaStreams getAggregationStream() { try { aggregationStream = new KafkaStreams(aggregationTopology, aggregationStreamConfig); aggregationStream.start(); - LOG.info("Aggregation topology: {}", aggregationTopology.describe()); + LOG.info("Aggregation topology:\n{}", aggregationTopology.describe()); } catch (Exception e) { LOG.debug("Error loading aggregation process", e); aggregationStream = null; diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java index 1b9d2097..50372f62 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java @@ -114,12 +114,12 @@ public final KafkaStorageBuilder overrides(Map overrides) { return this; } - public KafkaStorageBuilder spanPartitioningBuilder(SpanPartitioningBuilder builder) { - if (builder == null) throw new NullPointerException("builder == null"); - this.spanPartitioning = builder; - return this; - } - + /** + * Use this hostname to locate zipkin server between each other when forming a cluster. + *

+ * When running multiple instances server local IP might not be the same as the external IP. e.g. + * Kubernetes Pod IP not been accessible from other Pods, and Service IPs that are accessible. + */ public KafkaStorageBuilder hostname(String hostname) { if (hostname == null) throw new NullPointerException("hostname == null"); this.hostname = hostname; @@ -128,6 +128,9 @@ public KafkaStorageBuilder hostname(String hostname) { return this; } + /** + * Same port as Zipkin Server. To be changed only when Zipkin Server port is changed. + */ public KafkaStorageBuilder serverPort(int serverPort) { if (serverPort <= 0) throw new IllegalArgumentException("serverPort <= 0"); this.serverPort = serverPort; @@ -136,6 +139,12 @@ public KafkaStorageBuilder serverPort(int serverPort) { return this; } + public KafkaStorageBuilder spanPartitioningBuilder(SpanPartitioningBuilder builder) { + if (builder == null) throw new NullPointerException("builder == null"); + this.spanPartitioning = builder; + return this; + } + public KafkaStorageBuilder spanAggregationBuilder(SpanAggregationBuilder builder) { if (builder == null) throw new NullPointerException("builder == null"); this.spanAggregation = builder; diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageHttpService.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageHttpService.java index 4437ac98..3eb5df54 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageHttpService.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageHttpService.java @@ -79,7 +79,8 @@ final class KafkaStorageHttpService { @Get("/dependencies") public AggregatedHttpResponse getDependencies( @Param("endTs") long endTs, - @Param("lookback") long lookback) { + @Param("lookback") long lookback + ) { try { if (!storage.dependencyQueryEnabled) return AggregatedHttpResponse.of(HttpStatus.NOT_FOUND); ReadOnlyWindowStore store = @@ -165,7 +166,8 @@ public AggregatedHttpResponse getTraces( @Param("maxDuration") Optional maxDuration, @Param("endTs") Optional endTs, @Default("86400000") @Param("lookback") Long lookback, - @Default("10") @Param("limit") int limit) { + @Default("10") @Param("limit") int limit + ) { try { if (!storage.traceSearchEnabled) return AggregatedHttpResponse.of(HttpStatus.NOT_FOUND); QueryRequest request = @@ -223,7 +225,8 @@ void addResults( ReadOnlyKeyValueStore> tracesStore, List> traces, List traceIds, - KeyValueIterator> spanIds) { + KeyValueIterator> spanIds + ) { spanIds.forEachRemaining(keyValue -> { for (String traceId : keyValue.value) { if (!traceIds.contains(traceId)) { diff --git a/storage/src/main/java/zipkin2/storage/kafka/streams/DependencyStorageTopology.java b/storage/src/main/java/zipkin2/storage/kafka/streams/DependencyStorageTopology.java index b3527fbd..72b7ee19 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/streams/DependencyStorageTopology.java +++ b/storage/src/main/java/zipkin2/storage/kafka/streams/DependencyStorageTopology.java @@ -49,7 +49,8 @@ public DependencyStorageTopology( String dependencyTopic, Duration dependencyTtl, Duration dependencyWindowSize, - boolean dependencyQueryEnabled) { + boolean dependencyQueryEnabled + ) { this.dependencyTopic = dependencyTopic; this.dependencyTtl = dependencyTtl; this.dependencyWindowSize = dependencyWindowSize; diff --git a/storage/src/main/java/zipkin2/storage/kafka/streams/SpanAggregationTopology.java b/storage/src/main/java/zipkin2/storage/kafka/streams/SpanAggregationTopology.java index a3be8506..7cafa94b 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/streams/SpanAggregationTopology.java +++ b/storage/src/main/java/zipkin2/storage/kafka/streams/SpanAggregationTopology.java @@ -60,7 +60,8 @@ public SpanAggregationTopology( String traceTopic, String dependencyTopic, Duration traceTimeout, - boolean aggregationEnabled) { + boolean aggregationEnabled + ) { this.spansTopic = spansTopic; this.traceTopic = traceTopic; this.dependencyTopic = dependencyTopic; diff --git a/storage/src/main/java/zipkin2/storage/kafka/streams/TraceStorageTopology.java b/storage/src/main/java/zipkin2/storage/kafka/streams/TraceStorageTopology.java index 300ae30b..a2dbbc16 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/streams/TraceStorageTopology.java +++ b/storage/src/main/java/zipkin2/storage/kafka/streams/TraceStorageTopology.java @@ -80,7 +80,8 @@ public TraceStorageTopology( Duration traceTtlCheckInterval, long minTracesStored, boolean traceByIdQueryEnabled, - boolean traceSearchEnabled) { + boolean traceSearchEnabled + ) { this.spansTopic = spansTopic; this.autoCompleteKeys = autoCompleteKeys; this.traceTtl = traceTtl;