From 92560008bb498ab1b4d7a3ebe18a820cdb089b5d Mon Sep 17 00:00:00 2001 From: WiNDDRiVER Date: Thu, 29 Jun 2017 10:12:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3IOCP=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E5=88=9A=E5=BB=BA=E7=AB=8B=E6=97=B6TriggerReceived=E5=8F=AF?= =?UTF-8?q?=E8=83=BD=E4=BC=9A=E5=85=88=E4=BA=8ETriggerConnected=E8=A7=A6?= =?UTF-8?q?=E5=8F=91=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Net/Net.CrossSocket.IocpLoop.pas | 99 ++++++++++++++++++-------------- Net/Net.CrossSocket.pas | 39 ++++--------- 2 files changed, 68 insertions(+), 70 deletions(-) diff --git a/Net/Net.CrossSocket.IocpLoop.pas b/Net/Net.CrossSocket.IocpLoop.pas index 16001f4..8d7827d 100644 --- a/Net/Net.CrossSocket.IocpLoop.pas +++ b/Net/Net.CrossSocket.IocpLoop.pas @@ -1,4 +1,4 @@ -{******************************************************************************} +锘縶******************************************************************************} { } { Delphi cross platform socket library } { } @@ -37,7 +37,7 @@ TAddrBuffer = record TPerIoBufUnion = record case Integer of 0: (DataBuf: WSABUF); - // 这个Buffer只用于AcceptEx保存终端地址数据,大小为2倍地址结构 + // 杩欎釜Buffer鍙敤浜嶢cceptEx淇濆瓨缁堢鍦板潃鏁版嵁锛屽ぇ灏忎负2鍊嶅湴鍧缁撴瀯 1: (AcceptExBuffer: TAcceptExBuffer); end; @@ -190,10 +190,13 @@ procedure TIocpLoop.RequestAcceptComplete(ASocket: THandle; Exit; end; - if NewReadZero(APerIoData.Socket) then - TriggerConnected(APerIoData.Socket, CT_ACCEPT) - else - TSocketAPI.CloseSocket(APerIoData.Socket); + TriggerConnected(APerIoData.Socket, CT_ACCEPT); + + if not NewReadZero(APerIoData.Socket) then + begin + if (TSocketAPI.CloseSocket(APerIoData.Socket) = 0) then + TriggerDisconnected(APerIoData.Socket); + end; end; procedure TIocpLoop.RequestConnectComplete(ASocket: THandle; @@ -209,39 +212,51 @@ procedure TIocpLoop.RequestConnectComplete(ASocket: THandle; APerIoData.Callback(ASocket, True); end; - procedure _Failed; + procedure _Failed1; begin {$IFDEF DEBUG} __RaiseLastOSError; {$ENDIF} - TSocketAPI.CloseSocket(ASocket); + TSocketAPI.CloseSocket(ASocket); TriggerConnectFailed(ASocket); if Assigned(APerIoData.Callback) then APerIoData.Callback(ASocket, False); end; + procedure _Failed2; + begin + {$IFDEF DEBUG} + __RaiseLastOSError; + {$ENDIF} + + if (TSocketAPI.CloseSocket(ASocket) = 0) then + TriggerDisconnected(ASocket); + + if Assigned(APerIoData.Callback) then + APerIoData.Callback(ASocket, False); + end; begin if (TSocketAPI.GetError(ASocket) <> 0) then begin - _Failed; + _Failed1; Exit; end; - // 不设置该参数, 会导致 getpeername 调用失败 + // 涓嶈缃鍙傛暟, 浼氬鑷 getpeername 璋冪敤澶辫触 LOptVal := 1; if (TSocketAPI.SetSockOpt(ASocket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, LOptVal, SizeOf(Integer)) < 0) then begin - _Failed; + _Failed1; Exit; end; - if NewReadZero(ASocket) then - _Success - else - _Failed; + _Success; + + if not NewReadZero(ASocket) then + _Failed2; end; procedure TIocpLoop.RequestReadZeroComplete(ASocket: THandle; @@ -253,7 +268,7 @@ procedure TIocpLoop.RequestReadZeroComplete(ASocket: THandle; begin LRcvd := TSocketAPI.Recv(ASocket, FRecvBuf[0], RCV_BUF_SIZE); - // 对方主动断开连接 + // 瀵规柟涓诲姩鏂紑杩炴帴 if (LRcvd = 0) then begin if (TSocketAPI.CloseSocket(ASocket) = 0) then @@ -263,7 +278,7 @@ procedure TIocpLoop.RequestReadZeroComplete(ASocket: THandle; if (LRcvd < 0) then begin - // 需要重试 + // 闇瑕侀噸璇 if _Again(GetLastError) then Break; if (TSocketAPI.CloseSocket(ASocket) = 0) then @@ -520,13 +535,13 @@ function TIocpLoop.Listen(const AHost: string; APort: Word; Exit(-1); end; - // 给每个IO线程投递一个AcceptEx + // 缁欐瘡涓狪O绾跨▼鎶曢掍竴涓狝cceptEx for I := 1 to GetIoThreads do NewAccept(LSocket, LAddrInfo.ai_family, LAddrInfo.ai_socktype, LAddrInfo.ai_protocol); _Success; - // 如果端口传入0,让所有地址统一用首个分配到的端口 + // 濡傛灉绔彛浼犲叆0锛岃鎵鏈夊湴鍧缁熶竴鐢ㄩ涓垎閰嶅埌鐨勭鍙 if (APort = 0) and (LAddrInfo.ai_next <> nil) then LAddrInfo.ai_next.ai_addr.sin_port := LAddrInfo.ai_addr.sin_port; @@ -554,22 +569,22 @@ function TIocpLoop.Send(ASocket: THandle; const ABuf; ALen: Integer; LFlags := 0; LBytes := 0; - // WSASend 不会出现部分发送的情况, 要么全部失败, 要么全部成功 - // 所以不需要像 kqueue 或 epoll 中调用 send 那样调用完之后还得检查实际发送了多少 - // 唯一需要注意的是: WSASend 会将待发送的数据锁定到非页面内存, 非页面内存资源 - // 是非常紧张的, 所以不要无节制的调用 WSASend, 最好通过回调发送完一批数据再继 - // 续发送下一批 + // WSASend 涓嶄細鍑虹幇閮ㄥ垎鍙戦佺殑鎯呭喌, 瑕佷箞鍏ㄩ儴澶辫触, 瑕佷箞鍏ㄩ儴鎴愬姛 + // 鎵浠ヤ笉闇瑕佸儚 kqueue 鎴 epoll 涓皟鐢 send 閭f牱璋冪敤瀹屼箣鍚庤繕寰楁鏌ュ疄闄呭彂閫佷簡澶氬皯 + // 鍞竴闇瑕佹敞鎰忕殑鏄: WSASend 浼氬皢寰呭彂閫佺殑鏁版嵁閿佸畾鍒伴潪椤甸潰鍐呭瓨, 闈為〉闈㈠唴瀛樿祫婧 + // 鏄潪甯哥揣寮犵殑, 鎵浠ヤ笉瑕佹棤鑺傚埗鐨勮皟鐢 WSASend, 鏈濂介氳繃鍥炶皟鍙戦佸畬涓鎵规暟鎹啀缁 + // 缁彂閫佷笅涓鎵 if (WSASend(ASocket, @LPerIoData.Buffer.DataBuf, 1, LBytes, LFlags, PWSAOverlapped(LPerIoData), nil) < 0) and (WSAGetLastError <> WSA_IO_PENDING) then begin if Assigned(ACallback) then ACallback(ASocket, False); - // 出错多半是 WSAENOBUFS, 也就是投递的 WSASend 过多, 来不及发送 - // 导致非页面内存资源全部被锁定, 要避免这种情况必须上层发送逻辑 - // 保证不能无节制的调用Send发送大量数据, 最好发送完一个再继续下 - // 一个, 本函数提供了发送结果的回调函数, 在回调函数报告发送成功 - // 之后就可以继续下一块数据发送了 + // 鍑洪敊澶氬崐鏄 WSAENOBUFS, 涔熷氨鏄姇閫掔殑 WSASend 杩囧, 鏉ヤ笉鍙婂彂閫 + // 瀵艰嚧闈為〉闈㈠唴瀛樿祫婧愬叏閮ㄨ閿佸畾, 瑕侀伩鍏嶈繖绉嶆儏鍐靛繀椤讳笂灞傚彂閫侀昏緫 + // 淇濊瘉涓嶈兘鏃犺妭鍒剁殑璋冪敤Send鍙戦佸ぇ閲忔暟鎹, 鏈濂藉彂閫佸畬涓涓啀缁х画涓 + // 涓涓, 鏈嚱鏁版彁渚涗簡鍙戦佺粨鏋滅殑鍥炶皟鍑芥暟, 鍦ㄥ洖璋冨嚱鏁版姤鍛婂彂閫佹垚鍔 + // 涔嬪悗灏卞彲浠ョ户缁笅涓鍧楁暟鎹彂閫佷簡 FreeIoData(LPerIoData); if (TSocketAPI.CloseSocket(ASocket) = 0) then TriggerDisconnected(ASocket); @@ -587,8 +602,8 @@ function TIocpLoop.ProcessIoEvent: Boolean; begin if not GetQueuedCompletionStatus(FIocpHandle, LBytes, ULONG_PTR(LSocket), POverlapped(LPerIoData), INFINITE) then begin - // 出错了, 并且完成数据也都是空的, - // 这种情况即便重试, 应该也会继续出错, 最好立即终止IO线程 + // 鍑洪敊浜, 骞朵笖瀹屾垚鏁版嵁涔熼兘鏄┖鐨, + // 杩欑鎯呭喌鍗充究閲嶈瘯, 搴旇涔熶細缁х画鍑洪敊, 鏈濂界珛鍗崇粓姝O绾跨▼ if (LSocket = 0) or (LPerIoData = nil) then begin {$IFDEF DEBUG} @@ -600,19 +615,19 @@ function TIocpLoop.ProcessIoEvent: Boolean; try case LPerIoData.Action of ioAccept: - // WSA_OPERATION_ABORTED, 995, 由于线程退出或应用程序请求,已中止 I/O 操作。 - // WSAENOTSOCK, 10038, 在一个非套接字上尝试了一个操作。 - // 在主动关闭监听的socket时会出现该错误, 这时候只需要简单的关掉 - // AcceptEx对应的客户端socket即可 + // WSA_OPERATION_ABORTED, 995, 鐢变簬绾跨▼閫鍑烘垨搴旂敤绋嬪簭璇锋眰锛屽凡涓 I/O 鎿嶄綔銆 + // WSAENOTSOCK, 10038, 鍦ㄤ竴涓潪濂楁帴瀛椾笂灏濊瘯浜嗕竴涓搷浣溿 + // 鍦ㄤ富鍔ㄥ叧闂洃鍚殑socket鏃朵細鍑虹幇璇ラ敊璇, 杩欐椂鍊欏彧闇瑕佺畝鍗曠殑鍏虫帀 + // AcceptEx瀵瑰簲鐨勫鎴风socket鍗冲彲 TSocketAPI.CloseSocket(LPerIoData.Socket); ioConnect: - // ERROR_CONNECTION_REFUSED, 1225, 远程计算机拒绝网络连接。 + // ERROR_CONNECTION_REFUSED, 1225, 杩滅▼璁$畻鏈烘嫆缁濈綉缁滆繛鎺ャ if (TSocketAPI.CloseSocket(LSocket) = 0) then begin TriggerConnectFailed(LSocket); if Assigned(LPerIoData.Callback) then - TProc(LPerIoData.Callback)(LSocket, False); + LPerIoData.Callback(LSocket, False); end; ioReadZero: @@ -622,7 +637,7 @@ function TIocpLoop.ProcessIoEvent: Boolean; ioSend: begin if Assigned(LPerIoData.Callback) then - TProc(LPerIoData.Callback)(False); + LPerIoData.Callback(LSocket, False); if (TSocketAPI.CloseSocket(LSocket) = 0) then TriggerDisconnected(LSocket); @@ -632,15 +647,15 @@ function TIocpLoop.ProcessIoEvent: Boolean; FreeIoData(LPerIoData); end; - // 出错了, 但是完成数据不是空的, 需要重试 + // 鍑洪敊浜, 浣嗘槸瀹屾垚鏁版嵁涓嶆槸绌虹殑, 闇瑕侀噸璇 Exit(True); end; - // 主动调用了 StopLoop + // 涓诲姩璋冪敤浜 StopLoop if (LBytes = 0) and (ULONG_PTR(LPerIoData) = SHUTDOWN_FLAG) then Exit(False); - // 由于未知原因未获取到完成数据, 但是返回的错误代码又是正常 - // 这种情况需要进行重试(返回True之后IO线程会再次调用ProcessIoEvent) + // 鐢变簬鏈煡鍘熷洜鏈幏鍙栧埌瀹屾垚鏁版嵁, 浣嗘槸杩斿洖鐨勯敊璇唬鐮佸張鏄甯 + // 杩欑鎯呭喌闇瑕佽繘琛岄噸璇(杩斿洖True涔嬪悗IO绾跨▼浼氬啀娆¤皟鐢≒rocessIoEvent) if (LSocket = 0) or (LPerIoData = nil) then Exit(True); try diff --git a/Net/Net.CrossSocket.pas b/Net/Net.CrossSocket.pas index 23f6659..1d7a1e6 100644 --- a/Net/Net.CrossSocket.pas +++ b/Net/Net.CrossSocket.pas @@ -281,6 +281,8 @@ TCrossConnection = class(TInterfacedObject, ICrossConnection) property Owner: TCustomCrossSocket read GetOwner; property Socket: THandle read GetSocket; + property LocalAddr: string read GetLocalAddr; + property LocalPort: Word read GetLocalPort; property PeerAddr: string read GetPeerAddr; property PeerPort: Word read GetPeerPort; property ConnectType: TConnectType read GetConnectType; @@ -390,7 +392,7 @@ TCrossSocket = class(TCustomCrossSocket) /// 鍥炶皟鍑芥暟 /// /// - /// 杩斿洖鍊煎彧鑳借〃鏄嶤onnect璋冪敤鏄惁鎴愬姛 + /// 杩斿洖鍊煎彧鑳借〃鏄 connect 璋冪敤鏄惁鎴愬姛 /// /// /// 0, 璋冪敤鎴愬姛 @@ -399,7 +401,7 @@ TCrossSocket = class(TCustomCrossSocket) /// 闈0, 璋冪敤澶辫触 /// /// - /// 褰揙nConnected瑙﹀彂鏃舵墠琛ㄦ槑杩炴帴寤虹珛, 鑰孫nConnectFailed瑙﹀彂鍒欒〃鏄庤繛鎺ュけ璐 + /// 褰撳洖璋冭瑙﹀彂鏃舵墠琛ㄦ槑杩炴帴寤虹珛鎴栬繛鎺ュけ璐 /// function Connect(const AHost: string; APort: Word; const ACallback: TProc = nil): Integer; override; @@ -430,8 +432,11 @@ TCrossSocket = class(TCustomCrossSocket) /// /// 绔彛 /// + /// + /// 鍥炶皟鍑芥暟 + /// /// - /// 杩斿洖鍊煎彧鑳借〃鏄巄ind鏄惁璋冪敤鎴愬姛 + /// 杩斿洖鍊煎彧鑳借〃鏄 bind 鏄惁璋冪敤鎴愬姛 /// /// /// 0, 璋冪敤鎴愬姛 @@ -440,7 +445,7 @@ TCrossSocket = class(TCustomCrossSocket) /// 闈0, 璋冪敤澶辫触 /// /// - /// 褰揙nListened瑙﹀彂鏃舵墠琛ㄦ槑鐩戝惉鎴愬姛 + /// 褰撳洖璋冭瑙﹀彂鏃舵墠琛ㄦ槑鐩戝惉鎴愬姛鎴栧け璐 /// function Listen(const AHost: string; APort: Word; const ACallback: TProc = nil): Integer; override; @@ -488,7 +493,7 @@ TCrossSocket = class(TCustomCrossSocket) implementation uses - System.Math; + System.Math, Utils.Logger; { TCrossConnection } @@ -525,7 +530,6 @@ procedure TCrossConnection.DirectSend(const ABuffer; ACount: Integer; LBuffer: Pointer; begin LConnection := Self as ICrossConnection; - LBuffer := @ABuffer; if (FSocket = INVALID_HANDLE_VALUE) then begin @@ -534,6 +538,7 @@ procedure TCrossConnection.DirectSend(const ABuffer; ACount: Integer; Exit; end; + LBuffer := @ABuffer; FOwner.Send(FSocket, ABuffer, ACount, procedure(ASocket: THandle; ASuccess: Boolean) begin @@ -884,28 +889,6 @@ procedure TCustomCrossSocket.TriggerConnected(ASocket: THandle; FConnectionsLocker.EndWrite; end; -// FConnectionsLocker.BeginWrite; -// try -// if Assigned(FConnections) then -// begin -// LConnection := GetConnectionClass.Create; -// (LConnection as TCrossConnection).FOwner := Self; -// (LConnection as TCrossConnection).FSocket := ASocket; -// (LConnection as TCrossConnection).FConnectType := AConnectType; -// FillChar(LAddr, SizeOf(TRawSockAddrIn), 0); -// LAddr.AddrLen := SizeOf(LAddr.Addr6); -// if (TSocketAPI.GetPeerName(ASocket, @LAddr.Addr, LAddr.AddrLen) = 0) then -// TSocketAPI.ExtractAddrInfo(@LAddr.Addr, LAddr.AddrLen, -// (LConnection as TCrossConnection).FPeerAddr, (LConnection as TCrossConnection).FPeerPort); -// (LConnection as TCrossConnection).Initialize; -// -// FConnections.AddOrSetValue(ASocket, LConnection); -// end else -// LConnection := nil; -// finally -// FConnectionsLocker.EndWrite; -// end; - if (LConnection <> nil) then begin AtomicIncrement(FConnectionsCount);