20
20
import static java .util .concurrent .TimeUnit .SECONDS ;
21
21
22
22
import com .google .common .collect .ImmutableList ;
23
+ import com .google .common .util .concurrent .SettableFuture ;
23
24
import com .google .protobuf .ByteString ;
24
25
import io .grpc .Grpc ;
25
26
import io .grpc .InsecureChannelCredentials ;
26
27
import io .grpc .ManagedChannel ;
27
28
import io .grpc .Server ;
28
29
import io .grpc .ServerBuilder ;
29
- import io .grpc .benchmarks .Utils ;
30
30
import io .grpc .s2a .internal .handshaker .ValidatePeerCertificateChainReq .VerificationMode ;
31
31
import io .grpc .stub .StreamObserver ;
32
32
import java .io .IOException ;
@@ -49,51 +49,52 @@ public final class FakeS2AServerTest {
49
49
50
50
private static final ImmutableList <ByteString > FAKE_CERT_DER_CHAIN =
51
51
ImmutableList .of (ByteString .copyFrom ("fake-der-chain" .getBytes (StandardCharsets .US_ASCII )));
52
- private int port ;
53
52
private String serverAddress ;
54
- private SessionResp response = null ;
55
53
private Server fakeS2AServer ;
56
54
57
55
@ Before
58
56
public void setUp () throws Exception {
59
- port = Utils .pickUnusedPort ();
60
- fakeS2AServer = ServerBuilder .forPort (port ).addService (new FakeS2AServer ()).build ();
57
+ fakeS2AServer = ServerBuilder .forPort (0 ).addService (new FakeS2AServer ()).build ();
61
58
fakeS2AServer .start ();
62
- serverAddress = String .format ("localhost:%d" , port );
59
+ serverAddress = String .format ("localhost:%d" , fakeS2AServer . getPort () );
63
60
}
64
61
65
62
@ After
66
- public void tearDown () {
63
+ public void tearDown () throws Exception {
67
64
fakeS2AServer .shutdown ();
65
+ fakeS2AServer .awaitTermination (10 , SECONDS );
68
66
}
69
67
70
68
@ Test
71
69
public void callS2AServerOnce_getTlsConfiguration_returnsValidResult ()
72
- throws InterruptedException , IOException {
70
+ throws InterruptedException , IOException , java . util . concurrent . ExecutionException {
73
71
ExecutorService executor = Executors .newSingleThreadExecutor ();
74
72
logger .info ("Client connecting to: " + serverAddress );
75
73
ManagedChannel channel =
76
74
Grpc .newChannelBuilder (serverAddress , InsecureChannelCredentials .create ())
77
75
.executor (executor )
78
76
.build ();
79
-
77
+ SettableFuture < SessionResp > respFuture = SettableFuture . create ();
80
78
try {
81
79
S2AServiceGrpc .S2AServiceStub asyncStub = S2AServiceGrpc .newStub (channel );
82
80
StreamObserver <SessionReq > requestObserver =
83
81
asyncStub .setUpSession (
84
82
new StreamObserver <SessionResp >() {
83
+ SessionResp recvResp ;
85
84
@ Override
86
85
public void onNext (SessionResp resp ) {
87
- response = resp ;
86
+ recvResp = resp ;
88
87
}
89
88
90
89
@ Override
91
90
public void onError (Throwable t ) {
92
- throw new RuntimeException (t );
91
+ respFuture . setException (t );
93
92
}
94
93
95
94
@ Override
96
- public void onCompleted () {}
95
+ public void onCompleted () {
96
+ respFuture .set (recvResp );
97
+ }
97
98
});
98
99
try {
99
100
requestObserver .onNext (
@@ -138,36 +139,39 @@ public void onCompleted() {}
138
139
.addCiphersuites (
139
140
Ciphersuite .CIPHERSUITE_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256 )))
140
141
.build ();
141
- assertThat (response ).ignoringRepeatedFieldOrder ().isEqualTo (expected );
142
+ assertThat (respFuture . get () ).ignoringRepeatedFieldOrder ().isEqualTo (expected );
142
143
}
143
144
144
145
@ Test
145
146
public void callS2AServerOnce_validatePeerCertifiate_returnsValidResult ()
146
- throws InterruptedException {
147
+ throws InterruptedException , java . util . concurrent . ExecutionException {
147
148
ExecutorService executor = Executors .newSingleThreadExecutor ();
148
149
logger .info ("Client connecting to: " + serverAddress );
149
150
ManagedChannel channel =
150
151
Grpc .newChannelBuilder (serverAddress , InsecureChannelCredentials .create ())
151
152
.executor (executor )
152
153
.build ();
153
-
154
+ SettableFuture < SessionResp > respFuture = SettableFuture . create ();
154
155
try {
155
156
S2AServiceGrpc .S2AServiceStub asyncStub = S2AServiceGrpc .newStub (channel );
156
157
StreamObserver <SessionReq > requestObserver =
157
158
asyncStub .setUpSession (
158
159
new StreamObserver <SessionResp >() {
160
+ private SessionResp recvResp ;
159
161
@ Override
160
162
public void onNext (SessionResp resp ) {
161
- response = resp ;
163
+ recvResp = resp ;
162
164
}
163
165
164
166
@ Override
165
167
public void onError (Throwable t ) {
166
- throw new RuntimeException (t );
168
+ respFuture . setException (t );
167
169
}
168
170
169
171
@ Override
170
- public void onCompleted () {}
172
+ public void onCompleted () {
173
+ respFuture .set (recvResp );
174
+ }
171
175
});
172
176
try {
173
177
requestObserver .onNext (
@@ -200,7 +204,7 @@ public void onCompleted() {}
200
204
ValidatePeerCertificateChainResp .newBuilder ()
201
205
.setValidationResult (ValidatePeerCertificateChainResp .ValidationResult .SUCCESS ))
202
206
.build ();
203
- assertThat (response ).ignoringRepeatedFieldOrder ().isEqualTo (expected );
207
+ assertThat (respFuture . get () ).ignoringRepeatedFieldOrder ().isEqualTo (expected );
204
208
}
205
209
206
210
@ Test
0 commit comments