Skip to content

Commit

Permalink
Merge pull request #655 from seswarrajan/dev
Browse files Browse the repository at this point in the history
Use Kubearmor Alert struct instead of Log
  • Loading branch information
nyrahul authored Jan 20, 2023
2 parents 9b495bf + 2d0830c commit d500480
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 81 deletions.
2 changes: 1 addition & 1 deletion src/feedconsumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (cfc *KnoxFeedConsumer) processSystemLogMessage(message []byte) error {
if cfc.syslogEventsCount == cfc.eventsBuffer {
if len(cfc.syslogEvents) > 0 {
for _, syslog := range cfc.syslogEvents {
log := pb.Log{
log := pb.Alert{
ClusterName: syslog.ClusterName,
HostName: syslog.HostName,
NamespaceName: syslog.NamespaceName,
Expand Down
15 changes: 4 additions & 11 deletions src/observability/kubearmor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ func ProcessSystemLogs() {

SystemLogsMutex.Lock()
locSysLogs := SystemLogs
SystemLogs = []*pb.Log{} //reset
SystemLogs = []*pb.Alert{} //reset
SystemLogsMutex.Unlock()

ObsMutex.Lock()
defer ObsMutex.Unlock()

res := []types.KubeArmorLog{}

if config.GetCfgObservabilityWriteLogsToDB() {
Expand All @@ -88,7 +90,6 @@ func ProcessSystemLogs() {
jsonLog, _ := json.Marshal(kubearmorLog)
if err := json.Unmarshal(jsonLog, &locPbLog); err != nil {
log.Error().Msg(err.Error())
ObsMutex.Unlock()
return
}

Expand Down Expand Up @@ -149,22 +150,14 @@ func ProcessSystemLogs() {
}

//clearSummarizerMap()

ObsMutex.Unlock()
}

func ProcessKubearmorLog(kubearmorLog *pb.Log) {
func ProcessKubearmorLogs(kubearmorLog *pb.Alert) {
SystemLogsMutex.Lock()
SystemLogs = append(SystemLogs, kubearmorLog)
SystemLogsMutex.Unlock()
}

func ProcessKubearmorAlert(kubearmorAlert *pb.Log) {
SystemLogsMutex.Lock()
SystemLogs = append(SystemLogs, kubearmorAlert)
SystemLogsMutex.Unlock()
}

func aggregateProcFileData(data []types.SysObsProcFileData) []types.SysObsProcFileData {
if len(data) <= 0 {
return nil
Expand Down
2 changes: 1 addition & 1 deletion src/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (
CfgDB types.ConfigDB
log *zerolog.Logger
// Kubearmor relay logs
SystemLogs []*pb.Log
SystemLogs []*pb.Alert
// Hubble relay logs
NetworkLogs []*flow.Flow
// Mutex
Expand Down
17 changes: 9 additions & 8 deletions src/observability/summarizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
pb "github.com/kubearmor/KubeArmor/protobuf"
)

func extractNetworkInfoFromSystemLog(netLog pb.Log, pods []types.Pod, services []types.Service) (string, string, string, string, string, string, string, string, error) {
func extractNetworkInfoFromSystemLog(netLog pb.Alert, pods []types.Pod, services []types.Service) (string, string, string, string, string, string, string, string, error) {
var ip, destNs, destLabel, port, bindPort, bindAddress, protocol, nwrule string = "", "", "", "", "", "", "", ""
err := errors.New("not a valid incoming/outgoing connection")

Expand Down Expand Up @@ -73,7 +73,7 @@ func extractNetworkInfoFromSystemLog(netLog pb.Log, pods []types.Pod, services [
return ip, destNs, destLabel, port, bindPort, bindAddress, protocol, nwrule, nil
}

func convertSysLogToSysSummaryMap(syslogs []*pb.Log) {
func convertSysLogToSysSummaryMap(syslogs []*pb.Alert) {

deployments := cluster.GetDeploymentsFromK8sClient()

Expand All @@ -97,12 +97,8 @@ func convertSysLogToSysSummaryMap(syslogs []*pb.Log) {
continue
}

if syslog.Type == "MatchedPolicy" || syslog.Type == "MatchedHostPolicy" {
if syslog.Result == "Passed" {
sysSummary.Action = "Audit"
} else {
sysSummary.Action = "Deny"
}
if syslog.Action != "" {
sysSummary.Action = syslog.Action
} else {
sysSummary.Action = "Allow"
}
Expand All @@ -123,6 +119,11 @@ func convertSysLogToSysSummaryMap(syslogs []*pb.Log) {
sysSummary.Operation = syslog.Operation
sysSummary.Source = strings.Split(syslog.Source, " ")[0]
sysSummary.Labels = syslog.Labels
sysSummary.Enforcer = syslog.Enforcer
sysSummary.Tags = syslog.Tags
sysSummary.Message = syslog.Message
sysSummary.Severity = syslog.Severity
sysSummary.PolicyName = syslog.PolicyName
sysSummary.Deployment = ""

for _, d := range deployments {
Expand Down
98 changes: 53 additions & 45 deletions src/plugin/kubearmor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
)

// Global Variable
var KubeArmorRelayLogs []*pb.Log
var KubeArmorNetworkLogs []*pb.Log
var KubeArmorRelayLogs []*pb.Alert
var KubeArmorNetworkLogs []*pb.Alert
var KubeArmorRelayLogsMutex *sync.Mutex

var KubeArmorFCLogs []*types.KnoxSystemLog
Expand Down Expand Up @@ -229,7 +229,7 @@ func ConvertKubeArmorSystemLogsToKnoxSystemLogs(dbDriver string, docs []map[stri
return []types.KnoxSystemLog{}
}

func ConvertKubeArmorLogToKnoxSystemLog(relayLog *pb.Log) (types.KnoxSystemLog, error) {
func ConvertKubeArmorLogToKnoxSystemLog(relayLog *pb.Alert) (types.KnoxSystemLog, error) {

sources := strings.Split(relayLog.Source, " ")
source := ""
Expand Down Expand Up @@ -320,8 +320,8 @@ func ConnectKubeArmorRelay(cfg types.ConfigKubeArmorRelay) *grpc.ClientConn {
return conn
}

func GetSystemAlertsFromKubeArmorRelay(trigger int) []*pb.Log {
results := []*pb.Log{}
func GetSystemAlertsFromKubeArmorRelay(trigger int) []*pb.Alert {
results := []*pb.Alert{}
KubeArmorRelayLogsMutex.Lock()
if len(KubeArmorRelayLogs) == 0 {
log.Info().Msgf("KubeArmor Relay traffic flow not exist")
Expand All @@ -335,8 +335,8 @@ func GetSystemAlertsFromKubeArmorRelay(trigger int) []*pb.Log {
return results
}

results = KubeArmorRelayLogs // copy
KubeArmorRelayLogs = []*pb.Log{} // reset
results = KubeArmorRelayLogs // copy
KubeArmorRelayLogs = []*pb.Alert{} // reset
KubeArmorRelayLogsMutex.Unlock()

log.Info().Msgf("The total number of KubeArmor relay traffic flow: [%d] from %s ~ to %s", len(results),
Expand All @@ -346,25 +346,25 @@ func GetSystemAlertsFromKubeArmorRelay(trigger int) []*pb.Log {
return results
}

func ignoreLogFromRelayWithSource(filter []string, log *pb.Log) bool {
func ignoreLogFromRelayWithSource(filter []string, source string) bool {
for _, srcFilter := range filter {
if strings.Contains(log.Source, srcFilter) {
if strings.Contains(source, srcFilter) {
return true
}
}
return false
}

func ignoreLogFromRelayWithNamespace(nsFilter, nsNotFilter []string, log *pb.Log) bool {
func ignoreLogFromRelayWithNamespace(nsFilter, nsNotFilter []string, namespace string) bool {
if len(nsFilter) > 0 {
for _, ns := range nsFilter {
if !strings.Contains(log.NamespaceName, ns) {
if !strings.Contains(namespace, ns) {
return true
}
}
} else if len(nsNotFilter) > 0 {
for _, notns := range nsNotFilter {
if strings.Contains(log.NamespaceName, notns) {
if strings.Contains(namespace, notns) {
return true
}
}
Expand Down Expand Up @@ -414,11 +414,11 @@ func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay)
return
}

if ignoreLogFromRelayWithNamespace(nsFilter, nsNotFilter, res) {
if ignoreLogFromRelayWithNamespace(nsFilter, nsNotFilter, res.NamespaceName) {
continue
}

if ignoreLogFromRelayWithSource(fromSourceFilter, res) {
if ignoreLogFromRelayWithSource(fromSourceFilter, res.Source) {
continue
}

Expand All @@ -427,17 +427,43 @@ func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay)
continue
}

kubearmorLog := pb.Alert{
Timestamp: res.Timestamp,
UpdatedTime: res.UpdatedTime,
ClusterName: res.ClusterName,
HostName: res.HostName,
NamespaceName: res.NamespaceName,
PodName: res.PodName,
Labels: res.Labels,
ContainerID: res.ContainerID,
ContainerName: res.ContainerName,
ContainerImage: res.ContainerImage,
ParentProcessName: res.ParentProcessName,
ProcessName: res.ProcessName,
HostPPID: res.HostPPID,
HostPID: res.HostPID,
PPID: res.PPID,
PID: res.PID,
UID: res.UID,
Type: res.Type,
Source: res.Source,
Operation: res.Operation,
Resource: res.Resource,
Data: res.Data,
Result: res.Result,
}

KubeArmorRelayLogsMutex.Lock()
KubeArmorRelayLogs = append(KubeArmorRelayLogs, res)
KubeArmorRelayLogs = append(KubeArmorRelayLogs, &kubearmorLog)
KubeArmorRelayLogsMutex.Unlock()

if config.GetCfgObservabilityEnable() {
obs.ProcessKubearmorLog(res)
obs.ProcessKubearmorLogs(&kubearmorLog)
}

if config.CurrentCfg.ConfigNetPolicy.NetworkLogFrom == "kubearmor" {
if res.Operation == "Network" {
KubeArmorNetworkLogs = append(KubeArmorNetworkLogs, res)
KubeArmorNetworkLogs = append(KubeArmorNetworkLogs, &kubearmorLog)
}
}
}
Expand Down Expand Up @@ -468,29 +494,11 @@ func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay)
return
}

kubearmorLog := pb.Log{
ClusterName: res.ClusterName,
ContainerName: res.ContainerName,
ContainerID: res.ContainerID,
HostName: res.HostName,
NamespaceName: res.NamespaceName,
PodName: res.PodName,
Source: res.Source,
Operation: res.Operation,
Resource: res.Resource,
Data: res.Data,
Result: res.Result,
Type: res.Type,
ProcessName: res.ProcessName,
ParentProcessName: res.ParentProcessName,
Timestamp: res.Timestamp,
}

if ignoreLogFromRelayWithNamespace(nsFilter, nsNotFilter, &kubearmorLog) {
if ignoreLogFromRelayWithNamespace(nsFilter, nsNotFilter, res.NamespaceName) {
continue
}

if ignoreLogFromRelayWithSource(fromSourceFilter, &kubearmorLog) {
if ignoreLogFromRelayWithSource(fromSourceFilter, res.Source) {
continue
}

Expand All @@ -500,17 +508,17 @@ func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay)
}

KubeArmorRelayLogsMutex.Lock()
KubeArmorRelayLogs = append(KubeArmorRelayLogs, &kubearmorLog)
KubeArmorRelayLogs = append(KubeArmorRelayLogs, res)
KubeArmorRelayLogsMutex.Unlock()

if config.GetCfgObservabilityEnable() {
obs.ProcessKubearmorAlert(&kubearmorLog)
obs.ProcessKubearmorLogs(res)
}

if config.CurrentCfg.ConfigNetPolicy.NetworkLogFrom == "kubearmor" {

if kubearmorLog.Operation == "Network" {
KubeArmorNetworkLogs = append(KubeArmorNetworkLogs, &kubearmorLog)
if res.Operation == "Network" {
KubeArmorNetworkLogs = append(KubeArmorNetworkLogs, res)
}
}
}
Expand Down Expand Up @@ -540,20 +548,20 @@ func GetSystemLogsFromFeedConsumer(trigger int) []*types.KnoxSystemLog {
return results
}

func GetNetworkLogsFromKubeArmor() []*pb.Log {
func GetNetworkLogsFromKubeArmor() []*pb.Alert {
if len(KubeArmorNetworkLogs) <= 0 {
return nil
}

results := KubeArmorNetworkLogs // copy
KubeArmorNetworkLogs = []*pb.Log{} // reset
results := KubeArmorNetworkLogs // copy
KubeArmorNetworkLogs = []*pb.Alert{} // reset

log.Info().Msgf("The total number of KubeArmor network log : [%d]", len(results))

return results
}

func ConvertKubeArmorNetLogToKnoxNetLog(kaNwLogs []*pb.Log) []types.KnoxNetworkLog {
func ConvertKubeArmorNetLogToKnoxNetLog(kaNwLogs []*pb.Alert) []types.KnoxNetworkLog {
if len(kaNwLogs) <= 0 {
return nil
}
Expand Down
Loading

0 comments on commit d500480

Please sign in to comment.