Skip to content

Commit

Permalink
Logging cleanup (livekit#843)
Browse files Browse the repository at this point in the history
* Logging cleanup

Changes log levels to better match significance

* fix lock
  • Loading branch information
davidzhao authored Jul 21, 2022
1 parent f2e1e67 commit 53f51c8
Show file tree
Hide file tree
Showing 17 changed files with 45 additions and 55 deletions.
2 changes: 1 addition & 1 deletion pkg/rtc/mediatrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ func (t *MediaTrack) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.Tra
}
}

t.params.Logger.Debugw("primary codec published, set potential codecs", "potential", potentialCodecs)
if len(potentialCodecs) > 0 {
t.params.Logger.Debugw("primary codec published, set potential codecs", "potential", potentialCodecs)
t.MediaTrackReceiver.SetPotentialCodecs(potentialCodecs, parameters.HeaderExtensions)
}
t.params.Telemetry.TrackPublished(
Expand Down
2 changes: 1 addition & 1 deletion pkg/rtc/mediatrackreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (t *MediaTrackReceiver) removeSubscriber(subscriberID livekit.ParticipantID
}

func (t *MediaTrackReceiver) RemoveAllSubscribers(willBeResumed bool) {
t.params.Logger.Debugw("removing all subscribers")
t.params.Logger.Infow("removing all subscribers")
for _, subscriberID := range t.MediaTrackSubscriptions.GetAllSubscribers() {
t.RemoveSubscriber(subscriberID, willBeResumed)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/rtc/mediatracksubscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func (t *MediaTrackSubscriptions) downTrackClosed(
if sender == nil {
return
}
sub.GetLogger().Infow("removing peerconnection track",
sub.GetLogger().Debugw("removing PeerConnection track",
"publisher", subTrack.PublisherIdentity(),
"publisherID", subTrack.PublisherID(),
"kind", t.params.MediaTrack.Kind(),
Expand All @@ -767,7 +767,7 @@ func (t *MediaTrackSubscriptions) downTrackClosed(
if _, ok := err.(*rtcerr.InvalidStateError); !ok {
// most of these are safe to ignore, since the track state might have already
// been set to Inactive
sub.GetLogger().Infow("could not remove remoteTrack from forwarder",
sub.GetLogger().Debugw("could not remove remoteTrack from forwarder",
"error", err,
"publisher", subTrack.PublisherIdentity(),
"publisherID", subTrack.PublisherID(),
Expand Down
22 changes: 7 additions & 15 deletions pkg/rtc/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ func (p *ParticipantImpl) Close(sendLeave bool, reason types.ParticipantCloseRea
return nil
}

p.params.Logger.Infow("closing participant", "sendLeave", sendLeave, "reason", reason)
p.params.Logger.Infow("closing participant", "sendLeave", sendLeave, "reason", reason.String())
// send leave message
if sendLeave {
_ = p.writeMessage(&livekit.SignalResponse{
Expand Down Expand Up @@ -884,15 +884,6 @@ func (p *ParticipantImpl) GetConnectionQuality() *livekit.ConnectionQualityInfo
}

rating := connectionquality.Score2Rating(avgScore)
if rating != livekit.ConnectionQuality_EXCELLENT {
p.params.Logger.Infow("connection quality not optimal",
"totalScore", totalScore,
"numTracks", numTracks,
"rating", rating,
"publisherScores", publisherScores,
"subscriberScores", subscriberScores,
)
}

return &livekit.ConnectionQualityInfo{
ParticipantSid: string(p.ID()),
Expand Down Expand Up @@ -1314,7 +1305,7 @@ func (p *ParticipantImpl) handlePrimaryStateChange(state webrtc.PeerConnectionSt
}
if primaryPC.ConnectionState() != webrtc.PeerConnectionStateConnected {
p.params.Logger.Infow("closing disconnected participant")
p.Close(true, types.ParticipantCloseReasonPeerConnectionDisconnected)
_ = p.Close(true, types.ParticipantCloseReasonPeerConnectionDisconnected)
}
})
p.lock.Unlock()
Expand Down Expand Up @@ -1580,7 +1571,7 @@ func (p *ParticipantImpl) addPendingTrackLocked(req *livekit.AddTrackRequest) *l
if req.Sid != "" {
track := p.GetPublishedTrack(livekit.TrackID(req.Sid))
if track == nil {
p.params.Logger.Infow("track not found for new codec publish", "trackID", req.Sid)
p.params.Logger.Infow("could not find existing track for multi-codec simulcast", "trackID", req.Sid)
return nil
}

Expand Down Expand Up @@ -2098,7 +2089,7 @@ func (p *ParticipantImpl) setDowntracksConnected() {
func (p *ParticipantImpl) CacheDownTrack(trackID livekit.TrackID, rtpTransceiver *webrtc.RTPTransceiver, forwarderState sfu.ForwarderState) {
p.lock.Lock()
if existing := p.cachedDownTracks[trackID]; existing != nil && existing.transceiver != rtpTransceiver {
p.params.Logger.Infow("cached transceiver change", "trackID", trackID)
p.params.Logger.Infow("cached transceiver changed", "trackID", trackID)
}
p.cachedDownTracks[trackID] = &downTrackState{transceiver: rtpTransceiver, forwarder: forwarderState}
p.lock.Unlock()
Expand Down Expand Up @@ -2128,7 +2119,7 @@ func (p *ParticipantImpl) GetCachedDownTrack(trackID livekit.TrackID) (*webrtc.R
}

func (p *ParticipantImpl) handleNegotiationFailed() {
p.params.Logger.Infow("negotiation failed, notify client do full reconnect")
p.params.Logger.Infow("negotiation failed, starting full reconnect")
_ = p.writeMessage(&livekit.SignalResponse{
Message: &livekit.SignalResponse_Leave{
Leave: &livekit.LeaveRequest{
Expand Down Expand Up @@ -2202,7 +2193,8 @@ func (p *ParticipantImpl) ProcessSubscriptionRequestsQueue(trackID livekit.Track
}

default:
p.params.Logger.Warnw("unknown request type", nil)
p.params.Logger.Warnw("unknown request type", nil,
"requestType", request.requestType)

// let the queue move forward
p.ClearInProgressAndProcessSubscriptionRequestsQueue(trackID)
Expand Down
4 changes: 2 additions & 2 deletions pkg/rtc/signalhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
return nil
}
if err := participant.AddICECandidate(candidateInit, msg.Trickle.Target); err != nil {
pLogger.Warnw("could not handle trickle", err)
pLogger.Warnw("could not add ICE candidate", err)
}
case *livekit.SignalRequest_Mute:
participant.SetTrackMuted(livekit.TrackID(msg.Mute.Sid), msg.Mute.Muted, false)
Expand Down Expand Up @@ -65,7 +65,7 @@ func HandleParticipantSignal(room types.Room, participant types.LocalParticipant
continue
}

pLogger.Debugw("updated subscribed track settings", "trackID", sid, "settings", msg.TrackSetting)
pLogger.Infow("updated subscribed track settings", "trackID", sid, "settings", msg.TrackSetting)
}
case *livekit.SignalRequest_UpdateLayers:
err := room.UpdateVideoLayers(participant, msg.UpdateLayers)
Expand Down
5 changes: 2 additions & 3 deletions pkg/rtc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
// restart without current remote description, sent current local description again to try recover
offer := t.pc.LocalDescription()
if offer == nil {
// it should not happend, check for safety
t.params.Logger.Infow("ice restart without local offer")
// it should not happen, log just in case
t.params.Logger.Warnw("ice restart without local offer", nil)
return ErrIceRestartWithoutLocalSDP
} else {
t.negotiationState = negotiationRetry
Expand Down Expand Up @@ -449,7 +449,6 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
failed := t.negotiationState != negotiationStateNone
t.lock.RUnlock()
if t.negotiateCounter.Load() == negotiateVersion && failed {
t.params.Logger.Infow("negotiation timeout")
if t.onNegotiationFailed != nil {
t.onNegotiationFailed()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/service/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (s *EgressService) updateWorker() {
if res.Error != "" {
logger.Errorw("egress failed", errors.New(res.Error), "egressID", res.EgressId)
} else {
logger.Debugw("egress ended", "egressID", res.EgressId)
logger.Infow("egress ended", "egressID", res.EgressId)
}

s.telemetry.EgressEnded(context.Background(), res)
Expand Down
14 changes: 7 additions & 7 deletions pkg/service/roommanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (r *RoomManager) StartSession(
// When reconnecting, it means WS has interrupted by underlying peer connection is still ok
// in this mode, we'll keep the participant SID, and just swap the sink for the underlying connection
if pi.Reconnect {
logger.Debugw("resuming RTC session",
logger.Infow("resuming RTC session",
"room", roomName,
"nodeID", r.currentNode.Id,
"participant", pi.Identity,
Expand All @@ -225,7 +225,7 @@ func (r *RoomManager) StartSession(
return errors.New("could not restart participant")
}

logger.Debugw("starting RTC session",
logger.Infow("starting RTC session",
"room", roomName,
"nodeID", r.currentNode.Id,
"participant", pi.Identity,
Expand Down Expand Up @@ -351,22 +351,22 @@ func (r *RoomManager) getOrCreateRoom(ctx context.Context, roomName livekit.Room
newRoom.OnClose(func() {
r.telemetry.RoomEnded(ctx, newRoom.ToProto())
if err := r.DeleteRoom(ctx, roomName); err != nil {
logger.Errorw("could not delete room", err)
newRoom.Logger.Errorw("could not delete room", err)
}

logger.Infow("room closed")
newRoom.Logger.Infow("room closed")
})

newRoom.OnMetadataUpdate(func(metadata string) {
if err := r.roomStore.StoreRoom(ctx, newRoom.ToProto()); err != nil {
logger.Errorw("could not handle metadata update", err)
newRoom.Logger.Errorw("could not handle metadata update", err)
}
})

newRoom.OnParticipantChanged(func(p types.LocalParticipant) {
if p.State() != livekit.ParticipantInfo_DISCONNECTED {
if err := r.roomStore.StoreParticipant(ctx, roomName, p.ToProto()); err != nil {
logger.Errorw("could not handle participant change", err)
newRoom.Logger.Errorw("could not handle participant change", err)
}
}
})
Expand All @@ -388,7 +388,7 @@ func (r *RoomManager) rtcSessionWorker(room *rtc.Room, participant types.LocalPa
// give time for the participant to be closed with a proper reason.
// if participant is closed from here, we would be obscuring the real reason the participant is closed.
time.Sleep(2 * time.Second)
logger.Debugw("RTC session finishing",
logger.Infow("RTC session finishing",
"participant", participant.Identity(),
"pID", participant.ID(),
"room", room.Name(),
Expand Down
15 changes: 4 additions & 11 deletions pkg/service/rtcservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/routing/selector"
"github.com/livekit/livekit-server/pkg/rtc"
"github.com/livekit/livekit-server/pkg/rtc/types"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
)

Expand Down Expand Up @@ -182,15 +181,15 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

pLogger := rtc.LoggerWithParticipant(
rtc.LoggerWithRoom(logger.GetDefaultLogger(), roomName, ""),
rtc.LoggerWithRoom(logger.GetDefaultLogger(), roomName, livekit.RoomID(rm.Sid)),
pi.Identity,
"",
false,
)
done := make(chan struct{})
// function exits when websocket terminates, it'll close the event reading off of response sink as well
defer func() {
pLogger.Infow("server closing WS connection", "connID", connId)
pLogger.Infow("finishing WS connection", "connID", connId)
resSource.Close()
reqSink.Close()
close(done)
Expand All @@ -205,15 +204,9 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
sigConn := NewWSSignalConnection(conn)
if types.ProtocolVersion(pi.Client.Protocol).SupportsProtobuf() {
sigConn.useJSON = false
}

prometheus.ServiceOperationCounter.WithLabelValues("signal_ws", "success", "").Add(1)
pLogger.Infow("new client WS connected",
"connID", connId,
"roomID", rm.Sid,
)
pLogger.Infow("new client WS connected", "connID", connId)

// handle responses
go func() {
Expand Down Expand Up @@ -256,7 +249,7 @@ func (s *RTCService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err != nil {
if err == io.EOF || strings.HasSuffix(err.Error(), "use of closed network connection") ||
websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
pLogger.Infow("exit ws read loop for closed connection", "connID", connId)
pLogger.Debugw("exit ws read loop for closed connection", "connID", connId)
return
} else {
pLogger.Errorw("error reading from websocket", err)
Expand Down
7 changes: 5 additions & 2 deletions pkg/service/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) {
}
}

values := make([]interface{}, 0)
values = append(values, "sentinel", conf.UseSentinel())
if conf.UseSentinel() {
logger.Infow("using multi-node routing via redis", "sentinel", true, "addr", conf.Redis.SentinelAddresses, "masterName", conf.Redis.MasterName)
values = append(values, "addr", conf.Redis.SentinelAddresses, "masterName", conf.Redis.MasterName)
rcOptions := &redis.FailoverOptions{
SentinelAddrs: conf.Redis.SentinelAddresses,
SentinelUsername: conf.Redis.SentinelUsername,
Expand All @@ -136,7 +138,7 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) {
}
rc = redis.NewFailoverClient(rcOptions)
} else {
logger.Infow("using multi-node routing via redis", "sentinel", false, "addr", conf.Redis.Address)
values = append(values, "addr", conf.Redis.Address)
rcOptions := &redis.Options{
Addr: conf.Redis.Address,
Username: conf.Redis.Username,
Expand All @@ -146,6 +148,7 @@ func createRedisClient(conf *config.Config) (*redis.Client, error) {
}
rc = redis.NewClient(rcOptions)
}
logger.Infow("using multi-node routing via redis", values...)

if err := rc.Ping(context.Background()).Err(); err != nil {
err = errors.Wrap(err, "unable to connect to redis")
Expand Down
7 changes: 5 additions & 2 deletions pkg/service/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/service/wsprotocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewWSSignalConnection(conn types.WebsocketClient) *WSSignalConnection {
wsc := &WSSignalConnection{
conn: conn,
mu: sync.Mutex{},
useJSON: true,
useJSON: false,
}
go wsc.pingWorker()
return wsc
Expand Down
2 changes: 1 addition & 1 deletion pkg/sfu/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (b *Buffer) Close() error {

if b.rtpStats != nil {
b.rtpStats.Stop()
b.logger.Infow("rtp stats", "stats", b.rtpStats.ToString())
b.logger.Infow("rtp stats", "direction", "upstream", "stats", b.rtpStats.ToString())
}

if b.onClose != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sfu/buffer/dependencydescriptorparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type DependencyDescriptorParser struct {
}

func NewDependencyDescriptorParser(ddExt uint8, logger logger.Logger, onMaxLayerChanged func(int32, int32)) *DependencyDescriptorParser {
logger.Infow("creating dependency descriptor parse", "ddExt", ddExt)
logger.Infow("creating dependency descriptor parser", "ddExt", ddExt)
return &DependencyDescriptorParser{
ddExt: ddExt,
logger: logger,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sfu/connectionquality/connectionstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (cs *ConnectionStats) updateScore(streams map[uint32]*buffer.StreamStatsWit
}

if cs.score < 4.0 {
cs.params.Logger.Infow("low score", "score", cs.score, "params", params)
cs.params.Logger.Debugw("low connection quality score", "score", cs.score, "params", params)
}

return cs.score
Expand Down
6 changes: 3 additions & 3 deletions pkg/sfu/downtrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (d *DownTrack) Bind(t webrtc.TrackLocalContext) (webrtc.RTPCodecParameters,

d.connectionStats.SetTrackSource(d.receiver.TrackSource())
d.connectionStats.Start()
d.logger.Debugw("bound")
d.logger.Debugw("downtrack bound")

return codec, nil
}
Expand Down Expand Up @@ -685,7 +685,7 @@ func (d *DownTrack) CloseWithFlush(flush bool) {
d.bindLock.Unlock()
d.connectionStats.Close()
d.rtpStats.Stop()
d.logger.Infow("rtp stats", "stats", d.rtpStats.ToString())
d.logger.Infow("rtp stats", "direction", "downstream", "stats", d.rtpStats.ToString())

if d.onMaxLayerChanged != nil && d.kind == webrtc.RTPCodecTypeVideo {
d.onMaxLayerChanged(d, InvalidLayerSpatial)
Expand Down Expand Up @@ -1347,7 +1347,7 @@ func (d *DownTrack) getTranslatedRTPHeader(extPkt *buffer.ExtPacket, tp *Transla
if d.dependencyDescriptorID != 0 && tp.ddExtension != nil {
bytes, err := tp.ddExtension.Marshal()
if err != nil {
d.logger.Infow("marshalling dependency descriptor extension err", "err", err)
d.logger.Warnw("error marshalling dependency descriptor extension", err)
} else {
extension = append(extension, extensionData{
id: uint8(d.dependencyDescriptorID),
Expand Down
2 changes: 1 addition & 1 deletion pkg/sfu/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func (f *Forwarder) AllocateOptimal(brs Bitrates) VideoAllocation {
if bandwidthRequested == 0 && f.maxLayers.IsValid() {
// if we cannot allocate anything below max layer,
// look for a layer above. It is okay to overshoot
// in optimal allocation (i. e. no bandwidth restricstions).
// in optimal allocation (i.e. no bandwidth restrictions).
// It is possible that clients send only a higher layer.
// To accommodate cases like that, try finding a layer
// above the requested maximum to ensure streaming
Expand Down

0 comments on commit 53f51c8

Please sign in to comment.