From 8886987dc89146ed669f852c9b89255804953a0e Mon Sep 17 00:00:00 2001 From: Tobiasz Heller <14020794+tobiaszheller@users.noreply.github.com> Date: Tue, 25 Jul 2023 12:36:29 +0200 Subject: [PATCH] athena migration: use decoder to decode lines (#29385) --- examples/dynamoathenamigration/migration.go | 13 ++- .../dynamoathenamigration/migration_test.go | 83 +++++++++++++++++++ 2 files changed, 88 insertions(+), 8 deletions(-) diff --git a/examples/dynamoathenamigration/migration.go b/examples/dynamoathenamigration/migration.go index 76ff0020f96b3..93eeaf0fce4a2 100644 --- a/examples/dynamoathenamigration/migration.go +++ b/examples/dynamoathenamigration/migration.go @@ -403,12 +403,12 @@ func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC checkpointValues := checkpoint.checkpointValues() afterCheckpoint := afterCheckpointIn - scanner := bufio.NewScanner(gzipReader) t.Logger.Debugf("Scanning %d events", dataObj.ItemCount) count := 0 - for scanner.Scan() { + decoder := json.NewDecoder(gzipReader) + for decoder.More() { count++ - ev, err := exportedDynamoItemToAuditEvent(ctx, scanner.Bytes()) + ev, err := exportedDynamoItemToAuditEvent(ctx, decoder) if err != nil { return false, trace.Wrap(err) } @@ -451,16 +451,13 @@ func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC } } - if err := scanner.Err(); err != nil { - return false, trace.Wrap(err) - } return afterCheckpoint, nil } // exportedDynamoItemToAuditEvent converts single line of dynamo export into AuditEvent. -func exportedDynamoItemToAuditEvent(ctx context.Context, in []byte) (apievents.AuditEvent, error) { +func exportedDynamoItemToAuditEvent(ctx context.Context, decoder *json.Decoder) (apievents.AuditEvent, error) { var itemMap map[string]map[string]any - if err := json.Unmarshal(in, &itemMap); err != nil { + if err := decoder.Decode(&itemMap); err != nil { return nil, trace.Wrap(err) } diff --git a/examples/dynamoathenamigration/migration_test.go b/examples/dynamoathenamigration/migration_test.go index d94ec2bbb1172..9f01f73a2e208 100644 --- a/examples/dynamoathenamigration/migration_test.go +++ b/examples/dynamoathenamigration/migration_test.go @@ -115,6 +115,35 @@ func TestMigrateProcessDataObjects(t *testing.T) { }, emitter.events) } +func TestLargeEventsParse(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + emitter := &mockEmitter{} + mt := &task{ + s3Downloader: &fakeDownloader{ + dataObjects: map[string]string{ + "large.json.gz": generateLargeEventLine(), + }, + }, + eventsEmitter: emitter, + Config: Config{ + Logger: utils.NewLoggerForTests(), + NoOfEmitWorkers: 5, + bufferSize: 10, + CheckpointPath: path.Join(t.TempDir(), "migration-tests.json"), + }, + } + err := mt.ProcessDataObjects(ctx, &exportInfo{ + ExportARN: "export-arn", + DataObjectsInfo: []dataObjectInfo{ + {DataFileS3Key: "large.json.gz"}, + }, + }) + require.NoError(t, err) + require.Len(t, emitter.events, 1) +} + type fakeDownloader struct { dataObjects map[string]string } @@ -419,6 +448,60 @@ func TestMigrationCheckpoint(t *testing.T) { }) } +func generateLargeEventLine() string { + // Generate event close to 400KB which is max of dynamoDB to test if + // it can be processed. + return fmt.Sprintf( + `{ + "Item": { + "EventIndex": { + "N": "2147483647" + }, + "SessionID": { + "S": "4298bd54-a747-4d53-b850-83ba17caae5a" + }, + "CreatedAtDate": { + "S": "2023-05-22" + }, + "FieldsMap": { + "M": { + "cluster_name": { + "S": "%s" + }, + "uid": { + "S": "%s" + }, + "code": { + "S": "T2005I" + }, + "ei": { + "N": "2147483647" + }, + "time": { + "S": "2023-05-22T12:12:21.966Z" + }, + "event": { + "S": "session.upload" + }, + "sid": { + "S": "4298bd54-a747-4d53-b850-83ba17caae5a" + } + } + }, + "EventType": { + "S": "session.upload" + }, + "EventNamespace": { + "S": "default" + }, + "CreatedAt": { + "N": "1684757541" + } + } + }`, + strings.Repeat("a", 1024*400 /* 400 KB */), uuid.NewString()) +} + func generateDynamoExportData(n int) string { if n < 1 { panic("number of events to generate must be > 0")