Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove refcounter from cache #292

Merged
merged 6 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,7 @@ type Config struct {
// NewClientFromConfig constructs a new local client from the proto configuration.
func NewClientFromConfig(cfg *Config, app ApplicationCallback, errors func(error)) (*Client, error) {
var cacheExpiration time.Duration
switch {
case cfg.DeviceCacheExpirationSeconds < 0:
cacheExpiration = time.Microsecond
case cfg.DeviceCacheExpirationSeconds == 0:
cacheExpiration = time.Second * 3600
default:
if cfg.DeviceCacheExpirationSeconds > 0 {
cacheExpiration = time.Second * time.Duration(cfg.DeviceCacheExpirationSeconds)
}

Expand Down Expand Up @@ -148,7 +143,7 @@ func NewClient(
client := Client{
client: oc,
app: app,
deviceCache: NewRefDeviceCache(cacheExpiration, errors),
deviceCache: NewDeviceCache(cacheExpiration, time.Minute, errors),
observeResourceCache: kitSync.NewMap(),
deviceOwner: deviceOwner,
subscriptions: make(map[string]subscription),
Expand All @@ -173,7 +168,7 @@ type Client struct {
app ApplicationCallback
client *core.Client

deviceCache *refDeviceCache
deviceCache *DeviceCache

observeResourceCache *kitSync.Map
observerPollingInterval time.Duration
Expand Down
35 changes: 25 additions & 10 deletions client/core/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type DeviceConfiguration struct {
}

type Device struct {
deviceID string
deviceID uberAtom.String
foundByIP uberAtom.String
deviceTypes []string
getEndpoints func() schema.Endpoints
Expand All @@ -40,6 +40,18 @@ type Device struct {
lock sync.Mutex
}

func (d *Device) UpdateBy(v *Device) {
d.setDeviceID(v.DeviceID())
// foundByIP can be overwritten only when it is set.
if v.foundByIP.Load() != "" {
d.foundByIP.Store(v.foundByIP.Load())
}
d.lock.Lock()
defer d.lock.Unlock()
d.deviceTypes = v.deviceTypes
d.getEndpoints = v.getEndpoints
}

// GetCertificateFunc returns certificate for connection
type GetCertificateFunc func() (tls.Certificate, error)

Expand Down Expand Up @@ -115,14 +127,15 @@ func NewDevice(
deviceTypes []string,
getEndpoints func() schema.Endpoints,
) *Device {
return &Device{
d := &Device{
cfg: cfg,
deviceID: deviceID,
deviceTypes: deviceTypes,
observations: &sync.Map{},
getEndpoints: getEndpoints,
conn: make(map[string]*conn),
}
d.setDeviceID(deviceID)
return d
}

func (d *Device) popConnections() []*conn {
Expand Down Expand Up @@ -294,15 +307,11 @@ func (d *Device) connectToEndpoints(ctx context.Context, endpoints schema.Endpoi
}

func (d *Device) DeviceID() string {
d.lock.Lock()
defer d.lock.Unlock()
return d.deviceID
return d.deviceID.Load()
}

func (d *Device) setDeviceID(deviceID string) {
d.lock.Lock()
defer d.lock.Unlock()
d.deviceID = deviceID
d.deviceID.Store(deviceID)
}

func (d *Device) FoundByIP() string {
Expand All @@ -320,5 +329,11 @@ func (d *Device) setFoundByIP(foundByIP string) {
}

func (d *Device) DeviceTypes() []string {
return d.deviceTypes
d.lock.Lock()
deviceTypes := d.deviceTypes
d.lock.Unlock()
if deviceTypes == nil {
return nil
}
return deviceTypes
}
3 changes: 2 additions & 1 deletion client/core/disownDevice.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func (d *Device) Disown(
if err != nil {
if connectionWasClosed(ctx, err) {
// connection was closed by disown so we don't report error just log it.
d.cfg.ErrFunc(cannotDisownErr(err))
// TODO: use logger

return nil
}

Expand Down
8 changes: 7 additions & 1 deletion client/core/getEndpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,11 @@ import (
)

func (d *Device) GetEndpoints() schema.Endpoints {
return d.getEndpoints()
d.lock.Lock()
getEndpoints := d.getEndpoints
d.lock.Unlock()
if getEndpoints != nil {
return getEndpoints()
}
return nil
}
3 changes: 1 addition & 2 deletions client/createResource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ func (c *Client) CreateResource(
cfg = o.applyOnCreate(cfg)
}

d, links, err := c.GetRefDevice(ctx, deviceID, WithDiscoveryConfiguration(cfg.discoveryConfiguration))
d, links, err := c.GetDevice(ctx, deviceID, WithDiscoveryConfiguration(cfg.discoveryConfiguration))
if err != nil {
return err
}
defer d.Release(ctx)

link, err := core.GetResourceLink(links, href)
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions client/deleteDevice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package client

import (
"context"
)

func (c *Client) DeleteDevice(ctx context.Context, deviceID string) (bool, error) {
dev, ok := c.deviceCache.LoadAndDeleteDevice(ctx, deviceID)
if !ok {
return false, nil
}
err := dev.Close(ctx)
if err != nil {
c.errors(err)
}
return true, nil
}
3 changes: 1 addition & 2 deletions client/deleteResource.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ func (c *Client) DeleteResource(
cfg = o.applyOnDelete(cfg)
}

d, links, err := c.GetRefDevice(ctx, deviceID, WithDiscoveryConfiguration(cfg.discoveryConfiguration))
d, links, err := c.GetDevice(ctx, deviceID, WithDiscoveryConfiguration(cfg.discoveryConfiguration))
if err != nil {
return err
}
defer d.Release(ctx)

link, err := core.GetResourceLink(links, href)
if err != nil {
Expand Down
196 changes: 196 additions & 0 deletions client/deviceCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package client

import (
"context"
"fmt"
"time"

"github.com/plgd-dev/device/client/core"
"github.com/plgd-dev/go-coap/v2/pkg/cache"
"go.uber.org/atomic"
)

type DeviceCache struct {
deviceExpiration time.Duration
devicesCache *cache.Cache
errors func(error)

closed atomic.Bool
done chan struct{}
}

// Creates a new cache for devices.
// - deviceExpiration: default expiration time for the device in the cache, 0 means infinite. The device expiration is refreshed by getting or updating the device.
// - pollInterval: pool interval for cleaning expired devices from the cache
// - errors: function for logging errors
func NewDeviceCache(deviceExpiration, pollInterval time.Duration, errors func(error)) *DeviceCache {
done := make(chan struct{})
cache := cache.NewCache()
if deviceExpiration > 0 {
go func() {
t := time.NewTicker(pollInterval)
defer t.Stop()
for {
select {
case now := <-t.C:
cache.CheckExpirations(now)
case <-done:
return
}
}
}()
}
return &DeviceCache{
devicesCache: cache,
deviceExpiration: deviceExpiration,
errors: errors,
done: done,
}
}

// This function loads the device from the cache and deletes it from the cache. To cleanup the device you have to call device.Close.
func (c *DeviceCache) LoadAndDeleteDevice(ctx context.Context, deviceID string) (*core.Device, bool) {
d := c.devicesCache.Load(deviceID)
if d == nil {
return nil, false
}
dev := d.Data().(*core.Device)
c.devicesCache.Delete(deviceID)
return dev, true
}

func (c *DeviceCache) GetDevice(deviceID string) (*core.Device, bool) {
d := c.devicesCache.Load(deviceID)
if d == nil {
return nil, false
}
if deviceIsStoredWithExpiration(d) {
d.ValidUntil.Store(time.Now().Add(c.deviceExpiration))
}
return d.Data().(*core.Device), true
}

func (c *DeviceCache) GetDeviceByFoundIP(ip string) *core.Device {
var d *core.Device
c.devicesCache.Range(func(key, val interface{}) bool {
dev := val.(*core.Device)
if dev.FoundByIP() == ip {
d = dev
return false
}
return true
})
return d
}

func (c *DeviceCache) GetDeviceExpiration(deviceID string) (time.Time, bool) {
d := c.devicesCache.Load(deviceID)
if d == nil {
return time.Time{}, false
}
return d.ValidUntil.Load(), true
}

// This function stores the device without timeout into the cache. The device can be removed from
// the cache only by invoking LoadAndDeleteDevice function and device.Close to cleanup connections. If a device with the same deviceID is already
// in the cache, the previous reference will be updated in the cache and it's expiration time will be set to infinite.
func (c *DeviceCache) UpdateOrStoreDevice(device *core.Device) (*core.Device, bool) {
return c.updateOrStoreDevice(device, time.Time{})
}

// This function stores the device with the default timeout into the cache. If a device with the same
// deviceID is already in the cache the device will be updated and the expiration time will be reset
// only when the device has it set.
func (c *DeviceCache) UpdateOrStoreDeviceWithExpiration(device *core.Device) (*core.Device, bool) {
return c.updateOrStoreDevice(device, time.Now().Add(c.deviceExpiration))
}

// Try to change the expiration time for the device in cache to default expiration.
func (c *DeviceCache) TryToChangeDeviceExpirationToDefault(deviceID string) bool {
d := c.devicesCache.Load(deviceID)
if d == nil {
return false
}
if d.Data().(*core.Device).FoundByIP() == "" {
d.ValidUntil.Store(time.Now().Add(c.deviceExpiration))
return true
}
return false
}

func deviceIsStoredWithExpiration(e *cache.Element) bool {
return !e.ValidUntil.Load().IsZero()
}

func (c *DeviceCache) updateOrStoreDevice(device *core.Device, expiration time.Time) (*core.Device, bool) {
deviceID := device.DeviceID()

d := c.devicesCache.Load(deviceID)
if d != nil {
dev := d.Data().(*core.Device)
dev.UpdateBy(device)

// record is already in cache
// if someone requirers from the device to be stored permanently (without timeout)
// override the expiration
if deviceIsStoredWithExpiration(d) {
d.ValidUntil.Store(expiration)
}
return dev, true
}
// if the device was not in the cache store it
loadedDev, loaded := c.devicesCache.LoadOrStore(deviceID, cache.NewElement(device, expiration, func(d1 interface{}) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
err := d1.(*core.Device).Close(ctx)
if err != nil {
c.errors(err)
}
}))
dev := loadedDev.Data().(*core.Device)
if loaded {
dev.UpdateBy(device)
// record is already in cache
// if someone requirers from the device to be stored permanently (without timeout)
// override the expiration
if deviceIsStoredWithExpiration(d) {
loadedDev.ValidUntil.Store(expiration)
}
return dev, true
}
return dev, false
}

func (c *DeviceCache) GetDevicesFoundByIP() map[string]string {
devices := make(map[string]string)

c.devicesCache.Range(func(key, value interface{}) bool {
d := value.(*core.Device)

if ip := d.FoundByIP(); ip != "" {
devices[d.DeviceID()] = ip
}
return true
})

return devices
}

func (c *DeviceCache) Close(ctx context.Context) error {
var errors []error
if c.closed.CompareAndSwap(false, true) {
close(c.done)
}
for _, val := range c.devicesCache.PullOutAll() {
d := val.(*core.Device)
err := d.Close(ctx)
if err != nil {
errors = append(errors, err)
}
}

if len(errors) > 0 {
return fmt.Errorf("%v", errors)
}
return nil
}
Loading