2525
2626import com .google .common .base .Preconditions ;
2727import org .apache .hadoop .security .AuthConfigureHolder ;
28+ import org .apache .hadoop .ipc .CallerContext ;
2829import org .slf4j .Logger ;
2930import org .slf4j .LoggerFactory ;
3031import org .apache .hadoop .fs .FSExceptionMessages ;
@@ -85,18 +86,57 @@ private String toAccessControlString(INodeAttributes inodeAttrib,
8586 private final Collection <String > groups ;
8687 private final boolean isSuper ;
8788 private final INodeAttributeProvider attributeProvider ;
89+ private final boolean authorizeWithContext ;
90+
91+ private static ThreadLocal <String > operationType = new ThreadLocal <>();
8892
8993
9094 protected FSPermissionChecker (String fsOwner , String supergroup ,
9195 UserGroupInformation callerUgi ,
9296 INodeAttributeProvider attributeProvider ) {
97+ boolean useNewAuthorizationWithContextAPI ;
9398 this .fsOwner = fsOwner ;
9499 this .supergroup = supergroup ;
95100 this .callerUgi = callerUgi ;
96101 this .groups = callerUgi .getGroups ();
97102 user = callerUgi .getShortUserName ();
98103 isSuper = isSameUser (user , fsOwner ) || groups .contains (supergroup );
99104 this .attributeProvider = attributeProvider ;
105+
106+ // If the AccessControlEnforcer supports context enrichment, call
107+ // the new API. Otherwise choose the old API.
108+ Class [] cArg = new Class [1 ];
109+ cArg [0 ] = INodeAttributeProvider .AuthorizationContext .class ;
110+
111+ AccessControlEnforcer ace ;
112+ if (attributeProvider == null ) {
113+ // If attribute provider is null, use FSPermissionChecker default
114+ // implementation to authorize, which supports authorization with context.
115+ useNewAuthorizationWithContextAPI = true ;
116+ LOG .info ("Default authorization provider supports the new authorization" +
117+ " provider API" );
118+ } else {
119+ ace = attributeProvider .getExternalAccessControlEnforcer (this );
120+ // if the runtime external authorization provider doesn't support
121+ // checkPermissionWithContext(), fall back to the old API
122+ // checkPermission().
123+ try {
124+ Class <?> clazz = ace .getClass ();
125+ clazz .getDeclaredMethod ("checkPermissionWithContext" , cArg );
126+ useNewAuthorizationWithContextAPI = true ;
127+ LOG .info ("Use the new authorization provider API" );
128+ } catch (NoSuchMethodException e ) {
129+ useNewAuthorizationWithContextAPI = false ;
130+ LOG .info ("Fallback to the old authorization provider API because " +
131+ "the expected method is not found." );
132+ }
133+ }
134+
135+ authorizeWithContext = useNewAuthorizationWithContextAPI ;
136+ }
137+
138+ public static void setOperationType (String opType ) {
139+ operationType .set (opType );
100140 }
101141
102142 public boolean isMemberOfGroup (String group ) {
@@ -191,9 +231,35 @@ void checkPermission(INodesInPath inodesInPath, boolean doCheckOwner,
191231 int ancestorIndex = inodes .length - 2 ;
192232
193233 AccessControlEnforcer enforcer = getAccessControlEnforcer ();
194- enforcer .checkPermission (fsOwner , supergroup , callerUgi , inodeAttrs , inodes ,
195- components , snapshotId , path , ancestorIndex , doCheckOwner ,
196- ancestorAccess , parentAccess , access , subAccess , ignoreEmptyDir );
234+
235+ String opType = operationType .get ();
236+ if (this .authorizeWithContext && opType != null ) {
237+ INodeAttributeProvider .AuthorizationContext .Builder builder =
238+ new INodeAttributeProvider .AuthorizationContext .Builder ();
239+ builder .fsOwner (fsOwner ).
240+ supergroup (supergroup ).
241+ callerUgi (callerUgi ).
242+ inodeAttrs (inodeAttrs ).
243+ inodes (inodes ).
244+ pathByNameArr (components ).
245+ snapshotId (snapshotId ).
246+ path (path ).
247+ ancestorIndex (ancestorIndex ).
248+ doCheckOwner (doCheckOwner ).
249+ ancestorAccess (ancestorAccess ).
250+ parentAccess (parentAccess ).
251+ access (access ).
252+ subAccess (subAccess ).
253+ ignoreEmptyDir (ignoreEmptyDir ).
254+ operationName (opType ).
255+ callerContext (CallerContext .getCurrent ());
256+ enforcer .checkPermissionWithContext (builder .build ());
257+ } else {
258+ enforcer .checkPermission (fsOwner , supergroup , callerUgi , inodeAttrs ,
259+ inodes , components , snapshotId , path , ancestorIndex , doCheckOwner ,
260+ ancestorAccess , parentAccess , access , subAccess , ignoreEmptyDir );
261+ }
262+
197263 }
198264
199265 /**
@@ -213,17 +279,45 @@ void checkPermission(INode inode, int snapshotId, FsAction access)
213279 try {
214280 INodeAttributes [] iNodeAttr = {nodeAttributes };
215281 AccessControlEnforcer enforcer = getAccessControlEnforcer ();
216- enforcer .checkPermission (
217- fsOwner , supergroup , callerUgi ,
218- iNodeAttr , // single inode attr in the array
219- new INode []{inode }, // single inode in the array
220- pathComponents , snapshotId ,
221- null , -1 , // this will skip checkTraverse() because
222- // not checking ancestor here
223- false , null , null ,
224- access , // the target access to be checked against the inode
225- null , // passing null sub access avoids checking children
226- false );
282+ String opType = operationType .get ();
283+ if (this .authorizeWithContext && opType != null ) {
284+ INodeAttributeProvider .AuthorizationContext .Builder builder =
285+ new INodeAttributeProvider .AuthorizationContext .Builder ();
286+ builder .fsOwner (fsOwner )
287+ .supergroup (supergroup )
288+ .callerUgi (callerUgi )
289+ .inodeAttrs (iNodeAttr ) // single inode attr in the array
290+ .inodes (new INode [] { inode }) // single inode attr in the array
291+ .pathByNameArr (pathComponents )
292+ .snapshotId (snapshotId )
293+ .path (null )
294+ .ancestorIndex (-1 ) // this will skip checkTraverse()
295+ // because not checking ancestor here
296+ .doCheckOwner (false )
297+ .ancestorAccess (null )
298+ .parentAccess (null )
299+ .access (access ) // the target access to be checked against
300+ // the inode
301+ .subAccess (null ) // passing null sub access avoids checking
302+ // children
303+ .ignoreEmptyDir (false )
304+ .operationName (opType )
305+ .callerContext (CallerContext .getCurrent ());
306+
307+ enforcer .checkPermissionWithContext (builder .build ());
308+ } else {
309+ enforcer .checkPermission (
310+ fsOwner , supergroup , callerUgi ,
311+ iNodeAttr , // single inode attr in the array
312+ new INode []{inode }, // single inode in the array
313+ pathComponents , snapshotId ,
314+ null , -1 , // this will skip checkTraverse() because
315+ // not checking ancestor here
316+ false , null , null ,
317+ access , // the target access to be checked against the inode
318+ null , // passing null sub access avoids checking children
319+ false );
320+ }
227321 } catch (AccessControlException ace ) {
228322 throw new AccessControlException (
229323 toAccessControlString (nodeAttributes , inode .getFullPathName (),
@@ -274,6 +368,22 @@ public void checkPermission(String fsOwner, String supergroup,
274368 }
275369 }
276370
371+ @ Override
372+ public void checkPermissionWithContext (
373+ INodeAttributeProvider .AuthorizationContext authzContext )
374+ throws AccessControlException {
375+ // The default authorization provider does not use the additional context
376+ // parameters including operationName and callerContext.
377+ this .checkPermission (authzContext .getFsOwner (),
378+ authzContext .getSupergroup (), authzContext .getCallerUgi (),
379+ authzContext .getInodeAttrs (), authzContext .getInodes (),
380+ authzContext .getPathByNameArr (), authzContext .getSnapshotId (),
381+ authzContext .getPath (), authzContext .getAncestorIndex (),
382+ authzContext .isDoCheckOwner (), authzContext .getAncestorAccess (),
383+ authzContext .getParentAccess (), authzContext .getAccess (),
384+ authzContext .getSubAccess (), authzContext .isIgnoreEmptyDir ());
385+ }
386+
277387 private INodeAttributes getINodeAttrs (byte [][] pathByNameArr , int pathIdx ,
278388 INode inode , int snapshotId ) {
279389 INodeAttributes inodeAttrs = inode .getSnapshotINode (snapshotId );
0 commit comments