[filebeat][awss3] - Added support for parquet decoding and decoder config #35578

Merged · 56 commits · Jun 20, 2023
Commits
ffe109d
initial commit for s3 parquet support
ShourieG Apr 24, 2023
5295efd
updated changelog
ShourieG Apr 24, 2023
0f5b475
Merge remote-tracking branch 'upstream/main' into awss3/parquet
ShourieG Apr 24, 2023
b41aa40
added license updates
ShourieG Apr 24, 2023
83598fa
updated notice and go mod/sum
ShourieG Apr 24, 2023
1ad3fe9
Merge branch 'main' into awss3/parquet
ShourieG Apr 24, 2023
f7c5498
Merge remote-tracking branch 'upstream/main' into awss3/parquet
ShourieG Apr 24, 2023
ec642f5
removed libgering panic
ShourieG Apr 24, 2023
1664648
added parquet benchmark tests
ShourieG Apr 25, 2023
8f56a5e
Merge remote-tracking branch 'upstream/main' into awss3/parquet
ShourieG Apr 25, 2023
4d090a3
updated osquery package due to update in dependant thrift package
ShourieG Apr 25, 2023
b370093
added parquet reader with benchmark tests and implemented that reader…
ShourieG Apr 26, 2023
e8e45af
Merge remote-tracking branch 'upstream/main' into awss3/parquet
ShourieG Apr 26, 2023
2ff7b38
addressed linting errors
ShourieG Apr 26, 2023
2d8321b
refactored parquet reader, added tests and benchmarks and addressed p…
ShourieG Apr 28, 2023
cbf864c
Merge remote-tracking branch 'upstream/main' into awss3/parquet
ShourieG Apr 28, 2023
42b7d06
addressed pr comments
ShourieG May 2, 2023
8119a06
Merge remote-tracking branch 'upstream/main' into awss3/parquet
ShourieG May 2, 2023
6e6687d
resolved merged conflicts
ShourieG May 8, 2023
2c9d32a
resolved merged conflicts
ShourieG May 8, 2023
8c536a4
updated notice
ShourieG May 8, 2023
9b2e330
added more parquet file tests with json comparisons, addressed pr com…
ShourieG May 9, 2023
35df388
Merge remote-tracking branch 'upstream/main' into awss3/parquet
ShourieG May 9, 2023
fc9c0c6
removed commented codeS
ShourieG May 9, 2023
ed6edca
removed bad imports & cleaned up tests
ShourieG May 12, 2023
6384a11
updated notice
ShourieG May 12, 2023
47c61a1
added graceful closures with err checks in test
ShourieG May 12, 2023
3049ee5
added graceful closures with err checks in test
ShourieG May 12, 2023
7330873
merged with upstream and resolved conflicts
ShourieG May 23, 2023
7aca5fa
updated go sum
ShourieG May 23, 2023
4e60653
added support for parquet decoding in awss3 input
ShourieG May 25, 2023
3c31cf5
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG May 25, 2023
12c93ce
updated changelog
ShourieG May 25, 2023
807e0b5
updated NOTICE
ShourieG May 25, 2023
78ca3af
updated NOTICE
ShourieG May 25, 2023
a7f35b0
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG May 26, 2023
e8582d7
Merge branch 's3/parquet' of github.com:ShourieG/beats into s3/parquet
ShourieG May 26, 2023
7611baf
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG May 27, 2023
fa6367d
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG May 30, 2023
7df4854
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG May 31, 2023
1b8bff5
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG Jun 1, 2023
0b8a71a
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG Jun 5, 2023
41f9885
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG Jun 8, 2023
ba7927a
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG Jun 13, 2023
a0fb21f
refactored the decoder interface
ShourieG Jun 13, 2023
5005542
updated docs
ShourieG Jun 13, 2023
3f627e7
updated changelog
ShourieG Jun 13, 2023
392e89e
updated the tests to read prettified json
ShourieG Jun 13, 2023
c92bf31
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG Jun 13, 2023
4b458e0
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG Jun 14, 2023
f296baf
updated offset tracking logic
ShourieG Jun 16, 2023
414bd70
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG Jun 16, 2023
24dafcf
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG Jun 20, 2023
bf214a7
updated docs
ShourieG Jun 20, 2023
e536a51
added event offset logic
ShourieG Jun 20, 2023
f9d2859
Merge remote-tracking branch 'upstream/main' into s3/parquet
ShourieG Jun 20, 2023
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -298,6 +298,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add nginx ingress_controller parsing if one of upstreams fails to return response {pull}34787[34787]
- Allow neflow v9 and ipfix templates to be shared between source addresses. {pull}35036[35036]
- Add support for collecting IPv6 metrics. {pull}35123[35123]
- Added support for decoding apache parquet files in awss3 input. {issue}34662[34662] {pull}35578[35578]
- Add oracle authentication messages parsing {pull}35127[35127]
- Add sanitization capabilities to azure-eventhub input {pull}34874[34874]
- Add support for CRC validation in Filebeat's HTTP endpoint input. {pull}35204[35204]
37 changes: 37 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -115,6 +115,43 @@ it was uploaded. For example: `application/json`.
The file encoding to use for reading data that contains international
characters. This only applies to non-JSON logs. See <<_encoding_3>>.

[id="input-{type}-decoding"]
[float]
==== `decoding`

The file decoding option specifies the codec used to decode the file contents.
It can apply to any file stream data. An example config is shown below:

[source,yaml]
----
decoding.codec.parquet.enabled: true
----

The currently supported codecs are listed below:

1. <<attrib-decoding-parquet,Parquet>>: This codec decodes parquet compressed data streams.

[id="attrib-decoding-parquet"]
[float]
==== The `parquet` codec
The `parquet` codec is used to decode parquet compressed data streams.
Enabling the codec with no other options uses the default codec settings. The parquet codec
supports two sub-attributes that can make parquet decoding more efficient: `batch_size` and
`process_parallel`. The `batch_size` attribute specifies the number of records to read from the
parquet stream at a time. By default, `batch_size` is set to `1` and `process_parallel` is set to
`false`. If `process_parallel` is set to `true`, functions that read multiple columns read those
columns in parallel from the parquet stream, with one reader per column. Setting
`process_parallel` to `true` greatly increases the rate of processing at the cost of increased
memory usage; a larger `batch_size` also helps to increase the rate of processing. An example
config is shown below:

[source,yaml]
----
decoding.codec.parquet.enabled: true
decoding.codec.parquet.process_parallel: true
decoding.codec.parquet.batch_size: 1000
----
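
For context, a minimal sketch of a complete `aws-s3` input configuration using these decoding
options is shown below. The queue URL is a placeholder and the surrounding settings are
assumptions for illustration only, not part of this change:

[source,yaml]
----
filebeat.inputs:
- type: aws-s3
  queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/parquet-logs-queue
  decoding.codec.parquet.enabled: true
  decoding.codec.parquet.process_parallel: true
  decoding.codec.parquet.batch_size: 1000
----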

[float]
==== `expand_event_list_from_field`
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/config.go
@@ -163,6 +163,7 @@ type readerConfig struct {
LineTerminator readfile.LineTerminator `config:"line_terminator"`
MaxBytes cfgtype.ByteSize `config:"max_bytes"`
Parsers parser.Config `config:",inline"`
Decoding decoderConfig `config:"decoding"`
}

func (rc *readerConfig) Validate() error {
78 changes: 78 additions & 0 deletions x-pack/filebeat/input/awss3/decoding.go
@@ -0,0 +1,78 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awss3

import (
"fmt"
"io"

"github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet"
)

// decoder is an interface for decoding data from an io reader.
type decoder interface {
// decode reads and decodes data from an io reader based on the codec type.
// It returns the decoded data and an error if the data cannot be decoded.
decode() ([]byte, error)
// next advances the decoder to the next data item and returns true if there is more data to be decoded.
next() bool
// close closes the decoder and releases any resources associated with it.
// It returns an error if the decoder cannot be closed.
close() error
}

// newDecoder creates a new decoder based on the codec type.
// It returns a decoder type and an error if the codec type is not supported.
// If the reader config codec option is not set, it returns a nil decoder and nil error.
func newDecoder(config decoderConfig, r io.Reader) (decoder, error) {
switch {
case config.Codec == nil:
return nil, nil
case config.Codec.Parquet != nil:
return newParquetDecoder(config, r)
default:
return nil, fmt.Errorf("unsupported config value: %v", config)
}
}

// parquetDecoder is a decoder for parquet data.
type parquetDecoder struct {
reader *parquet.BufferedReader
}

// newParquetDecoder creates a new parquet decoder. It uses the libbeat parquet reader under the hood.
// It returns an error if the parquet reader cannot be created.
func newParquetDecoder(config decoderConfig, r io.Reader) (decoder, error) {
reader, err := parquet.NewBufferedReader(r, &parquet.Config{
ProcessParallel: config.Codec.Parquet.ProcessParallel,
BatchSize: config.Codec.Parquet.BatchSize,
})
if err != nil {
return nil, fmt.Errorf("failed to create parquet decoder: %w", err)
}
return &parquetDecoder{
reader: reader,
}, nil
}

// next advances the parquet decoder to the next data item and returns true if there is more data to be decoded.
func (pd *parquetDecoder) next() bool {
return pd.reader.Next()
}

// decode reads and decodes a parquet data stream. After reading the parquet data it decodes
// the output to JSON and returns it as a byte slice. It returns an error if the data cannot be decoded.
func (pd *parquetDecoder) decode() ([]byte, error) {
data, err := pd.reader.Record()
if err != nil {
return nil, err
}
return data, nil
}

// close closes the parquet decoder and releases the resources.
func (pd *parquetDecoder) close() error {
return pd.reader.Close()
}
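
The `ProcessS3Object` changes further down in this diff drive this interface. Purely as an
illustration, a hedged sketch of the call pattern is shown below; the `consumeWithDecoder`
helper and its `handle` callback are hypothetical and not part of this PR, and the io.EOF
handling done in the real loop is omitted for brevity:

// Sketch only: how a caller inside the awss3 package might drive the decoder.
// The handle callback stands in for publishing an event.
func consumeWithDecoder(cfg decoderConfig, r io.Reader, handle func([]byte) error) error {
    dec, err := newDecoder(cfg, r)
    if err != nil {
        return err
    }
    if dec == nil {
        // No codec is configured; the caller falls back to the content-type based path.
        return nil
    }
    defer dec.close()
    for dec.next() {
        data, err := dec.decode()
        if err != nil {
            return err
        }
        // Each data slice is a JSON document produced by the codec.
        if err := handle(data); err != nil {
            return err
        }
    }
    return nil
}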
22 changes: 22 additions & 0 deletions x-pack/filebeat/input/awss3/decoding_config.go
@@ -0,0 +1,22 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awss3

// decoderConfig contains the configuration options for instantiating a decoder.
type decoderConfig struct {
Codec *codecConfig `config:"codec"`
}

// codecConfig contains the configuration options for different codecs used by a decoder.
type codecConfig struct {
Parquet *parquetCodecConfig `config:"parquet"`
}

// parquetCodecConfig contains the configuration options for the parquet codec.
type parquetCodecConfig struct {
Enabled bool `config:"enabled"`
ProcessParallel bool `config:"process_parallel"`
BatchSize int `config:"batch_size" default:"1"`
}
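
For reference, the struct tags above map onto the reader-level `decoding` setting. The nested
YAML below is a sketch of the same options shown in the docs, assuming the usual Beats
equivalence between dotted and nested configuration keys:

# Nested form of the dotted keys used in the documentation above (illustration only).
decoding:
  codec:
    parquet:
      enabled: true
      process_parallel: true
      batch_size: 1000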
130 changes: 130 additions & 0 deletions x-pack/filebeat/input/awss3/decoding_test.go
@@ -0,0 +1,130 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awss3

import (
"encoding/json"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
)

// all test files are read from the "testdata" directory
const testDataPath = "testdata"

func TestParquetDecoding(t *testing.T) {
testCases := []struct {
name string
file string
contentType string
numEvents int
assertAgainst string
config *readerConfig
}{
{
name: "test decoding of a parquet file and compare the number of events with batch size 1",
file: "vpc-flow.gz.parquet",
numEvents: 1304,
config: &readerConfig{
Decoding: decoderConfig{
Codec: &codecConfig{
Parquet: &parquetCodecConfig{
ProcessParallel: true,
BatchSize: 1,
},
},
},
},
},
{
name: "test decoding of a parquet file and compare the number of events with batch size 100",
file: "vpc-flow.gz.parquet",
numEvents: 1304,
config: &readerConfig{
Decoding: decoderConfig{
Codec: &codecConfig{
Parquet: &parquetCodecConfig{
ProcessParallel: true,
BatchSize: 100,
},
},
},
},
},
{
name: "test decoding of a parquet file and compare the number of events with default parquet config",
file: "vpc-flow.gz.parquet",
numEvents: 1304,
config: &readerConfig{
Decoding: decoderConfig{
Codec: &codecConfig{
Parquet: &parquetCodecConfig{
Enabled: true,
},
},
},
},
},
{
name: "test decoding of a parquet file and compare the number of events along with the content",
file: "cloudtrail.parquet",
numEvents: 1,
assertAgainst: "cloudtrail.json",
config: &readerConfig{
Decoding: decoderConfig{
Codec: &codecConfig{
Parquet: &parquetCodecConfig{
Enabled: true,
ProcessParallel: true,
BatchSize: 1,
},
},
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
file := filepath.Join(testDataPath, tc.file)
sel := fileSelectorConfig{ReaderConfig: *tc.config}
if tc.contentType == "" {
tc.contentType = "application/octet-stream"
}
// uses the s3_objects test method to perform the test
events := testProcessS3Object(t, file, tc.contentType, tc.numEvents, sel)
// if assertAgainst is not empty, then compare the events with the target file
// there is a chance for this comparison to become flaky if the number of events > 1, as
// the order of events is not guaranteed by beats
if tc.assertAgainst != "" {
targetData := readJSONFromFile(t, filepath.Join(testDataPath, tc.assertAgainst))
assert.Equal(t, len(targetData), len(events))

for i, event := range events {
msg, err := event.Fields.GetValue("message")
assert.NoError(t, err)
assert.JSONEq(t, targetData[i], msg.(string))
}
}
})
}
}

// readJSONFromFile reads the json file and returns the data as a slice of strings
func readJSONFromFile(t *testing.T, filepath string) []string {
fileBytes, err := os.ReadFile(filepath)
assert.NoError(t, err)
var rawMessages []json.RawMessage
err = json.Unmarshal(fileBytes, &rawMessages)
assert.NoError(t, err)
var data []string

for _, rawMsg := range rawMessages {
data = append(data, string(rawMsg))
}
return data
}
69 changes: 61 additions & 8 deletions x-pack/filebeat/input/awss3/s3_objects.go
@@ -15,7 +15,6 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"reflect"
"strings"
"time"
@@ -152,14 +151,37 @@ func (p *s3ObjectProcessor) ProcessS3Object() error {
contentType = p.readerConfig.ContentType
}

// Process object content stream.
switch {
case strings.HasPrefix(contentType, contentTypeJSON) || strings.HasPrefix(contentType, contentTypeNDJSON):
err = p.readJSON(reader)
default:
err = p.readFile(reader)
// try to create a decoder using the codec config
decoder, err := newDecoder(p.readerConfig.Decoding, reader)
if err != nil {
return err
}
if decoder != nil {
defer decoder.close()

for decoder.next() {
data, err := decoder.decode()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
break
}
err = p.readJSONSlice(bytes.NewReader(data))
if err != nil {
break
}
}
} else {
// This is the legacy path. It will be removed in the future and combined with the decoder path.
// Process object content stream.
switch {
case strings.HasPrefix(contentType, contentTypeJSON) || strings.HasPrefix(contentType, contentTypeNDJSON):
err = p.readJSON(reader)
default:
err = p.readFile(reader)
}
}
if err != nil {
return fmt.Errorf("failed reading s3 object (elapsed_time_ns=%d): %w",
time.Since(start).Nanoseconds(), err)
@@ -233,6 +255,37 @@ func (p *s3ObjectProcessor) readJSON(r io.Reader) error {
return nil
}

// readJSONSlice decodes JSON array data into a slice of json.RawMessage and processes
// each element as an individual JSON object.
func (p *s3ObjectProcessor) readJSONSlice(r io.Reader) error {
dec := json.NewDecoder(r)
dec.UseNumber()

for dec.More() && p.ctx.Err() == nil {
offset := dec.InputOffset()

var items []json.RawMessage
if err := dec.Decode(&items); err != nil {
return fmt.Errorf("failed to decode json: %w", err)
}

for _, item := range items {
if p.readerConfig.ExpandEventListFromField != "" {
if err := p.splitEventList(p.readerConfig.ExpandEventListFromField, item, offset, p.s3ObjHash); err != nil {
return err
}
continue
}

data, _ := item.MarshalJSON()
evt := p.createEvent(string(data), offset)
p.publish(p.acker, &evt)
}
}

return nil
}

func (p *s3ObjectProcessor) splitEventList(key string, raw json.RawMessage, offset int64, objHash string) error {
// .[] signifies the root object is an array, and it should be split.
if key != ".[]" {
@@ -289,7 +342,7 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error {
}

var reader reader.Reader
reader, err = readfile.NewEncodeReader(ioutil.NopCloser(r), readfile.Config{
reader, err = readfile.NewEncodeReader(io.NopCloser(r), readfile.Config{
Codec: enc,
BufferSize: int(p.readerConfig.BufferSize),
Terminator: p.readerConfig.LineTerminator,