Skip to content

Commit

Permalink
Adds cache TTL and routing/caching strategy (per plugin)
Browse files Browse the repository at this point in the history
* Moves caching to the plugin pool (from plugin client)
* Moves routing strategy to the plugin pool (from availableplugins)

Resolves intelsdi-x#601
Partially addresses intelsdi-x#539
  • Loading branch information
jcooklin committed Dec 17, 2015
1 parent 5da30c0 commit f091061
Show file tree
Hide file tree
Showing 22 changed files with 801 additions and 569 deletions.
250 changes: 206 additions & 44 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ import (
"github.com/intelsdi-x/gomit"
"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/client"
"github.com/intelsdi-x/snap/control/routing"
"github.com/intelsdi-x/snap/control/strategy"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/control_event"
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/core/serror"
)

Expand All @@ -51,12 +53,11 @@ const (
)

var (
// ErrPoolNotFound - error message when the plugin pool not found
ErrPoolNotFound = errors.New("plugin pool not found")
// ErrBadKey - error message when a bad key used
ErrBadKey = errors.New("bad key")
// ErrBadType - error message when a bad plugin type used
ErrBadType = errors.New("bad plugin type")
ErrPoolEmpty = errors.New("plugin pool is empty")
ErrBadKey = errors.New("bad key")
ErrBadType = errors.New("bad plugin type")
ErrBadStrategy = errors.New("bad strategy")

// This defines the maximum running instances of a loaded plugin.
// It is initialized at runtime via the cli.
Expand Down Expand Up @@ -307,6 +308,9 @@ type apPool struct {

// The number of subscriptions per running instance
concurrencyCount int

// The routing and caching strategy declared by the plugin.
strategy Strategy
}

func newPool(key string, plugins ...*availablePlugin) (*apPool, error) {
Expand All @@ -327,20 +331,8 @@ func newPool(key string, plugins ...*availablePlugin) (*apPool, error) {

if len(plugins) > 0 {
for _, plg := range plugins {
plg.id = p.generatePID()
p.plugins[plg.id] = plg
}
// Because plugin metadata is a singleton and immutable (in static code)
// it is safe to take the first item. Reloading an identical plugin
// with new metadata is protected by plugin loading.

// Checking if plugin is exclusive
// (only one instance should be running).
if plugins[0].meta.Exclusive {
p.max = 1
p.insert(plg)
}
// set concurrency count
p.concurrencyCount = plugins[0].meta.ConcurrencyCount
}

return p, nil
Expand All @@ -350,17 +342,46 @@ func (p *apPool) insert(ap *availablePlugin) error {
if ap.pluginType != plugin.CollectorPluginType && ap.pluginType != plugin.ProcessorPluginType && ap.pluginType != plugin.PublisherPluginType {
return ErrBadType
}
ap.id = p.generatePID()
p.plugins[ap.id] = ap

// If an empty pool is created, it does not have
// any available plugins from which to retrieve
// concurrency count or exclusivity. We ensure it
// is set correctly on an insert.
if len(p.plugins) == 0 {
if err := p.applyPluginMeta(ap); err != nil {
return err
}
}

ap.id = p.generatePID()
p.plugins[ap.id] = ap

return nil
}

func (p *apPool) applyPluginMeta(ap *availablePlugin) error {
// Checking if plugin is exclusive
// (only one instance should be running).
if ap.meta.Exclusive {
p.max = 1
}

// Set the cache TTL
cacheTTL := strategy.GlobalCacheExpiration
if ap.meta.CacheTTL != 0 {
cacheTTL = ap.meta.CacheTTL
}

// Set the routing and caching strategy
switch ap.meta.RoutingStrategy {
case plugin.DefaultRouting:
p.strategy = strategy.NewLRU(cacheTTL)
default:
return ErrBadStrategy
}

// set concurrency count
p.concurrencyCount = ap.meta.ConcurrencyCount

return nil
}

Expand Down Expand Up @@ -484,17 +505,17 @@ func (p *apPool) subscriptionCount() int {
return len(p.subs)
}

func (p *apPool) selectAP(strat RoutingStrategy) (*availablePlugin, serror.SnapError) {
func (p *apPool) selectAP() (*availablePlugin, serror.SnapError) {
p.RLock()
defer p.RUnlock()

sp := make([]routing.SelectablePlugin, p.count())
sp := make([]strategy.SelectablePlugin, p.count())
i := 0
for _, plg := range p.plugins {
sp[i] = plg
i++
}
sap, err := strat.Select(p, sp)
sap, err := p.strategy.Select(sp)
if err != nil || sap == nil {
return nil, serror.New(err)
}
Expand All @@ -506,10 +527,6 @@ func (p *apPool) generatePID() uint32 {
return p.pidCounter
}

func (p *apPool) release() {
p.RUnlock()
}

func (p *apPool) moveSubscriptions(to *apPool) []subscription {
var subs []subscription

Expand All @@ -532,24 +549,58 @@ type subscription struct {
taskID string
}

func (p *apPool) CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) {
return p.strategy.CheckCache(mts)
}

func (p *apPool) UpdateCache(mts []core.Metric) {
p.strategy.UpdateCache(mts)
}

func (p *apPool) CacheHits(ns string, ver int) (uint64, error) {
return p.strategy.CacheHits(ns, ver)
}

func (p *apPool) CacheMisses(ns string, ver int) (uint64, error) {
return p.strategy.CacheMisses(ns, ver)
}
func (p *apPool) AllCacheHits() uint64 {
return p.strategy.AllCacheHits()
}

func (p *apPool) AllCacheMisses() uint64 {
return p.strategy.AllCacheMisses()
}

func (p *apPool) CacheTTL() (time.Duration, error) {
if len(p.plugins) == 0 {
return 0, ErrPoolEmpty
}
return p.strategy.CacheTTL(), nil
}

type availablePlugins struct {
// Used to coordinate operations on the table.
*sync.RWMutex

// the strategy used to select a plugin for execution
routingStrategy RoutingStrategy

// table holds all the plugin pools.
// The Pools' primary keys are equal to
// {plugin_type}:{plugin_name}:{plugin_version}
table map[string]*apPool
}

func newAvailablePlugins(routingStrategy RoutingStrategy) *availablePlugins {
type requestedMetrics struct {
plugin *availablePlugin
metricTypes []core.Metric
}

type selectsAvailablePlugins interface {
get(metricTypes []core.Metric) (*requestedMetrics, serror.SnapError)
}

func newAvailablePlugins() *availablePlugins {
return &availablePlugins{
RWMutex: &sync.RWMutex{},
table: make(map[string]*apPool),
routingStrategy: routingStrategy,
RWMutex: &sync.RWMutex{},
table: make(map[string]*apPool),
}
}

Expand Down Expand Up @@ -606,16 +657,127 @@ func (ap *availablePlugins) getPool(key string) (*apPool, serror.SnapError) {
return pool, nil
}

func (ap *availablePlugins) holdPool(key string) (*apPool, serror.SnapError) {
pool, err := ap.getPool(key)
func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.Metric) ([]core.Metric, error) {
var results []core.Metric
pool, serr := ap.getPool(pluginKey)
if serr != nil {
return nil, serr
}
if pool == nil {
return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey})
}

metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes)

if len(metricsToCollect) == 0 {
return metricsFromCache, nil
}

pool.RLock()
defer pool.RUnlock()
p, serr := pool.selectAP()
if serr != nil {
return nil, serr
}

// cast client to PluginCollectorClient
cli, ok := p.client.(client.PluginCollectorClient)
if !ok {
return nil, serror.New(errors.New("unable to cast client to PluginCollectorClient"))
}

// get a metrics
metrics, err := cli.CollectMetrics(metricsToCollect)
if err != nil {
return nil, err
return nil, serror.New(err)
}

if pool != nil {
pool.RLock()
pool.UpdateCache(metrics)

results = make([]core.Metric, len(metricsFromCache)+len(metrics))
idx := 0
for _, m := range metrics {
results[idx] = m
idx++
}
return pool, nil
for _, m := range metricsFromCache {
results[idx] = m
idx++
}

// update statics about plugin
p.hitCount++
p.lastHitTime = time.Now()

return metrics, nil
}

func (ap *availablePlugins) publishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error {
var errs []error
key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":")
pool, serr := ap.getPool(key)
if serr != nil {
errs = append(errs, serr)
return errs
}
if pool == nil {
return []error{serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": key})}
}

pool.RLock()
defer pool.RUnlock()
p, err := pool.selectAP()
if err != nil {
errs = append(errs, err)
return errs
}

cli, ok := p.client.(client.PluginPublisherClient)
if !ok {
return []error{errors.New("unable to cast client to PluginPublisherClient")}
}

errp := cli.Publish(contentType, content, config)
if errp != nil {
return []error{errp}
}
p.hitCount++
p.lastHitTime = time.Now()
return nil
}

func (ap *availablePlugins) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) {
var errs []error
key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":")
pool, serr := ap.getPool(key)
if serr != nil {
errs = append(errs, serr)
return "", nil, errs
}
if pool == nil {
return "", nil, []error{serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": key})}
}

pool.RLock()
defer pool.RUnlock()
p, err := pool.selectAP()
if err != nil {
errs = append(errs, err)
return "", nil, errs
}

cli, ok := p.client.(client.PluginProcessorClient)
if !ok {
return "", nil, []error{errors.New("unable to cast client to PluginProcessorClient")}
}

ct, c, errp := cli.Process(contentType, content, config)
if errp != nil {
return "", nil, []error{errp}
}
p.hitCount++
p.lastHitTime = time.Now()
return ct, c, nil
}

func (ap *availablePlugins) findLatestPool(pType, name string) (*apPool, serror.SnapError) {
Expand Down Expand Up @@ -664,7 +826,7 @@ func (ap *availablePlugins) selectAP(key string) (*availablePlugin, serror.SnapE
return nil, err
}

return pool.selectAP(ap.routingStrategy)
return pool.selectAP()
}

func (ap *availablePlugins) pools() map[string]*apPool {
Expand Down
9 changes: 4 additions & 5 deletions control/available_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"testing"

"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/routing"
. "github.com/smartystreets/goconvey/convey"
)

Expand All @@ -50,7 +49,7 @@ func TestAvailablePlugin(t *testing.T) {

Convey("Stop()", t, func() {
Convey("returns nil if plugin successfully stopped", func() {
r := newRunner(&routing.RoundRobinStrategy{})
r := newRunner()
a := plugin.Arg{
PluginLogPath: "/tmp/snap-test-plugin-stop.log",
}
Expand All @@ -68,13 +67,13 @@ func TestAvailablePlugin(t *testing.T) {
func TestAvailablePlugins(t *testing.T) {
Convey("newAvailablePlugins()", t, func() {
Convey("returns a pointer to an availablePlugins struct", func() {
aps := newAvailablePlugins(&routing.RoundRobinStrategy{})
aps := newAvailablePlugins()
So(aps, ShouldHaveSameTypeAs, new(availablePlugins))
})
})
Convey("insert()", t, func() {
Convey("adds a collector into the collectors collection", func() {
aps := newAvailablePlugins(&routing.RoundRobinStrategy{})
aps := newAvailablePlugins()
ap := &availablePlugin{
pluginType: plugin.CollectorPluginType,
name: "test",
Expand All @@ -90,7 +89,7 @@ func TestAvailablePlugins(t *testing.T) {
So(nap, ShouldEqual, ap)
})
Convey("returns an error if an unknown plugin type is given", func() {
aps := newAvailablePlugins(&routing.RoundRobinStrategy{})
aps := newAvailablePlugins()
ap := &availablePlugin{
pluginType: 99,
name: "test",
Expand Down
Loading

0 comments on commit f091061

Please sign in to comment.