Skip to content

Commit

Permalink
Add Account cache (#1519)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurb9 authored Oct 14, 2020
1 parent 779bb5f commit c7ddfa6
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 111 deletions.
78 changes: 58 additions & 20 deletions config/stored_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,31 @@ const (
)

// Section returns the config section this type is defined in
func (sr *StoredRequests) Section() string {
func (dataType DataType) Section() string {
return map[DataType]string{
RequestDataType: "stored_requests",
CategoryDataType: "categories",
VideoDataType: "stored_video_req",
AMPRequestDataType: "stored_amp_req",
AccountDataType: "accounts",
}[sr.dataType]
}[dataType]
}

// Section returns the config section
func (sr *StoredRequests) Section() string {
return sr.dataType.Section()
}

// DataType returns the DataType associated with this config
func (sr *StoredRequests) DataType() DataType {
return sr.dataType
}

// SetDataType sets the DataType on this config. Needed for tests.
func (sr *StoredRequests) SetDataType(dataType DataType) {
sr.dataType = dataType
}

// StoredRequests struct defines options for stored requests for each data type
// including some amp stored_requests options
type StoredRequests struct {
Expand Down Expand Up @@ -132,7 +143,7 @@ func (cfg *StoredRequests) validate(errs configErrors) configErrors {
if cfg.DataType() == AccountDataType && cfg.Postgres.ConnectionInfo.Database != "" {
errs = append(errs, fmt.Errorf("%s.postgres: retrieving accounts via postgres not available, use accounts.files", cfg.Section()))
} else {
errs = cfg.Postgres.validate(cfg.Section(), errs)
errs = cfg.Postgres.validate(cfg.DataType(), errs)
}

// Categories do not use cache so none of the following checks apply
Expand All @@ -156,7 +167,7 @@ func (cfg *StoredRequests) validate(errs configErrors) configErrors {
errs = append(errs, fmt.Errorf("%s: postgres.initialize_caches.query must be empty if in_memory_cache=none", cfg.Section()))
}
}
errs = cfg.InMemoryCache.validate(cfg.Section(), errs)
errs = cfg.InMemoryCache.validate(cfg.DataType(), errs)
return errs
}

Expand All @@ -169,12 +180,12 @@ type PostgresConfig struct {
PollUpdates PostgresUpdatePolling `mapstructure:"poll_for_updates"`
}

func (cfg *PostgresConfig) validate(section string, errs configErrors) configErrors {
func (cfg *PostgresConfig) validate(dataType DataType, errs configErrors) configErrors {
if cfg.ConnectionInfo.Database == "" {
return errs
}

return cfg.PollUpdates.validate(section, errs)
return cfg.PollUpdates.validate(dataType, errs)
}

// PostgresConnection has options which put types to the Postgres Connection string. See:
Expand Down Expand Up @@ -269,7 +280,8 @@ type PostgresCacheInitializer struct {
AmpQuery string `mapstructure:"amp_query"`
}

func (cfg *PostgresCacheInitializer) validate(section string, errs configErrors) configErrors {
func (cfg *PostgresCacheInitializer) validate(dataType DataType, errs configErrors) configErrors {
section := dataType.Section()
if cfg.Query == "" {
return errs
}
Expand Down Expand Up @@ -305,7 +317,8 @@ type PostgresUpdatePolling struct {
AmpQuery string `mapstructure:"amp_query"`
}

func (cfg *PostgresUpdatePolling) validate(section string, errs configErrors) configErrors {
func (cfg *PostgresUpdatePolling) validate(dataType DataType, errs configErrors) configErrors {
section := dataType.Section()
if cfg.Query == "" {
return errs
}
Expand Down Expand Up @@ -384,32 +397,57 @@ type InMemoryCache struct {
// TTL is the maximum number of seconds that an unused value will stay in the cache.
// TTL <= 0 can be used for "no ttl". Elements will still be evicted based on the Size.
TTL int `mapstructure:"ttl_seconds"`
// Size is the max total cache size allowed for single caches
Size int `mapstructure:"size_bytes"`
// RequestCacheSize is the max number of bytes allowed in the cache for Stored Requests. Values <= 0 will have no limit
RequestCacheSize int `mapstructure:"request_cache_size_bytes"`
// ImpCacheSize is the max number of bytes allowed in the cache for Stored Imps. Values <= 0 will have no limit
ImpCacheSize int `mapstructure:"imp_cache_size_bytes"`
}

func (cfg *InMemoryCache) validate(section string, errs configErrors) configErrors {
func (cfg *InMemoryCache) validate(dataType DataType, errs configErrors) configErrors {
section := dataType.Section()
switch cfg.Type {
case "none":
// No errors for no config options
case "unbounded":
if cfg.TTL != 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache must be 0 for unbounded caches. Got %d", section, cfg.TTL))
errs = append(errs, fmt.Errorf("%s: in_memory_cache.ttl_seconds is not supported for unbounded caches. Got %d", section, cfg.TTL))
}
if cfg.RequestCacheSize != 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache.request_cache_size_bytes must be 0 for unbounded caches. Got %d", section, cfg.RequestCacheSize))
}
if cfg.ImpCacheSize != 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache.imp_cache_size_bytes must be 0 for unbounded caches. Got %d", section, cfg.ImpCacheSize))
if dataType == AccountDataType {
// single cache
if cfg.Size != 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache.size_bytes is not supported for unbounded caches. Got %d", section, cfg.Size))
}
} else {
// dual (request and imp) caches
if cfg.RequestCacheSize != 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache.request_cache_size_bytes is not supported for unbounded caches. Got %d", section, cfg.RequestCacheSize))
}
if cfg.ImpCacheSize != 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache.imp_cache_size_bytes is not supported for unbounded caches. Got %d", section, cfg.ImpCacheSize))
}
}
case "lru":
if cfg.RequestCacheSize <= 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache.request_cache_size_bytes must be >= 0 when in_memory_cache.type=lru. Got %d", section, cfg.RequestCacheSize))
}
if cfg.ImpCacheSize <= 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache.imp_cache_size_bytes must be >= 0 when in_memory_cache.type=lru. Got %d", section, cfg.ImpCacheSize))
if dataType == AccountDataType {
// single cache
if cfg.Size <= 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache.size_bytes must be >= 0 when in_memory_cache.type=lru. Got %d", section, cfg.Size))
}
if cfg.RequestCacheSize > 0 || cfg.ImpCacheSize > 0 {
glog.Warningf("%s: in_memory_cache.request_cache_size_bytes and imp_cache_size_bytes do not apply to this section and will be ignored", section)
}
} else {
// dual (request and imp) caches
if cfg.RequestCacheSize <= 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache.request_cache_size_bytes must be >= 0 when in_memory_cache.type=lru. Got %d", section, cfg.RequestCacheSize))
}
if cfg.ImpCacheSize <= 0 {
errs = append(errs, fmt.Errorf("%s: in_memory_cache.imp_cache_size_bytes must be >= 0 when in_memory_cache.type=lru. Got %d", section, cfg.ImpCacheSize))
}
if cfg.Size > 0 {
glog.Warningf("%s: in_memory_cache.size_bytes does not apply in this section and will be ignored", section)
}
}
default:
errs = append(errs, fmt.Errorf("%s: in_memory_cache.type %s is invalid", section, cfg.Type))
Expand Down
60 changes: 50 additions & 10 deletions config/stored_requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,43 +75,83 @@ func TestPostgressConnString(t *testing.T) {
assertHasValue(t, params, "sslmode", "disable")
}

func TestInMemoryCacheValidation(t *testing.T) {
func TestInMemoryCacheValidationStoredRequests(t *testing.T) {
assertNoErrs(t, (&InMemoryCache{
Type: "unbounded",
}).validate("Test", nil))
}).validate(RequestDataType, nil))
assertNoErrs(t, (&InMemoryCache{
Type: "none",
}).validate("Test", nil))
}).validate(RequestDataType, nil))
assertNoErrs(t, (&InMemoryCache{
Type: "lru",
RequestCacheSize: 1000,
ImpCacheSize: 1000,
}).validate("Test", nil))
}).validate(RequestDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "unrecognized",
}).validate("Test", nil))
}).validate(RequestDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "unbounded",
ImpCacheSize: 1000,
}).validate("Test", nil))
}).validate(RequestDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "unbounded",
RequestCacheSize: 1000,
}).validate("Test", nil))
}).validate(RequestDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "unbounded",
TTL: 500,
}).validate("Test", nil))
}).validate(RequestDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "lru",
RequestCacheSize: 0,
ImpCacheSize: 1000,
}).validate("Test", nil))
}).validate(RequestDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "lru",
RequestCacheSize: 1000,
ImpCacheSize: 0,
}).validate("Test", nil))
}).validate(RequestDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "lru",
Size: 1000,
}).validate(RequestDataType, nil))
}

func TestInMemoryCacheValidationSingleCache(t *testing.T) {
assertNoErrs(t, (&InMemoryCache{
Type: "unbounded",
}).validate(AccountDataType, nil))
assertNoErrs(t, (&InMemoryCache{
Type: "none",
}).validate(AccountDataType, nil))
assertNoErrs(t, (&InMemoryCache{
Type: "lru",
Size: 1000,
}).validate(AccountDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "unrecognized",
}).validate(AccountDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "unbounded",
Size: 1000,
}).validate(AccountDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "unbounded",
TTL: 500,
}).validate(AccountDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "lru",
Size: 0,
}).validate(AccountDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "lru",
RequestCacheSize: 1000,
}).validate(AccountDataType, nil))
assertErrsExist(t, (&InMemoryCache{
Type: "lru",
ImpCacheSize: 1000,
}).validate(AccountDataType, nil))
}

func assertErrsExist(t *testing.T, err configErrors) {
Expand Down
3 changes: 2 additions & 1 deletion stored_requests/caches/memory/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
// For no TTL, use ttlSeconds <= 0
func NewCache(size int, ttl int, dataType string) stored_requests.CacheJSON {
if ttl > 0 && size <= 0 {
glog.Fatalf("No in-memory %s caches defined with a finite TTL but unbounded size. Config validation should have caught this. Failing fast because something is buggy.", dataType)
// a positive ttl indicates "LRU" cache type, while unlimited size indicates an "unbounded" cache type
glog.Fatalf("unbounded in-memory %s cache with TTL not allowed. Config validation should have caught this. Failing fast because something is buggy.", dataType)
}
if size > 0 {
glog.Infof("Using a Stored %s in-memory cache. Max size: %d bytes. TTL: %d seconds.", dataType, size, ttl)
Expand Down
18 changes: 10 additions & 8 deletions stored_requests/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,17 @@ func newFetcher(cfg *config.StoredRequests, client *http.Client, db *sql.DB) (fe
}

func newCache(cfg *config.StoredRequests) stored_requests.Cache {
if cfg.InMemoryCache.Type == "none" {
glog.Infof("No Stored %s cache configured. The %s Fetcher backend will be used for all data requests", cfg.DataType(), cfg.DataType())
return stored_requests.Cache{&nil_cache.NilCache{}, &nil_cache.NilCache{}}
}

return stored_requests.Cache{
Requests: memory.NewCache(cfg.InMemoryCache.RequestCacheSize, cfg.InMemoryCache.TTL, "Requests"),
Imps: memory.NewCache(cfg.InMemoryCache.ImpCacheSize, cfg.InMemoryCache.TTL, "Imps"),
cache := stored_requests.Cache{&nil_cache.NilCache{}, &nil_cache.NilCache{}, &nil_cache.NilCache{}}
switch {
case cfg.InMemoryCache.Type == "none":
glog.Warningf("No %s cache configured. The %s Fetcher backend will be used for all data requests", cfg.DataType(), cfg.DataType())
case cfg.DataType() == config.AccountDataType:
cache.Accounts = memory.NewCache(cfg.InMemoryCache.Size, cfg.InMemoryCache.TTL, "Accounts")
default:
cache.Requests = memory.NewCache(cfg.InMemoryCache.RequestCacheSize, cfg.InMemoryCache.TTL, "Requests")
cache.Imps = memory.NewCache(cfg.InMemoryCache.ImpCacheSize, cfg.InMemoryCache.TTL, "Imps")
}
return cache
}

func newEventProducers(cfg *config.StoredRequests, client *http.Client, db *sql.DB, router *httprouter.Router) (eventProducers []events.EventProducer) {
Expand Down
Loading

0 comments on commit c7ddfa6

Please sign in to comment.