Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import jakarta.annotation.Nullable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -164,12 +165,19 @@ public CommitFailedException wrapped() {
}
}

private <T> Pair<List<T>, String> paginate(List<T> list, String pageToken, int pageSize) {
private <T> Pair<List<T>, String> paginate(
List<T> list, @Nullable String pageToken, @Nullable Integer pageSize) {
if (pageToken == null) {
return Pair.of(list, null);
}

int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken);
if (pageStart >= list.size()) {
return Pair.of(Collections.emptyList(), null);
}

// if pageSize is null, return the rest of the list
pageSize = pageSize == null ? list.size() : pageSize;
int end = Math.min(pageStart + pageSize, list.size());
List<T> subList = list.subList(pageStart, end);
String nextPageToken = end >= list.size() ? null : String.valueOf(end);
Expand All @@ -189,7 +197,7 @@ public ListNamespacesResponse listNamespaces(SupportsNamespaces catalog, Namespa
}

public ListNamespacesResponse listNamespaces(
SupportsNamespaces catalog, Namespace parent, String pageToken, String pageSize) {
SupportsNamespaces catalog, Namespace parent, String pageToken, Integer pageSize) {
List<Namespace> results;

if (parent.isEmpty()) {
Expand All @@ -198,7 +206,7 @@ public ListNamespacesResponse listNamespaces(
results = catalog.listNamespaces(parent);
}

Pair<List<Namespace>, String> page = paginate(results, pageToken, Integer.parseInt(pageSize));
Pair<List<Namespace>, String> page = paginate(results, pageToken, pageSize);

return ListNamespacesResponse.builder()
.addAll(page.first())
Expand Down Expand Up @@ -269,11 +277,10 @@ public ListTablesResponse listTables(Catalog catalog, Namespace namespace) {
}

public ListTablesResponse listTables(
Catalog catalog, Namespace namespace, String pageToken, String pageSize) {
Catalog catalog, Namespace namespace, String pageToken, Integer pageSize) {
List<TableIdentifier> results = catalog.listTables(namespace);

Pair<List<TableIdentifier>, String> page =
paginate(results, pageToken, Integer.parseInt(pageSize));
Pair<List<TableIdentifier>, String> page = paginate(results, pageToken, pageSize);

return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build();
}
Expand Down Expand Up @@ -725,11 +732,10 @@ public ListTablesResponse listViews(ViewCatalog catalog, Namespace namespace) {
}

public ListTablesResponse listViews(
ViewCatalog catalog, Namespace namespace, String pageToken, String pageSize) {
ViewCatalog catalog, Namespace namespace, String pageToken, Integer pageSize) {
List<TableIdentifier> results = catalog.listViews(namespace);

Pair<List<TableIdentifier>, String> page =
paginate(results, pageToken, Integer.parseInt(pageSize));
Pair<List<TableIdentifier>, String> page = paginate(results, pageToken, pageSize);

return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;
import static org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation.validateIcebergProperties;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -189,8 +190,8 @@ private Response withCatalog(
}
}

private IcebergCatalogHandler newHandlerWrapper(
SecurityContext securityContext, String catalogName) {
@VisibleForTesting
IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String catalogName) {
validatePrincipal(securityContext);

return new IcebergCatalogHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ public ListNamespacesResponse listNamespaces(
.nextPageToken(results.pageToken.toTokenString())
.build();
} else {
return catalogHandlerUtils.listNamespaces(
namespaceCatalog, parent, pageToken, String.valueOf(pageSize));
return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent, pageToken, pageSize);
}
}

Expand Down Expand Up @@ -351,8 +350,7 @@ public ListTablesResponse listTables(Namespace namespace, String pageToken, Inte
.nextPageToken(results.pageToken.toTokenString())
.build();
} else {
return catalogHandlerUtils.listTables(
baseCatalog, namespace, pageToken, String.valueOf(pageSize));
return catalogHandlerUtils.listTables(baseCatalog, namespace, pageToken, pageSize);
}
}

Expand Down Expand Up @@ -1017,8 +1015,7 @@ public ListTablesResponse listViews(Namespace namespace, String pageToken, Integ
.nextPageToken(results.pageToken.toTokenString())
.build();
} else if (baseCatalog instanceof ViewCatalog viewCatalog) {
return catalogHandlerUtils.listViews(
viewCatalog, namespace, pageToken, String.valueOf(pageSize));
return catalogHandlerUtils.listViews(viewCatalog, namespace, pageToken, pageSize);
} else {
throw new BadRequestException(
"Unsupported operation: listViews with baseCatalog type: %s",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.service.catalog.iceberg;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
import org.apache.polaris.core.admin.model.BearerAuthenticationParameters;
import org.apache.polaris.core.admin.model.CatalogProperties;
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
import org.apache.polaris.core.admin.model.CreateCatalogRequest;
import org.apache.polaris.core.admin.model.ExternalCatalog;
import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.service.TestServices;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Strings;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

public class IcebergCatalogAdapterTest {

private static final String FEDERATED_CATALOG_NAME = "polaris-federated-catalog";

private TestServices testServices;
private IcebergCatalogAdapter catalogAdapter;

@BeforeEach
public void setUp() {
// Set up test services with catalog federation enabled
testServices =
TestServices.builder().config(Map.of("ENABLE_CATALOG_FEDERATION", "true")).build();
catalogAdapter = Mockito.spy(testServices.catalogAdapter());

// Prepare storage and connection configs for a federated Iceberg REST catalog
String storageLocation = "s3://my-bucket/path/to/data";
AwsStorageConfigInfo storageConfig =
AwsStorageConfigInfo.builder()
.setRoleArn("arn:aws:iam::012345678901:role/polaris-user-role")
.setExternalId("externalId")
.setUserArn("aws::a:user:arn")
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setAllowedLocations(List.of(storageLocation, "s3://externally-owned-bucket"))
.build();

AuthenticationParameters authParams =
BearerAuthenticationParameters.builder()
.setAuthenticationType(AuthenticationParameters.AuthenticationTypeEnum.BEARER)
.setBearerToken("xxx")
.build();

IcebergRestConnectionConfigInfo connectionConfig =
IcebergRestConnectionConfigInfo.builder()
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.ICEBERG_REST)
.setAuthenticationParameters(authParams)
.setUri("http://localhost:8080/api/v1/catalogs")
.setRemoteCatalogName("remote-catalog")
.build();

// Register the catalog in the test environment
testServices
.catalogsApi()
.createCatalog(
new CreateCatalogRequest(
ExternalCatalog.builder()
.setName(FEDERATED_CATALOG_NAME)
.setProperties(
CatalogProperties.builder().setDefaultBaseLocation(storageLocation).build())
.setConnectionConfigInfo(connectionConfig)
.setStorageConfigInfo(storageConfig)
.build()),
testServices.realmContext(),
testServices.securityContext());
}

@ParameterizedTest(name = "[{index}] initialPageToken={0}, pageSize={1}")
@MethodSource("paginationTestCases")
void testPaginationForNonIcebergCatalog(String initialPageToken, Integer pageSize)
throws IOException {

try (InMemoryCatalog inMemoryCatalog = new InMemoryCatalog()) {
// Initialize and replace the default handler with one backed by in-memory catalog
inMemoryCatalog.initialize("inMemory", Map.of());
mockCatalogAdapter(inMemoryCatalog);

// Set up 10 entities in the catalog: 10 namespaces, 10 tables, 10 views
int entityCount = 10;
for (int i = 0; i < entityCount; ++i) {
inMemoryCatalog.createNamespace(Namespace.of("ns" + i));
inMemoryCatalog.createTable(TableIdentifier.of("ns0", "table" + i), new Schema());
inMemoryCatalog
.buildView(TableIdentifier.of("ns0", "view" + i))
.withSchema(new Schema())
.withDefaultNamespace(Namespace.of("ns0"))
.withQuery("a", "SELECT * FROM ns0.table" + i)
.create();
}

// Determine starting index for pagination based on the initial page token
int pageStart =
Strings.isNullOrEmpty(initialPageToken) ? 0 : Integer.parseInt(initialPageToken);
int remain = entityCount - pageStart;

// Initial tokens for pagination
String listNamespacePageToken = initialPageToken;
String listTablesPageToken = initialPageToken;
String listViewsPageToken = initialPageToken;

// Simulate page-by-page fetching until all entities are consumed
while (remain > 0) {
int expectedSize =
(pageSize != null && initialPageToken != null) ? Math.min(remain, pageSize) : remain;

// Verify namespaces pagination
ListNamespacesResponse namespacesResponse =
(ListNamespacesResponse)
catalogAdapter
.listNamespaces(
FEDERATED_CATALOG_NAME,
listNamespacePageToken,
pageSize,
null,
testServices.realmContext(),
testServices.securityContext())
.getEntity();
Assertions.assertThat(namespacesResponse.namespaces()).hasSize(expectedSize);
listNamespacePageToken = namespacesResponse.nextPageToken();

// Verify tables pagination
ListTablesResponse tablesResponse =
(ListTablesResponse)
catalogAdapter
.listTables(
FEDERATED_CATALOG_NAME,
"ns0",
listTablesPageToken,
pageSize,
testServices.realmContext(),
testServices.securityContext())
.getEntity();
Assertions.assertThat(tablesResponse.identifiers()).hasSize(expectedSize);
listTablesPageToken = tablesResponse.nextPageToken();

// Verify views pagination
ListTablesResponse viewsResponse =
(ListTablesResponse)
catalogAdapter
.listViews(
FEDERATED_CATALOG_NAME,
"ns0",
listViewsPageToken,
pageSize,
testServices.realmContext(),
testServices.securityContext())
.getEntity();
Assertions.assertThat(viewsResponse.identifiers()).hasSize(expectedSize);
listViewsPageToken = viewsResponse.nextPageToken();

remain -= expectedSize;
}
}
}

private void mockCatalogAdapter(org.apache.iceberg.catalog.Catalog catalog) {
// Override handler creation to inject in-memory catalog and suppress actual close()
Mockito.doAnswer(
invocation -> {
IcebergCatalogHandler realHandler =
(IcebergCatalogHandler) invocation.callRealMethod();
IcebergCatalogHandler wrappedHandler = Mockito.spy(realHandler);

// Override initializeCatalog to inject test catalog using reflection
Mockito.doAnswer(
innerInvocation -> {
for (String fieldName :
List.of("baseCatalog", "namespaceCatalog", "viewCatalog")) {
Field field = IcebergCatalogHandler.class.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(wrappedHandler, catalog);
}
return null;
})
.when(wrappedHandler)
.initializeCatalog();

// Prevent catalog from being closed during test lifecycle
Mockito.doNothing().when(wrappedHandler).close();

return wrappedHandler;
})
.when(catalogAdapter)
.newHandlerWrapper(Mockito.any(), Mockito.any());
}

private static Stream<Arguments> paginationTestCases() {
return Stream.of(
Arguments.of(null, null),
Arguments.of(null, 1),
Arguments.of(null, 3),
Arguments.of(null, 5),
Arguments.of(null, 10),
Arguments.of(null, 20),
Arguments.of("", null),
Arguments.of("", 1),
Arguments.of("", 3),
Arguments.of("", 5),
Arguments.of("", 10),
Arguments.of("", 20),
Arguments.of("5", null),
Arguments.of("5", 1),
Arguments.of("5", 3),
Arguments.of("5", 5),
Arguments.of("5", 10));
}
}
Loading
Loading