[BUG] Question about Flink Event-time support. #615
Description
Describe the bug
I think I have found a problem when running a Flink job with the pulsar-connector, specifically in Flink's TimeCharacteristic.EventTime mode.
I think so because the same code works well with event-time windows when I use a Kafka source.
With the Pulsar connector, however, Flink doesn't seem to recognize that its TimeCharacteristic is EventTime.
The Flink dashboard also indicates that the job is not running in event-time window mode, as shown below.
I have checked that the timestamps I extract from the source data look normal:
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844581]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844581]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845321]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845321]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845307]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845307]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844954]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844954]
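A quick way to sanity-check values like these is to interpret them as epoch milliseconds and compare them with the log time. The sketch below is plain Java with no Flink dependency; the class and method names (`TimestampCheck`, `toInstant`, `lag`) are hypothetical and only illustrate the check, and it assumes the logged values are epoch milliseconds in UTC.

```java
import java.time.Duration;
import java.time.Instant;

public class TimestampCheck {
    /** Interprets a raw long as epoch milliseconds (an assumption about the logged values). */
    public static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    /** How far the event timestamp lies behind the time at which the log line was written. */
    public static Duration lag(Instant logTime, long epochMillis) {
        return Duration.between(toInstant(epochMillis), logTime);
    }

    public static void main(String[] args) {
        // Log time and sample values copied from the debug output above.
        Instant logTime = Instant.parse("2022-09-11T15:57:26.540Z");
        long[] samples = {1662911844581L, 1662911845321L, 1662911845307L, 1662911844954L};
        for (long ts : samples) {
            System.out.println(ts + " -> " + toInstant(ts)
                    + " (lag " + lag(logTime, ts).toMillis() + " ms)");
        }
    }
}
```

Under that assumption the samples fall one to two seconds before the log time, which is consistent with the timestamps being valid event times rather than, for example, seconds or nanoseconds.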
Could you please let me know whether the pulsar connector supports watermarking code like mine?
Here is the environment that I have tested so far.
- Flink (version 1.14.5)
- Pulsar Connector (version 1.14.3.4)
- The event-time processing code is below.
public class StreamRealtime {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.getConfig().setGlobalJobParameters(parameterTool);

        // Only in Eventtime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String adminServiceUrl = "http://10.96.77.102:31143";
        String brokerServiceUrl = "pulsar://10.96.77.102:32543";
        String inputTopic = "persistent://nds/nds/lcs-refined-topic";
        int parallelism = 30;

        Properties properties = new Properties();
        properties.setProperty("topic", inputTopic);

        FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
                brokerServiceUrl,
                adminServiceUrl,
                new SimpleStringSchema(),
                properties
        ).setStartFromLatest();

        DataStream<Tuple3<String, String, Long>> stream = env
                .setParallelism(parallelism)
                .addSource(source)
                .setParallelism(parallelism)
                .flatMap(LCSSTMSUrlMapperRefined.create())
                .setParallelism(parallelism);

        // Event Timestamp
        stream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofMillis(200))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                        Debug.sendUDP("timstamp : [" + element.f2 + "]\n");
                                        return element.f2;
                                    }
                                })
                )
                .setParallelism(parallelism)
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(LCSSTMSProcessor.create(0))
                .setParallelism(parallelism);

        env.execute("Pulsar NDS Streaming");
    }
}
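To make the timing in the job above concrete, here is a minimal plain-Java sketch of the arithmetic behind a 200 ms bounded-out-of-orderness watermark and a 10-second tumbling event-time window. It does not use Flink or the connector; the class and method names are hypothetical, and the formulas are a simplified model of Flink's behavior: the watermark is the highest timestamp seen minus the bound minus 1 ms, and a window [start, end) fires once the watermark reaches end - 1.

```java
public class WatermarkArithmetic {
    /** Watermark after the generator has seen maxTimestampMs, with an out-of-orderness bound. */
    public static long watermarkFor(long maxTimestampMs, long boundMs) {
        return maxTimestampMs - boundMs - 1;
    }

    /** Start of the tumbling window (offset 0) that contains timestampMs. */
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** An event-time window [start, start + size) fires once the watermark reaches its last millisecond. */
    public static boolean windowFires(long windowStartMs, long sizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + sizeMs - 1;
    }

    public static void main(String[] args) {
        long boundMs = 200;     // Duration.ofMillis(200) in the job
        long sizeMs = 10_000;   // Time.seconds(10) in the job

        // Highest timestamp among the logged samples.
        long maxSeen = 1662911845321L;
        long start = windowStart(maxSeen, sizeMs);
        long watermark = watermarkFor(maxSeen, boundMs);
        System.out.println("window start    = " + start);
        System.out.println("watermark       = " + watermark);
        System.out.println("fires already?  = " + windowFires(start, sizeMs, watermark));

        // The window can only fire after an event at least boundMs past the window end is seen.
        long laterEvent = start + sizeMs + boundMs;
        System.out.println("fires after later event? = "
                + windowFires(start, sizeMs, watermarkFor(laterEvent, boundMs)));
    }
}
```

In this model the logged samples alone cannot close their window; a later event with a timestamp at least 200 ms past the window end is needed before the window function runs.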
To Reproduce
Steps to reproduce the behavior:
- Developed Flink event-time aggregation code with the pulsar-connector.
- Checked whether the messages have valid timestamps. --> (okay, it works)
- Checked whether my code works with Flink's processing-time windows and the pulsar-connector after ... (--> okay, it works)
- Removing the watermark assigner and changing to EventProcessing
- Changing the configuration from TimeCharacteristic.EventTime -> TimeCharacteristic.ProcessingTime
Expected behavior
- Flink's event-time windows fire regularly.
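Whether event-time windows fire regularly depends on the watermark that the window operator sees. In Flink, an operator with several parallel inputs advances its event-time clock to the minimum watermark across the inputs that are not marked idle. The plain-Java sketch below models only that rule; the names are hypothetical, and nothing in this report establishes that a silent input is the cause here. It is simply one condition worth ruling out when the job runs with a parallelism of 30.

```java
import java.util.Arrays;

public class MinWatermarkSketch {
    /** Modeled value for an input that has not emitted any watermark yet. */
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    /** The operator's watermark is the minimum over its (non-idle) input watermarks. */
    public static long operatorWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    /** A window ending at windowEndMs (exclusive) can fire once the watermark reaches windowEndMs - 1. */
    public static boolean windowCanFire(long windowEndMs, long operatorWatermarkMs) {
        return operatorWatermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 1662911850000L;

        // Every upstream subtask has seen recent data.
        long[] allActive = {1662911850100L, 1662911850050L, 1662911849999L};
        // One upstream subtask never received a record, so it never emitted a watermark.
        long[] oneSilent = {1662911850100L, 1662911850050L, NO_WATERMARK};

        System.out.println("all active: " + windowCanFire(windowEnd, operatorWatermark(allActive)));
        System.out.println("one silent: " + windowCanFire(windowEnd, operatorWatermark(oneSilent)));
    }
}
```

If this turns out to be relevant, Flink's `WatermarkStrategy` offers `withIdleness(Duration)`, which marks an input as idle after a period without records so that it no longer holds back the minimum.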
Screenshots
Posted with description.
Additional context
