Skip to content

Commit

Permalink
Merge pull request #92 from diwise/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
nordbergmikael authored Aug 13, 2024
2 parents ed1e7ad + cc039ea commit a0eabd1
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 30 deletions.
23 changes: 20 additions & 3 deletions cmd/iot-core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/diwise/iot-core/internal/pkg/application"
"github.com/diwise/iot-core/internal/pkg/application/functions"
"github.com/diwise/iot-core/internal/pkg/application/measurements"
"github.com/diwise/iot-core/internal/pkg/infrastructure/database"
"github.com/diwise/iot-core/internal/pkg/presentation/api"
"github.com/diwise/iot-core/pkg/messaging/events"
Expand Down Expand Up @@ -45,6 +46,8 @@ func main() {
dmClient := createDeviceManagementClientOrDie(ctx)
defer dmClient.Close(ctx)

measurementsClient := createMeasurementsClientOrDie(ctx)

msgCtx := createMessagingContextOrDie(ctx)
defer msgCtx.Close()

Expand All @@ -60,7 +63,7 @@ func main() {
defer configFile.Close()
}

_, api_, err := initialize(ctx, dmClient, msgCtx, configFile, storage)
_, api_, err := initialize(ctx, dmClient, measurementsClient, msgCtx, configFile, storage)
if err != nil {
fatal(ctx, "initialization failed", err)
}
Expand All @@ -86,6 +89,20 @@ func createDeviceManagementClientOrDie(ctx context.Context) client.DeviceManagem
return dmClient
}

func createMeasurementsClientOrDie(ctx context.Context) measurements.MeasurementsClient {
dmURL := env.GetVariableOrDie(ctx, "MEASUREMENTS_URL", "url to measurements service")
tokenURL := env.GetVariableOrDie(ctx, "OAUTH2_TOKEN_URL", "a valid oauth2 token URL")
clientID := env.GetVariableOrDie(ctx, "OAUTH2_CLIENT_ID", "a valid oauth2 client id")
clientSecret := env.GetVariableOrDie(ctx, "OAUTH2_CLIENT_SECRET", "a valid oauth2 client secret")

measurementsClient, err := measurements.NewMeasurementsClient(ctx, dmURL, tokenURL, clientID, clientSecret)
if err != nil {
fatal(ctx, "failed to create measurements client", err)
}

return measurementsClient
}

func createMessagingContextOrDie(ctx context.Context) messaging.MsgContext {
logger := logging.GetFromContext(ctx)

Expand All @@ -111,13 +128,13 @@ func createDatabaseConnectionOrDie(ctx context.Context) database.Storage {
return storage
}

func initialize(ctx context.Context, dmClient client.DeviceManagementClient, msgctx messaging.MsgContext, fconfig io.Reader, storage database.Storage) (application.App, api.API, error) {
func initialize(ctx context.Context, dmClient client.DeviceManagementClient, mClient measurements.MeasurementsClient, msgctx messaging.MsgContext, fconfig io.Reader, storage database.Storage) (application.App, api.API, error) {
functionsRegistry, err := functions.NewRegistry(ctx, fconfig, storage)
if err != nil {
return nil, nil, err
}

app := application.New(dmClient, functionsRegistry)
app := application.New(dmClient, mClient, functionsRegistry)

msgctx.RegisterCommandHandler(func(m messaging.Message) bool {
return strings.HasPrefix(m.ContentType(), "application/vnd.oma.lwm2m")
Expand Down
4 changes: 2 additions & 2 deletions cmd/iot-core/main.integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestAPIfunctionsReturns200OK(t *testing.T) {
is, dmClient, msgCtx := testSetup(t)

fconf := bytes.NewBufferString("fid1;name;counter;overflow;internalID;false")
_, api, err := initialize(context.Background(), dmClient, msgCtx, fconf, &database.StorageMock{
_, api, err := initialize(context.Background(), dmClient, nil, msgCtx, fconf, &database.StorageMock{
AddFnFunc: func(ctx context.Context, id, fnType, subType, tenant, source string, lat, lon float64) error {
return nil
},
Expand All @@ -47,7 +47,7 @@ func TestReceiveDigitalInputUpdateMessage(t *testing.T) {
sID := "internalID"

fconf := bytes.NewBufferString("fid1;name;counter;overflow;" + sID + ";false")
_, _, err := initialize(context.Background(), dmClient, msgCtx, fconf, &database.StorageMock{
_, _, err := initialize(context.Background(), dmClient, nil, msgCtx, fconf, &database.StorageMock{
AddFnFunc: func(ctx context.Context, id, fnType, subType, tenant, source string, lat, lon float64) error {
return nil
},
Expand Down
74 changes: 74 additions & 0 deletions internal/pkg/application/decorators/device.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package decorators

import (
"context"
"fmt"
"math"

"github.com/diwise/iot-core/internal/pkg/application/measurements"
"github.com/diwise/iot-core/pkg/messaging/events"
"github.com/diwise/senml"
"github.com/diwise/service-chassis/pkg/infrastructure/o11y/logging"
)

type ValueFinder func() float64

const (
BatteryLevel string = "9"
PowerSourceVoltage string = "7"
DeviceObjectID string = "3"
)

func GetMaxPowerSourceVoltage(ctx context.Context, maxValueFinder measurements.MaxValueFinder, deviceID string) ValueFinder {
powerSourceVoltageMeasurementID := fmt.Sprintf("%s/%s/%s", deviceID, DeviceObjectID, PowerSourceVoltage)

m, err := maxValueFinder.GetMaxValue(ctx, powerSourceVoltageMeasurementID)
if err != nil {
return func() float64 {
return 0.0
}
}

return func() float64 {
return m
}
}

func Device(ctx context.Context, max ValueFinder) events.EventDecoratorFunc {
log := logging.GetFromContext(ctx)

return func(m *events.MessageAccepted) {
objID := events.GetObjectID(m.Pack)
if objID != DeviceObjectID {
return
}

_, ok := m.Pack.GetValue(senml.FindByName(BatteryLevel))
if ok {
log.Debug("battery level already set")
return
}

vvd, ok := m.Pack.GetValue(senml.FindByName(PowerSourceVoltage))
if !ok {
log.Warn("no power source voltage found")
return
}

percentage := math.RoundToEven((vvd / max()) * 100)

if percentage < 0 {
percentage = 0
}

if percentage > 100 {
percentage = 100
}

m.Pack = append(m.Pack, senml.Record{
Name: BatteryLevel,
Value: &percentage,
Unit: "%",
})
}
}
24 changes: 15 additions & 9 deletions internal/pkg/application/iotcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"log/slog"
"sync"

"github.com/diwise/iot-core/internal/pkg/application/decorators"
"github.com/diwise/iot-core/internal/pkg/application/functions"
"github.com/diwise/iot-core/internal/pkg/application/measurements"
"github.com/diwise/iot-core/pkg/messaging/events"
"github.com/diwise/iot-device-mgmt/pkg/client"
"github.com/diwise/messaging-golang/pkg/messaging"
Expand All @@ -19,15 +21,17 @@ type App interface {
}

type app struct {
client client.DeviceManagementClient
fnctRegistry functions.Registry
mu sync.Mutex
client client.DeviceManagementClient
measurementsClient measurements.MeasurementsClient
fnctRegistry functions.Registry
mu sync.Mutex
}

func New(client client.DeviceManagementClient, functionRegistry functions.Registry) App {
func New(client client.DeviceManagementClient, measurementsClient measurements.MeasurementsClient, functionRegistry functions.Registry) App {
return &app{
client: client,
fnctRegistry: functionRegistry,
client: client,
fnctRegistry: functionRegistry,
measurementsClient: measurementsClient,
}
}

Expand Down Expand Up @@ -73,7 +77,7 @@ func (a *app) MessageReceived(ctx context.Context, msg events.MessageReceived) (
device, err := a.client.FindDeviceFromInternalID(ctx, msg.DeviceID())
if err != nil {
log.Debug(fmt.Sprintf("could not find device with internalID %s", msg.DeviceID()), "err", err.Error())
return nil, ErrCouldNotFindDevice
return nil, ErrCouldNotFindDevice
}

clone := msg.Pack.Clone()
Expand All @@ -83,9 +87,11 @@ func (a *app) MessageReceived(ctx context.Context, msg events.MessageReceived) (
events.Lon(device.Longitude()),
events.Environment(device.Environment()),
events.Source(device.Source()),
events.Tenant(device.Tenant()))
events.Tenant(device.Tenant()),
decorators.Device(ctx, decorators.GetMaxPowerSourceVoltage(ctx, a.measurementsClient, device.ID())),
)

log.Debug(fmt.Sprintf("message.accepted created for device %s with object type %s", ma.DeviceID(), ma.ObjectID()))
log.Debug(fmt.Sprintf("message.accepted created for device %s with object type %s", ma.DeviceID(), ma.ObjectID()), slog.String("body", string(ma.Body())))

return ma, nil
}
61 changes: 61 additions & 0 deletions internal/pkg/application/measurements/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package measurements

import (
"sync"
"time"
)

type CacheItem struct {
Value any
ExpiryTime time.Time
}

type Cache struct {
items map[string]CacheItem
mutex sync.RWMutex
}

func NewCache() *Cache {
return &Cache{
items: make(map[string]CacheItem),
}
}

func (c *Cache) Set(key string, value any, duration time.Duration) {
c.mutex.Lock()
defer c.mutex.Unlock()

c.items[key] = CacheItem{
Value: value,
ExpiryTime: time.Now().Add(duration),
}
}

func (c *Cache) Get(key string) (any, bool) {
c.mutex.RLock()
defer c.mutex.RUnlock()

item, exists := c.items[key]
if !exists || item.ExpiryTime.Before(time.Now()) {
return nil, false
}

return item.Value, true
}

func (c *Cache) Cleanup(interval time.Duration) {
ticker := time.NewTicker(interval)
go func() {
for {
<-ticker.C
now := time.Now()
c.mutex.Lock()
for key, item := range c.items {
if item.ExpiryTime.Before(now) {
delete(c.items, key)
}
}
c.mutex.Unlock()
}
}()
}
Loading

0 comments on commit a0eabd1

Please sign in to comment.