@@ -23,7 +23,6 @@ public class SocketClient {
2323 private ExecutorService executorService = Executors .newCachedThreadPool ();
2424 private Timer timer = new Timer ();
2525
26- private ReplyPollThread pollThread ;
2726 /** ReplyPollThread 是否堵塞 */
2827 private boolean blocked ;
2928
@@ -107,10 +106,41 @@ private synchronized void connectTransport(InetSocketAddress socketAddress, Opti
107106 public void run () {
108107 try {
109108 socket .sendUrgentData (0xff );
110- if (!connected && pollThread == null ) {
109+ if (!connected ) {
111110 // 开启服务器消息回复线程处理
112- pollThread = new ReplyPollThread ();
113- pollThread .start ();
111+ executorService .execute (new Runnable () {
112+ private BufferedInputStream bis ;
113+
114+ @ Override
115+ public void run () {
116+ while (isConnected ()) {
117+ try {
118+ if (bis == null ) {
119+ bis = new BufferedInputStream (socket .getInputStream ());
120+ }
121+
122+ blocked = true ;
123+
124+ // ”一次性“从输入流中读完
125+ int available = bis .available ();
126+ if (available != 0 ) {
127+ byte [] buffer = new byte [available ];
128+ int bytesRead = bis .read (buffer );
129+ // 不能使用 != -1 来判断是否读到完,因为 socket 的输入流只有 socket 断开时才会返回 -1
130+ if (bytesRead > 0 ) {
131+ transportData (buffer );
132+ }
133+ }
134+
135+ blocked = false ;
136+ } catch (IOException e ) {
137+ transportError (e );
138+ return ;
139+ }
140+ }
141+ transportDisconnected ();
142+ }
143+ });
114144 }
115145 connected = true ;
116146 } catch (IOException e ) {
@@ -126,22 +156,17 @@ private void transportData(byte[] respData) {
126156 }
127157
128158 private void transportDisconnected () {
129- // 角标更新为 false
130- connected = false ;
131-
132159 // 关闭任务执行器
133160 executorService .shutdownNow ();
134161
135- // 通知回调
136- onCall .onDisconnect ();
137-
138162 // 取消计时器
139163 timer .cancel ();
140164
141- // 终止轮询线程
142- if (pollThread != null ) {
143- pollThread .interrupt ();
144- }
165+ // 角标更新为 false
166+ connected = false ;
167+
168+ // 通知回调
169+ onCall .onDisconnect ();
145170
146171 // 断开 socket
147172 if (socket != null ) {
@@ -159,43 +184,4 @@ private void transportError(Exception error) {
159184 transportDisconnected ();
160185 }
161186
162- private class ReplyPollThread extends Thread {
163- private BufferedInputStream bis ;
164-
165- public ReplyPollThread () {
166- super ("Reply Poll Thread." );
167- }
168-
169- @ Override
170- public void run () {
171- super .run ();
172- while (isConnected ()) {
173- try {
174- if (bis == null ) {
175- bis = new BufferedInputStream (socket .getInputStream ());
176- }
177-
178- blocked = true ;
179-
180- // ”一次性“从输入流中读完
181- int available = bis .available ();
182- if (available != 0 ) {
183- byte [] buffer = new byte [available ];
184- int bytesRead = bis .read (buffer );
185- // 不能使用 != -1 来判断是否读到完,因为 socket 的输入流只有 socket 断开时才会返回 -1
186- if (bytesRead > 0 ) {
187- transportData (buffer );
188- }
189- }
190-
191- blocked = false ;
192- } catch (IOException e ) {
193- transportError (e );
194- return ;
195- }
196- }
197- transportDisconnected ();
198- }
199- }
200-
201187}
0 commit comments