Skip to content

Use isolated request contexts for task execution #1817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.service.quarkus.config;

import jakarta.enterprise.context.RequestScoped;
import jakarta.ws.rs.container.ContainerRequestContext;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.polaris.core.context.RealmContext;

/**
* A container for request-scoped information discovered during request execution.
*
* <p>This is an equivalent for {@link ContainerRequestContext}, but for use in non-HTTP requests.
*/
@RequestScoped
public class PolarisRequestContext {
private final AtomicReference<RealmContext> realmCtx = new AtomicReference<>();

/**
* Records the {@link RealmContext} that applies to current request. The realm context may be
* determined from REST API header or by passing explicit realm ID values from one CDI context to
* another.
*
* <p>During the execution of a particular request, this method should be called before {@link
* #realmContext()}.
*/
public void setRealmContext(RealmContext rc) {
realmCtx.set(rc);
}

/**
* Returns the realm context for this request previously set via {@link
* #setRealmContext(RealmContext)}.
*/
public RealmContext realmContext() {
return realmCtx.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.polaris.service.quarkus.config;

import com.google.common.base.Preconditions;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.context.SmallRyeManagedExecutor;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -29,8 +30,6 @@
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Singleton;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.core.Context;
import java.time.Clock;
import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisCallContext;
Expand Down Expand Up @@ -67,7 +66,6 @@
import org.apache.polaris.service.quarkus.auth.external.tenant.OidcTenantResolver;
import org.apache.polaris.service.quarkus.catalog.io.QuarkusFileIOConfiguration;
import org.apache.polaris.service.quarkus.context.QuarkusRealmContextConfiguration;
import org.apache.polaris.service.quarkus.context.RealmContextFilter;
import org.apache.polaris.service.quarkus.events.QuarkusPolarisEventListenerConfiguration;
import org.apache.polaris.service.quarkus.persistence.QuarkusPersistenceConfiguration;
import org.apache.polaris.service.quarkus.ratelimiter.QuarkusRateLimiterFilterConfiguration;
Expand Down Expand Up @@ -115,8 +113,10 @@ public PolarisDiagnostics polarisDiagnostics() {

@Produces
@RequestScoped
public RealmContext realmContext(@Context ContainerRequestContext request) {
return (RealmContext) request.getProperty(RealmContextFilter.REALM_CONTEXT_KEY);
public RealmContext realmContext(PolarisRequestContext context) {
RealmContext realmContext = context.realmContext();
Preconditions.checkState(realmContext != null, "RealmContext was not property configured");
return realmContext;
}

@Produces
Expand Down Expand Up @@ -297,7 +297,7 @@ public TokenBroker tokenBroker(
public ManagedExecutor taskExecutor(TaskHandlerConfiguration config) {
return SmallRyeManagedExecutor.builder()
.injectionPointName("task-executor")
.propagated(ThreadContext.ALL_REMAINING)
.propagated(ThreadContext.NONE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a bit too much? How about:

    return SmallRyeManagedExecutor.builder()
        .injectionPointName("task-executor")
        .propagated(ThreadContext.ALL_REMAINING)
        .cleared(ThreadContext.CDI)
        .maxAsync(config.maxConcurrentTasks())
        .maxQueued(config.maxQueuedTasks())
        .build();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Will do.... a bit later, when I get to work on this again :)

.maxAsync(config.maxConcurrentTasks())
.maxQueued(config.maxQueuedTasks())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.polaris.service.config.PolarisFilterPriorities;
import org.apache.polaris.service.context.RealmContextResolver;
import org.apache.polaris.service.quarkus.config.PolarisRequestContext;
import org.jboss.resteasy.reactive.server.ServerRequestFilter;

public class RealmContextFilter {

public static final String REALM_CONTEXT_KEY = "realmContext";

@Inject RealmContextResolver realmContextResolver;
@Inject PolarisRequestContext polarisRequestContext;

@ServerRequestFilter(preMatching = true, priority = PolarisFilterPriorities.REALM_CONTEXT_FILTER)
public Uni<Response> resolveRealmContext(ContainerRequestContext rc) {
Expand All @@ -46,7 +48,7 @@ public Uni<Response> resolveRealmContext(ContainerRequestContext rc) {
rc.getUriInfo().getPath(),
rc.getHeaders()::getFirst))
.onItem()
.invoke(realmContext -> rc.setProperty(REALM_CONTEXT_KEY, realmContext))
.invoke(realmContext -> polarisRequestContext.setRealmContext(realmContext))
.invoke(realmContext -> ContextLocals.put(REALM_CONTEXT_KEY, realmContext))
.onItemOrFailure()
.transform((realmContext, error) -> error == null ? null : errorResponse(error));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.polaris.service.quarkus.logging;

import static org.apache.polaris.service.quarkus.context.RealmContextFilter.REALM_CONTEXT_KEY;

import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand All @@ -28,6 +26,7 @@
import jakarta.ws.rs.container.PreMatching;
import jakarta.ws.rs.ext.Provider;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.service.quarkus.config.PolarisRequestContext;
import org.apache.polaris.service.quarkus.config.QuarkusFilterPriorities;
import org.slf4j.MDC;

Expand All @@ -41,6 +40,7 @@ public class QuarkusLoggingMDCFilter implements ContainerRequestFilter {
public static final String REQUEST_ID_KEY = "requestId";

@Inject QuarkusLoggingConfiguration loggingConfiguration;
@Inject PolarisRequestContext polarisRequestContext;

@Override
public void filter(ContainerRequestContext rc) {
Expand All @@ -54,7 +54,7 @@ public void filter(ContainerRequestContext rc) {
MDC.put(REQUEST_ID_KEY, requestId);
rc.setProperty(REQUEST_ID_KEY, requestId);
}
RealmContext realmContext = (RealmContext) rc.getProperty(REALM_CONTEXT_KEY);
RealmContext realmContext = polarisRequestContext.realmContext();
MDC.put(REALM_ID_KEY, realmContext.getRealmIdentifier());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import io.quarkus.runtime.Startup;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.control.ActivateRequestContext;
import jakarta.inject.Inject;
import java.util.concurrent.ExecutorService;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.service.events.PolarisEventListener;
import org.apache.polaris.service.quarkus.config.PolarisRequestContext;
import org.apache.polaris.service.quarkus.tracing.QuarkusTracingFilter;
import org.apache.polaris.service.task.TaskExecutorImpl;
import org.apache.polaris.service.task.TaskFileIOSupplier;
Expand All @@ -38,9 +40,10 @@
public class QuarkusTaskExecutorImpl extends TaskExecutorImpl {

private final Tracer tracer;
private final PolarisRequestContext polarisRequestContext;

public QuarkusTaskExecutorImpl() {
this(null, null, null, null, null);
this(null, null, null, null, null, null);
}

@Inject
Expand All @@ -49,9 +52,11 @@ public QuarkusTaskExecutorImpl(
MetaStoreManagerFactory metaStoreManagerFactory,
TaskFileIOSupplier fileIOSupplier,
Tracer tracer,
PolarisEventListener polarisEventListener) {
PolarisEventListener polarisEventListener,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't fully understand how the propagation works, but will we be able to add some test to make sure the context is propagated correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is covered by existing tests that produce tasks.

It works like this:

  1. Quarkus create a new request context when handleTask is called
  2. The code below this like puts the RealmContext for the tasks into polarisRequestContext
  3. The Quarkus proxy for polarisRequestContext redirects this "set" call to the instance in the current request context
  4. When something needs a RealmContext during the execution of a task, Quarkus invokes QuarkusProducers.realmContext(PolarisRequestContext context) and gives it the same PolarisRequestContext instance that was used in step 3.

For HTTP requests step 1 is the start of the request at the REST API layer, in which case the RealmContext is derived from HTTP headers by a filter.

This whole workflow is very similar to how it worked before, but we use a custom "holder" object instead of ContainerRequestContext, which allows us to support non-HTTP requests (i.e. async tasks).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, after rebasing test coverage is gone 🤷

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is covered by existing tests that produce tasks.

Unfortunately I don't think we have such case running in our CI today, otherwise, it would have caught the problem when we do ConfigurationStore injection. The only case i know which triggers background task is purge, which was caught by an aws regression test, which can only run manually today.

I am trying to add a spark integration test here https://github.com/apache/polaris/pull/1825/files, but runs into following error when do drop with purge

2025-06-05 18:36:21,503 INFO  [org.apa.pol.ser.exc.IcebergExceptionMapper] [,POLARIS] [,,,] (executor-thread-1) Handling runtimeException Principal 'root' with activated PrincipalRoles '[]' and activated grants via '[service_admin, catalog_admin]' is not authorized for op DROP_TABLE_WITH_PURGE

i am wondering if you know how the catalogAPI privilege is setup with quarkus test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like it's using default privileges in this case. Related slack discussion: https://apache-polaris.slack.com/archives/C084XDM50CB/p1748381388095509

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: test coverage : when my new code had checkState assertions, it caused CI failures in the tests I had to modify. Unfortunately, after rebasing request context stuff is no longer injected into the default config impl., so it cannot perform those checks 🤷 I'll address this if the community decides to go ahead with this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like it's using default privileges in this case. Related slack discussion: https://apache-polaris.slack.com/archives/C084XDM50CB/p1748381388095509

Thanks! let me see if I can get this test work.

PolarisRequestContext polarisRequestContext) {
super(executorService, metaStoreManagerFactory, fileIOSupplier, polarisEventListener);
this.tracer = tracer;
this.polarisRequestContext = polarisRequestContext;
}

@Startup
Expand All @@ -61,6 +66,7 @@ public void init() {
}

@Override
@ActivateRequestContext
protected void handleTask(long taskEntityId, CallContext callContext, int attempt) {
Span span =
tracer
Expand All @@ -73,6 +79,7 @@ protected void handleTask(long taskEntityId, CallContext callContext, int attemp
.setAttribute("polaris.task.attempt", attempt)
.startSpan();
try (Scope ignored = span.makeCurrent()) {
polarisRequestContext.setRealmContext(callContext.getRealmContext());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@dimas-b dimas-b Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, good point. I'll do it later if people agree on the general approach in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, i think we probably need to propagate the whole callcontext, instead of just the realmContext, because the whole callContext is needed for the background executor, the realmContext is just what needed for getConfiguration, which is called during the task execution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually - yes. With the recent config changes this PR is no longer meaningful in isolation, but it shows how CDI can be done for tasks.

I also opened a bigger related proposal on the dev ML: https://lists.apache.org/thread/0cyrzft2oon28otxlmmhvd5671rd3r3d

super.handleTask(taskEntityId, callContext, attempt);
} finally {
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,6 @@ public static void setUpMocks() {
public void before(TestInfo testInfo) {
RealmContext realmContext = testInfo::getDisplayName;
QuarkusMock.installMockForType(realmContext, RealmContext.class);
metaStoreManager = managerFactory.getOrCreateMetaStoreManager(realmContext);
userSecretsManager = userSecretsManagerFactory.getOrCreateUserSecretsManager(realmContext);

polarisAuthorizer = new PolarisAuthorizerImpl(configurationStore);

polarisContext =
new PolarisCallContext(
Expand All @@ -242,11 +238,16 @@ public void before(TestInfo testInfo) {
diagServices,
configurationStore,
clock);
this.entityManager = realmEntityManagerFactory.getOrCreateEntityManager(realmContext);

callContext = polarisContext;
CallContext.setCurrentContext(callContext);

metaStoreManager = managerFactory.getOrCreateMetaStoreManager(realmContext);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the motivation for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the real change is moving CallContext.setCurrentContext(callContext) above, but GH formats it strangely 😅

CallContext.setCurrentContext must happen earlier to make sure "thread local" stuff is aligned with CDI context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, after rebasing these changes are not meaningful, indeed 🤷

userSecretsManager = userSecretsManagerFactory.getOrCreateUserSecretsManager(realmContext);

polarisAuthorizer = new PolarisAuthorizerImpl(configurationStore);

this.entityManager = realmEntityManagerFactory.getOrCreateEntityManager(realmContext);

PrincipalEntity rootEntity =
new PrincipalEntity(
PolarisEntity.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,14 @@ public void before(TestInfo testInfo) {
diagServices,
configurationStore,
Clock.systemDefaultZone());
CallContext.setCurrentContext(polarisContext);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same: is this change required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just moving CallContext.setCurrentContext above the calls that rely on it -- to ensure CDI context has the same data and thread locals.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, after rebasing these changes are not meaningful, indeed 🤷


PolarisEntityManager entityManager =
new PolarisEntityManager(
metaStoreManager,
new StorageCredentialCache(),
new InMemoryEntityCache(metaStoreManager));

CallContext.setCurrentContext(polarisContext);

PrincipalEntity rootEntity =
new PrincipalEntity(
PolarisEntity.of(
Expand Down
Loading