Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,6 @@ protected boolean isInjectedError(Error e) {
return e instanceof InjectedFailures.TerminateException;
}

@Override
protected void notifyExpired() {
}

@Override
protected void notifyFinished() {
}

@Override
protected void onExpired(TransactionHandle tx) {
throw new RuntimeException("Unexpected, sessionId: '%s', op: '%s'".formatted(sessionId, id()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import ai.lzy.allocator.volume.VolumeManager;
import ai.lzy.logs.LogContextKey;
import ai.lzy.longrunning.Operation;
import ai.lzy.longrunning.OperationRunnerBase;
import ai.lzy.longrunning.task.OpTaskAwareAction;
import ai.lzy.longrunning.task.OperationTask;
import ai.lzy.longrunning.task.OperationTaskScheduler;
import ai.lzy.longrunning.task.dao.OperationTaskDao;
import ai.lzy.model.db.TransactionHandle;
import ai.lzy.v1.VmAllocatorApi;
import com.google.protobuf.Any;
Expand All @@ -17,6 +20,7 @@
import jakarta.annotation.Nullable;

import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -25,7 +29,7 @@

import static ai.lzy.model.db.DbHelper.withRetries;

public final class MountDynamicDiskAction extends OperationRunnerBase {
public final class MountDynamicDiskAction extends OpTaskAwareAction {
private final AllocationContext allocationContext;
private final VolumeManager volumeManager;
private final MountHolderManager mountHolderManager;
Expand All @@ -42,9 +46,13 @@ public final class MountDynamicDiskAction extends OperationRunnerBase {
private Long nextId;
private boolean mountPodsDeleted;

public MountDynamicDiskAction(Vm vm, DynamicMount dynamicMount, AllocationContext allocationContext) {
super(dynamicMount.mountOperationId(), String.format("Mount %s to VM %s", dynamicMount.mountName(), vm.vmId()),
allocationContext.storage(), allocationContext.operationsDao(), allocationContext.executor());
public MountDynamicDiskAction(Vm vm, DynamicMount dynamicMount, AllocationContext allocationContext,
OperationTask operationTask, OperationTaskDao operationTaskDao,
Duration leaseDuration, OperationTaskScheduler operationTaskScheduler)
{
super(operationTask, operationTaskDao, leaseDuration, dynamicMount.mountOperationId(),
String.format("Mount %s to VM %s", dynamicMount.mountName(), vm.vmId()), allocationContext.storage(),
allocationContext.operationsDao(), allocationContext.executor(), operationTaskScheduler);
this.dynamicMount = dynamicMount;
this.vm = vm;
this.allocationContext = allocationContext;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package ai.lzy.allocator.task;

import ai.lzy.allocator.alloc.AllocationContext;
import ai.lzy.allocator.alloc.MountDynamicDiskAction;
import ai.lzy.allocator.alloc.dao.DynamicMountDao;
import ai.lzy.allocator.alloc.dao.VmDao;
import ai.lzy.allocator.model.DynamicMount;
import ai.lzy.allocator.model.Vm;
import ai.lzy.longrunning.task.OperationTask;
import ai.lzy.longrunning.task.OperationTaskScheduler;
import ai.lzy.longrunning.task.ResolverUtils;
import ai.lzy.longrunning.task.TypedOperationTaskResolver;
import ai.lzy.longrunning.task.dao.OperationTaskDao;
import ai.lzy.model.db.TransactionHandle;
import jakarta.annotation.Nullable;
import jakarta.inject.Singleton;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.sql.SQLException;
import java.time.Duration;

@Singleton
public class MountDynamicDiskResolver implements TypedOperationTaskResolver {
private static final Logger LOG = LogManager.getLogger(MountDynamicDiskResolver.class);

private static final String TYPE = "MOUNT";
public static final String VM_ID_FIELD = "vm_id";
public static final String DYNAMIC_MOUNT_ID_FIELD = "dynamic_mount_id";

private final VmDao vmDao;
private final DynamicMountDao dynamicMountDao;
private final AllocationContext allocationContext;
private final OperationTaskDao operationTaskDao;
private final OperationTaskScheduler taskScheduler; //todo circular dependency
private final Duration leaseDuration;

public MountDynamicDiskResolver(VmDao vmDao, DynamicMountDao dynamicMountDao, AllocationContext allocationContext,
OperationTaskDao operationTaskDao, OperationTaskScheduler taskScheduler,
Duration leaseDuration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leaseDuration is not a bean

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, but it's just an example. It still requires fixes for circular dependencies and this configuration

//todo mark duration with a qualifier
{
this.vmDao = vmDao;
this.dynamicMountDao = dynamicMountDao;
this.allocationContext = allocationContext;
this.operationTaskDao = operationTaskDao;
this.taskScheduler = taskScheduler;
this.leaseDuration = leaseDuration;
}

@Override
public Result resolve(OperationTask opTask, @Nullable TransactionHandle tx) throws SQLException {
var vmId = ResolverUtils.readString(opTask.metadata(), VM_ID_FIELD);
if (vmId == null) {
LOG.error("{} field is not present in task {} metadata", VM_ID_FIELD, opTask.id());
return Result.BAD_STATE;
}
var dynamicMountId = ResolverUtils.readString(opTask.metadata(), DYNAMIC_MOUNT_ID_FIELD);
if (dynamicMountId == null) {
LOG.error("{} field is not present in task {} metadata", DYNAMIC_MOUNT_ID_FIELD, opTask.id());
return Result.BAD_STATE;
}
var vm = vmDao.get(vmId, tx);
if (vm == null) {
LOG.error("VM {} is not present for task", vmId);
return Result.STALE;
} else if (vm.status() != Vm.Status.RUNNING) {
LOG.error("VM {} is in wrong status: {}", vmId, vm.status());
return Result.STALE;
}
var dynamicMount = dynamicMountDao.get(dynamicMountId, false, tx);
if (dynamicMount == null) {
LOG.error("Dynamic mount {} is not present for task", dynamicMountId);
return Result.STALE;
} else if (dynamicMount.state() != DynamicMount.State.PENDING) {
LOG.error("Dynamic mount {} is in wrong status: {}", dynamicMount.id(), dynamicMount.state());
return Result.STALE;
}
return Result.success(new MountDynamicDiskAction(vm, dynamicMount, allocationContext, opTask, operationTaskDao,
leaseDuration, taskScheduler));
}

@Override
public String type() {
return TYPE;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
CREATE TYPE task_status AS ENUM ('PENDING', 'RUNNING', 'FAILED', 'FINISHED', 'STALE');

CREATE TYPE task_type AS ENUM ('UNMOUNT', 'MOUNT');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's should be one type per one action. So this enum could be extended in future migrations to support new types of actions.


CREATE TABLE IF NOT EXISTS operation_task(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main idea is to provide DB as a single source of truth about order of task execution.
Here's short explaination of operation_task fields:

  • id - is bigserial and thus generated on insert of task. This is the main way to present order among certain tasks (see entity_id).
  • name - for debug and readability purposes
  • entity_id - this is the way to group tasks by some user-generated text id. Tasks with same entity_id are executed subsequently according the id field (in ascending order). Thus, task with smaller id will be executed first. Tasks with different entity_id can be executed in parallel.
  • type - is necessary to match code representation of a task
  • status - status of a task.
  • created_at, updated_at - self-explainatory, for debug purposes
  • metadata - JSON to keep task arguments and other useful information about the task. The content of this field is defined by user and parsed mainly depending by the type.
  • operation_id - an operation that is linked to a task. Contains all details about execution. There should be (0-1) <-> 1 relation between a task and an operation.
  • worker_id - name of the instance that captured a task. This is needed to ensure that a task is executed just once.
  • lease_till - deadline for scheduler instance to execute this task. Scheduler instance should update lease_till field. In case of instance death or any other reason that make instance impossible to finish a task, another scheduler instance can "capture" the task with expired lease_till deadline and replace worker_id field.

id BIGSERIAL NOT NULL,
name TEXT NOT NULL,
entity_id TEXT NOT NULL,
type task_type NOT NULL,
status task_status NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
metadata JSONB NOT NULL,
operation_id TEXT,
worker_id TEXT,
lease_till TIMESTAMP,
PRIMARY KEY (id),
FOREIGN KEY (operation_id) REFERENCES operation(id)
);

CREATE INDEX IF NOT EXISTS task_status_entity_id_idx ON operation_task(status, entity_id, id);
10 changes: 10 additions & 0 deletions lzy/long-running/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.zonky.test</groupId>
<artifactId>embedded-postgres</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public abstract class OperationRunnerBase extends ContextAwareTask {
private final OperationDao operationsDao;
private final OperationsExecutor executor;
private Operation op;
private volatile boolean failed = false;

protected OperationRunnerBase(String id, String descr, Storage storage, OperationDao operationsDao,
OperationsExecutor executor)
Expand All @@ -55,6 +56,7 @@ protected final void execute() {
}

for (var step : steps()) {
beforeStep();
final var stepResult = step.get();
switch (stepResult.code()) {
case ALREADY_DONE -> { }
Expand Down Expand Up @@ -83,6 +85,7 @@ protected final void execute() {
}
}
} catch (Throwable e) {
setFailed();
notifyFinished();
if (e instanceof Error err && isInjectedError(err)) {
log.error("{} Terminated by InjectedFailure exception: {}", logPrefix, e.getMessage());
Expand All @@ -98,6 +101,18 @@ protected final void execute() {
}
}

protected void setFailed() {
failed = true;
}

protected boolean isFailed() {
return failed;
}

protected void beforeStep() {

}

protected Map<String, String> prepareLogContext() {
var ctx = super.prepareLogContext();
ctx.put(LogContextKey.OPERATION_ID, id);
Expand Down Expand Up @@ -276,6 +291,7 @@ protected void notifyFinished() {
}

protected final void failOperation(Status status, @Nullable TransactionHandle tx) throws SQLException {
setFailed();
operationsDao.fail(id, toProto(status), tx);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package ai.lzy.longrunning.task;

import ai.lzy.model.db.TransactionHandle;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nullable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DispatchingOperationTaskResolver implements OperationTaskResolver {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task resolver that can accept a list of different typed resolver to choose resolver by a task type.


private static final Logger LOG = LogManager.getLogger(DispatchingOperationTaskResolver.class);
private final Map<String, TypedOperationTaskResolver> resolvers;

public DispatchingOperationTaskResolver(List<TypedOperationTaskResolver> resolvers) {
this.resolvers = generateResolversMap(resolvers);
}

private static Map<String, TypedOperationTaskResolver> generateResolversMap(
List<TypedOperationTaskResolver> resolvers)
{
var types = new HashSet<String>();
resolvers.forEach(r -> {
if (!types.add(r.type())) {
throw new IllegalStateException("Duplicate resolver for type " + r.type());
}
});
return resolvers.stream()
.collect(Collectors.toMap(TypedOperationTaskResolver::type, r -> r));
}

@VisibleForTesting
void addResolver(TypedOperationTaskResolver resolver) {
resolvers.put(resolver.type(), resolver);
}

@Override
public Result resolve(OperationTask operationTask, @Nullable TransactionHandle tx) throws SQLException {
var resolver = resolvers.get(operationTask.type());
if (resolver == null) {
LOG.error("No resolver for task type {}. Task: {}", operationTask.type(), operationTask);
return Result.UNKNOWN_TASK;
}
try {
return resolver.resolve(operationTask, tx);
} catch (Exception e) {
LOG.error("Error while resolving task {}", operationTask.id(), e);
return Result.resolveError(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package ai.lzy.longrunning.task;

import ai.lzy.longrunning.OperationRunnerBase;
import ai.lzy.longrunning.OperationsExecutor;
import ai.lzy.longrunning.dao.OperationDao;
import ai.lzy.longrunning.task.dao.OperationTaskDao;
import ai.lzy.model.db.Storage;

import java.time.Duration;
import java.util.Map;

import static ai.lzy.model.db.DbHelper.withRetries;

public abstract class OpTaskAwareAction extends OperationRunnerBase {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type of action that is connected to a task. All inheritants of this class will be executed by task scheduler.

private final OperationTaskScheduler operationTaskScheduler;
private final OperationTaskDao operationTaskDao;
private final Duration leaseDuration;
private OperationTask operationTask;

public OpTaskAwareAction(OperationTask operationTask, OperationTaskDao operationTaskDao, Duration leaseDuration,
String opId, String desc, Storage storage, OperationDao operationsDao,
OperationsExecutor executor, OperationTaskScheduler operationTaskScheduler)
{
super(opId, desc, storage, operationsDao, executor);
this.operationTask = operationTask;
this.operationTaskDao = operationTaskDao;
this.leaseDuration = leaseDuration;
this.operationTaskScheduler = operationTaskScheduler;
}

@Override
protected Map<String, String> prepareLogContext() {
var ctx = super.prepareLogContext();
ctx.put("task_id", String.valueOf(operationTask.id()));
ctx.put("task_type", operationTask.type());
ctx.put("task_name", operationTask.name());
ctx.put("task_entity_id", operationTask.entityId());
return ctx;
}

protected OperationTask task() {
return operationTask;
}

public void setTask(OperationTask operationTask) {
this.operationTask = operationTask;
}

@Override
protected void beforeStep() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New step in operation execution to update task lease deadline

super.beforeStep();
try {
operationTask = withRetries(log(), () -> operationTaskDao.updateLease(operationTask.id(), leaseDuration,
null));
} catch (Exception e) {
log().error("{} Couldn't update lease on task {}", logPrefix(), task().id());
throw new RuntimeException(e);
}
}

@Override
protected void notifyFinished() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task should be moved to a final status on operation finish

var builder = OperationTask.Update.builder();
if (isFailed()) {
builder.status(OperationTask.Status.FAILED);
} else {
builder.status(OperationTask.Status.FINISHED);
}
try {
operationTask = withRetries(log(), () -> operationTaskDao.update(operationTask.id(), builder.build(),
null));
} catch (Exception e) {
log().error("{} Couldn't finish operation task {}", logPrefix(), task().id());
}
operationTaskScheduler.releaseTask(task());
}
}
Loading