Skip to content

Commit 43c2f1a

Browse files
committed
fix: 1. 解决内网穿透在高并发时偶现数据丢失问题 2. 更新整体流程的时序图
1 parent d7647f1 commit 43c2f1a

File tree

3 files changed

+125
-98
lines changed

3 files changed

+125
-98
lines changed

README.md

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ ReverseTcpProxy.create(Vertx.vertx(), "10.0.0.1", 8080)
5151

5252
## 二、TCP内网穿透
5353

54-
虚线表示进程内部通信。实线表示外部通信
54+
虚线表示控制连接通信,实线表示非控制连接通信
5555

5656
一些代码上的设计思路,参考[socket.io-client-java](https://github.com/socketio/socket.io-client-java/blob/socket.io-client-2.1.0/src/main/java/io/socket/client/Socket.java)
5757

@@ -63,34 +63,34 @@ sequenceDiagram
6363
participant ts as TunnelServer
6464
participant tc as TunnelClient
6565
participant bs as BackendServer
66-
67-
ts-->ts: 监听44444端口
68-
tc->>ts: 建立控制连接、发送鉴权密钥、申请启用22端口数据服务
69-
ts-->ts: 鉴权校验通过
70-
ts-->>dps: 你要开启22端口
71-
dps-->dps: 监听22端口
72-
dps-->>ts: 已开启
73-
ts->>tc: 成功
74-
tc-->tc: 开启与控制服务的周期心跳
66+
bs->bs: 监听22端口
67+
ts->ts: 监听44444端口
68+
tc-->>ts: 建立控制连接、发送鉴权密钥、申请启用2222端口数据服务
69+
ts->ts: 鉴权校验通过
70+
ts->>dps: 你要开启2222端口
71+
dps->dps: 监听2222端口
72+
dps->>ts: 已开启
73+
ts-->>tc: 端口已启用
74+
tc->tc: 开启与控制服务的周期心跳
7575
loop 控制连接保活
76-
tc->>ts: 发送心跳
77-
ts->>tc: 响应心跳
76+
tc-->>ts: 发送心跳
77+
ts-->>tc: 响应心跳
7878
end
79-
u->>dps: 建立数据连接
80-
dps-->>ts: 通知
81-
ts->>tc: 你需要主动与22数据服务建立数据连接
82-
tc->>dps: 建立数据连接
79+
u->>dps: 建立用户连接
80+
dps->>ts:
81+
ts-->>tc: 你需要主动与2222数据服务端口建立数据连接
82+
tc->>dps: 建立数据连接,并告知对方“我是数据连接”
83+
tc-->>ts: 连接已建立
8384
note left of dps: 用户连接和数据连接绑定双向生命周期、双向数据传输
85+
dps->>dps: 绑定用户连接与数据连接
86+
dps->>tc: 用户连接与数据连接已绑定
8487
tc->>bs: 建立后端连接
85-
dps->>tc: 成功
86-
bs->>tc: 成功
88+
bs->>tc: 后端连接已建立
8789
note right of tc: 数据连接和后端连接绑定双向生命周期、双向数据传输
88-
90+
tc->>tc: 绑定数据连接和后端连接
8991
u->dps: 双向传输
9092
dps->tc: 双向传输
9193
tc->bs: 双向传输
92-
93-
9494
```
9595

9696
假如我有一个内网`SSH`服务`10.0.0.10:22`,需要通过`192.168.0.200:22`穿透出去。并且网络条件受限如下

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelClient.java

Lines changed: 94 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package top.meethigher.proxy.tcp.tunnel;
22

3-
import io.vertx.core.AsyncResult;
43
import io.vertx.core.Handler;
54
import io.vertx.core.Vertx;
65
import io.vertx.core.buffer.Buffer;
@@ -185,81 +184,106 @@ protected boolean doHandle(Vertx vertx, NetSocket netSocket, TunnelMessageType t
185184
try {
186185
TunnelMessage.OpenDataConn parsed = TunnelMessage.OpenDataConn.parseFrom(bodyBytes);
187186
final int sessionId = parsed.getSessionId();
188-
// 保证顺序执行
187+
// 保证顺序,并将建立数据连接的逻辑返回给控制连接
189188
CountDownLatch latch = new CountDownLatch(1);
190-
Handler<AsyncResult<NetSocket>> asyncResultHandler = ar -> {
191-
if (ar.succeeded()) {
192-
final NetSocket dataSocket = ar.result();
193-
dataSocket.pause();
194-
// 连接建立成功后,立马发送消息告诉数据服务,我是数据连接,并与用户连接进行绑定
195-
dataSocket.write(Buffer.buffer()
196-
.appendBytes(DATA_CONN_FLAG)
197-
.appendInt(sessionId));
198-
log.debug("{}: sessionId {}, data connection {} -- {} established. wait for backend connection",
199-
dataProxyName,
200-
sessionId,
201-
dataSocket.remoteAddress(), dataSocket.localAddress());
202-
netClient.connect(backendPort, backendHost).onComplete(rst -> {
203-
if (rst.succeeded()) {
204-
atomicResult.set(rst.succeeded());
205-
final NetSocket backendSocket = rst.result();
206-
backendSocket.pause();
207-
log.debug("{}: sessionId {}, backend connection {} -- {} established", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress());
208-
// 双向生命周期绑定、双向数据转发
209-
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
210-
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
211-
dataSocket.closeHandler(v -> {
212-
log.debug("{}: sessionId {}, data connection {} -- {} closed", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress());
213-
}).pipeTo(backendSocket).onFailure(e -> {
214-
log.error("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} failed",
215-
dataProxyName,
216-
sessionId,
217-
dataSocket.remoteAddress(), dataSocket.localAddress(),
218-
backendSocket.remoteAddress(), backendSocket.localAddress(),
219-
e);
220-
});
221-
backendSocket.closeHandler(v -> {
222-
log.debug("{}: sessionId {}, backend connection {} -- {} closed", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress());
223-
}).pipeTo(dataSocket).onFailure(e -> {
224-
log.error("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} failed",
225-
dataProxyName,
226-
sessionId,
227-
backendSocket.remoteAddress(), backendSocket.localAddress(),
228-
dataSocket.remoteAddress(), dataSocket.localAddress(),
229-
e);
230-
});
231-
backendSocket.resume();
232-
dataSocket.resume();
233-
log.debug("{}: sessionId {}, data connection {} -- {} bound to backend connection {} -- {} for session id {}",
234-
dataProxyName,
235-
sessionId,
236-
dataSocket.remoteAddress(), dataSocket.localAddress(),
237-
backendSocket.remoteAddress(), backendSocket.localAddress(),
238-
sessionId);
239-
} else {
240-
// 建立连接失败,那么数据连接就要关闭
241-
dataSocket.close();
242-
log.error("{}: sessionId {}, client open backend connection to {}:{} failed",
243-
dataProxyName,
244-
sessionId,
245-
backendHost, backendPort, rst.cause());
246-
}
247-
latch.countDown();
248-
});
249-
} else {
250-
log.error("{}: sessionId {}, client open data connection to {}:{} failed", dataProxyName,
251-
sessionId,
252-
dataProxyHost, dataProxyPort, ar.cause());
253-
latch.countDown();
254-
}
189+
// 建立数据连接
190+
Handler<Throwable> dataSocketFailureHandler = e -> {
191+
log.error("{}: sessionId {}, client failed to open data connection {}:{}",
192+
dataProxyName,
193+
sessionId,
194+
dataProxyHost,
195+
dataProxyPort);
196+
latch.countDown();
197+
};
198+
Handler<NetSocket> dataSocketSuccessHandler = dataSocket -> {
199+
atomicResult.set(true);
200+
latch.countDown();
201+
dataSocket.pause();
202+
log.debug("{}: sessionId {}, data connection {} -- {} established. ",
203+
dataProxyName,
204+
sessionId,
205+
dataSocket.remoteAddress(), dataSocket.localAddress());
206+
// 连接建立成功后,立马发送消息告诉数据服务"我是数据连接"
207+
dataSocket.write(Buffer.buffer()
208+
.appendBytes(DATA_CONN_FLAG)
209+
.appendInt(sessionId));
210+
final Buffer buf = Buffer.buffer();
211+
// 等待数据连接返回与用户连接的绑定结果
212+
dataSocket.handler(buffer -> {
213+
buf.appendBuffer(buffer);
214+
if (buf.length() < 8) {
215+
return;
216+
}
217+
if (buf.getByte(0) == Tunnel.DATA_CONN_FLAG[0]
218+
&& buf.getByte(1) == Tunnel.DATA_CONN_FLAG[1]
219+
&& buf.getByte(2) == Tunnel.DATA_CONN_FLAG[2]
220+
&& buf.getByte(3) == Tunnel.DATA_CONN_FLAG[3]
221+
&& buf.getInt(4) == sessionId
222+
) {
223+
// 用户连接已成功与数据连接绑定。开始建立后端连接
224+
dataSocket.pause();
225+
netClient.connect(backendPort, backendHost)
226+
.onFailure(e -> {
227+
log.error("{}: sessionId {}, client open backend connection to {}:{} failed",
228+
dataProxyName,
229+
sessionId,
230+
backendHost, backendPort, e);
231+
dataSocket.close();
232+
})
233+
.onSuccess(backendSocket -> {
234+
backendSocket.pause();
235+
log.debug("{}: sessionId {}, backend connection {} -- {} established", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress());
236+
// 双向生命周期绑定、双向数据转发
237+
// feat: v1.0.5以前的版本,在closeHandler里面,将对端连接也关闭。比如targetSocket关闭时,则将sourceSocket也关闭。
238+
// 结果导致在转发短连接时,出现了bug。参考https://github.com/meethigher/tcp-reverse-proxy/issues/6
239+
dataSocket.closeHandler(v -> {
240+
log.debug("{}: sessionId {}, data connection {} -- {} closed", dataProxyName, sessionId, dataSocket.remoteAddress(), dataSocket.localAddress());
241+
}).pipeTo(backendSocket).onFailure(e -> {
242+
log.error("{}: sessionId {}, data connection {} -- {} pipe to backend connection {} -- {} failed",
243+
dataProxyName,
244+
sessionId,
245+
dataSocket.remoteAddress(), dataSocket.localAddress(),
246+
backendSocket.remoteAddress(), backendSocket.localAddress(),
247+
e);
248+
});
249+
backendSocket.closeHandler(v -> {
250+
log.debug("{}: sessionId {}, backend connection {} -- {} closed", dataProxyName, sessionId, backendSocket.remoteAddress(), backendSocket.localAddress());
251+
}).pipeTo(dataSocket).onFailure(e -> {
252+
log.error("{}: sessionId {}, backend connection {} -- {} pipe to data connection {} -- {} failed",
253+
dataProxyName,
254+
sessionId,
255+
backendSocket.remoteAddress(), backendSocket.localAddress(),
256+
dataSocket.remoteAddress(), dataSocket.localAddress(),
257+
e);
258+
});
259+
backendSocket.resume();
260+
dataSocket.resume();
261+
log.debug("{}: sessionId {}, data connection {} -- {} bound to backend connection {} -- {} for session id {}",
262+
dataProxyName,
263+
sessionId,
264+
dataSocket.remoteAddress(), dataSocket.localAddress(),
265+
backendSocket.remoteAddress(), backendSocket.localAddress(),
266+
sessionId);
267+
});
255268

269+
} else {
270+
dataSocket.close();
271+
log.warn("{}: sessionId {}, data connection {} -- {} received invalid message, will be closed. ",
272+
dataProxyName,
273+
sessionId,
274+
dataSocket.remoteAddress(), dataSocket.localAddress());
275+
}
276+
});
277+
dataSocket.resume();
256278
};
257-
netClient.connect(dataProxyPort, dataProxyHost).onComplete(asyncResultHandler);
279+
netClient.connect(dataProxyPort, dataProxyHost)
280+
.onFailure(dataSocketFailureHandler)
281+
.onSuccess(dataSocketSuccessHandler);
258282
latch.await();
259-
netSocket.write(encode(TunnelMessageType.OPEN_DATA_CONN_ACK, TunnelMessage.OpenDataConnAck.newBuilder()
260-
.setSuccess(atomicResult.get()).setMessage("").build().toByteArray()));
261283
} catch (Exception ignore) {
262284
}
285+
netSocket.write(encode(TunnelMessageType.OPEN_DATA_CONN_ACK, TunnelMessage.OpenDataConnAck.newBuilder()
286+
.setSuccess(atomicResult.get()).setMessage("").build().toByteArray()));
263287
return atomicResult.get();
264288
}
265289
});

src/main/java/top/meethigher/proxy/tcp/tunnel/ReverseTcpProxyTunnelServer.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ protected void handleDataConnection(NetSocket socket, Buffer buf, long timerId)
254254
}
255255
// 数据连接
256256
int sessionId = buf.getInt(4);
257-
log.debug("{}: sessionId {}, connection {} -- {} is a data connection!", name, sessionId, socket.remoteAddress(), socket.localAddress());
257+
log.debug("{}: sessionId {}, connection {} -- {} is a data connection", name, sessionId, socket.remoteAddress(), socket.localAddress());
258258
UserConnection userConn = unboundUserConnections.remove(sessionId);
259259
if (userConn != null) {
260260
bindConnections(userConn, socket, sessionId);
@@ -278,15 +278,12 @@ protected void handleUserConnection(NetSocket socket, Buffer buf, long timerId)
278278
}
279279
// 用户连接
280280
int sessionId = IdGenerator.nextId();
281-
log.debug("{}: sessionId {}, connection {} -- {} is a user connection!", name, sessionId, socket.remoteAddress(), socket.localAddress());
281+
log.debug("{}: sessionId {}, connection {} -- {} is a user connection", name, sessionId, socket.remoteAddress(), socket.localAddress());
282282
UserConnection userConn = new UserConnection(sessionId, socket, new ArrayList<>());
283283
if (buf != null) {
284284
userConn.buffers.add(buf.copy());
285285
}
286286
unboundUserConnections.put(sessionId, userConn);
287-
log.debug("{}: sessionId {}, user connection {} -- {} wait for data connection ...",
288-
name, sessionId,
289-
socket.remoteAddress(), socket.localAddress());
290287
// 通过控制连接通知TunnelClient主动建立数据连接。服务端不需要通知客户端需要连接的端口,因为数据端口的启动是由客户端通知服务端开启的。
291288
controlSocket.write(TunnelMessageCodec.encode(TunnelMessageType.OPEN_DATA_CONN.code(),
292289
TunnelMessage.OpenDataConn.newBuilder().setSessionId(sessionId).build().toByteArray()));
@@ -323,14 +320,20 @@ protected void bindConnections(UserConnection userConn, NetSocket dataSocket, in
323320
sessionId,
324321
dataSocket.remoteAddress(), dataSocket.localAddress(), userSocket.remoteAddress(), userSocket.localAddress(), e);
325322
});
326-
// 将用户连接中的缓存数据发出。
327-
userConn.buffers.forEach(dataSocket::write);
328323
log.debug("{}: sessionId {}, data connection {} -- {} bound to user connection {} -- {} for session id {}",
329324
name,
330325
sessionId,
331326
dataSocket.remoteAddress(), dataSocket.localAddress(),
332327
userSocket.remoteAddress(), userSocket.localAddress(),
333328
sessionId);
329+
// 通过数据连接传输"用户连接与数据连接已进行双向数据传输绑定"
330+
dataSocket.write(Buffer.buffer()
331+
.appendBytes(DATA_CONN_FLAG)
332+
.appendInt(sessionId)).onSuccess(v -> {
333+
// 将用户连接中的缓存数据发出。
334+
userConn.buffers.forEach(dataSocket::write);
335+
});
336+
334337
}
335338

336339
public void start() {

0 commit comments

Comments
 (0)