Skip to content

Commit ef9ca2b

Browse files
authored
Revert "Fix v8.5.0 migration painless script" (#1878)
* Revert "Fix v8.5.0 migration painless script (#1839)" This reverts commit de5d74b. * Revert "Allow multiple ES outputs as long as they are the same ES (#1684)" This reverts commit 63fdcbf.
1 parent 2558289 commit ef9ca2b

31 files changed

+457
-1445
lines changed

cmd/fleet/main.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -821,21 +821,17 @@ func (f *FleetServer) runSubsystems(ctx context.Context, cfg *config.Config, g *
821821
remoteVersion, err := ver.CheckCompatibility(ctx, esCli, f.bi.Version)
822822
if err != nil {
823823
if len(remoteVersion) != 0 {
824-
return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w",
825-
f.bi.Version, remoteVersion, err)
824+
return fmt.Errorf("failed version compatibility check with elasticsearch (Agent: %s, Elasticsearch: %s): %w", f.bi.Version, remoteVersion, err)
826825
}
827826
return fmt.Errorf("failed version compatibility check with elasticsearch: %w", err)
828827
}
829828

830-
// Run migrations
831-
loggedMigration := loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
829+
// Run migrations; current safe to do in background. That may change in the future.
830+
g.Go(loggedRunFunc(ctx, "Migrations", func(ctx context.Context) error {
832831
return dl.Migrate(ctx, bulker)
833-
})
834-
if err = loggedMigration(); err != nil {
835-
return fmt.Errorf("failed to run subsystems: %w", err)
836-
}
832+
}))
837833

838-
// Run scheduler for periodic GC/cleanup
834+
// Run schduler for periodic GC/cleanup
839835
gcCfg := cfg.Inputs[0].Server.GC
840836
sched, err := scheduler.New(gc.Schedules(bulker, gcCfg.ScheduleInterval, gcCfg.CleanupAfterExpiredInterval))
841837
if err != nil {

internal/pkg/api/handleAck.go

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"strings"
1616
"time"
1717

18-
"github.com/pkg/errors"
19-
2018
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
2119
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
2220
"github.com/elastic/fleet-server/v7/internal/pkg/config"
@@ -26,6 +24,7 @@ import (
2624
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
2725
"github.com/elastic/fleet-server/v7/internal/pkg/model"
2826
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
27+
"github.com/pkg/errors"
2928

3029
"github.com/julienschmidt/httprouter"
3130
"github.com/rs/zerolog"
@@ -338,9 +337,8 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
338337
Int64("rev.coordinatorIdx", rev.CoordinatorIdx).
339338
Msg("ack policy revision")
340339

341-
if ok && rev.PolicyID == agent.PolicyID &&
342-
(rev.RevisionIdx > currRev ||
343-
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
340+
if ok && rev.PolicyID == agent.PolicyID && (rev.RevisionIdx > currRev ||
341+
(rev.RevisionIdx == currRev && rev.CoordinatorIdx > currCoord)) {
344342
found = true
345343
currRev = rev.RevisionIdx
346344
currCoord = rev.CoordinatorIdx
@@ -351,7 +349,17 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
351349
return nil
352350
}
353351

354-
ack.invalidateAPIKeys(ctx, agent)
352+
sz := len(agent.DefaultAPIKeyHistory)
353+
if sz > 0 {
354+
ids := make([]string, sz)
355+
for i := 0; i < sz; i++ {
356+
ids[i] = agent.DefaultAPIKeyHistory[i].ID
357+
}
358+
log.Info().Strs("ids", ids).Msg("Invalidate old API keys")
359+
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
360+
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
361+
}
362+
}
355363

356364
body := makeUpdatePolicyBody(
357365
agent.PolicyID,
@@ -377,24 +385,8 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
377385
return errors.Wrap(err, "handlePolicyChange update")
378386
}
379387

380-
func (ack *AckT) invalidateAPIKeys(ctx context.Context, agent *model.Agent) {
381-
var ids []string
382-
for _, out := range agent.Outputs {
383-
for _, k := range out.ToRetireAPIKeyIds {
384-
ids = append(ids, k.ID)
385-
}
386-
}
387-
388-
if len(ids) > 0 {
389-
log.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys")
390-
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
391-
log.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
392-
}
393-
}
394-
}
395-
396388
func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent *model.Agent) error {
397-
apiKeys := agent.APIKeyIDs()
389+
apiKeys := _getAPIKeyIDs(agent)
398390
if len(apiKeys) > 0 {
399391
zlog = zlog.With().Strs(LogAPIKeyID, apiKeys).Logger()
400392

@@ -449,6 +441,17 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *
449441
return nil
450442
}
451443

444+
func _getAPIKeyIDs(agent *model.Agent) []string {
445+
keys := make([]string, 0, 1)
446+
if agent.AccessAPIKeyID != "" {
447+
keys = append(keys, agent.AccessAPIKeyID)
448+
}
449+
if agent.DefaultAPIKeyID != "" {
450+
keys = append(keys, agent.DefaultAPIKeyID)
451+
}
452+
return keys
453+
}
454+
452455
// Generate an update script that validates that the policy_id
453456
// has not changed underneath us by an upstream process (Kibana or otherwise).
454457
// We have a race condition where a user could have assigned a new policy to

internal/pkg/api/handleAck_test.go

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,13 @@ import (
1515
"net/http"
1616
"testing"
1717

18-
"github.com/google/go-cmp/cmp"
19-
2018
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
2119
"github.com/elastic/fleet-server/v7/internal/pkg/config"
2220
"github.com/elastic/fleet-server/v7/internal/pkg/es"
2321
"github.com/elastic/fleet-server/v7/internal/pkg/model"
2422
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
2523
testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
24+
"github.com/google/go-cmp/cmp"
2625

2726
"github.com/stretchr/testify/assert"
2827
"github.com/stretchr/testify/mock"
@@ -440,39 +439,3 @@ func TestHandleAckEvents(t *testing.T) {
440439
})
441440
}
442441
}
443-
444-
func TestInvalidateAPIKeys(t *testing.T) {
445-
toRetire1 := []model.ToRetireAPIKeyIdsItems{{
446-
ID: "toRetire1",
447-
}}
448-
toRetire2 := []model.ToRetireAPIKeyIdsItems{{
449-
ID: "toRetire2_0",
450-
}, {
451-
ID: "toRetire2_1",
452-
}}
453-
var toRetire3 []model.ToRetireAPIKeyIdsItems
454-
455-
want := []string{"toRetire1", "toRetire2_0", "toRetire2_1"}
456-
457-
agent := model.Agent{
458-
Outputs: map[string]*model.PolicyOutput{
459-
"1": {ToRetireAPIKeyIds: toRetire1},
460-
"2": {ToRetireAPIKeyIds: toRetire2},
461-
"3": {ToRetireAPIKeyIds: toRetire3},
462-
},
463-
}
464-
465-
bulker := ftesting.NewMockBulk()
466-
bulker.On("APIKeyInvalidate",
467-
context.Background(), mock.MatchedBy(func(ids []string) bool {
468-
// if A contains B and B contains A => A = B
469-
return assert.Subset(t, ids, want) &&
470-
assert.Subset(t, want, ids)
471-
})).
472-
Return(nil)
473-
474-
ack := &AckT{bulk: bulker}
475-
ack.invalidateAPIKeys(context.Background(), &agent)
476-
477-
bulker.AssertExpectations(t)
478-
}

internal/pkg/api/handleCheckin.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"compress/gzip"
1111
"context"
1212
"encoding/json"
13-
"fmt"
1413
"math/rand"
1514
"net/http"
1615
"reflect"
@@ -61,6 +60,7 @@ func (rt Router) handleCheckin(w http.ResponseWriter, r *http.Request, ps httpro
6160
Logger()
6261

6362
err := rt.ct.handleCheckin(&zlog, w, r, id)
63+
6464
if err != nil {
6565
cntCheckin.IncError(err)
6666
resp := NewHTTPErrResp(err)
@@ -430,13 +430,13 @@ func convertActions(agentID string, actions []model.Action) ([]ActionResp, strin
430430
//
431431
func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agentID string, pp *policy.ParsedPolicy) (*ActionResp, error) {
432432
zlog = zlog.With().
433-
Str("fleet.ctx", "processPolicy").
434-
Int64("fleet.policyRevision", pp.Policy.RevisionIdx).
435-
Int64("fleet.policyCoordinator", pp.Policy.CoordinatorIdx).
433+
Str("ctx", "processPolicy").
434+
Int64("policyRevision", pp.Policy.RevisionIdx).
435+
Int64("policyCoordinator", pp.Policy.CoordinatorIdx).
436436
Str(LogPolicyID, pp.Policy.PolicyID).
437437
Logger()
438438

439-
// Repull and decode the agent object. Do not trust the cache.
439+
// Repull and decode the agent object. Do not trust the cache.
440440
agent, err := dl.FindAgent(ctx, bulker, dl.QueryAgentByID, dl.FieldID, agentID)
441441
if err != nil {
442442
zlog.Error().Err(err).Msg("fail find agent record")
@@ -446,6 +446,7 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
446446
// Parse the outputs maps in order to prepare the outputs
447447
const outputsProperty = "outputs"
448448
outputs, err := smap.Parse(pp.Fields[outputsProperty])
449+
449450
if err != nil {
450451
return nil, err
451452
}
@@ -457,9 +458,9 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
457458
// Iterate through the policy outputs and prepare them
458459
for _, policyOutput := range pp.Outputs {
459460
err = policyOutput.Prepare(ctx, zlog, bulker, &agent, outputs)
461+
460462
if err != nil {
461-
return nil, fmt.Errorf("failed to prepare output %q: %w",
462-
policyOutput.Name, err)
463+
return nil, err
463464
}
464465
}
465466

internal/pkg/api/handleEnroll.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ type EnrollerT struct {
5353
}
5454

5555
func NewEnrollerT(verCon version.Constraints, cfg *config.Server, bulker bulk.Bulk, c cache.Cache) (*EnrollerT, error) {
56+
5657
log.Info().
5758
Interface("limits", cfg.Limits.EnrollLimit).
5859
Msg("Setting config enroll_limit")
@@ -186,13 +187,7 @@ func (et *EnrollerT) processRequest(rb *rollback.Rollback, zlog zerolog.Logger,
186187
return et._enroll(r.Context(), rb, zlog, req, erec.PolicyID, ver)
187188
}
188189

189-
func (et *EnrollerT) _enroll(
190-
ctx context.Context,
191-
rb *rollback.Rollback,
192-
zlog zerolog.Logger,
193-
req *EnrollRequest,
194-
policyID,
195-
ver string) (*EnrollResponse, error) {
190+
func (et *EnrollerT) _enroll(ctx context.Context, rb *rollback.Rollback, zlog zerolog.Logger, req *EnrollRequest, policyID, ver string) (*EnrollResponse, error) {
196191

197192
if req.SharedID != "" {
198193
// TODO: Support pre-existing install
@@ -432,7 +427,7 @@ func generateAccessAPIKey(ctx context.Context, bulk bulk.Bulk, agentID string) (
432427
agentID,
433428
"",
434429
[]byte(kFleetAccessRolesJSON),
435-
apikey.NewMetadata(agentID, "", apikey.TypeAccess),
430+
apikey.NewMetadata(agentID, apikey.TypeAccess),
436431
)
437432
}
438433

internal/pkg/apikey/apikey.go

Lines changed: 0 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,12 @@
66
package apikey
77

88
import (
9-
"bytes"
10-
"context"
119
"encoding/base64"
12-
"encoding/json"
1310
"errors"
1411
"fmt"
1512
"net/http"
1613
"strings"
1714
"unicode/utf8"
18-
19-
"github.com/elastic/go-elasticsearch/v7"
20-
"github.com/elastic/go-elasticsearch/v7/esapi"
2115
)
2216

2317
const (
@@ -34,61 +28,6 @@ var (
3428

3529
var AuthKey = http.CanonicalHeaderKey("Authorization")
3630

37-
// APIKeyMetadata tracks Metadata associated with an APIKey.
38-
type APIKeyMetadata struct {
39-
ID string
40-
Metadata Metadata
41-
}
42-
43-
// Read gathers APIKeyMetadata from Elasticsearch using the given client.
44-
func Read(ctx context.Context, client *elasticsearch.Client, id string) (*APIKeyMetadata, error) {
45-
opts := []func(*esapi.SecurityGetAPIKeyRequest){
46-
client.Security.GetAPIKey.WithContext(ctx),
47-
client.Security.GetAPIKey.WithID(id),
48-
}
49-
50-
res, err := client.Security.GetAPIKey(
51-
opts...,
52-
)
53-
if err != nil {
54-
return nil, fmt.Errorf("request to elasticsearch failed: %w", err)
55-
}
56-
defer res.Body.Close()
57-
58-
if res.IsError() {
59-
return nil, fmt.Errorf("%s: %w", res.String(), ErrAPIKeyNotFound)
60-
}
61-
62-
type APIKeyResponse struct {
63-
ID string `json:"id"`
64-
Metadata Metadata `json:"metadata"`
65-
}
66-
type GetAPIKeyResponse struct {
67-
APIKeys []APIKeyResponse `json:"api_keys"`
68-
}
69-
70-
var buff bytes.Buffer
71-
if _, err := buff.ReadFrom(res.Body); err != nil {
72-
return nil, fmt.Errorf("could not read from response body: %w", err)
73-
}
74-
75-
var resp GetAPIKeyResponse
76-
if err = json.Unmarshal(buff.Bytes(), &resp); err != nil {
77-
return nil, fmt.Errorf(
78-
"could not Unmarshal elasticsearch GetAPIKeyResponse: %w", err)
79-
}
80-
81-
if len(resp.APIKeys) == 0 {
82-
return nil, ErrAPIKeyNotFound
83-
}
84-
85-
first := resp.APIKeys[0]
86-
return &APIKeyMetadata{
87-
ID: first.ID,
88-
Metadata: first.Metadata,
89-
}, nil
90-
}
91-
9231
// APIKey is used to represent an Elasticsearch API Key.
9332
type APIKey struct {
9433
ID string

0 commit comments

Comments
 (0)