Skip to content

Commit d973c4a

Browse files
committed
Add first examples for Chapter 6
1 parent dd9d201 commit d973c4a

File tree

2 files changed

+310
-0
lines changed

2 files changed

+310
-0
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright 2015 Fabian Hueske / Vasia Kalavri
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.github.streamingwithflink.chapter6;
17+
18+
import io.github.streamingwithflink.util.SensorReading;
19+
import io.github.streamingwithflink.util.SensorSource;
20+
import org.apache.flink.api.common.state.ValueState;
21+
import org.apache.flink.api.common.state.ValueStateDescriptor;
22+
import org.apache.flink.api.common.typeinfo.Types;
23+
import org.apache.flink.api.java.tuple.Tuple2;
24+
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.streaming.api.TimeCharacteristic;
26+
import org.apache.flink.streaming.api.datastream.DataStream;
27+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
28+
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
29+
import org.apache.flink.util.Collector;
30+
31+
/**
32+
* This example shows how to use a CoProcessFunction and Timers.
33+
*/
34+
public class CoProcessFunctionTimers {
35+
36+
public static void main(String[] args) throws Exception {
37+
38+
// set up the streaming execution environment
39+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
40+
41+
// use event time for the application
42+
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
43+
44+
// switch messages disable filtering of sensor readings for a specific amount of time
45+
DataStream<Tuple2<String, Long>> filterSwitches = env
46+
.fromElements(
47+
// forward readings of sensor_2 for 10 seconds
48+
Tuple2.of("sensor_2", 10_000L),
49+
// forward readings of sensor_7 for 1 minute
50+
Tuple2.of("sensor_7", 60_000L));
51+
52+
// ingest sensor stream
53+
DataStream<SensorReading> readings = env
54+
// SensorSource generates random temperature readings
55+
.addSource(new SensorSource());
56+
57+
DataStream<SensorReading> forwardedReadings = readings
58+
// connect readings and switches
59+
.connect(filterSwitches)
60+
// key by sensor ids
61+
.keyBy(r -> r.id, s -> s.f0)
62+
// apply filtering CoProcessFunction
63+
.process(new ReadingFilter());
64+
65+
forwardedReadings.print();
66+
67+
env.execute("Filter sensor readings");
68+
}
69+
70+
public static class ReadingFilter extends CoProcessFunction<SensorReading, Tuple2<String, Long>, SensorReading> {
71+
72+
// switch to enable forwarding
73+
private ValueState<Boolean> forwardingEnabled;
74+
// timestamp to disable the currently active timer
75+
private ValueState<Long> disableTimer;
76+
77+
@Override
78+
public void open(Configuration parameters) throws Exception {
79+
forwardingEnabled = getRuntimeContext().getState(
80+
new ValueStateDescriptor<>("filterSwitch", Types.BOOLEAN));
81+
disableTimer = getRuntimeContext().getState(
82+
new ValueStateDescriptor<Long>("timer", Types.LONG));
83+
}
84+
85+
@Override
86+
public void processElement1(SensorReading r, Context ctx, Collector<SensorReading> out) throws Exception {
87+
// check if we need to forward the reading
88+
Boolean forward = forwardingEnabled.value();
89+
if (forward != null && forward) {
90+
out.collect(r);
91+
}
92+
}
93+
94+
@Override
95+
public void processElement2(Tuple2<String, Long> s, Context ctx, Collector<SensorReading> out) throws Exception {
96+
// enable forwarding of readings
97+
forwardingEnabled.update(true);
98+
// set timer to disable switch
99+
long timerTimestamp = ctx.timerService().currentProcessingTime() + s.f1;
100+
Long curTimerTimestamp = disableTimer.value();
101+
if (curTimerTimestamp == null || timerTimestamp > curTimerTimestamp) {
102+
// remove current timer
103+
if (curTimerTimestamp != null) {
104+
ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);
105+
}
106+
// register new timer
107+
ctx.timerService().registerProcessingTimeTimer(timerTimestamp);
108+
disableTimer.update(timerTimestamp);
109+
}
110+
}
111+
112+
@Override
113+
public void onTimer(long ts, OnTimerContext ctx, Collector<SensorReading> out) throws Exception {
114+
// remove all state
115+
forwardingEnabled.clear();
116+
disableTimer.clear();
117+
}
118+
}
119+
}
120+
121+
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Copyright 2015 Fabian Hueske / Vasia Kalavri
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.github.streamingwithflink.chapter6;
17+
18+
import io.github.streamingwithflink.util.SensorReading;
19+
import io.github.streamingwithflink.util.SensorSource;
20+
import io.github.streamingwithflink.util.SensorTimeAssigner;
21+
import org.apache.flink.api.common.ExecutionConfig;
22+
import org.apache.flink.api.common.state.ValueState;
23+
import org.apache.flink.api.common.state.ValueStateDescriptor;
24+
import org.apache.flink.api.common.typeinfo.Types;
25+
import org.apache.flink.api.common.typeutils.TypeSerializer;
26+
import org.apache.flink.api.java.tuple.Tuple4;
27+
import org.apache.flink.streaming.api.TimeCharacteristic;
28+
import org.apache.flink.streaming.api.datastream.DataStream;
29+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
30+
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
31+
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
32+
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
33+
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
34+
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
35+
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
36+
import org.apache.flink.util.Collector;
37+
38+
import java.util.Collection;
39+
import java.util.Collections;
40+
41+
public class CustomWindow {
42+
43+
public static void main(String[] args) throws Exception {
44+
45+
// set up the streaming execution environment
46+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
47+
48+
// checkpoint every 10 seconds
49+
env.getCheckpointConfig().setCheckpointInterval(10_000);
50+
51+
// use event time for the application
52+
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
53+
// configure watermark interval
54+
env.getConfig().setAutoWatermarkInterval(1000L);
55+
56+
// ingest sensor stream
57+
DataStream<SensorReading> sensorData = env
58+
// SensorSource generates random temperature readings
59+
.addSource(new SensorSource())
60+
// assign timestamps and watermarks which are required for event time
61+
.assignTimestampsAndWatermarks(new SensorTimeAssigner());
62+
63+
DataStream<Tuple4<String, Long, Long, Integer>> countsPerThirtySecs = sensorData
64+
.keyBy(r -> r.id)
65+
// a custom window assigner for 30 seconds tumbling windows
66+
.window(new ThirtySecondsWindows())
67+
// a custom trigger that fires early (at most) every second
68+
.trigger(new OneSecondIntervalTrigger())
69+
// count readings per window
70+
.process(new CountFunction());
71+
72+
countsPerThirtySecs.print();
73+
74+
env.execute("Run custom window example");
75+
}
76+
77+
/**
78+
* A custom window that groups events in to 30 second tumbling windows.
79+
*/
80+
public static class ThirtySecondsWindows extends WindowAssigner<Object, TimeWindow> {
81+
82+
long windowSize = 30_000L;
83+
84+
@Override
85+
public Collection<TimeWindow> assignWindows(Object e, long ts, WindowAssignerContext ctx) {
86+
87+
// rounding down by 30 seconds
88+
long startTime = ts - (ts % windowSize);
89+
long endTime = startTime + windowSize;
90+
// emitting the corresponding time window
91+
return Collections.singletonList(new TimeWindow(startTime, endTime));
92+
}
93+
94+
@Override
95+
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
96+
return EventTimeTrigger.create();
97+
}
98+
99+
@Override
100+
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
101+
return new TimeWindow.Serializer();
102+
}
103+
104+
@Override
105+
public boolean isEventTime() {
106+
return true;
107+
}
108+
}
109+
110+
/**
111+
* A trigger thet fires early. The trigger fires at most every second.
112+
*/
113+
public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {
114+
115+
@Override
116+
public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
117+
// firstSeen will be false if not set yet
118+
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
119+
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
120+
121+
// register initial timer only for first element
122+
if (firstSeen.value() == null) {
123+
// compute time for next early firing by rounding watermark to second
124+
long t = ctx.getCurrentWatermark() + (1000 - (ctx.getCurrentWatermark() % 1000));
125+
ctx.registerEventTimeTimer(t);
126+
// register timer for the end of the window
127+
ctx.registerEventTimeTimer(w.getEnd());
128+
firstSeen.update(true);
129+
}
130+
// Continue. Do not evaluate window per element
131+
return TriggerResult.CONTINUE;
132+
}
133+
134+
@Override
135+
public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
136+
if (ts == w.getEnd()) {
137+
// final evaluation and purge window state
138+
return TriggerResult.FIRE_AND_PURGE;
139+
} else {
140+
// register next early firing timer
141+
long t = ctx.getCurrentWatermark() + (1000 - (ctx.getCurrentWatermark() % 1000));
142+
if (t < w.getEnd()) {
143+
ctx.registerEventTimeTimer(t);
144+
}
145+
// fire trigger to early evaluate window
146+
return TriggerResult.FIRE;
147+
}
148+
}
149+
150+
@Override
151+
public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
152+
// Continue. We don't use processing time timers
153+
return TriggerResult.CONTINUE;
154+
}
155+
156+
@Override
157+
public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
158+
// clear trigger state
159+
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
160+
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
161+
firstSeen.clear();
162+
}
163+
}
164+
165+
/**
166+
* A window function that counts the readings per sensor and window.
167+
* The function emits the sensor id, window end, tiem of function evaluation, and count.
168+
*/
169+
public static class CountFunction
170+
extends ProcessWindowFunction<SensorReading, Tuple4<String, Long, Long, Integer>, String, TimeWindow> {
171+
172+
@Override
173+
public void process(
174+
String id,
175+
Context ctx,
176+
Iterable<SensorReading> readings,
177+
Collector<Tuple4<String, Long, Long, Integer>> out) throws Exception {
178+
// count readings
179+
int cnt = 0;
180+
for (SensorReading r : readings) {
181+
cnt++;
182+
}
183+
// get current watermark
184+
long evalTime = ctx.currentWatermark();
185+
// emit result
186+
out.collect(Tuple4.of(id, ctx.window().getEnd(), evalTime, cnt));
187+
}
188+
}
189+
}

0 commit comments

Comments
 (0)