Refactoring streams and storage #25

Merged
merged 84 commits into from
Aug 23, 2019
Changes from 2 commits
6794c8a
chore: update to kstreams 2.3
jeqo Aug 5, 2019
1b9e4fe
refactor: draft aggregation and indexing without lucene
jeqo Aug 5, 2019
546171a
refactor: dependency link aggregation
jeqo Aug 6, 2019
420f706
refactor: compilable set of changes;
jeqo Aug 7, 2019
f4f4fc1
chore: fix formatting
jeqo Aug 7, 2019
af02905
fix: failing tests
jeqo Aug 7, 2019
2c3e799
chore: deprecate and format
jeqo Aug 7, 2019
21214e2
chore: update docker image for it
jeqo Aug 7, 2019
c7f1d64
fix: remote service call and store names
jeqo Aug 7, 2019
0690142
feat: testing suppress on trace aggregation
jeqo Aug 7, 2019
5d42256
feat: unit testing aggregation
jeqo Aug 8, 2019
de706f6
feat: testing traces retention
jeqo Aug 8, 2019
a83fa22
feat: complete streams unit testing
jeqo Aug 9, 2019
c28fd60
feat: it testing trace queries;
jeqo Aug 9, 2019
c0ef7d6
feat: it test for dependencies
jeqo Aug 9, 2019
5d07318
refactor: join stream apps into 2
jeqo Aug 9, 2019
2e55020
chore: remove old comment
jeqo Aug 9, 2019
8627fa5
chore: remove unused plugin
jeqo Aug 9, 2019
f298585
docs: add methods description
jeqo Aug 9, 2019
506fc6d
chore: update versions
jeqo Aug 9, 2019
3832e66
feat: autocomplete tags support
jeqo Aug 10, 2019
44dd4a8
feat: moving to zipkin/distroless images
jeqo Aug 10, 2019
9439dd7
feat: docker image renewed;
jeqo Aug 10, 2019
1a75c0d
fix: failing test
jeqo Aug 10, 2019
a4d732b
chore: clean properties
jeqo Aug 10, 2019
0bc72f9
fix: execution handling and code reuse
jeqo Aug 11, 2019
19d2764
chore: add todo for async call
jeqo Aug 11, 2019
1ef4c4e
fix: issue with dependency query
jeqo Aug 11, 2019
6968287
chore: close producer
jeqo Aug 11, 2019
fb055f2
chore: increase ti
jeqo Aug 11, 2019
e3302f7
chore: trying to chase issue with travis test
jeqo Aug 11, 2019
3a92195
chore: trying to chase issue with travis test
jeqo Aug 11, 2019
1fd43e6
docs: update stream images
jeqo Aug 11, 2019
c2ce59c
fix: ensure topics are created before running tests
jeqo Aug 11, 2019
53e8a7a
chore: remove non needed print exception stack
jeqo Aug 11, 2019
70aa7e7
chore: update configs and docs;
jeqo Aug 11, 2019
2668613
chore: var names and logging
jeqo Aug 12, 2019
9c21aae
chore: var names and logging
jeqo Aug 12, 2019
eb28ea8
chore: var names and logging
jeqo Aug 12, 2019
e69fc21
chore: var names and logging
jeqo Aug 12, 2019
10ab62e
chore: var names and logging
jeqo Aug 12, 2019
4687d83
feat: aligning configs and docker
jeqo Aug 12, 2019
2795943
fix: topic name in tests
jeqo Aug 12, 2019
551b252
chore: update docs and configs
jeqo Aug 12, 2019
f9562f7
chore: doc mounting custom libs
jeqo Aug 12, 2019
67627c2
fix: dependency-links to dependencies
jeqo Aug 12, 2019
e7920e7
docs: add steps to test
jeqo Aug 12, 2019
0c5b9a2
fix: issue with inactivity gap variable
jeqo Aug 13, 2019
9be6a84
chore: simplify link mapping and clean imports
jeqo Aug 13, 2019
e2667c6
docs: add ack to ksteram viz app
jeqo Aug 13, 2019
49bac94
fix: topic name
jeqo Aug 16, 2019
7db654d
chore: add container names
jeqo Aug 16, 2019
428d1d3
chore: update versions
jeqo Aug 16, 2019
1428589
feat: remove topics details and rename vars
jeqo Aug 16, 2019
cca270a
docs: more specific naming
jeqo Aug 16, 2019
a06b8d5
docs: remove too specific variables and simplify config
jeqo Aug 16, 2019
59d29a0
fix: store dir test
jeqo Aug 16, 2019
5175d97
feat: add clients overrides
jeqo Aug 17, 2019
f39db2a
fix: overrides
jeqo Aug 17, 2019
4e889d0
chore: remove 'all' filter
jeqo Aug 18, 2019
95d7c62
chore: rename kafka topic env variables
jeqo Aug 18, 2019
1876dae
docs: update env list
jeqo Aug 18, 2019
ccdd485
feat: back to separated stores to support dependency only use-case
jeqo Aug 19, 2019
f0dada6
docs: updating environments and doc of new approach
jeqo Aug 19, 2019
fdf3960
chore: simplifying testing
jeqo Aug 19, 2019
6aa3b23
chore: rename vars
jeqo Aug 19, 2019
dc54c59
chore: add suppress warning for ignored future
jeqo Aug 19, 2019
be2ded5
feat: rename profile and storage to kafka
jeqo Aug 19, 2019
671125e
docs: component javadocs
jeqo Aug 21, 2019
6a4ad2d
docs: add graph details
jeqo Aug 21, 2019
d4bc461
docs: add graph details
jeqo Aug 21, 2019
fe5f8b0
chore: fix acks
jeqo Aug 21, 2019
a22ee71
update base zipkin version
jeqo Aug 22, 2019
6e4056b
chore: rename from span to spans topic
jeqo Aug 22, 2019
120ae46
Merge branch 'develop' of github.com:jeqo/zipkin-storage-kafka into d…
jeqo Aug 22, 2019
139379d
refactor: simplify config names
jeqo Aug 22, 2019
76c997a
docs: add topics list;
jeqo Aug 22, 2019
d46afc0
fix: joining spans
jeqo Aug 22, 2019
0cba428
chore: update javadocs, config passing, and minimum traces stored.
jeqo Aug 22, 2019
cbba4c8
chore: small fixes
jeqo Aug 22, 2019
6365b87
chore: rename vars
jeqo Aug 23, 2019
a74f862
refactor: rename to flush interval
jeqo Aug 23, 2019
48e87a9
docs: add details about inactivity gap
jeqo Aug 23, 2019
7206365
refactor: more renames
jeqo Aug 23, 2019
8 changes: 8 additions & 0 deletions DESIGN.md
@@ -51,6 +51,14 @@ defines if a trace is still active or not. This is evaluated on the next span re
regardless of incoming `traceId`. If session window is closed, a trace message is emitted to the
traces topic.

+ ![Session Windows](https://kafka.apache.org/20/images/streams-session-windows-02.png)
+
+ Each color represents a trace. The longer the `inactivity gap` (default: 1 minute), the longer we wait
+ to close a window and to emit traces downstream for dependency links and additional
+ aggregations, but the more consistent the trace aggregation is.
+ If we choose a smaller gap, we emit traces sooner, at the risk of breaking traces into
+ smaller chunks and potentially affecting counters downstream.
+
**Dependencies**

Once `traces` are emitted downstream as part of the initial processing, dependency links are evaluated
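
The inactivity-gap trade-off described in the DESIGN.md hunk above can be illustrated with a small plain-Java simulation. This is only a sketch: it does not use the Kafka Streams session-window API, and the class name `SessionGapSketch`, the method `sessions`, and the sample timestamps are hypothetical names introduced for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of gap-based sessions; this is NOT the Kafka Streams API. */
final class SessionGapSketch {

  /**
   * Splits the sorted span timestamps (epoch millis) of one trace into sessions.
   * A new session starts whenever the pause since the previous span exceeds the gap.
   */
  static List<List<Long>> sessions(List<Long> sortedTimestamps, Duration inactivityGap) {
    List<List<Long>> result = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    for (long ts : sortedTimestamps) {
      boolean gapExceeded = !current.isEmpty()
          && ts - current.get(current.size() - 1) > inactivityGap.toMillis();
      if (gapExceeded) {
        result.add(current); // close the current session
        current = new ArrayList<>();
      }
      current.add(ts);
    }
    if (!current.isEmpty()) result.add(current);
    return result;
  }

  public static void main(String[] args) {
    // Hypothetical trace with spans at 0s, 10s, 50s and 55s.
    List<Long> trace = Arrays.asList(0L, 10_000L, 50_000L, 55_000L);
    System.out.println(sessions(trace, Duration.ofMinutes(1)).size());
    System.out.println(sessions(trace, Duration.ofSeconds(30)).size());
  }
}
```

With the one-minute gap, the 40-second pause between the second and third span stays inside a single session. With a 30-second gap the same trace is split into two chunks, which is the risk the text mentions for downstream counters.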
@@ -28,7 +28,7 @@ public class ZipkinKafkaStorageProperties implements Serializable {

private String bootstrapServers;

- private Long traceTtlCheckInterval;
+ private Long traceFlushInterval;
private Long traceTtl;
private Long traceInactivityGap;

@@ -60,8 +60,8 @@ KafkaStorage.Builder toBuilder() {
if (traceInactivityGap != null) {
builder.traceInactivityGap(Duration.ofMillis(traceInactivityGap));
}
- if (traceTtlCheckInterval != null) {
- builder.traceTtlCheckInterval(Duration.ofMillis(traceTtlCheckInterval));
+ if (traceFlushInterval != null) {
+ builder.traceFlushInterval(Duration.ofMillis(traceFlushInterval));
}
if (traceTtl != null) {
builder.traceTtl(Duration.ofMillis(traceTtl));
@@ -110,12 +110,12 @@ public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

- public Long getTraceTtlCheckInterval() {
- return traceTtlCheckInterval;
+ public Long getTraceFlushInterval() {
+ return traceFlushInterval;
}

- public void setTraceTtlCheckInterval(Long traceTtlCheckInterval) {
- this.traceTtlCheckInterval = traceTtlCheckInterval;
+ public void setTraceFlushInterval(Long traceFlushInterval) {
+ this.traceFlushInterval = traceFlushInterval;
}

public Long getTraceTtl() {
2 changes: 1 addition & 1 deletion autoconfigure/src/main/resources/zipkin-server-kafka.yml
@@ -15,7 +15,7 @@ zipkin:

span-consumer-enabled: true

- trace-ttl-check-interval: ${KAFKA_STORAGE_TRACE_TTL_CHECK_INTERVAL:3600000}
+ trace-flush-interval: ${KAFKA_STORAGE_TRACE_FLUSH_INTERVAL:3600000}
trace-ttl: ${KAFKA_STORAGE_TRACE_TTL:259200000}
trace-inactivity-gap: ${KAFKA_STORAGE_TRACE_INACTIVITY_GAP:30000}
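
The YAML defaults above are plain millisecond values that `toBuilder()` turns into `Duration`s behind a null check. As a rough sketch of that mapping, the snippet below converts the three defaults; the class `DefaultDurationsSketch` and the helper `fromMillis` are hypothetical and only the numeric values come from the configuration file.

```java
import java.time.Duration;

/** Hypothetical helper; only the numeric defaults come from zipkin-server-kafka.yml. */
final class DefaultDurationsSketch {

  /** Mirrors the null-guarded Duration.ofMillis(...) mapping: unset values keep the fallback. */
  static Duration fromMillis(Long millis, Duration fallback) {
    return millis != null ? Duration.ofMillis(millis) : fallback;
  }

  public static void main(String[] args) {
    System.out.println(fromMillis(3_600_000L, null));   // trace-flush-interval default
    System.out.println(fromMillis(259_200_000L, null)); // trace-ttl default
    System.out.println(fromMillis(30_000L, null));      // trace-inactivity-gap default
    System.out.println(fromMillis(null, Duration.ofMinutes(1))); // unset: fallback is kept
  }
}
```

The three defaults work out to 1 hour, 3 days and 30 seconds. Note that the 30-second YAML default for the inactivity gap differs from the `Duration.ofMinutes(1)` default in `KafkaStorage.Builder` and from the "default: 1 minute" stated in DESIGN.md, so the effective value presumably depends on whether the server configuration or the bare builder is used.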

12 changes: 6 additions & 6 deletions storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java
@@ -108,7 +108,7 @@ public class KafkaStorage extends StorageComponent {
traceStoreTopology = new TraceStoreTopologySupplier(
spansTopicName,
autocompleteKeys,
- builder.traceTtlCheckInterval,
+ builder.traceFlushInterval,
builder.traceTtl,
builder.minTracesStored).get();
dependencyStoreTopology = new DependencyStoreTopologySupplier(
@@ -342,7 +342,7 @@ public static class Builder extends StorageComponent.Builder {
List<String> autocompleteKeys = new ArrayList<>();

Duration traceTtl = Duration.ofDays(3);
- Duration traceTtlCheckInterval = Duration.ofHours(1);
+ Duration traceFlushInterval = Duration.ofHours(1);
Duration traceInactivityGap = Duration.ofMinutes(1);
Duration dependencyTtl = Duration.ofDays(7);
Duration dependencyWindowSize = Duration.ofMinutes(1);
@@ -537,11 +537,11 @@ public Builder storeDirectory(String storeDirectory) {
/**
* Frequency to check retention policy.
*/
- public Builder traceTtlCheckInterval(Duration traceTtlCheckInterval) {
- if (traceTtlCheckInterval == null) {
- throw new NullPointerException("traceTtlCheckInterval == null");
+ public Builder traceFlushInterval(Duration traceFlushInterval) {
+ if (traceFlushInterval == null) {
+ throw new NullPointerException("traceFlushInterval == null");
}
- this.traceTtlCheckInterval = traceTtlCheckInterval;
+ this.traceFlushInterval = traceFlushInterval;
return this;
}

@@ -55,20 +55,20 @@ public class TraceStoreTopologySupplier implements Supplier<Topology> {
// Kafka topics
final String spansTopicName;
// Limits
- final List<String> autocompleteKeys;
- final Duration tracesGcInterval;
+ final List<String> autoCompleteKeys;
+ final Duration tracesFlushInterval;
final Duration tracesTtl;
final long minTracesStored;
// SerDes
final SpansSerde spansSerde;
final SpanIdsSerde spanIdsSerde;
final NamesSerde namesSerde;

- public TraceStoreTopologySupplier(String spansTopicName, List<String> autocompleteKeys,
- Duration tracesGcInterval, Duration tracesTtl, long minTracesStored) {
+ public TraceStoreTopologySupplier(String spansTopicName, List<String> autoCompleteKeys,
+ Duration tracesFlushInterval, Duration tracesTtl, long minTracesStored) {
this.spansTopicName = spansTopicName;
- this.autocompleteKeys = autocompleteKeys;
- this.tracesGcInterval = tracesGcInterval;
+ this.autoCompleteKeys = autoCompleteKeys;
+ this.tracesFlushInterval = tracesFlushInterval;
this.tracesTtl = tracesTtl;
this.minTracesStored = minTracesStored;
spansSerde = new SpansSerde();
@@ -124,7 +124,7 @@ public TraceStoreTopologySupplier(String spansTopicName, List<String> autocomple
(KeyValueStore<Long, Set<String>>) context.getStateStore(SPAN_IDS_BY_TS_STORE_NAME);
// Retention scheduling
context.schedule(
- tracesGcInterval,
+ tracesFlushInterval,
PunctuationType.STREAM_TIME,
timestamp -> {
if (tracesTtl.toMillis() > 0 &&
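
The hunk above is cut off inside the punctuator's condition, so the actual retention logic is not visible here. As a rough idea of what a TTL-based flush over a timestamp-indexed store could look like, here is a plain-Java sketch. Everything in it is an assumption for illustration: a `TreeMap` stands in for the state store, and `RetentionSketch`, `expire`, and the way `minTracesStored` is applied are hypothetical, not the PR's implementation.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java stand-in for a timestamp-indexed store; NOT the Kafka Streams state-store API. */
final class RetentionSketch {

  /**
   * Removes trace ids indexed at or before (now - ttl) and returns them.
   * ASSUMPTION: nothing is removed when ttl is zero or negative (retention disabled),
   * or while the index holds fewer than minTracesStored timestamps.
   */
  static List<String> expire(TreeMap<Long, Set<String>> idsByTimestamp,
      long nowMillis, Duration ttl, long minTracesStored) {
    List<String> expired = new ArrayList<>();
    if (ttl.toMillis() <= 0 || idsByTimestamp.size() < minTracesStored) return expired;
    long cutoff = nowMillis - ttl.toMillis();
    // headMap(cutoff, true) is a live view of all entries with timestamp <= cutoff.
    Iterator<Map.Entry<Long, Set<String>>> it =
        idsByTimestamp.headMap(cutoff, true).entrySet().iterator();
    while (it.hasNext()) {
      expired.addAll(it.next().getValue());
      it.remove(); // removing through the view also removes from the backing TreeMap
    }
    return expired;
  }
}
```

In the real topology this kind of check would run inside the callback passed to `context.schedule(...)`, once per `tracesFlushInterval` of stream time.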
@@ -216,7 +216,7 @@ public void process(String traceId, List<Span> spans) {
}
if (!span.tags().isEmpty()) {
span.tags().forEach((key, value) -> {
- if (autocompleteKeys.contains(key)) {
+ if (autoCompleteKeys.contains(key)) {
Set<String> values = autocompleteTagsStore.get(key);
if (values == null) values = new HashSet<>();
values.add(value);
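
The tag-indexing step in this hunk (look up the current value set, create it if missing, add the new value) can be tried out in isolation. The sketch below assumes a `HashMap` in place of the autocomplete-tags state store; `AutocompleteTagsSketch` and `index` are hypothetical names, and `computeIfAbsent` is swapped in for the explicit null check.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** A Map stands in for the autocomplete-tags state store (assumption for illustration). */
final class AutocompleteTagsSketch {

  /** Records the value of every span tag whose key is configured for autocomplete. */
  static void index(Map<String, String> spanTags, List<String> autoCompleteKeys,
      Map<String, Set<String>> valuesByKey) {
    spanTags.forEach((key, value) -> {
      if (autoCompleteKeys.contains(key)) {
        // Create the value set on first use, then add the new value.
        valuesByKey.computeIfAbsent(key, k -> new HashSet<>()).add(value);
      }
    });
  }
}
```

A real `KeyValueStore` has no `computeIfAbsent`, so the get, modify, put sequence would remain necessary there.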