diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index cb9bc3a867ee7..0919f92a6e701 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -1,5 +1,8 @@ name: Checks -on: [push] +on: + pull_request: + branches: + - main jobs: checks: runs-on: ubuntu-latest @@ -10,6 +13,11 @@ jobs: steps: - uses: actions/checkout@v4 - run: git config --global --add safe.directory "$GITHUB_WORKSPACE" + - name: golangci-lint + uses: golangci/golangci-lint-action@08e2f20817b15149a52b5b3ebe7de50aff2ba8c5 + with: + version: v1.55.1 + only-new-issues: true - run: make lint - run: make check-doc - run: make check-mod diff --git a/CHANGELOG.md b/CHANGELOG.md index 37599ae8d347c..5e5adea5d2a02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ * [11679](https://github.com/grafana/loki/pull/11679) **dannykopping** Cache: extending #11535 to align custom ingester query split with cache keys for correct caching of results. * [11143](https://github.com/grafana/loki/pull/11143) **sandeepsukhani** otel: Add support for per tenant configuration for mapping otlp data to loki format * [11499](https://github.com/grafana/loki/pull/11284) **jmichalek132** Config: Adds `frontend.log-query-request-headers` to enable logging of request headers in query logs. +* [11817](https://github.com/grafana/loki/pull/11817) **ashwanthgoli** Ruler: Add support for filtering results of `/prometheus/api/v1/rules` endpoint by rule_name, rule_group, file and type. ##### Fixes * [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var. diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 25e4f70f987c3..b675f85157423 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -3143,14 +3143,22 @@ shard_streams: # OTLP log ingestion configurations otlp_config: + # Configuration for resource attributes to store them as index labels or + # Structured Metadata or drop them altogether resource_attributes: - [ignore_defaults: ] + # Configure whether to ignore the default list of resource attributes to be + # stored as index labels and only use the given resource attributes config + [ignore_defaults: | default = false] - [attributes: ] + [attributes_config: ] - [scope_attributes: ] + # Configuration for scope attributes to store them as Structured Metadata or + # drop them altogether + [scope_attributes: ] - [log_attributes: ] + # Configuration for log attributes to store them as Structured Metadata or + # drop them altogether + [log_attributes: ] ``` ### frontend_worker @@ -5292,6 +5300,24 @@ Named store from this example can be used by setting object_store to store-1 in [cos: ] ``` +### attributes_config + +Define actions for matching OpenTelemetry (OTEL) attributes. + +```yaml +# Configures action to take on matching attributes. It allows one of +# [structured_metadata, drop] for all attribute types. It additionally allows +# index_label action for resource attributes +[action: | default = ""] + +# List of attributes to configure how to store them or drop them altogether +[attributes: ] + +# Regex to choose attributes to configure how to store them or drop them +# altogether +[regex: ] +``` + ## Runtime Configuration file Loki has a concept of "runtime config" file, which is simply a file that is reloaded while Loki is running. It is used by some Loki components to allow operator to change some aspects of Loki configuration without restarting it. 
File is specified by using `-runtime-config.file=` flag and reload period (which defaults to 10 seconds) can be changed by `-runtime-config.reload-period=` flag. Previously this mechanism was only used by limits overrides, and flags were called `-limits.per-user-override-config=` and `-limits.per-user-override-period=10s` respectively. These are still used, if `-runtime-config.file=` is not specified. diff --git a/docs/sources/reference/api.md b/docs/sources/reference/api.md index 2e48e178534d2..cf384859c6a71 100644 --- a/docs/sources/reference/api.md +++ b/docs/sources/reference/api.md @@ -1178,11 +1178,15 @@ Deletes all the rule groups in a namespace (including the namespace itself). Thi ### List rules ``` -GET /prometheus/api/v1/rules +GET /prometheus/api/v1/rules?type={alert|record}&file={}&rule_group={}&rule_name={} ``` Prometheus-compatible rules endpoint to list alerting and recording rules that are currently loaded. +The `type` parameter is optional. If set, only the specified type of rule is returned. + +The `file`, `rule_group` and `rule_name` parameters are optional, and can accept multiple values. If set, the response content is filtered accordingly. + For more information, refer to the [Prometheus rules](https://prometheus.io/docs/prometheus/latest/querying/api/#rules) documentation. ### List alerts diff --git a/docs/sources/send-data/otel/_index.md b/docs/sources/send-data/otel/_index.md index cd4b91f93399b..12f9cdd0e4af5 100644 --- a/docs/sources/send-data/otel/_index.md +++ b/docs/sources/send-data/otel/_index.md @@ -65,7 +65,7 @@ service: ## Format considerations -Since the OpenTelemetry protocol differs from the Loki storage model, here is how data in the OpenTelemetry format will be mapped to the Loki data model during ingestion: +Since the OpenTelemetry protocol differs from the Loki storage model, here is how data in the OpenTelemetry format will be mapped by default to the Loki data model during ingestion, which can be changed as explained later: - Index labels: Resource attributes map well to index labels in Loki, since both usually identify the source of the logs. Because Loki has a limit of 30 index labels, we have selected the following resource attributes to be stored as index labels, while the remaining attributes are stored as [Structured Metadata]({{< relref "../../get-started/labels/structured-metadata" >}}) with each log entry: - cloud.availability_zone @@ -112,3 +112,102 @@ Things to note before ingesting OpenTelemetry logs to Loki: - Stringification of non-string Attribute values While converting Attribute values in OTLP to Index label values or Structured Metadata, any non-string values are converted to string using [AsString method from the OTEL collector lib](https://github.com/open-telemetry/opentelemetry-collector/blob/ab3d6c5b64701e690aaa340b0a63f443ff22c1f0/pdata/pcommon/value.go#L353). + +### Changing the default mapping of OTLP to Loki Format + +Loki supports [per tenant]({{< relref "../../configure#limits_config" >}}) OTLP config which lets you change the default mapping of OTLP to Loki format for each tenant. +It currently only supports changing the storage of Attributes. 
+Here is what the config looks like:
+
+```yaml
+# OTLP log ingestion configurations
+otlp_config:
+  # Configuration for Resource Attributes to store them as index labels or
+  # Structured Metadata or drop them altogether
+  resource_attributes:
+    # Configure whether to ignore the default list of Resource Attributes to be
+    # stored as Index Labels and only use the given Resource Attributes config
+    [ignore_defaults: <boolean>]
+
+    [attributes_config: <list of attributes_configs>]
+
+  # Configuration for Scope Attributes to store them as Structured Metadata or
+  # drop them altogether
+  [scope_attributes: <list of attributes_configs>]
+
+  # Configuration for Log Attributes to store them as Structured Metadata or
+  # drop them altogether
+  [log_attributes: <list of attributes_configs>]
+
+attributes_config:
+  # Configures action to take on matching Attributes. It allows one of
+  # [structured_metadata, drop] for all Attribute types. It additionally allows
+  # index_label action for Resource Attributes
+  [action: <string> | default = ""]
+
+  # List of attributes to configure how to store them or drop them altogether
+  [attributes: <list of strings>]
+
+  # Regex to choose attributes to configure how to store them or drop them
+  # altogether
+  [regex: <Regexp>]
+```
+
+Here are some example configs to change the default mapping of OTLP to Loki format:
+
+#### Example 1:
+
+```yaml
+otlp_config:
+  resource_attributes:
+    attributes_config:
+      - action: index_label
+        attributes:
+          - service.group
+```
+
+With the example config, here is how various kinds of Attributes would be stored:
+* Store all 17 Resource Attributes mentioned earlier and the `service.group` Resource Attribute as index labels.
+* Store remaining Resource Attributes as Structured Metadata.
+* Store all the Scope and Log Attributes as Structured Metadata.
+
+#### Example 2:
+
+```yaml
+otlp_config:
+  resource_attributes:
+    ignore_defaults: true
+    attributes_config:
+      - action: index_label
+        regex: service.group
+```
+
+With the example config, here is how various kinds of Attributes would be stored:
+* **Only** store the `service.group` Resource Attribute as an index label.
+* Store remaining Resource Attributes as Structured Metadata.
+* Store all the Scope and Log Attributes as Structured Metadata.
+
+#### Example 3:
+
+```yaml
+otlp_config:
+  resource_attributes:
+    attributes_config:
+      - action: index_label
+        regex: service.group
+  scope_attributes:
+    - action: drop
+      attributes:
+        - method.name
+  log_attributes:
+    - action: structured_metadata
+      attributes:
+        - user.id
+    - action: drop
+      regex: .*
+```
+
+With the example config, here is how various kinds of Attributes would be stored:
+* Store all 17 Resource Attributes mentioned earlier and the `service.group` Resource Attribute as index labels.
+* Store remaining Resource Attributes as Structured Metadata.
+* Drop the Scope Attribute named `method.name` and store all other Scope Attributes as Structured Metadata.
+* Store the Log Attribute named `user.id` as Structured Metadata and drop all other Log Attributes.
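+
+Since `otlp_config` is a per-tenant limit, these settings can also be delivered through the runtime configuration (overrides) file described in the configuration reference. The following is a minimal sketch, assuming the standard `overrides:` layout of that file; `tenant1` is a placeholder tenant ID:
+
+```yaml
+# Sketch of a runtime overrides file (the file passed via -runtime-config.file).
+# The overrides/tenant layout is assumed here; tenant1 is a placeholder tenant ID.
+overrides:
+  tenant1:
+    otlp_config:
+      resource_attributes:
+        attributes_config:
+          - action: index_label
+            attributes:
+              - service.group
+```
+
+With such an override in place, only `tenant1` would get the additional `service.group` index label; all other tenants would keep the default OTLP mapping.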
\ No newline at end of file diff --git a/pkg/loghttp/push/otlp_config.go b/pkg/loghttp/push/otlp_config.go index 64120d4a6252e..44c0e932f9c12 100644 --- a/pkg/loghttp/push/otlp_config.go +++ b/pkg/loghttp/push/otlp_config.go @@ -56,9 +56,9 @@ var DefaultOTLPConfig = OTLPConfig{ } type OTLPConfig struct { - ResourceAttributes ResourceAttributesConfig `yaml:"resource_attributes,omitempty"` - ScopeAttributes []AttributesConfig `yaml:"scope_attributes,omitempty"` - LogAttributes []AttributesConfig `yaml:"log_attributes,omitempty"` + ResourceAttributes ResourceAttributesConfig `yaml:"resource_attributes,omitempty" doc:"description=Configuration for resource attributes to store them as index labels or Structured Metadata or drop them altogether"` + ScopeAttributes []AttributesConfig `yaml:"scope_attributes,omitempty" doc:"description=Configuration for scope attributes to store them as Structured Metadata or drop them altogether"` + LogAttributes []AttributesConfig `yaml:"log_attributes,omitempty" doc:"description=Configuration for log attributes to store them as Structured Metadata or drop them altogether"` } func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { @@ -115,9 +115,9 @@ func (c *OTLPConfig) Validate() error { } type AttributesConfig struct { - Action Action `yaml:"action,omitempty"` - Attributes []string `yaml:"attributes,omitempty"` - Regex relabel.Regexp `yaml:"regex,omitempty"` + Action Action `yaml:"action,omitempty" doc:"description=Configures action to take on matching attributes. It allows one of [structured_metadata, drop] for all attribute types. It additionally allows index_label action for resource attributes"` + Attributes []string `yaml:"attributes,omitempty" doc:"description=List of attributes to configure how to store them or drop them altogether"` + Regex relabel.Regexp `yaml:"regex,omitempty" doc:"description=Regex to choose attributes to configure how to store them or drop them altogether"` } func (c *AttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { @@ -146,8 +146,8 @@ func (c *AttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro } type ResourceAttributesConfig struct { - IgnoreDefaults bool `yaml:"ignore_defaults,omitempty"` - AttributesConfig []AttributesConfig `yaml:"attributes,omitempty"` + IgnoreDefaults bool `yaml:"ignore_defaults,omitempty" doc:"default=false|description=Configure whether to ignore the default list of resource attributes to be stored as index labels and only use the given resource attributes config"` + AttributesConfig []AttributesConfig `yaml:"attributes_config,omitempty"` } func (c *ResourceAttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { diff --git a/pkg/loghttp/push/otlp_config_test.go b/pkg/loghttp/push/otlp_config_test.go index a1cfc15ff52c8..5fa6251628507 100644 --- a/pkg/loghttp/push/otlp_config_test.go +++ b/pkg/loghttp/push/otlp_config_test.go @@ -19,7 +19,7 @@ func TestUnmarshalOTLPConfig(t *testing.T) { name: "only resource_attributes set", yamlConfig: []byte(` resource_attributes: - attributes: + attributes_config: - action: index_label regex: foo`), expectedCfg: OTLPConfig{ @@ -39,7 +39,7 @@ resource_attributes: yamlConfig: []byte(` resource_attributes: ignore_defaults: true - attributes: + attributes_config: - action: index_label regex: foo`), expectedCfg: OTLPConfig{ @@ -82,7 +82,7 @@ scope_attributes: name: "all 3 set", yamlConfig: []byte(` resource_attributes: - attributes: + attributes_config: - action: index_label regex: foo 
scope_attributes: diff --git a/pkg/ruler/base/api.go b/pkg/ruler/base/api.go index 53fb3e457460c..04a303993228b 100644 --- a/pkg/ruler/base/api.go +++ b/pkg/ruler/base/api.go @@ -2,6 +2,7 @@ package base import ( "encoding/json" + "fmt" "io" "net/http" "net/url" @@ -101,10 +102,10 @@ type recordingRule struct { EvaluationTime float64 `json:"evaluationTime"` } -func respondError(logger log.Logger, w http.ResponseWriter, msg string) { +func respondError(logger log.Logger, w http.ResponseWriter, status int, errorType v1.ErrorType, msg string) { b, err := json.Marshal(&response{ Status: "error", - ErrorType: v1.ErrServer, + ErrorType: errorType, Error: msg, Data: nil, }) @@ -115,12 +116,20 @@ func respondError(logger log.Logger, w http.ResponseWriter, msg string) { return } - w.WriteHeader(http.StatusInternalServerError) + w.WriteHeader(status) if n, err := w.Write(b); err != nil { level.Error(logger).Log("msg", "error writing response", "bytesWritten", n, "err", err) } } +func respondInvalidRequest(logger log.Logger, w http.ResponseWriter, msg string) { + respondError(logger, w, http.StatusBadRequest, v1.ErrBadData, msg) +} + +func respondServerError(logger log.Logger, w http.ResponseWriter, msg string) { + respondError(logger, w, http.StatusInternalServerError, v1.ErrServer, msg) +} + // API is used to handle HTTP requests for the ruler service type API struct { ruler *Ruler @@ -143,15 +152,34 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { userID, err := tenant.TenantID(req.Context()) if err != nil || userID == "" { level.Error(logger).Log("msg", "error extracting org id from context", "err", err) - respondError(logger, w, "no valid org id found") + respondServerError(logger, w, "no valid org id found") return } - w.Header().Set("Content-Type", "application/json") - rgs, err := a.ruler.GetRules(req.Context()) + var rulesReq = RulesRequest{ + Filter: AnyRule, + RuleName: req.URL.Query()["rule_name"], + RuleGroup: req.URL.Query()["rule_group"], + File: req.URL.Query()["file"], + } + + ruleTypeFilter := strings.ToLower(req.URL.Query().Get("type")) + if ruleTypeFilter != "" { + switch ruleTypeFilter { + case "alert": + rulesReq.Filter = AlertingRule + case "record": + rulesReq.Filter = RecordingRule + default: + respondInvalidRequest(logger, w, fmt.Sprintf("not supported value %q", ruleTypeFilter)) + return + } + } + + rgs, err := a.ruler.GetRules(req.Context(), &rulesReq) if err != nil { - respondError(logger, w, err.Error()) + respondServerError(logger, w, err.Error()) return } @@ -221,7 +249,7 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) { }) if err != nil { level.Error(logger).Log("msg", "error marshaling json response", "err", err) - respondError(logger, w, "unable to marshal the requested data") + respondServerError(logger, w, "unable to marshal the requested data") return } w.Header().Set("Content-Type", "application/json") @@ -236,15 +264,15 @@ func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) { userID, err := tenant.TenantID(req.Context()) if err != nil || userID == "" { level.Error(logger).Log("msg", "error extracting org id from context", "err", err) - respondError(logger, w, "no valid org id found") + respondServerError(logger, w, "no valid org id found") return } w.Header().Set("Content-Type", "application/json") - rgs, err := a.ruler.GetRules(req.Context()) + rgs, err := a.ruler.GetRules(req.Context(), &RulesRequest{Filter: AlertingRule}) if err != nil { - respondError(logger, w, err.Error()) + 
respondServerError(logger, w, err.Error()) return } @@ -272,7 +300,7 @@ func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) { }) if err != nil { level.Error(logger).Log("msg", "error marshaling json response", "err", err) - respondError(logger, w, "unable to marshal the requested data") + respondServerError(logger, w, "unable to marshal the requested data") return } w.Header().Set("Content-Type", "application/json") @@ -314,7 +342,7 @@ func respondAccepted(w http.ResponseWriter, logger log.Logger) { }) if err != nil { level.Error(logger).Log("msg", "error marshaling json response", "err", err) - respondError(logger, w, "unable to marshal the requested data") + respondServerError(logger, w, "unable to marshal the requested data") return } w.Header().Set("Content-Type", "application/json") @@ -466,7 +494,7 @@ func (a *API) ListRules(w http.ResponseWriter, req *http.Request) { pr, err := parseRequest(req, false, false) if err != nil { - respondError(logger, w, err.Error()) + respondServerError(logger, w, err.Error()) return } @@ -504,7 +532,7 @@ func (a *API) GetRuleGroup(w http.ResponseWriter, req *http.Request) { logger := util_log.WithContext(req.Context(), a.logger) pr, err := parseRequest(req, true, true) if err != nil { - respondError(logger, w, err.Error()) + respondServerError(logger, w, err.Error()) return } @@ -526,7 +554,7 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) { logger := util_log.WithContext(req.Context(), a.logger) pr, err := parseRequest(req, true, false) if err != nil { - respondError(logger, w, err.Error()) + respondServerError(logger, w, err.Error()) return } @@ -600,7 +628,7 @@ func (a *API) DeleteNamespace(w http.ResponseWriter, req *http.Request) { pr, err := parseRequest(req, true, false) if err != nil { - respondError(logger, w, err.Error()) + respondServerError(logger, w, err.Error()) return } @@ -610,7 +638,7 @@ func (a *API) DeleteNamespace(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusNotFound) return } - respondError(logger, w, err.Error()) + respondServerError(logger, w, err.Error()) return } @@ -622,7 +650,7 @@ func (a *API) DeleteRuleGroup(w http.ResponseWriter, req *http.Request) { pr, err := parseRequest(req, true, true) if err != nil { - respondError(logger, w, err.Error()) + respondServerError(logger, w, err.Error()) return } @@ -632,7 +660,7 @@ func (a *API) DeleteRuleGroup(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusNotFound) return } - respondError(logger, w, err.Error()) + respondServerError(logger, w, err.Error()) return } diff --git a/pkg/ruler/base/api_test.go b/pkg/ruler/base/api_test.go index 9f0e7b46cbd79..c14f5de8d4614 100644 --- a/pkg/ruler/base/api_test.go +++ b/pkg/ruler/base/api_test.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "net/http/httptest" + "net/url" "strings" "testing" "time" @@ -16,54 +17,102 @@ import ( "github.com/gorilla/mux" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/rulefmt" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" "github.com/grafana/loki/pkg/ruler/rulespb" ) -func TestRuler_rules(t *testing.T) { - cfg := defaultRulerConfig(t, newMockRuleStore(mockRules)) - - r := newTestRuler(t, cfg) - defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck +func 
TestRuler_PrometheusRules(t *testing.T) { + const ( + userID = "user1" + interval = time.Minute + ) - a := NewAPI(r, r.store, log.NewNopLogger()) + groupName := func(group int) string { + return fmt.Sprintf("group%d+", group) + } - req := requestFor(t, "GET", "https://localhost:8080/api/prom/api/v1/rules", nil, "user1") - w := httptest.NewRecorder() - a.PrometheusRules(w, req) + namespaceName := func(ns int) string { + return fmt.Sprintf("namespace%d+", ns) + } - resp := w.Result() - body, _ := io.ReadAll(resp.Body) + makeFilterTestRules := func() rulespb.RuleGroupList { + result := rulespb.RuleGroupList{} + for ns := 1; ns <= 3; ns++ { + for group := 1; group <= 3; group++ { + g := &rulespb.RuleGroupDesc{ + Name: groupName(group), + Namespace: namespaceName(ns), + User: userID, + Rules: []*rulespb.RuleDesc{ + createRecordingRule("NonUniqueNamedRule", `count_over_time({foo="bar"}[5m])`), + createAlertingRule(fmt.Sprintf("UniqueNamedRuleN%dG%d", ns, group), `count_over_time({foo="bar"}[5m]) < 1`), + }, + Interval: interval, + } + result = append(result, g) + } + } + return result + } - // Check status code and status response - responseJSON := response{} - err := json.Unmarshal(body, &responseJSON) - require.NoError(t, err) - require.Equal(t, http.StatusOK, resp.StatusCode) - require.Equal(t, responseJSON.Status, "success") + filterTestExpectedRule := func(name string) *recordingRule { + return &recordingRule{ + Name: name, + Query: `count_over_time({foo="bar"}[5m])`, + Health: "unknown", + Type: "recording", + } + } + filterTestExpectedAlert := func(name string) *alertingRule { + return &alertingRule{ + Name: name, + Query: `count_over_time({foo="bar"}[5m]) < 1`, + State: "inactive", + Health: "unknown", + Type: "alerting", + Alerts: []*Alert{}, + } + } - // Testing the running rules for user1 in the mock store - expectedResponse, _ := json.Marshal(response{ - Status: "success", - Data: &RuleDiscovery{ - RuleGroups: []*RuleGroup{ + testCases := map[string]struct { + configuredRules rulespb.RuleGroupList + expectedConfigured int + expectedStatusCode int + expectedErrorType v1.ErrorType + expectedRules []*RuleGroup + queryParams string + }{ + "should load and evaluate the configured rules": { + configuredRules: rulespb.RuleGroupList{ + &rulespb.RuleGroupDesc{ + Name: "group1", + Namespace: "namespace1", + User: userID, + Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`), createAlertingRule("COUNT_ALERT", `count_over_time({foo="bar"}[5m]) < 1`)}, + Interval: interval, + }, + }, + expectedConfigured: 1, + expectedRules: []*RuleGroup{ { Name: "group1", File: "namespace1", Rules: []rule{ &recordingRule{ - Name: "UP_RULE", - Query: "up", + Name: "COUNT_RULE", + Query: `count_over_time({foo="bar"}[5m])`, Health: "unknown", Type: "recording", }, &alertingRule{ - Name: "UP_ALERT", - Query: "up < 1", + Name: "COUNT_ALERT", + Query: `count_over_time({foo="bar"}[5m]) < 1`, State: "inactive", Health: "unknown", Type: "alerting", @@ -71,55 +120,34 @@ func TestRuler_rules(t *testing.T) { }, }, Interval: 60, - Limit: 10, }, }, }, - }) - - require.Equal(t, string(expectedResponse), string(body)) -} - -func TestRuler_rules_special_characters(t *testing.T) { - cfg := defaultRulerConfig(t, newMockRuleStore(mockSpecialCharRules)) - - r := newTestRuler(t, cfg) - defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck - - a := NewAPI(r, r.store, log.NewNopLogger()) - - req := requestFor(t, http.MethodGet, 
"https://localhost:8080/api/prom/api/v1/rules", nil, "user1") - w := httptest.NewRecorder() - a.PrometheusRules(w, req) - - resp := w.Result() - body, _ := io.ReadAll(resp.Body) - - // Check status code and status response - responseJSON := response{} - err := json.Unmarshal(body, &responseJSON) - require.NoError(t, err) - require.Equal(t, http.StatusOK, resp.StatusCode) - require.Equal(t, responseJSON.Status, "success") - - // Testing the running rules for user1 in the mock store - expectedResponse, _ := json.Marshal(response{ - Status: "success", - Data: &RuleDiscovery{ - RuleGroups: []*RuleGroup{ + "should load and evaluate rule groups and namespaces with special characters": { + configuredRules: rulespb.RuleGroupList{ + &rulespb.RuleGroupDesc{ + Name: ")(_+?/|group1+/?", + Namespace: ")(_+?/|namespace1+/?", + User: userID, + Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`), createAlertingRule("COUNT_ALERT", `count_over_time({foo="bar"}[5m]) < 1`)}, + Interval: interval, + }, + }, + expectedConfigured: 1, + expectedRules: []*RuleGroup{ { Name: ")(_+?/|group1+/?", File: ")(_+?/|namespace1+/?", Rules: []rule{ &recordingRule{ - Name: "UP_RULE", - Query: "up", + Name: "COUNT_RULE", + Query: `count_over_time({foo="bar"}[5m])`, Health: "unknown", Type: "recording", }, &alertingRule{ - Name: "UP_ALERT", - Query: "up < 1", + Name: "COUNT_ALERT", + Query: `count_over_time({foo="bar"}[5m]) < 1`, State: "inactive", Health: "unknown", Type: "alerting", @@ -127,16 +155,407 @@ func TestRuler_rules_special_characters(t *testing.T) { }, }, Interval: 60, - Limit: 10, }, }, }, - }) + "API returns only alerts": { + configuredRules: rulespb.RuleGroupList{ + &rulespb.RuleGroupDesc{ + Name: "group1", + Namespace: "namespace1", + User: userID, + Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`), createAlertingRule("COUNT_ALERT", `count_over_time({foo="bar"}[5m]) < 1`)}, + Interval: interval, + }, + }, + expectedConfigured: 1, + queryParams: "?type=alert", + expectedRules: []*RuleGroup{ + { + Name: "group1", + File: "namespace1", + Rules: []rule{ + &alertingRule{ + Name: "COUNT_ALERT", + Query: `count_over_time({foo="bar"}[5m]) < 1`, + State: "inactive", + Health: "unknown", + Type: "alerting", + Alerts: []*Alert{}, + }, + }, + Interval: 60, + }, + }, + }, + "API returns only rules": { + configuredRules: rulespb.RuleGroupList{ + &rulespb.RuleGroupDesc{ + Name: "group1", + Namespace: "namespace1", + User: userID, + Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`), createAlertingRule("COUNT_ALERT", `count_over_time({foo="bar"}[5m]) < 1`)}, + Interval: interval, + }, + }, + expectedConfigured: 1, + queryParams: "?type=record", + expectedRules: []*RuleGroup{ + { + Name: "group1", + File: "namespace1", + Rules: []rule{ + &recordingRule{ + Name: "COUNT_RULE", + Query: `count_over_time({foo="bar"}[5m])`, + Health: "unknown", + Type: "recording", + }, + }, + Interval: 60, + }, + }, + }, + "Invalid type param": { + configuredRules: rulespb.RuleGroupList{}, + expectedConfigured: 0, + queryParams: "?type=foo", + expectedStatusCode: http.StatusBadRequest, + expectedErrorType: v1.ErrBadData, + expectedRules: []*RuleGroup{}, + }, + "when filtering by an unknown namespace then the API returns nothing": { + configuredRules: makeFilterTestRules(), + expectedConfigured: len(makeFilterTestRules()), + queryParams: "?file=unknown", + expectedRules: []*RuleGroup{}, + }, + "when filtering by 
a single known namespace then the API returns only rules from that namespace": { + configuredRules: makeFilterTestRules(), + expectedConfigured: len(makeFilterTestRules()), + queryParams: "?" + url.Values{"file": []string{namespaceName(1)}}.Encode(), + expectedRules: []*RuleGroup{ + { + Name: groupName(1), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN1G1"), + }, + Interval: 60, + }, + { + Name: groupName(2), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN1G2"), + }, + Interval: 60, + }, + { + Name: groupName(3), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN1G3"), + }, + Interval: 60, + }, + }, + }, + "when filtering by a multiple known namespaces then the API returns rules from both namespaces": { + configuredRules: makeFilterTestRules(), + expectedConfigured: len(makeFilterTestRules()), + queryParams: "?" + url.Values{"file": []string{namespaceName(1), namespaceName(2)}}.Encode(), + expectedRules: []*RuleGroup{ + { + Name: groupName(1), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN1G1"), + }, + Interval: 60, + }, + { + Name: groupName(2), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN1G2"), + }, + Interval: 60, + }, + { + Name: groupName(3), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN1G3"), + }, + Interval: 60, + }, + { + Name: groupName(1), + File: namespaceName(2), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN2G1"), + }, + Interval: 60, + }, + { + Name: groupName(2), + File: namespaceName(2), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN2G2"), + }, + Interval: 60, + }, + { + Name: groupName(3), + File: namespaceName(2), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN2G3"), + }, + Interval: 60, + }, + }, + }, + "when filtering by an unknown group then the API returns nothing": { + configuredRules: makeFilterTestRules(), + expectedConfigured: len(makeFilterTestRules()), + queryParams: "?rule_group=unknown", + expectedRules: []*RuleGroup{}, + }, + "when filtering by a known group then the API returns only rules from that group": { + configuredRules: makeFilterTestRules(), + expectedConfigured: len(makeFilterTestRules()), + queryParams: "?" 
+ url.Values{"rule_group": []string{groupName(2)}}.Encode(), + expectedRules: []*RuleGroup{ + { + Name: groupName(2), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN1G2"), + }, + Interval: 60, + }, + { + Name: groupName(2), + File: namespaceName(2), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN2G2"), + }, + Interval: 60, + }, + { + Name: groupName(2), + File: namespaceName(3), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN3G2"), + }, + Interval: 60, + }, + }, + }, + "when filtering by multiple known groups then the API returns rules from both groups": { + configuredRules: makeFilterTestRules(), + expectedConfigured: len(makeFilterTestRules()), + queryParams: "?" + url.Values{"rule_group": []string{groupName(2), groupName(3)}}.Encode(), + expectedRules: []*RuleGroup{ + { + Name: groupName(2), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN1G2"), + }, + Interval: 60, + }, + { + Name: groupName(3), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN1G3"), + }, + Interval: 60, + }, + { + Name: groupName(2), + File: namespaceName(2), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN2G2"), + }, + Interval: 60, + }, + { + Name: groupName(3), + File: namespaceName(2), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN2G3"), + }, + Interval: 60, + }, + { + Name: groupName(2), + File: namespaceName(3), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN3G2"), + }, + Interval: 60, + }, + { + Name: groupName(3), + File: namespaceName(3), + Rules: []rule{ + filterTestExpectedRule("NonUniqueNamedRule"), + filterTestExpectedAlert("UniqueNamedRuleN3G3"), + }, + Interval: 60, + }, + }, + }, - require.Equal(t, string(expectedResponse), string(body)) + "when filtering by an unknown rule name then the API returns all empty groups": { + configuredRules: makeFilterTestRules(), + expectedConfigured: len(makeFilterTestRules()), + queryParams: "?rule_name=unknown", + expectedRules: []*RuleGroup{}, + }, + "when filtering by a known rule name then the API returns only rules with that name": { + configuredRules: makeFilterTestRules(), + expectedConfigured: len(makeFilterTestRules()), + queryParams: "?" + url.Values{"rule_name": []string{"UniqueNamedRuleN1G2"}}.Encode(), + expectedRules: []*RuleGroup{ + { + Name: groupName(2), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedAlert("UniqueNamedRuleN1G2"), + }, + Interval: 60, + }, + }, + }, + "when filtering by multiple known rule names then the API returns both rules": { + configuredRules: makeFilterTestRules(), + expectedConfigured: len(makeFilterTestRules()), + queryParams: "?" 
+ url.Values{"rule_name": []string{"UniqueNamedRuleN1G2", "UniqueNamedRuleN2G3"}}.Encode(), + expectedRules: []*RuleGroup{ + { + Name: groupName(2), + File: namespaceName(1), + Rules: []rule{ + filterTestExpectedAlert("UniqueNamedRuleN1G2"), + }, + Interval: 60, + }, + { + Name: groupName(3), + File: namespaceName(2), + Rules: []rule{ + filterTestExpectedAlert("UniqueNamedRuleN2G3"), + }, + Interval: 60, + }, + }, + }, + "when filtering by a known namespace and group then the API returns only rules from that namespace and group": { + configuredRules: makeFilterTestRules(), + expectedConfigured: len(makeFilterTestRules()), + queryParams: "?" + url.Values{ + "file": []string{namespaceName(3)}, + "rule_group": []string{groupName(2)}, + }.Encode(), + expectedRules: []*RuleGroup{ + { + Name: groupName(2), + File: namespaceName(3), + Rules: []rule{ + &recordingRule{ + Name: "NonUniqueNamedRule", + Query: `count_over_time({foo="bar"}[5m])`, + Health: "unknown", + Type: "recording", + }, + &alertingRule{ + Name: "UniqueNamedRuleN3G2", + Query: `count_over_time({foo="bar"}[5m]) < 1`, + State: "inactive", + Health: "unknown", + Type: "alerting", + Alerts: []*Alert{}, + }, + }, + Interval: 60, + }, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + storageRules := map[string]rulespb.RuleGroupList{ + userID: tc.configuredRules, + } + cfg := defaultRulerConfig(t, newMockRuleStore(storageRules)) + + r := newTestRuler(t, cfg) + defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck + + a := NewAPI(r, r.store, log.NewNopLogger()) + + req := requestFor(t, "GET", "https://localhost:8080/api/prom/api/v1/rules"+tc.queryParams, nil, "user1") + w := httptest.NewRecorder() + a.PrometheusRules(w, req) + + resp := w.Result() + if tc.expectedStatusCode != 0 { + require.Equal(t, tc.expectedStatusCode, resp.StatusCode) + } else { + require.Equal(t, http.StatusOK, resp.StatusCode) + } + + body, _ := io.ReadAll(resp.Body) + + // Check status code and status response + responseJSON := response{} + err := json.Unmarshal(body, &responseJSON) + require.NoError(t, err) + + if tc.expectedErrorType != "" { + assert.Equal(t, "error", responseJSON.Status) + assert.Equal(t, tc.expectedErrorType, responseJSON.ErrorType) + return + } + require.Equal(t, responseJSON.Status, "success") + + // Testing the running rules + expectedResponse, err := json.Marshal(response{ + Status: "success", + Data: &RuleDiscovery{ + RuleGroups: tc.expectedRules, + }, + }) + + require.NoError(t, err) + require.Equal(t, string(expectedResponse), string(body)) + }) + } } -func TestRuler_alerts(t *testing.T) { +func TestRuler_PrometheusAlerts(t *testing.T) { cfg := defaultRulerConfig(t, newMockRuleStore(mockRules)) r := newTestRuler(t, cfg) @@ -593,3 +1012,17 @@ func requestFor(t *testing.T, method string, url string, body io.Reader, userID return req.WithContext(ctx) } + +func createRecordingRule(record, expr string) *rulespb.RuleDesc { + return &rulespb.RuleDesc{ + Record: record, + Expr: expr, + } +} + +func createAlertingRule(alert, expr string) *rulespb.RuleDesc { + return &rulespb.RuleDesc{ + Alert: alert, + Expr: expr, + } +} diff --git a/pkg/ruler/base/ruler.go b/pkg/ruler/base/ruler.go index eba29f0baed2e..a0dc0df08f2d7 100644 --- a/pkg/ruler/base/ruler.go +++ b/pkg/ruler/base/ruler.go @@ -799,26 +799,68 @@ func RemoveRuleTokenFromGroupName(name string) string { // GetRules retrieves the running rules from this ruler and all running rulers in the ring if // sharding is enabled -func 
(r *Ruler) GetRules(ctx context.Context) ([]*GroupStateDesc, error) { +func (r *Ruler) GetRules(ctx context.Context, req *RulesRequest) ([]*GroupStateDesc, error) { userID, err := tenant.TenantID(ctx) if err != nil { return nil, fmt.Errorf("no user id found in context") } if r.cfg.EnableSharding { - return r.getShardedRules(ctx, userID) + return r.getShardedRules(ctx, userID, req) } - return r.getLocalRules(userID) + return r.getLocalRules(userID, req) } -func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) { +type StringFilterSet map[string]struct{} + +func makeStringFilterSet(values []string) StringFilterSet { + set := make(map[string]struct{}, len(values)) + for _, v := range values { + set[v] = struct{}{} + } + return set +} + +// IsFiltered returns whether to filter the value or not. +// If the set is empty, then nothing is filtered. +func (fs StringFilterSet) IsFiltered(val string) bool { + if len(fs) == 0 { + return false + } + _, ok := fs[val] + return !ok +} + +func (r *Ruler) getLocalRules(userID string, req *RulesRequest) ([]*GroupStateDesc, error) { + var getRecordingRules, getAlertingRules bool + + switch req.Filter { + case AlertingRule: + getAlertingRules = true + case RecordingRule: + getRecordingRules = true + case AnyRule: + getAlertingRules = true + getRecordingRules = true + default: + return nil, fmt.Errorf("unexpected rule filter %s", req.Filter) + } + + fileSet := makeStringFilterSet(req.File) + groupSet := makeStringFilterSet(req.RuleGroup) + ruleSet := makeStringFilterSet(req.RuleName) + groups := r.manager.GetRules(userID) groupDescs := make([]*GroupStateDesc, 0, len(groups)) prefix := filepath.Join(r.cfg.RulePath, userID) + "/" for _, group := range groups { + if groupSet.IsFiltered(group.Name()) { + continue + } + interval := group.Interval() // The mapped filename is url path escaped encoded to make handling `/` characters easier @@ -827,6 +869,10 @@ func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) { return nil, errors.Wrap(err, "unable to decode rule filename") } + if fileSet.IsFiltered(decodedNamespace) { + continue + } + groupDesc := &GroupStateDesc{ Group: &rulespb.RuleGroupDesc{ Name: group.Name(), @@ -840,6 +886,10 @@ func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) { EvaluationDuration: group.GetEvaluationTime(), } for _, r := range group.Rules() { + if ruleSet.IsFiltered(r.Name()) { + continue + } + lastError := "" if r.LastError() != nil { lastError = r.LastError().Error() @@ -848,6 +898,10 @@ func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) { var ruleDesc *RuleStateDesc switch rule := r.(type) { case *promRules.AlertingRule: + if !getAlertingRules { + continue + } + rule.ActiveAlerts() alerts := []*AlertStateDesc{} for _, a := range rule.ActiveAlerts() { @@ -879,6 +933,10 @@ func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) { EvaluationDuration: rule.GetEvaluationDuration(), } case *promRules.RecordingRule: + if !getRecordingRules { + continue + } + ruleDesc = &RuleStateDesc{ Rule: &rulespb.RuleDesc{ Record: rule.Name(), @@ -895,12 +953,16 @@ func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) { } groupDesc.ActiveRules = append(groupDesc.ActiveRules, ruleDesc) } - groupDescs = append(groupDescs, groupDesc) + + // Prometheus does not return a rule group if it has no rules after filtering. 
+ if len(groupDesc.ActiveRules) > 0 { + groupDescs = append(groupDescs, groupDesc) + } } return groupDescs, nil } -func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupStateDesc, error) { +func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesReq *RulesRequest) ([]*GroupStateDesc, error) { ring := ring.ReadRing(r.ring) if shardSize := r.limits.RulerTenantShardSize(userID); shardSize > 0 && r.cfg.ShardingStrategy == util.ShardingStrategyShuffle { @@ -933,7 +995,7 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta return errors.Wrapf(err, "unable to get client for ruler %s", addr) } - newGrps, err := rulerClient.Rules(ctx, &RulesRequest{}) + newGrps, err := rulerClient.Rules(ctx, rulesReq) if err != nil || newGrps == nil { return fmt.Errorf("unable to retrieve rules from ruler %s: %w", addr, err) } @@ -975,13 +1037,13 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta } // Rules implements the rules service -func (r *Ruler) Rules(ctx context.Context, _ *RulesRequest) (*RulesResponse, error) { +func (r *Ruler) Rules(ctx context.Context, req *RulesRequest) (*RulesResponse, error) { userID, err := tenant.TenantID(ctx) if err != nil { return nil, fmt.Errorf("no user id found in context") } - groupDescs, err := r.getLocalRules(userID) + groupDescs, err := r.getLocalRules(userID, req) if err != nil { return nil, err } @@ -1033,7 +1095,7 @@ func (r *Ruler) DeleteTenantConfiguration(w http.ResponseWriter, req *http.Reque err = r.store.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups. if err != nil && !errors.Is(err, rulestore.ErrGroupNamespaceNotFound) { - respondError(logger, w, err.Error()) + respondServerError(logger, w, err.Error()) return } diff --git a/pkg/ruler/base/ruler.pb.go b/pkg/ruler/base/ruler.pb.go index b53b55afc96ee..63904a26e3927 100644 --- a/pkg/ruler/base/ruler.pb.go +++ b/pkg/ruler/base/ruler.pb.go @@ -22,6 +22,7 @@ import ( math "math" math_bits "math/bits" reflect "reflect" + strconv "strconv" strings "strings" time "time" ) @@ -38,7 +39,35 @@ var _ = time.Kitchen // proto package needs to be updated. 
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package +type RulesRequest_RuleType int32 + +const ( + AnyRule RulesRequest_RuleType = 0 + AlertingRule RulesRequest_RuleType = 1 + RecordingRule RulesRequest_RuleType = 2 +) + +var RulesRequest_RuleType_name = map[int32]string{ + 0: "AnyRule", + 1: "AlertingRule", + 2: "RecordingRule", +} + +var RulesRequest_RuleType_value = map[string]int32{ + "AnyRule": 0, + "AlertingRule": 1, + "RecordingRule": 2, +} + +func (RulesRequest_RuleType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_ca810a0fd7057a73, []int{0, 0} +} + type RulesRequest struct { + Filter RulesRequest_RuleType `protobuf:"varint,1,opt,name=filter,proto3,enum=base.RulesRequest_RuleType" json:"filter,omitempty"` + RuleName []string `protobuf:"bytes,2,rep,name=rule_name,json=ruleName,proto3" json:"rule_name,omitempty"` + RuleGroup []string `protobuf:"bytes,3,rep,name=rule_group,json=ruleGroup,proto3" json:"rule_group,omitempty"` + File []string `protobuf:"bytes,4,rep,name=file,proto3" json:"file,omitempty"` } func (m *RulesRequest) Reset() { *m = RulesRequest{} } @@ -73,6 +102,34 @@ func (m *RulesRequest) XXX_DiscardUnknown() { var xxx_messageInfo_RulesRequest proto.InternalMessageInfo +func (m *RulesRequest) GetFilter() RulesRequest_RuleType { + if m != nil { + return m.Filter + } + return AnyRule +} + +func (m *RulesRequest) GetRuleName() []string { + if m != nil { + return m.RuleName + } + return nil +} + +func (m *RulesRequest) GetRuleGroup() []string { + if m != nil { + return m.RuleGroup + } + return nil +} + +func (m *RulesRequest) GetFile() []string { + if m != nil { + return m.File + } + return nil +} + type RulesResponse struct { Groups []*GroupStateDesc `protobuf:"bytes,1,rep,name=groups,proto3" json:"groups,omitempty"` } @@ -370,6 +427,7 @@ func (m *AlertStateDesc) GetValidUntil() time.Time { } func init() { + proto.RegisterEnum("base.RulesRequest_RuleType", RulesRequest_RuleType_name, RulesRequest_RuleType_value) proto.RegisterType((*RulesRequest)(nil), "base.RulesRequest") proto.RegisterType((*RulesResponse)(nil), "base.RulesResponse") proto.RegisterType((*GroupStateDesc)(nil), "base.GroupStateDesc") @@ -380,52 +438,66 @@ func init() { func init() { proto.RegisterFile("pkg/ruler/base/ruler.proto", fileDescriptor_ca810a0fd7057a73) } var fileDescriptor_ca810a0fd7057a73 = []byte{ - // 682 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x54, 0x41, 0x6f, 0xd3, 0x30, - 0x14, 0x8e, 0xbb, 0xb6, 0x6b, 0xdd, 0x31, 0x24, 0x6f, 0x42, 0x59, 0x01, 0x77, 0x2a, 0x97, 0x09, - 0xa1, 0x44, 0x0c, 0x84, 0x84, 0x10, 0x42, 0x9d, 0x36, 0xb8, 0xec, 0x80, 0x32, 0xe0, 0x3a, 0xb9, - 0xad, 0x97, 0x45, 0xf3, 0xe2, 0x60, 0x3b, 0x95, 0xb8, 0xf1, 0x13, 0x76, 0xe4, 0xca, 0x8d, 0x9f, - 0xb2, 0xe3, 0x8e, 0x13, 0x48, 0x83, 0x65, 0x17, 0x8e, 0xfb, 0x01, 0x1c, 0x90, 0xed, 0x64, 0xc9, - 0x60, 0x1c, 0x2a, 0xb4, 0x4b, 0x6b, 0xbf, 0xf7, 0xbe, 0xef, 0x7b, 0xef, 0xb3, 0x1d, 0xd8, 0x4d, - 0xf6, 0x42, 0x5f, 0xa4, 0x8c, 0x0a, 0x7f, 0x48, 0x24, 0xb5, 0x4b, 0x2f, 0x11, 0x5c, 0x71, 0x54, - 0xd7, 0x91, 0xee, 0x62, 0xc8, 0x43, 0x6e, 0x02, 0xbe, 0x5e, 0xd9, 0x5c, 0x17, 0x87, 0x9c, 0x87, - 0x8c, 0xfa, 0x66, 0x37, 0x4c, 0x77, 0xfc, 0x71, 0x2a, 0x88, 0x8a, 0x78, 0x9c, 0xe7, 0x7b, 0x7f, - 0xe6, 0x55, 0xb4, 0x4f, 0xa5, 0x22, 0xfb, 0x49, 0x5e, 0x70, 0x5b, 0x0b, 0x33, 0x1e, 0x5a, 0xe6, - 0x62, 0x91, 0x27, 0xef, 0x96, 0x5d, 0xe9, 0x5f, 0x99, 0x0c, 0xed, 0xbf, 0x4d, 0xf7, 0xe7, 0xe1, - 0x5c, 0xa0, 0xb7, 0x01, 0x7d, 0x9f, 0x52, 0xa9, 0xfa, 
0xcf, 0xe1, 0x8d, 0x7c, 0x2f, 0x13, 0x1e, - 0x4b, 0x8a, 0x1e, 0xc0, 0x66, 0x28, 0x78, 0x9a, 0x48, 0x17, 0x2c, 0xcf, 0xac, 0x74, 0x56, 0x17, - 0x3d, 0x3d, 0x8a, 0xf7, 0x4a, 0xc7, 0xb6, 0x14, 0x51, 0x74, 0x9d, 0xca, 0x51, 0x90, 0xd7, 0xf4, - 0x3f, 0xd7, 0xe0, 0xfc, 0xe5, 0x14, 0xba, 0x0f, 0x1b, 0x26, 0xe9, 0x82, 0x65, 0x60, 0xf0, 0x56, - 0x5e, 0xab, 0x98, 0x4a, 0x83, 0xb7, 0x25, 0xe8, 0x09, 0x9c, 0x23, 0x23, 0x15, 0x4d, 0xe8, 0xb6, - 0x29, 0x72, 0x6b, 0x46, 0x72, 0xc1, 0x4a, 0x6a, 0x44, 0xa9, 0xd8, 0xb1, 0x85, 0xa6, 0x59, 0xf4, - 0x0e, 0x2e, 0xd0, 0x09, 0x61, 0xa9, 0xb1, 0xed, 0x4d, 0x61, 0x8f, 0x3b, 0x63, 0x14, 0xbb, 0x9e, - 0x35, 0xd0, 0x2b, 0x0c, 0xf4, 0x2e, 0x2a, 0xd6, 0x5a, 0x87, 0x27, 0x3d, 0xe7, 0xe0, 0x7b, 0x0f, - 0x04, 0x57, 0x11, 0xa0, 0x2d, 0x88, 0xca, 0xf0, 0x7a, 0x7e, 0x2c, 0x6e, 0xdd, 0xd0, 0x2e, 0xfd, - 0x45, 0x5b, 0x14, 0x58, 0xd6, 0x4f, 0x9a, 0xf5, 0x0a, 0x78, 0xff, 0x5b, 0xcd, 0x7a, 0x5c, 0x5a, - 0x74, 0x0f, 0xd6, 0xf5, 0xbc, 0xb9, 0x43, 0x37, 0x2b, 0x0e, 0x99, 0x51, 0x4d, 0x12, 0x2d, 0xc2, - 0x86, 0xd4, 0x08, 0xb7, 0xb6, 0x0c, 0x56, 0xda, 0x81, 0xdd, 0xa0, 0x5b, 0xb0, 0xb9, 0x4b, 0x09, - 0x53, 0xbb, 0x66, 0xd8, 0x76, 0x90, 0xef, 0xd0, 0x1d, 0xd8, 0x66, 0x44, 0xaa, 0x0d, 0x21, 0xb8, - 0x30, 0x0d, 0xb7, 0x83, 0x32, 0xa0, 0x0f, 0x95, 0x30, 0x2a, 0x94, 0x74, 0x1b, 0xd5, 0x43, 0x1d, - 0xe8, 0x58, 0xe5, 0x50, 0x6d, 0xcd, 0xbf, 0xdc, 0x6d, 0x5e, 0x8f, 0xbb, 0xb3, 0xff, 0xe7, 0xee, - 0xaf, 0x3a, 0x9c, 0xbf, 0x3c, 0x47, 0xe9, 0x1c, 0xa8, 0x3a, 0xc7, 0x60, 0x93, 0x91, 0x21, 0x65, - 0xc5, 0x2d, 0x5b, 0xf2, 0x2e, 0x5e, 0xce, 0x26, 0x0d, 0xc9, 0xe8, 0xc3, 0xa6, 0xce, 0xbe, 0x26, - 0x91, 0x58, 0x7b, 0xaa, 0x15, 0xbf, 0x9e, 0xf4, 0x1e, 0x86, 0x91, 0xda, 0x4d, 0x87, 0xde, 0x88, - 0xef, 0xfb, 0xa1, 0x20, 0x3b, 0x24, 0x26, 0x3e, 0xe3, 0x7b, 0x91, 0x5f, 0x7d, 0x80, 0x9e, 0xc1, - 0x0d, 0xc6, 0x24, 0x51, 0x54, 0x04, 0xb9, 0x06, 0x9a, 0xc0, 0x0e, 0x89, 0x63, 0xae, 0x4c, 0x93, - 0xd2, 0x9d, 0xb9, 0x46, 0xc9, 0xaa, 0x90, 0x9e, 0x5d, 0x7b, 0x44, 0xcd, 0x1d, 0x00, 0x81, 0xdd, - 0xa0, 0x01, 0x6c, 0xe7, 0xef, 0x8c, 0x28, 0xb7, 0x31, 0xc5, 0x39, 0xb6, 0x2c, 0x6c, 0xa0, 0xd0, - 0x0b, 0xd8, 0xda, 0x89, 0x04, 0x1d, 0x6b, 0x86, 0x69, 0x6e, 0xc2, 0xac, 0x41, 0x0d, 0x14, 0xda, - 0x80, 0x1d, 0x41, 0x25, 0x67, 0x13, 0xcb, 0x31, 0x3b, 0x05, 0x07, 0x2c, 0x80, 0x03, 0x85, 0x5e, - 0xc2, 0x39, 0x7d, 0xaf, 0xb7, 0x25, 0x8d, 0x95, 0xe6, 0x69, 0x4d, 0xc3, 0xa3, 0x91, 0x5b, 0x34, - 0x56, 0xb6, 0x9d, 0x09, 0x61, 0xd1, 0x78, 0x3b, 0x8d, 0x55, 0xc4, 0xdc, 0xf6, 0x34, 0x34, 0x06, - 0xf8, 0x56, 0xe3, 0x56, 0x9f, 0xc1, 0x86, 0x7e, 0xb7, 0x02, 0xad, 0xda, 0x85, 0x44, 0xa8, 0xfc, - 0x7a, 0x15, 0x5f, 0xd9, 0xee, 0xc2, 0xa5, 0x98, 0xfd, 0xd2, 0xf6, 0x9d, 0xb5, 0xc7, 0x47, 0xa7, - 0xd8, 0x39, 0x3e, 0xc5, 0xce, 0xf9, 0x29, 0x06, 0x1f, 0x33, 0x0c, 0xbe, 0x64, 0x18, 0x1c, 0x66, - 0x18, 0x1c, 0x65, 0x18, 0xfc, 0xc8, 0x30, 0xf8, 0x99, 0x61, 0xe7, 0x3c, 0xc3, 0xe0, 0xe0, 0x0c, - 0x3b, 0x47, 0x67, 0xd8, 0x39, 0x3e, 0xc3, 0xce, 0xb0, 0x69, 0x9a, 0x7b, 0xf4, 0x3b, 0x00, 0x00, - 0xff, 0xff, 0x35, 0xb6, 0x62, 0xce, 0x80, 0x06, 0x00, 0x00, + // 791 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0x4f, 0x4f, 0xdb, 0x48, + 0x14, 0xf7, 0xe4, 0xbf, 0x27, 0x81, 0x65, 0x07, 0xb4, 0x32, 0x61, 0x71, 0xa2, 0xec, 0x25, 0x5a, + 0xad, 0x6c, 0x6d, 0x58, 0xad, 0xb4, 0xda, 0x5d, 0x55, 0x41, 0xd0, 0x5e, 0x50, 0x55, 0x19, 0xda, + 0x6b, 0x34, 0x49, 0x26, 0xc6, 0x62, 0xe2, 0x71, 0xc7, 0xe3, 0x48, 0xdc, 0xfa, 0x11, 0x38, 0xf6, + 0xda, 0x5b, 0x3f, 0x0a, 0x47, 0x7a, 0x43, 0xad, 0x44, 0x4b, 0xb8, 
0xf4, 0xc8, 0x07, 0xe8, 0xa1, + 0x9a, 0x19, 0x9b, 0x84, 0x42, 0x0f, 0x51, 0xc5, 0x05, 0xe6, 0xfd, 0xf9, 0xfd, 0xde, 0x7b, 0xbf, + 0x79, 0xe3, 0xc0, 0x7a, 0x74, 0xe4, 0xbb, 0x3c, 0xa1, 0x84, 0xbb, 0x7d, 0x1c, 0x13, 0x7d, 0x74, + 0x22, 0xce, 0x04, 0x43, 0x05, 0xe9, 0xa9, 0xaf, 0xf9, 0xcc, 0x67, 0xca, 0xe1, 0xca, 0x93, 0x8e, + 0xd5, 0x6d, 0x9f, 0x31, 0x9f, 0x12, 0x57, 0x59, 0xfd, 0x64, 0xe4, 0x0e, 0x13, 0x8e, 0x45, 0xc0, + 0xc2, 0x34, 0xde, 0xf8, 0x36, 0x2e, 0x82, 0x31, 0x89, 0x05, 0x1e, 0x47, 0x69, 0xc2, 0x86, 0x2c, + 0x4c, 0x99, 0xaf, 0x99, 0xb3, 0x43, 0x1a, 0xdc, 0x9c, 0x75, 0x25, 0xff, 0xc6, 0x51, 0x5f, 0xff, + 0xd7, 0xe1, 0xd6, 0x3b, 0x00, 0x6b, 0x9e, 0xb4, 0x3d, 0xf2, 0x32, 0x21, 0xb1, 0x40, 0x5b, 0xb0, + 0x34, 0x0a, 0xa8, 0x20, 0xdc, 0x02, 0x4d, 0xd0, 0x5e, 0xee, 0x6c, 0x38, 0xb2, 0x75, 0x67, 0x3e, + 0x47, 0x19, 0x07, 0xc7, 0x11, 0xf1, 0xd2, 0x54, 0xb4, 0x01, 0x4d, 0x49, 0xda, 0x0b, 0xf1, 0x98, + 0x58, 0xb9, 0x66, 0xbe, 0x6d, 0x7a, 0x15, 0xe9, 0x78, 0x8a, 0xc7, 0x04, 0x6d, 0x42, 0xa8, 0x82, + 0x3e, 0x67, 0x49, 0x64, 0xe5, 0x55, 0x54, 0xa5, 0x3f, 0x91, 0x0e, 0x84, 0x60, 0x61, 0x14, 0x50, + 0x62, 0x15, 0x54, 0x40, 0x9d, 0x5b, 0xff, 0xc1, 0x4a, 0x56, 0x03, 0x55, 0x61, 0xb9, 0x1b, 0x1e, + 0x4b, 0x73, 0xc5, 0x40, 0x2b, 0xb0, 0xd6, 0xa5, 0x84, 0x8b, 0x20, 0xf4, 0x95, 0x07, 0xa0, 0x9f, + 0xe1, 0x92, 0x47, 0x06, 0x8c, 0x0f, 0x33, 0x57, 0xae, 0xf5, 0x3f, 0x5c, 0x4a, 0xdb, 0x8d, 0x23, + 0x16, 0xc6, 0x04, 0xfd, 0x01, 0x4b, 0xaa, 0x78, 0x6c, 0x81, 0x66, 0xbe, 0x5d, 0xed, 0xac, 0xe9, + 0x99, 0x54, 0xfd, 0x7d, 0x81, 0x05, 0xd9, 0x21, 0xf1, 0xc0, 0x4b, 0x73, 0x5a, 0x6f, 0x72, 0x70, + 0xf9, 0x76, 0x08, 0xfd, 0x0e, 0x8b, 0xba, 0x7b, 0xa9, 0x89, 0xc4, 0x6b, 0x09, 0xbd, 0x6c, 0x08, + 0x85, 0xd7, 0x29, 0xe8, 0x6f, 0x58, 0xc3, 0x03, 0x11, 0x4c, 0x48, 0x4f, 0x25, 0x29, 0x39, 0xaa, + 0x9d, 0xd5, 0x99, 0x8c, 0xb3, 0x8a, 0x55, 0x9d, 0xa8, 0x9a, 0x45, 0x2f, 0xe0, 0x2a, 0x99, 0x60, + 0x9a, 0xa8, 0xab, 0x3f, 0xc8, 0xae, 0xd8, 0xca, 0xab, 0x8a, 0x75, 0x47, 0x2f, 0x81, 0x93, 0x2d, + 0x81, 0x73, 0x93, 0xb1, 0x5d, 0x39, 0xbd, 0x68, 0x18, 0x27, 0x1f, 0x1b, 0xc0, 0xbb, 0x8f, 0x00, + 0xed, 0x43, 0x34, 0x73, 0xef, 0xa4, 0xab, 0x65, 0x15, 0x14, 0xed, 0xfa, 0x1d, 0xda, 0x2c, 0x41, + 0xb3, 0xbe, 0x96, 0xac, 0xf7, 0xc0, 0x5b, 0x1f, 0x72, 0x5a, 0xe3, 0x99, 0x44, 0xbf, 0xc1, 0x82, + 0x9c, 0x37, 0x55, 0xe8, 0xa7, 0x39, 0x85, 0xd4, 0xa8, 0x2a, 0x88, 0xd6, 0x60, 0x31, 0x96, 0x08, + 0x2b, 0xd7, 0x04, 0x6d, 0xd3, 0xd3, 0x06, 0xfa, 0x05, 0x96, 0x0e, 0x09, 0xa6, 0xe2, 0x50, 0x0d, + 0x6b, 0x7a, 0xa9, 0x85, 0x7e, 0x85, 0x26, 0xc5, 0xb1, 0xd8, 0xe5, 0x9c, 0x71, 0xd5, 0xb0, 0xe9, + 0xcd, 0x1c, 0xf2, 0x52, 0xb1, 0x5c, 0x85, 0xd8, 0x2a, 0xce, 0x5f, 0xaa, 0x5a, 0x8f, 0xb9, 0x4b, + 0xd5, 0x39, 0xdf, 0x53, 0xb7, 0xf4, 0x30, 0xea, 0x96, 0x7f, 0x4c, 0xdd, 0x2f, 0x05, 0xb8, 0x7c, + 0x7b, 0x8e, 0x99, 0x72, 0x60, 0x5e, 0x39, 0x0a, 0x4b, 0x14, 0xf7, 0x09, 0xcd, 0xb6, 0x6c, 0xdd, + 0xb9, 0x79, 0xfd, 0x7b, 0xc4, 0xc7, 0x83, 0xe3, 0x3d, 0x19, 0x7d, 0x86, 0x03, 0xbe, 0xfd, 0x8f, + 0xac, 0xf8, 0xfe, 0xa2, 0xf1, 0xa7, 0x1f, 0x88, 0xc3, 0xa4, 0xef, 0x0c, 0xd8, 0xd8, 0xf5, 0x39, + 0x1e, 0xe1, 0x10, 0xbb, 0x94, 0x1d, 0x05, 0xee, 0xfc, 0x47, 0xc4, 0x51, 0xb8, 0xee, 0x10, 0x47, + 0x82, 0x70, 0x2f, 0xad, 0x81, 0x26, 0xb0, 0x8a, 0xc3, 0x90, 0x09, 0xd5, 0x64, 0xac, 0x5e, 0xf2, + 0x43, 0x95, 0x9c, 0x2f, 0x24, 0x67, 0x97, 0x1a, 0x11, 0xb5, 0x03, 0xc0, 0xd3, 0x06, 0xea, 0x42, + 0x33, 0x7d, 0x67, 0x58, 0x58, 0xc5, 0x05, 0xee, 0xb1, 0xa2, 0x61, 0x5d, 0x81, 0x1e, 0xc1, 0xca, + 0x28, 0xe0, 0x64, 0x28, 0x19, 0x16, 0xd9, 0x84, 0xb2, 0x42, 0x75, 0x05, 0xda, 0x85, 0x55, 
0x4e, + 0x62, 0x46, 0x27, 0x9a, 0xa3, 0xbc, 0x00, 0x07, 0xcc, 0x80, 0x5d, 0x81, 0x1e, 0xc3, 0x9a, 0xdc, + 0xeb, 0x5e, 0x4c, 0x42, 0x21, 0x79, 0x2a, 0x8b, 0xf0, 0x48, 0xe4, 0x3e, 0x09, 0x85, 0x6e, 0x67, + 0x82, 0x69, 0x30, 0xec, 0x25, 0xa1, 0x08, 0xa8, 0x65, 0x2e, 0x42, 0xa3, 0x80, 0xcf, 0x25, 0xae, + 0xf3, 0x2f, 0x2c, 0xca, 0x77, 0xcb, 0x51, 0x47, 0x1f, 0x62, 0x84, 0xee, 0xfe, 0x08, 0xd4, 0x57, + 0x6f, 0xf9, 0xf4, 0x97, 0xb6, 0x65, 0x6c, 0xff, 0x75, 0x76, 0x69, 0x1b, 0xe7, 0x97, 0xb6, 0x71, + 0x7d, 0x69, 0x83, 0x57, 0x53, 0x1b, 0xbc, 0x9d, 0xda, 0xe0, 0x74, 0x6a, 0x83, 0xb3, 0xa9, 0x0d, + 0x3e, 0x4d, 0x6d, 0xf0, 0x79, 0x6a, 0x1b, 0xd7, 0x53, 0x1b, 0x9c, 0x5c, 0xd9, 0xc6, 0xd9, 0x95, + 0x6d, 0x9c, 0x5f, 0xd9, 0x46, 0xbf, 0xa4, 0x9a, 0xdb, 0xfa, 0x1a, 0x00, 0x00, 0xff, 0xff, 0xf0, + 0x4e, 0x48, 0x39, 0x44, 0x07, 0x00, 0x00, +} + +func (x RulesRequest_RuleType) String() string { + s, ok := RulesRequest_RuleType_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) } - func (this *RulesRequest) Equal(that interface{}) bool { if that == nil { return this == nil @@ -445,6 +517,33 @@ func (this *RulesRequest) Equal(that interface{}) bool { } else if this == nil { return false } + if this.Filter != that1.Filter { + return false + } + if len(this.RuleName) != len(that1.RuleName) { + return false + } + for i := range this.RuleName { + if this.RuleName[i] != that1.RuleName[i] { + return false + } + } + if len(this.RuleGroup) != len(that1.RuleGroup) { + return false + } + for i := range this.RuleGroup { + if this.RuleGroup[i] != that1.RuleGroup[i] { + return false + } + } + if len(this.File) != len(that1.File) { + return false + } + for i := range this.File { + if this.File[i] != that1.File[i] { + return false + } + } return true } func (this *RulesResponse) Equal(that interface{}) bool { @@ -623,8 +722,12 @@ func (this *RulesRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 4) + s := make([]string, 0, 8) s = append(s, "&base.RulesRequest{") + s = append(s, "Filter: "+fmt.Sprintf("%#v", this.Filter)+",\n") + s = append(s, "RuleName: "+fmt.Sprintf("%#v", this.RuleName)+",\n") + s = append(s, "RuleGroup: "+fmt.Sprintf("%#v", this.RuleGroup)+",\n") + s = append(s, "File: "+fmt.Sprintf("%#v", this.File)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -804,6 +907,38 @@ func (m *RulesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.File) > 0 { + for iNdEx := len(m.File) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.File[iNdEx]) + copy(dAtA[i:], m.File[iNdEx]) + i = encodeVarintRuler(dAtA, i, uint64(len(m.File[iNdEx]))) + i-- + dAtA[i] = 0x22 + } + } + if len(m.RuleGroup) > 0 { + for iNdEx := len(m.RuleGroup) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.RuleGroup[iNdEx]) + copy(dAtA[i:], m.RuleGroup[iNdEx]) + i = encodeVarintRuler(dAtA, i, uint64(len(m.RuleGroup[iNdEx]))) + i-- + dAtA[i] = 0x1a + } + } + if len(m.RuleName) > 0 { + for iNdEx := len(m.RuleName) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.RuleName[iNdEx]) + copy(dAtA[i:], m.RuleName[iNdEx]) + i = encodeVarintRuler(dAtA, i, uint64(len(m.RuleName[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if m.Filter != 0 { + i = encodeVarintRuler(dAtA, i, uint64(m.Filter)) + i-- + dAtA[i] = 0x8 + } return len(dAtA) - i, nil } @@ -1116,6 +1251,27 @@ func (m *RulesRequest) Size() (n int) { } var l int _ = l + if m.Filter != 0 { + n += 1 + sovRuler(uint64(m.Filter)) + } + if len(m.RuleName) > 0 { + for _, s := range m.RuleName { + l = len(s) + n += 1 + l + sovRuler(uint64(l)) + 
} + } + if len(m.RuleGroup) > 0 { + for _, s := range m.RuleGroup { + l = len(s) + n += 1 + l + sovRuler(uint64(l)) + } + } + if len(m.File) > 0 { + for _, s := range m.File { + l = len(s) + n += 1 + l + sovRuler(uint64(l)) + } + } return n } @@ -1241,6 +1397,10 @@ func (this *RulesRequest) String() string { return "nil" } s := strings.Join([]string{`&RulesRequest{`, + `Filter:` + fmt.Sprintf("%v", this.Filter) + `,`, + `RuleName:` + fmt.Sprintf("%v", this.RuleName) + `,`, + `RuleGroup:` + fmt.Sprintf("%v", this.RuleGroup) + `,`, + `File:` + fmt.Sprintf("%v", this.File) + `,`, `}`, }, "") return s @@ -1354,6 +1514,121 @@ func (m *RulesRequest) Unmarshal(dAtA []byte) error { return fmt.Errorf("proto: RulesRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Filter", wireType) + } + m.Filter = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRuler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Filter |= RulesRequest_RuleType(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RuleName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRuler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRuler + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRuler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RuleName = append(m.RuleName, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field RuleGroup", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRuler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRuler + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRuler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.RuleGroup = append(m.RuleGroup, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field File", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRuler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRuler + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRuler + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.File = append(m.File, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRuler(dAtA[iNdEx:]) diff --git a/pkg/ruler/base/ruler.proto b/pkg/ruler/base/ruler.proto index 02a3c072533fd..0b143728efe61 100644 --- a/pkg/ruler/base/ruler.proto +++ b/pkg/ruler/base/ruler.proto @@ -18,7 +18,17 @@ service Ruler { rpc 
Rules(RulesRequest) returns (RulesResponse) {} } -message RulesRequest {} +message RulesRequest { + enum RuleType { + AnyRule = 0; + AlertingRule = 1; + RecordingRule = 2; + } + RuleType filter = 1; + repeated string rule_name = 2; + repeated string rule_group = 3; + repeated string file = 4; +} message RulesResponse { repeated GroupStateDesc groups = 1; diff --git a/pkg/ruler/base/ruler_test.go b/pkg/ruler/base/ruler_test.go index 99839ed652536..d17691e1bb6a0 100644 --- a/pkg/ruler/base/ruler_test.go +++ b/pkg/ruler/base/ruler_test.go @@ -401,32 +401,80 @@ func TestGetRules(t *testing.T) { shuffleShardSize int } - expectedRules := expectedRulesMap{ + allRules := expectedRulesMap{ "ruler1": map[string]rulespb.RuleGroupList{ "user1": { - &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "first", Interval: 10 * time.Second, Limit: 10}, - &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "second", Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "first", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "second", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "third", Rules: []*rulespb.RuleDesc{createAlertingRule("COUNT_ALERT", `count_over_time({foo="bar"}[5m]) > 0`)}, Interval: 10 * time.Second, Limit: 10}, }, "user2": { - &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "third", Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "first", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, }, }, "ruler2": map[string]rulespb.RuleGroupList{ "user1": { - &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "third", Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "fourth", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, }, "user2": { - &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "first", Interval: 10 * time.Second, Limit: 10}, - &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "second", Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "second", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "third", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + }, + "user3": { + &rulespb.RuleGroupDesc{User: "user3", Namespace: "namespace", Name: "first", Rules: []*rulespb.RuleDesc{createAlertingRule("COUNT_ALERT", `count_over_time({foo="bar"}[5m]) > 0`)}, Interval: 10 * time.Second, Limit: 10}, }, }, "ruler3": map[string]rulespb.RuleGroupList{ "user3": { - &rulespb.RuleGroupDesc{User: "user3", Namespace: "namespace", Name: "third", Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user3", Namespace: 
"namespace", Name: "second", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, }, "user2": { - &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "forth", Interval: 10 * time.Second, Limit: 10}, - &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "fifty", Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "forth", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "fifth", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + }, + }, + } + + expectedAlertRules := expectedRulesMap{ + "ruler1": map[string]rulespb.RuleGroupList{ + "user1": { + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "third", Rules: []*rulespb.RuleDesc{createAlertingRule("COUNT_ALERT", `count_over_time({foo="bar"}[5m]) > 0`)}, Interval: 10 * time.Second, Limit: 10}, + }, + }, + "ruler2": map[string]rulespb.RuleGroupList{ + "user3": { + &rulespb.RuleGroupDesc{User: "user3", Namespace: "namespace", Name: "first", Rules: []*rulespb.RuleDesc{createAlertingRule("COUNT_ALERT", `count_over_time({foo="bar"}[5m]) > 0`)}, Interval: 10 * time.Second, Limit: 10}, + }, + }, + "ruler3": map[string]rulespb.RuleGroupList{}, + } + + expectedRecordingRules := expectedRulesMap{ + "ruler1": map[string]rulespb.RuleGroupList{ + "user1": { + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "first", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "second", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + }, + "user2": { + &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "first", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + }, + }, + "ruler2": map[string]rulespb.RuleGroupList{ + "user1": { + &rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "fourth", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + }, + "user2": { + &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "second", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "third", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + }, + }, + "ruler3": map[string]rulespb.RuleGroupList{ + "user3": { + &rulespb.RuleGroupDesc{User: "user3", Namespace: "namespace", Name: "second", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, + }, + "user2": { + &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "forth", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, 
Interval: 10 * time.Second, Limit: 10}, + &rulespb.RuleGroupDesc{User: "user2", Namespace: "namespace", Name: "fifth", Rules: []*rulespb.RuleDesc{createRecordingRule("COUNT_RULE", `count_over_time({foo="bar"}[5m])`)}, Interval: 10 * time.Second, Limit: 10}, }, }, } @@ -446,115 +494,134 @@ func TestGetRules(t *testing.T) { } for name, tc := range testCases { - t.Run(name, func(t *testing.T) { - kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) - t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) }) - allRulesByUser := map[string]rulespb.RuleGroupList{} - allRulesByRuler := map[string]rulespb.RuleGroupList{} - allTokensByRuler := map[string][]uint32{} - rulerAddrMap := map[string]*Ruler{} - - createRuler := func(id string) *Ruler { - cfg := defaultRulerConfig(t, newMockRuleStore(allRulesByUser)) - - cfg.ShardingStrategy = tc.shardingStrategy - cfg.EnableSharding = tc.sharding - cfg.ShardingAlgo = tc.shardingAlgo - - cfg.Ring = RingConfig{ - InstanceID: id, - InstanceAddr: id, - KVStore: kv.Config{ - Mock: kvStore, - }, + for _, ruleType := range []RulesRequest_RuleType{AnyRule, AlertingRule, RecordingRule} { + t.Run(name+" "+ruleType.String(), func(t *testing.T) { + kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) }) + allRulesByUser := map[string]rulespb.RuleGroupList{} + filteredRulesByUser := map[string]rulespb.RuleGroupList{} + allRulesByRuler := map[string]rulespb.RuleGroupList{} + allTokensByRuler := map[string][]uint32{} + rulerAddrMap := map[string]*Ruler{} + + createRuler := func(id string) *Ruler { + cfg := defaultRulerConfig(t, newMockRuleStore(allRulesByUser)) + + cfg.ShardingStrategy = tc.shardingStrategy + cfg.EnableSharding = tc.sharding + cfg.ShardingAlgo = tc.shardingAlgo + + cfg.Ring = RingConfig{ + InstanceID: id, + InstanceAddr: id, + KVStore: kv.Config{ + Mock: kvStore, + }, + } + m := loki_storage.NewClientMetrics() + defer m.Unregister() + r := buildRuler(t, cfg, nil, m, rulerAddrMap) + r.limits = ruleLimits{tenantShard: tc.shuffleShardSize} + rulerAddrMap[id] = r + if r.ring != nil { + require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring)) + t.Cleanup(r.ring.StopAsync) + } + return r } - m := loki_storage.NewClientMetrics() - defer m.Unregister() - r := buildRuler(t, cfg, nil, m, rulerAddrMap) - r.limits = ruleLimits{tenantShard: tc.shuffleShardSize} - rulerAddrMap[id] = r - if r.ring != nil { - require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring)) - t.Cleanup(r.ring.StopAsync) + + for rID, r := range allRules { + createRuler(rID) + for user, rules := range r { + allRulesByUser[user] = append(allRulesByUser[user], rules...) + allRulesByRuler[rID] = append(allRulesByRuler[rID], rules...) + allTokensByRuler[rID] = generateTokenForGroups(rules, 1) + } } - return r - } - for rID, r := range expectedRules { - createRuler(rID) - for user, rules := range r { - allRulesByUser[user] = append(allRulesByUser[user], rules...) - allRulesByRuler[rID] = append(allRulesByRuler[rID], rules...) 
- allTokensByRuler[rID] = generateTokenForGroups(rules, 1) + var filteredRules expectedRulesMap + switch ruleType { + case AlertingRule: + filteredRules = expectedAlertRules + case RecordingRule: + filteredRules = expectedRecordingRules + default: + filteredRules = allRules } - } - if tc.sharding { - err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { - d, _ := in.(*ring.Desc) - if d == nil { - d = ring.NewDesc() + for _, r := range filteredRules { + for user, rules := range r { + filteredRulesByUser[user] = append(filteredRulesByUser[user], rules...) } - for rID, tokens := range allTokensByRuler { - d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now()) + } + + if tc.sharding { + err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { + d, _ := in.(*ring.Desc) + if d == nil { + d = ring.NewDesc() + } + for rID, tokens := range allTokensByRuler { + d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), "", tokens, ring.ACTIVE, time.Now()) + } + return d, true, nil + }) + require.NoError(t, err) + // Wait a bit to make sure ruler's ring is updated. + time.Sleep(100 * time.Millisecond) + } + + forEachRuler := func(f func(rID string, r *Ruler)) { + for rID, r := range rulerAddrMap { + f(rID, r) } - return d, true, nil + } + + // Sync Rules + forEachRuler(func(_ string, r *Ruler) { + r.syncRules(context.Background(), rulerSyncReasonInitial) }) - require.NoError(t, err) - // Wait a bit to make sure ruler's ring is updated. - time.Sleep(100 * time.Millisecond) - } - forEachRuler := func(f func(rID string, r *Ruler)) { - for rID, r := range rulerAddrMap { - f(rID, r) + for u := range filteredRulesByUser { + ctx := user.InjectOrgID(context.Background(), u) + forEachRuler(func(_ string, r *Ruler) { + rules, err := r.GetRules(ctx, &RulesRequest{Filter: ruleType}) + require.NoError(t, err) + require.Equal(t, len(filteredRulesByUser[u]), len(rules)) + if tc.sharding { + mockPoolClient := r.clientsPool.(*mockRulerClientsPool) + + if tc.shardingStrategy == util.ShardingStrategyShuffle { + require.Equal(t, int32(tc.shuffleShardSize), mockPoolClient.numberOfCalls.Load()) + } else { + require.Equal(t, int32(len(rulerAddrMap)), mockPoolClient.numberOfCalls.Load()) + } + mockPoolClient.numberOfCalls.Store(0) + } + }) } - } - // Sync Rules - forEachRuler(func(_ string, r *Ruler) { - r.syncRules(context.Background(), rulerSyncReasonInitial) - }) + totalLoadedRules := 0 + totalConfiguredRules := 0 - for u := range allRulesByUser { - ctx := user.InjectOrgID(context.Background(), u) - forEachRuler(func(_ string, r *Ruler) { - rules, err := r.GetRules(ctx) + forEachRuler(func(rID string, r *Ruler) { + localRules, err := r.listRules(context.Background()) require.NoError(t, err) - require.Equal(t, len(allRulesByUser[u]), len(rules)) - if tc.sharding { - mockPoolClient := r.clientsPool.(*mockRulerClientsPool) - - if tc.shardingStrategy == util.ShardingStrategyShuffle { - require.Equal(t, int32(tc.shuffleShardSize), mockPoolClient.numberOfCalls.Load()) - } else { - require.Equal(t, int32(len(rulerAddrMap)), mockPoolClient.numberOfCalls.Load()) - } - mockPoolClient.numberOfCalls.Store(0) + for _, rules := range localRules { + totalLoadedRules += len(rules) } + totalConfiguredRules += len(allRulesByRuler[rID]) }) - } - - totalLoadedRules := 0 - totalConfiguredRules := 0 - forEachRuler(func(rID string, r *Ruler) { - localRules, err := 
r.listRules(context.Background()) - require.NoError(t, err) - for _, rules := range localRules { - totalLoadedRules += len(rules) + if tc.sharding { + require.Equal(t, totalConfiguredRules, totalLoadedRules) + } else { + // Not sharding means that all rules will be loaded on all rulers + numberOfRulers := len(rulerAddrMap) + require.Equal(t, totalConfiguredRules*numberOfRulers, totalLoadedRules) } - totalConfiguredRules += len(allRulesByRuler[rID]) }) - - if tc.sharding { - require.Equal(t, totalConfiguredRules, totalLoadedRules) - } else { - // Not sharding means that all rules will be loaded on all rulers - numberOfRulers := len(rulerAddrMap) - require.Equal(t, totalConfiguredRules*numberOfRulers, totalLoadedRules) - } - }) + } } } diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go index b1b08be008405..c5b115df19e57 100644 --- a/pkg/storage/bloom/v1/block.go +++ b/pkg/storage/bloom/v1/block.go @@ -7,21 +7,23 @@ import ( "github.com/prometheus/common/model" ) +type BlockMetadata struct { + Options BlockOptions + Series SeriesHeader + Checksum uint32 +} + type Block struct { // covers series pages index BlockIndex // covers bloom pages blooms BloomBlock - // TODO(owen-d): implement - // synthetic header for the entire block - // built from all the pages in the index - header SeriesHeader + metadata BlockMetadata reader BlockReader // should this be decoupled from the struct (accepted as method arg instead)? initialized bool - dataRange SeriesHeader } func NewBlock(reader BlockReader) *Block { @@ -38,30 +40,49 @@ func (b *Block) LoadHeaders() error { return errors.Wrap(err, "getting index reader") } - if err := b.index.DecodeHeaders(idx); err != nil { + indexChecksum, err := b.index.DecodeHeaders(idx) + if err != nil { return errors.Wrap(err, "decoding index") } + b.metadata.Options = b.index.opts + // TODO(owen-d): better pattern xs := make([]SeriesHeader, 0, len(b.index.pageHeaders)) for _, h := range b.index.pageHeaders { xs = append(xs, h.SeriesHeader) } - b.dataRange = aggregateHeaders(xs) + b.metadata.Series = aggregateHeaders(xs) blooms, err := b.reader.Blooms() if err != nil { return errors.Wrap(err, "getting blooms reader") } - if err := b.blooms.DecodeHeaders(blooms); err != nil { + bloomChecksum, err := b.blooms.DecodeHeaders(blooms) + if err != nil { return errors.Wrap(err, "decoding blooms") } b.initialized = true + + if !b.metadata.Options.Schema.Compatible(b.blooms.schema) { + return fmt.Errorf( + "schema mismatch: index (%v) vs blooms (%v)", + b.metadata.Options.Schema, b.blooms.schema, + ) + } + + b.metadata.Checksum = combineChecksums(indexChecksum, bloomChecksum) } return nil } +// XOR checksums as a simple checksum combiner with the benefit that +// each part can be recomputed by XORing the result against the other +func combineChecksums(index, blooms uint32) uint32 { + return index ^ blooms +} + // convenience method func (b *Block) Querier() *BlockQuerier { return NewBlockQuerier(b) @@ -75,11 +96,18 @@ func (b *Block) Blooms() *LazyBloomIter { return NewLazyBloomIter(b) } +func (b *Block) Metadata() (BlockMetadata, error) { + if err := b.LoadHeaders(); err != nil { + return BlockMetadata{}, err + } + return b.metadata, nil +} + func (b *Block) Schema() (Schema, error) { if err := b.LoadHeaders(); err != nil { return Schema{}, err } - return b.index.schema, nil + return b.metadata.Options.Schema, nil } type BlockQuerier struct { @@ -146,6 +174,11 @@ func (bq *BlockQuerier) Err() error { // passed as the `chks` argument. 
Chunks will be removed from the result set if they are indexed in the bloom // and fail to pass all the searches. func (bq *BlockQuerier) CheckChunksForSeries(fp model.Fingerprint, chks ChunkRefs, searches [][]byte) (ChunkRefs, error) { + schema, err := bq.Schema() + if err != nil { + return chks, fmt.Errorf("getting schema: %w", err) + } + if err := bq.Seek(fp); err != nil { return chks, errors.Wrapf(err, "seeking to series for fp: %v", fp) } @@ -177,18 +210,22 @@ func (bq *BlockQuerier) CheckChunksForSeries(fp model.Fingerprint, chks ChunkRef } } - // TODO(owen-d): pool, memoize chunk search prefix creation + // TODO(salvacorts): pool tokenBuf + var tokenBuf []byte + var prefixLen int // Check chunks individually now mustCheck, inBlooms := chks.Compare(series.Chunks, true) outer: for _, chk := range inBlooms { + // Get buf to concatenate the chunk and search token + tokenBuf, prefixLen = prefixedToken(schema.NGramLen(), chk, tokenBuf) for _, search := range searches { - // TODO(owen-d): meld chunk + search into a single byte slice from the block schema - var combined = search + tokenBuf = append(tokenBuf[:prefixLen], search...) - if !bloom.Test(combined) { + if !bloom.Test(tokenBuf) { + // chunk didn't pass the search, continue to the next chunk continue outer } } diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index be95f96862eac..20c310ef695c0 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -171,9 +171,9 @@ func NewBloomBlock(encoding chunkenc.Encoding) BloomBlock { } } -func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error { +func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) { if err := b.schema.DecodeFrom(r); err != nil { - return errors.Wrap(err, "decoding schema") + return 0, errors.Wrap(err, "decoding schema") } var ( @@ -182,35 +182,36 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error { ) // last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32) if _, err := r.Seek(-12, io.SeekEnd); err != nil { - return errors.Wrap(err, "seeking to bloom headers metadata") + return 0, errors.Wrap(err, "seeking to bloom headers metadata") } dec.B, err = io.ReadAll(r) if err != nil { - return errors.Wrap(err, "reading bloom headers metadata") + return 0, errors.Wrap(err, "reading bloom headers metadata") } headerOffset := dec.Be64() + checksum := dec.Be32() if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil { - return errors.Wrap(err, "seeking to bloom headers") + return 0, errors.Wrap(err, "seeking to bloom headers") } dec.B, err = io.ReadAll(r) if err != nil { - return errors.Wrap(err, "reading bloom page headers") + return 0, errors.Wrap(err, "reading bloom page headers") } if err := dec.CheckCrc(castagnoliTable); err != nil { - return errors.Wrap(err, "checksumming page headers") + return 0, errors.Wrap(err, "checksumming page headers") } b.pageHeaders = make([]BloomPageHeader, dec.Uvarint()) for i := 0; i < len(b.pageHeaders); i++ { header := &b.pageHeaders[i] if err := header.Decode(&dec); err != nil { - return errors.Wrapf(err, "decoding %dth series header", i) + return 0, errors.Wrapf(err, "decoding %dth series header", i) } } - return nil + return checksum, nil } func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageDecoder, error) { diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index c9ff6f23cc0f1..7dd0d8ae44974 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ 
b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -61,14 +61,19 @@ func clearCache(cache map[string]interface{}) { // of specific ngram length, along with the length of the prefix. // It ensures enough capacity for the prefix and the token so additional tokens can be created // without allocations by appending them to the prefix length -func prefixedToken(ngram int, chk ChunkRef) ([]byte, int) { - var enc encoding.Encbuf +// If the buffer is nil or too small, a new one is created. The buffer is returned for reuse. +func prefixedToken(ngram int, chk ChunkRef, buf []byte) ([]byte, int) { + enc := encoding.EncWith(buf) + enc.Reset() enc.PutBE64(uint64(chk.Start)) enc.PutBE64(uint64(chk.End)) enc.PutBE32(chk.Checksum) prefixLn := enc.Len() // record the length of the prefix - enc.PutBytes(make([]byte, ngram*MaxRuneLen)) // ensure enough capacity for the ngram + // If the buffer is too small, ensure enough capacity for the ngram + if cap(enc.Get()) < prefixLn+ngram*MaxRuneLen { + enc.PutBytes(make([]byte, ngram*MaxRuneLen)) + } // return the underlying byte slice and the length of the prefix return enc.Get(), prefixLn @@ -86,10 +91,13 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW clearCache(bt.cache) + var tokenBuf []byte + var prefixLn int + for chks.Err() == nil && chks.Next() { chk := chks.At() itr := chk.Itr - tokenBuf, prefixLn := prefixedToken(bt.lineTokenizer.N, chk.Ref) + tokenBuf, prefixLn = prefixedToken(bt.lineTokenizer.N, chk.Ref, tokenBuf) defer itr.Close() diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index c2b0481e2db58..4ba4ec3eb4f1e 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -56,7 +56,7 @@ func TestPrefixedKeyCreation(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - token, prefixLn := prefixedToken(tc.ngram, ref) + token, prefixLn := prefixedToken(tc.ngram, ref, nil) require.Equal(t, 20, prefixLn) require.Equal(t, tc.expLen, len(token)) // first 8 bytes should be zeros from `from` diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 26b9a39cfd7bf..ac7a83baad374 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -5,7 +5,6 @@ import ( "fmt" "hash" "io" - "sort" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -21,15 +20,46 @@ var ( type BlockOptions struct { // Schema determines the Schema of the block and cannot be changed + // without recreating the block from underlying data Schema Schema // The following options can be changed on the fly. // For instance, adding another page to a block with - // a different target page size is supported. 
+ // a different target page size is supported, although + // the block will store the original sizes it was created with // target size in bytes (decompressed) // of each page type - SeriesPageSize, BloomPageSize, BlockSize int + SeriesPageSize, BloomPageSize, BlockSize uint64 +} + +func (b BlockOptions) Len() int { + return 3*8 + b.Schema.Len() +} + +func (b *BlockOptions) DecodeFrom(r io.ReadSeeker) error { + buf := make([]byte, b.Len()) + _, err := io.ReadFull(r, buf) + if err != nil { + return errors.Wrap(err, "reading block options") + } + + dec := encoding.DecWith(buf) + + if err := b.Schema.Decode(&dec); err != nil { + return errors.Wrap(err, "decoding schema") + } + b.SeriesPageSize = dec.Be64() + b.BloomPageSize = dec.Be64() + b.BlockSize = dec.Be64() + return nil +} + +func (b BlockOptions) Encode(enc *encoding.Encbuf) { + b.Schema.Encode(enc) + enc.PutBE64(b.SeriesPageSize) + enc.PutBE64(b.BloomPageSize) + enc.PutBE64(b.BlockSize) } type BlockBuilder struct { @@ -90,14 +120,19 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error) return 0, errors.Wrap(err, "iterating series with blooms") } - checksum, err := b.blooms.Close() + return b.Close() +} + +func (b *BlockBuilder) Close() (uint32, error) { + bloomChecksum, err := b.blooms.Close() if err != nil { return 0, errors.Wrap(err, "closing bloom file") } - if err := b.index.Close(); err != nil { + indexCheckSum, err := b.index.Close() + if err != nil { return 0, errors.Wrap(err, "closing series file") } - return checksum, nil + return combineChecksums(indexCheckSum, bloomChecksum), nil } func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error { @@ -131,7 +166,7 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockB return &BloomBlockBuilder{ opts: opts, writer: writer, - page: NewPageWriter(opts.BloomPageSize), + page: NewPageWriter(int(opts.BloomPageSize)), scratch: &encoding.Encbuf{}, } } @@ -307,16 +342,16 @@ func NewIndexBuilder(opts BlockOptions, writer io.WriteCloser) *IndexBuilder { return &IndexBuilder{ opts: opts, writer: writer, - page: NewPageWriter(opts.SeriesPageSize), + page: NewPageWriter(int(opts.SeriesPageSize)), scratch: &encoding.Encbuf{}, } } -func (b *IndexBuilder) WriteSchema() error { +func (b *IndexBuilder) WriteOpts() error { b.scratch.Reset() - b.opts.Schema.Encode(b.scratch) + b.opts.Encode(b.scratch) if _, err := b.writer.Write(b.scratch.Get()); err != nil { - return errors.Wrap(err, "writing schema") + return errors.Wrap(err, "writing opts+schema") } b.writtenSchema = true b.offset += b.scratch.Len() @@ -325,8 +360,8 @@ func (b *IndexBuilder) WriteSchema() error { func (b *IndexBuilder) Append(series SeriesWithOffset) error { if !b.writtenSchema { - if err := b.WriteSchema(); err != nil { - return errors.Wrap(err, "writing schema") + if err := b.WriteOpts(); err != nil { + return errors.Wrap(err, "appending series") } } @@ -408,8 +443,7 @@ func (b *IndexBuilder) flushPage() error { DecompressedLen: decompressedLen, SeriesHeader: SeriesHeader{ NumSeries: b.page.Count(), - FromFp: b.fromFp, - ThroughFp: b.previousFp, + Bounds: NewBounds(b.fromFp, b.previousFp), FromTs: b.fromTs, ThroughTs: b.throughTs, }, @@ -428,10 +462,10 @@ func (b *IndexBuilder) flushPage() error { return nil } -func (b *IndexBuilder) Close() error { +func (b *IndexBuilder) Close() (uint32, error) { if b.page.Count() > 0 { if err := b.flushPage(); err != nil { - return errors.Wrap(err, "flushing final series page") + return 0, errors.Wrap(err, "flushing final series 
page") } } @@ -451,39 +485,9 @@ func (b *IndexBuilder) Close() error { b.scratch.PutHash(crc32Hash) _, err := b.writer.Write(b.scratch.Get()) if err != nil { - return errors.Wrap(err, "writing series page headers") + return 0, errors.Wrap(err, "writing series page headers") } - return errors.Wrap(b.writer.Close(), "closing series writer") -} - -// SortBlocksIntoOverlappingGroups sorts a list of blocks into a sorted list of lists, -// where each list contains blocks that overlap with each other. -// TODO(owen-d): implement as an iterator so we don't have to load all blocks at once -// NB: unused now, but likely useful when we want to optimize compaction. I wrote this expecting to need it now -// but it feels unsavory to remove it -func SortBlocksIntoOverlappingGroups(xs []*Block) (groups [][]*Block) { - sort.Slice(xs, func(i, j int) bool { - a, b := xs[i].index, xs[j].index - return a.pageHeaders[0].FromFp <= b.pageHeaders[0].FromFp - }) - - var curGroup []*Block - for _, x := range xs { - switch { - case len(curGroup) == 0: - curGroup = append(curGroup, x) - case curGroup[len(curGroup)-1].dataRange.OverlapFingerprintRange(x.dataRange): - curGroup = append(curGroup, x) - default: - groups = append(groups, curGroup) - curGroup = []*Block{x} - } - } - - if len(curGroup) > 0 { - groups = append(groups, curGroup) - } - return groups + return crc32Hash.Sum32(), errors.Wrap(b.writer.Close(), "closing series writer") } // Simplistic implementation of a merge builder that builds a single block @@ -586,12 +590,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { } } - checksum, err := builder.blooms.Close() + checksum, err := builder.Close() if err != nil { - return 0, errors.Wrap(err, "closing bloom file") - } - if err := builder.index.Close(); err != nil { - return 0, errors.Wrap(err, "closing series file") + return 0, errors.Wrap(err, "closing block") } return checksum, nil } diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index cb28f0cb53357..6bf2c26e7b585 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -9,8 +9,32 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/util/encoding" ) +func TestBlockOptionsRoundTrip(t *testing.T) { + opts := BlockOptions{ + Schema: Schema{ + version: V1, + encoding: chunkenc.EncSnappy, + nGramLength: 10, + nGramSkip: 2, + }, + SeriesPageSize: 100, + BloomPageSize: 10 << 10, + BlockSize: 10 << 20, + } + + var enc encoding.Encbuf + opts.Encode(&enc) + + var got BlockOptions + err := got.DecodeFrom(bytes.NewReader(enc.Get())) + require.Nil(t, err) + + require.Equal(t, opts, got) +} + func TestBlockBuilderRoundTrip(t *testing.T) { numSeries := 100 numKeysPerSeries := 10000 @@ -334,7 +358,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { checksum, err := mb.Build(builder) require.Nil(t, err) - require.Equal(t, uint32(0x2ec4fd6a), checksum) + require.Equal(t, uint32(0xe306ec6e), checksum) // ensure the new block contains one copy of all the data // by comparing it against an iterator over the source data diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go index c397a7a55fd57..77937ecfc08e3 100644 --- a/pkg/storage/bloom/v1/fuse.go +++ b/pkg/storage/bloom/v1/fuse.go @@ -56,6 +56,11 @@ func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[Request]) *Fused } func (fq *FusedQuerier) Run() error { + schema, err := fq.bq.Schema() + if err != nil { + return 
errors.Wrap(err, "getting schema") + } + for fq.inputs.Next() { // find all queries for the next relevant fingerprint nextBatch := fq.inputs.At() @@ -119,13 +124,18 @@ func (fq *FusedQuerier) Run() error { // TODO(owen-d): pool var removals ChunkRefs + // TODO(salvacorts): pool tokenBuf + var tokenBuf []byte + var prefixLen int + chunkLoop: for _, chk := range inBlooms { + // Get buf to concatenate the chunk and search token + tokenBuf, prefixLen = prefixedToken(schema.NGramLen(), chk, tokenBuf) for _, search := range input.Searches { - // TODO(owen-d): meld chunk + search into a single byte slice from the block schema - var combined = search + tokenBuf = append(tokenBuf[:prefixLen], search...) - if !bloom.ScalableBloomFilter.Test(combined) { + if !bloom.Test(tokenBuf) { removals = append(removals, chk) continue chunkLoop } diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 1b51320e1566b..4102ea5de8da9 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -20,7 +20,7 @@ func TestFusedQuerier(t *testing.T) { reader := NewByteReader(indexBuf, bloomsBuf) numSeries := 100 numKeysPerSeries := 10000 - data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + data, keys := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) builder, err := NewBlockBuilder( BlockOptions{ @@ -53,6 +53,7 @@ func TestFusedQuerier(t *testing.T) { Fp: data[idx].Series.Fingerprint, Chks: data[idx].Series.Chunks, Response: ch, + Searches: keys[idx], }) } inputs = append(inputs, reqs) diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index 10c1e41fd1139..e3a14dc5453ea 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -2,6 +2,7 @@ package v1 import ( "bytes" + "fmt" "io" "github.com/pkg/errors" @@ -17,6 +18,10 @@ type Schema struct { nGramLength, nGramSkip uint64 } +func (s Schema) String() string { + return fmt.Sprintf("v%d,encoding=%s,ngram=%d,skip=%d", s.version, s.encoding, s.nGramLength, s.nGramSkip) +} + func (s Schema) Compatible(other Schema) bool { return s == other } @@ -89,19 +94,14 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error { // Block index is a set of series pages along with // the headers for each page type BlockIndex struct { - schema Schema - pageHeaders []SeriesPageHeaderWithOffset // headers for each series page -} + opts BlockOptions -func NewBlockIndex(encoding chunkenc.Encoding) BlockIndex { - return BlockIndex{ - schema: Schema{version: DefaultSchemaVersion, encoding: encoding}, - } + pageHeaders []SeriesPageHeaderWithOffset // headers for each series page } -func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error { - if err := b.schema.DecodeFrom(r); err != nil { - return errors.Wrap(err, "decoding schema") +func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) (uint32, error) { + if err := b.opts.DecodeFrom(r); err != nil { + return 0, errors.Wrap(err, "decoding block options") } var ( @@ -111,24 +111,25 @@ func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error { // last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32) if _, err := r.Seek(-12, io.SeekEnd); err != nil { - return errors.Wrap(err, "seeking to bloom headers metadata") + return 0, errors.Wrap(err, "seeking to bloom headers metadata") } dec.B, err = io.ReadAll(r) if err != nil { - return errors.Wrap(err, "reading bloom headers metadata") + return 0, errors.Wrap(err, "reading bloom headers metadata") } headerOffset := dec.Be64() + 
checksum := dec.Be32() if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil { - return errors.Wrap(err, "seeking to index headers") + return 0, errors.Wrap(err, "seeking to index headers") } dec.B, err = io.ReadAll(r) if err != nil { - return errors.Wrap(err, "reading index page headers") + return 0, errors.Wrap(err, "reading index page headers") } if err := dec.CheckCrc(castagnoliTable); err != nil { - return errors.Wrap(err, "checksumming page headers") + return 0, errors.Wrap(err, "checksumming page headers") } b.pageHeaders = make( @@ -139,12 +140,12 @@ func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error { for i := 0; i < len(b.pageHeaders); i++ { var s SeriesPageHeaderWithOffset if err := s.Decode(&dec); err != nil { - return errors.Wrapf(err, "decoding %dth series header", i) + return 0, errors.Wrapf(err, "decoding %dth series header", i) } b.pageHeaders[i] = s } - return nil + return checksum, nil } // decompress page and return an iterator over the bytes @@ -167,7 +168,7 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead return nil, errors.Wrap(err, "checksumming series page") } - decompressor, err := b.schema.DecompressorPool().GetReader(bytes.NewReader(dec.Get())) + decompressor, err := b.opts.Schema.DecompressorPool().GetReader(bytes.NewReader(dec.Get())) if err != nil { return nil, errors.Wrap(err, "getting decompressor") } @@ -213,12 +214,12 @@ func (h *SeriesPageHeaderWithOffset) Decode(dec *encoding.Decbuf) error { type SeriesHeader struct { NumSeries int - FromFp, ThroughFp model.Fingerprint + Bounds FingerprintBounds FromTs, ThroughTs model.Time } func (h SeriesHeader) OverlapFingerprintRange(other SeriesHeader) bool { - return h.ThroughFp >= other.FromFp && h.FromFp <= other.ThroughFp + return h.Bounds.Overlaps(other.Bounds) } // build one aggregated header for the entire block @@ -227,9 +228,10 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader { return SeriesHeader{} } + fromFp, _ := xs[0].Bounds.GetFromThrough() + _, throughFP := xs[len(xs)-1].Bounds.GetFromThrough() res := SeriesHeader{ - FromFp: xs[0].FromFp, - ThroughFp: xs[len(xs)-1].ThroughFp, + Bounds: NewBounds(fromFp, throughFP), } for _, x := range xs { @@ -245,16 +247,16 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader { func (h *SeriesHeader) Encode(enc *encoding.Encbuf) { enc.PutUvarint(h.NumSeries) - enc.PutUvarint64(uint64(h.FromFp)) - enc.PutUvarint64(uint64(h.ThroughFp)) + enc.PutUvarint64(uint64(h.Bounds.Min)) + enc.PutUvarint64(uint64(h.Bounds.Max)) enc.PutVarint64(int64(h.FromTs)) enc.PutVarint64(int64(h.ThroughTs)) } func (h *SeriesHeader) Decode(dec *encoding.Decbuf) error { h.NumSeries = dec.Uvarint() - h.FromFp = model.Fingerprint(dec.Uvarint64()) - h.ThroughFp = model.Fingerprint(dec.Uvarint64()) + h.Bounds.Min = model.Fingerprint(dec.Uvarint64()) + h.Bounds.Max = model.Fingerprint(dec.Uvarint64()) h.FromTs = model.Time(dec.Varint64()) h.ThroughTs = model.Time(dec.Varint64()) return dec.Err() @@ -305,7 +307,7 @@ func (d *SeriesPageDecoder) Next() bool { } func (d *SeriesPageDecoder) Seek(fp model.Fingerprint) { - if fp > d.header.ThroughFp { + if fp > d.header.Bounds.Max { // shortcut: we know the fingerprint is too large so nothing in this page // will match the seek call, which returns the first found fingerprint >= fp. 
// so masquerade the index as if we've already iterated through diff --git a/pkg/storage/bloom/v1/index_querier.go b/pkg/storage/bloom/v1/index_querier.go index 005f480e68e9c..142b6423185b6 100644 --- a/pkg/storage/bloom/v1/index_querier.go +++ b/pkg/storage/bloom/v1/index_querier.go @@ -49,7 +49,7 @@ func (it *LazySeriesIter) Seek(fp model.Fingerprint) error { // first potentially relevant page desiredPage := sort.Search(len(it.b.index.pageHeaders), func(i int) bool { header := it.b.index.pageHeaders[i] - return header.ThroughFp >= fp + return header.Bounds.Max >= fp }) switch { diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go index fb9c8a0e4a389..ada8cd7319e90 100644 --- a/pkg/storage/bloom/v1/test_util.go +++ b/pkg/storage/bloom/v1/test_util.go @@ -47,13 +47,14 @@ func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs, } func MkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom, keysList [][][]byte) { + const nGramLen = 4 seriesList = make([]SeriesWithBloom, 0, nSeries) keysList = make([][][]byte, 0, nSeries) step := (throughFp - fromFp) / model.Fingerprint(nSeries) timeDelta := time.Duration(throughTs.Sub(fromTs).Nanoseconds() / int64(nSeries)) - tokenizer := NewNGramTokenizer(4, 0) + tokenizer := NewNGramTokenizer(nGramLen, 0) for i := 0; i < nSeries; i++ { var series Series series.Fingerprint = fromFp + model.Fingerprint(i)*step @@ -74,8 +75,16 @@ func MkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model it := tokenizer.Tokens(fmt.Sprintf("series %d", i*keysPerSeries+j)) for it.Next() { key := it.At() + // series-level key bloom.Add(key) keys = append(keys, key) + + // chunk-level key + for _, chk := range series.Chunks { + tokenBuf, prefixLen := prefixedToken(nGramLen, chk, nil) + tokenBuf = append(tokenBuf[:prefixLen], key...) 
+ bloom.Add(tokenBuf) + } } } diff --git a/pkg/storage/bloom/v1/tokenizer_test.go b/pkg/storage/bloom/v1/tokenizer_test.go index b70d9610fab47..eec9645fcd25e 100644 --- a/pkg/storage/bloom/v1/tokenizer_test.go +++ b/pkg/storage/bloom/v1/tokenizer_test.go @@ -171,7 +171,7 @@ func BenchmarkTokens(b *testing.B) { { desc: "v2", f: func() func() { - buf, prefixLn := prefixedToken(v2Three.N, ChunkRef{}) + buf, prefixLn := prefixedToken(v2Three.N, ChunkRef{}, nil) return func() { itr := NewPrefixedTokenIter(buf, prefixLn, v2Three.Tokens(lorem)) for itr.Next() { @@ -188,7 +188,7 @@ func BenchmarkTokens(b *testing.B) { { desc: "v2", f: func() func() { - buf, prefixLn := prefixedToken(v2Three.N, ChunkRef{}) + buf, prefixLn := prefixedToken(v2Three.N, ChunkRef{}, nil) return func() { itr := NewPrefixedTokenIter(buf, prefixLn, v2ThreeSkip1.Tokens(lorem)) for itr.Next() { diff --git a/tools/doc-generator/parse/parser.go b/tools/doc-generator/parse/parser.go index 1003c1e08753d..d5896a0666bf5 100644 --- a/tools/doc-generator/parse/parser.go +++ b/tools/doc-generator/parse/parser.go @@ -462,6 +462,8 @@ func getCustomFieldType(t reflect.Type) (string, bool) { return "remote_write_config...", true case reflect.TypeOf(validation.OverwriteMarshalingStringMap{}).String(): return "headers", true + case reflect.TypeOf(relabel.Regexp{}).String(): + return fieldString, true default: return "", false } diff --git a/tools/doc-generator/parse/root_blocks.go b/tools/doc-generator/parse/root_blocks.go index 66a7a72321bb3..25a4e785ded63 100644 --- a/tools/doc-generator/parse/root_blocks.go +++ b/tools/doc-generator/parse/root_blocks.go @@ -20,6 +20,7 @@ import ( "github.com/grafana/loki/pkg/distributor" "github.com/grafana/loki/pkg/ingester" ingester_client "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/loghttp/push" "github.com/grafana/loki/pkg/loki/common" frontend "github.com/grafana/loki/pkg/lokifrontend" "github.com/grafana/loki/pkg/querier" @@ -272,6 +273,12 @@ storage_config: store-1: endpoint: s3://foo-bucket region: us-west1 -Named store from this example can be used by setting object_store to store-1 in period_config.`}, +Named store from this example can be used by setting object_store to store-1 in period_config.`, + }, + { + Name: "attributes_config", + StructType: []reflect.Type{reflect.TypeOf(push.AttributesConfig{})}, + Desc: "Define actions for matching OpenTelemetry (OTEL) attributes.", + }, } )
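
The `RulesRequest` message extended in `pkg/ruler/base/ruler.proto` above now carries the filtering options (rule type, rule names, rule groups, files), and the test changes call it via `r.GetRules(ctx, &RulesRequest{Filter: ruleType})`. As a reference point, here is a minimal sketch of how a caller could populate the generated message to fetch only alerting rules. This is illustrative only: the `RulerClient` interface name and gRPC plumbing are assumed from the standard generated stubs, and the filter values are made up.

```go
package main

import (
	"context"
	"fmt"

	"github.com/grafana/loki/pkg/ruler/base"
)

// sketchListAlertingRules shows how the new RulesRequest fields could be set
// to ask a ruler for alerting rules only, optionally narrowed down further by
// rule name, rule group, and file. The client is assumed to be a generated
// gRPC client for the Ruler service.
func sketchListAlertingRules(ctx context.Context, client base.RulerClient) error {
	req := &base.RulesRequest{
		Filter:    base.AlertingRule,       // AnyRule, AlertingRule, or RecordingRule
		RuleName:  []string{"COUNT_ALERT"}, // repeated string rule_name = 2
		RuleGroup: []string{"third"},       // repeated string rule_group = 3
		File:      []string{"namespace"},   // repeated string file = 4
	}

	resp, err := client.Rules(ctx, req)
	if err != nil {
		return fmt.Errorf("listing rules: %w", err)
	}
	fmt.Printf("got %d matching rule groups\n", len(resp.Groups))
	return nil
}
```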
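The bloom changes above thread a reusable buffer through `prefixedToken` so that, for every chunk, the chunk-reference prefix is encoded once and each search token is appended after the prefix without reallocating. The following standalone sketch illustrates that reuse pattern with simplified stand-in types; the real `ChunkRef`, encoding helpers, and bloom filter come from the packages in the diff and are not reproduced here.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// chunkRef is a simplified stand-in for the ChunkRef type used in the diff.
type chunkRef struct {
	Start, End uint64
	Checksum   uint32
}

// prefixedToken mirrors the idea from the diff: encode the chunk reference as
// a prefix and return the buffer plus the prefix length, so callers can write
// successive search tokens after the prefix without re-encoding it.
func prefixedToken(chk chunkRef, buf []byte) ([]byte, int) {
	buf = buf[:0]
	buf = binary.BigEndian.AppendUint64(buf, chk.Start)
	buf = binary.BigEndian.AppendUint64(buf, chk.End)
	buf = binary.BigEndian.AppendUint32(buf, chk.Checksum)
	return buf, len(buf)
}

func main() {
	var tokenBuf []byte
	searches := [][]byte{[]byte("erro"), []byte("rror")}

	for _, chk := range []chunkRef{{Start: 0, End: 10, Checksum: 42}} {
		var prefixLen int
		// Encode the chunk prefix once per chunk, reusing the same buffer.
		tokenBuf, prefixLen = prefixedToken(chk, tokenBuf)
		for _, search := range searches {
			// Keep the prefix, overwrite only the search suffix.
			tokenBuf = append(tokenBuf[:prefixLen], search...)
			fmt.Printf("test key: %x\n", tokenBuf)
		}
	}
}
```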