diff --git a/pom.xml b/pom.xml
index c20f40d..4b7cd2a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
top.meethigher
tcp-reverse-proxy
- 1.0.5
+ 1.0.6
8
diff --git a/src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java b/src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java
index bdde133..de0878f 100644
--- a/src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java
+++ b/src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java
@@ -289,12 +289,12 @@ protected Object getRouteMetadata(Route route, String key) {
* @param value 数据值
*/
protected void setContextData(RoutingContext ctx, String key, Object value) {
- ctx.put(key, value == null ? "" : value);
+ ctx.put(key, value == null ? "null" : value);
}
protected Object getContextData(RoutingContext ctx, String key) {
Object metadata = ctx.get(key);
- return metadata == null ? "" : metadata;
+ return metadata == null ? "null" : metadata;
}
protected HttpServerResponse setStatusCode(RoutingContext ctx, HttpServerResponse resp, int code) {
@@ -630,14 +630,14 @@ protected Handler> connectHandler(RoutingContext
setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, true);
// 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭
- setContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR, clientReq.connection().localAddress().toString());
- setContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR, clientReq.connection().remoteAddress().toString());
- log.debug("{} --> {} connected", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
-
+ HttpConnection connection = clientReq.connection();
+ setContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR, connection.localAddress().toString());
+ setContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR, connection.remoteAddress().toString());
+ log.debug("target {} -- {} connected", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
- clientReq.connection().closeHandler(v -> {
+ connection.closeHandler(v -> {
setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, false);
- log.debug("{} --> {} closed", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
+ log.debug("target {} -- {} closed", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR));
});
@@ -645,12 +645,9 @@ protected Handler> connectHandler(RoutingContext
copyRequestHeaders(ctx, serverReq, clientReq);
if ((boolean) getContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN) && (boolean) getContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN)) {
- // 若存在请求体,则将请求体复制。使用流式复制,避免占用大量内存
- if (clientReq.headers().contains("Content-Length") || clientReq.headers().contains("Transfer-Encoding")) {
- clientReq.send(serverReq).onComplete(sendRequestHandler(ctx, serverReq, serverResp, proxyUrl));
- } else {
- clientReq.send().onComplete(sendRequestHandler(ctx, serverReq, serverResp, proxyUrl));
- }
+ // bug: https://github.com/meethigher/tcp-reverse-proxy/issues/13
+ // 解决办法: 不管是否有请求体,都直接send pipeto。
+ clientReq.send(serverReq).onComplete(sendRequestHandler(ctx, serverReq, serverResp, proxyUrl));
}
} else {
badGateway(ctx, serverResp);
@@ -679,6 +676,15 @@ protected Handler routingContextHandler(HttpClient httpClient) {
// 暂停流读取
ctx.request().pause();
+ HttpConnection connection = ctx.request().connection();
+ setContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR, connection.remoteAddress().toString());
+ setContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR, connection.localAddress().toString());
+ // 记录请求开始时间
+ setContextData(ctx, INTERNAL_SEND_TIMESTAMP, System.currentTimeMillis());
+ // 记录连接状态
+ setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, true);
+ log.debug("source {} -- {} connected", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
+
// vertx的uri()是包含query参数的。而path()才是我们常说的不带有query的uri
// route不是线程安全的。route里的metadata应以路由为单元存储,而不是以请求为单元存储。一个路由会有很多请求。
// 若想要以请求为单元存储数据,应该使用routingContext.put
@@ -687,11 +693,6 @@ protected Handler routingContextHandler(HttpClient httpClient) {
setContextData(ctx, key, ctx.currentRoute().getMetadata(key));
}
- // 记录请求开始时间
- setContextData(ctx, INTERNAL_SEND_TIMESTAMP, System.currentTimeMillis());
- // 记录连接状态
- setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, true);
-
// 获取代理地址
String proxyUrl = getProxyUrl(ctx, ctx.request(), ctx.response());
setContextData(ctx, INTERNAL_PROXY_URL, proxyUrl);
@@ -707,15 +708,9 @@ protected Handler routingContextHandler(HttpClient httpClient) {
requestOptions.setMethod(ctx.request().method());
requestOptions.setFollowRedirects(getContextData(ctx, P_FOLLOW_REDIRECTS) != null && Boolean.parseBoolean(getContextData(ctx, P_FOLLOW_REDIRECTS).toString()));
- // 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭
- setContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR, ctx.request().connection().remoteAddress().toString());
- setContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR, ctx.request().connection().localAddress().toString());
-
- log.debug("{} <-- {} connected", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
-
- ctx.request().connection().closeHandler(v -> {
+ connection.closeHandler(v -> {
setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, false);
- log.debug("{} <-- {} closed", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
+ log.debug("source {} -- {} closed", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR));
});
// 如果跨域由代理服务接管,那么针对跨域使用的OPTIONS预检请求,就由代理服务接管,而不经过实际的后端服务
diff --git a/src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java b/src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java
index 7662988..911b203 100644
--- a/src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java
+++ b/src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java
@@ -48,47 +48,41 @@ protected ReverseTcpProxy(NetServer netServer, NetClient netClient,
sourceSocket.pause();
SocketAddress sourceRemote = sourceSocket.remoteAddress();
SocketAddress sourceLocal = sourceSocket.localAddress();
- log.debug("{} <-- {} connected", sourceLocal, sourceRemote);
- sourceSocket.closeHandler(v -> log.debug("{} <-- {} closed", sourceLocal, sourceRemote));
- netClient.connect(targetPort, targetHost).onComplete(ar -> {
- if (ar.succeeded()) {
- NetSocket targetSocket = ar.result();
- targetSocket.pause();
- SocketAddress targetRemote = targetSocket.remoteAddress();
- SocketAddress targetLocal = targetSocket.localAddress();
- log.debug("{} --> {} connected", targetLocal, targetRemote);
- // feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
- // 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
- targetSocket.closeHandler(v -> log.debug("{} --> {} closed", targetLocal, targetRemote));
- sourceSocket.pipeTo(targetSocket).onComplete(ar1 -> {
- if (ar1.succeeded()) {
- log.debug("pipeTo successful. {} --> {} --> {} --> {}",
- sourceRemote, sourceLocal, targetLocal, targetRemote);
- } else {
- log.error("pipeTo failed. {} --> {} --> {} --> {}",
- sourceRemote, sourceLocal, targetLocal, targetRemote,
- ar1.cause());
- }
+ log.debug("source {} -- {} connected", sourceLocal, sourceRemote);
+ sourceSocket.closeHandler(v -> log.debug("source {} -- {} closed", sourceLocal, sourceRemote));
+ netClient.connect(targetPort, targetHost)
+ .onFailure(e -> {
+ log.error("failed to connect to {}:{}", targetHost, targetPort, e);
+ // 若连接目标服务失败,需要断开源头服务
+ sourceSocket.close();
+ })
+ .onSuccess(targetSocket -> {
+ targetSocket.pause();
+ SocketAddress targetRemote = targetSocket.remoteAddress();
+ SocketAddress targetLocal = targetSocket.localAddress();
+ log.debug("target {} -- {} connected", targetLocal, targetRemote);
+
+ // feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
+ // 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
+ targetSocket.closeHandler(v -> log.debug("target {} -- {} closed", targetLocal, targetRemote));
+
+ // https://github.com/meethigher/tcp-reverse-proxy/issues/12
+ // 将日志记录详细,便于排查问题
+ sourceSocket.pipeTo(targetSocket)
+ .onSuccess(v -> log.debug("source {} -- {} pipe to target {} -- {} succeeded",
+ sourceLocal, sourceRemote, targetLocal, targetRemote))
+ .onFailure(e -> log.error("source {} -- {} pipe to target {} -- {} failed",
+ sourceLocal, sourceRemote, targetLocal, targetRemote, e));
+ targetSocket.pipeTo(sourceSocket)
+ .onSuccess(v -> log.debug("target {} -- {} pipe to source {} -- {} succeeded",
+ targetLocal, targetRemote, sourceLocal, sourceRemote))
+ .onFailure(e -> log.error("target {} -- {} pipe to source {} -- {} failed",
+ targetLocal, targetRemote, sourceLocal, sourceRemote, e));
+ log.debug("source {} -- {} bound to target {} -- {}",
+ sourceLocal, sourceRemote, targetLocal, targetRemote);
+ sourceSocket.resume();
+ targetSocket.resume();
});
- targetSocket.pipeTo(sourceSocket).onComplete(ar1 -> {
- if (ar1.succeeded()) {
- log.debug("pipeTo successful. {} <-- {} <-- {} <-- {}",
- sourceRemote, sourceLocal, targetLocal, targetRemote);
- } else {
- log.error("pipeTo failed. {} <-- {} <-- {} <-- {}",
- sourceRemote, sourceLocal, targetLocal, targetRemote,
- ar1.cause());
- }
- });
- sourceSocket.resume();
- targetSocket.resume();
-
- } else {
- log.error("failed to connect to {}:{}", targetHost, targetPort, ar.cause());
- // 若连接目标服务失败,需要断开源头服务
- sourceSocket.close();
- }
- });
};
}
diff --git a/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java b/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java
index 891ddc9..67a94a1 100644
--- a/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java
+++ b/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java
@@ -214,6 +214,7 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
if (buf.length() < 8) {
return;
}
+ // note: 前8个字节是tunnel通信使用的。
if (buf.getByte(0) == Tunnel.DATA_CONN_FLAG[0]
&& buf.getByte(1) == Tunnel.DATA_CONN_FLAG[1]
&& buf.getByte(2) == Tunnel.DATA_CONN_FLAG[2]
@@ -232,30 +233,46 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
})
.onSuccess(backendSocket -> {
backendSocket.pause();
+ // 若实际数据传输的长度大于8字节,那么后面的字节需要发出去。
+ // https://github.com/meethigher/tcp-reverse-proxy/issues/9
+ if (buf.length() > 8) {
+ backendSocket.write(buf.getBuffer(8, buf.length()))
+ .onSuccess(o -> log.debug("{}: sessionId {}, data connection {} -- {} write to backend connection {} -- {} succeeded",
+ dataProxyName,
+ sessionId,
+ dataSocket.remoteAddress(), dataSocket.localAddress(),
+ backendSocket.remoteAddress(), backendSocket.localAddress()));
+ }
log.debug("{}: sessionId {}, backend connection {} -- {} established", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress());
// 双向生命周期绑定、双向数据转发
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
- dataSocket.closeHandler(v -> {
- log.debug("{}: sessionId {}, data connection {} -- {} closed", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress());
- }).pipeTo(backendSocket).onFailure(e -> {
- log.error("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} failed",
- dataProxyName,
- sessionId,
- dataSocket.remoteAddress(), dataSocket.localAddress(),
- backendSocket.remoteAddress(), backendSocket.localAddress(),
- e);
- });
- backendSocket.closeHandler(v -> {
- log.debug("{}: sessionId {}, backend connection {} -- {} closed", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress());
- }).pipeTo(dataSocket).onFailure(e -> {
- log.error("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} failed",
- dataProxyName,
- sessionId,
- backendSocket.remoteAddress(), backendSocket.localAddress(),
- dataSocket.remoteAddress(), dataSocket.localAddress(),
- e);
- });
+ dataSocket.closeHandler(v -> log.debug("{}: sessionId {}, data connection {} -- {} closed", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress()))
+ .pipeTo(backendSocket)
+ .onFailure(e -> log.error("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} failed",
+ dataProxyName,
+ sessionId,
+ dataSocket.remoteAddress(), dataSocket.localAddress(),
+ backendSocket.remoteAddress(), backendSocket.localAddress(),
+ e))
+ .onSuccess(v -> log.debug("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} succeeded",
+ dataProxyName,
+ sessionId,
+ dataSocket.remoteAddress(), dataSocket.localAddress(),
+ backendSocket.remoteAddress(), backendSocket.localAddress()));
+ backendSocket.closeHandler(v -> log.debug("{}: sessionId {}, backend connection {} -- {} closed", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress()))
+ .pipeTo(dataSocket)
+ .onFailure(e -> log.error("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} failed",
+ dataProxyName,
+ sessionId,
+ backendSocket.remoteAddress(), backendSocket.localAddress(),
+ dataSocket.remoteAddress(), dataSocket.localAddress(),
+ e))
+ .onSuccess(v -> log.debug("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} succeeded",
+ dataProxyName,
+ sessionId,
+ backendSocket.remoteAddress(), backendSocket.localAddress(),
+ dataSocket.remoteAddress(), dataSocket.localAddress()));
backendSocket.resume();
dataSocket.resume();
log.debug("{}: sessionId {}, data connection {} -- {} bound to backend connection {} -- {} for session id {}",
diff --git a/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java b/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java
index f48a46b..459ac2d 100644
--- a/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java
+++ b/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java
@@ -94,7 +94,7 @@ protected void handleConnect(NetSocket socket) {
socket.pause();
socket.handler(decode(socket));
socket.closeHandler(v -> {
- log.debug("closed {} -- {}", socket.remoteAddress(), socket.localAddress());
+ log.debug("{} -- {} closed", socket.remoteAddress(), socket.localAddress());
DataProxyServer removed = authedSockets.remove(socket);
if (removed != null) {
removed.stop();
@@ -301,25 +301,29 @@ protected void bindConnections(UserConnection userConn, NetSocket dataSocket, in
// 双向生命周期绑定、双向数据转发
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
- userSocket.closeHandler(v -> {
- log.debug("{}: sessionId {}, user connection {} -- {} closed", name, sessionId, userSocket.remoteAddress(), userSocket.localAddress());
- }).pipeTo(dataSocket).onFailure(e -> {
- log.error("{}: sessionId {}, user connection {} -- {} pipe to data connection {} -- {} failed, connection will be closed",
- name,
- sessionId,
- userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress(), e);
- });
- dataSocket.closeHandler(v -> {
- log.debug("{}: sessionId {}, data connection {} -- {} closed",
- name,
- sessionId,
- dataSocket.remoteAddress(), dataSocket.localAddress());
- }).pipeTo(userSocket).onFailure(e -> {
- log.error("{}: sessionId {}, data connection {} -- {} pipe to user connection {} -- {} failed, connection will be closed",
- name,
- sessionId,
- dataSocket.remoteAddress(), dataSocket.localAddress(), userSocket.remoteAddress(), userSocket.localAddress(), e);
- });
+ userSocket.closeHandler(v -> log.debug("{}: sessionId {}, user connection {} -- {} closed", name, sessionId, userSocket.remoteAddress(), userSocket.localAddress()))
+ .pipeTo(dataSocket)
+ .onFailure(e -> log.error("{}: sessionId {}, user connection {} -- {} pipe to data connection {} -- {} failed",
+ name,
+ sessionId,
+ userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress(), e))
+ .onSuccess(v -> log.debug("{}: sessionId {}, user connection {} -- {} pipe to data connection {} -- {} succeeded",
+ name,
+ sessionId,
+ userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress()));
+ dataSocket.closeHandler(v -> log.debug("{}: sessionId {}, data connection {} -- {} closed",
+ name,
+ sessionId,
+ dataSocket.remoteAddress(), dataSocket.localAddress()))
+ .pipeTo(userSocket)
+ .onFailure(e -> log.error("{}: sessionId {}, data connection {} -- {} pipe to user connection {} -- {} failed",
+ name,
+ sessionId,
+ dataSocket.remoteAddress(), dataSocket.localAddress(), userSocket.remoteAddress(), userSocket.localAddress(), e))
+ .onSuccess(v -> log.debug("{}: sessionId {}, data connection {} -- {} pipe to user connection {} -- {} succeeded",
+ name,
+ sessionId,
+ dataSocket.remoteAddress(), dataSocket.localAddress(), userSocket.remoteAddress(), userSocket.localAddress()));
log.debug("{}: sessionId {}, data connection {} -- {} bound to user connection {} -- {} for session id {}",
name,
sessionId,
@@ -331,7 +335,11 @@ protected void bindConnections(UserConnection userConn, NetSocket dataSocket, in
.appendBytes(DATA_CONN_FLAG)
.appendInt(sessionId)).onSuccess(v -> {
// 将用户连接中的缓存数据发出。
- userConn.buffers.forEach(dataSocket::write);
+ userConn.buffers.forEach(b -> dataSocket.write(b)
+ .onSuccess(o -> log.debug("{}: sessionId {}, user connection {} -- {} write to data connection {} -- {} succeeded",
+ name,
+ sessionId,
+ userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress())));
});
}
@@ -410,7 +418,7 @@ public boolean stopSync() {
*/
protected void addMessageHandler() {
// 监听连接成功事件
- this.onConnected((vertx1, netSocket, buffer) -> log.debug("{} connected", netSocket.remoteAddress()));
+ this.onConnected((vertx1, netSocket, buffer) -> log.debug("{} -- {} connected", netSocket.remoteAddress(), netSocket.localAddress()));
// 监听心跳事件
this.on(TunnelMessageType.HEARTBEAT, new AbstractTunnelHandler() {
diff --git a/src/test/java/top/meethigher/proxy/Pipe.java b/src/test/java/top/meethigher/proxy/Pipe.java
new file mode 100644
index 0000000..be104d6
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/Pipe.java
@@ -0,0 +1,202 @@
+package top.meethigher.proxy;
+
+import io.vertx.core.*;
+import io.vertx.core.http.HttpClientResponse;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.NetSocket;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 在io.vertx.core.streams.impl.PipeImpl
+ */
+public class Pipe implements io.vertx.core.streams.Pipe {
+
+ private static final Logger log = LoggerFactory.getLogger(Pipe.class);
+ private final Promise result;
+ private final ReadStream src;
+ private boolean endOnSuccess = true;
+ private boolean endOnFailure = true;
+ private WriteStream dst;
+
+ public Pipe(ReadStream src) {
+ this.src = src;
+ this.result = Promise.promise();
+ if (src instanceof NetSocket) {
+ NetSocket tsrc = (NetSocket) src;
+ tsrc.remoteAddress();
+ tsrc.localAddress();
+ } else if (src instanceof HttpServerRequest) {
+ HttpServerRequest req = (HttpServerRequest) src;
+ req.connection().remoteAddress();
+ req.connection().localAddress();
+ } else if (src instanceof HttpClientResponse) {
+ HttpClientResponse resp = (HttpClientResponse) src;
+ resp.request().connection().remoteAddress();
+ resp.request().connection().localAddress();
+ }
+
+ // Set handlers now
+ src.endHandler(result::tryComplete);
+ src.exceptionHandler(result::tryFail);
+ }
+
+ @Override
+ public synchronized io.vertx.core.streams.Pipe endOnFailure(boolean end) {
+ endOnFailure = end;
+ return this;
+ }
+
+ @Override
+ public synchronized io.vertx.core.streams.Pipe endOnSuccess(boolean end) {
+ endOnSuccess = end;
+ return this;
+ }
+
+ @Override
+ public synchronized io.vertx.core.streams.Pipe endOnComplete(boolean end) {
+ endOnSuccess = end;
+ endOnFailure = end;
+ return this;
+ }
+
+ private void handleWriteResult(AsyncResult ack) {
+ if (ack.failed()) {
+ result.tryFail(new WriteException(ack.cause()));
+ }
+ }
+
+ @Override
+ public void to(WriteStream ws, Handler> completionHandler) {
+ if (ws == null) {
+ throw new NullPointerException();
+ }
+ boolean endOnSuccess;
+ boolean endOnFailure;
+ synchronized (Pipe.this) {
+ if (dst != null) {
+ throw new IllegalStateException();
+ }
+ dst = ws;
+ endOnSuccess = this.endOnSuccess;
+ endOnFailure = this.endOnFailure;
+ }
+ Handler drainHandler = v -> src.resume();
+ src.handler(item -> {
+ if (src instanceof NetSocket) {
+ NetSocket tsrc = (NetSocket) src;
+ log.trace("{} -- {} received:\n{}", tsrc.remoteAddress(), tsrc.localAddress(),
+ item.toString());
+ } else if (src instanceof HttpServerRequest) {
+ HttpServerRequest req = (HttpServerRequest) src;
+ log.trace("{} -- {} received:\n{}",
+ req.connection().remoteAddress(),
+ req.connection().localAddress(),
+ item.toString());
+ } else if (src instanceof HttpClientResponse) {
+ HttpClientResponse resp = (HttpClientResponse) src;
+ log.trace("{} -- {} received:\n{}",
+ resp.request().connection().remoteAddress(),
+ resp.request().connection().localAddress(),
+ item.toString());
+ }
+ ws.write(item, this::handleWriteResult);
+ if (ws.writeQueueFull()) {
+ src.pause();
+ ws.drainHandler(drainHandler);
+ }
+ });
+ src.resume();
+ result.future().onComplete(ar -> {
+ try {
+ src.handler(null);
+ } catch (Exception ignore) {
+ }
+ try {
+ src.exceptionHandler(null);
+ } catch (Exception ignore) {
+ }
+ try {
+ src.endHandler(null);
+ } catch (Exception ignore) {
+ }
+ if (ar.succeeded()) {
+ handleSuccess(completionHandler);
+ } else {
+ Throwable err = ar.cause();
+ if (err instanceof WriteException) {
+ src.resume();
+ err = err.getCause();
+ }
+ handleFailure(err, completionHandler);
+ }
+ });
+ }
+
+ private void handleSuccess(Handler> completionHandler) {
+ if (endOnSuccess) {
+ dst.end(completionHandler);
+ } else {
+ completionHandler.handle(Future.succeededFuture());
+ }
+ }
+
+ private void handleFailure(Throwable cause, Handler> completionHandler) {
+ Future res = Future.failedFuture(cause);
+ if (endOnFailure) {
+ dst.end(ignore -> {
+ completionHandler.handle(res);
+ });
+ } else {
+ completionHandler.handle(res);
+ }
+ }
+
+ public void close() {
+ synchronized (this) {
+ src.exceptionHandler(null);
+ src.handler(null);
+ if (dst != null) {
+ dst.drainHandler(null);
+ dst.exceptionHandler(null);
+ }
+ }
+ VertxException err = new VertxException("Pipe closed", true);
+ if (result.tryFail(err)) {
+ src.resume();
+ }
+ }
+
+ private static class WriteException extends VertxException {
+ private WriteException(Throwable cause) {
+ super(cause, true);
+ }
+ }
+
+ /**
+ * 转换为八进制字符串
+ * Wireshark默认抓包时,使用的就是八进制字符串
+ */
+ private static String toOctString(byte[] buf) {
+ StringBuilder sb = new StringBuilder();
+ for (byte value : buf) {
+ int b = value & 0xFF;// 转为无符号字节
+ sb.append(String.format("%03o ", b));// ddd格式。每个d表示一个0-7的数字
+ }
+ return sb.toString();
+ }
+
+ /**
+ * 转换为16进制字符串
+ */
+ private static String toHexString(byte[] buf) {
+ StringBuilder sb = new StringBuilder();
+ for (byte value : buf) {
+ int b = value & 0xFF;//转为无符号字节
+ sb.append(String.format("%02x ", b));// 两位十六进制,不足补0
+ }
+ return sb.toString();
+ }
+}
diff --git a/src/test/java/top/meethigher/proxy/http/Issue13Test.java b/src/test/java/top/meethigher/proxy/http/Issue13Test.java
new file mode 100644
index 0000000..7af747e
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/http/Issue13Test.java
@@ -0,0 +1,109 @@
+package top.meethigher.proxy.http;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.http.*;
+import io.vertx.ext.web.Router;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.locks.LockSupport;
+
+public class Issue13Test {
+
+ private static final Vertx vertx = Vertx.vertx();
+ private static final Logger log = LoggerFactory.getLogger(Issue13Test.class);
+
+ @Test
+ public void name() {
+
+ HttpServerOptions httpServerOptions = new HttpServerOptions()
+ // 服务端支持与客户端进行协商,支持通过alpn用于协商客户端和服务端使用http1.1还是http2
+ // 开启h2c,使其支持http2,默认情况下http2只在开启了tls使用。如果不开启tls还想使用http2,那么需要开启h2c
+ // alpn基于tls,若未开启tls,则不支持alpn
+ .setAlpnVersions(Collections.unmodifiableList(Arrays.asList(HttpVersion.HTTP_1_1, HttpVersion.HTTP_2)))
+ .setUseAlpn(true)
+ .setHttp2ClearTextEnabled(true);
+ HttpServer server = vertx.createHttpServer(httpServerOptions);
+
+ HttpClientOptions httpClientOptions = new HttpClientOptions()
+ // 设置客户端默认使用的HTTP协议版本是http1.1,并且开启alpn支持协商http1.1和http2
+ // alpn基于tls,若对方没有开启tls,则不支持alpn
+ .setProtocolVersion(HttpVersion.HTTP_1_1)
+ .setUseAlpn(true)
+ .setAlpnVersions(new ArrayList() {{
+ add(HttpVersion.HTTP_1_1);
+ add(HttpVersion.HTTP_2);
+ }});
+ HttpClient client = vertx.createHttpClient(httpClientOptions);
+
+ ReverseHttpProxy.create(Router.router(vertx), server, client)
+ .port(18088)
+ .addRoute(new ProxyRoute()
+ .setName("proxy")
+ .setTargetUrl("http://127.0.0.1:18080")
+ .setSourceUrl("/*"))
+ .start();
+
+
+ RequestOptions requestOptions = new RequestOptions()
+ .setAbsoluteURI("http://127.0.0.1:18088/test")
+ .setMethod(HttpMethod.POST);
+
+ String body = "halo wode";
+
+ // 发送http1.1的content-length请求体
+ vertx.setTimer(4000, id -> {
+ HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_1_1));
+ httpClient.request(requestOptions).onSuccess(req -> {
+ req.send(body).onSuccess(resp -> {
+ resp.bodyHandler(buf -> {
+ log.info("{} content-length received:\n{}", resp.version(), buf.toString());
+ });
+ });
+ });
+ });
+ // 发送http1.1的transfer-encoding:chunked请求体
+ vertx.setTimer(5000, id -> {
+ HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_1_1));
+ httpClient.request(requestOptions).onSuccess(req -> {
+ req.setChunked(true);
+ req.send(body).onSuccess(resp -> {
+ resp.bodyHandler(buf -> {
+ log.info("{} transfer-encoding received:\n{}", resp.version(), buf.toString());
+ });
+ });
+ });
+ });
+
+ // 发送http2的content-length请求体
+ vertx.setTimer(6000, id -> {
+ HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2));
+ httpClient.request(requestOptions).onSuccess(req -> {
+ req.send(body).onSuccess(resp -> {
+ resp.bodyHandler(buf -> {
+ log.info("{} content-length received:\n{}", resp.version(), buf.toString());
+ });
+ });
+ });
+ });
+
+ //发送http2的transfer-encoding:chunked请求体
+ vertx.setTimer(7000, id -> {
+ HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2));
+ httpClient.request(requestOptions).onSuccess(req -> {
+ req.setChunked(true);
+ req.send(body).onSuccess(resp -> {
+ resp.bodyHandler(buf -> {
+ log.info("{} transfer-encoding received:\n{}", resp.version(), buf.toString());
+ });
+ });
+ });
+ });
+
+ LockSupport.park();
+ }
+}
diff --git a/src/test/java/top/meethigher/proxy/http/issue13/Issue13SimpleHttpServer.java b/src/test/java/top/meethigher/proxy/http/issue13/Issue13SimpleHttpServer.java
new file mode 100644
index 0000000..7ee2ca3
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/http/issue13/Issue13SimpleHttpServer.java
@@ -0,0 +1,54 @@
+package top.meethigher.proxy.http.issue13;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerOptions;
+import io.vertx.core.http.HttpVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.locks.LockSupport;
+
+public class Issue13SimpleHttpServer {
+ private static final Logger log = LoggerFactory.getLogger(Issue13SimpleHttpServer.class);
+
+ public static void main(String[] args) {
+ Vertx vertx = Vertx.vertx();
+
+
+ HttpServerOptions httpServerOptions = new HttpServerOptions()
+ // 服务端支持与客户端进行协商,支持通过alpn用于协商客户端和服务端使用http1.1还是http2
+ // 开启h2c,使其支持http2,默认情况下http2只在开启了tls使用。如果不开启tls还想使用http2,那么需要开启h2c
+ // alpn基于tls,若未开启tls,则不支持alpn
+ .setAlpnVersions(Collections.unmodifiableList(Arrays.asList(HttpVersion.HTTP_1_1, HttpVersion.HTTP_2)))
+ .setUseAlpn(true)
+ .setHttp2ClearTextEnabled(true);
+
+ HttpServer httpServer = vertx.createHttpServer();
+ httpServer.requestHandler(req -> {
+ req.pause();
+ long id = vertx.setTimer(5000, t -> {
+ req.response().setStatusCode(400).end("body not exist");
+ });
+ req.response().setChunked(true);
+ req.response().write("\nrequest headers:\n");
+ for (String key : req.headers().names()) {
+ req.response().write(key + ":" + req.headers().get(key) + "\n");
+ }
+ req.response().write("\nrequest body:\n");
+ req.bodyHandler(buf -> {
+ vertx.cancelTimer(id);
+ log.info("received:\n{}", buf.toString());
+ req.response().end(buf);
+ });
+ req.resume();
+ });
+ httpServer.listen(18080).onFailure(e -> {
+ System.exit(1);
+ });
+
+ LockSupport.park();
+ }
+}
diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue8Test.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue8Test.java
new file mode 100644
index 0000000..120cad4
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue8Test.java
@@ -0,0 +1,61 @@
+package top.meethigher.proxy.tcp.tunnel;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.net.NetClient;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class Issue8Test {
+
+ private static final Logger log = LoggerFactory.getLogger(Issue8Test.class);
+
+ @Test
+ public void test() throws Exception {
+
+ /**
+ * IDEA 多配置“批量启动”——使用 Run/Debug Configuration 的“Compound”功能
+ * 步骤如下:
+ * 打开Run/Debug Configurations窗口(快捷键 Ctrl+Alt+R / 右上角 Edit Configurations)。
+ *
+ * 新建三个 Application 类型配置,分别设置好 ClassA、ClassB、ClassC 的 main。
+ *
+ * 再新建一个Compound类型配置。
+ *
+ * 在 Compound 配置的“Run/Debug Configurations”里,添加刚刚那三个配置,顺序拖拽即可调整。
+ *
+ * 选中 Compound 配置,点绿色启动按钮,一次性批量顺序启动三(或更多)个实例。
+ *
+ * 注意:Compound 是并发同时,不是顺序启动。IDEA 会按照你添加的顺序一个一个起。
+ */
+
+ // step1: run top.meethigher.proxy.tcp.tunnel.issue8.Issue8BackendServer.main
+
+ // step2: run top.meethigher.proxy.tcp.tunnel.issue8.Issue8TunnelServer.main
+
+ // step3: run top.meethigher.proxy.tcp.tunnel.issue8.Issue8TunnelClient.main
+
+ Vertx vertx = Vertx.vertx();
+ NetClient netClient = vertx.createNetClient();
+ int total = 500;
+ CountDownLatch latch = new CountDownLatch(total);
+ for (int i = 0; i < total; i++) {
+ final String id = String.valueOf(i + 1);
+ netClient.connect(2222, "127.0.0.1").onSuccess(socket -> {
+ socket.pause();
+ socket.handler(buf -> {
+ log.info("{} received: {}", id, buf);
+ latch.countDown();
+ });
+ socket.resume();
+ });
+ }
+
+ latch.await(5, TimeUnit.SECONDS);
+ Assert.assertEquals(total, total - latch.getCount());
+ }
+}
diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue9Test.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue9Test.java
new file mode 100644
index 0000000..ae11530
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue9Test.java
@@ -0,0 +1,67 @@
+package top.meethigher.proxy.tcp.tunnel;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.http.PoolOptions;
+import io.vertx.core.http.RequestOptions;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class Issue9Test {
+ private static final Logger log = LoggerFactory.getLogger(Issue9Test.class);
+
+ @Test
+ public void test() throws Exception {
+
+ /**
+ * IDEA 多配置“批量启动”——使用 Run/Debug Configuration 的“Compound”功能
+ * 步骤如下:
+ * 打开Run/Debug Configurations窗口(快捷键 Ctrl+Alt+R / 右上角 Edit Configurations)。
+ *
+ * 新建三个 Application 类型配置,分别设置好 ClassA、ClassB、ClassC 的 main。
+ *
+ * 再新建一个Compound类型配置。
+ *
+ * 在 Compound 配置的“Run/Debug Configurations”里,添加刚刚那三个配置,顺序拖拽即可调整。
+ *
+ * 选中 Compound 配置,点绿色启动按钮,一次性批量顺序启动三(或更多)个实例。
+ *
+ * 注意:Compound 是并发同时,不是顺序启动。IDEA 会按照你添加的顺序一个一个起。
+ */
+
+ // step1: run top.meethigher.proxy.tcp.tunnel.issue9.Issue9SimpleHttpServer.main
+
+ // step2: run top.meethigher.proxy.tcp.tunnel.issue9.Issue9TunnelServer.main
+
+ // step3: run top.meethigher.proxy.tcp.tunnel.issue9.Issue9TunnelClient.main
+
+ Vertx vertx = Vertx.vertx();
+ HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions()
+ .setKeepAlive(true)
+ .setMaxPoolSize(Runtime.getRuntime().availableProcessors()),
+ new PoolOptions().setHttp1MaxSize(100));
+ int total = 500;
+ CountDownLatch latch = new CountDownLatch(total);
+ for (int i = 0; i < total; i++) {
+ final String id = String.valueOf(i + 1);
+ RequestOptions requestOptions = new RequestOptions()
+ .setAbsoluteURI("http://127.0.0.1:808/api");
+ httpClient.request(requestOptions).onSuccess(req -> {
+ req.send().onSuccess(resp -> {
+ log.info("{} succeeded", id);
+ latch.countDown();
+ });
+ });
+ }
+
+ latch.await(5, TimeUnit.SECONDS);
+ Assert.assertEquals(total, total - latch.getCount());
+
+ }
+}
diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8BackendServer.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8BackendServer.java
new file mode 100644
index 0000000..99c7b85
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8BackendServer.java
@@ -0,0 +1,26 @@
+package top.meethigher.proxy.tcp.tunnel.issue8;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.net.NetServer;
+
+public class Issue8BackendServer {
+ public static void main(String[] args) {
+ Vertx vertx = Vertx.vertx();
+ NetServer netServer = vertx.createNetServer();
+ netServer.connectHandler(socket -> {
+ socket.remoteAddress();
+ socket.write(Buffer.buffer("SSH-2.0-OpenSSH_8.7")).onComplete(ar -> {
+ System.out.println(socket.remoteAddress().toString() + " write " + ar.succeeded());
+ });
+ }).listen(23).onComplete(ar -> {
+ if (ar.succeeded()) {
+ System.out.println("Server started on port " + ar.result().actualPort());
+ } else {
+ System.err.println("Server failed to start");
+ ar.cause().printStackTrace();
+ System.exit(1);
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelClient.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelClient.java
new file mode 100644
index 0000000..f64db92
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelClient.java
@@ -0,0 +1,29 @@
+package top.meethigher.proxy.tcp.tunnel.issue8;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.net.NetClient;
+import io.vertx.core.net.NetClientOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelClient;
+
+import java.util.concurrent.TimeUnit;
+
+public class Issue8TunnelClient {
+
+ private static final Vertx vertx = Vertx.vertx();
+ private static final Logger log = LoggerFactory.getLogger(Issue8TunnelClient.class);
+
+ public static void main(String[] args) {
+ NetClient netClient = vertx.createNetClient(new NetClientOptions()
+ .setIdleTimeout(999999999)
+ .setIdleTimeoutUnit(TimeUnit.MILLISECONDS));
+ ReverseTcpProxyTunnelClient.create(vertx, netClient)
+ .dataProxyPort(2222)
+ .dataProxyHost("127.0.0.1")
+ .dataProxyName("ssh")
+ .backendHost("127.0.0.1")
+ .backendPort(23)
+ .connect("127.0.0.1", 44444);
+ }
+}
diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelServer.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelServer.java
new file mode 100644
index 0000000..84be10c
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelServer.java
@@ -0,0 +1,28 @@
+package top.meethigher.proxy.tcp.tunnel.issue8;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.net.NetServer;
+import io.vertx.core.net.NetServerOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelServer;
+
+import java.util.concurrent.TimeUnit;
+
+public class Issue8TunnelServer {
+
+ private static final Vertx vertx = Vertx.vertx();
+ private static final Logger log = LoggerFactory.getLogger(Issue8TunnelServer.class);
+
+ public static void main(String[] args) {
+ NetServer netServer = vertx.createNetServer(
+ new NetServerOptions()
+ .setIdleTimeout(999999999)
+ .setIdleTimeoutUnit(TimeUnit.MILLISECONDS));
+ ReverseTcpProxyTunnelServer.create(vertx, netServer)
+ .heartbeatDelay(888888888)
+ .judgeDelay(50)
+ .port(44444)
+ .start();
+ }
+}
diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9SimpleHttpServer.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9SimpleHttpServer.java
new file mode 100644
index 0000000..5563d3f
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9SimpleHttpServer.java
@@ -0,0 +1,37 @@
+package top.meethigher.proxy.tcp.tunnel.issue9;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.core.http.HttpServerResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Issue9SimpleHttpServer {
+
+ private static final Logger log = LoggerFactory.getLogger(Issue9SimpleHttpServer.class);
+ private static final Vertx vertx = Vertx.vertx();
+
+ public static void main(String[] args) {
+ final String result = "novnc2
";
+ HttpServer httpServer = vertx.createHttpServer();
+ httpServer.requestHandler(req -> {
+ String address = req.connection().remoteAddress().toString();
+ log.info("{} connected", address);
+ req.connection().closeHandler(v -> {
+ log.info("{} closed", address);
+ });
+ HttpServerResponse response = req.response();
+ response.putHeader("Server", "nginx/1.18.0 (Ubuntu)");
+ response.putHeader("Date", "Mon, 02 Jun 2025 07:22:44 GMT");
+ response.putHeader("Content-Type", "text/html");
+ response.putHeader("Content-Length", "512");
+ response.putHeader("Last-Modified", "Sun, 04 Apr 2021 03:44:30 GMT");
+ response.putHeader("ETag", "\"6069361e-200\"");
+ response.putHeader("Accept-Ranges", "bytes");
+ response.setStatusCode(200).end(result);
+ }).listen(80).onFailure(e -> {
+ log.error("http server start failed", e);
+ System.exit(1);
+ });
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelClient.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelClient.java
new file mode 100644
index 0000000..fe0d393
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelClient.java
@@ -0,0 +1,29 @@
+package top.meethigher.proxy.tcp.tunnel.issue9;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.net.NetClient;
+import io.vertx.core.net.NetClientOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelClient;
+
+import java.util.concurrent.TimeUnit;
+
+public class Issue9TunnelClient {
+
+ private static final Vertx vertx = Vertx.vertx();
+ private static final Logger log = LoggerFactory.getLogger(Issue9TunnelClient.class);
+
+ public static void main(String[] args) {
+ NetClient netClient = vertx.createNetClient(new NetClientOptions()
+ .setIdleTimeout(999999999)
+ .setIdleTimeoutUnit(TimeUnit.MILLISECONDS));
+ ReverseTcpProxyTunnelClient.create(vertx, netClient)
+ .dataProxyPort(808)
+ .dataProxyHost("127.0.0.1")
+ .dataProxyName("http")
+ .backendHost("127.0.0.1")
+ .backendPort(80)
+ .connect("127.0.0.1", 44444);
+ }
+}
diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelServer.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelServer.java
new file mode 100644
index 0000000..6239d20
--- /dev/null
+++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelServer.java
@@ -0,0 +1,28 @@
+package top.meethigher.proxy.tcp.tunnel.issue9;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.net.NetServer;
+import io.vertx.core.net.NetServerOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelServer;
+
+import java.util.concurrent.TimeUnit;
+
+public class Issue9TunnelServer {
+
+ private static final Vertx vertx = Vertx.vertx();
+ private static final Logger log = LoggerFactory.getLogger(Issue9TunnelServer.class);
+
+ public static void main(String[] args) {
+ NetServer netServer = vertx.createNetServer(
+ new NetServerOptions()
+ .setIdleTimeout(999999999)
+ .setIdleTimeoutUnit(TimeUnit.MILLISECONDS));
+ ReverseTcpProxyTunnelServer.create(vertx, netServer)
+ .heartbeatDelay(888888888)
+ .judgeDelay(50)
+ .port(44444)
+ .start();
+ }
+}