Skip to content
This repository was archived by the owner on Apr 22, 2025. It is now read-only.

Commit e1ca125

Browse files
committed
FABJ-428 Provide queued block event listener.
Change-Id: Id9af1d8938a595d43f42ad8361ec70c61bd697d8 Signed-off-by: rickr <cr22rc@gmail.com>
1 parent 1865ed8 commit e1ca125

File tree

6 files changed

+328
-15
lines changed

6 files changed

+328
-15
lines changed

src/main/java/org/hyperledger/fabric/sdk/Channel.java

Lines changed: 140 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4969,7 +4969,77 @@ public String registerBlockListener(BlockListener listener) throws InvalidArgume
49694969
throw new InvalidArgumentException(format("Channel %s has been shutdown.", name));
49704970
}
49714971

4972-
return new BL(listener).getHandle();
4972+
if (null == listener) {
4973+
throw new InvalidArgumentException("Listener parameter is null.");
4974+
}
4975+
4976+
String handle = new BL(listener).getHandle();
4977+
4978+
logger.trace(format("Register event BlockEvent listener %s", handle));
4979+
4980+
return handle;
4981+
4982+
}
4983+
4984+
/**
4985+
* Register a Queued block listener. This queue should never block insertion of events.
4986+
*
4987+
* @param blockEventQueue the queue
4988+
* @return return a handle to ungregister the handler.
4989+
* @throws InvalidArgumentException
4990+
*/
4991+
4992+
public String registerBlockListener(BlockingQueue<QueuedBlockEvent> blockEventQueue) throws InvalidArgumentException {
4993+
4994+
if (shutdown) {
4995+
throw new InvalidArgumentException(format("Channel %s has been shutdown.", name));
4996+
}
4997+
4998+
if (null == blockEventQueue) {
4999+
throw new InvalidArgumentException("BlockEventQueue parameter is null.");
5000+
}
5001+
5002+
String handle = new BL(blockEventQueue, -1L, null).getHandle();
5003+
5004+
logger.trace(format("Register QueuedBlockEvent listener %s", handle));
5005+
5006+
return handle;
5007+
5008+
}
5009+
5010+
/**
5011+
* Register a Queued block listener. This queue should never block insertion of events.
5012+
*
5013+
* @param blockEventQueue the queue
5014+
* @param timeout The time that is waited on for event to be waited on the queue
5015+
* @param timeUnit the time unit for timeout.
5016+
* @return return a handle to ungregister the handler.
5017+
* @throws InvalidArgumentException
5018+
*/
5019+
5020+
public String registerBlockListener(BlockingQueue<QueuedBlockEvent> blockEventQueue, long timeout, TimeUnit timeUnit) throws InvalidArgumentException {
5021+
5022+
if (shutdown) {
5023+
throw new InvalidArgumentException(format("Channel %s has been shutdown.", name));
5024+
}
5025+
5026+
if (null == blockEventQueue) {
5027+
throw new InvalidArgumentException("BlockEventQueue parameter is null.");
5028+
}
5029+
5030+
if (timeout < 0L) {
5031+
throw new InvalidArgumentException(format("Timeout parameter must be greater than 0 not %d", timeout));
5032+
}
5033+
5034+
if (null == timeUnit) {
5035+
throw new InvalidArgumentException("TimeUnit parameter must not be null.");
5036+
}
5037+
5038+
String handle = new BL(blockEventQueue, timeout, timeUnit).getHandle();
5039+
5040+
logger.trace(format("Register QueuedBlockEvent listener %s", handle));
5041+
5042+
return handle;
49735043

49745044
}
49755045

@@ -4987,11 +5057,41 @@ public boolean unregisterBlockListener(String handle) throws InvalidArgumentExce
49875057
}
49885058

49895059
checkHandle(BLOCK_LISTENER_TAG, handle);
5060+
logger.trace(format("Unregister BlockListener with handle %s.", handle));
5061+
5062+
LinkedHashMap<String, BL> lblockListeners = blockListeners;
5063+
if (lblockListeners == null) {
5064+
return false;
5065+
}
5066+
5067+
synchronized (lblockListeners) {
5068+
5069+
return null != lblockListeners.remove(handle);
5070+
5071+
}
5072+
}
5073+
5074+
public Collection<String> getBlockListenerHandles() throws InvalidArgumentException {
5075+
5076+
if (shutdown) {
5077+
throw new InvalidArgumentException(format("Channel %s has been shutdown.", name));
5078+
}
5079+
5080+
LinkedHashMap<String, BL> lblockListeners = blockListeners;
5081+
if (lblockListeners == null) {
5082+
return Collections.emptyList();
5083+
}
49905084

4991-
synchronized (blockListeners) {
5085+
synchronized (lblockListeners) {
49925086

4993-
return null != blockListeners.remove(handle);
5087+
Set<String> ret = new HashSet<>(lblockListeners.keySet());
5088+
// remove the SDKs own transaction block listener.
5089+
final String ltransactionListenerProcessorHandle = transactionListenerProcessorHandle;
5090+
if (null != ltransactionListenerProcessorHandle) {
5091+
ret.remove(ltransactionListenerProcessorHandle);
5092+
}
49945093

5094+
return Collections.unmodifiableSet(ret);
49955095
}
49965096
}
49975097
////////// Transaction monitoring /////////////////////////////
@@ -5053,7 +5153,23 @@ private void startEventQue() {
50535153
for (BL l : blcopy) {
50545154
try {
50555155
logger.trace(format("Sending block event '%s' to block listener %s", from, l.handle));
5056-
client.getExecutorService().execute(() -> l.listener.received(blockEvent));
5156+
if (l.listener != null) {
5157+
client.getExecutorService().execute(() -> l.listener.received(blockEvent));
5158+
} else if (l.blockingQueue != null) {
5159+
5160+
if (l.timeout < 0 || l.timeUnit == null) {
5161+
5162+
l.blockingQueue.put(new QueuedBlockEvent(l.handle, blockEvent));
5163+
5164+
} else {
5165+
5166+
if (!l.blockingQueue.offer(new QueuedBlockEvent(l.handle, blockEvent), l.timeout, l.timeUnit)) {
5167+
logger.warn(format("Error calling block listener %s on channel: %s event: %s could not be added in time %d %s ",
5168+
l.handle, name, from, l.timeout, l.timeUnit));
5169+
}
5170+
}
5171+
5172+
}
50575173
} catch (Throwable e) { //Don't let one register stop rest.
50585174
logger.error(format("Error calling block listener %s on channel: %s event: %s ", l.handle, name, from), e);
50595175
}
@@ -5906,18 +6022,35 @@ class BL {
59066022

59076023
final BlockListener listener;
59086024
final String handle;
6025+
private final BlockingQueue<QueuedBlockEvent> blockingQueue;
6026+
private final long timeout;
6027+
private final TimeUnit timeUnit;
59096028

5910-
BL(BlockListener listener) {
5911-
6029+
{
59126030
handle = BLOCK_LISTENER_TAG + Utils.generateUUID() + BLOCK_LISTENER_TAG;
59136031
logger.debug(format("Channel %s blockListener %s starting", name, handle));
59146032

5915-
this.listener = listener;
59166033
synchronized (blockListeners) {
59176034

59186035
blockListeners.put(handle, this);
59196036

59206037
}
6038+
}
6039+
6040+
BL(BlockListener listener) {
6041+
6042+
this.listener = listener;
6043+
blockingQueue = null;
6044+
timeout = Long.MAX_VALUE;
6045+
timeUnit = null;
6046+
6047+
}
6048+
6049+
BL(BlockingQueue<QueuedBlockEvent> blockingQueue, long timeout, TimeUnit timeUnit) {
6050+
this.blockingQueue = blockingQueue;
6051+
this.timeout = timeout;
6052+
this.timeUnit = timeUnit;
6053+
listener = null;
59216054

59226055
}
59236056

src/main/java/org/hyperledger/fabric/sdk/Peer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,5 +681,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
681681
reconnectCount = new AtomicLong(0L);
682682
id = config.getNextID();
683683
lastBlockNumber = -1L;
684+
685+
logger.debug(format("Reserialized peer: %s", this.toString()));
684686
}
685687
} // end Peer
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
*
3+
* Copyright 2018 IBM - All Rights Reserved.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*
15+
*/
16+
17+
package org.hyperledger.fabric.sdk;
18+
19+
/**
20+
* Event returned on a Queued Block event listener.
21+
*/
22+
public class QueuedBlockEvent {
23+
24+
private final String listenerHandle;
25+
private final BlockEvent blockEvent;
26+
27+
QueuedBlockEvent(String listenerHandle, BlockEvent blockEvent) {
28+
this.listenerHandle = listenerHandle;
29+
this.blockEvent = blockEvent;
30+
}
31+
32+
/**
33+
* The BlockEvent received.
34+
*
35+
* @return The BlockEvent received.
36+
*/
37+
38+
public BlockEvent getBlockEvent() {
39+
return blockEvent;
40+
}
41+
42+
/**
43+
* The listner handle that produced this event.
44+
*
45+
* @return The listner handle that produced this event.
46+
*/
47+
public String getListenerHandle() {
48+
return listenerHandle;
49+
}
50+
}

src/test/java/org/hyperledger/fabric/sdk/ChannelTest.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@
2828
import java.util.LinkedList;
2929
import java.util.Map;
3030
import java.util.Properties;
31+
import java.util.concurrent.BlockingQueue;
3132
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.LinkedBlockingQueue;
34+
import java.util.concurrent.TimeUnit;
3235

3336
import com.google.protobuf.ByteString;
3437
import io.grpc.Status;
@@ -1014,6 +1017,79 @@ public void testProposalBuilderWithMetaInfEmpty() throws Exception {
10141017
FabricProposal.Proposal proposal = installProposalBuilder.build(); // Build it get the proposal. Then unpack it to see if it's what we epect.
10151018
}
10161019

1020+
//testing of blocklistner
1021+
1022+
@Test
1023+
public void testRegisterBlockListenerNULL() throws Exception {
1024+
1025+
thrown.expect(InvalidArgumentException.class);
1026+
thrown.expectMessage("BlockEventQueue parameter is null.");
1027+
1028+
Channel channel = hfclient.newChannel("testRegisterBlockListenerNULL");
1029+
BlockingQueue<QueuedBlockEvent> nblis = null;
1030+
channel.registerBlockListener(nblis);
1031+
1032+
}
1033+
1034+
@Test
1035+
public void testRegisterBlockListenerNULL2() throws Exception {
1036+
1037+
thrown.expect(InvalidArgumentException.class);
1038+
thrown.expectMessage("BlockEventQueue parameter is null.");
1039+
1040+
Channel channel = hfclient.newChannel("testRegisterBlockListenerNULL2");
1041+
BlockingQueue<QueuedBlockEvent> nblis = null;
1042+
channel.registerBlockListener(nblis, 10, TimeUnit.SECONDS);
1043+
1044+
}
1045+
1046+
@Test
1047+
public void testRegisterBlockListenerBadArg() throws Exception {
1048+
1049+
thrown.expect(InvalidArgumentException.class);
1050+
thrown.expectMessage("Timeout parameter must be greater than 0 not -1");
1051+
1052+
Channel channel = hfclient.newChannel("testRegisterBlockListenerBadArg");
1053+
BlockingQueue<QueuedBlockEvent> nblis = null;
1054+
channel.registerBlockListener(new LinkedBlockingQueue<>(), -1, TimeUnit.SECONDS);
1055+
1056+
}
1057+
1058+
@Test
1059+
public void testRegisterBlockListenerBadNULLArg() throws Exception {
1060+
1061+
thrown.expect(InvalidArgumentException.class);
1062+
thrown.expectMessage("TimeUnit parameter must not be null.");
1063+
1064+
Channel channel = hfclient.newChannel("testRegisterBlockListenerBadNULLArg");
1065+
channel.registerBlockListener(new LinkedBlockingQueue<>(), 10, null);
1066+
1067+
}
1068+
1069+
@Test
1070+
public void testRegisterBlockListenerShutdown() throws Exception {
1071+
1072+
thrown.expect(InvalidArgumentException.class);
1073+
thrown.expectMessage("Channel testRegisterBlockListenerShutdown has been shutdown.");
1074+
1075+
Channel channel = hfclient.newChannel("testRegisterBlockListenerShutdown");
1076+
channel.shutdown(false);
1077+
channel.registerBlockListener(new LinkedBlockingQueue<>(), 10, TimeUnit.SECONDS);
1078+
1079+
}
1080+
1081+
@Test
1082+
public void testRegisterBlockListenerShutdown2() throws Exception {
1083+
1084+
thrown.expect(InvalidArgumentException.class);
1085+
thrown.expectMessage("Channel testRegisterBlockListenerShutdown2 has been shutdown.");
1086+
1087+
Channel channel = hfclient.newChannel("testRegisterBlockListenerShutdown2");
1088+
channel.shutdown(false);
1089+
channel.registerBlockListener(new LinkedBlockingQueue<>());
1090+
1091+
}
1092+
10171093
class MockEndorserClient extends EndorserClient {
10181094
final Throwable throwThis;
10191095
private final CompletableFuture<FabricProposalResponse.ProposalResponse> returnedFuture;

src/test/java/org/hyperledger/fabric/sdkintegration/End2endAndBackAgainIT.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -200,14 +200,6 @@ public void setup() throws Exception {
200200

201201
try {
202202

203-
// client.setMemberServices(peerOrg1FabricCA);
204-
205-
//Persistence is not part of SDK. Sample file store is for demonstration purposes only!
206-
// MUST be replaced with more robust application implementation (Database, LDAP)
207-
208-
// if (sampleStoreFile.exists()) { //For testing start fresh
209-
// sampleStoreFile.delete();
210-
// }
211203
sampleStore = new SampleStore(sampleStoreFile);
212204

213205
setupUsers(sampleStore);

0 commit comments

Comments
 (0)