Skip to content

Commit

Permalink
add kingbase source connector
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Jan 27, 2025
1 parent 4a966e3 commit 1e2ad85
Show file tree
Hide file tree
Showing 28 changed files with 549 additions and 170 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void reportStatus(com.qlangtech.tis.grpc.UpdateCounterMap updateCounter,
tableMultiDataIndexStatus.setIncrProcessPaused(updateCounterFromClient.getIncrProcessPaused());
// tableMultiDataIndexStatus.setTis30sAvgRT(updateCounterFromClient.getTis30sAvgRT());
tableMultiDataIndexStatus.setTis30sAvgRT(updateCounterFromClient.getTis30SAvgRT());
for (Map.Entry<String, Long> tabUpdate : entry.getValue().getTableConsumeDataMap().entrySet()) {
for (Map.Entry<String, Long> tabUpdate : updateCounterFromClient.getTableConsumeDataMap().entrySet()) {
tableMultiDataIndexStatus.put(tabUpdate.getKey(), new ConsumeDataKeeper(tabUpdate.getValue(), updateTime));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import com.qlangtech.tis.plugin.ds.DataTypeMeta;
import com.qlangtech.tis.plugin.ds.DataTypeMeta.IMultiItemsView;
import com.qlangtech.tis.plugin.ds.DefaultTab;
import com.qlangtech.tis.plugin.ds.IInitWriterTableExecutor;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.ds.IdlistElementCreatorFactory;
import com.qlangtech.tis.plugin.trigger.JobTrigger;
Expand Down Expand Up @@ -1335,15 +1336,21 @@ public void doPreviewTableRows(Context context) {
@Func(value = PermissionConstant.DATAX_MANAGE, sideEffect = false)
public void doGetTableMapper(Context context) {
String dataxName = this.getString(PARAM_KEY_DATAX_NAME);
boolean forceInit = this.getBoolean("forceInit");
KeyedPluginStore<DataxReader> readerStore = DataxReader.getPluginStore(this, dataxName);
DataxReader dataxReader = readerStore.getPlugin();
Objects.requireNonNull(dataxReader, "dataReader:" + dataxName + " relevant instance can not be null");

TableAlias tableAlias;
Optional<DataxProcessor> dataXAppSource = IAppSource.loadNullable(this, dataxName);
TableAliasMapper tabMaps = null;//Collections.emptyMap();
Optional<String> mapperTabPrefix = Optional.empty();
if (dataXAppSource.isPresent()) {
DataxProcessor dataxSource = dataXAppSource.get();
IDataxWriter dataXWriter = dataxSource.getWriter(this, true);
if (dataXWriter instanceof IInitWriterTableExecutor) {
mapperTabPrefix = ((IInitWriterTableExecutor) dataXWriter).getAutoCreateTableCanNotBeNull().getMapperTabPrefix();
}
tabMaps = dataxSource.getTabAlias(this);
}
if (tabMaps == null) {
Expand All @@ -1356,8 +1363,12 @@ public void doGetTableMapper(Context context) {
List<TableAlias> tmapList = Lists.newArrayList();
for (ISelectedTab selectedTab : dataxReader.getSelectedTabs()) {
tableAlias = tabMaps.get(selectedTab);
if (tableAlias == null) {
tmapList.add(new TableAlias(selectedTab.getName()));
if (forceInit || tableAlias == null) {
tableAlias = new TableAlias(selectedTab.getName());
if (mapperTabPrefix.isPresent()) {
tableAlias.setTo(mapperTabPrefix.get() + selectedTab.getName());
}
tmapList.add(tableAlias);
} else {
tmapList.add(tableAlias);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,7 @@ public static DatasourceDb createDatabase(BasicModule module, Descriptor.ParseDe
criteria.createCriteria().andNameEqualTo(dbName);
int exist = module.getWorkflowDAOFacade().getDatasourceDbDAO().countByExample(criteria);
if (exist > 0) {

module.addErrorMessage(context, "已经有了同名(" + dbName + ")的数据库");
return null;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,22 @@
import com.alibaba.fastjson.JSONObject;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import com.qlangtech.tis.TIS;
import com.qlangtech.tis.assemble.ExecResult;
import com.qlangtech.tis.assemble.FullbuildPhase;
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
import com.qlangtech.tis.cloud.ITISCoordinator;
import com.qlangtech.tis.coredefine.module.action.CoreAction;
import com.qlangtech.tis.coredefine.module.action.ExtendWorkFlowBuildHistory;
import com.qlangtech.tis.coredefine.module.action.TISK8sDelegate;
import com.qlangtech.tis.exec.ExecutePhaseRange;
import com.qlangtech.tis.fullbuild.phasestatus.PhaseStatusCollection;
import com.qlangtech.tis.job.common.JobCommon;
import com.qlangtech.tis.manage.spring.ZooKeeperGetter;
import com.qlangtech.tis.plugin.PluginStore;
import com.qlangtech.tis.pubhook.common.RunEnvironment;
import com.qlangtech.tis.realtime.yarn.rpc.JobType;
import com.qlangtech.tis.realtime.yarn.rpc.TopicInfo;
import com.qlangtech.tis.rpc.grpc.log.LogCollectorClient;
import com.qlangtech.tis.rpc.grpc.log.stream.PExecuteState;
import com.qlangtech.tis.rpc.grpc.log.stream.PMonotorTarget;
Expand All @@ -56,6 +63,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -193,7 +201,7 @@ public void onWebSocketText(String message) {

private StreamObserver<PMonotorTarget> getMonitorSet() {
if (pMonotorObserver == null) {
// StatusRpcClientFactory.AssembleSvcCompsite feedback = getStatusRpc().get();
// StatusRpcClientFactory.AssembleSvcCompsite feedback = getStatusRpc().get();

pMonotorObserver = getStatusRpc().registerMonitorEvent(this);
}
Expand Down Expand Up @@ -301,7 +309,7 @@ private void addMonitor(MonotorTarget monitorTarget) throws Exception {
} else if (monitorTarget.testLogType(LogType.BuildPhraseMetrics)) {
executorService.execute(() -> {
try {
// StatusRpcClientFactory.AssembleSvcCompsite feedback = getStatusRpc().get();
// StatusRpcClientFactory.AssembleSvcCompsite feedback = getStatusRpc().get();
final Iterator<PPhaseStatusCollection> statIt = getStatusRpc().buildPhraseStatus(taskid);
boolean serverSideBreak = true;
while (isConnected() && statIt.hasNext()) {
Expand All @@ -319,24 +327,24 @@ private void addMonitor(MonotorTarget monitorTarget) throws Exception {
throw new RuntimeException("taskid:" + taskid, e);
}
});
//} else if (monitorTarget.testLogType(LogType.MQ_TAGS_STATUS)) {
// PluginStore<MQListenerFactory> mqListenerFactory = TIS.getPluginStore(this.collectionName, MQListenerFactory.class);
// MQListenerFactory plugin = mqListenerFactory.getPlugin();
// // 增量节点处理
// final Map<String, TopicTagStatus> /* this.tag */
// transferTagStatus = new HashMap<>();
// final Map<String, TopicTagStatus> /* this.tag */
// binlogTopicTagStatus = new HashMap<>();
// List<TopicTagIncrStatus.FocusTags> focusTags = getFocusTags(zkGetter.getInstance(), collectionName);
// // 如果size为0,则说明远程工作节点没有正常执行
// if (focusTags.size() > 0) {
// TopicTagIncrStatus topicTagIncrStatus = new TopicTagIncrStatus(focusTags);
// executorService.execute(() -> {
// IncrTagHeatBeatMonitor incrTagHeatBeatMonitor = new IncrTagHeatBeatMonitor(this.collectionName, this
// , transferTagStatus, binlogTopicTagStatus, topicTagIncrStatus, plugin.createConsumerStatus(), zkGetter);
// incrTagHeatBeatMonitor.build();
// });
// }
} else if (monitorTarget.testLogType(LogType.MQ_TAGS_STATUS)) {
PluginStore<MQListenerFactory> mqListenerFactory = (PluginStore<MQListenerFactory>) TIS.getPluginStore(this.collectionName, MQListenerFactory.class);
MQListenerFactory plugin = mqListenerFactory.getPlugin();
// 增量节点处理
final Map<String, TopicTagStatus> /* this.tag */
transferTagStatus = new HashMap<>();
final Map<String, TopicTagStatus> /* this.tag */
binlogTopicTagStatus = new HashMap<>();
List<TopicTagIncrStatus.FocusTags> focusTags = getFocusTags(zkGetter.getInstance(), collectionName);
// 如果size为0,则说明远程工作节点没有正常执行
if (focusTags.size() > 0) {
TopicTagIncrStatus topicTagIncrStatus = new TopicTagIncrStatus(focusTags);
executorService.execute(() -> {
IncrTagHeatBeatMonitor incrTagHeatBeatMonitor = new IncrTagHeatBeatMonitor(this.collectionName, this
, transferTagStatus, binlogTopicTagStatus, topicTagIncrStatus, plugin.createConsumerStatus(), zkGetter);
incrTagHeatBeatMonitor.build();
});
}
} else {
throw new IllegalStateException("monitor type:" + monitorTarget + " is illegal");
}
Expand Down Expand Up @@ -483,18 +491,18 @@ private String getParameter(String key, List<String> dft) {
}
}

// public static List<TopicTagIncrStatus.FocusTags> getFocusTags(ITISCoordinator zookeeper, String collectionName) throws MalformedURLException {
// //
// JobType.RemoteCallResult<TopicInfo> topicInfo = JobType.ACTION_getTopicTags.assembIncrControlWithResult(
// CoreAction.getAssembleNodeAddress(zookeeper),
// collectionName, Collections.emptyList(), TopicInfo.class);
// if (topicInfo.biz.getTopicWithTags().size() < 1) {
// // 返回为空的话可以证明没有正常启动
// return Collections.emptyList();
// }
// TopicInfo topicTags = topicInfo.biz;
// return topicTags.getTopicWithTags().entrySet().stream().map((entry) -> new TopicTagIncrStatus.FocusTags(entry.getKey(), entry.getValue())).collect(Collectors.toList());
// }
public static List<TopicTagIncrStatus.FocusTags> getFocusTags(ITISCoordinator zookeeper, String collectionName) throws MalformedURLException {
//
JobType.RemoteCallResult<TopicInfo> topicInfo = JobType.ACTION_getTopicTags.assembIncrControlWithResult(
CoreAction.getAssembleNodeAddress(zookeeper),
collectionName, Collections.emptyList(), TopicInfo.class);
if (topicInfo.biz.getTopicWithTags().size() < 1) {
// 返回为空的话可以证明没有正常启动
return Collections.emptyList();
}
TopicInfo topicTags = topicInfo.biz;
return topicTags.getTopicWithTags().entrySet().stream().map((entry) -> new TopicTagIncrStatus.FocusTags(entry.getKey(), entry.getValue())).collect(Collectors.toList());
}

static class TagCountMap extends HashMap<String, /* tag */
Integer> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public enum JobType {
// //
// QueryIndexJobRunningStatus(2, "QueryIncrStatus"),
// // incr process tags的状态
Collection_TopicTags_status(3, "collection_topic_tags_status");
// // 取得增量监听的tags
// ACTION_getTopicTags(4, "get_topic_tags");
Collection_TopicTags_status(3, "collection_topic_tags_status"),
// 取得增量监听的tags
ACTION_getTopicTags(4, "get_topic_tags");

public int getValue() {
return value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface IDataxReader extends DataSourceMeta, IDataXPluginMeta
, IStreamTableMeataCreator.ISourceStreamMetaCreator, IRepositoryResourceScannable, IReaderSource {

default SourceColMetaGetter createSourceColMetaGetter() {
return new SourceColMetaGetter(this);
return new SourceColMetaGetter(this, true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class SourceColMetaGetter {
private final Map<TableMap, Map<String, ColumnMetaData>> tab2ColsMapper = Maps.newHashMap();

public static SourceColMetaGetter getNone() {
SourceColMetaGetter colMetaGetter = new SourceColMetaGetter(null) {
SourceColMetaGetter colMetaGetter = new SourceColMetaGetter(null, false) {
@Override
public ColumnMetaData getColMeta(TableMap tableMapper, String colName) {
return null;
Expand All @@ -46,8 +46,11 @@ public ColumnMetaData getColMeta(TableMap tableMapper, String colName) {
return colMetaGetter;
}

public SourceColMetaGetter(IDataxReader dataXReader) {
this.dataXReader = Objects.requireNonNull(dataXReader, "dataXReader");
public SourceColMetaGetter(IDataxReader dataXReader, boolean validateNull) {
if (validateNull) {
Objects.requireNonNull(dataXReader, "dataXReader");
}
this.dataXReader = dataXReader;
}

protected Map<String, ColumnMetaData> getColMetaDataMap(IDataxReader dataXReader, TableMap tableMapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package com.qlangtech.tis.extension.impl;

import com.qlangtech.tis.plugin.IdentityName;

import java.lang.reflect.Field;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* @author 百岁 (baisui@qlangtech.com)
Expand All @@ -32,7 +35,7 @@ public enum EnumFieldMode {
if (!isValOfOptionList(dftVal, fieldDesc)) {
throw new IllegalStateException(fieldDesc + " val " + dftVal.getClass() + " " + "must be " + "type" + " " + "of " + List.class.getName());
}
return dftVal;
return ((List) dftVal).stream().map((val) -> (val instanceof IdentityName) ? ((IdentityName) val).identityValue() : val).collect(Collectors.toList());
}) //
, DEFAULT("default", (dftVal, fieldDesc) -> {
if (isValOfOptionList(dftVal, fieldDesc)) {
Expand Down Expand Up @@ -84,10 +87,15 @@ private static boolean isValOfOptionList(Object val, String fieldDesc) {
Class valClass = val.getClass();
if (List.class.isAssignableFrom(valClass)) {
for (Object o : ((List) val)) {
if (!(o.getClass() == String.class)) {

if (!((o.getClass() == String.class) || (IdentityName.class.isAssignableFrom(o.getClass())))) {
throw new IllegalStateException(fieldDesc + ",opt" + " " + "element " + o.getClass() + " " +
"must be type of " + String.class);
"must be type of " + String.class.getSimpleName() + " or " + IdentityName.class.getSimpleName());
}
// if (!(o.getClass() == String.class)) {
// throw new IllegalStateException(fieldDesc + ",opt" + " " + "element " + o.getClass() + " " +
// "must be type of " + String.class);
// }
}
return true;
}
Expand All @@ -97,7 +105,6 @@ private static boolean isValOfOptionList(Object val, String fieldDesc) {
private static Object getFirstVal(Object val) {
for (Object vv : (List<?>) val) {
return String.valueOf(vv);

}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static SuFormGetterContext setSuFormGetterContext(Describable plugin, IPl
if (plugin instanceof IDataxReader) {
subFormContext.plugin = (IDataxReader) plugin;
} else {
throw new IllegalStateException("plugin must be type of " + IDataxReader.class);
throw new IllegalStateException("plugin" + plugin.getClass().getName() + " must be type of " + IDataxReader.class);
}
pluginMeta.putExtraParams(SubFormFilter.PLUGIN_META_SUBFORM_DETAIL_ID_VALUE, subFormDetailId);
subFormContext.store = store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ enum EndType implements IEndType {
, MariaDB("mariaDB", true) //
, Postgres("pg", true), Oracle("oracle", true) //
, ElasticSearch("es", true), MongoDB("mongoDB", true) //
, StarRocks("starRocks", true), Doris("doris", true) //
, StarRocks("starRocks", true), Doris("doris", true) , KingBase("kingbase", true) //
, Clickhouse("clickhouse", true), Hudi("hudi", true) //, AliyunOSS("aliyunOSS")
, TDFS("t-dfs", true) //
, Cassandra("cassandra") //, HDFS("hdfs")
Expand Down
Loading

0 comments on commit 1e2ad85

Please sign in to comment.