Skip to content

Commit 10376a7

Browse files
committed
Feat(SpecParrten): user-defined log pattern converter to support identifying logs by cluster type, node type, and node id.
1 parent fe9e287 commit 10376a7

File tree

4 files changed

+202
-2
lines changed

4 files changed

+202
-2
lines changed
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# Flink(spark等计算集群日志采集) 日志第二弹(终极解决日志收集问题)
2+
3+
问题1 :
4+
5+
1. 无论是`Flink on yarn` 还是`on k8s`,都会存在一个问题,`crash`后的日志不方便查看。
6+
7+
> yarn 聚合是不包含重启`container`日志,`k8s` pod 重启也是类似(刨除共享存储)。
8+
>
9+
> 可以通过 Push 到 es 解决,只要 k8s 外挂log 目录, yarn 采集 tmp 下目录也可以实现类似的功能(crash 掉的文件也能重启)。
10+
11+
2. 把日志采集到`ES` ,如何区分`tm`、`jm`等角色日志,如何进行集群的唯一标识(对于 session 集群来说,作业标识做不了唯一标识)。
12+
13+
> 方案1:
14+
>
15+
> Filebeat -> es
16+
>
17+
> 1. Filebeat 采集的时候进行文件名处理,在推送的日志前做好逻辑。(无需任何改动,唯一需要修改的是log4j 引入 appName 进行标识)
18+
>
19+
> 方案2:
20+
>
21+
> 修改log4j 配置 -> Filebeat -> Es
22+
>
23+
> Log4j 自定义`LogEventPatternConverter`方式,添加:集群标识,节点类型标识,节点唯一标识。(好处是解耦了filebeat 依赖,数据源解决问题,推送到任何源都含有标识)
24+
25+
**本文采用方案二**
26+
27+
```
28+
整体业务流程
29+
30+
## LogEventPatternConverter 添加信息
31+
-> k8s 挂载日志目录
32+
--> filebeat 扫描固定目录
33+
---> push es
34+
----> 用户按照使用的集群标识,对集群的日志搜索
35+
```
36+
37+
## LogEventPatternConverter
38+
39+
### 自定义 LogEventPatternConverter 规则
40+
41+
```
42+
43+
@Plugin(name = "FlinkLog", category = PatternConverter.CATEGORY)
44+
@ConverterKeys({"fl", "flinkLog"})
45+
public class SpecParrten extends LogEventPatternConverter {
46+
private final static Map<String, String> dic = new HashMap<String, String>() {{
47+
put("taskexecutor", "TaskManager");
48+
put("zookeeper", "FlinkZooKeeperQuorumPeer");
49+
put("historyserver", "HistoryServer");
50+
put("standalonesession", "JobManager");
51+
put("standalonejob", "JobManager");
52+
put("kubernetes-session", "JobManager");
53+
put("kubernetes-application", "JobManager");
54+
put("kubernetes-taskmanager", "TaskManager");
55+
}};
56+
57+
private String type = "NeedIni";
58+
private String nodeId = "NeedIni";
59+
60+
private SpecParrten(String name, String style) {
61+
super(name, style);
62+
63+
}
64+
65+
public static SpecParrten newInstance(final String[] options) {
66+
return new SpecParrten("FlinkLog", "flinkLog");
67+
}
68+
69+
@Override
70+
public void format(LogEvent logEvent, StringBuilder stringBuilder) {
71+
if (type.equals("NeedIni") || nodeId.equals("NeedIni")) {
72+
synchronized (this) {
73+
String filename = Paths.get(System.getProperty("log.file")).getFileName().toString();
74+
for (Map.Entry<String, String> each : dic.entrySet()) {
75+
if (filename.contains(each.getKey())) {
76+
this.type = each.getValue();
77+
filename.indexOf(each.getKey());
78+
this.nodeId = filename.substring(filename.indexOf(each.getKey()), filename.length() - 4);
79+
}
80+
}
81+
if (this.type.equals("NeedIni")) {
82+
this.type = "unknow";
83+
this.nodeId = "unknow";
84+
}
85+
}
86+
}
87+
stringBuilder.append(this.type);
88+
stringBuilder.append(" ");
89+
stringBuilder.append(this.nodeId);
90+
}
91+
}
92+
93+
```
94+
95+
### log4j 配置修改
96+
97+
```
98+
第一个位置填入集群标识,第二个位置引用自定义 LogEventPatternConverter,后面的信息保持原状
99+
appender.main.layout.pattern = test_cluster %fl %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
100+
101+
```
102+
103+
### 日志打印效果
104+
105+
```
106+
test_cluster JobManager standalonesession-1-cdh01 2022-03-10 14:24:45,337 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Trying to staxxxx
107+
```
108+
109+

flinklearn/docs/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,7 @@
4949

5050
- [Flink on Yarn 日志-历史任务如何查看tm日志(yarn logs源码解析&问题解决)](https://github.com/Whojohn/learn/blob/master/flinklearn/docs/Flink_on_Yarn%E6%97%A5%E5%BF%97_tm%E6%97%A5%E5%BF%97%E6%9F%A5%E7%9C%8B%E9%97%AE%E9%A2%98.md)
5151

52+
- [Flink(spark等计算集群日志采集).md](https://github.com/Whojohn/learn/blob/master/flinklearn/docs/Flink%20%E6%97%A5%E5%BF%97_%E7%BB%88%E6%9E%81%E8%A7%A3%E5%86%B3%E6%97%A5%E5%BF%97%E6%94%B6%E9%9B%86%E9%97%AE%E9%A2%98.md)
5253

53-
- [Flink 数据交换(网络)](https://github.com/Whojohn/learn/tree/master/flinklearn/docs/Flink%20%E6%95%B0%E6%8D%AE%E4%BA%A4%E6%8D%A2(%E7%BD%91%E7%BB%9C).md)
54+
55+
- [Flink 数据交换(网络)](https://github.com/Whojohn/learn/tree/master/flinklearn/docs/Flink%20%E6%95%B0%E6%8D%AE%E4%BA%A4%E6%8D%A2(%E7%BD%91%E7%BB%9C).md)

flinklearn/pom.xml

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@
196196
<groupId>org.apache.flink</groupId>
197197
<artifactId>flink-sql-client_${scala.binary.version}</artifactId>
198198
<version>${flink.version}</version>
199-
<scope>compile</scope>
199+
<scope>provided</scope>
200200
</dependency>
201201

202202
<dependency>
@@ -206,13 +206,40 @@
206206
<scope>provided</scope>
207207
</dependency>
208208

209+
210+
<dependency>
211+
<groupId>org.apache.flink</groupId>
212+
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
213+
<version>${flink.version}</version>
214+
<scope>provided</scope>
215+
</dependency>
216+
209217
<dependency>
210218
<groupId>mysql</groupId>
211219
<artifactId>mysql-connector-java</artifactId>
212220
<version>8.0.17</version>
213221
<scope>provided</scope>
214222
</dependency>
215223

224+
<dependency>
225+
<groupId>redis.clients</groupId>
226+
<artifactId>jedis</artifactId>
227+
<version>3.6.2</version>
228+
</dependency>
229+
230+
<dependency>
231+
<groupId>org.apache.logging.log4j</groupId>
232+
<artifactId>log4j-api</artifactId>
233+
<version>2.12.1</version>
234+
<scope>provided</scope>
235+
</dependency>
236+
<dependency>
237+
<groupId>org.apache.logging.log4j</groupId>
238+
<artifactId>log4j-core</artifactId>
239+
<version>2.12.1</version>
240+
<scope>provided</scope>
241+
</dependency>
242+
216243
</dependencies>
217244

218245

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package log;
2+
3+
import org.apache.logging.log4j.core.LogEvent;
4+
import org.apache.logging.log4j.core.config.plugins.Plugin;
5+
import org.apache.logging.log4j.core.pattern.ConverterKeys;
6+
import org.apache.logging.log4j.core.pattern.LogEventPatternConverter;
7+
import org.apache.logging.log4j.core.pattern.PatternConverter;
8+
9+
import java.nio.file.Paths;
10+
import java.util.HashMap;
11+
import java.util.Map;
12+
13+
14+
@Plugin(name = "FlinkLog", category = PatternConverter.CATEGORY)
15+
@ConverterKeys({"fl", "flinkLog"})
16+
public class SpecParrten extends LogEventPatternConverter {
17+
private final static Map<String, String> dic = new HashMap<String, String>() {{
18+
put("taskexecutor", "TaskManager");
19+
put("zookeeper", "FlinkZooKeeperQuorumPeer");
20+
put("historyserver", "HistoryServer");
21+
put("standalonesession", "JobManager");
22+
put("standalonejob", "JobManager");
23+
put("kubernetes-session", "JobManager");
24+
put("kubernetes-application", "JobManager");
25+
put("kubernetes-taskmanager", "TaskManager");
26+
}};
27+
28+
private String type = "NeedIni";
29+
private String nodeId = "NeedIni";
30+
31+
private SpecParrten(String name, String style) {
32+
super(name, style);
33+
34+
}
35+
36+
public static SpecParrten newInstance(final String[] options) {
37+
return new SpecParrten("FlinkLog", "flinkLog");
38+
}
39+
40+
@Override
41+
public void format(LogEvent logEvent, StringBuilder stringBuilder) {
42+
if (type.equals("NeedIni") || nodeId.equals("NeedIni")) {
43+
synchronized (this) {
44+
String filename = Paths.get(System.getProperty("log.file")).getFileName().toString();
45+
for (Map.Entry<String, String> each : dic.entrySet()) {
46+
if (filename.contains(each.getKey())) {
47+
this.type = each.getValue();
48+
filename.indexOf(each.getKey());
49+
this.nodeId = filename.substring(filename.indexOf(each.getKey()), filename.length() - 4);
50+
}
51+
}
52+
if (this.type.equals("NeedIni")) {
53+
this.type = "unknow";
54+
this.nodeId = "unknow";
55+
}
56+
}
57+
}
58+
stringBuilder.append(this.type);
59+
stringBuilder.append(" ");
60+
stringBuilder.append(this.nodeId);
61+
}
62+
}

0 commit comments

Comments
 (0)