
[filebeat][awss3] - Added support for parquet decoding and decoder config #35578

Merged · 56 commits · Jun 20, 2023

Conversation

ShourieG
Contributor

@ShourieG ShourieG commented May 25, 2023

Type of change

  • Enhancement
  • Docs

What does this PR do?

This PR adds support for a new decoding config option inside the readerConfig struct, along with support for
parquet file decoding using the libbeat parquet reader. The decoding config is structured so that in the future
we can add more decoding codecs, as well as migrate the decoding of JSON and NDJSON files, which currently
happens based on the contentType config option.

An example of the new decoding config:

  decoding.codec.parquet.enabled: true
  decoding.codec.parquet.process_parallel: true
  decoding.codec.parquet.batch_size: 1000

Why is it important?

This change allows us to officially support parquet decoding for the S3 input, and also enables integrations like
Amazon Security Lake.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
    - [ ] I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc.

Related issues

@ShourieG
Contributor Author

ShourieG commented Jun 5, 2023

@andrewkroh, regarding the questions you had:

  1. If we were designing a decoder interface that we wanted to be able to apply to other input sources, what would that API be? Imagine we wanted to be able to decode parquet files in GCS or simply from disk (filestream).

    • I think if we were to have a generic decoder interface applicable to all inputs, I would have the decoder API accept a decoder-specific config and an io.Reader stream for initialisation. The decode func would ideally return a data output channel and an error channel, so that any input-specific logic is kept out of the decoder and the input can easily listen on the channels and extract the data along with any errors.
  2. Should it allow decoder chaining? Why or why not?

    • I feel chaining would not be a necessity; the decoder config should be extensive enough to define the exact behaviour. This also helps keep the API simple to use.
  3. What's the relationship (if any) between parsers and decoders?

    • If it's a generic decoder then it should not be concerned with parsing, as that is the job of the input. Hence the output is channelled to keep any parsing-specific logic out of the decoder itself.

For the current implementation, since it's a decoder that is specific to the input, these design decisions were not made. (A rough sketch of the interface shape from answer 1 is given below.)
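
A minimal Go sketch of the channel-based decoder shape described in answer 1 — Config, Decoder, and New here are hypothetical illustrations, not the API that was merged:

  package decoder

  import "io"

  // Config stands in for a codec-specific configuration, e.g. the
  // parquet options shown in the PR description.
  type Config struct {
      ProcessParallel bool
      BatchSize       int
  }

  // Decoder is initialised from a codec-specific config and the raw
  // input stream, keeping input-specific logic out of the decoder.
  type Decoder interface {
      // Decode returns a channel of decoded documents and a channel
      // of errors; the calling input simply listens on both.
      Decode() (<-chan []byte, <-chan error)
      Close() error
  }

  // New would select a concrete decoder based on cfg and wrap r.
  func New(cfg Config, r io.Reader) (Decoder, error) {
      // codec selection elided in this sketch
      return nil, nil
  }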

@ShourieG ShourieG removed the blocked label Jun 13, 2023
@ShourieG
Copy link
Contributor Author

ShourieG commented Jun 13, 2023

@andrewkroh I've refactored the decoder to be more generic, updated the tests and docs, and added the necessary comments. For now I've kept the implementation simple and basic, suited to the current use case, but it can easily be extended in the future. I've not added a no-op decoder here since I need the nil value returned in order to branch to the legacy path (a sketch of that branching follows below). I've made as few modifications as possible to the legacy logic so that complications are avoided.
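
A hedged illustration of that nil-based branching — decoder, newDecoder, and readLegacy are hypothetical stand-ins, not the identifiers used in the PR:

  package awss3

  // decoder stands in for the generic decoder interface.
  type decoder interface {
      decode() error
  }

  // newDecoder returns a nil decoder (and no error) when no decoding
  // config is set; the nil signals the caller to take the legacy path.
  func newDecoder(cfg any) (decoder, error) { return nil, nil }

  func process(cfg any) error {
      dec, err := newDecoder(cfg)
      if err != nil {
          return err
      }
      if dec != nil {
          return dec.decode() // new path, e.g. the parquet decoder
      }
      return readLegacy() // legacy JSON/NDJSON path keyed on content type
  }

  func readLegacy() error { return nil }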

@ShourieG
Contributor Author

@andrewkroh could you review the current implementation while we wait for the CI pipeline to be fixed? If we could merge before FF that would be really great, as Crest will be taking up the Security Lake integration after this merge.

I opened a public issue, apache/arrow#36052, for the cross-build errors on 32-bit systems, and they have been resolved by a recent PR, but I'm still not updating the library since there is no stable release out with these fixes.

@faec
Contributor

faec commented Jun 15, 2023

Build fix is in review: #35789

@narph narph requested a review from andrewkroh June 20, 2023 08:17
@yago82

yago82 commented Dec 14, 2023

Hello everyone,

I'm facing an issue while trying to retrieve a Parquet file from S3 using Filebeat. Below, I've included configuration details:

filebeat.inputs:
- type: aws-s3
  bucket_arn: ${BUCKET_ARN}
  bucket_list_prefix: ${BUCKET_LIST_PREFIX}
  bucket_list_interval: 60s
  region: eu-west-1
  default_region: eu-west-1
  number_of_workers: 5
  access_key_id: ${ACCESS_KEY_ID}
  secret_access_key: ${SECRET_ACCESS_KEY}
  decoding.codec.parquet.enabled: true
  decoding.codec.parquet.process_parallel: true
  decoding.codec.parquet.batch_size: 1000

setup.template.enabled: false

processors:
  - add_fields:
      target: '@metadata'
      fields:
        op_type: "index"

output.elasticsearch:
  hosts: ["${ELASTICSEARCH_HOSTS}"]
  username: ${ELASTICSEARCH_USERNAME}
  password: ${ELASTICSEARCH_PASSWORD}
  protocol: https
  index: utenti123
  allow_older_versions: true

I have tried various bucket_list_prefix values, including:

emr-serverless/user-output/
emr-serverless/user--output//
emr-serverless/user-output/*
emr-serverless/user-output/*/

However, we consistently encounter the following error:

failed processing S3 event for object key "emr-serverless/user-output/" in bucket "root-content": failed to create parquet decoder: failed to create parquet reader: parquet: file too small (size=0)

Any insights or suggestions on troubleshooting steps would be highly appreciated. Please let me know if additional information is needed.

Thank you

@andrewkroh
Member

Try using file_selectors to selectively apply the parquet decoding to specific files.

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-aws-s3.html#_file_selectors
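
For example, a selector along these lines (the bucket settings and regex are illustrative) applies the parquet decoding only to matching objects; anything that does not match a selector — such as the zero-byte "directory" placeholder keys behind the "file too small (size=0)" error — is skipped:

  filebeat.inputs:
  - type: aws-s3
    bucket_arn: ${BUCKET_ARN}
    bucket_list_prefix: emr-serverless/user-output/
    number_of_workers: 5
    file_selectors:
      # only keys ending in .parquet are decoded as parquet
      - regex: '\.parquet$'
        decoding.codec.parquet.enabled: true
        decoding.codec.parquet.process_parallel: true
        decoding.codec.parquet.batch_size: 1000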

@kdHub
Copy link

kdHub commented Oct 4, 2024

Thanks for the suggestion on file_selectors. I was already using them, but did not realize my issue until seeing this last comment; moving the decoding block into the file_selectors section solved my problem.


Successfully merging this pull request may close these issues.

[Filebeat S3 Input] Add support for Apache Parquet files