diff --git a/go.mod b/go.mod index 19243907bdb..43738d236e9 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ replace github.com/livekit/livekit-server => ./ require ( github.com/bep/debounce v1.2.1 + github.com/bsm/redislock v0.9.4 github.com/d5/tengo/v2 v2.17.0 github.com/dustin/go-humanize v1.0.1 github.com/elliotchance/orderedmap/v2 v2.4.0 diff --git a/go.sum b/go.sum index 00ca325cbd9..a9921e3c53b 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bsm/redislock v0.9.4 h1:X/Wse1DPpiQgHbVYRE9zv6m070UcKoOGekgvpNhiSvw= +github.com/bsm/redislock v0.9.4/go.mod h1:Epf7AJLiSFwLCiZcfi6pWFO/8eAYrYpQXFxEDPoDeAk= github.com/bufbuild/protovalidate-go v0.6.1 h1:uzW8r0CDvqApUChNj87VzZVoQSKhcVdw5UWOE605UIw= github.com/bufbuild/protovalidate-go v0.6.1/go.mod h1:4BR3rKEJiUiTy+sqsusFn2ladOf0kYmA2Reo6BHSBgQ= github.com/bufbuild/protoyaml-go v0.1.9 h1:anV5UtF1Mlvkkgp4NWA6U/zOnJFng8Orq4Vf3ZUQHBU= diff --git a/openvidu/analytics/analytics.go b/openvidu/analytics/analytics.go index f6e2e7bf1ab..d1f74e995f3 100644 --- a/openvidu/analytics/analytics.go +++ b/openvidu/analytics/analytics.go @@ -15,14 +15,17 @@ package analytics import ( + "context" "encoding/json" "strconv" "sync" "time" + "github.com/bsm/redislock" "github.com/livekit/livekit-server/pkg/config" "github.com/livekit/protocol/livekit" "github.com/livekit/protocol/logger" + redisLiveKit "github.com/livekit/protocol/redis" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -32,8 +35,38 @@ import ( "github.com/openvidu/openvidu-livekit/openvidu/queue" ) +const dbLockName = "analytics-db-operations-lock" + var ANALYTICS_CONFIGURATION *openviduconfig.AnalyticsConfig var ANALYTICS_SENDERS []*AnalyticsSender +var redisLocker *redislock.Client = nil +var mutex sync.Mutex + +type EntityType string + +const ( + RoomEntity EntityType = "ROOM" + ParticipantEntity EntityType = "PARTICIPANT" + EgressEntity EntityType = "EGRESS" + IngressEntity EntityType = "INGRESS" +) + +type ActiveEntities struct { + Rooms []string + Participants []string + Egresses []string + Ingresses []string +} + +type LastAlive struct { + ID string `bson:"_id"` + LastAlive Timestamp `bson:"last_alive"` +} + +type Timestamp struct { + Seconds int64 `bson:"seconds"` + Nanos int32 `bson:"nanos"` +} type AnalyticsSender struct { eventsQueue queue.Queue[*livekit.AnalyticsEvent] @@ -49,6 +82,7 @@ type BaseDatabaseClient struct { type DatabaseClient interface { InitializeDatabase() error SendBatch() + FixActiveEntities() } func InitializeAnalytics(configuration *config.Config, livekithelper livekithelperinterface.LivekitHelper) error { @@ -57,23 +91,35 @@ func InitializeAnalytics(configuration *config.Config, livekithelper livekithelp if err != nil { return err } + err = mongoDatabaseClient.InitializeDatabase() if err != nil { return err } + ANALYTICS_CONFIGURATION = &configuration.OpenVidu.Analytics ANALYTICS_SENDERS = []*AnalyticsSender{mongoDatabaseClient.owner} - // // To also store events and stats in Redis (given that it has module RedisJSON): - // + if configuration.Redis.IsConfigured() { + rc, err := redisLiveKit.GetRedisClient(&configuration.Redis) + if err != nil { + return err + } + + redisLocker = redislock.New(rc) + } + + // To also store events and stats in Redis (given that it has module RedisJSON): // redisDatabaseClient, err := NewRedisDatabaseClient(&configuration.OpenVidu.Analytics, &configuration.Redis, livekithelper) // if err != nil { // return err // } + // // err = redisDatabaseClient.InitializeDatabase() // if err != nil { // return err // } + // // ANALYTICS_SENDERS = append(ANALYTICS_SENDERS, redisDatabaseClient.owner) return nil @@ -82,15 +128,36 @@ func InitializeAnalytics(configuration *config.Config, livekithelper livekithelp // Blocking method. Launch in goroutine func Start() { var wg sync.WaitGroup - wg.Add(1) + wg.Add(2) go startAnalyticsRoutine() + go startActiveEntitiesFixer() wg.Wait() } func startAnalyticsRoutine() { for { - time.Sleep(ANALYTICS_CONFIGURATION.Interval) - sendBatch() + func() { + time.Sleep(ANALYTICS_CONFIGURATION.Interval) + + // If Redis is configured, use Redis Locker instead of mutex + if redisLocker != nil { + context := context.Background() + backoff := redislock.LinearBackoff(1 * time.Second) + lock, err := redisLocker.Obtain(context, dbLockName, 2*time.Second, &redislock.Options{ + RetryStrategy: backoff, + }) + if err != nil { + return + } + + defer lock.Release(context) + } else { + mutex.Lock() + defer mutex.Unlock() + } + + sendBatch() + }() } } @@ -100,6 +167,59 @@ func sendBatch() { } } +func startActiveEntitiesFixer() { + for { + func() { + if redisLocker != nil { + context := context.Background() + backoff := redislock.LinearBackoff(10 * time.Second) + + lock, err := redisLocker.Obtain(context, "active-entities-lock", 2*time.Minute, &redislock.Options{ + RetryStrategy: backoff, + }) + if err != nil { + return + } + + defer lock.Release(context) + } + + // If Redis is configured, use Redis Locker instead of mutex + var redisLock *redislock.Lock + context := context.Background() + if redisLocker != nil { + backoff := redislock.LinearBackoff(1 * time.Second) + + var err error + redisLock, err = redisLocker.Obtain(context, dbLockName, 2*time.Second, &redislock.Options{ + RetryStrategy: backoff, + }) + if err != nil { + return + } + } else { + mutex.Lock() + } + + fixActiveEntities() + + if redisLocker != nil { + redisLock.Release(context) + } else { + mutex.Unlock() + } + + time.Sleep(time.Minute) + }() + } +} + +func fixActiveEntities() { + for _, sender := range ANALYTICS_SENDERS { + sender.databaseClient.FixActiveEntities() + } +} + type OpenViduEventsIngestClient struct { // Must have this empty property to implement interface livekit.AnalyticsRecorderService_IngestEventsClient grpc.ClientStream @@ -192,6 +312,14 @@ func getTimestampFromStruct(timestamp *timestamppb.Timestamp) string { return timestampKey } +func getCurrentTimestamp() Timestamp { + now := time.Now() + return Timestamp{ + Seconds: now.Unix(), + Nanos: int32(now.Nanosecond()), + } +} + func parseEvent(eventMap map[string]interface{}, event *livekit.AnalyticsEvent) { eventMap["type"] = event.Type.String() if eventMap["participant"] != nil { diff --git a/openvidu/analytics/mongo.go b/openvidu/analytics/mongo.go index 04f50400ae7..5f8d9ae6c4c 100644 --- a/openvidu/analytics/mongo.go +++ b/openvidu/analytics/mongo.go @@ -19,6 +19,7 @@ import ( "crypto/md5" "encoding/hex" "errors" + "strconv" "time" "github.com/livekit/protocol/livekit" @@ -34,7 +35,9 @@ import ( type MongoDatabaseClient struct { BaseDatabaseClient - client *mongo.Client + client *mongo.Client + fakeCloseEvents []interface{} + deletedActiveEntities []interface{} } func NewMongoDatabaseClient(conf *openviduconfig.AnalyticsConfig, livekithelper livekithelperinterface.LivekitHelper) (*MongoDatabaseClient, error) { @@ -43,14 +46,17 @@ func NewMongoDatabaseClient(conf *openviduconfig.AnalyticsConfig, livekithelper if err != nil { return nil, err } + logger.Infow("connecting to mongodb", "url", conf.MongoUrl) err = mongoClient.Ping(context, nil) if err != nil { return nil, err } + mongoDatabaseClient := &MongoDatabaseClient{ client: mongoClient, } + sender := &AnalyticsSender{ eventsQueue: queue.NewSliceQueue[*livekit.AnalyticsEvent](), statsQueue: queue.NewSliceQueue[*livekit.AnalyticsStat](), @@ -63,66 +69,104 @@ func NewMongoDatabaseClient(conf *openviduconfig.AnalyticsConfig, livekithelper } func (m *MongoDatabaseClient) InitializeDatabase() error { - err := m.createMongoJsonIndexDocuments() - return err + return m.createMongoJsonIndexDocuments() } func (m *MongoDatabaseClient) SendBatch() { + eventsNumber := m.owner.eventsQueue.Len() + if eventsNumber > 0 { + m.sendEventsBatch() + } + + statsNumber := m.owner.statsQueue.Len() + if statsNumber > 0 { + m.sendStatsBatch() + } +} +func (m *MongoDatabaseClient) sendEventsBatch() { events := dequeEvents(m.owner.eventsQueue) - stats := dequeStats(m.owner.statsQueue) - if len(events) > 0 || len(stats) > 0 { + var parsedEvents []interface{} + var newActiveEntities []interface{} + var deletedActiveEntities []interface{} - openviduDb := m.client.Database("openvidu") - eventCollection := openviduDb.Collection("events") - statCollection := openviduDb.Collection("stats") + for _, event := range events { + eventMap := obtainMapInterfaceFromEvent(event) + parseEvent(eventMap, event) + mongoParseEvent(eventMap, event) + parsedEvents = append(parsedEvents, eventMap) - var parsedEvents []interface{} - for _, event := range events { - eventMap := obtainMapInterfaceFromEvent(event) - parseEvent(eventMap, event) - mongoParseEvent(eventMap, event) - parsedEvents = append(parsedEvents, eventMap) - } + newActiveEntities = m.accumluateActiveEntityForCreationEvents(event, newActiveEntities) + deletedActiveEntities = m.deleteActiveEntityForDestructionEvents(event, deletedActiveEntities) + } - var parsedStats []interface{} - for _, stat := range stats { - statMap := obtainMapInterfaceFromStat(stat) - parseStat(statMap, stat) - mongoParseStat(statMap, stat) - parsedStats = append(parsedStats, statMap) - } + openviduDb := m.client.Database("openvidu") + eventCollection := openviduDb.Collection("events") + activeEntityCollection := openviduDb.Collection("active_entities") + ctx := context.Background() - if len(parsedEvents) > 0 { - logger.Debugw("inserting events into MongoDB...") + logger.Debugw("inserting events into MongoDB...") - eventsResult, err := eventCollection.InsertMany(context.Background(), parsedEvents, options.InsertMany().SetOrdered(false)) + result, err := eventCollection.InsertMany(ctx, parsedEvents, options.InsertMany().SetOrdered(false)) + if err != nil { + logger.Errorw("failed to insert events into MongoDB", err) + logger.Warnw("restoring events for next batch", nil) + handleInsertManyError(err, m.owner.eventsQueue, events) + return + } else { + logger.Debugw("inserted events", "#", len(result.InsertedIDs)) + } - if err != nil { - logger.Errorw("failed to insert events into MongoDB", err) - logger.Warnw("restoring events for next batch", nil) - handleInsertManyError(err, m.owner.eventsQueue, events) - } else { - logger.Debugw("inserted events", "#", len(eventsResult.InsertedIDs)) - } + if len(newActiveEntities) > 0 { + logger.Debugw("inserting active entities into MongoDB...") + + result, err := activeEntityCollection.InsertMany(ctx, newActiveEntities, options.InsertMany().SetOrdered(false)) + if err != nil { + logger.Errorw("failed to insert active entities in MongoDB", err) + } else { + logger.Debugw("inserted active entities", "#", len(result.InsertedIDs)) } - if len(parsedStats) > 0 { - logger.Debugw("inserting stats into MongoDB...") + } - statsResult, err := statCollection.InsertMany(context.Background(), parsedStats, options.InsertMany().SetOrdered(false)) + if len(deletedActiveEntities) > 0 { + logger.Debugw("deleting active entities from MongoDB...") - if err != nil { - logger.Errorw("failed to insert stats into MongoDB", err) - logger.Warnw("restoring stats for next batch", nil) - handleInsertManyError(err, m.owner.statsQueue, stats) - } else { - logger.Debugw("inserted stats", "#", len(statsResult.InsertedIDs)) - } + result, err := activeEntityCollection.DeleteMany(ctx, bson.D{{Key: "$or", Value: deletedActiveEntities}}) + if err != nil { + logger.Errorw("failed to delete active entities from MongoDB", err) + } else { + logger.Debugw("deleted active entities", "#", result.DeletedCount) } } } +func (m *MongoDatabaseClient) sendStatsBatch() { + stats := dequeStats(m.owner.statsQueue) + + var parsedStats []interface{} + for _, stat := range stats { + statMap := obtainMapInterfaceFromStat(stat) + parseStat(statMap, stat) + mongoParseStat(statMap, stat) + parsedStats = append(parsedStats, statMap) + } + + logger.Debugw("inserting stats into MongoDB...") + + openviduDb := m.client.Database("openvidu") + statCollection := openviduDb.Collection("stats") + + result, err := statCollection.InsertMany(context.Background(), parsedStats, options.InsertMany().SetOrdered(false)) + if err != nil { + logger.Errorw("failed to insert stats into MongoDB", err) + logger.Warnw("restoring stats for next batch", nil) + handleInsertManyError(err, m.owner.statsQueue, stats) + } else { + logger.Debugw("inserted stats", "#", len(result.InsertedIDs)) + } +} + func (m *MongoDatabaseClient) createMongoJsonIndexDocuments() error { context := context.TODO() @@ -130,21 +174,21 @@ func (m *MongoDatabaseClient) createMongoJsonIndexDocuments() error { logger.Infow("created database openvidu", "result", openviduDb) eventCollection := openviduDb.Collection("events") - result, err1 := eventCollection.Indexes().CreateMany(context, []mongo.IndexModel{ + result, err := eventCollection.Indexes().CreateMany(context, []mongo.IndexModel{ {Keys: bson.D{{Key: "type", Value: 1}}}, {Keys: bson.D{{Key: "room.sid", Value: 1}}}, {Keys: bson.D{{Key: "participant.sid", Value: 1}}}, {Keys: bson.D{{Key: "timestamp.seconds", Value: 1}}}, {Keys: bson.D{{Key: "openvidu_expire_at", Value: 1}}, Options: options.Index().SetExpireAfterSeconds(0)}, }) - if err1 != nil { - logger.Errorw("failed to create MongoDB event indexes", err1) - return err1 + if err != nil { + logger.Errorw("failed to create MongoDB event indexes", err) + return err } logger.Infow("created mongo event indexes", "result", result) statCollection := openviduDb.Collection("stats") - result, err2 := statCollection.Indexes().CreateMany(context, []mongo.IndexModel{ + result, err = statCollection.Indexes().CreateMany(context, []mongo.IndexModel{ {Keys: bson.D{{Key: "room_id", Value: 1}}}, {Keys: bson.D{{Key: "participant_id", Value: 1}}}, {Keys: bson.D{{Key: "track_id", Value: 1}}}, @@ -153,11 +197,21 @@ func (m *MongoDatabaseClient) createMongoJsonIndexDocuments() error { {Keys: bson.D{{Key: "score", Value: 1}}}, {Keys: bson.D{{Key: "openvidu_expire_at", Value: 1}}, Options: options.Index().SetExpireAfterSeconds(0)}, }) - if err2 != nil { - logger.Errorw("failed to create MongoDB stat indexes", err2) - return err2 + if err != nil { + logger.Errorw("failed to create MongoDB stat indexes", err) + return err } logger.Infow("created mongo stat indexes", "result", result) + + activeEntityCollection := openviduDb.Collection("active_entities") + resultIndex, err := activeEntityCollection.Indexes().CreateOne(context, + mongo.IndexModel{Keys: bson.D{{Key: "entity", Value: 1}}}, + ) + if err != nil { + logger.Errorw("failed to create MongoDB active entity index", err) + return err + } + logger.Infow("created mongo active entity index", "result", resultIndex) return nil } @@ -215,3 +269,523 @@ func hashFromStringId(id string) string { hash := md5.Sum([]byte(id)) return hex.EncodeToString(hash[:]) } + +func (m *MongoDatabaseClient) accumluateActiveEntityForCreationEvents(event *livekit.AnalyticsEvent, activeEntities []interface{}) []interface{} { + var entity EntityType + var id string + + switch event.Type { + case livekit.AnalyticsEventType_ROOM_CREATED: + entity = RoomEntity + id = event.Room.Sid + case livekit.AnalyticsEventType_PARTICIPANT_ACTIVE: + entity = ParticipantEntity + id = event.ParticipantId + case livekit.AnalyticsEventType_EGRESS_STARTED: + entity = EgressEntity + id = event.EgressId + case livekit.AnalyticsEventType_INGRESS_STARTED: + entity = IngressEntity + id = event.Ingress.State.ResourceId + default: + return activeEntities + } + + return append(activeEntities, bson.D{ + {Key: "_id", Value: id}, + {Key: "entity", Value: entity}, + }) +} + +func (m *MongoDatabaseClient) deleteActiveEntityForDestructionEvents(event *livekit.AnalyticsEvent, activeEntities []interface{}) []interface{} { + var id string + + switch event.Type { + case livekit.AnalyticsEventType_ROOM_ENDED: + id = event.RoomId + case livekit.AnalyticsEventType_PARTICIPANT_LEFT: + id = event.ParticipantId + case livekit.AnalyticsEventType_EGRESS_ENDED: + id = event.EgressId + case livekit.AnalyticsEventType_INGRESS_ENDED: + id = event.Ingress.State.ResourceId + default: + return activeEntities + } + + return append(activeEntities, bson.D{{Key: "_id", Value: id}}) +} + +func (m *MongoDatabaseClient) FixActiveEntities() { + /* + * IMPORTANT: + * Perform MongoDB operations (inserting fake close events and deleting active entities) + * before checking for additional entities marked as active but no longer truly active. + * This is crucial to avoid duplicating close events for the following reason: + * + * - There is another goroutine that runs every 10 seconds (by default) to send events to MongoDB. + * - This function runs every minute and may detect a closed entity as still active because the + * 10-second goroutine has not yet saved its authentic close event. + * + * To handle this, a fake close event for the entity is added to the list, and on the next iteration + * (after one minute), the process rechecks MongoDB to verify if an authentic close event has already + * been saved before performing further write or delete operations. + */ + m.filterFakeCloseEvents() + + openviduDb := m.client.Database("openvidu") + ctx := context.Background() + + // Insert all necessary fake close events in MongoDB + if len(m.fakeCloseEvents) > 0 { + logger.Debugw("inserting events into MongoDB...") + + eventCollection := openviduDb.Collection("events") + result, err := eventCollection.InsertMany(ctx, m.fakeCloseEvents, options.InsertMany().SetOrdered(false)) + if err != nil { + logger.Errorw("failed to insert events into MongoDB", err) + return + } else { + logger.Debugw("inserted events", "#", len(result.InsertedIDs)) + m.fakeCloseEvents = nil + } + } + + // Delete all active entities from MongoDB that are not actually active + if len(m.deletedActiveEntities) > 0 { + logger.Debugw("deleting active entities from MongoDB...") + + activeEntityCollection := openviduDb.Collection("active_entities") + result, err := activeEntityCollection.DeleteMany(ctx, bson.D{{Key: "$or", Value: m.deletedActiveEntities}}) + if err != nil { + logger.Errorw("failed to delete inactive entities from MongoDB", err) + return + } else { + logger.Debugw("deleted active entities", "#", result.DeletedCount) + m.deletedActiveEntities = nil + } + } + + activeEntities := m.getActiveEntities() + lastAlive := m.getLastTimestampAlive() + + if activeEntities != nil { + if len(activeEntities.Rooms) > 0 { + m.fixActiveRooms(activeEntities.Rooms, lastAlive) + } + if len(activeEntities.Participants) > 0 { + m.fixActiveParticipants(activeEntities.Participants, lastAlive) + } + if len(activeEntities.Egresses) > 0 { + m.fixActiveEgresses(activeEntities.Egresses, lastAlive) + } + if len(activeEntities.Ingresses) > 0 { + m.fixActiveIngresses(activeEntities.Ingresses, lastAlive) + } + } + + m.updateLastTimestampAlive() +} + +func (m *MongoDatabaseClient) getActiveEntities() *ActiveEntities { + activeEntityCollection := m.client.Database("openvidu").Collection("active_entities") + + // Get all active entities from MongoDB + activeEntitiesCursor, err := activeEntityCollection.Find(context.Background(), bson.D{}) + if err != nil { + logger.Errorw("failed to find active entities in MongoDB", err) + return nil + } + + var activeEntitiesDb []map[string]interface{} + if err = activeEntitiesCursor.All(context.Background(), &activeEntitiesDb); err != nil { + logger.Errorw("failed to decode active entities in MongoDB", err) + return nil + } + + activeEntities := &ActiveEntities{} + for _, entity := range activeEntitiesDb { + entityTypeRaw := entity["entity"].(string) + entityType := EntityType(entityTypeRaw) + id := entity["_id"].(string) + + switch entityType { + case RoomEntity: + activeEntities.Rooms = append(activeEntities.Rooms, id) + case ParticipantEntity: + activeEntities.Participants = append(activeEntities.Participants, id) + case EgressEntity: + activeEntities.Egresses = append(activeEntities.Egresses, id) + case IngressEntity: + activeEntities.Ingresses = append(activeEntities.Ingresses, id) + } + } + + return activeEntities +} + +func (m *MongoDatabaseClient) fixActiveRooms(activeRoomsDb []string, lastAlive Timestamp) { + // Get all active rooms from LiveKit + activeRooms, err := m.livekitHelper.ListActiveRooms() + if err != nil { + logger.Errorw("failed to list active rooms from LiveKit", err) + return + } + + activeRoomsSet := make(map[string]bool) + for _, room := range activeRooms { + activeRoomsSet[room.Sid] = true + } + + // Filter rooms that are not actually active by checking if they are present in LiveKit + for _, roomId := range activeRoomsDb { + if !activeRoomsSet[roomId] { + // Save "ROOM_ENDED" fake event to keep consistency + eventCollection := m.client.Database("openvidu").Collection("events") + + // Get info from "ROOM_CREATED" event + var roomCreatedEventMap map[string]interface{} + err = eventCollection.FindOne( + context.Background(), + bson.D{ + {Key: "room.sid", Value: roomId}, + {Key: "type", Value: livekit.AnalyticsEventType_ROOM_CREATED.String()}, + }, + options.FindOne().SetProjection(bson.D{ + {Key: "_id", Value: 0}, + {Key: "room.sid", Value: 1}, + {Key: "room.name", Value: 1}, + {Key: "room.creation_time", Value: 1}, + }), + ).Decode(&roomCreatedEventMap) + if err != nil { + if err == mongo.ErrNoDocuments { + m.deletedActiveEntities = append(m.deletedActiveEntities, bson.D{{Key: "_id", Value: roomId}}) + } else { + logger.Errorw("failed to find ROOM_CREATED event in MongoDB", err, "room_id", roomId) + } + + continue + } + + // Fill "ROOM_ENDED" event with necessary info + roomEndedEvent := roomCreatedEventMap + roomEndedEvent["type"] = livekit.AnalyticsEventType_ROOM_ENDED.String() + roomEndedEvent["room_id"] = roomId + roomEndedEvent["openvidu_expire_at"] = time.Now().Add(ANALYTICS_CONFIGURATION.Expiration).UTC() + + creationTimeFloat := roomCreatedEventMap["room"].(map[string]interface{})["creation_time"].(float64) + creationTime, _ := strconv.ParseInt(strconv.FormatFloat(creationTimeFloat, 'f', -1, 64), 10, 64) + if creationTime >= lastAlive.Seconds { + lastAlive.Seconds = creationTime + 20 + } + roomEndedEvent["timestamp"] = lastAlive + + m.fakeCloseEvents = append(m.fakeCloseEvents, roomEndedEvent) + } + } +} + +func (m *MongoDatabaseClient) fixActiveParticipants(activeParticipantsDb []string, lastAlive Timestamp) { + // Get all active participants from LiveKit + activeParticipants, err := m.livekitHelper.ListActiveParticipants() + if err != nil { + logger.Errorw("failed to list active participants from LiveKit", err) + return + } + + activeParticipantsSet := make(map[string]bool) + for _, participant := range activeParticipants { + activeParticipantsSet[participant.Sid] = true + } + + // Filter participants that are not actually active by checking if they are present in LiveKit + for _, participantId := range activeParticipantsDb { + if !activeParticipantsSet[participantId] { + // Save "PARTICIPANT_LEFT" fake event to keep consistency + eventCollection := m.client.Database("openvidu").Collection("events") + + // Get info from "PARTICIPANT_ACTIVE" event + var participantActiveEventMap map[string]interface{} + err = eventCollection.FindOne( + context.Background(), + bson.D{ + {Key: "participant_id", Value: participantId}, + {Key: "type", Value: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE.String()}, + }, + options.FindOne().SetProjection(bson.D{ + {Key: "_id", Value: 0}, + {Key: "room_id", Value: 1}, + {Key: "room.sid", Value: 1}, + {Key: "participant_id", Value: 1}, + {Key: "participant.sid", Value: 1}, + {Key: "participant.identity", Value: 1}, + {Key: "participant.name", Value: 1}, + {Key: "participant.joined_at", Value: 1}, + }), + ).Decode(&participantActiveEventMap) + if err != nil { + if err == mongo.ErrNoDocuments { + m.deletedActiveEntities = append(m.deletedActiveEntities, bson.D{{Key: "_id", Value: participantId}}) + } else { + logger.Errorw("failed to find PARTICIPANT_ACTIVE event in MongoDB", err, "participant_id", participantId) + } + + continue + } + + // Fill "PARTICIPANT_LEFT" event with necessary info + participantLeftEvent := participantActiveEventMap + participantLeftEvent["type"] = livekit.AnalyticsEventType_PARTICIPANT_LEFT.String() + participantLeftEvent["openvidu_expire_at"] = time.Now().Add(ANALYTICS_CONFIGURATION.Expiration).UTC() + + joinedAtFloat := participantActiveEventMap["participant"].(map[string]interface{})["joined_at"].(float64) + joinedAt, _ := strconv.ParseInt(strconv.FormatFloat(joinedAtFloat, 'f', -1, 64), 10, 64) + if joinedAt >= lastAlive.Seconds { + lastAlive.Seconds = joinedAt + 5 + } + participantLeftEvent["timestamp"] = lastAlive + + m.fakeCloseEvents = append(m.fakeCloseEvents, participantLeftEvent) + } + } +} + +func (m *MongoDatabaseClient) fixActiveEgresses(activeEgressesDb []string, lastAlive Timestamp) { + // Get all active egresses from LiveKit + activeEgresses, err := m.livekitHelper.ListActiveEgresses() + if err != nil { + logger.Errorw("failed to list active egresses from LiveKit", err) + return + } + + activeEgressesSet := make(map[string]bool) + for _, egress := range activeEgresses { + activeEgressesSet[egress.EgressId] = true + } + + // Filter egresses that are not actually active by checking if they are present in LiveKit + for _, egressId := range activeEgressesDb { + if !activeEgressesSet[egressId] { + // Save "EGRESS_ENDED" fake event to keep consistency + eventCollection := m.client.Database("openvidu").Collection("events") + + // Get info from "EGRESS_STARTED" event + var egressStartedEventMap map[string]interface{} + err = eventCollection.FindOne( + context.Background(), + bson.D{ + {Key: "egress_id", Value: egressId}, + {Key: "type", Value: livekit.AnalyticsEventType_EGRESS_STARTED.String()}, + }, + options.FindOne().SetProjection(bson.D{ + {Key: "_id", Value: 0}, + {Key: "egress_id", Value: 1}, + {Key: "egress.room_id", Value: 1}, + {Key: "egress.room_name", Value: 1}, + {Key: "egress.started_at", Value: 1}, + {Key: "egress.updated_at", Value: 1}, + {Key: "egress.Request", Value: 1}, + {Key: "egress.file_results.filename", Value: 1}, + {Key: "egress.stream_results.url", Value: 1}, + {Key: "egress.segment_results.playlist_name", Value: 1}, + {Key: "timestamp.seconds", Value: 1}, + }), + ).Decode(&egressStartedEventMap) + if err != nil { + if err == mongo.ErrNoDocuments { + m.deletedActiveEntities = append(m.deletedActiveEntities, bson.D{{Key: "_id", Value: egressId}}) + } else { + logger.Errorw("failed to find EGRESS_STARTED event in MongoDB", err, "egress_id", egressId) + } + + continue + } + + // Fill "EGRESS_ENDED" event with necessary info + egressEndedEvent := egressStartedEventMap + egressEndedEvent["type"] = livekit.AnalyticsEventType_EGRESS_ENDED.String() + egressEndedEvent["egress"].(map[string]interface{})["status"] = "EGRESS_COMPLETE" + egressEndedEvent["openvidu_expire_at"] = time.Now().Add(ANALYTICS_CONFIGURATION.Expiration).UTC() + + float, ok := egressStartedEventMap["egress"].(map[string]interface{})["started_at"].(float64) + if !ok { + float, ok = egressStartedEventMap["egress"].(map[string]interface{})["updated_at"].(float64) + if !ok { + float = egressStartedEventMap["timestamp"].(map[string]interface{})["seconds"].(float64) * 1000000000 + } + } + startedAt, _ := strconv.ParseInt(strconv.FormatFloat(float, 'f', -1, 64), 10, 64) + egressEndedEvent["egress"].(map[string]interface{})["started_at"] = startedAt + startedAt = startedAt / 1000000000 + + if startedAt >= lastAlive.Seconds { + lastAlive.Seconds = startedAt + 5 + } + egressEndedEvent["timestamp"] = lastAlive + timestampInNanos := lastAlive.Seconds*1000000000 + int64(lastAlive.Nanos) + egressEndedEvent["egress"].(map[string]interface{})["updated_at"] = timestampInNanos + egressEndedEvent["egress"].(map[string]interface{})["ended_at"] = timestampInNanos + + m.fakeCloseEvents = append(m.fakeCloseEvents, egressEndedEvent) + } + } +} + +func (m *MongoDatabaseClient) fixActiveIngresses(activeIngressesDb []string, lastAlive Timestamp) { + // Get all active ingresses from LiveKit + activeIngresses, err := m.livekitHelper.ListActiveIngresses() + if err != nil { + logger.Errorw("failed to list active ingresses from LiveKit", err) + return + } + + activeIngressesSet := make(map[string]bool) + for _, ingress := range activeIngresses { + activeIngressesSet[ingress.State.ResourceId] = true + } + + // Filter ingress that are not actually active by checking if they are present in LiveKit + for _, ingressResourceId := range activeIngressesDb { + if !activeIngressesSet[ingressResourceId] { + // Save "INGRESS_ENDED" fake event to keep consistency + eventCollection := m.client.Database("openvidu").Collection("events") + + // Get info from "INGRESS_STARTED" event + var ingressStartedEventMap map[string]interface{} + err = eventCollection.FindOne( + context.Background(), + bson.D{ + {Key: "ingress.state.resource_id", Value: ingressResourceId}, + {Key: "type", Value: livekit.AnalyticsEventType_INGRESS_STARTED.String()}, + }, + options.FindOne().SetProjection(bson.D{ + {Key: "_id", Value: 0}, + {Key: "ingress_id", Value: 1}, + {Key: "ingress.state.resource_id", Value: 1}, + {Key: "ingress.state.started_at", Value: 1}, + }), + ).Decode(&ingressStartedEventMap) + if err != nil { + if err == mongo.ErrNoDocuments { + m.deletedActiveEntities = append(m.deletedActiveEntities, bson.D{{Key: "_id", Value: ingressResourceId}}) + } else { + logger.Errorw("failed to find INGRESS_STARTED event in MongoDB", err, "ingress_resource_id", ingressResourceId) + } + + continue + } + + // Fill "INGRESS_ENDED" event with necessary info + ingressEndedEvent := ingressStartedEventMap + ingressEndedEvent["type"] = livekit.AnalyticsEventType_INGRESS_ENDED.String() + ingressEndedEvent["ingress"].(map[string]interface{})["state"].(map[string]interface{})["status"] = "ENDPOINT_INACTIVE" + ingressEndedEvent["openvidu_expire_at"] = time.Now().Add(ANALYTICS_CONFIGURATION.Expiration).UTC() + + startedAtFloat := ingressStartedEventMap["ingress"].(map[string]interface{})["state"].(map[string]interface{})["started_at"].(float64) + startedAt, _ := strconv.ParseInt(strconv.FormatFloat(startedAtFloat, 'f', -1, 64), 10, 64) + startedAt = startedAt / 1000000000 + if startedAt >= lastAlive.Seconds { + lastAlive.Seconds = startedAt + 5 + } + ingressEndedEvent["timestamp"] = lastAlive + + m.fakeCloseEvents = append(m.fakeCloseEvents, ingressEndedEvent) + } + } +} + +// filterFakeCloseEvents removes fake close events that are already present in MongoDB or +// adds the respective entity to the list of deleted active entities if the event is not present +func (m *MongoDatabaseClient) filterFakeCloseEvents() { + if len(m.fakeCloseEvents) == 0 { + return + } + + var filteredFakeCloseEvents []interface{} + for _, event := range m.fakeCloseEvents { + eventType := event.(map[string]interface{})["type"].(string) + switch eventType { + case livekit.AnalyticsEventType_ROOM_ENDED.String(): + roomId := event.(map[string]interface{})["room_id"].(string) + filteredFakeCloseEvents = m.filterEventsByType("room.sid", roomId, []string{eventType}, event, filteredFakeCloseEvents) + case livekit.AnalyticsEventType_PARTICIPANT_LEFT.String(): + participantId := event.(map[string]interface{})["participant_id"].(string) + filteredFakeCloseEvents = m.filterEventsByType("participant_id", participantId, []string{eventType}, event, filteredFakeCloseEvents) + case livekit.AnalyticsEventType_EGRESS_ENDED.String(): + egressId := event.(map[string]interface{})["egress_id"].(string) + filteredFakeCloseEvents = m.filterEventsByType("egress_id", egressId, []string{eventType}, event, filteredFakeCloseEvents) + case livekit.AnalyticsEventType_INGRESS_ENDED.String(): + ingressResourceId := event.(map[string]interface{})["ingress"].(map[string]interface{})["state"].(map[string]interface{})["resource_id"].(string) + eventTypes := []string{eventType, livekit.AnalyticsEventType_INGRESS_DELETED.String()} + filteredFakeCloseEvents = m.filterEventsByType("ingress.state.resource_id", ingressResourceId, eventTypes, event, filteredFakeCloseEvents) + } + } + + m.fakeCloseEvents = filteredFakeCloseEvents +} + +func (m *MongoDatabaseClient) filterEventsByType( + idField string, + id string, + eventTypes []string, + event interface{}, + filteredEvents []interface{}, +) []interface{} { + eventCollection := m.client.Database("openvidu").Collection("events") + result := eventCollection.FindOne( + context.Background(), + bson.D{ + {Key: idField, Value: id}, + {Key: "type", Value: bson.D{ + {Key: "$in", Value: eventTypes}, + }}, + }, + options.FindOne().SetProjection(bson.D{{Key: idField, Value: 1}}), + ) + + if result.Err() != nil { + if result.Err() == mongo.ErrNoDocuments { + filteredEvents = append(filteredEvents, event) + } else { + logger.Errorw("failed to find close event in MongoDB", result.Err(), idField, id) + return filteredEvents + } + } + + m.deletedActiveEntities = append(m.deletedActiveEntities, bson.D{{Key: "_id", Value: id}}) + return filteredEvents +} + +func (m *MongoDatabaseClient) getLastTimestampAlive() Timestamp { + lastAliveCollection := m.client.Database("openvidu").Collection("last_alive") + + var lastAlive LastAlive + err := lastAliveCollection.FindOne(context.Background(), bson.D{{Key: "_id", Value: "server"}}).Decode(&lastAlive) + if err != nil { + return getCurrentTimestamp() + } + + return lastAlive.LastAlive +} + +func (m *MongoDatabaseClient) updateLastTimestampAlive() { + lastAliveCollection := m.client.Database("openvidu").Collection("last_alive") + lastActive := LastAlive{ + ID: "server", + LastAlive: getCurrentTimestamp(), + } + + _, err := lastAliveCollection.UpdateOne( + context.Background(), + bson.D{{Key: "_id", Value: "server"}}, + bson.D{{Key: "$set", Value: bson.D{ + {Key: "_id", Value: lastActive.ID}, + {Key: "last_alive", Value: lastActive.LastAlive}}, + }}, + options.Update().SetUpsert(true), + ) + if err != nil { + logger.Errorw("failed to update last alive timestamp in MongoDB", err) + } +} diff --git a/openvidu/analytics/redis.go b/openvidu/analytics/redis.go index 6f28c2bc77c..e23d33f32c2 100644 --- a/openvidu/analytics/redis.go +++ b/openvidu/analytics/redis.go @@ -35,7 +35,6 @@ type RedisDatabaseClient struct { } func NewRedisDatabaseClient(conf *openviduconfig.AnalyticsConfig, redisConfig *redisLiveKit.RedisConfig, livekithelper livekithelperinterface.LivekitHelper) (*RedisDatabaseClient, error) { - var err error redisClient, err := redisLiveKit.GetRedisClient(redisConfig) if err != nil { @@ -45,6 +44,7 @@ func NewRedisDatabaseClient(conf *openviduconfig.AnalyticsConfig, redisConfig *r redisDatabaseClient := &RedisDatabaseClient{ client: redisClient, } + sender := &AnalyticsSender{ eventsQueue: queue.NewSliceQueue[*livekit.AnalyticsEvent](), statsQueue: queue.NewSliceQueue[*livekit.AnalyticsStat](), @@ -57,12 +57,10 @@ func NewRedisDatabaseClient(conf *openviduconfig.AnalyticsConfig, redisConfig *r } func (m *RedisDatabaseClient) InitializeDatabase() error { - err := m.createRedisJsonIndexDocuments() - return err + return m.createRedisJsonIndexDocuments() } func (r *RedisDatabaseClient) createRedisJsonIndexDocuments() error { - // Create text index for event "$.type" _, err := r.client.Do(context.Background(), "FT.CREATE", "idx:eventType", "ON", "JSON", "PREFIX", "1", "event:", "SCHEMA", "$.type", "AS", "eventType", "TEXT").Result() err = handleIndexCreationError(err, "$.type") @@ -153,18 +151,18 @@ func handleIndexCreationError(err error, indexSchema string) error { } func (r *RedisDatabaseClient) SendBatch() { - events := dequeEvents(r.owner.eventsQueue) stats := dequeStats(r.owner.statsQueue) if len(events) > 0 || len(stats) > 0 { - pipelinesByRoom := make(map[string]redis.Pipeliner) + for _, event := range events { if _, ok := pipelinesByRoom[event.Room.Sid]; !ok { pipelinesByRoom[event.Room.Sid] = r.client.Pipeline() } } + for _, stat := range stats { if _, ok := pipelinesByRoom[stat.RoomId]; !ok { pipelinesByRoom[stat.RoomId] = r.client.Pipeline() @@ -172,7 +170,6 @@ func (r *RedisDatabaseClient) SendBatch() { } for _, event := range events { - eventMap := obtainMapInterfaceFromEvent(event) eventKey := "event:{" + event.Room.Sid + "}" + ":" + event.Type.String() + ":" + getTimestampFromStruct(event.Timestamp) @@ -183,7 +180,6 @@ func (r *RedisDatabaseClient) SendBatch() { } for _, stat := range stats { - statMap := obtainMapInterfaceFromStat(stat) statKey := "stat:{" + stat.RoomId + "}" + ":" + stat.ParticipantId + ":" + stat.TrackId + ":" + getTimestampFromStruct(stat.TimeStamp) @@ -198,3 +194,7 @@ func (r *RedisDatabaseClient) SendBatch() { } } } + +func (r *RedisDatabaseClient) FixActiveEntities() { + panic("FixActiveEntities not implemented for RedisDatabaseClient") +} diff --git a/openvidu/livekithelper/livekithelper.go b/openvidu/livekithelper/livekithelper.go index 75d3eb594dc..61a27d1d370 100644 --- a/openvidu/livekithelper/livekithelper.go +++ b/openvidu/livekithelper/livekithelper.go @@ -70,12 +70,24 @@ func (o *LivekitHelper) ListActiveRooms() ([]*livekit.Room, error) { return rooms, nil } -func (o *LivekitHelper) ListActiveParticipants(roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error) { - ctx := context.Background() - participants, err := (*o.roomStore).ListParticipants(ctx, roomName) +func (o *LivekitHelper) ListActiveParticipants() ([]*livekit.ParticipantInfo, error) { + rooms, err := o.ListActiveRooms() if err != nil { return nil, err } + + ctx := context.Background() + participants := make([]*livekit.ParticipantInfo, 0) + + for _, room := range rooms { + roomParticipants, err := (*o.roomStore).ListParticipants(ctx, livekit.RoomName(room.Name)) + if err != nil { + return nil, err + } + + participants = append(participants, roomParticipants...) + } + return participants, nil } @@ -94,5 +106,13 @@ func (o *LivekitHelper) ListActiveIngresses() ([]*livekit.IngressInfo, error) { if err != nil { return nil, err } - return ingresses, nil + + activeIngresses := make([]*livekit.IngressInfo, 0) + for _, ingress := range ingresses { + if ingress.State.Status == livekit.IngressState_ENDPOINT_PUBLISHING && ingress.State.ResourceId != "" { + activeIngresses = append(activeIngresses, ingress) + } + } + + return activeIngresses, nil } diff --git a/openvidu/livekithelper/livekithelperinterface/livekithelperinterface.go b/openvidu/livekithelper/livekithelperinterface/livekithelperinterface.go index 38ae3a4e308..3a1b5379576 100644 --- a/openvidu/livekithelper/livekithelperinterface/livekithelperinterface.go +++ b/openvidu/livekithelper/livekithelperinterface/livekithelperinterface.go @@ -5,7 +5,7 @@ import "github.com/livekit/protocol/livekit" // This interface in a separate package fixes import cycles type LivekitHelper interface { ListActiveRooms() ([]*livekit.Room, error) - ListActiveParticipants(roomName livekit.RoomName) ([]*livekit.ParticipantInfo, error) + ListActiveParticipants() ([]*livekit.ParticipantInfo, error) ListActiveEgresses() ([]*livekit.EgressInfo, error) ListActiveIngresses() ([]*livekit.IngressInfo, error) } diff --git a/openvidu/queue/queue.go b/openvidu/queue/queue.go index 6ff60dfadb2..26719c17684 100644 --- a/openvidu/queue/queue.go +++ b/openvidu/queue/queue.go @@ -22,6 +22,7 @@ type Queue[T any] interface { Enqueue(T) error Dequeue() (T, error) Len() int + Contains(T, func(T, T) bool) bool } var ( @@ -58,3 +59,13 @@ func (q *SliceQueue[T]) Dequeue() (T, error) { var empty T return empty, ErrQueueEmpty } + +// Contains checks if an element exists in the queue based on the given equality function. +func (q *SliceQueue[T]) Contains(element T, equals func(T, T) bool) bool { + for _, e := range *q { + if equals(e, element) { + return true + } + } + return false +} diff --git a/openvidu/queue/queue_test.go b/openvidu/queue/queue_test.go index babbd494899..60b2a6845bb 100644 --- a/openvidu/queue/queue_test.go +++ b/openvidu/queue/queue_test.go @@ -32,6 +32,15 @@ func TestSliceQueue(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 2, q.Len()) + equalsFn := func(a, b int) bool { + return a == b + } + contains := q.Contains(1, equalsFn) + assert.True(t, contains) + + contains = q.Contains(3, equalsFn) + assert.False(t, contains) + value, err := q.Dequeue() assert.NoError(t, err) assert.Equal(t, 1, value)