Skip to content

Commit 0e72c87

Browse files
[indexer] Allow manually managed elastic index (#1602)
Summary: Opensearch doesn't use the same index lifecycle management as elasticsearch. This means the current indexer will fail to create a managed index on clusters using opensearch. This PR allows users to bypass our index management logic, and manually deploy a managed index to their elastic cluster. This should allow people using opensearch to still deploy Pixie cloud albeit with more burden on the user. Type of change: /kind cleanup Test Plan: Tested by skaffolding public cloud to my cluster with `PL_MD_MANUAL_INDEX_MANAGEMENT: true`, saw that the indexer reused the index already created and didn't attempt to create a new index lifecycle policy. Signed-off-by: James Bartlett <jamesbartlett@pixielabs.ai>
1 parent 3b2afb4 commit 0e72c87

File tree

6 files changed

+44
-13
lines changed

6 files changed

+44
-13
lines changed

k8s/cloud/base/indexer_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ data:
88
PL_MD_INDEX_REPLICAS: "4"
99
PL_MD_INDEX_MAX_AGE: "3d"
1010
PL_MD_INDEX_DELETE_AFTER: "3d"
11+
PL_MD_MANUAL_INDEX_MANAGEMENT: "false"

src/cloud/autocomplete/suggester_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func TestMain(m *testing.M) {
166166
elasticClient = es
167167

168168
// Set up elastic indexes.
169-
err = md.InitializeMapping(es, indexName, 1, "30d", "30d")
169+
err = md.InitializeMapping(es, indexName, 1, "30d", "30d", false)
170170
if err != nil {
171171
cleanup()
172172
log.Fatal(err)

src/cloud/indexer/indexer_server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func init() {
5353
pflag.String("md_index_max_age", "", "The amount of time before rolling over the elastic index as a string, eg '30d'")
5454
pflag.String("md_index_delete_after", "", "The amount of time after rollover to delete old elastic indices, as a string, eg '30d'")
5555
pflag.Int("md_index_replicas", 4, "The number of replicas to setup for the metadata index.")
56+
pflag.Bool("md_manual_index_management", false, "Skip creation of managed elastic indices. Requires manually deploying an elastic index with md_index_name")
5657
}
5758

5859
func newVZMgrClient() (vzmgrpb.VZMgrServiceClient, error) {
@@ -132,7 +133,7 @@ func main() {
132133
log.Fatal("Must specify a delete after time for the rolled over elastic indices.")
133134
}
134135

135-
err = md.InitializeMapping(es, indexName, replicas, maxAge, deleteAfter)
136+
err = md.InitializeMapping(es, indexName, replicas, maxAge, deleteAfter, viper.GetBool("md_manual_index_management"))
136137
if err != nil {
137138
log.WithError(err).Fatal("Could not initialize elastic mapping")
138139
}

src/cloud/indexer/md/mapping.o.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -207,17 +207,33 @@ const IndexMapping = `
207207
`
208208

209209
// InitializeMapping creates the index in elastic.
210-
func InitializeMapping(es *elastic.Client, indexName string, replicas int, maxAge string, deleteAfter string) error {
211-
err := esutils.NewManagedIndex(es, indexName).
212-
IndexFromJSONString(IndexMapping).
213-
MaxIndexAge(maxAge).
214-
TimeBeforeDelete(deleteAfter).
215-
Migrate(context.Background())
216-
if err != nil {
217-
return err
210+
func InitializeMapping(es *elastic.Client, indexName string, replicas int, maxAge string, deleteAfter string, manualIndex bool) error {
211+
ctx := context.Background()
212+
if manualIndex {
213+
exists, err := es.IndexExists(indexName).Do(ctx)
214+
if err != nil {
215+
return err
216+
}
217+
if !exists {
218+
return fmt.Errorf("elastic index %s does not exist, but manual index management specified", indexName)
219+
}
220+
// Update the index mappings if necessary.
221+
err = esutils.NewIndex(es).Name(indexName).FromJSONString(IndexMapping).Migrate(ctx)
222+
if err != nil {
223+
return err
224+
}
225+
} else {
226+
err := esutils.NewManagedIndex(es, indexName).
227+
IndexFromJSONString(IndexMapping).
228+
MaxIndexAge(maxAge).
229+
TimeBeforeDelete(deleteAfter).
230+
Migrate(ctx)
231+
if err != nil {
232+
return err
233+
}
218234
}
219235

220236
replicaSetting := fmt.Sprintf("{\"index\": {\"number_of_replicas\": %d}}", replicas)
221-
_, err = es.IndexPutSettings(indexName).BodyString(replicaSetting).Do(context.Background())
237+
_, err := es.IndexPutSettings(indexName).BodyString(replicaSetting).Do(context.Background())
222238
return err
223239
}

src/cloud/indexer/md/md_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func TestMain(m *testing.M) {
5252
vzID = uuid.Must(uuid.NewV4())
5353
orgID = uuid.Must(uuid.NewV4())
5454

55-
err = md.InitializeMapping(es, indexName, 1, "30d", "30d")
55+
err = md.InitializeMapping(es, indexName, 1, "30d", "30d", false)
5656
if err != nil {
5757
cleanup()
5858
log.WithError(err).Fatal("Could not initialize indexes in elastic")

src/cloud/shared/esutils/index.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ package esutils
2121
import (
2222
"context"
2323
"encoding/json"
24+
"errors"
2425
"fmt"
2526
"reflect"
27+
"strings"
2628

2729
"github.com/olivere/elastic/v7"
2830
log "github.com/sirupsen/logrus"
@@ -246,7 +248,18 @@ func (i *Index) updateSettings(ctx context.Context) (bool, error) {
246248
WithField("cause", err.(*elastic.Error).Details.CausedBy).Error("failed to get index settings")
247249
return false, err
248250
}
249-
currentSettings := settingsResp[i.indexName].Settings
251+
var indexResp *elastic.IndicesGetSettingsResponse
252+
for indexName, resp := range settingsResp {
253+
// If `i.indexName` is an alias, then the response can be the full index name instead of the alias name.
254+
if strings.HasPrefix(indexName, i.indexName) {
255+
indexResp = resp
256+
break
257+
}
258+
}
259+
if indexResp == nil {
260+
return false, errors.New("could not get index settings")
261+
}
262+
currentSettings := indexResp.Settings
250263
diff := updates(i.index.Settings, currentSettings)
251264
if diff == nil {
252265
return false, nil

0 commit comments

Comments
 (0)