Skip to content

Commit 8ee76e8

Browse files
committed
Simplify connector handle serialization
1 parent 9555472 commit 8ee76e8

File tree

19 files changed

+195
-477
lines changed

19 files changed

+195
-477
lines changed

core/trino-main/src/main/java/io/trino/connector/ConnectorContextInstance.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
import io.trino.spi.connector.MetadataProvider;
2222
import io.trino.spi.type.TypeManager;
2323

24+
import java.util.concurrent.atomic.AtomicBoolean;
2425
import java.util.function.Supplier;
2526

27+
import static com.google.common.base.Preconditions.checkState;
2628
import static java.util.Objects.requireNonNull;
2729

2830
public class ConnectorContextInstance
@@ -35,6 +37,7 @@ public class ConnectorContextInstance
3537
private final PageSorter pageSorter;
3638
private final PageIndexerFactory pageIndexerFactory;
3739
private final Supplier<ClassLoader> duplicatePluginClassLoaderFactory;
40+
private final AtomicBoolean pluginClassLoaderDuplicated = new AtomicBoolean();
3841

3942
public ConnectorContextInstance(
4043
NodeManager nodeManager,
@@ -93,6 +96,7 @@ public PageIndexerFactory getPageIndexerFactory()
9396
@Override
9497
public ClassLoader duplicatePluginClassLoader()
9598
{
99+
checkState(!pluginClassLoaderDuplicated.getAndSet(true), "plugin class loader already duplicated");
96100
return duplicatePluginClassLoaderFactory.get();
97101
}
98102
}

core/trino-main/src/main/java/io/trino/connector/ConnectorManager.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import java.util.concurrent.ConcurrentHashMap;
8282
import java.util.concurrent.ConcurrentMap;
8383
import java.util.concurrent.atomic.AtomicBoolean;
84+
import java.util.function.Function;
8485
import java.util.function.Supplier;
8586

8687
import static com.google.common.base.Preconditions.checkArgument;
@@ -213,7 +214,7 @@ public synchronized void stop()
213214
}
214215
}
215216

216-
public synchronized void addConnectorFactory(ConnectorFactory connectorFactory, Supplier<ClassLoader> duplicatePluginClassLoaderFactory)
217+
public synchronized void addConnectorFactory(ConnectorFactory connectorFactory, Function<CatalogName, ClassLoader> duplicatePluginClassLoaderFactory)
217218
{
218219
requireNonNull(connectorFactory, "connectorFactory is null");
219220
requireNonNull(duplicatePluginClassLoaderFactory, "duplicatePluginClassLoaderFactory is null");
@@ -253,9 +254,13 @@ private synchronized void createCatalog(CatalogName catalogName, String connecto
253254
// create all connectors before adding, so a broken connector does not leave the system half updated
254255
MaterializedConnector connector = new MaterializedConnector(catalogName, createConnector(catalogName, factory, properties));
255256

256-
ConnectorHandleResolver connectorHandleResolver = connector.getConnector().getHandleResolver()
257-
.orElseGet(factory.getConnectorFactory()::getHandleResolver);
258-
checkArgument(connectorHandleResolver != null, "Connector %s does not have a handle resolver", factory);
257+
Set<Class<?>> handleClasses = connector.getConnector().getHandleClasses();
258+
if (handleClasses.isEmpty()) {
259+
handleClasses = connector.getConnector().getHandleResolver()
260+
.or(() -> Optional.ofNullable(factory.getConnectorFactory().getHandleResolver()))
261+
.map(ConnectorHandleResolver::getHandleClasses)
262+
.orElseThrow(() -> new IllegalArgumentException(format("Connector %s does not have a handle resolver", factory)));
263+
}
259264

260265
MaterializedConnector informationSchemaConnector = new MaterializedConnector(
261266
createInformationSchemaCatalogName(catalogName),
@@ -298,10 +303,10 @@ private synchronized void createCatalog(CatalogName catalogName, String connecto
298303
addConnectorInternal(informationSchemaConnector);
299304
addConnectorInternal(systemConnector);
300305
catalogManager.registerCatalog(catalog);
301-
handleResolver.addCatalogHandleResolver(catalogName.getCatalogName(), connectorHandleResolver);
306+
handleResolver.addCatalogHandleClasses(catalogName.getCatalogName(), handleClasses);
302307
}
303308
catch (Throwable e) {
304-
handleResolver.removeCatalogHandleResolver(catalogName.getCatalogName());
309+
handleResolver.removeCatalogHandleClasses(catalogName.getCatalogName());
305310
catalogManager.removeCatalog(catalog.getCatalogName());
306311
removeConnectorInternal(systemConnector.getCatalogName());
307312
removeConnectorInternal(informationSchemaConnector.getCatalogName());
@@ -362,7 +367,7 @@ public synchronized void dropConnection(String catalogName)
362367
removeConnectorInternal(catalog);
363368
removeConnectorInternal(createInformationSchemaCatalogName(catalog));
364369
removeConnectorInternal(createSystemTablesCatalogName(catalog));
365-
handleResolver.removeCatalogHandleResolver(catalogName);
370+
handleResolver.removeCatalogHandleClasses(catalogName);
366371
});
367372
}
368373

@@ -405,7 +410,7 @@ private Connector createConnector(CatalogName catalogName, InternalConnectorFact
405410
new InternalMetadataProvider(metadataManager, typeManager),
406411
pageSorter,
407412
pageIndexerFactory,
408-
factory.getDuplicatePluginClassLoaderFactory());
413+
factory.getDuplicatePluginClassLoaderFactory(catalogName));
409414

410415
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factory.getConnectorFactory().getClass().getClassLoader())) {
411416
return factory.getConnectorFactory().create(catalogName.getCatalogName(), properties, context);
@@ -415,9 +420,9 @@ private Connector createConnector(CatalogName catalogName, InternalConnectorFact
415420
private static class InternalConnectorFactory
416421
{
417422
private final ConnectorFactory connectorFactory;
418-
private final Supplier<ClassLoader> duplicatePluginClassLoaderFactory;
423+
private final Function<CatalogName, ClassLoader> duplicatePluginClassLoaderFactory;
419424

420-
public InternalConnectorFactory(ConnectorFactory connectorFactory, Supplier<ClassLoader> duplicatePluginClassLoaderFactory)
425+
public InternalConnectorFactory(ConnectorFactory connectorFactory, Function<CatalogName, ClassLoader> duplicatePluginClassLoaderFactory)
421426
{
422427
this.connectorFactory = connectorFactory;
423428
this.duplicatePluginClassLoaderFactory = duplicatePluginClassLoaderFactory;
@@ -428,9 +433,9 @@ public ConnectorFactory getConnectorFactory()
428433
return connectorFactory;
429434
}
430435

431-
public Supplier<ClassLoader> getDuplicatePluginClassLoaderFactory()
436+
public Supplier<ClassLoader> getDuplicatePluginClassLoaderFactory(CatalogName catalogName)
432437
{
433-
return duplicatePluginClassLoaderFactory;
438+
return () -> duplicatePluginClassLoaderFactory.apply(catalogName);
434439
}
435440

436441
@Override

core/trino-main/src/main/java/io/trino/connector/system/SystemConnectorModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ private static class SystemConnectorRegistrar
9292
@Inject
9393
public SystemConnectorRegistrar(ConnectorManager manager, GlobalSystemConnectorFactory globalSystemConnectorFactory)
9494
{
95-
manager.addConnectorFactory(globalSystemConnectorFactory, globalSystemConnectorFactory.getClass()::getClassLoader);
95+
manager.addConnectorFactory(globalSystemConnectorFactory, ignored -> globalSystemConnectorFactory.getClass().getClassLoader());
9696
manager.createCatalog(GlobalSystemConnector.NAME, GlobalSystemConnector.NAME, ImmutableMap.of());
9797
}
9898
}

core/trino-main/src/main/java/io/trino/metadata/AbstractTypedJacksonModule.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ public abstract class AbstractTypedJacksonModule<T>
5353

5454
protected AbstractTypedJacksonModule(
5555
Class<T> baseClass,
56-
Function<T, String> nameResolver,
57-
Function<String, Class<? extends T>> classResolver)
56+
Function<Object, String> nameResolver,
57+
Function<String, Class<?>> classResolver)
5858
{
5959
super(baseClass.getSimpleName() + "Module", Version.unknownVersion());
6060

61-
TypeIdResolver typeResolver = new InternalTypeResolver<>(nameResolver, classResolver);
61+
TypeIdResolver typeResolver = new InternalTypeResolver(nameResolver, classResolver);
6262

6363
addSerializer(baseClass, new InternalTypeSerializer<>(baseClass, typeResolver));
6464
addDeserializer(baseClass, new InternalTypeDeserializer<>(baseClass, typeResolver));
@@ -133,13 +133,13 @@ private static <T> JsonSerializer<T> createSerializer(SerializerProvider provide
133133
}
134134
}
135135

136-
private static class InternalTypeResolver<T>
136+
private static class InternalTypeResolver
137137
extends TypeIdResolverBase
138138
{
139-
private final Function<T, String> nameResolver;
140-
private final Function<String, Class<? extends T>> classResolver;
139+
private final Function<Object, String> nameResolver;
140+
private final Function<String, Class<?>> classResolver;
141141

142-
public InternalTypeResolver(Function<T, String> nameResolver, Function<String, Class<? extends T>> classResolver)
142+
public InternalTypeResolver(Function<Object, String> nameResolver, Function<String, Class<?>> classResolver)
143143
{
144144
this.nameResolver = requireNonNull(nameResolver, "nameResolver is null");
145145
this.classResolver = requireNonNull(classResolver, "classResolver is null");
@@ -151,13 +151,12 @@ public String idFromValue(Object value)
151151
return idFromValueAndType(value, value.getClass());
152152
}
153153

154-
@SuppressWarnings("unchecked")
155154
@Override
156155
public String idFromValueAndType(Object value, Class<?> suggestedType)
157156
{
158157
requireNonNull(value, "value is null");
159-
String type = nameResolver.apply((T) value);
160-
checkArgument(type != null, "Unknown class: %s", suggestedType.getSimpleName());
158+
String type = nameResolver.apply(value);
159+
checkArgument(type != null, "Unknown class: %s", value.getClass().getName());
161160
return type;
162161
}
163162

core/trino-main/src/main/java/io/trino/metadata/HandleJsonModule.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,55 +42,55 @@ public void configure(Binder binder)
4242
@ProvidesIntoSet
4343
public static com.fasterxml.jackson.databind.Module tableHandleModule(HandleResolver resolver)
4444
{
45-
return new AbstractTypedJacksonModule<>(ConnectorTableHandle.class, resolver::getId, resolver::getTableHandleClass) {};
45+
return new AbstractTypedJacksonModule<>(ConnectorTableHandle.class, resolver::getId, resolver::getHandleClass) {};
4646
}
4747

4848
@ProvidesIntoSet
4949
public static com.fasterxml.jackson.databind.Module columnHandleModule(HandleResolver resolver)
5050
{
51-
return new AbstractTypedJacksonModule<>(ColumnHandle.class, resolver::getId, resolver::getColumnHandleClass) {};
51+
return new AbstractTypedJacksonModule<>(ColumnHandle.class, resolver::getId, resolver::getHandleClass) {};
5252
}
5353

5454
@ProvidesIntoSet
5555
public static com.fasterxml.jackson.databind.Module splitModule(HandleResolver resolver)
5656
{
57-
return new AbstractTypedJacksonModule<>(ConnectorSplit.class, resolver::getId, resolver::getSplitClass) {};
57+
return new AbstractTypedJacksonModule<>(ConnectorSplit.class, resolver::getId, resolver::getHandleClass) {};
5858
}
5959

6060
@ProvidesIntoSet
6161
public static com.fasterxml.jackson.databind.Module outputTableHandleModule(HandleResolver resolver)
6262
{
63-
return new AbstractTypedJacksonModule<>(ConnectorOutputTableHandle.class, resolver::getId, resolver::getOutputTableHandleClass) {};
63+
return new AbstractTypedJacksonModule<>(ConnectorOutputTableHandle.class, resolver::getId, resolver::getHandleClass) {};
6464
}
6565

6666
@ProvidesIntoSet
6767
public static com.fasterxml.jackson.databind.Module insertTableHandleModule(HandleResolver resolver)
6868
{
69-
return new AbstractTypedJacksonModule<>(ConnectorInsertTableHandle.class, resolver::getId, resolver::getInsertTableHandleClass) {};
69+
return new AbstractTypedJacksonModule<>(ConnectorInsertTableHandle.class, resolver::getId, resolver::getHandleClass) {};
7070
}
7171

7272
@ProvidesIntoSet
7373
public static com.fasterxml.jackson.databind.Module tableExecuteHandleModule(HandleResolver resolver)
7474
{
75-
return new AbstractTypedJacksonModule<>(ConnectorTableExecuteHandle.class, resolver::getId, resolver::getTableExecuteHandleClass) {};
75+
return new AbstractTypedJacksonModule<>(ConnectorTableExecuteHandle.class, resolver::getId, resolver::getHandleClass) {};
7676
}
7777

7878
@ProvidesIntoSet
7979
public static com.fasterxml.jackson.databind.Module indexHandleModule(HandleResolver resolver)
8080
{
81-
return new AbstractTypedJacksonModule<>(ConnectorIndexHandle.class, resolver::getId, resolver::getIndexHandleClass) {};
81+
return new AbstractTypedJacksonModule<>(ConnectorIndexHandle.class, resolver::getId, resolver::getHandleClass) {};
8282
}
8383

8484
@ProvidesIntoSet
8585
public static com.fasterxml.jackson.databind.Module transactionHandleModule(HandleResolver resolver)
8686
{
87-
return new AbstractTypedJacksonModule<>(ConnectorTransactionHandle.class, resolver::getId, resolver::getTransactionHandleClass) {};
87+
return new AbstractTypedJacksonModule<>(ConnectorTransactionHandle.class, resolver::getId, resolver::getHandleClass) {};
8888
}
8989

9090
@ProvidesIntoSet
9191
public static com.fasterxml.jackson.databind.Module partitioningHandleModule(HandleResolver resolver)
9292
{
93-
return new AbstractTypedJacksonModule<>(ConnectorPartitioningHandle.class, resolver::getId, resolver::getPartitioningHandleClass) {};
93+
return new AbstractTypedJacksonModule<>(ConnectorPartitioningHandle.class, resolver::getId, resolver::getHandleClass) {};
9494
}
9595

9696
@ProvidesIntoSet

0 commit comments

Comments
 (0)