Skip to content

Commit

Permalink
YARN-11578. Cache fs supports chmod in LogAggregationFileController. (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
tomicooler authored Oct 20, 2023
1 parent 885af9d commit 702e5e7
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Public;
Expand Down Expand Up @@ -113,6 +115,35 @@ public abstract class LogAggregationFileController {

protected boolean fsSupportsChmod = true;

private static class FsLogPathKey {
private Class<? extends FileSystem> fsType;
private Path logPath;

FsLogPathKey(Class<? extends FileSystem> fsType, Path logPath) {
this.fsType = fsType;
this.logPath = logPath;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FsLogPathKey that = (FsLogPathKey) o;
return Objects.equals(fsType, that.fsType) && Objects.equals(logPath, that.logPath);
}

@Override
public int hashCode() {
return Objects.hash(fsType, logPath);
}
}
private static final ConcurrentHashMap<FsLogPathKey, Boolean> FS_CHMOD_CACHE
= new ConcurrentHashMap<>();

public LogAggregationFileController() {}

/**
Expand Down Expand Up @@ -370,26 +401,34 @@ public void verifyAndCreateRemoteLogDir() {
+ remoteRootLogDir + "]", e);
}
} else {
//Check if FS has capability to set/modify permissions
Path permissionCheckFile = new Path(qualified, String.format("%s.permission_check",
RandomStringUtils.randomAlphanumeric(8)));
final FsLogPathKey key = new FsLogPathKey(remoteFS.getClass(), qualified);
FileSystem finalRemoteFS = remoteFS;
fsSupportsChmod = FS_CHMOD_CACHE.computeIfAbsent(key,
k -> checkFsSupportsChmod(finalRemoteFS, remoteRootLogDir, qualified));
}
}

private boolean checkFsSupportsChmod(FileSystem remoteFS, Path logDir, Path qualified) {
//Check if FS has capability to set/modify permissions
Path permissionCheckFile = new Path(qualified, String.format("%s.permission_check",
RandomStringUtils.randomAlphanumeric(8)));
try {
remoteFS.createNewFile(permissionCheckFile);
remoteFS.setPermission(permissionCheckFile, new FsPermission(TLDIR_PERMISSIONS));
return true;
} catch (UnsupportedOperationException use) {
LOG.info("Unable to set permissions for configured filesystem since"
+ " it does not support this {}", remoteFS.getScheme());
} catch (IOException e) {
LOG.warn("Failed to check if FileSystem supports permissions on "
+ "remoteLogDir [{}]", logDir, e);
} finally {
try {
remoteFS.createNewFile(permissionCheckFile);
remoteFS.setPermission(permissionCheckFile, new FsPermission(TLDIR_PERMISSIONS));
} catch (UnsupportedOperationException use) {
LOG.info("Unable to set permissions for configured filesystem since"
+ " it does not support this", remoteFS.getScheme());
fsSupportsChmod = false;
} catch (IOException e) {
LOG.warn("Failed to check if FileSystem suppports permissions on "
+ "remoteLogDir [" + remoteRootLogDir + "]", e);
} finally {
try {
remoteFS.delete(permissionCheckFile, false);
} catch (IOException ignored) {
}
remoteFS.delete(permissionCheckFile, false);
} catch (IOException ignored) {
}
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,29 @@

package org.apache.hadoop.yarn.logaggregation.filecontroller;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.net.URI;

import static org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController.TLDIR_PERMISSIONS;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
Expand All @@ -46,30 +53,73 @@ public class TestLogAggregationFileController {

@Test
public void testRemoteDirCreationWithCustomUser() throws Exception {
LogAggregationFileController controller = mock(
LogAggregationFileController.class, Mockito.CALLS_REAL_METHODS);
FileSystem fs = mock(FileSystem.class);
doReturn(new URI("")).when(fs).getUri();
doReturn(new FileStatus(128, false, 0, 64, System.currentTimeMillis(),
System.currentTimeMillis(), new FsPermission(TLDIR_PERMISSIONS),
"not_yarn_user", "yarn_group", new Path("/tmp/logs"))).when(fs)
.getFileStatus(any(Path.class));
setupCustomUserMocks(controller, fs, "/tmp/logs");

Configuration conf = new Configuration();
controller.initialize(new Configuration(), "TFile");
controller.fsSupportsChmod = false;

controller.verifyAndCreateRemoteLogDir();
assertPermissionFileWasUsedOneTime(fs);
Assert.assertTrue(controller.fsSupportsChmod);

doThrow(new UnsupportedOperationException()).when(fs).setPermission(any(), any());
controller.verifyAndCreateRemoteLogDir();
assertPermissionFileWasUsedOneTime(fs); // still once -> cached
Assert.assertTrue(controller.fsSupportsChmod);

controller.fsSupportsChmod = false;
controller.verifyAndCreateRemoteLogDir();
assertPermissionFileWasUsedOneTime(fs); // still once -> cached
Assert.assertTrue(controller.fsSupportsChmod);
}

@Test
public void testRemoteDirCreationWithCustomUserFsChmodNotSupported() throws Exception {
LogAggregationFileController controller = mock(
LogAggregationFileController.class, Mockito.CALLS_REAL_METHODS);
FileSystem fs = mock(FileSystem.class);
setupCustomUserMocks(controller, fs, "/tmp/logs2");
doThrow(new UnsupportedOperationException()).when(fs).setPermission(any(), any());

Configuration conf = new Configuration();
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "/tmp/logs2");
controller.initialize(conf, "TFile");
controller.verifyAndCreateRemoteLogDir();
assertPermissionFileWasUsedOneTime(fs);
Assert.assertFalse(controller.fsSupportsChmod);

controller.verifyAndCreateRemoteLogDir();
assertPermissionFileWasUsedOneTime(fs); // still once -> cached
Assert.assertFalse(controller.fsSupportsChmod);

controller.fsSupportsChmod = true;
controller.verifyAndCreateRemoteLogDir();
assertPermissionFileWasUsedOneTime(fs); // still once -> cached
Assert.assertFalse(controller.fsSupportsChmod);
}

private static void setupCustomUserMocks(LogAggregationFileController controller,
FileSystem fs, String path)
throws URISyntaxException, IOException {
doReturn(new URI("")).when(fs).getUri();
doReturn(new FileStatus(128, false, 0, 64, System.currentTimeMillis(),
System.currentTimeMillis(), new FsPermission(TLDIR_PERMISSIONS),
"not_yarn_user", "yarn_group", new Path(path))).when(fs)
.getFileStatus(any(Path.class));
doReturn(fs).when(controller).getFileSystem(any(Configuration.class));
doNothing().when(controller).initInternal(any(Configuration.class));

UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
"yarn_user", new String[]{"yarn_group", "other_group"});
UserGroupInformation.setLoginUser(ugi);
}

doNothing().when(controller).initInternal(any(Configuration.class));
controller.initialize(conf, "TFile");
controller.verifyAndCreateRemoteLogDir();

verify(fs).createNewFile(any());
verify(fs).setPermission(any(), eq(new FsPermission(TLDIR_PERMISSIONS)));
verify(fs).delete(any(), eq(false));
Assert.assertTrue(controller.fsSupportsChmod);
private static void assertPermissionFileWasUsedOneTime(FileSystem fs) throws IOException {
verify(fs, times(1)).createNewFile(any());
verify(fs, times(1)).setPermission(any(), eq(new FsPermission(TLDIR_PERMISSIONS)));
verify(fs, times(1)).delete(any(), eq(false));
}
}

0 comments on commit 702e5e7

Please sign in to comment.