diff --git a/pom.xml b/pom.xml index c20f40d..4b7cd2a 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ top.meethigher tcp-reverse-proxy - 1.0.5 + 1.0.6 8 diff --git a/src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java b/src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java index bdde133..de0878f 100644 --- a/src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java +++ b/src/main/java/top/meethigher/proxy/http/ReverseHttpProxy.java @@ -289,12 +289,12 @@ protected Object getRouteMetadata(Route route, String key) { * @param value 数据值 */ protected void setContextData(RoutingContext ctx, String key, Object value) { - ctx.put(key, value == null ? "" : value); + ctx.put(key, value == null ? "null" : value); } protected Object getContextData(RoutingContext ctx, String key) { Object metadata = ctx.get(key); - return metadata == null ? "" : metadata; + return metadata == null ? "null" : metadata; } protected HttpServerResponse setStatusCode(RoutingContext ctx, HttpServerResponse resp, int code) { @@ -630,14 +630,14 @@ protected Handler> connectHandler(RoutingContext setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, true); // 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭 - setContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR, clientReq.connection().localAddress().toString()); - setContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR, clientReq.connection().remoteAddress().toString()); - log.debug("{} --> {} connected", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR)); - + HttpConnection connection = clientReq.connection(); + setContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR, connection.localAddress().toString()); + setContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR, connection.remoteAddress().toString()); + log.debug("target {} -- {} connected", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR)); - clientReq.connection().closeHandler(v -> { + connection.closeHandler(v -> { setContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN, false); - log.debug("{} --> {} closed", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR)); + log.debug("target {} -- {} closed", getContextData(ctx, INTERNAL_CLIENT_LOCAL_ADDR), getContextData(ctx, INTERNAL_CLIENT_REMOTE_ADDR)); }); @@ -645,12 +645,9 @@ protected Handler> connectHandler(RoutingContext copyRequestHeaders(ctx, serverReq, clientReq); if ((boolean) getContextData(ctx, INTERNAL_CLIENT_CONNECTION_OPEN) && (boolean) getContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN)) { - // 若存在请求体,则将请求体复制。使用流式复制,避免占用大量内存 - if (clientReq.headers().contains("Content-Length") || clientReq.headers().contains("Transfer-Encoding")) { - clientReq.send(serverReq).onComplete(sendRequestHandler(ctx, serverReq, serverResp, proxyUrl)); - } else { - clientReq.send().onComplete(sendRequestHandler(ctx, serverReq, serverResp, proxyUrl)); - } + // bug: https://github.com/meethigher/tcp-reverse-proxy/issues/13 + // 解决办法: 不管是否有请求体,都直接send pipeto。 + clientReq.send(serverReq).onComplete(sendRequestHandler(ctx, serverReq, serverResp, proxyUrl)); } } else { badGateway(ctx, serverResp); @@ -679,6 +676,15 @@ protected Handler routingContextHandler(HttpClient httpClient) { // 暂停流读取 ctx.request().pause(); + HttpConnection connection = ctx.request().connection(); + setContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR, connection.remoteAddress().toString()); + setContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR, connection.localAddress().toString()); + // 记录请求开始时间 + setContextData(ctx, INTERNAL_SEND_TIMESTAMP, System.currentTimeMillis()); + // 记录连接状态 + setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, true); + log.debug("source {} -- {} connected", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR)); + // vertx的uri()是包含query参数的。而path()才是我们常说的不带有query的uri // route不是线程安全的。route里的metadata应以路由为单元存储,而不是以请求为单元存储。一个路由会有很多请求。 // 若想要以请求为单元存储数据,应该使用routingContext.put @@ -687,11 +693,6 @@ protected Handler routingContextHandler(HttpClient httpClient) { setContextData(ctx, key, ctx.currentRoute().getMetadata(key)); } - // 记录请求开始时间 - setContextData(ctx, INTERNAL_SEND_TIMESTAMP, System.currentTimeMillis()); - // 记录连接状态 - setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, true); - // 获取代理地址 String proxyUrl = getProxyUrl(ctx, ctx.request(), ctx.response()); setContextData(ctx, INTERNAL_PROXY_URL, proxyUrl); @@ -707,15 +708,9 @@ protected Handler routingContextHandler(HttpClient httpClient) { requestOptions.setMethod(ctx.request().method()); requestOptions.setFollowRedirects(getContextData(ctx, P_FOLLOW_REDIRECTS) != null && Boolean.parseBoolean(getContextData(ctx, P_FOLLOW_REDIRECTS).toString())); - // 注册客户端与代理服务之间连接的断开监听事件。可监听主动关闭和被动关闭 - setContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR, ctx.request().connection().remoteAddress().toString()); - setContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR, ctx.request().connection().localAddress().toString()); - - log.debug("{} <-- {} connected", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR)); - - ctx.request().connection().closeHandler(v -> { + connection.closeHandler(v -> { setContextData(ctx, INTERNAL_SERVER_CONNECTION_OPEN, false); - log.debug("{} <-- {} closed", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR)); + log.debug("source {} -- {} closed", getContextData(ctx, INTERNAL_SERVER_LOCAL_ADDR), getContextData(ctx, INTERNAL_SERVER_REMOTE_ADDR)); }); // 如果跨域由代理服务接管,那么针对跨域使用的OPTIONS预检请求,就由代理服务接管,而不经过实际的后端服务 diff --git a/src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java b/src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java index 7662988..911b203 100644 --- a/src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java +++ b/src/main/java/top/meethigher/proxy/tcp/ReverseTcpProxy.java @@ -48,47 +48,41 @@ protected ReverseTcpProxy(NetServer netServer, NetClient netClient, sourceSocket.pause(); SocketAddress sourceRemote = sourceSocket.remoteAddress(); SocketAddress sourceLocal = sourceSocket.localAddress(); - log.debug("{} <-- {} connected", sourceLocal, sourceRemote); - sourceSocket.closeHandler(v -> log.debug("{} <-- {} closed", sourceLocal, sourceRemote)); - netClient.connect(targetPort, targetHost).onComplete(ar -> { - if (ar.succeeded()) { - NetSocket targetSocket = ar.result(); - targetSocket.pause(); - SocketAddress targetRemote = targetSocket.remoteAddress(); - SocketAddress targetLocal = targetSocket.localAddress(); - log.debug("{} --> {} connected", targetLocal, targetRemote); - // feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。 - // 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6 - targetSocket.closeHandler(v -> log.debug("{} --> {} closed", targetLocal, targetRemote)); - sourceSocket.pipeTo(targetSocket).onComplete(ar1 -> { - if (ar1.succeeded()) { - log.debug("pipeTo successful. {} --> {} --> {} --> {}", - sourceRemote, sourceLocal, targetLocal, targetRemote); - } else { - log.error("pipeTo failed. {} --> {} --> {} --> {}", - sourceRemote, sourceLocal, targetLocal, targetRemote, - ar1.cause()); - } + log.debug("source {} -- {} connected", sourceLocal, sourceRemote); + sourceSocket.closeHandler(v -> log.debug("source {} -- {} closed", sourceLocal, sourceRemote)); + netClient.connect(targetPort, targetHost) + .onFailure(e -> { + log.error("failed to connect to {}:{}", targetHost, targetPort, e); + // 若连接目标服务失败,需要断开源头服务 + sourceSocket.close(); + }) + .onSuccess(targetSocket -> { + targetSocket.pause(); + SocketAddress targetRemote = targetSocket.remoteAddress(); + SocketAddress targetLocal = targetSocket.localAddress(); + log.debug("target {} -- {} connected", targetLocal, targetRemote); + + // feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。 + // 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6 + targetSocket.closeHandler(v -> log.debug("target {} -- {} closed", targetLocal, targetRemote)); + + // https://github.com/meethigher/tcp-reverse-proxy/issues/12 + // 将日志记录详细,便于排查问题 + sourceSocket.pipeTo(targetSocket) + .onSuccess(v -> log.debug("source {} -- {} pipe to target {} -- {} succeeded", + sourceLocal, sourceRemote, targetLocal, targetRemote)) + .onFailure(e -> log.error("source {} -- {} pipe to target {} -- {} failed", + sourceLocal, sourceRemote, targetLocal, targetRemote, e)); + targetSocket.pipeTo(sourceSocket) + .onSuccess(v -> log.debug("target {} -- {} pipe to source {} -- {} succeeded", + targetLocal, targetRemote, sourceLocal, sourceRemote)) + .onFailure(e -> log.error("target {} -- {} pipe to source {} -- {} failed", + targetLocal, targetRemote, sourceLocal, sourceRemote, e)); + log.debug("source {} -- {} bound to target {} -- {}", + sourceLocal, sourceRemote, targetLocal, targetRemote); + sourceSocket.resume(); + targetSocket.resume(); }); - targetSocket.pipeTo(sourceSocket).onComplete(ar1 -> { - if (ar1.succeeded()) { - log.debug("pipeTo successful. {} <-- {} <-- {} <-- {}", - sourceRemote, sourceLocal, targetLocal, targetRemote); - } else { - log.error("pipeTo failed. {} <-- {} <-- {} <-- {}", - sourceRemote, sourceLocal, targetLocal, targetRemote, - ar1.cause()); - } - }); - sourceSocket.resume(); - targetSocket.resume(); - - } else { - log.error("failed to connect to {}:{}", targetHost, targetPort, ar.cause()); - // 若连接目标服务失败,需要断开源头服务 - sourceSocket.close(); - } - }); }; } diff --git a/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java b/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java index 891ddc9..67a94a1 100644 --- a/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java +++ b/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java @@ -214,6 +214,7 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t if (buf.length() < 8) { return; } + // note: 前8个字节是tunnel通信使用的。 if (buf.getByte(0) == Tunnel.DATA_CONN_FLAG[0] && buf.getByte(1) == Tunnel.DATA_CONN_FLAG[1] && buf.getByte(2) == Tunnel.DATA_CONN_FLAG[2] @@ -232,30 +233,46 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t }) .onSuccess(backendSocket -> { backendSocket.pause(); + // 若实际数据传输的长度大于8字节,那么后面的字节需要发出去。 + // https://github.com/meethigher/tcp-reverse-proxy/issues/9 + if (buf.length() > 8) { + backendSocket.write(buf.getBuffer(8, buf.length())) + .onSuccess(o -> log.debug("{}: sessionId {}, data connection {} -- {} write to backend connection {} -- {} succeeded", + dataProxyName, + sessionId, + dataSocket.remoteAddress(), dataSocket.localAddress(), + backendSocket.remoteAddress(), backendSocket.localAddress())); + } log.debug("{}: sessionId {}, backend connection {} -- {} established", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress()); // 双向生命周期绑定、双向数据转发 // feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。 // 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6 - dataSocket.closeHandler(v -> { - log.debug("{}: sessionId {}, data connection {} -- {} closed", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress()); - }).pipeTo(backendSocket).onFailure(e -> { - log.error("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} failed", - dataProxyName, - sessionId, - dataSocket.remoteAddress(), dataSocket.localAddress(), - backendSocket.remoteAddress(), backendSocket.localAddress(), - e); - }); - backendSocket.closeHandler(v -> { - log.debug("{}: sessionId {}, backend connection {} -- {} closed", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress()); - }).pipeTo(dataSocket).onFailure(e -> { - log.error("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} failed", - dataProxyName, - sessionId, - backendSocket.remoteAddress(), backendSocket.localAddress(), - dataSocket.remoteAddress(), dataSocket.localAddress(), - e); - }); + dataSocket.closeHandler(v -> log.debug("{}: sessionId {}, data connection {} -- {} closed", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress())) + .pipeTo(backendSocket) + .onFailure(e -> log.error("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} failed", + dataProxyName, + sessionId, + dataSocket.remoteAddress(), dataSocket.localAddress(), + backendSocket.remoteAddress(), backendSocket.localAddress(), + e)) + .onSuccess(v -> log.debug("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} succeeded", + dataProxyName, + sessionId, + dataSocket.remoteAddress(), dataSocket.localAddress(), + backendSocket.remoteAddress(), backendSocket.localAddress())); + backendSocket.closeHandler(v -> log.debug("{}: sessionId {}, backend connection {} -- {} closed", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress())) + .pipeTo(dataSocket) + .onFailure(e -> log.error("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} failed", + dataProxyName, + sessionId, + backendSocket.remoteAddress(), backendSocket.localAddress(), + dataSocket.remoteAddress(), dataSocket.localAddress(), + e)) + .onSuccess(v -> log.debug("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} succeeded", + dataProxyName, + sessionId, + backendSocket.remoteAddress(), backendSocket.localAddress(), + dataSocket.remoteAddress(), dataSocket.localAddress())); backendSocket.resume(); dataSocket.resume(); log.debug("{}: sessionId {}, data connection {} -- {} bound to backend connection {} -- {} for session id {}", diff --git a/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java b/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java index f48a46b..459ac2d 100644 --- a/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java +++ b/src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java @@ -94,7 +94,7 @@ protected void handleConnect(NetSocket socket) { socket.pause(); socket.handler(decode(socket)); socket.closeHandler(v -> { - log.debug("closed {} -- {}", socket.remoteAddress(), socket.localAddress()); + log.debug("{} -- {} closed", socket.remoteAddress(), socket.localAddress()); DataProxyServer removed = authedSockets.remove(socket); if (removed != null) { removed.stop(); @@ -301,25 +301,29 @@ protected void bindConnections(UserConnection userConn, NetSocket dataSocket, in // 双向生命周期绑定、双向数据转发 // feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。 // 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6 - userSocket.closeHandler(v -> { - log.debug("{}: sessionId {}, user connection {} -- {} closed", name, sessionId, userSocket.remoteAddress(), userSocket.localAddress()); - }).pipeTo(dataSocket).onFailure(e -> { - log.error("{}: sessionId {}, user connection {} -- {} pipe to data connection {} -- {} failed, connection will be closed", - name, - sessionId, - userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress(), e); - }); - dataSocket.closeHandler(v -> { - log.debug("{}: sessionId {}, data connection {} -- {} closed", - name, - sessionId, - dataSocket.remoteAddress(), dataSocket.localAddress()); - }).pipeTo(userSocket).onFailure(e -> { - log.error("{}: sessionId {}, data connection {} -- {} pipe to user connection {} -- {} failed, connection will be closed", - name, - sessionId, - dataSocket.remoteAddress(), dataSocket.localAddress(), userSocket.remoteAddress(), userSocket.localAddress(), e); - }); + userSocket.closeHandler(v -> log.debug("{}: sessionId {}, user connection {} -- {} closed", name, sessionId, userSocket.remoteAddress(), userSocket.localAddress())) + .pipeTo(dataSocket) + .onFailure(e -> log.error("{}: sessionId {}, user connection {} -- {} pipe to data connection {} -- {} failed", + name, + sessionId, + userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress(), e)) + .onSuccess(v -> log.debug("{}: sessionId {}, user connection {} -- {} pipe to data connection {} -- {} succeeded", + name, + sessionId, + userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress())); + dataSocket.closeHandler(v -> log.debug("{}: sessionId {}, data connection {} -- {} closed", + name, + sessionId, + dataSocket.remoteAddress(), dataSocket.localAddress())) + .pipeTo(userSocket) + .onFailure(e -> log.error("{}: sessionId {}, data connection {} -- {} pipe to user connection {} -- {} failed", + name, + sessionId, + dataSocket.remoteAddress(), dataSocket.localAddress(), userSocket.remoteAddress(), userSocket.localAddress(), e)) + .onSuccess(v -> log.debug("{}: sessionId {}, data connection {} -- {} pipe to user connection {} -- {} succeeded", + name, + sessionId, + dataSocket.remoteAddress(), dataSocket.localAddress(), userSocket.remoteAddress(), userSocket.localAddress())); log.debug("{}: sessionId {}, data connection {} -- {} bound to user connection {} -- {} for session id {}", name, sessionId, @@ -331,7 +335,11 @@ protected void bindConnections(UserConnection userConn, NetSocket dataSocket, in .appendBytes(DATA_CONN_FLAG) .appendInt(sessionId)).onSuccess(v -> { // 将用户连接中的缓存数据发出。 - userConn.buffers.forEach(dataSocket::write); + userConn.buffers.forEach(b -> dataSocket.write(b) + .onSuccess(o -> log.debug("{}: sessionId {}, user connection {} -- {} write to data connection {} -- {} succeeded", + name, + sessionId, + userSocket.remoteAddress(), userSocket.localAddress(), dataSocket.remoteAddress(), dataSocket.localAddress()))); }); } @@ -410,7 +418,7 @@ public boolean stopSync() { */ protected void addMessageHandler() { // 监听连接成功事件 - this.onConnected((vertx1, netSocket, buffer) -> log.debug("{} connected", netSocket.remoteAddress())); + this.onConnected((vertx1, netSocket, buffer) -> log.debug("{} -- {} connected", netSocket.remoteAddress(), netSocket.localAddress())); // 监听心跳事件 this.on(TunnelMessageType.HEARTBEAT, new AbstractTunnelHandler() { diff --git a/src/test/java/top/meethigher/proxy/Pipe.java b/src/test/java/top/meethigher/proxy/Pipe.java new file mode 100644 index 0000000..be104d6 --- /dev/null +++ b/src/test/java/top/meethigher/proxy/Pipe.java @@ -0,0 +1,202 @@ +package top.meethigher.proxy; + +import io.vertx.core.*; +import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.NetSocket; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 在io.vertx.core.streams.impl.PipeImpl + */ +public class Pipe implements io.vertx.core.streams.Pipe { + + private static final Logger log = LoggerFactory.getLogger(Pipe.class); + private final Promise result; + private final ReadStream src; + private boolean endOnSuccess = true; + private boolean endOnFailure = true; + private WriteStream dst; + + public Pipe(ReadStream src) { + this.src = src; + this.result = Promise.promise(); + if (src instanceof NetSocket) { + NetSocket tsrc = (NetSocket) src; + tsrc.remoteAddress(); + tsrc.localAddress(); + } else if (src instanceof HttpServerRequest) { + HttpServerRequest req = (HttpServerRequest) src; + req.connection().remoteAddress(); + req.connection().localAddress(); + } else if (src instanceof HttpClientResponse) { + HttpClientResponse resp = (HttpClientResponse) src; + resp.request().connection().remoteAddress(); + resp.request().connection().localAddress(); + } + + // Set handlers now + src.endHandler(result::tryComplete); + src.exceptionHandler(result::tryFail); + } + + @Override + public synchronized io.vertx.core.streams.Pipe endOnFailure(boolean end) { + endOnFailure = end; + return this; + } + + @Override + public synchronized io.vertx.core.streams.Pipe endOnSuccess(boolean end) { + endOnSuccess = end; + return this; + } + + @Override + public synchronized io.vertx.core.streams.Pipe endOnComplete(boolean end) { + endOnSuccess = end; + endOnFailure = end; + return this; + } + + private void handleWriteResult(AsyncResult ack) { + if (ack.failed()) { + result.tryFail(new WriteException(ack.cause())); + } + } + + @Override + public void to(WriteStream ws, Handler> completionHandler) { + if (ws == null) { + throw new NullPointerException(); + } + boolean endOnSuccess; + boolean endOnFailure; + synchronized (Pipe.this) { + if (dst != null) { + throw new IllegalStateException(); + } + dst = ws; + endOnSuccess = this.endOnSuccess; + endOnFailure = this.endOnFailure; + } + Handler drainHandler = v -> src.resume(); + src.handler(item -> { + if (src instanceof NetSocket) { + NetSocket tsrc = (NetSocket) src; + log.trace("{} -- {} received:\n{}", tsrc.remoteAddress(), tsrc.localAddress(), + item.toString()); + } else if (src instanceof HttpServerRequest) { + HttpServerRequest req = (HttpServerRequest) src; + log.trace("{} -- {} received:\n{}", + req.connection().remoteAddress(), + req.connection().localAddress(), + item.toString()); + } else if (src instanceof HttpClientResponse) { + HttpClientResponse resp = (HttpClientResponse) src; + log.trace("{} -- {} received:\n{}", + resp.request().connection().remoteAddress(), + resp.request().connection().localAddress(), + item.toString()); + } + ws.write(item, this::handleWriteResult); + if (ws.writeQueueFull()) { + src.pause(); + ws.drainHandler(drainHandler); + } + }); + src.resume(); + result.future().onComplete(ar -> { + try { + src.handler(null); + } catch (Exception ignore) { + } + try { + src.exceptionHandler(null); + } catch (Exception ignore) { + } + try { + src.endHandler(null); + } catch (Exception ignore) { + } + if (ar.succeeded()) { + handleSuccess(completionHandler); + } else { + Throwable err = ar.cause(); + if (err instanceof WriteException) { + src.resume(); + err = err.getCause(); + } + handleFailure(err, completionHandler); + } + }); + } + + private void handleSuccess(Handler> completionHandler) { + if (endOnSuccess) { + dst.end(completionHandler); + } else { + completionHandler.handle(Future.succeededFuture()); + } + } + + private void handleFailure(Throwable cause, Handler> completionHandler) { + Future res = Future.failedFuture(cause); + if (endOnFailure) { + dst.end(ignore -> { + completionHandler.handle(res); + }); + } else { + completionHandler.handle(res); + } + } + + public void close() { + synchronized (this) { + src.exceptionHandler(null); + src.handler(null); + if (dst != null) { + dst.drainHandler(null); + dst.exceptionHandler(null); + } + } + VertxException err = new VertxException("Pipe closed", true); + if (result.tryFail(err)) { + src.resume(); + } + } + + private static class WriteException extends VertxException { + private WriteException(Throwable cause) { + super(cause, true); + } + } + + /** + * 转换为八进制字符串 + * Wireshark默认抓包时,使用的就是八进制字符串 + */ + private static String toOctString(byte[] buf) { + StringBuilder sb = new StringBuilder(); + for (byte value : buf) { + int b = value & 0xFF;// 转为无符号字节 + sb.append(String.format("%03o ", b));// ddd格式。每个d表示一个0-7的数字 + } + return sb.toString(); + } + + /** + * 转换为16进制字符串 + */ + private static String toHexString(byte[] buf) { + StringBuilder sb = new StringBuilder(); + for (byte value : buf) { + int b = value & 0xFF;//转为无符号字节 + sb.append(String.format("%02x ", b));// 两位十六进制,不足补0 + } + return sb.toString(); + } +} diff --git a/src/test/java/top/meethigher/proxy/http/Issue13Test.java b/src/test/java/top/meethigher/proxy/http/Issue13Test.java new file mode 100644 index 0000000..7af747e --- /dev/null +++ b/src/test/java/top/meethigher/proxy/http/Issue13Test.java @@ -0,0 +1,109 @@ +package top.meethigher.proxy.http; + +import io.vertx.core.Vertx; +import io.vertx.core.http.*; +import io.vertx.ext.web.Router; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.locks.LockSupport; + +public class Issue13Test { + + private static final Vertx vertx = Vertx.vertx(); + private static final Logger log = LoggerFactory.getLogger(Issue13Test.class); + + @Test + public void name() { + + HttpServerOptions httpServerOptions = new HttpServerOptions() + // 服务端支持与客户端进行协商,支持通过alpn用于协商客户端和服务端使用http1.1还是http2 + // 开启h2c,使其支持http2,默认情况下http2只在开启了tls使用。如果不开启tls还想使用http2,那么需要开启h2c + // alpn基于tls,若未开启tls,则不支持alpn + .setAlpnVersions(Collections.unmodifiableList(Arrays.asList(HttpVersion.HTTP_1_1, HttpVersion.HTTP_2))) + .setUseAlpn(true) + .setHttp2ClearTextEnabled(true); + HttpServer server = vertx.createHttpServer(httpServerOptions); + + HttpClientOptions httpClientOptions = new HttpClientOptions() + // 设置客户端默认使用的HTTP协议版本是http1.1,并且开启alpn支持协商http1.1和http2 + // alpn基于tls,若对方没有开启tls,则不支持alpn + .setProtocolVersion(HttpVersion.HTTP_1_1) + .setUseAlpn(true) + .setAlpnVersions(new ArrayList() {{ + add(HttpVersion.HTTP_1_1); + add(HttpVersion.HTTP_2); + }}); + HttpClient client = vertx.createHttpClient(httpClientOptions); + + ReverseHttpProxy.create(Router.router(vertx), server, client) + .port(18088) + .addRoute(new ProxyRoute() + .setName("proxy") + .setTargetUrl("http://127.0.0.1:18080") + .setSourceUrl("/*")) + .start(); + + + RequestOptions requestOptions = new RequestOptions() + .setAbsoluteURI("http://127.0.0.1:18088/test") + .setMethod(HttpMethod.POST); + + String body = "halo wode"; + + // 发送http1.1的content-length请求体 + vertx.setTimer(4000, id -> { + HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_1_1)); + httpClient.request(requestOptions).onSuccess(req -> { + req.send(body).onSuccess(resp -> { + resp.bodyHandler(buf -> { + log.info("{} content-length received:\n{}", resp.version(), buf.toString()); + }); + }); + }); + }); + // 发送http1.1的transfer-encoding:chunked请求体 + vertx.setTimer(5000, id -> { + HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_1_1)); + httpClient.request(requestOptions).onSuccess(req -> { + req.setChunked(true); + req.send(body).onSuccess(resp -> { + resp.bodyHandler(buf -> { + log.info("{} transfer-encoding received:\n{}", resp.version(), buf.toString()); + }); + }); + }); + }); + + // 发送http2的content-length请求体 + vertx.setTimer(6000, id -> { + HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2)); + httpClient.request(requestOptions).onSuccess(req -> { + req.send(body).onSuccess(resp -> { + resp.bodyHandler(buf -> { + log.info("{} content-length received:\n{}", resp.version(), buf.toString()); + }); + }); + }); + }); + + //发送http2的transfer-encoding:chunked请求体 + vertx.setTimer(7000, id -> { + HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2)); + httpClient.request(requestOptions).onSuccess(req -> { + req.setChunked(true); + req.send(body).onSuccess(resp -> { + resp.bodyHandler(buf -> { + log.info("{} transfer-encoding received:\n{}", resp.version(), buf.toString()); + }); + }); + }); + }); + + LockSupport.park(); + } +} diff --git a/src/test/java/top/meethigher/proxy/http/issue13/Issue13SimpleHttpServer.java b/src/test/java/top/meethigher/proxy/http/issue13/Issue13SimpleHttpServer.java new file mode 100644 index 0000000..7ee2ca3 --- /dev/null +++ b/src/test/java/top/meethigher/proxy/http/issue13/Issue13SimpleHttpServer.java @@ -0,0 +1,54 @@ +package top.meethigher.proxy.http.issue13; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.HttpVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.locks.LockSupport; + +public class Issue13SimpleHttpServer { + private static final Logger log = LoggerFactory.getLogger(Issue13SimpleHttpServer.class); + + public static void main(String[] args) { + Vertx vertx = Vertx.vertx(); + + + HttpServerOptions httpServerOptions = new HttpServerOptions() + // 服务端支持与客户端进行协商,支持通过alpn用于协商客户端和服务端使用http1.1还是http2 + // 开启h2c,使其支持http2,默认情况下http2只在开启了tls使用。如果不开启tls还想使用http2,那么需要开启h2c + // alpn基于tls,若未开启tls,则不支持alpn + .setAlpnVersions(Collections.unmodifiableList(Arrays.asList(HttpVersion.HTTP_1_1, HttpVersion.HTTP_2))) + .setUseAlpn(true) + .setHttp2ClearTextEnabled(true); + + HttpServer httpServer = vertx.createHttpServer(); + httpServer.requestHandler(req -> { + req.pause(); + long id = vertx.setTimer(5000, t -> { + req.response().setStatusCode(400).end("body not exist"); + }); + req.response().setChunked(true); + req.response().write("\nrequest headers:\n"); + for (String key : req.headers().names()) { + req.response().write(key + ":" + req.headers().get(key) + "\n"); + } + req.response().write("\nrequest body:\n"); + req.bodyHandler(buf -> { + vertx.cancelTimer(id); + log.info("received:\n{}", buf.toString()); + req.response().end(buf); + }); + req.resume(); + }); + httpServer.listen(18080).onFailure(e -> { + System.exit(1); + }); + + LockSupport.park(); + } +} diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue8Test.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue8Test.java new file mode 100644 index 0000000..120cad4 --- /dev/null +++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue8Test.java @@ -0,0 +1,61 @@ +package top.meethigher.proxy.tcp.tunnel; + +import io.vertx.core.Vertx; +import io.vertx.core.net.NetClient; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class Issue8Test { + + private static final Logger log = LoggerFactory.getLogger(Issue8Test.class); + + @Test + public void test() throws Exception { + + /** + * IDEA 多配置“批量启动”——使用 Run/Debug Configuration 的“Compound”功能 + * 步骤如下: + * 打开Run/Debug Configurations窗口(快捷键 Ctrl+Alt+R / 右上角 Edit Configurations)。 + * + * 新建三个 Application 类型配置,分别设置好 ClassA、ClassB、ClassC 的 main。 + * + * 再新建一个Compound类型配置。 + * + * 在 Compound 配置的“Run/Debug Configurations”里,添加刚刚那三个配置,顺序拖拽即可调整。 + * + * 选中 Compound 配置,点绿色启动按钮,一次性批量顺序启动三(或更多)个实例。 + * + * 注意:Compound 是并发同时,不是顺序启动。IDEA 会按照你添加的顺序一个一个起。 + */ + + // step1: run top.meethigher.proxy.tcp.tunnel.issue8.Issue8BackendServer.main + + // step2: run top.meethigher.proxy.tcp.tunnel.issue8.Issue8TunnelServer.main + + // step3: run top.meethigher.proxy.tcp.tunnel.issue8.Issue8TunnelClient.main + + Vertx vertx = Vertx.vertx(); + NetClient netClient = vertx.createNetClient(); + int total = 500; + CountDownLatch latch = new CountDownLatch(total); + for (int i = 0; i < total; i++) { + final String id = String.valueOf(i + 1); + netClient.connect(2222, "127.0.0.1").onSuccess(socket -> { + socket.pause(); + socket.handler(buf -> { + log.info("{} received: {}", id, buf); + latch.countDown(); + }); + socket.resume(); + }); + } + + latch.await(5, TimeUnit.SECONDS); + Assert.assertEquals(total, total - latch.getCount()); + } +} diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue9Test.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue9Test.java new file mode 100644 index 0000000..ae11530 --- /dev/null +++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/Issue9Test.java @@ -0,0 +1,67 @@ +package top.meethigher.proxy.tcp.tunnel; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.PoolOptions; +import io.vertx.core.http.RequestOptions; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class Issue9Test { + private static final Logger log = LoggerFactory.getLogger(Issue9Test.class); + + @Test + public void test() throws Exception { + + /** + * IDEA 多配置“批量启动”——使用 Run/Debug Configuration 的“Compound”功能 + * 步骤如下: + * 打开Run/Debug Configurations窗口(快捷键 Ctrl+Alt+R / 右上角 Edit Configurations)。 + * + * 新建三个 Application 类型配置,分别设置好 ClassA、ClassB、ClassC 的 main。 + * + * 再新建一个Compound类型配置。 + * + * 在 Compound 配置的“Run/Debug Configurations”里,添加刚刚那三个配置,顺序拖拽即可调整。 + * + * 选中 Compound 配置,点绿色启动按钮,一次性批量顺序启动三(或更多)个实例。 + * + * 注意:Compound 是并发同时,不是顺序启动。IDEA 会按照你添加的顺序一个一个起。 + */ + + // step1: run top.meethigher.proxy.tcp.tunnel.issue9.Issue9SimpleHttpServer.main + + // step2: run top.meethigher.proxy.tcp.tunnel.issue9.Issue9TunnelServer.main + + // step3: run top.meethigher.proxy.tcp.tunnel.issue9.Issue9TunnelClient.main + + Vertx vertx = Vertx.vertx(); + HttpClient httpClient = vertx.createHttpClient(new HttpClientOptions() + .setKeepAlive(true) + .setMaxPoolSize(Runtime.getRuntime().availableProcessors()), + new PoolOptions().setHttp1MaxSize(100)); + int total = 500; + CountDownLatch latch = new CountDownLatch(total); + for (int i = 0; i < total; i++) { + final String id = String.valueOf(i + 1); + RequestOptions requestOptions = new RequestOptions() + .setAbsoluteURI("http://127.0.0.1:808/api"); + httpClient.request(requestOptions).onSuccess(req -> { + req.send().onSuccess(resp -> { + log.info("{} succeeded", id); + latch.countDown(); + }); + }); + } + + latch.await(5, TimeUnit.SECONDS); + Assert.assertEquals(total, total - latch.getCount()); + + } +} diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8BackendServer.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8BackendServer.java new file mode 100644 index 0000000..99c7b85 --- /dev/null +++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8BackendServer.java @@ -0,0 +1,26 @@ +package top.meethigher.proxy.tcp.tunnel.issue8; + +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetServer; + +public class Issue8BackendServer { + public static void main(String[] args) { + Vertx vertx = Vertx.vertx(); + NetServer netServer = vertx.createNetServer(); + netServer.connectHandler(socket -> { + socket.remoteAddress(); + socket.write(Buffer.buffer("SSH-2.0-OpenSSH_8.7")).onComplete(ar -> { + System.out.println(socket.remoteAddress().toString() + " write " + ar.succeeded()); + }); + }).listen(23).onComplete(ar -> { + if (ar.succeeded()) { + System.out.println("Server started on port " + ar.result().actualPort()); + } else { + System.err.println("Server failed to start"); + ar.cause().printStackTrace(); + System.exit(1); + } + }); + } +} \ No newline at end of file diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelClient.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelClient.java new file mode 100644 index 0000000..f64db92 --- /dev/null +++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelClient.java @@ -0,0 +1,29 @@ +package top.meethigher.proxy.tcp.tunnel.issue8; + +import io.vertx.core.Vertx; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelClient; + +import java.util.concurrent.TimeUnit; + +public class Issue8TunnelClient { + + private static final Vertx vertx = Vertx.vertx(); + private static final Logger log = LoggerFactory.getLogger(Issue8TunnelClient.class); + + public static void main(String[] args) { + NetClient netClient = vertx.createNetClient(new NetClientOptions() + .setIdleTimeout(999999999) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)); + ReverseTcpProxyTunnelClient.create(vertx, netClient) + .dataProxyPort(2222) + .dataProxyHost("127.0.0.1") + .dataProxyName("ssh") + .backendHost("127.0.0.1") + .backendPort(23) + .connect("127.0.0.1", 44444); + } +} diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelServer.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelServer.java new file mode 100644 index 0000000..84be10c --- /dev/null +++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue8/Issue8TunnelServer.java @@ -0,0 +1,28 @@ +package top.meethigher.proxy.tcp.tunnel.issue8; + +import io.vertx.core.Vertx; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelServer; + +import java.util.concurrent.TimeUnit; + +public class Issue8TunnelServer { + + private static final Vertx vertx = Vertx.vertx(); + private static final Logger log = LoggerFactory.getLogger(Issue8TunnelServer.class); + + public static void main(String[] args) { + NetServer netServer = vertx.createNetServer( + new NetServerOptions() + .setIdleTimeout(999999999) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)); + ReverseTcpProxyTunnelServer.create(vertx, netServer) + .heartbeatDelay(888888888) + .judgeDelay(50) + .port(44444) + .start(); + } +} diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9SimpleHttpServer.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9SimpleHttpServer.java new file mode 100644 index 0000000..5563d3f --- /dev/null +++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9SimpleHttpServer.java @@ -0,0 +1,37 @@ +package top.meethigher.proxy.tcp.tunnel.issue9; + +import io.vertx.core.Vertx; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Issue9SimpleHttpServer { + + private static final Logger log = LoggerFactory.getLogger(Issue9SimpleHttpServer.class); + private static final Vertx vertx = Vertx.vertx(); + + public static void main(String[] args) { + final String result = "novnc2
"; + HttpServer httpServer = vertx.createHttpServer(); + httpServer.requestHandler(req -> { + String address = req.connection().remoteAddress().toString(); + log.info("{} connected", address); + req.connection().closeHandler(v -> { + log.info("{} closed", address); + }); + HttpServerResponse response = req.response(); + response.putHeader("Server", "nginx/1.18.0 (Ubuntu)"); + response.putHeader("Date", "Mon, 02 Jun 2025 07:22:44 GMT"); + response.putHeader("Content-Type", "text/html"); + response.putHeader("Content-Length", "512"); + response.putHeader("Last-Modified", "Sun, 04 Apr 2021 03:44:30 GMT"); + response.putHeader("ETag", "\"6069361e-200\""); + response.putHeader("Accept-Ranges", "bytes"); + response.setStatusCode(200).end(result); + }).listen(80).onFailure(e -> { + log.error("http server start failed", e); + System.exit(1); + }); + } +} \ No newline at end of file diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelClient.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelClient.java new file mode 100644 index 0000000..fe0d393 --- /dev/null +++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelClient.java @@ -0,0 +1,29 @@ +package top.meethigher.proxy.tcp.tunnel.issue9; + +import io.vertx.core.Vertx; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelClient; + +import java.util.concurrent.TimeUnit; + +public class Issue9TunnelClient { + + private static final Vertx vertx = Vertx.vertx(); + private static final Logger log = LoggerFactory.getLogger(Issue9TunnelClient.class); + + public static void main(String[] args) { + NetClient netClient = vertx.createNetClient(new NetClientOptions() + .setIdleTimeout(999999999) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)); + ReverseTcpProxyTunnelClient.create(vertx, netClient) + .dataProxyPort(808) + .dataProxyHost("127.0.0.1") + .dataProxyName("http") + .backendHost("127.0.0.1") + .backendPort(80) + .connect("127.0.0.1", 44444); + } +} diff --git a/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelServer.java b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelServer.java new file mode 100644 index 0000000..6239d20 --- /dev/null +++ b/src/test/java/top/meethigher/proxy/tcp/tunnel/issue9/Issue9TunnelServer.java @@ -0,0 +1,28 @@ +package top.meethigher.proxy.tcp.tunnel.issue9; + +import io.vertx.core.Vertx; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import top.meethigher.proxy.tcp.tunnel.ReverseTcpProxyTunnelServer; + +import java.util.concurrent.TimeUnit; + +public class Issue9TunnelServer { + + private static final Vertx vertx = Vertx.vertx(); + private static final Logger log = LoggerFactory.getLogger(Issue9TunnelServer.class); + + public static void main(String[] args) { + NetServer netServer = vertx.createNetServer( + new NetServerOptions() + .setIdleTimeout(999999999) + .setIdleTimeoutUnit(TimeUnit.MILLISECONDS)); + ReverseTcpProxyTunnelServer.create(vertx, netServer) + .heartbeatDelay(888888888) + .judgeDelay(50) + .port(44444) + .start(); + } +}