-
Notifications
You must be signed in to change notification settings - Fork 260
/
Copy pathFlink 原理.txt
1350 lines (1025 loc) · 70.3 KB
/
Flink 原理.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
--------------Flink 原理 -----------------
1.flink是怎样保证exact-once(最重要) !
2.flink的checkpoint机制与恢复?
为甚是要插入barrier来做checkpoint,而不是在不同算子的定时器来做checkpoint?
3:flink的 state介绍一下 ,状态管理 和容错机制,flink哪里用到了状态,用户怎么使用?
4.你在使用中怎样使用flink的?
5.源码是否已经阅读?
6.flink中其中一个节点宕机之后,怎样恢复的??
7.flink 安全认证
8:flink怎样解决乱序问题?
9:流批系统的区别和各自优缺点?
10:Flink JobManager的HA(高可用)原理分析
11.flink与storm对比怎么样?为什么要从storm迁移到flink上?
flink与spark对比
12:flink程序常见问题分析和调优?
13:现有开发的程序有什么不足?
14:flink 时间+水印+窗口 关系?什么时候会触发计算? 窗口流程涉及知识点梳理
15:flink on yarn 原理
16:批流是怎样统一的?
17:Flink JobManagerHA ?
18:Kappa架构 && Lambda架构
19:程序优化
20:Chandy-Lamport算法
21:Flink实现风控系统
22:Flink-CEP
23 说一下flink 常用的窗口函数
24:业务场景是否需要引入实时系统
25:Streaming System 灵魂四问
26:Flink Trigger 如何自定义触发器?
27:什么是flink侧输出 Side Output?
28:说一下Flink Runtime 的作业执行的核心机制
29:如何分析和处理Flink反压?
30:Flink的网络协议栈
31:Flink集群监控 ?
32:Flink 异步IO ?
Flink 维表关联方案对比
33:flink savepoint?
34:Apache Flink快照(snapshot)原理
35:基于Flink实现的商品实时推荐系统 https://gitee.com/zhouyouwei/flink-recommandSystem-demo
36:Flink监控:如何实现一个某种数据缺失半小时触发报警的监控
37:Flink广播变量与累积加器的区别?
38:broadcast如何实现实时更新维表? 常问
------------------------------------------------
state checkpoint time window watermark processFunction
join trigger sideoutput sink source asyncio
table sql cep
------------- 疑惑--------
增加水印时间(数据允许延迟的最大时间 maxLaggedTime) ,缓解计算压力?
等于延长窗口被触发的时间,程序计算的间隔延长
水印5->120 算过来了 5->180算不过来?
AssignerWithPeriodicWatermarks,定时抽取更新
使用的是AssignerWithPunctuatedWatermarks,每一次数据局来都会抽取更新,更精确一点,但是频繁的更新wartermark会比较影响性能
说明触发窗口计算的单个slot算子没问题,问题也不是出在前面的打标签(是否外地车,省内外地,省外,外地长期)上这些计算,
问题在于集群CPU负载高,集群上Solr查询频繁GC导致的
数据被丢弃的问题: 数据质量导致的,数据延迟/乱序导致窗口提前被触发
推测:未进行keyBy,给的资源不足时,统计的数值被偏少 比如,1分钟流量 正常早晚高峰3w5+ 白天平常2w7左右
结果少,是数据还没进来完就被触发,输出了
另一个原因是:采集设备获取的异常数据(过车时间提前几分钟) 这种数据会提前触发窗口计算,
后面正常时间的数据还没进来,窗口已经闭合了
----------------------------------------
1.flink是怎样保证端到端exactly-once(精确一次)?
exactly-once语义就是保证 最后数据处理的结果和数据摄入时没有数据的丢失与重复
Flink主要是靠两个机制实现: checkpoint(检查点) 和 TwoPhaseCommitSinkFunction(两阶段提交协议)
checkpoint保证flink内部的exactly-once,不会进行重复计算
flink的checkpoint包含了flink此时的状态 和 数据输入流的位置(kafka的offset),checkpoint
会异步的持久化到hdfs上,如果flink应用失败或者升级时,可以拉取checkpoint中的状态来恢复上次失败时的数据
--- before ---
flink1.4版本之前实现了 输出结果是exactly-once
1.4之前的实现思路:
Flink的基本思路就是将state周期地checkpiont到hdfs中去,当发生failure的时候failover上一次的状态
然后将输出update到外部(注意:输入流offset也是状态的一部分),如果发生错误 就能从最后一次状态恢复
--- after ---
flink1.4版本之后实现了 端到端的exactly-once,即输入和输出是一一对应的 没有丢失和重复
需要kafka_0.11及以上版本(原因kafka_0.11开始支持事务)
1.4发布后: 两阶段提交的sink function(TwoPhaseCommitSinkFunction)
两次提交来保证语义的方式需要flink所连接的外部系统(kafka等消息系统)支持两步提交,也就是外部系统
需要支持可以预提交和回滚没有最终提交的数据 的特性
------------------两阶段提交-------------------------
具体实现:
两阶段提交协议 && 预提交
在一个分布式且含有多个并发执行sink的应用中
单次提交或回滚不够,需要所有组件都必须对这些提交或回滚达成共识,这样才能得到一致性的结果
step1:两步提交协议的第一步是预提交
flink的jobmanager会在数据流中插入一个检查点的标记(这个标记可以用来区别这次checkpoint的数据和下次checkpoint的数据)
这个标记会在整个DAG中传递.每个DAG中的算子遇到这个标记就会触发这个算子状态的快照
step2:读取kafka的算子,在遇到检查点标记时会存储kafka的offset.
之后会把这个检查点标记传到下一个算子 接下来就到了flink的内存操作算子这些 内部算子 就不用考虑两步提交协议了
因为他们的状态会随着flink整体的状态来更新或者回滚
step3:到了和外部系统打交道的时候,就需要 两步提交协议 来保证数据不丢失不重复了.
在 预提交 这个步骤下,所有向kafka提交的数据都是预提交
step4:当所有算子的快照完成,也就是这次的checkpoint完成时,flink的 jobmanager 会向所有算子发通知说这次checkpoint完成.
flink负责向kafka写入数据的算子也会正式提交之前写操作的数据.
在任务运行中的任何阶段失败,都会从上一次的状态恢复,所有没有正式提交的数据也会回滚
总结一下flink的两步提交:
正常计算,输出到kafka-->预提交. 当所有算子都完成他们的快照时,进行正式提交操作
当任意子任务在预提交阶段失败时,其他任务立即停止,并回滚到上一次成功快照的状态
在预提交状态成功后,外部系统需要完美支持正式提交之前的操作.
如果有提交失败发生,整个flink应用会进入失败状态并重启,重启后将会继续从上次状态来尝试进行提交操作.
2.flink的checkpoint机制与恢复,failover(故障转移/切换)机制 ?
what checkpoint:(一致的检查点)
简单来说就是flink为了达到容错和exactly-once语义的功能,
定期把state持久化下来,这一持久化的过程就叫checkpoint.
checkpoint是flink在某一时刻全局状态的快照.
错误恢复机制:(Fault Tolerance)
Apache Flink 会利用State记录计算的状态,在Failover时候Task会根据State进行恢复
容错,在故障发生后自动检测 并使系统自动恢复正常运行状态
流计算Fault Tolerance的一个很大的挑战是低延迟:
很多Apache Flink任务都是7 x 24小时不间断,端到端的秒级延迟
要想在遇上网络闪断,机器坏掉等非预期的问题时候快速恢复正常,并且不影响计算结果正确性是一件极其困难的事情
实现机制:
Apache Flink的Fault Tolerance机制核心是持续创建分布式流数据及其状态的快照
以Checkpointing的机制进行容错,Checkpointing会产生类似binlog一样的、可以用来恢复任务状态的数据文件
算法逻辑:
在Apache Flink中采用以在流信息中插入barrier的方式完成DAG中异步快照
核心逻辑:
a:barrier 由source节点发出;
b:barrier会将流上event切分到不同的checkpoint中;
c:汇聚到当前节点的多流的barrier要对齐;
d:barrier对齐之后会进行Checkpointing,生成snapshot;
e:完成snapshot之后向下游发出barrier,继续直到Sink节点;
这样在整个流计算中以barrier方式进行Checkpointing,随着时间的推移,整个流的计算过程中按时间顺序不断的进行Checkpointing
生成的snapshot会存储到StateBackend中,这样在进行Failover时候,从最后一次成功的checkpoint进行恢复
checkpointing 控制?
随这时间推移Flink程序不断的做Checkpointing,不断的产生snapshot存储到Statebackend中
-------
QA:某一刻任务执行失败,下一刻怎样完全恢复,重新回到失败的的时间点,任务接着跑?
A:Source的偏移量位置 offset
B:当时已经进入flink的数据
C:操作状态的数据
#flink会通过定期做checkpoint来实现A B
how checkpoint ?
在流数据中增加一个标记数据记录,barrier栅栏
barrier数据将流数据分割成多份,每份对应一次checkpoint操作,checkpoint会保留每一次的offset信息
三分钟2个barrier,生成三个checkpoint
流:
当barrier标记从source上游流向sink下游,在接到sink端的确认消息后,此checkpoint完成
如果涉及到多个input的输入时,处理快的barrier流会等待其他流,直到它们的barrier信息到达,然后在一起往下游传输数据
#flink 使用state来实现 中间状态数据
用户可以自定义状态持久化操作,然后在应用在重新启动时,从外部存储中重新恢复状态数据
一般情况下,为了保证状态数据的一致性,checkpoint 状态数据 就是同步的过程
flink实现异步状态同步方式,实现方式:拷贝原状态的数据,然后用异步线程去持久化拷贝的那份状态
为了防止每次copy重复的状态数据,flink实现了增量的checkpoint
--------------------------
全量checkpoint && 增量checkpoint
--------------------------
checkpoint的实现原理:Chandy-Lamport(前dei-兰伯特) 算法
将流计算看作成一个流式的拓扑,定期在这个拓扑的头部source点开始插入特殊的barriers(栅栏)
从上游开始不断向下游广播这个Barriers.
每一个节点收到所有的barriers,会将state做一次snapshot(快照)
当每个节点都做完Snapshot之后,整个拓扑就算做完一次checkpoint
接下来不管出现任何故障,都会从最近的checkpoint进行恢复
Flink用这套算法,保证了强一致性的语义,
也是Flink区别于其他无状态流计算引擎的核心区别
Flink引入Chandy-Lamport为程序提供一致性语义
--------------------------
3:flink的 state介绍一下 ,状态管理 和容错机制,flink哪里用到了状态,用户怎么使用?
Flink怎样判断一个任务是否是有状态的?
指的是flink内部的State的概念,不单单是指Event->State这样比较固定的概念
而是指任务运行间的数据信息,这些状态数据在容错恢复及checkpoint时将起到很关键作用
State的类型是怎样划分的? State的序列化内容?
flink提供状态api 最底层
why need 管理状态?
流式作业的特点是7*24小时运行,数据不重复消费,不丢失,保证只计算一次
数据实时出结果不延迟
---------------------------
无状态处理 && 有状态处理
如果处理一个事件(或一条数据)的结果只跟事件本身的内容有关,称为无状态处理.
一个无状态的计算程序挂掉了,那么它在内存中的状态都会丢失,所有数据都会重新计算
反之结果还和之前处理过的事件有关,称为有状态处理
Flink引入state和checkpoint实现,通过周期性做checkpoint来实现容错和恢复
---------------------------
------- State 状态介绍--------
简单举例,Flink的Count聚合计算每次触发计算是将历史上所有流入的数据
重新计算一次,还是在上一次的计算结果上增量计算?
A:增量计算
那么上一次计算的结果保存在哪里,内存中可以吗?
如果保存到内存中 由于网络硬件等原因造成某个计算节点失败的情况下,上一次计算结果会丢失
在节点恢复的时候,就需要将历史上所有数据(可能十几天,上百天的数据)重新计算一次
为避免这种情况,flink 会利用State存储计算结果
State是指流计算过程中计算节点的 中间计算结果 或 元数据属性
在aggregation过程中要在state中记录中间聚合结果
Kafka 作为数据源时候,我们也要记录已经读取记录的offset,这些State数据在计算过程中会进行持久化(插入或更新)
所以Apache Flink中的State就是与时间相关的,Apache Flink任务的内部数据(计算数据和元数据属性)的快照
Why need State?
1:与批计算相比,State是流计算特有的,批计算没有failover机制,要么成功,要么重新计算
2:流计算在 大多数场景 下都是增量计算,数据逐条处理(大多数场景),每次计算是在上一次计算结果之上进行处理的
这样的机制势必要将上一次的计算结果进行存储(生产模式要持久化),
3:另外由于 机器/网络/脏数据等原因导致的程序错误
在重启job时候需要从成功的检查点进行state的恢复。增量计算,Failover这些机制都需要state的支撑
where need state?
去重: 比如上游的系统数据可能会有重复,落到下游系统时希望把重复的数据都去掉
去重需要把所有的主键都记录下来,当一条数据到来后,能够看到在主键当中是否存在
窗口计算: 比如统计每分钟 Nginx 日志 API 被访问了多少次,需要先在窗口中缓存一分钟内所有数据
未触发的窗口数据也是一种状态
机器/深度学习: 多次用到同一个数据集
访问历史数据: 比如与昨天的数据进行对比,需要访问一些历史数据
如果每次都从外部去读,对资源的消耗比较大,所以希望将昨天的数据也放到状态中
-----------------------------
状态管理:
State和checkpoint的存储方式: MemoryStateBackend FsStateBackend RockDBStateBackend
如果数据量较小:
可以存放到MemoryStateBackend和FsStateBackend中(内置)
如果数据量特别大:
可以存放到RockDBStateBackend(需要引入jar)
区别:
MemoryStateBackend: Checkpoint数据直接传递给Master节点
FsStateBackend: Checkpoint将数据写到文件中,将文件的路径传给master
RockDBStateBackend: 同FsStateBackend
Why need stage manage:
管理状态最直接的方式就是将数据都放到内存中,这也是很常见的做法
比如:做wordcount时,word作为input,count作为out,计算过程中不断把输入累加到count.
but流式作业有以下要求:
7*24小时运行,高可靠
数据不丢不重,exactly-once精确一次
数据实时产出,不延迟
基于以上要求内存的管理就会出现一些问题:
内存有容量限制,可能会出现内存不足
作业是 7*24,需要保障高可用,机器若出现故障或者宕机,
需要考虑如何备份及从备份中去恢复,保证运行的作业不受影响
考虑横向扩展,流量加大 添加节点横向扩展时,数据的状态怎样平分到新节点问题
理想的状态管理:
易用: 丰富的数据结构
多样的状态组织形式
简洁的扩展接口
高效: 读写快,恢复快
可以方便地横向扩展
备份不影响处理性能
可靠: 持久化
不丢不重
容错能力
---------------Flink状态类型--------------
1:Managed State & Raw State
Managed State 是Flink自动管理的State
Raw State是原生态State
两者区别
状态管理方式:
Managed State由 Flink Runtime 管理,自动存储,自动恢复,在内存管理上有优化
而Raw State需要用户自己管理,需要自己序列化,
Flink不知道State中存入的数据是什么结构,只有用户自己知道,需要最终序列化为可存储的数据结构
状态数据结构:
Managed State 支持已知的数据结构,如 Value、List、Map 等
而Raw State只支持字节数组,所有状态都要转换为二进制字节数组才可以
推荐使用场景:
Managed State 大多数情况下均可使用,而Raw State需要自定义Operator时,推荐使用 Raw State
#总结:大部分情况下Managed State都能满足要求,极端情况需要自己实现Raw State
2:Managed State又分为Keyed State(键控状态) & Operator State(算子状态)
Keyed State: 键控状态
键控状态是根据input数据流中定义的键(key)来维护和访问的
Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到统一算子任务中
#具有相同key的所有数据都会访问相同的状态#
在Flink Stream模型中,Datastream 经过keyBy()的操作可以变为 KeyedStream
Keyed State只能用在KeyedStream的算子中,即在整个程序中没有keyBy的过程就没有办法使用KeyedStream
在访问上,Keyed State 通过 RuntimeContext 访问,这需要Operator(算子)是一个Rich Function
在数据结构上,Keyed State支持的数据结构,比如ValueState/ListState/ReducingState/AggregatingState 和 MapState
Operator State: 算子状态
算子状态的作用范围限定为算子任务.这意味着由 #同一并行任务 所处理的数据都可以访问到相同的状态#
在一个operator上,可能会有很多个key,从而对应多个keyed state
Operator State 可以用于所有算子,常用于 Source
比如,Flink中的Kafka Connector,就使用了operator state
它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射
在访问上:Operator State需要自己实现 CheckpointedFunction 或 ListCheckpointed 接口
在数据结构上,Operator State 支持的数据结构相对较少,如 ListState
------------------------------------------
flink机制自己用到的:
Window 中的的元素同样是通过 State 进行维护,对用户是隐藏的 !!! 然后由 Checkpoint 机制保证 Exactly Once 语义
Kafka Connector 是在 Flink 中使用 Operator State 的一个示例:维护topic 分区和offset的映射到Operator State
-----------------------------
为什么不使用list 存所有数据/计数,而要用state :
ArrayList存数据,首先是无状态的,另外大数据量会撑爆内存
State可以通过状态后端管理,数据存储到别处 出错可恢复
----------------------------
使用示例:
private var listKakouRocord:ListState[Event] = _
...
listKakouRocord = getRuntimeContext.getListState(
new ListStateDescriptor[Event]("listState",createTypeInformation[Event])
)
...
add/get/clear
------------------------------------------
状态后端(State Backends):
-------------------容错机制和故障恢复-----------------------
State如何保存及恢复
Flink 状态保存主要依靠 Checkpoint 机制,Checkpoint会定时制作 分布式快照,对程序中的状态进行备份
当作业发生故障了如何去恢复?
假如作业分布跑在3台机器上.其中一台挂了,这个时候需要把 进程或者线程 移到active 的2台机器上
此时还需要将整个作业的所有Task都回滚到最后一次成功Checkpoint中的状态,然后从该点开始继续处理
如果要从 Checkpoint 恢复,必要条件是 数据源需要支持数据重新发送.
Checkpoint恢复后,Flink 提供两种一致性语义,一种是恰好一次,一种是至少一次
在做Checkpoint时,可根据 Barries 对齐来判断是恰好一次还是至少一次,如果Barries对齐则为恰好一次,没有对齐 即为至少一次
如果只有一个上游,也就是说 Barries 是不需要对齐的的
如果只有一个 Checkpoint 在做,不管什么时候从 Checkpoint 恢复,都会恢复到刚才的状态 -->精确一次
如果有多个上游,假如一个上游的 Barries 到了,另一个 Barries 还没有来,
如果这个时候对状态进行快照,那么从这个快照恢复的时候其中一个上游的数据可能会有重复 -->至少一次
---------------------------------------------------
checkpoint && savepoint
一方面Flink在Cancel 时允许在外部介质保留Checkpoint;另一方面Flink 还有另外一个机制是 SavePoint
Savepoint 与 Checkpoint类似,同样是把状态存储到外部介质,当作业失败时,可以从外部恢复.
Savepoint与Checkpoint有什么区别呢?
从触发管理上: Checkpoint由Flink自动触发管理,而Savepoint由用户手动触发并人工管理
从用途上: Checkpoint 在 Task 发生异常时快速恢复, 例如网络抖动或超时异常.
而 Savepoint 有计划地进行备份,使作业能停止后再恢复, 例如修改代码、调整并发
从特点上: Checkpoint 比较轻量级,作业出现问题会自动从故障中恢复,在作业停止后默认清除
Savepoint 比较持久,以标准格式存储,允许代码或配置发生改变,恢复需要启动作业手动指定一个路径恢复
-----------------------------------------------
4:你在使用中怎样使用flink的?
实时ETL(尾号限行,车辆轨迹)
实时数据分析(30s流量统计 高频车 重点路段/区域 流量/拥堵 在途车辆归属地分析)
实时预警(套牌车,流量突发预警,拥堵突发预警)
警情gps实时分析:定位信息,在线时长,油耗,里程数
停车场实时分析:饱和度排行,僵尸车判定
kafka --> flink --> hbase(phoenix) 切换到 kafka --> flink --> kafka-->hbase(phoenix)
数据进行聚合计算后 不是立即持久化到存储 还是先Sink到Kafka
原因:通过Kafka对数据落盘进行缓冲,减少直接写存储可能带来的阻塞,让流计算程序更稳定,降低背压的产生概率
7:flink安全认证
使用的kerberos+truststore
Flink证书过期 重新执行下
查看证书过期时间 可以通过/opt/huawei/hadoopclient/Flink/flink/test 查看 flink.keystore && flink.truststore
这两个文件生成的时间(有效期3个月)
------------------------------------flink证书问题过期-------------------------------
如何生成truststore: 生成在conf下
进入flink客户端的bin目录下
sh generate_keystore.sh XXXXXX
出现几个警告,然后提示:generate keystore/truststore success.
#生成到/opt/huawei/hadoopclient/Flink/flink/conf目录下,同时flink-conf.yaml文件会自动填充上对应参数
将这两个文件添加到/opt/huawei/hadoopclient/Flink/flink/test中
下载kerberos秘钥 放到服务器,在flink-conf.yaml 文件中把路径添加上 用户加上
security.kerberos.login.keytab: /opt/xxx/keytab_xxx/user.keytab
security.kerberos.login.principal: xxx
注意:文件需要分配权限
[root@host-mn02]# chmod 777 krb5.conf
[root@host-mn02]# chmod 777 user.keytab
修改flink-conf.yaml配置文件中 指定认证文件:
security.ssl.truststore: test/flink.truststore
security.kerberos.login.keytab: /opt/xxx/xxx/user.keytab
security.kerberos.login.principal: xxx
security.ssl.key-password: XXXXXX
security.ssl.keystore-password: XXXXXX
security.ssl.keystore: test/flink.keystore
8:flink怎样解决乱序问题?
使用事件时间+水位线+窗口 详见14
通过watermark对数据重排序,来保证整体数据流的有序性
每当我们每接收到一份数据到buffer中时,我们选定其中最新的watermark值,对buffer里数据的时间小于此watermark值的数据在buffer中做一个排序.然后将此排序好的数据发向下游,因为是排好序的,所以窗口收到15:00的数据时,就知道不会有之前的数据在进来,所以水印可以作为触发计算的标识
参考:https://blog.csdn.net/Androidlushangderen/article/details/85058701
9:流批系统的区别和各自优缺点?
流系统的一大优势就是低延迟,批处理优势是错误恢复容易
批处理任务在每次的批处理操作中都会保存住全部的输入数据,如果出现算错的情况,重新执行一次处理过程即可,没有failover机制
而流式计算中连续不断的数据处理,使得错误恢复变得复杂,Flink的错误恢复机制依靠 CheckPoint来可以实现
某一刻任务执行失败,下一刻怎样完全恢复,重新回到失败的的时间点,任务接着跑?
A:Source的偏移量位置 offset
B:当时已经进入flink的数据
C:操作状态的数据
#flink会通过定期做checkpoint来实现A B
how checkpoint ?
在流数据中增加一个标记数据记录,barrier栅栏
barrier数据将流数据分割成多份,每份对应一次checkpoint操作,checkpoint会保留每一次的offset信息
三分钟2个barrier,生成三个checkpoint
流:
当barrier标记从source上游流向sink下游,在接到sink端的确认消息后,此checkpoint完成
如果涉及到多个input的输入时,处理快的barrier流会等待其他流,直到它们的barrier信息到达,然后在一起往下游传输数据
#flink 使用state来实现 中间状态数据
用户可以自定义状态持久化操作,然后在应用在重新启动时,从外部存储中重新恢复状态数据
一般情况下,为了保证状态数据的一致性,checkpoint 状态数据 就是同步的过程
flink实现异步状态同步方式,实现方式:拷贝原状态的数据,然后用异步线程去持久化拷贝的那份状态
为了防止每次copy重复的状态数据,flink实现了增量的checkpoint
10:Flink JobManager的HA(高可用)原理分析
Flink的JobManager的HA 跟HDFS的HA相比 不太一样,并不仅仅是主从切换
HDFS的HA切换,主要是为了保证数据请求处理的正常服务
Flink要让所有的失败任务能够快速回复
即:一个是存储系统的HA实现 一个是计算框架的HA实现
Flink的JobManager在服务发生切换时(出现故障)要及时的通知外界事物:
JobManager管理的多个TaskManager
在运行的所有Job
在请求的JobClient客户端
这些TaskManager,Job,JobClient收到新的leader信息,能够主动重连新的JobManager地址
源码调用过程:
Flink内部定义2类服务做HA时的领导选举和消息通知:
LeaderElectionService
LeaderRetrievalService 监听端口
LeaderRetrievalListener监听接口
在LeaderElectionService服务的实现中,是采用Apache Curator框架中的LeaderLatch来做领导选举的
新的leader选出来以后,LeaderRetrievalService服务会第一时间得到通知,然后提取出新的leader地址
然后通知监听接口LeaderRetrievalListener,通知jobclient job taskmanager
11.flink与storm对比怎么样?为什么要从storm迁移到flink上?
开发更简单,提供了精确一次的语义,计算更精确
flink与spark对比:
https://ververica.cn/developers/big-data-computing-engine-battle/
12:flink程序常见问题分析和调优?
出现最多的问题就是反压,交通业务卡口数据,早晚高峰时产生,下游的处理速度跟不上上游消费kafka的速度
猜测的原因:大量的计算指标 13个指标 算不过来 流控
窗口算子使用的timeWindowAll 非并行算子 只能一个slot
反压产生在source,数据最终都会被积压在发生反压上游的算子的本地缓冲区(localbuffer)中
每一个taskmanager都有一个本地缓冲池,每一个算子数据进来后都会把数据填充到本地缓冲池中,
数据从这个算子出去后会回收这块内存,但是当被反压后,数据发不出去,本地缓冲池就无法释放,导致一直请求缓冲区(requestBuffer).
解决:调大等待时延120s-->180s 12个slot 60s算一次,窗口被触发的时间延迟了
或者是任务拆分,将流量统计跟外地车拆分开
//TODO 专门写一个流量统计 最低资源跑一下 1slot + 30s+5s延迟
slot 内存隔离,CPU不隔离
实际原因:线上CPU负载高,CPU争夺导致
产生过两次,一次是集群中solr数据量超过规划导致,50亿数据查询导致GC频繁
一次是离线任务:伴随车计算 分时段并行跑导致
实时离线程序,以及跟solr 未隔离导致的
另一个数据质量问题:
卡口设备采集到的过车时间 会提前或者滞后
网络传输/设备故障问题 会导致今天过车数据明天才会到达
过车时间提前(脏数据) 会导致计算窗口被提前触发-结束,数据失真 逻辑上需要过滤掉这部分数据
还有CPU指定的太小,且CPU不隔离 导致CPU在高峰期计算紧张 造成滞后
数据倾斜问题:
影响:单点问题 --> GC频繁 --> 吞吐下降,延迟增大 --> 系统崩溃
单点问题:数据集中到某些partition上
GC频繁:过多的数据集中到某些JVM中导致其内存资源短缺,进而引起频繁GC
吞吐下降,延迟增大:频繁的GC和数据单点问题导致系统吞吐下降,数据延迟
系统崩溃:严重情况下过长的GC会导致TM和JM失联,系统崩溃
倾斜-源头分析;
对于数据源消费不均匀(比如kafka数据源,通过调整数据源算子的并发度解决)
原则:通常情况下是source的并发度和kafka的分区数一致
或kafka分区数是source并发度的正整数倍
注:公司卡口数据源 kk_data_first 6分区
指定6个以上算子会出现数据倾斜的现象
-----------------调优 ---------------------
#调优1:
--------------------------------------------------------
yarn-session 示例:
nohup bin/yarn-session.sh \
--queue xxx \ 指定yarn队列
--container 12 \ 指定 taskManager 个数
--jobManagerMemory 10240 \ 指定 jobManagerMemory 内存
--taskManagerMemory 20480 \ 指定 taskManager 内存
--slots 10 \ 指定单个taskManager 的 并发数 ,每个slot会均分内存
--ship test/ \
>/opt/xxx/xxx/flink-cluster/log/yarn-session.log 2>&1 &
nohup bin/flink run \
--class com.xxx.xxx.xxx.xxx \ 主类
-yid application_1570900515161_13406 \ 已经发布yarn-session的AM
-p 12 \ 指定并行度,即使用12个slot(slot从总的taskmanager内存)
/opt/xxx/xxx//xxx-xxx-xxx-1.0.jar \ jar包路径
>/opt/xxx/xxx//xxx-xxx-xxx-1.0.log 2>&1 & 日志路径
--------------------------------------------------------
设置并行度:
并行度控制任务的数量,影响操作后数据被切分成的块数
调整并行度让任务的数量和每个任务处理的数据与机器的处理能力达到最优
一般并行度设置为集群CPU核数总和的2-3倍
配置进程参数:
Flink on YARN模式下,有JobManager和TaskManager两种进程
在任务调度和运行过程中,JobManager和TaskManager承担很大责任
JobManager内存配置:
负责任务的调度,以及TaskManager、 RM之间的消息通信
如果出现任务数变多,任务并行度增大时,JobManager内存都需要相应增大
TaskManager个数配置:
每个TaskManager每个核同时能跑一个task,
所以增加了TaskManager的个数相当于增大了任务的并发度
资源充足,可以相应增加TaskManager的个数,以提高运行效率
单个TaskManager Slot数配置:
每个TaskManager多个核同时能跑多个task,相当于增大了任务的并发度
注意: 但是由于所有核共用TaskManager的内存,所以要在内存和核数之间做好平衡!
单个TaskManager 内存配置:
TaskManager的内存主要用于任务执行、通信等
当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加
#调优2:设置分区方法:
合理的设计分区依据,可以优化task的切分
思路:在程序编写过程中要尽量分区均匀,这样可以实现每个task数据不倾斜,
防止由于某个task的执行时间过长导致整个任务执行缓慢
查看是否倾斜: UI -->Subtasks 点开一个阶段 -->看数据分布是否均匀
下面几种分区方式:
随机分区: 将元素随机地进行分区 dataStream.shuffle()
Rebalancing分区: 基于round-robin对元素进行分区,使得每个分区负载均衡
对于存在数据倾斜的性能优化是很有用的 dataStream.rebalance()
Rescaling:以round-robin的形式将元素分区到下游操作的子集中
将数据从一个源的每个并行实例中散发到一些mappers的子集中,用来分散负载(不需要完全rebalance)
dataStream.rescale()
广播: 广播每个元素到所有分区
dataStream.broadcast()
自定义分区:自定义的Partitioner对每一个元素选择目标task,以按照某个特征进行分区
-----------
#调优3:
内存调优
非堆内存:network buffer && manager pool
简单:主要是调整两者比例
head内存:flink系统内
flink是运行在JVM上的,flink的堆内存调优跟其他JVM调优一致
将默认的Parallel Scavenge的垃圾回收器改为 G1垃圾回收器
配置 netty 网络通信
经验总结:
判断数据倾斜:
当出现数据倾斜(某一部分数据量特别大),虽然没有GC,但是task时间严重不一致
判断缓冲区超时设置:
由于task在执行过程中存在数据通过网络进行交换
数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置
setBufferTimeout(-1):会等待缓冲区满之后才会刷新,使其达到最大吞吐量
setBufferTimeout(0):可以最小化延迟,数据一旦接收到就会刷新
setBufferTimeout(n):n>0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新
代码中设置:
env.setBufferTimeout(n)
缓存区:存在的原因是为了维持内存跟输入输出的设备速度不匹配
程序执行时,可以先将数据拷贝至缓冲区,一定情形下将数据传给设备
缓冲区刷新:就是将当前缓冲区数据全部提交
忘记刷新可能会导致输出停留在缓冲区
-------
13:现有开发的程序有什么不足?
A1:多个任务在一个yarn-cluster上,一个taskmanager上可能会有多个task,会发生CPU资源争夺
A2:实时离线程序 没有进行任务隔离
这种情况一般会在CPU负载高的情况下产生 (solr单表数据量太大,频繁GC 另一个是离线程序MR 并发时产生)
A3:车辆轨迹
有两个job需要计算车辆轨迹.车辆轨迹全天每辆车一条/重点路段拥堵最近10分钟每辆车一条
车辆轨迹是来一条算一条,依赖redis进行更新/创建
重点路段拥堵 window+process 中对十分钟数据进行计算
在全天轨迹实时性要不要那么高间作取舍,是否需要加上窗口,在窗口中使用state存一天轨迹
A3:联合两条流的事件是常见的流处理需求: 没接触过
14:flink 时间+水印+窗口 关系?什么时候会触发计算? 窗口流程涉及知识点梳理
以在途车辆归属地分析这个功能为例.使用事件时间+水印抽取+时间取余规整 5分钟一次 + 滚动窗口5分钟
9:15分就出9:10-9:15的结果 程序刚执行1分钟,就被触发输出结果了
可以得出:设置5分钟计算窗口 并不一定是程序运行5分钟后第一批数据才会输出 !!!
(如果设置的时间属性是处理时间,5分钟后第一条出来,
如果设置的时间属性是事件时间,触发时间是数据真实产生的时间,可能几秒钟 窗口就会被触发
触发的时间,是取余后的水印时间+最大延时5s
所以窗口触发的时间并不是程序执行5分钟才算第一次,是按照五分钟划分一个窗口
只要水印中出现窗口结束的那个时间(按照配置属性触发),就会触发窗口计算
中间会对数据进行排序! (数据是存放在堆内存中的)
一批数据进来 进到已经划分好的窗口中 进行排序 当期间出现窗口结束时间的那条数据,窗口闭合 后续数据被抛弃 触发计算 输出
顺便解决一个疑惑:
程序刚发布时,会出现大量历史过车时间的汇总数据? 比如:2019-9-12 09:43:00发布,出现 大批 2019-09-11 XX:XX:XX 甚至更久数据
原因就是,数据质量不高(脏数据) 出现的错误过车时间的日志,这些过车的事件时间 是在当前时间之前的窗口 然后再加上等待的5s,被触发
处理:这些数据过滤掉
------------------------------Keyed Windows----------------------------------
stream 按照某个Key对数据分流,拥有同一个key值的数据流将为进入同一个window,多个窗口并行的逻辑流
.keyBy(...) <- 分流
.window(...) <- 窗口
[.trigger(...)] <- 触发器
[.evictor(...)] <- 驱逐器
[.allowedLateness(...)] <- 处理延迟数据.默认是0 不作处理 注:只对EvenTime有效
[.sideOutputLateData(...)] <- 侧输出
.reduce/aggregate/fold/apply() <- 窗口函数
[.getSideOutput(...)] <- 获取侧输出数据
------------------------------Non-Keyed Windows----------------------------------
stream 未分流,没有按照某个字段分组的数据使用的窗口
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
-------------------------------顺序捋捋---------------------------------
窗口周期:
只要属于该窗口的第一个元素到达,窗口就会被建立
当时间(处理/事件时间)超过窗口结束时间+允许延迟时间时,窗口就会被清除(通过水位线是否超过end_time 来判断)
触发器:
触发策略默认是:当时间(处理/事件时间)超过窗口结束时间+允许延迟时间时触发
或者自定义触发: 当当前窗口的数据量>4时,窗口触发,窗口清除
触发器还可以决定在创建和删除之前的任何时间清除窗口的内容
注意:仅清除窗口里已经到达的数据,而非元数据,这意味着新数据依然会进入到该窗口
--------------------------------
窗口是怎样切分的:
大部分流系统是按照当前task节点的本地时钟切分 优点:切分容易,不易造成阻塞
缺点:1:无法满足用户希望按照消息本身的时间特性进行分段处理
2:不同节点的时钟不同,以及数据到达各节点的延迟不同 会导致某个节点的同一窗口的数据
到达下游时,被切分到不同窗口
Flink 提供三种方式:
Operator Time:根据Task所在节点的本地时钟来进行切分的时间窗口
Event Time:消息自带时间戳
优点:确保时间戳在同一个时间窗口的所有消息一定会被正确处理
缺点:如果乱序的消息延迟很高的话,会影响分布式系统的吞吐量和延迟
Ingress Time:摄取时间,数据到达Flink系统的时间.到达时给一个自增的时间戳
Ingress Time可以看成是Event Time的一个特例,由于其在消息源处时间戳一定是有序的!
--------------------------------
window && timeWindow
.window(): 必须指定时间类型 eg:window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.timeWindow: 使用的env设置的时间类型 eg:timeWindow(Time.seconds(5))
--------------------------------
QA:1分钟的窗口是一分钟算一次吗?
处理时间:是
事件时间:否 可能大于1分钟或小于一分钟,窗口内数据的某个事件时间减去窗口开始的时间>=一分钟才会触发计算
--------------------------------
trigger触发器
.trigger(CountTrigger.of(100)) //100条数据触发一次计算
--------------------------------
watermark:水位线
基于事件时间的窗口如何被触发???
水位线是全局进度的度量标准.
标志着系统可以确信在一个时间点之后,不会再有早于这个时间点发生的事件到达
有事件时间为何还需要处理时间?
处理时间窗口将会带来理论上最低的延迟
处理无界数据集的一个通常做法是将输入的事件转成 微批,然后交由批处理器来处理
分布式系统常见的问题:
集群中资源的分配和管理、进程协调调度、持久化和高可用的数据存储,以及故障恢复
15:flink on yarn (重要)
Client提交App到RM上面去运行,然后RM分配第一个container去运行AM,然后由AM去负责资源的监督和管理。
需要说明的是,Flink的yarn模式更加类似spark on yarn的cluster模式,在cluster模式中,dirver将作为AM中的一个线程去运行,
在Flink on yarn模式也是会将JobManager启动在container里面,去做个driver类似的task调度和分配,
YARN AM与Flink JobManager在同一个Container中,这样AM可以知道Flink JobManager的地址,
从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在YARN集群上,
Flink YARN Client就可以提交Flink Job到Flink JobManager,并进行后续的映射、调度和计算处理。
Flink on Yarn-Per Job模式:
指每提交一个任务,任务完成后资源就会被释放
首先 Client 提交 Yarn App (比如 JobGraph 或者 JARs)
接着 Yarn 的 ResourceManager 会申请第一个 Container
这个 Container 通过 Application Master 启动进程
Application Master 里面运行的是 Flink 程序,即 Flink-Yarn ResourceManager 和 JobManager。
最后 Flink-Yarn ResourceManager 向 Yarn ResourceManager 申请资源
当分配到资源后,启动 TaskManager TaskManager 启动后向 Flink-Yarn ResourceManager 进行注册,
注册成功后 JobManager 就会分配具体的任务给 TaskManager 开始执行
Flink on Yarn-Session模式:
per job 模式中,执行完任务后整个资源就会被释放
session模式中,Dispatcher和resourcemanager是复用的:
当Dispatcher收到请求后,会启动JobManager(A),让jobManager(A)来完成启动TaskManager
接着会启动JobManager(B)和对应的TaskManager.当A,B任务完成后,资源并不会释放.
Session模式也被称为多线程模式,其特点是资源会一直存在不会释放
多个JobManager共享一个Dispatcher,还共享Flink-YARN的ResourceManager
16:批流是怎样统一的?
Batch和streaming会有两个不同的ExecutionEnvironment,不同的ExecutionEnvironment会将不同的API翻译成不同的JobGgrah,
JobGraph 之上除了 StreamGraph 还有 OptimizedPlan.OptimizedPlan 是由 Batch API 转换而来的.
StreamGraph 是由 Stream API 转换而来的,JobGraph 的责任就是统一 Batch 和 Stream 的图.
17:Flink JobManagerHA ?
与Storm不同的是,知道Storm在遇到异常的时候是非常简单粗暴的,
比如说有发生了异常,可能用户没有在代码中进行比较规范的异常处(至少执行一次)的语义,
比如说一个网络超时的异常对他而言影响可能并没有那么大,
但是Flink不同的是他对异常的容忍度是非常的苛刻的,那时候就考虑的是比如说会发生节点或者是网络的故障,
那JobManager单点问题可能就是一个瓶颈,JobManager那个如果挂掉的话,
那么可能对整个作业的影响就是不可恢复的,所以考虑了做HA
18:Kappa架构
用来解决lambda架构的不足,即更多的开发和运维工作
lambda架构背景是流处理引擎还不完善,流处理的结果只作为临时的、近似的值提供参考
Flink流处理引擎出现后,为了解决两套代码的问题,Kappa架构出现
Kappa架构介绍:
Kappa 架构可以认为是 Lambda 架构的简化版(只要移除 lambda 架构中的批处理部分即可)
在 Kappa 架构中,需求修改或历史数据重新处理都通过上游重放完成。
Kappa 架构最大的问题是流式重新处理历史的吞吐能力会低于批处理,但这个可以通过增加计算资源来弥补。
调研:flink可以保证计算的准确性,但是有一个前提是数据时准时到达的.
卡口过车数据 设备会因为网络延迟迟到几个小时,所以 Kappa架构不适合我们
建议次日凌晨使用离线计算统计前天数据,替换实时表数据
Lambda架构:
Lambda 架构用定期运行的批处理作业来实现应用程序的持续性,并通过流
处理器获得预警。流处理器实时提供近似结果;批处理层最终会对近似结果予以纠正
批处理架构很难解决乱序事件流问题
批处理作业的界限不清晰,写死了 假设需要根据产生数据的时间段(如从用户登录到退出)生成
聚合结果,而不是简单地以小时为单位分割数据
19:集群CPU负载高 会影响Flink 及Solr查询
产生原因 solr单表数据量太大(50亿/65字段/25索引字段) 并发查询时,垃圾回收不及时
影响: solr查询慢,flink 任务进行CPU争夺 数据滞后
解决方式: 数据分表 分为实时表和历史表 实时表保持在20亿左右,数据定期删除
20:Chandy-Lamport算法
将流计算看作成一个流式的拓扑,定期在这个拓扑的头部source点开始插入特殊的barriers(栅栏)
从上游开始不断向下游广播这个Barriers.
每一个节点收到所有的barriers,会将state做一次snapshot(快照)
当每个节点都做完Snapshot之后,整个拓扑就算做完一次checkpoint
接下来不管出现任何故障,都会从最近的checkpoint进行恢复
Flink用这套算法,保证了强一致性的语义,
也是Flink区别于其他无状态流计算引擎的核心区别
21:Flink实现风控系统
参考:https://mp.weixin.qq.com/s/RnUnMtlm4M6nPvjvmo8HWw
flink+规则引擎实现实时风控解决方案
互联网产品,典型的风控场景:
注册风控、登陆风控、交易风控、活动风控
达到事前预警,事中控制的效果
# 业务系统 风控系统 惩罚系统 分析系统
业务系统:通常是 APP + 后台 或者 web,是互联网业务的载体,风险从业务系统触发
风控系统:为业务系统提供支持,根据业务系统传来的数据 或 埋点信息来判断当前用户 或 事件有无风险
惩罚系统:业务系统 根据风控系统的结果 来调用,对有风险的用户或事件进行控制 或 惩罚
比如增加验证码、限制登陆、禁止下单等等
分析系统:根据数据来衡量风控系统的表现:
比如某策略拦截率突然降低,那可能意味着该策略已经失效,
又比如活动商品被抢完的时间突然变短,这表明总体活动策略可能有问题等等,
该系统也应支持运营/分析人员发现新策略
风控系统 && 分析系统
业务场景: 电商业务
风控范围: 注册(虚假/批量注册) 登录(盗号登陆) 交易(盗刷客户余额) 活动(薅羊毛)
风控实现方案:事中风控 --> 目标为拦截异常事件
风控系统:规则 && 模型 两种技术路线
规则:简单直观、可解释性强、灵活 缺点:容易被攻破,一但被黑产猜中就会失效
实际的风控系统中,往往需要再结合上基于模型的风控环节来增加健壮性
规则可以组合成规则组
//TODO
22:Flink-CEP
CEP的规则解析之后,本质上是一个不确定状态的转换机,所以在匹配过程中:每个状态会对应着一个或多个元素
23 说一下flink 常用的窗口函数
窗口函数是触发器在确认窗口数据到达完毕后,执行的函数
ReduceFunction AggregateFunction FoldFunction ProcessWindowFunction
前两个函数执行效率更高,因为 Flink 可以在每个窗口中元素到达时增量地聚合
ProcessWindowFunction 将获得一个窗口内 所有元素的迭代器 以及 元素所在窗口的 附加元信息
.timeWindow(Time.seconds(10)) //这里的时间窗口用于保证从外部接收当前批次的所有数据,不宜过小
.process(new KeyAreaIndexProcessFunction) //(重点区域编码,计算时间,时间标识,区域拥堵指数)
注意:
使用 ProcessWindowFunction 的窗口转换操作不能像其他那样有效率
是因为 Flink 在调用该函数之前 必须在内部缓存窗口中 的所有元素
优化:这可以通过将 ProcessWindowFunction 与 ReduceFunction, AggregateFunction 或 FoldFunction
组合使用来获得窗口元素的 增量聚合 以及 WindowFunction接收的附加窗口元数据
-----------------
ProcessWindowFunction(灵活的窗口函数) : keyBy + window + process
优势:
能够一次性迭代整个窗口里的所有元素,最重要的是context对象: 可以获取到事件和状态信息
缺陷:
全量计算,要缓存整个窗口然后再去处理,所以要设计好内存
注:一般不这样直接使用window + process ,跟增量计算的窗口函数结合使用
24:业务场景是否需要引入实时系统
需要实时查看业务信息
数据实时采集 数据实时计算 数据实时下发
告警:邮件/钉钉/短信/微信
在计算层会将计算结果与阈值进行比较,超过阈值触发告警,让运维提前收到通知
存储:消息队列/DB/文件系统
数据存储后,监控大盘(Dashboard)从存储(HBase,ES)中查询对应指标的数据就可以查看实时的监控信息
25:Streaming System:无界数据处理过程中最关键的几个问题:
1:计算什么结果 What results are calculated ? What
2:在事件时间的哪个地方计算结果 Where in event time are results calculated ? Where
3:在处理过程的什么时间点,可以输出结果 When in processing time are results materialized ? When
4:如何更新结果 How do refinements of results relate ? How
26:Flink Trigger 如何自定义触发器?
27:什么是flink侧输出 Side Output?
28:说一下Flink Runtime 的作业执行的核心机制
Flink 在 Runtime 层之上提供了 DataStream 和 DataSet 两套 API,
分别用来编写流作业与批作业,以及一组更高级的 API 来简化特定作业的编写
架构:标准的master-slave
Master:它负责管理整个集群中的资源和作业 application_1570900515161_2765
Slave:负责提供具体的资源并实际执行作业
Master部分 包含3个组件: Dispatcher(调度器) ResourceManager JobManager
Dispatcher 负责接收用户提交的作业,并且负责为这个新提交的作业拉起一个新的JobManager组件
ResourceManager 负责资源的管理,在一个Flink集群中只有一个ResourceManager
JobManager 负责管理作业的执行,在一个Flink集群中可能有多个作业同时执行
每个作业都有自己的JobManager组件
注:这三个组件都包含在 AppMaster 进程中
-----Flink Runtime 层执行作业的基本流程-----
当用户提交脚本会首先启动一个Client进程负责Job的编译和提交
它首先将用户的代码编译成一个jobGraph(图),同时会进行检查和优化等工作--> 例如判断哪些 Operator 可以 Chain 到同一个 Task 中
然后Client将产生的JobGraph提交到集群中执行: 分两种情况
A:类似于 Standalone 这种 Session 模式,AM 会预先启动 此时 Client 直接与 Dispatcher 建立连接并提交作业即可
B:另一种是 Per-Job 模式, AM(ApplicationMaster) 不会预先启动 ,Client 将首先向资源管理系统 (如Yarn、K8S)申请资源来启动 AM,
然后再向 AM 中的 Dispatcher 提交作业
当作业到达Dispatcher 后:
Dispatcher会首先启动一个JobManager 组件,然后JobManager向ResourceManager请求资源来执行具体的job
这时根据session 和pre-job 的区别,分两种情况:
A:Session: TaskExecutor 已经启动,可以直接选取空闲资源进行分配
B:Per-Job: TaskExecutor 尚未启动,首先向外部资源管理系统申请资源来启动,等待TaskExecutor 注册相应资源后再继续选择空闲资源进程分配
注:TaskExecutor 的资源是按照Slot描述
ResourceManager 选择到空闲的 Slot 之后,就会通知相应的TM 将该 Slot 分配分 JobManager XX
然后 TaskExecutor 进行相应的记录后,会向 JobManager 进行注册.
JobManager收到 TaskExecutor 注册上来的 Slot 后,就可以实际提交Task了
TaskExecutor 收到 JobManager 提交的 Task 之后,会启动一个新的线程来执行该 Task
Task 启动后就会开始进行预先指定的计算,并通过数据 Shuffle 模块互相交换数据
----------------------------------------------------------------------------
Flink Runtime支持两种作业执行模式的区别:
Per-Job:独享调度器和ResourceManager
按需申请资源(TaskExecutor)
适合执行时间较长的大作业
仅执行单个作业
Session:共享调度器和ResourceManager
共享资源TaskExecutor
适合规模小,执行时间短的作业
-------------------------------------
29:如何分析和处理Flink反压?
反压(backpressure)是实时计算应用开发中,特别是流式计算中,十分常见的问题
反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速
由于实时计算应用通常使用 消息队列 来进行生产端和消费端的解耦,消费端数据源是 pull-based(基于拉动式) 的
所以反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率
Flink的反压机制:
简单来说,Flink拓扑中每个节点(Task)间的数据都以阻塞队列的方式传输,下游来不及消费导致队列被占满后,
上游的生产也会被阻塞,最终导致数据源的摄入被阻塞
---------------------------------
Flink反压相关博客:
1.Flink 原理与实现:如何处理反压问题
http://1t.click/bgxt
2.一文彻底搞懂 Flink 网络流控与反压机制
https://mp.weixin.qq.com/s/ynT6u8WQiKxAc6MC3-H_Xw
3.Flink 轻量级异步快照 ABS 实现原理
http://1t.click/bgxr