Skip to content

Commit

Permalink
feat: print table local address configure (DataLinkDC#2416)
Browse files Browse the repository at this point in the history
* feat: print table local address configure

* fix

* Spotless Apply

* fix

* fix

* fix

* Spotless Apply

* fix: import

---------

Co-authored-by: leechor <leechor@users.noreply.github.com>
  • Loading branch information
leechor and leechor authored Oct 22, 2023
1 parent f5048de commit aba7120
Show file tree
Hide file tree
Showing 16 changed files with 160 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@
import org.dinky.context.SseSessionContextHolder;
import org.dinky.data.enums.SseTopic;
import org.dinky.data.vo.PrintTableVo;
import org.dinky.explainer.printTable.PrintStatementExplainer;
import org.dinky.explainer.print_table.PrintStatementExplainer;
import org.dinky.parser.SqlType;
import org.dinky.service.PrintTableService;
import org.dinky.trans.Operations;
import org.dinky.utils.SqlUtil;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -85,13 +87,9 @@ public static String getFullTableName(String table) {
}

Matcher matcher = FULL_TABLE_NAME_PATTERN.matcher(table);
String result = "";
if (matcher.matches()) {
result = matcher.replaceAll("`$1`.`$2`.`print_$3`");
} else {
result = String.format("`default_catalog`.`default_database`.`print_%s`", table);
}
return result;
return matcher.matches()
? matcher.replaceAll("`$1`.`$2`.`print_$3`")
: String.format("`default_catalog`.`default_database`.`print_%s`", table);
}

public static class PrintTableListener {
Expand All @@ -114,10 +112,16 @@ public void start() {
}

private static DatagramSocket getDatagramSocket(int port) {
InetAddress host = null;
try {
return new DatagramSocket(port);
} catch (SocketException e) {
log.error("PrintTableListener:DatagramSocket init failed, port {}: {}", PORT, e.getMessage());
host = InetAddress.getLocalHost();
return new DatagramSocket(port, host);
} catch (SocketException | UnknownHostException e) {
log.error(
"PrintTableListener:DatagramSocket init failed, host: {}, port {}: {}",
host == null ? null : host.getHostAddress(),
PORT,
e.getMessage());
}
return null;
}
Expand All @@ -140,9 +144,5 @@ public void run() {
}
}
}

public ExecutorService getExecutor() {
return executor;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,7 @@ public class CustomerConfigureOptions {

public static final ConfigOption<String> REST_FORMAT_TYPE =
key("rest.formatType").stringType().defaultValue("DEFAULT").withDescription("for savepoint format type");

public static final ConfigOption<String> DINKY_HOST =
key("dinky.dinkyHost").stringType().noDefaultValue().withDescription("dinky local address");
}
9 changes: 7 additions & 2 deletions dinky-core/src/main/java/org/dinky/explainer/Explainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.dinky.data.result.SqlExplainResult;
import org.dinky.executor.CustomTableEnvironment;
import org.dinky.executor.Executor;
import org.dinky.explainer.printTable.PrintStatementExplainer;
import org.dinky.explainer.print_table.PrintStatementExplainer;
import org.dinky.function.data.model.UDF;
import org.dinky.function.util.UDFUtil;
import org.dinky.gateway.enums.GatewayType;
Expand All @@ -53,6 +53,7 @@
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -139,9 +140,13 @@ public JobParam pretreatStatements(String[] statements) {
} else if (operationType.equals(SqlType.PRINT)) {
PrintStatementExplainer printStatementExplainer = new PrintStatementExplainer(statement);

Map<String, String> config = this.executor.getExecutorConfig().getConfig();
String host = config.getOrDefault("dinky.dinkyHost", "127.0.0.1");
int port = Integer.parseInt(config.getOrDefault("dinky.dinkyPrintPort", "7125"));
String[] tableNames = printStatementExplainer.getTableNames();
for (String tableName : tableNames) {
trans.add(new StatementParam(PrintStatementExplainer.getCreateStatement(tableName), SqlType.CTAS));
trans.add(new StatementParam(
PrintStatementExplainer.getCreateStatement(tableName, host, port), SqlType.CTAS));
}
} else {
UDF udf = UDFUtil.toUDF(statement);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.dinky.explainer.printTable;
package org.dinky.explainer.print_table;

import java.net.InetAddress;
import java.net.UnknownHostException;
Expand All @@ -26,6 +26,11 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.base.Strings;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PrintStatementExplainer {

public static final String PATTERN_STR = "PRINT (.+)";
Expand All @@ -34,7 +39,7 @@ public class PrintStatementExplainer {
public static final String CREATE_SQL_TEMPLATE = "CREATE TABLE print_{0} WITH (''connector'' = ''printnet'', "
+ "''port''=''{2,number,#}'', ''hostName'' = ''{1}'')\n"
+ "AS SELECT * FROM {0}";
public static final int PORT = 7125;
public static final int DEFAULT_PORT = 7125;

private final String statement;

Expand All @@ -48,23 +53,31 @@ public String[] getTableNames() {

public static String[] splitTableNames(String statement) {
Matcher matcher = PATTERN.matcher(statement);
matcher.find();
String tableNames = matcher.group(1);
return tableNames.replaceAll(" ", "").split(",");
if (matcher.find()) {
String tableNames = matcher.group(1);
return tableNames.replace(" ", "").split(",");
}
throw new IllegalArgumentException("Invalid print statement: " + statement);
}

public static String getCreateStatement(String tableName) {
Optional<InetAddress> address = getSystemLocalIp();
String ip = address.isPresent() ? address.get().getHostAddress() : "127.0.0.1";
return MessageFormat.format(CREATE_SQL_TEMPLATE, tableName, ip, PORT);
return getCreateStatement(tableName, null, null);
}

public static String getCreateStatement(String tableName, String localIp, Integer localPort) {
String ip = Strings.isNullOrEmpty(localIp)
? getSystemLocalIp().map(InetAddress::getHostAddress).orElse("127.0.0.1")
: localIp;
int port = localPort == null ? DEFAULT_PORT : localPort;
return MessageFormat.format(CREATE_SQL_TEMPLATE, tableName, ip, port);
}

private static Optional<InetAddress> getSystemLocalIp() {
try {
InetAddress ip = InetAddress.getLocalHost();
return Optional.of(ip);
return Optional.of(InetAddress.getLocalHost());
} catch (UnknownHostException e) {
throw new RuntimeException(e);
log.error("get local ip failed: {}", e.getMessage());
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*
*/

package org.dinky.explainer.printTable;
package org.dinky.explainer.print_table;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import java.util.HashSet;
import java.util.Set;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PrintNetDynamicTableSinkFactory implements DynamicTableSinkFactory {
public static final String IDENTIFIER = "printnet";

Expand All @@ -54,19 +57,17 @@ public class PrintNetDynamicTableSinkFactory implements DynamicTableSinkFactory
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

ObjectIdentifier objectIdentifier = context.getObjectIdentifier();

final ReadableConfig options = helper.getOptions();

ObjectIdentifier objectIdentifier = context.getObjectIdentifier();
FactoryUtil.validateFactoryOptions(this, options);

EncodingFormat<SerializationSchema<RowData>> serializingFormat = null;

try {
// TODO: 2023/3/17 maybe not right
// keep no serialization schema for changelog mode now, you can implement it by yourself
serializingFormat = helper.discoverEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT);
} catch (Exception ignored) {
log.debug("Could not create serialization format for '{}'.", objectIdentifier, ignored);
}

return new PrintNetDynamicTableSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;

Expand All @@ -32,33 +31,32 @@
import java.net.InetAddress;
import java.net.UnknownHostException;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PrintNetSinkFunction extends RichSinkFunction<RowData> {
private final String hostname;
private final int port;
private final SerializationSchema<RowData> serializer;
private DynamicTableSink.DataStructureConverter converter;
private String printIdentifier;
private byte[] printHeader;
private volatile boolean running = true;
private DatagramSocket socket;
private final InetAddress target;
private final DynamicTableSink.DataStructureConverter converter;
private final byte[] printHeader;
private transient DatagramSocket socket;
private final InetAddress targetAddress;

public PrintNetSinkFunction(
String hostname,
int port,
SerializationSchema<RowData> serializer,
DynamicTableSink.DataStructureConverter converter,
String printIdentifier) {
this.hostname = hostname;
this.port = port;
this.serializer = serializer;
this.converter = converter;
this.printIdentifier = printIdentifier;
printHeader = (printIdentifier + "\n").getBytes();

try {
this.target = InetAddress.getByName(hostname);
this.targetAddress = InetAddress.getByName(hostname);
} catch (UnknownHostException e) {
log.error("Unknown host: {}", hostname);
throw new RuntimeException(e);
}
}
Expand All @@ -70,13 +68,12 @@ public void open(Configuration parameters) throws Exception {
serializer.open(null);
}

StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
// StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
socket = new DatagramSocket();
}

@Override
public void invoke(RowData value, Context context) throws IOException {

try {
byte[] buf = serializer != null
? serializer.serialize(value)
Expand All @@ -86,9 +83,10 @@ public void invoke(RowData value, Context context) throws IOException {
System.arraycopy(printHeader, 0, target, 0, printHeader.length);
System.arraycopy(buf, 0, target, printHeader.length, buf.length);

DatagramPacket packet = new DatagramPacket(target, target.length, this.target, port);
DatagramPacket packet = new DatagramPacket(target, target.length, this.targetAddress, port);
socket.send(packet);
} catch (Exception e) {
log.error("Failed to send packet: {}", e.getMessage());
throw new RuntimeException(e);
}
}
Expand Down
56 changes: 27 additions & 29 deletions dinky-web/src/models/Sse.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,35 @@
*
*/

import {useEffect, useRef, useState} from 'react'
import {postAll} from "@/services/api";
import { postAll } from '@/services/api';
import { useEffect, useRef, useState } from 'react';

export type SseData = {
topic: string,
data: any
}
topic: string;
data: any;
};
export type SubscriberData = {
topic: string[],
call: (data: SseData) => void
}
topic: string[];
call: (data: SseData) => void;
};

export default () => {

const uuidRef = useRef<string>(crypto.randomUUID());
const subscriberRef = useRef<SubscriberData[]>([]);
const [eventSource, setEventSource] = useState<EventSource>();

const subscribe = async () => {
const topics: string[] = []
subscriberRef.current.forEach(sub => topics.push(...sub.topic))
const para = {sessionKey: uuidRef.current, topics: topics}
await postAll('api/sse/subscribeTopic', para)
}
const topics: string[] = [];
subscriberRef.current.forEach((sub) => topics.push(...sub.topic));
const para = { sessionKey: uuidRef.current, topics: topics };
await postAll('api/sse/subscribeTopic', para);
};

const reconnectSse = () => {
const sseUrl = '/api/sse/connect?sessionKey=' + uuidRef.current;
eventSource?.close();
setEventSource(new EventSource(sseUrl))
}
setEventSource(new EventSource(sseUrl));
};

useEffect(() => {
reconnectSse();
Expand All @@ -58,26 +57,25 @@ export default () => {
eventSource.onmessage = (e) => {
const data: SseData = JSON.parse(e.data);
subscriberRef.current
.filter(sub => sub.topic.includes(data.topic))
.forEach(sub => sub.call(data));
}
.filter((sub) => sub.topic.includes(data.topic))
.forEach((sub) => sub.call(data));
};
}
}, [eventSource]);

const subscribeTopic = (topic: string[], onMessage: (data: SseData) => void) => {
const sub:SubscriberData = {topic: topic, call: onMessage}
subscriberRef.current = [...subscriberRef.current,sub];
subscribe()
const sub: SubscriberData = { topic: topic, call: onMessage };
subscriberRef.current = [...subscriberRef.current, sub];
subscribe();
return () => {
//组件卸载回调方法,取消订阅此topic
subscriberRef.current = subscriberRef.current.filter(item => item !== sub);
subscribe()
}
}
subscriberRef.current = subscriberRef.current.filter((item) => item !== sub);
subscribe();
};
};

return {
subscribeTopic,
reconnectSse
}

}
};
};
Loading

0 comments on commit aba7120

Please sign in to comment.