Skip to content

How to configure the XTRIM or XADD #8592

Closed
@zhuangzibin

Description

@zhuangzibin

Hi, I am using spring-integration for redis stream, but the data will be kept in memory all the time, resulting in more and more memory, how I configure XTRIM or XADD?
XADD mystream MAXLEN ~ 10 * value
XTRIM mystream MAXLEN ~ 10

1. producerConfig.java

@Slf4j
@Configuration
public class ProducerConfig {

    @Bean
    public FluxMessageChannel fluxMessageChannel() {
        return new FluxMessageChannel();
    }

    /**
     * 生产者 
     * send message to redis stream
     *
     * @param reactiveRedisConnectionFactory reactiveRedisConnectionFactory
     * @return ReactiveRedisStreamMessageHandler
     */
    @Bean
    @ServiceActivator(inputChannel = "fluxMessageChannel", reactive = @Reactive)
    public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        RedisSerializationContext<String, Object> serializationContext = RedisSerializationContext.<String, Object>newSerializationContext()
                .key(new StringRedisSerializer())
                .value(new GenericJackson2JsonRedisSerializer())
                .hashKey(new StringRedisSerializer())
                .hashValue(new StringRedisSerializer())
                .build();
        ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
                new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey");
        reactiveStreamMessageHandler.setSerializationContext(serializationContext);
        reactiveStreamMessageHandler.setExtractPayload(true);
        return reactiveStreamMessageHandler;
    }
}

2. producer.java

@Component
@RequiredArgsConstructor
public class RedisStreamProducer {

    private final FluxMessageChannel fluxMessageChannel;

    public Mono<Boolean> send(BaseMessage downMessage) {
        var message = MessageBuilder.withPayload(downMessage).build();
        return Mono.fromCallable(() -> fluxMessageChannel.send(message));
    }
}

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions