
feat(decoding): Implement chunked GELF decoding #20859

Open · wants to merge 60 commits into master
Conversation

@jorgehermo9 (Contributor) commented Jul 15, 2024

Closes #20769. This PR is kind of large (900 lines of code, 600 of which are from generated docs); if you prefer to chat via Discord (and in order to be more agile while merging this), I'm in the Vector community server, username @jorgehermo9.

The implementation is based on Graylog's documentation and Graylog's go-gelf library.

Some tests are failing in my local environment. Could you please trigger the CI so I can see whether it's a problem with my environment? If it isn't, I'll proceed to fix them.

@jorgehermo9 jorgehermo9 requested a review from a team as a code owner July 15, 2024 09:09
@github-actions bot added the domain: sources label Jul 15, 2024
Cargo.toml Outdated
@@ -139,6 +139,7 @@ serde_json = { version = "1.0.120", default-features = false, features = ["raw_v
serde = { version = "1.0.204", default-features = false, features = ["alloc", "derive", "rc"] }
toml = { version = "0.8.14", default-features = false, features = ["display", "parse"] }
vrl = { version = "0.16.1", features = ["arbitrary", "cli", "test", "test_framework"] }
tokio = { version = "1.38.0", default-features = false, features = ["full"] }
@jorgehermo9 (Contributor Author) commented Jul 15, 2024
I needed to use tokio inside the lib/codecs crate in order to implement GELF decoding timeouts with tokio tasks, so I added the dependency as a workspace one. If there is any problem with this, we can find another solution.
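
For context, a minimal sketch (assumed names and state shape, not necessarily the PR's exact code) of how a tokio task can expire a pending message after the timeout:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use bytes::Bytes;

type State = Arc<Mutex<HashMap<u64, Vec<Bytes>>>>;

// Spawns a task that drops the pending chunks of `message_id` if the
// message is still incomplete when the timeout fires.
fn spawn_timeout_task(state: State, message_id: u64, timeout: Duration) {
    tokio::spawn(async move {
        tokio::time::sleep(timeout).await;
        if state.lock().unwrap().remove(&message_id).is_some() {
            // The real decoder would also emit a "message timed out" event here.
        }
    });
}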

[const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize];
const DEFAULT_TIMEOUT_MILLIS: u64 = 5000;
// TODO: ask what would be an appropriate default value for this
const DEFAULT_PENDING_MESSAGES_LIMIT: usize = 1000;
@jorgehermo9 (Contributor Author)

I don't know what value is appropriate here. Do you have any recommendation?

This limit was enforced so that we have a memory-bounded decoder.

The maximum UDP packet size is 65536 bytes... so with this limit, I think we have roughly a 65 MB memory limit for pending message storage.

However, the framing is agnostic of the transport protocol, so other protocols may not have that per-message size limit, and thus this can be "theoretically unbounded" (for example, when reading raw bytes from a file).

Should we also enforce a per-message limit?

@jszwedko (Member)

I was hoping the reference implementation would serve as prior art here, but https://github.com/Graylog2/graylog2-server/blob/3c7f9df250f7d58d99e9c554d9307dc1eec9fdac/graylog2-server/src/main/java/org/graylog2/inputs/codecs/GelfChunkAggregator.java seems to have no pending-message limit, just the 5-second timeout you have. I'd suggest having this as an option for people who do want to bound the memory, but defaulting to unlimited to match the Graylog server behavior.

@jorgehermo9 (Contributor Author)

Addressed this in 85edb00. Feel free to resolve this thread if the change is what you expected.
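
For reference, a minimal sketch (assumed struct and field names, not the PR's code) of what a default-unlimited bound can look like:

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedGelfDecoderOptions {
    /// The maximum number of pending incomplete messages.
    /// `None` (the default) means unlimited, matching Graylog server behavior.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pending_messages_limit: Option<usize>,
}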

pub timeout_millis: u64,

/// The maximum number of pending incomplete messages. If this limit is reached, the decoder starts
/// dropping chunks of new messages. This limit ensures that the memory usage of the decoder's state is bounded.
@jorgehermo9 (Contributor Author) commented Jul 15, 2024

The bound is on the total number of pending messages, but there is no per-message memory limit. We could theoretically have a single 100 GB message and it wouldn't be limited by this setting.

As stated before, should we include a per-message limit?

@jszwedko (Member)

I like the idea of having a configurable bound on the number of pending messages.

The chunked encoding is only used for UDP, yes? Shouldn't that provide a de facto bound on size? How can we have a 100 GB message?

@jorgehermo9 (Contributor Author) commented Jul 27, 2024

The chunked encoding is only used for UDP, yes?

Yes, it is intended to be used only for UDP, and therefore it would be limited by the 65 KB UDP packet limit.
Nevertheless, as chunked_gelf is a framing method, nothing prevents users from using it with other types of sources, for example file sources, by explicitly setting framing.method = "chunked_gelf" in the config. Still, it really doesn't make sense to use this framing method outside of UDP socket sources, and no one would use it that way in real environments... so maybe it is okay to leave this as it is.

@jszwedko (Member) commented Oct 10, 2024

We could add a max_length option. This would be consistent with other framers: https://vector.dev/docs/reference/configuration/sources/socket/#framing.newline_delimited.max_length

In chunked_gelf's case, I think we'd want to limit the length of the accumulated chunks in addition to each individual chunk.

@jorgehermo9 (Contributor Author)

Given the nature of GELF messages, as they are just JSON, I don't think it would be fine to simply truncate the input: the JSON would most likely be broken and GELF deserialization would fail in nearly all cases after truncation.

Should we instead discard the whole message (including previously stored chunks) if an individual chunk reaches its defined limit or the accumulated-chunks limit is reached? I don't know if it's worth doing, but I'm open to implementing it if you think it would be worthwhile.
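
A minimal sketch (assumed names and state shape, not the PR's code) of this discard-instead-of-truncate behavior:

use std::collections::HashMap;
use bytes::Bytes;

// Drops the whole pending message once the accumulated size would exceed
// `max_length`, instead of truncating the JSON payload.
fn accept_chunk(
    pending: &mut HashMap<u64, (Vec<Bytes>, usize)>,
    message_id: u64,
    chunk: Bytes,
    max_length: usize,
) -> bool {
    let accumulated = pending.get(&message_id).map(|(_, len)| *len).unwrap_or(0);
    if accumulated + chunk.len() > max_length {
        // Discard the previously stored chunks too: a truncated GELF
        // message would fail JSON deserialization anyway.
        pending.remove(&message_id);
        return false;
    }
    let entry = pending.entry(message_id).or_insert_with(|| (Vec::new(), 0));
    entry.1 += chunk.len();
    entry.0.push(chunk);
    true
}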

@jorgehermo9 (Contributor Author) commented Sep 3, 2024

CI failed because the component docs were not updated. Fixed that, sorry.

@jszwedko (Member) commented Sep 3, 2024

Apologies that I still haven't gotten to this meaty one. It's on my list!

@jorgehermo9 (Contributor Author)

Don't apologize! I completely understand. No hurry, really.

Thanks!

@jszwedko (Member) left a comment

Apologies for the very long delay in review here! I appreciate you contributing this useful feature.

I'll give the code a read too and leave some feedback inline, but I had trouble getting this working with fluent-bit, which is able to write chunked GELF. What did you use for testing? I seem to be running into an error in deserializing:

2024-10-10T21:21:45.017503Z ERROR source{component_kind="source" component_id=source0 component_type=socket}: vector::internal_events::codecs: Failed deserializing frame. error=expected value at line 1 column 1 error_code="decoder_deserialize" error_type="parser_failed" stage="processing" internal_log_rate_limit=true

This is what I tried:

fluent.conf

[INPUT]
    Name                    tail
    Tag                     log.*
    Path                    /tmp/logs/*.log
    DB                      /tmp/fluent/flb_kube.db
    Mem_Buf_Limit           5MB
    Refresh_Interval        10
    Parser                  json

[OUTPUT]
    Name                    gelf
    Match                   log.*
    Host                    127.0.0.1
    Port                    15514
    Mode                    udp
    Gelf_Short_Message_Key  message
    Compress                false
    Packet_Size             5

[OUTPUT]
    Name                    stdout
    Match                   log.*

Note that I configured the packet size to 5.

parsers.conf

[PARSER]
    Name        json
    Format      json
    Time_Key    timestamp
    Time_Format %Y-%m-%dT%H:%M:%S %z

Running fluent-bit:

fluent-bit -c /tmp/fluent.conf -R /tmp/parsers.conf

I then used this Vector config:

sources:
  source0:
    address: 0.0.0.0:15514
    mode: udp
    type: socket
    decoding:
      codec: gelf
    framing:
      method: chunked_gelf
      chunked_gelf: {} # note that this is required, that might also be a bug
sinks:
  sink0:
    type: console
    inputs:
    - source0
    encoding:
      codec: json

Then writing test data:

echo '{"host": "localhost", "level": "debug", "message": "1", "time": "2006-07-28T13:22:04Z"}' >> 14.log

When using a larger Packet_Size, so that the messages aren't chunked, the payload is parsed correctly, but it doesn't seem to handle the case where the messages are chunked. I'm curious whether you can reproduce this.

@jorgehermo9 (Contributor Author) commented Oct 10, 2024

Hi @jszwedko, don't worry about the delay! I appreciate the review a lot.

For testing, I used a custom Python script that generates the packets, as I didn't find any other way to do it. Thanks for pointing out that fluent-bit config; I will debug what might be happening there.

The script I used was something like this. Please ignore the compression parts, as I used them to test compressed GELF chunks (which are not implemented in this PR; I will submit that later).

import os
import socket
import random
import json
import zlib
import gzip

PORT = int(os.environ.get('PORT', 15514))
HOST = os.environ.get('HOST', '127.0.0.1')
ADDRESS = (HOST, PORT)
MAX_SIZE = 3000
SOCKET = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

def generate_gelf_chunk(message_id, message, seq_num, total_seq):
    # GELF chunk layout: magic bytes 0x1e 0x0f, 8-byte message id,
    # 1-byte sequence number, 1-byte total chunk count, then the payload.
    return b''.join([
        b'\x1e\x0f',
        message_id,
        seq_num.to_bytes(1, 'big'),
        total_seq.to_bytes(1, 'big'),
        message
    ])

def generate_gelf_chunks(gelf_payload):
    message_id = os.urandom(8)
    chunks = [gelf_payload[i:i+MAX_SIZE] for i in range(0, len(gelf_payload), MAX_SIZE)]
    total_seq = len(chunks)
    chunks = [generate_gelf_chunk(message_id, chunk, i, total_seq) for i, chunk in enumerate(chunks)]
    # Randomly shuffle the chunks
    random.shuffle(chunks)
    return chunks

def generate_gelf_payload(message):
    return json.dumps({
      "version": "1.1",
      "host": "example.org",
      "short_message": message,
      "full_message": "There is no full message",
      "timestamp": 1385053862.3072,
      "level": 1,
      "_user_id": 9001,
      "_some_info": "foo",
      "_some_env_var": "bar"
    })


def generate_compressed_gelf_payload(message,compression):
    payload = generate_gelf_payload(message)
    if compression == "zlib":
        return zlib.compress(payload.encode())
    elif compression == "gzip":
        return gzip.compress(payload.encode())
    elif compression == None:
        return payload.encode()
    else:
        raise ValueError("Invalid compression type")


def send_message(message,compression):
    gelf_payload = generate_compressed_gelf_payload(message,compression)
    chunks = generate_gelf_chunks(gelf_payload)
    for chunk in chunks:
        SOCKET.sendto(chunk, ADDRESS)

    print(f"Sent {len(chunks)} chunks")

def main():
    small_message = "This is a small message" * 100
    large_message = "This is a large message" * 500
    very_large_message = "This is a very large message" * 10000

    send_message(small_message, None)
    send_message(small_message, None)
    send_message(large_message, None)
    # send_message(very_large_message, None)

    # zlib_message = "This is a zlib message" * 100
    # gzip_message = "This is a gzip message" * 100
    # send_message(zlib_message, "zlib")
    # send_message(gzip_message, "gzip")

if __name__ == '__main__':
    main()

Executing this with the Vector config you depicted works for me:

(screenshot: the decoded GELF messages printed by the console sink)

@jszwedko (Member) left a comment

Alright, left some comments inline too 😄 Apologies again for the long delay in review on this one.


/// decoder drops all the received chunks of the incomplete message and starts over.
#[serde(
default = "default_timeout_secs",
skip_serializing_if = "vector_core::serde::is_default"
@jszwedko (Member)

Yeah, I don't think it will work. It seems to depend on the default for the type, rather than the deserialization default:

/// Answers "Is this value in its default state?" which can be used to skip serializing the value.
#[inline]
pub fn is_default<E: Default + PartialEq>(e: &E) -> bool {
    e == &E::default()
}

That's a gotcha.
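
To illustrate the gotcha with a runnable sketch (hypothetical values, not the PR's code): when the deserialization default (e.g. 5000 ms) differs from the type's `Default`, a zero value is skipped even though it is not the configured default:

fn is_default<E: Default + PartialEq>(e: &E) -> bool {
    e == &E::default()
}

fn main() {
    // The deserialization default is 5000, but `u64::default()` is 0.
    let timeout_millis: u64 = 0;
    assert!(is_default(&timeout_millis)); // skipped when serializing
    assert!(!is_default(&5000u64));       // the configured default is serialized
}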


// TODO: maybe this should be an integration test, like `src/sources/redis/mod.rs` and
// `scripts/integration/redis`? There are currently no integration tests for the socket source.
#[tokio::test]
async fn udp_decodes_chunked_gelf_messages() {
@jszwedko (Member)

Yeah, I see what you are getting at. It would be nice to have integration tests, but you are correct that we don't have any for socket yet. Another place we could put them is in tests/integration. Those are Rust-based "integration tests" that are more black box.

@jorgehermo9 (Contributor Author)

  chunked_gelf: {} # note that this is required, that might also be a bug

Fixed in 72414d5.

The failing CI was fixed in 6c66ff0.


I tried the fluent-bit config that you depicted, but it seems that fluent-bit is not encoding the payload as I would expect.

I tested this source with a Graylog server output stream and it worked too. I think the script I used complies with the GELF spec, as do the unit tests added in this PR. The fluent-bit behaviour is very weird and I have to spend more time debugging it 🤔

Thank you very much @jszwedko for the review, and don't worry about the delay!

I will address the comments as soon as I can 😃

@jorgehermo9 (Contributor Author) commented Oct 10, 2024

Debugging this a bit more... the first bytes that fluent-bit sends after the GELF chunk header are 0x1f 0x8b 0x08, which happen to be the gzip header bytes (see for example https://stackoverflow.com/a/58552729), so it seems that fluent-bit is sending the compressed bytes.

I used the same fluent-bit config as yours... I think fluent-bit is not honoring the Compress false option.

If I use the code from my compressed GELF branch (still not ready for a PR), jorgehermo9#2, it gets parsed successfully:

(screenshot: the message decoded and printed by the console sink)

So fluent-bit is definitely compressing the payload with gzip regardless of the Compress config:

(screenshot of the fluent-bit GELF output plugin docs)
https://docs.fluentbit.io/manual/pipeline/outputs/gelf
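
As an aside, a minimal sketch of checking for this by hand, using the chunk layout and the gzip magic bytes mentioned above (hypothetical helper, not part of the PR):

// A chunked-GELF header is 12 bytes: the 0x1e 0x0f magic, an 8-byte
// message id, a 1-byte sequence number and a 1-byte total chunk count.
// gzip payloads then start with the magic bytes 0x1f 0x8b 0x08.
fn payload_is_gzip(packet: &[u8]) -> bool {
    const CHUNK_HEADER_LEN: usize = 12;
    packet.len() > CHUNK_HEADER_LEN
        && packet[CHUNK_HEADER_LEN..].starts_with(&[0x1f, 0x8b, 0x08])
}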

@jszwedko (Member)

Aha! You are right, it is compressing even though Compress is set to false:

https://github.com/fluent/fluent-bit/blob/d94857f43ec2ff2741a0bdf7acd65a796c9132b5/plugins/out_gelf/gelf.c#L209-L226

It seems to compress whenever the message size is greater than the packet size, regardless of the Compress setting. I'll open an issue on fluent-bit since I don't see an existing one.

Good validation for your other PR adding compression support 😄

@jszwedko (Member)

fluent-bit issue report: fluent/fluent-bit#9482

@jorgehermo9 (Contributor Author)

I absolutely did not expect that my implementation would surface an issue in fluent-bit 😄

I think the few hours spent reading a barely documented spec and navigating various kinds of cursed implementations really paid off 😆

Thanks for submitting the issue!

It seems that, since they have the message_size > packet_size condition, chunked packets will always be compressed. The only situation where they are not compressed is with unchunked messages.

Nice catch!

@jorgehermo9 jorgehermo9 requested a review from a team as a code owner October 18, 2024 15:32
&mut self,
mut chunk: Bytes,
) -> Result<Option<Bytes>, ChunkedGelfDecoderError> {
// Encoding scheme:
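// (the rest of this comment is truncated in this view; the chunk layout, as
// also used by the Python script above, is: 2 magic bytes 0x1e 0x0f, an
// 8-byte message id, a 1-byte sequence number, a 1-byte total chunk count,
// then the payload)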
@jorgehermo9 (Contributor Author)

I think it would be useful to record a metric tracking the number of currently pending messages. How can we approach this? It is usually done at the Vector binary level rather than in the inner libs, as I see most metrics located at src/internal_events.

It would also be useful to record the number of timed-out messages.

}

#[tokio::test]
async fn decode_shuffled_messages() {
@jorgehermo9 (Contributor Author)

Included this new test in 2ab9bf3.

@jorgehermo9 (Contributor Author)

Hi @jszwedko, thank you very much for the review. I've addressed all the comments and I'm ready for another review round.

Things left:

Labels
domain: external docs (Anything related to Vector's external, public documentation)
domain: sources (Anything related to Vector's sources)

Successfully merging this pull request may close these issues.

Chunked GELF Decoding
3 participants