Skip to content

Interface changes for pagination #1528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 53 commits into from
May 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
4f775a6
add missing apis
eric-maynard Mar 20, 2025
e2fcd5e
more tests, fixes
eric-maynard Mar 21, 2025
467ebcb
clean up drop
eric-maynard Mar 21, 2025
5f5ca42
autolint
eric-maynard Mar 21, 2025
c1b2b73
Merge branch 'main' of github.com:apache/polaris into implement-Polar…
eric-maynard Mar 21, 2025
34c8e96
changes per review
eric-maynard Mar 24, 2025
dab7279
revert iceberg messages to comply with oss tests
eric-maynard Mar 24, 2025
54a47a5
another revert
eric-maynard Mar 24, 2025
1ffbf07
more iceberg catalog changes
eric-maynard Mar 24, 2025
113c941
autolint
eric-maynard Mar 24, 2025
b672986
dependency issues
eric-maynard Mar 24, 2025
4a9ed69
more wiring
eric-maynard Mar 24, 2025
6d7c59b
continuing rebase
eric-maynard Mar 24, 2025
5c969d7
remaining issues are related to task loading
eric-maynard Mar 24, 2025
e67b0e9
re-add tests
eric-maynard Mar 24, 2025
663dafb
debugging
eric-maynard Mar 27, 2025
90b4c73
resolve conflicts
eric-maynard Mar 27, 2025
133b658
fix failing tests
eric-maynard Mar 27, 2025
98661be
fix another test
eric-maynard Mar 27, 2025
0798b63
changes per review
eric-maynard Apr 1, 2025
241e7ed
autolint
eric-maynard Apr 1, 2025
0ad4ced
some fixes
eric-maynard Apr 1, 2025
fcaa2b7
rebase
eric-maynard Apr 3, 2025
dc86441
stable
eric-maynard Apr 3, 2025
4b58278
rebase
eric-maynard Apr 16, 2025
6050494
pull main
eric-maynard Apr 28, 2025
0fcec2f
updates for new persistence
eric-maynard Apr 28, 2025
6b9031c
fix
eric-maynard Apr 28, 2025
01a8468
merge conflicts
eric-maynard Apr 29, 2025
81a7bf0
continuing work
eric-maynard May 4, 2025
97d7f3d
more reverts
eric-maynard May 4, 2025
4fab2c5
fix conflicts
eric-maynard May 4, 2025
1343e10
continue reverts
eric-maynard May 4, 2025
d250d78
more reverts
eric-maynard May 4, 2025
f5a9c51
yank tests
eric-maynard May 4, 2025
5def9ed
autolint
eric-maynard May 4, 2025
f2042b5
test reverts
eric-maynard May 4, 2025
08a5a9c
try to support limit without real page tokens
eric-maynard May 5, 2025
9192f04
autolint
eric-maynard May 5, 2025
ec89a46
Stable
eric-maynard May 5, 2025
5c91235
change comment
eric-maynard May 5, 2025
e0911eb
autolint
eric-maynard May 5, 2025
dde29d0
remove catalog config for now
eric-maynard May 5, 2025
b87bc63
merge conflicts
eric-maynard May 7, 2025
0d4bd9c
changes per review
eric-maynard May 7, 2025
17dba07
more tweaks
eric-maynard May 7, 2025
a159f23
simplify types per review
eric-maynard May 8, 2025
941b34c
Stable, about to refactor more
eric-maynard May 8, 2025
8ec7d7c
re-stable
eric-maynard May 8, 2025
806454c
polish
eric-maynard May 8, 2025
442341f
autolint
eric-maynard May 8, 2025
9b6c591
more changes per review
eric-maynard May 8, 2025
e578809
stable
eric-maynard May 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
Expand All @@ -52,6 +53,9 @@
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
import org.apache.polaris.core.persistence.pagination.HasPageSize;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
Expand Down Expand Up @@ -419,29 +423,30 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(

/** {@inheritDoc} */
@Override
public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType) {
@Nonnull PolarisEntityType entityType,
@Nonnull PageToken pageToken) {
return this.listEntitiesInCurrentTxn(
callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue());
callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue(), pageToken);
}

@Override
public @Nonnull List<EntityNameLookupRecord> listEntitiesInCurrentTxn(
public @Nonnull Page<EntityNameLookupRecord> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull Predicate<PolarisBaseEntity> entityFilter) {
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull PageToken pageToken) {
// full range scan under the parent for that type
return this.listEntitiesInCurrentTxn(
callCtx,
catalogId,
parentId,
entityType,
Integer.MAX_VALUE,
entityFilter,
entity ->
new EntityNameLookupRecord(
Expand All @@ -450,27 +455,33 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
entity.getParentId(),
entity.getName(),
entity.getTypeCode(),
entity.getSubTypeCode()));
entity.getSubTypeCode()),
pageToken);
}

@Override
public @Nonnull <T> List<T> listEntitiesInCurrentTxn(
public @Nonnull <T> Page<T> listEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
int limit,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull Function<PolarisBaseEntity, T> transformer) {
@Nonnull Function<PolarisBaseEntity, T> transformer,
@Nonnull PageToken pageToken) {
// full range scan under the parent for that type
return this.store
.lookupFullEntitiesActive(localSession.get(), catalogId, parentId, entityType)
.stream()
.map(ModelEntity::toEntity)
.filter(entityFilter)
.limit(limit)
.map(transformer)
.collect(Collectors.toList());
Stream<PolarisBaseEntity> data =
this.store
.lookupFullEntitiesActive(
localSession.get(), catalogId, parentId, entityType, pageToken)
.stream()
.map(ModelEntity::toEntity)
.filter(entityFilter);

if (pageToken instanceof HasPageSize hasPageSize) {
Copy link
Contributor

@dimas-b dimas-b May 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is functionally correct. If the token is a sub-class of HasPageSize but has other information that the client provided, but EclipseLink does not recognise, I do not think it would be correct to just use the page size from the token.

I believe we should check for a specific type here and fail on all types, whose details cannot be fully processed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct for the current code, where HasPageSize is just used as a limit. When page tokens are implemented this logic will definitely change!

data = data.limit(hasPageSize.getPageSize());
}

return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.jpa.models.ModelEntity;
import org.apache.polaris.jpa.models.ModelEntityActive;
Expand Down Expand Up @@ -282,7 +283,11 @@ long countActiveChildEntities(
}

List<ModelEntity> lookupFullEntitiesActive(
EntityManager session, long catalogId, long parentId, @Nonnull PolarisEntityType entityType) {
EntityManager session,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull PageToken pageToken) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
import org.apache.polaris.core.persistence.pagination.HasPageSize;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyType;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
Expand Down Expand Up @@ -352,49 +355,51 @@ public List<PolarisChangeTrackingVersions> lookupEntityVersions(

@Nonnull
@Override
public List<EntityNameLookupRecord> listEntities(
public Page<EntityNameLookupRecord> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType) {
@Nonnull PolarisEntityType entityType,
@Nonnull PageToken pageToken) {
return listEntities(
callCtx,
catalogId,
parentId,
entityType,
Integer.MAX_VALUE,
entity -> true,
EntityNameLookupRecord::new);
EntityNameLookupRecord::new,
pageToken);
}

@Nonnull
@Override
public List<EntityNameLookupRecord> listEntities(
public Page<EntityNameLookupRecord> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull Predicate<PolarisBaseEntity> entityFilter) {
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull PageToken pageToken) {
return listEntities(
callCtx,
catalogId,
parentId,
entityType,
Integer.MAX_VALUE,
entityFilter,
EntityNameLookupRecord::new);
EntityNameLookupRecord::new,
pageToken);
}

@Nonnull
@Override
public <T> List<T> listEntities(
public <T> Page<T> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
PolarisEntityType entityType,
int limit,
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
@Nonnull Function<PolarisBaseEntity, T> transformer) {
@Nonnull Function<PolarisBaseEntity, T> transformer,
@Nonnull PageToken pageToken) {
Map<String, Object> params =
Map.of(
"catalog_id",
Expand All @@ -415,15 +420,17 @@ public <T> List<T> listEntities(
query,
new ModelEntity(),
stream -> {
stream
.map(ModelEntity::toEntity)
.filter(entityFilter)
.limit(limit)
.forEach(results::add);
var data = stream.map(ModelEntity::toEntity).filter(entityFilter);
if (pageToken instanceof HasPageSize hasPageSize) {
data = data.limit(hasPageSize.getPageSize());
}
data.forEach(results::add);
});
return results == null
? Collections.emptyList()
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
List<T> resultsOrEmpty =
results == null
? Collections.emptyList()
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
return Page.fromItems(resultsOrEmpty);
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ public static void enforceFeatureEnabledOrThrow(
.defaultValue(2)
.buildFeatureConfiguration();

public static final PolarisConfiguration<Boolean> LIST_PAGINATION_ENABLED =
PolarisConfiguration.<Boolean>builder()
.key("LIST_PAGINATION_ENABLED")
.description("If set to true, pagination for APIs like listTables is enabled.")
.defaultValue(false)
.buildFeatureConfiguration();

public static final FeatureConfiguration<Boolean> ENABLE_GENERIC_TABLES =
PolarisConfiguration.<Boolean>builder()
.key("ENABLE_GENERIC_TABLES")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
import org.apache.polaris.core.policy.PolicyEntity;
import org.apache.polaris.core.policy.PolicyMappingUtil;
Expand Down Expand Up @@ -687,7 +689,8 @@ private void revokeGrantRecord(
@Nonnull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
@Nonnull PolarisEntitySubType entitySubType) {
@Nonnull PolarisEntitySubType entitySubType,
@Nonnull PageToken pageToken) {
// get meta store we should be using
BasePersistence ms = callCtx.getMetaStore();

Expand All @@ -699,15 +702,16 @@ private void revokeGrantRecord(
catalogPath == null || catalogPath.size() == 0
? 0l
: catalogPath.get(catalogPath.size() - 1).getId();
List<EntityNameLookupRecord> toreturnList =
ms.listEntities(callCtx, catalogId, parentId, entityType);
Page<EntityNameLookupRecord> resultPage =
ms.listEntities(callCtx, catalogId, parentId, entityType, pageToken);

// prune the returned list with only entities matching the entity subtype
if (entitySubType != PolarisEntitySubType.ANY_SUBTYPE) {
toreturnList =
toreturnList.stream()
.filter(rec -> rec.getSubTypeCode() == entitySubType.getCode())
.collect(Collectors.toList());
resultPage =
pageToken.buildNextPage(
resultPage.items.stream()
.filter(rec -> rec.getSubTypeCode() == entitySubType.getCode())
.collect(Collectors.toList()));
}

// TODO: Use post-validation to enforce consistent view against catalogPath. In the
Expand All @@ -717,7 +721,7 @@ private void revokeGrantRecord(
// in-flight request (the cache-based resolution follows a different path entirely).

// done
return new ListEntitiesResult(toreturnList);
return ListEntitiesResult.fromPage(resultPage);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -1176,13 +1180,14 @@ private void revokeGrantRecord(
// get the list of catalog roles, at most 2
List<PolarisBaseEntity> catalogRoles =
ms.listEntities(
callCtx,
catalogId,
catalogId,
PolarisEntityType.CATALOG_ROLE,
2,
entity -> true,
Function.identity());
callCtx,
catalogId,
catalogId,
PolarisEntityType.CATALOG_ROLE,
entity -> true,
Function.identity(),
PageToken.fromLimit(2))
.items;

// if we have 2, we cannot drop the catalog. If only one left, better be the admin role
if (catalogRoles.size() > 1) {
Expand Down Expand Up @@ -1488,17 +1493,16 @@ private void revokeGrantRecord(

@Override
public @Nonnull EntitiesResult loadTasks(
@Nonnull PolarisCallContext callCtx, String executorId, int limit) {
@Nonnull PolarisCallContext callCtx, String executorId, PageToken pageToken) {
BasePersistence ms = callCtx.getMetaStore();

// find all available tasks
List<PolarisBaseEntity> availableTasks =
Page<PolarisBaseEntity> availableTasks =
ms.listEntities(
callCtx,
PolarisEntityConstants.getRootEntityId(),
PolarisEntityConstants.getRootEntityId(),
PolarisEntityType.TASK,
limit,
entity -> {
PolarisObjectMapperUtil.TaskExecutionState taskState =
PolarisObjectMapperUtil.parseTaskState(entity);
Expand All @@ -1513,11 +1517,12 @@ private void revokeGrantRecord(
|| taskState.executor == null
|| callCtx.getClock().millis() - taskState.lastAttemptStartTime > taskAgeTimeout;
},
Function.identity());
Function.identity(),
pageToken);

List<PolarisBaseEntity> loadedTasks = new ArrayList<>();
final AtomicInteger failedLeaseCount = new AtomicInteger(0);
availableTasks.forEach(
availableTasks.items.forEach(
task -> {
PolarisBaseEntity updatedTask = new PolarisBaseEntity(task);
Map<String, String> properties =
Expand Down Expand Up @@ -1554,7 +1559,7 @@ private void revokeGrantRecord(
throw new RetryOnConcurrencyException(
"Failed to lease any of %s tasks due to concurrent leases", failedLeaseCount.get());
}
return new EntitiesResult(loadedTasks);
return EntitiesResult.fromPage(Page.fromItems(loadedTasks));
}

/** {@inheritDoc} */
Expand Down
Loading