Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ ext {

subprojects {
group = 'com.github.sonus21'
version = '2.13.1-RELEASE'
version = '2.13.2-SNAPSHOT'

dependencies {
// https://mvnrepository.com/artifact/org.springframework/spring-messaging
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.github.sonus21.rqueue.config;

import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Configuration
@Getter
@Setter
public class RqueueEventBusConfig {

@Value("${rqueue.event.bus.core.pool.size:2}")
private int corePoolSize;

@Value("${rqueue.event.bus.max.pool.size:10}")
private int maxPoolSize;

@Value("${rqueue.event.bus.queue.capacity:100}")
private int queueCapacity;

@Value("${rqueue.event.bus.keep.alive.time:60000}")
private int keepAliveTime;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,8 @@
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
import com.github.sonus21.rqueue.core.ScheduledQueueMessageScheduler;
import com.github.sonus21.rqueue.core.eventbus.EventBusErrorHandler;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
import com.github.sonus21.rqueue.dao.RqueueStringDao;
import com.github.sonus21.rqueue.dao.impl.RqueueStringDaoImpl;
Expand All @@ -37,6 +39,8 @@
import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled;
import com.github.sonus21.rqueue.utils.pebble.ResourceLoader;
import com.github.sonus21.rqueue.utils.pebble.RqueuePebbleExtension;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.mitchellbosecke.pebble.PebbleEngine;
import com.mitchellbosecke.pebble.spring.extension.SpringExtension;
import com.mitchellbosecke.pebble.spring.reactive.PebbleReactiveViewResolver;
Expand All @@ -45,11 +49,13 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* This is a base configuration class for Rqueue, that is used in Spring and Spring boot Rqueue libs
Expand Down Expand Up @@ -105,8 +111,8 @@ protected MessageConverterProvider getMessageConverterProvider() {
* Database for different ops.
*
* @param beanFactory configurable bean factory
* @param versionKey Rqueue db version key
* @param dbVersion database version
* @param versionKey Rqueue db version key
* @param dbVersion database version
* @return {@link RedisConnectionFactory} object.
*/
@Bean
Expand Down Expand Up @@ -150,6 +156,11 @@ public RqueueWebConfig rqueueWebConfig() {
return new RqueueWebConfig();
}

@Bean
public RqueueEventBusConfig rqueueEventBusConfig() {
return new RqueueEventBusConfig();
}

@Bean
public RqueueSchedulerConfig rqueueSchedulerConfig() {
return new RqueueSchedulerConfig();
Expand Down Expand Up @@ -190,8 +201,14 @@ public RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory()
* @return {@link ScheduledQueueMessageScheduler} object
*/
@Bean
public ScheduledQueueMessageScheduler scheduledMessageScheduler() {
return new ScheduledQueueMessageScheduler();
public ScheduledQueueMessageScheduler scheduledMessageScheduler(
RqueueSchedulerConfig rqueueSchedulerConfig,
RqueueConfig rqueueConfig,
RqueueEventBus eventBus,
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
@Qualifier("rqueueRedisLongTemplate")
RedisTemplate<String, Long> redisTemplate) {
return new ScheduledQueueMessageScheduler(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, redisTemplate);
}

/**
Expand All @@ -201,8 +218,14 @@ public ScheduledQueueMessageScheduler scheduledMessageScheduler() {
* @return {@link ProcessingQueueMessageScheduler} object
*/
@Bean
public ProcessingQueueMessageScheduler processingMessageScheduler() {
return new ProcessingQueueMessageScheduler();
public ProcessingQueueMessageScheduler processingMessageScheduler(
RqueueSchedulerConfig rqueueSchedulerConfig,
RqueueConfig rqueueConfig,
RqueueEventBus eventBus,
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
@Qualifier("rqueueRedisLongTemplate")
RedisTemplate<String, Long> redisTemplate) {
return new ProcessingQueueMessageScheduler(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory, redisTemplate);
}

@Bean
Expand Down Expand Up @@ -265,12 +288,27 @@ public RqueueInternalPubSubChannel rqueueInternalPubSubChannel(
RqueueConfig rqueueConfig,
RqueueBeanProvider rqueueBeanProvider,
@Qualifier("stringRqueueRedisTemplate")
RqueueRedisTemplate<String> stringRqueueRedisTemplate) {
RqueueRedisTemplate<String> stringRqueueRedisTemplate) {
return new RqueueInternalPubSubChannel(
rqueueRedisListenerContainerFactory,
rqueueMessageListenerContainer,
rqueueConfig,
stringRqueueRedisTemplate,
rqueueBeanProvider);
}

@Bean
public RqueueEventBus rqueueEventBus(ApplicationEventPublisher applicationEventPublisher,
RqueueEventBusConfig rqueueEventBusConfig) {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(rqueueEventBusConfig.getCorePoolSize());
threadPoolTaskExecutor.setMaxPoolSize(rqueueEventBusConfig.getMaxPoolSize());
threadPoolTaskExecutor.setKeepAliveSeconds(rqueueEventBusConfig.getKeepAliveTime());
threadPoolTaskExecutor.setQueueCapacity(rqueueEventBusConfig.getQueueCapacity());
threadPoolTaskExecutor.setThreadNamePrefix("RqueueEventBusAsyncExecutor-");
threadPoolTaskExecutor.initialize();
EventBus eventBus = new AsyncEventBus(threadPoolTaskExecutor);
eventBus.register(new EventBusErrorHandler());
return new RqueueEventBus(eventBus, applicationEventPublisher);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 Sonu Kumar
* Copyright (c) 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
Expand All @@ -21,10 +21,13 @@
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
import com.github.sonus21.rqueue.core.RedisScriptFactory.ScriptType;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.ThreadUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.Subscribe;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
Expand All @@ -33,29 +36,23 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultScriptExecutor;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public abstract class MessageScheduler implements DisposableBean,
ApplicationListener<RqueueBootstrapEvent> {
public abstract class MessageScheduler implements DisposableBean {

private final Object monitor = new Object();
@Autowired
protected RqueueSchedulerConfig rqueueSchedulerConfig;
@Autowired
protected RqueueConfig rqueueConfig;
protected final RqueueSchedulerConfig rqueueSchedulerConfig;
protected final RqueueConfig rqueueConfig;
private final RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
private final RedisTemplate<String, Long> redisTemplate;
private RedisScript<Long> redisScript;
private DefaultScriptExecutor<String> defaultScriptExecutor;
private Map<String, Boolean> queueRunningState;
Expand All @@ -66,14 +63,21 @@ public abstract class MessageScheduler implements DisposableBean,
protected RedisScheduleTriggerHandler redisScheduleTriggerHandler;

private ThreadPoolTaskScheduler scheduler;
@Autowired
private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;

@Autowired
@Qualifier("rqueueRedisLongTemplate")
private RedisTemplate<String, Long> redisTemplate;
private Map<String, Integer> errorCount;

protected MessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig,
RqueueConfig rqueueConfig,
RqueueEventBus eventBus,
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
RedisTemplate<String, Long> redisTemplate) {
this.rqueueSchedulerConfig = rqueueSchedulerConfig;
this.rqueueConfig = rqueueConfig;
this.rqueueRedisListenerContainerFactory = rqueueRedisListenerContainerFactory;
this.redisTemplate = redisTemplate;
eventBus.register(this);
}

protected abstract Logger getLogger();

protected abstract long getNextScheduleTime(String queueName, long currentTime, Long value);
Expand Down Expand Up @@ -224,9 +228,9 @@ private void initQueue(String queueName) {
queueRunningState.put(queueName, false);
}

@Override
@Async
@Subscribe
public void onApplicationEvent(RqueueBootstrapEvent event) {
getLogger().info("{} Even received", event);
synchronized (monitor) {
doStop();
if (!rqueueSchedulerConfig.isEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,20 +18,31 @@

import static java.lang.Long.max;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import com.github.sonus21.rqueue.listener.QueueDetail;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.github.sonus21.rqueue.utils.Constants;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.springframework.data.redis.core.RedisTemplate;

@Slf4j
public class ProcessingQueueMessageScheduler extends MessageScheduler {

private Map<String, Long> queueNameToDelay;

public ProcessingQueueMessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig,
RqueueConfig rqueueConfig, RqueueEventBus eventBus,
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
RedisTemplate<String, Long> redisTemplate) {
super(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory,
redisTemplate);
}


@Override
protected void initialize() {
super.initialize();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import com.github.sonus21.rqueue.common.RqueueLockManager;
import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueWebConfig;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import com.github.sonus21.rqueue.core.support.MessageProcessor;
import com.github.sonus21.rqueue.dao.RqueueJobDao;
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
Expand All @@ -28,7 +29,6 @@
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;

@Getter
@Setter
Expand All @@ -38,7 +38,7 @@ public class RqueueBeanProvider {
@Autowired private RqueueSystemConfigDao rqueueSystemConfigDao;
@Autowired private RqueueJobDao rqueueJobDao;
@Autowired private RqueueWebConfig rqueueWebConfig;
@Autowired private ApplicationEventPublisher applicationEventPublisher;
@Autowired private RqueueEventBus rqueueEventBus;
@Autowired private RqueueLockManager rqueueLockManager;

@Autowired(required = false)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 Sonu Kumar
* Copyright 2023 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,12 +16,25 @@

package com.github.sonus21.rqueue.core;

import com.github.sonus21.rqueue.config.RqueueConfig;
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
import com.github.sonus21.rqueue.core.eventbus.RqueueEventBus;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.springframework.data.redis.core.RedisTemplate;

@Slf4j
public class ScheduledQueueMessageScheduler extends MessageScheduler {


public ScheduledQueueMessageScheduler(RqueueSchedulerConfig rqueueSchedulerConfig,
RqueueConfig rqueueConfig, RqueueEventBus eventBus,
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
RedisTemplate<String, Long> redisTemplate) {
super(rqueueSchedulerConfig, rqueueConfig, eventBus, rqueueRedisListenerContainerFactory,
redisTemplate);
}

@Override
protected Logger getLogger() {
return log;
Expand Down
Loading