Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backports for ActivityLog and Reporting 1.13.x #21140

Merged
merged 17 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
b7e224b
backport of commit 9f7f8d5bfad0aa3f06a4fcd86484f3e2f01a40a4
miagilepner Mar 6, 2023
335ff2d
backport of commit e3c59773e969336bb4e85ddbf3a3700a6250f4c8
mpalmi Mar 20, 2023
8e112e8
backport of commit b4fab6ac2ae830f3bec8c287f07d5193dcfcdc22
miagilepner Mar 31, 2023
bea77fc
backport of commit 54904e4cd6d6cb37e876d8b93c37d292b3419dd3
miagilepner Apr 4, 2023
5e16922
backport of commit 4b6ec4079d1bdccde4cab416417a296c8c233c1b
miagilepner Apr 11, 2023
bf48b22
backport of commit 05ba6bbddded428d2fa010f9359d0543f46af52b
mpalmi Apr 12, 2023
e2767f0
backport of commit 002a59a370a80c846191ece427bef92f25bf81eb
mpalmi Apr 13, 2023
76fd19f
backport of commit 77f83d9fe8b85c126347794a460410c2025675fd
mpalmi Apr 21, 2023
a8a1fd5
backport of commit 730d0e2821dbc3bb1fe91ade183aa8c2908eaae5
miagilepner May 16, 2023
56910ab
backport of commit 35e2c1665f009183088387532e17d02ded312e18
miagilepner May 19, 2023
2fa366a
backport of commit 810d504e4f676e857632230bf565eaa214927bcd
mpalmi May 22, 2023
2083299
backport of commit 5b23dd506fb3d2c79f4a18b995a72548560cc799
miagilepner May 23, 2023
3e2bd2f
backport of commit 018ea84997b49137ae3884e00e4dc9fc389f8b50
miagilepner May 23, 2023
8c4b018
backport of commit 541f18eeb782cd0c8ee28b961e99c3adf952bd22
miagilepner May 24, 2023
adb7d0d
backport of commit b4e2751a09d411abef62a3769b08e9f1ce647e25
miagilepner May 25, 2023
0fe6348
backport of commit dc5dd71c72a981e703379484bcac57e32af01fec
ncabatoff Jun 2, 2023
f2a62da
backport of commit 5002489d279ea2a98342ae31f342e0cd9c888e85
miagilepner Jun 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
backport of commit 5002489
  • Loading branch information
miagilepner authored and mpalmi committed Jun 14, 2023
commit f2a62daf3fbdc99226a2010542172402e07248fe
75 changes: 75 additions & 0 deletions vault/logical_system_activity_write_testonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package vault
import (
"context"
"fmt"
"io"
"sync"
"time"

Expand Down Expand Up @@ -315,7 +316,23 @@ func (m *multipleMonthsActivityClients) addRepeatedClients(monthsAgo int32, c *g
func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[generation.WriteOptions]struct{}, activityLog *ActivityLog) ([]string, error) {
now := timeutil.StartOfMonth(time.Now().UTC())
paths := []string{}

_, writePQ := opts[generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES]
_, writeDistinctClients := opts[generation.WriteOptions_WRITE_DISTINCT_CLIENTS]

pqOpts := pqOptions{}
if writePQ || writeDistinctClients {
pqOpts.byNamespace = make(map[string]*processByNamespace)
pqOpts.byMonth = make(map[int64]*processMonth)
pqOpts.activePeriodEnd = m.latestTimestamp(now)
pqOpts.endTime = timeutil.EndOfMonth(pqOpts.activePeriodEnd)
pqOpts.activePeriodStart = m.earliestTimestamp(now)
}

for i, month := range m.months {
if month.generationParameters == nil {
continue
}
var timestamp time.Time
if i > 0 {
timestamp = timeutil.StartOfMonth(timeutil.MonthsPreviousTo(i, now))
Expand Down Expand Up @@ -344,6 +361,14 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
paths = append(paths, entityPath)
}
}

if writePQ || writeDistinctClients {
reader := newProtoSegmentReader(segments)
err = activityLog.segmentToPrecomputedQuery(ctx, timestamp, reader, pqOpts)
if err != nil {
return nil, err
}
}
}
wg := sync.WaitGroup{}
err := activityLog.refreshFromStoredLog(ctx, &wg, now)
Expand All @@ -353,6 +378,25 @@ func (m *multipleMonthsActivityClients) write(ctx context.Context, opts map[gene
return paths, nil
}

// latestTimestamp returns the start of the most recent month that has
// generation parameters set. Index i of m.months is treated as "i months
// before now", so the scan runs from the lowest index upward and stops at the
// first populated month. The zero time.Time is returned when no month has
// generation parameters.
func (m *multipleMonthsActivityClients) latestTimestamp(now time.Time) time.Time {
	for monthsAgo := 0; monthsAgo < len(m.months); monthsAgo++ {
		if m.months[monthsAgo].generationParameters == nil {
			continue
		}
		return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(monthsAgo, now))
	}
	return time.Time{}
}

// earliestTimestamp returns the start of the oldest month that has generation
// parameters set. Index i of m.months is treated as "i months before now", so
// the scan runs from the highest index downward and stops at the first
// populated month. The zero time.Time is returned when no month has generation
// parameters.
func (m *multipleMonthsActivityClients) earliestTimestamp(now time.Time) time.Time {
	last := len(m.months) - 1
	for offset := range m.months {
		// Walk the slice back to front: offset 0 maps to the last index.
		monthsAgo := last - offset
		if m.months[monthsAgo].generationParameters == nil {
			continue
		}
		return timeutil.StartOfMonth(timeutil.MonthsPreviousTo(monthsAgo, now))
	}
	return time.Time{}
}

func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivityClients {
m := &multipleMonthsActivityClients{
months: make([]*singleMonthActivityClients, numberOfMonths),
Expand All @@ -364,3 +408,34 @@ func newMultipleMonthsActivityClients(numberOfMonths int) *multipleMonthsActivit
}
return m
}

// newProtoSegmentReader wraps in-memory segments in a SegmentReader. Each
// non-nil record slice in segments becomes one entry that the returned reader
// hands out per ReadEntity call; segments whose record slice is nil are
// skipped. The segment indexes (map keys) are discarded, and because Go map
// iteration order is unspecified, the order in which segments are read back is
// unspecified as well.
func newProtoSegmentReader(segments map[int][]*activity.EntityRecord) SegmentReader {
	allRecords := make([][]*activity.EntityRecord, 0, len(segments))
	for _, records := range segments {
		// Guard on the per-segment slice. Checking the map itself here is
		// dead code: the loop body never runs for a nil map.
		if records == nil {
			continue
		}
		allRecords = append(allRecords, records)
	}
	return &sliceSegmentReader{
		records: allRecords,
	}
}

// sliceSegmentReader reads entity records out of an in-memory slice of
// segments, handing out one inner slice per ReadEntity call.
type sliceSegmentReader struct {
// records holds one slice of entity records per segment.
records [][]*activity.EntityRecord
// i is the index into records of the next segment ReadEntity will return.
i int
}

// ReadToken always returns a nil token count and io.EOF: this reader holds
// only entity records, so there is never a token count to read.
func (p *sliceSegmentReader) ReadToken(ctx context.Context) (*activity.TokenCount, error) {
return nil, io.EOF
}

// ReadEntity returns the next stored segment wrapped in an EntityActivityLog
// and advances the reader's cursor by one. Once the cursor has reached the end
// of the stored segments it returns nil and io.EOF.
func (p *sliceSegmentReader) ReadEntity(ctx context.Context) (*activity.EntityActivityLog, error) {
	if p.i != len(p.records) {
		clients := p.records[p.i]
		p.i++
		return &activity.EntityActivityLog{Clients: clients}, nil
	}
	return nil, io.EOF
}
261 changes: 146 additions & 115 deletions vault/logical_system_activity_write_testonly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"context"
"sort"
"testing"
"time"

"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/helper/timeutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault/activity"
"github.com/hashicorp/vault/vault/activity/generation"
Expand Down Expand Up @@ -441,136 +443,165 @@ func Test_singleMonthActivityClients_populateSegments(t *testing.T) {
}
}

// Test_multipleMonthsActivityClients_write_entities writes 4 months of data
// splitting some months across segments and using empty segments and skipped
// segments. Entities are written and then storage is queried. The test verifies
// that the correct timestamps are present in the activity log and that the correct
// segment numbers for each month contain the correct number of clients
func Test_multipleMonthsActivityClients_write_entities(t *testing.T) {
// Test_handleActivityWriteData writes 4 months of data splitting some months
// across segments and using empty segments and skipped segments. Entities and
// precomputed queries are written and then storage is queried. The
// test verifies that the correct timestamps are present in the activity log and
// that the correct segment numbers for each month contain the correct number of
// clients
func Test_handleActivityWriteData(t *testing.T) {
index5 := int32(5)
index4 := int32(4)
data := &generation.ActivityLogMockInput{
Write: []generation.WriteOptions{
generation.WriteOptions_WRITE_ENTITIES,
data := []*generation.Data{
{
// segments: 0:[x,y], 1:[z]
Month: &generation.Data_MonthsAgo{MonthsAgo: 3},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 3}}}},
NumSegments: 2,
},
Data: []*generation.Data{
{
// segments: 0:[x,y], 1:[z]
Month: &generation.Data_MonthsAgo{MonthsAgo: 3},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 3}}}},
NumSegments: 2,
},
{
// segments: 1:[a,b,c], 2:[d,e]
Month: &generation.Data_MonthsAgo{MonthsAgo: 2},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 5}}}},
NumSegments: 3,
SkipSegmentIndexes: []int32{0},
{
// segments: 1:[a,b,c], 2:[d,e]
Month: &generation.Data_MonthsAgo{MonthsAgo: 2},
Clients: &generation.Data_All{All: &generation.Clients{Clients: []*generation.Client{{Count: 5}}}},
NumSegments: 3,
SkipSegmentIndexes: []int32{0},
},
{
// segments: 5:[f,g]
Month: &generation.Data_MonthsAgo{MonthsAgo: 1},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{{
SegmentIndex: &index5,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 2}}},
}}},
},
{
// segments: 5:[f,g]
Month: &generation.Data_MonthsAgo{MonthsAgo: 1},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{{
},
{
// segments: 1:[], 2:[], 4:[n], 5:[o]
Month: &generation.Data_CurrentMonth{},
EmptySegmentIndexes: []int32{1, 2},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{
{
SegmentIndex: &index5,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 2}}},
}}},
},
},
{
// segments: 1:[], 2:[], 4:[n], 5:[o]
Month: &generation.Data_CurrentMonth{},
EmptySegmentIndexes: []int32{1, 2},
Clients: &generation.Data_Segments{
Segments: &generation.Segments{Segments: []*generation.Segment{
{
SegmentIndex: &index5,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
{
SegmentIndex: &index4,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
}},
},
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
{
SegmentIndex: &index4,
Clients: &generation.Clients{Clients: []*generation.Client{{Count: 1}}},
},
}},
},
},
}

core, _, _ := TestCoreUnsealed(t)
marshaled, err := protojson.Marshal(data)
require.NoError(t, err)
req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write")
req.Data = map[string]interface{}{"input": string(marshaled)}
resp, err := core.systemBackend.HandleRequest(namespace.RootContext(nil), req)
require.NoError(t, err)
paths := resp.Data["paths"].([]string)
require.Len(t, paths, 9)
t.Run("write entitites", func(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
marshaled, err := protojson.Marshal(&generation.ActivityLogMockInput{
Data: data,
Write: []generation.WriteOptions{generation.WriteOptions_WRITE_ENTITIES},
})
require.NoError(t, err)
req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write")
req.Data = map[string]interface{}{"input": string(marshaled)}
resp, err := core.systemBackend.HandleRequest(namespace.RootContext(nil), req)
require.NoError(t, err)
paths := resp.Data["paths"].([]string)
require.Len(t, paths, 9)

times, err := core.activityLog.availableLogs(context.Background())
require.NoError(t, err)
require.Len(t, times, 4)
times, err := core.activityLog.availableLogs(context.Background())
require.NoError(t, err)
require.Len(t, times, 4)

sortPaths := func(monthPaths []string) {
sort.Slice(monthPaths, func(i, j int) bool {
iVal, _ := parseSegmentNumberFromPath(monthPaths[i])
jVal, _ := parseSegmentNumberFromPath(monthPaths[j])
return iVal < jVal
})
}
sortPaths := func(monthPaths []string) {
sort.Slice(monthPaths, func(i, j int) bool {
iVal, _ := parseSegmentNumberFromPath(monthPaths[i])
jVal, _ := parseSegmentNumberFromPath(monthPaths[j])
return iVal < jVal
})
}

month0Paths := paths[0:4]
month1Paths := paths[4:5]
month2Paths := paths[5:7]
month3Paths := paths[7:9]
sortPaths(month0Paths)
sortPaths(month1Paths)
sortPaths(month2Paths)
sortPaths(month3Paths)
entities := func(paths []string) map[int][]*activity.EntityRecord {
segments := make(map[int][]*activity.EntityRecord)
for _, path := range paths {
segmentNum, _ := parseSegmentNumberFromPath(path)
entry, err := core.activityLog.view.Get(context.Background(), path)
require.NoError(t, err)
if entry == nil {
segments[segmentNum] = []*activity.EntityRecord{}
continue
month0Paths := paths[0:4]
month1Paths := paths[4:5]
month2Paths := paths[5:7]
month3Paths := paths[7:9]
sortPaths(month0Paths)
sortPaths(month1Paths)
sortPaths(month2Paths)
sortPaths(month3Paths)
entities := func(paths []string) map[int][]*activity.EntityRecord {
segments := make(map[int][]*activity.EntityRecord)
for _, path := range paths {
segmentNum, _ := parseSegmentNumberFromPath(path)
entry, err := core.activityLog.view.Get(context.Background(), path)
require.NoError(t, err)
if entry == nil {
segments[segmentNum] = []*activity.EntityRecord{}
continue
}
activities := &activity.EntityActivityLog{}
err = proto.Unmarshal(entry.Value, activities)
require.NoError(t, err)
segments[segmentNum] = activities.Clients
}
activities := &activity.EntityActivityLog{}
err = proto.Unmarshal(entry.Value, activities)
require.NoError(t, err)
segments[segmentNum] = activities.Clients
return segments
}
return segments
}
month0Entities := entities(month0Paths)
require.Len(t, month0Entities, 4)
require.Contains(t, month0Entities, 1)
require.Contains(t, month0Entities, 2)
require.Contains(t, month0Entities, 4)
require.Contains(t, month0Entities, 5)
require.Len(t, month0Entities[1], 0)
require.Len(t, month0Entities[2], 0)
require.Len(t, month0Entities[4], 1)
require.Len(t, month0Entities[5], 1)
month0Entities := entities(month0Paths)
require.Len(t, month0Entities, 4)
require.Contains(t, month0Entities, 1)
require.Contains(t, month0Entities, 2)
require.Contains(t, month0Entities, 4)
require.Contains(t, month0Entities, 5)
require.Len(t, month0Entities[1], 0)
require.Len(t, month0Entities[2], 0)
require.Len(t, month0Entities[4], 1)
require.Len(t, month0Entities[5], 1)

month1Entities := entities(month1Paths)
require.Len(t, month1Entities, 1)
require.Contains(t, month1Entities, 5)
require.Len(t, month1Entities[5], 2)
month1Entities := entities(month1Paths)
require.Len(t, month1Entities, 1)
require.Contains(t, month1Entities, 5)
require.Len(t, month1Entities[5], 2)

month2Entities := entities(month2Paths)
require.Len(t, month2Entities, 2)
require.Contains(t, month2Entities, 1)
require.Contains(t, month2Entities, 2)
require.Len(t, month2Entities[1], 3)
require.Len(t, month2Entities[2], 2)

month3Entities := entities(month3Paths)
require.Len(t, month3Entities, 2)
require.Contains(t, month3Entities, 0)
require.Contains(t, month3Entities, 1)
require.Len(t, month3Entities[0], 2)
require.Len(t, month3Entities[1], 1)
})
t.Run("write precomputed queries", func(t *testing.T) {
core, _, _ := TestCoreUnsealed(t)
marshaled, err := protojson.Marshal(&generation.ActivityLogMockInput{
Data: data,
Write: []generation.WriteOptions{generation.WriteOptions_WRITE_PRECOMPUTED_QUERIES},
})
require.NoError(t, err)
req := logical.TestRequest(t, logical.CreateOperation, "internal/counters/activity/write")
req.Data = map[string]interface{}{"input": string(marshaled)}
_, err = core.systemBackend.HandleRequest(namespace.RootContext(nil), req)
require.NoError(t, err)

month2Entities := entities(month2Paths)
require.Len(t, month2Entities, 2)
require.Contains(t, month2Entities, 1)
require.Contains(t, month2Entities, 2)
require.Len(t, month2Entities[1], 3)
require.Len(t, month2Entities[2], 2)
queries, err := core.activityLog.queryStore.QueriesAvailable(context.Background())
require.NoError(t, err)
require.True(t, queries)

month3Entities := entities(month3Paths)
require.Len(t, month3Entities, 2)
require.Contains(t, month3Entities, 0)
require.Contains(t, month3Entities, 1)
require.Len(t, month3Entities[0], 2)
require.Len(t, month3Entities[1], 1)
now := time.Now().UTC()
start := timeutil.StartOfMonth(timeutil.MonthsPreviousTo(3, now))
end := timeutil.EndOfMonth(now)
pq, err := core.activityLog.queryStore.Get(context.Background(), start, end)
require.NoError(t, err)
require.NotNil(t, pq)
require.Equal(t, end, pq.EndTime)
require.Equal(t, start, pq.StartTime)
require.Len(t, pq.Namespaces, 1)
require.Equal(t, uint64(12), pq.Namespaces[0].Entities)
require.Len(t, pq.Months, 4)
})
}