Skip to content

Commit

Permalink
[FLINK-20713][core][runtime] out/err file support rolling
Browse files Browse the repository at this point in the history
  • Loading branch information
muzimusi committed Dec 23, 2020
1 parent d42aa45 commit 7cfd1cf
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util.log;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
* Redirect System.out System.err
*/
public class LoggerStream extends java.io.OutputStream {
protected static final byte[] LINE_SEPERATOR_BYTES = System.getProperty("line.separator").getBytes();
private ByteArrayOutputStream baos = new ByteArrayOutputStream();

private org.slf4j.Logger logger;
private String level;

public LoggerStream(org.slf4j.Logger logger, String level) {
if (logger == null) {
throw new IllegalArgumentException("logger cannot be null.");
}
this.logger = logger;
this.level = level;
}

@Override
public void write(int b) throws IOException {
if (b == LINE_SEPERATOR_BYTES[LINE_SEPERATOR_BYTES.length - 1]) {
switch (this.level) {
case "ERROR":
logger.error(baos.toString());
break;
case "INFO":
default:
logger.info(baos.toString());
break;
}
baos.reset();
} else {
baos.write(b);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util.log;

import org.slf4j.LoggerFactory;

import java.io.PrintStream;

public class OutErrLoggerUitls {

private static String STDOUT_APPENDER = "stdout";
private static String STDERR_APPENDER = "stderr";

private static String INFO = "INFO";
private static String ERROR = "ERROR";

public static void setOutAndErrToLog() {
setOutToLog(STDOUT_APPENDER, INFO);
setErrToLog(STDERR_APPENDER, ERROR);
}

private static void setOutToLog(String name, String level) {
System.setOut(new PrintStream(new LoggerStream(LoggerFactory.getLogger(name), level)));
}

private static void setErrToLog(String name, String level) {
System.setErr(new PrintStream(new LoggerStream(LoggerFactory.getLogger(name), level)));
}
}
38 changes: 38 additions & 0 deletions flink-dist/src/main/flink-bin/conf/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ monitorInterval=30
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

# logger for System.out & System.err
logger.stdout.name = stdout
logger.stdout.level = INFO
logger.stdout.additivity = false
logger.stdout.appenderRef.rolling.ref = OutAppender

logger.stderr.name = stderr
logger.stderr.level = INFO
logger.stderr.additivity = false
logger.stderr.appenderRef.rolling.ref = ErrAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
Expand Down Expand Up @@ -54,6 +65,33 @@ appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}

# logger for System.out & System.err
appender.out.name = OutAppender
appender.out.type = RollingFile
appender.out.append = false
appender.out.fileName = ${sys:out.file}
appender.out.layout.type = PatternLayout
appender.out.filePattern = ${sys:out.file}.%i
appender.out.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.out.policies.type = Policies
appender.out.policies.size.type = SizeBasedTriggeringPolicy
appender.out.policies.size.size=25MB
appender.out.strategy.type = DefaultRolloverStrategy
appender.out.strategy.max = 50

appender.err.name = ErrAppender
appender.err.type = RollingFile
appender.err.append = false
appender.err.fileName = ${sys:err.file}
appender.err.layout.type = PatternLayout
appender.err.filePattern = ${sys:err.file}.%i
appender.err.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.err.policies.type = Policies
appender.err.policies.size.type = SizeBasedTriggeringPolicy
appender.err.policies.size.size=25MB
appender.err.strategy.type = DefaultRolloverStrategy
appender.err.strategy.max = 50

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
Original file line number Diff line number Diff line change
Expand Up @@ -333,18 +333,18 @@ public static String getCommonStartCommand(
startCommandValues.put("jvmmem", jvmMemOpts);

final String opts;
final String logFileName;
final String fileName;
if (mode == ClusterComponent.JOB_MANAGER) {
opts = getJavaOpts(flinkConfig, CoreOptions.FLINK_JM_JVM_OPTIONS);
logFileName = "jobmanager";
fileName = "jobmanager";
} else {
opts = getJavaOpts(flinkConfig, CoreOptions.FLINK_TM_JVM_OPTIONS);
logFileName = "taskmanager";
fileName = "taskmanager";
}
startCommandValues.put("jvmopts", opts);

startCommandValues.put("logging",
getLogging(logDirectory + "/" + logFileName + ".log", configDirectory, hasLogback, hasLog4j));
getLogging(logDirectory + "/" + fileName, configDirectory, hasLogback, hasLog4j));

startCommandValues.put("class", mainClass);

Expand Down Expand Up @@ -379,10 +379,12 @@ private static String getJavaOpts(Configuration flinkConfig, ConfigOption<String
}
}

private static String getLogging(String logFile, String confDir, boolean hasLogback, boolean hasLog4j) {
private static String getLogging(String fileName, String confDir, boolean hasLogback, boolean hasLog4j) {
StringBuilder logging = new StringBuilder();
if (hasLogback || hasLog4j) {
logging.append("-Dlog.file=").append(logFile);
logging.append("-Dout.file=").append(fileName).append(".out");
logging.append(" -Dlog.file=").append(fileName).append(".log");
logging.append(" -Derr.file=").append(fileName).append(".err");
if (hasLogback) {
logging.append(" -Dlogback.configurationFile=file:").append(confDir).append("/").append(CONFIG_FILE_LOGBACK_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,9 @@ public static String getTaskManagerShellCommand(

String logging = "";
if (hasLogback || hasLog4j) {
logging = "-Dlog.file=" + logDirectory + "/taskmanager.log";
logging = "-Dout.file=" + logDirectory + "/taskmanager.out";
logging += " -Dlog.file=" + logDirectory + "/taskmanager.log";
logging += " -Derr.file=" + logDirectory + "/taskmanager.err";
if (hasLogback) {
logging +=
" -Dlogback.configurationFile=file:" + configDirectory +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;

import org.apache.flink.util.log.OutErrLoggerUitls;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -524,6 +526,8 @@ protected static Configuration loadConfiguration(EntrypointClusterConfiguration

public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

OutErrLoggerUitls.setOutAndErrToLog();

final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {
clusterEntrypoint.startCluster();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TaskManagerExceptionUtils;

import org.apache.flink.util.log.OutErrLoggerUitls;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -332,6 +334,7 @@ public static Configuration loadConfiguration(String[] args) throws FlinkParseEx
}

public static void runTaskManager(Configuration configuration, PluginManager pluginManager) throws Exception {
OutErrLoggerUitls.setOutAndErrToLog();
final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, pluginManager, TaskManagerRunner::createTaskExecutorService);

taskManagerRunner.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ private static String getLog4jCommand(final String logConfigFilePath) {
return "";
}

return new StringBuilder("-Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"")
return new StringBuilder("-Dout.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out\"")
.append(" -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.log\"")
.append(" -Derr.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err\"")
.append(" -Dlog4j.configuration=file:" + CONFIG_FILE_LOG4J_NAME)
.append(" -Dlog4j.configurationFile=file:" + CONFIG_FILE_LOG4J_NAME)
.toString();
Expand Down

0 comments on commit 7cfd1cf

Please sign in to comment.