-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(decoding): Implement chunked GELF decoding #20859
base: master
Are you sure you want to change the base?
feat(decoding): Implement chunked GELF decoding #20859
Conversation
Cargo.toml
Outdated
@@ -139,6 +139,7 @@ serde_json = { version = "1.0.120", default-features = false, features = ["raw_v | |||
serde = { version = "1.0.204", default-features = false, features = ["alloc", "derive", "rc"] } | |||
toml = { version = "0.8.14", default-features = false, features = ["display", "parse"] } | |||
vrl = { version = "0.16.1", features = ["arbitrary", "cli", "test", "test_framework"] } | |||
tokio = { version = "1.38.0", default-features = false, features = ["full"] } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needed to use tokio inside the lib/codecs
crate, in order to implement gelf decoding timeouts with tokio tasks, so I added the dependency as a workspace one. If there is any problem with this, we may find another solution
[const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize]; | ||
const DEFAULT_TIMEOUT_MILLIS: u64 = 5000; | ||
// TODO: ask what would be an appropriate default value for this | ||
const DEFAULT_PENDING_MESSAGES_LIMIT: usize = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know what value is appropriate here. Do you have any recommendation?
This limit was enforced so we have a memory-bounded decoder.
The maximum UDP packet size is 65536 bytes... so with this limit, I think we have roughly 65MB of memory limit for pending messages storage.
However, the framing is agnostic of the transport protocol, so maybe other protocols does not have that per-message size limit and thus this can be "theoretically unbounded" (for example, reading raw bytes from a file).
Should we enforce too a per-message limit such as
pub max_length: Option<usize>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was hoping the reference implementation would serve as prior art here, but https://github.com/Graylog2/graylog2-server/blob/3c7f9df250f7d58d99e9c554d9307dc1eec9fdac/graylog2-server/src/main/java/org/graylog2/inputs/codecs/GelfChunkAggregator.java seems like they have no pending message limit, just the timeout of 5 seconds as you have. I think I'd suggest having this as an option for people that do want to bound the memory, but default to unlimited to match Graylog server behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed this in 85edb00. Feel free to resolve this thread if the change is what you expected
pub timeout_millis: u64, | ||
|
||
/// The maximum number of pending uncomplete messages. If this limit is reached, the decoder will start | ||
/// dropping chunks of new messages. This limit ensures the memory usage of the decoder's state is bounded. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The bound is per total messages, but there is no per-message memory usage limit. We can theoretically have a 100GB single message and it won't be limited by this setting.
As stated before, should we include a per-message limit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the idea of having a configurable bound on the number of pending messages.
The chunked encoding is only used for UDP, yes? Shouldn't that provide a defacto bound on size? How can we have a 100 GB message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The chunked encoding is only used for UDP, yes?
Yes, it is intended to use it only for UDP and therefore it would be limited by the UDP packets limit of 65KB.
Nevertheless, as the chunked_gelf is a framing method, nothing blocks user to use that method with other types of sources, for example, with file
sources and explictly stating the config framing.method="chunked-gelf"
. Although, it really does not have sense to use that framing method outside of UDP socket sources, and no one will use that in real environments... So maybe it is ok to leave this as it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could add a max_length
option. This would be consistent with other framers: https://vector.dev/docs/reference/configuration/sources/socket/#framing.newline_delimited.max_length
In chunked_gelf
's case, I think we'd want to limit the length of the accumulated chunks in addition to each individual chunk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the nature of gelf's messages, as they are just json, I don't think it would be fine to just truncate the input, as the json message would be most likely broken and the GELF deserialization would fail in nearly all cases after truncating.
Should we instead discard the whole message (including previously stored cunks) if an individual chunk reaches its defined limit or the accumulated chunks limit is reached? I don't know if its worth to do this, but I'm open to implement it if you see that it would be worth
CI failed because component docs were not updated. Fixed that, sorry. |
Apologies that I still haven't gotten this meaty one. It's on my list! |
Dont apologize! I completely understand it. No hurries, really. Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies for the very long delay in review here! I appreciate you contributing this useful feature.
I'll give the code a read too, to leave some feedback inline, but I had trouble getting this working with fluent-bit which is able to write chunked GELF. What did you use for testing? I seem to be running into an error in deserializing:
2024-10-10T21:21:45.017503Z ERROR source{component_kind="source" component_id=source0 component_type=socket}: vector::internal_events::codecs: Failed deserializing frame. error=expected value at line 1 column 1 error_code="decoder_deserialize" error_type="parser_failed" stage="processing" internal_log_rate_limit=true
This is what I tried:
fluent.conf
[INPUT]
Name tail
Tag log.*
Path /tmp/logs/*.log
DB /tmp/fluent/flb_kube.db
Mem_Buf_Limit 5MB
Refresh_Interval 10
Parser json
[OUTPUT]
Name gelf
Match log.*
Host 127.0.0.1
Port 15514
Mode udp
Gelf_Short_Message_Key message
Compress false
Packet_Size 5
[OUTPUT]
Name stdout
Match log.*
Note that I configured the packet size to 5.
parsers.conf
[PARSER]
Name json
Format json
Time_Key timestamp
Time_Format %Y-%m-%dT%H:%M:%S %z
Running fluent-bit:
fluent-bit -c /tmp/fluent.conf -R /tmp/parsers.conf
I then used this Vector config:
sources:
source0:
address: 0.0.0.0:15514
mode: udp
type: socket
decoding:
codec: gelf
framing:
method: chunked_gelf
chunked_gelf: {} # note that this is required, that might also be a bug
sinks:
sink0:
type: console
inputs:
- source0
encoding:
codec: json
Then writing test data:
echo '{"host": "localhost", "level": "debug", "message": "1", "time": "2006-07-28T13:22:04Z"}' >> 14.log
When using a larger packet_size
, where the messages aren't chunked, is parsed correctly, but it doesn't seem to be handling the case where the messages are chunked. I'm curious if you can reproduce.
Hi @jszwedko, dont worry for the delay! I appreciate the review a lot For testing, I used a custom python script that generates the packets, as I didn't found any other way to do it. Thanks for pointing out that fluent-bit config, I will debug what can be happening with that. The script I used was something like this, please ignore the compression things, as I used it to test the compressed gelf chunks (which are not implemented in this PR and I will submit later). import os
import socket
import random
import json
import zlib
import gzip
PORT = os.environ.get('PORT', 15514)
HOST = os.environ.get('HOST', '127.0.0.1')
ADDRESS = (HOST, PORT)
MAX_SIZE = 3000
SOCKET = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
def generate_gelf_chunk(message_id, message, seq_num, total_seq):
return b''.join([
b'\x1e\x0f',
message_id,
seq_num.to_bytes(1, 'big'),
total_seq.to_bytes(1, 'big'),
message
])
def generate_gelf_chunks(gelf_payload):
message_id = os.urandom(8)
chunks = [gelf_payload[i:i+MAX_SIZE] for i in range(0, len(gelf_payload), MAX_SIZE)]
total_seq = len(chunks)
chunks = [generate_gelf_chunk(message_id, chunk, i, total_seq) for i, chunk in enumerate(chunks)]
# Randomly shuffle the chunks
random.shuffle(chunks)
return chunks
def generate_gelf_payload(message):
return json.dumps({
"version": "1.1",
"host": "example.org",
"short_message": message,
"full_message": "There is no full message",
"timestamp": 1385053862.3072,
"level": 1,
"_user_id": 9001,
"_some_info": "foo",
"_some_env_var": "bar"
})
def generate_compressed_gelf_payload(message,compression):
payload = generate_gelf_payload(message)
if compression == "zlib":
return zlib.compress(payload.encode())
elif compression == "gzip":
return gzip.compress(payload.encode())
elif compression == None:
return payload.encode()
else:
raise ValueError("Invalid compression type")
def send_message(message,compression):
gelf_payload = generate_compressed_gelf_payload(message,compression)
chunks = generate_gelf_chunks(gelf_payload)
for chunk in chunks:
SOCKET.sendto(chunk, ADDRESS)
print(f"Sent {len(chunks)} chunks")
def main():
small_message = "This is a small message" *100
large_message = "This is a large message" * 500
very_large_message = "This is a very large message" * 10000
send_message(small_message,None)
send_message(small_message,None)
send_message(large_message,None)
# send_message(very_large_message,none)
#
# zlib_message = "This is a zlib message" * 100
# gzip_message = "This is a gzip message" * 100
# send_message(zlib_message,"zlib")
# send_message(gzip_message,"gzip")
if __name__ == '__main__':
main() Executing this, with the vector config you depicted, shows this to me: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, left some comments inline too 😄 Apologies again for the long delay in review on this one.
[const { Bytes::new() }; GELF_MAX_TOTAL_CHUNKS as usize]; | ||
const DEFAULT_TIMEOUT_MILLIS: u64 = 5000; | ||
// TODO: ask what would be an appropriate default value for this | ||
const DEFAULT_PENDING_MESSAGES_LIMIT: usize = 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was hoping the reference implementation would serve as prior art here, but https://github.com/Graylog2/graylog2-server/blob/3c7f9df250f7d58d99e9c554d9307dc1eec9fdac/graylog2-server/src/main/java/org/graylog2/inputs/codecs/GelfChunkAggregator.java seems like they have no pending message limit, just the timeout of 5 seconds as you have. I think I'd suggest having this as an option for people that do want to bound the memory, but default to unlimited to match Graylog server behavior.
/// decoder drops all the received chunks of the incomplete message and starts over. | ||
#[serde( | ||
default = "default_timeout_secs", | ||
skip_serializing_if = "vector_core::serde::is_default" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I don't think it will work. It seems to depend on the default for the type, rather than the deserialization default:
vector/lib/vector-core/src/serde.rs
Lines 5 to 9 in 91d0fab
/// Answers "Is this value in it's default state?" which can be used to skip serializing the value. | |
#[inline] | |
pub fn is_default<E: Default + PartialEq>(e: &E) -> bool { | |
e == &E::default() | |
} |
That's a gotcha.
pub timeout_millis: u64, | ||
|
||
/// The maximum number of pending uncomplete messages. If this limit is reached, the decoder will start | ||
/// dropping chunks of new messages. This limit ensures the memory usage of the decoder's state is bounded. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could add a max_length
option. This would be consistent with other framers: https://vector.dev/docs/reference/configuration/sources/socket/#framing.newline_delimited.max_length
In chunked_gelf
's case, I think we'd want to limit the length of the accumulated chunks in addition to each individual chunk.
// TODO: maybe this should be in an integration test, such as `src/sources/redis/mod.rs` and `scripts/integration/redis` | ||
// and so? There currently are no integration tests for the socket source | ||
#[tokio::test] | ||
async fn udp_decodes_chunked_gelf_messages() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I see what you are getting at. It would be nice to have integration tests, but you are correct that we don't have any for socket
yet. Another place we could put them is in tests/integration
. Those are Rust-based "integration tests" that are more black box.
Fixed in 72414d5 The failing CI fixed in 6c66ff0 I tried the fluentbit config that you depicted, but it seems that fluentbit is not encoding the strings as I would expect. I tested this source with a Graylog Server output stream and it worked too. In the script I used, I think I was complying with the GELF spec, and also in the unit tests added in this PR. The fluentbit behaviour is very weird and I have to spend more time debugging it 🤔 Thank you very much @jszwedko for the review and do not worry for the delay! I will address the comments as soon as I can 😃 |
Debugging a bit more about this... It seems that the first bytes sent from fluentbit after the gelf header I used the same fluentbit config as yours... I think that fluentbit is not honoring the If I use the code from my compressed gelf branch (still not ready for submitting a PR) jorgehermo9#2 So fluentbit is definitively compressing the payload as gzip regardless of the Compress config... |
Aha! You are right, it is compressing even though It seems to compress regardless if the message size is greater than the packet size. I'll open an issue on fluent-bit since I don't see another one. Good validation for your other PR adding compression support 😄 |
fluent-bit issue report: fluent/fluent-bit#9482 |
I absolutely did not expect that my implementation would raise an issue at fluentbit 😄 I think that the few hours reading a barely documented spec and navigating through various kind of cursed implementations really paid off 😆 Thanks for submitting the issue! It seems that as they have the nice catch! |
&mut self, | ||
mut chunk: Bytes, | ||
) -> Result<Option<Bytes>, ChunkedGelfDecoderError> { | ||
// Encoding scheme: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be useful to record a metric for tracking the number of current pending messages. How can we approach this? I think this is usually done at Vector's binary level and not in inner libs... as I see a lot of metrics located at src/internal_events
.
Also, it would be useful to record the number of timed out messages
Co-authored-by: Jesse Szwedko <jesse@szwedko.me>
} | ||
|
||
#[tokio::test] | ||
async fn decode_shuffled_messages() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
included this new test in 2ab9bf3
Hi @jszwedko, thank you very much for the review, addressed all the comments and I'm ready for another review round. Things left: |
Closes #20769. This PR is kind of large(900 lines of code, 600 are from generated docs), if your prefer to chat via discord (and in order to be more agile while merging this), I'm in the vector community server, username
@jorgehermo9
.Implementation is based on Graylog's documentation and Graylog's go-gelf library
In my local environment some tests are failing. Could you please trigger the CI so I can see if it is a problem of my environment and if not, I can proceed to fix them?