1414
1515package org .hyperledger .fabric .sdk ;
1616
17+ import java .io .IOException ;
18+ import java .io .ObjectInputStream ;
1719import java .io .Serializable ;
1820import java .util .ArrayList ;
1921import java .util .Properties ;
5254
5355public class EventHub implements Serializable {
5456 private static final long serialVersionUID = 2882609588201108148L ;
55- private static final Log logger = LogFactory .getLog (EventHub .class );
5657 private static final Config config = Config .getConfig ();
58+ private transient String id = config .getNextID ();
59+ private static final Log logger = LogFactory .getLog (EventHub .class );
60+ private static final boolean IS_TRACE_LEVEL = logger .isTraceEnabled ();
61+
5762 private static final long EVENTHUB_CONNECTION_WAIT_TIME = config .getEventHubConnectionWaitTime ();
5863 private static final long EVENTHUB_RECONNECTION_WARNING_RATE = config .getEventHubReconnectionWarningRate ();
5964
@@ -78,6 +83,7 @@ public class EventHub implements Serializable {
7883 private transient long reconnectCount ;
7984 private transient long lastBlockNumber ;
8085 private transient BlockEvent lastBlockEvent ;
86+ private String channelName ;
8187
8288 /**
8389 * Get disconnected time.
@@ -99,6 +105,22 @@ public boolean isConnected() {
99105 return connected ;
100106 }
101107
108+ String getStatus () {
109+
110+ StringBuilder sb = new StringBuilder (1000 );
111+ sb .append (toString ()).append (", connected: " ).append (connected );
112+ ManagedChannel lmanagedChannel = managedChannel ;
113+ if (lmanagedChannel == null ) {
114+ sb .append ("managedChannel: null" );
115+ } else {
116+ sb .append (", isShutdown: " ).append (lmanagedChannel .isShutdown ());
117+ sb .append (", isTerminated: " ).append (lmanagedChannel .isTerminated ());
118+ sb .append (", state: " ).append ("" + lmanagedChannel .getState (false ));
119+ }
120+
121+ return sb .toString ();
122+ }
123+
102124 /**
103125 * Get last connect time.
104126 *
@@ -136,6 +158,7 @@ public long getLastConnectedAttempt() {
136158 this .name = name ;
137159 this .executorService = executorService ;
138160 this .properties = properties == null ? null : (Properties ) properties .clone (); //keep our own copy.
161+ logger .debug ("Created " + toString ());
139162 }
140163
141164 /**
@@ -186,7 +209,7 @@ synchronized boolean connect(final TransactionContext transactionContext, final
186209
187210 final CountDownLatch finishLatch = new CountDownLatch (1 );
188211
189- logger .debug (format ("EventHub %s is connecting." , name ));
212+ logger .debug (format ("%s is connecting." , toString () ));
190213
191214 lastConnectedAttempt = System .currentTimeMillis ();
192215
@@ -203,12 +226,14 @@ synchronized boolean connect(final TransactionContext transactionContext, final
203226 @ Override
204227 public void onNext (PeerEvents .Event event ) {
205228
206- logger .debug (format ("EventHub %s got event type: %s" , EventHub .this .name , event .getEventCase ().name ()));
229+ logger .debug (format ("%s got event type: %s" , EventHub .this .toString () , event .getEventCase ().name ()));
207230
208231 if (event .getEventCase () == PeerEvents .Event .EventCase .BLOCK ) {
209232 try {
210233
211234 BlockEvent blockEvent = new BlockEvent (EventHub .this , event );
235+
236+ logger .trace (format ("%s got block number: %d" , EventHub .this .toString (), blockEvent .getBlockNumber ()));
212237 setLastBlockSeen (blockEvent );
213238
214239 eventQue .addBEvent (blockEvent ); //add to channel queue
@@ -220,14 +245,17 @@ public void onNext(PeerEvents.Event event) {
220245 } else if (event .getEventCase () == PeerEvents .Event .EventCase .REGISTER ) {
221246
222247 if (reconnectCount > 1 ) {
223- logger .info (format ("Eventhub %s has reconnecting after %d attempts" , name , reconnectCount ));
248+ logger .info (format ("%s has reconnecting after %d attempts" , EventHub . this . toString () , reconnectCount ));
224249 }
225250
226251 connected = true ;
227252 connectedTime = System .currentTimeMillis ();
228253 reconnectCount = 0L ;
229254
230255 finishLatch .countDown ();
256+ } else {
257+ logger .error (format ("%s got a unexpected block type: %s" ,
258+ EventHub .this .toString (), event .getEventCase ().name ()));
231259 }
232260 }
233261
@@ -271,7 +299,7 @@ public void onError(Throwable t) {
271299 try {
272300 reconnect ();
273301 } catch (Exception e ) {
274- logger .warn (format ("Eventhub %s Failed shutdown msg: %s" , EventHub .this .name , e .getMessage ()));
302+ logger .warn (format ("%s Failed shutdown msg: %s" , EventHub .this .toString () , e .getMessage ()));
275303 }
276304
277305 }
@@ -300,17 +328,17 @@ public void onCompleted() {
300328
301329 if (!reconnection && !finishLatch .await (EVENTHUB_CONNECTION_WAIT_TIME , TimeUnit .MILLISECONDS )) {
302330
303- logger .warn (format ("EventHub %s failed to connect in %s ms." , name , EVENTHUB_CONNECTION_WAIT_TIME ));
331+ logger .warn (format ("%s failed to connect in %s ms." , toString () , EVENTHUB_CONNECTION_WAIT_TIME ));
304332
305333 } else {
306- logger .trace (format ("Eventhub %s Done waiting for reply!" , name ));
334+ logger .trace (format ("%s done waiting for reply!" , toString () ));
307335 }
308336
309337 } catch (InterruptedException e ) {
310338 logger .error (e );
311339 }
312340
313- logger .debug (format ("Eventhub %s connect is done with connect status: %b " , name , connected ));
341+ logger .debug (format ("%s connect is done with connect status: %b " , toString () , connected ));
314342
315343 if (connected ) {
316344 eventStream = eventStreamLocal ;
@@ -383,16 +411,19 @@ void setEventQue(Channel.ChannelEventQue eventQue) {
383411
384412 @ Override
385413 public String toString () {
386- return "EventHub: " + getName ();
414+ return "EventHub{" + "id: " + id + ", name: " + getName () + ", channelName: " + channelName + ", url: " + getUrl () + "}" ;
387415 }
388416
389- public void shutdown () {
417+ public synchronized void shutdown () {
418+ if (shutdown ) {
419+ return ;
420+ }
421+ logger .trace (toString () + " being shutdown." );
390422 shutdown = true ;
391423 lastBlockEvent = null ;
392424 lastBlockNumber = 0 ;
393425 connected = false ;
394426 disconnectedHandler = null ;
395- channel = null ;
396427 eventStream = null ;
397428 final ManagedChannel lmanagedChannel = managedChannel ;
398429 managedChannel = null ;
@@ -406,20 +437,23 @@ void setChannel(Channel channel) throws InvalidArgumentException {
406437 throw new InvalidArgumentException ("setChannel Channel can not be null" );
407438 }
408439
409- if (null != this . channel ) {
440+ if (null != channelName ) {
410441 throw new InvalidArgumentException (format ("Can not add event hub %s to channel %s because it already belongs to channel %s." ,
411- name , channel .getName (), this . channel . getName () ));
442+ name , channel .getName (), channelName ));
412443 }
413-
414- this .channel = channel ;
444+ logger . debug ( toString () + " set to channel: " + channel );
445+ this .channelName = channel . getName () ;
415446 }
416447
417- synchronized void setLastBlockSeen (BlockEvent lastBlockSeen ) {
448+ private synchronized void setLastBlockSeen (BlockEvent lastBlockSeen ) {
418449 long newLastBlockNumber = lastBlockSeen .getBlockNumber ();
419450 // overkill but make sure.
420451 if (lastBlockNumber < newLastBlockNumber ) {
421452 lastBlockNumber = newLastBlockNumber ;
422453 this .lastBlockEvent = lastBlockSeen ;
454+ if (IS_TRACE_LEVEL ) {
455+ logger .trace (toString () + " last block seen: " + lastBlockNumber );
456+ }
423457 }
424458 }
425459
@@ -446,7 +480,7 @@ public interface EventHubDisconnected {
446480 @ Override
447481 public synchronized void disconnected (final EventHub eventHub ) {
448482 if (reconnectCount == 1 ) {
449- logger .warn (format ("Channel %s detected disconnect on event hub %s (%s) " , channel . getName (), eventHub .toString (), url ));
483+ logger .warn (format ("%s detected disconnect. " , eventHub .toString ()));
450484 }
451485
452486 executorService .execute (() -> {
@@ -455,7 +489,7 @@ public synchronized void disconnected(final EventHub eventHub) {
455489 Thread .sleep (500 );
456490
457491 if (transactionContext == null ) {
458- logger .warn ("Eventhub reconnect failed with no user context" );
492+ logger .warn (EventHub . this . toString () + " reconnect failed with no user context" );
459493 return ;
460494 }
461495
@@ -485,4 +519,16 @@ public EventHubDisconnected setEventHubDisconnectedHandler(EventHubDisconnected
485519 return ret ;
486520 }
487521
522+ private void readObject (ObjectInputStream in ) throws IOException , ClassNotFoundException {
523+ in .defaultReadObject ();
524+ id = config .getNextID ();
525+ }
526+
527+ public void finalize () throws Throwable {
528+ logger .trace (format ("%s finalized" , toString ()));
529+ shutdown ();
530+ super .finalize ();
531+
532+ }
533+
488534}
0 commit comments