Skip to content

Commit

Permalink
athena migration: use decoder to decode lines (gravitational#29385)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobiaszheller authored Jul 25, 2023
1 parent 968b91f commit 8886987
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 8 deletions.
13 changes: 5 additions & 8 deletions examples/dynamoathenamigration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,12 @@ func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC
checkpointValues := checkpoint.checkpointValues()
afterCheckpoint := afterCheckpointIn

scanner := bufio.NewScanner(gzipReader)
t.Logger.Debugf("Scanning %d events", dataObj.ItemCount)
count := 0
for scanner.Scan() {
decoder := json.NewDecoder(gzipReader)
for decoder.More() {
count++
ev, err := exportedDynamoItemToAuditEvent(ctx, scanner.Bytes())
ev, err := exportedDynamoItemToAuditEvent(ctx, decoder)
if err != nil {
return false, trace.Wrap(err)
}
Expand Down Expand Up @@ -451,16 +451,13 @@ func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC
}
}

if err := scanner.Err(); err != nil {
return false, trace.Wrap(err)
}
return afterCheckpoint, nil
}

// exportedDynamoItemToAuditEvent decodes the next JSON value from decoder (one exported dynamo item) and converts it into an AuditEvent.
func exportedDynamoItemToAuditEvent(ctx context.Context, in []byte) (apievents.AuditEvent, error) {
func exportedDynamoItemToAuditEvent(ctx context.Context, decoder *json.Decoder) (apievents.AuditEvent, error) {
var itemMap map[string]map[string]any
if err := json.Unmarshal(in, &itemMap); err != nil {
if err := decoder.Decode(&itemMap); err != nil {
return nil, trace.Wrap(err)
}

Expand Down
83 changes: 83 additions & 0 deletions examples/dynamoathenamigration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,35 @@ func TestMigrateProcessDataObjects(t *testing.T) {
}, emitter.events)
}

func TestLargeEventsParse(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

emitter := &mockEmitter{}
mt := &task{
s3Downloader: &fakeDownloader{
dataObjects: map[string]string{
"large.json.gz": generateLargeEventLine(),
},
},
eventsEmitter: emitter,
Config: Config{
Logger: utils.NewLoggerForTests(),
NoOfEmitWorkers: 5,
bufferSize: 10,
CheckpointPath: path.Join(t.TempDir(), "migration-tests.json"),
},
}
err := mt.ProcessDataObjects(ctx, &exportInfo{
ExportARN: "export-arn",
DataObjectsInfo: []dataObjectInfo{
{DataFileS3Key: "large.json.gz"},
},
})
require.NoError(t, err)
require.Len(t, emitter.events, 1)
}

// fakeDownloader is a test double that tests assign to task.s3Downloader in
// place of a real downloader.
type fakeDownloader struct {
// dataObjects maps a data file key (for example "large.json.gz") to the
// content returned for that key.
// NOTE(review): values appear to be plain, uncompressed export data that is
// compressed when served - confirm against the download method.
dataObjects map[string]string
}
Expand Down Expand Up @@ -419,6 +448,60 @@ func TestMigrationCheckpoint(t *testing.T) {
})
}

// generateLargeEventLine returns the JSON of a single exported dynamo item
// ("session.upload" event) whose cluster_name field is padded with 400 KB of
// filler and whose uid is a freshly generated UUID.
func generateLargeEventLine() string {
	// Generate an event close to 400KB, which is the max of dynamoDB, to test
	// that an item of that size can be processed.
	const clusterNameSize = 1024 * 400 /* 400 KB */

	// itemTemplate has two %s verbs: cluster_name first, then uid.
	const itemTemplate = `{
"Item": {
"EventIndex": {
"N": "2147483647"
},
"SessionID": {
"S": "4298bd54-a747-4d53-b850-83ba17caae5a"
},
"CreatedAtDate": {
"S": "2023-05-22"
},
"FieldsMap": {
"M": {
"cluster_name": {
"S": "%s"
},
"uid": {
"S": "%s"
},
"code": {
"S": "T2005I"
},
"ei": {
"N": "2147483647"
},
"time": {
"S": "2023-05-22T12:12:21.966Z"
},
"event": {
"S": "session.upload"
},
"sid": {
"S": "4298bd54-a747-4d53-b850-83ba17caae5a"
}
}
},
"EventType": {
"S": "session.upload"
},
"EventNamespace": {
"S": "default"
},
"CreatedAt": {
"N": "1684757541"
}
}
}`

	clusterName := strings.Repeat("a", clusterNameSize)
	uid := uuid.NewString()
	return fmt.Sprintf(itemTemplate, clusterName, uid)
}

func generateDynamoExportData(n int) string {
if n < 1 {
panic("number of events to generate must be > 0")
Expand Down

0 comments on commit 8886987

Please sign in to comment.