Skip to content

Commit beb19b9

Browse files
committed
support adding request-dependent tags to stream calls
1 parent 4b458cb commit beb19b9

File tree

6 files changed

+296
-149
lines changed

6 files changed

+296
-149
lines changed

grpc-spring-boot-starter-demo/src/main/java/org/lognet/springboot/grpc/demo/GreeterService.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import lombok.extern.slf4j.Slf4j;
88
import org.lognet.springboot.grpc.GRpcService;
99
import org.lognet.springboot.grpc.security.GrpcSecurity;
10-
import org.springframework.beans.factory.annotation.Autowired;
1110
import org.springframework.security.access.annotation.Secured;
1211
import org.springframework.security.access.prepost.PostAuthorize;
1312
import org.springframework.security.access.prepost.PreAuthorize;
@@ -17,15 +16,36 @@
1716
import org.springframework.util.Assert;
1817

1918
@Slf4j
20-
@GRpcService(interceptors = { LogInterceptor.class })
19+
@GRpcService(interceptors = {LogInterceptor.class})
2120
public class GreeterService extends GreeterGrpc.GreeterImplBase {
2221
@Override
2322
public void sayHello(GreeterOuterClass.HelloRequest request, StreamObserver<GreeterOuterClass.HelloReply> responseObserver) {
2423
String message = "Hello " + request.getName();
2524
final GreeterOuterClass.HelloReply.Builder replyBuilder = GreeterOuterClass.HelloReply.newBuilder().setMessage(message);
2625
responseObserver.onNext(replyBuilder.build());
2726
responseObserver.onCompleted();
28-
log.info("Returning " +message);
27+
log.info("Returning " + message);
28+
}
29+
30+
@Override public StreamObserver<GreeterOuterClass.HelloRequest> sayManyHellos(
31+
StreamObserver<GreeterOuterClass.HelloReply> responseObserver
32+
) {
33+
return new StreamObserver<GreeterOuterClass.HelloRequest>() {
34+
@Override public void onNext(GreeterOuterClass.HelloRequest request) {
35+
String message = "Hello " + request.getName();
36+
final GreeterOuterClass.HelloReply.Builder replyBuilder = GreeterOuterClass.HelloReply.newBuilder().setMessage(message);
37+
responseObserver.onNext(replyBuilder.build());
38+
log.info("Returning " + message);
39+
}
40+
41+
@Override
42+
public void onError(Throwable t) {}
43+
44+
@Override
45+
public void onCompleted() {
46+
responseObserver.onCompleted();
47+
}
48+
};
2949
}
3050

3151
@Override
@@ -34,15 +54,16 @@ public void sayAuthHello(Empty request, StreamObserver<GreeterOuterClass.HelloRe
3454

3555

3656
final Authentication auth = GrpcSecurity.AUTHENTICATION_CONTEXT_KEY.get();
37-
Assert.isTrue(SecurityContextHolder.getContext().getAuthentication() == auth,()->"Authentication object should be the same as in GRPC context");
38-
if(null!=auth) {
57+
Assert.isTrue(SecurityContextHolder.getContext().getAuthentication() == auth,
58+
() -> "Authentication object should be the same as in GRPC context");
59+
if (null != auth) {
3960

4061
String user = auth.getName();
4162
if (auth instanceof JwtAuthenticationToken) {
4263
user = JwtAuthenticationToken.class.cast(auth).getTokenAttributes().get("preferred_username").toString();
4364
}
4465
responseObserver.onNext(GreeterOuterClass.HelloReply.newBuilder().setMessage(user).build());
45-
}else{
66+
} else {
4667
responseObserver.onNext(GreeterOuterClass.HelloReply.newBuilder().setMessage("Hello").build());
4768
}
4869
responseObserver.onCompleted();
@@ -63,22 +84,22 @@ public void sayPreAuthHello(GreeterOuterClass.Person person, StreamObserver<Gree
6384
public void sayAuthOnlyHello(Empty request, StreamObserver<GreeterOuterClass.HelloReply> responseObserver) {
6485

6586

66-
sayAuthHello(request,responseObserver);
87+
sayAuthHello(request, responseObserver);
6788
}
6889

6990
@Override
7091
public void helloPersonValidResponse(GreeterOuterClass.Person request, StreamObserver<GreeterOuterClass.Person> responseObserver) {
7192
responseObserver.onNext(GreeterOuterClass.Person.newBuilder(request)
72-
.setNickName(request.getName().toLowerCase())
73-
.build());
93+
.setNickName(request.getName().toLowerCase())
94+
.build());
7495
responseObserver.onCompleted();
7596
}
7697

7798
@Override
7899
public void helloPersonInvalidResponse(GreeterOuterClass.Person request, StreamObserver<GreeterOuterClass.Person> responseObserver) {
79100
responseObserver.onNext(GreeterOuterClass.Person.newBuilder(request)
80-
.clearNickName()
81-
.build());
101+
.clearNickName()
102+
.build());
82103
responseObserver.onCompleted();
83104
}
84105

grpc-spring-boot-starter-demo/src/main/proto/greeter.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ option java_package = "io.grpc.examples";
88
service Greeter {
99
// Sends a greeting
1010
rpc SayHello ( HelloRequest) returns ( HelloReply) {}
11+
rpc SayManyHellos (stream HelloRequest) returns (stream HelloReply) {}
1112
rpc SayAuthHello ( google.protobuf.Empty) returns ( HelloReply) {}
1213
rpc SayAuthOnlyHello ( google.protobuf.Empty) returns ( HelloReply) {}
1314
rpc SayPreAuthHello ( Person) returns ( Person) {}
Lines changed: 107 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,35 @@
11
package org.lognet.springboot.grpc;
22

3+
import static io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING;
4+
import static org.hamcrest.MatcherAssert.assertThat;
5+
import static org.hamcrest.Matchers.containsString;
6+
import static org.hamcrest.Matchers.greaterThan;
7+
import static org.hamcrest.Matchers.is;
8+
import static org.hamcrest.Matchers.notNullValue;
9+
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
10+
11+
import java.time.Duration;
12+
import java.util.Arrays;
13+
import java.util.Collections;
14+
import java.util.Objects;
15+
import java.util.concurrent.TimeUnit;
16+
317
import io.grpc.Attributes;
418
import io.grpc.MethodDescriptor;
519
import io.grpc.Status;
620
import io.grpc.examples.GreeterGrpc;
721
import io.grpc.examples.GreeterOuterClass;
22+
import io.grpc.examples.GreeterOuterClass.HelloRequest;
23+
import io.grpc.stub.StreamObserver;
824
import io.micrometer.core.instrument.MeterRegistry;
925
import io.micrometer.core.instrument.Tag;
26+
import io.micrometer.core.instrument.Tags;
1027
import io.micrometer.core.instrument.Timer;
11-
import io.micrometer.core.instrument.simple.SimpleConfig;
28+
import io.micrometer.core.instrument.search.MeterNotFoundException;
1229
import io.micrometer.prometheus.PrometheusConfig;
1330
import org.awaitility.Awaitility;
1431
import org.junit.Before;
32+
import org.junit.Test;
1533
import org.junit.runner.RunWith;
1634
import org.lognet.springboot.grpc.autoconfigure.metrics.RequestAwareGRpcMetricsTagsContributor;
1735
import org.lognet.springboot.grpc.context.LocalRunningGrpcPort;
@@ -26,43 +44,35 @@
2644
import org.springframework.test.context.ActiveProfiles;
2745
import org.springframework.test.context.junit4.SpringRunner;
2846

29-
import java.time.Duration;
30-
import java.util.Collections;
31-
import java.util.concurrent.TimeUnit;
32-
33-
import static org.hamcrest.MatcherAssert.assertThat;
34-
import static org.hamcrest.Matchers.containsString;
35-
import static org.hamcrest.Matchers.greaterThan;
36-
import static org.hamcrest.Matchers.is;
37-
import static org.hamcrest.Matchers.notNullValue;
38-
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
39-
4047
@RunWith(SpringRunner.class)
4148
@SpringBootTest(classes = {DemoApp.class}, webEnvironment = NONE, properties = {"grpc.port=0"})
4249
@ActiveProfiles("measure")
4350
@Import(GrpcMeterTest.Config.class)
4451
public class GrpcMeterTest extends GrpcServerTestBase {
4552
@TestConfiguration
46-
static class Config{
53+
static class Config {
4754
@Bean
48-
public RequestAwareGRpcMetricsTagsContributor<GreeterOuterClass.HelloRequest> helloContributor(){
49-
return new RequestAwareGRpcMetricsTagsContributor<GreeterOuterClass.HelloRequest>(GreeterOuterClass.HelloRequest.class) {
55+
public RequestAwareGRpcMetricsTagsContributor<HelloRequest> helloContributor() {
56+
return new RequestAwareGRpcMetricsTagsContributor<HelloRequest>(HelloRequest.class) {
5057
@Override
51-
public Iterable<Tag> getTags(GreeterOuterClass.HelloRequest request, MethodDescriptor<?, ?> methodDescriptor, Attributes attributes) {
52-
return Collections.singletonList(Tag.of("hello",request.getName()));
58+
public Iterable<Tag> getTags(HelloRequest request, MethodDescriptor<?, ?> methodDescriptor, Attributes attributes) {
59+
return Collections.singletonList(Tag.of("hello", request.getName()));
5360
}
5461

5562
@Override
5663
public Iterable<Tag> getTags(Status status, MethodDescriptor<?, ?> methodDescriptor, Attributes attributes) {
57-
return Collections.singletonList(Tag.of("customTagName",status.getCode().name()));
64+
return Collections.singletonList(Tag.of("customTagName", status.getCode().name()));
5865
}
5966
};
6067
}
68+
6169
@Bean
62-
public RequestAwareGRpcMetricsTagsContributor<GreeterOuterClass.Person> shouldNotBeInvoked(){
70+
public RequestAwareGRpcMetricsTagsContributor<GreeterOuterClass.Person> shouldNotBeInvoked() {
6371
return new RequestAwareGRpcMetricsTagsContributor<GreeterOuterClass.Person>(GreeterOuterClass.Person.class) {
6472
@Override
65-
public Iterable<Tag> getTags(GreeterOuterClass.Person request, MethodDescriptor<?, ?> methodDescriptor, Attributes attributes) {
73+
public Iterable<Tag> getTags(
74+
GreeterOuterClass.Person request, MethodDescriptor<?, ?> methodDescriptor, Attributes attributes
75+
) {
6676
return Collections.emptyList();
6777
}
6878

@@ -72,6 +82,28 @@ public Iterable<Tag> getTags(Status status, MethodDescriptor<?, ?> methodDescrip
7282
}
7383
};
7484
}
85+
86+
@Bean
87+
public RequestAwareGRpcMetricsTagsContributor<HelloRequest> multiHelloContributor() {
88+
return new RequestAwareGRpcMetricsTagsContributor<HelloRequest>(HelloRequest.class, BIDI_STREAMING) {
89+
@Override
90+
public Tags getTags(
91+
HelloRequest request, MethodDescriptor<?, ?> methodDescriptor, Attributes attributes, Tags existingTags
92+
) {
93+
String existingTag = existingTags.stream()
94+
.filter(tag -> tag.getKey().equals("many-hellos"))
95+
.findAny()
96+
.map(Tag::getValue)
97+
.orElse("");
98+
return Tags.of("many-hellos", existingTag.isEmpty() ? request.getName() : existingTag + ", " + request.getName());
99+
}
100+
101+
@Override
102+
public Iterable<Tag> getTags(Status status, MethodDescriptor<?, ?> methodDescriptor, Attributes attributes) {
103+
return Collections.singletonList(Tag.of("endTag", status.getCode().name()));
104+
}
105+
};
106+
}
75107
}
76108

77109
@SpyBean
@@ -87,54 +119,84 @@ public Iterable<Tag> getTags(Status status, MethodDescriptor<?, ?> methodDescrip
87119
private PrometheusConfig registryConfig;
88120

89121
@Before
90-
public void setUp() {
122+
public void setUp() {
91123
registry.clear();
92124
}
93125

94126
@Override
95-
protected void afterGreeting() {
96-
97-
127+
protected void afterGreeting() {
98128
final Timer timer = registry.find("grpc.server.calls").timer();
99-
assertThat(timer,notNullValue(Timer.class));
129+
assertThat(timer, notNullValue(Timer.class));
100130

101131
Awaitility
102-
.waitAtMost(Duration.ofMillis(registryConfig.step().toMillis() * 2))
103-
.until(timer::count,greaterThan(0L));
132+
.waitAtMost(Duration.ofMillis(registryConfig.step().toMillis() * 2))
133+
.until(timer::count, greaterThan(0L));
104134

105-
assertThat(timer.max(TimeUnit.MILLISECONDS),greaterThan(0d));
106-
assertThat(timer.mean(TimeUnit.MILLISECONDS),greaterThan(0d));
107-
assertThat(timer.totalTime(TimeUnit.MILLISECONDS),greaterThan(0d));
135+
assertThat(timer.max(TimeUnit.MILLISECONDS), greaterThan(0d));
136+
assertThat(timer.mean(TimeUnit.MILLISECONDS), greaterThan(0d));
137+
assertThat(timer.totalTime(TimeUnit.MILLISECONDS), greaterThan(0d));
108138

109139

110140
final String addressTag = timer.getId().getTag("address");
111-
assertThat(addressTag,notNullValue());
112-
assertThat(addressTag,containsString(String.valueOf(port)));
141+
assertThat(addressTag, notNullValue());
142+
assertThat(addressTag, containsString(String.valueOf(port)));
113143

114144
final String methodTag = timer.getId().getTag("method");
115-
assertThat(methodTag,notNullValue());
116-
assertThat(methodTag,is(GreeterGrpc.getSayHelloMethod().getFullMethodName()));
145+
assertThat(methodTag, notNullValue());
146+
assertThat(methodTag, is(GreeterGrpc.getSayHelloMethod().getFullMethodName()));
117147

118148
final String resultTag = timer.getId().getTag("result");
119-
assertThat(resultTag,notNullValue());
120-
assertThat(resultTag,is(Status.OK.getCode().name()));
149+
assertThat(resultTag, notNullValue());
150+
assertThat(resultTag, is(Status.OK.getCode().name()));
121151

122152
//from contributor
123153

124154
final String helloTag = timer.getId().getTag("hello");
125-
assertThat(helloTag,notNullValue());
126-
assertThat(helloTag,is(name));
155+
assertThat(helloTag, notNullValue());
156+
assertThat(helloTag, is(name));
127157

128158
final String customTag = timer.getId().getTag("customTagName");
129-
assertThat(customTag,notNullValue());
130-
assertThat(customTag,is(Status.OK.getCode().name()));
131-
132-
Mockito.verify(shouldNotBeInvoked,Mockito.times(1)).getTags(Mockito.any(Status.class),Mockito.any(),Mockito.any());
133-
Mockito.verify(shouldNotBeInvoked,Mockito.never()).getTags(Mockito.any(GreeterOuterClass.Person.class),Mockito.any(),Mockito.any());
134-
135-
159+
assertThat(customTag, notNullValue());
160+
assertThat(customTag, is(Status.OK.getCode().name()));
161+
162+
Mockito.verify(shouldNotBeInvoked, Mockito.times(1)).getTags(Mockito.any(Status.class), Mockito.any(), Mockito.any());
163+
Mockito.verify(shouldNotBeInvoked, Mockito.never())
164+
.getTags(Mockito.any(GreeterOuterClass.Person.class), Mockito.any(), Mockito.any());
165+
Mockito.verify(shouldNotBeInvoked, Mockito.never())
166+
.getTags(Mockito.any(GreeterOuterClass.Person.class), Mockito.any(), Mockito.any(), Mockito.any());
167+
}
136168

169+
@Test
170+
public void tagsForStream() {
171+
final GreeterGrpc.GreeterStub greeterFutureStub = GreeterGrpc.newStub(selectedChanel);
172+
io.grpc.stub.StreamObserver<HelloRequest> helloInput =
173+
greeterFutureStub.sayManyHellos(new StreamObserver<GreeterOuterClass.HelloReply>() {
174+
@Override
175+
public void onNext(GreeterOuterClass.HelloReply value) {}
137176

177+
@Override
178+
public void onError(Throwable t) {}
138179

180+
@Override
181+
public void onCompleted() {}
182+
});
183+
Arrays.asList("a", "b", "c", "d").stream()
184+
.map(name -> HelloRequest.newBuilder().setName(name).build())
185+
.forEach(helloInput::onNext);
186+
helloInput.onCompleted();
187+
188+
final Timer timer = Awaitility
189+
.waitAtMost(Duration.ofMillis(registryConfig.step().toMillis() * 2))
190+
.ignoreExceptionsInstanceOf(MeterNotFoundException.class)
191+
.until(
192+
() -> registry.get("grpc.server.calls")
193+
.tags("method", "Greeter/SayManyHellos")
194+
.timer(),
195+
Objects::nonNull
196+
);
197+
198+
assertThat(timer.totalTime(TimeUnit.MILLISECONDS), greaterThan(0d));
199+
assertThat(timer.getId().getTag("many-hellos"), is("a, b, c, d"));
200+
assertThat(timer.getId().getTag("endTag"), is("OK"));
139201
}
140202
}

0 commit comments

Comments
 (0)