Skip to content

Commit

Permalink
Merge branch 'openvidu/ensure-send-close-events' into openvidu/main
Browse files Browse the repository at this point in the history
  • Loading branch information
juancarmore committed Nov 26, 2024
2 parents e32f6cc + 67ced98 commit 381b7d0
Show file tree
Hide file tree
Showing 9 changed files with 813 additions and 68 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ replace github.com/livekit/livekit-server => ./

require (
github.com/bep/debounce v1.2.1
github.com/bsm/redislock v0.9.4
github.com/d5/tengo/v2 v2.17.0
github.com/dustin/go-humanize v1.0.1
github.com/elliotchance/orderedmap/v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bsm/redislock v0.9.4 h1:X/Wse1DPpiQgHbVYRE9zv6m070UcKoOGekgvpNhiSvw=
github.com/bsm/redislock v0.9.4/go.mod h1:Epf7AJLiSFwLCiZcfi6pWFO/8eAYrYpQXFxEDPoDeAk=
github.com/bufbuild/protovalidate-go v0.6.1 h1:uzW8r0CDvqApUChNj87VzZVoQSKhcVdw5UWOE605UIw=
github.com/bufbuild/protovalidate-go v0.6.1/go.mod h1:4BR3rKEJiUiTy+sqsusFn2ladOf0kYmA2Reo6BHSBgQ=
github.com/bufbuild/protoyaml-go v0.1.9 h1:anV5UtF1Mlvkkgp4NWA6U/zOnJFng8Orq4Vf3ZUQHBU=
Expand Down
138 changes: 133 additions & 5 deletions openvidu/analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
package analytics

import (
"context"
"encoding/json"
"strconv"
"sync"
"time"

"github.com/bsm/redislock"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
redisLiveKit "github.com/livekit/protocol/redis"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -32,8 +35,38 @@ import (
"github.com/openvidu/openvidu-livekit/openvidu/queue"
)

const dbLockName = "analytics-db-operations-lock"

var ANALYTICS_CONFIGURATION *openviduconfig.AnalyticsConfig
var ANALYTICS_SENDERS []*AnalyticsSender
var redisLocker *redislock.Client = nil
var mutex sync.Mutex

type EntityType string

const (
RoomEntity EntityType = "ROOM"
ParticipantEntity EntityType = "PARTICIPANT"
EgressEntity EntityType = "EGRESS"
IngressEntity EntityType = "INGRESS"
)

type ActiveEntities struct {
Rooms []string
Participants []string
Egresses []string
Ingresses []string
}

type LastAlive struct {
ID string `bson:"_id"`
LastAlive Timestamp `bson:"last_alive"`
}

type Timestamp struct {
Seconds int64 `bson:"seconds"`
Nanos int32 `bson:"nanos"`
}

type AnalyticsSender struct {
eventsQueue queue.Queue[*livekit.AnalyticsEvent]
Expand All @@ -49,6 +82,7 @@ type BaseDatabaseClient struct {
type DatabaseClient interface {
InitializeDatabase() error
SendBatch()
FixActiveEntities()
}

func InitializeAnalytics(configuration *config.Config, livekithelper livekithelperinterface.LivekitHelper) error {
Expand All @@ -57,23 +91,35 @@ func InitializeAnalytics(configuration *config.Config, livekithelper livekithelp
if err != nil {
return err
}

err = mongoDatabaseClient.InitializeDatabase()
if err != nil {
return err
}

ANALYTICS_CONFIGURATION = &configuration.OpenVidu.Analytics
ANALYTICS_SENDERS = []*AnalyticsSender{mongoDatabaseClient.owner}

// // To also store events and stats in Redis (given that it has module RedisJSON):
//
if configuration.Redis.IsConfigured() {
rc, err := redisLiveKit.GetRedisClient(&configuration.Redis)
if err != nil {
return err
}

redisLocker = redislock.New(rc)
}

// To also store events and stats in Redis (given that it has module RedisJSON):
// redisDatabaseClient, err := NewRedisDatabaseClient(&configuration.OpenVidu.Analytics, &configuration.Redis, livekithelper)
// if err != nil {
// return err
// }
//
// err = redisDatabaseClient.InitializeDatabase()
// if err != nil {
// return err
// }
//
// ANALYTICS_SENDERS = append(ANALYTICS_SENDERS, redisDatabaseClient.owner)

return nil
Expand All @@ -82,15 +128,36 @@ func InitializeAnalytics(configuration *config.Config, livekithelper livekithelp
// Blocking method. Launch in goroutine
func Start() {
var wg sync.WaitGroup
wg.Add(1)
wg.Add(2)
go startAnalyticsRoutine()
go startActiveEntitiesFixer()
wg.Wait()
}

func startAnalyticsRoutine() {
for {
time.Sleep(ANALYTICS_CONFIGURATION.Interval)
sendBatch()
func() {
time.Sleep(ANALYTICS_CONFIGURATION.Interval)

// If Redis is configured, use Redis Locker instead of mutex
if redisLocker != nil {
context := context.Background()
backoff := redislock.LinearBackoff(1 * time.Second)
lock, err := redisLocker.Obtain(context, dbLockName, 2*time.Second, &redislock.Options{
RetryStrategy: backoff,
})
if err != nil {
return
}

defer lock.Release(context)
} else {
mutex.Lock()
defer mutex.Unlock()
}

sendBatch()
}()
}
}

Expand All @@ -100,6 +167,59 @@ func sendBatch() {
}
}

func startActiveEntitiesFixer() {
for {
func() {
if redisLocker != nil {
context := context.Background()
backoff := redislock.LinearBackoff(10 * time.Second)

lock, err := redisLocker.Obtain(context, "active-entities-lock", 2*time.Minute, &redislock.Options{
RetryStrategy: backoff,
})
if err != nil {
return
}

defer lock.Release(context)
}

// If Redis is configured, use Redis Locker instead of mutex
var redisLock *redislock.Lock
context := context.Background()
if redisLocker != nil {
backoff := redislock.LinearBackoff(1 * time.Second)

var err error
redisLock, err = redisLocker.Obtain(context, dbLockName, 2*time.Second, &redislock.Options{
RetryStrategy: backoff,
})
if err != nil {
return
}
} else {
mutex.Lock()
}

fixActiveEntities()

if redisLocker != nil {
redisLock.Release(context)
} else {
mutex.Unlock()
}

time.Sleep(time.Minute)
}()
}
}

func fixActiveEntities() {
for _, sender := range ANALYTICS_SENDERS {
sender.databaseClient.FixActiveEntities()
}
}

type OpenViduEventsIngestClient struct {
// Must have this empty property to implement interface livekit.AnalyticsRecorderService_IngestEventsClient
grpc.ClientStream
Expand Down Expand Up @@ -192,6 +312,14 @@ func getTimestampFromStruct(timestamp *timestamppb.Timestamp) string {
return timestampKey
}

func getCurrentTimestamp() Timestamp {
now := time.Now()
return Timestamp{
Seconds: now.Unix(),
Nanos: int32(now.Nanosecond()),
}
}

func parseEvent(eventMap map[string]interface{}, event *livekit.AnalyticsEvent) {
eventMap["type"] = event.Type.String()
if eventMap["participant"] != nil {
Expand Down
Loading

0 comments on commit 381b7d0

Please sign in to comment.