From 1e2ad857534d15ce70680e5db89faa7933d18f07 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Mon, 27 Jan 2025 11:10:00 +0800 Subject: [PATCH] add kingbase source connector --- .../qlangtech/tis/fullbuild/servlet/Test.java | 53 -------- .../IncrStatusUmbilicalProtocolImpl.java | 2 +- .../coredefine/module/action/DataxAction.java | 15 ++- .../module/action/PluginAction.java | 1 + .../tis/manage/TerminatorEndpointConfig.java | 45 ------- .../manage/servlet/LogFeedbackServlet.java | 72 ++++++----- .../tis/realtime/yarn/rpc/JobType.java | 6 +- .../com/qlangtech/tis/datax/IDataxReader.java | 2 +- .../tis/datax/SourceColMetaGetter.java | 9 +- .../tis/extension/impl/EnumFieldMode.java | 15 ++- .../tis/extension/impl/SuFormProperties.java | 2 +- .../qlangtech/tis/plugin/IEndTypeGetter.java | 2 +- .../tis/plugin/IRepositoryTargetFile.java | 9 ++ .../tis/plugin/datax/SelectedTab.java | 6 +- .../plugin/datax/common/AutoCreateTable.java | 118 ++++++++++++++++++ .../AutoCreateTableColCommentSwitch.java | 112 +++++++++++++++++ .../AutoCreateTableColCommentSwitchOFF.java | 55 ++++++++ .../datax/common/impl/NoneCreateTable.java | 78 ++++++++++++ .../transformer/RecordTransformerRules.java | 23 +++- .../datax/transformer/TargetColumn.java | 25 +++- .../tis/plugin/ds/DataSourceFactory.java | 32 +++-- .../plugin/ds/IInitWriterTableExecutor.java | 9 ++ .../tis/plugin/ds/JDBCConnection.java | 11 +- .../tis/plugin/ds/JDBCConnectionPool.java | 4 +- .../plugin/datax/common/AutoCreateTable.json | 7 ++ .../tis/plugin/endtype/icon/kingbase/fill.svg | 1 + .../plugin/endtype/icon/kingbase/outline.svg | 1 + tis-web-config/config.properties | 4 +- 28 files changed, 549 insertions(+), 170 deletions(-) delete mode 100644 tis-assemble/src/main/java/com/qlangtech/tis/fullbuild/servlet/Test.java delete mode 100644 tis-console/src/main/java/com/qlangtech/tis/manage/TerminatorEndpointConfig.java create mode 100644 tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/AutoCreateTable.java create mode 100644 tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/AutoCreateTableColCommentSwitch.java create mode 100644 tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/impl/AutoCreateTableColCommentSwitchOFF.java create mode 100644 tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/impl/NoneCreateTable.java create mode 100644 tis-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/common/AutoCreateTable.json create mode 100644 tis-plugin/src/main/resources/com/qlangtech/tis/plugin/endtype/icon/kingbase/fill.svg create mode 100644 tis-plugin/src/main/resources/com/qlangtech/tis/plugin/endtype/icon/kingbase/outline.svg diff --git a/tis-assemble/src/main/java/com/qlangtech/tis/fullbuild/servlet/Test.java b/tis-assemble/src/main/java/com/qlangtech/tis/fullbuild/servlet/Test.java deleted file mode 100644 index 9bddee0fc..000000000 --- a/tis-assemble/src/main/java/com/qlangtech/tis/fullbuild/servlet/Test.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.qlangtech.tis.fullbuild.servlet; - -import java.util.concurrent.Future; - -/** - * @author: 百岁(baisui@qlangtech.com) - * @create: 2021-08-31 13:00 - **/ -public class Test { - public static void main(String[] args) throws Exception { - long l = 511900036588699690l; - System.out.println(l); - System.out.println( Integer.MIN_VALUE); -// Future f = TisServlet.executeService.submit(() -> { -// -// -// while (true) { -// try { -// -// System.out.println("i am here"); -// Thread.sleep(5000l); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// -// } -// }); -// -// Thread.sleep(1000l); -// f.cancel(true); -// System.out.println("all over"); -// -// Thread.sleep(90000l); - } -} diff --git a/tis-assemble/src/main/java/com/qlangtech/tis/rpc/server/IncrStatusUmbilicalProtocolImpl.java b/tis-assemble/src/main/java/com/qlangtech/tis/rpc/server/IncrStatusUmbilicalProtocolImpl.java index 680f36594..3a4ebab87 100644 --- a/tis-assemble/src/main/java/com/qlangtech/tis/rpc/server/IncrStatusUmbilicalProtocolImpl.java +++ b/tis-assemble/src/main/java/com/qlangtech/tis/rpc/server/IncrStatusUmbilicalProtocolImpl.java @@ -153,7 +153,7 @@ public void reportStatus(com.qlangtech.tis.grpc.UpdateCounterMap updateCounter, tableMultiDataIndexStatus.setIncrProcessPaused(updateCounterFromClient.getIncrProcessPaused()); // tableMultiDataIndexStatus.setTis30sAvgRT(updateCounterFromClient.getTis30sAvgRT()); tableMultiDataIndexStatus.setTis30sAvgRT(updateCounterFromClient.getTis30SAvgRT()); - for (Map.Entry tabUpdate : entry.getValue().getTableConsumeDataMap().entrySet()) { + for (Map.Entry tabUpdate : updateCounterFromClient.getTableConsumeDataMap().entrySet()) { tableMultiDataIndexStatus.put(tabUpdate.getKey(), new ConsumeDataKeeper(tabUpdate.getValue(), updateTime)); } } diff --git a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/DataxAction.java b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/DataxAction.java index 1daf18abb..f8cd9b867 100644 --- a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/DataxAction.java +++ b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/DataxAction.java @@ -92,6 +92,7 @@ import com.qlangtech.tis.plugin.ds.DataTypeMeta; import com.qlangtech.tis.plugin.ds.DataTypeMeta.IMultiItemsView; import com.qlangtech.tis.plugin.ds.DefaultTab; +import com.qlangtech.tis.plugin.ds.IInitWriterTableExecutor; import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.plugin.ds.IdlistElementCreatorFactory; import com.qlangtech.tis.plugin.trigger.JobTrigger; @@ -1335,6 +1336,7 @@ public void doPreviewTableRows(Context context) { @Func(value = PermissionConstant.DATAX_MANAGE, sideEffect = false) public void doGetTableMapper(Context context) { String dataxName = this.getString(PARAM_KEY_DATAX_NAME); + boolean forceInit = this.getBoolean("forceInit"); KeyedPluginStore readerStore = DataxReader.getPluginStore(this, dataxName); DataxReader dataxReader = readerStore.getPlugin(); Objects.requireNonNull(dataxReader, "dataReader:" + dataxName + " relevant instance can not be null"); @@ -1342,8 +1344,13 @@ public void doGetTableMapper(Context context) { TableAlias tableAlias; Optional dataXAppSource = IAppSource.loadNullable(this, dataxName); TableAliasMapper tabMaps = null;//Collections.emptyMap(); + Optional mapperTabPrefix = Optional.empty(); if (dataXAppSource.isPresent()) { DataxProcessor dataxSource = dataXAppSource.get(); + IDataxWriter dataXWriter = dataxSource.getWriter(this, true); + if (dataXWriter instanceof IInitWriterTableExecutor) { + mapperTabPrefix = ((IInitWriterTableExecutor) dataXWriter).getAutoCreateTableCanNotBeNull().getMapperTabPrefix(); + } tabMaps = dataxSource.getTabAlias(this); } if (tabMaps == null) { @@ -1356,8 +1363,12 @@ public void doGetTableMapper(Context context) { List tmapList = Lists.newArrayList(); for (ISelectedTab selectedTab : dataxReader.getSelectedTabs()) { tableAlias = tabMaps.get(selectedTab); - if (tableAlias == null) { - tmapList.add(new TableAlias(selectedTab.getName())); + if (forceInit || tableAlias == null) { + tableAlias = new TableAlias(selectedTab.getName()); + if (mapperTabPrefix.isPresent()) { + tableAlias.setTo(mapperTabPrefix.get() + selectedTab.getName()); + } + tmapList.add(tableAlias); } else { tmapList.add(tableAlias); } diff --git a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/PluginAction.java b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/PluginAction.java index 1e402a6d3..787da83f2 100644 --- a/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/PluginAction.java +++ b/tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/PluginAction.java @@ -1076,6 +1076,7 @@ public static DatasourceDb createDatabase(BasicModule module, Descriptor.ParseDe criteria.createCriteria().andNameEqualTo(dbName); int exist = module.getWorkflowDAOFacade().getDatasourceDbDAO().countByExample(criteria); if (exist > 0) { + module.addErrorMessage(context, "已经有了同名(" + dbName + ")的数据库"); return null; } diff --git a/tis-console/src/main/java/com/qlangtech/tis/manage/TerminatorEndpointConfig.java b/tis-console/src/main/java/com/qlangtech/tis/manage/TerminatorEndpointConfig.java deleted file mode 100644 index 52046dce4..000000000 --- a/tis-console/src/main/java/com/qlangtech/tis/manage/TerminatorEndpointConfig.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.qlangtech.tis.manage; - -/** - * @author 百岁(baisui@qlangtech.com) - * @date 2014-4-9 - */ -public class TerminatorEndpointConfig { - // private static final Logger log = LogFactory - // .getLog(TerminatorEndpointConfig.class); - // - // @Override - // public Set> getAnnotatedEndpointClasses(Set> scanned) { - // - // return null; - // } - // - // @Override - // public Set getEndpointConfigs( - // Set> scanned) { - // log.info("execute getEndpointConfigs"); - // Set result = new HashSet(); - // - // result.add(ServerEndpointConfig.Builder.create( - // LogFeedbackServlet.class, "/download/logfeedback").build()); - // - // return result; - // } -} diff --git a/tis-console/src/main/java/com/qlangtech/tis/manage/servlet/LogFeedbackServlet.java b/tis-console/src/main/java/com/qlangtech/tis/manage/servlet/LogFeedbackServlet.java index 843dc6c77..45e4cde9c 100644 --- a/tis-console/src/main/java/com/qlangtech/tis/manage/servlet/LogFeedbackServlet.java +++ b/tis-console/src/main/java/com/qlangtech/tis/manage/servlet/LogFeedbackServlet.java @@ -22,15 +22,22 @@ import com.alibaba.fastjson.JSONObject; import com.google.protobuf.MessageOrBuilder; import com.google.protobuf.util.JsonFormat; +import com.qlangtech.tis.TIS; import com.qlangtech.tis.assemble.ExecResult; import com.qlangtech.tis.assemble.FullbuildPhase; +import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory; +import com.qlangtech.tis.cloud.ITISCoordinator; +import com.qlangtech.tis.coredefine.module.action.CoreAction; import com.qlangtech.tis.coredefine.module.action.ExtendWorkFlowBuildHistory; import com.qlangtech.tis.coredefine.module.action.TISK8sDelegate; import com.qlangtech.tis.exec.ExecutePhaseRange; import com.qlangtech.tis.fullbuild.phasestatus.PhaseStatusCollection; import com.qlangtech.tis.job.common.JobCommon; import com.qlangtech.tis.manage.spring.ZooKeeperGetter; +import com.qlangtech.tis.plugin.PluginStore; import com.qlangtech.tis.pubhook.common.RunEnvironment; +import com.qlangtech.tis.realtime.yarn.rpc.JobType; +import com.qlangtech.tis.realtime.yarn.rpc.TopicInfo; import com.qlangtech.tis.rpc.grpc.log.LogCollectorClient; import com.qlangtech.tis.rpc.grpc.log.stream.PExecuteState; import com.qlangtech.tis.rpc.grpc.log.stream.PMonotorTarget; @@ -56,6 +63,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.MalformedURLException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -193,7 +201,7 @@ public void onWebSocketText(String message) { private StreamObserver getMonitorSet() { if (pMonotorObserver == null) { - // StatusRpcClientFactory.AssembleSvcCompsite feedback = getStatusRpc().get(); + // StatusRpcClientFactory.AssembleSvcCompsite feedback = getStatusRpc().get(); pMonotorObserver = getStatusRpc().registerMonitorEvent(this); } @@ -301,7 +309,7 @@ private void addMonitor(MonotorTarget monitorTarget) throws Exception { } else if (monitorTarget.testLogType(LogType.BuildPhraseMetrics)) { executorService.execute(() -> { try { - // StatusRpcClientFactory.AssembleSvcCompsite feedback = getStatusRpc().get(); + // StatusRpcClientFactory.AssembleSvcCompsite feedback = getStatusRpc().get(); final Iterator statIt = getStatusRpc().buildPhraseStatus(taskid); boolean serverSideBreak = true; while (isConnected() && statIt.hasNext()) { @@ -319,24 +327,24 @@ private void addMonitor(MonotorTarget monitorTarget) throws Exception { throw new RuntimeException("taskid:" + taskid, e); } }); - //} else if (monitorTarget.testLogType(LogType.MQ_TAGS_STATUS)) { -// PluginStore mqListenerFactory = TIS.getPluginStore(this.collectionName, MQListenerFactory.class); -// MQListenerFactory plugin = mqListenerFactory.getPlugin(); -// // 增量节点处理 -// final Map /* this.tag */ -// transferTagStatus = new HashMap<>(); -// final Map /* this.tag */ -// binlogTopicTagStatus = new HashMap<>(); -// List focusTags = getFocusTags(zkGetter.getInstance(), collectionName); -// // 如果size为0,则说明远程工作节点没有正常执行 -// if (focusTags.size() > 0) { -// TopicTagIncrStatus topicTagIncrStatus = new TopicTagIncrStatus(focusTags); -// executorService.execute(() -> { -// IncrTagHeatBeatMonitor incrTagHeatBeatMonitor = new IncrTagHeatBeatMonitor(this.collectionName, this -// , transferTagStatus, binlogTopicTagStatus, topicTagIncrStatus, plugin.createConsumerStatus(), zkGetter); -// incrTagHeatBeatMonitor.build(); -// }); -// } + } else if (monitorTarget.testLogType(LogType.MQ_TAGS_STATUS)) { + PluginStore mqListenerFactory = (PluginStore) TIS.getPluginStore(this.collectionName, MQListenerFactory.class); + MQListenerFactory plugin = mqListenerFactory.getPlugin(); + // 增量节点处理 + final Map /* this.tag */ + transferTagStatus = new HashMap<>(); + final Map /* this.tag */ + binlogTopicTagStatus = new HashMap<>(); + List focusTags = getFocusTags(zkGetter.getInstance(), collectionName); + // 如果size为0,则说明远程工作节点没有正常执行 + if (focusTags.size() > 0) { + TopicTagIncrStatus topicTagIncrStatus = new TopicTagIncrStatus(focusTags); + executorService.execute(() -> { + IncrTagHeatBeatMonitor incrTagHeatBeatMonitor = new IncrTagHeatBeatMonitor(this.collectionName, this + , transferTagStatus, binlogTopicTagStatus, topicTagIncrStatus, plugin.createConsumerStatus(), zkGetter); + incrTagHeatBeatMonitor.build(); + }); + } } else { throw new IllegalStateException("monitor type:" + monitorTarget + " is illegal"); } @@ -483,18 +491,18 @@ private String getParameter(String key, List dft) { } } -// public static List getFocusTags(ITISCoordinator zookeeper, String collectionName) throws MalformedURLException { -// // -// JobType.RemoteCallResult topicInfo = JobType.ACTION_getTopicTags.assembIncrControlWithResult( -// CoreAction.getAssembleNodeAddress(zookeeper), -// collectionName, Collections.emptyList(), TopicInfo.class); -// if (topicInfo.biz.getTopicWithTags().size() < 1) { -// // 返回为空的话可以证明没有正常启动 -// return Collections.emptyList(); -// } -// TopicInfo topicTags = topicInfo.biz; -// return topicTags.getTopicWithTags().entrySet().stream().map((entry) -> new TopicTagIncrStatus.FocusTags(entry.getKey(), entry.getValue())).collect(Collectors.toList()); -// } + public static List getFocusTags(ITISCoordinator zookeeper, String collectionName) throws MalformedURLException { + // + JobType.RemoteCallResult topicInfo = JobType.ACTION_getTopicTags.assembIncrControlWithResult( + CoreAction.getAssembleNodeAddress(zookeeper), + collectionName, Collections.emptyList(), TopicInfo.class); + if (topicInfo.biz.getTopicWithTags().size() < 1) { + // 返回为空的话可以证明没有正常启动 + return Collections.emptyList(); + } + TopicInfo topicTags = topicInfo.biz; + return topicTags.getTopicWithTags().entrySet().stream().map((entry) -> new TopicTagIncrStatus.FocusTags(entry.getKey(), entry.getValue())).collect(Collectors.toList()); + } static class TagCountMap extends HashMap { diff --git a/tis-manage-pojo/src/main/java/com/qlangtech/tis/realtime/yarn/rpc/JobType.java b/tis-manage-pojo/src/main/java/com/qlangtech/tis/realtime/yarn/rpc/JobType.java index ac4769959..f4b6983e7 100644 --- a/tis-manage-pojo/src/main/java/com/qlangtech/tis/realtime/yarn/rpc/JobType.java +++ b/tis-manage-pojo/src/main/java/com/qlangtech/tis/realtime/yarn/rpc/JobType.java @@ -42,9 +42,9 @@ public enum JobType { // // // QueryIndexJobRunningStatus(2, "QueryIncrStatus"), // // incr process tags的状态 - Collection_TopicTags_status(3, "collection_topic_tags_status"); -// // 取得增量监听的tags -// ACTION_getTopicTags(4, "get_topic_tags"); + Collection_TopicTags_status(3, "collection_topic_tags_status"), + // 取得增量监听的tags + ACTION_getTopicTags(4, "get_topic_tags"); public int getValue() { return value; diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/IDataxReader.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/IDataxReader.java index 561955351..a76e5e015 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/IDataxReader.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/IDataxReader.java @@ -34,7 +34,7 @@ public interface IDataxReader extends DataSourceMeta, IDataXPluginMeta , IStreamTableMeataCreator.ISourceStreamMetaCreator, IRepositoryResourceScannable, IReaderSource { default SourceColMetaGetter createSourceColMetaGetter() { - return new SourceColMetaGetter(this); + return new SourceColMetaGetter(this, true); } /** diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/datax/SourceColMetaGetter.java b/tis-plugin/src/main/java/com/qlangtech/tis/datax/SourceColMetaGetter.java index b81fda3d2..59993a879 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/datax/SourceColMetaGetter.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/datax/SourceColMetaGetter.java @@ -37,7 +37,7 @@ public class SourceColMetaGetter { private final Map> tab2ColsMapper = Maps.newHashMap(); public static SourceColMetaGetter getNone() { - SourceColMetaGetter colMetaGetter = new SourceColMetaGetter(null) { + SourceColMetaGetter colMetaGetter = new SourceColMetaGetter(null, false) { @Override public ColumnMetaData getColMeta(TableMap tableMapper, String colName) { return null; @@ -46,8 +46,11 @@ public ColumnMetaData getColMeta(TableMap tableMapper, String colName) { return colMetaGetter; } - public SourceColMetaGetter(IDataxReader dataXReader) { - this.dataXReader = Objects.requireNonNull(dataXReader, "dataXReader"); + public SourceColMetaGetter(IDataxReader dataXReader, boolean validateNull) { + if (validateNull) { + Objects.requireNonNull(dataXReader, "dataXReader"); + } + this.dataXReader = dataXReader; } protected Map getColMetaDataMap(IDataxReader dataXReader, TableMap tableMapper) { diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/extension/impl/EnumFieldMode.java b/tis-plugin/src/main/java/com/qlangtech/tis/extension/impl/EnumFieldMode.java index e3351e77e..bfe7e9c5c 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/extension/impl/EnumFieldMode.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/extension/impl/EnumFieldMode.java @@ -18,10 +18,13 @@ package com.qlangtech.tis.extension.impl; +import com.qlangtech.tis.plugin.IdentityName; + import java.lang.reflect.Field; import java.util.List; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.stream.Collectors; /** * @author 百岁 (baisui@qlangtech.com) @@ -32,7 +35,7 @@ public enum EnumFieldMode { if (!isValOfOptionList(dftVal, fieldDesc)) { throw new IllegalStateException(fieldDesc + " val " + dftVal.getClass() + " " + "must be " + "type" + " " + "of " + List.class.getName()); } - return dftVal; + return ((List) dftVal).stream().map((val) -> (val instanceof IdentityName) ? ((IdentityName) val).identityValue() : val).collect(Collectors.toList()); }) // , DEFAULT("default", (dftVal, fieldDesc) -> { if (isValOfOptionList(dftVal, fieldDesc)) { @@ -84,10 +87,15 @@ private static boolean isValOfOptionList(Object val, String fieldDesc) { Class valClass = val.getClass(); if (List.class.isAssignableFrom(valClass)) { for (Object o : ((List) val)) { - if (!(o.getClass() == String.class)) { + + if (!((o.getClass() == String.class) || (IdentityName.class.isAssignableFrom(o.getClass())))) { throw new IllegalStateException(fieldDesc + ",opt" + " " + "element " + o.getClass() + " " + - "must be type of " + String.class); + "must be type of " + String.class.getSimpleName() + " or " + IdentityName.class.getSimpleName()); } +// if (!(o.getClass() == String.class)) { +// throw new IllegalStateException(fieldDesc + ",opt" + " " + "element " + o.getClass() + " " + +// "must be type of " + String.class); +// } } return true; } @@ -97,7 +105,6 @@ private static boolean isValOfOptionList(Object val, String fieldDesc) { private static Object getFirstVal(Object val) { for (Object vv : (List) val) { return String.valueOf(vv); - } return null; } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/extension/impl/SuFormProperties.java b/tis-plugin/src/main/java/com/qlangtech/tis/extension/impl/SuFormProperties.java index 9b4efddf9..d8b865c60 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/extension/impl/SuFormProperties.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/extension/impl/SuFormProperties.java @@ -70,7 +70,7 @@ public static SuFormGetterContext setSuFormGetterContext(Describable plugin, IPl if (plugin instanceof IDataxReader) { subFormContext.plugin = (IDataxReader) plugin; } else { - throw new IllegalStateException("plugin must be type of " + IDataxReader.class); + throw new IllegalStateException("plugin" + plugin.getClass().getName() + " must be type of " + IDataxReader.class); } pluginMeta.putExtraParams(SubFormFilter.PLUGIN_META_SUBFORM_DETAIL_ID_VALUE, subFormDetailId); subFormContext.store = store; diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/IEndTypeGetter.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/IEndTypeGetter.java index 1c065faee..02be6a54b 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/IEndTypeGetter.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/IEndTypeGetter.java @@ -61,7 +61,7 @@ enum EndType implements IEndType { , MariaDB("mariaDB", true) // , Postgres("pg", true), Oracle("oracle", true) // , ElasticSearch("es", true), MongoDB("mongoDB", true) // - , StarRocks("starRocks", true), Doris("doris", true) // + , StarRocks("starRocks", true), Doris("doris", true) , KingBase("kingbase", true) // , Clickhouse("clickhouse", true), Hudi("hudi", true) //, AliyunOSS("aliyunOSS") , TDFS("t-dfs", true) // , Cassandra("cassandra") //, HDFS("hdfs") diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/IRepositoryTargetFile.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/IRepositoryTargetFile.java index 934122bca..ab0c1d91d 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/IRepositoryTargetFile.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/IRepositoryTargetFile.java @@ -20,6 +20,10 @@ import com.qlangtech.tis.extension.impl.XmlFile; + +import java.io.File; +import java.util.Objects; + /** * @author: 百岁(baisui@qlangtech.com) * @create: 2023-07-13 11:02 @@ -37,4 +41,9 @@ public interface IRepositoryTargetFile { * @return */ XmlFile getTargetFile(); + + default File getTargetFileParentDir() { + XmlFile targetFile = this.getTargetFile(); + return Objects.requireNonNull(targetFile, "targetFile can not be null").getFile().getParentFile(); + } } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/SelectedTab.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/SelectedTab.java index 1d3582204..6ba3f83bc 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/SelectedTab.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/SelectedTab.java @@ -513,10 +513,10 @@ public void afterVerified(IPluginStoreSave pluginStore) { } } - private static XmlFile getTmpTableStoreFile(IPluginStoreSave pluginStore, String tabName) { - XmlFile targetFile = pluginStore.getTargetFile(); + public static XmlFile getTmpTableStoreFile(IPluginStoreSave pluginStore, String tabName) { + // XmlFile targetFile = pluginStore.getTargetFile(); String pluginFileName = Descriptor.getPluginFileName(tabName); - File tabTmp = new File(targetFile.getFile().getParent() + File tabTmp = new File(pluginStore.getTargetFileParentDir() , ".tmp" + File.separator + "tabs" + File.separator + pluginFileName); XmlFile xml = new XmlFile(tabTmp, pluginFileName); return xml; diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/AutoCreateTable.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/AutoCreateTable.java new file mode 100644 index 000000000..b7ab6ae55 --- /dev/null +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/AutoCreateTable.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.datax.common; + +import com.qlangtech.tis.datax.IDataxProcessor; +import com.qlangtech.tis.datax.SourceColMetaGetter; +import com.qlangtech.tis.datax.impl.DataxWriter; +import com.qlangtech.tis.extension.Describable; +import com.qlangtech.tis.extension.Descriptor; +import com.qlangtech.tis.plugin.IEndTypeGetter.EndType; +import com.qlangtech.tis.plugin.annotation.FormField; +import com.qlangtech.tis.plugin.annotation.FormFieldType; +import com.qlangtech.tis.plugin.annotation.Validator; +import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder; +import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder.ColWrapper; +import com.qlangtech.tis.plugin.datax.common.impl.NoneCreateTable; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; +import org.apache.commons.collections.CollectionUtils; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * 自动创建目标表 + * + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-12-14 11:29 + **/ +//@TISExtensible +public abstract class AutoCreateTable implements Describable>, Serializable { + + @FormField(ordinal = 100, advance = true, type = FormFieldType.INPUTTEXT, validate = {Validator.identity}) + public String aliasPrefix; + + /** + * 取得映射表的前缀 + * + * @return + */ + public Optional getMapperTabPrefix() { + return Optional.ofNullable(aliasPrefix); + } + + public abstract AutoCreateTableColCommentSwitch getAddComment(); + + /** + * 过滤得到目标实例 + * + * @param descs + * @param endType + * @return + */ + public static List descFilter(List descs, String endType) { + if (CollectionUtils.isEmpty(descs)) { + return Collections.emptyList(); + } + EndType targetEndType = EndType.parse(endType); + return descs.stream().filter((desc) -> { + return desc.getEndType() == null || targetEndType == desc.getEndType(); + }).collect(Collectors.toList()); + } + + public static AutoCreateTable dft() { + return new NoneCreateTable(); + } + + public abstract CreateTableSqlBuilder createSQLDDLBuilder( + DataxWriter rdbmsWriter, SourceColMetaGetter sourceColMetaGetter + , IDataxProcessor.TableMap tableMapper, Optional transformers); + + /** + * 是否开启了 + * + * @return + */ + public abstract boolean enabled(); + + public boolean enabledColumnComment() { + return false; + } + + + @Override + public Descriptor> getDescriptor() { + Descriptor> desc = Describable.super.getDescriptor(); + + if (!BasicDescriptor.class.isAssignableFrom(desc.getClass())) { + throw new IllegalStateException("desc class:" + desc.getClass().getName() + + " must be child of " + BasicDescriptor.class.getName()); + } + + return desc; + } + + public static abstract class BasicDescriptor extends Descriptor { + + public abstract EndType getEndType(); + } +} diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/AutoCreateTableColCommentSwitch.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/AutoCreateTableColCommentSwitch.java new file mode 100644 index 000000000..3d96871a8 --- /dev/null +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/AutoCreateTableColCommentSwitch.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.datax.common; + +import com.qlangtech.tis.datax.IDataxProcessor.TableMap; +import com.qlangtech.tis.datax.SourceColMetaGetter; +import com.qlangtech.tis.extension.Describable; +import com.qlangtech.tis.extension.TISExtensible; +import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder; +import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder.ColWrapper; +import com.qlangtech.tis.sql.parser.visitor.BlockScriptBuffer; + +import java.io.Serializable; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-12-18 09:18 + **/ +@TISExtensible +public abstract class AutoCreateTableColCommentSwitch implements Describable, Serializable { + + + public abstract boolean turnOn(); + + + /** + * 添加标准的列注释,当然小部分目标端例如oracle不支持这种方式添加列注释 + * clickhouse: + *

+     *     CREATE TABLE employees (
+     *     employee_id UInt32 COMMENT '员工的唯一标识符',
+     *     first_name String COMMENT '员工的名字',
+     *     last_name String COMMENT '员工的姓氏',
+     *     email String COMMENT '员工的电子邮件地址,用作公司内部通信' DEFAULT '',
+     *     phone_number String COMMENT '员工的联系电话' DEFAULT '',
+     *     hire_date Date COMMENT '员工加入公司的日期',
+     *     job_id String COMMENT '员工职位的标识符,对应于jobs表中的job_id',
+     *     salary Float64 COMMENT '员工每月的基本工资',
+     *     commission_pct Float32 COMMENT '基于销售额的提成比例',
+     *     manager_id Nullable(UInt32) COMMENT '直接上级经理的employee_id',
+     *     department_id Nullable(UInt32) COMMENT '员工所属部门的标识符'
+     * ) ENGINE = MergeTree()
+     * ORDER BY employee_id;
+     * 
+ *

+ * + * @param sourceColMetaGetter + * @param tableMapper + * @param colWrapper + * @param ddlScript + */ + public abstract void addStandardColComment( + SourceColMetaGetter sourceColMetaGetter, TableMap tableMapper, ColWrapper colWrapper, BlockScriptBuffer ddlScript); + + /** + * Oracle(create table DDL 中不能出现添加列注释的内容要分开): + *

+     *     -- 创建 employees 表
+     * CREATE TABLE employees (
+     *     employee_id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, -- 员工ID,自动生成
+     *     first_name  VARCHAR2(50) NOT NULL,                               -- 员工的名字
+     *     last_name   VARCHAR2(50) NOT NULL,                               -- 员工的姓氏
+     *     email       VARCHAR2(100) NOT NULL UNIQUE,                       -- 员工的电子邮件地址,唯一
+     *     phone_number VARCHAR2(20),                                       -- 员工的电话号码
+     *     hire_date   DATE NOT NULL,                                       -- 雇佣日期
+     *     job_id      VARCHAR2(10) NOT NULL,                               -- 工作ID,关联到job表
+     *     salary      NUMBER(8,2),                                         -- 薪水,允许两位小数
+     *     commission_pct NUMBER(4,2),                                      -- 提成百分比,允许两位小数
+     *     manager_id  NUMBER,                                              -- 经理ID,可能为空
+     *     department_id NUMBER                                             -- 部门ID,可能为空
+     * );
+     *
+     * -- 分别为每个字段添加注释
+     * COMMENT ON COLUMN employees.employee_id IS '员工的唯一标识符';
+     * COMMENT ON COLUMN employees.first_name IS '员工的名字';
+     * COMMENT ON COLUMN employees.last_name IS '员工的姓氏';
+     * COMMENT ON COLUMN employees.email IS '员工的电子邮件地址,用作公司内部通信';
+     * COMMENT ON COLUMN employees.phone_number IS '员工的联系电话';
+     * COMMENT ON COLUMN employees.hire_date IS '员工加入公司的日期';
+     * COMMENT ON COLUMN employees.job_id IS '员工职位的标识符,对应于jobs表中的job_id';
+     * COMMENT ON COLUMN employees.salary IS '员工每月的基本工资';
+     * COMMENT ON COLUMN employees.commission_pct IS '基于销售额的提成比例';
+     * COMMENT ON COLUMN employees.manager_id IS '直接上级经理的employee_id';
+     * COMMENT ON COLUMN employees.department_id IS '员工所属部门的标识符';
+     * 
+ * + * @param createTableSqlBuilder + * @param colMetaGetter + * @param tableMapper + * @param script + */ + public void addOracleLikeColComment(CreateTableSqlBuilder createTableSqlBuilder + , SourceColMetaGetter colMetaGetter, TableMap tableMapper, BlockScriptBuffer script) { + } + +} diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/impl/AutoCreateTableColCommentSwitchOFF.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/impl/AutoCreateTableColCommentSwitchOFF.java new file mode 100644 index 000000000..7778d731b --- /dev/null +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/impl/AutoCreateTableColCommentSwitchOFF.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.datax.common.impl; + +import com.qlangtech.tis.datax.IDataxProcessor.TableMap; +import com.qlangtech.tis.datax.SourceColMetaGetter; +import com.qlangtech.tis.extension.Descriptor; +import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder.ColWrapper; +import com.qlangtech.tis.plugin.datax.common.AutoCreateTableColCommentSwitch; +import com.qlangtech.tis.sql.parser.visitor.BlockScriptBuffer; + +/** + * + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-12-18 09:24 + **/ +public class AutoCreateTableColCommentSwitchOFF extends AutoCreateTableColCommentSwitch { + @Override + public boolean turnOn() { + return false; + } + + @Override + public void addStandardColComment(SourceColMetaGetter sourceColMetaGetter, TableMap tableMapper, ColWrapper colWrapper, BlockScriptBuffer ddlScript) { + + } + + @TISExtension + public static final class DftDesc extends Descriptor { + public DftDesc() { + super(); + } + @Override + public String getDisplayName() { + return SWITCH_OFF; + } + } +} diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/impl/NoneCreateTable.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/impl/NoneCreateTable.java new file mode 100644 index 000000000..257271757 --- /dev/null +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/impl/NoneCreateTable.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.datax.common.impl; + +import com.qlangtech.tis.datax.IDataxProcessor.TableMap; +import com.qlangtech.tis.datax.SourceColMetaGetter; +import com.qlangtech.tis.datax.impl.DataxWriter; +import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.plugin.IEndTypeGetter.EndType; +import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder; +import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder.ColWrapper; +import com.qlangtech.tis.plugin.datax.common.AutoCreateTable; +import com.qlangtech.tis.plugin.datax.common.AutoCreateTableColCommentSwitch; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; + +import java.util.Optional; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-12-14 11:31 + **/ +public class NoneCreateTable extends AutoCreateTable { + @Override + public boolean enabled() { + return false; + } + + @Override + public AutoCreateTableColCommentSwitch getAddComment() { + return new AutoCreateTableColCommentSwitchOFF(); + } + + @Override + public CreateTableSqlBuilder createSQLDDLBuilder( + DataxWriter rdbmsWriter, SourceColMetaGetter sourceColMetaGetter + , TableMap tableMapper, Optional transformers) { + throw new UnsupportedOperationException(); + } + +// @Override +// public void addStandardColComment( +// SourceColMetaGetter sourceColMetaGetter, TableMap tableMapper, ColWrapper colWrapper, BlockScriptBuffer ddlScript) { +// +// } + + @TISExtension + public static final class DftDesc extends BasicDescriptor { + public DftDesc() { + super(); + } + + @Override + public String getDisplayName() { + return SWITCH_OFF; + } + + @Override + public EndType getEndType() { + return null; + } + } +} diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/transformer/RecordTransformerRules.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/transformer/RecordTransformerRules.java index 54d6d1d4d..e80e49770 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/transformer/RecordTransformerRules.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/transformer/RecordTransformerRules.java @@ -84,6 +84,25 @@ public static void main(String[] args) { System.out.println(String.join(",", test.keySet())); } + /** + * 目前单元测试中使用 + * + * @param udfs + * @return + */ + public static RecordTransformerRules create(UDFDefinition... udfs) { + RecordTransformerRules transformerRules = new RecordTransformerRules(); + List transformers = Lists.newArrayList(); + RecordTransformer transformer = null; + for (UDFDefinition udf : udfs) { + transformer = new RecordTransformer(); + transformer.setUdf(udf); + transformers.add(transformer); + } + transformerRules.rules = transformers; + return transformerRules; + } + public static void cleanPluginStoreCache(IPluginContext context, String appName) { // TIS.appSourcePluginStore.clear(createAppSourceKey(context, appName)); @@ -383,7 +402,7 @@ public OverwriteColsWithContextParams appendSourceContextParams(DataSourceMeta d , "inParamer:" + inParamer.getKey() + " relevant ContextParam can not be null"); contextParams.add(contextParam); rewriterResult.add(OutputParameter.create(inParamer.getKey(), false, contextParam.getDataType())); - // rewriterResult.add(IColMetaGetter.create(inParamer.getKey(), contextParam.getDataType())); + // rewriterResult.add(IColMetaGetter.create(inParamer.getKey(), contextParam.getDataType())); } } return new OverwriteColsWithContextParams(rewriterResult, contextParams); @@ -410,6 +429,8 @@ public List getContextParams() { * @return */ public TransformerOverwriteCols overwriteCols(List sourceCols) { + + TransformerOverwriteCols rewriterResult = new TransformerOverwriteCols(); Map col2IdxBuilder = Maps.newHashMap(); int idx = 0; diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/transformer/TargetColumn.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/transformer/TargetColumn.java index fb6de9a98..58fc77c33 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/transformer/TargetColumn.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/transformer/TargetColumn.java @@ -25,6 +25,8 @@ import com.qlangtech.tis.plugin.IPluginStore.AfterPluginSaved; import com.qlangtech.tis.plugin.IPluginStore.ManipuldateProcessor; import com.qlangtech.tis.plugin.IdentityName; +import com.qlangtech.tis.plugin.datax.transformer.impl.ExistTargetCoumn; +import com.qlangtech.tis.plugin.datax.transformer.impl.VirtualTargetColumn; import com.qlangtech.tis.util.IPluginContext; import java.io.Serializable; @@ -33,10 +35,31 @@ /** * @author: 百岁(baisui@qlangtech.com) * @create: 2024-06-09 13:43 + * @see com.qlangtech.tis.plugin.datax.transformer.impl.VirtualTargetColumn + * @see com.qlangtech.tis.plugin.datax.transformer.impl.ExistTargetCoumn + * @see **/ - public abstract class TargetColumn implements Describable, ManipuldateProcessor, PluginLiteriaDesc, IdentityName, Serializable { + /** + * 创建目标列 + * + * @param virtual + * @param colName + * @return + */ + public static TargetColumn create(boolean virtual, String colName) { + if (virtual) { + VirtualTargetColumn virtualTargetColumn = new VirtualTargetColumn(); + virtualTargetColumn.name = colName; + return virtualTargetColumn; + } else { + ExistTargetCoumn existTargetCoumn = new ExistTargetCoumn(); + existTargetCoumn.name = colName; + return existTargetCoumn; + } + } + /** * 是否是虚拟列(通过原表记录值计算之后新增加的列) * diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/DataSourceFactory.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/DataSourceFactory.java index 1ba067ed1..360658297 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/DataSourceFactory.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/DataSourceFactory.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -127,7 +128,7 @@ protected Class getExpectDesClass protected void validateConnection(String jdbcUrl, IConnProcessor p) throws TableNotFoundException { JDBCConnection conn = null; try { - conn = this.getConnection(jdbcUrl, true); // getConnection(jdbcUrl, true); + conn = this.getConnection(jdbcUrl, Optional.empty(), true); // getConnection(jdbcUrl, true); p.vist(conn); } catch (TableNotFoundException e) { throw e; @@ -177,7 +178,7 @@ public boolean isWrapperFor(Class iface) throws SQLException { * @param jdbcUrl * @return */ - public final JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + public final JDBCConnection getConnection(String jdbcUrl, Optional props, boolean verify) throws SQLException { JDBCConnectionPool connectionPool = JDBCConnection.connectionPool.get(); JDBCConnection conn = null; @@ -186,7 +187,7 @@ public final JDBCConnection getConnection(String jdbcUrl, boolean verify) throws if (conn == null) { return connectionPool.getConnection(jdbcUrl, verify, (url) -> { try { - return createConnection(jdbcUrl, verify); + return createConnection(jdbcUrl, props, verify); } catch (SQLException e) { throw new RuntimeException(e); } @@ -195,7 +196,7 @@ public final JDBCConnection getConnection(String jdbcUrl, boolean verify) throws return conn; } } else { - return createConnection(jdbcUrl, verify); + return createConnection(jdbcUrl, props, verify); } } @@ -207,12 +208,12 @@ public final JDBCConnection getConnection(String jdbcUrl, boolean verify) throws * @return * @throws SQLException */ - protected JDBCConnection createConnection(String jdbcUrl, boolean verify) throws SQLException { + protected JDBCConnection createConnection(String jdbcUrl, Optional props, boolean verify) throws SQLException { throw new UnsupportedOperationException("class:" + this.getClass().getName() + ",jdbcUrl:" + jdbcUrl); } - public JDBCConnection getConnection(String jdbcUrl, boolean usingPool, boolean verify) throws SQLException { - return this.getConnection(jdbcUrl, verify); + public JDBCConnection getConnection(String jdbcUrl, Optional props, boolean usingPool, boolean verify) throws SQLException { + return this.getConnection(jdbcUrl, props, verify); } // // 密码可以为空 @@ -250,7 +251,7 @@ protected List parseTableColMeta(boolean inSink, String jdbcUrl, } primaryKeys = getPrimaryKeys(table, metaData1); - columns1 = getColumnsMeta(table, metaData1); + columns1 = getColumnsMeta(table, conn, metaData1); Set pkCols = createAddedCols(table); while (primaryKeys.next()) { // $NON-NLS-1$ @@ -386,7 +387,7 @@ protected List parseTableColMeta(boolean inSink, EntityName tabl return ref.get(); } - protected ResultSet getColumnsMeta(EntityName table, DatabaseMetaData metaData1) throws SQLException { + protected ResultSet getColumnsMeta(EntityName table, JDBCConnection conn, DatabaseMetaData metaData1) throws SQLException { return metaData1.getColumns(null, getDbSchema(), table.getTableName(), null); } @@ -521,7 +522,7 @@ protected boolean validateDSFactory(IControlMsgHandler msgHandler, Context conte boolean[] validateResult = new boolean[1]; final Object actionContext = context.getContext(); dbConfig.vistDbURL(false, 5, (dbName, dbHost, jdbcUrl) -> { - try (JDBCConnection conn = dsFactory.getConnection(jdbcUrl, true)) { + try (JDBCConnection conn = dsFactory.getConnection(jdbcUrl, Optional.empty(), true)) { // 由于不在同一个线程内需要重新版定线程 context.setContext(actionContext); validateResult[0] = validateConnection(conn, dsFactory, msgHandler, context); @@ -569,11 +570,11 @@ public ColumnMetaData create(String colName, int index) throws SQLException { protected DataType getDataType(String colName) throws SQLException { // decimal 的小数位长度 - int decimalDigits = columns1.getInt(KEY_DECIMAL_DIGITS); //数据如果是INT类型,但如果是UNSIGNED,那实际类型需要转换成Long,INT UNSIGNED String typeName = columns1.getString(KEY_TYPE_NAME); - DataType colType = createColDataType(colName, typeName, columns1.getInt(KEY_DATA_TYPE), columns1.getInt(KEY_COLUMN_SIZE), decimalDigits); + DataType colType = createColDataType(colName, typeName + , columns1.getInt(KEY_DATA_TYPE), columns1.getInt(KEY_COLUMN_SIZE), decimalDigits); if (decimalDigits > 0) { colType.setDecimalDigits(decimalDigits); } @@ -585,6 +586,13 @@ protected DataType createColDataType( // 类似oracle驱动内部有一套独立的类型 oracle.jdbc.OracleTypes,有需要可以在具体的实现类里面去实现 DataType type = DataType.create(dbColType, typeName, colSize); type.setDecimalDigits(decimalDigits); + + if ((type.getJdbcType() == JDBCTypes.DECIMAL || type.getJdbcType() == JDBCTypes.NUMERIC) + && decimalDigits == 0) { + // 例如:Decimal(19,0) 直接视作为 BigInt + return DataType.create((type.getColumnSize() > 10 ? JDBCTypes.BIGINT : JDBCTypes.INTEGER).getType() + , type.typeName, type.getColumnSize()); + } return type; } } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/IInitWriterTableExecutor.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/IInitWriterTableExecutor.java index 03a4e025f..1f01be822 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/IInitWriterTableExecutor.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/IInitWriterTableExecutor.java @@ -18,6 +18,8 @@ package com.qlangtech.tis.plugin.ds; +import com.qlangtech.tis.plugin.datax.common.AutoCreateTable; + import java.util.List; /** @@ -25,8 +27,15 @@ * @create: 2022-09-19 11:51 **/ public interface IInitWriterTableExecutor { + /** + * 取得自动建表逻辑 * + * @return + */ + AutoCreateTable getAutoCreateTableCanNotBeNull(); + + /** * @param targetTabName * @param jdbcUrls * @throws Exception diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/JDBCConnection.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/JDBCConnection.java index 659400ec1..e64304340 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/JDBCConnection.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/JDBCConnection.java @@ -26,6 +26,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** @@ -63,7 +64,7 @@ public final boolean initializeSinkTab(String tabName, Runnable initProcessor) { } public JDBCConnection(Connection conn, String url) { - this.conn = conn; + this.conn = conn;// Objects.requireNonNull(, "conn can not be null"); this.url = url; } @@ -121,19 +122,23 @@ public PreparedStatement preparedStatement(String sql) throws SQLException { * * @param sql * @param resultProcess + * @return 是否找到任务记录 * @throws Exception */ - public void query(String sql, ResultProcess resultProcess) throws Exception { + public boolean query(String sql, ResultProcess resultProcess) throws Exception { synchronized (JDBCConnection.class) { try (Statement stmt = conn.createStatement()) { try { + boolean hasAnyRows = false; try (ResultSet result = stmt.executeQuery(sql)) { while (result.next()) { + hasAnyRows = true; if (!resultProcess.callback(result)) { - return; + return hasAnyRows; } } } + return hasAnyRows; } catch (Exception e) { throw new RuntimeException(sql, e); } diff --git a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/JDBCConnectionPool.java b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/JDBCConnectionPool.java index 6b817b633..dedba0625 100644 --- a/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/JDBCConnectionPool.java +++ b/tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/JDBCConnectionPool.java @@ -131,8 +131,8 @@ public Connection getConnection() { } @Override - public void query(String sql, ResultProcess resultProcess) throws Exception { - conn.query(sql, resultProcess); + public boolean query(String sql, ResultProcess resultProcess) throws Exception { + return conn.query(sql, resultProcess); } @Override diff --git a/tis-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/common/AutoCreateTable.json b/tis-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/common/AutoCreateTable.json new file mode 100644 index 000000000..4b08046d4 --- /dev/null +++ b/tis-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/common/AutoCreateTable.json @@ -0,0 +1,7 @@ +{ + "aliasPrefix": { + "label": "别名前缀", + "placeholder": "ods_", + "help": "统一为目标表添加前缀,例如在构建分层数仓用于为ods层目标表统一添加“ods_erp_”前缀" + } +} \ No newline at end of file diff --git a/tis-plugin/src/main/resources/com/qlangtech/tis/plugin/endtype/icon/kingbase/fill.svg b/tis-plugin/src/main/resources/com/qlangtech/tis/plugin/endtype/icon/kingbase/fill.svg new file mode 100644 index 000000000..7908177b8 --- /dev/null +++ b/tis-plugin/src/main/resources/com/qlangtech/tis/plugin/endtype/icon/kingbase/fill.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/tis-plugin/src/main/resources/com/qlangtech/tis/plugin/endtype/icon/kingbase/outline.svg b/tis-plugin/src/main/resources/com/qlangtech/tis/plugin/endtype/icon/kingbase/outline.svg new file mode 100644 index 000000000..11d5a3ed1 --- /dev/null +++ b/tis-plugin/src/main/resources/com/qlangtech/tis/plugin/endtype/icon/kingbase/outline.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/tis-web-config/config.properties b/tis-web-config/config.properties index 3409f403c..86dc35a6e 100644 --- a/tis-web-config/config.properties +++ b/tis-web-config/config.properties @@ -30,8 +30,8 @@ tis.datasource.dbname=tis_console #tis.datasource.dbname=tis_console_db -assemble.host=192.168.28.125 -tis.host=192.168.28.125 +assemble.host=192.168.28.133 +tis.host=192.168.28.133