Skip to content

Commit

Permalink
Adds Kuma destination namespace detection (#12)
Browse files Browse the repository at this point in the history
Adds 'service_mesh' configuration

Possible choices are 'istio' (default) and 'kuma'.

Different strategy to find the destination namespace depending on the
service mesh.
  • Loading branch information
jubarbot-cisco authored Jan 30, 2023
1 parent 5f78e31 commit a7b4eb8
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 11 deletions.
Binary file modified bin/release/http-trace-filter.wasm
Binary file not shown.
54 changes: 43 additions & 11 deletions src/trace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const (
tickMilliseconds uint32 = 60000 // 1 Minute
statusCodePseudoHeaderName = ":status"
contentTypeHeaderName = "content-type"
defaultServiceMesh = "istio"
)

func main() {
Expand All @@ -92,22 +93,25 @@ func (*vmContext) NewPluginContext(_ uint32) types.PluginContext {
type pluginContext struct {
types.DefaultPluginContext
pluginConfig
hostsToTrace map[string]struct{}
hostsToTrace map[string]struct{}
getDestinationNamespaceFn func(ctx *TraceFilterContext) (string, error)
}

type pluginConfig struct {
serverAddress string // The server to which the traces will be sent
traceSamplingEnabled bool
serviceMesh string
}

func (ctx *pluginContext) NewHttpContext(contextID uint32) types.HttpContext {
proxywasm.LogDebugf("Called new http context. contextID: %v", contextID)

return &TraceFilterContext{
contextID: contextID,
serverAddress: ctx.serverAddress,
hostsToTrace: ctx.hostsToTrace,
traceSamplingEnabled: ctx.traceSamplingEnabled,
contextID: contextID,
serverAddress: ctx.serverAddress,
hostsToTrace: ctx.hostsToTrace,
traceSamplingEnabled: ctx.traceSamplingEnabled,
getDestinationNamespaceFn: ctx.getDestinationNamespaceFn,
Telemetry: Telemetry{
Request: &Request{
Common: &Common{
Expand Down Expand Up @@ -136,9 +140,10 @@ type TraceFilterContext struct {

Telemetry

traceSamplingEnabled bool
hostsToTrace map[string]struct{}
isHostFixed bool
traceSamplingEnabled bool
hostsToTrace map[string]struct{}
isHostFixed bool
getDestinationNamespaceFn func(ctx *TraceFilterContext) (string, error)
}

func (ctx *pluginContext) OnPluginStart(_ int) types.OnPluginStartStatus {
Expand All @@ -151,6 +156,14 @@ func (ctx *pluginContext) OnPluginStart(_ int) types.OnPluginStartStatus {
return types.OnPluginStartStatusFailed
}
}
switch ctx.pluginConfig.serviceMesh {
case "istio":
ctx.getDestinationNamespaceFn = getIstioDestinationNamespace
case "kuma":
ctx.getDestinationNamespaceFn = getKumaDestinationNamespace
default:
ctx.getDestinationNamespaceFn = getIstioDestinationNamespace
}

return types.OnPluginStartStatusOK
}
Expand All @@ -159,6 +172,7 @@ func readPluginConfig() pluginConfig {
ret := pluginConfig{
serverAddress: "trace_analyzer",
traceSamplingEnabled: false,
serviceMesh: defaultServiceMesh,
}

data, err := proxywasm.GetPluginConfiguration()
Expand All @@ -178,6 +192,17 @@ func readPluginConfig() pluginConfig {
ret.traceSamplingEnabled = string(parsedData.GetStringBytes("trace_sampling_enabled")) == "true"
proxywasm.LogDebugf("Trace sampling enabled = %v", ret.traceSamplingEnabled)

serviceMesh := string(parsedData.GetStringBytes("service_mesh"))
switch serviceMesh {
case "istio", "kuma":
ret.serviceMesh = serviceMesh
default:
proxywasm.LogWarnf("Service Mesh '%s' is not supported, defaulting to '%s' for backward compatibility", serviceMesh, defaultServiceMesh)
serviceMesh = defaultServiceMesh
}

proxywasm.LogDebugf("Running on service Mesh = %v", ret.serviceMesh)

return ret
}

Expand Down Expand Up @@ -398,7 +423,7 @@ func (ctx *TraceFilterContext) OnHttpResponseHeaders(numHeaders int, endOfStream
}
var err error
// here we should have the upstream data (namespace)
ctx.Telemetry.DestinationNamespace, err = ctx.getDestinationNamespace()
ctx.Telemetry.DestinationNamespace, err = ctx.getDestinationNamespaceFn(ctx)
if err != nil {
proxywasm.LogInfof("Failed to get destination namespace: %v", err)
}
Expand Down Expand Up @@ -603,7 +628,7 @@ func (ctx *TraceFilterContext) shouldShortCircuitOnBody(bodySize int, truncatedB
return false
}

func (ctx *TraceFilterContext) getDestinationNamespace() (string, error) {
func getIstioDestinationNamespace(ctx *TraceFilterContext) (string, error) {
// catalogue;sock-shop;catalogue;latest;Kubernetes or catalogue;sock-shop;catalogue;latest
dstWorkload, err := proxywasm.GetProperty([]string{"upstream_host_metadata", "filter_metadata", "istio", "workload"})
if err != nil {
Expand All @@ -616,6 +641,14 @@ func (ctx *TraceFilterContext) getDestinationNamespace() (string, error) {
return "", fmt.Errorf("destination namespace was not found")
}

func getKumaDestinationNamespace(ctx *TraceFilterContext) (string, error) {
dstNamespace, err := proxywasm.GetProperty([]string{"upstream_host_metadata", "filter_metadata", "envoy.lb", "k8s.kuma.io/namespace"})
if err != nil {
return "", fmt.Errorf("failed to get upstream_host_metadata: %v", err)
}
return string(dstNamespace), nil
}

func (ctx *TraceFilterContext) shouldTrace() bool {
// check if we have enough information to decide according to hosts to trace map
if !ctx.isHostFixed {
Expand Down Expand Up @@ -645,7 +678,6 @@ func (ctx *TraceFilterContext) shouldTrace() bool {
return true
}

//
// fixHostname will return only hostname without scheme and port
// ex. https://example.org:8000 --> example.org. (for external services)
// for internal services:
Expand Down

0 comments on commit a7b4eb8

Please sign in to comment.