Skip to content

Commit a701f10

Browse files
XJDKC authored and Yufei Gu committed
Fix Pagination for Catalog Federation (#1849)
Details can be found in this issue: #1848
1 parent 9da4e41 commit a701f10

File tree

5 files changed

+272
-20
lines changed

5 files changed

+272
-20
lines changed

service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java

Lines changed: 15 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
import com.google.common.base.Preconditions;
2727
import com.google.common.collect.Maps;
2828
import com.google.common.collect.Sets;
29+
import jakarta.annotation.Nullable;
2930
import jakarta.enterprise.context.ApplicationScoped;
3031
import jakarta.inject.Inject;
3132
import java.lang.reflect.Field;
@@ -164,12 +165,19 @@ public CommitFailedException wrapped() {
164165
}
165166
}
166167

167-
private <T> Pair<List<T>, String> paginate(List<T> list, String pageToken, int pageSize) {
168+
private <T> Pair<List<T>, String> paginate(
169+
List<T> list, @Nullable String pageToken, @Nullable Integer pageSize) {
170+
if (pageToken == null) {
171+
return Pair.of(list, null);
172+
}
173+
168174
int pageStart = INITIAL_PAGE_TOKEN.equals(pageToken) ? 0 : Integer.parseInt(pageToken);
169175
if (pageStart >= list.size()) {
170176
return Pair.of(Collections.emptyList(), null);
171177
}
172178

179+
// if pageSize is null, return the rest of the list
180+
pageSize = pageSize == null ? list.size() : pageSize;
173181
int end = Math.min(pageStart + pageSize, list.size());
174182
List<T> subList = list.subList(pageStart, end);
175183
String nextPageToken = end >= list.size() ? null : String.valueOf(end);
@@ -189,7 +197,7 @@ public ListNamespacesResponse listNamespaces(SupportsNamespaces catalog, Namespa
189197
}
190198

191199
public ListNamespacesResponse listNamespaces(
192-
SupportsNamespaces catalog, Namespace parent, String pageToken, String pageSize) {
200+
SupportsNamespaces catalog, Namespace parent, String pageToken, Integer pageSize) {
193201
List<Namespace> results;
194202

195203
if (parent.isEmpty()) {
@@ -198,7 +206,7 @@ public ListNamespacesResponse listNamespaces(
198206
results = catalog.listNamespaces(parent);
199207
}
200208

201-
Pair<List<Namespace>, String> page = paginate(results, pageToken, Integer.parseInt(pageSize));
209+
Pair<List<Namespace>, String> page = paginate(results, pageToken, pageSize);
202210

203211
return ListNamespacesResponse.builder()
204212
.addAll(page.first())
@@ -269,11 +277,10 @@ public ListTablesResponse listTables(Catalog catalog, Namespace namespace) {
269277
}
270278

271279
public ListTablesResponse listTables(
272-
Catalog catalog, Namespace namespace, String pageToken, String pageSize) {
280+
Catalog catalog, Namespace namespace, String pageToken, Integer pageSize) {
273281
List<TableIdentifier> results = catalog.listTables(namespace);
274282

275-
Pair<List<TableIdentifier>, String> page =
276-
paginate(results, pageToken, Integer.parseInt(pageSize));
283+
Pair<List<TableIdentifier>, String> page = paginate(results, pageToken, pageSize);
277284

278285
return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build();
279286
}
@@ -725,11 +732,10 @@ public ListTablesResponse listViews(ViewCatalog catalog, Namespace namespace) {
725732
}
726733

727734
public ListTablesResponse listViews(
728-
ViewCatalog catalog, Namespace namespace, String pageToken, String pageSize) {
735+
ViewCatalog catalog, Namespace namespace, String pageToken, Integer pageSize) {
729736
List<TableIdentifier> results = catalog.listViews(namespace);
730737

731-
Pair<List<TableIdentifier>, String> page =
732-
paginate(results, pageToken, Integer.parseInt(pageSize));
738+
Pair<List<TableIdentifier>, String> page = paginate(results, pageToken, pageSize);
733739

734740
return ListTablesResponse.builder().addAll(page.first()).nextPageToken(page.second()).build();
735741
}

service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;
2222
import static org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation.validateIcebergProperties;
2323

24+
import com.google.common.annotations.VisibleForTesting;
2425
import com.google.common.base.Preconditions;
2526
import com.google.common.collect.ImmutableList;
2627
import com.google.common.collect.ImmutableMap;
@@ -189,8 +190,8 @@ private Response withCatalog(
189190
}
190191
}
191192

192-
private IcebergCatalogHandler newHandlerWrapper(
193-
SecurityContext securityContext, String catalogName) {
193+
@VisibleForTesting
194+
IcebergCatalogHandler newHandlerWrapper(SecurityContext securityContext, String catalogName) {
194195
validatePrincipal(securityContext);
195196

196197
return new IcebergCatalogHandler(

service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,7 @@ public ListNamespacesResponse listNamespaces(
190190
.nextPageToken(results.pageToken.toTokenString())
191191
.build();
192192
} else {
193-
return catalogHandlerUtils.listNamespaces(
194-
namespaceCatalog, parent, pageToken, String.valueOf(pageSize));
193+
return catalogHandlerUtils.listNamespaces(namespaceCatalog, parent, pageToken, pageSize);
195194
}
196195
}
197196

@@ -351,8 +350,7 @@ public ListTablesResponse listTables(Namespace namespace, String pageToken, Inte
351350
.nextPageToken(results.pageToken.toTokenString())
352351
.build();
353352
} else {
354-
return catalogHandlerUtils.listTables(
355-
baseCatalog, namespace, pageToken, String.valueOf(pageSize));
353+
return catalogHandlerUtils.listTables(baseCatalog, namespace, pageToken, pageSize);
356354
}
357355
}
358356

@@ -1017,8 +1015,7 @@ public ListTablesResponse listViews(Namespace namespace, String pageToken, Integ
10171015
.nextPageToken(results.pageToken.toTokenString())
10181016
.build();
10191017
} else if (baseCatalog instanceof ViewCatalog viewCatalog) {
1020-
return catalogHandlerUtils.listViews(
1021-
viewCatalog, namespace, pageToken, String.valueOf(pageSize));
1018+
return catalogHandlerUtils.listViews(viewCatalog, namespace, pageToken, pageSize);
10221019
} else {
10231020
throw new BadRequestException(
10241021
"Unsupported operation: listViews with baseCatalog type: %s",
Lines changed: 245 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.service.catalog.iceberg;
21+
22+
import java.io.IOException;
23+
import java.lang.reflect.Field;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.stream.Stream;
27+
import org.apache.iceberg.Schema;
28+
import org.apache.iceberg.catalog.Namespace;
29+
import org.apache.iceberg.catalog.TableIdentifier;
30+
import org.apache.iceberg.inmemory.InMemoryCatalog;
31+
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
32+
import org.apache.iceberg.rest.responses.ListTablesResponse;
33+
import org.apache.polaris.core.admin.model.AuthenticationParameters;
34+
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
35+
import org.apache.polaris.core.admin.model.BearerAuthenticationParameters;
36+
import org.apache.polaris.core.admin.model.CatalogProperties;
37+
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
38+
import org.apache.polaris.core.admin.model.CreateCatalogRequest;
39+
import org.apache.polaris.core.admin.model.ExternalCatalog;
40+
import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
41+
import org.apache.polaris.core.admin.model.StorageConfigInfo;
42+
import org.apache.polaris.service.TestServices;
43+
import org.assertj.core.api.Assertions;
44+
import org.assertj.core.util.Strings;
45+
import org.junit.jupiter.api.BeforeEach;
46+
import org.junit.jupiter.params.ParameterizedTest;
47+
import org.junit.jupiter.params.provider.Arguments;
48+
import org.junit.jupiter.params.provider.MethodSource;
49+
import org.mockito.Mockito;
50+
51+
public class IcebergCatalogAdapterTest {
52+
53+
private static final String FEDERATED_CATALOG_NAME = "polaris-federated-catalog";
54+
55+
private TestServices testServices;
56+
private IcebergCatalogAdapter catalogAdapter;
57+
58+
@BeforeEach
59+
public void setUp() {
60+
// Set up test services with catalog federation enabled
61+
testServices =
62+
TestServices.builder().config(Map.of("ENABLE_CATALOG_FEDERATION", "true")).build();
63+
catalogAdapter = Mockito.spy(testServices.catalogAdapter());
64+
65+
// Prepare storage and connection configs for a federated Iceberg REST catalog
66+
String storageLocation = "s3://my-bucket/path/to/data";
67+
AwsStorageConfigInfo storageConfig =
68+
AwsStorageConfigInfo.builder()
69+
.setRoleArn("arn:aws:iam::012345678901:role/polaris-user-role")
70+
.setExternalId("externalId")
71+
.setUserArn("aws::a:user:arn")
72+
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
73+
.setAllowedLocations(List.of(storageLocation, "s3://externally-owned-bucket"))
74+
.build();
75+
76+
AuthenticationParameters authParams =
77+
BearerAuthenticationParameters.builder()
78+
.setAuthenticationType(AuthenticationParameters.AuthenticationTypeEnum.BEARER)
79+
.setBearerToken("xxx")
80+
.build();
81+
82+
IcebergRestConnectionConfigInfo connectionConfig =
83+
IcebergRestConnectionConfigInfo.builder()
84+
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.ICEBERG_REST)
85+
.setAuthenticationParameters(authParams)
86+
.setUri("http://localhost:8080/api/v1/catalogs")
87+
.setRemoteCatalogName("remote-catalog")
88+
.build();
89+
90+
// Register the catalog in the test environment
91+
testServices
92+
.catalogsApi()
93+
.createCatalog(
94+
new CreateCatalogRequest(
95+
ExternalCatalog.builder()
96+
.setName(FEDERATED_CATALOG_NAME)
97+
.setProperties(
98+
CatalogProperties.builder().setDefaultBaseLocation(storageLocation).build())
99+
.setConnectionConfigInfo(connectionConfig)
100+
.setStorageConfigInfo(storageConfig)
101+
.build()),
102+
testServices.realmContext(),
103+
testServices.securityContext());
104+
}
105+
106+
@ParameterizedTest(name = "[{index}] initialPageToken={0}, pageSize={1}")
107+
@MethodSource("paginationTestCases")
108+
void testPaginationForNonIcebergCatalog(String initialPageToken, Integer pageSize)
109+
throws IOException {
110+
111+
try (InMemoryCatalog inMemoryCatalog = new InMemoryCatalog()) {
112+
// Initialize and replace the default handler with one backed by in-memory catalog
113+
inMemoryCatalog.initialize("inMemory", Map.of());
114+
mockCatalogAdapter(inMemoryCatalog);
115+
116+
// Set up 10 entities in the catalog: 10 namespaces, 10 tables, 10 views
117+
int entityCount = 10;
118+
for (int i = 0; i < entityCount; ++i) {
119+
inMemoryCatalog.createNamespace(Namespace.of("ns" + i));
120+
inMemoryCatalog.createTable(TableIdentifier.of("ns0", "table" + i), new Schema());
121+
inMemoryCatalog
122+
.buildView(TableIdentifier.of("ns0", "view" + i))
123+
.withSchema(new Schema())
124+
.withDefaultNamespace(Namespace.of("ns0"))
125+
.withQuery("a", "SELECT * FROM ns0.table" + i)
126+
.create();
127+
}
128+
129+
// Determine starting index for pagination based on the initial page token
130+
int pageStart =
131+
Strings.isNullOrEmpty(initialPageToken) ? 0 : Integer.parseInt(initialPageToken);
132+
int remain = entityCount - pageStart;
133+
134+
// Initial tokens for pagination
135+
String listNamespacePageToken = initialPageToken;
136+
String listTablesPageToken = initialPageToken;
137+
String listViewsPageToken = initialPageToken;
138+
139+
// Simulate page-by-page fetching until all entities are consumed
140+
while (remain > 0) {
141+
int expectedSize =
142+
(pageSize != null && initialPageToken != null) ? Math.min(remain, pageSize) : remain;
143+
144+
// Verify namespaces pagination
145+
ListNamespacesResponse namespacesResponse =
146+
(ListNamespacesResponse)
147+
catalogAdapter
148+
.listNamespaces(
149+
FEDERATED_CATALOG_NAME,
150+
listNamespacePageToken,
151+
pageSize,
152+
null,
153+
testServices.realmContext(),
154+
testServices.securityContext())
155+
.getEntity();
156+
Assertions.assertThat(namespacesResponse.namespaces()).hasSize(expectedSize);
157+
listNamespacePageToken = namespacesResponse.nextPageToken();
158+
159+
// Verify tables pagination
160+
ListTablesResponse tablesResponse =
161+
(ListTablesResponse)
162+
catalogAdapter
163+
.listTables(
164+
FEDERATED_CATALOG_NAME,
165+
"ns0",
166+
listTablesPageToken,
167+
pageSize,
168+
testServices.realmContext(),
169+
testServices.securityContext())
170+
.getEntity();
171+
Assertions.assertThat(tablesResponse.identifiers()).hasSize(expectedSize);
172+
listTablesPageToken = tablesResponse.nextPageToken();
173+
174+
// Verify views pagination
175+
ListTablesResponse viewsResponse =
176+
(ListTablesResponse)
177+
catalogAdapter
178+
.listViews(
179+
FEDERATED_CATALOG_NAME,
180+
"ns0",
181+
listViewsPageToken,
182+
pageSize,
183+
testServices.realmContext(),
184+
testServices.securityContext())
185+
.getEntity();
186+
Assertions.assertThat(viewsResponse.identifiers()).hasSize(expectedSize);
187+
listViewsPageToken = viewsResponse.nextPageToken();
188+
189+
remain -= expectedSize;
190+
}
191+
}
192+
}
193+
194+
private void mockCatalogAdapter(org.apache.iceberg.catalog.Catalog catalog) {
195+
// Override handler creation to inject in-memory catalog and suppress actual close()
196+
Mockito.doAnswer(
197+
invocation -> {
198+
IcebergCatalogHandler realHandler =
199+
(IcebergCatalogHandler) invocation.callRealMethod();
200+
IcebergCatalogHandler wrappedHandler = Mockito.spy(realHandler);
201+
202+
// Override initializeCatalog to inject test catalog using reflection
203+
Mockito.doAnswer(
204+
innerInvocation -> {
205+
for (String fieldName :
206+
List.of("baseCatalog", "namespaceCatalog", "viewCatalog")) {
207+
Field field = IcebergCatalogHandler.class.getDeclaredField(fieldName);
208+
field.setAccessible(true);
209+
field.set(wrappedHandler, catalog);
210+
}
211+
return null;
212+
})
213+
.when(wrappedHandler)
214+
.initializeCatalog();
215+
216+
// Prevent catalog from being closed during test lifecycle
217+
Mockito.doNothing().when(wrappedHandler).close();
218+
219+
return wrappedHandler;
220+
})
221+
.when(catalogAdapter)
222+
.newHandlerWrapper(Mockito.any(), Mockito.any());
223+
}
224+
225+
private static Stream<Arguments> paginationTestCases() {
226+
return Stream.of(
227+
Arguments.of(null, null),
228+
Arguments.of(null, 1),
229+
Arguments.of(null, 3),
230+
Arguments.of(null, 5),
231+
Arguments.of(null, 10),
232+
Arguments.of(null, 20),
233+
Arguments.of("", null),
234+
Arguments.of("", 1),
235+
Arguments.of("", 3),
236+
Arguments.of("", 5),
237+
Arguments.of("", 10),
238+
Arguments.of("", 20),
239+
Arguments.of("5", null),
240+
Arguments.of("5", 1),
241+
Arguments.of("5", 3),
242+
Arguments.of("5", 5),
243+
Arguments.of("5", 10));
244+
}
245+
}

0 commit comments

Comments
 (0)