Skip to content

Commit

Permalink
修正IOCP连接刚建立时TriggerReceived可能会先于TriggerConnected触发的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
winddriver committed Jun 29, 2017
1 parent 5feef2c commit 9256000
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 70 deletions.
99 changes: 57 additions & 42 deletions Net/Net.CrossSocket.IocpLoop.pas
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{******************************************************************************}
{******************************************************************************}
{ }
{ Delphi cross platform socket library }
{ }
Expand Down Expand Up @@ -37,7 +37,7 @@ TAddrBuffer = record
TPerIoBufUnion = record
case Integer of
0: (DataBuf: WSABUF);
// 这个Buffer只用于AcceptEx保存终端地址数据,大小为2倍地址结构
// 这个Buffer只用于AcceptEx保存终端地址数据,大小为2倍地址结构
1: (AcceptExBuffer: TAcceptExBuffer);
end;

Expand Down Expand Up @@ -190,10 +190,13 @@ procedure TIocpLoop.RequestAcceptComplete(ASocket: THandle;
Exit;
end;

if NewReadZero(APerIoData.Socket) then
TriggerConnected(APerIoData.Socket, CT_ACCEPT)
else
TSocketAPI.CloseSocket(APerIoData.Socket);
TriggerConnected(APerIoData.Socket, CT_ACCEPT);

if not NewReadZero(APerIoData.Socket) then
begin
if (TSocketAPI.CloseSocket(APerIoData.Socket) = 0) then
TriggerDisconnected(APerIoData.Socket);
end;
end;

procedure TIocpLoop.RequestConnectComplete(ASocket: THandle;
Expand All @@ -209,39 +212,51 @@ procedure TIocpLoop.RequestConnectComplete(ASocket: THandle;
APerIoData.Callback(ASocket, True);
end;

procedure _Failed;
procedure _Failed1;
begin
{$IFDEF DEBUG}
__RaiseLastOSError;
{$ENDIF}
TSocketAPI.CloseSocket(ASocket);

TSocketAPI.CloseSocket(ASocket);
TriggerConnectFailed(ASocket);

if Assigned(APerIoData.Callback) then
APerIoData.Callback(ASocket, False);
end;

procedure _Failed2;
begin
{$IFDEF DEBUG}
__RaiseLastOSError;
{$ENDIF}

if (TSocketAPI.CloseSocket(ASocket) = 0) then
TriggerDisconnected(ASocket);

if Assigned(APerIoData.Callback) then
APerIoData.Callback(ASocket, False);
end;
begin
if (TSocketAPI.GetError(ASocket) <> 0) then
begin
_Failed;
_Failed1;
Exit;
end;

// 不设置该参数, 会导致 getpeername 调用失败
// 不设置该参数, 会导致 getpeername 调用失败
LOptVal := 1;
if (TSocketAPI.SetSockOpt(ASocket, SOL_SOCKET,
SO_UPDATE_CONNECT_CONTEXT, LOptVal, SizeOf(Integer)) < 0) then
begin
_Failed;
_Failed1;
Exit;
end;

if NewReadZero(ASocket) then
_Success
else
_Failed;
_Success;

if not NewReadZero(ASocket) then
_Failed2;
end;

procedure TIocpLoop.RequestReadZeroComplete(ASocket: THandle;
Expand All @@ -253,7 +268,7 @@ procedure TIocpLoop.RequestReadZeroComplete(ASocket: THandle;
begin
LRcvd := TSocketAPI.Recv(ASocket, FRecvBuf[0], RCV_BUF_SIZE);

// 对方主动断开连接
// 对方主动断开连接
if (LRcvd = 0) then
begin
if (TSocketAPI.CloseSocket(ASocket) = 0) then
Expand All @@ -263,7 +278,7 @@ procedure TIocpLoop.RequestReadZeroComplete(ASocket: THandle;

if (LRcvd < 0) then
begin
// 需要重试
// 需要重试
if _Again(GetLastError) then Break;

if (TSocketAPI.CloseSocket(ASocket) = 0) then
Expand Down Expand Up @@ -520,13 +535,13 @@ function TIocpLoop.Listen(const AHost: string; APort: Word;
Exit(-1);
end;

// 给每个IO线程投递一个AcceptEx
// 给每个IO线程投递一个AcceptEx
for I := 1 to GetIoThreads do
NewAccept(LSocket, LAddrInfo.ai_family, LAddrInfo.ai_socktype, LAddrInfo.ai_protocol);

_Success;

// 如果端口传入0,让所有地址统一用首个分配到的端口
// 如果端口传入0,让所有地址统一用首个分配到的端口
if (APort = 0) and (LAddrInfo.ai_next <> nil) then
LAddrInfo.ai_next.ai_addr.sin_port := LAddrInfo.ai_addr.sin_port;

Expand Down Expand Up @@ -554,22 +569,22 @@ function TIocpLoop.Send(ASocket: THandle; const ABuf; ALen: Integer;

LFlags := 0;
LBytes := 0;
// WSASend 不会出现部分发送的情况, 要么全部失败, 要么全部成功
// 所以不需要像 kqueue 或 epoll 中调用 send 那样调用完之后还得检查实际发送了多少
// 唯一需要注意的是: WSASend 会将待发送的数据锁定到非页面内存, 非页面内存资源
// 是非常紧张的, 所以不要无节制的调用 WSASend, 最好通过回调发送完一批数据再继
// 续发送下一批
// WSASend 不会出现部分发送的情况, 要么全部失败, 要么全部成功
// 所以不需要像 kqueue 或 epoll 中调用 send 那样调用完之后还得检查实际发送了多少
// 唯一需要注意的是: WSASend 会将待发送的数据锁定到非页面内存, 非页面内存资源
// 是非常紧张的, 所以不要无节制的调用 WSASend, 最好通过回调发送完一批数据再继
// 续发送下一批
if (WSASend(ASocket, @LPerIoData.Buffer.DataBuf, 1, LBytes, LFlags, PWSAOverlapped(LPerIoData), nil) < 0)
and (WSAGetLastError <> WSA_IO_PENDING) then
begin
if Assigned(ACallback) then
ACallback(ASocket, False);

// 出错多半是 WSAENOBUFS, 也就是投递的 WSASend 过多, 来不及发送
// 导致非页面内存资源全部被锁定, 要避免这种情况必须上层发送逻辑
// 保证不能无节制的调用Send发送大量数据, 最好发送完一个再继续下
// 一个, 本函数提供了发送结果的回调函数, 在回调函数报告发送成功
// 之后就可以继续下一块数据发送了
// 出错多半是 WSAENOBUFS, 也就是投递的 WSASend 过多, 来不及发送
// 导致非页面内存资源全部被锁定, 要避免这种情况必须上层发送逻辑
// 保证不能无节制的调用Send发送大量数据, 最好发送完一个再继续下
// 一个, 本函数提供了发送结果的回调函数, 在回调函数报告发送成功
// 之后就可以继续下一块数据发送了
FreeIoData(LPerIoData);
if (TSocketAPI.CloseSocket(ASocket) = 0) then
TriggerDisconnected(ASocket);
Expand All @@ -587,8 +602,8 @@ function TIocpLoop.ProcessIoEvent: Boolean;
begin
if not GetQueuedCompletionStatus(FIocpHandle, LBytes, ULONG_PTR(LSocket), POverlapped(LPerIoData), INFINITE) then
begin
// 出错了, 并且完成数据也都是空的,
// 这种情况即便重试, 应该也会继续出错, 最好立即终止IO线程
// 出错了, 并且完成数据也都是空的,
// 这种情况即便重试, 应该也会继续出错, 最好立即终止IO线程
if (LSocket = 0) or (LPerIoData = nil) then
begin
{$IFDEF DEBUG}
Expand All @@ -600,19 +615,19 @@ function TIocpLoop.ProcessIoEvent: Boolean;
try
case LPerIoData.Action of
ioAccept:
// WSA_OPERATION_ABORTED, 995, 由于线程退出或应用程序请求,已中止 I/O 操作。
// WSAENOTSOCK, 10038, 在一个非套接字上尝试了一个操作。
// 在主动关闭监听的socket时会出现该错误, 这时候只需要简单的关掉
// AcceptEx对应的客户端socket即可
// WSA_OPERATION_ABORTED, 995, 由于线程退出或应用程序请求,已中止 I/O 操作。
// WSAENOTSOCK, 10038, 在一个非套接字上尝试了一个操作。
// 在主动关闭监听的socket时会出现该错误, 这时候只需要简单的关掉
// AcceptEx对应的客户端socket即可
TSocketAPI.CloseSocket(LPerIoData.Socket);

ioConnect:
// ERROR_CONNECTION_REFUSED, 1225, 远程计算机拒绝网络连接。
// ERROR_CONNECTION_REFUSED, 1225, 远程计算机拒绝网络连接。
if (TSocketAPI.CloseSocket(LSocket) = 0) then
begin
TriggerConnectFailed(LSocket);
if Assigned(LPerIoData.Callback) then
TProc<THandle, Boolean>(LPerIoData.Callback)(LSocket, False);
LPerIoData.Callback(LSocket, False);
end;

ioReadZero:
Expand All @@ -622,7 +637,7 @@ function TIocpLoop.ProcessIoEvent: Boolean;
ioSend:
begin
if Assigned(LPerIoData.Callback) then
TProc<Boolean>(LPerIoData.Callback)(False);
LPerIoData.Callback(LSocket, False);

if (TSocketAPI.CloseSocket(LSocket) = 0) then
TriggerDisconnected(LSocket);
Expand All @@ -632,15 +647,15 @@ function TIocpLoop.ProcessIoEvent: Boolean;
FreeIoData(LPerIoData);
end;

// 出错了, 但是完成数据不是空的, 需要重试
// 出错了, 但是完成数据不是空的, 需要重试
Exit(True);
end;

// 主动调用了 StopLoop
// 主动调用了 StopLoop
if (LBytes = 0) and (ULONG_PTR(LPerIoData) = SHUTDOWN_FLAG) then Exit(False);

// 由于未知原因未获取到完成数据, 但是返回的错误代码又是正常
// 这种情况需要进行重试(返回True之后IO线程会再次调用ProcessIoEvent)
// 由于未知原因未获取到完成数据, 但是返回的错误代码又是正常
// 这种情况需要进行重试(返回True之后IO线程会再次调用ProcessIoEvent)
if (LSocket = 0) or (LPerIoData = nil) then Exit(True);

try
Expand Down
39 changes: 11 additions & 28 deletions Net/Net.CrossSocket.pas
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ TCrossConnection = class(TInterfacedObject, ICrossConnection)

property Owner: TCustomCrossSocket read GetOwner;
property Socket: THandle read GetSocket;
property LocalAddr: string read GetLocalAddr;
property LocalPort: Word read GetLocalPort;
property PeerAddr: string read GetPeerAddr;
property PeerPort: Word read GetPeerPort;
property ConnectType: TConnectType read GetConnectType;
Expand Down Expand Up @@ -390,7 +392,7 @@ TCrossSocket = class(TCustomCrossSocket)
/// 回调函数
/// </param>
/// <returns>
/// 返回值只能表明Connect调用是否成功
/// 返回值只能表明 connect 调用是否成功
/// <list type="bullet">
/// <item>
/// 0, 调用成功
Expand All @@ -399,7 +401,7 @@ TCrossSocket = class(TCustomCrossSocket)
/// 非0, 调用失败
/// </item>
/// </list>
/// 当OnConnected触发时才表明连接建立, 而OnConnectFailed触发则表明连接失败
/// 当回调被触发时才表明连接建立或连接失败
/// </returns>
function Connect(const AHost: string; APort: Word;
const ACallback: TProc<THandle, Boolean> = nil): Integer; override;
Expand Down Expand Up @@ -430,8 +432,11 @@ TCrossSocket = class(TCustomCrossSocket)
/// <param name="APort">
/// 端口
/// </param>
/// <param name="ACallback">
/// 回调函数
/// </param>
/// <returns>
/// 返回值只能表明bind是否调用成功
/// 返回值只能表明 bind 是否调用成功
/// <list type="bullet">
/// <item>
/// 0, 调用成功
Expand All @@ -440,7 +445,7 @@ TCrossSocket = class(TCustomCrossSocket)
/// 非0, 调用失败
/// </item>
/// </list>
/// 当OnListened触发时才表明监听成功
/// 当回调被触发时才表明监听成功或失败
/// </returns>
function Listen(const AHost: string; APort: Word;
const ACallback: TProc<THandle, Boolean> = nil): Integer; override;
Expand Down Expand Up @@ -488,7 +493,7 @@ TCrossSocket = class(TCustomCrossSocket)
implementation

uses
System.Math;
System.Math, Utils.Logger;

{ TCrossConnection }

Expand Down Expand Up @@ -525,7 +530,6 @@ procedure TCrossConnection.DirectSend(const ABuffer; ACount: Integer;
LBuffer: Pointer;
begin
LConnection := Self as ICrossConnection;
LBuffer := @ABuffer;

if (FSocket = INVALID_HANDLE_VALUE) then
begin
Expand All @@ -534,6 +538,7 @@ procedure TCrossConnection.DirectSend(const ABuffer; ACount: Integer;
Exit;
end;

LBuffer := @ABuffer;
FOwner.Send(FSocket, ABuffer, ACount,
procedure(ASocket: THandle; ASuccess: Boolean)
begin
Expand Down Expand Up @@ -884,28 +889,6 @@ procedure TCustomCrossSocket.TriggerConnected(ASocket: THandle;
FConnectionsLocker.EndWrite;
end;

// FConnectionsLocker.BeginWrite;
// try
// if Assigned(FConnections) then
// begin
// LConnection := GetConnectionClass.Create;
// (LConnection as TCrossConnection).FOwner := Self;
// (LConnection as TCrossConnection).FSocket := ASocket;
// (LConnection as TCrossConnection).FConnectType := AConnectType;
// FillChar(LAddr, SizeOf(TRawSockAddrIn), 0);
// LAddr.AddrLen := SizeOf(LAddr.Addr6);
// if (TSocketAPI.GetPeerName(ASocket, @LAddr.Addr, LAddr.AddrLen) = 0) then
// TSocketAPI.ExtractAddrInfo(@LAddr.Addr, LAddr.AddrLen,
// (LConnection as TCrossConnection).FPeerAddr, (LConnection as TCrossConnection).FPeerPort);
// (LConnection as TCrossConnection).Initialize;
//
// FConnections.AddOrSetValue(ASocket, LConnection);
// end else
// LConnection := nil;
// finally
// FConnectionsLocker.EndWrite;
// end;

if (LConnection <> nil) then
begin
AtomicIncrement(FConnectionsCount);
Expand Down

0 comments on commit 9256000

Please sign in to comment.