Skip to content

Commit

Permalink
Uniform DNS Interception (#5392)
Browse files Browse the repository at this point in the history
1. Change to use ct_state to match the DNS responses that need
further interception.
2. Add OF meter for DNS interception.

Signed-off-by: graysonwu <wgrayson@vmware.com>
  • Loading branch information
GraysonWu authored Aug 23, 2023
1 parent 1bb4085 commit 72bc791
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 77 deletions.
7 changes: 4 additions & 3 deletions pkg/agent/metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ const (
metricNamespaceAntrea = "antrea"
metricSubsystemAgent = "agent"

LabelPacketInMeterNetworkPolicy = "PacketInMeterNetworkPolicy"
LabelPacketInMeterTraceflow = "PacketInMeterTraceflow"
LabelPacketInMeterNetworkPolicy = "PacketInMeterNetworkPolicy"
LabelPacketInMeterTraceflow = "PacketInMeterTraceflow"
LabelPacketInMeterDNSInterception = "PacketInMeterDNSInterception"
)

var (
Expand Down Expand Up @@ -240,7 +241,7 @@ func InitializeOVSMetrics() {
OVSFlowOpsErrorCount.WithLabelValues(ops)
OVSFlowOpsLatency.WithLabelValues(ops)
}
for _, label := range []string{LabelPacketInMeterNetworkPolicy, LabelPacketInMeterTraceflow} {
for _, label := range []string{LabelPacketInMeterNetworkPolicy, LabelPacketInMeterTraceflow, LabelPacketInMeterDNSInterception} {
OVSMeterPacketDroppedCount.WithLabelValues(label)
}
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,9 @@ func (c *client) initialize() error {
if err := c.genPacketInMeter(PacketInMeterIDTF, PacketInMeterRateTF).Add(); err != nil {
return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for TraceFlow packet-in rate limiting: %v", PacketInMeterIDTF, PacketInMeterRateTF, err)
}
if err := c.genPacketInMeter(PacketInMeterIDDNS, PacketInMeterRateDNS).Add(); err != nil {
return fmt.Errorf("failed to install OpenFlow meter entry (meterID:%d, rate:%d) for DNS interception packet-in rate limiting: %v", PacketInMeterIDDNS, PacketInMeterRateDNS, err)
}
}

for _, activeFeature := range c.activatedFeatures {
Expand Down Expand Up @@ -1561,6 +1564,8 @@ func (c *client) getMeterStats() {
metrics.OVSMeterPacketDroppedCount.WithLabelValues(metrics.LabelPacketInMeterNetworkPolicy).Set(float64(packetCount))
case PacketInMeterIDTF:
metrics.OVSMeterPacketDroppedCount.WithLabelValues(metrics.LabelPacketInMeterTraceflow).Set(float64(packetCount))
case PacketInMeterIDDNS:
metrics.OVSMeterPacketDroppedCount.WithLabelValues(metrics.LabelPacketInMeterDNSInterception).Set(float64(packetCount))
default:
klog.V(4).InfoS("Received unexpected meterID", "meterID", meterID)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2611,6 +2611,7 @@ func Test_client_ReplayFlows(t *testing.T) {
}{
{id: PacketInMeterIDNP, rate: PacketInMeterRateNP},
{id: PacketInMeterIDTF, rate: PacketInMeterRateTF},
{id: PacketInMeterIDDNS, rate: PacketInMeterRateDNS},
} {
meter := ovsoftest.NewMockMeter(ctrl)
meterBuilder := ovsoftest.NewMockMeterBandBuilder(ctrl)
Expand Down
86 changes: 51 additions & 35 deletions pkg/agent/openflow/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ var (
MatchLabelID = types.NewMatchKey(binding.ProtocolIP, types.LabelIDAddr, "tun_id")
MatchTCPFlags = types.NewMatchKey(binding.ProtocolTCP, types.TCPFlagsAddr, "tcp_flags")
MatchTCPv6Flags = types.NewMatchKey(binding.ProtocolTCPv6, types.TCPFlagsAddr, "tcp_flags")
Unsupported = types.NewMatchKey(binding.ProtocolIP, types.UnSupported, "unknown")
// MatchCTState should be used with ct_state condition as matchValue.
// MatchValue example: `+rpl+trk`.
MatchCTState = types.NewMatchKey(binding.ProtocolIP, types.CTStateAddr, "ct_state")
Unsupported = types.NewMatchKey(binding.ProtocolIP, types.UnSupported, "unknown")

// metricFlowIdentifier is used to identify metric flows in metric table.
// There could be other flows like default flow and Traceflow flows in the table. Only metric flows are supposed to
// have normal priority.
metricFlowIdentifier = fmt.Sprintf("priority=%d,", priorityNormal)

protocolUDP = v1beta2.ProtocolUDP
protocolTCP = v1beta2.ProtocolTCP
dnsPort = int32(53)
)
Expand Down Expand Up @@ -706,53 +708,67 @@ func (c *client) NewDNSPacketInConjunction(id uint32) error {
if err := c.ofEntryOperations.AddAll(conj.actionFlows); err != nil {
return fmt.Errorf("error when adding action flows for the DNS conjunction: %w", err)
}
udpService := v1beta2.Service{
Protocol: &protocolUDP,
SrcPort: &dnsPort,
}

dnsPriority := priorityDNSIntercept
dnsCTState := &openflow15.CTStates{
// Use ct_state=+trk+rpl as matching condition.
// CTState bit-state map:
// dnat | snat | trk | inv | rpl | rel | est | new
Data: 0b00101000,
Mask: 0b00101000,
}
dnsPortMatchValue := types.BitRange{Value: uint16(dnsPort)}

conj.serviceClause = conj.newClause(1, 2, getTableByID(conj.ruleTableID), nil)
conj.toClause = conj.newClause(2, 2, getTableByID(conj.ruleTableID), nil)
c.featureNetworkPolicy.conjMatchFlowLock.Lock()
defer c.featureNetworkPolicy.conjMatchFlowLock.Unlock()
ctxChanges := conj.serviceClause.addServiceFlows(c.featureNetworkPolicy, []v1beta2.Service{udpService}, &dnsPriority, false)

tcpFlags := TCPFlags{
// URG|ACK|PSH|RST|SYN|FIN|
Flag: 0b011000,
Mask: 0b011000,
}
tcpDNSPort := types.BitRange{Value: uint16(dnsPort)}
var ctxChanges []*conjMatchFlowContextChange
for _, proto := range c.featureNetworkPolicy.ipProtocols {
tcpServiceMatch := &conjunctiveMatch{
tcpMatch := &conjunctiveMatch{
tableID: conj.serviceClause.ruleTable.GetID(),
priority: &dnsPriority,
}
if proto == binding.ProtocolIP {
tcpServiceMatch.matchPairs = []matchPair{
matchPairs: []matchPair{
{
matchKey: MatchTCPSrcPort,
matchValue: tcpDNSPort,
matchKey: MatchCTState,
matchValue: dnsCTState,
},
},
}
udpMatch := &conjunctiveMatch{
tableID: conj.serviceClause.ruleTable.GetID(),
priority: &dnsPriority,
matchPairs: []matchPair{
// Add CTState for UDP as well to make sure only solicited DNS responses are sent
// to userspace.
{
matchKey: MatchTCPFlags,
matchValue: tcpFlags,
matchKey: MatchCTState,
matchValue: dnsCTState,
},
}
},
}
if proto == binding.ProtocolIP {
tcpMatch.matchPairs = append(tcpMatch.matchPairs, matchPair{
matchKey: MatchTCPSrcPort,
matchValue: dnsPortMatchValue,
})
udpMatch.matchPairs = append(udpMatch.matchPairs, matchPair{
matchKey: MatchUDPSrcPort,
matchValue: dnsPortMatchValue,
})
} else if proto == binding.ProtocolIPv6 {
tcpServiceMatch.matchPairs = []matchPair{
{
matchKey: MatchTCPv6SrcPort,
matchValue: tcpDNSPort,
},
{
matchKey: MatchTCPv6Flags,
matchValue: tcpFlags,
},
}
tcpMatch.matchPairs = append(tcpMatch.matchPairs, matchPair{
matchKey: MatchTCPv6SrcPort,
matchValue: dnsPortMatchValue,
})
udpMatch.matchPairs = append(udpMatch.matchPairs, matchPair{
matchKey: MatchUDPv6SrcPort,
matchValue: dnsPortMatchValue,
})
}
ctxChange := conj.serviceClause.addConjunctiveMatchFlow(c.featureNetworkPolicy, tcpServiceMatch, false, false)
ctxChanges = append(ctxChanges, ctxChange)
tcpCtxChange := conj.serviceClause.addConjunctiveMatchFlow(c.featureNetworkPolicy, tcpMatch, false, false)
udpCtxChange := conj.serviceClause.addConjunctiveMatchFlow(c.featureNetworkPolicy, udpMatch, false, false)
ctxChanges = append(ctxChanges, tcpCtxChange, udpCtxChange)
}
if err := c.featureNetworkPolicy.applyConjunctiveMatchFlows(ctxChanges); err != nil {
return err
Expand Down
85 changes: 56 additions & 29 deletions pkg/agent/openflow/network_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,47 @@ func Test_featureNetworkPolicy_initFlows(t *testing.T) {
}

func Test_NewDNSPacketInConjunction(t *testing.T) {
ovsMetersSupported := ovsMetersAreSupported()
ipv4ExpFlows := []string{
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)",
}
if ovsMetersSupported {
ipv4ExpFlows = []string{
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)",
}
}
ipv6ExpFlows := []string{
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)",
}
if ovsMetersSupported {
ipv6ExpFlows = []string{
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)",
}
}
dsExpFlows := []string{
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)",
}
if ovsMetersSupported {
dsExpFlows = []string{
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=meter:3,controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,udp6,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,ct_state=+rpl+trk,tcp6,tp_src=53 actions=conjunction(1,1/2)",
}
}
for _, tc := range []struct {
name string
enableIPv4 bool
Expand All @@ -1455,39 +1496,25 @@ func Test_NewDNSPacketInConjunction(t *testing.T) {
expectedFlows []string
}{
{
name: "IPv4 only",
enableIPv4: true,
enableIPv6: false,
conjID: 1,
expectedFlows: []string{
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,udp,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,tcp,tp_src=53,tcp_flags=+psh+ack actions=conjunction(1,1/2)",
},
name: "IPv4 only",
enableIPv4: true,
enableIPv6: false,
conjID: 1,
expectedFlows: ipv4ExpFlows,
},
{
name: "IPv6 only",
enableIPv4: false,
enableIPv6: true,
conjID: 1,
expectedFlows: []string{
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,udp6,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,tcp6,tp_src=53,tcp_flags=+psh+ack actions=conjunction(1,1/2)",
},
name: "IPv6 only",
enableIPv4: false,
enableIPv6: true,
conjID: 1,
expectedFlows: ipv6ExpFlows,
},
{
name: "dual stack",
enableIPv4: true,
enableIPv6: true,
conjID: 1,
expectedFlows: []string{
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,conj_id=1 actions=controller(id=32776,reason=no_match,userdata=02,max_len=128),goto_table:IngressMetric",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,udp,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,tcp,tp_src=53,tcp_flags=+psh+ack actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,udp6,tp_src=53 actions=conjunction(1,1/2)",
"cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=64991,tcp6,tp_src=53,tcp_flags=+psh+ack actions=conjunction(1,1/2)",
},
name: "dual stack",
enableIPv4: true,
enableIPv6: true,
conjID: 1,
expectedFlows: dsExpFlows,
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ const (

// We use OpenFlow Meter for packetIn rate limiting on OVS side.
// Meter Entry ID.
PacketInMeterIDNP = 1
PacketInMeterIDTF = 2
PacketInMeterIDNP = 1
PacketInMeterIDTF = 2
PacketInMeterIDDNS = 3
// Meter Entry Rate. It is represented as number of events per second.
// Packets which exceed the rate will be dropped.
PacketInMeterRateNP = 100
PacketInMeterRateTF = 100
PacketInMeterRateNP = 100
PacketInMeterRateTF = 100
PacketInMeterRateDNS = 100

// PacketInQueueSize defines the size of PacketInQueue.
// When PacketInQueue reaches PacketInQueueSize, new packetIn will be dropped.
Expand Down
18 changes: 12 additions & 6 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2094,6 +2094,9 @@ func (f *featureNetworkPolicy) addFlowMatch(fb binding.FlowBuilder, matchKey *ty
fb = fb.MatchProtocol(matchKey.GetOFProtocol())
tcpFlag := matchValue.(TCPFlags)
fb = fb.MatchTCPFlags(tcpFlag.Flag, tcpFlag.Mask)
case MatchCTState:
ctState := matchValue.(*openflow15.CTStates)
fb = fb.MatchCTState(ctState)
}
return fb
}
Expand Down Expand Up @@ -2177,13 +2180,16 @@ func (f *featureNetworkPolicy) multiClusterNetworkPolicySecurityDropFlow(table b
// dnsPacketInFlow generates the flow to send dns response packets of fqdn policy selected Pods to the fqdnController for
// processing.
func (f *featureNetworkPolicy) dnsPacketInFlow(conjunctionID uint32) binding.Flow {
return AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priorityDNSIntercept).
fb := AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priorityDNSIntercept).
Cookie(f.cookieAllocator.Request(f.category).Raw()).
MatchConjID(conjunctionID).
// FQDN should pause DNS response packets and send them to the controller. After
// the controller processes DNS response packets, like creating related flows in
// the OVS or no operations are needed, the controller will resume those packets.
Action().SendToController([]byte{uint8(PacketInCategoryDNS)}, true).
MatchConjID(conjunctionID)
if f.ovsMetersAreSupported {
fb = fb.Action().Meter(PacketInMeterIDDNS)
}
// FQDN should pause DNS response packets and send them to the controller. After
// the controller processes DNS response packets, like creating related flows in
// the OVS or no operations are needed, the controller will resume those packets.
return fb.Action().SendToController([]byte{uint8(PacketInCategoryDNS)}, true).
Action().GotoTable(IngressMetricTable.GetID()).
Done()
}
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/types/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
IGMPAddr
LabelIDAddr
TCPFlagsAddr
CTStateAddr
UnSupported
)

Expand Down
1 change: 1 addition & 0 deletions pkg/ovs/openflow/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ type FlowBuilder interface {
MatchARPTpa(ip net.IP) FlowBuilder
MatchARPOp(op uint16) FlowBuilder
MatchIPDSCP(dscp uint8) FlowBuilder
MatchCTState(ctStates *openflow15.CTStates) FlowBuilder
MatchCTStateNew(isSet bool) FlowBuilder
MatchCTStateRel(isSet bool) FlowBuilder
MatchCTStateRpl(isSet bool) FlowBuilder
Expand Down
5 changes: 5 additions & 0 deletions pkg/ovs/openflow/ofctrl_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (b *ofFlowBuilder) MatchRegFieldWithValue(field *RegField, data uint32) Flo
return b.matchRegRange(field.regID, data, field.rng)
}

func (b *ofFlowBuilder) MatchCTState(ctStates *openflow15.CTStates) FlowBuilder {
b.ctStates = ctStates
return b
}

func (b *ofFlowBuilder) MatchCTStateNew(set bool) FlowBuilder {
if b.ctStates == nil {
b.ctStates = openflow15.NewCTStates()
Expand Down
14 changes: 14 additions & 0 deletions pkg/ovs/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 72bc791

Please sign in to comment.