Skip to content

Commit

Permalink
Merge pull request SigNoz#5652 from shivanshuraj1333/feat/issues/1588
Browse files Browse the repository at this point in the history
Add network latency (traces-metrics) correlation for kafka
  • Loading branch information
shivanshuraj1333 authored Aug 27, 2024
2 parents 4b861b2 + 9644297 commit 7191168
Show file tree
Hide file tree
Showing 6 changed files with 404 additions and 87 deletions.
103 changes: 103 additions & 0 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2496,10 +2496,113 @@ func (aH *APIHandler) RegisterMessagingQueuesRoutes(router *mux.Router, am *Auth

kafkaSubRouter.HandleFunc("/producer-details", am.ViewAccess(aH.getProducerData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/consumer-details", am.ViewAccess(aH.getConsumerData)).Methods(http.MethodPost)
kafkaSubRouter.HandleFunc("/network-latency", am.ViewAccess(aH.getNetworkData)).Methods(http.MethodPost)

// for other messaging queues, add SubRouters here
}

// not using md5 hashing as the plain string would work
func uniqueIdentifier(clientID, serviceInstanceID, serviceName, separator string) string {
return clientID + separator + serviceInstanceID + separator + serviceName
}

func (aH *APIHandler) getNetworkData(
w http.ResponseWriter, r *http.Request,
) {
attributeCache := &mq.Clients{
Hash: make(map[string]struct{}),
}
messagingQueue, apiErr := ParseMessagingQueueBody(r)

if apiErr != nil {
zap.L().Error(apiErr.Err.Error())
RespondError(w, apiErr, nil)
return
}

queryRangeParams, err := mq.BuildQRParamsNetwork(messagingQueue, "throughput", attributeCache)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}

var result []*v3.Result
var errQueriesByName map[string]error

result, errQueriesByName, err = aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQueriesByName)
return
}

for _, res := range result {
for _, series := range res.Series {
clientID, clientIDOk := series.Labels["client_id"]
serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"]
serviceName, serviceNameOk := series.Labels["service_name"]
hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#")
_, ok := attributeCache.Hash[hashKey]
if clientIDOk && serviceInstanceIDOk && serviceNameOk && !ok {
attributeCache.Hash[hashKey] = struct{}{}
attributeCache.ClientID = append(attributeCache.ClientID, clientID)
attributeCache.ServiceInstanceID = append(attributeCache.ServiceInstanceID, serviceInstanceID)
attributeCache.ServiceName = append(attributeCache.ServiceName, serviceName)
}
}
}

queryRangeParams, err = mq.BuildQRParamsNetwork(messagingQueue, "fetch-latency", attributeCache)
if err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}
if err := validateQueryRangeParamsV3(queryRangeParams); err != nil {
zap.L().Error(err.Error())
RespondError(w, apiErr, nil)
return
}

resultFetchLatency, errQueriesByNameFetchLatency, err := aH.querierV2.QueryRange(r.Context(), queryRangeParams, nil)
if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
RespondError(w, apiErrObj, errQueriesByNameFetchLatency)
return
}

latencyColumn := &v3.Result{QueryName: "latency"}
var latencySeries []*v3.Series
for _, res := range resultFetchLatency {
for _, series := range res.Series {
clientID, clientIDOk := series.Labels["client_id"]
serviceInstanceID, serviceInstanceIDOk := series.Labels["service_instance_id"]
serviceName, serviceNameOk := series.Labels["service_name"]
hashKey := uniqueIdentifier(clientID, serviceInstanceID, serviceName, "#")
_, ok := attributeCache.Hash[hashKey]
if clientIDOk && serviceInstanceIDOk && serviceNameOk && ok {
latencySeries = append(latencySeries, series)
}
}
}

latencyColumn.Series = latencySeries
result = append(result, latencyColumn)

resultFetchLatency = postprocess.TransformToTableForBuilderQueries(result, queryRangeParams)

resp := v3.QueryRangeResponse{
Result: resultFetchLatency,
}
aH.Respond(w, resp)
}

func (aH *APIHandler) getProducerData(
w http.ResponseWriter, r *http.Request,
) {
Expand Down
Loading

0 comments on commit 7191168

Please sign in to comment.