Skip to content

Commit

Permalink
[#1201] improvement: only invoking LOG.debug when LOG.isDebugEnabled(…
Browse files Browse the repository at this point in the history
…) is true (#1217)

### What changes were proposed in this pull request?

Improve invoking LOG.debug when LOG.isDebugEnabled() is true .

### Why are the changes needed?

Fix: #1201

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
No
  • Loading branch information
misselvexu authored Oct 8, 2023
1 parent 81844b0 commit 05aba70
Show file tree
Hide file tree
Showing 25 changed files with 242 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ private String doRequest(URI uri, String authHeader, RequestBuilder requestBuild
}
HttpUriRequest httpRequest = requestBuilder.setUri(uri).build();

LOG.debug("Executing {} request: {}", httpRequest.getMethod(), uri);
if (LOG.isDebugEnabled()) {
LOG.debug("Executing {} request: {}", httpRequest.getMethod(), uri);
}

ResponseHandler<String> responseHandler =
resp -> {
Expand All @@ -112,7 +114,9 @@ private String doRequest(URI uri, String authHeader, RequestBuilder requestBuild
};

response = httpclient.execute(httpRequest, responseHandler);
LOG.debug("Response: {}", response);
if (LOG.isDebugEnabled()) {
LOG.debug("Response: {}", response);
}
} catch (ConnectException | ConnectTimeoutException | NoHttpResponseException e) {
throw new UniffleRestException("Api request failed for " + uri.toString(), e);
} catch (UniffleRestException rethrow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,15 @@ public int compare(Record<K> o1, Record<K> o2) {

private boolean compact(int lastIndex, int lastOffset, int dataLength) {
if (lastIndex != currentIndex) {
LOG.debug(
"compact lastIndex {}, currentIndex {}, lastOffset {} currentOffset {} dataLength {}",
lastIndex,
currentIndex,
lastOffset,
currentOffset,
dataLength);
if (LOG.isDebugEnabled()) {
LOG.debug(
"compact lastIndex {}, currentIndex {}, lastOffset {} currentOffset {} dataLength {}",
lastIndex,
currentIndex,
lastOffset,
currentOffset,
dataLength);
}
WrappedBuffer buffer = new WrappedBuffer(lastOffset + dataLength);
// copy data
int offset = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,9 @@ public void addRecord(int partitionId, K key, V value) throws IOException, Inter
if (length > maxMemSize) {
throw new RssException("record is too big");
}
LOG.debug("memoryUsedSize {} increase {}", memoryUsedSize, length);
if (LOG.isDebugEnabled()) {
LOG.debug("memoryUsedSize {} increase {}", memoryUsedSize, length);
}
memoryUsedSize.addAndGet(length);
if (buffer.getDataLength() > maxBufferSize) {
if (waitSendBuffers.remove(buffer)) {
Expand Down Expand Up @@ -255,7 +257,9 @@ public void run() {
} finally {
try {
memoryLock.lock();
LOG.debug("memoryUsedSize {} decrease {}", memoryUsedSize, size);
if (LOG.isDebugEnabled()) {
LOG.debug("memoryUsedSize {} decrease {}", memoryUsedSize, size);
}
memoryUsedSize.addAndGet(-size);
inSendListBytes.addAndGet(-size);
full.signalAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ public void acceptMapCompletionEvents() throws IOException {
maxEventsToFetch,
(org.apache.hadoop.mapred.TaskAttemptID) reduce);
events = update.getMapTaskCompletionEvents();
LOG.debug("Got " + events.length + " map completion events from " + fromEventIdx);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + events.length + " map completion events from " + fromEventIdx);
}

assert !update.shouldReset() : "Unexpected legacy state";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,30 +207,34 @@ public synchronized MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedS
throws IOException {
// we disable OnDisk MapOutput to avoid merging disk immediate data
if (usedMemory > memoryLimit) {
if (LOG.isDebugEnabled()) {
LOG.debug(
mapId
+ ": Stalling shuffle since usedMemory ("
+ usedMemory
+ ") is greater than memoryLimit ("
+ memoryLimit
+ ")."
+ " CommitMemory is ("
+ commitMemory
+ ")");
}
return null;
}

// Allow the in-memory shuffle to progress
if (LOG.isDebugEnabled()) {
LOG.debug(
mapId
+ ": Stalling shuffle since usedMemory ("
+ ": Proceeding with shuffle since usedMemory ("
+ usedMemory
+ ") is greater than memoryLimit ("
+ ") is lesser than memoryLimit ("
+ memoryLimit
+ ")."
+ " CommitMemory is ("
+ "CommitMemory is ("
+ commitMemory
+ ")");
return null;
}

// Allow the in-memory shuffle to progress
LOG.debug(
mapId
+ ": Proceeding with shuffle since usedMemory ("
+ usedMemory
+ ") is lesser than memoryLimit ("
+ memoryLimit
+ ")."
+ "CommitMemory is ("
+ commitMemory
+ ")");
usedMemory += requestedSize;
// use this rss merger as the callback
return new InMemoryMapOutput<K, V>(jobConf, mapId, this, (int) requestedSize, codec, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,18 @@ private List<ShuffleBlockInfo> insertIntoBuffer(
sentBlocks.add(createShuffleBlock(partitionId, wb));
copyTime += wb.getCopyTime();
buffers.remove(partitionId);
LOG.debug(
"Single buffer is full for shuffleId["
+ shuffleId
+ "] partition["
+ partitionId
+ "] with memoryUsed["
+ wb.getMemoryUsed()
+ "], dataLength["
+ wb.getDataLength()
+ "]");
if (LOG.isDebugEnabled()) {
LOG.debug(
"Single buffer is full for shuffleId["
+ shuffleId
+ "] partition["
+ partitionId
+ "] with memoryUsed["
+ wb.getMemoryUsed()
+ "], dataLength["
+ wb.getDataLength()
+ "]");
}
}
} else {
// The true of hasRequested means the former partitioned buffer has been flushed, that is
Expand Down Expand Up @@ -401,12 +403,14 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
shuffleBlockInfosPerEvent.add(sbi);
// split shuffle data according to the size
if (totalSize > sendSizeLimit) {
LOG.debug(
"Build event with "
+ shuffleBlockInfosPerEvent.size()
+ " blocks and "
+ totalSize
+ " bytes");
if (LOG.isDebugEnabled()) {
LOG.debug(
"Build event with "
+ shuffleBlockInfosPerEvent.size()
+ " blocks and "
+ totalSize
+ " bytes");
}
// Use final temporary variables for closures
final long memoryUsedTemp = memoryUsed;
final List<ShuffleBlockInfo> shuffleBlocksTemp = shuffleBlockInfosPerEvent;
Expand All @@ -424,12 +428,14 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
}
}
if (!shuffleBlockInfosPerEvent.isEmpty()) {
LOG.debug(
"Build event with "
+ shuffleBlockInfosPerEvent.size()
+ " blocks and "
+ totalSize
+ " bytes");
if (LOG.isDebugEnabled()) {
LOG.debug(
"Build event with "
+ shuffleBlockInfosPerEvent.size()
+ " blocks and "
+ totalSize
+ " bytes");
}
// Use final temporary variables for closures
final long memoryUsedTemp = memoryUsed;
final List<ShuffleBlockInfo> shuffleBlocksTemp = shuffleBlockInfosPerEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ public static TezClassLoader getInstance() {
}

public static void setupTezClassLoader() {
LOG.debug(
"Setting up TezClassLoader: thread: {}, current thread classloader: {} system classloader: {}",
Thread.currentThread().getId(),
Thread.currentThread().getContextClassLoader(),
ClassLoader.getSystemClassLoader());
if (LOG.isDebugEnabled()) {
LOG.debug(
"Setting up TezClassLoader: thread: {}, current thread classloader: {} system classloader: {}",
Thread.currentThread().getId(),
Thread.currentThread().getContextClassLoader(),
ClassLoader.getSystemClassLoader());
}
Thread.currentThread().setContextClassLoader(INSTANCE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ public static void initAndStartRSSClient(final RssDAGAppMaster appMaster, Config
() -> {
try {
appMaster.getShuffleWriteClient().sendAppHeartbeat(strAppAttemptId, heartbeatTimeout);
LOG.debug("Finish send heartbeat to coordinator and servers");
if (LOG.isDebugEnabled()) {
LOG.debug("Finish send heartbeat to coordinator and servers");
}
} catch (Exception e) {
LOG.warn("Fail to send heartbeat to coordinator and servers", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,9 @@ public void run() {
} finally {
try {
memoryLock.lock();
LOG.debug("memoryUsedSize {} decrease {}", memoryUsedSize, size);
if (LOG.isDebugEnabled()) {
LOG.debug("memoryUsedSize {} decrease {}", memoryUsedSize, size);
}
memoryUsedSize.addAndGet(-size);
inSendListBytes.addAndGet(-size);
full.signalAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,9 @@ private boolean sendShuffleDataAsync(
if (defectiveServers != null) {
defectiveServers.remove(ssi);
}
LOG.debug("{} successfully.", logMsg);
if (LOG.isDebugEnabled()) {
LOG.debug("{} successfully.", logMsg);
}
} else {
if (defectiveServers != null) {
defectiveServers.add(ssi);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public static FileSystem getFilesystem(String user, Path path, Configuration con
}

if (fileSystem instanceof LocalFileSystem) {
LOGGER.debug("{} is local file system", path);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} is local file system", path);
}
return ((LocalFileSystem) fileSystem).getRawFileSystem();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ public void initChannel(SocketChannel ch) {
TransportClient client = clientRef.get();
assert client != null : "Channel future completed successfully with null client";

logger.debug("Connection to {} successful", address);
if (logger.isDebugEnabled()) {
logger.debug("Connection to {} successful", address);
}

return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ public void exceptionCaught(Throwable cause) {

@Override
public void channelActive() {
logger.debug("channelActive: {}", reverseClient.getSocketAddress());
if (logger.isDebugEnabled()) {
logger.debug("channelActive: {}", reverseClient.getSocketAddress());
}
}

@Override
public void channelInactive() {
logger.debug("channelInactive: {}", reverseClient.getSocketAddress());
if (logger.isDebugEnabled()) {
logger.debug("channelInactive: {}", reverseClient.getSocketAddress());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ public void heartbeat(
.setRetMsg("")
.setStatus(StatusCode.SUCCESS)
.build();
LOG.debug("Got heartbeat from {}", serverNode);
if (LOG.isDebugEnabled()) {
LOG.debug("Got heartbeat from {}", serverNode);
}
responseObserver.onNext(response);
responseObserver.onCompleted();
}
Expand Down Expand Up @@ -214,7 +216,9 @@ public void appHeartbeat(
AppHeartBeatRequest request, StreamObserver<AppHeartBeatResponse> responseObserver) {
String appId = request.getAppId();
coordinatorServer.getApplicationManager().refreshAppId(appId);
LOG.debug("Got heartbeat from application: {}", appId);
if (LOG.isDebugEnabled()) {
LOG.debug("Got heartbeat from application: {}", appId);
}
AppHeartBeatResponse response =
AppHeartBeatResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();

Expand All @@ -235,7 +239,9 @@ public void registerApplicationInfo(
String appId = request.getAppId();
String user = request.getUser();
coordinatorServer.getApplicationManager().registerApplicationInfo(appId, user);
LOG.debug("Got a registered application info: {}", appId);
if (LOG.isDebugEnabled()) {
LOG.debug("Got a registered application info: {}", appId);
}
ApplicationInfoResponse response =
ApplicationInfoResponse.newBuilder().setRetMsg("").setStatus(StatusCode.SUCCESS).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public AccessCandidatesChecker(AccessManager accessManager) throws Exception {
LOG.error(msg);
throw new RssException(msg);
}
LOG.debug("Load candidates: {}", String.join(";", candidates.get()));
if (LOG.isDebugEnabled()) {
LOG.debug("Load candidates: {}", String.join(";", candidates.get()));
}

int updateIntervalS =
conf.getInteger(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC);
Expand All @@ -96,7 +98,9 @@ public AccessCheckResult check(AccessInfo accessInfo) {
String accessId = accessInfo.getAccessId().trim();
if (!candidates.get().contains(accessId)) {
String msg = String.format("Denied by AccessCandidatesChecker, accessInfo[%s].", accessInfo);
LOG.debug("Candidates is {}, {}", candidates.get(), msg);
if (LOG.isDebugEnabled()) {
LOG.debug("Candidates is {}, {}", candidates.get(), msg);
}
CoordinatorMetrics.counterTotalCandidatesDeniedRequest.inc();
return new AccessCheckResult(false, msg);
}
Expand All @@ -119,7 +123,9 @@ private void updateAccessCandidates() {
if (lastCandidatesUpdateMS.get() != lastModifiedMS) {
updateAccessCandidatesInternal();
lastCandidatesUpdateMS.set(lastModifiedMS);
LOG.debug("Load candidates: {}", String.join(";", candidates.get()));
if (LOG.isDebugEnabled()) {
LOG.debug("Load candidates: {}", String.join(";", candidates.get()));
}
}
} else {
LOG.warn("Candidates file not found.");
Expand Down
Loading

0 comments on commit 05aba70

Please sign in to comment.