Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/remove memory runner #3948

Merged
merged 11 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.runner.memory.MemoryRunner;
import io.kestra.core.runners.StandAloneRunner;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
Expand Down Expand Up @@ -71,7 +71,7 @@ private static Path generateTempDir() {
public Integer call() throws Exception {
super.call();

MemoryRunner runner = applicationContext.getBean(MemoryRunner.class);
StandAloneRunner runner = applicationContext.getBean(StandAloneRunner.class);
LocalFlowRepositoryLoader repositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
FlowRepositoryInterface flowRepository = applicationContext.getBean(FlowRepositoryInterface.class);
FlowInputOutput flowInputOutput = applicationContext.getBean(FlowInputOutput.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Integer call() throws Exception {
if (flowPath != null) {
try {
LocalFlowRepositoryLoader localFlowRepositoryLoader = applicationContext.getBean(LocalFlowRepositoryLoader.class);
localFlowRepositoryLoader.load(this.flowPath, true);
localFlowRepositoryLoader.load(this.flowPath);
} catch (IOException e) {
throw new CommandLine.ParameterException(this.spec.commandLine(), "Invalid flow path", e);
}
Expand Down
50 changes: 0 additions & 50 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,56 +123,6 @@ kestra:
delay: 1s
maxDelay: ""

jdbc:
tables:
queues:
table: "queues"
flows:
table: "flows"
cls: io.kestra.core.models.flows.Flow
executions:
table: "executions"
cls: io.kestra.core.models.executions.Execution
templates:
table: "templates"
cls: io.kestra.core.models.templates.Template
triggers:
table: "triggers"
cls: io.kestra.core.models.triggers.Trigger
logs:
table: "logs"
cls: io.kestra.core.models.executions.LogEntry
metrics:
table: "metrics"
cls: io.kestra.core.models.executions.MetricEntry
multipleconditions:
table: "multipleconditions"
cls: io.kestra.core.models.triggers.multipleflows.MultipleConditionWindow
subflow_executions:
table: "subflow_executions"
cls: io.kestra.core.runners.SubflowExecution
executorstate:
table: "executorstate"
cls: io.kestra.core.runners.ExecutorState
executordelayed:
table: "executordelayed"
cls: io.kestra.core.runners.ExecutionDelay
settings:
table: "settings"
cls: io.kestra.core.models.Setting
flowtopologies:
table: "flow_topologies"
cls: io.kestra.core.models.topologies.FlowTopology
serviceinstance:
cls: io.kestra.core.server.ServiceInstance
table: "service_instance"
workerjobrunning:
cls: io.kestra.core.runners.WorkerJobRunning
table: "worker_job_running"
executionqueued:
table: "execution_queued"
cls: io.kestra.core.runners.ExecutionQueued

queues:
min-poll-interval: 25ms
max-poll-interval: 1000ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringStartsWith.startsWith;

class FlowNamespaceCommandTest {
@Test
Expand All @@ -23,7 +22,7 @@ void runWithNoParam() {
Integer call = PicocliRunner.call(FlowNamespaceCommand.class, ctx, args);

assertThat(call, is(0));
assertThat(out.toString(), startsWith("Usage: kestra flow namespace"));
assertThat(out.toString(), containsString("Usage: kestra flow namespace"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import java.io.PrintStream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringStartsWith.startsWith;

class NamespaceCommandTest {
@Test
Expand All @@ -22,7 +22,7 @@ void runWithNoParam() {
Integer call = PicocliRunner.call(NamespaceCommand.class, ctx, args);

assertThat(call, is(0));
assertThat(out.toString(), startsWith("Usage: kestra namespace"));
assertThat(out.toString(), containsString("Usage: kestra namespace"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.env.Environment;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringStartsWith.startsWith;

class NamespaceFilesCommandTest {
@Test
Expand All @@ -23,7 +22,7 @@ void runWithNoParam() {
Integer call = PicocliRunner.call(NamespaceFilesCommand.class, ctx, args);

assertThat(call, is(0));
assertThat(out.toString(), startsWith("Usage: kestra namespace files"));
assertThat(out.toString(), containsString("Usage: kestra namespace files"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import java.io.PrintStream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringStartsWith.startsWith;

class DatabaseCommandTest {
@Test
Expand All @@ -22,7 +22,7 @@ void runWithNoParam() {
Integer call = PicocliRunner.call(DatabaseCommand.class, ctx, args);

assertThat(call, is(0));
assertThat(out.toString(), startsWith("Usage: kestra sys database"));
assertThat(out.toString(), containsString("Usage: kestra sys database"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import java.io.PrintStream;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringStartsWith.startsWith;

class TemplateNamespaceCommandTest {
@Test
Expand All @@ -22,7 +22,7 @@ void runWithNoParam() {
Integer call = PicocliRunner.call(TemplateNamespaceCommand.class, ctx, args);

assertThat(call, is(0));
assertThat(out.toString(), startsWith("Usage: kestra template namespace"));
assertThat(out.toString(), containsString("Usage: kestra template namespace"));
}
}
}
8 changes: 4 additions & 4 deletions core/src/main/java/io/kestra/core/contexts/KestraContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ public static void setContext(final KestraContext context) {
* @return the string version.
*/
public abstract String getVersion();

/**
* Returns the Kestra Plugin Registry.
*
* @return the {@link PluginRegistry}.
*/
public abstract PluginRegistry getPluginRegistry();

/**
* Shutdowns the Kestra application.
*/
Expand Down Expand Up @@ -106,7 +106,7 @@ public Initializer(ApplicationContext applicationContext,
public ServerType getServerType() {
return Optional.ofNullable(environment)
.flatMap(env -> env.getProperty(KESTRA_SERVER_TYPE, ServerType.class))
.orElseThrow(() -> new IllegalStateException("Cannot found required environment property '" + KESTRA_SERVER_TYPE + "'."));
.orElse(ServerType.STANDALONE);
}

/** {@inheritDoc} **/
Expand All @@ -124,7 +124,7 @@ public void shutdown() {
public String getVersion() {
return version;
}

/** {@inheritDoc} **/
@Override
public PluginRegistry getPluginRegistry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
import java.nio.file.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import jakarta.validation.ConstraintViolationException;

Expand Down Expand Up @@ -65,12 +68,9 @@ public void load(URL basePath) throws IOException, URISyntaxException {
}
}


public void load(File basePath) throws IOException {
this.load(basePath, false);
}

public void load(File basePath, Boolean update) throws IOException {
Map<String, Flow> flowByUidInRepository = flowRepository.findAllForAllTenants().stream()
.collect(Collectors.toMap(Flow::uidWithoutRevision, Function.identity()));
List<Path> list = Files.walk(basePath.toPath())
.filter(YamlFlowParser::isValidExtension)
.toList();
Expand All @@ -81,16 +81,12 @@ public void load(File basePath, Boolean update) throws IOException {
Flow parse = yamlFlowParser.parse(file.toFile(), Flow.class);
modelValidator.validate(parse);

if (!update) {
Flow inRepository = flowByUidInRepository.get(parse.uidWithoutRevision());

if (inRepository == null) {
this.createFlow(flowSource, parse);
} else {
Optional<Flow> find = flowRepository.findById(parse.getTenantId(), parse.getNamespace(), parse.getId());

if (find.isEmpty()) {
this.createFlow(flowSource, parse);
} else {
this.udpateFlow(flowSource, parse, find.get());
}
this.udpateFlow(flowSource, parse, inRepository);
}
} catch (ConstraintViolationException e) {
log.warn("Unable to create flow {}", file, e);
Expand Down
52 changes: 47 additions & 5 deletions core/src/main/java/io/kestra/core/runners/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@
import io.kestra.core.repositories.MetricRepositoryInterface;
import io.kestra.core.repositories.SaveRepositoryInterface;
import io.kestra.core.repositories.TriggerRepositoryInterface;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.annotation.Requires;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
Expand All @@ -36,6 +41,10 @@ public class Indexer implements IndexerInterface {
private final MetricRegistry metricRegistry;
private final List<Runnable> receiveCancellations = new ArrayList<>();

private final String id = IdUtils.create();
private final AtomicReference<ServiceState> state = new AtomicReference<>();
private final ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher;

@Inject
public Indexer(
ExecutionRepositoryInterface executionRepository,
Expand All @@ -44,7 +53,8 @@ public Indexer(
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) QueueInterface<LogEntry> logQueue,
MetricRepositoryInterface metricRepositor,
@Named(QueueFactoryInterface.METRIC_QUEUE) QueueInterface<MetricEntry> metricQueue,
MetricRegistry metricRegistry
MetricRegistry metricRegistry,
ApplicationEventPublisher<ServiceStateChangeEvent> eventPublisher
) {
this.executionRepository = executionRepository;
this.executionQueue = executionQueue;
Expand All @@ -53,13 +63,16 @@ public Indexer(
this.metricRepository = metricRepositor;
this.metricQueue = metricQueue;
this.metricRegistry = metricRegistry;
this.eventPublisher = eventPublisher;
setState(ServiceState.CREATED);
}

@Override
public void run() {
this.send(executionQueue, executionRepository);
this.send(logQueue, logRepository);
this.send(metricQueue, metricRepository);
setState(ServiceState.RUNNING);
}

protected <T> void send(QueueInterface<T> queueInterface, SaveRepositoryInterface<T> saveRepositoryInterface) {
Expand All @@ -80,11 +93,40 @@ protected <T> void send(QueueInterface<T> queueInterface, SaveRepositoryInterfac
}));
}

protected void setState(final ServiceState state) {
this.state.set(state);
this.eventPublisher.publishEvent(new ServiceStateChangeEvent(this));
}

/** {@inheritDoc} **/
@Override
public String getId() {
return id;
}
/** {@inheritDoc} **/
@Override
public ServiceType getType() {
return ServiceType.INDEXER;
}
/** {@inheritDoc} **/
@Override
public ServiceState getState() {
return state.get();
}

@PreDestroy
@Override
public void close() throws IOException {
public void close() {
setState(ServiceState.TERMINATING);
this.receiveCancellations.forEach(Runnable::run);
this.executionQueue.close();
this.logQueue.close();
this.metricQueue.close();
try {
this.executionQueue.close();
this.logQueue.close();
this.metricQueue.close();
setState(ServiceState.TERMINATED_GRACEFULLY);
} catch (IOException e) {
log.error("Failed to close the queue", e);
setState(ServiceState.TERMINATED_FORCED);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.kestra.core.runners;

import java.io.Closeable;
import io.kestra.core.server.Service;

public interface IndexerInterface extends Runnable, Closeable {
public interface IndexerInterface extends Service, Runnable {

}
4 changes: 2 additions & 2 deletions core/src/main/java/io/kestra/core/runners/RunnerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ public Execution runOneUntilRunning(Flow flow, BiFunction<Flow, Execution, Map<S
public Execution awaitExecution(Predicate<Execution> predicate, Runnable executionEmitter, Duration duration) throws TimeoutException {
AtomicReference<Execution> receive = new AtomicReference<>();

Runnable cancel = this.executionQueue.receive(current -> {
Runnable cancel = this.executionQueue.receive(null, current -> {
if (predicate.test(current.getLeft())) {
receive.set(current.getLeft());
}
});
}, false);

executionEmitter.run();

Expand Down
Loading