Skip to content

Commit

Permalink
Recover from retry on ICE restart (livekit#798)
Browse files Browse the repository at this point in the history
  • Loading branch information
boks1971 authored Jul 2, 2022
1 parent 03b0a01 commit e7033a2
Showing 1 changed file with 64 additions and 41 deletions.
105 changes: 64 additions & 41 deletions pkg/rtc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ type SimulcastTrackInfo struct {

// PCTransport is a wrapper around PeerConnection, with some helper methods
type PCTransport struct {
pc *webrtc.PeerConnection
me *webrtc.MediaEngine
params TransportParams
pc *webrtc.PeerConnection
me *webrtc.MediaEngine

lock sync.Mutex
pendingCandidates []webrtc.ICECandidateInit
Expand All @@ -63,8 +64,6 @@ type PCTransport struct {
// stream allocator for subscriber PC
streamAllocator *sfu.StreamAllocator

logger logger.Logger

previousAnswer *webrtc.SessionDescription
}

Expand Down Expand Up @@ -181,47 +180,58 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat
}

func NewPCTransport(params TransportParams) (*PCTransport, error) {
var bwe cc.BandwidthEstimator
pc, me, err := newPeerConnection(params, func(estimator cc.BandwidthEstimator) {
bwe = estimator
})
if err != nil {
return nil, err
}

t := &PCTransport{
pc: pc,
me: me,
params: params,
debouncedNegotiate: debounce.New(negotiationFrequency),
negotiationState: negotiationStateNone,
logger: params.Logger,
}
if params.Target == livekit.SignalTarget_SUBSCRIBER {
t.streamAllocator = sfu.NewStreamAllocator(sfu.StreamAllocatorParams{
Config: params.CongestionControlConfig,
Logger: params.Logger,
})
t.streamAllocator.Start()
if bwe != nil {
t.streamAllocator.SetBandwidthEstimator(bwe)
}
}

if err := t.createPeerConnection(); err != nil {
return nil, err
}

return t, nil
}

func (t *PCTransport) createPeerConnection() error {
var bwe cc.BandwidthEstimator
pc, me, err := newPeerConnection(t.params, func(estimator cc.BandwidthEstimator) {
bwe = estimator
})
if err != nil {
return err
}

t.pc = pc
t.pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
if state == webrtc.ICEGathererStateComplete {
go func() {
t.lock.Lock()
defer t.lock.Unlock()
if t.restartAfterGathering {
params.Logger.Debugw("restarting ICE after ICE gathering")
t.params.Logger.Debugw("restarting ICE after ICE gathering")
if err := t.createAndSendOffer(&webrtc.OfferOptions{ICERestart: true}); err != nil {
params.Logger.Warnw("could not restart ICE", err)
t.params.Logger.Warnw("could not restart ICE", err)
}
}
}()
}
})

return t, nil
t.me = me

if bwe != nil && t.streamAllocator != nil {
t.streamAllocator.SetBandwidthEstimator(bwe)
}

return nil
}

func (t *PCTransport) AddICECandidate(candidate webrtc.ICECandidateInit) error {
Expand All @@ -232,7 +242,7 @@ func (t *PCTransport) AddICECandidate(candidate webrtc.ICECandidateInit) error {
return nil
}

t.logger.Debugw("add candidate ", "candidate", candidate.Candidate)
t.params.Logger.Debugw("add candidate ", "candidate", candidate.Candidate)

return t.pc.AddICECandidate(candidate)
}
Expand Down Expand Up @@ -275,9 +285,9 @@ func (t *PCTransport) SetRemoteDescription(sd webrtc.SessionDescription) error {

// only initiate when we are the offerer
if lastState == negotiationRetry && sd.Type == webrtc.SDPTypeAnswer {
t.logger.Debugw("re-negotiate after answering")
t.params.Logger.Debugw("re-negotiate after receiving answer")
if err := t.createAndSendOffer(nil); err != nil {
t.logger.Errorw("could not negotiate", err)
t.params.Logger.Errorw("could not negotiate", err)
}
}
return nil
Expand All @@ -294,12 +304,12 @@ func (t *PCTransport) Negotiate(force bool) {
// no op to cancel pending negotiation
})
if err := t.CreateAndSendOffer(nil); err != nil {
t.logger.Errorw("could not negotiate", err)
t.params.Logger.Errorw("could not negotiate", err)
}
} else {
t.debouncedNegotiate(func() {
if err := t.CreateAndSendOffer(nil); err != nil {
t.logger.Errorw("could not negotiate", err)
t.params.Logger.Errorw("could not negotiate", err)
}
})
}
Expand All @@ -325,30 +335,43 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
// if restart is requested, and we are not ready, then continue afterwards
if iceRestart {
if t.pc.ICEGatheringState() == webrtc.ICEGatheringStateGathering {
t.logger.Debugw("restart ICE after gathering")
t.params.Logger.Debugw("restart ICE after gathering")
t.restartAfterGathering = true
return nil
}
t.logger.Debugw("restarting ICE")
t.params.Logger.Debugw("restarting ICE")
}

// when there's an ongoing negotiation, let it finish and not disrupt its state
if t.negotiationState == negotiationStateClient {
if iceRestart && t.negotiationState != negotiationStateNone {
currentSD := t.pc.CurrentRemoteDescription()
if iceRestart && currentSD != nil {
t.logger.Debugw("recovering from client negotiation state")
if currentSD == nil {
// never received an answer from client, create a new peer connection
t.params.Logger.Infow("no previous answer from client on ICE restart, creating a new PC")
t.pc.Close()
if err := t.createPeerConnection(); err != nil {
t.params.Logger.Errorw("could not create new PC on ICE restart", err)
return err
} else {
options.ICERestart = false
}
} else {
// recover by re-applying the last answer
t.params.Logger.Infow("recovering from client negotiation state on ICE restart")
if err := t.pc.SetRemoteDescription(*currentSD); err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "remote_description").Add(1)
return err
}
} else {
t.logger.Debugw("skipping negotiation, trying again later")
}
} else {
// when there's an ongoing negotiation, let it finish and not disrupt its state
if t.negotiationState == negotiationStateClient {
t.params.Logger.Infow("skipping negotiation, trying again later")
t.negotiationState = negotiationRetry
return nil
} else if t.negotiationState == negotiationRetry {
// already set to retry, we can safely skip this attempt
return nil
}
} else if t.negotiationState == negotiationRetry {
// already set to retry, we can safely skip this attempt
return nil
}

if t.previousAnswer != nil {
Expand All @@ -362,14 +385,14 @@ func (t *PCTransport) createAndSendOffer(options *webrtc.OfferOptions) error {
offer, err := t.pc.CreateOffer(options)
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "create").Add(1)
t.logger.Errorw("could not create offer", err)
t.params.Logger.Errorw("could not create offer", err)
return err
}

err = t.pc.SetLocalDescription(offer)
if err != nil {
prometheus.ServiceOperationCounter.WithLabelValues("offer", "error", "local_description").Add(1)
t.logger.Errorw("could not set local description", err)
t.params.Logger.Errorw("could not set local description", err)
return err
}

Expand Down Expand Up @@ -471,7 +494,7 @@ func (t *PCTransport) initPCWithPreviousAnswer(previousAnswer webrtc.SessionDesc
// because sdp can negotiate multi times before migration.(it will sticky to the last m-line atfirst negotiate)
// so use a dumb pc to negotiate sdp to fixed the datachannel's mid at same position with previous answer
if err := t.preparePC(previousAnswer); err != nil {
t.logger.Errorw("prepare pc for migration failed", err)
t.params.Logger.Errorw("prepare pc for migration failed", err)
return err
}
continue
Expand Down Expand Up @@ -526,7 +549,7 @@ func (t *PCTransport) SetPreviousAnswer(answer *webrtc.SessionDescription) {
if t.pc.RemoteDescription() == nil && t.previousAnswer == nil {
t.previousAnswer = answer
if err := t.initPCWithPreviousAnswer(*t.previousAnswer); err != nil {
t.logger.Errorw("initPCWithPreviousAnswer failed", err)
t.params.Logger.Errorw("initPCWithPreviousAnswer failed", err)
}
}
}
Expand Down

0 comments on commit e7033a2

Please sign in to comment.