Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify Aggregation process methods #158

Merged
merged 1 commit into from
Mar 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,32 @@ func (a *AggregationProcess) ForAllRecordsDo(callback FlowKeyRecordMapCallBack)
return nil
}

func (a *AggregationProcess) DeleteFlowKeyFromMapWithLock(flowKey FlowKey) {
// GetLastUpdatedTimeOfFlow provides the last updated time in the format of IPFIX
// field "flowEndSeconds".
func (a *AggregationProcess) GetLastUpdatedTimeOfFlow(flowKey FlowKey) (uint32, error) {
a.mutex.Lock()
defer a.mutex.Unlock()
delete(a.flowKeyRecordMap, flowKey)
record, exists := a.flowKeyRecordMap[flowKey]
if !exists {
return 0, fmt.Errorf("flow key is not present in the map")
}
flowEndField, exists := record.Record.GetInfoElementWithValue("flowEndSeconds")
if exists {
return flowEndField.Value.(uint32), nil
} else {
return 0, fmt.Errorf("flowEndSeconds field is not present in the record")
}
}

// DeleteFlowKeyFromMapWithoutLock need to be used only when the caller has already
// acquired the lock. For example, this can be used in a callback of ForAllRecordsDo
// function.
// TODO:Remove this when there is notion of invalid flows supported in aggregation
// process.
func (a *AggregationProcess) DeleteFlowKeyFromMapWithoutLock(flowKey FlowKey) {
func (a *AggregationProcess) DeleteFlowKeyFromMap(flowKey FlowKey) error {
a.mutex.Lock()
defer a.mutex.Unlock()
_, exists := a.flowKeyRecordMap[flowKey]
if !exists {
return fmt.Errorf("flow key is not present in the map")
}
delete(a.flowKeyRecordMap, flowKey)
return nil
}

// addOrUpdateRecordInMap either adds the record to flowKeyMap or updates the record in
Expand Down
46 changes: 20 additions & 26 deletions pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,21 +514,20 @@ func TestCorrelateRecordsForInterNodeFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, record2, false, false, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test the scenario, where record2 is added first and then record1.
record1 = createDataMsgForSrc(t, false, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, false, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record2, record1, false, false, false)
// Cleanup the flowKeyMap in aggregation process.
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)

err = ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test IPv6 fields.
// Test the scenario, where record1 is added first and then record2.
record1 = createDataMsgForSrc(t, true, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, true, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, record2, true, false, false)
// Cleanup the flowKeyMap in aggregation process.
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
// Test the scenario, where record2 is added first and then record1.
record1 = createDataMsgForSrc(t, true, false, false, false).GetSet().GetRecords()[0]
record2 = createDataMsgForDst(t, true, false, false).GetSet().GetRecords()[0]
Expand All @@ -548,7 +547,8 @@ func TestCorrelateRecordsForIntraNodeFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, true, false)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test IPv6 fields.
record1 = createDataMsgForSrc(t, true, true, false, false).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, nil, true, true, false)
Expand All @@ -567,7 +567,8 @@ func TestCorrelateRecordsForToExternalFlow(t *testing.T) {
runCorrelationAndCheckResult(t, ap, record1, nil, false, true, true)
// Cleanup the flowKeyMap in aggregation process.
flowKey1, _ := getFlowKeyFromRecord(record1)
ap.DeleteFlowKeyFromMapWithLock(*flowKey1)
err := ap.DeleteFlowKeyFromMap(*flowKey1)
assert.NoError(t, err)
// Test IPv6 fields.
record1 = createDataMsgForSrc(t, true, true, false, true).GetSet().GetRecords()[0]
runCorrelationAndCheckResult(t, ap, record1, nil, true, true, true)
Expand Down Expand Up @@ -615,13 +616,15 @@ func TestDeleteFlowKeyFromMapWithLock(t *testing.T) {
}
aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey2)
err := aggregationProcess.DeleteFlowKeyFromMap(flowKey2)
assert.Error(t, err)
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.DeleteFlowKeyFromMapWithLock(flowKey1)
err = aggregationProcess.DeleteFlowKeyFromMap(flowKey1)
assert.NoError(t, err)
assert.Empty(t, aggregationProcess.flowKeyRecordMap)
}

func TestDeleteFlowKeyFromMapWithoutLock(t *testing.T) {
func TestAggregationProcess_GetLastUpdatedTimeOfFlow(t *testing.T) {
messageChan := make(chan *entities.Message)
input := AggregationInput{
MessageChan: messageChan,
Expand All @@ -631,22 +634,13 @@ func TestDeleteFlowKeyFromMapWithoutLock(t *testing.T) {
aggregationProcess, _ := InitAggregationProcess(input)
message := createDataMsgForSrc(t, false, false, false, false)
flowKey1 := FlowKey{"10.0.0.1", "10.0.0.2", 6, 1234, 5678}
flowKey2 := FlowKey{"2001:0:3238:dfe1:63::fefb", "2001:0:3238:dfe1:63::fefc", 6, 1234, 5678}
aggFlowRecord := AggregationFlowRecord{
message.GetSet().GetRecords()[0],
true,
true,
}
aggregationProcess.flowKeyRecordMap[flowKey1] = aggFlowRecord
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.mutex.Lock()
aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey2)
aggregationProcess.mutex.Unlock()
assert.Equal(t, 1, len(aggregationProcess.flowKeyRecordMap))
aggregationProcess.mutex.Lock()
aggregationProcess.DeleteFlowKeyFromMapWithoutLock(flowKey1)
aggregationProcess.mutex.Unlock()
assert.Empty(t, aggregationProcess.flowKeyRecordMap)
_, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1)
assert.Error(t, err)
err = aggregationProcess.addOrUpdateRecordInMap(&flowKey1, message.GetSet().GetRecords()[0])
assert.NoError(t, err)
flowUpdatedTime, err := aggregationProcess.GetLastUpdatedTimeOfFlow(flowKey1)
assert.NoError(t, err)
assert.Equal(t, uint32(1), flowUpdatedTime)
}

func runCorrelationAndCheckResult(t *testing.T, ap *AggregationProcess, record1, record2 entities.Record, isIPv6, isIntraNode, isToExternal bool) {
Expand Down