Skip to content

Commit

Permalink
NIFI-13722: Kerberos ticket renewal issue due static thread pool in I…
Browse files Browse the repository at this point in the history
…ceberg library

This closes #9258.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
  • Loading branch information
mark-bathori authored and turcsanyip committed Sep 23, 2024
1 parent b12d26b commit 07611de
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ClassloaderIsolationKeyProvider;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
Expand All @@ -34,12 +33,15 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.services.iceberg.IcebergCatalogService;
import org.ietf.jgss.GSSException;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
import static org.apache.nifi.processors.iceberg.IcebergUtils.findCause;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;

/**
Expand Down Expand Up @@ -68,35 +70,34 @@ public abstract class AbstractIcebergProcessor extends AbstractProcessor impleme
.description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.")
.build();

protected final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
protected static final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
protected final AtomicReference<UserGroupInformation> ugiReference = new AtomicReference<>();

@OnScheduled
public void onScheduled(final ProcessContext context) {
initKerberosCredentials(context);
}

protected void initKerberosCredentials(ProcessContext context) {
protected synchronized void initKerberosCredentials(ProcessContext context) {
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final IcebergCatalogService catalogService = context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);

if (kerberosUserService != null) {
final KerberosUser kerberosUser = kerberosUserService.createKerberosUser();
kerberosUserReference.set(kerberosUser);
KerberosUser kerberosUser;
if (kerberosUserReference.get() == null) {
kerberosUser = kerberosUserService.createKerberosUser();
} else {
kerberosUser = kerberosUserReference.get();
}
try {
ugiReference.set(getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()), kerberosUser));
} catch (IOException e) {
throw new ProcessException("Kerberos authentication failed", e);
}
kerberosUserReference.set(kerberosUser);
}
}

@OnStopped
public void onStopped() {
kerberosUserReference.set(null);
ugiReference.set(null);
}

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
Expand All @@ -115,18 +116,26 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
});

} catch (Exception e) {
getLogger().error("Privileged action failed with kerberos user {}", kerberosUser, e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
if (!handleAuthErrors(e, session, context)) {
getLogger().error("Privileged action failed with kerberos user " + kerberosUser, e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
}
}
}

@Override
public String getClassloaderIsolationKey(PropertyContext context) {
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosUserService != null) {
return kerberosUserService.getIdentifier();
try {
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
if (kerberosUserService != null) {
final KerberosUser kerberosUser = kerberosUserService.createKerberosUser();
return kerberosUser.getPrincipal();
}
} catch (IllegalStateException e) {
// the Kerberos controller service is disabled, therefore this part of the isolation key cannot be determined yet
}

return null;
}

Expand All @@ -135,5 +144,17 @@ private UserGroupInformation getUgi() {
return ugiReference.get();
}

protected boolean handleAuthErrors(Throwable t, ProcessSession session, ProcessContext context) {
final Optional<GSSException> causeOptional = findCause(t, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().info("No valid Kerberos credential found, retrying login", causeOptional.get());
kerberosUserReference.get().checkTGTAndRelogin();
session.rollback();
context.yield();
return true;
}
return false;
}

protected abstract void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile flowFile) throws ProcessException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.services.iceberg.IcebergCatalogService;
import org.ietf.jgss.GSSException;

import java.io.InputStream;
import java.util.ArrayList;
Expand All @@ -62,13 +61,11 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.nifi.processors.iceberg.IcebergUtils.findCause;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
import static org.apache.nifi.processors.iceberg.IcebergUtils.getDynamicProperties;

Expand Down Expand Up @@ -277,13 +274,7 @@ public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile
try {
table = loadTable(context, flowFile);
} catch (Exception e) {
final Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().warn("No valid Kerberos credential found, retrying login", causeOptional.get());
initKerberosCredentials(context);
session.rollback();
context.yield();
} else {
if (!handleAuthErrors(e, session, context)) {
getLogger().error("Failed to load table from catalog", e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
Expand All @@ -310,13 +301,7 @@ public void doOnTrigger(ProcessContext context, ProcessSession session, FlowFile
final WriteResult result = taskWriter.complete();
appendDataFiles(context, flowFile, table, result);
} catch (Exception e) {
final Optional<GSSException> causeOptional = findCause(e, GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
if (causeOptional.isPresent()) {
getLogger().warn("No valid Kerberos credential found, retrying login", causeOptional.get());
initKerberosCredentials(context);
session.rollback();
context.yield();
} else {
if (!handleAuthErrors(e, session, context)) {
getLogger().error("Exception occurred while writing Iceberg records", e);
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public abstract class AbstractCatalogService extends AbstractControllerService i
.required(false)
.identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dynamicallyModifiesClasspath(true)
.build();

public static final PropertyDescriptor WAREHOUSE_PATH = new PropertyDescriptor.Builder()
Expand Down

0 comments on commit 07611de

Please sign in to comment.