Skip to content

The StreamTransformer must remove a IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE header from the output message after resource is closed. #9792

Closed
@tasosz

Description

@tasosz

In what version(s) of Spring Integration are you seeing this issue?

Observed with 6.1.0 and 6.3.4

Describe the bug

ObjectMapper from JacksonUtils.enhancedObjectMapper() used by KafkaProducerMessageHandler (through new DefaultKafkaHeaderMapper()) is not using spring.jackson.serialization.FAIL_ON_EMPTY_BEANS=false

To Reproduce

Add spring.jackson.serialization.FAIL_ON_EMPTY_BEANS=false to Spring Boot (tested with 3.1.0 & 3.3.4) application.properties.

Using SFTP adapter to input file (and using StreamTransformer) and then producing output to a kafka topic.
Due to a closable session (for SFTP), the MessageHeaders on the call to DefaultKafkaHeaderMapper.fromHeaders(MessageHeaders headers, Headers target), contains an extra non-String closableResource header. This is expected per SI documentation.
But this header object then fails to serialise!
In 6.1.0 it produces a debug log, but in 6.3.4 it produces an error log (both with stacktrace).

Expected behavior

Using the spring.jackson.serialization.FAIL_ON_EMPTY_BEANS=false application property, the logic should have bypassed the exception throwing, but this property has no effect.

Sample

Log entries from 6.3.4:

 - 2025-01-24T12:19:30.405Z ERROR [hexapole-file-poller,,,] 11897 --- [hexapole-file-poller] [   scheduling-1] o.s.k.support.DefaultKafkaHeaderMapper   : Could not map closeableResource with type org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class java.lang.Object and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession["clientInstance"]->org.springframework.integration.sftp.session.DefaultSftpSessionFactory$ConcurrentSftpClient["clientSession"]->org.apache.sshd.client.session.ClientSessionImpl["futureLock"])

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions