2424import java .util .Stack ;
2525
2626import com .google .common .base .Preconditions ;
27+ import org .apache .hadoop .ipc .CallerContext ;
2728import org .slf4j .Logger ;
2829import org .slf4j .LoggerFactory ;
2930import org .apache .hadoop .fs .FSExceptionMessages ;
@@ -84,18 +85,57 @@ private String toAccessControlString(INodeAttributes inodeAttrib,
8485 private final Collection <String > groups ;
8586 private final boolean isSuper ;
8687 private final INodeAttributeProvider attributeProvider ;
88+ private final boolean authorizeWithContext ;
89+
90+ private static ThreadLocal <String > operationType = new ThreadLocal <>();
8791
8892
8993 protected FSPermissionChecker (String fsOwner , String supergroup ,
9094 UserGroupInformation callerUgi ,
9195 INodeAttributeProvider attributeProvider ) {
96+ boolean useNewAuthorizationWithContextAPI ;
9297 this .fsOwner = fsOwner ;
9398 this .supergroup = supergroup ;
9499 this .callerUgi = callerUgi ;
95100 this .groups = callerUgi .getGroups ();
96101 user = callerUgi .getShortUserName ();
97102 isSuper = user .equals (fsOwner ) || groups .contains (supergroup );
98103 this .attributeProvider = attributeProvider ;
104+
105+ // If the AccessControlEnforcer supports context enrichment, call
106+ // the new API. Otherwise choose the old API.
107+ Class [] cArg = new Class [1 ];
108+ cArg [0 ] = INodeAttributeProvider .AuthorizationContext .class ;
109+
110+ AccessControlEnforcer ace ;
111+ if (attributeProvider == null ) {
112+ // If attribute provider is null, use FSPermissionChecker default
113+ // implementation to authorize, which supports authorization with context.
114+ useNewAuthorizationWithContextAPI = true ;
115+ LOG .info ("Default authorization provider supports the new authorization" +
116+ " provider API" );
117+ } else {
118+ ace = attributeProvider .getExternalAccessControlEnforcer (this );
119+ // if the runtime external authorization provider doesn't support
120+ // checkPermissionWithContext(), fall back to the old API
121+ // checkPermission().
122+ try {
123+ Class <?> clazz = ace .getClass ();
124+ clazz .getDeclaredMethod ("checkPermissionWithContext" , cArg );
125+ useNewAuthorizationWithContextAPI = true ;
126+ LOG .info ("Use the new authorization provider API" );
127+ } catch (NoSuchMethodException e ) {
128+ useNewAuthorizationWithContextAPI = false ;
129+ LOG .info ("Fallback to the old authorization provider API because " +
130+ "the expected method is not found." );
131+ }
132+ }
133+
134+ authorizeWithContext = useNewAuthorizationWithContextAPI ;
135+ }
136+
137+ public static void setOperationType (String opType ) {
138+ operationType .set (opType );
99139 }
100140
101141 public boolean isMemberOfGroup (String group ) {
@@ -190,9 +230,35 @@ void checkPermission(INodesInPath inodesInPath, boolean doCheckOwner,
190230 int ancestorIndex = inodes .length - 2 ;
191231
192232 AccessControlEnforcer enforcer = getAccessControlEnforcer ();
193- enforcer .checkPermission (fsOwner , supergroup , callerUgi , inodeAttrs , inodes ,
194- components , snapshotId , path , ancestorIndex , doCheckOwner ,
195- ancestorAccess , parentAccess , access , subAccess , ignoreEmptyDir );
233+
234+ String opType = operationType .get ();
235+ if (this .authorizeWithContext && opType != null ) {
236+ INodeAttributeProvider .AuthorizationContext .Builder builder =
237+ new INodeAttributeProvider .AuthorizationContext .Builder ();
238+ builder .fsOwner (fsOwner ).
239+ supergroup (supergroup ).
240+ callerUgi (callerUgi ).
241+ inodeAttrs (inodeAttrs ).
242+ inodes (inodes ).
243+ pathByNameArr (components ).
244+ snapshotId (snapshotId ).
245+ path (path ).
246+ ancestorIndex (ancestorIndex ).
247+ doCheckOwner (doCheckOwner ).
248+ ancestorAccess (ancestorAccess ).
249+ parentAccess (parentAccess ).
250+ access (access ).
251+ subAccess (subAccess ).
252+ ignoreEmptyDir (ignoreEmptyDir ).
253+ operationName (opType ).
254+ callerContext (CallerContext .getCurrent ());
255+ enforcer .checkPermissionWithContext (builder .build ());
256+ } else {
257+ enforcer .checkPermission (fsOwner , supergroup , callerUgi , inodeAttrs ,
258+ inodes , components , snapshotId , path , ancestorIndex , doCheckOwner ,
259+ ancestorAccess , parentAccess , access , subAccess , ignoreEmptyDir );
260+ }
261+
196262 }
197263
198264 /**
@@ -212,17 +278,45 @@ void checkPermission(INode inode, int snapshotId, FsAction access)
212278 try {
213279 INodeAttributes [] iNodeAttr = {nodeAttributes };
214280 AccessControlEnforcer enforcer = getAccessControlEnforcer ();
215- enforcer .checkPermission (
216- fsOwner , supergroup , callerUgi ,
217- iNodeAttr , // single inode attr in the array
218- new INode []{inode }, // single inode in the array
219- pathComponents , snapshotId ,
220- null , -1 , // this will skip checkTraverse() because
221- // not checking ancestor here
222- false , null , null ,
223- access , // the target access to be checked against the inode
224- null , // passing null sub access avoids checking children
225- false );
281+ String opType = operationType .get ();
282+ if (this .authorizeWithContext && opType != null ) {
283+ INodeAttributeProvider .AuthorizationContext .Builder builder =
284+ new INodeAttributeProvider .AuthorizationContext .Builder ();
285+ builder .fsOwner (fsOwner )
286+ .supergroup (supergroup )
287+ .callerUgi (callerUgi )
288+ .inodeAttrs (iNodeAttr ) // single inode attr in the array
289+ .inodes (new INode [] { inode }) // single inode attr in the array
290+ .pathByNameArr (pathComponents )
291+ .snapshotId (snapshotId )
292+ .path (null )
293+ .ancestorIndex (-1 ) // this will skip checkTraverse()
294+ // because not checking ancestor here
295+ .doCheckOwner (false )
296+ .ancestorAccess (null )
297+ .parentAccess (null )
298+ .access (access ) // the target access to be checked against
299+ // the inode
300+ .subAccess (null ) // passing null sub access avoids checking
301+ // children
302+ .ignoreEmptyDir (false )
303+ .operationName (opType )
304+ .callerContext (CallerContext .getCurrent ());
305+
306+ enforcer .checkPermissionWithContext (builder .build ());
307+ } else {
308+ enforcer .checkPermission (
309+ fsOwner , supergroup , callerUgi ,
310+ iNodeAttr , // single inode attr in the array
311+ new INode []{inode }, // single inode in the array
312+ pathComponents , snapshotId ,
313+ null , -1 , // this will skip checkTraverse() because
314+ // not checking ancestor here
315+ false , null , null ,
316+ access , // the target access to be checked against the inode
317+ null , // passing null sub access avoids checking children
318+ false );
319+ }
226320 } catch (AccessControlException ace ) {
227321 throw new AccessControlException (
228322 toAccessControlString (nodeAttributes , inode .getFullPathName (),
@@ -273,6 +367,22 @@ public void checkPermission(String fsOwner, String supergroup,
273367 }
274368 }
275369
370+ @ Override
371+ public void checkPermissionWithContext (
372+ INodeAttributeProvider .AuthorizationContext authzContext )
373+ throws AccessControlException {
374+ // The default authorization provider does not use the additional context
375+ // parameters including operationName and callerContext.
376+ this .checkPermission (authzContext .getFsOwner (),
377+ authzContext .getSupergroup (), authzContext .getCallerUgi (),
378+ authzContext .getInodeAttrs (), authzContext .getInodes (),
379+ authzContext .getPathByNameArr (), authzContext .getSnapshotId (),
380+ authzContext .getPath (), authzContext .getAncestorIndex (),
381+ authzContext .isDoCheckOwner (), authzContext .getAncestorAccess (),
382+ authzContext .getParentAccess (), authzContext .getAccess (),
383+ authzContext .getSubAccess (), authzContext .isIgnoreEmptyDir ());
384+ }
385+
276386 private INodeAttributes getINodeAttrs (byte [][] pathByNameArr , int pathIdx ,
277387 INode inode , int snapshotId ) {
278388 INodeAttributes inodeAttrs = inode .getSnapshotINode (snapshotId );
0 commit comments