Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(discovery): port kubeApi discovery to v3 #325

Merged
merged 30 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4d11b8a
feat(discovery): port kubeApi discovery to v3
tthvo Jan 26, 2024
f28fe85
chore(discovery): clean up unused codes and address reviews
tthvo Feb 8, 2024
0661f47
build(deps): remove unused mvn deps
tthvo Mar 22, 2024
ca24b07
feat(discovery): implement discovery logic for FOUND events
tthvo Mar 22, 2024
d29a798
feat(discovery): clean up and implement LOST events
tthvo Mar 27, 2024
f262aa8
chore(typo): fix typo
tthvo Mar 27, 2024
b7a68a7
chore(license): add missing license headers
tthvo Mar 27, 2024
aba6f8a
fix(orm): fix unwrap exception when using enum
tthvo Mar 27, 2024
0f63fa3
chore(logs): fix broken logs format
tthvo Mar 27, 2024
e678a3c
fix(discovery): ignore when target does not exist in db
tthvo Mar 27, 2024
06ee05b
fix(discovery): fix DELETE event handler
tthvo Apr 2, 2024
0adbc41
chore(spotless): add missing spotless fix
tthvo Apr 2, 2024
010909e
chore(k8s): correct labels
tthvo Apr 4, 2024
8ff1804
fixup(k8s): add missing changes for watch namespaces from Andrew
tthvo Apr 9, 2024
2fc59af
fix(k8s): fix string comparison
tthvo Apr 9, 2024
e3fc083
fix(k8s): fix duplicate namespace nodes
tthvo Apr 9, 2024
8746ff5
fix(k8s): fix realm node not found and deletion constraint violation
tthvo Apr 9, 2024
24bf1b5
fix(k8s): rebuild tree on endpoint events
tthvo Apr 22, 2024
4bddb2a
feat(discovery): define parent reference for discovery node
tthvo Apr 22, 2024
a1ea98a
feat(discovery): cache discovery queries
tthvo Apr 22, 2024
3029fe3
fix(discovery): fix bugs causing infinite loop in worker thread
tthvo Apr 22, 2024
4252166
fix(discovery): ensure cascade remove when removing nodes
tthvo Apr 22, 2024
2d4a4cf
fix(discovery): ensure all nodes have parent ref
tthvo Apr 22, 2024
543e103
fix(discovery): properly delete nodes when targets disappear
tthvo Apr 22, 2024
4221626
fix(discovery): only fetch target in namespace of interest
tthvo Apr 22, 2024
559a53d
chore(discovery): clean up and address reviews
tthvo Apr 24, 2024
fea4391
fixup(discovery): use optional for injected list
tthvo Apr 24, 2024
5f1d89e
fixup(transaction): run handler in correct transaction context
tthvo Apr 24, 2024
eddd41e
test(integration): fix broken integration tests
tthvo Apr 24, 2024
978cc5a
fix(spotbugs): remove unused equals override
tthvo Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(discovery): clean up and implement LOST events
  • Loading branch information
tthvo committed Apr 24, 2024
commit d29a798b3859eb0ab65dad914168ef7642761459
5 changes: 5 additions & 0 deletions src/main/java/io/cryostat/discovery/DiscoveryNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public static Optional<DiscoveryNode> getChild(
return node.children.stream().filter(predicate).findFirst();
}

public static Optional<DiscoveryNode> getNode(Predicate<DiscoveryNode> predicate) {
List<DiscoveryNode> nodes = listAll();
return nodes.stream().filter(predicate).findFirst();
}

public static DiscoveryNode environment(String name, NodeType nodeType) {
DiscoveryNode node = new DiscoveryNode();
node.name = name;
Expand Down
209 changes: 123 additions & 86 deletions src/main/java/io/cryostat/discovery/KubeApiDiscovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

import javax.management.remote.JMXServiceURL;

import io.cryostat.core.sys.FileSystem;
import io.cryostat.discovery.KubeApiDiscovery.KubeConfig;
import io.cryostat.targets.Target;
import io.cryostat.targets.Target.Annotations;
import io.cryostat.targets.Target.EventKind;

import com.google.common.base.Optional;
import io.fabric8.kubernetes.api.model.EndpointAddress;
import io.fabric8.kubernetes.api.model.EndpointPort;
import io.fabric8.kubernetes.api.model.EndpointSubset;
Expand Down Expand Up @@ -159,8 +160,8 @@ private Map<String, SharedIndexInformer<Endpoints>> safeGetInformers() {
}

private boolean isCompatiblePort(EndpointPort port) {
return JmxPortNames.or(List.of()).contains(port.getName())
|| JmxPortNumbers.or(List.of()).contains(port.getPort());
return JmxPortNames.orElse(List.of()).contains(port.getName())
|| JmxPortNumbers.orElse(List.of()).contains(port.getPort());
}

private List<TargetTuple> getTargetTuplesFrom(Endpoints endpoints) {
Expand All @@ -175,33 +176,87 @@ private List<TargetTuple> getTargetTuplesFrom(Endpoints endpoints) {
@Transactional
tthvo marked this conversation as resolved.
Show resolved Hide resolved
public void handleEndpointEvent(TargetTuple tuple, EventKind eventKind) {
DiscoveryNode realm = DiscoveryNode.getRealm(REALM).orElseThrow();
DiscoveryNode nsNode =
DiscoveryNode.environment(
tuple.objRef.getNamespace(), KubeDiscoveryNodeType.NAMESPACE);
if (realm.children.contains(nsNode)) {
nsNode =
DiscoveryNode.getChild(realm, n -> tuple.objRef.getNamespace() == n.name)
.orElseThrow();
}

switch (eventKind) {
case FOUND:
DiscoveryNode nsNode =
DiscoveryNode.environment(
tuple.objRef.getNamespace(), KubeDiscoveryNodeType.NAMESPACE);
buildOwnerChain(nsNode, tuple);
if (!realm.children.contains(nsNode)) {
realm.children.add(nsNode);
realm.persist();
} else {
nsNode =
DiscoveryNode.getChild(
realm, n -> tuple.objRef.getNamespace() == n.name)
.orElseThrow();
}
buildOwnerChain(nsNode, tuple);
break;
case LOST:
break;
case MODIFIED:
pruneOwnerChain(nsNode, tuple);
if (nsNode.children.isEmpty()) {
realm.children.remove(nsNode);
realm.persist();
}
break;
default:
}
}

private void pruneOwnerChain(DiscoveryNode nsNode, TargetTuple targetTuple) {
ObjectReference TargetTuple = targetTuple.addr.getTargetRef();
if (TargetTuple == null) {
logger.errorv(
"Address {} for Endpoint {} had null target reference",
targetTuple.addr.getIp() != null
? targetTuple.addr.getIp()
: targetTuple.addr.getHostname(),
targetTuple.objRef.getName());
return;
}

String targetKind = TargetTuple.getKind();
KubeDiscoveryNodeType targetType = KubeDiscoveryNodeType.fromKubernetesKind(targetKind);

Target target = Target.getTargetByConnectUrl(targetTuple.toTarget().connectUrl);
target.delete();

DiscoveryNode targetNode = target.discoveryNode;

if (targetType == KubeDiscoveryNodeType.POD) {
Pair<HasMetadata, DiscoveryNode> pod =
queryForNode(
TargetTuple.getNamespace(),
TargetTuple.getName(),
TargetTuple.getKind());

pod.getRight().children.remove(targetNode);
pod.getRight().persist();

Pair<HasMetadata, DiscoveryNode> node = pod;
while (true) {
Pair<HasMetadata, DiscoveryNode> owner = getOwnerNode(node);
if (owner == null) {
break;
}
DiscoveryNode ownerNode = owner.getRight();
if (node.getRight().children.isEmpty()) {
ownerNode.children.remove(node.getRight());
ownerNode.persist();
}
node = owner;
}
} else {
nsNode.children.remove(targetNode);
}

nsNode.persist();
}

private void buildOwnerChain(DiscoveryNode nsNode, TargetTuple targetTuple) {
ObjectReference targetRef = targetTuple.addr.getTargetRef();
if (targetRef == null) {
ObjectReference TargetTuple = targetTuple.addr.getTargetRef();
if (TargetTuple == null) {
logger.errorv(
"Address {} for Endpoint {} had null target reference",
targetTuple.addr.getIp() != null
Expand All @@ -210,24 +265,31 @@ private void buildOwnerChain(DiscoveryNode nsNode, TargetTuple targetTuple) {
targetTuple.objRef.getName());
return;
}
String targetKind = targetRef.getKind();

String targetKind = TargetTuple.getKind();
KubeDiscoveryNodeType targetType = KubeDiscoveryNodeType.fromKubernetesKind(targetKind);

Target target = targetTuple.toTarget();
DiscoveryNode targetNode = DiscoveryNode.target(target, KubeDiscoveryNodeType.ENDPOINT);
target.discoveryNode = targetNode;
target.persist();

if (targetType == KubeDiscoveryNodeType.POD) {
// if the Endpoint points to a Pod, chase the owner chain up as far as possible, then
// add that to the Namespace

Pair<HasMetadata, DiscoveryNode> pod =
queryForNode(
targetRef.getNamespace(), targetRef.getName(), targetRef.getKind());
pod.getRight()
.children
.add(DiscoveryNode.target(target, KubeDiscoveryNodeType.ENDPOINT));
TargetTuple.getNamespace(),
TargetTuple.getName(),
TargetTuple.getKind());

pod.getRight().children.add(targetNode);
pod.getRight().persist();

Pair<HasMetadata, DiscoveryNode> node = pod;
while (true) {
Pair<HasMetadata, DiscoveryNode> owner = getOrCreateOwnerNode(node);
Pair<HasMetadata, DiscoveryNode> owner = getOwnerNode(node);
if (owner == null) {
break;
}
Expand All @@ -242,14 +304,13 @@ private void buildOwnerChain(DiscoveryNode nsNode, TargetTuple targetTuple) {
} else {
// if the Endpoint points to something else(?) than a Pod, just add the target straight
// to the Namespace
nsNode.children.add(DiscoveryNode.target(target, KubeDiscoveryNodeType.ENDPOINT));
nsNode.children.add(targetNode);
}
target.persist();

nsNode.persist();
}

private Pair<HasMetadata, DiscoveryNode> getOrCreateOwnerNode(
Pair<HasMetadata, DiscoveryNode> child) {
private Pair<HasMetadata, DiscoveryNode> getOwnerNode(Pair<HasMetadata, DiscoveryNode> child) {
HasMetadata childRef = child.getLeft();
if (childRef == null) {
logger.errorv(
Expand All @@ -270,10 +331,7 @@ private Pair<HasMetadata, DiscoveryNode> getOrCreateOwnerNode(
.filter(o -> KubeDiscoveryNodeType.fromKubernetesKind(o.getKind()) != null)
.findFirst()
.orElse(owners.get(0));
Pair<HasMetadata, DiscoveryNode> pair =
queryForNode(namespace, owner.getName(), owner.getKind());
pair.getRight().persist();
return pair;
return queryForNode(namespace, owner.getName(), owner.getKind());
}

private Pair<HasMetadata, DiscoveryNode> queryForNode(
Expand All @@ -285,13 +343,31 @@ private Pair<HasMetadata, DiscoveryNode> queryForNode(

HasMetadata kubeObj =
nodeType.getQueryFunction().apply(client()).apply(namespace).apply(name);
DiscoveryNode node = new DiscoveryNode();
node.name = name;
node.nodeType = nodeType.getKind();
node.labels = kubeObj != null ? kubeObj.getMetadata().getLabels() : new HashMap<>();
node.children = new ArrayList<>();
node.target = null;
return Pair.of(kubeObj, node);

Optional<DiscoveryNode> node =
DiscoveryNode.getNode(
n -> {
return name.equals(n.name)
&& namespace.equals(n.labels.get("cryostat.io/namespace"));
});

return Pair.of(
kubeObj,
node.orElseGet(
() -> {
DiscoveryNode newNode = new DiscoveryNode();
newNode.name = name;
newNode.nodeType = nodeType.getKind();
newNode.children = new ArrayList<>();
newNode.target = null;
newNode.labels =
kubeObj != null
? kubeObj.getMetadata().getLabels()
: new HashMap<>();
// Add namespace to label to retrieve node later
newNode.labels.put("cryostat.io/namespace", namespace);
return newNode;
}));
}

@ApplicationScoped
Expand All @@ -311,7 +387,7 @@ static final class KubeConfig {
private KubernetesClient kubeClient;

List<String> getWatchNamespaces() {
return watchNamespaces.or(List.of());
return watchNamespaces.orElse(List.of());
}

String getOwnNamespace() {
Expand Down Expand Up @@ -357,9 +433,6 @@ public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
return;
}

TargetTuple.compare(previousTuples).to(currentTuples).updated().stream()
.forEach(tuple -> handleEndpointEvent(tuple, EventKind.MODIFIED));

TargetTuple.compare(previousTuples).to(currentTuples).added().stream()
.forEach(tuple -> handleEndpointEvent(tuple, EventKind.FOUND));

Expand Down Expand Up @@ -413,7 +486,13 @@ public Target toTarget() {

String ip = addr.getIp().replaceAll("\\.", "-");
String namespace = obj.getMetadata().getNamespace();
String host = String.format("%s.%s.pod", ip, namespace);

boolean isPod = obj.getKind() == KubeDiscoveryNodeType.POD.getKind();

String host = String.format("%s.%s", ip, namespace);
if (isPod) {
host = String.format("%s.pod", host);
}

JMXServiceURL jmxUrl =
new JMXServiceURL(
Expand Down Expand Up @@ -441,7 +520,7 @@ public Target toTarget() {
Integer.toString(port.getPort()),
"NAMESPACE",
objRef.getNamespace(),
"POD_NAME",
isPod ? "POD_NAME" : "OBJECT_NAME",
objRef.getName()));

return target;
Expand Down Expand Up @@ -490,58 +569,16 @@ public Compare to(Collection<TargetTuple> current) {
}

public Collection<TargetTuple> added() {
return removeAllUpdatedTuples(addedOrUpdatedTuples(), updated());
}

public Collection<TargetTuple> removed() {
return removeAllUpdatedTuples(removedOrUpdatedTuples(), updated());
}

public Collection<TargetTuple> updated() {
Collection<TargetTuple> updated = addedOrUpdatedTuples();
intersection(removedOrUpdatedTuples(), addedOrUpdatedTuples(), false)
.forEach((ref) -> updated.add(ref));
return updated;
}

private Collection<TargetTuple> addedOrUpdatedTuples() {
Collection<TargetTuple> added = new HashSet<>(current);
added.removeAll(previous);
return added;
}

private Collection<TargetTuple> removedOrUpdatedTuples() {
public Collection<TargetTuple> removed() {
Collection<TargetTuple> removed = new HashSet<>(previous);
removed.removeAll(current);
return removed;
}

private Collection<TargetTuple> removeAllUpdatedTuples(
Collection<TargetTuple> src, Collection<TargetTuple> updated) {
Collection<TargetTuple> tnSet = new HashSet<>(src);
intersection(src, updated, true).stream().forEach((ref) -> tnSet.remove(ref));
return tnSet;
}

private Collection<TargetTuple> intersection(
Collection<TargetTuple> src, Collection<TargetTuple> other, boolean keepOld) {
final Collection<TargetTuple> intersection = new HashSet<>();

// A tuple is considered as modified if its target reference is the same
for (TargetTuple srcTuple : src) {
for (TargetTuple otherTuple : other) {
if (Objects.equals(
srcTuple.objRef.getNamespace(),
otherTuple.objRef.getNamespace())
&& Objects.equals(
srcTuple.objRef.getName(), otherTuple.objRef.getName())) {

intersection.add(keepOld ? srcTuple : otherTuple);
}
}
}
return intersection;
}
}
}
}