Skip to content

Commit

Permalink
Modify the existing fileWatcher to make it more generic (#3325)
Browse files Browse the repository at this point in the history
* Modify the secretWatcher implementation

* Modify the secretWatcher implementation

* Modify the secretWatcher implementation

* chaneg to test FileWatcher

* Update the implementation of FileWatcher to make it generic

* Update the implementation of FileWatcher to make it generic

* Remove the secretWatcher

* Update all the usage of fileWatcher due to the change made to FileWatcher class

* revert the SecretWatcher commit

* Reset the secretwatcher format so it doesn't show up in the file changed PR

* Reset the secretwatcher format so it doesn't show up in the file changed PR

* Format

* Update the test

* Format

* Format

* Format

* Fix the test logic

* Remove the uncessary comments

* reformat the code

* Add the new function to update the ContractPublisher

* updateContract function

* updateContract function

* Fixing the new updateContract Function

* Replace the FileWatcher test with the new tests

* Add the contract publisher test

* Modify the FileWatcher to support runAtBeginning

* Formatting

* Modify the code that use the FileWatcher

* Remove the uncessary Null check

* Remove the runAtBeginning flag because it might increase complexity

* Reverse the format change

* Remove the secret watcher

* Modify the fileWatcher Test

* Code refector

* remove uncessary check

* Update the contractpublisher test

* Remove secretWatcher

* fix

* Update data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/file/FileWatcher.java

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Use await instead of sleep

* Update data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/eventbus/ContractPublisherTest.java

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Update the contractPublisherTest

* Update codegen

* Update the spotless

* Revert the protoc version change

---------

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
Leo6Leo and pierDipi authored Sep 29, 2023
1 parent 977b7ed commit 1ddc823
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@
*/
package dev.knative.eventing.kafka.broker.core.eventbus;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This object publishes all consumed contracts to the event bus.
Expand All @@ -28,12 +39,17 @@ public class ContractPublisher implements Consumer<DataPlaneContract.Contract>,

private static final DeliveryOptions DELIVERY_OPTIONS = new DeliveryOptions().setLocalOnly(true);

private static final Logger logger = LoggerFactory.getLogger(ContractPublisher.class);

private final EventBus eventBus;
private final String address;

private long lastContract;

public ContractPublisher(EventBus eventBus, String address) {
this.eventBus = eventBus;
this.address = address;
this.lastContract = -1;
}

@Override
Expand All @@ -45,4 +61,42 @@ public void accept(DataPlaneContract.Contract contract) {
public void close() throws Exception {
this.accept(DataPlaneContract.Contract.newBuilder().build());
}

public void updateContract(File newContract) {
if (Thread.interrupted()) {
return;
}
try (final var fileReader = new FileReader(newContract);
final var bufferedReader = new BufferedReader(fileReader)) {
final var contract = parseFromJson(bufferedReader);
if (contract == null) {
return;
}
// The check, which is based only on the generation number, works because the control plane doesn't update
// the file if nothing changes.
final var previousLastContract = this.lastContract;
this.lastContract = contract.getGeneration();
if (contract.getGeneration() == previousLastContract) {
logger.debug(
"Contract unchanged {} {}",
keyValue("generation", contract.getGeneration()),
keyValue("lastGeneration", previousLastContract));
return;
}
this.accept(contract);
} catch (IOException e) {
logger.warn("Error reading the contract file, retrying...", e);
}
}

public static DataPlaneContract.Contract parseFromJson(final Reader content) throws IOException {
try {
final var contract = DataPlaneContract.Contract.newBuilder();
JsonFormat.parser().merge(content, contract);
return contract.build();
} catch (final InvalidProtocolBufferException ex) {
logger.debug("failed to parse from JSON", ex);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,65 +15,62 @@
*/
package dev.knative.eventing.kafka.broker.core.file;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import static java.nio.file.StandardWatchEventKinds.OVERFLOW;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.file.FileSystems;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Objects;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is responsible for watching a given file and reports update.
* This class is responsible for watching a given file and reports update or execute a trigger function.
* <p>
* Using {@link #start()}, this class will create a background thread running the file watcher.
* Using {@link #start()}, this class will create a background thread running
* the file watcher.
* You can interrupt such thread with {@link #close()}
* <p>
* This class is thread safe, and it cannot start more than one watch at the time.
* This class is thread safe, and it cannot start more than one watch at the
* time.
*/
public class FileWatcher implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(FileWatcher.class);

private final File toWatch;
private final Consumer<DataPlaneContract.Contract> contractConsumer;
private Runnable triggerFunction;

private Thread watcherThread;
private WatchService watcher;
private long lastContract;

/**
* All args constructor.
*
* @param contractConsumer updates receiver.
* @param file file to watch
*/
public FileWatcher(File file, Consumer<DataPlaneContract.Contract> contractConsumer) {
public FileWatcher(File file, Runnable triggerFunction) {
Objects.requireNonNull(file, "provide file");
Objects.requireNonNull(contractConsumer, "provide consumer");
Objects.requireNonNull(triggerFunction, "provide trigger function");

this.contractConsumer = contractConsumer;
this.triggerFunction = triggerFunction;
this.toWatch = file.getAbsoluteFile();
this.lastContract = -1;
}

public Thread getWatcherThread() {
return this.watcherThread;
}

/**
* Start the watcher thread.
* This is going to create a new deamon thread, which can be stopped using {@link #close()}.
* This is going to create a new deamon thread, which can be stopped using
* {@link #close()}.
*
* @throws IOException if an error happened while starting to watch
* @throws IllegalStateException if the watcher is already running
Expand Down Expand Up @@ -104,58 +101,48 @@ public synchronized void close() throws Exception {
this.watcherThread = null;
}

private void run() {
public void run() {
try {
// register the given watch service.
// Note: this watch a directory and not the single file we're interested in, so that's the
// reason in #watch() we filter watch service events based on the updated file.
// Note: this watches a directory and not the single file we're interested in, so
// that's the reason we filter watch service events based on the updated file.
this.toWatch.getParentFile().toPath().register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
} catch (IOException e) {
logger.error("Error while starting watching the file", e);
return;
}
logger.info("Started watching {}", toWatch);

// If the container restarts, the mounted file never gets reconciled, so update as soon as we
// start watching
update();
triggerFunction.run();

while (!Thread.interrupted()) {
var shouldUpdate = false;

// Note: take() blocks
WatchKey key;
try {
key = watcher.take();
logger.debug("Contract updates");
} catch (InterruptedException e) {
break; // Looks good, this means Thread.interrupt was invoked
break; // Thread.interrupt was invoked
}

// this should be rare but it can actually happen so check watch key validity
// Check the watch key's validity
if (!key.isValid()) {
logger.warn("Invalid key");
continue;
}

// loop through all watch service events and determine if an update we're interested in
// has occurred.
// Loop through all watch service events
for (final var event : key.pollEvents()) {

final var kind = event.kind();

// check if we're interested in the updated file
// We check if the event's context (the file) matches our target file
if (kind != OVERFLOW) {
shouldUpdate = true;
triggerFunction.run();
break;
}
}

if (shouldUpdate) {
update();
}

// reset the watch key, so that we receives new events
// Reset the watch key to receive new events
key.reset();
}

Expand All @@ -166,43 +153,4 @@ private void run() {
logger.warn("Error while closing the file watcher", e);
}
}

private void update() {
if (Thread.interrupted()) {
return;
}
try (final var fileReader = new FileReader(toWatch);
final var bufferedReader = new BufferedReader(fileReader)) {
final var contract = parseFromJson(bufferedReader);
if (contract == null) {
return;
}
// The check, which is based only on the generation number, works because the control plane doesn't update
// the
// file if nothing changes.
final var previousLastContract = this.lastContract;
this.lastContract = contract.getGeneration();
if (contract.getGeneration() == previousLastContract) {
logger.debug(
"Contract unchanged {} {}",
keyValue("generation", contract.getGeneration()),
keyValue("lastGeneration", previousLastContract));
return;
}
contractConsumer.accept(contract);
} catch (IOException e) {
logger.warn("Error reading the contract file, retrying...", e);
}
}

private DataPlaneContract.Contract parseFromJson(final Reader content) throws IOException {
try {
final var contract = DataPlaneContract.Contract.newBuilder();
JsonFormat.parser().merge(content, contract);
return contract.build();
} catch (final InvalidProtocolBufferException ex) {
logger.debug("failed to parse from JSON", ex);
}
return null;
}
}

This file was deleted.

Loading

0 comments on commit 1ddc823

Please sign in to comment.