-
Notifications
You must be signed in to change notification settings - Fork 3
[WIP] scheduler for running operations subsequently #1095
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
f14b23e
d7a2b24
0afbb71
0d7e4e5
2c601d8
486360b
d55a53e
5d0a2b2
4c9049c
9559e47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| package ai.lzy.allocator.task; | ||
|
|
||
| import ai.lzy.allocator.alloc.AllocationContext; | ||
| import ai.lzy.allocator.alloc.MountDynamicDiskAction; | ||
| import ai.lzy.allocator.alloc.dao.DynamicMountDao; | ||
| import ai.lzy.allocator.alloc.dao.VmDao; | ||
| import ai.lzy.allocator.model.DynamicMount; | ||
| import ai.lzy.allocator.model.Vm; | ||
| import ai.lzy.longrunning.task.OperationTask; | ||
| import ai.lzy.longrunning.task.OperationTaskScheduler; | ||
| import ai.lzy.longrunning.task.ResolverUtils; | ||
| import ai.lzy.longrunning.task.TypedOperationTaskResolver; | ||
| import ai.lzy.longrunning.task.dao.OperationTaskDao; | ||
| import ai.lzy.model.db.TransactionHandle; | ||
| import jakarta.annotation.Nullable; | ||
| import jakarta.inject.Singleton; | ||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
|
|
||
| import java.sql.SQLException; | ||
| import java.time.Duration; | ||
|
|
||
| /** | ||
| * Resolves operation tasks of type {@code MOUNT} into {@link MountDynamicDiskAction}s. | ||
| * <p> | ||
| * The task metadata must carry the VM id and the dynamic mount id. An action is produced only while the VM is | ||
| * {@code RUNNING} and the dynamic mount is {@code PENDING}. | ||
| */ | ||
| @Singleton | ||
| public class MountDynamicDiskResolver implements TypedOperationTaskResolver { | ||
| /** Metadata key holding the VM id. */ | ||
| public static final String VM_ID_FIELD = "vm_id"; | ||
| /** Metadata key holding the dynamic mount id. */ | ||
| public static final String DYNAMIC_MOUNT_ID_FIELD = "dynamic_mount_id"; | ||
|
|
||
| private static final String TYPE = "MOUNT"; | ||
| private static final Logger LOG = LogManager.getLogger(MountDynamicDiskResolver.class); | ||
|
|
||
| private final VmDao vmDao; | ||
| private final DynamicMountDao dynamicMountDao; | ||
| private final AllocationContext allocationContext; | ||
| private final OperationTaskDao operationTaskDao; | ||
| private final OperationTaskScheduler taskScheduler; //todo circular dependency | ||
| private final Duration leaseDuration; | ||
|
|
||
| /** | ||
| * Stores the collaborators that are later handed over to every created {@link MountDynamicDiskAction}. | ||
| */ | ||
| public MountDynamicDiskResolver(VmDao vmDao, DynamicMountDao dynamicMountDao, AllocationContext allocationContext, | ||
| OperationTaskDao operationTaskDao, OperationTaskScheduler taskScheduler, | ||
| Duration leaseDuration) | ||
| //todo mark duration with a qualifier | ||
| { | ||
| this.leaseDuration = leaseDuration; | ||
| this.taskScheduler = taskScheduler; | ||
| this.operationTaskDao = operationTaskDao; | ||
| this.allocationContext = allocationContext; | ||
| this.dynamicMountDao = dynamicMountDao; | ||
| this.vmDao = vmDao; | ||
| } | ||
|
|
||
| /** | ||
| * Validates the task metadata and the referenced entities and builds the mount action. | ||
| * | ||
| * @param opTask task to resolve | ||
| * @param tx optional transaction passed through to the DAO lookups | ||
| * @return {@code BAD_STATE} when a required metadata field is missing, {@code STALE} when the VM or the | ||
| * dynamic mount is absent or not in the expected status, otherwise a successful result with the action | ||
| * @throws SQLException declared by the resolver contract for the DAO lookups | ||
| */ | ||
| @Override | ||
| public Result resolve(OperationTask opTask, @Nullable TransactionHandle tx) throws SQLException { | ||
| // Both ids are mandatory; they are checked before any DAO lookup is made. | ||
| final var targetVmId = ResolverUtils.readString(opTask.metadata(), VM_ID_FIELD); | ||
| if (targetVmId == null) { | ||
| LOG.error("{} field is not present in task {} metadata", VM_ID_FIELD, opTask.id()); | ||
| return Result.BAD_STATE; | ||
| } | ||
| final var mountId = ResolverUtils.readString(opTask.metadata(), DYNAMIC_MOUNT_ID_FIELD); | ||
| if (mountId == null) { | ||
| LOG.error("{} field is not present in task {} metadata", DYNAMIC_MOUNT_ID_FIELD, opTask.id()); | ||
| return Result.BAD_STATE; | ||
| } | ||
|
|
||
| final var targetVm = vmDao.get(targetVmId, tx); | ||
| if (targetVm == null) { | ||
| LOG.error("VM {} is not present for task", targetVmId); | ||
| return Result.STALE; | ||
| } | ||
| if (targetVm.status() != Vm.Status.RUNNING) { | ||
| LOG.error("VM {} is in wrong status: {}", targetVmId, targetVm.status()); | ||
| return Result.STALE; | ||
| } | ||
|
|
||
| final var mount = dynamicMountDao.get(mountId, false, tx); | ||
| if (mount == null) { | ||
| LOG.error("Dynamic mount {} is not present for task", mountId); | ||
| return Result.STALE; | ||
| } | ||
| if (mount.state() != DynamicMount.State.PENDING) { | ||
| LOG.error("Dynamic mount {} is in wrong status: {}", mount.id(), mount.state()); | ||
| return Result.STALE; | ||
| } | ||
|
|
||
| final var action = new MountDynamicDiskAction(targetVm, mount, allocationContext, opTask, operationTaskDao, | ||
| leaseDuration, taskScheduler); | ||
| return Result.success(action); | ||
| } | ||
|
|
||
| /** Returns the task type handled by this resolver. */ | ||
| @Override | ||
| public String type() { | ||
| return TYPE; | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| -- Set of values allowed in operation_task.status. | ||
| CREATE TYPE task_status AS ENUM ('PENDING', 'RUNNING', 'FAILED', 'FINISHED', 'STALE'); | ||
|
|
||
| -- Set of values allowed in operation_task.type. | ||
| CREATE TYPE task_type AS ENUM ('UNMOUNT', 'MOUNT'); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There should be one type per action, so this enum can be extended in future migrations to support new types of actions. |
||
|
|
||
| -- One row per scheduled operation task. | ||
| CREATE TABLE IF NOT EXISTS operation_task( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The main idea is to provide DB as a single source of truth about order of task execution.
|
||
| -- Auto-generated primary key. | ||
| id BIGSERIAL NOT NULL, | ||
| name TEXT NOT NULL, | ||
| -- NOTE(review): presumably the id of the entity the task operates on; confirm against the scheduler. | ||
| entity_id TEXT NOT NULL, | ||
| type task_type NOT NULL, | ||
| status task_status NOT NULL, | ||
| created_at TIMESTAMP NOT NULL, | ||
| updated_at TIMESTAMP NOT NULL, | ||
| -- Free-form task parameters as JSON. | ||
| metadata JSONB NOT NULL, | ||
| -- Optional link to a row of the operation table (see the foreign key below). | ||
| operation_id TEXT, | ||
| -- NOTE(review): worker_id and lease_till are nullable; presumably filled once a worker takes the task - confirm. | ||
| worker_id TEXT, | ||
| lease_till TIMESTAMP, | ||
| PRIMARY KEY (id), | ||
| FOREIGN KEY (operation_id) REFERENCES operation(id) | ||
| ); | ||
|
|
||
| -- Composite index over (status, entity_id, id). | ||
| -- NOTE(review): presumably serves lookups of tasks by status and entity in id order; confirm against the DAO queries. | ||
| CREATE INDEX IF NOT EXISTS task_status_entity_id_idx ON operation_task(status, entity_id, id); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| package ai.lzy.longrunning.task; | ||
|
|
||
| import ai.lzy.model.db.TransactionHandle; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import jakarta.annotation.Nullable; | ||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
|
|
||
| import java.sql.SQLException; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| public class DispatchingOperationTaskResolver implements OperationTaskResolver { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A task resolver that accepts a list of typed resolvers and picks the resolver to use by task type. |
||
|
|
||
| private static final Logger LOG = LogManager.getLogger(DispatchingOperationTaskResolver.class); | ||
| private final Map<String, TypedOperationTaskResolver> resolvers; | ||
|
|
||
| /** | ||
| * Creates a dispatcher over the given typed resolvers, keyed by {@link TypedOperationTaskResolver#type()}. | ||
| * | ||
| * @param resolvers resolvers to dispatch to | ||
| * @throws IllegalStateException if two resolvers report the same type | ||
| */ | ||
| public DispatchingOperationTaskResolver(List<TypedOperationTaskResolver> resolvers) { | ||
| this.resolvers = generateResolversMap(resolvers); | ||
| } | ||
|
|
||
| /** | ||
| * Indexes the resolvers by their task type. | ||
| * <p> | ||
| * The result is an explicitly created {@link HashMap}, because {@link #addResolver} later mutates it and | ||
| * {@code Collectors.toMap} gives no guarantee that the map it returns is mutable. | ||
| * | ||
| * @param resolvers resolvers to index | ||
| * @return mutable map from task type to resolver | ||
| * @throws IllegalStateException if two resolvers report the same type | ||
| */ | ||
| private static Map<String, TypedOperationTaskResolver> generateResolversMap( | ||
| List<TypedOperationTaskResolver> resolvers) | ||
| { | ||
| var byType = new HashMap<String, TypedOperationTaskResolver>(); | ||
| for (var resolver : resolvers) { | ||
| // putIfAbsent returns the previous mapping, so a non-null result means the type is already taken. | ||
| if (byType.putIfAbsent(resolver.type(), resolver) != null) { | ||
| throw new IllegalStateException("Duplicate resolver for type " + resolver.type()); | ||
| } | ||
| } | ||
| return byType; | ||
| } | ||
|
|
||
| /** | ||
| * Registers a resolver under its type. Unlike the constructor, this does not reject duplicates: an existing | ||
| * resolver for the same type is replaced. | ||
| * | ||
| * @param resolver resolver to register | ||
| */ | ||
| @VisibleForTesting | ||
| void addResolver(TypedOperationTaskResolver resolver) { | ||
| resolvers.put(resolver.type(), resolver); | ||
| } | ||
|
|
||
| /** | ||
| * Delegates resolution to the resolver registered for the task's type. | ||
| * | ||
| * @param operationTask task to resolve | ||
| * @param tx optional transaction handed to the delegate | ||
| * @return the delegate's result, {@code UNKNOWN_TASK} when no resolver is registered for the type, or a | ||
| * resolve-error result wrapping any exception thrown by the delegate | ||
| */ | ||
| @Override | ||
| public Result resolve(OperationTask operationTask, @Nullable TransactionHandle tx) throws SQLException { | ||
| final var delegate = resolvers.get(operationTask.type()); | ||
| if (delegate != null) { | ||
| try { | ||
| return delegate.resolve(operationTask, tx); | ||
| } catch (Exception e) { | ||
| // Failures of the delegate are reported through the result instead of being propagated. | ||
| LOG.error("Error while resolving task {}", operationTask.id(), e); | ||
| return Result.resolveError(e); | ||
| } | ||
| } | ||
| LOG.error("No resolver for task type {}. Task: {}", operationTask.type(), operationTask); | ||
| return Result.UNKNOWN_TASK; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,77 @@ | ||
| package ai.lzy.longrunning.task; | ||
|
|
||
| import ai.lzy.longrunning.OperationRunnerBase; | ||
| import ai.lzy.longrunning.OperationsExecutor; | ||
| import ai.lzy.longrunning.dao.OperationDao; | ||
| import ai.lzy.longrunning.task.dao.OperationTaskDao; | ||
| import ai.lzy.model.db.Storage; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.Map; | ||
|
|
||
| import static ai.lzy.model.db.DbHelper.withRetries; | ||
|
|
||
| public abstract class OpTaskAwareAction extends OperationRunnerBase { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A type of action that is tied to a task. All subclasses of this class will be executed by the task scheduler. |
||
| private final OperationTaskScheduler operationTaskScheduler; | ||
| private final OperationTaskDao operationTaskDao; | ||
| private final Duration leaseDuration; | ||
| private OperationTask operationTask; | ||
|
|
||
| /** | ||
| * Creates an action bound to the given operation task. | ||
| * | ||
| * @param operationTask task this action belongs to | ||
| * @param operationTaskDao DAO used to update the task's lease and status | ||
| * @param leaseDuration duration passed to every lease update | ||
| * @param opId passed to {@link OperationRunnerBase} | ||
| * @param desc passed to {@link OperationRunnerBase} | ||
| * @param storage passed to {@link OperationRunnerBase} | ||
| * @param operationsDao passed to {@link OperationRunnerBase} | ||
| * @param executor passed to {@link OperationRunnerBase} | ||
| * @param operationTaskScheduler scheduler the task is released to when the action finishes | ||
| */ | ||
| public OpTaskAwareAction(OperationTask operationTask, OperationTaskDao operationTaskDao, Duration leaseDuration, | ||
| String opId, String desc, Storage storage, OperationDao operationsDao, | ||
| OperationsExecutor executor, OperationTaskScheduler operationTaskScheduler) | ||
| { | ||
| super(opId, desc, storage, operationsDao, executor); | ||
| this.operationTask = operationTask; | ||
| this.operationTaskDao = operationTaskDao; | ||
| this.leaseDuration = leaseDuration; | ||
| this.operationTaskScheduler = operationTaskScheduler; | ||
| } | ||
|
|
||
| @Override | ||
| protected Map<String, String> prepareLogContext() { | ||
| var ctx = super.prepareLogContext(); | ||
| ctx.put("task_id", String.valueOf(operationTask.id())); | ||
| ctx.put("task_type", operationTask.type()); | ||
| ctx.put("task_name", operationTask.name()); | ||
| ctx.put("task_entity_id", operationTask.entityId()); | ||
| return ctx; | ||
| } | ||
|
|
||
| /** Returns the task currently associated with this action. */ | ||
| protected OperationTask task() { | ||
| return operationTask; | ||
| } | ||
|
|
||
| /** Replaces the task associated with this action. */ | ||
| public void setTask(OperationTask operationTask) { | ||
| this.operationTask = operationTask; | ||
| } | ||
|
|
||
| @Override | ||
| protected void beforeStep() { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New step in operation execution to update task lease deadline |
||
| super.beforeStep(); | ||
| try { | ||
| operationTask = withRetries(log(), () -> operationTaskDao.updateLease(operationTask.id(), leaseDuration, | ||
| null)); | ||
| } catch (Exception e) { | ||
| log().error("{} Couldn't update lease on task {}", logPrefix(), task().id()); | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected void notifyFinished() { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Task should be moved to a final status on operation finish |
||
| var builder = OperationTask.Update.builder(); | ||
| if (isFailed()) { | ||
| builder.status(OperationTask.Status.FAILED); | ||
| } else { | ||
| builder.status(OperationTask.Status.FINISHED); | ||
| } | ||
| try { | ||
| operationTask = withRetries(log(), () -> operationTaskDao.update(operationTask.id(), builder.build(), | ||
| null)); | ||
| } catch (Exception e) { | ||
| log().error("{} Couldn't finish operation task {}", logPrefix(), task().id()); | ||
| } | ||
| operationTaskScheduler.releaseTask(task()); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
leaseDuration is not a bean
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, but it's just an example. It still requires fixes for circular dependencies and this configuration