forked from apache/kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
streams.html
342 lines (273 loc) · 18.2 KB
/
streams.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
<!--~
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~-->
<h3><a id="streams_overview" href="#streams_overview">9.1 Overview</a></h3>
<p>
Kafka Streams is a client library for processing and analyzing data stored in Kafka and either write the resulting data back to Kafka or send the final output to an external system. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management of application state.
Kafka Streams has a <b>low barrier to entry</b>: You can quickly write and run a small-scale proof-of-concept on a single machine; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently handles the load balancing of multiple instances of the same application by leveraging Kafka's parallelism model.
</p>
<p>
Some highlights of Kafka Streams:
</p>
<ul>
<li>Designed as a <b>simple and lightweight client library</b>, which can be easily embedded in any Java application and integrated with any existing packaging, deployment and operational tools that users have for their streaming applications.</li>
<li>Has <b>no external dependencies on systems other than Apache Kafka itself</b> as the internal messaging layer; notably, it uses Kafka's partitioning model to horizontally scale processing while maintaining strong ordering guarantees.</li>
<li>Supports <b>fault-tolerant local state</b>, which enables very fast and efficient stateful operations like joins and windowed aggregations.</li>
<li>Employs <b>one-record-at-a-time processing</b> to achieve low processing latency, and supports <b>event-time based windowing operations</b>.</li>
<li>Offers necessary stream processing primitives, along with a <b>high-level Streams DSL</b> and a <b>low-level Processor API</b>.</li>
</ul>
<h3><a id="streams_developer" href="#streams_developer">9.2 Developer Guide</a></h3>
<p>
There is a <a href="#quickstart_kafkastreams">quickstart</a> example that provides how to run a stream processing program coded in the Kafka Streams library.
This section focuses on how to write, configure, and execute a Kafka Streams application.
</p>
<h4><a id="streams_concepts" href="#streams_concepts">Core Concepts</a></h4>
<p>
We first summarize the key concepts of Kafka Streams.
</p>
<h5><a id="streams_topology" href="#streams_topology">Stream Processing Topology</a></h5>
<ul>
<li>A <b>stream</b> is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a <b>data record</b> is defined as a key-value pair.</li>
<li>A stream processing application written in Kafka Streams defines its computational logic through one or more <b>processor topologies</b>, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).</li>
<li>A <b>stream processor</b> is a node in the processor topology; it represents a processing step to transform data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and may subsequently producing one or more output records to its downstream processors.</li>
</ul>
<p>
Kafka Streams offers two ways to define the stream processing topology: the <a href="#streams_dsl"><b>Kafka Streams DSL</b></a> provides
the most common data transformation operations such as <code>map</code> and <code>filter</code>; the lower-level <a href="#streams_processor"><b>Processor API</b></a> allows
developers define and connect custom processors as well as to interact with <a href="#streams_state">state stores</a>.
</p>
<h5><a id="streams_time" href="#streams_time">Time</a></h5>
<p>
A critical aspect in stream processing is the notion of <b>time</b>, and how it is modeled and integrated.
For example, some operations such as <b>windowing</b> are defined based on time boundaries.
</p>
<p>
Common notions of time in streams are:
</p>
<ul>
<li><b>Event time</b> - The point in time when an event or data record occurred, i.e. was originally created "at the source".</li>
<li><b>Processing time</b> - The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed. The processing time may be milliseconds, hours, or days etc. later than the original event time.</li>
</ul>
<p>
Kafka Streams assigns a <b>timestamp</b> to every data record
via the <code>TimestampExtractor</code> interface.
Concrete implementations of this interface may retrieve or compute timestamps based on the actual contents of data records such as an embedded timestamp field
to provide event-time semantics, or use any other approach such as returning the current wall-clock time at the time of processing,
thereby yielding processing-time semantics to stream processing applications.
Developers can thus enforce different notions of time depending on their business needs. For example,
per-record timestamps describe the progress of a stream with regards to time (although records may be out-of-order within the stream) and
are leveraged by time-dependent operations such as joins.
</p>
<h5><a id="streams_state" href="#streams_state">States</a></h5>
<p>
Some stream processing applications don't require state, which means the processing of a message is independent from
the processing of all other messages.
However, being able to maintain state opens up many possibilities for sophisticated stream processing applications: you
can join input streams, or group and aggregate data records. Many such stateful operators are provided by the <a href="#streams_dsl"><b>Kafka Streams DSL</b></a>.
</p>
<p>
Kafka Streams provides so-called <b>state stores</b>, which can be used by stream processing applications to store and query data.
This is an important capability when implementing stateful operations.
Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing.
These state stores can either be a persistent key-value store, an in-memory hashmap, or another convenient data structure.
Kafka Streams offers fault-tolerance and automatic recovery for local state stores.
</p>
<br>
<p>
As we have mentioned above, the computational logic of a Kafka Streams application is defined as a <a href="#streams_topology">processor topology</a>.
Currently Kafka Streams provides two sets of APIs to define the processor topology, which will be described in the subsequent sections.
</p>
<h4><a id="streams_processor" href="#streams_processor">Low-Level Processor API</a></h4>
<h5><a id="streams_processor_process" href="#streams_processor_process">Processor</a></h5>
<p>
Developers can define their customized processing logic by implementing the <code>Processor</code> interface, which
provides <code>process</code> and <code>punctuate</code> methods. The <code>process</code> method is performed on each
of the received record; and the <code>punctuate</code> method is performed periodically based on elapsed time.
In addition, the processor can maintain the current <code>ProcessorContext</code> instance variable initialized in the
<code>init</code> method, and use the context to schedule the punctuation period (<code>context().schedule</code>), to
forward the modified / new key-value pair to downstream processors (<code>context().forward</code>), to commit the current
processing progress (<code>context().commit</code>), etc.
</p>
<pre>
public class MyProcessor extends Processor<String, String> {
private ProcessorContext context;
private KeyValueStore<String, Integer> kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
}
@Override
public void process(String dummy, String line) {
String[] words = line.toLowerCase().split(" ");
for (String word : words) {
Integer oldValue = this.kvStore.get(word);
if (oldValue == null) {
this.kvStore.put(word, 1);
} else {
this.kvStore.put(word, oldValue + 1);
}
}
}
@Override
public void punctuate(long timestamp) {
KeyValueIterator<String, Integer> iter = this.kvStore.all();
while (iter.hasNext()) {
KeyValue<String, Integer> entry = iter.next();
context.forward(entry.key, entry.value.toString());
}
iter.close();
context.commit();
}
@Override
public void close() {
this.kvStore.close();
}
};
</pre>
<p>
In the above implementation, the following actions are performed:
<ul>
<li>In the <code>init</code> method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".</li>
<li>In the <code>process</code> method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).</li>
<li>In the <code>punctuate</code> method, iterate the local state store and send the aggregated counts to the downstream processor, and commit the current stream state.</li>
</ul>
</p>
<h5><a id="streams_processor_topology" href="#streams_processor_topology">Processor Topology</a></h5>
<p>
With the customized processors defined in the Processor API, developers can use the <code>TopologyBuilder</code> to build a processor topology
by connecting these processors together:
<pre>
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
.addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
.addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
.addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
.addSink("SINK1", "sink-topic1", "PROCESS1")
.addSink("SINK2", "sink-topic2", "PROCESS2")
.addSink("SINK3", "sink-topic3", "PROCESS3");
</pre>
There are several steps in the above code to build the topology, and here is a quick walk through:
<ul>
<li>First of all a source node named "SOURCE" is added to the topology using the <code>addSource</code> method, with one Kafka topic "src-topic" fed to it.</li>
<li>Three processor nodes are then added using the <code>addProcessor</code> method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.</li>
<li>Finally three sink nodes are added to complete the topology using the <code>addSink</code> method, each piping from a different parent processor node and writing to a separate topic.</li>
</ul>
</p>
<h5><a id="streams_processor_statestore" href="#streams_processor_statestore">Local State Store</a></h5>
<p>
Note that the Processor API is not limited to only accessing the current records as they arrive, but can also maintain local state stores
that keep recently arrived records to use in stateful processing operations such as aggregation or windowed joins.
To take advantage of this local states, developers can use the <code>TopologyBuilder.addStateStore</code> method when building the
processor topology to create the local state and associate it with the processor nodes that needs to access it; or they can connect a created
local state store with the existing processor nodes through <code>TopologyBuilder.connectProcessorAndStateStores</code>.
<pre>
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "src-topic")
.addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
// create the in-memory state store "COUNTS" associated with processor "PROCESS1"
.addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
.addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
.addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
// connect the state store "COUNTS" with processor "PROCESS2"
.connectProcessorAndStateStores("PROCESS2", "COUNTS");
.addSink("SINK1", "sink-topic1", "PROCESS1")
.addSink("SINK2", "sink-topic2", "PROCESS2")
.addSink("SINK3", "sink-topic3", "PROCESS3");
</pre>
</p>
In the next section we present another way to build the processor topology: the Kafka Streams DSL.
<h4><a id="streams_dsl" href="#streams_dsl">High-Level Streams DSL</a></h4>
To build a processor topology using the Streams DSL, developers can apply the <code>KStreamBuilder</code> class, which is extended from the <code>TopologyBuilder</code>.
A simple example is included with the source code for Kafka in the <code>streams/examples</code> package. The rest of this section will walk
through some code to demonstrate the key steps in creating a topology using the Streams DSL, but we recommend developers to read the full example source
codes for details.
<h5><a id="streams_dsl_source" href="#streams_dsl_source">Create Source Streams from Kafka</a></h5>
<p>
Either a <b>record stream</b> (defined as <code>KStream</code>) or a <b>changelog stream</b> (defined as <code>KTable</code>)
can be created as a source stream from one or more Kafka topics (for <code>KTable</code> you can only create the source stream
from a single topic).
</p>
<pre>
KStreamBuilder builder = new KStreamBuilder();
KStream<String, GenericRecord> source1 = builder.stream("topic1", "topic2");
KTable<String, GenericRecord> source2 = builder.table("topic3", "stateStoreName");
</pre>
<h5><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform a stream</a></h5>
<p>
There is a list of transformation operations provided for <code>KStream</code> and <code>KTable</code> respectively.
Each of these operations may generate either one or more <code>KStream</code> and <code>KTable</code> objects and
can be translated into one or more connected processors into the underlying processor topology.
All these transformation methods can be chained together to compose a complex processor topology.
Since <code>KStream</code> and <code>KTable</code> are strongly typed, all these transformation operations are defined as
generics functions where users could specify the input and output data types.
</p>
<p>
Among these transformations, <code>filter</code>, <code>map</code>, <code>mapValues</code>, etc, are stateless
transformation operations and can be applied to both <code>KStream</code> and <code>KTable</code>,
where users can usually pass a customized function to these functions as a parameter, such as <code>Predicate</code> for <code>filter</code>,
<code>KeyValueMapper</code> for <code>map</code>, etc:
</p>
<pre>
// written in Java 8+, using lambda expressions
KStream<String, GenericRecord> mapped = source1.mapValue(record -> record.get("category"));
</pre>
<p>
Stateless transformations, by definition, do not depend on any state for processing, and hence implementation-wise
they do not require a state store associated with the stream processor; Stateful transformations, on the other hand,
require accessing an associated state for processing and producing outputs.
For example, in <code>join</code> and <code>aggregate</code> operations, a windowing state is usually used to store all the received records
within the defined window boundary so far. The operators can then access these accumulated records in the store and compute
based on them.
</p>
<pre>
// written in Java 8+, using lambda expressions
KTable<Windowed<String>, Long> counts = source1.groupByKey().aggregate(
() -> 0L, // initial value
(aggKey, value, aggregate) -> aggregate + 1L, // aggregating value
TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
Serdes.Long() // serde for aggregated value
);
KStream<String, String> joined = source1.leftJoin(source2,
(record1, record2) -> record1.get("user") + "-" + record2.get("region");
);
</pre>
<h5><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back to Kafka</a></h5>
<p>
At the end of the processing, users can choose to (continuously) write the final resulted streams back to a Kafka topic through
<code>KStream.to</code> and <code>KTable.to</code>.
</p>
<pre>
joined.to("topic4");
</pre>
If your application needs to continue reading and processing the records after they have been materialized
to a topic via <code>to</code> above, one option is to construct a new stream that reads from the output topic;
Kafka Streams provides a convenience method called <code>through</code>:
<pre>
// equivalent to
//
// joined.to("topic4");
// materialized = builder.stream("topic4");
KStream<String, String> materialized = joined.through("topic4");
</pre>
<br>
<p>
Besides defining the topology, developers will also need to configure their applications
in <code>StreamsConfig</code> before running it. A complete list of
Kafka Streams configs can be found <a href="#streamsconfigs"><b>here</b></a>.
</p>