-
Notifications
You must be signed in to change notification settings - Fork 260
/
Copy pathFlink 初识.txt
1080 lines (796 loc) · 50.8 KB
/
Flink 初识.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# 流数据 #
什么是流数据 ?
从广义上说,所有大数据的生成均可以看作是一连串发生的离散事件.
这些离散的事件以时间轴为维度看就形成了一条条事件流/数据流
数据是指由数千个数据源持续生成的数据,流数据通常也以数据记录的形式发送,
但相较于离线数据,流数据普遍的规模较小.
在典型的大数据业务场景下数据业务最通用的做法是:
选出批处理的技术处理全量数据,采用流式计算处理实时增量数据
现有批流统一方案:spark flink ,ray引擎融合计算(spark+flink)
推荐流计算神书:Streaming Systems!
无界数据处理过程中最关键的几个问题:
1:计算什么结果?
2:在事件时间的哪个地方计算结果?
3:在处理过程的什么时间点,可以输出结果?
4:如何更新结果?
带着问题往下看.
# Flink #
Apache Flink 是一个分布式的大数据处理引擎,可对有限数据流和无限数据流
进行有状态或无状态的的计算.
优势:
Flink具备7X24小时高可用的SOA(面向服务架构),原因是在实现上 Flink 提供了一致性的 Checkpoint.
Checkpoint 是 Flink 实现容错机制的核心,它周期性的记录计算过程中 Operator 的状态,并生成快照持久化存储
当 Flink 作业发生故障崩溃时,可以有选择的从 Checkpoint 中恢复,保证了计算的一致性
##Flink 基石##
state checkpoint time window
Flink是有状态的计算!
----------------------流计算的特征:--------------------------
时效性高 :数据实时采集、实时处理,延时粒度在秒级甚至毫秒级,业务方能
够在第一时间拿到经过加工处理后的数据
常驻任务 :区别于离线任务的周期调度,流式任务属于常驻进程任务, 一旦启
动后就会一直运行,直到人为地终止,因此计算成本会相对比较高
性能要求高 :实时计算程序的性能优化占任务开发的很大一部分工作.
应用局限性 :数据到达时间的不确定性导致实时处理眼离线处理得出来的结果会有一定的差异
---- 实时计算应用场景 ----
why?
在大数据系统中,离线批处理技术可以满足非常多的数据使用场景需求,
但是在DT时代,每天面对的信息是瞬息万变的,越来越多的应用场景对 数据的时效性
提出更高的要求.数据是价值是具有效性的:一条数据产生的时候,如果不能及时处理
并在业务系统中使用,就不能让数据保持最高的'新鲜度'和价值最大化.
what?
# 实时ETL
实时ETL&数据流的目的是实时的把数据从A点投递到B点.
中间可能会加上一些数据清洗和集成的工作,比如实时构建搜索系统的索引,实时数仓中的ETL过程等
eg:尾号限行(对违反限行车辆进行初步筛选, 再进行汇总) 车辆轨迹(将一辆车一天轨迹拼接成一条,时间列表 点位列表 过车间隔列表)
# 实时数据分析(实时指标汇总)
数据分析指的是根据业务目标从原始数据中抽取对应信息并整合的过程.
比如查看每天卖的最好的十种商品,仓库平均周转时间,文章平均点击率,推送打开率等等.
实时数据分析则是上述过程的实时化,一般最终体现为实时报表或实时大屏
eg:30s流量统计 高频车 重点路段/区域 流量/拥堵 在途车辆归属地分析
# 事件驱动应用 (实时预警)
事件驱动应用是对一系列订阅事件进行处理或作出响应的系统.
事件驱动应用往往还会依赖内部状态,比如点击欺诈检测,风控系统,运维异常检测系统等
当用户的行为触发某些风险控制点时,系统会捕获这个事件,并根据当前行为和用户之前的行为进行分析,
决定是否对用户进行风险控制
eg: 套牌车 流量突发预警 路段突发拥堵预警
--------------------Flink应用场景------------------------
1. 实时智能推荐 2. 复杂事件处理
3. 实时欺诈检测 4. 实时数仓与ETL
5. 流数据分析 6. 实时报表分析
7. 监控平台
实时ETL:实时消费Kafka数据进行清洗、转换、结构化处理用于下游计算处理
实时数仓:实时化数据计算,仓库模型加工和存储.实时分析业务及用户各类指标,让运营更加实时化
实时监控:对系统和用户行为进行实时检测和分析,如业务指标实时监控,运维线上稳定性监控,金融风控等
-------------基于交通卡口数据的实时计算------------
实时分析: 路段/区域/全市/卡口/进出城/公交专用道流量
路段/区域/全市拥堵
在途车辆归属地分析
点位高频车分析
实时预警: 套牌车/突变流量/突发拥堵
实时ETL: 实时抽取 卡口过车数据关键字段
实时抽取 违法尾号限行车辆
实时更新每辆车车辆轨迹
------------------------------------------------
注:公司数据服务化,都是通过服务的方式对外提供数据
排行类的数据现在是在查询层phoenix做的
没有热点数据top10的功能
做的是基础数据,5/1分钟/30s算一次,接口拿去做趋势图
改进:在状态中进行排序输出 ListState
eg: ... DataStream-->KeyedStream-->DataStream
.timeWindow(Time.minutes(1), Time.seconds(5))
.aggregate(new CountAgg(), new WinResultFunction())
.keyBy("windowEnd")
.process(new TopNHotItems(3))
------------------------ Flink Application Flink应用开发 相关概念 ------------------------
Stream 流
分为有界数据流(bounded stream) && 无界数据流(unbounded stream)
两者区别:无界数据流的数据会随着时间的推移而持续增加,计算持续进行且不存在结束的状态 7x24 实时
有界数据流数据大小固定,计算最终会完成并处于结束的状态 离线
State 状态
是计算过程中的数据信息,在容错恢复和checkpoint中有重要作用
流计算在本质上是 增量处理 (Incremental Processing),因此需要不断查询保持状态
同时,为了保持精确一次(Exactly-once)语义,需要数据能写入到状态中
同时持久化存储,能够保证在整个分布式系统运行失败或挂掉的情况下做到Exactly- once
Time 时间
事件时间,摄取时间,处理时间
Flink的无限数据流是一个持续的过程,时间是我们判断业务状态是否滞后,数据处理是否及时的依据
Window 窗口
数据流是无限的,无界限. 但是可以通过一个有界的范围来处理无界的数据流.
滚动窗口:每个界限计算结果互不影响(不重合) 比如几分钟统计一次
滑动窗口:每个计算窗口统计数据是有重复的 每30s统计过去一分钟的过车数量
简写: .timeWindow(Time.minutes(1), Time.seconds(5)) //一个参数是滚动,两个是滑动+滑动频率 窗口中时间的属性根据env配置决定
----------- 流处理如何解释时间?? -----------
Time
Timer
定时器,作为window的触发源,分为两类:
WallTime Timer:按照 正常的现实时间 作为触发源
LowWatermark Timer:以 低水位 作为触发源
low watermark :最低水位
其实就是一个时间戳 ,每一个计算节点都会维护一个时间戳作为watermark
A的低水位值不只和A本身的最旧数据有关,也跟上游的低水位有关.
因此,只要上游还有更旧的数据存在,就会通过低水位机制维护的low watermark告知下游,
下游便会更新它自己的low watermark并且由于lwm timer未触发,因此会进行等待
只要进入到flink中,当前窗口的 low watermark 一定是flink中所有数据中 属于这个window的最小的
在一定程度上保证数据的完整性和实效性,但是如果有数据比lowwatermark还晚到达仍没有办法解决
比如:数据在没有进入流系统之前就耽搁了,那low watermark根本不知道
flink为了解决这个问题,还有allow lateness参数(等待时延)
即Window被low watermark timer触发后,
还会等待allow lateness时间才开始计算,但这样会损失一定的实时性
join:双流转换成单流
coGroup+innerjoin
----------- 水位线 watermark -----------
水位线是衡量Event Time进展的机制,属于数据本身的隐藏属性
即 基于事件时间的数据本身除了事件发生时的时间戳A,还包含一个水位线时间戳B !!!
,这条水位线用来表示在B时间戳之前的数据都已经到达!!
作用:水印是为了解决乱序问题的,解决乱序问题通常是水印+窗口来实现
watermark如何分配:
通常是在接收到source数据后,应该立刻生成水印 或者数据经过简单的map和filter之后 立刻生成watermark
两种生成方式:
1:AssignerWithPeriodicWatermarks,定时抽取更新(允许定义一个最大延迟,比较常用 在用)
2:AssignerWithPunctuatedWatermarks,每一次数据进来都会抽取timetamp并生成watermar,
AssignerWithPunctuatedWatermarks 更精确,但是频繁的更新wartermark会比较影响性能
注:所以不要经常生成时间戳和水印,这样会加大系统的计算负担
水印必须跟窗口一起使用才有效
source数据接入,map进入到自定义水印类,从EvenTime中抽取出水印时间
当包含隐藏属性的 水位线+允许的最大延迟的数据到达时,窗口被触发计算
上下游顺序: source-->map-->filter-->水印-->keyBy-->window-->reduce--->process-->sink
window的触发机制?
先按照自然时间将window划分,30s窗口,1分钟分两个0-30,31-60
window的设定跟数据无关,是系统定义好的
1:input的数据,根据自身的Event Time将数据划分到不同的window中
2:如果窗口中有数据,且水印时间>=window_end_time时 触发窗口计算
#最终决定触发的规则是数据本身的Event Time所在的window中的window_end_time决定
Flink如何处理乱序?
watermark + window机制 :Watermark是用于处理乱序事件的
window中可以对input进行按照Event Time排序,使得完全按照Event Time发生的顺序去处理数据,以达到处理乱序数据的目的
Flink何时触发window?
1:Event Time < watermark时间 (对于延迟太久的数据而言)
2:watermark >=window_end_time(对于无序及正常到达的数据而言)
在[window_start_time,window_end_time]中有数据存在
Flink应该如何设置最大延迟时间?
根据业务决定,如果延迟时间设置的太小,而数据因为网络传输的造成延迟太久
就会出现很多单条数据在窗口中被触发,对数据准确性影响很大
当延迟及乱序很严重时,水位(等待时间)越小,被丢弃的可能性越大
----------- flink的state管理 -----------
按照数据划分和扩张方式:
Keyed States Operator States
checkpoint:程序指定时间定期生成 ,保留当前时间的算子的状态
savepoint:Savepoint是指允许用户在持久化存储中保存某个checkpoint,以便用户可以暂停自
己的任务进行升级。升级完后将任务状态设置为savepoint存储的状态开始恢复运行,保证数据处理的延续性。
不管配置什么样flink都会从这个checkpoint恢复,常用于版本升级,保证了数据的延续性
External Checkpoint:(外部checkpoint)做完一次checkpoint后在制定的目录中
多存储一份checkpoint 保留meta数据 双备份
作业失败或取消状态结束时,外部存储的meta数据将保留下来
##State 和 checkpoint数据的存储方式:
MemoryStateBackend
FsStateBackend
RockDBStateBackend
数据量小存储在 MemoryStateBackend 和 FsStateBackend中
数据量大 存储在RockDBStateBackend 中
---------------- 状态state和检查点checkpoint ----------------
How:最简单的wordcount,给一些word计算他们的count,count输出不断累加的结果,count就是状态
批处理对state要求不高:之前的批处理都是将数据分片计算,分片完成后做一个聚合,state要求较少
流处理程序,输入的是一个无限制的数据流,会运行很大一段时间,要求运行几天,几个月都不会宕机,
需要对中间的状态数据好好管理.storm采用的将状态数据存储到Hbase(计算时从hbase读,完了在更新到hbase,还需要保证数据一致性)
Flink提供了不同的状态后端机制,用于指定状态的存储方式和位置
state可能会很大,ali的几T都有
参考:Chandy-Lamport算法
Flink是通过检查点的方式来实现 exactly-once 只执行一次,当遇到故障时将系统重置为初始状态
----------------------------------- savepoint -------------------------------------
flink savepoint:
Savepoints在持久化存储中保存某个checkpoint,以便用户可以暂停自己的应用进行升级,
并将状态设置为savepoint的状态,并继续运行.
该机制利用了Flink的checkpoint机制创建流应用的快照,并将快照的元数据(meta-data)写入到一个额外的持久化文件
系统中
1:每个算子需要id
手动设置UID
程序自动生成(跟应用结构算子有关)
建议手动给每个算子通过UID(String)分配一个固定的id
2:Savepoint产生的数据将被保
存到配置的文件系统中,如FsStateBackend或者RocksDBStateBackend
触发:
bin/flink savepoint jobId 路径[选,不写读flink-conf.yaml文件]
停止并触发:
bin/flink cancel -s [路径] jobId
3:从savepoint恢复作业:
bin/flink run -s savepointPath [runArgs]
允许不恢复某个算子的状态:
bin/flink run -s savepointPath -n [runArgs]
#默认情况下,系统会尝试恢复savepoint的状态全部映射到用户的流应用
如果代码有修改(删除某个算子),可以通过--allowNonRestoredState(简写-n)恢复状态
4:清除savepoint
bin/flink savepoint -d savepointPath
查看hdfs上的checkpoint
hdfs dfs -ls /flink
hdfs dfs -ls /flink/checkpoints/
hdfs dfs -ls /flink/savepoint/
---------savepoint实测-------------
生成savepoint: ok
bin/flink savepoint -yid application_1572866255793_17760 8b08d32f2f0be5c5b6d064576768f20c
查看生成的savepoint:ok
hdfs dfs -ls /flink/savepoint/
取消job,并生成保存点,ok
bin/flink cancel -yid application_1572866255793_17760 8b08d32f2f0be5c5b6d064576768f20c -savepoint hdfs:///flink/savepoint
启动job:接着保存点运行 ok
bin/flink run -s hdfs:///flink/savepoint/savepoint-8b08d3-c077f4bd59b0 -m yarn-cluster -yn 2 -yjm 1024 -ytm 2048 -yt test/ --class com.xxx.xxx.xxx.Kafka2Phoenix /opt/xxx/xxx/xxx/xxx-1.0.jar
后台运行:ok
nohup bin/flink run -s hdfs:///flink/savepoint/savepoint-8b08d3-c077f4bd59b0 -m yarn-cluster -yn 2 -yjm 1024 -ytm 2048 -yt test/ --class com.xxx.xxx.xxx.Kafka2Phoenix /opt/xxx/xxx/xxx/xxx-1.0.jar >/opt/xxx/xxx/xxx/xxx-1.0.log &
---------------- 流处理系统 ----------------
包括 Flink 在内的分布式流处理引擎一般采用 DAG 图来表示整个计算逻辑,
DAG的每个点都代表一个基本的逻辑单元,即算子
Source接入-->transformation网络传输/本地传输在算子间进行发送和处理-->sink发送到外部系统或数据库
--------- 基本模型 ---------
逻辑模型 && 物理模型(运行时)
逻辑模型可能有多个并发,实际的分布式流处理引擎更复杂
,每个算子都可能有多个实例
假设: A B C三个算子,A source 有两个实例,C有两个实例,B 一个
在逻辑模型中,A和B是C的上游节点
而在对应的物理模型中,C的所有实例和A,B的所有实例之间可能都存在数据交换
# 注: 在物理模型中,我们会根据计算逻辑,采用系统自动优化或认为指定的方式将计算工作分布到不同的实例中,
只有当算子实例分布到不同进程时,才需要通过网络进行数据传输
,而同一进程中的 多个实例之间的数据传输通常不需要经过网络的
Storm&& Flink 构建DAG计算逻辑图区别?
Storm需要在图中添加Spout或Bolt这种算子,并指定算子之前的连接方式
Flink的Api定义更加面向数据本身的处理逻辑:
将数据抽象成一个无限集,然后定义一组集合上的操作,最后在底层自动构建相应的DAG图
区别:Storm更底层,自由度高 Flink的Api更上层,也更简单
---------------- Flink DataStream API ----------------
//1、设置运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、配置数据源读取数据
DataStream<String> text = env.readTextFile ("input");
//3、进行一系列转换
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
//4、配置数据汇写出数据
counts.writeAsText("output");
//5、提交执行
env.execute("Streaming WordCount");
上面实现一个流式的wordcount,首先需要获得一个StreamExecutionEnvironment对象(构建DAG图的上下文对象)
,基于这个对象,我们可以添加一些算子
1:使用了 Environment 对象中内置的读取文件算子readTextFile获取数据源,获取到DataStream对象,它可以看做是一个无限的数据集
2:调用flatMap将每一条数据记录(文件中的每一行)分解成单词,同时会在底层的DAG图中添加一个flatMap算子
3:得到的是处理过的单词的流,调用keyBy算子将流中的单词分组/分流,然后调用sum算子进行累计
4:计算出的结果形成一个新的流,调用writeAsText算子将结果写到文件中
5:只有最终调用execute方法时,才会把DAG提供到集群中,接入数据并执行实际的逻辑
前面调用所有算子并没有实际处理数据,而是在构建表达计算逻辑的DAG图.
# 整个代码实现过程就是一个构建DAG 图的过程,将算子加到DAG 中 输入 处理 输出
Flink DataStream API的核心就是代表流数据的DataStream对象,开发就是对DataStream对象进行操作
--------------------------------
Rich Function有一个生命周期的概念:
open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用
close()方法是生命周期中的最后一个调用的方法,做一些清理工作
getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
---------------- DataStream操作分类:(四类) ----------------
1:基于单条记录(filter,map)
2:基于窗口(window)
3:合并多条流(union,join,connect)
4:拆分单条流(split)
第一类是对于单条记录的操作.比如筛除掉不符合要求的记录(Filter 操作),或者将每条记录都做一个转换(Map 操作)
第二类是对多条记录的操作.比如说统计一个小时内的订单总成交量.就需要将一个小时内的所有订单记录的成交量加到一起
为了支持这种类型的操作,就得通过 Window 将需要的记录关联到一起进行处理
第三类是对多个流进行操作并转换为单个流.比如 多个流可以通过 Union、Join 或 Connect等操作合到一起
这些操作合并的逻辑不同,但最终都会产生了一个新的统一的流,从而可以进行一些跨流的操作
第四类DataStream 还支持与合并对称的操作,就是把一个流按一定规则拆分为多个流(Split 操作)
每个流是之前流的一个子集,这样我们就可以对不同的流作不同的处理
------------------------------ Flink通信模型(Akka) ------------------------------
Flink客户端(JobClient) JobManager(1) TaskManager(N) 之间通信都是基于Akka actor模型
JobClient 从用户处获取到Flink job,提交给JobManager
1:JobManager 负责这个job的执行:首先分配所需的(slot:CPU,内存)资源,即TaskManagers上要执行的slot
2:获取到slot后,jobmanager 部署单独的任务到响应的TaskManager上
TaskManager产生一个线程来执行这个任务
3:状态改变时(开始计算,结束计算,每一次算子计算),状态会被发送回JobManager
基于这些状态的更新,JobManager将引导着个job执行完成
4:一旦执行完,结果将会发送回JobClient
## JobManager和TaskManager
JobManager是核心控制单元,负责整个Flink Job,负责资源分配,任务调度和状态汇报
//TODO
TaskManager
## Flink Blob:
JobManager-blob服务 ,也是在flink-conf.yaml文件中配置
接收jar包/发送jar包到Taskmanager,传输log文件
------------------------------ 反压/背压 ------------------------------
backpressure 背压概念:
当数据流启动时,数据源把一行行/条条数据填到一个类似桶的缓存中(buffer)
一旦缓存(buffer)满了,桶就顺着流水线流到下游组件(component)中,流处理引擎
会拿来一个新的空缓存(空桶),源数据不知道这一切,只会不停填桶...
如果transformation和driver来不及处理这些数据,流处理引擎会启动反压机制,让数据源睡眠/等待
当流水线又有空余的桶(缓存)后,源数据被唤醒继续往桶里填数据.
背压传播的趋势:
一个任务的back pressure警告(high),则意味着该任务产生数据的速度要高于下游Operator消化的速度
数据沿着job的数据流图向下游流动(如从source到sink),而背压则是沿着相反的方向传播,逆流而上,可以理解为 水流漫上去
flink 反压机制(backpressure)
产生原因:短时负载高峰导致系统接收数据的速率远高于它处理数据的速率
flink利用自身作为纯数据流引擎的优势来优雅地响应反压问题
Q:Flink 是如何在 Task 之间传输数据的,以及数据流如何实现自然降速?
运行时:
operators组件
每个operator会消费中间态的流,并在流上进行转换,然后生成新的流
streams组件
Flink中的反压:
Flink 使用了高效有界的分布式阻塞队列,就像Java通用的阻塞队列(BlockingQueue)
Java使用BlockingQueue时:一个较慢的接受者会降低发送者的发送速率,因为一旦队列
满了(有界队列)发送者会被阻塞
在 Flink 中,这些分布式阻塞队列就是这些逻辑流,而队列容量是通过缓冲池来(LocalBufferPool)实现的
每个生产和被消费的流都会被分配一个缓冲池.
缓冲池管理着一组缓冲(Buffer),缓冲在被消费后可以被回收循环利用.
Flink 如何处理背压?
Flink与持久化的source(例如kafka),能够为你提供即时的背压处理.而无需担心数据丢失.
Flink不需要一个特殊的机制来处理背压.因为Flink中的数据传输相当于已经提供了应对背压的机制.
因此,Flink所获得的最大吞吐量由其pipeline中最慢的部件决定
------------------------------ flink内存管理 ------------------------------
主流实时计算框架都是基于jvm语言开发的(Java Scala)
为了加快计算,通常都是将数据加载在内存中,由于数据量巨大,对内存造成很大压力
==数据存储==
最简单做法试封装成对象直接存储在List或Map这样的数据结构中(
公司从mq中拿到的实时计算生产到的数据通过消费者程序写入到hbase
kafka json map list(map) hbase)
引发两个问题?
1:数据规模大时,需要创建的对象非常多(数据加上存储的数据结构,耗费大量内存)
可能引发OOM
2:源源不断的数据需要被处理,对象持续产生并需要被销毁
GC压力大
SO:
JVM自带的GC无法满足高效+稳定的流处理,Flink建立一套自己的内存管理体系
Flink将内存分为3个部分(network buffers,Memory Manager pool,Remaining Heap)
每个部分都有不同用途;
1:Network buffers:一些以32KB Byte数组为单位的buffer,主要被网络模块用于数据的网络传输。
在Flink中主要是基于Netty进行网络传输
2:Memory Manager pool大量以32KB Byte数组为单位的内存池,所有的运行时算法(例如Sort/Shuffle/Join)都从这个内存池申请内存,
并将序列化后的数据存储其中,结束后释放回内存池
内存池,由多个MemorySegment组成,每个MemorySegment代表一块连续的内存空间 byte[]数据结构存储 默认32kb
3:Remaining(Free)Heap主要留给UDF中用户自己创建的Java对象,由JVM管理.
用在UDF中用户自己创建的对象 在UDF中,用户流式的处理数据 并不需要太大内存
同时flink也不建议在UDF中缓存很多数据
重点:
Flink的主动内存管理避免了令人讨厌的OutOfMemoryErrors杀死JVM并减少垃圾收集开销的问题。
Flink具有高效的数据去/序列化堆栈,有助于对二进制数据进行操作,并使更多数据适合内存。
Flink的DBMS风格的运算符本身在二进制数据上运行,在必要时可以在内存中高性能地转移到磁盘。
------------------------------ Flink程序发布 ------------------------------
flink发布命令并没有指定yarn 怎么就在yarn上运行了?
配置一个容器yarn-session,再发布一个flink程序 会自动找到 JobManager address (如果创建容器的时间和程序发布的时间间隔太久 会抛找不到jobManager address的异常) 使用 -yid 可以指定flink程序发布到 yarn cluster 集群上运行
flink on yarn 两种运行方式 1:yarn-session 2:per job
# yarn-session
nohup bin/yarn-session.sh \
--queue xxxxx \
--container 12 \
--jobManagerMemory 10240 \
--taskManagerMemory 20480 \
--slots 10 \
--ship test/ \
>/opt/xxxxx/flink-cluster/log/yarn-session.log 2>&1 &
nohup bin/flink run \
-yid application_1563070919166_1815 \
-p 6 \
--class com.xxxxx.xxxxx.xxxxx.FakeVehicle \
/opt/xxxxx/xxxxx.jar \
>/opt/xxxxx/log/xxxxx.log 2>&1 &
# per job
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 2048 -yt test/ --class com.xxxxx.xxxxx.xxxxx.Kafka2Mysql /opt/xxxx/xxx.jar
------------------------------ Flink 压力测试 ------------------------------
雅虎15年测试:
Storm 能够承受每秒 40 万事件,但受限于 CPU;
Flink 则可以达到每秒 300万事件(7.5 倍)但受限于 Kafka 集群和 Flink 集群之间的网络
--------------------------------------
Flink 的执行过程是基于流的,这意味着各个处理阶段有更多的重叠,并且混洗操作是流水线式的,因此磁盘访问操作
更少.相反, MapReduce、Tez和Spark是基于批的,这意味着数据在通过
网络传输之前必须先被写入磁盘.该测试说明,在使用 Flink 时,系统空闲时间和磁盘访问操作更少。
接收器 数据源
Kafka position也是由Flink自己维护的
理想下 无边际数据流 源源不断来 按照时间窗口 计算 输出
现实情况是: 数据不是按时来的 有延迟
所以划分为事件时间 摄取时间 处理时间
NC市现场部署的flink作业,运行90d了也没出现问题,原因在于flink使用的是自己的内存管理体系
中间出错挂掉,会自动重连并通过checkpoint检查点机制 重新计算
--------------- AM RM ---------------
Flink log中的 RM AM 指的是yarn上的一个ResourceManager 和若干个ApplicationMaster 指的是yarn的AM RM 通信
ApplicationMaster管理在yarn上运行的应用程序的每个实例
同时负责协调来自 ResourceManager 的资源,并通过NodeManager监视容器的执行和资源使用 (CPU、内存等的资源分配)
------------------------- Lambda架构 -------------------------
Lambda 架构用定期运行的批处理作业来实现应用程序的持续性,并通过流
处理器获得预警。流处理器实时提供近似结果;批处理层最终会对近似结果予以纠正
批处理架构很难解决乱序事件流问题
批处理作业的界限不清晰,写死了 假设需要根据产生数据的时间段(如从用户登录到退出)生成
聚合结果,而不是简单地以小时为单位分割数据
------------------------- Kappa架构 -------------------------
用来解决lambda架构的不足,即更多的开发和运维工作
lambda架构背景是流处理引擎还不完善,流处理的结果只作为临时的、近似的值提供参考
Flink流处理引擎出现后,为了解决两套代码的问题,Kappa架构出现
Kappa架构介绍:
Kappa 架构可以认为是 Lambda 架构的简化版(只要移除 lambda 架构中的批处理部分即可)
在 Kappa 架构中,需求修改或历史数据重新处理都通过上游重放完成。
Kappa 架构最大的问题是流式重新处理历史的吞吐能力会低于批处理,但这个可以通过增加计算资源来弥补。
调研:flink可以保证计算的准确性,但是有一个前提是数据时准时到达的.
卡口过车数据 设备会因为网络延迟迟到几个小时,所以 Kappa架构不适合我们
建议次日凌晨使用离线计算统计前天数据,替换实时表数据
------------------------- flink SQL -------------------------
流和动态表(Dynamic Table)的对偶(duality)性。
----- 流式SQL -----
SQL 声明式语言
批处理实例:
SELECT a.id FROM A a,B b WHERE a.id=b.id;
最简单的双流join,找出表A和表B中相同的id
两个经典的Join算法(归并连接 && 哈希连接):
归并连接算法:拿到两表后,将id由小到大排好序,然后从前往后同时遍历两表
一旦遇到相等的id就输出一条Join结果(1,排序2,合并及连接)
哈希连接算法:拿到两张表,对数据规模就行一个评估,然后选取较小一张表
,以id作为key,以数据行作为value,建立哈希索引.
接着便利另一张大表,对每一个id值到建立好的哈希索引中查找有没有id相等的数据行,
有则Join输出
区别:哈希连接算法中,只需要将较小的一张表加载到内存
归并连接算法,需要将两张表都加在到内存中
流式场景下的持续查询:
持续查询:没有外界干预的情况下,查询会一直输出结果,不会停止
由于数据流本身无穷,所以在上面的查询都是持续查询,每当有新数据的到来,可能都会有新的查询结果产生
A B两张表双流Join ,表中数据不断增加,当A B存在相同id,才会生成相应的Join结果
问题:传统数据库允许对一张表进行全表扫描,但是流式场景却做不到
原因:1:我们无法确定下一条数据属于表A还是表B 2:流数据具有无尽性,找不到表的边界
所以传统的归并连接跟哈希连接算法不适用于流式SQL
--------------------
堆外内存(off-heap),堆内存(on-heap)
https://blog.csdn.net/u010722938/article/details/51558315
---- flink学习 ----
flink人大视频
https://www.bilibili.com/video/av42427050/
---- Blink ----
2019.2开源的Blink版本,主要是基于flink 1.5.1
---- 告警 ----
Apache Committer 2017.7月 ali 5个
参考:http://wuchong.me/blog/2019/02/12/how-to-become-apache-committer/
-------------------- flink开发步骤 --------------------
第一步:构建环境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
第二步:添加数据源
val prop:Properties = new Properties()
prop.setProperty("bootstrap.servers",kafkaProp.getProperty("bootstrap.servers"))
val consumer010= new FlinkKafkaConsumer010(kafkaProp.getProperty("source.topic")
,new SimpleStringSchema(),prop)
consumer010.setStartFromLatest()
val dataStream = streamEnv.addSource(consumer010)
第三步:数据预处理
val outputStream = dataStream
.map(x=>getRecord(x))
.filter(!_._1.isEmpty)
.map(x=>recordProcess(x))
第四步:设置时间戳和水印
.assignTimestampsAndWatermarks(new TimestampExtractor(basicProp.getProperty("job.interval").toInt))
第五步:数据分组
.keyBy(0)
第六步:指定时间窗口+聚合计算+输出格式
横向汇总: 通过打标签,在累加计算多个不同指标
.timeWindow(Time.seconds(basicProp.getProperty("max.lagged.time").toInt))
.reduce((v1,v2)=>(v1._1,v1._2,v1._3+v2._3,v1._4+v2._4))
.map(x=>toJson(x))
纵向汇总: 类似于单词计数和groupBy
.timeWindow(Time.minutes(5))
.sum(1)
.map(x=>toJson(x))
第七步:输出
outputStream.addSink(producer010)
第八步:执行flink
env.execute(basicProp.getProperty("application.name"))
注:Flink没有类似于Spark中foreach方法,让用户进行迭代的操作.
所有对外的输出操作都要利用Sink完成
-------------------- transformation --------------------
flink 常用转换算子:
map,flatMap,filter,keyBy,reduce,fold,aggregations,window,WindowAll,
Union,Window join,Split,Select,Project
dataSource.map(getRecord(_))
.filter(new FilterFunction[(String, String, String, Long)] {
override def filter(t: (String, String, String, Long)): Boolean = {
t._4 match {
case t._4 if (t._4) > 20 => true
case t._4 if (t._4) <= 20 => false
}
}
})
/*上下等同,相当于是源码中类似这样已经实现*/
.filter(_._4>20)
map:数据清洗 简单ETL
flatmap:除了map操作 还可以一对多 的 输出
#### 调用filter算子()可以通过重写FilterFunction接口来实现 filter方法
--------------分布式转换算子----------------
Random 随机数据交换由DataStream.shuffle()方法实现,shuffle方法将数据随机的分配到并行的任务中去
Round-Robin 一种负载均衡算法,可以将数据平均分配到并行的任务中去
Rescale 将数据发送到接下来的task slots中的一部分task slots中
Broadcast 将数据复制并发送到所有的并行任务中去
Global 将所有的数据都发送到下游算子的第一个并行任务中去
Custom 自定义
---------------------------------- Flink 双流关联/转换 ----------------------------------
算子:coGroup join coflatmap
Join:只输出匹配成功的数据
CoGroup:无论是否匹配都会输出
CoFlatMap:没有匹配操作,只是分别接收两个流的输入
-------------------------
join,coGroup实现代码结构
val stream1 = ...
val stream2 = ...
stream1.join(stream2)
.where(_._1).equalTo(_._1) //join的条件stream1中的某个字段和stream2中的字段值相等
.window(...) // 指定window,stream1和stream2中的数据会进入到该window中。只有该window中的数据才会被后续操作join
.apply((t1, t2, out: Collector[String]) => {
out.collect(...) // 捕获到匹配的数据t1和t2,在这里可以进行组装等操作
})
.print()
-------- parallelism && slot --------
parallelism 并行度 默认1
程序中设置 env.setParallelism(3); 这里设置的是全局的,包含下面执行的每一个算子
为每一个算子单独设置并行度: dataStream.map(new XxxMapFunction).setParallelism(5)
优先级:算子设置并行度 > env 设置并行度 > 配置文件默认并行度
eg:如果在代码中设置的是env.setParallelism(5),flink-conf.yaml文件中默认为1(1.4) 发布上flink-cluster 上会使用5个slot
slot:作用,是指taskmanager的并发执行能力,简单理解是将CPU和内存分成一个个逻辑单位,即slot
Slot只对内存隔离,不对CPU隔离 CPU共享
---------------------------- 数据流分组 -----------------------------
keyBy 用于指定数据流是否进行分组
需要在window函数前指定好,使用keyBy(...)可以将数据流拆分成逻辑分组的数据流
如果不使用keyBy,你的数据流不是分组的
分组数据流将你的window计算通过多任务并发执行,每一个逻辑分组流在执行中与其他的逻辑分组流是独立地进行的
在非分组的数据流中,你的原始数据并不会拆分成多个逻辑流并且所有的window逻辑都在一个任务中执行,并发度为1
KeyBy 起始就是对数据进行分区
-------------------------------- Flink Window Time WaterMark 基础 --------------------------------
由问题引入 window time watermark
主要包括为什么要有 Window ?
Window 中的三个核心组件:WindowAssigner、Trigger 和 Evictor ?
Window 中怎么处理乱序数据, 乱序数据是否允许延迟, 以及怎么处理迟到的数据 ?
最后我们梳理了整个 Window 的数据流程, 以及 Window 中怎么保证 Exactly Once 语义 ?
-------------------------------- window --------------------------------
Flink 天然支持无限流数据处理的分布式计算框架,使用window将无限流切分成有限流,是处理有限流的核心组件
窗口分类:
时间驱动timewindow,数据驱动countwindow
相关Api:[] 不强制使用,有默认值
keyed windows
stream
.keyBy(...)
.window(...)
[.trigger(...)]
[.evictor(...)]
[.allowedLateness(...)]
[.sideOutPutLateData(...)]
.reduce/aggregate/fold/apply()
[.getSideOutput(...)]
non-keyed windows
stream
.windowAll(...)
[.trigger(...)]
[.evictor(...)]
[.allowedLateness(...)]
[.sideOutPutLateData(...)]
.reduce/aggregate/fold/apply()
[.getSideOutput(...)]
dataStream :
对于非KeyedStream,有timeWindowAll、countWindowAll、windowAll操作,其中最主要的是windowAll操作,它的parallelism为1,它需要一个WindowAssigner参数,返回的是AllWindowedStream
TimeWindowAll 不是并行的 所以slot只能用1 个,滚动窗口
KeyedStream: 通过keyBy将DataStream 转换成KeyedStream 对于KeyedStream除了继承了DataStream的window相关操作,它主要用的是timeWindow、countWindow、window操作,其中最主要的是window操作,它也需要一个WindowAssigner参数,返回的是WindowedStream
## Window API使用
WindowAssigner(窗口指定器) Evictor(清除) Trigger(触发)
1:Window方法接收的输入是一个WindowAssigner,WindowAssigner负责将每条数据分发到正确的Window中(一条数据有可能会分发到多个窗口中)
几种通用的WindowAssigner:(tumbing window 滚动窗口 无重复 ,sliding window 滑动窗口 有重复,
session window 事件窗口,global window 全局窗口)
## 自定义数据分发策略:新建一个class 继承WindowAssigner
2:Evictor 主要用于做一些数据的自定义操作,可选操作
CountEvictor 保留指定数量的元素
DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 threshold,判断是否删除一个元素
TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元素,其中 max_ts 是窗口内时间戳的最大值
3:Trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的 Trigger
允许自定义,继承 Trigger
onElement() 每次往 window 增加一个元素的时候都会触发
onEventTime() 当 event-time timer 被触发的时候会调用
onProcessingTime() 当 processing-time timer 被触发的时候会调用
onMerge() 对两个 trigger 的 state 进行 merge 操作
clear() window 销毁的时候被调用
上面的接口中前三个会返回一个 TriggerResult
CONTINUE 不做任何事情
FIRE 触发 window
PURGE 清空整个 window 的元素并销毁窗口
FIRE_AND_PURGE 触发窗口,然后销毁窗口
flink window:
http://www.aboutyun.com/thread-26483-1-1.html
https://www.infoq.cn/article/WCOvi-D68Y8ycCiYZ8pX
flink触发当前窗口计算的前提是下一条数据的时间在当前窗口的结束时间之后
---------------------------- Trigger ------------------------------
Trigger 触发器
触发器是决定某个窗口何时输出的一种机制
作用跟照相机的快门相同,按下去,就能拿到某个时间点计算结果的快照
通过触发器,能多次看到某个窗口的输出结果。因此可以实现迟到数据(late event)的处理
---------------------------- Time && Watermark ------------------------------
前提:Time 和 Watermark适用于时间驱动类的窗口
分布式环境中Time,事件时间(Event-Time),摄取时间(Ingestion-Time),处理时间(Processing-Time)
Event-Time:数据产生的时间
Ingestion-Time:数据进入到flink的时间
Processing-Time:窗口开始计算的时间
### env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Watermark: 解决乱序的问题
如何保证基于事件时间的窗口销毁时,flink已经处理完所有数据?
Watermark会携带一个单调递增的时间戳t,watermark(t)表示所有时间戳不大于t的都已经到来
未来小于t的数据不会到来,因此可以放心触发和销毁窗口
# 迟到的数据 late elements
问题:
watermater的时间不好设置 :要么不正确 要么耗费太大 ,所以在设置watermark(t)之后,还有较小概率接收到时间戳(t)
之前的数据,这部分称为 late elements
解决方式:指定允许延迟的最大时间(默认0)
DataStream<T> input = ... ;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(window function)
设置allowedLateness之后,迟到的数据同样可以触发窗口,进行计算
利用Flink的side output机制,我们可以获取到这些迟到的数据
final OutputTag<T> lateOutputTag = new OutputTag[T]("late-data"){};
DataStream<T> input = ... ;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutPutLateData(lateOutputTag)
.<windowed transformation>(window function)
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
# 设置allowedLateness之后,迟到的数据也可能触发窗口,如果使用的是Session window
可能会对窗口进行合并,产生预期外的行为
3:Window 内部实现
每条过车数据进来后,会先由WindowAssigner 分配到对应的window(滚动/滑动..)
当window经过watermark被trigger后,会交给Evictor(如果没有设置 则跳过),
然后处理UserFunction(用户处理的逻辑代码)
### Window中状态存储
#Flink 是支持 Exactly Once 处理语义的,那么 Window 中的状态存储和普通的状态存储又有什么不一样的地方呢?
A:从接口上可以认为没有区别,但是每个 Window 会属于不同的 namespace,
而非 Window 场景下,则都属于 VoidNamespace ,最终由 State/Checkpoint 来保证数据的 Exactly Once 语义
简单说:Window 中的的元素同样是通过 State 进行维护!!! 然后由 Checkpoint 机制保证 Exactly Once 语义
Kafka Connector 也是在 Flink 中使用 Operator State 的一个示例:维护topic 分区和offset的映射到Operator State
------------------- flink精确一次优势 ------------------
Spark Streaming 的端到端 Exactly-once 需要下游支持幂等、上游支持流量重放,
Spark Streaming 这一层做到了 At-least-once,正常情况下数据不重不少,但在程序重启时可能
会重发部分数据,为了实现全局的 Exactly-once,在下游做了去重逻辑
通过快照机制,Flink 获得了端到端数据一致性
------------- 怎样判断是否是新用户 ------------
跟判断是否首次入城车一样,redis 缓存数据库 永不过期 数据流实时对比判断
Flink 现在大部分开发属于实时汇总 : 简单算子+业务函数+水印+分组+聚合+json格式输出
source transformation sink
-------------------------- 怎样确定Flink集群所需资源 --------------------------
#吞吐量:
估算预期进入流计算系统每秒的记录数(吞吐量),以及每条记录数的大小
#不同key的数量以及每个key存储的state大小
key的数量和key所需state的大小,都将会影响Flink应用程序所需的资源.可以通过查看反压状态
#状态的更新频率和状态后端的访问模式:
不同的状态后端(RocksDB,java Heap)的访问模式差距很大.RocksDB每次读取和更新会进行序列化
反序列化以及JNI操作,Java Heap不支持增量checkpoint,会导致状态大的场景每次持久化的数据量很大
都会影响Flink作业所需的资源
#网络容量
网络容量不仅会受flink内部,也会受到Flink跟正在交互的kafka,hdfs等外部服务
比如启动kafka的replication会增加额外的网络容量
#磁盘带宽
如果应用程序依赖 RocksDB Kafka HDFS
#机器数量以及可用的CPU和内存
另外需要提供额外的资源来保证:
当你的 Flink 发生故障时,系统会需要额外的资源来做恢复工作以及从 Kafka topic 或其他消息客户端追上最新的数据
-------------------------- Flink 分支库 --------------------------
## Flink CEP
复杂事件处理,模式匹配
flink的CEP跟DataStream Api 都是对流式数据处理,有什么区别?
CEP:更看重流式数据中查找,也就是对源数据不做处理
DataStream:更看重对数据的加工和处理,一般不会在数据中查找匹配
--------------- flink监控 ---------------
grafana
https://blog.csdn.net/qq_22222499/article/details/95009793
--------------- Chandy-Lamport 算法 ---------------
将流计算看作成一个流式的拓扑,定期在这个拓扑的头部source点开始插入特殊的barriers(栅栏)
从上游开始不断向下游广播这个Barriers.
每一个节点收到所有的barriers,会将state做一次snapshot(快照)
当每个节点都做完Snapshot之后,整个拓扑就算做完一次checkpoint
接下来不管出现任何故障,都会从最近的checkpoint进行恢复
Flink用这套算法,保证了强一致性的语义,
也是Flink区别于其他无状态流计算引擎的核心区别
-------------- 实时计算的趋势 StreamSQL ---------
实时任务SQL化
阿里的实时计算方案就是Flink SQL 1CU 一天6 RMB
使用sql 开发实时计算程序
------------ 源表 && 维表 && 结果表 --------------
源表: 流数据分析的源表是指流式数据存储,流式数据存储驱动流数据分析的运行 相当于ods层
事实表&&维表 dwd层
事实表:事实表是数据聚合后依据某个维度生成的结果表,一般很大
维表:对数据分析时所用的一个量,你可以选择按类别来进行分析,或按区域来分析. 这样的按..分析就构成一个维度
结果表: 数据统计后的结果 dws层
mysql phoenix
-------------- flink程序调优 --------------
DataStream性能调优
A:配置内存
Flink是依赖内存计算,计算过程中内存不够对Flink的执行效率影响很大.可以通过监控GC
B:设置并行度
并行度控制任务的数量,影响操作后数据被切分成的块数。调整并行度让任务的数量
和每个任务处理的数据与机器的处理能力达到最优
设置合适的parallelism能提高运算效率,太多了和太少了都不行
C:配置进程参数
Flink on YARN模式下,有JobManager和TaskManager两种进程
JobManager负责任务的调度,以及TaskManager、 RM之间的消息通信
TaskManager的内存主要用于任务执行、通信
每个TaskManager每个核同时能跑一个task,所以增加了TaskManager的个数相当于增大了任务的并发度
--container 12 \ 12个taskmanager 每个Container都是一个独立的进程
--jobManagerMemory 10240 \
--taskManagerMemory 20480 \ 每个taskmanager有20G 内存 一共占用20G * 12
--slots 10 \ 每个taskmanager10核,slot 内存隔离,CPU不隔离
上述 每个slot的可用资源是2g
这样的一个公共的yarn-session,可以发布多个flink 任务
每个算子一个slot,flink优化将多个算子的并行度实例进行链式操作,链式操作结束后
得到task,再作为一个调度执行单元,放到一个线程里执行
好处:避免频繁线程切换时,影响吞吐量
优化:保持分配给Job的资源不变的情况下将总Container数量减半
D:设计分区方法 (#####)
设置分区是为了优化task的切分,在程序编写尽量分区均匀,避免数据倾斜,
防止某个task的执行时间过长导致整个job执行缓慢
1:随机分区 ,将元素随机地进行分区
dataStream.shuffle()
2:Rebalancing数据重分区
基于round-robin对元素进行分区,使得每个分区负载均衡,十分有用
dataStream.rebalance();
3: Rescaling 以round-robin的形式将元素分区到下游操作的子集中
dataStream.rescale();
4:广播 广播每个元素到所有的分区
dataStream.broadcast();
5:自定义分区
可以按照某个特征进行分区,从而优化任务执行
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(Tuple2.of("hello",1),
Tuple2.of("test",2), Tuple2.of("world",100));
// 定义用于分区的key值,返回即属于哪个partition的,该值加1就是对应的子任务的id号
Partitioner<Tuple2<String, Integer>> strPartitioner = new Partitioner<Tuple2<String,Integer>>() {
@Override
public int partition(Tuple2<String, Integer> key, int numPartitions) {
return (key.f0.length() + key.f1) % numPartitions;
}
};
// 使用Tuple2进行分区的key值
dataStream.partitionCustom(strPartitioner, new KeySelector<Tuple2<String, Integer>,Tuple2<String, Integer>>() {