Skip to content

feat(datafile-cache): Add support for datafile cache service. #340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/optimizely/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func loadConfig(v *viper.Viper) *config.AgentConfig {
conf.Client.UserProfileService = userProfileService
}

// Check if JSON string was set using OPTIMIZELY_CLIENT_DATAFILECACHESERVICE environment variable
if datafileCacheService := v.GetStringMap("client.datafilecacheservice"); datafileCacheService != nil {
conf.Client.DatafileCacheService = datafileCacheService
}

return conf
}

Expand Down
71 changes: 45 additions & 26 deletions cmd/optimizely/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,37 +56,44 @@ func assertServer(t *testing.T, actual config.ServerConfig, assertPlugins bool)
}
}

func assertClient(t *testing.T, actual config.ClientConfig, assertUserProfileService bool) {
func assertClient(t *testing.T, actual config.ClientConfig) {
assert.Equal(t, 10*time.Second, actual.PollingInterval)
assert.Equal(t, 1, actual.BatchSize)
assert.Equal(t, 10, actual.QueueSize)
assert.Equal(t, 1*time.Minute, actual.FlushInterval)
assert.Equal(t, "https://localhost/v1/%s.json", actual.DatafileURLTemplate)
assert.Equal(t, "https://logx.localhost.com/v1", actual.EventURL)
assert.Equal(t, "custom-regex", actual.SdkKeyRegex)
if assertUserProfileService {
assert.Equal(t, "in-memory", actual.UserProfileService["default"])
userProfileServices := map[string]interface{}{
"in-memory": map[string]interface{}{
// Viper.set is case in-sensitive
"storagestrategy": "fifo",
},
"redis": map[string]interface{}{
"host": "localhost:6379",
"password": "",
},
"rest": map[string]interface{}{
"host": "http://localhost",
"lookuppath": "/ups/lookup",
"savepath": "/ups/save",
"headers": map[string]interface{}{"content-type": "application/json"},
},
"custom": map[string]interface{}{
"path": "http://test2.com",
},
}
assert.Equal(t, userProfileServices, actual.UserProfileService["services"])
assert.Equal(t, "in-memory", actual.UserProfileService["default"])
userProfileServices := map[string]interface{}{
"in-memory": map[string]interface{}{
// Viper.set is case in-sensitive
"storagestrategy": "fifo",
},
"redis": map[string]interface{}{
"host": "localhost:6379",
"password": "",
},
"rest": map[string]interface{}{
"host": "http://localhost",
"lookuppath": "/ups/lookup",
"savepath": "/ups/save",
"headers": map[string]interface{}{"content-type": "application/json"},
},
"custom": map[string]interface{}{
"path": "http://test2.com",
},
}
assert.Equal(t, userProfileServices, actual.UserProfileService["services"])

datafileCacheService := map[string]interface{}{
"redis": map[string]interface{}{
"host": "localhost:6379",
"password": "123",
},
}
assert.Equal(t, datafileCacheService, actual.DatafileCacheService["services"])
assert.Equal(t, true, actual.DatafileCacheService["enabled"])
}

func assertLog(t *testing.T, actual config.LogConfig) {
Expand Down Expand Up @@ -164,7 +171,7 @@ func TestViperYaml(t *testing.T) {

assertRoot(t, actual)
assertServer(t, actual.Server, true)
assertClient(t, actual.Client, true)
assertClient(t, actual.Client)
assertLog(t, actual.Log)
assertAdmin(t, actual.Admin)
assertAdminAuth(t, actual.Admin.Auth)
Expand Down Expand Up @@ -225,6 +232,17 @@ func TestViperProps(t *testing.T) {
}
v.Set("client.userProfileService", userProfileServices)

datafileCacheService := map[string]interface{}{
"enabled": true,
"services": map[string]interface{}{
"redis": map[string]interface{}{
"host": "localhost:6379",
"password": "123",
},
},
}
v.Set("client.datafileCacheService", datafileCacheService)

v.Set("log.pretty", true)
v.Set("log.includeSdkKey", false)
v.Set("log.level", "debug")
Expand Down Expand Up @@ -277,7 +295,7 @@ func TestViperProps(t *testing.T) {

assertRoot(t, actual)
assertServer(t, actual.Server, true)
assertClient(t, actual.Client, true)
assertClient(t, actual.Client)
assertLog(t, actual.Log)
assertAdmin(t, actual.Admin)
assertAdminAuth(t, actual.Admin.Auth)
Expand Down Expand Up @@ -311,6 +329,7 @@ func TestViperEnv(t *testing.T) {
_ = os.Setenv("OPTIMIZELY_CLIENT_EVENTURL", "https://logx.localhost.com/v1")
_ = os.Setenv("OPTIMIZELY_CLIENT_SDKKEYREGEX", "custom-regex")
_ = os.Setenv("OPTIMIZELY_CLIENT_USERPROFILESERVICE", `{"default":"in-memory","services":{"in-memory":{"storagestrategy":"fifo"},"redis":{"host":"localhost:6379","password":""},"rest":{"host":"http://localhost","lookuppath":"/ups/lookup","savepath":"/ups/save","headers":{"content-type":"application/json"}},"custom":{"path":"http://test2.com"}}}`)
_ = os.Setenv("OPTIMIZELY_CLIENT_DATAFILECACHESERVICE", `{"enabled":true,"services":{"redis":{"host":"localhost:6379","password":"123"}}}`)

_ = os.Setenv("OPTIMIZELY_LOG_PRETTY", "true")
_ = os.Setenv("OPTIMIZELY_LOG_INCLUDESDKKEY", "false")
Expand Down Expand Up @@ -340,7 +359,7 @@ func TestViperEnv(t *testing.T) {

assertRoot(t, actual)
assertServer(t, actual.Server, false)
assertClient(t, actual.Client, true)
assertClient(t, actual.Client)
assertLog(t, actual.Log)
assertAdmin(t, actual.Admin)
assertAPI(t, actual.API)
Expand Down
6 changes: 6 additions & 0 deletions cmd/optimizely/testdata/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ client:
datafileURLTemplate: "https://localhost/v1/%s.json"
eventURL: "https://logx.localhost.com/v1"
sdkKeyRegex: "custom-regex"
datafileCacheService:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datafileCacheServices will be more relevant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to keep it similar to the naming convention we are following for userProfileService.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add one more property enableDatafileCacheService and based on that, execute the logic.

enabled: true
services:
redis:
host: "localhost:6379"
password: "123"
userProfileService:
default: "in-memory"
services:
Expand Down
8 changes: 8 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ client:
flushInterval: 30s
## Template URL for SDK datafile location. The template should specify a "%s" token for SDK key substitution.
datafileURLTemplate: "https://cdn.optimizely.com/datafiles/%s.json"
## Datafile cache to save initial datafile fetch call in a multi-node environment.
datafileCacheService:
enabled: false
services:
# redis:
# host: "localhost:6379"
# password: ""
# database: 0
## URL for dispatching events.
eventURL: "https://logx.optimizely.com/v1/events"
## Validation Regex on the request SDK Key
Expand Down
24 changes: 16 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func NewDefaultConfig() *AgentConfig {
EventURL: "https://logx.optimizely.com/v1/events",
// https://github.com/google/re2/wiki/Syntax
SdkKeyRegex: "^\\w+(:\\w+)?$",
DatafileCacheService: DatafileCache{
"enabled": false,
"services": map[string]interface{}{},
},
UserProfileService: UserProfileServiceConfigs{
"default": "",
"services": map[string]interface{}{},
Expand Down Expand Up @@ -153,16 +157,20 @@ func (ac *AgentConfig) LogConfigWarnings() {
// UserProfileServiceConfigs defines the generic mapping of userprofileservice plugins
type UserProfileServiceConfigs map[string]interface{}

// DatafileCache holds the configuration options for the Datafile Cache.
type DatafileCache map[string]interface{}

// ClientConfig holds the configuration options for the Optimizely Client.
type ClientConfig struct {
PollingInterval time.Duration `json:"pollingInterval"`
BatchSize int `json:"batchSize" default:"10"`
QueueSize int `json:"queueSize" default:"1000"`
FlushInterval time.Duration `json:"flushInterval" default:"30s"`
DatafileURLTemplate string `json:"datafileURLTemplate"`
EventURL string `json:"eventURL"`
SdkKeyRegex string `json:"sdkKeyRegex"`
UserProfileService UserProfileServiceConfigs `json:"userProfileService"`
PollingInterval time.Duration `json:"pollingInterval"`
BatchSize int `json:"batchSize" default:"10"`
QueueSize int `json:"queueSize" default:"1000"`
FlushInterval time.Duration `json:"flushInterval" default:"30s"`
DatafileURLTemplate string `json:"datafileURLTemplate"`
DatafileCacheService DatafileCache `json:"datafileCacheService"`
EventURL string `json:"eventURL"`
SdkKeyRegex string `json:"sdkKeyRegex"`
UserProfileService UserProfileServiceConfigs `json:"userProfileService"`
}

// LogConfig holds the log configuration
Expand Down
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func TestDefaultConfig(t *testing.T) {
assert.Equal(t, "^\\w+(:\\w+)?$", conf.Client.SdkKeyRegex)
assert.Equal(t, "", conf.Client.UserProfileService["default"])
assert.Equal(t, map[string]interface{}{}, conf.Client.UserProfileService["services"])
assert.Equal(t, DatafileCache{
"enabled": false,
"services": map[string]interface{}{},
}, conf.Client.DatafileCacheService)

assert.Equal(t, 0, conf.Runtime.BlockProfileRate)
assert.Equal(t, 0, conf.Runtime.MutexProfileFraction)
Expand Down
80 changes: 67 additions & 13 deletions pkg/optimizely/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"

"github.com/optimizely/agent/config"
"github.com/optimizely/agent/pkg/optimizely/datafilecacheservice"
"github.com/optimizely/agent/plugins/userprofileservice"
"github.com/optimizely/go-sdk/pkg/client"
sdkconfig "github.com/optimizely/go-sdk/pkg/config"
Expand All @@ -46,6 +47,14 @@ type OptlyCache struct {
wg sync.WaitGroup
}

type datafileCacheServiceKey = string

// Represents keys in datafile cache services
const (
enabled datafileCacheServiceKey = "enabled"
redis datafileCacheServiceKey = "redis"
)

// NewCache returns a new implementation of OptlyCache interface backed by a concurrent map.
func NewCache(ctx context.Context, conf config.ClientConfig, metricsRegistry *MetricsRegistry) *OptlyCache {

Expand All @@ -58,7 +67,7 @@ func NewCache(ctx context.Context, conf config.ClientConfig, metricsRegistry *Me
cache := &OptlyCache{
ctx: ctx,
wg: sync.WaitGroup{},
loader: defaultLoader(conf, metricsRegistry, userProfileServiceMap, cmLoader, event.NewBatchEventProcessor),
loader: defaultLoader(ctx, conf, metricsRegistry, userProfileServiceMap, cmLoader, event.NewBatchEventProcessor),
optlyMap: cmap.New(),
userProfileServiceMap: userProfileServiceMap,
}
Expand Down Expand Up @@ -147,6 +156,7 @@ func regexValidator(sdkKeyRegex string) func(string) bool {
}

func defaultLoader(
ctx context.Context,
conf config.ClientConfig,
metricsRegistry *MetricsRegistry,
userProfileServiceMap cmap.ConcurrentMap,
Expand Down Expand Up @@ -186,25 +196,39 @@ func defaultLoader(
log.Info().Msg(message)
}

// Options for PollingProjectConfigManager
options := []sdkconfig.OptionFunc{}

// Check if datafile is already present in cache
cachedDatafile, cacheService := getDatafileFromCacheService(ctx, sdkKey, conf)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a condition of cache is enabled then go into this method.

if cachedDatafile != "" {
// Set datafile in config manager so it uses the cached datafile for initialization
options = append(options, sdkconfig.WithInitialDatafile([]byte(cachedDatafile)))
}

options = append(options,
sdkconfig.WithPollingInterval(conf.PollingInterval),
sdkconfig.WithDatafileURLTemplate(conf.DatafileURLTemplate),
)

if datafileAccessToken != "" {
configManager = pcFactory(
sdkKey,
sdkconfig.WithPollingInterval(conf.PollingInterval),
sdkconfig.WithDatafileURLTemplate(conf.DatafileURLTemplate),
options = append(options,
sdkconfig.WithDatafileAccessToken(datafileAccessToken),
)
} else {
configManager = pcFactory(
sdkKey,
sdkconfig.WithPollingInterval(conf.PollingInterval),
sdkconfig.WithDatafileURLTemplate(conf.DatafileURLTemplate),
)
}

configManager = pcFactory(sdkKey, options...)

if _, err := configManager.GetConfig(); err != nil {
return &OptlyClient{}, err
}

// Set datafile in datafileCacheService if not present
if cachedDatafile == "" && cacheService != nil {
datafile := configManager.GetOptimizelyConfig().GetDatafile()
cacheService.SetDatafileInCacheService(ctx, sdkKey, datafile)
}

q := event.NewInMemoryQueue(conf.QueueSize)
ep := bpFactory(
event.WithSDKKey(sdkKey),
Expand All @@ -226,7 +250,7 @@ func defaultLoader(
}

var clientUserProfileService decision.UserProfileService
if clientUserProfileService = getUserProfileService(sdkKey, userProfileServiceMap, conf); clientUserProfileService != nil {
if clientUserProfileService = getUserProfileService(ctx, sdkKey, userProfileServiceMap, conf); clientUserProfileService != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to change this method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier, We were giving UPS a different context altogether which might've resulted in UPS requests still running for a little while after agent had shut down. This fix is to pass the original agent context to UPS so that once agents shuts down, UPS terminates all its requests as well.

clientOptions = append(clientOptions, client.WithUserProfileService(clientUserProfileService))
}

Expand All @@ -237,8 +261,34 @@ func defaultLoader(
}
}

func getDatafileFromCacheService(ctx context.Context, sdkKey string, conf config.ClientConfig) (datafile string, cacheService datafilecacheservice.DatafileCacheService) {
// Check whether datafileCacheService should be enabled
if shouldEnable, ok := conf.DatafileCacheService[enabled].(bool); ok && shouldEnable {
if services, ok := conf.DatafileCacheService["services"].(map[string]interface{}); ok {
// In case of multiple cache services provided, use the first valid service
for k, v := range services {
bytes, err := json.Marshal(v)
if err != nil {
continue
}
switch k {
case redis:
var redisDatafileCache datafilecacheservice.RedisCacheService
if err = json.Unmarshal(bytes, &redisDatafileCache); err != nil || redisDatafileCache.Address == "" {
continue
}
return redisDatafileCache.GetDatafileFromCacheService(ctx, sdkKey), &redisDatafileCache
default:
// do nothing
}
}
}
}
return "", nil
}

// Returns the registered userProfileService against the sdkKey
func getUserProfileService(sdkKey string, userProfileServiceMap cmap.ConcurrentMap, conf config.ClientConfig) decision.UserProfileService {
func getUserProfileService(ctx context.Context, sdkKey string, userProfileServiceMap cmap.ConcurrentMap, conf config.ClientConfig) decision.UserProfileService {

intializeUPSWithName := func(upsName string) decision.UserProfileService {
if clientConfigUPSMap, ok := conf.UserProfileService["services"].(map[string]interface{}); ok {
Expand All @@ -256,6 +306,10 @@ func getUserProfileService(sdkKey string, userProfileServiceMap cmap.ConcurrentM
success = false
}
if success {
// Pass context to ups if required, necessary for redis
if ctxUps, ok := upsInstance.(userprofileservice.ContextUserProfileService); ok {
ctxUps.AddContext(ctx)
}
log.Info().Msgf(`UserProfileService of type: "%s" created for sdkKey: "%s"`, upsName, sdkKey)
return upsInstance
}
Expand Down
Loading