Skip to content

Commit

Permalink
Revert "Adding Spring Cloud Stream Version To Message Headers For Eas…
Browse files Browse the repository at this point in the history
…ier Debugging Of Issues."

This reverts commit a8fb34d.
  • Loading branch information
sobychacko committed Nov 5, 2024
1 parent 13874dc commit e14bc1c
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 441 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ dump.rdb
.apt_generated
artifacts
**/dependency-reduced-pom.xml
core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java

node
node_modules
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package org.springframework.cloud.stream.function;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

import org.junit.jupiter.api.BeforeAll;
Expand All @@ -29,14 +26,9 @@
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.test.EnableTestBinder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.cloud.stream.utils.BuildInformationProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -51,6 +43,7 @@
/**
* @author Omer Celik
*/

public class HeaderTests {

@BeforeAll
Expand All @@ -70,8 +63,10 @@ void checkWithEmptyPojo() {

OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> messageReceived = outputDestination.receive(1000, "emptyConfigurationDestination");

checkCommonHeaders(messageReceived.getHeaders());
MessageHeaders headers = messageReceived.getHeaders();
assertThat(headers).isNotNull();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
}
}

Expand All @@ -80,20 +75,20 @@ void checkIfHeaderProvidedInData() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false")) {

StreamBridge streamBridge = context.getBean(StreamBridge.class);
String jsonPayload = "{\"name\":\"Omer\"}";
streamBridge.send("myBinding-out-0",
MessageBuilder.withPayload(jsonPayload.getBytes())
.setHeader("anyHeader", "anyValue")
.build(),
MimeTypeUtils.APPLICATION_JSON);

OutputDestination output = context.getBean(OutputDestination.class);
Message<byte[]> result = output.receive(1000, "myBinding-out-0");

checkCommonHeaders(result.getHeaders());
assertThat(result.getHeaders().get("anyHeader")).isEqualTo("anyValue");
MessageHeaders headers = result.getHeaders();
assertThat(headers).isNotNull();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
assertThat(headers.get("anyHeader")).isEqualTo("anyValue");
}
}

Expand All @@ -104,35 +99,16 @@ void checkGenericMessageSent() {
.web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase")) {

String jsonPayload = "{\"surname\":\"Celik\"}";
InputDestination input = context.getBean(InputDestination.class);
input.send(new GenericMessage<>(jsonPayload.getBytes()), "uppercase-in-0");

OutputDestination output = context.getBean(OutputDestination.class);
Message<byte[]> result = output.receive(1000, "uppercase-out-0");

checkCommonHeaders(result.getHeaders());
}
}

@Test
void checkGenericMessageSentUsingStreamBridge() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(FunctionUpperCaseConfiguration.class))
.web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false",
"--spring.cloud.function.definition=uppercase")) {

String jsonPayload = "{\"anyFieldName\":\"anyValue\"}";
final StreamBridge streamBridge = context.getBean(StreamBridge.class);
GenericMessage<String> message = new GenericMessage<>(jsonPayload);
streamBridge.send("uppercase-in-0", message);

OutputDestination output = context.getBean(OutputDestination.class);
Message<byte[]> result = output.receive(1000, "uppercase-out-0");

checkCommonHeaders(result.getHeaders());
MessageHeaders headers = result.getHeaders();
assertThat(headers).isNotNull();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
}
}

Expand All @@ -151,96 +127,11 @@ void checkMessageWrappedFunctionalConsumer() {

OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> message = target.receive(5, "uppercase-out-0");

checkCommonHeaders(message.getHeaders());
}

@Test
void checkStringToMapMessageStreamListener() {
ApplicationContext context = new SpringApplicationBuilder(
StringToMapMessageConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
String jsonPayload = "{\"name\":\"Omer\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoToPojo() {
ApplicationContext context = new SpringApplicationBuilder(
PojoToPojoConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
String jsonPayload = "{\"name\":\"Omer\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoToString() {
ApplicationContext context = new SpringApplicationBuilder(
PojoToStringConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Neso\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoToByteArray() {
ApplicationContext context = new SpringApplicationBuilder(
PojoToByteArrayConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Neptune\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkStringToPojoInboundContentTypeHeader() {
ApplicationContext context = new SpringApplicationBuilder(
StringToPojoConfiguration.class).web(WebApplicationType.NONE)
.run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Mercury\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes(),
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.APPLICATION_JSON_VALUE))));
Message<byte[]> outputMessage = target.receive();
checkCommonHeaders(outputMessage.getHeaders());
}

@Test
void checkPojoMessageToStringMessage() {
ApplicationContext context = new SpringApplicationBuilder(
PojoMessageToStringMessageConfiguration.class)
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false");
InputDestination source = context.getBean(InputDestination.class);
OutputDestination target = context.getBean(OutputDestination.class);
String jsonPayload = "{\"name\":\"Earth\"}";
source.send(new GenericMessage<>(jsonPayload.getBytes()));
Message<byte[]> outputMessage = target.receive();
MessageHeaders headers = outputMessage.getHeaders();
assertThat(BuildInformationProvider.isVersionValid((String) headers.get(BinderHeaders.SCST_VERSION))).isTrue();
}

private void checkCommonHeaders(MessageHeaders headers) {
MessageHeaders headers = message.getHeaders();
assertThat(headers).isNotNull();
assertThat(headers).isNotNull();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json");
assertThat(BuildInformationProvider.isVersionValid((String) headers.get(BinderHeaders.SCST_VERSION))).isTrue();
assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka");
}

@EnableAutoConfiguration
Expand All @@ -265,97 +156,6 @@ public Function<String, String> uppercase() {
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class StringToMapMessageConfiguration {
@Bean
public Function<Message<Map<?, ?>>, String> echo() {
return value -> {
assertThat(value.getPayload() instanceof Map).isTrue();
return (String) value.getPayload().get("name");
};
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoToPojoConfiguration {

@Bean
public Function<Planet, Planet> echo() {
return value -> value;
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoToStringConfiguration {

@Bean
public Function<Planet, String> echo() {
return Planet::toString;
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoToByteArrayConfiguration {

@Bean
public Function<Planet, byte[]> echo() {
return value -> value.toString().getBytes(StandardCharsets.UTF_8);
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class StringToPojoConfiguration {

@Bean
public Function<String, Planet> echo(JsonMapper mapper) {
return value -> mapper.fromJson(value, Planet.class);
}
}

@EnableTestBinder
@EnableAutoConfiguration
public static class PojoMessageToStringMessageConfiguration {

@Bean
public Function<Message<Planet>, Message<String>> echo() {
return value -> MessageBuilder.withPayload(value.getPayload().toString())
.setHeader("expected-content-type", MimeTypeUtils.TEXT_PLAIN_VALUE)
.build();
}
}

public static class Planet {

private String name;

Planet() {
this(null);
}

Planet(String name) {
this.name = name;
}

public String getName() {
return this.name;
}

public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return this.name;
}

}

public static class EmptyPojo {

}
Expand Down
Loading

0 comments on commit e14bc1c

Please sign in to comment.