Skip to content
This repository was archived by the owner on Jul 31, 2023. It is now read-only.

Commit d87ad4b

Browse files
committed
draft
1 parent d96a588 commit d87ad4b

File tree

5 files changed

+316
-0
lines changed

5 files changed

+316
-0
lines changed

.gitignore

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,11 @@
2121

2222
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
2323
hs_err_pid*
24+
25+
# Eclipse
26+
.settings
27+
.project
28+
.classpath
29+
30+
# Maven
31+
target

pom.xml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<groupId>de.sfl.kafka.streams</groupId>
4+
<artifactId>ks-dynamic-windows</artifactId>
5+
<version>0.1.0-SNAPSHOT</version>
6+
<name>ks-dynamic-windows</name>
7+
8+
<properties>
9+
<maven.compiler.source>11</maven.compiler.source>
10+
<maven.compiler.target>11</maven.compiler.target>
11+
12+
<kafka.version>2.1.0</kafka.version>
13+
<lombok.version>1.18.4</lombok.version>
14+
<guava.version>27.0.1-jre</guava.version>
15+
</properties>
16+
17+
<dependencies>
18+
<dependency>
19+
<groupId>org.apache.kafka</groupId>
20+
<artifactId>kafka-streams</artifactId>
21+
<version>${kafka.version}</version>
22+
</dependency>
23+
24+
<dependency>
25+
<groupId>org.projectlombok</groupId>
26+
<artifactId>lombok</artifactId>
27+
<version>${lombok.version}</version>
28+
</dependency>
29+
30+
<dependency>
31+
<groupId>com.google.guava</groupId>
32+
<artifactId>guava</artifactId>
33+
<version>${guava.version}</version>
34+
</dependency>
35+
36+
</dependencies>
37+
38+
</project>
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package de.sfl.kafka.streams;
2+
3+
import java.time.Duration;
4+
import java.util.Iterator;
5+
import java.util.NoSuchElementException;
6+
7+
import org.apache.kafka.streams.kstream.internals.WindowingDefaults;
8+
9+
import com.google.common.collect.Range;
10+
11+
import lombok.AccessLevel;
12+
import lombok.Getter;
13+
import lombok.RequiredArgsConstructor;
14+
15+
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
16+
@Getter
17+
public class DynamicWindows implements Iterable<Duration> {
18+
19+
private static final Duration DEFAULT_MINIMUM = Duration.ofSeconds(1L);
20+
21+
private static final Duration DEFAULT_ADVANCE = Duration.ofSeconds(1L);
22+
23+
private static final Duration DEFAULT_RETENTION = Duration
24+
.ofMillis(WindowingDefaults.DEFAULT_RETENTION_MS);
25+
26+
private final Duration minimum;
27+
28+
private final Duration maximum;
29+
30+
private final Duration advanceBy;
31+
32+
private final Duration gracePeriod;
33+
34+
private final Duration retentionTime;
35+
36+
public static DynamicWindows between(Duration minimum, Duration maximum) {
37+
return new DynamicWindows(minimum, maximum, DEFAULT_ADVANCE, null, DEFAULT_RETENTION);
38+
}
39+
40+
public static DynamicWindows maximum(Duration maximum) {
41+
return new DynamicWindows(DEFAULT_MINIMUM, maximum, DEFAULT_ADVANCE, null,
42+
DEFAULT_RETENTION);
43+
}
44+
45+
public DynamicWindows advanceBy(Duration advanceBy) {
46+
return new DynamicWindows(minimum, maximum, advanceBy, gracePeriod, retentionTime);
47+
}
48+
49+
public DynamicWindows gracePeriod(Duration gracePeriod) {
50+
return new DynamicWindows(minimum, maximum, advanceBy, gracePeriod, retentionTime);
51+
}
52+
53+
public DynamicWindows retentionTime(Duration retentionTime) {
54+
return new DynamicWindows(minimum, maximum, advanceBy, gracePeriod, retentionTime);
55+
}
56+
57+
@Override
58+
public Iterator<Duration> iterator() {
59+
return new DurationAdvanceIterator();
60+
}
61+
62+
public boolean isInRange(Duration duration) {
63+
var range = Range.closed(minimum, maximum);
64+
if (range.contains(duration)) {
65+
return duration.minus(minimum).toMillis() % advanceBy.toMillis() == 0;
66+
}
67+
68+
return false;
69+
}
70+
71+
private class DurationAdvanceIterator implements Iterator<Duration> {
72+
73+
private Duration currentElement = minimum;
74+
75+
@Override
76+
public boolean hasNext() {
77+
return currentElement.plus(advanceBy).compareTo(maximum) <= 0;
78+
}
79+
80+
@Override
81+
public Duration next() {
82+
currentElement = currentElement.plus(advanceBy);
83+
if (currentElement.compareTo(maximum) > 0) {
84+
throw new NoSuchElementException();
85+
}
86+
return currentElement;
87+
}
88+
89+
}
90+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package de.sfl.kafka.streams.state;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
import java.util.stream.Collectors;
6+
import java.util.stream.StreamSupport;
7+
8+
import org.apache.kafka.common.utils.Bytes;
9+
import org.apache.kafka.streams.kstream.Windowed;
10+
import org.apache.kafka.streams.processor.ProcessorContext;
11+
import org.apache.kafka.streams.processor.StateStore;
12+
import org.apache.kafka.streams.state.KeyValueIterator;
13+
import org.apache.kafka.streams.state.StoreBuilder;
14+
import org.apache.kafka.streams.state.WindowStore;
15+
import org.apache.kafka.streams.state.WindowStoreIterator;
16+
17+
import de.sfl.kafka.streams.DynamicWindows;
18+
import lombok.RequiredArgsConstructor;
19+
20+
class DynamicWindowStore<K, V> implements WindowStore<K, V> {
21+
22+
private final String name;
23+
24+
private final DynamicWindows windows;
25+
26+
private ProcessorContext context;
27+
28+
private Map<Duration, WindowStore<K, V>> windowStores;
29+
30+
public DynamicWindowStore(String name, DynamicWindows windows) {
31+
this.name = name;
32+
this.windows = windows;
33+
}
34+
35+
Map<String, StoreBuilder<WindowStore<K, V>>> buildInternalStores() {
36+
return StreamSupport.stream(windows.spliterator(), false).collect(
37+
Collectors.toMap(this::buildInternalStoreName, this::createStoreBuilder));
38+
}
39+
40+
private StoreBuilder<WindowStore<K, V>> createStoreBuilder(Duration window) {
41+
// TODO Auto-generated method stub
42+
return null;
43+
}
44+
45+
private String buildInternalStoreName(Duration window) {
46+
return String.format("%s-%s", name, window);
47+
}
48+
49+
@Override
50+
public String name() {
51+
return name;
52+
}
53+
54+
@Override
55+
public void init(ProcessorContext context, StateStore root) {
56+
this.context = context;
57+
}
58+
59+
@Override
60+
public void flush() {
61+
// TODO Auto-generated method stub
62+
63+
}
64+
65+
@Override
66+
public void close() {
67+
// TODO Auto-generated method stub
68+
69+
}
70+
71+
@Override
72+
public boolean persistent() {
73+
// TODO Auto-generated method stub
74+
return false;
75+
}
76+
77+
@Override
78+
public boolean isOpen() {
79+
// TODO Auto-generated method stub
80+
return false;
81+
}
82+
83+
@Override
84+
public V fetch(K key, long time) {
85+
// TODO Auto-generated method stub
86+
return null;
87+
}
88+
89+
@Override
90+
public KeyValueIterator<Windowed<K>, V> all() {
91+
// TODO Auto-generated method stub
92+
return null;
93+
}
94+
95+
@Override
96+
public void put(K key, V value) {
97+
// TODO Auto-generated method stub
98+
99+
}
100+
101+
@Override
102+
public void put(K key, V value, long windowStartTimestamp) {
103+
// TODO Auto-generated method stub
104+
105+
}
106+
107+
@Override
108+
public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
109+
// TODO Auto-generated method stub
110+
return null;
111+
}
112+
113+
@Override
114+
public KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo) {
115+
// TODO Auto-generated method stub
116+
return null;
117+
}
118+
119+
@Override
120+
public KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo) {
121+
// TODO Auto-generated method stub
122+
return null;
123+
}
124+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package de.sfl.kafka.streams.state;
2+
3+
import org.apache.kafka.common.utils.Bytes;
4+
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
5+
import org.apache.kafka.streams.state.WindowStore;
6+
7+
public class DynamicWindowsStoreSupplier implements WindowBytesStoreSupplier {
8+
9+
@Override
10+
public String name() {
11+
// TODO Auto-generated method stub
12+
return null;
13+
}
14+
15+
@Override
16+
public WindowStore<Bytes, byte[]> get() {
17+
// TODO Auto-generated method stub
18+
return null;
19+
}
20+
21+
@Override
22+
public String metricsScope() {
23+
// TODO Auto-generated method stub
24+
return null;
25+
}
26+
27+
@Override
28+
public int segments() {
29+
// TODO Auto-generated method stub
30+
return 0;
31+
}
32+
33+
@Override
34+
public long segmentIntervalMs() {
35+
// TODO Auto-generated method stub
36+
return 0;
37+
}
38+
39+
@Override
40+
public long windowSize() {
41+
// TODO Auto-generated method stub
42+
return 0;
43+
}
44+
45+
@Override
46+
public boolean retainDuplicates() {
47+
return false;
48+
}
49+
50+
@Override
51+
public long retentionPeriod() {
52+
// TODO Auto-generated method stub
53+
return 0;
54+
}
55+
56+
}

0 commit comments

Comments
 (0)