Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@
<parent>
<groupId>today.bonfire.oss</groupId>
<artifactId>bonfire-oss-parent</artifactId>
<version>1.1.8</version>
<version>1.1.11</version>
</parent>

<groupId>today.bonfire.oss</groupId>
<artifactId>bth4j</artifactId>
<version>2.3.0</version>
<version>2.4.3-ALPHA</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<module.name>bonfire.oss.bth4j</module.name>
<jedis.version>5.2.1</jedis.version>
<jedis.version>5.2.2</jedis.version>
<cron-utils.version>9.2.1</cron-utils.version>
</properties>

Expand Down Expand Up @@ -63,6 +63,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>


Expand All @@ -82,7 +83,7 @@
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.53</version>
<version>2.0.56</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
143 changes: 76 additions & 67 deletions src/main/java/today/bonfire/oss/bth4j/service/TaskOps.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ Task getTaskFromQueue(String queueName) {
return new Task(t, eventParser);
}

public void addTaskToQueue(Task task, Object data) {
addTaskToQueue(task, data, false);
}

/**
*
*/
Expand All @@ -86,65 +82,10 @@ void addTaskToQueue(Task task, Object data, boolean isDataRaw) {
log.info("Added {} to queue: {}", task, queueName);
}

public String addRecurringTask(Task task) {
jedis.hset(keys.RECURRING_TASK_SET, task.taskString(), Instant.now().getEpochSecond() + "");
log.info("Added recurring task: {}", task);
return task.taskString();
}

public void deleteRecurringTask(Task task) {
jedis.hdel(keys.RECURRING_TASK_SET, task.taskString());
log.info("Deleted recurring task: {}", task);
}

/**
* @param uniqueId unique identifier for the task
* @return data associated with the given task id.
* Will return null if task id is blank or no data is associated with the given task id.
* This will not do any conversion of the data to the desired type. just the pure string value.
* Use {@link #getDataForTask(String, Class)} for type conversion
*/
public String getDataForTask(String uniqueId) {
if (StringUtils.isBlank(uniqueId)) return null;
return jedis.get(keys.DATA + uniqueId);
}

/**
* Gets the data associated with the given task id.
* This will try to convert the data to the given type.
* If the task id is blank or no data is associated with the given task id or
* if the type conversion fails, the method will return throw an exception.
*
* @param uniqueId unique identifier for the task
* @param clazz the desired type of the data
* @return data associated with the given task id, converted to the given type.
* May return null.
*/
public <T> T getDataForTask(String uniqueId, Class<T> clazz) {
if (StringUtils.isBlank(uniqueId) || clazz == null) {
return null;
}
try {
var r = jedis.get(keys.DATA + uniqueId);
return mapper.fromJson(r, clazz);
} catch (Exception e) {
log.error("Error getting data for task: {}", uniqueId, e);
throw new TaskDataException("Error getting data for task: " + uniqueId);
}
}

public <T> T getDataForTask(Task task, Class<T> clazz) {
return getDataForTask(task.uniqueId(), clazz);
}

int incrementRetryCount(String uniqueId) {
return (int) jedis.hincrBy(keys.TASK_RETRY_COUNT, uniqueId, 1);
}

public void updateExecutionTimeForRecurringTasks(String key, Instant instant) {
jedis.hset(keys.RECURRING_TASK_SET, key, String.valueOf(instant.getEpochSecond()));
}

String getLock(String lockName) {
return jedis.get(lockName);
}
Expand All @@ -163,10 +104,6 @@ void releaseLock(String lockDelayedTasks) {
jedis.del(lockDelayedTasks);
}

public long queueSize(String queueName) {
return jedis.llen(queueName);
}

void deleteTaskFromInProgressQueue(Task task) {
try (var pipeline = jedis.pipelined()) {
pipeline.del(keys.DATA + task.uniqueId());
Expand Down Expand Up @@ -208,10 +145,6 @@ void deleteFromRotationListAndAddToQueue(List<String> items) {
}
}

public long getNumberOfTaskInProgress() {
return jedis.zcard(keys.IN_PROGRESS_TASKS);
}

ScanResult<Tuple> scanSortedSet(String key, String cursor) {
return jedis.zscan(key, cursor);
}
Expand Down Expand Up @@ -243,6 +176,7 @@ void deleteFromInProgressQueueAndAddToQueue(
transaction.expire(keys.DATA + t.uniqueId(), THC.Time.T_30_DAYS);
transaction.hdel(keys.TASK_RETRY_COUNT, t.uniqueId());
});
transaction.exec();
}
}

Expand Down Expand Up @@ -277,4 +211,79 @@ void removeTasksFromSortedSetBasedOnTime(String key, Instant time) {
});
} while (true);
}

public void addTaskToQueue(Task task, Object data) {
addTaskToQueue(task, data, false);
}

public String addRecurringTask(Task task) {
jedis.hset(keys.RECURRING_TASK_SET, task.taskString(), Instant.now().getEpochSecond() + "");
log.info("Added recurring task: {}", task);
return task.taskString();
}

public void deleteRecurringTask(Task task) {
jedis.hdel(keys.RECURRING_TASK_SET, task.taskString());
log.info("Deleted recurring task: {}", task);
}

public void deleteAllRecurringTasks() {
log.info("{} recurring tasks found", jedis.hlen(keys.RECURRING_TASK_SET));
jedis.del(keys.RECURRING_TASK_SET);
log.info("Deleted all recurring tasks");
}

/**
* @param uniqueId unique identifier for the task
*
* @return data associated with the given task id.
* Will return null if task id is blank or no data is associated with the given task id.
* This will not do any conversion of the data to the desired type. just the pure string value.
* Use {@link #getDataForTask(String, Class)} for type conversion
*/
public String getDataForTask(String uniqueId) {
if (StringUtils.isBlank(uniqueId)) return null;
return jedis.get(keys.DATA + uniqueId);
}

/**
* Gets the data associated with the given task id.
* This will try to convert the data to the given type.
* If the task id is blank or no data is associated with the given task id or
* if the type conversion fails, the method will return throw an exception.
*
* @param uniqueId unique identifier for the task
* @param clazz the desired type of the data
*
* @return data associated with the given task id, converted to the given type.
* May return null.
*/
public <T> T getDataForTask(String uniqueId, Class<T> clazz) {
if (StringUtils.isBlank(uniqueId) || clazz == null) {
return null;
}
try {
var r = jedis.get(keys.DATA + uniqueId);
return mapper.fromJson(r, clazz);
} catch (Exception e) {
log.error("Error getting data for task: {}", uniqueId, e);
throw new TaskDataException("Error getting data for task: " + uniqueId);
}
}

public <T> T getDataForTask(Task task, Class<T> clazz) {
return getDataForTask(task.uniqueId(), clazz);
}

public void updateExecutionTimeForRecurringTasks(String key, Instant instant) {
jedis.hset(keys.RECURRING_TASK_SET, key, String.valueOf(instant.getEpochSecond()));
}

public long queueSize(String queueName) {
return jedis.llen(queueName);
}

public long getNumberOfTaskInProgress() {
return jedis.zcard(keys.IN_PROGRESS_TASKS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,29 @@ public class TaskRunnerWrapper implements Runnable {
this.taskOps = taskOps;
}

private void rescheduleTask(Task task, long delay) {
var newTask = Task.Builder.newTask()
.event(task.event())
.accountId(task.accountId())
.queueName(task.queueName())
.executeAfter(delay)
.build();
taskOps.addTaskToQueue(newTask, taskOps.getDataForTask(task.uniqueId()), true);
log.info("Rescheduling old task {} to run after {} seconds, new task id {}",
task.uniqueId(), delay, newTask.uniqueId());
deleteTask(task);
}

private void deleteTask(Task task) {
// delete the task and data if present
taskOps.deleteTaskFromInProgressQueue(task);
}

private void moveToDeadQueue(Task task) {
// move to dead list if task failed because of BGTaskUnrecoverableException
taskOps.moveToDeadQueue(task);
}

/**
* if task fails it may be retired automatically by maintenance service
* unless the exception is UnrecoverableException in which case the task is marked
Expand All @@ -41,44 +64,21 @@ public void run() {
callbacks.onSuccess().accept(task);
} catch (Exception e) {
callbacks.onError().accept(task, e);
log.error("Task {} failed", task.taskString(), e);
if (e instanceof TaskUnrecoverableException || e instanceof TaskDataException) {
log.error("Task {} failed", task.taskString(), e);
// task will not be retried
moveToDeadQueue(task);
} else if (e instanceof TaskErrorException) {
// task can be retried
log.info("Task will be retried");
log.warn("Task {} failed", task.taskString(), e);
} else if (e instanceof TaskRescheduleException ex) {
rescheduleTask(task, ex.delay());
} else {
// unknown exception task can be retried
log.info("Unhandled exception. Task will be retried");
// unknown exception task may be retried
log.warn("Unhandled exception. Task will be retried", e);
}
} finally {
callbacks.afterTask().accept(task);
}
}

private void rescheduleTask(Task task, long delay) {
var newTask = Task.Builder.newTask()
.event(task.event())
.accountId(task.accountId())
.queueName(task.queueName())
.executeAfter(delay)
.build();
taskOps.addTaskToQueue(newTask, taskOps.getDataForTask(task.uniqueId()), true);
log.info("Rescheduling old task {} to run after {} seconds, new task id {}",
task.uniqueId(), delay, newTask.uniqueId());
deleteTask(task);
}

private void deleteTask(Task task) {
// delete the task and data if present
taskOps.deleteTaskFromInProgressQueue(task);
}

private void moveToDeadQueue(Task task) {
// move to dead list if task failed because of BGTaskUnrecoverableException
taskOps.moveToDeadQueue(task);
}
}