Skip to content

Commit

Permalink
close RC CDN db resource
Browse files Browse the repository at this point in the history
  • Loading branch information
timothyalexandersoftware committed Sep 11, 2024
1 parent 41131b7 commit 286e008
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 47 deletions.
64 changes: 56 additions & 8 deletions cmd/installer/subcommands/installer/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,13 @@ func installCommand() *cobra.Command {
if err != nil {
return err
}
defer func() { i.Stop(err) }()
defer func() {
i.Stop(err)
err = i.Installer.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to close Installer: %v\n", err)
}
}()
i.span.SetTag("params.url", args[0])
return i.Install(i.ctx, args[0], installArgs)
},
Expand All @@ -248,7 +254,13 @@ func removeCommand() *cobra.Command {
if err != nil {
return err
}
defer func() { i.Stop(err) }()
defer func() {
i.Stop(err)
err = i.Installer.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to close Installer: %v\n", err)
}
}()
i.span.SetTag("params.package", args[0])
return i.Remove(i.ctx, args[0])
},
Expand All @@ -267,7 +279,13 @@ func purgeCommand() *cobra.Command {
if err != nil {
return err
}
defer func() { i.Stop(err) }()
defer func() {
i.Stop(err)
err = i.Installer.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to close Installer: %v\n", err)
}
}()
i.Purge(i.ctx)
return nil
},
Expand All @@ -286,7 +304,13 @@ func installExperimentCommand() *cobra.Command {
if err != nil {
return err
}
defer func() { i.Stop(err) }()
defer func() {
i.Stop(err)
err = i.Installer.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to close Installer: %v\n", err)
}
}()
i.span.SetTag("params.url", args[0])
return i.InstallExperiment(i.ctx, args[0])
},
Expand All @@ -305,7 +329,13 @@ func removeExperimentCommand() *cobra.Command {
if err != nil {
return err
}
defer func() { i.Stop(err) }()
defer func() {
i.Stop(err)
err = i.Installer.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to close Installer: %v\n", err)
}
}()
i.span.SetTag("params.package", args[0])
return i.RemoveExperiment(i.ctx, args[0])
},
Expand All @@ -324,7 +354,13 @@ func promoteExperimentCommand() *cobra.Command {
if err != nil {
return err
}
defer func() { i.Stop(err) }()
defer func() {
i.Stop(err)
err = i.Installer.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to close Installer: %v\n", err)
}
}()
i.span.SetTag("params.package", args[0])
return i.PromoteExperiment(i.ctx, args[0])
},
Expand All @@ -343,7 +379,13 @@ func garbageCollectCommand() *cobra.Command {
if err != nil {
return err
}
defer func() { i.Stop(err) }()
defer func() {
i.Stop(err)
err = i.Installer.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to close Installer: %v\n", err)
}
}()
return i.GarbageCollect(i.ctx)
},
}
Expand All @@ -366,7 +408,13 @@ func isInstalledCommand() *cobra.Command {
if err != nil {
return err
}
defer func() { i.Stop(err) }()
defer func() {
i.Stop(err)
err = i.Installer.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to close Installer: %v\n", err)
}
}()
installed, err := i.IsInstalled(i.ctx, args[0])
if err != nil {
return err
Expand Down
88 changes: 49 additions & 39 deletions pkg/config/remote/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,10 @@ type Service struct {
rcType string

uptane uptaneClient
db *bbolt.DB
}

// CoreAgentService fetches Remote Configurations from the RC backend
type CoreAgentService struct {
Service
firstUpdate bool
Expand All @@ -113,7 +115,6 @@ type CoreAgentService struct {
hostname string
tagsGetter func() []string
traceAgentEnv string
db *bbolt.DB
coreAgentUptane coreAgentUptaneClient
api api.API

Expand Down Expand Up @@ -374,6 +375,7 @@ func NewService(cfg model.Reader, rcType, baseRawURL, hostname string, tagsGette
Service: Service{
rcType: rcType,
uptane: uptaneClient,
db: db,
},
firstUpdate: true,
defaultRefreshInterval: options.refresh,
Expand All @@ -386,7 +388,6 @@ func NewService(cfg model.Reader, rcType, baseRawURL, hostname string, tagsGette
tagsGetter: tagsGetter,
clock: clock,
traceAgentEnv: options.traceAgentEnv,
db: db,
api: http,
coreAgentUptane: uptaneClient,
clients: newClients(clock, options.clientTTL),
Expand Down Expand Up @@ -950,7 +951,7 @@ func enforceCanonicalJSON(raw []byte) ([]byte, error) {
return canonical, nil
}

// HTTPClient defines a client that can be used to fetch Remote Configurations from an HTTP(s)-based backend
// HTTPClient fetches Remote Configurations from an HTTP(s)-based backend
type HTTPClient struct {
Service
lastUpdate time.Time
Expand All @@ -959,13 +960,15 @@ type HTTPClient struct {
}

// NewHTTPClient creates a new HTTPClient that can be used to fetch Remote Configurations from an HTTP(s)-based backend
// It uses a local db to cache the fetched configurations. Only one HTTPClient should be created per agent.
// An HTTPClient must be closed via HTTPClient.Close() before creating a new one.
func NewHTTPClient(cfg model.Reader, baseRawURL, host, site, apiKey, rcKey, agentVersion string) (*HTTPClient, error) {

dbPath := path.Join(cfg.GetString("run_path"), "remote-config-cdn.db")
db, err := openCacheDB(dbPath, agentVersion, apiKey)
if err != nil {
return nil, err
}

uptaneClientOptions := []uptane.ClientOption{
uptane.WithConfigRootOverride(site, ""),
uptane.WithDirectorRootOverride(site, ""),
Expand Down Expand Up @@ -998,86 +1001,93 @@ func NewHTTPClient(cfg model.Reader, baseRawURL, host, site, apiKey, rcKey, agen
Service: Service{
rcType: "CDN",
uptane: uptaneHTTPClient,
db: db,
},
api: http,
cdnUptane: uptaneHTTPClient,
}, nil
}

func (s *HTTPClient) update() error {
s.Lock()
defer s.Unlock()

err := s.cdnUptane.Update()
if err != nil {
return err
}

return nil
}

func (s *HTTPClient) shouldUpdate() bool {
s.Lock()
defer s.Unlock()
if time.Since(s.lastUpdate) > maxCDNUpdateFrequency {
s.lastUpdate = time.Now()
return true
}
return false
// Close closes the HTTPClient and cleans up any resources. Close must be called
// before any other HTTPClients are instantiated via NewHTTPClient
func (c *HTTPClient) Close() error {
return c.db.Close()
}

// GetCDNConfigUpdate returns any updated configs. If multiple requests have been made
// in a short amount of time, a cached response is returned. If RC has been disabled,
// an error is returned. If there is no update (the targets version is up-to-date) nil
// is returned for both the update and error.
func (s *HTTPClient) GetCDNConfigUpdate(
func (c *HTTPClient) GetCDNConfigUpdate(
products []string,
currentTargetsVersion, currentRootVersion uint64,
cachedTargetFiles []*pbgo.TargetFileMeta,
) (*state.Update, error) {

if !s.shouldUpdate() {
return s.getUpdate(products, currentTargetsVersion, currentRootVersion, cachedTargetFiles)
if !c.shouldUpdate() {
return c.getUpdate(products, currentTargetsVersion, currentRootVersion, cachedTargetFiles)
}

// check org status in the backend. If RC is disabled, return current state.
response, err := s.api.FetchOrgStatus(context.Background())
response, err := c.api.FetchOrgStatus(context.Background())
if err != nil || !response.Enabled || !response.Authorized {
return s.getUpdate(products, currentTargetsVersion, currentRootVersion, cachedTargetFiles)
return c.getUpdate(products, currentTargetsVersion, currentRootVersion, cachedTargetFiles)
}

err = s.update()
err = c.update()
if err != nil {
_ = log.Warn(fmt.Sprintf("Error updating CDN config repo: %v", err))
}

return s.getUpdate(products, currentTargetsVersion, currentRootVersion, cachedTargetFiles)
return c.getUpdate(products, currentTargetsVersion, currentRootVersion, cachedTargetFiles)
}

func (s *HTTPClient) getUpdate(
func (c *HTTPClient) update() error {
c.Lock()
defer c.Unlock()

err := c.cdnUptane.Update()
if err != nil {
return err
}

return nil
}

func (c *HTTPClient) shouldUpdate() bool {
c.Lock()
defer c.Unlock()
if time.Since(c.lastUpdate) > maxCDNUpdateFrequency {
c.lastUpdate = time.Now()
return true
}
return false
}

func (c *HTTPClient) getUpdate(
products []string,
currentTargetsVersion, currentRootVersion uint64,
cachedTargetFiles []*pbgo.TargetFileMeta,
) (*state.Update, error) {
s.Lock()
defer s.Unlock()
c.Lock()
defer c.Unlock()

tufVersions, err := s.uptane.TUFVersionState()
tufVersions, err := c.uptane.TUFVersionState()
if err != nil {
return nil, err
}
if tufVersions.DirectorTargets == currentTargetsVersion {
return nil, nil
}
roots, err := s.getNewDirectorRoots(currentRootVersion, tufVersions.DirectorRoot)
roots, err := c.getNewDirectorRoots(currentRootVersion, tufVersions.DirectorRoot)
if err != nil {
return nil, err
}
targetsRaw, err := s.uptane.TargetsMeta()
targetsRaw, err := c.uptane.TargetsMeta()
if err != nil {
return nil, err
}
targetFiles, err := s.getTargetFiles(rdata.StringListToProduct(products), cachedTargetFiles)
targetFiles, err := c.getTargetFiles(rdata.StringListToProduct(products), cachedTargetFiles)
if err != nil {
return nil, err
}
Expand All @@ -1087,7 +1097,7 @@ func (s *HTTPClient) getUpdate(
return nil, err
}

directorTargets, err := s.uptane.Targets()
directorTargets, err := c.uptane.Targets()
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/remote/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,7 @@ func TestHTTPClientRecentUpdate(t *testing.T) {
uptaneClient.On("TargetFile", "datadog/2/TESTING1/id/1").Return([]byte(`testing_1`), nil)

client := setupCDNClient(t, uptaneClient, api)
defer client.Close()
client.lastUpdate = time.Now()

u, err := client.GetCDNConfigUpdate([]string{"TESTING1"}, 0, 0, []*pbgo.TargetFileMeta{})
Expand Down Expand Up @@ -1275,6 +1276,7 @@ func TestHTTPClientNegativeOrgStatus(t *testing.T) {
uptaneClient.On("TargetFile", "datadog/2/TESTING1/id/1").Return([]byte(`testing_1`), nil)

client := setupCDNClient(t, uptaneClient, api)
defer client.Close()
client.lastUpdate = time.Now().Add(time.Second * -60)

u, err := client.GetCDNConfigUpdate([]string{"TESTING1"}, 0, 0, []*pbgo.TargetFileMeta{})
Expand Down Expand Up @@ -1334,6 +1336,7 @@ func TestHTTPClientUpdateSuccess(t *testing.T) {
uptaneClient.On("Update").Return(updateErr)

client := setupCDNClient(t, uptaneClient, api)
defer client.Close()
client.lastUpdate = time.Now().Add(time.Second * -60)

u, err := client.GetCDNConfigUpdate([]string{"TESTING1"}, 0, 0, []*pbgo.TargetFileMeta{})
Expand Down
1 change: 1 addition & 0 deletions pkg/fleet/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func (d *daemonImpl) Stop(_ context.Context) error {
defer d.m.Unlock()
d.rc.Close()
close(d.stopChan)
d.cdn.Close()
d.requestsWG.Wait()
return nil
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/fleet/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

configmock "github.com/DataDog/datadog-agent/pkg/config/mock"
"github.com/DataDog/datadog-agent/pkg/fleet/env"
"github.com/DataDog/datadog-agent/pkg/fleet/installer/repository"
pbgo "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
Expand Down Expand Up @@ -100,6 +101,11 @@ func (m *testPackageManager) UninstrumentAPMInjector(ctx context.Context, method
return args.Error(0)
}

func (m *testPackageManager) Close() error {
args := m.Called()
return args.Error(0)
}

type testRemoteConfigClient struct {
sync.Mutex
t *testing.T
Expand Down Expand Up @@ -181,6 +187,9 @@ type testInstaller struct {
}

func newTestInstaller(t *testing.T) *testInstaller {
mockConfig := configmock.New(t)
mockConfig.SetWithoutSource("run_path", t.TempDir())

pm := &testPackageManager{}
pm.On("States").Return(map[string]repository.State{}, nil)
pm.On("ConfigStates").Return(map[string]repository.State{}, nil)
Expand Down
Loading

0 comments on commit 286e008

Please sign in to comment.