diff --git a/.gitignore b/.gitignore index 76d8c0f8d..aa64c92da 100644 --- a/.gitignore +++ b/.gitignore @@ -25,7 +25,6 @@ dump.rdb .apt_generated artifacts **/dependency-reduced-pom.xml -core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java node node_modules diff --git a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java index 055d38722..15bf8ac39 100644 --- a/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java +++ b/core/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/function/HeaderTests.java @@ -16,10 +16,7 @@ package org.springframework.cloud.stream.function; -import java.nio.charset.StandardCharsets; -import java.util.Collections; import java.util.Locale; -import java.util.Map; import java.util.function.Function; import org.junit.jupiter.api.BeforeAll; @@ -29,14 +26,9 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.cloud.function.context.message.MessageUtils; -import org.springframework.cloud.function.json.JsonMapper; -import org.springframework.cloud.stream.binder.BinderHeaders; -import org.springframework.cloud.stream.binder.test.EnableTestBinder; import org.springframework.cloud.stream.binder.test.InputDestination; import org.springframework.cloud.stream.binder.test.OutputDestination; import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; -import org.springframework.cloud.stream.utils.BuildInformationProvider; -import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -51,6 +43,7 @@ /** * @author Omer Celik */ + public class HeaderTests { @BeforeAll @@ -70,8 +63,10 @@ void checkWithEmptyPojo() { OutputDestination outputDestination = context.getBean(OutputDestination.class); Message messageReceived = outputDestination.receive(1000, "emptyConfigurationDestination"); - - checkCommonHeaders(messageReceived.getHeaders()); + MessageHeaders headers = messageReceived.getHeaders(); + assertThat(headers).isNotNull(); + assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka"); + assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json"); } } @@ -80,7 +75,6 @@ void checkIfHeaderProvidedInData() { try (ConfigurableApplicationContext context = new SpringApplicationBuilder( TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class)) .web(WebApplicationType.NONE).run("--spring.jmx.enabled=false")) { - StreamBridge streamBridge = context.getBean(StreamBridge.class); String jsonPayload = "{\"name\":\"Omer\"}"; streamBridge.send("myBinding-out-0", @@ -88,12 +82,13 @@ void checkIfHeaderProvidedInData() { .setHeader("anyHeader", "anyValue") .build(), MimeTypeUtils.APPLICATION_JSON); - OutputDestination output = context.getBean(OutputDestination.class); Message result = output.receive(1000, "myBinding-out-0"); - - checkCommonHeaders(result.getHeaders()); - assertThat(result.getHeaders().get("anyHeader")).isEqualTo("anyValue"); + MessageHeaders headers = result.getHeaders(); + assertThat(headers).isNotNull(); + assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka"); + assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json"); + assertThat(headers.get("anyHeader")).isEqualTo("anyValue"); } } @@ -104,35 +99,16 @@ void checkGenericMessageSent() { .web(WebApplicationType.NONE) .run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=uppercase")) { - String jsonPayload = "{\"surname\":\"Celik\"}"; InputDestination input = context.getBean(InputDestination.class); input.send(new GenericMessage<>(jsonPayload.getBytes()), "uppercase-in-0"); - OutputDestination output = context.getBean(OutputDestination.class); - Message result = output.receive(1000, "uppercase-out-0"); - - checkCommonHeaders(result.getHeaders()); - } - } - @Test - void checkGenericMessageSentUsingStreamBridge() { - try (ConfigurableApplicationContext context = new SpringApplicationBuilder( - TestChannelBinderConfiguration.getCompleteConfiguration(FunctionUpperCaseConfiguration.class)) - .web(WebApplicationType.NONE) - .run("--spring.jmx.enabled=false", - "--spring.cloud.function.definition=uppercase")) { - - String jsonPayload = "{\"anyFieldName\":\"anyValue\"}"; - final StreamBridge streamBridge = context.getBean(StreamBridge.class); - GenericMessage message = new GenericMessage<>(jsonPayload); - streamBridge.send("uppercase-in-0", message); - - OutputDestination output = context.getBean(OutputDestination.class); Message result = output.receive(1000, "uppercase-out-0"); - - checkCommonHeaders(result.getHeaders()); + MessageHeaders headers = result.getHeaders(); + assertThat(headers).isNotNull(); + assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka"); + assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json"); } } @@ -151,96 +127,11 @@ void checkMessageWrappedFunctionalConsumer() { OutputDestination target = context.getBean(OutputDestination.class); Message message = target.receive(5, "uppercase-out-0"); - - checkCommonHeaders(message.getHeaders()); - } - - @Test - void checkStringToMapMessageStreamListener() { - ApplicationContext context = new SpringApplicationBuilder( - StringToMapMessageConfiguration.class).web(WebApplicationType.NONE) - .run("--spring.jmx.enabled=false"); - InputDestination source = context.getBean(InputDestination.class); - String jsonPayload = "{\"name\":\"Omer\"}"; - source.send(new GenericMessage<>(jsonPayload.getBytes())); - OutputDestination target = context.getBean(OutputDestination.class); - Message outputMessage = target.receive(); - checkCommonHeaders(outputMessage.getHeaders()); - } - - @Test - void checkPojoToPojo() { - ApplicationContext context = new SpringApplicationBuilder( - PojoToPojoConfiguration.class).web(WebApplicationType.NONE) - .run("--spring.jmx.enabled=false"); - InputDestination source = context.getBean(InputDestination.class); - String jsonPayload = "{\"name\":\"Omer\"}"; - source.send(new GenericMessage<>(jsonPayload.getBytes())); - OutputDestination target = context.getBean(OutputDestination.class); - Message outputMessage = target.receive(); - checkCommonHeaders(outputMessage.getHeaders()); - } - - @Test - void checkPojoToString() { - ApplicationContext context = new SpringApplicationBuilder( - PojoToStringConfiguration.class).web(WebApplicationType.NONE) - .run("--spring.jmx.enabled=false"); - InputDestination source = context.getBean(InputDestination.class); - OutputDestination target = context.getBean(OutputDestination.class); - String jsonPayload = "{\"name\":\"Neso\"}"; - source.send(new GenericMessage<>(jsonPayload.getBytes())); - Message outputMessage = target.receive(); - checkCommonHeaders(outputMessage.getHeaders()); - } - - @Test - void checkPojoToByteArray() { - ApplicationContext context = new SpringApplicationBuilder( - PojoToByteArrayConfiguration.class).web(WebApplicationType.NONE) - .run("--spring.jmx.enabled=false"); - InputDestination source = context.getBean(InputDestination.class); - OutputDestination target = context.getBean(OutputDestination.class); - String jsonPayload = "{\"name\":\"Neptune\"}"; - source.send(new GenericMessage<>(jsonPayload.getBytes())); - Message outputMessage = target.receive(); - checkCommonHeaders(outputMessage.getHeaders()); - } - - @Test - void checkStringToPojoInboundContentTypeHeader() { - ApplicationContext context = new SpringApplicationBuilder( - StringToPojoConfiguration.class).web(WebApplicationType.NONE) - .run("--spring.jmx.enabled=false"); - InputDestination source = context.getBean(InputDestination.class); - OutputDestination target = context.getBean(OutputDestination.class); - String jsonPayload = "{\"name\":\"Mercury\"}"; - source.send(new GenericMessage<>(jsonPayload.getBytes(), - new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, - MimeTypeUtils.APPLICATION_JSON_VALUE)))); - Message outputMessage = target.receive(); - checkCommonHeaders(outputMessage.getHeaders()); - } - - @Test - void checkPojoMessageToStringMessage() { - ApplicationContext context = new SpringApplicationBuilder( - PojoMessageToStringMessageConfiguration.class) - .web(WebApplicationType.NONE).run("--spring.jmx.enabled=false"); - InputDestination source = context.getBean(InputDestination.class); - OutputDestination target = context.getBean(OutputDestination.class); - String jsonPayload = "{\"name\":\"Earth\"}"; - source.send(new GenericMessage<>(jsonPayload.getBytes())); - Message outputMessage = target.receive(); - MessageHeaders headers = outputMessage.getHeaders(); - assertThat(BuildInformationProvider.isVersionValid((String) headers.get(BinderHeaders.SCST_VERSION))).isTrue(); - } - - private void checkCommonHeaders(MessageHeaders headers) { + MessageHeaders headers = message.getHeaders(); + assertThat(headers).isNotNull(); assertThat(headers).isNotNull(); - assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka"); assertThat(headers.get(MessageHeaders.CONTENT_TYPE)).isEqualTo("application/json"); - assertThat(BuildInformationProvider.isVersionValid((String) headers.get(BinderHeaders.SCST_VERSION))).isTrue(); + assertThat(headers.get(MessageUtils.TARGET_PROTOCOL)).isEqualTo("kafka"); } @EnableAutoConfiguration @@ -265,97 +156,6 @@ public Function uppercase() { } } - @EnableTestBinder - @EnableAutoConfiguration - public static class StringToMapMessageConfiguration { - @Bean - public Function>, String> echo() { - return value -> { - assertThat(value.getPayload() instanceof Map).isTrue(); - return (String) value.getPayload().get("name"); - }; - } - } - - @EnableTestBinder - @EnableAutoConfiguration - public static class PojoToPojoConfiguration { - - @Bean - public Function echo() { - return value -> value; - } - } - - @EnableTestBinder - @EnableAutoConfiguration - public static class PojoToStringConfiguration { - - @Bean - public Function echo() { - return Planet::toString; - } - } - - @EnableTestBinder - @EnableAutoConfiguration - public static class PojoToByteArrayConfiguration { - - @Bean - public Function echo() { - return value -> value.toString().getBytes(StandardCharsets.UTF_8); - } - } - - @EnableTestBinder - @EnableAutoConfiguration - public static class StringToPojoConfiguration { - - @Bean - public Function echo(JsonMapper mapper) { - return value -> mapper.fromJson(value, Planet.class); - } - } - - @EnableTestBinder - @EnableAutoConfiguration - public static class PojoMessageToStringMessageConfiguration { - - @Bean - public Function, Message> echo() { - return value -> MessageBuilder.withPayload(value.getPayload().toString()) - .setHeader("expected-content-type", MimeTypeUtils.TEXT_PLAIN_VALUE) - .build(); - } - } - - public static class Planet { - - private String name; - - Planet() { - this(null); - } - - Planet(String name) { - this.name = name; - } - - public String getName() { - return this.name; - } - - public void setName(String name) { - this.name = name; - } - - @Override - public String toString() { - return this.name; - } - - } - public static class EmptyPojo { } diff --git a/core/spring-cloud-stream/pom.xml b/core/spring-cloud-stream/pom.xml index 3dffa5ff1..6b0e54b73 100644 --- a/core/spring-cloud-stream/pom.xml +++ b/core/spring-cloud-stream/pom.xml @@ -15,10 +15,6 @@ 4.2.0-SNAPSHOT - - ${maven.build.timestamp} - - org.springframework.boot @@ -133,77 +129,6 @@ 1.8 - - org.apache.maven.plugins - maven-clean-plugin - - - clean - - clean - - - - - src/main/java - - **/GeneratedBuildProperties.java - - - - - - - - - org.apache.maven.plugins - maven-antrun-plugin - - - generate-build-info - generate-sources - - run - - - - - - - - - - - - - - - - org.apache.maven.plugins - maven-resources-plugin - - - process-sources - - copy-resources - - - - - src/main/template - - **/*.java - - true - - - src/main/java - true - - - - diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java index cbb221c62..98c10ebb6 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java @@ -79,7 +79,6 @@ import org.springframework.cloud.stream.config.BindingServiceConfiguration; import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel; -import org.springframework.cloud.stream.utils.BuildInformationProvider; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; @@ -133,7 +132,6 @@ * @author Ivan Shapoval * @author Patrik Péter Süli * @author Artem Bilan - * @author Omer Celik * @since 2.1 */ @Lazy(false) @@ -472,7 +470,7 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor if (this.functionProperties.isComposeFrom()) { AbstractSubscribableChannel outputChannel = this.applicationContext.getBean(outputBindingNames.iterator().next(), AbstractSubscribableChannel.class); logger.info("Composing at the head of output destination: " + outputChannel.getBeanName()); - String outputChannelName = outputChannel.getBeanName(); + String outputChannelName = ((AbstractMessageChannel) outputChannel).getBeanName(); DirectWithAttributesChannel newOutputChannel = new DirectWithAttributesChannel(); newOutputChannel.setAttribute("type", "output"); newOutputChannel.setComponentName("output.extended"); @@ -499,14 +497,11 @@ private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactor headersField.setAccessible(true); targetProtocolEnhancer.set(message -> { Map headersMap = (Map) ReflectionUtils - .getField(headersField, message.getHeaders()); + .getField(headersField, ((Message) message).getHeaders()); headersMap.putIfAbsent(MessageUtils.TARGET_PROTOCOL, targetProtocol); if (CloudEventMessageUtils.isCloudEvent((message))) { headersMap.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE); } - if (BuildInformationProvider.isVersionValid()) { - headersMap.putIfAbsent(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion()); - } return message; }); } @@ -841,9 +836,6 @@ private void setHeadersIfNeeded(Message message) { if (CloudEventMessageUtils.isCloudEvent(message)) { headersMap.putIfAbsent(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE); } - if (BuildInformationProvider.isVersionValid()) { - headersMap.putIfAbsent(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion()); - } } } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java index 399101448..7e22cdb75 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/StreamBridge.java @@ -17,6 +17,7 @@ package org.springframework.cloud.stream.function; import java.lang.reflect.Type; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -42,7 +43,6 @@ import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.binder.BinderFactory; -import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.ProducerProperties; import org.springframework.cloud.stream.binding.BindingService; import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor; @@ -50,7 +50,6 @@ import org.springframework.cloud.stream.config.BindingProperties; import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel; -import org.springframework.cloud.stream.utils.BuildInformationProvider; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.ResolvableType; import org.springframework.integration.channel.AbstractMessageChannel; @@ -86,7 +85,6 @@ * @author Soby Chacko * @author Byungjun You * @author Michał Rowicki - * @author Omer Celik * @since 3.0.3 * */ @@ -208,7 +206,8 @@ public boolean send(String bindingName, @Nullable String binderName, Object data this.applicationContext.getBean(BinderFactory.class)); Message messageToSend = data instanceof Message messageData - ? createMessageWithHeader(messageData, targetType) : createGenericMessageWithHeader(data, targetType); + ? MessageBuilder.fromMessage(messageData).setHeaderIfAbsent(MessageUtils.TARGET_PROTOCOL, targetType).build() + : new GenericMessage<>(data, Collections.singletonMap(MessageUtils.TARGET_PROTOCOL, targetType)); Message resultMessage; lock.lock(); @@ -229,24 +228,6 @@ public boolean send(String bindingName, @Nullable String binderName, Object data return messageChannel.send(resultMessage); } - private Message createMessageWithHeader(Message messageData, String targetType) { - MessageBuilder messageBuilder = MessageBuilder.fromMessage(messageData) - .copyHeaders(createHeaders(targetType)); - return messageBuilder.build(); - } - private Message createGenericMessageWithHeader(Object data, String targetType) { - return new GenericMessage<>(data, createHeaders(targetType)); - } - - private Map createHeaders(String targetType) { - Map headers = new HashMap<>(); - headers.put(MessageUtils.TARGET_PROTOCOL, targetType); - if (BuildInformationProvider.isVersionValid()) { - headers.put(BinderHeaders.SCST_VERSION, BuildInformationProvider.getVersion()); - } - return headers; - } - private int hashProducerProperties(ProducerProperties producerProperties, String outputContentType) { int hash = outputContentType.hashCode() + Boolean.hashCode(producerProperties.isUseNativeEncoding()) diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/BuildInformationProvider.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/BuildInformationProvider.java deleted file mode 100644 index 67c117731..000000000 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/utils/BuildInformationProvider.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2024-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.utils; - - -/** - * Provides information about current Spring Cloud Stream build. - * - * @author Omer Celik - */ -public final class BuildInformationProvider { - - private static final String UNKNOWN_SCST_VERSION = "-1"; - - private static final BuildInformation BUILD_INFO_CACHE; - - static { - BUILD_INFO_CACHE = createBuildInformation(); - } - - private BuildInformationProvider() { - } - - public static boolean isVersionValid() { - return !getVersion().equals(UNKNOWN_SCST_VERSION); - } - public static boolean isVersionValid(String version) { - return !version.equals(UNKNOWN_SCST_VERSION); - } - public static String getVersion() { - return BUILD_INFO_CACHE.version(); - } - - // If you have a compilation error at GeneratedBuildProperties then run 'mvn clean install' - // the GeneratedBuildProperties class is generated at a compile-time - private static BuildInformation createBuildInformation() { - return new BuildInformation(calculateVersion(), GeneratedBuildProperties.TIMESTAMP); - } - - // If you have a compilation error at GeneratedBuildProperties then run 'mvn clean install' - // the GeneratedBuildProperties class is generated at a compile-time - private static String calculateVersion() { - String version = GeneratedBuildProperties.VERSION; - if (version.startsWith("@") && version.endsWith("@")) { - return UNKNOWN_SCST_VERSION; - } - return version; - } - - private record BuildInformation(String version, String timestamp) { - } -} diff --git a/core/spring-cloud-stream/src/main/template/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java b/core/spring-cloud-stream/src/main/template/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java deleted file mode 100644 index d82b1ff9f..000000000 --- a/core/spring-cloud-stream/src/main/template/org/springframework/cloud/stream/utils/GeneratedBuildProperties.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2024-2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.utils; - -import javax.annotation.processing.Generated; - -/** - * Exposes the Spring Cloud Stream Build Properties. - * This class is generated in a build-time from a template stored at - * src/main/template/org/springframework/cloud/stream/utils/GeneratedBuildProperties. - * - * Do not edit by hand as the changes will be overwritten in the next build. - * We use this method to support all build types. (Fat jar, shaded jar, war, etc.) - * - * WARNING: DO NOT CHANGE FIELD VALUES IN THE TEMPLATE.(For example: @project.version@) - * The fields are injected using the @.....@ keywords with the Maven Antrun Plugin. - * - * @author Omer Celik - * @since 4.2.0 - */ -@Generated("") -public final class GeneratedBuildProperties { - - /** - * Indicates the Spring Cloud Stream version. - */ - public static final String VERSION = "@project.version@"; - - /** - * Indicates the build time of the project. - */ - public static final String TIMESTAMP = "@timestamp@"; - - private GeneratedBuildProperties() { - } -}