Skip to content

Enhance the performance of the frontend JSON codec #6816

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809 #6821
* [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
* [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817
* [ENHANCEMENT] Query Frontend: Enhance the performance of the JSON codec. #6816
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
8 changes: 4 additions & 4 deletions pkg/chunk/json_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
)

func init() {
jsoniter.RegisterTypeDecoderFunc("labels.Labels", decodeLabels)
jsoniter.RegisterTypeEncoderFunc("labels.Labels", encodeLabels, labelsIsEmpty)
jsoniter.RegisterTypeDecoderFunc("labels.Labels", DecodeLabels)
jsoniter.RegisterTypeEncoderFunc("labels.Labels", EncodeLabels, labelsIsEmpty)
jsoniter.RegisterTypeDecoderFunc("model.Time", decodeModelTime)
jsoniter.RegisterTypeEncoderFunc("model.Time", encodeModelTime, modelTimeIsEmpty)
}

// Override Prometheus' labels.Labels decoder which goes via a map
func decodeLabels(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
func DecodeLabels(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
labelsPtr := (*labels.Labels)(ptr)
*labelsPtr = make(labels.Labels, 0, 10)
iter.ReadMapCB(func(iter *jsoniter.Iterator, key string) bool {
Expand All @@ -31,7 +31,7 @@ func decodeLabels(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
}

// Override Prometheus' labels.Labels encoder which goes via a map
func encodeLabels(ptr unsafe.Pointer, stream *jsoniter.Stream) {
func EncodeLabels(ptr unsafe.Pointer, stream *jsoniter.Stream) {
labelsPtr := (*labels.Labels)(ptr)
stream.WriteObjectStart()
for i, v := range *labelsPtr {
Expand Down
29 changes: 7 additions & 22 deletions pkg/querier/tripperware/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/util/jsonutil"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/runutil"
Expand Down Expand Up @@ -113,12 +114,8 @@ func decodeSampleStream(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
switch field {
case "metric":
metricString := iter.ReadAny().ToString()
lbls := labels.Labels{}
if err := json.UnmarshalFromString(metricString, &lbls); err != nil {
iter.ReportError("unmarshal SampleStream", err.Error())
return
}
chunk.DecodeLabels(unsafe.Pointer(&lbls), iter)
ss.Labels = cortexpb.FromLabelsToLabelAdapters(lbls)
case "values":
for iter.ReadArray() {
Expand Down Expand Up @@ -302,12 +299,8 @@ func encodeSampleStream(ptr unsafe.Pointer, stream *jsoniter.Stream) {
stream.WriteObjectStart()

stream.WriteObjectField(`metric`)
lbls, err := cortexpb.FromLabelAdaptersToLabels(ss.Labels).MarshalJSON()
if err != nil {
stream.Error = err
return
}
stream.SetBuffer(append(stream.Buffer(), lbls...))
metric := cortexpb.FromLabelAdaptersToLabels(ss.Labels)
chunk.EncodeLabels(unsafe.Pointer(&metric), stream)

if len(ss.Samples) > 0 {
stream.WriteMore()
Expand Down Expand Up @@ -343,12 +336,8 @@ func decodeSample(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
switch field {
case "metric":
metricString := iter.ReadAny().ToString()
lbls := labels.Labels{}
if err := json.UnmarshalFromString(metricString, &lbls); err != nil {
iter.ReportError("unmarshal Sample", err.Error())
return
}
chunk.DecodeLabels(unsafe.Pointer(&lbls), iter)
ss.Labels = cortexpb.FromLabelsToLabelAdapters(lbls)
case "value":
ss.Sample = &cortexpb.Sample{}
Expand All @@ -368,12 +357,8 @@ func encodeSample(ptr unsafe.Pointer, stream *jsoniter.Stream) {
stream.WriteObjectStart()

stream.WriteObjectField(`metric`)
lbls, err := cortexpb.FromLabelAdaptersToLabels(ss.Labels).MarshalJSON()
if err != nil {
stream.Error = err
return
}
stream.SetBuffer(append(stream.Buffer(), lbls...))
metric := cortexpb.FromLabelAdaptersToLabels(ss.Labels)
chunk.EncodeLabels(unsafe.Pointer(&metric), stream)

if ss.Sample != nil {
stream.WriteMore()
Expand Down
Loading