Skip to content

Commit

Permalink
enhance: [2.4] Add cgo call metrics for load/write API (#37405) (#37627)
Browse files Browse the repository at this point in the history
Cherry-pick from master
pr: #37405

Cgo API cost is not observable since no metrics are related to them.
This PR adds metrics for some sync cgo calls related to load & write.

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia authored Nov 13, 2024
1 parent 6dc879b commit d073f32
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 0 deletions.
11 changes: 11 additions & 0 deletions internal/querynodev2/segments/load_index_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ import "C"

import (
"context"
"fmt"
"runtime"
"time"
"unsafe"

"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/cgopb"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand Down Expand Up @@ -197,6 +200,14 @@ func (li *LoadIndexInfo) loadIndex(ctx context.Context) error {
var status C.CStatus

_, _ = GetLoadPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"AppendIndexV2",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
status = C.AppendIndexV3(li.cLoadIndexInfo)
} else {
Expand Down
41 changes: 41 additions & 0 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"io"
"runtime"
"strings"
"time"
"unsafe"

"github.com/apache/arrow/go/v12/arrow/array"
Expand Down Expand Up @@ -795,6 +796,14 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
var status C.CStatus

GetDynamicPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"Insert",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
status = C.Insert(s.ptr,
cOffset,
cNumOfRows,
Expand Down Expand Up @@ -870,6 +879,14 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary
}
var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"Delete",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
status = C.Delete(s.ptr,
cOffset,
cSize,
Expand Down Expand Up @@ -932,6 +949,14 @@ func (s *LocalSegment) LoadMultiFieldData(ctx context.Context) error {

var status C.CStatus
GetLoadPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"LoadFieldData",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID())
if err != nil {
Expand Down Expand Up @@ -1010,6 +1035,14 @@ func (s *LocalSegment) LoadFieldData(ctx context.Context, fieldID int64, rowCoun

var status C.CStatus
GetLoadPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"LoadFieldData",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
log.Info("submitted loadFieldData task to load pool")
if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID())
Expand Down Expand Up @@ -1236,6 +1269,14 @@ func (s *LocalSegment) LoadDeltaData(ctx context.Context, deltaData *storage.Del
*/
var status C.CStatus
GetDynamicPool().Submit(func() (any, error) {
start := time.Now()
defer func() {
metrics.QueryNodeCGOCallLatency.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
"LoadDeletedRecord",
"Sync",
).Observe(float64(time.Since(start).Milliseconds()))
}()
status = C.LoadDeletedRecord(s.ptr, loadInfo)
return nil, nil
}).Await()
Expand Down
2 changes: 2 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ const (
lockOp = "lock_op"
loadTypeName = "load_type"
pathLabelName = "path"
cgoNameLabelName = `cgo_name`
cgoTypeLabelName = `cgo_type`

// entities label
LoadedLabel = "loaded"
Expand Down
14 changes: 14 additions & 0 deletions pkg/metrics/querynode_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,19 @@ var (
channelNameLabelName,
},
)

// QueryNodeCGOCallLatency is a histogram of the latency of cgo calls,
// registered under the query node subsystem as "cgo_latency".
// It is labeled by node ID, cgo call name (e.g. "Insert", "Delete",
// "LoadFieldData") and cgo call type (e.g. "Sync").
// The call sites visible in this change observe the elapsed time in
// milliseconds; bucket boundaries come from the shared `buckets` value.
QueryNodeCGOCallLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "cgo_latency",
Help: "latency of each cgo call",
Buckets: buckets,
}, []string{
nodeIDLabelName,
cgoNameLabelName,
cgoTypeLabelName,
})
)

// RegisterQueryNode registers QueryNode metrics
Expand Down Expand Up @@ -859,6 +872,7 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeSearchHitSegmentNum)
registry.MustRegister(QueryNodeDeleteBufferSize)
registry.MustRegister(QueryNodeDeleteBufferRowNum)
registry.MustRegister(QueryNodeCGOCallLatency)
// Add cgo metrics
RegisterCGOMetrics(registry)
}
Expand Down

0 comments on commit d073f32

Please sign in to comment.