Skip to content

Commit

Permalink
Implement fixActiveParticipants function in mongo.go
Browse files Browse the repository at this point in the history
  • Loading branch information
juancarmore committed Nov 18, 2024
1 parent e14458e commit 97b626e
Showing 1 changed file with 95 additions and 8 deletions.
103 changes: 95 additions & 8 deletions openvidu/analytics/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,18 +335,16 @@ func (m *MongoDatabaseClient) FixActiveEntities() {
var newEvents []interface{}
var deletedActiveEntities []interface{}

// Get all active entities from MongoDB
activeEntities := m.getActiveEntities()
// Get last timestamp that server was alive
lastAlive := m.getLastTimestampAlive()

// Fix active rooms
if activeEntities != nil && len(activeEntities.Rooms) > 0 {
deletedActiveEntities, newEvents = m.fixActiveRooms(activeEntities.Rooms, deletedActiveEntities, newEvents, lastAlive)
deletedActiveEntities, newEvents = m.fixActiveRooms(activeEntities.Rooms, newEvents, deletedActiveEntities, lastAlive)
}

// TODO: Implement the following fixes
// Fix active participants
if activeEntities != nil && len(activeEntities.Participants) > 0 {
deletedActiveEntities, newEvents = m.fixActiveParticipants(activeEntities.Participants, newEvents, deletedActiveEntities, lastAlive)
}

// Fix active egress

Expand Down Expand Up @@ -398,7 +396,6 @@ func (m *MongoDatabaseClient) FixActiveEntities() {
logger.Errorw("failed to execute transaction in MongoDB", err)
}

// Update last timestamp that server was alive
m.updateLastTimestampAlive()
}

Expand Down Expand Up @@ -490,7 +487,7 @@ func (m *MongoDatabaseClient) getActiveEntities() *ActiveEntities {

func (m *MongoDatabaseClient) fixActiveRooms(
activeRoomsDb []string,
deletedActiveEntities, newEvents []interface{},
newEvents, deletedActiveEntities []interface{},
lastAlive Timestamp,
) ([]interface{}, []interface{}) {
// Get all active rooms from LiveKit
Expand Down Expand Up @@ -575,6 +572,96 @@ func (m *MongoDatabaseClient) fixActiveRooms(
return deletedActiveEntities, newEvents
}

func (m *MongoDatabaseClient) fixActiveParticipants(
activeParticipantsDb []string,
newEvents, deletedActiveEntities []interface{},
lastAlive Timestamp,
) ([]interface{}, []interface{}) {
// Get all active participants from LiveKit
activeParticipants, err := m.livekitHelper.ListActiveParticipants()
if err != nil {
logger.Errorw("failed to list active participants from LiveKit", err)
return deletedActiveEntities, newEvents
}

activeParticipantsSet := make(map[string]bool)
for _, participant := range activeParticipants {
activeParticipantsSet[participant.Sid] = true
}

// Filter participants that are not actually active by checking if they are present in LiveKit
for _, participantId := range activeParticipantsDb {
if !activeParticipantsSet[participantId] {
// Check if "PARTICIPANT_LEFT" event already exists
eventCollection := m.client.Database("openvidu").Collection("events")
result := eventCollection.FindOne(
context.Background(),
bson.D{
{Key: "participant_id", Value: participantId},
{Key: "type", Value: livekit.AnalyticsEventType_PARTICIPANT_LEFT.String()},
},
options.FindOne().SetProjection(bson.D{{Key: "participant_id", Value: 1}}),
)

// Check if there was an error different from "no documents found"
if result.Err() != nil && result.Err() != mongo.ErrNoDocuments {
logger.Errorw("failed to find PARTICIPANT_LEFT event for participant in MongoDB", result.Err(), "participant_id", participantId)
continue
}

deletedActiveEntities = append(deletedActiveEntities, bson.D{{Key: "_id", Value: participantId}})

// If "PARTICIPANT_LEFT" event already exists, skip
if result.Err() == nil {
continue
}

// Save "PARTICIPANT_LEFT" fake event to keep consistency
// Get info from "PARTICIPANT_ACTIVE" event
var participantActiveEventMap map[string]interface{}
err = eventCollection.FindOne(
context.Background(),
bson.D{
{Key: "participant_id", Value: participantId},
{Key: "type", Value: livekit.AnalyticsEventType_PARTICIPANT_ACTIVE.String()},
},
options.FindOne().SetProjection(bson.D{
{Key: "room_id", Value: 1},
{Key: "room.sid", Value: 1},
{Key: "participant_id", Value: 1},
{Key: "participant.sid", Value: 1},
{Key: "participant.identity", Value: 1},
{Key: "participant.name", Value: 1},
{Key: "participant.joined_at", Value: 1},
}),
).Decode(&participantActiveEventMap)
if err != nil {
if err != mongo.ErrNoDocuments {
logger.Errorw("failed to find PARTICIPANT_ACTIVE event for participant in MongoDB", err, "participant_id", participantId)
deletedActiveEntities = deletedActiveEntities[:len(deletedActiveEntities)-1]
}
continue
}

// Fill "PARTICIPANT_LEFT" event with necessary info
participantLeftEvent := participantActiveEventMap
participantLeftEvent["type"] = livekit.AnalyticsEventType_PARTICIPANT_LEFT.String()
participantLeftEvent["openvidu_expire_at"] = time.Now().Add(ANALYTICS_CONFIGURATION.Expiration).UTC()

creationTime := participantActiveEventMap["participant"].(map[string]interface{})["joined_at"].(int64)
if creationTime >= lastAlive.Seconds {
participantLeftEvent["timestamp"].(map[string]interface{})["seconds"] = creationTime + 5
} else {
participantLeftEvent["timestamp"].(map[string]interface{})["seconds"] = lastAlive.Seconds
}

newEvents = append(newEvents, participantLeftEvent)
}
}

return deletedActiveEntities, newEvents
}

func (m *MongoDatabaseClient) getLastTimestampAlive() Timestamp {
lastAliveCollection := m.client.Database("openvidu").Collection("last_alive")

Expand Down

0 comments on commit 97b626e

Please sign in to comment.