Skip to content

Commit

Permalink
Upgrade to Zipkin 2.19.1 (#63)
Browse files Browse the repository at this point in the history
* update dependencies and change to consumer

* update to latest zipkin
  • Loading branch information
jeqo authored Oct 31, 2019
1 parent 532ed96 commit 7ee7d0a
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 46 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ RUN cd /code && mkdir -p /zipkin-storage-kafka && \
jar xf *.jar && \
rm *.jar

FROM openzipkin/zipkin:2.18.3
FROM openzipkin/zipkin:2.19.1
MAINTAINER Zipkin "https://zipkin.io/"

COPY --from=0 /zipkin-storage-kafka/ /zipkin/lib/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class ZipkinKafkaStorageModule {
.build();
}

// TODO: to be changed when >zipkin 2.18.4 #61
// @Bean public Consumer<ServerBuilder> storageHttpService(StorageComponent storage) {
//TODO replace when Armeria supports Consumer<ServerBuilder> #61
//@Bean public Consumer<ServerBuilder> storageHttpService(StorageComponent storage) {
// Exposes the Kafka storage HTTP endpoints by registering the storage component's
// annotated service on the Zipkin server's Armeria server under HTTP_PATH_PREFIX.
// NOTE(review): the cast assumes the injected StorageComponent is always a KafkaStorage
// in this module's wiring — confirm against the bean that produces StorageComponent.
@Bean public ArmeriaServerConfigurator storageHttpService(StorageComponent storage) {
return sb -> sb.annotatedService(HTTP_PATH_PREFIX, ((KafkaStorage) storage).httpService());
}
Expand Down
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@
<main.basedir>${project.basedir}</main.basedir>

<!-- groupId overrides allow testing feature branches with jitpack -->
<!-- TODO: update armeria to 0.95 when zipkin >2.18.3 -->
<zipkin.groupId>io.zipkin.zipkin2</zipkin.groupId>
<zipkin.version>2.18.3</zipkin.version>
<zipkin.version>2.19.1</zipkin.version>
<armeria.groupId>com.linecorp.armeria</armeria.groupId>
<armeria.version>0.94.0</armeria.version>
<armeria.version>0.95.0</armeria.version>
<spring-boot.version>2.2.0.RELEASE</spring-boot.version>

<kafka.version>2.3.1</kafka.version>
Expand Down Expand Up @@ -101,7 +100,8 @@
<scm>
<url>https://github.com/openzipkin-contrib/zipkin-storage-kafka</url>
<connection>scm:git:https://github.com/openzipkin-contrib/zipkin-storage-kafka.git</connection>
<developerConnection>scm:git:https://github.com/openzipkin-contrib/zipkin-storage-kafka.git
<developerConnection>
scm:git:https://github.com/openzipkin-contrib/zipkin-storage-kafka.git
</developerConnection>
<tag>HEAD</tag>
</scm>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
* processing of spans as part of a trace.
*/
final class KafkaSpanConsumer implements SpanConsumer {
// Topic names
final String spansTopicName;
// Kafka producers
final Producer<String, byte[]> producer;

KafkaSpanConsumer(KafkaStorage storage) {
Expand Down Expand Up @@ -69,7 +67,8 @@ static class KafkaProducerCall extends Call.Base<Void> {
Producer<String, byte[]> kafkaProducer,
String topic,
String key,
byte[] value) {
byte[] value
) {
this.kafkaProducer = kafkaProducer;
this.topic = topic;
this.key = key;
Expand All @@ -80,7 +79,8 @@ static Call<Void> create(
Producer<String, byte[]> producer,
String topic,
String key,
byte[] value) {
byte[] value
) {
return new KafkaProducerCall(producer, topic, key, value);
}

Expand Down
33 changes: 10 additions & 23 deletions storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static KafkaStorageBuilder newBuilder() {
return new KafkaStorageBuilder();
}

// Kafka Storage modes
// Kafka Storage flags
final boolean partitioningEnabled;
final boolean aggregationEnabled;
final boolean traceByIdQueryEnabled;
Expand Down Expand Up @@ -107,7 +107,6 @@ public static KafkaStorageBuilder newBuilder() {
this.storageSpansTopic = builder.traceStorage.spansTopic;
this.storageDependencyTopic = builder.dependencyStorage.dependencyTopic;
// Storage directories
//this.storageDir = builder.storageStateDir;
this.minTracesStored = builder.traceStorage.minTracesStored;
this.httpBaseUrl = builder.httpBaseUrl;
this.hostname = builder.hostname;
Expand Down Expand Up @@ -213,18 +212,10 @@ void checkResources() {
void doClose() {
try {
if (adminClient != null) adminClient.close(Duration.ofSeconds(1));
if (producer != null) {
producer.close(Duration.ofSeconds(1));
}
if (traceStoreStream != null) {
traceStoreStream.close(Duration.ofSeconds(1));
}
if (dependencyStoreStream != null) {
dependencyStoreStream.close(Duration.ofSeconds(1));
}
if (aggregationStream != null) {
aggregationStream.close(Duration.ofSeconds(1));
}
if (producer != null) producer.close(Duration.ofSeconds(1));
if (traceStoreStream != null) traceStoreStream.close(Duration.ofSeconds(1));
if (dependencyStoreStream != null) dependencyStoreStream.close(Duration.ofSeconds(1));
if (aggregationStream != null) aggregationStream.close(Duration.ofSeconds(1));
if (server != null) server.close();
} catch (Exception | Error e) {
LOG.debug("error closing client {}", e.getMessage(), e);
Expand All @@ -234,9 +225,7 @@ void doClose() {
Producer<String, byte[]> getProducer() {
if (producer == null) {
synchronized (this) {
if (producer == null) {
producer = new KafkaProducer<>(producerConfig);
}
if (producer == null) producer = new KafkaProducer<>(producerConfig);
}
}
return producer;
Expand All @@ -245,9 +234,7 @@ Producer<String, byte[]> getProducer() {
AdminClient getAdminClient() {
if (adminClient == null) {
synchronized (this) {
if (adminClient == null) {
adminClient = AdminClient.create(adminConfig);
}
if (adminClient == null) adminClient = AdminClient.create(adminConfig);
}
}
return adminClient;
Expand All @@ -260,7 +247,7 @@ KafkaStreams getTraceStorageStream() {
try {
traceStoreStream = new KafkaStreams(traceStoreTopology, traceStoreStreamConfig);
traceStoreStream.start();
LOG.info("Trace storage topology: {}", traceStoreTopology.describe());
LOG.info("Trace storage topology:\n{}", traceStoreTopology.describe());
} catch (Exception e) {
LOG.debug("Error starting trace storage process", e);
traceStoreStream = null;
Expand All @@ -279,7 +266,7 @@ KafkaStreams getDependencyStorageStream() {
dependencyStoreStream =
new KafkaStreams(dependencyStoreTopology, dependencyStoreStreamConfig);
dependencyStoreStream.start();
LOG.info("Dependency storage topology: {}", dependencyStoreTopology.describe());
LOG.info("Dependency storage topology:\n{}", dependencyStoreTopology.describe());
} catch (Exception e) {
LOG.debug("Error starting dependency storage", e);
dependencyStoreStream = null;
Expand All @@ -297,7 +284,7 @@ KafkaStreams getAggregationStream() {
try {
aggregationStream = new KafkaStreams(aggregationTopology, aggregationStreamConfig);
aggregationStream.start();
LOG.info("Aggregation topology: {}", aggregationTopology.describe());
LOG.info("Aggregation topology:\n{}", aggregationTopology.describe());
} catch (Exception e) {
LOG.debug("Error loading aggregation process", e);
aggregationStream = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ public final KafkaStorageBuilder overrides(Map<String, ?> overrides) {
return this;
}

/**
 * Sets the builder used to configure the span partitioning stage of the storage.
 *
 * @param builder span partitioning configuration; must not be null
 * @return this builder, for method chaining
 * @throws NullPointerException if {@code builder} is null
 */
public KafkaStorageBuilder spanPartitioningBuilder(SpanPartitioningBuilder builder) {
if (builder == null) throw new NullPointerException("builder == null");
this.spanPartitioning = builder;
return this;
}

/**
 * Use this hostname so that zipkin servers can locate each other when forming a cluster.
 * <p>
 * When running multiple instances, a server's local IP might not be the same as its external
 * IP: e.g. a Kubernetes Pod IP may not be accessible from other Pods, while Service IPs are.
 */
public KafkaStorageBuilder hostname(String hostname) {
if (hostname == null) throw new NullPointerException("hostname == null");
this.hostname = hostname;
Expand All @@ -128,6 +128,9 @@ public KafkaStorageBuilder hostname(String hostname) {
return this;
}

/**
* Same port as Zipkin Server. To be changed only when Zipkin Server port is changed.
*/
public KafkaStorageBuilder serverPort(int serverPort) {
if (serverPort <= 0) throw new IllegalArgumentException("serverPort <= 0");
this.serverPort = serverPort;
Expand All @@ -136,6 +139,12 @@ public KafkaStorageBuilder serverPort(int serverPort) {
return this;
}

/**
 * Sets the builder used to configure the span partitioning stage of the storage.
 *
 * @param builder span partitioning configuration; must not be null
 * @return this builder, for method chaining
 * @throws NullPointerException if {@code builder} is null
 */
public KafkaStorageBuilder spanPartitioningBuilder(SpanPartitioningBuilder builder) {
if (builder == null) throw new NullPointerException("builder == null");
this.spanPartitioning = builder;
return this;
}

public KafkaStorageBuilder spanAggregationBuilder(SpanAggregationBuilder builder) {
if (builder == null) throw new NullPointerException("builder == null");
this.spanAggregation = builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ final class KafkaStorageHttpService {
@Get("/dependencies")
public AggregatedHttpResponse getDependencies(
@Param("endTs") long endTs,
@Param("lookback") long lookback) {
@Param("lookback") long lookback
) {
try {
if (!storage.dependencyQueryEnabled) return AggregatedHttpResponse.of(HttpStatus.NOT_FOUND);
ReadOnlyWindowStore<Long, DependencyLink> store =
Expand Down Expand Up @@ -165,7 +166,8 @@ public AggregatedHttpResponse getTraces(
@Param("maxDuration") Optional<Long> maxDuration,
@Param("endTs") Optional<Long> endTs,
@Default("86400000") @Param("lookback") Long lookback,
@Default("10") @Param("limit") int limit) {
@Default("10") @Param("limit") int limit
) {
try {
if (!storage.traceSearchEnabled) return AggregatedHttpResponse.of(HttpStatus.NOT_FOUND);
QueryRequest request =
Expand Down Expand Up @@ -223,7 +225,8 @@ void addResults(
ReadOnlyKeyValueStore<String, List<Span>> tracesStore,
List<List<Span>> traces,
List<String> traceIds,
KeyValueIterator<Long, Set<String>> spanIds) {
KeyValueIterator<Long, Set<String>> spanIds
) {
spanIds.forEachRemaining(keyValue -> {
for (String traceId : keyValue.value) {
if (!traceIds.contains(traceId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public DependencyStorageTopology(
String dependencyTopic,
Duration dependencyTtl,
Duration dependencyWindowSize,
boolean dependencyQueryEnabled) {
boolean dependencyQueryEnabled
) {
this.dependencyTopic = dependencyTopic;
this.dependencyTtl = dependencyTtl;
this.dependencyWindowSize = dependencyWindowSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public SpanAggregationTopology(
String traceTopic,
String dependencyTopic,
Duration traceTimeout,
boolean aggregationEnabled) {
boolean aggregationEnabled
) {
this.spansTopic = spansTopic;
this.traceTopic = traceTopic;
this.dependencyTopic = dependencyTopic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public TraceStorageTopology(
Duration traceTtlCheckInterval,
long minTracesStored,
boolean traceByIdQueryEnabled,
boolean traceSearchEnabled) {
boolean traceSearchEnabled
) {
this.spansTopic = spansTopic;
this.autoCompleteKeys = autoCompleteKeys;
this.traceTtl = traceTtl;
Expand Down

0 comments on commit 7ee7d0a

Please sign in to comment.