diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/PrintTableServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/PrintTableServiceImpl.java index d97da6b2a6..6593df102d 100644 --- a/dinky-admin/src/main/java/org/dinky/service/impl/PrintTableServiceImpl.java +++ b/dinky-admin/src/main/java/org/dinky/service/impl/PrintTableServiceImpl.java @@ -22,7 +22,7 @@ import org.dinky.context.SseSessionContextHolder; import org.dinky.data.enums.SseTopic; import org.dinky.data.vo.PrintTableVo; -import org.dinky.explainer.printTable.PrintStatementExplainer; +import org.dinky.explainer.print_table.PrintStatementExplainer; import org.dinky.parser.SqlType; import org.dinky.service.PrintTableService; import org.dinky.trans.Operations; @@ -30,7 +30,9 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; +import java.net.InetAddress; import java.net.SocketException; +import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutorService; @@ -85,13 +87,9 @@ public static String getFullTableName(String table) { } Matcher matcher = FULL_TABLE_NAME_PATTERN.matcher(table); - String result = ""; - if (matcher.matches()) { - result = matcher.replaceAll("`$1`.`$2`.`print_$3`"); - } else { - result = String.format("`default_catalog`.`default_database`.`print_%s`", table); - } - return result; + return matcher.matches() + ? matcher.replaceAll("`$1`.`$2`.`print_$3`") + : String.format("`default_catalog`.`default_database`.`print_%s`", table); } public static class PrintTableListener { @@ -114,10 +112,16 @@ public void start() { } private static DatagramSocket getDatagramSocket(int port) { + InetAddress host = null; try { - return new DatagramSocket(port); - } catch (SocketException e) { - log.error("PrintTableListener:DatagramSocket init failed, port {}: {}", PORT, e.getMessage()); + host = InetAddress.getLocalHost(); + return new DatagramSocket(port, host); + } catch (SocketException | UnknownHostException e) { + log.error( + "PrintTableListener:DatagramSocket init failed, host: {}, port {}: {}", + host == null ? null : host.getHostAddress(), + PORT, + e.getMessage()); } return null; } @@ -140,9 +144,5 @@ public void run() { } } } - - public ExecutorService getExecutor() { - return executor; - } } } diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/CustomerConfigureOptions.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/CustomerConfigureOptions.java index 8848283975..2617c3ee1b 100644 --- a/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/CustomerConfigureOptions.java +++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/CustomerConfigureOptions.java @@ -39,4 +39,7 @@ public class CustomerConfigureOptions { public static final ConfigOption REST_FORMAT_TYPE = key("rest.formatType").stringType().defaultValue("DEFAULT").withDescription("for savepoint format type"); + + public static final ConfigOption DINKY_HOST = + key("dinky.dinkyHost").stringType().noDefaultValue().withDescription("dinky local address"); } diff --git a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java index 333bd335e1..3e92249088 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/Explainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/Explainer.java @@ -29,7 +29,7 @@ import org.dinky.data.result.SqlExplainResult; import org.dinky.executor.CustomTableEnvironment; import org.dinky.executor.Executor; -import org.dinky.explainer.printTable.PrintStatementExplainer; +import org.dinky.explainer.print_table.PrintStatementExplainer; import org.dinky.function.data.model.UDF; import org.dinky.function.util.UDFUtil; import org.dinky.gateway.enums.GatewayType; @@ -53,6 +53,7 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; @@ -139,9 +140,13 @@ public JobParam pretreatStatements(String[] statements) { } else if (operationType.equals(SqlType.PRINT)) { PrintStatementExplainer printStatementExplainer = new PrintStatementExplainer(statement); + Map config = this.executor.getExecutorConfig().getConfig(); + String host = config.getOrDefault("dinky.dinkyHost", "127.0.0.1"); + int port = Integer.parseInt(config.getOrDefault("dinky.dinkyPrintPort", "7125")); String[] tableNames = printStatementExplainer.getTableNames(); for (String tableName : tableNames) { - trans.add(new StatementParam(PrintStatementExplainer.getCreateStatement(tableName), SqlType.CTAS)); + trans.add(new StatementParam( + PrintStatementExplainer.getCreateStatement(tableName, host, port), SqlType.CTAS)); } } else { UDF udf = UDFUtil.toUDF(statement); diff --git a/dinky-core/src/main/java/org/dinky/explainer/printTable/PrintStatementExplainer.java b/dinky-core/src/main/java/org/dinky/explainer/print_table/PrintStatementExplainer.java similarity index 67% rename from dinky-core/src/main/java/org/dinky/explainer/printTable/PrintStatementExplainer.java rename to dinky-core/src/main/java/org/dinky/explainer/print_table/PrintStatementExplainer.java index 7141894c14..cd1e2c28b5 100644 --- a/dinky-core/src/main/java/org/dinky/explainer/printTable/PrintStatementExplainer.java +++ b/dinky-core/src/main/java/org/dinky/explainer/print_table/PrintStatementExplainer.java @@ -17,7 +17,7 @@ * */ -package org.dinky.explainer.printTable; +package org.dinky.explainer.print_table; import java.net.InetAddress; import java.net.UnknownHostException; @@ -26,6 +26,11 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.base.Strings; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class PrintStatementExplainer { public static final String PATTERN_STR = "PRINT (.+)"; @@ -34,7 +39,7 @@ public class PrintStatementExplainer { public static final String CREATE_SQL_TEMPLATE = "CREATE TABLE print_{0} WITH (''connector'' = ''printnet'', " + "''port''=''{2,number,#}'', ''hostName'' = ''{1}'')\n" + "AS SELECT * FROM {0}"; - public static final int PORT = 7125; + public static final int DEFAULT_PORT = 7125; private final String statement; @@ -48,23 +53,31 @@ public String[] getTableNames() { public static String[] splitTableNames(String statement) { Matcher matcher = PATTERN.matcher(statement); - matcher.find(); - String tableNames = matcher.group(1); - return tableNames.replaceAll(" ", "").split(","); + if (matcher.find()) { + String tableNames = matcher.group(1); + return tableNames.replace(" ", "").split(","); + } + throw new IllegalArgumentException("Invalid print statement: " + statement); } public static String getCreateStatement(String tableName) { - Optional address = getSystemLocalIp(); - String ip = address.isPresent() ? address.get().getHostAddress() : "127.0.0.1"; - return MessageFormat.format(CREATE_SQL_TEMPLATE, tableName, ip, PORT); + return getCreateStatement(tableName, null, null); + } + + public static String getCreateStatement(String tableName, String localIp, Integer localPort) { + String ip = Strings.isNullOrEmpty(localIp) + ? getSystemLocalIp().map(InetAddress::getHostAddress).orElse("127.0.0.1") + : localIp; + int port = localPort == null ? DEFAULT_PORT : localPort; + return MessageFormat.format(CREATE_SQL_TEMPLATE, tableName, ip, port); } private static Optional getSystemLocalIp() { try { - InetAddress ip = InetAddress.getLocalHost(); - return Optional.of(ip); + return Optional.of(InetAddress.getLocalHost()); } catch (UnknownHostException e) { - throw new RuntimeException(e); + log.error("get local ip failed: {}", e.getMessage()); + return Optional.empty(); } } } diff --git a/dinky-core/src/test/java/org/dinky/explainer/printTable/PrintStatementExplainerTest.java b/dinky-core/src/test/java/org/dinky/explainer/print_table/PrintStatementExplainerTest.java similarity index 97% rename from dinky-core/src/test/java/org/dinky/explainer/printTable/PrintStatementExplainerTest.java rename to dinky-core/src/test/java/org/dinky/explainer/print_table/PrintStatementExplainerTest.java index a302cda339..31dba3c60e 100644 --- a/dinky-core/src/test/java/org/dinky/explainer/printTable/PrintStatementExplainerTest.java +++ b/dinky-core/src/test/java/org/dinky/explainer/print_table/PrintStatementExplainerTest.java @@ -17,7 +17,7 @@ * */ -package org.dinky.explainer.printTable; +package org.dinky.explainer.print_table; import static org.junit.jupiter.api.Assertions.assertArrayEquals; diff --git a/dinky-executor/src/main/java/org/dinky/connector/printnet/sink/PrintNetDynamicTableSinkFactory.java b/dinky-executor/src/main/java/org/dinky/connector/printnet/sink/PrintNetDynamicTableSinkFactory.java index 22efbcc133..b3a81e3364 100644 --- a/dinky-executor/src/main/java/org/dinky/connector/printnet/sink/PrintNetDynamicTableSinkFactory.java +++ b/dinky-executor/src/main/java/org/dinky/connector/printnet/sink/PrintNetDynamicTableSinkFactory.java @@ -37,6 +37,9 @@ import java.util.HashSet; import java.util.Set; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class PrintNetDynamicTableSinkFactory implements DynamicTableSinkFactory { public static final String IDENTIFIER = "printnet"; @@ -54,19 +57,17 @@ public class PrintNetDynamicTableSinkFactory implements DynamicTableSinkFactory @Override public DynamicTableSink createDynamicTableSink(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - - ObjectIdentifier objectIdentifier = context.getObjectIdentifier(); - final ReadableConfig options = helper.getOptions(); + ObjectIdentifier objectIdentifier = context.getObjectIdentifier(); FactoryUtil.validateFactoryOptions(this, options); - EncodingFormat> serializingFormat = null; try { - // TODO: 2023/3/17 maybe not right + // keep no serialization schema for changelog mode now, you can implement it by yourself serializingFormat = helper.discoverEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT); } catch (Exception ignored) { + log.debug("Could not create serialization format for '{}'.", objectIdentifier, ignored); } return new PrintNetDynamicTableSink( diff --git a/dinky-executor/src/main/java/org/dinky/connector/printnet/sink/PrintNetSinkFunction.java b/dinky-executor/src/main/java/org/dinky/connector/printnet/sink/PrintNetSinkFunction.java index 9a855cf466..54ed5bbfd4 100644 --- a/dinky-executor/src/main/java/org/dinky/connector/printnet/sink/PrintNetSinkFunction.java +++ b/dinky-executor/src/main/java/org/dinky/connector/printnet/sink/PrintNetSinkFunction.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.data.RowData; @@ -32,16 +31,16 @@ import java.net.InetAddress; import java.net.UnknownHostException; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class PrintNetSinkFunction extends RichSinkFunction { - private final String hostname; private final int port; private final SerializationSchema serializer; - private DynamicTableSink.DataStructureConverter converter; - private String printIdentifier; - private byte[] printHeader; - private volatile boolean running = true; - private DatagramSocket socket; - private final InetAddress target; + private final DynamicTableSink.DataStructureConverter converter; + private final byte[] printHeader; + private transient DatagramSocket socket; + private final InetAddress targetAddress; public PrintNetSinkFunction( String hostname, @@ -49,16 +48,15 @@ public PrintNetSinkFunction( SerializationSchema serializer, DynamicTableSink.DataStructureConverter converter, String printIdentifier) { - this.hostname = hostname; this.port = port; this.serializer = serializer; this.converter = converter; - this.printIdentifier = printIdentifier; printHeader = (printIdentifier + "\n").getBytes(); try { - this.target = InetAddress.getByName(hostname); + this.targetAddress = InetAddress.getByName(hostname); } catch (UnknownHostException e) { + log.error("Unknown host: {}", hostname); throw new RuntimeException(e); } } @@ -70,13 +68,12 @@ public void open(Configuration parameters) throws Exception { serializer.open(null); } - StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + // StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); socket = new DatagramSocket(); } @Override public void invoke(RowData value, Context context) throws IOException { - try { byte[] buf = serializer != null ? serializer.serialize(value) @@ -86,9 +83,10 @@ public void invoke(RowData value, Context context) throws IOException { System.arraycopy(printHeader, 0, target, 0, printHeader.length); System.arraycopy(buf, 0, target, printHeader.length, buf.length); - DatagramPacket packet = new DatagramPacket(target, target.length, this.target, port); + DatagramPacket packet = new DatagramPacket(target, target.length, this.targetAddress, port); socket.send(packet); } catch (Exception e) { + log.error("Failed to send packet: {}", e.getMessage()); throw new RuntimeException(e); } } diff --git a/dinky-web/src/models/Sse.tsx b/dinky-web/src/models/Sse.tsx index 2d31254285..fc1bc592ae 100644 --- a/dinky-web/src/models/Sse.tsx +++ b/dinky-web/src/models/Sse.tsx @@ -17,36 +17,35 @@ * */ -import {useEffect, useRef, useState} from 'react' -import {postAll} from "@/services/api"; +import { postAll } from '@/services/api'; +import { useEffect, useRef, useState } from 'react'; export type SseData = { - topic: string, - data: any -} + topic: string; + data: any; +}; export type SubscriberData = { - topic: string[], - call: (data: SseData) => void -} + topic: string[]; + call: (data: SseData) => void; +}; export default () => { - const uuidRef = useRef(crypto.randomUUID()); const subscriberRef = useRef([]); const [eventSource, setEventSource] = useState(); const subscribe = async () => { - const topics: string[] = [] - subscriberRef.current.forEach(sub => topics.push(...sub.topic)) - const para = {sessionKey: uuidRef.current, topics: topics} - await postAll('api/sse/subscribeTopic', para) - } + const topics: string[] = []; + subscriberRef.current.forEach((sub) => topics.push(...sub.topic)); + const para = { sessionKey: uuidRef.current, topics: topics }; + await postAll('api/sse/subscribeTopic', para); + }; const reconnectSse = () => { const sseUrl = '/api/sse/connect?sessionKey=' + uuidRef.current; eventSource?.close(); - setEventSource(new EventSource(sseUrl)) - } + setEventSource(new EventSource(sseUrl)); + }; useEffect(() => { reconnectSse(); @@ -58,26 +57,25 @@ export default () => { eventSource.onmessage = (e) => { const data: SseData = JSON.parse(e.data); subscriberRef.current - .filter(sub => sub.topic.includes(data.topic)) - .forEach(sub => sub.call(data)); - } + .filter((sub) => sub.topic.includes(data.topic)) + .forEach((sub) => sub.call(data)); + }; } }, [eventSource]); const subscribeTopic = (topic: string[], onMessage: (data: SseData) => void) => { - const sub:SubscriberData = {topic: topic, call: onMessage} - subscriberRef.current = [...subscriberRef.current,sub]; - subscribe() + const sub: SubscriberData = { topic: topic, call: onMessage }; + subscriberRef.current = [...subscriberRef.current, sub]; + subscribe(); return () => { //组件卸载回调方法,取消订阅此topic - subscriberRef.current = subscriberRef.current.filter(item => item !== sub); - subscribe() - } - } + subscriberRef.current = subscriberRef.current.filter((item) => item !== sub); + subscribe(); + }; + }; return { subscribeTopic, reconnectSse - } - -} + }; +}; diff --git a/dinky-web/src/pages/DataStudio/BottomContainer/TableData/index.tsx b/dinky-web/src/pages/DataStudio/BottomContainer/TableData/index.tsx index b1e3684975..ae3844a34d 100644 --- a/dinky-web/src/pages/DataStudio/BottomContainer/TableData/index.tsx +++ b/dinky-web/src/pages/DataStudio/BottomContainer/TableData/index.tsx @@ -1,15 +1,15 @@ +import { SseData } from '@/models/Sse'; import { getCurrentData } from '@/pages/DataStudio/function'; import { StateType } from '@/pages/DataStudio/model'; +import { SSE_TOPIC } from '@/pages/DevOps/constants'; import { getData, postAll } from '@/services/api'; import { l } from '@/utils/intl'; -import {connect, useModel} from '@@/exports'; +import { connect, useModel } from '@@/exports'; import { Modal, Select, Tabs } from 'antd'; import TextArea from 'antd/es/input/TextArea'; import { Tab } from 'rc-tabs/lib/interface.d'; import * as React from 'react'; import { useEffect, useState } from 'react'; -import {SSE_TOPIC} from "@/pages/DevOps/constants"; -import {SseData} from "@/models/Sse"; export async function getPrintTables(statement: string) { return postAll('api/printTable/getPrintTables', { statement }); @@ -20,21 +20,23 @@ export function clearConsole() { return getData('api/process/clearConsole', {}); } export type PrintTable = { - tableName:string, - fullTableName:string -} + tableName: string; + fullTableName: string; +}; const DataPage = (props: any) => { const { style, title } = props; const [consoleInfo, setConsoleInfo] = useState(''); - const {subscribeTopic} = useModel('Sse',(model:any)=>({subscribeTopic:model.subscribeTopic})) + const { subscribeTopic } = useModel('Sse', (model: any) => ({ + subscribeTopic: model.subscribeTopic + })); const [tableName, setTableName] = useState(''); useEffect(() => { - if (title){ + if (title) { setTableName(title.tableName); - const topic = `${SSE_TOPIC.PRINT_TABLE}/${title.fullTableName}` - return subscribeTopic([topic],(data:SseData)=>{ + const topic = `${SSE_TOPIC.PRINT_TABLE}/${title.fullTableName}`; + return subscribeTopic([topic], (data: SseData) => { setConsoleInfo((preConsoleInfo) => preConsoleInfo + '\n' + data.data); }); } @@ -63,17 +65,17 @@ const TableData = (props: any) => { const result = await getPrintTables(statement); const tables: PrintTable[] = result.datas; - let selectTable:PrintTable; + let selectTable: PrintTable; Modal.confirm({ title: l('pages.datastudio.print.table.inputTableName'), content: (