Skip to content

Commit

Permalink
Add scheduler to fix active entities each minute
Browse files Browse the repository at this point in the history
  • Loading branch information
juancarmore committed Nov 14, 2024
1 parent c5e4af0 commit 5b38003
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 12 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ replace github.com/livekit/livekit-server => ./

require (
github.com/bep/debounce v1.2.1
github.com/bsm/redislock v0.9.4
github.com/d5/tengo/v2 v2.17.0
github.com/dustin/go-humanize v1.0.1
github.com/elliotchance/orderedmap/v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/bsm/redislock v0.9.4 h1:X/Wse1DPpiQgHbVYRE9zv6m070UcKoOGekgvpNhiSvw=
github.com/bsm/redislock v0.9.4/go.mod h1:Epf7AJLiSFwLCiZcfi6pWFO/8eAYrYpQXFxEDPoDeAk=
github.com/bufbuild/protovalidate-go v0.6.1 h1:uzW8r0CDvqApUChNj87VzZVoQSKhcVdw5UWOE605UIw=
github.com/bufbuild/protovalidate-go v0.6.1/go.mod h1:4BR3rKEJiUiTy+sqsusFn2ladOf0kYmA2Reo6BHSBgQ=
github.com/bufbuild/protoyaml-go v0.1.9 h1:anV5UtF1Mlvkkgp4NWA6U/zOnJFng8Orq4Vf3ZUQHBU=
Expand Down
54 changes: 50 additions & 4 deletions openvidu/analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
package analytics

import (
"context"
"encoding/json"
"strconv"
"sync"
"time"

"github.com/bsm/redislock"
"github.com/livekit/livekit-server/pkg/config"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
redisLiveKit "github.com/livekit/protocol/redis"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand All @@ -34,6 +37,7 @@ import (

var ANALYTICS_CONFIGURATION *openviduconfig.AnalyticsConfig
var ANALYTICS_SENDERS []*AnalyticsSender
var redisLocker *redislock.Client = nil

type AnalyticsSender struct {
eventsQueue queue.Queue[*livekit.AnalyticsEvent]
Expand All @@ -49,31 +53,43 @@ type BaseDatabaseClient struct {
type DatabaseClient interface {
InitializeDatabase() error
SendBatch()
FixActiveEntities()
}

func InitializeAnalytics(configuration *config.Config, livekithelper livekithelperinterface.LivekitHelper) error {

mongoDatabaseClient, err := NewMongoDatabaseClient(&configuration.OpenVidu.Analytics, livekithelper)
if err != nil {
return err
}

err = mongoDatabaseClient.InitializeDatabase()
if err != nil {
return err
}

ANALYTICS_CONFIGURATION = &configuration.OpenVidu.Analytics
ANALYTICS_SENDERS = []*AnalyticsSender{mongoDatabaseClient.owner}

// // To also store events and stats in Redis (given that it has module RedisJSON):
//
if configuration.Redis.IsConfigured() {
rc, err := redisLiveKit.GetRedisClient(&configuration.Redis)
if err != nil {
return err
}

redisLocker = redislock.New(rc)
}

// To also store events and stats in Redis (given that it has module RedisJSON):
// redisDatabaseClient, err := NewRedisDatabaseClient(&configuration.OpenVidu.Analytics, &configuration.Redis, livekithelper)
// if err != nil {
// return err
// }
//
// err = redisDatabaseClient.InitializeDatabase()
// if err != nil {
// return err
// }
//
// ANALYTICS_SENDERS = append(ANALYTICS_SENDERS, redisDatabaseClient.owner)

return nil
Expand All @@ -82,8 +98,9 @@ func InitializeAnalytics(configuration *config.Config, livekithelper livekithelp
// Blocking method. Launch in goroutine
func Start() {
var wg sync.WaitGroup
wg.Add(1)
wg.Add(2)
go startAnalyticsRoutine()
go startActiveEntitiesFixer()
wg.Wait()
}

Expand All @@ -100,6 +117,35 @@ func sendBatch() {
}
}

func startActiveEntitiesFixer() {
for {
func() {
if redisLocker != nil {
context := context.Background()
backoff := redislock.LinearBackoff(500 * time.Millisecond)

lock, err := redisLocker.Obtain(context, "active-entities-lock", 2*time.Minute, &redislock.Options{
RetryStrategy: backoff,
})
if err != nil {
return
}

defer lock.Release(context)
}

fixActiveEntities()
time.Sleep(time.Minute)
}()
}
}

func fixActiveEntities() {
for _, sender := range ANALYTICS_SENDERS {
sender.databaseClient.FixActiveEntities()
}
}

type OpenViduEventsIngestClient struct {
// Must have this empty property to implement interface livekit.AnalyticsRecorderService_IngestEventsClient
grpc.ClientStream
Expand Down
16 changes: 8 additions & 8 deletions openvidu/analytics/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type RedisDatabaseClient struct {
}

func NewRedisDatabaseClient(conf *openviduconfig.AnalyticsConfig, redisConfig *redisLiveKit.RedisConfig, livekithelper livekithelperinterface.LivekitHelper) (*RedisDatabaseClient, error) {

var err error
redisClient, err := redisLiveKit.GetRedisClient(redisConfig)
if err != nil {
Expand All @@ -45,6 +44,7 @@ func NewRedisDatabaseClient(conf *openviduconfig.AnalyticsConfig, redisConfig *r
redisDatabaseClient := &RedisDatabaseClient{
client: redisClient,
}

sender := &AnalyticsSender{
eventsQueue: queue.NewSliceQueue[*livekit.AnalyticsEvent](),
statsQueue: queue.NewSliceQueue[*livekit.AnalyticsStat](),
Expand All @@ -57,12 +57,10 @@ func NewRedisDatabaseClient(conf *openviduconfig.AnalyticsConfig, redisConfig *r
}

func (m *RedisDatabaseClient) InitializeDatabase() error {
err := m.createRedisJsonIndexDocuments()
return err
return m.createRedisJsonIndexDocuments()
}

func (r *RedisDatabaseClient) createRedisJsonIndexDocuments() error {

// Create text index for event "$.type"
_, err := r.client.Do(context.Background(), "FT.CREATE", "idx:eventType", "ON", "JSON", "PREFIX", "1", "event:", "SCHEMA", "$.type", "AS", "eventType", "TEXT").Result()
err = handleIndexCreationError(err, "$.type")
Expand Down Expand Up @@ -153,26 +151,25 @@ func handleIndexCreationError(err error, indexSchema string) error {
}

func (r *RedisDatabaseClient) SendBatch() {

events := dequeEvents(r.owner.eventsQueue)
stats := dequeStats(r.owner.statsQueue)

if len(events) > 0 || len(stats) > 0 {

pipelinesByRoom := make(map[string]redis.Pipeliner)

for _, event := range events {
if _, ok := pipelinesByRoom[event.Room.Sid]; !ok {
pipelinesByRoom[event.Room.Sid] = r.client.Pipeline()
}
}

for _, stat := range stats {
if _, ok := pipelinesByRoom[stat.RoomId]; !ok {
pipelinesByRoom[stat.RoomId] = r.client.Pipeline()
}
}

for _, event := range events {

eventMap := obtainMapInterfaceFromEvent(event)
eventKey := "event:{" + event.Room.Sid + "}" + ":" + event.Type.String() + ":" + getTimestampFromStruct(event.Timestamp)

Expand All @@ -183,7 +180,6 @@ func (r *RedisDatabaseClient) SendBatch() {
}

for _, stat := range stats {

statMap := obtainMapInterfaceFromStat(stat)
statKey := "stat:{" + stat.RoomId + "}" + ":" + stat.ParticipantId + ":" + stat.TrackId + ":" + getTimestampFromStruct(stat.TimeStamp)

Expand All @@ -198,3 +194,7 @@ func (r *RedisDatabaseClient) SendBatch() {
}
}
}

func (r *RedisDatabaseClient) FixActiveEntities() {
panic("FixActiveEntities not implemented for RedisDatabaseClient")
}

0 comments on commit 5b38003

Please sign in to comment.