Skip to content

Commit 9451efa

Browse files
committed
border router: traffic engineering (scheduling)
1 parent 5f3b0f8 commit 9451efa

File tree

10 files changed

+669
-68
lines changed

10 files changed

+669
-68
lines changed

go/pkg/router/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ go_library(
2929
"//go/lib/util:go_default_library",
3030
"//go/pkg/router/bfd:go_default_library",
3131
"//go/pkg/router/control:go_default_library",
32+
"//go/pkg/router/te:go_default_library",
3233
"@com_github_google_gopacket//:go_default_library",
3334
"@com_github_google_gopacket//layers:go_default_library",
3435
"@com_github_prometheus_client_golang//prometheus:go_default_library",

go/pkg/router/dataplane.go

Lines changed: 116 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/scionproto/scion/go/lib/util"
5151
"github.com/scionproto/scion/go/pkg/router/bfd"
5252
"github.com/scionproto/scion/go/pkg/router/control"
53+
"github.com/scionproto/scion/go/pkg/router/te"
5354
)
5455

5556
const (
@@ -76,6 +77,7 @@ type bfdSession interface {
7677
// BatchConn is a connection that supports batch reads and writes.
7778
type BatchConn interface {
7879
ReadBatch(underlayconn.Messages) (int, error)
80+
WriteBatch(underlayconn.Messages) (int, error)
7981
WriteTo([]byte, *net.UDPAddr) (int, error)
8082
Close() error
8183
}
@@ -102,6 +104,8 @@ type DataPlane struct {
102104
running bool
103105
Metrics *Metrics
104106
forwardingMetrics map[uint16]forwardingMetrics
107+
TE bool
108+
queueMap map[BatchConn]*te.Queues
105109
}
106110

107111
var (
@@ -187,6 +191,8 @@ func (d *DataPlane) AddInternalInterface(conn BatchConn, ip net.IP) error {
187191
}
188192
d.internal = conn
189193
d.internalIP = ip
194+
195+
d.addQueues(conn)
190196
return nil
191197
}
192198

@@ -209,6 +215,8 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn) error {
209215
d.external = make(map[uint16]BatchConn)
210216
}
211217
d.external[ifID] = conn
218+
219+
d.addQueues(conn)
212220
return nil
213221
}
214222

@@ -278,6 +286,9 @@ func (d *DataPlane) AddExternalInterfaceBFD(ifID uint16, conn BatchConn,
278286
With(labels...),
279287
}
280288
}
289+
290+
d.addQueues(conn)
291+
281292
s := &bfdSend{
282293
conn: conn,
283294
srcAddr: src.Addr,
@@ -286,6 +297,7 @@ func (d *DataPlane) AddExternalInterfaceBFD(ifID uint16, conn BatchConn,
286297
dstIA: dst.IA,
287298
ifID: ifID,
288299
mac: d.macFactory(),
300+
d: d,
289301
}
290302
return d.addBFDController(ifID, s, cfg, m)
291303
}
@@ -427,6 +439,7 @@ func (d *DataPlane) AddNextHopBFD(ifID uint16, src, dst *net.UDPAddr, cfg contro
427439
dstIA: d.localIA,
428440
ifID: 0,
429441
mac: d.macFactory(),
442+
d: d,
430443
}
431444
return d.addBFDController(ifID, s, cfg, m)
432445
}
@@ -484,10 +497,7 @@ func (d *DataPlane) Run() error {
484497
if result.OutConn == nil { // e.g. BFD case no message is forwarded
485498
continue
486499
}
487-
_, err = result.OutConn.WriteTo(result.OutPkt, result.OutAddr)
488-
if err != nil {
489-
log.Debug("Error writing packet", "err", err)
490-
// error metric
500+
if !d.enqueue(&result) {
491501
continue
492502
}
493503
// ok metric
@@ -498,6 +508,37 @@ func (d *DataPlane) Run() error {
498508
}
499509
}
500510

511+
write := func(rd BatchConn) {
512+
defer log.HandlePanic()
513+
514+
myQueues, ok := d.queueMap[rd]
515+
if d.TE {
516+
myQueues.SetScheduler(te.SchedStrictPriority)
517+
} else {
518+
myQueues.SetScheduler(te.SchedOthersOnly)
519+
}
520+
if !ok {
521+
panic("Error getting queues for scheduling")
522+
}
523+
524+
for d.running {
525+
myQueues.WaitUntilNonempty()
526+
ms, err := myQueues.Schedule()
527+
if err != nil {
528+
log.Debug("Error scheduling packet", "err", err)
529+
}
530+
531+
if len(ms) > 0 {
532+
_, err = rd.WriteBatch(ms)
533+
534+
if err != nil {
535+
log.Debug("Error writing packet", "err", err)
536+
}
537+
myQueues.ReturnBuffers(ms)
538+
}
539+
}
540+
}
541+
501542
for k, v := range d.bfdSessions {
502543
go func(ifID uint16, c bfdSession) {
503544
defer log.HandlePanic()
@@ -506,12 +547,22 @@ func (d *DataPlane) Run() error {
506547
}
507548
}(k, v)
508549
}
550+
509551
for ifID, v := range d.external {
552+
go func(c BatchConn) {
553+
defer log.HandlePanic()
554+
write(c)
555+
}(v)
510556
go func(i uint16, c BatchConn) {
511557
defer log.HandlePanic()
512558
read(i, c)
513559
}(ifID, v)
514560
}
561+
562+
go func(c BatchConn) {
563+
defer log.HandlePanic()
564+
write(c)
565+
}(d.internal)
515566
go func(c BatchConn) {
516567
defer log.HandlePanic()
517568
read(0, c)
@@ -538,11 +589,43 @@ func (d *DataPlane) initMetrics() {
538589
}
539590
}
540591

592+
// addQueues creates new packet queues for the given connection.
593+
func (d *DataPlane) addQueues(conn BatchConn) {
594+
if d.queueMap == nil {
595+
d.queueMap = make(map[BatchConn]*te.Queues)
596+
}
597+
if _, ok := d.queueMap[conn]; !ok {
598+
d.queueMap[conn] = te.NewQueues(d.TE, bufSize)
599+
}
600+
}
601+
602+
// enqueue puts the processed packet into the queue of the correct router interface.
603+
func (d *DataPlane) enqueue(result *processResult) bool {
604+
otherConnectionQueues, ok := d.queueMap[result.OutConn]
605+
if !ok {
606+
log.Debug("Error finding queues for scheduling")
607+
return false
608+
}
609+
var cls te.TrafficClass
610+
if d.TE {
611+
cls = result.Class
612+
} else {
613+
cls = te.ClsOthers
614+
}
615+
err := otherConnectionQueues.Enqueue(cls, result.OutPkt, result.OutAddr)
616+
if err != nil {
617+
log.Debug("Enqueue failed", "err", err)
618+
return false
619+
}
620+
return true
621+
}
622+
541623
type processResult struct {
542624
EgressID uint16
543625
OutConn BatchConn
544626
OutAddr *net.UDPAddr
545627
OutPkt []byte
628+
Class te.TrafficClass
546629
}
547630

548631
func newPacketProcessor(d *DataPlane, ingressID uint16) *scionPacketProcessor {
@@ -708,6 +791,7 @@ func (p *scionPacketProcessor) processEPIC() (processResult, error) {
708791
}
709792
}
710793

794+
result.Class = te.ClsEpic
711795
return result, nil
712796
}
713797

@@ -1170,7 +1254,8 @@ func (p *scionPacketProcessor) process() (processResult, error) {
11701254
if err != nil {
11711255
return r, err
11721256
}
1173-
return processResult{OutConn: p.d.internal, OutAddr: a, OutPkt: p.rawPkt}, nil
1257+
return processResult{OutConn: p.d.internal, OutAddr: a,
1258+
OutPkt: p.rawPkt, Class: te.ClsScion}, nil
11741259
}
11751260

11761261
// Outbound: pkts leaving the local IA.
@@ -1199,12 +1284,14 @@ func (p *scionPacketProcessor) process() (processResult, error) {
11991284
if err := p.processEgress(); err != nil {
12001285
return processResult{}, err
12011286
}
1202-
return processResult{EgressID: egressID, OutConn: c, OutPkt: p.rawPkt}, nil
1287+
return processResult{EgressID: egressID, OutConn: c,
1288+
OutPkt: p.rawPkt, Class: te.ClsScion}, nil
12031289
}
12041290

12051291
// ASTransit: pkts leaving from another AS BR.
12061292
if a, ok := p.d.internalNextHops[egressID]; ok {
1207-
return processResult{OutConn: p.d.internal, OutAddr: a, OutPkt: p.rawPkt}, nil
1293+
return processResult{OutConn: p.d.internal, OutAddr: a,
1294+
OutPkt: p.rawPkt, Class: te.ClsScion}, nil
12081295
}
12091296
errCode := slayers.SCMPCodeUnknownHopFieldEgress
12101297
if !p.infoField.ConsDir {
@@ -1253,8 +1340,8 @@ func (p *scionPacketProcessor) processOHP() (processResult, error) {
12531340
// OHP should always be directed to the correct BR.
12541341
if c, ok := p.d.external[ohp.FirstHop.ConsEgress]; ok {
12551342
// buffer should already be correct
1256-
return processResult{EgressID: ohp.FirstHop.ConsEgress, OutConn: c, OutPkt: p.rawPkt},
1257-
nil
1343+
return processResult{EgressID: ohp.FirstHop.ConsEgress, OutConn: c, OutPkt: p.rawPkt,
1344+
Class: te.ClsOhp}, nil
12581345
}
12591346
// TODO parameter problem invalid interface
12601347
return processResult{}, serrors.WithCtx(cannotRoute, "type", "ohp",
@@ -1275,7 +1362,8 @@ func (p *scionPacketProcessor) processOHP() (processResult, error) {
12751362
if err != nil {
12761363
return processResult{}, err
12771364
}
1278-
return processResult{OutConn: p.d.internal, OutAddr: a, OutPkt: p.rawPkt}, nil
1365+
return processResult{OutConn: p.d.internal, OutAddr: a, OutPkt: p.rawPkt,
1366+
Class: te.ClsOhp}, nil
12791367
}
12801368

12811369
func (d *DataPlane) resolveLocalDst(s slayers.SCION) (*net.UDPAddr, error) {
@@ -1325,6 +1413,7 @@ type bfdSend struct {
13251413
srcIA, dstIA addr.IA
13261414
mac hash.Hash
13271415
ifID uint16
1416+
d *DataPlane
13281417
}
13291418

13301419
func (b *bfdSend) String() string {
@@ -1377,8 +1466,23 @@ func (b *bfdSend) Send(bfd *layers.BFD) error {
13771466
if err != nil {
13781467
return err
13791468
}
1380-
_, err = b.conn.WriteTo(buffer.Bytes(), b.dstAddr)
1381-
return err
1469+
1470+
d := b.d
1471+
myQueue, ok := d.queueMap[b.conn]
1472+
if !ok {
1473+
return serrors.New("Error finding queues for scheduling")
1474+
}
1475+
var cls te.TrafficClass
1476+
if d.TE {
1477+
cls = te.ClsBfd
1478+
} else {
1479+
cls = te.ClsOthers
1480+
}
1481+
err = myQueue.Enqueue(cls, buffer.Bytes(), b.dstAddr)
1482+
if err != nil {
1483+
return serrors.WrapStr("Enqueue failed", err)
1484+
}
1485+
return nil
13821486
}
13831487

13841488
type scmpPacker struct {

0 commit comments

Comments
 (0)