Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mount storage service on zipkin-server #58

Merged
merged 10 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker/single-instance/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ services:
hostname: zipkin # required to route call to scatter-gather endpoint properly. should not be needed after #40 is solved
ports:
- 9411:9411
- 9412:9412
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka-zookeeper:9092
KAFKA_STORAGE_DIR: /data
Expand Down
6 changes: 5 additions & 1 deletion module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@
<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria-spring-boot-autoconfigure</artifactId>
<version>${armeria.version}</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,40 @@
*/
package zipkin2.module.storage.kafka;

import com.linecorp.armeria.spring.ArmeriaServerConfigurator;
import java.util.List;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zipkin2.storage.StorageComponent;
import zipkin2.storage.kafka.KafkaStorage;

@Configuration
import static zipkin2.storage.kafka.KafkaStorage.HTTP_PATH_PREFIX;

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(ZipkinKafkaStorageProperties.class)
@ConditionalOnProperty(name = "zipkin.storage.type", havingValue = "kafka")
@ConditionalOnMissingBean(StorageComponent.class)
class ZipkinKafkaStorageModule {

@Bean
@ConditionalOnMissingBean
StorageComponent storage(ZipkinKafkaStorageProperties properties) {
return properties.toBuilder().build();
@ConditionalOnMissingBean @Bean StorageComponent storage(
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocomplete-keys:}") List<String> autocompleteKeys,
@Value("${server.port:9411}") int port,
ZipkinKafkaStorageProperties properties) {
return properties.toBuilder()
.searchEnabled(searchEnabled)
.autocompleteKeys(autocompleteKeys)
.serverPort(port)
.build();
}

// TODO: to be changed when >zipkin 2.18.4 #61
// @Bean public Consumer<ServerBuilder> storageHttpService(StorageComponent storage) {
@Bean public ArmeriaServerConfigurator storageHttpService(StorageComponent storage) {
return sb -> sb.annotatedService(HTTP_PATH_PREFIX, ((KafkaStorage) storage).httpService());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ KafkaStorageBuilder toBuilder() {
if (dependencyStoreStreamAppId != null) {
builder.dependencyStoreStreamAppId(dependencyStoreStreamAppId);
}

return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
/** opens package access for testing */
public final class Access {
public static void registerKafka(AnnotationConfigApplicationContext context) {
context.register(
PropertyPlaceholderAutoConfiguration.class, ZipkinKafkaStorageModule.class);
context.register(PropertyPlaceholderAutoConfiguration.class, ZipkinKafkaStorageModule.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class ZipkinKafkaStorageModuleTest {
class ZipkinKafkaStorageModuleTest {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();

@AfterEach void close() {
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<zipkin.groupId>io.zipkin.zipkin2</zipkin.groupId>
<zipkin.version>2.18.3</zipkin.version>
<armeria.groupId>com.linecorp.armeria</armeria.groupId>
<armeria.version>0.94.0</armeria.version>
<armeria.version>0.95.0</armeria.version>
<spring-boot.version>2.2.0.RELEASE</spring-boot.version>

<log4j.version>2.12.1</log4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
/**
* Span store backed by Kafka Stream distributed state stores built by {@link
* TraceStoreTopologySupplier} and {@link DependencyStoreTopologySupplier}, and made accessible by
* {@link KafkaStoreHttpService}.
* {@link KafkaStorageHttpService}.
*/
final class KafkaSpanStore implements SpanStore, Traces, ServiceAndSpanNames {
static final ObjectMapper MAPPER = new ObjectMapper();
Expand Down
30 changes: 5 additions & 25 deletions storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
* </ul>
*/
public class KafkaStorage extends StorageComponent {
public static final String HTTP_PATH_PREFIX = "/storage/kafka";

static final Logger LOG = LogManager.getLogger();

public static KafkaStorageBuilder newBuilder() {
Expand Down Expand Up @@ -181,7 +183,6 @@ void checkResources() {
if (searchEnabled) {
getTraceStoreStream();
getDependencyStoreStream();
getServer();
}
}

Expand All @@ -207,9 +208,6 @@ void checkResources() {
return CheckResult.failed(
new IllegalStateException("Store stream not running. " + dependencyStateStore));
}
if (!getServer().activePort().isPresent()) {
return CheckResult.failed(new IllegalStateException("Storage HTTP server not running."));
}
}
return CheckResult.OK;
} catch (Exception e) {
Expand Down Expand Up @@ -323,31 +321,13 @@ KafkaStreams getAggregationStream() {
return traceAggregationStream;
}

@SuppressWarnings("FutureReturnValueIgnored")
Server getServer() {
if (server == null) {
synchronized (this) {
if (server == null) {
try {
server = Server.builder()
.http(httpPort)
.annotatedService(new KafkaStoreHttpService(this))
.build();
server.start();
} catch (Exception e) {
LOG.error("Error starting http server", e);
server = null;
}
}
}
}
return server;
public KafkaStorageHttpService httpService() {
return new KafkaStorageHttpService(this);
}

@Override public String toString() {
return "KafkaStorage{" +
"httpPort=" + httpPort +
", spanConsumerEnabled=" + spanConsumerEnabled +
"spanConsumerEnabled=" + spanConsumerEnabled +
", searchEnabled=" + searchEnabled +
", storageDir='" + storageDir + '\'' +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.kafka.streams.StreamsConfig;
import zipkin2.storage.StorageComponent;

import static zipkin2.storage.kafka.KafkaStorage.HTTP_PATH_PREFIX;

// extracted as the type is huge
public final class KafkaStorageBuilder extends StorageComponent.Builder {
boolean spanConsumerEnabled = true;
Expand All @@ -42,9 +44,9 @@ public final class KafkaStorageBuilder extends StorageComponent.Builder {

long minTracesStored = 10_000;
String hostname = "localhost";
int httpPort = 9412;
int httpPort = 9411;
BiFunction<String, Integer, String> httpBaseUrl =
(hostname, port) -> "http://" + hostname + ":" + port;
(hostname, port) -> "http://" + hostname + ":" + port + HTTP_PATH_PREFIX;

String storageDir = "/tmp/zipkin-storage-kafka";

Expand Down Expand Up @@ -139,7 +141,7 @@ public KafkaStorageBuilder hostname(String hostname) {
return this;
}

public KafkaStorageBuilder httpPort(int httpPort) {
public KafkaStorageBuilder serverPort(int httpPort) {
this.httpPort = httpPort;
traceStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo());
dependencyStoreStreamConfig.put(StreamsConfig.APPLICATION_SERVER_CONFIG, hostInfo());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.annotation.Default;
import com.linecorp.armeria.server.annotation.Get;
import com.linecorp.armeria.server.annotation.Param;
Expand All @@ -31,7 +30,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.KeyValueIterator;
Expand Down Expand Up @@ -65,22 +63,18 @@
* distributed state. This component exposes access to local state via Http call from {@link
* KafkaSpanStore}
*/
final class KafkaStoreHttpService implements Consumer<ServerBuilder> {
final class KafkaStorageHttpService {
static final Logger LOG = LogManager.getLogger();
static final ObjectMapper MAPPER = new ObjectMapper();

final KafkaStorage storage;
final long minTracesStored;

KafkaStoreHttpService(KafkaStorage storage) {
KafkaStorageHttpService(KafkaStorage storage) {
this.storage = storage;
this.minTracesStored = storage.minTracesStored;
}

@Override public void accept(ServerBuilder serverBuilder) {
serverBuilder.annotatedService("/zipkin/storage/kafka", this);
}

@Get("/dependencies")
public AggregatedHttpResponse getDependencies(
@Param("endTs") long endTs,
Expand Down
11 changes: 10 additions & 1 deletion storage/src/test/java/zipkin2/storage/kafka/KafkaStorageIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package zipkin2.storage.kafka;

import com.linecorp.armeria.server.Server;
import java.io.IOException;
import java.net.ServerSocket;
import java.time.Duration;
Expand Down Expand Up @@ -61,6 +62,7 @@ class KafkaStorageIT {

Duration traceTimeout;
KafkaStorage storage;
Server server;
Properties consumerConfig;
KafkaProducer<String, List<Span>> tracesProducer;
KafkaProducer<String, DependencyLink> dependencyProducer;
Expand All @@ -77,12 +79,18 @@ class KafkaStorageIT {
assertThat(kafka.isRunning()).isTrue();

traceTimeout = Duration.ofSeconds(5);
int httpPort = randomPort();
storage = (KafkaStorage) KafkaStorage.newBuilder()
.bootstrapServers(kafka.getBootstrapServers())
.storageDir("target/zipkin_" + System.currentTimeMillis())
.traceTimeout(traceTimeout)
.httpPort(randomPort())
.serverPort(httpPort)
.build();
server = Server.builder()
.http(httpPort)
.annotatedService(KafkaStorage.HTTP_PATH_PREFIX, storage.httpService())
.build();
server.start();

Collection<NewTopic> newTopics = new ArrayList<>();
newTopics.add(new NewTopic(storage.aggregationSpansTopic, 1, (short) 1));
Expand All @@ -107,6 +115,7 @@ class KafkaStorageIT {
tracesProducer = null;
storage.close();
storage = null;
server.close();
spansSerde.close();
dependencyLinkSerde.close();
}
Expand Down