Skip to content

HADOOP-18631. Migrate Async appenders to log4j properties #5418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -733,12 +733,43 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async";
/**
* Deprecated. Use log4j properties instead.
* Set system env variable HDFS_AUDIT_LOGGER, which in tern assigns the value to
* "hdfs.audit.logger" for log4j properties to determine log level and appender.
*/
@Deprecated
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async";
@Deprecated
public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT = false;
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_KEY = "dfs.namenode.audit.log.async.blocking";

/**
* Deprecated. Use log4j properties instead.
* Set value to Async appender "blocking" property as part of log4j properties configuration.
* <p>
* For example,
* log4j.appender.ASYNCAPPENDER=org.apache.log4j.AsyncAppender
* log4j.appender.ASYNCAPPENDER.blocking=false
*/
@Deprecated
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_KEY =
"dfs.namenode.audit.log.async.blocking";
@Deprecated
public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_DEFAULT = true;
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_KEY = "dfs.namenode.audit.log.async.buffer.size";
public static final int DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_DEFAULT = 128;

/**
* Deprecated. Use log4j properties instead.
* Set value to Async appender "bufferSize" property as part of log4j properties configuration.
* <p>
* For example,
* log4j.appender.ASYNCAPPENDER=org.apache.log4j.AsyncAppender
* log4j.appender.ASYNCAPPENDER.bufferSize=128
*/
@Deprecated
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_KEY =
"dfs.namenode.audit.log.async.buffer.size";
@Deprecated
public static final int DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_DEFAULT = 128;
public static final String DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST = "dfs.namenode.audit.log.debug.cmdlist";
public static final String DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY =
"dfs.namenode.metrics.logger.period.seconds";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.hadoop.hdfs.server.common;

import java.lang.management.ManagementFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.management.Attribute;
Expand All @@ -34,8 +32,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;

/**
* MetricsLoggerTask can be used as utility to dump metrics to log.
Expand All @@ -56,12 +52,12 @@ public class MetricsLoggerTask implements Runnable {
}
}

private org.apache.log4j.Logger metricsLog;
private Logger metricsLog;
private String nodeName;
private short maxLogLineLength;

public MetricsLoggerTask(String metricsLog, String nodeName, short maxLogLineLength) {
this.metricsLog = org.apache.log4j.Logger.getLogger(metricsLog);
this.metricsLog = LoggerFactory.getLogger(metricsLog);
this.nodeName = nodeName;
this.maxLogLineLength = maxLogLineLength;
}
Expand Down Expand Up @@ -115,8 +111,11 @@ private String trimLine(String valueStr) {
.substring(0, maxLogLineLength) + "...");
}

private static boolean hasAppenders(org.apache.log4j.Logger logger) {
return logger.getAllAppenders().hasMoreElements();
// TODO : hadoop-logging module to hide log4j implementation details, this method
// can directly call utility from hadoop-logging.
private static boolean hasAppenders(Logger logger) {
return org.apache.log4j.Logger.getLogger(logger.getName()).getAllAppenders()
.hasMoreElements();
}

/**
Expand All @@ -138,26 +137,4 @@ private static Set<String> getFilteredAttributes(MBeanInfo mBeanInfo) {
return attributeNames;
}

/**
* Make the metrics logger async and add all pre-existing appenders to the
* async appender.
*/
public static void makeMetricsLoggerAsync(String metricsLog) {
org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(metricsLog);
logger.setAdditivity(false); // Don't pollute actual logs with metrics dump

@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
// failsafe against trying to async it more than once
if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
AsyncAppender asyncAppender = new AsyncAppender();
// change logger to have an async appender containing all the
// previously configured appenders
for (Appender appender : appenders) {
logger.removeAppender(appender);
asyncAppender.addAppender(appender);
}
logger.addAppender(asyncAppender);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4058,8 +4058,6 @@ protected void startMetricsLogger() {
return;
}

MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG_NAME);

// Schedule the periodic logging.
metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,19 +338,16 @@
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Lists;
import org.apache.log4j.Logger;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.slf4j.LoggerFactory;

/**
* FSNamesystem is a container of both transient
* and persisted name-space state, and does all the book-keeping
Expand Down Expand Up @@ -384,8 +381,7 @@
public class FSNamesystem implements Namesystem, FSNamesystemMBean,
NameNodeMXBean, ReplicatedBlocksMBean, ECBlockGroupsMBean {

public static final org.slf4j.Logger LOG = LoggerFactory
.getLogger(FSNamesystem.class.getName());
public static final Logger LOG = LoggerFactory.getLogger(FSNamesystem.class);

// The following are private configurations
public static final String DFS_NAMENODE_SNAPSHOT_TRASHROOT_ENABLED =
Expand Down Expand Up @@ -488,7 +484,8 @@ private boolean isClientPortInfoAbsent(CallerContext ctx){
* perm=&lt;permissions (optional)&gt;
* </code>
*/
public static final Logger AUDIT_LOG = Logger.getLogger(FSNamesystem.class.getName() + ".audit");
public static final Logger AUDIT_LOG =
LoggerFactory.getLogger(FSNamesystem.class.getName() + ".audit");

private final int maxCorruptFileBlocksReturn;
private final boolean isPermissionEnabled;
Expand Down Expand Up @@ -858,11 +855,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
throws IOException {
provider = DFSUtil.createKeyProviderCryptoExtension(conf);
LOG.info("KeyProvider: " + provider);
if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
LOG.info("Enabling async auditlog");
enableAsyncAuditLog(conf);
}
checkForAsyncLogEnabledByOldConfigs(conf);
auditLogWithRemotePort =
conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_KEY,
DFS_NAMENODE_AUDIT_LOG_WITH_REMOTE_PORT_DEFAULT);
Expand Down Expand Up @@ -1076,6 +1069,14 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
}
}

@SuppressWarnings("deprecation")
private static void checkForAsyncLogEnabledByOldConfigs(Configuration conf) {
if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
LOG.warn("Use log4j properties to enable async log for audit logs. {} is deprecated",
DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY);
}
}

@VisibleForTesting
public List<AuditLogger> getAuditLoggers() {
return auditLoggers;
Expand Down Expand Up @@ -8856,30 +8857,6 @@ public void logAuditMessage(String message) {
}
}

private static void enableAsyncAuditLog(Configuration conf) {
Logger logger = AUDIT_LOG;
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
// failsafe against trying to async it more than once
if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
AsyncAppender asyncAppender = new AsyncAppender();
asyncAppender.setBlocking(conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_KEY,
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_BLOCKING_DEFAULT
));
asyncAppender.setBufferSize(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_BUFFER_SIZE_DEFAULT
));
// change logger to have an async appender containing all the
// previously configured appenders
for (Appender appender : appenders) {
logger.removeAppender(appender);
asyncAppender.addAppender(appender);
}
logger.addAppender(asyncAppender);
}
}
/**
* Return total number of Sync Operations on FSEditLog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -946,8 +946,6 @@ protected void startMetricsLogger(Configuration conf) {
return;
}

MetricsLoggerTask.makeMetricsLoggerAsync(METRICS_LOG_NAME);

// Schedule the periodic logging.
metricsLoggerTimer = new ScheduledThreadPoolExecutor(1);
metricsLoggerTimer.setExecuteExistingDelayedTasksAfterShutdownPolicy(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.util;

import java.io.IOException;

import org.apache.log4j.AsyncAppender;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender;
import org.apache.log4j.spi.LoggingEvent;

/**
* Until we migrate to log4j2, use this appender for namenode audit logger as well as
* datanode and namenode metric loggers with log4j1 properties.
* This appender will take parameters necessary to supply RollingFileAppender to AsyncAppender.
* While migrating to log4j2, we can directly specify appender ref to Async appender.
*/
public class AsyncRFAAppender extends AsyncAppender {
Copy link
Contributor Author

@virajjasani virajjasani Feb 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this patch, even though we have new custom appender, it's only temporary here.

This solves two purposes:

  1. When we migrate from log4j to log4j2 properties, we can directly use log4j2 Async appender and provide appender ref as other appenders (i.e. wrap RFA appender to Async appender). Hence, this custom appender no longer needs to be migrated to log4j2, rather we can remove it cleanly.
  2. Before migrating to log4j2, we still have a way to wrap RFA in Async appender without touching the code. We can directly refer to this custom appender in log4j.properties (as done with this patch)

It's temporary but necessary until going to log4j2, so that we don't loose functionality of wrapping RFA in Async appender.


/**
* The default maximum file size is 10MB.
*/
private String maxFileSize = String.valueOf(10*1024*1024);

/**
* There is one backup file by default.
*/
private int maxBackupIndex = 1;

/**
* The name of the log file.
*/
private String fileName = null;

private String conversionPattern = null;

/**
* Does appender block when buffer is full.
*/
private boolean blocking = true;

/**
* Buffer size.
*/
private int bufferSize = DEFAULT_BUFFER_SIZE;

private RollingFileAppender rollingFileAppender = null;

private final Object rollingFileAppenderLock = new Object();

@Override
public void append(LoggingEvent event) {
if (rollingFileAppender == null) {
synchronized (rollingFileAppenderLock) {
PatternLayout patternLayout;
if (conversionPattern != null) {
patternLayout = new PatternLayout(conversionPattern);
} else {
patternLayout = new PatternLayout();
}
try {
rollingFileAppender = new RollingFileAppender(patternLayout, fileName, true);
} catch (IOException e) {
throw new RuntimeException(e);
}
rollingFileAppender.setMaxBackupIndex(maxBackupIndex);
rollingFileAppender.setMaxFileSize(maxFileSize);
this.addAppender(rollingFileAppender);
super.setBlocking(blocking);
super.setBufferSize(bufferSize);
}
}
super.append(event);
}

public String getMaxFileSize() {
return maxFileSize;
}

public void setMaxFileSize(String maxFileSize) {
this.maxFileSize = maxFileSize;
}

public int getMaxBackupIndex() {
return maxBackupIndex;
}

public void setMaxBackupIndex(int maxBackupIndex) {
this.maxBackupIndex = maxBackupIndex;
}

public String getFileName() {
return fileName;
}

public void setFileName(String fileName) {
this.fileName = fileName;
}

public String getConversionPattern() {
return conversionPattern;
}

public void setConversionPattern(String conversionPattern) {
this.conversionPattern = conversionPattern;
}

public boolean isBlocking() {
return blocking;
}

public void setBlocking(boolean blocking) {
this.blocking = blocking;
}

public int getBufferSize() {
return bufferSize;
}

public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
}
Loading