Skip to content

Commit

Permalink
[MM-54335] Ceiling tests improvements (#141)
Browse files Browse the repository at this point in the history
* Increase UDP send concurrency

* Bump pion/webrtc

* Print only once

* Measure track writes time

* Track write channel full error

* Update tests

* Expose instant CPU load

* Make sampling non-blocking

* Bump channels sizes

* Bump channels size further
  • Loading branch information
streamer45 authored May 23, 2024
1 parent 6de0290 commit a21f616
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 55 deletions.
26 changes: 13 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/mattermost/mattermost/server/public v0.0.12
github.com/pborman/uuid v1.2.1
github.com/pion/ice/v2 v2.3.14
github.com/pion/interceptor v0.1.28
github.com/pion/ice/v2 v2.3.24
github.com/pion/interceptor v0.1.29
github.com/pion/logging v0.2.2
github.com/pion/rtcp v1.2.14
github.com/pion/rtp v1.8.5
github.com/pion/rtp v1.8.6
github.com/pion/stun v0.6.1
github.com/pion/webrtc/v3 v3.2.37
github.com/pion/webrtc/v3 v3.2.40
github.com/prometheus/client_golang v1.15.0
github.com/prometheus/procfs v0.9.0
github.com/stretchr/testify v1.9.0
github.com/vmihailenco/msgpack/v5 v5.4.1
golang.org/x/crypto v0.22.0
golang.org/x/sys v0.19.0
golang.org/x/crypto v0.23.0
golang.org/x/sys v0.20.0
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
)

replace github.com/pion/interceptor v0.1.28 => github.com/streamer45/interceptor v0.0.0-20240411210059-c7d42d2dafc1
replace github.com/pion/interceptor v0.1.29 => github.com/streamer45/interceptor v0.0.0-20240411210059-c7d42d2dafc1

require (
github.com/abcum/lcp v0.0.0-20201209214815-7a3f3840be81 // indirect
Expand All @@ -48,14 +48,14 @@ require (
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/philhofer/fwd v1.1.2 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.10 // indirect
github.com/pion/dtls/v2 v2.2.11 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/sctp v1.8.15 // indirect
github.com/pion/sctp v1.8.16 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pion/srtp/v2 v2.0.18 // indirect
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/turn/v2 v2.1.5 // indirect
github.com/pion/transport/v2 v2.2.5 // indirect
github.com/pion/turn/v2 v2.1.6 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/plar/go-adaptive-radix-tree v1.0.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand All @@ -68,8 +68,8 @@ require (
github.com/wiggin77/merror v1.0.5 // indirect
github.com/wiggin77/srslog v1.0.1 // indirect
golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
42 changes: 22 additions & 20 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,10 @@ github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2
github.com/pion/datachannel v1.5.6 h1:1IxKJntfSlYkpUj8LlYRSWpYiTTC02nUrOE8T3DqGeg=
github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNIVb/NfGW4=
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.10 h1:u2Axk+FyIR1VFTPurktB+1zoEPGIW3bmyj3LEFrXjAA=
github.com/pion/dtls/v2 v2.2.10/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.14 h1:A7UaEmalw12Fko8YO0qguUbWyE69BnN4mDEqT7cLWQI=
github.com/pion/ice/v2 v2.3.14/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks=
github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI=
github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8=
Expand All @@ -332,11 +332,11 @@ github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9
github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE=
github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.5 h1:uYzINfaK+9yWs7r537z/Rc1SvT8ILjBcmDOpJcTB+OU=
github.com/pion/rtp v1.8.5/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.6 h1:MTmn/b0aWWsAzux2AmP8WGllusBVw4NPYPVFFd7jUPw=
github.com/pion/rtp v1.8.6/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/sctp v1.8.13/go.mod h1:YKSgO/bO/6aOMP9LCie1DuD7m+GamiK2yIiPM6vH+GA=
github.com/pion/sctp v1.8.15 h1:Eig4+c0KUtlftepSYQeFjeeG6A/UhsK+NPVSxUf4Mmc=
github.com/pion/sctp v1.8.15/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE=
github.com/pion/sctp v1.8.16 h1:PKrMs+o9EMLRvFfXq59WFsC+V8mN1wnKzqrv+3D/gYY=
github.com/pion/sctp v1.8.16/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE=
github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY=
github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M=
github.com/pion/srtp/v2 v2.0.18 h1:vKpAXfawO9RtTRKZJbG4y0v1b11NZxQnxRl85kGuUlo=
Expand All @@ -346,16 +346,17 @@ github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
github.com/pion/transport/v2 v2.2.2/go.mod h1:OJg3ojoBJopjEeECq2yJdXH9YVrUJ1uQ++NjXLOUorc=
github.com/pion/transport/v2 v2.2.3/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v2 v2.2.4 h1:41JJK6DZQYSeVLxILA2+F4ZkKb4Xd/tFJZRFZQ9QAlo=
github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v2 v2.2.5 h1:iyi25i/21gQck4hfRhomF6SktmUQjRsRW4WJdhfc3Kc=
github.com/pion/transport/v2 v2.2.5/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0=
github.com/pion/transport/v3 v3.0.2 h1:r+40RJR25S9w3jbA6/5uEPTzcdn7ncyU44RWCbHkLg4=
github.com/pion/transport/v3 v3.0.2/go.mod h1:nIToODoOlb5If2jF9y2Igfx3PFYWfuXi37m0IlWa/D0=
github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/turn/v2 v2.1.5 h1:tTyy7TM3DCoX9IxTt/yHc/bThiRLyXK3T1YbNcgx9k4=
github.com/pion/turn/v2 v2.1.5/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/webrtc/v3 v3.2.37 h1:iKe2Ufu4g94KBRy63fzWRU5ufOpE+RIw05M9TkK/dzk=
github.com/pion/webrtc/v3 v3.2.37/go.mod h1:wWQz1PuKNSNK4VrJJNpPN3vZmKEi4zA6i2ynaQOlxIU=
github.com/pion/turn/v2 v2.1.6 h1:Xr2niVsiPTB0FPtt+yAWKFUkU1eotQbGgpTIld4x1Gc=
github.com/pion/turn/v2 v2.1.6/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/webrtc/v3 v3.2.40 h1:Wtfi6AZMQg+624cvCXUuSmrKWepSB7zfgYDOYqsSOVU=
github.com/pion/webrtc/v3 v3.2.40/go.mod h1:M1RAe3TNTD1tzyvqHrbVODfwdPGSXOUo/OgpoGGJqFY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -529,8 +530,8 @@ golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98y
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -626,8 +627,8 @@ golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down Expand Up @@ -718,8 +719,8 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
Expand All @@ -742,8 +743,9 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
4 changes: 3 additions & 1 deletion service/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,9 +770,11 @@ func TestClientGetSystemInfo(t *testing.T) {
defer c.Close()

t.Run("success", func(t *testing.T) {
// Give enough time to collect a sample.
time.Sleep(2 * time.Second)
info, err := c.GetSystemInfo()
require.NoError(t, err)
require.NotEmpty(t, info)
require.NotEmpty(t, info)
require.NotZero(t, info.CPULoad)
})
}
16 changes: 16 additions & 0 deletions service/perf/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Metrics struct {
registry *prometheus.Registry

RTPTracks *prometheus.GaugeVec
RTPTrackWrites *prometheus.HistogramVec
RTCSessions *prometheus.GaugeVec
RTCConnStateCounters *prometheus.CounterVec
RTCErrors *prometheus.CounterVec
Expand Down Expand Up @@ -52,6 +53,17 @@ func NewMetrics(namespace string, registry *prometheus.Registry) *Metrics {
)
m.registry.MustRegister(m.RTPTracks)

m.RTPTrackWrites = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: metricsSubSystemRTC,
Name: "rtp_tracks_writes_time",
Help: "Time taken to write to RTP tracks",
},
[]string{"groupID", "type"},
)
m.registry.MustRegister(m.RTPTrackWrites)

m.RTCSessions = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Expand Down Expand Up @@ -149,3 +161,7 @@ func (m *Metrics) IncWSMessages(clientID, msgType, direction string) {
func (m *Metrics) Handler() http.Handler {
return promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{})
}

func (m *Metrics) ObserveRTPTracksWrite(groupID, trackType string, dur float64) {
m.RTPTrackWrites.With(prometheus.Labels{"groupID": groupID, "type": trackType}).Observe(dur)
}
1 change: 1 addition & 0 deletions service/rtc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ type Metrics interface {
IncRTCErrors(groupID string, errType string)
IncRTPTracks(groupID string, direction, trackType string)
DecRTPTracks(groupID string, direction, trackType string)
ObserveRTPTracksWrite(groupID, trackType string, dur float64)
}
17 changes: 14 additions & 3 deletions service/rtc/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ const (
tcpSocketWriteBufferSize = 1024 * 1024 * 4 // 4MB
)

func getUDPListeningSocketsCount() int {
// Originally we used runtime.NumCPU() but increased it as a result of v1 ceiling tests.
// The reason is that having just a few sockets caused significant lock contentions on
// the underlying file descriptors (at WriteToInet4 in internal/poll/fd_unix.go).
return runtime.NumCPU() * 100
}

// getSystemIPs returns a list of all the available local addresses.
func getSystemIPs(log mlog.LoggerIFace, dualStack bool) ([]netip.Addr, error) {
var ips []netip.Addr
Expand Down Expand Up @@ -74,7 +81,7 @@ func getSystemIPs(log mlog.LoggerIFace, dualStack bool) ([]netip.Addr, error) {
func createUDPConnsForAddr(log mlog.LoggerIFace, network, listenAddress string) ([]net.PacketConn, error) {
var conns []net.PacketConn

for i := 0; i < runtime.NumCPU(); i++ {
for i := 0; i < getUDPListeningSocketsCount(); i++ {
listenConfig := net.ListenConfig{
Control: func(_, _ string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) {
Expand All @@ -97,7 +104,9 @@ func createUDPConnsForAddr(log mlog.LoggerIFace, network, listenAddress string)
return nil, fmt.Errorf("failed to listen on udp: %w", err)
}

log.Info(fmt.Sprintf("rtc: server is listening on udp %s", listenAddress))
if i == 0 {
log.Info(fmt.Sprintf("rtc: server is listening on udp %s", listenAddress))
}

if err := udpConn.(*net.UDPConn).SetWriteBuffer(udpSocketBufferSize); err != nil {
log.Warn("rtc: failed to set udp send buffer", mlog.Err(err))
Expand Down Expand Up @@ -128,7 +137,9 @@ func createUDPConnsForAddr(log mlog.LoggerIFace, network, listenAddress string)
log.Error("failed to get buffer size", mlog.Err(err))
return
}
log.Debug("rtc: udp buffers", mlog.Int("writeBufSize", writeBufSize), mlog.Int("readBufSize", readBufSize))
if i == 0 {
log.Debug("rtc: udp buffers", mlog.Int("writeBufSize", writeBufSize), mlog.Int("readBufSize", readBufSize))
}
})
if err != nil {
return nil, fmt.Errorf("Control call failed: %w", err)
Expand Down
5 changes: 2 additions & 3 deletions service/rtc/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package rtc
import (
"net/netip"
"os"
"runtime"
"testing"

"github.com/mattermost/mattermost/server/public/shared/mlog"
Expand Down Expand Up @@ -74,7 +73,7 @@ func TestCreateUDPConnsForAddr(t *testing.T) {
for _, ip := range ips {
conns, err := createUDPConnsForAddr(log, "udp4", netip.AddrPortFrom(ip, 30443).String())
require.NoError(t, err)
require.Len(t, conns, runtime.NumCPU())
require.Len(t, conns, getUDPListeningSocketsCount())
for _, conn := range conns {
require.NoError(t, conn.Close())
}
Expand All @@ -94,7 +93,7 @@ func TestCreateUDPConnsForAddr(t *testing.T) {
for _, ip := range ips {
conns, err := createUDPConnsForAddr(log, "udp", netip.AddrPortFrom(ip, 30443).String())
require.NoError(t, err)
require.Len(t, conns, runtime.NumCPU())
require.Len(t, conns, getUDPListeningSocketsCount())
for _, conn := range conns {
require.NoError(t, conn.Close())
}
Expand Down
2 changes: 1 addition & 1 deletion service/rtc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

const (
msgChSize = 512
msgChSize = 2000
signalingTimeout = 10 * time.Second
catchAllIP = "0.0.0.0"
)
Expand Down
8 changes: 7 additions & 1 deletion service/rtc/sfu.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
const (
nackResponderBufferSize = 256
audioLevelExtensionURI = "urn:ietf:params:rtp-hdrext:ssrc-audio-level"
writerQueueSize = 100
writerQueueSize = 200 // Enough to hold up to one second of video packets.
)

func (s *Server) initSettingEngine() (webrtc.SettingEngine, error) {
Expand Down Expand Up @@ -469,12 +469,14 @@ func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error {
}
}

writeStartTime := time.Now()
if err := outAudioTrack.WriteRTP(packet); err != nil && !errors.Is(err, io.ErrClosedPipe) {
s.log.Error("failed to write RTP packet",
mlog.Err(err), mlog.String("sessionID", us.cfg.SessionID))
s.metrics.IncRTCErrors(us.cfg.GroupID, "rtp")
return
}
s.metrics.ObserveRTPTracksWrite(us.cfg.GroupID, string(trackType), time.Since(writeStartTime).Seconds())
}
} else if params, ok := rtpVideoCodecs[trackMimeType]; ok {
if screenStreamID != "" && screenStreamID != streamID {
Expand Down Expand Up @@ -556,11 +558,14 @@ func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error {

writeTrack := func(writerCh <-chan *rtp.Packet, outTrack *webrtc.TrackLocalStaticRTP) {
for pkt := range writerCh {
writeStartTime := time.Now()
if err := outTrack.WriteRTP(pkt); err != nil && !errors.Is(err, io.ErrClosedPipe) {
s.log.Error("failed to write RTP packet",
mlog.Err(err), mlog.String("sessionID", us.cfg.SessionID))
s.metrics.IncRTCErrors(us.cfg.GroupID, "rtp")
continue
}
s.metrics.ObserveRTPTracksWrite(us.cfg.GroupID, string(trackTypeScreen), time.Since(writeStartTime).Seconds())
}
}

Expand Down Expand Up @@ -605,6 +610,7 @@ func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error {
case writerCh <- &pkt:
default:
s.log.Error("failed to write RTP packet to writer channel", mlog.String("trackID", outScreenTracks[i].ID()))
s.metrics.IncRTCErrors(us.cfg.GroupID, "rtp")
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Service struct {
auth *auth.Service
metrics *perf.Metrics
proc procfs.FS
systemInfo SystemInfo
log *mlog.Logger
sessionCache *auth.SessionCache
// connMap maps user sessions to the websocket connection they originated
Expand All @@ -42,6 +43,7 @@ type Service struct {
// intra-cluster messaging layer that can introduce race conditions.
connMap map[string]string
mut sync.RWMutex
stopCh chan struct{}
}

func New(cfg Config) (*Service, error) {
Expand All @@ -53,6 +55,7 @@ func New(cfg Config) (*Service, error) {
cfg: cfg,
metrics: perf.NewMetrics("rtcd", nil),
connMap: map[string]string{},
stopCh: make(chan struct{}),
}

var err error
Expand All @@ -69,6 +72,8 @@ func New(cfg Config) (*Service, error) {
}
s.proc = proc

go s.collectSystemInfo()

s.store, err = store.New(cfg.Store.DataSource)
if err != nil {
return nil, fmt.Errorf("failed to create store: %w", err)
Expand Down Expand Up @@ -200,6 +205,8 @@ func (s *Service) Stop() error {
defer s.log.Flush()
s.log.Info("rtcd: shutting down")

close(s.stopCh)

if err := s.rtcServer.Stop(); err != nil {
return fmt.Errorf("failed to stop rtc server: %w", err)
}
Expand Down
Loading

0 comments on commit a21f616

Please sign in to comment.