diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventbus/ContractPublisher.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventbus/ContractPublisher.java index 646c55cb93..58fe0d8802 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventbus/ContractPublisher.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/eventbus/ContractPublisher.java @@ -15,10 +15,21 @@ */ package dev.knative.eventing.kafka.broker.core.eventbus; +import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.EventBus; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This object publishes all consumed contracts to the event bus. @@ -28,12 +39,17 @@ public class ContractPublisher implements Consumer, private static final DeliveryOptions DELIVERY_OPTIONS = new DeliveryOptions().setLocalOnly(true); + private static final Logger logger = LoggerFactory.getLogger(ContractPublisher.class); + private final EventBus eventBus; private final String address; + private long lastContract; + public ContractPublisher(EventBus eventBus, String address) { this.eventBus = eventBus; this.address = address; + this.lastContract = -1; } @Override @@ -45,4 +61,42 @@ public void accept(DataPlaneContract.Contract contract) { public void close() throws Exception { this.accept(DataPlaneContract.Contract.newBuilder().build()); } + + public void updateContract(File newContract) { + if (Thread.interrupted()) { + return; + } + try (final var fileReader = new FileReader(newContract); + final var bufferedReader = new BufferedReader(fileReader)) { + final var contract = parseFromJson(bufferedReader); + if (contract == null) { + return; + } + // The check, which is based only on the generation number, works because the control plane doesn't update + // the file if nothing changes. + final var previousLastContract = this.lastContract; + this.lastContract = contract.getGeneration(); + if (contract.getGeneration() == previousLastContract) { + logger.debug( + "Contract unchanged {} {}", + keyValue("generation", contract.getGeneration()), + keyValue("lastGeneration", previousLastContract)); + return; + } + this.accept(contract); + } catch (IOException e) { + logger.warn("Error reading the contract file, retrying...", e); + } + } + + public static DataPlaneContract.Contract parseFromJson(final Reader content) throws IOException { + try { + final var contract = DataPlaneContract.Contract.newBuilder(); + JsonFormat.parser().merge(content, contract); + return contract.build(); + } catch (final InvalidProtocolBufferException ex) { + logger.debug("failed to parse from JSON", ex); + } + return null; + } } diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/FileWatcher.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/FileWatcher.java index 93b7da5a6e..fd21a0fa47 100644 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/FileWatcher.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/FileWatcher.java @@ -15,46 +15,39 @@ */ package dev.knative.eventing.kafka.broker.core.file; -import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE; import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE; import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY; import static java.nio.file.StandardWatchEventKinds.OVERFLOW; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.util.JsonFormat; -import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; -import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; import java.io.IOException; -import java.io.Reader; import java.nio.file.FileSystems; import java.nio.file.WatchKey; import java.nio.file.WatchService; import java.util.Objects; -import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This class is responsible for watching a given file and reports update. + * This class is responsible for watching a given file and reports update or execute a trigger function. *

- * Using {@link #start()}, this class will create a background thread running the file watcher. + * Using {@link #start()}, this class will create a background thread running + * the file watcher. * You can interrupt such thread with {@link #close()} *

- * This class is thread safe, and it cannot start more than one watch at the time. + * This class is thread safe, and it cannot start more than one watch at the + * time. */ public class FileWatcher implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(FileWatcher.class); private final File toWatch; - private final Consumer contractConsumer; + private Runnable triggerFunction; private Thread watcherThread; private WatchService watcher; - private long lastContract; /** * All args constructor. @@ -62,18 +55,22 @@ public class FileWatcher implements AutoCloseable { * @param contractConsumer updates receiver. * @param file file to watch */ - public FileWatcher(File file, Consumer contractConsumer) { + public FileWatcher(File file, Runnable triggerFunction) { Objects.requireNonNull(file, "provide file"); - Objects.requireNonNull(contractConsumer, "provide consumer"); + Objects.requireNonNull(triggerFunction, "provide trigger function"); - this.contractConsumer = contractConsumer; + this.triggerFunction = triggerFunction; this.toWatch = file.getAbsoluteFile(); - this.lastContract = -1; + } + + public Thread getWatcherThread() { + return this.watcherThread; } /** * Start the watcher thread. - * This is going to create a new deamon thread, which can be stopped using {@link #close()}. + * This is going to create a new deamon thread, which can be stopped using + * {@link #close()}. * * @throws IOException if an error happened while starting to watch * @throws IllegalStateException if the watcher is already running @@ -104,11 +101,11 @@ public synchronized void close() throws Exception { this.watcherThread = null; } - private void run() { + public void run() { try { // register the given watch service. - // Note: this watch a directory and not the single file we're interested in, so that's the - // reason in #watch() we filter watch service events based on the updated file. + // Note: this watches a directory and not the single file we're interested in, so + // that's the reason we filter watch service events based on the updated file. this.toWatch.getParentFile().toPath().register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); } catch (IOException e) { logger.error("Error while starting watching the file", e); @@ -116,46 +113,36 @@ private void run() { } logger.info("Started watching {}", toWatch); - // If the container restarts, the mounted file never gets reconciled, so update as soon as we - // start watching - update(); + triggerFunction.run(); while (!Thread.interrupted()) { - var shouldUpdate = false; - // Note: take() blocks WatchKey key; try { key = watcher.take(); logger.debug("Contract updates"); } catch (InterruptedException e) { - break; // Looks good, this means Thread.interrupt was invoked + break; // Thread.interrupt was invoked } - // this should be rare but it can actually happen so check watch key validity + // Check the watch key's validity if (!key.isValid()) { logger.warn("Invalid key"); continue; } - // loop through all watch service events and determine if an update we're interested in - // has occurred. + // Loop through all watch service events for (final var event : key.pollEvents()) { - final var kind = event.kind(); - // check if we're interested in the updated file + // We check if the event's context (the file) matches our target file if (kind != OVERFLOW) { - shouldUpdate = true; + triggerFunction.run(); break; } } - if (shouldUpdate) { - update(); - } - - // reset the watch key, so that we receives new events + // Reset the watch key to receive new events key.reset(); } @@ -166,43 +153,4 @@ private void run() { logger.warn("Error while closing the file watcher", e); } } - - private void update() { - if (Thread.interrupted()) { - return; - } - try (final var fileReader = new FileReader(toWatch); - final var bufferedReader = new BufferedReader(fileReader)) { - final var contract = parseFromJson(bufferedReader); - if (contract == null) { - return; - } - // The check, which is based only on the generation number, works because the control plane doesn't update - // the - // file if nothing changes. - final var previousLastContract = this.lastContract; - this.lastContract = contract.getGeneration(); - if (contract.getGeneration() == previousLastContract) { - logger.debug( - "Contract unchanged {} {}", - keyValue("generation", contract.getGeneration()), - keyValue("lastGeneration", previousLastContract)); - return; - } - contractConsumer.accept(contract); - } catch (IOException e) { - logger.warn("Error reading the contract file, retrying...", e); - } - } - - private DataPlaneContract.Contract parseFromJson(final Reader content) throws IOException { - try { - final var contract = DataPlaneContract.Contract.newBuilder(); - JsonFormat.parser().merge(content, contract); - return contract.build(); - } catch (final InvalidProtocolBufferException ex) { - logger.debug("failed to parse from JSON", ex); - } - return null; - } } diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/SecretWatcher.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/SecretWatcher.java deleted file mode 100644 index 152c7f6eac..0000000000 --- a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/SecretWatcher.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.core.file; - -import java.io.IOException; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.nio.file.StandardWatchEventKinds; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Watches a directory for changes to TLS secrets. */ -public class SecretWatcher implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(SecretWatcher.class); - - private final String dir; // directory to watch - private final WatchService watcher; // watch service - private final Runnable updateAction; // action to run when a change is detected - - private static String KEY_FILE = "tls.key"; - private static String CRT_FILE = "tls.crt"; - - public SecretWatcher(String dir, Runnable updateAction) throws IOException { - this.dir = dir; - this.updateAction = updateAction; - this.watcher = FileSystems.getDefault().newWatchService(); - - Path path = Path.of(dir); - path.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY); - } - - @Override - public void run() { - try { - WatchKey key; - while ((key = watcher.take()) != null) { - for (WatchEvent event : key.pollEvents()) { - Path changed = (Path) event.context(); - if (changed.endsWith(KEY_FILE) || changed.endsWith(CRT_FILE)) { - logger.debug("Detected change to secret {}", changed); - updateAction.run(); - } - } - key.reset(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("Watcher exception", e); - } finally { - this.stop(); - } - } - - // stop the watcher - public void stop() { - try { - watcher.close(); - } catch (IOException e) { - logger.error("Failed to close secret watcher", e); - } - } -} diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/eventbus/ContractPublisherTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/eventbus/ContractPublisherTest.java index 5094931eb9..1d0fa6c73c 100644 --- a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/eventbus/ContractPublisherTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/eventbus/ContractPublisherTest.java @@ -15,15 +15,26 @@ */ package dev.knative.eventing.kafka.broker.core.eventbus; +import static dev.knative.eventing.kafka.broker.core.testing.CoreObjects.resource1; +import static dev.knative.eventing.kafka.broker.core.testing.CoreObjects.resource2; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import com.google.protobuf.util.JsonFormat; import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import dev.knative.eventing.kafka.broker.core.file.FileWatcherTest; import dev.knative.eventing.kafka.broker.core.testing.CoreObjects; import io.vertx.core.Vertx; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.LoggerFactory; @ExtendWith(VertxExtension.class) public class ContractPublisherTest { @@ -43,4 +54,97 @@ public void publishTest(Vertx vertx, VertxTestContext testContext) { ContractPublisher publisher = new ContractPublisher(vertx.eventBus(), address); publisher.accept(expected); } + + @Test + public void updateWithSameContractFileShouldNotTriggerUpdate(Vertx vertx) throws Exception { + // This test should aim to verify that the update function is not triggered when + // the file is updated with the + // same content. + + ContractMessageCodec.register(vertx.eventBus()); + + final var file = Files.createTempFile("fw-", "-fw").toFile(); + String address = "aaa"; + + // Create a contract object and write it in the file + final var broker1 = DataPlaneContract.Contract.newBuilder() + .addResources(resource1()) + .build(); + write(file, broker1); + + final var counter = new AtomicInteger(); + vertx.eventBus().localConsumer(address).handler(message -> { + // count the times the handler is called + counter.incrementAndGet(); + }); + + ContractPublisher publisher = new ContractPublisher(vertx.eventBus(), address); + + // Update the contract twice with the same content + // Only one update event will be passed to the event bus + publisher.updateContract(file); + publisher.updateContract(file); + + // Sleep to make sure that the handler is called + Thread.sleep(2000L); + + await().until(() -> counter.get() == 1); + } + + @Test + public void updateWithDifferentContractFileShouldTriggerUpdate(Vertx vertx) throws Exception { + // This test should aim to verify that the update function is triggered when + // the file is updated with a + // different content. + + ContractMessageCodec.register(vertx.eventBus()); + + final var file = Files.createTempFile("fw-", "-fw").toFile(); + final var file2 = Files.createTempFile("fw-", "-fw").toFile(); + String address = "aaa"; + + // Create a contract object and write it in the file + final var broker1 = DataPlaneContract.Contract.newBuilder() + .addResources(resource1()) + .setGeneration(1) + .build(); + write(file, broker1); + + final var counter = new AtomicInteger(); + vertx.eventBus().localConsumer(address).handler(message -> { + // count the times the handler is called + counter.incrementAndGet(); + }); + + ContractPublisher publisher = new ContractPublisher(vertx.eventBus(), address); + + // Update the contract twice with the same content + // Only one update event will be passed to the event bus + publisher.updateContract(file); + + // Create a new contract object and write it in the file + final var broker2 = DataPlaneContract.Contract.newBuilder() + .addResources(resource2()) + .setGeneration(2) + .build(); + write(file2, broker2); + + // Update the contract twice with the same content + // Only one update event will be passed to the event bus + publisher.updateContract(file2); + + // Sleep to make sure that the handler is called + Thread.sleep(2000L); + + await().until(() -> counter.get() == 2); + } + + public static void write(File file, DataPlaneContract.Contract contract) throws IOException { + final var f = new File(file.toString()); + try (final var out = new FileWriter(f)) { + JsonFormat.printer().appendTo(contract, out); + } finally { + LoggerFactory.getLogger(FileWatcherTest.class).info("file written"); + } + } } diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/file/FileWatcherTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/file/FileWatcherTest.java index 4d33dbf0c7..51bf6e98a0 100644 --- a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/file/FileWatcherTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/file/FileWatcherTest.java @@ -13,114 +13,69 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package dev.knative.eventing.kafka.broker.core.file; -import static dev.knative.eventing.kafka.broker.core.testing.CoreObjects.resource1; -import static dev.knative.eventing.kafka.broker.core.testing.CoreObjects.resource2; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.*; -import com.google.protobuf.util.JsonFormat; -import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import java.io.File; import java.io.FileWriter; -import java.io.IOException; import java.nio.file.Files; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; -import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.*; public class FileWatcherTest { - @Test - @Timeout(value = 5) - public void shouldReceiveUpdatesOnUpdate() throws Exception { - final var file = Files.createTempFile("fw-", "-fw").toFile(); - - final var broker1 = DataPlaneContract.Contract.newBuilder() - .addResources(resource1()) - .setGeneration(1) - .build(); - - final var broker2 = DataPlaneContract.Contract.newBuilder() - .addResources(resource2()) - .setGeneration(2) - .build(); - - final var isFirst = new AtomicBoolean(true); - final var waitFirst = new CountDownLatch(1); - final var waitSecond = new CountDownLatch(1); - final Consumer brokersConsumer = broker -> { - if (isFirst.getAndSet(false)) { - assertThat(broker).isEqualTo(broker1); - waitFirst.countDown(); - } else if (!broker.equals(broker1)) { - assertThat(broker).isEqualTo(broker2); - waitSecond.countDown(); - } - }; - - try (FileWatcher fw = new FileWatcher(file, brokersConsumer)) { - fw.start(); + private File tempFile; + private FileWatcher fileWatcher; - write(file, broker1); - waitFirst.await(); - - write(file, broker2); - waitSecond.await(); - } + @BeforeEach + public void setUp() throws Exception { + // Create a temporary file for testing purposes + tempFile = Files.createTempFile("test", ".txt").toFile(); } - @Test - @Timeout(value = 5) - public void shouldReadFileWhenStartWatchingWithoutUpdates() throws Exception { - - final var file = Files.createTempFile("fw-", "-fw").toFile(); - - final var broker1 = DataPlaneContract.Contract.newBuilder() - .addResources(resource1()) - .build(); - write(file, broker1); - - final var waitBroker = new CountDownLatch(1); - final Consumer brokersConsumer = broker -> { - assertThat(broker).isEqualTo(broker1); - waitBroker.countDown(); - }; - - try (FileWatcher fw = new FileWatcher(file, brokersConsumer)) { - fw.start(); - - waitBroker.await(); + @AfterEach + public void tearDown() throws Exception { + if (fileWatcher != null && fileWatcher.getWatcherThread() != null) { + fileWatcher.close(); } + Files.deleteIfExists(tempFile.toPath()); } @Test - @Timeout(value = 5) - public void shouldNotStartTwice() throws Exception { - - final var file = Files.createTempFile("fw-", "-fw").toFile(); - - final Consumer brokersConsumer = broker -> {}; - - try (FileWatcher fw = new FileWatcher(file, brokersConsumer)) { - // Started once - fw.start(); - - // Now this should fail - assertThatThrownBy(fw::start).isInstanceOf(IllegalStateException.class); + public void testFileModification() throws Exception { + // Set up a counter to track how many times the trigger function is called + AtomicInteger counter = new AtomicInteger(0); + + fileWatcher = new FileWatcher(tempFile, () -> { + counter.incrementAndGet(); + }); + fileWatcher.start(); + + // Modify the file + try (FileWriter writer = new FileWriter(tempFile)) { + writer.write("Test Data"); } + + // Await until the trigger function is called twice: 1 is for the initial file + // read, and 1 is for the file modification + await().until(() -> counter.get() == 2); } - public static void write(File file, DataPlaneContract.Contract contract) throws IOException { - final var f = new File(file.toString()); - try (final var out = new FileWriter(f)) { - JsonFormat.printer().appendTo(contract, out); - } finally { - LoggerFactory.getLogger(FileWatcherTest.class).info("file written"); - } + @Test + public void testFileNoUpdate() throws Exception { + // Set up a counter to track how many times the trigger function is called + AtomicInteger counter = new AtomicInteger(0); + + fileWatcher = new FileWatcher(tempFile, () -> { + counter.incrementAndGet(); + }); + fileWatcher.start(); + + // Await until the trigger function is called once: 1 is for the initial file + // read + await().until(() -> counter.get() == 1); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java index 3406055e10..01a608b26e 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java @@ -126,10 +126,12 @@ public static void start( ContractPublisher publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS); - FileWatcher fileWatcher = new FileWatcher(new File(env.getDataPlaneConfigFilePath()), publisher); + + File file = new File(env.getDataPlaneConfigFilePath()); + FileWatcher fileWatcher = new FileWatcher(file, () -> publisher.updateContract(file)); fileWatcher.start(); - // Register shutdown hook for graceful shutdown. + // Register shutdown hook for graceful shutdown. Shutdown.registerHook(vertx, publisher, fileWatcher, openTelemetry.getSdkTracerProvider()); } catch (final Exception ex) { diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java index b836ff6676..222ac5ef0b 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/integration/UnorderedConsumerTest.java @@ -15,7 +15,6 @@ */ package dev.knative.eventing.kafka.broker.dispatcher.integration; -import static dev.knative.eventing.kafka.broker.core.file.FileWatcherTest.write; import static dev.knative.eventing.kafka.broker.core.testing.CoreObjects.contract; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -23,6 +22,7 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.eventbus.ContractMessageCodec; import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher; +import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisherTest; import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler; @@ -100,12 +100,13 @@ public void testUnorderedConsumer(final Vertx vertx) throws Exception { startServer(vertx, new VertxTestContext(), event, waitEvents); final var file = Files.createTempFile("fw-", "-fw").toFile(); - final var fileWatcher = new FileWatcher( - file, new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS)); + final var contractPublisher = + new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS); + final var fileWatcher = new FileWatcher(file, () -> contractPublisher.updateContract(file)); fileWatcher.start(); - write(file, contract); + ContractPublisherTest.write(file, contract); await().atMost(6, TimeUnit.SECONDS) .untilAsserted(() -> assertThat(vertx.deploymentIDs()).hasSize(numEgresses + NUM_SYSTEM_VERTICLES)); diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 02107f6d69..06a4332ff3 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -21,7 +21,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static io.netty.handler.codec.http.HttpResponseStatus.OK; -import dev.knative.eventing.kafka.broker.core.file.SecretWatcher; +import dev.knative.eventing.kafka.broker.core.file.FileWatcher; import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener; import dev.knative.eventing.kafka.broker.core.reconciler.ResourcesReconciler; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; @@ -89,7 +89,7 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler messageConsumer; private IngressProducerReconcilableStore ingressProducerStore; - private SecretWatcher secretWatcher; + private FileWatcher secretWatcher; public ReceiverVerticle( final ReceiverEnv env, @@ -176,15 +176,16 @@ public void start(final Promise startPromise) { // Set up the secret watcher private void setupSecretWatcher() { try { - this.secretWatcher = new SecretWatcher(secretVolumePath, this::updateServerConfig); - new Thread(this.secretWatcher).start(); + File file = new File(secretVolumePath + "/tls.crt"); + this.secretWatcher = new FileWatcher(file, this::updateServerConfig); + this.secretWatcher.start(); } catch (IOException e) { logger.error("Failed to start SecretWatcher", e); } } @Override - public void stop(Promise stopPromise) { + public void stop(Promise stopPromise) throws Exception { CompositeFuture.all( (this.httpServer != null ? this.httpServer.close().mapEmpty() : Future.succeededFuture()), (this.httpsServer != null ? this.httpsServer.close().mapEmpty() : Future.succeededFuture()), @@ -194,7 +195,11 @@ public void stop(Promise stopPromise) { // close the watcher if (this.secretWatcher != null) { - this.secretWatcher.stop(); + try { + this.secretWatcher.close(); + } catch (IOException e) { + logger.error("Failed to close SecretWatcher", e); + } } } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index 0ca83974cc..8638931f45 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -118,7 +118,8 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk ContractPublisher publisher = new ContractPublisher(vertx.eventBus(), ResourcesReconcilerMessageHandler.ADDRESS); - FileWatcher fileWatcher = new FileWatcher(new File(env.getDataPlaneConfigFilePath()), publisher); + File file = new File(env.getDataPlaneConfigFilePath()); + FileWatcher fileWatcher = new FileWatcher(file, () -> publisher.updateContract(file)); fileWatcher.start(); // Register shutdown hook for graceful shutdown. diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java index ce70296f60..eb37cfecca 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java @@ -256,28 +256,28 @@ public void secretFileUpdated() throws InterruptedException { // Write the new CA cert to the file String new_TLS_Cert = """ - -----BEGIN CERTIFICATE----- - MIIDmDCCAoCgAwIBAgIUZx4ztTK7wyEpRYKkKqM9+oFr+PwwDQYJKoZIhvcNAQEL - BQAwJzELMAkGA1UEBhMCVVMxGDAWBgNVBAMMD0V4YW1wbGUtUm9vdC1DQTAeFw0y - MzA3MTcxNDI1MzhaFw0yNjA1MDYxNDI1MzhaMG0xCzAJBgNVBAYTAlVTMRIwEAYD - VQQIDAlZb3VyU3RhdGUxETAPBgNVBAcMCFlvdXJDaXR5MR0wGwYDVQQKDBRFeGFt - cGxlLUNlcnRpZmljYXRlczEYMBYGA1UEAwwPbG9jYWxob3N0LmxvY2FsMIIBIjAN - BgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyq0tbWj3zb/lhcykAAXlc8RVVPiZ - 898NxNV1od3XvFUFRYkQP9DU/3nE/5DxDQbQmfTlov50WbgSgQxt9GR7iC3lheOm - B3ODaA0p3C7bBg7LeUvtrhvPyHITDI9Aqy8cUO5XHVgbTceW7XOvcmju/DVpm9Id - iSpEEPMT2GsuLQ2rVvNupIccYRe0NhZly7l27AAkf5y1G2Yd9Oklt+gOPNPB+afH - /eFlYRrKokp58Kt1eyDNAwaYV8arEKIapU2AQheZTZQSBOi/tFCc7oKFQOmO9sFf - HEuQfCVd8TZJ2vb7qdiLVlgTDwjVYmUkfkxR7JJ/feDacyfjGkqYd1bngQIDAQAB - o3YwdDAfBgNVHSMEGDAWgBQGanp895VYiwZNv+X+JJ7GWjQtWTAJBgNVHRMEAjAA - MAsGA1UdDwQEAwIE8DAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8AAAEwHQYDVR0O - BBYEFOlfLUC1MJOOjGRWfVzHQYA+Iya4MA0GCSqGSIb3DQEBCwUAA4IBAQACCgdN - Sj+W39W+8JdHpBU/fw1wwNDB4SyIyxAgPXp8TWiOwoo3ozcALP44ab4jP9b+Etlm - yNMNdayOf42SCZUhihO4PKiiqDgolDQfYaZbiIEXJ/xaXtao5SxyBPY77eXtXN/+ - E7/TOWQ5U7qJYd7H5vqhlFk6fn7s6WKkue8ELUrWh8r3THASXUsa8xzxHu0nsp2v - SsbYyR0vyrGE4yvComvl75Igw6jY70cswWdyThGKV6ZLip2BrjLQlFhr3IZN5tbg - rHxaoqIen8NYjNpBdJDInPMFZshZSx1lAzw6uwP4OuM5WQHgYEk7V+TkOU3osqgD - 5bOo/SpCokC166Ym - -----END CERTIFICATE-----"""; +-----BEGIN CERTIFICATE----- +MIIDmDCCAoCgAwIBAgIUZx4ztTK7wyEpRYKkKqM9+oFr+PwwDQYJKoZIhvcNAQEL +BQAwJzELMAkGA1UEBhMCVVMxGDAWBgNVBAMMD0V4YW1wbGUtUm9vdC1DQTAeFw0y +MzA3MTcxNDI1MzhaFw0yNjA1MDYxNDI1MzhaMG0xCzAJBgNVBAYTAlVTMRIwEAYD +VQQIDAlZb3VyU3RhdGUxETAPBgNVBAcMCFlvdXJDaXR5MR0wGwYDVQQKDBRFeGFt +cGxlLUNlcnRpZmljYXRlczEYMBYGA1UEAwwPbG9jYWxob3N0LmxvY2FsMIIBIjAN +BgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyq0tbWj3zb/lhcykAAXlc8RVVPiZ +898NxNV1od3XvFUFRYkQP9DU/3nE/5DxDQbQmfTlov50WbgSgQxt9GR7iC3lheOm +B3ODaA0p3C7bBg7LeUvtrhvPyHITDI9Aqy8cUO5XHVgbTceW7XOvcmju/DVpm9Id +iSpEEPMT2GsuLQ2rVvNupIccYRe0NhZly7l27AAkf5y1G2Yd9Oklt+gOPNPB+afH +/eFlYRrKokp58Kt1eyDNAwaYV8arEKIapU2AQheZTZQSBOi/tFCc7oKFQOmO9sFf +HEuQfCVd8TZJ2vb7qdiLVlgTDwjVYmUkfkxR7JJ/feDacyfjGkqYd1bngQIDAQAB +o3YwdDAfBgNVHSMEGDAWgBQGanp895VYiwZNv+X+JJ7GWjQtWTAJBgNVHRMEAjAA +MAsGA1UdDwQEAwIE8DAaBgNVHREEEzARgglsb2NhbGhvc3SHBH8AAAEwHQYDVR0O +BBYEFOlfLUC1MJOOjGRWfVzHQYA+Iya4MA0GCSqGSIb3DQEBCwUAA4IBAQACCgdN +Sj+W39W+8JdHpBU/fw1wwNDB4SyIyxAgPXp8TWiOwoo3ozcALP44ab4jP9b+Etlm +yNMNdayOf42SCZUhihO4PKiiqDgolDQfYaZbiIEXJ/xaXtao5SxyBPY77eXtXN/+ +E7/TOWQ5U7qJYd7H5vqhlFk6fn7s6WKkue8ELUrWh8r3THASXUsa8xzxHu0nsp2v +SsbYyR0vyrGE4yvComvl75Igw6jY70cswWdyThGKV6ZLip2BrjLQlFhr3IZN5tbg +rHxaoqIen8NYjNpBdJDInPMFZshZSx1lAzw6uwP4OuM5WQHgYEk7V+TkOU3osqgD +5bOo/SpCokC166Ym +-----END CERTIFICATE-----"""; String new_TLS_key = """