Closed
Description
Hi, I am using Spring Integration with Redis Streams, but the stream entries are kept in memory indefinitely, so memory usage keeps growing. How can I configure trimming, i.e. the equivalent of the XADD ... MAXLEN or XTRIM commands shown below?
XADD mystream MAXLEN ~ 10 * value
XTRIM mystream MAXLEN ~ 10
1. ProducerConfig.java
@Slf4j
@Configuration
public class ProducerConfig {

    /**
     * Declares the channel that the stream handler below listens on
     * (it is referenced by the bean name {@code "fluxMessageChannel"}).
     *
     * @return a new {@link FluxMessageChannel}
     */
    @Bean
    public FluxMessageChannel fluxMessageChannel() {
        return new FluxMessageChannel();
    }

    /**
     * Producer: sends messages to the Redis stream.
     *
     * <p>The handler is created for the stream key {@code "myStreamKey"}, is given a
     * String/JSON serialization context, and has payload extraction enabled.
     *
     * @param reactiveRedisConnectionFactory connection factory handed to the handler
     * @return the configured {@link ReactiveRedisStreamMessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = "fluxMessageChannel", reactive = @Reactive)
    public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        ReactiveRedisStreamMessageHandler handler =
                new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey");
        handler.setSerializationContext(buildSerializationContext());
        handler.setExtractPayload(true);
        return handler;
    }

    /**
     * Builds the serialization context used by the stream handler: String serializers for
     * keys, hash keys and hash values, and a generic Jackson JSON serializer for values.
     */
    private static RedisSerializationContext<String, Object> buildSerializationContext() {
        return RedisSerializationContext.<String, Object>newSerializationContext()
                .key(new StringRedisSerializer())
                .value(new GenericJackson2JsonRedisSerializer())
                .hashKey(new StringRedisSerializer())
                .hashValue(new StringRedisSerializer())
                .build();
    }
}
2. producer.java
@Component
@RequiredArgsConstructor
public class RedisStreamProducer {

    /** Channel the outgoing messages are sent to. */
    private final FluxMessageChannel fluxMessageChannel;

    /**
     * Wraps the given payload in a message and returns a {@link Mono} that performs the
     * channel send when it is subscribed to.
     *
     * <p>The message itself is built immediately when this method is called; only the
     * {@code send} call is deferred.
     *
     * @param downMessage payload to publish
     * @return a {@link Mono} emitting the boolean returned by the channel's {@code send}
     */
    public Mono<Boolean> send(BaseMessage downMessage) {
        var outbound = MessageBuilder
                .withPayload(downMessage)
                .build();
        return Mono.fromCallable(() -> this.fluxMessageChannel.send(outbound));
    }
}