Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
Expand Down Expand Up @@ -57,7 +57,7 @@ public static void main(String[] args) {
public void run(ApplicationReadyEvent readyEvent) {
ServerLifeCycleManager.toRunning();
log.info("Received spring application context ready event will load taskPlugin and write to DB");
DataSourceProcessorProvider.initialize();
DataSourcePluginManager.loadDataSourcePlugin();
TaskPluginManager.loadTaskPlugin();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.dolphinscheduler.plugin.datasource.api.client;

import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
Expand All @@ -38,7 +38,7 @@ protected BaseAdHocDataSourceClient(BaseConnectionParam baseConnectionParam, DbT
@Override
public Connection getConnection() throws SQLException {
try {
return DataSourceProcessorProvider.getDataSourceProcessor(dbType).getConnection(baseConnectionParam);
return DataSourcePluginManager.getDataSourceProcessor(dbType).getConnection(baseConnectionParam);
} catch (Exception e) {
throw new SQLException("Create adhoc connection error", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,13 @@ public class DataSourceClientProvider {
})
.maximumSize(100)
.build();
private static final DataSourcePluginManager dataSourcePluginManager = new DataSourcePluginManager();

static {
dataSourcePluginManager.installPlugin();
}

public static DataSourceClient getPooledDataSourceClient(DbType dbType,
ConnectionParam connectionParam) throws ExecutionException {
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
return POOLED_DATASOURCE_CLIENT_CACHE.get(datasourceUniqueId, () -> {
DataSourceChannel dataSourceChannel = dataSourcePluginManager.getDataSourceChannel(dbType);
DataSourceChannel dataSourceChannel = DataSourcePluginManager.getDataSourceChannel(dbType);
if (null == dataSourceChannel) {
throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getName()));
}
Expand All @@ -83,7 +78,7 @@ public static Connection getPooledConnection(DbType dbType,

public static AdHocDataSourceClient getAdHocDataSourceClient(DbType dbType, ConnectionParam connectionParam) {
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
DataSourceChannel dataSourceChannel = dataSourcePluginManager.getDataSourceChannel(dbType);
DataSourceChannel dataSourceChannel = DataSourcePluginManager.getDataSourceChannel(dbType);
if (null == dataSourceChannel) {
throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getName()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,47 +19,65 @@

import static java.lang.String.format;

import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;

import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DataSourcePluginManager {

private final Map<String, DataSourceChannel> datasourceChannelMap = new ConcurrentHashMap<>();
private static final Map<String, DataSourceChannel> datasourceChannelMap = new ConcurrentHashMap<>();

public DataSourceChannel getDataSourceChannel(final DbType dbType) {
private static final Map<String, DataSourceProcessor> dataSourceProcessorMap = new ConcurrentHashMap<>();

public static DataSourceChannel getDataSourceChannel(final DbType dbType) {
return datasourceChannelMap.get(dbType.getName());
}

public void installPlugin() {
public static DataSourceProcessor getDataSourceProcessor(@NonNull DbType dbType) {
return DataSourcePluginManager.getDataSourceProcessor(dbType);
}

public static void loadDataSourcePlugin() {
initializeDataSourceChannel();
initializeDataSourceProcessor();
}

private static void initializeDataSourceChannel() {
PrioritySPIFactory<DataSourceChannelFactory> prioritySPIFactory =
new PrioritySPIFactory<>(DataSourceChannelFactory.class);
for (Map.Entry<String, DataSourceChannelFactory> entry : prioritySPIFactory.getSPIMap().entrySet()) {
final DataSourceChannelFactory factory = entry.getValue();
final String name = entry.getKey();

log.info("Registering datasource plugin: {}", name);
final DataSourceChannelFactory factory = entry.getValue();

if (datasourceChannelMap.containsKey(name)) {
throw new IllegalStateException(format("Duplicate datasource plugins named '%s'", name));
throw new IllegalStateException(format("Duplicate datasource channel named '%s'", name));
}

loadDatasourceClient(factory);

log.info("Registered datasource plugin: {}", name);
datasourceChannelMap.put(factory.getName(), factory.create());
log.info("Registered datasource channel: {}", name);
}
}

private void loadDatasourceClient(DataSourceChannelFactory datasourceChannelFactory) {
DataSourceChannel datasourceChannel = datasourceChannelFactory.create();
datasourceChannelMap.put(datasourceChannelFactory.getName(), datasourceChannel);
private static void initializeDataSourceProcessor() {

ServiceLoader.load(DataSourceProcessor.class).forEach(factory -> {
final String name = factory.getDbType().name();
if (dataSourceProcessorMap.containsKey(name)) {
throw new IllegalStateException(format("Duplicate datasource processor named '%s'", name));
}
DataSourceProcessor dataSourceProcessor = factory.create();
dataSourceProcessorMap.put(dataSourceProcessor.getDbType().name(), dataSourceProcessor);
log.info("Success register datasource processor -> {}", name);
});
}

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;

import java.sql.Connection;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -75,12 +74,7 @@ public static BaseDataSourceParamDTO buildDatasourceParamDTO(DbType dbType, Stri
}

public static DataSourceProcessor getDatasourceProcessor(DbType dbType) {
Map<String, DataSourceProcessor> dataSourceProcessorMap =
DataSourceProcessorProvider.getDataSourceProcessorMap();
if (!dataSourceProcessorMap.containsKey(dbType.name())) {
throw new IllegalArgumentException("illegal datasource type");
}
return dataSourceProcessorMap.get(dbType.name());
return DataSourcePluginManager.getDataSourceProcessor(dbType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
Expand Down Expand Up @@ -123,7 +123,7 @@ public void initialized() {

// install task plugin
TaskPluginManager.loadTaskPlugin();
DataSourceProcessorProvider.initialize();
DataSourcePluginManager.loadDataSourcePlugin();

// self tolerant
this.masterRegistryClient.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
Expand Down Expand Up @@ -95,7 +95,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
procedureParameters.getLocalParams());

DbType dbType = DbType.valueOf(procedureParameters.getType());
DataSourceProcessor dataSourceProcessor = DataSourceProcessorProvider.getDataSourceProcessor(dbType);
DataSourceProcessor dataSourceProcessor = DataSourcePluginManager.getDataSourceProcessor(dbType);
ConnectionParam connectionParams =
dataSourceProcessor.createConnectionParams(procedureTaskExecutionContext.getConnectionParams());
try (Connection connection = DataSourceClientProvider.getAdHocConnection(dbType, connectionParams)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.SQLTaskExecutionContext;
Expand Down Expand Up @@ -131,7 +131,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
// get datasource
baseConnectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dbType,
sqlTaskExecutionContext.getConnectionParams());
List<String> subSqls = DataSourceProcessorProvider.getDataSourceProcessor(dbType)
List<String> subSqls = DataSourcePluginManager.getDataSourceProcessor(dbType)
.splitAndRemoveComment(sqlParameters.getSql());

// ready to execute SQL and parameter entity Map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
Expand Down Expand Up @@ -82,7 +82,7 @@ public void run() {

TaskPluginManager.loadTaskPlugin();

DataSourceProcessorProvider.initialize();
DataSourcePluginManager.loadDataSourcePlugin();

this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.start();
Expand Down
Loading