Skip to content

Commit

Permalink
Change flow export mechanism for Flow Aggregator
Browse files Browse the repository at this point in the history
Introduce active and inactive flow timeout configuration
knobs.
With active flow record timeout, every flow record is sent to the
collector based on its own active expiry timeout value.
With inactive flow record timeout, a flow record is sent to the
collector if no records are seen by the flow aggregator for the flow,
since it was last updated.

Used priority queue approach to keep track of individual expiry values
for records.
  • Loading branch information
srikartati committed Mar 25, 2021
1 parent a9ec050 commit 886c22f
Show file tree
Hide file tree
Showing 9 changed files with 526 additions and 179 deletions.
6 changes: 3 additions & 3 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -1381,8 +1381,8 @@ func (c *client) conjunctionActionFlow(conjunctionID uint32, tableID binding.Tab
if enableLogging {
return c.pipeline[tableID].BuildFlow(ofPriority).MatchProtocol(proto).
MatchConjID(conjunctionID).
Action().LoadRegRange(int(conjReg), conjunctionID, binding.Range{0, 31}). // Traceflow.
Action().LoadRegRange(int(marksReg), DispositionAllow, APDispositionMarkRange). // AntreaPolicy.
Action().LoadRegRange(int(conjReg), conjunctionID, binding.Range{0, 31}). // Traceflow.
Action().LoadRegRange(int(marksReg), DispositionAllow, APDispositionMarkRange). // AntreaPolicy.
Action().LoadRegRange(int(marksReg), CustomReasonLogging, CustomReasonMarkRange). // Enable logging.
Action().SendToController(uint8(PacketInReasonNP)).
Action().CT(true, nextTable, ctZone). // CT action requires commit flag if actions other than NAT without arguments are specified.
Expand All @@ -1394,7 +1394,7 @@ func (c *client) conjunctionActionFlow(conjunctionID uint32, tableID binding.Tab
return c.pipeline[tableID].BuildFlow(ofPriority).MatchProtocol(proto).
MatchConjID(conjunctionID).
Action().LoadRegRange(int(conjReg), conjunctionID, binding.Range{0, 31}). // Traceflow.
Action().CT(true, nextTable, ctZone). // CT action requires commit flag if actions other than NAT without arguments are specified.
Action().CT(true, nextTable, ctZone). // CT action requires commit flag if actions other than NAT without arguments are specified.
LoadToLabelRange(uint64(conjunctionID), &labelRange).
CTDone().
Cookie(c.cookieAllocator.Request(cookie.Policy).Raw()).
Expand Down
68 changes: 34 additions & 34 deletions pkg/apis/cni/v1beta1/cni.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,55 @@ package github.com.vmware_tanzu.antrea.pkg.apis.cni.v1beta1;
option go_package = "v1beta1";

message CniCmdArgs {
string container_id = 1;
string netns = 2;
string ifname = 3;
string args = 4;
string path = 5;
bytes network_configuration = 6;
string container_id = 1;
string netns = 2;
string ifname = 3;
string args = 4;
string path = 5;
bytes network_configuration = 6;
}

message CniCmdRequest {
CniCmdArgs cni_args = 1;
CniCmdArgs cni_args = 1;
}

enum ErrorCode {
UNKNOWN = 0;
INCOMPATIBLE_CNI_VERSION = 1;
UNSUPPORTED_FIELD = 2;
UNKNOWN_CONTAINER = 3;
INVALID_ENVIRONMENT_VARIABLES = 4;
IO_FAILURE = 5;
DECODING_FAILURE = 6;
INVALID_NETWORK_CONFIG = 7;
TRY_AGAIN_LATER = 11;
IPAM_FAILURE = 101;
CONFIG_INTERFACE_FAILURE = 102;
CHECK_INTERFACE_FAILURE = 103;
// these errors are not used by the servers, but we declare them here to
// make sure they are reserved.
UNKNOWN_RPC_ERROR = 201;
INCOMPATIBLE_API_VERSION = 202;
UNKNOWN = 0;
INCOMPATIBLE_CNI_VERSION = 1;
UNSUPPORTED_FIELD = 2;
UNKNOWN_CONTAINER = 3;
INVALID_ENVIRONMENT_VARIABLES = 4;
IO_FAILURE = 5;
DECODING_FAILURE = 6;
INVALID_NETWORK_CONFIG = 7;
TRY_AGAIN_LATER = 11;
IPAM_FAILURE = 101;
CONFIG_INTERFACE_FAILURE = 102;
CHECK_INTERFACE_FAILURE = 103;
// these errors are not used by the servers, but we declare them here to
// make sure they are reserved.
UNKNOWN_RPC_ERROR = 201;
INCOMPATIBLE_API_VERSION = 202;
}

message Error {
ErrorCode code = 1;
string message = 2;
repeated google.protobuf.Any details = 3;
ErrorCode code = 1;
string message = 2;
repeated google.protobuf.Any details = 3;
}

message CniCmdResponse {
bytes cni_result = 1;
Error error = 2;
bytes cni_result = 1;
Error error = 2;
}

service Cni {
rpc CmdAdd (CniCmdRequest) returns (CniCmdResponse) {
}
rpc CmdAdd (CniCmdRequest) returns (CniCmdResponse) {
}

rpc CmdCheck (CniCmdRequest) returns (CniCmdResponse) {
}
rpc CmdCheck (CniCmdRequest) returns (CniCmdResponse) {
}

rpc CmdDel (CniCmdRequest) returns (CniCmdResponse) {
}
rpc CmdDel (CniCmdRequest) returns (CniCmdResponse) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (

var alwaysReady = func() bool { return true }

const informerDefaultResync = 30 * time.Second
const informerDefaultResync time.Duration = 30 * time.Second

var (
k8sProtocolUDP = corev1.ProtocolUDP
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/traceflow/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (

var alwaysReady = func() bool { return true }

const informerDefaultResync = 30 * time.Second
const informerDefaultResync time.Duration = 30 * time.Second

type traceflowController struct {
*Controller
Expand Down
Loading

0 comments on commit 886c22f

Please sign in to comment.